In [1]:
import pandas as pd
import numpy as np
import os

In [2]:
# Move the working directory one level up and display it.
# NOTE(review): the path is relative to the current directory, so re-running
# this cell without restarting the kernel moves up one more level each time.
os.chdir("../")
%pwd

'/home/jatin/Projects/customer_churn_prediction'

In [9]:
# Data Transformation component

from dataclasses import dataclass
from pathlib import Path

@dataclass(frozen=True)
class DataTransformationConfig:
    """
    Immutable bundle of paths and settings used by the data transformation step.

    The instance is frozen, so fields cannot be reassigned after construction.
    The ``dict``-annotated fields are read with attribute access elsewhere in
    this file (e.g. ``target_column.name``), so they are presumably dict-like
    objects that also support attribute lookup -- confirm against ``read_yaml``.
    """
    # Directory that receives train.csv / test.csv.
    root_dir: Path
    # Raw CSV read by the column-filtering step.
    local_data_file: Path
    # CSV written by the filtering step and read by the encoding step.
    filtered_data_file: Path
    # CSV written by the encoding step and read by the train/test split.
    encoded_data_file: Path
    # Destination of the pickled {column name: fitted encoder} mapping.
    encoder_file: Path
    # Feature columns: column name -> dtype string ('str'/'object' mark categoricals).
    schema: dict
    # Target column description; read through its `name` and `type` entries.
    target_column: dict
    # Parameters; read as test_size, random_state and data_transformation.smote_*.
    params: dict

In [10]:
from customer_churn_prediction.constants import *
from customer_churn_prediction.utils.common import read_yaml, create_directory

In [11]:
# Exploratory check: show the key/value pairs of the schema's TARGET_COLUMN entry.
read_yaml(SCHEMA_FILE_PATH).TARGET_COLUMN.items()

[2026-01-17 21:57:22,810]:INFO:common.py:Yaml file: schema.yaml is loaded successfully


dict_items([('name', 'Churn'), ('type', 'object')])

In [12]:
from customer_churn_prediction.constants import *
from customer_churn_prediction.utils.common import read_yaml, create_directory

class ConfigurationManager:
    """
    Load the config, schema and params YAML files and build per-component
    configuration objects from them.
    """

    def __init__(
            self,
            config_path=CONFIG_FILE_PATH,
            schema_path=SCHEMA_FILE_PATH,
            params_path=PARAMS_FILE_PATH
    ):
        """
        Read the three YAML files and create the artifacts root directory.

        Params:
            config_path: Location of the main configuration YAML.
            schema_path: Location of the schema YAML.
            params_path: Location of the parameters YAML.
        """
        self.config = read_yaml(config_path)
        self.schema = read_yaml(schema_path)
        self.params = read_yaml(params_path)

        artifacts_root = self.config.artifacts_root
        create_directory([artifacts_root])

    def get_data_transformation_config(self)-> DataTransformationConfig:
        """
        Build the data transformation config and create its root directory.

        Returns:
            DataTransformationConfig: Paths from the ``data_transformation``
            section of the config, the schema's COLUMNS and TARGET_COLUMN
            entries, and the complete params object.
        """
        section = self.config.data_transformation
        feature_columns = self.schema.COLUMNS
        target = self.schema.TARGET_COLUMN
        all_params = self.params

        # The output directory is created only after every section above has
        # been read successfully.
        create_directory([section.root_dir])

        return DataTransformationConfig(
            root_dir=section.root_dir,
            local_data_file=section.local_data_file,
            filtered_data_file=section.filtered_data_file,
            encoded_data_file=section.encoded_data_file,
            encoder_file=section.encoder_file,
            schema=feature_columns,
            target_column=target,
            params=all_params,
        )

In [13]:
import pickle

from imblearn.over_sampling import SMOTE
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder

from customer_churn_prediction import logger


