# EEG Motor Movement/Imagery Classification Using Random Forest and Convolutional Neural Networks

### Imports

In [None]:
import os
import numpy as np
import mne
import pandas as pd
from dotenv import load_dotenv
from pathlib import Path
from scipy.signal import detrend

### Getting .env variables

In [None]:
load_dotenv()

INPUT_DIR = os.getenv("INPUT_DIR", "./data/raw")
OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./result")

SFREQ = int(os.getenv("SFREQ", 160))
WINDOW_SEC = float(os.getenv("WINDOW_SEC", 2))
OVERLAP = float(os.getenv("OVERLAP", 0.5))

DEBUG = os.getenv("DEBUG", "1").strip().lower() in {"1", "true", "yes", "y"}
# DEBUG = False
print(f"DEBUGGING is {'ON' if DEBUG else 'OFF'}")

In [None]:
window_samples = int(SFREQ * WINDOW_SEC)
print(f"Window samples: {window_samples}")

records_file = os.path.join(INPUT_DIR, "RECORDS")
print(f"Records file: {records_file}")

## 1. Data Preparation

This data set consists of over 1500 one- and two-minute EEG recordings, obtained from 109 volunteers, as described below.

Subjects performed different motor/imagery tasks while 64-channel EEG were recorded using the BCI2000 system (http://www.bci2000.org). 

The experimental runs were:
1. Baseline, eyes open
2. Baseline, eyes closed
3. Task 1 (open and close left or right fist)
4. Task 2 (imagine opening and closing left or right fist)
5. Task 3 (open and close both fists or both feet)
6. Task 4 (imagine opening and closing both fists or both feet)
7. Task 1
8. Task 2
9. Task 3
10. Task 4
11. Task 1
12. Task 2
13. Task 3
14. Task 4

Each annotation includes one of three codes (**T0**, **T1**, or **T2**):
- **T0** corresponds to rest
- **T1** corresponds to onset of motion (real or imagined) of
    the left fist (in runs 3, 4, 7, 8, 11, and 12)
    both fists (in runs 5, 6, 9, 10, 13, and 14)
- **T2** corresponds to onset of motion (real or imagined) of
    the right fist (in runs 3, 4, 7, 8, 11, and 12)
    both feet (in runs 5, 6, 9, 10, 13, and 14)

### 1.1. Loading RECORDS file and verifying EDF files' existence

In [None]:
with open(records_file, "r") as f:
    records = [line.strip() for line in f if line.strip()]

print(f"Number of RECORDS entries: {len(records)}")
for r in records[:4]:
    print(" ", r)
print("  ...\n ",records[-1])

edf_paths = []
missing = []

for rel in records:
    p = os.path.join(INPUT_DIR, rel)
    if os.path.exists(p):
        edf_paths.append(p)
    else:
        missing.append(p)

print(f"\nResolved EDF files: {len(edf_paths)}")
print(f"Missing EDF files: {len(missing)}")

if missing:
    print("Example missing path:", missing[0])

### 1.2. Testing [first] EDF file loading and preprocessing

In [None]:
if DEBUG:
    test_edf = edf_paths[0]
    print("Testing EDF:", test_edf)

    raw = mne.io.read_raw_edf(test_edf, preload=False, verbose=False)

    print("\n--- EDF INFO ---")
    print("Channels:", len(raw.ch_names))
    print("Sampling freq:", raw.info["sfreq"])
    print("Duration (sec):", raw.times[-1])
    print("First 10 channels:", raw.ch_names[:10])

    assert len(raw.ch_names) >= 64, "Expected ~64 EEG channels"
    assert abs(raw.info["sfreq"] - SFREQ) < 1e-3, "Sampling frequency mismatch"

### 1.3. Preprocessing All EDF Files and Saving Processed Data

In [None]:
def load_and_summarize(edf_path: Path):
    raw = mne.io.read_raw_edf(edf_path, preload=False, verbose=False)
    descs = sorted(map(str, set(raw.annotations.description))) if raw.annotations is not None else []
    print("\n===", edf_path.name, "===")
    print("sfreq:", raw.info["sfreq"], "| duration:", raw.times[-1], "sec | ch:", len(raw.ch_names))
    print("Annotations count:", len(raw.annotations))
    print("Unique descriptions:", descs)

    events, event_id = mne.events_from_annotations(raw, verbose=False)
    event_id = {str(k): v for k, v in event_id.items()}
    print("event_id:", event_id)
    print("n_events:", len(events))
    print("first events:", events[:10])
    return raw, events, event_id

if DEBUG:
    raw, events, event_id = load_and_summarize(Path(test_edf))
else:
    for edf in edf_paths:
        raw, events, event_id = load_and_summarize(Path(edf))

### 1.4. Imaging vs. Execution Labeling

In [None]:
RUN_MOTOR_MOVE = {3, 5, 7, 9, 11, 13}
RUN_MOTOR_IMAG = {4, 6, 8, 10, 12, 14}

def run_id_from_filename(fname: str):
    # "S001R03.edf" -> 3
    return int(fname.split("R")[1].split(".")[0])

def get_exec_imag_label(run_id: int):
    # 0 = executed, 1 = imagined
    if run_id in RUN_MOTOR_MOVE:
        return 0
    if run_id in RUN_MOTOR_IMAG:
        return 1
    return -1

### 1.5. Window Segmentation and Normalization

In [None]:
def segment_windows(data: np.ndarray, window_samples: int, overlap: float):
    step = int(window_samples * (1 - overlap))
    windows = []
    for start in range(0, data.shape[1] - window_samples + 1, step):
        windows.append(data[:, start:start + window_samples])
    return np.asarray(windows)

def normalize_windows(windows: np.ndarray):
    mean = windows.mean(axis=2, keepdims=True)
    std = windows.std(axis=2, keepdims=True) + 1e-8
    return (windows - mean) / std

def extract_rf_features(windows: np.ndarray):
    means = windows.mean(axis=2)
    stds = windows.std(axis=2)
    rms = np.sqrt((windows**2).mean(axis=2))
    energy = (windows**2).sum(axis=2)
    return np.concatenate([means, stds, rms, energy], axis=1)

def pick_motor_intervals_from_annotations(raw):
    intervals = []
    if raw.annotations is None or len(raw.annotations) == 0:
        return intervals

    for onset, duration, desc in zip(raw.annotations.onset, raw.annotations.duration, raw.annotations.description):
        d = str(desc)
        if d in {"T1", "T2"} and float(duration) > 0:
            intervals.append((float(onset), float(onset + duration), d))
    return intervals

def windows_from_intervals(data: np.ndarray, sfreq: float, intervals, window_samples: int, overlap: float):
    step = int(window_samples * (1 - overlap))
    all_windows = []
    meta = []

    for (t0, t1, desc) in intervals:
        start = int(round(t0 * sfreq))
        end = int(round(t1 * sfreq))

        if end - start < window_samples:
            continue

        for s in range(start, end - window_samples + 1, step):
            all_windows.append(data[:, s:s + window_samples])
            meta.append({"desc": desc, "start_sample": s, "end_sample": s + window_samples})

    if not all_windows:
        return np.empty((0, data.shape[0], window_samples), dtype=np.float32), []

    return np.asarray(all_windows, dtype=np.float32), meta

### 1.6. Segment recordings into fixed-length time windows, normalize signals, and select hand-related motor execution and imagery trials.

In [None]:
X_cnn_list, y_list, X_rf_list, meta_rows = [], [], [], []

n_channels_ref = None

for edf_path in edf_paths:
    edf_path = Path(edf_path)
    run_id = run_id_from_filename(edf_path.name)
    label = get_exec_imag_label(run_id)
    if label == -1:
        continue

    raw = mne.io.read_raw_edf(edf_path, preload=True, verbose=False)
    raw.pick("eeg")

    if abs(raw.info["sfreq"] - SFREQ) > 1e-3:
        raw.resample(SFREQ)

    data = raw.get_data()

    if n_channels_ref is None:
        n_channels_ref = data.shape[0]

    if data.shape[0] != n_channels_ref:
        if DEBUG:
            print(f"Skipping {edf_path.name}: channels {data.shape[0]} != {n_channels_ref}")
        continue

    intervals = pick_motor_intervals_from_annotations(raw)  # T1/T2 only
    if not intervals:
        continue

    windows, wmeta = windows_from_intervals(
        data=data,
        sfreq=raw.info["sfreq"],
        intervals=intervals,
        window_samples=window_samples,
        overlap=OVERLAP,
    )
    if windows.shape[0] == 0:
        continue

    windows = normalize_windows(windows).astype(np.float32)
    feats = extract_rf_features(windows).astype(np.float32)

    X_cnn_list.append(windows)
    y_list.append(np.full((windows.shape[0],), label, dtype=np.int64))
    X_rf_list.append(feats)

    subject = edf_path.parent.name
    for md in wmeta:
        meta_rows.append({
            "subject": subject,
            "run": run_id,
            "exec_imag": int(label),
            "trial_type": md["desc"], # T1/T2
            "start_sample": md["start_sample"],
            "end_sample": md["end_sample"],
            "edf": str(edf_path),
        })

    if DEBUG:
        print(f"{edf_path.name}: intervals={len(intervals)} windows={windows.shape[0]} label={label}")

# stack
if X_cnn_list:
    X_cnn = np.vstack(X_cnn_list)
    y = np.concatenate(y_list)
    X_rf = np.vstack(X_rf_list)
else:
    X_cnn = np.empty((0, n_channels_ref or 64, window_samples), dtype=np.float32)
    y = np.empty((0,), dtype=np.int64)
    X_rf = np.empty((0, (n_channels_ref or 64) * 4), dtype=np.float32)

meta = pd.DataFrame(meta_rows)

print("X_cnn:", X_cnn.shape)
print("y:", y.shape, "counts:", np.bincount(y) if y.size else "empty")
print("X_rf:", X_rf.shape)
print("meta:", meta.shape)

# save
np.save(os.path.join(OUTPUT_DIR, "X_cnn.npy"), X_cnn)
np.save(os.path.join(OUTPUT_DIR, "y.npy"), y)
np.save(os.path.join(OUTPUT_DIR, "X_rf.npy"), X_rf)
meta.to_csv(os.path.join(OUTPUT_DIR, "meta.csv"), index=False)

print("Saved to:", OUTPUT_DIR)