## Snowflake Model Registry

- In this notebook, we illustrate how to train an XGBoost model on the loan approval dataset using the Snowpark ML Model API.
- We also show how to run inference and manage models via the Model Registry or as a UDF.

The Snowpark ML Model API supports `scikit-learn`, `xgboost`, and `lightgbm` models.

### Import Libraries

In [None]:
# Import Snowpark 
from snowflake.snowpark import Session
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udf
import snowflake.snowpark.functions as F

# Import Misc Libraries 
import pandas as pd
import altair as alt
import numpy as np
import streamlit as st

# Import Snowpark ML 
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import registry
from snowflake.ml._internal.utils import identifier
from snowflake.ml.modeling.metrics import f1_score, confusion_matrix
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.preprocessing import MinMaxScaler, OrdinalEncoder

# Setup a Session 
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
session.use_warehouse("TASTY_DEV_WH")
session.use_database("FEATURES")
session.use_schema("PUBLIC")

### Load data into a dataframe and show results

In [None]:
# Load the credit record and application record tables into DataFrames
credit_df = session.table('FEATURES.PUBLIC.CREDIT_RECORD')
application_df = session.table('FEATURES.PUBLIC.APPLICATION_RECORD')
# Show the data 
application_df.show()


### Perform data cleansing, including feature engineering and null value imputation.

In [None]:
application_df = application_df.with_column('AGE', F.floor(F.abs(F.col('DAYS_BIRTH')) / 365))
application_df = application_df.drop("DAYS_BIRTH")
application_df = application_df.fillna(application_df[[F.mode('OCCUPATION_TYPE')]].collect()[0][0], subset='OCCUPATION_TYPE')

# Add in additional dataset to get target column, which we will join on ID
# Flag each credit record whose STATUS is 2, 3, 4, or 5 with 'YES'; all other records get NULL
credit_record_sdf = credit_df.with_column(
    'TARGET',
    F.when(F.col('STATUS').isin('2', '3', '4', '5'), 'YES')
)
# Count flagged records per ID (COUNT ignores NULLs), then binarize: 1 if any record was flagged, else 0
target_df = credit_record_sdf.group_by('ID').agg(F.count('TARGET').as_('TARGET'))
target_df = target_df.with_column('TARGET', F.when(F.col('TARGET') > 0, 1).otherwise(0))

# Joining our target variable to our customer records
application_record_sdf = application_df.join(target_df, using_columns='ID', join_type='inner')
# Finally we can drop the ID variable as we won't use it for training
application_record_sdf = application_record_sdf.drop('ID')
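The labeling logic above is spread across several Snowpark expressions. The plain-Python sketch below reproduces it on a few made-up rows, assuming (as the cell above does) that STATUS codes '2' through '5' mark a bad record: an ID gets TARGET = 1 if any of its records is flagged, otherwise 0. It also mirrors the AGE computation. `label_targets` and `age_in_years` are hypothetical helpers for illustration, not part of Snowpark.

```python
import math
from collections import defaultdict

# Assumption carried over from the cell above: these STATUS codes mark a bad record
BAD_STATUSES = {"2", "3", "4", "5"}


def age_in_years(days_birth: int) -> int:
    """Mirror F.floor(F.abs(DAYS_BIRTH) / 365); abs() handles negative day counts."""
    return math.floor(abs(days_birth) / 365)


def label_targets(credit_rows):
    """Return {ID: 1 if any record for that ID has a bad STATUS, else 0}."""
    bad_counts = defaultdict(int)
    for row in credit_rows:
        # Adding a bool adds 0 or 1, and every ID gets an entry even with no bad records
        bad_counts[row["ID"]] += row["STATUS"] in BAD_STATUSES
    return {cid: int(count > 0) for cid, count in bad_counts.items()}


# Made-up rows for illustration
credit_rows = [
    {"ID": 1, "STATUS": "0"},
    {"ID": 1, "STATUS": "3"},
    {"ID": 2, "STATUS": "C"},
    {"ID": 2, "STATUS": "X"},
]
print(label_targets(credit_rows))  # {1: 1, 2: 0}
print(age_in_years(-12005))        # 32
```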


In [None]:
application_record_sdf.show()

In [None]:
CATEGORICAL_COLUMNS = ["CODE_GENDER", "FLAG_OWN_CAR", "FLAG_OWN_REALTY", "NAME_INCOME_TYPE", "NAME_EDUCATION_TYPE", "NAME_FAMILY_STATUS", "NAME_HOUSING_TYPE","OCCUPATION_TYPE"]
CATEGORICAL_COLUMNS_OE = [i+"_OUT" for i in CATEGORICAL_COLUMNS]
NUMERICAL_COLUMNS = ["CNT_CHILDREN", "AMT_INCOME_TOTAL", "DAYS_EMPLOYED", "FLAG_MOBIL", "FLAG_WORK_PHONE", "FLAG_PHONE", "FLAG_EMAIL", "CNT_FAM_MEMBERS", "AGE"]
LABEL_COLUMNS = ["TARGET"]
OUTPUT_COLUMNS = ["PREDICTED_CAT"]

### Build a simple XGBoost Classification model

What's happening here? 

- Calling `fit()` on the pipeline creates a temporary stored procedure in the background, which means model training is a single-node operation. Use a [Snowpark Optimized Warehouse](https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized) if you need more memory. Here we use the XS Standard Virtual Warehouse created at the beginning of this quickstart.
- Calling `predict()` creates a temporary vectorized UDF in the background: the input DataFrame is split into batches that arrive as pandas DataFrames, and inference runs in parallel across those batches.

After running the following cell, you can confirm this in the query history.
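Conceptually, a vectorized UDF splits its input into batches and scores each batch independently, which is what makes the work parallelizable. The sketch below mimics that pattern locally with a thread pool. It is an analogy only, not how Snowflake schedules batches, and `toy_predict_batch` is a hypothetical stand-in for the fitted pipeline.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def batched(rows: Sequence, batch_size: int) -> List[Sequence]:
    """Split rows into consecutive batches of at most batch_size rows."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


def parallel_predict(rows: Sequence, predict_batch: Callable, batch_size: int = 3, workers: int = 2) -> list:
    """Score each batch independently and concatenate the results in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Executor.map yields results in the order of the input batches
        per_batch = list(pool.map(predict_batch, batched(rows, batch_size)))
    return [pred for preds in per_batch for pred in preds]


# Hypothetical stand-in for the fitted model: predict 1 when income exceeds a threshold
def toy_predict_batch(batch: Sequence[int]) -> List[int]:
    return [int(income > 50_000) for income in batch]


incomes = [20_000, 60_000, 75_000, 10_000, 90_000, 40_000, 55_000]
print(parallel_predict(incomes, toy_predict_batch))  # [0, 1, 1, 0, 1, 0, 1]
```

Because each batch is scored without looking at any other batch, adding workers changes only the speed, not the predictions.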

In [None]:
# Initialize a pipeline of transforms: ordinal-encode the categorical columns,
# scale the numerical columns to [0, 1], then fit an XGBoost classifier
pipeline = Pipeline(
    steps=[
        (
            "OE",
            OrdinalEncoder(
                input_cols=CATEGORICAL_COLUMNS,
                output_cols=CATEGORICAL_COLUMNS_OE,
            ),
        ),
        (
            "MMS",
            MinMaxScaler(
                clip=True,
                input_cols=NUMERICAL_COLUMNS,
                output_cols=NUMERICAL_COLUMNS,
            ),
        ),
        (
            "classifier",
            XGBClassifier(
                input_cols=CATEGORICAL_COLUMNS_OE + NUMERICAL_COLUMNS,
                label_cols=LABEL_COLUMNS,
                output_cols=OUTPUT_COLUMNS,
            ),
        ),
    ]
)

# Split the data into train and test sets
train_df, test_df = application_record_sdf.random_split(weights=[0.9, 0.1], seed=0)

# Train on the train set 
pipeline.fit(train_df)

# Predict on the test set 
predicted_results = pipeline.predict(test_df)

# Evaluate the predictions with Snowpark ML's F1 score
f1 = f1_score(df=predicted_results, y_true_col_names="TARGET", y_pred_col_names="PREDICTED_CAT")

predicted_results.select("TARGET", "PREDICTED_CAT").show()
print(f"F1 Score: {f1}")
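For intuition about the first two pipeline steps, the sketch below reimplements ordinal encoding and min-max scaling with clipping in plain Python. It assumes codes are assigned in sorted category order (scikit-learn's `OrdinalEncoder` default, which we assume Snowpark ML's encoder follows) and uses made-up values. The helper names are hypothetical.

```python
def fit_ordinal(values):
    """Assign integer codes to the distinct categories in sorted order (assumed default)."""
    return {category: code for code, category in enumerate(sorted(set(values)))}


def fit_min_max(values):
    """Remember the training minimum and maximum of a numeric column."""
    return min(values), max(values)


def min_max_scale(x, lo, hi, clip=True):
    """Scale x to [0, 1] with the training min/max; clip=True bounds unseen values."""
    scaled = (x - lo) / (hi - lo) if hi != lo else 0.0  # simplification for constant columns
    return min(max(scaled, 0.0), 1.0) if clip else scaled


# Made-up training values
codes = fit_ordinal(["Laborers", "Managers", "Drivers", "Laborers"])
print(codes)  # {'Drivers': 0, 'Laborers': 1, 'Managers': 2}

lo, hi = fit_min_max([27_000, 135_000, 243_000])
print(min_max_scale(135_000, lo, hi))  # 0.5
print(min_max_scale(300_000, lo, hi))  # 1.0, clipped because it exceeds the training max
```

Clipping matters at prediction time: a test row whose value falls outside the training range still lands inside [0, 1], which is the range the classifier saw during training.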

### Looking at confusion matrix to investigate precision and recall

In [None]:
# confusion_matrix returns a NumPy array (rows: true labels, columns: predicted labels)
conf_matrix = confusion_matrix(df=predicted_results, y_true_col_name='TARGET', y_pred_col_name='PREDICTED_CAT')
conf_matrix
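The confusion matrix makes the F1 score easier to interpret. Assuming the scikit-learn layout (rows are true labels, columns are predicted labels, with labels sorted as 0 then 1), the sketch below derives precision, recall, and F1 from a 2x2 matrix. The counts are made up for illustration and are not results from this notebook; the array returned by the cell above could be passed in after `.tolist()`.

```python
def binary_metrics(cm):
    """cm[i][j] counts rows whose true label is i and predicted label is j (labels 0, 1)."""
    tn, fp = cm[0]
    fn, tp = cm[1]
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# Made-up counts: [[TN, FP], [FN, TP]]
cm = [[900, 20],
      [30, 50]]
p, r, f = binary_metrics(cm)
print(f"precision={p:.3f} recall={r:.3f} f1={f:.3f}")  # precision=0.714 recall=0.625 f1=0.667
```

Note that true negatives never enter the F1 score, so on an imbalanced target like this one F1 can be low even when overall accuracy looks high.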

## Model deployment using Model Registry

With Snowpark ML's Model Registry, we have a Snowflake-native framework for model versioning and deployment. It lets us log models, tag parameters and metrics, track metadata, create versions, and deploy models to a Snowflake warehouse or to Snowpark Container Services for batch scoring.
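To make the registry vocabulary concrete (a model holds named versions, each version can carry metrics, and one version is the default), here is a deliberately tiny in-memory analogue. `ToyRegistry` and its methods are hypothetical and only loosely echo the names used in this notebook; they are not the Snowflake `Registry` API and do not reproduce its behavior.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class ToyModelVersion:
    name: str
    model: Any
    metrics: Dict[str, float] = field(default_factory=dict)


@dataclass
class ToyModel:
    name: str
    versions: Dict[str, ToyModelVersion] = field(default_factory=dict)
    default: Optional[str] = None
    comment: str = ""


class ToyRegistry:
    """Hypothetical in-memory registry: model name -> versions -> metrics, plus a default."""

    def __init__(self) -> None:
        self._models: Dict[str, ToyModel] = {}

    def log_model(self, model: Any, model_name: str, version_name: str) -> ToyModelVersion:
        # Upper-case names, the way Snowflake resolves unquoted identifiers
        key = model_name.upper()
        entry = self._models.setdefault(key, ToyModel(key))
        version_key = version_name.upper()
        if version_key in entry.versions:
            # This toy refuses to overwrite an existing version
            raise ValueError(f"{key} already has a version named {version_key}")
        version = ToyModelVersion(version_key, model)
        entry.versions[version_key] = version
        return version

    def get_model(self, model_name: str) -> ToyModel:
        return self._models[model_name.upper()]


# Usage with placeholder models and metric values
reg = ToyRegistry()
reg.log_model("pipeline-a", "loan_approval_model", "v1").metrics["f1"] = 0.61
reg.log_model("pipeline-b", "loan_approval_model", "v2").metrics["f1"] = 0.67
m = reg.get_model("LOAN_APPROVAL_MODEL")
m.default = "V2"
print(sorted(m.versions), m.default)  # ['V1', 'V2'] V2
```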

### Open Model Registry and see all Models

In [None]:
# Get the current database and schema from the current session 
current_db = identifier._get_unescaped_name(session.get_current_database())
current_schema = identifier._get_unescaped_name(session.get_current_schema())

# Open the model registry and point to the appropriate database and schema
native_registry = registry.Registry(session=session, database_name=current_db, schema_name=current_schema)

# Show the models in our registry 
native_registry.show_models()

### Log a Model

In [None]:
# Get sample input data to pass into the registry logging function
sample_data = train_df.select(CATEGORICAL_COLUMNS+NUMERICAL_COLUMNS).limit(100)

# Define model name and version
model_name = "loan_approval_model"
model_version = 'v6'

# Log the model 
model_ver = native_registry.log_model( 
    model_name=model_name,
    version_name=model_version,
    model=pipeline,
    sample_input_data=sample_data,
    options={"embed_local_ml_library": True,  # Package the locally installed snowflake-ml-python library with the model
             "relax_version": True})  # Relax dependency version constraints


# Add evaluation metric
model_ver.set_metric(metric_name="f1", value=f1)

### Inspect the Model Registry again 

In [None]:
# Show the models in our registry 
native_registry.show_models()

### See the versions of a registered Model

In [None]:

# Show the versions of a registered model 
m = native_registry.get_model('LOAN_APPROVAL_MODEL')
m.show_versions()

### Add or change comments for the model

In [None]:
# Add a comment
m.comment = "Adding in more categorical columns - 2/7/24"

# Print the comment to confirm it was set
print(m.comment)

### Set the default version for the model


In [None]:
# Set the default version for the model (here, the version logged above)
m.default = model_version

# Show the versions 
m.show_versions()

### Use a model from the registry

In [None]:
# Get the version of the model we want to work with (the version logged above)
model_ver = native_registry.get_model(model_name).version(model_version)

# Run the model passing in the test dataframe and using the predict function 
remote_prediction = model_ver.run(test_df, function_name="predict")

# Convert the output to pandas 
remote_prediction.to_pandas()


In [None]:
large_application_df = session.table("FEATURES.PUBLIC.APPLICATION_RECORD_LARGE")
large_application_df.count()

## Inference is quick!

Snowflake parallelizes batch inference, so even an XSMALL warehouse (the smallest size available) can produce millions of predictions. The next cell runs the model we created on the 10 million rows of `APPLICATION_RECORD_LARGE`.

In [None]:
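import time

start_time = time.time()
remote_prediction = model_ver.run(large_application_df, function_name="predict")
# Snowpark DataFrames are lazy: cache_result() executes the inference query and stores the
# predictions in a temporary table, so the measured time includes the actual scoring
remote_prediction = remote_prediction.cache_result()
time_taken = time.time() - start_time
print("Time taken to run a batch prediction on 10 million records using an XSMALL warehouse: {:.2f} seconds".format(time_taken))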
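Snowpark DataFrames are evaluated lazily: a call that returns a DataFrame usually only builds a query, and the work happens when an action such as `show()`, `collect()`, or `cache_result()` runs. If a timing cell wraps only the call that builds the DataFrame, it may mostly measure query construction. The sketch below illustrates the same pitfall with a Python generator; it is an analogy, not Snowpark code, and `score_rows` is a hypothetical placeholder for model scoring.

```python
import time


def score_rows(n: int):
    """Generator standing in for a lazy query: rows are scored only when iterated."""
    for i in range(n):
        yield (i * 7919) % 2  # placeholder "prediction": 1 for odd i, 0 for even i


start = time.perf_counter()
lazy_scores = score_rows(2_000_000)  # only creates the generator, no scoring yet
build_seconds = time.perf_counter() - start

start = time.perf_counter()
total = sum(lazy_scores)  # forces every row to be scored
run_seconds = time.perf_counter() - start

print(f"build: {build_seconds:.6f}s  run: {run_seconds:.3f}s  positives: {total}")
print(f"throughput: {2_000_000 / run_seconds:,.0f} rows/s")
```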

In [None]:
remote_prediction.show()