In [None]:
# Base dir for this code
import os
base_dir = os.getcwd()
print(base_dir)

In [3]:
import json
import pandas as pd

from snowflake.snowpark import functions as F
from snowflake.snowpark import version as v
from snowflake.snowpark.session import Session

from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.preprocessing import KBinsDiscretizer, OneHotEncoder
from snowflake.ml.modeling.impute import SimpleImputer

In [4]:
import warnings
warnings.filterwarnings("ignore")

# 1.0 Snowflake Setup

In [8]:
# Ensure that your credentials are stored in creds.json
with open('creds.json') as f:
    data = json.load(f)
    USERNAME = data['user']
    PASSWORD = data['password']
    SF_ACCOUNT = data['account']
    SF_WH = data['warehouse']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()

#### Ensure that TPC-DS dataset is available in your environment.

In [9]:
session.sql('CREATE DATABASE IF NOT EXISTS tpcds_xgboost').collect()
session.sql('CREATE SCHEMA IF NOT EXISTS tpcds_xgboost.demo').collect()
session.sql("create or replace warehouse FE_AND_INFERENCE_WH with warehouse_size='3X-LARGE'").collect()
session.sql("create or replace warehouse snowpark_opt_wh with warehouse_size = 'MEDIUM' warehouse_type = 'SNOWPARK-OPTIMIZED'").collect()
session.sql("alter warehouse snowpark_opt_wh set max_concurrency_level = 1").collect()
session.sql("CREATE OR REPLACE STAGE TPCDS_XGBOOST.DEMO.ML_MODELS").collect()
session.use_warehouse('FE_AND_INFERENCE_WH')
session.use_database('tpcds_xgboost')
session.use_schema('demo')

Select either 100 or 10 for the TPC-DS Dataset size to use below. See (https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html)[here] for more information If you choose 100, I recommend >= 3XL warehouse. 

In [10]:
TPCDS_SIZE_PARAM = 10
SNOWFLAKE_SAMPLE_DB = 'SFSALESSHARED_SFC_SAMPLES_PROD3_SAMPLE_DATA' # Name of Snowflake Sample Database might be different...

if TPCDS_SIZE_PARAM == 100: 
    TPCDS_SCHEMA = 'TPCDS_SF100TCL'
elif TPCDS_SIZE_PARAM == 10:
    TPCDS_SCHEMA = 'TPCDS_SF10TCL'
else:
    raise ValueError("Invalid TPCDS_SIZE_PARAM selection")
    
store_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store_sales')
catalog_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.catalog_sales') 
web_sales = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.web_sales') 
date = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.date_dim')
dim_stores = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.store')
customer = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer')
address = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_address')
demo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_demographics')

# 2.0 Data Engineering
We will aggregate sales by customer across all channels(web, store, catalogue) and join that to customer demographic data. 

In [13]:
store_sales_agged = store_sales.group_by('ss_customer_sk').agg(F.sum('ss_sales_price').as_('total_sales'))
web_sales_agged = web_sales.group_by('ws_bill_customer_sk').agg(F.sum('ws_sales_price').as_('total_sales'))
catalog_sales_agged = catalog_sales.group_by('cs_bill_customer_sk').agg(F.sum('cs_sales_price').as_('total_sales'))
store_sales_agged = store_sales_agged.rename('ss_customer_sk', 'customer_sk')
web_sales_agged = web_sales_agged.rename('ws_bill_customer_sk', 'customer_sk')
catalog_sales_agged = catalog_sales_agged.rename('cs_bill_customer_sk', 'customer_sk')

In [14]:
total_sales = store_sales_agged.union_all(web_sales_agged)
total_sales = total_sales.union_all(catalog_sales_agged)

In [15]:
total_sales = total_sales.group_by('customer_sk').agg(F.sum('total_sales').as_('total_sales'))

In [16]:
customer = customer.select('c_customer_sk','c_current_hdemo_sk', 'c_current_addr_sk', 'c_customer_id', 'c_birth_year')

In [17]:
customer = customer.join(address.select('ca_address_sk', 'ca_zip'), customer['c_current_addr_sk'] == address['ca_address_sk'] )
customer = customer.join(demo.select('cd_demo_sk', 'cd_gender', 'cd_marital_status', 'cd_credit_rating', 'cd_education_status', 'cd_dep_count'),
                                customer['c_current_hdemo_sk'] == demo['cd_demo_sk'] )
customer = customer.rename('c_customer_sk', 'customer_sk')

In [18]:
customer.limit(5).to_pandas()

