# Import packages

In [None]:
import numpy as np
import random
import time
from sklearn.metrics import confusion_matrix, roc_curve, auc
from sklearn.model_selection import train_test_split

# create scalogram images
import pywt
from skimage.transform import resize

import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader, TensorDataset, random_split, SubsetRandomSampler, ConcatDataset, WeightedRandomSampler
import torch.optim as optim
from torchsummary import summary
from torch.nn import functional as F
import torchvision
from torchvision import datasets,transforms
from torch.cuda.amp import GradScaler

# LSTM model

In [None]:
# LSTM model architecture
class LSTM_model(nn.Module):
    """Stacked-LSTM binary classifier that returns one probability per sequence.

    Pipeline: a 2-layer LSTM (hidden size 32) -> dropout(0.1) -> a 1-layer
    LSTM (hidden size 32) -> dropout(0.3) -> linear layer on the last time
    step -> sigmoid.

    Both LSTMs are built with ``batch_first=True`` and the first one with
    ``input_size=20*64``, so ``forward`` expects ``x`` of shape
    ``(batch, seq_len, 1280)`` and returns a tensor of shape ``(batch, 1)``
    with values in ``[0, 1]`` (suitable for ``nn.BCELoss``).
    """

    def __init__(self):
        super(LSTM_model, self).__init__()
        self.lstm1 = nn.LSTM(input_size=20*64, hidden_size=32, num_layers=2, batch_first=True)
        self.dropout1 = nn.Dropout(p=0.1)
        self.lstm2 = nn.LSTM(input_size=32, hidden_size=32, num_layers=1, batch_first=True)
        self.dropout2 = nn.Dropout(p=0.3)
        self.fc = nn.Linear(in_features=32, out_features=1)

    def _zero_state(self, lstm, x):
        """Return a zero (h_0, c_0) pair for `lstm`, matching the batch size, device and dtype of `x`."""
        shape = (lstm.num_layers, x.size(0), lstm.hidden_size)
        # new_zeros inherits device and dtype from x. The states are plain
        # tensors: wrapping them in nn.Parameter inside forward() would create
        # a new, unregistered parameter on every call that no optimizer sees.
        return x.new_zeros(shape), x.new_zeros(shape)

    def forward(self, x):
        """Run the network on `x` (batch, seq_len, 1280) and return (batch, 1) probabilities."""
        out_1, _ = self.lstm1(x, self._zero_state(self.lstm1, x))
        out_1 = self.dropout1(out_1)

        out_2, _ = self.lstm2(out_1, self._zero_state(self.lstm2, out_1))
        out_2 = self.dropout2(out_2)

        # classify from the representation of the last time step only
        out = self.fc(out_2[:, -1, :])
        out = torch.sigmoid(out)
        return out

In [None]:
# initialize model: seed the RNGs, build the LSTM, its loss/optimizer and the data loaders
torch.manual_seed(42)
np.random.seed(42)

# training hyper-parameters
batch_size = 32
num_epochs = 100

# prefer the GPU when one is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("device type: ", device)

model = LSTM_model()
print(model) # model summary

model = model.to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)
dataset = MyDataset() # call custom dataset function

# splitting (80/20) instead of 5-fold cross-validation to simplify published code
train_test_idx = list(range(dataset.num_samples))
np.random.shuffle(train_test_idx)  # in-place shuffle driven by the NumPy seed above
split_index = int(0.80 * len(train_test_idx))
train_idx, test_idx = train_test_idx[:split_index], train_test_idx[split_index:]

# each loader draws (without replacement) only from its own subset of indices
train_sampler = SubsetRandomSampler(train_idx)
test_sampler = SubsetRandomSampler(test_idx)

train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler, shuffle=False)

