In [1]:
import ast
import re
from datetime import datetime

import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    f1_score,
    precision_score,
    recall_score,
)
from sklearn.model_selection import GridSearchCV, train_test_split
from snowflake.ml.jobs import remote
from snowflake.ml.registry import Registry
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from xgboost import XGBClassifier

try:
    session = get_active_session()
except:
    session = Session.builder.create()

DB = 'DEMO'
SCHEMA = 'PUBLIC'
COMPUTE_WAREHOUSE = 'DEMO_WH'
model_name = 'MORTGAGE_LENDING_MLOPS'
pool_name = "MORTGAGE_LENDING_TRAIN"
STAGE = "ML_STAGE"

session.use_database(DB)
session.use_schema(SCHEMA)
session.use_warehouse(COMPUTE_WAREHOUSE)

In [2]:
#model version function
def get_next_model_version(df, model_name):
    """
    Check the model registry for a model name and return the next version number.
    If model doesn't exist, return 'V_1'.
    If model exists, increment the highest version number by 1.
    """
    # Check if dataframe is empty or model doesn't exist
    if df.empty or df[df["name"] == model_name].empty:
        return "V_1"

    # Get the model's versions
    model_row = df[df["name"] == model_name]
    versions_str = model_row["versions"].iloc[0]

    try:
        # Parse the versions list
        versions_list = ast.literal_eval(versions_str)

        # Extract version numbers and find the highest
        version_numbers = []
        for version in versions_list:
            # Handle different version formats (V_1, V_2, etc.)
            if "_" in version:
                try:
                    # Split on last underscore and get the number part
                    parts = version.rsplit("_", 1)
                    if len(parts) == 2 and parts[1].isdigit():
                        version_numbers.append(int(parts[1]))
                except:
                    continue

        if version_numbers:
            # Get the highest version number and increment
            max_version = max(version_numbers)
            return f"V_{max_version + 1}"
        else:
            # If no valid version numbers found, start with V_1
            return "V_1"

    except Exception as e:
        print(f"Error parsing versions: {e}")
        return "V_1"


In [3]:
try:
    print("Reading table data...")
    df = session.table(f"{DB}.{SCHEMA}.MORTGAGE_LENDING_DEMO_DATA").to_pandas()
    print(df.head(5))
except:
    print("Table not found! Uploading data to snowflake table")
    df_pandas = pd.read_csv("MORTGAGE_LENDING_DEMO_DATA.csv.zip")
    session.write_pandas(
        df_pandas, table_name="MORTGAGE_LENDING_DEMO_DATA",database= DB,schema=SCHEMA, auto_create_table=True
    )
    df = session.table(f"{DB}.{SCHEMA}.MORTGAGE_LENDING_DEMO_DATA").to_pandas()
    print(df.head(5))

Reading table data...
   LOAN_ID                       TS LOAN_TYPE_NAME LOAN_PURPOSE_NAME  \
0   225846  2024-08-09 23:51:21.600  VA-guaranteed       Refinancing   
1   298793  2024-02-15 10:42:48.960  VA-guaranteed       Refinancing   
2   456295  2024-05-17 06:29:48.480   Conventional     Home purchase   
3   376334  2024-06-21 11:55:14.880    FHA-insured       Refinancing   
4   216409  2024-10-03 17:14:38.400   Conventional       Refinancing   

   APPLICANT_INCOME_000S  LOAN_AMOUNT_000S         COUNTY_NAME  \
0                    NaN               160         Erie County   
1                  109.0               255         Erie County   
2                  283.0               392  Westchester County   
3                   43.0               173       Albany County   
4                  209.0               255        Kings County   

   MORTGAGERESPONSE  
0                 1  
1                 1  
2                 1  
3                 0  
4                 1  


