## XGBoost Model Training and SHAP-Based Evaluation for Loan Default Prediction

This notebook trains an XGBoost model to predict loan defaults using tabular data. It includes hyperparameter tuning, performance evaluation (AUC, F1, precision, recall), and SHAP-based model interpretability. Key outputs like plots, metrics, and explanations are saved for analysis.

Note: This script is intended for academic reference only.

In [None]:
import pandas as pd
import numpy as np
import joblib

import xgboost as xgb

import os
import random

from sklearn.model_selection import GridSearchCV
from sklearn.metrics import (
    roc_auc_score,
    accuracy_score,
    precision_score,
    average_precision_score,
    recall_score,
    confusion_matrix,
    ConfusionMatrixDisplay,
    precision_recall_curve,
    roc_curve,
    classification_report
)
from sklearn.utils import resample

import matplotlib.pyplot as plt

MODEL_FILE_NAME = 'trained_xgb_model.pkl'
MODEL_SAVE_DIR = "./"
MODEL_PATH = os.path.join(MODEL_SAVE_DIR, MODEL_FILE_NAME)
EVAL_SAVE_DIR = '../evaluations/xgb_base_model_performance/'

os.makedirs(EVAL_SAVE_DIR, exist_ok=True)

def load_data():

    train = pd.read_csv('../data/cleaned_data/df_origination_train_scaled.csv')
    test = pd.read_csv('../data/cleaned_data/df_origination_test_scaled.csv')
    exclude_cols = ['id', 'id_loan', 'year', 'month', 'provider', 'area', 'svcg_cycle']
    target_col = 'default'
    
    if target_col not in train.columns and 'loan_defaulted' in train.columns:
        target_col = 'loan_defaulted'
    
    if 'd_timer' in train.columns:
        exclude_cols.append('d_timer')

    X_train = train.drop(columns=[col for col in exclude_cols if col in train.columns] + [target_col])
    y_train = train[target_col]
    print(f"Target variable distribution in training data:\n{y_train.value_counts(normalize=True)}")

    X_test = test.drop(columns=[col for col in exclude_cols if col in test.columns] + [target_col])
    y_test = test[target_col]

    print("Data loaded successfully.")
    print(f"Features: {X_train.shape[1]}")
    print(f"Feature names: {', '.join(X_train.columns[:5])}... (and {X_train.shape[1]-5} more)")
    return X_train, y_train, X_test, y_test

def train_model(X_train, y_train):

    neg_count = (y_train == 0).sum()
    pos_count = (y_train == 1).sum()
    scale_pos_weight_value = neg_count / pos_count

    print(f"Calculated scale_pos_weight: {scale_pos_weight_value:.2f}")

    model = xgb.XGBClassifier(
        objective='binary:logistic',
        eval_metric='auc',
        random_state=42,
        gamma=0.001,
        n_jobs=-1,
    )

    param_grid = {
        'n_estimators': [50, 100, 150],
        'max_depth': [2, 3, 4],
        'learning_rate': [0.01, 0.1, 0.2],
        'subsample': [0.8, 1.0],
        'colsample_bytree': [0.8, 1.0],
        'scale_pos_weight': [1, 2, 3, 5, 8, 10, scale_pos_weight_value]
    }

    grid_search = GridSearchCV(
        estimator=model,
        param_grid=param_grid,
        scoring='roc_auc',
        cv=5,
        n_jobs=-1,
        verbose=1,
    )

    print("Starting GridSearchCV for XGBoost model...")
    grid_search.fit(X_train, y_train)
    
    print(f"Best parameters found: {grid_search.best_params_}")
    print(f"Best cross-validation AUC: {grid_search.best_score_:.4f}")

    return grid_search.best_estimator_

