In [2]:
import time
import numpy as np
import random
import keras_tuner as kt
from matplotlib import pyplot as plt
from sklearn.model_selection import train_test_split
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.models import Sequential
from tensorflow.keras import layers as layers
from tensorflow.keras.layers import GlobalMaxPooling2D, Activation, Dense, Conv1D, Conv2D, Dropout, Flatten, MaxPooling2D, BatchNormalization, GlobalMaxPooling1D
from tensorflow.keras import optimizers
import itertools
from sklearn.utils import shuffle
from scipy import signal
%matplotlib inline
import matplotlib.pyplot as plt

from tensorflow.keras import regularizers
from EEGModels import EEGNet    

from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import normalize

from sklearn.model_selection import StratifiedKFold, KFold
from sklearn.metrics import classification_report,confusion_matrix
from sklearn.metrics import accuracy_score
from tensorflow.keras.utils import to_categorical

In [6]:
def showHistory(history):
    plt.rcParams['figure.figsize'] = [5, 5]
    for key in history.history.keys():

        if 'val_' not in key and 'lr' != key:
            try:
                plt.clf()
                plt.plot(history.history[key])
                plt.plot(history.history['val_'+key])
                plt.ylabel(key)
                plt.xlabel('epoch')
                plt.legend(['train', 'validation'], loc='upper left')
                plt.show()
            except:
                ...

classes = []
def smoothLabels(label, factor = 0.):
    label *= (1 - factor)
    label += (factor / len(label))
    return label

def oneHot(label, classes = classes):
    label = to_categorical(label,num_classes=len(classes))
    return smoothLabels(label)

def applyOneHot(data):
    new = []
    for y in data:
        new.append(oneHot(y))
    return np.array(new)

def DCFilter(data):
    new_data = []
    for d in data:
        new_data.append(d-np.mean(d))
    return np.array(new_data)

def notchFilter(data, f0 = 60.0, Q = 30.0, fs = 500):
    b, a = signal.iirnotch(f0, Q, fs)
    data = signal.filtfilt(b, a, data, axis=1)
    return np.array(data)

def preProcess(data):
    data = signal.resample(data,   signal_length,axis =-1)
  
    new_data = []
    for d in data:
        d = DCFilter(d)
        d = notchFilter(d)
        scaler = StandardScaler()
        scaler.fit(d)
        d = scaler.transform(d)
        new_data.append(normalize(d, norm='l2'))
    return np.array(new_data)

def showMe(data):
    
    plt.rcParams["figure.figsize"] = [17, 2]
    fig, (c1, c2, c3, c4, c5, c6) = plt.subplots(1, 6)
    c1.plot(data[0])
    c2.plot(data[1])
    c3.plot(data[2])
    c4.plot(data[3])
    c5.plot(data[4])
    c6.plot(data[5])
    plt.show()

In [7]:
signal_length = 100
classes = ['Rest','Left','Right']
resource_path = 'resources/'
train_sessions = ['session_0', 'session_1', 'session_2']

class_0 = []
class_1 = []
class_2 = []
for session in train_sessions:
    class_0.append(np.load(os.path.join(resource_path,session,classes[0]+'.npy')))
    class_1.append(np.load(os.path.join(resource_path,session,classes[1]+'.npy')))
    class_2.append(np.load(os.path.join(resource_path,session,classes[2]+'.npy')))
class_0 = np.concatenate((class_0),axis=0)
class_1 = np.concatenate((class_1),axis=0)
class_2 = np.concatenate((class_2),axis=0)


class_0 = preProcess(class_0)
class_1 = preProcess(class_1)
class_2 = preProcess(class_2)
print(class_0.shape)


(60, 6, 100)


In [None]:
class_0_avg = class_0.mean(axis=0)
showMe(class_0_avg)

In [None]:
class_1_avg = class_1.mean(axis=0)
showMe(class_1_avg)

In [None]:
class_2_avg = class_2.mean(axis=0)
showMe(class_2_avg)

