In [35]:
from pathlib import Path
import re, numpy as np, pandas as pd, mne
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis as LDA
from sklearn.metrics import accuracy_score, cohen_kappa_score, f1_score, confusion_matrix, classification_report
from sklearn.model_selection import GroupKFold

import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)
mne.set_log_level("ERROR")

In [36]:
from src.feature_extraction import epochs_to_bandpower

In [37]:
import sys, importlib.util, os
parentPath = Path.cwd().parent
sys.path.insert(0, str(parentPath))  

In [None]:
DATA = Path(f"{parentPath}/data")
DER  = DATA / "derivatives"  
FIGS = Path(f"{parentPath}/figs"); FIGS.mkdir(parents=True, exist_ok=True)

In [39]:
paths = sorted(DER.glob("*-aligned128-epo.fif")) or sorted(DER.glob("*-epo.fif"))
paths = [p for p in paths if re.search(r"R(03|04|07|08|11|12)", p.name)] # only these subjects have both L and R hand MI
# paths = [p for p in paths if re.search(r"R(04|08|12)", p.name)]  # L/R imagery only
# paths = [p for p in paths if re.search(r"R(06|10|14)", p.name)]  # both imagery only

print("Files:", len(paths))


Files: 654


In [40]:
X_all, y_all, runs, files_used = [], [], [], []
for p in paths:
    ep = mne.read_epochs(p, preload=True, verbose=False)
    if len(ep) == 0:
        # print(f"Skip (empty): {p.name}")
        continue
    X, y, chs, bns = epochs_to_bandpower(ep)
    # group by run number so CV doesn't mix epochs from same run across folds
    m = re.search(r"R(\d{2})", p.name); run = int(m.group(1)) if m else -1
    X_all.append(X); y_all.append(y); runs.extend([run]*len(y)); files_used.append(p.name)
    
X = np.vstack(X_all); y = np.concatenate(y_all); runs = np.array(runs)
print(
    f"X shape: {X.shape}\n"
    f"Y shape: {y.shape}\n"
    f"Runs shape: {runs.shape}\n"
    f"Unique runs: {np.unique(runs)}\n"
    f"Unique labels: {np.unique(y)}"
)

groups = runs

X shape: (2743, 128)
Y shape: (2743,)
Runs shape: (2743,)
Unique runs: [ 3  4  7  8 11 12]
Unique labels: [1 2]


In [41]:
logreg = Pipeline([
    ("scaler", StandardScaler()),
    ("clf", LogisticRegression(max_iter=2000, n_jobs=None, class_weight="balanced"))
])

lda = Pipeline([
    ("scaler", StandardScaler(with_mean=True, with_std=True)),  # ok with LDA
    ("clf", LDA())
])

models = {"logreg": logreg, "lda": lda}


In [42]:
def eval_group_kfold(model, X, y, groups, n_splits=5):
    gkf = GroupKFold(n_splits=n_splits)
    mets = []
    cms  = []
    for fold, (tr, te) in enumerate(gkf.split(X, y, groups)):
        model.fit(X[tr], y[tr])
        yhat = model.predict(X[te])
        mets.append({
            "fold": fold,
            "acc": accuracy_score(y[te], yhat),
            "kappa": cohen_kappa_score(y[te], yhat),
            "f1_macro": f1_score(y[te], yhat, average="macro")
        })
        cms.append(confusion_matrix(y[te], yhat))
    df = pd.DataFrame(mets)
    return df, cms

rows = []
for name, mdl in models.items():
    df, cms = eval_group_kfold(mdl, X, y, groups, n_splits=5)
    print(name, df.mean(numeric_only=True).round(3))
    rows.append(dict(model=name, **{k: float(v) for k,v in df.mean(numeric_only=True).items()}))


logreg fold        2.000
acc         0.702
kappa       0.403
f1_macro    0.701
dtype: float64
lda fold        2.000
acc         0.698
kappa       0.396
f1_macro    0.697
dtype: float64


In [43]:
# Subject groups
subs = []
for p in paths:
    m = re.search(r"S(\d{3})", p.name); subj = int(m.group(1)) if m else -1
    ep = mne.read_epochs(p, preload=False, verbose=False)
    subs.extend([subj]*len(ep))
subs = np.array(subs)

df_subj, _ = eval_group_kfold(models["logreg"], X, y, subs, n_splits=len(np.unique(subs)))
print("LOSO(logreg):", df_subj.mean(numeric_only=True).round(3))
rows.append(dict(model="logreg_LOSO", **{k: float(v) for k,v in df_subj.mean(numeric_only=True).items()}))




LOSO(logreg): fold        34.500
acc          0.681
kappa        0.329
f1_macro     0.655
dtype: float64




In [44]:
rows

[{'model': 'logreg',
  'fold': 2.0,
  'acc': 0.701609674012242,
  'kappa': 0.40308423348097167,
  'f1_macro': 0.7012327028645082},
 {'model': 'lda',
  'fold': 2.0,
  'acc': 0.6978977637575372,
  'kappa': 0.395694612883607,
  'f1_macro': 0.697490808990495},
 {'model': 'logreg_LOSO',
  'fold': 34.5,
  'acc': 0.681355249504913,
  'kappa': 0.3286870804725235,
  'f1_macro': 0.6548718822518477}]

In [46]:
import matplotlib.pyplot as plt
from sklearn.metrics import ConfusionMatrixDisplay

best = max(rows, key=lambda r: r["kappa"])
print("Best:", best)

# Refit on all data to plot CM using a simple random 20% hold-out, just for a figure
from sklearn.model_selection import StratifiedShuffleSplit
sss = StratifiedShuffleSplit(n_splits=1, test_size=0.2, random_state=42)
(tr, te), = sss.split(X, y)
model = models[best["model"].split("_")[0]]
model.fit(X[tr], y[tr]); yhat = model.predict(X[te])
disp = ConfusionMatrixDisplay.from_predictions(y[te], yhat)
plt.title(f"CM — {best['model']}  acc={accuracy_score(y[te],yhat):.2f}  κ={cohen_kappa_score(y[te],yhat):.2f}")
plt.tight_layout()
plt.savefig(FIGS / "cm_day3.png", dpi=150)
plt.close()


Best: {'model': 'logreg', 'fold': 2.0, 'acc': 0.701609674012242, 'kappa': 0.40308423348097167, 'f1_macro': 0.7012327028645082}
