In [None]:
!pip install mne

In [None]:
!pip install pymatreader

In [None]:
!pip install scipy

In [None]:
import pandas as pd

# Participant-level metadata ships with the dataset as a tab-separated table
metadata_path = "/content/participants.tsv"
metadata = pd.read_table(metadata_path)  # read_table defaults to a tab delimiter

# Preview the first rows to confirm the columns were parsed as expected
print(metadata.head())

In [None]:
# Encode the diagnostic group letters as integer class ids for the classifier.
# NOTE(review): presumably A = Alzheimer's, F = frontotemporal dementia, C = control;
# confirm against the dataset description.
group_mapping = {'A': 0, 'F': 1, 'C': 2}

# Map only while the column still holds the raw letter codes. Mapping an already
# encoded (numeric) column would turn every value into NaN, so this guard keeps
# the cell safe to re-run without reloading the TSV.
if not pd.api.types.is_numeric_dtype(metadata['Group']):
    metadata['Group'] = metadata['Group'].map(group_mapping)

# Check the distribution of the 'Group' column in the metadata DataFrame
group_counts = metadata['Group'].value_counts()
print(f"Group distribution:\n{group_counts}")

In [None]:
# Encode gender as a binary integer feature (M -> 0, F -> 1).
gender_mapping = {'M': 0, 'F': 1}

# Map only while the column still holds the raw letter codes. Mapping an already
# encoded (numeric) column would turn every value into NaN, so this guard keeps
# the cell safe to re-run without reloading the TSV.
if not pd.api.types.is_numeric_dtype(metadata['Gender']):
    metadata['Gender'] = metadata['Gender'].map(gender_mapping)

# Check the distribution of the 'Gender' column in the metadata DataFrame.
# Stored under its own name so the group distribution computed earlier is not overwritten.
gender_counts = metadata['Gender'].value_counts()
print(f"Gender distribution:\n{gender_counts}")

In [None]:
import numpy as np
import mne
import pandas as pd
import os
from scipy.signal import resample
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, TensorDataset
import torch.optim as optim

# Resample a multi-channel recording to a lower sampling rate
def downsample_data(data, original_sfreq, target_sfreq):
    """Resample ``data`` along its time axis from ``original_sfreq`` to ``target_sfreq``.

    Args:
        data: 2-D array whose second axis (axis 1) is time.
        original_sfreq: Sampling rate of ``data`` in Hz.
        target_sfreq: Desired sampling rate in Hz.

    Returns:
        Array with the same first dimension as ``data`` and
        ``int(n_samples * target_sfreq / original_sfreq)`` samples along axis 1.
    """
    n_samples_in = data.shape[1]
    # Truncate (rather than round) to a whole number of output samples
    n_samples_out = int(n_samples_in * target_sfreq / original_sfreq)
    return resample(data, n_samples_out, axis=1)

# Function to perform epoching with overlapping fixed-length windows (default: 4 s, 50% overlap)
def create_epochs(data, sfreq, epoch_duration=4, overlap=0.5):
    """Cut a continuous recording into fixed-length, overlapping windows.

    Args:
        data: 2-D NumPy array of shape (n_channels, n_samples); windows are taken
            along axis 1.
        sfreq: Sampling rate of ``data`` in Hz.
        epoch_duration: Window length in seconds.
        overlap: Fraction of each window shared with the next one
            (0.5 means consecutive windows overlap by half).

    Returns:
        Array of shape (n_epochs, n_channels, samples_per_epoch). Trailing samples
        that do not fill a whole window are dropped. If the recording is shorter
        than one window the result has shape (0, n_channels, samples_per_epoch).

    Raises:
        ValueError: If the window length or the step between windows is shorter
            than one sample (e.g. ``overlap >= 1``), which would otherwise cause a
            division by zero or a non-advancing window.
    """
    samples_per_epoch = int(epoch_duration * sfreq)
    step_samples = int(samples_per_epoch * (1 - overlap))

    if samples_per_epoch < 1 or step_samples < 1:
        raise ValueError(
            f"epoch_duration={epoch_duration}, sfreq={sfreq} and overlap={overlap} give a "
            f"window of {samples_per_epoch} samples advancing by {step_samples} samples; "
            "both must be at least 1"
        )

    n_channels, n_samples = data.shape[0], data.shape[1]
    if n_samples < samples_per_epoch:
        # Keep the trailing dimensions so that callers can still stack/concatenate the result
        return np.empty((0, n_channels, samples_per_epoch), dtype=data.dtype)

    num_epochs = (n_samples - samples_per_epoch) // step_samples + 1
    window_starts = range(0, num_epochs * step_samples, step_samples)
    return np.stack([data[:, start:start + samples_per_epoch] for start in window_starts])

