# ❄️ End-to-end ML Demo ❄️

In this workflow we will work through the following elements of a typical tabular machine learning pipeline.

### 1. Use Feature Store to track engineered features
* Store feature definitions in the feature store for reproducible computation of ML features
      
### 2. Train two models using the Snowflake ML APIs
* Baseline XGBoost
* XGBoost with optimal hyperparameters identified via Snowflake ML distributed hyperparameter optimization (HPO)

### 3. Register both models in Snowflake model registry
* Explore model registry capabilities such as **metadata tracking, inference, and explainability**
* Compare model metrics on train/test set to identify any issues of model performance or overfitting
* Promote the best-performing model version to the 'default' version

### 4. Set up Model Monitor to track 1 year of predicted and actual loan repayments
* **Compute performance metrics** such as F1, Precision, Recall
* **Inspect model drift** (i.e., how much the average predicted repayment rate changes day to day)
* **Compare models** side-by-side to understand which model should be used in production
* Identify and understand **data issues**

### 5. Track data and model lineage throughout
* View and understand
  * The **origin of the data** used for computed features
  * The **data used** for model training
  * The **available model versions** being monitored

In [1]:
!pip install shap


In [2]:
# Update VERSION_NUM to version your features, models, etc.
VERSION_NUM = '0'

In [5]:
import pandas as pd
import numpy as np
import sklearn
import math
import pickle
import shap
from datetime import datetime
import streamlit as st
from xgboost import XGBClassifier

# Snowflake ML model registry (the HPO imports are made in the tuning cell, where they are used)
from snowflake.ml.registry import Registry

#Snowflake feature store
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

# Snowpark session
from snowflake.snowpark import DataFrame
from snowflake.snowpark.functions import col, to_timestamp, min, max, month, dayofmonth, dayofweek, dayofyear, avg, median, date_add
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark import Window

#setup snowpark session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.session import Session
from snowflake.ml.utils.connection_params import SnowflakeLoginOptions
session = Session.builder.configs(SnowflakeLoginOptions()).create()
session

SnowflakeLoginOptions() is in private preview since 0.2.0. Do not use it in production. 


<snowflake.snowpark.session.Session at 0x375ac7ee0>

In [22]:
from snowflake.snowpark.exceptions import SnowparkSQLException

try:
    print("Reading table data...")
    df = session.table("MORTGAGE_LENDING_DEMO_DATA")
    df.show(5)  # raises SnowparkSQLException if the table does not exist
except SnowparkSQLException:
    print("Table not found! Uploading data to Snowflake table")
    df_pandas = pd.read_csv("MORTGAGE_LENDING_DEMO_DATA.csv.zip")
    session.write_pandas(df_pandas, "MORTGAGE_LENDING_DEMO_DATA", auto_create_table=True)
    df = session.table("MORTGAGE_LENDING_DEMO_DATA")
    df.show(5)

Reading table data...
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOAN_ID"  |"TS"                     |"LOAN_TYPE_NAME"  |"LOAN_PURPOSE_NAME"  |"APPLICANT_INCOME_000S"  |"LOAN_AMOUNT_000S"  |"COUNTY_NAME"       |"MORTGAGERESPONSE"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|225846     |2024-08-09 23:51:21.600  |VA-guaranteed     |Refinancing          |NULL                     |160                 |Erie County         |1                   |
|298793     |2024-02-15 10:42:48.960  |VA-guaranteed     |Refinancing          |109.0                    |255                 |Erie County         |1                   |
|456295     |2024-05-17 06:29:48.480  |Conventional      |Home purchase        |283.0                    |392                 |W

## Observe Snowflake Snowpark table properties

In [23]:
df.count()

369245

In [8]:
df.select(min('TS'), max('TS')).show()

-----------------------------------------------------
|"MIN(""TS"")"            |"MAX(""TS"")"            |
-----------------------------------------------------
|2024-01-01 00:00:00.000  |2024-11-28 07:40:13.440  |
-----------------------------------------------------



In [9]:
# Get the current date and time
current_time = datetime.now()
# Latest timestamp in the data; to_timestamp() makes Snowpark return a Python datetime directly
# (parsing str() with "%f" fails whenever the fractional seconds happen to be zero)
df_max_time = df.select(max(to_timestamp("TS"))).collect()[0][0]
df_max_time

datetime.datetime(2024, 11, 28, 7, 40, 13, 440000)

In [10]:
# Find the delta between the latest existing timestamp and today's date
timedelta = current_time - df_max_time
timedelta

datetime.timedelta(days=109, seconds=30598, microseconds=770251)

In [11]:
df.select(min(date_add(to_timestamp("TS"), timedelta.days-1)), max(date_add(to_timestamp("TS"), timedelta.days-1))).show()

-------------------------------------------------------------------------------------------------------
|"MIN(DATEADD('DAY', 108, TO_TIMESTAMP(""TS"")))"  |"MAX(DATEADD('DAY', 108, TO_TIMESTAMP(""TS"")))"  |
-------------------------------------------------------------------------------------------------------
|2024-04-18 00:00:00                               |2025-03-16 07:40:13.440000                        |
-------------------------------------------------------------------------------------------------------
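The `date_add(..., timedelta.days - 1)` shift moves every timestamp forward by a whole number of days so the data ends just before the current time. A plain-Python sketch of the same arithmetic, reproducing the range shown above from the printed minimum, maximum, and delta (`shift_timestamps` is a hypothetical helper, not part of Snowpark):

```python
import datetime as dt


def shift_timestamps(timestamps, now):
    """Shift timestamps forward by whole days, mirroring date_add(to_timestamp("TS"), timedelta.days - 1).

    The latest timestamp ends up between one and two days before `now`.
    """
    latest = sorted(timestamps)[-1]  # avoids max(), which the notebook rebinds to Snowpark's max
    offset_days = (now - latest).days - 1  # whole days only, minus one
    shift = dt.timedelta(days=offset_days)
    return [ts + shift for ts in timestamps], offset_days


# Min/max TS and the delta printed by the cells above
data_min = dt.datetime(2024, 1, 1)
data_max = dt.datetime(2024, 11, 28, 7, 40, 13, 440000)
now = data_max + dt.timedelta(days=109, seconds=30598, microseconds=770251)

shifted, offset = shift_timestamps([data_min, data_max], now)
print(offset)  # 108
print([ts.isoformat(sep=" ") for ts in shifted])  # ['2024-04-18 00:00:00', '2025-03-16 07:40:13.440000']
```

Using `timedelta.days` (whole days) rather than the full delta keeps the time-of-day of every record unchanged, so only the calendar dates move.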



In [None]:
# Get the current date and time
current_time = datetime.now()
# Latest timestamp in the data
df_max_time = df.select(max(to_timestamp("TS"))).collect()[0][0]

# Find the delta between the latest existing timestamp and today's date
timedelta = current_time - df_max_time

# Preview the shifted range (timestamps now cover roughly the last year); the shift itself is applied during feature engineering
df.select(min(date_add(to_timestamp("TS"), timedelta.days-1)), max(date_add(to_timestamp("TS"), timedelta.days-1))).show()

## Feature Engineering with Snowpark APIs

In [12]:
# Create a dict with keys for feature names and values containing transform code

feature_eng_dict = dict()

# Timestamp features (TS shifted so the data ends just before today)
feature_eng_dict["TIMESTAMP"] = date_add(to_timestamp("TS"), timedelta.days-1)
feature_eng_dict["MONTH"] = month("TIMESTAMP")
feature_eng_dict["DAY_OF_YEAR"] = dayofyear("TIMESTAMP")
feature_eng_dict["DOTW"] = dayofweek("TIMESTAMP")

# Income and loan features (source columns are in thousands of USD)
feature_eng_dict["LOAN_AMOUNT"] = col("LOAN_AMOUNT_000s")*1000
feature_eng_dict["INCOME"] = col("APPLICANT_INCOME_000s")*1000
feature_eng_dict["INCOME_LOAN_RATIO"] = col("INCOME")/col("LOAN_AMOUNT")

county_window_spec = Window.partition_by("COUNTY_NAME")
feature_eng_dict["MEDIAN_COUNTY_INCOME"] = median("INCOME").over(county_window_spec)
feature_eng_dict["HIGH_INCOME_FLAG"] = (col("INCOME")>col("MEDIAN_COUNTY_INCOME")).astype(IntegerType())

# NOTE: rows_between(-30, 0) averages the current row and the 30 preceding *rows*, not 30 calendar days,
# and ordering by DAY_OF_YEAR interleaves the two calendar years in the data.
# create_mortgage_features() below uses a time-based 30-day window instead.
day_window_spec = Window.order_by("DAY_OF_YEAR").rows_between(-30,0)
feature_eng_dict["AVG_THIRTY_DAY_LOAN_AMOUNT"] =  avg("LOAN_AMOUNT").over(day_window_spec)

df = df.with_columns(feature_eng_dict.keys(), feature_eng_dict.values())
df.show(3)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOAN_ID"  |"TS"                     |"LOAN_TYPE_NAME"  |"LOAN_PURPOSE_NAME"  |"APPLICANT_INCOME_000S"  |"LOAN_AMOUNT_000S"  |"COUNTY_NAME"  |"MORTGAGERESPONSE"  |"TIMESTAMP"                 |"MONTH"  |"DAY_OF_YEAR"  |"DOTW"  |"LOAN_AMOUNT"  |"INCOME"  |"INCOME_LOAN_RATIO"  |"MEDIAN_COUNTY_INCOME"  |"HIGH_INCOME_FLAG"  |"AVG_THIRTY_DAY_LOAN_AMOUNT"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
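Because `rows_between(-30, 0)` counts rows, `AVG_THIRTY_DAY_LOAN_AMOUNT` in this cell only equals a 30-day average when there is exactly one loan per day. The plain-Python sketch below contrasts a row-based window with a calendar-day window, in the spirit of the `-30D` window passed to `time_series_agg` in the next cell. The helper names and toy data are hypothetical, and a 1-row lookback stands in for the 30-row one to keep the data small:

```python
import datetime as dt


def rolling_avg_rows(values, n_preceding):
    """Mean of each row and up to n_preceding earlier rows (ROWS BETWEEN n PRECEDING AND CURRENT ROW)."""
    result = []
    for i in range(len(values)):
        start = i - n_preceding if i >= n_preceding else 0
        window = values[start:i + 1]
        result.append(sum(window) / len(window))
    return result


def rolling_avg_days(dates, values, n_days):
    """Mean of all rows dated within n_days before (and including) the current row's date."""
    result = []
    for current in dates:
        earliest = current - dt.timedelta(days=n_days)
        window = [v for d, v in zip(dates, values) if earliest <= d <= current]
        result.append(sum(window) / len(window))
    return result


# Toy data: three loans on the same day, then one loan 39 days later
dates = [dt.date(2024, 5, 1)] * 3 + [dt.date(2024, 6, 9)]
amounts = [100.0, 200.0, 300.0, 400.0]

print(rolling_avg_rows(amounts, n_preceding=1))     # [100.0, 150.0, 250.0, 350.0]
print(rolling_avg_days(dates, amounts, n_days=30))  # [200.0, 200.0, 200.0, 400.0]
```

The row-based window gives three different averages for loans made on the same day and still mixes the day-1 loans into the day-40 average, while the day-based window does neither.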

In [33]:
def create_mortgage_features(df):
    # Step 1: Timestamp features (Per-row features)
    df = df.with_columns(
        ["TIMESTAMP", "MONTH", "DAY_OF_YEAR", "DOTW"],
        [
            date_add(to_timestamp("TS"), timedelta.days-1),
            month("TIMESTAMP"),
            dayofyear("TIMESTAMP"),
            dayofweek("TIMESTAMP")
        ]
    )
    
    # Step 2: Income and loan features (Per-row features)
    df = df.with_columns(
        ["LOAN_AMOUNT", "INCOME", "INCOME_LOAN_RATIO"],
        [
            col("LOAN_AMOUNT_000s")*1000,
            col("APPLICANT_INCOME_000s")*1000,
            col("INCOME")/col("LOAN_AMOUNT")
        ]
    )
    
    # Step 3: County-level income stats (Per-group features)
    county_income_df = df.group_by(["COUNTY_NAME"]).agg(
        median("INCOME").alias("MEDIAN_COUNTY_INCOME")
    )
    # Join back to the original dataframe
    df = df.join(county_income_df, "COUNTY_NAME")
    
    # Step 4: Add high income flag
    df = df.with_column(
        "HIGH_INCOME_FLAG", 
        (col("INCOME") > col("MEDIAN_COUNTY_INCOME")).astype(IntegerType())
    )
    
    # Step 5: Time-based rolling average
    df = df.analytics.time_series_agg(
        aggs={"LOAN_AMOUNT": ["AVG"]},
        windows=["-30D"],
        sliding_interval="1D",
        time_col="TIMESTAMP",
        group_by=["COUNTY_NAME"] 
    )
    df = df.rename("LOAN_AMOUNT_AVG_-30D", "AVG_THIRTY_DAY_LOAN_AMOUNT")

    # Step 6: Select only ID, timestamp, and engineered features
    feature_df = df.select(
        ["LOAN_ID", "TIMESTAMP", "MONTH", "DAY_OF_YEAR", "DOTW", 
         "LOAN_AMOUNT", "INCOME", "INCOME_LOAN_RATIO", 
         "MEDIAN_COUNTY_INCOME", "HIGH_INCOME_FLAG", 
         "AVG_THIRTY_DAY_LOAN_AMOUNT"]
    )
    
    return feature_df

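# Build features from the raw table so columns already added to `df` in the earlier cell do not collide in the county join
feature_df = create_mortgage_features(session.table("MORTGAGE_LENDING_DEMO_DATA"))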

In [None]:
feature_df.count()
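Steps 3 and 4 of `create_mortgage_features` compute a per-county median income, join it back to every row, and flag rows above it. A plain-Python sketch of that logic on hypothetical rows (`add_county_income_features` is an illustrative name). Missing incomes are skipped by the median and produce a missing flag, mirroring how SQL treats NULL; as a simplification, this sketch keeps rows whose county has no median, whereas the inner join in the notebook would drop rows with a NULL county:

```python
import statistics


def add_county_income_features(rows):
    """Attach MEDIAN_COUNTY_INCOME and HIGH_INCOME_FLAG to each row (dicts with COUNTY_NAME, INCOME)."""
    # Group non-missing incomes by county (SQL MEDIAN ignores NULLs)
    incomes_by_county = {}
    for row in rows:
        if row["INCOME"] is not None:
            incomes_by_county.setdefault(row["COUNTY_NAME"], []).append(row["INCOME"])
    county_median = {county: statistics.median(vals) for county, vals in incomes_by_county.items()}

    # "Join" the county median back onto each row and compare
    enriched = []
    for row in rows:
        med = county_median.get(row["COUNTY_NAME"])
        if row["INCOME"] is None or med is None:
            flag = None  # a comparison with NULL yields NULL in SQL
        else:
            flag = int(row["INCOME"] > med)
        enriched.append({**row, "MEDIAN_COUNTY_INCOME": med, "HIGH_INCOME_FLAG": flag})
    return enriched


rows = [
    {"COUNTY_NAME": "Erie County", "INCOME": 100_000},
    {"COUNTY_NAME": "Erie County", "INCOME": 200_000},
    {"COUNTY_NAME": "Erie County", "INCOME": 300_000},
    {"COUNTY_NAME": "Erie County", "INCOME": None},
]
for r in add_county_income_features(rows):
    print(r["INCOME"], r["MEDIAN_COUNTY_INCOME"], r["HIGH_INCOME_FLAG"])
```

Note that an income exactly equal to the county median gets a flag of 0, because the comparison is a strict `>`.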

In [13]:
df.show(20)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOAN_ID"  |"TS"                     |"LOAN_TYPE_NAME"    |"LOAN_PURPOSE_NAME"  |"APPLICANT_INCOME_000S"  |"LOAN_AMOUNT_000S"  |"COUNTY_NAME"       |"MORTGAGERESPONSE"  |"TIMESTAMP"                 |"MONTH"  |"DAY_OF_YEAR"  |"DOTW"  |"LOAN_AMOUNT"  |"INCOME"  |"INCOME_LOAN_RATIO"  |"MEDIAN_COUNTY_INCOME"  |"HIGH_INCOME_FLAG"  |"AVG_THIRTY_DAY_LOAN_AMOUNT"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [26]:
print(df.explain())

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM MORTGAGE_LENDING_DEMO_DATA
Logical Execution Plan:
GlobalStats:
    partitionsTotal=1
    partitionsAssigned=1
    bytesAssigned=4618240
Operations:
1:0     ->Result  MORTGAGE_LENDING_DEMO_DATA.LOAN_ID, MORTGAGE_LENDING_DEMO_DATA.TS, MORTGAGE_LENDING_DEMO_DATA.LOAN_TYPE_NAME, MORTGAGE_LENDING_DEMO_DATA.LOAN_PURPOSE_NAME, MORTGAGE_LENDING_DEMO_DATA.APPLICANT_INCOME_000S, MORTGAGE_LENDING_DEMO_DATA.LOAN_AMOUNT_000S, MORTGAGE_LENDING_DEMO_DATA.COUNTY_NAME, MORTGAGE_LENDING_DEMO_DATA.MORTGAGERESPONSE  
1:1          ->TableScan  ML_DATASETS.PUBLIC.MORTGAGE_LENDING_DEMO_DATA  LOAN_ID, TS, LOAN_TYPE_NAME, LOAN_PURPOSE_NAME, APPLICANT_INCOME_000S, LOAN_AMOUNT_000S, COUNTY_NAME, MORTGAGERESPONSE  {partitionsTotal=1, partitionsAssigned=1, bytesAssigned=4618240}

--------------------------------------------
None


## Create a Snowflake Feature Store

In [27]:
fs = FeatureStore(
    session=session, 
    database=session.get_current_database(), 
    name=session.get_current_schema(), 
    default_warehouse=session.get_current_warehouse(),
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

In [28]:
fs.list_entities()

<snowflake.snowpark.dataframe.DataFrame at 0x3200aa370>

## Feature Store configuration
- create/register entities of interest

In [29]:
# First try to retrieve an existing entity definition; if none exists, define and register a new one
try:
    # Retrieve the existing entity
    loan_id_entity = fs.get_entity('LOAN_ENTITY')
    print('Retrieved existing entity')
except Exception:
    # Define a new entity keyed on LOAN_ID
    loan_id_entity = Entity(
        name = "LOAN_ENTITY",
        join_keys = ["LOAN_ID"],
        desc = "Features defined on a per loan level")
    # Register it with the feature store
    fs.register_entity(loan_id_entity)
    print("Registered new entity")

Registered new entity


In [32]:
#Create a dataframe with just the ID, timestamp, and engineered features. We will use this to define our feature view
#feature_df = df.select(["LOAN_ID"]+list(feature_eng_dict.keys()))
feature_df.show(5)


Here, the feature view is defined from `feature_df`, a Snowpark DataFrame built with Snowpark APIs in `create_mortgage_features`.

A feature view can also be defined from a DataFrame that simply references an existing table (for example, via `session.table(...)`).

In [None]:
#define and register feature view
loan_fv = FeatureView(
    name="Mortgage_Feature_View",
    entities=[loan_id_entity],
    feature_df=feature_df,
    timestamp_col="TIMESTAMP",
    refresh_freq="1 day")

#add feature level descriptions

loan_fv = loan_fv.attach_feature_desc(
    {
        "MONTH": "Month of loan",
        "DAY_OF_YEAR": "Day of calendar year of loan",
        "DOTW": "Day of the week of loan",
        "LOAN_AMOUNT": "Loan amount in $USD",
        "INCOME": "Household income in $USD",
        "INCOME_LOAN_RATIO": "Ratio of INCOME/LOAN_AMOUNT",
        "MEDIAN_COUNTY_INCOME": "Median household income aggregated at county level",
        "HIGH_INCOME_FLAG": "Binary flag to indicate whether household income is higher than MEDIAN_COUNTY_INCOME",
        "AVG_THIRTY_DAY_LOAN_AMOUNT": "Rolling 30-day average of LOAN_AMOUNT, computed per county"
    }
)

loan_fv = fs.register_feature_view(loan_fv, version=VERSION_NUM, overwrite=True)

In [None]:
fs.list_feature_views()

In [None]:
# Create a link to the Feature Store UI to inspect the newly created feature view
# (print() works in any notebook environment; st.write() only renders inside a Streamlit context)

org_name = session.sql('SELECT CURRENT_ORGANIZATION_NAME()').collect()[0][0]
account_name = session.sql('SELECT CURRENT_ACCOUNT_NAME()').collect()[0][0]
db_name = session.sql('SELECT CURRENT_DATABASE()').collect()[0][0]
schema_name = session.sql('SELECT CURRENT_SCHEMA()').collect()[0][0]

print(f'https://app.snowflake.com/{org_name}/{account_name}/#/features/database/{db_name}/store/{schema_name}')

## Retrieve a Dataset from the feature view

Snowflake Datasets are immutable, versioned, file-based snapshots of data, stored as schema-level objects.

They can be read back as Snowpark or pandas DataFrames, or written to other persistent Snowflake objects as needed.

In [None]:
ds = fs.generate_dataset(
    name=f"MORTGAGE_DATASET_EXTENDED_FEATURES_{VERSION_NUM}",
    spine_df=df.select("LOAN_ID", "TIMESTAMP", "LOAN_PURPOSE_NAME", "MORTGAGERESPONSE"),  # join key, timestamp, label, and LOAN_PURPOSE_NAME; engineered features come from the feature view
    features=[loan_fv],
    spine_timestamp_col="TIMESTAMP",
    spine_label_cols=["MORTGAGERESPONSE"]
)
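With `spine_timestamp_col` set, the feature store performs a point-in-time lookup: conceptually, each spine row receives the latest feature values recorded at or before the spine row's timestamp, which keeps future information out of the training data. A plain-Python sketch of that idea (not Snowflake's implementation; `point_in_time_join` and the data are hypothetical):

```python
from bisect import bisect_right
from datetime import datetime


def point_in_time_join(spine, feature_rows, key="LOAN_ID", ts="TIMESTAMP"):
    """Attach to each spine row the latest feature row with the same key and timestamp <= the spine timestamp."""
    # Index feature rows per key, sorted by timestamp
    history = {}
    for row in sorted(feature_rows, key=lambda r: r[ts]):
        history.setdefault(row[key], []).append(row)

    joined = []
    for spine_row in spine:
        rows = history.get(spine_row[key], [])
        pos = bisect_right([r[ts] for r in rows], spine_row[ts])  # rows[:pos] are not after the spine time
        match = rows[pos - 1] if pos > 0 else {}
        feature_values = {k: v for k, v in match.items() if k not in (key, ts)}
        joined.append({**spine_row, **feature_values})
    return joined


feature_history = [
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 5, 1), "INCOME": 100_000},
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 5, 10), "INCOME": 120_000},
]
spine_rows = [
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 4, 30), "MORTGAGERESPONSE": 1},
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 5, 5), "MORTGAGERESPONSE": 1},
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 5, 12), "MORTGAGERESPONSE": 0},
]
for row in point_in_time_join(spine_rows, feature_history):
    print(row["TIMESTAMP"].date(), row.get("INCOME"))
```

The first spine row predates every feature record, so it gets no `INCOME`; the other two pick up the value that was current at their own timestamps.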

In [None]:
ds_sp = ds.read.to_snowpark_dataframe()
ds_sp.show(5)

In [None]:
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.snowpark.types import StringType

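# String (categorical) columns to one-hot encode; isinstance() matches StringType regardless of its length parameter
OHE_COLS = [field.name for field in ds_sp.schema.fields if isinstance(field.datatype, StringType)]
OHE_POST_COLS = [i+"_OHE" for i in OHE_COLS]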


# Encode categoricals to numeric columns
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols = OHE_COLS, drop_input_cols=True)
ds_sp_ohe = snowml_ohe.fit(ds_sp).transform(ds_sp)

#Rename columns to avoid double nested quotes and white space chars
rename_dict = {}
for i in ds_sp_ohe.columns:
    if '"' in i:
        rename_dict[i] = i.replace('"','').replace(' ', '_')

ds_sp_ohe = ds_sp_ohe.rename(rename_dict)
ds_sp_ohe.columns
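The encoder above replaces each string column with one 0/1 column per category, and the rename loop strips the quotes and spaces that category values such as `Home improvement` introduce into column names. A plain-Python sketch of both steps (hypothetical helpers; the upper-casing mimics how Snowflake resolves unquoted identifiers, which is why later cells refer to names like `LOAN_PURPOSE_NAME_HOME_IMPROVEMENT`):

```python
def sanitize(name):
    """Drop double quotes, replace spaces, and upper-case (as an unquoted Snowflake identifier resolves)."""
    return name.replace('"', "").replace(" ", "_").upper()


def one_hot_encode(rows, column):
    """Replace `column` with one 0/1 indicator column per distinct category."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        for category in categories:
            new_row[sanitize(f"{column}_{category}")] = int(row[column] == category)
        encoded.append(new_row)
    return encoded


rows = [
    {"LOAN_ID": 1, "LOAN_PURPOSE_NAME": "Home purchase"},
    {"LOAN_ID": 2, "LOAN_PURPOSE_NAME": "Home improvement"},
    {"LOAN_ID": 3, "LOAN_PURPOSE_NAME": "Refinancing"},
]
for row in one_hot_encode(rows, "LOAN_PURPOSE_NAME"):
    print(row)
```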

In [None]:
train, test = ds_sp_ohe.random_split(weights=[0.70, 0.30], seed=0)

In [None]:
train = train.fillna(0)
test = test.fillna(0)

In [None]:
train_pd = train.to_pandas()
test_pd = test.to_pandas()

## Configure model

In [None]:
# Define the baseline model config: very deep trees, only 3 estimators, and a high learning rate
xgb_base = XGBClassifier(
    max_depth=50,
    n_estimators=3,
    learning_rate = 0.75,
    booster = 'gbtree')

## Fit the model

In [None]:
#Split train data into X, y
X_train_pd = train_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"], axis=1)  # drop ID, timestamp, and label columns
y_train_pd = train_pd.MORTGAGERESPONSE

#train model
xgb_base.fit(X_train_pd,y_train_pd)

Measure the baseline performance of our first version.

Note that here we simply use open-source metric functions from scikit-learn.

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score
train_preds_base = xgb_base.predict(X_train_pd)  # predictions on the training data

f1_base_train = round(f1_score(y_train_pd, train_preds_base),4)
precision_base_train = round(precision_score(y_train_pd, train_preds_base),4)
recall_base_train = round(recall_score(y_train_pd, train_preds_base),4)

print(f'F1: {f1_base_train} \nPrecision {precision_base_train} \nRecall: {recall_base_train}')
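For reference, precision, recall, and F1 reduce to counts of true positives (TP), false positives (FP), and false negatives (FN): precision = TP / (TP + FP), recall = TP / (TP + FN), and F1 is their harmonic mean. A dependency-free sketch (hypothetical helper name; like scikit-learn's default behavior, it falls back to 0.0 when a denominator is zero):

```python
def precision_recall_f1(y_true, y_pred):
    """Binary precision, recall, and F1 with 1 as the positive class; 0.0 when a denominator is zero."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == 1 and p == 1)
    fp = sum(1 for t, p in pairs if t == 0 and p == 1)
    fn = sum(1 for t, p in pairs if t == 1 and p == 0)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# Toy labels: 2 true positives, 1 false positive, 1 false negative
example_true = [1, 1, 0, 0, 1]
example_pred = [1, 0, 1, 0, 1]
print(precision_recall_f1(example_true, example_pred))
```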

# Model Registry

- Log models with important metadata
- Manage model lifecycles
- Serve models from Snowflake runtimes

In [None]:
#Create a snowflake model registry object 
from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier
from snowflake.ml.model import model_signature

db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())


# Define model name
model_name = f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}"

# Create a registry to log the model to
model_registry = Registry(session=session, 
                          database_name=db, 
                          schema_name=schema,
                          options={"enable_monitoring": True})

In [None]:
#Deploy the base model to the model registry
base_version_name = 'XGB_BASE'

try:
    mv_base = model_registry.get_model(model_name).version(base_version_name)
    print("Found existing model version!")
except Exception:
    print("Logging new model version...")
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=xgb_base, 
        version_name=base_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100), # using a Snowpark DataFrame to maintain lineage
        comment = """ML model for predicting loan approval likelihood.
                    This model was trained using an XGBoost classifier.
                    Hyperparameters used were:
                    max_depth=50, n_estimators=3, learning_rate=0.75, booster=gbtree.
                    """,
    )
    mv_base.set_metric(metric_name="Train_F1_Score", value=f1_base_train)
    mv_base.set_metric(metric_name="Train_Precision_Score", value=precision_base_train)
    mv_base.set_metric(metric_name="Train_Recall_score", value=recall_base_train)

In [None]:
# Create a tag for the PROD model and apply it (collect() is needed to actually execute the SQL)
session.sql("CREATE TAG IF NOT EXISTS PROD").collect()
m = model_registry.get_model(model_name)
m.set_tag("PROD", base_version_name)
m.comment = "Loan approval prediction models" #set model level comment

In [None]:
model_registry.show_models()

In [None]:
model_registry.get_model(model_name).show_versions()

In [None]:
print(mv_base)
print(mv_base.show_metrics())

In [None]:
mv_base.show_functions()

In [None]:
reg_preds = mv_base.run(test, function_name = "predict")
reg_preds.show(10)

In [None]:
reg_preds = reg_preds.rename(col('"output_feature_0"'), "MORTGAGE_PREDICTION")
reg_preds.columns

In [None]:
preds_pd = reg_preds.select(["MORTGAGERESPONSE", "MORTGAGE_PREDICTION"]).to_pandas()
f1_base_test = round(f1_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)
precision_base_test = round(precision_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)
recall_base_test = round(recall_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)

#log metrics to model registry model
mv_base.set_metric(metric_name="Test_F1_Score", value=f1_base_test)
mv_base.set_metric(metric_name="Test_Precision_Score", value=precision_base_test)
mv_base.set_metric(metric_name="Test_Recall_score", value=recall_base_test)

print(f'F1: {f1_base_test} \nPrecision {precision_base_test} \nRecall: {recall_base_test}')
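The train and test metrics logged to the registry can also be compared programmatically to flag a drop. A minimal sketch; the `overfit_gaps` helper, its 0.05 threshold, and the example values are hypothetical, not results from this notebook:

```python
def overfit_gaps(train_metrics, test_metrics, max_gap=0.05):
    """Return {metric: train - test} for metrics whose drop from train to test exceeds max_gap."""
    gaps = {}
    for name, train_value in train_metrics.items():
        if name in test_metrics:
            drop = train_value - test_metrics[name]
            if drop > max_gap:
                gaps[name] = round(drop, 4)
    return gaps


# Hypothetical values for illustration only
example_train_metrics = {"F1": 0.95, "Precision": 0.90, "Recall": 0.99}
example_test_metrics = {"F1": 0.80, "Precision": 0.88, "Recall": 0.70}
print(overfit_gaps(example_train_metrics, example_test_metrics))
```

In the notebook, the two dictionaries could be built from `f1_base_train`, `f1_base_test`, and the other rounded metrics computed above.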

# Oh no! Our model's performance seems to have dropped off significantly from the training set to the test set.
## This is evidence that our model is overfit. Can we fix this with distributed hyperparameter optimization?

In [None]:
X_train = train.drop("MORTGAGERESPONSE", "TIMESTAMP", "LOAN_ID")
y_train = train.select("MORTGAGERESPONSE")
X_test = test.drop("MORTGAGERESPONSE","TIMESTAMP", "LOAN_ID")
y_test = test.select("MORTGAGERESPONSE")

In [None]:
from snowflake.ml.data import DataConnector
from snowflake.ml.modeling.tune import get_tuner_context
from snowflake.ml.modeling import tune
from snowflake.ml.modeling.tune import search as search_algorithm  # search algorithms, e.g. RandomSearch

#Define dataset map
dataset_map = {
    "x_train": DataConnector.from_dataframe(X_train),
    "y_train": DataConnector.from_dataframe(y_train),
    "x_test": DataConnector.from_dataframe(X_test),
    "y_test": DataConnector.from_dataframe(y_test)
    }


# Define a training function; any model of your choosing can be trained inside it.
def train_func():
    # A context object provided by the HPO API to expose data for the current HPO trial
    tuner_context = get_tuner_context()
    config = tuner_context.get_hyper_params()
    dm = tuner_context.get_dataset_map()

    model = XGBClassifier(**config, random_state=42)
    model.fit(dm["x_train"].to_pandas().sort_index(), dm["y_train"].to_pandas().sort_index())
    # Score on held-out data: optimizing training F1 would reward the same overfitting we are trying to fix
    # (a separate validation split would keep the final test metrics fully unbiased)
    f1_metric = f1_score(
        dm["y_test"].to_pandas().sort_index(), model.predict(dm["x_test"].to_pandas().sort_index())
    )
    tuner_context.report(metrics={"f1_score": f1_metric}, model=model)

tuner = tune.Tuner(
    train_func=train_func,
    search_space={
        "max_depth": tune.randint(1, 20),
        "learning_rate": tune.uniform(0.01, 0.1),
        "n_estimators": tune.randint(50, 100),
    },
    tuner_config=tune.TunerConfig(
        metric="f1_score",
        mode="max",
        search_alg=search_algorithm.RandomSearch(random_state=101),
        num_trials=8, # run 8 trials
        max_concurrent_trials=4, # run up to 4 trials in parallel
    ),
)
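Conceptually, a random search with `num_trials=8` draws eight independent configurations from the search space, trains one model per configuration, and keeps the best one by the reported metric; `max_concurrent_trials` only controls how many of those trials run at once. A sequential plain-Python sketch of that loop (not Snowflake's implementation; the objective is a hypothetical stand-in for the F1 a trial would report, and bounds are treated as inclusive here, which may differ from `tune.randint`):

```python
import random


def random_search(objective, space, num_trials, seed=101):
    """Sample `num_trials` configs from `space` and keep the one with the highest objective value."""
    rng = random.Random(seed)
    best_config, best_score = None, float("-inf")
    history = []
    for _ in range(num_trials):
        config = {name: sample(rng) for name, sample in space.items()}
        score = objective(config)  # in the notebook, this is the f1_score a trial reports
        history.append((config, score))
        if score > best_score:
            best_config, best_score = config, score
    return best_config, best_score, history


# Same ranges as the search_space above
example_space = {
    "max_depth": lambda rng: rng.randint(1, 20),
    "learning_rate": lambda rng: rng.uniform(0.01, 0.1),
    "n_estimators": lambda rng: rng.randint(50, 100),
}


def toy_objective(config):
    """Hypothetical stand-in for validation F1: prefers moderate depth and more trees."""
    return -abs(config["max_depth"] - 6) + config["n_estimators"] / 100


best, score, trials = random_search(toy_objective, example_space, num_trials=8)
print(best, round(score, 3))
```

Because the trials are independent, they can be run concurrently without changing which configuration wins; only wall-clock time changes.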

In [None]:
tuner_results = tuner.run(dataset_map=dataset_map)

In [None]:
tuned_model = tuner_results.best_model
tuned_model

In [None]:
#Generate predictions
xgb_opt_preds = tuned_model.predict(train_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"],axis=1))

#Generate performance metrics
f1_opt_train = round(f1_score(train_pd.MORTGAGERESPONSE, xgb_opt_preds),4)
precision_opt_train = round(precision_score(train_pd.MORTGAGERESPONSE, xgb_opt_preds),4)
recall_opt_train = round(recall_score(train_pd.MORTGAGERESPONSE, xgb_opt_preds),4)

print(f'Train Results: \nF1: {f1_opt_train} \nPrecision {precision_opt_train} \nRecall: {recall_opt_train}')

In [None]:
#Generate test predictions
xgb_opt_preds_test = tuned_model.predict(test_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"],axis=1))

#Generate performance metrics on test data
f1_opt_test = round(f1_score(test_pd.MORTGAGERESPONSE, xgb_opt_preds_test),4)
precision_opt_test = round(precision_score(test_pd.MORTGAGERESPONSE, xgb_opt_preds_test),4)
recall_opt_test = round(recall_score(test_pd.MORTGAGERESPONSE, xgb_opt_preds_test),4)

print(f'Test Results: \nF1: {f1_opt_test} \nPrecision {precision_opt_test} \nRecall: {recall_opt_test}')

# Here we see the HPO model has a more modest training F1 than our base model, but its performance doesn't drop off on the test set

In [None]:
#Log the optimized model to the model registry
optimized_version_name = 'XGB_Optimized'

try:
    mv_opt = model_registry.get_model(model_name).version(optimized_version_name)
    print("Found existing model version!")
except Exception:
    print("Logging new model version...")
    mv_opt = model_registry.log_model(
        model_name=model_name,
        model=tuned_model, 
        version_name=optimized_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100),
        comment = "XGBoost model trained on feature store data, with hyperparameters tuned via Snowflake ML HPO",
    )
    mv_opt.set_metric(metric_name="Train_F1_Score", value=f1_opt_train)
    mv_opt.set_metric(metric_name="Train_Precision_Score", value=precision_opt_train)
    mv_opt.set_metric(metric_name="Train_Recall_score", value=recall_opt_train)

    mv_opt.set_metric(metric_name="Test_F1_Score", value=f1_opt_test)
    mv_opt.set_metric(metric_name="Test_Precision_Score", value=precision_opt_test)
    mv_opt.set_metric(metric_name="Test_Recall_score", value=recall_opt_test)

In [None]:
#Here we see the BASE version is our default version
model_registry.get_model(model_name).default

In [None]:
#Now we'll set the optimized model to be the default model version going forward
model_registry.get_model(model_name).default = optimized_version_name

In [None]:
#Now we see our optimized version we have now recently promoted to our DEFAULT model version
model_registry.get_model(model_name).default

In [None]:
#we'll now update the PROD tagged model to be the optimized model version rather than our overfit base version
m.unset_tag("PROD")
m.set_tag("PROD", optimized_version_name)
m.show_tags()

## Now that we've deployed some model versions and tested inference... 
# Let's explain our models!
- ### Snowflake offers built-in explainability capabilities for models logged to the model registry
- ### In the section below, we generate Shapley values with these built-in functions to understand how input features impact each model's behavior

In [None]:
#create a sample of 1000 records
test_pd_sample=test_pd.rename(columns=rename_dict).sample(n=1000, random_state = 100).reset_index(drop=True)

#Compute shapley values for each model
base_shap_pd = mv_base.run(test_pd_sample, function_name="explain")
opt_shap_pd = mv_opt.run(test_pd_sample, function_name="explain")
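Shapley values split a prediction's deviation from a baseline among the input features, and they are additive: the per-feature values sum to f(x) - f(baseline). A brute-force sketch on a hypothetical three-feature function, where features that have not yet been "added" keep their baseline values. This is one common formulation for illustration only; the registry's built-in `explain` function may compute its values differently, e.g. with a tree-specific SHAP algorithm:

```python
from itertools import permutations
from math import factorial


def exact_shapley(f, x, baseline):
    """Exact Shapley values of f at x by averaging marginal contributions over all feature orderings.

    Cost grows as n!, so this is only practical for a handful of features.
    """
    n = len(x)
    phi = [0.0] * n
    for order in permutations(range(n)):
        current = list(baseline)
        previous = f(current)
        for i in order:
            current[i] = x[i]           # add feature i
            value = f(current)
            phi[i] += value - previous  # marginal contribution of feature i in this ordering
            previous = value
    return [p / factorial(n) for p in phi]


def toy_model(z):
    """Hypothetical model: linear in feature 0, interaction between features 1 and 2."""
    return 2 * z[0] + z[1] * z[2]


x = [1.0, 2.0, 3.0]
baseline = [0.0, 0.0, 0.0]
phi = exact_shapley(toy_model, x, baseline)
print(phi)  # [2.0, 3.0, 3.0]
```

The linear feature gets exactly its own effect, while the interaction term (2 × 3 = 6) is split evenly between the two features that produce it.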

In [None]:
import shap 

shap.summary_plot(np.array(base_shap_pd.astype(float)), 
                  test_pd_sample.drop(["LOAN_ID","MORTGAGERESPONSE", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["LOAN_ID","MORTGAGERESPONSE", "TIMESTAMP"], axis=1).columns)

In [None]:
shap.summary_plot(np.array(opt_shap_pd.astype(float)), 
                  test_pd_sample.drop(["LOAN_ID","MORTGAGERESPONSE", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["LOAN_ID","MORTGAGERESPONSE", "TIMESTAMP"], axis=1).columns)

In [None]:
#Merge shap vals and actual vals together for easier plotting below
all_shap_base = test_pd_sample.merge(base_shap_pd, right_index=True, left_index=True, how='outer')
all_shap_opt = test_pd_sample.merge(opt_shap_pd, right_index=True, left_index=True, how='outer')

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

#filter data down to strip outliers
asb_filtered = all_shap_base[(all_shap_base.INCOME>0) & (all_shap_base.INCOME<250000)]
aso_filtered = all_shap_opt[(all_shap_opt.INCOME>0) & (all_shap_opt.INCOME<250000)]

# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("INCOME EXPLANATION")
# Plot side-by-side scatterplots with fitted trend lines
sns.scatterplot(data = asb_filtered, x ='INCOME', y = 'INCOME_explanation', ax=axes[0])
sns.regplot(data = asb_filtered, x ="INCOME", y = 'INCOME_explanation', scatter=False, color='red', line_kws={"lw":2},ci =100, lowess=False, ax =axes[0])

axes[0].set_title('Base Model')
sns.scatterplot(data = aso_filtered, x ='INCOME', y = 'INCOME_explanation',color = "orange", ax = axes[1])
sns.regplot(data = aso_filtered, x ="INCOME", y = 'INCOME_explanation', scatter=False, color='blue', line_kws={"lw":2},ci =100, lowess=False, ax =axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Income")
    ax.set_ylabel("Influence")
plt.tight_layout()
plt.show()


In [None]:
#filter data down to strip outliers
asb_filtered = all_shap_base[all_shap_base.LOAN_AMOUNT<2000000]
aso_filtered = all_shap_opt[all_shap_opt.LOAN_AMOUNT<2000000]


# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("LOAN_AMOUNT EXPLANATION")
# Plot side-by-side scatterplots with fitted trend lines
sns.scatterplot(data = asb_filtered, x ='LOAN_AMOUNT', y = 'LOAN_AMOUNT_explanation', ax=axes[0])
sns.regplot(data = asb_filtered, x ="LOAN_AMOUNT", y = 'LOAN_AMOUNT_explanation', scatter=False, color='red', line_kws={"lw":2},ci =100, lowess=True, ax =axes[0])
axes[0].set_title('Base Model')

sns.scatterplot(data = aso_filtered, x ='LOAN_AMOUNT', y = 'LOAN_AMOUNT_explanation',color = "orange", ax = axes[1])
sns.regplot(data = aso_filtered, x ="LOAN_AMOUNT", y = 'LOAN_AMOUNT_explanation', scatter=False, color='blue', line_kws={"lw":2},ci =100, lowess=True, ax =axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("LOAN_AMOUNT")
    ax.set_ylabel("Influence")
    # ax.set_xlim((0,10000))
plt.tight_layout()
plt.show()


In [None]:
# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("HOME PURCHASE LOAN EXPLANATION")
# Plot side-by-side boxplots
sns.boxplot(data = all_shap_base, x ='LOAN_PURPOSE_NAME_HOME_PURCHASE', y = 'LOAN_PURPOSE_NAME_HOME_PURCHASE_explanation',
            hue='LOAN_PURPOSE_NAME_HOME_PURCHASE', width=0.8, ax=axes[0])
axes[0].set_title('Base Model')
sns.boxplot(data = all_shap_opt, x ='LOAN_PURPOSE_NAME_HOME_PURCHASE', y = 'LOAN_PURPOSE_NAME_HOME_PURCHASE_explanation',
            hue='LOAN_PURPOSE_NAME_HOME_PURCHASE', width=0.4, ax = axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Home Purchase Loan (1 = True)")
    ax.set_ylabel("Influence")
    ax.legend(loc='upper right')

plt.show()


In [None]:
# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("HOME IMPROVEMENT LOAN EXPLANATION")
# Plot side-by-side boxplots
sns.boxplot(data = all_shap_base, x ='LOAN_PURPOSE_NAME_HOME_IMPROVEMENT', y = 'LOAN_PURPOSE_NAME_HOME_IMPROVEMENT_explanation',
            hue='LOAN_PURPOSE_NAME_HOME_IMPROVEMENT', width=0.8, ax=axes[0])
axes[0].set_title('Base Model')
sns.boxplot(data = all_shap_opt, x ='LOAN_PURPOSE_NAME_HOME_IMPROVEMENT', y = 'LOAN_PURPOSE_NAME_HOME_IMPROVEMENT_explanation',
            hue='LOAN_PURPOSE_NAME_HOME_IMPROVEMENT', width=0.4, ax = axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Home Improvement Loan (1 = True)")
    ax.set_ylabel("Influence")
    ax.legend(loc='upper right')

plt.show()
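Each boxplot compares the distribution of a one-hot loan-purpose flag's SHAP values when the flag is 0 versus 1. A compact numeric summary of the same comparison is the median SHAP value per flag value. A minimal stdlib sketch, assuming rows shaped like the `all_shap_base` / `all_shap_opt` columns (`<feature>` and `<feature>_explanation`). The rows are hypothetical.

```python
from collections import defaultdict
from statistics import median

FEATURE = "LOAN_PURPOSE_NAME_HOME_IMPROVEMENT"

def median_influence_by_value(rows, feature):
    """Median SHAP value of `feature` for each distinct value of `feature` (e.g. 0 and 1)."""
    groups = defaultdict(list)
    for r in rows:
        groups[r[feature]].append(r[f"{feature}_explanation"])
    return {value: median(vals) for value, vals in sorted(groups.items())}

# Hypothetical rows shaped like the all_shap_* DataFrames
rows = [
    {FEATURE: 0, f"{FEATURE}_explanation": 0.05},
    {FEATURE: 0, f"{FEATURE}_explanation": 0.02},
    {FEATURE: 0, f"{FEATURE}_explanation": 0.04},
    {FEATURE: 1, f"{FEATURE}_explanation": -0.30},
    {FEATURE: 1, f"{FEATURE}_explanation": -0.20},
]
print(median_influence_by_value(rows, FEATURE))
```

Assuming `all_shap_base` and `all_shap_opt` are pandas DataFrames, the equivalent on the real data would be `all_shap_base.groupby(feature)[f"{feature}_explanation"].median()`.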


# Model Monitoring setup

In [None]:
train.write.save_as_table(f"DEMO_MORTGAGE_LENDING_TRAIN_{VERSION_NUM}", mode="overwrite")
test.write.save_as_table(f"DEMO_MORTGAGE_LENDING_TEST_{VERSION_NUM}", mode="overwrite")

In [None]:
session.sql("CREATE stage IF NOT EXISTS ML_STAGE").collect()

In [None]:
from snowflake import snowpark
from snowflake.ml.registry import Registry
import joblib
import os
import logging
from snowflake.ml.modeling.pipeline import Pipeline
import snowflake.ml.modeling.preprocessing as pp
from snowflake.snowpark.types import StringType, IntegerType
import snowflake.snowpark.functions as F


def demo_inference_sproc(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:
    
    database=session.get_current_database()
    schema=session.get_current_schema()
    reg = Registry(session=session)
    m = reg.get_model(model_name)  # Fetch the model using the registry
    mv = m.version(modelversion)
    
    input_table_name=table_name
    pred_col = f'{modelversion}_PREDICTION'

    # Read the input table to a dataframe
    df = session.table(input_table_name)
    results = mv.run(df, function_name="predict").select("LOAN_ID",'"output_feature_0"').withColumnRenamed('"output_feature_0"', pred_col)
    # 'results' is the output DataFrame with predictions

    final = df.join(results, on="LOAN_ID", how="full")
    # Write results back to Snowflake table
    final.write.save_as_table(table_name, mode='overwrite',enable_schema_evolution=True)

    return "Success"

# Register the stored procedure
session.sproc.register(
    func=demo_inference_sproc,
    name="model_inference_sproc",
    replace=True,
    is_permanent=True,
    stage_location="@ML_STAGE",
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    return_type=StringType()
)
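The stored procedure scores a table with one model version and rewrites that table with an extra `<modelversion>_PREDICTION` column. Running it once per version therefore leaves both prediction columns side by side, which the monitors below rely on: their `PREDICTION_CLASS_COLUMNS` are `XGB_BASE_PREDICTION` and `XGB_OPTIMIZED_PREDICTION`, implying version names `XGB_BASE` and `XGB_OPTIMIZED`. The in-memory sketch below mimics that effect with plain dictionaries. The helper name and data are hypothetical. It uses a left join on `LOAN_ID`. The procedure uses a full join, which yields the same rows here, assuming `LOAN_ID` is unique, because every prediction comes from a row of the same table.

```python
def add_prediction_column(rows, predictions, model_version):
    """Hypothetical in-memory analogue of the sproc: return copies of `rows` with a
    new f"{model_version}_PREDICTION" column looked up by LOAN_ID (a left join)."""
    pred_col = f"{model_version}_PREDICTION"
    return [{**row, pred_col: predictions.get(row["LOAN_ID"])} for row in rows]

# Hypothetical test table and per-version predictions keyed by LOAN_ID
table = [{"LOAN_ID": 1, "MORTGAGERESPONSE": 1}, {"LOAN_ID": 2, "MORTGAGERESPONSE": 0}]
base_preds = {1: 1, 2: 1}
opt_preds = {1: 1, 2: 0}

# One call per model version, mirroring the CALL cells below
table = add_prediction_column(table, base_preds, "XGB_BASE")
table = add_prediction_column(table, opt_preds, "XGB_OPTIMIZED")
print(sorted(table[0]))
# ['LOAN_ID', 'MORTGAGERESPONSE', 'XGB_BASE_PREDICTION', 'XGB_OPTIMIZED_PREDICTION']
```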


In [None]:
CALL model_inference_sproc('DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}','{{model_name}}', '{{base_version_name}}');

In [None]:
CALL model_inference_sproc('DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}','{{model_name}}', '{{base_version_name}}');

In [None]:
CALL model_inference_sproc('DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}','{{model_name}}', '{{optimized_version_name}}');

In [None]:
CALL model_inference_sproc('DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}','{{model_name}}', '{{optimized_version_name}}');

In [None]:
select * FROM DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}} limit 10

In [None]:
# from snowflake.ml.monitoring.entities.model_monitor_config import ModelMonitorConfig, ModelMonitorSourceConfig
# # snowflake/ml/monitoring/entities/model_monitor_config.py

# # Set up source/baseline table config for base model
# base_source_config = ModelMonitorSourceConfig(
#     baseline=f"DEMO_MORTGAGE_LENDING_TRAIN_{VERSION_NUM}",
#     source=f"DEMO_MORTGAGE_LENDING_TEST_{VERSION_NUM}",
#     timestamp_column="TIMESTAMP",
#     prediction_class_columns=["XGB_BASE_PREDICTION"],
#     actual_class_columns=["MORTGAGERESPONSE"],
#     id_columns=["LOAN_ID"]
# )

# # Set up model config for base model
# base_monitor_config = ModelMonitorConfig(
#     model_version=mv_base,
#     model_function_name="predict",
#     background_compute_warehouse_name="ML_WH"
# )

# # Set up source/baseline table config for optimized model
# opt_source_config = ModelMonitorSourceConfig(
#     baseline=f"DEMO_MORTGAGE_LENDING_TRAIN_{VERSION_NUM}",
#     source=f"DEMO_MORTGAGE_LENDING_TEST_{VERSION_NUM}",
#     timestamp_column="TIMESTAMP",
#     prediction_class_columns=["XGB_OPTIMIZED_PREDICTION"],
#     actual_class_columns=["MORTGAGERESPONSE"],
#     id_columns=["LOAN_ID"]
# )

# # Set up model config for optimized model
# opt_monitor_config = ModelMonitorConfig(
#     model_version=mv_opt,
#     model_function_name="predict",
#     background_compute_warehouse_name="ML_WH"
# )

In [None]:
# # Add one ModelMonitor per model version (names match the SQL monitors below)
# base_model_monitor = model_registry.add_monitor(
#     name="MORTGAGE_LENDING_BASE_MODEL_MONITOR",
#     source_config=base_source_config,
#     model_monitor_config=base_monitor_config,
# )


# opt_model_monitor = model_registry.add_monitor(
#     name="MORTGAGE_LENDING_OPTIMIZED_MODEL_MONITOR",
#     source_config=opt_source_config,
#     model_monitor_config=opt_monitor_config,
# )

In [None]:
CREATE OR REPLACE MODEL MONITOR MORTGAGE_LENDING_BASE_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{base_version_name}}
    FUNCTION=predict
    SOURCE=DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}
    BASELINE=DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_BASE_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(MORTGAGERESPONSE)
    ID_COLUMNS=(LOAN_ID)
    WAREHOUSE=SMALL
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
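Because the monitor is configured with `PREDICTION_CLASS_COLUMNS`, `ACTUAL_CLASS_COLUMNS` and `AGGREGATION_WINDOW='1 day'`, classification metrics such as precision, recall and F1 can be reported per daily window. The sketch below spells out the standard definitions of those metrics for one prediction column, bucketed by the calendar day of `TIMESTAMP`. It assumes class `1` is the positive label and is not Snowflake's implementation. The rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def classification_metrics(predicted, actual, positive=1):
    """Precision, recall and F1 for one window, treating `positive` as the positive class."""
    pairs = list(zip(predicted, actual))
    tp = sum(1 for p, a in pairs if p == positive and a == positive)
    fp = sum(1 for p, a in pairs if p == positive and a != positive)
    fn = sum(1 for p, a in pairs if p != positive and a == positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"precision": precision, "recall": recall, "f1": f1}

def daily_metrics(rows, pred_col, actual_col="MORTGAGERESPONSE", ts_col="TIMESTAMP"):
    """Group rows into 1-day windows by timestamp and compute metrics per window."""
    windows = defaultdict(lambda: ([], []))
    for r in rows:
        preds, actuals = windows[r[ts_col].date()]
        preds.append(r[pred_col])
        actuals.append(r[actual_col])
    return {day: classification_metrics(p, a) for day, (p, a) in sorted(windows.items())}

# Hypothetical scored rows spanning two days
rows = [
    {"TIMESTAMP": datetime(2024, 6, 1, 9), "XGB_BASE_PREDICTION": 1, "MORTGAGERESPONSE": 1},
    {"TIMESTAMP": datetime(2024, 6, 1, 10), "XGB_BASE_PREDICTION": 1, "MORTGAGERESPONSE": 0},
    {"TIMESTAMP": datetime(2024, 6, 1, 11), "XGB_BASE_PREDICTION": 0, "MORTGAGERESPONSE": 1},
    {"TIMESTAMP": datetime(2024, 6, 1, 12), "XGB_BASE_PREDICTION": 0, "MORTGAGERESPONSE": 0},
    {"TIMESTAMP": datetime(2024, 6, 2, 9), "XGB_BASE_PREDICTION": 1, "MORTGAGERESPONSE": 1},
    {"TIMESTAMP": datetime(2024, 6, 2, 10), "XGB_BASE_PREDICTION": 1, "MORTGAGERESPONSE": 1},
]
for day, metrics in daily_metrics(rows, "XGB_BASE_PREDICTION").items():
    print(day, metrics)
```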

In [None]:
CREATE OR REPLACE MODEL MONITOR MORTGAGE_LENDING_OPTIMIZED_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{optimized_version_name}}
    FUNCTION=predict
    SOURCE=DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}
    BASELINE=DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_OPTIMIZED_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(MORTGAGERESPONSE)
    ID_COLUMNS=(LOAN_ID)
    WAREHOUSE=SMALL
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
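SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
    'MORTGAGE_LENDING_OPTIMIZED_MODEL_MONITOR',  -- monitor name
    'DIFFERENCE_OF_MEANS',                       -- drift metric
    'XGB_OPTIMIZED_PREDICTION',                  -- prediction column tracked by this monitor
    '1 DAY',                                     -- granularity
    TO_TIMESTAMP_TZ('2024-06-01'),               -- start time
    TO_TIMESTAMP_TZ('2024-09-01')                -- end time
))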
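`DIFFERENCE_OF_MEANS` measures how far the average of a monitored column in each window has moved from its average in the baseline table. Here it tracks how the average predicted repayment rate changes day to day. The sketch below assumes the plain signed difference `mean(window) - mean(baseline)` over 1-day windows. Check the Snowflake documentation for the exact definition (sign convention, any normalization) before comparing these numbers with the query output. The data is hypothetical.

```python
from collections import defaultdict
from datetime import datetime
from statistics import fmean

def daily_difference_of_means(baseline_values, source_rows, col, ts_col="TIMESTAMP"):
    """Assumed definition: mean of `col` in each 1-day window minus the baseline mean."""
    baseline_mean = fmean(baseline_values)
    windows = defaultdict(list)
    for r in source_rows:
        windows[r[ts_col].date()].append(r[col])
    return {day: fmean(vals) - baseline_mean for day, vals in sorted(windows.items())}

# Hypothetical baseline (training) predictions and two days of source predictions
baseline = [1, 0, 1, 0]
source = [
    {"TIMESTAMP": datetime(2024, 6, 1, h), "XGB_OPTIMIZED_PREDICTION": p}
    for h, p in zip(range(9, 13), [1, 1, 1, 0])
] + [
    {"TIMESTAMP": datetime(2024, 6, 2, h), "XGB_OPTIMIZED_PREDICTION": p}
    for h, p in zip(range(9, 13), [0, 0, 1, 0])
]
print(daily_difference_of_means(baseline, source, "XGB_OPTIMIZED_PREDICTION"))
```

A positive value means the model predicted repayment more often that day than in the baseline, a negative value less often.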

# SPCS Deployment setup (OPTIONAL)
## This is disabled by default, but uncommenting the code cells below will allow a user to:

- ### Create a new compute pool with 3 `CPU_X64_L` nodes
- ### Create a new image repository to store the container image for container-based model scoring
- ### Deploy a service on top of our existing HPO model version (`mv_opt`)
- ### Test inference on the newly created container service


In [None]:
# image_repo_name = "MORTGAGE_LENDING_IMAGE_REPO_LLM"
# cp_name = "MORTGAGE_LENDING_INFERENCE_CP"
# num_spcs_nodes = '3'
# spcs_instance_family = 'CPU_X64_L'
# service_name = 'MORTGAGE_LENDING_PREDICTION_SERVICE'

# current_database = session.get_current_database().replace('"', '')
# current_schema = session.get_current_schema().replace('"', '')
# extended_image_repo_name = f"{current_database}.{current_schema}.{image_repo_name}"
# extended_service_name = f'{current_database}.{current_schema}.{service_name}'

In [None]:
# session.sql(f"show image repositories").collect()

In [None]:
# session.sql(f"alter compute pool if exists {cp_name} stop all").collect()
# session.sql(f"drop compute pool if exists {cp_name}").collect()
# session.sql(f"create compute pool {cp_name} min_nodes={num_spcs_nodes} max_nodes={num_spcs_nodes} instance_family={spcs_instance_family} auto_resume=True auto_suspend_secs=300").collect()
# session.sql(f"describe compute pool {cp_name}").show()

In [None]:
# session.sql(f"create image repository if not exists {extended_image_repo_name}").collect()

In [None]:
# mv_opt.create_service(
#     service_name=extended_service_name,
#     service_compute_pool=cp_name,
#     image_repo=extended_image_repo_name,
#     ingress_enabled=True,
#     max_instances=int(num_spcs_nodes),
#     build_external_access_integration="ALLOW_ALL_INTEGRATION"
# )

In [None]:
# model_registry.get_model(f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}").show_versions()

In [None]:
# mv_container = model_registry.get_model(f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}").default
# mv_container.list_services()

In [None]:
# mv_container.run(test, function_name = "predict", service_name = "MORTGAGE_LENDING_PREDICTION_SERVICE")

## Conclusion

#### 🛠️ Snowflake Feature Store tracks feature definitions and maintains lineage of sources and destinations 🛠️
#### 🚀 Snowflake Model Registry gives users a secure and flexible framework to deploy, track, and monitor models 🚀
#### 🔮 All model versions logged in the Model Registry can be accessed for inference, explainability, lineage tracking, visibility and more 🔮


# ARCHIVE BELOW

# Distributed model training
## For demonstration's sake, below is an example of distributed model training
### Snowflake will set up a Ray cluster on all available nodes in your compute pool (CPU or GPU) and execute the distributed training job
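One reason gradient boosting distributes well across nodes: with the histogram method, each worker bins its own rows and sums gradients (and, in XGBoost, hessians) per bin, and the per-worker histograms are then summed across workers (an allreduce). The result is identical to a single-node histogram over all rows, so split finding sees the same statistics. The sketch below illustrates only that aggregation step, for one feature and gradients only, assuming bin edges shared by all workers. It is not how Snowflake or XGBoost implement it internally, and all values are hypothetical.

```python
from bisect import bisect_right

def grad_histogram(features, grads, bin_edges):
    """Sum gradients into len(bin_edges) + 1 bins defined by shared bin_edges."""
    hist = [0.0] * (len(bin_edges) + 1)
    for x, g in zip(features, grads):
        hist[bisect_right(bin_edges, x)] += g
    return hist

def allreduce_sum(histograms):
    """Element-wise sum of per-worker histograms (what an allreduce computes)."""
    return [sum(bins) for bins in zip(*histograms)]

bin_edges = [100.0, 250.0, 400.0]
features = [50.0, 120.0, 300.0, 450.0, 90.0, 260.0]
grads = [0.5, -0.25, 0.75, -1.0, 0.25, 0.5]

# Split the rows across two hypothetical workers
shards = [(features[:3], grads[:3]), (features[3:], grads[3:])]
local = [grad_histogram(f, g, bin_edges) for f, g in shards]

combined = allreduce_sum(local)
single_node = grad_histogram(features, grads, bin_edges)
print(combined == single_node)  # True
```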

In [None]:
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector

# Wrap the Snowpark training DataFrame in a DataConnector for the distributed estimator
dc = DataConnector.from_dataframe(train)

# Specify scaling config (set use_gpu=False on a CPU-only compute pool)
scaling_config = XGBScalingConfig(use_gpu=True)

# Define the distributed XGBoost estimator; MORTGAGERESPONSE is a binary label
dist_gpu_xgb = XGBEstimator(
    n_estimators=10,
    objective="binary:logistic",
    params={"booster": "gbtree"},
    scaling_config=scaling_config,
)

# Train on every column except the timestamp, the ID, and the label
dist_gpu_xgb.fit(
    dc,
    input_cols=train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).columns,
    label_col="MORTGAGERESPONSE",
)