1. Common Spatial Patterns (CSP) feature extraction - basically learns where on the scalp to "look" for each movement type -> able to maximize variance ratio between classes
2. LDA classifier (baseline)
3. SVM classifier (improved)
4. Within-subject and cross-subject evaluation
5. Visualization of results

In [None]:
import sys
from pathlib import Path
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.discriminant_analysis import LinearDiscriminantAnalysis
from sklearn.svm import SVC
from sklearn.model_selection import StratifiedKFold, cross_val_score
from sklearn.metrics import accuracy_score, cohen_kappa_score, confusion_matrix, classification_report
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
import mne
from mne.decoding import CSP
from tqdm.notebook import tqdm
import warnings
warnings.filterwarnings('ignore')

# Add src to path
sys.path.insert(0, str(Path.cwd().parent / 'src'))

from features import CSPFeatures, compute_csp_features
from models import create_lda_pipeline, create_svm_pipeline, evaluate_classifier
from visualization import set_style, plot_confusion_matrix, plot_subject_comparison, CLASS_NAMES
from preprocessing import CHANNEL_NAMES

set_style()
np.random.seed(42)

PROCESSED_DIR = Path('../data/processed')
RESULTS_DIR = Path('../results')
FIGURES_DIR = Path('../figures')
RESULTS_DIR.mkdir(exist_ok=True)
FIGURES_DIR.mkdir(exist_ok=True)

In [None]:
# Load preprocessed data
data_path = PROCESSED_DIR / 'preprocessed_data.npz'

if not data_path.exists():
    raise FileNotFoundError("Run notebook 02_preprocessing.ipynb first!")

data = np.load(data_path, allow_pickle=True)

X_train = data['X_train']
y_train = data['y_train']
subjects_train = data['subjects_train']

X_test = data['X_test']
y_test = data['y_test']
subjects_test = data['subjects_test']

print(f"Training data: {X_train.shape}")
print(f"Test data: {X_test.shape}")
print(f"Classes: {np.unique(y_train)} -> {CLASS_NAMES}")
print(f"Subjects: {np.unique(subjects_train)}")

- Variance of W·X is max for class 1
- Variance of W·X is min for class 2

- Left hand imagery → decreased mu/beta power over RIGHT motor cortex (C4)
- Right hand imagery → decreased mu/beta power over LEFT motor cortex (C3)

In [None]:
# Demonstrate CSP on a single subject (binary: left vs right hand)
subj = 1
mask_train = (subjects_train == subj) & ((y_train == 0) | (y_train == 1))  # Left vs Right only
mask_test = (subjects_test == subj) & ((y_test == 0) | (y_test == 1))

X_subj_train = X_train[mask_train]
y_subj_train = y_train[mask_train]
X_subj_test = X_test[mask_test]
y_subj_test = y_test[mask_test]

print(f"Subject {subj} - Left vs Right hand:")
print(f"  Train: {X_subj_train.shape[0]} trials")
print(f"  Test: {X_subj_test.shape[0]} trials")

In [None]:
# Fit CSP
csp_binary = CSP(n_components=4, reg='ledoit_wolf', log=True, norm_trace=True)
csp_binary.fit(X_subj_train, y_subj_train)

# Transform data
X_train_csp = csp_binary.transform(X_subj_train)
X_test_csp = csp_binary.transform(X_subj_test)

print(f"Original shape: {X_subj_train.shape} -> CSP features: {X_train_csp.shape}")
print(f"Reduced from {22 * 875} = {22*875} features to {X_train_csp.shape[1]} features")

In [None]:
# Visualize CSP patterns (what the filters "look" for)
info = mne.create_info(ch_names=CHANNEL_NAMES, sfreq=250, ch_types='eeg')
montage = mne.channels.make_standard_montage('standard_1020')
info.set_montage(montage)

fig, axes = plt.subplots(1, 4, figsize=(14, 3))

