In [1]:
import os
import glob
import pandas as pd
import pickle
import matplotlib.pyplot as plt
import numpy as np
import random
from datetime import datetime, timedelta
from dateutil.relativedelta import relativedelta
import pprint
import pyspark
import pyspark.sql.functions as F

from pyspark.sql.functions import col
from pyspark.sql.types import StringType, IntegerType, FloatType, DateType

from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler

import xgboost as xgb
from sklearn.model_selection import RandomizedSearchCV
from sklearn.metrics import make_scorer, f1_score, roc_auc_score
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split


## set up pyspark session

In [2]:
# Start (or reuse) a local Spark session running on every available core.
# NOTE: PySpark needs a Java runtime; the recorded run of this cell failed with
# JAVA_GATEWAY_EXITED because no Java installation could be located.
spark = (
    pyspark.sql.SparkSession.builder
    .appName("dev")
    .master("local[*]")
    .getOrCreate()
)

# Only surface ERROR-level log messages so WARN noise does not flood cell output.
spark.sparkContext.setLogLevel("ERROR")

The operation couldn’t be completed. Unable to locate a Java Runtime.
Please visit http://www.java.com for information on installing Java.

/opt/homebrew/anaconda3/lib/python3.12/site-packages/pyspark/bin/spark-class: line 97: CMD: bad array subscript
head: illegal line count -- -1


PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

## set up config

In [None]:
# --- tunable inputs ----------------------------------------------------------
model_train_date_str = "2024-09-01"   # the OOT window ends the day before this date
train_test_period_months = 12         # length of the train/test window, in months
oot_period_months = 2                 # length of the out-of-time window, in months
train_test_ratio = 0.8                # share of the train/test window used for training

# --- derived, inclusive date boundaries ---------------------------------------
# Timeline: [train_test_start .. train_test_end][oot_start .. oot_end] | model_train_date
config = {
    "model_train_date_str": model_train_date_str,
    "train_test_period_months": train_test_period_months,
    "oot_period_months": oot_period_months,
    "model_train_date": datetime.strptime(model_train_date_str, "%Y-%m-%d"),
}
# OOT window: the `oot_period_months` months immediately before the train date.
config["oot_end_date"] = config["model_train_date"] - timedelta(days=1)
config["oot_start_date"] = config["model_train_date"] - relativedelta(months=oot_period_months)
# Train/test window: the `train_test_period_months` months immediately before OOT.
config["train_test_end_date"] = config["oot_start_date"] - timedelta(days=1)
config["train_test_start_date"] = config["oot_start_date"] - relativedelta(months=train_test_period_months)
config["train_test_ratio"] = train_test_ratio

pprint.pprint(config)

## get label store

In [None]:
# Connect to the gold label store: every entry under the folder is read as parquet.
folder_path = "datamart/gold/label_store/"
# glob already returns "<folder_path><name>" paths; sorting makes the read order
# deterministic across filesystems and runs.
files_list = sorted(glob.glob(os.path.join(folder_path, '*')))
if not files_list:
    # Fail with a clear message instead of an opaque Spark error on an empty read.
    raise FileNotFoundError(f"No label store files found under {folder_path!r}")

# Parquet carries its own schema, so the CSV-only "header" option is not needed.
label_store_sdf = spark.read.parquet(*files_list)
print("row_count:", label_store_sdf.count())

label_store_sdf.show()

In [None]:
# Keep only labels whose snapshot_date lies in the full modelling window:
# train/test start through OOT end, both bounds inclusive.
labels_sdf = label_store_sdf.filter(
    col("snapshot_date").between(config["train_test_start_date"], config["oot_end_date"])
)

print("extracted labels_sdf", labels_sdf.count(), config["train_test_start_date"], config["oot_end_date"])

## get features

In [None]:
# Feature store: a CSV file with a header row; Spark infers the column types.
feature_location = "data/feature_clickstream.csv"

features_store_sdf = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(feature_location)
)
print("row_count:", features_store_sdf.count())

