In [None]:
# https://www.kaggle.com/code/nadosh/my-project-bci-2008

import sys, os #, math, time

# Put the parent of the current working directory first on the import path so
# that the `src` package imported below can be resolved.
# NOTE(review): assumes the notebook is run from a direct subfolder of the
# project root -- confirm.
sys.path.insert(0, os.path.dirname(os.getcwd()))

# NOTE(review): star imports hide where names come from. Later cells use `np`
# and `pickle_in_zip` without importing them here, so they presumably arrive
# through these two modules -- confirm and consider explicit imports.
from src.thot.sesh import *
from src.thot.catch_features import *


In [None]:
from sklearn import model_selection, metrics
from sklearn import discriminant_analysis, linear_model, ensemble, svm
import re

# Helper class to train sklearn GridSearchCV models & report metrics
class gridsearchcv_model:
    """Fit an estimator through ``GridSearchCV`` and report train/validation scores.

    Construction runs the whole pipeline: grid search on the training set,
    selection of the best estimator, prediction on both sets and printing
    of the resulting metric.

    Attributes set during construction:
        is_classification: if true, F1 is used for the search and the report;
            otherwise mean squared error is used.
        model: the ``best_estimator_`` selected by the grid search.
        name: display name of the selected estimator.
        train, val: dicts with keys ``name``, ``predictions`` and either
            ``accuracy`` (classification) or ``mse`` (regression).
            The ``accuracy`` key holds the F1 score; the key name is kept
            for backward compatibility.

    NOTE(review): ``scoring='f1'`` and the default ``metrics.f1_score`` are
    scikit-learn's binary F1 -- confirm the targets passed in are binary.
    """

    def __init__(self, model, X_train, Y_train, X_val, Y_val, parameter_matrix=None, is_classification=False, cv=4):
        """Store the task type and immediately train/evaluate ``model``.

        Args:
            model: unfitted estimator handed to ``GridSearchCV``.
            X_train, Y_train: training features and targets.
            X_val, Y_val: validation features and targets.
            parameter_matrix: parameter grid for the search; ``None`` (default)
                means an empty grid, i.e. the estimator's own settings.
            is_classification: selects F1 (true) or MSE (false).
            cv: cross-validation setting forwarded to ``GridSearchCV``.
        """
        self.is_classification = is_classification
        self.train_model(model, X_train, Y_train, X_val, Y_val, parameter_matrix, cv)

    @staticmethod
    def _model_name(model):
        """Return the text before the first '(' of ``str(model)``, else the class name."""
        # Raw string: "\s" in a normal string literal is an invalid escape sequence.
        found = re.match(r"(.*?)\s*\(", str(model))
        return found.group(1) if found else type(model).__name__

    # Trains model using a training set and predicts a validation set
    def train_model(self, model, X_train, Y_train, X_val, Y_val, parameter_matrix=None, cv=4):
        """Grid-search ``model``, keep the best estimator and score both data sets.

        Returns:
            The fitted ``GridSearchCV`` object.
        """
        # A fresh dict per call avoids sharing one mutable default between calls.
        if parameter_matrix is None:
            parameter_matrix = {}

        scoring = 'f1' if self.is_classification else 'neg_mean_squared_error'
        ml_model = model_selection.GridSearchCV(model, parameter_matrix, cv=cv, scoring=scoring)
        ml_model.fit(X_train, Y_train)

        self.model = ml_model.best_estimator_
        self.name = self._model_name(self.model)

        self.train = {'name': 'train'}
        self.val = {'name': 'val'}

        self.calculate_error(self.train, X_train, Y_train, self.train['name'])
        self.calculate_error(self.val, X_val, Y_val, self.val['name'])

        return ml_model

    def calculate_error(self, var, X_set, Y_set, name):
        """Predict ``X_set``, store predictions and metric in ``var`` and print them.

        ``var`` is updated in place with the keys ``name``, ``predictions`` and
        ``accuracy`` (F1 score, classification) or ``mse`` (regression).
        """
        var['name'] = name
        var['predictions'] = self.model.predict(X_set)

        if self.is_classification:
            var['accuracy'] = metrics.f1_score(Y_set, var['predictions'])
        else:
            var['mse'] = metrics.mean_squared_error(Y_set, var['predictions'])

        self.print_error(var)

    # Prints error metrics
    def print_error(self, var):
        """Print the model name, the data-set name and the metric stored in ``var``."""
        print(f"{self.name} ({var['name']})")

        if self.is_classification:
            # The stored value is an F1 score, so label it as such.
            print(f"F1 score: {var['accuracy']:.2%}")
        else:
            # MSE is not a ratio; a percent format would misstate it.
            print(f"MSE: {var['mse']:.4f}")

