In [None]:
import numpy as np
import pandas as pd

from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import StratifiedKFold
from sklearn.svm import OneClassSVM
from sklearn.mixture import GaussianMixture
from sklearn.metrics import roc_auc_score, average_precision_score

import pickle
import os
import matplotlib.pyplot as plt
import matplotlib.cm as cm

## 1- Load Credit Card Dataset

In [None]:
# 1. Read the credit card CSV into a DataFrame.
credit_card_csv = "outputs/credit_card_PCA.csv"
cc_df = pd.read_csv(credit_card_csv)


In [None]:
# 2. Show the first rows, then the column names, to verify the schema
#    (the next step relies on a 'Class' column being present).
first_rows = cc_df.head()
print(first_rows)
column_names = cc_df.columns.tolist()
print(column_names)

## 2- Data Splitting

In [None]:
# 3. Separate the target from the features.
#    y_cc is the 'Class' column, 0 (normal) / 1 (fraud);
#    X_cc is every remaining column.
y_cc = cc_df['Class']
X_cc = cc_df.drop(columns='Class')


In [None]:
# 4. Sanity check: feature-matrix shape and the relative frequency of each class.
class_fractions = y_cc.value_counts(normalize=True)
print("=== Credit Card Dataset ===")
print("X_cc shape:", X_cc.shape)
print("y_cc class distribution:\n", class_fractions)

## 3- Cross Validation

In [None]:
# 1. Number of times the whole cross-validation is repeated.
repeats = 10

# 2. Stratified 5-fold splitter, so each fold keeps roughly the same
#    normal/fraud ratio as the full dataset. Shuffling is seeded for
#    reproducibility.
skf = StratifiedKFold(shuffle=True, n_splits=5, random_state=42)

In [None]:
# 3. Result store: results_cc[scenario][method] is a list that collects one
#    (AUC, AUPRC) tuple per evaluated fold. Every (scenario, method) pair gets
#    its own independent list.
method_names = ('OCSVM', 'OCSVM_Sigmoid', 'OCSVM_Isotonic', 'GMM', 'GMM_Sigmoid')
results_cc = {
    scenario: {method_name: [] for method_name in method_names}
    for scenario in ('majority', 'minority')
}

## 4- Calibration

In [14]:
from sklearn.linear_model import LogisticRegression
from sklearn.isotonic import IsotonicRegression

def calibrate_scores(scores, y_true, method="sigmoid"):
    """Fit a one-dimensional calibrator that maps scores to class labels.

    Parameters
    ----------
    scores : array-like of shape (n_samples,)
        Raw scores to calibrate. Any array-like is accepted (ndarray, list,
        pandas Series); it is converted to a single float column internally.
    y_true : array-like of shape (n_samples,)
        Labels the calibrator is fitted against.
    method : {"sigmoid", "isotonic"}, default="sigmoid"
        "sigmoid" fits a LogisticRegression on the score (use
        ``predict_proba(...)[:, 1]`` on the result); "isotonic" fits an
        IsotonicRegression with out-of-range inputs clipped (use
        ``predict(...)`` on the result).

    Returns
    -------
    The fitted calibrator object.

    Raises
    ------
    ValueError
        If `method` is neither "sigmoid" nor "isotonic".
    """
    if method == "sigmoid":
        calibrator = LogisticRegression(solver="lbfgs")
    elif method == "isotonic":
        calibrator = IsotonicRegression(out_of_bounds="clip")
    else:
        raise ValueError("Unknown calibration method.")

    # np.asarray lets callers pass lists or Series, which have no usable
    # .reshape; both estimators are fitted on an (n_samples, 1) column.
    score_column = np.asarray(scores, dtype=float).reshape(-1, 1)
    labels = np.asarray(y_true).ravel()
    calibrator.fit(score_column, labels)
    return calibrator


## 5- Model Training

