# Great Expectations applied to Iceberg table on CDW
The exercise is to run some quality test that wil be performed and then save the results on a iceberg table in order to can generate a time-based evolution dashboard.


In [21]:
# The variables:
source_database="spain_gas_prices"
source_table="bv_gasprices_locations_latest_prices_view_v1"
limit="" #" order by ingestdate desc limit 100000" to prevent spark sql memory errors
quality_database="spain_gas_prices"
quality_table="quality_rules_history"
cloudera_data_virtual_warehouse="smerchan-vw-demo"

## Great Expectation install
using pip
First thing is to install great expectations. Or recheck it :-)


In [17]:
!pip install great_expectations



Needed imports in order to make everything works ok

In [18]:
import great_expectations as gx
import os 
import pandas as pd
from datetime import datetime
import json 
import cml.data_v1 as cmldata

In [19]:
# This function get the result of the "expectation" (the GE test) and return a DF with the data.

def process_output_to_table (output_json):
    #data= json.loads(output_json)
    # Extract relevant data
    success = output_json["success"]
    test=output_json["expectation_config"]["expectation_type"]
    column_name = output_json["expectation_config"]["kwargs"]["column"]
    
    element_count = output_json["result"]["element_count"]
    unexpected_count = output_json["result"]["unexpected_count"]
    unexpected_percent = output_json["result"]["unexpected_percent"]
    if unexpected_percent is None:
        unexpected_percent =0.0

        

    # Create a DataFrame
    df = pd.DataFrame({
        "success": [success],
        "test": [test],
        "expectation_config": [output_json["expectation_config"]],  # Store the entire dictionary
        "column_tested": [column_name],
        "element_count": [element_count],
        "unexpected_count": [unexpected_count],
        "unexpected_percent": [unexpected_percent],
    })

    return (df)


## Get the source table values
and store them in a df in orden to use GE from pandas (there is not a direct connection to impala yet)


In [22]:
CONNECTION_NAME = cloudera_data_virtual_warehouse
conn = cmldata.get_connection(CONNECTION_NAME)

## Sample Usage to get pandas data frame
SQL_QUERY = "SELECT * FROM "+source_database+"."+source_table + limit 
print (SQL_QUERY)
df = conn.get_pandas_dataframe(SQL_QUERY)
# Closing the connection




SELECT * FROM spain_gas_prices.bv_gasprices_locations_latest_prices_view_v1


In [23]:
conn.close()

<impala.hiveserver2.HiveServer2Connection at 0x7feb959ff010>

In [24]:
df #As we are in a Notebook, just calling the df will pretty print it :-)

Unnamed: 0,latitude,longitude,name,address,town,province,pricediesela,pricefuel95e10,pricefuel95e5,pricebiodiesel,...,pricecompressednaturalgas,priceliquefiednaturalgas,priceliquefiedpetroleumgases,pricedieselb,pricedieselpremium,pricefuel95e5premium,pricefuel98e10,pricefuel98e5,pricehydrogen,stations
0,41.200417,-6.582694,REPSOL ...,CARRETERA SA-314 KM. ...,Masueco ...,SALAMANCA ...,1.569,,1.699,,...,,,,1.309,1.679,,,,,1
1,36.663139,-4.753139,REPSOL ...,"AVDA. REINA SOFIA, S/N ...",Coín ...,MÁLAGA ...,1.558,,1.728,,...,,,,1.318,1.678,,,1.888,,1
2,41.491667,2.185278,REPSOL ...,"CARRETERA N-152 KM. 11,9 ...",Montcada i Reixac ...,BARCELONA ...,1.619,,1.799,,...,,,0.989,,1.709,,,1.949,,1
3,36.609806,-6.278444,SP ...,"CARRETERA CR CA-603 KM. 4,350 ...",Puerto de Santa María (El) ...,CÁDIZ ...,1.529,,1.689,,...,,,,,1.629,,,,,1
4,40.119250,-3.933611,CEPSA ...,"CARRETERA CM-4004 KM. 17,50 ...",Cedillo del Condado ...,TOLEDO ...,1.529,,1.689,,...,,,,1.255,1.595,1.749,,1.821,,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
11922,36.922528,-4.280500,AGRO-OLIVARERA RIOGORDO ...,"PARAJE EL CAMPILLO, S/N ...",Riogordo ...,MÁLAGA ...,1.365,,1.566,,...,,,,0.959,,,,,,1
11923,40.536861,-3.615222,GALP ...,"AVENIDA FERNANDO ALONSO, 7 ...",Alcobendas ...,MADRID ...,1.484,,1.674,,...,,,0.999,,1.554,,,1.824,,1
11924,37.032833,-4.530917,GAFISUR ANTEQUERA ...,"CALLE ACLARADORES LOS, 10 ...",Antequera ...,MÁLAGA ...,1.389,,1.529,,...,,,,,,,,,,1
11925,28.867833,-13.831611,ESTACION PLAYA BLANCA ...,CARRETERA ARRECIFE-PLAYA BLANCA KM. 34 ...,Yaiza ...,PALMAS (LAS) ...,1.339,,1.379,,...,,,,,,,,1.499,,1


