# Integrate Data Validation Into Your Pipeline



In [13]:
# Prep environment and logging

import json
import logging
import os
import sys
import uuid # used to generate run_id
from datetime import datetime
from urllib.parse import quote

import great_expectations as ge
import great_expectations.jupyter_ux
import pandas as pd
import tzlocal
from great_expectations.dataset.pandas_dataset import PandasDataset, MetaPandasDataset
from great_expectations.dataset.sqlalchemy_dataset import SqlAlchemyDataset
from great_expectations.profile import ColumnsExistProfiler
from sqlalchemy import create_engine

# `__file__` is assigned by hand to the location of the companion script so the
# project directory can be derived from it below.
# NOTE(review): this is an absolute path on one developer's machine; the cell
# only works elsewhere after editing it — consider making it configurable.
__file__ = '/Users/mparayil/Desktop/Development/dsa-data-workflows/grtexp_agero_dsa/great_expectations/ge_prod/validate_expectations.py'
from great_expectations.datasource.types import BatchKwargs
# Two dirname() calls climb from the script file to the directory that contains
# the `ge_prod` package; appending it to sys.path makes the import below work.
path = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
sys.path.append(path)
import ge_prod.ge_data_access as gda


In [11]:
# Snowflake connection settings, read from the process environment.
# Each value is None when the corresponding variable is not set.
SNOWFLAKE_SERVICE_USERNAME = os.environ.get('snowflake_dsa_username')
SNOWFLAKE_SERVICE_PASSWORD = os.environ.get('snowflake_service_password')
SNOWFLAKE_ACCOUNT = os.environ.get('snowflake_account')
SNOWFLAKE_DSA_WAREHOUSE = os.environ.get('snowflake_dsa_warehouse')
SNOWFLAKE_DSA_DATABASE = os.environ.get('snowflake_dsa_database')
SNOWFLAKE_DSA_SCHEMA = os.environ.get('snowflake_dsa_schema')
SNOWFLAKE_DSA_ETL_ROLE = os.environ.get('snowflake_dsa_etl_role')

def sfquery_to_df(query: str, conn_url: str, parse_date_cols=None, chunk_size=None):
    """Run ``query`` against the database at ``conn_url`` and return the rows as a DataFrame.

    The result set is fetched in chunks and concatenated, so the full result
    still ends up in memory; chunking only bounds the size of each fetch.

    Args:
        query: SQL text to execute.
        conn_url: SQLAlchemy connection URL passed to ``create_engine``.
        parse_date_cols: Forwarded to ``pd.read_sql_query`` as ``parse_dates``.
            ``None`` (the default) disables date parsing.
        chunk_size: Number of rows per fetched chunk. ``None`` (the default)
            uses 1000 rows.

    Returns:
        A DataFrame holding every row returned by the query, or an empty
        DataFrame when no chunk was produced.
    """
    rows_per_chunk = 1000 if chunk_size is None else chunk_size

    engine = create_engine(conn_url)
    try:
        conn = engine.connect()
        try:
            # Drain the chunk iterator while the connection is still open.
            chunk_list = list(
                pd.read_sql_query(
                    query,
                    conn,
                    parse_dates=parse_date_cols,
                    chunksize=rows_per_chunk,
                )
            )
        finally:
            conn.close()
    finally:
        # Release pooled connections even when connecting or reading failed.
        engine.dispose()

    if not chunk_list:
        # pd.concat raises on an empty list; an empty frame is the natural result.
        return pd.DataFrame()

    # NOTE(review): chunks are concatenated without ignore_index, as before;
    # confirm whether callers expect a unique index on the returned frame.
    return pd.concat(chunk_list)
# SQLAlchemy connection URL for the Snowflake account configured through the
# environment. The username and password are percent-encoded so characters
# such as '@', ':' or '/' inside them cannot be mistaken for URL delimiters.
# str() keeps an unset variable rendered as 'None', as the plain f-string did.
conn_url = (
    f"snowflake://{quote(str(SNOWFLAKE_SERVICE_USERNAME), safe='')}"
    f":{quote(str(SNOWFLAKE_SERVICE_PASSWORD), safe='')}"
    f"@{SNOWFLAKE_ACCOUNT}/{SNOWFLAKE_DSA_DATABASE}/{SNOWFLAKE_DSA_SCHEMA}"
    f"?warehouse={SNOWFLAKE_DSA_WAREHOUSE}&role={SNOWFLAKE_DSA_ETL_ROLE}"
)

