In [1]:
from src.utils import DatabaseManager

db=DatabaseManager()
df=db.execute_query('select * from marketing_campaign',fetch=True)

In [2]:
df.head()

Unnamed: 0,id,year_birth,education,marital_status,income,kidhome,teenhome,dt_customer,recency,mntwines,...,numwebvisitsmonth,acceptedcmp3,acceptedcmp4,acceptedcmp5,acceptedcmp1,acceptedcmp2,complain,z_costcontact,z_revenue,response
0,5524,1957,Graduation,Single,58138.0,0,0,2012-09-04,58,635,...,7,0,0,0,0,0,0,3,11,1
1,2174,1954,Graduation,Single,46344.0,1,1,2014-03-08,38,11,...,5,0,0,0,0,0,0,3,11,0
2,4141,1965,Graduation,Together,71613.0,0,0,2013-08-21,26,426,...,4,0,0,0,0,0,0,3,11,0
3,6182,1984,Graduation,Together,26646.0,1,0,2014-02-10,26,11,...,6,0,0,0,0,0,0,3,11,0
4,5324,1981,PhD,Married,58293.0,1,0,2014-01-19,94,173,...,5,0,0,0,0,0,0,3,11,0


In [3]:
cat_col=df.columns[df.dtypes=='object']
df.columns[df.dtypes!='object']

Index(['id', 'year_birth', 'income', 'kidhome', 'teenhome', 'recency',
       'mntwines', 'mntfruits', 'mntmeatproducts', 'mntfishproducts',
       'mntsweetproducts', 'mntgoldprods', 'numdealspurchases',
       'numwebpurchases', 'numcatalogpurchases', 'numstorepurchases',
       'numwebvisitsmonth', 'acceptedcmp3', 'acceptedcmp4', 'acceptedcmp5',
       'acceptedcmp1', 'acceptedcmp2', 'complain', 'z_costcontact',
       'z_revenue', 'response'],
      dtype='object')

In [5]:
import os
from dataclasses import dataclass
import pandas as pd
import numpy as np
from typing import List
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.decomposition import PCA
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import FunctionTransformer
from src.exception import CustomException
from src.logger import logging
from src.utils import DatabaseManager, save_object
from datetime import datetime
import sys

# Define the list of categorical columns
object_cols = ['education', 'living_with', 'is_parent']

@dataclass
class DataTransformationConfig:
    preprocessor_obj_file_path: str = os.path.join('artifacts', 'preprocessor.pkl')

