# Modelagem

In [None]:
# Enable IPython's autoreload extension in mode 2, so imported modules are
# reloaded before each cell execution.
%load_ext autoreload
%autoreload 2

In [None]:
# imports
import seaborn as sns
import matplotlib.pyplot as plt
from textwrap import wrap

from sklearn.preprocessing import OrdinalEncoder, OneHotEncoder, RobustScaler
from sklearn.compose import ColumnTransformer

from sklearn.model_selection import StratifiedKFold

from sklearn.metrics import f1_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import accuracy_score
from sklearn.metrics import roc_auc_score

from sklearn.svm import LinearSVC
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.dummy import DummyClassifier
from sklearn.ensemble import IsolationForest

from xgboost import XGBClassifier
from lightgbm import LGBMClassifier

import numpy as np

from sklearn.cluster import MiniBatchKMeans
from imblearn.ensemble import BalancedRandomForestClassifier


from imblearn.under_sampling import ClusterCentroids
from imblearn.over_sampling import ADASYN
from imblearn.combine import SMOTEENN, SMOTETomek


from tqdm.auto import tqdm
import pandas as pd

import optuna


from IPython.display import clear_output

In [None]:
import os

# When the kernel was started inside the `notebooks/` folder, step up one level
# (presumably so the `src` package imported in the next cell resolves — confirm).
in_notebooks_dir = os.path.split(os.getcwd())[1] == "notebooks"
if in_notebooks_dir:
    os.chdir("..")

In [None]:
from src.prepare import prepare_data

# Coordinates of the source table, forwarded unchanged to the project loader.
data_source = {
    "project_id": "ca-churn-project",
    "database_name": "customer_churn",
    "table_name": "customer_churn_data",
}

# The loader returns the train and test frames; both still carry the `churn`
# target column, which is popped off in a later cell.
X_train, X_test = prepare_data(**data_source)

In [None]:
# Names of the columns stored with the pandas `category` dtype (taken before
# the target is removed from the frame).
categoric_columns = X_train.select_dtypes(include=["category"]).columns

# A missing total revenue is filled, in place, with the row's monthly revenue.
for _frame in (X_train, X_test):
    _frame["receita_total"] = _frame["receita_total"].fillna(_frame["receita_mensal"])

# Detach the target from each feature frame. `pop` mutates the frame, so this
# cell cannot be re-run without reloading the data first.
y_train, y_test = X_train.pop("churn"), X_test.pop("churn")

In [None]:
# Columns encoded as integer codes.
# NOTE(review): no explicit category order is given to the encoder, so the
# codes follow the encoder's default ordering — confirm this is intended.
ordinal_columns = [
    "tipo_de_empresa",
    "funcionarios",
    "_modulo_financeiro",
    "_emissao_de_nota_fiscal",
    "_integracao_bancaria",
    "_modulo_de_vendas",
    "_relatorios",
    "_utilizacao_de_apis_de_integracao",
    "contrato",
    "frequencia_de_pagamento",
]

# Columns expanded into one indicator column per category.
one_hot_columns = [
    "faz_conciliacao_bancaria",
    "forma_de_pagamento",
    "possuicontador",
    "emite_boletos",
    "possui_mais_de_um_socio",
    "utiliza_servicos_financeiros",
]

# Numeric columns passed through the robust scaler.
numeric_columns = [
    "fundacao_da_empresa",
    "meses_de_permanencia",
    "receita_mensal",
    "receita_total",
]

# Categories unseen at fit time become -1 (ordinal) or an all-zero row (one-hot).
ordinal_encoder = OrdinalEncoder(handle_unknown="use_encoded_value", unknown_value=-1)
one_hot_encoder = OneHotEncoder(handle_unknown="ignore", sparse_output=False)
scaler = RobustScaler()

# Single preprocessing step shared by every model in the notebook; it is
# configured to emit pandas DataFrames instead of NumPy arrays.
preprocessing = ColumnTransformer(
    transformers=[
        ("ordinal", ordinal_encoder, ordinal_columns),
        ("one_hot", one_hot_encoder, one_hot_columns),
        ("scaler", scaler, numeric_columns),
    ]
).set_output(transform="pandas")

In [None]:
# Per-fold metrics of every trial; `objective` appends one record per fold.
metrics = []