features_store_sdf.show()


In [None]:
# Extract feature store rows in the same modelling window used for the labels:
# train/test start through OOT end, both bounds inclusive.
features_sdf = features_store_sdf.filter(
    (col("snapshot_date") >= config["train_test_start_date"])
    & (col("snapshot_date") <= config["oot_end_date"])
)

print("extracted features_sdf", features_sdf.count(), config["train_test_start_date"], config["oot_end_date"])

## prepare data for modeling

In [None]:
# Attach features to every label row (the left join keeps labels that have no
# matching features; their feature columns come back null/NaN) and collect the
# result to the driver as a pandas DataFrame.
data_pdf = labels_sdf.join(features_sdf, on=["Customer_ID", "snapshot_date"], how="left").toPandas()

# A left join only preserves one row per label when each (Customer_ID, snapshot_date)
# key is unique on both sides; duplicate keys would silently multiply label rows.
assert not data_pdf.duplicated(subset=["Customer_ID", "snapshot_date"]).any(), \
    "duplicate (Customer_ID, snapshot_date) keys after join - check label/feature store key uniqueness"

data_pdf

In [None]:
# Split by time first: the OOT window is held out as-is, and only the earlier
# train/test window is split randomly (stratified on the label).
oot_pdf = data_pdf[
    data_pdf['snapshot_date'].between(config["oot_start_date"].date(), config["oot_end_date"].date())
]
train_test_pdf = data_pdf[
    data_pdf['snapshot_date'].between(config["train_test_start_date"].date(), config["train_test_end_date"].date())
]

# Model inputs are every column carrying the "fe_" prefix.
feature_cols = [name for name in data_pdf.columns if name.startswith('fe_')]

X_oot, y_oot = oot_pdf[feature_cols], oot_pdf["label"]
X_train, X_test, y_train, y_test = train_test_split(
    train_test_pdf[feature_cols],
    train_test_pdf["label"],
    test_size=1 - config["train_test_ratio"],
    shuffle=True,                         # shuffle rows before splitting
    stratify=train_test_pdf["label"],     # keep the label rate equal in train and test
    random_state=88,                      # reproducible split
)

print('X_train', len(X_train))
print('X_test', len(X_test))
print('X_oot', len(X_oot))
print('y_train', len(y_train), round(y_train.mean(), 2))
print('y_test', len(y_test), round(y_test.mean(), 2))
print('y_oot', len(y_oot), round(y_oot.mean(), 2))

X_train

## preprocess data

In [None]:
# Standard-scaler preprocessing. The scaler is fitted on the TRAINING split only,
# so no statistics from test/OOT leak into preprocessing; the same fitted
# transformer is then applied unchanged to every split.
scaler = StandardScaler()
transformer_stdscaler = scaler.fit(X_train)  # fit() returns the scaler itself

X_train_processed, X_test_processed, X_oot_processed = (
    transformer_stdscaler.transform(split_df) for split_df in (X_train, X_test, X_oot)
)

print('X_train_processed', len(X_train_processed))
print('X_test_processed', len(X_test_processed))
print('X_oot_processed', len(X_oot_processed))

pd.DataFrame(X_train_processed)

## train model

In [None]:
# Model setup: compare three gradient-boosting libraries with a randomised
# hyper-parameter search, then carry one selected model forward into the artefact.
from types import SimpleNamespace

# XGBoost (`xgb`) comes from the top import cell. LightGBM and CatBoost are not
# imported there, so they are imported here; each is skipped with a message if
# its package is not installed.
models = {
    "XGBoost": xgb.XGBClassifier(eval_metric='logloss', random_state=88),
}
try:
    import lightgbm as lgb
    # subsample_freq=1 enables bagging; with LightGBM's default of 0 the
    # 'subsample' values searched below would be silently ignored.
    models["LightGBM"] = lgb.LGBMClassifier(random_state=88, verbose=-1, subsample_freq=1)
except ImportError:
    print("lightgbm is not installed - skipping LightGBM")
