# Step 1: Importing Libraries and Loading Data

In [None]:
## In this section, we import the necessary libraries and load the EEG data of four different patients (0013, 0025, 0053, and 0090).
## The data is then stored in variables POS_x and NEG_x, where x represents the patient number. 
## The POS_x variable contains the IED-positive epochs, and the NEG_x variable contains the IED-negative epochs.

In [None]:
import copy
import scipy.io
import mat73
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt 
import tensorflow as tf
from tensorflow.keras import datasets, layers, models
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import ModelCheckpoint, ReduceLROnPlateau, EarlyStopping
from tensorflow.keras import metrics
from tensorflow import keras
from sklearn.utils import shuffle
import collections
from sklearn import preprocessing
from sklearn.model_selection import train_test_split 
import pickle
from sklearn.preprocessing import MinMaxScaler
from scikeras.wrappers import KerasClassifier
from sklearn.model_selection import GridSearchCV
import sklearn.metrics
from sklearn.metrics import accuracy_score
from sklearn.decomposition import PCA
from scipy.stats import kurtosis
from sklearn.metrics import confusion_matrix

In [None]:
# Checks if GPUs are available
print("Num GPUs Available: ", len(tf.config.list_physical_devices('GPU')))

# Prints the TensorFlow version
tf.__version__

In [3]:
# Loads the EEG data of four different patients (0013, 0025, 0053, and 0090)

path = '/export03/data/Yuxin/pt_data/' # Defines the paths to the data

POS_0013 = mat73.loadmat(path + 'pt0013/POS.mat')['POS']
NEG_0013 = mat73.loadmat(path + 'pt0013/NEG.mat')['NEG']

POS_0025 = mat73.loadmat(path + 'pt0025/POS.mat')['POS']
NEG_0025 = mat73.loadmat(path + 'pt0025/NEG.mat')['NEG']

POS_0053 = mat73.loadmat(path + 'pt0053/POS.mat')['POS']
NEG_0053 = mat73.loadmat(path + 'pt0053/NEG.mat')['NEG']

POS_0090 = mat73.loadmat(path + 'pt0090/POS.mat')['POS']
NEG_0090 = mat73.loadmat(path + 'pt0090/NEG.mat')['NEG']

# Step 2: EEG Data Preprocessing for Neural Network Training

In [None]:
## In this section, we process the EEG data using *PCs ordering method* to prepare it for neural network training. 
## The data is transposed to ensure it has the correct shape for training, and then epoching is applied to split the data into 1-second epochs. 
## Next, the data is scaled to a range between 0 and 1 using the MinMaxScaler method. 

In [None]:
## By Raymundo Cassani, used with permission from: https://github.com/MuSAELab/amplitude-modulation-analysis-module/blob/master/am_analysis/am_analysis.py

def epoching(data, samples_epoch, samples_overlap = 0):
    """Divide an array in a colletion of smaller arrays
    
    Divides the `data` provided as [n_samples, n_channels] using the 
    `size_epoch` indicated (in samples) and the `overlap_epoch` between 
    consecutive epochs.
   
    Parameters
    ----------
    data : 2D array 
        with shape (n_samples, n_channels)
    samples_epochs : 
        number of samples in smaller epochs
        
    samples_overlap : 
        number of samples for ovelap between epochs (Default 0)
    Returns
    -------
    epochs : 3D array 
        with shape (samples_epoch, n_channels, n_epochs)
    
    remainder : 2D array 
        with the remaining data after last complete epoch
    
    ix_center : 1D array
        indicates the index tha corresponds to the center of the nth epoch.
    """ 
    # input 'data' as 2D matrix [samples, columns]
    try:
        data.shape[1]
    except IndexError:
        data = data[:, np.newaxis]
    
    # number of samples and number of channels
    n_samples, n_channels = data.shape

    # Size of half epoch
    half_epoch = np.ceil(samples_epoch / 2 )

    # Epoch shift   
    samples_shift = samples_epoch - samples_overlap

    # Number of epochs
    n_epochs =  int(np.floor( (n_samples - samples_epoch) / float(samples_shift) ) + 1 )
    if n_epochs == 0:
        return np.array([]), data, np.array([])

    #markers indicates where the epoch starts, and the epoch contains samples_epoch rows
    markers = np.asarray(range(0,n_epochs)) * samples_shift
    markers = markers.astype(int)

    #Divide data in epochs
    epochs = np.zeros((samples_epoch, n_channels, n_epochs))
    ix_center = np.zeros((n_epochs,1))

    for i_epoch in range(0,n_epochs):
        epochs[:,:,i_epoch] = data[ markers[i_epoch] : markers[i_epoch] + samples_epoch ,:]
        ix_center[i_epoch] = markers[i_epoch] -1 + half_epoch
        
    if ( (markers[-1] + samples_epoch) < n_samples): 
        remainder = data[markers[-1] + samples_epoch : n_samples, :]
    else:
        remainder = np.asarray([])
    
    return epochs, remainder, ix_center.astype(int)

