In [None]:
import pandas as pd
import pyodbc

# Connexion to SQL Server
conn = pyodbc.connect(
    "Driver={SQL Server};"
    "Server=DESKTOP-UA5JP12\\SQLEXPRESS;"
    "Database=CreditRiskDW;"
    "Trusted_Connection=yes;"
)

df = pd.read_sql("SELECT * FROM gold.loans_final", conn)
print(df.head())

## Dataset Scope Definition

In [None]:
# Keep only relevant columns for risk analysis
risk_cols = [
    'loan_amount','term','interest_rate','installment',
    'grade','sub_grade','annual_income','debt_to_income',
    'verified_income','homeownership','state',
    'application_type','loan_purpose',
    'delinq_2y','num_historical_failed_to_pay',
    'total_credit_limit','total_credit_utilized',
    'loan_status_raw','loan_status_group','loan_outcome',
    'default_flag','balance','paid_total'
]

df = df[risk_cols].copy()

## Let's do some data cleaning first 

In [None]:
# Keep only valid grades
valid_grades = ['A','B','C','D','E','F','G']
df = df[df['grade'].isin(valid_grades)]

In [None]:
df = df[df['term'].isin([36, 60])]

In [None]:
df = df[df["loan_purpose"].apply(lambda x : isinstance(x, str))]

## Feature Engineering - Core Risk Variables

### Credit Utilization Ratio

In [None]:
df["credit_utilization_ratio"] = ( 
    df["total_credit_utilized"] / df["total_credit_limit"] 
)

### Installment Burden (Affordability)

In [None]:
df['installment_to_income'] = (
    df['installment'] * 12 / df['annual_income']
)

## Multi-State Loan Performance Variable

### Define Risk State (Target Variable)

#### Modeling Choice
Rather than collapsing loan performance into a binary default/non-default variable,
we explicitly model intermediate delinquency states to better capture early risk signals
and portfolio dynamics.
HEALTHY < DELINQUENT_EARLY < DELINQUENT_LATE < DEFAULT

In [None]:
def risk_state(row):
    if row['default_flag'] == 1:
        return 'DEFAULT'
    elif row['loan_status_group'] == 'DELINQUENT_LATE':
        return 'DELINQUENT_LATE'
    elif row['loan_status_group'] == 'DELINQUENT_EARLY':
        return 'DELINQUENT_EARLY'
    else:
        return 'HEALTHY'

df['risk_state'] = df.apply(risk_state, axis=1)

### Risk State Distribution Check

In [None]:
df['risk_state'].value_counts(normalize=True)

## Outlier Detection

### Numerical Variables Summary

In [None]:
num_cols = [
    'loan_amount','interest_rate','installment',
    'annual_income','debt_to_income',
    'credit_utilization_ratio','installment_to_income'
]
df[num_cols].describe(percentiles=[.01,.05,.95,.99])

### Visual Outlier Scan

In [None]:
import matplotlib.pyplot as plt

for col in num_cols:
    plt.figure(figsize=(6,2))
    plt.boxplot(df[col].dropna(), vert=False)
    plt.title(col)
    plt.show()

## Analytical Dataset Readiness Check

In [None]:
df.isna().mean().sort_values(ascending=False)

## Outlier Treatment

In [None]:
# Identify economically invalid observations
# ============================================
# Credit risk principle:
# Extreme values are informative only if they are economically plausible.
# Technical artifacts must be removed or corrected before analysis.

# Count rows with zero or negative income (invalid for ratios)
df_clean = df.copy()
invalid_income_mask = df['annual_income'] <= 0
df.loc[invalid_income_mask, 'annual_income'].count()

In [None]:
# Drop observations with zero or negative income
# Reason: ratios using income (DTI, installment burden) become meaningless
df = df.loc[~invalid_income_mask].copy()

### Interest Rate – Technical Outliers

In [None]:
# Interest rate cleaning
# ============================================
# Lending Club interest rates are bounded in practice.
# Rates above 60% are almost certainly parsing errors.
df['interest_rate'].describe(percentiles=[.99])

In [None]:
# Cap interest_rate at the 99th percentile
interest_cap = df['interest_rate'].quantile(0.99)
df.loc[df['interest_rate'] > interest_cap, 'interest_rate'] = interest_cap

