## Training script

Trains several models on a given dataset and prints/saves the result

In [None]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split

from sklearn.metrics import classification_report, confusion_matrix
from sklearn.metrics import f1_score
from sklearn.metrics import accuracy_score as accuracy

from sklearn.model_selection import cross_validate

import shap

#### Helper functions

In [None]:
def evaluate_by_gender(y_true, y_pred, gender_labels):
    """Compute accuracy separately for the gender-0 and gender-1 samples.

    Parameters
    ----------
    y_true : array-like
        Ground-truth labels.
    y_pred : array-like
        Predicted labels, aligned element-wise with ``y_true``.
    gender_labels : array-like
        Gender code of each sample; only the values 0 and 1 are evaluated.

    Returns
    -------
    tuple
        ``(gender_0_accuracy, gender_1_accuracy)``. A group that has no
        samples yields ``nan`` instead of being scored on empty input.

    Raises
    ------
    ValueError
        If the three inputs do not have the same length.
    """
    # Convert all three inputs so lists and pandas objects (whose own index
    # would otherwise drive the selection) are indexed positionally.
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    gender_labels = np.asarray(gender_labels)

    if not (len(y_true) == len(y_pred) == len(gender_labels)):
        raise ValueError(
            "y_true, y_pred and gender_labels must have the same length"
        )

    def _group_accuracy(gender):
        # Boolean mask selecting the samples that belong to this gender code.
        mask = gender_labels == gender
        if not mask.any():
            return float("nan")
        return accuracy(y_true[mask], y_pred[mask])

    return _group_accuracy(0), _group_accuracy(1)
    
    
#trained_model, f1_train, f1_test, acc_train, acc_test = fit_evaluate_model(models[0], X_train, X_test_final, y_train, y_test_final, print_evaluation=False)

In [None]:
def fit_evaluate_model(model, X_train, X_test, y_train, y_test, train_gender, validation_gender, model_name=None, print_evaluation=True):
    """Fit ``model`` on the training split and score it on both splits.

    Parameters
    ----------
    model : estimator
        Object exposing ``fit`` and ``predict``; it is fitted in place.
    X_train, y_train : training features and labels.
    X_test, y_test : evaluation features and labels.
    train_gender, validation_gender : per-sample gender codes for the
        training and evaluation splits, forwarded to ``evaluate_by_gender``.
    model_name : unused; kept for signature compatibility.
    print_evaluation : bool
        When true, print the classification report and show a confusion
        matrix heatmap for each split.

    Returns
    -------
    tuple
        ``(model, f1_train, f1_test, acc_train, acc_test,
        gender_train_acc, gender_test_acc)`` where the two gender entries
        are the tuples returned by ``evaluate_by_gender``.
    """
    def _score_split(features, targets, genders):
        # Predict once and derive every metric for this split from it.
        predictions = model.predict(features)
        split_f1 = f1_score(targets, predictions)
        split_acc = accuracy(targets, predictions)

        if print_evaluation:
            print(classification_report(targets, predictions))
            sns.heatmap(confusion_matrix(targets, predictions), annot=True, fmt='d')
            plt.show()

        return split_f1, split_acc, evaluate_by_gender(targets, predictions, genders)

    model.fit(X_train, y_train)

    f1_train, acc_train, gender_train_acc = _score_split(X_train, y_train, train_gender)
    f1_test, acc_test, gender_test_acc = _score_split(X_test, y_test, validation_gender)

    return model, f1_train, f1_test, acc_train, acc_test, gender_train_acc, gender_test_acc

