# End-to-end Snowflake ML workflow for loan lending prediction

# __Featured Snowflake ML Capabilities__ 

- __`Distributed ML Processing`__: `Snowflake Container Runtime for ML` with the `Ray` framework to process and train models efficiently across multiple compute nodes.
- __`Data Ingestion & Persistence`__: `Snowflake ML Data Loading APIs` including
    - `DataSource APIs` for ingesting staged data (CSV, Parquet, unstructured),
    - `DataConnector` for loading structured data from Snowflake tables and Snowflake datasets in ML workflows,
    - `DataSink` for writing transformed data, ML predictions and inference results back to Snowflake tables.
- __`Feature Engineering`__: centralized `Feature Store` with registered entities and feature views; reusable `Snowflake Dataset` generation directly from the feature store for training and inference consistency.
- __`Model Development & Experimentation`__: `Snowflake ML APIs` for training and tuning models, including hyperparameter optimization, experiment tracking, and artifacts logging via Snowflake's native `Experiment Manager`.
- __`Model Registry & Lifecycle Management`__: Model registration in the `Snowflake Model Registry` with full versioning, lifecycle control, and support for batch and distributed inference using Python, SQL, or REST API endpoints.
- __`Model Explainability`__: Feature impact evaluation using `Model Registry explainability` powered by `Shapley values`, enabling transparent assessment of individual feature contributions to loan lending prediction.
- __`ML Observability`__: Monitor deployed models for performance, data drift, and prediction volume using stored inference data and Snowflake's ML observability capabilities. 
- __`Model Serving`__: Deploy and serve models in production using Snowpark Container Services (SPCS) for scalable, low-latency inference. 

```sql
USE ROLE ACCOUNTADMIN;
SET USERNAME = (SELECT CURRENT_USER());
CREATE OR REPLACE ROLE MLOPS_ROLE;

-- Grant the new role the privileges needed to create databases, compute pools, and warehouses, and to bind service endpoints
GRANT CREATE DATABASE on ACCOUNT to ROLE MLOPS_ROLE; 
GRANT CREATE COMPUTE POOL on ACCOUNT to ROLE MLOPS_ROLE;
GRANT CREATE WAREHOUSE ON ACCOUNT to ROLE MLOPS_ROLE;
GRANT BIND SERVICE ENDPOINT on ACCOUNT to ROLE MLOPS_ROLE;

-- grant new role to user and switch to that role
GRANT ROLE MLOPS_ROLE to USER identifier($USERNAME);
USE ROLE MLOPS_ROLE;

-- Create warehouse
CREATE OR REPLACE WAREHOUSE MLOPS_WH WITH WAREHOUSE_SIZE='SMALL';

-- Create Database 
CREATE OR REPLACE DATABASE MLOPS_DB;

-- Create Schema
CREATE OR REPLACE SCHEMA MLOPS_SCHEMA;

-- Create compute pool for Snowpark Container Services
CREATE COMPUTE POOL IF NOT EXISTS MLOPS_COMPUTE_POOL 
 MIN_NODES = 1
 MAX_NODES = 1
 INSTANCE_FAMILY = CPU_X64_M;

-- Describe compute pool
DESCRIBE COMPUTE POOL MLOPS_COMPUTE_POOL;

-- Create notebook 
CREATE OR REPLACE NOTEBOOK MLOPS_DB.MLOPS_SCHEMA.MLOPS_NB 
MAIN_FILE = 'Snowflake_ML_Loans.ipynb' 
QUERY_WAREHOUSE = MLOPS_WH
RUNTIME_NAME = 'SYSTEM$BASIC_RUNTIME' --uses Snowpark Container Services (SPCS) infrastructure running on a compute pool
COMPUTE_POOL = 'MLOPS_COMPUTE_POOL'
IDLE_AUTO_SHUTDOWN_TIME_SECONDS = 3600;

CREATE STAGE IF NOT EXISTS MLOPS_DB.MLOPS_SCHEMA.MLOPS_STAGE
  DIRECTORY = (ENABLE = TRUE)
  FILE_FORMAT = (TYPE = 'CSV');  
```

### Initialize environment


In [None]:
!pip freeze | grep snowflake

In [None]:
# SHAP (SHapley Additive exPlanations) 
# a unified approach to explain the output of any machine learning model.

!pip install shap

In [None]:
import warnings

warnings.filterwarnings(
    "ignore"   
)

In [None]:
VERSION_NUM = '1'
DB = "MLOPS_DB" 
SCHEMA = "MLOPS_SCHEMA" 
COMPUTE_WAREHOUSE = "MLOPS_WH" 
ROLE = "MLOPS_ROLE"
ORG_NAME = "xxxxxxx"
ACCOUNT_NAME = "xxxxxxxx"

In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Optimized Data Loading with Snowflake ML DataSource & DataSink APIs 

- Raw loan application data is uploaded to a Snowflake internal stage using the Snowsight UI

- CSV (*.csv) files are ingested from the stage using Snowflake ML DataSource APIs

- Ray is used to parallelize data loading and preprocessing across multiple compute nodes, improving throughput and scalability

- Ray datasets are written back to Snowflake using DataSink for downstream feature engineering and model training

In [None]:
import ray

context = ray.data.DataContext.get_current()
context.execution_options.verbose_progress = False
context.enable_operator_progress_bars = False
context.enable_progress_bars = False

In [None]:
from snowflake.ml.ray.datasource import SFStageCSVDataSource

file_name = "*.csv"
stage_name = "@MLOPS_STAGE"

data_source = SFStageCSVDataSource(
    stage_location=stage_name,
    file_pattern=file_name
)

# Build Ray dataset from provided datasources
ray_ds = ray.data.read_datasource(data_source)


The `ray.data.read_datasource()` function returns a `ray.data.Dataset`, a lazily evaluated, distributed dataset designed to scale beyond the memory of a single node. When ingesting data from Snowflake, the dataset is internally represented as __Apache Arrow tables__, enabling efficient, columnar data transfer. These Arrow tables are transparently converted to the required Python formats — such as __Pandas DataFrames__ or __NumPy arrays__ — during downstream processing or iteration. 

In [None]:

ray_ds.show(3)

In [None]:
from snowflake.ml.ray.datasink import SnowflakeTableDatasink

datasink = SnowflakeTableDatasink(
    table_name="MLOPS_LOANS",
    database="MLOPS_DB",
    schema="MLOPS_SCHEMA",
    auto_create_table=True, 
    override=True 
)

In [None]:
# Write the Ray dataset to the Snowflake table in parallel
ray_ds.write_datasink(datasink)

In [None]:
show tables;

In [None]:
sdf = session.table("MLOPS_LOANS")
sdf.show(5)

# Exploratory Data Analysis

In [None]:
sdf.columns

In [None]:
sdf.count()

In [None]:
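# Explicit imports instead of a wildcard; note that max and min here
# intentionally shadow the Python builtins for the rest of the notebook
from snowflake.snowpark.functions import (
    avg, col, concat, current_date, current_timestamp,
    dateadd, lit, max, min, sql_expr, to_timestamp,
)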

Inspect target label distribution (LOAN_RESPONSE).

In [None]:
sdf.select(col("LOAN_RESPONSE")) \
   .group_by(col("LOAN_RESPONSE")) \
   .count() \
   .show()

# Feature Engineering



In [None]:
from snowflake.snowpark.functions import call_function

# 1. Convert TS → timestamp (MM:SS.FF3)
sdf = sdf.with_column(
    "TS_TIMESTAMP",
    to_timestamp(
        concat(current_date(), lit(" "), col("TS")),
        "YYYY-MM-DD MI:SS.FF3"
    )
)

# 2. Max timestamp
max_ts_sdf = sdf.select(max(col("TS_TIMESTAMP")).alias("MAX_TS"))

# 3. Timedelta using SQL TIMESTAMPDIFF
delta_sdf = max_ts_sdf.select(
    call_function("timestampdiff",
                  lit("day"),
                  col("MAX_TS"),
                  current_timestamp()
                 ).alias("DELTA")
)

# Extract integer and subtract 1
delta = delta_sdf.collect()[0]["DELTA"]
time_shift = delta - 1

# 4. Shift timestamps
sdf = sdf.with_column(
    "TIMESTAMP",
    dateadd("day", lit(time_shift), col("TS_TIMESTAMP"))
)

# 5. Min/max shifted timestamps
result = sdf.select(
    min(col("TIMESTAMP")).alias("MIN_SHIFTED"),
    max(col("TIMESTAMP")).alias("MAX_SHIFTED")
)

result.show()
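
The timestamp shift in the cell above can be sketched in plain Python. This is a minimal illustration, assuming that `TIMESTAMPDIFF` with the `day` unit counts calendar-day boundaries (which is how the difference is modelled here); the function name `shift_to_yesterday` and the sample values are hypothetical. Subtracting 1 from the day difference makes the latest record land on the day before "now", so the shifted data never sits in the future.

```python
from datetime import datetime, timedelta

def shift_to_yesterday(timestamps, now):
    """Shift every timestamp by whole days so the latest one falls on the day before `now`."""
    latest = max(timestamps)
    # Calendar-day difference, mirroring the TIMESTAMPDIFF('day', MAX_TS, now) step
    delta_days = (now.date() - latest.date()).days
    time_shift = delta_days - 1
    return [ts + timedelta(days=time_shift) for ts in timestamps]

# Hypothetical sample: two records stamped "today" with only minutes and seconds differing
now = datetime(2024, 6, 15, 9, 0, 0)
raw = [datetime(2024, 6, 15, 0, 12, 30), datetime(2024, 6, 15, 0, 45, 10)]
shifted = shift_to_yesterday(raw, now)
print(shifted)
```

Because the shift is a whole number of days, the time-of-day component and the ordering of records are preserved.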


In [None]:
sdf.columns

In [None]:
from snowflake.snowpark.types import DecimalType, IntegerType
from snowflake.snowpark import Window

sdf_features = sdf \
    .select("LOAN_ID", "TIMESTAMP",           
           "LOAN_AMOUNT", "APPLICANT_INCOME", "COUNTY") \
    .with_column("LOAN_AMOUNT", col("LOAN_AMOUNT")*1000) \
    .with_column("INCOME", col("APPLICANT_INCOME")*1000) \
    .with_column("INCOME_LOAN_RATIO",(col("INCOME") / col("LOAN_AMOUNT")).cast(DecimalType(18,6)))
    
county_window_spec = Window.partition_by("COUNTY")    
sdf_features = sdf_features \
    .with_column("MEAN_COUNTY_INCOME", avg("INCOME").over(county_window_spec).cast(DecimalType(18,2))) \
    .with_column("HIGH_INCOME_FLAG", (col("INCOME")>col("MEAN_COUNTY_INCOME")).cast(IntegerType())) \
    .with_column("AVG_DAILY_LOAN_AMOUNT",
    sql_expr("""        
            AVG(LOAN_AMOUNT) OVER (
                PARTITION BY COUNTY 
                ORDER BY TIMESTAMP
                RANGE BETWEEN INTERVAL '1 DAY' PRECEDING AND CURRENT ROW
            )       
    """).cast(DecimalType(18, 2)))

In [None]:
sdf_features.show(3)
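
To make the `AVG_DAILY_LOAN_AMOUNT` window concrete, here is a small pure-Python sketch of a per-county rolling average over the preceding day. The rows and the helper name `rolling_avg_1d` are hypothetical; the sketch assumes the window includes both the row one day earlier and the current row, matching `RANGE BETWEEN INTERVAL '1 DAY' PRECEDING AND CURRENT ROW`.

```python
from datetime import datetime, timedelta

# Hypothetical sample rows (not taken from the loans dataset)
rows = [
    {"county": "A", "ts": datetime(2024, 1, 1, 10), "loan_amount": 100_000},
    {"county": "A", "ts": datetime(2024, 1, 1, 22), "loan_amount": 300_000},
    {"county": "A", "ts": datetime(2024, 1, 3, 9), "loan_amount": 200_000},
    {"county": "B", "ts": datetime(2024, 1, 1, 12), "loan_amount": 500_000},
]

def rolling_avg_1d(rows):
    """For each row, average loan_amount over same-county rows in [ts - 1 day, ts]."""
    averages = []
    for row in rows:
        window_start = row["ts"] - timedelta(days=1)
        window = [
            other["loan_amount"]
            for other in rows
            if other["county"] == row["county"]
            and window_start <= other["ts"] <= row["ts"]
        ]
        averages.append(round(sum(window) / len(window), 2))
    return averages

averages = rolling_avg_1d(rows)
print(averages)
```

The third row sees only itself because the earlier county-A loans are more than one day old, which is the behavior a time-based `RANGE` frame gives and a row-count frame would not.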

# Feature Store Overview

A __`feature store`__ centralizes commonly used feature definitions and transformations, enabling feature reuse, reducing duplication, and improving ML team productivity. It ensures that features remain __consistent, up-to-date, and production-ready__, providing a single source of truth for both training and inference workflows.

## Feature Store Concepts

In Snowflake, a feature store is implemented as a __schema__. You can either create a dedicated schema for feature storage or reuse an existing one. All feature store objects are native Snowflake objects and are governed by Snowflake’s access control and security model.

A Snowflake feature store consists of the following core components:

__`Entities`__

Entities represent real-world objects or concepts. They:

- Organize feature views by subject area

- Define the __join keys__ used to link features back to source data during training and inference

__`Feature Views`__

Feature views define and manage groups of related features. They:

- Provide a structured, reusable definition of feature transformations

- Refresh all contained features on a common schedule

- Are backed by a __feature table__ (typically a Snowflake-managed __dynamic table__)

- Support incremental updates to efficiently process new source data

__`Datasets`__

Datasets are curated collections of features assembled from one or more feature views for __model training or inference__, ensuring consistency across the ML lifecycle.

## Feature Store Object Mapping

| Feature Store Object | Snowflake Object |
| :------- | :------- |
| Feature store | Schema |
| Feature view | Dynamic table or view |
| Entity | Tag on dynamic table or view |
| Feature | Column in a dynamic table or in a view |


In [None]:
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

fs = FeatureStore(
    session = session,
    database = DB,
    name = SCHEMA, 
    default_warehouse=COMPUTE_WAREHOUSE,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST # create new schema for feature store if not exists
)

In [None]:
# Retrieve the existing entity definition; if it does not exist, define a new one and register it in the feature store

try:
    # retrieve existing entity
    loan_entity = fs.get_entity("LOAN_ENTITY")
    print("Retrieved existing entity")
except Exception:
    # define new entity
    loan_entity = Entity(
        name = "LOAN_ENTITY",
        join_keys = ["LOAN_ID"],
        desc = "Features defined on a per loan level"
    )
    #register
    fs.register_entity(loan_entity)
    print("Registered new entity")

In [None]:
# Snowpark DataFrame that contains the feature transformations used by feature view constructor
feature_df = sdf_features.drop("APPLICANT_INCOME", "COUNTY")
feature_df.columns

## Defining and Registering Feature Views

A __`feature view`__ encapsulates a Python- or SQL-based transformation pipeline that converts raw source data into one or more related features. All features within a feature view are refreshed simultaneously from the underlying source data, ensuring consistency across training and inference.

The __`snowflake.ml.feature_store.FeatureView`__ class provides the Python API for defining feature views. The constructor accepts a __Snowpark DataFrame__ containing the feature generation logic and must include:

- The __join key columns__ defined by the associated entities

- A __timestamp column__ when time-series features are present

Once defined, a feature view can be registered in the feature store using the `register_feature_view` method, specifying a custom name, version, and refresh configuration. After registration:

- Feature extraction is __incrementally maintained__

- Features are __automatically refreshed__ according to the configured schedule

Feature view definitions are __immutable after registration__, ensuring reproducible and consistent feature computation throughout the feature view’s lifecycle.

In [None]:
# define and register feature view
loan_fv = FeatureView(
    name = "Loan_Feature_View",
    entities = [loan_entity],
    feature_df = feature_df,        # features transformations Snowpark DataFrame
    timestamp_col = "TIMESTAMP",
    refresh_freq = "1 day"          # frequency of feature data refreshes
)

# add feature level descriptions to enhance feature view discoverability in Snowsight Universal Search
loan_fv = loan_fv.attach_feature_desc(
    {
        "LOAN_AMOUNT": "Loan amount in $USD",
        "INCOME": "Household income in $USD",
        "INCOME_LOAN_RATIO": "Ratio of INCOME/LOAN_AMOUNT",
        "MEAN_COUNTY_INCOME": "Average household income aggregated at county level",
        "HIGH_INCOME_FLAG": "Binary flag to indicate whether household income is higher than MEAN_COUNTY_INCOME",
        "AVG_DAILY_LOAN_AMOUNT": "Rolling daily average of LOAN_AMOUNT"
    }
)

loan_fv = fs.register_feature_view(loan_fv, version=VERSION_NUM, overwrite=True)

In [None]:
# Disable online feature serving
# (saves cost for inference workflows that are not latency-sensitive)
from snowflake.ml.feature_store.feature_view import OnlineConfig

fs.update_feature_view(
    name="Loan_Feature_View",
    version=VERSION_NUM,
    online_config=OnlineConfig(enable=False)
)

## Dataset Generation from the Feature Store

__`Snowflake Datasets`__ are deeply integrated into the Snowflake ML ecosystem, enabling a seamless __end-to-end model development and MLOps experience__ entirely within Snowflake. Datasets can be generated directly from feature store features using the __`FeatureStore.generate_dataset`__ API.

The `generate_dataset` API __always materializes__ the result, producing an immutable, file-based snapshot of the data. This design:

- Ensures __model reproducibility__

- Enables efficient ingestion for __large-scale or distributed training__

- Decouples model training from upstream feature refresh cycles

Generated Snowflake Datasets can be converted into __Snowpark DataFrames__ and passed to __Snowpark ML modeling APIs__ for training. Trained models can then be logged to the __Snowflake Model Registry__, automatically completing the __ML lineage graph__ that links:

- Source data

- Feature views

- Datasets

- Models

This end-to-end lineage provides __full traceability and governance across the entire ML lifecycle__.
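
When a spine timestamp column is supplied, the feature lookup is, as far as I understand the API, point-in-time correct: each spine row only sees feature values whose timestamp is not later than its own. The sketch below illustrates that idea in plain Python. It is not the `generate_dataset` implementation, and the helper `point_in_time_join` and all sample values are hypothetical.

```python
from datetime import datetime

# Hypothetical feature rows and spine rows
feature_rows = [
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 1, 1), "INCOME": 50_000},
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 1, 5), "INCOME": 55_000},
    {"LOAN_ID": 2, "TIMESTAMP": datetime(2024, 1, 3), "INCOME": 70_000},
]
spine = [
    {"LOAN_ID": 1, "TIMESTAMP": datetime(2024, 1, 4), "LOAN_RESPONSE": 1},
    {"LOAN_ID": 2, "TIMESTAMP": datetime(2024, 1, 2), "LOAN_RESPONSE": 0},
]

def point_in_time_join(spine, feature_rows, key="LOAN_ID", ts="TIMESTAMP"):
    """Attach to each spine row the latest feature values known at the spine timestamp."""
    feature_cols = [c for c in feature_rows[0] if c not in (key, ts)]
    joined = []
    for s in spine:
        # Only feature rows for the same entity that are not in the future
        candidates = [f for f in feature_rows if f[key] == s[key] and f[ts] <= s[ts]]
        latest = max(candidates, key=lambda f: f[ts]) if candidates else None
        values = {c: (latest[c] if latest else None) for c in feature_cols}
        joined.append({**s, **values})
    return joined

joined = point_in_time_join(spine, feature_rows)
for row in joined:
    print(row["LOAN_ID"], row["INCOME"])
```

Loan 1 gets the January 1 income rather than the later January 5 value, and loan 2 gets no value because its only feature row postdates the spine timestamp. Excluding future values this way is what prevents label leakage during training.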

In [None]:
ds = fs.generate_dataset(
    name=f"LOAN_DATASET_EXTENDED_FEATURES_{VERSION_NUM}",
    spine_df=sdf.select("LOAN_ID", "TIMESTAMP", "LOAN_PURPOSE", "LOAN_RESPONSE"), # DataFrame containing entity ID, timestamp, label, and additional columns from source data
    features=[loan_fv],
    spine_timestamp_col="TIMESTAMP",
    spine_label_cols=["LOAN_RESPONSE"]
)

loan_dataset = ds.read.to_snowpark_dataframe() # get a Snowpark DataFrame
loan_dataset.show(5)

In [None]:
loan_dataset.schema

# Dataset preprocessing for ML

In [None]:
import snowflake.ml.modeling.preprocessing as sfml
from snowflake.snowpark.types import StringType

# encode categorical columns to numeric columns
OHE_COLS = loan_dataset.select([col.name for col in loan_dataset.schema if col.datatype==StringType()]).columns

sfml_ohe = sfml.OneHotEncoder(input_cols=OHE_COLS, output_cols=OHE_COLS, drop_input_cols=True)
loan_dataset_ohe = sfml_ohe.fit(loan_dataset).transform(loan_dataset)
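
As a rough illustration of what one-hot encoding does to a string column, the pure-Python sketch below expands one categorical field into one 0/1 column per category. The helper `one_hot` and the category `Home purchase` are hypothetical, and the sketch does not reproduce the Snowflake ML `OneHotEncoder` internals or its exact output naming.

```python
def one_hot(rows, column):
    """Replace `column` with one indicator column per distinct category."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        new_row = {k: v for k, v in row.items() if k != column}
        for category in categories:
            new_row[f"{column}_{category}"] = 1.0 if row[column] == category else 0.0
        encoded.append(new_row)
    return encoded

# Hypothetical sample rows
rows = [
    {"LOAN_ID": 1, "LOAN_PURPOSE": "Refinancing"},
    {"LOAN_ID": 2, "LOAN_PURPOSE": "Home purchase"},
    {"LOAN_ID": 3, "LOAN_PURPOSE": "Refinancing"},
]
encoded = one_hot(rows, "LOAN_PURPOSE")
print(encoded[0])
```

Category values that contain spaces produce column names with spaces, which is why a renaming step usually follows.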

In [None]:
loan_dataset_ohe.columns

In [None]:
# Rename columns to remove embedded double quotes and whitespace characters
rename_dict = {}
for i in loan_dataset_ohe.columns:
    if '"' in i:
        rename_dict[i] = i.replace('"','').replace(' ', '_')

loan_dataset_ohe = loan_dataset_ohe.rename(rename_dict)
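
The renaming logic above can be checked in isolation on a few sample names. The column names below are hypothetical stand-ins for what the encoder might emit; `clean_column_names` is an illustrative helper, not part of the notebook.

```python
def clean_column_names(columns):
    """Map quoted column names to versions without quotes or spaces."""
    return {
        name: name.replace('"', '').replace(' ', '_')
        for name in columns
        if '"' in name
    }

# Hypothetical column names as they might look after one-hot encoding
sample_columns = [
    'LOAN_ID',
    '"LOAN_PURPOSE_Home improvement"',
    '"LOAN_PURPOSE_Refinancing"',
]
rename_map = clean_column_names(sample_columns)
print(rename_map)
```

Only quoted names are remapped; plain identifiers such as `LOAN_ID` are left alone. Once the quotes are gone, Snowflake treats the identifiers as case-insensitive, which is presumably why a later cell can refer to `LOAN_PURPOSE_REFINANCING` in upper case.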


In [None]:
loan_dataset_ohe.select(col("LOAN_PURPOSE_REFINANCING")).distinct()

In [None]:
loan_dataset_ohe.show(3)

# Model Training and Evaluation

Train and evaluate a __baseline loan lending prediction model__ directly within a __`Snowflake Notebook`__ running on the __`Snowflake Container Runtime for ML`__.

Key aspects:

- Model training is executed inside Snowflake, eliminating data movement

- Distributed compute resources are leveraged for scalable experimentation

- A baseline model establishes a performance reference for further tuning and optimization

__Baseline Model__

Use __`Snowflake ML XGBClassifier`__, Snowflake’s native implementation of the __scikit-learn–compatible XGBoost classifier__, which integrates seamlessly with Snowpark DataFrames and the broader Snowflake ML ecosystem. This enables:

- Familiar scikit-learn-style APIs

- Efficient training on Snowflake-managed compute

- Native compatibility with Snowflake Model Registry and experiment tracking

In [None]:
loan_dataset_ohe = loan_dataset_ohe.na.drop()

train, test = loan_dataset_ohe.random_split(weights=[0.80, 0.20], seed=0)

# Split train data into X, y
train_pd = train.to_pandas()
X_train_pd, y_train_pd = train_pd.drop(["TIMESTAMP", "LOAN_ID", "LOAN_RESPONSE"],axis=1), train_pd["LOAN_RESPONSE"]

test_pd = test.to_pandas()
X_test_pd, y_test_pd = test_pd.drop(["TIMESTAMP", "LOAN_ID", "LOAN_RESPONSE"],axis=1), test_pd["LOAN_RESPONSE"]

In [None]:
X_train_pd.shape

In [None]:
X_test_pd.shape

In [None]:
y_train_pd.shape

In [None]:
# Example only: the same baseline trained with open-source XGBoost
import xgboost as xgb


xgb_base = xgb.XGBClassifier(
    max_depth=50,
    n_estimators=3,
    learning_rate = 0.75,
    booster = 'gbtree')

# Train model 
xgb_base.fit(X_train_pd, y_train_pd)

In [None]:
from snowflake.ml.modeling.xgboost import XGBClassifier


xgb_base = XGBClassifier(
    input_cols=X_train_pd.columns,
    label_cols=y_train_pd.name,
    objective='binary:logistic',
    max_depth=50,
    n_estimators=3,
    learning_rate = 0.75,
    booster = 'gbtree')

# Train model 
xgb_base.fit(train_pd)

In [None]:
from sklearn.metrics import roc_auc_score

train_preds_base = xgb_base.predict(X_train_pd)
train_auc_base = roc_auc_score(y_train_pd,train_preds_base["OUTPUT_LOAN_RESPONSE"])
print("Train AUC:", train_auc_base)
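
One caveat worth illustrating: ROC AUC is a ranking metric, so it is most informative when computed from scores or probabilities. If `OUTPUT_LOAN_RESPONSE` holds hard class labels (an assumption about the `predict` output here), the resulting AUC reflects a single decision threshold. The sketch below uses a small hand-written AUC function (`roc_auc`, hypothetical, based on pairwise comparison of positives and negatives) and made-up scores to show the difference.

```python
def roc_auc(y_true, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(y_true, scores) if y == 1]
    negatives = [s for y, s in zip(y_true, scores) if y == 0]
    wins = sum((p > n) + 0.5 * (p == n) for p in positives for n in negatives)
    return wins / (len(positives) * len(negatives))

# Made-up labels and predicted probabilities for illustration
y_true = [0, 0, 1, 1]
probabilities = [0.1, 0.4, 0.45, 0.8]
hard_labels = [int(p >= 0.5) for p in probabilities]

auc_from_probabilities = roc_auc(y_true, probabilities)
auc_from_labels = roc_auc(y_true, hard_labels)
print(auc_from_probabilities, auc_from_labels)
```

Here the probabilities rank every positive above every negative, but thresholding at 0.5 collapses one positive into the negative class and lowers the measured AUC. If the estimator exposes a `predict_proba` method, scoring its positive-class output would be the closer analogue of the first number.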

# Managing and Deploying Models with Snowflake Model Registry

After training a model, operationalizing it and running inference in Snowflake begins by __logging the model to the Snowflake Model Registry__.

The __`Snowflake Model Registry`__ manages machine learning models as __first-class, schema-level objects__, providing end-to-end lifecycle management from development to production.

### Key Capabilities

The Snowflake Model Registry enables you to:

- __Store and manage models__ with versioning, metrics, and rich metadata

- __Serve models and run distributed inference at scale__ using Python, SQL, or REST APIs

- __Manage model lifecycle transitions__ across dev, test, and production environments with flexible governance

- __Monitor model performance and data drift__ using Snowflake ML Observability

- __Secure model access__ using role-based access control (RBAC)

Once a model is logged, its methods can be invoked to perform inference directly within a __`Snowflake virtual warehouse`__ or served via __`Snowpark Container Services (SPCS)`__ for CPU- or GPU-based, low-latency inference.

### Model Registry Python APIs

The primary Python classes used to interact with the Snowflake Model Registry are:

- `snowflake.ml.registry.Registry`: Manages models within a Snowflake schema

- `snowflake.ml.model.Model`: Represents a registered model

- `snowflake.ml.model.ModelVersion`: Represents a specific version of a model



In [None]:
from snowflake.ml.registry import Registry

model_name=f"LOAN_MLOPS_{VERSION_NUM}"

model_registry = Registry(
    session=session,
    database_name=DB,
    schema_name=SCHEMA,
    options={"enable_monitoring": True}
)


In [None]:
# Log a model by calling the registry’s log_model method. 
# log_model method serializes the model — a Python object — and creates a Snowflake model object from it.

base_version_name = 'XGB_BASE'

try:
    mv_base=model_registry.get_model(model_name).version(base_version_name)
    print("Found existing model version")
except Exception:
    print("Logging new model version...")
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=xgb_base,
        version_name=base_version_name,   
        sample_input_data=train.drop(["TIMESTAMP", "LOAN_ID", "LOAN_RESPONSE"]).limit(100), # using snowpark df to maintain lineage
        comment = """ML model for predicting loan approval likelihood.
                    This model was trained using XGBoost classifier.
                    Hyperparameters used were:
                    max_depth=50, n_estimators=3, learning_rate = 0.75, booster = gbtree.
                    """,
        target_platforms= ["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
        options= {"enable_explainability": True}
    )

    mv_base.set_metric(metric_name="Train_ROC_AUC_Score", value=train_auc_base)

In [None]:
model_registry.show_models()

In [None]:
model_registry.get_model(model_name).show_versions()

In [None]:
print(mv_base.show_metrics())

In [None]:
mv_base.show_functions()

In [None]:
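# Create the tag used to mark the PROD model version
# (session.sql is lazy, so collect() is needed to actually run the statement)
session.sql("CREATE OR REPLACE TAG PROD").collect()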



In [None]:
# apply prod tag
m=model_registry.get_model(model_name)
m.comment="Loan approval prediction models" # set model level comment
m.set_tag("PROD", base_version_name)

In [None]:
m.show_tags()

A __`ModelVersion`__ object represents a specific version of a registered model that can be executed. Its `run` method is used to invoke the model for inference, either in a Snowflake virtual warehouse or via a Snowpark Container Service, depending on the deployment configuration.

In [None]:
test_preds=mv_base.run(test, function_name="predict")

In [None]:
test_preds.columns

In [None]:
test_preds_pd = test_preds.select(["LOAN_RESPONSE", "OUTPUT_LOAN_RESPONSE"]).to_pandas()
test_auc = roc_auc_score(test_preds_pd["LOAN_RESPONSE"], test_preds_pd["OUTPUT_LOAN_RESPONSE"])
print("Test AUC:", test_auc)

mv_base.set_metric(metric_name="Test_ROC_AUC_Score", value=test_auc)

In [None]:
print(mv_base.show_metrics())

# Example output: {'Train_ROC_AUC_Score': 0.8255075926591446, 'Test_ROC_AUC_Score': 0.5966326378748072}; the large train/test gap indicates overfitting

# Hyperparameter Optimization (HPO) and Experiment Tracking

The __`Snowflake ML Hyperparameter Optimization (HPO) API`__ is a model-agnostic framework that enables efficient, parallelized hyperparameter tuning for machine learning models. `HPO runs` can be executed in a __Snowflake Notebook__ configured with the __Container Runtime on Snowpark Container Services (SPCS)__ and scaled across multiple compute nodes.

## Benefits of Snowflake HPO

- __Automatic distributed training__: Handles parallelization and resource allocation transparently

- __Framework-agnostic__: Works with Snowflake ML APIs or any open-source ML framework

- __Flexible search strategies__: Supports Bayesian optimization, random search, and both continuous and discrete sampling

- __Seamless Snowflake integration__: Efficient data ingestion via Snowflake Datasets or Snowpark DataFrames, with automatic ML lineage capture

## Typical HPO Workflow

1. __Ingest data__ for training

2. __Define the search algorithm__ to guide hyperparameter optimization

3. __Specify hyperparameter sampling strategies__

4. __Configure the tuner__ with resources and constraints

5. __Run training jobs__ for each hyperparameter combination

6. __Collect metrics and hyperparameters__ from each training job

7. __Analyze results__ to identify the best-performing configuration

## Experiment Tracking with Snowflake ML

__`Experiments`__ provide a structured way to organize and evaluate model training results. They allow you to:

- Compare outcomes of hyperparameter adjustments, different metrics, or model types

- Track runs, each containing metadata and artifacts such as hyperparameters, metrics, and training logs

- Select the __best-performing model__ in a reproducible, governed workflow

In [None]:
from snowflake.ml.data import DataConnector
from snowflake.ml.modeling.tune import get_tuner_context
from snowflake.ml.modeling import tune
from entities import search_algorithm
import psutil
from snowflake.ml.experiment.experiment_tracking import ExperimentTracking


## HPO Data Ingestion

Before running hyperparameter optimization, the train and test datasets are ingested into the HPO workflow. This is done using the __`dataset_map`__ object, which is a dictionary mapping each dataset (full, features, labels) to its corresponding Snowflake __`DataConnector`__ object, enabling seamless integration with Snowflake ML.

In [None]:
# Define dataset map
X_train = train.drop("LOAN_RESPONSE", "TIMESTAMP", "LOAN_ID")
y_train = train.select("LOAN_RESPONSE")
X_test = test.drop("LOAN_RESPONSE","TIMESTAMP", "LOAN_ID")
y_test = test.select("LOAN_RESPONSE")

dataset_map = {
    "train": DataConnector.from_dataframe(train), 
    "X_train": DataConnector.from_dataframe(X_train), 
    "y_train": DataConnector.from_dataframe(y_train),
    "test": DataConnector.from_dataframe(test),
    "X_test": DataConnector.from_dataframe(X_test), 
    "y_test": DataConnector.from_dataframe(y_test)
    }


The __`dataset_map`__ is then passed to the __`HPO train function`__, giving each trial access to the required datasets for __training, validation, inference, and metric computation__. This approach ensures that data is __organized, reproducible, and fully compatible__ with the Snowflake ML HPO workflow.

## HPO Train Function

The __`HPO train function`__ orchestrates a single trial of the hyperparameter optimization workflow. It leverages the __`TunerContext`__ object to access:

- `Hyperparameters` for the current trial

- `Dataset mapping` (`dataset_map`) for training, validation, and testing

### Workflow Details:

1. __Dataset Preparation__:

The dataset map separates training and test splits into features and labels, wrapped as DataConnector objects for seamless Snowflake ML integration. These datasets are converted to Pandas DataFrames for model training and evaluation.

2. __Experiment Tracking__:

Each trial is logged using __`ExperimentTracking`__, capturing hyperparameters, metrics, and artifacts.

3. __Model Instantiation and Training__:

    - Hyperparameters from `TunerContext` are applied to an `XGBClassifier`

    - The model is trained on the full training dataset

4. __Inference and Metrics Computation__:

    - Predictions are generated on both training and test sets

    - Metrics such as __ROC AUC__ are computed for evaluation

5. __Logging and Reporting__:

    - Hyperparameters and metrics are logged to __`Snowflake ML Experiment Tracking`__

    - Metrics and the trained model are reported back to __TunerContext__, enabling automated selection of the best-performing trial

This design ensures that each HPO trial is __reproducible, fully tracked, and seamlessly integrated__ into the Snowflake ML workflow, from dataset ingestion through model training, evaluation, and experiment logging.

In [None]:
def train_func():

    local_session = get_active_session()
    exp = ExperimentTracking(session=local_session)
    
    exp.set_experiment("MLOPS_HPO_Experiments")
    
    with exp.start_run():
        # A context object provided by HPO API to expose data for the current HPO trial        
        tuner_context = get_tuner_context()
        
        # Generate params
        config = tuner_context.get_hyper_params()        
        dm = tuner_context.get_dataset_map()
    
        # Log params to experiment tracking
        exp.log_params(config)

        train_dm_pd = dm["train"].to_pandas().sort_index()
        X_train_dm_pd = dm["X_train"].to_pandas().sort_index()
        y_train_dm_pd = dm["y_train"].to_pandas().sort_index()        
        test_dm_pd = dm["test"].to_pandas().sort_index()
        X_test_dm_pd = dm["X_test"].to_pandas().sort_index()
        y_test_dm_pd = dm["y_test"].to_pandas().sort_index()
       
        
        # Instantiate model with generated params
        model = XGBClassifier(input_cols=X_train_dm_pd.columns,
                              label_cols=y_train_dm_pd.columns,
                              **config, random_state=42)
            
    
        # Train model, get predictions
        model.fit(train_dm_pd)
        
        # Run inference on the training set
        hpo_train_preds = model.predict(train_dm_pd)

        # Run inference on the test set
        hpo_test_preds = model.predict(test_dm_pd)
           
        # Compute metrics 
        hpo_auc_train = roc_auc_score(hpo_train_preds["LOAN_RESPONSE"], hpo_train_preds["OUTPUT_LOAN_RESPONSE"])

        hpo_auc_test = roc_auc_score(hpo_test_preds["LOAN_RESPONSE"], hpo_test_preds["OUTPUT_LOAN_RESPONSE"])
                        
        metrics_to_log = {"HPO_ROC_AUC_Train": hpo_auc_train,
                         "HPO_ROC_AUC_Test": hpo_auc_test,
                         }
    
        # Log metrics to experiment tracking and tuner context 
        exp.log_metrics(metrics_to_log)
    
        tuner_context.report(metrics=metrics_to_log, model=model)
       


## Creating the HPO Tuner

The __`Tuner`__ object orchestrates hyperparameter optimization by combining the __training function, search space, and tuner configuration__.

__`Search Space`__

The search space defines __how hyperparameters are sampled__ during each trial, specifying the range and type of values. Snowflake HPO provides several sampling functions:

- `tune.uniform(lower, upper)` – Samples a __continuous value__ uniformly between __lower__ and __upper__. Ideal for parameters like dropout rates or regularization strengths.

- `tune.loguniform(lower, upper)` – Samples a value in __logarithmic space__, suitable for parameters spanning several orders of magnitude, e.g., learning rates.

- `tune.randint(lower, upper)` – Samples an __integer__ uniformly between __lower__ (inclusive) and __upper__ (exclusive). Useful for discrete parameters like the number of layers.
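
To show what these three sampling behaviors mean numerically, here is a standard-library sketch. The helpers `sample_uniform`, `sample_loguniform`, and `sample_randint` are hypothetical stand-ins written for this illustration; they are not the Snowflake `tune` functions, and the parameter ranges are arbitrary examples.

```python
import math
import random

def sample_uniform(rng, lower, upper):
    """Continuous value drawn uniformly between lower and upper."""
    return rng.uniform(lower, upper)

def sample_loguniform(rng, lower, upper):
    """Value drawn uniformly in log space, so each order of magnitude is equally likely."""
    return math.exp(rng.uniform(math.log(lower), math.log(upper)))

def sample_randint(rng, lower, upper):
    """Integer drawn uniformly from lower (inclusive) to upper (exclusive)."""
    return rng.randrange(lower, upper)

rng = random.Random(0)  # fixed seed so repeated runs draw the same values
trial_params = {
    "subsample": sample_uniform(rng, 0.5, 1.0),
    "learning_rate": sample_loguniform(rng, 1e-3, 1e-1),
    "max_depth": sample_randint(rng, 3, 10),
}
print(trial_params)
```

Sampling the learning rate in log space gives values near 0.001 the same chance as values near 0.1, whereas a plain uniform draw over that range would land above 0.01 about 90% of the time.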

__`Tuner Configuration`__

The __`TunerConfig`__ object defines how the HPO process runs, including:

- __Metric__: The performance metric to optimize (e.g., __roc_auc_score__, __precision__, __recall__, __F1__, or loss)

- __Mode__: Whether the metric should be __maximized__ or __minimized__ ("max" or "min")

- __Search Algorithm__: The strategy for exploring the hyperparameter space. Options include:

    - __RandomSearch__ – Randomly samples hyperparameters

    - __GridSearch__ – Evaluates every possible combination in a defined grid

    - __BayesOpt__ – Uses Bayesian optimization to intelligently select hyperparameters based on previous trial results

- __Number of Trials__: Total number of hyperparameter configurations to evaluate

- __Concurrency__: Number of trials that can run simultaneously

Once the search space and configuration are defined, the `Tuner` object can execute the HPO workflow, running trials in parallel, evaluating metrics, and reporting results back to `TunerContext` and `Experiment Tracking`.

In [None]:
search_space = {
    "max_depth": [10, 20],
    "learning_rate": [0.05, 0.1],
    "n_estimators": [500, 1000],    
       
    "reg_lambda": [0.5], # L2
    "scale_pos_weight": [0.5]
}
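
With list-valued entries like these, a grid search evaluates every combination. The sketch below expands the same search space with `itertools.product` to show how many trials that implies; `expand_grid` is a hypothetical helper, not part of the Snowflake API.

```python
from itertools import product

search_space = {
    "max_depth": [10, 20],
    "learning_rate": [0.05, 0.1],
    "n_estimators": [500, 1000],
    "reg_lambda": [0.5],
    "scale_pos_weight": [0.5],
}

def expand_grid(space):
    """Return one dict per combination of the listed hyperparameter values."""
    names = list(space)
    return [
        dict(zip(names, values))
        for values in product(*(space[name] for name in names))
    ]

grid = expand_grid(search_space)
print(len(grid))
print(grid[0])
```

The grid has 2 x 2 x 2 x 1 x 1 = 8 combinations, so a trial budget of 8 covers it exactly once.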

In [None]:
tuner = tune.Tuner(
    train_func=train_func,
    search_space=search_space,
    tuner_config=tune.TunerConfig(
        metric="HPO_ROC_AUC_Test",
        mode="max",
        search_alg=search_algorithm.GridSearch(),
        num_trials=8,  # one trial per grid combination (2 x 2 x 2 x 1 x 1)
        max_concurrent_trials=psutil.cpu_count(logical=False)  # one concurrent trial per physical CPU core; GPUs can also be used here
    ),
)

In [None]:
# train several model candidates 
tuner_results = tuner.run(dataset_map=dataset_map)
tuner_results.results


In [None]:
from snowflake.ml.runtime_cluster import get_ray_dashboard_url
import streamlit as st

st.write('Open the link below to view HPO job progress on the Ray dashboard of the distributed compute cluster:')
st.write('https://'+ get_ray_dashboard_url())

In [None]:
tuned_model = tuner_results.best_model


In [None]:
opt_preds_train = tuned_model.predict(X_train_pd)

train_auc_opt = roc_auc_score(y_train_pd,opt_preds_train["OUTPUT_LOAN_RESPONSE"])
print("Train AUC OPT:", train_auc_opt)

opt_preds_test = tuned_model.predict(X_test_pd)

test_auc_opt = roc_auc_score(y_test_pd,opt_preds_test["OUTPUT_LOAN_RESPONSE"])
print("Test AUC OPT:", test_auc_opt)

# Train AUC OPT: 0.743872725725519  
# Test AUC OPT: 0.6448419192348257

In [None]:
# Log the optimized model to the model registry
optimized_version_name = 'XGB_Optimized'

try:
    mv_opt = model_registry.get_model(model_name).version(optimized_version_name)
    print("Found existing model version!")
except Exception:  # Version lookup failed, so log a new version
    print("Logging new model version...")
    mv_opt = model_registry.log_model(
        model_name=model_name,
        model=tuned_model, 
        version_name=optimized_version_name,
        sample_input_data=train.drop(["TIMESTAMP", "LOAN_ID", "LOAN_RESPONSE"]).limit(100),
        comment = "Snowflake ML model built off the Feature Store and tuned with HPO",
        target_platforms= ["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
        options= {"enable_explainability": True}
    )

    mv_opt.set_metric(metric_name="Train_ROC_AUC_Score", value=train_auc_opt)
    mv_opt.set_metric(metric_name="Test_ROC_AUC_Score", value=test_auc_opt)

In [None]:
# Set the optimized model to be the default model version
model_registry.get_model(model_name).default = optimized_version_name

# Update the PROD tagged model to be the optimized model version
m.unset_tag("PROD")
m.set_tag("PROD", optimized_version_name)

# Model Explainability

During training, machine learning models learn relationships between inputs and outputs without these relationships being explicitly stated. When a model underperforms, understanding __why__ it behaves a certain way can be challenging. In __regulated industries__ like __finance__ or __healthcare__, explainability is critical to demonstrate that models are producing correct results for the right reasons.

## Shapley Values in Snowflake Model Registry

Snowflake Model Registry provides __model explainability__ using __Shapley values__, a method to attribute a model’s output to its input features. Key aspects:

- __Fair feature attribution__: Shapley values consider all possible combinations of input features to measure the average marginal contribution of each feature to a prediction.

- __Insight for interpretability__: Helps debug models and understand why predictions differ across inputs.

- __Deviation from background data__: Shapley values report how predictions deviate from an average baseline, calculated using a representative sample of the dataset.

## Background Data

- The background data provides the baseline for calculating Shapley values. Consistency is critical when comparing Shapley values across datasets.

- Some tree-based models may encode background data internally, but most models benefit from explicit background data for accurate explanations.

- Snowflake allows up to __1,000 rows__ of background data to be provided when logging a model via the __`sample_input_data`__ parameter or explicitly during analysis.

## How It Works

- Input features are systematically perturbed and replaced with background data to compute the contribution of each feature.

- Positive or negative Shapley values indicate whether a feature __increases__ or __decreases__ the predicted outcome relative to the baseline.

By integrating Shapley-based explainability, Snowflake helps make models more transparent and interpretable, supporting both debugging and regulatory compliance.
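The perturbation idea described above can be sketched with an exact Shapley computation on a toy model. This is a minimal illustration, not how the Model Registry computes explanations: it assumes a single background row (real implementations average over a background sample and use approximations), and `toy_model`, its coefficients, and the feature values are hypothetical.

```python
from itertools import combinations
from math import factorial

def shapley_values(predict, x, background):
    """Exact Shapley values of predict at x, relative to one background row."""
    n = len(x)

    def value(coalition):
        # Features in the coalition keep their real value; the rest are
        # replaced with the background value.
        mixed = [x[i] if i in coalition else background[i] for i in range(n)]
        return predict(mixed)

    contributions = []
    for i in range(n):
        others = [j for j in range(n) if j != i]
        phi = 0.0
        for size in range(n):
            # Weight of a coalition of this size in the Shapley average.
            weight = factorial(size) * factorial(n - size - 1) / factorial(n)
            for subset in combinations(others, size):
                members = set(subset)
                phi += weight * (value(members | {i}) - value(members))
        contributions.append(phi)
    return contributions

def toy_model(features):
    # Hypothetical score with an income x home-purchase interaction.
    income, loan_amount, is_home_purchase = features
    return 0.3 * income - 0.2 * loan_amount + 0.1 * income * is_home_purchase

x = [4.0, 2.0, 1.0]           # row to explain (arbitrary units)
background = [3.0, 1.0, 0.0]  # baseline row
phis = shapley_values(toy_model, x, background)
print(dict(zip(["INCOME", "LOAN_AMOUNT", "HOME_PURCHASE"], phis)))
```

The contributions sum to the difference between the prediction for `x` and the prediction for the background row, and the negative value for the loan amount shows a feature that lowers the score relative to the baseline.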

In [None]:
# Create a sample of records for explanation
test_pd_sample=test_pd.rename(columns=rename_dict).sample(n=2500, random_state = 100).reset_index(drop=True)

# Compute and retrieve shapley values for each model
base_shap_pd = mv_base.run(test_pd_sample, function_name="explain")
opt_shap_pd = mv_opt.run(test_pd_sample, function_name="explain")

In [None]:
base_shap_pd.columns

In [None]:
# Snowflake's built-in Python visualization functions for interpreting a model's explainability values
from snowflake.ml.monitoring import explain_visualize

feat_df=test_pd_sample.drop(["LOAN_RESPONSE","TIMESTAMP", "LOAN_ID"],axis=1)

explain_visualize.plot_influence_sensitivity(base_shap_pd, feat_df, figsize=(1500, 500))



In [None]:
# shap visualizations
import shap 
import numpy as np

# Summary plot for base model
shap.summary_plot(np.array(base_shap_pd.astype(float)), 
                  test_pd_sample.drop(["LOAN_ID","LOAN_RESPONSE", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["LOAN_ID","LOAN_RESPONSE", "TIMESTAMP"], axis=1).columns)

# Summary plot for optimized model
shap.summary_plot(np.array(opt_shap_pd.astype(float)), 
                  test_pd_sample.drop(["LOAN_ID","LOAN_RESPONSE", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["LOAN_ID","LOAN_RESPONSE", "TIMESTAMP"], axis=1).columns)


In [None]:
# Merge shap vals and actual vals together for easier plotting
all_shap_base = test_pd_sample.merge(base_shap_pd, right_index=True, left_index=True, how='outer')
all_shap_opt = test_pd_sample.merge(opt_shap_pd, right_index=True, left_index=True, how='outer')



In [None]:
all_shap_base = all_shap_base.rename(
    columns=lambda c: c.strip('"')
)

all_shap_opt = all_shap_opt.rename(
    columns=lambda c: c.strip('"')
)

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

# Filter data down to strip outliers
asb_filtered = all_shap_base[(all_shap_base.INCOME>0) & (all_shap_base.INCOME<250000)]
aso_filtered = all_shap_opt[(all_shap_opt.INCOME>0) & (all_shap_opt.INCOME<250000)]

# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("INCOME EXPLANATION")
# Plot side-by-side scatterplots with a fitted trend line
sns.scatterplot(data = asb_filtered, x ='INCOME', y = 'INCOME_explanation', ax=axes[0])
sns.regplot(data = asb_filtered, x ="INCOME", y = 'INCOME_explanation', scatter=False, color='red', line_kws={"lw":2},ci =100, lowess=False, ax =axes[0])

axes[0].set_title('Base Model')
sns.scatterplot(data = aso_filtered, x ='INCOME', y = 'INCOME_explanation',color = "orange", ax = axes[1])
sns.regplot(data = aso_filtered, x ="INCOME", y = 'INCOME_explanation', scatter=False, color='blue', line_kws={"lw":2},ci =100, lowess=False, ax =axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Income")
    ax.set_ylabel("Influence")
plt.tight_layout()
plt.show()

In [None]:
# Filter data down to strip outliers
asb_filtered = all_shap_base[all_shap_base.LOAN_AMOUNT<2000000]
aso_filtered = all_shap_opt[all_shap_opt.LOAN_AMOUNT<2000000]


# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("LOAN_AMOUNT EXPLANATION")
# Plot side-by-side scatterplots with a LOWESS trend line
sns.scatterplot(data = asb_filtered, x ='LOAN_AMOUNT', y = 'LOAN_AMOUNT_explanation', ax=axes[0])
sns.regplot(data = asb_filtered, x ="LOAN_AMOUNT", y = 'LOAN_AMOUNT_explanation', scatter=False, color='red', line_kws={"lw":2},ci =100, lowess=True, ax =axes[0])
axes[0].set_title('Base Model')

sns.scatterplot(data = aso_filtered, x ='LOAN_AMOUNT', y = 'LOAN_AMOUNT_explanation',color = "orange", ax = axes[1])
sns.regplot(data = aso_filtered, x ="LOAN_AMOUNT", y = 'LOAN_AMOUNT_explanation', scatter=False, color='blue', line_kws={"lw":2},ci =100, lowess=True, ax =axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("LOAN_AMOUNT")
    ax.set_ylabel("Influence")
    # ax.set_xlim((0,10000))
plt.tight_layout()
plt.show()

In [None]:
# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("HOME PURCHASE LOAN EXPLANATION")
# Plot side-by-side boxplots
sns.boxplot(data = all_shap_base, x ='LOAN_PURPOSE_HOME_PURCHASE', y = 'LOAN_PURPOSE_HOME_PURCHASE_explanation',
            hue='LOAN_PURPOSE_HOME_PURCHASE', width=0.8, ax=axes[0])
axes[0].set_title('Base Model')
sns.boxplot(data = all_shap_opt, x ='LOAN_PURPOSE_HOME_PURCHASE', y = 'LOAN_PURPOSE_HOME_PURCHASE_explanation',
            hue='LOAN_PURPOSE_HOME_PURCHASE', width=0.4, ax = axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Home Purchase Loan (1 = True)")
    ax.set_ylabel("Influence")
    ax.legend(loc='upper right')

plt.show()

# Model Inference on Warehouse using Registered Model and Snowflake Python Stored Procedure 

Models registered in the __Snowflake Model Registry__ can be executed directly within a __Snowflake virtual warehouse__ (default). This method is ideal for small-to-medium CPU-only models, whose dependencies are available via the Snowflake Conda channel. The method requires no container setup and is best for testing or moderate-sized datasets.

For __warehouse-based inference__, the `ModelVersion` object (`mv`) is retrieved from the Model Registry and its `run` method is invoked to make predictions with the following parameters:
- `X` - the input inference data provided as a __Snowpark__ or __pandas DataFrame__.
- `function_name` - the function of the model to call (e.g., `predict`)

`run` returns a DataFrame of the same type as the input, containing model predictions.

For __automated, table-based batch inference__, model predictions obtained via `mv.run` are wrapped in a __Snowflake Python stored procedure__, enabling inference on entire tables and saving results back to Snowflake.

__Key features__:
- Automates __batch inference__ on production-scale tables.
- Uses the __registered model version__ from the Snowflake Model Registry.
- Predictions are joined back to the input table and saved in Snowflake.
- Supports __schema evolution__ and reusable calls from SQL or Python.

In [None]:
train.write.save_as_table(f"LOANS_TRAIN_{VERSION_NUM}", mode="overwrite")
test.write.save_as_table(f"LOANS_TEST_{VERSION_NUM}", mode="overwrite")

In [None]:
SHOW TABLES;

In [None]:
from snowflake import snowpark
from snowflake.snowpark.types import StringType
from snowflake.ml.registry import Registry


def inference_sproc(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:

    reg = Registry(session=session)
    m = reg.get_model(modelname)  # Fetch the model using the registry
    mv = m.version(modelversion)
    
    input_table_name=table_name
    pred_col = f'{modelversion}_PREDICTION'

    # Read the input table to a dataframe
    sdf = session.table(input_table_name)
    results = mv.run(sdf, function_name="predict") \
        .select("LOAN_ID", "OUTPUT_LOAN_RESPONSE") \
        .withColumnRenamed("OUTPUT_LOAN_RESPONSE", pred_col)
    # 'results' is the output DataFrame with predictions

    final = sdf.join(results, on="LOAN_ID", how="full")
    
    # Write results back to Snowflake table
    final.write.save_as_table(table_name, mode='overwrite', enable_schema_evolution=True)

    return "Success"

# Register the stored procedure
session.sproc.register(
    func=inference_sproc,
    name="inference_sproc",
    replace=True,
    is_permanent=True,
    stage_location="@MLOPS_STAGE",
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    return_type=StringType()
)

In [None]:
CALL inference_sproc('LOANS_TRAIN_{{VERSION_NUM}}','{{model_name}}', '{{base_version_name}}');

In [None]:
CALL inference_sproc('LOANS_TEST_{{VERSION_NUM}}','{{model_name}}', '{{base_version_name}}');


In [None]:
CALL inference_sproc('LOANS_TRAIN_{{VERSION_NUM}}','{{model_name}}', '{{optimized_version_name}}');

In [None]:
CALL inference_sproc('LOANS_TEST_{{VERSION_NUM}}','{{model_name}}', '{{optimized_version_name}}');

In [None]:
SELECT TIMESTAMP, LOAN_ID, INCOME, LOAN_AMOUNT, XGB_BASE_PREDICTION, XGB_OPTIMIZED_PREDICTION, LOAN_RESPONSE 
FROM LOANS_TEST_{{VERSION_NUM}} 
LIMIT 10;

# ML Observability:  Monitoring model behavior over time

`ML Observability` in Snowflake enables tracking the behavior and quality of production models deployed via the Snowflake Model Registry. Monitoring models over time makes it possible to detect issues early, understand performance trends, and ensure reliable predictions.

## Key capabilities:
- __Performance Monitoring__: Track metrics such as accuracy, ROC AUC, precision, recall, or custom metrics over time using stored inference data.
- __Drift Detection__: Detect changes in input feature distributions or target labels to identify __data drift__ or __concept drift__, which can impact model performance.
- __Volume Monitoring__: Observe the number of predictions over time to detect operational anomalies or changes in usage patterns.
- __Segmented Monitoring__: Analyze model performance across different __data segments__, such as categories in string columns, to ensure consistent behavior for all groups.

## How It Works:
- ML Observability uses __stored inference data__ from deployed models.
- Metrics and statistics can be automatically calculated and visualized over time.
- Alerts or dashboards can be set up to flag deviations or unexpected patterns.

By leveraging ML Observability, organizations gain __continuous insight__ into model health, enabling proactive maintenance, timely retraining, and compliance with regulatory standards.
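As a rough illustration of volume and segmented performance monitoring, the sketch below aggregates stored inference rows per day and per segment. It uses plain Python rather than Snowflake's monitoring functions; the row layout, the sample values, and the helper name `daily_segment_metrics` are assumptions made for the example.

```python
from collections import defaultdict

# Hypothetical stored inference rows: (day, segment, prediction, actual).
rows = [
    ("2025-01-01", "HOME_PURCHASE", 1, 1),
    ("2025-01-01", "HOME_PURCHASE", 0, 1),
    ("2025-01-01", "REFINANCING", 0, 0),
    ("2025-01-02", "HOME_PURCHASE", 1, 1),
]

def daily_segment_metrics(rows):
    """Return prediction volume and accuracy for each (day, segment) pair."""
    counts = defaultdict(lambda: [0, 0])  # (day, segment) -> [volume, correct]
    for day, segment, prediction, actual in rows:
        bucket = counts[(day, segment)]
        bucket[0] += 1
        bucket[1] += int(prediction == actual)
    return {
        key: {"volume": volume, "accuracy": correct / volume}
        for key, (volume, correct) in sorted(counts.items())
    }

metrics = daily_segment_metrics(rows)
for key, stats in metrics.items():
    print(key, stats)
```

A sudden drop in volume for one day, or an accuracy gap between segments, is the kind of pattern such an aggregation is meant to surface.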

In [None]:
-- create segments

ALTER TABLE LOANS_TRAIN_{{VERSION_NUM}}
ADD COLUMN IF NOT EXISTS LOAN_PURPOSE VARCHAR(50);

UPDATE LOANS_TRAIN_{{VERSION_NUM}}
SET LOAN_PURPOSE = CASE
    WHEN LOAN_PURPOSE_HOME_IMPROVEMENT = 1 THEN 'HOME_IMPROVEMENT'
    WHEN LOAN_PURPOSE_HOME_PURCHASE = 1 THEN 'HOME_PURCHASE'
    WHEN LOAN_PURPOSE_REFINANCING = 1 THEN 'REFINANCING'
    ELSE 'OTHER'
END;

ALTER TABLE LOANS_TEST_{{VERSION_NUM}}
ADD COLUMN IF NOT EXISTS LOAN_PURPOSE VARCHAR(50);

UPDATE LOANS_TEST_{{VERSION_NUM}}
SET LOAN_PURPOSE = CASE
    WHEN LOAN_PURPOSE_HOME_IMPROVEMENT = 1 THEN 'HOME_IMPROVEMENT'
    WHEN LOAN_PURPOSE_HOME_PURCHASE = 1 THEN 'HOME_PURCHASE'
    WHEN LOAN_PURPOSE_REFINANCING = 1 THEN 'REFINANCING'
    ELSE 'OTHER'
END;

SELECT LOAN_PURPOSE_HOME_PURCHASE, LOAN_PURPOSE_HOME_IMPROVEMENT, LOAN_PURPOSE_REFINANCING, LOAN_PURPOSE FROM LOANS_TEST_{{VERSION_NUM}} limit 10;

# Model Monitor

A __`Model Monitor`__ in Snowflake is a __first-class object__ created for each model version that requires ongoing monitoring. It enables __continuous tracking of model behavior and performance__ using __stored inference data__.

## Key Attributes of a Model Monitor 

Each monitor encapsulates configuration and metadata required to track a specific model version:

- `Model & Model Version`: the model and its specific version to monitor.

- `Function Name`: the specific method in the model version to monitor (e.g., predict).

- `Source table`: the Snowflake table or view containing the inference data (features, predictions, and actual labels) that the monitor queries.

- `Warehouse`: The Snowflake warehouse used for the monitor’s internal compute operations.

- `Refresh interval`: How frequently the monitor updates its internal state (minimum: 60 seconds). 

- `Aggregation Window`: The minimum time granularity for storing monitoring data (minimum: 1 day).

- `Timestamp Column`: The column of type TIMESTAMP_NTZ in the source data containing timestamps.

- `Baseline Table (Optional)`: A snapshot of historical or reference data used for computing drift and comparative metrics. Required to detect drift.

- `ID Columns (Optional)`: Array of column names that uniquely identify each row in the source data.

- `Prediction Class Columns (Optional)`: Columns representing model predictions. For binary classification or regression tasks, columns must be of type NUMBER. For multi-class classification, must be STRING.

- `Actual Class Columns (Optional)`: Columns containing true target values from the source data.

- `Segment Columns (Optional)`: Columns in source data used to segment data for performance monitoring (must be STRING). 

## How It Works:

- The monitor __automatically refreshes its logs__ by querying source data at the specified refresh interval.

- Monitoring reports are updated based on the aggregated logs.

- Supports tracking __performance metrics, drift, and segment-level insights__.

Using a Model Monitor ensures that the deployed models are __continuously observed__, enabling proactive alerts, early detection of drift, and reliable model operations over time.

In [None]:
CREATE OR REPLACE MODEL MONITOR LOANS_BASE_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{base_version_name}}
    FUNCTION=predict
    SOURCE=LOANS_TEST_{{VERSION_NUM}}
    BASELINE=LOANS_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_BASE_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(LOAN_RESPONSE)
    ID_COLUMNS=(LOAN_ID)
    SEGMENT_COLUMNS = (LOAN_PURPOSE)
    WAREHOUSE={{COMPUTE_WAREHOUSE}}
    REFRESH_INTERVAL='12 hours'
    AGGREGATION_WINDOW='1 day';

In [None]:
CREATE OR REPLACE MODEL MONITOR LOANS_OPTIMIZED_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{optimized_version_name}}
    FUNCTION=predict
    SOURCE=LOANS_TEST_{{VERSION_NUM}}
    BASELINE=LOANS_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_OPTIMIZED_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(LOAN_RESPONSE)
    ID_COLUMNS=(LOAN_ID)
    SEGMENT_COLUMNS = (LOAN_PURPOSE)
    WAREHOUSE={{COMPUTE_WAREHOUSE}}
    REFRESH_INTERVAL='12 hours'
    AGGREGATION_WINDOW='1 day';

In [None]:
import streamlit as st

st.write('Click the link below to visualize monitored model reports!')
st.write(f'https://app.snowflake.com/{ORG_NAME}/{ACCOUNT_NAME}/#/data/databases/{DB}/schemas/{SCHEMA}/model/{model_name.upper()}')


## Querying Model Monitoring Results

Each __Model Monitor__ automatically computes and tracks a variety of metrics for the monitored model version. These metrics help understand model behavior, detect drift, and ensure consistent performance over time.

### Types of Metrics

- __`Drift Metrics`__: Track changes in input feature distributions or target distributions over time. Detects __data drift__ or __concept drift__.

- __`Performance Metrics`__: Track changes in model performance, such as ROC AUC, accuracy, or other user-defined metrics.

- __`Statistical Metrics`__: Basic statistics on the source data, such as row counts, null values, or distribution summaries.

### Querying Metrics

Snowflake provides dedicated functions to query metrics for a model monitor:

| Metric Type         | Query Function                     | Description                                                                  |
| ------------------- | ---------------------------------- | ---------------------------------------------------------------------------- |
| Drift Metrics       | `MODEL_MONITOR_DRIFT_METRIC`       | Returns metrics on distribution or statistical shifts in monitored features. |
| Performance Metrics | `MODEL_MONITOR_PERFORMANCE_METRIC` | Returns metrics tracking model prediction quality over time.                 |
| Statistical Metrics | `MODEL_MONITOR_STAT_METRIC`        | Returns statistics on data quality, counts, or null values.                  |

These functions make it possible to __analyze, visualize, and alert__ on __model behavior trends__ directly within Snowflake.
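To give a feel for what a drift metric measures, the sketch below computes a simple difference of means between a baseline sample and each daily window of new values. It is a plain-Python approximation under stated assumptions: the exact formula Snowflake uses for its drift metrics is not taken from this notebook, and the data and the helper name `difference_of_means_by_day` are hypothetical.

```python
from collections import defaultdict
from statistics import mean

def difference_of_means_by_day(baseline, records):
    """Return {day: mean(values that day) - mean(baseline)}."""
    baseline_mean = mean(baseline)
    by_day = defaultdict(list)
    for day, value in records:
        by_day[day].append(value)
    return {day: mean(values) - baseline_mean
            for day, values in sorted(by_day.items())}

# Hypothetical prediction scores: baseline sample and dated new values.
baseline_scores = [0.2, 0.4, 0.3]
new_scores = [
    ("2025-01-01", 0.5),
    ("2025-01-01", 0.7),
    ("2025-01-02", 0.3),
]

drift = difference_of_means_by_day(baseline_scores, new_scores)
for day, delta in drift.items():
    print(day, round(delta, 3))
```

Values near zero suggest the daily distribution is centered close to the baseline, while a large positive or negative value flags a shift worth investigating.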

In [None]:
SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC (
    'LOANS_BASE_MODEL_MONITOR', -- model monitor to use
    'DIFFERENCE_OF_MEANS', -- metric for computing drift
    'XGB_BASE_PREDICTION', -- column to compute drift on
    '1 DAY',  -- granularity for drift computation
    DATEADD(DAY, -90, CURRENT_DATE()), -- start date
    DATEADD(DAY, -60, CURRENT_DATE()) -- end date
    )
);

# Model Serving: Running Inference in Snowpark Container Services (SPCS)

The __Snowflake Model Registry__ allows models to run either:

1. Directly in a __warehouse__ (default, CPU-only, small-to-medium datasets).
2. In a `Snowpark Container Services (SPCS) compute pool` via `Model Serving` for scalable, low-latency inference.

## How SPCS Model Serving Works

- Snowflake builds an __inference container image__ containing:
    - The registered model
    - Dependencies, including PyPI packages or other sources

- The container image is stored in an __image repository__.
    - Snowflake provides a __default image repository__, automatically used if no custom repository is provided.
    - To use a __custom repository__, create one and pass its name via the `image_repo` argument in the `create_service` method.
    - Multiple users or roles can __share the same container image__ to save time and compute costs.
    - Use a __single image repository__ to avoid rebuilding the image multiple times and enable reuse by all users.
    - All roles that will use the repository must have the following privileges on it:
        - SERVICE READ and SERVICE WRITE
        - READ and WRITE
    - Keep write privileges even after the image is initially built, as updates to dependencies may require rebuilding the image.

  
- The container runs in a Snowflake __compute pool__ and exposes the model for inference to serve predictions. A __compute pool__ is a collection of one or more VM nodes on which Snowflake runs SPCS services. Creating a compute pool involves specifying at minimum:

    - __Instance Family__ to provision for the compute pool nodes (e.g., CPU_X64_L)

    - __Minimum Nodes__ to launch the compute pool with

    - __Maximum Nodes__ the compute pool can scale to; prevents an unexpectedly large number of nodes from being added to the compute pool by Snowflake autoscaling.

- __`Ingress HTTP endpoints`__ can be provisioned for external applications to call the model from the public Internet.

- Each service created for an SPCS deployment has an __internal DNS name__; a public endpoint is also created if `ingress_enabled=True`.

## Deploying a Model to SPCS

1. Obtain a __registered__ `ModelVersion` object (`mv`).

2. Create a service for SPCS deployment using the `create_service` method of `mv`.

3. Run inference using the service by calling the `run` method of `mv` including the `service_name` parameter. Omitting `service_name` will default to running inference in a warehouse.

## Accessing Service Endpoints

- __Public HTTP endpoint (external calls)__:

    - Use `SHOW ENDPOINTS` or `mv.list_services()`

    - The `ingress_url` output column has the format: `<unique-service-id>-<account-id>.snowflakecomputing.app`

    - Call the service via: `https://<unique-service-id>-<account-id>.snowflakecomputing.app/<method-name>` (e.g., `https://ingress_url/predict`)

- __Internal DNS (internal calls within Snowflake)__:

    - Use `DESCRIBE SERVICE` to retrieve the `dns_name` column

    - Call the service via: `https://<dns_name>:<port>/<method_name>`
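The public endpoint call described above can be prepared as in the following sketch, which only builds the URL and the JSON body without sending a request. The helper names `build_endpoint_url` and `build_payload`, the placeholder host, and the feature values and their order are hypothetical; the row-index prefix mirrors the `{"data": [[index, ...], ...]}` layout used for the REST call in this notebook.

```python
import json

def build_endpoint_url(ingress_url, method_name):
    """Combine the service's ingress_url with the model method to call."""
    return f"https://{ingress_url}/{method_name}"

def build_payload(rows):
    """Prefix each feature row with its row index."""
    return {"data": [[index, *row] for index, row in enumerate(rows)]}

# Placeholder host: substitute the ingress_url reported for the service.
url = build_endpoint_url(
    "<unique-service-id>-<account-id>.snowflakecomputing.app", "predict"
)
# Hypothetical feature rows; the real order must match the model signature.
payload = build_payload([[52000.0, 250000.0, 1], [38000.0, 120000.0, 0]])

print(url)
print(json.dumps(payload))
```

Keeping URL and payload construction separate from the HTTP call makes it easier to inspect exactly what will be sent before adding authentication headers.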



## Key benefits

- __Scalable Inference__: Automatically leverages a distributed compute pool. Enables scalable, low-latency predictions for production workloads.

- __Custom Dependencies__: Full control over Python packages and libraries.

- __Efficient Sharing__: A single image repository can be reused across users and roles.

- __External Accessibility__: Optionally expose models via public HTTP endpoints for external applications.

- __Seamless Snowflake Integration__: Works with Snowflake Datasets, Feature Store, and Model Registry for full end-to-end ML lifecycle management.



In [None]:
-- Increase compute pool's instance type and number of nodes for multi-node inference

ALTER COMPUTE POOL MLOPS_COMPUTE_POOL SET MIN_NODES=2;
ALTER COMPUTE POOL MLOPS_COMPUTE_POOL SET MAX_NODES=2;
ALTER COMPUTE POOL MLOPS_COMPUTE_POOL SET INSTANCE_FAMILY="CPU_X64_L";

In [None]:
model_name=f"LOAN_MLOPS_{VERSION_NUM}"
mv_opt = model_registry.get_model(model_name).version("XGB_OPTIMIZED")

mv_opt.create_service(
    service_name="MLOPS_LOANS_INFERENCE_SPCS_SERVICE",
    service_compute_pool="MLOPS_COMPUTE_POOL",
    ingress_enabled=True,
    max_instances=2   # 2 service instances running
)

In [None]:
model_registry.get_model(model_name).version("XGB_OPTIMIZED")

In [None]:
model_registry.get_model(model_name).show_versions()

In [None]:
mv_container = model_registry.get_model(model_name).default
mv_container.run(test, function_name = "predict", service_name = "MLOPS_LOANS_INFERENCE_SPCS_SERVICE").rename("OUTPUT_LOAN_RESPONSE", 'XGB_PREDICTION')

In [None]:
SHOW ENDPOINTS IN SERVICE {{DB}}.{{SCHEMA}}.MLOPS_LOANS_INFERENCE_SPCS_SERVICE;

In [None]:
mv_container.list_services()

In [None]:
DESCRIBE SERVICE {{DB}}.{{SCHEMA}}.MLOPS_LOANS_INFERENCE_SPCS_SERVICE;

In [None]:
import os
import json
import numpy as np
import requests

def get_headers(pat_token):
    headers = {'Authorization': f'Snowflake Token="{pat_token}"'}
    return headers

headers = get_headers(os.getenv("PAT_TOKEN"))

URL = 'https://<ingress_url>/predict'

# Prepare data to be sent
test_pd = test.to_pandas()
data = {"data": np.column_stack([range(test_pd.shape[0]), test_pd.values]).tolist()}

# Send over HTTP
def send_request(data: dict):
    output = requests.post(URL, json=data, headers=headers)
    assert (output.status_code == 200), f"Failed to get response from the service. Status code: {output.status_code}"
    return output.content

results = send_request(data=data)
print(json.loads(results))

When a model is deployed to Snowpark Container Services, Snowflake Model Serving builds a `container inference image` that holds the model and its dependencies. The container image is stored in an `IMAGE REPOSITORY`. Snowflake provides a default image repository, which is used automatically when no custom repository is provided when creating a service. To use a custom repository, create an image repository and pass its name in the `image_repo` argument of the `create_service` method.

## Sharing the Image Repository

It is common for multiple users or roles to use the same model. Using a single image repository allows the image to be built once and reused by all users, saving time and expense. All roles that will use the repo need the SERVICE READ, SERVICE WRITE, READ, and WRITE privileges on the repo. Since the image might need to be rebuilt to update dependencies, keep the write privileges; don’t revoke them after the image is initially built.


In [None]:
ALTER COMPUTE POOL MLOPS_COMPUTE_POOL STOP ALL;