<a href="https://colab.research.google.com/github/vartikagpt10/EEG-based-ERP-Detection-for-practical-BCI/blob/main/WCCI_2020_EEGNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Conv2D, DepthwiseConv2D, BatchNormalization, Activation, AveragePooling2D, Dropout, SeparableConv2D, Flatten, Dense
from tensorflow.keras import optimizers
from tensorflow.keras import losses
from tensorflow.keras.utils import to_categorical

from google.colab import drive

from scipy.io import loadmat
from scipy.signal import butter, lfilter

from sklearn.model_selection import train_test_split, KFold

import numpy as np

import matplotlib.pyplot as plt

In [None]:
drive.mount('/content/drive')

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3aietf%3awg%3aoauth%3a2.0%3aoob&response_type=code&scope=email%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdocs.test%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive%20https%3a%2f%2fwww.googleapis.com%2fauth%2fdrive.photos.readonly%20https%3a%2f%2fwww.googleapis.com%2fauth%2fpeopleapi.readonly

Enter your authorization code:
··········
Mounted at /content/drive


In [None]:
#Download data from here https://github.com/5anirban9/Clinical-Brain-Computer-Interfaces-Challenge-WCCI-2020-Glasgow
#Upload on your drive and paste path from your drive here
path = '/content/drive/My Drive/wcci_patient_data/'
sample_rate = 512 #in Hz
num_channels = 12

In [None]:
# load train data
def load_train_data(path_to_file):
  annots = loadmat(path_to_file)
  raw_eeg_data = annots['RawEEGData']
  labels = annots['Labels']

  return raw_eeg_data, labels

In [None]:
# load evaluate data
def load_eval_data(path_to_file):
  annots = loadmat(path_to_file)
  raw_eeg_data = annots['RawEEGData']

  return raw_eeg_data

In [None]:
# Butterworth Bandpass Filter
# Source: https://scipy-cookbook.readthedocs.io/items/ButterworthBandpass.html
def butter_bandpass(lowcut, highcut, fs, order=5):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    return b, a


def butter_bandpass_filter(data, lowcut, highcut, fs, order=5):
    b, a = butter_bandpass(lowcut, highcut, fs, order=order)
    y = lfilter(b, a, data)
    return y

In [None]:
# Band-Pass Data
def get_data(patient_number, mode="within", low=8, high=24, order=5, test_size=0.15):
  if patient_number<1 or patient_number>10:
    print('Invalid Patient Number')
    return None
  if mode=="within":
    print('Mode = '+mode)
    if patient_number<10:
      patient_str = '0'+str(patient_number)
    else:
      patient_str = str(patient_number)

    raw_data_train, labels = load_train_data(path+'parsed_P'+patient_str+'T.mat')
    raw_data_train = raw_data_train[:,:,-1536:]
    labels = np.squeeze(labels)
    labels -= 1
    labels = to_categorical(labels, num_classes=2)

    raw_eval = load_eval_data(path+'parsed_P'+patient_str+'E.mat')
    raw_eval = raw_eval[:,:,-1536:]

    print('Loaded data for patient %d'%(patient_number))

    raw_train, y_train = raw_data_train, labels
    # #split data
    # raw_train, raw_test, y_train, y_test = train_test_split(raw_data_train, labels, test_size=test_size)

  if mode=="inter":
    print('Mode = '+mode)
    raw_train = np.array([])
    y_train = np.array([])
    for i in range(1,9):
      if i!=patient_number:
        if i<10:
          patient_str = '0'+str(i)
        else:
          patient_str = str(i)
        raw_data_train, labels = load_train_data(path+'parsed_P'+patient_str+'T.mat')
        raw_data_train = raw_data_train[:,:,-1536:]
        labels = np.squeeze(labels)
        labels -= 1
        labels = to_categorical(labels, num_classes=2)

        print('Loaded data for patient %d'%(i))

        if raw_train.size==0:
          raw_train = raw_data_train
          y_train = labels
        else:
          raw_train = np.concatenate((raw_train, raw_data_train))
          y_train = np.concatenate((y_train, labels))

    if patient_number<10:
      patient_str = '0'+str(patient_number)
    else:
      patient_str = str(patient_number)
    
    raw_test, labels = load_train_data(path+'parsed_P'+patient_str+'T.mat')
    raw_test = raw_test[:,:,-1536:]
    labels = np.squeeze(labels)
    labels -= 1
    y_test = to_categorical(labels, num_classes=2)

    raw_eval = load_eval_data(path+'parsed_P'+patient_str+'E.mat')
    raw_eval = raw_eval[:,:,-1536:]

    print('Loaded data for patient %d'%(patient_number))
  
  #filter training data
  X_train = raw_train
  for t in range(X_train.shape[0]):
    for c in range(num_channels):
      X_train[t][c] = butter_bandpass_filter(raw_train[t][c], low, high, sample_rate, order=order)

  # if X_train.ndim == 3:
  #   X_train = np.expand_dims(X_train, axis=3)

  # #filter testing data
  # X_test = raw_test
  # for t in range(X_test.shape[0]):
  #   for c in range(num_channels):
  #     X_test[t][c] = butter_bandpass_filter(raw_test[t][c], low, high, sample_rate, order=order)

  # if X_test.ndim == 3:
  #   X_test = np.expand_dims(X_test, axis=3)

  #filter evaluation data
  X_eval = raw_eval
  for t in range(X_eval.shape[0]):
    for c in range(num_channels):
      X_eval[t][c] = butter_bandpass_filter(raw_eval[t][c], low, high, sample_rate, order=order)

  if X_eval.ndim == 3:  
    X_eval = np.expand_dims(X_eval, axis=3)

  # return X_train, y_train, X_test, y_test, X_eval
  return X_train, y_train, X_eval