In [None]:
def fit_evaluate_model_old(model, X_train, X_test, y_train, y_test, model_name=None, print_evaluation=True):
    """Fit ``model`` and print/return F1 and accuracy for both splits.

    Variant of the evaluation routine without the per-gender breakdown.

    Parameters
    ----------
    model : estimator
        Object exposing ``fit`` and ``predict``; it is fitted in place.
    X_train, y_train : training features and labels.
    X_test, y_test : evaluation features and labels.
    model_name : unused; kept for signature compatibility.
    print_evaluation : bool
        When true, additionally print the classification report and show a
        confusion matrix heatmap for each split.

    Returns
    -------
    tuple
        ``(model, f1_train, f1_test, acc_train, acc_test)``.
    """
    def _report(split_title, features, targets):
        # The headline metrics are always printed; the detailed report is optional.
        print(split_title)
        predictions = model.predict(features)
        split_f1 = f1_score(targets, predictions)
        split_acc = accuracy(targets, predictions)
        print("F1:", split_f1)
        print("Acc:", split_acc)

        if print_evaluation:
            print(classification_report(targets, predictions))
            sns.heatmap(confusion_matrix(targets, predictions), annot=True, fmt='d')
            plt.show()

        return split_f1, split_acc

    model.fit(X_train, y_train)

    f1_train, acc_train = _report("Training set", X_train, y_train)
    f1_test, acc_test = _report("Testing set", X_test, y_test)

    return model, f1_train, f1_test, acc_train, acc_test

In [None]:
def cross_validation(model, _X, _y, _cv=5):
    """Cross-validate ``model`` and summarise accuracy and F1.

    Parameters
    ----------
    model : estimator
        The machine learning algorithm to evaluate.
    _X : array
        Feature matrix.
    _y : array
        Target variable.
    _cv : int, default=5
        Number of cross-validation folds.

    Returns
    -------
    dict
        Per-fold scores and their means for accuracy and F1, on both the
        training and the validation folds. Mean accuracies are multiplied
        by 100; mean F1 scores are left unscaled. Precision and recall are
        computed by the scorer but not included in the result.
    """
    scores = cross_validate(
        estimator=model,
        X=_X,
        y=_y,
        cv=_cv,
        scoring=['accuracy', 'precision', 'recall', 'f1'],
        return_train_score=True,
    )

    train_acc = scores['train_accuracy']
    train_f1 = scores['train_f1']
    val_acc = scores['test_accuracy']
    val_f1 = scores['test_f1']

    summary = {}
    summary["Training Accuracy scores"] = train_acc
    summary["Mean Training Accuracy"] = train_acc.mean()*100
    summary["Training F1 scores"] = train_f1
    summary["Mean Training F1 Score"] = train_f1.mean()
    summary["Validation Accuracy scores"] = val_acc
    summary["Mean Validation Accuracy"] = val_acc.mean()*100
    summary["Validation F1 scores"] = val_f1
    summary["Mean Validation F1 Score"] = val_f1.mean()
    return summary

In [None]:
def get_tuned_model(model, X_train, X_test, y_train, y_test):
    """Return a new, unfitted estimator of ``model``'s type with tuned hyperparameters.

    The concrete type of ``model`` selects the matching ``optimize_*``
    search; the data splits are passed through to that search.

    Parameters
    ----------
    model : estimator
        Prototype whose exact type decides which search is run.
    X_train, y_train : data the candidate models are fitted on.
    X_test, y_test : data the candidate models are scored on.

    Returns
    -------
    estimator
        A freshly constructed estimator configured with the best parameters
        found, or ``model`` itself when its type has no search defined.

    Notes
    -----
    The entries of ``best`` are used as actual parameter values. hyperopt's
    ``fmin`` reports ``hp.choice`` picks as indices, so the ``optimize_*``
    helpers must hand back resolved values (e.g. via ``space_eval``).
    """
    trial = Trials()
    # Exact type match on purpose: subclasses are not tuned.
    model_type = type(model)

    if model_type is LogisticRegression:
        best = optimize_lr(trial, X_train, X_test, y_train, y_test)

        return LogisticRegression(C=best['C'])

    if model_type is DecisionTreeClassifier:
        best = optimize_dtree(trial, X_train, X_test, y_train, y_test)

        return DecisionTreeClassifier(
            max_depth=int(best['max_depth']),
            # criterion is a string option ("gini"/"log_loss"); it must not
            # be cast to int.
            criterion=best['criterion'],
        )

    if model_type is RandomForestClassifier:
        best = optimize_rf(trial, X_train, X_test, y_train, y_test)

        return RandomForestClassifier(
            n_estimators=int(best['n_estimators']),
            max_depth=int(best['max_depth']),
            min_samples_leaf=int(best['min_samples_leaf']),
            min_samples_split=int(best['min_samples_split']),
        )

    if model_type is SVC:
        best = optimize_svc(trial, X_train, X_test, y_train, y_test)

        return SVC(C=best['C'], kernel=best['kernel'])

    if model_type is LinearSVC:
        best = optimize_linearSvc(trial, X_train, X_test, y_train, y_test)

        # The l1 penalty is only supported by the primal solver, so build the
        # model the same way the search objective does.
        if best['penalty'] == "l1":
            return LinearSVC(penalty="l1", dual=False)
        return LinearSVC(penalty=best['penalty'])

    print("Model has no parameters to tune.")
    return model

