In [None]:
import os
import csv
import math
import wandb
import numpy as np
from pathlib import Path 

import matplotlib.pyplot as plt

from keras.utils import np_utils
from keras.datasets import mnist
from keras.models import Sequential
from wandb.keras import WandbCallback
from keras.layers import Dense, Flatten, Dropout

Using TensorFlow backend.


In [None]:
#Getting the latest file from the folder
record_path = Path("RecordSummary")
training_data_list = list(filter(os.path.isfile,record_path.glob('*.csv')))
training_data_list.sort(key=lambda x: os.path.getmtime(x))
training_data = training_data_list[-1]

x_train_percent = 60
x_test_percent = 100-x_train_percent
x_train = []
x_test = []
y_train = []
y_test = []

with open(training_data) as csv_file:
    data = list(csv.reader(csv_file))
    lines = list(map(list, zip(*data)))

nr_x_train = math.ceil(len(lines[0])*x_train_percent/100)
lines = np.asarray(lines)

#X stands for input, Y stands for output

x_train = lines[1:, 0:nr_x_train]
y_train = lines[0, 0:nr_x_train]

x_test = lines[1:, nr_x_train:] 
y_test = lines[0, nr_x_train:]
x_train = x_train.transpose()
x_test = x_test.transpose()

x_train = [a.reshape(4,96) for a in x_train]
x_test = [a.reshape(4,96) for a in x_test]

In [None]:
#preparing the output for categorical crossentropy
print(np.unique(y_train))
y_train = np.where(y_train=='down', 0, y_train) 
y_train = np.where(y_train=='up', 1, y_train) 

y_test = np.where(y_test=='down', 0, y_test) 
y_test = np.where(y_test=='up', 1, y_test) 
print(np.unique(y_train))

In [None]:
channels = np.shape(x_train)[1]
samples = np.shape(x_train)[2]

print(channels)
print(samples)

In [None]:
print(np.shape(x_train))
print(np.shape(y_train))
print(np.shape(x_test))
print(np.shape(y_test))

In [None]:
print(type(x_train))
print(type(y_train))

In [None]:
#converting <class 'list'> to <class 'numpy.ndarray'>
x_train = np.asarray(x_train)
x_test = np.asarray(x_test)

In [None]:
# one hot encode outputs
y_train = np_utils.to_categorical(y_train)
y_test = np_utils.to_categorical(y_test)
labels = ["Down", "Up"]

num_classes = y_train.shape[1]

print(np.shape(y_train))
print(np.shape(y_test))

In [None]:
#Normalizing data:
x_train = x_train.astype("float")
x_test = x_test.astype("float")
merge = np.vstack((x_train, x_test))
minval = np.amin(merge)
maxval = np.amax(merge)
print(f'Min: {minval}, Max:{maxval}')
x_train = (x_train-minval)/(maxval-minval)
x_test = (x_test-minval)/(maxval-minval)

In [None]:
# create model
model=Sequential()
model.add(Flatten(input_shape=(channels,samples)))
model.add(Dense(num_classes, activation='softmax'))
model.compile(loss='categorical_crossentropy', optimizer='adam',
                metrics=['accuracy'])

In [None]:
# logging code
run = wandb.init(project="eeg-up-down")
config = run.config
config.epochs = 1000
# Fit the model
model.fit(x_train, y_train, epochs=config.epochs, validation_data=(x_test, y_test),
                    callbacks=[WandbCallback(labels=labels)])

In [None]:
print(model.predict(x_test[:10,:,:]))
print(y_test[:10])