Suzanne Oliver

Deep Learning Final Project

Fall 2024

# Data Pre-Processing

In [None]:
# download dataset
# just the files we need, otherwise takes too long
# easier to just download dataset and mount from google drive

validSubjs = list(range(1,110))
validFileIdx = [3,7,11] # files with hand movement

for iSubj in validSubjs:
  subjID = str(iSubj).zfill(3)
  for iFile in validFileIdx:
    fileID = str(iFile).zfill(2)
    curr_url = "https://physionet.org/files/eegmmidb/1.0.0/S"+ subjID + "/S" + subjID + "R" + fileID +".edf"
    !wget -r -N -c -np $curr_url

In [None]:
!pip install pyEDFlib
import pyedflib # to read EEG files

In [None]:
!pip install git+https://github.com/gumpy-bci/gumpy
import gumpy # signal processing toolbox

In [None]:
import numpy as np
import random
import csv
#import sklearn # ONLY UNCOMMENT TO INDUCE GUMPY CHANNEL FLIPPING BUG

In [None]:
# list of valid subjects (per original paper)
# subjects with inaccuracies in their annotations are left out
excludedSubjs = {38, 88, 89, 92, 100, 104}
validSubjs = [subj for subj in range(1,110) if subj not in excludedSubjs]

In [None]:

# Get all the data into an array to deal with
#
# For every valid subject and every selected file, each non-rest trial is cut
# to its first subLen*nSubsets samples, filtered channel by channel, and split
# into nSubsets sub-windows. Each sub-window becomes one row of dataset_x
# (channels x samples) with a one-hot label in dataset_y.

nChans = 64 # channels read from every EDF file
nSubsets = 8 # sub-windows cut from every trial
freq = 160 # sample rate in Hz
subLen = 80 # samples per sub-window
trialLen = subLen*nSubsets # samples kept from the start of each trial
validFileIdx = [3,7,11]

# 360 rows are budgeted per subject (presumably 3 files x 15 trials x 8
# sub-windows -- TODO confirm against the dataset); rows that are never
# filled stay all-zero in both arrays
nTrials = 360*len(validSubjs)

# initialize variables
dataset_x = np.zeros((nTrials, nChans, subLen))
dataset_y = np.zeros((nTrials,2)) # one-hot: column 1 for 'T2' trials, column 0 otherwise
count = 0 # next free row in dataset_x / dataset_y

for iSubj in validSubjs:
  if iSubj % 10 == 0:
    print(iSubj) # coarse progress indicator
  subjID = str(iSubj).zfill(3)
  for iFile in validFileIdx:
    fileID = str(iFile).zfill(2)
    # open relevant file; the handle is released even if reading raises
    f = pyedflib.EdfReader("/content/drive/MyDrive/DL Final/files/S"+ subjID + "/S" + subjID + "R" + fileID +".edf")
    try:
      annotations = f.readAnnotations()
      trialStartTimes = annotations[0] # get start times of each trial
      trialTypes = annotations[2] # get trial type (rest, left or right)
      # raw signal, one row per channel
      raw = np.vstack([f.readSignal(iChan) for iChan in range(nChans)])
    finally:
      f.close() # close file

    for iTrial in range(len(trialStartTimes)):
      if trialTypes[iTrial] == 'T0':
        continue # only process non-rest trials
      # round instead of truncating so floating-point error in
      # time*freq cannot shift the window one sample early
      startInd = int(round(trialStartTimes[iTrial] * freq))
      trialSig = raw[:,startInd:(startInd+trialLen)] # get signal for this trial
      if trialSig.shape[1] < trialLen:
        # recording ends before the window is complete; it cannot be split into full sub-windows
        print("skipping truncated trial", iTrial, "in S" + subjID + "R" + fileID)
        continue
      for iChan in range(nChans):
        # for each channel, do pre-processing steps
        notch_signal = gumpy.signal.notch(trialSig[iChan,:], 60/(freq / 2)) # notch filter at 60 Hz (expressed relative to Nyquist)
        filt_signal = gumpy.signal.butter_bandpass(notch_signal, 2, 60, fs=freq, order = 5) # bandpass filter
        clean_signal = np.squeeze(gumpy.signal.artifact_removal(filt_signal.reshape((-1, 1)))[0]) # artifact removal
        # break signal into nSubsets consecutive sub-windows, one dataset row each
        dataset_x[count:count+nSubsets, iChan, :] = clean_signal.reshape(nSubsets, subLen)
      # record the label once per trial; it is shared by all of its sub-windows
      labelCol = 1 if trialTypes[iTrial] == 'T2' else 0
      dataset_y[count:count+nSubsets, labelCol] = 1
      count += nSubsets

