## Steps:
- Import libraries
- Setup Snowflake objects
- Load data to Snowflake (you may skip this step if you already have data in Snowflake)
- Write code to run Implicit library locally on your machine
- Package the code into clean, modular functions
- Create a Task and schedule it

# Imports

In [1]:
from snowflake.snowpark.session import Session
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F
from snowflake.snowpark.functions import when, col
from snowflake.snowpark.window import Window
from snowflake.snowpark.functions import dense_rank

from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask

import json

import os

import numpy as np
import pandas as pd
import regex as re

import scipy.sparse as sparse
from scipy.sparse.linalg import spsolve
import random
from sklearn.preprocessing import MinMaxScaler
import implicit 
from datetime import datetime, timedelta

import warnings
warnings.filterwarnings("ignore")

In [2]:
# Read the connection parameters from a local JSON credentials file. The
# context manager closes the file handle as soon as the JSON has been parsed
# instead of leaving it open for the lifetime of the kernel.
with open('/Users/skhara/Documents/GitHub/creds.json', encoding='utf-8') as creds_file:
    connection_parameters = json.load(creds_file)

# Open the Snowpark session used by every later cell.
session = Session.builder.configs(connection_parameters).create()

In [3]:
# Create the database and schema if they are missing, then make them the
# session's current database and schema.
for ddl in (
    'CREATE DATABASE IF NOT EXISTS RECOMMENDER_SYSTEMS',
    'CREATE SCHEMA IF NOT EXISTS RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS',
    'USE DATABASE RECOMMENDER_SYSTEMS',
    'USE SCHEMA COLLABORATIVE_FILTERING_ALS',
):
    session.sql(ddl).collect()

# Stage referenced as '@ML_MODELS' by the registrations further down.
# Kept as the last expression so its result is still shown as the cell output.
session.sql('CREATE STAGE IF NOT EXISTS ML_MODELS;').collect()

[Row(status='ML_MODELS already exists, statement succeeded.')]

# Load Data to Snowflake
This step is only needed if your data is not already in a Snowflake table. If it is, you can skip this step.

In [57]:
# Loading from local CSV-files
events_data = pd.read_csv('data/events.csv')
events_data.head()

Unnamed: 0,timestamp,visitorid,event,itemid,transactionid
0,1433221332117,257597,view,355908,
1,1433224214164,992329,view,248676,
2,1433221999827,111016,view,318965,
3,1433221955914,483717,view,253185,
4,1433221337106,951259,view,367447,


In [None]:
session.write_pandas(events_data, table_name='EVENTS_DATA', auto_create_table=True, overwrite=True)

In [71]:
# Normalise the column names of EVENTS_DATA so later cells can reference them
# easily: strip every character that is not a letter, digit or underscore,
# upper-case the result, and overwrite the table with the renamed columns.
import re

snow_table = session.table('EVENTS_DATA')

for original_name in snow_table.columns:
    cleaned_name = re.sub(r'[^a-zA-Z0-9_]', '', original_name).upper()
    snow_table = snow_table.rename(col(original_name), cleaned_name)

snow_table.write.mode("overwrite").save_as_table("EVENTS_DATA")

# Step 1: Testing Locally

In [180]:
# Build the cleaned events table with Snowpark DataFrames only, so the
# computation is pushed down to Snowflake.
datapath = 'RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA'
snf_df = session.table(datapath)

# TIMESTAMP is divided by 1000 before being converted to a timestamp, then
# truncated to a date.
event_date = F.to_date(F.to_timestamp(F.col('TIMESTAMP') / F.lit(1000)))
snf_df = snf_df.with_column("TS_DATE", event_date)

# Map each event type to a numeric weight; it is used later as the value
# stored in the interaction matrix.
event_weight = (
    when(col("EVENT") == "transaction", 3)
    .when(col("EVENT") == "addtocart", 2)
    .when(col("EVENT") == "view", 1)
)
snf_df = snf_df.with_column("EVENT_ID", event_weight)

