# Risk Score Analysis - Model Development

This notebook demonstrates the process of developing and evaluating machine learning models for risk scoring. We'll explore different algorithms, hyperparameters, and evaluation metrics to find the best model for our risk scoring pipeline.

In [None]:
# Import required libraries
import logging
import os
from datetime import datetime

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

# PySpark imports
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression, RandomForestClassifier, GBTClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.functions import vector_to_array
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when, count, rand

# Scikit-learn and XGBoost imports
import xgboost as xgb
from sklearn.ensemble import RandomForestClassifier as SklearnRF
from sklearn.linear_model import LogisticRegression as SklearnLR
from sklearn.metrics import (
    accuracy_score,
    auc,
    confusion_matrix,
    f1_score,
    precision_recall_curve,
    precision_score,
    recall_score,
    roc_auc_score,
    roc_curve,
)

# Configure matplotlib
%matplotlib inline
plt.style.use('seaborn-whitegrid')
plt.rcParams['figure.figsize'] = (12, 8)

# Set up logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

In [None]:
# Import project modules
import sys
# Add the parent directory to the module search path so the `src` package can be
# imported (presumably the notebook sits one level below the project root — confirm).
sys.path.append('..')

# Configuration constants used throughout this notebook
from src.config import (
    DELTA_TABLES,
    MODEL_PATH,
    RANDOM_SEED,
    TRAIN_TEST_SPLIT_RATIO,
    VALIDATION_SPLIT_RATIO,
    TARGET_COLUMN,
    CURRENT_DATE
)
# Helpers for the Spark session, Delta reads, MLflow setup and model persistence
from src.utils import (
    get_spark_session, 
    read_delta_table,
    setup_mlflow,
    save_model_artifacts
)
# Feature engineering entry point (builds and prepares the modeling features)
from src.feature_engineering import FeatureEngineer

In [None]:
# Obtain the Spark session from the project helper and report its version
spark = get_spark_session()
print("Spark version: {}".format(spark.version))

# Configure MLflow experiment tracking through the project helper
setup_mlflow()

## 1. Load and Prepare Data

Load the feature-engineered data from Delta Lake and prepare it for model training.

In [None]:
# Read the feature table registered under DELTA_TABLES["feature_table"]
feature_df = read_delta_table(spark, DELTA_TABLES["feature_table"])

# Nothing usable came back (None or zero rows): rebuild the features instead
feature_table_missing = feature_df is None or feature_df.rdd.isEmpty()
if feature_table_missing:
    print("Feature table is empty or does not exist. Running feature engineering pipeline.")
    feature_engineer = FeatureEngineer(spark)
    feature_df = feature_engineer.run_feature_engineering_pipeline()

# Report the size of what we ended up with
record_count = feature_df.count()
column_count = len(feature_df.columns)
print(f"Loaded {record_count} records with {column_count} features")

# Column names and types, for reference
feature_df.printSchema()

In [None]:
# Turn the feature table into a model-ready dataframe; the helper also returns
# the fitted preprocessing pipeline, which is saved next to the model later on.
feature_engineer = FeatureEngineer(spark)
prepared_df, pipeline_model = feature_engineer.prepare_features_for_modeling(feature_df)

prepared_count = prepared_df.count()
print(f"Prepared dataframe has {prepared_count} records")

# Peek at the first few assembled feature vectors
features_preview = prepared_df.select("features")
features_preview.show(5, truncate=True)

In [None]:
# Split data into training, validation, and test sets.
# TRAIN_TEST_SPLIT_RATIO is used as the test fraction of the whole dataset and
# VALIDATION_SPLIT_RATIO as the validation fraction of the whole dataset; the
# second split therefore rescales the validation share to the train+val subset.
if not 0 < TRAIN_TEST_SPLIT_RATIO < 1:
    raise ValueError(
        f"TRAIN_TEST_SPLIT_RATIO must be in (0, 1), got {TRAIN_TEST_SPLIT_RATIO}"
    )

train_ratio = 1 - (VALIDATION_SPLIT_RATIO / (1 - TRAIN_TEST_SPLIT_RATIO))
if not 0 < train_ratio < 1:
    # A non-positive or >= 1 weight would leave the train or validation set empty
    # (or make randomSplit reject the weights) and break every later cell.
    raise ValueError(
        "VALIDATION_SPLIT_RATIO and TRAIN_TEST_SPLIT_RATIO leave no room for a "
        f"training/validation split (derived train weight = {train_ratio})"
    )