def _build_classifier(trial, clf_name):
    """Instantiate the candidate named `clf_name`, sampling its hyperparameters.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Trial used to sample the hyperparameters of the chosen classifier.
    clf_name : str
        One of the classifier names offered by `objective`.

    Returns
    -------
    An unfitted estimator.

    Raises
    ------
    ValueError
        If `clf_name` is not one of the supported names.
    """
    if clf_name == "LinearSVC":
        return LinearSVC(
            dual="auto",
            random_state=0,
            class_weight="balanced",
            C=trial.suggest_float("C", 1e-5, 1e5, log=True),
            penalty=trial.suggest_categorical("penalty", ["l1", "l2"]),
            max_iter=trial.suggest_int("max_iter", 100, 10000),
        )
    if clf_name == "LogisticRegression":
        return LogisticRegression(
            random_state=0,
            class_weight="balanced",
            C=trial.suggest_float("C", 1e-5, 1e5, log=True),
        )
    if clf_name == "DecisionTreeClassifier":
        return DecisionTreeClassifier(
            random_state=0,
            class_weight="balanced",
            max_depth=trial.suggest_int("max_depth", 1, 100),
            criterion=trial.suggest_categorical(
                "criterion", ["gini", "entropy", "log_loss"]
            ),
        )
    if clf_name == "RandomForestClassifier":
        return RandomForestClassifier(
            n_jobs=-1,
            random_state=0,
            class_weight="balanced",
            n_estimators=trial.suggest_int("n_estimators", 1, 100),
            max_depth=trial.suggest_int("max_depth", 1, 100),
            criterion=trial.suggest_categorical(
                "criterion", ["gini", "entropy", "log_loss"]
            ),
            bootstrap=trial.suggest_categorical("bootstrap", [True, False]),
        )
    if clf_name == "BalancedRandomForestClassifier":
        return BalancedRandomForestClassifier(
            n_jobs=-1,
            random_state=0,
            sampling_strategy="all",
            replacement=True,
            bootstrap=False,
            n_estimators=trial.suggest_int("n_estimators", 1, 100),
            max_depth=trial.suggest_int("max_depth", 1, 100),
        )
    if clf_name == "XGBClassifier":
        return XGBClassifier(
            n_jobs=-1,
            random_state=0,
            booster=trial.suggest_categorical(
                "booster", ["gbtree", "gblinear", "dart"]
            ),
            alpha=trial.suggest_float("alpha", 1e-8, 1.0, log=True),
            # NOTE(review): hard-coded; presumably the negative/positive class
            # ratio of the training data — confirm and derive it from y_train.
            scale_pos_weight=2.8,
        )
    if clf_name == "LGBMClassifier":
        # NOTE(review): `alpha` looks unrelated to binary classification in
        # LightGBM, and `subsample` may have no effect without a bagging
        # frequency — verify against the LightGBM parameter reference.
        return LGBMClassifier(
            n_jobs=-1,
            random_state=0,
            class_weight="balanced",
            boosting_type=trial.suggest_categorical(
                "boosting_type", ["gbdt", "dart", "goss"]
            ),
            alpha=trial.suggest_float("alpha", 1e-8, 1.0, log=True),
            subsample=trial.suggest_float("subsample", 0.2, 1),
            colsample_bytree=trial.suggest_float("colsample_bytree", 0.2, 1),
            reg_alpha=trial.suggest_float("reg_alpha", 1e-8, 1.0, log=True),
            reg_lambda=trial.suggest_float("reg_lambda", 1e-8, 1.0, log=True),
        )
    if clf_name == "DummyClassifier":
        # Seeded so the random baseline is reproducible between runs.
        return DummyClassifier(strategy="stratified", random_state=0)
    if clf_name == "IsolationForest":
        return IsolationForest(
            random_state=0,
            n_jobs=-1,
            n_estimators=trial.suggest_int("n_estimators", 1, 100),
            max_samples=trial.suggest_int("max_samples", 1, 100),
            contamination=trial.suggest_float("contamination", 0.01, 0.5),
        )
    raise ValueError(f"Unknown classifier: {clf_name!r}")