In [None]:
# CREDIT CARD: MAJORITY (Only Normal) VS MINORITY (Only Fraud) TRAINING
#
# For every repeat and fold, a One-Class SVM and a single-component GMM are
# fitted twice: once on the normal transactions only ("majority") and once on
# the fraud transactions only ("minority"). The negated model score is used
# as the anomaly score and is evaluated both raw and after calibration.
#
# Methodology notes:
#   * Calibrators are fitted on the TRAINING fold's scores/labels and applied
#     to the test fold. Fitting them on y_test and scoring the same test fold
#     leaks the test labels into the reported AUC / AUPR.
#   * The splitter is re-seeded for every repeat. Re-using one splitter with
#     a fixed integer random_state yields the same folds on every call, which
#     makes all repeats identical.
#   * NOTE(review): the minority models' scores are negated exactly like the
#     majority ones, so a high value means "unlike the fraud training data".
#     The sigmoid calibrator can learn the sign; the isotonic one (increasing
#     by default) cannot - confirm this orientation is intended.


def _rank_metrics(labels, scores):
    """Return (ROC AUC, average precision) of `scores` against `labels`."""
    return roc_auc_score(labels, scores), average_precision_score(labels, scores)


def _positive_class_proba(calibrator, scores):
    """Apply a calibrator from calibrate_scores to 1-D `scores`.

    Uses predict_proba (positive-class column) when the calibrator has it,
    otherwise falls back to predict.
    """
    column = scores.reshape(-1, 1)
    if hasattr(calibrator, "predict_proba"):
        return calibrator.predict_proba(column)[:, 1]
    return calibrator.predict(column)


def _calibrated_eval(train_scores, train_labels, test_scores, test_labels, method):
    """Fit a calibrator on the training fold and evaluate it on the test fold.

    Returns (test_probabilities, (auc, aupr)).
    """
    calibrator = calibrate_scores(train_scores, train_labels, method=method)
    probs = _positive_class_proba(calibrator, test_scores)
    return probs, _rank_metrics(test_labels, probs)


