In [25]:
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import StratifiedKFold
from sklearn.metrics import accuracy_score
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.signal import welch

In [26]:
patient_num = 2

eeg_task_path = f'../Data/Processed/sub0{patient_num}_binned_task.csv'
eeg_data = pd.read_csv(eeg_task_path)

In [27]:
class EEG_LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout_rate=0.5):
        super(EEG_LSTM, self).__init__()
        # Apply dropout only if num_layers > 1
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, 
                            num_layers=num_layers, batch_first=True, 
                            dropout=dropout_rate if num_layers > 1 else 0)
        self.bn = nn.BatchNorm1d(hidden_size)
        self.fc = nn.Linear(hidden_size, output_size)
        self.dropout = nn.Dropout(dropout_rate)
    
    def forward(self, x):
        batch_size, n_electrodes, n_time_steps, window_size = x.shape
        x = x.view(batch_size, n_time_steps, n_electrodes * window_size)
        lstm_out, (hn, _) = self.lstm(x)
        out = self.bn(hn[-1])
        out = self.dropout(out)
        out = self.fc(out)
        return out


In [28]:
def extract_sliding_windows(eeg_data, selected_electrodes, window_size, step_size):
    sequences = []
    labels = []
    for idx, row in eeg_data.iterrows():
        trial_data = []
        for electrode in selected_electrodes:
            # Convert the string representation to a list of floats
            signal = np.array(eval(row[electrode]))
            # Extract sliding windows
            windows = [
                signal[i:i + window_size]
                for i in range(0, len(signal) - window_size + 1, step_size)
            ]
            if len(windows) > 0:
                windows = [(window - np.mean(window)) / np.std(window) for window in windows]
                trial_data.append(np.array(windows))
        if len(trial_data) == len(selected_electrodes):
            trial_data = np.array(trial_data)  # Shape: (n_electrodes, n_time_steps, window_size)
            sequences.append(trial_data)
            label = 1 if row['label_type'] == 'social' else 0
            labels.append(label)
    X = np.array(sequences)
    y = np.array(labels)
    return X, y

In [29]:
def train_model(model, train_loader, criterion, optimizer, device):
    model.train()
    for X_batch, y_batch in train_loader:
        X_batch, y_batch = X_batch.to(device), y_batch.to(device)
        optimizer.zero_grad()
        outputs = model(X_batch)
        loss = criterion(outputs, y_batch)
        loss.backward()
        optimizer.step()

In [30]:
def evaluate_model(model, val_loader, device):
    model.eval()
    all_preds = []
    all_labels = []
    with torch.no_grad():
        for X_batch, y_batch in val_loader:
            X_batch = X_batch.to(device)
            outputs = model(X_batch)
            preds = torch.argmax(outputs, dim=1).cpu().numpy()
            all_preds.extend(preds)
            all_labels.extend(y_batch.numpy())
    return accuracy_score(all_labels, all_preds)

