In [22]:
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import udf, avg, col,lit,call_udf,countDistinct,sproc,udf
from snowflake.snowpark.types import IntegerType, FloatType, StringType, BooleanType
import pandas as pd
from config import snowflake_conn_prop_local as snowflake_conn_prop
import sys
import json
import platform
import os,requests
from pathlib import Path
import glob


from snowflake.snowpark import version
# Show which Snowpark client version this notebook is running against.
print(version.VERSION)

# Open the Snowflake session from the connection properties imported from config.
session = Session.builder.configs(snowflake_conn_prop).create()

# Sanity check: report the warehouse / database / schema the session landed in.
current_context_rows = session.sql(
    'select current_warehouse(), current_database(), current_schema()'
).collect()
print(current_context_rows)

(0, 7, 0)
[Row(CURRENT_WAREHOUSE()='CLUSTER1', CURRENT_DATABASE()='NYCTAXI', CURRENT_SCHEMA()='TAXI')]


### Initialize

- Here we download the Great Expectations (GE) 0.15.14 source distribution from PyPI to a local folder on the machine where this code is running.
- This has been tested with GE 0.15.14. There was an issue with 0.15.17, where it tries to look for the package ipywidgets, which is found in the Snowflake Anaconda channel but not in information_schema.packages.


In [23]:
# Current working directory is the Project Home DIR.
PROJECT_HOME_DIR = '.'
LOCAL_TEMP_DIR = os.path.join(PROJECT_HOME_DIR, 'temp')
LOCAL_LIB_DIR = os.path.join(LOCAL_TEMP_DIR, 'libs')
LOCAL_TARFile_DIR = os.path.join(LOCAL_TEMP_DIR, 'tarfiles')

# Source distributions to download. Every entry must be a tar archive because
# each one is unpacked with tarfile below (the 0.15.17 wheel is kept only as a
# reference to the version that did not work).
LIB_URLS = [
    #'https://files.pythonhosted.org/packages/9f/57/1539d783553f3d67cea1b55d7fe494373c5c0c9af689d4c0e0c2d3197739/great_expectations-0.15.17-py3-none-any.whl'
    'https://files.pythonhosted.org/packages/8e/9d/cecb12289f7967b15facf550a0bbb9c1e910968c3a61b91fd8cdb80aeb3c/great_expectations-0.15.14.tar.gz'
]

import tarfile

# Directory the archives are unpacked into; the next cell globs this location.
ge_extract_dir = f'{LOCAL_LIB_DIR}/ge'

# Create the local directories once, before any download starts.
Path(LOCAL_TARFile_DIR).mkdir(parents=True, exist_ok=True)
print(f'Create local dir: {LOCAL_TARFile_DIR}')

Path(LOCAL_LIB_DIR).mkdir(parents=True, exist_ok=True)
print(f'Create local dir: {LOCAL_LIB_DIR}')

for lib_url in LIB_URLS:
    # The file name is the last path segment of the URL.
    target_file = lib_url.split('/')[-1]
    local_lib_fl = f'{LOCAL_TARFile_DIR}/{target_file}'
    print(local_lib_fl)

    print(f'Downloading library from PyPI to {LOCAL_TARFile_DIR} ...')
    # Fetch first and fail loudly on HTTP errors, so a failed download never
    # leaves an empty or HTML "tarball" on disk for the extraction step.
    r = requests.get(lib_url, timeout=60)
    r.raise_for_status()
    with open(local_lib_fl, "wb") as f:
        f.write(r.content)

    # Extract inside the loop so every downloaded archive is unpacked, and use
    # a context manager so the handle is closed even if extraction fails.
    # NOTE(review): extractall() trusts the member paths inside the archive;
    # only list URLs from a trusted index here.
    print(f'Started Extracting GE tar file to {ge_extract_dir} ...')
    with tarfile.open(local_lib_fl) as archive:
        archive.extractall(ge_extract_dir)
    print(f'Done extracting GE tar file to {ge_extract_dir} ...')
                                    