### Debt-to-Income (DTI) – Economic Bounds

In [None]:
# DTI normalization
# ============================================
# DTI above 100% indicates severe leverage, but values in the thousands are data errors.
df['debt_to_income'].describe(percentiles=[.95, .99])

In [None]:
# Cap DTI at 100%
# Justification:
   # Above 100% repayment capacity is economically exhausted
   # Higher values do not add additional risk information
df.loc[df['debt_to_income'] > 100, 'debt_to_income'] = 100

### Credit Utilization Ratio 

In [None]:
# Utilization > 1 means borrower uses more than total credit limit.
# This can happen, but extremely large values are artifacts.
df['credit_utilization_ratio'].describe(percentiles=[.95, .99])

In [None]:
# Winsorize utilization at 99th percentile
# Rationale:
   # Keep extreme borrowers
   # Remove denominator-driven explosions
util_cap = df['credit_utilization_ratio'].quantile(0.99)
df.loc[df['credit_utilization_ratio'] > util_cap, 'credit_utilization_ratio'] = util_cap

### Installment to Income

In [None]:
# Recompute installment burden safely
# ============================================
# This ratio captures monthly payment stress.
# It must be computed AFTER cleaning income.
df['installment_to_income'] = df['installment'] * 12 / df['annual_income']

In [None]:
# Cap extreme installment burdens
# Reason: above 50% income is already catastrophic risk
df.loc[df['installment_to_income'] > 0.5, 'installment_to_income'] = 0.5

## Economic Normalization

### Income Buckets

In [None]:
# Income segmentation
# ============================================
# Business goal: Risk is non-linear across income levels.
df['income_bucket'] = pd.cut(
    df['annual_income'],
    bins=[0, 30000, 60000, 100000, 200000, df['annual_income'].max()],
    labels=['LOW', 'LOW_MID', 'MID', 'UPPER_MID', 'HIGH']
)

### DTI Buckets

In [None]:
# DTI buckets
# ============================================
# Standard credit risk segmentation
df['dti_bucket'] = pd.cut(
    df['debt_to_income'],
    bins=[0, 10, 20, 35, 50, 100],
    labels=['VERY_LOW', 'LOW', 'MEDIUM', 'HIGH', 'VERY_HIGH']
)

### Credit Utilization Buckets

In [None]:
# Credit utilization buckets
# ============================================
df['utilization_bucket'] = pd.cut(
    df['credit_utilization_ratio'],
    bins=[0, 0.3, 0.6, 0.9, 1.2],
    labels=['LOW', 'MODERATE', 'HIGH', 'MAXED_OUT']
)

### Term Normalization

In [None]:
# Loan maturity normalization
df['term_group'] = df['term'].map({36: 'SHORT', 60: 'LONG'})

### Sanity Check After Cleaning

In [None]:
# Final validation
df[
    [
        'interest_rate',
        'debt_to_income',
        'credit_utilization_ratio',
        'installment_to_income'
    ]
].describe(percentiles=[.01, .05, .95, .99])

## Let's add some features we may want to use after

### Payment Burden Normalization

In [None]:
df['installment_to_credit_limit'] = df['installment'] / df['total_credit_limit']

### Credit Usage Intensity

In [None]:
df['high_credit_utilization_flag'] = (df['credit_utilization_ratio'] > 0.8).astype(int)

### Historical Risk Flags

In [None]:
df['has_past_delinquency'] = (df['delinq_2y'] > 0).astype(int)
df['has_failed_payments'] = (df['num_historical_failed_to_pay'] > 0).astype(int)

### Loan Size Relative to Income

In [None]:
df['loan_to_income'] = df['loan_amount'] / df['annual_income']

### Term Risk Encoding

In [None]:
df['is_long_term'] = (df['term'] == 60).astype(int)

### Pricing vs Risk Coherence

In [None]:
df['rate_minus_grade_avg'] = (
    df['interest_rate'] - df.groupby('grade')['interest_rate'].transform('mean')
)

# Observed Credit Risk 

## Portfolio level Observed Risk

In [None]:
# Observed risk distribution at portfolio level
risk_dist = (
    df['risk_state']
    .value_counts(normalize=True)
    .rename('proportion')
)

risk_dist