In [None]:
## This function scales each channel of the input data to a range of [0,1]
## Input: 3D numpy array (channels, timepoints, trials)
## Output: Scaled 3D numpy array with values in the range of [0,1]

def scale_data(data):
  counter = 0
  return_copy = copy.deepcopy(data)
  
  # Loop over each channel
  for item in data:
    # Reshape to 2D array
    one_column = np.reshape(item, (-1, 1))
    
    # Scale data to range [0,1]
    scaler = MinMaxScaler()
    scaler.fit(one_column)
    one_column = scaler.transform(one_column)
    
    # Reshape back to 3D array
    transformed = np.reshape(one_column, (data.shape[1], data.shape[2]))
    return_copy[counter] = transformed
    counter=counter+1
  
  return return_copy

In [None]:
## This function processes raw EEG data with the *PCs ordering method* to prepare for neural network training.

def process_raw_data(Data, EEG_start, EEG_end):

    # The function takes raw data as input, and the start and end indices of EEG channels

    # `EEG_start``: the starting index of EEG channels in the data
    # `EEG_end``: the ending index of EEG channels in the data

    # Make a deep copy of the input data to avoid overwriting the original data
    deep_copy = copy.deepcopy(Data)

    # Transpose the deep copied data
    transposed = np.transpose(deep_copy)

    # Slice the transposed data to extract EEG data
    EEG = transposed[:,EEG_start-1:EEG_end]

    # PCs ordering method ----------------------------------
    # Perform PCA on the EEG data and select the first 20 principal components
    n_components = 20
    pca_EEG = PCA(n_components)
    pca_EEG.fit(EEG)
    EEG_transformed = pca_EEG.transform(EEG)
    
    # Sort the first 5 PCs by kurtosis
    first_5_PCs = EEG_transformed[:, :5] # Extract the first 5 PCs from the transformed EEG data
    kurtosis_values = kurtosis(first_5_PCs, axis=0) # Calculate kurtosis for each of the first 5 PCs
    sorted_indices = np.argsort(kurtosis_values) # Sort the indices of the first 5 PCs based on their kurtosis values
    sorted_first_5_PCs = first_5_PCs[:, sorted_indices] # Order the first 5 PCs based on the sorted indices

    # Sort the first 2 PCs from the sorted_first_5_PCs by variance
    first_2_sorted_PCs = sorted_first_5_PCs[:, :2] # Extract the first 2 PCs from the sorted_first_5_PCs
    variances = np.var(first_2_sorted_PCs, axis=0) # Calculate variance for each of the first 2 sorted PCs
    sorted_indices = np.argsort(variances)[::-1] # Sort the indices of the first 2 sorted PCs based on their variance values
    sorted_first_2_PCs = first_2_sorted_PCs[:, sorted_indices] # Order the first 2 sorted PCs based on the sorted indices

    # Replace the first 5 PCs with the sorted PCs in the transformed EEG data
    EEG_transformed[:, :5] = sorted_first_5_PCs
    EEG_transformed[:, :2] = sorted_first_2_PCs
    # -------------------------------------------------------

    # Epoching the data into 1 second intervals
    EEG = epoching(EEG_transformed, 256, 0)[0]

    # Transpose the data to match CNN input shape
    EEG = EEG.transpose(2,0,1)

    # Scale the data to range between 0 and 1 with MinMaxScaler
    EEG = scale_data(EEG)

    return EEG

