<a href="https://colab.research.google.com/github/saugatbh/EEG_NSL/blob/master/Performance_DatasetIIb.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
!git clone https://github.com/saugatbh/EEG_NSL.git

Cloning into 'EEG_NSL'...
remote: Enumerating objects: 55, done.[K
remote: Counting objects: 100% (55/55), done.[K
remote: Compressing objects: 100% (54/54), done.[K
remote: Total 55 (delta 1), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (55/55), done.
Checking out files: 100% (48/48), done.


In [0]:
from __future__ import print_function, division

import os
from scipy import io
from scipy.signal import butter, lfilter, filtfilt
from glob import glob
import numpy as np
import matplotlib.pyplot as plt
import itertools

from numpy import zeros

# EEGNet-specific imports
from EEG_NSL.EEGModels import EEGNet, ShallowConvNet, DeepConvNet
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
K.set_image_data_format('channels_first')

!pip install neural_structured_learning
import neural_structured_learning as nsl
import tensorflow as tf



In [0]:
# Band-pass Filter
def butter_bandpass_filter(data, lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='bandpass')
    y = filtfilt(b, a, data)
    return y

In [0]:
# Data preparation
people = 9  # Change the number of subjects to work on
class_id = [1, 2]  # Class information
no_of_sessions = 5
samplerate = 250
bands = [4, 40]
order = 4
no_electrodes = 3
epoch_length = 1000

data_path = 'EEG_NSL/BCIcompIV_DS2b/'  # Change path
subjects = dict(enumerate([str(s).zfill(2) for s in range(1, people+1)]))
sessions = dict(enumerate([str(s).zfill(2) for s in range(1, no_of_sessions+1)]))
print(subjects, sessions)

print('*'*80)
# print('Loading subjects:', subjects)
print('Loading subjects:', list(subjects.values()))

# Load the data set for subjects
filtered_epoch = {}
label = {}
train_data = {}
test_data = {}
train_label = {}
test_label = {}
for s, sub in enumerate(subjects.values()):
    filtered_epoch[s] = {}
    label[s] = {}
    train_data[s]=[]
    test_data[s]=[]
    train_label[s] = []
    test_label[s] = []
    for ses, sess in enumerate(sessions.values()):
        print('Subject %s Session %s' % (sub, sess))
        file = data_path + 'B' + str(sub) + str(sess) +'.mat'
        raw_eeg = io.loadmat(file)['rawEEG']
        raw_eeg[np.isnan(raw_eeg)] = 0
        # load event information
        triggers = io.loadmat(file)['triggers']
        y = io.loadmat(file)['classlabel']
        label[s][ses] = y
        # Extract the epochs
        filtered_epoch[s][ses] = np.empty((len(y), no_electrodes, epoch_length))
        for tr in range(len(y)):
            for elec in range(no_electrodes):
                epoch = raw_eeg[triggers[tr, 0]:triggers[tr, 0] + epoch_length, elec]
                filtered_epoch[s][ses][tr, elec, :] = butter_bandpass_filter(epoch, lowcut=bands[0], highcut=bands[1],
                                                                             fs=samplerate, order=order)
        if ses < 3:
            train_data[s].extend(filtered_epoch[s][ses])
            train_label[s].extend(label[s][ses])
        else:
            test_data[s].extend(filtered_epoch[s][ses])
            test_label[s].extend(label[s][ses])
    print(np.asarray(train_data[s]).shape, np.asarray(test_data[s]).shape)
    print(np.asarray(train_label[s]).shape, np.asarray(test_label[s]).shape)

{0: '01', 1: '02', 2: '03', 3: '04', 4: '05', 5: '06', 6: '07', 7: '08', 8: '09'} {0: '01', 1: '02', 2: '03', 3: '04', 4: '05'}
********************************************************************************
Loading subjects: ['01', '02', '03', '04', '05', '06', '07', '08', '09']
Subject 01 Session 01
Subject 01 Session 02
Subject 01 Session 03
Subject 01 Session 04
Subject 01 Session 05
(400, 3, 1000) (320, 3, 1000)
(400, 1) (320, 1)
Subject 02 Session 01
Subject 02 Session 02
Subject 02 Session 03
Subject 02 Session 04
Subject 02 Session 05
(400, 3, 1000) (280, 3, 1000)
(400, 1) (280, 1)
Subject 03 Session 01
Subject 03 Session 02
Subject 03 Session 03
Subject 03 Session 04
Subject 03 Session 05
(400, 3, 1000) (320, 3, 1000)
(400, 1) (320, 1)
Subject 04 Session 01
Subject 04 Session 02
Subject 04 Session 03
Subject 04 Session 04
Subject 04 Session 05
(420, 3, 1000) (320, 3, 1000)
(420, 1) (320, 1)
Subject 05 Session 01
Subject 05 Session 02
Subject 05 Session 03
Subject 05 Session 0

In [0]:
for s, sub in enumerate(subjects.values()):
  # split data of each subject in training and validation
  X_train      = np.asarray(train_data[s])[0:300,:,:]
  Y_train      = np.asarray(train_label[s])[0:300]
  X_val       = np.asarray(train_data[s])[300:,:,:]
  Y_val       = np.asarray(train_label[s])[300:]
  X_test      = np.asarray(test_data[s])
  Y_test      = np.asarray(test_label[s])
  
  # convert labels to one-hot encodings.
  Y_train      = np_utils.to_categorical(Y_train-1)
  Y_val       = np_utils.to_categorical(Y_val-1)
  Y_test      = np_utils.to_categorical(Y_test-1)

  kernels, chans, samples = 1, 3, 1000
  # convert data to NCHW (trials, kernels, channels, samples) format. Data 
  # contains 22 channels and 500 time-points. Set the number of kernels to 1.
  X_train      = X_train.reshape(X_train.shape[0], kernels, chans, samples)
  X_val       = X_val.reshape(X_val.shape[0], kernels, chans, samples)
  X_test       = X_test.reshape(X_test.shape[0], kernels, chans, samples)
   
  print('X_train shape:', X_train.shape)
  print(X_train.shape[0], 'train samples')
  print(X_val.shape[0], 'val samples')
  print(X_test.shape[0], 'test samples')

  # configure the EEGNet-8,2,16 model with kernel length of 32 samples (other 
  # model configurations may do better, but this is a good starting point)
  model = EEGNet(nb_classes = 2, Chans = 3, Samples = 1000, 
                 dropoutRate = 0.5, kernLength = 25, F1 = 8, 
                 D = 2, F2 = 16, norm_rate = 0.25, dropoutType = 'Dropout')

  # compile the model and set the optimizers
  model.compile(loss='categorical_crossentropy', optimizer='adam', 
                metrics = ['accuracy'])

  # count number of parameters in the model
  numParams    = model.count_params() 

  # set a valid path for your system to record model checkpoints
  checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', verbose=1,
                                 save_best_only=True)
  
  # the syntax is {class_1:weight_1, class_2:weight_2,...}. Here just setting
  # the weights all to be 1
  class_weights = {0:1, 1:1, 2:1, 3:1}

  ################################################################################
  # fit the model. Due to very small sample sizes this can get
  # pretty noisy run-to-run, but most runs should be comparable to xDAWN + 
  # Riemannian geometry classification (below)
  ################################################################################
  # history = model.fit(X_train, Y_train, batch_size = 16, epochs = 100, 
  #                     verbose = 2, validation_data=(X_val, Y_val),
  #                      class_weight = class_weights) #callbacks=[checkpointer],
  
  # # Plot training & validation accuracy values
  # plt.plot(history.history['acc'])
  # plt.plot(history.history['val_acc'])
  # plt.title('Model accuracy')
  # plt.ylabel('Accuracy')
  # plt.xlabel('Epoch')
  # plt.legend(['Train', 'Test'], loc='upper left')
  # plt.show()
  # figName = 'Accuracy_A0' + str(x) + '.png'  
  # plt.savefig(figName)

  print('\n# Evaluate on test data')
  results = model.evaluate(X_test, Y_test, batch_size=1)
  print('test loss, test acc:', results)

  acc_all[x - 1, 0] = results[0]
  acc_all[x - 1, 1] = results[1]

  from keras import backend as K 
  # Do some code, e.g. train and save model
  K.clear_session()

X_train shape: (300, 1, 3, 1000)
300 train samples
100 val samples
320 test samples

# Evaluate on test data


InvalidArgumentError: ignored

In [0]:
for s, sub in enumerate(subjects.values()):
  # split data of each subject in training and validation
  X_train      = np.asarray(train_data[s])[0:300,:,:]
  Y_train      = np.asarray(train_label[s])[0:300]
  X_val       = np.asarray(train_data[s])[301:,:,:]
  Y_val       = np.asarray(train_label[s])[301:]
  X_test      = np.asarray(test_data[s])
  Y_test      = np.asarray(test_label[s])
  
  # convert labels to one-hot encodings.
  Y_train      = np_utils.to_categorical(Y_train-1)
  Y_val       = np_utils.to_categorical(Y_val-1)
  Y_test      = np_utils.to_categorical(Y_test-1)

  kernels, chans, samples = 1, 3, 1000
  # convert data to NCHW (trials, kernels, channels, samples) format. Data 
  # contains 22 channels and 500 time-points. Set the number of kernels to 1.
  X_train      = X_train.reshape(X_train.shape[0], kernels, chans, samples)
  X_val       = X_val.reshape(X_val.shape[0], kernels, chans, samples)
  X_test       = X_test.reshape(X_test.shape[0], kernels, chans, samples)
   
  print('X_train shape:', X_train.shape)
  print(X_train.shape[0], 'train samples')
  print(X_val.shape[0], 'val samples')
  print(X_test.shape[0], 'test samples')

  # configure the EEGNet-8,2,16 model with kernel length of 32 samples (other 
  # model configurations may do better, but this is a good starting point)
  model = EEGNet(nb_classes = 2, Chans = 3, Samples = 1000, 
                 dropoutRate = 0.5, kernLength = 25, F1 = 8, 
                 D = 2, F2 = 16, norm_rate = 0.25, dropoutType = 'Dropout')

  adv_config = nsl.configs.make_adv_reg_config(multiplier=0.2, adv_step_size=0.5, adv_grad_norm='infinity')
  adv_model = nsl.keras.AdversarialRegularization(model, adv_config=adv_config)
  
  # compile the model and set the optimizers
  adv_model.compile(loss='categorical_crossentropy', optimizer='adam', metrics = ['accuracy'])
  batch_size = 16

  X_train = tf.cast(X_train, tf.float32)
  X_test = tf.cast(X_test, tf.float32)
  X_val = tf.cast(X_val, tf.float32)


  train_data = tf.data.Dataset.from_tensor_slices({'input': X_train, 'label': Y_train}).batch(batch_size)
  val_data = tf.data.Dataset.from_tensor_slices({'input': X_val, 'label': Y_val}).batch(batch_size)
  test_data = tf.data.Dataset.from_tensor_slices({'input': X_test, 'label': Y_test}).batch(batch_size)

  val_steps = X_val.shape[0] // batch_size

  
  adv_model.fit(train_data, validation_data=val_data, validation_steps=None, epochs=100, verbose=1)

  history = adv_model.fit(train_data, validation_data=val_data, validation_steps=None, epochs=100, verbose=1)

  # Plot training & validation accuracy values
  plt.plot(history.history['categorical_accuracy'])
  plt.plot(history.history['val_categorical_accuracy'])
  plt.title('Model accuracy')
  plt.ylabel('Accuracy')
  plt.xlabel('Epoch')
  plt.legend(['Train', 'Test'], loc='upper left')
  plt.show()
  figName = 'Accuracy_A0' + str(x) + '.png'  
  plt.savefig(figName)

  print('\n# Evaluate on test data')
  results = adv_model.evaluate(test_data)
  print('test loss, test acc:', results)

  acc_all[x - 1, 0] = results[1]
  acc_all[x - 1, 1] = results[2]

  from keras import backend as K 
  # Do some code, e.g. train and save model
  K.clear_session()


print(acc_all)


X_train shape: (300, 1, 3, 1000)
300 train samples
99 val samples
320 test samples

Train on 19 steps, validate on 7 steps
Epoch 1/100


InvalidArgumentError: ignored