## Observed Risk by Key Segments

### By Loan Grade

In [None]:
pd_by_grade = (
    df
    .groupby(['grade', 'risk_state'])
    .size()
    .groupby(level = 0)
    .apply(lambda x: x / x.sum())
    .unstack()
    .fillna(0)
)
pd_by_grade.reset_index(level=1, drop=True)

### By Loan Term

In [None]:
pd_by_term = (
    df
    .groupby(['term', 'risk_state'])
    .size()
    .groupby(level=0)
    .apply(lambda x: x / x.sum())
    .unstack()
    .fillna(0)
)
pd_by_term.reset_index(level=1, drop=True)

### By Debt to Income Buckets

In [None]:
pd_by_dti = (
    df
    .groupby(['dti_bucket', 'risk_state'])
    .size()
    .groupby(level=0)
    .apply(lambda x: x / x.sum())
    .unstack()
    .fillna(0)
)
pd_by_dti.reset_index(level=1, drop=True)

### By Loan Purpose

In [None]:
pd_by_purpose = (
    df
    .groupby(["loan_purpose", "risk_state"])
    .size()
    .groupby(level = 0)
    .apply(lambda x: x / x.sum())
    .unstack()
    .fillna(0)
    .sort_values('DEFAULT', ascending = False)
)
pd_by_purpose.head(10).reset_index(level=1, drop=True)

# Risk Concentration & Loss Dispersion

## Risk Concentration Across Performance States

### Let's define an observed risk severity score

In [None]:
# Ordered risk severity 
risk_weights = {
    'HEALTHY': 0.0,
    'DELINQUENT_EARLY': 0.3,
    'DELINQUENT_LATE': 0.6,
    'DEFAULT': 1.0
}
df['risk_severity'] = df['risk_state'].map(risk_weights)

### Risk weighted exposure per loan

In [None]:
df['risk_exposure'] = df['loan_amount'] * df['risk_severity']

### Concentration of risk weighted exposure (Pareto logic)

In [None]:
df_sorted = df.sort_values('risk_exposure', ascending=False)

import numpy as np
df_sorted['cum_loans'] = np.arange(1, len(df_sorted) + 1) / len(df_sorted)
df_sorted['cum_risk'] = df_sorted['risk_exposure'].cumsum() / df_sorted['risk_exposure'].sum()

### Pareto interpretation

In [None]:
df_sorted[['cum_loans', 'cum_risk']].head(10)

## Dispersion of Risk Exposures

### Distribution of risk weighted exposure

In [None]:
df.loc[df['risk_exposure'] > 0, 'risk_exposure'].describe(
    percentiles=[0.5, 0.75, 0.9, 0.95, 0.99]
)

### Skewness of portfolio risk

In [None]:
df.loc[df['risk_exposure'] > 0, 'risk_exposure'].skew()

###  Key Insight — Risk Concentration
A small fraction of loans accounts for a disproportionate share of total portfolio risk.
For instance, approximately 0.1% of loans represent more than 16% of total risk exposure,
highlighting strong risk concentration even before default materializes.


## Loss Dispersion & Tail Risk

### Define realized loss per loan

In [None]:
# Loss = loan amount - total amount paid
df['realized_loss'] = df['loan_amount'] - df['paid_total']

### Keep only defaulted loans, since losses only make sense conditional on default

In [None]:
df_default = df[df['risk_state'] == 'DEFAULT'].copy()
df_default[['loan_amount', 'paid_total', 'realized_loss']].describe()

### Loss Distribution Statistics

In [None]:
# Distribution of losses across defaulted loans
loss_stats = df_default['realized_loss'].describe(
    percentiles=[0.5, 0.75, 0.9, 0.95, 0.99]
)
loss_stats

#### Skewness (tail risk indicator)

In [None]:
loss_skewness = df_default['realized_loss'].skew()
loss_skewness

### Let's visualise the distribution

In [None]:
import matplotlib.pyplot as plt

# ---------------------------------------------
# Histogram of losses
plt.figure()
plt.hist(df_default['realized_loss'], bins=50)
plt.yscale('log')
plt.xlabel('Realized Loss')
plt.ylabel('Frequency (log scale)')
plt.title('Distribution of Realized Losses (Defaulted Loans)')
plt.show()