# network_outreach rows dispatched before 2020-01-01.
# NOTE(review): presumably the baseline used to derive expectations — confirm.
rule_query = """
select * from network_outreach
where dispatch_request_time_utc < to_date('2020-01-01')
"""
# network_outreach rows dispatched on or after 2020-01-01.
validate_query = """
select * from network_outreach
where dispatch_request_time_utc >= to_date('2020-01-01')
"""

In [4]:
# Placeholder query for validating one day's data against the expectations.
# It selects customer_complaints rows last modified during the previous calendar
# day: when run on 2020-01-30 the window is 2020-01-29 00:00:00 through
# 2020-01-29 23:59:59, both ends inclusive.
# NOTE(review): the upper bound is "today's midnight minus one second", so a row
# stamped with a fractional second after 23:59:59 (e.g. 23:59:59.500) would fall
# outside the window — confirm whether last_modified_time_utc has sub-second precision.

daily_batch_query = """
SELECT * FROM customer_complaints where last_modified_time_utc >= cast(current_date() 
-1 AS TIMESTAMP) AND last_modified_time_utc <= DATEADD(SECOND, -1, CAST(current_date()
AS TIMESTAMP));
"""

In [5]:
# Display the raw query text so it can be checked before it is executed.
daily_batch_query

'\nSELECT * FROM customer_complaints where last_modified_time_utc >= cast(current_date() \n-1 AS TIMESTAMP) AND last_modified_time_utc <= DATEADD(SECOND, -1, CAST(current_date()\nAS TIMESTAMP));\n'

### Get a DataContext object


In [6]:
# Create the DataContext for this project. No path is passed; the log line in
# the cell output shows which great_expectations.yml was picked up.
context = ge.data_context.DataContext()

2020-03-01T18:05:57-0500 - INFO - Using project config: /Users/mparayil/Desktop/Development/dsa-data-workflows/grtexp_agero_dsa/great_expectations/great_expectations.yml


### Get a pipeline run id

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#set-a-run-id)


In [7]:
# Run id for this validation run: the current UTC time at second resolution.
# GE uses it to key shared parameters.
validate_run_id = format(datetime.utcnow(), '%Y-%m-%d %H:%M:%S')
print('validate_run_id: ', validate_run_id)

validate_run_id:  2020-03-01 23:05:59


### Choose data asset name and expectation suite name

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#choose-data-asset-and-expectation-suite)


In [8]:
# List the data asset names known to the context, to help pick `data_asset_name` below.
ge.jupyter_ux.list_available_data_asset_names(context)

Inspecting your data sources. This may take a moment...


In [9]:
# Which data asset to validate, and which expectation suite to validate it against.
data_asset_name = "customer_complaints"
expectation_suite_name = "warnings_2019Q4"

# Expand the short asset name into the context's normalized form and show it.
normalized_data_asset_name = context.normalize_data_asset_name(data_asset_name)
print(normalized_data_asset_name)

NormalizedDataAssetName(datasource='agero_dsa_pandas', generator='default', generator_asset='customer_complaints')


In [14]:
# Run the daily query through the project's data-access helper.
# NOTE(review): the name suggests it returns a pandas DataFrame — confirm in ge_prod.ge_data_access.
validate_df = gda.snowflake_connector_to_df(daily_batch_query)

In [15]:
# Wrap the in-memory frame as a GE batch tied to the chosen asset and suite.
batch_kwargs = {'dataset': validate_df}
batch = context.get_batch(
    data_asset_name=normalized_data_asset_name,
    expectation_suite_name=expectation_suite_name,
    batch_kwargs=batch_kwargs,
)

In [17]:
# Sanity check: (rows, columns) of the batch about to be validated.
batch.shape

(165, 22)

In [18]:
# Capture the identifiers GE attached to the validate_df batch, then print them.
validate_batch_fingerprint = batch.batch_fingerprint
validate_batch_id = batch.batch_id

print('validate_batch_fingerprint: ', '\n', validate_batch_fingerprint)
print('------------------------------------------------------')
print('validate_batch_id: ', '\n', validate_batch_id)