In [None]:
def run_models(models, X_train, X_test, y_train, y_test, hyperparam_tunning=False):
    """Cross-validate, fit and report every estimator in ``models``.

    For each estimator this prints the cross-validation summary on the
    training data, fits it and prints train/test metrics, and shows a SHAP
    summary plot when the estimator is a random forest.

    Parameters
    ----------
    models : iterable of estimators
    X_train, y_train : training features and labels.
    X_test, y_test : evaluation features and labels.
    hyperparam_tunning : bool, default=False
        When true, each estimator is first replaced by the tuned estimator
        returned by ``get_tuned_model``.
    """
    for model in models:
        if hyperparam_tunning:
            model = get_tuned_model(model, X_train, X_test, y_train, y_test)
            print(model)

        # Evaluation runs for every model, tuned or not, so that
        # ``trained_model`` is always bound to the current estimator before
        # the SHAP step below.
        print("Cross validation from training set:")
        results = cross_validation(model, X_train, y_train, _cv=5)
        for key, value in results.items():
            print(key, value)

        print("")
        # No gender labels are available here, so use the evaluation routine
        # that takes four data arguments and returns a 5-tuple.
        trained_model, _, _, _, _ = fit_evaluate_model_old(model, X_train, X_test, y_train, y_test)

        if type(model) is RandomForestClassifier:
            explainer = shap.TreeExplainer(trained_model)
            shap_values = explainer.shap_values(X_train)
            shap.summary_plot(shap_values, X_train)

        print("*****************************************************************")

### Models

In [None]:
from sklearn.svm import SVC
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier
from sklearn.naive_bayes import GaussianNB
from sklearn.tree import DecisionTreeClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import BaggingClassifier
from sklearn.neural_network import MLPClassifier

In [None]:
# Candidate estimators, each constructed with its default arguments.
models = [
    estimator_cls()
    for estimator_cls in (
        SVC,
        LinearSVC,
        DecisionTreeClassifier,
        RandomForestClassifier,
        LogisticRegression,
    )
]

### Hyperparameter tuning

In [None]:
from hyperopt import Trials, hp, space_eval, tpe
from hyperopt.fmin import fmin

In [None]:
# Seed for the NumPy random generator passed to hyperopt's fmin (rstate)
# by the optimize_* helpers.
seed=2

In [None]:
# Random Forest 

def objective_rf(params):
    """Hyperopt objective for a random forest.

    Parameters
    ----------
    params : dict
        Sampled point holding ``n_estimators``, ``max_depth``,
        ``min_samples_leaf`` and ``min_samples_split`` together with the
        data under ``X_train``, ``X_test``, ``y_train`` and ``y_test``.

    Returns
    -------
    float
        The negated F1 score on the test split. hyperopt's ``fmin``
        minimises its objective, so a higher F1 must map to a lower loss.
    """
    model = RandomForestClassifier(
        n_estimators=int(params['n_estimators']),
        max_depth=int(params['max_depth']),
        min_samples_leaf=int(params['min_samples_leaf']),
        min_samples_split=int(params['min_samples_split']),
    )

    model.fit(params['X_train'], params['y_train'])
    pred = model.predict(params['X_test'])
    score = f1_score(params['y_test'], pred)
    return -score

