In [None]:
%%capture
# Execute the preprocessing notebook inside this kernel; %%capture hides the output it produces.
# NOTE(review): presumably this is where `normalized_df` (used by the cells below) gets defined — confirm.
%run preprocess_flags.ipynb

In [None]:
from sklearn.naive_bayes import ComplementNB
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import f1_score, confusion_matrix, ConfusionMatrixDisplay, classification_report
from sklearn.pipeline import Pipeline
from sklearn.feature_selection import GenericUnivariateSelect, chi2, f_classif, mutual_info_classif
import pandas as pd
import seaborn as sns
import numpy as np
import matplotlib.pyplot as plt

In [None]:
# Build the feature matrix and the integer-encoded target, then hold out one
# third of the rows as a test set.
df = normalized_df
random_state: int = 0  # NOTE(review): not referenced anywhere in this section
SEED = 42
# Kept: anything that falls back to NumPy's global RNG still reads this seed.
np.random.seed(SEED)

X = df.drop("religion", axis=1)
# Encode the religion labels as integers; religion_encoder.classes_ maps the
# codes back to the original label values.
religion_encoder = LabelEncoder()
y = religion_encoder.fit_transform(df["religion"])

# Seed the split explicitly (same seed value as the global seed above) so it no
# longer depends on how much of the global RNG state was consumed beforehand.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=1/3, random_state=SEED
)

# Keep the "name" column aside and remove it from the features.
train_countries = X_train["name"]
test_countries = X_test["name"]

# Rebind instead of drop(..., inplace=True): the split frames are slices of X,
# and mutating them in place can trigger pandas' SettingWithCopyWarning.
X_train = X_train.drop(columns="name")
X_test = X_test.drop(columns="name")

In [None]:
# Two-step model: univariate feature selection followed by Complement Naive Bayes.
# The selector is created with its defaults; the grid search overrides its settings.
feature_selector = GenericUnivariateSelect()
nb_steps = [
    ("feature_selector", feature_selector),
    ("classifier", ComplementNB()),
]
pipeline = Pipeline(steps=nb_steps)

# Registry of candidate models, keyed by the name also used in cross_args.
pipelines = dict(ComplementNB=pipeline)

In [None]:
# Hyper-parameter grids for GridSearchCV, keyed by model name.
freq = 4  # number of alpha grid points per power of ten

# Smoothing strengths shared by both ComplementNB sub-grids: 10**p for p from
# -10/freq up to (excluding) 20/freq in steps of 1/freq. Defined once so the two
# sub-grids cannot drift apart.
alpha_grid = [10 ** power for power in np.arange(-10 / freq, 20 / freq, 1 / freq)]

cross_args = {
    "ComplementNB": [
        {
            # p-value threshold modes. mutual_info_classif is not offered here
            # because it returns scores only, no p-values.
            "feature_selector__score_func": [chi2, f_classif],
            "feature_selector__mode": ["fpr", "fdr", "fwe"],
            # Thresholds 0.025, 0.05, ..., 0.475. A threshold of 0.0 is left out:
            # it selects no features (barring p-values of exactly 0), so every
            # such fit fails in the classifier and only wastes search time.
            "feature_selector__param": np.arange(1, 20) * 0.025,
            "classifier__alpha": alpha_grid,
        },
        {
            "feature_selector__score_func": [chi2, f_classif, mutual_info_classif],
            "feature_selector__mode": ["k_best"],
            # k from 5 up to and including the full feature count, so that
            # "keep every feature" is part of the search as well.
            "feature_selector__param": list(range(5, X_train.shape[1] + 1)),
            "classifier__alpha": alpha_grid,
        },
    ],
    # NOTE(review): `pipelines` in this section has no "RandomForestClassifier"
    # entry and these keys carry no "<step>__" prefix, so this grid looks unused
    # here — confirm before wiring it to a Pipeline.
    "RandomForestClassifier": {
        "criterion": ["gini", "entropy"],
        "min_samples_split": [2, 4, 8, 16],
        "max_features": ["sqrt", "log2", None],
    },
}


