In [None]:
!pip install -r requirements.txt



In [None]:
# ===== LIBRARY IMPORTS =====
# Import necessary libraries for EEG signal processing and machine learning

# PyTorch for neural networks and deep learning
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

# Scikit-learn for data preprocessing and evaluation metrics
import sklearn
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.metrics import accuracy_score, classification_report, confusion_matrix, ConfusionMatrixDisplay

# NumPy for numerical computations
import numpy as np

# SciPy for signal processing (Short-Time Fourier Transform)
from scipy.signal import stft

# Matplotlib for plotting and visualization
import matplotlib.pyplot as plt

# Custom modules for data loading and constants
from loading_data import *
from CONSTANT import *

In [None]:
# ===== HELPER FUNCTIONS =====
# This section contains utility functions for EEG signal processing

def number_fft(window_size):
    """
    Calculate the next power of 2 greater than window_size-1.
    This is used for optimal FFT computation.
    
    Args:
        window_size (int): Size of the analysis window
        
    Returns:
        int: Next power of 2 (2^power)
    """
    power = 0
    window_tmp = window_size - 1
    while(window_tmp != 1):
        power = power + 1
        window_tmp = int(window_tmp / 2)
    return pow(2, power + 1)

def feature_extraction(input_data, stft_parameters=DEFAULT_STFT_PARAMETERS, label_num=0, fs=SAMPLING_FQ):
    """
    Extract features from EEG data using Short-Time Fourier Transform (STFT).
    
    This function processes multi-channel EEG data and extracts spectral features
    for emotion classification. It applies STFT, computes power spectrum,
    and performs smoothing and normalization.
    
    Args:
        input_data (np.ndarray): Raw EEG data of shape (samples, channels)
        stft_parameters (dict): Parameters for STFT computation
        label_num (int): Class label for the data (0=focused, 1=unfocused, 2=drowsed)
        fs (int): Sampling frequency
        
    Returns:
        tuple: (features, labels) where features is a transposed array and labels is a list
    """
    
    def square(x): return (np.abs(x))**2
    def decibels(x): return 10*np.log10(x)
    
    # Extract STFT parameters
    window_size = stft_parameters['window_size']
    window_shift = stft_parameters['window_shift']
    avg_window_size = stft_parameters['avg_filter_size']
    window_type = stft_parameters['window_type']

    times = fs / window_shift
    feature_list = []

    # Calculate optimal FFT size (power of 2)
    nfft_size = number_fft(window_size)

    # Process each EEG channel (62 channels total)
    for i in range(62):
        channel_feature_list = []
        
        # Apply STFT to extract time-frequency features
        # STFT transforms time-domain signal to time-frequency domain
        eeg_feq = stft(input_data[:, i], fs, window_type, nperseg=window_size*fs,
                       noverlap=window_size*fs-window_shift, nfft=nfft_size*fs)
        
        # Extract the power spectrum (magnitude squared)
        eeg_feq_data = eeg_feq[-1]
        eeg_feq_data = eeg_feq_data[0:-1, 0:-1]  # Remove DC and Nyquist components
        eeg_feq_data = eeg_feq_data.reshape(128, int(nfft_size/2), -1)

        # Process frequency bands (36 bands from 4Hz to 40Hz)
        for j in range(36):
            # Extract features from each frequency band (starting from 4Hz)
            current = eeg_feq_data[j+1, :, :].mean(axis=0)
            
            # Convert to power spectrum
            current = np.apply_along_axis(square, axis=0, arr=current)
            
            # Convert to decibels (log scale)
            current = np.apply_along_axis(decibels, axis=0, arr=current)
            
            # Apply moving average smoothing
            feature = moving_average_smooth(current, avg_window_size)
            channel_feature_list.append(feature)
            
        # Standardize features across time windows
        channel_feature_list = standardscaler_dataframe_train(
            np.array(channel_feature_list))
        
        # Combine features from all channels
        if (i == 0):
            feature_list = np.array(channel_feature_list)
        else:
            feature_list = np.vstack(
                (feature_list, np.array(channel_feature_list)))
            
    # Create label for the entire trial
    # focused: 0 -> unfocused: 1 -> drowsed: 2
    label = [label_num] * feature_list.shape[1]
    
    # Return transposed features (time windows as samples)
    return feature_list.transpose(), label

