-logging

In [15]:
import logging
import os

from from_root import from_root
from datetime import datetime

# Directory (relative to the current working directory) that receives the log files.
log_dir = 'logs'
os.makedirs(log_dir, exist_ok=True)

# One log file per run, named after the start time: month_day_year_hour_minute_second.
LOG_FILE = datetime.now().strftime('%m_%d_%Y_%H_%M_%S') + ".log"
logs_path = os.path.join(log_dir, LOG_FILE)

# Configure the root logger to write DEBUG-and-above records to this run's file.
logging.basicConfig(
    level=logging.DEBUG,
    format="[ %(asctime)s ] %(name)s - %(levelname)s - %(message)s",
    filename=logs_path,
)

# Exception

In [16]:
import os
import sys

def error_message_detail(error, error_detail:sys):
    """
    Build a message describing where the exception being handled was caught.

    :param error: the exception (or any object) whose ``str()`` becomes the error text
    :param error_detail: the ``sys`` module, or any object exposing ``exc_info()``
    :return: string containing the script name, line number and error text

    The location is read from the first entry of the active traceback, i.e. the
    frame in which the exception is being handled. When no exception is being
    handled, the traceback attached to ``error`` (if any) is used instead; when
    there is none either, the location is reported as ``<unknown>`` instead of
    failing with an AttributeError on a ``None`` traceback.
    """
    _, _, exc_tb = error_detail.exc_info()
    if exc_tb is None:
        # Called outside an ``except`` block: fall back to the exception's own traceback.
        exc_tb = getattr(error, "__traceback__", None)

    if exc_tb is None:
        file_name, line_number = "<unknown>", "<unknown>"
    else:
        file_name = exc_tb.tb_frame.f_code.co_filename
        line_number = exc_tb.tb_lineno

    error_message = "Error occurred python script name [{0}] line number [{1}] error message [{2}]".format(
        file_name, line_number, str(error)
    )

    return error_message

class Credit_card_Exception(Exception):
    """Project exception whose string form is the message built by ``error_message_detail``."""

    def __init__(self, error_message, error_detail):
        """
        :param error_message: the original error (exception or string); also passed to ``Exception``
        :param error_detail: forwarded to ``error_message_detail`` as ``error_detail``
            (callers in this file pass the ``sys`` module)
        """
        super().__init__(error_message)
        detailed_message = error_message_detail(error_message, error_detail=error_detail)
        self.error_message = detailed_message

    def __str__(self):
        # Show the detailed message instead of the default Exception text.
        return self.error_message

# Utility

In [17]:
import os
import sys

import numpy as np
import dill
import yaml
from pandas import DataFrame

from Demo_project.exception import Credit_card_Exception
from Demo_project.logger import logging


def read_yaml_file(file_path: str) -> dict:
    """
    Parse the YAML file at ``file_path`` with ``yaml.safe_load`` and return the result.

    file_path: str location of the YAML file
    Any failure is re-raised as Credit_card_Exception, chained to the original error.
    """
    try:
        with open(file_path, "rb") as stream:
            parsed = yaml.safe_load(stream)
        return parsed
    except Exception as err:
        raise Credit_card_Exception(err, sys) from err


def write_yaml_file(file_path: str, content: object, replace: bool = False) -> None:
    """
    Dump ``content`` as YAML to ``file_path``, creating the parent directory if needed.

    file_path: str location of the file to write
    content: object handed to ``yaml.dump``
    replace: when True, an existing file at ``file_path`` is removed before writing
    Any failure is re-raised as Credit_card_Exception, chained to the original error.
    """
    try:
        if replace and os.path.exists(file_path):
            os.remove(file_path)

        parent_dir = os.path.dirname(file_path)
        # A bare file name has an empty directory part, and os.makedirs("") raises,
        # so only create the directory when there is one.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)

        with open(file_path, "w") as file:
            yaml.dump(content, file)
    except Exception as e:
        raise Credit_card_Exception(e, sys) from e


def load_object(file_path: str) -> object:
    """
    Deserialize and return the object stored at ``file_path`` using ``dill.load``.

    file_path: str location of the serialized object
    Any failure is re-raised as Credit_card_Exception, chained to the original error.

    NOTE(review): dill, like pickle, can run code while loading - only use on trusted files.
    """
    logging.info("Entered the load_object method of utils")
    try:
        with open(file_path, "rb") as source:
            loaded = dill.load(source)
        logging.info("Exited the load_object method of utils")
        return loaded
    except Exception as err:
        raise Credit_card_Exception(err, sys) from err

