# Imports

In [32]:
from snowflake.snowpark.session import Session 
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F 

from snowflake.ml.modeling import preprocessing 
from snowflake.ml.modeling.xgboost import XGBClassifier
from snowflake.ml.modeling.preprocessing import KBinsDiscretizer, OrdinalEncoder, OneHotEncoder
from snowflake.ml.modeling.impute import SimpleImputer


import matplotlib.pyplot as plt 
import seaborn as sns 

import warnings
warnings.filterwarnings("ignore")

# Connect to Snowflake 

In [56]:
connection_params = {
    "connection_name": "default"
}

# To establish a connection to Snowflake, create a Snowpark session and pass connection_params as an argument
session = Session.builder.configs(connection_params).create()



# Train ML Model

In [34]:
session.sql("SELECT CURRENT_WAREHOUSE()").collect()

[Row(CURRENT_WAREHOUSE()='JING_TEST_WH')]

In [35]:
# Change the default warehouse size to "medium", remember to reset it to default before closing this session 
session.sql("ALTER WAREHOUSE JING_TEST_WH SET WAREHOUSE_SIZE = 'MEDIUM'").collect()

[Row(status='Statement executed successfully.')]

In [36]:
session.use_database("ML_SNOWPARK_CI_CD_DB")
session.use_schema("DATA_PROCESSING")

# ML Modeling 

In [37]:
# Prepare data for modeling 
snowdf_train = session.table("CREDIT_DEFAULT_TRAIN")
feature_cols = snowdf_train.columns
feature_cols.remove("TARGET")
target_col = "TARGET"

In [38]:
# Define the XGBClassifier and fit the model 
xgbmodel = XGBClassifier(random_state=123, input_cols=feature_cols, label_cols=target_col, output_cols="PREDICTION")
xgbmodel.fit(snowdf_train)

The version of package 'snowflake-snowpark-python' in the local environment is 1.23.0, which does not fit the criteria for the requirement 'snowflake-snowpark-python'. Your UDF might not work when the package version is different between the server and your local environment.


<snowflake.ml.modeling.xgboost.xgb_classifier.XGBClassifier at 0x3097bdf70>

In [39]:
# Score the data using the fitted xgbmodel
snowdf_test = session.table("CREDIT_DEFAULT_TEST")
scored_sdf = xgbmodel.predict(snowdf_test)

scored_sdf.show(5)

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CODE_GENDER_F"  |"COD

In [40]:
test_df = scored_sdf
test_df.where(F.col("TARGET")!= F.col("PREDICTION")).show(5)

# Percentage of wrong prediction
test_df.where(F.col("TARGET")!= F.col("PREDICTION")).count() / test_df.count()

----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"CODE_GENDER_F"  |"COD

0.014994547437295528

# Deploying model for future use 

Steps to follow --
1. Get model in your local environment 
2. Save the file in you local env. as .joblib file 
3. Upload the file to Snowflake stage 
4. Create UDF using model in stage 

We can use to_xgboost() in order to get the actual xgboost model object which gives us access to its all attributes.

In [41]:
# Change back to the default default warehouse size
session.sql("ALTER WAREHOUSE JING_TEST_WH SET WAREHOUSE_SIZE = 'XSMALL'").collect()

session.sql("SHOW WAREHOUSES LIKE 'JING_TEST_WH'").collect()

[Row(name='JING_TEST_WH', state='STARTED', type='STANDARD', size='X-Small', min_cluster_count=1, max_cluster_count=1, started_clusters=1, running=0, queued=0, is_default='Y', is_current='Y', auto_suspend=300, auto_resume='true', available=' 100', provisioning='0', quiescing='0', other='0', created_on=datetime.datetime(2024, 10, 16, 4, 21, 6, 62000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), resumed_on=datetime.datetime(2024, 10, 25, 5, 31, 23, 884000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), updated_on=datetime.datetime(2024, 10, 25, 5, 31, 57, 710000, tzinfo=<DstTzInfo 'America/Los_Angeles' PDT-1 day, 17:00:00 DST>), owner='JING_TEST_ROLE', comment='', enable_query_acceleration='false', query_acceleration_max_scale_factor=8, resource_monitor='null', actives=1, pendings=0, failed=0, suspended=0, uuid='2202731269', scaling_policy='STANDARD', budget=None, owner_role_type='ROLE', resource_constraint=None)]

In [42]:
import joblib 
import cachetools 

# Step 1: Get model in your local environment

In [53]:
# We can use to_xgboost() in order to get the actual xgboost model object which gives us access to all its attributes
xgb_file = xgbmodel.to_xgboost()
xgb_file

# Step 2: Save the file in your local env. as .joblib file 


In [54]:
MODEL_FILE = "model.joblib.gz"
joblib.dump(xgb_file, MODEL_FILE)   # we are just pickling it locally first 


['model.joblib.gz']

# Step 3: Upload the file to Snowflake stage 

In [57]:
session.sql("CREATE STAGE IF NOT EXISTS ML_SNOWPARK_CI_CD_DB.ML_PROCESSING.ML_MODELS").collect()

[Row(status='ML_MODELS already exists, statement succeeded.')]

In [58]:
session.file.put(MODEL_FILE, "@ML_PROCESSING.ML_MODELS", auto_compress=False, overwrite=True)

SnowparkSQLException: (1304): 01b7ed11-0004-0f24-0000-834b00178132: 090105 (22000): Cannot perform STAGE PUT. This session does not have a current database. Call 'USE DATABASE', or use a qualified name.

# Step 4: Create UDF using model in stage 

In [59]:
session.sql("USE SCHEMA ML_SNOWPARK_CI_CD_DB.ML_PROCESSING").collect()

[Row(status='Statement executed successfully.')]

In [60]:
# Define a simple scoring function 
from cachetools import cached 
import pandas as pd 

@cached(cache={})
def load_model(model_path: str) -> object:
    from joblib import load 
    model = load(model_path)
    return model 

def udf_score_xgboost_model_vec_cached(df: pd.DataFrame) -> pd.Series:
    import os 
    import sys

    # file-dependencies of UDFs are available in snowflake_import_directory
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    model_name = 'model.joblib.gz'
    model = load_model(import_dir + model_name)
    df.columns = feature_cols
    scored_data = pd.Series(model.predict(df))
    return scored_data




In [61]:
# Register UDF

udf_clv = session.udf.register(func=udf_score_xgboost_model_vec_cached, 
                               name="PREDICT_DEFAULT", 
                               stage_location="@ML_MODELS",
                               input_types=[T.FloatType()]*len(feature_cols),
                               return_type=T.FloatType(), 
                               replace=True,
                               is_permanent=True, 
                               imports=["@ML_MODELS/model.joblib.gz"], 
                               packages=['pandas',
                                         'xgboost',
                                         'joblib',
                                         'cachetools'],
                                session=session)

# Wrap-up

In [62]:
!pip freeze > requirements.txt

In [63]:
session.close()