### Loss Concetration

In [None]:
# Loss concentration: cumulative share of losses

df_loss_sorted = df_default.sort_values('realized_loss', ascending=False)

df_loss_sorted['cum_loans'] = (
    np.arange(1, len(df_loss_sorted) + 1) / len(df_loss_sorted)
)
df_loss_sorted['cum_loss'] = (
    df_loss_sorted['realized_loss'].cumsum() /
    df_loss_sorted['realized_loss'].sum()
)
df_loss_sorted[['cum_loans', 'cum_loss']].head(10)

#### Interpretation & Limitations
Losses appear moderately concentrated, with the largest defaulted loan
accounting for approximately 24% of total realized losses.
However, this analysis is based on a very small number of defaulted loans
(n = 7), which limits the statistical significance of distributional
metrics such as skewness and tail behavior.

As a result, conclusions regarding loss concentration should be interpreted
as indicative rather than structural.

# Individual Risk Estimation 

In [None]:
grade_order = {
    'A': 1,
    'B': 2,
    'C': 3,
    'D': 4,
    'E': 5,
    'F': 6,
    'G': 7
}
df['grade_num'] = df['grade'].map(grade_order)

### Let's define our model features

In [None]:
base_features = [
    'loan_amount',
    'interest_rate',
    'term',
    'installment_to_income',
    'debt_to_income',
    'credit_utilization_ratio',
    'annual_income',
    'total_credit_limit',
    'grade_num'
]
loan_purpose_features = [
    col for col in df.columns
    if col.startswith('loan_purpose_')
]
model_features = base_features + loan_purpose_features

### Baseline model — Multinomial Logistic Regression

In [None]:
risk_order = {
    'HEALTHY': 0,
    'DELINQUENT_EARLY': 1,
    'DELINQUENT_LATE': 2,
    'DEFAULT': 3
}
df['risk_target'] = df['risk_state'].map(risk_order)

In [None]:
from sklearn.model_selection import train_test_split

X = df[model_features]
y = df['risk_target']
X_train, X_test, y_train, y_test = train_test_split(
    X,
    y,
    test_size=0.3,
    random_state=42,
    stratify=y
)

In [None]:
X_train = X_train.fillna(0)
X_test  = X_test.fillna(0)

In [None]:
from sklearn.linear_model import LogisticRegression

model = LogisticRegression(
    multi_class='multinomial',
    solver='lbfgs',
    max_iter=1000
)
model.fit(X_train, y_train)

### Accuracy globale

In [None]:
from sklearn.metrics import accuracy_score

y_pred = model.predict(X_test)
accuracy = accuracy_score(y_test, y_pred)
accuracy

In [None]:
Xy_pred = model.predict(X_test)
y_proba = model.predict_proba(X_test)

In [None]:
proba_df = pd.DataFrame(
    y_proba,
    columns=['P_HEALTHY', 'P_EARLY', 'P_LATE', 'P_DEFAULT']
)
proba_df

Model validation is performed on a hold-out test set to ensure generalization.
Once validated, the model is applied to the full portfolio to estimate individual risk probabilities.

In [None]:
X_all = df[model.feature_names_in_]

# Handle missing values exactly as before
X_all = X_all.fillna(X_all.median())

## Predict risk probabilities for all loans

In [None]:
proba_all = model.predict_proba(X_all)

proba_df = pd.DataFrame(
    proba_all,
    columns=[f"P_{cls}" for cls in model.classes_],
    index=df.index
)
df = pd.concat([df, proba_df], axis=1)

In [None]:
proba_df

The multinomial logistic regression provides a loan-level probability distribution across multiple delinquency states. Rather than focusing solely on default, the model captures early and late delinquency risk, allowing a more granular and economically meaningful representation of credit risk.

### Economic Interpretation of the Model

#### Let's see the model coefficients

In [None]:
# Get the coefficients
coef_df = pd.DataFrame(
    model.coef_,
    columns=X_all.columns,
    index=model.classes_
)
coef_df

#### Let's get the coefficients, focusing on the key drivers

In [None]:
key_features = [
    'interest_rate',
    'grade_num',
    'debt_to_income',
    'credit_utilization_ratio',
    'installment_to_income',
    'loan_amount',
    'annual_income'
]
coef_df[key_features]

