#### Import Great Expectations

In [0]:
from ruamel import yaml

from datetime import datetime,date
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context import BaseDataContext
from great_expectations.core.batch import RuntimeBatchRequest

from great_expectations.data_context.types.base import (
    DataContextConfig,
    FilesystemStoreBackendDefaults,
)


import pandas as pd
from great_expectations.dataset import SparkDFDataset

from great_expectations.render.view import DefaultJinjaPageView
from great_expectations.render.renderer import *
from great_expectations.profile.basic_dataset_profiler import BasicDatasetProfiler


In [0]:
root_directory = "/dbfs/great_expectations/"

#### Set up Great Expectations

In [0]:
data_context_config = DataContextConfig(
    store_backend_defaults=FilesystemStoreBackendDefaults(
        root_directory=root_directory
    ),
)
context = BaseDataContext(project_config=data_context_config)

#### Prepare and connect to Data

In [0]:
df=spark.sql('select * from edpdlscrsa_nutter_test.nutter_tbl_l1_load')
df.display()

policy_number,company_code,plan_code,issue_date,qualification_code,system_of_record,edp_last_update_id,edp_last_update_type,edp_last_update_ts,nutter_temptbl_identifier
0002179340,JNL,019855F1,1980-11-17,0,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,49f8d0a01a8bb4c3623f3b4ef5162e67e30ed75f
0002264240,JNL,01985001,1981-11-28,0,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,e0e97c92549e9dd39f7c1a05eebcd554d5877c12
0002289140,JNL,019912F1,1982-02-03,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,387a0e65595c806af449f9420fb482fe02114492
0002332540,JNL,019912F1,1982-03-17,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,19eceda55ee6eea7aaf94cc4e40dc7d925a898f0
0002561740,JNL,019940F1,1983-01-28,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,79c2c4bb46c984e22c6115fb81cb64f38f33c43d
0003679840,JNL,019875F1,1986-04-11,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,fb9876f67778a2ccd16ce2eb38a20c35488a4f3a
0003706640,JNL,019875F1,1986-04-15,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,359cf629bbbd406bee0fd84aa3cbf7fe96d8bf9f
0004837840,JNL,01982621,1988-10-21,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,f1dfe2cf28880aaa4027bf19b7f9eef11a0f5d11
0004900840,JNL,019826F1,1989-03-24,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,15dc7aeda624244b4707462e3c4da4b5162538aa
0004916440,JNL,019822F1,1989-04-13,A,CYB,a187f1b1-1dab-45d2-9872-6aa02b2c92dd,nutter test dataset,2022-11-07T14:59:39.719+0000,c1e717c1027faf58440ff6a0e2d0fa3e1c072d5a


In [0]:
my_spark_datasource_config = {
    "name": "Hive_datasource",
    "class_name": "Datasource",
    "execution_engine": {"class_name": "SparkDFExecutionEngine"},
    "data_connectors": {
        "Hive_dataconnector": {
            "module_name": "great_expectations.datasource.data_connector",
            "class_name": "RuntimeDataConnector",
            "batch_identifiers": [
                "some_key_maybe_pipeline_stage",
                "some_other_key_maybe_run_id",
            ],
        }
    },
}

In [0]:
context.test_yaml_config(yaml.dump(my_spark_datasource_config))

In [0]:
context.add_datasource(**my_spark_datasource_config)

In [0]:
batch_request = RuntimeBatchRequest(
    datasource_name="Hive_datasource",
    data_connector_name="Hive_dataconnector",
    data_asset_name="QC_Data_Asset",  # This can be anything that identifies this data_asset for you
    batch_identifiers={
        "some_key_maybe_pipeline_stage": "prod",
        "some_other_key_maybe_run_id": f"my_run_name_{date.today().strftime('%Y%m%d')}",
    },
    runtime_parameters={"batch_data": df},  # Your dataframe goes here
)

#### Create the Expectations

In [0]:
expectation_suite_name = "qc_expectation_suite"
context.create_expectation_suite(
    expectation_suite_name=expectation_suite_name, overwrite_existing=True
)
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)

print(validator.head())

##Validations using GE

In [0]:
validator.expect_column_values_to_be_unique(column="qualification_code",
                                            result_format="BOOLEAN_ONLY")

