In [1]:
# Generic
import pandas as pd
import io

# Joblib
import joblib

# Snowpark
import snowflake.snowpark
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import Session

#### Prerequisites
1.	Download data file from https://www.kaggle.com/competitions/house-prices-advanced-regression-techniques/data
2.	Upload the downloaded data set to Snowflake.
a.	You can use Snowflake Snowsight (the web UI) to upload a data set and create a table in a single step.
b.	In this solution, the table is created with the name HOUSE_PRICES_RAW_DATA for the train data set and HOUSE_PRICES_TEST_DATA for the test data set.
3.	Create a Kaggle or Google Colab account, or set up Jupyter to run locally.
4. If you have already followed the previous chapters, you will already have a database called "RAW" and a schema called "RETAIL". If not, note that this example uses RAW.RETAIL; change it as appropriate.
5. The example also assumes you have a database called "COMMONS" with schema called "UTILS" which is used for storing models and stored procedures.


#### Create session and load data

In [2]:
import os

# Credentials are read from environment variables instead of being typed into
# the notebook, so they never end up in version control or shared outputs.
# Unset variables fall back to "" (the original placeholder values).
connection_parameters = {
    "account": os.environ.get("SNOWFLAKE_ACCOUNT", ""),
    "user": os.environ.get("SNOWFLAKE_USER", ""),
    "password": os.environ.get("SNOWFLAKE_PASSWORD", ""),
    "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", ""),  # optional
    "database": "COMMONS",
    "schema": "UTILS",
}

session = Session.builder.configs(connection_parameters).create()

In [3]:
# Set the session log level to INFO; presumably so the logger.info() calls made
# inside the stored procedures below are recorded rather than filtered out.
session.sql("alter session set log_level = info;").collect()

[Row(status='Statement executed successfully.')]

#### Add packages which are needed within your functions

In [4]:
# Declare the packages that the server-side stored procedures and UDFs in this
# notebook import (scikit-learn, pandas, joblib, cachetools, ...).
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools')

#### Create a stage to store your model

In [5]:
# Stage that will hold the serialized model written by save_file; its
# directory table is enabled.
query = (
    "create or replace stage commons.utils.models"
    " directory = (enable = true)"
    " copy_options = (on_error='skip_file')"
)
session.sql(query).collect()

[Row(status='Stage area MODELS successfully created.')]

#### Input variables

We define the training table name, source columns and the target column available within the table.

In [6]:
# Fully-qualified source table, the feature columns read from it, and the
# label column the model learns to predict.
training_table = 'RAW.RETAIL.HOUSE_PRICES_RAW_DATA'
src_cols = [
    "BLDGTYPE", "OVERALLCOND", "MSSUBCLASS", "MSZONING",
    "LOTAREA", "LOTCONFIG", "YEARBUILT", "FOUNDATION",
]
target_col = 'SALEPRICE'

Check if the table has been read properly by displaying the count.

In [7]:
# Sanity check: number of rows in the training table.
session.table(training_table).count()

1460

#### Create stored procedure to train and upload the model

We create a function that will be deployed as a stored procedure to train a random forest regression model on the sample dataset to predict house prices. The model is saved for later use. The response from this procedure is important as it is later used to retrieve the features used by the model.

Note that I did not create a named procedure; instead I used a temporary procedure (Snowflake assigns a random name and associates it with the local function name). The temporary stored procedure is only available within the current Snowflake session. Once the session is closed, you cannot call or use it from another session, such as Snowflake worksheets or a different Jupyter notebook.



In [8]:

def save_file(session, model, path):
    """
    Serialize ``model`` with joblib and upload the bytes to a Snowflake stage.

    Args:
        session: Snowpark session whose underlying connector cursor performs the upload.
        model: Any object joblib can serialize (here, the trained regressor).
        path: Stage path of the target file, e.g. "@MODELS/houseprice_estimator.joblib".

    Returns:
        str: the text "successfully created file: " followed by ``path``.
    """
    # Serialize into an in-memory buffer rather than a temporary file.
    buffer = io.BytesIO()
    joblib.dump(model, buffer)

    # NOTE(review): `_conn._cursor` is a private attribute of the Snowpark
    # session. The buffer is left positioned at its end after joblib.dump, so
    # this presumably relies on upload_stream reading from the start - confirm.
    uploader = session._conn._cursor
    uploader.upload_stream(buffer, path)

    return "successfully created file: " + path