validate_batch_fingerprint:  
 {'partition_id': '20200301T230843.888604Z', 'fingerprint': 'f4503256d85d7eac9f6e88f1ee38481a'}
------------------------------------------------------
validate_batch_id:  
 {'timestamp': 1583104113.2253902, 'PandasInMemoryDF': True, 'fingerprint': '93e35ab82d7f8c2138ebdff3b68f4cee'}



##### If your pipeline processes Pandas Dataframes:

```
df = pd.read_csv(file_path_to_validate)
df.head()
batch = context.get_batch(data_asset_name, expectation_suite_name, df)
```

##### If your pipeline processes Spark Dataframes:
```
from pyspark.sql import SparkSession
from great_expectations.dataset import PandasDataset, SqlAlchemyDataset, SparkDFDataset
spark = SparkSession.builder.getOrCreate()
df = SparkDFDataset(spark.read.csv(file_path_to_validate))
df.spark_df.show()
batch = context.get_batch(data_asset_name, expectation_suite_name, df)
```

##### If your pipeline processes SQL queries:
```
batch = context.get_batch(data_asset_name, expectation_suite_name, query="SELECT * from ....") # the query whose result set you want to validate
```


### Validate the batch

This is the "workhorse" method of Great Expectations. Call it in your pipeline code after loading the file and just before passing it to your computation.

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#validate)



In [19]:
# Validate the batch against its expectation suite, tagging the result with this
# run's id and requesting the SUMMARY result format.
# NOTE(review): catch_exceptions=True appears to record errors per expectation
# (see `exception_info` in the printed results) rather than raising — confirm in the GE docs.
validation_result = batch.validate(run_id=validate_run_id, catch_exceptions=True, result_format='SUMMARY')

2020-03-01T18:08:59-0500 - INFO - 	76 expectation(s) included in expectation_suite.


In [32]:
logging.basicConfig(level=logging.DEBUG)

# Report the overall outcome on stdout and in the log; on failure also list
# every expectation that did not pass together with its kwargs.
if validation_result['success']:
    summary = 'This batch, run_id: {0} meets all expectations from a valid batch of {1:s}'.format(
        validate_run_id, str(data_asset_name))
    print(summary)
    logging.info(summary)
else:
    print('something went wrong...')
    summary = 'This batch, run_id: {0} does not meet all expectations of {1:s}'.format(
        validate_run_id, data_asset_name)
    print(summary)
    logging.warning(summary)

    failed_results = (entry for entry in validation_result['results'] if not entry['success'])
    for entry in failed_results:
        failed_config = entry['expectation_config']
        # One item per line: the full expectation config, its kwargs, then a blank separator.
        print(failed_config, failed_config['kwargs'], '\n', sep='\n')



something went wrong...
This batch, run_id: 2020-03-01 23:05:59 does not meet all expectations of customer_complaints
{'expectation_type': 'expect_column_values_to_not_be_null', 'kwargs': {'column': 'CASE_RESOLUTION', 'mostly': 0.99, 'result_format': 'SUMMARY'}}
{'column': 'CASE_RESOLUTION', 'mostly': 0.99, 'result_format': 'SUMMARY'}


{'expectation_type': 'expect_column_values_to_not_be_null', 'kwargs': {'column': 'CASE_SUMMARY', 'mostly': 0.75, 'result_format': 'SUMMARY'}}
{'column': 'CASE_SUMMARY', 'mostly': 0.75, 'result_format': 'SUMMARY'}


{'expectation_type': 'expect_column_values_to_not_be_null', 'kwargs': {'column': 'COMPLAINT_CATEGORY', 'mostly': 0.75, 'result_format': 'SUMMARY'}}
{'column': 'COMPLAINT_CATEGORY', 'mostly': 0.75, 'result_format': 'SUMMARY'}


{'expectation_type': 'expect_column_values_to_not_be_null', 'kwargs': {'column': 'COMPLAINT_REASON', 'mostly': 0.75, 'result_format': 'SUMMARY'}}
{'column': 'COMPLAINT_REASON', 'mostly': 0.75, 'result_format': 'SUMMARY'}

### Review the validation results

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#review-validation-results)


In [22]:
# Pretty-print the full validation result as indented JSON for manual review.
print(json.dumps(validation_result, indent=4))

