# ❄️ End-to-end ML Demo ❄️

In this workflow we will work through the following elements of a typical tabular machine learning pipeline.

### 1. Use Feature Store to track engineered features
* Store feature definitions in feature store for reproducible computation of ML features
      
### 2. Train two Models using the Snowflake ML APIs
* Baseline XGBoost
* XGBoost with optimal hyper-parameters identified via Snowflake ML distributed HPO methods

### 3. Register both models in Snowflake model registry
* Explore model registry capabilities such as **metadata tracking, inference, and explainability**
* Compare model metrics on train/test set to identify any issues of model performance or overfitting
* Tag the best performing model version as the 'default' version

### 4. Set up Model Monitor to track 1 year of predicted and actual patient readmissions
* **Compute performance metrics** such as F1, Precision, Recall
* **Inspect model drift** (i.e. how much has the average predicted readmission rate changed day-to-day)
* **Compare models** side-by-side to understand which model should be used in production
* Identify and understand **data issues**

### 5. Track data and model lineage throughout
* View and understand
  * The **origin of the data** used for computed features
  * The **data used** for model training
  * The **available model versions** being monitored

In [5]:
! pip install snowflake-ml-python==1.20.0
import ray
runtime_env = {
    "pip": ["snowflake-ml-python==1.20.0"]
}
ray.init(runtime_env=runtime_env)



In [2]:
#Update this VERSION_NUM to version your features, models etc!
VERSION_NUM = '0'
DB = "E2E_SNOW_MLOPS_DB" 
SCHEMA = "MLOPS_SCHEMA" 
COMPUTE_WAREHOUSE = "E2E_SNOW_MLOPS_WH" 

In [3]:
import pandas as pd
import numpy as np
import sklearn
import math
import pickle
from datetime import datetime
from xgboost import XGBClassifier



# Snowpark ML
from snowflake.ml.registry import Registry
from snowflake.ml.modeling.tune import get_tuner_context
from snowflake.ml.modeling import tune
from entities import search_algorithm

#Snowflake feature store
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

# Snowpark session
from snowflake.snowpark import DataFrame
from snowflake.snowpark.functions import col, to_timestamp, min, max, month, dayofweek, dayofyear, avg, date_add, sql_expr
from snowflake.snowpark.types import IntegerType
from snowflake.snowpark import Window

#setup snowpark session
from snowflake.snowpark.context import get_active_session
session = get_active_session()
session.use_role("E2E_SNOW_MLOPS_ROLE")
session.use_database("E2E_SNOW_MLOPS_DB")
session.use_schema("MLOPS_SCHEMA")
session.use_warehouse("E2E_SNOW_MLOPS_WH")

In [4]:
try:
    print("Reading table data...")
    df = session.table("PATIENT_READMISSION_DEMO_DATA")
    df.show(5)
except Exception:
    print("Table not found! Uploading data to snowflake table")
    df_pandas = pd.read_csv("PATIENT_READMISSION_DEMO_DATA.csv.zip")
    df_pandas.columns = [c.upper() for c in df_pandas.columns]
    session.write_pandas(df_pandas, "PATIENT_READMISSION_DEMO_DATA", auto_create_table=True)
    df = session.table("PATIENT_READMISSION_DEMO_DATA")
    df.show(5)

## Observe Snowflake Snowpark table properties

In [5]:
df.select(min('TS'), max('TS')).show()

In [6]:
#Get current date and time
current_time = datetime.now()
df_max_time = datetime.strptime(str(df.select(max("TS")).collect()[0][0]), "%Y-%m-%d %H:%M:%S.%f")

#Find delta between latest existing timestamp and today's date
timedelta = current_time- df_max_time

#Update timestamps to represent last ~1 year from today's date
df.select(min(date_add(to_timestamp("TS"), timedelta.days-1)), max(date_add(to_timestamp("TS"), timedelta.days-1)))
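The shift applied above can be checked with plain `datetime` arithmetic. This is a minimal sketch with hypothetical timestamps (not values from the demo table). It shows that adding `timedelta.days - 1` days moves the newest record to about one day before "now".

```python
from datetime import datetime, timedelta

def day_offset(latest_ts: datetime, now: datetime) -> int:
    """Whole days to add so the newest record lands about one day before `now`."""
    return (now - latest_ts).days - 1

# Hypothetical values, chosen only for illustration
latest_ts = datetime(2023, 3, 1, 12, 0, 0)
now = datetime(2024, 3, 1, 12, 0, 0)

offset = day_offset(latest_ts, now)
shifted_latest = latest_ts + timedelta(days=offset)
print(offset, shifted_latest)
```

Every row gets the same offset, so the spacing between records is preserved and only the overall window moves.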

## Feature Engineering with Snowpark APIs

In [7]:
#Create a dict with keys for feature names and values containing transform code

feature_eng_dict = dict()

#Timestamp features
feature_eng_dict["TIMESTAMP"] = date_add(to_timestamp("TS"), timedelta.days-1)
feature_eng_dict["MONTH"] = month("TIMESTAMP")
feature_eng_dict["DAY_OF_YEAR"] = dayofyear("TIMESTAMP") 
feature_eng_dict["DOTW"] = dayofweek("TIMESTAMP")

# df= df.with_columns(feature_eng_dict.keys(), feature_eng_dict.values())

#Medical and patient features
feature_eng_dict["TREATMENT_COST"] = col("TREATMENT_COST_1000S")*1000
feature_eng_dict["PATIENT_AGE"] = col("PATIENT_AGE_YEARS")
feature_eng_dict["LENGTH_OF_STAY"] = col("LENGTH_OF_STAY_DAYS")
feature_eng_dict["COST_PER_DAY"] = col("TREATMENT_COST")/col("LENGTH_OF_STAY")

df = df.with_columns(feature_eng_dict.keys(), feature_eng_dict.values())
df.show()
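To make the transforms above concrete, here is a rough single-row equivalent in plain Python. It is only a sketch: the `engineer_features` helper is hypothetical, the Sunday = 0 convention for `DOTW` assumes Snowflake's default week settings, and the guard against a zero-length stay is an addition that the Snowpark code above does not have.

```python
from datetime import datetime

def engineer_features(row: dict) -> dict:
    """Derive the calendar and cost features for one record (illustrative only)."""
    ts = row["TIMESTAMP"]
    stay = row["LENGTH_OF_STAY_DAYS"]
    cost = row["TREATMENT_COST_1000S"] * 1000
    return {
        "MONTH": ts.month,
        "DAY_OF_YEAR": ts.timetuple().tm_yday,
        # Python counts Monday as 0; shift so Sunday is 0 (assumed Snowflake default)
        "DOTW": (ts.weekday() + 1) % 7,
        "TREATMENT_COST": cost,
        # Assumption of this sketch: return None instead of dividing by zero
        "COST_PER_DAY": cost / stay if stay else None,
    }

# Hypothetical record
sample = {
    "TIMESTAMP": datetime(2024, 3, 10),
    "TREATMENT_COST_1000S": 12.5,
    "LENGTH_OF_STAY_DAYS": 5,
}
features = engineer_features(sample)
zero_stay = engineer_features({**sample, "LENGTH_OF_STAY_DAYS": 0})
print(features)
```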

In [8]:
df.explain()

## Create a Snowflake Feature Store

In [9]:
fs = FeatureStore(
    session=session, 
    database=DB, 
    name=SCHEMA, 
    default_warehouse=COMPUTE_WAREHOUSE,
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST
)

In [10]:
fs.list_entities()

## Feature Store configuration
- create/register entities of interest

In [11]:
#First try to retrieve an existing entity definition, if not define a new one and register
try:
    #retrieve existing entity
    patient_id_entity = fs.get_entity('PATIENT_ENTITY') 
    print('Retrieved existing entity')
except Exception:
    #define new entity
    patient_id_entity = Entity(
        name = "PATIENT_ENTITY",
        join_keys = ["PATIENT_ID"],
        desc = "Features defined on a per patient level")
    #register
    fs.register_entity(patient_id_entity)
    print("Registered new entity")

In [12]:
#Create a dataframe with just the ID, timestamp, and engineered features. We will use this to define our feature view
feature_df = df.select(["PATIENT_ID"]+list(feature_eng_dict.keys()))
feature_df.show(5)

Here, the feature store references an existing table. 

We could also define the dataframe via the use of Snowpark APIs, and use that dataframe (or a function that returns a dataframe) as the feature view definition, below.

In [13]:
#define and register feature view
patient_fv = FeatureView(
    name="Patient_Readmission_Feature_View",
    entities=[patient_id_entity],
    feature_df=feature_df,
    timestamp_col="TIMESTAMP",
    refresh_freq="1 day")

#add feature level descriptions

patient_fv = patient_fv.attach_feature_desc(
    {
        "MONTH": "Month of admission",
        "DAY_OF_YEAR": "Day of calendar year of admission",
        "DOTW": "Day of the week of admission",
        "TREATMENT_COST": "Treatment cost in $USD",
        "PATIENT_AGE": "Patient age in years",
        "LENGTH_OF_STAY": "Length of hospital stay in days",
        "COST_PER_DAY": "Treatment cost per day in $USD",
    }
)

patient_fv = fs.register_feature_view(patient_fv, version=VERSION_NUM, overwrite=True)

In [14]:
fs.list_feature_views()

## Retrieve a Dataset from the featureview

Snowflake Datasets are immutable, file-based objects that hold versioned snapshots of your data.

They can be written to persistent Snowflake objects as needed. 

In [15]:
ds = fs.generate_dataset(
    name=f"PATIENT_DATASET_EXTENDED_FEATURES_{VERSION_NUM}",
    spine_df=df.select("PATIENT_ID", "TIMESTAMP", "ADMISSION_TYPE","READMITTED"), #only need the features used to fetch rest of feature view
    features=[patient_fv],
    spine_timestamp_col="TIMESTAMP",
    spine_label_cols=["READMITTED"]
)
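Passing `spine_timestamp_col` asks for feature values as they were at each spine row's timestamp. As we understand it, this is a point-in-time (as-of) lookup, which keeps future information out of the training data. The idea can be sketched in plain Python; the `as_of_lookup` helper and the sample history are hypothetical and are not part of the Feature Store API.

```python
from bisect import bisect_right
from datetime import date

def as_of_lookup(history, when):
    """Return the latest value whose date is <= `when`, or None if there is none.

    `history` is a list of (date, value) pairs sorted by date.
    """
    dates = [d for d, _ in history]
    i = bisect_right(dates, when)
    return history[i - 1][1] if i else None

# Hypothetical feature history for one patient
cost_history = [(date(2024, 1, 1), 100.0), (date(2024, 2, 1), 250.0)]

mid_january = as_of_lookup(cost_history, date(2024, 1, 15))
on_update_day = as_of_lookup(cost_history, date(2024, 2, 1))
before_any = as_of_lookup(cost_history, date(2023, 12, 31))
print(mid_january, on_update_day, before_any)
```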

In [16]:
ds_sp = ds.read.to_snowpark_dataframe()
ds_sp.show(5)

In [17]:
import snowflake.ml.modeling.preprocessing as snowml
from snowflake.snowpark.types import StringType

OHE_COLS = ds_sp.select([col.name for col in ds_sp.schema if col.datatype ==StringType()]).columns
OHE_POST_COLS = [i+"_OHE" for i in OHE_COLS]


# Encode categoricals to numeric columns
snowml_ohe = snowml.OneHotEncoder(input_cols=OHE_COLS, output_cols = OHE_COLS, drop_input_cols=True)
ds_sp_ohe = snowml_ohe.fit(ds_sp).transform(ds_sp)

ds_sp_ohe.columns
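For intuition, one-hot encoding replaces a string column with one 0/1 column per category. The sketch below is a plain-Python illustration, not the `snowml.OneHotEncoder` implementation. The `one_hot` helper is hypothetical, and the `<column>_<category>` naming is assumed to match the `ADMISSION_TYPE_EMERGENCY` style columns used later in this notebook.

```python
def one_hot(rows, column):
    """Replace `column` with one indicator column per distinct category."""
    categories = sorted({r[column] for r in rows})
    encoded = []
    for r in rows:
        new_row = {k: v for k, v in r.items() if k != column}  # drop the input column
        for c in categories:
            new_row[f"{column}_{c}"] = 1 if r[column] == c else 0
        encoded.append(new_row)
    return encoded

# Hypothetical rows
rows = [
    {"PATIENT_ID": 1, "ADMISSION_TYPE": "EMERGENCY"},
    {"PATIENT_ID": 2, "ADMISSION_TYPE": "ELECTIVE"},
]
encoded = one_hot(rows, "ADMISSION_TYPE")
print(encoded)
```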

In [18]:
train, test = ds_sp_ohe.random_split(weights=[0.70, 0.30], seed=0)

In [19]:
train = train.fillna(0)
test = test.fillna(0)

In [20]:
train_pd = train.to_pandas()
test_pd = test.to_pandas()

## Model Training
### Below we will define and fit an xgboost classifier as our baseline model and evaluate the performance
##### Note this is all done with OSS frameworks

In [21]:
#Define model config
xgb_base = XGBClassifier(
    max_depth=50,
    n_estimators=3,
    learning_rate = 0.75,
    booster = 'gbtree')

In [22]:
#Split train data into X, y
X_train_pd = train_pd.drop(["TIMESTAMP", "PATIENT_ID", "READMITTED"],axis=1) #drop timestamp, ID, and label columns
y_train_pd = train_pd.READMITTED

#train model
xgb_base.fit(X_train_pd,y_train_pd)

In [23]:
from sklearn.metrics import f1_score, precision_score, recall_score
train_preds_base = xgb_base.predict(X_train_pd) #predict on the training features

f1_base_train = round(f1_score(y_train_pd, train_preds_base),4)
precision_base_train = round(precision_score(y_train_pd, train_preds_base),4)
recall_base_train = round(recall_score(y_train_pd, train_preds_base),4)

print(f'F1: {f1_base_train} \nPrecision {precision_base_train} \nRecall: {recall_base_train}')
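As a reminder of what these three numbers measure, here is a small hand-rolled version of binary precision, recall, and F1. It is a sketch for intuition only, using made-up labels. In the notebook we keep using the scikit-learn functions.

```python
def binary_metrics(y_true, y_pred):
    """Precision, recall, and F1 for 0/1 labels, with 1 as the positive class."""
    tp = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 1)
    fp = sum(1 for t, p in zip(y_true, y_pred) if t == 0 and p == 1)
    fn = sum(1 for t, p in zip(y_true, y_pred) if t == 1 and p == 0)
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) else 0.0
    return precision, recall, f1

# Hypothetical labels: 3 true positives, 1 false positive, 1 false negative
y_true = [1, 1, 1, 0, 0, 0, 1, 0]
y_pred = [1, 1, 0, 0, 1, 0, 1, 0]
precision, recall, f1 = binary_metrics(y_true, y_pred)
print(precision, recall, f1)
```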

# Model Registry

- Log models with important metadata
- Manage model lifecycles
- Serve models from Snowflake runtimes

In [24]:
#Create a snowflake model registry object 
from snowflake.ml.registry import Registry

# Define model name
model_name = f"PATIENT_READMISSION_MLOPS_{VERSION_NUM}"

# Create a registry to log the model to
model_registry = Registry(session=session, 
                          database_name=DB, 
                          schema_name=SCHEMA,
                          options={"enable_monitoring": True})

In [25]:
#Log the base model to the model registry (if not already there)
base_version_name = 'XGB_BASE'

try:
    #Check for existing model
    mv_base = model_registry.get_model(model_name).version(base_version_name)
    print("Found existing model version!")
except:
    print("Logging new model version...")
    #Log model to registry
    mv_base = model_registry.log_model(
        model_name=model_name,
        model=xgb_base, 
        version_name=base_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "PATIENT_ID", "READMITTED"]).limit(100), #using snowpark df to maintain lineage
        comment = f"""ML model for predicting patient readmission likelihood.
                    This model was trained using XGBoost classifier.
                    Hyperparameters used were:
                    max_depth={xgb_base.max_depth}, 
                    n_estimators={xgb_base.n_estimators}, 
                    learning_rate = {xgb_base.learning_rate}, 
                    algorithm = {xgb_base.booster}
                    """,
        target_platforms= ["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
        options= {"enable_explainability": True}

    )
    
    #set metrics
    mv_base.set_metric(metric_name="Train_F1_Score", value=f1_base_train)
    mv_base.set_metric(metric_name="Train_Precision_Score", value=precision_base_train)
    mv_base.set_metric(metric_name="Train_Recall_score", value=recall_base_train)

In [26]:
#Create tag for PROD model
session.sql("CREATE OR REPLACE TAG PROD").collect()

In [27]:
#Apply prod tag 
m = model_registry.get_model(model_name)
m.comment = "Patient readmission prediction models" #set model level comment
m.set_tag("PROD", base_version_name)
m.show_tags()

In [28]:
model_registry.show_models()

In [29]:
model_registry.get_model(model_name).show_versions()

In [30]:
print(mv_base)
print(mv_base.show_metrics())

In [31]:
mv_base.show_functions()

In [32]:
reg_preds = mv_base.run(test, function_name = "predict").rename(col('"output_feature_0"'), "READMISSION_PREDICTION")
reg_preds.show(10)

In [33]:
#ds_sp_ohe = ds_sp_ohe.rename(col('"LOAN_PURPOSE_NAME_Home improvement"'), "LOAN_PURPOSE_NAME_Home_improvement")

preds_pd = reg_preds.select(["READMITTED", "READMISSION_PREDICTION"]).to_pandas()
f1_base_test = round(f1_score(preds_pd.READMITTED, preds_pd.READMISSION_PREDICTION),4)
precision_base_test = round(precision_score(preds_pd.READMITTED, preds_pd.READMISSION_PREDICTION),4)
recall_base_test = round(recall_score(preds_pd.READMITTED, preds_pd.READMISSION_PREDICTION),4)

#log metrics to model registry model
mv_base.set_metric(metric_name="Test_F1_Score", value=f1_base_test)
mv_base.set_metric(metric_name="Test_Precision_Score", value=precision_base_test)
mv_base.set_metric(metric_name="Test_Recall_score", value=recall_base_test)

print(f'F1: {f1_base_test} \nPrecision {precision_base_test} \nRecall: {recall_base_test}')

## Oh no! Our model's performance seems to have dropped off significantly from training to our test set. 

#### This is evidence that our model is overfit - can we fix this with Distributed Hyperparameter Optimization??
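One simple way to make the "dropped off significantly" judgment explicit is to compare train and test metrics and flag large gaps. The sketch below uses hypothetical metric values and an arbitrary tolerance of 0.1. Neither comes from this notebook's results.

```python
def generalization_gap(train_metrics, test_metrics):
    """Train minus test for every metric present in both dicts."""
    return {
        name: round(train_metrics[name] - test_metrics[name], 4)
        for name in train_metrics
        if name in test_metrics
    }

def looks_overfit(train_metrics, test_metrics, tolerance=0.1):
    """True if any metric is more than `tolerance` higher on train than on test."""
    return any(gap > tolerance for gap in generalization_gap(train_metrics, test_metrics).values())

# Hypothetical numbers for illustration only
train_scores = {"f1": 0.95, "precision": 0.96, "recall": 0.94}
test_scores = {"f1": 0.60, "precision": 0.62, "recall": 0.58}

gaps = generalization_gap(train_scores, test_scores)
flag = looks_overfit(train_scores, test_scores)
print(gaps, flag)
```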

## This is also an excellent opportunity to test out Snowflake's Experiment Tracking Functionality 

#### Snowflake Experiment Tracking provides a mechanism for creating experiments and logging runs within Snowflake from any development environment. This capability allows you to log key pieces of information regarding your model training runs such as model parameters and metrics. In the UI, you can deep dive into a particular run or compare multiple runs to find the optimal model.

#### Below we will train multiple models using distributed HPO and log results to the Experiment Tracker!

In [38]:
from snowflake.ml.data import DataConnector
from snowflake.ml.modeling.tune import get_tuner_context
from snowflake.ml.modeling import tune
from entities import search_algorithm
import psutil
from snowflake.ml.experiment.experiment_tracking import ExperimentTracking

#Define dataset map
dataset_map = {
    "x_train": DataConnector.from_dataframe(train.drop("READMITTED", "TIMESTAMP", "PATIENT_ID")),
    "y_train": DataConnector.from_dataframe(train.select("READMITTED")),
    "x_test": DataConnector.from_dataframe(test.drop("READMITTED","TIMESTAMP", "PATIENT_ID")),
    "y_test": DataConnector.from_dataframe(test.select("READMITTED"))
    }

# exp = ExperimentTracking(session=session)

# Define a training function, with any models you choose within it.
def train_func():

    # A context object provided by HPO API to expose data for the current HPO trial
    
    tuner_context = get_tuner_context()
    
    #Generate params
    config = tuner_context.get_hyper_params()
    dm = tuner_context.get_dataset_map()
    
    #Instantiate model with generated params
    model = XGBClassifier(**config, random_state=42)

    X_train_pd = dm["x_train"].to_pandas().sort_index()
    y_train_pd = dm["y_train"].to_pandas().sort_index()
    X_test_pd = dm["x_test"].to_pandas().sort_index()
    y_test_pd = dm["y_test"].to_pandas().sort_index()

    #Train model, get preds
    model.fit(X_train_pd,y_train_pd)

    #Run inference on train data
    train_preds = model.predict(X_train_pd)

    #Run inference on test data
    test_preds = model.predict(X_test_pd)
    
    #compute metrics 
    f1_train = f1_score(y_train_pd,train_preds)
    precision_train = precision_score(y_train_pd,train_preds)
    recall_train = recall_score(y_train_pd,train_preds)

    f1_test = f1_score(y_test_pd,test_preds)
    precision_test = precision_score(y_test_pd,test_preds)
    recall_test = recall_score(y_test_pd,test_preds)

    metrics_to_log = {"F1_Train": f1_train,
                     "Precision_Train": precision_train,
                     "Recall_Train": recall_train,
                     "F1_Test": f1_test,
                     "Precision_Test": precision_test,
                     "Recall_Test": recall_test,}

    tuner_context.report(metrics=metrics_to_log, model=model)
        
tuner = tune.Tuner(
    train_func=train_func,
    search_space={
        "max_depth": tune.randint(1, 30),
        "learning_rate": tune.uniform(0.01, 0.5),
        "n_estimators": tune.randint(50, 150),
    },
    tuner_config=tune.TunerConfig(
        metric="F1_Test",
        mode="max",
        search_alg=search_algorithm.RandomSearch(random_state=101),
        num_trials=8, #run 8 trial runs
        max_concurrent_trials=1 # Run one trial at a time; raise this to run trials in parallel across available CPUs. GPUs can also be used here!
    ),
)

In [39]:
tuner_results = tuner.run(dataset_map=dataset_map)
tuner_results.results
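Conceptually, random search samples a configuration from the search space, scores it, and keeps the best one seen so far. The sketch below shows that loop in plain Python with a toy objective standing in for `F1_Test`. The `random_search` helper, the samplers, and the objective are all hypothetical and do not use the Snowflake `tune` API. Whether the real `tune.randint` upper bound is inclusive is not assumed here.

```python
import random

def random_search(objective, space, num_trials, seed):
    """Sample `num_trials` configs from `space` and return the best (config, score)."""
    rng = random.Random(seed)
    best_cfg, best_score = None, float("-inf")
    for _ in range(num_trials):
        cfg = {name: sampler(rng) for name, sampler in space.items()}
        score = objective(cfg)
        if score > best_score:
            best_cfg, best_score = cfg, score
    return best_cfg, best_score

# Search space loosely mirroring the one above (bounds here are inclusive)
space = {
    "max_depth": lambda rng: rng.randint(1, 30),
    "learning_rate": lambda rng: rng.uniform(0.01, 0.5),
    "n_estimators": lambda rng: rng.randint(50, 150),
}

def toy_objective(cfg):
    """Stand-in for a validation score: prefers shallow trees and a small learning rate."""
    return -abs(cfg["max_depth"] - 6) - abs(cfg["learning_rate"] - 0.1)

best_cfg, best_score = random_search(toy_objective, space, num_trials=8, seed=101)
print(best_cfg, best_score)
```

Fixing the seed makes the sampled configurations reproducible, which is the same reason the tuner above passes `random_state=101`.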

In [50]:
exp = ExperimentTracking(session=session)
exp.set_experiment("E2E_MLOPS_HPO_Experiments")

cols = tuner_results.results.columns
metrics = tuner_results.results[[c for c in cols if c.split("_")[-1] in ['Train','Test']]].to_dict(orient='records')
configs = tuner_results.results[[c for c in cols if c.startswith("config/")]].to_dict(orient='records')

for cfg, mt in zip(configs,metrics):
    with exp.start_run():
        exp.log_params(cfg)
        exp.log_metrics(mt)

In [51]:
#Select best model results and inspect configuration
tuned_model = tuner_results.best_model
tuned_model

In [52]:
#Generate predictions
xgb_opt_preds = tuned_model.predict(train_pd.drop(["TIMESTAMP", "PATIENT_ID", "READMITTED"],axis=1))

#Generate performance metrics
f1_opt_train = round(f1_score(train_pd.READMITTED, xgb_opt_preds),4)
precision_opt_train = round(precision_score(train_pd.READMITTED, xgb_opt_preds),4)
recall_opt_train = round(recall_score(train_pd.READMITTED, xgb_opt_preds),4)

print(f'Train Results: \nF1: {f1_opt_train} \nPrecision {precision_opt_train} \nRecall: {recall_opt_train}')

In [53]:
#Generate test predictions
xgb_opt_preds_test = tuned_model.predict(test_pd.drop(["TIMESTAMP", "PATIENT_ID", "READMITTED"],axis=1))

#Generate performance metrics on test data
f1_opt_test = round(f1_score(test_pd.READMITTED, xgb_opt_preds_test),4)
precision_opt_test = round(precision_score(test_pd.READMITTED, xgb_opt_preds_test),4)
recall_opt_test = round(recall_score(test_pd.READMITTED, xgb_opt_preds_test),4)

print(f'Test Results: \nF1: {f1_opt_test} \nPrecision {precision_opt_test} \nRecall: {recall_opt_test}')

## Here we see the HPO model has more modest train metrics than our base model - but the performance doesn't drop off on new data (test set) 

In [54]:
#Log the optimized model to the model registry (if not already there)
optimized_version_name = 'XGB_Optimized'

try:
    #Check for existing model
    mv_opt = model_registry.get_model(model_name).version(optimized_version_name)
    print("Found existing model version!")
except:
    #Log model to registry
    print("Logging new model version...")
    mv_opt = model_registry.log_model(
        model_name=model_name,
        model=tuned_model, 
        version_name=optimized_version_name,
        sample_input_data = train.drop(["TIMESTAMP", "PATIENT_ID", "READMITTED"]).limit(100),
        comment = f"""HPO ML model for predicting patient readmission likelihood.
            This model was trained using XGBoost classifier.
            Optimized hyperparameters used were:
            max_depth={tuned_model.max_depth}, 
            n_estimators={tuned_model.n_estimators}, 
            learning_rate = {tuned_model.learning_rate}, 
            """,
        target_platforms= ["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
        options= {"enable_explainability": True}

        

    )
    #Set metrics
    mv_opt.set_metric(metric_name="Train_F1_Score", value=f1_opt_train)
    mv_opt.set_metric(metric_name="Train_Precision_Score", value=precision_opt_train)
    mv_opt.set_metric(metric_name="Train_Recall_score", value=recall_opt_train)

    mv_opt.set_metric(metric_name="Test_F1_Score", value=f1_opt_test)
    mv_opt.set_metric(metric_name="Test_Precision_Score", value=precision_opt_test)
    mv_opt.set_metric(metric_name="Test_Recall_score", value=recall_opt_test)

In [None]:
model_name

In [55]:
#Here we see the BASE version is our default version
model_registry.get_model(model_name).default

In [56]:
#Now we'll set the optimized model to be the default model version going forward
model_registry.get_model(model_name).default = optimized_version_name

In [57]:
#Now we see that the optimized version we just promoted is our DEFAULT model version
model_registry.get_model(model_name).default

In [59]:
#we'll now update the PROD tagged model to be the optimized model version rather than our overfit base version
m.unset_tag("PROD")
m.set_tag("PROD", optimized_version_name)
m.show_tags()


## Now that we've deployed some model versions and tested inference... 
# Let's explain our models!
- ### Snowflake offers built in explainability capabilities on top of models logged to the model registry
- ### In the below section we'll generate shapley values using these built in functions to understand how input features impact our model's behavior
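For intuition about what a Shapley value is, the sketch below computes exact values for a tiny three-feature toy model by averaging each feature's marginal contribution over every feature ordering. This is the textbook definition, shown under the assumption of an all-zeros baseline. It is not a description of how the registry's `explain` function computes its output, and the `shapley_values` helper and toy model are hypothetical.

```python
from itertools import permutations

def shapley_values(predict, x, baseline):
    """Exact Shapley values of `predict` at `x`, relative to `baseline`."""
    n = len(x)
    phi = [0.0] * n
    orders = list(permutations(range(n)))
    for order in orders:
        current = list(baseline)
        previous = predict(current)
        for i in order:
            current[i] = x[i]           # switch feature i from baseline to its real value
            updated = predict(current)
            phi[i] += updated - previous  # marginal contribution in this ordering
            previous = updated
    return [total / len(orders) for total in phi]

def toy_model(v):
    """Hypothetical model: one additive term plus one interaction term."""
    return 2 * v[0] + v[1] * v[2]

x = [1, 2, 3]
baseline = [0, 0, 0]
phi = shapley_values(toy_model, x, baseline)
print(phi)
```

The values sum to the difference between the prediction at `x` and at the baseline, and the interaction term is split evenly between the two features that share it.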

In [60]:
#create a sample of 2500 records
test_pd_sample=test_pd.sample(n=2500, random_state = 100).reset_index(drop=True)

#Compute shapley values for each model
base_shap_pd = mv_base.run(test_pd_sample, function_name="explain")
opt_shap_pd = mv_opt.run(test_pd_sample, function_name="explain")

In [None]:
import shap 

shap.summary_plot(np.array(base_shap_pd.astype(float)), 
                  test_pd_sample.drop(["PATIENT_ID","READMITTED", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["PATIENT_ID","READMITTED", "TIMESTAMP"], axis=1).columns)

In [None]:
shap.summary_plot(np.array(opt_shap_pd.astype(float)), 
                  test_pd_sample.drop(["PATIENT_ID","READMITTED", "TIMESTAMP"], axis=1), 
                  feature_names = test_pd_sample.drop(["PATIENT_ID","READMITTED", "TIMESTAMP"], axis=1).columns)

In [None]:
#Merge shap vals and actual vals together for easier plotting below
all_shap_base = test_pd_sample.merge(base_shap_pd, right_index=True, left_index=True, how='outer')
all_shap_opt = test_pd_sample.merge(opt_shap_pd, right_index=True, left_index=True, how='outer')

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

#filter data down to strip outliers
asb_filtered = all_shap_base[(all_shap_base.PATIENT_AGE>0) & (all_shap_base.PATIENT_AGE<100)]
aso_filtered = all_shap_opt[(all_shap_opt.PATIENT_AGE>0) & (all_shap_opt.PATIENT_AGE<100)]

# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("PATIENT AGE EXPLANATION")
# Plot side-by-side scatterplots with a fitted trend line
sns.scatterplot(data = asb_filtered, x ='PATIENT_AGE', y = 'PATIENT_AGE_explanation', ax=axes[0])
sns.regplot(data = asb_filtered, x ="PATIENT_AGE", y = 'PATIENT_AGE_explanation', scatter=False, color='red', line_kws={"lw":2},ci =100, lowess=False, ax =axes[0])

axes[0].set_title('Base Model')
sns.scatterplot(data = aso_filtered, x ='PATIENT_AGE', y = 'PATIENT_AGE_explanation',color = "orange", ax = axes[1])
sns.regplot(data = aso_filtered, x ="PATIENT_AGE", y = 'PATIENT_AGE_explanation', scatter=False, color='blue', line_kws={"lw":2},ci =100, lowess=False, ax =axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Patient Age")
    ax.set_ylabel("Influence")
plt.tight_layout()
plt.show()

In [None]:
#filter data down to strip outliers
asb_filtered = all_shap_base[all_shap_base.TREATMENT_COST<500000]
aso_filtered = all_shap_opt[all_shap_opt.TREATMENT_COST<500000]


# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("TREATMENT_COST EXPLANATION")
# Plot side-by-side scatterplots with a fitted trend line
sns.scatterplot(data = asb_filtered, x ='TREATMENT_COST', y = 'TREATMENT_COST_explanation', ax=axes[0])
sns.regplot(data = asb_filtered, x ="TREATMENT_COST", y = 'TREATMENT_COST_explanation', scatter=False, color='red', line_kws={"lw":2},ci =100, lowess=True, ax =axes[0])
axes[0].set_title('Base Model')

sns.scatterplot(data = aso_filtered, x ='TREATMENT_COST', y = 'TREATMENT_COST_explanation',color = "orange", ax = axes[1])
sns.regplot(data = aso_filtered, x ="TREATMENT_COST", y = 'TREATMENT_COST_explanation', scatter=False, color='blue', line_kws={"lw":2},ci =100, lowess=True, ax =axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("TREATMENT_COST")
    ax.set_ylabel("Influence")
    # ax.set_xlim((0,10000))
plt.tight_layout()
plt.show()

In [None]:
# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("EMERGENCY ADMISSION EXPLANATION")
# Plot side-by-side boxplots
sns.boxplot(data = all_shap_base, x ='ADMISSION_TYPE_EMERGENCY', y = 'ADMISSION_TYPE_EMERGENCY_explanation',
            hue='ADMISSION_TYPE_EMERGENCY', width=0.8, ax=axes[0])
axes[0].set_title('Base Model')
sns.boxplot(data = all_shap_opt, x ='ADMISSION_TYPE_EMERGENCY', y = 'ADMISSION_TYPE_EMERGENCY_explanation',
            hue='ADMISSION_TYPE_EMERGENCY', width=0.4, ax = axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Emergency Admission (1 = True)")
    ax.set_ylabel("Influence")
    ax.legend(loc='upper right')

plt.show()

In [None]:
# Set up the figure
fig, axes = plt.subplots(1, 2, figsize=(10, 6))
fig.suptitle("ELECTIVE ADMISSION EXPLANATION")
# Plot side-by-side boxplots
sns.boxplot(data = all_shap_base, x ='ADMISSION_TYPE_ELECTIVE', y = 'ADMISSION_TYPE_ELECTIVE_explanation',
            hue='ADMISSION_TYPE_ELECTIVE', width=0.8, ax=axes[0])
axes[0].set_title('Base Model')
sns.boxplot(data = all_shap_opt, x ='ADMISSION_TYPE_ELECTIVE', y = 'ADMISSION_TYPE_ELECTIVE_explanation',
            hue='ADMISSION_TYPE_ELECTIVE', width=0.4, ax = axes[1])
axes[1].set_title('Opt Model')

# Customize and show the plot
for ax in axes:
    ax.set_xlabel("Elective Admission (1 = True)")
    ax.set_ylabel("Influence")
    ax.legend(loc='upper right')

plt.show()

# Model Monitoring setup

In [None]:
train.write.save_as_table(f"DEMO_PATIENT_READMISSION_TRAIN_{VERSION_NUM}", mode="overwrite")
test.write.save_as_table(f"DEMO_PATIENT_READMISSION_TEST_{VERSION_NUM}", mode="overwrite")

In [None]:
session.sql("CREATE stage IF NOT EXISTS ML_STAGE").collect()

In [None]:
from snowflake import snowpark

def demo_inference_sproc(session: snowpark.Session, table_name: str, modelname: str, modelversion: str) -> str:

    reg = Registry(session=session)
    m = reg.get_model(modelname)  # Fetch the model by the name passed to the procedure
    mv = m.version(modelversion)
    
    input_table_name=table_name
    pred_col = f'{modelversion}_PREDICTION'

    # Read the input table to a dataframe
    df = session.table(input_table_name)
    results = mv.run(df, function_name="predict").select("PATIENT_ID",'"output_feature_0"').withColumnRenamed('"output_feature_0"', pred_col)
    # 'results' is the output DataFrame with predictions

    final = df.join(results, on="PATIENT_ID", how="full")
    # Write results back to Snowflake table
    final.write.save_as_table(table_name, mode='overwrite',enable_schema_evolution=True)

    return "Success"

# Register the stored procedure
session.sproc.register(
    func=demo_inference_sproc,
    name="model_inference_sproc",
    replace=True,
    is_permanent=True,
    stage_location="@ML_STAGE",
    packages=['joblib', 'snowflake-snowpark-python', 'snowflake-ml-python'],
    return_type=StringType()
)

In [None]:
CALL model_inference_sproc('DEMO_PATIENT_READMISSION_TRAIN_{{VERSION_NUM}}','{{model_name}}', '{{base_version_name}}');

In [None]:
CALL model_inference_sproc('DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}}','{{model_name}}', '{{base_version_name}}');

In [None]:
CALL model_inference_sproc('DEMO_PATIENT_READMISSION_TRAIN_{{VERSION_NUM}}','{{model_name}}', '{{optimized_version_name}}');

In [None]:
CALL model_inference_sproc('DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}}','{{model_name}}', '{{optimized_version_name}}');

In [None]:
select TIMESTAMP, PATIENT_ID, PATIENT_AGE, TREATMENT_COST, XGB_BASE_PREDICTION, XGB_OPTIMIZED_PREDICTION, READMITTED 
FROM DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}} 
limit 20

## Now that our models have been deployed and we have run inference - let's set up ML Observability!

- First we will add a column to our inference data to later explore with our segmentation capabilities 
- We will define a model monitor for each model, with the training data as our baseline and the test data representing inference results. 
- Once the monitors are defined we can access them via the Model Registry 
    - We can also query drift metrics etc. programmatically

In [None]:
ALTER TABLE DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}}
ADD COLUMN IF NOT EXISTS ADMISSION_CATEGORY VARCHAR(50);


UPDATE DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}}
SET ADMISSION_CATEGORY = CASE
    WHEN ADMISSION_TYPE_EMERGENCY = 1 THEN 'EMERGENCY'
    WHEN ADMISSION_TYPE_ELECTIVE = 1 THEN 'ELECTIVE'
    WHEN ADMISSION_TYPE_URGENT = 1 THEN 'URGENT'
    ELSE 'OTHER'
END;

In [None]:
ALTER TABLE DEMO_PATIENT_READMISSION_TRAIN_{{VERSION_NUM}}
ADD COLUMN IF NOT EXISTS ADMISSION_CATEGORY VARCHAR(50);


UPDATE DEMO_PATIENT_READMISSION_TRAIN_{{VERSION_NUM}}
SET ADMISSION_CATEGORY = CASE
    WHEN ADMISSION_TYPE_EMERGENCY = 1 THEN 'EMERGENCY'
    WHEN ADMISSION_TYPE_ELECTIVE = 1 THEN 'ELECTIVE'
    WHEN ADMISSION_TYPE_URGENT = 1 THEN 'URGENT'
    ELSE 'OTHER'
END;

In [None]:
SELECT ADMISSION_TYPE_EMERGENCY, ADMISSION_TYPE_ELECTIVE, ADMISSION_TYPE_URGENT, ADMISSION_CATEGORY FROM DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}} limit 10;

In [None]:
CREATE OR REPLACE MODEL MONITOR PATIENT_READMISSION_BASE_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{base_version_name}}
    FUNCTION=predict
    SOURCE=DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}}
    BASELINE=DEMO_PATIENT_READMISSION_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_BASE_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(READMITTED)
    ID_COLUMNS=(PATIENT_ID)
    SEGMENT_COLUMNS = ('ADMISSION_CATEGORY')
    WAREHOUSE={{COMPUTE_WAREHOUSE}}
    REFRESH_INTERVAL='12 hours'
    AGGREGATION_WINDOW='1 day';

In [None]:
CREATE OR REPLACE MODEL MONITOR PATIENT_READMISSION_OPTIMIZED_MODEL_MONITOR
WITH
    MODEL={{model_name}}
    VERSION={{optimized_version_name}}
    FUNCTION=predict
    SOURCE=DEMO_PATIENT_READMISSION_TEST_{{VERSION_NUM}}
    BASELINE=DEMO_PATIENT_READMISSION_TRAIN_{{VERSION_NUM}}
    TIMESTAMP_COLUMN=TIMESTAMP
    PREDICTION_CLASS_COLUMNS=(XGB_OPTIMIZED_PREDICTION)  
    ACTUAL_CLASS_COLUMNS=(READMITTED)
    ID_COLUMNS=(PATIENT_ID)
    SEGMENT_COLUMNS = ('ADMISSION_CATEGORY')
    WAREHOUSE={{COMPUTE_WAREHOUSE}}
    REFRESH_INTERVAL='12 hours'
    AGGREGATION_WINDOW='1 day';

In [None]:
SELECT * FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
'PATIENT_READMISSION_BASE_MODEL_MONITOR', -- model monitor to use
'DIFFERENCE_OF_MEANS', -- metric for computing drift
'XGB_BASE_PREDICTION', -- column to compute drift on
'1 DAY',  -- day granularity for drift computation
DATEADD(DAY, -90, CURRENT_DATE()), -- start date
DATEADD(DAY, -60, CURRENT_DATE()) -- end date
)
)
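To show what a difference-of-means drift value represents, here is a plain-Python sketch that compares each day's mean prediction against the mean of a baseline set. The sign convention (window minus baseline), the `daily_difference_of_means` helper, and the sample data are assumptions for illustration. The monitor's own computation may differ in detail.

```python
from collections import defaultdict
from datetime import date
from statistics import mean

def daily_difference_of_means(baseline, scored):
    """Per-day mean prediction minus the baseline mean.

    `baseline` is a list of predictions; `scored` is a list of (day, prediction) pairs.
    """
    base_mean = mean(baseline)
    by_day = defaultdict(list)
    for day, pred in scored:
        by_day[day].append(pred)
    return {day: mean(preds) - base_mean for day, preds in sorted(by_day.items())}

# Hypothetical predictions
baseline_preds = [0, 0, 1, 0]  # mean 0.25
scored_preds = [
    (date(2024, 1, 1), 0), (date(2024, 1, 1), 1),
    (date(2024, 1, 2), 1), (date(2024, 1, 2), 1),
]
drift = daily_difference_of_means(baseline_preds, scored_preds)
print(drift)
```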

# Retraining pipeline

In [None]:
-- Create table to store daily drift metrics
CREATE OR REPLACE TABLE PATIENT_READMISSION_DRIFT_METRICS (
    METRIC_DATE DATE,
    MODEL_MONITOR_NAME VARCHAR(200),
    COLUMN_NAME VARCHAR(200),
    DRIFT_METRIC_NAME VARCHAR(100),
    DRIFT_VALUE FLOAT,
    THRESHOLD_EXCEEDED BOOLEAN,
    RECORDED_AT TIMESTAMP_NTZ DEFAULT CURRENT_TIMESTAMP()
);

In [None]:
-- Create task to compute and store drift metrics daily
CREATE OR REPLACE TASK DAILY_DRIFT_METRICS_UPDATE
  WAREHOUSE = {{COMPUTE_WAREHOUSE}}
  SCHEDULE = 'USING CRON 0 2 * * * America/Los_Angeles'  -- 2 AM daily
AS
INSERT INTO PATIENT_READMISSION_DRIFT_METRICS (
    METRIC_DATE,
    MODEL_MONITOR_NAME,
    COLUMN_NAME,
    DRIFT_METRIC_NAME,
    DRIFT_VALUE,
    THRESHOLD_EXCEEDED
)
SELECT 
    CURRENT_DATE() AS METRIC_DATE,
    'PATIENT_READMISSION_BASE_MODEL_MONITOR' AS MODEL_MONITOR_NAME,
    'XGB_BASE_PREDICTION' AS COLUMN_NAME,
    'DIFFERENCE_OF_MEANS' AS DRIFT_METRIC_NAME,
    dm.METRIC_VALUE,
    CASE 
        WHEN ABS(dm.METRIC_VALUE) > 0.05 THEN TRUE  -- 5% threshold
        ELSE FALSE 
    END AS THRESHOLD_EXCEEDED
FROM TABLE(MODEL_MONITOR_DRIFT_METRIC(
    'PATIENT_READMISSION_BASE_MODEL_MONITOR',
    'DIFFERENCE_OF_MEANS',
    'XGB_BASE_PREDICTION',
    '1 DAY',
    DATEADD(DAY, -1, CURRENT_DATE()),
    CURRENT_DATE()
)) dm;
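The `CASE` expression in the task flags a day when the absolute drift value is strictly greater than 0.05. A rough Python equivalent of that row-building logic is sketched below; `build_drift_record` and the sample dates are hypothetical and only mirror the columns of `PATIENT_READMISSION_DRIFT_METRICS`.

```python
from datetime import date

DRIFT_THRESHOLD = 0.05  # same 5% threshold as the CASE expression in the task

def build_drift_record(metric_date, drift_value, threshold=DRIFT_THRESHOLD):
    """Return a dict shaped like one row of PATIENT_READMISSION_DRIFT_METRICS."""
    return {
        "METRIC_DATE": metric_date,
        "MODEL_MONITOR_NAME": "PATIENT_READMISSION_BASE_MODEL_MONITOR",
        "COLUMN_NAME": "XGB_BASE_PREDICTION",
        "DRIFT_METRIC_NAME": "DIFFERENCE_OF_MEANS",
        "DRIFT_VALUE": drift_value,
        # Compare the absolute value so negative drift is flagged too
        "THRESHOLD_EXCEEDED": abs(drift_value) > threshold,
    }

small = build_drift_record(date(2025, 1, 1), 0.02)      # below threshold
negative = build_drift_record(date(2025, 1, 2), -0.08)  # exceeds threshold in magnitude
print(small["THRESHOLD_EXCEEDED"], negative["THRESHOLD_EXCEEDED"])
```

Because the comparison is strict, a drift value of exactly 0.05 is not flagged.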

In [None]:
-- Create stream to track changes to drift metrics table
CREATE OR REPLACE STREAM DRIFT_METRICS_STREAM
  ON TABLE PATIENT_READMISSION_DRIFT_METRICS
  APPEND_ONLY = TRUE;

In [None]:
from snowflake import snowpark  # needed for the snowpark.Session type hint below
from snowflake.snowpark.types import StringType

def trigger_hpo_training_job(session: snowpark.Session) -> str:
    """
    Trigger HPO training using Snowpark ML Jobs API
    """
    import json
    from datetime import datetime
    from snowflake.ml.jobs import remote
    
    job_id = f"hpo_retrain_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    
    
    # Ensure compute pool exists
    try:
        session.sql("""
            CREATE COMPUTE POOL IF NOT EXISTS ML_HPO_COMPUTE_POOL
            MIN_NODES = 1
            MAX_NODES = 3
            INSTANCE_FAMILY = CPU_X64_M
            AUTO_RESUME = TRUE
            AUTO_SUSPEND_SECS = 300
        """).collect()
        print("Compute pool created/verified")
    except Exception as e:
        print(f"Compute pool setup: {e}")

    @remote(
        compute_pool='ML_HPO_COMPUTE_POOL', 
        stage_name='ML_STAGE',
        pip_requirements=[
            'snowflake-snowpark-python',
            'snowflake-ml-python',
            'xgboost',
            'scikit-learn',
            'pandas'
        ],
        python_version='3.12'
    )
    def hpo_training_job():
        """
        ML Job function to perform HPO training and register new model version
        This will run on Snowpark Container Services
        """
        from snowflake.snowpark import Session
        from snowflake.ml.registry import Registry
        from snowflake.ml.modeling.tune import get_tuner_context
        from snowflake.ml.modeling import tune
        from snowflake.ml.data import DataConnector
        from xgboost import XGBClassifier
        from sklearn.metrics import f1_score, precision_score, recall_score
        import pandas as pd
        from datetime import datetime
        
        # Get session (will be automatically provided in ML Job context)
        session = Session.builder.getOrCreate()
        
        # Configuration
        VERSION_NUM = '0'
        DB = "E2E_SNOW_MLOPS_DB"
        SCHEMA = "MLOPS_SCHEMA"
        
        # Get training data
        train_table = f"DEMO_PATIENT_READMISSION_TRAIN_{VERSION_NUM}"
        test_table = f"DEMO_PATIENT_READMISSION_TEST_{VERSION_NUM}"
        
        train_df = session.table(train_table)
        test_df = session.table(test_table)
        
        # Define dataset map
        dataset_map = {
            "x_train": DataConnector.from_dataframe(
                train_df.drop("READMITTED", "TIMESTAMP", "PATIENT_ID", "ADMISSION_CATEGORY")
            ),
            "y_train": DataConnector.from_dataframe(train_df.select("READMITTED")),
            "x_test": DataConnector.from_dataframe(
                test_df.drop("READMITTED", "TIMESTAMP", "PATIENT_ID", "ADMISSION_CATEGORY")
            ),
            "y_test": DataConnector.from_dataframe(test_df.select("READMITTED"))
        }
        
        # Define training function for HPO
        def train_func():
            tuner_context = get_tuner_context()
            config = tuner_context.get_hyper_params()
            dm = tuner_context.get_dataset_map()
            
            model = XGBClassifier(**config, random_state=42)
            
            X_train_pd = dm["x_train"].to_pandas().sort_index()
            y_train_pd = dm["y_train"].to_pandas().sort_index()
            X_test_pd = dm["x_test"].to_pandas().sort_index()
            y_test_pd = dm["y_test"].to_pandas().sort_index()
            
            model.fit(X_train_pd, y_train_pd)
            
            train_preds = model.predict(X_train_pd)
            test_preds = model.predict(X_test_pd)
            
            f1_train = f1_score(y_train_pd, train_preds)
            precision_train = precision_score(y_train_pd, train_preds)
            recall_train = recall_score(y_train_pd, train_preds)
            
            f1_test = f1_score(y_test_pd, test_preds)
            precision_test = precision_score(y_test_pd, test_preds)
            recall_test = recall_score(y_test_pd, test_preds)
            
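            # report() takes a dict of metrics plus the fitted model for this trial
            tuner_context.report(
                metrics={
                    "F1_Train": f1_train,
                    "Precision_Train": precision_train,
                    "Recall_Train": recall_train,
                    "F1_Test": f1_test,
                    "Precision_Test": precision_test,
                    "Recall_Test": recall_test
                },
                model=model
            )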
            
            return model
        
        # Configure and run HPO tuner
        tuner = tune.Tuner(
            trainable=train_func,
            param_space={
                "max_depth": tune.randint(3, 10),
                "learning_rate": tune.uniform(0.01, 0.3),
                "n_estimators": tune.randint(50, 200)
            },
            tune_config=tune.TuneConfig(
                metric="F1_Test",
                mode="max",
                num_samples=8
            )
        )
        
        print("Starting HPO tuning...")
        tuner_results = tuner.run(dataset_map=dataset_map)
        best_model = tuner_results.best_model
        
        # Get best metrics
        best_config = tuner_results.best_config
        best_result = tuner_results.results.loc[tuner_results.results['F1_Test'].idxmax()]
        
        print(f"Best config: {best_config}")
        print(f"Best F1 Test: {best_result['F1_Test']}")
        
        # Register model with version suffix as current date
        model_name = f"PATIENT_READMISSION_MLOPS_{VERSION_NUM}"
        version_suffix = datetime.now().strftime('%Y%m%d_%H%M%S')
        new_version = f"XGB_RETRAINED_{version_suffix}"
        
        registry = Registry(
            session=session,
            database_name=DB,
            schema_name=SCHEMA,
            options={"enable_monitoring": True}
        )
        
        print(f"Registering model version: {new_version}")
        mv = registry.log_model(
            model_name=model_name,
            model=best_model,
            version_name=new_version,
            sample_input_data=train_df.drop(
                ["TIMESTAMP", "PATIENT_ID", "READMITTED", "ADMISSION_CATEGORY"]
            ).limit(100),
            comment=f"""Automatically retrained model due to drift detection.
                       HPO optimized with max_depth={best_config['max_depth']},
                       learning_rate={best_config['learning_rate']:.4f},
                       n_estimators={best_config['n_estimators']}
                       Triggered by drift monitoring on {version_suffix}""",
            target_platforms=["WAREHOUSE", "SNOWPARK_CONTAINER_SERVICES"],
            options={"enable_explainability": True}
        )
        
        # Log metrics
        mv.set_metric(metric_name="Train_F1_Score", value=float(best_result['F1_Train']))
        mv.set_metric(metric_name="Train_Precision_Score", value=float(best_result['Precision_Train']))
        mv.set_metric(metric_name="Train_Recall_score", value=float(best_result['Recall_Train']))
        mv.set_metric(metric_name="Test_F1_Score", value=float(best_result['F1_Test']))
        mv.set_metric(metric_name="Test_Precision_Score", value=float(best_result['Precision_Test']))
        mv.set_metric(metric_name="Test_Recall_score", value=float(best_result['Recall_Test']))
        
        print(f"Successfully trained and registered model version: {new_version}")
        
        return {
            "model_version": new_version,
            "f1_score": float(best_result['F1_Test']),
            "status": "success"
        }
        
    try:
        # Submit the job
        result = hpo_training_job()
        return f"HPO training job {job_id} submitted: {result}"
    except Exception as e:
        return f"Error submitting job: {str(e)}"

# Register the stored procedure
session.sproc.register(
    func=trigger_hpo_training_job,
    name="TRIGGER_HPO_TRAINING_JOB",
    replace=True,
    is_permanent=True,
    stage_location="@ML_STAGE",
    packages=['snowflake-snowpark-python','snowflake-ml-python'],
    return_type=StringType(),
    python_version="3.12"
)

print("HPO training job trigger registered")
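The tuner in the job above samples `num_samples=8` configurations from the parameter space and keeps the one with the highest `F1_Test`. The sketch below imitates that selection loop in plain Python so the logic can be followed without a compute pool. It is not the Snowflake tuner: `toy_f1` is a made-up scoring function standing in for model training, and `sample_config` and `random_search` are hypothetical helper names.

```python
import random

def sample_config(rng):
    """Draw one configuration from the same ranges as the job's param_space."""
    return {
        "max_depth": rng.randrange(3, 10),        # integer, assuming an exclusive upper bound
        "learning_rate": rng.uniform(0.01, 0.3),
        "n_estimators": rng.randrange(50, 200),   # integer, assuming an exclusive upper bound
    }

def toy_f1(config):
    """Made-up score in (0, 1]; highest at max_depth=6 and learning_rate=0.1."""
    depth_penalty = abs(config["max_depth"] - 6) * 0.02
    lr_penalty = abs(config["learning_rate"] - 0.1)
    return 1.0 / (1.0 + depth_penalty + lr_penalty)

def random_search(num_samples=8, seed=42):
    """Score num_samples random configurations and return the best trial."""
    rng = random.Random(seed)
    trials = []
    for _ in range(num_samples):
        config = sample_config(rng)
        trials.append({"config": config, "F1_Test": toy_f1(config)})
    # mode="max": keep the trial with the highest F1_Test
    best = max(trials, key=lambda t: t["F1_Test"])
    return best, trials

best_trial, all_trials = random_search()
print(best_trial["config"], round(best_trial["F1_Test"], 4))
```

In the real job each trial trains an `XGBClassifier` instead of calling a toy function, and the trials can run in parallel across the compute pool nodes.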

In [None]:
-- Create task that triggers HPO training job when drift detected
CREATE OR REPLACE TASK RETRAIN_ON_DRIFT_DETECTION
  WAREHOUSE = {{COMPUTE_WAREHOUSE}}
  AFTER DAILY_DRIFT_METRICS_UPDATE
WHEN
  SYSTEM$STREAM_HAS_DATA('DRIFT_METRICS_STREAM')
AS
DECLARE
  drift_detected BOOLEAN;
  job_result VARCHAR;
BEGIN
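  -- Check if drift threshold exceeded, ignoring rows that this task logged
  -- for earlier retraining events (they also carry THRESHOLD_EXCEEDED = TRUE)
  SELECT MAX(THRESHOLD_EXCEEDED)::BOOLEAN
  INTO :drift_detected
  FROM DRIFT_METRICS_STREAM
  WHERE METRIC_DATE >= DATEADD(DAY, -1, CURRENT_DATE())
    AND MODEL_MONITOR_NAME <> 'RETRAINING_EVENT';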
  
  -- Trigger HPO training job if drift detected
  IF (:drift_detected = TRUE) THEN
    CALL TRIGGER_HPO_TRAINING_JOB() INTO :job_result;
    
    -- Log the event
    INSERT INTO PATIENT_READMISSION_DRIFT_METRICS (
      METRIC_DATE,
      MODEL_MONITOR_NAME,
      COLUMN_NAME,
      DRIFT_METRIC_NAME,
      DRIFT_VALUE,
      THRESHOLD_EXCEEDED
    )
    VALUES (
      CURRENT_DATE(),
      'RETRAINING_EVENT',
      'HPO_JOB_ID',
      :job_result,
      NULL,
      TRUE
    );
  END IF;
END;
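The core decision in this task is small: look at recent drift rows, and call the training procedure only if at least one of them exceeded the threshold. The sketch below restates that control flow in plain Python under simplifying assumptions: rows are plain dicts, `fake_trigger` is a hypothetical stand-in for `TRIGGER_HPO_TRAINING_JOB`, and stream semantics are not modeled.

```python
from datetime import date, timedelta

def drift_detected(rows, today):
    """True if any row dated yesterday or later has THRESHOLD_EXCEEDED set."""
    cutoff = today - timedelta(days=1)
    recent = [r for r in rows if r["METRIC_DATE"] >= cutoff]
    return any(r["THRESHOLD_EXCEEDED"] for r in recent)

def run_task(rows, today, trigger):
    """Call trigger() only when drift is detected; return its result or None."""
    if drift_detected(rows, today):
        return trigger()
    return None

calls = []

def fake_trigger():
    # Hypothetical stand-in for the stored procedure call
    calls.append("retrain")
    return "submitted"

today = date(2025, 6, 10)
rows = [
    {"METRIC_DATE": date(2025, 6, 1), "THRESHOLD_EXCEEDED": True},    # too old, ignored
    {"METRIC_DATE": date(2025, 6, 10), "THRESHOLD_EXCEEDED": False},  # recent, no drift
]
result_quiet = run_task(rows, today, fake_trigger)

# A recent row that exceeds the threshold triggers retraining
rows.append({"METRIC_DATE": date(2025, 6, 9), "THRESHOLD_EXCEEDED": True})
result_drift = run_task(rows, today, fake_trigger)
print(result_quiet, result_drift, calls)
```

When no recent rows exist, `drift_detected` returns `False`, which matches the SQL behavior where a `NULL` result skips the `IF` branch.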

# SPCS Deployment setup (OPTIONAL)
## This is disabled by default, but uncommenting the code cells below will allow a user to:

- ### Create a new compute pool with 2 large CPU nodes (`CPU_X64_L`)
- ### Deploy a service on top of our existing HPO model version
- ### Test inference on the newly created container service

In [None]:
cp_name = "PATIENT_READMISSION_INFERENCE_CP"
num_spcs_nodes = '2'
spcs_instance_family = 'CPU_X64_L'
service_name = 'PATIENT_READMISSION_PREDICTION_SERVICE'

current_database = session.get_current_database().replace('"', '')
current_schema = session.get_current_schema().replace('"', '')
extended_service_name = f'{current_database}.{current_schema}.{service_name}'

In [None]:
session.sql(f"alter compute pool if exists {cp_name} stop all").collect()
session.sql(f"drop compute pool if exists {cp_name}").collect()
session.sql(f"create compute pool {cp_name} min_nodes={num_spcs_nodes} max_nodes={num_spcs_nodes} instance_family={spcs_instance_family} auto_resume=True auto_suspend_secs=300").collect()
session.sql(f"describe compute pool {cp_name}").show()

In [None]:
# Note: this may take up to 5 minutes to run

mv_opt.create_service(
    service_name=extended_service_name,
    service_compute_pool=cp_name,
    ingress_enabled=True,
    max_instances=int(num_spcs_nodes)
)

In [None]:
model_registry.get_model(model_name).show_versions()

In [None]:
mv_container = model_registry.get_model(model_name).default
mv_container.run(test, function_name = "predict", service_name = "PATIENT_READMISSION_PREDICTION_SERVICE").rename('"output_feature_0"', 'XGB_PREDICTION')

In [None]:
SHOW ENDPOINTS IN SERVICE E2E_SNOW_MLOPS_DB.MLOPS_SCHEMA.PATIENT_READMISSION_PREDICTION_SERVICE

In [None]:
SHOW MODELS;

In [None]:
# Stop all services on the compute pool to save costs
#session.sql(f"alter compute pool if exists {cp_name} stop all").collect()

## Conclusion 

#### 🛠️ Snowflake Feature Store tracks feature definitions and maintains lineage of sources and destinations 🛠️
#### 🚀 Snowflake Model Registry gives users a secure and flexible framework to log models, tag candidates for production, and run inference and explainability jobs 🚀
#### 📈 ML observability in Snowflake allows users to monitor model performance over time and detect model, feature, and concept drift 📈
#### 🔮 All models logged in the Model Registry can be accessed for inference, explainability, lineage tracking, visibility and more 🔮