def save_numpy_array_data(file_path: str, array: np.array):
    """
    Save numpy array data to file, creating the parent directory if needed
    file_path: str location of file to save
    array: np.array data to save
    Any failure is re-raised as Credit_card_Exception, chained to the original error.
    """
    try:
        dir_path = os.path.dirname(file_path)
        # A bare file name has an empty directory part, and os.makedirs("") raises,
        # so only create the directory when there is one.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        with open(file_path, 'wb') as file_obj:
            np.save(file_obj, array)
    except Exception as e:
        raise Credit_card_Exception(e, sys) from e


def load_numpy_array_data(file_path: str) -> np.array:
    """
    Read back an array stored on disk via ``np.load``
    file_path: str location of file to load
    return: np.array data loaded
    Any failure is re-raised as Credit_card_Exception, chained to the original error.
    """
    try:
        with open(file_path, 'rb') as source:
            loaded = np.load(source)
        return loaded
    except Exception as err:
        raise Credit_card_Exception(err, sys) from err


def save_object(file_path: str, obj: object) -> None:
    """
    Serialize ``obj`` to ``file_path`` with ``dill.dump``, creating the parent directory if needed.

    file_path: str location of the file to write
    obj: object to serialize
    Any failure is re-raised as Credit_card_Exception, chained to the original error.
    """
    logging.info("Entered the save_object method of utils")

    try:
        dir_path = os.path.dirname(file_path)
        # A bare file name has an empty directory part, and os.makedirs("") raises,
        # so only create the directory when there is one.
        if dir_path:
            os.makedirs(dir_path, exist_ok=True)
        with open(file_path, "wb") as file_obj:
            dill.dump(obj, file_obj)

        logging.info("Exited the save_object method of utils")

    except Exception as e:
        raise Credit_card_Exception(e, sys) from e


def drop_columns(df: DataFrame, cols: list)-> DataFrame:
    """
    Return a copy of a pandas DataFrame without the given columns
    df: pandas DataFrame (left unchanged)
    cols: list of columns to be dropped
    Any failure is re-raised as Credit_card_Exception, chained to the original error.
    """
    logging.info("Entered drop_columns methon of utils")
    try:
        trimmed = df.drop(columns=cols, axis=1)
    except Exception as err:
        raise Credit_card_Exception(err, sys) from err
    else:
        logging.info("Exited the drop_columns method of utils")
        return trimmed

# Data_Ingestion 

1.constant_file

In [None]:
# Code for the constants file
import os
from datetime import date

from dotenv import load_dotenv
load_dotenv()


# --- MongoDB ---
DATABASE_NAME = "demo_project_DB"
COLLECTION_NAME = "fraud_data"
# Name of the environment variable that holds the MongoDB URL (not the URL itself).
MONGODB_URL_KEY = "MONGODB_URL"

# --- Pipeline and artifacts ---
PIPELINE_NAME: str = "Demo_project"  # the pipeline's name, not a source directory
ARTIFICAT_DIR: str = "artificat"
MODEL_FILE_NAME = "model.pkl"
PREPROCESSING_OBJECT_FILE_NAME = "preprocessing.pkl"

# --- Dataset ---
TARGET_COLUMN = "default payment next month"
FILE_NAME: str = "default of credit card data.xls"
TRAIN_FILE_NAME: str = "train.xls"
TEST_FILE_NAME: str = "test.xls"

# --- Data ingestion: every constant of this stage starts with DATA_INGESTION ---
DATA_INGESTION_COLLECTION_NAME: str = "fraud_data"
DATA_INGESTION_DIR_NAME: str = "data_ingestion"
DATA_INGESTION_FEATURE_STORE_DIR: str = "feature_store"
DATA_INGESTION_INGESTED_DIR: str = "ingested"
DATA_INGESTION_TRAIN_TEST_SPLIT_RATIO: float = 0.2

# Creating MongoDBClient

In [19]:
# Code for configuration/mongo_db_connection.py

