In [1]:
import os
import numpy as np
import mne

### Loading and Preprocessing the PhysioNet Motor Imagery Dataset
1. Set the dataset path - where the EDF files are stored (the EDF files contain the EEG recordings)
2. Select runs - we use the motor imagery runs (left hand imagery vs right hand imagery)
3. Load EDF files with MNE - reads the EEG signals for each subject and run
4. Extract events - they will be our labels for classification
5. Epoching - segment the continuous EEG into short trials
6. Store trials and labels

### These runs (R04, R08, R12) are the motor imagery task: imagining opening and closing the left or the right fist

In [2]:

# Root directory of the dataset: one sub-folder per subject (e.g. S001), each
# holding that subject's EDF recordings named "<subject><run>.edf".
# Set the EEGMMI_DATASET_ROOT environment variable to load from another
# location without editing the notebook; the original local path stays the default.
dataset_root = os.environ.get(
    "EEGMMI_DATASET_ROOT",
    "/media/sparsh/CaptainSlow/Programming Stuff/Programming Stuff/Code _n_ Stuff/All Projects/All Projects/PhysioNet EEG Motor Movement/eeg_motor_movement_dataset",
)


X_all = []  # per-run epoch arrays, each (n_trials, n_channels, n_times)
y_all = []  # per-run label vectors, aligned with X_all
expected_time_len = None  # n_times of the first loaded run; later runs must match
runs_to_use = ['R04', 'R08', 'R12']  # Runs: motor imagery (left hand vs right hand)

for subject_folder in sorted(os.listdir(dataset_root)):
    subject_path = os.path.join(dataset_root, subject_folder)
    if not os.path.isdir(subject_path):
        continue

    for run in runs_to_use:
        r_file = os.path.join(subject_path, f"{subject_folder}{run}.edf")
        if not os.path.exists(r_file):
            print(f"File not found: {r_file}")
            continue

        try:
            raw = mne.io.read_raw_edf(r_file, preload=True, verbose=False)
            # Optional preprocessing, currently disabled:
            # raw.filter(7., 30., fir_design='firwin', verbose=False)
            # raw.set_eeg_reference('average', verbose=False)
            # ica = mne.preprocessing.ICA(verbose=False, n_components=20, max_iter = 'auto')
            # ica.fit(raw)

            events, event_id = mne.events_from_annotations(raw, verbose=False)

            if 'T1' not in event_id or 'T2' not in event_id:
                print(f"⚠️ Skipping {r_file}, missing T1/T2 events")
                continue

            selected_events = {'left': event_id['T1'], 'right': event_id['T2']}
            epochs = mne.Epochs(
                raw,
                events,
                event_id=selected_events,
                tmin=0.5,
                tmax=2.5,
                baseline=None,
                preload=True,
                verbose=False,
            )

            # Every epoch may have been dropped (e.g. events too close to the
            # end of the recording); there is nothing to add in that case.
            if len(epochs) == 0:
                print(f"Skipping {subject_folder}-{run}, no usable epochs")
                continue

            X = epochs.get_data()  # shape: (n_trials, n_channels, n_times)

            # Map event codes to labels explicitly: 0 for left (T1), 1 for right (T2).
            # Subtracting the minimum code would label a run containing only
            # T2 epochs as all-zero, silently flipping its class.
            y = (epochs.events[:, -1] == selected_events['right']).astype(np.int64)

            # minlength=2 keeps both class counts visible even if one class is absent.
            print(f"{subject_folder}-{run}: epochs {X.shape}, labels {np.bincount(y, minlength=2)}")

            # All runs are stacked into one array later, so every run must have
            # the same number of time samples as the first one loaded.
            if expected_time_len is None:
                expected_time_len = X.shape[2]
            if X.shape[2] != expected_time_len:
                print(f"Skipping {subject_folder}-{run}, time length mismatch")
                continue

            X_all.append(X)
            y_all.append(y)

        except Exception as e:
            # Best-effort loading: report the failing file and move on to the next run.
            print(f"Error loading {r_file}: {e}")


if len(X_all) == 0:
    raise ValueError("No data loaded. Check dataset path or preprocessing.")

# Concatenate along the trial axis: (total_trials, n_channels, n_times) and (total_trials,).
X_all = np.vstack(X_all)
y_all = np.hstack(y_all)

print("Data Loaded Successfully!")
print("Data shape:", X_all.shape)
print("Labels distribution:", np.bincount(y_all, minlength=2))