In [None]:
# For each of the four participants, we use the `process_raw_data` function to extract the EEG data from the appropriate channel range, 
# epoch the data into 1-second epochs, transpose the data, and scale it to the range between 0 and 1 using the scale_data function. 
# The resulting EEG data for each participant are stored in POS_EEG and NEG_EEG variables.

POS_EEG_0013 = process_raw_data(POS_0013, 301, 321)
NEG_EEG_0013 = process_raw_data(NEG_0013, 301, 321)

POS_EEG_0025 = process_raw_data(POS_0025, 299, 321)
NEG_EEG_0025 = process_raw_data(NEG_0025, 299, 321)

POS_EEG_0053 = process_raw_data(POS_0053, 299, 327)
NEG_EEG_0053 = process_raw_data(NEG_0053, 299, 327)

POS_EEG_0090 = process_raw_data(POS_0090, 302, 357)
NEG_EEG_0090 = process_raw_data(NEG_0090, 302, 357)


# Step 3: Input Data Visualisation

In [None]:
# Here, we plot the averaged EEG IEDs (interictal epileptiform discharges) for patient 0013. 
# The first plot shows the averaged EEG IEDs for positive epochs (EEG IED_positive), 
# and the second plot shows the averaged EEG IEDs for negative epochs (EEG IED_negative). 
# The same plots are generated for the other three patients.

fig_0013, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 5), dpi=150)

ax1.plot(np.mean(POS_EEG_0013, axis=0))
ax1.invert_yaxis()
ax1.set_title('Averaged EEG_IED_pos Epochs')
ax1.set_xlabel('Sample number')
ax1.set_ylabel('Amplitude')

ax2.plot(np.mean(NEG_EEG_0013, axis=0))
ax2.invert_yaxis()
ax2.set_title('Averaged EEG_IED_neg Epochs')
ax2.set_xlabel('Sample number')
ax2.set_ylabel('Amplitude')

fig_0013.suptitle('Patient 0013', fontsize=22)
fig_0013.tight_layout()

In [None]:
fig_0025, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 5), dpi=150)

ax1.plot(np.mean(POS_EEG_0025, axis=0))
ax1.invert_yaxis()
ax1.set_title('Averaged EEG_IED_pos Epochs')
ax1.set_xlabel('Sample number')
ax1.set_ylabel('Amplitude')

ax2.plot(np.mean(NEG_EEG_0025, axis=0))
ax2.invert_yaxis()
ax2.set_title('Averaged EEG_IED_neg Epochs')
ax2.set_xlabel('Sample number')
ax2.set_ylabel('Amplitude')

fig_0025.suptitle('Patient 0025', fontsize=22)
fig_0025.tight_layout()

In [None]:
fig_0053, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 5), dpi=150)

ax1.plot(np.mean(POS_EEG_0053, axis=0))
ax1.invert_yaxis()
ax1.set_title('Averaged EEG_IED_pos Epochs')
ax1.set_xlabel('Sample number')
ax1.set_ylabel('Amplitude')

ax2.plot(np.mean(NEG_EEG_0053, axis=0))
ax2.invert_yaxis()
ax2.set_title('Averaged EEG_IED_neg Epochs')
ax2.set_xlabel('Sample number')
ax2.set_ylabel('Amplitude')

fig_0053.suptitle('Patient 0053', fontsize=22)
fig_0053.tight_layout()

In [None]:
fig_0090, (ax1, ax2) = plt.subplots(nrows=1, ncols=2, figsize=(10, 5), dpi=150)

ax1.plot(np.mean(POS_EEG_0090, axis=0))
ax1.invert_yaxis()
ax1.set_title('Averaged EEG_IED_pos Epochs')
ax1.set_xlabel('Sample number')
ax1.set_ylabel('Amplitude')

ax2.plot(np.mean(NEG_EEG_0090, axis=0))
ax2.invert_yaxis()
ax2.set_title('Averaged EEG_IED_neg Epochs')
ax2.set_xlabel('Sample number')
ax2.set_ylabel('Amplitude')

fig_0090.suptitle('Patient 0090', fontsize=22)
fig_0090.tight_layout()

# Step 4: Prepare the EEG data for training and testing the model

In [None]:
## Concatenate the EEG data from patients 0013, 0025, 0053, and 0090 for training and testing
## Note: Training and testing data do not overlap

