In [25]:
"""
Written by, 
Sriram Ravindran, sriram@ucsd.edu

Original paper - https://arxiv.org/abs/1611.08024

Please reach out to me if you spot an error.
"""

'\nWritten by, \nSriram Ravindran, sriram@ucsd.edu\n\nOriginal paper - https://arxiv.org/abs/1611.08024\n\nPlease reach out to me if you spot an error.\n'

In [26]:
import torch.nn as nn
import torch
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score, accuracy_score
)

import os
import pickle
import numpy as np


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)
print("PyTorch version:", torch.__version__)
print("CUDA available:", torch.cuda.is_available())
print("Compiled with CUDA:", torch.version.cuda)

Using device: cpu
PyTorch version: 2.9.0
CUDA available: False
Compiled with CUDA: None


<p>Here's the description from the paper</p>
<img src="EEGNet.png" style="width: 700px; float:left;">

In [27]:

# ---------------------------
# Squeeze-and-Excitation Block
# ---------------------------
class SEBlock(nn.Module):
    def __init__(self, channels, reduction=16):
        super(SEBlock, self).__init__()
        self.avg_pool = nn.AdaptiveAvgPool2d(1)
        self.fc = nn.Sequential(
            nn.Linear(channels, channels // reduction, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(channels // reduction, channels, bias=False),
            nn.Sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y.expand_as(x)

# ---------------------------
# EEGNet with SE Attention
# ---------------------------
class EEGNet(nn.Module):
    def __init__(self, T=120, C=32, dropout=0.25):
        super(EEGNet, self).__init__()
        self.T = T
        self.C = C
        self.dropout = dropout

        # Layer 1: temporal conv across channels
        self.conv1 = nn.Conv2d(1, 16, (1, C))  # kernel spans all 32 channels
        self.batchnorm1 = nn.BatchNorm2d(16)

        # Layer 2: spatial conv (depthwise) with more filters
        self.padding1 = nn.ZeroPad2d((8, 8, 0, 1))
        self.conv2 = nn.Conv2d(1, 16, (1, 16))
        self.batchnorm2 = nn.BatchNorm2d(16)
        self.pooling2 = nn.MaxPool2d((2, 4))

        # SE block after Layer 2
        self.se2 = SEBlock(16)

        # Layer 3
        self.padding2 = nn.ZeroPad2d((2, 1, 4, 3))
        self.conv3 = nn.Conv2d(16, 16, (8, 4))
        self.batchnorm3 = nn.BatchNorm2d(16)
        self.pooling3 = nn.MaxPool2d((2, 4))

        # Dynamically infer flatten size
        with torch.no_grad():
            dummy = torch.zeros(1, 1, T, C)
            out = self._forward_features(dummy)
            flatten_dim = out.shape[1]
        print(f"[EEGNet] Flattened feature dimension: {flatten_dim}")

        # Fully connected layer
        self.fc1 = nn.Linear(flatten_dim, 2)

    def _forward_features(self, x):
        # Layer 1
        x = F.elu(self.conv1(x))
        x = self.batchnorm1(x)
        x = F.dropout(x, self.dropout, training=self.training)
        x = x.permute(0, 3, 1, 2)

        # Layer 2
        x = self.padding1(x)
        x = F.elu(self.conv2(x))
        x = self.batchnorm2(x)
        x = F.dropout(x, self.dropout, training=self.training)
        x = self.pooling2(x)

        # Apply SE attention
        x = self.se2(x)

        # Layer 3
        x = self.padding2(x)
        x = F.elu(self.conv3(x))
        x = self.batchnorm3(x)
        x = F.dropout(x, self.dropout, training=self.training)
        x = self.pooling3(x)

        # Flatten
        x = x.reshape(x.size(0), -1)
        return x

    def forward(self, x):
        x = self._forward_features(x)
        x = self.fc1(x)
        return x

# ---------------------------
# Usage Example
# ---------------------------
net = EEGNet(T=120, C=32, dropout=0.25).to(device)
x_dummy = torch.rand(1, 1, 120, 32).to(device)
output = net(x_dummy)
criterion = nn.BCEWithLogitsLoss()
print("Output:", output)


[EEGNet] Flattened feature dimension: 448
Output: tensor([[-0.0402,  0.0106]], grad_fn=<AddmmBackward0>)


#### Evaluate function returns values of different criteria like accuracy, precision etc.
In case you face memory overflow issues, use batch size to control how many samples get evaluated at one time. Use a batch_size that is a factor of length of samples. This ensures that you won't miss any samples.

In [28]:
def evaluate(model, X, Y, params=["acc"], batch_size=100, device=None):
    """
    Evaluate a trained multi-label EEGNet model on given data.

    Args:
        model: torch.nn.Module
        X: numpy array, shape [samples, 1, timepoints, channels]
        Y: numpy array, shape [samples, n_labels] (e.g., valence+arousal)
        params: list of metrics to compute ['acc', 'auc', 'precision', 'recall', 'fmeasure']
        batch_size: batch size for evaluation
        device: torch.device (default: cuda if available)

    Returns:
        results: list of computed metrics (average across labels)
    """
    if device is None:
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    model.eval()  # set model to evaluation mode
    preds = []

    # Iterate over batches
    for i in range(0, len(X), batch_size):
        batch_x = X[i:i + batch_size]
        inputs = torch.tensor(batch_x, dtype=torch.float32).to(device)

        with torch.no_grad():  # disable gradient computation
            output = model(inputs)        # raw logits
            output = torch.sigmoid(output)  # convert logits → probabilities

        preds.append(output.cpu().numpy())

    # Concatenate all batches
    predicted = np.vstack(preds)  # shape: [samples, n_labels]

    results = []
    for param in params:
        if param == "acc":
            results.append(np.mean(np.round(predicted) == Y))  # average accuracy over labels
        elif param == "auc":
            results.append(roc_auc_score(Y, predicted, average='macro'))
        elif param == "recall":
            results.append(recall_score(Y, np.round(predicted), average='macro'))
        elif param == "precision":
            results.append(precision_score(Y, np.round(predicted), average='macro'))
        elif param == "fmeasure":
            precision = precision_score(Y, np.round(predicted), average='macro')
            recall = recall_score(Y, np.round(predicted), average='macro')
            results.append(2 * precision * recall / (precision + recall + 1e-8))  # avoid div0

    model.train()  # switch back to training mode
    return results


In [29]:

# Path to your DEAP folder
# data_dir = r"G:\DEvAP\data_preprocessed_python"
data_dir = "/Users/mateisorodoc/Facultate/Licenta/DEAP_Data/data_preprocessed_python"
# Initialize lists
all_data = []
all_labels = []

# Loop through subjects s01–s32
for i in range(1, 33):
    filename = f"s{i:02d}.dat"
    file_path = os.path.join(data_dir, filename)
    print(f"Loading {filename}...")

    with open(file_path, "rb") as f:
        subject_data = pickle.load(f, encoding="latin1")

    data = subject_data["data"]      # shape: (40, 40, 8064)
    labels = subject_data["labels"]  # shape: (40, 4)

    all_data.append(data)
    all_labels.append(labels)

# Stack into single numpy arrays
data_all = np.vstack(all_data)       # shape: (1280, 40, 8064)
labels_all = np.vstack(all_labels)   # shape: (1280, 4)

print("All subjects loaded!")
print("Data shape:", data_all.shape)
print("Labels shape:", labels_all.shape)


Loading s01.dat...
Loading s02.dat...
Loading s03.dat...
Loading s04.dat...
Loading s05.dat...
Loading s06.dat...
Loading s07.dat...
Loading s08.dat...
Loading s09.dat...
Loading s10.dat...
Loading s11.dat...
Loading s12.dat...
Loading s13.dat...
Loading s14.dat...
Loading s15.dat...
Loading s16.dat...
Loading s17.dat...
Loading s18.dat...
Loading s19.dat...
Loading s20.dat...
Loading s21.dat...
Loading s22.dat...
Loading s23.dat...
Loading s24.dat...
Loading s25.dat...
Loading s26.dat...
Loading s27.dat...
Loading s28.dat...
Loading s29.dat...
Loading s30.dat...
Loading s31.dat...
Loading s32.dat...
All subjects loaded!
Data shape: (1280, 40, 8064)
Labels shape: (1280, 4)


In [30]:
import numpy as np

n_trials, n_channels, n_samples = data_all.shape
data_all = data_all[:, :32, :]  # keep first 32 channels
segment_len = 120
step = segment_len // 2  # 50% overlap
n_segments = (n_samples - segment_len) // step + 1  # total segments per trial

X_list = []
Y_list = []

print(f"Segmenting {n_trials} trials into {n_segments} overlapping segments each...")

for trial in range(n_trials):
    trial_data = data_all[trial]
    trial_label = labels_all[trial]

    for seg in range(n_segments):
        start = seg * step
        end = start + segment_len
        segment = trial_data[:, start:end].T  # shape: (120, 32)

        X_list.append(segment[np.newaxis, :, :])

        valence = int(trial_label[0] > 5.0)
        arousal = int(trial_label[1] > 5.0)
        Y_list.append([valence, arousal])

X = np.array(X_list, dtype=np.float32)
Y = np.array(Y_list, dtype=np.float32)

print("Done segmenting with 50% overlap!")
print("X shape:", X.shape)
print("Y shape:", Y.shape)


Segmenting 1280 trials into 133 overlapping segments each...
Done segmenting with 50% overlap!
X shape: (170240, 1, 120, 32)
Y shape: (170240, 2)


In [31]:
from sklearn.model_selection import train_test_split

# 70% training, 15% validation, 15% test
X_train, X_temp, y_train, y_temp = train_test_split(X, Y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_val:", X_val.shape, "y_val:", y_val.shape)
print("X_test:", X_test.shape, "y_test:", y_test.shape)


X_train: (119168, 1, 120, 32) y_train: (119168, 2)
X_val: (25536, 1, 120, 32) y_val: (25536, 2)
X_test: (25536, 1, 120, 32) y_test: (25536, 2)


#### Run

In [32]:
import torch
import torch.nn.functional as F
from torch.amp import GradScaler, autocast
from torch.optim import Adam
from torch.optim.lr_scheduler import StepLR
import numpy as np

# ------------------------------
# Hyperparameters
# ------------------------------
batch_size = 16
epochs = 100
learning_rate = 1e-4
max_grad_norm = 1.0  # optional gradient clipping

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

net = net.to(device)
optimizer = Adam(net.parameters(), lr=learning_rate)
scheduler = StepLR(optimizer, step_size=10, gamma=0.5)  # reduce LR every 30 epochs
scaler = GradScaler()  # AMP scaler

params = ["acc", "auc", "fmeasure"]

# ------------------------------
# Training loop
# ------------------------------
for epoch in range(epochs):
    net.train()
    running_loss = 0.0

    # Shuffle training indices
    indices = np.arange(len(X_train))
    np.random.shuffle(indices)

    for i in range(0, len(X_train), batch_size):
        batch_idx = indices[i:i + batch_size]
        batch_x = torch.tensor(X_train[batch_idx], dtype=torch.float32).to(device)
        batch_y = torch.tensor(y_train[batch_idx], dtype=torch.float32).to(device)

        optimizer.zero_grad()

        # AMP forward + backward
        with autocast(device_type='cuda'):
            outputs = net(batch_x)
            loss = criterion(outputs, batch_y)

        scaler.scale(loss).backward()

        # Optional gradient clipping
        scaler.unscale_(optimizer)
        torch.nn.utils.clip_grad_norm_(net.parameters(), max_grad_norm)

        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()

    scheduler.step()  # update learning rate
    avg_loss = running_loss / (len(X_train) / batch_size)
    print(f"Epoch {epoch+1}/{epochs} - Training Loss: {avg_loss:.4f}")

    # ------------------------------
    # Evaluation
    # ------------------------------
    net.eval()
    train_metrics = evaluate(net, X_train, y_train, params, device=device)
    val_metrics = evaluate(net, X_val, y_val, params, device=device)
    test_metrics = evaluate(net, X_test, y_test, params, device=device)

    print(f"Train     - {train_metrics}")
    print(f"Validation- {val_metrics}")
    print(f"Test      - {test_metrics}")


  scaler = GradScaler()  # AMP scaler


Epoch 1/100 - Training Loss: 0.6801
Train     - [np.float64(0.582891380236305), 0.6129535129660459, 0.676479266138665]
Validation- [np.float64(0.5823151629072681), 0.609983471299175, 0.6772653698214322]
Test      - [np.float64(0.5843319235588973), 0.6156360677644579, 0.6763691323163753]




KeyboardInterrupt: 

In [71]:
net.eval()

with torch.no_grad():
    inputs = torch.tensor(X_test, dtype=torch.float32).to(device)
    outputs = net(inputs)  # shape: (402, 2)
    probs = torch.sigmoid(outputs)  # convert logits → probabilities [0,1]
    preds = (probs > 0.5).int()     # threshold at 0.5 → class 0 or 1

# Move back to CPU for viewing
pred_classes = preds.cpu().numpy()
true_classes = y_test.astype(int)

# Print some examples
for i in range(10):
    print(f"Sample {i+1}: Predicted [Valence, Arousal] = {pred_classes[i]}, True = {true_classes[i]}")


Sample 1: Predicted [Valence, Arousal] = [1 0], True = [0 1]
Sample 2: Predicted [Valence, Arousal] = [1 1], True = [0 1]
Sample 3: Predicted [Valence, Arousal] = [1 0], True = [1 1]
Sample 4: Predicted [Valence, Arousal] = [1 1], True = [1 1]
Sample 5: Predicted [Valence, Arousal] = [1 0], True = [1 1]
Sample 6: Predicted [Valence, Arousal] = [0 1], True = [1 0]
Sample 7: Predicted [Valence, Arousal] = [0 0], True = [1 1]
Sample 8: Predicted [Valence, Arousal] = [1 1], True = [1 0]
Sample 9: Predicted [Valence, Arousal] = [0 1], True = [1 1]
Sample 10: Predicted [Valence, Arousal] = [1 1], True = [1 1]
