### To test data quality using Greate expectation

In [1]:
import os
import pandas as pd
import great_expectations as gx
from great_expectations.dataset import PandasDataset
from datetime import datetime
import json
from great_expectations import DataContext


In [2]:
GOOD_DATA_DIR = "./raw_data/good_data"
BAD_DATA_DIR = "./raw_data/bad_data"

In [3]:
import great_expectations as gx
context = gx.get_context()
suite = context.get_expectation_suite("milling_machine_data_quality")
print(suite)

{
  "expectation_suite_name": "milling_machine_data_quality",
  "ge_cloud_id": null,
  "expectations": [
    {
      "expectation_type": "expect_table_columns_to_match_ordered_list",
      "kwargs": {
        "column_list": [
          "Product ID",
          "Type",
          "Air temperature [K]",
          "Process temperature [K]",
          "Rotational speed [rpm]",
          "Torque [Nm]",
          "Tool wear [min]"
        ]
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "Product ID"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {
        "column": "Product ID"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_match_regex",
      "kwargs": {
        "column": "Product ID",
        "regex": "^[LMH]\\d+$"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_c

In [4]:
data = pd.read_csv("../raw-data/data_split_17.csv")

df = pd.DataFrame(data)

In [5]:
df.head()

Unnamed: 0,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min]
0,M21660,M,,310.6,1310,48.8,103
1,L53981,InvalidType,301.1,310.7,1334,48.2,106
2,XYZ123,M,301.1,310.7,-5000,31.0,108
3,M21663,M,301.1,310.7,1367,43.4,111
4,L53984,InvalidType,,310.6,1588,33.7,114


In [6]:
def extract_validation_statistics(file_name, ge_df, required_columns, suite, error_threshold=0.9):
    """
    Extract statistics from the validation results and return them for database insertion.

    Parameters:
    ge_df (Great Expectations DataFrame): The dataframe with expectations.
    file_name (str): The name of the file being processed.
    required_columns (list): List of required columns to check for null values.
    suite (Great Expectations Expectation Suite): The loaded expectation suite.
    error_threshold (float): The threshold for considering errors (default is 0.9 for 90%).

    Returns:
    dict: A dictionary containing the validation statistics for the file.
    """
    # Run the validation
    validation_results = ge_df.validate(expectation_suite=suite)

    # Extract the underlying pandas DataFrame
    # ge_df = ge_df.dataframe

    # Get total rows
    total_rows = len(ge_df)

    # Initialize counters for valid and invalid rows
    valid_rows = 0
    invalid_rows = 0
    error_details = {}

    # Loop through each validation result and accumulate errors
    for result in validation_results["results"]:
        if not result["success"]:
            invalid_rows += 1
            # Extract error details
            expectation_name = result["expectation_config"]["expectation_type"]
            error_details[expectation_name] = error_details.get(expectation_name, 0) + 1

    valid_rows = total_rows - invalid_rows

    # Calculate error rate
    error_rate = (invalid_rows / total_rows) * 100 if total_rows > 0 else 0

    # Check for missing columns (non-null requirement) based on the expected threshold
    missing_columns = []
    for col in required_columns:
        column_valid = ge_df[col].notnull().mean() >= error_threshold
        if not column_valid:
            missing_columns.append(col)

    # Prepare the statistics for the database
    stats = {
        "id": None,  # Placeholder for database auto-generated ID (can be handled by DB)
        "file_name": file_name,
        "total_rows": total_rows,
        "valid_rows": valid_rows,
        "invalid_rows": invalid_rows,
        "error_rate": error_rate,
        "error_details": json.dumps(error_details),  # Convert error details to JSON string
        "processed_at": datetime.now().isoformat(),  # Store current timestamp in ISO format
        "missing_columns": missing_columns,
    }

    return stats


In [6]:
required_columns = [
    "Product ID", "Type", "Air temperature [K]", "Process temperature [K]", 
     "Rotational speed [rpm]", "Torque [Nm]", "Tool wear [min]"
]

In [8]:
def check_criticality(validation_result):
    # Initialize variables for error rate calculation
    total_expectations = len(validation_result.get("results", []))
    #print(total_expectations)
    failed_expectations = sum(1 for result in validation_result.get("results", []) if not result.get("success", True))
    # print(f"Ivalid_expectaion_count ---> {failed_expectations}")
    error_rate = (failed_expectations / total_expectations) * 100 if total_expectations > 0 else 0
    # print(error_rate)
    # Determine overall criticality based on error rate
    if error_rate >= 50:
        overall_criticality = "High"
    elif 20 <= error_rate < 50:
        overall_criticality = "Medium"
    else:
        overall_criticality = "Low"
    
    error_stat_pot = [overall_criticality, failed_expectations, error_rate]
    
    return error_stat_pot

In [9]:
directory_path = "../raw-data"

val_stat = pd.DataFrame(columns=['file_name', 'criticality_level', 'failed_expection_count', 'error_rate'])
# Loop through each file in the directory
for file_name in os.listdir(directory_path):
    if file_name.endswith(".csv"):  # Process only CSV files
        # Build the full file path
        file_path = os.path.join(directory_path, file_name)
        
        # Read the CSV file into a pandas DataFrame
        new_df = pd.read_csv(file_path)
        
        # Convert the pandas DataFrame to a Great Expectations DataFrame
        ge_df = gx.from_pandas(new_df)
        # print(ge_df.head())  # Display the first few rows as an example
        
        context = DataContext()
        
        suite = context.get_expectation_suite("milling_machine_data_quality")
        validation_result = ge_df.validate(expectation_suite=suite)
        
        erro_stat_pot = check_criticality(validation_result=validation_result)
        val_stat.loc[len(val_stat)] = [file_name, erro_stat_pot[0], erro_stat_pot[1], erro_stat_pot[2]]
        # print(f"Processing file: {file_name}")
        # print(f"with c leve ---> {c_level}")
        
        

In [10]:
val_stat

Unnamed: 0,file_name,criticality_level,failed_expection_count,error_rate
0,data_split_24.csv,Low,0,0.0
1,data_split_14.csv,Medium,8,34.782609
2,data_split_8.csv,Low,2,8.695652
3,data_split_21.csv,Medium,9,39.130435
4,data_split_23.csv,Low,2,8.695652
5,data_split_9.csv,Low,3,13.043478
6,data_split_15.csv,Low,1,4.347826
7,data_split_11.csv,Medium,7,30.434783
8,data_split_18.csv,Medium,9,39.130435
9,data_split_12.csv,Medium,8,34.782609


In [8]:
file_name = "data_split_9.csv"
new_df = pd.read_csv("../raw-data/data_split_9.csv")
ge_9_df = gx.from_pandas(new_df)
ge_9_df

Unnamed: 0,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min]
0,L50780,L,301.7,310.5,1523,34.7,181
1,L50781,L,301.7,310.5,1377,,183
2,M18462,M,301.7,310.6,1417,45.2,185
3,L50783,L,301.7,310.7,1542,40.0,188
4,L50784,L,301.8,310.8,1557,33.3,190
...,...,...,...,...,...,...,...
395,L51175,L,302.3,311.3,1621,31.6,135
396,MissingID,M,302.3,311.3,1625,33.1,137
397,L51177,L,302.2,311.1,1394,53.7,140
398,L51178,L,302.1,311.0,1586,35.1,142


### Validationg whole file

In [9]:
context = DataContext()        
suite = context.get_expectation_suite("milling_machine_data_quality")
validation_result_9 = ge_9_df.validate(expectation_suite=suite)

In [10]:
validation_result_9

{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "expectation_type": "expect_table_columns_to_match_ordered_list",
        "kwargs": {
          "column_list": [
            "Product ID",
            "Type",
            "Air temperature [K]",
            "Process temperature [K]",
            "Rotational speed [rpm]",
            "Torque [Nm]",
            "Tool wear [min]"
          ]
        },
        "meta": {}
      },
      "result": {
        "observed_value": [
          "Product ID",
          "Type",
          "Air temperature [K]",
          "Process temperature [K]",
          "Rotational speed [rpm]",
          "Torque [Nm]",
          "Tool wear [min]"
        ]
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      }
    },
    {
      "success": true,
      "expectation_config": {
        "expectation_type":

### Experiments

In [11]:
import great_expectations as gx

context = gx.get_context()

datasource = context.get_datasource("milling_machine_data")
print("Available Data Connectors:", datasource.data_connectors.keys())
print(datasource.get_available_data_asset_names())

Available Data Connectors: dict_keys(['default_inferred_data_connector_name', 'default_runtime_data_connector_name'])
{'default_inferred_data_connector_name': ['data_split_16.csv', 'data_split_8.csv', 'data_split_21.csv', 'data_split_4.csv', 'data_split_0.csv', 'data_split_7.csv', 'data_split_22.csv', 'bad_data', 'data_split_2.csv', 'data_split_11.csv', 'data_split_12.csv', 'data_split_19.csv', 'data_split_13.csv', 'good_data', 'data_split_9.csv', 'data_split_17.csv', 'data_split_1.csv', 'data_split_18.csv', 'data_split_23.csv', 'data_split_20.csv', 'data_split_3.csv', 'data_split_10.csv', 'data_split_15.csv', 'data_split_6.csv', 'data_split_5.csv', 'data_split_24.csv', 'data_split_14.csv'], 'default_runtime_data_connector_name': ['my_runtime_asset_name']}


In [12]:
from great_expectations.core.batch import BatchRequest
batch_request = BatchRequest(
    datasource_name="milling_machine_data",  # Replace with actual datasource name
    data_connector_name="default_inferred_data_connector_name",  # Replace with actual data connector name
    data_asset_name="data_split_16.csv",  # Replace with actual data asset
)

In [13]:
GREAT_EXPECTATION = '/home/kuzhalogi/WorkSpace/EquipmentFailurePred/gx'
SUITE_NAME = "milling_machine_data_quality"
suite = context.get_expectation_suite(SUITE_NAME)
# print(suite)
validator = context.get_validator(batch_request=batch_request, expectation_suite=suite)

In [14]:
# val_stat_9 = validate_data("../raw-data/data_split_11.csv")
df11 = gx.read_csv("../raw-data/data_split_17.csv")

In [15]:
sam_df = df11.sample(n=6)
sam_df.info()

<class 'great_expectations.dataset.pandas_dataset.PandasDataset'>
Index: 6 entries, 296 to 30
Data columns (total 7 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   Product ID               6 non-null      object 
 1   Type                     6 non-null      object 
 2   Air temperature [K]      5 non-null      float64
 3   Process temperature [K]  6 non-null      float64
 4   Rotational speed [rpm]   6 non-null      int64  
 5   Torque [Nm]              6 non-null      float64
 6   Tool wear [min]          6 non-null      int64  
dtypes: float64(3), int64(2), object(2)
memory usage: 384.0+ bytes


In [33]:
sam_df

Unnamed: 0,Product ID,Type,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min]
296,L54276,L,300.6,310.2,1489,47.8,21
178,L54158,L,300.7,311.1,1427,50.3,137
346,L54326,L,300.4,309.9,-5000,31.8,154
389,L54369,L,300.4,310.4,1486,1000.0,48
340,L54320,L,,309.9,1382,46.5,134
30,M21690,M,301.0,9999.0,1359,53.4,180


In [None]:
# sam_result=sam_df.validate(expectation_suite=suite)

In [17]:
import pandas as pd

def validation_result_to_df(validation_result):
    """
    Convert a Great Expectations validation result into a Pandas DataFrame.
    """
    results = []
    
    # Iterate through each expectation result
    for result in validation_result.results:
        row = {
            "expectation_type": result.expectation_config.expectation_type,
            "success": result.success,
            "column": result.expectation_config.kwargs.get("column"),
        }
        results.append(row)
    
    # Create a DataFrame
    df = pd.DataFrame(results)
    return df


In [18]:
all_validation_results = []
for i, row in df11.iterrows():
    # Convert the row into a single-row DataFrame
    row_df = row.to_frame().T
    
    # Convert the DataFrame to a Great Expectations DataFrame
    gx_df = gx.from_pandas(row_df)
    
    # Validate the row against the expectation suite
    resu = gx_df.validate(expectation_suite=suite)
    
    # Convert the validation result to a DataFrame
    validation_r = validation_result_to_df(resu)
    validation_r["row_index"] = i
    failed_validations = validation_r[validation_r["success"] == False]
    all_validation_results.append(failed_validations)
    

In [34]:
def validate_rows(df: pd.DataFrame, suite: any) -> list:
    all_validation_failed = []
    for i, row in df.iterrows():
        # Convert the row into a single-row DataFrame
        row_df = row.to_frame().T
        # Convert the DataFrame to a Great Expectations DataFrame
        gx_df = gx.from_pandas(row_df)
        # Validate the row against the expectation suite
        resu = gx_df.validate(expectation_suite=suite)
        # Convert the validation result to a DataFrame
        validation_r = validation_result_to_df(resu)
        validation_r["row_index"] = i
        failed_validations = validation_r[validation_r["success"] == False]
        all_validation_failed.append(failed_validations)
    return all_validation_failed
        

In [35]:
rows_validation_list = validate_rows(sam_df,suite)

In [36]:
IsBadData = False if len(rows_validation_list) == 0 else True
if IsBadData:
    failed_validations_df = pd.concat(rows_validation_list, ignore_index=True)
else:
    print("The file has good data skipping the tasks")

In [37]:
failed_validations_df

Unnamed: 0,expectation_type,success,column,row_index
0,expect_column_values_to_be_between,False,Rotational speed [rpm],346
1,expect_column_values_to_be_between,False,Torque [Nm],389
2,expect_column_values_to_not_be_null,False,Air temperature [K],340
3,expect_column_values_to_be_between,False,Process temperature [K],30


In [39]:
def get_validation_stats(OG_df: pd.DataFrame,failed_validations_df: pd.DataFrame):

    total_rows = len(OG_df)
    # Number of rows with at least one failed expectation
    bad_rows = failed_validations_df[failed_validations_df["success"] == False]["row_index"].nunique()
    # Number of rows with all expectations passed
    good_rows = total_rows - bad_rows
    # Failure rate (percentage of rows with at least one failed expectation)
    failure_rate = (bad_rows / total_rows) * 100
    stats_capsule = {"total_rows":total_rows,
                     "good_rows":good_rows,
                     "bad_rows":bad_rows,
                     "failure_rate":failure_rate,
                     }
    return stats_capsule


def get_failed_stats(failed_validations_df: pd.DataFrame):
    # Most common failed expectations
    most_common_failed_expectations = (
        failed_validations_df[failed_validations_df["success"] == False]["expectation_type"]
        .value_counts()
        .reset_index()
    )
    most_common_failed_expectations.columns = ["expectation_type", "failure_count"]
    # Columns with the most failures
    columns_with_most_failures = (
        failed_validations_df[failed_validations_df["success"] == False]["column"]
        .value_counts()
        .reset_index()
    )
    columns_with_most_failures.columns = ["column", "failure_count"]
    
    return most_common_failed_expectations, columns_with_most_failures


In [41]:
get_failed_stats(failed_validations_df)

(                      expectation_type  failure_count
 0   expect_column_values_to_be_between              3
 1  expect_column_values_to_not_be_null              1,
                     column  failure_count
 0   Rotational speed [rpm]              1
 1              Torque [Nm]              1
 2      Air temperature [K]              1
 3  Process temperature [K]              1)

In [40]:
get_validation_stats(sam_df,failed_validations_df)

{'total_rows': 6,
 'good_rows': 2,
 'bad_rows': 4,
 'failure_rate': 66.66666666666666}

TODO:
for the send_alert task, use a failure rate to decide the criticality level if based on that send alerts.

- 75 to 100
- 75 - 80 low
- 80 - 89 medium
- 90 - 00 high

that the level of criticallity you have to send alert with. the expectation type and most failure columns


you have to create separate tables to store those fialure expectations types and the column with their count.

In [None]:
# Print statistics
print("Validation Statistics for the Entire File:")
print(f"Total Rows: {total_rows}")
print(f"Good Rows: {good_rows}")
print(f"Bad Rows: {bad_rows}")
print(f"Failure Rate: {failure_rate:.2f}%")
print("\nMost Common Failed Expectations:")
print(most_common_failed_expectations)
print("\nColumns with the Most Failures:")
print(columns_with_most_failures)

In [42]:
row_with_errors = failed_validations_df["row_index"].unique()
row_with_errors=set(row_with_errors)

In [43]:
print(row_with_errors)

{346, 340, 389, 30}


In [48]:
import os
def split_file(df: pd.DataFrame, error_indices: set, file_name: str, good_data_dir: str, bad_data_dir: str):
    """
    Split the DataFrame into good and bad rows, and save them to respective directories.
    """
    # Convert set to list for indexing
    error_indices_list = list(error_indices)
    
    good_rows = df.drop(error_indices_list)
    bad_rows = df.loc[error_indices_list]

    # print(good_rows)
    # Save good rows to good_data
    # good_file_path = os.path.join(good_data_dir, file_name)
    # good_rows.to_csv(good_file_path, index=False)
    # print(f"Good rows saved to {good_file_path}.")
    # print(bad_rows)
    # Save bad rows to bad_data
    # bad_file_name = f"bad_{file_name}"
    # bad_file_path = os.path.join(bad_data_dir, bad_file_name)
    # bad_rows.to_csv(bad_file_path, index=False)
    # print(f"Bad rows saved to {bad_file_path}.")
    return good_rows, bad_rows

In [52]:
gr, br =split_file(sam_df,row_with_errors,"data_split_17.csv","../raw-data/good_data/","../raw-data/bad_data")

In [54]:
print(gr)

    Product ID Type  Air temperature [K]  Process temperature [K]  \
296     L54276    L                300.6                    310.2   
178     L54158    L                300.7                    311.1   

     Rotational speed [rpm]  Torque [Nm]  Tool wear [min]  
296                    1489         47.8               21  
178                    1427         50.3              137  


In [None]:
def split_and_save_files(capsule):
    """
    Split the DataFrame into good and bad rows, and save them to respective directories.
    """
    file_name = capsule["stats"]["file_name"]
    file_path = capsule["file_path"]
    df = pd.read_csv(file_path)
    failed_validations_df = capsule["validations_failed_df"]
    row_with_errors = failed_validations_df["row_index"].unique()
    row_with_errors=set(row_with_errors)
    
    # Convert set to list for indexing
    error_indices_list = list(row_with_errors)
    
    good_rows = df.drop(error_indices_list)
    bad_rows = df.loc[error_indices_list]

    # Save good rows to good_data
    good_file_name = f"good_{file_name}"
    good_file_path = os.path.join(GOOD_DATA_FOLDER, good_file_name)
    good_rows.to_csv(good_file_path, index=False)
    logging.info(f"Good rows saved to {good_file_path}.")

    # Save bad rows to bad_data
    bad_file_name = f"bad_{file_name}"
    bad_file_path = os.path.join(BAD_DATA_FOLDER, bad_file_name)
    bad_rows.to_csv(bad_file_path, index=False)
    logging.info(f"Bad rows saved to {bad_file_path}.")

## Validation by each row

### Utils.py

In [None]:
import os
import pandas as pd
import logging

def create_directories(base_dir: str, good_data_dir: str, bad_data_dir: str):
    """
    Create 'good_data' and 'bad_data' directories if they don't exist.
    """
    os.makedirs(good_data_dir, exist_ok=True)
    os.makedirs(bad_data_dir, exist_ok=True)
    print(f"Created directories: {good_data_dir}, {bad_data_dir}")

def move_file(file_path: str, target_dir: str):
    """
    Move a file to the specified target directory.
    """
    file_name = os.path.basename(file_path)
    new_path = os.path.join(target_dir, file_name)
    os.rename(file_path, new_path)
    print(f"File moved to {new_path}.")


### actual dag

In [31]:
def split_and_save_files(capsule):
    file_path = capsule["file_path"]
    df = pd.read_csv(file_path)

    # Load the Great Expectations suite
    context = gx.data_context.DataContext(GREAT_EXPECTATION)
    suite = context.get_expectation_suite(SUITE_NAME)

    # Determine the file name and base directory
    file_name = os.path.basename(file_path)
    base_dir = os.path.dirname(file_path)
    good_data_dir = os.path.join(base_dir, "good_data")
    bad_data_dir = os.path.join(base_dir, "bad_data")

    # Create directories if they don't exist
    create_directories(base_dir, good_data_dir, bad_data_dir)

    # Get indices of rows with errors
    error_indices = get_error_indices(df, suite)

    if not error_indices:
        # No errors: Move the file to good_data
        move_file(file_path, good_data_dir)
    elif len(error_indices) == len(df):
        # All rows have errors: Move the file to bad_data
        move_file(file_path, bad_data_dir)
    else:
        # Some rows have errors: Split the file
        split_file(df, error_indices, file_name, good_data_dir, bad_data_dir)

        # Remove the original file after splitting
        os.remove(file_path)
        print(f"Original file {file_name} removed after splitting.")

In [None]:
split_and_save_files(val_stat_9)

In [37]:
from great_expectations.core.expectation_suite import ExpectationSuite
import json

with open("../gx/expectations/milling_machine_data_quality.json", "r") as f:
    suite_dict = json.load(f)

suite = ExpectationSuite(**suite_dict)

In [38]:
suite

{
  "expectation_suite_name": "milling_machine_data_quality",
  "ge_cloud_id": null,
  "expectations": [
    {
      "expectation_type": "expect_table_columns_to_match_ordered_list",
      "kwargs": {
        "column_list": [
          "Product ID",
          "Type",
          "Air temperature [K]",
          "Process temperature [K]",
          "Rotational speed [rpm]",
          "Torque [Nm]",
          "Tool wear [min]"
        ]
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_not_be_null",
      "kwargs": {
        "column": "Product ID"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_be_unique",
      "kwargs": {
        "column": "Product ID"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_column_values_to_match_regex",
      "kwargs": {
        "column": "Product ID",
        "regex": "^[LMH]\\d+$"
      },
      "meta": {}
    },
    {
      "expectation_type": "expect_c

In [18]:
only_features_df=pd.read_csv('../data/onlyfeatures.csv')

In [19]:
only_features_df.describe()

Unnamed: 0,Air temperature [K],Process temperature [K],Rotational speed [rpm],Torque [Nm],Tool wear [min]
count,10000.0,10000.0,10000.0,10000.0,10000.0
mean,300.00493,310.00556,1538.7761,39.98691,107.951
std,2.000259,1.483734,179.284096,9.968934,63.654147
min,295.3,305.7,1168.0,3.8,0.0
25%,298.3,308.8,1423.0,33.2,53.0
50%,300.1,310.1,1503.0,40.1,108.0
75%,301.5,311.1,1612.0,46.8,162.0
max,304.5,313.8,2886.0,76.6,253.0


In [20]:
only_features_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 10000 entries, 0 to 9999
Data columns (total 7 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   Product ID               10000 non-null  object 
 1   Type                     10000 non-null  object 
 2   Air temperature [K]      10000 non-null  float64
 3   Process temperature [K]  10000 non-null  float64
 4   Rotational speed [rpm]   10000 non-null  int64  
 5   Torque [Nm]              10000 non-null  float64
 6   Tool wear [min]          10000 non-null  int64  
dtypes: float64(3), int64(2), object(2)
memory usage: 547.0+ KB