# First split: train + validation and test
train_val_df, test_df = prepared_df.randomSplit(
    [1 - TRAIN_TEST_SPLIT_RATIO, TRAIN_TEST_SPLIT_RATIO],
    seed=RANDOM_SEED
)

# Second split: train and validation
train_df, val_df = train_val_df.randomSplit(
    [train_ratio, 1 - train_ratio],
    seed=RANDOM_SEED
)

# The three splits are reused by every cross-validation fold, every evaluation
# and the pandas conversions below. Caching avoids recomputing the whole
# upstream lineage each time and keeps the split membership stable across
# actions (best effort: Spark may still evict and recompute cached partitions).
train_df = train_df.cache()
val_df = val_df.cache()
test_df = test_df.cache()

print(f"Data split: Train {train_df.count()}, Validation {val_df.count()}, Test {test_df.count()}")

## 2. Model Development and Evaluation

In this section, we'll train and evaluate different models for risk scoring.

### 2.1 Logistic Regression

In [None]:
# Base logistic regression estimator; regParam, elasticNetParam and maxIter set
# here are overridden by the grid search below.
lr_settings = dict(
    featuresCol="features",
    labelCol=TARGET_COLUMN,
    predictionCol="prediction",
    maxIter=10,
    regParam=0.1,
    elasticNetParam=0.8,
    standardization=True,
    family="binomial",
)
lr = LogisticRegression(**lr_settings)

# Hyperparameter grid: 3 x 4 x 3 = 36 candidate settings
grid_builder = ParamGridBuilder()
grid_builder.addGrid(lr.regParam, [0.01, 0.1, 0.3])
grid_builder.addGrid(lr.elasticNetParam, [0.0, 0.5, 0.8, 1.0])
grid_builder.addGrid(lr.maxIter, [5, 10, 20])
paramGrid = grid_builder.build()

# ROC AUC evaluator; later cells reuse it for the other Spark models
evaluator = BinaryClassificationEvaluator(
    rawPredictionCol="rawPrediction",
    labelCol=TARGET_COLUMN,
    metricName="areaUnderROC",
)

# 3-fold cross-validation over the grid
cv = CrossValidator(
    estimator=lr,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3,
    seed=RANDOM_SEED,
)

print("Training logistic regression model...")
cv_model = cv.fit(train_df)

# Keep the best model found by cross-validation
lr_model = cv_model.bestModel

# Report the winning hyperparameters
print("Best model parameters:")
lr_best_settings = (
    ("Regularization parameter", lr_model.getRegParam()),
    ("Elastic Net parameter", lr_model.getElasticNetParam()),
    ("Max iterations", lr_model.getMaxIter()),
)
for setting_label, setting_value in lr_best_settings:
    print(f"  {setting_label}: {setting_value}")

In [None]:
# Score the validation set with the tuned logistic regression
lr_predictions = lr_model.transform(val_df)
lr_auc = evaluator.evaluate(lr_predictions)

# Label-based metrics come from a multiclass evaluator (shared with later cells);
# the precision/recall/F1 reported here are the class-weighted variants.
multi_evaluator = MulticlassClassificationEvaluator(
    labelCol=TARGET_COLUMN,
    predictionCol="prediction",
)

