In [1]:
import os
import tensorflow as tf
from sklearn.model_selection import train_test_split
import numpy as np
from sklearn import preprocessing
import scipy.io
from tensorflow.keras import layers
import mne
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint
from tensorflow.keras.models import load_model

2023-04-08 23:27:56.110756: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [None]:
#load data from ../EEG folder, all csv files
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
tf.get_logger().setLevel('ERROR')

def get_data():
    data = []
    scores = []
    order = []
    filedir = '/mnt/Ryans Study/EEG'
    for file in [f for f in os.listdir(filedir) if f.endswith(".fif") and not f.endswith("resting.fif") and not f.endswith("hard.fif")]:
        filepath = filedir + "/" + file
        print(file)
        # get number from file name
        order.append(int(file.split("_")[0]))
        # load raw file
        raw = mne.io.read_raw_fif(filepath, preload=True)
        # get data
        data.append(raw.get_data(picks=['Fp1','Fp2'])[:, 7500:-15000])
        # get scores from file names, 1 = watching, 2 = normal, 3 = hard
        if "watching" in file or "watch" in file:
            scores.append(0)
        elif "normal" in file or "correct" in file:
            scores.append(1)
        elif "hard" in file:
            scores.append(2)

    return data, scores, order


def remove_participant(data, scores, order, participant):
    newdata = []
    newscores = []
    removed_participant = []
    removed_scores = []

    for x, y, z in zip(data, scores, order):
        if z != participant:
            newdata.append(x)
            newscores.append(y)
        else:
            removed_participant.append(x)
            removed_scores.append(y)

    return np.array(newdata), np.array(newscores), np.array(removed_participant), np.array(removed_scores)


def split_timeseries(series, window_size=1000, overlap=100):
    segments = []
    for i in range(0, series.shape[-1] - window_size + 1, window_size - overlap):
        segment = series[..., i:i + window_size]
        # add extra dimension for channel
        x_max = np.max(segment)
        x_avg = np.mean(segment)
        segment = (segment - x_avg) / x_max
        segment = np.expand_dims(segment, axis=-1)
        segments.append(segment)
    return segments


# split data into 1000 sample sliding window with 100 sample overlap
def split_data(data, scores, order, window_size=1000, overlap=100):
    X = []
    Y = []
    neworder = []
    for x, y, z in zip(data, scores, order):
        x = split_timeseries(x, window_size, overlap)
        X.extend(x)
        Y.extend([y] * len(x))
        neworder.extend([z] * len(x))
    return X, Y, neworder


def remove_nan(data, scores, order):
    # if series contains nan, remove it
    newdata = []
    newscores = []
    neworder = []
    for x, y, z in zip(data, scores, order):
        if np.isnan(x).any():
            continue
        else:
            newdata.append(x)
            newscores.append(y)
            neworder.append(z)
    return newdata, newscores, neworder


# def model
def EEGNet_seq(nb_classes, Chans=64, Samples=128,
               dropoutRate=0.5, kernLength=64, F1=8,
               D=2, F2=16, norm_rate=0.25, dropoutType='Dropout',
               learning_rate=3e-3, loss="mse"):
    """Create a Sequential EEGNet model.

      nb_classes      : int, number of classes to classify
      Chans, Samples  : number of channels and time points in the EEG data
      dropoutRate     : dropout fraction
      kernLength      : length of temporal convolution in first layer. We found
                        that setting this to be half the sampling rate worked
                        well in practice. For the SMR dataset in particular
                        since the data was high-passed at 4Hz we used a kernel
                        length of 32.
      F1, F2          : number of temporal filters (F1) and number of pointwise
                        filters (F2) to learn. Default: F1 = 8, F2 = F1 * D.
      D               : number of spatial filters to learn within each temporal
                        convolution. Default: D = 2
      dropoutType     : Either SpatialDropout2D or Dropout, passed as a string.

    :param nb_classes: int, number of classes to classify
    :param Chans: Number of channels in the EEG data
    :param Samples: Number of time poitns in EEG data
    :param dropoutRate: dropout fraction
    :param kernLength: Length of temporal convolution in first layer
    :param F1: Number of temporal features to learn
    :param D: Number of spatial filters to learn within each temporal
    convolution
    :param F2: Number of pointwise filters to learn
    :param norm_rate: Normalisation rate
    :param dropoutType: Dropout method to use
    :param learning_rate: Learning rate of classifier
    :param loss: Loss function of classifier
    :returns: Keras Sequential model

    """
    from tensorflow.keras.models import Model
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense, Activation, Dropout
    from tensorflow.keras.layers import Conv2D, AveragePooling2D
    from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D
    from tensorflow.keras.layers import BatchNormalization
    from tensorflow.keras.layers import SpatialDropout2D
    from tensorflow.keras.layers import Input, Flatten
    from tensorflow.keras.constraints import max_norm
    from tensorflow import keras

    if dropoutType == 'SpatialDropout2D':
        dropoutType = SpatialDropout2D
    elif dropoutType == 'Dropout':
        dropoutType = Dropout
    else:
        raise ValueError('dropoutType must be one of SpatialDropout2D '
                         'or Dropout, passed as a string.')

    model = Sequential([
        # Input(shape = (Chans, Samples, 1)),

        # Block 1
        Conv2D(F1, (1, kernLength), padding='same',
               input_shape=(Chans, Samples, 1),
               use_bias=False),
        BatchNormalization(),
        DepthwiseConv2D((Chans, 1), use_bias=False,
                        depth_multiplier=D,
                        depthwise_constraint=max_norm(1.)),
        BatchNormalization(),
        Activation('elu'),
        AveragePooling2D((1, 4)),
        dropoutType(dropoutRate),

        # Block 2
        SeparableConv2D(F2, (1, 16), use_bias=False, padding='same'),
        BatchNormalization(),
        Activation('elu'),
        AveragePooling2D((1, 8)),
        dropoutType(dropoutRate),

        Flatten(name='flatten'),
        Dense(nb_classes, name='dense',
              kernel_constraint=max_norm(norm_rate)),
        Activation('softmax', name='softmax')
    ])

    optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
    model.compile(loss=loss, optimizer=optimizer, metrics=["accuracy"])
    return model



