# EEG SIGNAL CLASSIFICATION

## ABOUT

**Author :** Ravi Narayana K S

**Date :** 03.07.2024

## IMPLEMENTATION 

#### **Importing Dependencies :**

In [None]:
import mne
import os
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score,confusion_matrix
import matplotlib.pyplot as plt

import tensorflow as tf
import tensorflow as tf
from tensorflow.keras import layers, models, Input
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, BatchNormalization, DepthwiseConv2D, SeparableConv2D, AveragePooling2D, Flatten, Dense, Dropout, Activation, Reshape

#### **Loading Data :**

*Loading file names :*

In [None]:
data_folder = 'data' # folder containing the data
edf_files_1 = sorted([file for file in os.listdir(data_folder) if file.endswith('_1.edf')]) # rest
edf_files_2 = sorted([file for file in os.listdir(data_folder) if file.endswith('_2.edf')]) # task

*Loading raw(.edf) files :*

*raws1* and *raws2* contain the raw files of class 1 and class 2 respectively

In [None]:
raws1 = [] # rest
for edf_file in edf_files_1:
    raw = mne.io.read_raw_edf(os.path.join(data_folder, edf_file), preload=True)
    raws1.append(raw)

raws2 = [] # task
for edf_file in edf_files_2:
    raw = mne.io.read_raw_edf(os.path.join(data_folder, edf_file), preload=True)
    raws2.append(raw)

**Visualising the loaded data :**

In [None]:
print(raws1[0].info)
plt.close()
raws1[0].plot()

In [None]:
print(raws2[0].info)
plt.close()
raws2[0].plot()

#### **Calculate PSD :**

*rawPSDs1* and *rawPSDs2* contain the PSDs of raw files of class 1 and class 2 respectively

In [None]:
rawPSDs1 = [] # rest
for raw in raws1:
    rawPSD = raw.compute_psd(fmin=0, fmax=100, n_fft=2048)
    rawPSDs1.append(rawPSD)

rawPSDs2 = [] # task
for raw in raws2:
    rawPSD = raw.compute_psd(fmin=0, fmax=100, n_fft=2048)
    rawPSDs2.append(rawPSD)

*Visualising PSD for class 1*

In [None]:
print(rawPSDs1[0].info)
plt.close()
rawPSDs1[0].plot()

*Visualising PSD for class 2*

In [None]:
print(rawPSDs2[0].info)
plt.close()
rawPSDs2[0].plot()

*We observe high fluctuation in the data belonging to class 2*

#### **Defining Bands :**

In [None]:
bands = {'Delta': (1, 4), 'Theta': (4, 8), 'Alpha': (8, 12), 'Beta': (12, 30), 'Gamma': (30, 100)}

#### **Creating bandwise PSD :**

In [None]:
PSDs1 = []
Freqs1 = []

for rawPSD in rawPSDs1:
    PSD, Freq = rawPSD.get_data(return_freqs=True)  
    PSDs1.append(PSD)
    Freqs1.append(Freq)

PSDs2 = []
Freqs2 = []

for rawPSD in rawPSDs2:
    PSD, Freq = rawPSD.get_data(return_freqs=True)  
    PSDs2.append(PSD)
    Freqs2.append(Freq)

*bandPSDs1* and *bandPSDs2* contain the bandwise PSDs of raw files of class 1 and class 2 respectively

In [None]:
bandPSDs1 = [] # rest
for i in range(len(PSDs1)):
    psds = PSDs1[i]
    freqs = Freqs1[i]
    band_psd = {band:[] for band in bands}

    for band, (fmin,fmax) in bands.items():
        idx = np.logical_and(freqs>=fmin,freqs<=fmax)
        band_psd[band] = np.mean(psds[:,idx],axis=1)

    bandPSDs1.append(band_psd)

bandPSDs2 = [] # task
for i in range(len(PSDs2)):
    psds = PSDs2[i]
    freqs = Freqs2[i]
    band_psd = {band:[] for band in bands}

    for band, (fmin,fmax) in bands.items():
        idx = np.logical_and(freqs>=fmin,freqs<=fmax)
        band_psd[band] = np.mean(psds[:,idx],axis=1)

    bandPSDs2.append(band_psd)

#### **Calculating the difference :**

In [None]:
DIFFs = []

for i in range(len(bandPSDs1)):
    DIFF = {band:[] for band in bands}
    for band in bands:
        diff = bandPSDs1[i][band] - bandPSDs2[i][band]
        DIFF[band].append(diff)
    DIFFs.append(DIFF)

**Visualising the difference :**

In [None]:
plt.close()
for j in range(len(DIFFs)):
    for i,band in enumerate(bands):
        plt.plot(np.abs(DIFFs[j][band][0]),label=f'{band}')
    plt.title(f'Difference in Band PSD ({band},{j}) : Rest-Task')
    plt.xlabel('channels')
    plt.ylabel('PSD Difference (db/Hz)')
    plt.legend()
    plt.show()
        


