# DATA VALIDATION PIPELINE

In [1]:
import os

# The notebook is run from research/, so step up one level to make the project
# root the working directory.
# NOTE: the target is relative, so re-running this cell moves up another level each time.
os.chdir("../")
%pwd

'/Users/nanakwamekankam/Desktop/NK/Portfolio Projects/End-to-end-ML-Project-with-MLFlow'

In [2]:
import pandas as pd

# Load the ingested dataset. The path is relative to the project root, which is
# the working directory at this point in the notebook, so it resolves to the same
# file on any machine instead of depending on one user's home directory.
df = pd.read_csv("artifacts/data_ingestion/winequality-red.csv")
df.head(10)

Unnamed: 0,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
0,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
1,7.8,0.88,0.0,2.6,0.098,25.0,67.0,0.9968,3.2,0.68,9.8,5
2,7.8,0.76,0.04,2.3,0.092,15.0,54.0,0.997,3.26,0.65,9.8,5
3,11.2,0.28,0.56,1.9,0.075,17.0,60.0,0.998,3.16,0.58,9.8,6
4,7.4,0.7,0.0,1.9,0.076,11.0,34.0,0.9978,3.51,0.56,9.4,5
5,7.4,0.66,0.0,1.8,0.075,13.0,40.0,0.9978,3.51,0.56,9.4,5
6,7.9,0.6,0.06,1.6,0.069,15.0,59.0,0.9964,3.3,0.46,9.4,5
7,7.3,0.65,0.0,1.2,0.065,15.0,21.0,0.9946,3.39,0.47,10.0,7
8,7.8,0.58,0.02,2.0,0.073,9.0,18.0,0.9968,3.36,0.57,9.5,7
9,7.5,0.5,0.36,6.1,0.071,17.0,102.0,0.9978,3.35,0.8,10.5,5


In [3]:
# Count the missing values in each column.
df.isnull().sum()

fixed acidity           0
volatile acidity        0
citric acid             0
residual sugar          0
chlorides               0
free sulfur dioxide     0
total sulfur dioxide    0
density                 0
pH                      0
sulphates               0
alcohol                 0
quality                 0
dtype: int64

In [4]:
# `isna` is the pandas alias of `isnull`, so this repeats the missing-value count above.
df.isna().sum()

fixed acidity           0
volatile acidity        0
citric acid             0
residual sugar          0
chlorides               0
free sulfur dioxide     0
total sulfur dioxide    0
density                 0
pH                      0
sulphates               0
alcohol                 0
quality                 0
dtype: int64

In [5]:
# (number of rows, number of columns) of the loaded dataset.
df.shape

(1599, 12)

In [6]:
# Column names present in the loaded CSV.
df.columns

Index(['fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar',
       'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density',
       'pH', 'sulphates', 'alcohol', 'quality'],
      dtype='object')

In [7]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataValidationConfig:
    """Immutable settings consumed by the data validation stage.

    Instances are built by ``ConfigurationManager.get_data_validation_config``
    from the ``data_validation`` section of the config YAML and the ``COLUMNS``
    section of the schema YAML.
    """

    # Directory for this stage's outputs; created before the config is built.
    root_dir: Path
    # Path handed to ``pd.read_csv`` by ``DataValidation``, i.e. the CSV file to validate.
    unzip_data_dir: Path
    # File the validation result is written to.
    STATUS_FILE: str
    # Mapping of column name -> expected dtype name (compared against ``dtype.name``).
    all_schema: dict

In [8]:
from mlProject.constants import *
from mlProject.utils.common import read_yaml, create_directories