def moving_average_smooth(interval, window_size):
    """
    Apply moving average smoothing to reduce noise in the signal.
    
    Args:
        interval (np.ndarray): Input signal
        window_size (int): Size of the averaging window
        
    Returns:
        np.ndarray: Smoothed signal
    """
    window = np.ones(int(window_size)) / float(window_size)
    re = np.convolve(interval, window, 'same')
    return re

def standardscaler_dataframe_train(feature_list):
    """
    Standardize features using StandardScaler (zero mean, unit variance).
    
    This function normalizes each feature independently to improve
    model training and convergence.
    
    Args:
        feature_list (list): List of feature arrays
        
    Returns:
        list: List of standardized feature arrays
    """
    scaler_list = list()
    new_feature_list = list()
    
    # Standardize each feature independently
    for i in range(len(feature_list)):
        scaler = StandardScaler()
        x = np.array(feature_list[i]).reshape(-1, 1)
        x = scaler.fit_transform(x)
        new_feature_list.append(x.reshape(-1))
        scaler_list.append(scaler)

    return new_feature_list

def set_seed(seed=42):
    """
    Set random seeds for reproducibility.
    
    Ensures consistent results across different runs by fixing
    random number generation in Python libraries.
    
    Args:
        seed (int): Random seed value
    """
    random.seed(seed)  # python
    np.random.seed(seed)  # numpy
    # PyTorch seeds commented out as they're not needed for current implementation
    # torch.manual_seed(seed) # pytorch
    # torch.cuda.manual_seed(seed)
    # torch.backends.cudnn.deterministic = True
    # torch.backends.cudnn.benchmark = False

# Set seed for reproducible results
set_seed(1)

In [None]:
# ===== DATA PREPARATION FOR CROSS-VALIDATION =====
# This function prepares data for leave-one-session-out cross-validation

def process_dataset_to_fold(label_choices=[0,3]):
    """
    Process dataset for leave-one-session-out cross-validation.
    
    This function creates training and test sets by leaving out each session
    in turn. This is important for evaluating model generalization across
    different recording sessions.
    
    Args:
        label_choices (list): List of emotion labels to include
                             [0,3] typically means focused vs drowsed
        
    Returns:
        tuple: (X_train_folds, X_test_folds, y_train_folds, y_test_folds)
               Each list contains 3 elements (one for each fold)
    """
    X_train_folds = []
    X_test_folds = []
    y_train_folds = []
    y_test_folds = []

    # Perform leave-one-session-out CV (3 sessions total)
    for session_except in [1,2,3]:
        X_train_mean = []
        X_train = []
        y_train = []
        
        # Use remaining sessions for training
        list_session = [1,2,3]
        list_session.remove(session_except)
        
        # Process all files from training sessions
        for session_num in list_session:
            for file_num in range(24):
                # Get labels for current session
                label_list = SESSION_LABELS[str(session_num)]
                
                # Extract features from EEG data
                x_part, y_part = feature_extraction(
                    input_data=d[str(session_num)][str(file_num)], 
                    label_num=label_list[file_num]
                )
                
                # Only include specified emotion classes
                if label_list[file_num] in label_choices:
                    X_train_mean.extend(list(x_part))
                    X_train.extend(list(x_part))
                    y_train.extend(list(y_part))

        # Process test session
        X_test_mean = []
        X_test = []
        y_test = []
        
        for file_num in range(24):
            label_list = SESSION_LABELS[str(session_except)]
            x_part, y_part = feature_extraction(
                input_data=d[str(session_except)][str(file_num)], 
                label_num=label_list[file_num]
            )
            if label_list[file_num] in label_choices:
                X_test_mean.extend(list(x_part))
                X_test.extend(list(x_part))
                y_test.extend(list(y_part))

        # Append fold data
        X_train_folds.append(X_train)
        X_test_folds.append(X_test)
        y_train_folds.append(y_train)
        y_test_folds.append(y_test)
        
    return X_train_folds, X_test_folds, y_train_folds, y_test_folds

