# SVM Classification with Nested Cross-Validation

Rigorous evaluation using:
- Nested preprocessing (Harmonize→Scale→PCA per fold)
- 5-fold stratified CV on 90% dev set
- Held-out 10% test set for final evaluation
- Baseline (Logistic Regression) comparison
- Feature importance → brain region mapping

In [None]:
from core.config import initialize_notebook
import pandas as pd
import pickle
from pathlib import Path

env = initialize_notebook(regenerate_run_id=False)

research_question = env.configs.run['run_name']
seed = env.configs.run['seed']
kernel = env.configs.svm['model']['kernel']

print(f"Research Question: {research_question.upper()}")
print(f"Seed: {seed}")
print(f"SVM Kernel: {kernel}")
print(f"CV Folds: {env.configs.svm['cv']['n_splits']}")

## Load Data

Combine train + val → 90% development set for CV.
Keep test set (10%) completely held out.

In [None]:
from core.svm.pipeline import load_development_data

dev_df, data_dir = load_development_data(env)
test_df = pd.read_parquet(data_dir / "test.parquet")

print(f"Development set: {len(dev_df):,} subjects")
print(f"Test set: {len(test_df):,} subjects")

group_col = env.configs.data['columns']['mapping']['research_group']
print(f"\nDev group distribution:\n{dev_df[group_col].value_counts()}")

## Select Classification Task

Test with single task first before running all.

In [None]:
tasks = env.configs.svm['tasks']
print("Available tasks:")
for i, task in enumerate(tasks):
    print(f"  {i}: {task['name']}")

# Select first task for testing
task_config = tasks[0]
print(f"\nTesting with: {task_config['name']}")

## Baseline: Logistic Regression with Nested CV

In [None]:
from core.svm.pipeline import filter_task_data, run_nested_cv
from core.svm.models import create_baseline
from core.svm.preprocessing import fit_pca_on_dev, apply_pca_to_fold

# Filter data for this task
dev_filtered, y_dev = filter_task_data(dev_df, task_config, group_col)
test_filtered, y_test = filter_task_data(test_df, task_config, group_col)

print(f"Task: {task_config['name']}")
print(f"Dev: {len(y_dev)} | Test: {len(y_test)}")
print(f"Positive class ratio: {y_dev.mean():.2%}")

# Fit PCA once on full dev set (not per-fold - avoids "mixing apples and pears")
print("\nFitting PCA on full dev set...")
fitted_pipeline = fit_pca_on_dev(dev_filtered, env, seed)
print(f"PCA: {fitted_pipeline['n_components']} components, {fitted_pipeline['variance_explained']:.2%} variance explained")

# Run baseline with nested CV using pre-fitted PCA
baseline = create_baseline(env.configs.svm, seed)
print("\nRunning baseline with nested CV (pre-fitted PCA)...")
baseline_cv = run_nested_cv(dev_filtered, y_dev, baseline, env, seed, fitted_pipeline, use_wandb=False)

print("\nBaseline CV Results:")
print(f"  Accuracy: {baseline_cv['aggregated']['accuracy_mean']:.3f} ± {baseline_cv['aggregated']['accuracy_std']:.3f}")
print(f"  Balanced Accuracy: {baseline_cv['aggregated']['balanced_accuracy_mean']:.3f} ± {baseline_cv['aggregated']['balanced_accuracy_std']:.3f}")
print(f"  F1: {baseline_cv['aggregated']['f1_mean']:.3f} ± {baseline_cv['aggregated']['f1_std']:.3f}")
print(f"  ROC-AUC: {baseline_cv['aggregated']['roc_auc_mean']:.3f} ± {baseline_cv['aggregated']['roc_auc_std']:.3f}")

## SVM with Nested CV

In [None]:
from core.svm.models import create_svm

svm = create_svm(env.configs.svm, seed)
print(f"Running {kernel} SVM with nested CV (pre-fitted PCA)...")
svm_cv = run_nested_cv(dev_filtered, y_dev, svm, env, seed, fitted_pipeline, use_wandb=False)

print("\nSVM CV Results:")
print(f"  Accuracy: {svm_cv['aggregated']['accuracy_mean']:.3f} ± {svm_cv['aggregated']['accuracy_std']:.3f}")
print(f"  Balanced Accuracy: {svm_cv['aggregated']['balanced_accuracy_mean']:.3f} ± {svm_cv['aggregated']['balanced_accuracy_std']:.3f}")
print(f"  F1: {svm_cv['aggregated']['f1_mean']:.3f} ± {svm_cv['aggregated']['f1_std']:.3f}")
print(f"  ROC-AUC: {svm_cv['aggregated']['roc_auc_mean']:.3f} ± {svm_cv['aggregated']['roc_auc_std']:.3f}")