# Function to load EEG data, downsample, epoch, and extract metadata
def load_and_preprocess_eeg_with_metadata(eeg_folder, metadata_df, target_sfreq=128, epoch_duration=4, overlap=0.5):
    """Load every EEGLAB ``.set`` recording in a folder and turn it into labelled epochs.

    Each recording is resampled to ``target_sfreq``, cut into overlapping windows and
    paired with the participant's metadata, which is repeated once per epoch.

    Args:
        eeg_folder: Directory that directly contains the ``.set`` files
            (sub-directories are not searched).
        metadata_df: DataFrame with the columns ``participant_id``, ``Group``, ``Age``,
            ``MMSE`` and ``Gender``. ``Gender`` must already be integer-encoded because
            it is returned as an integer array.
        target_sfreq: Sampling rate in Hz to resample every recording to.
        epoch_duration: Epoch length in seconds.
        overlap: Fractional overlap between consecutive epochs.

    Returns:
        Tuple ``(epochs, groups, ages, mmse, genders, patient_ids)``. ``epochs`` stacks
        the epochs of all recordings along the first axis; the other five arrays hold
        one entry per epoch. ``epochs`` is an empty 1-D array if nothing was loaded.

    Notes:
        Recordings whose participant is absent from ``metadata_df``, or whose ``Group``
        or ``Gender`` is missing, are skipped with a warning: they carry no usable label,
        and a missing gender cannot be stored in the integer ``genders`` array.
        Stacking requires all recordings to share the same channel count.
    """
    import warnings  # local import: only needed for the skip notices below

    # Sorted so the order of the returned epochs does not depend on the file system
    eeg_files = sorted(f for f in os.listdir(eeg_folder) if f.endswith('.set'))
    all_epochs = []
    all_groups = []
    all_ages = []
    all_mmse = []
    all_genders = []
    all_patient_ids = []

    for eeg_file in eeg_files:
        # Extract participant ID (filename format is 'sub-XXX_task-eyesclosed_eeg.set')
        participant_id = eeg_file.split('_')[0]

        # Look the participant up before loading, so unusable files are never read
        participant_info = metadata_df[metadata_df['participant_id'] == participant_id]
        if participant_info.empty:
            warnings.warn(f"Skipping {eeg_file}: no metadata row for participant '{participant_id}'")
            continue

        row = participant_info.iloc[0]
        group, age, mmse, gender = row['Group'], row['Age'], row['MMSE'], row['Gender']
        if pd.isna(group) or pd.isna(gender):
            warnings.warn(f"Skipping {eeg_file}: missing Group or Gender for participant '{participant_id}'")
            continue

        # Load the EEG data; get_data() returns an array of shape (n_channels, n_timepoints)
        file_path = os.path.join(eeg_folder, eeg_file)
        eeg_data = mne.io.read_raw_eeglab(file_path, preload=True)
        original_sfreq = eeg_data.info['sfreq']
        data = eeg_data.get_data()

        # Downsample, then cut into overlapping epochs
        downsampled_data = downsample_data(data, original_sfreq, target_sfreq)
        epochs = create_epochs(downsampled_data, target_sfreq, epoch_duration, overlap)

        # Recordings too short for a single epoch contribute nothing
        n_epochs = epochs.shape[0]
        if n_epochs > 0:
            all_epochs.append(epochs)
            # Repeat the per-participant values so every epoch carries its own copy
            all_groups.extend([group] * n_epochs)
            all_ages.extend([age] * n_epochs)
            all_mmse.extend([mmse] * n_epochs)
            all_genders.extend([gender] * n_epochs)
            all_patient_ids.extend([participant_id] * n_epochs)

    # Convert lists to NumPy arrays
    all_epochs = np.vstack(all_epochs) if all_epochs else np.array([])
    all_groups = np.array(all_groups)
    all_ages = np.array(all_ages, dtype=float)
    all_mmse = np.array(all_mmse, dtype=float)
    all_genders = np.array(all_genders, dtype=int)
    all_patient_ids = np.array(all_patient_ids)

    return all_epochs, all_groups, all_ages, all_mmse, all_genders, all_patient_ids