def optimize_rf(trial, X_train, X_test, y_train, y_test):
    """Search random-forest hyperparameters with hyperopt's TPE algorithm.

    Parameters
    ----------
    trial : hyperopt.Trials
        Trials object that records the evaluations.
    X_train, y_train : data each candidate is fitted on.
    X_test, y_test : data each candidate is scored on.

    Returns
    -------
    dict
        The selected values (not option indices) for ``n_estimators``,
        ``max_depth``, ``min_samples_leaf`` and ``min_samples_split``.
    """
    params = {
        'n_estimators': hp.choice('n_estimators', [5, 10, 15, 20]),
        'max_depth': hp.choice('max_depth', [3, 4, 5, 10, 20]),
        'min_samples_leaf': hp.choice('min_samples_leaf', [1, 2, 3, 4]),
        # scikit-learn rejects an integer min_samples_split below 2.
        'min_samples_split': hp.choice('min_samples_split', [2, 3, 4]),
        # The data rides along in the space so the objective can read it.
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    best = fmin(
        fn=objective_rf,
        space=params,
        algo=tpe.suggest,
        trials=trial,
        max_evals=100,
        rstate=np.random.default_rng(seed),
    )

    # fmin reports hp.choice picks as indices into the option lists; map
    # them back to the actual values and drop the data entries.
    resolved = space_eval(params, best)
    return {name: resolved[name] for name in best}

In [None]:
# Decision Tree 

def objective_dtree(params):
    """Hyperopt objective for a decision tree.

    Parameters
    ----------
    params : dict
        Sampled point holding ``max_depth`` and ``criterion`` together with
        the data under ``X_train``, ``X_test``, ``y_train`` and ``y_test``.

    Returns
    -------
    float
        The negated F1 score on the test split. hyperopt's ``fmin``
        minimises its objective, so a higher F1 must map to a lower loss.
    """
    model = DecisionTreeClassifier(
        max_depth=int(params['max_depth']),
        criterion=params['criterion'],
    )

    model.fit(params['X_train'], params['y_train'])
    pred = model.predict(params['X_test'])
    score = f1_score(params['y_test'], pred)
    return -score

def optimize_dtree(trial, X_train, X_test, y_train, y_test):
    """Search decision-tree hyperparameters with hyperopt's TPE algorithm.

    Parameters
    ----------
    trial : hyperopt.Trials
        Trials object that records the evaluations.
    X_train, y_train : data each candidate is fitted on.
    X_test, y_test : data each candidate is scored on.

    Returns
    -------
    dict
        The selected values (not option indices) for ``max_depth`` and
        ``criterion``.
    """
    params = {
        'max_depth': hp.choice('max_depth', [3, 4, 5, 10, 20]),
        'criterion': hp.choice('criterion', ["gini", "log_loss"]),
        # The data rides along in the space so the objective can read it.
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    best = fmin(
        fn=objective_dtree,
        space=params,
        algo=tpe.suggest,
        trials=trial,
        max_evals=100,
        rstate=np.random.default_rng(seed),
    )

    # fmin reports hp.choice picks as indices into the option lists; map
    # them back to the actual values and drop the data entries.
    resolved = space_eval(params, best)
    return {name: resolved[name] for name in best}

In [None]:
# SVM

def objective_svc(params):
    """Hyperopt objective for a support vector classifier.

    Parameters
    ----------
    params : dict
        Sampled point holding ``C`` and ``kernel`` together with the data
        under ``X_train``, ``X_test``, ``y_train`` and ``y_test``.

    Returns
    -------
    float
        The negated F1 score on the test split. hyperopt's ``fmin``
        minimises its objective, so a higher F1 must map to a lower loss.
    """
    # Apply the sampled C for every kernel so the score reflects the same
    # (C, kernel) pair that is later used to build the final model.
    model = SVC(kernel=params['kernel'], C=params['C'])

    model.fit(params['X_train'], params['y_train'])
    pred = model.predict(params['X_test'])
    score = f1_score(params['y_test'], pred)
    return -score

def optimize_svc(trial, X_train, X_test, y_train, y_test):
    """Search SVC hyperparameters with hyperopt's TPE algorithm.

    Parameters
    ----------
    trial : hyperopt.Trials
        Trials object that records the evaluations.
    X_train, y_train : data each candidate is fitted on.
    X_test, y_test : data each candidate is scored on.

    Returns
    -------
    dict
        The selected values (not option indices) for ``C`` and ``kernel``.
    """
    params = {
        'C': hp.choice('C', [0.0005, 0.001, 0.01, 0.1, 0.5]),
        # hp.choice takes the parameter label first, then the option list.
        'kernel': hp.choice('kernel', ['linear', 'rbf']),
        # The data rides along in the space so the objective can read it.
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    best = fmin(
        fn=objective_svc,
        space=params,
        algo=tpe.suggest,
        trials=trial,
        max_evals=100,
        rstate=np.random.default_rng(seed),
    )

    # fmin reports hp.choice picks as indices into the option lists; map
    # them back to the actual values and drop the data entries.
    resolved = space_eval(params, best)
    return {name: resolved[name] for name in best}

In [None]:
# Linear SVM

def objective_linearSvc(params):
    """Hyperopt objective for a linear support vector classifier.

    Parameters
    ----------
    params : dict
        Sampled point holding ``penalty`` together with the data under
        ``X_train``, ``X_test``, ``y_train`` and ``y_test``.

    Returns
    -------
    float
        The negated F1 score on the test split. hyperopt's ``fmin``
        minimises its objective, so a higher F1 must map to a lower loss.
    """
    if params['penalty'] == "l1":
        # The l1 penalty is solved in the primal formulation.
        model = LinearSVC(penalty="l1", dual=False)
    else:
        model = LinearSVC(penalty="l2")

    model.fit(params['X_train'], params['y_train'])
    pred = model.predict(params['X_test'])
    score = f1_score(params['y_test'], pred)
    return -score

def optimize_linearSvc(trial, X_train, X_test, y_train, y_test):
    """Search the LinearSVC penalty with hyperopt's TPE algorithm.

    Parameters
    ----------
    trial : hyperopt.Trials
        Trials object that records the evaluations.
    X_train, y_train : data each candidate is fitted on.
    X_test, y_test : data each candidate is scored on.

    Returns
    -------
    dict
        The selected value (not option index) for ``penalty``.
    """
    params = {
        'penalty': hp.choice('penalty', ["l1", "l2"]),
        # The data rides along in the space so the objective can read it.
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    best = fmin(
        fn=objective_linearSvc,
        space=params,
        algo=tpe.suggest,
        trials=trial,
        max_evals=100,
        rstate=np.random.default_rng(seed),
    )

    # fmin reports hp.choice picks as indices into the option lists; map
    # them back to the actual values and drop the data entries.
    resolved = space_eval(params, best)
    return {name: resolved[name] for name in best}

In [None]:
# Logistic regression

def objective_lr(params):
    """Hyperopt objective for logistic regression.

    Parameters
    ----------
    params : dict
        Sampled point holding ``C`` together with the data under
        ``X_train``, ``X_test``, ``y_train`` and ``y_test``.

    Returns
    -------
    float
        The negated F1 score on the test split. hyperopt's ``fmin``
        minimises its objective, so a higher F1 must map to a lower loss.
    """
    model = LogisticRegression(C=params['C'])

    model.fit(params['X_train'], params['y_train'])
    pred = model.predict(params['X_test'])
    score = f1_score(params['y_test'], pred)
    return -score

def optimize_lr(trial, X_train, X_test, y_train, y_test):
    """Search the logistic-regression ``C`` with hyperopt's TPE algorithm.

    Only ``C`` is searched: the objective builds
    ``LogisticRegression(C=...)`` and never reads a penalty, so sampling one
    would add a dimension with no effect on the score.

    Parameters
    ----------
    trial : hyperopt.Trials
        Trials object that records the evaluations.
    X_train, y_train : data each candidate is fitted on.
    X_test, y_test : data each candidate is scored on.

    Returns
    -------
    dict
        The selected value (not option index) for ``C``.
    """
    params = {
        'C': hp.choice('C', [0.1, 0.5, 0.7, 1]),
        # The data rides along in the space so the objective can read it.
        'X_train': X_train,
        'X_test': X_test,
        'y_train': y_train,
        'y_test': y_test,
    }

    best = fmin(
        fn=objective_lr,
        space=params,
        algo=tpe.suggest,
        trials=trial,
        max_evals=100,
        rstate=np.random.default_rng(seed),
    )

    # fmin reports hp.choice picks as indices into the option lists; map
    # them back to the actual values and drop the data entries.
    resolved = space_eval(params, best)
    return {name: resolved[name] for name in best}

### Main

In [None]:
# Load the training and validation tables from their CSV files.
training_set = pd.read_csv("motifs_all_electrodes_beta_m70_train_balanced.csv")
validation_set = pd.read_csv("motifs_all_electrodes_beta_m70_val_balanced.csv")

# Every column except the target is used as a feature.
target = "label"

X_train = np.array(training_set.drop(columns=target))
y_train = training_set[target]

X_test = np.array(validation_set.drop(columns=target))
y_test = validation_set[target]

In [None]:
# run_models takes the four data splits separately.
run_models(models, X_train, X_test, y_train, y_test, hyperparam_tunning=True)

In [None]:
def run_evaluation_result(X_input_train, X_input_test,  y_input_train, y_input_test, train_input_gender, validation_input_gender):
    """Fit every estimator in the global ``models`` list and tabulate its metrics.

    Each estimator is passed to ``fit_evaluate_model`` with report printing
    disabled.

    Parameters
    ----------
    X_input_train, y_input_train : training features and labels.
    X_input_test, y_input_test : validation features and labels.
    train_input_gender, validation_input_gender : per-sample gender codes
        for the two splits.

    Returns
    -------
    pandas.DataFrame
        One row per estimator with the estimator itself, F1 and accuracy on
        both splits, and the per-gender accuracies. The ``*_female`` columns
        hold the gender-0 accuracy and the ``*_male`` columns the gender-1
        accuracy.
    """
    metric_columns = (
        "F1_train",
        "Acc_train",
        "Acc_train_female",
        "Acc_train_male",
        "F1_val",
        "Acc_val",
        "Acc_val_female",
        "Acc_val_male",
    )
    collected = {column: [] for column in metric_columns}

    for estimator in models:
        _, f1_tr, f1_va, acc_tr, acc_va, by_gender_tr, by_gender_va = fit_evaluate_model(
            estimator,
            X_input_train,
            X_input_test,
            y_input_train,
            y_input_test,
            train_input_gender,
            validation_input_gender,
            print_evaluation=False,
        )
        # Values listed in the same order as ``metric_columns``.
        row = (
            f1_tr,
            acc_tr,
            by_gender_tr[0],
            by_gender_tr[1],
            f1_va,
            acc_va,
            by_gender_va[0],
            by_gender_va[1],
        )
        for column, value in zip(metric_columns, row):
            collected[column].append(value)

    # Assemble column by column, estimator column first.
    result_df = pd.DataFrame()
    result_df["model"] = models
    for column in metric_columns:
        result_df[column] = collected[column]
    return result_df