<a href="https://colab.research.google.com/github/rahilv99/EEG_network/blob/main/EEG_network.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Get the data from the Google Cloud Storage bucket

In [None]:
# Colab-only step: authenticate the current user with Google.
# Presumably needed so the gsutil download further down can run with these credentials.
from google.colab import auth
auth.authenticate_user()

In [None]:
# Register the gcsfuse apt repository and its signing key, then install gcsfuse.
# NOTE(review): no cell visible in this notebook mounts a bucket with gcsfuse (the
# data is copied with gsutil instead) -- confirm whether this install is still needed.
!echo "deb http://packages.cloud.google.com/apt gcsfuse-bionic main" > /etc/apt/sources.list.d/gcsfuse.list
!curl https://packages.cloud.google.com/apt/doc/apt-key.gpg | apt-key add -
!apt -qq update
!apt -qq install gcsfuse

In [None]:
!mkdir /content/EEG_directory

Replace `eegnet` with the Google Cloud project registered to your credentials.

In [None]:
!gsutil -m -u eegnet cp -r gs://eegmmidb-1.0.0.physionet.org /content/EEG_directory

Install dependencies

In [None]:
!pip install mne

In [None]:
import numpy as np
import pandas as pd

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, SeparableConv2D, DepthwiseConv2D, Conv2D, SpatialDropout2D
from tensorflow.keras.layers import MaxPooling2D, AveragePooling2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import Input, Flatten
from tensorflow.keras.constraints import max_norm
from tensorflow.keras.callbacks import ModelCheckpoint

from glob import glob
import os
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.model_selection import ShuffleSplit, cross_val_score
from sklearn.pipeline import Pipeline
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

from mne import Epochs, pick_types
from mne.channels import make_standard_montage
from mne.datasets import eegbci
from mne.decoding import CSP
from mne.io import concatenate_raws, read_raw_edf

import matplotlib.pyplot as plt

Preprocess EEG data
- encode the labels as left = 0 and right = 1

In [None]:
root_dir = '/content/EEG_directory/eegmmidb-1.0.0.physionet.org'

# Only recordings whose file name contains one of these run tags are indexed.
TASK_RUNS = ("R04", "R08", "R12")

# Map each directory (relative to root_dir, e.g. 'S001') to the sorted list of
# matching .edf paths found directly inside it.
all_files = {}
for dirpath, _dirnames, filenames in os.walk(root_dir):
  # Derive the key from root_dir itself so changing root_dir cannot silently break the keys.
  key = os.path.relpath(dirpath, root_dir)
  # Sort so the file order (and therefore the concatenation order) is deterministic.
  for fname in sorted(filenames):
    # Match the run tag against the file name only, not the whole path.
    if fname.endswith(".edf") and any(run in fname for run in TASK_RUNS):
      all_files.setdefault(key, []).append(os.path.join(dirpath, fname))

def extract_data(id):
  """Load, band-pass filter and epoch the indexed recordings of one subject.

  Parameters
  ----------
  id : str
      Key into the module-level ``all_files`` mapping (e.g. ``'S001'``).
      The name shadows the ``id`` builtin; it is kept so existing callers still work.

  Returns
  -------
  x : mne.Epochs
      Epochs from -1 s to 4 s around every "left" / "right" annotation.
  y : numpy.ndarray of int
      One label per epoch: 0 for "left", 1 for "right".
  info : mne.Info
      Measurement info of the epochs.

  Raises
  ------
  KeyError
      If no recordings are indexed for ``id``.
  """
  if id not in all_files:
    raise KeyError(f"No EDF recordings indexed for subject {id!r}")

  # Sort so the runs are always concatenated in the same order, whatever order
  # the file system listed them in.
  f_names = sorted(all_files[id])

  # piece raw
  raw = concatenate_raws([read_raw_edf(f, preload = True) for f in f_names])
  eegbci.standardize(raw)  # set channel names
  montage = make_standard_montage("standard_1005")
  raw.set_montage(montage)
  # Give the T1 / T2 annotations readable names; Epochs selects events by these names below.
  raw.annotations.rename(dict(T1="left", T2="right"))
  raw.set_eeg_reference(projection=True)

  # Apply band-pass filter (7-30 Hz)
  raw.filter(7.0, 30.0, fir_design="firwin", skip_by_annotation="edge") # finite impulse response (window method) filter
  # Apply selection: EEG channels only, excluding channels marked as bad
  picks = pick_types(raw.info, meg=False, eeg=True, stim=False, eog=False, exclude="bads")

  epochs = Epochs(
    raw,
    event_id=["left", "right"],
    tmin=-1,
    tmax=4,
    proj=True,
    picks=picks,
    baseline=None,
    preload=True)

  # Derive the labels from the epochs' own event_id mapping instead of subtracting
  # a hard-coded offset, so the 0/1 encoding does not depend on which integer
  # codes were assigned to the annotations.
  right_code = epochs.event_id["right"]
  x = epochs
  y = (epochs.events[:, -1] == right_code).astype(int)
  info = epochs.info

  return x,y,info

