# Data Quality Checks Generation

# Setup

## Imports

In [1]:
import sys
sys.path.append('../')

from src.config.config import config
from src.utils.azure_utils import get_azure_blob_fs
from src.utils.logging_utils import logger

import pandas as pd
import numpy as np
import pyarrow.dataset as ds
import json

from tdda.constraints import discover_df

## Functions

In [2]:
def load_gold_data() -> pd.DataFrame:
    """
    Load the latest Gold layer data from Azure Blob Storage into a Pandas DataFrame.

    Returns:
        pd.DataFrame: The Gold layer data with columns renamed and sorted by date.
    """
    gold_blob_path = f"{config.CONTAINER_NAME}/{config.FOLDER}/gold/"

    # Find the latest file
    logger.info(f"Finding the latest file in {gold_blob_path}")
    abfs = get_azure_blob_fs()
    files = abfs.ls(gold_blob_path)
    latest_file = max(files, key=lambda x: x.split("/")[-1])
    gold_blob_path = latest_file

    # Load the Gold Parquet file
    logger.info(f"Loading Gold data from {gold_blob_path}")
    pqdata = ds.dataset(gold_blob_path, filesystem=abfs)

    return (
        pqdata
        .to_table()
        .to_pandas()
        .reset_index(drop=True)
    )

def load_silver_data():
    """
    Load the latest Silver layer data from Azure Blob Storage into a Pandas DataFrame.

    Returns:
        pd.DataFrame: The Silver layer data with columns renamed and sorted by date.
    """
    silver_blob_path = f"{config.CONTAINER_NAME}/{config.FOLDER}/silver/"

    # Find the latest file
    logger.info(f"Finding the latest file in {silver_blob_path}")
    abfs = get_azure_blob_fs()
    files = abfs.ls(silver_blob_path)
    latest_file = max(files, key=lambda x: x.split("/")[-1])
    silver_blob_path = latest_file

    # Load the Silver Parquet file
    logger.info(f"Loading Silver data from {silver_blob_path}")
    pqdata = ds.dataset(silver_blob_path, filesystem=abfs)

    return (
        pqdata
        .to_table()
        .to_pandas()
        .reset_index(drop=True)
    )

def load_bronze_data() -> pd.DataFrame:
    """
    Load the latest Bronze layer data from Azure Blob Storage into a Pandas DataFrame.

    Returns:
        pd.DataFrame: The Bronze layer data with columns renamed and sorted by date.
    """
    bronze_blob_path = f"{config.CONTAINER_NAME}/{config.FOLDER}/bronze/"

    # Find the latest file
    logger.info(f"Finding the latest file in {bronze_blob_path}")
    abfs = get_azure_blob_fs()
    files = abfs.ls(bronze_blob_path)
    latest_file = max(files, key=lambda x: x.split("/")[-1])
    bronze_blob_path = latest_file

    # Load the Bronze JSON file
    logger.info(f"Loading Bronze data from {bronze_blob_path}")
    with abfs.open(bronze_blob_path, "r") as file:
        return pd.DataFrame(json.load(file))


def load_predictions() -> pd.DataFrame:
    """
    Load the latest predictions data from Azure Blob Storage into a Pandas DataFrame.

    Returns:
        pd.DataFrame: The predictions data sorted by date.
    """
    predictions_blob_path = f"{config.CONTAINER_NAME}/{config.FOLDER}/predictions/"

    # Find the latest file
    abfs = get_azure_blob_fs()
    files = abfs.ls(predictions_blob_path)
    latest_file = max(files, key=lambda x: x.split("/")[-1])
    predictions_blob_path = latest_file

    # Load the predictions Parquet file
    logger.info(f"Loading predictions from {predictions_blob_path}")
    pqdata = ds.dataset(predictions_blob_path, filesystem=abfs)

    return (
        pqdata
        .to_table()
        .to_pandas()
        .reset_index(drop=True)
    )

# Load Data

In [3]:
bronze = load_bronze_data()
silver = load_silver_data()
gold = load_gold_data()
predictions = load_predictions()
data_dict = {
    "bronze": bronze,
    "silver": silver,
    "gold": gold,
    "predictions": predictions
}