class DataTransformation:
    """
    Data transformation stage: column filtering, categorical encoding,
    train/test splitting and SMOTE-based rebalancing of the training set.

    Each step reads its input from, and writes its output to, the paths held
    in the supplied configuration, so the steps communicate through files and
    must be run in that order.
    """

    # Seed handed to SMOTE when the configured value is null.
    DEFAULT_SMOTE_RANDOM_STATE = 23

    def __init__(self, config: "DataTransformationConfig"):
        """
        Params:
            config (DataTransformationConfig): Paths, schema, target column
                description and parameters used by every step.
        """
        self.config = config

    def _train_path(self):
        """Return the path of the training CSV inside the root directory."""
        return os.path.join(self.config.root_dir, "train.csv")

    def filter_dataset(self):
        """
        Read the data from the raw stored file.
        Select only the schema columns plus the target column and store the
        result in the filtered data file.
        """
        target_column = self.config.target_column.name
        relevant_columns = list(self.config.schema.keys())
        # A schema that already lists the target must not select it twice;
        # doing so would write a duplicated column to the filtered file.
        if target_column not in relevant_columns:
            relevant_columns.append(target_column)
        data = pd.read_csv(self.config.local_data_file)
        final_data = data[relevant_columns]
        final_data.to_csv(self.config.filtered_data_file, index=False)
        logger.info(f"Selected only relevant columns based on the schema and stored in {self.config.filtered_data_file}")

    def categorical_column_encoder(self, encoding):
        """
        Encode the categorical features and the target column, write the
        encoded data to the encoded data file and pickle the fitted encoders
        (a dict keyed by column name) to the encoder file.

        Columns whose schema type is 'str' or 'object' are treated as
        categorical. Only 'label_encoding' transforms the feature columns; any
        other value is logged once per categorical column and those columns
        are written unchanged. The target column is label-encoded whenever its
        declared type is 'str' or 'object'.

        Params:
            encoding (str): The type of encoding to apply to the feature columns.

        Raises:
            Exception: Any error raised while reading, encoding or writing is
                logged and re-raised unchanged.
        """
        try:
            data = pd.read_csv(self.config.filtered_data_file)
            categorical_features = [
                item for item in self.config.schema.items()
                if item[1] in ['str', 'object']
            ]
            logger.info(f"categorical_features {categorical_features}")
            encoders = {}
            for column, _ in categorical_features:
                if encoding == 'label_encoding':
                    label_encoder = LabelEncoder()
                    data[column] = label_encoder.fit_transform(data[column])
                    encoders[column] = label_encoder
                elif encoding == 'one_hot_encoding':
                    logger.info("Encoding based on One Hot Encoder is not implemented yet.")
                else:
                    logger.info("Only Label Encoding is implemented yet for encoding categorical variables")

            target_name = self.config.target_column.name
            target_is_categorical = self.config.target_column.type in ['str', 'object']
            # Skip the target when the loop above already encoded it (it was
            # listed among the schema columns): encoding it again would
            # replace the saved encoder with one fitted on integer codes.
            if target_is_categorical and target_name not in encoders:
                label_encoder = LabelEncoder()
                data[target_name] = label_encoder.fit_transform(data[target_name])
                encoders[target_name] = label_encoder
            data.to_csv(self.config.encoded_data_file, index=False)
            with open(self.config.encoder_file, 'wb') as f:
                pickle.dump(encoders, f)
        except Exception:
            logger.exception("Exception occured while encoding the categorical variables")
            raise

    def train_test_splitting(self):
        """
        Split the encoded data into training and test sets and write them to
        train.csv and test.csv in the root directory.
        The test_size is read from the params.yaml file
        The random_state is read from the params.yaml file
        """
        data = pd.read_csv(self.config.encoded_data_file)
        train, test = train_test_split(
            data,
            test_size=self.config.params.test_size,
            random_state=self.config.params.random_state
        )
        train.to_csv(self._train_path(), index=False)
        test.to_csv(os.path.join(self.config.root_dir, "test.csv"), index=False)
        logger.info("Splitted data into training and test set")
        logger.info(f"training data shape: {train.shape}")
        logger.info(f"test data shape: {test.shape}")

    def handle_inbalanced_data(self):
        """
        Oversample the training data with SMOTE and overwrite train.csv with
        the resampled rows.

        The seed comes from params.data_transformation.smote_random_state; the
        default seed is used only when that value is None, so an explicitly
        configured seed of 0 is honoured.
        """
        configured_seed = self.config.params.data_transformation.smote_random_state
        if configured_seed is None:
            smote_random_state = self.DEFAULT_SMOTE_RANDOM_STATE
        else:
            smote_random_state = configured_seed
        target_name = self.config.target_column.name
        train_data = pd.read_csv(self._train_path())
        x_train = train_data.drop(columns=[target_name])
        y_train = train_data[target_name]
        smote = SMOTE(random_state=smote_random_state)
        x_train_res, y_train_res = smote.fit_resample(x_train, y_train)
        train_resampled = pd.concat([x_train_res, y_train_res], axis=1)
        train_resampled.to_csv(self._train_path(), index=False)
        logger.info("Applied SMOTE and saved resampled training data")

    def is_inbalanced(self, y, threshould=0.7):
        """
        Check for class imbalance in the target column based on the threshold.

        The data counts as imbalanced when the share of the rarest class
        (its fraction of all rows) is below the threshold.

        NOTE(review): the rarest class share can never exceed
        1 / number_of_classes (0.5 for a binary target), so the default of 0.7
        always reports imbalance whenever there are two or more classes.

        Params:
            y (pandas series): target column series.
            threshould (float): Threshold on the rarest class share.

        Returns:
            Truth value of ``rarest class share < threshould``.
        """
        class_counts = y.value_counts(normalize=True)
        minimum_class_ratio = class_counts.min()
        return minimum_class_ratio < threshould

    def manage_inbalanced_data(self):
        """
        Check the target column of the training data for class imbalance and
        apply SMOTE when the check reports it. The threshold is read from
        params.data_transformation.smote_threshold.
        """
        train_data = pd.read_csv(self._train_path())
        y_train = train_data[self.config.target_column.name]
        if self.is_inbalanced(y_train, self.config.params.data_transformation.smote_threshold):
            self.handle_inbalanced_data()
        else:
            logger.info("Data is already balanced")

