## Imports

In [43]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.impute import SimpleImputer

## Data Encoding class

In [44]:
class DataEncoder():
    """
    Collapses raw survey codes into small engineered categories.

    Every public method receives one raw value and returns the index of the
    code group that contains it. A value found in no group (NaN included) is
    returned untouched.
    """

    def __init__(self):
        pass

    @staticmethod
    def _group_index(x, groups):
        """
        Locates the group that holds a value.

        Parameters:
        x: The raw value to look up.
        groups (tuple of tuples): Code groups, ordered by their target index.

        Returns:
        The position of the first group containing x, or x itself when no
        group contains it.
        """
        for position, members in enumerate(groups):
            if x in members:
                return position
        return x

    def movement_encoding(self, x):
        """
        Turns a raw code into a binary movement flag.

        Parameters:
        x (int): The raw code.

        Returns:
        int: 0 when the code is a mainstream code, 1 when it is an
        avantgarde code; any other value comes back unchanged.
        """
        mainstream = (1, 3, 5, 8, 10, 12, 14)
        avantgarde = (2, 4, 6, 7, 9, 11, 13, 15)
        return self._group_index(x, (mainstream, avantgarde))

    def decade_encoding(self, x):
        """
        Turns a raw code into a decade index.

        Parameters:
        x (int): The raw code.

        Returns:
        int: The decade index for the code:
        1, 2 -> 0 (40s)
        3, 4 -> 1 (50s)
        5, 6, 7 -> 2 (60s)
        8, 9 -> 3 (70s)
        10, 11, 12 -> 4 (80s)
        13, 14, 15 -> 5 (90s)
        Any other value comes back unchanged.
        """
        decades = (
            (1, 2),        # 40s
            (3, 4),        # 50s
            (5, 6, 7),     # 60s
            (8, 9),        # 70s
            (10, 11, 12),  # 80s
            (13, 14, 15),  # 90s
        )
        return self._group_index(x, decades)

    def wealth_encoding(self, x):
        """
        Turns a raw two-digit code into a wealth index.

        Parameters:
        x (int): The raw code.

        Returns:
        int: The wealth index for the code:
        11-15 -> 0 (wealthy households)
        21-25 -> 1 (prosperous households)
        31-35 -> 2 (comfortable households)
        41-45 -> 3 (less affluent households)
        51-55 -> 4 (poorer households)
        Any other value comes back unchanged.
        """
        wealth_bands = (
            (11, 12, 13, 14, 15),  # wealthy
            (21, 22, 23, 24, 25),  # prosperous
            (31, 32, 33, 34, 35),  # comfortable
            (41, 42, 43, 44, 45),  # less affluent
            (51, 52, 53, 54, 55),  # poorer
        )
        return self._group_index(x, wealth_bands)

    def life_stage_encoding(self, x):
        """
        Turns a raw two-digit code into a life-stage index.

        Parameters:
        x (int): The raw code.

        Returns:
        int: The life-stage index for the code:
        11, 21, 31, 41, 51 -> 0 (pre-family couples and singles)
        12, 22, 32, 42, 52 -> 1 (young couples with children)
        13, 23, 33, 43, 53 -> 2 (families with school-age children)
        14, 24, 34, 44, 54 -> 3 (older families and mature couples)
        15, 25, 35, 45, 55 -> 4 (elders in retirement)
        Any other value comes back unchanged.
        """
        life_stages = (
            (11, 21, 31, 41, 51),  # pre-family couples and singles
            (12, 22, 32, 42, 52),  # young couples with children
            (13, 23, 33, 43, 53),  # families with school-age children
            (14, 24, 34, 44, 54),  # older families and mature couples
            (15, 25, 35, 45, 55),  # elders in retirement
        )
        return self._group_index(x, life_stages)

## Missing Values class