S001-R04: epochs (15, 64, 321), labels [8 7]
S001-R08: epochs (15, 64, 321), labels [8 7]
S001-R12: epochs (15, 64, 321), labels [7 8]
S002-R04: epochs (15, 64, 321), labels [7 8]
S002-R08: epochs (15, 64, 321), labels [8 7]
S002-R12: epochs (15, 64, 321), labels [8 7]
S003-R04: epochs (15, 64, 321), labels [8 7]
S003-R08: epochs (15, 64, 321), labels [7 8]
S003-R12: epochs (15, 64, 321), labels [8 7]
S004-R04: epochs (15, 64, 321), labels [8 7]
S004-R08: epochs (15, 64, 321), labels [7 8]
S004-R12: epochs (15, 64, 321), labels [8 7]
S005-R04: epochs (15, 64, 321), labels [7 8]
S005-R08: epochs (15, 64, 321), labels [7 8]
S005-R12: epochs (15, 64, 321), labels [7 8]
S006-R04: epochs (15, 64, 321), labels [8 7]
S006-R08: epochs (15, 64, 321), labels [8 7]
S006-R12: epochs (15, 64, 321), labels [8 7]
S007-R04: epochs (15, 64, 321), labels [8 7]
S007-R08: epochs (15, 64, 321), labels [8 7]
S007-R12: epochs (15, 64, 321), labels [7 8]
S008-R04: epochs (15, 64, 321), labels [7 8]
S008-R08: 

  raw = mne.io.read_raw_edf(r_file, preload=True, verbose=False)
  raw = mne.io.read_raw_edf(r_file, preload=True, verbose=False)
  raw = mne.io.read_raw_edf(r_file, preload=True, verbose=False)


S101-R12: epochs (15, 64, 321), labels [7 8]
S102-R04: epochs (15, 64, 321), labels [7 8]
S102-R08: epochs (15, 64, 321), labels [7 8]
S102-R12: epochs (15, 64, 321), labels [8 7]
S103-R04: epochs (15, 64, 321), labels [7 8]
S103-R08: epochs (15, 64, 321), labels [8 7]
S103-R12: epochs (15, 64, 321), labels [7 8]
S104-R04: epochs (15, 64, 321), labels [7 8]
S104-R08: epochs (13, 64, 321), labels [7 6]
S104-R12: epochs (15, 64, 321), labels [8 7]
S105-R04: epochs (15, 64, 321), labels [8 7]
S105-R08: epochs (15, 64, 321), labels [7 8]
S105-R12: epochs (15, 64, 321), labels [8 7]
S106-R04: epochs (15, 64, 321), labels [8 7]
S106-R08: epochs (15, 64, 321), labels [8 7]
S106-R12: epochs (15, 64, 321), labels [8 7]
S107-R04: epochs (15, 64, 321), labels [8 7]
S107-R08: epochs (15, 64, 321), labels [8 7]
S107-R12: epochs (15, 64, 321), labels [8 7]
S108-R04: epochs (15, 64, 321), labels [7 8]
S108-R08: epochs (15, 64, 321), labels [7 8]
S108-R12: epochs (15, 64, 321), labels [8 7]
S109-R04: 

### Labels distribution = [2406 2362]

- 2406 - Label 0 (T1 events: left hand imagery)
- 2362 - Label 1 (T2 events: right hand imagery)

### Standardization EEG Data

Raw EEG signals can have very different ranges across channels.
To make learning stable, we *standardize* each channel by subtracting its mean and dividing by its standard deviation, where both statistics are computed per channel over all trials and all time points.

This ensures:
- All channels have values centered around *0*
- Variations are on a comparable *scale*
- The neural network can train more effectively

In [3]:
# Per-channel z-scoring. X_all is (n_trials, n_channels, n_times), so reducing
# over axes 0 and 2 gives one mean/std per channel; keepdims keeps them
# broadcastable against X_all.
# NOTE(review): the statistics are computed over the whole dataset; if a
# held-out split is created later, consider fitting them on the training part only.
mean = X_all.mean(axis=(0, 2), keepdims=True)
std = X_all.std(axis=(0, 2), keepdims=True)
# A constant (flat) channel has zero std; dividing by it would fill the channel
# with NaN/inf. Use 1.0 there so the channel simply becomes all zeros.
std = np.where(std > 0, std, 1.0)
X_all = (X_all - mean) / std

### Importing Deep learning dependencies and evaluations metrics

- PyTorch
- Scikit-Learn

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset, random_split
from sklearn.utils.class_weight import compute_class_weight
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.ensemble import RandomForestClassifier

### Data loading and splitting

In [6]:
# Step 3: Train-test split and DataLoader
import torch
from torch.utils.data import TensorDataset, DataLoader, random_split
import numpy as np

# Build the feature and label tensors in one pass: float32 features, int64
# (long) class labels.
X_tensor, y_tensor = (
    torch.tensor(array, dtype=target_dtype)
    for array, target_dtype in ((X_all, torch.float32), (y_all, torch.long))
)

# Pair each trial with its label so the two can be indexed together.
dataset = TensorDataset(X_tensor, y_tensor)


### Defining the model

CNN + Random Forest