lr_accuracy, lr_precision, lr_recall, lr_f1 = (
    multi_evaluator.setMetricName(metric_name).evaluate(lr_predictions)
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print("Logistic Regression Evaluation Metrics:")
lr_report = (
    ("AUC", lr_auc),
    ("Accuracy", lr_accuracy),
    ("Precision", lr_precision),
    ("Recall", lr_recall),
    ("F1 Score", lr_f1),
)
for metric_label, metric_value in lr_report:
    print(f"  {metric_label}: {metric_value:.4f}")

### 2.2 Random Forest

In [None]:
# Train Random Forest model.
# numTrees and maxDepth set here are overridden by the grid search below.
rf = RandomForestClassifier(
    featuresCol="features",
    labelCol=TARGET_COLUMN,
    predictionCol="prediction",
    numTrees=100,
    maxDepth=10,
    maxBins=32,
    minInstancesPerNode=1,
    seed=RANDOM_SEED
)

# Define parameter grid for tuning (2 x 3 = 6 candidate settings)
paramGrid = ParamGridBuilder() \
    .addGrid(rf.numTrees, [50, 100]) \
    .addGrid(rf.maxDepth, [5, 10, 15]) \
    .build()

# Create cross-validator
cv = CrossValidator(
    estimator=rf,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,  # reuse evaluator from above
    numFolds=3,
    seed=RANDOM_SEED
)

# Train model with cross-validation
print("Training random forest model...")
cv_model = cv.fit(train_df)

# Get best model
rf_model = cv_model.bestModel

# On fitted PySpark tree-ensemble models `getNumTrees` is exposed as a property
# (an int), so calling it like a getter raises "TypeError: 'int' object is not
# callable". Read it in a way that works whether it is a property or a method.
_rf_num_trees_attr = rf_model.getNumTrees
rf_num_trees = _rf_num_trees_attr() if callable(_rf_num_trees_attr) else _rf_num_trees_attr

# Get parameters of the best model
print("Best model parameters:")
print(f"  Number of trees: {rf_num_trees}")
print(f"  Max depth: {rf_model.getMaxDepth()}")

In [None]:
# Score the validation set with the tuned random forest
rf_predictions = rf_model.transform(val_df)
rf_auc = evaluator.evaluate(rf_predictions)

# Same class-weighted label metrics as reported for logistic regression
rf_accuracy, rf_precision, rf_recall, rf_f1 = (
    multi_evaluator.setMetricName(metric_name).evaluate(rf_predictions)
    for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
)

print("Random Forest Evaluation Metrics:")
rf_report = (
    ("AUC", rf_auc),
    ("Accuracy", rf_accuracy),
    ("Precision", rf_precision),
    ("Recall", rf_recall),
    ("F1 Score", rf_f1),
)
for metric_label, metric_value in rf_report:
    print(f"  {metric_label}: {metric_value:.4f}")

In [None]:
# Importance vector of the tuned random forest; entries are positional, i.e.
# they follow the slot order of the assembled "features" vector.
feature_importances = rf_model.featureImportances
print("Feature importances:", feature_importances, sep="\n")

# TODO: If feature names are available, map feature importances to feature names

### 2.3 XGBoost

In [None]:
# XGBoost trains on in-memory arrays, so collect the needed columns to pandas.
# NOTE: toPandas() pulls the full train/validation sets onto the driver.
modeling_columns = ["features", TARGET_COLUMN]
train_pd = train_df.select(*modeling_columns).toPandas()
val_pd = val_df.select(*modeling_columns).toPandas()

# Expand each Spark ML vector into a dense row to build 2-D feature matrices
X_train = np.array(train_pd["features"].map(lambda vec: vec.toArray()).tolist())
X_val = np.array(val_pd["features"].map(lambda vec: vec.toArray()).tolist())

# Targets as plain numpy arrays
y_train = train_pd[TARGET_COLUMN].values
y_val = val_pd[TARGET_COLUMN].values

print(f"XGBoost training data: {X_train.shape[0]} samples with {X_train.shape[1]} features")

In [None]:
# Train XGBoost model
print("Training XGBoost model...")
xgb_params = {
    "objective": "binary:logistic",
    "eval_metric": "auc",
    "max_depth": 6,
    "learning_rate": 0.1,
    "n_estimators": 100,
    "subsample": 0.8,
    "colsample_bytree": 0.8,
    "seed": RANDOM_SEED,
    # Early stopping is configured on the estimator: passing
    # `early_stopping_rounds` to fit() was deprecated in xgboost 1.6 and removed
    # in 2.0, where it raises a TypeError.
    "early_stopping_rounds": 10
}

xgb_model = xgb.XGBClassifier(**xgb_params)
# Training stops once validation AUC has not improved for 10 consecutive rounds.
# NOTE(review): the validation set drives early stopping AND is used below to
# score the model, so XGBoost's validation metrics are mildly optimistic.
xgb_model.fit(
    X_train, y_train,
    eval_set=[(X_val, y_val)],
    verbose=True
)

In [None]:
# Evaluate XGBoost model on validation set
y_pred_proba = xgb_model.predict_proba(X_val)[:, 1]  # probability of class 1
y_pred = xgb_model.predict(X_val)

# Calculate metrics.
# The Spark models above report class-weighted precision / recall / F1
# ("weightedPrecision", "weightedRecall", "f1"), so use average="weighted" here
# as well; otherwise the comparison table would mix weighted and
# positive-class-only numbers.
xgb_auc = roc_auc_score(y_val, y_pred_proba)
xgb_accuracy = accuracy_score(y_val, y_pred)
xgb_precision = precision_score(y_val, y_pred, average="weighted", zero_division=0)
xgb_recall = recall_score(y_val, y_pred, average="weighted", zero_division=0)
xgb_f1 = f1_score(y_val, y_pred, average="weighted", zero_division=0)

print("XGBoost Evaluation Metrics:")
print(f"  AUC: {xgb_auc:.4f}")
print(f"  Accuracy: {xgb_accuracy:.4f}")
print(f"  Precision: {xgb_precision:.4f}")
print(f"  Recall: {xgb_recall:.4f}")
print(f"  F1 Score: {xgb_f1:.4f}")

In [None]:
# Plot feature importances from XGBoost (top 20 features).
# plot_importance() creates its own figure when no axes are passed, which left
# the explicitly sized figure empty; hand it the axes we created instead.
fig, ax = plt.subplots(figsize=(12, 8))
xgb.plot_importance(xgb_model, max_num_features=20, ax=ax)
ax.set_title('XGBoost Feature Importances')
fig.tight_layout()
plt.show()

## 3. Model Comparison and Selection

Compare the performance of different models and select the best one for risk scoring.

In [None]:
# Gather each model's validation scores, in a fixed metric order
metric_names = ('auc', 'accuracy', 'precision', 'recall', 'f1')
validation_scores = {
    'Logistic Regression': (lr_auc, lr_accuracy, lr_precision, lr_recall, lr_f1),
    'Random Forest': (rf_auc, rf_accuracy, rf_precision, rf_recall, rf_f1),
    'XGBoost': (xgb_auc, xgb_accuracy, xgb_precision, xgb_recall, xgb_f1),
}
models = {
    model_label: dict(zip(metric_names, scores))
    for model_label, scores in validation_scores.items()
}

# One row per model, one column per metric
model_comparison = pd.DataFrame(models).T
print("Model Comparison on Validation Set:")
display(model_comparison)

# Plot model comparison.
# DataFrame.plot() opens a new figure unless it is given axes, so the previous
# plt.figure(figsize=...) call produced an empty figure and its size was
# ignored. Create the axes explicitly and draw on them.
fig, ax = plt.subplots(figsize=(12, 8))
model_comparison.plot(kind='bar', ax=ax)
ax.set_title('Model Comparison on Validation Set')
ax.set_ylabel('Score')
ax.set_ylim(0, 1)  # all metrics are in [0, 1]
ax.grid(axis='y')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')  # legend outside the axes
fig.tight_layout()
plt.show()

In [None]:
# The model with the highest validation AUC wins
validation_auc = model_comparison['auc']
best_model_name = validation_auc.idxmax()
best_auc = validation_auc[best_model_name]

print(f"Best model based on AUC: {best_model_name} with AUC = {best_auc:.4f}")

## 4. Final Evaluation on Test Set

Evaluate every candidate model on the holdout test set to get a final assessment of model performance.

In [None]:
# Evaluate models on test set
print("Evaluating models on test set...")


def _evaluate_spark_predictions(predictions):
    """Return (auc, accuracy, precision, recall, f1) for a Spark predictions frame.

    AUC comes from the shared binary `evaluator`; the remaining metrics are the
    class-weighted variants computed by the shared `multi_evaluator`.
    """
    label_metrics = [
        multi_evaluator.setMetricName(metric_name).evaluate(predictions)
        for metric_name in ("accuracy", "weightedPrecision", "weightedRecall", "f1")
    ]
    return (evaluator.evaluate(predictions), *label_metrics)


# Evaluate Logistic Regression
lr_test_predictions = lr_model.transform(test_df)
(lr_test_auc, lr_test_accuracy, lr_test_precision,
 lr_test_recall, lr_test_f1) = _evaluate_spark_predictions(lr_test_predictions)

# Evaluate Random Forest
rf_test_predictions = rf_model.transform(test_df)
(rf_test_auc, rf_test_accuracy, rf_test_precision,
 rf_test_recall, rf_test_f1) = _evaluate_spark_predictions(rf_test_predictions)

# Evaluate XGBoost (needs the test split as dense numpy arrays)
test_pd = test_df.select("features", TARGET_COLUMN).toPandas()
X_test = np.array([x.toArray() for x in test_pd["features"]])
y_test = test_pd[TARGET_COLUMN].values

xgb_test_pred_proba = xgb_model.predict_proba(X_test)[:, 1]  # probability of class 1
xgb_test_pred = xgb_model.predict(X_test)

# average="weighted" matches the class-weighted metrics reported for the Spark
# models, so the test comparison table compares like with like.
xgb_test_auc = roc_auc_score(y_test, xgb_test_pred_proba)
xgb_test_accuracy = accuracy_score(y_test, xgb_test_pred)
xgb_test_precision = precision_score(y_test, xgb_test_pred, average="weighted", zero_division=0)
xgb_test_recall = recall_score(y_test, xgb_test_pred, average="weighted", zero_division=0)
xgb_test_f1 = f1_score(y_test, xgb_test_pred, average="weighted", zero_division=0)

In [None]:
# Gather each model's test scores, in a fixed metric order
test_metric_names = ('auc', 'accuracy', 'precision', 'recall', 'f1')
test_scores = {
    'Logistic Regression': (
        lr_test_auc, lr_test_accuracy, lr_test_precision, lr_test_recall, lr_test_f1
    ),
    'Random Forest': (
        rf_test_auc, rf_test_accuracy, rf_test_precision, rf_test_recall, rf_test_f1
    ),
    'XGBoost': (
        xgb_test_auc, xgb_test_accuracy, xgb_test_precision, xgb_test_recall, xgb_test_f1
    ),
}
test_metrics = {
    model_label: dict(zip(test_metric_names, scores))
    for model_label, scores in test_scores.items()
}

# One row per model, one column per metric
test_comparison = pd.DataFrame(test_metrics).T
print("Model Comparison on Test Set:")
display(test_comparison)

# Plot model comparison on test set.
# DataFrame.plot() opens a new figure unless it is given axes, so the previous
# plt.figure(figsize=...) call produced an empty figure and its size was
# ignored. Create the axes explicitly and draw on them.
fig, ax = plt.subplots(figsize=(12, 8))
test_comparison.plot(kind='bar', ax=ax)
ax.set_title('Model Comparison on Test Set')
ax.set_ylabel('Score')
ax.set_ylim(0, 1)  # all metrics are in [0, 1]
ax.grid(axis='y')
ax.legend(bbox_to_anchor=(1.05, 1), loc='upper left')  # legend outside the axes
fig.tight_layout()
plt.show()

In [None]:
# Confirm the final model.
# Model selection must not look at the test set: picking the winner by test AUC
# turns the holdout into a second validation set and makes the reported test
# metrics optimistically biased. The final model is therefore the one already
# selected on validation AUC (`best_model_name`); the test set only supplies
# its unbiased performance estimate.
final_best_model_name = best_model_name
final_best_auc = test_comparison.loc[final_best_model_name, 'auc']

print(f"Final model (selected on validation AUC): {final_best_model_name} with test AUC = {final_best_auc:.4f}")

## 5. ROC and Precision-Recall Curves

Plot ROC and Precision-Recall curves for the models to visualize their performance across different thresholds.

In [None]:
# Spark predictions keep class probabilities in a vector column; collect them to
# pandas and keep the class-1 entry of each vector.
lr_preds_pd = lr_test_predictions.select(TARGET_COLUMN, "probability").toPandas()
lr_probs = np.array([prob_vec[1] for prob_vec in lr_preds_pd["probability"]])
lr_true = lr_preds_pd[TARGET_COLUMN].values

rf_preds_pd = rf_test_predictions.select(TARGET_COLUMN, "probability").toPandas()
rf_probs = np.array([prob_vec[1] for prob_vec in rf_preds_pd["probability"]])
rf_true = rf_preds_pd[TARGET_COLUMN].values

# ROC points for every model on the test set
fpr_xgb, tpr_xgb, _ = roc_curve(y_test, xgb_test_pred_proba)
fpr_lr, tpr_lr, _ = roc_curve(lr_true, lr_probs)
fpr_rf, tpr_rf, _ = roc_curve(rf_true, rf_probs)

# Draw all curves on one figure
plt.figure(figsize=(10, 8))
roc_series = (
    ("XGBoost", fpr_xgb, tpr_xgb, xgb_test_auc),
    ("Logistic Regression", fpr_lr, tpr_lr, lr_test_auc),
    ("Random Forest", fpr_rf, tpr_rf, rf_test_auc),
)
for model_label, fpr_values, tpr_values, auc_value in roc_series:
    plt.plot(fpr_values, tpr_values, label=f'{model_label} (AUC = {auc_value:.4f})')

# Chance-level diagonal for reference
plt.plot([0, 1], [0, 1], 'k--', label='Random')

plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curves for Different Models')
plt.legend(loc='lower right')
plt.grid(True)
plt.show()

In [None]:
# Precision-recall points for every model on the test set
precision_xgb, recall_xgb, _ = precision_recall_curve(y_test, xgb_test_pred_proba)
precision_lr, recall_lr, _ = precision_recall_curve(lr_true, lr_probs)
precision_rf, recall_rf, _ = precision_recall_curve(rf_true, rf_probs)

# Area under each PR curve (recall on x, precision on y)
pr_auc_xgb = auc(recall_xgb, precision_xgb)
pr_auc_lr = auc(recall_lr, precision_lr)
pr_auc_rf = auc(recall_rf, precision_rf)

# Draw all curves on one figure
plt.figure(figsize=(10, 8))
pr_series = (
    ("XGBoost", recall_xgb, precision_xgb, pr_auc_xgb),
    ("Logistic Regression", recall_lr, precision_lr, pr_auc_lr),
    ("Random Forest", recall_rf, precision_rf, pr_auc_rf),
)
for model_label, recall_values, precision_values, pr_auc_value in pr_series:
    plt.plot(recall_values, precision_values, label=f'{model_label} (AUC = {pr_auc_value:.4f})')

# Reference line: the share of positives in the test labels
baseline = np.sum(y_test) / len(y_test)
plt.axhline(y=baseline, color='k', linestyle='--', label=f'Baseline ({baseline:.4f})')

plt.xlabel('Recall')
plt.ylabel('Precision')
plt.title('Precision-Recall Curves for Different Models')
plt.legend(loc='lower left')
plt.grid(True)
plt.show()

## 6. Confusion Matrix

Examine the confusion matrix for the best model to understand its predictions in more detail.

In [None]:
# Create confusion matrix for the best model.
# Spark models keep their test predictions in a Spark DataFrame; XGBoost's are
# already numpy arrays.
_spark_test_predictions = {
    'Logistic Regression': lr_test_predictions,
    'Random Forest': rf_test_predictions,
}
if final_best_model_name in _spark_test_predictions:
    best_preds = (
        _spark_test_predictions[final_best_model_name]
        .select(TARGET_COLUMN, "prediction")
        .toPandas()
    )
    y_true = best_preds[TARGET_COLUMN].values
    y_pred = best_preds["prediction"].values
else:  # XGBoost
    y_true = y_test
    y_pred = xgb_test_pred

# Generate confusion matrix.
# Pin the label order to [0, 1] so the matrix is always 2x2; without it a test
# set (or prediction vector) containing a single class yields a 1x1 matrix and
# the four-way unpacking below raises ValueError.
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])