import sys
import os
import pymongo
import certifi
from Demo_project.exception import Credit_card_Exception
from Demo_project.logger import logging
from Demo_project.constants import DATABASE_NAME, MONGODB_URL_KEY

# Location reported by certifi.where(); passed as tlsCAFile when the MongoClient is created below.
ca = certifi.where()

class MongoDBClient:
    """
    Class Name  :   MongoDBClient
    Description :   Holds a pymongo.MongoClient that is created once and shared by every
                    instance, plus a handle to one database on that client.

    Output      :   connection to mongodb database
    On Failure  :   raises an exception
    """
    # Class-level so the MongoClient is only created by the first instance.
    client = None

    def __init__(self, database_name=DATABASE_NAME) -> None:
        """
        :param database_name: name of the database to expose as ``self.database``
        :raises Credit_card_Exception: when the environment variable named by
            MONGODB_URL_KEY is missing or empty, or when creating the client or
            database handle fails
        """
        try:
            if MongoDBClient.client is None:
                mongo_db_url = os.getenv(MONGODB_URL_KEY)
                # Treat an empty value like a missing one rather than handing "" to MongoClient.
                if not mongo_db_url:
                    raise Exception(f"Environment key: {MONGODB_URL_KEY} not set.")
                MongoDBClient.client = pymongo.MongoClient(mongo_db_url, tlsCAFile=ca)
            self.client = MongoDBClient.client
            self.database = self.client[database_name]
            self.database_name = database_name
            # NOTE(review): pymongo is understood to connect lazily, so this message may be
            # logged before the server has actually been reached - confirm.
            logging.info("Connected to MongoDB database successfull")
        except Exception as e:
            # Chain the cause, as the utility functions in this project do.
            raise Credit_card_Exception(e, sys) from e

In [20]:
from Demo_project.configuration.mongo_db_connection import MongoDBClient

def main():
    """
    Smoke-test the MongoDB connection: connect, then print every document of one collection.

    Errors are caught and printed instead of being propagated.
    """
    try:
        # Initialize MongoDB client
        mongo_client = MongoDBClient()  # Default uses DATABASE_NAME

        # Print connection details for confirmation
        # (the attributes set by MongoDBClient are `database_name` and `database`)
        print(f"Connected to MongoDB database: {mongo_client.database_name}")
        print(f"Client: {mongo_client.client}")

        # Accessing a specific collection (example)
        collection_name = "fraud_data"  # Replace with your collection name
        collection = mongo_client.database[collection_name]
        print(f"Connected to collection: {collection_name}")

        # Fetching and displaying documents from the collection
        documents = collection.find()
        print("Documents in the collection:")
        for doc in documents:
            print(doc)

    except Exception as e:
        print(f"Error occurred: {str(e)}")

# Run the connection check only when this code is executed as the main module.
if __name__ == "__main__":
    main()

Error occurred: Error occurred python script name [c:\demo\demo_project\Demo_project\configuration\mongo_db_connection.py] line number[28] error message [Environment key: MONGODB_URL not set.]


# Data_Transformation

In [None]:
import sys
import numpy as np
import pandas as pd
from imblearn.combine import SMOTEENN
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler, OneHotEncoder, OrdinalEncoder, PowerTransformer
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from statsmodels.stats.outliers_influence import variance_inflation_factor
from us_visa.constants import TARGET_COLUMN, SCHEMA_FILE_PATH, CURRENT_YEAR
from us_visa.entity.config_entity import DataTransformationConfig
from us_visa.entity.artifact_entity import DataTransformationArtifact, DataIngestionArtifact, DataValidationArtifact
from us_visa.exception import USvisaException
from us_visa.logger import logging
from us_visa.utils.main_utils import save_object, save_numpy_array_data, read_yaml_file, drop_columns
from us_visa.entity.estimator import TargetValueMapping

