## Event Classification from EEG

### Introduction


The following analyes EEG data taken in experiments where participants where exposed to light and sound events. This code cleans that n channel EEG data and uses a long short-term memory recurrent neural network to classify the time following light or sound events by which event occured. 

### Data Prep



In [None]:
# setting the random seed for reproducibility
import random
seed=2023
random.seed(seed)

In [None]:
# import libraries 
import itertools
import numpy as np
import pandas as pd
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler

### Load the Data

In [None]:
# eeg1 and events1 are the test data from a single person
# code assumes eeg1 and events1 are csv files in the current working directory
event_path = ''
eeg_path = ''
eeg1 = pd.read_csv(eeg_path + "")
new_columns = eeg1.columns.values
print(eeg1.shape)

eeg1.columns = new_columns

events1 = pd.read_csv(event_path + "") #, delimiter="\t"
print(events1.shape)


In [None]:
time_col = np.arange(eeg1.shape[0])
time_col = time_col/500
time_col = time_col*1000
time_col = time_col.reshape(-1, 1)

eeg1.insert(0, 'time', time_col)

In [None]:
print(eeg1.shape)
print(eeg1.head())
# print(eeg1.loc[(eeg1["time"] > 150000) & (eeg1["time"] < 160000)])

In [None]:
# subsample of the data to ease building the model, unused in final run
eeg1_smol = eeg1
events1_smol = events1

In [None]:
def standardize_eeg(eeg_data):
    # breaks apart an eeg dataframe, scales the eeg readings, and reassmbles it into a dataframe
    column_list = eeg_data.columns[1:eeg1.shape[1]]
    print(column_list)
    time = eeg_data['time']
    # sample = eeg_data['sample']
    eeg_array = eeg_data[column_list]
    eeg_stnd = scale_data(eeg_array)
    eeg_stnd_df = pd.DataFrame(eeg_stnd, index=eeg_data.index, columns=column_list)
    eeg_stnd = pd.concat([time, eeg_stnd_df], axis =1)
    return eeg_stnd

def scale_data(unscaled_data):
    # helper function for standardize_eeg, fits a scaler and transforms the data 
    scaler = StandardScaler()
    scaler.fit(unscaled_data)
    scaled_data = scaler.transform(unscaled_data)
    return scaled_data
# takes in eeg dataframe and event dataframe, cleans them, 1hot encodes the events
def clean_eeg(eeg, events, event_interval_length, eeg_slice_length):
    #event_list = []
    array_list = [] 
    index_list = []
    eeg = standardize_eeg(eeg) # function for standardizing the eeg readings
    #events_new = build_zero_events(events)
    # iterate over the rows of the events and slice out the corresponding eeg data
    for index, row in itertools.islice(events.iterrows(), None, None): # loop through events data
        #build_event_list(row, event_list) #
        # print(row.latency)
#         tmin = row
        tmin, tmax = build_event_intervals(row, events)
        # print(tmin, tmax)
        eeg_slice = cut_event_intervals(eeg, tmin, tmax)
        # print('slice')
        array_list, index_list = build_array(eeg_slice, eeg_slice_length, 
                                             index, index_list, array_list)
    y_int = events.iloc[index_list] # take the event types for the correct index
    y_int = y_int['type'].values    # take just the event types as an array
    #y_int = y_int.as_matrix()            # save the event types as a matrix
    #y, lb = one_hot_events(y_int)        # one-hot the event types and save the binarizer
    X = np.stack(array_list, axis = 0)   # stack the arrays so the whole thing is 3D
    return X, y_int                     # return the data, outputs, and the binarizer
    
        
def build_event_list(row, event_list):
    # helper function to pull event types out of event data in the right order
    event_type = getattr(row, "type")
    event_list.append(event_type)
        
def build_event_intervals(row, events):
    # helper function to get the time intervals associated with each event
#     tmin = getattr(row, "latency")
    stim_onset = row.latency
    tmin = stim_onset
    tmax = stim_onset + 512*1.5
    return tmin, tmax

def cut_event_intervals(eeg, tmin, tmax):
    # helper function to slice up the eeg data so each slice is associated with one event
    eeg_slice = eeg.loc[(eeg["time"] > tmin) & (eeg["time"] < tmax)]
    eeg_slice.drop(["time"], axis = 1, inplace = True)
    return eeg_slice
    
def build_array(eeg_slice, eeg_slice_length, index, index_list, array_list):
    # helper function to build an array out of the eeg slices and pad them out to a standard length
    if len(eeg_slice) < eeg_slice_length:
        index_list.append(index)
        eeg_matrix = eeg_slice.to_numpy()
        padded_matrix = np.pad(eeg_matrix, ((0, eeg_slice_length - len(eeg_matrix)), (0,0)),
                                   'constant', constant_values=0)
        array_list.append(padded_matrix)
    return array_list, index_list

def one_hot_events(events):
    # helper function for one-hot encoding the events
    events_list = list(events)
    lb = preprocessing.LabelBinarizer()
    lb.fit(events_list)
    events_1hot = lb.transform(events_list)
    return events_1hot, lb