# rows beyond `count` were never written and are all-zero
print("filled", count, "of", nTrials, "rows")


In [None]:
# save data to use later (the training section reloads these two files)
for npy_path, npy_arr in (
    ('/content/drive/MyDrive/DL Final/dataset_x.npy', dataset_x),
    ('/content/drive/MyDrive/DL Final/dataset_y.npy', dataset_y),
):
  np.save(npy_path, npy_arr)

# Get Ready to Train

In [None]:
# load saved preprocessed data (signals first, then labels)
dataset_x, dataset_y = (
    np.load(npy_path)
    for npy_path in (
        '/content/drive/MyDrive/DL Final/dataset_x.npy',
        '/content/drive/MyDrive/DL Final/dataset_y.npy',
    )
)

In [None]:
# select trials for train (include validation data)

# take the row count from the loaded array instead of hard-coding the number
# of subjects, so the split stays correct if the subject list changes
nTrials = dataset_x.shape[0]

random.seed(25) # fixed seed so the same rows are drawn on every run
# 80% of the rows, drawn without replacement
train_idx = random.sample(range(nTrials), int(nTrials*0.8))

In [None]:
# build train dataset: pick the sampled rows from signals and labels alike
x_train, y_train = dataset_x[train_idx], dataset_y[train_idx]

In [None]:
# build test dataset out of leftover data

nTrials = dataset_x.shape[0]

# every row index that was not drawn for training, in ascending order;
# a set lookup keeps this a single linear pass (repeated list.remove() is quadratic)
train_set = set(train_idx)
test_idx = [idx for idx in range(nTrials) if idx not in train_set]

# train and test together must cover every row exactly once
assert len(test_idx) + len(train_idx) == nTrials

x_test = dataset_x[test_idx, :,:]
y_test = dataset_y[test_idx,:]

In [None]:
# Reshape x datasets to feed into model: insert a singleton axis after the
# batch axis, (n, channels, samples) -> (n, 1, channels, samples).
# The target shape is built from the last two dims so that re-running this
# cell on already-reshaped arrays is a no-op instead of an error.
x_train = x_train.reshape(x_train.shape[0], 1, *x_train.shape[-2:])
x_test = x_test.reshape(x_test.shape[0], 1, *x_test.shape[-2:])

# Define Model and Train

In [None]:
# import keras and set data format

import keras
from keras import layers
from keras import ops

# Use (batch, channels, height, width) ordering. The model cell depends on this:
# its input_shape is (1, 64, 80) and its BatchNormalization layers use axis=1.
keras.backend.set_image_data_format('channels_first')

In [None]:
# Create fusionNet
#
# Three parallel branches with the same layer sequence but different kernel
# lengths / filter counts each receive their own input; their flattened
# outputs are concatenated and classified by a single 2-unit softmax layer.

nChans = 64 # height of each input example (EEG channels)
nSamples = 80 # width of each input example (time samples)
axVal = 1 # feature-map axis for BatchNormalization (data format is 'channels_first')
input_shape = (1, nChans, nSamples)