def objective(trial):
    """Optuna objective: mean cross-validated F1 of one sampled configuration.

    A classifier family and its hyperparameters are sampled from `trial`, then
    evaluated with a seeded, shuffled 10-fold stratified split of
    `X_train` / `y_train`. The preprocessing is re-fitted on the training part
    of every fold. One record per fold is appended to the module-level
    `metrics` list.

    Parameters
    ----------
    trial : optuna.trial.Trial
        Trial supplying the sampled values.

    Returns
    -------
    float
        Mean F1 score over the folds (the value Optuna maximizes).
    """
    clf_name = trial.suggest_categorical(
        "clf_name",
        [
            "LinearSVC",
            "DecisionTreeClassifier",
            "RandomForestClassifier",
            "BalancedRandomForestClassifier",
            "LogisticRegression",
            "XGBClassifier",
            "LGBMClassifier",
            "DummyClassifier",
            "IsolationForest",
        ],
    )

    # Disabled experiment: resampling strategy as an extra search dimension.
    # resampling = trial.suggest_categorical(
    #     "resampling", ["none", "SMOTEENN", "SMOTETomek", "ClusterCentroids", "ADASYN"]
    # )

    clf = _build_classifier(trial, clf_name)

    # resampling_strategy = {
    #     "none": None,
    #     "SMOTEENN": SMOTEENN(random_state=0),
    #     "SMOTETomek": SMOTETomek(random_state=0),
    #     "ClusterCentroids": ClusterCentroids(
    #         estimator=MiniBatchKMeans(random_state=0), random_state=0
    #     ),
    #     "ADASYN": ADASYN(random_state=0),
    # }

    cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=0)
    # Sized from the splitter so the buffer cannot drift from n_splits.
    scores = np.zeros(cv.get_n_splits())

    for i, (train_index, test_index) in enumerate(cv.split(X_train, y_train)):
        X_train_fold, X_test_fold = X_train.iloc[train_index], X_train.iloc[test_index]
        y_train_fold, y_test_fold = y_train.iloc[train_index], y_train.iloc[test_index]

        # Fit the preprocessing on the training part only to avoid leakage.
        X_train_fold = preprocessing.fit_transform(X_train_fold)
        X_test_fold = preprocessing.transform(X_test_fold)

        # if resampling != "none":
        #     X_train_fold, y_train_fold = resampling_strategy[resampling].fit_resample(
        #         X_train_fold, y_train_fold
        #     )

        if clf_name == "IsolationForest":
            # Trained on non-churn rows only; outliers (-1) are read as churn.
            clf.fit(X_train_fold[y_train_fold == 0])
            y_pred = clf.predict(X_test_fold)
            y_pred = np.where(y_pred == 1, 0, 1)
            # decision_function is lower for outliers, so negate it to obtain
            # a score that grows with the churn class.
            y_score = -clf.decision_function(X_test_fold)
        else:
            clf.fit(X_train_fold, y_train_fold)
            y_pred = clf.predict(X_test_fold)
            # Continuous score of the positive class (column 1 of
            # predict_proba, assuming labels are 0/1), or the decision value
            # for models without probabilities such as LinearSVC.
            y_score = (
                clf.predict_proba(X_test_fold)[:, 1]
                if hasattr(clf, "predict_proba")
                else clf.decision_function(X_test_fold)
            )

        scores[i] = f1_score(y_test_fold, y_pred, zero_division=0)
        metrics.append(
            {
                "clf_name": clf_name,
                # "resampling": resampling,
                "f1_score": scores[i],
                "precision_score": precision_score(
                    y_test_fold, y_pred, zero_division=0
                ),
                "recall_score": recall_score(y_test_fold, y_pred, zero_division=0),
                "accuracy_score": accuracy_score(y_test_fold, y_pred),
                # ROC AUC is a ranking metric: it needs scores, not hard labels.
                "roc_auc_score": roc_auc_score(y_test_fold, y_score),
                "trial": trial.number,
            }
        )

    return scores.mean()


# Seed the sampler so the hyperparameter search can be reproduced; the default
# sampler is otherwise initialised without a fixed seed.
study = optuna.create_study(
    direction="maximize",
    sampler=optuna.samplers.TPESampler(seed=0),
)
study.optimize(objective, n_trials=100)

# Drop the per-trial log output and show only the winning configuration.
clear_output()
study.best_params

In [None]:
# Best params (kept for reference; presumably pasted from an earlier run of the
# study, so a new run may select something else)
# {'clf_name': 'LGBMClassifier',
#  'boosting_type': 'dart',
#  'alpha': 8.04643313873557e-05,
#  'subsample': 0.5962446048419463,
#  'colsample_bytree': 0.22677476520742115,
#  'reg_alpha': 0.4230330447663563,
#  'reg_lambda': 2.2652921293132893e-07}

