In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, concat_ws, date_format
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from xgboost import XGBClassifier
from sklearn.metrics import roc_auc_score, accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import numpy as np
import joblib
import json
import os
import matplotlib.pyplot as plt
import seaborn as sns

XGBoostError: 
XGBoost Library (libxgboost.dylib) could not be loaded.
Likely causes:
  * OpenMP runtime is not installed
    - vcomp140.dll or libgomp-1.dll for Windows
    - libomp.dylib for Mac OSX
    - libgomp.so for Linux and other UNIX-like OSes
    Mac OSX users: Run `brew install libomp` to install OpenMP runtime.

  * You are running 32-bit Python on a 64-bit OS

Error message(s): ["dlopen(/Users/haleytran/Downloads/MLE_A2-main/env_asm2/lib/python3.11/site-packages/xgboost/lib/libxgboost.dylib, 0x0006): Library not loaded: @rpath/libomp.dylib\n  Referenced from: <B111F8D5-6AC6-3245-A6B5-94693F6992AB> /Users/haleytran/Downloads/MLE_A2-main/env_asm2/lib/python3.11/site-packages/xgboost/lib/libxgboost.dylib\n  Reason: tried: '/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file), '/System/Volumes/Preboot/Cryptexes/OS/opt/homebrew/opt/libomp/lib/libomp.dylib' (no such file)"]


### Setup

In [None]:
# Start Spark
spark = SparkSession.builder \
    .appName("setup") \
    .master("local[*]") \
    .getOrCreate()

# Load gold layer
features = spark.read.parquet("data/gold/feature_store")
labels   = spark.read.parquet("data/gold/label_store")

# Construct loan_id in features
features = features.withColumn(
    "loan_id",
    concat_ws("_", col("Customer_ID"), date_format(col("feature_snapshot_date"), "yyyy_MM_dd"))
)

# Join labels
labeled_df = features.join(
    labels.select("loan_id", "label"),
    on="loan_id",
    how="left"
)

# Filter labeled only
combined_labeled = labeled_df.filter(col("label").isNotNull())

# Save to datamart
combined_labeled.write.mode("overwrite").parquet("data/gold/combined_labeled.parquet")

print("Setup complete: combined_labeled.parquet written.")

### Logistic Regression

In [None]:
# Load prepared labeled dataset
df = spark.read.parquet("data/gold/combined_labeled.parquet")

# Assemble and scale features
non_features = ['Customer_ID', 'feature_snapshot_date', 'loan_id', 'label']
vec_cols = [c for c in df.columns if c.endswith("_vec")]
feature_cols = [c for c in df.columns if c not in non_features + vec_cols]

assembler = VectorAssembler(inputCols=feature_cols, outputCol="raw_features")
assembled = assembler.transform(df)

scaler = StandardScaler(inputCol="raw_features", outputCol="features", withStd=True, withMean=True)
scaler_model = scaler.fit(assembled)
scaled = scaler_model.transform(assembled)

# Split by time
train_df = scaled.filter((col("feature_snapshot_date") >= "2023-01-01") & (col("feature_snapshot_date") <= "2023-12-01"))
val_df   = scaled.filter(col("feature_snapshot_date") == "2024-01-01")
test_df  = scaled.filter(col("feature_snapshot_date") == "2024-02-01")

# Train logistic regression
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=100)
lr_model = lr.fit(train_df)

# Evaluation function
def evaluate_model(predictions):
    evaluator = BinaryClassificationEvaluator(labelCol="label", rawPredictionCol="probability", metricName="areaUnderROC")
    auc = evaluator.evaluate(predictions)

    preds = predictions.withColumn("correct", when(col("prediction") == col("label"), 1).otherwise(0))
    total = preds.count()
    correct = preds.filter(col("correct") == 1).count()
    tp = preds.filter((col("prediction") == 1) & (col("label") == 1)).count()
    fp = preds.filter((col("prediction") == 1) & (col("label") == 0)).count()
    fn = preds.filter((col("prediction") == 0) & (col("label") == 1)).count()

    accuracy = correct / total if total > 0 else 0
    precision = tp / (tp + fp) if (tp + fp) > 0 else 0
    recall    = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1        = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    return {
        "AUC": round(auc, 4),
        "Accuracy": round(accuracy, 4),
        "Precision": round(precision, 4),
        "Recall": round(recall, 4),
        "F1 Score": round(f1, 4)
    }

# Create folder for metrics
metrics_dir = "model_store/logreg_metrics"
os.makedirs(metrics_dir, exist_ok=True)

