In [2]:
# ==========================================
# Isolation Forest (creditcard.csv) with manual/F1/Precision threshold
# - Splits: TrainN=20k normal, VAL 250/250, TEST Balanced 200/200, TEST Imbalanced 10k/200
# - Scale fit on train normal only
# - Scores = -score_samples (higher => more anomalous)
# ==========================================
import os, random
import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import IsolationForest
from sklearn.metrics import (
    confusion_matrix, classification_report, roc_auc_score,
    precision_recall_fscore_support, precision_recall_curve
)

# ------------------------------
# 0) Seeds
# ------------------------------
# Seed both the stdlib RNG and NumPy's legacy global RNG so reruns repeat.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)

# ------------------------------
# 1) Load
# ------------------------------
# Read the CSV, then separate the "Class" label column from the feature columns.
csv_path = "../creditcard.csv"
df = pd.read_csv(csv_path)
assert "Class" in df.columns
y = df["Class"].astype(int).to_numpy()
X_df = df.drop(columns=["Class"])

# ------------------------------
# 2) Splits (no overlap), same as VAE/GAN
# ------------------------------
# Row positions of normal (Class == 0) and anomalous (Class == 1) samples.
normal_idx = np.where(y==0)[0]; anom_idx = np.where(y==1)[0]
# One seeded generator shuffles both index arrays in place (normals first, then
# anomalies), so the order of these calls determines the resulting splits.
rng = np.random.default_rng(SEED); rng.shuffle(normal_idx); rng.shuffle(anom_idx)

# Requested sizes: train normals, VAL normal/anomaly, balanced-TEST normal/anomaly,
# and imbalanced-TEST normals.
TR_N, VAL_N, VAL_A, TESTB_N, TESTB_A, TESTI_N = 20000, 250, 250, 200, 200, 10000
# VAL and balanced TEST need disjoint anomaly sets, so both must fit in anom_idx.
assert len(anom_idx) >= (VAL_A + TESTB_A), "Không đủ anomaly cho VAL/TESTB."

# Make sure enough normal samples remain for the imbalanced TEST set and for training.
max_train_normal = len(normal_idx) - (VAL_N + TESTB_N + TESTI_N)
if max_train_normal < 1000:
    # Too few normals left for training: resize the imbalanced TEST normal set
    # (never below 2000) and recompute what is left for training.
    TESTI_N = max(2000, len(normal_idx) - (VAL_N + TESTB_N + 1000))
    max_train_normal = len(normal_idx) - (VAL_N + TESTB_N + TESTI_N)
# NOTE(review): the 5000 floor can exceed max_train_normal on a small dataset; the
# slices below would then silently come up short instead of failing -- confirm
# this is acceptable.
TRAIN_N = max(5000, min(TR_N, max_train_normal))

# Slice the shuffled indices; ptr_n / ptr_a advance so consecutive slices never overlap.
ptr_n=0; ptr_a=0
trn_n = normal_idx[ptr_n:ptr_n+TRAIN_N]; ptr_n+=TRAIN_N
val_n = normal_idx[ptr_n:ptr_n+VAL_N];   ptr_n+=VAL_N
tstb_n= normal_idx[ptr_n:ptr_n+TESTB_N]; ptr_n+=TESTB_N
tsti_n= normal_idx[ptr_n:ptr_n+TESTI_N]; ptr_n+=TESTI_N

val_a = anom_idx[ptr_a:ptr_a+VAL_A]; ptr_a+=VAL_A
tstb_a= anom_idx[ptr_a:ptr_a+TESTB_A]; ptr_a+=TESTB_A
tsti_a= tstb_a  # the imbalanced TEST set reuses the balanced TEST anomalies

def take(idxs):
    """Return the rows of X_df at positions idxs as float32, with their labels from y."""
    features = X_df.iloc[idxs].to_numpy().astype(np.float32)
    labels = y[idxs]
    return features, labels
# Training matrix: normal rows only; the labels returned by take() are not needed.
X_tr_n, _ = take(trn_n)

# Each evaluation set stacks its normal rows first and its anomaly rows second,
# with 0/1 labels laid out in the same order.
X_val = np.concatenate(
    (X_df.iloc[val_n].to_numpy(), X_df.iloc[val_a].to_numpy()), axis=0
).astype(np.float32)
y_val = np.concatenate(
    (np.zeros(len(val_n), dtype=int), np.ones(len(val_a), dtype=int))
)

X_tstb = np.concatenate(
    (X_df.iloc[tstb_n].to_numpy(), X_df.iloc[tstb_a].to_numpy()), axis=0
).astype(np.float32)
y_tstb = np.concatenate(
    (np.zeros(len(tstb_n), dtype=int), np.ones(len(tstb_a), dtype=int))
)