In [None]:
# Custom PyTorch Dataset Classes

class MyNNDataset(Dataset):
    """
    Custom PyTorch Dataset for Neural Network (MLP) model.
    
    This dataset flattens the 2D feature matrix (62 channels × 36 frequency bands)
    into a 1D vector (2232 features) for input to a standard neural network.
    """
    def __init__(self, X, y):
        self.X = []
        for x in X:
            # Convert to tensor and flatten to 1D
            x = torch.tensor(x, dtype=torch.float32)
            x = x.flatten()          # Convert 62×36 matrix to 2232-dim vector
            self.X.append(x)
        self.X = torch.stack(self.X)  # Stack into tensor of shape (N, 2232)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]
    
class MyCNNDataset(Dataset):
    """
    Custom PyTorch Dataset for Convolutional Neural Network model.
    
    This dataset preserves the 2D structure of the EEG features
    as a spatial representation (62 channels × 36 frequency bands)
    which can be treated like an image for CNN processing.
    """
    def __init__(self, X, y):
        self.X = []
        for x in X:
            x = torch.tensor(x, dtype=torch.float32)

            # Ensure correct shape for CNN
            # If it's a vector (2232,), reshape to (62, 36)
            if x.ndim == 1 and x.numel() == 2232:
                x = x.reshape(62, 36)

            # If it's already (62, 36), add channel dimension
            if x.shape == (62, 36):
                x = x.unsqueeze(0)  # Add channel dim: (1, 62, 36)
            self.X.append(x)

        self.X = torch.stack(self.X)   # Shape: (N, 1, 62, 36)
        self.y = torch.tensor(y, dtype=torch.long)

    def __len__(self):
        return len(self.y)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

In [None]:
# Neural Network Model Architectures

class NNClassifier(nn.Module):
    """
    Multi-Layer Perceptron (MLP) for EEG emotion classification.
    
    This is a standard fully-connected neural network that takes
    flattened EEG features (2232 dimensions) as input.
    
    Architecture:
    - Input: 2232 features (62 channels × 36 frequency bands)
    - Hidden layers: 512 → 128 → 32 units
    - Output: num_classes units (emotion categories)
    - Activation: ReLU for hidden layers
    """
    def __init__(self, input_dim, num_classes=4):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(input_dim, 512),  # First hidden layer
            nn.ReLU(),
            nn.Linear(512, 128),        # Second hidden layer
            nn.ReLU(),
            nn.Linear(128, 32),         # Third hidden layer
            nn.ReLU(),
            nn.Linear(32, num_classes)  # Output layer (no activation)
        )

    def forward(self, x):
        return self.net(x)

