# XGBoost - Train

In [None]:
import pathlib
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import label_binarize
from sklearn.metrics import (
    classification_report, confusion_matrix, ConfusionMatrixDisplay,
    roc_auc_score, roc_curve, precision_recall_curve, average_precision_score
)

from xgboost import XGBClassifier
import joblib

In [None]:
DATA_DIR = pathlib.Path("../data")
MODEL_DIR = pathlib.Path("../models")

ARTIFACTS = pathlib.Path(DATA_DIR / "artifacts/xgb")
MODELS = pathlib.Path(MODEL_DIR / "xgb")

OUT = MODELS
OUT.mkdir(parents=True, exist_ok=True)

seed = 42

## Load Data

In [None]:
X = pd.read_parquet(ARTIFACTS / "X.parquet")
y = pd.read_parquet(ARTIFACTS / "y.parquet")["label_id"]
metadata = pd.read_parquet(ARTIFACTS / "metadata.parquet")

with open(ARTIFACTS / "splits.json") as f:
    splits = json.load(f)

label_to_id = splits["label_to_id"]
id_to_label = {int(k): v for k, v in splits["id_to_label"].items()}
classes = np.array(sorted(id_to_label.keys()))  # numeric class ids
class_names = [id_to_label[c] for c in classes]
num_class = len(classes)

# Indexes are stored as pairs (subject, experiment). Convert back to indexers.
idx_train = pd.MultiIndex.from_tuples(splits["train_idx"], names=["subject","experiment"])
idx_val   = pd.MultiIndex.from_tuples(splits["val_idx"],   names=["subject","experiment"])
idx_test  = pd.MultiIndex.from_tuples(splits["test_idx"],  names=["subject","experiment"])

X_train, y_train = X.loc[idx_train], y.loc[idx_train]
X_val,   y_val   = X.loc[idx_val],   y.loc[idx_val]
X_test,  y_test  = X.loc[idx_test],  y.loc[idx_test]

print("Shapes:")
print("  X_train:", X_train.shape, " y_train:", y_train.shape)
print("  X_val:  ", X_val.shape,   " y_val:  ", y_val.shape)
print("  X_test: ", X_test.shape,  " y_test: ", y_test.shape)
print("Class map:", id_to_label)

## Model Training

### Train Function

We will retrain on different splits, lets standarize the function

In [None]:
from xgboost import XGBClassifier
import pandas as pd

def train_xgb(X_train, y_train, X_val, y_val, parameters=None):
    """
    Minimal, explicit training helper. Expects all required keys to exist in `parameters`.
    """
    params = parameters  # use as-is

    # Instantiate with explicit fields from the dict
    xgb_clf = XGBClassifier(
        objective=params["objective"],
        num_class=params["num_class"],
        n_estimators=params["n_estimators"],
        learning_rate=params["learning_rate"],
        max_depth=params["max_depth"],
        min_child_weight=params["min_child_weight"],
        subsample=params["subsample"],
        colsample_bytree=params["colsample_bytree"],
        reg_lambda=params["reg_lambda"],
        reg_alpha=params["reg_alpha"],
        tree_method=params["tree_method"],
        random_state=params["random_state"],
        n_jobs=params["n_jobs"],
        eval_metric=params["eval_metric"],
    )

    # Fit with explicit fit-args from the same dict
    xgb_clf.fit(
        X_train, y_train,
        eval_set=[(X_train, y_train), (X_val, y_val)],
        early_stopping_rounds=params["early_stopping_rounds"],
        verbose=params["verbose"],
    )
    return xgb_clf


### Train

In [None]:


params = {
    "objective": "multi:softprob",
    "num_class": num_class,
    "n_estimators": 2000,
    "learning_rate": 0.05,
    "max_depth": 6,
    "min_child_weight": 2.0,
    "subsample": 0.9,
    "colsample_bytree": 0.9,
    "reg_lambda": 1.0,
    "reg_alpha": 0.0,
    "tree_method": "hist",
    "random_state": seed,
    "n_jobs": -1,
    "eval_metric": "mlogloss",
    "early_stopping_rounds": 100,
    "verbose": 50,
}