In [0]:
validator.expect_column_to_exist(column="qualification_id")


In [0]:
validator.expect_table_row_count_to_equal(50)

In [0]:
custom_test = CustomTest(tests.tests_results)

custom_test.column_exists(
  data=actual_ds,
  column_name='age',
  threshold=0.5
)

In [0]:
validator.expect_column_values_to_be_of_type("edp_last_update_ts","TimestampType")

In [0]:
validator.expect_query_to_have_no_duplicate_value_combinations(1,3)

In [0]:
validator.save_expectation_suite(discard_failed_expectations=False)

In [0]:
# Create a SparkDFDataset instance of raw_test

mandatory_columns = ["policy_number","company_code","plan_code"]
raw_test = SparkDFDataset(df)


In [0]:
for column in mandatory_columns:
  try:
      assert raw_test.expect_column_to_exist(column).success, f"Manadatory column {column} does not exists: FAILED"
      print(f"Column {column} exists: PASSED")
  except AssertionError as e:
      print(e)
          

In [0]:
gdf = SparkDFDataset(df)

gdf.expect_column_values_to_be_of_type("company_code", "StringType") #-> My Company code should be string, buit not any Numerical

gdf.expect_column_values_to_be_between("policy_number", 0, 9999999999) #-> My Policy number should fall in this range Only

# document_model = ExpectationSuitePageRenderer().render(gdf.get_expectation_suite())
# displayHTML(DefaultJinjaPageView().render(document_model))

validation_result = gdf.validate()

document_model = ValidationResultsPageRenderer().render(validation_result)
displayHTML(DefaultJinjaPageView().render(document_model))

Unnamed: 0,Unnamed: 1
Evaluated Expectations,2
Successful Expectations,1
Unsuccessful Expectations,1
Success Percent,50%

Unnamed: 0,Unnamed: 1
Great Expectations Version,0.15.43
Run Name,__none__
Run Time,2023-01-13T11:33:49Z

Unnamed: 0,Unnamed: 1
ge_batch_id,254fe81c-9336-11ed-87a0-00163ed348e5

Status,Expectation,Observed Value
,values must be of type StringType.,StringType

Status,Expectation,Observed Value
Sampled Unexpected Values,Unnamed: 1_level_1,Unnamed: 2_level_1
,values must be greater than or equal to 0 and less than or equal to 9999999999.  2 unexpected values found. 4% of 50 total rows.  Sampled Unexpected Values  2707192016  986966803Z,4% unexpected
Sampled Unexpected Values,,
2707192016,,
986966803Z,,

Sampled Unexpected Values
2707192016
986966803Z


In [0]:
for column in mandatory_columns:
  try:
    test_result = raw_test.expect_column_values_to_not_be_null(column)
    assert test_result.success,\
      f"{test_result.result['unexpected_count']} of {test_result.result['element_count']} items in {column} are null : FAILED"
    
    print(f"All item in column {column} are not null:PASSED")
  except AssertionError as e:
    print(e)
  except AnalysisException as e:
    print(e)      

#'element_count': The total number of values in the column
#'unexpected_count': The total count of unexpected values in the column (also in `BASIC`)   
# more info : https://legacy.docs.greatexpectations.io/en/0.13.8/reference/core_concepts/expectations/result_format.html

In [0]:
unique_check_result = raw_test.expect_column_values_to_be_unique("policy_number")

failed_msg = " ".join([f"""{unique_check_result.result['unexpected_count']} of {unique_check_result.result['element_count']} items""",
                       f"""or {round(unique_check_result.result['unexpected_percent'],2)}% are not unique:FAILED"""])
                        
print(f"""{'column id is unique: PASSED if unique_check_result.success else failed_msg'}""")

In [0]:
date_format_check =  raw_test.expect_column_values_to_match_strftime_format('edp_last_update_ts', strftime('%Y%m%d')  #'%Y-%m-%d %H:%M:%S')

print(f"""edp_last_update_ts column values are in correct datetime format: {'PASSED' if date_format_check.success else 'FAILED'}""")

In [0]:
test_result = raw_test.expect_column_values_to_be_in_set("company_code", ["JNL"])
print(f"""All the comapny codes are of JNL: {'PASSED' if test_result.success else 'FAILED'}""")