class CNNClassifier(nn.Module):
    """
    Convolutional Neural Network for EEG emotion classification.
    
    This CNN treats the EEG feature matrix (62×36) as a 2D spatial map,
    applying convolution operations to learn spatial patterns.
    
    Architecture:
    - Input: 1×62×36 tensor (1 channel, 62 electrodes, 36 frequency bands)
    - Conv layers: 16→32 filters with 3×3 kernels
    - Max pooling: 2×2 windows
    - Fully connected: 32×9×15 → 128 → num_classes
    """
    def __init__(self, num_classes=4):
        super().__init__()
        # Convolutional layers for feature extraction
        self.conv = nn.Sequential(
            # First conv block
            nn.Conv2d(1, 16, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),   # Output: (16, 31, 18)

            # Second conv block
            nn.Conv2d(16, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2),   # Output: (32, 15, 9)
        )

        # Fully connected layers for classification
        self.fc = nn.Sequential(
            nn.Linear(32*15*9, 128),  # Note: Check input size based on actual conv output
            nn.ReLU(),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        # Apply convolutional layers
        out = self.conv(x)
        # Flatten for fully connected layers
        out = out.reshape(out.size(0), -1)
        # Apply fully connected layers
        return self.fc(out)

In [None]:
# Training and Evaluation Functions

def train_and_test(
    X_train, y_train, X_test, y_test, 
    epochs=20, batch_size=16, lr=1e-3,
    device=device,
    model_type='nn', print_check=True
):
    """
    Train and evaluate either NN or CNN model on EEG data.
    
    This function handles the complete training pipeline including:
    - Dataset creation and DataLoader setup
    - Model initialization
    - Training loop
    - Evaluation with confusion matrix and classification report
    
    Args:
        X_train, y_train: Training data and labels
        X_test, y_test: Test data and labels
        epochs: Number of training epochs
        batch_size: Batch size for training
        lr: Learning rate
        device: PyTorch device (cpu/cuda)
        model_type: 'nn' for MLP, 'cnn' for CNN
        print_check: Whether to print detailed results
        
    Returns:
        tuple: (trained_model, test_accuracy)
    """

    # ===== DATA PREPARATION =====
    # Create appropriate datasets based on model type
    if model_type.lower() == 'nn':
        train_ds = MyNNDataset(X_train, y_train)
        test_ds  = MyNNDataset(X_test,  y_test)
        model = NNClassifier(input_dim=len(X_train[1])).to(device)
    elif model_type.lower() == 'cnn':
        train_ds = MyCNNDataset(X_train, y_train)
        test_ds  = MyCNNDataset(X_test,  y_test)
        model = CNNClassifier().to(device)
    else:
        raise TypeError("Only allowed cnn or nn model. Check your option and run again!")

    # Create DataLoaders for batch processing
    train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
    test_loader  = DataLoader(test_ds,  batch_size=batch_size, shuffle=False)

    # ===== TRAINING SETUP =====
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    loss_fn = nn.CrossEntropyLoss()  # Multi-class classification loss

    # ===== TRAINING LOOP =====
    for epoch in range(epochs):
        model.train()  # Set model to training mode
        total_loss = 0
        
        # Iterate over batches
        for batch_X, batch_y in train_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)

            # Forward pass
            optimizer.zero_grad()
            logits = model(batch_X)
            loss = loss_fn(logits, batch_y)
            
            # Backward pass
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()

    # ===== EVALUATION =====
    model.eval()  # Set model to evaluation mode
    correct = 0
    total = 0
    all_preds = []
    all_labels = []

    # Disable gradient computation for evaluation
    with torch.no_grad():
        for batch_X, batch_y in test_loader:
            batch_X, batch_y = batch_X.to(device), batch_y.to(device)
            
            # Get predictions
            logits = model(batch_X)
            pred = logits.argmax(dim=1)
            
            # Calculate accuracy
            correct += (pred == batch_y).sum().item()
            total += batch_y.size(0)

            # Store predictions for detailed metrics
            all_preds.extend(pred.cpu().numpy())
            all_labels.extend(batch_y.cpu().numpy())

    # Calculate final accuracy
    accuracy = correct / total if total > 0 else 0
    
    # ===== RESULTS VISUALIZATION =====
    if print_check:
        # Determine model name for display
        if model_type == 'nn':
            print('Neural Network training:')
            model_name = 'Neural Network'
        elif model_type == 'cnn':
            print('\nConvolutional Neural Network training:')
            model_name = 'Convolutional Neural Network'
        
        # Create and display confusion matrix
        cm = confusion_matrix(all_labels, all_preds)
        disp = ConfusionMatrixDisplay(confusion_matrix=cm, 
                                     display_labels=['Class 0', 'Class 1'])
        disp.plot(cmap=plt.cm.Blues)
        plt.title(f"Confusion Matrix {model_name}")
        plt.show()
        
        # Print performance metrics
        print(f"Test Accuracy: {accuracy * 100:.2f}%")
        print("\nClassification report:")
        print(classification_report(all_labels, all_preds))

    return model, accuracy

