# 🏆 AI-Powered Loan Approval Prediction System
## End-to-End Machine Learning Pipeline on Databricks

---

### 📊 Project Overview

| Attribute | Value |
|-----------|-------|
| **Records** | 50,000 |
| **Columns** | 20 (customer ID, 18 input features, target) |
| **Target Variable** | `loan_status` (0 = Rejected, 1 = Approved) |
| **Use Case** | Credit Risk Assessment & Loan Approval Prediction |

### 🎯 Problem Statement

Banks and financial institutions need to automate loan approval decisions while minimizing default risk and ensuring fair lending practices.

### 🗺️ Pipeline Architecture

```
┌─────────────┐    ┌─────────────┐    ┌─────────────┐    ┌─────────────┐
│   Bronze    │───▶│   Silver    │───▶│    Gold     │───▶│  ML Model   │
│  (Raw Data) │    │ (Features)  │    │ (Analytics) │    │  (MLflow)   │
└─────────────┘    └─────────────┘    └─────────────┘    └─────────────┘
```

### 📦 Section 1: Environment Setup & Library Imports


In [0]:
# Import PySpark Libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.window import Window

In [0]:
# Import PySpark ML Libraries
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.classification import GBTClassifier, RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

In [0]:
#  Import MLflow
import mlflow
import mlflow.spark
from mlflow.models.signature import infer_signature

In [0]:
# Import visualization libraries (for Databricks display)
import matplotlib.pyplot as plt
import seaborn as sns

# Suppress warnings
# NOTE: the 'ignore' action is installed without a category or module filter,
# so it hides every warning raised after this point, including deprecation
# notices from the libraries imported above.
import warnings
warnings.filterwarnings('ignore')

# Confirmation message shown once the import cells have run
print("✅ All libraries imported successfully!")


✅ All libraries imported successfully!


## 🥉 Section 2: Bronze Layer - Raw Data Ingestion

The Bronze layer captures raw data as-is from the source with minimal transformation.

### Load Raw Data into Delta Lake

In [0]:
# Path of the raw loan-application CSV that the Bronze layer reads.
file_path = "/Volumes/workspace/loanapproval/loanapproval/Loan_approval_data_2025.csv"


In [0]:

# Load the raw CSV into a Spark DataFrame: the first line supplies the column
# names and Spark infers each column's type from the data.
csv_read_options = {"header": True, "inferSchema": True}
bronze_df = spark.read.csv(file_path, **csv_read_options)

In [0]:

# Display basic info
# count() launches a Spark job each time it is called, so the row count is
# computed once and reused for both messages.
bronze_row_count = bronze_df.count()
bronze_column_count = len(bronze_df.columns)

print(f"📊 Dataset Shape: {bronze_row_count} rows × {bronze_column_count} columns")
print(f"📊 Total Records: {bronze_row_count:,}")
print(f"📊 Total Features: {bronze_column_count}")


📊 Dataset Shape: 50000 rows × 20 columns
📊 Total Records: 50,000
📊 Total Features: 20


In [0]:
# Print the column names and the types Spark inferred for them
print("📋 Data Schema:")
bronze_df.printSchema()

# Render the first ten rows as a table
bronze_preview = bronze_df.limit(10)
display(bronze_preview)

📋 Data Schema:
root
 |-- customer_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation_status: string (nullable = true)
 |-- years_employed: double (nullable = true)
 |-- annual_income: integer (nullable = true)
 |-- credit_score: integer (nullable = true)
 |-- credit_history_years: double (nullable = true)
 |-- savings_assets: integer (nullable = true)
 |-- current_debt: integer (nullable = true)
 |-- defaults_on_file: integer (nullable = true)
 |-- delinquencies_last_2yrs: integer (nullable = true)
 |-- derogatory_marks: integer (nullable = true)
 |-- product_type: string (nullable = true)
 |-- loan_intent: string (nullable = true)
 |-- loan_amount: integer (nullable = true)
 |-- interest_rate: double (nullable = true)
 |-- debt_to_income_ratio: double (nullable = true)
 |-- loan_to_income_ratio: double (nullable = true)
 |-- payment_to_income_ratio: double (nullable = true)
 |-- loan_status: integer (nullable = true)



customer_id,age,occupation_status,years_employed,annual_income,credit_score,credit_history_years,savings_assets,current_debt,defaults_on_file,delinquencies_last_2yrs,derogatory_marks,product_type,loan_intent,loan_amount,interest_rate,debt_to_income_ratio,loan_to_income_ratio,payment_to_income_ratio,loan_status
CUST100000,40,Employed,17.2,25579,692,5.3,895,10820,0,0,0,Credit Card,Business,600,17.02,0.423,0.023,0.008,1
CUST100001,33,Employed,7.3,43087,627,3.5,169,16550,0,1,0,Personal Loan,Home Improvement,53300,14.1,0.384,1.237,0.412,0
CUST100002,42,Student,1.1,20840,689,8.4,17,7852,0,0,0,Credit Card,Debt Consolidation,2100,18.33,0.377,0.101,0.034,1
CUST100003,53,Student,0.5,29147,692,9.8,1480,11603,0,1,0,Credit Card,Business,2900,18.74,0.398,0.099,0.033,1
CUST100004,32,Employed,12.5,63657,630,7.2,209,12424,0,0,0,Personal Loan,Education,99600,13.92,0.195,1.565,0.522,1
CUST100005,32,Employed,13.4,32015,570,7.3,253,1120,0,0,2,Credit Card,Personal,37000,22.92,0.035,1.156,0.385,0
CUST100006,53,Employed,22.9,44989,674,11.1,19667,19298,0,0,0,Personal Loan,Home Improvement,45600,11.02,0.429,1.014,0.338,1
CUST100007,44,Self-Employed,4.2,80603,625,18.5,830,38382,0,0,0,Credit Card,Personal,51700,19.42,0.476,0.641,0.214,1
CUST100008,29,Employed,5.9,28416,569,2.6,1334,22668,1,2,0,Credit Card,Education,33800,22.72,0.798,1.189,0.396,0
CUST100009,41,Employed,7.0,70717,638,21.5,1578,21394,0,1,0,Credit Card,Personal,70000,19.35,0.303,0.99,0.33,1


### Add Metadata Columns & Save Bronze Delta Table

In [0]:
# Add metadata columns for audit trail: ingestion time, originating file and a
# processing-status marker.
# The file name is derived from file_path (its last path segment) so the audit
# column cannot drift from the file that was actually read.
source_file_name = file_path.rsplit("/", 1)[-1]

bronze_df_with_metadata = (
    bronze_df
    .withColumn("_ingestion_timestamp", F.current_timestamp())
    .withColumn("_source_file", F.lit(source_file_name))
    .withColumn("_processing_status", F.lit("RAW"))
)

# Save as Delta table, replacing any previous contents of the table
bronze_table_path = "bronze_loan_applications"

(
    bronze_df_with_metadata.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(bronze_table_path)
)

print(f"✅ Bronze layer Delta table created: {bronze_table_path}")


✅ Bronze layer Delta table created: bronze_loan_applications


In [0]:
# Verify Bronze table: count the rows that were written
bronze_count_df = spark.table(bronze_table_path).agg(
    F.count("*").alias("total_records")
)
display(bronze_count_df)

total_records
50000


## 🥈 Section 3: Silver Layer - Data Transformation & Feature Engineering

The Silver layer cleanses data and performs feature engineering.

### Load Bronze Data

In [0]:
# Start the Silver layer from the persisted Bronze table and leave the Bronze
# audit columns behind; they are not inputs to the transformations below.
bronze_audit_columns = ("_ingestion_timestamp", "_source_file", "_processing_status")
silver_df = spark.table("bronze_loan_applications").drop(*bronze_audit_columns)

print(f"📊 Silver layer input: {silver_df.count()} records")

📊 Silver layer input: 50000 records


### Exploratory Data Analysis (EDA)

In [0]:
# Target distribution: number of rows per loan_status value and the share of
# all rows that value represents
print("🎯 Target Variable Distribution:")
silver_total_rows = silver_df.count()
rows_in_group = F.count("*")
target_distribution = (
    silver_df.groupBy("loan_status")
    .agg(
        rows_in_group.alias("count"),
        (rows_in_group / silver_total_rows * 100).alias("percentage"),
    )
    .orderBy("loan_status")
)
display(target_distribution)

🎯 Target Variable Distribution:


loan_status,count,percentage
0,22477,44.954
1,27523,55.04599999999999


In [0]:
# Numerical columns of the raw dataset, used for summary statistics
numerical_cols = [
    'age', 'years_employed', 'annual_income', 'credit_score',
    'credit_history_years', 'savings_assets', 'current_debt',
    'defaults_on_file', 'delinquencies_last_2yrs', 'derogatory_marks',
    'loan_amount', 'interest_rate', 'debt_to_income_ratio',
    'loan_to_income_ratio', 'payment_to_income_ratio',
]

print("📈 Numerical Features Statistics:")
numeric_only_df = silver_df.select(numerical_cols)
numeric_summary_df = numeric_only_df.describe()
display(numeric_summary_df)

📈 Numerical Features Statistics:


summary,age,years_employed,annual_income,credit_score,credit_history_years,savings_assets,current_debt,defaults_on_file,delinquencies_last_2yrs,derogatory_marks,loan_amount,interest_rate,debt_to_income_ratio,loan_to_income_ratio,payment_to_income_ratio
count,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0,50000.0
mean,34.95706,7.454867999999962,50062.89204,643.61482,8.168274000000027,3595.6194,14290.44222,0.05348,0.55464,0.14764,33041.874,15.498590800000075,0.2857241599999992,0.7019986599999753,0.2339949399999969
stddev,11.118602817934116,7.612096740249143,32630.501014124977,64.73151828712807,7.20755230554234,13232.39939765196,13243.75749293955,0.2249908931891252,0.8450495562835657,0.4129961763949326,26116.18510178328,4.067941970234191,0.1597865231706169,0.465787521364093,0.1552680969099407
min,18.0,0.0,15000.0,348.0,0.0,0.0,60.0,0.0,0.0,0.0,500.0,6.0,0.002,0.008,0.003
max,70.0,39.9,250000.0,850.0,30.0,300000.0,163344.0,1.0,9.0,4.0,100000.0,23.0,0.8,2.001,0.667


In [0]:
# Categorical features analysis: for each categorical column, show how many
# applications fall into every category and the mean of loan_status there,
# largest category first
categorical_cols = ['occupation_status', 'product_type', 'loan_intent']

print("📊 Categorical Feature Distributions:")
for category_column in categorical_cols:
    category_summary = silver_df.groupBy(category_column).agg(
        F.count("*").alias("count"),
        F.round(F.avg("loan_status"), 4).alias("approval_rate"),
    )
    display(category_summary.orderBy(F.desc("count")))

📊 Categorical Feature Distributions:


occupation_status,count,approval_rate
Employed,34971,0.5436
Self-Employed,10179,0.5691
Student,4850,0.561


product_type,count,approval_rate
Credit Card,22455,0.6132
Personal Loan,17523,0.4803
Line of Credit,10022,0.5324


loan_intent,count,approval_rate
Personal,12429,0.6083
Education,10134,0.6751
Medical,7598,0.529
Business,7469,0.4434
Home Improvement,7453,0.5352
Debt Consolidation,4917,0.3665


### Feature Engineering using PySpark

In [0]:
# Drop customer_id (not useful for modeling).
# DataFrame.drop ignores column names that are not in the schema, so this is
# safe to re-run after the column has already been removed.
silver_df = silver_df.drop("customer_id")

In [0]:
# 1. Age Groups
# Branches are tested top to bottom, so each one only needs its upper bound.
# Ages of 60 and above, and rows where no branch matches (e.g. a missing age),
# fall through to "Elderly".
applicant_age = F.col("age")
silver_df = silver_df.withColumn(
    "age_group",
    F.when(applicant_age < 25, "Young")
    .when(applicant_age < 35, "Early_Career")
    .when(applicant_age < 50, "Mid_Career")
    .when(applicant_age < 60, "Senior")
    .otherwise("Elderly"),
)

In [0]:
# 2. Credit Score Tiers
# NOTE(review): the original comment calls these bands an industry standard;
# confirm the cut-offs against the scoring model the business actually uses.
# Branches are tested top to bottom, so each one only needs its lower bound.
# Scores below 600, and rows where no branch matches, become "Very_Poor".
applicant_score = F.col("credit_score")
silver_df = silver_df.withColumn(
    "credit_tier",
    F.when(applicant_score >= 750, "Excellent")
    .when(applicant_score >= 700, "Good")
    .when(applicant_score >= 650, "Fair")
    .when(applicant_score >= 600, "Poor")
    .otherwise("Very_Poor"),
)

In [0]:
# 3. Net Worth: savings/assets minus current debt (negative when debt is larger)
liquid_assets = F.col("savings_assets")
outstanding_debt = F.col("current_debt")
silver_df = silver_df.withColumn("net_worth", liquid_assets - outstanding_debt)

In [0]:
# 4. Monthly Income: annual income spread evenly over twelve months
MONTHS_PER_YEAR = 12
silver_df = silver_df.withColumn(
    "monthly_income", F.col("annual_income") / MONTHS_PER_YEAR
)

In [0]:
# 5. Loan to Savings Ratio
# 1 is added to the denominator so applicants with zero savings do not cause
# a division by zero.
loan_ratio_denominator = F.col("savings_assets") + 1
silver_df = silver_df.withColumn(
    "loan_to_savings_ratio", F.col("loan_amount") / loan_ratio_denominator
)

In [0]:
# 6. Debt to Assets Ratio
# 1 is added to the denominator so applicants with zero savings do not cause
# a division by zero.
debt_ratio_denominator = F.col("savings_assets") + 1
silver_df = silver_df.withColumn(
    "debt_to_assets_ratio", F.col("current_debt") / debt_ratio_denominator
)

In [0]:
# 7. Total Risk Flags: unweighted sum of the three adverse-history counters
defaults_count = F.col("defaults_on_file")
delinquency_count = F.col("delinquencies_last_2yrs")
derogatory_count = F.col("derogatory_marks")
silver_df = silver_df.withColumn(
    "total_risk_flags", defaults_count + delinquency_count + derogatory_count
)

In [0]:
# 8. Composite Risk Score: weighted sum of the adverse-history counters
# (defaults x50, delinquencies x10, derogatory marks x20)
weighted_defaults = F.col("defaults_on_file") * 50
weighted_delinquencies = F.col("delinquencies_last_2yrs") * 10
weighted_derogatory = F.col("derogatory_marks") * 20
silver_df = silver_df.withColumn(
    "risk_score", weighted_defaults + weighted_delinquencies + weighted_derogatory
)

In [0]:
# 9. Employment Stability Score: years employed as a fraction of age.
# Rows whose age is not greater than zero get 0 instead of a division.
years_per_year_of_age = F.col("years_employed") / F.col("age")
has_positive_age = F.col("age") > 0
silver_df = silver_df.withColumn(
    "employment_stability",
    F.when(has_positive_age, years_per_year_of_age).otherwise(0),
)

In [0]:
# 10. Income Bracket
# Branches are tested top to bottom, so each one only needs its upper bound.
# Incomes of 200000 and above, and rows where no branch matches, become
# "Very_High".
yearly_income = F.col("annual_income")
silver_df = silver_df.withColumn(
    "income_bracket",
    F.when(yearly_income < 30000, "Low")
    .when(yearly_income < 60000, "Lower_Middle")
    .when(yearly_income < 100000, "Upper_Middle")
    .when(yearly_income < 200000, "High")
    .otherwise("Very_High"),
)

print("✅ Feature Engineering Complete!")
# The number printed here is the total column count of silver_df (original
# columns plus engineered ones), not only the newly added columns.
print(f"📊 New Feature Count: {len(silver_df.columns)}")

✅ Feature Engineering Complete!
📊 New Feature Count: 29


In [0]:
# Show the engineered columns next to the raw columns they were derived from,
# together with the target, for the first ten rows
engineered_preview_columns = [
    "age", "age_group",
    "credit_score", "credit_tier",
    "net_worth", "monthly_income",
    "loan_to_savings_ratio", "debt_to_assets_ratio",
    "total_risk_flags", "risk_score",
    "employment_stability", "income_bracket",
    "loan_status",
]
display(silver_df.select(*engineered_preview_columns).limit(10))

age,age_group,credit_score,credit_tier,net_worth,monthly_income,loan_to_savings_ratio,debt_to_assets_ratio,total_risk_flags,risk_score,employment_stability,income_bracket,loan_status
40,Mid_Career,692,Fair,-9925,2131.583333333333,0.6696428571428571,12.075892857142858,0,0,0.43,Low,1
33,Early_Career,627,Poor,-16381,3590.583333333333,313.52941176470586,97.3529411764706,1,10,0.2212121212121212,Lower_Middle,0
42,Mid_Career,689,Fair,-7835,1736.6666666666667,116.66666666666669,436.22222222222223,0,0,0.0261904761904761,Low,1
53,Senior,692,Fair,-10123,2428.9166666666665,1.958136394328157,7.834571235651587,1,10,0.0094339622641509,Low,1
32,Early_Career,630,Poor,-12215,5304.75,474.2857142857143,59.161904761904765,0,0,0.390625,Upper_Middle,1
32,Early_Career,570,Very_Poor,-867,2667.9166666666665,145.66929133858267,4.409448818897638,2,40,0.41875,Lower_Middle,0
53,Senior,674,Fair,369,3749.0833333333335,2.318486882245272,0.9811877160870448,0,0,0.4320754716981131,Lower_Middle,1
44,Mid_Career,625,Poor,-37552,6716.916666666667,62.214199759326114,46.18772563176896,0,0,0.0954545454545454,Upper_Middle,1
29,Early_Career,569,Very_Poor,-21334,2368.0,25.318352059925093,16.979775280898878,3,70,0.2034482758620689,Low,0
41,Mid_Career,638,Poor,-19816,5893.083333333333,44.33185560481317,13.549081697276758,1,10,0.1707317073170731,Upper_Middle,1


### Save Silver Delta Table

In [0]:
# Stamp every row with the time the Silver transformation was evaluated
silver_df = silver_df.withColumn("_processing_timestamp", F.current_timestamp())

# Persist as a Delta table, replacing any previous contents of the table
silver_table_path = "silver_loan_applications"

silver_writer = silver_df.write.format("delta").mode("overwrite")
silver_writer.saveAsTable(silver_table_path)

print(f"✅ Silver layer Delta table created: {silver_table_path}")

✅ Silver layer Delta table created: silver_loan_applications


## 🥇 Section 4: Gold Layer - Analytics & Business Intelligence
 
The Gold layer contains aggregated, business-ready data for reporting and analytics.

### Create Analytics Summary Table

In [0]:
# The Gold layer reads the persisted Silver table rather than the in-memory
# silver_df built earlier in the notebook
silver_table_name = "silver_loan_applications"
gold_source_df = spark.table(silver_table_name)

In [0]:
# Aggregate applications per business segment (intent x credit tier x
# occupation x income bracket): volume, approvals, and average loan,
# pricing and risk figures
segment_columns = ["loan_intent", "credit_tier", "occupation_status", "income_bracket"]
segment_row_count = F.count("*")
segment_approved = F.sum("loan_status")

gold_analytics_df = gold_source_df.groupBy(*segment_columns).agg(
    segment_row_count.alias("total_applications"),
    segment_approved.alias("approved_count"),
    F.round(F.avg("loan_amount"), 2).alias("avg_loan_amount"),
    F.round(F.avg("interest_rate"), 4).alias("avg_interest_rate"),
    F.round(F.avg("credit_score"), 2).alias("avg_credit_score"),
    F.round(F.avg("risk_score"), 2).alias("avg_risk_score"),
    F.round(F.avg("debt_to_income_ratio"), 4).alias("avg_dti_ratio"),
    # Approved share of the segment as a fraction between 0 and 1
    F.round(segment_approved / segment_row_count, 4).alias("approval_rate"),
)

In [0]:
# Persist the segment summary as a Delta table, replacing previous contents
gold_table_path = "gold_loan_analytics"

gold_writer = gold_analytics_df.write.format("delta").mode("overwrite")
gold_writer.saveAsTable(gold_table_path)

print(f"✅ Gold layer analytics table created: {gold_table_path}")

✅ Gold layer analytics table created: gold_loan_analytics


In [0]:
# Show the twenty segments with the most applications
largest_segments = gold_analytics_df.orderBy(F.desc("total_applications"))
display(largest_segments.limit(20))

loan_intent,credit_tier,occupation_status,income_bracket,total_applications,approved_count,avg_loan_amount,avg_interest_rate,avg_credit_score,avg_risk_score,avg_dti_ratio,approval_rate
Personal,Poor,Employed,Lower_Middle,1212,759,33216.75,16.3256,624.67,10.58,0.2846,0.6262
Personal,Fair,Employed,Lower_Middle,1056,862,32648.39,14.2396,672.81,5.01,0.283,0.8163
Personal,Very_Poor,Employed,Lower_Middle,979,169,33361.39,18.334,561.63,24.62,0.2856,0.1726
Education,Poor,Employed,Lower_Middle,957,670,33653.4,16.2201,626.1,9.26,0.2937,0.7001
Education,Fair,Employed,Lower_Middle,934,824,32514.45,14.2818,672.52,3.95,0.2817,0.8822
Personal,Very_Poor,Employed,Low,892,88,16536.77,18.2906,558.93,24.13,0.2924,0.0987
Education,Very_Poor,Employed,Lower_Middle,831,236,31766.79,18.2242,563.24,25.66,0.2908,0.284
Personal,Poor,Employed,Low,790,412,16645.32,16.5057,624.47,10.29,0.2868,0.5215
Business,Poor,Employed,Lower_Middle,727,226,33449.38,16.2922,625.16,9.77,0.2885,0.3109
Home Improvement,Poor,Employed,Lower_Middle,726,354,32282.64,16.1853,626.13,9.94,0.2824,0.4876


### Business Intelligence Reports

#### 📊 BI Report 1: Approval Rates by Credit Tier

In [0]:
# Sort key that ranks credit tiers from best (1) to worst; any tier that is
# not listed sorts last with rank 5.
credit_tier_order = ["Excellent", "Good", "Fair", "Poor"]
credit_tier_rank = None
for tier_position, tier_label in enumerate(credit_tier_order, start=1):
    is_tier = F.col("credit_tier") == tier_label
    if credit_tier_rank is None:
        credit_tier_rank = F.when(is_tier, tier_position)
    else:
        credit_tier_rank = credit_tier_rank.when(is_tier, tier_position)
credit_tier_rank = credit_tier_rank.otherwise(5)

# Applications, approvals and approval percentage per credit tier
approval_by_credit = (
    gold_source_df.groupBy("credit_tier")
    .agg(
        F.count("*").alias("total"),
        F.sum("loan_status").alias("approved"),
        F.round(F.sum("loan_status") / F.count("*") * 100, 2).alias("approval_rate_pct"),
    )
    .orderBy(credit_tier_rank)
)

print("📊 BI Report 1: Approval Rates by Credit Tier")
display(approval_by_credit)

📊 BI Report 1: Approval Rates by Credit Tier


credit_tier,total,approved,approval_rate_pct
Excellent,2591,2312,89.23
Good,7104,6020,84.74
Fair,13417,10069,75.05
Poor,14395,7395,51.37
Very_Poor,12493,1727,13.82


#### 📊 BI Report 2: Loan Intent Analysis

In [0]:
# Volume, average loan terms, approval percentage and average risk score per
# stated loan purpose, most common purpose first
intent_metrics = [
    F.count("*").alias("total_applications"),
    F.round(F.avg("loan_amount"), 2).alias("avg_loan_amount"),
    F.round(F.avg("interest_rate"), 4).alias("avg_interest_rate"),
    F.round(F.sum("loan_status") / F.count("*") * 100, 2).alias("approval_rate_pct"),
    F.round(F.avg("risk_score"), 2).alias("avg_risk_score"),
]
intent_grouped = gold_source_df.groupBy("loan_intent").agg(*intent_metrics)
loan_intent_analysis = intent_grouped.orderBy(F.desc("total_applications"))

print("📊 BI Report 2: Loan Intent Analysis")
display(loan_intent_analysis)


📊 BI Report 2: Loan Intent Analysis


loan_intent,total_applications,avg_loan_amount,avg_interest_rate,approval_rate_pct,avg_risk_score
Personal,12429,33193.41,15.5128,60.83,11.3
Education,10134,33304.97,15.5388,67.51,11.34
Medical,7598,33027.55,15.5019,52.9,11.14
Business,7469,32983.14,15.4486,44.34,11.07
Home Improvement,7453,32817.19,15.478,53.52,10.93
Debt Consolidation,4917,32568.5,15.4817,36.65,11.11


#### 📊 BI Report 3: Risk Analysis by Age Group

In [0]:
# Sort key that orders age groups from youngest (1) upwards; any group that is
# not listed ("Elderly" in this notebook) sorts last with rank 5.
age_group_order = ["Young", "Early_Career", "Mid_Career", "Senior"]
age_group_rank = None
for group_position, group_label in enumerate(age_group_order, start=1):
    is_group = F.col("age_group") == group_label
    if age_group_rank is None:
        age_group_rank = F.when(is_group, group_position)
    else:
        age_group_rank = age_group_rank.when(is_group, group_position)
age_group_rank = age_group_rank.otherwise(5)

# Risk indicators and approval percentage per age group
risk_by_age = (
    gold_source_df.groupBy("age_group")
    .agg(
        F.count("*").alias("total"),
        F.round(F.avg("risk_score"), 2).alias("avg_risk_score"),
        F.round(F.avg("total_risk_flags"), 2).alias("avg_risk_flags"),
        F.round(F.avg("defaults_on_file"), 4).alias("avg_defaults"),
        F.round(F.sum("loan_status") / F.count("*") * 100, 2).alias("approval_rate_pct"),
    )
    .orderBy(age_group_rank)
)

print("📊 BI Report 3: Risk Analysis by Age Group")
display(risk_by_age)

📊 BI Report 3: Risk Analysis by Age Group


age_group,total,avg_risk_score,avg_risk_flags,avg_defaults,approval_rate_pct
Young,10127,14.53,0.94,0.0752,30.2
Early_Career,14841,12.21,0.83,0.0574,50.14
Mid_Career,19744,9.74,0.67,0.0456,65.51
Senior,4386,7.42,0.53,0.0306,76.4
Elderly,902,5.98,0.43,0.0277,81.71


#### 📊 BI Report 4: Income vs Approval Analysis

In [0]:
# Sort key that orders income brackets from lowest (1) upwards; any bracket
# that is not listed ("Very_High" in this notebook) sorts last with rank 5.
income_bracket_order = ["Low", "Lower_Middle", "Upper_Middle", "High"]
income_bracket_rank = None
for bracket_position, bracket_label in enumerate(income_bracket_order, start=1):
    is_bracket = F.col("income_bracket") == bracket_label
    if income_bracket_rank is None:
        income_bracket_rank = F.when(is_bracket, bracket_position)
    else:
        income_bracket_rank = income_bracket_rank.when(is_bracket, bracket_position)
income_bracket_rank = income_bracket_rank.otherwise(5)

# Average income, loan size, debt-to-income ratio and approval percentage per
# income bracket
income_analysis = (
    gold_source_df.groupBy("income_bracket")
    .agg(
        F.count("*").alias("total"),
        F.round(F.avg("annual_income"), 2).alias("avg_income"),
        F.round(F.avg("loan_amount"), 2).alias("avg_loan_amount"),
        F.round(F.avg("debt_to_income_ratio"), 4).alias("avg_dti"),
        F.round(F.sum("loan_status") / F.count("*") * 100, 2).alias("approval_rate_pct"),
    )
    .orderBy(income_bracket_rank)
)

print("📊 BI Report 4: Income Bracket Analysis")
display(income_analysis)

📊 BI Report 4: Income Bracket Analysis


income_bracket,total,avg_income,avg_loan_amount,avg_dti,approval_rate_pct
Low,15056,21647.05,15357.78,0.2869,45.66
Lower_Middle,21260,43326.55,32803.61,0.2848,54.88
Upper_Middle,9860,75696.15,51221.95,0.2867,62.73
High,3625,128081.81,57114.9,0.2844,72.55
Very_High,199,228358.93,57147.24,0.2708,82.91


#### 📊 BI Report 5: Cross-tabulation - Credit Tier vs Loan Intent

In [0]:
# Application count and approval percentage for every (credit tier, loan
# intent) pair, sorted alphabetically by tier and then by intent
cross_tab_keys = ["credit_tier", "loan_intent"]
pair_count = F.count("*")
pair_approved = F.sum("loan_status")

cross_tab = (
    gold_source_df.groupBy(*cross_tab_keys)
    .agg(
        pair_count.alias("count"),
        F.round(pair_approved / pair_count * 100, 2).alias("approval_rate_pct"),
    )
    .orderBy(*cross_tab_keys)
)

print("📊 BI Report 5: Credit Tier vs Loan Intent Cross-tabulation")
display(cross_tab)

📊 BI Report 5: Credit Tier vs Loan Intent Cross-tabulation


credit_tier,loan_intent,count,approval_rate_pct
Excellent,Business,404,87.62
Excellent,Debt Consolidation,246,88.62
Excellent,Education,490,88.78
Excellent,Home Improvement,390,88.72
Excellent,Medical,403,89.83
Excellent,Personal,658,90.73
Fair,Business,1974,65.1
Fair,Debt Consolidation,1354,53.25
Fair,Education,2739,85.76
Fair,Home Improvement,2031,75.33


## 🤖 Section 5: Machine Learning Model Training with MLflow

Training a Gradient Boosted Trees classifier with MLflow tracking.

### Prepare Data for ML

In [0]:
# Model input is the persisted Silver table without its processing timestamp
ml_df = spark.table("silver_loan_applications").drop("_processing_timestamp")

# Categorical inputs: three raw columns followed by the three bucketed columns
# created during feature engineering
raw_categorical = ['occupation_status', 'product_type', 'loan_intent']
bucketed_categorical = ['age_group', 'credit_tier', 'income_bracket']
categorical_features = raw_categorical + bucketed_categorical

# Numerical inputs: the raw numeric columns followed by the engineered ones
raw_numerical = [
    'age', 'years_employed', 'annual_income', 'credit_score',
    'credit_history_years', 'savings_assets', 'current_debt',
    'defaults_on_file', 'delinquencies_last_2yrs', 'derogatory_marks',
    'loan_amount', 'interest_rate', 'debt_to_income_ratio',
    'loan_to_income_ratio', 'payment_to_income_ratio',
]
engineered_numerical = [
    'net_worth', 'monthly_income', 'loan_to_savings_ratio',
    'debt_to_assets_ratio', 'total_risk_flags', 'risk_score',
    'employment_stability',
]
numerical_features = raw_numerical + engineered_numerical

# Label column the classifier learns to predict
target_col = 'loan_status'

print(f"📊 Categorical Features: {len(categorical_features)}")
print(f"📊 Numerical Features: {len(numerical_features)}")
print(f"🎯 Target: {target_col}")

📊 Categorical Features: 6
📊 Numerical Features: 22
🎯 Target: loan_status


### Build ML Pipeline

In [0]:
# Names of the intermediate columns produced for each categorical feature
indexed_cols = [f"{name}_index" for name in categorical_features]
encoded_cols = [f"{name}_encoded" for name in categorical_features]

# One StringIndexer per categorical column. handleInvalid="keep" is set so
# invalid category values do not abort the pipeline.
indexers = [
    StringIndexer(inputCol=name, outputCol=f"{name}_index", handleInvalid="keep")
    for name in categorical_features
]

# A single OneHotEncoder turns all index columns into sparse indicator vectors
encoder = OneHotEncoder(
    handleInvalid="keep",
    inputCols=indexed_cols,
    outputCols=encoded_cols,
)

# Concatenate the encoded categoricals and the numeric columns into one vector.
# handleInvalid="skip" is set on the assembler; NOTE(review): per the Spark
# docs this filters out rows with invalid values rather than failing, so the
# output can have fewer rows than the input - confirm this is intended.
assembler_inputs = encoded_cols + numerical_features
assembler = VectorAssembler(
    handleInvalid="skip",
    inputCols=assembler_inputs,
    outputCol="features_unscaled",
)

# Scale the assembled vector. The scaler receives the whole vector, so the
# one-hot slots are scaled together with the numeric columns. withMean=False
# means values are scaled by their standard deviation without centring.
scaler = StandardScaler(
    withMean=False,
    withStd=True,
    inputCol="features_unscaled",
    outputCol="features",
)

# Gradient Boosted Trees classifier trained on the scaled vector
gbt = GBTClassifier(
    featuresCol="features",
    labelCol=target_col,
    maxDepth=5,
    maxIter=100,
    seed=42,
)

# Stage order: index -> encode -> assemble -> scale -> classify
preprocessing_stages = [encoder, assembler, scaler]
pipeline = Pipeline(stages=indexers + preprocessing_stages + [gbt])

print("✅ ML Pipeline built successfully!")

✅ ML Pipeline built successfully!


### Train/Test Split

In [0]:
# Random 80/20 split into training and test sets; the seed is fixed so the
# same random draw is requested on every run
split_weights = [0.8, 0.2]
data_splits = ml_df.randomSplit(split_weights, seed=42)
train_df, test_df = data_splits

print(f"📊 Training Set: {train_df.count():,} samples")
print(f"📊 Test Set: {test_df.count():,} samples")

📊 Training Set: 39,945 samples
📊 Test Set: 10,055 samples


### Train Model with MLflow Tracking

In [0]:
# Train the pipeline inside an MLflow run: log parameters, fit on train_df,
# evaluate on test_df, log metrics and the fitted pipeline, and keep
# fitted_pipeline / predictions / run_id for the cells that follow.

# Set MLflow experiment
mlflow.set_experiment("/Loan_Approval_Prediction")
UC_VOLUME_PATH = "/Volumes/workspace/loanapproval/loanapproval/"

# Number of rows used to infer the model signature. Only column names and
# types are needed, so a handful of rows is enough.
SIGNATURE_SAMPLE_ROWS = 5

# CRITICAL: Clear model cache BEFORE starting MLflow run
print("🧹 Clearing model cache before training...")
import gc
import time

# Drop references left over from a previous execution of this cell so the old
# model objects can be released before a new model is fitted.
for stale_name in ("model", "fitted_pipeline", "predictions"):
    globals().pop(stale_name, None)

# Force garbage collection
gc.collect()

# Optional: Add a small delay to ensure cleanup
time.sleep(1)

# Start MLflow run
with mlflow.start_run(run_name="GBT_Loan_Approval_Model") as run:
    # Log parameters. The hyper-parameters are read back from the estimator so
    # the logged values always match what is actually trained.
    mlflow.log_param("model_type", "GradientBoostedTrees")
    mlflow.log_param("max_iter", gbt.getMaxIter())
    mlflow.log_param("max_depth", gbt.getMaxDepth())
    mlflow.log_param("num_categorical_features", len(categorical_features))
    mlflow.log_param("num_numerical_features", len(numerical_features))
    mlflow.log_param("train_size", train_df.count())
    mlflow.log_param("test_size", test_df.count())

    # Train the model
    print("🔄 Training model...")
    fitted_pipeline = pipeline.fit(train_df)

    # Make predictions on the held-out test set
    predictions = fitted_pipeline.transform(test_df)

    # Evaluators: AUC from the raw scores, the rest from the predicted label
    binary_evaluator = BinaryClassificationEvaluator(
        labelCol=target_col,
        rawPredictionCol="rawPrediction",
        metricName="areaUnderROC"
    )
    multiclass_evaluator_acc = MulticlassClassificationEvaluator(
        labelCol=target_col,
        predictionCol="prediction",
        metricName="accuracy"
    )
    multiclass_evaluator_f1 = MulticlassClassificationEvaluator(
        labelCol=target_col,
        predictionCol="prediction",
        metricName="f1"
    )
    multiclass_evaluator_precision = MulticlassClassificationEvaluator(
        labelCol=target_col,
        predictionCol="prediction",
        metricName="weightedPrecision"
    )
    multiclass_evaluator_recall = MulticlassClassificationEvaluator(
        labelCol=target_col,
        predictionCol="prediction",
        metricName="weightedRecall"
    )

    # Calculate metrics
    auc_roc = binary_evaluator.evaluate(predictions)
    accuracy = multiclass_evaluator_acc.evaluate(predictions)
    f1_score = multiclass_evaluator_f1.evaluate(predictions)
    precision = multiclass_evaluator_precision.evaluate(predictions)
    recall = multiclass_evaluator_recall.evaluate(predictions)

    # Log metrics
    mlflow.log_metric("auc_roc", auc_roc)
    mlflow.log_metric("accuracy", accuracy)
    mlflow.log_metric("f1_score", f1_score)
    mlflow.log_metric("precision", precision)
    mlflow.log_metric("recall", recall)

    # Infer the signature from a small sample instead of collecting the full
    # training and prediction sets to the driver. The label is dropped from
    # the input side so the signature only lists model inputs, and the output
    # side is just the predicted label rather than every intermediate
    # pipeline column.
    print("📝 Creating model signature...")
    signature_input = (
        train_df.drop(target_col).limit(SIGNATURE_SAMPLE_ROWS).toPandas()
    )
    signature_output = (
        predictions.select("prediction").limit(SIGNATURE_SAMPLE_ROWS).toPandas()
    )
    signature = infer_signature(signature_input, signature_output)

    # Log the model
    print("📦 Logging model to MLflow...")
    mlflow.spark.log_model(
        fitted_pipeline,
        "loan_approval_model",
        signature=signature,
        dfs_tmpdir=UC_VOLUME_PATH
    )

    # Print results
    print("=" * 60)
    print("📊 MODEL EVALUATION RESULTS")
    print("=" * 60)
    print(f"🎯 AUC-ROC Score: {auc_roc:.4f}")
    print(f"📈 Accuracy: {accuracy:.4f}")
    print(f"📊 F1 Score: {f1_score:.4f}")
    print(f"🎯 Precision: {precision:.4f}")
    print(f"📈 Recall: {recall:.4f}")
    print("=" * 60)

    # Get run ID for model registration
    run_id = run.info.run_id
    print(f"\n✅ MLflow Run ID: {run_id}")

# fitted_pipeline and predictions are intentionally kept alive here because
# the following cells (confusion matrix, batch scoring) use them.
gc.collect()


🧹 Clearing model cache before training...
🔄 Training model...
📝 Creating model signature...




📦 Logging model to MLflow...




📊 MODEL EVALUATION RESULTS
🎯 AUC-ROC Score: 0.9799
📈 Accuracy: 0.9217
📊 F1 Score: 0.9216
🎯 Precision: 0.9217
📈 Recall: 0.9217

✅ MLflow Run ID: c703a9223e9f4c588f4db1fab74bccf9


402

### Model Evaluation - Confusion Matrix

In [0]:
# Create confusion matrix: row counts per (actual label, predicted label) pair.
# The aggregation is defined once and reused for both the pandas copy and the
# displayed table, so both views come from the same, consistently ordered
# query.
confusion_counts_df = (
    predictions.groupBy(target_col, "prediction")
    .count()
    .orderBy(target_col, "prediction")
)
confusion_matrix = confusion_counts_df.toPandas()
print("📊 Confusion Matrix:")
display(confusion_counts_df)

📊 Confusion Matrix:


loan_status,prediction,count
0,0.0,4079
0,1.0,441
1,0.0,346
1,1.0,5189


### Register Model in MLflow Registry

In [0]:
# Register the model logged by the training run under a fixed registry name.
# Registration is best-effort: any failure is reported and the notebook
# continues.
model_name = "LoanApprovalPrediction"
model_uri = f"runs:/{run_id}/loan_approval_model"

try:
    registered_model = mlflow.register_model(model_uri, model_name)
    print(f"✅ Model registered: {model_name}")
    print(f"📊 Model Version: {registered_model.version}")
except Exception as registration_error:
    print(f"⚠️ Model registration skipped: {registration_error}")

Registered model 'LoanApprovalPrediction' already exists. Creating a new version of this model...


✅ Model registered: LoanApprovalPrediction
📊 Model Version: 3


Created version '3' of model 'workspace.default.loanapprovalprediction'.


## 📊 Section 6: Batch Scoring & Predictions

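Apply the trained model to score applications. In this notebook the model is applied to the full historical dataset, which includes the rows it was trained on; in production the same step would score new applications.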

### Generate Predictions on Full Dataset

In [0]:
# Score with the pipeline fitted in the training cell (still in memory).
# For large datasets, consider loading from MLflow registry instead
final_model = fitted_pipeline
full_predictions = final_model.transform(ml_df)

# Keep a readable subset of input columns next to the actual label and the
# model output
preview_columns = [
    "occupation_status",
    "product_type",
    "loan_intent",
    "credit_score",
    "credit_tier",
    "annual_income",
    "loan_amount",
    "risk_score",
    target_col,
    "prediction",
    "probability",
]
prediction_results = full_predictions.select(*preview_columns)

print("📊 Sample Predictions:")
display(prediction_results.limit(20))

📊 Sample Predictions:


occupation_status,product_type,loan_intent,credit_score,credit_tier,annual_income,loan_amount,risk_score,loan_status,prediction,probability
Employed,Credit Card,Business,692,Fair,25579,600,0,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.049177713785690406"",""0.9508222862143096""]}"
Employed,Personal Loan,Home Improvement,627,Poor,43087,53300,10,0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9774092347270478"",""0.02259076527295223""]}"
Student,Credit Card,Debt Consolidation,689,Fair,20840,2100,0,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.07528540383969172"",""0.9247145961603083""]}"
Student,Credit Card,Business,692,Fair,29147,2900,10,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.07696664866856134"",""0.9230333513314386""]}"
Employed,Personal Loan,Education,630,Poor,63657,99600,0,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.04998822463262379"",""0.9500117753673762""]}"
Employed,Credit Card,Personal,570,Very_Poor,32015,37000,40,0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9517098994838116"",""0.04829010051618843""]}"
Employed,Personal Loan,Home Improvement,674,Fair,44989,45600,0,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.11409268014316769"",""0.8859073198568324""]}"
Self-Employed,Credit Card,Personal,625,Poor,80603,51700,0,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.2884625382906362"",""0.7115374617093638""]}"
Employed,Credit Card,Education,569,Very_Poor,28416,33800,70,0,0.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.9920771153371902"",""0.00792288466280977""]}"
Employed,Credit Card,Personal,638,Poor,70717,70000,10,1,1.0,"{""type"":""1"",""size"":null,""indices"":null,""values"":[""0.015752769936353245"",""0.9842472300636468""]}"


### Save Predictions to Gold Layer

In [0]:
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# UDF that pulls the class-1 (approval) probability out of the probability vector
@udf(returnType=DoubleType())
def get_approval_probability(probability):
    """Return element 1 of ``probability`` as a float, or None for a null input.

    Args:
        probability: Per-class probability vector produced by the classifier
            for one row (may be None).

    Returns:
        The value at index 1 converted to ``float``, or ``None`` when
        ``probability`` is ``None``.
    """
    if probability is None:
        return None
    # Binary classification: index 1 holds the probability of class 1 (approval).
    return float(probability[1])

# Name of the Gold-layer table that receives the scored applications
gold_predictions_path = "gold_loan_predictions"

# Build the predictions table: applicant attributes, the actual label, the model's
# prediction, and the approval probability rounded to 4 decimals.
prediction_output = (
    full_predictions
    .select(
        "occupation_status",
        "product_type",
        "loan_intent",
        "credit_score",
        "credit_tier",
        "annual_income",
        "income_bracket",
        "loan_amount",
        "risk_score",
        "total_risk_flags",
        target_col,
        "prediction",
        F.round(get_approval_probability("probability"), 4).alias("approval_probability"),
    )
    # Stamp each row with the time of this scoring run
    .withColumn("scored_at", F.current_timestamp())
)

# Save to Gold layer, replacing whatever the table held before
(
    prediction_output.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable(gold_predictions_path)
)

print(f"✅ Predictions saved to: {gold_predictions_path}")

✅ Predictions saved to: gold_loan_predictions


### Prediction Analysis

In [0]:
# Summarize the saved predictions per predicted class, reading them back from the
# Delta table written above rather than from the in-memory DataFrame.
prediction_analysis = (
    spark.table("gold_loan_predictions")
    .groupBy("prediction")
    .agg(
        F.count("*").alias("count"),
        # (source column, decimals to keep, output column name)
        *[
            F.round(F.avg(source), places).alias(label)
            for source, places, label in (
                ("approval_probability", 4, "avg_probability"),
                ("credit_score", 2, "avg_credit_score"),
                ("risk_score", 2, "avg_risk_score"),
            )
        ],
    )
)

print("📊 Prediction Distribution:")
display(prediction_analysis)

📊 Prediction Distribution:


prediction,count,avg_probability,avg_credit_score,avg_risk_score
1.0,28016,0.9022,673.69,4.29
0.0,21984,0.0997,605.29,19.95


## 📈 Section 7: Executive Summary & Key Insights

Business insights and recommendations based on the analysis.


### 📊 Key Metrics at a Glance

In [0]:
# Calculate key metrics in ONE aggregation so ml_df is scanned a single time
# instead of once per metric (the count and each average used to be a separate
# Spark action).
key_metrics = ml_df.agg(
    F.count("*").alias("total_applications"),
    F.avg("loan_status").alias("approved_fraction"),
    F.avg("loan_amount").alias("avg_loan_amount"),
    F.avg("credit_score").alias("avg_credit_score"),
    F.avg("risk_score").alias("avg_risk_score"),
).first()

total_applications = key_metrics["total_applications"]
# loan_status is 0/1, so its mean is the approved fraction; scale it to a percentage
approval_rate = key_metrics["approved_fraction"] * 100
avg_loan_amount = key_metrics["avg_loan_amount"]
avg_credit_score = key_metrics["avg_credit_score"]
avg_risk_score = key_metrics["avg_risk_score"]

print("=" * 70)
print("📊 EXECUTIVE SUMMARY - KEY METRICS")
print("=" * 70)
print(f"📌 Total Applications Analyzed: {total_applications:,}")
print(f"📌 Overall Approval Rate: {approval_rate:.2f}%")
print(f"📌 Average Loan Amount: ${avg_loan_amount:,.2f}")
print(f"📌 Average Credit Score: {avg_credit_score:.0f}")
print(f"📌 Average Risk Score: {avg_risk_score:.2f}")
print("=" * 70)

📊 EXECUTIVE SUMMARY - KEY METRICS
📌 Total Applications Analyzed: 50,000
📌 Overall Approval Rate: 55.05%
📌 Average Loan Amount: $33,041.87
📌 Average Credit Score: 644
📌 Average Risk Score: 11.17


### 🎯 Model Performance Summary

In [0]:
# Print the evaluation metrics computed earlier in the notebook (auc_roc, accuracy,
# precision, recall, f1_score) as one framed report, one entry per line.
print(
    "=" * 70,
    "🤖 MODEL PERFORMANCE SUMMARY",
    "=" * 70,
    "📊 Model Type: Gradient Boosted Trees",
    f"📊 AUC-ROC: {auc_roc:.4f}",
    f"📊 Accuracy: {accuracy:.4f} ({accuracy*100:.2f}%)",
    f"📊 Precision: {precision:.4f}",
    f"📊 Recall: {recall:.4f}",
    f"📊 F1-Score: {f1_score:.4f}",
    "=" * 70,
    sep="\n",
)

🤖 MODEL PERFORMANCE SUMMARY
📊 Model Type: Gradient Boosted Trees
📊 AUC-ROC: 0.9799
📊 Accuracy: 0.9217 (92.17%)
📊 Precision: 0.9217
📊 Recall: 0.9217
📊 F1-Score: 0.9216


### 💡 Key Business Insights

In [0]:
# Approval rate per credit tier (percent, 2 decimals), highest rate first
insight_1 = (
    gold_source_df
    .groupBy("credit_tier")
    .agg(F.round(F.avg("loan_status") * 100, 2).alias("approval_rate"))
    .orderBy(F.desc("approval_rate"))
    .collect()
)

print(
    "💡 KEY BUSINESS INSIGHTS",
    "=" * 70,
    "\n1️⃣ Credit Score Impact on Approval:",
    sep="\n",
)
for row in insight_1:
    print(f"   • {row['credit_tier']}: {row['approval_rate']}% approval rate")

# The remaining sections are fixed narrative text; nothing below is computed here.
print(
    "\n2️⃣ Risk Indicators:",
    "   • Applications with 0 risk flags have highest approval rates",
    "   • Each additional default reduces approval probability significantly",
    "\n3️⃣ Recommendations:",
    "   • Implement automated decisioning for Excellent/Good credit tiers",
    "   • Focus manual review on Fair/Poor credit applications",
    "   • Monitor risk score threshold for real-time scoring",
    "=" * 70,
    sep="\n",
)

💡 KEY BUSINESS INSIGHTS

1️⃣ Credit Score Impact on Approval:
   • Excellent: 89.23% approval rate
   • Good: 84.74% approval rate
   • Fair: 75.05% approval rate
   • Poor: 51.37% approval rate
   • Very_Poor: 13.82% approval rate

2️⃣ Risk Indicators:
   • Applications with 0 risk flags have highest approval rates
   • Each additional default reduces approval probability significantly

3️⃣ Recommendations:
   • Implement automated decisioning for Excellent/Good credit tiers
   • Focus manual review on Fair/Poor credit applications
   • Monitor risk score threshold for real-time scoring


## 📋 Section 8: Delta Lake Tables Summary

List all tables created in this pipeline.


In [0]:
%sql
-- Show all tables created by this pipeline: list the tables whose names match
-- the pattern '*loan*' (any name containing "loan"), i.e. the bronze, silver
-- and gold loan tables written by the cells above.
SHOW TABLES LIKE '*loan*'

database,tableName,isTemporary
default,bronze_loan_applications,False
default,gold_loan_analytics,False
default,gold_loan_predictions,False
default,silver_loan_applications,False


### 📊 Table Row Counts

In [0]:
# Row count of every Delta table written by the pipeline, listed in layer order
tables_summary = [
    (name, spark.table(name).count())
    for name in (
        "bronze_loan_applications",
        "silver_loan_applications",
        "gold_loan_analytics",
        "gold_loan_predictions",
    )
]

print("📊 Delta Tables Summary:")
print("=" * 50)
for table_name, row_count in tables_summary:
    print(f"   {table_name}: {row_count:,} rows")
print("=" * 50)

📊 Delta Tables Summary:
   bronze_loan_applications: 50,000 rows
   silver_loan_applications: 50,000 rows
   gold_loan_analytics: 393 rows
   gold_loan_predictions: 50,000 rows


## ✅ Pipeline Complete!

 ### Pipeline Layer Executor Function
 
 Use this function to run specific layers when the notebook is scheduled as a job.
 
 **Job Configuration:**
 - Task 1: `bronze_layer` (notebook with layer="bronze")
 - Task 2: `silver_layer` (depends on Task 1, layer="silver")
 - Task 3: `gold_layer` (depends on Task 2, layer="gold")
 - Task 4: `ml_training` (depends on Task 3, layer="ml_training")
 - Task 5: `batch_scoring` (depends on Task 4, layer="batch_scoring")
 - Schedule: Weekly 2 AM

In [0]:
# Add widgets for parameters
dbutils.widgets.text("source_path", "/Volumes/workspace/loanapproval/loanapproval/Loan_approval_data_2025.csv", "Source Data Path")
dbutils.widgets.dropdown("layer", "all", ["bronze", "silver", "gold", "ml_training", "batch_scoring", "all"], "Pipeline Layer")

# Get parameter values
source = dbutils.widgets.get("source_path")
# Read the selected layer from the widget declared above. Re-declaring the
# dropdown here (as the cell previously did) never assigned `layer`, so the
# print below could not report the value chosen for the job/task.
layer = dbutils.widgets.get("layer")

print(f"📁 Source Path: {source}")
print(f"🔄 Selected Layer: {layer}")

📁 Source Path: /Volumes/workspace/loanapproval/loanapproval/Loan_approval_data_2025.csv
🔄 Selected Layer: all


In [0]:
def run_layer(layer_name):
    """
    Execute a specific pipeline layer.

    Each layer reads the table written by the previous one and overwrites its
    own Delta table, so layers can run as separate, dependent job tasks.

    Args:
        layer_name: One of 'bronze', 'silver', 'gold', 'ml_training',
            'batch_scoring', 'all'. 'all' runs the five layers in that order.

    Returns:
        dict: Status and metadata for the executed layer. Always contains
        'layer', 'status' ('completed' or 'error') and 'tables_created'.
        Data layers add 'record_count'; 'ml_training' adds 'run_id' and
        'auc_roc'; an unknown layer name adds 'error'. For 'all',
        'tables_created' lists the tables written by every sub-layer.

    Note:
        Spark/MLflow failures are not caught and propagate to the caller; the
        only exception is model registration, which is reported and skipped.
    """
    result = {"layer": layer_name, "status": "pending", "tables_created": []}

    if layer_name == "bronze":
        # Bronze Layer: Raw data ingestion
        print("Running Bronze Layer - Raw Data Ingestion...")
        file_path = dbutils.widgets.get("source_path")

        bronze_df = spark.read.csv(file_path, header=True, inferSchema=True)
        # Lineage metadata: when the file was ingested and where it came from
        bronze_df = bronze_df.withColumn("_ingestion_timestamp", F.current_timestamp())
        bronze_df = bronze_df.withColumn("_source_file", F.lit(file_path))

        bronze_table_path = "bronze_loan_applications"
        bronze_df.write.format("delta").mode("overwrite").saveAsTable(bronze_table_path)

        result["status"] = "completed"
        result["tables_created"] = [bronze_table_path]
        result["record_count"] = bronze_df.count()
        print(f"Bronze layer complete: {result['record_count']} records")

    elif layer_name == "silver":
        # Silver Layer: Data transformation & feature engineering
        print("Running Silver Layer - Feature Engineering...")

        silver_df = spark.table("bronze_loan_applications")

        # Drop metadata columns if present
        if "_ingestion_timestamp" in silver_df.columns:
            silver_df = silver_df.drop("_ingestion_timestamp", "_source_file")

        # Drop customer_id (not useful for modeling)
        if "customer_id" in silver_df.columns:
            silver_df = silver_df.drop("customer_id")

        # Feature Engineering: bucket age into life-stage groups
        silver_df = silver_df.withColumn("age_group",
            F.when(F.col("age") < 25, "Young")
            .when((F.col("age") >= 25) & (F.col("age") < 35), "Early_Career")
            .when((F.col("age") >= 35) & (F.col("age") < 50), "Mid_Career")
            .when((F.col("age") >= 50) & (F.col("age") < 60), "Senior")
            .otherwise("Elderly"))

        # Bucket credit score into tiers (50-point bands from 600 to 750)
        silver_df = silver_df.withColumn("credit_tier",
            F.when(F.col("credit_score") >= 750, "Excellent")
            .when((F.col("credit_score") >= 700) & (F.col("credit_score") < 750), "Good")
            .when((F.col("credit_score") >= 650) & (F.col("credit_score") < 700), "Fair")
            .when((F.col("credit_score") >= 600) & (F.col("credit_score") < 650), "Poor")
            .otherwise("Very_Poor"))

        silver_df = silver_df.withColumn("net_worth", F.col("savings_assets") - F.col("current_debt"))
        silver_df = silver_df.withColumn("monthly_income", F.col("annual_income") / 12)
        # "+ 1" in the denominators avoids dividing by zero when savings are 0
        silver_df = silver_df.withColumn("loan_to_savings_ratio", F.col("loan_amount") / (F.col("savings_assets") + 1))
        silver_df = silver_df.withColumn("debt_to_assets_ratio", F.col("current_debt") / (F.col("savings_assets") + 1))
        silver_df = silver_df.withColumn("total_risk_flags",
            F.col("defaults_on_file") + F.col("delinquencies_last_2yrs") + F.col("derogatory_marks"))
        # Weighted risk score: defaults x50, delinquencies x10, derogatory marks x20
        silver_df = silver_df.withColumn("risk_score",
            (F.col("defaults_on_file") * 50) + (F.col("delinquencies_last_2yrs") * 10) + (F.col("derogatory_marks") * 20))
        # Share of the applicant's life spent employed; 0 when age is not positive
        silver_df = silver_df.withColumn("employment_stability",
            F.when(F.col("age") > 0, F.col("years_employed") / F.col("age")).otherwise(0))
        silver_df = silver_df.withColumn("income_bracket",
            F.when(F.col("annual_income") < 30000, "Low")
            .when((F.col("annual_income") >= 30000) & (F.col("annual_income") < 60000), "Lower_Middle")
            .when((F.col("annual_income") >= 60000) & (F.col("annual_income") < 100000), "Upper_Middle")
            .when((F.col("annual_income") >= 100000) & (F.col("annual_income") < 200000), "High")
            .otherwise("Very_High"))

        silver_df = silver_df.withColumn("_processing_timestamp", F.current_timestamp())

        silver_table_path = "silver_loan_applications"
        silver_df.write.format("delta").mode("overwrite").saveAsTable(silver_table_path)

        result["status"] = "completed"
        result["tables_created"] = [silver_table_path]
        result["record_count"] = silver_df.count()
        print(f"Silver layer complete: {result['record_count']} records, {len(silver_df.columns)} features")

    elif layer_name == "gold":
        # Gold Layer: Analytics aggregations
        print("Running Gold Layer - Analytics Aggregations...")

        gold_source_df = spark.table("silver_loan_applications")

        # One row per (intent, credit tier, occupation, income bracket) segment
        gold_analytics_df = gold_source_df.groupBy(
            "loan_intent", "credit_tier", "occupation_status", "income_bracket"
        ).agg(
            F.count("*").alias("total_applications"),
            F.sum("loan_status").alias("approved_count"),
            F.round(F.avg("loan_amount"), 2).alias("avg_loan_amount"),
            F.round(F.avg("interest_rate"), 4).alias("avg_interest_rate"),
            F.round(F.avg("credit_score"), 2).alias("avg_credit_score"),
            F.round(F.avg("risk_score"), 2).alias("avg_risk_score"),
            F.round(F.avg("debt_to_income_ratio"), 4).alias("avg_dti_ratio"),
            F.round(F.sum("loan_status") / F.count("*"), 4).alias("approval_rate")
        )

        gold_table_path = "gold_loan_analytics"
        gold_analytics_df.write.format("delta").mode("overwrite").saveAsTable(gold_table_path)

        result["status"] = "completed"
        result["tables_created"] = [gold_table_path]
        result["record_count"] = gold_analytics_df.count()
        print(f"Gold layer complete: {result['record_count']} aggregated records")

    elif layer_name == "ml_training":
        # ML Training Layer
        print("Running ML Training Layer...")

        ml_df = spark.table("silver_loan_applications")
        ml_df = ml_df.drop("_processing_timestamp")

        categorical_features = ['occupation_status', 'product_type', 'loan_intent',
                                'age_group', 'credit_tier', 'income_bracket']
        numerical_features = ['age', 'years_employed', 'annual_income', 'credit_score',
                              'credit_history_years', 'savings_assets', 'current_debt',
                              'defaults_on_file', 'delinquencies_last_2yrs', 'derogatory_marks',
                              'loan_amount', 'interest_rate', 'debt_to_income_ratio',
                              'loan_to_income_ratio', 'payment_to_income_ratio',
                              'net_worth', 'monthly_income', 'loan_to_savings_ratio',
                              'debt_to_assets_ratio', 'total_risk_flags', 'risk_score',
                              'employment_stability']
        target_col = 'loan_status'

        # Build pipeline: index -> one-hot encode -> assemble -> scale -> GBT
        indexers = [StringIndexer(inputCol=feature, outputCol=f"{feature}_index", handleInvalid="keep")
                    for feature in categorical_features]
        indexed_cols = [f"{feature}_index" for feature in categorical_features]
        encoded_cols = [f"{feature}_encoded" for feature in categorical_features]

        encoder = OneHotEncoder(inputCols=indexed_cols, outputCols=encoded_cols, handleInvalid="keep")
        assembler = VectorAssembler(inputCols=encoded_cols + numerical_features,
                                     outputCol="features_unscaled", handleInvalid="skip")
        scaler = StandardScaler(inputCol="features_unscaled", outputCol="features", withStd=True, withMean=False)
        gbt = GBTClassifier(labelCol=target_col, featuresCol="features", maxIter=100, maxDepth=5, seed=42)

        pipeline = Pipeline(stages=indexers + [encoder, assembler, scaler, gbt])

        # Train/Test Split (fixed seed keeps the split reproducible between runs)
        train_df, test_df = ml_df.randomSplit([0.8, 0.2], seed=42)

        # MLflow tracking
        mlflow.set_experiment("/Loan_Approval_Prediction")
        with mlflow.start_run(run_name="GBT_Loan_Approval_Model_Job"):
            mlflow.log_param("model_type", "GradientBoostedTrees")
            mlflow.log_param("layer", "ml_training")

            model = pipeline.fit(train_df)
            predictions = model.transform(test_df)

            # Evaluate on the held-out 20%
            evaluator = BinaryClassificationEvaluator(labelCol=target_col, metricName="areaUnderROC")
            auc_roc = evaluator.evaluate(predictions)

            mlflow.log_metric("auc_roc", auc_roc)
            mlflow.spark.log_model(model, "loan_approval_model")

            run_id = mlflow.active_run().info.run_id

            # Register model. Best effort: a registry failure is reported but
            # does not fail the training run, which is already logged.
            try:
                mlflow.register_model(f"runs:/{run_id}/loan_approval_model", "LoanApprovalPrediction")
            except Exception as e:
                print(f"Model registration skipped: {e}")

        result["status"] = "completed"
        result["run_id"] = run_id
        result["auc_roc"] = auc_roc
        print(f"ML Training complete: AUC-ROC = {auc_roc:.4f}")

    elif layer_name == "batch_scoring":
        # Batch Scoring Layer
        print("Running Batch Scoring Layer...")

        # Imported here because only this branch needs it.
        from pyspark.ml.functions import vector_to_array

        ml_df = spark.table("silver_loan_applications")
        ml_df = ml_df.drop("_processing_timestamp")

        # Load model from registry
        model = mlflow.spark.load_model("models:/LoanApprovalPrediction/latest")
        full_predictions = model.transform(ml_df)

        # `probability` is an ML vector column, which F.element_at cannot index
        # (it only accepts array/map columns). Convert the vector to an array
        # first; item 1 (0-based) is the probability of class 1 (approval).
        approval_probability = vector_to_array(F.col("probability")).getItem(1)

        prediction_output = full_predictions.select(
            "occupation_status", "product_type", "loan_intent",
            "credit_score", "credit_tier", "annual_income", "income_bracket",
            "loan_amount", "risk_score", "total_risk_flags",
            "loan_status", "prediction",
            F.round(approval_probability, 4).alias("approval_probability")
        ).withColumn("scored_at", F.current_timestamp())

        gold_predictions_path = "gold_loan_predictions"
        prediction_output.write.format("delta").mode("overwrite").saveAsTable(gold_predictions_path)

        result["status"] = "completed"
        result["tables_created"] = [gold_predictions_path]
        result["record_count"] = prediction_output.count()
        print(f"Batch Scoring complete: {result['record_count']} predictions")

    elif layer_name == "all":
        # Run all layers sequentially; each one consumes the previous layer's table
        print("Running Complete Pipeline (all layers)...")
        for sub_layer in ["bronze", "silver", "gold", "ml_training", "batch_scoring"]:
            layer_result = run_layer(sub_layer)
            # Surface every table the full run wrote, not just an empty list
            result["tables_created"].extend(layer_result.get("tables_created", []))
            print(f"   {sub_layer}: {layer_result['status']}")
        result["status"] = "completed"
        print("Complete pipeline finished!")

    else:
        result["status"] = "error"
        result["error"] = f"Unknown layer: {layer_name}"
        print(f"Error: Unknown layer '{layer_name}'")

    return result