# Evaluate on splits
for name, split in [("train", train_df), ("validation", val_df), ("test", test_df)]:
    pred = lr_model.transform(split)
    metrics = evaluate_model(pred)
    print(f"\n=== {name.capitalize()} Metrics ===")
    print(metrics)
    
    # Save to corresponding JSON
    metrics_path = os.path.join(metrics_dir, f"logreg_{name}.json")
    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"{name.capitalize()} metrics saved to {metrics_path}")

# Save model (if needed later in selection step)
model_path = "model_store/logistic_model"
if os.path.exists(model_path):
    import shutil
    shutil.rmtree(model_path)
lr_model.save(model_path)
print("\nLogistic regression model saved.")

### XGBoost

In [None]:
# Helper: convert Spark DataFrame to NumPy
def spark_df_to_numpy(df):
    pdf = df.select("features", "label").toPandas()
    X = np.vstack(pdf["features"].values)
    y = pdf["label"].values
    return X, y

X_train, y_train = spark_df_to_numpy(train_df)
X_val, y_val     = spark_df_to_numpy(val_df)
X_test, y_test   = spark_df_to_numpy(test_df)

# Helper: evaluation
def evaluate_model(y_true, y_pred, y_prob):
    return {
        "AUC":       round(roc_auc_score(y_true, y_prob), 4),
        "Accuracy":  round(accuracy_score(y_true, y_pred), 4),
        "Precision": round(precision_score(y_true, y_pred), 4),
        "Recall":    round(recall_score(y_true, y_pred), 4),
        "F1 Score":  round(f1_score(y_true, y_pred), 4)
    }

# Hyperparameter tuning
best_model = None
best_auc = 0
best_n = None

for n in [50, 100, 200]:
    model = XGBClassifier(n_estimators=n, use_label_encoder=False, eval_metric='logloss', random_state=42)
    model.fit(X_train, y_train)
    y_val_prob = model.predict_proba(X_val)[:, 1]
    auc = roc_auc_score(y_val, y_val_prob)
    print(f"n={n} → Val AUC: {auc:.4f}")

    if auc > best_auc:
        best_auc = auc
        best_model = model
        best_n = n

print(f"\nBest XGBoost n_estimators: {best_n}, Validation AUC: {best_auc:.4f}")

# Evaluate on all sets
results = {}
for name, X, y in [("train", X_train, y_train), ("val", X_val, y_val), ("test", X_test, y_test)]:
    y_pred = best_model.predict(X)
    y_prob = best_model.predict_proba(X)[:, 1]
    results[name] = evaluate_model(y, y_pred, y_prob)

# Print & save metrics
metrics_dir = "model_store/xgboost_metrics"
os.makedirs(metrics_dir, exist_ok=True)