In [None]:
# Load train and test data into dataframes.
# Both pickles are read from the same zip archive, train first and test second.
path = '../data/data.zip'
df_train_original, df_test = (
    pickle_in_zip(path, member)
    for member in ("epoched_train.pkl", "epoched_test.pkl")
)

In [None]:
# Derive integer id columns from the 'patient_id' value of every row.
# Series.map is index-aligned, so this works for any row index; the previous
# df['patient_id'][x] lookups were label-based and only valid on a default
# 0..n-1 index.
for _frame in (df_train_original, df_test):
    # Create column 'pid' which is the patient ID 1 through 9
    # (third character of patient_id).
    _frame['pid'] = _frame['patient_id'].map(lambda s: int(s[2]))

    # Create column 'trial_id' which is the trial 1 through 3
    # (second-to-last character of patient_id).
    _frame['trial_id'] = _frame['patient_id'].map(lambda s: int(s[-2]))

In [None]:
# Use trials 1&2 for training, trial 3 for validation (mirrors process to create Kaggle test set)
SPLIT_SEED = 42  # fixed seed so the shuffles are identical after Restart & Run All

_is_val = df_train_original['trial_id'] == 3

# sample(frac=1) returns every row in shuffled order; unlike reindexing on a
# permuted index it also works when index labels are not unique.
df_train = df_train_original[~_is_val].sample(frac=1, random_state=SPLIT_SEED).reset_index(drop=True)

df_val = df_train_original[_is_val].sample(frac=1, random_state=SPLIT_SEED).reset_index(drop=True)

In [None]:
# Prepare data for training across all subjects

# Targets: the 'event_type' column of each labelled frame, cast to float.
y_train, y_val = (
    frame["event_type"].values.astype(float) for frame in (df_train, df_val)
)

# Features: every column except the bookkeeping ones. The test frame is
# dropped without 'event_type', exactly as before.
_bookkeeping_cols = ["patient_id", "start_time", "pid", "trial_id"]
_labelled_drop = ["patient_id", "start_time", "event_type", "pid", "trial_id"]

X_train = df_train.drop(columns=_labelled_drop)
X_val = df_val.drop(columns=_labelled_drop)
X_test = df_test.drop(columns=_bookkeeping_cols)

In [None]:
def _flatten_rows(frame):
    """Concatenate the cells of each row of ``frame`` into a single vector.

    Every row of ``frame.values`` is passed to ``np.concatenate`` and the
    per-row results are stacked with ``np.array``.
    NOTE(review): the result is 2-D only if all rows concatenate to the same
    length -- presumably true for the epoched data; confirm.
    """
    return np.array([np.concatenate(row) for row in frame.values])


# One shared helper replaces three differently spelled versions of the same step.
x_train_nn = _flatten_rows(X_train)
x_val_nn   = _flatten_rows(X_val)
x_test_nn  = _flatten_rows(X_test)

# Kept in case later cells still use it: same values as x_train_nn.
v1 = x_train_nn.copy()

# Show the shapes instead of dumping a whole array.
x_train_nn.shape, x_val_nn.shape, x_test_nn.shape


In [None]:
# Train four classifiers (logistic regression, random forest, SVM, LDA) on the
# same train/validation split. Each constructor call fits the model and prints
# its train and validation scores.
param_matrix = {}

MODEL_SEED = 42  # fixed seed so the random forest gives the same result on every run

_data_args   = (x_train_nn, y_train, x_val_nn, y_val)
_search_args = dict(parameter_matrix=param_matrix, is_classification=True)

logistic_model = gridsearchcv_model(linear_model.LogisticRegression(), *_data_args, **_search_args)
rf_model       = gridsearchcv_model(ensemble.RandomForestClassifier(random_state=MODEL_SEED),
                                    *_data_args, **_search_args)
svm_model      = gridsearchcv_model(svm.SVC(), *_data_args, **_search_args)
lda_model      = gridsearchcv_model(discriminant_analysis.LinearDiscriminantAnalysis(),
                                    *_data_args, **_search_args)