This notebook demonstrates the "One versus the Rest" classification paradigm for Common Spatial Patterns based classification of EEG Motor Imagery Trials. It can identify four classes of Motor Imagery - 
* Left Hand
* Right Hand
* Tongue
* Foot

Four one-versus-the-rest classifiers, one for each class in the dataset, are constructed. Each test instance is classified by all four classifiers; the final decision is the class whose classifier predicts it with the highest probability.

In [None]:
# Mount Google Drive into the Colab runtime at /content/drive; the data
# files loaded by get_data are read from beneath this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import numpy as np
import math
import xgboost
import time
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import cohen_kappa_score
from sklearn.metrics import precision_recall_fscore_support

# Number of frequency bands; get_features iterates over this many bands when
# building the CSP projection matrices and the per-trial feature vectors.
bands = 10

Utility functions to read data into memory, reshape data, and reformat data to suit one versus the rest classification

In [1]:
def get_data(subject):
    """
    Reads one subject's training and test arrays from the mounted Drive folder.

    The training signals and labels are read from the ``aug/.../gaussian``
    folders, the test signals and labels from the ``filtered`` folder. The type
    of the loaded training labels is printed as a side effect.

    Parameters: 
    subject(int): The numeric identifier substituted into the file names

    Returns:
    numpy.ndarray: training data
    numpy.ndarray: training labels
    numpy.ndarray: test data
    numpy.ndarray: test labels
    """

    # Path templates, in the order the arrays are loaded and returned.
    templates = (
        '/content/drive/My Drive/EEG-MI/data/filtered/aug/signals/gaussian/X0{}T.npy',
        '/content/drive/My Drive/EEG-MI/data/filtered/aug/labels/gaussian/y0{}T.npy',
        '/content/drive/My Drive/EEG-MI/data/filtered/X0{}E.npy',
        '/content/drive/My Drive/EEG-MI/data/filtered/y0{}E.npy',
    )
    X_train, y_train, X_test, y_test = [
        np.load(template.format(subject)) for template in templates
    ]
    print(type(y_train))
    return X_train, y_train, X_test, y_test

def binarize_labels(y, class_1, class_2):
    """
    Encodes labels as 0/1: entries equal to class_1 become 0, every other entry becomes 1
    e.g. with class_1 = 770 the labels 770,771 translate to 0,1.

    Parameters:
    y: iterable of labels
    class_1: the label value mapped to 0
    class_2: not read by the implementation; any label different from class_1 maps to 1

    Returns:
    numpy.ndarray: the binary labels, in the same order as y
    """

    return np.array([0 if label == class_1 else 1 for label in y])

def reshape_trial_major(data):
    """
    Reshapes data to conform to the following axis representation - (trials, frequency bands, channels, samples) 
    e.g. a set with 20 trials, 10 frequency bands, 22 channels and 750 data points gets shaped into a matrix with dimensions (20, 10, 22, 750)

    The number of frequency bands is taken from the input itself, so any band
    count is supported.

    Parameters: 
    data(numpy.ndarray): EEG signals with the following dimensions - (frequency bands, trials, channels, samples)

    Returns:
    numpy.ndarray: EEG signals with the following dimensions - (trials, frequency bands, channels, samples)
    """

    # swapaxes only returns a view; copy so the result is a new array that
    # does not alias the caller's data.
    return np.swapaxes(np.asarray(data), 0, 1).copy()

def reshape_band_major(data):
    """
    Reshapes data to conform to the following axis representation - (frequency bands, trials, channels, samples) 
    e.g. a set with 20 trials, 10 frequency bands, 22 channels and 750 data points gets shaped into a matrix with dimensions (10, 20, 22, 750)

    The number of frequency bands is taken from the input itself, so any band
    count is supported.

    Parameters: 
    data(numpy.ndarray): EEG signals with the following dimensions - (trials, frequency bands, channels, samples)

    Returns:
    numpy.ndarray: EEG signals with the following dimensions - (frequency bands, trials, channels, samples)
    """

    # swapaxes only returns a view; copy so the result is a new array that
    # does not alias the caller's data.
    return np.swapaxes(np.asarray(data), 0, 1).copy()

