# ❄️ Building an End to End Machine Learning Workflow in Snowflake ❄️

In this Notebook ([on Container Runtime](https://docs.snowflake.com/developer-guide/snowflake-ml/notebooks-on-spcs)), we will develop a machine learning model that accurately predicts the "mortgage response" (e.g., loan approval, offer acceptance) based on borrower characteristics and loan details.

**Why is this important?**

- `Risk Management:` Lenders can better assess the risk of loan default.
- `Operational Efficiency:` Automating parts of the approval process.
- `Targeted Marketing:` Identifying potential borrowers more effectively.
- `Improved Customer Experience:` Streamlining the loan process.

We will showcase all the typical steps in a machine learning pipeline using native capabilities in Snowflake through this use case:

### 1. `FEATURE ENGINEERING:` Use [Snowflake Feature Store](https://docs.snowflake.com/en/developer-guide/snowflake-ml/feature-store/overview) to track engineered features
- Store feature definitions in a feature store for reproducible computation of ML features
      
### 2. `MODEL TRAINING:` Train models using OSS XGBoost and Snowflake ML APIs
- Baseline OSS XGBoost
- XGBoost with optimal hyperparameters identified via [Snowflake ML Parallel Hyperparameter Optimization](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-hpo)

### 3. `MODEL LOGGING, INFERENCE, & EXPLAINABILITY:` Register models in [Snowflake Model Registry](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/overview)
- Explore model registry capabilities such as **metadata tracking, inference, and explainability**
- Compare model metrics on train/test set to identify any issues of model performance or overfitting
- Tag the best performing model version as 'default' version

### 4. `ML OBSERVABILITY:` Set up [Model Monitors](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/model-observability) to track 1 year of predicted and actual loan repayments
- **Compute performance metrics** such as F1, Precision, and Recall
- **Inspect model drift** (i.e. how much has the average predicted repayment rate changed day-to-day)
- **Compare models** side-by-side to understand which model should be used in production
- Identify and understand **data issues**

### 5. `ML LINEAGE:` Track [data and model lineage](https://docs.snowflake.com/en/user-guide/ui-snowsight-lineage#ml-lineage) throughout
- View and understand
  - The **origin of the data** used for computed features
  - The **data used** for model training
  - The **available model versions** being monitored

### 6. `[OPTIONAL] DISTRIBUTED MODEL TRAINING & DEPLOYMENT`
- Distributed XGBoost via [Snowflake's Distributed Modeling Classes](https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors) - single node, multi-GPU and multi-node, multi-GPU
- [Deploy model to Snowpark Container Services](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/container) 

First, the only library we need to install is `shap`, which will be used to understand model explainability later on.

In [None]:
!pip install shap

Let's set a version number (it must be a string) that will be used to name all the artifacts we create in this project.

In [None]:
VERSION_NUM = 'V0'

Now, let's import all the libraries we need.

In [1]:
# Built-in
import math
import pickle
from datetime import datetime

# Third-party
import pandas as pd
import numpy as np
import shap
import sklearn
import streamlit as st
from xgboost import XGBClassifier

# Snowflake ML
from snowflake.ml.registry import Registry
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode
from snowflake.ml.runtime_cluster import scale_cluster, get_nodes

# Snowpark Core
from snowflake.snowpark import DataFrame, Window
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.functions import (
    col,
    to_timestamp,
    min,
    max,
    month,
    dayofmonth,
    dayofweek,
    dayofyear,
    avg,
    date_add,
    sql_expr,
)
from snowflake.snowpark.types import IntegerType

# Initialize session
session = get_active_session()
session

In [None]:
# Use schema DEFAULT_SCHEMA to create new Snowflake objects in this project
session.use_schema('DEFAULT_SCHEMA')

We can read the data that's already been pre-loaded into your account as a Snowflake [Snowpark dataframe](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes).

In [None]:
df = session.table("MORTGAGE_LENDING_DEMO_DATA")
df.show()

# Feature Engineering with Snowpark APIs

We will create a number of features within `create_mortgage_features()`:
- Timestamp features (i.e. month, day of year, day of week)
- Income and Loan features
- County-level income stats
- High income flag
- Time-based rolling average.

All the features are created with Snowpark DataFrame operations and functions, which are documented in the [Snowpark Python API reference](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/1.2.0/index).
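
To make the per-row and per-group steps concrete, here is a plain-Python sketch of the same arithmetic on a few hypothetical rows (the county names and amounts are made up). It mirrors Steps 2 to 4 of `create_mortgage_features()`: scaling the `_000s` columns to dollars, `INCOME_LOAN_RATIO = INCOME / LOAN_AMOUNT`, a per-county average income joined back onto every row, and `HIGH_INCOME_FLAG` set when income exceeds that county average. It only illustrates the logic; Snowpark pushes the real computation down to Snowflake as SQL.

```python
from collections import defaultdict

# Hypothetical raw rows; amounts are in thousands, like the *_000s source columns
rows = [
    {"LOAN_ID": 1, "COUNTY_NAME": "A", "LOAN_AMOUNT_000s": 200, "APPLICANT_INCOME_000s": 80},
    {"LOAN_ID": 2, "COUNTY_NAME": "A", "LOAN_AMOUNT_000s": 300, "APPLICANT_INCOME_000s": 40},
    {"LOAN_ID": 3, "COUNTY_NAME": "B", "LOAN_AMOUNT_000s": 100, "APPLICANT_INCOME_000s": 50},
]

def add_income_features(rows):
    """Per-row features (Step 2), then county averages joined back (Steps 3 and 4)."""
    out = []
    for r in rows:
        loan = r["LOAN_AMOUNT_000s"] * 1000
        income = r["APPLICANT_INCOME_000s"] * 1000
        out.append({**r, "LOAN_AMOUNT": loan, "INCOME": income,
                    "INCOME_LOAN_RATIO": income / loan})
    # Per-group feature: average income per county (group_by + avg)
    totals = defaultdict(lambda: [0.0, 0])
    for r in out:
        totals[r["COUNTY_NAME"]][0] += r["INCOME"]
        totals[r["COUNTY_NAME"]][1] += 1
    # "Join" the county average back onto each row and derive the flag
    for r in out:
        total, count = totals[r["COUNTY_NAME"]]
        r["AVG_COUNTY_INCOME"] = total / count
        # 1 if the household earns more than its county average, else 0
        r["HIGH_INCOME_FLAG"] = int(r["INCOME"] > r["AVG_COUNTY_INCOME"])
    return out

features = add_income_features(rows)
for f in features:
    print(f["LOAN_ID"], f["INCOME_LOAN_RATIO"], f["AVG_COUNTY_INCOME"], f["HIGH_INCOME_FLAG"])
```

Note that a household earning exactly its county average (loan 3, the only loan in county B) gets a flag of 0, because the comparison is strictly greater-than.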

In [None]:
def create_mortgage_features(df):
    # Step 1: Timestamp features (Per-row features)
    # Get current date and time
    current_time = datetime.now()
    df_max_time = datetime.strptime(str(df.select(max("TS")).collect()[0][0]), "%Y-%m-%d %H:%M:%S.%f")
    
    # Find delta between latest existing timestamp and today's date
    timedelta = current_time- df_max_time

    df = df.with_columns(
        ["TIMESTAMP", "MONTH", "DAY_OF_YEAR", "DOTW"],
        [
            date_add(to_timestamp("TS"), timedelta.days-1),
            month("TIMESTAMP"),
            dayofyear("TIMESTAMP"),
            dayofweek("TIMESTAMP")
        ]
    )
    
    # Step 2: Income and loan features (Per-row features)
    df = df.with_columns(
        ["LOAN_AMOUNT", "INCOME", "INCOME_LOAN_RATIO"],
        [
            col("LOAN_AMOUNT_000s")*1000,
            col("APPLICANT_INCOME_000s")*1000,
            col("INCOME")/col("LOAN_AMOUNT")
        ]
    )
    
    # Step 3: County-level income stats (Per-group features)
    county_income_df = df.group_by(["COUNTY_NAME"]).agg(
        avg("INCOME").alias("AVG_COUNTY_INCOME")
    )
    # Join back to the original dataframe
    df = df.join(county_income_df, "COUNTY_NAME")
    
    # Step 4: Add high income flag
    df = df.with_column(
        "HIGH_INCOME_FLAG", 
        (col("INCOME") > col("AVG_COUNTY_INCOME")).astype(IntegerType())
    )
    
    # Step 5: Time-based rolling average
    df = df.with_column(
        "AVG_THIRTY_DAY_LOAN_AMOUNT",
        sql_expr("""
            AVG(LOAN_AMOUNT) OVER (
                PARTITION BY COUNTY_NAME 
                ORDER BY TIMESTAMP 
                RANGE BETWEEN INTERVAL '30 DAYS' PRECEDING AND CURRENT ROW
            )
        """)
    )

    return df
    
df = create_mortgage_features(df)

feature_df = df.select(
        ["LOAN_ID", "TIMESTAMP", "MONTH", "DAY_OF_YEAR", "DOTW", 
         "LOAN_AMOUNT", "INCOME", "INCOME_LOAN_RATIO", 
         "AVG_COUNTY_INCOME", "HIGH_INCOME_FLAG", 
         "AVG_THIRTY_DAY_LOAN_AMOUNT"]
    )
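
The `RANGE BETWEEN INTERVAL '30 DAYS' PRECEDING AND CURRENT ROW` frame in Step 5 is defined by timestamp distance, not by a row count: each row averages every loan in the same county whose `TIMESTAMP` falls in the 30 days up to and including its own timestamp, and rows tied on the current timestamp count as peers. The sketch below reproduces that definition in plain Python on hypothetical rows, which can help when sanity-checking a few values of `AVG_THIRTY_DAY_LOAN_AMOUNT`. It is a deliberately simple quadratic reference, not an efficient implementation.

```python
from datetime import datetime, timedelta

def rolling_30_day_avg(rows, window=timedelta(days=30)):
    """Average LOAN_AMOUNT over rows in the same county whose TIMESTAMP lies in
    [current TIMESTAMP - window, current TIMESTAMP] (a RANGE frame, so ties count)."""
    result = {}
    for r in rows:
        start = r["TIMESTAMP"] - window
        in_frame = [
            o["LOAN_AMOUNT"] for o in rows
            if o["COUNTY_NAME"] == r["COUNTY_NAME"]
            and start <= o["TIMESTAMP"] <= r["TIMESTAMP"]
        ]
        result[r["LOAN_ID"]] = sum(in_frame) / len(in_frame)
    return result

# Hypothetical loans in a single county
t0 = datetime(2024, 1, 1)
rows = [
    {"LOAN_ID": 1, "COUNTY_NAME": "A", "TIMESTAMP": t0, "LOAN_AMOUNT": 100_000},
    {"LOAN_ID": 2, "COUNTY_NAME": "A", "TIMESTAMP": t0 + timedelta(days=10), "LOAN_AMOUNT": 200_000},
    {"LOAN_ID": 3, "COUNTY_NAME": "A", "TIMESTAMP": t0 + timedelta(days=45), "LOAN_AMOUNT": 300_000},
]
print(rolling_30_day_avg(rows))
```

Loan 2 averages itself with loan 1 (10 days earlier), while loan 3 sees only itself because loan 2 is 35 days older.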

Let's take a look at the features we just created:

In [None]:
feature_df.limit(5)

We can even inspect the SQL query plan that Snowpark generates under the hood from the feature logic defined above.

In [None]:
feature_df.explain()

## Now, we can create a [Snowflake Feature Store](https://docs.snowflake.com/en/developer-guide/snowflake-ml/feature-store/overview).

We'll go ahead and use our session's current Snowflake database, schema, and warehouse to create it for the purpose of this demo.

In [None]:
fs = FeatureStore(
    session=session, 
    database=session.get_current_database(), 
    name=session.get_current_schema(), 
    default_warehouse=session.get_current_warehouse(),
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

A feature store contains feature views. 

A **feature view** encapsulates a Python or SQL pipeline for transforming raw data into one or more related features. 

Feature views are organized in the feature store according to the **entities** to which they apply. An **entity** is a higher-level abstraction that represents the subject matter of a feature. 

So, let's proceed to create a new entity now. Since we are creating features at the loan level, we will create an entity for loans. Entities are used to look up and join features.

In [None]:
# First try to retrieve an existing entity definition
# If not, define a new one and register
try:
    # Retrieve existing entity
    loan_id_entity = fs.get_entity('LOAN_ENTITY') 
    print('Retrieved existing entity')
except Exception:
    # Define new entity
    loan_id_entity = Entity(
        name = "LOAN_ENTITY",
        join_keys = ["LOAN_ID"],
        desc = "Features defined on a per loan level")
    # Register
    fs.register_entity(loan_id_entity)
    print("Registered new entity")

In [None]:
fs.list_entities()

Now we can create a [Feature View](https://docs.snowflake.com/en/developer-guide/snowflake-ml/feature-store/feature-views) from `feature_df`, the Snowpark DataFrame that applies the feature engineering logic above to our source data.

In [None]:
# Define and register feature view
loan_fv = FeatureView(
    name="Mortgage_Feature_View",
    entities=[loan_id_entity],
    feature_df=feature_df,
    timestamp_col="TIMESTAMP",
    refresh_freq="1 day",
    refresh_mode="INCREMENTAL")

# Add feature level descriptions
loan_fv = loan_fv.attach_feature_desc(
    {
        "MONTH": "Month of loan",
        "DAY_OF_YEAR": "Day of calendar year of loan",
        "DOTW": "Day of the week of loan",
        "LOAN_AMOUNT": "Loan amount in $USD",
        "INCOME": "Household income in $USD",
        "INCOME_LOAN_RATIO": "Ratio of INCOME/LOAN_AMOUNT",
        "AVG_COUNTY_INCOME": "Average household income aggregated at county level",
        "HIGH_INCOME_FLAG": "Binary flag to indicate whether household income is higher than AVG_COUNTY_INCOME",
        "AVG_THIRTY_DAY_LOAN_AMOUNT": "Rolling 30 day average of LOAN_AMOUNT"
    }
)

loan_fv = fs.register_feature_view(loan_fv, version=VERSION_NUM, overwrite=True)

Let's take a look at the existing feature views we have in our Feature Store through the Python API.

In [None]:
fs.list_feature_views()

You can also inspect your feature view in the Feature Store UI using the link generated in the next cell, and open the `Lineage` tab of the Feature View to see the lineage of these features.

In [None]:
# Create link to feature store UI to inspect newly created feature view!
org_name = session.sql('SELECT CURRENT_ORGANIZATION_NAME()').collect()[0][0]
account_name = session.sql('SELECT CURRENT_ACCOUNT_NAME()').collect()[0][0]
db_name = session.sql('SELECT CURRENT_DATABASE()').collect()[0][0]
schema_name = session.sql('SELECT CURRENT_SCHEMA()').collect()[0][0]

st.write(f'https://app.snowflake.com/{org_name}/{account_name}/#/features/database/{db_name}/store/{schema_name}')

### Retrieve a [Dataset from the feature view for model training](https://docs.snowflake.com/en/developer-guide/snowflake-ml/feature-store/modeling#generating-snowflake-datasets-for-training)

We can now generate a [Snowflake Dataset](https://docs.snowflake.com/en/developer-guide/snowflake-ml/dataset), an immutable, file-based object that stores a snapshot of our training data.

It can be read back into a Snowpark DataFrame whenever it's needed.

First, we create a "spine dataframe" which will contain the rows and entity IDs that we want to include in our training data. In this example, we are selecting all the rows in our source data. 

In [None]:
# Create a spine dataframe to select the desired rows for training. 
# In this case we are using all the rows 
spine_df = df.select("LOAN_ID", "TIMESTAMP", "LOAN_PURPOSE_NAME","MORTGAGERESPONSE") # keep the join key, timestamp, label, and columns not provided by the feature view
spine_df.show()

The `generate_dataset()` API will fill this spine_df with the correct feature values from the Feature View.
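
Conceptually, filling the spine is a point-in-time (as-of) lookup: for each spine row, take the feature row with the same `LOAN_ID` whose feature `TIMESTAMP` is the latest one not after the spine row's `spine_timestamp_col` value, so training rows never see feature values from the future. The plain-Python sketch below illustrates that idea on hypothetical rows; the Feature Store performs the real join inside Snowflake, and its exact semantics are described in the Feature Store documentation.

```python
from datetime import datetime

def point_in_time_join(spine, feature_rows, feature_names, key="LOAN_ID", ts="TIMESTAMP"):
    """Attach to each spine row the latest feature values for the same key whose
    timestamp is not after the spine row's timestamp (None if no such row exists)."""
    joined = []
    for s in spine:
        candidates = [f for f in feature_rows if f[key] == s[key] and f[ts] <= s[ts]]
        latest = max(candidates, key=lambda f: f[ts], default=None)
        values = {n: (latest[n] if latest is not None else None) for n in feature_names}
        joined.append({**s, **values})
    return joined

# Hypothetical feature history for one loan: INCOME was updated on March 1
feature_rows = [
    {"LOAN_ID": 7, "TIMESTAMP": datetime(2024, 1, 1), "INCOME": 50_000},
    {"LOAN_ID": 7, "TIMESTAMP": datetime(2024, 3, 1), "INCOME": 65_000},
]
# Hypothetical spine rows: label events at two points in time
spine = [
    {"LOAN_ID": 7, "TIMESTAMP": datetime(2024, 2, 1), "MORTGAGERESPONSE": 1},
    {"LOAN_ID": 7, "TIMESTAMP": datetime(2023, 12, 1), "MORTGAGERESPONSE": 0},
]
print(point_in_time_join(spine, feature_rows, ["INCOME"]))
```

In this notebook the spine and the feature view are built from the same rows with the same `TIMESTAMP`, so each loan simply picks up its own feature row.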

In [None]:
ds = fs.generate_dataset(
    name=f"MORTGAGE_DATASET_EXTENDED_FEATURES_{VERSION_NUM}",
    # Reuse the spine defined above; generate_dataset fetches the remaining features from the feature view
    spine_df=spine_df, 
    features=[loan_fv],
    spine_timestamp_col="TIMESTAMP",
    spine_label_cols=["MORTGAGERESPONSE"]
)

In [None]:
ds_sp = ds.read.to_snowpark_dataframe()
ds_sp.show(5)

Next, we will use [Snowflake ML distributed preprocessors such as OneHotEncoder](https://docs.snowflake.com/en/developer-guide/snowflake-ml/modeling#preprocessing), which are implementations of scikit-learn preprocessors that run in parallel on the warehouse. Alternatively, you can use OSS scikit-learn preprocessors directly on the Container Runtime.

In [None]:
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.snowpark.types import StringType

# Collect the names of all string (categorical) columns
OHE_COLS = ds_sp.select([field.name for field in ds_sp.schema if field.datatype == StringType()]).columns

# Encode categoricals to numeric columns
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols = OHE_COLS, drop_input_cols=True)
ds_sp_ohe = snowml_ohe.fit(ds_sp).transform(ds_sp)

# Rename columns to avoid double nested quotes and white space chars
rename_dict = {}
for i in ds_sp_ohe.columns:
    if '"' in i:
        rename_dict[i] = i.replace('"','').replace(' ', '_')

ds_sp_ohe = ds_sp_ohe.rename(rename_dict)
ds_sp_ohe.columns
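
For intuition, one-hot encoding turns each distinct value of a string column into its own 0/1 column, and the rename loop above then strips embedded double quotes and replaces spaces with underscores so the names can be used unquoted. The sketch below mimics both steps in plain Python. The output-naming rule (`<COLUMN>_<VALUE>`, wrapped in double quotes when the value contains a space) and the category values are assumptions for illustration, not a statement of exactly how `snowflake.ml.modeling.preprocessing.OneHotEncoder` names its outputs.

```python
def one_hot(rows, column):
    """Replace `column` with one 0/1 column per distinct value (like drop_input_cols=True)."""
    categories = sorted({r[column] for r in rows})
    encoded = []
    for r in rows:
        new = {k: v for k, v in r.items() if k != column}
        for cat in categories:
            name = f"{column}_{cat}"
            # Assumed naming rule: names containing spaces come back as quoted identifiers
            if " " in name:
                name = f'"{name}"'
            new[name] = int(r[column] == cat)
        encoded.append(new)
    return encoded

def clean_column_names(columns):
    """Same logic as the rename loop: drop double quotes and replace spaces with underscores."""
    return {c: c.replace('"', "").replace(" ", "_") for c in columns if '"' in c}

# Hypothetical category values
rows = [{"LOAN_ID": 1, "LOAN_PURPOSE_NAME": "HOME PURCHASE"},
        {"LOAN_ID": 2, "LOAN_PURPOSE_NAME": "REFINANCING"}]
encoded = one_hot(rows, "LOAN_PURPOSE_NAME")
rename_dict = clean_column_names(encoded[0].keys())
cleaned = [{rename_dict.get(k, k): v for k, v in r.items()} for r in encoded]
print(cleaned)
```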

Now, we're ready to split the processed data into train/test sets, fill any null values, and convert the data to pandas to use OSS XGBoost.

In [None]:
train, test = ds_sp_ohe.random_split(weights=[0.70, 0.30], seed=0)

In [None]:
cols = list(ds_sp_ohe.columns)
cols.remove("TIMESTAMP")

train = train.fillna(0)
test = test.fillna(0)

In [None]:
train_pd = train.to_pandas()
test_pd = test.to_pandas()
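
`random_split(weights=[0.70, 0.30], seed=0)` assigns rows to the two outputs at random in proportion to the weights, so the resulting sizes are typically close to, but not guaranteed to be exactly, 70/30; fixing the seed makes the split repeatable. The stdlib sketch below shows the same idea (seeded per-row assignment followed by filling nulls with 0) on hypothetical rows. It does not reproduce Snowpark's sampling, so the same seed will not select the same rows, and it fills every column, whereas you should check the Snowpark `fillna` documentation for how columns whose type doesn't match the fill value are treated.

```python
import random

def seeded_split(rows, weights=(0.70, 0.30), seed=0):
    """Assign each row to a split with probability proportional to its weight."""
    rng = random.Random(seed)
    total = sum(weights)
    cutoffs, acc = [], 0.0
    for w in weights:
        acc += w / total
        cutoffs.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        u = rng.random()
        # First split whose cumulative cutoff exceeds u (last split as a float-safety fallback)
        idx = next((i for i, c in enumerate(cutoffs) if u < c), len(weights) - 1)
        splits[idx].append(row)
    return splits

def fillna_zero(rows):
    """Replace missing (None) values with 0 in every column."""
    return [{k: (0 if v is None else v) for k, v in r.items()} for r in rows]

# Hypothetical rows where every tenth INCOME is missing
rows = [{"LOAN_ID": i, "INCOME": None if i % 10 == 0 else i * 1000} for i in range(1000)]
train, test = seeded_split(rows)
train, test = fillna_zero(train), fillna_zero(test)
print(len(train), len(test))
```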

# Model Training, Model Logging, Inference, & Explainability

## Configure the base OSS XGBoost model & fit the model

Let's define and train a model using the OSS `xgboost` library. Note in our imports at the top: `from xgboost import XGBClassifier`

In [None]:
# Define model config
xgb_base = XGBClassifier(
    max_depth=50,
    n_estimators=3,
    learning_rate = 0.75,
    booster = 'gbtree')

In [None]:
# Split train data into X, y
X_train_pd = train_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"],axis=1) # drop timestamp, ID, and label columns to keep only features
y_train_pd = train_pd.MORTGAGERESPONSE

# Train model
xgb_base.fit(X_train_pd,y_train_pd)

Let's measure the baseline performance of our first version.

Note that here, we simply use OSS methods from `scikit-learn`.

In [None]:
from sklearn.metrics import f1_score, precision_score, recall_score
train_preds_base = xgb_base.predict(X_train_pd)

f1_base_train = round(f1_score(y_train_pd, train_preds_base),4)
precision_base_train = round(precision_score(y_train_pd, train_preds_base),4)
recall_base_train = round(recall_score(y_train_pd, train_preds_base),4)

print(f'F1: {f1_base_train} \nPrecision: {precision_base_train} \nRecall: {recall_base_train}')
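
As a reminder of what these three numbers measure, the sketch below computes them from confusion-matrix counts using the standard definitions: precision = TP / (TP + FP), recall = TP / (TP + FN), and F1 is their harmonic mean, 2PR / (P + R). The labels are hypothetical, and the function returns 0.0 whenever a ratio is undefined.

```python
def binary_metrics(y_true, y_pred, positive=1):
    """Precision, recall and F1 for the positive class, computed from raw counts."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p == positive)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t != positive and p == positive)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return round(precision, 4), round(recall, 4), round(f1, 4)

# Hypothetical labels: 4 true positives, 1 false positive, 1 false negative
y_true = [1, 1, 1, 0, 0, 1, 0, 1]
y_pred = [1, 0, 1, 1, 0, 1, 0, 1]
print(binary_metrics(y_true, y_pred))
```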

## Now, we can log this model to [Snowflake Model Registry](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/overview):

- Log models with important metadata
- Manage model lifecycles
- Serve models from Snowflake runtimes

First, we need to create a Model Registry.

In [None]:
#Create a snowflake model registry object 
from snowflake.ml.registry import Registry
from snowflake.ml._internal.utils import identifier
from snowflake.ml.model import model_signature

db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())

# Define model name
model_name = f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}"

# Create a registry to log the model to
model_registry = Registry(session=session, 
                          database_name=db, 
                          schema_name=schema,
                          options={"enable_monitoring": True})

Now, we can log our model along with all the calculated metrics and other metadata such as model comment (description).

In [None]:
# Log the base model to the model registry, reusing the version if it already exists
base_version_name = 'XGB_BASE'

try:
    mv_base = model_registry.get_model(model_name).version(base_version_name)
    print("Found existing model version!")
except Exception:
    print("Logging new model version...")
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=xgb_base, 
        version_name=base_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100), #using snowpark df to maintain lineage
        comment = """ML model for predicting loan approval likelihood.
                    This model was trained using the XGBoost classifier.
                    Hyperparameters used were:
                    max_depth=50, n_estimators=3, learning_rate=0.75, booster=gbtree.
                    """,
        options = {'relax_version': True},
        target_platforms= {"WAREHOUSE"}
    )
    mv_base.set_metric(metric_name="Train_F1_Score", value=f1_base_train)
    mv_base.set_metric(metric_name="Train_Precision_Score", value=precision_base_train)
    mv_base.set_metric(metric_name="Train_Recall_score", value=recall_base_train)

Let's set this base model as our `DEV` model by using model tagging. We'll also create a `PROD` tag to be used later.

In [None]:
# Create tag for DEV model
session.sql("CREATE OR REPLACE TAG DEV").collect()

In [None]:
# Create tag for PROD model
session.sql("CREATE OR REPLACE TAG PROD").collect()

In [None]:
# Apply tag
m = model_registry.get_model(model_name)
m.set_tag("DEV", base_version_name)
m.comment = "Loan approval prediction models" # Set model level comment

The next few cells demonstrate different functions available to us to see our existing models, their versions, metadata such as metrics, and available model functions.

In [None]:
model_registry.show_models()

In [None]:
model_registry.get_model(model_name).show_versions()

In [None]:
print(mv_base)
print(mv_base.show_metrics())

In [None]:
mv_base.show_functions()

Let's now test running inference on our test set using our logged model in the Registry.

In [None]:
reg_preds = mv_base.run(test, function_name = "predict")
reg_preds = reg_preds.rename(col('"output_feature_0"'), "MORTGAGE_PREDICTION")
reg_preds.show(10)

In [None]:
preds_pd = reg_preds.select(["MORTGAGERESPONSE", "MORTGAGE_PREDICTION"]).to_pandas()
f1_base_test = round(f1_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)
precision_base_test = round(precision_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)
recall_base_test = round(recall_score(preds_pd.MORTGAGERESPONSE, preds_pd.MORTGAGE_PREDICTION),4)

# Log metrics to model registry model
mv_base.set_metric(metric_name="Test_F1_Score", value=f1_base_test)
mv_base.set_metric(metric_name="Test_Precision_Score", value=precision_base_test)
mv_base.set_metric(metric_name="Test_Recall_score", value=recall_base_test)

print(f'F1: {f1_base_test} \nPrecision: {precision_base_test} \nRecall: {recall_base_test}')

### Oh no! Our model's performance seems to have dropped off significantly from training to our test set. 

### This is evidence that our model is overfit - can we fix this with Snowflake's [Parallel Hyperparameter Optimization](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-hpo)?

In [None]:
X_train = train.drop("MORTGAGERESPONSE", "TIMESTAMP", "LOAN_ID")
y_train = train.select("MORTGAGERESPONSE")
X_test = test.drop("MORTGAGERESPONSE","TIMESTAMP", "LOAN_ID")
y_test = test.select("MORTGAGERESPONSE")

We first ingest data from the Snowpark dataframes through the Container Runtime DataConnector API. Then, we define a training function that creates an XGBoost model. The Tuner interface provides the tuning functionality, based on the given training function and search space.

In [None]:
from snowflake.ml.data import DataConnector
from snowflake.ml.modeling.tune import get_tuner_context
from snowflake.ml.modeling import tune
from entities import search_algorithm

# Define dataset map
dataset_map = {
    "x_train": DataConnector.from_dataframe(X_train),
    "y_train": DataConnector.from_dataframe(y_train),
    "x_test": DataConnector.from_dataframe(X_test),
    "y_test": DataConnector.from_dataframe(y_test)
    }


# Define a training function, with any models you choose within it.
def train_func():
    # A context object provided by HPO API to expose data for the current HPO trial
    tuner_context = get_tuner_context()
    config = tuner_context.get_hyper_params()
    dm = tuner_context.get_dataset_map()

    model = XGBClassifier(**config, random_state=42)
    model.fit(dm["x_train"].to_pandas().sort_index(), dm["y_train"].to_pandas().sort_index())
    # Score each trial on the held-out split so the tuner favors models that generalize
    f1_metric = f1_score(
        dm["y_test"].to_pandas().sort_index(), model.predict(dm["x_test"].to_pandas().sort_index())
    )
    tuner_context.report(metrics={"f1_score": f1_metric}, model=model)

tuner = tune.Tuner(
    train_func=train_func,
    search_space={
        "max_depth": tune.randint(1, 20),
        "learning_rate": tune.uniform(0.01, 0.1),
        "n_estimators": tune.randint(50, 100),
    },
    tuner_config=tune.TunerConfig(
        metric="f1_score",
        mode="max",
        search_alg=search_algorithm.RandomSearch(random_state=101),
        num_trials=8, # run 8 trials in total
        max_concurrent_trials=4, # run up to 4 trials at the same time
    ),
)
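
Under the hood, random search samples each hyperparameter independently from its distribution, trains one model per sampled configuration, and keeps the configuration with the best reported metric (`mode="max"` for F1). The stdlib sketch below shows that loop with a hypothetical `toy_score` function standing in for `train_func`. It runs trials one after another, whereas the Snowflake Tuner can run up to `max_concurrent_trials` at once, and Python's `randint` includes its upper bound, which may differ from the convention used by `tune.randint`.

```python
import random

def random_search(search_space, score, num_trials=8, seed=101):
    """Sample configurations from `search_space`, score each one, and return the best (max)."""
    rng = random.Random(seed)
    best_config, best_score = None, float("-inf")
    for _ in range(num_trials):
        config = {name: sample(rng) for name, sample in search_space.items()}
        s = score(config)
        if s > best_score:
            best_config, best_score = config, s
    return best_config, best_score

# Hypothetical samplers mirroring the ranges in the notebook's search space
search_space = {
    "max_depth": lambda rng: rng.randint(1, 20),
    "learning_rate": lambda rng: rng.uniform(0.01, 0.1),
    "n_estimators": lambda rng: rng.randint(50, 100),
}

# Hypothetical objective standing in for a validation F1 score
def toy_score(cfg):
    return -abs(cfg["max_depth"] - 6) - abs(cfg["learning_rate"] - 0.05) * 10

best, best_f1 = random_search(search_space, toy_score)
print(best, best_f1)
```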

Now, we can run the Tuner and get the best model.

In [None]:
# Launch the tuning job; each trial calls train_func with a sampled hyperparameter configuration
tuner_results = tuner.run(dataset_map=dataset_map)

In [None]:
tuned_model = tuner_results.best_model
tuned_model

Let's compute the train and test metrics.

In [None]:
#Generate predictions
xgb_opt_preds = tuned_model.predict(train_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"],axis=1))

#Generate performance metrics
f1_opt_train = round(f1_score(train_pd.MORTGAGERESPONSE, xgb_opt_preds),4)
precision_opt_train = round(precision_score(train_pd.MORTGAGERESPONSE, xgb_opt_preds),4)
recall_opt_train = round(recall_score(train_pd.MORTGAGERESPONSE, xgb_opt_preds),4)

print(f'Train Results: \nF1: {f1_opt_train} \nPrecision: {precision_opt_train} \nRecall: {recall_opt_train}')

In [None]:
#Generate test predictions
xgb_opt_preds_test = tuned_model.predict(test_pd.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"],axis=1))

#Generate performance metrics on test data
f1_opt_test = round(f1_score(test_pd.MORTGAGERESPONSE, xgb_opt_preds_test),4)
precision_opt_test = round(precision_score(test_pd.MORTGAGERESPONSE, xgb_opt_preds_test),4)
recall_opt_test = round(recall_score(test_pd.MORTGAGERESPONSE, xgb_opt_preds_test),4)

print(f'Test Results: \nF1: {f1_opt_test} \nPrecision: {precision_opt_test} \nRecall: {recall_opt_test}')

We see that the HPO model has more modest training metrics than our base model, but its performance doesn't drop off on the test set.

In [None]:
#Log the optimized model to the model registry
optimized_version_name = 'XGB_Optimized'

try:
    mv_opt = model_registry.get_model(model_name).version(optimized_version_name)
    print("Found existing model version!")
except Exception:
    print("Logging new model version...")
    mv_opt = model_registry.log_model(
        model_name=model_name,
        model=tuned_model, 
        version_name=optimized_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100),
        comment = "XGBoost model trained on the Feature Store dataset, with hyperparameters selected via Snowflake ML HPO",
        options = {'relax_version': True},
        target_platforms={"WAREHOUSE"}
    )
    mv_opt.set_metric(metric_name="Train_F1_Score", value=f1_opt_train)
    mv_opt.set_metric(metric_name="Train_Precision_Score", value=precision_opt_train)
    mv_opt.set_metric(metric_name="Train_Recall_score", value=recall_opt_train)

    mv_opt.set_metric(metric_name="Test_F1_Score", value=f1_opt_test)
    mv_opt.set_metric(metric_name="Test_Precision_Score", value=precision_opt_test)
    mv_opt.set_metric(metric_name="Test_Recall_score", value=recall_opt_test)

Let's check what our default model version is in our Model Registry.

In [None]:
# Here we see the BASE version is our default version
model_registry.get_model(model_name).default

Let's make this optimized model the default version instead and also set it as our `PROD` version using tags.

In [None]:
# Now we'll set the optimized model to be the default model version going forward
model_registry.get_model(model_name).default = optimized_version_name

In [None]:
# The optimized version we just promoted is now the DEFAULT model version
model_registry.get_model(model_name).default

In [None]:
# We'll now update the PROD tagged model to be the optimized model version 
# rather than our overfit base version
m.unset_tag("DEV")
m.set_tag("PROD", optimized_version_name)
m.show_tags()

### Now that we've logged some model versions and tested inference...

### Let's explain our models!
- Snowflake offers [built-in explainability capabilities](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/model-explainability) for models logged to the model registry.
- In the section below, we'll generate Shapley values with these built-in functions to understand how input features affect our model's behavior.
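
A Shapley value splits a single prediction among the input features: feature i receives its marginal contribution f(S ∪ {i}) - f(S), averaged over all subsets S of the other features with weight |S|!(n - |S| - 1)!/n!. The brute-force sketch below computes exact values for a tiny hypothetical model, replacing "absent" features with a baseline value (one common convention), and checks the efficiency property: the contributions sum to f(x) - f(baseline). The registry's `explain` function uses its own method, so treat this purely as an illustration of what the numbers mean.

```python
from itertools import combinations
from math import factorial

def shapley_values(f, x, baseline):
    """Exact Shapley values of f at x; features outside a coalition take baseline values."""
    n = len(x)

    def value(coalition):
        z = [x[i] if i in coalition else baseline[i] for i in range(n)]
        return f(z)

    phi = []
    for i in range(n):
        others = [j for j in range(n) if j != i]
        total = 0.0
        for size in range(n):
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                s = set(subset)
                total += weight * (value(s | {i}) - value(s))
        phi.append(total)
    return phi

# Hypothetical model: linear terms plus an interaction between features 0 and 1
def model(z):
    return 2.0 * z[0] + 1.0 * z[1] + 0.5 * z[0] * z[1] - 1.0 * z[2]

x = [3.0, 2.0, 1.0]
baseline = [0.0, 0.0, 0.0]
phi = shapley_values(model, x, baseline)
print(phi, sum(phi), model(x) - model(baseline))
```

The interaction term 0.5 * z[0] * z[1] contributes 3.0 at x and is split evenly between features 0 and 1, which is why their values are 6 + 1.5 and 2 + 1.5.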

In [None]:
# Create a sample of 1000 records
test_pd_sample = test_pd.rename(columns=rename_dict).sample(n=1000, random_state = 100).reset_index(drop=True)

# Compute shapley values for each model
base_shap_pd = mv_base.run(test_pd_sample, function_name="explain")
opt_shap_pd = mv_opt.run(test_pd_sample, function_name="explain")

We can take a look at the summary plot to understand how important each feature is to the prediction.

In [None]:
import shap 

shap.summary_plot(np.array(base_shap_pd.astype(float)), 
                  test_pd_sample.drop(["LOAN_ID","MORTGAGERESPONSE", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["LOAN_ID","MORTGAGERESPONSE", "TIMESTAMP"], axis=1).columns)
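
The summary plot orders features by the magnitude of their Shapley values summed across rows, so ranking features by mean absolute Shapley value gives the same order as a table of numbers. With the `explain` output above you could compute this, for example, as `base_shap_pd.astype(float).abs().mean()` in pandas. The stdlib sketch below does the same on a hypothetical table of explanation values.

```python
def mean_abs_shap(shap_rows):
    """Mean |SHAP| per feature, sorted from most to least influential."""
    features = shap_rows[0].keys()
    n = len(shap_rows)
    importance = {f: sum(abs(r[f]) for r in shap_rows) / n for f in features}
    return sorted(importance.items(), key=lambda kv: kv[1], reverse=True)

# Hypothetical explanation values for three rows
shap_rows = [
    {"INCOME_explanation": 0.30, "LOAN_AMOUNT_explanation": -0.10, "DOTW_explanation": 0.01},
    {"INCOME_explanation": -0.20, "LOAN_AMOUNT_explanation": 0.40, "DOTW_explanation": -0.02},
    {"INCOME_explanation": 0.10, "LOAN_AMOUNT_explanation": -0.25, "DOTW_explanation": 0.00},
]
for name, score in mean_abs_shap(shap_rows):
    print(f"{name}: {score:.3f}")
```

Taking the absolute value matters: `INCOME_explanation` has a positive mean here, but its negative and positive contributions would partly cancel if averaged directly.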

In [None]:
# Merge shap vals and actual vals together for easier plotting below
all_shap_base = test_pd_sample.merge(base_shap_pd, right_index=True, left_index=True, how='outer')
all_shap_opt = test_pd_sample.merge(opt_shap_pd, right_index=True, left_index=True, how='outer')

We can also create plots to dive deeper into the feature influence in our 2 logged models.

We will create 2 plot functions and then apply them to the features we want to look at.

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

def plot_feature_influence_by_model(data_base, data_opt, X, Y, title):
    # Set up the figure: one panel per model
    fig, axes = plt.subplots(1, 2, figsize=(10, 6))
    fig.suptitle(title)
    
    # Base model: feature value vs. SHAP value, with a linear trend line
    sns.scatterplot(data=data_base, x=X, y=Y, ax=axes[0])
    sns.regplot(data=data_base, x=X, y=Y, scatter=False, color='red', line_kws={"lw": 2}, ci=100, lowess=False, ax=axes[0])
    axes[0].set_title('Base Model')
    
    # Optimized model: the same plot on its own SHAP values
    sns.scatterplot(data=data_opt, x=X, y=Y, color="orange", ax=axes[1])
    sns.regplot(data=data_opt, x=X, y=Y, scatter=False, color='blue', line_kws={"lw": 2}, ci=100, lowess=False, ax=axes[1])
    axes[1].set_title('Opt Model')
    
    # Label the axes and show the plot
    for ax in axes:
        ax.set_xlabel(X)
        ax.set_ylabel("Influence")
    plt.tight_layout()
    plt.show()

def plot_boxplot_by_model(data_base, data_opt, X, Y, title):
    # Set up the figure: one panel per model
    fig, axes = plt.subplots(1, 2, figsize=(10, 6))
    fig.suptitle(title)
    
    # Side-by-side boxplots of SHAP values for each value of the binary feature
    sns.boxplot(data=data_base, x=X, y=Y,
                hue=X, width=0.6, ax=axes[0])
    axes[0].set_title('Base Model')
    sns.boxplot(data=data_opt, x=X, y=Y,
                hue=X, width=0.6, ax=axes[1])
    axes[1].set_title('Opt Model')
    
    # Label the axes and show the plot
    for ax in axes:
        ax.set_xlabel(f"{X} (1 = True)")
        ax.set_ylabel("Influence")
        ax.legend(loc='upper right')
    
    plt.show()


In [None]:
# Income

# Filter data down to strip outliers
asb_filtered = all_shap_base[(all_shap_base.INCOME>0) & (all_shap_base.INCOME<250000)]
aso_filtered = all_shap_opt[(all_shap_opt.INCOME>0) & (all_shap_opt.INCOME<250000)]

plot_feature_influence_by_model(asb_filtered, aso_filtered, 'INCOME', 'INCOME_explanation', 
                                "INCOME EXPLANATION")

# Loan Amount

# Filter data down to strip outliers
asb_filtered = all_shap_base[all_shap_base.LOAN_AMOUNT<2000000]
aso_filtered = all_shap_opt[all_shap_opt.LOAN_AMOUNT<2000000]

plot_feature_influence_by_model(asb_filtered, aso_filtered, 'LOAN_AMOUNT', 'LOAN_AMOUNT_explanation', 
                                "LOAN_AMOUNT EXPLANATION")

In [None]:
plot_boxplot_by_model(all_shap_base, all_shap_opt,
                      "LOAN_PURPOSE_NAME_HOME_PURCHASE",
                      "LOAN_PURPOSE_NAME_HOME_PURCHASE_explanation",
                      "HOME PURCHASE LOAN EXPLANATION")

plot_boxplot_by_model(all_shap_base, all_shap_opt,
                      "LOAN_PURPOSE_NAME_HOME_IMPROVEMENT",
                      "LOAN_PURPOSE_NAME_HOME_IMPROVEMENT_explanation",
                      "HOME IMPROVEMENT LOAN EXPLANATION")

# Model Observability (Model Monitoring)

[ML Observability](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/model-observability) allows you to track the quality of production models you have deployed via the Snowflake Model Registry across multiple dimensions, such as performance, drift, and volume.

Let's first save our train and test data as Snowflake tables and create a new stage for the next few steps.

In [None]:
train.write.save_as_table(f"DEMO_MORTGAGE_LENDING_TRAIN_{VERSION_NUM}", mode="overwrite")
test.write.save_as_table(f"DEMO_MORTGAGE_LENDING_TEST_{VERSION_NUM}", mode="overwrite")

In [None]:
session.sql("CREATE STAGE IF NOT EXISTS ML_STAGE").collect()

We can create stored procedures for running model inference. These procedures can be scheduled or orchestrated to run continuous inference.

In [None]:
from snowflake import snowpark
from snowflake.ml.registry import Registry
from snowflake.snowpark.types import StringType


def demo_inference_sproc(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:
    # Fetch the requested model version from the registry in the current database/schema
    reg = Registry(session=session)
    m = reg.get_model(modelname)
    mv = m.version(modelversion)
    
    # Name of the prediction column, e.g. XGB_BASE_PREDICTION
    pred_col = f'{modelversion}_PREDICTION'

    # Read the input table to a dataframe
    df = session.table(table_name)
    # 'results' holds LOAN_ID and the model's prediction for each row
    results = mv.run(df, function_name="predict").select("LOAN_ID",'"output_feature_0"').withColumnRenamed('"output_feature_0"', pred_col)

    # Join the predictions back onto the input rows
    final = df.join(results, on="LOAN_ID", how="full")
    
    # Write results back to Snowflake table
    final.write.save_as_table(table_name, mode='overwrite', enable_schema_evolution=True)

    return "Success"

# Register the stored procedure
session.sproc.register(
    func=demo_inference_sproc,
    name="model_inference_sproc",
    replace=True,
    is_permanent=True,
    stage_location="@ML_STAGE",
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    return_type=StringType()
)
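
The stored procedure names its output column `f'{modelversion}_PREDICTION'` without quotes, and Snowpark treats an unquoted name as a case-insensitive identifier stored in upper case. That is why version `XGB_Optimized` ends up as a column called `XGB_OPTIMIZED_PREDICTION`, which is the name the model monitors below reference. The plain-Python sketch mirrors the procedure's core logic (name the prediction column, then join predictions back onto the input rows by `LOAN_ID`) on hypothetical rows, with a hypothetical `toy_predict` function standing in for `mv.run(..., function_name="predict")`.

```python
def prediction_column(model_version):
    """Column name the inference procedure produces; unquoted identifiers end up upper-cased."""
    return f"{model_version}_PREDICTION".upper()

def attach_predictions(rows, predict, model_version, key="LOAN_ID"):
    """Run `predict` on every row and join the results back onto the rows by `key`."""
    col_name = prediction_column(model_version)
    preds = {r[key]: predict(r) for r in rows}
    return [{**r, col_name: preds[r[key]]} for r in rows]

# Hypothetical stand-in for the registry model's predict function
def toy_predict(row):
    return int(row["INCOME"] > 60_000)

table = [{"LOAN_ID": 1, "INCOME": 80_000}, {"LOAN_ID": 2, "INCOME": 40_000}]
# Two calls, like running the procedure once per model version on the same table
table = attach_predictions(table, toy_predict, "XGB_BASE")
table = attach_predictions(table, toy_predict, "XGB_Optimized")
print(table[0])
```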


We'll call our stored procedure 4 times to run inference with both models (base and HPO-optimized) on both the train and test tables.

In [None]:
CALL model_inference_sproc(
    'DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}',
    '{{model_name}}', 
    '{{base_version_name}}');

In [None]:
CALL model_inference_sproc(
    'DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}',
    '{{model_name}}', 
    '{{base_version_name}}');

In [None]:
CALL model_inference_sproc(
    'DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}',
    '{{model_name}}', 
    '{{optimized_version_name}}');

In [None]:
CALL model_inference_sproc(
    'DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}',
    '{{model_name}}', 
    '{{optimized_version_name}}');
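The four cells above are the cross product of two model versions and two tables. If you prefer to drive them from Python, a sketch like the following builds the same statements. The helper name and the example values (`"V1"`, `"MORTGAGE_LENDING_MLOPS_V1"`, `"XGB_BASE"`, `"XGB_OPTIMIZED"`) are hypothetical stand-ins for `VERSION_NUM`, `model_name`, `base_version_name`, and `optimized_version_name`.

```python
from itertools import product
from typing import List


def build_inference_calls(version_num: str, model_name: str, version_names: List[str]) -> List[str]:
    """Build one CALL statement per (model version, table) pair, train before test."""
    tables = [
        f"DEMO_MORTGAGE_LENDING_TRAIN_{version_num}",
        f"DEMO_MORTGAGE_LENDING_TEST_{version_num}",
    ]
    # Values are interpolated directly, so only pass trusted identifiers
    return [
        f"CALL model_inference_sproc('{table}', '{model_name}', '{version}')"
        for version, table in product(version_names, tables)
    ]


calls = build_inference_calls("V1", "MORTGAGE_LENDING_MLOPS_V1", ["XGB_BASE", "XGB_OPTIMIZED"])
for stmt in calls:
    print(stmt)
```

Inside the notebook, each statement could then be executed with `session.sql(stmt).collect()`.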

We can take a look at our predictions now.

In [None]:
SELECT * FROM DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}} LIMIT 10

### Create Model Monitors

We'll now create model monitors for both the base and HPO-optimized models.

In [None]:
CREATE OR REPLACE MODEL MONITOR MORTGAGE_LENDING_BASE_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{base_version_name}}
    FUNCTION=predict
    SOURCE=DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}
    BASELINE=DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_BASE_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(MORTGAGERESPONSE)
    ID_COLUMNS=(LOAN_ID)
    WAREHOUSE={{session.get_current_warehouse()}}
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';

In [None]:
CREATE OR REPLACE MODEL MONITOR MORTGAGE_LENDING_OPTIMIZED_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{optimized_version_name}}
    FUNCTION=predict
    SOURCE=DEMO_MORTGAGE_LENDING_TEST_{{VERSION_NUM}}
    BASELINE=DEMO_MORTGAGE_LENDING_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_OPTIMIZED_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(MORTGAGERESPONSE)
    ID_COLUMNS=(LOAN_ID)
    WAREHOUSE={{session.get_current_warehouse()}}
    REFRESH_INTERVAL='1 min'
    AGGREGATION_WINDOW='1 day';
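With `AGGREGATION_WINDOW='1 day'`, each monitor reports metrics per daily window of the `TIMESTAMP` column. As a rough mental model (not Snowflake's implementation), the sketch below buckets (timestamp, prediction, actual) records by calendar day and computes accuracy per bucket. The function name and the sample records are hypothetical.

```python
from collections import defaultdict
from datetime import date, datetime
from typing import Dict, Iterable, Tuple


def daily_accuracy(records: Iterable[Tuple[datetime, int, int]]) -> Dict[date, float]:
    """Group (timestamp, prediction, actual) records by day and return accuracy per day."""
    totals: Dict[date, int] = defaultdict(int)
    hits: Dict[date, int] = defaultdict(int)
    for ts, pred, actual in records:
        day = ts.date()  # 1-day aggregation window
        totals[day] += 1
        hits[day] += int(pred == actual)
    return {day: hits[day] / totals[day] for day in sorted(totals)}


records = [
    (datetime(2024, 6, 1, 9), 1, 1),
    (datetime(2024, 6, 1, 17), 0, 1),
    (datetime(2024, 6, 2, 12), 1, 1),
]
print(daily_accuracy(records))
```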

We can also compute the prediction drift using the Model Monitor.

In [None]:
SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
    'MORTGAGE_LENDING_OPTIMIZED_MODEL_MONITOR',
    'DIFFERENCE_OF_MEANS',
    'XGB_OPTIMIZED_PREDICTION',
    '1 DAY',
    TO_TIMESTAMP_TZ('2024-06-01'),
    TO_TIMESTAMP_TZ('2024-09-01')))
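Conceptually, a difference-of-means drift metric compares the average prediction in each window against the average over the baseline table. The sketch below assumes the metric is simply (window mean minus baseline mean). Snowflake's exact definition (sign, absolute value, or any normalization) may differ, so treat this as an illustration only. The function name and data are hypothetical.

```python
from collections import defaultdict
from datetime import date, datetime
from statistics import fmean
from typing import Dict, Iterable, List, Sequence, Tuple


def daily_mean_drift(
    baseline_preds: Sequence[float],
    records: Iterable[Tuple[datetime, float]],
) -> Dict[date, float]:
    """Return (mean of each day's predictions) - (mean of baseline predictions)."""
    baseline_mean = fmean(baseline_preds)
    by_day: Dict[date, List[float]] = defaultdict(list)
    for ts, pred in records:
        by_day[ts.date()].append(pred)  # 1-day windows, as with '1 DAY'
    return {day: fmean(preds) - baseline_mean for day, preds in sorted(by_day.items())}


baseline = [0, 1, 1, 0]  # e.g. predictions on the training (baseline) table
current = [
    (datetime(2024, 6, 1), 1),
    (datetime(2024, 6, 1), 1),
    (datetime(2024, 6, 2), 0),
    (datetime(2024, 6, 2), 1),
]
print(daily_mean_drift(baseline, current))
```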

# Conclusion

#### 🛠️ Snowflake Feature Store tracks feature definitions and maintains lineage of sources and destinations 🛠️
#### 🚀 Snowflake Model Registry gives users a secure and flexible framework to deploy, track, and monitor models 🚀
#### 🔮 All model versions logged in the Model Registry can be accessed for inference, explainability, lineage tracking, visibility and more 🔮


# [Optional] Distributed model training & deployment

For demonstration's sake, below is an example of distributed model training using Snowflake's [distributed modeling classes](https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/modeling_distributors).

Snowflake will set up a Ray cluster on all available nodes in your compute pool (CPU or GPU) and execute the distributed training job.

For the first example, we will run single-node, multi-GPU distributed XGBoost training.

We specify a scaling config (`XGBScalingConfig`), define the distributed XGBoost estimator (`XGBEstimator`), and call `fit` to kick off the training process.

In [None]:
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector

# Use Snowflake's DataConnector to efficiently connect Snowflake data to Ray
dc = DataConnector.from_dataframe(train)

# Set up the scaling configuration; -1 means "use everything available"
scaling_config = XGBScalingConfig(
    num_workers=-1,            # Use all available workers
    num_cpu_per_worker=-1,     # Use all available CPU cores per worker
    use_gpu=True               # Enable GPU for training
)

# Define distributed xgb estimator
dist_gpu_xgb = XGBEstimator(
    params = {"booster": "gbtree",
              "n_estimators":10,},
    scaling_config = scaling_config)

dist_gpu_xgb.fit(dc,
                 input_cols = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).columns,
                 label_col = "MORTGAGERESPONSE")
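Why can the training job be split across workers at all? Histogram-based XGBoost only needs per-bin sums of gradient statistics (gradients and, in practice, hessians), and those sums add up across data shards. The pure-Python sketch below shows the idea for a single feature: each hypothetical worker builds a gradient histogram on its shard, and an element-wise sum (the "allreduce" step) reproduces the histogram of the full dataset. The round-robin sharding, helper names, and numbers are illustrative assumptions, not how Snowflake or Ray actually partition the data.

```python
from typing import List, Sequence


def shard_rows(n_rows: int, num_workers: int) -> List[List[int]]:
    """Assign row indices to workers round-robin."""
    return [list(range(w, n_rows, num_workers)) for w in range(num_workers)]


def local_histogram(rows: Sequence[int], bins: Sequence[int], grads: Sequence[float], num_bins: int) -> List[float]:
    """Sum gradients per feature bin over the given rows."""
    hist = [0.0] * num_bins
    for r in rows:
        hist[bins[r]] += grads[r]
    return hist


def allreduce_sum(histograms: List[List[float]]) -> List[float]:
    """Element-wise sum of the workers' histograms."""
    return [sum(values) for values in zip(*histograms)]


bins = [0, 1, 2, 1, 0, 2]                     # bin index of one feature, per row
grads = [0.5, -1.0, 0.25, 2.0, 1.5, -0.75]    # per-row gradients
shards = shard_rows(len(bins), num_workers=2)
combined = allreduce_sum([local_histogram(s, bins, grads, 3) for s in shards])
single_node = local_histogram(range(len(bins)), bins, grads, 3)
print(combined, single_node)  # [2.0, 1.0, -0.5] [2.0, 1.0, -0.5]
```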

Next, we extract the underlying XGBoost `Booster` object from the distributed estimator so it can be logged to the Model Registry.

In [None]:
# Extract the XGBoost Booster object from the distributed estimator
gpu_booster = dist_gpu_xgb.get_booster()

Now we can log the distributed xgb model.

In [None]:
# Log the distributed model to the model registry
dist_version_name = 'XGB_GPU_DIST'

try:
    mv_base = model_registry.get_model(model_name).version(dist_version_name)
    print("Found existing model version!")
except Exception:  # Assume a failure here means the version does not exist yet
    print("Logging new model version...")
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=gpu_booster,
        version_name=dist_version_name,
        sample_input_data=train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100),  # Snowpark DataFrame keeps lineage
        comment="Distributed ML model for predicting loan approval likelihood.",
        options={'relax_version': True},
        target_platforms=["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"]
    )
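The cell above follows a get-or-create pattern: reuse the version if it is already registered, otherwise log it. A minimal sketch of the same control flow against a hypothetical in-memory registry (`InMemoryRegistry` is illustrative only, not a Snowflake class) shows why catching a narrow exception is preferable to a bare `except:`, which would also swallow unrelated failures and even `KeyboardInterrupt`.

```python
from typing import Any, Dict, Tuple


class InMemoryRegistry:
    """Hypothetical stand-in for a model registry, used only to illustrate the pattern."""

    def __init__(self) -> None:
        self._versions: Dict[Tuple[str, str], Any] = {}

    def get_version(self, model_name: str, version_name: str) -> Any:
        return self._versions[(model_name, version_name)]  # KeyError if missing

    def log_version(self, model_name: str, version_name: str, model: Any) -> Any:
        key = (model_name, version_name)
        if key in self._versions:
            raise ValueError(f"version {key} already exists")
        self._versions[key] = model
        return model


def get_or_log(registry: InMemoryRegistry, model_name: str, version_name: str, model: Any) -> Tuple[Any, bool]:
    """Return (version, created), where created is True only if a new version was logged."""
    try:
        return registry.get_version(model_name, version_name), False
    except KeyError:  # only "not found" triggers logging; other errors propagate
        return registry.log_version(model_name, version_name, model), True


registry = InMemoryRegistry()
first, created_first = get_or_log(registry, "MORTGAGE_MODEL", "XGB_GPU_DIST", "booster-a")
second, created_second = get_or_log(registry, "MORTGAGE_MODEL", "XGB_GPU_DIST", "booster-b")
print(created_first, created_second, second)  # True False booster-a
```

The notebook does not show which exception the real registry raises for a missing version, which is why the cell above catches `Exception` as a compromise.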

Now, we will scale our resources to run multi-node, multi-GPU distributed XGBoost training.

To inspect the resources (including GPUs) available to Ray, we first define a helper function.


In [None]:
import ray
import pprint

def _format_resources(resources):
    """Convert memory fields to GB and filter out internal node tags."""
    formatted = {}
    for k, v in resources.items():
        # Skip internal node identifiers
        if k.startswith("node:"):
            continue
        if k in {"memory", "object_store_memory"}:
            formatted[k] = f"{v / (1024 ** 3):.2f} GB"
        else:
            formatted[k] = v
    return formatted

def show_ray_cluster_resources():
    """Nicely formatted cluster-wide and node-level resource info from Ray."""
    print(" Cluster Resources:")
    cluster = _format_resources(ray.cluster_resources())
    pprint.pprint(cluster, sort_dicts=True, width=100)

    print("\n Node-Level Resources:")
    for node in ray.nodes():
        print(f"\nNode: {node['NodeManagerAddress']}")
        node_resources = _format_resources(node["Resources"])
        pprint.pprint(node_resources, sort_dicts=True, width=100)

As expected, we're only seeing a single node right now.

In [None]:
show_ray_cluster_resources()

Let's scale our resources to use all 4 nodes in our cluster using Snowflake ML's `scale_cluster` function.

In [None]:
from snowflake.ml.runtime_cluster import scale_cluster

# Scale to max compute pool nodes
num_nodes = 4

# Suppress SIGTERM Ray warnings
logging.basicConfig(level=logging.ERROR)

# Name of this notebook (use its full name)
nb_name = "E2E_ML_NOTEBOOK"
scale_cluster(expected_cluster_size=num_nodes,
              notebook_name=nb_name)

We should see details for all 4 nodes now.

In [None]:
# Show number of nodes after changing cluster manager settings
show_ray_cluster_resources()

Now, we're ready to do multi-node, multi-GPU training.

In [None]:
# Quiet progress messages from Ray Data
context = ray.data.DataContext.get_current()
context.execution_options.verbose_progress = False
# context.enable_operator_progress_bars = False
# context.enable_progress_bars = False

In [None]:
from snowflake.ml.modeling.distributors.xgboost.xgboost_estimator import XGBEstimator, XGBScalingConfig
from snowflake.ml.data.data_connector import DataConnector

# Use Snowflake's DataConnector to efficiently connect Snowflake data to Ray
dc = DataConnector.from_dataframe(train)

# Set up the scaling configuration for multi-node and multi-GPU usage
scaling_config = XGBScalingConfig(
    num_workers=-1,            # Use all available workers
    num_cpu_per_worker=-1,     # Use all available CPU cores per worker
    use_gpu=True               # Enable GPU for training, resources_per_worker={"GPU": 1}
)

# Define XGBoost hyperparameters for GPU
hyp_params = {
    "booster": "gbtree",               # Standard gradient-boosted trees
    "tree_method": "gpu_hist",         # GPU-accelerated histogram algorithm (deprecated in XGBoost >= 2.0)
    "predictor": "auto",               # Uses GPU for prediction when training on GPU
    "n_estimators": 100,               # Number of trees to train
    "max_depth": 0,                    # 0 = no depth limit; tree size is bounded by max_leaves instead
    "grow_policy": "lossguide",        # Grows trees leaf-wise (instead of depth-wise), which scales better on GPUs
    "max_leaves": 512,                 # Limits tree complexity; balances model capacity and memory usage with 'lossguide'
    "learning_rate": 0.1,              # Step size for each tree
    "subsample": 0.8,                  # Fraction of samples used for each tree
    "colsample_bytree": 0.8            # Fraction of features used for each tree
}

# Define distributed xgb estimator
multi_node_gpu_xgb = XGBEstimator(
    params = hyp_params,
    scaling_config = scaling_config)

multi_node_gpu_xgb.fit(dc,
                 input_cols = train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).columns,
                 label_col = "MORTGAGERESPONSE")
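The `gpu_hist` tree method is deprecated in XGBoost 2.0 and later, where GPU training is requested with `tree_method="hist"` plus `device="cuda"`; the `predictor` parameter is likewise superseded by `device`. If the XGBoost version in your runtime is 2.x (check `xgboost.__version__`), a small helper like the hypothetical `modernize_gpu_params` below could translate the dictionary above. It is a sketch, not part of the Snowflake API.

```python
from typing import Any, Dict


def modernize_gpu_params(params: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `params` using XGBoost >= 2.0 GPU conventions."""
    updated = dict(params)  # leave the caller's dict untouched
    if updated.get("tree_method") == "gpu_hist":
        updated["tree_method"] = "hist"
        updated["device"] = "cuda"
    updated.pop("predictor", None)  # prediction device now follows `device`
    return updated


legacy = {"booster": "gbtree", "tree_method": "gpu_hist", "predictor": "auto", "max_leaves": 512}
print(modernize_gpu_params(legacy))
# {'booster': 'gbtree', 'tree_method': 'hist', 'max_leaves': 512, 'device': 'cuda'}
```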

As before, let's log this model.

In [None]:
# Extract the XGBoost Booster object from the distributed estimator
gpu_booster = multi_node_gpu_xgb.get_booster()

# Log the distributed model to the model registry
dist_version_name = 'XGB_MULTI_NODE_GPU_DIST'

try:
    mv_base = model_registry.get_model(model_name).version(dist_version_name)
    print("Found existing model version!")
except Exception:  # Assume a failure here means the version does not exist yet
    print("Logging new model version...")
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=gpu_booster,
        version_name=dist_version_name,
        sample_input_data=train.drop(["TIMESTAMP", "LOAN_ID", "MORTGAGERESPONSE"]).limit(100),  # Snowpark DataFrame keeps lineage
        comment="Distributed ML model for predicting loan approval likelihood.",
        options={'relax_version': True},
        target_platforms=["SNOWPARK_CONTAINER_SERVICES"]
    )

In [None]:
SHOW VERSIONS IN MODEL {{model_name}}

We can take a look at all our models now.

In [None]:
model_registry.show_models()

Since we don't need all nodes anymore, let's scale down our cluster now.

In [None]:
# Scale down 
num_nodes = 1

# Suppress SIGTERM ray warnings 
logging.basicConfig(level=logging.ERROR)

# Name of this notebook (use its full name)
nb_name = "E2E_ML_NOTEBOOK"
scale_cluster(expected_cluster_size=num_nodes, 
              notebook_name=nb_name)

Three of the nodes now show no resources, which tells us they are no longer available to the Ray cluster.

In [None]:
# Show number of nodes after changing cluster manager settings
show_ray_cluster_resources()

## Model Deployment to Snowpark Container Services (SPCS)

Here, we also show how to deploy a model to Snowpark Container Services as a long running service using [Model Serving](https://docs.snowflake.com/en/developer-guide/snowflake-ml/model-registry/container).

This is useful to run GPU based inference, run very large models that do not fit in the Warehouse memory, or when you have additional package dependencies that are not met in the Warehouse. You can also create REST API endpoints for running inference using external HTTP requests. 

Let's first define the variables we'll use to create the deployment service.

In [None]:
image_repo_name = "my_inference_images"
# cp_name = "CP_GPU_NV_S_1_4"

cp_name = "CP_GPU_NV_S_4"
num_spcs_nodes = '2'
service_name = 'MORTGAGE_LENDING_PREDICTION_SERVICE'

current_database = session.get_current_database().replace('"', '')
current_schema = session.get_current_schema().replace('"', '')
extended_image_repo_name = f"{current_database}.DEFAULT_SCHEMA.{image_repo_name}"
extended_service_name = f'{current_database}.DEFAULT_SCHEMA.{service_name}'

In [None]:
DROP SERVICE IF EXISTS {{extended_service_name}};

Note that we create the service from `mv_base`, the model version we just logged to the Model Registry.

If your model has pip dependencies, you can specify them with `pip_requirements` when logging the model. Here we set `ingress_enabled=True` to also create a REST API endpoint for the model.

In [None]:
mv_base.create_service(
    service_name=extended_service_name,
    service_compute_pool=cp_name,
    image_repo=extended_image_repo_name,
    ingress_enabled=True,
    max_instances=int(num_spcs_nodes),
    build_external_access_integration="ALLOW_ALL_INTEGRATION"
)
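With ingress enabled, the service can also be called over HTTP. The sketch below only builds and parses JSON. It assumes the row-indexed format `{"data": [[row_index, feature_1, feature_2, ...], ...]}` used by Snowflake's external-function-style endpoints, with responses returned as `[row_index, output]` pairs. Verify the exact request format, endpoint path, and authentication in the Model Serving documentation before relying on it. The helper names and the sample response are hypothetical, and feature values must follow the column order of the model's signature.

```python
import json
from typing import Any, List, Sequence


def build_payload(rows: Sequence[Sequence[Any]]) -> str:
    """Prefix each row with its index and wrap the rows in a {"data": [...]} body."""
    return json.dumps({"data": [[i, *row] for i, row in enumerate(rows)]})


def parse_response(body: str) -> List[Any]:
    """Return outputs ordered by row index so they line up with the input rows."""
    data = json.loads(body)["data"]
    return [output for _, output in sorted(data, key=lambda item: item[0])]


payload = build_payload([[0.5, 3], [1.2, 4]])
print(payload)  # {"data": [[0, 0.5, 3], [1, 1.2, 4]]}

# Hypothetical response body, deliberately out of order
response = '{"data": [[1, {"output_feature_0": 0}], [0, {"output_feature_0": 1}]]}'
print(parse_response(response))  # [{'output_feature_0': 1}, {'output_feature_0': 0}]
```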

Now, we can check that the service is created.

In [None]:
SHOW SERVICES;

In [None]:
# mv_base = model_registry.get_model(f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}").version("XGB_GPU_DIST")
mv_base = model_registry.get_model(f"MORTGAGE_LENDING_MLOPS_{VERSION_NUM}").version("XGB_MULTI_NODE_GPU_DIST")
mv_base.list_services()

Now, we can run predictions on test data using this deployed SPCS Model Service.

In [None]:
mv_base.run(test, 
            function_name = "PREDICT", 
            service_name = "DEFAULT_SCHEMA.MORTGAGE_LENDING_PREDICTION_SERVICE")

The service keeps running (and serving the REST endpoint created above) until you drop or suspend it, so drop or suspend it when you no longer need it. The compute pool will automatically suspend once no services are running on it.

In [None]:
DROP SERVICE IF EXISTS {{extended_service_name}};