In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import glob
import seaborn as sns

In [3]:
from sktime.transformations.panel.rocket import MiniRocket
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
import xgboost as xgb
from sklearn.metrics import f1_score, precision_score, recall_score, classification_report, accuracy_score, confusion_matrix, ConfusionMatrixDisplay
from sklearn.model_selection import LeaveOneOut, RandomizedSearchCV, LeaveOneGroupOut
from sklearn.preprocessing import LabelEncoder
from imblearn.over_sampling import SMOTE

## Transform data on Minirocket

In [3]:
file_paths = glob.glob('eeg_label/*_eeg_label.csv')

data_list = []
labels_list = []
patient_ids = []

for file_path in file_paths:
    eeg_data = pd.read_csv(file_path)
    patient_id = file_path.split('/')[-1]
    
    # Segment the data into non-overlapping 30 second windows
    segment_size = 128 * 30
    num_segments = len(eeg_data) // segment_size
    
    for i in range(num_segments):
        start_idx = i * segment_size
        end_idx = start_idx + segment_size
        segment = eeg_data['EEG'].iloc[start_idx:end_idx].values
        label = eeg_data['Label'].iloc[start_idx:end_idx].mode()[0]
        
        data_list.append(segment)
        labels_list.append(label)
        patient_ids.append(patient_id)  # Repeat patient ID for each segment

# Convert lists to numpy arrays
X = np.array(data_list)
y = np.array(labels_list)
patient_ids = np.array(patient_ids)

In [20]:
# Reshape to perform minirocket
X_reshaped = X.reshape(X.shape[0], 1, X.shape[1])

In [5]:
# initialise the minirocket transformer
minirocket = MiniRocket(random_state=123)
X_transformed = minirocket.fit_transform(X_reshaped) # transform the entire dataset using minirocket

# scale the transformed data
scaler = StandardScaler(with_mean=False)
X_scaled = scaler.fit_transform(X_transformed)

## Class pair

In [11]:
import itertools
import numpy as np
import pandas as pd
from xgboost import XGBClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import f1_score, classification_report
from sklearn.model_selection import train_test_split

# Assuming X_scaled and y_encoded are already prepared (full dataset after MiniRocket transformation and scaling)

# Define the classes
classes = np.unique(y_encoded)

# Store results for each pair
pair_results = {}

# Loop through all pairs of classes
for class1, class2 in itertools.combinations(classes, 2):
    # Filter the data for the two classes
    mask = (y_encoded == class1) | (y_encoded == class2)
    X_pair = X_scaled[mask]
    y_pair = y_encoded[mask]

    # Encode the two classes as binary
    y_pair = LabelEncoder().fit_transform(y_pair)
    
    # Split the data into training and testing sets
    X_train, X_test, y_train, y_test = train_test_split(X_pair, y_pair, test_size=0.3, random_state=2, stratify=y_pair)
    
    # Initialize the XGBoost classifier
    xgb_classifier = XGBClassifier(random_state=2)
    
    # Train the classifier
    xgb_classifier.fit(X_train, y_train)
    
    # Predict on the test set
    y_pred = xgb_classifier.predict(X_test)
    
    # Calculate F1 score
    f1 = f1_score(y_test, y_pred)
    
    # Store the results
    pair_results[(class1, class2)] = f1
    
    # Print results for each pair
    print(f"Class pair ({class1}, {class2}): F1 Score = {f1:.2f}")
    print(classification_report(y_test, y_pred))

Class pair (0, 1): F1 Score = 0.94
              precision    recall  f1-score   support

           0       0.85      0.47      0.60        47
           1       0.90      0.98      0.94       233

    accuracy                           0.90       280
   macro avg       0.87      0.73      0.77       280
weighted avg       0.89      0.90      0.88       280

Class pair (0, 2): F1 Score = 0.83
              precision    recall  f1-score   support

           0       0.78      0.66      0.71        47
           1       0.79      0.87      0.83        69

    accuracy                           0.78       116
   macro avg       0.78      0.76      0.77       116