for r in range(repeats):
    print(f"\n--- Credit Card: Repeat {r+1}/{repeats} ---")

    # Fresh shuffle per repeat (repeat 0 uses seed 42, the seed `skf` was built with).
    repeat_skf = StratifiedKFold(n_splits=skf.n_splits, shuffle=True, random_state=42 + r)

    for fold, (train_idx, test_idx) in enumerate(repeat_skf.split(X_cc, y_cc), start=1):

        # Split feature matrix and target into training and testing sets for this fold.
        X_train, X_test = X_cc.values[train_idx], X_cc.values[test_idx]
        y_train, y_test = y_cc.values[train_idx], y_cc.values[test_idx]

        # ---------------------------------
        # 1) Majority training: use only normal (y_train == 0)
        # ---------------------------------
        X_train_major = X_train[y_train == 0]  # All "normal" transactions

        # 1st Model
        # -- One-Class SVM (Majority) --
        ocsvm_major_m = OneClassSVM(kernel="rbf", nu=0.01, gamma="scale")
        print("training is started!")
        # Fit on normal transactions only (not the whole training fold).
        ocsvm_major_m.fit(X_train_major)

        # Anomaly score = negated decision function, for test and training folds.
        svm_scores_raw = ocsvm_major_m.decision_function(X_test)
        svm_anomaly = -svm_scores_raw
        svm_anomaly_train = -ocsvm_major_m.decision_function(X_train)

        # Uncalibrated baseline
        results_cc['majority']['OCSVM'].append(_rank_metrics(y_test, svm_anomaly))

        # Calibrate on the training fold, evaluate on the test fold
        prob_sigmoid, (auc_sigmoid, aupr_sigmoid) = _calibrated_eval(
            svm_anomaly_train, y_train, svm_anomaly, y_test, "sigmoid"
        )
        prob_isotonic, (auc_isotonic, aupr_isotonic) = _calibrated_eval(
            svm_anomaly_train, y_train, svm_anomaly, y_test, "isotonic"
        )

        results_cc['majority']['OCSVM_Sigmoid'].append((auc_sigmoid, aupr_sigmoid))
        results_cc['majority']['OCSVM_Isotonic'].append((auc_isotonic, aupr_isotonic))

        print(f"Majority OCSVM + Sigmoid: AUC={auc_sigmoid:.4f}, AUPR={aupr_sigmoid:.4f}")
        print(f"Majority OCSVM + Isotonic: AUC={auc_isotonic:.4f}, AUPR={aupr_isotonic:.4f}")

        # 2nd Model
        # -- One-Class GMM (Majority) --
        gmm_major = GaussianMixture(n_components=1, random_state=42)
        gmm_major.fit(X_train_major)

        # Anomaly score = negated per-sample log-likelihood
        gmm_scores_raw_major_gmm = gmm_major.score_samples(X_test)
        gmm_anomaly_major_gmm = -gmm_scores_raw_major_gmm
        gmm_anomaly_major_gmm_train = -gmm_major.score_samples(X_train)

        # Uncalibrated baseline
        results_cc['majority']['GMM'].append(_rank_metrics(y_test, gmm_anomaly_major_gmm))

        # Sigmoid calibration only for the GMM
        prob_sigmoid_major_gmm, (auc_sigmoid_major_gmm, aupr_sigmoid_major_gmm) = _calibrated_eval(
            gmm_anomaly_major_gmm_train, y_train, gmm_anomaly_major_gmm, y_test, "sigmoid"
        )

        # Save results
        results_cc['majority']['GMM_Sigmoid'].append((auc_sigmoid_major_gmm, aupr_sigmoid_major_gmm))

        # Print results
        print(f"Majority GMM + Sigmoid: AUC={auc_sigmoid_major_gmm:.4f}, AUPR={aupr_sigmoid_major_gmm:.4f}")

        # ---------------------------------
        # 2) Minority training: use only fraud (y_train == 1)
        # ---------------------------------
        X_train_minor = X_train[y_train == 1]  # All "fraud" transactions

        if len(X_train_minor) == 0:
            # No fraud samples in this fold's training split: skip minority training.
            continue

        # 1st model
        # -- One-Class SVM (Minority) --
        ocsvm_minor_m = OneClassSVM(kernel="rbf", nu=0.01, gamma="scale")
        print("training is started!")
        ocsvm_minor_m.fit(X_train_minor)

        # Get anomaly scores
        svm_scores_raw_minor_svm = ocsvm_minor_m.decision_function(X_test)
        svm_anomaly_minor_svm = -svm_scores_raw_minor_svm
        svm_anomaly_minor_svm_train = -ocsvm_minor_m.decision_function(X_train)

        # Uncalibrated baseline
        results_cc['minority']['OCSVM'].append(_rank_metrics(y_test, svm_anomaly_minor_svm))

        # Calibrate on the training fold, evaluate on the test fold
        prob_sigmoid_minor_svm, (auc_sigmoid_minor_svm, aupr_sigmoid_minor_svm) = _calibrated_eval(
            svm_anomaly_minor_svm_train, y_train, svm_anomaly_minor_svm, y_test, "sigmoid"
        )
        prob_isotonic_minor_svm, (auc_isotonic_minor_svm, aupr_isotonic_minor_svm) = _calibrated_eval(
            svm_anomaly_minor_svm_train, y_train, svm_anomaly_minor_svm, y_test, "isotonic"
        )

        # Store results
        results_cc['minority']['OCSVM_Sigmoid'].append((auc_sigmoid_minor_svm, aupr_sigmoid_minor_svm))
        results_cc['minority']['OCSVM_Isotonic'].append((auc_isotonic_minor_svm, aupr_isotonic_minor_svm))

        # Print results
        print(f"Minority OCSVM (Minority) + Sigmoid: AUC={auc_sigmoid_minor_svm:.4f}, AUPR={aupr_sigmoid_minor_svm:.4f}")
        print(f"Minority OCSVM (Minority) + Isotonic: AUC={auc_isotonic_minor_svm:.4f}, AUPR={aupr_isotonic_minor_svm:.4f}")

        # 2nd Model
        # -- One-Class GMM (Minority) --
        gmm_minor_m = GaussianMixture(n_components=1, random_state=42)
        gmm_minor_m.fit(X_train_minor)

        # Get raw scores and convert to anomaly scores (negate log-likelihood)
        gmm_scores_raw_minor_gmm = gmm_minor_m.score_samples(X_test)
        gmm_anomaly_minor_gmm = -gmm_scores_raw_minor_gmm
        gmm_anomaly_minor_gmm_train = -gmm_minor_m.score_samples(X_train)

        # Uncalibrated baseline
        results_cc['minority']['GMM'].append(_rank_metrics(y_test, gmm_anomaly_minor_gmm))

        # Sigmoid calibration only for the GMM
        prob_sigmoid_minor_gmm, (auc_sigmoid_minor_gmm, aupr_sigmoid_minor_gmm) = _calibrated_eval(
            gmm_anomaly_minor_gmm_train, y_train, gmm_anomaly_minor_gmm, y_test, "sigmoid"
        )

        # Save results
        results_cc['minority']['GMM_Sigmoid'].append((auc_sigmoid_minor_gmm, aupr_sigmoid_minor_gmm))

        # Print results
        print(f"Minority GMM + Sigmoid: AUC={auc_sigmoid_minor_gmm:.4f}, AUPR={aupr_sigmoid_minor_gmm:.4f}")

    print(f"--- Finished Credit Card Repeat {r+1} ---")

