In [None]:
from functools import partial
from natsort import natsorted
import numpy as np
import os
import pywt
from scipy.signal import welch

#Machine Learning
from sklearn.model_selection import KFold
from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC
from sklearn.metrics import mean_absolute_error
from sklearn.metrics import accuracy_score

## Feature Engineering

In [None]:
def _zscore(values):
    """Standardize ``values`` to zero mean and unit standard deviation.

    Returns an all-zero array of the same shape when the standard deviation
    is 0; the plain formula would divide by zero there and fill the feature
    vector with NaN/inf.
    """
    std = np.std(values)
    if std == 0:
        return np.zeros(np.shape(values))
    return (values - np.mean(values)) / std


def feature_engineering(data_set, target_path):
    """Extract a feature vector from every sample file in ``data_set``.

    For each ``.npy`` file (except ``labels.npy``) three feature groups are
    computed, each z-scored on its own, concatenated and saved to
    ``target_path`` under the input file's base name:

    * VAR - variance along axis 1 of the full array and of the three
      segments obtained by splitting axis 1 at indices 55 and 110.
    * PSD - the first 25 Welch power-spectrum bins per row (fs = 250).
    * DWT - approximation coefficients of a level-3 ``db8`` decomposition.

    Parameters
    ----------
    data_set : str
        Directory holding the per-sample ``.npy`` files (and ``labels.npy``).
    target_path : str
        Existing directory that receives one feature file per input file.

    Notes
    -----
    Assumes each file holds a 2-D array with axis 1 longer than 110 entries
    (presumably channels x time samples) - TODO confirm.
    """
    row_variance = partial(np.var, axis=1)  # loop-invariant, build once

    # Skip the label file by name instead of relying on it sorting last.
    sample_files = [
        name for name in natsorted(os.listdir(data_set)) if name != "labels.npy"
    ]

    for path in sample_files:
        # NOTE(review): allow_pickle=True lets np.load unpickle arbitrary
        # objects; only use it on files you created yourself.
        file = np.load(os.path.join(data_set, path), allow_pickle=True)

        # VAR: whole array plus three consecutive segments along axis 1
        first, second, third = np.split(file, [55, 110], axis=1)
        VAR = _zscore(np.hstack((
            row_variance(file),
            row_variance(first),
            row_variance(second),
            row_variance(third),
        )))

        # PSD (Welch): keep the first 25 frequency bins of every row
        # (~35 Hz according to the original note; depends on segment length)
        _, power = welch(file, fs=250)
        PSD = _zscore(power[:, :25].flatten())

        # DWT: only the approximation coefficients (first entry of the result)
        # NOTE(review): 'sym' looks like the legacy alias of 'symmetric' -
        # confirm the installed PyWavelets version still accepts it.
        approx = pywt.wavedec(file, wavelet="db8", mode='sym', level=3)[0]
        DWT = _zscore(approx.flatten())

        sample = np.hstack((VAR, PSD, DWT))
        np.save("{}/{}".format(target_path, os.path.basename(path)), sample)

## Create Datasets

In [None]:
# Machine-specific absolute locations: change BCI_DIR to match your setup.
BCI_DIR = "C:/Users/Daydreamore/Desktop/Semester/BCI"
data_ml_multi = BCI_DIR + "/data_ml_multi"
train_multi = BCI_DIR + "/train_multi"
val_multi = BCI_DIR + "/val_multi"

# np.save does not create missing directories.
os.makedirs(data_ml_multi, exist_ok=True)

# NOTE(review): both calls write into the same folder under each input file's
# base name, so train and val files that share a name would overwrite each
# other - confirm the file names are disjoint.
feature_engineering(train_multi, data_ml_multi)
feature_engineering(val_multi, data_ml_multi)

# Combine labels of train and val (we use CV for the ML part)
labels1 = np.load(train_multi+"/labels.npy")
labels2 = np.load(val_multi+"/labels.npy")
labels = np.hstack((labels1, labels2))
# The loading cell reads "labels.npy"; the previous code used `path`, which is
# local to feature_engineering and therefore undefined here (NameError).
np.save("{}/{}".format(data_ml_multi, "labels.npy"), labels)

## Load datasets. 
Randomly select an equal number of control trials for each participant to avoid an unbalanced dataset.