# Replace the raw visitor and item ids with their dense rank.
window_spec = Window.partition_by().order_by("VISITORID")
window_spec_item = Window.partition_by().order_by("ITEMID")
snf_df = snf_df.with_column("VISITOR_ID", dense_rank().over(window_spec))
snf_df = snf_df.with_column("ITEM_ID", dense_rank().over(window_spec_item))

snf_df.write.mode("overwrite").save_as_table("EVENTS_DATA_CLEANED")

In [152]:
snf_cleaned = session.table('EVENTS_DATA_CLEANED')
snf_cleaned.limit(5).to_pandas()

Unnamed: 0,TIMESTAMP,VISITORID,EVENT,ITEMID,TRANSACTIONID,TS_DATE,EVENT_ID,VISITOR_ID,ITEM_ID
0,1434389885755,290541,view,427952,,2015-06-15,1,290542,215538
1,1434409631844,300845,view,427935,,2015-06-15,1,300846,215526
2,1434402529968,391960,view,236503,,2015-06-15,1,391961,119073
3,1434405086107,536804,view,355266,,2015-06-15,1,536805,179017
4,1434343357382,1108722,view,174284,,2015-06-15,1,1108723,87811


In [160]:
data = snf_cleaned.to_pandas()

In [164]:
max(data['TS_DATE'])

datetime.date(2015, 9, 18)

In [136]:
# Build the interaction matrices in both orientations. The event weight is the
# stored value; VISITOR_ID and ITEM_ID are used directly as indices.
event_weights = data['EVENT_ID'].astype(float)
visitor_index = data['VISITOR_ID']
item_index = data['ITEM_ID']

sparse_item_user = sparse.csr_matrix((event_weights, (item_index, visitor_index)))
sparse_user_item = sparse.csr_matrix((event_weights, (visitor_index, item_index)))

# Scale the user-item matrix by alpha before fitting.
alpha_val = 40
data_conf = (sparse_user_item * alpha_val).astype('double')

# Building the model
model = implicit.als.AlternatingLeastSquares(factors=20, regularization=0.1, iterations=20)
model.fit(data_conf)

  0%|          | 0/20 [00:00<?, ?it/s]

In [None]:
# Get Recommendations
user_id = 2
reco = model.recommend(user_id, sparse_user_item[user_id], N=5)
print(reco)

# Option 1: SPROC Based Orchestration in Snowflake

Here we take the code written above for local testing and package it into modular functions. We schedule the
preprocessing pipeline as a predecessor of model training and inference.

In [139]:
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall, Task
from snowflake.core.task.dagv1 import DAGOperation, DAG, DAGTask
api_root = Root(session)

In [198]:
# Task 1: Preprocess Data
def preprocess_data(session:Session) -> str:
    """Derive the cleaned events table used for model training.

    Reads RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA, adds
    the TS_DATE, EVENT_ID, VISITOR_ID and ITEM_ID columns, and overwrites
    RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA_CLEANED.

    The output table is written with its fully qualified name so that it is
    the same table the training and inference code reads, regardless of the
    current database/schema of the session that runs this procedure.

    Args:
        session: Active Snowpark session.

    Returns:
        The literal status string 'DATA PROCESSING SUCCESS'.
    """
    source_table = 'RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA'
    target_table = 'RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA_CLEANED'

    snf_df = session.table(source_table)

    # TIMESTAMP is divided by 1000 before conversion, then truncated to a date.
    snf_df = snf_df.with_column("TS_DATE", F.to_date(F.to_timestamp(F.col('TIMESTAMP')/F.lit(1000))))
    snf_df = snf_df.sort(F.col('"TS_DATE"').asc())

    # Numeric weight per event type; used as the interaction strength.
    snf_df = snf_df.with_column("EVENT_ID", when(col("EVENT") == "transaction", 3)
                                            .when(col("EVENT") == "addtocart", 2)
                                            .when(col("EVENT") == "view", 1))

    # Replace raw visitor / item ids with their dense rank.
    window_spec = Window.partition_by().order_by("VISITORID")
    snf_df = snf_df.with_column("VISITOR_ID", dense_rank().over(window_spec))

    window_spec_item = Window.partition_by().order_by("ITEMID")
    snf_df = snf_df.with_column("ITEM_ID", dense_rank().over(window_spec_item))

    snf_df.write.mode("overwrite").save_as_table(target_table)

    return 'DATA PROCESSING SUCCESS'

