In [1]:
import datetime
import getpass
import io
import os

import joblib
import numpy as np
import pandas as pd
import snowflake.snowpark
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.impute import SimpleImputer
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
from sklearn.preprocessing import StandardScaler
from snowflake.snowpark.functions import sproc
from snowflake.snowpark.session import Session

# Connection settings are read from environment variables so that no secret is
# stored in the notebook. Non-secret settings fall back to the values this
# notebook was written against; the password has no default and is prompted
# for interactively when SNOWFLAKE_PASSWORD is not set.
# NOTE(review): "Accountadmin" is a very broad role for a training job —
# consider a least-privilege role.
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", "ls88978.eu-west-1"),
    "user": os.environ.get("SNOWFLAKE_USER", "snowpark_user"),
    "password": os.environ.get("SNOWFLAKE_PASSWORD") or getpass.getpass("Snowflake password: "),
    "role": os.environ.get("SNOWFLAKE_ROLE", "Accountadmin"),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "HOUSING_MODEL_WH"),
    "database": os.environ.get("SNOWFLAKE_DATABASE", "SNOWPARK_UDEMY"),
    "schema": os.environ.get("SNOWFLAKE_SCHEMA", "TEST_SCHEMA"),
}

session = Session.builder.configs(connection_parameters).create()

# Packages registered on the session for the stored procedure and UDFs defined below.
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')



<h2>Create Internal Stages in Snowflake</h2>

In [2]:
# Stage used as the stage_location of the training stored procedure
# (directory table enabled).
query = (
    "create or replace stage house_model_training_sproc_stg"
    " directory = (enable = true)"
    " copy_options = (on_error='skip_file')"
)
session.sql(query).collect()

# Stage that receives the serialized model written by the training procedure.
query = (
    "create or replace stage house_model_output_stg"
    " copy_options = (on_error='skip_file')"
)
session.sql(query).collect()

[Row(status='Stage area HOUSE_MODEL_OUTPUT_STG successfully created.')]

<h2>Function Definition for pre-processing & Model Training/Fitting </h2>

In [3]:
def save_file(session, model, path):
    """Serialize ``model`` with joblib and upload it to the stage path ``path``.

    Args:
        session: Snowpark session whose underlying cursor performs the upload.
        model: Object to serialize (here, the fitted scikit-learn pipeline).
        path: Destination, e.g. ``"@stage_name/file.joblib"``.

    Returns:
        A confirmation message that includes ``path``.
    """
    # Serialize into memory; nothing is written to the local file system.
    buffer = io.BytesIO()
    joblib.dump(model, buffer)

    # NOTE(review): this reaches into private attributes of the session
    # (`_conn._cursor`); presumably only valid where that cursor exposes
    # `upload_stream` (e.g. inside a stored procedure) — confirm before reuse.
    cursor = session._conn._cursor
    cursor.upload_stream(buffer, path)

    return "successfully created file: " + path