xgb_clf = train_xgb(X_train, y_train, X_val, y_val, parameters=params)

best_iter = getattr(xgb_clf, "best_iteration", None)
print("Best iteration:", best_iter)


### Check early stop and overfitting

In [None]:
import numpy as np
import matplotlib.pyplot as plt

# Get training history from the fitted model
history = xgb_clf.evals_result()
# XGBoost names the eval sets as validation_0, validation_1 in sklearn API
train_key, val_key = "validation_0", "validation_1"

# Safety checks in case keys/metric names differ
if train_key not in history or val_key not in history:
    print("Could not find expected eval keys in evals_result(). Available keys:", list(history.keys()))
else:
    # Find the metric name (should be 'mlogloss'); fall back to the first metric found
    metrics = list(history[train_key].keys())
    metric = "mlogloss" if "mlogloss" in metrics else metrics[0]

    train_curve = history[train_key][metric]
    val_curve   = history[val_key][metric]

    rounds = np.arange(1, len(train_curve) + 1)

    # Best iteration: prefer model attribute; otherwise, argmin of validation curve
    best_iter = getattr(xgb_clf, "best_iteration", None)
    if best_iter is None or best_iter < 0:
        best_iter = int(np.argmin(val_curve))
    best_round = best_iter + 1  # 1-based for display

    # Plot
    plt.figure(figsize=(7,5))
    plt.plot(rounds, train_curve, label=f"Train ({metric})")
    plt.plot(rounds, val_curve,   label=f"Validation ({metric})")

    # Marker at best validation point
    plt.axvline(best_round, linestyle="--")
    plt.scatter([best_round], [val_curve[best_iter]], zorder=3)

    plt.title(f"Learning Curve: {metric} vs. iteration")
    plt.xlabel("Boosting round")
    plt.ylabel(metric)
    plt.legend()
    plt.grid(True)
    plt.show()

    print(f"Best iteration (0-based): {best_iter}")
    print(f"Best {metric} (validation): {val_curve[best_iter]:.6f} at round {best_round}")


## Validation

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import label_binarize
from sklearn.metrics import (
    classification_report, confusion_matrix, ConfusionMatrixDisplay,
    roc_auc_score, roc_curve, precision_recall_curve, average_precision_score
)


In [None]:
# --- VALIDATION ---
proba_val = xgb_clf.predict_proba(X_val)
pred_val  = proba_val.argmax(axis=1)

print("Validation classification report")
print(classification_report(y_val, pred_val, target_names=class_names, digits=3))

cm_val = confusion_matrix(y_val, pred_val, labels=np.arange(len(class_names)))
ConfusionMatrixDisplay(cm_val, display_labels=class_names).plot(cmap="Blues", values_format="d")
plt.title("Validation Confusion Matrix")
plt.show()


## Test

In [None]:
# --- TEST (only after you're happy with tuning) ---
proba_test = xgb_clf.predict_proba(X_test)
pred_test  = proba_test.argmax(axis=1)

print("Test classification report")
print(classification_report(y_test, pred_test, target_names=class_names, digits=3))

cm_test = confusion_matrix(y_test, pred_test, labels=np.arange(len(class_names)))
ConfusionMatrixDisplay(cm_test, display_labels=class_names).plot(cmap="Greens", values_format="d")
plt.title("Test Confusion Matrix")
plt.show()


In [None]:
cm_test = confusion_matrix(y_test, pred_test, labels=np.arange(len(class_names)))
cm_test

### Collect Metrics

In [None]:
import numpy as np