# Tabular view of the study's trials; later cells read its `value` and
# `params_*` columns.
trials = study.trials_dataframe()

In [None]:
# Row label of the highest-valued trial within each classifier family.
best_models_idx = trials["value"].groupby(trials["params_clf_name"]).idxmax()

# One row per classifier family: its best trial.
best_models = trials.loc[best_models_idx]

In [None]:
# Display the best trial of each classifier family.
best_models

In [None]:
# Bookkeeping columns of the trials table that are not hyperparameters.
ignore_columns = [
    "number",
    "state",
    "datetime_start",
    "datetime_complete",
    "params_clf_name",
    "duration",
    "value",
]


def _best_params_for(best_trials, clf_name):
    """Return the tuned hyperparameters of `clf_name` as a plain dict.

    Parameters
    ----------
    best_trials : pandas.DataFrame
        One row per classifier (its best trial), with `params_*` columns.
    clf_name : str
        Value of `params_clf_name` identifying the classifier.

    Returns
    -------
    dict
        Hyperparameter name (without the `params_` prefix) mapped to its value.
        Parameters that are NaN for this classifier are left out.

    Raises
    ------
    KeyError
        If `best_trials` holds no row for `clf_name`.
    """
    row = best_trials.loc[best_trials["params_clf_name"] == clf_name]
    if row.empty:
        raise KeyError(f"No trial found for classifier {clf_name!r}")
    return (
        row.dropna(axis=1)
        # errors="ignore": a bookkeeping column may already have been removed
        # by dropna (e.g. a missing timestamp).
        .drop(columns=ignore_columns, errors="ignore")
        .iloc[0]
        .rename(lambda name: name.replace("params_", ""))
        .to_dict()
    )


lgbm_params = _best_params_for(best_models, "LGBMClassifier")
linear_svc_params = _best_params_for(best_models, "LinearSVC")
lr_params = _best_params_for(best_models, "LogisticRegression")

# The estimator needs an integer here; the value read back from the table is
# presumably a float because the column is empty for the other classifiers.
linear_svc_params["max_iter"] = int(linear_svc_params["max_iter"])

In [None]:
# Distribution of the trial objective value for each classifier family.
fig, ax = plt.subplots(figsize=(20, 8))
sns.boxplot(data=trials, x="params_clf_name", y="value", ax=ax)

# Axis titles are intentionally left blank.
ax.set_xlabel("")
ax.set_ylabel("")

# plt.ylim(0.5, 1.0)
sns.despine(fig=fig)

# Wrap long classifier names at 18 characters so the tick labels do not overlap.
labels = ["\n".join(wrap(tick.get_text(), 18)) for tick in ax.get_xticklabels()]
plt.xticks(ticks=np.arange(len(labels)), labels=labels)


plt.show()

In [None]:
# plt.figure(figsize=(10, 6))
# ax = sns.boxplot(data=trials, x="params_resampling", y="value")

# plt.xlabel("")
# plt.ylabel("")

# plt.ylim(0, 1.0)
# sns.despine()

# plt.tick_params(axis="x", which="both", bottom=False)

# plt.xticks(
#     ticks=[0, 1, 2, 3, 4],
#     labels=[
#         "Sem reamostragem" if (label := item.get_text()) == "none" else label
#         for item in ax.get_xticklabels()
#     ],
# )


# plt.show()

In [None]:
# Inspect the hyperparameters selected for LinearSVC.
linear_svc_params

In [None]:
# Candidates evaluated with their default hyperparameters.
# NOTE(review): the tuned parameters extracted earlier are not applied here —
# confirm that comparing untuned models is intended.
clfs = {
    "LinearSVC": LinearSVC(
        dual="auto",
        random_state=0,
        class_weight="balanced",
    ),
    "LogisticRegression": LogisticRegression(
        random_state=0,
        class_weight="balanced",
    ),
    "LGBMClassifier": LGBMClassifier(
        n_jobs=-1,
        random_state=0,
        class_weight="balanced",
        force_row_wise=True,
    ),
    # Seeded so the random baseline is reproducible between runs.
    "DummyClassifier": DummyClassifier(strategy="stratified", random_state=0),
}


metrics = []
cv = StratifiedKFold(n_splits=10, shuffle=True, random_state=0)