In [199]:
# Registering the function as a Stored Procedure
# Registers preprocess_data as the permanent stored procedure
# ALS_DATA_PREPROCESS (replacing any existing one), with stage_location
# '@ML_MODELS'.
# NOTE(review): the returned handle is bound to `sproc_model_train` although
# this call registers the preprocessing function; the same name is rebound
# later when ALS_MODEL_TRAIN is registered.
sproc_model_train = session.sproc.register(func=preprocess_data,
                                           name='ALS_DATA_PREPROCESS',
                                           is_permanent=True,
                                           replace=True,
                                           stage_location='@ML_MODELS',
                                           packages=["snowflake-snowpark-python","snowflake-ml-python", "joblib",
                                                     "regex", "scipy", "implicit==0.6.2", "numpy==1.23.5"])

The version of package 'joblib' in the local environment is 1.3.2, which does not fit the criteria for the requirement 'joblib'. Your UDF might not work when the package version is different between the server and your local environment.


In [207]:
# TEST 1 - SMALL
# Select the DEMO_STANDARD warehouse for this session and resize it.
# NOTE(review): the heading above says SMALL but the statement sets the size
# to MEDIUM — confirm which one is intended.
session.sql('USE WAREHOUSE DEMO_STANDARD;').collect()
session.sql('ALTER WAREHOUSE DEMO_STANDARD SET WAREHOUSE_SIZE = "MEDIUM"').collect()


In [141]:
# Task 2: Model Train + Inference
def train_model(session:Session, sparse_user_item):
    """Fit an ALS model on a scaled copy of the user-item matrix.

    Args:
        session: Snowpark session; accepted but not used by this function.
        sparse_user_item: Sparse matrix built with visitors as rows and items
            as columns, holding the event weights.

    Returns:
        The fitted ``implicit.als.AlternatingLeastSquares`` model.
    """
    import implicit
    import scipy.sparse as sparse

    # The interaction weights are multiplied by this factor before fitting.
    confidence_scale = 40
    weighted_interactions = (sparse_user_item * confidence_scale).astype('double')

    als = implicit.als.AlternatingLeastSquares(factors=20, regularization=0.1, iterations=20)
    als.fit(weighted_interactions)
    return als

# Run the Process
def get_predictions(session:Session) -> str:
    """Train the ALS model and store five recommendations per visitor.

    Loads EVENTS_DATA_CLEANED into pandas, builds the user-item matrix, trains
    the model through ``train_model`` and writes the result to the
    ITEM_RECOMMENDATIONS table (created or overwritten).

    The output has one row per distinct VISITOR_ID, with that id in the first
    column followed by rec1..rec5. Scoring distinct visitors avoids producing
    one identical row per event, and the id column makes the rows attributable
    to a visitor.

    Args:
        session: Active Snowpark session.

    Returns:
        The literal status string 'Success'.
    """
    import pandas as pd
    import scipy.sparse as sparse

    data = session.table('RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA_CLEANED').to_pandas()

    # VISITOR_ID and ITEM_ID are used directly as row / column indices, so the
    # matrix dimensions follow the largest id present in the table.
    sparse_user_item = sparse.csr_matrix((data['EVENT_ID'].astype(float), (data['VISITOR_ID'], data['ITEM_ID'])))
    model = train_model(session, sparse_user_item)

    # Score each visitor once rather than once per event row.
    user_ids = data['VISITOR_ID'].drop_duplicates().to_list()
    recommended = model.recommend(user_ids, sparse_user_item[user_ids], N=5)

    # The first element of the result holds the recommended item ids, one row
    # per requested visitor. (The original referenced an undefined name `reco`
    # here instead of the local `recommended`.)
    rec_df = pd.DataFrame(recommended[0], columns=['rec1', 'rec2', 'rec3', 'rec4', 'rec5'])
    rec_df.insert(0, 'VISITOR_ID', user_ids)

    # Save Data in Snowflake
    session.write_pandas(rec_df, table_name='ITEM_RECOMMENDATIONS', auto_create_table=True, overwrite=True)
    return 'Success'