# Load the EEG data
eeg_folder = '/content/derivatives'  # Path to EEG files
target_sfreq = 128

# Run the loading pipeline. `metadata` is the participants table loaded above; its
# Group and Gender columns must already be integer-encoded by the mapping cells.
epochs, groups, ages, mmse_scores, genders, patient_ids = load_and_preprocess_eeg_with_metadata(
    eeg_folder, metadata, target_sfreq=target_sfreq
)

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, TensorDataset
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix, precision_score, recall_score, f1_score

# Assuming your EEG, age, MMSE, and gender data are already preprocessed
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Fail early with a readable message if loading produced no (or malformed) epochs
if epochs.ndim != 3:
    raise ValueError(
        f"Expected epochs with shape (n_epochs, n_channels, n_timepoints), got {epochs.shape}"
    )

# Normalize EEG data. StandardScaler needs 2-D input, so each epoch is flattened to
# one row (every channel/time-point pair is standardized as its own feature) and the
# original 3-D layout is restored afterwards from the array's own shape.
# NOTE(review): the statistics are fitted on all epochs, including those of patients
# that are later held out during cross-validation.
scaler = StandardScaler()
epochs_flattened = epochs.reshape(epochs.shape[0], -1)
epochs_normalized = scaler.fit_transform(epochs_flattened)
epochs_normalized = epochs_normalized.reshape(epochs.shape)  # back to (epochs, channels, timepoints)

# Normalize the additional continuous features (age, MMSE); each scaler returns a column of shape (n_epochs, 1)
age_scaler = StandardScaler()
ages_normalized = age_scaler.fit_transform(ages.reshape(-1, 1))

mmse_scaler = StandardScaler()
mmse_normalized = mmse_scaler.fit_transform(mmse_scores.reshape(-1, 1))

# Prepare the additional features as tensors of shape (n_epochs, 1).
# The scaled columns are already 2-D; only the 1-D gender codes need an extra axis.
# Gender is a 0/1 code, so it is used as-is without scaling.
ages_tensor = torch.tensor(ages_normalized, dtype=torch.float)
mmse_tensor = torch.tensor(mmse_normalized, dtype=torch.float)
genders_tensor = torch.tensor(genders, dtype=torch.long).unsqueeze(1)


