In [1]:
import os

# Move the kernel's working directory one level up so that later cells can
# resolve paths from os.getcwd().
# NOTE(review): the path is relative, so re-running this cell moves up one more
# directory each time; run it exactly once per kernel session.
os.chdir("..")
# Echo the resulting working directory as the cell output.
os.getcwd()

'c:\\Users\\princ\\OneDrive\\Desktop\\project-python\\creditcard'

In [2]:
import os
from datetime import datetime


def get_current_time_stamp():
    """Return the current local time as a ``YYYYMMDDHHMMSS`` string."""
    now = datetime.now()
    return now.strftime("%Y%m%d%H%M%S")


# Timestamp captured once, when this cell/module is executed.
CURRENT_TIME_STAMP = get_current_time_stamp()
# Project root: whatever the working directory is when this cell/module runs.
ROOT_DIR = os.getcwd()

# Location of the YAML configuration, always resolved relative to ROOT_DIR.
CONFIG_FILE_NAME = "config.yaml"
CONFIG_DIR = os.path.join(ROOT_DIR, "configs")
CONFIG_FILE_PATH = os.path.join(CONFIG_DIR, CONFIG_FILE_NAME)

In [3]:
from pathlib import Path

from pydantic import BaseModel, DirectoryPath, FilePath


class DataIngestionConfig(BaseModel):
    """Pydantic model holding the settings of the data-ingestion stage.

    The path fields are plain ``Path`` values, so pydantic does not require
    them to exist when the model is built.
    """

    # Read from `dataset_download_id` in the data-ingestion config section;
    # presumably identifies the remote dataset to download — TODO confirm.
    dataset_download_id: str
    # Full path of the raw dataset file inside the ingestion artifact directory.
    raw_data_file_path: Path
    # Full path of the ingested training split.
    ingested_train_file_path: Path
    # Full path of the ingested test split.
    # NOTE(review): named `*_data_path` while the train field is `*_file_path`;
    # renaming would break callers, so the inconsistency is only flagged here.
    ingested_test_data_path: Path


class TrainingPipelineConfig(BaseModel):
    """Pydantic model holding pipeline-wide settings shared by all stages."""

    # Root directory for the artifacts of one experiment. `DirectoryPath` makes
    # pydantic reject a path that does not already exist as a directory.
    artifact_dir: DirectoryPath
    # Name of the pipeline as given in the configuration.
    pipeline_name: str
    # Code identifying the experiment run.
    experiment_code : str


class DataValidationConfig(BaseModel):
    """Pydantic model holding the settings of the data-validation stage."""

    # Code identifying the experiment run; used in the report file names.
    experiment_code: str
    # Artifact directory of the validation stage; must already exist
    # (`DirectoryPath` is validated by pydantic).
    data_validated_artifact_dir: DirectoryPath
    # Schema YAML file; must already exist (`FilePath` is validated by pydantic).
    schema_file_path: FilePath
    # Directory the validation reports are written into (plain `Path`, so
    # existence is not checked by pydantic).
    report_file_dir: Path
    # Name of the MongoDB collection that stores validated test data.
    data_validated_test_collection: str
    # Name of the MongoDB collection that stores validated train data.
    data_validated_train_collection: str

In [4]:
from pathlib import Path

from pydantic import BaseModel, DirectoryPath, FilePath


class DataIngestionArtifact(BaseModel):
    """Pydantic model describing the files produced by data ingestion.

    Both fields are `FilePath`, so pydantic rejects paths that do not point to
    an existing file.
    """

    # CSV file containing the training split.
    train_file_path: FilePath
    # CSV file containing the test split.
    test_file_path: FilePath


class DataValidationArtifact(BaseModel):
    """Pydantic model describing the outcome of the data-validation stage."""

    # Schema file the data was validated against; must exist (`FilePath`).
    schema_file_path : FilePath
    # Directory holding the generated reports; must exist (`DirectoryPath`).
    report_file_dir : DirectoryPath
    # Whether the validation stage considered the data valid.
    is_validated : bool

In [5]:
import os
import sys


import pandas as pd

from evidently.report import Report
from evidently.metric_preset import (
    DataDriftPreset,
    DataQualityPreset,
)
from evidently.metrics import *

from evidently.test_suite import TestSuite

from evidently.tests import *
from box import ConfigBox