In [142]:
schema = api_root.databases['RECOMMENDER_SYSTEMS'].schemas['COLLABORATIVE_FILTERING_ALS']
tasks = schema.tasks

In [143]:
# Create Task 1: Preprocess Task
# Root task: wraps preprocess_data in a stored-procedure call and schedules it
# with a one-day interval on the warehouse named in the connection parameters.
task1_entity = Task(
    "PREPROCESS_DATA",
    definition = StoredProcedureCall(preprocess_data,
                                   stage_location="@ML_MODELS",
                                   packages=["snowflake-snowpark-python","snowflake-ml-python", "regex"]),
    warehouse = connection_parameters['warehouse'],
    schedule = timedelta(days=1))

# mode="orReplace" replaces an existing task of the same name.
task1 = tasks.create(task1_entity, mode="orReplace")

# Create Task 2: for model training and inference
# No schedule of its own: it is chained to PREPROCESS_DATA via `predecessors` below.
task2_entity = Task(
    "RECO_ENGINE",
    definition = StoredProcedureCall(get_predictions, stage_location="@ML_MODELS", 
                                     packages=["snowflake-snowpark-python","snowflake-ml-python",
                                               "regex", "scipy", "implicit==0.6.2", "numpy==1.23.5"]),
    warehouse = connection_parameters['warehouse']
    )

task2_entity.predecessors = ["RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.PREPROCESS_DATA"]
task2 = tasks.create(task2_entity, mode="orReplace")

# Resume the dependent task first, then the scheduled root task, and finally
# trigger one run of the root task immediately.
task2.resume()
task1.resume()
task1.execute()

# Option 2: Distributed Modeling using UDF

### Model Training using SPROC

In [4]:
# Model Training SPROC
def model_train(session:Session, table_name: str) -> str:
    """Train the ALS model and upload its artifacts to the ML_MODELS stage.

    Args:
        session: Active Snowpark session.
        table_name: Table to train on; it must provide the EVENT_ID,
            VISITOR_ID and ITEM_ID columns read below.

    Returns:
        The literal status string 'Success'.
    """
    import pandas as pd
    import joblib, implicit
    import scipy.sparse as sparse
    from scipy.sparse import save_npz

    stage_dir = '@ML_MODELS/TRAIN_OUTPUT'
    matrix_path = '/tmp/sparse_user_item.npz'
    model_path = '/tmp/als_model.joblib'

    events = session.table(table_name).to_pandas()

    # VISITOR_ID and ITEM_ID are used directly as row / column indices.
    weights = events['EVENT_ID'].astype(float)
    coordinates = (events['VISITOR_ID'], events['ITEM_ID'])
    sparse_user_item = sparse.csr_matrix((weights, coordinates))

    # Fit on the interaction matrix scaled by 40.
    als = implicit.als.AlternatingLeastSquares(factors=20, regularization=0.1, iterations=20)
    als.fit((sparse_user_item * 40).astype('double'))

    # Upload the unscaled matrix first, then the fitted model. Both are written
    # to /tmp and pushed uncompressed, overwriting any previous file.
    save_npz(matrix_path, sparse_user_item)
    session.file.put(matrix_path, stage_dir, auto_compress=False, overwrite=True)

    joblib.dump(als, model_path)
    session.file.put(model_path, stage_dir, auto_compress=False, overwrite=True)

    return 'Success'