# 1-D residual CNN over EEG epochs whose pooled features are fused with per-sample metadata
class ResNet1D(nn.Module):
    """Three residual 1-D convolution stages followed by a metadata-aware classifier head.

    Every stage applies two kernel-3 convolutions (each followed by ReLU), adds a
    kernel-1 projection of the stage input as the shortcut, and then applies 50%
    dropout. Channel widths grow input_channels -> 64 -> 128 -> 256. The time axis is
    collapsed by global average pooling, the 256 pooled features are concatenated
    with three metadata values per sample (age, MMSE, gender), and two fully
    connected layers produce the class logits.
    """

    def __init__(self, input_channels, num_classes):
        """Create the layers.

        Args:
            input_channels: Number of channels of the input signal.
            num_classes: Number of output logits.
        """
        super().__init__()

        # Stages are registered in a fixed order under the names
        # block{i}_conv1 / block{i}_conv2 / block{i}_residual / dropout{i}.
        stage_widths = ((input_channels, 64), (64, 128), (128, 256))
        for stage, (width_in, width_out) in enumerate(stage_widths, start=1):
            setattr(self, f"block{stage}_conv1",
                    nn.Conv1d(width_in, width_out, kernel_size=3, stride=1, padding=1))
            setattr(self, f"block{stage}_conv2",
                    nn.Conv1d(width_out, width_out, kernel_size=3, stride=1, padding=1))
            # Kernel-1 projection so the shortcut matches the stage's output width
            setattr(self, f"block{stage}_residual",
                    nn.Conv1d(width_in, width_out, kernel_size=1, stride=1, padding=0))
            setattr(self, f"dropout{stage}", nn.Dropout(0.5))

        # Averages over the time axis, leaving one value per channel
        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)

        # Classifier head: 256 pooled CNN features + 3 metadata features (age, MMSE, gender)
        self.fc1 = nn.Linear(256 + 3, 512)
        self.fc2 = nn.Linear(512, num_classes)

    def _residual_stage(self, features, stage):
        """Run residual stage number ``stage`` (1-3) on ``features``."""
        first_conv = getattr(self, f"block{stage}_conv1")
        second_conv = getattr(self, f"block{stage}_conv2")
        projection = getattr(self, f"block{stage}_residual")
        dropout = getattr(self, f"dropout{stage}")

        main_path = F.relu(second_conv(F.relu(first_conv(features))))
        shortcut = projection(features)
        return dropout(main_path + shortcut)

    def forward(self, x, age, mmse, gender):
        """Compute class logits.

        Args:
            x: Signal batch laid out as (batch, input_channels, time).
            age, mmse, gender: Per-sample metadata tensors; each is flattened to
                (batch, -1) before concatenation, and together they must contribute
                exactly three values per sample.

        Returns:
            Tensor of shape (batch, num_classes) holding unnormalized logits.
        """
        for stage in (1, 2, 3):
            x = self._residual_stage(x, stage)

        # (batch, 256, time) -> (batch, 256)
        pooled = self.global_avg_pool(x).squeeze(-1)

        # Flatten each metadata tensor to one row per sample and append it to the CNN features
        metadata_columns = [extra.view(extra.size(0), -1) for extra in (age, mmse, gender)]
        combined_features = torch.cat([pooled, *metadata_columns], dim=1)

        hidden = F.relu(self.fc1(combined_features))
        return self.fc2(hidden)


# Data augmentation: perturb the inputs with zero-mean Gaussian noise
def augment_data(x, noise_factor=0.1):
    """Return ``x`` plus standard-normal noise scaled by ``noise_factor``.

    Args:
        x: Input tensor; the noise is drawn with the same shape, dtype and device.
        noise_factor: Multiplier applied to the standard-normal samples.

    Returns:
        A new tensor ``x + noise``; ``x`` itself is not modified.
    """
    perturbation = torch.randn_like(x).mul(noise_factor)
    return x.add(perturbation)

In [None]:
# Perform Leave-One-Patient-Out Cross-Validation:
# for every patient, train a fresh model on the epochs of all other patients and
# evaluate it on the held-out patient's epochs.
# NOTE(review): epochs_normalized / ages_tensor / mmse_tensor were standardized on the
# full data set in the normalization cell, so the held-out patient contributes to the
# scaling statistics.
patients = np.unique(patient_ids)
accuracies = []  # one test accuracy (in %) per held-out patient
all_preds = []   # epoch-level predictions pooled over all folds
all_labels = []  # matching epoch-level ground-truth labels