In [None]:
# ===== EXPERIMENT EXECUTION =====
# This cell runs the complete cross-validation experiment

# Prepare cross-validation datasets for focused vs drowsed classification
# Label choices: [0=focused, 3=drowsed]
X_train_folds, X_test_folds, y_train_folds, y_test_folds = process_dataset_to_fold(label_choices=[0,3])

# Initialize lists to store accuracy results
acc_NN_summarize = []  # Neural Network accuracies
acc_CNN_summarize = [] # CNN accuracies

# Flag to control output printing
PRINT_CHECK = True

# ===== CROSS-VALIDATION LOOP =====
# Perform 3-fold cross-validation (leave one session out each time)
for i in range(3):
    print(f"\n{'='*50}")
    print(f"FOLD {i+1}/3 - Session {i+1} as test set")
    print(f"{'='*50}")
    
    # Train and evaluate Neural Network
    print("\nTraining Neural Network...")
    modelNN, accNN = train_and_test(
        X_train=X_train_folds[i], 
        y_train=y_train_folds[i], 
        X_test=X_test_folds[i], 
        y_test=y_test_folds[i], 
        epochs=20, 
        batch_size=64, 
        lr=1e-3, 
        model_type='nn', 
        print_check=PRINT_CHECK
    )
    acc_NN_summarize.append(accNN)
    print(f"Neural Network Accuracy (Fold {i+1}): {accNN*100:.2f}%")
    
    # Train and evaluate CNN
    print("\n" + "="*50)
    print("Training Convolutional Neural Network...")
    modelCNN, accCNN = train_and_test(
        X_train=X_train_folds[i], 
        y_train=y_train_folds[i], 
        X_test=X_test_folds[i], 
        y_test=y_test_folds[i], 
        epochs=20, 
        batch_size=64, 
        lr=1e-3, 
        model_type='cnn', 
        print_check=PRINT_CHECK
    )
    acc_CNN_summarize.append(accCNN)
    print(f"CNN Accuracy (Fold {i+1}): {accCNN*100:.2f}%")

# ===== FINAL RESULTS SUMMARY =====
print(f"\n{'='*50}")
print("FINAL RESULTS SUMMARY")
print(f"{'='*50}")

# Calculate and display mean accuracies across folds
nn_mean_acc = np.mean(np.array(acc_NN_summarize))
cnn_mean_acc = np.mean(np.array(acc_CNN_summarize))

print(f"\nNeural Network Performance:")
print(f"  - Fold accuracies: {[f'{acc*100:.2f}%' for acc in acc_NN_summarize]}")
print(f"  - Mean accuracy: {nn_mean_acc*100:.2f}%")
print(f"  - Std deviation: {np.std(np.array(acc_NN_summarize))*100:.2f}%")

print(f"\nConvolutional Neural Network Performance:")
print(f"  - Fold accuracies: {[f'{acc*100:.2f}%' for acc in acc_CNN_summarize]}")
print(f"  - Mean accuracy: {cnn_mean_acc*100:.2f}%")
print(f"  - Std deviation: {np.std(np.array(acc_CNN_summarize))*100:.2f}%")

# Determine which model performed better
if cnn_mean_acc > nn_mean_acc:
    improvement = ((cnn_mean_acc - nn_mean_acc) / nn_mean_acc) * 100
    print(f"\nCNN outperforms NN by {improvement:.2f}%")
elif nn_mean_acc > cnn_mean_acc:
    improvement = ((nn_mean_acc - cnn_mean_acc) / cnn_mean_acc) * 100
    print(f"\nNN outperforms CNN by {improvement:.2f}%")
else:
    print(f"\nBoth models have similar performance")