## 3. ML Modeling

- In this notebook, we illustrate how to train an XGBoost model on the diamonds dataset using the Snowpark ML Model API.
- We also show how to run inference and deploy the model via the Model Registry or as a UDF (see Appendix).

The Snowpark ML Model API currently supports scikit-learn, XGBoost, and LightGBM models.

### Import Libraries

In [None]:
# Snowpark for Python
from snowflake.snowpark import Session
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import udf
import snowflake.snowpark.functions as F

# Snowpark ML
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.registry import model_registry
from snowflake.ml._internal.utils import identifier
from snowflake.ml.utils import connection_params

# data science libs
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error

# misc
import json
import joblib
import cachetools

# warning suppression
import warnings
warnings.simplefilter('ignore')

### Establish a Secure Connection to Snowflake

*Other connection options include Username/Password, MFA, OAuth, Okta, SSO. For more information, refer to the [Python Connector](https://docs.snowflake.com/en/developer-guide/python-connector/python-connector-example) documentation.*

In [None]:
# Make a Snowpark Connection
session = Session.builder.configs(
    connection_params.SnowflakeLoginOptions()
).getOrCreate()
session.sql_simplifier_enabled = True

In [None]:
# Specify the table name where we stored the diamonds dataset
# Change this only if you named your table something else in the data ingest notebook
DEMO_TABLE = 'diamonds'
input_tbl = f"{session.get_current_database()}.{session.get_current_schema()}.{DEMO_TABLE}"

### Load the data & preprocessing pipeline

In [None]:
# Load in the data
diamonds_df = session.table(input_tbl)
diamonds_df.show()

In [None]:
# Categorize all the features for modeling
CATEGORICAL_COLUMNS = ["CUT", "COLOR", "CLARITY"]
CATEGORICAL_COLUMNS_OE = ["CUT_OE", "COLOR_OE", "CLARITY_OE"] # To name the ordinal encoded columns
NUMERICAL_COLUMNS = ["CARAT", "DEPTH", "TABLE_PCT", "X", "Y", "Z"]

LABEL_COLUMNS = ['PRICE']
OUTPUT_COLUMNS = ['PREDICTED_PRICE']

In [None]:
# Load the preprocessing pipeline object (locally in this case but you can also load the one saved to stage in the previous notebook as an optional exercise)
PIPELINE_FILE = 'preprocessing_pipeline.joblib'
preprocessing_pipeline = joblib.load(PIPELINE_FILE)

### Build a simple XGBoost Regression model

What's happening here? 

- The `model.fit()` function creates a temporary stored procedure in the background. This means that model training is a single-node operation, so be sure to use a [Snowpark Optimized Warehouse](https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized) if you need more memory. Here we are just using the XS standard virtual warehouse that we created at the beginning of this quickstart.
- The `model.predict()` function creates a temporary vectorized UDF in the background: the input DataFrame is split into batches that are passed to the UDF as pandas DataFrames, and inference is parallelized across those batches.

After executing the following cells, you can check the query history to confirm this.
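To make the batching idea concrete, here is a plain-Python sketch of what "batch the input and parallelize inference across batches" means. It is only a conceptual model: the batch size, the thread pool, and `toy_predict` are illustrative assumptions, and Snowflake's real batching and scheduling of vectorized UDFs is handled by the warehouse, not by code like this.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def batched(rows: Sequence[float], batch_size: int) -> List[Sequence[float]]:
    """Split rows into consecutive batches of at most batch_size items."""
    return [rows[i:i + batch_size] for i in range(0, len(rows), batch_size)]


def predict_in_batches(
    rows: Sequence[float],
    predict_batch: Callable[[Sequence[float]], List[float]],
    batch_size: int = 4,
    max_workers: int = 4,
) -> List[float]:
    """Score each batch concurrently, then stitch the outputs back together in row order."""
    batches = batched(rows, batch_size)
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map yields results in input order, so predictions stay aligned with rows
        per_batch = list(pool.map(predict_batch, batches))
    return [pred for batch_preds in per_batch for pred in batch_preds]


def toy_predict(batch: Sequence[float]) -> List[float]:
    """Hypothetical model: predicts twice the input value."""
    return [2.0 * x for x in batch]


preds = predict_in_batches([float(x) for x in range(1, 11)], toy_predict)
print(preds)  # [2.0, 4.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 18.0, 20.0]
```

Because each batch is scored independently, batches can be processed on different workers in any order; only the final concatenation has to respect row order.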

In [None]:
# Split the data into train and test sets
diamonds_train_df, diamonds_test_df = diamonds_df.random_split(weights=[0.9, 0.1], seed=0)

# Fit the preprocessing pipeline loaded above on the training set only, then apply it to both sets
train_df = preprocessing_pipeline.fit(diamonds_train_df).transform(diamonds_train_df)
test_df = preprocessing_pipeline.transform(diamonds_test_df)
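The cell above fits the pipeline on the training split only and then reuses what it learned to transform the test split, so no information from the test set leaks into preprocessing. The toy min-max scaler below illustrates that fit/transform contract in plain Python. It is not the actual pipeline from the previous notebook (its steps are not shown here), and the carat values are made up.

```python
from typing import List


class MinMaxScalerSketch:
    """Toy min-max scaler that illustrates the fit/transform contract."""

    def fit(self, values: List[float]) -> "MinMaxScalerSketch":
        # Learn the scaling statistics from the training data only
        self.min_ = min(values)
        self.max_ = max(values)
        return self

    def transform(self, values: List[float]) -> List[float]:
        span = self.max_ - self.min_
        # Reuse the training min/max; test values outside that range map outside [0, 1]
        return [(v - self.min_) / span if span else 0.0 for v in values]


train_carat = [0.5, 1.0, 1.5, 2.5]  # made-up training values
test_carat = [1.0, 3.0]             # made-up test values

scaler = MinMaxScalerSketch().fit(train_carat)  # fit on the training split only
print(scaler.transform(train_carat))  # [0.0, 0.25, 0.5, 1.0]
print(scaler.transform(test_carat))   # [0.25, 1.25]
```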

In [None]:
# Define the XGBRegressor
regressor = XGBRegressor(
    input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS
)

# Train
regressor.fit(train_df)

# Predict
result = regressor.predict(test_df)

In [None]:
# Just to illustrate, we can also pass in a Pandas DataFrame to Snowpark ML's model.predict()
regressor.predict(test_df[CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS].to_pandas())

Let's analyze the results using Snowpark ML's `mean_absolute_percentage_error` (MAPE) metric.
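MAPE is the mean of |y - ŷ| / |y| over all rows. As far as we know, Snowpark ML's `mean_absolute_percentage_error` mirrors scikit-learn's, which returns a fraction (0.1 means 10%) rather than a percentage and guards against division by zero with a tiny epsilon. The plain-Python sketch below illustrates the formula; the function name `mape_sketch` and the epsilon choice (borrowed from scikit-learn) are assumptions, not the Snowpark ML implementation.

```python
import sys
from typing import Sequence


def mape_sketch(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Mean absolute percentage error, returned as a fraction (not multiplied by 100)."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("y_true and y_pred must be non-empty and the same length")
    eps = sys.float_info.epsilon  # avoids division by zero when a true value is 0
    total = sum(abs(t - p) / max(abs(t), eps) for t, p in zip(y_true, y_pred))
    return total / len(y_true)


# Two made-up prices: one over-predicted by 10%, one under-predicted by 10%
print(mape_sketch([1000.0, 2000.0], [1100.0, 1800.0]))  # 0.1 (i.e. 10%)
```

Because each error is divided by the true price, a $100 miss on a $1,000 diamond counts as much as a $200 miss on a $2,000 one.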

In [None]:
mape = mean_absolute_percentage_error(df=result,
                                      y_true_col_names="PRICE",
                                      y_pred_col_names="PREDICTED_PRICE")

result.select("PRICE", "PREDICTED_PRICE").show()
print(f"Mean absolute percentage error: {mape}")

In [None]:
# Plot actual vs. predicted prices; the red line marks perfect predictions
g = sns.relplot(data=result.select("PRICE", "PREDICTED_PRICE").to_pandas().astype("float64"),
                x="PRICE", y="PREDICTED_PRICE", kind="scatter")
g.ax.axline((0, 0), slope=1, color="r")

plt.show()

### Now, let's use Snowpark ML's **Distributed** `GridSearchCV()` function to find optimal model parameters

We will temporarily scale up the warehouse so that the many candidate models in the hyperparameter search can be trained in parallel, which speeds up the search.
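It helps to count how much work the search involves. The grid used below has 5 × 5 = 25 candidate settings, and each one is cross-validated separately. The fold count of 5 and the single final refit are assumptions based on scikit-learn's defaults (`cv=5`, `refit=True`), since the call does not set them. Because these fits are independent of one another, they can run in parallel, which is why a larger warehouse helps.

```python
from itertools import product

# Same grid as the GridSearchCV call below
param_grid = {
    "n_estimators": [100, 200, 300, 400, 500],
    "learning_rate": [0.1, 0.2, 0.3, 0.4, 0.5],
}

# Every combination of values is one candidate model
keys = list(param_grid)
candidates = [dict(zip(keys, values)) for values in product(*param_grid.values())]

cv_folds = 5  # assumption: scikit-learn's default cv=5
total_fits = len(candidates) * cv_folds + 1  # +1 to refit the best candidate on all training data
print(len(candidates), total_fits)  # 25 126
```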

In [None]:
session.sql("ALTER WAREHOUSE ML_HOL_WH SET WAREHOUSE_SIZE=LARGE;").collect()

In [None]:
grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid={
        "n_estimators":[100, 200, 300, 400, 500],
        "learning_rate":[0.1, 0.2, 0.3, 0.4, 0.5],
    },
    n_jobs=-1,  # -1: use all available processors
    scoring="neg_mean_absolute_percentage_error",
    input_cols=CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS
)

# Train
grid_search.fit(train_df)

In [None]:
session.sql("ALTER WAREHOUSE ML_HOL_WH SET WAREHOUSE_SIZE=XSMALL;").collect()

In our run, the best estimator had the following parameters: `n_estimators=500` and `learning_rate=0.4`.

`to_sklearn()` returns the underlying scikit-learn `GridSearchCV` object, which gives us access to all of its attributes; its `best_estimator_` is the fitted XGBoost model.

In [None]:
grid_search.to_sklearn().best_estimator_

We can also analyze the grid search results.

In [None]:
# Analyze grid search results
gs_results = grid_search.to_sklearn().cv_results_
n_estimators_val = []
learning_rate_val = []
for param_dict in gs_results["params"]:
    n_estimators_val.append(param_dict["n_estimators"])
    learning_rate_val.append(param_dict["learning_rate"])
mape_val = gs_results["mean_test_score"]*-1

gs_results_df = pd.DataFrame(data={
    "n_estimators":n_estimators_val,
    "learning_rate":learning_rate_val,
    "mape":mape_val})

sns.relplot(data=gs_results_df, x="learning_rate", y="mape", hue="n_estimators", kind="line")

plt.show()

This is consistent with `learning_rate=0.4` and `n_estimators=500` being chosen as the best estimator, since that combination has the lowest MAPE.
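Why the plot and the chosen estimator agree: scikit-learn's `GridSearchCV` (the object returned by `to_sklearn()`) ranks candidates by `mean_test_score` and picks the highest. Since the scorer here is negative MAPE, the highest score is the lowest MAPE. The sketch below reproduces that selection on a small dict shaped like `cv_results_`; `best_candidate` is a hypothetical helper, and the scores are made up for illustration, not results from this notebook.

```python
from typing import Any, Dict, List


def best_candidate(cv_results: Dict[str, List[Any]]) -> Dict[str, Any]:
    """Pick the candidate with the highest mean test score (first one wins on ties)."""
    scores = cv_results["mean_test_score"]
    best_index = max(range(len(scores)), key=lambda i: scores[i])
    return {
        "params": cv_results["params"][best_index],
        # The scorer is neg_mean_absolute_percentage_error, so flip the sign back
        "mape": -scores[best_index],
    }


toy_results = {
    "params": [
        {"n_estimators": 100, "learning_rate": 0.1},
        {"n_estimators": 200, "learning_rate": 0.3},
        {"n_estimators": 300, "learning_rate": 0.5},
    ],
    "mean_test_score": [-0.20, -0.12, -0.15],  # made-up negative MAPE values
}

best = best_candidate(toy_results)
print(best)  # {'params': {'n_estimators': 200, 'learning_rate': 0.3}, 'mape': 0.12}
```

Applied to this notebook, `-grid_search.to_sklearn().best_score_` should equal the `optimal_mape` value that is looked up from `gs_results_df` further down.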

Now, let's predict and analyze the results from using the best estimator.

In [None]:
# Predict
result = grid_search.predict(test_df)

# Analyze results
mape = mean_absolute_percentage_error(df=result,
                                      y_true_col_names="PRICE",
                                      y_pred_col_names="PREDICTED_PRICE")

result.select("PRICE", "PREDICTED_PRICE").show()
print(f"Mean absolute percentage error: {mape}")

In [None]:
# Plot actual vs. predicted prices; the red line marks perfect predictions
g = sns.relplot(data=result.select("PRICE", "PREDICTED_PRICE").to_pandas().astype("float64"),
                x="PRICE", y="PREDICTED_PRICE", kind="scatter")
g.ax.axline((0, 0), slope=1, color="r")

plt.show()

### Model deployment using Model Registry

Snowpark ML's model registry gives us a Snowflake-native framework for model versioning and deployment. It lets us log models, tag parameters and metrics, track metadata, create versions, and ultimately deploy models to a Snowflake warehouse or to Snowpark Container Services for batch scoring.
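To make the versioning idea concrete, the sketch below models a registry as a mapping from (model name, version) to a model artifact plus its metrics. `ToyRegistry` and its methods are hypothetical and only loosely mirror the `log_model` and `set_metric` calls used below; the real registry stores this information in Snowflake and offers much more (tags, metadata, deployments). The metric value is a placeholder.

```python
from typing import Any, Dict, List, Tuple


class ToyRegistry:
    """Hypothetical in-memory stand-in for a model registry; NOT the Snowpark ML API."""

    def __init__(self) -> None:
        # Both dicts are keyed by (model name, version string)
        self._models: Dict[Tuple[str, str], Any] = {}
        self._metrics: Dict[Tuple[str, str], Dict[str, float]] = {}

    def log_model(self, model_name: str, model_version: str, model: Any) -> None:
        key = (model_name, model_version)
        if key in self._models:
            # In this toy, an existing version is never silently overwritten
            raise ValueError(f"{model_name} version {model_version} already exists")
        self._models[key] = model
        self._metrics[key] = {}

    def set_metric(self, model_name: str, model_version: str,
                   metric_name: str, metric_value: float) -> None:
        self._metrics[(model_name, model_version)][metric_name] = metric_value

    def get_metrics(self, model_name: str, model_version: str) -> Dict[str, float]:
        return dict(self._metrics[(model_name, model_version)])

    def list_versions(self, model_name: str) -> List[str]:
        return sorted(version for (name, version) in self._models if name == model_name)


toy_registry = ToyRegistry()
toy_registry.log_model("diamonds_model", "1", model="first fitted model")
toy_registry.set_metric("diamonds_model", "1", "mean_abs_pct_err", 0.1)
toy_registry.log_model("diamonds_model", "2", model="retrained model")
print(toy_registry.list_versions("diamonds_model"))    # ['1', '2']
print(toy_registry.get_metrics("diamonds_model", "1"))  # {'mean_abs_pct_err': 0.1}
```

Keying on the (name, version) pair is what lets a new version be logged alongside an old one, so that metrics such as MAPE can be compared across versions before choosing which one to deploy.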

In [None]:
# Let's save our optimal model and its metadata first
optimal_model = grid_search.to_sklearn().best_estimator_
optimal_n_estimators = optimal_model.n_estimators
optimal_learning_rate = optimal_model.learning_rate

optimal_mape = gs_results_df.loc[(gs_results_df['n_estimators']==optimal_n_estimators) &
                                 (gs_results_df['learning_rate']==optimal_learning_rate), 'mape'].values[0]

First, we will log our model.

In [None]:
# Get sample input data to pass into the registry logging function
X = train_df.select(CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS).limit(100)

db = identifier._get_unescaped_name(session.get_current_database())
schema = identifier._get_unescaped_name(session.get_current_schema())

# Define model name and version
model_name = "diamonds_model"
model_version = 1

# Create a registry and log the model
registry = model_registry.ModelRegistry(session=session, database_name=db, schema_name=schema, create_if_not_exists=True)

registry.log_model(
    model_name=model_name,
    model_version=model_version,
    model=optimal_model,
    sample_input_data=X,
    options={"embed_local_ml_library": True, # This option is enabled to pull latest dev code changes.
             "relax": True} # relax dependencies
)

# Add evaluation metric
registry.set_metric(model_name=model_name, model_version=model_version, metric_name="mean_abs_pct_err", metric_value=optimal_mape)

In [None]:
# Let's confirm it was added
registry.list_models().to_pandas()

Now, we're ready to deploy to a Snowflake Warehouse.

In [None]:
# Pick a deployment name and deploy
model_deployment_name = f"{model_name}{model_version}_UDF"

registry.deploy(model_name=model_name,
                model_version=model_version,
                deployment_name=model_deployment_name, 
                target_method="predict", 
                permanent=True, 
                options={"relax_version": True})

In [None]:
# Let's confirm it was added
registry.list_deployments(model_name, model_version).to_pandas()

Now we can use the deployed model to perform inference.

In [None]:
# We can get a reference to a specific registered model version at any time
model_ref = model_registry.ModelReference(registry=registry, model_name=model_name, model_version=model_version)

# We can then use the deployed model to perform inference
result_sdf = model_ref.predict(deployment_name=model_deployment_name, data=test_df)
#result_sdf.rename(F.col('"output_feature_0"'),"PREDICTED_PRICE").show()
result_sdf.show()

In [None]:
# We can also deploy directly from the model reference, here under a second deployment name
model_ref.deploy(deployment_name="Diamonds_predict",
                 target_method="predict",  # the name of the model's method, usually predict
                 permanent=True)

In [None]:
model_ref.predict("Diamonds_predict", test_df).show()


In [None]:
# Persist the preprocessed test set to the TEST_DATA table (overwriting it if it exists)
test_df.write.mode("overwrite").save_as_table("TEST_DATA")

Now let's clean up.

In [None]:
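# Clean up both deployments created above, then the model itself
registry.delete_deployment(model_name=model_name, model_version=model_version, deployment_name=model_deployment_name)
registry.delete_deployment(model_name=model_name, model_version=model_version, deployment_name="Diamonds_predict")
registry.delete_model(model_name=model_name, model_version=model_version, delete_artifact=True)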

In [None]:
registry.list_deployments(model_name, model_version).to_pandas()

In [None]:
registry.list_models().to_pandas()

### Appendix

#### Deploy model using a Vectorized UDF

If you need to deploy a model that was not trained with Snowpark ML (for example, one trained externally), you can still deploy it via a vectorized UDF.
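The appendix code follows a common pattern: serialize the model, upload it to a stage (which `session.file.put` gzips by default), import it into the UDF, and load it once per process through a cache so that each batch does not deserialize the model again. The stdlib-only sketch below imitates that flow locally. `pickle` + `gzip` stand in for `joblib` + stage compression, a temporary directory stands in for the UDF import directory, `functools.lru_cache` replaces `cachetools.cached`, and the "model" is a hypothetical dict of coefficients.

```python
import functools
import gzip
import os
import pickle
import shutil
import tempfile
from typing import Dict, List

# 1. "Train" and serialize a model. The dict is a hypothetical stand-in for the
#    fitted XGBoost model; the .gz file mimics what put() stores on the stage.
toy_model = {"coef": 2.0, "intercept": 1.0}
import_dir = tempfile.mkdtemp()  # plays the role of the UDF's import directory
with gzip.open(os.path.join(import_dir, "toy_model.pkl.gz"), "wb") as f:
    pickle.dump(toy_model, f)


# 2. Cache the load so the file is deserialized once per process, not once per batch
@functools.lru_cache(maxsize=None)
def load_model_sketch(filename: str) -> Dict[str, float]:
    with gzip.open(os.path.join(import_dir, filename), "rb") as f:
        return pickle.load(f)


# 3. Per-batch predict function, analogous to the body of the vectorized UDF
def batch_predict_sketch(batch: List[float]) -> List[float]:
    model = load_model_sketch("toy_model.pkl.gz")
    return [model["coef"] * x + model["intercept"] for x in batch]


first_preds = batch_predict_sketch([1.0, 2.0])  # loads the model from disk
second_preds = batch_predict_sketch([3.0])      # reuses the cached model
print(first_preds, second_preds)                # [3.0, 5.0] [7.0]
print(load_model_sketch.cache_info().misses)    # 1

shutil.rmtree(import_dir)  # remove the temporary "import directory"
```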

In [None]:
# Let's save our optimal model first
MODEL_FILE = 'model.joblib'
joblib.dump(optimal_model, MODEL_FILE) # we are just pickling it locally first

# Upload the serialized model to the stage we created earlier so the UDF can import it.
# By default, put() gzip-compresses the file, so it is stored as model.joblib.gz
session.file.put(MODEL_FILE, "@ML_HOL_ASSETS", overwrite=True)

# Get all relevant column names to pass into the UDF call
feature_cols = test_df[CATEGORICAL_COLUMNS_OE+NUMERICAL_COLUMNS].columns

In [None]:
# Cache the model load to optimize inference
@cachetools.cached(cache={})
def load_model(filename):
    import joblib
    import sys
    import os

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

# FloatType is defined in snowflake.snowpark.types
from snowflake.snowpark.types import FloatType

# Register the UDF via decorator
@udf(name='batch_predict_diamond',
     session=session,
     replace=True,
     is_permanent=True,
     stage_location='@ML_HOL_ASSETS',
     input_types=[FloatType()]*len(feature_cols),
     return_type=FloatType(),
     imports=['@ML_HOL_ASSETS/model.joblib.gz'],
     packages=['pandas', 'joblib', 'cachetools', 'xgboost'])
def batch_predict_diamond(batch_df: pd.DataFrame) -> pd.Series:
    # Column names aren't passed in to this function, so set them explicitly (same order as feature_cols)
    batch_df.columns = ["CUT_OE", "COLOR_OE", "CLARITY_OE", 'CARAT', 'DEPTH', 'TABLE_PCT', 'X', 'Y', 'Z']
    model = load_model('model.joblib.gz')
    # This uses the XGBoost library's model.predict(), not Snowpark ML's
    return pd.Series(model.predict(batch_df))

Call the vectorized user-defined function (UDF) on the test data.

In [None]:
test_df_w_preds = test_df.with_column('PREDICTED_PRICE', batch_predict_diamond(*feature_cols))
test_df_w_preds.show()

In [None]:
session.close()