for patient in patients:
    # Split the data into training and testing based on patient ID
    # (boolean masks over the epoch axis)
    train_mask = patient_ids != patient
    test_mask = patient_ids == patient

    # Prepare training and testing data
    X_train, y_train = epochs_normalized[train_mask], groups[train_mask]
    X_test, y_test = epochs_normalized[test_mask], groups[test_mask]

    # Prepare the additional features (age, MMSE, gender) for training and testing
    age_train, mmse_train, gender_train = ages_tensor[train_mask], mmse_tensor[train_mask], genders_tensor[train_mask]
    age_test, mmse_test, gender_test = ages_tensor[test_mask], mmse_tensor[test_mask], genders_tensor[test_mask]

    # Convert to tensors and move to the device
    # (the whole fold is placed on the device up front, not batch by batch)
    X_train_tensor = torch.tensor(X_train, dtype=torch.float32).to(device)
    y_train_tensor = torch.tensor(y_train, dtype=torch.long).to(device)
    age_train_tensor = age_train.to(device)
    mmse_train_tensor = mmse_train.to(device)
    gender_train_tensor = gender_train.to(device)

    X_test_tensor = torch.tensor(X_test, dtype=torch.float32).to(device)
    y_test_tensor = torch.tensor(y_test, dtype=torch.long).to(device)
    age_test_tensor = age_test.to(device)
    mmse_test_tensor = mmse_test.to(device)
    gender_test_tensor = gender_test.to(device)

    # Create DataLoader; each batch yields (inputs, labels, age, mmse, gender)
    train_dataset = TensorDataset(X_train_tensor, y_train_tensor, age_train_tensor, mmse_train_tensor, gender_train_tensor)
    test_dataset = TensorDataset(X_test_tensor, y_test_tensor, age_test_tensor, mmse_test_tensor, gender_test_tensor)

    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

    # Initialize the model (re-created for every fold so no weights carry over)
    model = ResNet1D(input_channels=19, num_classes=3).to(device)

    # Define loss function and optimizer with weight decay
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-4)  # L2 regularization
    # With step_size=10 and only 5 epochs below, the learning rate is never actually reduced
    scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)  # Learning rate scheduler

    # Train the model
    model.train()
    for epoch in range(5):  # Increase this number to 10-50 for better training
        running_loss = 0.0
        correct = 0
        total = 0
        for inputs, labels, age, mmse, gender in train_loader:
            # The dataset tensors already live on `device`, so these transfers are no-ops
            inputs, labels = inputs.to(device), labels.to(device)
            age, mmse, gender = age.to(device), mmse.to(device), gender.to(device)

            # Apply augmentation here (Gaussian noise, training batches only)
            inputs = augment_data(inputs)

            optimizer.zero_grad()
            outputs = model(inputs, age, mmse, gender)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            # Training accuracy is measured on the augmented batch with dropout active
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        scheduler.step()  # Step the learning rate scheduler (once per epoch)

        print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}, Accuracy: {100 * correct / total}%")

    # Test the model on the held-out patient (eval mode disables dropout)
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels, age, mmse, gender in test_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            age, mmse, gender = age.to(device), mmse.to(device), gender.to(device)
            outputs = model(inputs, age, mmse, gender)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

            # Collect predictions and labels for later metric calculations
            all_preds.extend(predicted.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    # Epoch-level accuracy for this patient
    accuracy = 100 * correct / total
    accuracies.append(accuracy)
    print(f"Patient {patient}: Test Accuracy: {accuracy}%")

# Calculate metrics after loop, pooled over the test epochs of all folds
conf_matrix = confusion_matrix(all_labels, all_preds)
precision = precision_score(all_labels, all_preds, average='weighted')
recall = recall_score(all_labels, all_preds, average='weighted')
f1 = f1_score(all_labels, all_preds, average='weighted')

# Print overall performance metrics.
# "Average Accuracy" is the unweighted mean of the per-patient accuracies, whereas
# precision/recall/F1 are computed over the pooled epochs.
print(f"Average Accuracy: {np.mean(accuracies)}%")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Precision: {precision}")
print(f"Recall: {recall}")
print(f"F1 Score: {f1}")