./temp/tarfiles/great_expectations-0.15.14.tar.gz
Create local dir: ./temp/tarfiles
Create local dir: ./temp/libs
Downloading library from PyPI to ./temp/tarfiles ...
Started Extracting GE tar file to ./temp/tarfiles ...
Done extracting GE tar file to ./temp/tarfiles ...


In [24]:
# Getting the path for the great_expectations package folder after the tar file
# is extracted. This path is used in session.add_import().

import glob

# Search under LOCAL_LIB_DIR (where the archive was extracted) rather than a
# second hard-coded copy of the path. Sorting makes the choice deterministic
# when more than one extracted version is present: the last name in sort order wins.
ge_extracted_dirs = sorted(glob.glob(f'{LOCAL_LIB_DIR}/ge/great_expectations*'))
if not ge_extracted_dirs:
    # Fail here with a clear message instead of passing '' to add_import later.
    raise FileNotFoundError(
        f'No extracted great_expectations folder found under {LOCAL_LIB_DIR}/ge; '
        'run the download/extract cell first.'
    )
ge_import_path = ge_extracted_dirs[-1] + '/great_expectations'
print(ge_import_path)


./temp/libs/ge/great_expectations-0.15.14/great_expectations


### Creating Python Stored Procedure

In [27]:
        
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, S3StoreBackendDefaults
from config import snowflake_conn_prop_local as snowflake_udf_conn_prop
from great_expectations.core.batch import BatchRequest, RuntimeBatchRequest
from great_expectations.data_context import BaseDataContext
from snowflake.snowpark.types import IntegerType, StringType, StructField,VariantType,StructType,BooleanType
from great_expectations.checkpoint.types.checkpoint_result import CheckpointResult
from great_expectations.data_context import DataContext
from config import snowflake_conn_prop_local as snowflake_conn_prop
from great_expectations.data_context import BaseDataContext
from great_expectations.data_context.types.base import DataContextConfig, DatasourceConfig, FilesystemStoreBackendDefaults
from great_expectations.checkpoint import Checkpoint

import json
import os


# (Re)create the stage that the permanent stored procedure is uploaded to.
session.sql("create or replace stage phani_greatexpectation").collect()

# Start from an empty package list, then register the third-party packages
# for the session.
session.clear_packages()
session.add_packages(
    'pandas',
    'pycryptodomex',
    'boto3',
    'tzlocal',
    'tqdm',
    'requests',
    'ruamel.yaml',
    'ipython',
    'jsonpatch',
    'mistune',
    'jinja2',
    'jsonschema',
    'scipy',
    'altair',
    'Click',
    'colorama',
    'cryptography',
    'snowflake-snowpark-python',
    'sqlalchemy',
    'chardet',
    'asn1crypto',
)

# Reset the import list and add the locally extracted great_expectations
# package folder located in the previous cell.
session.clear_imports()
session.add_import(ge_import_path)
# session.add_import('jwt')

