In [1]:
import os
import pandas as pd
from dataclasses import dataclass
from pathlib import Path
from mlProject.logging.customLogger import logger

In [2]:
# Show the kernel's working directory (portable replacement for the `!pwd` shell escape).
os.getcwd()

/Users/vedantmahida/Desktop/end-to-end-ml-with-mlflow/notebooks


In [3]:
# Work from the project root so relative paths such as 'artifacts/...' resolve.
# Guarded on the directory name so re-running this cell does not climb further up.
if Path.cwd().name == 'notebooks':
    os.chdir('../')

In [4]:
# Confirm the working directory after the chdir (portable replacement for `!pwd`).
os.getcwd()

/Users/vedantmahida/Desktop/end-to-end-ml-with-mlflow


In [5]:
# Schema validation workflow
#
# Step 1: update config.yaml
# Step 2: update schema.yaml -- the column summary printed by this cell shows
#         the column names and dtypes of the ingested file
ingested_csv = 'artifacts/data_ingestion/winequality-red.csv'
df = pd.read_csv(ingested_csv)
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1599 entries, 0 to 1598
Data columns (total 12 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   fixed acidity         1599 non-null   float64
 1   volatile acidity      1599 non-null   float64
 2   citric acid           1599 non-null   float64
 3   residual sugar        1599 non-null   float64
 4   chlorides             1599 non-null   float64
 5   free sulfur dioxide   1599 non-null   float64
 6   total sulfur dioxide  1599 non-null   float64
 7   density               1599 non-null   float64
 8   pH                    1599 non-null   float64
 9   sulphates             1599 non-null   float64
 10  alcohol               1599 non-null   float64
 11  quality               1599 non-null   int64  
dtypes: float64(11), int64(1)
memory usage: 150.0 KB


In [6]:
# Column dtypes, in column order, as a NumPy array.
df.dtypes.to_numpy()

array([dtype('float64'), dtype('float64'), dtype('float64'),
       dtype('float64'), dtype('float64'), dtype('float64'),
       dtype('float64'), dtype('float64'), dtype('float64'),
       dtype('float64'), dtype('float64'), dtype('int64')], dtype=object)

In [7]:
# dtype of a single column, selected by label.
df['pH'].dtype

dtype('float64')

In [8]:
# Column dtypes, in column order, as a plain Python list.
df.dtypes.tolist()

[dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('float64'),
 dtype('int64')]

In [9]:
# 4. Update entity
@dataclass(frozen=True)
class DataValidationConfig:
    """Immutable settings consumed by the data-validation stage."""

    # Directory created for this stage's outputs.
    root_dir: Path
    # Location of the CSV dataset that gets validated.
    unzip_data_dir: Path
    # File that receives the "Validation Status: ..." verdict.
    STATUS_FILE: str
    # Expected schema, keyed by column name.
    all_schema: dict

In [10]:
# 5. Update configuration manager in src config
# Import the constants by name (instead of `import *`) so their origin is explicit.
from mlProject.constants import CONFIG_FILE_PATH, PARAMS_FILE_PATH, SCHEMA_FILE_PATH
from mlProject.utils.common import read_yaml, create_directories

# Display the config file locations the manager will use by default.
CONFIG_FILE_PATH, PARAMS_FILE_PATH, SCHEMA_FILE_PATH

(PosixPath('config/config.yaml'),
 PosixPath('params.yaml'),
 PosixPath('schema.yaml'))

In [11]:
class ConfigurationManager:
    """Loads the project's YAML files and builds per-stage config objects."""

    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH):
        """Read the config, params and schema files and prepare the artifacts root.

        Args:
            config_filepath: Path of the main config YAML.
            params_filepath: Path of the params YAML.
            schema_filepath: Path of the schema YAML.
        """
        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])

    def get_data_validation_config(self) -> DataValidationConfig:
        """Build the data-validation settings from the loaded YAML content.

        Creates the stage's root directory as a side effect.

        Returns:
            A DataValidationConfig populated from the ``data_validation``
            section of the config and the ``COLUMNS`` section of the schema.
        """
        config = self.config.data_validation
        schema = self.schema.COLUMNS

        create_directories([config.root_dir])

        return DataValidationConfig(
            # The entity declares these two fields as Path, so convert the
            # raw config values instead of passing them through unchanged.
            root_dir=Path(config.root_dir),
            unzip_data_dir=Path(config.unzip_data_dir),
            STATUS_FILE=config.STATUS_FILE,
            all_schema=schema,
        )

In [12]:
# 6. Update the components
class DataValidation:
    """Checks the dataset's columns and dtypes against the expected schema.

    Reads the CSV at ``config.unzip_data_dir``, compares it with
    ``config.all_schema`` and records the verdict in ``config.STATUS_FILE``.
    """

    def __init__(self, config: "DataValidationConfig"):
        """Store the validation settings.

        Args:
            config: Object exposing ``unzip_data_dir``, ``STATUS_FILE`` and
                ``all_schema`` (mapping of column name to expected dtype).
        """
        self.config = config

    def validate_all_columns(self) -> bool:
        """Validate every column of the dataset against the schema.

        The dataset is valid only when each column in the file is listed in
        the schema with a matching dtype AND no schema column is absent from
        the file. The verdict is written once to the status file as
        ``Validation Status: <True|False>``.

        Returns:
            True when the dataset matches the schema, False otherwise.

        Raises:
            Exception: Errors from reading the CSV or writing the status file
                propagate unchanged.
        """
        data = pd.read_csv(self.config.unzip_data_dir)
        expected_schema = self.config.all_schema
        schema_columns = expected_schema.keys()

        # Columns in the file that the schema does not list, or lists with a
        # different dtype.
        invalid_columns = [
            col for col in data.columns
            if col not in schema_columns or data[col].dtype != expected_schema[col]
        ]
        # Columns the schema requires but the file does not contain.
        missing_columns = [col for col in schema_columns if col not in data.columns]

        # Aggregate over ALL columns: a single mismatch fails the dataset, so
        # a later valid column can no longer overwrite an earlier failure.
        validation_status = not invalid_columns and not missing_columns

        # Write the verdict once, after every column has been checked.
        with open(self.config.STATUS_FILE, 'w') as f:
            f.write(f"Validation Status: {validation_status}")

        return validation_status

In [13]:
# 7. Update the pipeline
# No try/except here: re-raising the same exception added nothing, so errors
# simply propagate with their original traceback.
config = ConfigurationManager()
data_validation_config = config.get_data_validation_config()
data_validation = DataValidation(config=data_validation_config)
# Last expression of the cell, so the returned validation status is displayed.
data_validation.validate_all_columns()

Mar 20 11:01:43 common:65 INFO YAML file: config/config.yaml loaded successfully!
Mar 20 11:01:43 common:65 INFO YAML file: params.yaml loaded successfully!
Mar 20 11:01:43 common:65 INFO YAML file: schema.yaml loaded successfully!
Mar 20 11:01:43 common:29 INFO Directory created at: artifacts
Mar 20 11:01:43 common:29 INFO Directory created at: artifacts/data_validation
