In [None]:
def whitening(sigma):
    ''' Calculate a whitening matrix for covariance matrix sigma. '''
    U, l, _ = linalg.svd(sigma)
    return U.dot( np.diag(l ** -0.5) )

In [None]:
from numpy import linalg

def cov(trials):
    ''' Calculate the covariance for each trial and return their average '''
    ntrials = trials.shape[2]
    covs = [ trials[:,:,i].dot(trials[:,:,i].T) / nsamples for i in range(ntrials) ]
    return np.mean(covs, axis=0)

In [None]:
def csp(trials_r, trials_f):
    '''
    Calculate the CSP transformation matrix W.
    arguments:
        trials_r - Array (channels x samples x trials) containing right hand movement trials
        trials_f - Array (channels x samples x trials) containing foot movement trials
    returns:
        Mixing matrix W
    '''
    cov_r = cov(trials_r)
    cov_f = cov(trials_f)
    P = whitening(cov_r + cov_f)
    B, _, _ = linalg.svd( P.T.dot(cov_f).dot(P) )
    W = P.dot(B)
    return W

In [None]:
import scipy.signal 
from scipy.signal import butter,lfilter

def bandpass(trials, lo, hi, sample_rate):
    '''
    Designs and applies a bandpass filter to the signal.
    
    Parameters
    ----------
    trials : 3d-array (channels x samples x trials)
        The EEGsignal
    lo : float
        Lower frequency bound (in Hz)
    hi : float
        Upper frequency bound (in Hz)
    sample_rate : float
        Sample rate of the signal (in Hz)
    
    Returns
    -------
    trials_filt : 3d-array (channels x samples x trials)
        The bandpassed signal
    '''

    # The iirfilter() function takes the filter order: higher numbers mean a sharper frequency cutoff,
    # but the resulting signal might be shifted in time, lower numbers mean a soft frequency cutoff,
    # but the resulting signal less distorted in time. It also takes the lower and upper frequency bounds
    # to pass, divided by the niquist frequency, which is the sample rate divided by 2:
    a, b = scipy.signal.iirfilter(4, [lo/(sample_rate/2.0), hi/(sample_rate/2.0)],ftype='butter')
    # Applying the filter to each trial
    ntrials = trials.shape[2]
    trials_filt = np.zeros((nchannels, nsamples, ntrials))
    for i in range(ntrials):
        trials_filt[:,:,i] = scipy.signal.filtfilt(a, b, trials[:,:,i], axis=1)
    
    return trials_filt

In [None]:
def logvar(trials):
    '''
    Calculate the log-var of each channel.
    
    Parameters
    ----------
    trials : 3d-array (channels x samples x trials)
        The EEG signal.
        
    Returns
    -------
    logvar - 2d-array (channels x trials)
        For each channel the logvar of the signal
    '''
    return np.log(np.var(trials, axis=1))

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import os
import numpy as np
import pandas as pd
import warnings
import scipy.io
warnings.filterwarnings('ignore')
d=r'/content/drive/MyDrive/Arm-Reaching/Session 1/EEG_session1_sub3_reaching_realMove.mat'

In [None]:
m = scipy.io.loadmat(d, struct_as_record=True)

In [None]:
ch=['ch1', 'ch2', 'ch3', 'ch4', 'ch5', 'ch6', 'ch7', 'ch8', 'ch9', 'ch10', 'ch11', 'ch12', 'ch13', 'ch14', 'ch15', 'ch16', 'ch17', 'ch18', 'ch19', 'ch20', 'ch21', 'ch22', 'ch23', 'ch24', 'ch25', 'ch26', 'ch27', 'ch28', 'ch29', 'ch30', 'ch31', 'ch32', 'ch33', 'ch34', 'ch35', 'ch36', 'ch37', 'ch38', 'ch39', 'ch40', 'ch41', 'ch42', 'ch43', 'ch44', 'ch45', 'ch46', 'ch47', 'ch48', 'ch49', 'ch50', 'ch51', 'ch52', 'ch53', 'ch54', 'ch55', 'ch56', 'ch57', 'ch58', 'ch59', 'ch60']

In [None]:
arr=[m[i] for i in ch ]  
arr=np.hstack(arr)
EEG=arr.T
nchannels, nsamples =EEG.shape