print("\nBaseline vs SVM (CV):")
print(f"  Accuracy: {(svm_cv['aggregated']['accuracy_mean'] - baseline_cv['aggregated']['accuracy_mean']):.3f}")
print(f"  Balanced Accuracy: {(svm_cv['aggregated']['balanced_accuracy_mean'] - baseline_cv['aggregated']['balanced_accuracy_mean']):.3f}")
print(f"  ROC-AUC: {(svm_cv['aggregated']['roc_auc_mean'] - baseline_cv['aggregated']['roc_auc_mean']):.3f}")

## Final Models on Test Set

Train on full dev set, evaluate once on held-out test.

In [None]:
from core.svm.pipeline import run_final_model
from core.svm.evaluation import bootstrap_test_metrics

run_cfg = env.configs.run
task_name = task_config['name']
svm_dir = env.repo_root / "outputs" / run_cfg['run_name'] / run_cfg['run_id'] / f"seed_{seed}" / "svm" / task_name
svm_dir.mkdir(parents=True, exist_ok=True)

print("Training final baseline on full dev set...")
baseline_final = run_final_model(dev_filtered, test_filtered, y_dev, y_test, 
                                 baseline, env, seed, f"baseline_{task_name}", svm_dir, fitted_pipeline)

print("Training final SVM on full dev set...")
svm_final = run_final_model(dev_filtered, test_filtered, y_dev, y_test,
                            svm, env, seed, f"svm_{task_name}", svm_dir, fitted_pipeline)

print("\n" + "="*60)
print("TEST SET RESULTS (FINAL)")
print("="*60)

print("\nBaseline:")
for metric, value in baseline_final['test_metrics'].items():
    print(f"  {metric}: {value:.3f}")

print("\nSVM:")
for metric, value in svm_final['test_metrics'].items():
    print(f"  {metric}: {value:.3f}")

# Bootstrap for confidence intervals (optimized - much faster now!)
n_boot = env.configs.svm.get("evaluation", {}).get("bootstrap", {}).get("iterations", 1000)
print(f"\n\nBootstrapping test set ({n_boot} iterations)...")

baseline_bootstrap = bootstrap_test_metrics(
    baseline_final['model'], baseline_final['X_test_pca'], y_test, n_boot, seed
)
svm_bootstrap = bootstrap_test_metrics(
    svm_final['model'], svm_final['X_test_pca'], y_test, n_boot, seed
)

print("\n" + "="*60)
print("TEST SET WITH 95% CONFIDENCE INTERVALS")
print("="*60)
print(f"\nBaseline ROC-AUC: {baseline_bootstrap['roc_auc_mean']:.3f} "
      f"[{baseline_bootstrap['roc_auc_ci_lower']:.3f}, {baseline_bootstrap['roc_auc_ci_upper']:.3f}]")
print(f"SVM ROC-AUC: {svm_bootstrap['roc_auc_mean']:.3f} "
      f"[{svm_bootstrap['roc_auc_ci_lower']:.3f}, {svm_bootstrap['roc_auc_ci_upper']:.3f}]")

print(f"\nCV ROC-AUC: {svm_cv['aggregated']['roc_auc_mean']:.3f} ± {svm_cv['aggregated']['roc_auc_std']:.3f}")
print(f"Test ROC-AUC: {svm_bootstrap['roc_auc_mean']:.3f} (95% CI: [{svm_bootstrap['roc_auc_ci_lower']:.3f}, {svm_bootstrap['roc_auc_ci_upper']:.3f}])")
print(f"CV-Test Gap: {svm_cv['aggregated']['roc_auc_mean'] - svm_bootstrap['roc_auc_mean']:.3f}")

## Confusion Matrices

In [None]:
from core.svm.preprocessing import preprocess_fold
from core.svm.evaluation import compute_confusion_matrix
from core.svm.visualization import plot_confusion_matrix
from IPython.display import Image, display

plots_dir = svm_dir / "plots"
plots_dir.mkdir(exist_ok=True)

# Preprocess test data
_, X_test_pca, _ = preprocess_fold(dev_filtered, test_filtered, env, seed)

