In [4]:
import sklearn as sk
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt 
import scipy.io as sio
from keras.utils import to_categorical
import time
import glob
from sklearn.model_selection import train_test_split
from sklearn.naive_bayes import GaussianNB

In [5]:
!ls
np.random.seed(816)

Channel-0.png       Channel-3.png       Channel-6.png       Untitled.ipynb
Channel-1.png       Channel-4.png       Channel-7.png       kaggle_model.ipynb
Channel-2.png       Channel-5.png       P300 Analysis.ipynb [34mmbsi-p300-dataset[m[m


In [6]:
folder = './mbsi-p300-dataset/*.mat'
files = glob.glob(folder)
test_file = files.pop(0)
files.pop(2)
print(files)

['./mbsi-p300-dataset/P300S03.mat', './mbsi-p300-dataset/P300S01.mat', './mbsi-p300-dataset/P300S05.mat', './mbsi-p300-dataset/P300S07.mat', './mbsi-p300-dataset/P300S06.mat', './mbsi-p300-dataset/P300S08.mat']


In [7]:
from scipy.signal import butter, lfilter, freqz

def butter_lowpass(cutoff, fs, order=5):
    nyq = 0.5 * fs
    normal_cutoff = cutoff / nyq
    b, a = butter(order, normal_cutoff, btype='low', analog=False)
    return b, a

def butter_lowpass_filter(data, cutoff, fs, order=5):
    b, a = butter_lowpass(cutoff, fs, order=order)
    y = lfilter(b, a, data)
    return y


# Filter requirements.
order = 6
fs = 24.0       # sample rate, Hz
cutoff = 1.5  # desired cutoff frequency of the filter, Hz

# Get the filter coefficients so we can check its frequency response.
b, a = butter_lowpass(cutoff, fs, order)

In [8]:
def prepare_data(file):
    raw_data = sio.loadmat(file)
    useful_data = raw_data['data'].copy()
    X = np.array(useful_data['X'][0,0])
    Y = np.array(useful_data['y'][0,0])
    T = np.array(useful_data['trial'][0,0])
    F = np.array(useful_data['flash'][0,0])
    # X_mean = np.mean(X, axis=1)
    X_filtered = butter_lowpass_filter(X, cutoff, fs, order)

    return X_filtered, Y, T, F

In [9]:
def normalized(vec):
    norm_vec = (vec - vec.min(axis=1, keepdims=True))/vec.ptp(axis=1, keepdims=True)
    return norm_vec

In [10]:
def clean_data(X, Y, flash):
  
    X_samples = np.array([np.array(X[i[0]:i[0]+351]) for i in flash])
    column    = [i[2] for i in flash]
    label     = [i[3] - 1 for i in flash]

    LIMIT = 4080 #the last trial is incomplete
    X_selected = np.array(X_samples[:LIMIT])
    col_selected = np.array(column[:LIMIT])
    label_selected = np.array(label[:LIMIT])

    y = np.array(to_categorical(label_selected))

    false_idx = [k for k, i in enumerate(y) if i[0] == 1]
    true_idx  = [k for k, i in enumerate(y) if i[0] == 0]

    falseX = X_selected[false_idx]
    falsey = y[false_idx]

    trueX  = X_selected[true_idx]  
    truey  = y[true_idx]
    # proportional data to avoid greedy cost funtion

    proportionalX = falseX[:int(len(trueX))]
    proportionaly = falsey[:int(len(truey))]

    finalX = np.concatenate((trueX, proportionalX))
    finaly = np.concatenate((truey, proportionaly))

    X_timeseries = np.vstack(finalX)
    X_letters = X_timeseries.reshape(34,40,351,8)
    y_letters = finaly.reshape(34,40,2)
    cleaned_X = np.vstack(X_letters)
    cleaned_Y = np.vstack(y_letters)

    return cleaned_X, cleaned_Y

In [11]:
X, Y, Trials, Flash = prepare_data(test_file)
X_clean, y_clean = clean_data(X, Y, Flash)


  X_samples = np.array([np.array(X[i[0]:i[0]+351]) for i in flash])


In [12]:
import tensorflow as tf

from tensorflow import keras
tf.keras.backend.clear_session()  # For easy reset of notebook state.

from tensorflow.keras import layers

input_layer = keras.Input(shape = (351,8,1), name='main_input')
x     = layers.Conv2D(16, 8, padding='same', activation='relu')(input_layer)
x     = layers.Conv2D(32, 6, padding='same', activation='relu')(x)
x     = layers.Conv2D(8, 4, padding='same', activation='relu')(x)
x     = layers.Conv2D(4, 2, padding='same', activation='relu')(x)
x     = layers.GlobalAveragePooling2D()(x)
x     = layers.Dense(8)(x)
x     = layers.Dense(64)(x)
output = layers.Dense(2, activation='softmax')(x)

model = keras.Model(inputs=input_layer, outputs=output)

model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
main_input (InputLayer)      [(None, 351, 8, 1)]       0         
_________________________________________________________________
conv2d (Conv2D)              (None, 351, 8, 16)        1040      
_________________________________________________________________
conv2d_1 (Conv2D)            (None, 351, 8, 32)        18464     
_________________________________________________________________
conv2d_2 (Conv2D)            (None, 351, 8, 8)         4104      
_________________________________________________________________
conv2d_3 (Conv2D)            (None, 351, 8, 4)         132       
_________________________________________________________________
global_average_pooling2d (Gl (None, 4)                 0         
_________________________________________________________________
dense (Dense)                (None, 8)                 40    

In [13]:
#compiling the model
opt = keras.optimizers.Adam(lr=0.001, beta_1=0.9, beta_2=0.999, amsgrad=True)  # default params
model.compile(optimizer=opt,
              loss='binary_crossentropy',
              metrics=['accuracy'])

In [14]:
def train_net(model):
    appX = []
    appy = []
    init = time.time()
    for file in files:
        X, Y, Trials, Flash = prepare_data(file)
        X_clean, y_clean = clean_data(X, Y, Flash)
        appX.append(X_clean)
        appy.append(y_clean)

    print("exited loops!")
    X = [subject for subject in appX]
    y = [subject for subject in appy]
    X_train, X_valid, y_train, y_valid = train_test_split(np.vstack(X), np.vstack(y), test_size=0.1, random_state=816)
    print("trained!!")
    
    
#     history = model.fit(X_train, y_train, validation_data=(X_valid, y_valid), batch_size=30, epochs=75, verbose=1)
    end = time.time()
    print("time elapsed training is:", (end - init)/60, " minutes")  
#     return history.history['accuracy'], history.history['val_accuracy'], history.history['loss'], history.history['val_loss']

In [19]:
def train_model():
    appX = []
    appy = []
    init = time.time()
    for file in files:
        X, Y, Trials, Flash = prepare_data(file)
        X_clean, y_clean = clean_data(X, Y, Flash)
        appX.append(X_clean)
        appy.append(y_clean)

    print("exited loops!")
    X = [subject for subject in appX]
    y = [subject for subject in appy]
    X_train, X_valid, y_train, y_valid = train_test_split(np.vstack(X), np.vstack(y), test_size=0.1, random_state=816)
    gnb = GaussianNB()
    
    X, Y, Trials, Flash = prepare_data(test_file)
    X_test, Y_test = clean_data(X, Y, Flash)
    y_pred = gnb.fit(X_train, y_train).predict(X_test)
    print(X_test.shape[0], (Y_test != y_pred).sum())
    
    
#     history = model.fit(X_train, y_train, validation_data=(X_valid, y_valid), batch_size=30, epochs=75, verbose=1)
    end = time.time()
    print("time elapsed training is:", (end - init)/60, " minutes")  
#     return history.history['accuracy'], history.history['val_accuracy'], history.history['loss'], history.history['val_loss']

In [20]:
# acc, val_acc, loss, val_loss = train_net(model)
train_model()

  X_samples = np.array([np.array(X[i[0]:i[0]+351]) for i in flash])


exited loops!


ValueError: Found array with dim 3. Estimator expected <= 2.

In [None]:
# Plot test accuracy values
plt.rcParams["figure.figsize"] = (10,7)
plt.plot(acc)
plt.plot(val_acc)
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Valid'], loc='upper left')
plt.show()

In [None]:
# Plot test accuracy values
plt.plot(loss)
plt.plot(val_loss)
plt.title('Model loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Valid'], loc='upper left')
plt.show()

In [None]:
X, Y, Trials, Flash = prepare_data(test_file)
X_clean, y_clean = clean_data(X, Y, Flash)
X_train, X_test, y_train, y_test = train_test_split(X_clean, y_clean, test_size=0.1, random_state=816)
history = model.fit(X_train, y_train, batch_size=1, epochs=20)

In [None]:
# Plot test accuracy values
plt.plot(history.history['accuracy'])
plt.title('Model accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Test'], loc='upper left')
plt.show()