In [7]:
# Import all relevant libraries
import warnings
def fxn():
    """Emit a single DeprecationWarning whose message is "deprecated"."""
    warnings.warn(message="deprecated", category=DeprecationWarning)

# Call fxn() with every warning ignored for the duration of this block.
# NOTE(review): catch_warnings() restores the previous filters when the block
# exits, so this does not silence warnings raised by the imports that follow.
with warnings.catch_warnings():
    warnings.simplefilter(action="ignore")
    fxn()

# Keras imports
import keras
from keras.models import Sequential
from keras.layers import Permute, Flatten, Softmax, Dense, Conv1D, Conv2D, Conv2DTranspose, AveragePooling2D, Activation, Reshape, Dropout

# Other
import numpy as np
import h5py
import sklearn
from sklearn.model_selection import StratifiedKFold

In [5]:
# Load data from specific trial
def get_trial(trial_num):
    """Load the samples and class labels stored in one recording file.

    Reads '../data/A0<trial_num>T_slice.mat' (opened read-only via h5py) and
    copies its 'image' and 'type' datasets into memory. The file is closed
    before returning, so the returned arrays do not depend on an open handle.

    Parameters
    ----------
    trial_num : int
        Number spliced into the file name after 'A0'.

    Returns
    -------
    X : np.ndarray
        The 'image' data with NaNs replaced by 0 and only the first 22 entries
        of axis 1 kept (the remaining entries are the EOG channels).
    y : np.ndarray of int32
        One label per sample in X, shifted down by 769.
    """
    path = '../data/A0' + str(trial_num) + 'T_slice.mat'

    # Use a context manager so the HDF5 handle is released even if a read fails.
    with h5py.File(path, 'r') as trial:
        X = np.copy(trial['image'])
        y = np.copy(trial['type'])

    # 'type' is stored 2-D; keep row 0 and only as many labels as there are samples.
    y = y[0, 0:X.shape[0]:1]
    y = np.asarray(y, dtype=np.int32)
    y -= 769                            # shift class labels to [0-3]
    X = np.nan_to_num(X)[:, :22, :]     # remove EOG channels
    return X, y

def get_all_trials(trial_nums=range(1, 9)):
    """Load several recording files and stack them along the sample axis.

    Each file is read exactly once through get_trial; the per-file sample
    arrays and label arrays are then concatenated along axis 0.

    Parameters
    ----------
    trial_nums : iterable of int, optional
        File numbers passed to get_trial, in the order they should be stacked.
        Defaults to 1 through 8.

    Returns
    -------
    X_total : np.ndarray
        All samples, concatenated along axis 0.
    y_total : np.ndarray
        The matching labels, in the same order as X_total.

    Raises
    ------
    ValueError
        If trial_nums is empty (np.concatenate needs at least one array).
    """
    # Load each file a single time and keep both halves of the result.
    trials = [get_trial(trial_num) for trial_num in trial_nums]
    X_total = np.concatenate([X for X, _ in trials], axis=0)
    y_total = np.concatenate([y for _, y in trials], axis=0)
    return X_total, y_total

def stratified_train_test_split(X, y, k):
    ''' Returns stratified train/test splits, for k number of folds.

    X and y are handed to StratifiedKFold(n_splits=k).split unchanged.

    Return value is a list of the form
    [(train indices, test indices), ... for k folds ].
    The folds are materialised into a list (rather than returned as the one-shot
    generator that split() produces) so the result supports len() and can be
    iterated more than once, e.g. when a training cell is re-run.
    '''
    skf = StratifiedKFold(n_splits=k)
    return list(skf.split(X, y))

In [8]:
# Get the data from all the people and generate train/test split
# Pool the samples (X) and integer labels (y) from every file.
X, y = get_all_trials()

# Fold assignment is driven by the integer labels, using 5 folds.
tt_splits = stratified_train_test_split(X=X, y=y, k=5)

# One-hot version of the labels (4 classes) for the categorical cross-entropy loss.
y_cat = keras.utils.to_categorical(y, num_classes=4)