# Predictions
y_pred_baseline = baseline_final['model'].predict(X_test_pca)
y_pred_svm = svm_final['model'].predict(X_test_pca)

# Confusion matrices
cm_baseline = compute_confusion_matrix(y_test, y_pred_baseline)
cm_svm = compute_confusion_matrix(y_test, y_pred_svm)

plot_confusion_matrix(cm_baseline, ["Negative", "Positive"], f"Baseline - {task_name}",
                     plots_dir / f"cm_baseline_{task_name}.png")
plot_confusion_matrix(cm_svm, ["Negative", "Positive"], f"SVM - {task_name}",
                     plots_dir / f"cm_svm_{task_name}.png")

display(Image(str(plots_dir / f"cm_baseline_{task_name}.png")))
display(Image(str(plots_dir / f"cm_svm_{task_name}.png")))

## CV-Based Feature Importance → Brain Regions

Uses averaged permutation importance across CV folds for robust estimates (more stable than single test set with small positive class).

In [None]:
from core.svm.interpretation import get_cv_feature_importance, map_pca_to_brain_regions
from core.svm.feature_mapping import enrich_brain_regions
from core.tsne.embeddings import get_imaging_columns
from core.svm.visualization import plot_feature_importance

# CV-based permutation importance (averaged across folds)
print("Computing CV-based permutation importance...\n")
n_repeats = env.configs.svm.get("interpretation", {}).get("n_repeats", 3)
svm_importance = get_cv_feature_importance(svm_cv['fold_results'], seed, n_repeats=n_repeats)

print(f"\nTop 10 Principal Components (by CV importance):\n")
print(svm_importance.head(10).to_string(index=False))

# Map to brain regions using THE SAME fitted_pipeline PCA (consistent across all folds)
all_imaging_cols = get_imaging_columns(dev_filtered, env.configs.svm['imaging_prefixes'])
valid_features = fitted_pipeline['valid_features']
imaging_cols = [col for i, col in enumerate(all_imaging_cols) if valid_features[i]]

brain_regions = map_pca_to_brain_regions(
    svm_importance, fitted_pipeline['pca'], imaging_cols,
    top_n_components=10, top_n_features=20
)

# Add human-readable labels
brain_regions_enriched = enrich_brain_regions(brain_regions, env)
brain_regions_enriched.to_csv(svm_dir / "brain_regions.csv", index=False)

# Display formatted table
print("\n\n" + "="*120)
print(f"TOP 20 BRAIN REGIONS - {task_name.replace('_', ' ').title()}")
print("="*120)

display_df = brain_regions_enriched.head(20).copy()
display_df.insert(0, 'Rank', range(1, 21))
display_df['importance'] = display_df['importance'].apply(lambda x: f"{x:.4f}")

pd.set_option('display.max_colwidth', None)
pd.set_option('display.width', None)
pd.set_option('display.max_rows', None)

print(display_df.to_string(index=False))
print("="*120)

In [None]:
svm_importance.tail(20)

## Visualize Brain Region Importance

In [None]:
plot_feature_importance(brain_regions_enriched, f"Top Brain Regions - {task_name}",
                       plots_dir / f"brain_regions_{task_name}.png", top_n=20)

display(Image(str(plots_dir / f"brain_regions_{task_name}.png")))

## Run All Tasks (Optional)

Once single task works, run complete pipeline.

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import textwrap

# Use CV fold data - all folds now use same PCA so no dimension mismatch!
fold_results = svm_cv['fold_results']
X_dev_pca_list = []
y_dev_list = []

for fold_idx, fold in enumerate(fold_results):
    X_dev_pca_list.append(fold['X_val_pca'])
    y_dev_list.append(fold['y_val'])

# Combine all folds to get full dev set (no truncation needed!)
X_dev_pca = np.vstack(X_dev_pca_list)
y_dev_combined = np.hstack(y_dev_list)

# Use the fitted_pipeline PCA (same for all folds)
pca = fitted_pipeline['pca']

print(f"Using pre-fitted PCA with {pca.n_components_} components")
print(f"Dev set from CV folds: {X_dev_pca.shape}")

# Get original brain imaging features for brain region visualization
from core.tsne.embeddings import get_imaging_columns
all_imaging_cols = get_imaging_columns(dev_filtered, env.configs.svm['imaging_prefixes'])
valid_features = fitted_pipeline['valid_features']
imaging_cols = [col for i, col in enumerate(all_imaging_cols) if valid_features[i]]