[32m2024-08-27 20:43:36.584[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_bronze_data[0m:[36m65[0m - [1mFinding the latest file in data/energy_consumption/bronze/[0m
[32m2024-08-27 20:43:37.606[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_bronze_data[0m:[36m72[0m - [1mLoading Bronze data from data/energy_consumption/bronze/raw_data_20240827.json[0m
[32m2024-08-27 20:43:45.976[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_silver_data[0m:[36m38[0m - [1mFinding the latest file in data/energy_consumption/silver/[0m
[32m2024-08-27 20:43:46.378[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_silver_data[0m:[36m45[0m - [1mLoading Silver data from data/energy_consumption/silver/cleaned_data_20240827.parquet[0m
[32m2024-08-27 20:43:47.458[0m | [1mINFO    [0m | [36m__main__[0m:[36mload_gold_data[0m:[36m11[0m - [1mFinding the latest file in data/energy_consumption/gold/[0m
[32m2024-08-27 20:43:48.052[0m | [1mINFO    [0m | [36m__m

# Quality Checks

In [4]:
quality_report = {data_name: discover_df(data) for data_name, data in data_dict.items()}

## Bronze

In [5]:
display(data_dict['bronze'].head())
quality_report['bronze'].to_dict()

Unnamed: 0,cod_areacarga,din_atualizacao,dat_referencia,din_referenciautc,val_cargaglobal,val_cargaglobalcons,val_cargaglobalsmmgd,val_cargasupervisionada,val_carganaosupervisionada,val_cargammgd,val_consistencia
0,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T03:30:00.000Z,14785.096,14785.096,14780.366,14463.574,316.7916,4.7297,0.0
1,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T04:00:00.000Z,14307.296,14307.296,14302.565,13985.748,316.8175,4.7303,0.0
2,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T04:30:00.000Z,13920.842,13920.842,13916.111,13599.25,316.861,4.7304,0.0
3,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T05:00:00.000Z,13615.293,13615.293,13610.563,13293.569,316.9945,4.7296,0.0
4,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T05:30:00.000Z,13452.07,13452.07,13447.342,13130.2705,317.0712,4.7285,0.0


OrderedDict([('creation_metadata',
              OrderedDict([('local_time', '2024-08-27T20:43:49'),
                           ('utc_time', '2024-08-27T23:43:49'),
                           ('creator', 'TDDA 2.2.05'),
                           ('host', 'pedro-A70-MOB'),
                           ('user', 'pedro'),
                           ('n_records', 61152),
                           ('n_selected', 61152)])),
             ('fields',
              OrderedDict([('cod_areacarga',
                            OrderedDict([('type', 'string'),
                                         ('min_length', 2),
                                         ('max_length', 2),
                                         ('max_nulls', 0),
                                         ('allowed_values', ['SP'])])),
                           ('din_atualizacao',
                            OrderedDict([('type', 'string'),
                                         ('min_length', 24),
                            

## Silver

In [6]:
display(data_dict['silver'].head())
quality_report['silver'].to_dict()

Unnamed: 0,data,carga_mw
0,2021-03-01 03:30:00,14785.096
1,2021-03-01 04:00:00,14307.296
2,2021-03-01 04:30:00,13920.842
3,2021-03-01 05:00:00,13615.293
4,2021-03-01 05:30:00,13452.07


OrderedDict([('creation_metadata',
              OrderedDict([('local_time', '2024-08-27T20:43:49'),
                           ('utc_time', '2024-08-27T23:43:49'),
                           ('creator', 'TDDA 2.2.05'),
                           ('host', 'pedro-A70-MOB'),
                           ('user', 'pedro'),
                           ('n_records', 61152),
                           ('n_selected', 61152)])),
             ('fields',
              OrderedDict([('data',
                            OrderedDict([('type', 'date'),
                                         ('min', '2021-03-01 03:30:00'),
                                         ('max', '2024-08-26 03:00:00'),
                                         ('max_nulls', 0)])),
                           ('carga_mw',
                            OrderedDict([('type', 'real'),
                                         ('min', -8456.68),
                                         ('max', 26495.3),
                                 

## Gold

In [7]:
display(data_dict['gold'].head())
quality_report['gold'].to_dict()

Unnamed: 0,date,daily_carga_mw
0,2021-03-01,740064.127
1,2021-03-02,910450.209
2,2021-03-03,909328.941
3,2021-03-04,904879.849
4,2021-03-05,878506.3495


OrderedDict([('creation_metadata',
              OrderedDict([('local_time', '2024-08-27T20:43:49'),
                           ('utc_time', '2024-08-27T23:43:49'),
                           ('creator', 'TDDA 2.2.05'),
                           ('host', 'pedro-A70-MOB'),
                           ('user', 'pedro'),
                           ('n_records', 1275),
                           ('n_selected', 1275)])),
             ('fields',
              OrderedDict([('date',
                            OrderedDict([('type', 'date'),
                                         ('min', '2021-03-01 00:00:00'),
                                         ('max', '2024-08-26 00:00:00'),
                                         ('max_nulls', 0)])),
                           ('daily_carga_mw',
                            OrderedDict([('type', 'real'),
                                         ('min', 112149.84199999999),
                                         ('max', 1086165.883),
               

## Predictions

In [8]:
display(data_dict['predictions'].head())
quality_report['predictions'].to_dict()

Unnamed: 0,ds,y,unique_id,CatBoostRegressor,LGBMRegressor,XGBRegressor,RandomForestRegressor,CatBoostRegressor-lo-99,CatBoostRegressor-lo-95,CatBoostRegressor-lo-90,...,RandomForestRegressor-hi-20,RandomForestRegressor-hi-30,RandomForestRegressor-hi-40,RandomForestRegressor-hi-50,RandomForestRegressor-hi-60,RandomForestRegressor-hi-70,RandomForestRegressor-hi-80,RandomForestRegressor-hi-90,RandomForestRegressor-hi-95,RandomForestRegressor-hi-99
0,2024-06-15,816371.131,0,,,,,,,,...,,,,,,,,,,
1,2024-06-16,727934.964,0,,,,,,,,...,,,,,,,,,,
2,2024-06-17,851129.98,0,,,,,,,,...,,,,,,,,,,
3,2024-06-18,886222.722,0,,,,,,,,...,,,,,,,,,,
4,2024-06-19,890934.761,0,,,,,,,,...,,,,,,,,,,


OrderedDict([('creation_metadata',
              OrderedDict([('local_time', '2024-08-27T20:43:49'),
                           ('utc_time', '2024-08-27T23:43:49'),
                           ('creator', 'TDDA 2.2.05'),
                           ('host', 'pedro-A70-MOB'),
                           ('user', 'pedro'),
                           ('n_records', 133),
                           ('n_selected', 133)])),
             ('fields',
              OrderedDict([('ds',
                            OrderedDict([('type', 'date'),
                                         ('min', '2024-06-15 00:00:00'),
                                         ('max', '2024-10-25 00:00:00'),
                                         ('max_nulls', 0)])),
                           ('y',
                            OrderedDict([('type', 'real'),
                                         ('min', 698073.5960000001),
                                         ('max', 934259.6354999999),
                           

## Save it

In [9]:
# # Save the quality report to a JSON file
# for data_name, report in quality_report.items():
#     with open(f"../src/data/quality_reports/{data_name}_quality_report.json", "w") as file:
#         file.write(report.to_json())
#         print(f"Quality report for {data_name} saved to ../data/quality_reports/{data_name}_quality_report.json")

# Infer schemas

In [10]:
from pandera.schema_inference.pandas import infer_schema

for data_name, data in data_dict.items():
    schema = infer_schema(data)
    #schema.to_script("../src/data/schemas/{}_schema.py".format(data_name))
    print(f"Schema for {data_name} saved to ../data/schemas/{data_name}_schema.py")

Schema for bronze saved to ../data/schemas/bronze_schema.py
Schema for silver saved to ../data/schemas/silver_schema.py
Schema for gold saved to ../data/schemas/gold_schema.py
Schema for predictions saved to ../data/schemas/predictions_schema.py


# Check Schemas

In [11]:
from src.data.schemas.bronze_schema import schema as bronze_schema
from src.data.schemas.silver_schema import schema as silver_schema
from src.data.schemas.gold_schema import schema as gold_schema
from src.data.schemas.predictions_schema import schema as predictions_schema

In [12]:
bronze_schema.validate(data_dict['bronze'])

Unnamed: 0,cod_areacarga,din_atualizacao,dat_referencia,din_referenciautc,val_cargaglobal,val_cargaglobalcons,val_cargaglobalsmmgd,val_cargasupervisionada,val_carganaosupervisionada,val_cargammgd,val_consistencia
0,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T03:30:00.000Z,14785.096,14785.096,14780.366,14463.5740,316.7916,4.7297,0.0
1,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T04:00:00.000Z,14307.296,14307.296,14302.565,13985.7480,316.8175,4.7303,0.0
2,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T04:30:00.000Z,13920.842,13920.842,13916.111,13599.2500,316.8610,4.7304,0.0
3,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T05:00:00.000Z,13615.293,13615.293,13610.563,13293.5690,316.9945,4.7296,0.0
4,SP,2023-04-28T04:58:15.065Z,2021-03-01,2021-03-01T05:30:00.000Z,13452.070,13452.070,13447.342,13130.2705,317.0712,4.7285,0.0
...,...,...,...,...,...,...,...,...,...,...,...
61147,SP,2024-08-27T03:22:24.348Z,2024-08-25,2024-08-26T01:00:00.000Z,16666.055,16666.055,16649.766,15236.8670,1412.8990,16.2900,0.0
61148,SP,2024-08-27T03:22:24.348Z,2024-08-25,2024-08-26T01:30:00.000Z,16123.604,16123.604,16107.161,14690.5790,1416.5818,16.4421,0.0
61149,SP,2024-08-27T03:22:24.348Z,2024-08-25,2024-08-26T02:00:00.000Z,15480.127,15480.127,15463.709,14043.7480,1419.9609,16.4179,0.0
61150,SP,2024-08-27T03:22:24.348Z,2024-08-25,2024-08-26T02:30:00.000Z,14918.517,14918.517,14902.227,13479.8300,1422.3960,16.2900,0.0


In [13]:
silver_schema.validate(data_dict['silver'])

Unnamed: 0,data,carga_mw
0,2021-03-01 03:30:00,14785.096
1,2021-03-01 04:00:00,14307.296
2,2021-03-01 04:30:00,13920.842
3,2021-03-01 05:00:00,13615.293
4,2021-03-01 05:30:00,13452.070
...,...,...
61147,2024-08-26 01:00:00,16666.055
61148,2024-08-26 01:30:00,16123.604
61149,2024-08-26 02:00:00,15480.127
61150,2024-08-26 02:30:00,14918.517


In [14]:
gold_schema.validate(data_dict['gold'])

Unnamed: 0,date,daily_carga_mw
0,2021-03-01,740064.1270
1,2021-03-02,910450.2090
2,2021-03-03,909328.9410
3,2021-03-04,904879.8490
4,2021-03-05,878506.3495
...,...,...
1270,2024-08-22,929876.4890
1271,2024-08-23,934259.6355
1272,2024-08-24,855712.3575
1273,2024-08-25,748091.7795


In [15]:
predictions_schema.validate(data_dict['predictions'])

Unnamed: 0,ds,y,unique_id,CatBoostRegressor,LGBMRegressor,XGBRegressor,RandomForestRegressor,CatBoostRegressor-lo-99,CatBoostRegressor-lo-95,CatBoostRegressor-lo-90,...,RandomForestRegressor-hi-20,RandomForestRegressor-hi-30,RandomForestRegressor-hi-40,RandomForestRegressor-hi-50,RandomForestRegressor-hi-60,RandomForestRegressor-hi-70,RandomForestRegressor-hi-80,RandomForestRegressor-hi-90,RandomForestRegressor-hi-95,RandomForestRegressor-hi-99
0,2024-06-15,816371.131,0,,,,,,,,...,,,,,,,,,,
1,2024-06-16,727934.964,0,,,,,,,,...,,,,,,,,,,
2,2024-06-17,851129.980,0,,,,,,,,...,,,,,,,,,,
3,2024-06-18,886222.722,0,,,,,,,,...,,,,,,,,,,
4,2024-06-19,890934.761,0,,,,,,,,...,,,,,,,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
128,2024-10-21,,0,803651.407382,769239.075103,816328.0625,798513.861240,762473.822842,763912.531213,765710.916677,...,805537.953570,806356.363486,807174.773403,807993.183319,808811.593235,813422.510235,818033.427235,822644.344235,824949.802735,826794.169535
129,2024-10-22,,0,832695.684901,816920.088404,880758.5000,839956.220215,791518.100361,792956.808732,794755.194196,...,846980.312545,847798.722461,848617.132378,849435.542294,850253.952210,854864.869210,859475.786210,864086.703210,866392.161710,868236.528510
130,2024-10-23,,0,823444.996255,826470.780959,893525.3750,843281.566845,782267.411714,783706.120086,785504.505550,...,850305.659175,851124.069091,851942.479008,852760.888924,853579.298840,858190.215840,862801.132840,867412.049840,869717.508340,871561.875140
131,2024-10-24,,0,815474.443484,821612.337913,891047.3125,831031.642050,774296.858944,775735.567315,777533.952779,...,838055.734380,838874.144296,839692.554213,840510.964129,841329.374045,845940.291045,850551.208045,855162.125045,857467.583545,859311.950345


In [27]:
bronze['din_atualizacao'] = pd.to_datetime(bronze['din_atualizacao']).apply(lambda x: x.replace(tzinfo=None))
bronze['din_referenciautc'] = pd.to_datetime(bronze['din_referenciautc']).apply(lambda x: x.replace(tzinfo=None))
bronze[['din_atualizacao','din_referenciautc','val_cargaglobal']].tail(30)

Unnamed: 0,din_atualizacao,din_referenciautc,val_cargaglobal
61122,2024-08-27 03:22:24.348,2024-08-25 12:30:00,14963.54
61123,2024-08-27 03:22:24.348,2024-08-25 13:00:00,15248.594
61124,2024-08-27 03:22:24.348,2024-08-25 13:30:00,15605.788
61125,2024-08-27 03:22:24.348,2024-08-25 14:00:00,15975.923
61126,2024-08-27 03:22:24.348,2024-08-25 14:30:00,16288.75
61127,2024-08-27 03:22:24.348,2024-08-25 15:00:00,16569.576
61128,2024-08-27 03:22:24.348,2024-08-25 15:30:00,16739.709
61129,2024-08-27 03:22:24.348,2024-08-25 16:00:00,16605.78
61130,2024-08-27 03:22:24.348,2024-08-25 16:30:00,16410.697
61131,2024-08-27 03:22:24.348,2024-08-25 17:00:00,16252.445