def optimize_f1_threshold(model, X_test, y_test, eval_save_dir):

    y_probs = model.predict_proba(X_test)[:, 1]
    precision, recall, thresholds = precision_recall_curve(y_test, y_probs)
    f1_scores = 2 * (precision * recall) / (precision + recall + 1e-8)
    best_idx = np.argmax(f1_scores)
    best_threshold = thresholds[best_idx]
    best_f1 = f1_scores[best_idx]

    print(f"\n### F1-score Optimization ###")
    print(f"Optimized F1-score: {best_f1:.4f} at threshold: {best_threshold:.4f}")

    plt.figure(figsize=(8, 6))
    plt.plot(thresholds, f1_scores[:-1], label="F1-score")
    plt.axvline(best_threshold, color='red', linestyle='--', label=f'Best threshold = {best_threshold:.4f}')
    plt.xlabel("Threshold")
    plt.ylabel("F1-score")
    plt.title("F1-score vs. Classification Threshold")
    plt.legend()
    plt.grid(True)
    plt.savefig(os.path.join(eval_save_dir, 'f1_threshold_plot.png'))
    plt.close()
    print(f"F1-score vs. Threshold plot saved to {os.path.join(eval_save_dir, 'f1_threshold_plot.png')}")

    y_pred_optimized = (y_probs >= best_threshold).astype(int)
    print("\n### Classification Report at Optimized Threshold ###")
    print(classification_report(y_test, y_pred_optimized, target_names=['No Default', 'Default']))

    return best_threshold, best_f1

def evaluate_model_metrics_and_plots(model, X_test, y_test, threshold, eval_save_dir):

    y_proba = model.predict_proba(X_test)[:, 1]
    y_pred = (y_proba >= threshold).astype(int)
    auc = roc_auc_score(y_test, y_proba)
    acc = accuracy_score(y_test, y_pred)
    prec = precision_score(y_test, y_pred, zero_division=0)
    rec = recall_score(y_test, y_pred, zero_division=0)
    f1 = 2 * (prec * rec) / (prec + rec + 1e-8)

    print(f"\n### Test Performance (Threshold = {threshold:.2f}) ###")
    print(f"AUC:         {auc:.4f}")
    print(f"Accuracy:    {acc:.4f}")
    print(f"Precision:   {prec:.4f}")
    print(f"Recall:      {rec:.4f}")
    print(f"F1-Score:    {f1:.4f}")

    cm = confusion_matrix(y_test, y_pred)
    disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=['No Default', 'Default'])
    fig, ax = plt.subplots(figsize=(6, 6))
    disp.plot(ax=ax, cmap='Blues')
    plt.title(f"Confusion Matrix (Threshold = {threshold:.2f})")
    plt.savefig(os.path.join(eval_save_dir, 'confusion_matrix.png'))
    plt.close()
    print(f"Confusion Matrix saved to {os.path.join(eval_save_dir, 'confusion_matrix.png')}")

    fpr, tpr, _ = roc_curve(y_test, y_proba)
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'AUC = {auc:.4f}')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('ROC Curve')
    plt.legend(loc="lower right")
    plt.grid(True)
    plt.savefig(os.path.join(eval_save_dir, 'roc_curve.png'))
    plt.close()
    print(f"ROC curve saved to {os.path.join(eval_save_dir, 'roc_curve.png')}")

    precision, recall, _ = precision_recall_curve(y_test, y_proba)
    pr_auc = average_precision_score(y_test, y_proba)
    plt.figure(figsize=(8, 6))
    plt.plot(recall, precision, color='purple', lw=2, label=f'PR AUC = {pr_auc:.4f}')
    plt.xlabel('Recall')
    plt.ylabel('Precision')
    plt.title('Precision-Recall Curve')
    plt.grid(True)
    plt.legend()
    plt.savefig(os.path.join(eval_save_dir, 'precision_recall_curve.png'))
    plt.close()
    print(f"Precision-Recall curve saved to {os.path.join(eval_save_dir, 'precision_recall_curve.png')}")

def bootstrap_evaluation(model, X_test, y_test, n_iterations=1000):

    auc_scores = []
    print(f"\nStarting bootstrap evaluation ({n_iterations} iterations)...")
    for i in range(n_iterations):
        X_resampled, y_resampled = resample(X_test, y_test, random_state=i)
        y_proba = model.predict_proba(X_resampled)[:, 1]
        auc_scores.append(roc_auc_score(y_resampled, y_proba))

    mean_auc = np.mean(auc_scores)
    ci_lower = np.percentile(auc_scores, 2.5)
    ci_upper = np.percentile(auc_scores, 97.5)

    print(f"\n### Bootstrap Results ({n_iterations} iterations) ###")
    print(f"Mean AUC: {mean_auc:.4f}")
    print(f"95% CI: [{ci_lower:.4f}, {ci_upper:.4f}]")