## Great Expectation context
In order to user the GE calls, we first need to hava a context
https://docs.greatexpectations.io/docs/oss/tutorials/quickstart/

"A Data Context is the primary entry point for a Great Expectations (GX) deployment, and it provides the configurations and methods for all supporting GX components.

As the primary entry point for the GX API, the Data Context provides a convenient method for accessing common objects based on untyped input or common defaults. A Data Context also allows you to configure top-level components, and you can use different storage methodologies to back up your Data Context configuration. After you instantiate your DataContext and store its configurations, it always behaves the same way."


In [25]:
#We generate a GE context to evaluate the data.
context = gx.get_context()




## GE DataSource
Is the way to connect to the data, and the data is called the data asset.

https://docs.greatexpectations.io/docs/oss/tutorials/quickstart/

https://docs.greatexpectations.io/docs/reference/learn/terms/datasource/

"A Data Source provides a standard API for accessing and interacting with data from a wide variety of source systems.

Data Sources provide a standard API across multiple backends: the Data Source API remains the same for PostgreSQL, CSV Filesystems, and all other supported data backends."

In [26]:
# 1º THE DATA SOURCE
#We add a data source (to host our data frame)
datasource= context.sources.add_pandas(name="pandas_data_source")
#print (context)

In [27]:
# 2º THE DATA ASSET 
#assign the name of the data asset and asign it to the data source, and load the df to the asset.
name="bv_gasprices_locations_lastday_dataframe"
data_asset = datasource.add_dataframe_asset(name=name)
my_batch_request = data_asset.build_batch_request(dataframe=df) #here we add the df to the "batch"




## GE Batch and validators attached.
The Batch is a way to add test to it, and the tests are called validations. That validations are formed with "expectations" (what to expect of the data)

https://docs.greatexpectations.io/docs/oss/guides/validation/validate_data_overview

"Batch Requests are used to specify the data that a Checkpoint Validates. You can add additional validation data to your Checkpoint by assigning it Batch Requests, or specifying that a Batch Request is required at run time.

Expectation Suites contain the Expectations that the Checkpoint runs against the validation data specified in its Batch Requests. Checkpoints are assigned Expectation Suites and Batch Requests in pairs, and when the Checkpoint is run it will Validate each of its Expectation Suites against the data provided by its paired Batch Request.

For more information on adding Batch Requests and Expectation Suites to a Checkpoint, see How to add validations data or suites to a Checkpoint."


In [28]:
# 3 USE THE BATCH AN CREATE A VALIDATOR TO BEGIN ANALYZING Expectations RULES.
# Define expectations in a separate file or directly on the validator (modify as needed)

context.add_or_update_expectation_suite("gas_stations_expectation_suite")
# Optional. Run assert "my_expectation_suite" in context.list_expectation_suite_names() to veriify the Expectation Suite was created.
validator = context.get_validator(
    batch_request=my_batch_request,
    expectation_suite_name="gas_stations_expectation_suite",
)
validator.head()

Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