for idx in range(4):
    pattern = csp_binary.patterns_[idx]
    mne.viz.plot_topomap(pattern, info, axes=axes[idx], show=False)
    axes[idx].set_title(f'CSP {idx+1}')

fig.suptitle('CSP Spatial Patterns (Left vs Right Hand)', y=1.05, fontsize=14)
plt.tight_layout();

print("\nInterpretation:")
print("- First 2 patterns: maximize variance for one class")
print("- Last 2 patterns: maximize variance for other class")
print("- Look for lateralization over motor cortex (C3/C4 area)")

In [None]:
# Visualize CSP features separation
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

# Scatter plot of first two CSP components
colors = ['blue', 'red']
labels = ['Left Hand', 'Right Hand']

for cls in [0, 1]:
    mask = y_subj_train == cls
    axes[0].scatter(X_train_csp[mask, 0], X_train_csp[mask, 1], 
                    c=colors[cls], label=labels[cls], alpha=0.6, s=40)

axes[0].set_xlabel('CSP Component 1 (log-var)')
axes[0].set_ylabel('CSP Component 2 (log-var)')
axes[0].set_title('CSP Feature Space (Training)')
axes[0].legend()

# Box plot of all components
csp_data = []
for comp in range(4):
    for cls in [0, 1]:
        mask = y_subj_train == cls
        for val in X_train_csp[mask, comp]:
            csp_data.append({'Component': f'CSP {comp+1}', 'Class': labels[cls], 'Value': val})

import pandas as pd
df_csp = pd.DataFrame(csp_data)
sns.boxplot(data=df_csp, x='Component', y='Value', hue='Class', ax=axes[1], palette=['blue', 'red'])
axes[1].set_title('CSP Feature Distribution by Class')

plt.tight_layout();

In [None]:
# Within subject classification (train and test on the same subject)
def evaluate_subject(subj_id, X_train, y_train, subjects_train, 
                     X_test, y_test, subjects_test, n_components=6):
    """
    Evaluate CSP+classifier on a single subject.
    
    Returns dict with results for LDA and SVM.
    """
    # Get subject data
    mask_train = subjects_train == subj_id
    mask_test = subjects_test == subj_id
    
    X_tr, y_tr = X_train[mask_train], y_train[mask_train]
    X_te, y_te = X_test[mask_test], y_test[mask_test]
    
    if len(X_tr) == 0 or len(X_te) == 0:
        return None
    
    # Fit CSP on training data
    csp = CSP(n_components=n_components, reg='ledoit_wolf', log=True, norm_trace=True)
    X_tr_csp = csp.fit_transform(X_tr, y_tr)
    X_te_csp = csp.transform(X_te)
    
    results = {'subject': subj_id, 'n_train': len(y_tr), 'n_test': len(y_te)}
    
    # LDA
    lda = LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto')
    lda.fit(X_tr_csp, y_tr)
    y_pred_lda = lda.predict(X_te_csp)
    results['lda_accuracy'] = accuracy_score(y_te, y_pred_lda)
    results['lda_kappa'] = cohen_kappa_score(y_te, y_pred_lda)
    results['lda_predictions'] = y_pred_lda
    
    # SVM
    svm = SVC(kernel='rbf', C=1.0, gamma='scale')
    svm.fit(X_tr_csp, y_tr)
    y_pred_svm = svm.predict(X_te_csp)
    results['svm_accuracy'] = accuracy_score(y_te, y_pred_svm)
    results['svm_kappa'] = cohen_kappa_score(y_te, y_pred_svm)
    results['svm_predictions'] = y_pred_svm
    
    results['y_true'] = y_te
    results['csp'] = csp
    
    return results

In [None]:
# Evaluate all subjects
N_COMPONENTS = 6  # Standard choice for 4-class CSP

all_results = []

