# Written on 2024.05.23

This notebook performs 10-fold cross-validation of models based on DeepConvNet.

The idea is to use the same training dataset that was used for the previously selected models. The workflow is as follows:

1. Load the training dataset
2. Generate a 10 fold iterator
3. Generate a model per fold
4. Save accuracies and accuracy plots.

After exporting the models in .h5 format, these files will be used to predict on the testing subset and to generate a confusion matrix for each fold (not sure if this is necessary, but I'll do it if I have time).

# **1. Instances and libraries**

## 1.1 Add EEGNet library to the Python module search path

In [2]:
import sys
path = "/content/EEGNET"
# Guard the append so that re-running this cell does not keep adding
# duplicate entries to the module search path.
if path not in sys.path:
    sys.path.append(path)

In [3]:
!pip install pyyaml h5py



## 1.2 Import libraries

In [4]:
# Filesystem
import os
import zipfile

# data processing
import numpy as np
import pandas as pd

# AI-related
import tensorflow as tf
from tensorflow.keras.callbacks import Callback
from imblearn.over_sampling import RandomOverSampler
from sklearn.metrics import confusion_matrix
import itertools
#library for cross validation
from sklearn.model_selection import StratifiedKFold

# Plotting
%matplotlib inline
import matplotlib.pyplot as plt

#EEGNET
from EEGModels import DeepConvNet
from tensorflow.keras import utils as np_utils
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import backend as K


# **2. Functions**

## 2.1 Function to visualize the performance of the model

In [5]:
def visualize_results (model, n_epochs):
  """Plot training/validation accuracy and loss curves side by side.

  Parameters
  ----------
  model : object with a ``history`` mapping
      ``model.history`` must provide the keys "accuracy", "loss",
      "val_accuracy" and "val_loss", each a per-epoch sequence.
  n_epochs : int
      Upper bound on the number of epochs to draw. If fewer epochs were
      actually recorded (e.g. training stopped early), only the recorded
      ones are plotted instead of failing on a length mismatch.
  """
  history = model.history
  train_acc = history["accuracy"]
  train_loss = history["loss"]
  val_acc = history["val_accuracy"]
  val_loss = history["val_loss"]

  # Use the number of epochs that really exist in the history so that the
  # x and y vectors always have the same length.
  n_recorded = min(len(train_acc), len(train_loss), len(val_acc), len(val_loss))
  n_points = min(n_epochs, n_recorded)
  epochs = list(range(n_points))

  fig, ax = plt.subplots(1, 2, figsize=(16, 9))

  ax[0].plot(epochs, train_acc[:n_points], "go-", label = "Training Accuracy")
  ax[0].plot(epochs, val_acc[:n_points], "ro-", label = "Validation Accuracy")
  ax[0].set_title("Training and Validation Accuracy")
  ax[0].legend()
  ax[0].set_xlabel("Epochs")
  ax[0].set_ylabel("Accuracy")

  ax[1].plot(epochs, train_loss[:n_points], "go-", label = "Training Loss")
  ax[1].plot(epochs, val_loss[:n_points], "ro-", label = "Validation Loss")
  ax[1].set_title("Training and Validation Loss")
  ax[1].legend()
  ax[1].set_xlabel("Epochs")
  ax[1].set_ylabel("Loss")

  plt.show()

## 2.2 **Function** to plot Confusion Matrix

In [None]:
def plot_confusion_matrix (cm,
                           classes,
                           normalize = False,
                           title = 'Confusion Matrix',
                           cmap=plt.cm.Greens
                          ):
  """Draw a confusion matrix as an annotated heatmap on the current figure.

  Parameters
  ----------
  cm : numpy.ndarray
      Square confusion matrix (rows: true labels, columns: predictions).
  classes : sequence
      Tick labels, one per class.
  normalize : bool
      If True, every row is divided by its sum before plotting, so both the
      colours and the printed numbers show per-true-class fractions.
  title : str
      Figure title.
  cmap : matplotlib colormap
      Colormap used for the heatmap.
  """
  # Normalise BEFORE drawing so the heatmap and the cell annotations are
  # based on the same values.
  if normalize:
    row_sums = cm.sum(axis=1)[:, np.newaxis]
    # Rows without any true sample stay all-zero instead of becoming NaN.
    cm = np.divide(cm.astype('float'), row_sums,
                   out=np.zeros(cm.shape, dtype='float'),
                   where=row_sums != 0)
    print("Normalized confusion matrix")
  else:
    print("Confusion matrix, without normalization")
  print(cm)

  plt.imshow (cm, interpolation='nearest', cmap=cmap)
  plt.title (title)
  plt.colorbar()
  tick_marks = np.arange(len(classes))
  plt.xticks(tick_marks, classes, rotation=45)
  plt.yticks(tick_marks, classes)

  # Cells darker than this threshold get white text for readability.
  thresh = cm.max()*0.80
  for i, j in itertools.product (range(cm.shape[0]), range(cm.shape[1])):
    plt.text(j, i, round(cm[i,j],2),
             horizontalalignment="center",
             color="white" if cm [i, j] > thresh else "black")

  plt.tight_layout()
  plt.ylabel('True Label')
  plt.xlabel('Predicted Label')

## 2.3 Function to normalize EEG data (run inside the loop to assemble the tensor)

In [6]:
def normalize_channels (data, n_channels):
  """Min-max scale each of the first ``n_channels`` rows of ``data`` to [0, 1].

  Parameters
  ----------
  data : numpy.ndarray
      Array indexed by channel on its first axis.
  n_channels : int
      Number of leading rows to normalise; any further rows stay zero.

  Returns
  -------
  numpy.ndarray
      float32 array with the same shape as ``data``. A channel whose values
      are all identical (max == min) is returned as zeros instead of
      NaN/inf caused by a division by zero.
  """
  data_norm = np.zeros_like(data, dtype='float32')
  for ch in range (n_channels):
      min_val = np.min(data[ch])
      span = np.max(data[ch]) - min_val
      if span == 0:
          # Flat channel: nothing to scale, keep the pre-filled zeros.
          continue
      data_norm [ch] = (data[ch] - min_val) / span

  return data_norm

# **3. Data Load**

## 3.1 Unzip dataset

In [7]:
local_zip = "/content/ERP_Pretest_Data_128Hz.zip"
# The context manager closes the archive even if extraction raises.
with zipfile.ZipFile(local_zip, "r") as zip_ref:
    zip_ref.extractall("/content/ERP_Pretest_Data_128_Hz")

## 3.2 Define file paths

In [8]:
# Folder of the extracted training trials.
# NOTE(review): the loading loop below reads the entries of TrainFileList.txt as-is and
# does not use this variable in the visible cells - confirm whether it is still needed.
train_data_path = "/content/ERP_Pretest_Data_128_Hz/Train/"

## 3.3 Load file lists

In [9]:
# Read the list of training files (one path per line) and close the handle afterwards.
with open("/content/ERP_Pretest_Data_128_Hz/TrainFileList.txt", "r") as trainlist:
    data = trainlist.read()

# Drop blank lines and stray whitespace/carriage returns so that a trailing
# newline in the list file does not create a bogus empty file name (which
# would inflate the trial count and break the CSV loading loop).
train_data_all_files = [line.strip() for line in data.splitlines() if line.strip()]

## 3.4 Determine the number of trials available based on the number of files

In [10]:
# One trial per listed file, so the number of entries is the number of training trials.
n_trials_train = len(train_data_all_files)

## 3.5 Define the parameters for EEG data

In [11]:
# Dimensions of a single trial: channels x samples, plus the size of the
# trailing axis added when the data is reshaped for the network.
n_channels = 20
n_samples = 128
n_kernels = 1

# Channel labels (20 entries, matching n_channels).
ch_names = ['Fz', 'F7', 'F3', 'F4', 'F8',
            'T7', 'C3', 'CZ', 'C4', 'T8',
            'P7', 'P3', 'Pz', 'P4', 'P8',
            'O1', 'Oz', 'O2', 'LM', 'RM']

# Sampling frequency value (presumably Hz, per the "128Hz" dataset name - confirm).
sfreq = 128
#info = mne.create_info(ch_names = ch_names, sfreq = sfreq)

## 3.6 Load training data

This data will be loaded as a 2D array (one flattened trial per row) for augmentation and balancing

In [12]:
# Pre-allocate one row per listed file; each row holds one trial flattened to n_channels*n_samples values.
train_set_2D = np.zeros ((n_trials_train,n_channels*n_samples), dtype='float32')

In [13]:
# Read every trial file and store it, flattened, in its own row of train_set_2D.
for trial, filename in enumerate(train_data_all_files):
  data = pd.read_csv(filename, header=None, dtype=np.float32)
  # Each file must contain exactly n_channels*n_samples values for the reshape to succeed.
  trl = np.reshape (np.array(data), (n_channels*n_samples))
  # Write only the row of this trial. The previous `[trial:]` slice broadcast the
  # trial into every remaining row on each iteration (same final result, O(n^2) writes).
  train_set_2D[trial] = trl

Load labels for training data

In [14]:
# Read the label file (no header row) as unsigned 8-bit integers and
# collapse the transposed table to drop its singleton dimension.
train_labels_path = '/content/ERP_Pretest_Data_128_Hz/TrainLabels.csv'
y_0 = np.squeeze(
    np.array(pd.read_csv(train_labels_path, header=None, dtype='uint8')).T
)

Balance the training set using the Random Oversampler

In [15]:
# Balance the classes by randomly re-sampling the minority class.
# A fixed random_state makes the resampled set (and therefore the folds built
# from it) reproducible; it matches the seed used for StratifiedKFold.
# NOTE(review): oversampling happens before the cross-validation split, so copies
# of the same minority trial can end up in both the training and validation part
# of a fold. Consider resampling inside each fold on the training part only.
oversampler_train = RandomOverSampler(sampling_strategy='minority', random_state=42)
traindata_over, trainlabels_over = oversampler_train.fit_resample(train_set_2D, y_0)
(overTrials_tr, overSamples_tr) = traindata_over.shape

Allocate data in tensor

In [16]:
# Pre-allocate the 3D array (trials, channels, samples) that receives the oversampled trials.
X = np.zeros((overTrials_tr,n_channels,n_samples), dtype=np.float32)

In [17]:
# Turn every flattened, oversampled trial back into a (channels, samples)
# matrix, min-max scale each channel and store the result in X.
for i in range(overTrials_tr):
  trl_rs = traindata_over[i].reshape(n_channels, n_samples)
  # Optional step: per-channel scaling, kept to check whether it helps the model
  X[i] = normalize_channels(trl_rs.astype('float32'), n_channels)

# The labels that go with X are the oversampled ones
y = trainlabels_over
print(y.shape)

(1342,)


# 4 DeepConv Section

## 4.1 Define parameters for architecture

In [18]:
# Model Build
# Model Build
# Number of output classes and dropout rate passed to DeepConvNet.
classes = 2
dropout_rate = 0.2                  # hp.Float  ('dropoutRate',  min_value=0.2, max_value=0.5, sampling="log")

# Compile
# Learning rate handed to the Adam optimizer in the cross-validation loop.
lr = 1e-4                           # hp.Float  ('learning_rate',  min_value=1e-6, max_value=1e-2, sampling="log")

# Mini-batch size and number of training epochs used for every fold.
batch = 16
epoch = 300

## 4.2 Generate model architecture

In [19]:
# Build the DeepConvNet architecture (from the EEGModels module) for the
# configured number of classes, channels and samples per trial.
# This single model object is reused for every cross-validation fold.
DeepConvNet_Model = DeepConvNet(nb_classes = classes,
                       Chans = n_channels,
                       Samples = n_samples,
                       dropoutRate = dropout_rate)

# Print the layer-by-layer overview of the network.
DeepConvNet_Model.summary()

Model: "model"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 input_1 (InputLayer)        [(None, 20, 128, 1)]      0         
                                                                 
 conv2d (Conv2D)             (None, 20, 124, 25)       150       
                                                                 
 conv2d_1 (Conv2D)           (None, 1, 124, 25)        12525     
                                                                 
 batch_normalization (Batch  (None, 1, 124, 25)        100       
 Normalization)                                                  
                                                                 
 activation (Activation)     (None, 1, 124, 25)        0         
                                                                 
 max_pooling2d (MaxPooling2  (None, 1, 62, 25)         0         
 D)                                                          

# 5. Cross-validation settings

In [22]:
import keras
from keras import optimizers

# Define number of folds
n_splits = 10

# Initialize StratifiedKFold (seeded, so the fold assignment is reproducible)
skf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)