Unnamed: 0,latitude,longitude,name,address,town,province,pricediesela,pricefuel95e10,pricefuel95e5,pricebiodiesel,...,pricecompressednaturalgas,priceliquefiednaturalgas,priceliquefiedpetroleumgases,pricedieselb,pricedieselpremium,pricefuel95e5premium,pricefuel98e10,pricefuel98e5,pricehydrogen,stations
0,41.200417,-6.582694,REPSOL ...,CARRETERA SA-314 KM. ...,Masueco ...,SALAMANCA ...,1.569,,1.699,,...,,,,1.309,1.679,,,,,1
1,36.663139,-4.753139,REPSOL ...,"AVDA. REINA SOFIA, S/N ...",Coín ...,MÁLAGA ...,1.558,,1.728,,...,,,,1.318,1.678,,,1.888,,1
2,41.491667,2.185278,REPSOL ...,"CARRETERA N-152 KM. 11,9 ...",Montcada i Reixac ...,BARCELONA ...,1.619,,1.799,,...,,,0.989,,1.709,,,1.949,,1
3,36.609806,-6.278444,SP ...,"CARRETERA CR CA-603 KM. 4,350 ...",Puerto de Santa María (El) ...,CÁDIZ ...,1.529,,1.689,,...,,,,,1.629,,,,,1
4,40.11925,-3.933611,CEPSA ...,"CARRETERA CM-4004 KM. 17,50 ...",Cedillo del Condado ...,TOLEDO ...,1.529,,1.689,,...,,,,1.255,1.595,1.749,,1.821,,1


## GE Expectations 
In order to do it simple, we reun some validator expectations and parse the output in order to save it in a df, to do later the insert of all of that results to the quality table results.


In [29]:
## Add expectation to expectation suite


In [30]:

output=validator.expect_column_values_to_not_be_null(column="latitude")
df=process_output_to_table (output)

output=validator.expect_column_values_to_not_be_null(column="longitude")
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_not_be_null(column="town")
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_not_be_null(column="province")
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_not_be_null(column="address")
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_not_be_null(column="stations")
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_not_be_null(column="stations")
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricediesela", min_value=0, max_value=3)
#print (output)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricefuel95e10", min_value=0, max_value=3)
#print (output)
df2=process_output_to_table (output)
df=pd.concat([df,df2])
output=validator.expect_column_values_to_be_between("pricefuel95e5", min_value=0, max_value=3)
#print (output)
df2=process_output_to_table (output)
df=pd.concat([df,df2])
output=validator.expect_column_values_to_be_between("pricebiodiesel", min_value=0, max_value=3)
#print (output)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricebioetanol", min_value=0, max_value=3)
#print (output)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricecompressednaturalgas", min_value=0, max_value=3)
#print (output)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("priceliquefiednaturalgas", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("priceliquefiedpetroleumgases", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricedieselb", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricedieselpremium", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricefuel95e5premium", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])
current_datetime=datetime.now()