In [45]:
class MissingValuesImputer():
    """
    Fills missing values using a strategy chosen per feature type.

    Categorical and mixed features use the most frequent value, numeric
    features use the mean, and ordinal features use the median. The imputers
    are fitted by general_data_imputer and reused by customer_data_imputer,
    so both calls must be made on the same instance, in that order.
    """

    def __init__(self):
        # initialize different imputers
        # np.nan is used because the np.NAN alias no longer exists in NumPy 2.0.
        self.categorical_mixed_imputer = SimpleImputer(missing_values=np.nan, strategy="most_frequent")
        self.numerical_imputer = SimpleImputer(missing_values=np.nan, strategy="mean")
        self.ordinal_imputer = SimpleImputer(missing_values=np.nan, strategy="median")

    def _imputation_plan(self):
        """
        Lists which imputer handles which feature types.

        Returns:
        tuple: (feature types, imputer) pairs in the order they are applied.
        """
        return (
            (('categorical', 'mixed'), self.categorical_mixed_imputer),
            (('numeric',), self.numerical_imputer),
            (('ordinal',), self.ordinal_imputer),
        )

    def _impute(self, df, feat_info, fit):
        """
        Runs every imputer over the columns of its feature types.

        Parameters:
        df (pandas.DataFrame): The DataFrame which has missing values to be imputed.
        feat_info (pandas.DataFrame): Feature table with 'attribute' and 'type' columns.
        fit (bool): When True each imputer is fitted on df before transforming;
        when False the previously fitted statistics are reused.

        Returns:
        df (pandas.DataFrame): The DataFrame with imputed missing values. Imputed
        columns are moved to the end, one feature-type group after another.
        """
        for feature_types, imputer in self._imputation_plan():
            attributes = feat_info.loc[feat_info['type'].isin(feature_types), 'attribute']
            # Only attributes still present in df can be imputed.
            cols = [col for col in attributes if col in df.columns]
            if not cols:
                # Nothing of this type is left; the imputer cannot work on zero columns.
                continue
            if fit:
                imputer.fit(df[cols])
            imputed = pd.DataFrame(imputer.transform(df[cols]), index=df.index, columns=cols)
            # Swap the original columns for their imputed versions.
            df = pd.concat([df.drop(columns=cols), imputed], axis=1)
        return df

    def general_data_imputer(self, df, feat_info):
        """
        This function fits the imputers on a DataFrame and imputes its missing values.
        It imputes different data types differently.
        For categorical and mixed type columns, it uses the 'most_frequent' strategy.
        For numerical type columns, it uses the 'mean' strategy.
        For ordinal type columns, it uses the 'median' strategy.

        Parameters:
        df (pandas.DataFrame): The DataFrame which has missing values to be imputed.
        feat_info (pandas.DataFrame): The DataFrame which has information about the features.

        Returns:
        df (pandas.DataFrame): The DataFrame with imputed missing values.

        """
        return self._impute(df, feat_info, fit=True)

    def customer_data_imputer(self, df, feat_info):
        """
        This function imputes missing values in a DataFrame with the statistics
        learned by general_data_imputer; it does not refit the imputers, so
        general_data_imputer must have been called on this instance first.
        For categorical and mixed type columns, it uses the 'most_frequent' strategy.
        For numerical type columns, it uses the 'mean' strategy.
        For ordinal type columns, it uses the 'median' strategy.

        Parameters:
        df (pandas.DataFrame): The DataFrame which has missing values to be imputed.
        feat_info (pandas.DataFrame): The DataFrame which has information about the features.

        Returns:
        df (pandas.DataFrame): The DataFrame with imputed missing values.

        """
        return self._impute(df, feat_info, fit=False)

## Data Cleaning class