# Build the epochs, labels and measurement info for the subject stored under key 'S001'.
x,y,info = extract_data('S001')

# Copy of the epochs restricted to the 1.0-2.0 s window; the original `x` keeps the full window.
x_train = x.copy().crop(tmin=1.0,tmax=2)


Split data

In [None]:
# Define a monte-carlo cross-validation generator (reduce variance):
epochs_data = x.get_data(copy=False)
epochs_data_train = x_train.get_data(copy=False)
cv = ShuffleSplit(10, test_size=0.2, random_state=42)

# Add a trailing depth axis for the 2-D convolutions:
# (epochs, channels, samples) -> (epochs, channels, samples, 1)
model_x = epochs_data_train[..., np.newaxis]

# Constant factor applied to the raw amplitudes before they are fed to the network.
SCALE_FACTOR = 100000
scaled_data = model_x * SCALE_FACTOR

# Train/Test Split (stratified so both classes stay balanced in the small held-out sets)
X_train, X_test, Y_train, Y_test = train_test_split(
  scaled_data, y, test_size=0.2, random_state=42, stratify=y)

# Train/Validation Split
X_train, X_val, Y_train, Y_val = train_test_split(
  X_train, Y_train, test_size=0.2, random_state=42, stratify=Y_train)

# Summarise the training tensor by its shape instead of dumping every value.
print("Training Data: ", X_train.shape)
print("Epochs:", X_train.shape[0])
print("Channels:", X_train.shape[1])
print("Time Points:", X_train.shape[2])
print("Depth:", X_train.shape[3])
print("Left/Right Kinisthenic Movement:")
print(Y_train)

Display common spatial patterns for the selected data

In [None]:
# Same 0/1 (left/right) labels that extract_data returned for these epochs.
labels = y

# Assemble a classifier
lda = LinearDiscriminantAnalysis()
csp = CSP(n_components=4, reg=None, log=True, norm_trace=False)

# Use scikit-learn Pipeline with cross_val_score function
clf = Pipeline([("CSP", csp), ("LDA", lda)])
scores = cross_val_score(clf, epochs_data_train, labels, cv=cv, n_jobs=None)

# Report the cross-validated accuracy next to the majority-class rate so the
# score can be compared against chance.
class_balance = np.mean(labels == labels[0])
class_balance = max(class_balance, 1.0 - class_balance)
print(f"Classification accuracy: {np.mean(scores):f} / Chance level: {class_balance:f}")

# plot CSP patterns estimated on full data for visualization
# (only the fitted patterns are needed, so the transformed data is not computed)
csp.fit(epochs_data, labels)

csp.plot_patterns(x.info, ch_type="eeg", units="Patterns (AU)", size=1.5)

Display an example EEG signal in the time and frequency domains

In [None]:
# Shape of X_train is epochs x channels x samples x 1
print(X_train.shape)
# Select one epoch/channel and drop the trailing depth axis so the signal is 1-D.
# (With the depth axis kept, the FFT would run along that length-1 axis and
# simply return the time signal.)
EEG_sig = X_train[1, 10, :, 0]
size = len(EEG_sig)
# Time axis of the cropped 1-2 s training window.
t = np.linspace(1,2,size)

# Sampling frequency taken from the recording instead of a hard-coded value.
Fs = info["sfreq"]
# One-sided spectrum of the real signal and its matching frequency bins (0 .. Fs/2).
EEG_freq = np.fft.rfft(EEG_sig)
f = np.fft.rfftfreq(size, d=1.0/Fs)

print(Y_train[1])

plt.figure()
plt.plot(t,EEG_sig)
plt.xlabel("time (s)")
# X_train holds the raw amplitudes multiplied by 1e5, so label the axis accordingly.
plt.ylabel("Amplitude (V x 1e5)")
plt.title("Epoch 1, Channel 11 (Motor Cortex)")


plt.figure()
plt.plot(f,np.abs(EEG_freq))
plt.xlabel('frequency (Hz)')
plt.ylabel('DFT of EEG signal')
plt.title("Epoch 1, Channel 11 (Motor Cortex)")
plt.show()

Classifier for kinesthetic imagination (Model Construction)