# Plot confusion matrix (rows = actual class, columns = predicted class)
plt.figure(figsize=(8, 6))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', 
            xticklabels=['Non-Default', 'Default'],
            yticklabels=['Non-Default', 'Default'])
plt.xlabel('Predicted')
plt.ylabel('Actual')
plt.title(f'Confusion Matrix - {final_best_model_name}')
plt.tight_layout()
plt.show()


def _safe_ratio(numerator, denominator):
    """Return numerator / denominator, or 0.0 when the denominator is zero."""
    return numerator / denominator if denominator > 0 else 0.0


# Calculate additional metrics from confusion matrix
tn, fp, fn, tp = cm.ravel()
sensitivity = _safe_ratio(tp, tp + fn)  # True Positive Rate (Recall)
specificity = _safe_ratio(tn, tn + fp)  # True Negative Rate
precision = _safe_ratio(tp, tp + fp)    # Precision
npv = _safe_ratio(tn, tn + fn)          # Negative Predictive Value

print(f"Confusion Matrix Metrics for {final_best_model_name}:")
print(f"  True Negatives: {tn}")
print(f"  False Positives: {fp}")
print(f"  False Negatives: {fn}")
print(f"  True Positives: {tp}")
print(f"  Sensitivity (Recall): {sensitivity:.4f}")
print(f"  Specificity: {specificity:.4f}")
print(f"  Precision: {precision:.4f}")
print(f"  Negative Predictive Value: {npv:.4f}")