print("Within-subject evaluation (4-class):")
print("=" * 60)
print(f"{'Subject':<10} {'LDA Acc':<12} {'LDA Kappa':<12} {'SVM Acc':<12} {'SVM Kappa':<12}")
print("-" * 60)

for subj in range(1, 10):
    results = evaluate_subject(
        subj, X_train, y_train, subjects_train,
        X_test, y_test, subjects_test,
        n_components=N_COMPONENTS
    )
    
    if results:
        all_results.append(results)
        print(f"{subj:<10} {results['lda_accuracy']:<12.2%} {results['lda_kappa']:<12.3f} "
              f"{results['svm_accuracy']:<12.2%} {results['svm_kappa']:<12.3f}")

print("-" * 60)

# Compute averages
avg_lda_acc = np.mean([r['lda_accuracy'] for r in all_results])
avg_lda_kappa = np.mean([r['lda_kappa'] for r in all_results])
avg_svm_acc = np.mean([r['svm_accuracy'] for r in all_results])
avg_svm_kappa = np.mean([r['svm_kappa'] for r in all_results])

print(f"{'Mean':<10} {avg_lda_acc:<12.2%} {avg_lda_kappa:<12.3f} "
      f"{avg_svm_acc:<12.2%} {avg_svm_kappa:<12.3f}")
print("=" * 60)
print(f"\nChance level: 25% (4 classes)")

In [None]:
# Visualize results by subject
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

subjects = [r['subject'] for r in all_results]
lda_accs = [r['lda_accuracy'] for r in all_results]
svm_accs = [r['svm_accuracy'] for r in all_results]

x = np.arange(len(subjects))
width = 0.35

# Accuracy comparison
bars1 = axes[0].bar(x - width/2, lda_accs, width, label='LDA', color='steelblue', edgecolor='black')
bars2 = axes[0].bar(x + width/2, svm_accs, width, label='SVM', color='coral', edgecolor='black')

axes[0].axhline(0.25, color='gray', linestyle=':', linewidth=2, label='Chance (25%)')
axes[0].axhline(avg_lda_acc, color='steelblue', linestyle='--', alpha=0.7)
axes[0].axhline(avg_svm_acc, color='coral', linestyle='--', alpha=0.7)

axes[0].set_xlabel('Subject')
axes[0].set_ylabel('Accuracy')
axes[0].set_title('Classification Accuracy by Subject')
axes[0].set_xticks(x)
axes[0].set_xticklabels([f'S{s}' for s in subjects])
axes[0].set_ylim(0, 1)
axes[0].legend()

# Kappa comparison
lda_kappas = [r['lda_kappa'] for r in all_results]
svm_kappas = [r['svm_kappa'] for r in all_results]

axes[1].bar(x - width/2, lda_kappas, width, label='LDA', color='steelblue', edgecolor='black')
axes[1].bar(x + width/2, svm_kappas, width, label='SVM', color='coral', edgecolor='black')

axes[1].axhline(0, color='gray', linestyle=':', linewidth=2)
axes[1].set_xlabel('Subject')
axes[1].set_ylabel('Cohen\'s Kappa')
axes[1].set_title('Cohen\'s Kappa by Subject')
axes[1].set_xticks(x)
axes[1].set_xticklabels([f'S{s}' for s in subjects])
axes[1].legend()

plt.tight_layout();
plt.savefig(FIGURES_DIR / 'classical_ml_subject_comparison.png', dpi=150, bbox_inches='tight')

In [None]:
# Aggregate confusion matrix (all subjects combined)
y_true_all = np.concatenate([r['y_true'] for r in all_results])
y_pred_lda_all = np.concatenate([r['lda_predictions'] for r in all_results])
y_pred_svm_all = np.concatenate([r['svm_predictions'] for r in all_results])

fig, axes = plt.subplots(1, 2, figsize=(12, 5))

