In [None]:
import numpy as np
import scipy as sp
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import OneHotEncoder

# Raise pandas' display limit so DataFrame outputs show up to 500 rows.
pd.set_option("display.max_rows", 500)


In [None]:
# Load the housing table and keep only columns that have non-missing values
# in at least half of the rows (`thresh` is the minimum non-NA count).
tb_housing = pd.read_csv("data/tb_ames_housing.csv")
tb_housing = tb_housing.dropna(axis=1, thresh=tb_housing.shape[0] * 0.5)

# Separate the target from the features.
X_full = tb_housing.drop("SalePrice", axis=1)
y_full = tb_housing["SalePrice"]

# A fixed seed makes the split (and everything derived from it below)
# reproducible under Restart Kernel -> Run All.
X_train, X_test, y_train, y_test = train_test_split(X_full, y_full, random_state=42)

# Snapshot of the non-numeric training columns used by the exploration cells.
tb_housing_cat = X_train.select_dtypes(exclude="number")


# Treatment of Categorical Variables

## Dealing with rare & missing levels

In [None]:
# Summarise every categorical column: number of distinct levels (a missing
# value counts as one level), size of the largest and smallest level, and the
# number of non-missing observations.
cat_info_records = []
for column in tb_housing_cat.columns:
    column_values = tb_housing_cat[column]
    level_counts = column_values.value_counts()
    cat_info_records.append(
        {
            "column": column,
            "num_levels": len(column_values.unique()),
            "most_common_level": level_counts.max(),
            "least_common_level": level_counts.min(),
            "num_nonna": column_values.notna().sum(),
        }
    )

tb_cat_info = pd.DataFrame(
    cat_info_records,
    columns=[
        "column",
        "num_levels",
        "most_common_level",
        "least_common_level",
        "num_nonna",
    ],
)
tb_cat_info


In [None]:
# Keep only columns whose most frequent level covers less than 90% of the
# non-missing observations; columns above that carry almost no variation.
share_most_common = tb_cat_info["most_common_level"] / tb_cat_info["num_nonna"]
big_level_thresh = share_most_common < 0.9
tb_cat_info = tb_cat_info.loc[big_level_thresh].copy()
tb_cat_info


In [None]:
# Build a long table with one row per (column, level) and its number of
# observations. The per-column frames are collected in a list and concatenated
# once: concatenating inside the loop re-copies the accumulated table on every
# iteration and starts from an empty frame, which pandas warns about.
level_frames = []
for column in tb_housing_cat.columns:
    num_obs_level = tb_housing_cat[column].value_counts().reset_index()
    num_obs_level.columns = ["level_name", "num_obs"]
    num_obs_level["column"] = column
    level_frames.append(num_obs_level)

# pd.concat raises on an empty list, so fall back to an empty, correctly
# labelled frame when there are no categorical columns.
tb_cat_levels = (
    pd.concat(level_frames, axis=0)
    if level_frames
    else pd.DataFrame(columns=["level_name", "num_obs", "column"])
)

tb_cat_levels.head(10)


In [None]:
# Attach the per-level counts to the retained columns. The default inner join
# on "column" drops the levels of columns that were filtered out of tb_cat_info.
tb_cat_level_info = pd.merge(tb_cat_info, tb_cat_levels, on="column")
tb_cat_level_info.head()


In [None]:
# Levels with more than 50 observations keep their own name; every level with
# 50 or fewer observations is pooled into a single "Others" group.
has_enough_obs = tb_cat_level_info["num_obs"] > 50
tb_cat_level_info["grp_level"] = tb_cat_level_info["level_name"].where(
    has_enough_obs, "Others"
)
tb_cat_level_info.head(20)


In [None]:
# (rows, columns) of the level table: one row per (column, level) pair.
tb_cat_level_info.shape


In [None]:
# Number of original levels that fall into each group of each column.
(
    tb_cat_level_info
    .groupby(["column", "grp_level"])["level_name"]
    .count()
    .reset_index()
)


In [None]:
# For every retained column, build a {level_name: grp_level} lookup from the
# level table and add the grouped version to X_train as "grp_<column>".
# Values without an entry in the lookup (including missing values) become
# "Others".
for column in tb_cat_level_info["column"].unique():
    is_column = tb_cat_level_info["column"] == column
    level_to_group = (
        tb_cat_level_info.loc[is_column]
        .set_index("level_name")["grp_level"]
        .to_dict()
    )

    X_train["grp_" + column] = X_train[column].map(level_to_group).fillna("Others")


### Writing a wrapper

In [None]:
def create_group_dict(categorical_data, mcl_level=0.9, min_obs=50):
    """Learn, per categorical column, how to pool rare levels into "Others".

    This is the ``fit`` step of the rare-level grouping.

    Parameters
    ----------
    categorical_data : pandas.DataFrame
        Frame whose columns are all treated as categorical.
    mcl_level : float, default 0.9
        A column is kept only if its most frequent level covers strictly less
        than this share of the column's non-missing observations.
    min_obs : int, default 50
        A level keeps its own name only if it has strictly more than
        ``min_obs`` observations; otherwise it is mapped to ``"Others"``.

    Returns
    -------
    dict
        ``{column: {level: group}}`` for every kept column, with columns in
        their original order and levels in ``value_counts`` order. Columns
        that are dominated by one level, or that contain only missing values,
        are absent. An input without columns yields an empty dict.
    """
    column_grp_dict = dict()
    for column in categorical_data.columns:
        # value_counts ignores missing values, so its total is the number of
        # non-missing observations in the column.
        level_counts = categorical_data[column].value_counts()
        num_non_na = level_counts.sum()

        # A column with only missing values has no levels to group.
        if num_non_na == 0:
            continue

        # Skip low-variance columns (most values share the same level).
        if level_counts.max() / num_non_na >= mcl_level:
            continue

        column_grp_dict[column] = {
            level: (level if num_obs > min_obs else "Others")
            for level, num_obs in level_counts.items()
        }

    return column_grp_dict


In [None]:
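# Log-linear model with one regressor: log(y) = B + A * x
# => y = e^(B + A * x) = e^B * e^(A * x) = C * e^(A * x), where C = e^B
# With a second regressor x1: log(y) = B + A * x + A1 * x1
# => y = C * e^(A * x) * e^(A1 * x1)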

In [None]:
# Fit the grouping on the snapshot of the original categorical columns.
# X_train itself now also holds the derived "grp_<column>" columns added by the
# exploration loop above, and those must not be treated as inputs.
col_grp_dict = create_group_dict(tb_housing_cat)


The `create_group_dict` function is the `fit` part of our transformation. Now we need a wrapper that applies the resulting dictionary to a dataset (the `transform` part):

In [None]:
def group_cat_levels(categorical_data, col_grp_dict):
    """Apply a fitted level grouping to a categorical frame.

    This is the ``transform`` step of the rare-level grouping.

    Parameters
    ----------
    categorical_data : pandas.DataFrame
        Frame that contains every column named in ``col_grp_dict``.
    col_grp_dict : dict
        ``{column: {level: group}}`` mapping to apply.

    Returns
    -------
    pandas.DataFrame
        A new frame restricted to the columns named in ``col_grp_dict``.
        Each value is replaced by its group; values without an entry in the
        mapping (including missing values) become ``"Others"``. The input
        frame is not modified.
    """
    # Work on a copy restricted to the mapped columns.
    mapped_columns = list(col_grp_dict)
    grouped = categorical_data[mapped_columns].copy()

    for name, level_to_group in col_grp_dict.items():
        regrouped = grouped[name].map(level_to_group)
        grouped[name] = regrouped.fillna("Others")

    return grouped


Let's test our wrappers on the housing dataset:

In [None]:
# Reload the data so X_train no longer carries the "grp_<column>" columns that
# the exploration cells added, then repeat the split.
tb_housing = pd.read_csv("data/tb_ames_housing.csv")
tb_housing = tb_housing.dropna(axis=1, thresh=tb_housing.shape[0] * 0.5)

X_full = tb_housing.drop("SalePrice", axis=1)
y_full = tb_housing["SalePrice"]
# A fixed seed keeps the split, and every metric computed from it, reproducible.
X_train, X_test, y_train, y_test = train_test_split(X_full, y_full, random_state=42)
tb_housing_cat = X_train.select_dtypes(exclude="number")

# Fit the grouping on the training data only, then apply it to both splits.
col_grp_dict = create_group_dict(tb_housing_cat)
X_train_cat_grp = group_cat_levels(tb_housing_cat, col_grp_dict)
X_test_cat_grp = group_cat_levels(X_test.select_dtypes(exclude="number"), col_grp_dict)
X_train_cat_grp.head()


Now we can train our One-Hot Encoder on our transformed categorical variables:

In [None]:
ohe_fit = OneHotEncoder(sparse=False, handle_unknown="ignore")
ohe_fit.fit(X_train_cat_grp)


In [None]:
X_train_dummy = pd.DataFrame(
    ohe_fit.transform(X_train_cat_grp), columns=ohe_fit.get_feature_names_out()
)
X_test_dummy = pd.DataFrame(
    ohe_fit.transform(X_test_cat_grp), columns=ohe_fit.get_feature_names_out()
)


# Complex Encoding Strategies

## Choosing categorical levels

In [None]:
from sklearn.feature_selection import mutual_info_regression
# You can also use mutual_info_classif for classification problems


In [None]:
# Mutual information between each dummy column and the target. The estimator
# is randomized, so a fixed random_state keeps the scores (and the selection
# derived from them below) reproducible.
mir_fit = mutual_info_regression(
    X_train_dummy, y_train, discrete_features=True, random_state=42
)


In [None]:
# Distribution of the mutual-information scores across the dummy columns.
sns.histplot(mir_fit)


In [None]:
# Dummy columns whose mutual-information score is above the 75th percentile.
mi_cutoff = np.quantile(mir_fit, 0.75)
X_train_dummy.columns[mir_fit > mi_cutoff]


## MCA

In [None]:
!pip install prince

In [None]:
import prince


In [None]:
# Fit a 10-component Multiple Correspondence Analysis on the grouped
# categorical training columns.
mca_fit = prince.MCA(n_components=10)
mca_fit.fit(X_train_cat_grp)


In [None]:
# Cumulative sum of the per-component explained inertia reported by the MCA.
plt.plot(np.cumsum(mca_fit.explained_inertia_))


In [None]:
# Plot the MCA coordinates for the training data with the row points hidden.
mca_fit.plot_coordinates(X_train_cat_grp, show_row_points=False, figsize=(10, 10))


In [None]:
# Project the training rows onto the MCA components and label them MC_0, MC_1, ...
tb_mca = mca_fit.transform(X_train_cat_grp)
# Name as many columns as transform() actually returned, so this cell stays
# correct if n_components is changed where the MCA is created.
tb_mca.columns = ["MC_" + str(i) for i in range(tb_mca.shape[1])]
# Series assignment aligns on the index.
# NOTE(review): assumes transform() keeps the index of X_train_cat_grp - confirm.
tb_mca["log_SalePrice"] = np.log(y_train)

In [None]:
# Training rows on two MCA components, coloured by log sale price.
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
sns.scatterplot(
    data=tb_mca,
    x="MC_0",
    y="MC_2",
    hue="log_SalePrice",
    palette="Spectral",
    alpha=0.8,
    ax=ax,
)

In [None]:
# MCA coordinates of both splits, using the MCA fitted on the training data.
X_train_mca, X_test_mca = (
    mca_fit.transform(cat_grp) for cat_grp in (X_train_cat_grp, X_test_cat_grp)
)


# Testing the full model

In [None]:
from sklearn.decomposition import PCA

In [None]:
# Numeric features of both splits, with missing values replaced by 0.
X_train_num, X_test_num = (
    features.select_dtypes(include="number").fillna(0)
    for features in (X_train, X_test)
)

In [None]:
# Fit a PCA with default settings on the numeric training features.
# NOTE(review): no scaling step is visible before this fit, so the components
# are computed on the raw feature scales - confirm that this is intended.
pca_fit = PCA()
pca_fit.fit(X_train_num)

In [None]:
# Principal-component scores of both splits as DataFrames, with columns named
# PC_0, PC_1, ... and indexed like the corresponding target.
pc_names = ["PC_" + str(i) for i in range(pca_fit.n_components_)]

X_train_pca = pd.DataFrame(
    pca_fit.transform(X_train_num), columns=pc_names, index=y_train.index
)
X_test_pca = pd.DataFrame(
    pca_fit.transform(X_test_num), columns=pc_names, index=y_test.index
)

In [None]:
# Combine the MCA coordinates and PCA scores column-wise. concat aligns rows
# on the index.
# NOTE(review): assumes the MCA frames carry the same index as the PCA frames
# (which use y_train.index / y_test.index) - confirm.
X_train_full = pd.concat([X_train_mca, X_train_pca], axis = 1)
X_test_full = pd.concat([X_test_mca, X_test_pca], axis = 1)

In [None]:
from catboost import CatBoostRegressor
from sklearn.metrics import mean_squared_error

In [None]:
# The overfitting detector (od_type="Iter", od_wait=1500) needs an evaluation
# set. Hold one out of the training data instead of using the test set: the
# RMSE reported on X_test_full would be optimistically biased if the same rows
# had decided when to stop training.
X_fit, X_val, y_fit, y_val = train_test_split(
    X_train_full, y_train, test_size=0.2, random_state=42
)

# random_seed makes the fitted model reproducible across runs.
cat_fit = CatBoostRegressor(
    iterations=20000,
    depth=8,
    od_type="Iter",
    od_wait=1500,
    verbose=False,
    random_seed=42,
)
cat_fit.fit(X_fit, y_fit, eval_set=(X_val, y_val))

In [None]:
# Root mean squared error of the model's predictions on the test features.
y_pred = cat_fit.predict(X_test_full)
test_mse = mean_squared_error(y_test, y_pred)
np.sqrt(test_mse)