In [6]:
from EEGModels import EEGNet
import scipy
import sys
import os
import numpy as np
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint
from sklearn.metrics import balanced_accuracy_score


In [7]:
folder_path_dance = '../condition_data/dance/raw/inst/'
folder_path_walk = '../condition_data/walk/raw/inst/'

file_list_dance = [os.path.join(folder_path_dance, file) for file in os.listdir(folder_path_dance)]
file_list_walk = [os.path.join(folder_path_walk, file) for file in os.listdir(folder_path_walk)]

file_list_dance.sort()
file_list_walk.sort()

# create a dictionary that maps indices to file names
file_dict_dance = {index: file_name.split("/")[-1] for index, file_name in enumerate(file_list_dance)}
file_dict_walk = {index: file_name.split("/")[-1] for index, file_name in enumerate(file_list_walk)}

print(file_dict_dance)
print(file_dict_walk)

X_dance = []
Y_dance = []
X_walk = []
Y_walk = []

for file in file_list_dance:
    dataset = scipy.io.loadmat(file)
    x = dataset['data']
    y = dataset['labels']

    y = np_utils.to_categorical(y-1, num_classes=6)
    print(x.shape, y.shape)

    X_dance.append(x)
    Y_dance.append(y)

for file in file_list_walk:
    dataset = scipy.io.loadmat(file)
    x = dataset['data']
    y = dataset['labels']

    y = np_utils.to_categorical(y-1, num_classes=6)
    print(x.shape, y.shape)

    X_walk.append(x)
    Y_walk.append(y)


