# Initialization

In [None]:

#! STEP1: In terminal:
#! ThesisLocal/Scripts/activate

In [1]:
# Always Import
import mne
from mne.filter import filter_data

import os
import pandas as pd
import numpy as np
import matplotlib
matplotlib.use('TkAgg')
import matplotlib.pyplot as plt
import seaborn as sns


from pathlib import Path
mne.set_log_level(30) # level of 10 prints all info, level of 30 only prints warnings and errors

# Pre-Processing
import time
import picard
import joblib
from mne.preprocessing import ICA
from IPython.display import clear_output
import gc
import cv2
import random
from PIL import Image

# Everything PyTorch
import torch
import torch.nn as nn
import torch.optim as optim
from torch.autograd import Variable
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader, TensorDataset
from torchmetrics import Accuracy
from torchvision import models, transforms
from torchmetrics.classification import BinaryAccuracy
import pytorch_lightning as pl

# Alexnet
from torchvision.models import alexnet 

# import optuna
# from optuna.integration import PyTorchLightningPruningCallback
import sklearn
from sklearn.metrics import accuracy_score
#from pytorch_lightning.callbacks import ModelCheckpoint

from tqdm import tqdm

# For old tensorflow model
from EEGModels import EEGNet, DeepConvNet
from tensorflow.keras.callbacks import ModelCheckpoint
from keras.utils import to_categorical, Sequence
from tensorflow.keras import backend as K


import mne
import numpy as np
from braindecode.models import EEGNetv4, Deep4Net
from braindecode.datasets import create_from_X_y
from braindecode import EEGClassifier
from skorch import NeuralNetClassifier, NeuralNetBinaryClassifier
from skorch.callbacks import LRScheduler, EarlyStopping, ProgressBar, EpochScoring
from skorch.helper import predefined_split
from skorch.dataset import ValidSplit
from sklearn.model_selection import GridSearchCV, StratifiedKFold, train_test_split, cross_val_score
from skorch.classifier import NeuralNetBinaryClassifier
from skorch.helper import SliceDataset
from numpy import array
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from functools import partial
from torch.utils.data import random_split
from torcheeg.models import EEGNet as torchEEG
from torcheeg.trainers import ClassifierTrainer

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score, classification_report, confusion_matrix

# Preprocessing

In [None]:
event_dict = {"block_start": 1, "block_end": 2} # Preparing the event ID's as per the documentation (EEG experiment protocol_Migraine_Brain_KiltHub_vF.pdf)
variances = []
ica = ICA(n_components=50, max_iter='auto', random_state=97) # choosing 50 components, explains at least 95% of variance or more in every file while minimizing convergence time from ~10mins to less than 1min in most cases

reject_criteria = dict(
    eeg=75e-6,  # 100 µV
    eog=200e-6,
    ecg=1200e-6
)  # 200 µV

flat_criteria = dict(eeg=1e-6)  # 1 fT  # 1 fT/cm  # 1 µV


for file in os.listdir('Dataset')[2:]: #! WARNING: This script may currently not work as intended again due to a change in file naming convention, be aware
    shortname = file[:-12]
    
    print(f'Now working on {file}')
    raw = mne.io.read_raw_bdf(f"Dataset/{file}", preload=True)
    raw_unfiltered = raw.copy()
    current_sfreq = raw.info["sfreq"]
    desired_sfreq = 256  # Hz
    decim = np.round(current_sfreq / desired_sfreq).astype(int)
    obtained_sfreq = current_sfreq / decim
    lowpass_freq = obtained_sfreq / 3.0
    raw = raw.filter(l_freq=0.5, h_freq=lowpass_freq)
    raw = raw.resample(256)
    raw = raw.filter(l_freq=0.5, h_freq=100, method='iir') #(i) zero-phase Butterworth filter was used to filter the signals between 0.1 and 100 Hz;
    raw = raw.set_channel_types({'ECG':'ecg', 'Fp1':'eog', 'Fp2':'eog', 'Fpz':'eog'})

    eog_epochs = mne.preprocessing.create_eog_epochs(raw)
    ecg_epochs = mne.preprocessing.create_ecg_epochs(raw)
    
    print(f'Now Resampling and Epoching {file}')
    raw = raw.pick(picks=None, exclude=['Status'])
    raw = mne.make_fixed_length_epochs(raw, duration=2, overlap=0)
    raw.load_data()
    raw = raw.apply_baseline((None, None))
    
    # ecg_events = mne.preprocessing.find_ecg_events(raw)
    # onsets = ecg_events[0][:, 0] / raw.info["sfreq"] - 0.25    
    # durations = [0.5] * len(ecg_events)
    # descriptions = ["bad beat"] * len(ecg_events)
    # beat_annot = mne.Annotations(
    #     onsets, durations, descriptions, orig_time=raw.info["meas_date"]
    # )
    # raw.set_annotations(beat_annot)   
    # print(f'ECG events annotated')    
     # setting up pre-existing channel ECG for heartbeat detection; Fp1 as eye-near channel is used as proxy for eye-related artifacts
    raw = raw.set_eeg_reference(['M1', 'M2'])

    print(f'Initializing ICA for {file}')
    ica_filt_raw = raw_unfiltered.copy().filter(l_freq=1.0, h_freq=None)
    ica_filt_raw.set_channel_types({'ECG':'ecg', 'Fp1':'eog', 'Fp2':'eog', 'Fpz':'eog'})#(iii) independent component analysis was used to identify and remove eye-related artefacts (blinks and horizontal eye movements) and heartbeat
    ica.fit(ica_filt_raw, decim=decim)
    
    explained_var_ratio = ica.get_explained_variance_ratio(ica_filt_raw)
    for ratio in explained_var_ratio.items():
        variances.append(ratio)
    # Finding out which components to exclude
    ica.exclude = []
    # find which ICs match the EOG/ECG patterns
    eog_indices, eog_scores = ica.find_bads_eog(eog_epochs, threshold='auto', measure='zscore')
    ica.exclude = eog_indices
    ecg_indices, ecg_scores = ica.find_bads_ecg(ecg_epochs, threshold="auto", measure='zscore')
    ica.exclude += ecg_indices
    raw = ica.apply(raw)
 
    raw = raw.set_channel_types({'ECG':'eeg', 'Fp1':'eeg', 'Fp2':'eeg', 'Fpz':'eeg'})
    scaler = mne.decoding.Scaler(info=raw.info, scalings=None)
    raw.save(f'PreprocessedEpochs/{shortname}-epo.fif', overwrite=True)
    fixed_epoch_data = raw.get_data()
    fixed_epoch_data = scaler.fit_transform(fixed_epoch_data)
    np.save(f'PreprocessedEpochData/{shortname}.npy', fixed_epoch_data)

## Final data to Pandas DF (Deprecated, ignore)

In [None]:
train_dataframes = []

for file in os.listdir('PreprocessedEpochs/Train'):
    dfs = mne.read_epochs(f'PreprocessedEpochs/Train/{file}').to_data_frame()
    dfs = dfs.drop(columns=['time', 'condition'])
    dfs = dfs.groupby('epoch').mean().reset_index()
    dfs = dfs.drop(columns=['epoch'])
    
    if file[0] == "M":
        dfs['migraine'] = 1
    else:
        dfs['migraine'] = 0

    train_dataframes.append(dfs)

train_df = pd.concat(train_dataframes, ignore_index=True)

test_dataframes = []

for file in os.listdir('PreprocessedEpochs/Test'):
    dfs = mne.read_epochs(f'PreprocessedEpochs/Test/{file}').to_data_frame()
    dfs = dfs.drop(columns=['time', 'condition'])
    dfs = dfs.groupby('epoch').mean().reset_index()
    dfs = dfs.drop(columns=['epoch'])
    
    if file[0] == "M":
        dfs['migraine'] = 1
    else:
        dfs['migraine'] = 0

    test_dataframes.append(dfs)

test_df = pd.concat(test_dataframes, ignore_index=True)

In [None]:
# GROUPING THE DIFFERENT BRAINREGIONS TO REDUCE FEATURES
df = pd.read_csv("FullDF.csv", index_col=0)

regions = {
    'AF_left': ['AF7','AF3'],
    'AF_center': ['AFz'],
    'AFp_left': ['AFp1'],
    'AFp_right': ['AFp2'],
    'AF_right': ['AF4', 'AF8'],
    'AFF_left': ['AFF5h', 'AFF1h'],
    'AFF_right': ['AFF2h', 'AFF6h'],
    
    'C_left': ['C5','C3','C1'],
    'C_center': ['Cz'],
    'C_right': ['C2','C4','C6'],  
    'CCP_left': ['CCP5h','CCP3h','CCP1h'],
    'CCP_right': ['CCP2h','CCP4h','CCP6h'],
    'CP_left': ['CP5','CP3','CP1'],
    'CP_center': ['CPz'],
    'CP_right': ['CP2','CP4','CP6'],
    'CPP_left': ['CPP5h','CPP3h','CPP1h'],
    'CPP_right': ['CPP2h','CPP4h','CPP6h'],
    
    'Erg_left': ['Erg1'],
    'Erg_right': ['Erg2'],
        
    'F_left': ['F9','F7','F5','F3','F1'],
    'F_center': ['Fz'],
    'F_right': ['F2','F4','F6','F8','F10'],
    'FC_left': ['FC5','FC3','FC1'],
    'FC_center': ['FCz'],
    'FC_right': ['FC2','FC4','FC6'],
    'FCC_left': ['FCC5h', 'FCC3h', 'FCC1h'],
    'FCC_right': ['FCC2h', 'FCC4h', 'FCC6h'],
    'FFC_left': ['FFC5h', 'FFC3h', 'FFC1h'],
    'FFC_right': ['FFC2h', 'FFC4h', 'FFC6h'],
    'FFT_left': ['FFT9h', 'FFT7h'],
    'FFT_right': ['FFT8h', 'FFT10h'],
    'Fp1_left': ['Fp1'],
    'Fp_center': ['Fpz'],
    'Fp_right': ['Fp2'],
    'FT_left': ['FT9', 'FT7'],
    'FT_right': ['FT8', 'FT10'],
    'FTT_left': ['FTT9h', 'FTT7h'],
    'FTT_right': ['FTT8h', 'FTT10h'],
    
    'GSR_left': ['GSR1'],
    'GSR_right': ['GSR2'],
    
    'I_left': ['I1'],
    'I_center': ['Iz'],
    'I_right': ['I2'],
    'IO_left': ['IO1'],
    'IO_right': ['IO2'],       
    
    'LO_left': ['LO1'],
    'LO_right': ['LO2'],
    
    'M_left': ['M1'],
    'M_right': ['M2'],
    
    'O_left': ['O1'],
    'O_center': ['Oz'],
    'O_right': ['O2'],
    
    'OI_left': ['OI1h'],
    'OI_right': ['OI2h'],    
    
    'P_left': ['P9','P7','P5','P3','P1'],
    'P_center': ['Pz'],
    'P_right': ['P2','P4','P6','P8','P10'],
    'PO_left': ['PO7', 'PO3', 'PO9'],
    'PO_center': ['POz'],
    'PO_right': ['PO4', 'PO8', 'PO10'],
    'POO_left': ['POO1'],
    'POO_right': ['POO2'],    
    'POOh_left': ['POO9h'],
    'POOh_right': ['POO10h'],
    'PPO_left': ['PPO9h', 'PPO5h', 'PPO1h'],
    'PPO_right': ['PPO2h', 'PPO6h', 'PPO10h'], 
    
    'SO_left': ['SO1'],   

    'T_left': ['T7'],
    'T_right': ['T8'],
    'TP_left': ['TP9', 'TP7'],
    'TP_right': ['TP8', 'TP10'],
    'TPP_left': ['TPP7h'],
    'TPP_right': ['TPP8h'],
    'TTP_left': ['TTP7h'],
    'TTP_right': ['TTP8h'],
    
    'Resp': ['Resp'],
    'Plet': ['Plet'],
    'Temp': ['Temp'],
    'ECG': ['ECG']
}

# The next 10 lines have been supplemented by ChatGPT, but have been manually checked for correctness
new_columns = {}
for freq in ['delta','theta','alpha','beta','gamma']:
    for group_name, channels in regions.items():        
        cols_to_avg = [f"{freq}_{ch}" for ch in channels if f"{freq}_{ch}" in df.columns]
        
        if cols_to_avg:  # Only if there are matching columns
            # Calculate the average for the current frequency and group
            new_col_name = f"{freq}_{group_name}"
            new_columns[new_col_name] = df[cols_to_avg].mean(axis=1)
df_grouped = pd.DataFrame(new_columns)

dropped_df = df.drop(df.columns[np.arange(705)], axis=1)
df = pd.concat([dropped_df, df_grouped], axis=1)
df.to_csv("GroupedDF.csv")

## TFR

In [None]:
all_resting = os.listdir('PreprocessedEpochs')
freqs = np.logspace(*np.log10([1, 100]), num=50)
n_cycles = freqs / 2.0

tfr_input= mne.read_epochs(f'PreprocessedEpochs/{all_resting[0]}', preload=True)
power = tfr_input.compute_tfr(method='morlet', freqs=freqs, n_cycles=n_cycles, picks='all', verbose=10, decim=10, n_jobs=16)
power.save(f'PreprocessedTFR/{all_resting[0]}-tfr.hdf5', overwrite=True)