In [31]:
def cross_validate(eeg_data, selected_electrodes, hyperparams, device='cuda'):
    skf = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
    best_score = 0
    best_params = None

    for hidden_size in hyperparams['hidden_size']:
        for num_layers in hyperparams['num_layers']:
            for dropout_rate in hyperparams['dropout_rate']:
                for lr in hyperparams['learning_rate']:
                    for window_size in hyperparams['window_size']:
                        for step_size in hyperparams['step_size']:
                            for batch_size in hyperparams['batch_size']:
                                print(f"Testing params: hidden_size={hidden_size}, num_layers={num_layers}, "
                                      f"dropout={dropout_rate}, lr={lr}, window_size={window_size}, step_size={step_size}, batch_size={batch_size}")
                                
                                # Extract features with the current window and step size
                                X, y = extract_sliding_windows(eeg_data, selected_electrodes, window_size, step_size)
                                if X.shape[0] == 0:
                                    continue  # Skip if no data was extracted
                                
                                n_electrodes = X.shape[1]
                                actual_window_size = X.shape[3]
                                input_size = n_electrodes * actual_window_size
                                print(f"Calculated input_size: {input_size}")

                                scores = []

                                for train_idx, val_idx in skf.split(X, y):
                                    X_train, X_val = X[train_idx], X[val_idx]
                                    y_train, y_val = y[train_idx], y[val_idx]

                                    train_dataset = torch.utils.data.TensorDataset(torch.tensor(X_train, dtype=torch.float32),
                                                                                   torch.tensor(y_train, dtype=torch.long))
                                    val_dataset = torch.utils.data.TensorDataset(torch.tensor(X_val, dtype=torch.float32),
                                                                                 torch.tensor(y_val, dtype=torch.long))
                                    train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
                                    val_loader = torch.utils.data.DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

                                    model = EEG_LSTM(input_size=input_size, hidden_size=hidden_size, 
                                                     num_layers=num_layers, output_size=2, dropout_rate=dropout_rate).to(device)

                                    criterion = nn.CrossEntropyLoss()
                                    optimizer = optim.Adam(model.parameters(), lr=lr)

                                    train_model(model, train_loader, criterion, optimizer, device)
                                    accuracy = evaluate_model(model, val_loader, device)
                                    scores.append(accuracy)

                                avg_score = np.mean(scores)
                                if avg_score > best_score:
                                    best_score = avg_score
                                    best_params = {
                                        'hidden_size': hidden_size,
                                        'num_layers': num_layers,
                                        'dropout_rate': dropout_rate,
                                        'learning_rate': lr,
                                        'window_size': window_size,
                                        'step_size': step_size,
                                        'batch_size': batch_size
                                    }
                                print(f"Params: {best_params}, Score: {avg_score:.4f}")
    
    print(f"Best Hyperparameters: {best_params}, Best Score: {best_score:.4f}")
    return best_params

In [32]:
hyperparams = {
    'hidden_size': [32, 64, 128],         # Broader range of hidden sizes
    'num_layers': [1, 2],                 # Keep it to 1 or 2 layers to avoid overfitting
    'dropout_rate': [0.0, 0.3, 0.5],      # Include 0.0 (no dropout) for smaller models
    'learning_rate': [0.001, 0.0005, 0.0001],  # Added intermediate learning rate
    'window_size': [32, 64, 96, 128],         # Smaller window sizes to increase the number of samples
    'step_size': [8, 16, 32, 48],            # Smaller step sizes for more overlap
    'batch_size': [16, 32, 64]            # Try different batch sizes for better convergence
}

selected_electrodes = [
    'O1', 'O2', 'Oz', 'POz', 'PO3', 'PO4', 'PO7', 'PO8',
    'F3', 'F4', 'Fz', 'F7', 'F8', 'AF3', 'AF4', 'AF7', 'AF8', 'Fpz',
    'T7', 'T8', 'Cz', 'Pz', 'FCz', 'CP1', 'CP2'
]

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
best_params = cross_validate(eeg_data, selected_electrodes, hyperparams, device=device)
print(f"Best Hyperparameters found: {best_params}")

Testing params: hidden_size=32, num_layers=1, dropout=0.0, lr=0.001, window_size=32, step_size=8, batch_size=16
Calculated input_size: 800
Params: {'hidden_size': 32, 'num_layers': 1, 'dropout_rate': 0.0, 'learning_rate': 0.001, 'window_size': 32, 'step_size': 8, 'batch_size': 16}, Score: 0.4980
Testing params: hidden_size=32, num_layers=1, dropout=0.0, lr=0.001, window_size=32, step_size=8, batch_size=32
Calculated input_size: 800
Params: {'hidden_size': 32, 'num_layers': 1, 'dropout_rate': 0.0, 'learning_rate': 0.001, 'window_size': 32, 'step_size': 8, 'batch_size': 32}, Score: 0.4987
Testing params: hidden_size=32, num_layers=1, dropout=0.0, lr=0.001, window_size=32, step_size=8, batch_size=64
Calculated input_size: 800
Params: {'hidden_size': 32, 'num_layers': 1, 'dropout_rate': 0.0, 'learning_rate': 0.001, 'window_size': 32, 'step_size': 8, 'batch_size': 32}, Score: 0.4925
Testing params: hidden_size=32, num_layers=1, dropout=0.0, lr=0.001, window_size=32, step_size=16, batch_size

KeyboardInterrupt: 