*We observe that **ALPHA, THETA and DELTA** bands are sensitive to change of state from rest to task*

In [None]:
plt.close()
for band in bands:
    for i,j in enumerate(range(len(DIFFs))):
        plt.plot(np.abs(DIFFs[j][band][0]),label=f'{band}')
    plt.title(f'Difference in Band PSD ({band}) : Rest-Task')
    plt.xlabel('channels')
    plt.ylabel('PSD Difference (db/Hz)')
    plt.show()

*We observe that*
* **ALPHA** band signal fluctuation occurs in channels between ~7-20
* **THETA** band signal fluctuation occurs in channels between ~5-8 and ~15-20
* **DELTA** band signal fluctuation occurs in channel 15
* **GAMMA** band signal fluctuation occurs in channels between ~2-18


* 21st Channel shows high fluctuation

#### **Creating dataset for model training :**

In [None]:
data = []
labels = []

for i in PSDs1:
    data.append(i)
    labels.append(0)

for i in PSDs2:
    data.append(i)
    labels.append(1)

data = np.array(data)
labels = np.array(labels)

data.shape, labels.shape

In [None]:
X_train, X_test, y_train, y_test = train_test_split(data, labels, test_size=0.2, random_state=42,shuffle=True)

# Reshape data
X_train = X_train.reshape(X_train.shape[0], X_train.shape[1], X_train.shape[2], 1)
X_test = X_test.reshape(X_test.shape[0], X_test.shape[1], X_test.shape[2], 1)

#### **Implementing EEGNET :**

**Defining model :**

In [None]:
def EEGNet(nb_classes, Chans = 64, Samples = 128, dropoutRate = 0.5, kernLength = 64, F1 = 8, D = 2, F2 = 16, norm_rate = 0.25):
    model = Sequential()
    model.add(Conv2D(F1, (1, kernLength), padding = 'same', input_shape = (Chans, Samples, 1)))
    model.add(BatchNormalization())
    model.add(DepthwiseConv2D((Chans, 1), padding = 'valid', depth_multiplier = D))
    model.add(BatchNormalization())
    model.add(Activation('elu'))
    model.add(AveragePooling2D((1, 4)))
    model.add(Dropout(dropoutRate))
    model.add(SeparableConv2D(F2, (1, 16), padding = 'same'))
    model.add(BatchNormalization())
    model.add(Activation('elu'))
    model.add(AveragePooling2D((1, 8)))
    model.add(Dropout(dropoutRate))
    model.add(Flatten())
    model.add(Dense(nb_classes, activation = 'softmax'))
    return model

**Compiling model :**

In [None]:
model_EEGNET = EEGNet(nb_classes=2, Chans=X_train.shape[1], Samples=410)
model_EEGNET.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])

**Training model :**

In [None]:
model_EEGNET.fit(X_train, y_train, batch_size=16, epochs=100, validation_split=0.2)

**Evaluating models :**

In [None]:
model_EEGNET.evaluate(X_test, y_test)

In [None]:
y_hat = model_EEGNET.predict(X_test)
y_hat = np.argmax(y_hat, axis=1)

print(y_hat == y_test,'\n')

acc = accuracy_score(y_test, y_hat)
prec = precision_score(y_test, y_hat)
rec = recall_score(y_test, y_hat)
f1 = f1_score(y_test, y_hat)
con = confusion_matrix(y_test, y_hat)

print(f'Accuracy: {acc}\nPrecision: {prec}\nRecall: {rec}\nF1: {f1}\nConfusion Matrix: \n{con}')

#### **Implementing TSCeption :**

**Defining Model Builders :**

In [None]:
def TSCeptionBlock(input_tensor, filters, kernel_sizes):
    convs = []
    for kernel_size in kernel_sizes:
        conv = layers.Conv2D(filters=filters, kernel_size=(1, kernel_size), padding='same', activation='relu')(input_tensor)
        conv = layers.BatchNormalization()(conv)
        convs.append(conv)
    output = layers.Concatenate(axis=-1)(convs)
    return output

def TSCeptionModel(input_shape, num_classes, num_blocks=3, filters=16, kernel_sizes=[3, 5, 7]):
    input_layer = Input(shape=input_shape)
    
    x = input_layer
    for _ in range(num_blocks):
        x = TSCeptionBlock(x, filters, kernel_sizes)
        x = layers.MaxPooling2D(pool_size=(1, 2))(x)
        filters *= 2

    x = layers.Flatten()(x)
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dropout(0.5)(x)
    output_layer = layers.Dense(num_classes, activation='softmax')(x)

    model = models.Model(inputs=input_layer, outputs=output_layer)
    return model

In [None]:
input_shape = (21, 410, 1)  #(channels, sample, 1)
num_classes = 2

**Compiling Model :**

In [None]:
model_TSC = TSCeptionModel(input_shape, num_classes)
model_TSC.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])
model_TSC.summary()

**Training Model :**

In [None]:
history = model_TSC.fit(X_train, y_train, epochs=50, batch_size=32, validation_split=0.2)

**Evaluating Model :**