In [None]:
data_ml_multi = "C:/Users/Daydreamore/Desktop/Semester/BCI/data_ml_multi"
N_CONTROL_TRIALS = 325  # number of control (label 0) trials to keep
rng = np.random.default_rng(42)  # seeded so the subsample is reproducible

# Load every feature file in natural order, skipping the label file by name
# rather than assuming it sorts last.
feature_files = [
    name for name in natsorted(os.listdir(data_ml_multi)) if name != "labels.npy"
]
X_train = np.array([np.load(data_ml_multi + "/" + name) for name in feature_files])
Y_train = np.load(data_ml_multi+"/"+"labels.npy")
assert len(X_train) == len(Y_train), "number of feature files and labels differ"

# For control randomly sample 325 trials (to avoid imbalanced dataset);
# labels 1 and 2 are kept in full. Row order: sampled controls, then all
# label-1 rows, then all label-2 rows.
# Assumes Y_train is 1-D with one label per file - TODO confirm.
control_ixs = np.where(Y_train == 0)[0]
kept_control_ixs = rng.choice(control_ixs, size=N_CONTROL_TRIALS, replace=False)
selected_ixs = np.concatenate(
    [kept_control_ixs] + [np.where(Y_train == label)[0] for label in (1, 2)]
)

X_train_selected = X_train[selected_ixs]
Y_train_selected = Y_train[selected_ixs]

Sanity check: confirm that the dimensions are as expected:

In [None]:
# Every selected sample needs exactly one label; fail loudly if they diverge.
assert X_train_selected.shape[0] == Y_train_selected.shape[0], \
    "samples and labels have different lengths"
X_train_selected.shape, Y_train_selected.shape

## Model Fitting
Try to overfit on training data to see if the SVM can learn to tell the conditions apart.

In [None]:
# Fit an SVC with default hyper-parameters on the full selected set and score
# it on the very same data: a high training accuracy shows the model can
# separate the conditions at all. (SVC and accuracy_score come from the
# import cell at the top of the notebook.)
overfit = SVC()
overfit.fit(X_train_selected, Y_train_selected)
yhat = overfit.predict(X_train_selected)
acc_svm = accuracy_score(Y_train_selected, yhat)
print(acc_svm)

Combine hyperparameter search with model selection through nested cross-validation and print the winning parameter combination for each outer fold.

In [None]:
# Nested cross-validation: the outer loop estimates generalization accuracy,
# the inner grid search picks the hyper-parameters for each outer fold.

# Search space. `gamma` is only used by the RBF kernel, so the linear kernel
# gets its own sub-grid; crossing it with gamma would refit each linear
# candidate nine times with identical results.
c_values = np.logspace(-18, 9, num=9, base=2).tolist()
gamma_values = np.logspace(-18, 9, num=9, base=2).tolist()
param_grid = [
    {"kernel": ["rbf"], "C": c_values, "gamma": gamma_values},
    {"kernel": ["linear"], "C": c_values},
]

# Both splitters shuffle with a fixed seed, so rows that are stored class by
# class still get mixed across folds. The folds are not stratified.
cv_outer = KFold(n_splits=5, shuffle=True, random_state=42)
cv_inner = KFold(n_splits=3, shuffle=True, random_state=42)
model = SVC()  # GridSearchCV clones it for every candidate

fold_accuracies = []
for train_ix, val_ix in cv_outer.split(X_train_selected):
    # split data
    x_train, x_val = X_train_selected[train_ix, :], X_train_selected[val_ix, :]
    y_train, y_val = Y_train_selected[train_ix], Y_train_selected[val_ix]

    # Inner CV: try every candidate, then refit the best on the whole outer-train split
    search = GridSearchCV(model, param_grid, scoring="accuracy", cv=cv_inner, refit=True, error_score="raise")
    result = search.fit(x_train, y_train)
    best_model = result.best_estimator_

    # Score the refitted winner on the held-out outer fold
    yhat = best_model.predict(x_val)
    acc_svm = accuracy_score(y_val, yhat)
    fold_accuracies.append(acc_svm)
    print(">acc=%.3f, est=%.3f, cfg=%s" % (acc_svm, result.best_score_, result.best_params_))

# summarize estimated performance of the model
outer_results = np.array(fold_accuracies)
print("Accuracy: %.3f (%.3f)" % (np.mean(outer_results), np.std(outer_results)))