
# Reproducing Thennakoon et al. (2019): Credit‑Card Fraud Detection Methodology

This notebook implements the **methodology** from *Thennakoon et al., 2019 — “Real-time Credit Card Fraud Detection Using Machine Learning”*:
- Two preparation modes (**Type A numeric**, **Type B categorical**)
- **Data cleaning, normalization**, and **PCA** for dimension reduction
- **Class-imbalance handling** via **SMOTE**, **Random Under‑Sampling (RUS)**, **Condensed Nearest Neighbour (CNN)**
- **10‑fold cross‑validation**
- Classifiers: **SVM**, **Naïve Bayes**, **K‑Nearest Neighbors**, **Logistic Regression**
- Metrics: **Accuracy, Precision, Recall (= TPR), FPR, F1, ROC‑AUC, MCC (Matthews correlation coefficient)**
- (Optional) **Real‑time scoring stub**

> **Dataset note**: The paper’s four fraud patterns rely on fields such as the merchant category code (MCC, not to be confused with the Matthews correlation coefficient metric), the ISO response code, and the URL. If those fields are **not** present in your dataset (e.g., the UCI Taiwan credit-card default data), the notebook still reproduces the **learning pipeline** on the available target and features. Where the fields do exist, the same pipeline can be applied **per fraud pattern**.


In [None]:

# %pip install -q numpy pandas scikit-learn imbalanced-learn matplotlib seaborn

import json
from pathlib import Path

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline
from sklearn.decomposition import PCA
from sklearn.model_selection import StratifiedKFold, cross_validate
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, matthews_corrcoef, confusion_matrix, RocCurveDisplay
)
from sklearn.metrics import make_scorer
from sklearn.naive_bayes import GaussianNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC

from imblearn.pipeline import Pipeline as ImbPipeline
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler, CondensedNearestNeighbour

pd.set_option("display.max_columns", 200)
pd.set_option("display.width", 1200)

SEED = 42
RNG = np.random.default_rng(SEED)


In [None]:

CSV_PATH = "/mnt/data/Taiwan_UCI_Credit_Card.csv"
assert Path(CSV_PATH).exists(), f"CSV not found at {CSV_PATH}"

df_raw = pd.read_csv(CSV_PATH)
print(df_raw.shape)
df_raw.head()



## Dataset target & schema
- If present, the canonical UCI Taiwan target is **`default.payment.next.month`** (1=default, 0=non‑default).
- We will detect the target column automatically if possible; otherwise set it manually.


In [None]:

candidate_targets = [
    "default.payment.next.month", "DEFAULT_NEXT_MONTH", "default_next_month", "target", "label"
]
target_col = None
for c in candidate_targets:
    if c in df_raw.columns:
        target_col = c
        break
print("Detected target:", target_col)
if target_col is None:
    raise ValueError("Could not detect target column. Please set `target_col` manually.")

df = df_raw.drop_duplicates().copy()
if "ID" in df.columns:
    df = df.drop(columns=["ID"])

print("Class balance:\n", df[target_col].value_counts(normalize=True).round(3))
print("Nulls (top):\n", df.isna().sum().sort_values(ascending=False).head(10))



## Type A vs Type B Preparation
- **Type A**: Standardize the numeric columns, one‑hot encode any categorical columns, then optionally apply **PCA**.
- **Type B**: Pass numeric columns through unscaled and one‑hot encode the categorical columns; no PCA.
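
To make the two modes concrete, here is a minimal pure-Python sketch of what each one does to a toy table. The column names (`amount`, `channel`) and values are hypothetical, PCA is left out, and the sketch centers as well as scales, whereas the notebook cell passes `with_mean=False` to `StandardScaler` and therefore only scales.

```python
from statistics import mean, pstdev

# Hypothetical toy records: `amount` is numeric, `channel` is categorical.
rows = [
    {"amount": 10.0, "channel": "web"},
    {"amount": 20.0, "channel": "pos"},
    {"amount": 30.0, "channel": "web"},
]

def one_hot(value, categories):
    """Encode one categorical value as a list of 0/1 indicators."""
    return [1.0 if value == c else 0.0 for c in categories]

categories = sorted({r["channel"] for r in rows})
amounts = [r["amount"] for r in rows]
mu, sigma = mean(amounts), pstdev(amounts)  # population std (ddof=0)

# Type A: z-score the numeric column, then append the one-hot columns.
type_a = [[(r["amount"] - mu) / sigma] + one_hot(r["channel"], categories) for r in rows]

# Type B: keep the numeric column unscaled, one-hot encode the categorical one.
type_b = [[r["amount"]] + one_hot(r["channel"], categories) for r in rows]
```

Both views end up with the same number of columns here; they differ only in whether `amount` keeps its original scale.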


In [None]:

num_cols = [c for c in df.columns if pd.api.types.is_numeric_dtype(df[c]) and c != target_col]
cat_cols = [c for c in df.columns if (c != target_col) and (c not in num_cols)]
print("Numeric columns (sample):", num_cols[:10])
print("Categorical columns (sample):", cat_cols[:10])

X = df.drop(columns=[target_col])
y = df[target_col].astype(int).values

# `sparse_output` replaced `sparse` in scikit-learn 1.2 (`sparse` was removed in 1.4).
typeA_preprocess = ColumnTransformer(
    transformers=[
        ("num", StandardScaler(with_mean=False), num_cols),
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols) if cat_cols else ("noop","passthrough",[]),
    ],
    remainder="drop",
)

USE_PCA = True
PCA_COMPONENTS = 0.95  # variance retained

# Type B: numeric columns pass through unscaled (requires scikit-learn >= 1.2 for `sparse_output`).
typeB_preprocess = ColumnTransformer(
    transformers=[
        ("num", "passthrough", num_cols),
        ("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), cat_cols) if cat_cols else ("noop","passthrough",[]),
    ],
    remainder="drop",
)



## Class Imbalance Handling & Models
Resamplers: **SMOTE**, **RUS**, **CNN**.  
Models: **LR**, **NB**, **KNN**, **SVM**.  
Metrics include Accuracy, Precision/Recall/F1, ROC‑AUC, **MCC**, **TPR**, **FPR**.
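
As a reference for how the threshold-based metrics relate to one another, the following sketch derives them from raw confusion-matrix counts. The function name `confusion_metrics` and the example counts are made up for illustration; the notebook itself computes these values with scikit-learn scorers.

```python
import math

def confusion_metrics(tp: int, fp: int, fn: int, tn: int) -> dict:
    """Derive threshold-based metrics from binary confusion-matrix counts."""
    def safe_div(num, den):
        # Return 0.0 instead of raising when a denominator is empty.
        return num / den if den else 0.0

    precision = safe_div(tp, tp + fp)
    tpr = safe_div(tp, tp + fn)  # identical to recall
    fpr = safe_div(fp, fp + tn)  # 1 - specificity
    mcc_den = math.sqrt((tp + fp) * (tp + fn) * (tn + fp) * (tn + fn))
    return {
        "accuracy": safe_div(tp + tn, tp + fp + fn + tn),
        "precision": precision,
        "tpr": tpr,
        "fpr": fpr,
        "f1": safe_div(2 * precision * tpr, precision + tpr),
        "mcc": safe_div(tp * tn - fp * fn, mcc_den),
    }

# Hypothetical imbalanced fold: 100 positives, 900 negatives.
m = confusion_metrics(tp=80, fp=90, fn=20, tn=810)
```

With these counts the accuracy is 0.89 while precision is below 0.5, which is the kind of gap that makes MCC and F1 worth reporting alongside accuracy on imbalanced data.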


In [None]:

models = {
    "LR": LogisticRegression(max_iter=200, n_jobs=None, random_state=SEED),
    "NB": GaussianNB(),
    "KNN": KNeighborsClassifier(n_neighbors=15, weights="distance"),
    "SVM": SVC(kernel="rbf", probability=True, random_state=SEED),
}

def tpr_score(y_true, y_pred):
    # labels=[0, 1] guarantees a 2x2 matrix even if a fold is missing a class.
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    return tp / (tp + fn) if (tp + fn) else 0.0

def fpr_score(y_true, y_pred):
    tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()
    return fp / (fp + tn) if (fp + tn) else 0.0

scorers = {
    "accuracy": make_scorer(accuracy_score),
    "precision": make_scorer(precision_score, zero_division=0),
    "recall": make_scorer(recall_score, zero_division=0),
    "f1": make_scorer(f1_score, zero_division=0),
    # Built-in scorer name; unlike make_scorer(..., needs_proba=True), which
    # newer scikit-learn releases no longer accept, it works across versions.
    "roc_auc": "roc_auc",
    "mcc": make_scorer(matthews_corrcoef),
    "tpr": make_scorer(tpr_score),
    "fpr": make_scorer(fpr_score),
}


In [None]:

from sklearn.model_selection import StratifiedKFold, cross_validate

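def run_experiment(view_name: str, preprocessor, resampler, use_pca: bool):
    """Cross-validate every model in `models` for one (view, resampler) pair."""
    steps = [("prep", preprocessor)]
    if use_pca:
        steps += [("pca", PCA(n_components=PCA_COMPONENTS, random_state=SEED))]
    results = []
    skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=SEED)

    for model_key, clf in models.items():
        # Resampling runs first, on the raw columns, so SMOTE and CNN assume every
        # feature is numeric; imblearn applies the resampler to training folds only.
        pipe_cls = ImbPipeline(steps=[("resample", resampler)] + steps + [("model", clf)])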
        cv = cross_validate(
            pipe_cls, X, y, cv=skf, scoring=scorers, n_jobs=-1, return_train_score=False
        )
        row = {m: float(np.mean(cv["test_"+m])) for m in scorers.keys()}
        row.update(dict(view=view_name, resampler=type(resampler).__name__, model=model_key))
        results.append(row)

    return pd.DataFrame(results)


In [None]:

views = {
    "TypeA_numeric": typeA_preprocess,
    "TypeB_categorical": typeB_preprocess,
}

resamplers = [
    SMOTE(random_state=SEED),
    RandomUnderSampler(random_state=SEED),
    CondensedNearestNeighbour(random_state=SEED),
]

all_results = []
for vname, prep in views.items():
    for res in resamplers:
        df_res = run_experiment(vname, prep, res, USE_PCA if vname=="TypeA_numeric" else False)
        all_results.append(df_res)

df_results = pd.concat(all_results, ignore_index=True)
df_results_sorted = df_results.sort_values(["roc_auc","f1","accuracy"], ascending=False)
display(df_results_sorted.head(20))

OUT_DIR = Path("./outputs"); OUT_DIR.mkdir(exist_ok=True, parents=True)
df_results_sorted.to_csv(OUT_DIR / "thennakoon2019_results.csv", index=False)
print("Saved results to:", (OUT_DIR / "thennakoon2019_results.csv").resolve())


In [None]:

# Each (view, resampler, model) combination appears only once, so group by model.
# df_results_sorted is already ordered best-first, so the first row of each
# group is that model's best (view, resampler) configuration.
best_by_model = (
    df_results_sorted
    .groupby("model", as_index=False)
    .head(1)
    .reset_index(drop=True)
)
print("=== Best (view, resampler) per model ===")
display(best_by_model)
best_by_model.to_csv(OUT_DIR / "thennakoon2019_best_by_model.csv", index=False)


In [None]:

from sklearn.model_selection import StratifiedKFold
top = df_results_sorted.iloc[0]
print(top)

skf = StratifiedKFold(n_splits=10, shuffle=True, random_state=SEED)
(train_idx, test_idx) = next(iter(skf.split(X, y)))
X_train, X_test = X.iloc[train_idx], X.iloc[test_idx]
y_train, y_test = y[train_idx], y[test_idx]

prep = views[top['view']]
pca_steps = [("pca", PCA(n_components=PCA_COMPONENTS, random_state=SEED))] if top['view']=="TypeA_numeric" else []

if top['resampler'] == "SMOTE":
    resampler = SMOTE(random_state=SEED)
elif top['resampler'] == "RandomUnderSampler":
    resampler = RandomUnderSampler(random_state=SEED)
else:
    resampler = CondensedNearestNeighbour(random_state=SEED)

clf = models[top['model']]
pipe = ImbPipeline(steps=[("resample", resampler), ("prep", prep)] + pca_steps + [("model", clf)])
pipe.fit(X_train, y_train)

if hasattr(clf, "predict_proba"):
    y_prob = pipe.predict_proba(X_test)[:,1]
    RocCurveDisplay.from_predictions(y_test, y_prob)
    plt.title(f"ROC — {top['model']} | {top['view']} | {top['resampler']}")
    plt.show()
else:
    print("Model lacks predict_proba; skipping ROC.")



## (Optional) Real‑time Scoring Stub
The paper deploys its models behind an API. The cell below simulates a basic request‑response exchange with the **best** pipeline.
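
As a rough illustration of the request‑response shape, the sketch below parses a JSON body, validates its fields, and returns a JSON reply. Everything in it is an assumption made for the example: the field subset in `EXPECTED_FIELDS`, the `dummy_score` placeholder (standing in for a fitted pipeline's `predict_proba`), the `handle_request` name, and the 0.5 flagging threshold. It is not the paper's API.

```python
import json

# Hypothetical subset of feature columns expected in each request.
EXPECTED_FIELDS = ["LIMIT_BAL", "AGE", "PAY_0"]

def dummy_score(features: dict) -> float:
    """Placeholder for a fitted pipeline's predict_proba; not a real model."""
    return 0.9 if features["PAY_0"] >= 2 else 0.1

def handle_request(body: str, threshold: float = 0.5) -> str:
    """Parse a JSON request, validate its fields, and return a JSON response."""
    payload = json.loads(body)
    missing = [f for f in EXPECTED_FIELDS if f not in payload]
    if missing:
        return json.dumps({"status": "error", "missing_fields": missing})
    # Keep only the expected fields, in a fixed order; extra keys are ignored.
    features = {f: payload[f] for f in EXPECTED_FIELDS}
    score = dummy_score(features)
    return json.dumps({"status": "ok", "score": score, "flagged": score >= threshold})

response = handle_request('{"LIMIT_BAL": 20000, "AGE": 24, "PAY_0": 2}')
```

Rejecting incomplete requests up front keeps validation errors separate from model errors, which is easier to debug than a failure deep inside the pipeline.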


In [None]:

def build_best_pipeline(summary_row: pd.Series):
    prep = views[summary_row['view']]
    pca_steps = [("pca", PCA(n_components=PCA_COMPONENTS, random_state=SEED))] if summary_row['view']=="TypeA_numeric" else []

    if summary_row['resampler'] == "SMOTE":
        resampler = SMOTE(random_state=SEED)
    elif summary_row['resampler'] == "RandomUnderSampler":
        resampler = RandomUnderSampler(random_state=SEED)
    else:
        resampler = CondensedNearestNeighbour(random_state=SEED)

    clf = models[summary_row['model']]
    return ImbPipeline(steps=[("resample", resampler), ("prep", prep)] + pca_steps + [("model", clf)])

best_pipe = build_best_pipeline(df_results_sorted.iloc[0])
best_pipe.fit(X, y)

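def score_transaction(json_like: dict) -> float:
    """Return the positive-class probability (or hard label) for one record."""
    # The pipeline has no imputer, so a field filled with NaN would fail inside
    # the model with an opaque error; reject incomplete records up front instead.
    missing = [c for c in X.columns if c not in json_like]
    if missing:
        raise ValueError(f"Missing fields: {missing}")
    row_df = pd.DataFrame([json_like])[X.columns]
    if hasattr(best_pipe.named_steps["model"], "predict_proba"):
        prob = best_pipe.predict_proba(row_df)[:,1][0]
        return float(prob)
    else:
        pred = best_pipe.predict(row_df)[0]
        return float(pred)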

print("Fraud score (example):", score_transaction(X.iloc[0].to_dict()))



### Mapping the Paper’s Four Fraud Patterns
If your data include **merchant category codes (MCC)**, **ISO response codes**, **URLs**, or **amount** thresholds, create boolean masks to isolate each fraud pattern and run the same pipeline on each subset. Without those fields, you still reproduce the learning method (resampling + models + CV + metrics) on your available target.
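
A minimal sketch of the masking idea, using plain Python records so it runs without the notebook's data. The field names, the rule parameters (`RISKY_MCC`, `FLAGGED_ISO_CODES`, `KNOWN_URLS`, `AMOUNT_THRESHOLD`), and the transactions are all hypothetical, with one illustrative rule per field named above; they are not the paper's actual definitions. The same predicates translate directly to pandas boolean masks on a DataFrame.

```python
# Hypothetical transactions; field names and values are illustrative only.
transactions = [
    {"id": 1, "mcc": "7995", "iso_response_code": "00", "url": "shop.example", "amount": 20.0},
    {"id": 2, "mcc": "5411", "iso_response_code": "05", "url": "shop.example", "amount": 250.0},
    {"id": 3, "mcc": "5411", "iso_response_code": "00", "url": "unknown.example", "amount": 15.0},
]

# Assumed rule parameters; replace with values that fit your data.
RISKY_MCC = {"7995"}
FLAGGED_ISO_CODES = {"05"}
KNOWN_URLS = {"shop.example"}
AMOUNT_THRESHOLD = 100.0

patterns = {
    "risky_mcc": lambda t: t["mcc"] in RISKY_MCC,
    "iso_response": lambda t: t["iso_response_code"] in FLAGGED_ISO_CODES,
    "unknown_url": lambda t: t["url"] not in KNOWN_URLS,
    "high_amount": lambda t: t["amount"] > AMOUNT_THRESHOLD,
}

# One boolean mask per pattern, aligned with `transactions`.
masks = {name: [rule(t) for t in transactions] for name, rule in patterns.items()}

# Ids of the transactions that each pattern isolates.
subsets = {
    name: [t["id"] for t, keep in zip(transactions, mask) if keep]
    for name, mask in masks.items()
}
```

Each entry of `subsets` marks the rows on which the resampling, model, and cross-validation steps would then be run separately.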