## 7. Save the Best Model

Save the best model and pipeline for use in the risk scoring system.

In [None]:
# Save the best model
print(f"Saving best model: {final_best_model_name}")

# Get the model object and type
if final_best_model_name == 'Logistic Regression':
    best_model = lr_model
    model_type = 'spark_lr'
elif final_best_model_name == 'Random Forest':
    best_model = rf_model
    model_type = 'spark_rf'
else:  # XGBoost
    best_model = xgb_model
    model_type = 'xgboost'

# Look the metrics up under a NEW name. Rebinding `test_metrics` to one model's
# sub-dict (as before) destroyed the comparison table's source data and made
# this cell fail with a KeyError when run a second time.
best_test_metrics = test_metrics[final_best_model_name]

# Get feature names
# NOTE(review): these are the columns of the feature table, not necessarily the
# slots of the assembled "features" vector — confirm they line up before
# relying on them for feature-importance mapping.
if 'feature_df' in locals():
    feature_names = [column_name for column_name in feature_df.columns
                     if column_name != TARGET_COLUMN]
else:
    feature_names = [f"feature_{i}" for i in range(X_train.shape[1])]

# Define model parameters
if model_type == 'spark_lr':
    params = {
        "maxIter": best_model.getMaxIter(),
        "regParam": best_model.getRegParam(),
        "elasticNetParam": best_model.getElasticNetParam()
    }