def eval_xgb_split(y_true, y_pred, class_names):
    """
    Build metrics for model predictions.
    """
    labels=np.arange(len(class_names))

    # Reports
    rep_dict = classification_report(y_true, y_pred, target_names=class_names, output_dict=True)
    rep_text = classification_report(y_true, y_pred, target_names=class_names, digits=3)

    # Confusion matrix (use provided label order to align with class_names)
    cm = confusion_matrix(y_true, y_pred, labels=labels)

    # Build details block in your schema
    details = {
        "macro": {
            "accuracy": rep_dict["accuracy"],
            "f1": rep_dict["macro avg"]["f1-score"],
            "precision": rep_dict["macro avg"]["precision"],
            "recall": rep_dict["macro avg"]["recall"],
        },
        "weighted": {
            "f1": rep_dict["weighted avg"]["f1-score"],
            "precision": rep_dict["weighted avg"]["precision"],
            "recall": rep_dict["weighted avg"]["recall"],
        },
        "per_class": {
            cls: {
                "f1":        rep_dict[cls]["f1-score"],
                "precision": rep_dict[cls]["precision"],
                "recall":    rep_dict[cls]["recall"],
            }
            for cls in class_names
        },
    }

    return {
        "accuracy": details["macro"]["accuracy"],
        "f1_macro": details["macro"]["f1"],
        "f1_weighted": details["weighted"]["f1"],
        "report": rep_text,
        "confusion_matrix": cm.tolist(),
        "details": details,
    }


In [None]:
val_metrics = eval_xgb_split(y_val, pred_val, class_names)
test_metrics = eval_xgb_split(y_test, pred_test, class_names)

## Feature Importance

In [None]:
# Feature importance 
importances = pd.Series(xgb_clf.feature_importances_, index=X_train.columns).sort_values(ascending=False)
print("Top 20 features:\n", importances.head(20))



## Feature Prunning

Check how the model performs after removing less important features.
The goal is to see how lean and performant (computational wise) the inference can get.

### Prunned Training

Train models progressively removing the least important features.
Feature importance is based on the training with all the features.

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report

# Build pruning order from the CURRENT model's importances ---

# We'll drop least-important first, keeping the most important for last.
base_importances = pd.Series(
    xgb_clf.feature_importances_, index=X_train.columns, dtype=float
).fillna(0.0)

# order to DROP (ascending importance)
drop_order = base_importances.sort_values(ascending=True).index.tolist()

total_feats = len(X_train.columns)
start_feats = total_feats
step = 10
min_keep = 1 

print(f"Total features: {total_feats}. Pruning by {step}s, down to {min_keep} kept.")

# --- Sweep: ignore first k features (least important), retrain, evaluate ---
rows = []
kept_counts = list(range(start_feats, min_keep - 1, -step))
# ensure last point is exactly min_keep
if kept_counts[-1] != min_keep:
    kept_counts.append(min_keep)

for keep_n in kept_counts:
    # how many to drop
    drop_n = max(0, total_feats - keep_n)
    ignored_features = drop_order[:drop_n]

    print(f"\n=== Keep {keep_n} features (drop {drop_n}) ===")
    X_train_pruned = X_train.drop(columns=ignored_features, errors="ignore")
    X_val_pruned = X_val.drop(columns=ignored_features, errors="ignore")
    xgb_pruned = train_xgb(
        X_train_pruned, y_train,
        X_val_pruned, y_val,
        parameters=params,
    )

    # --- Evaluate on TEST ---
    Xte_eval = X_test.drop(columns=ignored_features, errors="ignore")
    y_pred = xgb_pruned.predict(Xte_eval)

    cr = classification_report(y_test, y_pred, target_names=class_names, output_dict=True)

    row = {
        "kept_features": keep_n,
        "dropped_features": drop_n,
        "best_iteration": int(getattr(xgb_pruned, "best_iteration", -1) or -1),
        "accuracy": cr["accuracy"],
        "macro_precision": cr["macro avg"]["precision"],
        "macro_recall": cr["macro avg"]["recall"],
        "macro_f1": cr["macro avg"]["f1-score"],
        "weighted_precision": cr["weighted avg"]["precision"],
        "weighted_recall": cr["weighted avg"]["recall"],
        "weighted_f1": cr["weighted avg"]["f1-score"],
    }
    for cls in class_names:
        if cls in cr:
            row[f"recall_{cls}"] = cr[cls]["recall"]
            row[f"f1_{cls}"] = cr[cls]["f1-score"]

    rows.append(row)




