# END TO END ML USING SNOWPARK AND SCIKIT-LEARN

In this notebook we fit/train a Scikit-Learn ML pipeline that includes common feature engineering tasks such as Imputations, Scaling and One-Hot Encoding. The pipeline also includes a `RandomForestRegressor` model that will predict member lifetime value for ecommerce customers. 

We will fit/train the pipeline using a Snowpark Python Stored Procedure (SPROC) and then save the pipeline to a Snowflake stage. This example concludes by showing how a saved model/pipeline can be loaded and run in a scalable fashion on a Snowflake warehouse using Snowpark Python User-Defined Functions (UDFs). 

![Snowpark ML](images/snowpark_ml.png)

### Create a session with Snowpark connector

In [1]:
# Snowpark
import snowflake.snowpark
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import sproc, udf, udtf, pandas_udf
from snowflake.snowpark.session import Session
from snowflake.snowpark import version as v
from snowflake.snowpark import types as T
from snowflake.snowpark.window import Window
import json

from sklearn.impute import SimpleImputer
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

import preprocessing #https://github.com/Snowflake-Labs/snowpark-python-demos/tree/main/sp4py_utilities

import pandas as pd
from cachetools import cached
import sys 
import numpy as np
import datetime
import io
import os
import joblib

# Load the Snowflake connection parameters from a local JSON file.
with open('creds.json') as f:
    connection_parameters = json.load(f)

# Open the Snowpark session that the rest of the notebook uses.
session = Session.builder.configs(connection_parameters).create()
# Register the third-party packages on the session. No versions are pinned here;
# the warning printed under this cell is about the local cachetools version.
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')

The version of package cachetools in the local environment is 5.2.0, which does not fit the criteria for the requirement cachetools. Your UDF might not work when the package version is different between the server and your local environment


In [2]:
# Show which role, database, schema and warehouse the session is currently using.
context_lines = [
    f"    ROLE: {session.get_current_role()}",
    f"    DATABASE: {session.get_current_database()}",
    f"    SCHEMA: {session.get_current_schema()}",
    f"    WAREHOUSE: {session.get_current_warehouse()}",
]
print("\n" + "\n".join(context_lines) + "\n")


    ROLE: "SNOWPARK_DEMO_ROLE"
    DATABASE: "SNOWPARK_DEMO_DB"
    SCHEMA: "MEMBERSHIP_MODELING_DEMO"
    WAREHOUSE: "SNOWPARK_DEMO_WH"



### Create stages to save the ML model/pipeline and permanent UDFs

In [3]:
# (Re)create the MODELS stage that will hold the serialized pipeline.
query = (
    "create or replace stage models"
    " directory = (enable = true)"
    " copy_options = (on_error='skip_file')"
)

session.sql(query).collect()

[Row(status='Stage area MODELS successfully created.')]

In [4]:
# (Re)create the UDF stage, then upload the preprocessing helper archive to it
# without compressing it again.
query = (
    "create or replace stage udf"
    " copy_options = (on_error='skip_file')"
)

session.sql(query).collect()
session.file.put("preprocessing.zip", '@udf', auto_compress=False, overwrite=True)

[PutResult(source='preprocessing.zip', target='preprocessing.zip', source_size=29226, target_size=29232, source_compression='NONE', target_compression='NONE', status='UPLOADED', message='')]

## Explore Snowflake data using Snowpark

In [5]:
# Lazy Snowpark reference to the customer table explored in this notebook.
snowdf = session.table("ECOMMERCE_CUSTOMERS_100K")
# Categorical and numeric input columns; train_model routes them to the
# one-hot and impute/scale branches of the ColumnTransformer respectively.
cat_attribs = ['GENDER','MEMBERSHIP_STATUS']
num_attribs = ['MEMBERSHIP_LENGTH', 'AVG_SESSION_LENGTH', 'TIME_ON_APP', 'TIME_ON_WEBSITE']
# Full feature list, categorical first. The inference cells pass columns to the
# UDFs in this order.
model_features = cat_attribs + num_attribs