try:
    import catboost as cb
    models["CatBoost"] = cb.CatBoostClassifier(random_state=88, verbose=False)
except ImportError:
    print("catboost is not installed - skipping CatBoost")

param_grids = {
    "XGBoost": {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 4, 5, 6],
        'learning_rate': [0.05, 0.1, 0.15, 0.2],
        'subsample': [0.7, 0.8, 0.9, 1.0],
        'colsample_bytree': [0.7, 0.8, 0.9, 1.0],
        'gamma': [0, 0.1, 0.2],
        'min_child_weight': [1, 3, 5],
        'reg_alpha': [0, 0.01, 0.1],
        'reg_lambda': [0.01, 0.1, 1]
    },
    "LightGBM": {
        'n_estimators': [50, 100, 200],
        'max_depth': [3, 4, 5, 6, -1],
        'learning_rate': [0.05, 0.1, 0.15, 0.2],
        'subsample': [0.7, 0.8, 0.9, 1.0],
        'colsample_bytree': [0.7, 0.8, 0.9, 1.0],
        'num_leaves': [31, 63, 127],
        'min_child_samples': [5, 10, 20],
        'reg_alpha': [0, 0.01, 0.1],
        'reg_lambda': [0.01, 0.1, 1]
    },
    "CatBoost": {
        'iterations': [50, 100, 200],
        'depth': [3, 4, 5, 6],
        'learning_rate': [0.05, 0.1, 0.15, 0.2],
        'subsample': [0.7, 0.8, 0.9, 1.0],
        'colsample_bylevel': [0.7, 0.8, 0.9, 1.0],
        'l2_leaf_reg': [0.1, 1, 3],
        'border_count': [32, 64, 128]
    }
}

# Rank search candidates on predicted probabilities. make_scorer(roc_auc_score)
# with default arguments scores the hard 0/1 output of predict(), which is not
# the model's ROC AUC and can select the wrong hyper-parameters.
auc_scorer = "roc_auc"

# Metric used to pick the final model. The OOT window stays an untouched holdout:
# selecting on "OOT AUC" would make the OOT figures stored in the artefact an
# optimistic, selection-biased estimate.
selection_metric = "Test AUC"

# Hyperparameter tuning and result collection
results = []

for model_name, model in models.items():
    print(f"\n=== Training {model_name} ===")
    param_grid = param_grids[model_name]

    random_search = RandomizedSearchCV(
        estimator=model,
        param_distributions=param_grid,
        scoring=auc_scorer,
        n_iter=50,  # Reduced for faster training
        cv=3,
        verbose=1,
        random_state=42,
        n_jobs=-1
    )

    # Perform the random search (training split only)
    random_search.fit(X_train_processed, y_train)

    best_model = random_search.best_estimator_

    # Evaluate the tuned model on every split using positive-class probabilities.
    train_pred = best_model.predict_proba(X_train_processed)[:, 1]
    test_pred = best_model.predict_proba(X_test_processed)[:, 1]
    oot_pred = best_model.predict_proba(X_oot_processed)[:, 1]

    train_auc = roc_auc_score(y_train, train_pred)
    test_auc = roc_auc_score(y_test, test_pred)
    oot_auc = roc_auc_score(y_oot, oot_pred)

    results.append({
        "Model": model_name,
        "BestParams": random_search.best_params_,
        "CV AUC": random_search.best_score_,
        "Train AUC": train_auc,
        "Test AUC": test_auc,
        "OOT AUC": oot_auc,
        "Train GINI": round(2*train_auc-1, 3),
        "Test GINI": round(2*test_auc-1, 3),
        "OOT GINI": round(2*oot_auc-1, 3),
        "BestModel": best_model
    })

    print(f"{model_name} - Test AUC: {test_auc:.4f}, OOT AUC: {oot_auc:.4f}")

# Display results
results_df = pd.DataFrame(results)
print("\n=== MODEL COMPARISON ===")
print(results_df[['Model', 'CV AUC', 'Test AUC', 'OOT AUC', 'Test GINI', 'OOT GINI']].to_string(index=False))