In [None]:
sample_rate = m['nfo']['fs'][0][0][0][0]
channel_names = [s[0] for s in m['nfo']['clab'][0][0][0]]
event_onsets = m['mrk'][0][0][0]
event_codes = m['mrk'][0][0][1]
labels = np.zeros((1, nsamples), int)
labels[0, event_onsets] = event_codes
cl_lab = [s[0] for s in m['nfo']['className'][0][0][0]]
nclasses = len(cl_lab)
nevents = len(event_onsets)

In [None]:
# Print some information
print('Shape of EEG:', EEG.shape)
print('Sample rate:', sample_rate)
print('Number of channels:', nchannels)
print('Channel names:', channel_names)
print('Number of events:', len(event_onsets))
print('Event codes:', np.unique(event_codes))
print('Class labels:', cl_lab)
print('Number of classes:', nclasses)

Shape of EEG: (60, 8394200)
Sample rate: 2500
Number of channels: 60
Channel names: ['Fp1', 'AF7', 'AF3', 'AFz', 'F7', 'F5', 'F3', 'F1', 'Fz', 'FT7', 'FC5', 'FC3', 'FC1', 'T7', 'C5', 'C3', 'C1', 'Cz', 'TP7', 'CP5', 'CP3', 'CP1', 'CPz', 'P7', 'P5', 'P3', 'P1', 'Pz', 'PO7', 'PO3', 'POz', 'Fp2', 'AF4', 'AF8', 'F2', 'F4', 'F6', 'F8', 'FC2', 'FC4', 'FC6', 'FT8', 'C2', 'C4', 'C6', 'T8', 'CP2', 'CP4', 'CP6', 'TP8', 'P2', 'P4', 'P6', 'P8', 'PO4', 'PO8', 'O1', 'Oz', 'O2', 'Iz']
Number of events: 1
Event codes: [ 8 11 21 31 41 51 61]
Class labels: ['Forward', 'Backward', 'Left', 'Right', 'Up', 'Down', 'Rest']
Number of classes: 7


In [None]:
cd=[11, 21, 31, 41, 51, 61,8]
cl_lab=[ 'Forward','Backward','Left', 'Right','Up', 'Down','Rest']

In [None]:
# Dictionary to store the trials in, each class gets an entry
trials = {}

# The time window (in samples) to extract for each trial, here 0.5 -- 2.5 seconds
win = np.arange(int(0.5*sample_rate), int(2.5*sample_rate))

# Length of the time window
nsamples = len(win)

# Loop over the classes (right, foot)
for cl, code in zip(cl_lab, cd):
    
    # Extract the onsets for the class
    cl_onsets = event_onsets[event_codes == code]
    
    # Allocate memory for the trials
    trials[cl] = np.zeros((nchannels, nsamples, len(cl_onsets)))
    
    # Extract each trial
    for i, onset in enumerate(cl_onsets):
        trials[cl][:,:,i] = EEG[:, win+onset]

In [None]:
cl1=cl_lab[0]
cl2=cl_lab[1]
cl3=cl_lab[2]
cl4=cl_lab[3]
cl5=cl_lab[4]
cl6=cl_lab[5]

In [None]:
labels[0, event_onsets] = event_codes

In [None]:
#Apply the function
trials_filt = {cl1: bandpass(trials[cl1], 8, 30, sample_rate),
               cl2: bandpass(trials[cl2], 8, 30, sample_rate),
               cl3: bandpass(trials[cl3], 8, 30, sample_rate),
               cl4: bandpass(trials[cl4], 8, 30, sample_rate),
               cl5: bandpass(trials[cl5], 8, 30, sample_rate),
               cl6: bandpass(trials[cl6], 8, 30, sample_rate)}

In [None]:
trials_filt[cl1].shape

(60, 5000, 50)

In [None]:
def apply_mix(W, trials):
    ''' Apply a mixing matrix to each trial (basically multiply W with the EEG signal matrix)'''
    ntrials = trials.shape[2]
    trials_csp = np.zeros((nchannels, nsamples, ntrials))
    for i in range(ntrials):
        trials_csp[:,:,i] = W.T.dot(trials[:,:,i])
    return trials_csp