# Lists to store evaluation results
accuracies = []

# Make sure the export folder exists before any training starts; otherwise the
# save at the end of the first fold fails after a full training run.
models_dir = '/content/Models'
os.makedirs(models_dir, exist_ok=True)

# NOTE(review): X/y were balanced with RandomOverSampler before this split, so
# duplicated minority trials can appear in both the training and the validation
# part of a fold; the per-fold accuracies are therefore likely optimistic.

# Perform n-fold cross-validation
for fold, (train_index, val_index) in enumerate(skf.split(X, y)):

    X_train, X_val = X[train_index], X[val_index]
    y_train, y_val = y[train_index], y[val_index]

    # Add the trailing axis expected by the network input:
    # (trials, channels, samples, n_kernels)
    X_train = X_train.reshape(X_train.shape[0], n_channels, n_samples, n_kernels)
    X_val  = X_val.reshape(X_val.shape[0], n_channels, n_samples, n_kernels)

    # One-hot encode the labels. num_classes is given explicitly so the width
    # of the encoding never depends on which labels happen to be in a subset.
    y_train = np_utils.to_categorical(y_train, num_classes=classes)
    y_val = np_utils.to_categorical(y_val, num_classes=classes)

    # Fresh optimizer per fold so no optimizer state carries over
    opt = keras.optimizers.Adam(learning_rate = lr)

    # Checkpointer: keeps only the best model seen during this fold
    checkpoint_filename = '/tmp/'+'checkpoint' + str (fold) + '.h5'
    checkpointer = ModelCheckpoint(filepath=checkpoint_filename,
                                   verbose=1,
                                   save_best_only=True)

    DeepConvNet_Model.compile(loss='binary_crossentropy',
                         optimizer=opt,
                         metrics = ['accuracy'])

    # Reload the same external weights at the start of every fold so that all
    # folds start from an identical state.
    # NOTE(review): this file is not created in the visible cells - confirm it
    # exists on a fresh runtime.
    DeepConvNet_Model.load_weights('/content/Weights/DeepConvNet_Model.h5')

    fittedModel = DeepConvNet_Model.fit(X_train,
                                   y_train,
                                   batch_size = batch,
                                   epochs = epoch,
                                   verbose = 2,
                                   validation_data=(X_val, y_val),
                                   callbacks=[checkpointer])

    # load optimal weights (best checkpoint of this fold)
    DeepConvNet_Model.load_weights(checkpoint_filename)

    # Evaluate the model on the validation set
    _, accuracy = DeepConvNet_Model.evaluate(X_val, y_val)
    accuracies.append(accuracy)

    # plot output
    print ('Fold ' + str (fold))
    visualize_results (fittedModel, epoch)

    # Save model output
    model_filepath = models_dir + '/DeepConvNet_model_fold_' + str(fold) + '.h5'
    DeepConvNet_Model.save(model_filepath)


Output hidden; open in https://colab.research.google.com to view.

In [23]:
# Mean of the per-fold validation accuracies collected in the cross-validation loop.
print("Average accuracy:", np.mean(accuracies))

Average accuracy: 0.9709452748298645


In [24]:
# Export the per-fold accuracies (one row per fold, row index = fold number).
outputfile = '/content/Models/DeepConvNet_Accuracies.csv'
# Create the target folder if needed so the export does not fail on a fresh runtime.
os.makedirs(os.path.dirname(outputfile), exist_ok=True)
df = pd.DataFrame({'Accuracy': accuracies})
df.to_csv(outputfile)