weighted avg       0.78      0.78      0.78       116

Class pair (0, 3): F1 Score = 0.89
              precision    recall  f1-score   support

           0       0.85      0.62      0.72        47
           1       0.84      0.95      0.89        98

    accuracy                           0.84       145
   macro avg       0.85

In [12]:
most_separable_pair = max(pair_results, key=pair_results.get)
print(f"\nMost separable pair: {most_separable_pair} with F1 Score = {pair_results[most_separable_pair]:.2f}")


Most separable pair: (0, 1) with F1 Score = 0.94


So most separable pair is AW and LA. It's hard to distinguish between LA to DA and LA to RE.

## ConvTran

In [4]:
import torch
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim
import torch.nn as nn

In [5]:
# Load data
file_paths = glob.glob('eeg_label/*_eeg_label.csv')

data_list = []
labels_list = []
patient_ids = []

for file_path in file_paths:
    eeg_data = pd.read_csv(file_path)
    patient_id = file_path.split('/')[-1]
    
    # Segment the data into non-overlapping 30-second windows
    segment_size = 128 * 30
    num_segments = len(eeg_data) // segment_size
    
    for i in range(num_segments):
        start_idx = i * segment_size
        end_idx = start_idx + segment_size
        segment = eeg_data['EEG'].iloc[start_idx:end_idx].values
        label = eeg_data['Label'].iloc[start_idx:end_idx].mode()[0]
        
        data_list.append(segment)
        labels_list.append(label)
        patient_ids.append(patient_id)  # Repeat patient ID for each segment

# Encode labels
label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(labels_list)

# Convert lists to numpy arrays and then to PyTorch tensors
X = torch.tensor(np.array(data_list), dtype=torch.float32)  # Shape: (num_samples, sequence_length)
y = torch.tensor(y_encoded, dtype=torch.long)  # Shape: (num_samples,)
patient_ids = np.array(patient_ids)  # Keep patient IDs as numpy array for easier indexing

# Reshape X to include a channel dimension
X = X.unsqueeze(1)  # Shape: (num_samples, 1, sequence_length)

In [6]:
import numpy as np
from torch import nn
from ConvTran.Models.AbsolutePositionalEncoding import tAPE, AbsolutePositionalEncoding, LearnablePositionalEncoding
from ConvTran.Models.Attention import Attention, Attention_Rel_Scl, Attention_Rel_Vec


def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


class Permute(nn.Module):
    def forward(self, x):
        return x.permute(1, 0, 2)


def model_factory(config):
    if config['Net_Type'][0] == 'T':
        model = Transformer(config, num_classes=config['num_labels'])
    elif config['Net_Type'][0] == 'CC-T':
        model = CasualConvTran(config, num_classes=config['num_labels'])
    else:
        model = ConvTran(config, num_classes=config['num_labels'])
    return model


