# Loan Limit Optimization — Clean Notebook
This notebook is a runnable conversion of `loan_optimization_analysis_clean.py`.
It preserves the original logic but is split into cells for clarity: data loading, feature engineering, model training, optimization, simulation, and saving results.
The random seed is fixed at 42 (for both Python's `random` module and NumPy) so that repeated runs are reproducible.

In [None]:
# Imports and global setup
import os
import random
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score

# Reproducibility
# Seed both Python's `random` module and NumPy's global RNG. The Monte Carlo
# simulation draws from `np.random` directly, so it depends on this global
# seed; the scikit-learn estimators, train_test_split and DataFrame.sample
# calls below receive RANDOM_SEED explicitly through `random_state`.
RANDOM_SEED = 42
random.seed(RANDOM_SEED)
np.random.seed(RANDOM_SEED)

In [None]:
# Constants (kept same as original)
# Profit credited for each accepted increase that does not default
# (used by calculate_expected_value and simulate_loan_lifecycle).
PROFIT_PER_INCREASE = 40
# Upper bound on the number of increases optimize_loan_increases recommends
# for a single customer.
MAX_INCREASES_PER_YEAR = 6
# Default discount rate for calculate_npv.
# NOTE(review): presumably an annual rate, since calculate_npv raises it to
# the power t/4 — confirm.
DISCOUNT_RATE = 0.19
# Minimum 'Days Since Last Loan' for a customer to be flagged as Eligible.
ELIGIBILITY_THRESHOLD_DAYS = 60

# Default file names
# Excel workbook read by load_data.
INPUT_FILE = 'loan_limit_increases.xlsx'
# Full customer table with all engineered and modelled columns.
OUT_RESULTS = 'loan_optimization_results.csv'
# Customers with at least one recommended increase.
OUT_RECOMM = 'recommended_increases.csv'
# One row per Monte Carlo run for every sampled customer.
OUT_SIM = 'simulation_results.csv'

In [None]:
def load_data(path: str) -> pd.DataFrame:
    """Load the Excel workbook at *path* and coerce its data columns to numbers.

    If the parsed header has no ``'Customer ID'`` column but the first data
    row contains the text "customer id" (case-insensitive substring match),
    that row is promoted to the header and removed from the data.

    Every column other than ``'Customer ID'`` is then passed through
    ``pd.to_numeric(..., errors='coerce')``, so cells that cannot be parsed
    as numbers become NaN.

    Args:
        path: Location of the Excel file.

    Returns:
        The loaded DataFrame. A sheet with no data rows is returned as an
        empty frame rather than raising.

    Raises:
        FileNotFoundError: If *path* does not exist.
    """
    if not os.path.exists(path):
        raise FileNotFoundError(f"Input file not found: {path}")
    df = pd.read_excel(path, skiprows=0)
    # Only inspect row 0 when there is one: on a sheet without data rows
    # ``df.iloc[0]`` would raise IndexError.
    if 'Customer ID' not in df.columns and not df.empty:
        first_row = df.iloc[0].astype(str).str.lower()
        if first_row.str.contains('customer id').any():
            # The real header was read as data; promote it and drop the row.
            df.columns = df.iloc[0]
            df = df.iloc[1:].reset_index(drop=True)
    for col in df.columns:
        if col != 'Customer ID':
            df[col] = pd.to_numeric(df[col], errors='coerce')
    return df

In [None]:
def feature_engineering(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of *df* extended with the engineered model features.

    Columns added, in this order:

    * ``Eligible`` - 1 when 'Days Since Last Loan' reaches
      ``ELIGIBILITY_THRESHOLD_DAYS``, else 0.
    * ``Received_Increase`` - 1 when 'No. of Increases in 2023' is positive.
    * ``Risk_Category`` - 'Prime' (>= 95 % on-time), 'Near-Prime' (>= 85 %),
      otherwise 'Sub-Prime' (this is also what a NaN payment rate maps to,
      because both comparisons are false).
    * ``Loan_Size_Category`` - 'Small' / 'Medium' / 'Large' using the bins
      (0, 1500], (1500, 3000], (3000, 5000]; loans outside them become NaN.
    * ``Credit_Score_Proxy`` - 60 % on-time rate, 20 % days-since-last-loan
      and 20 % initial loan, the last two scaled to 0-100 by their column max.
    * ``Payment_Days_Interaction`` and ``Loan_Payment_Ratio``.

    The input frame is not modified.
    """
    out = df.copy()
    days_since = out['Days Since Last Loan']
    on_time = out['On-time Payments (%)']
    loan = out['Initial Loan ($)']

    out['Eligible'] = (days_since >= ELIGIBILITY_THRESHOLD_DAYS).astype(int)
    out['Received_Increase'] = (out['No. of Increases in 2023'] > 0).astype(int)

    def _risk_tier(rate):
        # Thresholds are checked from the strictest tier downwards.
        if rate >= 95:
            return 'Prime'
        return 'Near-Prime' if rate >= 85 else 'Sub-Prime'

    out['Risk_Category'] = on_time.apply(_risk_tier)
    out['Loan_Size_Category'] = pd.cut(
        loan,
        bins=[0, 1500, 3000, 5000],
        labels=['Small', 'Medium', 'Large'],
    )

    # Weighted blend of the three signals, each on a 0-100 scale.
    payment_part = on_time * 0.6
    recency_part = (days_since / days_since.max() * 100) * 0.2
    size_part = ((loan / loan.max()) * 100) * 0.2
    out['Credit_Score_Proxy'] = payment_part + recency_part + size_part

    out['Payment_Days_Interaction'] = on_time * days_since / 100
    # The +1 keeps the denominator non-zero for a 0 % on-time rate.
    out['Loan_Payment_Ratio'] = loan / (on_time + 1)
    return out

In [None]:
def build_uptake_models(df: pd.DataFrame):
    """Train three uptake classifiers and score every row with the best one.

    The target is ``Received_Increase``. The data is split 80/20 (stratified
    on the target, seeded with ``RANDOM_SEED``); logistic regression is fitted
    on standardized features, while the random forest and gradient boosting
    models are fitted on the raw features. The model with the highest ROC AUC
    on the hold-out set is used to predict ``Uptake_Probability`` for all rows.

    If the AUC of a model cannot be computed, a ``RuntimeWarning`` is emitted
    and that model is scored 0.0, so it can still be chosen when nothing
    scores higher.

    Args:
        df: Frame containing the engineered feature columns and
            ``Received_Increase``.

    Returns:
        ``(best_name, best_auc, df)``. ``df`` is the same object that was
        passed in; the ``Uptake_Probability`` column is added to it in place.
    """
    import warnings  # local import: only needed to report AUC failures

    features = [
        'Initial Loan ($)',
        'Days Since Last Loan',
        'On-time Payments (%)',
        'Credit_Score_Proxy',
        'Payment_Days_Interaction',
        'Loan_Payment_Ratio',
    ]
    X = df[features]
    y = df['Received_Increase']
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.2, random_state=RANDOM_SEED, stratify=y
    )

    # Only the logistic regression sees scaled inputs; the scaler is fitted
    # on the training split alone to avoid leaking hold-out statistics.
    scaler = StandardScaler()
    X_train_sc = scaler.fit_transform(X_train)
    X_test_sc = scaler.transform(X_test)

    lr = LogisticRegression(random_state=RANDOM_SEED, max_iter=1000)
    lr.fit(X_train_sc, y_train)
    rf = RandomForestClassifier(n_estimators=100, random_state=RANDOM_SEED, max_depth=10)
    rf.fit(X_train, y_train)
    gb = GradientBoostingClassifier(n_estimators=100, random_state=RANDOM_SEED, max_depth=5)
    gb.fit(X_train, y_train)

    # Each model is paired with the hold-out matrix in the form it was trained on.
    models = {
        'Logistic Regression': (lr, X_test_sc),
        'Random Forest': (rf, X_test),
        'Gradient Boosting': (gb, X_test),
    }
    best_name, best_auc, best_model = None, -1.0, None
    for name, (model, Xt) in models.items():
        try:
            auc = roc_auc_score(y_test, model.predict_proba(Xt)[:, 1])
        except (ValueError, IndexError) as exc:
            # E.g. only one class in the hold-out labels. Keep the
            # best-effort fallback, but make the failure visible.
            warnings.warn(
                f"Could not compute ROC AUC for {name}: {exc}; scoring it as 0.0",
                RuntimeWarning,
                stacklevel=2,
            )
            auc = 0.0
        if np.isnan(auc):
            # NaN never compares greater than best_auc, which could leave
            # best_model unset; fall back to the same 0.0 score.
            auc = 0.0
        if auc > best_auc:
            best_auc = auc
            best_name = name
            best_model = model

    if best_name == 'Logistic Regression':
        df['Uptake_Probability'] = lr.predict_proba(scaler.transform(df[features]))[:, 1]
    else:
        df['Uptake_Probability'] = best_model.predict_proba(df[features])[:, 1]
    return best_name, best_auc, df

In [None]:
def build_default_model(df: pd.DataFrame) -> pd.DataFrame:
    """Attach rule-based default-risk columns to a copy of *df*.

    Columns added:

    * ``Default_Risk_Score`` - 50 % shortfall of the on-time payment rate
      from 100, 30 % shortfall of ``Credit_Score_Proxy`` from 100, and 20 %
      the initial loan as a percentage of the largest loan in the frame.
    * ``Default_Probability`` - logistic transform of the score, centred on
      50 with slope 0.1.
    * ``Adjusted_Default_Probability`` - the probability raised by 5 % per
      increase received in 2023, clipped to the range [0, 0.95].

    The input frame is not modified.
    """
    scored = df.copy()
    loan = scored['Initial Loan ($)']

    payment_gap = (100 - scored['On-time Payments (%)']) * 0.5
    proxy_gap = (100 - scored['Credit_Score_Proxy']) * 0.3
    relative_size = (loan / loan.max() * 100) * 0.2
    scored['Default_Risk_Score'] = payment_gap + proxy_gap + relative_size

    exponent = -0.1 * (scored['Default_Risk_Score'] - 50)
    scored['Default_Probability'] = 1 / (1 + np.exp(exponent))

    uplift = 1 + 0.05 * scored['No. of Increases in 2023']
    adjusted = scored['Default_Probability'] * uplift
    scored['Adjusted_Default_Probability'] = adjusted.clip(0, 0.95)
    return scored

def calculate_expected_value(row):
    """Return the expected net value of offering one increase to a customer.

    Expected profit is ``PROFIT_PER_INCREASE`` weighted by the probability
    that the customer accepts and does not default. Expected loss is half of
    the initial loan weighted by the probability that the customer accepts
    and does default.

    Args:
        row: Mapping-like record with 'Uptake_Probability',
            'Adjusted_Default_Probability' and 'Initial Loan ($)'.

    Returns:
        Expected profit minus expected loss.
    """
    p_accept = row['Uptake_Probability']
    p_default = row['Adjusted_Default_Probability']
    gain = PROFIT_PER_INCREASE * p_accept * (1 - p_default)
    # Half of the initial loan is treated as the amount lost on default.
    loss = row['Initial Loan ($)'] * 0.5 * p_accept * p_default
    return gain - loss

In [None]:
def optimize_loan_increases(df_input, max_high_risk_pct=0.25, capital_constraint=None,
                            max_increases_per_year: Optional[int] = None):
    """Greedily pick how many increases to recommend for each eligible customer.

    Eligible customers (``Eligible == 1``) are visited in descending
    ``Risk_Adjusted_Score`` order. A customer is skipped when

    * they are 'Sub-Prime' and the number of approved Sub-Prime customers has
      already reached ``max_high_risk_pct`` of the *eligible* population,
    * their ``Expected_Value`` is not positive, or
    * approving them would push total exposure above ``capital_constraint``.

    Approved customers receive ``int(uptake * cap) + 1`` increases, limited to
    ``cap``; exposure is counted as half of the initial loan per increase.

    Args:
        df_input: Frame with 'Eligible', 'Risk_Adjusted_Score',
            'Risk_Category', 'Expected_Value', 'Uptake_Probability' and
            'Initial Loan ($)'. It is not modified.
        max_high_risk_pct: Share of eligible customers that may be approved
            from the Sub-Prime tier.
        capital_constraint: Maximum total exposure, or ``None`` for no limit.
            A value of ``0`` is a real limit (nothing can be approved).
        max_increases_per_year: Per-customer cap; defaults to the module
            constant ``MAX_INCREASES_PER_YEAR`` when ``None``.

    Returns:
        Dict with 'eligible_df' (sorted eligible rows plus
        'Recommended_Increases' and 'Total_Expected_Value'),
        'total_expected_value', 'total_approvals', 'total_exposure',
        'high_risk_count' and 'high_risk_pct' (Sub-Prime share of approvals).
    """
    if max_increases_per_year is None:
        max_increases_per_year = MAX_INCREASES_PER_YEAR

    eligible = df_input[df_input['Eligible'] == 1].copy()
    eligible = eligible.sort_values('Risk_Adjusted_Score', ascending=False)
    eligible['Recommended_Increases'] = 0
    eligible['Total_Expected_Value'] = 0.0

    # Loop-invariant: the Sub-Prime quota is relative to the eligible pool.
    high_risk_cap = len(eligible) * max_high_risk_pct
    total_value = total_exposure = 0.0
    high_risk_count = total_approvals = 0

    for idx, row in eligible.iterrows():
        is_high_risk = row['Risk_Category'] == 'Sub-Prime'
        if is_high_risk and high_risk_count >= high_risk_cap:
            continue
        if row['Expected_Value'] <= 0:
            continue

        optimal_increases = min(
            max_increases_per_year,
            int(row['Uptake_Probability'] * max_increases_per_year) + 1,
        )
        projected_exposure = row['Initial Loan ($)'] * optimal_increases * 0.5
        # Compare against None explicitly so that a budget of 0 is enforced
        # instead of being mistaken for "no constraint".
        if capital_constraint is not None and total_exposure + projected_exposure > capital_constraint:
            continue

        customer_value = float(row['Expected_Value']) * optimal_increases
        eligible.at[idx, 'Recommended_Increases'] = optimal_increases
        eligible.at[idx, 'Total_Expected_Value'] = customer_value
        total_value += customer_value
        total_exposure += projected_exposure
        if is_high_risk:
            high_risk_count += 1
        total_approvals += 1

    return {
        'eligible_df': eligible,
        'total_expected_value': total_value,
        'total_approvals': total_approvals,
        'total_exposure': total_exposure,
        'high_risk_count': high_risk_count,
        'high_risk_pct': high_risk_count / total_approvals if total_approvals > 0 else 0,
    }

In [None]:
def simulate_loan_lifecycle(customer_row, n_simulations=100, time_periods=4):
    """Monte Carlo simulation of increase offers to one customer.

    Each run starts in the customer's current 'Risk_Category'. From the
    second period onwards the tier first moves according to a fixed 3x3
    transition matrix. In every period the customer accepts an increase with
    probability 'Uptake_Probability'; an accepted increase defaults with
    probability 'Default_Probability' scaled by a tier multiplier
    (0.8 / 1.0 / 1.3) and capped at 0.95. A default costs half of the initial
    loan, otherwise ``PROFIT_PER_INCREASE`` is earned.

    Random draws come from NumPy's global RNG (``np.random``).

    Args:
        customer_row: Mapping-like record with 'Risk_Category',
            'Default_Probability', 'Uptake_Probability' and 'Initial Loan ($)'.
        n_simulations: Number of independent runs.
        time_periods: Number of periods per run.

    Returns:
        DataFrame with one row per run and the columns 'simulation',
        'total_profit', 'total_losses', 'net_value', 'defaults' and
        'increases_granted'.
    """
    tiers = ['Prime', 'Near-Prime', 'Sub-Prime']
    tier_position = {'Prime': 0, 'Near-Prime': 1, 'Sub-Prime': 2}
    default_scale = {'Prime': 0.8, 'Near-Prime': 1.0, 'Sub-Prime': 1.3}
    # Row i holds the probabilities of moving from tier i to each tier.
    transitions = np.array([
        [0.85, 0.12, 0.03],
        [0.15, 0.7, 0.15],
        [0.05, 0.25, 0.7],
    ])

    records = []
    for run in range(n_simulations):
        profit = 0.0
        losses = 0.0
        n_defaults = 0
        n_increases = 0
        tier = customer_row['Risk_Category']

        for period in range(time_periods):
            # The first period uses the observed tier; later ones migrate first.
            if period > 0:
                row_probs = transitions[tier_position[tier]]
                tier = tiers[np.random.choice(3, p=row_probs)]

            p_default = min(customer_row['Default_Probability'] * default_scale[tier], 0.95)

            accepted = np.random.random() < customer_row['Uptake_Probability']
            if not accepted:
                continue

            n_increases += 1
            if np.random.random() < p_default:
                n_defaults += 1
                losses += customer_row['Initial Loan ($)'] * 0.5
            else:
                profit += PROFIT_PER_INCREASE

        records.append({
            'simulation': run,
            'total_profit': profit,
            'total_losses': losses,
            'net_value': profit - losses,
            'defaults': n_defaults,
            'increases_granted': n_increases,
        })
    return pd.DataFrame(records)

def calculate_npv(cash_flows, discount_rate=DISCOUNT_RATE):
    """Return the net present value of a sequence of cash flows.

    The flow at position ``t`` (starting at 0) is divided by
    ``(1 + discount_rate) ** (t / 4)``, so four consecutive positions
    correspond to one full compounding period of ``discount_rate``.

    Args:
        cash_flows: Iterable of cash-flow amounts, earliest first.
        discount_rate: Rate applied per four positions.

    Returns:
        The discounted sum as a float; 0.0 for an empty sequence.
    """
    growth = 1 + discount_rate
    present_value = 0.0
    for position, amount in enumerate(cash_flows):
        discount_factor = growth ** (position / 4)
        present_value += amount / discount_factor
    return present_value

In [None]:
# === RUN PIPELINE (example parameters) ===
# Simulation knobs: edit these in the notebook before re-running the cell.
SIM_CUSTOMER_COUNT = 1000
N_SIMULATIONS_PER_CUSTOMER = 100
SIM_QUARTERS = 4

# Step 1: read the workbook and derive the engineered columns.
df = load_data(INPUT_FILE)
print('Dataset Shape:', df.shape)
df = feature_engineering(df)
print('Eligible Customers (>=60 days):', int(df['Eligible'].sum()))

# Step 2: pick the best uptake classifier, then add the rule-based default model.
best_name, best_auc, df = build_uptake_models(df)
print('Best uptake model:', best_name, 'AUC=', round(best_auc,4))
df = build_default_model(df)

# Step 3: per-customer expected value, and the score used to rank customers.
df['Expected_Value'] = df.apply(calculate_expected_value, axis=1)
survival_probability = 1 - df['Adjusted_Default_Probability']
df['Risk_Adjusted_Score'] = df['Expected_Value'] * survival_probability * df['Uptake_Probability']

# Step 4: greedy allocation of increases under the Sub-Prime quota.
optimization_results = optimize_loan_increases(df, max_high_risk_pct=0.25)
print('Approved for Increases:', optimization_results['total_approvals'])
print('Total Expected Value: $', round(optimization_results['total_expected_value'],2))

# Step 5: Monte Carlo sample of eligible customers. Sampling falls back to
# drawing with replacement only when fewer customers are available than requested.
eligible_customers = df[df['Eligible'] == 1]
available = len(eligible_customers)
sample_customers = eligible_customers.sample(
    n=SIM_CUSTOMER_COUNT,
    replace=SIM_CUSTOMER_COUNT > available,
    random_state=RANDOM_SEED,
)

sim_list = []
for _, row in sample_customers.iterrows():
    sim_df = simulate_loan_lifecycle(
        row,
        n_simulations=N_SIMULATIONS_PER_CUSTOMER,
        time_periods=SIM_QUARTERS,
    )
    # Tag every run with the customer it belongs to.
    sim_df['customer_id'] = row['Customer ID']
    sim_df['risk_category'] = row['Risk_Category']
    sim_list.append(sim_df)
all_simulations = pd.concat(sim_list, ignore_index=True)

total_runs = SIM_CUSTOMER_COUNT * N_SIMULATIONS_PER_CUSTOMER
print('Total Simulation Runs:', total_runs)
print('Total Individual Decisions:', total_runs * SIM_QUARTERS)
mean_net_by_customer = all_simulations.groupby('customer_id')['net_value'].mean()
print('Simulation sample mean net value:', round(mean_net_by_customer.mean(),2))

# Step 6: persist the full table, the recommendations and the simulation runs.
df.to_csv(OUT_RESULTS, index=False)
recommendation_columns = [
    'Customer ID',
    'Risk_Category',
    'On-time Payments (%)',
    'Initial Loan ($)',
    'Uptake_Probability',
    'Default_Probability',
    'Expected_Value',
    'Recommended_Increases',
    'Total_Expected_Value',
]
ranked_eligible = optimization_results['eligible_df']
approved_rows = ranked_eligible[ranked_eligible['Recommended_Increases'] > 0]
approved_rows[recommendation_columns].to_csv(OUT_RECOMM, index=False)
all_simulations.to_csv(OUT_SIM, index=False)
print('Saved outputs:', OUT_RESULTS, OUT_RECOMM, OUT_SIM)