In [None]:
#Percentage of trials to use for training (50-50 split here)
train_percentage = 0.5

# Calculate the number of trials for each class the above percentage boils down to
ntrain_1 = int(trials_filt[cl1].shape[2] * train_percentage)
ntrain_2 = int(trials_filt[cl2].shape[2] * train_percentage)
ntrain_3=  int(trials_filt[cl3].shape[2] * train_percentage)
ntrain_4 = int(trials_filt[cl4].shape[2] * train_percentage)
ntrain_5 = int(trials_filt[cl5].shape[2] * train_percentage)
ntrain_6 = int(trials_filt[cl6].shape[2] * train_percentage)




ntest_1 = trials_filt[cl1].shape[2] - ntrain_1
ntest_2 = trials_filt[cl2].shape[2] - ntrain_2
ntest_3 = trials_filt[cl3].shape[2] - ntrain_3
ntest_4 = trials_filt[cl4].shape[2] - ntrain_4
ntest_5 = trials_filt[cl5].shape[2] - ntrain_5
ntest_6 = trials_filt[cl6].shape[2] - ntrain_6


# Splitting the frequency filtered signal into a train and test set
train = {cl1: trials_filt[cl1][:,:,:ntrain_1],
         cl2: trials_filt[cl2][:,:,:ntrain_2],
         cl3: trials_filt[cl3][:,:,:ntrain_3],
         cl4: trials_filt[cl4][:,:,:ntrain_4],
         cl5: trials_filt[cl5][:,:,:ntrain_5],
         cl6: trials_filt[cl6][:,:,:ntrain_6],}

test = {cl1: trials_filt[cl1][:,:,ntrain_1:],
        cl2: trials_filt[cl2][:,:,ntrain_2:],
        cl3: trials_filt[cl3][:,:,ntrain_3:],
        cl4: trials_filt[cl4][:,:,ntrain_4:],
        cl5: trials_filt[cl5][:,:,ntrain_5:],
        cl6: trials_filt[cl6][:,:,ntrain_6:],}
    
#print (train.items)
# Train the CSP on the training set only
W = csp(train[cl1], train[cl2])

# Apply the CSP on both the training and test set
train[cl1] = apply_mix(W, train[cl1])
train[cl2] = apply_mix(W, train[cl2])
test[cl1] = apply_mix(W, test[cl1])
test[cl2] = apply_mix(W, test[cl2])

# Select only the first and last components for classification
comp = np.array([0,1,2,3,4,5,6,7,8,9,10])
train[cl1] = train[cl1][comp,:,:]
train[cl2] = train[cl2][comp,:,:]
test[cl1] = test[cl1][comp,:,:]
test[cl2] = test[cl2][comp,:,:]
print (train[cl2].shape)

# Calculate the log-var
train[cl1] = logvar(train[cl1])
train[cl2] = logvar(train[cl2])
test[cl1] = logvar(test[cl1])
test[cl2] = logvar(test[cl2])
print (train[cl2].shape)

(11, 5000, 25)
(11, 25)


In [None]:
#reshape the training data 
channels = [ np.concatenate([train[cl1][i],train[cl2][i]]) for i in range(train[cl2].shape[0])]
xtrain=np.stack(channels, axis=1)
ytrain=np.concatenate([np.zeros(len(train[cl1][0])),np.ones(len(train[cl2][0]))])

#reshape test data 
channels = [ np.concatenate([train[cl1][i],train[cl2][i]]) for i in range(train[cl2].shape[0])]
xtest=np.stack(channels, axis=1)
ytest=np.concatenate([np.zeros(len(test[cl1][0])),np.ones(len(test[cl2][0]))])

In [None]:
xtrain.shape , ytrain.shape , xtest.shape , ytest.shape

((50, 11), (50,), (50, 11), (50,))

In [None]:

xtest.shape

(50, 11)

In [None]:
from sklearn import svm
from sklearn import metrics

#Create a svm Classifier
clf = svm.SVC(kernel='linear') # Linear Kernel

#Train the model using the training sets
clf.fit(xtrain, ytrain)

#Predict the response for test dataset
y_pred = clf.predict(xtest)

print("Accuracy:",metrics.accuracy_score(ytest, y_pred))

Accuracy: 0.76