#### Let's identify the dominant risk drivers

In [None]:
driver_strength = coef_df.abs().mean(axis=0).sort_values(ascending=False)
driver_strength.head(10)

The model coefficients are economically coherent. Risk is primarily driven by structural loan characteristics (term), borrower leverage (debt-to-income and utilization), and underwriting signals (grade and interest rate). The multinomial structure reveals that some variables primarily explain early delinquency, while others drive late-stage default, confirming the model captures the dynamics of credit deterioration rather than a simple binary outcome.

# Espected Loss & Risk Concentration

## Define Expected Loss at loan level

In [None]:
df = df.loc[:, ~df.columns.duplicated()]

In [None]:
severity_map = {
    0: 0.00,
    1: 0.10,
    2: 0.40,
    3: 1.00
}
df['expected_loss'] = (
    df['P_0'] * severity_map[0] +
    df['P_1'] * severity_map[1] +
    df['P_2'] * severity_map[2] +
    df['P_3'] * severity_map[3]
) * df['risk_exposure']

In [None]:
df['expected_loss'].describe()

The expected loss distribution is highly skewed: while the average expected loss per loan is low, a small number of loans carry very large loss expectations, highlighting strong risk concentration.

### Expected Loss concentration

In [None]:
df_el = df.sort_values('expected_loss', ascending=False).copy()

#### Cumulative share of loans

In [None]:
df_el['cum_loans'] = np.arange(1, len(df_el) + 1) / len(df_el)

#### Cumulative share of Expected Loss

In [None]:
df_el['cum_el'] = (
    df_el['expected_loss'].cumsum()
    / df_el['expected_loss'].sum()
)

In [None]:
df_el.loc[df_el['cum_loans'] <= 0.05, 'expected_loss'].sum() / df_el['expected_loss'].sum()

In [None]:
summary_el = (
    df_el
    .groupby(pd.cut(df_el['cum_loans'], 
                     bins=[0, 0.01, 0.05, 0.10, 0.20, 1.0]))
    .agg(
        cum_loans=('cum_loans', 'max'),
        cum_el=('cum_el', 'max')
    )
)
summary_el

In [None]:
df_el[['cum_loans', 'cum_el']].head(10)

The cumulative expected loss analysis shows an extreme concentration of risk: less than 0.1% of loans account for more than 38% of total expected loss, highlighting that portfolio risk is driven by a very small subset of exposures.

Expected losses are highly concentrated: a small fraction of loans accounts for a disproportionate share of total portfolio risk, confirming that risk management should focus on loss severity rather than default counts

# Portfolio Risk Simulation

## Monte Carlo Simulation

In [None]:
proba_matrix = df[['P_0', 'P_1', 'P_2', 'P_3']].values
exposure = df['risk_exposure'].values

In [None]:
# Number of Monte Carlo simulations
n_simulations = 10000

# Uniform random draws
U = np.random.rand(n_simulations, len(df))

# Cumulative probability per loan
cdf = np.cumsum(df[['P_0', 'P_1', 'P_2', 'P_3']].values, axis=1)

# Simulated delinquency state per loan and simulation
sim_states = (U[..., None] < cdf).argmax(axis=2)

# Severity per state
severity = np.array([0.0, 0.1, 0.4, 1.0])

# Portfolio losses per simulation
losses = (severity[sim_states] * df['risk_exposure'].values).sum(axis=1)

### Distribution of Portfolio Losses

In [None]:
import pandas as pd
import numpy as np

losses = np.array(losses)
losses_series = pd.Series(losses)
losses_series.describe()

### Expected Loss

In [None]:
expected_loss_mc = losses.mean()
expected_loss_mc

### Value At Risk (VaR)

In [None]:
var_95 = np.percentile(losses, 95)
var_99 = np.percentile(losses, 99)

var_95, var_99

### Expected Shortfall (ES)

In [None]:
es_95 = losses[losses >= var_95].mean()
es_99 = losses[losses >= var_99].mean()

In [None]:
es_95

In [None]:
es_99

### Risk concentration

In [None]:
df_el = df.copy()
df_el['expected_loss'] = df['expected_loss']