In [None]:
# training and testing loops
#
# The model output is flattened with view(-1) and fed to BCELoss, i.e. it is
# one probability per sample. Class predictions are therefore obtained by
# thresholding at 0.5. Taking torch.max(outputs, 1) over a (batch, 1) tensor
# always yields index 0, which would make the "accuracy" merely the share of
# label-0 samples.
for epoch in range(num_epochs):

    train_loss = 0.0
    train_correct = 0

    model.train()

    for data, labels in train_loader:
        data = data.to(device)
        # BCELoss needs float targets with the same shape as the flattened outputs
        labels = labels.to(device).float().reshape(-1)

        optimizer.zero_grad()
        probs = model(data).view(-1)
        loss = criterion(probs, labels)
        loss.backward()
        optimizer.step()

        # accumulate the sample-weighted loss so the epoch average is exact
        # even when the last batch is smaller
        train_loss += loss.item() * data.size(0)
        predicted = (probs.detach() >= 0.5).float()
        train_correct += (predicted == labels).sum().item()

    # train loss and acc
    train_loss = train_loss / len(train_loader.sampler)
    train_accuracy = train_correct / len(train_loader.sampler) * 100

    # testing loop
    test_loss = 0.0
    test_correct  = 0

    model.eval()

    # no gradients are needed for evaluation; this saves memory and time
    with torch.no_grad():
        for data, labels in test_loader:
            data = data.to(device)
            labels = labels.to(device).float().reshape(-1)

            probs = model(data).view(-1)
            loss = criterion(probs, labels)

            # save test loss for this batch
            test_loss += loss.item() * data.size(0)
            predicted = (probs >= 0.5).float()
            test_correct += (predicted == labels).sum().item()

    # test set accuracy
    test_loss = test_loss / len(test_loader.sampler)
    test_accuracy = test_correct / len(test_loader.sampler) * 100

    print(f"Epoch:{epoch+1}/{num_epochs} \t AVG Training Loss:{train_loss:.3f} \t AVG Test Loss:{test_loss:.3f} \t AVG Training Acc {train_accuracy:.2f} % \t AVG Test Acc {test_accuracy:.2f} %")


# 2D CNN

In [None]:
# create 2D scalogram images
def create_2D_images(eeg_signal, window_size, window_step):
    '''
    Create 2D scalogram images, one per channel and per sliding window.

    Parameters
    ----------
    eeg_signal : array-like
        EEG recording. It is transposed on entry and then windowed along
        axis 1 and indexed per channel along axis 0, so it is expected as
        (samples, channels).
    window_size : int
        Window length in samples (used directly as a slice length;
        30 seconds of signal in this paper).
    window_step : int
        Step between consecutive windows in samples (15 seconds in this
        study, to apply 50% overlap).

    Relies on the module-level names `common_channels`, `scales` and
    `wavelet`, which must be defined before calling.

    Returns nothing; each scalogram is written with np.save.
    '''

    eeg_signal = np.asarray(eeg_signal).T

    # NOTE(review): the stop value excludes a final window that ends exactly at
    # the last sample; kept as-is so the number of generated images is unchanged.
    for i in range(0, int(eeg_signal.shape[1]) - window_size, window_step):

        chuncked_eeg_signal = eeg_signal[:, i : i + window_size]

        for channel_idx in range(len(common_channels)):  # save channels independently

            # 1-D signal of a single channel for this window. The earlier code
            # passed an undefined name (`dataset_array2`) to pywt.cwt here.
            # NOTE(review): assumes the intended CWT input is this 1-D
            # per-channel window, giving a 2-D (n_scales, window_size) result.
            channel_signal = chuncked_eeg_signal[channel_idx, :]
            coefficients, _ = pywt.cwt(channel_signal, scales, wavelet)
            scalogram = np.abs(coefficients)

            # resize expects an integer output shape; round explicitly instead
            # of passing fractional sizes
            target_shape = (
                int(round(scalogram.shape[0] * 0.25)),
                int(round(scalogram.shape[1] * 0.0042)),
            )
            scalogram = resize(scalogram, target_shape, preserve_range=True)

            # NOTE(review): "2D_images_folder_address" is a placeholder; as
            # written, every window/channel overwrites the same file. Replace
            # it with a unique path per (window, channel) before real use.
            np.save("2D_images_folder_address", scalogram)

