In [None]:
import pandas as pd
from configs.data import MACHINE_LEARNING_DATASET_PATH, MERGED_DATASET_PATH, OUT_PATH, VERSION
from sklearn.metrics import classification_report
import numpy as np
from configs.enums import Column, RISKCLASSIFICATIONS
from machine_learning.utils import scale_dataset
from sklearn.model_selection import cross_val_score
import shap

In [None]:
def split_data(dataframe: pd.DataFrame):
    """Split ``dataframe`` into train (70%) and test (30%) sets per risk class.

    Rows are grouped by their ``country_risk`` value, one group for each value
    returned by ``RISKCLASSIFICATIONS.get_values()``. Every group is shuffled
    with a fixed seed and cut at 70% of its length, so each class is split
    with the same ratio. Rows whose ``country_risk`` is not one of those
    values end up in neither output.

    Args:
        dataframe: Frame containing a ``country_risk`` column.

    Returns:
        A ``(train, test)`` tuple of DataFrames. The original index labels of
        ``dataframe`` are kept on both outputs.
    """
    train_parts = []
    test_parts = []
    for risk_value in RISKCLASSIFICATIONS.get_values():
        group = dataframe[dataframe["country_risk"] == risk_value]
        shuffled = group.sample(frac=1, random_state=0)
        # Train (70%) and test (30%) datasets
        cut = int(0.7 * len(shuffled))
        # Positional slicing instead of np.split: calling np.split on a
        # DataFrame goes through DataFrame.swapaxes, which pandas deprecated.
        train_parts.append(shuffled.iloc[:cut])
        test_parts.append(shuffled.iloc[cut:])

    train = pd.concat(train_parts)
    test = pd.concat(test_parts)

    return train, test

In [None]:
# Load the modelling dataset from the Excel file at MACHINE_LEARNING_DATASET_PATH.
df = pd.read_excel(MACHINE_LEARNING_DATASET_PATH)

# 70/30 train/test split done per country_risk class (see split_data).
train_df, test_df = split_data(df)

# Oversampling is requested for the training split only.
# NOTE(review): train and test go through separate scale_dataset calls --
# confirm the helper applies the same scaling to both splits.
train, x_train, train_labels = scale_dataset(train_df, oversample=True)
test, x_test, test_labels = scale_dataset(test_df, oversample=False)  

In [None]:
def print_results(model) -> None:
    """Print a classification report for ``model`` on the train and test sets.

    Reads the notebook-level ``x_train``/``train_labels`` and
    ``x_test``/``test_labels`` variables; ``model`` only needs a ``predict``
    method.
    """
    evaluation_sets = (
        ("\n###### Training ######", x_train, train_labels),
        ("\n###### Test ######", x_test, test_labels),
    )
    for banner, features, labels in evaluation_sets:
        predicted = model.predict(features)
        print(banner)
        print(classification_report(labels, predicted))

In [None]:
def output_wrong_predicted_xlsx(dataframe, y_pred, model_name):
    """Write the rows whose prediction differs from ``country_risk`` to Excel.

    The misclassified rows are looked up in the merged dataset (read from
    ``MERGED_DATASET_PATH``) using the index labels of ``dataframe`` as row
    positions. The numeric risk codes 0/1/2 are replaced by
    "low"/"medium"/"high" and the result is saved in ``OUT_PATH`` as
    ``{model_name}-wrongly-predicted-V.{VERSION}.xlsx`` (sheet "Data").

    Args:
        dataframe: Frame with a ``country_risk`` column. Its index labels must
            be valid row positions in the merged dataset. It is not modified.
        y_pred: Predicted risk codes, one per row of ``dataframe``.
        model_name: Prefix used for the output file name.
    """
    import os

    options = ["low", "medium", "high"]

    # Work on a copy so the caller's frame does not silently gain a
    # "predicted_country_risk" column (it would leak into later cells).
    result = dataframe.copy()
    result["predicted_country_risk"] = y_pred

    wrongly_predicted = result[result["country_risk"] != result["predicted_country_risk"]]

    m_df = pd.read_excel(MERGED_DATASET_PATH)
    # NOTE(review): index labels are used as positions here, which presumes
    # the merged dataset has the same row order as the frame `dataframe` was
    # split from -- confirm. The copy avoids assigning into a slice below.
    wm_df = m_df.iloc[wrongly_predicted.index].copy()

    def match_classifications(c):
        # One boolean mask per risk code, in the same order as `options`.
        return [wrongly_predicted[c] == code for code in (0, 1, 2)]

    # An explicit string default keeps every choice the same dtype; the
    # implicit default is the integer 0, which recent NumPy releases refuse
    # to combine with string choices. Codes outside 0-2 come out as "0".
    wm_df["country_risk"] = np.select(
        match_classifications("country_risk"), options, default="0")
    wm_df["predicted_country_risk"] = np.select(
        match_classifications("predicted_country_risk"), options, default="0")

    cols = ["year", "country"] + list(wrongly_predicted.columns) + ["norm_risk"]
    wm_df = wm_df[cols]
    wm_df.to_excel(
            os.path.join(OUT_PATH, f"{model_name}-wrongly-predicted-V.{VERSION}.xlsx"),
            index=False,
            sheet_name="Data")

