In [23]:
from airflow.decorators import dag, task
from airflow.operators.python import PythonOperator
from datetime import datetime
import os
import random
import great_expectations as gx
import shutil
import pandas as pd
import numpy as np
from collections import namedtuple
import psycopg2
from psycopg2 import sql
from jinja2 import Environment, FileSystemLoader
from pymsteams import connectorcard
import urllib.parse

In [30]:
def validate_data(file_path: str) -> dict:

    df = ge.read_csv(file_path)
    validation_results = []
    bad_rows = []

    for i in range(len(df)):
        row_results = {"row_index": i, "failed_rules": [], "passed_rules": []}
        validator = df.iloc[[i]]

        # Execute all the validation rules
        date_quality = dict(temp_df.expect_column_values_to_not_be_null(column="Date"))
        cpi_quality = dict(temp_df.expect_column_values_to_not_be_null(column="CPI"))
        column_order = dict(temp_df.expect_table_columns_to_match_ordered_list(column_list=["Store","Dept","Date","Temperature",
                                                                                            "Fuel_Price","MarkDown1","MarkDown2",
                                                                                            "MarkDown3","MarkDown4","MarkDown5",
                                                                                            "CPI","Unemployment","IsHoliday","Type","Size"]))
        store_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Store", type_='int64'))
        dept_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Dept", type_='int64'))
        date_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Date", type_='object'))
        temperature_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Temperature", type_='float64'))
        fuel_price_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Fuel_Price", type_='float64'))
        markdown1_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="MarkDown1", type_='float64'))
        markdown2_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="MarkDown2", type_='float64'))
        markdown3_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="MarkDown3", type_='float64'))
        markdown4_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="MarkDown4", type_='float64'))
        markdown5_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="MarkDown5", type_='float64'))
        cpi_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="CPI", type_='float64'))
        unemployment_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Unemployment", type_='float64'))
        isholiday_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="IsHoliday", type_='bool'))
        type_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Type", type_='object'))
        size_type_quality = dict(temp_df.expect_column_values_to_be_of_type(column="Size", type_='int64'))

        # Check which rules failed and which ones passed
        if not date_quality["success"]:
            row_results["failed_rules"].append("date_quality")
        else:
            row_results["passed_rules"].append("date_quality")

        if not cpi_quality["success"]:
            row_results["failed_rules"].append("cpi_quality")
        else:
            row_results["passed_rules"].append("cpi_quality")

        if not column_order["success"]:
            row_results["failed_rules"].append("column_order")
        else:
            row_results["passed_rules"].append("column_order")

        if not store_type_quality["success"]:
            row_results["failed_rules"].append("store_type_quality")
        else:
            row_results["passed_rules"].append("store_type_quality")

        if not dept_type_quality["success"]:
            row_results["failed_rules"].append("dept_type_quality")
        else:
            row_results["passed_rules"].append("dept_type_quality")

        if not date_type_quality["success"]:
            row_results["failed_rules"].append("date_type_quality")
        else:
            row_results["passed_rules"].append("date_type_quality")

        if not temperature_type_quality["success"]:
            row_results["failed_rules"].append("temperature_type_quality")
        else:
            row_results["passed_rules"].append("temperature_type_quality")

        if not fuel_price_type_quality["success"]:
            row_results["failed_rules"].append("fuel_price_type_quality")
        else:
            row_results["passed_rules"].append("fuel_price_type_quality")

        if not markdown1_type_quality["success"]:
            row_results["failed_rules"].append("markdown1_type_quality")
        else:
            row_results["passed_rules"].append("markdown1_type_quality")

        if not markdown2_type_quality["success"]:
            row_results["failed_rules"].append("markdown2_type_quality")
        else:
            row_results["passed_rules"].append("markdown2_type_quality")

        if not markdown3_type_quality["success"]:
            row_results["failed_rules"].append("markdown3_type_quality")
        else:
            row_results["passed_rules"].append("markdown3_type_quality")

        if not markdown4_type_quality["success"]:
            row_results["failed_rules"].append("markdown4_type_quality")
        else:
            row_results["passed_rules"].append("markdown4_type_quality")

        if not markdown5_type_quality["success"]:
            row_results["failed_rules"].append("markdown5_type_quality")
        else:
            row_results["passed_rules"].append("markdown5_type_quality")

        if not cpi_type_quality["success"]:
            row_results["failed_rules"].append("cpi_type_quality")
        else:
            row_results["passed_rules"].append("cpi_type_quality")

        if not unemployment_type_quality["success"]:
            row_results["failed_rules"].append("unemployment_type_quality")
        else:
            row_results["passed_rules"].append("unemployment_type_quality")

        if not isholiday_type_quality["success"]:
            row_results["failed_rules"].append("isholiday_type_quality")
        else:
            row_results["passed_rules"].append("isholiday_type_quality")

        if not type_type_quality["success"]:
            row_results["failed_rules"].append("type_type_quality")
        else:
            row_results["passed_rules"].append("type_type_quality")

        if not size_type_quality["success"]:
            row_results["failed_rules"].append("size_type_quality")
        else:
            row_results["passed_rules"].append("size_type_quality")

        # Append the results for this row to the overall validation results
        validation_results.append(row_results)

        if row_results["failed_rules"]:
            bad_rows.append(i)

    corrupted_ratio = len([result for result in validation_results if result["failed_rules"]]) / len(df)

    return {
        "validation_results": validation_results,
        "corrupted_ratio": corrupted_ratio,
        "file_path": file_path,
        "bad_rows": bad_rows
    }

In [31]:
def read_data() -> str:
    raw_data_directory = '/Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/airflow/dags/corrupted_data'
    random_file = random.choice(os.listdir(raw_data_directory))
    file_path = os.path.join(raw_data_directory, random_file)
    return file_path