def invert_one_hot(events, lb):
    # function for decoding one-hot, binarizer made in one_hot_events
    inv_events = lb.inverse_transform(events)
    return inv_events

### Model



In [None]:
# full dataset parameters

# define model parameters
samples = events1.shape[0]  # how many trials of eeg data
n_features = eeg1.shape[1] - 1  # how many channels of eeg in each sample
time_steps = ... # how many ms was each sample run for
event_types = ... #len(set(y))  # how many different event types (light, sound, etc) are there # 6 large, 4 smol

In [None]:
# get the data into useable form and store as X and y
X, y = clean_eeg(eeg1, events1, samples, time_steps)

In [None]:
# removes the minor event types. There were only a couple hundred examples of each, whereas the used events had a 
# couple thousand examples
remove_list = [1]              # designate unwanted event types
drop_list = np.isin(y, remove_list)    # create a list of indices associated with unwanted events                  
drop_array = np.array(drop_list)       # make the list of indices to drop into an array

In [None]:
# make X, y's with the unwanted events removed
y_short_int = y[np.isin(y,remove_list, invert=True)]
X_short = X[np.isin(y, remove_list, invert=True)]

In [None]:
print(X_short.shape)

In [None]:
# one hot encode the y data without the unwanted events
y_short, lb = one_hot_events(y_short_int) 
# print(y_short)

In [None]:
from sklearn.model_selection import StratifiedShuffleSplit

# use strat. shuffle split to get indices for test and training data 
sss = StratifiedShuffleSplit(n_splits=2, test_size=0.2, random_state=seed)
sss.get_n_splits(X_short, y_short)

In [None]:
# take the indices generated by stratified shuffle split and make the test and training datasets
for train_index, test_index in sss.split(X_short, y_short):
    #print("TRAIN:", train_index, "TEST:", test_index)
    X_train, X_test = X_short[train_index], X_short[test_index]
    y_train, y_test = y_short[train_index], y_short[test_index]

In [None]:
print(y_train.shape)
from keras.utils import to_categorical
temp = to_categorical(y_train, 3)
print(temp.shape)

In [None]:
from tqdm import tqdm 
import keras
import matplotlib.pyplot as plt

# code for building an LSTM with 100 neurons and dropout. Runs for 50 epochs
model = keras.Sequential()
model.add(keras.layers.LSTM(1024, return_sequences=False, input_shape=(time_steps, n_features)))
model.add(keras.layers.Dropout(0.5))
#model.add(LSTM(100)) dramatically worse results
model.add(keras.layers.Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy',
              optimizer='rmsprop',
              metrics=['accuracy'])

# model.fit(X_train, y_train, batch_size=16, epochs=50)
# score = model.evaluate(X_test, y_test, batch_size=16)

class LossAccHistory(keras.callbacks.Callback):
    def on_train_begin(self, logs={}):
        self.losses = []
        self.accuracies = []

    def on_epoch_end(self, batch, logs={}):
        self.losses.append(logs.get('loss'))
        self.accuracies.append(logs.get('accuracy'))
        self.draw_plot()

    def draw_plot(self):
        epochs = len(self.losses)
        plt.figure(figsize=(12, 5))

        if(epochs % 50 == 0):
            # Plot loss
            plt.subplot(1, 2, 1)
            plt.plot(range(1, epochs + 1), self.losses, label='Training Loss')
            plt.xlabel('Epochs')
            plt.ylabel('Loss')
            plt.legend()

            # Plot accuracy
            plt.subplot(1, 2, 2)
            plt.plot(range(1, epochs + 1), self.accuracies, label='Training Accuracy')
            plt.xlabel('Epochs')
            plt.ylabel('Accuracy')
            plt.legend()

            plt.tight_layout()
            plt.show()

# Initialize the callback
history = LossAccHistory()

# Add ProgbarLogger to display real-time progress
progbar = keras.callbacks.ProgbarLogger()

# Fit the model with the callbacks
model.fit(X_train, y_train, batch_size=16, epochs=500, callbacks=[history, progbar])

# Evaluate the model
score = model.evaluate(X_test, y_test, batch_size=32)

# Plot the final training history
history.draw_plot()


In [None]:
score

In [None]:
print("Accuracy: %.2f%%" % (score[1]*100))

In [None]:
model.save('trained_model.h5')

#### saved model details
standardized

model = Sequential()
#model.add(Embedding(2, output_dim=256))
model.add(LSTM(100, input_shape=(time_steps, n_features)))
model.add(Dropout(0.5))
model.add(Dense(1, activation='sigmoid'))

model.compile(loss='binary_crossentropy',
              optimizer='rmsprop',
              metrics=['accuracy'])

model.fit(X_train, y_train, batch_size=16, epochs=50)
score = model.evaluate(X_test, y_test, batch_size=16)

This model run for 50 epochs had:

* binary crossentropy 0.41922928811677918

* accuracy 0.8529411764705882