In [2]:
import os

In [3]:
# Show the kernel's current working directory before it is changed below.
%pwd

'c:\\projects\\hr_attrition\\research'

In [4]:
# The notebook is started from the `research` folder (see the %pwd output);
# step up one level so the relative config/params/schema paths resolve.
# Guarded on the folder name so that re-running this cell does not climb
# another directory each time.
if os.path.basename(os.getcwd()) == "research":
    os.chdir("../")

In [5]:
from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class DataTransformationConfig:
    """Immutable settings for the data-transformation stage.

    Attributes:
        root_dir: Directory where the train/test CSV files are written.
        data_path: Location of the input CSV file.
        test_size: Fraction of rows held out for the test set. The default of
            0.25 matches scikit-learn's own default for ``train_test_split``.
        random_state: Seed passed to ``train_test_split`` so the split is
            reproducible.
    """

    root_dir: Path
    data_path: Path
    # Read by DataTransformation.train_test_spliting; given defaults so that
    # existing two-argument construction keeps working.
    test_size: float = 0.25
    random_state: int = 42

In [5]:
from hr_attrition.constants import *
from hr_attrition.utils.common import read_yaml, create_directories

In [6]:
class ConfigurationManager:
    """Loads the project's YAML settings and builds per-stage config objects."""

    def __init__(
        self,
        config_filepath=CONFIG_FILE_PATH,
        params_filepath=PARAMS_FILE_PATH,
        schema_filepath=SCHEMA_FILE_PATH,
    ):
        """Read the config, params and schema files and create the artifacts root.

        Args:
            config_filepath: Path of the main configuration YAML.
            params_filepath: Path of the parameters YAML.
            schema_filepath: Path of the schema YAML.
        """
        # Files are read in the order config -> params -> schema; each result is
        # stored on the instance under the matching attribute name.
        yaml_sources = (
            ("config", config_filepath),
            ("params", params_filepath),
            ("schema", schema_filepath),
        )
        for attribute, yaml_path in yaml_sources:
            setattr(self, attribute, read_yaml(yaml_path))

        artifacts_root = self.config.artifacts_root
        create_directories([artifacts_root])

    def get_data_transformation_config(self) -> DataTransformationConfig:
        """Build the settings object for the data-transformation stage.

        Creates the stage's ``root_dir`` as a side effect.

        Returns:
            A ``DataTransformationConfig`` filled from the ``data_transformation``
            section of the loaded configuration.
        """
        stage_section = self.config.data_transformation
        create_directories([stage_section.root_dir])

        return DataTransformationConfig(
            root_dir=stage_section.root_dir,
            data_path=stage_section.data_path,
        )

In [7]:
import os
from hr_attrition.utils.logging import logger
from sklearn.model_selection import train_test_split
import pandas as pd

In [1]:
import os
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PowerTransformer
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from hr_attrition import logger
from hr_attrition.entity.config_entity import DataTransformationConfig