In [None]:
# Final tidy DataFrame for plotting
prune_results_df = pd.DataFrame(rows).sort_values("kept_features", ascending=False).reset_index(drop=True)
prune_results_df

### Plot Prunning Results

To check the affect of prunning on model performance.


In [None]:
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator

metrics_to_plot = ["accuracy", "macro_precision", "macro_recall", "macro_f1"]

plt.figure(figsize=(8,6))
for m in metrics_to_plot:
    plt.plot(
        prune_results_df["kept_features"],
        prune_results_df[m],
        label=m.replace("_", " ").title()
    )

plt.xlabel("Kept Features")
plt.ylabel("Score")
plt.title("Feature Pruning Impact on Performance")
plt.legend()

ax = plt.gca()

# Major x ticks every 100 (with labels)
ax.set_xticks(range(0, prune_results_df["kept_features"].max()+1, 100))

# Minor x ticks every 50 (grid only, no labels)
ax.xaxis.set_minor_locator(MultipleLocator(50))

# Grid: both x and y
ax.grid(True, which="major", axis="both")
ax.grid(True, which="minor", axis="x")  # vertical minor gridlines

plt.show()


## Noise Adding

Add noise to the Test Dataset to check model robustness against noisy data.

### Noisy Inference

In [None]:
import numpy as np
import pandas as pd
from sklearn.metrics import classification_report

# --- Config ---
# If you pruned features earlier, list them here so test columns match the model.
ignored_features = []  # e.g., from your pruning choice; keep [] if using all

# Columns actually used by the trained model
cols_used = list(getattr(xgb_clf, "feature_names_in_", X_test.columns))
# Ensure we drop any ignored features
cols_used = [c for c in cols_used if c not in set(ignored_features)]

# Base test matrix aligned to the model
Xte_base = X_test[cols_used].copy()

# Per-feature std (on the clean test set) for scaling noise
feat_std = Xte_base.std(axis=0, ddof=0).replace(0, 1e-12)  # guard zero-std

# Noise levels: 0%, 2%, ..., 20%
alphas = [a / 100.0 for a in range(0, 51, 2)]
rng = np.random.default_rng(123)

rows = []
for alpha in alphas:
    # Gaussian noise with sigma = alpha * std(feature)
    noise = rng.normal(loc=0.0, scale=(alpha * feat_std).to_numpy(), size=Xte_base.shape)
    Xte_noisy = Xte_base.to_numpy() + noise

    # Predict with the existing model
    y_pred = xgb_clf.predict(Xte_noisy)

    # Collect metrics (like pruning loop)
    cr = classification_report(y_test, y_pred, target_names=class_names, output_dict=True)

    row = {
        "noise_alpha": alpha,                  # 0.00 .. 0.20
        "noise_percent": int(alpha * 100),     # 0 .. 20 (for nicer x-axis later)
        "accuracy": cr["accuracy"],
        "macro_precision": cr["macro avg"]["precision"],
        "macro_recall": cr["macro avg"]["recall"],
        "macro_f1": cr["macro avg"]["f1-score"],
        "weighted_precision": cr["weighted avg"]["precision"],
        "weighted_recall": cr["weighted avg"]["recall"],
        "weighted_f1": cr["weighted avg"]["f1-score"],
    }
    for cls in class_names:
        if cls in cr:
            row[f"recall_{cls}"] = cr[cls]["recall"]
            row[f"f1_{cls}"] = cr[cls]["f1-score"]

    rows.append(row)

noise_results_df = pd.DataFrame(rows).sort_values("noise_alpha").reset_index(drop=True)
noise_results_df


### Plot Noise Performance

In [None]:
import matplotlib.pyplot as plt
from matplotlib.ticker import MultipleLocator

metrics_to_plot = ["accuracy", "macro_precision", "macro_recall", "macro_f1"]

plt.figure(figsize=(8,6))
for m in metrics_to_plot:
    plt.plot(
        noise_results_df["noise_percent"],
        noise_results_df[m],
        label=m.replace("_", " ").title()
    )