class Transformer(nn.Module):
    def __init__(self, config, num_classes):
        super().__init__()
        # Parameters Initialization -----------------------------------------------
        channel_size, seq_len = config['Data_shape'][1], config['Data_shape'][2]
        emb_size = config['emb_size']
        num_heads = config['num_heads']
        dim_ff = config['dim_ff']
        self.Fix_pos_encode = config['Fix_pos_encode']
        self.Rel_pos_encode = config['Rel_pos_encode']
        # Embedding Layer -----------------------------------------------------------
        self.embed_layer = nn.Sequential(
            nn.Linear(channel_size, emb_size),
            nn.LayerNorm(emb_size, eps=1e-5)
        )

        self.Fix_Position = tAPE(emb_size, dropout=config['dropout'], max_len=seq_len)

        self.LayerNorm1 = nn.LayerNorm(emb_size, eps=1e-5)
        self.LayerNorm2 = nn.LayerNorm(emb_size, eps=1e-5)
        if self.Rel_pos_encode == 'Scalar':
            self.attention_layer = Attention_Rel_Scl(emb_size, num_heads, seq_len, config['dropout'])
        elif self.Rel_pos_encode == 'Vector':
            self.attention_layer = Attention_Rel_Vec(emb_size, num_heads, seq_len, config['dropout'])
        else:
            self.attention_layer = Attention(emb_size, num_heads, config['dropout'])

        self.FeedForward = nn.Sequential(
            nn.Linear(emb_size, dim_ff),
            nn.ReLU(),
            nn.Dropout(config['dropout']),
            nn.Linear(dim_ff, emb_size),
            nn.Dropout(config['dropout']))

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.flatten = nn.Flatten()
        self.out = nn.Linear(emb_size, num_classes)

    def forward(self, x):
        x_src = self.embed_layer(x.permute(0, 2, 1))
        if self.Fix_pos_encode != 'None':
            x_src = self.Fix_Position(x_src)
        att = x_src + self.attention_layer(x_src)
        att = self.LayerNorm1(att)
        out = att + self.FeedForward(att)
        out = self.LayerNorm2(out)

        out = out.permute(0, 2, 1)
        out = self.gap(out)
        out = self.flatten(out)
        out = self.out(out)
        # out = out.permute(1, 0, 2)
        # out = self.out(out[-1])
        return out


class ConvTran(nn.Module):
    def __init__(self, config, num_classes):
        super().__init__()
        # Parameters Initialization -----------------------------------------------
        channel_size, seq_len = config['Data_shape'][1], config['Data_shape'][2]
        emb_size = config['emb_size']
        num_heads = config['num_heads']
        dim_ff = config['dim_ff']
        self.Fix_pos_encode = config['Fix_pos_encode']
        self.Rel_pos_encode = config['Rel_pos_encode']
        # Embedding Layer -----------------------------------------------------------
        self.embed_layer = nn.Sequential(nn.Conv2d(1, emb_size*4, kernel_size=[1, 8], padding='same'),
                                         nn.BatchNorm2d(emb_size*4),
                                         nn.GELU())

        self.embed_layer2 = nn.Sequential(nn.Conv2d(emb_size*4, emb_size, kernel_size=[channel_size, 1], padding='valid'),
                                          nn.BatchNorm2d(emb_size),
                                          nn.GELU())

        if self.Fix_pos_encode == 'tAPE':
            self.Fix_Position = tAPE(emb_size, dropout=config['dropout'], max_len=seq_len)
        elif self.Fix_pos_encode == 'Sin':
            self.Fix_Position = AbsolutePositionalEncoding(emb_size, dropout=config['dropout'], max_len=seq_len)
        elif config['Fix_pos_encode'] == 'Learn':
            self.Fix_Position = LearnablePositionalEncoding(emb_size, dropout=config['dropout'], max_len=seq_len)

        if self.Rel_pos_encode == 'eRPE':
            self.attention_layer = Attention_Rel_Scl(emb_size, num_heads, seq_len, config['dropout'])
        elif self.Rel_pos_encode == 'Vector':
            self.attention_layer = Attention_Rel_Vec(emb_size, num_heads, seq_len, config['dropout'])
        else:
            self.attention_layer = Attention(emb_size, num_heads, config['dropout'])

        self.LayerNorm = nn.LayerNorm(emb_size, eps=1e-5)
        self.LayerNorm2 = nn.LayerNorm(emb_size, eps=1e-5)

        self.FeedForward = nn.Sequential(
            nn.Linear(emb_size, dim_ff),
            nn.ReLU(),
            nn.Dropout(config['dropout']),
            nn.Linear(dim_ff, emb_size),
            nn.Dropout(config['dropout']))

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.flatten = nn.Flatten()
        self.out = nn.Linear(emb_size, num_classes)

    def forward(self, x):
        x = x.unsqueeze(1)
        x_src = self.embed_layer(x)
        x_src = self.embed_layer2(x_src).squeeze(2)
        x_src = x_src.permute(0, 2, 1)
        if self.Fix_pos_encode != 'None':
            x_src_pos = self.Fix_Position(x_src)
            att = x_src + self.attention_layer(x_src_pos)
        else:
            att = x_src + self.attention_layer(x_src)
        att = self.LayerNorm(att)
        out = att + self.FeedForward(att)
        out = self.LayerNorm2(out)
        out = out.permute(0, 2, 1)
        out = self.gap(out)
        out = self.flatten(out)
        out = self.out(out)
        return out