Unnamed: 0,CUSTOMER_SK,C_CURRENT_HDEMO_SK,C_CURRENT_ADDR_SK,C_CUSTOMER_ID,C_BIRTH_YEAR,CA_ADDRESS_SK,CA_ZIP,CD_DEMO_SK,CD_GENDER,CD_MARITAL_STATUS,CD_CREDIT_RATING,CD_EDUCATION_STATUS,CD_DEP_COUNT
0,18518128,407,14473839,AAAAAAAAAHAJKBBA,1984,14473839,62477.0,407,M,W,Good,Advanced Degree,0
1,18518129,1449,8416546,AAAAAAAABHAJKBBA,1959,8416546,,1449,M,U,Low Risk,4 yr Degree,0
2,18518130,4132,815263,AAAAAAAACHAJKBBA,1927,815263,8648.0,4132,F,M,High Risk,Primary,0
3,18518131,2228,30507569,AAAAAAAADHAJKBBA,1931,30507569,94136.0,2228,F,W,Low Risk,Advanced Degree,0
4,18518132,3212,1189423,AAAAAAAAEHAJKBBA,1974,1189423,42477.0,3212,F,M,High Risk,Unknown,0


In [19]:
final_df = total_sales.join(customer, on='customer_sk')

In [20]:
# Size of the final DF is around 95 Million.
final_df.count()

62726989

In [21]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
final_df.write.mode('overwrite').save_as_table('feature_store')

# 3.0 Feature Engineering

In [23]:
session.use_warehouse('snowpark_opt_wh')
session.use_database('tpcds_xgboost')
session.use_schema('demo')