class DataTransformation:
    """Encode the target, preprocess the features, then split and save the data."""

    # Label column and the mapping from its raw string values to 0/1.
    TARGET_COLUMN = "Attrition"
    TARGET_MAPPING = {"Yes": 1, "No": 0}

    def __init__(self, config: DataTransformationConfig):
        """Store the stage configuration.

        Args:
            config: Settings object; this class reads ``data_path``, ``root_dir``,
                ``test_size`` and ``random_state`` from it.
        """
        self.config = config

    def load_data(self):
        """Read the CSV at ``config.data_path`` and return it as a DataFrame."""
        data = pd.read_csv(self.config.data_path)
        logger.info("Data loaded successfully.")
        return data

    def preprocess_data(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return a fully numeric frame of processed features plus the binary target.

        Numeric columns (int64/float64) are mean-imputed, standardised and
        Yeo-Johnson transformed; object columns are mode-imputed and one-hot
        encoded with the first level dropped. Columns of any other dtype are in
        neither group and are therefore dropped by ``ColumnTransformer``'s
        default ``remainder="drop"``.

        The input frame is not modified.

        Args:
            df: Raw data containing an ``Attrition`` column with "Yes"/"No" values.

        Returns:
            A new DataFrame with the processed feature columns followed by
            ``Attrition`` encoded as 1 (Yes) / 0 (No), on a fresh 0..n-1 index.

        Raises:
            KeyError: If ``df`` has no ``Attrition`` column.
            ValueError: If ``Attrition`` holds anything other than "Yes"/"No"
                (including missing values).
        """
        target_col = self.TARGET_COLUMN

        # Build the encoded target as a separate Series instead of assigning it
        # back into `df`, so the caller's frame keeps its original labels.
        raw_target = df[target_col]
        target = raw_target.map(self.TARGET_MAPPING)

        # `.map` turns every unmapped label into NaN; fail loudly rather than
        # letting NaN targets flow into the split and the saved files.
        unmapped = raw_target[target.isna()]
        if not unmapped.empty:
            raise ValueError(
                f"Column '{target_col}' contains values other than 'Yes'/'No': "
                f"{unmapped.unique().tolist()}"
            )
        target = target.astype("int64")
        logger.info("Mapped target 'Attrition' to binary.")

        # Separate features from the target (drop returns a new frame).
        X = df.drop(columns=[target_col])

        # Separate numeric and categorical columns
        num_cols = X.select_dtypes(include=['int64', 'float64']).columns.tolist()
        cat_cols = X.select_dtypes(include=['object']).columns.tolist()

        logger.info(f"Numerical columns: {num_cols}")
        logger.info(f"Categorical columns: {cat_cols}")

        # Pipelines
        numeric_pipeline = Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="mean")),
            ("scaler", StandardScaler()),
            ("power_transform", PowerTransformer(method="yeo-johnson"))
        ])

        categorical_pipeline = Pipeline(steps=[
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("encoder", OneHotEncoder(drop="first", handle_unknown='ignore'))
        ])

        # sparse_threshold=0 forces a dense array. With the default threshold the
        # result can be a scipy sparse matrix (the one-hot encoder emits sparse
        # output), which cannot be wrapped in a DataFrame with column names below.
        preprocessor = ColumnTransformer(
            transformers=[
                ("num", numeric_pipeline, num_cols),
                ("cat", categorical_pipeline, cat_cols)
            ],
            sparse_threshold=0,
        )

        # NOTE(review): the preprocessor is fitted on the full dataset before the
        # train/test split, so imputation/scaling statistics include test rows.
        # Consider fitting on the training portion only.
        logger.info("Fitting and transforming the features.")
        X_processed = preprocessor.fit_transform(X)

        # Get column names. With no categorical columns the "cat" pipeline is
        # never fitted, so skip the lookup instead of querying an unfitted encoder.
        if cat_cols:
            encoder = preprocessor.named_transformers_["cat"]["encoder"]
            cat_encoded_cols = list(encoder.get_feature_names_out(cat_cols))
        else:
            cat_encoded_cols = []
        feature_names = num_cols + cat_encoded_cols

        X_processed = pd.DataFrame(X_processed, columns=feature_names)

        # Add target back; reset its index so it aligns with the new 0..n-1 index.
        df_final = pd.concat([X_processed, target.reset_index(drop=True)], axis=1)

        return df_final

    def train_test_spliting(self):
        """Load, preprocess and split the data, then save ``train.csv``/``test.csv``.

        The split is stratified on the binary ``Attrition`` column and uses
        ``config.test_size`` and ``config.random_state``. Both files are written
        to ``config.root_dir`` without the index column.
        """
        df = self.load_data()
        df_processed = self.preprocess_data(df)

        train, test = train_test_split(
            df_processed,
            test_size=self.config.test_size,
            random_state=self.config.random_state,
            stratify=df_processed[self.TARGET_COLUMN]
        )

        # Make sure the output folder exists even when this class is used
        # without the configuration manager having created it.
        os.makedirs(self.config.root_dir, exist_ok=True)
        train.to_csv(os.path.join(self.config.root_dir, "train.csv"), index=False)
        test.to_csv(os.path.join(self.config.root_dir, "test.csv"), index=False)

        logger.info("Data preprocessed and split with stratification on binary target.")
        logger.info(f"Train shape: {train.shape}, Test shape: {test.shape}")

        print("Train shape:", train.shape)
        print("Test shape:", test.shape)


In [9]:
# Run the data-transformation stage end to end. Exceptions propagate as-is:
# the former `except Exception as e: raise e` wrapper only re-raised them.
config = ConfigurationManager()
data_transformation_config = config.get_data_transformation_config()
data_transformation = DataTransformation(config=data_transformation_config)
data_transformation.train_test_spliting()

[2025-04-14 19:32:11,591: INFO: common: yaml file: config\config.yaml loaded successfully]
[2025-04-14 19:32:11,594: INFO: common: yaml file: params.yaml loaded successfully]
[2025-04-14 19:32:11,599: INFO: common: yaml file: schema.yaml loaded successfully]
[2025-04-14 19:32:11,602: INFO: common: created directory at: artifacts]
[2025-04-14 19:32:11,604: INFO: common: created directory at: artifacts/data_transformation]
[2025-04-14 19:32:11,639: INFO: 2016861143: Splited data into training and test sets]
[2025-04-14 19:32:11,641: INFO: 2016861143: (1102, 35)]
[2025-04-14 19:32:11,644: INFO: 2016861143: (368, 35)]
(1102, 35)
