In [1]:
import argparse
import os
import glob
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.functions import col

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score, f1_score, accuracy_score, precision_score, recall_score

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV

import mlflow
import mlflow.sklearn
from mlflow.models import infer_signature

In [2]:
# Initialize SparkSession optimized for 32GB RAM
spark = pyspark.sql.SparkSession.builder \
    .appName("model_training") \
    .master("local[*]") \
    .config("spark.driver.memory", "20g") \
    .config("spark.driver.maxResultSize", "8g") \
    .config("spark.sql.shuffle.partitions", "32") \
    .config("spark.default.parallelism", "16") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/09 12:58:19 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/11/09 12:58:20 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
# Set MLflow tracking URI
mlflow_tracking_uri = os.getenv('MLFLOW_TRACKING_URI', './mlruns')
mlflow.set_tracking_uri(mlflow_tracking_uri)
mlflow.set_experiment("loan_default_prediction")

print(f"MLflow tracking URI: {mlflow_tracking_uri}")

MLflow tracking URI: http://mlflow:5000


In [4]:
snapshotdate = '2024-09-01'

In [15]:
FEATURE_DIR = "/app/datamart/gold/feature_store/"
LABEL_DIR = "/app/datamart/gold/label_store/"
SILVER_DIR = "/app/datamart/silver/"
APP_STORE_PATH = "/app/datamart/gold/application_store/"

In [5]:
# --- set up config ---
model_train_date_str = snapshotdate
train_test_period_months = 12
oot_period_months = 2
train_test_ratio = 0.8

config = {}
config["model_train_date_str"] = model_train_date_str
config["train_test_period_months"] = train_test_period_months
config["oot_period_months"] = oot_period_months
config["model_train_date"] = datetime.strptime(model_train_date_str, "%Y-%m-%d")
config["oot_end_date"] = config['model_train_date'] - timedelta(days=1)
config["oot_start_date"] = config['model_train_date'] - relativedelta(months=oot_period_months)
config["train_test_end_date"] = config["oot_start_date"] - timedelta(days=1)
config["train_test_start_date"] = config["oot_start_date"] - relativedelta(months=train_test_period_months)
config["train_test_ratio"] = train_test_ratio

print("\n" + "="*80)
print("CONFIGURATION")
print("="*80)
pprint.pprint(config)
print("="*80 + "\n")


CONFIGURATION
{'model_train_date': datetime.datetime(2024, 9, 1, 0, 0),
 'model_train_date_str': '2024-09-01',
 'oot_end_date': datetime.datetime(2024, 8, 31, 0, 0),
 'oot_period_months': 2,
 'oot_start_date': datetime.datetime(2024, 7, 1, 0, 0),
 'train_test_end_date': datetime.datetime(2024, 6, 30, 0, 0),
 'train_test_period_months': 12,
 'train_test_ratio': 0.8,
 'train_test_start_date': datetime.datetime(2023, 7, 1, 0, 0)}



In [16]:
# --- get labels ---
print("Loading labels from gold layer...")
label_files_pattern = LABEL_DIR + "gold_label_store_*.parquet"
label_store_sdf = spark.read.parquet(label_files_pattern)
print(f"Total labels loaded: {label_store_sdf.count():,}")

# Filter labels by date range
labels_sdf = label_store_sdf.filter(
    (col("snapshot_date") >= config["train_test_start_date"]) & 
    (col("snapshot_date") <= config["oot_end_date"])
)
print(f"Filtered labels: {labels_sdf.count():,} (from {config['train_test_start_date']} to {config['oot_end_date']})")

Loading labels from gold layer...


                                                                                

Total labels loaded: 21,474




Filtered labels: 13,898 (from 2023-07-01 00:00:00 to 2024-08-31 00:00:00)


                                                                                

In [19]:
FEATURE_DIR = "/app/datamart/gold/feature_store"
# âœ… This automatically reads all partitions (all application_date values)
features_sdf = spark.read.parquet(FEATURE_DIR)
features_sdf.show(5)

                                                                                

+-----------+--------+------+-------------+----------------+------------------+--------------------+------------------+-------------------------------------------------+------------------------------------------------+-------------------------------------------------+------------------------------------------------+--------------------------------------------------+-------------------------------------------------+-------------------+---------------+--------------+----------------------+--------------------------------+--------------------------+-----------------------------+--------------------------+-------------------------+------------------------------------+------------------------+-----------------------+------------------------+-------------------------+-------------------------+--------------------------+-------------+----------------+----+--------------+--------------+--------------+--------------+-----------+-----------+------------+-----------+-----------+------------+-----

In [18]:


# Filter features by date range
features_sdf = features_store_sdf.filter(
    (col("snapshot_date") >= config["train_test_start_date"]) & 
    (col("snapshot_date") <= config["oot_end_date"])
)
print(f"Filtered features: {features_sdf.count():,} (from {config['train_test_start_date']} to {config['oot_end_date']})")


Loading features from gold layer...


AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/app/notebooks/datamart/gold/feature_store.

In [None]:
# --- prepare data for modeling ---
print("\nJoining features and labels...")
data_sdf = features_sdf.join(labels_sdf,["Customer_ID", "snapshot_date"],"inner")

print(f"Joined data: {data_sdf.count():,}")

# Convert to pandas
data_pdf = data_sdf.toPandas()

# Identify feature columns (exclude identifiers and label)
exclude_cols = ['loan_id', 'Customer_ID', 'application_date', 'snapshot_date', 'label', 'label_def']
feature_cols = [col for col in data_pdf.columns if col not in exclude_cols]

print(f"\nTotal feature columns: {len(feature_cols)}")
print(f"Feature columns: {feature_cols[:10]}... (showing first 10)")