# Combine positive (IED) EEG data from all patients
POS_EEG = np.concatenate((POS_EEG_0013, POS_EEG_0025, POS_EEG_0053, POS_EEG_0090))

# Combine negative (non-IED) EEG data from all patients
NEG_EEG = np.concatenate((NEG_EEG_0013, NEG_EEG_0025, NEG_EEG_0053, NEG_EEG_0090))

print('===== Checking concatenated EEG data =====')

EEG_all_X = np.concatenate((POS_EEG, NEG_EEG))
print(EEG_all_X.shape)
EEG_y_1s = np.ones(POS_EEG.shape[0]).astype(int)
EEG_y_0s = np.zeros(NEG_EEG.shape[0]).astype(int)
EEG_all_y = np.concatenate((EEG_y_1s, EEG_y_0s))
print(EEG_all_y.shape)

print('===== Checking Trainin / Testing / Validation data =====')

# Create Training, Validation, and Test Sets from all of our available data
# Use 70/15/15 split since dataset is relatively small
EEG_X_train,EEG_X_val_test,EEG_y_train,EEG_y_val_test = train_test_split(EEG_all_X,EEG_all_y, test_size=0.3)
EEG_X_val,EEG_X_test,EEG_y_val,EEG_y_test = train_test_split(EEG_X_val_test,EEG_y_val_test, test_size=0.5)
print(EEG_X_train.shape)
print(EEG_y_train.shape)
print(EEG_X_val.shape)
print(EEG_y_val.shape)
print(EEG_X_test.shape)
print(EEG_y_test.shape)

# Make sure EEG data is shuffled proprely
print(EEG_y_test[0:10])
print(EEG_y_val[0:10])
print(EEG_y_train[0:10])

In [None]:
## Use EEG data from patients 0013, 0025, and 0053 for trainning and 00090 for testing

# Combine positive (IED) EEG data from patients 0013, 0025, and 0053 for training
POS_EEG_train = np.concatenate((POS_EEG_0013, POS_EEG_0025, POS_EEG_0053))

# Combine negative (non-IED) EEG data from patients 0013, 0025, and 0053 for training
NEG_EEG_train = np.concatenate((NEG_EEG_0013, NEG_EEG_0025, NEG_EEG_0053))

# Use positive (IED) EEG data from patient 0090 for testing
POS_EEG_test = POS_EEG_0090

# Use negative (non-IED) EEG data from patient 0090 for testing
NEG_EEG_test = NEG_EEG_0090

print('===== Checking concatenated EEG data =====')

EEG_all_X = np.concatenate((POS_EEG, NEG_EEG))
print(EEG_all_X.shape)
EEG_y_1s = np.ones(POS_EEG.shape[0]).astype(int)
EEG_y_0s = np.zeros(NEG_EEG.shape[0]).astype(int)
EEG_all_y = np.concatenate((EEG_y_1s, EEG_y_0s))
print(EEG_all_y.shape)

print('===== Checking Trainin / Testing / Validation data =====')

# Create Training, Validation, and Test Sets from all of our available data
# Use 70/15/15 split since dataset is relatively small
EEG_X_train,EEG_X_val_test,EEG_y_train,EEG_y_val_test = train_test_split(EEG_all_X,EEG_all_y, test_size=0.3)
EEG_X_val,EEG_X_test,EEG_y_val,EEG_y_test = train_test_split(EEG_X_val_test,EEG_y_val_test, test_size=0.5)
print(EEG_X_train.shape)
print(EEG_y_train.shape)
print(EEG_X_val.shape)
print(EEG_y_val.shape)
print(EEG_X_test.shape)
print(EEG_y_test.shape)

# Make sure EEG data is shuffled proprely
print(EEG_y_test[0:10])
print(EEG_y_val[0:10])
print(EEG_y_train[0:10])

In [92]:
# Dump EEG train, test, and validation dataset as pickle file

pickle.dump(EEG_X_train, open(path+"EEG_X_train.pickle",'wb'))
pickle.dump(EEG_X_val, open(path+"EEG_X_val.pickle",'wb'))
pickle.dump(EEG_X_test, open(path+"EEG_X_test.pickle",'wb'))
pickle.dump(EEG_y_train, open(path+"EEG_y_train.pickle",'wb'))
pickle.dump(EEG_y_val, open(path+"EEG_y_val.pickle",'wb'))
pickle.dump(EEG_y_test, open(path+"EEG_y_test.pickle",'wb'))

