<a href="https://colab.research.google.com/github/sankeawthong/Project-1-Lita-Chatbot/blob/main/%5B20251021%5D%20Train_all_in_one%20%E2%80%94%20Full%20revision%20with%20derived%20features%20Calibrated%20%26%20Adversarially%20Robust%20IDS%20for%20IoT/IoMT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

[20251021] Train_all_in_one — Full revision with derived features
Calibrated & Adversarially Robust IDS for IoT/IoMT: LR→MLP (Edge–Cloud)
This script retrains the full LR→MLP pipeline end-to-end for:
*   CIC_IoMT tiny-slice protocol
*   NF-ToN-IoT in-domain protocol

It reproduces all manuscript figures and exports the required results.

In [None]:
import os, json, zipfile, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import joblib
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix, auc
from sklearn.isotonic import IsotonicRegression
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

# === PATCH: robustness utilities & CIC training with attack pool ===
from pathlib import Path
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import SMOTE

In [None]:
# ====================
# CONFIGURATION
# ====================

# Input CSVs read by the training entry point.
CIC_CALIB_PATH = "/content/CIC_tiny_slice_calib (1).csv"
CIC_TEST_PATH  = "/content/CIC_tiny_slice_test (1).csv"
NF_FULL_PATH   = "/content/Dataset_NF-ToN-IoT.csv"
# Output locations: exported CSV/PNG artifacts and serialized models.
OUT_DIR  = Path("/content/paper_exports")
MODEL_DIR = Path("/content/results_models")

# Hyperparameter table.
# - lr_*     : read by build_pipe_and_fit (LogisticRegression settings)
# - mlp_*    : read by fit_mlp_on_logits (MLPClassifier settings)
# - smote_on : read by train_and_export_patched to toggle SMOTE resampling
# NOTE(review): adv_eps, pgd_steps, pgd_alpha and bins_calibration are not read
# from HP anywhere in the code visible here; the later export cells hard-code
# the same numbers, so changing them here alone has no effect on those cells.
HP = dict(
    lr_C=1.0, lr_penalty="l2", lr_max_iter=200,
    mlp_hidden=(32,), mlp_alpha=1e-4, mlp_batch_size=512,
    mlp_epochs=20, smote_on=True, adv_eps=0.10,
    pgd_steps=7, pgd_alpha=0.02, bins_calibration=15
)
# Single seed reused for sampling, splitting, SMOTE and model initialisation.
SEED = 42
np.random.seed(SEED)

In [None]:
# Extra configuration path: CSV from which train_and_export_patched samples
# attack rows (rows whose derived label is 1) to pair with the tiny-slice data.
CIC_ATTACK_POOL_PATH = "/content/CIC_IoMT_2024_WiFi_MQTT_train.csv"  # attack-only pool

def derive_binary_patched(df: pd.DataFrame) -> pd.Series:
    """
    Build the binary target (0 = benign, 1 = attack) from whichever label
    column the frame carries.

    Lookup order: ``Binary`` -> ``Class`` -> ``Label`` -> ``label``.
    A ``Binary`` column is parsed numerically (the letter 'o' is read as '0');
    values that fail to parse are back-filled from ``Class``, then ``Label``,
    then ``label`` when present, and any remaining gaps default to 1 (attack).
    ``Class`` maps '0'/'o' to benign and everything else to attack; textual
    label columns are benign when they contain the substring 'benign'.

    Raises:
        ValueError: if none of the supported label columns exists.
    """
    available = df.columns

    def _normalised(column):
        # Compare on trimmed, lower-cased text so ' O ' and 'Benign' are recognised.
        return df[column].astype(str).str.strip().str.lower()

    def _from_class():
        return (_normalised("Class").replace({"o": "0"}) != "0").astype(int)

    def _from_text(column):
        return (~_normalised(column).str.contains("benign")).astype(int)

    # 1) Explicit Binary column, with back-fill for unparsable entries
    if "Binary" in available:
        parsed = pd.to_numeric(_normalised("Binary").replace({"o": "0"}), errors="coerce")
        if parsed.isna().any():
            if "Class" in available:
                parsed = parsed.fillna(_from_class())
            for text_column in ("Label", "label"):
                if text_column in available:
                    parsed = parsed.fillna(_from_text(text_column))
        return parsed.fillna(1).astype(int)

    # 2) Class code
    if "Class" in available:
        return _from_class()

    # 3) Textual label
    for text_column in ("Label", "label"):
        if text_column in available:
            return _from_text(text_column)

    raise ValueError("Cannot derive Binary — need one of: Binary, Class, Label/label")

def prepare_Xy_numeric(df: pd.DataFrame):
    """
    Split a raw frame into a numeric feature frame and a label array.

    Labels come from ``derive_binary_patched``. Every label-like column
    (Binary/Label/label/Class/class) is removed from the features, and only
    numeric-dtype columns are kept so string columns never reach the scaler.

    Returns:
        (X, y): numeric-only DataFrame and a NumPy array of 0/1 labels.
    """
    label_like = ("Binary", "Label", "label", "Class", "class")
    targets = derive_binary_patched(df).values
    present = [name for name in label_like if name in df.columns]
    features = df.drop(columns=present, errors="ignore")
    features = features.select_dtypes(include=[np.number])
    return features, targets

def _safe_smote(X: pd.DataFrame, y: np.ndarray, random_state=42):
    """
    Oversample the minority class with SMOTE when it is feasible.

    SMOTE is skipped (inputs returned unchanged) when only one class is
    present or the smallest class has fewer than 2 samples. For small but
    usable minority classes the neighbour count is reduced to
    ``min(5, n_minority - 1)``: SMOTE's default ``k_neighbors=5`` needs at
    least 6 minority samples and raises otherwise. With 6 or more minority
    samples the behaviour is the same as the library default.

    Returns:
        (X_resampled, y_resampled), or the original ``(X, y)`` when skipped.
    """
    classes, counts = np.unique(y, return_counts=True)
    if len(classes) < 2 or counts.min() < 2:
        # single class, or too few minority samples to interpolate -> skip
        return X, y
    k_neighbors = int(min(5, counts.min() - 1))
    sampler = SMOTE(random_state=random_state, k_neighbors=k_neighbors)
    return sampler.fit_resample(X, y)

In [None]:
# ====================
# UTILITIES
# ====================
def read_df(path):
    """Load a CSV into a DataFrame, reading it in one pass (``low_memory=False``)."""
    frame = pd.read_csv(path, low_memory=False)
    return frame
def derive_binary(df):
    """
    Derive the binary target (0 = benign, 1 = attack) from a raw frame.

    Lookup order: ``Binary`` -> ``Class`` -> ``Label`` -> ``label``.
    Values are compared after trimming and lower-casing, and the letter 'o'
    is read as '0', so this helper agrees with ``derive_binary_patched`` on
    inputs such as ' O ' or a capitalised ``Label`` column (both of which the
    previous version mishandled or rejected). Unparsable ``Binary`` entries
    default to 1 (attack).

    Raises:
        ValueError: if no supported label column is present.
    """
    def _normalised(column):
        return df[column].astype(str).str.strip().str.lower()

    if "Binary" in df.columns:
        parsed = pd.to_numeric(_normalised("Binary").replace({"o": "0"}), errors="coerce")
        return parsed.fillna(1).astype(int)
    if "Class" in df.columns:
        return (_normalised("Class").replace({"o": "0"}) != "0").astype(int)
    for text_column in ("Label", "label"):
        if text_column in df.columns:
            return (~_normalised(text_column).str.contains("benign")).astype(int)
    raise ValueError("Cannot derive Binary label")

def prepare_Xy(df):
    """
    Separate labels from features without any dtype filtering.

    Labels come from ``derive_binary``; all label-like columns
    (Binary/Label/label/Class/class) are dropped from the feature frame.
    Non-numeric feature columns are kept as-is.
    """
    targets = derive_binary(df).values
    label_like = ("Binary", "Label", "label", "Class", "class")
    to_remove = [name for name in label_like if name in df.columns]
    features = df.drop(columns=to_remove, errors="ignore")
    return features, targets

