### Train and score a Titanic survival model with Snowpark for Python stored procedures

In [None]:
# Imports 
import snowflake.snowpark
from snowflake.snowpark import Session
from snowflake.snowpark import functions as F
from snowflake.snowpark import types as T

import joblib
import io
import os

import json

# Make sure we do not get line breaks when doing show() on wide dataframes:
# override the notebook's <pre> wrapping so the text tables stay on one line.
# HTML and display are imported from the public IPython.display module instead of
# the internal IPython.core.display / the notebook-injected `display` builtin.
from IPython.display import HTML, display
display(HTML("<style>pre { white-space: pre !important; }</style>"))

from sklearn.ensemble import RandomForestClassifier
import pandas as pd


In [None]:
# Using the preprocessing library from https://github.com/Snowflake-Labs/snowpark-python-demos/tree/main/sp4py_utilities
import preprocessing as pp


Connect to Snowflake

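This example reads the connection parameters from a JSON file (`creds.json`) with the following structure. Keep this file out of version control, since it contains your password.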
```
{
    "account":"MY SNOWFLAKE ACCOUNT",
    "user": "MY USER",
    "password":"MY PASSWORD",
    "role":"MY ROLE",
    "warehouse":"MY WH",
    "database":"MY DB",
    "schema":"MY SCHEMA"
}

```

In [None]:
# Read the connection parameters from a local JSON file so credentials are not
# hardcoded in the notebook; keep creds.json out of version control.
with open('creds.json', encoding='utf-8') as creds_file:
    connection_parameters = json.load(creds_file)

session = Session.builder.configs(connection_parameters).create()
# The session getters may return None (e.g. when no warehouse is set), so
# interpolate them instead of concatenating with `+`, which would raise TypeError.
print(
    f"Current role: {session.get_current_role()}, "
    f"Current schema: {session.get_fully_qualified_current_schema()}, "
    f"Current WH: {session.get_current_warehouse()}"
)


Function that saves an object as a joblib file on a Snowflake stage

In [None]:
# Helper used within the training stored procedure to persist an object on a Snowflake stage
def save_file_to_stage(snf_session, obj, file_name, stage_path):
    """Serialize ``obj`` with joblib and upload it to a Snowflake stage.

    Args:
        snf_session: Snowpark session used for the upload.
        obj: Any joblib-serializable object (e.g. a fitted transformer).
        file_name: Name of the file to create, e.g. ``'my_rs_scaler.joblib'``.
        stage_path: Stage location; it is concatenated directly with
            ``file_name``, so it should end with ``/`` (e.g. ``'@udf_stage/titanic_train/'``).

    Returns:
        The path ``stage_path + file_name + '/' + file_name``.
    """
    target = stage_path + file_name

    buffer = io.BytesIO()
    buffer.name = file_name
    joblib.dump(obj, buffer)
    snf_session.file.put_stream(buffer, target, auto_compress=False, overwrite=True)

    # Workaround kept from the original: the returned path repeats the file name
    # under `target`. NOTE(review): presumably the Snowpark version in use treats
    # `target` as a prefix and names the staged file after `buffer.name`; confirm
    # against your Snowpark version before relying on this path.
    return target + '/' + file_name


Function that scales numeric columns using a Robust Scaler and encodes categorical columns using One-Hot encoding.<br>
It saves the fitted scaler and encoder to a Snowflake stage.<br>
Used by the training stored procedure.

In [None]:
def data_prep(snf_session, df, num_cols, cat_cols, file_location):
    """Scale numeric columns and one-hot encode categorical columns.

    Uses RobustScaler and OneHotEncoder from the sp4py ``preprocessing`` library.
    Both fitted transformers are saved to a Snowflake stage so the same
    transformations can be reapplied at scoring time.

    Args:
        snf_session: Snowpark session used to upload the fitted transformers.
        df: Snowpark DataFrame containing the input columns.
        num_cols: Numeric column names. Each one is scaled into ``<col>_SCALED``,
            and the original columns are dropped.
        cat_cols: Categorical column names to one-hot encode.
        file_location: Stage path the fitted transformers are written to.

    Returns:
        Tuple ``(scaler_path, encoder_path, transformed_df)``.
    """
    # --- Numeric columns: robust scaling ---
    scaled_names = [name + "_SCALED" for name in num_cols]
    scaler = pp.RobustScaler(input_cols=num_cols, output_cols=scaled_names)
    # The transform keeps the unscaled input columns, so drop them afterwards
    scaled_df = scaler.fit_transform(df).drop(num_cols)
    scaler_path = save_file_to_stage(snf_session, scaler, 'my_rs_scaler.joblib', file_location)

    # --- Categorical columns: one-hot encoding ---
    # Encode the already-scaled frame. A Snowpark DataFrame cannot simply be
    # extended column-wise with another one (only stacked with UNION), so the
    # two transformations are chained.
    encoder = pp.OneHotEncoder(input_cols=cat_cols)
    encoded_df = encoder.fit_transform(scaled_df)
    encoder_path = save_file_to_stage(snf_session, encoder, 'my_oe_encoder.joblib', file_location)

    return scaler_path, encoder_path, encoded_df

Function that deploys a trained model as a vectorized UDF in Snowflake; used by the training stored procedure.

In [None]:
def create_udf(snf_session, udf_name, model, input_cols, stage_loc):
    """Deploy a fitted model as a permanent vectorized (pandas) UDF in Snowflake.

    The model object is captured in the UDF closure, so it ships as part of the
    UDF definition rather than as a separate file on a stage.

    Args:
        snf_session: Snowpark session to register the UDF with.
        udf_name: Name of the UDF in Snowflake (replaced if it already exists).
        model: Fitted estimator exposing ``predict``.
        input_cols: Feature names, in the order the model was trained with.
        stage_loc: Stage where the permanent UDF's code is uploaded.

    Returns:
        The registered UDF object returned by ``F.udf``. The original returned
        None; callers that ignore the return value are unaffected.
    """
    # Deploy a Batch API (vectorized) UDF: it receives a pandas Series per batch
    @F.udf(name=udf_name, is_permanent=True, stage_location=stage_loc,
           packages=['pandas', 'scikit-learn'], replace=True, session=snf_session)
    def predict_udf(ds: T.PandasSeries[dict]) -> T.PandasSeries[float]:
        # Each element is expected to be a dict of feature name -> value
        # (the scoring code builds it with OBJECT_CONSTRUCT).
        # pd.json_normalize replaces pd.io.json.json_normalize, which was
        # deprecated in pandas 1.0 and removed in 2.0. The unpinned 'pandas'
        # package may resolve to 2.x.
        features = pd.json_normalize(ds)[input_cols]
        return model.predict(features)

    return predict_udf


Function that imputes missing values, transforms the input data using the scaler and encoder, saves the fitted transformers to a Snowflake
stage, and trains a model that is deployed as a UDF.

In [None]:
def train(snf_session: snowflake.snowpark.Session, params: dict) -> dict:
    """Impute, transform, train and deploy a RandomForest model.

    Runs as a stored procedure. Steps:
      1. Impute missing values: mean for numeric columns, mode for categorical ones.
      2. Scale/encode via ``data_prep``; the fitted transformers are saved to a stage.
      3. Save the prepared data to ``params['train_ouput_table']``.
      4. Fit a ``RandomForestClassifier`` on the prepared data, pulled into pandas.
      5. Deploy the model as the UDF ``params['model_udf_name']``.

    Args:
        snf_session: Snowpark session.
        params: Training configuration.
            Required keys: ``train_input_table``, ``cat_cols``, ``num_cols``,
            ``target_col``, ``train_ouput_table``, ``transfomers_location``,
            ``model_udf_name`` and ``model_udf_stage``. The misspelled keys are
            part of the existing interface.
            Optional keys: ``n_estimators`` (default 10) and ``random_state``
            (default None, i.e. non-reproducible forests, as before).

    Returns:
        Dict with ``num_scaler_path``, ``cat_encoder_path`` and ``imputer``
        (column name -> fill value).
    """
    df_cat_cols = params['cat_cols']
    df_num_cols = params['num_cols']
    df_target_col = params['target_col']

    df = snf_session.table(params['train_input_table']).select(*df_cat_cols, *df_num_cols, df_target_col)

    # Imputation: compute every fill value in ONE aggregate query instead of one
    # round trip per column. Numeric columns use the mean; categorical columns
    # use the most frequent value (mode).
    impute_cols = [*df_num_cols, *df_cat_cols]
    impute_aggs = [F.mean(c) for c in df_num_cols] + [F.mode(c) for c in df_cat_cols]
    imputation = {}
    if impute_aggs:
        fill_row = df.select(*impute_aggs).collect()[0]
        imputation = {col: fill_row[i] for i, col in enumerate(impute_cols)}

    df = df.fillna(imputation)

    # Prepare the data and store the fitted scaler and encoder to stage
    num_scaler_path, cat_encoder_path, df_prepared = data_prep(
        snf_session, df, df_num_cols, df_cat_cols, params['transfomers_location'])

    # Save the prepared data into train_ouput_table
    df_prepared.write.mode("overwrite").save_as_table(params['train_ouput_table'])

    # Features are all prepared columns except the target. Copy before mutating
    # so the DataFrame's column list is never modified in place.
    X_cols = list(df_prepared.columns)
    X_cols.remove(df_target_col)

    # Train the classifier; pull the prepared data back into a pandas DataFrame
    pd_df = df_prepared.to_pandas()
    X = pd_df[X_cols]
    Y = pd_df[df_target_col]
    clf = RandomForestClassifier(
        n_estimators=params.get('n_estimators', 10),
        random_state=params.get('random_state'),
    )
    model = clf.fit(X, Y)

    # Deploy the model to Snowflake. The model is stored as part of the UDF
    # definition, i.e. not stored on a stage.
    create_udf(snf_session, params['model_udf_name'], model, X_cols, params['model_udf_stage'])

    # Return the paths to the saved scaler and encoder and the imputation values
    return {"num_scaler_path": num_scaler_path, "cat_encoder_path": cat_encoder_path, "imputer": imputation}


Register the `train` function as a permanent stored procedure using the `sproc()` function.

In [None]:
# Reset any session-level imports/packages so only the dependencies listed
# below are attached to the stored procedure.
session.clear_imports()
session.clear_packages()

# The local sp4py `preprocessing` module is uploaded with the procedure.
train_sp_packages = ['snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools']
train_model_sp = F.sproc(
    train,
    name="prep_train_titanic",
    is_permanent=True,
    session=session,
    stage_location='udf_stage',
    replace=True,
    packages=train_sp_packages,
    imports=['preprocessing'],
)

Set the parameters needed for training

In [None]:
# Training configuration passed to the `train` stored procedure.
# NOTE: "train_ouput_table" and "transfomers_location" are misspelled, but `train`
# reads them under exactly these names, so they must stay as they are.
train_param = dict(
    train_input_table="titanic",
    cat_cols=["EMBARKED", "SEX", "PCLASS"],
    num_cols=["AGE", "FARE"],
    target_col="SURVIVED",
    train_ouput_table="titanic_preped_train",
    transfomers_location="@udf_stage/titanic_train/",
    model_udf_name="predict_survival",
    model_udf_stage="udf_stage",
)

Run the training in Snowflake using the Stored Procedure

In [None]:
# Execute training inside Snowflake. Despite its name, ret_dict holds the
# procedure's result as JSON text, which is decoded into a Python dict here.
ret_dict = train_model_sp(train_param)
transformers = json.loads(ret_dict)
# Show the stage paths of the fitted scaler/encoder and the imputation values
transformers

We can either run the scoring from our local environment (with Snowpark for Python all of the processing is pushed down to Snowflake) or deploy the scoring function as a Python stored procedure. Here we deploy it as a stored procedure.

In [None]:
# Helper used by the scoring stored procedure: download a joblib file from a
# Snowflake stage and deserialize it.
def load_file_from_stage(snf_session: snowflake.snowpark.Session, object_path):
    """Load a joblib-serialized object from a Snowflake stage.

    Args:
        snf_session: Snowpark session used to download the file.
        object_path: Full stage path of the file, e.g. one of the paths returned
            by the training procedure.

    Returns:
        The deserialized object.

    Security: joblib.load unpickles the file, which can execute arbitrary code.
    Only load files from stages you control.
    """
    # Close the downloaded stream once the object has been deserialized
    with snf_session.file.get_stream(object_path) as stream:
        return joblib.load(stream)

def score(snf_session: snowflake.snowpark.Session, params: dict) -> str:
    """Impute, transform and score a table with the deployed model UDF.

    Runs as a stored procedure. It does the following:
      1. Reapplies the training imputation values, fitted scaler and fitted encoder.
      2. Calls the model UDF on the transformed features.
      3. Writes the result, including a ``PREDICTED`` column, to
         ``params['score_output_table']``.

    Args:
        snf_session: Snowpark session.
        params: Keys ``score_input_table``, ``score_cols``, ``score_output_table``,
            ``model_udf_name`` and ``transformers``. ``transformers`` is the dict
            returned by training: ``imputer``, ``num_scaler_path`` and
            ``cat_encoder_path``.

    Returns:
        A message naming the output table.
    """
    transformers = params["transformers"]

    # Select the columns to score and apply the training-time imputation values
    df_input = snf_session.table(params['score_input_table'])[params["score_cols"]]
    df_imputed = df_input.fillna(transformers['imputer'])

    # Load the fitted scaler and scale the numeric columns
    loaded_rs = load_file_from_stage(snf_session, transformers['num_scaler_path'])
    df_scaled = loaded_rs.transform(df_imputed)

    # Load the fitted encoder and encode the categorical columns
    loaded_oe = load_file_from_stage(snf_session, transformers['cat_encoder_path'])
    df_transformed = loaded_oe.transform(df_scaled)

    # Build alternating key/value arguments for OBJECT_CONSTRUCT, one pair per
    # scaled or one-hot column. NOTE(review): keys are upper-cased, presumably to
    # match the training column names the model UDF selects; confirm.
    key_vals = []
    for col in loaded_rs.output_cols:
        key_vals.extend([F.lit(col.upper()), F.col(col)])

    for in_col in loaded_oe.output_cols:
        for col in loaded_oe.output_cols[in_col]:
            key_vals.extend([F.lit(col.upper()), F.col(col)])

    # Score with the deployed UDF. OBJECT_CONSTRUCT packs each row's features
    # into one object, which the UDF receives as a dict element of a pandas Series.
    df_scored = df_transformed.with_column(
        "PREDICTED",
        F.call_function(params["model_udf_name"], F.object_construct(*key_vals)),
    )

    # Persist the scored rows and report where they were written
    df_scored.write.mode("overwrite").save_as_table(params['score_output_table'])
    return f"Saved scored result in: {params['score_output_table']}"



In [None]:
# Reset session-level imports/packages before registering the scoring procedure
session.clear_imports()
session.clear_packages()

# The local sp4py `preprocessing` module is needed to unpickle the fitted transformers.
score_sp_packages = ['snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib']
score_titanic_sp = F.sproc(
    score,
    name="score_titanic",
    is_permanent=True,
    session=session,
    stage_location='udf_stage',
    replace=True,
    packages=score_sp_packages,
    imports=['preprocessing'],
)

In [None]:
# Scoring configuration passed to the `score` stored procedure.
# `transformers` is the decoded result of the training procedure.
score_param = dict(
    score_input_table="titanic",
    score_cols=["EMBARKED", "SEX", "PCLASS", "AGE", "FARE", "SURVIVED"],
    score_output_table="titanic_scored",
    transformers=transformers,
    model_udf_name="predict_survival",
)

In [None]:
# Run scoring inside Snowflake; the procedure returns a message naming the output table
score_titanic_sp(score_param, session=session)

In [None]:
# Preview the scored rows. The table name is read from score_param so it stays
# in sync with the configured output table.
session.table(score_param["score_output_table"]).show()

In [None]:
# Close the Snowpark session now that training and scoring are done
session.close()