# Expectations

In [50]:
import great_expectations as ge
from great_expectations.checkpoint import SimpleCheckpoint
import pandas as pd
import logging
import yaml
from typing import Dict, Tuple, Any
import os

logger = logging.getLogger(__name__)

In [51]:
df = pd.read_csv("../data/01_raw/house-pricing.csv")
df_model_input = pd.read_csv("../data/05_model_input/X_train_transformed.csv")

In [52]:
df.head()

Unnamed: 0,Id,MSSubClass,MSZoning,LotFrontage,LotArea,Street,Alley,LotShape,LandContour,Utilities,...,PoolArea,PoolQC,Fence,MiscFeature,MiscVal,MoSold,YrSold,SaleType,SaleCondition,SalePrice
0,1,60,RL,65.0,8450,Pave,,Reg,Lvl,AllPub,...,0,,,,0,2,2008,WD,Normal,208500
1,2,20,RL,80.0,9600,Pave,,Reg,Lvl,AllPub,...,0,,,,0,5,2007,WD,Normal,181500
2,3,60,RL,68.0,11250,Pave,,IR1,Lvl,AllPub,...,0,,,,0,9,2008,WD,Normal,223500
3,4,70,RL,60.0,9550,Pave,,IR1,Lvl,AllPub,...,0,,,,0,2,2006,WD,Abnorml,140000
4,5,60,RL,84.0,14260,Pave,,IR1,Lvl,AllPub,...,0,,,,0,12,2008,WD,Normal,250000


In [53]:
gdf = ge.from_pandas(df)
num_cols = df.select_dtypes(include=['number']).columns.tolist()
cat_cols = df.select_dtypes(include=['object']).columns.tolist()

Check the number of columns

In [54]:
column_number = len(gdf.columns)
gdf.expect_table_column_count_to_equal(column_number)