for name, metrics in results.items():
    print(f"\n=== {name.capitalize()} Metrics ===")
    print(metrics)

    metrics_path = os.path.join(metrics_dir, f"xgboost_{name}.json")
    with open(metrics_path, "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"{name.capitalize()} metrics saved to {metrics_path}")

# Save model
os.makedirs("model_store", exist_ok=True)
joblib.dump(best_model, "model_store/xgboost_model.pkl")
print("XGBoost model saved to model_store/xgboost_model.pkl")

### Select Best Model

In [None]:
# Load test AUCs from saved metrics
with open("model_store/logreg_metrics/logreg_test.json") as f:
    logreg_metrics = json.load(f)
with open("model_store/xgboost_metrics/xgboost_test.json") as f:
    xgb_metrics = json.load(f)

logreg_auc = logreg_metrics["AUC"]
xgb_auc = xgb_metrics["AUC"]

print("Validation AUCs:")
print(f"Logistic Regression: {logreg_auc}")
print(f"XGBoost: {xgb_auc}")

# Select best model
if xgb_auc > logreg_auc:
    best_model = joblib.load("model_store/xgboost_model.pkl")
    joblib.dump(best_model, "model_store/best_model.pkl")
    print("\nXGBoost selected as best model and saved as best_model.pkl")
else:
    print("\nLogistic Regression has better AUC but cannot be saved with joblib.")
    print("Use PySpark's .load() from model_store/logistic_model when needed.")

### OOT

In [None]:
# Load best model (assumed XGBoost)
model = joblib.load("model_store/best_model.pkl")

# Helper to convert Spark → NumPy
def spark_df_to_numpy(df):
    pdf = df.select("loan_id", "features").toPandas()
    loan_ids = pdf["loan_id"].values
    X = np.vstack(pdf["features"].values)
    return loan_ids, X

# Loop over OOT months
oot_months = ["2024-03-01", "2024-04-01", "2024-05-01", "2024-06-01"]

for month in oot_months:
    df_month = scaled.filter(col("feature_snapshot_date") == month)
    loan_ids, X = spark_df_to_numpy(df_month)

    # Predict probabilities and labels
    y_prob = model.predict_proba(X)[:, 1]
    y_pred = model.predict(X)

    # Create output DataFrame
    pdf_out = {
        "loan_id": loan_ids,
        "prediction": y_pred,
        "probability": y_prob,
        "snapshot_date": [month] * len(loan_ids)
    }

    import pandas as pd
    spark_out = spark.createDataFrame(pd.DataFrame(pdf_out))

    # Save to gold datamart
    output_path = f"datamart/gold/predictions/predictions_oot_{month}.parquet"
    spark_out.write.mode("overwrite").parquet(output_path)
    print(f"✅ Saved predictions for {month} to {output_path}")


### Monitor Stability (PSI)

In [None]:
# Reference = February 2024
ref_df = df.filter(col("feature_snapshot_date") == "2024-02-01").toPandas()

# Feature columns
non_features = ['Customer_ID', 'feature_snapshot_date', 'loan_id', 'label']
vec_cols = [c for c in ref_df.columns if c.endswith("_vec")]
feature_cols = [c for c in ref_df.columns if c not in non_features + vec_cols]

# PSI calculator
def calculate_psi(expected, actual, buckets=10):
    breakpoints = np.percentile(expected, np.linspace(0, 100, buckets + 1))
    expected_bins = np.histogram(expected, bins=breakpoints)[0] / len(expected)
    actual_bins = np.histogram(actual, bins=breakpoints)[0] / len(actual)
    psi = np.sum((expected_bins - actual_bins) * np.log((expected_bins + 1e-6) / (actual_bins + 1e-6)))
    return round(psi, 4)

# Loop through OOT months
monitor_months = ["2024-03-01", "2024-04-01", "2024-05-01", "2024-06-01"]
psi_results = {}

for month in monitor_months:
    current_df = df.filter(col("feature_snapshot_date") == month).toPandas()
    psi_scores = {}

    for feat in feature_cols:
        try:
            psi_scores[feat] = calculate_psi(ref_df[feat].dropna(), current_df[feat].dropna())
        except Exception:
            psi_scores[feat] = None

    # Valid PSI scores only
    valid_psi = {k: v for k, v in psi_scores.items() if v is not None}
    avg_psi = round(np.mean(list(valid_psi.values())), 4)
    max_psi = round(max(valid_psi.values()), 4)

    # Alerting
    drifted_feats = [k for k, v in valid_psi.items() if v > 0.1]

    if avg_psi > 0.25:
        alert = "🔴 Investigate & consider retraining (high average PSI)"
        print(f"\n{month} — {alert}")
        print(f"Avg PSI: {avg_psi}")
        print("Drifted features (PSI > 0.1):", drifted_feats)

    elif avg_psi > 0.1:
        alert = "🟡 Monitor more closely (elevated average PSI)"
        print(f"\n{month} — {alert}")
        print(f"Avg PSI: {avg_psi}")
        print("Drifted features (PSI > 0.1):", drifted_feats)

    elif max_psi > 0.1:
        alert = "🟡 Monitor individual features (some drift)"
        print(f"\n{month} — {alert}")
        print(f"Max PSI: {max_psi}")
        print("Drifted features (PSI > 0.1):", drifted_feats)

    else:
        alert = "🟢 All clear (PSI stable)"
        print(f"\n{month} — {alert}")

    psi_results[month] = {
        "average_psi": avg_psi,
        "max_psi": max_psi,
        "drifted_features": drifted_feats,
        "alert": alert,
        "feature_psi": valid_psi
    }

# Save one JSON file per month
output_dir = "datamart/gold/monitoring/stability"
os.makedirs(output_dir, exist_ok=True)

for month, result in psi_results.items():
    filename = f"{output_dir}/stability_{month}.json"
    with open(filename, "w") as f:
        json.dump(result, f, indent=2)

print("\nStability (PSI) monitoring complete.")

### Monitor Performance (Metrics)

In [None]:
# Load baseline test metrics (from model_store)
with open("model_store/xgboost_metrics.json") as f:
    baseline = json.load(f)

print("\nBaseline (Test Set / February 2024) Metrics")
print(baseline)

# Load labels
labels_df = pd.read_parquet("data/gold/combined_labeled.parquet")[["loan_id", "label"]]

# Evaluation function
def evaluate(y_true, y_pred, y_prob):
    return {
        "AUC":       round(roc_auc_score(y_true, y_prob), 4),
        "Accuracy":  round(accuracy_score(y_true, y_pred), 4),
        "Precision": round(precision_score(y_true, y_pred), 4),
        "Recall":    round(recall_score(y_true, y_pred), 4),
        "F1 Score":  round(f1_score(y_true, y_pred), 4)
    }

# Percentage change calculator
def percent_change(current, reference):
    return round(((current - reference) / reference) * 100, 2)

# Store results and alerts
results = {}

for month in ["2024-03-01", "2024-04-01", "2024-05-01", "2024-06-01"]:
    path = f"datamart/gold/predictions/predictions_oot_{month}.parquet"
    if os.path.exists(path):
        preds_df = pd.read_parquet(path)
        merged = preds_df.merge(labels_df, on="loan_id", how="left")

        if merged["label"].isnull().any():
            print(f"Warning: Missing labels in {month} predictions")

        y_true = merged["label"].values
        y_pred = merged["prediction"].values
        y_prob = merged["probability"].values

        # Evaluate metrics
        current_metrics = evaluate(y_true, y_pred, y_prob)
        print(f"\n=== {month} Metrics ===")
        print(current_metrics)

        # Compare against baseline
        changes = {k: percent_change(current_metrics[k], baseline[k]) for k in baseline}
        dropped = {k: v for k, v in changes.items() if v < -5}
        severe_drops = {k: v for k, v in changes.items() if v < -10}

        # Determine alert level
        if severe_drops:
            alert = "🔴 Significant performance drop"
        elif dropped:
            alert = "🟡 Moderate drop, monitor"
        else:
            alert = "🟢 All clear"

        print(f"% Change vs Test Set: {changes}")
        print(f"Alert: {alert}")
        if dropped:
            print("Metrics with drops:", dropped)

        # Store
        results[month] = {
            "metrics": current_metrics,
            "percentage_change": changes,
            "dropped_metrics": dropped,
            "alert": alert
        }

# Save one JSON file per month
output_dir = "datamart/gold/monitoring/performance"
os.makedirs(output_dir, exist_ok=True)

for month, result in results.items():
    filename = f"{output_dir}/performance_{month}.json"
    with open(filename, "w") as f:
        json.dump(result, f, indent=2)

print("\nPerformance monitoring complete.")

### Visualise Metrics

In [None]:
# Load monthly PSI JSONs
stability_dir = "datamart/gold/monitoring/stability"
psi_data = {}

for file in os.listdir(stability_dir):
    if file.endswith(".json"):
        month = file.replace("stability_", "").replace(".json", "")
        with open(os.path.join(stability_dir, file)) as f:
            psi_data[month] = json.load(f)

psi_df = pd.DataFrame.from_dict(
    {month: data["average_psi"] for month, data in psi_data.items()},
    orient='index', columns=["Average PSI"]
).sort_index()
psi_df.index.name = "Month"

# Load monthly performance JSONs
performance_dir = "datamart/gold/monitoring/performance"
perf_data = {}

for file in os.listdir(performance_dir):
    if file.endswith(".json"):
        month = file.replace("performance_", "").replace(".json", "")
        with open(os.path.join(performance_dir, file)) as f:
            perf_data[month] = json.load(f)

metrics = ["AUC", "Accuracy", "Precision", "Recall", "F1 Score"]
perf_df = pd.DataFrame.from_dict(
    {month: data["metrics"] for month, data in perf_data.items()},
    orient='index'
)[metrics].sort_index()
perf_df.index.name = "Month"

# Plotting
output_dir = "datamart/gold/monitoring"

# PSI Trend Plot
plt.figure(figsize=(8, 6))
sns.lineplot(data=psi_df, x=psi_df.index, y="Average PSI", marker="o")
plt.title("Stability: Average PSI Mar-Jun 2024", fontsize=16)
plt.ylabel("PSI", fontsize=14)
plt.xlabel("Month", fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "psi_trend_mar_to_jun.png"))
plt.show()

# Performance Metrics Plot
plt.figure(figsize=(8, 6))
for metric in metrics:
    sns.lineplot(data=perf_df, x=perf_df.index, y=metric, marker="o", label=metric)
plt.title("Performance Metrics Mar-Jun 2024", fontsize=16)
plt.ylabel("Score", fontsize=14)
plt.xlabel("Month", fontsize=14)
plt.xticks(fontsize=12)
plt.yticks(fontsize=12)
plt.legend(fontsize=12)
plt.grid(True)
plt.tight_layout()
plt.savefig(os.path.join(output_dir, "performance_trends_mar_to_jun.png"))
plt.show()