In [None]:
montage = mne.channels.make_standard_montage("standard_1020")
montage.plot()
fig = montage.plot(kind="3d", show=True)  # 3D
fig = fig.gca().view_init(azim=70, elev=15)  # set view angle for tutorial

In [None]:
raw = mne.io.read_raw_fif("Preprocessed/M1.raw.fif")
power = mne.time_frequency.read_tfrs("PreprocessedTFR/M1-tfr.hdf5")
#print(power[0].average().plot()) #power[x] chooses epoch, picks=y chooses channels

In [None]:
# CREATES MONTAGE FOR STANDARD BIOSEMI, HOWEVER THIS DATASET HAS A CUSTOM-MADE EEG CAP
#mne.channels.get_builtin_montages(descriptions=True)
montage = mne.channels.make_standard_montage('biosemi128')
montage.plot()

In [None]:
all_resting = os.listdir('PreprocessedEpochs')
freqs = np.logspace(*np.log10([1, 100]), num=50)
n_cycles = freqs / 2.0

for file in all_resting:
    clear_output()
    shortname = file[:-8]
    if f'{shortname}-STFT-tfr.hdf5' not in os.listdir('PreprocessedTFR-STFT'):
        print(f"Now computing {shortname}")
        tfr_input= mne.read_epochs(f'PreprocessedEpochs/{file}', preload=True)
        power = tfr_input.compute_tfr(method='multitaper', freqs=freqs, n_cycles=n_cycles, picks='all', verbose=10, decim=10, n_jobs=-1) #Methods: multitaper for STFT, morlet for CWT (replacing bump)
        del tfr_input
        power.save(f'PreprocessedTFR-STFT/{shortname}-STFT-tfr.hdf5')
        del power
        gc.collect()
    else:
        continue

# Deep Learning

## TFR through AlexNet

### Processing

In [None]:
shortname = "C1"
power = mne.time_frequency.read_tfrs(f'PreprocessedTFR/C1-tfr.hdf5') #reads in the TFR
images = power[52].average().plot(picks="Cz", show=False, colorbar=False, baseline=(None, None), mode="zlogratio") 
Path.mkdir(f'AlexNet/{shortname}', exist_ok=True)
image = images[0]
ax = image.axes[0]
ax.axis('off')
image.savefig(f'AlexNet/{shortname}/temp')

In [None]:
files = os.listdir("PreprocessedTFR")
print(files[32:])

In [None]:
#channel_list = ['Fp1','AFp1','AF7','AF3','AFF5h','AFF1h','F9','F7','F5','F3','F1','FFT9h','FFT7h','FFC5h','FFC3h','FFC1h','FT9','FT7','FC5','FC3','FC1','FTT9h','FTT7h','FCC5h','FCC3h','FCC1h','T7','C5','C3','C1','TTP7h','CCP5h','CCP3h','CCP1h','TP9','TP7','CP5','CP3','CP1','CPz','TPP7h','CPP5h','CPP3h','CPP1h','P9','P7','P5','P3','P1','Pz','PPO9h','PPO5h','PPO1h','PO7','PO3','POz','PO9','POO9h','O1','POO1','I1','OI1h','Oz','Iz','Fpz','Fp2','AFp2','AFz','AF4','AF8','AFF2h','AFF6h','Fz','F2','F4','F6','F8','F10','FFC2h','FFC4h','FFC6h','FFT8h','FFT10h','FCz','FC2','FC4','FC6','FT8','FT10','FCC2h','FCC4h','FCC6h','FTT8h','FTT10h','Cz','C2','C4','C6','T8','CCP2h','CCP4h','CCP6h','TTP8h','CP2','CP4','CP6','TP8','TP10','CPP2h','CPP4h','CPP6h','TPP8h','P2','P4','P6','P8','P10','PPO2h','PPO6h','PPO10h','PO4','PO8','PO10','POO2','O2','POO10h','OI2h','I2','M1','M2','LO1','LO2','IO1','SO1','IO2','ECG','GSR1','GSR2','Erg1','Erg2','Resp','Plet','Temp','Status']
short_channel_list = ['AF7', 'AF3', 'F7', 'F5', 'F3', 'F1', 'FC5', 'FC3', 'AF4', 'AF8', 'F2', 'F4', 'F6', 'F8', 'FC4', 'FC6', 'C6', 'CCP2h', 'CCP4h', 'TTP8h', 'CP6', 'TP10', 'CPP4h', 'TPP8h', 'CPP1h', 'CPP3h', 'CP5', 'CP1', 'CPP5h', 'CPP2h', 'P9', 'P5', 'P2', 'P6', 'PPO2h', 'PPO10h', 'PO4', 'PO10', 'POO10h', 'I2', 'Pz', 'PPO5h', 'PO7', 'PO3', 'POO9h', 'POO1', 'I1', 'Oz', 'O2', 'O1']

# ! Just for documentation which channels were chosen
# frontal_left = ['AF7', 'AF3', 'F7', 'F5', 'F3', 'F1', 'FC5', 'FC3']
# frontal_right = ['AF4', 'AF8', 'F2', 'F4', 'F6', 'F8', 'FC4', 'FC6']
# parietal_left = ['C6', 'CCP2h', 'CCP4h', 'TTP8h', 'CP6', 'TP10', 'CPP4h', 'TPP8h']
# parietal_right= ['CPP1h', 'CPP3h', 'CP5', 'CP1', 'CPP5h', 'CPP2h', 'P9', 'P5']
# occipito_parietal_left = ['P2', 'P6', 'PPO2h', 'PPO10h', 'PO4', 'PO10', 'POO10h', 'I2']
# occipito_parietal_right = ['Pz', 'PPO5h', 'PO7', 'PO3', 'POO9h', 'POO1', 'I1', 'Oz']
# occipital_left = ['O2']
# occipital_right = ['O1']

labels = pd.DataFrame(columns=['file', 'label', 'aura', 'old', 'female'])
files = os.listdir("PreprocessedTFR")
files = files[32:]

