In [2]:
# Libraries used
import scipy.io as sio
import pandas as pd
import os
import matplotlib.pyplot as plt

First load data:

In [3]:
def load_group(file_name, group_label, gender):
    # Load .mat files
    data = sio.loadmat(os.path.join('EEG', file_name + '.mat'))
    # Get only the last column where all the information that we need is
    key = list(data.keys())[-1]
    group_data = data[key]

    rows = []
    # Loop through each cell (tasks)
    for task_idx in range(group_data.shape[1]):
        task_data = group_data[0, task_idx]
        n_subjects, n_samples, n_channels = task_data.shape

        for subj_idx in range(n_subjects):
            # Handle corrupted sample (skip female ADHD subject 7)
            if file_name == "FADHD" and subj_idx == 6:
                continue
            
            # This will make it easier to know later to which signal we are referring
            signal = task_data[subj_idx, :, :]  # [samples x channels]
            rows.append({
                "group": group_label,    # 'Control' or 'ADHD'
                "gender": gender,        # 'Male' or 'Female'
                "task": task_idx + 1,    # 1 to 11
                "subject_id": f"{file_name}_{subj_idx+1}",
                "signal": signal
            })
    return pd.DataFrame(rows)

# Load all four groups
fc_df = load_group("FC", group_label="Control", gender="Female")
mc_df = load_group("MC", group_label="Control", gender="Male")
fadhd_df = load_group("FADHD", group_label="ADHD", gender="Female")
madhd_df = load_group("MADHD", group_label="ADHD", gender="Male")

# Combine
eeg_df = pd.concat([fc_df, mc_df, fadhd_df, madhd_df], ignore_index=True)

# Preprocessing 1:

In [None]:
import numpy as np
from scipy.signal import butter, filtfilt, iirnotch, welch

SFREQ = 256
LOW, HIGH = 0.5, 45.0
LINE = 50.0  # set to 60.0 if you detect 60 Hz line noise

def bandpass(signal, low=LOW, high=HIGH, fs=SFREQ, order=4):
    # signal: (samples, channels)
    b, a = butter(order, [low/(fs/2), high/(fs/2)], btype='band')
    return filtfilt(b, a, signal, axis=0)

def notch_filter(signal, f0=LINE, Q=30, fs=SFREQ):
    # Apply a single-frequency notch
    b, a = iirnotch(w0=f0/(fs/2), Q=Q)
    return filtfilt(b, a, signal, axis=0)

def hampel_1d(x, k=21, t=3.0):
    # simple Hampel filter for impulsive spikes
    x = x.copy()
    med = np.median(x)
    MAD = np.median(np.abs(x - med))
    if MAD == 0: return x
    thresh = t * 1.4826 * MAD
    for i in range(k, len(x)-k):
        window = x[i-k:i+k+1]
        m = np.median(window); mad = np.median(np.abs(window - m))
        if mad == 0: 
            continue
        if np.abs(x[i] - m) > t * 1.4826 * mad:
            x[i] = m
    return x

def clean_spikes(signal):
    out = signal.copy()
    for ch in range(out.shape[1]):
        out[:, ch] = hampel_1d(out[:, ch], k=21, t=3.0)
    return out

def preprocess_signal(raw, do_notch=True):
    # raw: (samples, channels)
    x = raw - np.mean(raw, axis=0, keepdims=True)         # detrend (DC)
    x = bandpass(x)                                       # 0.5–45 Hz
    if do_notch:
        x = notch_filter(x)                               # 50/60 Hz
    x = clean_spikes(x)                                   # optional
    return x

def make_bipolar(x):
    # returns original + derived bipolar channel
    # x: (samples, 2)
    bipolar = (x[:, [0]] - x[:, [1]])
    return np.hstack([x, bipolar])  # shape: (samples, 3)

Duration alignment / sliding windows

In [None]:
def fixed_first_seconds(signal, seconds=20, fs=SFREQ):
    n = min(signal.shape[0], seconds*fs)
    return signal[:n]

def sliding_windows(signal, win_sec=5, step_sec=2.5, fs=SFREQ):
    win = int(win_sec*fs); step = int(step_sec*fs)
    starts = np.arange(0, max(1, signal.shape[0]-win+1), step)
    return [signal[s:s+win] for s in starts]

Window quality checks