class DataTransformation:
    def __init__(self):
        self.data_transformation_config = DataTransformationConfig()
        self.db = DatabaseManager()
        self.LE = LabelEncoder()  # Instantiate LabelEncoder

    def calculate_customer_for(self, df):
        newest_customer_date = df['dt_customer'].max()
        df['Customer_For'] = (newest_customer_date - df['dt_customer']).dt.days
        return df

    def calculate_age(self, df):
        current_year = datetime.now().year
        df['Age'] = current_year - df['year_birth']
        return df

    def calculate_spent(self, df):
        df['Spent'] = df[['mntwines', 'mntfruits', 'mntmeatproducts', 'mntfishproducts', 'mntsweetproducts', 'mntgoldprods']].sum(axis=1)
        return df

    def preprocess_living_with(self, df):
        df['living_with'] = df['marital_status'].replace({"Married": "Partner", "Together": "Partner", "Absurd": "Alone", "Widow": "Alone", "YOLO": "Alone", "Divorced": "Alone", "Single": "Alone"})
        return df

    def calculate_family_size(self, df):
        df['Family_Size'] = df['living_with'].replace({"Alone": 1, "Partner": 2})
        return df

    def calculate_is_parent(self, df):
        df['Is_Parent'] = np.where(df['children'] > 0, 1, 0)
        return df

    def preprocess_education(self, df):
        df['education'] = df['education'].replace({"Basic": "Undergraduate", "2n Cycle": "Undergraduate", "Graduation": "Graduate", "Master": "Postgraduate", "PhD": "Postgraduate"})
        return df

    def drop_columns(self, df):
        to_drop = ["marital_status", "dt_customer", "z_costcontact", "z_revenue", "year_birth", "id"]
        df.drop(to_drop, axis=1, inplace=True)
        return df

    def transform_categorical(self, df):
        # Use the predefined object_cols list for transformation
        s = (df.dtypes == 'object')
        object_cols = list(s[s].index)
        for col in object_cols:
            df[col] = self.LE.fit_transform(df[col])
        return df

    def get_data_preprocessing_pipeline(self) -> Pipeline:
        try:
            # Create a function transformer for each preprocessing step
            customer_for_transformer = FunctionTransformer(self.calculate_customer_for)
            age_transformer = FunctionTransformer(self.calculate_age)
            spent_transformer = FunctionTransformer(self.calculate_spent)
            living_with_transformer = FunctionTransformer(self.preprocess_living_with)
            family_size_transformer = FunctionTransformer(self.calculate_family_size)
            is_parent_transformer = FunctionTransformer(self.calculate_is_parent)
            education_transformer = FunctionTransformer(self.preprocess_education)
            drop_columns_transformer = FunctionTransformer(self.drop_columns)
            transform_categorical_transformer = FunctionTransformer(self.transform_categorical)

            # Create the data preprocessing pipeline
            data_preprocessing_pipeline = Pipeline([
                ('customer_for', customer_for_transformer),
                ('age', age_transformer),
                ('spent', spent_transformer),
                ('living_with', living_with_transformer),
                ('family_size', family_size_transformer),
                ('is_parent', is_parent_transformer),
                ('education', education_transformer),
                ('drop_columns', drop_columns_transformer),
                ('transform_categorical', transform_categorical_transformer),
                ('imputer_num', SimpleImputer(strategy='median')),  # Imputer for numerical columns
                ('imputer_cat', SimpleImputer(strategy='most_frequent')),  # Imputer for categorical columns
                ('scaler', StandardScaler()),  # StandardScaler for both types of columns
            ])

            return data_preprocessing_pipeline

        except Exception as e:
            logging.info("Error in Data Preprocessing Pipeline")
            raise CustomException(e,sys)

    def get_full_pipeline(self, numerical_cols, categorical_cols):
        try:
            # Get the data preprocessing pipeline object
            data_preprocessing_pipeline = self.get_data_preprocessing_pipeline()

            # Create the combined pipeline
            full_pipeline = Pipeline([
                ('data_preprocessing', data_preprocessing_pipeline),
                ('pca', PCA(n_components=3))  # PCA for numerical columns
            ])

            return full_pipeline

        except Exception as e:
            logging.info("Error in Full Pipeline")
            raise CustomException(e,sys)

    def initiate_data_transformation(self, filename):
        try:
            logging.info(f'Reading database table {filename} initiated')

            df = self.db.execute_query(f'select * from {filename}', fetch=True)

            # Define feature columns
            categorical_cols =['education', 'marital_status', 'dt_customer']
            numerical_cols = ['id', 'year_birth', 'income', 'kidhome', 'teenhome', 'recency',
            'mntwines', 'mntfruits', 'mntmeatproducts', 'mntfishproducts',
            'mntsweetproducts', 'mntgoldprods', 'numdealspurchases',
            'numwebpurchases', 'numcatalogpurchases', 'numstorepurchases',
            'numwebvisitsmonth', 'acceptedcmp3', 'acceptedcmp4', 'acceptedcmp5',
            'acceptedcmp1', 'acceptedcmp2', 'complain', 'z_costcontact',
            'z_revenue', 'response']

            logging.info(f'Reading database table {filename} complete')
            logging.info(f'Dataframe Head: \n{df.head().to_string()}')

            logging.info('Obtaining preprocessing object')

            # Get the full pipeline object
            full_pipeline = self.get_full_pipeline(numerical_cols, categorical_cols)

            # Drop the specified columns from the DataFrame
            cols_del = ['acceptedcmp3', 'acceptedcmp4', 'acceptedcmp5', 'acceptedcmp1',
                        'acceptedcmp2', 'complain', 'response']
            df_train = df.drop(columns=cols_del, axis=1)

            # Apply the full pipeline on the training dataset
            input_feature_train_arr = full_pipeline.fit_transform(df_train)

            # Save the full pipeline object
            save_object(
                file_path=self.data_transformation_config.preprocessor_obj_file_path,
                obj=full_pipeline
            )

            logging.info('Full pipeline pickle file saved')

            return (
                input_feature_train_arr,
                self.data_transformation_config.preprocessor_obj_file_path
            )

        except Exception as e:
            logging.info(f'Error occurred during reading database table {filename} in data transformation error: {e}')
            raise CustomException(e,sys)
            # Optionally, you can rollback the transaction here if necessary
            self.db.rollback_transaction()

# Note: The code above includes both the unchanged portions and the applied changes based on the provided suggestions. 
# However, the actual implementation might need to include the missing imports, definitions, and context from your complete codebase.
dt=DataTransformation()
a=dt.initiate_data_transformation('marketing_campaign')

a

CustomException: Error occured in python script name [C:\Users\shukl\AppData\Local\Temp\ipykernel_7332\831343499.py] line number [156] error message [unsupported operand type(s) for -: 'str' and 'str']