def get_top_shap_features(model, X_data, top_k=10):

    explainer = shap.TreeExplainer(model)
    shap_values = explainer.shap_values(X_data)
    if isinstance(shap_values, list):
        shap_values = shap_values[1]

    mean_abs_shap = np.abs(shap_values).mean(axis=0)

    feature_importance = pd.DataFrame({
        'feature': X_data.columns,
        'importance': mean_abs_shap
    }).sort_values(by='importance', ascending=False)

    return feature_importance.head(top_k).to_dict(orient='records')

def shap_analysis_global(model, X_train, eval_save_dir):

    explainer = shap.TreeExplainer(model)
    shap_values_train = explainer.shap_values(X_train)

    if isinstance(shap_values_train, list):
        shap_values_train = shap_values_train[1]

    print("\nGenerating Global SHAP plots...")

    shap.summary_plot(shap_values_train, X_train, plot_type="bar", show=False)
    plt.title("Global Feature Importance (Mean Absolute SHAP)")
    plt.savefig(os.path.join(eval_save_dir, 'shap_global_bar.png'), bbox_inches='tight')
    plt.close()
    print(f"SHAP global bar plot saved to {os.path.join(eval_save_dir, 'shap_global_bar.png')}")

    shap.summary_plot(shap_values_train, X_train, show=False)
    plt.title("SHAP Feature Impact Summary")
    plt.savefig(os.path.join(eval_save_dir, 'shap_summary_beeswarm.png'), bbox_inches='tight')
    plt.close()
    print(f"SHAP summary beeswarm plot saved to {os.path.join(eval_save_dir, 'shap_summary_beeswarm.png')}")

def export_shap_insights_for_single_borrower(model, X_test, y_test, optimized_threshold, num_top_features=5):

    explainer = shap.TreeExplainer(model)
    random_idx = random.choice(X_test.index.tolist())
    single_instance = X_test.loc[[random_idx]]
    single_prediction_shap_values = explainer.shap_values(single_instance)

    if isinstance(single_prediction_shap_values, list):
        single_prediction_shap_values = single_prediction_shap_values[1]

    single_shap_dict = {
        'index': int(random_idx),
        'predicted_proba': float(model.predict_proba(single_instance)[:, 1][0]),
        'actual_label': int(y_test.loc[random_idx]),
        'top_features_impact': {}
    }

    feature_impact = pd.DataFrame({
        'feature': X_test.columns,
        'shap_value': single_prediction_shap_values[0]
    }).sort_values(by='shap_value', key=abs, ascending=False)

    for _, row in feature_impact.head(num_top_features).iterrows():
        single_shap_dict['top_features_impact'][row['feature']] = float(row['shap_value'])

    prediction_outcome = "Default" if single_shap_dict['predicted_proba'] >= optimized_threshold else "No Default"
    actual_outcome = "Default" if single_shap_dict['actual_label'] == 1 else "No Default"

    print(f"\n### Selected Borrower SHAP Insights (Random Sample) ###")
    print(f"- Loan Index: {single_shap_dict['index']}")
    print(f"- Predicted Outcome: {prediction_outcome} (Probability: {single_shap_dict['predicted_proba']:.4f})")
    print(f"- Actual Outcome: {actual_outcome}")
    print(f"- Top {num_top_features} Contributing Factors:")
    for feat, val in single_shap_dict['top_features_impact'].items():
        print(f"-> {feat}: {val:.4f}")

    return single_shap_dict

if __name__ == "__main__":
    
    print("### Starting XGBoost Base Model Training and Evaluation Workflow ###")

    X_train, y_train, X_test, y_test = load_data()

    model = train_model(X_train, y_train)
    print("XGBoost model training complete.")

    joblib.dump(model, MODEL_PATH)
    print(f"Model saved to {MODEL_PATH}")

    print("\n### Starting Model Evaluation and Analysis ###")

    best_threshold, _ = optimize_f1_threshold(model, X_test, y_test, EVAL_SAVE_DIR)
    print(f"F1-score optimization complete. Best Threshold: {best_threshold:.4f}")

    evaluate_model_metrics_and_plots(model, X_test, y_test, threshold=best_threshold, eval_save_dir=EVAL_SAVE_DIR)
    print("Core model evaluation complete and plots saved.")

    bootstrap_evaluation(model, X_test, y_test)
    print("Bootstrap evaluation complete.")

    shap_analysis_global(model, X_train, EVAL_SAVE_DIR)
    print("Global SHAP plots generated and saved.")

    single_borrower_insights = export_shap_insights_for_single_borrower(model, X_test, y_test, best_threshold)
    print("SHAP insights for a single borrower exported.")