In [46]:
class DataCleaner():
    """
    Cleaning pipeline shared by the general demographics data and the
    customer data.

    Both entry points run the same steps: convert missing-value codes to NaN,
    drop selected columns and sparse rows, impute, then re-encode and engineer
    features. They differ only in the imputation step: clean_general_data fits
    the imputers, clean_customer_data reuses those fitted imputers. Call
    clean_general_data first, on the same DataCleaner instance.
    """

    def __init__(self):
        # Rows are kept only when they have fewer missing values than this.
        self.upper_bound_row = 8
        # Shared by both entry points so that the statistics fitted on the
        # general data are still available when the customer data is cleaned.
        self.imputer = MissingValuesImputer()

    def clean_general_data(self, df, feat_info):
        """
        Perform feature trimming, re-encoding, and engineering for demographics
        data. Fits the missing-value imputers on this data.

        INPUT: Demographics DataFrame and the feature information DataFrame
        OUTPUT: Trimmed and cleaned demographics DataFrame
        """
        return self._clean(df, feat_info, self.imputer.general_data_imputer)

    def clean_customer_data(self, df, feat_info):
        """
        Perform feature trimming, re-encoding, and engineering for customer
        data. Reuses the imputers fitted by clean_general_data, which therefore
        has to be called on this instance beforehand.

        INPUT: Customer DataFrame and the feature information DataFrame
        OUTPUT: Trimmed and cleaned customer DataFrame
        """
        return self._clean(df, feat_info, self.imputer.customer_data_imputer)

    def _clean(self, df, feat_info, impute):
        """
        Runs the full cleaning pipeline.

        Parameters:
        df (pandas.DataFrame): The raw data. The missing-code conversion is
        written into this DataFrame in place.
        feat_info (pandas.DataFrame): The DataFrame which has information about the features.
        impute (callable): Takes (df, feat_info) and returns the imputed DataFrame.

        Returns:
        df (pandas.DataFrame): The cleaned DataFrame.
        """
        # convert missing value codes into NaNs, ...
        df = self._convert_missing_codes(df, feat_info)
        # remove selected columns and rows, ...
        removed_columns = ['AGER_TYP', 'GEBURTSJAHR', 'TITEL_KZ', 'ALTER_HH', 'KK_KUNDENTYP', 'KBA05_BAUMAX']
        print('Removed Columns are:', removed_columns)
        df = df.drop(removed_columns, axis=1)
        df = df.loc[df.isna().sum(axis=1) < self.upper_bound_row]
        df = impute(df, feat_info)
        # select, re-encode, and engineer column values.
        return self._engineer_features(df, feat_info)

    def _convert_missing_codes(self, df, feat_info):
        """
        Replaces every per-attribute missing/unknown code with NaN.

        NOTE(review): assumes feat_info["missing_or_unknown"] holds an iterable
        of codes per attribute (already parsed, not a raw string) - confirm
        against the loading code.
        """
        for attribute, codes in zip(feat_info["attribute"], feat_info["missing_or_unknown"]):
            for code in codes:
                df[attribute] = df[attribute].mask(df[attribute] == code)
        return df

    def _engineer_features(self, df, feat_info):
        """
        Re-encodes categorical features and derives the engineered columns
        movement, decade, wealth and life_stage.
        """
        df = df.drop(columns=['GEBAEUDETYP'])
        categorical = [
            col
            for col in feat_info.loc[feat_info['type'] == 'categorical', 'attribute']
            if col in df.columns
        ]
        # Binary features holding alphabetic labels need numeric re-encoding;
        # features with more than two levels are one-hot encoded.
        binary_to_recode = []
        multi_level = []
        for col in categorical:
            levels = df[col].dropna().unique().tolist()
            if len(levels) == 2:
                if any(str(level).isalpha() for level in levels):
                    binary_to_recode.append(col)
            elif len(levels) > 2:
                multi_level.append(col)
        for col in multi_level:
            # Stringify non-alphabetic levels so the dummy column names are built from strings.
            df[col] = df[col].apply(lambda x: x if str(x).isalpha() else str(x))
        if binary_to_recode:
            # Only the first such feature is re-encoded: 'W' becomes 0, anything else 1.
            first_binary = binary_to_recode[0]
            df[first_binary] = df[first_binary].apply(lambda x: 0 if x == 'W' else 1)
        multiple_dummy_attributes = pd.get_dummies(df[multi_level])
        df = pd.concat([df, multiple_dummy_attributes], axis=1)
        df = df.drop(multi_level, axis='columns')
        encoder = DataEncoder()
        # Create first variable movement with two binary values: 0 for mainstream and 1 for avantgarde
        df['movement'] = df['PRAEGENDE_JUGENDJAHRE'].apply(encoder.movement_encoding)
        # Create second variable decade with multi-values: 0 for 40s and 1 for 50s and 2 for 60s
        # and 3 for 70s and 4 for 80s and 5 for 90s
        df['decade'] = df['PRAEGENDE_JUGENDJAHRE'].apply(encoder.decade_encoding)
        # Create first variable wealth with multi-values: 0 for (11,12,13,14,15) and 1 for (21,22,23,24,25) and 2 for (31,32,33,34,35)
        # and 3 for (41,42,43,44,45) and 4 for (51,52,53,54,55)
        df["CAMEO_INTL_2015"] = df["CAMEO_INTL_2015"].astype(float)
        df['wealth'] = df['CAMEO_INTL_2015'].apply(encoder.wealth_encoding)
        # create second variable life_stage with multi-values: 0 for (11,21,31,41,51) and 1 for (12,22,32,42,52)
        # and 2 for (13,23,33,43,53) and 3 for (14,24,34,44,54) and 4 for (15,25,35,45,55)
        df['life_stage'] = df['CAMEO_INTL_2015'].apply(encoder.life_stage_encoding)
        df = df.drop(["PRAEGENDE_JUGENDJAHRE", "CAMEO_INTL_2015"], axis=1)
        # Return the cleaned dataframe.
        df = df.drop(columns=['LP_LEBENSPHASE_FEIN', 'LP_LEBENSPHASE_GROB'])
        return df