@sproc(replace=True)
def train_model(session: snowflake.snowpark.Session, training_table: str, src_cols: list, target_col: str) -> str:
    """
    Train a random forest regressor that predicts house prices and save it to a stage.

    Steps:
        1. Load ``training_table`` into pandas and keep ``src_cols``.
        2. One-hot encode the object-typed columns.
        3. Sanitize the resulting column names.
        4. Hold out 20% of rows for validation.
        5. Fit a RandomForestRegressor.
        6. Upload the fitted model to ``@MODELS/houseprice_estimator.joblib`` via ``save_file``.

    Args:
        session (snowflake.snowpark.Session): A Snowflake session object.
        training_table (str): The name of the training table in Snowflake.
        src_cols (list): A list of source columns to use for training the model.
        target_col (str): The target column to predict.

    Returns:
        str: JSON object with these keys:
            ``feature_details``: column -> dtype name.
            ``feature_importance``: column -> importance, in the column order
            the model was fit on.
            ``mean_absolute_percentage_error``: MAPE on the validation split.
    """
    import json
    import logging
    import re

    from sklearn.ensemble import RandomForestRegressor
    from sklearn.metrics import mean_absolute_percentage_error
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import OneHotEncoder

    # One seed for both the train/validation split and the forest, so that
    # re-running the procedure reproduces the same model and metrics.
    seed = 0

    logger = logging.getLogger("train_model")

    # Load the training table and keep only the requested feature columns.
    raw_data = session.table(training_table).to_pandas()
    train_dataset = raw_data[src_cols]

    # Categorical columns are those with object dtype.
    is_categorical = train_dataset.dtypes == 'object'
    categorical_cols = list(is_categorical[is_categorical].index)
    logger.info('Performing OneHotEncoding')

    # set_output(transform='pandas') already makes fit_transform return a
    # DataFrame indexed like train_dataset, so it can be concatenated directly.
    OH_encoder = OneHotEncoder(sparse_output=False).set_output(transform='pandas')
    OH_cols = OH_encoder.fit_transform(train_dataset[categorical_cols])

    # Replace the original categorical columns with their one-hot columns.
    train_df = pd.concat([train_dataset, OH_cols], axis=1).drop(columns=categorical_cols)
    target_df = raw_data[target_col]

    # One-hot column names embed raw category values, which may contain spaces
    # or punctuation; map every character outside [_a-zA-Z0-9] to "_".
    train_df.columns = train_df.columns.map(lambda name: re.sub(r'[^_a-zA-Z0-9]', '_', name))

    logger.info('Building Model')
    X_train, X_valid, Y_train, Y_valid = train_test_split(
        train_df, target_df, train_size=0.8, test_size=0.2, random_state=seed
    )

    model_RFR = RandomForestRegressor(n_estimators=10, random_state=seed)
    model_RFR.fit(X_train, Y_train)

    # Importance per feature, keyed by column name in training column order.
    feat_importance = pd.Series(model_RFR.feature_importances_, index=train_df.columns).to_dict()

    logger.info('Testing Model with Test data')
    Y_pred = model_RFR.predict(X_valid)

    logger.info('Saving Model into Stage')
    # save_file returns a status message, not a path.
    save_msg = save_file(session, model_RFR, "@MODELS/houseprice_estimator.joblib")
    logger.info('Saved Model:' + save_msg)

    model_info = {
        'feature_details': train_df.dtypes.astype(str).to_dict(),
        'feature_importance': feat_importance,
        'mean_absolute_percentage_error': mean_absolute_percentage_error(Y_valid, Y_pred),
    }
    return json.dumps(model_info)

In [9]:
# invoke the procedure to create and upload the model.
# Runs server-side and returns the JSON string built by train_model (feature
# dtypes, feature importances and validation MAPE).
model_results = train_model(training_table, src_cols, target_col)