@sproc(session=session,name="usp_generateValidationResults", replace=True, return_type=StringType(), is_permanent=True, stage_location='@phani_greatexpectation/ge_AllLibs')
def generateValidationResults(session: Session) -> str:
    """Validate taxi trip data with Great Expectations and store the outcome.

    Steps, in order:
      1. Build a Great Expectations context whose stores live under
         /tmp/great_expectation.
      2. Register a runtime pandas datasource and load 2000 rows of
         TAXI_TRIPS_MAT_VIEW into a pandas DataFrame.
      3. Create the expectation suite, add three expectations and save it.
      4. Register and run a SimpleCheckpoint against that DataFrame.
      5. Append one row (success flag, run id, validation results) to the
         table GreatExpeactionValidationsResults.

    Args:
        session: Snowpark session used to read the view and write the results.

    Returns:
        The literal string 'SUCCESS' once the result row has been written.
    """
    import json

    # These names must match between the datasource config, the batch request,
    # the validator and the checkpoint, so each is defined exactly once.
    datasource_name = "pandas_dataframe_datasource"
    data_connector_name = "default_runtime_data_connector_name"
    suite_name = "pandas_expectation_suite"
    checkpoint_name = "pandas_checkpoint"

    data_context_config = DataContextConfig(
        datasources={
            "dataframe_datasource": DatasourceConfig(
                class_name="PandasDatasource",
                batch_kwargs_generators={
                    "subdir_reader": {
                        "class_name": "SubdirReaderBatchKwargsGenerator",
                        "base_directory": "/tmp/great_expectation/",
                    }
                },
            )
        },
        store_backend_defaults=FilesystemStoreBackendDefaults(root_directory="/tmp/great_expectation"),
    )

    # Creating the GE context here
    context = BaseDataContext(project_config=data_context_config)

    # Datasource details: a pandas execution engine with a runtime data
    # connector. The actual DataFrame is supplied later in the batch request.
    datasource_config = {
        "name": datasource_name,
        "class_name": "Datasource",
        "module_name": "great_expectations.datasource",
        "execution_engine": {
            "module_name": "great_expectations.execution_engine",
            "class_name": "PandasExecutionEngine",
        },
        "data_connectors": {
            data_connector_name: {
                "class_name": "RuntimeDataConnector",
                "module_name": "great_expectations.datasource.data_connector",
                "batch_identifiers": ["default_identifier_name"],
            },
        },
    }

    # Adding the datasource to the context
    context.add_datasource(**datasource_config)

    # Pull a bounded sample of the view into pandas for validation.
    trips_pdf = session.sql("select top 2000 * from TAXI_TRIPS_MAT_VIEW").to_pandas()

    # Batch request that hands the in-memory DataFrame to the runtime connector.
    batch_request = RuntimeBatchRequest(
        datasource_name=datasource_name,
        data_connector_name=data_connector_name,
        data_asset_name="PandasData",  # This can be anything that identifies this data asset for you
        runtime_parameters={"batch_data": trips_pdf},
        batch_identifiers={"default_identifier_name": "default_identifier"},
    )

    # Creating the expectation suite
    context.create_expectation_suite(
        expectation_suite_name=suite_name, overwrite_existing=True
    )

    # The validator ties the batch request to the expectation suite.
    validator = context.get_validator(
        batch_request=batch_request, expectation_suite_name=suite_name
    )

    # Expectations to check. More built-in or custom expectations can be added here.
    validator.expect_column_values_to_be_in_set("TAXI_TYPE", ["yellow", "green"])
    validator.expect_column_min_to_be_between("VENDOR_ID", 1, 10)
    validator.expect_column_mean_to_be_between("VENDOR_ID", 2, 14)

    # Save the suite, keeping expectations that failed on this batch.
    validator.save_expectation_suite(discard_failed_expectations=False)

    # Register the checkpoint from an in-code config and run it with the
    # validations passed at run time.
    checkpoint_config = {
        "name": checkpoint_name,
        "config_version": 1.0,
        "class_name": "SimpleCheckpoint",
        "run_name_template": "%Y%m%d-%H%M%S-my-pandas_run-name-template",
    }
    context.add_checkpoint(**checkpoint_config)

    # Run the expectation suite against the pandas DataFrame.
    res = context.run_checkpoint(
        checkpoint_name=checkpoint_name,
        validations=[
            {
                "batch_request": batch_request,
                "expectation_suite_name": suite_name,
            }
        ],
    )

    # Schema of the single result row written back to Snowflake.
    schema = StructType([
        StructField("RunStatus", BooleanType()),
        StructField("RunId", VariantType()),
        StructField("RunValidation", VariantType()),
    ])

    # NOTE(review): this relies on str() of the GE run id and of the validation
    # result list being valid JSON text - confirm when upgrading GE.
    run_id_json = json.loads(str(res.run_id))
    validations_json = json.loads(str(res.list_validation_results()))

    results_df = session.create_dataframe(
        [[res.success, run_id_json, validations_json]], schema
    )

    # The table name spelling is kept as-is so existing tables keep receiving rows.
    results_df.write.mode('append').saveAsTable('GreatExpeactionValidationsResults')
    return 'SUCCESS'

    

In [28]:
session.sql("call usp_generateValidationResults()").collect()

[Row(USP_GENERATEVALIDATIONRESULTS='SUCCESS')]