def split_nf(df):
    """
    Produce the (calibration, test) split for the NF dataset.

    A stratified 80/20 train/test split is taken first; the calibration
    subset is then the stratified 10% kept from the training part.
    If ``df`` has no ``Binary`` column one is derived and written into
    ``df`` in place (the caller's frame is modified).
    """
    has_target = "Binary" in df.columns
    if not has_target:
        df["Binary"] = derive_binary(df)
    train_part, test_part = train_test_split(
        df, test_size=0.2, random_state=SEED, stratify=df["Binary"]
    )
    calib_part = train_test_split(
        train_part, test_size=0.9, random_state=SEED, stratify=train_part["Binary"]
    )[0]
    return calib_part, test_part

In [None]:
# ====================
# PIPELINES
# ====================
def build_pipe_and_fit(X, y):
    """
    Fit the preprocessing + logistic-regression pipeline.

    Every column of ``X`` is median-imputed and standardised, then passed to
    a logistic regression configured from ``HP`` (C, penalty, max_iter) with
    the lbfgs solver and the global ``SEED``.

    Returns:
        (pipe, lr): the fitted Pipeline and its LogisticRegression step.
    """
    numeric_steps = Pipeline([
        ("imp", SimpleImputer(strategy="median")),
        ("sc", StandardScaler()),
    ])
    pre = ColumnTransformer([("num", numeric_steps, X.columns)], remainder="drop")
    lr_settings = dict(
        C=HP["lr_C"],
        penalty=HP["lr_penalty"],
        solver="lbfgs",
        max_iter=HP["lr_max_iter"],
        random_state=SEED,
    )
    lr = LogisticRegression(**lr_settings)
    pipe = Pipeline([("pre", pre), ("lr", lr)])
    pipe.fit(X, y)
    return pipe, lr

def fit_mlp_on_logits(z, y):
    """
    Train the second-stage MLP on the one-dimensional LR logit.

    ``z`` is reshaped into a single-column matrix; layer sizes, L2 strength,
    batch size and iteration budget come from ``HP`` and the seed is ``SEED``.

    Returns:
        The fitted MLPClassifier.
    """
    settings = dict(
        hidden_layer_sizes=HP["mlp_hidden"],
        alpha=HP["mlp_alpha"],
        batch_size=HP["mlp_batch_size"],
        max_iter=HP["mlp_epochs"],
        random_state=SEED,
    )
    logit_column = z.reshape(-1, 1)
    mlp = MLPClassifier(**settings)
    mlp.fit(logit_column, y)
    return mlp

def temperature_fit(z, y, iters=300, lr=0.01):
    """
    Fit a scalar temperature T that minimises the binary NLL of sigmoid(z / T).

    Fixes relative to the previous version:
    - the derivative of the NLL w.r.t. T is ``-mean((s - y) * z) / T**2``;
      the old code dropped the minus sign and therefore *increased* the loss
      on every step;
    - the gradient is averaged instead of summed, so the step size no longer
      grows with the number of calibration samples;
    - the temperature with the lowest NLL seen during the descent is
      returned, so the result is never worse than the starting point T = 1.

    Args:
        z: logits (any shape; flattened).
        y: 0/1 labels aligned with ``z``.
        iters: number of gradient steps.
        lr: gradient-descent step size.

    Returns:
        The fitted temperature as a Python float (>= 1e-3). Returns 1.0 for
        empty input.
    """
    z = np.asarray(z, dtype=float).ravel()
    y = np.asarray(y, dtype=float).ravel()
    if z.size == 0:
        return 1.0

    def _nll(temp):
        # log(1 + e^u) - y*u is the numerically stable form of the binary NLL.
        u = z / temp
        return float(np.mean(np.logaddexp(0.0, u) - y * u))

    T = 1.0
    best_T, best_nll = T, _nll(T)
    for _ in range(iters):
        # Clip the scaled logit so exp() cannot overflow for tiny T.
        s = 1.0 / (1.0 + np.exp(-np.clip(z / T, -500.0, 500.0)))
        grad = -float(np.mean((s - y) * z)) / (T ** 2)
        T = max(1e-3, T - lr * grad)
        current = _nll(T)
        if current < best_nll:
            best_T, best_nll = T, current
    return float(best_T)

In [None]:
# ====================
# ADVERSARIAL
# ====================
def lr_grad(pipe, X, y):
    """
    Gradient of the logistic loss w.r.t. the *scaled* features.

    The logit is computed from the preprocessed matrix ``Xs``. The previous
    version called ``lr.decision_function`` on the raw, unscaled ``X``, so
    the sigmoid was evaluated at the wrong point (and could saturate to
    exactly 0/1, zeroing the gradient).

    Args:
        pipe: fitted pipeline with steps named "pre" and "lr".
        X: raw feature rows accepted by the "pre" step.
        y: 0/1 labels, one per row.

    Returns:
        (Xs, grad, w): scaled features, per-row gradient ``(s - y) * w`` with
        the same shape as ``Xs``, and the flattened LR weight vector.
    """
    pre = pipe.named_steps["pre"]
    lr = pipe.named_steps["lr"]
    Xs = pre.transform(X)
    w = lr.coef_.ravel()
    z = Xs @ w + lr.intercept_[0]
    s = 1 / (1 + np.exp(-z))
    grad = (s - np.asarray(y)).reshape(-1, 1) * w.reshape(1, -1)
    return Xs, grad, w

def fgsm(pipe, X, y, T, mlp, iso, eps):
    """
    One-step FGSM attack in scaled feature space, scored by all three heads.

    Each scaled row is moved by ``eps`` along the sign of the loss gradient
    from ``lr_grad``; the perturbed logit is then scored by the MLP
    (uncalibrated), temperature scaling with ``T``, and the isotonic map
    applied to the MLP output.

    Returns:
        dict with keys ``z``, ``p_uncal``, ``p_temp``, ``p_iso``.
    """
    scaled, loss_grad, _ = lr_grad(pipe, X, y)
    clf = pipe.named_steps["lr"]
    perturbed = scaled + np.sign(loss_grad) * eps
    z = perturbed @ clf.coef_.ravel() + clf.intercept_[0]
    p_uncal = mlp.predict_proba(z.reshape(-1, 1))[:, 1]
    p_temp = 1 / (1 + np.exp(-(z / T)))
    p_iso = iso.transform(p_uncal)
    return {"z": z, "p_uncal": p_uncal, "p_temp": p_temp, "p_iso": p_iso}

In [None]:
# ====================
# EXPORTS
# ====================
def dump(tag, y, p_uncal, p_temp, p_iso):
    """Write labels and the three probability variants to ``OUT_DIR/reliability_<tag>.csv``."""
    columns = {"y_true": y, "p_uncal": p_uncal, "p_temp": p_temp, "p_iso": p_iso}
    target = OUT_DIR / f"reliability_{tag}.csv"
    pd.DataFrame(columns).to_csv(target, index=False)
def plot_conf(tag, y, p, dr=0.95):
    """
    Save a confusion-matrix image at the threshold reaching detection rate ``dr``.

    The threshold is the first ROC operating point whose TPR is at least
    ``dr``; predictions are ``p >= threshold``. The 2x2 matrix
    [[tn, fp], [fn, tp]] is drawn with ``imshow`` and saved to
    ``OUT_DIR/conf_<tag>.png``.
    """
    _, tpr, cutoffs = roc_curve(y, p)
    first_hit = np.argmax(tpr >= dr)
    cutoff = cutoffs[first_hit]
    predicted = (p >= cutoff).astype(int)
    tn, fp, fn, tp = confusion_matrix(y, predicted).ravel()
    grid = np.array([[tn, fp], [fn, tp]])
    plt.imshow(grid)
    plt.title(tag)
    plt.savefig(OUT_DIR / f"conf_{tag}.png")
    plt.close()