## Train ML Model
Create a stored procedure to push ML training to Snowpark

In [6]:
# 80/20 split of the exploration table.
snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82) # use seed to make the split repeatable

# Persist both splits, replacing any existing tables of the same name.
# NOTE(review): train_model performs its own split of ECOMMERCE_CUSTOMERS_10K and
# overwrites these same two tables when the stored procedure runs -- confirm
# which source table is intended.
snowdf_train.write.mode("overwrite").save_as_table("MEMBERSHIP_TRAIN")
snowdf_test.write.mode("overwrite").save_as_table("MEMBERSHIP_TEST")

We use a scikit-learn pipeline for the feature transformations. The entire fitted pipeline (preprocessing steps plus model) is serialized to `@MODELS/member_ltv_pipeline.joblib`, so future datasets can be scored without re-implementing the transformations. 

In [7]:
def save_file(session, model, path):
    """Serialize ``model`` with joblib and upload it to a Snowflake stage.

    ``path`` is ``<stage folder>/<file name>``: everything before the last
    ``/`` is the upload target and the last component is the file name. The
    model is first dumped to ``/tmp/<file name>`` and that local file is then
    uploaded, overwriting any staged file of the same name.

    Returns a confirmation string that echoes ``path``.

    NOTE(review): ``put`` is called without ``auto_compress``; the notebook later
    downloads ``member_ltv_pipeline.joblib.gz``, so the staged file is
    presumably gzip-compressed on upload -- confirm.
    """
    # Split the stage location at its last slash.
    *folder_parts, stage_filename = path.split('/')
    stage_folder = '/'.join(folder_parts)

    # Write the model to local scratch space first; put() uploads local files.
    local_copy = os.path.join('/tmp', stage_filename)
    joblib.dump(model, local_copy)

    # Persist the dumped file on the stage.
    session.file.put(local_copy, stage_folder, overwrite=True)
    return "successfully created file: " + path

def train_model(session: snowflake.snowpark.Session) -> float:
    """Fit the member-lifetime-value pipeline and return its test-set RMSE.

    Steps:
      1. Read ``ECOMMERCE_CUSTOMERS_10K`` and drop the ``EMAIL`` column.
      2. Split 80/20 with a fixed seed and save the splits as
         ``MEMBERSHIP_TRAIN`` / ``MEMBERSHIP_TEST`` (existing tables are
         overwritten).
      3. Fit a pipeline of impute + scale (numeric), one-hot (categorical)
         and a ``RandomForestRegressor`` on ``YEARLY_SPENT``.
      4. Save the fitted pipeline to ``@MODELS/member_ltv_pipeline.joblib``
         through ``save_file``.

    Uses the module-level ``cat_attribs`` and ``num_attribs`` lists and the
    ``save_file`` helper.

    Args:
        session: Snowpark session the stored procedure runs with.

    Returns:
        Root mean squared error of the predictions on the test split.
    """
    # NOTE(review): the exploration cell reads ECOMMERCE_CUSTOMERS_100K while this
    # procedure trains on ECOMMERCE_CUSTOMERS_10K -- confirm which is intended.
    snowdf = session.table("ECOMMERCE_CUSTOMERS_10K").drop("EMAIL")

    # split the train and test set; the seed makes the split repeatable
    snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82)

    # save the train and test sets as tables in Snowflake (fixed names, overwritten)
    snowdf_train.write.mode("overwrite").save_as_table("MEMBERSHIP_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("MEMBERSHIP_TEST")

    # Fetch each split once and separate features from labels in pandas.
    # Running two independent queries per split would rely on both returning
    # rows in the same order, which SQL does not guarantee.
    train_pd = snowdf_train.to_pandas()
    test_pd = snowdf_test.to_pandas()
    X_train = train_pd.drop(columns=["YEARLY_SPENT"])
    Y_train = train_pd["YEARLY_SPENT"]  # 1-D target, as the regressor expects
    X_test = test_pd.drop(columns=["YEARLY_SPENT"])
    Y_test = test_pd["YEARLY_SPENT"]

    # numeric features: fill missing values with the median, then standardize
    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler())
        ])

    # categorical features: one-hot encode. Categories not seen during fitting
    # are encoded as all zeros so a single new value does not abort scoring.
    cat_pipeline = Pipeline([
        ("cat", OneHotEncoder(handle_unknown="ignore"))
    ])

    preprocessor = ColumnTransformer([
            ("num", num_pipeline, num_attribs),
            ("cat", cat_pipeline, cat_attribs)
        ])

    full_pipeline = Pipeline([
            ('preprocessor', preprocessor),
            ('model', RandomForestRegressor(n_estimators=100, random_state=42)),
        ])

    # fit the preprocessing pipeline and the model together
    full_pipeline.fit(X_train, Y_train)

    # save the full pipeline including the model
    save_file(session, full_pipeline, "@MODELS/member_ltv_pipeline.joblib")

    # predict on the test set and return the root mean squared error (RMSE)
    Y_pred = full_pipeline.predict(X_test)
    mse = mean_squared_error(Y_test, Y_pred)
    return float(np.sqrt(mse))