In [5]:
# Registering the function as a Stored Procedure
# Registers model_train as the permanent stored procedure ALS_MODEL_TRAIN
# (replacing any existing one), with stage_location '@ML_MODELS'. The name
# ALS_MODEL_TRAIN is what later cells pass to session.call.
sproc_model_train = session.sproc.register(func=model_train,
                                           name='ALS_MODEL_TRAIN',
                                           is_permanent=True,
                                           replace=True,
                                           stage_location='@ML_MODELS',
                                           packages=["snowflake-snowpark-python","snowflake-ml-python", "joblib",
                                                     "regex", "scipy", "implicit==0.6.2", "numpy==1.23.5"])

The version of package 'joblib' in the local environment is 1.3.2, which does not fit the criteria for the requirement 'joblib'. Your UDF might not work when the package version is different between the server and your local environment.


In [6]:
# TEST 3 - SOWH MEDIUM
# NOTE(review): the session switches to SSK_RESEARCH but the resize targets
# DEMO_STANDARD — confirm which warehouse is meant to run the training call.
session.sql('USE WAREHOUSE SSK_RESEARCH;').collect()
session.sql('ALTER WAREHOUSE DEMO_STANDARD SET WAREHOUSE_SIZE = "MEDIUM"').collect()

# Run the registered training procedure on the cleaned events table.
table_name = 'RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA_CLEANED'
_ = session.call("ALS_MODEL_TRAIN", table_name)

In [7]:
_

'Success'

### UDF for Inference

In [154]:
feature_cols = ['VISITORID','VISITOR_ID']

In [219]:
# Define a simple scoring function
from cachetools import cached

@cached(cache={})
def load_from_stage(import_dir) -> object:
    """Load the trained model and the sparse user-item matrix.

    The ``cached`` decorator memoises the result per ``import_dir``, so
    repeated calls with the same directory reuse the loaded objects.

    Args:
        import_dir: Directory prefix that is concatenated directly with the
            file names, so it must already end with a path separator.

    Returns:
        A ``(model, sparse_user_item)`` tuple.
    """
    import joblib
    from scipy.sparse import load_npz

    model_path = import_dir + 'als_model.joblib'
    matrix_path = import_dir + 'sparse_user_item.npz'
    return joblib.load(model_path), load_npz(matrix_path)

def udf_als_score(df: T.PandasDataFrame[int, int]) -> T.PandasSeries[dict]:
    """Return five recommended items for each visitor in a batch.

    Args:
        df: Two-column batch; the columns are renamed to the module-level
            ``feature_cols`` and the ``VISITOR_ID`` column selects the rows of
            the user-item matrix to score.

    Returns:
        A series with one ``{"recommendations": [...]}`` dict per input row.
    """
    import sys, implicit

    # file-dependencies of UDFs are available in snowflake_import_directory
    import_dir = sys._xoptions["snowflake_import_directory"]
    model, sparse_user_item = load_from_stage(import_dir)

    df.columns = feature_cols
    visitor_ids = df['VISITOR_ID'].to_list()

    # Only the first element of the value returned by recommend() is used; it
    # is iterated row by row, one row per requested visitor.
    item_ids = model.recommend(visitor_ids, sparse_user_item[visitor_ids], N=5)[0]

    return pd.Series([{"recommendations": row.tolist()} for row in item_ids])

In [220]:
# Register UDF
# Registers udf_als_score as the permanent UDF ALS_COLAB_FILTERING (replacing
# any existing one). The two `imports` entries are the files that the training
# procedure uploads to @ML_MODELS/TRAIN_OUTPUT and that load_from_stage reads
# by file name.
udf_als = session.udf.register(func=udf_als_score, 
                               name="ALS_COLAB_FILTERING", 
                               stage_location='@ML_MODELS',
                               replace=True,
                               is_permanent=True, 
                               imports=['@ML_MODELS/TRAIN_OUTPUT/als_model.joblib',
                                        '@ML_MODELS/TRAIN_OUTPUT/sparse_user_item.npz'],
                               packages=["snowflake-snowpark-python","snowflake-ml-python", "joblib",
                                         "regex", "scipy", "implicit==0.6.2", "numpy==1.23.5"],
                               session=session)