# Step 5: Hyperparameter Grid Search for EEG CNN Model

In [None]:
# Load EEG data for grid search
EEG_X_grid_train = pickle.load(open(path+"pickles/EEG_X_grid_train.pickle",'rb'))
EEG_y_grid_train = pickle.load(open(path+"pickles/EEG_y_grid_train.pickle",'rb'))

In [None]:
# Print the type and shape of EEG data for grid search
print(type(EEG_X_grid_train))
print(type(EEG_y_grid_train))
print(EEG_X_grid_train.shape)
print(EEG_y_grid_train.shape)

# Make sure EEG data is shuffled proprely
print(EEG_y_grid_train[0:20])

In [None]:
# Function to return a 2D CNN using given hyperparameters during cross validation

def get_clf(dropout, filters, fullyneurons):
  
  # Create Sequential model
  model = tf.keras.Sequential() 

  # Add first convolutional layer with specified number of filters, kernel size (3, 3), and input shape based on epoch size
  model.add(layers.Conv2D(filters, (3, 3), padding="same", input_shape=(256, 20, 1))) 
  model.add(layers.BatchNormalization())  # Add batch normalization layer for faster and more stable training
  model.add(layers.Activation("relu"))  # Add ReLU activation function
  model.add(layers.MaxPool2D(pool_size=(2, 2)))  # Add max pooling layer to reduce spatial dimensions
  model.add(layers.Dropout(dropout))  # Add dropout layer for regularization

  # Add second convolutional layer with double the number of filters and the same kernel size (3, 3)
  model.add(layers.Conv2D(filters*2, (3, 3), padding="same")) 
  model.add(layers.BatchNormalization())
  model.add(layers.Activation("relu"))
  model.add(layers.MaxPool2D(pool_size=(2,2))) 
  model.add(layers.Dropout(dropout))

  # Add third convolutional layer with triple the number of filters and the same kernel size (3, 3)
  model.add(layers.Conv2D(filters*3, (3, 3), padding="same")) 
  model.add(layers.BatchNormalization())
  model.add(layers.Activation("relu"))
  model.add(layers.MaxPool2D(pool_size=(2,2))) 
  model.add(layers.Dropout(dropout))

  # Flatten the output of the convolutional layers
  model.add(layers.Flatten())

  # Add a fully connected layer with the specified number of neurons
  model.add(layers.Dense(fullyneurons))
  model.add(layers.BatchNormalization())
  model.add(layers.Activation("relu"))
  model.add(layers.Dropout(dropout))

  # Add the output layer with sigmoid activation function for binary classification: IED_positive or IED_negative
  model.add(layers.Dense(1, activation="sigmoid"))

  return model

In [None]:
## Define KerasClassifier for binary classification for EEG data

# Reduce the learning rate if the loss does not improve for 2 epochs
reduce_lr = ReduceLROnPlateau(monitor="loss", factor=0.1, patience=2, min_lr=0.00001, model="auto")

# Stop training if the validation loss does not decrease for 10 epochs
early_stop = EarlyStopping(monitor='val_loss', mode='min', verbose=1, patience=10)

cb = [reduce_lr, early_stop] # Call back function

clf = KerasClassifier(
    model=get_clf,
    loss="binary_crossentropy", # Set loss fundtion to binary crossentropy
    optimizer=Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-08),
    model__dropout=0.2,
    model__filters = 8, 
    model__fullyneurons = 50,
    fit__validation_split=0.2,
    verbose=False,
    epochs=100, # Maximum number of epochs
    callbacks=cb,
    metrics=["accuracy", tf.keras.metrics.AUC(curve="ROC", from_logits=True), tf.keras.metrics.AUC(curve="PR", from_logits=True), tf.keras.metrics.Precision(), tf.keras.metrics.Recall()]
)

In [None]:
# Define the scoring metrics to be used during grid search
scoring_metrics = ['accuracy', 'roc_auc']