# from CreditCard.entity import DataValidationConfig
# from CreditCard.entity import DataIngestionArtifact, DataValidationArtifact
from CreditCard.logging import logger
from CreditCard.Database import MongoDB
from CreditCard.exception import App_Exception
from CreditCard.utils import read_yaml


class DataValidation:
    """Validate ingested train/test data and store the new rows in MongoDB.

    The stage runs in this order (see ``initiate_data_validation``):

    1. open the train/test MongoDB collections,
    2. check the CSV columns against the schema file,
    3. compare the new training data with the stored reference data
       (Evidently test suite + drift report),
    4. insert the rows that are not stored yet.

    Several methods communicate through attributes (``train_df``, ``test_df``,
    ``train_connection``, ``test_connection``, ``reference``), so the helper
    methods are only meaningful when called in the order above.
    """

    def __init__(
        self,
        data_validation_config: "DataValidationConfig",
        data_ingestion_artifact: "DataIngestionArtifact",
    ):
        """Store the stage configuration and the ingestion output.

        Args:
            data_validation_config: settings of the validation stage.
            data_ingestion_artifact: paths of the ingested train/test files.

        Raises:
            App_Exception: if anything fails while initialising.
        """
        try:
            self.data_validation_config = data_validation_config
            self.data_ingestion_artifact = data_ingestion_artifact
            logger.info(f"{'>>' * 30}Data Validation log started.{'<<' * 30} \n\n")
        except Exception as e:
            raise App_Exception(e, sys) from e

    def get_train_and_test_df(self):
        """Load the ingested CSV files and normalise the target column name.

        Returns:
            tuple: ``(train_df, test_df)`` with the column
            ``default.payment.next.month`` renamed to ``default``.

        Raises:
            App_Exception: if a file cannot be read.
        """
        try:
            renames = {"default.payment.next.month": "default"}
            train_df = pd.read_csv(
                self.data_ingestion_artifact.train_file_path
            ).rename(columns=renames)
            test_df = pd.read_csv(
                self.data_ingestion_artifact.test_file_path
            ).rename(columns=renames)
            return train_df, test_df
        except Exception as e:
            raise App_Exception(e, sys) from e

    def validate_dataset_schema(
        self,
    ) -> bool:
        """Check that every schema column exists and can be cast to its dtype.

        Loads the train/test frames into ``self.train_df`` / ``self.test_df``
        as a side effect; later steps rely on these attributes.

        Returns:
            bool: ``True`` when all schema columns are present in both splits
            and castable to the declared dtype, ``False`` otherwise (each
            mismatch is logged).

        Raises:
            App_Exception: on unexpected failures (unreadable schema or data).
        """
        try:
            logger.info("Validating dataset schema")
            # Same keyword as the other read_yaml call site in this project.
            schema_config = read_yaml(
                path_to_yaml=self.data_validation_config.schema_file_path
            )
            schema_dict = schema_config.columns
            self.train_df, self.test_df = self.get_train_and_test_df()

            validation_status = True
            splits = (("train", self.train_df), ("test", self.test_df))
            for split_name, frame in splits:
                for column, data_type in schema_dict.items():
                    if column not in frame.columns:
                        logger.error(
                            f"Column {column} is missing in {split_name} data"
                        )
                        validation_status = False
                        continue
                    try:
                        # The cast result is discarded on purpose: only the
                        # ability to cast is being validated.
                        frame[column].astype(data_type)
                    except (ValueError, TypeError) as cast_error:
                        logger.error(
                            f"Column {column} of {split_name} data cannot be "
                            f"cast to {data_type}: {cast_error}"
                        )
                        validation_status = False

            logger.info("Dataset schema validation completed")
            logger.info(f"Validation_status {validation_status}")
            return validation_status
        except Exception as e:
            raise App_Exception(e, sys) from e

    def get_current_last_data(self):
        """Return ``(reference, current)`` frames for the drift comparison.

        ``current`` is the freshly loaded training frame; ``reference`` is
        whatever the train MongoDB collection currently holds. Requires
        ``validate_dataset_schema`` and the MongoDB connections to be set up.
        """
        current = self.train_df
        reference = self.train_connection.find_many_as_df()
        return reference, current

    def get_and_save_data_drift_report(self):
        """Run the Evidently checks, save both reports and report the verdict.

        Writes ``test_report_<experiment_code>.json`` (test suite) and
        ``test_report_<experiment_code>.html`` (drift/quality report) into the
        configured report directory.

        Returns:
            bool: ``True`` when the new data must be rejected, i.e. dataset
            drift was detected or at least one test of the suite failed;
            ``False`` otherwise. ``False`` is also returned when no reference
            data is stored yet, because nothing can be compared.

        Raises:
            App_Exception: if running or saving the reports fails.
        """
        try:
            tests = TestSuite(
                tests=[
                    TestNumberOfColumnsWithMissingValues(),
                    TestNumberOfRowsWithMissingValues(),
                    TestNumberOfConstantColumns(),
                    TestNumberOfDuplicatedRows(),
                    TestNumberOfDuplicatedColumns(),
                    TestColumnsType(),
                    TestNumberOfDriftedColumns(),
                ]
            )

            self.reference, current = self.get_current_last_data()

            # First run: the collection is still empty, so there is nothing to
            # compare against and the new data simply seeds the reference.
            if self.reference is None or len(self.reference) == 0:
                logger.info(
                    "No reference data available; skipping data drift checks."
                )
                return False

            report_dir = self.data_validation_config.report_file_dir
            experiment_code = self.data_validation_config.experiment_code

            tests.run(reference_data=self.reference, current_data=current)
            TEST_FILE_NAME = f"test_report_{experiment_code}.json"
            # The file is named *.json, so it has to be written as JSON.
            tests.save_json(os.path.join(report_dir, TEST_FILE_NAME))
            test_data = ConfigBox(tests.as_dict())

            report = Report(metrics=[DataDriftPreset(), DataQualityPreset()])
            report.run(reference_data=self.reference, current_data=current)
            # Built explicitly: str.replace("json", "html") would also rewrite
            # an experiment code that happens to contain "json".
            PROFILE_FILE_NAME = f"test_report_{experiment_code}.html"
            report.save_html(os.path.join(report_dir, PROFILE_FILE_NAME))
            # The drift verdict lives in the report, not in the test suite.
            report_data = ConfigBox(report.as_dict())

            dataset_drift = bool(report_data.metrics[0].result.dataset_drift)
            all_passed = bool(test_data.summary.all_passed)
            logger.info(
                f"Dataset drift detected: {dataset_drift}; "
                f"all tests passed: {all_passed}"
            )
            # NOTE(review): the data is rejected when either check fails;
            # confirm that failing quality tests should block ingestion too.
            drift_status = dataset_drift or not all_passed
            return drift_status

        except Exception as e:
            raise App_Exception(e, sys) from e

    def check_data_to_insert(
        self,
        new_data: pd.DataFrame,
        reference_data: pd.DataFrame,
        id_column: str = "ID",
    ):
        """Select the rows of ``new_data`` that are not stored yet.

        Rows are matched on ``id_column``. When ``new_data`` has no such
        column, its first column is used as the identifier instead.

        Args:
            new_data: freshly ingested rows.
            reference_data: rows already stored; may be ``None`` or empty.
            id_column: name of the identifier column (default ``"ID"``).

        Returns:
            tuple: ``(data_insert_status, data_to_insert)`` where
            ``data_insert_status`` is ``True`` when there is at least one row
            to insert and ``data_to_insert`` holds exactly those rows.
        """
        if new_data.empty:
            return False, new_data

        reference_has_ids = (
            reference_data is not None
            and not reference_data.empty
            and id_column in reference_data.columns
        )
        if not reference_has_ids:
            # Nothing stored yet: every new row has to be inserted.
            data_to_insert = new_data
        else:
            known_ids = set(reference_data[id_column].tolist())
            if id_column in new_data.columns:
                new_ids = new_data[id_column]
            else:
                new_ids = new_data.iloc[:, 0]
            # Boolean mask keeps the original index labels intact.
            data_to_insert = new_data.loc[~new_ids.isin(known_ids)]

        data_insert_status = not data_to_insert.empty
        return data_insert_status, data_to_insert

    def initiate_data_validation(self) -> "DataValidationArtifact":
        """Run the whole validation stage and return its artifact.

        Returns:
            DataValidationArtifact: ``is_validated`` is ``True`` when the
            schema check passed and no drift was found (new rows are then
            inserted into MongoDB), ``False`` when the schema check failed.

        Raises:
            App_Exception: when data drift is found or any step fails.
        """
        try:
            data_validation_config_info = self.data_validation_config
            self.test_connection = MongoDB(
                data_validation_config_info.data_validated_test_collection,
                drop_collection=False,
            )
            self.train_connection = MongoDB(
                data_validation_config_info.data_validated_train_collection,
                drop_collection=False,
            )

            validation_status = self.validate_dataset_schema()
            if validation_status:
                # Drift is only meaningful for data that matches the schema.
                if self.get_and_save_data_drift_report():
                    raise Exception("Data drift found")

                logger.info("Data validation completed")
                train_insert_status, train_to_insert = self.check_data_to_insert(
                    self.train_df, self.reference
                )
                test_reference = self.test_connection.find_many_as_df()
                test_insert_status, test_to_insert = self.check_data_to_insert(
                    self.test_df, test_reference
                )
                # Insert only when there really are new rows to store.
                if train_insert_status:
                    self.train_connection.Insert_Many(
                        train_to_insert.to_dict(orient="records")
                    )
                if test_insert_status:
                    self.test_connection.Insert_Many(
                        test_to_insert.to_dict(orient="records")
                    )
                logger.info("Data Validation successfully.")
            else:
                logger.info("Data Validation not successfully.")

            data_validation_artifact = DataValidationArtifact(
                schema_file_path=self.data_validation_config.schema_file_path,
                report_file_dir=self.data_validation_config.report_file_dir,
                is_validated=validation_status,
            )

            return data_validation_artifact

        except Exception as e:
            raise App_Exception(e, sys) from e

    def __del__(self):
        # Runs whenever the instance is garbage-collected, not necessarily
        # right after the stage has finished.
        logger.info(f"{'>>' * 30}Data Validation log completed.{'<<' * 30} \n\n")

