<a href="https://colab.research.google.com/github/krishna-gera/my-aiml-learning/blob/main/day-27/day27_shap_rules_mlp.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# day27_shap_rules_mlp.py
"""
Day 27 - SHAP explainability, simple rule discovery, MLP comparison, calibration & threshold tuning.
Inputs:
  - train_processed.csv  (must contain TARGET column, e.g. 'Survived')
  - test_processed.csv   (must contain PassengerId/Id)
Outputs:
  - day27_submission.csv
  - day27_models.joblib
  - day27_report.json
"""

import os
import json
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import joblib

import sklearn
from packaging import version

from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.calibration import CalibratedClassifierCV
from sklearn.metrics import accuracy_score, roc_auc_score, classification_report, confusion_matrix, f1_score
from sklearn.inspection import permutation_importance

# Try to import shap (optional)
try:
    import shap
    SHAP_AVAILABLE = True
except Exception:
    SHAP_AVAILABLE = False

# ---------------------------
# Config / Filenames
# ---------------------------
TRAIN_FILE = "train_processed.csv"
TEST_FILE = "test_processed.csv"
TARGET = "Survived"   # change if your target name differs
ID_CANDIDATES = ["PassengerId", "Id", "ID", "passengerid"]

OUTPUT_MODELS = "day27_models.joblib"
OUTPUT_SUB = "day27_submission.csv"
OUTPUT_REPORT = "day27_report.json"

RANDOM_STATE = 42
SHAP_SAMPLE = 1000  # max samples for SHAP computations (keeps it fast)

# ---------------------------
# Helpers
# ---------------------------
def find_id_col(df):
    for c in ID_CANDIDATES:
        if c in df.columns:
            return c
    return None

def version_safe_ohe():
    if version.parse(sklearn.__version__) >= version.parse("1.2"):
        return OneHotEncoder(handle_unknown="ignore", sparse_output=False)
    else:
        return OneHotEncoder(handle_unknown="ignore", sparse=False)

def build_preprocessor(X_df):
    numeric_cols = X_df.select_dtypes(include=["int64","float64"]).columns.tolist()
    # Exclude 'bool' from categorical types passed to SimpleImputer
    categorical_cols = X_df.select_dtypes(include=["object","category"]).columns.tolist()
    num_pipe = Pipeline([
        ("imputer", SimpleImputer(strategy="median")),
        ("scale", StandardScaler())
    ])
    transformers = [("num", num_pipe, numeric_cols)]
    if categorical_cols:
        cat_pipe = Pipeline([
            ("imputer", SimpleImputer(strategy="most_frequent")),
            ("onehot", version_safe_ohe())
        ])
        transformers.append(("cat", cat_pipe, categorical_cols))
    preprocessor = ColumnTransformer(transformers=transformers, remainder="drop")
    return preprocessor, numeric_cols, categorical_cols

def get_feature_names(preprocessor, numeric_cols, categorical_cols):
    feature_names = []
    if numeric_cols:
        feature_names.extend(numeric_cols)
    if categorical_cols:
        try:
            # get the fitted onehot encoder
            ohe = preprocessor.named_transformers_["cat"].named_steps["onehot"]
            ohe_names = list(ohe.get_feature_names_out(categorical_cols))
            feature_names.extend(ohe_names)
        except Exception:
            # fallback generic names
            n_extra = preprocessor.transformers_[0][2]  # not reliable but fallback below
            # if fallback, create generic names while we know size from preprocessed shape later.
            pass
    return feature_names

def cohen_d(a, b):
    a = np.array(a, dtype=float)
    b = np.array(b, dtype=float)
    if len(a) < 2 or len(b) < 2:
        return 0.0
    na, nb = len(a), len(b)
    s_a = a.std(ddof=1)
    s_b = b.std(ddof=1)
    pooled = np.sqrt(((na - 1)*s_a**2 + (nb - 1)*s_b**2) / (na + nb - 2)) if (na+nb-2)>0 else 0.0
    if pooled == 0:
        return 0.0
    return (a.mean() - b.mean()) / pooled