In [32]:
file_path = read_data()

In [7]:
results = validate_data(file_path)

[0m


In [8]:
results

{'validation_results': [{'row_index': 0,
   'failed_rules': ['store_type_quality',
    'dept_type_quality',
    'cpi_type_quality',
    'isholiday_type_quality',
    'size_type_quality'],
   'passed_rules': ['date_quality',
    'cpi_quality',
    'column_order',
    'date_type_quality',
    'temperature_type_quality',
    'fuel_price_type_quality',
    'markdown1_type_quality',
    'markdown2_type_quality',
    'markdown3_type_quality',
    'markdown4_type_quality',
    'markdown5_type_quality',
    'unemployment_type_quality',
    'type_type_quality']},
  {'row_index': 1,
   'failed_rules': ['store_type_quality',
    'dept_type_quality',
    'cpi_type_quality',
    'isholiday_type_quality',
    'size_type_quality'],
   'passed_rules': ['date_quality',
    'cpi_quality',
    'column_order',
    'date_type_quality',
    'temperature_type_quality',
    'fuel_price_type_quality',
    'markdown1_type_quality',
    'markdown2_type_quality',
    'markdown3_type_quality',
    'markdown4_type_

In [9]:
for result in results['validation_results']:
    row_index = results['bad_rows']
    failed_rules = np.unique(result['failed_rules'])

print(len(failed_rules))

5


In [17]:
results["validation_results"][1]["failed_rules"]

['store_type_quality',
 'dept_type_quality',
 'cpi_type_quality',
 'isholiday_type_quality',
 'size_type_quality']

In [22]:
result = results["validation_results"]
failed_rules = []
passed_rules = []
for i in range(len(result)):
    fails = results["validation_results"][i]["failed_rules"]
    for el in fails:
        failed_rules.append(el)
    success = results["validation_results"][i]["passed_rules"]
    for el in success:
        passed_rules.append(el)

In [26]:
def send_alerts(validation_results, teams_webhook, report_directory):

        result = results["validation_results"]
        failed_rules = []
        passed_rules = []
        for i in range(len(result)):
            fails = results["validation_results"][i]["failed_rules"]
            for el in fails:
                failed_rules.append(el)
            success = results["validation_results"][i]["passed_rules"]
            for el in success:
                passed_rules.append(el)
        
        num_failed_rules = len(np.unique(failed_rules))

        if num_failed_rules > 5:
            criticality = "High"
        elif num_failed_rules > 2:
            criticality = "Medium"
        else:
            criticality = "Low"

        html_content = f"""
        <html>
        <head>
            <title>Data Validation Report</title>
        </head>
        <body>
            <h1>Data Validation Report</h1>
            <p>{num_failed_rules} data problem(s) detected with {criticality} criticality.</p>
            <h2>Validation Results</h2>
            <ul>
        """

        html_content += "<li>"
        html_content += f"Failed rules: {', '.join(failed_rules)}<br>"
        html_content += f"Passed rules: {', '.join(passed_rules)}<br>"
        html_content += "</li>"

        html_content += """
            </ul>
        </body>
        </html>
        """

        report_date = datetime.now().strftime("%Y-%m-%d")
        report_file = os.path.join(report_directory, f'data_val_report_{report_date}.html')

        with open(report_file, 'w') as f:
            f.write(html_content)
        
        alert = connectorcard(teams_webhook)
        alert.title(f"{num_failed_rules} data problem(s) detected with {criticality} criticality.")
        alert.text(f"Link to the report: file://{report_file}")
        alert.send()

        print("Alert sent successfully.")

In [27]:
teams_webhook = "https://epitafr.webhook.office.com/webhookb2/20776877-17e6-405b-bf9f-f0810f814f2a@3534b3d7-316c-4bc9-9ede-605c860f49d2/IncomingWebhook/1a33c6e51db14cb5861ae1833b3e578b/f5fc93d8-16f4-4ce7-950b-5f1d0d1c64dc"
report_directory = "/Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/airflow/dags/reports"

send_alerts(results, teams_webhook, report_directory)

Alert sent successfully.


## Test with "classic" GE method

In [33]:
import great_expectations as gx
from great_expectations.checkpoint import Checkpoint

context = gx.get_context()
validator = context.sources.pandas_default.read_csv(file_path)

validator.expect_table_columns_to_match_ordered_list(column_list=["Store","Dept","Date","Temperature",
                                                                                            "Fuel_Price","MarkDown1","MarkDown2",
                                                                                            "MarkDown3","MarkDown4","MarkDown5",
                                                                                            "CPI","Unemployment","IsHoliday","Type","Size"])
validator.expect_column_values_to_not_be_null(column="Date")
validator.expect_column_values_to_not_be_null(column="CPI")
validator.expect_column_values_to_be_of_type(column="Store", type_='int64')
validator.expect_column_values_to_be_of_type(column="Dept", type_='int64')
validator.expect_column_values_to_be_of_type(column="Date", type_='object')
validator.expect_column_values_to_be_of_type(column="Temperature", type_='float64')
validator.expect_column_values_to_be_of_type(column="Fuel_Price", type_='float64')
validator.expect_column_values_to_be_of_type(column="MarkDown1", type_='float64')
validator.expect_column_values_to_be_of_type(column="MarkDown2", type_='float64')
validator.expect_column_values_to_be_of_type(column="MarkDown3", type_='float64')
validator.expect_column_values_to_be_of_type(column="MarkDown4", type_='float64')
validator.expect_column_values_to_be_of_type(column="MarkDown5", type_='float64')
validator.expect_column_values_to_be_of_type(column="CPI", type_='float64')
validator.expect_column_values_to_be_of_type(column="Unemployment", type_='float64')
validator.expect_column_values_to_be_of_type(column="IsHoliday", type_='bool')
validator.expect_column_values_to_be_of_type(column="Type", type_='object')
validator.expect_column_values_to_be_of_type(column="Size", type_='int64')

validator.save_expectation_suite()

checkpoint = context.add_or_update_checkpoint(
    name="my_test_checkpoint",
    validator=validator,
)
checkpoint_result = checkpoint.run()
context.view_validation_result(checkpoint_result)

[[34m2024-04-20T16:00:29.911+0200[0m] {[34mfile_data_context.py:[0m222} INFO[0m - FileDataContext loading fluent config[0m
[[34m2024-04-20T16:00:29.924+0200[0m] {[34mconfig.py:[0m185} INFO[0m - Loading 'datasources' ->
[{'assets': [...], 'name': 'test_data', 'type': 'pandas'},
 {'name': 'default_pandas_datasource', 'type': 'pandas'}][0m
[[34m2024-04-20T16:00:29.927+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - _PandasDataAsset.dict() - missing `config_provider`, skipping config substitution[0m
[[34m2024-04-20T16:00:29.974+0200[0m] {[34mabstract_data_context.py:[0m5504} INFO[0m - Loaded 'test_data' from fluent config[0m
[[34m2024-04-20T16:00:29.976+0200[0m] {[34mfile_data_context.py:[0m174} INFO[0m - Saving 1 Fluent Datasources to /Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/great_expectations.yml[0m
[[34m2024-04-20T16:00:29.976+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - DataFrameAsset.dict() - missing `config_provider`, ski

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[[34m2024-04-20T16:00:30.411+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	11 expectation(s) included in expectation_suite. Omitting 7 expectation(s) that failed when last run; set discard_failed_expectations=False to include them.[0m
[[34m2024-04-20T16:00:30.465+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	11 expectation(s) included in expectation_suite.[0m


Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

In [34]:
checkpoint_result

{
  "run_id": {
    "run_name": null,
    "run_time": "2024-04-20T16:00:30.440424+02:00"
  },
  "run_results": {
    "ValidationResultIdentifier::default/__none__/20240420T140030.440424Z/default_pandas_datasource-#ephemeral_pandas_asset": {
      "validation_result": {
        "success": true,
        "results": [
          {
            "success": true,
            "expectation_config": {
              "expectation_type": "expect_table_columns_to_match_ordered_list",
              "kwargs": {
                "column_list": [
                  "Store",
                  "Dept",
                  "Date",
                  "Temperature",
                  "Fuel_Price",
                  "MarkDown1",
                  "MarkDown2",
                  "MarkDown3",
                  "MarkDown4",
                  "MarkDown5",
                  "CPI",
                  "Unemployment",
                  "IsHoliday",
                  "Type",
                  "Size"
                ],
         

In [60]:
def validate_data(file_path: str) -> dict:
    
    context = gx.get_context()
    validator = context.sources.pandas_default.read_csv(file_path)
    df = pd.read_csv(file_path)
    validator.expect_table_columns_to_match_ordered_list(column_list=["Store","Dept","Date","Temperature",
                                                                                                "Fuel_Price","MarkDown1","MarkDown2",
                                                                                                "MarkDown3","MarkDown4","MarkDown5",
                                                                                                "CPI","Unemployment","IsHoliday","Type","Size"])
    # Execute all the validation rules
    for column in df.columns:
        validator.expect_column_values_to_not_be_null(column=column)
    
    validator.expect_column_values_to_be_of_type(column="Store", type_='int64')
    validator.expect_column_values_to_be_of_type(column="Dept", type_='int64')
    validator.expect_column_values_to_be_of_type(column="Date", type_='object')
    validator.expect_column_values_to_be_of_type(column="Temperature", type_='float64')
    validator.expect_column_values_to_be_of_type(column="Fuel_Price", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown1", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown2", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown3", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown4", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown5", type_='float64')
    validator.expect_column_values_to_be_of_type(column="CPI", type_='float64')
    validator.expect_column_values_to_be_of_type(column="Unemployment", type_='float64')
    validator.expect_column_values_to_be_of_type(column="IsHoliday", type_='bool')
    validator.expect_column_values_to_be_of_type(column="Type", type_='object')
    validator.expect_column_values_to_be_of_type(column="Size", type_='int64')

    validator.save_expectation_suite(discard_failed_expectations=False)

    checkpoint = context.add_or_update_checkpoint(
        name="dsp_checkpoint",
        validator=validator
    )
    checkpoint_result = checkpoint.run()

    return checkpoint_result

In [61]:
file_path = '/Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/airflow/dags/corrupted_data/shuffled_partition18.csv'
checkpoint_result = validate_data(file_path)

[[34m2024-04-20T16:35:32.346+0200[0m] {[34mfile_data_context.py:[0m222} INFO[0m - FileDataContext loading fluent config[0m
[[34m2024-04-20T16:35:32.353+0200[0m] {[34mconfig.py:[0m185} INFO[0m - Loading 'datasources' ->
[{'assets': [...], 'name': 'test_data', 'type': 'pandas'},
 {'name': 'default_pandas_datasource', 'type': 'pandas'}][0m
[[34m2024-04-20T16:35:32.354+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - _PandasDataAsset.dict() - missing `config_provider`, skipping config substitution[0m
[[34m2024-04-20T16:35:32.382+0200[0m] {[34mabstract_data_context.py:[0m5504} INFO[0m - Loaded 'test_data' from fluent config[0m
[[34m2024-04-20T16:35:32.382+0200[0m] {[34mfile_data_context.py:[0m174} INFO[0m - Saving 1 Fluent Datasources to /Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/great_expectations.yml[0m
[[34m2024-04-20T16:35:32.383+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - DataFrameAsset.dict() - missing `config_provider`, ski

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[[34m2024-04-20T16:35:32.815+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	31 expectation(s) included in expectation_suite.[0m
[[34m2024-04-20T16:35:32.904+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	31 expectation(s) included in expectation_suite.[0m


Calculating Metrics:   0%|          | 0/78 [00:00<?, ?it/s]

In [62]:
context.view_validation_result(checkpoint_result)

In [109]:
checkpoint_result

{
  "run_id": {
    "run_name": null,
    "run_time": "2024-04-20T16:35:32.863600+02:00"
  },
  "run_results": {
    "ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset": {
      "validation_result": {
        "success": false,
        "results": [
          {
            "success": false,
            "expectation_config": {
              "expectation_type": "expect_table_columns_to_match_ordered_list",
              "kwargs": {
                "column_list": [
                  "Store",
                  "Dept",
                  "Date",
                  "Temperature",
                  "Fuel_Price",
                  "MarkDown1",
                  "MarkDown2",
                  "MarkDown3",
                  "MarkDown4",
                  "MarkDown5",
                  "CPI",
                  "Unemployment",
                  "IsHoliday",
                  "Type",
                  "Size"
                ],
       

In [93]:
checkpoint_result["run_results"]

{ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset: {'validation_result': {
    "success": false,
    "results": [
      {
        "success": false,
        "expectation_config": {
          "expectation_type": "expect_table_columns_to_match_ordered_list",
          "kwargs": {
            "column_list": [
              "Store",
              "Dept",
              "Date",
              "Temperature",
              "Fuel_Price",
              "MarkDown1",
              "MarkDown2",
              "MarkDown3",
              "MarkDown4",
              "MarkDown5",
              "CPI",
              "Unemployment",
              "IsHoliday",
              "Type",
              "Size"
            ],
            "batch_id": "default_pandas_datasource-#ephemeral_pandas_asset"
          },
          "meta": {}
        },
        "result": {
          "observed_value": [
            "IsHoliday",
            "Temperature",
     

In [114]:
checkpoint_result.list_validation_result_identifiers()

[ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset]

In [115]:
checkpoint_result.list_validation_results(group_by="expectation_suite_name")

{'default': [{
    "success": false,
    "results": [
      {
        "success": false,
        "expectation_config": {
          "expectation_type": "expect_table_columns_to_match_ordered_list",
          "kwargs": {
            "column_list": [
              "Store",
              "Dept",
              "Date",
              "Temperature",
              "Fuel_Price",
              "MarkDown1",
              "MarkDown2",
              "MarkDown3",
              "MarkDown4",
              "MarkDown5",
              "CPI",
              "Unemployment",
              "IsHoliday",
              "Type",
              "Size"
            ],
            "batch_id": "default_pandas_datasource-#ephemeral_pandas_asset"
          },
          "meta": {}
        },
        "result": {
          "observed_value": [
            "IsHoliday",
            "Temperature",
            "Fuel_Price",
            "MarkDown1",
            "Dept",
            "MarkDown3",
            "Date",
            "Store"

In [129]:
checkpoint_result.to_json_dict()

{'run_id': {'run_name': None, 'run_time': '2024-04-20T16:35:32.863600+02:00'},
 'run_results': {'ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset': {'validation_result': {'success': False,
    'results': [{'success': False,
      'expectation_config': {'expectation_type': 'expect_table_columns_to_match_ordered_list',
       'kwargs': {'column_list': ['Store',
         'Dept',
         'Date',
         'Temperature',
         'Fuel_Price',
         'MarkDown1',
         'MarkDown2',
         'MarkDown3',
         'MarkDown4',
         'MarkDown5',
         'CPI',
         'Unemployment',
         'IsHoliday',
         'Type',
         'Size'],
        'batch_id': 'default_pandas_datasource-#ephemeral_pandas_asset'},
       'meta': {}},
      'result': {'observed_value': ['IsHoliday',
        'Temperature',
        'Fuel_Price',
        'MarkDown1',
        'Dept',
        'MarkDown3',
        'Date',
        'Store',


In [137]:
result_json = checkpoint_result.to_json_dict()

In [138]:
result_json

{'run_id': {'run_name': None, 'run_time': '2024-04-20T16:35:32.863600+02:00'},
 'run_results': {'ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset': {'validation_result': {'success': False,
    'results': [{'success': False,
      'expectation_config': {'expectation_type': 'expect_table_columns_to_match_ordered_list',
       'kwargs': {'column_list': ['Store',
         'Dept',
         'Date',
         'Temperature',
         'Fuel_Price',
         'MarkDown1',
         'MarkDown2',
         'MarkDown3',
         'MarkDown4',
         'MarkDown5',
         'CPI',
         'Unemployment',
         'IsHoliday',
         'Type',
         'Size'],
        'batch_id': 'default_pandas_datasource-#ephemeral_pandas_asset'},
       'meta': {}},
      'result': {'observed_value': ['IsHoliday',
        'Temperature',
        'Fuel_Price',
        'MarkDown1',
        'Dept',
        'MarkDown3',
        'Date',
        'Store',


In [146]:
statistics = result_json['run_results']['ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset']['validation_result']['statistics']

31

In [139]:
# Assuming checkpoint_result is the JSON dictionary returned by checkpoint_result.to_json_dict()
run_results = result_json.get("run_results", {})

for expectation_identifier, expectation_result in run_results.items():
    # Accessing the expectation result dictionary
    validation_result = expectation_result.get("validation_result", {})
    success = validation_result.get("success", False)
    results = validation_result.get("results", [])

    # Iterating over the results for each expectation
    for result in results:
        expectation_config = result.get("expectation_config", {})
        expectation_type = expectation_config.get("expectation_type", "")
        kwargs = expectation_config.get("kwargs", {})
        column = kwargs.get("column", "")
        
        result_info = result.get("result", {})
        element_count = result_info.get("element_count", 0)
        unexpected_count = result_info.get("unexpected_count", 0)
        unexpected_percent = result_info.get("unexpected_percent", 0.0)

        # Print or process the extracted information as needed
        print("Success:", success)
        print("Expectation Type:", expectation_type)
        print("Column:", column)
        print("Element Count:", element_count)
        print("Unexpected Count:", unexpected_count)
        print("Unexpected Percent:", unexpected_percent)


Success: False
Expectation Type: expect_table_columns_to_match_ordered_list
Column: 
Element Count: 0
Unexpected Count: 0
Unexpected Percent: 0.0
Success: False
Expectation Type: expect_column_values_to_not_be_null
Column: IsHoliday
Element Count: 2107
Unexpected Count: 0
Unexpected Percent: 0.0
Success: False
Expectation Type: expect_column_values_to_be_of_type
Column: IsHoliday
Element Count: 0
Unexpected Count: 0
Unexpected Percent: 0.0
Success: False
Expectation Type: expect_column_values_to_not_be_null
Column: Temperature
Element Count: 2107
Unexpected Count: 0
Unexpected Percent: 0.0
Success: False
Expectation Type: expect_column_values_to_be_of_type
Column: Temperature
Element Count: 0
Unexpected Count: 0
Unexpected Percent: 0.0
Success: False
Expectation Type: expect_column_values_to_not_be_null
Column: Fuel_Price
Element Count: 2107
Unexpected Count: 0
Unexpected Percent: 0.0
Success: False
Expectation Type: expect_column_values_to_be_of_type
Column: Fuel_Price
Element Count: 

In [140]:
# Initialize an empty dictionary to store the extracted information
expectation_data = {}

for expectation_identifier, expectation_result in run_results.items():
    # Accessing the expectation result dictionary
    validation_result = expectation_result.get("validation_result", {})
    success = validation_result.get("success", False)
    results = validation_result.get("results", [])

    # Iterating over the results for each expectation
    for result in results:
        expectation_config = result.get("expectation_config", {})
        expectation_type = expectation_config.get("expectation_type", "")
        kwargs = expectation_config.get("kwargs", {})
        column = kwargs.get("column", "all_columns" if not kwargs.get("column") else kwargs.get("column"))
        
        result_info = result.get("result", {})
        element_count = result_info.get("element_count", 0)
        unexpected_count = result_info.get("unexpected_count", 0)
        unexpected_percent = result_info.get("unexpected_percent", 0.0)

        # Store the extracted information in the dictionary
        if column not in expectation_data:
            expectation_data[column] = []

        # Append the information for the current expectation to the list
        expectation_data[column].append({
            "success": success,
            "expectation_type": expectation_type,
            "element_count": element_count,
            "unexpected_count": unexpected_count,
            "unexpected_percent": unexpected_percent
        })

# Print the dictionary containing the extracted information
expectation_data


{'all_columns': [{'success': False,
   'expectation_type': 'expect_table_columns_to_match_ordered_list',
   'element_count': 0,
   'unexpected_count': 0,
   'unexpected_percent': 0.0}],
 'IsHoliday': [{'success': False,
   'expectation_type': 'expect_column_values_to_not_be_null',
   'element_count': 2107,
   'unexpected_count': 0,
   'unexpected_percent': 0.0},
  {'success': False,
   'expectation_type': 'expect_column_values_to_be_of_type',
   'element_count': 0,
   'unexpected_count': 0,
   'unexpected_percent': 0.0}],
 'Temperature': [{'success': False,
   'expectation_type': 'expect_column_values_to_not_be_null',
   'element_count': 2107,
   'unexpected_count': 0,
   'unexpected_percent': 0.0},
  {'success': False,
   'expectation_type': 'expect_column_values_to_be_of_type',
   'element_count': 0,
   'unexpected_count': 0,
   'unexpected_percent': 0.0}],
 'Fuel_Price': [{'success': False,
   'expectation_type': 'expect_column_values_to_not_be_null',
   'element_count': 2107,
   'u

In [141]:
import json

# Extracting the value of the 'update_data_docs' variable
update_data_docs = result_json.get('run_results', {}).get('ValidationResultIdentifier::default/__none__/20240420T143532.863600Z/default_pandas_datasource-#ephemeral_pandas_asset', {}).get('actions_results', {}).get('update_data_docs', {}).get('local_site', None)

# Printing the value of 'update_data_docs'
print(update_data_docs)


file:///Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/uncommitted/data_docs/local_site/validations/default/__none__/20240420T143532.863600Z/default_pandas_datasource-%23ephemeral_pandas_asset.html


In [76]:
indexes = []

for expectation_identifier, expectation_result in run_results.items():
    # Accessing the expectation result dictionary
    validation_result = expectation_result.get("validation_result", {})
    success = validation_result.get("success", False)
    results = validation_result.get("results", [])

    # Iterating over the results for each expectation
    for result in results:
        result_info = result.get("result", {})
        rows = result_info.get("partial_unexpected_index_list")
        if rows != None:
            for row in rows:
                if row not in indexes:
                    indexes.append(row)

indexes

[158,
 236,
 274,
 319,
 454,
 691,
 976,
 989,
 1005,
 1007,
 1021,
 1227,
 1412,
 1467,
 1542,
 1559,
 1668,
 1824,
 1840,
 1898,
 28,
 169,
 314,
 388,
 431,
 433,
 513,
 612,
 632,
 717,
 763,
 765,
 912,
 988,
 995,
 1289,
 1296,
 1304,
 1485,
 1537,
 33,
 130,
 131,
 285,
 423,
 438,
 448,
 464,
 494,
 568,
 719,
 746,
 842,
 1350,
 1444,
 1562,
 1580,
 1598,
 1644,
 1759,
 334,
 756,
 817,
 870,
 1091,
 1109,
 1138,
 1153,
 1158,
 1220,
 1518,
 1526,
 1621,
 1643,
 1719,
 1761,
 1841,
 2034,
 170,
 224,
 287,
 363,
 473,
 713,
 745,
 844,
 923,
 964,
 1032,
 1071,
 1076,
 1122,
 1155,
 1199,
 1335,
 1466,
 1756,
 45,
 219,
 220,
 221,
 222,
 223,
 225,
 226,
 227,
 228,
 229,
 230,
 231,
 232,
 233,
 234,
 235,
 237,
 0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 51,
 114,
 83,
 115,
 148,
 152,
 189,
 263,
 307,
 606,
 781,
 1210,
 1510,
 1683,
 1703,
 1966,
 325,
 516,
 733,
 737,
 858,
 1009,
 1073,
 1166,
 1221,
 1395,
 1404,
 1511

## Function

- read-data: reads one file randomly from raw-data and return the file path
- validate-data: checks for any data errors in the file and determines the data errors criticality
- send-alerts: generates the data validation report and sends an alert using teams
- save-file: splits the file in 2 files if needed and saves them in folder bad-data or good-data
- save-data-errors: saves the data problems along with data criticality to the database

read-data: OK

In [7]:
def read_data() -> str:
    raw_data_directory = '/Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/airflow/dags/corrupted_data'
    random_file = random.choice(os.listdir(raw_data_directory))
    file_path = os.path.join(raw_data_directory, random_file)
    return file_path

validate-data: OK

In [8]:
def validate_data(file_path: str) -> dict:
    
    context = gx.get_context()
    validator = context.sources.pandas_default.read_csv(file_path)
    df = pd.read_csv(file_path)
    validator.expect_table_columns_to_match_ordered_list(column_list=["Store","Dept","Date","Temperature",
                                                                                                "Fuel_Price","MarkDown1","MarkDown2",
                                                                                                "MarkDown3","MarkDown4","MarkDown5",
                                                                                                "CPI","Unemployment","IsHoliday","Type","Size"])
    # Execute all the validation rules
    for column in df.columns:
        validator.expect_column_values_to_not_be_null(column=column)
    
    validator.expect_column_values_to_be_of_type(column="Store", type_='int64')
    validator.expect_column_values_to_be_of_type(column="Dept", type_='int64')
    validator.expect_column_values_to_be_of_type(column="Date", type_='object')
    validator.expect_column_values_to_be_of_type(column="Temperature", type_='float64')
    validator.expect_column_values_to_be_of_type(column="Fuel_Price", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown1", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown2", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown3", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown4", type_='float64')
    validator.expect_column_values_to_be_of_type(column="MarkDown5", type_='float64')
    validator.expect_column_values_to_be_of_type(column="CPI", type_='float64')
    validator.expect_column_values_to_be_of_type(column="Unemployment", type_='float64')
    validator.expect_column_values_to_be_of_type(column="IsHoliday", type_='bool')
    validator.expect_column_values_to_be_of_type(column="Type", type_='object')
    validator.expect_column_values_to_be_of_type(column="Size", type_='int64')

    validator.save_expectation_suite(discard_failed_expectations=False)

    checkpoint = context.add_or_update_checkpoint(
        name="dsp_checkpoint",
        validator=validator
    )
    checkpoint_result = checkpoint.run()
    result_json = checkpoint_result.to_json_dict()

    return checkpoint_result, result_json

send-alerts: OK

In [51]:
def send_alerts(checkpoint_result, result_json, teams_webhook):
    key = list(checkpoint_result.get_statistics()['validation_statistics'])[0]
    report_link = result_json.get('run_results', {}).get(list(result_json.get('run_results', {}))[0], {}).get('actions_results', {}).get('update_data_docs', {}).get('local_site', None)
    encoded_report_link = urllib.parse.quote(report_link, safe=':/')
    statistics = checkpoint_result.get_statistics()['validation_statistics'][key]

    total_expectations = statistics["evaluated_expectations"]
    successful_expectations = statistics["successful_expectations"]
    failed_expecations = statistics["unsuccessful_expectations"]
    percentage = statistics["success_percent"]

    status = ""

    if percentage < 20:
        status = "LOW"
    if 20 < percentage < 50:
        status = "MEDIUM"
    if 50 < percentage < 80:
        status = "MAJOR"
    else:
        stayus = "CRITIC"
    
    alert = connectorcard(teams_webhook)
    alert.title(f"{status} ALERT")
    alert.text(f"{successful_expectations} rules succeeded, and {failed_expecations} rules failed out of {total_expectations}. Success ratio: {percentage}. To open the report in terminal run: `cd {encoded_report_link[7:149]} && open *.html `")
    alert.send()
    
    print("Alert sent successfully.")

save_data_errors

In [63]:
def save_data_errors(db_params, result_json):

    current_datetime = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    run_results = result_json.get("run_results", {})
    
    expectation_data = {}

    for expectation_identifier, expectation_result in run_results.items():
        # Accessing the expectation result dictionary
        validation_result = expectation_result.get("validation_result", {})
        success = validation_result.get("success", False)
        results = validation_result.get("results", [])

        # Iterating over the results for each expectation
        for result in results:
            expectation_config = result.get("expectation_config", {})
            expectation_type = expectation_config.get("expectation_type", "")
            kwargs = expectation_config.get("kwargs", {})
            column = kwargs.get("column", "all_columns" if not kwargs.get("column") else kwargs.get("column"))
            
            result_info = result.get("result", {})
            element_count = result_info.get("element_count", 0)
            unexpected_count = result_info.get("unexpected_count", 0)
            unexpected_percent = result_info.get("unexpected_percent", 0.0)

            # Store the extracted information in the dictionary
            if column not in expectation_data:
                expectation_data[column] = []

            # Append the information for the current expectation to the list
            expectation_data[column].append({
                "success": success,
                "expectation_type": expectation_type,
                "element_count": element_count,
                "unexpected_count": unexpected_count,
                "unexpected_percent": unexpected_percent
            })
    
    # Connect to the PostgreSQL database
    conn = psycopg2.connect(
        dbname=db_params['database'],
        user=db_params['user'],
        password=db_params['password'],
        host=db_params['host'],
        port=db_params['port']
    )

    # Create a cursor object using the cursor() method
    cursor = conn.cursor()

    # Loop through the expectation data and insert into the database
    for column, data in expectation_data.items():
        for entry in data:
            # Extract relevant information
            rule = entry['expectation_type']
            rows = entry['element_count']
            missing_values = entry['unexpected_count']
            percentage = entry['unexpected_percent']
            # Calculate criticality based on percentage, adjust as needed
            criticality = 0
            if percentage == 0:
                    criticality = 1
            elif percentage <= 0.25:
                criticality = 2
            elif percentage <= 0.5:
                criticality = 3
            elif percentage <= 0.75:
                criticality = 4
            else:
                criticality = 5

            # Prepare SQL query to insert data
            insert_query = sql.SQL("INSERT INTO data_quality_errors (date, rule, rows, missing_values, percentage, criticality) VALUES (%s, %s, %s, %s, %s, %s)")

            # Execute the SQL query
            cursor.execute(insert_query, (current_datetime, rule, rows, missing_values, percentage, criticality))

    # Commit changes
    conn.commit()

    # Close the cursor and connection
    cursor.close()
    conn.close()

    print("Data errors saved successfully.")

save_file

In [110]:
def save_file(checkpoint_result: dict, file_path) -> None:
    run_results = result_json.get("run_results", {})
    key = list(checkpoint_result.get_statistics()['validation_statistics'])[0]
    statistics = checkpoint_result.get_statistics()['validation_statistics'][key]
    corrupted_ratio = statistics["success_percent"]

    df = pd.read_csv(file_path)
    ct = datetime.now()
    ts = str(ct.timestamp())
    good_data_directory = '../airflow/dags/good_data'
    bad_data_directory = '../airflow/dags/bad_data'
    file_path_good_data = os.path.join(good_data_directory, f'good_data_{ts}.csv')
    file_path_bad_data = os.path.join(bad_data_directory, f'bad_data_{ts}.csv')

    indexes = []

    for expectation_identifier, expectation_result in run_results.items():
        validation_result = expectation_result.get("validation_result", {})
        success = validation_result.get("success", False)
        results = validation_result.get("results", [])
        for result in results:
            result_info = result.get("result", {})
            rows = result_info.get("partial_unexpected_index_list")
            if rows != None:
                for row in rows:
                    if row not in indexes:
                        indexes.append(row)

    print(corrupted_ratio)
    print(file_path)
    
    if corrupted_ratio == 0.0:
        shutil.move(file_path, os.path.join(good_data_directory, os.path.basename(file_path)))
        print("file moved to good_data_directory")
    elif corrupted_ratio <= 50:
        good_data = df.drop(index=indexes)
        bad_data = df.iloc[indexes]
        good_data.to_csv(file_path_good_data) 
        bad_data.to_csv(file_path_bad_data)
        os.remove(file_path)
        print("removed bad data from file")
    else:
        shutil.move(file_path, os.path.join(bad_data_directory, os.path.basename(file_path)))
        print("file corrupted ratio is too high, we drop it")

In [86]:
key = list(checkpoint_result.get_statistics()['validation_statistics'])[0]
statistics = checkpoint_result.get_statistics()['validation_statistics'][key]
statistics["success_percent"]

35.483870967741936

### TEST

In [52]:
teams_webhook = "https://epitafr.webhook.office.com/webhookb2/20776877-17e6-405b-bf9f-f0810f814f2a@3534b3d7-316c-4bc9-9ede-605c860f49d2/IncomingWebhook/1a33c6e51db14cb5861ae1833b3e578b/f5fc93d8-16f4-4ce7-950b-5f1d0d1c64dc"
file_path = read_data()
checkpoint_result, result_json = validate_data(file_path)
send_alerts(result_json, teams_webhook)

[[34m2024-04-20T19:17:15.407+0200[0m] {[34mfile_data_context.py:[0m222} INFO[0m - FileDataContext loading fluent config[0m
[[34m2024-04-20T19:17:15.426+0200[0m] {[34mconfig.py:[0m185} INFO[0m - Loading 'datasources' ->
[{'assets': [...], 'name': 'test_data', 'type': 'pandas'},
 {'name': 'default_pandas_datasource', 'type': 'pandas'}][0m
[[34m2024-04-20T19:17:15.428+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - _PandasDataAsset.dict() - missing `config_provider`, skipping config substitution[0m
[[34m2024-04-20T19:17:15.469+0200[0m] {[34mabstract_data_context.py:[0m5504} INFO[0m - Loaded 'test_data' from fluent config[0m
[[34m2024-04-20T19:17:15.470+0200[0m] {[34mfile_data_context.py:[0m174} INFO[0m - Saving 1 Fluent Datasources to /Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/great_expectations.yml[0m
[[34m2024-04-20T19:17:15.471+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - DataFrameAsset.dict() - missing `config_provider`, ski

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[[34m2024-04-20T19:17:15.960+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	31 expectation(s) included in expectation_suite.[0m
[[34m2024-04-20T19:17:16.027+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	31 expectation(s) included in expectation_suite.[0m


Calculating Metrics:   0%|          | 0/88 [00:00<?, ?it/s]

Alert sent successfully.


In [162]:
list(checkpoint_result.get_statistics()['validation_statistics'])[0]

ValidationResultIdentifier::default/__none__/20240420T162010.207397Z/default_pandas_datasource-#ephemeral_pandas_asset

In [177]:
checkpoint_result.get_statistics()['validation_statistics']

{ValidationResultIdentifier::default/__none__/20240420T162724.814298Z/default_pandas_datasource-#ephemeral_pandas_asset: {'evaluated_expectations': 31,
  'successful_expectations': 11,
  'unsuccessful_expectations': 20,
  'success_percent': 35.483870967741936}}

In [194]:
key = list(checkpoint_result.get_statistics()['validation_statistics'])[0]
print(str(key))
a = checkpoint_result.get_statistics()['validation_statistics'][key]
a

ValidationResultIdentifier::default/__none__/20240420T163153.164941Z/default_pandas_datasource-#ephemeral_pandas_asset


{'evaluated_expectations': 31,
 'successful_expectations': 11,
 'unsuccessful_expectations': 20,
 'success_percent': 35.483870967741936}

In [201]:
link = str(key)[28:]
print(link)
modified_string = link.replace('#', '%23')
print(modified_string)

default/__none__/20240420T163153.164941Z/default_pandas_datasource-#ephemeral_pandas_asset
default/__none__/20240420T163153.164941Z/default_pandas_datasource-%23ephemeral_pandas_asset


In [204]:
list(result_json.get('run_results', {}))[0]

'ValidationResultIdentifier::default/__none__/20240420T163153.164941Z/default_pandas_datasource-#ephemeral_pandas_asset'

In [216]:
update_data_docs = result_json.get('run_results', {}).get(list(result_json.get('run_results', {}))[0], {}).get('actions_results', {}).get('update_data_docs', {}).get('local_site', None)
report_link = str(update_data_docs).replace("none", "_none_")

file:///Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/uncommitted/data_docs/local_site/validations/default/__none__/20240420T165005.854845Z/default_pandas_datasource-%23ephemeral_pandas_asset.html
file:///Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/uncommitted/data_docs/local_site/validations/default/___none___/20240420T165005.854845Z/default_pandas_datasource-%23ephemeral_pandas_asset.html


In [64]:
db_params = {
    "host": "localhost",
    "port": 5433,
    "database": "postgres",
    "user": "postgres",
    "password": "1234"
}
save_data_errors(db_params, result_json)

Data errors saved successfully.


In [111]:
teams_webhook = "https://epitafr.webhook.office.com/webhookb2/20776877-17e6-405b-bf9f-f0810f814f2a@3534b3d7-316c-4bc9-9ede-605c860f49d2/IncomingWebhook/1a33c6e51db14cb5861ae1833b3e578b/f5fc93d8-16f4-4ce7-950b-5f1d0d1c64dc"
file_path = read_data()
checkpoint_result, result_json = validate_data(file_path)
send_alerts(result_json, teams_webhook)

[[34m2024-04-20T20:48:15.587+0200[0m] {[34mfile_data_context.py:[0m222} INFO[0m - FileDataContext loading fluent config[0m
[[34m2024-04-20T20:48:15.595+0200[0m] {[34mconfig.py:[0m185} INFO[0m - Loading 'datasources' ->
[{'assets': [...], 'name': 'test_data', 'type': 'pandas'},
 {'name': 'default_pandas_datasource', 'type': 'pandas'}][0m
[[34m2024-04-20T20:48:15.596+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - _PandasDataAsset.dict() - missing `config_provider`, skipping config substitution[0m
[[34m2024-04-20T20:48:15.637+0200[0m] {[34mabstract_data_context.py:[0m5504} INFO[0m - Loaded 'test_data' from fluent config[0m
[[34m2024-04-20T20:48:15.640+0200[0m] {[34mfile_data_context.py:[0m174} INFO[0m - Saving 1 Fluent Datasources to /Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/gx/great_expectations.yml[0m
[[34m2024-04-20T20:48:15.641+0200[0m] {[34mfluent_base_model.py:[0m276} INFO[0m - DataFrameAsset.dict() - missing `config_provider`, ski

Calculating Metrics:   0%|          | 0/2 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/6 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/8 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[0m


Calculating Metrics:   0%|          | 0/1 [00:00<?, ?it/s]

[[34m2024-04-20T20:48:16.234+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	31 expectation(s) included in expectation_suite.[0m
[[34m2024-04-20T20:48:16.310+0200[0m] {[34mvalidator.py:[0m1461} INFO[0m - 	31 expectation(s) included in expectation_suite.[0m


Calculating Metrics:   0%|          | 0/88 [00:00<?, ?it/s]

Alert sent successfully.


In [112]:
db_params = {
    "host": "localhost",
    "port": 5433,
    "database": "postgres",
    "user": "postgres",
    "password": "1234"
}
save_data_errors(db_params, result_json)

Data errors saved successfully.


In [113]:
save_file(checkpoint_result, file_path)

35.483870967741936
/Users/julien/Documents/EPITA/S2/DSP/dsp-project-JPS/airflow/dags/corrupted_data/partition_134.csv
removed bad data from file