preprocess = transforms.Compose([
    transforms.Resize(256, interpolation=Image.BICUBIC),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

aura = ['M1', 'M2', 'M4', 'M5', 'M6', 'M7', 'M10', 'M11', 'M14', 'M16', 'M17', 'M18']
old = ['M6', 'M7', 'M8', 'M9', 'M14', 'M17', 'M18', 'C5', 'C7', 'C8', 'C9', 'C10', 'C17', 'C21']
female = ['M2', 'M3', 'M5', 'M6', 'M10', 'M11', 'M12', 'M13', 'M14', 'M15', 'M16', 'M17', 'M18', 'C1', 'C3', 'C4', 'C8', 'C9', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21'] 

for file in files: #files contains the name of each subjects file
    shortname = file[:-9] #extracts only the subject name (i.e., C1)
    Path.mkdir(f'AlexNet/{shortname}', exist_ok=True)
    power = mne.time_frequency.read_tfrs(f'PreprocessedTFR/{file}') #reads in the TFR
    for i in random.sample(range(0,power.shape[0]), 50): # takes 50 random epochs across all epochs of a file
        for channel in short_channel_list: #short_channel_list contains a selection of 50 channels that should cover all areas of the brain sufficiently and roughly equally
            print(f"now {shortname}-{i}-{channel}")
            images = power[i].average().plot(picks=channel, show=False, colorbar=False, baseline=(None, None), mode="zlogratio") #baselined zlog image is plotted
            image = images[0]
            ax = image.axes[0]
            ax.axis('off')
            image.savefig("temp")
            matplotlib.pyplot.close()
            img = Image.open("temp.png").convert("RGB") # image is converted into RGB rather than RGBT image
            img = preprocess(img) # turned into tensor
            torch.save(img, f=f'AlexNet/{shortname}/{shortname}-{i}-{channel}.pt')
            #img.save(f'AlexNet/{shortname}/{shortname}-{i}-{channel}')

In [None]:
channel_list = ['Fp1','AFp1','AF7','AF3','AFF5h','AFF1h','F9','F7','F5','F3','F1','FFT9h','FFT7h','FFC5h','FFC3h','FFC1h','FT9','FT7','FC5','FC3','FC1','FTT9h','FTT7h','FCC5h','FCC3h','FCC1h','T7','C5','C3','C1','TTP7h','CCP5h','CCP3h','CCP1h','TP9','TP7','CP5','CP3','CP1','CPz','TPP7h','CPP5h','CPP3h','CPP1h','P9','P7','P5','P3','P1','Pz','PPO9h','PPO5h','PPO1h','PO7','PO3','POz','PO9','POO9h','O1','POO1','I1','OI1h','Oz','Iz','Fpz','Fp2','AFp2','AFz','AF4','AF8','AFF2h','AFF6h','Fz','F2','F4','F6','F8','F10','FFC2h','FFC4h','FFC6h','FFT8h','FFT10h','FCz','FC2','FC4','FC6','FT8','FT10','FCC2h','FCC4h','FCC6h','FTT8h','FTT10h','Cz','C2','C4','C6','T8','CCP2h','CCP4h','CCP6h','TTP8h','CP2','CP4','CP6','TP8','TP10','CPP2h','CPP4h','CPP6h','TPP8h','P2','P4','P6','P8','P10','PPO2h','PPO6h','PPO10h','PO4','PO8','PO10','POO2','O2','POO10h','OI2h','I2','M1','M2','LO1','LO2','IO1','SO1','IO2','ECG','GSR1','GSR2','Erg1','Erg2','Resp','Plet','Temp','Status']
labels = pd.DataFrame(columns=['file', 'label', 'aura', 'old', 'female'])
files = os.listdir("PreprocessedTFR")
preprocess = transforms.Compose([transforms.Resize((227,227), interpolation=Image.BICUBIC)])

aura = ['M1', 'M2', 'M4', 'M5', 'M6', 'M7', 'M10', 'M11', 'M14', 'M16', 'M17', 'M18']
old = ['M6', 'M7', 'M8', 'M9', 'M14', 'M17', 'M18', 'C5', 'C7', 'C8', 'C9', 'C10', 'C17', 'C21']
female = ['M2', 'M3', 'M5', 'M6', 'M10', 'M11', 'M12', 'M13', 'M14', 'M15', 'M16', 'M17', 'M18', 'C1', 'C3', 'C4', 'C8', 'C9', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21'] 

for file in files: #files contains the name of each subjects file
    shortname = file[:-9] #extracts only the subject name (i.e., C1)
    power = mne.time_frequency.read_tfrs(f'PreprocessedTFR/{file}') #reads in the TFR
    for i in range(0,power.shape[0]+1): #power.shape[0] is the amount of epochs of that file
        for channel in channel_list: #channel_list contains all existing channels
            if f'{shortname}-{i}-{channel}.png' not in os.listdir("TFR-CWT Representations"): #failsafe in case the kernel crashes
                print(f"Now: {shortname}-{i}-{channel}")
                images = power[i].average().plot(picks=channel, show=False, colorbar=False, baseline=(None, None), mode="ratio") 
                image = images[0]
                ax = image.axes[0]
                ax.axis('off')
                image.savefig("temp")
                img = Image.open("temp.png").convert("RGB")
                img = preprocess(img)
                img.save(f"TFR-CWT Representations/{shortname}-{i}-{channel}.png")

In [None]:
# Read files
files = os.listdir("PreprocessedTFR")

# Prepare rows for the DataFrame
rows = []

labels = pd.DataFrame(columns=['file', 'label', 'aura', 'old', 'female'])
channel_list = ['Fp1','AFp1','AF7','AF3','AFF5h','AFF1h','F9','F7','F5','F3','F1','FFT9h','FFT7h','FFC5h','FFC3h','FFC1h','FT9','FT7','FC5','FC3','FC1','FTT9h','FTT7h','FCC5h','FCC3h','FCC1h','T7','C5','C3','C1','TTP7h','CCP5h','CCP3h','CCP1h','TP9','TP7','CP5','CP3','CP1','CPz','TPP7h','CPP5h','CPP3h','CPP1h','P9','P7','P5','P3','P1','Pz','PPO9h','PPO5h','PPO1h','PO7','PO3','POz','PO9','POO9h','O1','POO1','I1','OI1h','Oz','Iz','Fpz','Fp2','AFp2','AFz','AF4','AF8','AFF2h','AFF6h','Fz','F2','F4','F6','F8','F10','FFC2h','FFC4h','FFC6h','FFT8h','FFT10h','FCz','FC2','FC4','FC6','FT8','FT10','FCC2h','FCC4h','FCC6h','FTT8h','FTT10h','Cz','C2','C4','C6','T8','CCP2h','CCP4h','CCP6h','TTP8h','CP2','CP4','CP6','TP8','TP10','CPP2h','CPP4h','CPP6h','TPP8h','P2','P4','P6','P8','P10','PPO2h','PPO6h','PPO10h','PO4','PO8','PO10','POO2','O2','POO10h','OI2h','I2','M1','M2','LO1','LO2','IO1','SO1','IO2','ECG','GSR1','GSR2','Erg1','Erg2','Resp','Plet','Temp','Status']
aura = ['M1', 'M2', 'M4', 'M5', 'M6', 'M7', 'M10', 'M11', 'M14', 'M16', 'M17', 'M18']
old = ['M6', 'M7', 'M8', 'M9', 'M14', 'M17', 'M18', 'C5', 'C7', 'C8', 'C9', 'C10', 'C17', 'C21']
female = ['M2', 'M3', 'M5', 'M6', 'M10', 'M11', 'M12', 'M13', 'M14', 'M15', 'M16', 'M17', 'M18', 'C1', 'C3', 'C4', 'C8', 'C9', 'C14', 'C15', 'C16', 'C17', 'C18', 'C19', 'C20', 'C21'] 



for file in files:
    shortname = file[:-9]
    power = mne.time_frequency.read_tfrs(f'PreprocessedTFR/{file}')
    num_epochs = power.shape[0]

    is_old = shortname in old
    is_female = shortname in female
    is_aura = shortname in aura

    for i in range(num_epochs):
        for channel in channel_list:
            label = [f'{shortname}-{i}-{channel}.png']
            if shortname[0] == 'C':
                label += [0, 0, int(is_old), int(is_female)]
            elif shortname[0] == 'M':
                label += [1, int(is_aura), int(is_old), int(is_female)]
            rows.append(label)

# Append all rows to the DataFrame at once
labels = pd.concat([labels, pd.DataFrame(rows, columns=labels.columns)], ignore_index=True)

In [None]:
labels.to_csv("labels.csv")

### AlexNet

In [5]:
class AlexDataset(Dataset):
    def __init__(self, root_dir):
        self.root_dir = root_dir
        self.data_files, self.labels = self._load_data_files_and_labels(root_dir)

    def _load_data_files_and_labels(self, root_dir):
        data_files = []
        labels = []
        
        for root, _, files in os.walk(root_dir):
            for file in files:
                if file.endswith('.pt'):
                    file_path = os.path.join(root, file)
                    data_files.append(file_path)
                    label = 0 if file.startswith('C') else 1
                    #label_float = np.float32(label)                   
                    labels.append(label)
        
        return data_files, labels

    def __len__(self):
        return len(self.data_files)

    def __getitem__(self, idx):
        img_path = self.data_files[idx]
        tensor = torch.load(img_path)
        #tensor = tensor.unsqueeze(0)  # Add an additional dimension
        label = self.labels[idx]
        return tensor, label
    
    
train_dataset = AlexDataset('AlexNet/Train')
valid_dataset = AlexDataset('AlexNet/Valid')
test_dataset = AlexDataset('AlexNet/Test')

from torch.utils.data import Sampler
class SubsetRandomSampler(Sampler):
    def __init__(self, data_source, subset_size):
        self.data_source = data_source
        self.subset_size = subset_size

    def __iter__(self):
        return iter(np.random.choice(len(self.data_source), self.subset_size, replace=False))

    def __len__(self):
        return self.subset_size
    
# Create a custom sampler for the training set
train_sampler = SubsetRandomSampler(train_dataset, subset_size=5000)
valid_sampler = SubsetRandomSampler(valid_dataset, subset_size=5000)
test_sampler = SubsetRandomSampler(test_dataset, subset_size=2500)

# Create DataLoaders with the custom sampler
train_loader = DataLoader(train_dataset, batch_size=128, sampler=train_sampler, pin_memory=True)
valid_loader = DataLoader(valid_dataset, batch_size=128, sampler=valid_sampler, shuffle=False, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=128, sampler=test_sampler, shuffle=False, pin_memory=True)

class CustomNeuralNetBinaryClassifier(NeuralNetClassifier):
    def __init__(self, *args, train_loader=None, valid_loader=None, **kwargs):
        self.train_loader = train_loader
        self.valid_loader = valid_loader
        super().__init__(*args, **kwargs)

    def get_iterator(self, dataset, training=False):
        if training:
            return self.train_loader
        return self.valid_loader

In [6]:
class CustomAlexNet(nn.Module):
    def __init__(self, num_classes=2):
        super(CustomAlexNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 96, kernel_size=11, stride=4, padding=2),
            nn.BatchNorm2d(96),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2))
        self.layer2 = nn.Sequential(
            nn.Conv2d(96, 256, kernel_size=5, stride=1, padding=2),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2))
        self.layer3 = nn.Sequential(
            nn.Conv2d(256, 384, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU())
        self.layer4 = nn.Sequential(
            nn.Conv2d(384, 384, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(384),
            nn.ReLU())
        self.layer5 = nn.Sequential(
            nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=3, stride=2))
        self.fc = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256 * 6 * 6, 4096),  # Update input dimension of first fc layer
            nn.ReLU())
        self.fc1 = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(4096, 4096),
            nn.ReLU())
        self.fc2 = nn.Sequential(
            nn.Linear(4096, num_classes))
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        out = self.layer5(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        out = self.fc1(out)
        out = self.fc2(out)
        return out

In [8]:
from torchsummary import summary
device = 'cuda'

num_classes = 2
num_epochs = 10
batch_size = 64
learning_rate = 0.0001

custommodel = CustomAlexNet(num_classes).to('cuda')


# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(custommodel.parameters(), momentum=0.9, lr=0.1, weight_decay=0.0005)  


# Train the model
total_step = len(train_loader)

summary(custommodel)

Layer (type:depth-idx)                   Param #
├─Sequential: 1-1                        --
|    └─Conv2d: 2-1                       34,944
|    └─BatchNorm2d: 2-2                  192
|    └─ReLU: 2-3                         --
|    └─MaxPool2d: 2-4                    --
├─Sequential: 1-2                        --
|    └─Conv2d: 2-5                       614,656
|    └─BatchNorm2d: 2-6                  512
|    └─ReLU: 2-7                         --
|    └─MaxPool2d: 2-8                    --
├─Sequential: 1-3                        --
|    └─Conv2d: 2-9                       885,120
|    └─BatchNorm2d: 2-10                 768
|    └─ReLU: 2-11                        --
├─Sequential: 1-4                        --
|    └─Conv2d: 2-12                      1,327,488
|    └─BatchNorm2d: 2-13                 768
|    └─ReLU: 2-14                        --
├─Sequential: 1-5                        --
|    └─Conv2d: 2-15                      884,992
|    └─BatchNorm2d: 2-16                 

Layer (type:depth-idx)                   Param #
├─Sequential: 1-1                        --
|    └─Conv2d: 2-1                       34,944
|    └─BatchNorm2d: 2-2                  192
|    └─ReLU: 2-3                         --
|    └─MaxPool2d: 2-4                    --
├─Sequential: 1-2                        --
|    └─Conv2d: 2-5                       614,656
|    └─BatchNorm2d: 2-6                  512
|    └─ReLU: 2-7                         --
|    └─MaxPool2d: 2-8                    --
├─Sequential: 1-3                        --
|    └─Conv2d: 2-9                       885,120
|    └─BatchNorm2d: 2-10                 768
|    └─ReLU: 2-11                        --
├─Sequential: 1-4                        --
|    └─Conv2d: 2-12                      1,327,488
|    └─BatchNorm2d: 2-13                 768
|    └─ReLU: 2-14                        --
├─Sequential: 1-5                        --
|    └─Conv2d: 2-15                      884,992
|    └─BatchNorm2d: 2-16                 

In [9]:
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):  
        # Move tensors to the configured device
        images = images.to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = custommodel(images)
        loss = criterion(outputs, labels)
        
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss.item()))
            
    # Validation
    all_labels = []
    all_preds = []
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in valid_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = custommodel(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
            all_labels.extend(labels.cpu().numpy())
            all_preds.extend(predicted.cpu().numpy())
            del images, labels, outputs
            
        val_accuracy = 100 * correct / total
        val_f1 = f1_score(all_labels, all_preds, average='weighted')          
            
    
        print('Accuracy of the network on the validation images: {:.2f} %, Validation F1-score: {:.4f}'.format(val_accuracy, val_f1))

Epoch [1/10], Step [40/40], Loss: 0.6890
Accuracy of the network on the validation images: 42.94 %, Validation F1-score: 0.2580
Epoch [2/10], Step [40/40], Loss: 0.6760
Accuracy of the network on the validation images: 57.08 %, Validation F1-score: 0.4148
Epoch [3/10], Step [40/40], Loss: 0.6926
Accuracy of the network on the validation images: 41.90 %, Validation F1-score: 0.2474
Epoch [4/10], Step [40/40], Loss: 0.6938
Accuracy of the network on the validation images: 42.80 %, Validation F1-score: 0.2566
Epoch [5/10], Step [40/40], Loss: 0.6946
Accuracy of the network on the validation images: 42.40 %, Validation F1-score: 0.2525
Epoch [6/10], Step [40/40], Loss: 0.7377
Accuracy of the network on the validation images: 56.04 %, Validation F1-score: 0.4025
Epoch [7/10], Step [40/40], Loss: 0.6892
Accuracy of the network on the validation images: 57.12 %, Validation F1-score: 0.4153
Epoch [8/10], Step [40/40], Loss: 0.6937
Accuracy of the network on the validation images: 43.20 %, Vali

In [12]:
all_labels = []
all_preds = []
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = custommodel(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
        all_labels.extend(labels.cpu().numpy())
        all_preds.extend(predicted.cpu().numpy())
        del images, labels
        
        
        
    test_accuracy = 100 * correct / total
    test_f1 = f1_score(all_labels, all_preds, average='weighted')  
    conf_matrix = confusion_matrix(all_labels, all_preds)
    print('Accuracy of the network on the test images: {:.2f} %, Test F1-score: {:.4f}'.format(test_accuracy, test_f1))
    print('Confusion Matrix:\n', conf_matrix)

Accuracy of the network on the test images: 55.04 %, Test F1-score: 0.3908
Confusion Matrix:
 [[1376    0]
 [1124    0]]


In [None]:
outputs

In [11]:
# Plot confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues', xticklabels=['Predicted Negative', 'Predicted Positive'], yticklabels=['Actual Negative', 'Actual Positive'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [15]:
alexnet = models.alexnet()
alexnet.classifier[6] = nn.Linear(4096, 2)  # Change output layer to have 2 outputs (control/migraine)
alexnet.cuda()

n_epochs = 50
anet = CustomNeuralNetBinaryClassifier(
    alexnet,
    train_split = predefined_split(valid_dataset),
    criterion=nn.CrossEntropyLoss,
    optimizer=torch.optim.Adam,
    max_epochs=n_epochs,
    optimizer__lr=0.01,
    optimizer__weight_decay = 0.1,
    train_loader = train_loader,
    valid_loader = valid_loader,
    iterator_train__shuffle=False,
    iterator_train__batch_size = 15,
    device='cuda',
    batch_size=15,
    callbacks=[
        (EpochScoring(scoring='accuracy', name='train_acc', on_train=True, lower_is_better=False)),
        (EpochScoring(scoring='f1', name='train f1-score', on_train=True, lower_is_better=False)),
        (EpochScoring(scoring='f1', name='valid f1-score', on_train=False, lower_is_better=False)),
        #("lr_scheduler", LRScheduler('CosineAnnealingLR', T_max=n_epochs - 1)),
        #('EarlyStopping', EarlyStopping(patience=25, load_best=True)),
        ('ProgressBar', ProgressBar())
    ],
    classes=[0,1]
    )

anet.fit(train_dataset, y=None)

  0%|          | 0/5334 [00:00<?, ?it/s]

  epoch    train f1-score    train_acc    train_loss    valid f1-score    valid_acc    valid_loss      dur
-------  ----------------  -----------  ------------  ----------------  -----------  ------------  -------
      1            [36m0.0846[0m       [32m0.5152[0m        [35m0.6940[0m            [31m0.0000[0m       [94m0.5714[0m        [36m0.6884[0m  66.0885


  0%|          | 0/5334 [00:00<?, ?it/s]

<class '__main__.CustomNeuralNetBinaryClassifier'>[initialized](
  module_=AlexNet(
    (features): Sequential(
      (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
      (1): ReLU(inplace=True)
      (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
      (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
      (4): ReLU(inplace=True)
      (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
      (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (7): ReLU(inplace=True)
      (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (9): ReLU(inplace=True)
      (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
      (11): ReLU(inplace=True)
      (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    )
    (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
    (classifier): Sequent

In [None]:
def predict_proba(loader, model):
    model.eval()
    y_pred = []
    with torch.no_grad():
        for inputs, _ in loader:
            inputs = inputs.cuda()
            outputs = model(inputs)
            prob = F.sigmoid(outputs)  # Apply sigmoid activation
            y_pred.append(prob.cpu().numpy())
    y_pred = np.concatenate(y_pred)
    return y_pred


anet.module_.eval()

# Access ground truth labels from test_dataset
test_labels = np.array(test_dataset.labels)

# Predict probabilities
#y_pred = predict_proba(test_loader, anet.module_)
y_pred = anet.predict(test_loader)

y_pred_classes = (y_pred > 0.5).astype(int)

print(classification_report(test_labels, y_pred_classes))


## EEGNet Models

### EEGNET Processing

In [None]:
# The following code chunk creates numpy arrays for training, validation, and test data. Note that i manually moved each of the epoch files into the respective folders
# EEGnet/Train contains C1-C13 and M1-12; Valid contains C14-C17 and M13, M14 and M16; Test contains C18-C21 and M15, M17 and M18
# The switch in the migraineur files has been made to make sure that both validation and test have at least 1 Aura-migraineur

# Reading the first file in each folder
train_epoch = mne.read_epochs(f"EEGNet/Train/C1-epo.fif") 
valid_epoch = mne.read_epochs(f"EEGNet/Valid/C14-epo.fif")
test_epoch = mne.read_epochs(f"EEGNet/Test/C18-epo.fif")

# Turning the epoch data into numpy array (this is done outside the loop to ensure correct dimensions, making appending easier)
train_file = train_epoch.get_data()*1000
valid_file = valid_epoch.get_data()*1000
test_file =  test_epoch.get_data()*1000

labels = [0] * train_file.shape[0] # since the first file for each of the loops is a control patient (C1, C14 and C18), I add a zero for each epoch of the file, since they are excluded in the loop. The same is done for validation and test further down

# This loop reads each file in the EEGNet/Train folder except the first, turns the epoched data into numpy data and adds it to the initially created numpy files. Additionally, labels are created and added to a label file
for file in os.listdir("EEGNet/Train")[1:]:
    epochs = mne.read_epochs(f"EEGNet/Train/{file}")
    temp = epochs.get_data()*1000
    train_file = np.append(train_file, temp, axis=0)
    
    label = 0 if file.startswith('C') else 1
    labels.extend([label] * temp.shape[0])

train_labels = np.array(labels)
np.save("EEGNet/training_labels", train_labels) # Saving the labels as their own file
labels = [0] * valid_file.shape[0]
    
for file in os.listdir("EEGNet/Valid")[1:]:
    epochs = mne.read_epochs(f"EEGNet/Valid/{file}")
    temp = epochs.get_data()*1000
    valid_file = np.append(valid_file, temp, axis=0)
    
    label = 0 if file.startswith('C') else 1
    labels.extend([label] * temp.shape[0])
    
valid_labels = np.array(labels)
np.save("EEGNet/valid_labels", valid_labels)
labels = [0] * test_file.shape[0]

for file in os.listdir("EEGNet/Test")[1:]:
    epochs = mne.read_epochs(f"EEGNet/Test/{file}")
    temp = epochs.get_data()*1000
    test_file = np.append(test_file, temp, axis=0)
    
    label = 0 if file.startswith('C') else 1
    labels.extend([label] * temp.shape[0])    

test_labels = np.array(labels)
np.save("EEGNet/test_labels", test_labels)

#Ensuring that each epoch / trial has an associated label
print(f'Training data shapes: {train_file.shape}, {valid_file.shape}, {test_file.shape}')
print(f'Labels shapes: {train_labels.shape}, {valid_labels.shape}, {test_labels.shape}')

np.save("EEGNet/train.npy", train_file)
np.save("EEGNet/valid.npy", valid_file)
np.save("EEGNet/test.npy", test_file)


In [None]:
# This code brings the code into the expected format for the used Pytorch implementation of EEGNet (--> Trials, Model-Channels, EEG-Channels, Timepoints)
X_train      = np.load("EEGNet/train.npy")
X_train = np.expand_dims(X_train, axis=-1)
X_train = np.transpose(X_train, (0, 3, 1, 2))
Y_train      = np.load("EEGNet/training_labels.npy")

X_valid   = np.load("EEGNet/valid.npy")
X_valid = np.expand_dims(X_valid, axis=-1)
X_valid = np.transpose(X_valid, (0, 3, 1, 2))
Y_valid   = np.load("EEGNet/valid_labels.npy")

X_test       = np.load("EEGNet/test.npy")
X_test = np.expand_dims(X_test, axis=-1)
X_test = np.transpose(X_test, (0, 3, 1, 2))
Y_test       = np.load("EEGNet/test_labels.npy")

# convert labels to one-hot encodings.
Y_train      = to_categorical(Y_train)
Y_valid   = to_categorical(Y_valid)
Y_test       = to_categorical(Y_test)
   
print('X_train shape:', X_train.shape)
print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')

In [None]:
tensor_xtrain = torch.Tensor(X_train)
tensor_ytrain = torch.Tensor(Y_train)
tensor_xvalid = torch.Tensor(X_valid)
tensor_yvalid = torch.Tensor(Y_valid)
tensor_xtest = torch.Tensor(X_test)
tensor_ytest = torch.Tensor(Y_test)

tensor_xtrain = tensor_xtrain.float()
tensor_xvalid = tensor_xvalid.float()
tensor_xtest = tensor_xtest.float()

# Calculate mean and standard deviation along the trials/EEG-epochs axis (axis=0)
mean = torch.mean(tensor_xtrain, dim=(0, 2, 3), keepdim=True)
std = torch.std(tensor_xtrain, dim=(0, 2, 3), keepdim=True)

vmean = torch.mean(tensor_xvalid, dim=(0, 2, 3), keepdim=True)
vstd = torch.std(tensor_xvalid, dim=(0, 2, 3), keepdim=True)

tmean = torch.mean(tensor_xtest, dim=(0, 2, 3), keepdim=True)
tstd = torch.std(tensor_xtest, dim=(0, 2, 3), keepdim=True)

# Standardize the data
standardized_tensor_xtrain = (tensor_xtrain - mean) / std
standardized_tensor_xvalid = (tensor_xvalid - vmean) / vstd
standardized_tensor_xtest = (tensor_xtest - tmean) / tstd


In [None]:
train_dataset = TensorDataset(standardized_tensor_xtrain, tensor_ytrain)
#train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
torch.save(train_dataset, "EEGNet/train_dataset_normalized.pt")

valid_dataset = TensorDataset(standardized_tensor_xvalid, tensor_yvalid)
#valid_loader =  DataLoader(valid_dataset, batch_size=16, shuffle=False)
torch.save(valid_dataset, "EEGNet/valid_dataset_normalized.pt")

test_dataset = TensorDataset(standardized_tensor_xtest, tensor_ytest)
#test_loader =  DataLoader(test_dataset, batch_size=128, shuffle=False)
torch.save(test_dataset, "EEGNet/test_dataset_normalized.pt")

In [None]:


tensor_xtrain = torch.Tensor(X_train)

tensor_ytrain = torch.Tensor(Y_train)
tensor_xvalid = torch.Tensor(X_valid)
tensor_yvalid = torch.Tensor(Y_valid)
tensor_xtest = torch.Tensor(X_test)
tensor_ytest = torch.Tensor(Y_test)

train_dataset = TensorDataset(tensor_xtrain, tensor_ytrain)
#train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
torch.save(train_dataset, "EEGNet/train_dataset_normalized.pt")

valid_dataset = TensorDataset(tensor_xvalid, tensor_yvalid)
#valid_loader =  DataLoader(valid_dataset, batch_size=16, shuffle=False)
torch.save(valid_dataset, "EEGNet/valid_dataset_normalized.pt")

test_dataset = TensorDataset(tensor_xtest, tensor_ytest)
#test_loader =  DataLoader(test_dataset, batch_size=128, shuffle=False)
torch.save(test_dataset, "EEGNet/test_dataset_normalized.pt")


In [None]:
# The following code turns each file into tensors, which are then used to create datasets and dataloaders for the models
tensor_xtrain = torch.Tensor(X_train)
tensor_ytrain = torch.Tensor(Y_train)
tensor_xvalid = torch.Tensor(X_valid)
tensor_yvalid = torch.Tensor(Y_valid)
tensor_xtest = torch.Tensor(X_test)
tensor_ytest = torch.Tensor(Y_test)

train_dataset = TensorDataset(tensor_xtrain, tensor_ytrain)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
torch.save(train_dataset, "EEGNet/train_dataset.pt")
torch.save(train_loader, "EEGNet/train_loader.pt")

valid_dataset = TensorDataset(tensor_xvalid, tensor_yvalid)
valid_loader =  DataLoader(valid_dataset, batch_size=32, shuffle=False)
torch.save(valid_dataset, "EEGNet/valid_dataset.pt")
torch.save(valid_loader, "EEGNet/valid_loader.pt")

test_dataset = TensorDataset(tensor_xtest, tensor_ytest)
test_loader =  DataLoader(test_dataset, batch_size=32, shuffle=True)
torch.save(test_dataset, "EEGNet/test_dataset.pt")
torch.save(test_loader, "EEGNet/test_loader.pt")

### EEGNet via TF

In [2]:
X_train      = np.load("EEGNet/train.npy")
X_train = np.expand_dims(X_train, axis=-1)
Y_train      = np.load("EEGNet/training_labels.npy")

X_valid   = np.load("EEGNet/valid.npy")
X_valid = np.expand_dims(X_valid, axis=-1)
Y_valid   = np.load("EEGNet/valid_labels.npy")

X_test       = np.load("EEGNet/test.npy")
X_test = np.expand_dims(X_test, axis=-1)
Y_test       = np.load("EEGNet/test_labels.npy")

kernels, chans, samples = 1, 144, 1024

# convert labels to one-hot encodings.
Y_train      = to_categorical(Y_train)
Y_valid      = to_categorical(Y_valid)
Y_test       = to_categorical(Y_test)
   
print('X_train shape:', X_train.shape)
print(X_train.shape[0], 'train samples')
print(X_test.shape[0], 'test samples')

FileNotFoundError: [Errno 2] No such file or directory: 'EEGNet/training_labels.npy'

In [None]:
# configure the EEGNet-8,2,16 model with kernel length of 32 samples (other 
# model configurations may do better, but this is a good starting point)
model = EEGNet(nb_classes = 2, Chans = chans, Samples = samples, 
               dropoutRate = 0.5, kernLength = 32, F1 = 8, D = 2, F2 = 16, 
               dropoutType = 'Dropout')

# compile the model and set the optimizers
model.compile(loss='binary_crossentropy', optimizer='adam', 
              metrics = ['accuracy'])

# count number of parameters in the model
#numParams    = model.count_params()    

# set a valid path for your system to record model checkpoints
checkpointer = ModelCheckpoint(filepath='/tmp/checkpoint.keras', verbose=1,
                               save_best_only=True)

model.summary()

In [None]:
fittedModel = model.fit(X_train, Y_train, batch_size = 16, epochs = 11, 
                        verbose = 1, validation_data=(X_valid, Y_valid),
                        callbacks=[checkpointer])

# load optimal weights
model.load_weights('/tmp/checkpoint.keras')

#Evaluate the model
loss, accuracy = model.evaluate(X_test, Y_test)
print(f'Test Loss: {loss:.4f}')
print(f'Test Accuracy: {accuracy:.4f}')
 
#Save the model
model.save('eegnet_tf.keras')

### EEGNet in Pytorch (via https://github.com/Amir-Hofo/EEGNet_Pytorch)

In [None]:
# Backup version with Sigmoid function
class PytorchEEG(nn.Module): 
    def __init__(self, nb_classes=2, nb_channels=144, nb_samples=1024, F1=8, D=2, freq=512, dropout=0.25, device='cuda'):
        super().__init__()
        #Parameters
        self.nb_samples = nb_samples
        self.nb_classes= nb_classes
        self.nb_channels = nb_channels
        self.nb_inputs = 1
        self.F1 = F1
        self.D = D
        self.F2 = F1*D
        self.freq = freq
        self.dropout = dropout
        self.device = torch.device(device)
        
        self.kernel_size_1 = (1,round(self.freq/2)) 
        self.kernel_size_2 = (nb_channels, 1)
        self.kernel_size_3 = (1, round(self.freq/8))
        self.kernel_size_4 = (1, 1)
        
        self.kernel_avgpool_1 = (1,4)
        self.kernel_avgpool_2= (1,8)
        
        self.kernel_padding_1 = (int(round((self.kernel_size_1[0]-1)/2)) , (int(round((self.kernel_size_1[1]-1)/2)))-1)
        self.kernel_padding_3 = (int(round((self.kernel_size_3[0]-1)/2)) , (int(round((self.kernel_size_3[1]-1)/2))))
        
        # layer 1
        self.conv2d = nn.Conv2d(self.nb_inputs, self.F1, self.kernel_size_1, padding=self.kernel_padding_1)
        self.Batch_normalization_1 = nn.BatchNorm2d(self.F1)
        # layer 2
        self.Depthwise_conv2D = nn.Conv2d(self.F1, self.D*self.F1, self.kernel_size_2, groups= self.F1)
        self.Batch_normalization_2 = nn.BatchNorm2d(self.D*self.F1)
        self.Elu = nn.ELU()
        self.Average_pooling2D_1 = nn.AvgPool2d(self.kernel_avgpool_1)
        self.Dropout = nn.Dropout2d(self.dropout)
        # layer 3
        self.Separable_conv2D_depth = nn.Conv2d(self.D*self.F1, self.D*self.F1, self.kernel_size_3,
                                                padding=self.kernel_padding_3, groups= self.D*self.F1)
        self.Separable_conv2D_point = nn.Conv2d(self.D*self.F1, self.F2, self.kernel_size_4)
        self.Batch_normalization_3 = nn.BatchNorm2d(self.F2)
        self.Average_pooling2D_2 = nn.AvgPool2d(self.kernel_avgpool_2)
        # layer 4
        self.Flatten = nn.Flatten()
        self.Dense = nn.Linear(self.F2*round(self.nb_samples/32), self.nb_classes)
        
        if self.nb_classes == 2:
            self.Sigmoid = nn.Sigmoid()
        else:
            self.Softmax = nn.Softmax(dim= 1)
        
    def forward(self, x):
        # layer 1
        y = self.Batch_normalization_1(self.conv2d(x)) #.relu()
        # layer 2
        y = self.Batch_normalization_2(self.Depthwise_conv2D(y))
        y = self.Elu(y)
        y = self.Dropout(self.Average_pooling2D_1(y))
        # layer 3
        y = self.Separable_conv2D_depth(y)
        y = self.Batch_normalization_3(self.Separable_conv2D_point(y))
        y = self.Elu(y)
        y = self.Dropout(self.Average_pooling2D_2(y))
        # layer 4
        y = self.Flatten(y)
        y = self.Dense(y)
        
        if self.nb_classes == 2:
            y = self.Sigmoid(y)
        else:
            y = self.Softmax(y)
        return y
    
    def max_norm(self):
        with torch.no_grad():
            for param in self.parameters():
                if len(param.shape) > 1:
                    param.clamp_(-1.0, 1.0)

In [5]:
device= 'cuda'
class PytorchEEG(nn.Module): 
    def __init__(self, nb_classes=2, nb_channels=144, nb_samples=1024, F1=8, D=2, freq=512, dropout=0.25, device='cuda'):
        super().__init__()
        #Parameters
        self.nb_samples = nb_samples
        self.nb_classes= nb_classes
        self.nb_channels = nb_channels
        self.nb_inputs = 1
        self.F1 = F1
        self.D = D
        self.F2 = F1*D
        self.freq = freq
        self.dropout = dropout
        self.device = torch.device(device)
        
        self.kernel_size_1 = (1,round(self.freq/2)) 
        self.kernel_size_2 = (nb_channels, 1)
        self.kernel_size_3 = (1, round(self.freq/8))
        self.kernel_size_4 = (1, 1)
        
        self.kernel_avgpool_1 = (1,4)
        self.kernel_avgpool_2= (1,8)
        
        self.kernel_padding_1 = (int(round((self.kernel_size_1[0]-1)/2)) , (int(round((self.kernel_size_1[1]-1)/2)))-1)
        self.kernel_padding_3 = (int(round((self.kernel_size_3[0]-1)/2)) , (int(round((self.kernel_size_3[1]-1)/2))))
        
        # layer 1
        self.conv2d = nn.Conv2d(self.nb_inputs, self.F1, self.kernel_size_1, padding=self.kernel_padding_1)
        self.Batch_normalization_1 = nn.BatchNorm2d(self.F1)
        # layer 2
        self.Depthwise_conv2D = nn.Conv2d(self.F1, self.D*self.F1, self.kernel_size_2, groups= self.F1)
        self.Batch_normalization_2 = nn.BatchNorm2d(self.D*self.F1)
        self.Elu = nn.ELU()
        self.Average_pooling2D_1 = nn.AvgPool2d(self.kernel_avgpool_1)
        self.Dropout = nn.Dropout2d(self.dropout)
        # layer 3
        self.Separable_conv2D_depth = nn.Conv2d(self.D*self.F1, self.D*self.F1, self.kernel_size_3,
                                                padding=self.kernel_padding_3, groups= self.D*self.F1)
        self.Separable_conv2D_point = nn.Conv2d(self.D*self.F1, self.F2, self.kernel_size_4)
        self.Batch_normalization_3 = nn.BatchNorm2d(self.F2)
        self.Average_pooling2D_2 = nn.AvgPool2d(self.kernel_avgpool_2)
        # layer 4
        self.Flatten = nn.Flatten()
        self.Dense = nn.Linear(self.F2*round(self.nb_samples/32), self.nb_classes)
        
    def forward(self, x):
        # layer 1
        y = self.Batch_normalization_1(self.conv2d(x)) #.relu()
        # layer 2
        y = self.Batch_normalization_2(self.Depthwise_conv2D(y))
        y = self.Elu(y)
        y = self.Dropout(self.Average_pooling2D_1(y))
        # layer 3
        y = self.Separable_conv2D_depth(y)
        y = self.Batch_normalization_3(self.Separable_conv2D_point(y))
        y = self.Elu(y)
        y = self.Dropout(self.Average_pooling2D_2(y))
        # layer 4
        y = self.Flatten(y)
        y = self.Dense(y)

        return y
    
    def max_norm(self):
        with torch.no_grad():
            for param in self.parameters():
                if len(param.shape) > 1:
                    param.clamp_(-1.0, 1.0)
                    
class AverageMeter(object):
    """Computes and stores the average and current value"""
    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

model = PytorchEEG(nb_classes=2, nb_samples=512, nb_channels=143, F1 = 64, D = 2, dropout=0.4)
model.cuda()

PytorchEEG(
  (conv2d): Conv2d(1, 64, kernel_size=(1, 256), stride=(1, 1), padding=(0, 127))
  (Batch_normalization_1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (Depthwise_conv2D): Conv2d(64, 128, kernel_size=(143, 1), stride=(1, 1), groups=64)
  (Batch_normalization_2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (Elu): ELU(alpha=1.0)
  (Average_pooling2D_1): AvgPool2d(kernel_size=(1, 4), stride=(1, 4), padding=0)
  (Dropout): Dropout2d(p=0.4, inplace=False)
  (Separable_conv2D_depth): Conv2d(128, 128, kernel_size=(1, 64), stride=(1, 1), padding=(0, 32), groups=128)
  (Separable_conv2D_point): Conv2d(128, 128, kernel_size=(1, 1), stride=(1, 1))
  (Batch_normalization_3): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (Average_pooling2D_2): AvgPool2d(kernel_size=(1, 8), stride=(1, 8), padding=0)
  (Flatten): Flatten(start_dim=1, end_dim=-1)
  (Dense): Linear(in_features=2

In [6]:
def train_one_epoch(model, train_loader, valid_loader, criterion, optimizer):
    model.train(True)
    loss_train = AverageMeter()
    acc_train = Accuracy(task='binary').to(device)
    
    loss_val = AverageMeter()
    acc_val = Accuracy(task='binary').to(device)
    
    train_loader = tqdm(train_loader, desc="Training", leave=False) #Gives progress bar
    
    for i, (inputs, targets) in enumerate(train_loader):
        inputs = inputs.to(device)
        targets = targets.to(device)
        
        optimizer.zero_grad()
        
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        nn.utils.clip_grad_norm_(model.parameters(), 1)
        optimizer.step()
        model.max_norm()

        loss_train.update(loss.item())
        acc_train(outputs, targets.int())
        train_loader.set_postfix({'loss': loss_train.avg, 'accuracy': acc_train.compute().item()}, refresh=True)
    model.eval()
    with torch.no_grad():
        for i, (vinputs, vtargets) in enumerate(valid_loader):
            vinputs = vinputs.to(device)
            vtargets = vtargets.to(device)
            
            voutputs = model(vinputs)
            vloss = criterion(voutputs, vtargets)
            loss_val.update(vloss.item())
            acc_val(voutputs, vtargets.int())
    
    return model, loss_train.avg, acc_train.compute().item(), loss_val.avg, acc_val.compute().item()

In [7]:
X_train = np.load('PreprocessedEpochData/X_train.npy')
X_val = np.load('PreprocessedEpochData/X_val.npy')
X_test = np.load('PreprocessedEpochData/X_test.npy')

y_train = np.load('PreprocessedEpochData/y_train.npy')
y_val = np.load('PreprocessedEpochData/y_val.npy')
y_test = np.load('PreprocessedEpochData/y_test.npy')

In [8]:
# Assuming X_train and y_train are numpy arrays
X_train_reshaped = np.expand_dims(X_train, axis=1)  # Add a new axis at index 1
X_train_tensor = torch.tensor(X_train_reshaped, dtype=torch.float32)  # Convert numpy array to tensor
y_train_cat = to_categorical(y_train)
y_train_tensor = torch.tensor(y_train_cat, dtype=torch.float32)  # Convert numpy array to tensor

X_val_reshaped = np.expand_dims(X_val, axis=1)  # Add a new axis at index 1
X_val_tensor = torch.tensor(X_val_reshaped, dtype=torch.float32)  # Convert numpy array to tensor
y_val_cat = to_categorical(y_val)
y_val_tensor = torch.tensor(y_val_cat, dtype=torch.float32)  # Convert numpy array to tensor

X_test_reshaped = np.expand_dims(X_test, axis=1)  # Add a new axis at index 1
X_test_tensor = torch.tensor(X_test_reshaped, dtype=torch.float32)  # Convert numpy array to tensor
y_test_cat = to_categorical(y_test)
y_test_tensor = torch.tensor(y_test_cat, dtype=torch.float32)  # Convert numpy array to tensor

# Create a TensorDataset
train_data = TensorDataset(X_train_tensor, y_train_tensor)
val_data = TensorDataset(X_val_tensor, y_val_tensor)
test_data = TensorDataset(X_test_tensor, y_test_tensor)


In [9]:
# train_data = torch.load('EEGNet/Correct-data/train_data_bce_z.pt') #or 'EEGNet/Correct-data/train_dataset_BCE.pt' when working with BCELoss
# val_data = torch.load('EEGNet/Correct-data/val_data_bce_z.pt')
# test_data = torch.load('EEGNet/Correct-data/test_data_bce_z.pt') # or 'EEGNet/Correct-data/test_dataset_BCE.pt' when working with BCELoss

# Create dataloaders for training and validation
train_loader = DataLoader(train_data, batch_size=16, shuffle=True)
val_loader = DataLoader(val_data, batch_size=16, shuffle=False)
test_loader = DataLoader(test_data, batch_size=128, shuffle=False)

In [10]:
# Optimizer
criterion = nn.BCEWithLogitsLoss().to(device)

Adamoptimizer = optim.Adam(model.parameters(), lr= 0.00001, weight_decay=1e-6)
Adamax = optim.Adamax(model.parameters(), lr= 0.000006)
NAdamoptimizer = optim.NAdam(model.parameters(), lr= 0.01)
RMSoptimizer= optim.RMSprop(model.parameters(), lr= 0.00001, weight_decay=1e-6)
SGDoptimizer = optim.SGD(model.parameters(), lr= 0.001, momentum=0.9)

# Data collection
loss_train_hist = []
acc_train_hist = []
loss_val_hist = []
acc_val_hist = []

for epoch in range(301): #choose number of epochs in range
    model, loss_train, acc_train, loss_val, acc_val = train_one_epoch(model, train_loader, val_loader, criterion, optimizer=Adamax)
  
    loss_train_hist.append(loss_train)
    acc_train_hist.append(acc_train)

    loss_val_hist.append(loss_val)
    acc_val_hist.append(acc_val)    

    print(f'epoch {epoch}:')
    print(f' Training Loss = {loss_train:.4}, Training Accuracy = {int(acc_train*100)}%')
    print(f' Validation Loss = {loss_val:.4}, Validation Accuracy = {int(acc_val*100)}% \n')    



epoch 0:
 Training Loss = 0.7147, Training Accuracy = 49%
 Validation Loss = 0.7868, Validation Accuracy = 49% 





epoch 1:
 Training Loss = 0.7068, Training Accuracy = 50%
 Validation Loss = 0.7494, Validation Accuracy = 51% 





epoch 2:
 Training Loss = 0.6892, Training Accuracy = 53%
 Validation Loss = 0.7283, Validation Accuracy = 56% 





epoch 3:
 Training Loss = 0.6814, Training Accuracy = 54%
 Validation Loss = 0.769, Validation Accuracy = 57% 





epoch 4:
 Training Loss = 0.6693, Training Accuracy = 56%
 Validation Loss = 0.6929, Validation Accuracy = 56% 





epoch 5:
 Training Loss = 0.6631, Training Accuracy = 57%
 Validation Loss = 0.7833, Validation Accuracy = 55% 





epoch 6:
 Training Loss = 0.6592, Training Accuracy = 58%
 Validation Loss = 0.7429, Validation Accuracy = 56% 





epoch 7:
 Training Loss = 0.6556, Training Accuracy = 58%
 Validation Loss = 0.9093, Validation Accuracy = 56% 





epoch 8:
 Training Loss = 0.6475, Training Accuracy = 59%
 Validation Loss = 0.9058, Validation Accuracy = 55% 





epoch 9:
 Training Loss = 0.6465, Training Accuracy = 59%
 Validation Loss = 0.8002, Validation Accuracy = 53% 





epoch 10:
 Training Loss = 0.6374, Training Accuracy = 61%
 Validation Loss = 0.9386, Validation Accuracy = 55% 





epoch 11:
 Training Loss = 0.6381, Training Accuracy = 60%
 Validation Loss = 0.9186, Validation Accuracy = 54% 





epoch 12:
 Training Loss = 0.6339, Training Accuracy = 61%
 Validation Loss = 0.9372, Validation Accuracy = 54% 





epoch 13:
 Training Loss = 0.632, Training Accuracy = 61%
 Validation Loss = 0.7125, Validation Accuracy = 56% 





epoch 14:
 Training Loss = 0.6239, Training Accuracy = 63%
 Validation Loss = 1.036, Validation Accuracy = 55% 





epoch 15:
 Training Loss = 0.6249, Training Accuracy = 63%
 Validation Loss = 0.9536, Validation Accuracy = 52% 





epoch 16:
 Training Loss = 0.6233, Training Accuracy = 62%
 Validation Loss = 1.204, Validation Accuracy = 57% 





epoch 17:
 Training Loss = 0.6166, Training Accuracy = 63%
 Validation Loss = 1.095, Validation Accuracy = 55% 





epoch 18:
 Training Loss = 0.6097, Training Accuracy = 64%
 Validation Loss = 1.103, Validation Accuracy = 54% 





: 

In [None]:
accuracy = Accuracy(task="binary").to('cuda')
all_pred = []
all_labels = []
all_losses = []
for testdata, labels in test_loader:
    testdata = testdata.cuda()
    labels = labels.cuda()

    with torch.no_grad():
        model.eval()
        y_pred = model(testdata)        
        y_pred_binary = (y_pred > 0.5).float()
        
        all_pred.append(y_pred_binary)
        all_labels.append(labels)

all_pred = torch.cat(all_pred)
all_labels = torch.cat(all_labels)

accuracy_score = accuracy(all_pred, all_labels)
print(f'Overall Accuracy: {accuracy_score.item()}')

In [None]:
plt.clf()

figure, axis = plt.subplots(2, 2)
# For Train Accuracy 
axis[0, 0].plot(range(48), acc_train_hist) 
axis[0, 0].set_title("Training accuracy") 
  
# For Cosine Function 
axis[0, 1].plot(range(48), acc_val_hist) 
axis[0, 1].set_title("Validation accuracy") 
  
# For Tangent Function 
axis[1, 0].plot(range(48), loss_train_hist) 
axis[1, 0].set_title("Training loss") 
  
# For Tanh Function 
axis[1, 1].plot(range(48), loss_val_hist) 
axis[1, 1].set_title("Validation loss") 

plt.show()

In [None]:
plt.plot(range(51), loss_val_hist, 'b-', label='Validation Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
plt.plot(range(51), acc_val_hist, 'b-', label='Validation Accuracy')
plt.xlabel('Epoch')
plt.ylabel('Acc')
plt.grid(True)
plt.legend()
plt.show()

In [None]:
import torch

# Set your model to evaluation mode
model.eval()

# Initialize empty lists to store predictions
all_predicted_probs = []
all_true_labels = []

# Iterate over batches in the test dataloader
for inputs, labels in test_loader:
    inputs = inputs.cuda()
    labels = labels.cuda()
    # Forward pass: compute predicted probabilities
    with torch.no_grad():
        predicted_probs = model(inputs)
    
    # Append predicted probabilities and true labels to the lists
    all_predicted_probs.append(predicted_probs)
    all_true_labels.append(labels)

# Concatenate predictions and true labels from all batches
predicted_labels = torch.cat(all_predicted_probs, dim=0)
true_labels = torch.cat(all_true_labels, dim=0)

true_labels = true_labels.cpu().numpy()
predicted_labels = predicted_labels.cpu().numpy()

# Now you can calculate metrics using all_predicted_probs and all_true_labels
accuracy = accuracy_score(true_labels, predicted_labels)
precision = precision_score(true_labels, predicted_labels)
recall = recall_score(true_labels, predicted_labels)
f1 = f1_score(true_labels, predicted_labels)
auc_roc = roc_auc_score(true_labels, predicted_probs[:, 1])
fpr, tpr, thresholds = roc_curve(true_labels, predicted_probs[:, 1])
conf_matrix = confusion_matrix(true_labels, predicted_labels)

In [None]:
import optuna

def objective(trial):
    # Suggest hyperparameters
    F1 = trial.suggest_int('F1', 4, 16)
    #D = trial.suggest_int('D', 2, 8)
    #dropout = trial.suggest_float('dropout', 0.1, 0.5)
    lr = trial.suggest_loguniform('lr', 0.0005, 0.001)
    
    # Initialize model with suggested hyperparameters
    model = PytorchEEG(nb_classes=2, nb_samples=1024, nb_channels=141, F1=F1, D=2, dropout=0.5)
    model.cuda()
    
    # Define loss and optimizer
    loss_fn = nn.BCELoss().to(device)
    optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=1e-6)
    
    # Training loop for a few epochs (reduce number of epochs for faster optimization)
    na = "NA"
    print(f"Now: F1:{F1}, D:{na}, dropout:{na}, lr:{lr}")
    num_epochs = 10
    for epoch in range(num_epochs):
        model, loss_train, acc_train, loss_val, acc_val = train_one_epoch(model, train_loader, val_loader, loss_fn, optimizer)
        print(f'epoch {epoch}:')
        print(f' Training Loss = {loss_train:.4}, Training Accuracy = {int(acc_train*100)}%')
        print(f' Validation Loss = {loss_val:.4}, Validation Accuracy = {int(acc_val*100)}% \n') 
    # Return the validation loss for Optuna to minimize
    return loss_val

# Create an Optuna study and optimize
study = optuna.create_study(direction='minimize')
study.optimize(objective, n_trials=50)

# Get the best hyperparameters
best_params = study.best_params
best_loss = study.best_value
print("Best hyperparameters: ", best_params)
print("Best Validation Loss:", best_loss)

### EEGNet via torchEEG

In [2]:
train_data = torch.load('EEGNet/Correct-data/train_data.pt')
val_data = torch.load('EEGNet/Correct-data/val_data.pt')
test_data = torch.load('EEGNet/Correct-data/test_data.pt')

In [3]:
# Create dataloaders for training and validation
train_loader = DataLoader(train_data, batch_size=32, shuffle=True)
val_loader = DataLoader(val_data, batch_size=32, shuffle=False)
test_loader = DataLoader(test_data, batch_size=128, shuffle=False)

In [None]:
model2 = torchEEG(chunk_size=1024, num_electrodes=141, num_classes=2, F1=64, D=2, F2=128, dropout=0.4)
model2.cuda()

trainer = ClassifierTrainer(model=model2,
                            num_classes=2,
                            lr=0.00001,
                            weight_decay=1e-6,
                            accelerator="gpu",
                            metrics=['accuracy', 'f1score'])


trainer.fit(train_loader=train_loader,
            val_loader=val_loader,
            max_epochs=300,
            callbacks=[pl.callbacks.ModelCheckpoint(save_last=True)],
            enable_progress_bar=True,
            enable_model_summary=True,
            limit_val_batches=0.0)

score = trainer.test(test_loader,
                     enable_progress_bar=True,
                     enable_model_summary=True)[0]
print(f'Test accuracy for F1=8, D=2, F2=16, dropout=0.3: {score["test_accuracy"]:.4f}')

### EEGNet via Braindecode

#### Preprocessing

In [None]:
# Lists to store data and labels
X_train = np.empty((0,143,512))  # EEG training data
y_train = np.empty((0,))  # Training labels

X_val = np.empty((0,143,512))
y_val = np.empty((0,))

X_test = np.empty((0,143,512))  # EEG testing data
y_test = np.empty((0,))   # Testing labels


# Loop through each subject in the training set
for file in os.listdir("PreprocessedEpochData/Train"):
    subject_file = np.load(f'PreprocessedEpochData/Train/{file}')
    print(f'Now {file} in train')
    
    # Determine label based on whether the subject is a control or migraineur
    if file[0] == 'C':
        label = 0  # Control
    elif file[0] == 'M':
        label = 1  # Migraineur

    # Append subject data and labels to the overall training sets
    X_train = np.append(X_train, subject_file, axis=0)
    y_train = np.append(y_train, [label]*len(subject_file))

# Loop through each subject in the training set
for file in os.listdir("PreprocessedEpochData/Valid"):
    subject_file = np.load(f'PreprocessedEpochData/Valid/{file}')
    print(f'Now {file} in validation')
    
    # Determine label based on whether the subject is a control or migraineur
    if file[0] == 'C':
        label = 0  # Control
    elif file[0] == 'M':
        label = 1  # Migraineur

    # Append subject data and labels to the overall training sets
    X_val = np.append(X_val, subject_file, axis=0)
    y_val = np.append(y_val, [label]*len(subject_file))

# Loop through each subject in the testing set
for file in os.listdir("PreprocessedEpochData/Test"):
    subject_file = np.load(f'PreprocessedEpochData/Test/{file}')
    print(f'Now {file} in test')
    
    # Determine label based on whether the subject is a control or migraineur
    if file[0] == 'C':
        label = 0  # Control
    else:
        label = 1  # Migraineur

    # Append subject data and labels to the overall testing sets
    X_test = np.append(X_test, subject_file, axis=0)
    y_test = np.append(y_test, [label]*len(subject_file))

y_train = y_train.astype(np.int32)
y_val = y_val.astype(np.int32)
y_test = y_test.astype(np.int32)

np.save('PreprocessedEpochData/X_train.npy', X_train)
np.save('PreprocessedEpochData/X_val.npy', X_val)
np.save('PreprocessedEpochData/X_test.npy', X_test)
np.save('PreprocessedEpochData/y_train.npy', y_train)
np.save('PreprocessedEpochData/y_val.npy', y_val)
np.save('PreprocessedEpochData/y_test.npy', y_test)

In [None]:
from sklearn.model_selection import train_test_split
#! (VERY IMPORTANT STEP)
#Standardizing files in training, validation and test set 
def standardizete(f):
    epoch = mne.read_epochs(f'EEGNet/Test/{f}')
    epochdata = epoch.pick(['eeg'])
    a = epochdata.get_data()
    labels = a.shape[0]
    scale = mne.decoding.Scaler(info=epochdata.info, scalings='mean')
    if f[0] == 'C':
        epo_std = scale.fit_transform(a, y=np.zeros(labels, dtype=np.int32))
    elif f[0] == 'M':
        epo_std = scale.fit_transform(a, y=np.ones(labels, dtype=np.int32))
    return epo_std

def standardizetr(f):
    epoch = mne.read_epochs(f'EEGNet/Train/{f}')
    epochdata = epoch.pick(['eeg'])
    a = epochdata.get_data()
    labels = a.shape[0]
    scale = mne.decoding.Scaler(info=epochdata.info, scalings='mean')
    if f[0] == 'C':
        epo_std = scale.fit_transform(a, y=np.zeros(labels, dtype=np.int32))
    elif f[0] == 'M':
        epo_std = scale.fit_transform(a, y=np.ones(labels, dtype=np.int32))
    return epo_std

def standardizeval(f):
    epoch = mne.read_epochs(f'EEGNet/Valid/{f}')
    epochdata = epoch.pick(['eeg'])
    a = epochdata.get_data()
    labels = a.shape[0]
    scale = mne.decoding.Scaler(info=epochdata.info, scalings='mean')
    if f[0] == 'C':
        epo_std = scale.fit_transform(a, y=np.zeros(labels, dtype=np.int32))
    elif f[0] == 'M':
        epo_std = scale.fit_transform(a, y=np.ones(labels, dtype=np.int32))
    return epo_std

# Lists to store data and labels
X_train = np.empty((0,141,1024))  # EEG training data
y_train = np.empty((0,))  # Training labels

X_val = np.empty((0,141,1024))
y_val = np.empty((0,))

X_test = np.empty((0,141,1024))  # EEG testing data
y_test = np.empty((0,))   # Testing labels

# Loop through each subject in the training set
for subject_file in os.listdir("EEGNet/Train"):
    print(f'Now {subject_file} in train')
    # Standardize data
    epo_std = standardizetr(subject_file)
    
    # Determine label based on whether the subject is a control or migraineur
    if subject_file[0] == 'C':
        label = 0  # Control
    elif subject_file[0] == 'M':
        label = 1  # Migraineur

    # Append subject data and labels to the overall training sets
    X_train = np.append(X_train, epo_std, axis=0)
    y_train = np.append(y_train, [label]*len(epo_std))

# Loop through each subject in the training set
for subject_file in os.listdir("EEGNet/Valid"):
    print(f'Now {subject_file} in train')
    # Standardize data
    epo_std = standardizeval(subject_file)
    
    # Determine label based on whether the subject is a control or migraineur
    if subject_file[0] == 'C':
        label = 0  # Control
    elif subject_file[0] == 'M':
        label = 1  # Migraineur

    # Append subject data and labels to the overall training sets
    X_val = np.append(X_val, epo_std, axis=0)
    y_val = np.append(y_val, [label]*len(epo_std))

# Loop through each subject in the testing set
for subject_file in os.listdir("EEGNet/Test"):
    print(f'Now {subject_file} in test')
    # Standardize data
    epo_std = standardizete(subject_file)
    
    # Determine label based on whether the subject is a control or migraineur
    if subject_file[0] == 'C':
        label = 0  # Control
    else:
        label = 1  # Migraineur

    # Append subject data and labels to the overall testing sets
    X_test = np.append(X_test, epo_std, axis=0)
    y_test = np.append(y_test, [label]*len(epo_std))

y_train = y_train.astype(np.int32)
y_val = y_val.astype(np.int32)
y_test = y_test.astype(np.int32)

np.save('EEGNet/Correct-data/X_train_z.npy', X_train)
np.save('EEGNet/Correct-data/X_val_z.npy', X_val)
np.save('EEGNet/Correct-data/X_test_z.npy', X_test)
np.save('EEGNet/Correct-data/y_train_z.npy', y_train)
np.save('EEGNet/Correct-data/y_val_z.npy', y_val)
np.save('EEGNet/Correct-data/y_test_z.npy', y_test)

In [None]:
# epoch = mne.read_epochs('EEGNet/Allfiles/C1-epo.fif')
# epoch.pick(['eeg'])

# def todataset(data_array, datalabels):
#     combined = list(zip(data_array, datalabels))
#     np.random.shuffle(combined)
#     data_array_shuffled, datalabels_shuffled = zip(*combined)
#     del combined
#     data_array_shuffled = np.array(data_array_shuffled)
#     datalabels_shuffled = np.array(datalabels_shuffled)
#     datalabels_shuffled = to_categorical(datalabels_shuffled)
#     datalabels_shuffled.astype(np.single)
#     print(data_array_shuffled.shape, datalabels_shuffled.shape)
#     data_epochs = mne.EpochsArray(data_array_shuffled, info=epoch.info)
#     del data_array_shuffled
#     datasetxy = create_from_X_y(data_epochs, datalabels, drop_last_window=True, sfreq=512, ch_names=epoch.ch_names)
#     return datasetxy

#### Debugging

In [None]:
X_train = np.expand_dims(X_train, axis=1)
X_val = np.expand_dims(X_val, axis=1)
X_test = np.expand_dims(X_test, axis=1)

In [None]:
X_train = torch.Tensor(X_train)
X_val = torch.Tensor(X_val)
X_test = torch.Tensor(X_test)

In [None]:
y_train = torch.Tensor(y_train)
y_val = torch.Tensor(y_val)
y_test = torch.Tensor(y_test)

In [None]:
y_train.shape

In [None]:
y_train = y_train.long()
y_val = y_val.long()
y_test = y_test.long()

In [None]:
train_data = TensorDataset(X_train, y_train)
torch.save(train_data, 'EEGNet/Correct-data/train_data_z.pt')

val_data = TensorDataset(X_val, y_val)
torch.save(val_data, 'EEGNet/Correct-data/val_data_z.pt')

test_data = TensorDataset(X_test, y_test)
torch.save(test_data, 'EEGNet/Correct-data/test_data_z.pt')

In [None]:
y_train_cat = to_categorical(y_train)
y_val_cat = to_categorical(y_val)
y_test_cat = to_categorical(y_test)

In [None]:
y_train_cat = y_train_cat.astype(np.float32)
y_val_cat = y_val_cat.astype(np.float32)
y_test_cat = y_test_cat.astype(np.float32)

y_train_cat = torch.Tensor(y_train_cat)
y_val_cat = torch.Tensor(y_val_cat)
y_test_cat = torch.Tensor(y_test_cat)

In [None]:
train_data = TensorDataset(X_train, y_train_cat)
torch.save(train_data, 'EEGNet/Correct-data/train_data_bce_z.pt')

val_data = TensorDataset(X_val, y_val_cat)
torch.save(val_data, 'EEGNet/Correct-data/val_data_bce_z.pt')

test_data = TensorDataset(X_test, y_test_cat)
torch.save(test_data, 'EEGNet/Correct-data/test_data_bce_z.pt')

#### Feature testing

In [57]:
train_dataframes = []
test_dataframes = []

for file in os.listdir('EEGNet/Trainval'):
    shortname = file[:-8] #gives me the exact subject number
    raw = mne.read_epochs(f'EEGNet/Trainval/{file}')
    raw = raw.pick(None, exclude=['Fp1', 'ECG'])
    raw = raw.to_data_frame()
    raw = raw.drop(columns=['time', 'condition', 'Status'])
    raw = raw.groupby('epoch').mean().reset_index()
    raw = raw.drop(columns=['epoch'])
    raw = raw * 1e-6
    
    if shortname[0] == "M":
        raw['migraine'] = 1
    else:
        raw['migraine'] = 0

    train_dataframes.append(raw)

train_df = pd.concat(train_dataframes, ignore_index=True)

for file in os.listdir('EEGNet/Test'):
    shortname = file[:-8] #gives me the exact subject number
    raw = mne.read_epochs(f'EEGNet/Test/{file}')
    raw = raw.pick(None, exclude=['Fp1', 'ECG'])
    raw = raw.to_data_frame()
    raw = raw.drop(columns=['time', 'condition', 'Status'])
    raw = raw.groupby('epoch').mean().reset_index()
    raw = raw.drop(columns=['epoch'])
    raw = raw * 1e-6
    
    if shortname[0] == "M":
        raw['migraine'] = 1
    else:
        raw['migraine'] = 0

    test_dataframes.append(raw)

test_df = pd.concat(test_dataframes, ignore_index=True)

In [5]:
df = pd.read_csv('PreprocessedEpochData/FullDF - Copy.csv', index_col=0)
train_subjects = ['C1', 'C2', 'C3', 'C4', 'C5', 'C6', 'C7', 'C8', 'C9', 'C10', 'C11', 'C12', 'C13', 'M1', 'M2', 'M3', 'M4', 'M5', 'M6', 'M7', 'M8', 'M9', 'M10', 'M11','M12']

train_df = df[df['subject'].isin(train_subjects)]
test_df = df[~df['subject'].isin(train_subjects)]

In [6]:
train_df = train_df.sample(frac=1)
test_df = test_df.sample(frac=1)

train_df_x = train_df.drop(columns=['subject','migraine'])
train_df_y = train_df['migraine']


test_df_x = test_df.drop(columns=['subject','migraine'])
test_df_y = test_df['migraine']

# Initialize and train the Random Forest model
rf_classifier = RandomForestClassifier(random_state=42)
rf_classifier.fit(train_df_x, train_df_y)

# Predict on the test data
y_pred = rf_classifier.predict(test_df_x)

# Calculate and print the accuracy
accuracy = accuracy_score(test_df_y, y_pred)
report = classification_report(test_df_y, y_pred)
conf_matrix = confusion_matrix(test_df_y, y_pred)

print("Accuracy:", accuracy)
print("\nClassification Report:\n", report)
print("\nConfusion Matrix:\n", conf_matrix)

Accuracy: 0.5986159169550173

Classification Report:
               precision    recall  f1-score   support

           0       0.61      0.75      0.67      1584
           1       0.58      0.41      0.48      1306

    accuracy                           0.60      2890
   macro avg       0.59      0.58      0.58      2890
weighted avg       0.59      0.60      0.59      2890


Confusion Matrix:
 [[1190  394]
 [ 766  540]]


In [7]:
# Plot confusion matrix
sns.heatmap(conf_matrix, annot=True, fmt='d', cmap='Blues')
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [8]:
#Perform cross-validation
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
cv_scores = cross_val_score(rf_classifier, train_df_x, train_df_y, cv=cv, scoring='accuracy')

print(f'Cross-Validation Accuracy: {np.mean(cv_scores) * 100:.2f}%')
print(f'Cross-Validation Scores: {cv_scores}')

# Check feature importances
importances = rf_classifier.feature_importances_
feature_importance_df = pd.DataFrame({'feature': train_df_x.columns, 'importance': importances})
feature_importance_df = feature_importance_df.sort_values(by='importance', ascending=False)
print(feature_importance_df.head(10))  # Print top 10 features

# Plot feature importances
feature_importance_df.set_index('feature').head(10).plot(kind='bar')
plt.title('Top 10 Feature Importances')
plt.ylabel('Importance')
plt.show()

# Visual inspection of top features
top_features = feature_importance_df['feature'].head(3)
for feature in top_features:
    sns.scatterplot(data=df, x=feature, y='migraine')
    plt.title(f'{feature} vs Migraine')
    plt.show()

Cross-Validation Accuracy: 69.30%
Cross-Validation Scores: [0.69431921 0.70513977 0.68440036 0.69161407 0.68953069]
         feature  importance
3    delta_AFF5h    0.111994
598    gamma_TP7    0.057573
568  gamma_AFF1h    0.043851
316    alpha_TP7    0.042762
627    gamma_Fpz    0.039479
567  gamma_AFF5h    0.037492
175    theta_TP7    0.036617
578  gamma_FFC1h    0.036336
586  gamma_FCC5h    0.031211
606  gamma_CPP1h    0.030002


In [10]:
print("Training accuracy:", rf_classifier.score(train_df_x, train_df_y))
print("Test accuracy:", rf_classifier.score(test_df_x, test_df_y))

Training accuracy: 0.7063492063492064
Test accuracy: 0.5986159169550173


In [11]:
from sklearn.inspection import permutation_importance
# Compute permutation importance
perm_importance = permutation_importance(rf_classifier, test_df_x, test_df_y, n_repeats=30, random_state=42)

# Create a DataFrame to display the results
perm_importance_df = pd.DataFrame({
    'feature': test_df_x.columns,
    'importance_mean': perm_importance.importances_mean,
    'importance_std': perm_importance.importances_std
}).sort_values(by='importance_mean', ascending=False)

print(perm_importance_df)

         feature  importance_mean  importance_std
634  gamma_AFF6h         0.070542        0.007563
598    gamma_TP7         0.013057        0.003568
627    gamma_Fpz         0.008512        0.002992
694    gamma_LO2         0.004971        0.001871
625     gamma_Oz         0.004048        0.002475
..           ...              ...             ...
584  gamma_FTT9h        -0.001027        0.000209
693    gamma_LO1        -0.001407        0.000218
697    gamma_IO2        -0.002215        0.000510
695    gamma_IO1        -0.002249        0.000343
620  gamma_POO9h        -0.009942        0.003219

[705 rows x 3 columns]


In [62]:
# Calculate correlation with the target
correlation_with_target = train_df.corr()['migraine'].drop('migraine')

print(correlation_with_target.sort_values(ascending=False))

M2       0.010150
FFT8h    0.006757
F2       0.004213
LO2      0.003175
C2       0.002756
           ...   
AFz     -0.004517
FCC5h   -0.004982
F6      -0.005326
M1      -0.007764
C1      -0.008112
Name: migraine, Length: 141, dtype: float64


In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

# Visual inspection of top features
top_features = feature_importance_df['feature'].head(3)  # Adjust the number of features as needed

for feature in top_features:
    # Scatter plot
    plt.figure(figsize=(10, 6))
    sns.scatterplot(data=df, x=feature, y='migraine', hue='migraine', alpha=0.6)
    plt.title(f'Scatter plot of {feature} vs Migraine')
    plt.show()
    
    # Histogram
    plt.figure(figsize=(10, 6))
    sns.histplot(data=df, x=feature, hue='migraine', multiple='stack', kde=True)
    plt.title(f'Histogram of {feature} for Migraine and Non-Migraine')
    plt.show()

# Pairplot for top features (optional for inspecting interactions)
sns.pairplot(df, vars=top_features, hue='migraine', plot_kws={'alpha': 0.6})
plt.suptitle('Pairplot of Top Features', y=1.02)
plt.show()


In [None]:
from sklearn.feature_selection import RFE
from sklearn.svm import LinearSVC
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_selection import SelectFromModel
from sklearn.pipeline import Pipeline
import pandas as pd

# df = pd.read_csv('GroupedDF.csv', index_col='index')
# sample_df = df.sample(frac=0.2, random_state=42)
# X = sample_df.drop(columns=['subject', 'migraine'])
# y = sample_df['migraine']
# X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
clf = Pipeline([
  ('feature_selection', SelectFromModel(LinearSVC(penalty="l1"))),
  ('classification', RandomForestClassifier())
])
clf.fit(train_df_x, train_df_y)

In [None]:
y_pred = clf.predict(test_df_x)

# Evaluate the model
accuracy = accuracy_score(test_df_y, y_pred)
report = classification_report(test_df_y, y_pred)

print("Accuracy:", accuracy)
print("\nClassification Report:\n", report)

#### Training

In [3]:
X_train = np.load('PreprocessedEpochData/X_train.npy')
X_val = np.load('PreprocessedEpochData/X_val.npy')
X_test = np.load('PreprocessedEpochData/X_test.npy')

y_train = np.load('PreprocessedEpochData/y_train.npy')
y_val = np.load('PreprocessedEpochData/y_val.npy')
y_test = np.load('PreprocessedEpochData/y_test.npy')
device= 'cuda'

# Set n_channels to 143 and n_times to 512 for this

In [2]:
X_train = np.load('EEGNet/Correct-data/X_train.npy')
X_val = np.load('EEGNet/Correct-data/X_val.npy')
X_test = np.load('EEGNet/Correct-data/X_test.npy')

y_train = np.load('EEGNet/Correct-data/y_train.npy')
y_val = np.load('EEGNet/Correct-data/y_val.npy')
y_test = np.load('EEGNet/Correct-data/y_test.npy')
device= 'cuda'

# Set n_channels to 141 and n_times to 1024 for this

In [48]:
from sklearn.utils.class_weight import compute_class_weight
print('Shape of Data in Formate (EEG-Epochs,Channels,Timepoints) and label file with 1/0')
print(f'Train: {X_train.shape} {y_train.shape}')
print(f'Valid: {X_val.shape} {y_val.shape}')
print(f'Test: {X_test.shape} {y_test.shape} \n')

print('Ratio of labels 1 and 0:')
print(f'Train:{compute_class_weight(class_weight="balanced", classes=np.unique(y_train), y=y_train)}')
print(f'Valid: {compute_class_weight(class_weight="balanced", classes=np.unique(y_val), y=y_val)}')
print(f'Test: {compute_class_weight(class_weight="balanced", classes=np.unique(y_test), y=y_test)} \n')

print(f'Label 1 in Training data: {np.count_nonzero(y_train)}')
print(f'Label 0 in Training data: {len(y_train) - np.count_nonzero(y_train)}')
print(f'Label 1 in Validation data: {np.count_nonzero(y_val)}')
print(f'Label 0 in Validation data: {len(y_val) - np.count_nonzero(y_val)}')
print(f'Label 1 in Test data: {np.count_nonzero(y_test)}')
print(f'Label 0 in Test data: {len(y_test) - np.count_nonzero(y_test)}')

Shape of Data in Formate (EEG-Epochs,Channels,Timepoints) and label file with 1/0
Train: (5544, 141, 1024) (5544,)
Valid: (1375, 141, 1024) (1375,)
Test: (1515, 141, 1024) (1515,) 

Ratio of labels 1 and 0:
Train:[1.02666667 0.97468354]
Valid: [0.87915601 1.15935919]
Test: [0.94451372 1.06241234] 

Label 1 in Training data: 2844
Label 0 in Training data: 2700
Label 1 in Validation data: 593
Label 0 in Validation data: 782
Label 1 in Test data: 713
Label 0 in Test data: 802


In [3]:
# Data Shuffler
n_samples = len(X_train)
index = np.arange(n_samples)
np.random.shuffle(index)

# Shuffle data based on shuffled indices
X_train = X_train[index]
y_train = y_train[index]

# n_samples = len(X_val)
# index = np.arange(n_samples)
# np.random.shuffle(index)

# X_val = X_val[index]
# y_val = y_val[index]

In [4]:
import torch.nn.functional as F
import torch

# Assuming y_train is a numpy array
X_train_tensor = torch.tensor(X_train).float()
X_val_tensor = torch.tensor(X_val).float()
y_train_tensor = torch.tensor(y_train).float()
y_val_tensor = torch.tensor(y_val).float()
X_test_tensor = torch.tensor(X_test).float()
y_test_tensor = torch.tensor(y_test).float()

In [50]:
from sklearn.utils.class_weight import compute_class_weight
train_weight = compute_class_weight(class_weight="balanced", classes=np.unique(y_train), y=y_train)
train_weight = torch.tensor([train_weight[0] , train_weight[1]], dtype=torch.float32)

In [16]:
from skorch.dataset import Dataset as skorchset
from skorch.classifier import NeuralNetBinaryClassifier
from skorch.callbacks import EpochScoring
eeg_model = EEGNetv4(n_chans=141, n_outputs=1, n_times=1024, kernel_length=256, drop_prob=0.4, F1=8, D=4, F2=32)
opt = optim.Adam(eeg_model.parameters(), lr=0.00005, weight_decay=0.0001, eps=1e-8, betas=(0.9, 0.999))
eeg_model.cuda()
n_epochs = 300

# class WeightedBCELoss(nn.BCEWithLogitsLoss):
#     def __init__(self, weight):
#         super(WeightedBCELoss, self).__init__(pos_weight=weight[1]/weight[0])

eeg_net = NeuralNetBinaryClassifier(eeg_model,
                        criterion=nn.BCEWithLogitsLoss,
                        #criterion__weight = train_weight,
                        optimizer=optim.NAdam,
                        #threshold=0.465,
                        optimizer__lr=0.00001,
                        optimizer__weight_decay = 0.1,
                        optimizer__eps = 1e-8,
                        optimizer__betas = (0.9, 0.999),
                        max_epochs=n_epochs,
                        train_split = None,
                        iterator_train__shuffle=True,
                        device='cuda', 
                        batch_size=32,
                        callbacks=[
                            (EpochScoring(scoring='accuracy', name='train_acc', on_train=True, lower_is_better=False)),
                            (EpochScoring(scoring='f1', name='train f1-score', on_train=True, lower_is_better=False)),
                            (EpochScoring(scoring='f1', name='valid f1-score', on_train=False, lower_is_better=False)),
                            ("lr_schedulerPlat", LRScheduler('ReduceLROnPlateau', patience=5, factor=0.1)),
                            #("lr_schedulerStep", LRScheduler('StepLR', step_size=5, gamma=0.8)),
                            #("lr_schedulerExp", LRScheduler('ExponentialLR', gamma=0.9)),
                            ('EarlyStopping', EarlyStopping(patience=100, load_best=True, lower_is_better=True))
                        ]
                        )

val_data = skorchset(X_test_tensor, y_test_tensor)
eeg_net.set_params(train_split = predefined_split(val_data))
eeg_fit = eeg_net.fit(X_train_tensor, y_train_tensor)

  epoch    train f1-score    train_acc    train_loss    valid f1-score    valid_acc    valid_loss      dur
-------  ----------------  -----------  ------------  ----------------  -----------  ------------  -------
      1            [36m0.4811[0m       [32m0.4926[0m        [35m0.7795[0m            [31m0.4282[0m       [94m0.5505[0m        [36m0.7153[0m  22.3643
      2            [36m0.4970[0m       [32m0.5041[0m        [35m0.7413[0m            0.4102       [94m0.5729[0m        [36m0.7087[0m  19.7215
      3            [36m0.5029[0m       [32m0.5079[0m        [35m0.7375[0m            0.4015       [94m0.5828[0m        [36m0.7002[0m  20.0960
      4            [36m0.5173[0m       [32m0.5256[0m        [35m0.7103[0m            0.3831       [94m0.5960[0m        [36m0.6963[0m  19.9307
      5            [36m0.5240[0m       0.5245        [35m0.7051[0m            0.3753       [94m0.6000[0m        [36m0.6925[0m  20.8454
      6            [36m0.

In [17]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score

y_pred = eeg_net.predict(X_val_tensor)
y_pred_classes = (y_pred > 0.5).astype(int)
print(classification_report(y_val_tensor, y_pred_classes))
print("Test Accuracy:", accuracy_score(y_val_tensor, y_pred_classes))
print('Test F1-Score:', f1_score(y_pred=y_pred_classes, y_true=y_val_tensor))

              precision    recall  f1-score   support

         0.0       0.58      0.90      0.71       782
         1.0       0.54      0.15      0.23       593

    accuracy                           0.58      1375
   macro avg       0.56      0.53      0.47      1375
weighted avg       0.56      0.58      0.50      1375

Test Accuracy: 0.5774545454545454
Test F1-Score: 0.23249669749009247


In [18]:


# Compute confusion matrix
cm = confusion_matrix(y_val, y_pred_classes)

# Plot confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Predicted Negative', 'Predicted Positive'], yticklabels=['Actual Negative', 'Actual Positive'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [52]:
deep_model = Deep4Net(n_chans=143, n_outputs=1, n_times=512, drop_prob=0.25, add_log_softmax=False)
deep_model.cuda()
n_epochs = 300
deep_net = NeuralNetBinaryClassifier(deep_model,
                        optimizer=optim.Adamax,
                        threshold=0.465,
                        optimizer__lr=0.00002,
                        #optimizer__weight_decay = 1e-6,
                        max_epochs=n_epochs,
                        train_split = None,
                        iterator_train__shuffle=True,
                        device='cuda', 
                        batch_size=32,
                        callbacks=[
                            (EpochScoring(scoring='accuracy', name='train_acc', on_train=True, lower_is_better=False)),
                            (EpochScoring(scoring='f1', name='f1-score', on_train=True, lower_is_better=False)),
                            #("lr_scheduler", LRScheduler('CosineAnnealingLR', T_max=n_epochs - 1)),
                            ('EarlyStopping', EarlyStopping(patience=50, load_best=True))
                        ]
                        )

val_data = skorchset(X_val_tensor, y_val_tensor)
deep_net.set_params(train_split = predefined_split(val_data))
deep_fit = deep_net.fit(X_train_tensor, y_train_tensor)

  epoch    f1-score    train_acc    train_loss    valid_acc    valid_loss     dur
-------  ----------  -----------  ------------  -----------  ------------  ------
      1      [36m0.6744[0m       [32m0.6124[0m        [35m0.6659[0m       [31m0.5822[0m        [94m1.3160[0m  2.3150
      2      0.6686       [32m0.6733[0m        [35m0.5895[0m       0.5624        1.3218  1.7782
      3      [36m0.6820[0m       [32m0.7008[0m        [35m0.5559[0m       [31m0.5894[0m        1.5361  1.6684
      4      [36m0.7017[0m       [32m0.7249[0m        [35m0.5314[0m       0.5894        1.9916  1.5329
      5      [36m0.7214[0m       [32m0.7381[0m        [35m0.5218[0m       [31m0.5914[0m        1.6260  1.5224
      6      [36m0.7345[0m       [32m0.7527[0m        [35m0.4970[0m       0.5769        1.4716  1.5758
      7      [36m0.7422[0m       [32m0.7594[0m        [35m0.4880[0m       0.5881        2.6652  1.5984
      8      [36m0.7567[0m       [32m0.7695

In [43]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score
X_test_tensor = torch.tensor(X_test).float()
y_test_tensor = torch.tensor(y_test).float()
y_pred = deep_net.predict(X_test_tensor)
print("Test Accuracy:", accuracy_score(y_test_tensor, y_pred))
print('Test F1-Score:', f1_score(y_pred=y_pred, y_true=y_test))

# Compute confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Plot confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Predicted Negative', 'Predicted Positive'], yticklabels=['Actual Negative', 'Actual Positive'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

Test Accuracy: 0.4930909090909091
Test F1-Score: 0.4967509025270758


In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, f1_score

# Compute confusion matrix
cm = confusion_matrix(y_test, y_pred)

# Plot confusion matrix
plt.figure(figsize=(10, 7))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=['Predicted Negative', 'Predicted Positive'], yticklabels=['Actual Negative', 'Actual Positive'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title('Confusion Matrix')
plt.show()

In [None]:
deep_model = Deep4Net(n_chans=144, n_classes=2, n_times=1024, drop_prob=0.25)

deep_net = EEGClassifier(deep_model,
                        optimizer = optim.Adam,
                        optimizer__lr = 0.01,
                        optimizer__weight_decay = 0.0005,
                        train_split=ValidSplit(0.2), 
                        max_epochs=n_epochs, 
                        device='cuda', 
                        batch_size=32,
                        callbacks=[
                            "accuracy", ("lr_scheduler", LRScheduler('CosineAnnealingLR', T_max=n_epochs - 1)),
                        ],
                        classes=[0,1]
                        )
deep_net.fit(X=X_train, y=y_train)

In [None]:
grid_model = partial(EEGNetv4, n_chans=141, n_outputs=2, n_times=1024, kernel_length=256, drop_prob=0.25)

grid_net = EEGClassifier(grid_model,
                    optimizer = optim.Adam,
                    optimizer__lr = [],
                    train_split= ValidSplit(0.2),
                    device='cuda', 
                    batch_size=64,
                    callbacks=[
                        "accuracy", ("lr_scheduler", LRScheduler('CosineAnnealingLR', T_max=30 - 1)),
                    ],
                    classes=[0,1]
                    )

# train_X = SliceDataset(train_dataset, idx=0)
# train_y = array([y for y in SliceDataset(train_dataset, idx=1)])
cv = StratifiedKFold(n_splits=2, shuffle=True, random_state=42)

learning_rates = [0.001, 0.0001, 0.00005]
drop_probs = [0.3, 0.5]

fit_params = {'epochs': 30}
param_grid = {
    'optimizer__lr': learning_rates,
    'module__drop_prob': drop_probs
}

# By setting n_jobs=-1, grid search is performed
# with all the processors, in this case the output of the training
# process is not printed sequentially
search = GridSearchCV(
    estimator=grid_net,
    param_grid=param_grid,
    cv=cv,
    return_train_score=True,
    scoring='accuracy',
    refit=True,
    verbose=3,
    error_score='raise',
    n_jobs=1,
)

search.fit(X_train, y_train, **fit_params)

# Extract the results into a DataFrame
search_results = pd.DataFrame(search.cv_results_)

In [None]:
# Create a pivot table for the heatmap
pivot_table = search_results.pivot(index='param_optimizer__lr',
                                   columns='param_module__drop_prob',
                                   values='mean_test_score')
# Create the heatmap
fig, ax = plt.subplots()
sns.heatmap(pivot_table, annot=True, fmt=".3f",
            cmap="YlGnBu", cbar=True)
plt.title('Grid Search Mean Test Scores')
plt.ylabel('Learning Rate')
plt.xlabel('Dropout Probability')
plt.tight_layout()
plt.show()

In [None]:
best_run = search_results[search_results['rank_test_score'] == 1].squeeze()
print(
    f"Best hyperparameters were {best_run['params']} which gave a validation "
    f"accuracy of {best_run['mean_test_score'] * 100:.2f}% (training "
    f"accuracy of {best_run['mean_train_score'] * 100:.2f}%).")

eval_X = SliceDataset(test_dataset, idx=0)
eval_y = SliceDataset(test_dataset, idx=1)
score = search.score(eval_X, eval_y)
print(f"Test accuracy is {score * 100:.2f}%.")