In [24]:
snowdf = session.table("feature_store")
snowdf = snowdf.drop(['CA_ZIP','CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])

In [25]:
snowdf.limit(5).to_pandas()

Unnamed: 0,TOTAL_SALES,C_BIRTH_YEAR,CD_GENDER,CD_MARITAL_STATUS,CD_CREDIT_RATING,CD_EDUCATION_STATUS,CD_DEP_COUNT
0,49188.45,1964,F,W,Good,College,0
1,30862.22,1946,F,W,Good,College,0
2,35523.56,1978,F,W,Good,College,0
3,31829.89,1974,F,W,Good,College,0
4,32593.95,1988,F,W,Good,College,0


In [26]:
cat_cols = ['CD_GENDER', 'CD_MARITAL_STATUS', 'CD_CREDIT_RATING', 'CD_EDUCATION_STATUS']
num_cols = ['C_BIRTH_YEAR', 'CD_DEP_COUNT']

### 3.1 Missing Value Imputation

We can use the SimpleImputer in snowflake.ml.preprocessing to replace missing values with the most frequent.

```python
# SimpleImputer in snowflake.ml.preprocessing
from snowflake.ml.modeling.impute import SimpleImputer
my_imputer = sfml.preprocessing.SimpleImputer(input_cols=['your_column'],
                                output_cols=['your_column'],
                                strategy='constant',
                                fill_value='OTHER')
my_imputer.fit(my_sdf)
my_sdf = my_imputer.transform(my_sdf)
```

In [27]:
# Imputation of Numeric Cols
my_imputer = SimpleImputer(input_cols= num_cols,
                           output_cols= num_cols,
                           strategy='median')
sdf_prepared = my_imputer.fit(snowdf).transform(snowdf)

Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "C_BIRTH_YEAR", Type: LongType(), Input Value: 1958.0, Type: <class 'float'>
Input value type doesn't match the target column data type, this replacement was skipped. Column Name: "CD_DEP_COUNT", Type: LongType(), Input Value: 0.0, Type: <class 'float'>


### 3.2 One-Hot Encoding of Categorical Cols

In [None]:
# OHE of Categorical Cols
my_ohe_encoder = OneHotEncoder(input_cols=cat_cols, output_cols=cat_cols, drop_input_cols=True)
sdf_prepared = my_ohe_encoder.fit(sdf_prepared).transform(sdf_prepared)

In [28]:
sdf_prepared.limit(5).to_pandas()

Unnamed: 0,CD_GENDER_F,CD_GENDER_M,CD_MARITAL_STATUS_D,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_S,CD_MARITAL_STATUS_U,CD_MARITAL_STATUS_W,CD_CREDIT_RATING_Good,CD_CREDIT_RATING_High Risk,CD_CREDIT_RATING_Low Risk,...,CD_EDUCATION_STATUS_2 yr Degree,CD_EDUCATION_STATUS_4 yr Degree,CD_EDUCATION_STATUS_Advanced Degree,CD_EDUCATION_STATUS_College,CD_EDUCATION_STATUS_Primary,CD_EDUCATION_STATUS_Secondary,CD_EDUCATION_STATUS_Unknown,C_BIRTH_YEAR,CD_DEP_COUNT,TOTAL_SALES
0,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1952,0,31153.66
1,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1991,0,32507.61
2,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1943,0,46633.79
3,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1925,0,32415.93
4,0.0,1.0,0.0,0.0,0.0,0.0,1.0,1.0,0.0,0.0,...,0.0,0.0,1.0,0.0,0.0,0.0,0.0,1954,0,31113.58


### 3.3 Clean column names

In [29]:
# Cleaning column names to make it easier for future referencing
import re

cols = sdf_prepared.columns
for old_col in cols:
    new_col = re.sub(r'[^a-zA-Z0-9_]', '', old_col)
    new_col = new_col.upper()
    sdf_prepared = sdf_prepared.rename(col(old_col), new_col)

# 4.0 ML Modeling

In [33]:
# Use Snowpark Optimized Warehouse
session.use_warehouse('snowpark_opt_wh')

### 4.1 Prepare data

In [30]:
# Prepare Data for modeling
feature_cols = sdf_prepared.columns
feature_cols.remove('TOTAL_SALES')
target_col = 'TOTAL_SALES'

In [31]:
# Save the train and test sets as time stamped tables in Snowflake
snowdf_train, snowdf_test = sdf_prepared.random_split([0.8, 0.2], seed=82) 
snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")

### 4.2 Initialize Model and Fit

In [34]:
# Define the XGBRegressor and fit the model
xgbmodel = XGBRegressor(random_state=123, input_cols=feature_cols, label_cols=target_col, output_cols='PREDICTION')
xgbmodel.fit(snowdf_train)

The version of package snowflake-snowpark-python in the local environment is 1.5.1, which does not fit the criteria for the requirement snowflake-snowpark-python. Your UDF might not work when the package version is different between the server and your local environment


<snowflake.ml.modeling.xgboost.xgb_regressor.XGBRegressor at 0x2948279a0>

### 4.3 Predict on test set

In [35]:
# Score the data using the fitted xgbmodel
sdf_scored = xgbmodel.predict(snowdf_test)

In [37]:
sdf_scored.limit(5).to_pandas()

Unnamed: 0,CD_DEP_COUNT,CD_CREDIT_RATING_LOW_RISK,CD_MARITAL_STATUS_D,CD_EDUCATION_STATUS_SECONDARY,CD_CREDIT_RATING_HIGH_RISK,CD_MARITAL_STATUS_U,CD_EDUCATION_STATUS_PRIMARY,CD_EDUCATION_STATUS_UNKNOWN,CD_EDUCATION_STATUS_COLLEGE,C_BIRTH_YEAR,...,CD_EDUCATION_STATUS_4_YR_DEGREE,CD_MARITAL_STATUS_W,CD_MARITAL_STATUS_S,TOTAL_SALES,CD_CREDIT_RATING_UNKNOWN,CD_GENDER_M,CD_MARITAL_STATUS_M,CD_GENDER_F,CD_EDUCATION_STATUS_2_YR_DEGREE,PREDICTION
0,0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1971,...,0.0,0.0,0.0,50303.41,0.0,1.0,0.0,0.0,0.0,32319.396484
1,0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1939,...,0.0,0.0,0.0,30779.7,0.0,1.0,0.0,0.0,0.0,32343.357422
2,0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1938,...,0.0,0.0,0.0,32107.81,0.0,1.0,0.0,0.0,0.0,32344.523438
3,0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1977,...,0.0,0.0,0.0,52162.15,0.0,1.0,0.0,0.0,0.0,32321.402344
4,0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,1979,...,0.0,0.0,0.0,30717.62,0.0,1.0,0.0,0.0,0.0,32322.242188


### 4.4 Save predictions in Snowflake

In [39]:
session.use_database('tpcds_xgboost')
session.use_schema('demo')
sdf_scored.write.mode('overwrite').save_as_table('predictions')

# 5.0 Deploying trained model as UDF for future usage

Steps to follow-
1. Get model in your local environment
2. Save the file in your local env. as .joblib file
3. Upload the file to Snowflake stage
4. Create UDF using model in stage

We can use `to_xgboost()` in order to get the actual xgboost model object which gives us access to all its attributes.

### Creating sample dataset for quick predictions

In [40]:
snowdf_test = session.table('tpc_TEST')
# Predicting with sample dataset
sample_data = snowdf_test.limit(100)
sample_data.write.mode("overwrite").save_as_table("temp_test")

In [41]:
test_sdf = session.table('temp_test')

### 5.1 Prepare model to convert to UDF
1. Get model in your local environment
2. Save the file in your local env. as .joblib file
3. Upload the file to Snowflake stage

In [42]:
import joblib
import cachetools

In [43]:
xgb_file = xgbmodel.to_xgboost()
xgb_file

In [51]:
MODEL_FILE = 'model.joblib.gz'
joblib.dump(xgb_file, MODEL_FILE) # we are just pickling it locally first

['model.joblib.gz']

In [52]:
# You can also save the pickled object into the stage we created earlier
session.file.put(MODEL_FILE, "@ML_MODELS", auto_compress=False, overwrite=True)

[PutResult(source='model.joblib.gz', target='model.joblib.gz', source_size=141882, target_size=141888, source_compression='GZIP', target_compression='GZIP', status='UPLOADED', message='')]

### 5.2 Create UDF for future reference


In [53]:
from snowflake.snowpark.functions import udf
import snowflake.snowpark.types as T

In [54]:
# Define a simple scoring function
from cachetools import cached

@cached(cache={})
def load_model(model_path: str) -> object:
    from joblib import load
    model = load(model_path)
    return model

def udf_score_xgboost_model_vec_cached(df: pd.DataFrame) -> pd.Series:
    import os
    import sys
    # file-dependencies of UDFs are available in snowflake_import_directory
    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    model_name = 'model.joblib.gz'
    model = load_model(import_dir+model_name)
    df.columns = feature_cols
    scored_data = pd.Series(model.predict(df))
    return scored_data

In [55]:
# Register UDF
udf_clv = session.udf.register(func=udf_score_xgboost_model_vec_cached, 
                               name="TPCDS_PREDICT_CLV", 
                               stage_location='@ML_MODELS',
                               input_types=[T.FloatType()]*len(feature_cols),
                               return_type = T.FloatType(),
                               replace=True, 
                               is_permanent=True, 
                               imports=['@ML_MODELS/model.joblib.gz'],
                               packages=['pandas',
                                         'xgboost',
                                         'joblib',
                                         'cachetools'], 
                               session=session)

### 5.3 Extra Stuff

### Inference using UDF Created right here

Note we are using `udf_clv` that was defined earlier.

In [56]:
test_sdf_w_preds = test_sdf.with_column('PREDICTED', udf_clv(*feature_cols))
test_sdf_w_preds.limit(2).to_pandas()

Unnamed: 0,CD_GENDER_F,CD_GENDER_M,CD_MARITAL_STATUS_D,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_S,CD_MARITAL_STATUS_U,CD_MARITAL_STATUS_W,CD_CREDIT_RATING_GOOD,CD_CREDIT_RATING_HIGH_RISK,CD_CREDIT_RATING_LOW_RISK,...,CD_EDUCATION_STATUS_4_YR_DEGREE,CD_EDUCATION_STATUS_ADVANCED_DEGREE,CD_EDUCATION_STATUS_COLLEGE,CD_EDUCATION_STATUS_PRIMARY,CD_EDUCATION_STATUS_SECONDARY,CD_EDUCATION_STATUS_UNKNOWN,C_BIRTH_YEAR,CD_DEP_COUNT,TOTAL_SALES,PREDICTED
0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,1973,0,29797.36,32319.482422
1,1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,1975,0,27902.99,32299.892578


### Inference using UDF Called from Snowflake

Notice we are calling the UDF created in snowflake using `F.call_udf()`

In [57]:
test_sdf_w_preds = test_sdf.with_column('PREDICTED',F.call_udf("TPCDS_PREDICT_CLV",
                                                               [F.col(c) for c in feature_cols]))
test_sdf_w_preds.limit(2).to_pandas()

Unnamed: 0,CD_GENDER_F,CD_GENDER_M,CD_MARITAL_STATUS_D,CD_MARITAL_STATUS_M,CD_MARITAL_STATUS_S,CD_MARITAL_STATUS_U,CD_MARITAL_STATUS_W,CD_CREDIT_RATING_GOOD,CD_CREDIT_RATING_HIGH_RISK,CD_CREDIT_RATING_LOW_RISK,...,CD_EDUCATION_STATUS_4_YR_DEGREE,CD_EDUCATION_STATUS_ADVANCED_DEGREE,CD_EDUCATION_STATUS_COLLEGE,CD_EDUCATION_STATUS_PRIMARY,CD_EDUCATION_STATUS_SECONDARY,CD_EDUCATION_STATUS_UNKNOWN,C_BIRTH_YEAR,CD_DEP_COUNT,TOTAL_SALES,PREDICTED
0,1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,1973,0,29797.36,32319.482422
1,1.0,0.0,0.0,0.0,0.0,1.0,0.0,1.0,0.0,0.0,...,0.0,1.0,0.0,0.0,0.0,0.0,1975,0,27902.99,32299.892578


# 6.0 Wrap up

In [7]:
session.close()