output=validator.expect_column_values_to_be_between("pricefuel98e10", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricefuel98e5", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("pricehydrogen", min_value=0, max_value=3)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("longitude", min_value=-18.23, max_value=4.38)
df2=process_output_to_table (output)
df=pd.concat([df,df2])

output=validator.expect_column_values_to_be_between("latitude", min_value=27.0, max_value=43.9)
df2=process_output_to_table (output)
df=pd.concat([df,df2])



Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

In [31]:
# Lets see the df with all the results, and the timestamp well formated for the insert to the iceberg table.

df['test_time']=current_datetime
df['table_tested']="bv_gasprices_locations_latest_prices_view_v1"
df['database_tested']="spain_gas_prices"
df

Unnamed: 0,success,test,expectation_config,column_tested,element_count,unexpected_count,unexpected_percent,test_time,table_tested,database_tested
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",latitude,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",longitude,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",town,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",province,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",address,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",stations,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_not_be_null,"[_expectation_type, _kwargs, _raw_kwargs, meta...",stations,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_be_between,"[_expectation_type, _kwargs, _raw_kwargs, meta...",pricediesela,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_be_between,"[_expectation_type, _kwargs, _raw_kwargs, meta...",pricefuel95e10,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices
0,True,expect_column_values_to_be_between,"[_expectation_type, _kwargs, _raw_kwargs, meta...",pricefuel95e5,11927,0,0.0,2024-05-20 09:15:18.210066,bv_gasprices_locations_latest_prices_view_v1,spain_gas_prices


### Create a Spark Session to save the results to the iceberg table

In [33]:

CONNECTION_NAME = "se-aw-edl"
conn = cmldata.get_connection(CONNECTION_NAME)
spark = conn.get_spark_session()

# Sample usage to run query through spark
EXAMPLE_SQL_QUERY = "show databases"
spark.sql(EXAMPLE_SQL_QUERY).show()


Setting spark.hadoop.yarn.resourcemanager.principal to smerchan


Spark Application Id:spark-cb8bd75733de4343aa60c8d0804cfe0c


Hive Session ID = e944632d-7f35-4a61-992f-eb66e0a616db


+--------------------+
|           namespace|
+--------------------+
|               cyber|
|             default|
|  information_schema|
|logistics_mlops_p...|
|             martydb|
|          smallfiles|
|    spain_gas_prices|
|                 sys|
|telco_mlops_pauld...|
+--------------------+



### Writing data to the table
Using a iteration loop for each record, we save the data to the database.

In [34]:
for index,row in df.iterrows():
    #print (f"Success:{row['success']} Test:{row['test']} Column:{row['column_tested']} Element Count:{row['element_count']}")
    test_time=datetime.strptime(str(row['test_time']),"%Y-%m-%d %H:%M:%S.%f")
    test_time_str_iceberg=test_time.strftime('%Y-%m-%d')
    #print (test_time_str_iceberg)
    # Here we will need a way to insert the data in the SQL
    sql_sentence= "insert into "+quality_database+"."+quality_table
    sql_sentence+= "(sucess, test, expectation_config, column_tested, element_count, unexpected_count, "
    sql_sentence+= "unexpected_percent, test_time, table_tested, database_tested) "
    sql_sentence+= "values ("
    sql_sentence+="'"
    sql_sentence+=str(row['success'])
    sql_sentence+="'"
    sql_sentence+=","
    sql_sentence+="'"
    sql_sentence+=row['test']
    sql_sentence+="'"
    sql_sentence+=","
    sql_sentence+="'"
    sql_sentence+=str(row['expectation_config'])
    sql_sentence+="'"
    sql_sentence+=","
    sql_sentence+="'"
    sql_sentence+=row['column_tested']
    sql_sentence+="'"
    sql_sentence+=","
    sql_sentence+=str(row['element_count'])
    sql_sentence+=","
    sql_sentence+=str(row['unexpected_count'])
    sql_sentence+=","
    sql_sentence+=str(row['unexpected_percent'])
    sql_sentence+=","
    sql_sentence+="cast('"+test_time_str_iceberg+"' as date)"
    sql_sentence+=","
    sql_sentence+="'"
    sql_sentence+=row['table_tested']
    sql_sentence+="'"
    sql_sentence+=","
    sql_sentence+="'"
    sql_sentence+=row['database_tested']
    sql_sentence+="'"
    sql_sentence+=")"
    #print (sql_sentence)
    spark.sql(sql_sentence).show()

                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++

++
||
++
++

++
||
++
++



                                                                                

++
||
++
++



                                                                                

++
||
++
++

++
||
++
++



                                                                                

++
||
++
++

++
||
++
++



                                                                                

++
||
++
++

++
||
++
++

++
||
++
++



                                                                                

++
||
++
++



In [16]:
## Let's see the list of expectations loaded
##validator.get_expectation_suite()

In [27]:
#my_suite = validator.get_expectation_suite("gas_stations_expectation_suite")

In [28]:
#my_suite.show_expectations_by_expectation_type()