# Register train_model as a stored procedure with sproc(); replace=True
# overwrites a previously registered version. No name or stage is passed here.
train_model_sp = sproc(train_model, replace=True)

### Use high-memory warehouse for fitting our model.

In [8]:
# use a snowpark-optimized high memory warehouse for model fitting
session.sql("USE WAREHOUSE snowpark_demo_wh_high_mem").collect()

try:
    # run model training stored procedure
    rmse = train_model_sp()
finally:
    # always switch back to the standard warehouse, even when training fails,
    # so later cells do not keep running on the high-memory warehouse
    session.sql("USE WAREHOUSE snowpark_demo_wh").collect()

print(f"RMSE: {rmse}")

RMSE: 45.1417685930689


### Retrieve our model for additional analysis.

In [9]:
def retrieve_model(stage_location):
    """Download a gzip-compressed joblib file from a stage and load it.

    The staged file is fetched into the current working directory using the
    notebook-level ``session``; the downloaded copy is left on disk. The local
    file name is the last ``/``-separated component of ``stage_location``.

    Returns the object stored in the file.
    """
    import gzip

    session.file.get(stage_location, '.')

    local_name = stage_location.rsplit('/', 1)[-1]
    # joblib.load unpickles the file: only use this on stages you trust.
    with gzip.open(local_name, 'rb') as compressed:
        return joblib.load(compressed)

# Fetch the staged pipeline (note the .gz suffix on the staged file name) and
# load it into the notebook.
pipeline = retrieve_model("@MODELS/member_ltv_pipeline.joblib.gz")
# Bare expression so the notebook displays the loaded pipeline.
pipeline

In [10]:
# Look up the fitted preprocessing step and its one-hot encoder by name.
preprocessor = pipeline.named_steps['preprocessor']
onehot = preprocessor.named_transformers_['cat'].named_steps['cat']

# Column names after preprocessing: numeric columns keep their names, each
# categorical column expands to one name per category.
cat_attribs_transformed = list(onehot.get_feature_names_out(cat_attribs))
transformed_feature_names = num_attribs + cat_attribs_transformed

# Run only the preprocessing step on a sample of the test table and display it.
sample_data = session.table("MEMBERSHIP_TEST").limit(100).to_pandas()
pd.DataFrame(preprocessor.transform(sample_data), columns=transformed_feature_names)