def build_branch(nFilters, tempKernLen, sepKernLen):
  """Build one branch of the fusion model.

  Args:
    nFilters: number of filters in the first convolution.
    tempKernLen: kernel length of the first convolution along the time axis.
    sepKernLen: kernel length of the separable convolution along the time axis.

  Returns:
    (branch_in, branch_out): the branch's keras.Input tensor and its
    flattened output tensor.
  """
  branch_in = keras.Input(shape=input_shape)
  # convolution along time only; the input shape is supplied by keras.Input,
  # so the layer itself is not given an input_shape argument
  x = keras.layers.Conv2D(nFilters, (1, tempKernLen), padding='same', use_bias=False)(branch_in)
  x = keras.layers.BatchNormalization(axis=axVal)(x)
  # depthwise convolution whose kernel spans all nChans rows, reducing the height to 1
  x = keras.layers.DepthwiseConv2D((nChans,1), depth_multiplier=2, depthwise_constraint=keras.constraints.max_norm(1.), use_bias=False)(x)
  x = keras.layers.BatchNormalization(axis=axVal)(x)
  x = keras.layers.Activation('elu')(x)
  x = keras.layers.AveragePooling2D((1,4))(x)
  x = keras.layers.Dropout(0.5)(x)
  x = keras.layers.SeparableConv2D(16, (1,sepKernLen), padding='same', use_bias=False)(x)
  x = keras.layers.BatchNormalization(axis=axVal)(x)
  x = keras.layers.Activation('elu')(x)
  x = keras.layers.AveragePooling2D((1,8))(x)
  x = keras.layers.Dropout(0.5)(x)
  branch_out = keras.layers.Flatten()(x)
  return branch_in, branch_out


# build the three branches (filters, first kernel length, separable kernel length)
input1, branch1 = build_branch(4, 64, 8)
input2, branch2 = build_branch(8, 128, 16)
input3, branch3 = build_branch(16, 256, 32)

# fusion
merge1 = keras.layers.concatenate([branch1, branch2])
merge2 = keras.layers.concatenate([merge1, branch3])

# the branches are already flattened, so this Flatten leaves the tensor as is;
# it is kept so the layer graph is unchanged
flat = keras.layers.Flatten()(merge2)

dense1 = keras.layers.Dense(2, kernel_constraint=keras.constraints.max_norm(0.25))(flat)

mod_out = keras.layers.Softmax()(dense1)

# one input per branch; callers pass the same array three times
model = keras.Model(inputs=[input1, input2, input3], outputs=mod_out)



In [None]:
# create callbacks for early stopping and adjusting learning rate

# multiply the learning rate by 0.1 when val_loss has stopped improving for 10 epochs
lr_on_plateau = keras.callbacks.ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.1,
    patience=10,
)

# stop when val_loss has stopped improving for 10 epochs (monitoring begins
# at epoch 10) and restore the best weights seen
early_stop = keras.callbacks.EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True,
    start_from_epoch=10,
)

# NOTE(review): both callbacks use patience=10, so a learning-rate cut may
# coincide with the stop and have little chance to take effect -- confirm
# this is intended before changing either value.
callback_list = [lr_on_plateau, early_stop]

In [None]:
# print the layer summary, then compile model
adam_opt = keras.optimizers.Adam(learning_rate=0.0001)

model.summary()
model.compile(
    loss=keras.losses.binary_crossentropy,
    optimizer=adam_opt,
    metrics=['accuracy'],
)

In [None]:
# Do Training!
# the same array is fed to each of the model's three inputs
train_inputs = [x_train] * 3

hist = model.fit(
    train_inputs,
    y_train,
    batch_size=64,
    epochs=300,
    shuffle=True,
    validation_split=0.125,  # fraction of the training rows held out for validation
    callbacks=callback_list,
)



In [None]:
# Save training history
# NOTE(review): hist.history is presumably a plain dict, which np.save would
# store as a pickled object; if so, reading it back needs allow_pickle=True -- confirm
history_path = '/content/drive/MyDrive/DL Final/FusionWithFlipTrue.npy'
np.save(history_path, hist.history)

In [None]:
# get test accuracy
# the held-out array is fed to each of the model's three inputs
test_inputs = [x_test] * 3
model.evaluate(test_inputs, y_test)

No flip, fusion model - accuracy: 0.7718

With flip, fusion model - accuracy: 0.8163


In [None]:
# save model
model_path = '/content/drive/MyDrive/DL Final/FusionWithFlipTrue.keras'
model.save(model_path)