# ---------------------------
# Main
# ---------------------------
def main():
    # 1) load
    if not os.path.exists(TRAIN_FILE) or not os.path.exists(TEST_FILE):
        raise FileNotFoundError("Make sure train_processed.csv and test_processed.csv are in the working directory.")
    train = pd.read_csv(TRAIN_FILE)
    test = pd.read_csv(TEST_FILE)
    id_col = find_id_col(test)

    if TARGET not in train.columns:
        raise ValueError(f"Target column '{TARGET}' not found in {TRAIN_FILE}")

    drop_cols = [TARGET]
    if id_col and id_col in train.columns:
        drop_cols.append(id_col)

    X = train.drop(columns=drop_cols, errors="ignore")
    y = train[TARGET].copy()

    test_ids = test[id_col] if id_col else pd.Series(np.arange(len(test)), name="Id")
    X_test = test.drop(columns=[id_col], errors="ignore") if id_col else test.copy()

    # 2) split train/holdout
    X_train, X_hold, y_train, y_hold = train_test_split(
        X, y, test_size=0.20, random_state=RANDOM_STATE, stratify=y
    )

    # 3) preprocessing
    preprocessor, numeric_cols, categorical_cols = build_preprocessor(X_train)
    X_train_proc = preprocessor.fit_transform(X_train)
    X_hold_proc = preprocessor.transform(X_hold)
    X_test_proc = preprocessor.transform(X_test)

    # feature names (best-effort)
    feature_names = []
    # numeric first
    feature_names.extend(numeric_cols)
    # categorical ohe names if possible
    if categorical_cols:
        try:
            ohe = preprocessor.named_transformers_["cat"].named_steps["onehot"]
            ohe_names = list(ohe.get_feature_names_out(categorical_cols))
            feature_names.extend(ohe_names)
        except Exception:
            # create generic names based on transformed shape
            total_cols = X_train_proc.shape[1]
            if len(feature_names) < total_cols:
                n_extra = total_cols - len(feature_names)
                feature_names.extend([f"cat_ohe_{i}" for i in range(n_extra)])
    else:
        # ensure names length matches proc
        total_cols = X_train_proc.shape[1]
        if len(feature_names) < total_cols:
            feature_names = [f"f_{i}" for i in range(total_cols)]

    # 4) train models (RF and small MLP)
    rf = RandomForestClassifier(n_estimators=300, random_state=RANDOM_STATE, n_jobs=-1)
    mlp = MLPClassifier(hidden_layer_sizes=(64,32), max_iter=600, random_state=RANDOM_STATE)

    rf.fit(X_train_proc, y_train)
    mlp.fit(X_train_proc, y_train)

    # holdout predictions & metrics
    rf_proba = rf.predict_proba(X_hold_proc)[:, 1]
    rf_pred = (rf_proba >= 0.5).astype(int)
    mlp_proba = mlp.predict_proba(X_hold_proc)[:, 1]
    mlp_pred = (mlp_proba >= 0.5).astype(int)

    def summarize(name, y_true, y_pred, y_proba):
        acc = accuracy_score(y_true, y_pred)
        roc = roc_auc_score(y_true, y_proba) if y_proba is not None else None
        f1 = f1_score(y_true, y_pred)
        return {"name": name, "accuracy": float(acc), "roc_auc": float(roc) if roc is not None else None, "f1": float(f1)}

    rf_stats = summarize("RandomForest", y_hold, rf_pred, rf_proba)
    mlp_stats = summarize("MLP", y_hold, mlp_pred, mlp_proba)

    # choose base for explainability & rule-search (the better ROC AUC on holdout)
    base_model = rf if rf_stats["roc_auc"] >= (mlp_stats["roc_auc"] or 0) else mlp
    base_proba = rf_proba if base_model is rf else mlp_proba
    base_pred = rf_pred if base_model is rf else mlp_pred
    chosen_name = "RandomForest" if base_model is rf else "MLP"

    # 5) Explainability: SHAP if available, else permutation importance
    shap_top_features = []
    perm_top_features = []
    try:
        if SHAP_AVAILABLE and isinstance(base_model, RandomForestClassifier):
            # compute TreeExplainer SHAP on a sample (fast)
            expl = shap.TreeExplainer(base_model)
            sample_idx = np.random.choice(X_train_proc.shape[0], min(SHAP_SAMPLE, X_train_proc.shape[0]), replace=False)
            X_shap = X_train_proc[sample_idx]
            shap_vals = expl.shap_values(X_shap)  # note: structure depends on shap version
            # normalize to mean abs values
            if isinstance(shap_vals, list) and len(shap_vals) >= 2:
                # binary classifier -> shap_vals[1] corresponds to positive class
                arr = np.abs(shap_vals[1]).mean(axis=0)
            else:
                arr = np.abs(shap_vals).mean(axis=0)
            feat_imp = sorted(zip(feature_names, arr), key=lambda x: x[1], reverse=True)
            shap_top_features = feat_imp[:20]
        else:
            raise Exception("SHAP not available or not a tree model -> fallback to permutation.")
    except Exception:
        # permutation importance fallback (on holdout)
        pi = permutation_importance(base_model, X_hold_proc, y_hold, n_repeats=10, random_state=RANDOM_STATE, n_jobs=-1)
        arr = pi.importances_mean
        feat_imp = sorted(zip(feature_names, arr), key=lambda x: x[1], reverse=True)
        perm_top_features = feat_imp[:20]

    # 6) Error analysis on holdout (find FN/FP and numeric Cohen's d)
    hold_df = X_hold.reset_index(drop=True).copy()
    hold_df[TARGET] = y_hold.reset_index(drop=True)
    hold_df["pred"] = base_pred
    hold_df["proba"] = base_proba

    tp = hold_df[(hold_df[TARGET] == 1) & (hold_df["pred"] == 1)]
    fn = hold_df[(hold_df[TARGET] == 1) & (hold_df["pred"] == 0)]
    tn = hold_df[(hold_df[TARGET] == 0) & (hold_df["pred"] == 0)]
    fp = hold_df[(hold_df[TARGET] == 0) & (hold_df["pred"] == 1)]

    numeric_candidates = numeric_cols.copy()
    # compute Cohen's d for FN vs TP, FP vs TN
    fn_diffs = []
    fp_diffs = []
    for col in numeric_candidates:
        try:
            d_fn = cohen_d(fn[col].dropna(), tp[col].dropna()) if (len(fn)>1 and len(tp)>1) else 0.0
            d_fp = cohen_d(fp[col].dropna(), tn[col].dropna()) if (len(fp)>1 and len(tn)>1) else 0.0
        except Exception:
            d_fn, d_fp = 0.0, 0.0
        fn_diffs.append((col, d_fn))
        fp_diffs.append((col, d_fp))
    fn_diffs_sorted = sorted(fn_diffs, key=lambda x: abs(x[1]), reverse=True)
    fp_diffs_sorted = sorted(fp_diffs, key=lambda x: abs(x[1]), reverse=True)

    top_fn_cols = [c for c, _ in fn_diffs_sorted[:5]]
    top_fp_cols = [c for c, _ in fp_diffs_sorted[:5]]
    top_candidates = list(dict.fromkeys(top_fn_cols + top_fp_cols))  # unique preserve order

    # 7) Candidate rule generation & evaluation (single-rule search)
    # We'll search simple threshold rules for numeric features and category rules for categorical.
    best_rule = {"type": None, "feature": None, "op": None, "threshold": None, "improvement_f1": 0.0, "new_f1": f1_score(y_hold, base_pred)}
    base_f1 = f1_score(y_hold, base_pred)

    # numeric rules: try thresholds at 25/50/75 percentiles
    percentiles = [0.25, 0.5, 0.75]
    for col in top_candidates:
        if col not in hold_df.columns:
            continue
        col_vals = X_hold[col].dropna()
        if len(col_vals) == 0:
            continue
        for p in percentiles:
            thresh = float(np.nanpercentile(X_hold[col].values, p*100))
            # two ops: if predicted==0 and val >= thresh -> flip to 1  (catch FN)
            new_preds = base_pred.copy()
            cond = (X_hold[col].values >= thresh) & (base_pred == 0)
            new_preds[cond] = 1
            new_f1 = f1_score(y_hold, new_preds)
            if new_f1 > best_rule["new_f1"]:
                best_rule = {"type":"numeric", "feature":col, "op":">=", "threshold":thresh, "improvement_f1": new_f1 - base_f1, "new_f1": new_f1}
            # other direction: if predicted==1 and val <= thresh -> flip to 0 (catch FP)
            new_preds2 = base_pred.copy()
            cond2 = (X_hold[col].values <= thresh) & (base_pred == 1)
            new_preds2[cond2] = 0
            new_f1b = f1_score(y_hold, new_preds2)
            if new_f1b > best_rule["new_f1"]:
                best_rule = {"type":"numeric", "feature":col, "op":"<=", "threshold":thresh, "improvement_f1": new_f1b - base_f1, "new_f1": new_f1b}

    # categorical rules: look for categories overrepresented in FN or FP
    if categorical_cols:
        for col in categorical_cols:
            # skip if too many categories
            vc = X_hold[col].value_counts(normalize=True)
            common_cats = vc[vc.index.notnull()].head(10).index.tolist()
            for cat in common_cats:
                # rule: when category==cat and pred==0 -> flip to 1
                new_preds = base_pred.copy()
                cond = (X_hold[col].values == cat) & (base_pred == 0)
                if cond.sum() == 0:
                    continue
                new_preds[cond] = 1
                new_f1 = f1_score(y_hold, new_preds)
                if new_f1 > best_rule["new_f1"]:
                    best_rule = {"type":"categorical", "feature":col, "value":cat, "op":"==", "improvement_f1": new_f1 - base_f1, "new_f1": new_f1}

    # Keep the best rule only if it improves F1
    selected_rule = None
    if best_rule["new_f1"] > base_f1 + 1e-6:
        selected_rule = best_rule

    # 8) Probability calibration (on training set) - test if it improves ROC AUC on holdout
    calibrated = None
    try:
        calibrator = CalibratedClassifierCV(base_model, cv=5, method='sigmoid')
        calibrator.fit(X_train_proc, y_train)
        cal_proba_hold = calibrator.predict_proba(X_hold_proc)[:, 1]
        cal_roc = roc_auc_score(y_hold, cal_proba_hold)
        raw_roc = roc_auc_score(y_hold, base_proba)
        if cal_roc >= raw_roc:
            calibrated = calibrator
            used_proba_hold = cal_proba_hold
            used_proba_test = calibrator.predict_proba(X_test_proc)[:, 1]
            calib_used = True
        else:
            used_proba_hold = base_proba
            used_proba_test = None  # compute later from base_model
            calib_used = False
    except Exception:
        used_proba_hold = base_proba
        used_proba_test = None
        calib_used = False

    # 9) Threshold tuning (maximize F1 on holdout) using used_proba_hold (calibrated if used)
    probs_for_thresh = used_proba_hold
    best_t = 0.5
    best_f1_thresh = base_f1
    for t in np.linspace(0.1, 0.9, 81):
        preds_t = (probs_for_thresh >= t).astype(int)
        # if rule selected, apply rule to these preds for evaluation
        if selected_rule is not None:
            preds_t = apply_rule_to_preds(preds_t, X_hold, selected_rule)
        cur_f1 = f1_score(y_hold, preds_t)
        if cur_f1 > best_f1_thresh:
            best_f1_thresh = cur_f1
            best_t = float(t)

    # 10) Final model choice: choose between rf/mlp/calibrated using holdout ROC/F1
    # compute chosen models' proba on holdout
    rf_hold_proba = rf.predict_proba(X_hold_proc)[:, 1]
    mlp_hold_proba = mlp.predict_proba(X_hold_proc)[:, 1]
    rf_f1 = f1_score(y_hold, (rf_hold_proba >= 0.5).astype(int))
    mlp_f1 = f1_score(y_hold, (mlp_hold_proba >= 0.5).astype(int))
    # prefer calibrated base if available
    candidates = []
    candidates.append(("rf", rf, rf_hold_proba, rf_f1))
    candidates.append(("mlp", mlp, mlp_hold_proba, mlp_f1))
    if calibrated is not None:
        cal_hold_proba = calibrator.predict_proba(X_hold_proc)[:, 1]
        cal_f1 = f1_score(y_hold, (cal_hold_proba >= 0.5).astype(int))
        candidates.append(("calibrated_base", calibrator, cal_hold_proba, cal_f1))
    # pick candidate with highest F1 on holdout (after applying selected_rule + threshold tuning)
    best_candidate = None
    best_candidate_score = -1
    for name, model_obj, proba, f1raw in candidates:
        # use best_t
        preds_candidate = (proba >= best_t).astype(int)
        if selected_rule is not None:
            preds_candidate = apply_rule_to_preds(preds_candidate, X_hold, selected_rule)
        cur_f1 = f1_score(y_hold, preds_candidate)
        if cur_f1 > best_candidate_score:
            best_candidate_score = cur_f1
            best_candidate = {"name": name, "model": model_obj, "proba_hold": proba, "f1": cur_f1}

    # 11) Final predictions on test
    if best_candidate["name"] == "rf":
        test_proba = rf.predict_proba(X_test_proc)[:, 1]
    elif best_candidate["name"] == "mlp":
        test_proba = mlp.predict_proba(X_test_proc)[:, 1]
    else:
        test_proba = calibrator.predict_proba(X_test_proc)[:, 1]

    test_preds = (test_proba >= best_t).astype(int)
    if selected_rule is not None:
        test_preds = apply_rule_to_preds(test_preds, X_test, selected_rule)

    # 12) Save submission, models, report
    submission = pd.DataFrame({ id_col if id_col else "Id": test_ids, TARGET: test_preds })
    submission.to_csv(OUTPUT_SUB, index=False)

    saved_obj = {
        "preprocessor": preprocessor,
        "rf": rf,
        "mlp": mlp,
        "calibrator": calibrator if 'calibrator' in locals() else None,
        "chosen_model": best_candidate["name"],
        "selected_rule": selected_rule,
        "threshold": best_t
    }
    joblib.dump(saved_obj, OUTPUT_MODELS)

    report = {
        "rf_stats": rf_stats,
        "mlp_stats": mlp_stats,
        "chosen_explainability": "shap" if len(shap_top_features)>0 else "permutation",
        "shap_top": shap_top_features[:10] if shap_top_features else [],
        "perm_top": perm_top_features[:10] if perm_top_features else [],
        "fn_fp_counts": {"TP": int(len(tp)), "FN": int(len(fn)), "TN": int(len(tn)), "FP": int(len(fp))},
        "top_fn_numeric": fn_diffs_sorted[:10],
        "top_fp_numeric": fp_diffs_sorted[:10],
        "selected_rule": selected_rule,
        "best_threshold": best_t,
        "best_holdout_f1_after_rule": float(best_candidate["f1"]),
        "notes": "If SHAP not available, permutation importance used. Keep an eye on selected_rule generalization to test set."
    }
    with open(OUTPUT_REPORT, "w") as f:
        json.dump(report, f, indent=2)

    print("Done. Outputs:")
    print("-", OUTPUT_SUB)
    print("-", OUTPUT_MODELS)
    print("-", OUTPUT_REPORT)


# Helper function defined after main to keep code flow simple
def apply_rule_to_preds(preds, X_df, rule):
    """Apply single selected_rule to numpy preds array and DataFrame X_df; return modified preds (numpy)."""
    preds = preds.copy()
    if rule is None:
        return preds
    if rule["type"] == "numeric":
        feat = rule["feature"]
        thr = float(rule["threshold"])
        if rule["op"] == ">=":
            cond = (X_df[feat].values >= thr) & (preds == 0)
            preds[cond] = 1
        elif rule["op"] == "<=":
            cond = (X_df[feat].values <= thr) & (preds == 1)
            preds[cond] = 0
    elif rule["type"] == "categorical":
        feat = rule["feature"]
        val = rule["value"]
        if rule["op"] == "==":
            # flip 0->1 where category==val
            cond = (X_df[feat].values == val) & (preds == 0)
            preds[cond] = 1
    return preds

if __name__ == "__main__":
    main()

Done. Outputs:
- day27_submission.csv
- day27_models.joblib
- day27_report.json