## KNN

In [None]:
from sklearn.neighbors import KNeighborsClassifier
# k-nearest-neighbours classifier configured with 50 neighbours, fitted on the
# scaled training features.
knn_model = KNeighborsClassifier(n_neighbors=50)
knn_model.fit(x_train, train_labels)

# Classification reports for the training and the test set.
print_results(knn_model)

In [None]:
# 10-fold cross-validation of the KNN classifier on the full dataset.
# The last column of `df` is used as the label, all other columns as features.
# NOTE(review): this runs on the raw `df` columns rather than on the output of
# scale_dataset used for the fit above -- confirm that is intended.
X = df.iloc[:, :-1]
y = df.iloc[:, -1]
score_knn = cross_val_score(knn_model, X, y, cv=10)
print(score_knn)
print("Avg: ", score_knn.mean())

In [None]:
# y_pred = knn_model.predict(x_test)
# print(classification_report(test_labels, y_pred))
# output_wrong_predicted_xlsx(test_df, y_pred, "knn_200")

### SHAP

In [None]:
def shapify(data: pd.DataFrame, model, nsamples: int = 100):
    """Compute SHAP values for ``model`` on the test split of ``data``.

    ``data`` is split with ``split_data`` and both parts are passed through
    ``scale_dataset`` (oversampling requested for the training part only).
    A ``shap.KernelExplainer`` is built around ``model.predict`` with the
    training features as background data and evaluated on the test features.

    Args:
        data: Frame accepted by ``split_data``.
        model: Object exposing ``predict``; it is used as-is and not fitted
            here.
        nsamples: Forwarded to ``explainer.shap_values``. Defaults to 100, the
            value that was previously hard-coded.

    Returns:
        ``(explainer, shap_values, x_test)``.
    """
    train_split, test_split = split_data(data)

    # scale_dataset returns a 3-tuple; only its second element (bound to
    # x_train / x_test) is needed here, so the other two are discarded instead
    # of shadowing the notebook-level variables of the same name.
    _, x_train, _ = scale_dataset(train_split, oversample=True)
    _, x_test, _ = scale_dataset(test_split, oversample=False)

    explainer = shap.KernelExplainer(model.predict, x_train)
    shap_values = explainer.shap_values(x_test, nsamples=nsamples)
    # explainer.save()

    return explainer, shap_values, x_test

In [None]:
# shap_df = df[:300]
# 
# train, valid, test = split_data(shap_df)
# train, X_train, train_labels = scale_dataset(train, oversample=True)
# valid, X_valid, val_labels = scale_dataset(valid, oversample=False)
# test, X_test, test_labels = scale_dataset(test, oversample=False)
# 
# knn_model = KNeighborsClassifier(n_neighbors=50)
# knn_model.fit(X_train, train_labels)
# 
# explainer, shap_values, shap_x_test = shapify(shap_df, knn_model)
# shap.summary_plot(shap_values, shap_x_test, feature_names=df.columns[Column.COUNTRY_RISK],
#                   class_names=RISKCLASSIFICATIONS.get_names())