# Define the hyperparameter search space
params = {
    'model__dropout': [0.2, 0.4],  # Dropout rates to try
    'model__filters': [8, 16, 32],  # Number of filters for the first convolutional layer
    'model__fullyneurons': [50, 100, 500]  # Number of neurons in the fully connected layer
}

# Create a GridSearchCV object with the specified parameters, scoring metrics, and cross-validation settings
gs = GridSearchCV(
    clf,  # The classifier object to perform the grid search on
    params,  # The hyperparameter search space
    scoring=scoring_metrics,  # Scoring metrics to evaluate during the grid search
    refit='accuracy',  # Refit the model with the best hyperparameters found for the accuracy metric
    n_jobs=1,  # Number of CPU cores to use (1 for single core)
    verbose=3,  # Verbosity level of the output messages
    cv=3  # Number of cross-validation folds
)

# Fit the GridSearchCV object to the training data
gs.fit(EEG_X_grid_train, EEG_y_grid_train)

In [None]:
# Print the best score and the best set of hyperparameters found by the GridSearchCV function
print(gs.best_score_, gs.best_params_)

In [None]:
# Save the grid search results of EEG data to CSV file
gs_dict = gs.cv_results_
del gs_dict['mean_fit_time']
del gs_dict['std_fit_time']
del gs_dict['mean_score_time']
del gs_dict['std_score_time']
del gs_dict['params']
gs_df = pd.DataFrame.from_dict(gs_dict)
gs_df.to_csv(path+"eeg_gs_df_edited.csv")

# Step 6: Training and Testing EEG CNN Model Using Optimal Hyperparameters

In [None]:
# Load EEG data for model training, validation, and testing
EEG_X_train = pickle.load(open(path+"EEG_X_train.pickle",'rb'))
EEG_X_val = pickle.load(open(path+"EEG_X_val.pickle",'rb'))
EEG_X_test = pickle.load(open(path+"EEG_X_test.pickle",'rb'))
EEG_y_train = pickle.load(open(path+"EEG_y_train.pickle",'rb'))
EEG_y_val = pickle.load(open(path+"EEG_y_val.pickle",'rb'))
EEG_y_test = pickle.load(open(path+"EEG_y_test.pickle",'rb'))

In [None]:
## Create 2D CNN using optimal parameters

model = tf.keras.Sequential() # Create Sequential model

DROPOUT = 0.2
FILTERS = 32
FULLUNEURONS = 50

# Add first convolutional layer with specified number of filters, kernel size (3, 3), and input shape based on epoch size
model.add(layers.Conv2D(FILTERS, (3, 3), padding="same", input_shape=(256, 20, 1)))
model.add(layers.BatchNormalization())  # Add batch normalization layer for faster and more stable training
model.add(layers.Activation("relu"))  # Add ReLU activation function
model.add(layers.MaxPool2D(pool_size=(2, 2)))  # Add max pooling layer to reduce spatial dimensions
model.add(layers.Dropout(DROPOUT))  # Add dropout layer for regularization

# Add second convolutional layer with double the number of filters and the same kernel size (3, 3)
model.add(layers.Conv2D(FILTERS*2, (3, 3), padding="same"))
model.add(layers.BatchNormalization())
model.add(layers.Activation("relu"))
model.add(layers.MaxPool2D(pool_size=(2,2)))
model.add(layers.Dropout(DROPOUT))

# Add third convolutional layer with triple the number of filters and the same kernel size (3, 3)
model.add(layers.Conv2D(FILTERS*3, (3, 3), padding="same"))
model.add(layers.BatchNormalization())
model.add(layers.Activation("relu"))
model.add(layers.MaxPool2D(pool_size=(2,2)))
model.add(layers.Dropout(DROPOUT))

# Flatten the output of the convolutional layers
model.add(layers.Flatten())

# Add a fully connected layer with the specified number of neurons
model.add(layers.Dense(FULLUNEURONS)) 
model.add(layers.BatchNormalization())
model.add(layers.Activation("relu"))
model.add(layers.Dropout(DROPOUT))

# Add the output layer with sigmoid activation function for binary classification: IED_positive or IED_negative
model.add(layers.Dense(1, activation="sigmoid"))

# Set up Adam optimizer
optimizer = Adam(learning_rate=0.01, beta_1=0.9, beta_2=0.999, epsilon=1e-08) 

