In [1]:
import os

In [None]:
%pwd


In [None]:
# Move up to the project root only when we are not already there, so that
# re-running this cell does not keep climbing the directory tree.
# The root is recognised by its `src` directory, which later cells import from.
if not os.path.isdir('src'):
    os.chdir('../')

In [None]:
%pwd

In [7]:
from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings consumed by the data-transformation stage."""
    # Directory handed to create_directories by ConfigurationManager and used
    # as the output folder for train.csv / test.csv.
    root_dir: Path
    # Location of the input CSV that DataTransformation reads with pandas.read_csv.
    data_path: Path

In [8]:
from src.inproject.constants import *
from src.inproject.utils.common import read_yaml, create_directories

In [18]:
class ConfigurationManager:
    """Load the project's config, params and schema files and build stage configs."""

    def __init__(self, 
                 config_pathway=CONFIG_FILE_PATH, 
                 param_pathway=PARAM_FILE_PATH, 
                 schema_pathway=SCHEMA_FILE_PATH):
        """Read the three settings files and prepare the artifacts root.

        Args:
            config_pathway: Path passed to ``read_yaml`` for the main config.
            param_pathway: Path passed to ``read_yaml`` for the parameters.
            schema_pathway: Path passed to ``read_yaml`` for the schema.
        """
        # Files are read in the same order as before: config, params, schema.
        self.config, self.param, self.schema = (
            read_yaml(pathway)
            for pathway in (config_pathway, param_pathway, schema_pathway)
        )

        # presumably creates the directory if it is missing -- see create_directories
        create_directories([self.config.artifacts_root])

    def get_data_transformation_config(self)->DataTransformationConfig:
        """Build the ``DataTransformationConfig`` from the ``data_transformation`` section.

        The section's ``root_dir`` is passed to ``create_directories`` before the
        config object is returned.
        """
        section = self.config.data_transformation

        create_directories([section.root_dir])

        return DataTransformationConfig(
            root_dir=section.root_dir,
            data_path=section.data_path
        )

    

In [33]:
from inproject.logging import logger
from sklearn.model_selection import train_test_split
import pandas as pd
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder

In [44]:
# import pandas as pd
# from sklearn.model_selection import train_test_split
# from sklearn.impute import SimpleImputer
# from sklearn.preprocessing import LabelEncoder, StandardScaler
# import os
# import logging

# logger = logging.getLogger(__name__)