In [4]:
def clean_and_engineer_features(df, county_income_map=None):
    """
    Clean data and perform feature engineering for mortgage lending model.
    Works for both batch training and real-time inference.
    
    Args:
        df: pandas DataFrame with columns: LOAN_ID, TS, LOAN_TYPE_NAME, LOAN_PURPOSE_NAME,
            APPLICANT_INCOME_000S, LOAN_AMOUNT_000S, COUNTY_NAME, MORTGAGERESPONSE
        county_income_map: dict mapping COUNTY_NAME to mean income (for real-time inference)
                          If None, will compute from df (for training)
    
    Returns:
        tuple: (cleaned DataFrame, county_income_map dict)
    """
    df['TS'] = pd.to_datetime(df['TS'])
    
    current_time = datetime.now()
    df_max_time = df['TS'].max()
    
    timedelta_days_adjusted = (current_time - df_max_time).days - 1
    
    df['TIMESTAMP'] = df['TS'] + pd.Timedelta(days=timedelta_days_adjusted)
    
    median_income = df['APPLICANT_INCOME_000S'].median()
    
    df['MONTH'] = df['TIMESTAMP'].dt.month
    df['DAY_OF_YEAR'] = df['TIMESTAMP'].dt.dayofyear
    df['DOTW'] = df['TIMESTAMP'].dt.dayofweek
    
    df['MISSING_INCOME'] = df['APPLICANT_INCOME_000S'].isnull().astype(int)
    df['LOAN_AMOUNT'] = df['LOAN_AMOUNT_000S'] * 1000
    df['INCOME'] = df['APPLICANT_INCOME_000S'].fillna(median_income) * 1000
    df['INCOME_LOAN_RATIO'] = df['INCOME'] / df['LOAN_AMOUNT']
    
    if county_income_map is None:
        df['MEAN_COUNTY_INCOME'] = df.groupby("COUNTY_NAME")["INCOME"].transform('mean')
        county_income_map = df.groupby("COUNTY_NAME")["INCOME"].mean().to_dict()
    else:
        df['MEAN_COUNTY_INCOME'] = df['COUNTY_NAME'].map(county_income_map)
        overall_mean = df['INCOME'].mean()
        df['MEAN_COUNTY_INCOME'] = df['MEAN_COUNTY_INCOME'].fillna(overall_mean)
    
    df['HIGH_INCOME_FLAG'] = np.where(
        df['MISSING_INCOME'] == 1,
        0,
        (df['INCOME'] > df['MEAN_COUNTY_INCOME']).astype(int)
    )
    
    df = df.drop(columns=['TS', 'COUNTY_NAME','APPLICANT_INCOME_000S'])
    
    df = pd.get_dummies(df, columns=['LOAN_TYPE_NAME', 'LOAN_PURPOSE_NAME'], drop_first=False, dtype=int)
    df.columns = [re.sub(r'[^a-zA-Z0-9]+', '_', col.upper()) for col in df.columns]
    
    return df, county_income_map

print(f"Original shape: {df.shape}")

df_clean, county_stats = clean_and_engineer_features(df)

print(f"Cleaned shape: {df_clean.shape}")
print(f"\nMin TIMESTAMP: {df_clean['TIMESTAMP'].min()}")
print(f"Max TIMESTAMP: {df_clean['TIMESTAMP'].max()}")
print(f"\nCounty statistics computed for {len(county_stats)} counties")
print(f"\nFeatures created: {list(df_clean.columns)}")
df.head()

Original shape: (369245, 8)
Cleaned shape: (369245, 20)

Min TIMESTAMP: 2025-01-04 00:00:00
Max TIMESTAMP: 2025-12-02 07:40:13.440000

County statistics computed for 63 counties

Features created: ['LOAN_ID', 'LOAN_AMOUNT_000S', 'MORTGAGERESPONSE', 'TIMESTAMP', 'MONTH', 'DAY_OF_YEAR', 'DOTW', 'MISSING_INCOME', 'LOAN_AMOUNT', 'INCOME', 'INCOME_LOAN_RATIO', 'MEAN_COUNTY_INCOME', 'HIGH_INCOME_FLAG', 'LOAN_TYPE_NAME_CONVENTIONAL', 'LOAN_TYPE_NAME_FHA_INSURED', 'LOAN_TYPE_NAME_FSA_RHS_GUARANTEED', 'LOAN_TYPE_NAME_VA_GUARANTEED', 'LOAN_PURPOSE_NAME_HOME_IMPROVEMENT', 'LOAN_PURPOSE_NAME_HOME_PURCHASE', 'LOAN_PURPOSE_NAME_REFINANCING']


Unnamed: 0,LOAN_ID,TS,LOAN_TYPE_NAME,LOAN_PURPOSE_NAME,APPLICANT_INCOME_000S,LOAN_AMOUNT_000S,COUNTY_NAME,MORTGAGERESPONSE,TIMESTAMP,MONTH,DAY_OF_YEAR,DOTW,MISSING_INCOME,LOAN_AMOUNT,INCOME,INCOME_LOAN_RATIO,MEAN_COUNTY_INCOME,HIGH_INCOME_FLAG
0,225846,2024-08-09 23:51:21.600,VA-guaranteed,Refinancing,,160,Erie County,1,2025-08-13 23:51:21.600,8,225,2,1,160000,88000.0,0.55,81930.629669,0
1,298793,2024-02-15 10:42:48.960,VA-guaranteed,Refinancing,109.0,255,Erie County,1,2025-02-18 10:42:48.960,2,49,1,0,255000,109000.0,0.427451,81930.629669,1
2,456295,2024-05-17 06:29:48.480,Conventional,Home purchase,283.0,392,Westchester County,1,2025-05-21 06:29:48.480,5,141,2,0,392000,283000.0,0.721939,180657.260909,1
3,376334,2024-06-21 11:55:14.880,FHA-insured,Refinancing,43.0,173,Albany County,0,2025-06-25 11:55:14.880,6,176,2,0,173000,43000.0,0.248555,95647.051459,0
4,216409,2024-10-03 17:14:38.400,Conventional,Refinancing,209.0,255,Kings County,1,2025-10-07 17:14:38.400,10,280,1,0,255000,209000.0,0.819608,149160.225503,1


In [5]:
session.sql(f'''
create stage if not exists {STAGE}
            ''').collect()

[Row(status='ML_STAGE already exists, statement succeeded.')]

