# Credit Risk Model Stress Testing and Monitoring

This notebook demonstrates how to conduct stress testing and ongoing monitoring for credit risk models. We'll cover:

1. Loading a previously validated model
2. Applying stress scenarios to evaluate model resilience
3. Identifying vulnerable customer segments
4. Setting up ongoing model monitoring
5. Detecting performance degradation and data drift
6. Generating monitoring reports and alerts

These practices are central to model risk management and help ensure that models remain reliable under adverse conditions.

## Setup

In [None]:
# Import necessary libraries
import os
import sys
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import yaml
import pickle
from datetime import datetime

# Add the parent directory to path to import local modules
module_path = os.path.abspath(os.path.join('..'))
if module_path not in sys.path:
    sys.path.append(module_path)

# Import local modules
from src.data_processing.generate_synthetic_data import generate_credit_data
from src.data_processing.preprocess import preprocess_data, create_feature_pipeline
from src.model_development.models import CreditRiskModel
from src.stress_testing.stress_tester import StressTester, run_stress_test
from src.monitoring.monitor import ModelMonitor, simulate_monitoring_over_time

# Set plotting style
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)

# Load configuration
with open('../config.yaml', 'r') as f:
    config = yaml.safe_load(f)

## 1. Load Model and Data

First, we'll load the model we developed and validated in the previous notebooks. If no saved model is found, the cell below trains a new one as a fallback.

In [None]:
# Try to load a previously saved model, or train a new one if not available
model_type = 'gradient_boosting'  # Change this to match your best model from notebook 1
model_path = f'../models/credit_risk_{model_type}.pkl'

try:
    # Load the model if it exists
    credit_model = CreditRiskModel.load_model(model_path, model_type)
    print(f"Loaded model from {model_path}")
except (FileNotFoundError, pickle.UnpicklingError):
    print(f"Couldn't load model from {model_path}. Training a new model...")
    # Generate data and train a model
    data = generate_credit_data(n_samples=10000, random_seed=42)
    target_variable = config['data']['target_variable']
    
    # Split data
    train_ratio = 0.7
    train_size = int(len(data) * train_ratio)
    train_data = data.iloc[:train_size]
    test_data = data.iloc[train_size:]
    
    # Preprocess data
    pipeline = create_feature_pipeline(config, target_col=target_variable)
    X_train, y_train = preprocess_data(train_data, config, target_col=target_variable, is_training=True)
    X_test, y_test = preprocess_data(test_data, config, target_col=target_variable, is_training=False, preprocessing_pipeline=pipeline)
    
    # Train model
    from src.model_development.models import train_model
    model = train_model(X_train, y_train, model_type=model_type)
    credit_model = CreditRiskModel(model_type, model=model)
    
    # Save the model
    os.makedirs('../models', exist_ok=True)
    credit_model.save_model(model_path)
    print(f"Model saved to {model_path}")

In [None]:
# Load datasets
try:
    # Try to load saved datasets
    train_data = pd.read_csv('../data/credit_data_train.csv')
    test_data = pd.read_csv('../data/credit_data_test.csv')
    print("Loaded existing datasets")
except FileNotFoundError:
    # Generate new datasets if not found
    print("Generating new synthetic datasets")
    from src.data_processing.generate_synthetic_data import split_and_save_data
    data = generate_credit_data(n_samples=10000, random_seed=42)
    train_data, test_data, _ = split_and_save_data(data, output_dir='../data')

# Display dataset sizes
print(f"Training data: {train_data.shape[0]} samples")
print(f"Test data: {test_data.shape[0]} samples")

In [None]:
# Prepare data for stress testing and monitoring
target_variable = config['data']['target_variable']

# Create feature pipeline
pipeline = create_feature_pipeline(config, target_col=target_variable)

# Preprocess datasets
X_train, y_train = preprocess_data(train_data, config, target_col=target_variable, is_training=True)
X_test, y_test = preprocess_data(test_data, config, target_col=target_variable, is_training=False, preprocessing_pipeline=pipeline)

## 2. Stress Testing

Now we'll perform stress testing to evaluate how the model performs under adverse economic conditions.
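
As a rough illustration of what a stress scenario does, the sketch below shocks two features by scenario-specific multipliers and re-scores a toy logistic model. Everything in it (`SCENARIOS`, `toy_pd`, the coefficients, and the multipliers) is hypothetical and only stands in for whatever `StressTester` and `config.yaml` actually define.

```python
import math

# Hypothetical scenario definitions: multiplicative shocks per feature.
SCENARIOS = {
    "baseline": {},
    "mild_recession": {"income": 0.95, "debt_to_income": 1.10},
    "severe_recession": {"income": 0.85, "debt_to_income": 1.30},
}

def toy_pd(borrower):
    """Toy logistic PD model with made-up coefficients (illustration only)."""
    z = -2.0 - 0.00002 * borrower["income"] + 4.0 * borrower["debt_to_income"]
    return 1.0 / (1.0 + math.exp(-z))

def apply_scenario(borrower, shocks):
    """Return a copy of the borrower with each shocked feature rescaled."""
    return {k: v * shocks.get(k, 1.0) for k, v in borrower.items()}

def average_pd(portfolio, shocks):
    """Average predicted default probability under a set of shocks."""
    stressed = [apply_scenario(b, shocks) for b in portfolio]
    return sum(toy_pd(b) for b in stressed) / len(stressed)

portfolio = [
    {"income": 40_000, "debt_to_income": 0.45},
    {"income": 75_000, "debt_to_income": 0.30},
    {"income": 120_000, "debt_to_income": 0.15},
]

baseline_avg = average_pd(portfolio, SCENARIOS["baseline"])
for name, shocks in SCENARIOS.items():
    avg = average_pd(portfolio, shocks)
    print(f"{name}: avg PD = {avg:.4f} ({avg / baseline_avg:.2f}x baseline)")
```

Because lower income and higher debt-to-income both raise the toy score, the average PD rises from baseline to mild to severe recession, which is the pattern the real stress test is meant to surface.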

In [None]:
# Initialize stress tester
stress_tester = StressTester(credit_model.model, config)

In [None]:
# Run stress test across all defined scenarios
stress_results = stress_tester.run_stress_test(X_test, y_test)

# Display results summary
print("Stress Testing Results Summary:\n")
for scenario, results in stress_results['scenario_results'].items():
    print(f"Scenario: {scenario}")
    print(f"  Average Default Probability: {results['avg_default_prob']:.4f}")
    print(f"  Default Rate Increase: {results['default_rate_increase']:.2f}x")
    print(f"  Scenario Severity Rating: {results['severity_rating']}")
    print()

In [None]:
# Plot default probability distribution across scenarios
plt.figure(figsize=(12, 6))

for scenario, results in stress_results['scenario_results'].items():
    bins = results['probability_distribution']['bins']
    freqs = results['probability_distribution']['frequencies']
    plt.plot(bins, freqs, label=scenario, alpha=0.7)
    
plt.title('Default Probability Distribution by Scenario')
plt.xlabel('Default Probability')
plt.ylabel('Frequency')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()

In [None]:
# Calculate capital requirements under different scenarios
capital_results = stress_tester.calculate_capital_requirements(X_test, portfolio_size=10_000_000, lgd=0.6)

# Convert to DataFrame for display
capital_df = pd.DataFrame([
    {
        'scenario': scenario,
        'expected_loss': results['expected_loss'],
        'unexpected_loss': results['unexpected_loss'],
        'total_capital': results['total_capital_required'],
        'capital_increase': results['capital_increase']
    }
    for scenario, results in capital_results.items()
])

# Format as currency and percentage
for col in ['expected_loss', 'unexpected_loss', 'total_capital']:
    capital_df[col] = capital_df[col].apply(lambda x: f"${x:,.2f}")
    
capital_df['capital_increase'] = capital_df['capital_increase'].apply(lambda x: f"{x:.2f}x")

capital_df
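
For orientation, the standard expected-loss identity is EL = PD × LGD × EAD. The sketch below applies it to a toy portfolio and adds a simple one-standard-deviation unexpected loss. The independence of defaults, the function names, and the even split of `portfolio_size` across loans are illustrative assumptions, not necessarily what `calculate_capital_requirements` does.

```python
import math

def expected_loss(pds, ead_per_loan, lgd):
    """Sum of PD * LGD * EAD over all loans."""
    return sum(p * lgd * ead_per_loan for p in pds)

def unexpected_loss(pds, ead_per_loan, lgd):
    """One standard deviation of portfolio loss, assuming independent defaults
    and a fixed LGD (a simplification; real portfolios have correlated defaults)."""
    variance = sum(p * (1.0 - p) * (lgd * ead_per_loan) ** 2 for p in pds)
    return math.sqrt(variance)

# Toy inputs: four loans sharing a 10,000,000 portfolio equally, LGD of 60%.
pds = [0.02, 0.05, 0.10, 0.03]
portfolio_size = 10_000_000
lgd = 0.6
ead = portfolio_size / len(pds)

el = expected_loss(pds, ead, lgd)
ul = unexpected_loss(pds, ead, lgd)
print(f"Expected loss:   ${el:,.2f}")
print(f"Unexpected loss: ${ul:,.2f}")
```

Under a stress scenario the PDs rise, so both quantities grow; the ratio of stressed to baseline capital is what a column such as `capital_increase` would summarize.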

In [None]:
# Identify vulnerable segments
vulnerable_segments = stress_tester.identify_vulnerable_segments(X_test, y_test)

# Display most vulnerable segments
vulnerability_df = pd.DataFrame([
    {
        'segment': segment,
        'baseline_default_rate': results['baseline_default_rate'],
        'stressed_default_rate': results['severe_recession_default_rate'],
        'default_increase': results['default_increase'],
        'vulnerability_rating': results['vulnerability_rating'],
        'segment_size': results['segment_size']
    }
    for segment, results in vulnerable_segments['segments'].items()
])

# Sort by vulnerability
vulnerability_df = vulnerability_df.sort_values('default_increase', ascending=False)

# Format percentages
vulnerability_df['baseline_default_rate'] = vulnerability_df['baseline_default_rate'].apply(lambda x: f"{x:.2%}")
vulnerability_df['stressed_default_rate'] = vulnerability_df['stressed_default_rate'].apply(lambda x: f"{x:.2%}")
vulnerability_df['default_increase'] = vulnerability_df['default_increase'].apply(lambda x: f"{x:.2f}x")

vulnerability_df.head(10)
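
The underlying idea can be shown without the helper class: group borrowers by a segment label, then compare each segment's average stressed probability of default with its baseline. The records, segment names, and the `segment_vulnerability` function below are made up for illustration and do not reflect how `identify_vulnerable_segments` defines segments.

```python
from collections import defaultdict

def segment_vulnerability(records):
    """Return {segment: (baseline_rate, stressed_rate, increase_ratio)}.

    Each record is (segment, baseline_pd, stressed_pd).
    """
    grouped = defaultdict(list)
    for segment, baseline_pd, stressed_pd in records:
        grouped[segment].append((baseline_pd, stressed_pd))

    summary = {}
    for segment, pairs in grouped.items():
        baseline = sum(b for b, _ in pairs) / len(pairs)
        stressed = sum(s for _, s in pairs) / len(pairs)
        # A ratio above 1 means the segment deteriorates under stress.
        ratio = stressed / baseline if baseline > 0 else float("inf")
        summary[segment] = (baseline, stressed, ratio)
    return summary

# Hypothetical per-borrower PDs under baseline and a stressed scenario.
records = [
    ("low_income", 0.10, 0.25),
    ("low_income", 0.12, 0.30),
    ("high_income", 0.02, 0.03),
    ("high_income", 0.04, 0.05),
]

summary = segment_vulnerability(records)
ranked = sorted(summary.items(), key=lambda item: item[1][2], reverse=True)
for segment, (baseline, stressed, ratio) in ranked:
    print(f"{segment}: {baseline:.2%} -> {stressed:.2%} ({ratio:.2f}x)")
```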

In [None]:
# Plot top vulnerable segments
top_segments = vulnerability_df.head(5)
segment_names = top_segments['segment'].tolist()
# The rate columns were formatted as percentage strings above, so parse them back to floats
baseline_rates = [float(rate.strip('%')) / 100 for rate in top_segments['baseline_default_rate']]
stressed_rates = [float(rate.strip('%')) / 100 for rate in top_segments['stressed_default_rate']]

# Create plot
fig, ax = plt.subplots(figsize=(12, 6))
width = 0.35
x = np.arange(len(segment_names))

baseline_bars = ax.bar(x - width/2, baseline_rates, width, label='Baseline', color='skyblue')
stressed_bars = ax.bar(x + width/2, stressed_rates, width, label='Severe Recession', color='salmon')

ax.set_ylabel('Default Rate')
ax.set_title('Most Vulnerable Segments: Baseline vs Stressed Default Rates')
ax.set_xticks(x)
ax.set_xticklabels(segment_names, rotation=45, ha='right')
ax.legend()

# Add data labels
def add_labels(bars):
    for bar in bars:
        height = bar.get_height()
        ax.annotate(f'{height:.1%}',
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')

add_labels(baseline_bars)
add_labels(stressed_bars)

plt.tight_layout()
plt.show()

In [None]:
# Generate stress testing report
stress_dir = '../reports/stress_testing'
os.makedirs(stress_dir, exist_ok=True)

report_path = os.path.join(stress_dir, f'stress_test_report_{datetime.now().strftime("%Y%m%d")}.md')
stress_tester.generate_report(report_path)

print(f"Stress test report generated at: {report_path}")

# Generate stress testing visualizations
stress_tester.plot_stress_results(stress_dir)
print(f"Stress test visualizations saved to: {stress_dir}")

## 3. Model Monitoring

Now we'll set up model monitoring to track performance over time and detect data drift.
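
One common way to quantify drift in a single feature is the population stability index (PSI), which compares binned frequencies between reference and current data. The sketch below is a dependency-free illustration. The function name `psi`, the ten equal-width bins, and the small floor used to avoid `log(0)` are assumptions, and `ModelMonitor` may use a different statistic entirely.

```python
import math
import random

def psi(reference, current, n_bins=10, floor=1e-6):
    """Population stability index between two numeric samples.

    Bins are equal-width over the reference range; out-of-range current
    values are clipped into the first or last bin.
    """
    lo, hi = min(reference), max(reference)
    width = (hi - lo) / n_bins or 1.0  # guard against a constant feature

    def bin_shares(values):
        counts = [0] * n_bins
        for v in values:
            idx = int((v - lo) / width)
            counts[min(max(idx, 0), n_bins - 1)] += 1
        # Floor each share so empty bins do not produce log(0).
        return [max(c / len(values), floor) for c in counts]

    ref_shares = bin_shares(reference)
    cur_shares = bin_shares(current)
    return sum((c - r) * math.log(c / r) for r, c in zip(ref_shares, cur_shares))

random.seed(42)
reference = [random.gauss(50_000, 10_000) for _ in range(5_000)]
same_dist = [random.gauss(50_000, 10_000) for _ in range(5_000)]
shifted = [x * 1.15 for x in same_dist]  # mimic a 15% income inflation

print(f"PSI, no drift:  {psi(reference, same_dist):.4f}")
print(f"PSI, 15% shift: {psi(reference, shifted):.4f}")
```

A commonly cited rule of thumb reads a PSI below roughly 0.1 as stable and above roughly 0.25 as a material shift, though suitable thresholds depend on the portfolio and the binning.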

In [None]:
# Initialize model monitor
monitor = ModelMonitor(credit_model.model, config, model_id="credit_risk_model_v1")

# Set reference data (training data)
monitor.set_reference_data(X_train, y_train)

In [None]:
# Create functions to generate data drift for simulation
np.random.seed(42)  # Seed NumPy's global RNG so the simulated drift is reproducible
def create_slight_drift(X, y):
    """Create slight data drift."""
    X_drift = X.copy()
    for col in X_drift.select_dtypes(include=['number']).columns:
        X_drift[col] = X_drift[col] * np.random.normal(1, 0.05, size=len(X_drift))
    return X_drift, y

def create_moderate_drift(X, y):
    """Create moderate data drift."""
    X_drift = X.copy()
    for col in X_drift.select_dtypes(include=['number']).columns:
        X_drift[col] = X_drift[col] * np.random.normal(1.05, 0.1, size=len(X_drift))
    
    # Introduce some systematic drift in specific columns
    if 'income' in X_drift.columns:
        X_drift['income'] = X_drift['income'] * 1.15  # Simulate income inflation
    
    return X_drift, y

def create_severe_drift(X, y):
    """Create severe data drift and target shift."""
    X_drift = X.copy()
    
    # Apply severe drift to all numeric columns
    for col in X_drift.select_dtypes(include=['number']).columns:
        X_drift[col] = X_drift[col] * np.random.normal(1.1, 0.15, size=len(X_drift))
    
    # Introduce dramatic shifts in key variables
    if 'income' in X_drift.columns:
        X_drift['income'] = X_drift['income'] * 1.3  # Dramatic increase in income
    
    if 'debt_to_income' in X_drift.columns:
        X_drift['debt_to_income'] = X_drift['debt_to_income'] * 1.25  # Higher debt ratios
    
    # Simulate economic shock affecting performance and data distributions
    y_drift = y.copy()
    # Flip a random 20% of borrowers with above-median 'debt_to_income' to default
    if 'debt_to_income' in X_drift.columns:
        high_risk = (X_drift['debt_to_income'] > X_drift['debt_to_income'].median()).to_numpy()
        flip = high_risk & (np.random.rand(len(X_drift)) < 0.2)
        y_drift[flip] = 1  # 1 is the default label
    
    return X_drift, y_drift

In [None]:
# Define drift scenarios for monitoring simulation
drift_scenarios = {
    1: lambda X, y: (X.copy(), y.copy()),  # No drift
    2: create_slight_drift,
    3: create_slight_drift,
    4: create_moderate_drift,
    5: create_moderate_drift, 
    6: create_severe_drift
}

In [None]:
# Run monitoring simulation
print("Simulating model monitoring over 6 time periods...")
simulation_monitor = simulate_monitoring_over_time(
    credit_model.model, 
    X_train, y_train, 
    drift_scenarios, 
    periods=6,
    model_id="credit_risk_model_v1",
    config=config
)

In [None]:
# Display monitoring results
print("Monitoring Results Summary:\n")
for i, result in enumerate(simulation_monitor.monitoring_results):
    print(f"Period {i+1}: {result.period}")
    print(f"  Alert Status: {result.alert_status}")
    print(f"  AUC: {result.performance_metrics.get('roc_auc', 'N/A')}")
    print(f"  Number of Alerts: {len(result.alert_details)}")
    
    # Print alerts for periods with issues
    if result.alert_status != "OK" and result.alert_details:
        print("  Alerts:")
        for alert in result.alert_details:
            print(f"    - {alert['type']}: {alert['message']}")
    print()

In [None]:
# Plot monitoring metrics
simulation_monitor.plot_monitoring_results()

In [None]:
# Extract and plot performance over time
periods = [result.period for result in simulation_monitor.monitoring_results]
auc_values = [result.performance_metrics.get('roc_auc', None) for result in simulation_monitor.monitoring_results]
accuracy_values = [result.performance_metrics.get('accuracy', None) for result in simulation_monitor.monitoring_results]

plt.figure(figsize=(12, 6))
plt.plot(periods, auc_values, 'o-', label='AUC')
plt.plot(periods, accuracy_values, 's-', label='Accuracy')
plt.title('Model Performance Over Time')
plt.xlabel('Period')
plt.ylabel('Metric Value')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
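
A degradation alert typically compares each period's metric with a reference value. The sketch below computes AUC from scratch via the rank-based (Mann-Whitney) formulation and flags relative drops. The `auc` and `degradation_status` helpers and the 5% and 10% thresholds are illustrative assumptions rather than the rules `ModelMonitor` applies.

```python
def auc(y_true, scores):
    """AUC as the probability that a random positive outscores a random
    negative, counting ties as one half (Mann-Whitney formulation)."""
    positives = [s for y, s in zip(y_true, scores) if y == 1]
    negatives = [s for y, s in zip(y_true, scores) if y == 0]
    if not positives or not negatives:
        raise ValueError("AUC needs at least one positive and one negative")
    wins = sum(
        1.0 if p > n else 0.5 if p == n else 0.0
        for p in positives
        for n in negatives
    )
    return wins / (len(positives) * len(negatives))

def degradation_status(reference_auc, current_auc, warn=0.05, critical=0.10):
    """Classify the relative AUC drop against hypothetical thresholds."""
    drop = (reference_auc - current_auc) / reference_auc
    if drop >= critical:
        return "CRITICAL"
    if drop >= warn:
        return "WARNING"
    return "OK"

y_true = [0, 0, 1, 1, 0, 1]
reference_scores = [0.1, 0.2, 0.8, 0.9, 0.3, 0.7]  # every positive outranks every negative
drifted_scores = [0.1, 0.6, 0.8, 0.4, 0.3, 0.7]    # one positive/negative pair misordered

ref_auc = auc(y_true, reference_scores)
cur_auc = auc(y_true, drifted_scores)
print(f"Reference AUC: {ref_auc:.3f}, current AUC: {cur_auc:.3f}")
print(f"Status: {degradation_status(ref_auc, cur_auc)}")
```

The pairwise loop is quadratic in sample size, which is fine for a demonstration; on real monitoring data a library routine such as scikit-learn's `roc_auc_score` is the practical choice.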

In [None]:
# Extract and plot data drift metrics
drift_scores = [result.data_drift_metrics.get('overall_drift_score', 0) for result in simulation_monitor.monitoring_results]
drifted_feature_counts = [len(result.data_drift_metrics.get('drifted_features', [])) for result in simulation_monitor.monitoring_results]

fig, ax1 = plt.subplots(figsize=(12, 6))
ax1.set_xlabel('Period')
ax1.set_ylabel('Overall Drift Score', color='tab:blue')
ax1.plot(periods, drift_scores, 'o-', color='tab:blue')
ax1.tick_params(axis='y', labelcolor='tab:blue')

ax2 = ax1.twinx()
ax2.set_ylabel('Number of Drifted Features', color='tab:red')
ax2.plot(periods, drifted_feature_counts, 's-', color='tab:red')
ax2.tick_params(axis='y', labelcolor='tab:red')

plt.title('Data Drift Over Time')
plt.tight_layout()
plt.show()

In [None]:
# Identify top drifted features
from collections import Counter

# Count the number of periods in which each feature was flagged as drifted
drifted_features = Counter(
    feature
    for result in simulation_monitor.monitoring_results
    for feature in result.data_drift_metrics.get('drifted_features', [])
)

# Sort and display top drifted features
top_drifted = drifted_features.most_common(10)
top_drifted_df = pd.DataFrame(top_drifted, columns=['Feature', 'Drift Occurrences'])
top_drifted_df

In [None]:
# Plot top drifted features
plt.figure(figsize=(10, 6))
plt.barh(top_drifted_df['Feature'], top_drifted_df['Drift Occurrences'], color='salmon')
plt.title('Top Drifted Features')
plt.xlabel('Number of Periods with Drift')
plt.ylabel('Feature')
plt.grid(True, alpha=0.3, axis='x')
plt.tight_layout()
plt.show()

In [None]:
# Generate monitoring report and visualizations
monitoring_dir = '../reports/monitoring'
os.makedirs(monitoring_dir, exist_ok=True)

# Save monitoring results
simulation_monitor.save_results(monitoring_dir)

print(f"Monitoring results and visualizations saved to: {monitoring_dir}")

## 4. Model Retraining Recommendation

Based on the alerts raised in the most recent monitoring period, we can make a data-driven decision about whether the model needs to be retrained.

In [None]:
# Check if retraining is recommended based on monitoring results
latest_result = simulation_monitor.monitoring_results[-1]

# Count critical and warning alerts in the latest period
critical_alerts = sum(1 for alert in latest_result.alert_details if alert['severity'] == 'HIGH')
warning_alerts = sum(1 for alert in latest_result.alert_details if alert['severity'] == 'MEDIUM')

# Get performance degradation if any
performance_degradation = None
for alert in latest_result.alert_details:
    if alert['type'] == 'PERFORMANCE_DEGRADATION':
        performance_degradation = alert
        break

# Get data drift if any
data_drift = None
for alert in latest_result.alert_details:
    if alert['type'] == 'DATA_DRIFT':
        data_drift = alert
        break

# Make recommendation
if latest_result.alert_status == "CRITICAL" or critical_alerts > 0:
    recommendation = "Model retraining is REQUIRED. Critical issues detected."
    action = "Retrain the model as soon as possible with the latest data."
elif latest_result.alert_status == "WARNING" or warning_alerts > 0:
    recommendation = "Model retraining is RECOMMENDED. Warning-level alerts detected."
    action = "Plan for model retraining and review the affected features."
else:
    recommendation = "Model retraining is NOT necessary at this time."
    action = "Continue monitoring the model performance."

# Display recommendation
print("Model Retraining Recommendation:\n")
print(recommendation)
print(f"Recommended Action: {action}")
print("\nBasis for recommendation:")
print(f"- Alert Status: {latest_result.alert_status}")
print(f"- Critical Alerts: {critical_alerts}")
print(f"- Warning Alerts: {warning_alerts}")

if performance_degradation:
    print(f"- Performance Issue: {performance_degradation['message']}")
    
if data_drift:
    print(f"- Data Drift Issue: {data_drift['message']}")

## 5. Summary

In this notebook, we demonstrated two critical components of model risk management for credit risk models:

1. **Stress Testing**: We evaluated the model's performance under adverse economic scenarios, identified vulnerable customer segments, and calculated capital requirements under stress. This helps ensure the bank is prepared for economic downturns and maintains adequate capital reserves.

2. **Model Monitoring**: We simulated monitoring the model over time, detecting performance degradation and data drift. This ongoing monitoring is essential for maintaining model effectiveness and complying with regulatory requirements.

Together, these practices form a robust framework for model risk management that aligns with regulatory expectations for banks and financial institutions.