In [1]:
# Import all dependencies

from __future__ import print_function, absolute_import
import time
import numpy as np
import matplotlib.pyplot as plt
from scipy import signal
from sklearn.svm import SVC
from sklearn.model_selection import cross_val_score, StratifiedKFold, train_test_split
from sklearn.metrics import matthews_corrcoef, balanced_accuracy_score
import multiprocessing
import scipy.stats
import sys

In [2]:
# Load the dataset
import src.data_loader as dl
subIDs, data, labels = dl.load_processed_data_N_subjects_allchans('data/', Nsub=15)

### Obtain a STFT for a given epoch

In [3]:
def get_stft(data_array):
    f, t, data_stft = np.abs(signal.stft(data_array, fs=100, nperseg=64, noverlap=48, axis=-1))
    data_stft = np.moveaxis(data_stft, 1, -1)
    return data_stft

### Normalize the STFT to the subject's overall EEG spectrum (mean, std)

In [4]:
def get_stft_zscore(stft):
    
    # move trials to the last axis
    data_stft_mean = np.mean(data_stft, axis=(2,3))
    data_stft_mean = np.repeat(data_stft_mean[:,:,np.newaxis], data_stft.shape[2], axis=2)
    data_stft_mean = np.repeat(data_stft_mean[:,:,:, np.newaxis], data_stft.shape[3], axis=3)

    data_stft_std = np.std(data_stft, axis=(2,3))
    data_stft_std = np.repeat(data_stft_std[:,:,np.newaxis], data_stft.shape[2], axis=2)
    data_stft_std = np.repeat(data_stft_std[:,:,:, np.newaxis], data_stft.shape[3], axis=3)
    
    stft_norm = (data_stft - data_stft_mean) / data_stft_std
    
    return stft_norm

### Split the data into train and test based on subjects

In [5]:
for sub in range(len(data)):
    y = labels[sub]
    label_array = np.array([])
    for i in range(y.shape[0]):
        l_t = np.where(y[i,:]==1)
        label_array = np.append(label_array, l_t)
    y = label_array.astype(int)
    y[y==6]=0
    y[y==9]=1
    
    print(sub, np.unique(y))

0 [0]
1 [0]
2 [0]
3 [0 1]
4 [0]
5 [0]
6 [0]
7 [ 0 11]
8 [ 0 15]
9 [0]
10 [0]
11 [0]
12 [0]
13 [0]
14 [0 1 8]


In [6]:
xtrain = [0, 1, 2, 3, 4, 5]
xtest = [14]

In [7]:
def generate_features(stft_z, labels_):
      
    # Then get the frames as feature vectors
    data_array = label_array = np.array([])
    for trial in np.arange(stft_z.shape[-1]):
        
        this_array = stft_z[:,:,:,trial]
        
        for timepoint in np.arange(stft_z.shape[-2]):
            this_array = stft_z[:,]
            data_array = np.append(data_array, stft_z[:, :, timepoint, trial].ravel(), axis=0)
            l_t = np.where(labels_[trial,:]==1)
            label_array = np.append(label_array, l_t)
    
    X = np.reshape(data_array, (-1, stft_z.shape[0]*stft_z.shape[1]))
    y = label_array.astype(int)
    y[y==6] = 0
    y[y==9] = 1

    return X, y

In [13]:
for sub in xtrain:
    print("Subject {} out of {}".format(sub+1, len(xtrain)))
    data_stft = get_stft(data[sub])
    data_stft_norm = data_stft

    if sub==0:
        X, y = generate_features(data_stft_norm, labels[sub])
    else:
        X_sub, y_sub = generate_features(data_stft_norm, labels[sub])
        X = np.append(X, X_sub, axis=0)
        y = np.append(y, y_sub, axis=0)
        

# Randomize the training set
ds = np.empty((X.shape[0], X.shape[1]+1))
ds[:,:-1]=X
ds[:,-1] = y
np.random.seed=42
np.random.shuffle(ds)

clf = SVC(C=1.0, cache_size=200, class_weight='balanced', coef0=0.0,
  decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
  max_iter=-1, probability=True, random_state=42, shrinking=True, tol=0.001,
  verbose=0)

clf.fit(ds[:,:-1],ds[:,-1])

Subject 1 out of 6
Subject 2 out of 6
Subject 3 out of 6
Subject 4 out of 6
Subject 5 out of 6
Subject 6 out of 6


SVC(C=1.0, cache_size=200, class_weight='balanced', coef0=0.0,
    decision_function_shape='ovr', degree=3, gamma='scale', kernel='rbf',
    max_iter=-1, probability=True, random_state=42, shrinking=True, tol=0.001,
    verbose=0)

In [16]:
def generate_test_features(stft_epoch):
    data_array = label_array = np.array([])        
    for timepoint in np.arange(stft_epoch.shape[-1]):
        data_array = np.append(data_array, stft_epoch[:, :, timepoint].ravel(), axis=0)
    X = np.reshape(data_array, (-1, stft_epoch.shape[0]*stft_epoch.shape[1]))
    return X

In [17]:
scores = np.empty((len(xtest),))

for sub in xtest:
    print("Subject {} out of {}".format(sub+1, len(xtest)))
    data_stft = get_stft(data[sub])
    data_stft_norm = data_stft
    y_m = np.array([])
    for epoch in range(data_stft_norm.shape[-1]):
        tpc = np.floor(data_stft_norm.shape[-1]/100)
#         if epoch%10==0:
#             print("Epoch {} out of {}".format(epoch+1, data_stft_norm.shape[-1]))
        X = generate_test_features(data_stft_norm[:,:,:,epoch])
        y_pred = clf.predict(X)
        y_m = np.append(y_m, np.median(y_pred))
    y = labels[sub]
    
    label_array = np.array([])
    for i in range(y.shape[0]):
        l_t = np.where(y[i,:]==1)
        label_array = np.append(label_array, l_t)
            
    y_test = label_array.astype(int)
    y_test[y_test == 6] = 0
    y_test[y_test == 9] = 1
        
    score = balanced_accuracy_score(y_m[y_test<=2].astype(int), y_test[y_test<=2].astype(int))
    print("Score is %0.3f \n\n" %(score))

Subject 15 out of 1
Score is 0.777 




In [39]:
from sklearn.metrics import classification_report
print(classification_report(y_m[y_test<=2].astype(int), y_test[y_test<=2].astype(int)))

              precision    recall  f1-score   support

           0       1.00      0.55      0.71       186
           1       0.02      1.00      0.05         2

    accuracy                           0.56       188
   macro avg       0.51      0.78      0.38       188
weighted avg       0.99      0.56      0.71       188