X_tsti = np.concatenate(
    (X_df.iloc[tsti_n].to_numpy(), X_df.iloc[tsti_a].to_numpy()), axis=0
).astype(np.float32)
y_tsti = np.concatenate(
    (np.zeros(len(tsti_n), dtype=int), np.ones(len(tsti_a), dtype=int))
)

print(f"TrainN={len(trn_n)}, Val={len(val_n)}/{len(val_a)}, TestB={len(tstb_n)}/{len(tstb_a)}, TestI={len(tsti_n)}/{len(tsti_a)}")

# ------------------------------
# 3) Scale (fit on train normal only)
# ------------------------------
# The scaler's statistics come from the normal training rows only; every other
# split is transformed with those same statistics.
scaler = StandardScaler()
scaler.fit(X_tr_n)


def z(x):
    """Standardize x with the fitted scaler and return the result as float32."""
    standardized = scaler.transform(x)
    return standardized.astype(np.float32)


X_tr_n = z(X_tr_n)
X_val = z(X_val)
X_tstb = z(X_tstb)
X_tsti = z(X_tsti)

# ------------------------------
# 4) Fit Isolation Forest on normal only
# ------------------------------
# contamination stays "auto" because the decision threshold is chosen later
# from the anomaly scores rather than from the model's built-in cutoff.
clf = IsolationForest(
    n_estimators=400,
    max_samples=512,
    contamination="auto",
    random_state=SEED,
    n_jobs=-1,
).fit(X_tr_n)

def anomaly_score(model, X):
    """Return anomaly scores for X where a higher value means more anomalous.

    ``model.score_samples`` rates normal points higher, so its output is
    negated to flip the orientation.
    """
    normality = model.score_samples(X)
    return -normality

# ------------------------------
# 5) Threshold selection
# ------------------------------
# Strategy used to pick the decision threshold: "manual" | "f1" | "p_at".
MODE = "f1"
# Precision target used when MODE == "p_at".
TARGET_P = 0.60

# Anomaly scores of the validation set; every strategy derives its threshold from these.
val_scores = anomaly_score(clf, X_val)

# Suggested threshold for MODE == "manual": the 95th percentile of the VAL scores.
THR_MANUAL = float(np.percentile(val_scores, 95))

def best_f1_threshold(y_true, scores, percentiles=None):
    """Pick the percentile-based threshold that maximizes F1 for class 1.

    Candidate thresholds are the requested percentiles of ``scores``; a sample
    is predicted anomalous (1) when ``score >= threshold``.

    Args:
        y_true: Ground-truth 0/1 labels aligned with ``scores``.
        scores: Anomaly scores (array-like); higher means more anomalous.
        percentiles: Percentile ranks (0-100) of ``scores`` to try. ``None``
            (the default) uses 200 evenly spaced ranks from 50 to 99.5.

    Returns:
        Tuple ``(threshold, f1)`` for the best candidate. On ties the first
        candidate tried is kept. If there are no candidates the result is
        ``(None, -1.0)``.
    """
    # Build the default grid per call instead of sharing one array object
    # created at definition time.
    if percentiles is None:
        percentiles = np.linspace(50, 99.5, 200)
    # Accept plain lists as well as arrays for the comparison below.
    scores = np.asarray(scores)

    best_f1, best_thr = -1.0, None
    for t in np.atleast_1d(np.percentile(scores, percentiles)):
        yhat = (scores >= t).astype(int)
        # average=None returns per-class arrays ordered as labels=[0, 1].
        _, _, f1, _ = precision_recall_fscore_support(
            y_true, yhat, labels=[0, 1], average=None, zero_division=0
        )
        # Strict ">" keeps the earliest candidate when several share the best F1.
        if f1[1] > best_f1:
            best_f1, best_thr = float(f1[1]), float(t)
    return best_thr, best_f1

def threshold_for_precision(y_true, scores, target_p=0.60):
    """Find a score threshold whose precision for class 1 reaches ``target_p``.

    Uses ``precision_recall_curve`` and returns the first curve threshold
    whose precision is at least ``target_p``.

    Args:
        y_true: Ground-truth 0/1 labels aligned with ``scores``.
        scores: Anomaly scores (array-like); higher means more anomalous.
        target_p: Minimum precision required for the positive class.

    Returns:
        Tuple ``(threshold, precision, recall)``. When no curve threshold
        reaches ``target_p``, the threshold falls back to the 95th percentile
        of ``scores`` and the precision/recall are those measured at that
        fallback threshold, predicting class 1 when ``score >= threshold``.
    """
    p, r, thr = precision_recall_curve(y_true, scores)
    # thr has len(p) - 1 entries: the final precision/recall pair has no threshold.
    idx = np.where(p[:-1] >= target_p)[0]
    if len(idx) > 0:
        i = idx[0]
        return float(thr[i]), float(p[i]), float(r[i])

    # Fallback: the target is unreachable on this data. Report the metrics of
    # the threshold actually returned, not those of an unrelated curve point.
    fallback_thr = float(np.percentile(scores, 95))
    flagged = np.asarray(scores) >= fallback_thr
    positives = np.asarray(y_true) == 1
    true_pos = int(np.count_nonzero(flagged & positives))
    n_flagged = int(np.count_nonzero(flagged))
    n_positives = int(np.count_nonzero(positives))
    precision = true_pos / n_flagged if n_flagged else 0.0
    recall = true_pos / n_positives if n_positives else 0.0
    return fallback_thr, float(precision), float(recall)