elif model_type == 'spark_rf':
    # `getNumTrees` is exposed as a property (an int) on fitted PySpark
    # tree-ensemble models, so calling it raises TypeError; accept both forms.
    _num_trees_attr = best_model.getNumTrees
    params = {
        "numTrees": _num_trees_attr() if callable(_num_trees_attr) else _num_trees_attr,
        "maxDepth": best_model.getMaxDepth(),
        "maxBins": best_model.getMaxBins()
    }
elif model_type == 'xgboost':
    xgb_settings = best_model.get_params()
    params = {
        setting_name: xgb_settings[setting_name]
        for setting_name in (
            "max_depth", "learning_rate", "n_estimators", "subsample", "colsample_bytree"
        )
    }

# Use a single timestamp so the recorded training date and the model name can
# never disagree (e.g. when the cell runs across midnight).
trained_at = datetime.now()

# Add common parameters
params.update({
    "model_type": model_type,
    "training_date": trained_at.strftime('%Y-%m-%d'),
    "random_seed": RANDOM_SEED
})

# Create model name with timestamp
model_name = f"{model_type}_model_{trained_at.strftime('%Y%m%d_%H%M%S')}"

# Save model artifacts
model_path = save_model_artifacts(
    model=best_model,
    model_name=model_name,
    model_type=model_type,
    features=feature_names,
    metrics=best_test_metrics,
    params=params
)