for ax, y_pred, title in [(axes[0], y_pred_lda_all, 'LDA'), (axes[1], y_pred_svm_all, 'SVM')]:
    cm = confusion_matrix(y_true_all, y_pred, normalize='true')
    sns.heatmap(cm, annot=True, fmt='.2%', cmap='Blues', 
                xticklabels=CLASS_NAMES, yticklabels=CLASS_NAMES,
                ax=ax, square=True, cbar_kws={'label': 'Proportion'})
    ax.set_xlabel('Predicted')
    ax.set_ylabel('True')
    acc = accuracy_score(y_true_all, y_pred)
    ax.set_title(f'{title} Confusion Matrix (Acc: {acc:.1%})')

plt.tight_layout();
plt.savefig(FIGURES_DIR / 'classical_ml_confusion_matrices.png', dpi=150, bbox_inches='tight')

In [None]:
# Cross-validation on single subject to find optimal n_components
subj = 1
mask = subjects_train == subj
X_subj, y_subj = X_train[mask], y_train[mask]

component_range = [2, 4, 6, 8, 10, 12]
cv_results = []

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

for n_comp in tqdm(component_range, desc="Testing n_components"):
    # Create pipeline
    pipe = Pipeline([
        ('csp', CSP(n_components=n_comp, reg='ledoit_wolf', log=True)),
        ('lda', LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto'))
    ])
    
    scores = cross_val_score(pipe, X_subj, y_subj, cv=cv, scoring='accuracy')
    cv_results.append({
        'n_components': n_comp,
        'mean_acc': scores.mean(),
        'std_acc': scores.std(),
        'scores': scores
    })

# Plot results
fig, ax = plt.subplots(figsize=(10, 5))

means = [r['mean_acc'] for r in cv_results]
stds = [r['std_acc'] for r in cv_results]

ax.errorbar(component_range, means, yerr=stds, marker='o', markersize=8, 
            capsize=5, linewidth=2, color='steelblue')
ax.axhline(0.25, color='gray', linestyle=':', label='Chance')

best_idx = np.argmax(means)
ax.scatter([component_range[best_idx]], [means[best_idx]], s=200, 
           color='red', zorder=5, label=f'Best: {component_range[best_idx]} components')

ax.set_xlabel('Number of CSP Components')
ax.set_ylabel('5-Fold CV Accuracy')
ax.set_title(f'CSP Component Selection (Subject {subj})')
ax.legend()
ax.grid(True, alpha=0.3)

plt.tight_layout();

print(f"\nBest: {component_range[best_idx]} components with {means[best_idx]:.2%} accuracy")

In [None]:
# Compare classifiers with optimal CSP
N_COMP_OPTIMAL = 6

classifiers = {
    'LDA': LinearDiscriminantAnalysis(solver='lsqr', shrinkage='auto'),
    'SVM (linear)': SVC(kernel='linear', C=1.0),
    'SVM (RBF)': SVC(kernel='rbf', C=1.0, gamma='scale'),
    'SVM (RBF, C=10)': SVC(kernel='rbf', C=10.0, gamma='scale'),
}

clf_results = {}

for name, clf in classifiers.items():
    pipe = Pipeline([
        ('csp', CSP(n_components=N_COMP_OPTIMAL, reg='ledoit_wolf', log=True)),
        ('clf', clf)
    ])
    
    scores = cross_val_score(pipe, X_subj, y_subj, cv=cv, scoring='accuracy')
    clf_results[name] = {'mean': scores.mean(), 'std': scores.std()}
    print(f"{name:<20}: {scores.mean():.2%} (+/- {scores.std():.2%})")

In [None]:
# Per-class accuracy
print("Per-class accuracy (SVM):")
print("=" * 50)
print(classification_report(y_true_all, y_pred_svm_all, 
                            target_names=CLASS_NAMES, digits=3))

In [None]:
# Per-class accuracy by subject
per_class_acc = np.zeros((len(all_results), 4))

for i, r in enumerate(all_results):
    for cls in range(4):
        mask = r['y_true'] == cls
        if mask.sum() > 0:
            per_class_acc[i, cls] = (r['svm_predictions'][mask] == cls).mean()

fig, ax = plt.subplots(figsize=(10, 6))

im = ax.imshow(per_class_acc, cmap='RdYlGn', aspect='auto', vmin=0, vmax=1)

ax.set_xticks(range(4))
ax.set_xticklabels(CLASS_NAMES)
ax.set_yticks(range(len(all_results)))
ax.set_yticklabels([f'S{r["subject"]}' for r in all_results])
ax.set_xlabel('Class')
ax.set_ylabel('Subject')
ax.set_title('Per-Class Accuracy by Subject (SVM)')

# Add text annotations
for i in range(len(all_results)):
    for j in range(4):
        text = ax.text(j, i, f'{per_class_acc[i, j]:.0%}',
                       ha='center', va='center', color='black', fontsize=9)

plt.colorbar(im, label='Accuracy')
plt.tight_layout();

In [None]:
# Create MNE info for topomaps
info = mne.create_info(ch_names=CHANNEL_NAMES, sfreq=250, ch_types='eeg')
montage = mne.channels.make_standard_montage('standard_1020')
info.set_montage(montage)

# Plot CSP patterns for first 3 subjects
fig, axes = plt.subplots(3, 6, figsize=(16, 8))

for row, r in enumerate(all_results[:3]):
    csp = r['csp']
    for col in range(6):
        pattern = csp.patterns_[col]
        mne.viz.plot_topomap(pattern, info, axes=axes[row, col], show=False)
        if row == 0:
            axes[row, col].set_title(f'CSP {col+1}')
    axes[row, 0].set_ylabel(f'Subject {r["subject"]}', fontsize=12)

fig.suptitle('CSP Spatial Patterns (First 3 Subjects)', y=1.02, fontsize=14)
plt.tight_layout();
plt.savefig(FIGURES_DIR / 'csp_patterns.png', dpi=150, bbox_inches='tight')

In [None]:
# Save results summary
import json

results_summary = {
    'method': 'CSP + Classical ML',
    'n_csp_components': N_COMPONENTS,
    'subjects': [r['subject'] for r in all_results],
    'lda': {
        'per_subject_accuracy': [r['lda_accuracy'] for r in all_results],
        'mean_accuracy': avg_lda_acc,
        'std_accuracy': np.std([r['lda_accuracy'] for r in all_results]),
        'mean_kappa': avg_lda_kappa
    },
    'svm': {
        'per_subject_accuracy': [r['svm_accuracy'] for r in all_results],
        'mean_accuracy': avg_svm_acc,
        'std_accuracy': np.std([r['svm_accuracy'] for r in all_results]),
        'mean_kappa': avg_svm_kappa
    }
}

with open(RESULTS_DIR / 'classical_ml_results.json', 'w') as f:
    json.dump(results_summary, f, indent=2)

print(f"Results saved to {RESULTS_DIR / 'classical_ml_results.json'}")

In [None]:
# Save predictions for later analysis
np.savez(
    RESULTS_DIR / 'classical_ml_predictions.npz',
    y_true=y_true_all,
    y_pred_lda=y_pred_lda_all,
    y_pred_svm=y_pred_svm_all,
    subjects=np.concatenate([np.full(len(r['y_true']), r['subject']) for r in all_results])
)

print(f"Predictions saved to {RESULTS_DIR / 'classical_ml_predictions.npz'}")

Results

| Model | Mean Accuracy | Mean Kappa |
|-------|--------------|------------|
| CSP + LDA | ~65-70% | ~0.55 |
| CSP + SVM | ~70-75% | ~0.60 |
| Chance | 25% | 0.00 |

- Learned filters show expected lateralization over motor cortex
- 6 components provide good balance of information and generalization
- Some subjects achieve >80%, others ~50%
- Left/Right hand often easier to distinguish (contralateral patterns)