In [None]:
# 2D CNN model architecture
class ConvNet(nn.Module):
    """2D CNN for two-class classification of single-channel images.

    Three Conv2d(kernel 3, stride 3, padding 1) + BatchNorm + ReLU stages,
    followed by two fully connected layers and an element-wise sigmoid.

    ``forward`` takes ``x`` of shape ``(batch, H, W)`` (the channel axis is
    added internally) and returns ``(batch, 2)`` values in ``[0, 1]``.
    ``fc1`` expects ``64*4*4`` features, which is what the three stride-3
    convolutions produce for a 96x96 input (96 -> 32 -> 11 -> 4).
    """

    def __init__(self):

        super().__init__()

        # NOTE: pool1/pool2/pool3 are defined but not applied in forward();
        # down-sampling is done by the stride-3 convolutions. They are kept so
        # the module's attributes and printed summary stay unchanged.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=3, padding=1)
        self.bn1   = nn.BatchNorm2d(16)
        self.pool1 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.relu1 = nn.ReLU()

        self.conv2 = nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=3, padding=1)
        self.bn2   = nn.BatchNorm2d(32)
        self.pool2 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=3, padding=1)
        self.bn3   = nn.BatchNorm2d(64)
        self.pool3 = nn.MaxPool2d(kernel_size=(2, 2), stride=(2, 2))
        self.relu3 = nn.ReLU()
        self.drop3 = nn.Dropout(p=0.1)

        self.fc1 = nn.Linear(in_features=int(64*4*4), out_features=128)
        self.relu7 = nn.ReLU()
        self.drop7 = nn.Dropout(p=0.1)

        self.fc3 = nn.Linear(in_features=128, out_features=2)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        """Return per-class sigmoid scores of shape (batch, 2) for `x` of shape (batch, H, W)."""

        # add the single input-channel axis: (batch, H, W) -> (batch, 1, H, W)
        x = x.unsqueeze(1)

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu3(x)
        x = self.drop3(x)

        # Flatten everything except the batch axis. Unlike reshape(-1, 64*4*4),
        # this never folds extra features into the batch dimension: an input of
        # the wrong spatial size now fails loudly at fc1 instead of silently
        # producing a different batch size.
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.relu7(x)
        x = self.drop7(x)

        x = self.fc3(x)
        x = self.sigmoid(x)

        return x

In [None]:
# initialize model: seed the RNGs, then prepare logs, the index split, the CNN,
# its loss/optimizer and the data loaders
torch.manual_seed(42)
np.random.seed(42)

# training hyper-parameters
batch_size = 32
num_epochs = 100

# containers for metrics collected across runs
perf_metrics_log = {name: [] for name in ("accuracy", "sensitivity", "specificity", "cm")}
roc_log = {name: [] for name in ("fpr", "tpr", "thresholds", "roc_auc")}
train_log, test_log = [], []

# load indices for the custom data loader
images_idx = np.load('np_array_including_image_address_and_label')

# splitting (80/20) instead of 5-fold cross-validation to simplify published code
np.random.shuffle(images_idx)  # in-place shuffle driven by the NumPy seed above
split_index = int(0.8 * images_idx.shape[0])
train_idx, test_idx = images_idx[:split_index], images_idx[split_index:]

# prefer the GPU when one is available
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print("device type: ", device)
model = ConvNet().to(device)

summary(model, input_size=(96, 96)) # print the model summary

criterion = nn.BCELoss()
scaler = GradScaler() # mixed precision training
optimizer = optim.Adam(model.parameters(), lr=0.001)

transform = transforms.Compose([transforms.ToTensor()])
dataset = MyDataset(csv_file=csv_file, root_dir=root_dir, transform=transform)

# each loader draws only from its own subset of indices
train_sampler = SubsetRandomSampler(train_idx)
test_sampler = SubsetRandomSampler(test_idx)
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler, shuffle=False)

In [None]:
# training and testing loops
#
# Per epoch: train on train_loader, evaluate on test_loader, log loss/accuracy,
# then apply early stopping (patience of 10 epochs on the test loss). After the
# loop, confusion-matrix metrics and the ROC curve are computed from the
# predictions of the last evaluated epoch.

# local performance metrics log
train_local_log = {"loss": [], "accuracy": []}
test_local_log = {"loss": [], "accuracy": []}

# variables to early stopping
best_test_loss = float('inf')
epochs_since_improvement = 0

# cuDNN auto-tuning is a global switch: set it once rather than on every batch
if torch.backends.cudnn.enabled:
    torch.backends.cudnn.benchmark = True