for clf_name, clf in tqdm(clfs.items()):
    for i, (train_index, test_index) in enumerate(cv.split(X_train, y_train)):
        X_train_fold, X_test_fold = X_train.iloc[train_index], X_train.iloc[test_index]
        y_train_fold, y_test_fold = y_train.iloc[train_index], y_train.iloc[test_index]

        # Fit the preprocessing on the training part only to avoid leakage.
        X_train_fold = preprocessing.fit_transform(X_train_fold)
        X_test_fold = preprocessing.transform(X_test_fold)

        clf.fit(X_train_fold, y_train_fold)
        y_pred = clf.predict(X_test_fold)
        # Continuous score of the positive class (column 1 of predict_proba,
        # assuming labels are 0/1), or the decision value for models without
        # probabilities such as LinearSVC.
        y_score = (
            clf.predict_proba(X_test_fold)[:, 1]
            if hasattr(clf, "predict_proba")
            else clf.decision_function(X_test_fold)
        )

        metrics.append(
            {
                "fold": i,
                "clf_name": clf_name,
                "f1_score": f1_score(y_test_fold, y_pred, zero_division=0),
                "precision_score": precision_score(
                    y_test_fold, y_pred, zero_division=0
                ),
                "recall_score": recall_score(y_test_fold, y_pred, zero_division=0),
                "accuracy_score": accuracy_score(y_test_fold, y_pred),
                # ROC AUC is a ranking metric: it needs scores, not hard labels.
                "roc_auc_score": roc_auc_score(y_test_fold, y_score),
            }
        )

clear_output()

df_metrics = pd.DataFrame(metrics)

# `fold` is only an identifier, so it is excluded from the mean/std summary.
df_metrics.drop(columns="fold").groupby("clf_name").agg(["mean", "std"])

In [None]:
# Display the best trial of each classifier family again, next to the
# cross-validation summary produced by the previous cell.
best_models

Os modelos possuem pontuações de F1 muito próximas. Por isso, para escolher o melhor modelo, será feito um teste de hipótese:

- H0: Os modelos têm a mesma performance no conjunto de dados
- H1: Os modelos não têm a mesma performance no conjunto de dados

Nível de significância: 0.05

Vamos conduzir o 5x2 cross-validation t-test para comparar os modelos.


In [None]:
from sklearn.base import clone
from sklearn.pipeline import Pipeline
from mlxtend.evaluate import paired_ttest_5x2cv

# Each pipeline receives its own unfitted copy of the preprocessing step.
# Sharing a single instance would let the fit of one pipeline overwrite the
# fitted state used by the others.
lgbm_pipe = Pipeline(
    steps=[
        ("preprocessing", clone(preprocessing)),
        (
            "clf",
            LGBMClassifier(
                n_jobs=-1, random_state=0, class_weight="balanced", **lgbm_params
            ),
        ),
    ]
)

# NOTE(review): built for completeness, but not part of the test below.
logistic_pipe = Pipeline(
    steps=[
        ("preprocessing", clone(preprocessing)),
        (
            "clf",
            LogisticRegression(random_state=0, class_weight="balanced", **lr_params),
        ),
    ]
)

svm_pipe = Pipeline(
    steps=[
        ("preprocessing", clone(preprocessing)),
        (
            "clf",
            LinearSVC(
                dual="auto",
                random_state=0,
                class_weight="balanced",
                **linear_svc_params,
            ),
        ),
    ]
)


# 5x2cv paired t-test on F1 between the tuned LGBM and LinearSVC pipelines.
t, p = paired_ttest_5x2cv(
    estimator1=lgbm_pipe,
    estimator2=svm_pipe,
    X=X_train,
    y=y_train,
    scoring="f1",
    random_seed=0,
)

clear_output()

print(f"P-value: {p:.4f}")
print(f"t-statistic: {t:.4f}")

# Decision at the 0.05 significance level.
if p < 0.05:
    print(
        "Já que o p-value é menor que 0.05, podemos rejeitar a hipótese nula e afirmar que um modelo é melhor que o outro"
    )
else:
    print(
        "Já que o p-value é maior que 0.05, não podemos rejeitar a hipótese nula e não podemos afirmar que um modelo é melhor que o outro"
    )

O teste, aplicado ao LGBMClassifier e ao LinearSVC, não indicou diferença significativa entre os modelos, mas todos superam o baseline.

In [None]:
from sklearn.base import clone
from sklearn.metrics import classification_report