The version of package 'joblib' in the local environment is 1.3.2, which does not fit the criteria for the requirement 'joblib'. Your UDF might not work when the package version is different between the server and your local environment.


In [221]:
# TEST 5 - SOWH MEDIUM
# NOTE(review): the session switches to SSK_RESEARCH but the resize targets
# DEMO_STANDARD — confirm which warehouse is meant to run the inference.
session.sql('USE WAREHOUSE SSK_RESEARCH;').collect()
session.sql('ALTER WAREHOUSE DEMO_STANDARD SET WAREHOUSE_SIZE = "MEDIUM"').collect()

# Keep one row per raw visitor id, carrying both the raw and the ranked id.
feature_cols = ['VISITORID','VISITOR_ID']
snowdf_test = session.table('RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA_CLEANED').select(feature_cols)
snowdf_test = snowdf_test.drop_duplicates('VISITORID')
# Call the registered UDF on both columns, then pull the 'recommendations'
# entry of its result out into its own column.
test_sdf_w_preds = snowdf_test.with_column('PREDICTED', F.call_udf("ALS_COLAB_FILTERING", [F.col(c) for c in feature_cols]))\
                                .with_column("RECOMMENDATIONS", F.col("PREDICTED")['recommendations'])
test_sdf_w_preds.write.mode("overwrite").save_as_table("RECOMMENDATIONS_OUTPUT")

### Orchestration Using Tasks

In [125]:
# Task: Train and Predict
def train_and_predict(session:Session) -> str:
    """Retrain the model, then score every visitor with the registered UDF.

    Calls the ALS_MODEL_TRAIN stored procedure on EVENTS_DATA_CLEANED, applies
    the ALS_COLAB_FILTERING UDF to one row per VISITORID and overwrites the
    RECOMMENDATIONS_OUTPUT table with the result.

    Args:
        session: Active Snowpark session.

    Returns:
        The literal status string 'Recommendation Model Success'.
    """
    from snowflake.snowpark.functions import udf
    import snowflake.snowpark.functions as F

    events_table = 'RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS.EVENTS_DATA_CLEANED'

    # Call SPROC
    session.call("ALS_MODEL_TRAIN", events_table)

    # Prediction using UDF
    id_columns = ['VISITORID','VISITOR_ID']
    visitors = session.table(events_table).select(id_columns).drop_duplicates('VISITORID')

    predicted = F.call_udf("ALS_COLAB_FILTERING", [F.col(name) for name in id_columns])
    scored = visitors.with_column('PREDICTED', predicted)
    scored = scored.with_column("RECOMMENDATIONS", F.col("PREDICTED")['recommendations'])

    scored.write.mode("overwrite").save_as_table("RECOMMENDATIONS_OUTPUT")

    return 'Recommendation Model Success'

In [126]:
# Create Task Object
# Resolve the task collection of the RECOMMENDER_SYSTEMS.COLLABORATIVE_FILTERING_ALS schema.
api_root = Root(session)
schema = api_root.databases['RECOMMENDER_SYSTEMS'].schemas['COLLABORATIVE_FILTERING_ALS']
tasks = schema.tasks

# Create the Task
# Wraps train_and_predict in a stored-procedure call scheduled with a one-day
# interval on the warehouse named in the connection parameters.
task1_entity = Task(
    "TRAIN_AND_PREDICT",
    definition = StoredProcedureCall(train_and_predict,
                                     stage_location="@ML_MODELS",
                                     packages=["snowflake-snowpark-python","snowflake-ml-python", "regex"]),
    warehouse = connection_parameters['warehouse'],
    schedule = timedelta(days=1))

# mode="orReplace" replaces an existing task of the same name; resume() is then
# called on the newly created task.
task1 = tasks.create(task1_entity, mode="orReplace")
task1.resume()

In [129]:
task1.execute()