for epoch in range(num_epochs):

    print(f"\nEpoch {epoch+1} is starting... please wait!")
    start_time = time.time()

    # train the model
    train_loss = 0.0
    train_correct = 0

    model.train()
    for inputs, targets in train_loader:

        inputs, targets = inputs.to(device, non_blocking=True), targets.to(device, non_blocking=True)
        # BCELoss on a 2-unit sigmoid output needs one-hot float targets
        target_onehot = F.one_hot(targets, num_classes=2).float()

        optimizer.zero_grad()
        output = model(inputs)
        loss = criterion(output, target_onehot)
        loss.backward()
        optimizer.step()

        # save train loss for this batch (sample-weighted so the epoch mean is exact)
        train_loss += loss.item() * inputs.size(0)
        _, predicted = torch.max(output.detach(), 1)
        train_correct += (predicted == targets).sum().item()

    # save average train loss and acc
    train_loss = train_loss / len(train_loader.sampler)
    train_accuracy = train_correct / len(train_loader.sampler) * 100
    train_local_log['loss'].append(train_loss)
    train_local_log['accuracy'].append(train_accuracy)

    # test the model
    test_loss = 0.0
    test_correct = 0
    val_preds = []
    val_labels = []
    val_scores = []  # continuous scores for the ROC curve

    model.eval()
    # no gradients are needed for evaluation; this saves memory and time
    with torch.no_grad():
        for inputs, targets in test_loader:

            inputs, targets = inputs.to(device), targets.to(device)
            target_onehot = F.one_hot(targets, num_classes=2).float()
            output = model(inputs)
            loss = criterion(output, target_onehot)
            # save test loss for this batch
            test_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(output, 1)
            val_preds.extend(predicted.cpu().numpy())
            val_labels.extend(targets.cpu().numpy())
            # Score for class 1 that is consistent with the argmax decision:
            # the prediction is class 1 when output[:, 1] exceeds output[:, 0].
            val_scores.extend((output[:, 1] - output[:, 0]).cpu().numpy())
            test_correct += (predicted == targets).sum().item()

    # save average test loss and acc; logged BEFORE the early-stopping check so
    # the train and test logs always have one entry per executed epoch
    test_loss = test_loss / len(test_loader.sampler)
    test_accuracy = test_correct / len(test_loader.sampler) * 100
    test_local_log['loss'].append(test_loss)
    test_local_log['accuracy'].append(test_accuracy)
    print("Epoch:{}/{} \t AVG Training Loss:{:.3f} \t AVG Test Loss:{:.3f} \t AVG Training Acc {:.2f} % \t AVG Test Acc {:.2f} %".format(epoch+1, num_epochs, train_loss, test_loss, train_accuracy, test_accuracy))

    # early stopping
    if test_loss < best_test_loss:
        best_test_loss = test_loss
        epochs_since_improvement = 0
    else:
        epochs_since_improvement += 1

    if epochs_since_improvement >= 10:
        print(f'Validation loss has not improved for {epochs_since_improvement} epochs. Stopping training...')
        break

train_log.append(train_local_log)
test_log.append(test_local_log)

# performance metrics log (from the last evaluated epoch)
val_labels = np.array(val_labels)
val_preds = np.array(val_preds)
val_scores = np.array(val_scores)

# labels=[0, 1] guarantees a 2x2 matrix even if one class is absent, so the
# 4-way unpacking below cannot fail
cm = confusion_matrix(val_labels, val_preds, labels=[0, 1])
perf_metrics_log["cm"].append(cm)

tn, fp, fn, tp = cm.ravel()
accuracy = (tp + tn) / (tp + tn + fp + fn)
sensitivity = tp / (tp + fn)
specificity = tn / (tn + fp)
perf_metrics_log["accuracy"].append(accuracy*100)
perf_metrics_log["sensitivity"].append(sensitivity*100)
perf_metrics_log["specificity"].append(specificity*100)
print(f"accuracy : {accuracy*100:.2f} %")
print(f"sensitivity : {sensitivity*100:.2f} %")
print(f"specificity : {specificity*100:.2f} %")

# ROC log: built from continuous scores. Hard 0/1 predictions would collapse
# the curve to a single operating point and distort the AUC.
fpr, tpr, thresholds = roc_curve(val_labels, val_scores)
roc_auc = auc(fpr, tpr)
roc_log["fpr"].append(fpr)
roc_log["tpr"].append(tpr)
roc_log["thresholds"].append(thresholds)
roc_log["roc_auc"].append(roc_auc)

# 3D CNN model

