# Robust, Leakage-Free Evaluation with Per-Subject Normalization

This notebook implements **group-aware** training/validation to avoid leakage when doing **per-subject normalization**. It includes:

- GroupKFold and **Leave-One-Subject-Out (LOSO)** evaluation
- Two scaling modes for X: **per-subject** (fit on train-only stats per subject) or **global** (fit on train only)
- Y preprocessing: **Imputer → StandardScaler → PCA** (fit on train only)
- Model baselines: **RidgeCV** and **RandomForestRegressor** (easily replaceable)
- Optional **log1p** transform for power-like features
- Clear metrics: R², MAE, Pearson r (macro over targets)

👉 *How to use:* Replace the **Data Loading** cell to construct `X`, `Y`, and `subjects` (a 1D array-like with one subject ID per row). Then run all cells.


In [1]:
# !pip install numpy pandas scikit-learn scipy
import numpy as np
import pandas as pd
from sklearn.model_selection import GroupKFold
from sklearn.linear_model import RidgeCV
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import r2_score, mean_absolute_error
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
from scipy.stats import pearsonr
from typing import Tuple, Dict, Any
import warnings
warnings.filterwarnings('ignore')


## Data Loading (Replace This Cell)
You must define:
- `X`: shape (n_samples, n_features), `pd.DataFrame` or `np.ndarray`
- `Y`: shape (n_samples, n_targets), `pd.DataFrame` or `np.ndarray`
- `subjects`: shape (n_samples,), array-like subject IDs (int/str)

Optionally, set `FEATURES_ARE_POWERS = True` to apply `log1p` to X.
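
Before running the rest of the notebook, it can help to confirm that the three inputs are row-aligned. The following is a minimal sketch; `check_inputs` is a hypothetical helper that is not part of the notebook, and the demo arrays are made up.

```python
import numpy as np

def check_inputs(X, Y, subjects):
    """Hypothetical helper: verify that X, Y, and subjects are row-aligned."""
    X_arr = np.asarray(X)
    Y_arr = np.asarray(Y)
    s_arr = np.asarray(subjects)
    if X_arr.ndim != 2 or Y_arr.ndim != 2:
        raise ValueError("X and Y must both be 2D")
    if s_arr.ndim != 1:
        raise ValueError("subjects must be 1D with one ID per row")
    if not (X_arr.shape[0] == Y_arr.shape[0] == s_arr.shape[0]):
        raise ValueError("X, Y, and subjects must have the same number of rows")
    return {
        "n_samples": X_arr.shape[0],
        "n_features": X_arr.shape[1],
        "n_targets": Y_arr.shape[1],
        "n_subjects": len(np.unique(s_arr)),
    }

# Toy usage with made-up data: 6 rows, 3 features, 2 targets, 3 subjects
X_demo = np.zeros((6, 3))
Y_demo = np.zeros((6, 2))
subjects_demo = np.array(["a", "a", "b", "b", "c", "c"])
info = check_inputs(X_demo, Y_demo, subjects_demo)
print(info)
```

The same call works on DataFrames, since `np.asarray` accepts them.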


In [None]:
# ====== EXAMPLE DUMMY DATA (DELETE/REPLACE) ======
rng = np.random.default_rng(42)
n_subj = 8
n_per = 200
n_features = 30
n_targets = 5
X_list, Y_list, S_list = [], [], []
for s in range(n_subj):
    # subject-specific shifts
    shift = rng.normal(0, 3, size=n_features)
    Xs = rng.normal(0, 1, size=(n_per, n_features)) + shift
    W = rng.normal(0, 0.5, size=(n_features, n_targets))
    Ys = Xs @ W + rng.normal(0, 0.3, size=(n_per, n_targets))
    X_list.append(Xs)
    Y_list.append(Ys)
    S_list.append(np.full(n_per, s))
X = np.vstack(X_list)
Y = np.vstack(Y_list)
subjects = np.concatenate(S_list)

# Wrap into DataFrames for convenience
X = pd.DataFrame(X, columns=[f"f{i}" for i in range(X.shape[1])])
Y = pd.DataFrame(Y, columns=[f"y{i}" for i in range(Y.shape[1])])

FEATURES_ARE_POWERS = False  # set True if your X are power-like (use log1p)
print(f"X shape: {X.shape}; Y shape: {Y.shape}; subjects shape: {subjects.shape}")


## Utilities: Metrics and Safe Conversions


In [None]:
def _to_numpy(a):
    if isinstance(a, pd.DataFrame) or isinstance(a, pd.Series):
        return a.values
    return np.asarray(a)

def macro_pearsonr(y_true: np.ndarray, y_pred: np.ndarray) -> float:
    # average Pearson r across targets; guard degenerate columns
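    y_true = _to_numpy(y_true)
    y_pred = _to_numpy(y_pred)
    # accept single-target input given as 1D (e.g. a model that flattens an (n, 1) target)
    if y_true.ndim == 1:
        y_true = y_true.reshape(-1, 1)
    if y_pred.ndim == 1:
        y_pred = y_pred.reshape(-1, 1)
    T = y_true.shape[1]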
    rs = []
    for t in range(T):
        yt = y_true[:, t]
        yp = y_pred[:, t]
        if np.std(yt) < 1e-9 or np.std(yp) < 1e-9:
            rs.append(0.0)
        else:
            r, _ = pearsonr(yt, yp)
            rs.append(r)
    return float(np.mean(rs))

def summarize_metrics(y_true, y_pred) -> Dict[str, float]:
    y_true = _to_numpy(y_true)
    y_pred = _to_numpy(y_pred)
    return {
        'R2': float(r2_score(y_true, y_pred, multioutput='uniform_average')),
        'MAE': float(mean_absolute_error(y_true, y_pred)),
        'PearsonR_macro': macro_pearsonr(y_true, y_pred)
    }


## Per-Subject Scaling without Leakage
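We compute scalers **using train indices only**, *per subject*, then apply those scalers to both train and test samples of the **same subject**. A test subject that never appears in the train indices (always the case under GroupKFold and LOSO) falls back to the global mean and standard deviation of the train rows.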
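
As an illustration of the idea on a single feature, here is a small sketch that assumes a within-subject temporal split, so each subject appears on both sides. The two subjects, their baselines, and the 40/10 split are made up for the example.

```python
import numpy as np

rng = np.random.default_rng(0)

# Two hypothetical subjects with very different baselines, 50 rows each
subj = np.repeat([0, 1], 50)
x = np.concatenate([rng.normal(10.0, 1.0, 50), rng.normal(-5.0, 2.0, 50)])

# Assumed temporal split: first 40 rows of each subject train, last 10 test
pos = np.tile(np.arange(50), 2)
is_train = pos < 40

x_scaled = np.empty_like(x)
for s in np.unique(subj):
    m = subj == s
    mu = x[m & is_train].mean()        # statistics from train rows only
    sd = x[m & is_train].std(ddof=0)
    x_scaled[m] = (x[m] - mu) / sd     # applied to train and test rows alike

for s in np.unique(subj):
    tr = x_scaled[(subj == s) & is_train]
    te = x_scaled[(subj == s) & ~is_train]
    print(f"subject {s}: train mean={tr.mean():.3f}, test mean={te.mean():.3f}")
```

Train rows end up with exactly zero mean and unit variance per subject. Test rows are only approximately standardized, which is expected because their own statistics were never used.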


In [None]:
from collections import defaultdict

def scale_within_subject_train_stats(X: np.ndarray, subjects: np.ndarray, train_idx: np.ndarray, test_idx: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
    X = _to_numpy(X).astype(float)
    subjects = _to_numpy(subjects)
    X_train, X_test = X[train_idx].copy(), X[test_idx].copy()
    subj_train_stats = {}
    # Fit stats per subject from TRAIN subset only
    for s in np.unique(subjects[train_idx]):
        s_mask = (subjects[train_idx] == s)
        Xs = X_train[s_mask]
        mean = Xs.mean(axis=0)
        std = Xs.std(axis=0, ddof=0)
        std[std < 1e-12] = 1.0  # avoid div by zero
        subj_train_stats[s] = (mean, std)
    # Transform train using its subject stats
    for s in np.unique(subjects[train_idx]):
        mean, std = subj_train_stats[s]
        mask = (subjects[train_idx] == s)
        X_train[mask] = (X_train[mask] - mean) / std
    # Transform test using the SAME subject stats (if subject unseen in train, fallback to global train stats)
    global_mean = X[train_idx].mean(axis=0)
    global_std = X[train_idx].std(axis=0, ddof=0)
    global_std[global_std < 1e-12] = 1.0
    for s in np.unique(subjects[test_idx]):
        if s in subj_train_stats:
            mean, std = subj_train_stats[s]
        else:
            mean, std = global_mean, global_std
        mask = (subjects[test_idx] == s)
        X_test[mask] = (X_test[mask] - mean) / std
    return X_train, X_test


## Y Preprocessing Pipeline (train-only fit)
Impute → Standardize → PCA; returns transformed arrays and fitted transformers.
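
The fitted transformers can also map predictions from PCA space back to the original target units. The sketch below assumes the same three scikit-learn steps and a dictionary with keys `'imputer'`, `'scaler'`, and `'pca'`. `inverse_Y` is a hypothetical helper and the data is made up.

```python
import numpy as np
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA

rng = np.random.default_rng(0)
Y_train = rng.normal(size=(100, 5))   # made-up targets
Y_train[::10, 0] = np.nan             # a few missing values

imp = SimpleImputer(strategy="median")
scaler = StandardScaler()
pca = PCA(n_components=0.95, svd_solver="full")

# Fit every step on train data only
Z_train = pca.fit_transform(scaler.fit_transform(imp.fit_transform(Y_train)))
transformers = {"imputer": imp, "scaler": scaler, "pca": pca}

def inverse_Y(Z, transformers):
    """Hypothetical helper: PCA space -> standardized space -> original units."""
    Y_sc = transformers["pca"].inverse_transform(Z)
    return transformers["scaler"].inverse_transform(Y_sc)

Y_back = inverse_Y(Z_train, transformers)
print(Z_train.shape, Y_back.shape)
```

Imputation is not reversed, so previously missing entries come back as reconstructed values. The reconstruction is approximate whenever PCA drops components.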


In [None]:
def preprocess_Y_train_test(Y_train, Y_test, n_components: int = None, var_ratio: float = 0.95):
    Y_train = _to_numpy(Y_train).astype(float)
    Y_test = _to_numpy(Y_test).astype(float)
    imp = SimpleImputer(strategy='median')
    Ytr_imp = imp.fit_transform(Y_train)
    Yte_imp = imp.transform(Y_test)
    y_scaler = StandardScaler()
    Ytr_sc = y_scaler.fit_transform(Ytr_imp)
    Yte_sc = y_scaler.transform(Yte_imp)
    if n_components is None:
        pca = PCA(n_components=var_ratio, svd_solver='full')
    else:
        pca = PCA(n_components=n_components)
    Ytr_z = pca.fit_transform(Ytr_sc)
    Yte_z = pca.transform(Yte_sc)
    return Ytr_z, Yte_z, {'imputer': imp, 'scaler': y_scaler, 'pca': pca}


## Main Evaluation Loop
Set `USE_LOSO = True` for Leave-One-Subject-Out; otherwise the loop uses GroupKFold. Set `PER_SUBJECT_SCALING = True` for per-subject scaling, or `False` for a global `StandardScaler` fit on train only.
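
Both split strategies should keep every subject entirely on one side of each fold. The sketch below checks this on made-up subject IDs. It uses scikit-learn's `LeaveOneGroupOut` as one possible way to build LOSO folds, and `assert_no_subject_overlap` is a hypothetical helper.

```python
import numpy as np
from sklearn.model_selection import GroupKFold, LeaveOneGroupOut

S_demo = np.repeat(np.arange(6), 4)    # 6 made-up subjects, 4 rows each
X_demo = np.zeros((S_demo.size, 2))

def assert_no_subject_overlap(folds, groups):
    """Hypothetical check: no subject may appear on both sides of a fold."""
    for tr_idx, te_idx in folds:
        shared = np.intersect1d(groups[tr_idx], groups[te_idx])
        if shared.size:
            raise AssertionError(f"subjects in both train and test: {shared}")

loso_folds = list(LeaveOneGroupOut().split(X_demo, groups=S_demo))
gkf_folds = list(GroupKFold(n_splits=3).split(X_demo, groups=S_demo))

assert_no_subject_overlap(loso_folds, S_demo)
assert_no_subject_overlap(gkf_folds, S_demo)
print(len(loso_folds), len(gkf_folds))
```

LOSO yields one fold per subject, while GroupKFold yields `n_splits` folds with several subjects held out at a time.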


In [None]:
USE_LOSO = False            # set True for Leave-One-Subject-Out
N_SPLITS = 5               # ignored if USE_LOSO=True
PER_SUBJECT_SCALING = True # if False, uses global StandardScaler fit on train only
APPLY_LOG1P_TO_X = bool(FEATURES_ARE_POWERS)

# Choose a model (you can try both)
MODEL = 'ridge'  # 'ridge' or 'rf'
Ridge_alphas = np.logspace(-3, 3, 13)
RF_params = dict(n_estimators=300, max_depth=None, random_state=0, n_jobs=-1)

X_np = _to_numpy(X)
Y_np = _to_numpy(Y)
S = _to_numpy(subjects)

if APPLY_LOG1P_TO_X:
    X_np = np.log1p(np.maximum(X_np, 0))

unique_subj = np.unique(S)
if USE_LOSO:
    folds = [(np.where(S != s)[0], np.where(S == s)[0]) for s in unique_subj]
else:
    gkf = GroupKFold(n_splits=N_SPLITS)
    folds = list(gkf.split(X_np, Y_np, groups=S))

metrics_all = []
for fold_id, (tr_idx, te_idx) in enumerate(folds, 1):
    Xtr, Xte = None, None
    if PER_SUBJECT_SCALING:
        Xtr, Xte = scale_within_subject_train_stats(X_np, S, tr_idx, te_idx)
    else:
        # Global scaler fit on train only
        xs = StandardScaler()
        Xtr = xs.fit_transform(X_np[tr_idx])
        Xte = xs.transform(X_np[te_idx])

    Ytr_z, Yte_z, yproc = preprocess_Y_train_test(Y_np[tr_idx], Y_np[te_idx], n_components=None, var_ratio=0.95)

    if MODEL == 'ridge':
        # store_cv_values was dropped: newer scikit-learn renamed the flag, and not storing is already the default
        model = RidgeCV(alphas=Ridge_alphas)
    elif MODEL == 'rf':
        model = RandomForestRegressor(**RF_params)
    else:
        raise ValueError("Unknown MODEL")

    model.fit(Xtr, Ytr_z)
    Yhat_z = model.predict(Xte)
    # Metrics are computed in PCA space for stability. To report them in original Y units instead,
    # inverse-transform Yhat_z with the fitted transformers in yproc first.
    m = summarize_metrics(Yte_z, Yhat_z)
    m.update({'fold': fold_id, 'n_train': len(tr_idx), 'n_test': len(te_idx)})
    metrics_all.append(m)
    print(f"Fold {fold_id}: R2={m['R2']:.3f}, MAE={m['MAE']:.3f}, r={m['PearsonR_macro']:.3f}")

df_metrics = pd.DataFrame(metrics_all)
print("\nCV Summary:")
print(df_metrics.describe().loc[['mean','std','min','max'], ['R2','MAE','PearsonR_macro']])


## Compare Settings
You can re-run the loop with different switches (e.g., `USE_LOSO=True`, or `PER_SUBJECT_SCALING=False`) to see how results change.
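
One way to keep results from several runs side by side is to store each run's per-fold metrics under a label and aggregate them. This is only a sketch: `compare_runs` is a hypothetical helper, and the numbers are made up to show the table layout. The column names match those produced by `summarize_metrics`.

```python
import pandas as pd

def compare_runs(runs):
    """Hypothetical helper: runs maps a setting label to a per-fold metrics DataFrame."""
    stacked = pd.concat(
        [df.assign(setting=label) for label, df in runs.items()],
        ignore_index=True,
    )
    cols = ["R2", "MAE", "PearsonR_macro"]
    return stacked.groupby("setting")[cols].agg(["mean", "std"])

# Made-up numbers, two folds per setting
runs_demo = {
    "per_subject": pd.DataFrame(
        {"R2": [0.5, 0.6], "MAE": [0.4, 0.3], "PearsonR_macro": [0.7, 0.8]}
    ),
    "global": pd.DataFrame(
        {"R2": [0.4, 0.5], "MAE": [0.5, 0.4], "PearsonR_macro": [0.6, 0.7]}
    ),
}
summary = compare_runs(runs_demo)
print(summary)
```

In the notebook, each entry would be a copy of `df_metrics` taken after a run, for example `runs["loso"] = df_metrics.copy()`.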

In [None]:
df_metrics.head()


## Tips
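- If your test set contains **unseen subjects**, prefer `PER_SUBJECT_SCALING=False` (global scaler fit on train). With per-subject scaling, train rows are normalized by their own subject's statistics while unseen test subjects fall back to global train statistics, so the two sets end up on different scales.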
- If subjects appear in both train and test (e.g., temporal splits per subject), `PER_SUBJECT_SCALING=True` uses only **train windows of each subject** to compute that subject's scaler and applies it to the test windows of the same subject.
- Remove any **near-zero-variance** targets before PCA, or use `RobustScaler` if targets have heavy tails.
- Consider adding **time lags** or domain-specific features to strengthen the X→Y mapping.
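
For the time-lag tip, lags should be computed within each subject so that one subject's rows never leak into another's. The sketch below assumes rows are already in time order within each subject. `add_lags_per_subject` is a hypothetical helper and the data is made up.

```python
import numpy as np
import pandas as pd

def add_lags_per_subject(X, subjects, lags=(1, 2)):
    """Hypothetical helper: append lagged copies of each column, shifted within subject."""
    X = pd.DataFrame(X).reset_index(drop=True)
    groups = pd.Series(np.asarray(subjects), name="subject")
    parts = [X]
    for k in lags:
        shifted = X.groupby(groups).shift(k)   # shift never crosses subject boundaries
        shifted.columns = [f"{c}_lag{k}" for c in X.columns]
        parts.append(shifted)
    return pd.concat(parts, axis=1)

X_demo = pd.DataFrame({"f0": [1.0, 2.0, 3.0, 10.0, 20.0, 30.0]})
subjects_demo = np.array([0, 0, 0, 1, 1, 1])
X_lagged = add_lags_per_subject(X_demo, subjects_demo, lags=(1,))
print(X_lagged)
```

The first `k` rows of every subject have no history and come out as NaN. Drop those rows (together with the matching rows of `Y` and `subjects`) or impute them before fitting.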