## Logistic Regression

In [None]:
from sklearn.linear_model import LogisticRegression
# Logistic regression with the library's default settings, fitted on the
# scaled training features.
lg_model = LogisticRegression()
lg_model.fit(x_train, train_labels)

# Classification reports for the training and the test set.
print_results(lg_model)

In [None]:
# 10-fold cross-validation of the logistic regression model on the full dataset.
# The last column of `df` is used as the label, all other columns as features.
# NOTE(review): this runs on the raw `df` columns rather than on the output of
# scale_dataset used for the fit above -- confirm that is intended.
X = df.iloc[:, :-1]
y = df.iloc[:, -1]
# Stored under its own name; the scores were previously kept in `score_rf`,
# a leftover from the random-forest cell.
score_lg = cross_val_score(lg_model, X, y, cv=10)
print(score_lg)
print("Avg: ", score_lg.mean())

## SVM

In [None]:
from sklearn.svm import SVC
# Support vector classifier with the library's default settings, fitted on the
# scaled training features.
svm_model = SVC()
svm_model.fit(x_train, train_labels)

# Classification reports for the training and the test set.
print_results(svm_model)

In [None]:
# 10-fold cross-validation of the SVM classifier on the full dataset.
# The last column of `df` is used as the label, all other columns as features.
# NOTE(review): this runs on the raw `df` columns rather than on the output of
# scale_dataset used for the fit above -- confirm that is intended.
X = df.iloc[:, :-1]
y = df.iloc[:, -1]
# Stored under its own name; the scores were previously kept in `score_rf`,
# a leftover from the random-forest cell.
score_svm = cross_val_score(svm_model, X, y, cv=10)
print(score_svm)
print("Avg: ", score_svm.mean())

## Random Forest

In [None]:
from sklearn.ensemble import RandomForestClassifier
# Random forest with 500 trees and a fixed seed, fitted on the scaled training
# features. The commented-out line is an earlier 3000-tree configuration.
# rf_model = RandomForestClassifier(n_estimators=3000, random_state=4098)
rf_model = RandomForestClassifier(n_estimators=500, random_state=42) 
rf_model.fit(x_train, train_labels)

# Classification reports for the training and the test set.
print_results(rf_model)

In [None]:
# y_pred = rf_model.predict(x_test)
# print(classification_report(test_labels, y_pred))
# output_wrong_predicted_xlsx(test_df, y_pred, "rf_500")

In [None]:
# 10-fold cross-validation of the random forest on the full dataset.
# The last column of `df` is used as the label, all other columns as features.
# NOTE(review): this runs on the raw `df` columns rather than on the output of
# scale_dataset used for the fit above -- confirm that is intended.
X = df.iloc[:, :-1]
y = df.iloc[:, -1]
score_rf = cross_val_score(rf_model, X, y, cv=10)
print(score_rf)
print("Avg: ", score_rf.mean())

In [None]:
# https://www.kaggle.com/code/ahmedabdulhamid/best-n-estimators-for-randomforest
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score

rf_model = RandomForestClassifier(n_estimators=5000, random_state=42)
rf_model.fit(x_train, train_labels)

predictions = []
for tree in rf_model.estimators_:
    predictions.append(tree.predict_proba(x_test)[None, :])

predictions = np.vstack(predictions)
cum_mean = np.cumsum(predictions, axis=0)/np.arange(1, predictions.shape[0] + 1)[:, None, None]

scores = []
for pred in cum_mean:
    scores.append(accuracy_score(test_labels, np.argmax(pred, axis=1)))
    
plt.figure(figsize=(15, 8))
plt.plot(scores, linewidth=3)
plt.xlabel('num_trees')
plt.ylabel('accuracy')

In [None]:
# y_pred = rf_model.predict(x_test)
# result = test_df 
# result["predicted_country_risk"] = y_pred
# distribution = result.groupby(["country_risk", "predicted_country_risk"]).size().reset_index().rename(columns={0: 'count'})
# print(distribution)