# Save pipeline model if available, next to the model artifacts
if 'pipeline_model' in locals() and pipeline_model is not None:
    pipeline_path = os.path.join(model_path, "pipeline_model")
    pipeline_model.write().overwrite().save(pipeline_path)
    print(f"Saved pipeline model to {pipeline_path}")

print(f"Best model saved to: {model_path}")

## 8. Risk Score Generation

Demonstrate how the model can be used to generate risk scores for loan applications.

In [None]:
# Risk bands on the 0-100 score scale, listed from safest to riskiest.
# Each value is an inclusive (lowest_score, highest_score) pair.
RISK_SCORE_RANGES = dict(
    very_low=(0, 20),
    low=(21, 40),
    medium=(41, 60),
    high=(61, 80),
    very_high=(81, 100),
)

In [None]:
# Generate risk scores for a sample of test data


def get_risk_category(score):
    """Return the name of the RISK_SCORE_RANGES band that contains `score`.

    Bands are inclusive on both ends; a score outside every band maps to
    "unknown".
    """
    for category, (low, high) in RISK_SCORE_RANGES.items():
        if low <= score <= high:
            return category
    return "unknown"


def _ordered_category_counts(counts):
    """Reindex per-category counts from safest to riskiest band.

    `counts` is a pandas Series indexed by category name. Bands with no rows get
    a count of 0; "unknown" is kept only when it actually occurs.
    """
    category_order = list(RISK_SCORE_RANGES)
    if "unknown" in counts.index:
        category_order.append("unknown")
    return counts.reindex(category_order, fill_value=0)