# Compile the model with the optimizer, loss function, and specified metrics
model.compile(optimizer=optimizer, 
              loss=tf.keras.losses.BinaryCrossentropy(), 
              metrics=["accuracy", 
                       tf.keras.metrics.AUC(curve="ROC", from_logits=True), 
                       tf.keras.metrics.AUC(curve="PR", from_logits=True), 
                       tf.keras.metrics.Precision(), 
                       tf.keras.metrics.Recall()])

In [None]:
# Clear the previous TensorFlow session
keras.backend.clear_session()

In [None]:
# Define the number of training epochs
epochs = 20

# Set up a checkpoint to save the model weights with the highest validation accuracy
checkpoint = ModelCheckpoint("model_weights.h5", 
                             monitor="val_accuracy", 
                             save_weights_only=True, 
                             mode="max", 
                             verbose=1)

# Set up a learning rate scheduler to reduce the learning rate if the loss does not improve for 2 epochs
reduce_lr = ReduceLROnPlateau(monitor="loss", 
                              factor=0.1, 
                              patience=2, 
                              min_lr=0.00001, 
                              mode="auto")

# Create a list of callbacks to be used during training
callbacks = [checkpoint, reduce_lr]

# Train the model using the specified training and validation data, epochs, and callbacks
history = model.fit(
    x=EEG_X_train,
    y=EEG_y_train,
    validation_data=(EEG_X_val, EEG_y_val),
    epochs=epochs,
    callbacks=callbacks
)

In [None]:
# Evaluate the trained EEG IED detector
results = model.evaluate(EEG_X_test, EEG_y_test)

In [None]:
## Generate the confusion matrix of the trained EEG IED detector on the test data

# Make predictions on the test data
y_pred = model.predict(EEG_X_test)
y_pred = np.round(y_pred).flatten()

# Calculate the confusion matrix
cm = confusion_matrix(EEG_y_test, y_pred)

# Plot the confusion matrix
fig, ax = plt.subplots()
im = ax.imshow(cm, cmap='Blues')

# Add labels and colorbar
ax.set_xticks(np.arange(2))
ax.set_yticks(np.arange(2))
ax.set_xticklabels(['Non-IED', 'IED'])
ax.set_yticklabels(['Non-IED', 'IED'])
plt.colorbar(im)

# Add annotations to the plot
thresh = cm.max() / 2
for i in range(cm.shape[0]):
    for j in range(cm.shape[1]):
        ax.text(j, i, format(cm[i, j], 'd'),
                ha="center", va="center",
                color="white" if cm[i, j] > thresh else "black")

plt.xlabel('Predicted label')
plt.ylabel('True label')
plt.title('Confusion Matrix')
plt.show()

# Step 7: Visualize the EGG CNN Model's Classification

In [None]:
def show_time_series(data, xlable, ylabel):
  cur = data 
  plt.plot(cur)
  plt.gca().invert_yaxis() #Invert to match the default reversed y-axis view in Brainstorm.
  plt.xlabel(xlable)
  plt.ylabel(ylabel)
  plt.show()

In [None]:
# Use the trained EEG CNN model to predict the test set
prediction_test = model.predict(EEG_X_test)
prediction_test = (prediction_test > 0.5).astype(np.float32)
prediction_test = prediction_test.flatten()

# Initialize empty lists for IED_positive and IED_negative indices
positive_indices = []
negative_indices = []

# Iterate through the predicted labels and assign indices to positive and negative lists
for i in range(0, len(prediction_test)):
    if prediction_test[i] == 1:
        positive_indices.append(i)
    else:
        negative_indices.append(i)

# Calculate the average of the test set samples classified as positive (IEDs)
predicted_test_positives = np.mean(EEG_X_test[positive_indices, :, :], axis=0)

# Visualize the time series of the average predicted positives (IEDs)
show_time_series(predicted_test_positives, "Sample number", "Amplitude")

# Calculate the average of the test set samples classified as negative (non-IEDs)
predicted_test_negatives = np.mean(EEG_X_test[negative_indices, :, :], axis=0)

# Visualize the time series of the average predicted negatives (non-IEDs)
show_time_series(predicted_test_negatives, "Sample number", "Amplitude")