## Data Count Visualizer class

In [47]:
class DataCountVisualizer():
    """
    Draws side-by-side bar charts that compare one column across two data sets.
    """

    def __init__(self):
        pass

    def count_plot(self, data1, data2, column_name, super_title, sub_title1, sub_title2):
        """
        This function draws the value counts of one column for two data sets,
        one subplot per data set.

        Parameters:
        data1 (pandas dataframe): The data set shown in the left subplot.
        data2 (pandas dataframe): The data set shown in the right subplot.
        column_name (str): The name of the column to be plotted.
        super_title (str): The title of the entire figure.
        sub_title1 (str): The title of the left subplot.
        sub_title2 (str): The title of the right subplot.

        Returns:
        None
        """
        fig, axes = plt.subplots(1, 2)
        fig.suptitle(super_title)
        fig.set_figheight(7)
        fig.set_figwidth(18)
        panels = zip(axes, (data1, data2), (sub_title1, sub_title2))
        for axis, frame, title in panels:
            sns.countplot(x=column_name, data=frame, ax=axis)
            axis.set_title(title)

    def proportion_plot(self, data1, data2, column_name, super_title, sub_title1, sub_title2):
        """
        This function draws the normalized value counts (proportions) of one
        column for two data sets, one subplot per data set.

        Parameters:
        data1 (pandas dataframe): The data set shown in the left subplot.
        data2 (pandas dataframe): The data set shown in the right subplot.
        column_name (str): The name of the column to be plotted.
        super_title (str): The title of the entire figure.
        sub_title1 (str): The title of the left subplot.
        sub_title2 (str): The title of the right subplot.

        Returns:
        None
        """
        fig, axes = plt.subplots(1, 2)
        fig.suptitle(super_title)
        fig.set_figheight(7)
        fig.set_figwidth(18)
        # Both proportion tables are computed before anything is drawn.
        shares = [frame[column_name].value_counts(normalize=True) for frame in (data1, data2)]
        for axis, share, title in zip(axes, shares, (sub_title1, sub_title2)):
            sns.barplot(x=share.index, y=share.values, ax=axis)
            axis.set_title(title)