In [6]:
# If you do not have a compute pool create one
session.sql(
f'''CREATE COMPUTE POOL IF NOT EXISTS {pool_name}
  MIN_NODES = 1
  MAX_NODES = 4
  INSTANCE_FAMILY = CPU_X64_L
  '''
).collect()

[Row(status='MORTGAGE_LENDING_TRAIN already exists, statement succeeded.')]

#### Train single node but larger compute pool on snowflake remotely

In [7]:
@remote(compute_pool=pool_name, stage_name=STAGE, session=session)
def train_model(session: Session):
    print("Preparing features...")
    df = session.table(f"{DB}.{SCHEMA}.MORTGAGE_LENDING_DEMO_DATA").to_pandas()
    df_clean, county_stats = clean_and_engineer_features(df)

    X = df_clean.drop(['MORTGAGERESPONSE','LOAN_ID','TIMESTAMP'],axis=1)
    y = df_clean.MORTGAGERESPONSE

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.3, random_state=42, stratify=y)

    scale_pos_weight = len(y_train[y_train == 0]) / len(y_train[y_train == 1])
    print(f"Calculated scale_pos_weight: {scale_pos_weight:.3f}")

    # Hyperparameter grid - balanced for reasonable runtime with significant scaling benefit
    param_grid = {
        "max_depth": [3, 7],
        "n_estimators": [50, 100, 200],
        "learning_rate": [0.01, 0.3],
        "subsample": [0.7, 1.0],
        "colsample_bytree": [0.7, 1.0],
        "min_child_weight": [1, 3, 5],
        "gamma": [0, 0.2],
    }


    model = XGBClassifier(objective='binary:logistic', 
                        eval_metric='logloss',
                        scale_pos_weight=scale_pos_weight,
                        n_jobs=-1)

    grid_search = GridSearchCV(estimator=model, 
                            param_grid=param_grid,
                                    cv=3,
        scoring="f1_weighted",
        n_jobs=-1,  
        verbose=2,)

    grid_search.fit(X_train, y_train)


    print("Training model with hyperparameter tuning...")
    print(
        f"Testing {len(param_grid['max_depth']) * len(param_grid['n_estimators']) * len(param_grid['learning_rate']) * len(param_grid['subsample']) * len(param_grid['colsample_bytree']) * len(param_grid['min_child_weight']) * len(param_grid['gamma'])} parameter combinations"
    )
    print(f"Best parameters: {grid_search.best_params_}")
    print(f"Best CV score: {grid_search.best_score_:.4f}")

    best_model = grid_search.best_estimator_

    print("Calculating metrics...")
    y_pred = best_model.predict(X_test)

    metrics_test = {
        "Accuracy": accuracy_score(y_test, y_pred),
        "Precision": precision_score(y_test, y_pred, average="weighted"),
        "Recall": recall_score(y_test, y_pred, average="weighted"),
        "F1 Score": f1_score(y_test, y_pred, average="weighted"),
        "best_cv_score": grid_search.best_score_,
        "best_params": str(grid_search.best_params_),
    }

    y_pred_train = best_model.predict(X_train)

    metrics_train = {
        "Accuracy": accuracy_score(y_train, y_pred_train),
        "Precision": precision_score(y_train, y_pred_train, average="weighted"),
        "Recall": recall_score(y_train, y_pred_train, average="weighted"),
        "F1 Score": f1_score(y_train, y_pred_train, average="weighted"),
        "best_cv_score": grid_search.best_score_,
        "best_params": str(grid_search.best_params_),
    }

    sample_data = X.sample(n=1)
    reg = Registry(session=session, database_name= DB, schema_name= SCHEMA)
    df = reg.show_models()
    next_version = get_next_model_version(df, model_name)
    print(next_version)

    reg.log_model(
        model_name=model_name,
        version_name = next_version,
        model=best_model,
        sample_input_data= sample_data,
        metrics=metrics_test,
        target_platforms=["SNOWPARK_CONTAINER_SERVICES"]
    )

    return {"status": "Success", "metrics_test": metrics_test, "metrics_train": metrics_train}

In [8]:
train_job = train_model(session)
print("Waiting for training to complete...")
result = train_job.result()

Waiting for training to complete...


In [9]:
result

{'status': 'Success',
 'metrics_test': {'Accuracy': 0.6993969704082186,
  'Precision': 0.7746879242238848,
  'Recall': 0.6993969704082186,
  'F1 Score': 0.7223588014091902,
  'best_cv_score': 0.7262809579279604,
  'best_params': "{'colsample_bytree': 1.0, 'gamma': 0, 'learning_rate': 0.3, 'max_depth': 7, 'min_child_weight': 1, 'n_estimators': 200, 'subsample': 1.0}"},
 'metrics_train': {'Accuracy': 0.7513957078356953,
  'Precision': 0.8215894479145842,
  'Recall': 0.7513957078356953,
  'F1 Score': 0.7703184110417596,
  'best_cv_score': 0.7262809579279604,
  'best_params': "{'colsample_bytree': 1.0, 'gamma': 0, 'learning_rate': 0.3, 'max_depth': 7, 'min_child_weight': 1, 'n_estimators': 200, 'subsample': 1.0}"}}