In [None]:
def get_model(flt_size=32, drop=0.25):
  #build model
  model = tf.keras.Sequential()
  model.add(Conv2D(8, (1, flt_size), padding="same", input_shape=(12, 1536, 1)))
  model.add(BatchNormalization())
  C = num_channels
  model.add(DepthwiseConv2D((C, 1), padding="valid", depth_multiplier=2))
  model.add(BatchNormalization())
  model.add(Activation('relu'))
  model.add(AveragePooling2D(pool_size=(1, 4), padding="valid"))
  model.add(Dropout(drop))
  model.add(SeparableConv2D(16, (1, 16), padding="same"))
  model.add(BatchNormalization())
  model.add(Activation('relu'))
  model.add(AveragePooling2D(pool_size=(1, 8), padding="valid"))
  model.add(Dropout(drop))
  model.add(Flatten())
  model.add(Dense(2))
  model.add(Activation('softmax'))

  #compile model
  model.compile(loss='categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

  return model

In [None]:
test_acc = []
patients = [1]
num_folds = 10
num_epochs = [500]
flt_size = [128]
drop = [0.5]

for i in patients:
  #get data
  # X_train, y_train, X_test, y_test, X_eval = get_data(i, mode="within", test_size=0.1)
  for e in num_epochs:
    for f in flt_size:
      for d in drop:
        X, y, X_eval = get_data(i, mode="within")
        val_acc = []
        kf = KFold(num_folds, shuffle=True)
        for train_index, test_index in kf.split(X):
          X_train, X_test = X[train_index], X[test_index]
          y_train, y_test = y[train_index], y[test_index]
          if X_train.ndim == 3:
            X_train = np.expand_dims(X_train, axis=3)
          if X_test.ndim == 3:
            X_test = np.expand_dims(X_test, axis=3)
          # print('\nTrain data shape:',X_train.shape)
          # print('Test data shape:',X_test.shape)
          # print('Evaluate data shape:',X_eval.shape)

          # train model
          model = get_model(flt_size=f, drop=d)
          history = model.fit(X_train, y_train, epochs=e, verbose=1)

          #evaluate model on test_data
          loss, acc = model.evaluate(X_test, y_test)
          val_acc.append(acc)
          # print(np.argmax(y_test, axis=1), ':Truth')
          # y_eval = np.argmax(model.predict(X_test), axis=-1)
          # print(y_eval, ':Predicted')
        print('\nPatient=%d | Epochs=%d  |  Filter Size=%d  |  Droput=%f'%(i, e, f, d))
        print(val_acc)
        print(np.mean(np.array(val_acc)),'\n')

  # print('\nTraining Done for Patient',i)
  # print(val_acc)
  # print(np.mean(np.array(val_acc)))

[1;30;43mStreaming output truncated to the last 5000 lines.[0m
Epoch 43/500
Epoch 44/500
Epoch 45/500
Epoch 46/500
Epoch 47/500
Epoch 48/500
Epoch 49/500
Epoch 50/500
Epoch 51/500
Epoch 52/500
Epoch 53/500
Epoch 54/500
Epoch 55/500
Epoch 56/500
Epoch 57/500
Epoch 58/500
Epoch 59/500
Epoch 60/500
Epoch 61/500
Epoch 62/500
Epoch 63/500
Epoch 64/500
Epoch 65/500
Epoch 66/500
Epoch 67/500
Epoch 68/500
Epoch 69/500
Epoch 70/500
Epoch 71/500
Epoch 72/500
Epoch 73/500
Epoch 74/500
Epoch 75/500
Epoch 76/500
Epoch 77/500
Epoch 78/500
Epoch 79/500
Epoch 80/500
Epoch 81/500
Epoch 82/500
Epoch 83/500
Epoch 84/500
Epoch 85/500
Epoch 86/500
Epoch 87/500
Epoch 88/500
Epoch 89/500
Epoch 90/500
Epoch 91/500
Epoch 92/500
Epoch 93/500
Epoch 94/500
Epoch 95/500
Epoch 96/500
Epoch 97/500
Epoch 98/500
Epoch 99/500
Epoch 100/500
Epoch 101/500
Epoch 102/500
Epoch 103/500
Epoch 104/500
Epoch 105/500
Epoch 106/500
Epoch 107/500
Epoch 108/500
Epoch 109/500
Epoch 110/500
Epoch 111/500
Epoch 112/500
Epoch 113/50

KeyboardInterrupt: ignored

In [None]:
for i in range(len(test_acc)):
  print('Test accuracy for patient %d : %f'%(patients[i],test_acc[i]))

#evaluate model on test_data
# loss, acc = model.evaluate(X_test, y_test)
# print(acc)

In [None]:
# # Plot training accuracy values
# plt.plot(history.history['acc'])
# plt.title('Model training accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.show()

# # Plot training loss values
# plt.plot(history.history['loss'])
# plt.title('Model training loss')
# plt.ylabel('Loss')
# plt.xlabel('Epoch')
# plt.show()

# predict classes for evaluate data
# y_eval = np.argmax(model.predict(X_eval), axis=-1)
# print(y_eval)