{
    "results": [
        {
            "success": true,
            "expectation_config": {
                "expectation_type": "expect_column_to_exist",
                "kwargs": {
                    "column": "CASE_COMMENTS",
                    "result_format": "SUMMARY"
                }
            },
            "exception_info": {
                "raised_exception": false,
                "exception_message": null,
                "exception_traceback": null
            }
        },
        {
            "success": true,
            "result": {
                "element_count": 165,
                "unexpected_count": 0,
                "unexpected_percent": 0.0,
                "partial_unexpected_list": []
            },
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {
                    "column": "CASE_COMMENTS",
                    "mostly": 0.99,
                    "result_format":

In [24]:
# Example of invoking a validation operator that is configured by default in the
# great_expectations.yml file: the batch is passed to `action_list_operator`
# under the same run id that was used for the direct validate() call.

results = context.run_validation_operator(
    assets_to_validate=[batch],
    run_id=validate_run_id,
    validation_operator_name="action_list_operator",
)

2020-03-01T18:09:58-0500 - INFO - 	76 expectation(s) included in expectation_suite.


INFO:great_expectations.data_asset.data_asset:	76 expectation(s) included in expectation_suite.


In [25]:
# Show the stores configured on the context (expectations, validations, evaluation parameters).
print(context.stores)

{'expectations_store': <great_expectations.data_context.store.namespaced_read_write_store.ExpectationsStore object at 0x126eebe50>, 'validations_store': <great_expectations.data_context.store.namespaced_read_write_store.ValidationsStore object at 0x128b8efd0>, 'evaluation_parameter_store': <great_expectations.data_context.store.evaluation_parameter_store.InMemoryEvaluationParameterStore object at 0x127fdbed0>}


In [26]:
# Open the project's Data Docs to browse the stored validation results.
context.open_data_docs()

### TODO: Parsing Validation Result

### To be worked on, working code below

In [None]:
import uuid
from prettytable import PrettyTable

# Text table summarising the validation run: one row per evaluated expectation.
# Nothing is added when the run succeeded, so only the header is printed then.
x = PrettyTable()

x.field_names = ["Table Name", "Column name", "Success", "Unexpected Count", "Expectation Type", "Partial Unexpected List"]

run_result = validation_result.get("success")
if run_result:
    print('Data validation successful')
else:
    for col in validation_result.get("results"):
        expectation = col.get('expectation_config')
        # Some results (e.g. expect_column_to_exist) carry no "result" section;
        # fall back to an empty dict so the defaults below apply.
        outcome = col.get('result') or {}
        # NOTE(review): the table label is hard-coded to "master_dispatches" while
        # this notebook validates `customer_complaints` — confirm which is intended.
        x.add_row(["master_dispatches",
                   expectation.get('kwargs').get('column'),
                   col.get('success'),
                   outcome.get('unexpected_count', 0),
                   expectation.get('expectation_type'),
                   outcome.get('partial_unexpected_list', [])])
print(x)

In [None]:
import uuid

# Column layout of the flattened validation report.
columns_lst = ["table_name_date", "column_name", "success", "unexpected_count", "expectation_type",
                 "partial_unexpected_list", "created_date", "uuid"]

run_result = validation_result.get("success")
# A single timestamp drives both the table-name suffix and created_date, so the
# two can never land on different days when the cell runs across midnight.
date_time = datetime.utcnow()
date = date_time.strftime("%Y_%m_%d")

# Rows are collected in a list and turned into a frame once: DataFrame.append
# copies the whole frame on every call and no longer exists in pandas 2.x.
rows = []
if run_result:
    print('Data validation successful')
else:
    for col in validation_result.get("results"):
        expectation = col.get('expectation_config')
        # Some results (e.g. expect_column_to_exist) carry no "result" section;
        # fall back to an empty dict so the defaults below apply.
        outcome = col.get('result') or {}
        # NOTE(review): the table label is hard-coded to "master_dispatches" while
        # this notebook validates `customer_complaints` — confirm which is intended.
        rows.append(["master_dispatches_"+date,
                     expectation.get('kwargs').get('column'),
                     col.get('success'),
                     outcome.get('unexpected_count', 0),
                     expectation.get('expectation_type'),
                     outcome.get('partial_unexpected_list', []),
                     date_time, str(uuid.uuid4())])

# dtype=object keeps the raw Python values until the explicit casts below.
df = pd.DataFrame(rows, columns=columns_lst, dtype=object)
df['created_date'] = df['created_date'].astype(str)
df['partial_unexpected_list'] = df['partial_unexpected_list'].astype(str)
df['unexpected_count'] = df['unexpected_count'].astype(int)

df

In [None]:
import pandas as pd
from great_expectations.dataset import PandasDataset


def _null_to_none(value):
    """Return ``None`` when ``pd.isnull(value)`` is true, otherwise ``value`` unchanged."""
    return None if pd.isnull(value) else value


class CustomPandasDataset(PandasDataset):
    """PandasDataset extended with two custom column-pair expectations.

    Both methods receive the two columns as aligned sequences and return a
    boolean Series (True = row passes) indexed like the inputs.

    NOTE(review): ``ignore_row_if``, ``result_format``, ``include_config``,
    ``catch_exceptions`` and ``meta`` are not used in the method bodies;
    presumably the ``column_pair_map_expectation`` decorator handles them —
    confirm against the Great Expectations version in use.
    """

    @PandasDataset.column_pair_map_expectation
    def expect_column_pair_values_A_to_be_not_null_when_B_has_specific_value(self,
                                                                             column_A,
                                                                             column_B,
                                                                             specific_val,
                                                                             ignore_row_if="both_values_are_missing",
                                                                             result_format=None, include_config=False,
                                                                             catch_exceptions=None, meta=None
                                                                             ):
        """Expect A to be present on every row where B equals ``specific_val``.

        Rows where B differs from ``specific_val`` always pass. Missing values
        in either column are treated as ``None`` before comparing.

        Args:
            column_A: Values that must be non-null on the matching rows.
            column_B: Values compared against ``specific_val``.
            specific_val: The B value that makes A mandatory.

        Returns:
            Boolean Series with one entry per row.
        """
        temp_df = pd.DataFrame({"A": column_A, "B": column_B})

        results = []
        for raw_a, raw_b in zip(temp_df["A"], temp_df["B"]):
            a = _null_to_none(raw_a)
            b = _null_to_none(raw_b)
            # Only rows whose B matches constrain A; every other row passes.
            results.append(a is not None if b == specific_val else True)

        return pd.Series(results, temp_df.index)

    @PandasDataset.column_pair_map_expectation
    def expect_column_pair_values_A_to_be_inrange_when_B_inrange(self,
                                                                 column_A,
                                                                 column_B,
                                                                 column_range_lst,
                                                                 ignore_row_if="both_values_are_missing",
                                                                 result_format=None, include_config=False,
                                                                 catch_exceptions=None, meta=None
                                                                 ):
        """Expect each (A, B) pair to fall inside at least one allowed range pair.

        Args:
            column_A: Values checked against the first two bounds of a range row.
            column_B: Values checked against the last two bounds of a range row.
            column_range_lst: Iterable of rows ``[a_low, a_high, b_low, b_high]``;
                all bounds are inclusive.

        Returns:
            Boolean Series with one entry per row. A row passes when some range
            row contains both its A and its B value. A row with a missing A or B
            fails, because a missing value lies in no range.
        """
        temp_df = pd.DataFrame({"A": column_A, "B": column_B})

        results = []
        for raw_a, raw_b in zip(temp_df["A"], temp_df["B"]):
            a = _null_to_none(raw_a)
            b = _null_to_none(raw_b)

            if a is None or b is None:
                # Comparing None with a bound raises TypeError; treat as out of range.
                results.append(False)
                continue

            results.append(any(
                row[0] <= a <= row[1] and row[2] <= b <= row[3]
                for row in column_range_lst
            ))
        return pd.Series(results, temp_df.index)

### Finishing touches - notifications and saving validation results and validated batches

#### Notifications
You want to be notified whenever the pipeline validates a batch, especially when the validation fails.

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#send-notifications)

#### Saving validation results

To enable the storing of validation results, uncomment the `result_store` section in your great_expectations.yml file. 

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#save-validation-results)

#### Saving failed batches

When a batch fails validation (it does not pass all the expectations of the data asset), it is useful to save the batch along with the validation results for future review. You can enable this option in your project's great_expectations.yml file. 

[Read more in the tutorial](https://docs.greatexpectations.io/en/latest/getting_started/pipeline_integration.html?utm_source=notebook&utm_medium=integrate_validation#save-failed-batches)