def get_OVR_data(y, ovr_class):
    """
    Builds one versus the rest labels: entries equal to the chosen class become 1, all other entries become 0

    Parameters:
    y(numpy.ndarray): labels array
    ovr_class: the class value to be encoded as 1

    Returns:
    numpy.ndarray: One versus the rest formatted labels, in the same order as y
    """

    encoded = [1 if label == ovr_class else 0 for label in y]
    return np.array(encoded)

Functions to construct the Common Spatial Patterns projection matrices and compute CSP features on input data

In [None]:
def get_CSP_mat(X,y,class1,class2):
    """
    Generates the CSP projection matrix for the input training instances

    Each trial contributes its trace-normalised spatial covariance to the sum
    of its class: trials labelled class1 go to the first sum, every other trial
    goes to the second. The composite covariance is whitened, and the whitened
    class-1 covariance is then diagonalised to obtain the spatial filters.

    Parameters:
    X(numpy.ndarray): Training instances corresponding to a particular frequency band, indexed as X[trial] -> (channels, samples)
    y(numpy.ndarray): Training labels
    class1: the value of the label corresponding to the first class
    class2: the value of the label corresponding to the second class (used only to count the trials of the second class)

    Returns:
    numpy.ndarray: The CSP Projection Matrix (one spatial filter per row)
    numpy.ndarray: The indices that sort the eigenvalues of the whitened class-1 covariance in ascending order; index k refers to row k of the projection matrix
    """

    # Take the channel count from the data instead of assuming 22 channels.
    n_channels = np.shape(X[0])[0]

    R1 = np.zeros((n_channels, n_channels)) #To store the sum of the covariance matrices of class 1
    R2 = np.zeros((n_channels, n_channels)) #To store the sum of the covariance matrices of class 2

    c1 = list(y).count(class1)
    c2 = list(y).count(class2)

    #Compute the sum of the normalised covariance matrices of the trials of each class
    for i, label in enumerate(y):
        cov = np.dot(X[i], np.transpose(X[i]))
        cov = np.divide(cov, np.trace(cov))
        if label == class1:
            R1 = np.add(R1, cov)
        else:
            R2 = np.add(R2, cov)

    #Compute average covariance matrices for both classes
    R1 = np.divide(R1, c1)
    R2 = np.divide(R2, c2)
    R = np.add(R1, R2) #Composite Spatial Covariance

    # R is symmetric, so eigh is the appropriate solver: it returns real
    # eigenvalues and orthonormal eigenvectors.
    e_val, e_vec = np.linalg.eigh(R)
    #Sort in Descending Order of eigenvalues
    order = np.argsort(e_val)[::-1]
    e_val = e_val[order]
    e_vec = e_vec[:, order]

    P = np.dot(np.diag(np.power(e_val, -0.5)), np.transpose(e_vec)) #Whitening Transformation Matrix

    S1 = np.dot(P, np.dot(R1, np.transpose(P)))

    # After whitening, S1 + S2 = P R P^T is the identity, so the eigenvectors
    # of S1 also diagonalise S2. Taking both the eigenvalues and the
    # eigenvectors from the same decomposition of S1 guarantees that the
    # ordering in `a` refers to the rows of W.
    e_val1, e_vec1 = np.linalg.eigh(S1)

    a = np.argsort(e_val1) #Ascending order of the class-1 eigenvalues
    W = np.dot(np.transpose(e_vec1), P) #Final Projection Matrix

    return(W,a)

def get_CSP_band_features(data,W,a):
    """
    Computes the log variance of the CSP transformed signals

    The trial is projected with W, the four projected rows indexed by the first
    two and the last two entries of `a` are kept, and the log of each row's
    variance divided by the sum of the four variances is returned.

    Parameters: 
    data(numpy.ndarray): One input trial of a single frequency band, with dimensions (channels, samples)
    W(numpy.ndarray): The CSP projection matrix for this frequency band
    a(numpy.ndarray): The row indices of W ordered by eigenvalue, as returned by get_CSP_mat

    Returns:
    numpy.ndarray: The normalised log variance of the four CSP transformed rows corresponding to the two minimum and two maximum eigenvalues
    """

    projected = np.dot(W, data)
    # Negative indices pick the last two entries whatever the channel count is
    # (for 22 channels these are a[20] and a[21]).
    picks = [a[0], a[1], a[-2], a[-1]]
    variances = np.array([np.var(projected[k]) for k in picks])
    return(np.log(np.divide(variances, variances.sum())))