In [None]:
# Exhaustive 5-fold grid search over the ComplementNB pipeline, ranked by macro-averaged F1.
name = "ComplementNB"
pipe, args = pipelines[name], cross_args[name]
scoring_criterium = "f1_macro"

search_options = dict(
    param_grid=args,
    scoring=scoring_criterium,
    n_jobs=-1,  # run the fits on all available processors
    cv=5,
    verbose=2,
)
search = GridSearchCV(pipe, **search_options)

search.fit(X_train, y_train)

In [None]:
# Parameter combination that achieved the best cross-validated score.
search.best_params_


In [None]:
# Show the ten best-ranked configurations as a table. Displaying the raw
# cv_results_ dict prints one array entry per grid point for every key and
# buries the rest of the notebook.
pd.DataFrame(search.cv_results_).sort_values("rank_test_score").head(10)

In [None]:
# Baseline for comparison: ComplementNB with default settings on all features,
# scored with macro-averaged F1 on the held-out set.
test_cl = ComplementNB().fit(X_train, y_train)
baseline_predictions = test_cl.predict(X_test)
f1_score(y_test, baseline_predictions, average="macro")

In [None]:
# Best parameter combination again, shown right before it is used for the final model.
search.best_params_


In [None]:
# The search was created with the default refit=True, so it already holds a
# clone of the pipeline refitted on (X_train, y_train) with the best parameters.
# Taking that estimator avoids mutating the shared template `pipe` through
# set_params and fitting the same configuration a second time.
best_pipe = search.best_estimator_

# Macro-averaged F1 of the tuned pipeline on the held-out set.
f1_score(y_test, best_pipe.predict(X_test), average="macro")

In [None]:
# Plain accuracy on the held-out set: hit count, sample count and their ratio.
test_predictions = best_pipe.predict(X_test)
n_correct = np.sum(y_test == test_predictions)
print(f"{n_correct} of {X_test.shape[0]} ({n_correct / X_test.shape[0]})")

In [None]:
# Flatten the grid-search results into one row per parameter combination,
# carrying the score of every CV split plus their row-wise mean.
cv_results = search.cv_results_
cs_results = pd.DataFrame.from_records(cv_results["params"])

# The score functions are callables; store their names so the column is plain text.
cs_results["feature_selector__score_func"] = cs_results["feature_selector__score_func"].map(
    lambda score_func: score_func.__name__
)

# Per-split score columns are the cv_results_ keys that start with "split".
score_keys = [key for key in cv_results if str(key).startswith("split")]
cs_results = cs_results.assign(**{key: cv_results[key] for key in score_keys})

cs_results["score_mean"] = cs_results[score_keys].mean(axis=1)
cs_results

In [None]:
# Look up the mean CV score of the winning configuration in cs_results.
# This used to build a DataFrame.query string from repr() of each value, after
# https://stackoverflow.com/questions/53904155/flexibly-select-pandas-dataframe-rows-using-dictionary
# NumPy >= 2 prints scalars as e.g. "np.float64(0.1)", which is not a valid
# literal inside a query expression, so the row is matched with a boolean mask.

# Same representation as the cs_results columns: callables are replaced by their names.
best_params_formatted = {
    key: value.__name__ if callable(value) else value
    for key, value in search.best_params_.items()
}

# A row matches when every parameter column equals the corresponding best value.
matches_best = pd.Series(True, index=cs_results.index)
for column, wanted in best_params_formatted.items():
    matches_best &= cs_results[column] == wanted

best_score = cs_results.loc[matches_best, "score_mean"]
best_score

In [None]:
# Predict the held-out set and translate the encoded targets and predictions
# back to the original label values.
y_predicted = best_pipe.predict(X_test)
true_labels = religion_encoder.inverse_transform(y_test)
predicted_labels = religion_encoder.inverse_transform(y_predicted)