class DataTransformation:
    """Split, clean, encode and scale the dataset named by a DataTransformationConfig.

    The methods are meant to be called in this order:
    ``remove_outlier`` -> ``replace_missing_values`` ->
    ``label_encoding_categorical`` -> ``scaling``.
    Every statistic (IQR fences, imputation values, label codes, scaler
    parameters) is fitted on the training split and then applied to the test
    split.
    """

    # The annotation is quoted so that this class can be defined without the
    # config class having to be in scope at definition time.
    def __init__(self, config: "DataTransformationConfig"):
        """Store the configuration (``data_path`` is read, ``root_dir`` is written to)."""
        self.config = config

    @staticmethod
    def _drop_iqr_outliers(frame, columns, whisker=1.5):
        """Return the rows of ``frame`` that lie inside the IQR fences of ``columns``.

        Columns are processed one after another, so the quartiles of a column
        are computed on the rows that survived the previous columns.  Rows
        whose value is missing are kept: a plain ``>=`` / ``<=`` comparison is
        False for NaN and would silently discard them before they can be
        imputed.

        Args:
            frame: DataFrame to filter; it is not modified.
            columns: Names of the numeric columns to check.
            whisker: Multiplier applied to the IQR to obtain the fences.

        Returns:
            A filtered view/copy of ``frame`` (same columns, fewer or equal rows).
        """
        kept = frame
        for col in columns:
            values = kept[col]
            q1 = values.quantile(0.25)
            q3 = values.quantile(0.75)
            iqr = q3 - q1
            lower_bound = q1 - whisker * iqr
            upper_bound = q3 + whisker * iqr
            # between() is inclusive on both ends, matching `>= lower & <= upper`.
            kept = kept[values.between(lower_bound, upper_bound) | values.isna()]
        return kept

    def remove_outlier(self):
        """Load the CSV, split it 80/20 and drop IQR outliers from the training split.

        Returns:
            tuple: ``(train, test, cat_col, num_col)`` where ``train`` and
            ``test`` are independent DataFrames and ``cat_col`` / ``num_col``
            are lists of column names.
        """
        data = pd.read_csv(self.config.data_path)
        data = data.drop(columns='Policy Start Date')
        train, test = train_test_split(data, test_size=0.20, random_state=42)

        cat_col = data.select_dtypes(include=['object']).columns.tolist()
        # A column literally named 'target' is excluded if present.
        num_col = [col for col in train.select_dtypes(exclude=['object']).columns if col != 'target']
        # Skips the first non-object column -- presumably an identifier; TODO confirm.
        num_col = num_col[1:]
        print(f"Numerical Columns: {num_col}")
        print(f"Categorical Columns: {cat_col}")

        # Copy so later column assignments act on standalone frames rather than
        # on slices of `data` (avoids chained-assignment warnings).
        train = self._drop_iqr_outliers(train, num_col).copy()
        test = test.copy()

        print(f"Train shape after outlier removal: {train.shape}")
        # Only the training split is filtered; the test split is reported unchanged.
        print(f"Test shape after outlier removal: {test.shape}")

        return train, test, cat_col, num_col
    
    def replace_missing_values(self, train, test, cat_col, num_col):
        """Impute missing values: mean for ``num_col``, most frequent for ``cat_col``.

        Imputers are fitted on ``train`` and applied to ``test``.  Both frames
        are modified in place and also returned.  An empty column list is
        skipped instead of being passed to the imputer.

        Returns:
            tuple: ``(train, test)``.
        """
        if len(num_col) > 0:
            num_imputer = SimpleImputer(strategy='mean')
            train[num_col] = num_imputer.fit_transform(train[num_col])
            test[num_col] = num_imputer.transform(test[num_col])

        if len(cat_col) > 0:
            cat_imputer = SimpleImputer(strategy='most_frequent')
            train[cat_col] = cat_imputer.fit_transform(train[cat_col])
            test[cat_col] = cat_imputer.transform(test[cat_col])

        print(f"Train shape after missing value imputation: {train.shape}")
        print(f"Test shape after missing value imputation: {test.shape}")

        return train, test

    def label_encoding_categorical(self, train, test, cat_col):
        """Replace each categorical column with integer codes learned from ``train``.

        Both frames are modified in place and also returned.

        Raises:
            ValueError: Raised by ``LabelEncoder.transform`` when ``test``
                contains a label that does not occur in ``train``.

        Returns:
            tuple: ``(train, test)``.
        """
        for col in cat_col:
            # A fresh encoder per column keeps each column's fit independent.
            label_encoder = LabelEncoder()
            train[col] = label_encoder.fit_transform(train[col])
            test[col] = label_encoder.transform(test[col])

        return train, test
    
    def scaling(self, train, test, num_col):
        """Standardise ``num_col`` and write both splits to ``config.root_dir``.

        The scaler is fitted on ``train`` and applied to ``test``; both frames
        are modified in place.  ``train.csv`` and ``test.csv`` are written
        without the index.  An empty ``num_col`` skips scaling but the files
        are still written.

        Returns:
            tuple: ``(train, test)`` -- the same objects that were passed in.
        """
        if len(num_col) > 0:
            scaler = StandardScaler()
            train[num_col] = scaler.fit_transform(train[num_col])
            test[num_col] = scaler.transform(test[num_col])

        # Save the scaled train and test sets
        train.to_csv(os.path.join(self.config.root_dir, 'train.csv'), index=False)
        test.to_csv(os.path.join(self.config.root_dir, 'test.csv'), index=False)

        # Logging
        logger.info("Split into training and test sets, with scaling applied.")
        logger.info(f"Train shape: {train.shape}")
        logger.info(f"Test shape: {test.shape}")

        print(f"Train shape after scaling: {train.shape}")
        print(f"Test shape after scaling: {test.shape}")

        return train, test


In [None]:
#pipeline

# Run the four transformation steps in order; each step consumes the frames
# returned by the previous one.
try:
    config = ConfigurationManager()
    data_transformation_config = config.get_data_transformation_config()
    data_transformation = DataTransformation(config=data_transformation_config)
    train, test, cat_col, num_col = data_transformation.remove_outlier()
    train, test = data_transformation.replace_missing_values(train, test, cat_col, num_col)
    train, test = data_transformation.label_encoding_categorical(train, test, cat_col)
    data_transformation.scaling(train, test, num_col)

except Exception:
    # Record the failure together with its traceback, then re-raise the very
    # same exception (bare `raise` adds no extra traceback frame).
    logger.exception("Data transformation pipeline failed")
    raise