# Each file loaded by get_trial (a "trial" in this notebook's naming) has shape (288, 22, 1000)
#   There are 288 samples per file (12 of each class per "run", 4 classes, 6 "runs"
#                                   at different time periods of the day)
#   There are 22 electrodes from the EEG (represents spatial aspect of the signals)
#   There are 1000 time units (4 seconds of data, sampled at 250Hz). The first 250 units
#                                   are when no movement occurs (but the cue is heard) and
#                                   the next 750 units are when the movement occurs
# The labels for each trial belong in one of 4 classes
#   0 - left
#   1 - right
#   2 - foot
#   3 - tongue

In [9]:
# Report the shape of the pooled samples, then of the pooled labels.
for arr in (X, y):
    print(arr.shape)

(2304, 22, 1000)
(2304,)


In [10]:
# Create CNN model

def make_CNN():
    """Build and compile the convolutional classifier.

    Each input sample has shape (22, 1000), i.e. (spatial, temporal). The
    network is a Keras Sequential model: reshape -> two Conv2D layers ->
    average pooling -> three Dense layers (with dropout between them) ending
    in a 4-way softmax. The model's output shape is printed after each stage.

    Returns
    -------
    The Sequential model, compiled with categorical cross-entropy loss, the
    'SGD' optimizer and the accuracy metric.
    """
    net = Sequential()

    def _stack(*layers):
        # Append the given layers in order, then report the shape reached so far.
        for layer in layers:
            net.add(layer)
        print(net.output_shape)

    # Temporal convolution: add a trailing axis of size 1, then slide a
    # 1x25 kernel along the time axis.
    _stack(
        Reshape((22, 1000, 1), input_shape=(22, 1000)),
        Conv2D(filters=40, kernel_size=(1, 25), activation='elu', strides=1),
    )

    # Spatial convolution.
    # NOTE(review): this layer alone is declared channels_first while the
    # layers around it use the default data format; the shapes recorded in
    # this notebook's output are (None, 22, 976, 40) before it and
    # (None, 40, 955, 1) after it. Confirm the mixed formats are intended.
    _stack(
        Conv2D(filters=40, kernel_size=(22, 40), activation='elu', data_format="channels_first"),
    )

    # Mean pool: window of 75 with stride 15 along the second spatial axis.
    _stack(AveragePooling2D(pool_size=(1,75), strides=(1,15)))

    # Dense layers: flatten, two ReLU layers with 50% dropout, softmax head.
    _stack(
        Flatten(),
        Dense(units=400, activation='relu'),
        Dropout(0.5),
        Dense(units=200, activation='relu'),
        Dropout(0.5),
        Dense(units=4, activation='softmax'),
    )

    net.compile(loss='categorical_crossentropy',
                optimizer='SGD',
                metrics=['accuracy'])

    return net

In [None]:
# Train a fresh model on each fold and report the mean held-out accuracy.
batch_size = 128
max_folds = 1   # quick-run setting: stop after this many folds; set to None to run every fold

fold_accs = []
for fold_num, (train_idx, test_idx) in enumerate(tt_splits):
    X_train = X[train_idx]
    y_train = y_cat[train_idx]
    X_test = X[test_idx]
    y_test = y_cat[test_idx]

    # A new model per fold so no weights carry over between folds.
    model = make_CNN()

    model.fit(X_train, y_train, epochs=1, batch_size=batch_size)

    # The model is compiled with metrics=['accuracy'], so evaluate() returns
    # [loss, accuracy]; index 1 is the accuracy, index 0 is the loss.
    metrics = model.evaluate(X_test, y_test, batch_size=batch_size)
    fold_accs.append(metrics[1])
    print(metrics)

    if max_folds is not None and fold_num + 1 >= max_folds:
        break

# Average only over the folds that actually ran. tt_splits may be a one-shot
# generator, so its length cannot be used and a re-run can find it exhausted.
if not fold_accs:
    raise RuntimeError("No folds were evaluated; tt_splits is empty or already consumed. "
                       "Re-run the cell that creates tt_splits.")
avg_acc = sum(fold_accs) / len(fold_accs)
print(avg_acc)

(None, 22, 976, 40)
(None, 40, 955, 1)
(None, 40, 59, 1)
(None, 4)
Epoch 1/1
 384/1840 [=====>........................] - ETA: 5:51 - loss: 1.9664 - acc: 0.2500