In [1]:
import great_expectations as gx
import pandas as pd
from datetime import datetime
from great_expectations.checkpoint import Checkpoint
from great_expectations.data_context.types.base import DataContextConfig
import logging

In [2]:
def create_validator(path, context):
    validator = context.sources.pandas_default.read_csv(path)

    validator.expect_column_values_to_not_be_null('TotRmsAbvGrd')
    validator.expect_column_values_to_be_between('TotRmsAbvGrd', min_value=1, max_value=30)
    validator.expect_column_values_to_not_be_null('WoodDeckSF')
    validator.expect_column_values_to_not_be_null('YrSold')
    validator.expect_column_values_to_not_be_null('1stFlrSF')
    validator.expect_column_values_to_not_be_null('Foundation_BrkTil')
    validator.expect_column_values_to_not_be_null('Foundation_CBlock')
    validator.expect_column_values_to_not_be_null('Foundation_PConc')
    validator.expect_column_values_to_not_be_null('Foundation_Slab')
    validator.expect_column_values_to_not_be_null('Foundation_Stone')
    validator.expect_column_values_to_not_be_null('Foundation_Wood')
    validator.expect_column_values_to_not_be_null('KitchenQual_Ex')
    validator.expect_column_values_to_not_be_null('KitchenQual_Fa')
    validator.expect_column_values_to_not_be_null('KitchenQual_Gd')
    validator.expect_column_values_to_not_be_null('KitchenQual_TA')
    validator.save_expectation_suite(discard_failed_expectations=False)
    return validator

def validate(path, df):
    context = gx.get_context()
    now = datetime.now()

    # datasource  = context.sources.add_pandas("house_datasource")
    # data_asset = datasource.add_dataframe_asset(name="house_asset")
    # batch_request = data_asset.build_batch_request(dataframe=df)

    # context.add_or_update_expectation_suite(expectation_suite_name='expectation_house')
    # validator = context.get_validator(
    #     batch_request=data_asset.build_batch_request(dataframe=df),
    #     expectation_suite_name='expectation_house',
    # )
    validator = create_validator(path, context)

    # checkpoint = Checkpoint(
    #     name="house_checkpoint",
    #     run_name_template="housing_checkpoint",
    #     data_context=context,
    #     batch_request=batch_request,
    #     expectation_suite_name='expectation_house',
    #     action_list=[
    #         {
    #             "name": "store_validation_result",
    #             "action": {"class_name": "StoreValidationResultAction"},
    #         },
    #             {"name": "update_data_docs", "action": {"class_name": "UpdateDataDocsAction"}},
    #         ],
    #     )

    checkpoint = context.add_or_update_checkpoint(name='housing_checkpoint', validator=validator)
    result_format: dict = {
        "result_format": "COMPLETE",
        "unexpected_index_column_names": ["Id"],
    }
    checkpoint_result = checkpoint.run(result_format=result_format)
    context.view_validation_result(checkpoint_result)
    return checkpoint_result

In [3]:
def filter_expectations_result(result_json):
    # logging.info(result_json)
    run_results = result_json['run_results']
    val_result_id = list(run_results)[0]
    val_results = run_results[val_result_id]['validation_result']

    print(val_results['results'])
    # info = ''
    failed_rows = []
    failed_cols = []
    failed_conf = []

    for res in val_results['results']:
        if res['success'] == False:
            _col = res['expectation_config']['kwargs']['column']
            _exp_conf = res['expectation_config']['expectation_type']
            # info += f'column -> {_col}\n'
            # info += f'    expectation_config -> {_exp_conf}\n'
            indexes_list = res['result']['partial_unexpected_index_list']
            failed_rows_id = [item['Id'] for item in indexes_list]
            failed_rows = list(set(failed_rows + failed_rows_id))
            failed_conf = list(set(failed_conf + [_exp_conf]))
            failed_cols = list(set(failed_cols + [_col]))
            # info += f'    rows with errors -> {failed_rows_id}\n'

    desc = f'Validation Error on Rows {failed_rows} of file _FILENAME_ !'
    desc += f'Columns {failed_cols} failed on the following expectations : {failed_conf}'
    desc = desc.replace('[', '')
    desc = desc.replace(']', '')
    # logging.info(info)
    return failed_rows, desc


In [4]:
# path = '../data/folder_C/_file_0.csv'
path = '../data/folder_A/file_0.csv'

df = pd.read_csv(path)
df = df[df.columns[1:]]

In [5]:
result = validate(path,df)

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/78 [00:00<?, ?it/s]

In [69]:
log,failed_rows = filter_expectations_result(result.to_json_dict())
print(log)
print(failed_rows)

[{'success': True, 'expectation_config': {'expectation_type': 'expect_column_values_to_not_be_null', 'kwargs': {'column': 'TotRmsAbvGrd', 'batch_id': 'default_pandas_datasource-#ephemeral_pandas_asset'}, 'meta': {}}, 'result': {'element_count': 10, 'unexpected_count': 0, 'unexpected_percent': 0.0, 'partial_unexpected_list': [], 'partial_unexpected_counts': [], 'partial_unexpected_index_list': [], 'unexpected_list': [], 'unexpected_index_list': [], 'unexpected_index_query': 'df.filter(items=[], axis=0)'}, 'meta': {}, 'exception_info': {'raised_exception': False, 'exception_traceback': None, 'exception_message': None}}, {'success': True, 'expectation_config': {'expectation_type': 'expect_column_values_to_be_between', 'kwargs': {'column': 'TotRmsAbvGrd', 'max_value': 30, 'min_value': 1, 'batch_id': 'default_pandas_datasource-#ephemeral_pandas_asset'}, 'meta': {}}, 'result': {'element_count': 10, 'unexpected_count': 0, 'unexpected_percent': 0.0, 'partial_unexpected_list': [], 'unexpected_i

In [5]:
filtered_df = df[~df['Id'].isin(failed_rows)]

rows = list(filtered_df['Id'].values)
rows

[425, 411, 454, 828, 368, 1266, 1412]

In [12]:
path = '../data/folder_A/file_error_example.csv'
context = gx.get_context()
df = pd.read_csv(path)
result = validate(path,df)
log, failed_rows = log_failed_expectations(result.to_json_dict())
print(log)
print(failed_rows)

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/48 [00:00<?, ?it/s]

Validation Error on Rows [1323, 556, 411] of file _FILENAME_ !Columns ['Foundation'] failed on the following expectations : ['expect_column_values_to_be_in_set', 'expect_column_values_to_not_be_null']
[1323, 556, 411]