In [None]:
# ====================
# TRAINING
# ====================
def train_and_export_patched():
    """
    Train, calibrate, evaluate and export both LR->MLP pipelines.

    CIC part: benign rows of the calibration CSV are combined with an equal
    number (at most) of attack rows sampled from the attack-pool CSV to form
    the training set; the pipeline is fitted, calibrators are fitted on a
    benign + attack-subset mix, and the test CSV is scored.

    NF part: the full NF CSV is split 80/20 (stratified), a stratified 10% of
    the training part is used for fitting and calibration, and the 20% test
    part is scored.

    Files written:
        MODEL_DIR: CIC_tiny_slice_pipe.joblib, CIC_tiny_slice_mlp.joblib,
                   NF_in_domain_pipe.joblib, NF_in_domain_mlp.joblib
        OUT_DIR:   reliability_cic_clean.csv, confusion_cic_clean.png,
                   reliability_nf_clean.csv

    Raises:
        FileNotFoundError: if the attack-pool CSV does not exist.
        ValueError: if the calibration CSV yields no benign rows or the
            attack pool yields no attack rows after label mapping.

    NOTE(review): the fitted temperatures and isotonic calibrators are not
    saved; only the pipelines and MLPs are persisted.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    MODEL_DIR.mkdir(parents=True, exist_ok=True)

    # ----- CIC IoMT tiny-slice -----
    print("[CIC] Loading tiny-slice (benign-only) for calibration & an attack pool for training")
    cic_cal = pd.read_csv(CIC_CALIB_PATH, low_memory=False)
    cic_tst = pd.read_csv(CIC_TEST_PATH, low_memory=False)
    if not Path(CIC_ATTACK_POOL_PATH).exists():
        raise FileNotFoundError(f"CIC attack pool not found at {CIC_ATTACK_POOL_PATH}")

    cic_attack_pool = pd.read_csv(CIC_ATTACK_POOL_PATH, low_memory=False)

    # Build labels
    # (y_test is computed here but not used below; test labels are re-derived
    # by prepare_Xy_numeric.)
    y_cal = derive_binary_patched(cic_cal).values
    y_test = derive_binary_patched(cic_tst).values
    y_pool = derive_binary_patched(cic_attack_pool).values

    # Split into benign/attack pools
    cal_ben = cic_cal[y_cal == 0]
    pool_att = cic_attack_pool[y_pool == 1]

    if len(cal_ben) == 0:
        raise ValueError("CIC_tiny_slice_calib.csv does not contain benign rows after mapping.")

    if len(pool_att) == 0:
        raise ValueError("CIC attack pool has no attack rows after mapping. Provide a valid attack CSV.")

    # Sample a modest number of attacks to pair with benign tiny-slice
    # Strategy: equal number to benign (cap to avoid heavy training)
    n_ben = len(cal_ben)
    n_att = min(len(pool_att), n_ben)  # balance
    att_sample = pool_att.sample(n=n_att, random_state=SEED)

    # Build CIC train set: benign (from tiny-slice) + attacks (from pool)
    cic_train_df = pd.concat([cal_ben, att_sample], axis=0, ignore_index=True)
    X_cic_tr, y_cic_tr = prepare_Xy_numeric(cic_train_df)
    X_cic_te, y_cic_te = prepare_Xy_numeric(cic_tst)

    print(f"[CIC] Train shapes: X={X_cic_tr.shape}, y counts={np.unique(y_cic_tr, return_counts=True)}")
    print(f"[CIC] Test  shapes: X={X_cic_te.shape}, y counts={np.unique(y_cic_te, return_counts=True)}")

    # Preproc + LR
    # SMOTE (when enabled) only affects the data the LR pipeline is fitted on.
    X_bal, y_bal = (_safe_smote(X_cic_tr, y_cic_tr, random_state=SEED) if HP.get("smote_on", True) else (X_cic_tr, y_cic_tr))
    cic_pipe, cic_lr = build_pipe_and_fit(X_bal, y_bal)

    # MLP on logits
    # The MLP is fitted on logits of the original (non-resampled) training rows.
    z_tr = cic_pipe.decision_function(X_cic_tr).ravel()
    cic_mlp = fit_mlp_on_logits(z_tr, y_cic_tr)

    # Calibrators: use a small calibration mix (benign from tiny-slice + a small attack subset)
    # NOTE(review): both parts of this mix are drawn from rows already used for
    # training above, so calibration is not fitted on held-out data.
    n_cal_att = min(len(att_sample), max(50, len(cal_ben)//2))
    cal_mix_df = pd.concat([cal_ben, att_sample.sample(n=n_cal_att, random_state=SEED)], axis=0, ignore_index=True)
    X_cal_mix, y_cal_mix = prepare_Xy_numeric(cal_mix_df)

    z_cal = cic_pipe.decision_function(X_cal_mix).ravel()
    T_cic = temperature_fit(z_cal, y_cal_mix)
    p_uncal_cal = cic_mlp.predict_proba(z_cal.reshape(-1,1))[:,1]
    iso_cic = IsotonicRegression(out_of_bounds='clip').fit(p_uncal_cal, y_cal_mix)

    # Save deployables
    joblib.dump(cic_pipe, MODEL_DIR/"CIC_tiny_slice_pipe.joblib")
    joblib.dump(cic_mlp,  MODEL_DIR/"CIC_tiny_slice_mlp.joblib")

    # Clean eval & exports
    # Three probability variants per test row: raw MLP output, temperature-scaled
    # LR logit, and the isotonic map applied to the MLP output.
    z_te = cic_pipe.decision_function(X_cic_te).ravel()
    p_uncal = cic_mlp.predict_proba(z_te.reshape(-1,1))[:,1]
    p_temp  = 1/(1+np.exp(-(z_te/T_cic)))
    p_iso   = iso_cic.transform(p_uncal)

    pd.DataFrame({"y_true":y_cic_te,"p_uncal":p_uncal,"p_temp":p_temp,"p_iso":p_iso}).to_csv(OUT_DIR/"reliability_cic_clean.csv", index=False)

    def _plot_conf(tag, y, p, dr=0.95):
        """Save an annotated 2x2 confusion matrix at the first ROC point with TPR >= dr."""
        fpr,tpr,thr = roc_curve(y,p); thr = thr[np.argmax(tpr>=dr)]
        yhat=(p>=thr).astype(int); tn,fp,fn,tp=confusion_matrix(y,yhat).ravel()
        mat=np.array([[tn,fp],[fn,tp]])
        plt.figure(figsize=(3.4,3.0)); plt.imshow(mat)
        for i in range(2):
            for j in range(2):
                plt.text(j,i,str(mat[i,j]),ha="center",va="center",color="white" if mat[i,j]>0 else "black")
        # NOTE(review): the title always says 0.95, even if a different dr is passed.
        plt.title(f"Confusion @DR=0.95: {tag}"); plt.tight_layout()
        plt.savefig(OUT_DIR/f"confusion_{tag}.png", dpi=200); plt.close()

    _plot_conf("cic_clean", y_cic_te, p_temp)

    # ----- NF-ToN-IoT in-domain (unchanged, but ensure numeric-only features) -----
    df_nf = pd.read_csv(NF_FULL_PATH, low_memory=False)
    # derive binary
    if "Binary" not in df_nf.columns:
        df_nf["Binary"] = derive_binary_patched(df_nf)

    # 80/20 stratified split, then keep a stratified 10% of the training part.
    nf_tr, nf_tst = train_test_split(df_nf, test_size=0.2, random_state=SEED, stratify=df_nf["Binary"])
    nf_cal, _ = train_test_split(nf_tr, test_size=0.9, random_state=SEED, stratify=nf_tr["Binary"])

    # The 10% subset (nf_cal) serves as the NF training data here.
    X_nf_tr, y_nf_tr = prepare_Xy_numeric(nf_cal)
    X_nf_te, y_nf_te = prepare_Xy_numeric(nf_tst)

    X_nf_bal, y_nf_bal = (_safe_smote(X_nf_tr, y_nf_tr, random_state=SEED) if HP.get("smote_on", True) else (X_nf_tr, y_nf_tr))
    nf_pipe, nf_lr = build_pipe_and_fit(X_nf_bal, y_nf_bal)

    z_nf_tr = nf_pipe.decision_function(X_nf_tr).ravel()
    nf_mlp = fit_mlp_on_logits(z_nf_tr, y_nf_tr)

    # NOTE(review): temperature and isotonic are fitted on the same rows the
    # models were trained on.
    T_nf = temperature_fit(z_nf_tr, y_nf_tr)
    p_uncal_nf_tr = nf_mlp.predict_proba(z_nf_tr.reshape(-1,1))[:,1]
    iso_nf = IsotonicRegression(out_of_bounds='clip').fit(p_uncal_nf_tr, y_nf_tr)

    joblib.dump(nf_pipe, MODEL_DIR/"NF_in_domain_pipe.joblib")
    joblib.dump(nf_mlp,  MODEL_DIR/"NF_in_domain_mlp.joblib")

    z_nf_te = nf_pipe.decision_function(X_nf_te).ravel()
    p_uncal_nf = nf_mlp.predict_proba(z_nf_te.reshape(-1,1))[:,1]
    p_temp_nf  = 1/(1+np.exp(-(z_nf_te/T_nf)))
    p_iso_nf   = iso_nf.transform(p_uncal_nf)

    pd.DataFrame({"y_true":y_nf_te,"p_uncal":p_uncal_nf,"p_temp":p_temp_nf,"p_iso":p_iso_nf}).to_csv(OUT_DIR/"reliability_nf_clean.csv", index=False)

    print("[DONE] Patched training finished. Artifacts in:", OUT_DIR)

# === Run the patched entrypoint instead of original ===
# Executed when the cell runs: performs the full training and export.
train_and_export_patched()

In [None]:
# === Post-training exports (add-on) ===
import numpy as np, pandas as pd, joblib, json
from pathlib import Path
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from sklearn.calibration import CalibrationDisplay
from sklearn.metrics import roc_curve, precision_recall_curve, confusion_matrix
from sklearn.isotonic import IsotonicRegression
from sklearn.neural_network import MLPClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline

# Inputs (same as training)
# Inputs (same literal paths as the training configuration cell)
CIC_CALIB_PATH = "/content/CIC_tiny_slice_calib (1).csv"
CIC_TEST_PATH  = "/content/CIC_tiny_slice_test (1).csv"
NF_FULL_PATH   = "/content/Dataset_NF-ToN-IoT.csv"

# Models
# Reload the four artifacts written by the training cell.
# NOTE: joblib.load unpickles arbitrary objects; only load files produced by
# this notebook (never artifacts from an untrusted source).
MODEL_DIR = Path("/content/results_models")
cic_pipe   = joblib.load(MODEL_DIR/"CIC_tiny_slice_pipe.joblib")
cic_mlp    = joblib.load(MODEL_DIR/"CIC_tiny_slice_mlp.joblib")
nf_pipe    = joblib.load(MODEL_DIR/"NF_in_domain_pipe.joblib")
nf_mlp     = joblib.load(MODEL_DIR/"NF_in_domain_mlp.joblib")

# Export directory for figures and CSVs (created if missing).
OUT_DIR = Path("/content/paper_exports"); OUT_DIR.mkdir(parents=True, exist_ok=True)

# Utils (match training’s label handling & numeric-only features)
def derive_binary(df: pd.DataFrame) -> pd.Series:
    """
    Derive the binary target (0 = benign, 1 = attack), matching the training
    cell's label handling.

    Lookup order: ``Binary`` -> ``Class`` -> ``Label`` -> ``label``.
    ``Binary`` is parsed numerically ('o' is read as '0'); entries that fail
    to parse are back-filled from ``Class``, ``Label`` and ``label`` in that
    order, and anything still missing defaults to 1 (attack).

    Compared with the previous version, the no-``Binary`` path now also
    accepts a capitalised ``Label`` column and normalises ``Class`` values
    (trim + lower-case), so a frame that the training helper can label is no
    longer rejected or labelled differently here.

    Raises:
        ValueError: if no supported label column is present.
    """
    available = df.columns

    def _normalised(column):
        return df[column].astype(str).str.strip().str.lower()

    def _from_class():
        return (_normalised("Class").replace({"o": "0"}) != "0").astype(int)

    def _from_text(column):
        return (~_normalised(column).str.contains("benign")).astype(int)

    if "Binary" in available:
        y = pd.to_numeric(_normalised("Binary").replace({"o": "0"}), errors="coerce")
        # Back-fill unparsable entries from the other label columns.
        if y.isna().any() and "Class" in available:
            y = y.fillna(_from_class())
        for text_column in ("Label", "label"):
            if y.isna().any() and text_column in available:
                y = y.fillna(_from_text(text_column))
        return y.fillna(1).astype(int)
    if "Class" in available:
        return _from_class()
    for text_column in ("Label", "label"):
        if text_column in available:
            return _from_text(text_column)
    raise ValueError("Cannot derive Binary label")

def prepare_Xy_numeric(df):
    """
    Return ``(X, y)`` where ``y`` is the derived 0/1 label array and ``X``
    holds only the numeric columns left after removing every label-like
    column (Binary/Label/label/Class/class).
    """
    targets = derive_binary(df).values
    label_like = ("Binary", "Label", "label", "Class", "class")
    to_remove = [name for name in label_like if name in df.columns]
    features = df.drop(columns=to_remove, errors="ignore")
    numeric_only = features.select_dtypes(include=[np.number])
    return numeric_only, targets

# Re-create calibration objects (T and isotonic) from calibration sets
def fit_calibrators(pipe, mlp, cal_df):
    """
    Re-create the temperature and isotonic calibrators from a calibration frame.

    Temperature: 300 gradient-descent steps (step size 0.01) on the mean
    binary NLL of ``sigmoid(z / T)``, starting from T = 1. The derivative
    w.r.t. T is ``-mean((s - y) * z) / T**2``; the previous code omitted the
    minus sign (so it climbed the loss) and summed instead of averaging (so
    the step grew with the sample count). The temperature with the lowest NLL
    seen is kept, so the result is never worse than T = 1.

    Isotonic: fitted on the MLP's positive-class probability of the same
    logits, clipped outside the fitted range.

    Args:
        pipe: fitted pipeline exposing ``decision_function``.
        mlp: fitted classifier exposing ``predict_proba`` on a logit column.
        cal_df: raw calibration frame (labels are derived from it).

    Returns:
        (T, iso): fitted temperature (float) and IsotonicRegression.
    """
    Xc, yc = prepare_Xy_numeric(cal_df)
    z = pipe.decision_function(Xc).ravel()
    targets = np.asarray(yc, dtype=float).ravel()

    def _nll(temp):
        # Stable binary NLL: log(1 + e^u) - y*u
        u = z / temp
        return float(np.mean(np.logaddexp(0.0, u) - targets * u))

    # temperature
    T = 1.0
    if z.size:
        current_T, best_nll = T, _nll(T)
        for _ in range(300):
            s = 1.0 / (1.0 + np.exp(-np.clip(z / current_T, -500.0, 500.0)))
            grad = -float(np.mean((s - targets) * z)) / (current_T ** 2)
            current_T = max(1e-3, current_T - 0.01 * grad)
            candidate = _nll(current_T)
            if candidate < best_nll:
                T, best_nll = float(current_T), candidate

    # isotonic on uncal MLP probs
    p_uncal = mlp.predict_proba(z.reshape(-1, 1))[:, 1]
    iso = IsotonicRegression(out_of_bounds="clip").fit(p_uncal, yc)
    return T, iso

def predict_variants(pipe, mlp, X_df, T, iso):
    """
    Score rows with every probability head.

    Returns:
        (z, p_uncal, p_temp, p_iso): the pipeline logit, the MLP
        positive-class probability of that logit, ``sigmoid(z / T)``, and the
        isotonic transform of the MLP probability.
    """
    logits = pipe.decision_function(X_df).ravel()
    mlp_positive = mlp.predict_proba(logits.reshape(-1, 1))[:, 1]
    tempered = 1 / (1 + np.exp(-(logits / T)))
    isotonic = iso.transform(mlp_positive)
    return logits, mlp_positive, tempered, isotonic

def thr_at_dr(y, scores, dr=0.95):
    """
    Return the score threshold of the first ROC point whose TPR is >= ``dr``.

    If no ROC point reaches ``dr`` the last (lowest) threshold is returned.
    The previous guard ``idx < len(thr)`` was always true because ``argmax``
    always yields a valid index, so that fallback could never trigger and the
    first threshold was returned instead.
    """
    _, tpr, thr = roc_curve(y, scores)
    reached = tpr >= dr
    if not reached.any():
        return thr[-1]
    return thr[int(np.argmax(reached))]

def plot_conf(tag, y, scores, dr=0.95):
    """
    Save an annotated confusion matrix at the threshold for detection rate ``dr``.

    Predictions are ``scores >= thr_at_dr(y, scores, dr)``. The matrix
    [[tn, fp], [fn, tp]] is written to ``OUT_DIR/confusion_<tag>.png``.

    Changes from the previous version:
    - the title reports the ``dr`` actually used (it was hard-coded to 0.95;
      output is unchanged for the default);
    - ``labels=[0, 1]`` keeps the matrix 2x2 even when only one class occurs,
      which previously broke the four-way unpacking.
    """
    thr = thr_at_dr(y, scores, dr)
    yhat = (scores >= thr).astype(int)
    tn, fp, fn, tp = confusion_matrix(y, yhat, labels=[0, 1]).ravel()
    mat = np.array([[tn, fp], [fn, tp]])

    fig = plt.figure(figsize=(3.4, 3.0))
    plt.imshow(mat)
    for i in range(2):
        for j in range(2):
            plt.text(j, i, str(mat[i, j]), ha="center", va="center",
                     color="white" if mat[i, j] > 0 else "black")
    plt.xticks([0, 1], ["Pred 0", "Pred 1"])
    plt.yticks([0, 1], ["True 0", "True 1"])
    plt.title(f"Confusion @DR={dr:g}: {tag}")
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"confusion_{tag}.png", dpi=200)
    plt.close(fig)

def plot_reliability_png(tag, y, p_uncal, p_temp, p_iso, n_bins=15):
    """
    Save one reliability diagram overlaying the three probability variants.

    All three curves are drawn on a single shared axes and written to
    ``OUT_DIR/reliability_<tag>.png``. Previously each
    ``CalibrationDisplay.from_predictions`` call was made without ``ax=``,
    which creates a fresh figure per call: the saved image held only the last
    curve and the other figures were never closed.

    Args:
        tag: suffix for the output file name and the title.
        y: true 0/1 labels.
        p_uncal, p_temp, p_iso: probability arrays to compare.
        n_bins: number of uniform-width bins.
    """
    fig, ax = plt.subplots(figsize=(4.8, 3.6))
    variants = (("Uncal", p_uncal), ("Temp", p_temp), ("Isotonic", p_iso))
    for name, probs in variants:
        CalibrationDisplay.from_predictions(
            y, probs, n_bins=n_bins, name=name, strategy="uniform", ax=ax
        )
    ax.set_title(f"Reliability: {tag}")
    fig.tight_layout()
    fig.savefig(OUT_DIR / f"reliability_{tag}.png", dpi=200)
    plt.close(fig)

def plot_roc_pr_overlays(tag, y_clean, p_clean, y_fgsm=None, p_fgsm=None, y_pgd=None, p_pgd=None):
    """
    Save ROC and precision-recall overlay figures for clean/FGSM/PGD scores.

    The clean curve is always drawn; FGSM and PGD curves are added only when
    their label arrays are provided. Outputs:
    ``OUT_DIR/roc_overlay_<tag>.png`` and ``OUT_DIR/pr_overlay_<tag>.png``.
    """
    series = [("Clean", y_clean, p_clean)]
    if y_fgsm is not None:
        series.append(("FGSM", y_fgsm, p_fgsm))
    if y_pgd is not None:
        series.append(("PGD", y_pgd, p_pgd))

    # ROC
    plt.figure(figsize=(4.8, 3.6))
    for label, truth, prob in series:
        fpr, tpr, _ = roc_curve(truth, prob)
        plt.plot(fpr, tpr, label=label)
    plt.plot([0, 1], [0, 1], '--')
    plt.xlabel("FPR")
    plt.ylabel("TPR")
    plt.legend()
    plt.title(f"ROC: {tag}")
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"roc_overlay_{tag}.png", dpi=200)
    plt.close()

    # PR
    plt.figure(figsize=(4.8, 3.6))
    for label, truth, prob in series:
        rec, pre, _ = precision_recall_curve(truth, prob)
        plt.plot(rec, pre, label=label)
    plt.xlabel("Recall")
    plt.ylabel("Precision")
    plt.legend()
    plt.title(f"PR: {tag}")
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"pr_overlay_{tag}.png", dpi=200)
    plt.close()

def lr_top_pm10(tag, pipe):
    """
    Export the 10 most negative and 10 most positive LR coefficients.

    Writes ``OUT_DIR/lr_top_pm10_<tag>.csv`` (feature, coef) and a bar chart
    ``OUT_DIR/lr_top_pm10_<tag>.png``. Feature names are taken from the
    preprocessor when available, otherwise ``f<i>`` placeholders are used.

    When the model has 20 or fewer coefficients all of them are exported
    once, in ascending order; previously the bottom-10 and top-10 slices
    overlapped and the same features appeared twice.
    """
    lr = pipe.named_steps["lr"]
    coefs = lr.coef_.ravel()
    # Try to get feature names after preprocessor
    try:
        feats = pipe.named_steps["pre"].get_feature_names_out()
        feats = [f.split("__", 1)[-1] for f in feats]
    except Exception:
        # Deliberate best-effort: any failure to resolve names falls back to
        # positional placeholders rather than aborting the export.
        feats = [f"f{i}" for i in range(len(coefs))]

    order = np.argsort(coefs)
    if len(order) <= 20:
        idxs = list(order)
    else:
        idxs = list(order[:10]) + list(order[-10:])
    names = [feats[i] if i < len(feats) else f"f{i}" for i in idxs]
    vals = coefs[idxs]

    pd.DataFrame({"feature": names, "coef": vals}).to_csv(OUT_DIR / f"lr_top_pm10_{tag}.csv", index=False)
    plt.figure(figsize=(6.2, 3.6))
    plt.bar(range(len(vals)), vals)
    plt.xticks(range(len(vals)), names, rotation=60, ha="right", fontsize=8)
    plt.title(f"LR top ±10: {tag}")
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"lr_top_pm10_{tag}.png", dpi=200)
    plt.close()

def threshold_stability(tag, y, p_uncal, z, T):
    """
    Plot the FPR incurred at each target detection rate, for two score types.

    For 20 target detection rates between 0.80 and 0.99 the FPR of the first
    ROC point reaching the target is recorded (or the last ROC point's FPR if
    the target is never reached), once for the uncalibrated probabilities and
    once for ``sigmoid(z / T)``. The figure is saved to
    ``OUT_DIR/threshold_stability_<tag>.png``.

    The ROC curves and the temperature-scaled probabilities do not depend on
    the target rate, so they are now computed once instead of on every loop
    iteration; the unused threshold locals were removed. The plotted values
    are unchanged.
    """
    targets = np.linspace(0.80, 0.99, 20)
    p_temp = 1 / (1 + np.exp(-(z / T)))

    def _fpr_at_targets(scores):
        fpr, tpr, _ = roc_curve(y, scores)
        values = []
        for dr in targets:
            reached = tpr >= dr
            values.append(fpr[np.argmax(reached)] if reached.any() else fpr[-1])
        return values

    fpr_u = _fpr_at_targets(p_uncal)
    fpr_t = _fpr_at_targets(p_temp)

    plt.figure(figsize=(4.8, 3.6))
    plt.plot(targets, fpr_u, label="Uncalibrated")
    plt.plot(targets, fpr_t, label="Temperature")
    plt.xlabel("Chosen DR")
    plt.ylabel("FPR at DR")
    plt.title(f"Threshold stability: {tag}")
    plt.legend()
    plt.tight_layout()
    plt.savefig(OUT_DIR / f"threshold_stability_{tag}.png", dpi=200)
    plt.close()

# Adversarial in scaled feature space (same as training logic)
def fgsm_scores(pipe, X, y, T, mlp, iso, eps=0.10):
    """
    One-step FGSM attack in scaled feature space, scored by all three heads.

    The clean logit used for the gradient is computed from the *scaled*
    features. The previous version passed the raw ``X`` to
    ``lr.decision_function``, so the sigmoid was evaluated on unscaled inputs
    and could saturate to exactly 0/1, yielding a zero gradient sign and
    therefore no perturbation.

    Args:
        pipe: fitted pipeline with steps named "pre" and "lr".
        X: raw feature rows accepted by the "pre" step.
        y: 0/1 labels, one per row.
        T: temperature for the temperature-scaled probability.
        mlp: classifier exposing ``predict_proba`` on a logit column.
        iso: calibrator exposing ``transform``.
        eps: L-infinity step size in scaled feature units.

    Returns:
        (z_adv, p_uncal, p_temp, p_iso) for the perturbed rows.
    """
    pre = pipe.named_steps["pre"]
    lr = pipe.named_steps["lr"]
    w = lr.coef_.ravel()
    b = lr.intercept_.ravel()[0]

    Xs = pre.transform(X)
    z = Xs @ w + b
    s = 1 / (1 + np.exp(-z))
    grad = (s - np.asarray(y)).reshape(-1, 1) * w.reshape(1, -1)

    Xs_adv = Xs + np.sign(grad) * eps
    z_adv = Xs_adv @ w + b
    p_uncal = mlp.predict_proba(z_adv.reshape(-1, 1))[:, 1]
    p_temp = 1 / (1 + np.exp(-(z_adv / T)))
    p_iso = iso.transform(p_uncal)
    return z_adv, p_uncal, p_temp, p_iso

def pgd_scores(pipe, X, y, T, mlp, iso, eps=0.10, alpha=0.02, steps=7):
    """
    Iterative (PGD-style) attack in scaled feature space, scored by all heads.

    Starting from the scaled rows, ``steps`` signed-gradient moves of size
    ``alpha`` are taken; after each move the rows are clipped back into the
    ``eps`` box around the original scaled rows.

    Returns:
        (z_adv, p_uncal, p_temp, p_iso) for the perturbed rows.
    """
    clf = pipe.named_steps["lr"]
    weights = clf.coef_.ravel()
    bias = clf.intercept_.ravel()[0]

    origin = pipe.named_steps["pre"].transform(X)
    lower, upper = origin - eps, origin + eps
    current = origin.copy()
    for _step in range(steps):
        logits = current @ weights + bias
        prob = 1 / (1 + np.exp(-logits))
        direction = np.sign((prob - y).reshape(-1, 1) * weights.reshape(1, -1))
        current = current + direction * alpha
        current = np.clip(current, lower, upper)

    z_adv = current @ weights + bias
    p_uncal = mlp.predict_proba(z_adv.reshape(-1, 1))[:, 1]
    p_temp = 1 / (1 + np.exp(-(z_adv / T)))
    p_iso = iso.transform(p_uncal)
    return z_adv, p_uncal, p_temp, p_iso

# 1) CIC calibrators
# Driver for the add-on exports: rebuilds calibrators, scores clean and
# adversarial inputs for both datasets, and writes figures/CSVs to OUT_DIR.
cic_cal = pd.read_csv(CIC_CALIB_PATH, low_memory=False)
cic_tst = pd.read_csv(CIC_TEST_PATH,  low_memory=False)
Xc_te, yc_te = prepare_Xy_numeric(cic_tst)
# NOTE(review): calibrators are refitted here on cic_cal alone, whereas the
# training cell fitted them on a benign + sampled-attack mix. If cic_cal is
# benign-only (as the training cell's log message states), both calibrators
# are fitted on a single class — confirm this is intended.
T_cic, iso_cic = fit_calibrators(cic_pipe, cic_mlp, cic_cal)

# 2) NF calibrators (build from split like in training)
df_nf = pd.read_csv(NF_FULL_PATH, low_memory=False)
if "Binary" not in df_nf.columns:
    # derive
    # NOTE(review): this inline derivation checks Class/Label only and does not
    # go through derive_binary (which also knows "label").
    if "Class" in df_nf.columns:
        df_nf["Binary"] = (df_nf["Class"].astype(str).str.replace("o","0")!="0").astype(int)
    elif "Label" in df_nf.columns:
        df_nf["Binary"] = (~df_nf["Label"].astype(str).str.lower().str.contains("benign")).astype(int)
    else:
        raise ValueError("NF needs Binary/Class/Label")
from sklearn.model_selection import train_test_split
# Same split parameters as training (80/20, then 10% of the training part, seed 42).
nf_tr, nf_tst = train_test_split(df_nf, test_size=0.2, random_state=42, stratify=df_nf["Binary"])
nf_cal, _ = train_test_split(nf_tr, test_size=0.9, random_state=42, stratify=nf_tr["Binary"])
Xn_te, yn_te = prepare_Xy_numeric(nf_tst)
T_nf, iso_nf = fit_calibrators(nf_pipe, nf_mlp, nf_cal)

# 3) CIC clean + FGSM/PGD exports
# Overwrites reliability_cic_clean.csv written by the training cell.
zc, p_u, p_t, p_i = predict_variants(cic_pipe, cic_mlp, Xc_te, T_cic, iso_cic)
pd.DataFrame({"y_true":yc_te,"p_uncal":p_u,"p_temp":p_t,"p_iso":p_i}).to_csv(OUT_DIR/"reliability_cic_clean.csv", index=False)
plot_reliability_png("cic_clean", yc_te, p_u, p_t, p_i, n_bins=15)
plot_conf("cic_clean", yc_te, p_t)

# Adversarial variants; the overlays and confusion plot use the temperature-scaled scores.
zc_f, p_uf, p_tf, p_if = fgsm_scores(cic_pipe, Xc_te, yc_te, T_cic, cic_mlp, iso_cic, eps=0.10)
zc_p, p_up, p_tp, p_ip = pgd_scores(cic_pipe, Xc_te, yc_te, T_cic, cic_mlp, iso_cic, eps=0.10, alpha=0.02, steps=7)
plot_roc_pr_overlays("cic_tinyslice", yc_te, p_t, yc_te, p_tf, yc_te, p_tp)
plot_conf("cic_pgd010", yc_te, p_tp)
# also dump ROC/PR CSVs
fpr,tpr,_ = roc_curve(yc_te, p_t); pd.DataFrame({"fpr":fpr,"tpr":tpr}).to_csv(OUT_DIR/"roc_cic_clean.csv", index=False)
rec,pre,_ = precision_recall_curve(yc_te, p_t); pd.DataFrame({"recall":rec,"precision":pre}).to_csv(OUT_DIR/"pr_cic_clean.csv", index=False)

# 4) NF clean + FGSM/PGD exports
# Overwrites reliability_nf_clean.csv written by the training cell.
zn, p_un, p_tn, p_in = predict_variants(nf_pipe, nf_mlp, Xn_te, T_nf, iso_nf)
pd.DataFrame({"y_true":yn_te,"p_uncal":p_un,"p_temp":p_tn,"p_iso":p_in}).to_csv(OUT_DIR/"reliability_nf_clean.csv", index=False)
plot_reliability_png("nf_clean", yn_te, p_un, p_tn, p_in, n_bins=15)
plot_conf("nf_clean", yn_te, p_tn)

zn_f, p_unf, p_tnf, p_inf = fgsm_scores(nf_pipe, Xn_te, yn_te, T_nf, nf_mlp, iso_nf, eps=0.10)
zn_p, p_unp, p_tnp, p_inp = pgd_scores(nf_pipe, Xn_te, yn_te, T_nf, nf_mlp, iso_nf, eps=0.10, alpha=0.02, steps=7)
plot_roc_pr_overlays("nf_in_domain", yn_te, p_tn, yn_te, p_tnf, yn_te, p_tnp)
plot_conf("nf_pgd010", yn_te, p_tnp)
fpr,tpr,_ = roc_curve(yn_te, p_tn); pd.DataFrame({"fpr":fpr,"tpr":tpr}).to_csv(OUT_DIR/"roc_nf_clean.csv", index=False)
rec,pre,_ = precision_recall_curve(yn_te, p_tn); pd.DataFrame({"recall":rec,"precision":pre}).to_csv(OUT_DIR/"pr_nf_clean.csv", index=False)

# 5) LR coefficients & threshold stability
lr_top_pm10("cic_tinyslice", cic_pipe)
threshold_stability("cic_tinyslice", yc_te, p_u, zc, T_cic)
lr_top_pm10("nf_in_domain", nf_pipe)
threshold_stability("nf_in_domain", yn_te, p_un, zn, T_nf)

# 6) Optional manifest + hyperparams sketch (best-effort)
# The hp values below are literals that mirror the arguments hard-coded above.
hp = {"adv_eps":0.10,"pgd_steps":7,"pgd_alpha":0.02,"bins":15}
with open(OUT_DIR/"manifest.json","w") as f: json.dump({"models":[str(p) for p in MODEL_DIR.glob("*.joblib")],
                                                        "fig_dir":str(OUT_DIR), "hp":hp}, f, indent=2)
print("[OK] All add-on exports written to:", OUT_DIR)

In [None]:
import json, pandas as pd, numpy as np
from pathlib import Path
# Export directory (created if missing).
OUT = Path("/content/paper_exports"); OUT.mkdir(parents=True, exist_ok=True)

# 1) Hyperparams ledger
# NOTE(review): these values are typed in by hand rather than read from the HP
# dict of the configuration cell, so they must be kept in sync manually.
# "mlp_hidden" is stored as the string "(32,)" rather than a tuple.
hp = {
  "lr_C": 1.0, "lr_penalty": "l2", "lr_max_iter": 200,
  "mlp_hidden": "(32,)", "mlp_alpha": 1e-4, "mlp_batch": 512, "mlp_epochs": 20,
  "SMOTE": True, "adv_eps": 0.10, "pgd_steps": 7, "pgd_alpha": 0.02, "seed": 42
}
# One-row CSV with one column per hyperparameter.
pd.DataFrame([hp]).to_csv(OUT/"hyperparams.csv", index=False)

# 2) Save adversarial ROC/PR CSVs, reusing the reliability + overlays if present
# If you have p_temp for FGSM/PGD in memory, write them; otherwise skip silently.
# (If you want, I can give you a small cell to recompute p_temp_FGSM/PGD and dump here.)
# Nothing is written for step 2 in this cell; only hyperparams.csv is produced.
print("Extras written:", OUT/"hyperparams.csv")

In [None]:
# === Minimal add-on: write adversarial reliability + confusion-inputs + ROC/PR CSVs ===
import numpy as np, pandas as pd, joblib
from pathlib import Path
from sklearn.isotonic import IsotonicRegression
from sklearn.metrics import roc_curve, precision_recall_curve

# Export directory (created if missing) and the directory holding the saved models.
OUT = Path("/content/paper_exports"); OUT.mkdir(parents=True, exist_ok=True)
MODELS = Path("/content/results_models")

# Reuse your paths
# Same literal input paths as the earlier cells, under shorter names.
CIC_CAL = "/content/CIC_tiny_slice_calib (1).csv"
CIC_TST = "/content/CIC_tiny_slice_test (1).csv"
NF_FULL = "/content/Dataset_NF-ToN-IoT.csv"

# --- utils (match training) ---
def derive_binary(df):
    """
    Derive the binary target (0 = benign, 1 = attack), matching training.

    Lookup order: ``Binary`` -> ``Class`` -> ``Label`` -> ``label``.
    ``Binary`` is parsed numerically ('o' is read as '0'); unparsable entries
    are back-filled from ``Class``, ``Label`` and ``label`` (in that order)
    and otherwise default to 1 (attack).

    Compared with the previous version: ``label`` is now also used for
    back-filling, a capitalised ``Label`` column is accepted when ``Binary``
    and ``Class`` are absent, and ``Class`` values are trimmed/lower-cased
    before comparison.

    Raises:
        ValueError: if no supported label column is present.
    """
    available = df.columns

    def _normalised(column):
        return df[column].astype(str).str.strip().str.lower()

    def _from_class():
        return (_normalised("Class").replace({"o": "0"}) != "0").astype(int)

    def _from_text(column):
        return (~_normalised(column).str.contains("benign")).astype(int)

    if "Binary" in available:
        y = pd.to_numeric(_normalised("Binary").replace({"o": "0"}), errors="coerce")
        if y.isna().any() and "Class" in available:
            y = y.fillna(_from_class())
        for text_column in ("Label", "label"):
            if y.isna().any() and text_column in available:
                y = y.fillna(_from_text(text_column))
        return y.fillna(1).astype(int)
    if "Class" in available:
        return _from_class()
    for text_column in ("Label", "label"):
        if text_column in available:
            return _from_text(text_column)
    raise ValueError("No label column")
def Xy_numeric(df):
    """
    Return ``(X, y)``: numeric-only features with all label-like columns
    (Binary/Label/label/Class/class) removed, and the derived 0/1 labels.
    """
    targets = derive_binary(df).values
    label_like = ("Binary", "Label", "label", "Class", "class")
    to_remove = [name for name in label_like if name in df.columns]
    features = df.drop(columns=to_remove, errors="ignore")
    return features.select_dtypes(include=[np.number]), targets
def temp_fit(z, y, steps=300, lr=0.01):
    """
    Fit a scalar temperature T minimising the binary NLL of sigmoid(z / T).

    Fixes relative to the previous version:
    - the derivative of the NLL w.r.t. T is ``-mean((s - y) * z) / T**2``;
      the old update used the opposite sign and so increased the loss;
    - the gradient is averaged rather than summed, so the step no longer
      scales with the number of samples;
    - the lowest-NLL temperature seen is returned, so the result is never
      worse than the starting point T = 1.

    Args:
        z: logits (flattened).
        y: 0/1 labels aligned with ``z``.
        steps: number of gradient steps.
        lr: gradient-descent step size.

    Returns:
        The fitted temperature as a Python float (>= 1e-3); 1.0 for empty input.
    """
    z = np.asarray(z, dtype=float).ravel()
    y = np.asarray(y).astype(int).ravel().astype(float)
    if z.size == 0:
        return 1.0

    def _nll(temp):
        # Stable binary NLL: log(1 + e^u) - y*u
        u = z / temp
        return float(np.mean(np.logaddexp(0.0, u) - y * u))

    T = 1.0
    best_T, best_nll = T, _nll(T)
    for _ in range(steps):
        s = 1.0 / (1.0 + np.exp(-np.clip(z / T, -500.0, 500.0)))
        grad = -float(np.mean((s - y) * z)) / (T ** 2)
        T = max(1e-3, T - lr * grad)
        current = _nll(T)
        if current < best_nll:
            best_T, best_nll = T, current
    return float(best_T)
def cal_objs(pipe, mlp, calib_df):
    """
    Fit both calibrators on a calibration frame.

    Returns:
        (T, iso): the temperature from ``temp_fit`` on the pipeline logits,
        and an IsotonicRegression (clipping out-of-range inputs) fitted on
        the MLP's positive-class probabilities of those logits.
    """
    features, labels = Xy_numeric(calib_df)
    logits = pipe.decision_function(features).ravel()
    temperature = temp_fit(logits, labels)
    raw_probs = mlp.predict_proba(logits.reshape(-1, 1))[:, 1]
    isotonic = IsotonicRegression(out_of_bounds="clip")
    isotonic.fit(raw_probs, labels)
    return temperature, isotonic
def scores(pipe, mlp, X, T, iso):
    """
    Score clean rows with every head.

    Returns:
        (z, p_u, p_t, p_i): pipeline logit, MLP positive-class probability,
        ``sigmoid(z / T)``, and the isotonic transform of the MLP probability.
    """
    logits = pipe.decision_function(X).ravel()
    mlp_positive = mlp.predict_proba(logits.reshape(-1, 1))[:, 1]
    tempered = 1 / (1 + np.exp(-(logits / T)))
    return logits, mlp_positive, tempered, iso.transform(mlp_positive)
def fgsm(pipe, X, y, T, mlp, iso, eps=0.10):
    """
    One-step FGSM attack in scaled feature space, scored by all three heads.

    The clean logit for the gradient is computed from the *scaled* features.
    The previous version fed the raw ``X`` to ``lr.decision_function``, which
    evaluates the sigmoid on unscaled inputs; when it saturates to exactly
    0/1 the gradient sign is zero and no perturbation is applied.

    Args:
        pipe: fitted pipeline with steps named "pre" and "lr".
        X: raw feature rows accepted by the "pre" step.
        y: 0/1 labels, one per row.
        T: temperature for the temperature-scaled probability.
        mlp: classifier exposing ``predict_proba`` on a logit column.
        iso: calibrator exposing ``transform``.
        eps: L-infinity step size in scaled feature units.

    Returns:
        (z_adv, p_u, p_t, p_i) for the perturbed rows.
    """
    pre = pipe.named_steps["pre"]
    lr = pipe.named_steps["lr"]
    w = lr.coef_.ravel()
    b = lr.intercept_.ravel()[0]

    Xs = pre.transform(X)
    z = Xs @ w + b
    s = 1 / (1 + np.exp(-z))
    grad = (s - np.asarray(y)).reshape(-1, 1) * w.reshape(1, -1)

    Xs_adv = Xs + np.sign(grad) * eps
    z_adv = Xs_adv @ w + b
    p_u = mlp.predict_proba(z_adv.reshape(-1, 1))[:, 1]
    p_t = 1 / (1 + np.exp(-(z_adv / T)))
    p_i = iso.transform(p_u)
    return z_adv, p_u, p_t, p_i
def pgd(pipe, X, y, T, mlp, iso, eps=0.10, alpha=0.02, steps=7):
    """
    Iterative (PGD-style) attack in scaled feature space, scored by all heads.

    ``steps`` signed-gradient moves of size ``alpha`` are applied to the
    scaled rows, each followed by clipping into the ``eps`` box around the
    original scaled rows.

    Returns:
        (z_adv, p_u, p_t, p_i) for the perturbed rows.
    """
    clf = pipe.named_steps["lr"]
    weights = clf.coef_.ravel()
    bias = clf.intercept_.ravel()[0]

    origin = pipe.named_steps["pre"].transform(X)
    lower, upper = origin - eps, origin + eps
    current = origin.copy()
    for _step in range(steps):
        logits = current @ weights + bias
        prob = 1 / (1 + np.exp(-logits))
        direction = np.sign((prob - y).reshape(-1, 1) * weights.reshape(1, -1))
        current = np.clip(current + direction * alpha, lower, upper)

    z_adv = current @ weights + bias
    p_u = mlp.predict_proba(z_adv.reshape(-1, 1))[:, 1]
    p_t = 1 / (1 + np.exp(-(z_adv / T)))
    p_i = iso.transform(p_u)
    return z_adv, p_u, p_t, p_i
def save_reliability(tag, y, p_u, p_t, p_i):
    """Write labels and the three probability variants to ``OUT/reliability_<tag>.csv``."""
    table = pd.DataFrame({"y_true": y, "p_uncal": p_u, "p_temp": p_t, "p_iso": p_i})
    destination = OUT / f"reliability_{tag}.csv"
    table.to_csv(destination, index=False)
def save_curves(tag, y, p):
    """Write the ROC points to ``OUT/roc_<tag>.csv`` and the PR points to ``OUT/pr_<tag>.csv``."""
    fpr, tpr, _ = roc_curve(y, p)
    roc_table = pd.DataFrame({"fpr": fpr, "tpr": tpr})
    roc_table.to_csv(OUT / f"roc_{tag}.csv", index=False)

    precision, recall, _ = precision_recall_curve(y, p)
    pr_table = pd.DataFrame({"recall": precision, "precision": recall})
    pr_table.to_csv(OUT / f"pr_{tag}.csv", index=False)
def save_conf_inputs(tag, y, p_t):
    """Write labels and temperature-scaled scores to ``OUT/confusion_inputs_<tag>.csv``."""
    table = pd.DataFrame({"y_true": y, "p_temp": p_t})
    destination = OUT / f"confusion_inputs_{tag}.csv"
    table.to_csv(destination, index=False)

# Load models
# NOTE: joblib.load unpickles arbitrary objects; only load artifacts written
# by this notebook's training cell.
from sklearn.metrics import roc_curve, precision_recall_curve
cic_pipe = joblib.load(MODELS/"CIC_tiny_slice_pipe.joblib")
cic_mlp  = joblib.load(MODELS/"CIC_tiny_slice_mlp.joblib")
nf_pipe  = joblib.load(MODELS/"NF_in_domain_pipe.joblib")
nf_mlp   = joblib.load(MODELS/"NF_in_domain_mlp.joblib")

# CIC
cic_cal = pd.read_csv(CIC_CAL, low_memory=False)
cic_tst = pd.read_csv(CIC_TST, low_memory=False)
Xc, yc = Xy_numeric(cic_tst)
# NOTE(review): calibrators are refitted on cic_cal alone; the training cell
# fitted them on a benign + sampled-attack mix — confirm this is intended.
T_cic, iso_cic = cal_objs(cic_pipe, cic_mlp, cic_cal)
# PGD(ε=0.10)
zc_p, puc_p, pt_p, pis_p = pgd(cic_pipe, Xc, yc, T_cic, cic_mlp, iso_cic, eps=0.10, alpha=0.02, steps=7)
save_reliability("cic_pgd010", yc, puc_p, pt_p, pis_p)
save_conf_inputs("cic_pgd010", yc, pt_p)
save_curves("cic_pgd010", yc, pt_p)
# Clean confusion inputs too
_, puc_c, pt_c, _ = scores(cic_pipe, cic_mlp, Xc, T_cic, iso_cic)
save_conf_inputs("cic_clean", yc, pt_c)

# NF
df_nf = pd.read_csv(NF_FULL, low_memory=False)
# build split as in training
# NOTE(review): if none of Binary/Class/Label exists, no "Binary" column is
# created here and the stratified split below fails on df_nf["Binary"].
if "Binary" not in df_nf.columns:
    if "Class" in df_nf.columns:
        df_nf["Binary"] = (df_nf["Class"].astype(str).str.replace("o","0")!="0").astype(int)
    elif "Label" in df_nf.columns:
        df_nf["Binary"] = (~df_nf["Label"].astype(str).str.lower().str.contains("benign")).astype(int)
from sklearn.model_selection import train_test_split
# Same split parameters as training (80/20, then 10% of the training part, seed 42).
tr, tst = train_test_split(df_nf, test_size=0.2, random_state=42, stratify=df_nf["Binary"])
cal, _  = train_test_split(tr, test_size=0.9, random_state=42, stratify=tr["Binary"])
Xn, yn = Xy_numeric(tst)
T_nf, iso_nf = cal_objs(nf_pipe, nf_mlp, cal)
# PGD(ε=0.10)
zn_p, pun_p, pt_n_p, pis_n_p = pgd(nf_pipe, Xn, yn, T_nf, nf_mlp, iso_nf, eps=0.10, alpha=0.02, steps=7)
save_reliability("nf_pgd010", yn, pun_p, pt_n_p, pis_n_p)
save_conf_inputs("nf_pgd010", yn, pt_n_p)
save_curves("nf_pgd010", yn, pt_n_p)
# Clean confusion inputs too
_, pun_c, pt_n_c, _ = scores(nf_pipe, nf_mlp, Xn, T_nf, iso_nf)
save_conf_inputs("nf_clean", yn, pt_n_c)

print("Wrote missing artifacts to:", OUT)