## 7- Evaluation

In [None]:
def compute_mean_scores(score_list):
    """Average a collection of (AUC, AUPRC) pairs column-wise.

    `score_list` holds one (AUC, AUPRC) tuple per experiment; the result is a
    length-2 array ``[mean_auc, mean_auprc]``.
    """
    # Rows are experiments, columns are the two metrics; average over rows.
    return np.mean(score_list, axis=0)


In [None]:
# Print the mean AUC / AUPRC of every method, grouped by training scenario.
# Methods whose score list is empty are reported as having no scores.
for group, methods in results_cc.items():
    print(f'\n>>> {group.upper()}')
    for method, scores in methods.items():
        if not scores:
            print(f'{method:20s} | No scores available.')
            continue
        mean_auc, mean_aupr = compute_mean_scores(scores)
        print(f'{method:20s} | AUC: {mean_auc:.4f} | AUPRC: {mean_aupr:.4f}')

## 8- ROC Curve

In [None]:
import matplotlib.pyplot as plt
from sklearn.metrics import roc_curve

# ROC points (FPR, TPR) for the four OCSVM variants; thresholds are discarded.
# `y_test` and the `prob_*` arrays are whatever the training loop last
# assigned, i.e. they describe a single fold rather than an average over folds.
fpr_sigmoid_major, tpr_sigmoid_major = roc_curve(y_test, prob_sigmoid)[:2]
fpr_isotonic_major, tpr_isotonic_major = roc_curve(y_test, prob_isotonic)[:2]
fpr_sigmoid_minor, tpr_sigmoid_minor = roc_curve(y_test, prob_sigmoid_minor_svm)[:2]
fpr_isotonic_minor, tpr_isotonic_minor = roc_curve(y_test, prob_isotonic_minor_svm)[:2]

# fpr_sigmoid_minor_gmm, tpr_sigmoid_minor_gmm, _ = roc_curve(y_test, prob_sigmoid_minor_gmm)
# fpr_sigmoid_major_gmm, tpr_sigmoid_major_gmm, _ = roc_curve(y_test, prob_sigmoid_major_gmm)

### Save Pickled TPR and FPR values for each model

In [None]:
# Ensure the directory exists
os.makedirs("pickled_storage", exist_ok=True)

# GMM ROC points are computed here from the probabilities left by the training
# loop, and then written like the OCSVM ones. Reading the GMM pickles back
# instead fails with FileNotFoundError on a fresh run, because nothing in this
# notebook has written them.
fpr_sigmoid_minor_gmm, tpr_sigmoid_minor_gmm, _ = roc_curve(y_test, prob_sigmoid_minor_gmm)
fpr_sigmoid_major_gmm, tpr_sigmoid_major_gmm, _ = roc_curve(y_test, prob_sigmoid_major_gmm)