df_el = df_el.sort_values('expected_loss', ascending=False)
df_el['cum_el'] = df_el['expected_loss'].cumsum() / df_el['expected_loss'].sum()
df_el['cum_loans'] = np.arange(1, len(df_el)+1) / len(df_el)

In [None]:
plt.plot(df_el['cum_loans'], df_el['cum_el'])
plt.xlabel("Cumulative share of loans")
plt.ylabel("Cumulative share of expected loss")
plt.title("Expected Loss Concentration Curve")

The portfolio loss analysis shows a highly skewed loss distribution, with a relatively moderate expected loss but a significantly heavier tail captured by VaR and Expected Shortfall measures. This indicates that while average losses remain contained, extreme scenarios can lead to materially higher portfolio losses.
The expected loss concentration curve further highlights a strong risk concentration: a very small fraction of loans accounts for the majority of total losses. This confirms that portfolio risk is not evenly distributed, but driven by a limited set of high-risk exposures.

# Stress Testing & Scenario Analysis

## Which scenarios do we have?

### Rate Shock

In [None]:
stress_df = df.copy()

stress_df['interest_rate'] *= 1.20
stress_df['debt_to_income'] *= 1.10

### Credit quality downgrade

In [None]:
stress_df['grade_num'] += 1

### Severe recession

In [None]:
stress_df['interest_rate'] *= 1.30
stress_df['debt_to_income'] *= 1.20
stress_df['credit_utilization_ratio'] *= 1.15
stress_df['grade_num'] += 2

### Probabilities under stress

In [None]:
feature_cols = X_train.columns.tolist()

In [None]:
stress_df[feature_cols] = stress_df[feature_cols].fillna(
    df[feature_cols].median()
)

In [None]:
X_stress = stress_df[feature_cols]
stress_proba = model.predict_proba(X_stress)

stress_df[['P_0_s', 'P_1_s', 'P_2_s', 'P_3_s']] = stress_proba

In [None]:
stress_df[['P_0_s', 'P_1_s', 'P_2_s', 'P_3_s']].head()

Under the stress scenario (rising interest rates, deterioration of credit quality, etc.), this loan still has approximately 96% probability of remaining healthy, but its probability of default and delinquency increases compared to the normal scenario.

### Expected Loss under stress

In [None]:
stress_df['expected_loss_s'] = (
    stress_df['P_1_s'] * 0.10 +
    stress_df['P_2_s'] * 0.40 +
    stress_df['P_3_s'] * 1.00
) * stress_df['risk_exposure']

In [None]:
stress_df[['expected_loss', 'expected_loss_s']].describe()

The stress scenario does not create losses uniformly across the portfolio. Instead, it amplifies tail risk by concentrating additional expected losses on a small subset of already vulnerable loans.

Stress testing reveals a strong non-linear effect: while most loans remain unaffected, adverse scenarios significantly increase tail losses, doubling maximum expected loss and materially raising portfolio vulnerability.

# Pricing Analysis

In this project, pricing is analyzed from a risk consistency perspective rather than an optimization perspective. The objective is not to recommend new rates, but to assess whether historical pricing adequately compensates for observed credit risk.

### Relation between Interest Rate & Risk

In [None]:
df['PD_model'] = df['P_1'] + df['P_2'] + df['P_3']

In [None]:
df[['interest_rate', 'PD_model']].corr()

### Bucketing : Pricing vs Risk

In [None]:
df['pd_bucket'] = pd.qcut(df['PD_model'], q=10, labels=False)

In [None]:
pricing_vs_risk = (
    df
    .groupby('pd_bucket')
    .agg(
        avg_pd=('PD_model', 'mean'),
        avg_rate=('interest_rate', 'mean'),
        avg_el=('expected_loss', 'mean')
    )
    .reset_index()
)

In [None]:
pricing_vs_risk['risk_adjusted_spread'] = (
    pricing_vs_risk['avg_rate'] - pricing_vs_risk['avg_el']
)

In [None]:
pricing_vs_risk.round(4)

In [None]:
pricing_vs_risk.sort_values('risk_adjusted_spread')

While interest rates increase with modeled credit risk, the increase is not proportional to the rise in expected losses. Low-risk segments appear strongly over-priced, whereas high-risk buckets exhibit margin compression, suggesting partial underpricing and increased vulnerability under adverse scenarios.