def train_model(session: snowflake.snowpark.Session) -> float:
    """Train the housing-price pipeline, persist it to a stage, and return the test RMSE.

    Steps:
      1. Split ``HOUSING_DATA`` 80/20 (seed 82) and save the splits as the
         ``HOUSING_TRAIN`` and ``HOUSING_TEST`` tables (overwriting them).
      2. Fit a pipeline of median imputation + standard scaling for every
         column except ``OCEAN_PROXIMITY``, one-hot encoding for
         ``OCEAN_PROXIMITY``, and a 100-tree ``RandomForestRegressor``.
      3. Upload the fitted pipeline to
         ``@house_model_output_stg/housing_price_reg.joblib``.
      4. Score the held-out split.

    Args:
        session: Snowpark session used to read the data and upload the model.

    Returns:
        Root-mean-squared error of the predictions on the test split.
    """
    target_col = "MEDIAN_HOUSE_VALUE"
    cat_attribs = ["OCEAN_PROXIMITY"]

    snowdf = session.table("HOUSING_DATA")
    snowdf_train, snowdf_test = snowdf.random_split([0.8, 0.2], seed=82)

    snowdf_train.write.mode("overwrite").save_as_table("HOUSING_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("HOUSING_TEST")

    # Fetch each split exactly once and separate features from labels locally.
    # Fetching them with two independent queries gives no guarantee that rows
    # come back in the same order, which could silently misalign X and y.
    train_pdf = snowdf_train.to_pandas()
    test_pdf = snowdf_test.to_pandas()

    housing_train = train_pdf.drop(columns=[target_col])
    housing_train_labels = train_pdf[target_col]  # 1-D Series, as sklearn expects
    housing_test = test_pdf.drop(columns=[target_col])
    housing_test_labels = test_pdf[target_col]

    # Every non-categorical feature column goes through the numeric pipeline.
    num_attribs = [col for col in housing_train.columns if col not in cat_attribs]

    num_pipeline = Pipeline([
        ('imputer', SimpleImputer(strategy="median")),
        ('std_scaler', StandardScaler()),
    ])

    preprocessor = ColumnTransformer([
        ("num", num_pipeline, num_attribs),
        # A category absent from the training split must not make predict()
        # raise; unknown categories are encoded as all zeros instead.
        ("cat", OneHotEncoder(handle_unknown="ignore"), cat_attribs),
    ])

    full_pipeline = Pipeline([
        ('preprocessor', preprocessor),
        ('model', RandomForestRegressor(n_estimators=100, random_state=42)),
    ])

    full_pipeline.fit(housing_train, housing_train_labels)

    save_file(session, full_pipeline, "@house_model_output_stg/housing_price_reg.joblib")

    housing_predictions = full_pipeline.predict(housing_test)
    mse = mean_squared_error(housing_test_labels, housing_predictions)
    # Convert the numpy scalar to a plain float to match the declared return type.
    return float(np.sqrt(mse))

<h2>Create Stored Procedure</h2>

In [4]:

# Register train_model as the permanent stored procedure "train_house_model",
# replacing any existing definition; the stage given as stage_location is the
# one created earlier in this notebook.
train_model_sp = sproc(
    train_model,
    name='train_house_model',
    stage_location='@house_model_training_sproc_stg',
    is_permanent=True,
    replace=True,
)


<h2>Invoke SPROC for Model Training </h2>

In [5]:
# Invoke the registered procedure; the displayed value is what train_model
# returns, i.e. the RMSE on the held-out test split.
train_model_sp()

49501.16717526244

<h2>Schedule a Task to perform Model training every Monday at 10 AM UTC </h2>

In [None]:
# Cron fields are: minute hour day-of-month month day-of-week, followed by the
# time zone. "0 10 * * MON UTC" fires once, at 10:00 UTC every Monday; a "*" in
# the minute field would fire every minute between 10:00 and 10:59.
# The schedule literal must use plain ASCII single quotes.
# NOTE(review): a newly created task is suspended by default; it will not run
# until `ALTER TASK housing_model_training RESUME` is executed.
# NOTE(review): this uses COMPUTE_WH while the session uses HOUSING_MODEL_WH —
# confirm that is intended.
sql = """
        CREATE OR REPLACE TASK housing_model_training
        WAREHOUSE = COMPUTE_WH
        SCHEDULE = 'USING CRON 0 10 * * MON UTC'
        AS
        call train_house_model();
        """
# session.sql() only builds a lazy DataFrame; .collect() actually runs the statement.
session.sql(sql).collect()

<h2>Create Stage for Model Serving UDF</h2>

In [17]:
# Stage used as the stage_location of the permanent prediction UDF.
query = (
    "create or replace stage house_model_serving_udf_stg"
    " copy_options = (on_error='skip_file')"
)
session.sql(query).collect()



[Row(status='Stage area HOUSE_MODEL_SERVING_UDF_STG successfully created.')]

<h2>Create UDF for Prediction Serving</h2>

In [18]:
import sys
import cachetools
import os
from snowflake.snowpark.functions import udf
# Register the model file written by the training procedure as a session
# import, so that read_file() can open it from the import directory inside the UDF.
session.add_import("@house_model_output_stg/housing_price_reg.joblib")  

@cachetools.cached(cache={})
def read_file(filename):
    """Load a joblib-serialized object from the Snowflake import directory.

    The result is cached per ``filename``, so the file is deserialized only
    once per process instead of on every UDF invocation.

    Args:
        filename: Name of the file inside the import directory.

    Returns:
        The deserialized object.

    Raises:
        RuntimeError: If the ``snowflake_import_directory`` option is not set.
            Raising here (instead of returning None) gives a clear error
            rather than a later ``AttributeError`` on ``None``, and keeps
            ``None`` out of the cache.
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        raise RuntimeError(
            "snowflake_import_directory is not set; cannot load " + repr(filename)
        )
    # joblib.load unpickles the file: only use it on files this project wrote itself.
    with open(os.path.join(import_dir, filename), 'rb') as file:
        return joblib.load(file)

# Model input columns, in the order the prediction UDF receives its arguments.
features = [
    'LONGITUDE',
    'LATITUDE',
    'HOUSING_MEDIAN_AGE',
    'TOTAL_ROOMS',
    'TOTAL_BEDROOMS',
    'POPULATION',
    'HOUSEHOLDS',
    'MEDIAN_INCOME',
    'OCEAN_PROXIMITY',
]

@udf(name="predict_house_value", is_permanent=True, stage_location="@house_model_serving_udf_stg", replace=True)
def predict_house_value(
    LONGITUDE: float,
    LATITUDE: float,
    HOUSING_MEDIAN_AGE: float,
    TOTAL_ROOMS: float,
    TOTAL_BEDROOMS: float,
    POPULATION: float,
    HOUSEHOLDS: float,
    MEDIAN_INCOME: float,
    OCEAN_PROXIMITY: str,
) -> float:
    """Predict the house value for a single row of housing features.

    The arguments are assembled into a one-row DataFrame whose columns are
    named by ``features`` and passed to the model loaded by ``read_file``.

    Returns:
        The first (only) element of the model's prediction for that row.
    """
    model = read_file('housing_price_reg.joblib')
    # Values are listed in the same order as `features`, which mirrors the
    # parameter order of this function.
    values = [
        LONGITUDE, LATITUDE, HOUSING_MEDIAN_AGE, TOTAL_ROOMS,
        TOTAL_BEDROOMS, POPULATION, HOUSEHOLDS, MEDIAN_INCOME,
        OCEAN_PROXIMITY,
    ]
    single_row = pd.DataFrame([values], columns=features)
    prediction = model.predict(single_row)
    return prediction[0]


<h2>Run Predictions & Model Serving</h2>

In [19]:
from snowflake.snowpark import functions as F

snowdf_test = session.table("HOUSING_TEST")

# Bind the UDF arguments by column name, in the order listed in `features`
# (which mirrors the UDF's parameter order), instead of relying on the
# physical column order of the table.
inputs = [F.col(name) for name in features]

snowdf_results = snowdf_test.select(
    predict_house_value(*inputs).alias('predicted_value'),
    F.col('MEDIAN_HOUSE_VALUE').alias('actual_value'),
).limit(20)

snowdf_results.to_pandas()

Unnamed: 0,PREDICTED_VALUE,ACTUAL_VALUE
0,135441.0,140000.0
1,118197.0,110400.0
2,233106.0,191400.0
3,224431.0,188800.0
4,103799.0,87500.0
5,355583.11,75000.0
6,101761.0,75000.0
7,123777.0,112500.0
8,217602.24,137500.0
9,228977.05,257800.0


<h2>Create Vectorized UDF for Optimal Performance</h2>

In [20]:
import pandas
import sys
import cachetools
import os
from snowflake.snowpark.functions import pandas_udf
from snowflake.snowpark import types as T

# Column names assigned to each incoming batch before it is passed to the model.
features = [
    'LONGITUDE',
    'LATITUDE',
    'HOUSING_MEDIAN_AGE',
    'TOTAL_ROOMS',
    'TOTAL_BEDROOMS',
    'POPULATION',
    'HOUSEHOLDS',
    'MEDIAN_INCOME',
    'OCEAN_PROXIMITY',
]

# Register the trained model file as a session import so that read_file() can
# open it from the import directory inside the vectorized UDF.
session.add_import("@house_model_output_stg/housing_price_reg.joblib")
@cachetools.cached(cache={})
def read_file(filename):
    """Load a joblib-serialized object from the Snowflake import directory.

    The result is cached per ``filename``, so the file is deserialized only
    once per process instead of once per batch.

    Args:
        filename: Name of the file inside the import directory.

    Returns:
        The deserialized object.

    Raises:
        RuntimeError: If the ``snowflake_import_directory`` option is not set.
            Raising here (instead of returning None) gives a clear error
            rather than a later ``AttributeError`` on ``None``, and keeps
            ``None`` out of the cache.
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        raise RuntimeError(
            "snowflake_import_directory is not set; cannot load " + repr(filename)
        )
    # joblib.load unpickles the file: only use it on files this project wrote itself.
    with open(os.path.join(import_dir, filename), 'rb') as file:
        return joblib.load(file)

@pandas_udf(max_batch_size=100)
def predict_batch(df: T.PandasDataFrame[float, float, float, float,
                                          float, float, float, float, str]) -> T.PandasSeries[float]:
    """Predict house values for a batch of rows (vectorized UDF).

    Args:
        df: Batch of input rows with nine positional columns; they are renamed
            to ``features`` so the model can select columns by name.

    Returns:
        A pandas Series holding one prediction per input row.
    """
    m = read_file('housing_price_reg.joblib')
    df.columns = features
    # predict() returns a numpy array; wrap it so the value matches the
    # declared PandasSeries return type.
    return pd.Series(m.predict(df))

In [21]:
from snowflake.snowpark import functions as F

snowdf_test = session.table("HOUSING_TEST")

# Pass the feature columns by name, in the order listed in `features` (the
# order predict_batch assigns to its batch columns), instead of relying on the
# physical column order of the table.
inputs = [F.col(name) for name in features]

snowdf_results = snowdf_test.select(
    predict_batch(*inputs).alias('predicted_value'),
    F.col('MEDIAN_HOUSE_VALUE').alias('actual_value'),
).limit(20)

# limit(20) already caps the result, so no extra head() is needed.
snowdf_results.to_pandas()

Unnamed: 0,PREDICTED_VALUE,ACTUAL_VALUE
0,135441.0,140000.0
1,118197.0,110400.0
2,233106.0,191400.0
3,224431.0,188800.0
4,103799.0,87500.0
5,355583.11,75000.0
6,101761.0,75000.0
7,123777.0,112500.0
8,217602.24,137500.0
9,228977.05,257800.0