{0: 'P_1.mat', 1: 'P_10.mat', 2: 'P_2.mat', 3: 'P_3.mat', 4: 'P_4.mat', 5: 'P_5.mat', 6: 'P_9.mat', 7: 'S_1.mat', 8: 'S_10.mat', 9: 'S_2.mat', 10: 'S_3.mat', 11: 'S_4.mat', 12: 'S_5.mat', 13: 'S_9.mat'}
{0: 'P_1.mat', 1: 'P_10.mat', 2: 'P_2.mat', 3: 'P_3.mat', 4: 'P_4.mat', 5: 'P_5.mat', 6: 'P_9.mat', 7: 'S_1.mat', 8: 'S_10.mat', 9: 'S_2.mat', 10: 'S_3.mat', 11: 'S_4.mat', 12: 'S_5.mat', 13: 'S_9.mat'}
(79, 14, 750) (79, 6)
(56, 14, 750) (56, 6)
(78, 14, 750) (78, 6)
(80, 14, 750) (80, 6)
(116, 14, 750) (116, 6)
(85, 14, 750) (85, 6)
(89, 14, 750) (89, 6)
(104, 14, 750) (104, 6)
(77, 14, 750) (77, 6)
(89, 14, 750) (89, 6)
(119, 14, 750) (119, 6)
(95, 14, 750) (95, 6)
(100, 14, 750) (100, 6)
(96, 14, 750) (96, 6)
(84, 14, 750) (84, 6)
(55, 14, 750) (55, 6)
(74, 14, 750) (74, 6)
(84, 14, 750) (84, 6)
(86, 14, 750) (86, 6)
(87, 14, 750) (87, 6)
(90, 14, 750) (90, 6)
(90, 14, 750) (90, 6)
(98, 14, 750) (98, 6)
(90, 14, 750) (90, 6)
(120, 14, 750) (120, 6)
(118, 14, 750) (118, 6)
(97, 14, 7

In [8]:
f = open('../condition_results/data_sizes.txt', 'a')
f.write('P_and_S_Dance_Inst: ' + str(sum(x.shape[0] for x in X_dance)) + '\n')
f.write('P_and_S_Walk_Inst: ' + str(sum(x.shape[0] for x in X_walk)) + '\n')
f.close()

In [9]:
kernels, chans, samples = 1, 14, 750

In [10]:
f = open('../condition_results/P_and_S_inst.txt', 'a')
f.write('---------------------------------DANCE----------------------------------\n')
f.close()

dance_accuracies = []
dance_accuracies_p = []
dance_accuracies_s = []

for index_1 in range(0,7):
    for index_2 in range(7,14):

        # build training set from allfiles expect the ones with index_1 and index_2
        X_train = np.concatenate([X_dance[i] for i in range(len(X_dance)) if i != index_1 and i != index_2])
        Y_train = np.concatenate([Y_dance[i] for i in range(len(Y_dance)) if i != index_1 and i != index_2])

        # build test set from the files with index_1 and index_2
        X_test = [X_dance[i] for i in range(len(X_dance)) if i == index_1 or i == index_2]
        Y_test = [Y_dance[i] for i in range(len(Y_dance)) if i == index_1 or i == index_2]

        # convert data to NHWC (trials, channels, samples, kernels) format.
        X_train = X_train.reshape(X_train.shape[0], chans, samples, kernels)

        model = EEGNet(nb_classes=6, Chans=chans, Samples=samples, dropoutRate=0.5, 
                       kernLength=32, F1=8, D=2, F2=16, dropoutType='Dropout')

        # compile the model and set the optimizers
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics = ['accuracy'])

        # count number of parameters in the model
        numParams    = model.count_params() 

        # set a valid path for the system to record model checkpoints
        # checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', verbose=1, save_best_only=True)

        class_weights = {0: 1., 1: 1., 2: 1., 3: 1., 4: 1., 5: 1.}

        fitted_model = model.fit(X_train, Y_train, batch_size = 32, epochs = 50, verbose = 2, 
                                class_weight=class_weights)
        
        # make prediction on the 2 test files individually
        X_test_1 = X_test[0].reshape(X_test[0].shape[0], chans, samples, kernels)
        X_test_2 = X_test[1].reshape(X_test[1].shape[0], chans, samples, kernels)

        probs_1 = model.predict(X_test_1)
        probs_2 = model.predict(X_test_2)
        preds_1 = probs_1.argmax(axis = -1)
        preds_2 = probs_2.argmax(axis = -1)
        acc_1 = balanced_accuracy_score(Y_test[0].argmax(axis=-1), preds_1)
        acc_2 = balanced_accuracy_score(Y_test[1].argmax(axis=-1), preds_2)

        # make prediction on the 2 test files together
        X_test = np.concatenate(X_test)
        X_test = X_test.reshape(X_test.shape[0], chans, samples, kernels)
        Y_test = np.concatenate(Y_test)

        probs       = model.predict(X_test)
        preds       = probs.argmax(axis = -1)
        acc         = balanced_accuracy_score(Y_test.argmax(axis=-1), preds)

        dance_accuracies_p.append(acc_1)
        dance_accuracies_s.append(acc_2)
        dance_accuracies.append(acc)

        f = open('../condition_results/P_and_S_inst.txt', 'a')
        f.write("--------SELECTED FILES: '%s' and '%s'--------\n" % (file_dict_dance[index_1], file_dict_dance[index_2]))
        f.write("Classification accuracy for P: %f \n" % (acc_1))
        f.write("Classification accuracy for S: %f \n" % (acc_2))
        f.write("Classification accuracy for P and S: %f \n" % (acc))
        f.write("\n")

        f.close()


In [11]:
f = open('../condition_results/P_and_S_inst.txt', 'a')
f.write('\n\n---------------------------------WALK----------------------------------\n')
f.close()

walk_accuracies = []
walk_accuracies_p = []
walk_accuracies_s = []