class ConfigurationManager:
    """Load the project's YAML settings and build per-stage configuration objects.

    The config, params and schema files are read once at construction time and
    kept on the instance as ``config``, ``param`` and ``schema``.
    """

    def __init__(self, config_filepath=None, params_filepath=None, schema_filepath=None):
        """Read the three YAML files and create the artifacts root directory.

        Args:
            config_filepath: Path of the main config YAML. Defaults to
                ``CONFIG_FILE_PATH``.
            params_filepath: Path of the params YAML. Defaults to
                ``PARAMS_FILE_PATH``.
            schema_filepath: Path of the schema YAML. Defaults to
                ``SCHEMA_FILE_PATH``.
        """
        # Defaults are resolved here rather than in the signature so the
        # constants are looked up when the object is created, not when the
        # class is defined.
        if config_filepath is None:
            config_filepath = CONFIG_FILE_PATH
        if params_filepath is None:
            params_filepath = PARAMS_FILE_PATH
        if schema_filepath is None:
            schema_filepath = SCHEMA_FILE_PATH

        self.config = read_yaml(config_filepath)
        self.param = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_validation_config(self) -> "DataValidationConfig":
        """Build the settings object for the data validation stage.

        Creates the stage's ``root_dir`` as a side effect.

        Returns:
            A ``DataValidationConfig`` populated from the ``data_validation``
            section of the config file and the ``COLUMNS`` section of the
            schema file.
        """
        stage_config = self.config.data_validation
        column_schema = self.schema.COLUMNS

        create_directories([stage_config.root_dir])

        return DataValidationConfig(
            root_dir=stage_config.root_dir,
            unzip_data_dir=stage_config.unzip_data_dir,
            STATUS_FILE=stage_config.STATUS_FILE,
            all_schema=column_schema,
        )

In [13]:
import os
from mlProject import logger


class DataValidation:
    """Check the ingested CSV against the expected column schema.

    The result is returned as a boolean and also written to the configured
    status file.
    """

    def __init__(self, config):
        """Store the validation settings.

        Args:
            config: Object exposing ``unzip_data_dir`` (path of the CSV to
                read), ``STATUS_FILE`` (path the result is written to) and
                ``all_schema`` (mapping of column name to expected dtype name).
        """
        self.config = config

    def _find_schema_problems(self, df):
        """Return one human-readable message per schema violation found in ``df``."""
        schema = self.config.all_schema
        problems = []

        # Columns present in the data that the schema does not account for.
        for column in df.columns:
            if column not in schema:
                problems.append(
                    f"Unexpected column '{column}' found in dataset but not in schema."
                )

        # Columns the schema requires: each must exist and have the expected dtype.
        for column, expected_dtype in schema.items():
            if column not in df.columns:
                problems.append(f"Missing column '{column}'.")
                continue
            actual_dtype = df[column].dtype.name
            if actual_dtype != expected_dtype:
                problems.append(
                    f"Column '{column}' expected '{expected_dtype}' but found '{actual_dtype}'."
                )

        return problems

    def validate_all_columns(self):
        """Validate column names and dtypes, and record the outcome.

        The status file is written exactly once. Its first line is
        ``Validation status: True`` or ``Validation status: False``; on failure
        every problem follows on its own ``Validation failed: ...`` line, so no
        reason is lost to a later overwrite and all problems are reported
        rather than only the first.

        Returns:
            ``True`` when the dataset has exactly the schema's columns with the
            expected dtypes, ``False`` otherwise.

        Raises:
            Any exception raised while reading the CSV or writing the status
            file propagates unchanged.
        """
        df = pd.read_csv(self.config.unzip_data_dir)
        problems = self._find_schema_problems(df)
        validation_status = not problems

        report_lines = [f"Validation status: {validation_status}"]
        report_lines.extend(f"Validation failed: {problem}" for problem in problems)

        with open(self.config.STATUS_FILE, 'w') as f:
            f.write("\n".join(report_lines) + "\n")

        return validation_status
    


In [14]:
# Run the data validation stage end to end. Errors propagate on their own, so
# no try/except wrapper is needed just to re-raise them.
config = ConfigurationManager()
data_validation_config = config.get_data_validation_config()
data_validation = DataValidation(config=data_validation_config)
validation_status = data_validation.validate_all_columns()
validation_status  # show whether validation passed instead of discarding the result

[2025-02-04 15:03:57,836: INFO: common: yaml file: config/config.yaml loaded successfully]
[2025-02-04 15:03:57,841: INFO: common: yaml file: params.yaml loaded successfully]
[2025-02-04 15:03:57,844: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-02-04 15:03:57,845: INFO: common: created directory at: artifacts]
[2025-02-04 15:03:57,846: INFO: common: created directory at: artifacts/data_validation]