In [14]:
# Create the pipeline
# Runs the data transformation steps in order. Each step reads the file the
# previous step wrote, so the order of the calls below matters.

from customer_churn_prediction import logger
try:
    config = ConfigurationManager()
    data_transformation_config = config.get_data_transformation_config()
    data_transformation = DataTransformation(data_transformation_config)
    data_transformation.filter_dataset()
    data_transformation.categorical_column_encoder(encoding='label_encoding')
    data_transformation.train_test_splitting()
    # Applies SMOTE to train.csv only when the imbalance check reports it.
    data_transformation.manage_inbalanced_data()
except Exception:
    # Log with traceback, then re-raise so the failure stays visible in the cell.
    logger.exception(f"Exception occured while executing the data transformation pipeline")
    raise

[2026-01-17 21:57:26,077]:INFO:common.py:Yaml file: config/config.yaml is loaded successfully
[2026-01-17 21:57:26,082]:INFO:common.py:Yaml file: schema.yaml is loaded successfully
[2026-01-17 21:57:26,093]:INFO:common.py:Yaml file: params.yaml is loaded successfully
[2026-01-17 21:57:26,095]:INFO:common.py:Directory created at: artifacts
[2026-01-17 21:57:26,097]:INFO:common.py:Directory created at: artifacts/data_transformation
[2026-01-17 21:57:26,232]:INFO:3173689160.py:Selected only relevant columns based on the schema and stored in artifacts/data_transformation/customer_churn_data.csv
[2026-01-17 21:57:26,257]:INFO:3173689160.py:categorical_features [('gender', 'object'), ('Partner', 'object'), ('Dependents', 'object'), ('PhoneService', 'object'), ('MultipleLines', 'object'), ('InternetService', 'object'), ('OnlineSecurity', 'object'), ('OnlineBackup', 'object'), ('DeviceProtection', 'object'), ('TechSupport', 'object'), ('StreamingTV', 'object'), ('StreamingMovies', 'object'), (