if model_type.startswith('spark'):
    # Use Spark model to make predictions
    risk_preds = best_model.transform(test_df.limit(100))

    # Extract default probability (class 1) and calculate risk score.
    # "probability" is an ML vector column, which does not support getItem();
    # convert it to an array column first. The integer cast truncates, so a
    # probability of 0.209 becomes a score of 20.
    risk_df = risk_preds.withColumn(
        "default_probability",
        vector_to_array(col("probability")).getItem(1)
    ).withColumn(
        "risk_score",
        (col("default_probability") * 100).cast("int")
    )

    # Add risk category: one WHEN branch per band, "unknown" as the fallback
    risk_category_col = None
    for category, (low, high) in RISK_SCORE_RANGES.items():
        in_band = col("risk_score").between(low, high)
        if risk_category_col is None:
            risk_category_col = when(in_band, category)
        else:
            risk_category_col = risk_category_col.when(in_band, category)
    risk_df = risk_df.withColumn("risk_category", risk_category_col.otherwise("unknown"))

    # Display sample risk scores
    display(risk_df.select(
        "default_probability", "risk_score", "risk_category", TARGET_COLUMN, "prediction"
    ).limit(10))

    # Create distribution of risk categories
    risk_dist = risk_df.groupBy("risk_category").count()
    risk_dist_pd = risk_dist.toPandas()
    risk_category_counts = _ordered_category_counts(
        risk_dist_pd.set_index("risk_category")["count"]
    )

else:  # XGBoost model
    # Use scikit-learn/XGBoost model to make predictions
    sample_X = X_test[:100]
    sample_y = y_test[:100]

    # Get predictions and probabilities (column 1 = probability of class 1)
    sample_preds = best_model.predict(sample_X)
    sample_probs = best_model.predict_proba(sample_X)[:, 1]

    # Calculate risk scores (integer cast truncates towards zero)
    risk_scores = (sample_probs * 100).astype(int)

    # Assign risk categories
    risk_categories = [get_risk_category(score) for score in risk_scores]

    # Create DataFrame with results
    risk_results = pd.DataFrame({
        "default_probability": sample_probs,
        "risk_score": risk_scores,
        "risk_category": risk_categories,
        TARGET_COLUMN: sample_y,
        "prediction": sample_preds
    })

    # Display sample risk scores
    display(risk_results.head(10))

    # Create distribution of risk categories
    risk_dist = risk_results["risk_category"].value_counts()
    risk_category_counts = _ordered_category_counts(risk_dist)

# Plot distribution of risk categories, ordered from safest to riskiest rather
# than alphabetically
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(list(risk_category_counts.index), risk_category_counts.values)
ax.set_title('Distribution of Risk Categories')
ax.set_xlabel('Risk Category')
ax.set_ylabel('Count')
fig.tight_layout()
plt.show()

## 9. Summary and Conclusions

Based on our model development and evaluation, here are the key findings and next steps for the risk scoring system:

### Key Findings
- [Document the best performing model and its metrics]
- [Summarize important features for risk prediction]
- [Describe the distribution of risk scores in the dataset]

### Next Steps
1. Implement the full risk scoring pipeline using the best model
2. Create monitoring and retraining processes for the model
3. Develop a system to explain risk scores and provide reasons
4. Validate the risk scoring system on new data

### Potential Improvements
- [List potential improvements to the model or pipeline]
- [Suggest additional features or data sources]
- [Recommend model deployment and monitoring strategies]