Unnamed: 0,MEMBERSHIP_LENGTH,AVG_SESSION_LENGTH,TIME_ON_APP,TIME_ON_WEBSITE,GENDER_FEMALE,GENDER_MALE,GENDER_UNKNOWN,MEMBERSHIP_STATUS_BASIC,MEMBERSHIP_STATUS_BRONZE,MEMBERSHIP_STATUS_DIAMOND,MEMBERSHIP_STATUS_GOLD,MEMBERSHIP_STATUS_PLATIN,MEMBERSHIP_STATUS_SILVER
0,0.869533,1.855107,1.721910,1.532446,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0
1,-0.214311,0.058451,0.408017,0.968254,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
2,2.218088,0.260888,1.014187,0.774211,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
3,2.259294,-0.111653,0.808320,1.146925,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
4,-1.303399,-0.598281,-0.592687,-1.193166,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,-0.684717,0.293577,-0.961267,0.448771,0.0,1.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0
96,-1.535028,-2.275921,-1.477059,-1.215832,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0
97,1.503171,1.703704,0.999006,0.721511,0.0,1.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0
98,1.678917,0.119095,0.966047,1.182815,0.0,1.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0


In [11]:
# Rank the transformed features by the random forest's importance scores.
rfr = pipeline.named_steps['model']
pd.DataFrame(
    {"FEAT_IMPORTANCE": rfr.feature_importances_},
    index=transformed_feature_names,
).sort_values(by="FEAT_IMPORTANCE", ascending=False)

Unnamed: 0,FEAT_IMPORTANCE
TIME_ON_APP,0.725238
MEMBERSHIP_STATUS_GOLD,0.055366
MEMBERSHIP_STATUS_DIAMOND,0.038848
MEMBERSHIP_STATUS_BASIC,0.035936
AVG_SESSION_LENGTH,0.030397
TIME_ON_WEBSITE,0.029189
MEMBERSHIP_STATUS_BRONZE,0.026521
MEMBERSHIP_LENGTH,0.024725
MEMBERSHIP_STATUS_PLATIN,0.021806
MEMBERSHIP_STATUS_SILVER,0.008277


## Deploy Model as UDF

#### Option 1: Deploy as UDF

In [13]:
# Make the staged pipeline file available to UDFs registered from this session.
session.add_import("@MODELS/member_ltv_pipeline.joblib")

@cached(cache={})
def read_file(filename):
    """Load a joblib file from the Snowflake import directory.

    The directory is read from ``sys._xoptions["snowflake_import_directory"]``.
    When that option is not set, ``None`` is returned. Results are memoized
    per ``filename`` by the ``cached`` decorator, so repeated calls do not
    reload the file.
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        return None
    with open(os.path.join(import_dir, filename), 'rb') as model_file:
        return joblib.load(model_file)

def predict(GENDER:str, MEMBERSHIP_STATUS:str, MEMBERSHIP_LENGTH:float, 
            AVG_SESSION_LENGTH:float, TIME_ON_APP:float, TIME_ON_WEBSITE:float ) -> float:
    """Score a single customer with the saved pipeline.

    The arguments are the six model features. They are assembled into a
    one-row DataFrame whose column names match the feature names, and the
    pipeline loaded by ``read_file`` predicts on that row.

    Returns the prediction for that row as a Python float.
    """
    m = read_file("member_ltv_pipeline.joblib")
    # Build the row explicitly rather than from locals(), so that only the six
    # features are included and later local variables cannot leak into it.
    features = {
        'GENDER': GENDER,
        'MEMBERSHIP_STATUS': MEMBERSHIP_STATUS,
        'MEMBERSHIP_LENGTH': MEMBERSHIP_LENGTH,
        'AVG_SESSION_LENGTH': AVG_SESSION_LENGTH,
        'TIME_ON_APP': TIME_ON_APP,
        'TIME_ON_WEBSITE': TIME_ON_WEBSITE,
    }
    row = pd.DataFrame([features], columns=list(features))
    # predict() returns an array; convert its single element to a builtin float.
    return float(m.predict(row)[0])

# Register the scalar UDF permanently under the name predict_pipeline on the @udf stage.
pred_udf = udf(predict, name="predict_pipeline", is_permanent=True, stage_location="@udf", replace=True)

#### Option 2: Deploy as Vectorized UDF
We're using a vectorized UDF, which automatically splits the rows into batches and sends one batch to each UDF execution, resulting in better throughput. Additionally, we cache the model loaded from the stage to reduce I/O cost. 

In [14]:
# Make the staged pipeline file available to UDFs registered from this session.
session.add_import("@MODELS/member_ltv_pipeline.joblib")

@cached(cache={})
def read_file(filename):
    """Load a joblib file from the Snowflake import directory.

    The directory is read from ``sys._xoptions["snowflake_import_directory"]``.
    When that option is not set, ``None`` is returned. Results are memoized
    per ``filename`` by the ``cached`` decorator, so each file is read from
    disk only once.
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        return None
    with open(os.path.join(import_dir, filename), 'rb') as model_file:
        return joblib.load(model_file)