# File stem -> array. The stems follow the "<fpr|tpr>_<calibration>_<group>_<model>"
# pattern that the plotting cell searches for.
roc_arrays = {
    "fpr_sigmoid_minor_OCSVM": fpr_sigmoid_minor,
    "tpr_sigmoid_minor_OCSVM": tpr_sigmoid_minor,
    "fpr_isotonic_minor_OCSVM": fpr_isotonic_minor,
    "tpr_isotonic_minor_OCSVM": tpr_isotonic_minor,
    "fpr_sigmoid_major_OCSVM": fpr_sigmoid_major,
    "tpr_sigmoid_major_OCSVM": tpr_sigmoid_major,
    "fpr_isotonic_major_OCSVM": fpr_isotonic_major,
    "tpr_isotonic_major_OCSVM": tpr_isotonic_major,
    "fpr_sigmoid_minor_gmm": fpr_sigmoid_minor_gmm,
    "tpr_sigmoid_minor_gmm": tpr_sigmoid_minor_gmm,
    "fpr_sigmoid_major_gmm": fpr_sigmoid_major_gmm,
    "tpr_sigmoid_major_gmm": tpr_sigmoid_major_gmm,
}

# Save each array to its own pickle file
for stem, values in roc_arrays.items():
    with open(f"pickled_storage/{stem}.pkl", "wb") as f:
        pickle.dump(values, f)

### Plot the three models' ROC curves in one plot

In [None]:
# Directory with pickled files
path = "pickled_storage"
# Sorted so that, if several files match the same filter, the file picked
# ([0] below) does not depend on the arbitrary order os.listdir returns.
files = sorted(os.listdir(path))

# Models and calibration methods to include
model_order = [
    ("GMM", "sigmoid"),
    ("OCSVM", "sigmoid"),
    ("OCSVM", "isotonic"),
]

# Color map for model types (keyed by base_label)
colors = {
    "GMM": "#C70707",
    "OCSVM + Sigmoid": "#121746",
    "OCSVM + Isotonic": "#43D139",
}


def _matching_files(prefix, model_lower, group):
    """Return the files that start with `prefix`, mention the model and group
    (case-insensitively) and are not OCAN files."""
    return [
        f for f in files
        if f.startswith(prefix) and model_lower in f.lower() and group in f.lower() and "OCAN" not in f
    ]


# Store curves as: (label, fpr, tpr, is_minority, base_label)
curves = []

for model, calib in model_order:
    model_lower = model.lower()
    calib_lower = calib.lower()

    # Base label for plotting color consistency
    base_label = model if model == "GMM" else f"{model} + {calib.capitalize()}"

    # Find both majority and minority files
    for group in ["major", "minor"]:
        fpr_file = _matching_files(f"fpr_{calib_lower}", model_lower, group)
        tpr_file = _matching_files(f"tpr_{calib_lower}", model_lower, group)

        if fpr_file and tpr_file:
            # NOTE: pickle.load executes code embedded in the file; only load
            # files from a trusted source (e.g. written by this notebook).
            with open(os.path.join(path, fpr_file[0]), "rb") as f:
                fpr = pickle.load(f)
            with open(os.path.join(path, tpr_file[0]), "rb") as f:
                tpr = pickle.load(f)

            is_minority = "minor" in fpr_file[0].lower()
            # GMM keeps "GMM" as its color key but is labelled with its calibration.
            if base_label == "GMM":
                updated_base_label = "GMM + Sigmoid"
            else:
                updated_base_label = base_label

            label = f"{updated_base_label} ({'Minority' if is_minority else 'Majority'})"
            curves.append((label, fpr, tpr, is_minority, base_label))
        else:
            print(f"[Warning] Missing files for {model} {calib} {group}")

# Plotting: one color per model/calibration, dashed = minority, solid = majority
fig, ax = plt.subplots(figsize=(10, 7))

for label, fpr, tpr, is_minority, base_label in curves:
    linestyle = "dashed" if is_minority else "-"
    color = colors[base_label]
    ax.plot(fpr, tpr, linestyle=linestyle, color=color, label=label, linewidth=2.0)

# Chance diagonal
ax.plot([0, 1], [0, 1], linestyle="--", color="gray", alpha=0.5)

ax.set_xlabel("FPR", fontsize=20)
ax.set_ylabel("TPR", fontsize=20)
# ax.set_title("ROC Curves by Model and Class", fontsize=14)
ax.legend(fontsize=18)
ax.grid(True, linestyle='--', linewidth=0.5)
fig.tight_layout()
plt.show()
