In [120]:
import snowflake.snowpark
from snowflake.snowpark import functions as F
from snowflake.snowpark.session import Session
from snowflake.snowpark import version as v


In [121]:
import json 

with open('creds1.json') as f:
    data = json.load(f)
    USERNAME = data['user']
    PASSWORD = data['password']
    SF_ACCOUNT = data['account']
    SF_WH = data['warehouse']

CONNECTION_PARAMETERS = {
   "account": SF_ACCOUNT,
   "user": USERNAME,
   "password": PASSWORD,
}

session = Session.builder.configs(CONNECTION_PARAMETERS).create()

## Environment Setup

In [122]:
session.use_warehouse('COMPUTE_WH')

Select either 100 or 10 for the TPC-DS Dataset size to use below. See (https://docs.snowflake.com/en/user-guide/sample-data-tpcds.html)[here] for more information If you choose 100, I recommend >= 3XL warehouse. 

In [123]:
SNOWFLAKE_SAMPLE_DB = 'SNOWFLAKE_SAMPLE_DATA' 
TPCDS_SCHEMA = 'TPCDS_SF10TCL'
    
customer = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer')
address = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_address')
demo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.customer_demographics')
hdemo = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.HOUSEHOLD_DEMOGRAPHICS')
income = session.table(f'{SNOWFLAKE_SAMPLE_DB}.{TPCDS_SCHEMA}.INCOME_BAND')

## Feature Engineering
We will aggregate sales by customer across all channels(web, store, catalogue) and join that to customer demographic data. 

In [124]:
#select C_CUSTOMER_ID, c_salutation, C_PREFERRED_CUST_FLAG, 
#ca_city, ca_state, ca_location_type,
#cd_gender, cd_marital_status, cd_education_status, cd_credit_rating,
#ib_upper_bound 
#from
#snowflake_sample_data.tpcds_sf10tcl.customer cu inner join snowflake_sample_data.tpcds_sf10tcl.customer_address ca
#on cu.c_current_addr_sk = ca.ca_address_sk
#inner join snowflake_sample_data.TPCDS_SF10TCL.customer_demographics cd
#on cu.c_current_cdemo_sk = cd.cd_demo_sk
#inner join snowflake_sample_data.TPCDS_SF10TCL.HOUSEHOLD_DEMOGRAPHICS hd
#on cu.c_current_hdemo_sk = hd.hd_demo_sk
#inner join snowflake_sample_data.TPCDS_SF10TCL.INCOME_BAND ib
#on hd.hd_income_band_sk = ib.ib_income_band_sk

customer = customer.select('C_CUSTOMER_ID', 'c_salutation', 'C_PREFERRED_CUST_FLAG','c_current_addr_sk','c_current_cdemo_sk','c_current_hdemo_sk')
hdemo = hdemo.select('hd_demo_sk','hd_income_band_sk')
income = income.select('ib_upper_bound','ib_income_band_sk')

customer = customer.join(address.select('ca_city', 'ca_state', 'ca_location_type','ca_address_sk'), customer['c_current_addr_sk'] == address['ca_address_sk'])
customer = customer.join(demo.select('cd_gender', 'cd_marital_status', 'cd_education_status', 'cd_credit_rating','cd_demo_sk'), customer['c_current_cdemo_sk'] == demo['cd_demo_sk'] )
customer = customer.join(hdemo, customer['c_current_hdemo_sk'] == hdemo['hd_demo_sk'])
customer = customer.join(income.select('ib_upper_bound','IB_INCOME_BAND_SK'), customer['hd_income_band_sk'] == income['ib_income_band_sk'])

customer.show()

---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"C_CUSTOMER_ID"   |"C_SALUTATION"  |"C_PREFERRED_CUST_FLAG"  |"C_CURRENT_ADDR_SK"  |"C_CURRENT_CDEMO_SK"  |"C_CURRENT_HDEMO_SK"  |"CA_CITY"        |"CA_STATE"  |"CA_LOCATION_TYPE"  |"CA_ADDRESS_SK"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_EDUCATION_STATUS"  |"CD_CREDIT_RATING"  |"CD_DEMO_SK"  |"HD_DEMO_SK"  |"HD_INCOME_BAND_SK"  |"IB_UPPER_BOUND"  |"IB_INCOME_BAND_SK"  |
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [125]:
session.sql('CREATE DATABASE IF NOT EXISTS tpcds_xgboost').collect()
session.sql('CREATE SCHEMA IF NOT EXISTS tpcds_xgboost.demo').collect()

session.use_database('tpcds_xgboost')
session.use_schema('demo')
customer.write.mode('overwrite').save_as_table('feature_store')

In [126]:
session.add_packages('snowflake-snowpark-python', 'scikit-learn', 'pandas', 'numpy', 'joblib', 'cachetools', 'xgboost', 'joblib')

The version of package snowflake-snowpark-python in the local environment is 1.1.0, which does not fit the criteria for the requirement snowflake-snowpark-python. Your UDF might not work when the package version is different between the server and your local environment
The version of package scikit-learn in the local environment is 1.2.1, which does not fit the criteria for the requirement scikit-learn. Your UDF might not work when the package version is different between the server and your local environment
The version of package joblib in the local environment is 1.2.0, which does not fit the criteria for the requirement joblib. Your UDF might not work when the package version is different between the server and your local environment
The version of package cachetools in the local environment is 5.3.0, which does not fit the criteria for the requirement cachetools. Your UDF might not work when the package version is different between the server and your local environment
The versio

In [127]:
session.sql('CREATE OR REPLACE STAGE ml_models ').collect()

[Row(status='Stage area ML_MODELS successfully created.')]

In [128]:
#from sklearn.pipeline import Pipeline
#from sklearn.impute import SimpleImputer
#from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler
#from sklearn.metrics import mean_squared_error
#from sklearn.compose import ColumnTransformer
#from sklearn.impute import SimpleImputer
#from sklearn import preprocessing as prep
#from xgboost import XGBRegressor
#import pandas as pd
#import numpy as np
#import joblib
#import os


#snowdf = session.table("feature_store")
#snowdf = snowdf.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
#snowdf1= snowdf.select('C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING', 'IB_UPPER_BOUND')
#snowdf_train, snowdf_test, snowdf_rest = snowdf1.random_split([0.08, 0.02, 0.9], seed=82) 

# save the train and test sets as time stamped tables in Snowflake 
#snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
#snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")


In [129]:

#df_train = snowdf_train.to_pandas()
#df_test = snowdf_test.to_pandas()
#df_train

In [130]:


#imputer=SimpleImputer(missing_values=np.nan,strategy='most_frequent')
#imputer.fit(df_train)
#df_train=imputer.transform(df_train)
#df_train = pd.DataFrame(imputer.fit_transform(df_train))

#df_train.columns = ['C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 
#'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING', 'IB_UPPER_BOUND']


#df_train = df_train.apply(prep.LabelEncoder().fit_transform)

#df_train




In [131]:

#imputer=SimpleImputer(missing_values=np.nan,strategy='most_frequent')
#imputer.fit(df_test)
#df_test=imputer.transform(df_test)
#df_test = pd.DataFrame(imputer.fit_transform(df_test))
#df_test.columns = ['C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 
#'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING', 'IB_UPPER_BOUND']
#df_test = df_test.apply(prep.LabelEncoder().fit_transform)

#df_train
#train_y = df_train['CD_CREDIT_RATING']
#train_x = df_train.drop(columns=['CD_CREDIT_RATING'], axis = 1)
#test_y = df_test['CD_CREDIT_RATING']
#test_x = df_test.drop(columns = ['CD_CREDIT_RATING'], axis = 1)


#train_x


In [132]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, MinMaxScaler
from sklearn.metrics import mean_squared_error
from sklearn.compose import ColumnTransformer
from sklearn.impute import SimpleImputer
from sklearn import preprocessing as prep
from xgboost import XGBRegressor
import pandas as pd
import numpy as np
import joblib
import os

def train_model(session: snowflake.snowpark.Session) -> float:
    snowdf = session.table("feature_store")
    #snowdf = snowdf.drop(['CUSTOMER_SK', 'C_CURRENT_HDEMO_SK', 'C_CURRENT_ADDR_SK', 'C_CUSTOMER_ID', 'CA_ADDRESS_SK', 'CD_DEMO_SK'])
    snowdf1= snowdf.select('C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 
    'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING', 'IB_UPPER_BOUND')
    snowdf_train, snowdf_test, snowdf_rest = snowdf1.random_split([0.08, 0.02, 0.9], seed=82) 

    # save the train and test sets as time stamped tables in Snowflake 
    snowdf_train.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TRAIN")
    snowdf_test.write.mode("overwrite").save_as_table("tpcds_xgboost.demo.tpc_TEST")

    df_train = snowdf_train.to_pandas()
    df_test = snowdf_test.to_pandas()
    
    imputer=SimpleImputer(missing_values=np.nan,strategy='most_frequent')
    imputer.fit(df_train)
    df_train=imputer.transform(df_train)
    df_train = pd.DataFrame(imputer.fit_transform(df_train))

    df_train.columns = ['C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 
    'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING', 'IB_UPPER_BOUND']


    
    
    imputer=SimpleImputer(missing_values=np.nan,strategy='most_frequent')
    imputer.fit(df_test)
    df_test=imputer.transform(df_test)
    df_test = pd.DataFrame(imputer.fit_transform(df_test))
    df_test.columns = ['C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 
    'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS', 'CD_CREDIT_RATING', 'IB_UPPER_BOUND']


    df_train = df_train.apply(prep.LabelEncoder().fit_transform)
    df_test = df_test.apply(prep.LabelEncoder().fit_transform)

    #df_train
    train_y = df_train['IB_UPPER_BOUND']
    train_x = df_train.drop(columns=['IB_UPPER_BOUND'], axis = 1)
    test_y = df_test['IB_UPPER_BOUND']
    test_x = df_test.drop(columns = ['IB_UPPER_BOUND'], axis = 1)

    #train_x = snowdf_train.drop("cd_credit_rating").to_pandas() # drop labels for training set
    #train_y = snowdf_train.select("cd_credit_rating").to_pandas()
    #test_x = snowdf_test.drop("cd_credit_rating").to_pandas()
    #test_y = snowdf_test.select("cd_credit_rating").to_pandas()
    #cat_cols = ['C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS']
    #num_cols = ['IB_UPPER_BOUND']

    #num_pipeline = Pipeline([
    #        ('imputer', SimpleImputer(strategy="median")),
    #        ('std_scaler', StandardScaler()),
    #    ])

    #preprocessor = ColumnTransformer(
    #transformers=[('num', num_pipeline, num_cols),
    #              ('encoder', OneHotEncoder(handle_unknown="ignore"), cat_cols) ])

    cat_cols = ['C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS','CD_CREDIT_RATING']
    #num_cols = ['IB_UPPER_BOUND']
    num_cols = []
    
    num_pipeline = Pipeline([
            ('imputer', SimpleImputer(strategy="median")),
            ('std_scaler', StandardScaler()),
        ])

    preprocessor = ColumnTransformer(
    transformers=[('num', num_pipeline, num_cols),
                  ('encoder', OneHotEncoder(handle_unknown="ignore"), cat_cols) ])

    pipe = Pipeline([('preprocessor', preprocessor), 
                        ('xgboost', XGBRegressor())])
    pipe.fit(train_x, train_y)
#------------------------------------------------------
    #pipe = Pipeline([('xgboost', XGBRegressor())])
    pipe.fit(train_x, train_y)
    pipe.fit(test_x, test_y)

    test_preds = pipe.predict(test_x)
    rmse = mean_squared_error(test_y, test_preds)
    model_file = os.path.join('/tmp', 'model.joblib')
    joblib.dump(pipe, model_file)
    session.file.put(model_file, "@ml_models",overwrite=True)
    return rmse

In [134]:
session.use_warehouse('COMPUTE_WH')
train_model_sp = F.sproc(train_model, session=session, replace=True, is_permanent=True, name="xgboost_sproc", stage_location="@ml_models")
# Switch to Snowpark Optimized Warehouse for training and to run the stored proc
train_model_sp(session=session)

33.13513074290939

In [135]:
# Switch back to feature engineering/inference warehouse
session.use_warehouse('COMPUTE_WH')

In [136]:
import sys
import pandas as pd
import cachetools
import joblib
from snowflake.snowpark import types as T

session.add_import("@ml_models/model.joblib")  

features = ['CD_CREDIT_RATING', 'C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS']

@cachetools.cached(cache={})
def read_file(filename):
       import_dir = sys._xoptions.get("snowflake_import_directory")
       if import_dir:
              with open(os.path.join(import_dir, filename), 'rb') as file:
                     m = joblib.load(file)
                     return m

@F.pandas_udf(session=session, max_batch_size=10000, is_permanent=True, stage_location='@ml_models', replace=True, name="abcsk_xgboost_udf") #name="clv_xgboost_udf"
def predict(df:  T.PandasDataFrame[str, str, str, str, str, str, str, str, str, str]) -> T.PandasSeries[float]:
       m = read_file('model.joblib')       
       df.columns = features
       df = df.apply(prep.LabelEncoder().fit_transform)
       return m.predict(df)

In [138]:
inference_df = session.table('feature_store')
#inference_df = inference_df.select("C_CUSTOMER_ID", "c_salutation", "C_PREFERRED_CUST_FLAG", "ca_city", "ca_state", "ca_location_type", "cd_gender", "cd_marital_status", "cd_education_status", "cd_credit_rating", "ib_upper_bound")
#inference_df = inference_df.drop([ 'C_CURRENT_ADDR_SK', 'C_CURRENT_CDEMO_SK', 'C_CURRENT_HDEMO_SK', 'CA_ADDRESS_SK', 'CD_DEMO_SK','HD_DEMO_SK','HD_INCOME_BAND_SK','IB_INCOME_BAND_SK'])
inference_df = inference_df.select('IB_UPPER_BOUND', 'C_CUSTOMER_ID', 'C_SALUTATION', 'C_PREFERRED_CUST_FLAG', 'CA_CITY', 'CA_STATE', 'CA_LOCATION_TYPE', 'CD_GENDER', 'CD_MARITAL_STATUS', 'CD_EDUCATION_STATUS','CD_CREDIT_RATING')
inputs = inference_df.drop('IB_UPPER_BOUND')

inputs.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"C_CUSTOMER_ID"   |"C_SALUTATION"  |"C_PREFERRED_CUST_FLAG"  |"CA_CITY"    |"CA_STATE"  |"CA_LOCATION_TYPE"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_EDUCATION_STATUS"  |"CD_CREDIT_RATING"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|AAAAAAAAPGLNFACA  |Mr.             |N                        |Pine Grove   |ID          |apartment           |F            |W                    |Primary                |Unknown             |
|AAAAAAAAAHLNFACA  |Miss            |Y                        |Oneida       |GA          |condo               |F            |S                    |Advanced Degree        |Unknown             |
|AAAAAAAABHLNFACA  |Dr.            

In [141]:

snowdf_results = inference_df.select(*inputs,
                    predict(*inputs).alias('PREDICTION'), 
                    (F.col('IB_UPPER_BOUND')).alias('IB_UPPER_BOUND')
                    )
#snowdf_results.write.mode('overwrite').save_as_table('predictions1')
snowdf_results.show()

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"C_CUSTOMER_ID"   |"C_SALUTATION"  |"C_PREFERRED_CUST_FLAG"  |"CA_CITY"      |"CA_STATE"  |"CA_LOCATION_TYPE"  |"CD_GENDER"  |"CD_MARITAL_STATUS"  |"CD_EDUCATION_STATUS"  |"CD_CREDIT_RATING"  |"PREDICTION"        |"IB_UPPER_BOUND"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|AAAAAAAALAAFHGBA  |Ms.             |Y                        |Georgetown     |KS          |single family       |M            |S                    |Unknown                |Unknown             |9.41344928741455    |150000            |
|AAAAAAAAMAAFHGBA  |Miss            |N                      

In [142]:
inference_df.count()

61590306

In [144]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from sklearn.feature_selection import SelectKBest, mutual_info_classif, RFE
from sklearn.model_selection import GridSearchCV

# Train the Random Forest classifier
clf = RandomForestClassifier(n_estimators=100, random_state=0)
clf.fit(train_x, train_y)

# Make predictions on the testing set
y_pred = clf.predict(test_x)

# Evaluate the performance of the model
acc = accuracy_score(test_y, y_pred)
prec = precision_score(test_y, y_pred, average='weighted')
rec = recall_score(test_y, y_pred, average='weighted')
f1 = f1_score(test_y, y_pred, average='weighted')
print("Accuracy:", acc)
print("Precision:", prec)
print("Recall:", rec)
print("F1 Score:", f1)

# Tune the hyperparameters using GridSearchCV
param_grid = {
    'n_estimators': [50, 100, 200],
    'max_depth': [None, 10, 20, 30],
    'min_samples_split': [2, 5, 10],
    'min_samples_leaf': [1, 2, 4],
    'max_features': ['auto', 'sqrt', 'log2']
}
grid_search = GridSearchCV(clf, param_grid, cv=5, scoring='f1_weighted')
grid_search.fit(train_x, train_y)
print("Best parameters:", grid_search.best_params_)

MemoryError: could not allocate 469762048 bytes