plt.xlabel("Noise Percent")
plt.ylabel("Score")
plt.title("Noise Impact on Performance")
plt.legend()

ax = plt.gca()

# Grid: both x and y
ax.grid(True, which="major", axis="both")
ax.grid(True, which="minor", axis="x")  # vertical minor gridlines

plt.show()


## Leave One Subject Out approach
Instead of infolds, lets use LOSO to check if the model still holds.

In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import label_binarize
from sklearn.metrics import (
    classification_report, confusion_matrix, ConfusionMatrixDisplay,
    roc_auc_score, roc_curve, precision_recall_curve, average_precision_score,
    accuracy_score
)
from sklearn.model_selection import GroupShuffleSplit
from xgboost import XGBClassifier


In [None]:
rng = seed
subjects_all = X.index.get_level_values("subject").unique().tolist()
C = len(class_names)

fold_rows = []
cm_sum = np.zeros((C, C), dtype=int)

for held in subjects_all:
    # Split by subject
    mask_test = (X.index.get_level_values("subject") == held)
    X_tr_full, y_tr_full = X.loc[~mask_test], y.loc[~mask_test]
    X_te, y_te = X.loc[mask_test], y.loc[mask_test]
    
    # Make a validation split from the training subjects (grouped by subject to avoid leakage)
    tr_subjects = X_tr_full.index.get_level_values("subject")
    gss = GroupShuffleSplit(test_size=0.2, n_splits=1, random_state=rng)
    tr_idx, val_idx = next(gss.split(X_tr_full, y_tr_full, groups=tr_subjects))
    X_tr, y_tr = X_tr_full.iloc[tr_idx], y_tr_full.iloc[tr_idx]
    X_val, y_val = X_tr_full.iloc[val_idx], y_tr_full.iloc[val_idx]

    # Model
    xgb_loso = train_xgb(
        X_tr, y_tr,
        X_val, y_val,
        parameters=params,
    )
    

    # Evaluate on the held-out subject
    proba = xgb_loso.predict_proba(X_te)
    pred  = proba.argmax(axis=1)

    acc = accuracy_score(y_te, pred)
    cm  = confusion_matrix(y_te, pred, labels=np.arange(C))
    cm_sum += cm

    fold_rows.append({
        "held_out_subject": held,
        "n_test": int(len(y_te)),
        "accuracy": float(acc),
    })

loso_df = pd.DataFrame(fold_rows).sort_values("held_out_subject").reset_index(drop=True)
loso_df


In [None]:
print("LOSO-CV summary over", len(loso_df), "folds")
print(loso_df[["held_out_subject","n_test","accuracy"]])

print("\nMeans ± std:")
for col in ["accuracy"]:
    print(f"{col}: {loso_df[col].mean():.3f} ± {loso_df[col].std():.3f}")

# Pooled confusion matrix (sum over folds)
disp = ConfusionMatrixDisplay(cm_sum, display_labels=class_names)
disp.plot(cmap="Purples", values_format="d")
plt.title("LOSO-CV Pooled Confusion Matrix")
plt.show()


## Persist Model

In [None]:
from pathlib import Path
import json, joblib, time

xgb_final = xgb_clf

OUT.mkdir(parents=True, exist_ok=True)

joblib.dump(xgb_final, OUT / "xgb_model.joblib")
importances.head(200).to_csv(OUT / "top_features.csv")  # quick peek

params = xgb_final.get_params()
params["best_iteration"] = int(getattr(xgb_final, "best_iteration", -1) or -1)
params["n_features"] = int(X.shape[1])

summary = {
    "model": "XGBoost",
    "params": params,
    "train_time": {},
    "val": {k: (v if k != "report" else None) for k, v in val_metrics.items()},
    "test": {k: (v if k != "report" else None) for k, v in test_metrics.items()},
    "class_names": class_names,
}

with open(OUT / "metrics.json", "w") as f:
    json.dump(summary, f, indent=2)

print("Saved model + metadata to:", OUT)