@pandas_udf(name="predict_pipeline_batch", stage_location="@udf", max_batch_size=1000, is_permanent=True, replace=True)
def predict_pipeline_batch(df: T.PandasDataFrame[str, str, float, float, float, float]) -> T.PandasSeries[float]:
    """Score a batch of customers with the saved pipeline.

    ``df`` holds one column per model feature, in the order of the
    notebook-level ``model_features`` list. Its columns are relabelled in
    place with those names before the pipeline loaded by ``read_file``
    predicts on the whole batch.

    Returns one prediction per input row.
    """
    fitted_pipeline = read_file("member_ltv_pipeline.joblib")

    # The pipeline's ColumnTransformer selects columns by name.
    df.columns = model_features
    predictions = fitted_pipeline.predict(df)
    return predictions

### Run Inference UDF on 1 million records

Use our `feature_engineering` SPROC to prepare a dataset of 1 million fresh records for model scoring.

In [17]:
# Scale the warehouse up to 4X-LARGE and wait for the resize to finish before scoring.
# NOTE(review): no later cell shown here scales the warehouse back down -- confirm
# it is resized after scoring to avoid unnecessary cost.
session.sql("ALTER WAREHOUSE SNOWPARK_DEMO_WH SET WAREHOUSE_SIZE='4X-LARGE' WAIT_FOR_COMPLETION=TRUE").collect()

[Row(status='Statement executed successfully.')]

Now running the vectorized UDF

In [18]:
inference_df = session.table("ECOMMERCE_CUSTOMERS_1M")

# Pass the feature columns to the vectorized UDF in model_features order and
# append its result to every input column.
feature_columns = [F.col(name) for name in model_features]
predicted_spend = F.call_udf("predict_pipeline_batch", *feature_columns).alias("PREDICTED_YEARLY_SPENT")
scored_df = inference_df.select("*", predicted_spend)

# Persist the scored rows, replacing any earlier result table.
scored_df.write.mode("OVERWRITE").save_as_table("INFERED_DATA_1M")

# Preview the first 100 scored rows.
session.table("INFERED_DATA_1M").limit(100).to_pandas()