# Each pipeline receives its own unfitted copy of the preprocessing step, so
# fitting one model never touches the fitted state of another.
best_lgbm_pipe = Pipeline(
    steps=[
        ("preprocessing", clone(preprocessing)),
        (
            "clf",
            LGBMClassifier(
                n_jobs=-1, random_state=0, class_weight="balanced", **lgbm_params
            ),
        ),
    ]
)

best_lr_pipe = Pipeline(
    steps=[
        ("preprocessing", clone(preprocessing)),
        (
            "clf",
            LogisticRegression(random_state=0, class_weight="balanced", **lr_params),
        ),
    ]
)

best_svm_pipe = Pipeline(
    steps=[
        ("preprocessing", clone(preprocessing)),
        (
            "clf",
            LinearSVC(
                dual="auto",
                random_state=0,
                class_weight="balanced",
                **linear_svc_params,
            ),
        ),
    ]
)

best_models = {
    "LGBMClassifier": best_lgbm_pipe,
    "LogisticRegression": best_lr_pipe,
    "LinearSVC": best_svm_pipe,
    # Seeded: the stratified baseline draws random labels on every predict
    # call, so without a seed each call would produce different predictions.
    "DummyClassifier": DummyClassifier(strategy="stratified", random_state=0),
}

# Train every model on the full training set.
for _model in best_models.values():
    _model.fit(X_train, y_train)

y_pred_lgbm = best_lgbm_pipe.predict(X_test)
y_pred_lr = best_lr_pipe.predict(X_test)
y_pred_svm = best_svm_pipe.predict(X_test)
y_pred_dummy = best_models["DummyClassifier"].predict(X_test)

clear_output()

# One report per model, each built from the predictions computed above (the
# baseline report reuses `y_pred_dummy` instead of predicting a second time).
_test_predictions = {
    "LGBMClassifier": y_pred_lgbm,
    "LogisticRegression": y_pred_lr,
    "LinearSVC": y_pred_svm,
    "DummyClassifier": y_pred_dummy,
}
for _position, (_title, _y_pred) in enumerate(_test_predictions.items()):
    # Blank lines separate consecutive reports; the first one has none.
    _prefix = "" if _position == 0 else "\n\n"
    print(f"{_prefix}{_title:^30}")
    print(classification_report(y_test, _y_pred, target_names=["Não Churn", "Churn"]))

In [None]:
# Hold-out metrics (one record per model) and the matching confusion matrices.
test_records = []
cms = []

for clf_name, clf in best_models.items():
    y_pred = clf.predict(X_test)
    # Continuous score of the positive class (column 1 of predict_proba,
    # assuming labels are 0/1), or the decision value for models without
    # probabilities such as LinearSVC.
    y_score = (
        clf.predict_proba(X_test)[:, 1]
        if hasattr(clf, "predict_proba")
        else clf.decision_function(X_test)
    )

    test_records.append(
        {
            "classifier": clf_name,
            "f1": f1_score(y_test, y_pred, zero_division=0),
            "precision": precision_score(y_test, y_pred, zero_division=0),
            "recall": recall_score(y_test, y_pred, zero_division=0),
            "accuracy": accuracy_score(y_test, y_pred),
            # ROC AUC is a ranking metric: it needs scores, not hard labels.
            "roc_auc": roc_auc_score(y_test, y_score),
        }
    )
    cms.append(pd.crosstab(y_test, y_pred, rownames=["Real"], colnames=["Predito"]))


# Built once from the records, so the name is never bound to a plain list.
df_metrics_test = pd.DataFrame(test_records)
df_metrics_test

In [None]:
# Render the hold-out metrics as a markdown table of percentages with two
# decimal places, indexed by classifier name.
metrics_pct = df_metrics_test.set_index("classifier").mul(100).round(2)
metrics_pct_text = metrics_pct.astype(str) + "%"
print(metrics_pct_text.to_markdown())

In [None]:
# One confusion-matrix panel per evaluated model. The grid is sized from the
# data (4 inches per panel, i.e. 16x4 for four models) so no model is silently
# dropped by `zip`; squeeze=False keeps `axes` an array even for one panel.
n_models = len(cms)
fig, axes = plt.subplots(1, n_models, figsize=(4 * n_models, 4), squeeze=False)

for ax, cm, clf_name in zip(axes.flatten(), cms, best_models.keys()):
    sns.heatmap(
        cm,
        annot=True,
        fmt="d",
        cmap="Blues",
        cbar=False,
        ax=ax,
    )
    ax.set_title(clf_name)

plt.show()