In [None]:
def window_ok(win, amp_thresh=200.0, var_min=1e-6):
    if np.any(np.abs(win) > amp_thresh): 
        return False
    if np.any(np.var(win, axis=0) < var_min):
        return False
    return True

def filter_bad_windows(wins):
    good = [w for w in wins if window_ok(w)]
    return good

End-to-end on your DataFrame

In [None]:
# choose strategy
USE_SLIDING = True

def preprocess_row(row):
    sig = row["signal"]
    sig = preprocess_signal(sig, do_notch=True)
    # optional derived bipolar
    sig3 = make_bipolar(sig)  # (N,3) -> ch0, ch1, bipolar
    if USE_SLIDING:
        wins = sliding_windows(sig3, win_sec=5, step_sec=2.5)
        wins = filter_bad_windows(wins)
        return wins   # list of windows
    else:
        fx = fixed_first_seconds(sig3, seconds=20)
        return [fx] if window_ok(fx) else []

eeg_df["windows"] = eeg_df.apply(preprocess_row, axis=1)
# explode to one row per window if you want:
exploded = eeg_df.explode("windows").dropna(subset=["windows"]).reset_index(drop=True)

Standardization without leakage

In [None]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import GroupKFold

# Build X as features from windows; to standardize raw samples, flatten first
# but typically you’ll standardize features after extraction.
# Here’s how you’d do it for features later:

# Example groups = subject_id to ensure subject-wise CV
groups = exploded["subject_id"].values
gkf = GroupKFold(n_splits=5)
for train_idx, test_idx in gkf.split(exploded, exploded["group"], groups):
    train_df = exploded.iloc[train_idx]
    test_df  = exploded.iloc[test_idx]
    # ... extract features from train_df["windows"], fit scaler on train features,
    # transform both train and test features, then train your model.

Parameter tips (sane defaults)

- Band-pass: 0.5–45 Hz (order 4–6 Butterworth); if you care about gamma, extend to ~70 Hz and be stricter about EMG.
- Notch: 50 Hz or 60 Hz; inspect PSD (welch) to choose. If you see harmonics, notch them too.
- Spike threshold: ±150–200 µV is a common heuristic for adult scalp EEG.
- Windows: 5 s with 50% overlap is a good trade-off (enough cycles in alpha/theta for stable PSD and entropy).
- Downsample: to 128 Hz after filtering if performance matters; adjust band edges accordingly.

# Preprocessing 2:

In [4]:
import numpy as np
from scipy.signal import butter, filtfilt, iirnotch

SFREQ = 256
BP_LOW, BP_HIGH = 1.0, 40.0
LINE = 50.0

def bandpass_simple(x, low=BP_LOW, high=BP_HIGH, fs=SFREQ, order=4):
    b, a = butter(order, [low/(fs/2), high/(fs/2)], btype='band')
    return filtfilt(b, a, x, axis=0)

def notch_simple(x, f0=LINE, Q=30, fs=SFREQ, use_notch=False):
    if not use_notch:
        return x
    b, a = iirnotch(f0/(fs/2), Q)
    return filtfilt(b, a, x, axis=0)

def first_n_seconds(x, seconds=20, fs=SFREQ):
    n = min(x.shape[0], int(seconds*fs))
    return x[:n]

def artifact_ok(x, amp_thresh=200.0, var_min=1e-8):
    if np.any(np.abs(x) > amp_thresh):
        return False
    if np.any(np.var(x, axis=0) < var_min):
        return False
    return True

def preprocess_minimal(raw, use_notch=False):
    # raw: (samples, 2)
    x = raw - np.mean(raw, axis=0, keepdims=True)      # detrend
    x = bandpass_simple(x)                             # 1–40 Hz
    x = notch_simple(x, use_notch=use_notch)           # optional 50 Hz
    x = first_n_seconds(x, seconds=20)                 # align duration
    return x if artifact_ok(x) else None               # reject if bad


In [5]:
eeg_df["clean20s"] = eeg_df["signal"].apply(lambda s: preprocess_minimal(s, use_notch=False))
clean_df = eeg_df.dropna(subset=["clean20s"]).reset_index(drop=True)

# Quick QC: how many recordings survived
n_all = len(eeg_df)
n_ok = len(clean_df)
print(f"Kept {n_ok}/{n_all} recordings ({100*n_ok/n_all:.1f}%).")

Kept 835/869 recordings (96.1%).