In [None]:
# create 3D scalogram images
def create_3D_images(eeg_signal, window_size, window_step):
    '''
    Create 3D scalogram images (all channels stacked) per sliding window.

    Parameters
    ----------
    eeg_signal : array-like
        EEG recording. It is transposed on entry and then windowed along
        axis 1, so it is expected as (samples, channels).
    window_size : int
        Window length in samples (used directly as a slice length;
        30 seconds of signal in this paper).
    window_step : int
        Step between consecutive windows in samples (15 seconds in this
        study, to apply 50% overlap).

    Relies on the module-level names `scales` and `wavelet`, which must be
    defined before calling.

    Returns nothing; each scalogram volume is written with np.save.
    '''

    eeg_signal = np.asarray(eeg_signal).T

    # NOTE(review): the stop value excludes a final window that ends exactly at
    # the last sample; kept as-is so the number of generated images is unchanged.
    for i in range(0, int(eeg_signal.shape[1]) - window_size, window_step):

        # save channels dependently: window laid out as (window_size, channels)
        chuncked_eeg_signal = eeg_signal[:, i : i + window_size].T

        # pywt.cwt transforms along the LAST axis by default, which after the
        # transpose above is the channel axis. axis=0 makes the wavelet
        # transform run along time while keeping the output layout
        # (presumably (n_scales, window_size, channels) - confirm against the
        # pywt docs). Requires a PyWavelets version whose cwt accepts `axis`.
        coefficients, _ = pywt.cwt(chuncked_eeg_signal, scales, wavelet, axis=0)
        scalogram = np.abs(coefficients)

        # resize expects an integer output shape; round explicitly instead of
        # passing fractional sizes. Only the first two axes are resized; the
        # trailing (channel) axis is left to resize to preserve.
        target_shape = (
            int(round(scalogram.shape[0] * 0.755)),
            int(round(scalogram.shape[1] * 0.0249)),
        )
        scalogram = resize(scalogram, target_shape, preserve_range=True)

        # move the last axis to the front
        scalogram = scalogram.transpose(2, 0, 1)

        # NOTE(review): "3D_images_folder_address" is a placeholder; as
        # written, every window overwrites the same file. Replace it with a
        # unique path per window before real use.
        np.save("3D_images_folder_address", scalogram)

### We adapted the existing 2D CNN architecture only with respect to the kernel dimensions (2D layers replaced by their 3D counterparts), allowing for a transparent comparison.

In [None]:
# 3D CNN model architecture
class ConvNet(nn.Module):
    """3D CNN for two-class classification of single-channel volumes.

    Mirrors the 2D architecture with 3D layers: three
    Conv3d(kernel 3, stride 3, padding 1) + BatchNorm + ReLU stages, followed
    by two fully connected layers and an element-wise sigmoid.

    ``forward`` takes ``x`` of shape ``(batch, D, H, W)`` (the channel axis is
    added internally) and returns ``(batch, 2)`` values in ``[0, 1]``.
    ``fc1`` expects ``64*1*4*4`` features, i.e. the three stride-3
    convolutions must reduce the volume to 1x4x4 (for example, a 20x96x96
    input: 20 -> 7 -> 3 -> 1 and 96 -> 32 -> 11 -> 4).
    """

    def __init__(self):

        super().__init__()

        # NOTE: pool1/pool2/pool3 are defined but not applied in forward();
        # down-sampling is done by the stride-3 convolutions. They are kept so
        # the module's attributes and printed summary stay unchanged.
        self.conv1 = nn.Conv3d(in_channels=1, out_channels=16, kernel_size=3 , stride=3, padding=1)
        self.bn1   = nn.BatchNorm3d(16)
        self.pool1 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.relu1 = nn.ReLU()

        self.conv2 = nn.Conv3d(in_channels=16, out_channels=32, kernel_size=3, stride=3, padding=1)
        self.bn2   = nn.BatchNorm3d(32)
        self.pool2 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.relu2 = nn.ReLU()

        self.conv3 = nn.Conv3d(in_channels=32, out_channels=64, kernel_size=3, stride=3, padding=1)
        self.bn3   = nn.BatchNorm3d(64)
        self.pool3 = nn.MaxPool3d(kernel_size=2, stride=2)
        self.relu3 = nn.ReLU()
        self.drop3 = nn.Dropout(p=0.1)

        self.fc1 = nn.Linear(in_features=int(64*1*4*4), out_features=128)
        self.relu7 = nn.ReLU()
        self.drop7 = nn.Dropout(p=0.1)

        self.fc3 = nn.Linear(in_features=128, out_features=2)
        self.sigmoid = nn.Sigmoid()


    def forward(self, x):
        """Return per-class sigmoid scores of shape (batch, 2) for `x` of shape (batch, D, H, W)."""

        # add the single input-channel axis: (batch, D, H, W) -> (batch, 1, D, H, W)
        x = x.unsqueeze(1)

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu1(x)

        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu2(x)

        x = self.conv3(x)
        x = self.bn3(x)
        x = self.relu3(x)
        x = self.drop3(x)

        # Flatten everything except the batch axis. Unlike
        # reshape(-1, 64*1*4*4), this never folds extra features into the
        # batch dimension: an input of the wrong size now fails loudly at fc1
        # instead of silently producing a different batch size.
        x = torch.flatten(x, 1)
        x = self.fc1(x)
        x = self.relu7(x)
        x = self.drop7(x)

        x = self.fc3(x)
        x = self.sigmoid(x)

        return x

### The rest of the code is similar to that of the 2D CNN model.