for index_1 in range(0,7):
    for index_2 in range(7,14):

        # build training set from allfiles expect the ones with index_1 and index_2
        X_train = np.concatenate([X_walk[i] for i in range(len(X_walk)) if i != index_1 and i != index_2])
        Y_train = np.concatenate([Y_walk[i] for i in range(len(Y_walk)) if i != index_1 and i != index_2])

        # build test set from the files with index_1 and index_2
        X_test = [X_walk[i] for i in range(len(X_walk)) if i == index_1 or i == index_2]
        Y_test = [Y_walk[i] for i in range(len(Y_walk)) if i == index_1 or i == index_2]

        # convert data to NHWC (trials, channels, samples, kernels) format.
        X_train = X_train.reshape(X_train.shape[0], chans, samples, kernels)

        model = EEGNet(nb_classes=6, Chans=chans, Samples=samples, dropoutRate=0.5, 
                       kernLength=32, F1=8, D=2, F2=16, dropoutType='Dropout')

        # compile the model and set the optimizers
        model.compile(loss='categorical_crossentropy', optimizer='adam', metrics = ['accuracy'])

        # count number of parameters in the model
        numParams    = model.count_params() 

        # set a valid path for the system to record model checkpoints
        # checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.h5', verbose=1, save_best_only=True)

        class_weights = {0: 1., 1: 1., 2: 1., 3: 1., 4: 1., 5: 1.}

        fitted_model = model.fit(X_train, Y_train, batch_size = 32, epochs = 50, verbose = 2, 
                                class_weight=class_weights)
        
        # make prediction on the 2 test files individually
        X_test_1 = X_test[0].reshape(X_test[0].shape[0], chans, samples, kernels)
        X_test_2 = X_test[1].reshape(X_test[1].shape[0], chans, samples, kernels)

        probs_1 = model.predict(X_test_1)
        probs_2 = model.predict(X_test_2)
        preds_1 = probs_1.argmax(axis = -1)
        preds_2 = probs_2.argmax(axis = -1)
        acc_1 = balanced_accuracy_score(Y_test[0].argmax(axis=-1), preds_1)
        acc_2 = balanced_accuracy_score(Y_test[1].argmax(axis=-1), preds_2)

        # make prediction on the 2 test files together
        X_test = np.concatenate(X_test)
        X_test = X_test.reshape(X_test.shape[0], chans, samples, kernels)
        Y_test = np.concatenate(Y_test)

        probs       = model.predict(X_test)
        preds       = probs.argmax(axis = -1)
        acc         = balanced_accuracy_score(Y_test.argmax(axis=-1), preds)

        walk_accuracies_p.append(acc_1)
        walk_accuracies_s.append(acc_2)
        walk_accuracies.append(acc)

        f = open('../condition_results/P_and_S_inst.txt', 'a')
        f.write("--------SELECTED FILES: '%s' and '%s'--------\n" % (file_dict_walk[index_1], file_dict_walk[index_2]))
        f.write("Classification accuracy for P: %f \n" % (acc_1))
        f.write("Classification accuracy for S: %f \n" % (acc_2))
        f.write("Classification accuracy for P and S: %f \n" % (acc))
        f.write("\n")

        f.close()


In [12]:
# calculate the mean and std of the accuracies
p_acc_dance = np.array(dance_accuracies_p)
s_acc_dance = np.array(dance_accuracies_s)
acc_dance = np.array(dance_accuracies)

s_acc_walk = np.array(walk_accuracies_s)
p_acc_walk = np.array(walk_accuracies_p)
acc_walk = np.array(walk_accuracies)

f = open("../condition_results/P_and_S_inst.txt", "a")
f.write("\n\nMean overall accuracy dance: %f \n" % (np.mean(acc_dance)))
f.write("Std overall accuracy dance: %f \n" % (np.std(acc_dance)))
f.write("Mean overall accuracy walk: %f \n" % (np.mean(acc_walk)))
f.write("Std overall accuracy walk: %f \n" % (np.std(acc_walk)))
f.write("\n\nMean P accuracy dance: %f \n" % (np.mean(p_acc_dance)))
f.write("Std P accuracy dance: %f \n" % (np.std(p_acc_dance)))
f.write("Mean P accuracy walk: %f \n" % (np.mean(p_acc_walk)))
f.write("Std P accuracy walk: %f \n" % (np.std(p_acc_walk)))
f.write("\n\nMean S accuracy dance: %f \n" % (np.mean(s_acc_dance)))
f.write("Std S accuracy dance: %f \n" % (np.std(s_acc_dance)))
f.write("Mean S accuracy walk: %f \n" % (np.mean(s_acc_walk)))
f.write("Std S accuracy walk: %f \n" % (np.std(s_acc_walk)))
f.close()