In [None]:
for i in range(5):
    showMe(class_1[i])

In [None]:
for i in range(5):
    showMe(class_0[i])

In [8]:
X = np.concatenate((class_0,class_1,class_2),axis = 0)
y = np.concatenate(( np.zeros(class_0.shape[0]),np.ones(class_1.shape[0]),np.ones(class_1.shape[0])*2))


#SHUFFLE DATA
c = list(zip(X, y))
random.shuffle(c)
X,y = zip(*c)
X = np.array(X)
y = np.array(y)


X = np.expand_dims(X, axis = -1) 

y = applyOneHot(y)

print(X.shape)
print(y.shape)


(180, 6, 100, 1)
(180,)




In [9]:
#AUGMENT
noise = np.random.normal(0,0.1,X.size)
noise = np.reshape(noise,X.shape)

augmented = X + noise
X_aug = np.concatenate((X,augmented))
y_aug = np.concatenate((y,y))

print(X_aug.shape)
print(y_aug.shape)

(360, 6, 100, 1)
(360,)


In [12]:
def get_model():
    inspected_chanels= X.shape[1]
    signal_length=     X.shape[2]
    input_layer = keras.Input(shape = (inspected_chanels,signal_length,1), name='input')
    x     = layers.Conv2D(64, kernel_size=(1,5), padding='same', activation='relu')(input_layer)
    x     = layers.BatchNormalization()(x)
    x     = layers.AveragePooling2D(pool_size=(1,5))(x)

    x     = layers.Conv2D(32, kernel_size=(2,1), padding='same', activation='relu')(x)
    x     = layers.BatchNormalization()(x)
    x     = layers.AveragePooling2D(pool_size=(2,1))(x)

    x     = layers.Conv2D(16, 16, padding='same', activation='relu')(x)
    x     = layers.BatchNormalization()(x)

    x     = layers.GlobalAveragePooling2D()(x)
    x     = layers.Dense(64)(x)
    x     = layers.Dropout(0.2)(x)
    output = layers.Dense(len(classes), activation='softmax')(x)

    model = keras.Model(inputs=input_layer, outputs=output)


    return model


In [16]:

#model = get_model()
model=EEGNet(nb_classes=3,
                Chans=6,
                Samples = signal_length,  # sure?
                kernLength = 20,
)
lr_schedule = keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=1e-2,
    decay_steps=500,
    decay_rate=0.9
    )

opt = keras.optimizers.SGD(learning_rate=lr_schedule, momentum=0.9, nesterov=True)
model.compile(optimizer=opt,
          loss='categorical_crossentropy',
          metrics=['accuracy']
         )

In [None]:
skf = StratifiedKFold(n_splits=10)
tf.config.run_functions_eagerly(True)
accs = []
models = []
for train, test in skf.split(X_aug, y_aug.argmax(axis=1)):
  
    X_train = X_aug[train]
    X_test  = X_aug[test]
    y_train = y_aug[train]
    y_test  = y_aug[test]
    print(X_train.shape, y_train.shape)

   

    batch_size = 16 #len(X_train)
    print("Batch size: {}".format(batch_size))

    history = model.fit(X_train,
                        y_train,
                        validation_data=(X_test, y_test),
                        batch_size=batch_size,
                        epochs=100,
                        shuffle=True)

    showHistory(history)          
    acc = max(history.history['val_accuracy'])
    accs.append(acc)                       
    models.append(model)


    if acc > 0.9:
        break


model = models[accs.index(max(accs))]
for acc in accs:
    print(acc)

In [55]:
model.save('val_loss_083')

INFO:tensorflow:Assets written to: val_loss_083\assets


In [65]:
pred = model(X)
conf_matrix = np.zeros((3,3))
for i in range(int(len(pred))):
    prediction = np.argmax(pred[i])
    gt         = np.argmax(y[i])
    conf_matrix[gt][prediction] += 1
conf_matrix

array([[47.,  5.,  8.],
       [ 4., 44., 12.],
       [ 6., 21., 33.]])