# Choose the decision threshold `thr` on the validation scores according to MODE.
if MODE == "manual":
    thr = float(THR_MANUAL)
    # Share of VAL scores at or below the threshold, reported as a percentile.
    pct = (val_scores <= thr).mean() * 100.0
    print(f"\n[VAL] Manual threshold selected: thr={thr:.6f} (~percentile {pct:.2f}%)")
elif MODE == "f1":
    thr, f1v = best_f1_threshold(y_val, val_scores)
    print(f"\n[VAL balanced] Best F1(Class 1)={f1v:.3f} at threshold={thr:.6f}")
elif MODE == "p_at":
    thr, p_at, r_at = threshold_for_precision(y_val, val_scores, TARGET_P)
    print(f"\n[VAL balanced] Threshold for Precision≥{TARGET_P:.2f}: thr={thr:.6f} (P={p_at:.3f}, R={r_at:.3f})")
else:
    # A mistyped MODE previously fell through to the precision-target branch.
    raise ValueError(f"Unknown MODE {MODE!r}; expected 'manual', 'f1' or 'p_at'.")

# ------------------------------
# 6) Evaluate
# ------------------------------
def evaluate(name, X, y, thr):
    """Print threshold-based metrics for one labelled dataset.

    Scores ``X`` with the fitted forest ``clf``, predicts class 1 wherever the
    anomaly score is at least ``thr``, and prints the confusion matrix, the
    classification report and the ROC AUC of the raw scores.

    Args:
        name: Label shown in the printed header.
        X: Feature matrix to score.
        y: Ground-truth labels aligned with ``X``.
        thr: Decision threshold applied to the anomaly scores.
    """
    scores = anomaly_score(clf, X)
    predicted = (scores >= thr).astype(int)

    print(f"\n===== {name} @thr={thr:.6f} =====")
    matrix = confusion_matrix(y, predicted)
    print("Confusion Matrix:\n", matrix)
    report = classification_report(y, predicted, digits=4)
    print("\nClassification Report:\n", report)
    # AUC is computed from the continuous scores, so it does not depend on thr.
    auc = roc_auc_score(y, scores)
    print("ROC AUC (scores):", auc)

# Both test sets, in reporting order: (short tag, full title, features, labels).
eval_sets = (
    ("TEST Balanced", "TEST Balanced (200/200)", X_tstb, y_tstb),
    ("TEST Imbalanced", "TEST Imbalanced (10000/200)", X_tsti, y_tsti),
)

# Headline results at the threshold selected on VAL.
for _, title, X_eval, y_eval in eval_sets:
    evaluate(title, X_eval, y_eval, thr)

# ------------------------------
# 7) Quick sweep over a few percentiles, for inspection
# ------------------------------
cands = [80, 90, 92.5, 95, 97.5, 99]
print("\n>>> Quick sweep on percentile-based thresholds (from VAL):")
for p in cands:
    # Each candidate threshold is a percentile of the VAL anomaly scores.
    t = float(np.percentile(val_scores, p))
    for tag, title, X_eval, y_eval in eval_sets:
        print(f"\n-- Try thr={t:.6f} (pctl={p}) on {tag} --")
        evaluate(title, X_eval, y_eval, t)


TrainN=20000, Val=250/250, TestB=200/200, TestI=10000/200

[VAL balanced] Best F1(Class 1)=0.893 at threshold=0.443434

===== TEST Balanced (200/200) @thr=0.443434 =====
Confusion Matrix:
 [[185  15]
 [ 21 179]]

Classification Report:
               precision    recall  f1-score   support

           0     0.8981    0.9250    0.9113       200
           1     0.9227    0.8950    0.9086       200

    accuracy                         0.9100       400
   macro avg     0.9104    0.9100    0.9100       400
weighted avg     0.9104    0.9100    0.9100       400

ROC AUC (scores): 0.9571250000000001

===== TEST Imbalanced (10000/200) @thr=0.443434 =====
Confusion Matrix:
 [[9155  845]
 [  21  179]]

Classification Report:
               precision    recall  f1-score   support

           0     0.9977    0.9155    0.9548     10000
           1     0.1748    0.8950    0.2925       200

    accuracy                         0.9151     10200
   macro avg     0.5863    0.9052    0.6237     10200
w