Unnamed: 0,EMAIL,MEMBERSHIP_LENGTH,AVG_SESSION_LENGTH,TIME_ON_APP,TIME_ON_WEBSITE,YEARLY_SPENT,GENDER_FEMALE,GENDER_MALE,GENDER_UNKNOWN,MEMBERSHIP_STATUS_BASIC,MEMBERSHIP_STATUS_BRONZE,MEMBERSHIP_STATUS_DIAMOND,MEMBERSHIP_STATUS_GOLD,MEMBERSHIP_STATUS_PLATIN,MEMBERSHIP_STATUS_SILVER,PREDICTED_YEARLY_SPENT
0,tnzwv2docj@d4o9d.com,2.625180,9.157463,100.939647,54.811385,247.462410,0,1,0,0,1,0,0,0,0,248.667400
1,eqbmoy83dq@by5pv.com,1.244177,14.279386,180.397931,67.862776,398.051616,0,1,0,0,0,0,1,0,0,387.959093
2,crotwmeceb@vxbst.com,5.945734,29.565405,280.229443,40.412405,560.079367,1,0,0,0,0,0,1,0,0,544.160566
3,skceyi1su5@c30eu.com,5.354699,24.773932,215.424641,47.125920,488.905015,1,0,0,0,0,0,1,0,0,470.172277
4,aga7clfokc@i0zro.com,8.033190,25.056523,285.379809,103.931373,767.477356,0,1,0,0,0,1,0,0,0,699.137450
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
95,p43zpd9rmx@virgk.com,7.131819,22.476144,226.860576,115.729622,674.034999,0,1,0,0,0,1,0,0,0,621.925098
96,1n8yonxatw@3zhdn.com,3.980869,24.230480,204.904425,62.621617,401.159443,1,0,0,0,0,0,1,0,0,434.822059
97,ujkncmmug9@ruzsk.com,0.414515,9.636493,62.521577,41.745499,223.108218,1,0,0,0,1,0,0,0,0,206.966865
98,eesyev6mih@kkmfj.com,6.315429,17.757215,160.430751,95.944590,409.221733,1,0,0,0,0,0,1,0,0,450.032300


You could also run this in SQL

In [21]:
# Score the 1M-row table directly in SQL with the vectorized UDF.
# "create or replace" keeps the cell re-runnable in the same session; a plain
# "create temp table" fails once pred_1m already exists.
session.sql(""" create or replace temp table pred_1m as 
                select predict_pipeline_batch(
                                A.GENDER, 
                                A.MEMBERSHIP_STATUS, 
                                A.MEMBERSHIP_LENGTH, 
                                A.AVG_SESSION_LENGTH, 
                                A.TIME_ON_APP, 
                                A.TIME_ON_WEBSITE) as prediction, 
                       a.* 
                from ECOMMERCE_CUSTOMERS_1M a""").collect()

[Row(status='Table PRED_1M successfully created.')]

In [23]:

# Preview the first 100 rows of the SQL-scored temp table as a pandas DataFrame.
session.sql("""select * from pred_1m limit 100""").to_pandas()

Unnamed: 0,PREDICTION,EMAIL,GENDER,MEMBERSHIP_STATUS,MEMBERSHIP_LENGTH,AVG_SESSION_LENGTH,TIME_ON_APP,TIME_ON_WEBSITE,YEARLY_SPENT
0,302.208597,nnwjycmabh@s8w43.com,MALE,SILVER,3.957579,14.002707,106.187090,44.138489,306.809984
1,652.794011,h2w0wkri41@th1sl.com,FEMALE,PLATIN,8.498461,26.525002,281.778347,119.989756,627.431100
2,610.218320,u0wb0jssly@uie5k.com,FEMALE,PLATIN,6.382756,37.274461,231.529442,128.730983,637.466773
3,604.923510,pltqapdrf7@rg3ea.com,MALE,PLATIN,10.289480,34.688028,219.202593,138.083633,588.856916
4,273.060000,c27abosumn@wkzsg.com,MALE,SILVER,2.075146,8.597488,101.162862,34.048345,284.427829
...,...,...,...,...,...,...,...,...,...
95,513.907746,naid2zoqlc@qmvds.com,FEMALE,GOLD,9.747075,33.143796,173.916489,50.917335,514.232564
96,456.909713,xqh7e29aou@g3eol.com,FEMALE,GOLD,5.272529,25.959274,219.551377,66.501171,446.045215
97,324.800778,wm3umlbst5@vgbqu.com,MALE,SILVER,1.417340,23.472896,66.889136,55.443651,308.442859
98,320.836412,9lhnzehylr@4u2hv.com,FEMALE,SILVER,3.493995,15.604096,116.470956,70.715810,345.249595