class DataTransformation:
    """
    Data transformation stage: cleans the ingested train/test files, fits the
    preprocessing ColumnTransformer on the training data and saves the transformed
    arrays together with the fitted preprocessor.
    """

    def __init__(self, data_ingestion_artifact: DataIngestionArtifact,
                 data_transformation_config: DataTransformationConfig,
                 data_validation_artifact: DataValidationArtifact):
        """
        :param data_ingestion_artifact: Output reference of data ingestion artifact stage
        :param data_transformation_config: Configuration for data transformation
        :param data_validation_artifact: Output reference of data validation artifact stage
        """
        try:
            self.data_ingestion_artifact = data_ingestion_artifact
            self.data_transformation_config = data_transformation_config
            self.data_validation_artifact = data_validation_artifact
            self._schema_config = read_yaml_file(file_path=SCHEMA_FILE_PATH)
        except Exception as e:
            raise USvisaException(e, sys) from e

    @staticmethod
    def read_data(file_path) -> pd.DataFrame:
        """Read the CSV file at ``file_path`` into a DataFrame."""
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise USvisaException(e, sys) from e

    @staticmethod
    def replace_to_zero(df, columns):
        """
        Replace invalid values (-2, -1, 0) in specified columns with 0.

        Note: ``df`` is modified in place and also returned.
        """
        try:
            for col in columns:
                fil = (df[col] == -2) | (df[col] == -1) | (df[col] == 0)
                df.loc[fil, col] = 0
            return df
        except Exception as e:
            raise USvisaException(e, sys) from e

    @staticmethod
    def remove_outliers(df, columns):
        """
        Remove outliers using IQR method for specified columns.

        Columns are processed one after another, so the quartiles of each column are
        computed on the rows that survived the previous columns' filters.
        """
        try:
            for col in columns:
                Q1 = df[col].quantile(0.25)
                Q3 = df[col].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR
                df = df[(df[col] >= lower_bound) & (df[col] <= upper_bound)]
            return df
        except Exception as e:
            raise USvisaException(e, sys) from e

    @staticmethod
    def calculate_vif(df, numerical_columns):
        """
        Calculate Variance Inflation Factor (VIF) for numerical columns.

        :return: DataFrame with one row per column and the columns "Feature" and "VIF"
        """
        try:
            X = df[numerical_columns]
            vif_data = pd.DataFrame()
            vif_data["Feature"] = X.columns
            vif_data["VIF"] = [variance_inflation_factor(X.values, i) for i in range(X.shape[1])]
            return vif_data
        except Exception as e:
            raise USvisaException(e, sys) from e

    def _schema_columns(self, schema_key, present_columns):
        """
        Return the schema list stored under ``schema_key``; when ``present_columns`` is
        given, keep only the entries contained in it and log the ones left out.
        """
        columns = self._schema_config[schema_key]
        if present_columns is None:
            return columns
        skipped = [col for col in columns if col not in present_columns]
        if skipped:
            logging.info(f"Schema columns of '{schema_key}' not present in the data, skipped: {skipped}")
        return [col for col in columns if col in present_columns]

    def get_data_transformer_object(self, available_columns=None):
        """
        Method to create and return a data transformer object for the data.

        :param available_columns: optional iterable of column names that exist in the
            data to be transformed. When given, every schema column list is restricted
            to these names, so columns removed earlier (e.g. by the VIF filter) do not
            make the ColumnTransformer fail. When None, the schema lists are used as is.
        :return: unfitted ColumnTransformer
        """
        logging.info("Entered get_data_transformer_object method of DataTransformation class")

        try:
            # Pipeline for numerical columns
            numeric_transformer = Pipeline(steps=[
                ("imputer", SimpleImputer(strategy="median")),  # Impute missing values for numerical columns
                ("scaler", StandardScaler())  # Scaling numerical features
            ])

            # Pipeline for categorical columns
            oh_transformer = Pipeline(steps=[
                ("imputer", SimpleImputer(strategy="most_frequent")),  # Impute missing values for categorical columns
                ("one_hot_encoder", OneHotEncoder()),  # One-hot encoding for categorical features
                ("scaler", StandardScaler(with_mean=False))  # Scaling categorical features
            ])

            # Ordinal encoding for specified columns
            ordinal_encoder = OrdinalEncoder()

            # Power Transformer for feature transformation
            transform_pipe = Pipeline(steps=[('transformer', PowerTransformer(method='yeo-johnson'))])

            # Get columns from schema configuration, limited to what the data still contains
            present_columns = None if available_columns is None else set(available_columns)
            oh_columns = self._schema_columns('oh_columns', present_columns)
            or_columns = self._schema_columns('or_columns', present_columns)
            transform_columns = self._schema_columns('transform_columns', present_columns)
            num_features = self._schema_columns('num_features', present_columns)

            # Combining all transformations using ColumnTransformer
            preprocessor = ColumnTransformer([
                ("OneHotEncoder", oh_transformer, oh_columns),
                ("Ordinal_Encoder", ordinal_encoder, or_columns),
                ("Transformer", transform_pipe, transform_columns),
                ("StandardScaler", numeric_transformer, num_features)
            ])

            logging.info("Created preprocessor object from ColumnTransformer")
            return preprocessor
        except Exception as e:
            raise USvisaException(e, sys) from e

    def initiate_data_transformation(self) -> DataTransformationArtifact:
        """
        Method to initiate the data transformation component for the pipeline.

        :return: DataTransformationArtifact with the paths of the saved preprocessor and
            of the transformed train and test arrays
        :raises USvisaException: when validation did not pass (carrying the validation
            message) or when any transformation step fails
        """
        try:
            if not self.data_validation_artifact.validation_status:
                raise Exception(self.data_validation_artifact.message)

            logging.info("Starting data transformation")

            # Read and clean data
            train_df = DataTransformation.read_data(file_path=self.data_ingestion_artifact.trained_file_path)
            test_df = DataTransformation.read_data(file_path=self.data_ingestion_artifact.test_file_path)

            # Replace invalid values in specified columns
            invalid_value_columns = self._schema_config['replace_invalid_columns']
            train_df = self.replace_to_zero(train_df, invalid_value_columns)
            test_df = self.replace_to_zero(test_df, invalid_value_columns)

            # Remove duplicates from both train and test data
            train_df = train_df.drop_duplicates()
            test_df = test_df.drop_duplicates()

            # Remove outliers from numerical columns
            numerical_columns = self._schema_config['num_features']
            train_df = self.remove_outliers(train_df, numerical_columns)
            test_df = self.remove_outliers(test_df, numerical_columns)

            # VIF Analysis for Multicollinearity Check (computed on the training data only)
            vif_data = self.calculate_vif(train_df, numerical_columns)
            vif_threshold = 5.0
            selected_features = set(vif_data[vif_data["VIF"] <= vif_threshold]["Feature"].tolist())

            # Drop only the numerical columns rejected by the VIF check. Keeping nothing but
            # the selected numerical columns would also remove the columns the encoders in
            # the preprocessor are configured for.
            high_vif_columns = [col for col in numerical_columns if col not in selected_features]
            logging.info(f"Numerical columns dropped by VIF check: {high_vif_columns}")
            train_df = train_df.drop(columns=high_vif_columns)
            test_df = test_df.drop(columns=high_vif_columns)

            # Separating features and target
            input_feature_train_df = train_df.drop(columns=[TARGET_COLUMN])
            target_feature_train_df = train_df[TARGET_COLUMN]

            input_feature_test_df = test_df.drop(columns=[TARGET_COLUMN])
            target_feature_test_df = test_df[TARGET_COLUMN]

            # Getting preprocessor object, restricted to the columns that are still present
            preprocessor = self.get_data_transformer_object(
                available_columns=input_feature_train_df.columns
            )

            # Fit on the training data only, then apply the same transformation to the test data
            input_feature_train_arr = preprocessor.fit_transform(input_feature_train_df)
            input_feature_test_arr = preprocessor.transform(input_feature_test_df)

            # Apply SMOTEENN to handle class imbalance. Only the training data is resampled:
            # the test data has to keep its original rows and class balance to stay a valid
            # evaluation set.
            smt = SMOTEENN(sampling_strategy="minority")
            input_feature_train_final, target_feature_train_final = smt.fit_resample(
                input_feature_train_arr, target_feature_train_df
            )

            # Create final train and test arrays (features first, target as last column)
            train_arr = np.c_[input_feature_train_final, np.array(target_feature_train_final)]
            test_arr = np.c_[input_feature_test_arr, np.array(target_feature_test_df)]

            # Save preprocessed data and objects
            save_object(self.data_transformation_config.transformed_object_file_path, preprocessor)
            save_numpy_array_data(self.data_transformation_config.transformed_train_file_path, array=train_arr)
            save_numpy_array_data(self.data_transformation_config.transformed_test_file_path, array=test_arr)

            # Return the transformation artifact
            return DataTransformationArtifact(
                transformed_object_file_path=self.data_transformation_config.transformed_object_file_path,
                transformed_train_file_path=self.data_transformation_config.transformed_train_file_path,
                transformed_test_file_path=self.data_transformation_config.transformed_test_file_path
            )

        except Exception as e:
            raise USvisaException(e, sys) from e