In [15]:
import sys
import  os
import  json

# from CreditCard.entity import DataIngestionConfig ,  TrainingPipelineConfig
from CreditCard.exception import App_Exception
from CreditCard.logging import logger
from CreditCard.utils import read_yaml , create_directories
from pathlib import Path
from CreditCard.constants import CONFIG_FILE_PATH ,  CURRENT_TIME_STAMP , ROOT_DIR , CONFIG_DIR



class ConfigurationManager:
    """Build the typed stage configurations from the project YAML file.

    The YAML file is read once in ``__init__``; each ``get_*_config`` method
    derives the paths of one stage, creates the directories it needs and
    returns the matching pydantic model.
    """

    def __init__(self,
                 config_file_path: Path = CONFIG_FILE_PATH) -> None:
        """Load the YAML configuration and the training-pipeline settings.

        Args:
            config_file_path: location of the YAML configuration file.

        Raises:
            App_Exception: if the file cannot be read or the pipeline
                configuration cannot be built.
        """
        try:
            self.config_info = read_yaml(path_to_yaml=Path(config_file_path))
            # Every stage configuration is rooted in the pipeline artifact dir.
            self.pipeline_config = self.get_training_pipeline_config()
            self.time_stamp = CURRENT_TIME_STAMP

        except Exception as e:
            raise App_Exception(e, sys) from e

    def get_data_ingestion_config(self) -> DataIngestionConfig:
        """Build the data-ingestion configuration.

        Creates the raw-data directory and the ingested-data directory under
        the pipeline artifact directory.

        Raises:
            App_Exception: if a configuration key is missing or a directory
                cannot be created.
        """
        try:
            data_ingestion_info = self.config_info.data_ingestion_config
            artifact_dir = self.pipeline_config.artifact_dir

            data_ingestion_dir = os.path.join(
                artifact_dir, data_ingestion_info.ingestion_dir
            )
            raw_data_file_path = os.path.join(
                data_ingestion_dir,
                data_ingestion_info.raw_data_dir,
                data_ingestion_info.dataset_download_file_name,
            )

            ingested_dir_path = os.path.join(
                data_ingestion_dir, data_ingestion_info.ingested_dir
            )
            ingested_train_file_path = os.path.join(
                ingested_dir_path, data_ingestion_info.ingested_train_file
            )
            ingested_test_file_path = os.path.join(
                ingested_dir_path, data_ingestion_info.ingested_test_file
            )

            # Train and test files share one directory, so creating the
            # parent of the train file covers both.
            create_directories([
                os.path.dirname(raw_data_file_path),
                os.path.dirname(ingested_train_file_path),
            ])

            data_ingestion_config = DataIngestionConfig(
                dataset_download_id=data_ingestion_info.dataset_download_id,
                raw_data_file_path=raw_data_file_path,
                ingested_train_file_path=ingested_train_file_path,
                ingested_test_data_path=ingested_test_file_path,
            )

            return data_ingestion_config
        except Exception as e:
            raise App_Exception(e, sys) from e

    def get_training_pipeline_config(self) -> TrainingPipelineConfig:
        """Build the pipeline-wide configuration.

        The artifact directory is ``ROOT_DIR/<artifact_dir>/<experiment_code>``
        and is created before the model is built.

        Raises:
            App_Exception: if a configuration key is missing or the directory
                cannot be created.
        """
        try:
            training_config = self.config_info.training_pipeline_config
            training_pipeline_name = training_config.pipeline_name
            training_experiment_code = training_config.experiment_code
            training_artifacts = os.path.join(
                ROOT_DIR, training_config.artifact_dir, training_experiment_code
            )
            create_directories(path_to_directories=[training_artifacts])

            training_pipeline_config = TrainingPipelineConfig(
                artifact_dir=training_artifacts,
                experiment_code=training_experiment_code,
                pipeline_name=training_pipeline_name,
            )
            logger.info(f"Training pipeline config: {training_pipeline_config}")
            return training_pipeline_config
        except Exception as e:
            raise App_Exception(e, sys) from e

    def get_data_validation_config(self) -> DataValidationConfig:
        """Build the data-validation configuration.

        Creates the report directory under the validation artifact directory.
        The schema file is looked up in ``CONFIG_DIR``.

        Raises:
            App_Exception: if a configuration key is missing, the report
                directory cannot be created, or model validation fails.
        """
        try:
            artifact_dir = self.pipeline_config.artifact_dir
            experiment_code = self.pipeline_config.experiment_code
            data_validation_config_info = self.config_info.data_validation_config

            data_validated_artifact_dir = Path(
                os.path.join(
                    artifact_dir, data_validation_config_info.data_validation_dir
                )
            )
            schema_file_path = os.path.join(
                CONFIG_DIR, data_validation_config_info.schema_file_name
            )
            report_file_dir = os.path.join(
                data_validated_artifact_dir, data_validation_config_info.report_dir
            )

            create_directories([report_file_dir])

            data_validation_config = DataValidationConfig(
                experiment_code=experiment_code,
                data_validated_artifact_dir=data_validated_artifact_dir,
                schema_file_path=schema_file_path,
                report_file_dir=report_file_dir,
                data_validated_test_collection=data_validation_config_info.data_validated_test_collection_name,
                data_validated_train_collection=data_validation_config_info.data_validated_train_collection_name,
            )
            return data_validation_config
        except Exception as e:
            # Chain the original exception, as every other handler here does.
            raise App_Exception(e, sys) from e