window_size = 2500
channels = 2


data, scores, order = get_data();
n_participants = len(set(order))
data, scores, order = split_data(data, scores, order, window_size=window_size, overlap=100)
#data, scores, order = remove_nan(data, scores, order)

#scored = scores
# one hot encode scores sklearn
scores = preprocessing.OneHotEncoder().fit_transform(np.array(scores).reshape(-1, 1))
scores = scores.toarray()

# use test train split inc order
train_data, test_data, train_scores, test_scores, train_order, test_order = train_test_split(data, scores, order,test_size=0.2, random_state=42, shuffle=True)
# test data into array
test_data = np.array(test_data)
#test_scores = test_scores.tolist()
history = []

# define the checkpoint path and filename
checkpoint_path = "best_model_CNNLSTM.h5"



# leave one out cross validation
for i in range(n_participants):
    train_datas, train_scoress, val_data, val_scores = remove_participant(train_data, train_scores, train_order, i + 1);
    # model
    # define model checkpoint callback
    checkpoint = ModelCheckpoint(checkpoint_path, monitor='val_accuracy', verbose=1, save_best_only=True, mode='max')

    # define early stopping callback
    early_stop = EarlyStopping(monitor='val_accuracy', patience=50, mode='max')

    models = EEGNet_seq(nb_classes=2, Chans=channels, Samples=window_size, dropoutRate=0.5, kernLength=64, F1=8, D=2, F2=16, norm_rate=0.25, dropoutType='Dropout', learning_rate=3e-3, loss="mse")
    hist = models.fit(train_datas, train_scoress, epochs=500, batch_size=512, validation_data=(val_data, val_scores),callbacks=[early_stop, checkpoint])
    hist = load_model(checkpoint_path)
    history.append(hist);




: 

In [None]:
# evaluate all models on test set and save best model
best_model = None
best_acc = 0
# change test_scores to list instead of array
# set no logging
tf.get_logger().setLevel(3)

for i, model in enumerate(history):
    # evaluate model on test set
    _, acc = model.evaluate(test_data, test_scores, verbose=0)
    print('Model %d: %.3f' % (i + 1, acc))
    # check if best model
    if acc > best_acc:
        best_acc = acc
        best_model = model

# save best model
best_model.save("best_model_CNNBILSTM_binary.h5")

: 

In [None]:
# import plot_model, confusion_matrix, plt, sns
from tensorflow.keras.utils import plot_model
from sklearn.metrics import confusion_matrix
import matplotlib.pyplot as plt
import seaborn as sns

# print best model accuracy
print('Best Model Accuracy: %.3f' % best_acc)


# plot best model
plot_model(best_model, to_file='best_model_CNNBILSTM_binary.png', show_shapes=True, show_layer_names=True)
# confusion matrix
y_pred = best_model.predict(test_data)
y_pred = np.argmax(y_pred, axis=1)
y_true = np.argmax(test_scores, axis=1)
cm = confusion_matrix(y_true, y_pred)
# plot confusion matrix
plt.figure(figsize=(10, 10))
sns.heatmap(cm, annot=True, fmt="d")
plt.title("Confusion matrix")
plt.ylabel('True label')
plt.xlabel('Predicted label')
plt.show()



: 

: 