- DataValidation.py:

In [None]:
import json
import sys
import pandas as pd
from evidently.model_profile import Profile
from evidently.model_profile.sections import DataDriftProfileSection
from pandas import DataFrame

from us_visa.exception import USvisaException
from us_visa.logger import logging
from us_visa.utils.main_utils import read_yaml_file, write_yaml_file
from us_visa.entity.artifact_entity import DataIngestionArtifact, DataValidationArtifact
from us_visa.entity.config_entity import DataValidationConfig
from us_visa.constants import SCHEMA_FILE_PATH

class DataValidation:
    """
    Data validation stage: checks the ingested train/test files against the schema
    and, when those checks pass, runs a data drift report between the two files.
    """

    def __init__(self, data_ingestion_artifact: DataIngestionArtifact, data_validation_config: DataValidationConfig):
        """
        :param data_ingestion_artifact: Output reference of data ingestion artifact stage
        :param data_validation_config: configuration for data validation
        """
        try:
            self.data_ingestion_artifact = data_ingestion_artifact
            self.data_validation_config = data_validation_config
            self._schema_config = read_yaml_file(file_path=SCHEMA_FILE_PATH)
        except Exception as e:
            raise USvisaException(e, sys) from e

    def validate_number_of_columns(self, dataframe: DataFrame) -> bool:
        """
        Method Name :   validate_number_of_columns
        Description :   This method validates the number of columns against the
                        number of entries under "columns" in the schema

        Output      :   Returns bool value based on validation results
        On Failure  :   Write an exception log and then raise an exception
        """
        try:
            status = len(dataframe.columns) == len(self._schema_config["columns"])
            logging.info(f"Is required column present: [{status}]")
            return status
        except Exception as e:
            raise USvisaException(e, sys) from e

    def _missing_columns(self, dataframe_columns, schema_key: str) -> list:
        """Return the columns listed under ``schema_key`` in the schema that are not in ``dataframe_columns``."""
        return [column for column in self._schema_config[schema_key] if column not in dataframe_columns]

    def is_column_exist(self, df: DataFrame) -> bool:
        """
        Method Name :   is_column_exist
        Description :   This method validates the existence of a numerical and categorical columns

        Output      :   Returns bool value based on validation results
        On Failure  :   Write an exception log and then raise an exception
        """
        try:
            dataframe_columns = df.columns

            missing_numerical_columns = self._missing_columns(dataframe_columns, "numerical_columns")
            if missing_numerical_columns:
                logging.info(f"Missing numerical column: {missing_numerical_columns}")

            missing_categorical_columns = self._missing_columns(dataframe_columns, "categorical_columns")
            if missing_categorical_columns:
                logging.info(f"Missing categorical column: {missing_categorical_columns}")

            return not (missing_categorical_columns or missing_numerical_columns)
        except Exception as e:
            raise USvisaException(e, sys) from e

    @staticmethod
    def read_data(file_path) -> DataFrame:
        """Read the CSV file at ``file_path`` into a DataFrame."""
        try:
            return pd.read_csv(file_path)
        except Exception as e:
            raise USvisaException(e, sys) from e

    def detect_dataset_drift(self, reference_df: DataFrame, current_df: DataFrame) -> bool:
        """
        Method Name :   detect_dataset_drift
        Description :   This method validates if drift is detected and writes the
                        drift report to the configured drift report file

        Output      :   Returns bool value based on validation results
        On Failure  :   Write an exception log and then raise an exception
        """
        try:
            data_drift_profile = Profile(sections=[DataDriftProfileSection()])
            data_drift_profile.calculate(reference_df, current_df)

            report = data_drift_profile.json()
            json_report = json.loads(report)

            write_yaml_file(file_path=self.data_validation_config.drift_report_file_path, content=json_report)

            drift_metrics = json_report["data_drift"]["data"]["metrics"]
            n_features = drift_metrics["n_features"]
            n_drifted_features = drift_metrics["n_drifted_features"]

            logging.info(f"{n_drifted_features}/{n_features} drift detected.")
            drift_status = drift_metrics["dataset_drift"]
            return drift_status
        except Exception as e:
            raise USvisaException(e, sys) from e

    def initiate_data_validation(self) -> DataValidationArtifact:
        """
        Method Name :   initiate_data_validation
        Description :   This method initiates the data validation component for the pipeline

        Output      :   Returns a DataValidationArtifact holding the validation status,
                        a message and the drift report file path
        On Failure  :   Write an exception log and then raise an exception
        """
        try:
            validation_error_msg = ""
            logging.info("Starting data validation")
            train_df = DataValidation.read_data(file_path=self.data_ingestion_artifact.trained_file_path)
            test_df = DataValidation.read_data(file_path=self.data_ingestion_artifact.test_file_path)

            # Validate the number of columns
            status = self.validate_number_of_columns(dataframe=train_df)
            logging.info(f"All required columns present in training dataframe: {status}")
            if not status:
                validation_error_msg += "Columns are missing in training dataframe."
            status = self.validate_number_of_columns(dataframe=test_df)
            logging.info(f"All required columns present in testing dataframe: {status}")
            if not status:
                validation_error_msg += "Columns are missing in test dataframe."

            # Validate if all necessary columns exist
            status = self.is_column_exist(df=train_df)
            if not status:
                validation_error_msg += "Columns are missing in training dataframe."
            status = self.is_column_exist(df=test_df)
            if not status:
                validation_error_msg += "columns are missing in test dataframe."

            # Check for duplicates
            if train_df.duplicated().any():
                logging.info("Duplicate rows found in training data")
                validation_error_msg += "Duplicate rows found in training data."
            if test_df.duplicated().any():
                logging.info("Duplicate rows found in test data")
                validation_error_msg += "Duplicate rows found in test data."

            # Check for missing values (null counts per column are computed once per frame)
            train_null_counts = train_df.isnull().sum()
            if train_null_counts.any():
                logging.info(f"Missing values in training data: {train_null_counts}")
                validation_error_msg += "Missing values found in training data."
            test_null_counts = test_df.isnull().sum()
            if test_null_counts.any():
                logging.info(f"Missing values in test data: {test_null_counts}")
                validation_error_msg += "Missing values found in test data."

            # Check data types
            # NOTE(review): this indexes the schema's "columns" entry by column name, so it
            # presumably expects a mapping of column name -> dtype - confirm against the schema file.
            for column in self._schema_config["columns"]:
                if column in train_df.columns and train_df[column].dtype != self._schema_config["columns"][column]:
                    logging.info(f"Data type mismatch for column {column} in training data")
                    validation_error_msg += f"Data type mismatch for column {column} in training data."
                if column in test_df.columns and test_df[column].dtype != self._schema_config["columns"][column]:
                    logging.info(f"Data type mismatch for column {column} in test data")
                    validation_error_msg += f"Data type mismatch for column {column} in test data."

            # Rename columns according to schema (if needed); this only affects the local
            # frames used for the drift check below, nothing is written back to disk here.
            column_rename_mapping = self._schema_config.get('column_rename', {})
            if column_rename_mapping:
                train_df.rename(columns=column_rename_mapping, inplace=True)
                test_df.rename(columns=column_rename_mapping, inplace=True)
                logging.info(f"Renamed columns in train and test data: {column_rename_mapping}")

            validation_status = len(validation_error_msg) == 0

            # If validation passes, check for drift. The drift result only changes the
            # message; validation_status is left as computed above.
            if validation_status:
                drift_status = self.detect_dataset_drift(train_df, test_df)
                if drift_status:
                    logging.info("Drift detected.")
                    validation_error_msg = "Drift detected"
                else:
                    validation_error_msg = "Drift not detected"
            else:
                logging.info(f"Validation_error: {validation_error_msg}")

            # Create the DataValidationArtifact
            data_validation_artifact = DataValidationArtifact(
                validation_status=validation_status,
                message=validation_error_msg,
                drift_report_file_path=self.data_validation_config.drift_report_file_path
            )

            logging.info(f"Data validation artifact: {data_validation_artifact}")
            return data_validation_artifact
        except Exception as e:
            raise USvisaException(e, sys) from e