# Use every label that occurs in either array. With the true labels alone,
# samples predicted as a class absent from the test targets are silently left
# out of the matrix.
labels = np.union1d(true_labels, predicted_labels)
cm = confusion_matrix(true_labels, predicted_labels, labels=labels)
labels

In [None]:
# Confusion matrix of the tuned pipeline on the held-out set.
# Only classes that occur in the test targets or the predictions get a row and
# column, so the display names are looked up per encoded label. The previous
# classes_[:-1] assumed that exactly the last class was the absent one.
cm_labels = np.union1d(y_test, best_pipe.predict(X_test))

# Draw on an explicit Axes; a bare plt.figure() beforehand only left an empty
# extra figure, because the display creates its own when no Axes is passed.
cm_fig, cm_ax = plt.subplots()
cmp = ConfusionMatrixDisplay.from_estimator(
    best_pipe,
    X_test,
    y_test,
    labels=cm_labels,
    display_labels=religion_encoder.classes_[cm_labels],
    xticks_rotation="vertical",
    ax=cm_ax,
)

In [None]:
# One heatmap of the mean CV score per (score function, selector mode) pair:
# rows are classifier alphas, columns are selector parameter values.
selector_modes = list(cs_results["feature_selector__mode"].unique())
score_functions = list(cs_results["feature_selector__score_func"].unique())
cs_heatmaps = []

for score_function in score_functions:
    for selector_mode in selector_modes:
        subset = cs_results[
            (cs_results["feature_selector__mode"] == selector_mode)
            & (cs_results["feature_selector__score_func"] == score_function)
        ]
        # Not every pair was part of the grid; skip those instead of
        # plotting an empty frame.
        if subset.empty:
            continue

        score_grid = subset.pivot(
            index="classifier__alpha",
            columns="feature_selector__param",
            values="score_mean",
        )

        # Fixed colour range so all heatmaps are directly comparable.
        heatmap_fig, heatmap_ax = plt.subplots()
        heatmap = sns.heatmap(score_grid, vmin=0.0, vmax=0.55, ax=heatmap_ax)
        heatmap.set_xlabel(f"Mode: {selector_mode} ({score_function})")
        heatmap.set_ylabel("Alpha")
        heatmap.set_title(f"Complement Naive Bayes {scoring_criterium} by Selector Parameter and Alpha")
        cs_heatmaps.append(heatmap)

In [None]:
# Per-feature scores of the fitted selector (second-to-last pipeline step),
# highest first, plus the names of the features it kept.
selector = best_pipe[-2]
score_name_pairs = list(zip(selector.scores_, selector.feature_names_in_))
features_scores = pd.DataFrame(score_name_pairs, columns=["Score", "Name"])
features_scores = features_scores.sort_values("Score", ascending=False)

included_features = selector.get_feature_names_out()
features_scores

In [None]:
# Horizontal bar chart of the selector's score for every feature.
scoring_fig, scoring_ax = plt.subplots(figsize=(6, 15))
sns.barplot(
    features_scores,
    y="Name",
    x="Score",
    orient="horizontal",
    width=0.8,
    dodge=True,
    ax=scoring_ax,
)

# Colour the tick labels of the features the selector kept in green.
selected_ticks = [
    tick for tick in scoring_ax.yaxis.get_ticklabels()
    if tick.get_text() in included_features
]
for tick in selected_ticks:
    tick.set_color("#008800")

scoring_ax.set_title(f"Feature Importance as determined by {best_pipe[-2].get_params()['score_func'].__name__}")

In [None]:
# Per-class precision / recall / F1 on the held-out set.
# Report exactly the encoded labels that occur in the targets or predictions and
# look their names up by code. The previous classes_[:-1] only lined up when the
# single absent class happened to be the last one.
report_labels = np.union1d(y_test, y_predicted)
print(
    classification_report(
        y_true=y_test,
        y_pred=y_predicted,
        labels=report_labels,
        target_names=religion_encoder.classes_[report_labels],
    )
)