def get_test_features(data, W, a):
    """
    Generates the CSP features for all frequency bands of one input trial

    The number of frequency bands is taken from the number of projection
    matrices in W rather than being fixed at 10.

    Parameters:
    data(numpy.ndarray): The input trial for all frequency bands, indexed as data[band]
    W(numpy.ndarray): The CSP projection matrix for each frequency band
    a(numpy.ndarray): The eigenvalue ordering (as returned by get_CSP_mat) for each frequency band

    Returns:
    numpy.ndarray: The per-band features returned by get_CSP_band_features, concatenated in band order
    """
    
    features = []
    for band in range(len(W)):
        features.extend(get_CSP_band_features(data[band], W[band], a[band]))
    return np.array(features)

def get_features(X_train, X_test, y_train):
    """
    Fits one CSP projection per frequency band on the training trials and
    computes the CSP features of every training and test trial.

    The projections are fitted by get_CSP_mat with the class labels 0 and 1,
    and the module level `bands` value sets how many bands are processed.

    Parameters: 
    X_train(numpy.ndarray): The input training set of trials, indexed as X_train[band][trial]
    X_test(numpy.ndarray): The input test set of trials, indexed as X_test[band][trial]
    y_train(numpy.ndarray): The labels corresponding to the training set 

    Returns:
    numpy.ndarray: the CSP features of the training trials, one row per trial
    numpy.ndarray: the CSP features of the test trials, one row per trial
    list: the CSP Projection Matrices, one per frequency band
    list: the second value returned by get_CSP_mat, one per frequency band
    """

    # Fit the projections first; both feature passes below reuse them.
    fitted = [get_CSP_mat(X_train[band], y_train, 0, 1) for band in range(bands)]
    W = [projection for projection, _ in fitted]
    a = [ordering for _, ordering in fitted]

    def trial_features(signals, trial):
        # Concatenate the per-band features of one trial, in band order.
        row = []
        for band in range(bands):
            row.extend(get_CSP_band_features(signals[band][trial], W[band], a[band]))
        return row

    features_train = np.array(
        [trial_features(X_train, trial) for trial in range(X_train.shape[1])]
    )
    features_test = np.array(
        [trial_features(X_test, trial) for trial in range(X_test.shape[1])]
    )

    return(features_train, features_test, W, a)

Functions to train the one versus the rest classifiers

In [None]:
def build_OVR_clf(X_train, y_train, X_test, y_test, lr, depth, est, colsample):
    """
    Trains a one versus the rest classifier on the provided training instances.

    The CSP projections are fitted on the training set by get_features, and the
    XGBoost classifier is trained on the resulting training features. The test
    features computed by get_features are discarded here.

    Parameters:
    X_train(numpy.ndarray): the training instances
    y_train(numpy.ndarray): the training labels
    X_test(numpy.ndarray): the test instances
    y_test(numpy.ndarray): the test labels (not used by this function)
    lr(float): learning rate of XGBoost classifier
    depth(int): the max depth of a tree for the XGBoost classifier
    est(int): number of estimators for the XGBoost classifier
    colsample(float): the column subsampling ratio of the XGBoost classifier

    Returns:
    xgboost.XGBClassifier: the fitted classifier
    list: the CSP projection matrices for each frequency band
    list: the eigenvalue orderings for each frequency band
    """
    f_train, _, W, a = get_features(X_train, X_test, y_train)
    # NOTE(review): `silent` and `seed` look like legacy XGBoost keyword names
    # (newer releases appear to use `verbosity` / `random_state`) - confirm
    # against the installed xgboost version before changing them.
    clf = xgboost.XGBClassifier(max_depth=depth, learning_rate=lr, n_estimators=est, objective='binary:logistic', subsample=0.8, colsample_bytree=colsample, silent=True, seed=42)
    clf.fit(f_train, y_train)
    return clf, W, a