# Select the best model on the held-in test split (see `selection_metric`).
best_model_result = max(results, key=lambda result: result[selection_metric])
best_model = best_model_result['BestModel']
best_model_name = best_model_result['Model']

print(f"\n=== BEST MODEL: {best_model_name} (selected on {selection_metric}) ===")
print(f"OOT AUC: {best_model_result['OOT AUC']:.4f}")
print(f"OOT GINI: {best_model_result['OOT GINI']}")
print(f"Best Parameters: {best_model_result['BestParams']}")

# Expose the selected model under the names the artefact cell reads.
train_auc_score = best_model_result['Train AUC']
test_auc_score = best_model_result['Test AUC']
oot_auc_score = best_model_result['OOT AUC']
random_search = SimpleNamespace(
    best_estimator_=best_model,
    best_params_=best_model_result['BestParams'],
)

## prepare model artefact to save

In [None]:
# Bundle everything needed to reproduce and audit the selected model into one dict:
# the fitted model, its preprocessing, the date config, split sizes/label rates,
# AUC/Gini results and the chosen hyper-parameters.
model_artefact = {
    'model': best_model,
    'model_version': "credit_model_" + config["model_train_date_str"].replace('-', '_'),
    'preprocessing_transformers': {
        'stdscaler': transformer_stdscaler,
    },
    'data_dates': config,
    'data_stats': {
        'X_train': X_train.shape[0],
        'X_test': X_test.shape[0],
        'X_oot': X_oot.shape[0],
        'y_train': round(y_train.mean(), 2),
        'y_test': round(y_test.mean(), 2),
        'y_oot': round(y_oot.mean(), 2),
    },
    'results': {
        'auc_train': train_auc_score,
        'auc_test': test_auc_score,
        'auc_oot': oot_auc_score,
        # Gini = 2 * AUC - 1
        'gini_train': round(2 * train_auc_score - 1, 3),
        'gini_test': round(2 * test_auc_score - 1, 3),
        'gini_oot': round(2 * oot_auc_score - 1, 3),
    },
    'hp_params': random_search.best_params_,
}

pprint.pprint(model_artefact)

## save artefact to model bank

In [14]:
# Create the model bank directory; a no-op if it already exists. exist_ok=True
# avoids the check-then-create race of a separate os.path.exists() test.
model_bank_directory = "model_bank/"
os.makedirs(model_bank_directory, exist_ok=True)

In [None]:
# Serialise the whole artefact dict to <model_bank_directory>/<model_version>.pkl.
file_path = os.path.join(model_bank_directory, f"{model_artefact['model_version']}.pkl")

with open(file_path, 'wb') as artefact_fh:
    pickle.dump(model_artefact, artefact_fh)

print(f"Model saved to {file_path}")


## reload the saved artefact and test model inference

In [None]:
# Round-trip check: reload the artefact from disk and score the OOT set using only
# what the artefact itself contains (its saved scaler and model).
# NOTE: pickle.load can execute arbitrary code; only load artefacts from a trusted
# model bank such as the directory this notebook writes to.
with open(file_path, 'rb') as file:
    loaded_model_artefact = pickle.load(file)

# Apply the artefact's own preprocessing to the raw OOT features rather than
# reusing the in-memory X_oot_processed, so a missing or wrong saved scaler is caught.
X_oot_from_artefact = loaded_model_artefact['preprocessing_transformers']['stdscaler'].transform(X_oot)
y_pred_proba = loaded_model_artefact['model'].predict_proba(X_oot_from_artefact)[:, 1]
oot_auc_score = roc_auc_score(y_oot, y_pred_proba)
print("OOT AUC score: ", oot_auc_score)

# The reloaded artefact must reproduce the OOT AUC recorded when it was built.
assert np.isclose(oot_auc_score, loaded_model_artefact['results']['auc_oot']), \
    "reloaded artefact does not reproduce the stored OOT AUC"

print("Model loaded successfully!")