print(f"Original brain features: {len(imaging_cols)}")

# Load full data dictionary for feature name lookups
from core.svm.feature_mapping import load_data_dictionary
data_dict = load_data_dictionary(env)
if data_dict is not None:
    brain_region_labels = dict(zip(data_dict['var_name'], data_dict['var_label']))
else:
    brain_region_labels = {}

# Select feature to visualize
viz_type = "brain"  # Change to "PC" to visualize principal component
pc_idx = 0  # PC1 (0-indexed)
brain_feature = "mrisdp_191"  # Example: top brain region from feature importance

if viz_type == "PC":
    if X_dev_pca.shape[1] >= pc_idx + 1:
        feature_values = X_dev_pca[:, pc_idx]
        feature_name = f"PC{pc_idx + 1}"
        feature_label = f"PC{pc_idx + 1}"
    else:
        pc_idx = X_dev_pca.shape[1] - 1
        feature_values = X_dev_pca[:, pc_idx]
        feature_name = f"PC{pc_idx + 1}"
        feature_label = f"PC{pc_idx + 1}"
        print(f"\nNote: Only {X_dev_pca.shape[1]} components. Using {feature_name}.")
else:
    # Visualize original brain region from raw dev data
    if brain_feature in dev_filtered.columns:
        feature_values = dev_filtered[brain_feature].values
        feature_name = brain_feature
        # Use human-readable label if available
        if brain_feature in brain_region_labels:
            feature_label = brain_region_labels[brain_feature]
        else:
            feature_label = brain_feature
    else:
        print(f"Brain feature {brain_feature} not found in dev_filtered columns.")
        feature_values = None

if feature_values is not None:
    # Wrap long titles to prevent overlap
    def wrap_title(text, width=60):
        """Wrap text to multiple lines if too long."""
        if len(text) <= width:
            return text
        return '\n'.join(textwrap.wrap(text, width=width))
    
    # Create visualization
    fig, axes = plt.subplots(1, 2, figsize=(16, 6))
    
    class_labels = dev_filtered[group_col].values
    colors_map = {'Control': '#1f77b4', 'Clinical': '#d62728', 'Subclinical': '#ff7f0e'}
    
    # Only plot classes that exist in filtered data
    unique_labels = np.unique(class_labels)
    
    # Plot 1: Histogram/density by class
    for label in unique_labels:
        if label in colors_map:
            mask = class_labels == label
            axes[0].hist(feature_values[mask], bins=50, alpha=0.5, label=label, 
                        color=colors_map[label], density=True)
    
    axes[0].set_xlabel(f'{feature_name} value', fontsize=12)
    axes[0].set_ylabel('Density', fontsize=12)
    axes[0].set_title(wrap_title(feature_label) + '\nDistribution by class', fontsize=11, fontweight='bold')
    axes[0].legend(loc='best', fontsize=10)
    axes[0].grid(alpha=0.3)
    
    # Plot 2: Strip plot
    class_to_y = {'Control': 0, 'Subclinical': 1, 'Clinical': 2}
    y_positions = np.array([class_to_y.get(label, 0) for label in class_labels])
    
    np.random.seed(seed)
    jitter = np.random.normal(0, 0.08, size=len(y_positions))
    
    for label in unique_labels:
        if label in colors_map:
            mask = class_labels == label
            axes[1].scatter(feature_values[mask], y_positions[mask] + jitter[mask], 
                           c=colors_map[label], label=label, alpha=0.4, s=10)
    
    axes[1].set_xlabel(f'{feature_name} value', fontsize=12)
    axes[1].set_ylabel('Class', fontsize=12)
    axes[1].set_yticks([0, 1, 2])
    axes[1].set_yticklabels(['Control', 'Subclinical', 'Clinical'])
    axes[1].set_title(wrap_title(feature_label) + '\nValues across classes', fontsize=11, fontweight='bold')
    axes[1].grid(alpha=0.3, axis='x')
    
    plt.tight_layout()
    plt.show()
    
    # Print statistics (only for classes present in data)
    print(f"\n{feature_label}")
    print(f"Feature: {feature_name}")
    print("="*80)
    for label in unique_labels:
        mask = class_labels == label
        values = feature_values[mask]
        if len(values) > 0:
            print(f"{label:12s}: mean={values.mean():7.3f}, std={values.std():7.3f}, n={mask.sum():5d}")
    print("="*80)