class CasualConvTran(nn.Module):
    def __init__(self, config, num_classes):
        super().__init__()
        # Parameters Initialization -----------------------------------------------
        channel_size, seq_len = config['Data_shape'][1], config['Data_shape'][2]
        emb_size = config['emb_size']
        num_heads = config['num_heads']
        dim_ff = config['dim_ff']
        self.Fix_pos_encode = config['Fix_pos_encode']
        self.Rel_pos_encode = config['Rel_pos_encode']
        # Embedding Layer -----------------------------------------------------------
        self.causal_Conv1 = nn.Sequential(CausalConv1d(channel_size, emb_size, kernel_size=8, stride=2, dilation=1),
                                          nn.BatchNorm1d(emb_size), nn.GELU())

        self.causal_Conv2 = nn.Sequential(CausalConv1d(emb_size, emb_size, kernel_size=5, stride=2, dilation=2),
                                          nn.BatchNorm1d(emb_size), nn.GELU())

        self.causal_Conv3 = nn.Sequential(CausalConv1d(emb_size, emb_size, kernel_size=3, stride=2, dilation=2),
                                          nn.BatchNorm1d(emb_size), nn.GELU())

        if self.Fix_pos_encode == 'tAPE':
            self.Fix_Position = tAPE(emb_size, dropout=config['dropout'], max_len=seq_len)
        elif self.Fix_pos_encode == 'Sin':
            self.Fix_Position = tAPE(emb_size, dropout=config['dropout'], max_len=seq_len)
        elif config['Fix_pos_encode'] == 'Learn':
            self.Fix_Position = LearnablePositionalEncoding(emb_size, dropout=config['dropout'], max_len=seq_len)

        if self.Rel_pos_encode == 'eRPE':
            self.attention_layer = Attention_Rel_Scl(emb_size, num_heads, seq_len, config['dropout'])
        elif self.Rel_pos_encode == 'Vector':
            self.attention_layer = Attention_Rel_Vec(emb_size, num_heads, seq_len, config['dropout'])
        else:
            self.attention_layer = Attention(emb_size, num_heads, config['dropout'])

        self.LayerNorm = nn.LayerNorm(emb_size, eps=1e-5)
        self.LayerNorm2 = nn.LayerNorm(emb_size, eps=1e-5)

        self.FeedForward = nn.Sequential(
            nn.Linear(emb_size, dim_ff),
            nn.ReLU(),
            nn.Dropout(config['dropout']),
            nn.Linear(dim_ff, emb_size),
            nn.Dropout(config['dropout']))

        self.gap = nn.AdaptiveAvgPool1d(1)
        self.flatten = nn.Flatten()
        self.out = nn.Linear(emb_size, num_classes)

    def forward(self, x):
        x = x.unsqueeze(1)
        x_src = self.embed_layer(x)
        x_src = self.embed_layer2(x_src).squeeze(2)
        x_src = x_src.permute(0, 2, 1)
        if self.Fix_pos_encode != 'None':
            x_src_pos = self.Fix_Position(x_src)
            att = x_src + self.attention_layer(x_src_pos)
        else:
            att = x_src + self.attention_layer(x_src)
        att = self.LayerNorm(att)
        out = att + self.FeedForward(att)
        out = self.LayerNorm2(out)
        out = out.permute(0, 2, 1)
        out = self.gap(out)
        out = self.flatten(out)
        out = self.out(out)
        return out