def build_classifiers(subject, lr, depth, est, colsample):
    """
    Loads one subject's data and trains 4 one versus the rest classifiers,
    one for each of the class labels 769, 770, 771 and 772

    Parameters: 
    subject(int): the numeric identifier of the subject
    lr(float): learning rate of XGBoost classifier
    depth(int): the max depth of a tree for the XGBoost classifier
    est(int): number of estimators for the XGBoost classifier
    colsample(float): the column subsampling ratio of the XGBoost classifier

    Returns:
    dict: maps each class label to a dict with the keys 'clf', 'W' and 'a' holding the values returned by build_OVR_clf
    numpy.ndarray: the test instances
    numpy.ndarray: the test labels
    """
    
    X_train, y_train, X_test, y_test = get_data(subject)
    models = {}
    for label in (769, 770, 771, 772):
        # Relabel the training set as "this class" (1) versus "the rest" (0).
        ovr_labels = get_OVR_data(y_train, label)
        trained = build_OVR_clf(X_train, ovr_labels, X_test, y_test, lr, depth, est, colsample)
        models[label] = dict(zip(('clf', 'W', 'a'), trained))

    return models, X_test, y_test

Functions to make predictions on test data and evaluate model performance

In [None]:
def score(labels_pred, labels_true):
    """
    Computes the fraction of predictions that equal the ground truth label at the same position

    Parameters:
    labels_pred(numpy.ndarray): The predicted class labels
    labels_true(numpy.ndarray): The ground truth labels

    Returns:
    float: the prediction accuracy
    """
    hits = sum(
        1 for idx, predicted in enumerate(labels_pred)
        if predicted == labels_true[idx]
    )
    return(hits/len(labels_pred))

def predict(models, X_test, y_test):
    """
    Uses the built models to predict class labels of test instances. Each test instance is classified by all four one versus the rest classifiers. The final decision is that of the classifier which predicts a class with the highest probability.
    
    Parameters:
    models(dict): dictionary of all models, keyed by class label; each entry holds 'clf', 'W' and 'a'
    X_test(numpy.ndarray): the test instances, with dimensions (frequency bands, trials, channels, samples)
    y_test(numpy.ndarray): the labels corresponding to the test set

    Returns:
    float: the prediction accuracy of the model
    float: the cohen kappa score of the model
    float: the average prediction time per test instance, in seconds
    float: the precision averaged over the classes
    float: the recall averaged over the classes
    float: the f1 score averaged over the classes
    """
    labels = [769, 770, 771, 772]
    X_test = reshape_trial_major(X_test)
    pred = []
  
    start = time.time()
    for i in range(len(y_test)):
        scores = []
        for label in labels:
            model = models[label]
            sample = get_test_features(X_test[i], model['W'], model['a'])
            # Probability of the positive ("this class") outcome.
            scores.append(model['clf'].predict_proba([sample])[0][1])
        # The first classifier that reaches the maximum probability wins.
        pred.append(labels[np.where(scores == np.amax(scores))[0][0]])
    elapsed = time.time() - start

    # Compute the per-class metrics once and average each of them.
    precision, recall, f1, _ = precision_recall_fscore_support(y_test, pred)
    return score(pred, y_test), cohen_kappa_score(pred, y_test), elapsed/len(y_test), np.mean(precision), np.mean(recall), np.mean(f1)

Builds the classifiers and evaluates them on each subject's data. The parameter values used for the XGBoost classifiers -

* Learning rate - 0.01
* Max depth - 3
* Number of Estimators - 100
* Column Subsampling Ratio - 0.5

In [None]:
# Per-subject metrics collected over subjects 1 to 9.
kappas = []
accs = []
p = []
r = []
f = []
for subject in range(1,10):
    models, X_test, y_test = build_classifiers(subject, 0.01, 3, 100, 0.5)
    # The evaluation function defined in this notebook is `predict`.
    acc, kappa, test_time, precision, recall, f1 = predict(models, X_test, y_test)
    kappas.append(kappa)
    accs.append(acc)
    p.append(precision)
    r.append(recall)
    f.append(f1)
    print(subject, kappa, acc, precision, recall, f1)
# Mean kappa and mean accuracy across all subjects.
print(np.mean(kappas), np.mean(accs))