In [10]:
import json
# Parse the JSON string returned by train_model into a dict.
model_results_json = json.loads(model_results)

In [11]:
# Pretty-print the training report.
print(json.dumps(model_results_json, indent=4))

{
    "feature_details": {
        "OVERALLCOND": "int8",
        "MSSUBCLASS": "int16",
        "LOTAREA": "int32",
        "YEARBUILT": "int16",
        "BLDGTYPE_1Fam": "float64",
        "BLDGTYPE_2fmCon": "float64",
        "BLDGTYPE_Duplex": "float64",
        "BLDGTYPE_Twnhs": "float64",
        "BLDGTYPE_TwnhsE": "float64",
        "MSZONING__C__all__": "float64",
        "MSZONING_FV": "float64",
        "MSZONING_RH": "float64",
        "MSZONING_RL": "float64",
        "MSZONING_RM": "float64",
        "LOTCONFIG_Corner": "float64",
        "LOTCONFIG_CulDSac": "float64",
        "LOTCONFIG_FR2": "float64",
        "LOTCONFIG_FR3": "float64",
        "LOTCONFIG_Inside": "float64",
        "FOUNDATION_BrkTil": "float64",
        "FOUNDATION_CBlock": "float64",
        "FOUNDATION_PConc": "float64",
        "FOUNDATION_Slab": "float64",
        "FOUNDATION_Stone": "float64",
        "FOUNDATION_Wood": "float64"
    },
    "feature_importance": {
        "OVERALLCOND": 0.043187

In [12]:
# Feature names the model was fit on. The JSON key order follows the
# training column order, so this list is also the model's column order.
model_features = list(model_results_json['feature_importance'])

In [13]:
# Display the feature list (training column order).
model_features

['OVERALLCOND',
 'MSSUBCLASS',
 'LOTAREA',
 'YEARBUILT',
 'BLDGTYPE_1Fam',
 'BLDGTYPE_2fmCon',
 'BLDGTYPE_Duplex',
 'BLDGTYPE_Twnhs',
 'BLDGTYPE_TwnhsE',
 'MSZONING__C__all__',
 'MSZONING_FV',
 'MSZONING_RH',
 'MSZONING_RL',
 'MSZONING_RM',
 'LOTCONFIG_Corner',
 'LOTCONFIG_CulDSac',
 'LOTCONFIG_FR2',
 'LOTCONFIG_FR3',
 'LOTCONFIG_Inside',
 'FOUNDATION_BrkTil',
 'FOUNDATION_CBlock',
 'FOUNDATION_PConc',
 'FOUNDATION_Slab',
 'FOUNDATION_Stone',
 'FOUNDATION_Wood']

#### Unit test the model using a python stored procedure

As above, I'm using an anonymous (temporary) procedure. All operations happen within the procedure:
- reading the test data from Snowflake;
- loading the pre-trained model from the stage;
- encoding the categorical variables in the test data set;
- predicting prices with the model;
- writing the predicted values to RAW.RETAIL.HOUSE_PRICES_TEST_DATA_RESULT.

Do not consider this as a best practice for unit testing and only consider this as a quick and dirty way to test your model.

In [44]:
import os
import sys

# Attach the staged model file to this session's imports; procedures/UDFs
# registered afterwards presumably find it in their import directory (see read_file).
session.add_import("@MODELS/houseprice_estimator.joblib")  

def read_file(filename):
    """
    Load a joblib-serialized model from the Snowflake import directory.

    Files attached with ``session.add_import`` are exposed to the Python code
    running a stored procedure or UDF. The directory holding them is published
    through the ``snowflake_import_directory`` interpreter option (``sys._xoptions``).

    Args:
        filename (str): name of the model file inside the import directory.

    Returns:
        The deserialized model, or None when ``snowflake_import_directory`` is
        unset or empty (presumably the case in the local notebook process).
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        return None

    model_path = os.path.join(import_dir, filename)
    # NOTE: joblib.load unpickles the file; only load trusted model files.
    with open(model_path, 'rb') as model_fh:
        return joblib.load(model_fh)

@sproc(replace=True)
def test_model(session: snowflake.snowpark.Session, test_table: str, src_cols: list) -> bool:
    """
    Score a test table with the staged model and persist the predictions.

    Reads up to 2000 rows of ``test_table`` and one-hot encodes the categorical
    columns of ``src_cols``. It then aligns the resulting columns to
    ``model_features``: unknown encoded columns are dropped and features the
    sample lacks are zero-filled. Finally it predicts ``SALEPRICE_ESTIMATED``
    and overwrites RAW.RETAIL.HOUSE_PRICES_TEST_DATA_RESULT with the scored rows.

    Args:
        session (snowflake.snowpark.Session): a Snowflake session object
        test_table (str): the name of the test table
        src_cols (list): a list of strings representing the names of the source columns

    Returns:
        bool: True once the results table has been written.

    Raises:
        Exception: if the model file cannot be read or the write returns None.
    """
    import logging
    import re

    from sklearn.preprocessing import OneHotEncoder

    logger = logging.getLogger("test_model")

    test_raw = session.table(test_table).limit(2000).to_pandas()
    test_dataset = test_raw[src_cols]

    model_RFR = read_file('houseprice_estimator.joblib')
    if model_RFR is None:
        raise Exception('Unable to read model file')

    is_categorical = test_dataset.dtypes == 'object'
    categorical_cols = list(is_categorical[is_categorical].index)
    logger.info("Categorical variables:")
    logger.info(categorical_cols)
    # Lazy %-formatting: an extra argument without a placeholder makes the
    # logging module report a formatting error instead of the message.
    logger.info('No. of. categorical features: %d', len(categorical_cols))

    # NOTE(review): the encoder is re-fit on the test data rather than reusing
    # the training encoder; the name-based alignment below compensates.
    OH_encoder = OneHotEncoder(sparse_output=False).set_output(transform='pandas')
    OH_cols = OH_encoder.fit_transform(test_dataset[categorical_cols])

    test_df = pd.concat([test_dataset, OH_cols], axis=1).drop(columns=categorical_cols)

    # Same sanitization as in train_model so encoded names line up.
    test_df.columns = test_df.columns.map(lambda name: re.sub(r'[^_a-zA-Z0-9]', '_', name))
    logger.info(test_df.columns)

    unavailable_features_in_model = set(test_df.columns).difference(model_features)
    logger.info(unavailable_features_in_model)

    # Keep exactly the model's features, in training order. Extra encoded
    # columns are dropped, and features absent from this test sample are added
    # as zeros instead of raising KeyError.
    test_df = test_df.reindex(columns=model_features, fill_value=0)

    test_df['SALEPRICE_ESTIMATED'] = model_RFR.predict(test_df)

    # overwrite=True keeps re-runs idempotent (the default appends rows).
    # write_pandas returns a Table; no .collect() is needed to check it.
    tbl_ref = session.write_pandas(
        test_df,
        table_name='HOUSE_PRICES_TEST_DATA_RESULT',
        database='RAW',
        schema='RETAIL',
        auto_create_table=True,
        create_temp_table=False,
        overwrite=True,
    )
    if tbl_ref is None:
        raise Exception('Error writing estimation results')
    return True


In [39]:
# Fully-qualified name of the Kaggle test split loaded into Snowflake.
test_table = 'RAW.RETAIL.HOUSE_PRICES_TEST_DATA'

In [40]:
# check if test table exists (and how many rows it has)
session.table(test_table).count()

1459

In [45]:
# run the test_model procedure; it writes predictions to
# RAW.RETAIL.HOUSE_PRICES_TEST_DATA_RESULT and returns True on success
test_model(test_table, src_cols)

True

In [47]:
# Inspect the first 10 scored rows written by test_model.
session.table('RAW.RETAIL.HOUSE_PRICES_TEST_DATA_RESULT').show(10)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"OVERALLCOND"  |"MSSUBCLASS"  |"LOTAREA"  |"YEARBUILT"  |"BLDGTYPE_1Fam"  |"BLDGTYPE_2fmCon"  |"BLDGTYPE_Duplex"  |"BLDGTYPE_Twnhs"  |"BLDGTYPE_TwnhsE"  |"MSZONING__C__all__"  |"MSZONING_FV"  |"MSZONING_RH"  |"MSZONING_RL"  |"MSZONING_RM"  |"LOTCONFIG_Corner"  |"LOTCONFIG_CulDSac"  |"LOTCONFIG_FR2"  |"LOTCONFIG_FR3"  |"LOTCONFIG_Inside"  |"FOUNDATION_BrkTil"  |"FOUNDATION_CBlock"  |"FOUNDATION_PConc"  |"FOUNDATION_Slab"  |"FOUNDATION_Stone"  |"FOUNDATION_Wood"  |"SALEPRICE_ESTIMATED"  |
--------------

#### Utilize the model from a UDF

Let us see how to use the model on your data through a UDF (user-defined function).

In [48]:
# Stage that stores the permanent `predict` UDF (see stage_location="@udf" below).
query = (
    "create or replace stage commons.utils.udf"
    " copy_options = (on_error='skip_file')"
)
session.sql(query).collect()

[Row(status='Stage area UDF successfully created.')]

##### Create the UDF

In [49]:
from snowflake.snowpark.functions import udf
@udf(name="predict", is_permanent=True, stage_location="@udf", replace=True)
def predict(
    OVERALLCOND: int, MSSUBCLASS: int, LOTAREA: int, YEARBUILT: int,
    BLDGTYPE_1Fam: float, BLDGTYPE_2fmCon: float, BLDGTYPE_Duplex: float, BLDGTYPE_Twnhs: float, BLDGTYPE_TwnhsE: float,
    MSZONING_C__all_: float, MSZONING_FV: float, MSZONING_RH: float, MSZONING_RL: float, MSZONING_RM: float,
    LOTCONFIG_Corner: float, LOTCONFIG_CulDSac: float, LOTCONFIG_FR2: float, LOTCONFIG_FR3: float, LOTCONFIG_Inside: float,
    FOUNDATION_BrkTil: float, FOUNDATION_CBlock: float, FOUNDATION_PConc: float, FOUNDATION_Slab: float,
    FOUNDATION_Stone: float, FOUNDATION_Wood: float,
) -> float:
    """
    Scalar UDF: predict the sale price of a single house.

    The 25 arguments are the model features, passed positionally in the same
    order as ``model_features`` (the column order the model was trained on).

    Returns:
        float: the estimated sale price.
    """
    # NOTE(review): this uses whichever read_file is in scope when the cell
    # runs. With the uncached version defined earlier, the model file is
    # deserialized on every call.
    model_RFR = read_file('houseprice_estimator.joblib')

    # Build the row positionally instead of from locals(). Parameter names do
    # not always equal the model's feature names (MSZONING_C__all_ vs
    # MSZONING__C__all__), and a name-based lookup would silently turn that
    # feature into NaN.
    features = [
        OVERALLCOND, MSSUBCLASS, LOTAREA, YEARBUILT,
        BLDGTYPE_1Fam, BLDGTYPE_2fmCon, BLDGTYPE_Duplex, BLDGTYPE_Twnhs, BLDGTYPE_TwnhsE,
        MSZONING_C__all_, MSZONING_FV, MSZONING_RH, MSZONING_RL, MSZONING_RM,
        LOTCONFIG_Corner, LOTCONFIG_CulDSac, LOTCONFIG_FR2, LOTCONFIG_FR3, LOTCONFIG_Inside,
        FOUNDATION_BrkTil, FOUNDATION_CBlock, FOUNDATION_PConc, FOUNDATION_Slab,
        FOUNDATION_Stone, FOUNDATION_Wood,
    ]
    row = pd.DataFrame([features], columns=model_features)
    return model_RFR.predict(row)[0]

#### Prep the test data prior to applying the UDF

In [50]:
test_table = 'RAW.RETAIL.HOUSE_PRICES_TEST_DATA'

test_raw = session.table(test_table).to_pandas()
test_dataset = test_raw[src_cols]

import re

from sklearn.preprocessing import OneHotEncoder

# Categorical (object dtype) columns are one-hot encoded, mirroring train_model.
is_categorical = test_dataset.dtypes == 'object'
categorical_cols = list(is_categorical[is_categorical].index)
print("Categorical variables:")
print(categorical_cols)
print('No. of. categorical features: ', len(categorical_cols))

# set_output(transform='pandas') already yields a DataFrame aligned with test_dataset.
OH_encoder = OneHotEncoder(sparse_output=False).set_output(transform='pandas')
OH_cols = OH_encoder.fit_transform(test_dataset[categorical_cols])

test_df = pd.concat([test_dataset, OH_cols], axis=1).drop(columns=categorical_cols)

# clean column names the same way as during training
test_df.columns = test_df.columns.map(lambda name: re.sub(r'[^_a-zA-Z0-9]', '_', name))

# Record which columns are extra / missing relative to the model, then align in
# one step. reindex does three things at once:
#   - drops unknown encoded columns;
#   - adds missing features as 0;
#   - orders the columns as in training.
# This replaces a drop loop that copied the frame once per dropped column.
unavailable_features_in_model = set(test_df.columns).difference(model_features)
unavailable_feature_in_testdf = set(model_features).difference(test_df.columns)
test_df = test_df.reindex(columns=model_features, fill_value=0)

Categorical variables:
['BLDGTYPE', 'MSZONING', 'LOTCONFIG', 'FOUNDATION']
No. of. categorical features:  4


In [51]:
# Persist the aligned feature frame that the UDFs below are tested against;
# overwrite=True replaces the table's contents on re-runs.
session.write_pandas(
    df=test_df,
    table_name='HOUSE_PRICES_TEST_DATA_INPUT',
    database='RAW',
    schema='RETAIL',
    auto_create_table=True,
    create_temp_table=False,
    overwrite=True,
)

<snowflake.snowpark.table.Table at 0x30d5a5050>

In [53]:
# Inspect the first 10 rows of the aligned feature table the UDFs will score.
session.table("RAW.RETAIL.HOUSE_PRICES_TEST_DATA_INPUT").show(10)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"OVERALLCOND"  |"MSSUBCLASS"  |"LOTAREA"  |"YEARBUILT"  |"BLDGTYPE_1Fam"  |"BLDGTYPE_2fmCon"  |"BLDGTYPE_Duplex"  |"BLDGTYPE_Twnhs"  |"BLDGTYPE_TwnhsE"  |"MSZONING__C__all__"  |"MSZONING_FV"  |"MSZONING_RH"  |"MSZONING_RL"  |"MSZONING_RM"  |"LOTCONFIG_Corner"  |"LOTCONFIG_CulDSac"  |"LOTCONFIG_FR2"  |"LOTCONFIG_FR3"  |"LOTCONFIG_Inside"  |"FOUNDATION_BrkTil"  |"FOUNDATION_CBlock"  |"FOUNDATION_PConc"  |"FOUNDATION_Slab"  |"FOUNDATION_Stone"  |"FOUNDATION_Wood"  |
--------------------------------------------------------------

#### Testing the UDF

In [61]:

# Score the first 20 rows of the prepared input table with the scalar UDF.
# The Snowpark DataFrame is lazy; nothing runs until it is materialized,
# e.g. with test_df_results.to_pandas().head(20) or test_df_results.show().
test_df = session.table("RAW.RETAIL.HOUSE_PRICES_TEST_DATA_INPUT")
udf_input_columns = [*test_df]
test_df_results = test_df.select(
    *udf_input_columns,
    predict(*udf_input_columns).alias('PREDICTION'),
).limit(20)


#### Using Vectorized UDFs For Optimal Performance

The code above runs the model in parallel but performs the predictions row by row. We can improve on this with vectorized UDFs. Snowpark splits the rows into batches and passes each batch to the UDF as a pandas DataFrame, which results in better throughput.


In [63]:
# Stage that stores the permanent `predict_batch` vectorized UDF
# (see stage_location="@vudf" below).
query = (
    "create or replace stage commons.utils.vudf"
    " copy_options = (on_error='skip_file')"
)
session.sql(query).collect()

[Row(status='Stage area VUDF successfully created.')]

#### Create the Vectorized UDF

**Caching for better performance**

Cachetools is a Python library that provides a collection of caching algorithms. 

The library is useful in applications where temporarily storing data in memory improves performance. It's easy to use and can be integrated into an application with just a few lines of code.

Below, we redefine the read_file() function with the cachetools `cached` decorator. The file is then read once per Python process and cached.
Subsequent UDF calls in that process reuse the cached model instead of reading the file from the stage again.

In [64]:
import cachetools
from snowflake.snowpark import types as T
from snowflake.snowpark.functions import pandas_udf

@cachetools.cached(cache={})
def read_file(filename):
    """
    Cached variant of read_file: load a joblib model from the Snowflake import directory.

    The cachetools decorator memoizes the result per ``filename``. Within one
    Python process the file is therefore deserialized only once, and later calls
    return the same object. A None result (no import directory) is presumably
    cached as well.

    Args:
        filename (str): name of the model file inside the import directory.

    Returns:
        The deserialized model, or None when ``snowflake_import_directory`` is unset or empty.
    """
    import_dir = sys._xoptions.get("snowflake_import_directory")
    if not import_dir:
        return None
    with open(os.path.join(import_dir, filename), 'rb') as model_fh:
        return joblib.load(model_fh)

@pandas_udf(max_batch_size=100, name="predict_batch", is_permanent=True, stage_location="@vudf", replace=True)
def predict_batch(df: T.PandasDataFrame[int, int, int, int, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float, float]) -> T.PandasSeries[float]:
    """
    Vectorized UDF: predict sale prices for a batch of up to 100 rows.

    Args:
        df: one row per house with the 25 model features as columns, in the
            order of ``model_features`` (4 integer columns followed by 21 floats).

    Returns:
        pd.Series: one estimated sale price per input row.
    """
    model_RFR = read_file('houseprice_estimator.joblib')
    # Label the positional columns with the training feature names so the model
    # sees the names it was fit with. set_axis returns a relabelled copy rather
    # than mutating the incoming batch.
    features = df.set_axis(model_features, axis=1)
    return pd.Series(model_RFR.predict(features), index=df.index)

#### Test the Vectorized UDF

In [65]:
# Score the first 20 rows with the vectorized UDF and bring them back as pandas.
test_df = session.table("RAW.RETAIL.HOUSE_PRICES_TEST_DATA_INPUT")
vudf_input_columns = [*test_df]
test_df_results = test_df.select(
    *vudf_input_columns,
    predict_batch(*vudf_input_columns).alias('PREDICTION'),
).limit(20)
test_df_results.to_pandas().head(20)

Unnamed: 0,OVERALLCOND,MSSUBCLASS,LOTAREA,YEARBUILT,BLDGTYPE_1Fam,BLDGTYPE_2fmCon,BLDGTYPE_Duplex,BLDGTYPE_Twnhs,BLDGTYPE_TwnhsE,MSZONING__C__all__,...,LOTCONFIG_FR2,LOTCONFIG_FR3,LOTCONFIG_Inside,FOUNDATION_BrkTil,FOUNDATION_CBlock,FOUNDATION_PConc,FOUNDATION_Slab,FOUNDATION_Stone,FOUNDATION_Wood,PREDICTION
0,6,20,11622,1961,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,1.0,0.0,1.0,0.0,0.0,0.0,0.0,145700.0
1,6,20,14267,1958,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,187040.0
2,5,60,13830,1997,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,219090.0
3,6,60,9978,1998,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,249630.0
4,5,120,5005,1992,0.0,0.0,0.0,0.0,1.0,0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,220120.0
5,5,60,10000,1993,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,202450.0
6,7,20,7980,1992,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,172740.0
7,5,60,8402,1998,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,201340.0
8,5,20,10176,1990,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,1.0,0.0,0.0,1.0,0.0,0.0,0.0,192188.0
9,5,20,8400,1970,1.0,0.0,0.0,0.0,0.0,0,...,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,133750.0


In [66]:
# Close the Snowpark session at the end of the notebook.
session.close()