{
  "result": {
    "observed_value": 81
  },
  "success": true,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Check that the columns exist

In [55]:
column_list = gdf.columns.tolist()

In [56]:
column_list_yaml = yaml.dump(column_list)
print(column_list_yaml)

- Id
- MSSubClass
- MSZoning
- LotFrontage
- LotArea
- Street
- Alley
- LotShape
- LandContour
- Utilities
- LotConfig
- LandSlope
- Neighborhood
- Condition1
- Condition2
- BldgType
- HouseStyle
- OverallQual
- OverallCond
- YearBuilt
- YearRemodAdd
- RoofStyle
- RoofMatl
- Exterior1st
- Exterior2nd
- MasVnrType
- MasVnrArea
- ExterQual
- ExterCond
- Foundation
- BsmtQual
- BsmtCond
- BsmtExposure
- BsmtFinType1
- BsmtFinSF1
- BsmtFinType2
- BsmtFinSF2
- BsmtUnfSF
- TotalBsmtSF
- Heating
- HeatingQC
- CentralAir
- Electrical
- 1stFlrSF
- 2ndFlrSF
- LowQualFinSF
- GrLivArea
- BsmtFullBath
- BsmtHalfBath
- FullBath
- HalfBath
- BedroomAbvGr
- KitchenAbvGr
- KitchenQual
- TotRmsAbvGrd
- Functional
- Fireplaces
- FireplaceQu
- GarageType
- GarageYrBlt
- GarageFinish
- GarageCars
- GarageArea
- GarageQual
- GarageCond
- PavedDrive
- WoodDeckSF
- OpenPorchSF
- EnclosedPorch
- 3SsnPorch
- ScreenPorch
- PoolArea
- PoolQC
- Fence
- MiscFeature
- MiscVal
- MoSold
- YrSold
- SaleType
- SaleCondi

In [57]:
def check_if_column_exist(gdf, column_list):
    """Register an existence expectation on ``gdf`` for every listed column.

    Args:
        gdf: Great Expectations dataset the expectations are added to.
        column_list: Iterable of column names that are expected to exist.
    """
    for expected_name in column_list:
        gdf.expect_column_to_exist(expected_name)

In [58]:
check_if_column_exist(gdf, column_list)

Check dtypes

In [59]:
def check_dtype(gdf, columns, dtype):
    """Register a type-list expectation on ``gdf`` for every column in ``columns``.

    Args:
        gdf: Great Expectations dataset the expectations are added to.
        columns: Iterable of column names to check.
        dtype: ``'numeric'`` expects ``int64``/``float64`` values; any other
            value expects ``str`` values.
    """
    is_numeric = dtype == 'numeric'
    for name in columns:
        # A fresh list is built for every call, exactly like the literal in each branch did.
        accepted_types = ['int64', 'float64'] if is_numeric else ["str"]
        gdf.expect_column_values_to_be_in_type_list(name, accepted_types)

In [60]:
check_dtype(gdf, num_cols, dtype='numeric')
check_dtype(gdf, cat_cols, dtype='object')

Check unique values of categorical columns

In [61]:
# Map every object-typed (categorical) column to the list of its distinct values.
cat_cols = df.select_dtypes(include=['object']).columns
cat_cols_dict = {name: df[name].unique().tolist() for name in cat_cols}

In [62]:
cat_cols_dict

{'MSZoning': ['RL', 'RM', 'C (all)', 'FV', 'RH'],
 'Street': ['Pave', 'Grvl'],
 'Alley': [nan, 'Grvl', 'Pave'],
 'LotShape': ['Reg', 'IR1', 'IR2', 'IR3'],
 'LandContour': ['Lvl', 'Bnk', 'Low', 'HLS'],
 'Utilities': ['AllPub', 'NoSeWa'],
 'LotConfig': ['Inside', 'FR2', 'Corner', 'CulDSac', 'FR3'],
 'LandSlope': ['Gtl', 'Mod', 'Sev'],
 'Neighborhood': ['CollgCr',
  'Veenker',
  'Crawfor',
  'NoRidge',
  'Mitchel',
  'Somerst',
  'NWAmes',
  'OldTown',
  'BrkSide',
  'Sawyer',
  'NridgHt',
  'NAmes',
  'SawyerW',
  'IDOTRR',
  'MeadowV',
  'Edwards',
  'Timber',
  'Gilbert',
  'StoneBr',
  'ClearCr',
  'NPkVill',
  'Blmngtn',
  'BrDale',
  'SWISU',
  'Blueste'],
 'Condition1': ['Norm',
  'Feedr',
  'PosN',
  'Artery',
  'RRAe',
  'RRNn',
  'RRAn',
  'PosA',
  'RRNe'],
 'Condition2': ['Norm',
  'Artery',
  'RRNn',
  'Feedr',
  'PosN',
  'PosA',
  'RRAn',
  'RRAe'],
 'BldgType': ['1Fam', '2fmCon', 'Duplex', 'TwnhsE', 'Twnhs'],
 'HouseStyle': ['2Story',
  '1Story',
  '1.5Fin',
  '1.5Unf',
  

In [63]:
import yaml
result = yaml.dump(cat_cols_dict)
print(result)

Alley:
- .nan
- Grvl
- Pave
BldgType:
- 1Fam
- 2fmCon
- Duplex
- TwnhsE
- Twnhs
BsmtCond:
- TA
- Gd
- .nan
- Fa
- Po
BsmtExposure:
- 'No'
- Gd
- Mn
- Av
- .nan
BsmtFinType1:
- GLQ
- ALQ
- Unf
- Rec
- BLQ
- .nan
- LwQ
BsmtFinType2:
- Unf
- BLQ
- .nan
- ALQ
- Rec
- LwQ
- GLQ
BsmtQual:
- Gd
- TA
- Ex
- .nan
- Fa
CentralAir:
- Y
- N
Condition1:
- Norm
- Feedr
- PosN
- Artery
- RRAe
- RRNn
- RRAn
- PosA
- RRNe
Condition2:
- Norm
- Artery
- RRNn
- Feedr
- PosN
- PosA
- RRAn
- RRAe
Electrical:
- SBrkr
- FuseF
- FuseA
- FuseP
- Mix
- .nan
ExterCond:
- TA
- Gd
- Fa
- Po
- Ex
ExterQual:
- Gd
- TA
- Ex
- Fa
Exterior1st:
- VinylSd
- MetalSd
- Wd Sdng
- HdBoard
- BrkFace
- WdShing
- CemntBd
- Plywood
- AsbShng
- Stucco
- BrkComm
- AsphShn
- Stone
- ImStucc
- CBlock
Exterior2nd:
- VinylSd
- MetalSd
- Wd Shng
- HdBoard
- Plywood
- Wd Sdng
- CmentBd
- BrkFace
- Stucco
- AsbShng
- Brk Cmn
- ImStucc
- AsphShn
- Stone
- Other
- CBlock
Fence:
- .nan
- MnPrv
- GdWo
- GdPrv
- MnWw
FireplaceQu:
- .nan
- TA
-

In [64]:
def check_categorical_unique_values(gdf, dict_cat_cols):
    """Register an allowed-value-set expectation on ``gdf`` per categorical column.

    Args:
        gdf: Great Expectations dataset the expectations are added to.
        dict_cat_cols: Mapping of column name to the list of values that the
            column is allowed to contain.
    """
    for name, allowed_values in dict_cat_cols.items():
        gdf.expect_column_values_to_be_in_set(name, allowed_values)

In [65]:
check_categorical_unique_values(gdf, cat_cols_dict)

Check SalePrice deviation

In [66]:
# def calculate_median_deviation(gdf, column):
#     df = gdf.copy()
#     median = df[column].median()
#     absolute_deviations = df[column].apply(lambda x: abs(x - median))
#     mad = absolute_deviations.median()
#     return mad

In [67]:
median_sales_price = df['SalePrice'].median()
median_threshold = 0.1
gdf.expect_column_median_to_be_between("SalePrice", median_sales_price*(1-median_threshold), median_sales_price*(1+median_threshold))

{
  "result": {
    "observed_value": 163000.0,
    "element_count": 1460,
    "missing_count": null,
    "missing_percent": null
  },
  "success": true,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

Check that the columns have no null values

In [68]:
def check_nulls(gdf, columns):
    """Register a not-null expectation on ``gdf`` for every column in ``columns``.

    Args:
        gdf: Great Expectations dataset the expectations are added to.
        columns: Iterable of column names that must not contain nulls.
    """
    for column_name in columns:
        gdf.expect_column_values_to_not_be_null(column_name)

In [69]:
check_nulls(gdf, gdf.columns)

Check if ID is unique

In [70]:
# check if id is unique
gdf.expect_column_values_to_be_unique("Id")

{
  "result": {
    "element_count": 1460,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "success": true,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [71]:
import datetime

current_year = datetime.datetime.now().year

In [72]:
gdf.expect_column_values_to_be_between("YearBuilt", 1800, current_year)

{
  "result": {
    "element_count": 1460,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "success": true,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [73]:
gdf.expect_column_max_to_be_between("YrSold", 1950, current_year)


{
  "result": {
    "observed_value": 2010,
    "element_count": 1460,
    "missing_count": null,
    "missing_percent": null
  },
  "success": true,
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

In [74]:
gdf[num_cols]

Unnamed: 0,Id,MSSubClass,LotFrontage,LotArea,OverallQual,OverallCond,YearBuilt,YearRemodAdd,MasVnrArea,BsmtFinSF1,...,WoodDeckSF,OpenPorchSF,EnclosedPorch,3SsnPorch,ScreenPorch,PoolArea,MiscVal,MoSold,YrSold,SalePrice
0,1,60,65.0,8450,7,5,2003,2003,196.0,706,...,0,61,0,0,0,0,0,2,2008,208500
1,2,20,80.0,9600,6,8,1976,1976,0.0,978,...,298,0,0,0,0,0,0,5,2007,181500
2,3,60,68.0,11250,7,5,2001,2002,162.0,486,...,0,42,0,0,0,0,0,9,2008,223500
3,4,70,60.0,9550,7,5,1915,1970,0.0,216,...,0,35,272,0,0,0,0,2,2006,140000
4,5,60,84.0,14260,8,5,2000,2000,350.0,655,...,192,84,0,0,0,0,0,12,2008,250000
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
1455,1456,60,62.0,7917,6,5,1999,2000,0.0,0,...,0,40,0,0,0,0,0,8,2007,175000
1456,1457,20,85.0,13175,6,6,1978,1988,119.0,790,...,349,0,0,0,0,0,0,2,2010,210000
1457,1458,70,66.0,9042,7,9,1941,2006,0.0,275,...,0,60,0,0,0,0,2500,5,2010,266500
1458,1459,20,68.0,9717,5,6,1950,1996,0.0,49,...,366,0,112,0,0,0,0,4,2010,142125


In [75]:
validation = gdf.validate()

In [76]:
validation.to_json_dict()

{'statistics': {'evaluated_expectations': 291,
  'successful_expectations': 272,
  'unsuccessful_expectations': 19,
  'success_percent': 93.47079037800687},
 'results': [{'result': {'observed_value': 81},
   'success': True,
   'meta': {},
   'expectation_config': {'expectation_type': 'expect_table_column_count_to_equal',
    'kwargs': {'value': 81, 'result_format': 'BASIC'},
    'meta': {}},
   'exception_info': {'raised_exception': False,
    'exception_message': None,
    'exception_traceback': None}},
  {'result': {},
   'success': True,
   'meta': {},
   'expectation_config': {'expectation_type': 'expect_column_to_exist',
    'kwargs': {'column': 'Id', 'result_format': 'BASIC'},
    'meta': {}},
   'exception_info': {'raised_exception': False,
    'exception_message': None,
    'exception_traceback': None}},
  {'result': {'observed_value': 'int64'},
   'success': True,
   'meta': {},
   'expectation_config': {'expectation_type': 'expect_column_values_to_be_in_type_list',
    'kwar

In [77]:
folder_path = '../data/08_reporting/Expectations_reporting'
os.makedirs(folder_path, exist_ok=True)

validation_results = gdf.validate()
print(folder_path + "/feature_engineered_data_validation_results.json")

../data/08_reporting/Expectations_reporting/feature_engineered_data_validation_results.json


In [78]:
import json
validation_results = gdf.validate()
file_path_validation_results = os.path.join(folder_path, "feature_engineered_data_validation_results.json")
with open(file_path_validation_results, 'w') as json_file:
    json.dump(validation_results.to_json_dict(), json_file)

In [79]:
logger = logging.getLogger(__name__)

In [None]:
# Keep only the expectations of the validation run that did not succeed.
failed_expectations = [result for result in validation["results"] if not result["success"]]

logger.info(
    f"Total Expectations: {len(validation['results'])}, "
    f"Failed Expectations: {len(failed_expectations)}"
)

# Defined unconditionally so the cell that saves `collect_errors` also works
# when no expectation failed.
collect_errors = []
for idx, failed_expectation in enumerate(failed_expectations, start=1):
    expectation_config = failed_expectation['expectation_config']
    # Table-level expectations (e.g. the column-count check) carry no 'column'
    # kwarg; use a placeholder instead of failing with a KeyError.
    column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
    collect_errors.append(
        f"Failed Expectation {idx}:"
        f"  Expectation Type: {expectation_config['expectation_type']}"
        f"  Column: {column}"
        f"  Details: {failed_expectation['result']}")

if collect_errors:
    raise Exception(
        f"Data Quality Validation Failed: {collect_errors}"
    )

In [None]:
import json
import os
# Create the data/08_reporting folder if it doesn't exist
folder_path = '../data/08_reporting'
os.makedirs(folder_path, exist_ok=True)

# Define the file path
file_path = os.path.join(folder_path, 'collect_errors.json')

# Save the list as a JSON file
with open(file_path, 'w') as json_file:
    json.dump(collect_errors, json_file)

print(f"The JSON file has been saved at {file_path}.")

The JSON file has been saved at ../data/08_reporting\collect_errors.json.


Node function

In [81]:
def check_data_cleaning(df: pd.DataFrame, parameters: Dict[str, Any]) -> pd.DataFrame:
    """
    Check expectations for the cleaned dataset.
    - Check the number of columns.
    - Check if the columns exist.
    - Check if the columns are of the correct type.
    - Check if the categorical columns have the correct unique values.
    - Check if the columns have null values.
    - Check if the ID has only unique values.
    - Check if the maximum of YearBuilt and YrSold is within the correct range.

    The validation results are always saved as a json file. If any of the
    expectations fail, the errors are saved as well and an exception is raised.

    Args:
    --
        df (pd.DataFrame): Cleaned dataframe to validate.
        parameters (Dict): Parameters from the configuration file. The keys
            "num_columns", "column_list" and "categorical_unique_values" are read.

    Returns:
    --
        df (pd.DataFrame): Input dataframe.

    Raises:
    --
        Exception: If at least one expectation fails.
    """

    # Creates a folder to save the expectations results.
    folder_path = '../data/08_reporting/Expectations_reporting'
    os.makedirs(folder_path, exist_ok=True)

    # Upper bound for the year columns: today's year plus one year of slack.
    max_year = datetime.date.today().year + 1
    num_cols = df.select_dtypes(include=['number']).columns.tolist()
    cat_cols = df.select_dtypes(include=['object']).columns.tolist()
    num_columns = parameters["num_columns"]
    column_list = parameters["column_list"]
    cat_unique_values = parameters["categorical_unique_values"]

    gdf = ge.from_pandas(df)

    gdf.expect_table_column_count_to_equal(num_columns)
    check_if_column_exist(gdf, column_list)
    check_dtype(gdf, num_cols, dtype='numeric')
    check_dtype(gdf, cat_cols, dtype='object')
    check_categorical_unique_values(gdf, cat_unique_values)

    check_nulls(gdf, gdf.columns)
    gdf.expect_column_values_to_be_unique("Id")
    gdf.expect_column_max_to_be_between("YearBuilt", 1800, max_year)
    gdf.expect_column_max_to_be_between("YrSold", 1950, max_year)

    # Create the validation results and save them in a json file.
    # to_json_dict() only returns a dict, so the file has to be written explicitly.
    validation_results = gdf.validate()
    file_path_validation_results = os.path.join(folder_path, "Cleaned_data_validation_results.json")
    with open(file_path_validation_results, 'w') as json_file:
        json.dump(validation_results.to_json_dict(), json_file)

    failed_expectations = [result for result in validation_results["results"] if not result["success"]]

    logger.info(
        f"Total Expectations: {len(validation_results['results'])}, "
        f"Failed Expectations: {len(failed_expectations)}"
    )

    # Collects the errors in a list and saves them in a json file.
    # Afterwards raises an exception with the errors.
    if failed_expectations:
        collect_errors = []
        for idx, failed_expectation in enumerate(failed_expectations, start=1):
            expectation_config = failed_expectation['expectation_config']
            # Table-level expectations (e.g. the column-count check) carry no
            # 'column' kwarg; use a placeholder instead of failing with a KeyError.
            column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
            collect_errors.append(
                f"  Failed Expectation {idx}:"
                f"  Expectation Type: {expectation_config['expectation_type']}"
                f"  Column: {column}"
                f"  Details: {failed_expectation['result']}")

        # Saves the collected errors in a json file (once, after all were collected).
        file_path = os.path.join(folder_path, 'Cleaned_data_errors.json')
        with open(file_path, 'w') as json_file:
            json.dump(collect_errors, json_file)

        raise Exception(
            f"Data Quality Validation Failed: {collect_errors}"
        )

    return df

In [None]:
def check_data_feature_engineering(df: pd.DataFrame, parameters: Dict[str, Any]) -> pd.DataFrame:
    """
    Check expectations for the feature engineered dataset.
    - Check if the numerical features are in the expected range.
    - Check if the object-typed (categorical) features only have the values 0 or 1.

    The validation results are always saved as a json file. If any of the
    expectations fail, the errors are saved as well and an exception is raised.

    Args:
    --
        df (pd.DataFrame): Feature engineered dataframe. Must contain a 'y_pred'
            column, which is dropped before the checks.
        parameters (Dict): Parameters from the configuration file. The optional key
            "num_quality_ranges" ({'min': ..., 'max': ...}) sets the allowed range
            of the numerical features; it defaults to -5 .. 5.

    Returns:
    --
        df (pd.DataFrame): Copy of the input dataframe without the 'y_pred' column.

    Raises:
    --
        Exception: If at least one expectation fails.
    """

    # drop() already returns a new frame, so the caller's dataframe is not modified.
    df = df.drop(columns=['y_pred'])

    folder_path = '../data/08_reporting/Expectations_reporting'
    os.makedirs(folder_path, exist_ok=True)

    num_cols = df.select_dtypes(include=['number']).columns
    cat_cols = df.select_dtypes(include=['object']).columns

    ohencoded_values = [0, 1]
    # Read the range from the configuration when available; the fallback keeps
    # the previously hard-coded bounds.
    ranges = (parameters or {}).get("num_quality_ranges", {'min': -5, 'max': 5})

    gdf = ge.from_pandas(df)
    for column in num_cols:
        gdf.expect_column_values_to_be_between(column, ranges['min'], ranges['max'])

    for column in cat_cols:
        gdf.expect_column_values_to_be_in_set(column, ohencoded_values)

    # Create the validation results and save them in a json file.
    validation_results = gdf.validate()
    file_path_validation_results = os.path.join(folder_path, "feature_engineered_data_validation_results.json")
    with open(file_path_validation_results, 'w') as json_file:
        json.dump(validation_results.to_json_dict(), json_file)

    failed_expectations = [result for result in validation_results["results"] if not result["success"]]

    logger.info(
        f"Total Expectations: {len(validation_results['results'])}, "
        f"Failed Expectations: {len(failed_expectations)}"
    )

    # Collects the errors in a list and saves them in a json file.
    # Afterwards raises an exception with the errors.
    if failed_expectations:
        collect_errors = []
        for idx, failed_expectation in enumerate(failed_expectations, start=1):
            expectation_config = failed_expectation['expectation_config']
            # Tolerate expectations without a 'column' kwarg instead of raising KeyError.
            column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
            collect_errors.append(
                f"  Failed Expectation {idx}:"
                f"  Expectation Type: {expectation_config['expectation_type']}"
                f"  Column: {column}"
                f"  Details: {failed_expectation['result']}")

        # Saves the collected errors in a json file (once, after all were collected).
        file_path = os.path.join(folder_path, 'feature_engineered_data_errors.json')
        with open(file_path, 'w') as json_file:
            json.dump(collect_errors, json_file)

        raise Exception(
            f"Data Quality Validation Failed: {collect_errors}"
        )

    return df

In [109]:

import pandas as pd
import logging
from typing import Dict, Tuple, Any
import great_expectations as ge
import datetime
import json
import os

logger = logging.getLogger(__name__)    

# CODE FOR DATA FEATURE ENGINEERING EXPECTATIONS
def check_data_feature_engineering(df: pd.DataFrame, parameters: Dict[str, Any]) -> None:
    """
    Check expectations for the feature engineered dataset.
    - Check if the numerical features are in the expected range.
    - Check if the object-typed (categorical) features only have the values 0 or 1.

    The validation results are always saved as a json file. If any of the
    expectations fail, the errors are saved as well and an exception is raised.

    Args:
    --
        df (pd.DataFrame): Feature engineered dataframe. Must contain a 'y_pred'
            column, which is dropped before the checks.
        parameters (Dict): Parameters from the configuration file. The key
            "num_quality_ranges" ({'min': ..., 'max': ...}) sets the allowed range
            of the numerical features.

    Returns:
    --
        None. The function only validates and reports.

    Raises:
    --
        Exception: If at least one expectation fails.
    """

    # drop() already returns a new frame, so the caller's dataframe is not modified.
    df = df.drop(columns=['y_pred'])

    folder_path = '../data/08_reporting/Expectations_reporting'
    os.makedirs(folder_path, exist_ok=True)

    num_cols = df.select_dtypes(include=['number']).columns
    cat_cols = df.select_dtypes(include=['object']).columns

    ohencoded_values = [0, 1]
    ranges = parameters["num_quality_ranges"]

    gdf = ge.from_pandas(df)
    for column in num_cols:
        gdf.expect_column_values_to_be_between(column, ranges['min'], ranges['max'])

    for column in cat_cols:
        gdf.expect_column_values_to_be_in_set(column, ohencoded_values)

    # Create the validation results and save them in a json file.
    validation_results = gdf.validate()
    file_path_validation_results = os.path.join(folder_path, "feature_engineered_data_validation_results.json")
    with open(file_path_validation_results, 'w') as json_file:
        json.dump(validation_results.to_json_dict(), json_file)

    failed_expectations = [result for result in validation_results["results"] if not result["success"]]

    logger.info(
        f"Total Expectations: {len(validation_results['results'])}, "
        f"Failed Expectations: {len(failed_expectations)}"
    )

    # Collects the errors in a list and saves them in a json file.
    # Afterwards raises an exception with the errors.
    if failed_expectations:
        collect_errors = []
        for idx, failed_expectation in enumerate(failed_expectations, start=1):
            expectation_config = failed_expectation['expectation_config']
            # Tolerate expectations without a 'column' kwarg instead of raising KeyError.
            column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
            collect_errors.append(
                f"  Failed Expectation {idx}:"
                f"  Expectation Type: {expectation_config['expectation_type']}"
                f"  Column: {column}"
                f"  Details: {failed_expectation['result']}")

        # Saves the collected errors in a json file (once, after all were collected).
        file_path = os.path.join(folder_path, 'feature_engineered_data_errors.json')
        with open(file_path, 'w') as json_file:
            json.dump(collect_errors, json_file)

        raise Exception(
            f"Data Quality Validation Failed: {collect_errors}"
        )
   

# CODE FOR DATA CLEANING EXPECTATIONS
def check_nulls(gdf, columns):
    """Register a not-null expectation on ``gdf`` for every column in ``columns``."""
    for column_name in columns:
        gdf.expect_column_values_to_not_be_null(column_name)

def check_categorical_unique_values(gdf, dict_cat_cols):
    """Register an allowed-value-set expectation on ``gdf`` per entry of ``dict_cat_cols``.

    ``dict_cat_cols`` maps a column name to the list of values it may contain.
    """
    for name, allowed_values in dict_cat_cols.items():
        gdf.expect_column_values_to_be_in_set(name, allowed_values)

def check_dtype(gdf, columns, dtype):
    """Register a type-list expectation on ``gdf`` for every column in ``columns``.

    ``dtype == 'numeric'`` expects ``int64``/``float64`` values; any other
    ``dtype`` expects ``str`` values.
    """
    is_numeric = dtype == 'numeric'
    for name in columns:
        # A fresh list is built for every call, exactly like the literal in each branch did.
        accepted_types = ['int64', 'float64'] if is_numeric else ["str"]
        gdf.expect_column_values_to_be_in_type_list(name, accepted_types)

def check_if_column_exist(gdf, column_list):
    """Register an existence expectation on ``gdf`` for every name in ``column_list``."""
    for expected_name in column_list:
        gdf.expect_column_to_exist(expected_name)

def check_data_cleaning(df: pd.DataFrame, parameters: Dict[str, Any]) -> None:
    """
    Check expectations for the cleaned dataset.
    - Check the number of columns.
    - Check if the columns exist.
    - Check if the columns are of the correct type.
    - Check if the categorical columns have the correct unique values.
    - Check if the columns have null values.
    - Check if the maximum of numerical__YearBuilt and numerical__YrSold is
      within the correct range.

    The validation results are always saved as a json file. If any of the
    expectations fail, the errors are saved as well and an exception is raised.

    Args:
    --
        df (pd.DataFrame): Cleaned dataframe to validate.
        parameters (Dict): Parameters from the configuration file. The keys
            "num_columns", "column_list" and "categorical_unique_values" are read.

    Returns:
    --
        None. The function only validates and reports.

    Raises:
    --
        Exception: If at least one expectation fails.
    """

    # Creates a folder to save the expectations results.
    folder_path = '../data/08_reporting/Expectations_reporting'
    os.makedirs(folder_path, exist_ok=True)

    # Upper bound for the year columns: today's year plus one year of slack.
    max_year = datetime.date.today().year + 1
    num_cols = df.select_dtypes(include=['number']).columns.tolist()
    cat_cols = df.select_dtypes(include=['object']).columns.tolist()
    num_columns = parameters["num_columns"]
    column_list = parameters["column_list"]
    cat_unique_values = parameters["categorical_unique_values"]

    gdf = ge.from_pandas(df)

    gdf.expect_table_column_count_to_equal(num_columns)
    check_if_column_exist(gdf, column_list)
    check_dtype(gdf, num_cols, dtype='numeric')
    check_dtype(gdf, cat_cols, dtype='object')
    check_categorical_unique_values(gdf, cat_unique_values)

    check_nulls(gdf, gdf.columns)
    gdf.expect_column_max_to_be_between("numerical__YearBuilt", 1800, max_year)
    gdf.expect_column_max_to_be_between("numerical__YrSold", 1950, max_year)

    # Create the validation results and save them in a json file.
    validation_results = gdf.validate()
    file_path_validation_results = os.path.join(folder_path, "Cleaned_data_validation_results.json")
    with open(file_path_validation_results, 'w') as json_file:
        json.dump(validation_results.to_json_dict(), json_file)

    failed_expectations = [result for result in validation_results["results"] if not result["success"]]

    logger.info(
        f"Total Expectations: {len(validation_results['results'])}, "
        f"Failed Expectations: {len(failed_expectations)}"
    )

    # Collects the errors in a list and saves them in a json file.
    # Afterwards raises an exception with the errors.
    if failed_expectations:
        collect_errors = []
        for idx, failed_expectation in enumerate(failed_expectations, start=1):
            expectation_config = failed_expectation['expectation_config']
            # Table-level expectations (e.g. the column-count check) carry no
            # 'column' kwarg; use a placeholder instead of failing with a KeyError.
            column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
            collect_errors.append(
                f"  Failed Expectation {idx}:"
                f"  Expectation Type: {expectation_config['expectation_type']}"
                f"  Column: {column}"
                f"  Details: {failed_expectation['result']}")

        # Saves the collected errors in a json file (once, after all were collected).
        file_path = os.path.join(folder_path, 'Cleaned_data_errors.json')
        with open(file_path, 'w') as json_file:
            json.dump(collect_errors, json_file)

        raise Exception(
            f"Data Quality Validation Failed: {collect_errors}"
        )
      




In [110]:
import pandas as pd
test_df = pd.read_csv('../data/07_model_output/df_with_predict.csv')

In [111]:
# filter test_df to only contain the columns that start with num
test_df = test_df.filter(regex='^num')

In [37]:
test_df.describe().T

Unnamed: 0,count,mean,std,min,25%,50%,75%,max
numerical__numerical__MSSubClass,292.0,0.141781,0.828675,-0.6,-0.6,0.0,0.4,2.8
numerical__numerical__LotFrontage,292.0,-0.06362,1.062118,-2.597045,-0.544413,-0.009049,0.402955,5.455587
numerical__numerical__LotArea,292.0,0.054847,1.431243,-1.971673,-0.555589,-0.143821,0.402711,14.871072
numerical__numerical__OverallQual,292.0,0.005137,0.720925,-2.0,-0.5,0.0,0.5,2.0
numerical__numerical__OverallCond,292.0,0.537671,1.100743,-3.0,0.0,0.0,1.0,4.0
numerical__numerical__YearBuilt,292.0,0.009917,0.58857,-1.916667,-0.354167,0.0625,0.5625,0.770833
numerical__numerical__YearRemodAdd,292.0,-0.243691,0.534789,-1.157895,-0.684211,-0.052632,0.243421,0.421053
numerical__numerical__MasVnrArea,292.0,0.622536,1.263942,0.0,0.0,0.0,0.878012,9.638554
numerical__numerical__BsmtFinSF1,292.0,0.068797,0.616716,-0.533287,-0.533287,-0.014563,0.417129,2.501387
numerical__numerical__BsmtFinSF2,292.0,52.136986,173.346231,0.0,0.0,0.0,0.0,1474.0


In [117]:

import pandas as pd
import logging
from typing import Dict, Tuple, Any
import great_expectations as ge
import datetime
import json
import os

logger = logging.getLogger(__name__)    

# CODE FOR DATA FEATURE ENGINEERING EXPECTATIONS
def check_data_feature_engineering(df: pd.DataFrame, parameters: Dict[str, Any]) -> None:
    """
    Check expectations for the feature engineered dataset.
    - Check if the numerical features are in the expected range.
    - Check if the object-typed (categorical) features only have the values 0 or 1.

    The validation results are always saved as a json file. If any of the
    expectations fail, the errors are saved as well and an exception is raised.

    Args:
    --
        df (pd.DataFrame): Feature engineered dataframe. Must contain a 'y_pred'
            column, which is dropped before the checks.
        parameters (Dict): Parameters from the configuration file. The key
            "num_quality_ranges" ({'min': ..., 'max': ...}) sets the allowed range
            of the numerical features.

    Returns:
    --
        None. The function only validates and reports.

    Raises:
    --
        Exception: If at least one expectation fails.
    """

    # drop() already returns a new frame, so the caller's dataframe is not modified.
    df = df.drop(columns=['y_pred'])

    folder_path = '../data/08_reporting/Expectations_reporting'
    os.makedirs(folder_path, exist_ok=True)

    num_cols = df.select_dtypes(include=['number']).columns
    cat_cols = df.select_dtypes(include=['object']).columns

    ohencoded_values = [0, 1]
    ranges = parameters["num_quality_ranges"]

    gdf = ge.from_pandas(df)
    for column in num_cols:
        gdf.expect_column_values_to_be_between(column, ranges['min'], ranges['max'])

    for column in cat_cols:
        gdf.expect_column_values_to_be_in_set(column, ohencoded_values)

    # Create the validation results and save them in a json file.
    validation_results = gdf.validate()
    file_path_validation_results = os.path.join(folder_path, "feature_engineered_data_validation_results.json")
    with open(file_path_validation_results, 'w') as json_file:
        json.dump(validation_results.to_json_dict(), json_file)

    failed_expectations = [result for result in validation_results["results"] if not result["success"]]

    logger.info(
        f"Total Expectations: {len(validation_results['results'])}, "
        f"Failed Expectations: {len(failed_expectations)}"
    )

    # Collects the errors in a list and saves them in a json file.
    # Afterwards raises an exception with the errors.
    if failed_expectations:
        collect_errors = []
        for idx, failed_expectation in enumerate(failed_expectations, start=1):
            expectation_config = failed_expectation['expectation_config']
            # Tolerate expectations without a 'column' kwarg instead of raising KeyError.
            column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
            collect_errors.append(
                f"  Failed Expectation {idx}:"
                f"  Expectation Type: {expectation_config['expectation_type']}"
                f"  Column: {column}"
                f"  Details: {failed_expectation['result']}")

        # Saves the collected errors in a json file (once, after all were collected).
        file_path = os.path.join(folder_path, 'feature_engineered_data_errors.json')
        with open(file_path, 'w') as json_file:
            json.dump(collect_errors, json_file)

        raise Exception(
            f"Data Quality Validation Failed: {collect_errors}"
        )
   

# CODE FOR DATA CLEANING EXPECTATIONS
def check_nulls(gdf, columns):
    """Register a not-null expectation on ``gdf`` for every column in ``columns``."""
    for column_name in columns:
        gdf.expect_column_values_to_not_be_null(column_name)

def check_categorical_unique_values(gdf, dict_cat_cols):
    """Register an allowed-value-set expectation on ``gdf`` per entry of ``dict_cat_cols``.

    ``dict_cat_cols`` maps a column name to the list of values it may contain.
    """
    for name, allowed_values in dict_cat_cols.items():
        gdf.expect_column_values_to_be_in_set(name, allowed_values)

def check_dtype(gdf, columns, dtype):
    """Register a type-list expectation on ``gdf`` for every column in ``columns``.

    ``dtype == 'numeric'`` expects ``int64``/``float64`` values; any other
    ``dtype`` expects ``str`` values.
    """
    is_numeric = dtype == 'numeric'
    for name in columns:
        # A fresh list is built for every call, exactly like the literal in each branch did.
        accepted_types = ['int64', 'float64'] if is_numeric else ["str"]
        gdf.expect_column_values_to_be_in_type_list(name, accepted_types)

def check_if_column_exist(gdf, column_list):
    """Register an existence expectation on ``gdf`` for every name in ``column_list``."""
    for expected_name in column_list:
        gdf.expect_column_to_exist(expected_name)

def check_data_cleaning(df: pd.DataFrame, parameters: Dict[str, Any]) -> None:
    """
    Check expectations for the cleaned dataset.
    - Check the number of columns.
    - Check if the columns exist.
    - Check if the columns are of the correct type.
    - Check if the categorical columns have the correct unique values.
    - Check if the columns have null values.
    - Check if the maximum of numerical__YearBuilt and numerical__YrSold is
      within the correct range.

    The validation results are always saved as a json file. If any of the
    expectations fail, the errors are saved as well and an exception is raised.

    Args:
    --
        df (pd.DataFrame): Cleaned dataframe to validate.
        parameters (Dict): Parameters from the configuration file. The keys
            "num_columns", "column_list" and "categorical_unique_values" are read.

    Returns:
    --
        None. The function only validates and reports.

    Raises:
    --
        Exception: If at least one expectation fails.
    """

    # Creates a folder to save the expectations results.
    folder_path = '../data/08_reporting/Expectations_reporting'
    os.makedirs(folder_path, exist_ok=True)

    # Upper bound for the year columns: today's year plus one year of slack.
    max_year = datetime.date.today().year + 1
    num_cols = df.select_dtypes(include=['number']).columns.tolist()
    cat_cols = df.select_dtypes(include=['object']).columns.tolist()
    num_columns = parameters["num_columns"]
    column_list = parameters["column_list"]
    cat_unique_values = parameters["categorical_unique_values"]

    gdf = ge.from_pandas(df)

    gdf.expect_table_column_count_to_equal(num_columns)
    check_if_column_exist(gdf, column_list)
    check_dtype(gdf, num_cols, dtype='numeric')
    check_dtype(gdf, cat_cols, dtype='object')
    check_categorical_unique_values(gdf, cat_unique_values)

    check_nulls(gdf, gdf.columns)
    gdf.expect_column_max_to_be_between("numerical__YearBuilt", 1800, max_year)
    gdf.expect_column_max_to_be_between("numerical__YrSold", 1950, max_year)

    # Create the validation results and save them in a json file.
    validation_results = gdf.validate()
    file_path_validation_results = os.path.join(folder_path, "Cleaned_data_validation_results.json")
    with open(file_path_validation_results, 'w') as json_file:
        json.dump(validation_results.to_json_dict(), json_file)

    failed_expectations = [result for result in validation_results["results"] if not result["success"]]

    logger.info(
        f"Total Expectations: {len(validation_results['results'])}, "
        f"Failed Expectations: {len(failed_expectations)}"
    )

    # Collects the errors in a list and saves them in a json file.
    # Afterwards raises an exception with the errors.
    if failed_expectations:
        collect_errors = []
        for idx, failed_expectation in enumerate(failed_expectations, start=1):
            expectation_config = failed_expectation['expectation_config']
            # Table-level expectations (e.g. the column-count check) carry no
            # 'column' kwarg; use a placeholder instead of failing with a KeyError.
            column = expectation_config['kwargs'].get('column', 'n/a (table-level)')
            # The debug print of each failure was dropped: the same details are
            # written to the errors file and included in the raised exception.
            collect_errors.append(
                f"  Failed Expectation {idx}:"
                f"  Expectation Type: {expectation_config['expectation_type']}"
                f"  Column: {column}"
                f"  Details: {failed_expectation['result']}")

        # Saves the collected errors in a json file (once, after all were collected).
        file_path = os.path.join(folder_path, 'Cleaned_data_errors.json')
        with open(file_path, 'w') as json_file:
            json.dump(collect_errors, json_file)

        raise Exception(
            f"Data Quality Validation Failed: {collect_errors}"
        )

In [119]:
# Load the data-quality parameters from the project configuration
# (conf/base/parameters/data_quality.yml, addressed relative to this notebook).
import yaml
with open('../conf/base/parameters/data_quality.yml') as file:
    # safe_load only constructs plain Python objects, which is all a
    # parameters file needs.
    parameters = yaml.safe_load(file)

In [120]:
# Load the cleaned training features from data/02_intermediate/X_train_cleaned.csv
import pandas as pd
df = pd.read_csv('../data/02_intermediate/X_train_cleaned.csv')

In [121]:
check_data_cleaning(df, parameters)