In [None]:
model_TSC.evaluate(X_test, y_test)

In [None]:
y_hat = model_TSC.predict(X_test)
y_hat = np.argmax(y_hat, axis=1)

print(y_hat == y_test,'\n')

acc = accuracy_score(y_test, y_hat)
prec = precision_score(y_test, y_hat)
rec = recall_score(y_test, y_hat)
f1 = f1_score(y_test, y_hat)
con = confusion_matrix(y_test, y_hat)

print(f'Accuracy: {acc}\nPrecision: {prec}\nRecall: {rec}\nF1: {f1}\nConfusion Matrix: \n{con}')

# Experimentation :

#### **Creating Processed Dataset for classification :**

In [None]:
dataset = []
for i in range(len(bandPSDs1)):
    row = []
    for band in bands:
        for j in bandPSDs1[i][band]:
            row.append(j)
    row.append(0)
    dataset.append(row)

for i in range(len(bandPSDs2)):
    row = []
    for band in bands:
        for j in bandPSDs2[i][band]:
            row.append(j)
    row.append(1)
    dataset.append(row)

In [None]:
dataset = np.array(dataset)
dataset.shape 

**Spliting dataset into training and testing batches :**

In [None]:
X = dataset[:,:-1]
Y = dataset[:,-1].astype(int)

X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=42)

#### **Implementing Convolution model :**

**Defining model :**

In [None]:
model = Sequential()
model.add(Reshape((21,5,1), input_shape=(105,)))
model.add(Conv2D(16, (5,5), padding='same', activation='relu'))
model.add(BatchNormalization())
model.add(AveragePooling2D(pool_size=(2,2)))
model.add(Dropout(0.5))
model.add(Conv2D(4, (5,5), padding='same', activation='relu'))
model.add(BatchNormalization())
model.add(AveragePooling2D(pool_size=(2,2)))
model.add(Dropout(0.5))
model.add(Flatten())
model.add(Dense(2, activation='softmax'))

**Compiling Model :**

In [None]:
model.compile(loss='sparse_categorical_crossentropy', optimizer='adam', metrics=['accuracy'])
model.summary()

In [None]:
history = model.fit(X_train, Y_train, batch_size=32, epochs=250, validation_split=0.2, shuffle=True)

**Analysing training vs validation curves for accuacy and loss :**

In [None]:
plt.plot(history.history['accuracy'], label='accuracy')
plt.plot(history.history['val_accuracy'], label = 'val_accuracy')
plt.xlabel('Epoch')
plt.ylabel('Accuracy')
plt.legend(loc='lower right')
plt.show()

In [None]:
plt.plot(history.history['loss'], label='loss')
plt.plot(history.history['val_loss'], label = 'val_loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.legend(loc='lower right')
plt.show()

**Evaluating model :**

In [None]:
model.evaluate(X_test, Y_test)

In [None]:
y_hat = model.predict(X_test)
y_hat = np.argmax(y_hat, axis=1)

print(y_hat == Y_test,'\n')

acc = accuracy_score(Y_test, y_hat)
prec = precision_score(Y_test, y_hat)
rec = recall_score(Y_test, y_hat)
f1 = f1_score(Y_test, y_hat)
con = confusion_matrix(Y_test, y_hat)

print(f'Accuracy: {acc}\nPrecision: {prec}\nRecall: {rec}\nF1: {f1}\nConfusion Matrix: \n{con}')

# RESULTS

1. With Respect to Data

    * Upon analysing the PSD trends of two classes, it is observed that the PSD curve of class 2 (task) is noisy.

    * Upon analysing the bandwise PSD and comparing bandwise PSDs of two classes, we observe that the ALPHA, THETA and DELTA bands fluctuate more than other bands.

    * The above inference brings up to conclusion that these bands are sensitive to change of state (0 -> 1).

    * Analysing the visualisations of difference between the bandwise PSDs we observe lot of fluctuations in data of 21st channel.

2. With Respect to Models Trained

    * The EEGNET model trains well in short span of time with training accuracy up to 65% and validation accuracy of about 45%.

    * The TSCeption model also performs similarly.

    * The much simplar CNN used gives output simliar to the previous models.

    * both the models are observed to have given the same scores,

    accuracy_score = 0.333333
    precision_score = 0.333333
    f1_score = 0.5
    confusion_matrix = [[0 , 10],
                        [0 , 5 ]]

3. With Respect to Model Evaluation

    * Upon close inspection of model traing history, we observe that the models are overfitting and hence doesnt generalise properly.

    * This can be inferred by looking at training_acc >> validation_acc and testing_acc << groungTruth.

    * Regularisation used did indeed reduce bit of overfitting, but overfitting does persist. Thus the overfitting can be removed only by increasing number of records in data.

4. With Respect to Experimentation

    * Upon exploring cleaning and reordering the PSD for eeg signal, the processed dataset consists of each record of shape (21,410,1).

    * A simple CNN trained on this dataset performs as good as models discussed earliercwith same sklearn.metrics