In [16]:
config = ConfigurationManager()

2023-03-17 12:52:53.406 | INFO     | CreditCard.utils.common:read_yaml:34 - yaml file: c:\Users\princ\OneDrive\Desktop\project-python\creditcard\configs\config.yaml loaded successfully
2023-03-17 12:52:53.408 | INFO     | CreditCard.utils.common:create_directories:53 - created directory at: c:\Users\princ\OneDrive\Desktop\project-python\creditcard\artifact\Base_model
2023-03-17 12:52:53.410 | INFO     | __main__:get_training_pipeline_config:62 - Training pipeline config: artifact_dir=WindowsPath('c:/Users/princ/OneDrive/Desktop/project-python/creditcard/artifact/Base_model') pipeline_name='CreditCard' experiment_code='Base_model'


In [17]:
config.get_data_validation_config()

2023-03-17 12:52:53.805 | INFO     | CreditCard.utils.common:create_directories:53 - created directory at: c:\Users\princ\OneDrive\Desktop\project-python\creditcard\artifact\Base_model\stage01_data_validation\report


DataValidationConfig(experiment_code='Base_model', data_validated_artifact_dir=WindowsPath('c:/Users/princ/OneDrive/Desktop/project-python/creditcard/artifact/Base_model/stage01_data_validation'), schema_file_path=WindowsPath('c:/Users/princ/OneDrive/Desktop/project-python/creditcard/configs/schema.yaml'), report_file_dir=WindowsPath('c:/Users/princ/OneDrive/Desktop/project-python/creditcard/artifact/Base_model/stage01_data_validation/report'), data_validated_test_collection='data_validated_test', data_validated_train_collection='data_validated_train')