<a href="https://colab.research.google.com/github/sagihaider/EEG_Deep/blob/master/main_2A/main_EEGNet_2A.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
!git clone https://github.com/sagihaider/EEG_Deep.git

Cloning into 'EEG_Deep'...
remote: Enumerating objects: 24, done.[K
remote: Counting objects: 100% (24/24), done.[K
remote: Compressing objects: 100% (24/24), done.[K
remote: Total 501 (delta 13), reused 0 (delta 0), pack-reused 477[K
Receiving objects: 100% (501/501), 1.69 GiB | 11.97 MiB/s, done.
Resolving deltas: 100% (243/243), done.
Checking out files: 100% (95/95), done.


In [2]:
import scipy.io as spio
import numpy as np
from importlib.machinery import SourceFileLoader

# EEGNet-specific imports
from EEG_Deep.EEGModels import EEGNet, ShallowConvNet, DeepConvNet
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras import backend as K
K.set_image_data_format('channels_first')

from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression

# tools for plotting confusion matrices
from matplotlib import pyplot as plt
from scipy.signal import butter, lfilter


# Band-pass Filter
def butter_bandpass_filter(data, lowcut, highcut, fs, order=4):
    nyq = 0.5 * fs
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    y = lfilter(b, a, data)
    return y

In [9]:
from numpy import zeros
K.clear_session()

X_tr = np.empty([288, 22, 1875])
X_ts = np.empty([288, 22, 1875])


# Initialize parameter
h_cut = [24]
drop_out = [0.25]
k_len = [15, 30, 60]
n_epochs = 300

out_f_name = 'acc_EEGNet' + '_epochs_' + str(n_epochs) + '_filter_' + str(h_cut) +'_drop_' + str(drop_out) +'_2AData.npy'
print(out_f_name)

nsub = 9
nfilt = len(h_cut)
ndrop = len(drop_out)
nkl = len(k_len)
loss_score = zeros([nsub, nfilt,ndrop,nkl])
categorical_crossentropy_score = zeros([nsub, nfilt,ndrop,nkl])
acc_all = zeros([nsub, nfilt,ndrop,nkl])

for sub_idx, x in enumerate(range(1,10)):
    for h_indx, h in enumerate(h_cut):
      fName = 'EEG_Deep/Data2A/Data_A0' + str(x) + 'T.mat'  # Load Data
      print(fName)
      mat = spio.loadmat(fName)
      r_X_tr = mat['cleanRawEEGData']
      y_tr = mat['cleanClassLabels']
  
      print(np.shape(r_X_tr))
      print(np.shape(y_tr))

      for t in range(r_X_tr.shape[0]):
        tril = r_X_tr[t,:,:]
        # tril = tril.transpose()
        tril_filtered = butter_bandpass_filter(tril, lowcut=4, highcut=h, 
                                               fs=250, order=4)
        # tril_filtered = tril_filtered.transpose()
        X_tr[t,:,:] = tril_filtered
      # split data of each subject in training and validation
      X_train      = X_tr[0:240,:,500:1250]
      Y_train      = y_tr[0:240]
      X_val       = X_tr[241:,:,500:1250]
      Y_val       = y_tr[241:]

      print(np.shape(X_train))
      print(np.shape(Y_train))
      print(np.shape(X_val))
      print(np.shape(Y_val))

      # convert labels to one-hot encodings.
      Y_train      = np_utils.to_categorical(Y_train-1)
      Y_val       = np_utils.to_categorical(Y_val-1)

      kernels, chans, samples = 1, 22, 750
      # convert data to NCHW (trials, kernels, channels, samples) format. Data 
      # contains 22 channels and 500 time-points. Set the number of kernels to 1.
      X_train = X_train.reshape(X_train.shape[0], kernels, chans, samples)
      X_val = X_val.reshape(X_val.shape[0], kernels, chans, samples)
   
      print('X_train shape:', X_train.shape)
      print(X_train.shape[0], 'train samples')
      print(X_val.shape[0], 'val samples')

      # Load test data         
      fName = 'EEG_Deep/Data2A/Data_A0' + str(x) + 'E.mat'  # Load Data
      print(fName)
      mat = spio.loadmat(fName)
      r_X_ts = mat['cleanRawEEGData']
      y_ts = mat['cleanClassLabels']

      print(np.shape(r_X_ts))
      print(np.shape(y_ts))

      for t in range(r_X_ts.shape[0]):
        tril = r_X_ts[t,:,:]
        # tril = tril.transpose()
        tril_filtered = butter_bandpass_filter(tril, lowcut=4, highcut=h, 
                                               fs=250,order=4)
        # tril_filtered = tril_filtered.transpose()
        X_ts[t,:,:] = tril_filtered 

      X_test      = X_ts[:,:,500:1250]
      Y_test      = y_ts[:]
      print(np.shape(X_test))
      print(np.shape(Y_test))

      #convert labels to one-hot encodings.
      Y_test      = np_utils.to_categorical(Y_test-1)

      # convert data to NCHW (trials, kernels, channels, samples) format. Data 
      # contains 22 channels and 500 time-points. Set the number of kernels to 1.
      X_test = X_test.reshape(X_test.shape[0], kernels, chans, samples)
      print('X_train shape:', X_test.shape)
      print(X_test.shape[0], 'train samples')

      for id_d, d in enumerate(drop_out):
        for id_kl, kl in enumerate(k_len):
          # configure the EEGNet-8,2,16 model with kernel length of 32 samples (other 
          # model configurations may do better, but this is a good starting point)
          model = EEGNet(nb_classes = 4, Chans = 22, Samples = 750, 
                         dropoutRate = d, kernLength = kl, F1 = 8, 
                         D = 2, F2 = 16, norm_rate = 0.25, dropoutType = 'Dropout')
          
          # compile the model and set the optimizers
          model.compile(loss='categorical_crossentropy', optimizer='adam', 
                        metrics = ['accuracy'])

          # count number of parameters in the model
          numParams    = model.count_params() 

          # set a valid path for your system to record model checkpoints
          checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', 
                                         verbose=1, save_best_only=True)
          
          # the syntax is {class_1:weight_1, class_2:weight_2,...}. Here just setting
          # the weights all to be 1
          class_weights = {0:1, 1:1, 2:1, 3:1}
          
          history = model.fit(X_train, Y_train, batch_size = 16, epochs = n_epochs, 
                              verbose = 2, validation_data=(X_val, Y_val),
                              callbacks=[checkpointer], class_weight = class_weights)
          
          print('\n# Evaluate on test data')
          results = model.evaluate(X_test, Y_test, batch_size=1)
          print('test loss, test acc:', results)

          # loss_score[sub_idx,h_indx,id_d, id_kl] = results[0]
          # categorical_crossentropy_score[sub_idx,h_indx,id_d, id_kl] = results[1]
          acc_all[sub_idx,h_indx,id_d, id_kl] = results[1]

          # from keras import backend as K 
          # Do some code, e.g. train and save model
          # K.clear_session()


np.save(out_f_name, acc_all)


acc_EEGNet_epochs_300_filter_[24]_drop_[0.25]_2AData.npy
EEG_Deep/Data2A/Data_A01T.mat
(288, 22, 1875)
(288, 1)
(240, 22, 750)
(240, 1)
(47, 22, 750)
(47, 1)
X_train shape: (240, 1, 22, 750)
240 train samples
47 val samples
EEG_Deep/Data2A/Data_A01E.mat
(288, 22, 1875)
(288, 1)
(288, 22, 750)
(288, 1)
X_train shape: (288, 1, 22, 750)
288 train samples
Train on 240 samples, validate on 47 samples
Epoch 1/300

Epoch 00001: val_loss improved from inf to 1.38734, saving model to /tmp/checkpoint.h5
240/240 - 1s - loss: 1.3957 - acc: 0.2625 - val_loss: 1.3873 - val_acc: 0.2979
Epoch 2/300

Epoch 00002: val_loss improved from 1.38734 to 1.38455, saving model to /tmp/checkpoint.h5
240/240 - 0s - loss: 1.3354 - acc: 0.3958 - val_loss: 1.3846 - val_acc: 0.3191
Epoch 3/300

Epoch 00003: val_loss improved from 1.38455 to 1.37998, saving model to /tmp/checkpoint.h5
240/240 - 0s - loss: 1.2862 - acc: 0.4458 - val_loss: 1.3800 - val_acc: 0.3617
Epoch 4/300

Epoch 00004: val_loss improved from 1.37998