class CausalConv1d(nn.Conv1d):
    def __init__(self,
                 in_channels,
                 out_channels,
                 kernel_size,
                 stride=1,
                 dilation=1,
                 groups=1,
                 bias=True):
        super(CausalConv1d, self).__init__(
            in_channels,
            out_channels,
            kernel_size=kernel_size,
            stride=stride,
            padding=0,
            dilation=dilation,
            groups=groups,
            bias=bias)

        self.__padding = (kernel_size - 1) * dilation

    def forward(self, x):
        return super(CausalConv1d, self).forward(nn.functional.pad(x, (self.__padding, 0)))

In [9]:
from sklearn.model_selection import LeaveOneGroupOut
import torch.optim as optim
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Define the number of epochs
num_epochs = 10
batch_size = 8

# Model configuration
config = {
    'Data_shape': (X.shape[0], 1, X.shape[2]),
    'emb_size': 32,  
    'num_heads': 4,  
    'dim_ff': 64,  
    'dropout': 0.1,
    'Fix_pos_encode': 'tAPE',
    'Rel_pos_encode': 'Scalar',
    'num_labels': 5
}


# Initialise the LOPO cross-validator
logo = LeaveOneGroupOut()

# Loop over each patient
for train_idx, test_idx in logo.split(X, y, groups=patient_ids):
    # Split the data into training and testing sets
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]

    # Convert to PyTorch dataset
    train_dataset = TensorDataset(X_train, y_train)
    test_dataset = TensorDataset(X_test, y_test)

    # Create data loaders
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

    # Initialize the ConvTran model
    model = ConvTran(config, num_classes=config['num_labels'])

    # Train the model
    optimizer = optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()

    model.train()
    for epoch in range(num_epochs):
        for batch_X, batch_y in train_loader:
            optimizer.zero_grad()
            outputs = model(batch_X)
            loss = criterion(outputs, batch_y)
            loss.backward()
            optimizer.step()

    # Evaluate the model on the test set
    model.eval()
    with torch.no_grad():
        all_preds = []
        for batch_X, _ in test_loader:
            outputs = model(batch_X)
            _, predicted = torch.max(outputs, 1)
            all_preds.append(predicted)
        all_preds = torch.cat(all_preds)
        accuracy = (all_preds == y_test).sum().item() / y_test.size(0)
        print(f'Patient ID: {patient_ids[test_idx[0]]}, Accuracy: {accuracy * 100:.2f}%')

  return F.conv2d(input, weight, bias, self.stride,


Patient ID: L05200708_eeg_label.csv, Accuracy: 59.15%
Patient ID: L05211742_eeg_label.csv, Accuracy: 67.87%
Patient ID: L05250816_eeg_label.csv, Accuracy: 64.89%
Patient ID: L05250921_eeg_label.csv, Accuracy: 37.66%
Patient ID: L05271431_eeg_label.csv, Accuracy: 46.33%
Patient ID: L05281010_eeg_label.csv, Accuracy: 46.37%
Patient ID: L06101015_eeg_label.csv, Accuracy: 68.83%
Patient ID: L06181302_eeg_label.csv, Accuracy: 34.88%
Patient ID: L06181332_eeg_label.csv, Accuracy: 39.62%
Patient ID: L06221009_eeg_label.csv, Accuracy: 32.14%
Patient ID: L06221141_eeg_label.csv, Accuracy: 63.64%
Patient ID: L06221219_eeg_label.csv, Accuracy: 19.05%
Patient ID: L08181442_eeg_label.csv, Accuracy: 32.98%
Patient ID: L08190811_eeg_label.csv, Accuracy: 53.85%
Patient ID: L08190921_eeg_label.csv, Accuracy: 68.06%


In [7]:
config = {
    'Data_shape': (64, 1, 128),  # Example: (batch_size, channels, sequence_length)
    'emb_size': 64,
    'num_heads': 8,
    'dim_ff': 128,
    'dropout': 0.2,
    'Fix_pos_encode': 'Learn',  # Can be 'Sin', 'Learn', or 'None'
    'Rel_pos_encode': 'Scalar',  # Can be 'Scalar', 'Vector', or 'None'
    'num_labels': 5,  # Number of classes
    'Net_Type': 'C'  # 'T' for Transformer, 'CC-T' for CasualConvTran, 'C' for ConvTran
}

'CC-T' for CasualConvTran, 'C' for ConvTran - these have higher accuracy then Transformer

number of input channels = 1 since EEG data is usually single-channel per electrode
30 seconds of EEG data at a sampling rate of 128 Hz

In [13]:
model = model_factory(config)

In [14]:
import torch
import torch.optim as optim

# Example training data (replace with actual data)
X_train = torch.randn(32, 1, 128)  # Example: (batch_size, channels, sequence_length)
y_train = torch.randint(0, 5, (32,))  # Example: 5 classes

# Define optimizer and loss function
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()

# Training loop (simplified)
for epoch in range(10):  # Example: 10 epochs
    model.train()
    optimizer.zero_grad()
    
    # Forward pass
    outputs = model(X_train)
    
    # Compute loss
    loss = criterion(outputs, y_train)
    
    # Backward pass and optimization
    loss.backward()
    optimizer.step()
    
    print(f'Epoch [{epoch+1}/10], Loss: {loss.item():.4f}')

Epoch [1/10], Loss: 1.5931
Epoch [2/10], Loss: 1.4378
Epoch [3/10], Loss: 1.3834
Epoch [4/10], Loss: 1.3724
Epoch [5/10], Loss: 1.3435
Epoch [6/10], Loss: 1.3003
Epoch [7/10], Loss: 1.2668
Epoch [8/10], Loss: 1.2060
Epoch [9/10], Loss: 1.1798
Epoch [10/10], Loss: 1.1639


In [18]:
import torch

label_encoder = LabelEncoder()
y_encoded = label_encoder.fit_transform(labels_list)

# Convert lists to numpy arrays and then to PyTorch tensors
X = torch.tensor(np.array(data_list), dtype=torch.float32)  # Shape: (num_samples, sequence_length)
y = torch.tensor(y_encoded, dtype=torch.long)  # Shape: (num_samples,)
patient_ids = np.array(patient_ids)  # Keep patient IDs as numpy array for easier indexing

# Reshape X to include a channel dimension
X = X.unsqueeze(1)  # Shape: (num_samples, 1, sequence_length)



In [19]:
import torch.optim as optim
import torch.nn as nn

# Define the number of epochs
num_epochs = 3  # You can change this number based on your needs

# # Define optimizer and loss function
# optimizer = optim.Adam(model.parameters(), lr=0.001)
# criterion = nn.CrossEntropyLoss()

# model.train()
# for epoch in range(num_epochs):
#     optimizer.zero_grad()
#     outputs = model(X_train)
#     loss = criterion(outputs, y_train)
#     loss.backward()
#     optimizer.step()

#     # Print loss for every epoch
#     print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {loss.item():.4f}')


In [None]:
from sklearn.model_selection import LeaveOneGroupOut

# Initialize the LOPO cross-validator
logo = LeaveOneGroupOut()

# Loop over each patient
for train_idx, test_idx in logo.split(X, y, groups=patient_ids):
    # Split the data into training and testing sets
    X_train, X_test = X[train_idx], X[test_idx]
    y_train, y_test = y[train_idx], y[test_idx]
    
    # Initialize your model (example using ConvTran)
    model = ConvTran(config, num_classes=5)  # Replace with your actual config and number of classes
    
    # Train the model (you will need to implement the training loop)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    criterion = nn.CrossEntropyLoss()
    
    model.train()
    for epoch in range(num_epochs):
        optimizer.zero_grad()
        outputs = model(X_train)
        loss = criterion(outputs, y_train)
        loss.backward()
        optimizer.step()
    
    # Evaluate the model on the test set
    model.eval()
    with torch.no_grad():
        outputs = model(X_test)
        _, predicted = torch.max(outputs, 1)
        accuracy = (predicted == y_test).sum().item() / y_test.size(0)
        print(f'Patient ID: {patient_ids[test_idx[0]]}, Accuracy: {accuracy * 100:.2f}%')