In [None]:
def EEGNet(Chans = 64, Samples = 128, kernelLength = 64, dropoutRate = 0.5, norm_rate = 0.25,
           F1 = 4, D = 2, F2 = 4):
    '''
    Build a compact convolutional binary classifier for EEG epochs.

    Chans        = number of electrodes (height of the input)
    Samples      = number of time points per epoch (width of the input)
    kernelLength = length, in time samples, of the first (temporal) convolution kernel
    dropoutRate  = fraction of feature maps dropped by each SpatialDropout2D layer
    norm_rate    = max-norm constraint applied to the final dense layer's kernel
    F1           = number of temporal filters in the first convolution
    D            = depth multiplier: spatial filters learned per temporal filter
    F2           = number of filters in the separable convolution

    Returns an uncompiled Model mapping (Chans, Samples, 1) inputs to a single
    sigmoid output.

    Raises ValueError if Samples is too small to survive the two pooling stages.
    '''
    # The two average-pooling stages divide the time axis by 4 and then by 8;
    # fail early with a clear message instead of building an empty tensor.
    if (Samples // 4) // 8 < 1:
        raise ValueError(
            "Samples must be at least 32 to pass the (1, 4) and (1, 8) pooling "
            "stages, got %r" % (Samples,))

    input1   = Input(shape = (Chans, Samples, 1))

    # Block 1: temporal convolution along the time axis. The input shape is
    # already fixed by `input1`, so no `input_shape` argument is needed.
    block1       = Conv2D(F1, (1, kernelLength), padding = 'same')(input1)
    block1       = BatchNormalization()(block1)
    # Spatial filtering: a (Chans, 1) kernel spans every electrode at once.
    # depth multiplier = # of spatial filters to learn in 1 temporal
    block1       = DepthwiseConv2D((Chans, 1), depth_multiplier = D,
                                   depthwise_constraint = max_norm(1.))(block1)
    block1       = BatchNormalization()(block1)
    block1       = Activation('elu')(block1)
    block1       = AveragePooling2D((1, 4))(block1)
    block1       = SpatialDropout2D(dropoutRate)(block1)

    # Block 2: separable convolution over time on the spatially filtered maps.
    block2       = SeparableConv2D(F2, (1, 16), padding = 'same')(block1)
    block2       = BatchNormalization()(block2)
    block2       = Activation('elu')(block2)
    block2       = AveragePooling2D((1, 8))(block2)
    block2       = SpatialDropout2D(dropoutRate)(block2)

    flatten      = Flatten(name = 'flatten')(block2)

    # Single sigmoid unit: output is the probability of class 1.
    dense        = Dense(1, name = 'dense', activation = 'sigmoid',
                         kernel_constraint = max_norm(norm_rate))(flatten)

    return Model(inputs=input1, outputs=dense)


Compile and run

Note: the reported accuracy is the fraction of epochs whose thresholded model output matches the label, 0 (left) or 1 (right); it is not an MSE.

In [None]:
# Training settings gathered in one place.
CHECKPOINT_PATH = '/tmp/checkpoint.h5'
N_EPOCHS = 300

# The network's input size comes straight from the training tensor.
channels, samples = X_train.shape[1], X_train.shape[2]

# Keep only the best model seen during training on disk.
# (presumably judged on the callback's default monitored quantity -- confirm)
checkpointer = ModelCheckpoint(filepath=CHECKPOINT_PATH, verbose=1, save_best_only=True)

model = EEGNet(Chans=channels, Samples=samples)

# compile the model and set the optimizers
model.compile(
    loss='binary_crossentropy',
    optimizer='adam',
    metrics=['accuracy'],
)

fittedModel = model.fit(
    X_train,
    Y_train,
    epochs=N_EPOCHS,
    verbose=2,
    validation_data=(X_val, Y_val),
    callbacks=[checkpointer],
)

# Restore the checkpointed weights so evaluation uses the saved best model,
# not the state after the final epoch.
model.load_weights(CHECKPOINT_PATH)

Evaluation

In [None]:
# Sigmoid outputs for the held-out epochs, rounded to hard 0/1 predictions.
probs = model.predict(X_test)
preds = np.round(probs)

# Compare the first output column, as integers, with the true labels in one
# vectorised step and collect the positions that disagree.
predicted_labels = preds[:, 0].astype(int)
wrong_positions = np.flatnonzero(predicted_labels != Y_test)

# List every misclassified test epoch.
for i in wrong_positions.tolist():
  print("index", i)

size = len(preds)
count = len(wrong_positions)

acc = (1-count/size)*100
print("Classification accuracy: %f " % (acc))

Unmount data from google cloud when done

In [None]:
!fusermount -u /content/EEG_directory

fusermount: failed to unmount /content/EEG_directory: Invalid argument
