# 1 - Create Demo Data

In [None]:
-- Create fake data: 100 synthetic e-commerce customers for the demo.
-- YEARLY_SPENT is drawn first and every other attribute is derived from it by
-- spend band, so the features correlate with the label. Several bands are left
-- without a matching CASE branch (the CASE then yields NULL), presumably so the
-- imputers in the feature-engineering section have missing values to fill.
create or replace table ECOMMERCE_CUSTOMERS (email string, gender string, MEMBERSHIP_STATUS string,  MEMBERSHIP_LENGTH double, AVG_SESSION_LENGTH double, TIME_ON_APP double, TIME_ON_WEBSITE double, YEARLY_SPENT double) as 
with profiles as (
select concat(lower(randstr(10, random())), '@', lower(randstr(5, random())), '.com') as EMAIL,
        -- uniform(1,10,...) with integer bounds yields an integer in 1..10, so ~70% MALE.
        -- NOTE(review): the second draw is always <= 10, so it behaves like ELSE 'FEMALE'.
        case when uniform(1,10,random())<=7 then 'MALE'
            when uniform(1,10,random())<=10 then 'FEMALE'
        end as GENDER,
        -- Integer 100..75000 divided by 100, i.e. 1.00 .. 750.00.
        uniform(100,75000,random()) / 100 as YEARLY_SPENT,
        -- The expressions below refer back to the YEARLY_SPENT alias defined above.
        -- Spend >= 720 matches no branch, so MEMBERSHIP_STATUS is NULL there.
        case when YEARLY_SPENT < 150 then 'BASIC'
            when YEARLY_SPENT < 250 then 'BRONZE'
            when YEARLY_SPENT < 350 then 'SILVER'
            when YEARLY_SPENT < 550 then 'GOLD'
            when YEARLY_SPENT < 650 then 'PLATIN'
            when YEARLY_SPENT < 720 then 'DIAMOND'
        end as MEMBERSHIP_STATUS,
        -- NULL below 150 (explicit branch) and for spend >= 720 (no branch).
        case when YEARLY_SPENT < 150 then null
            when YEARLY_SPENT < 250 then uniform(50,150,random()) / 100
            when YEARLY_SPENT < 350 then uniform(250,350,random()) / 100
            when YEARLY_SPENT < 550 then uniform(300,550,random()) / 100
            when YEARLY_SPENT < 650 then uniform(500,750,random()) / 100
            when YEARLY_SPENT < 720 then uniform(700,1000,random()) / 100
        end as MEMBERSHIP_LENGTH,
        -- NULL below 120.
        case when YEARLY_SPENT < 120 then null
            when YEARLY_SPENT < 150 then uniform(500,750,random()) / 100
            when YEARLY_SPENT < 250 then uniform(700,1000,random()) / 100
            when YEARLY_SPENT < 350 then uniform(900,2000,random()) / 100
            when YEARLY_SPENT < 550 then uniform(1900,2700,random()) / 100
            when YEARLY_SPENT < 650 then uniform(2500,3200,random()) / 100
            when YEARLY_SPENT < 1000 then uniform(3000,4000,random()) / 100
        end as AVG_SESSION_LENGTH,
        -- NULL for 350 <= spend < 370.
        case when YEARLY_SPENT < 150 then uniform(5000,7500,random()) / 100
            when YEARLY_SPENT < 250 then uniform(7300,10000,random()) / 100
            when YEARLY_SPENT < 350 then uniform(9500,20000,random()) / 100
            when YEARLY_SPENT < 370 then null
            when YEARLY_SPENT < 550 then uniform(19000,27000,random()) / 100
            when YEARLY_SPENT < 650 then uniform(25000,32000,random()) / 100
            when YEARLY_SPENT < 1000 then uniform(30000,40000,random()) / 100
        end as TIME_ON_APP,
        -- NULL for 500 <= spend < 520.
        case when YEARLY_SPENT < 300 then uniform(5000,7500,random()) / 100
            when YEARLY_SPENT < 500 then uniform(7000,15000,random()) / 100
            when YEARLY_SPENT < 520 then null
            when YEARLY_SPENT < 1000 then uniform(14000,30000,random()) / 100
        end as TIME_ON_WEBSITE
from table(generator(rowcount=>100)))
-- Column order here matches the table's column list (CTAS maps by position).
select email, gender, MEMBERSHIP_STATUS, MEMBERSHIP_LENGTH, AVG_SESSION_LENGTH, TIME_ON_APP, TIME_ON_WEBSITE, YEARLY_SPENT from profiles;

In [None]:
# Snowpark Imports
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
import snowflake.snowpark.functions as F

# Snowpark ML
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.preprocessing import OrdinalEncoder, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.metrics.correlation import correlation
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error
from snowflake.ml.registry import Registry

# Snowflake Task API
from datetime import timedelta
from snowflake.core import Root
from snowflake.core.table import Table
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core._common import CreateMode

# Other Imports
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

# Silence every Python warning for the rest of this notebook session.
import warnings
warnings.simplefilter("ignore")

# 2 - Feature Engineering

In [None]:
# Reuse the Snowpark session that the notebook runtime already has open.
session = get_active_session()

# Reference the demo table as a Snowpark DataFrame and print a preview of it.
source_table_name = 'ECOMMERCE_CUSTOMERS'
df = session.table(source_table_name)
df.show()

In [None]:
# Compute and print summary statistics for the source DataFrame.
summary_df = df.describe()
summary_df.show()

In [None]:
# Hold out roughly 10% of the rows for testing; a fixed seed is passed to the split.
split_weights = [0.9, 0.1]
train_df, test_df = df.random_split(weights=split_weights, seed=0)

# Display the (train, test) row counts as the cell output.
(train_df.count(), test_df.count())

In [None]:
# Numeric features: replace missing values with the column mean.
# Each imputed column is written next to the original with an "_IMP" suffix.
numeric_cols = ['MEMBERSHIP_LENGTH', 'AVG_SESSION_LENGTH', 'TIME_ON_APP', 'TIME_ON_WEBSITE']
si_numeric = SimpleImputer(
    input_cols=numeric_cols,
    output_cols=[f"{col}_IMP" for col in numeric_cols],
    strategy='mean',
)

# Membership status: replace missing values with the most frequent tier.
si_categorical = SimpleImputer(
    input_cols=['MEMBERSHIP_STATUS'],
    output_cols=['MEMBERSHIP_STATUS_IMP'],
    strategy='most_frequent',
)

# Tiers are ordered, so encode them ordinally using this explicit lowest-to-highest order.
membership_levels = ["BASIC", "BRONZE", "SILVER", "GOLD", "PLATIN", "DIAMOND"]
categories = {"MEMBERSHIP_STATUS_IMP": np.array(membership_levels)}
oe_categorical = OrdinalEncoder(
    input_cols=["MEMBERSHIP_STATUS_IMP"],
    output_cols=["MEMBERSHIP_STATUS_IMP_OE"],
    categories=categories,
)

# Gender has no natural order, so one-hot encode it instead.
ohe_categorical = OneHotEncoder(input_cols=["GENDER"], output_cols=["GENDER_OHE"])

# Imputation must run before encoding: the encoders read the *_IMP columns.
preprocessing_pipeline = Pipeline(steps=[
    ("SI_NUMERIC", si_numeric),
    ("SI_CATEGORICAL", si_categorical),
    ("OE_CATEGORICAL", oe_categorical),
    ("OHE_CATEGORICAL", ohe_categorical),
])

# Fit on the training split only, then apply the same fitted steps to both splits.
transformed_train_df = preprocessing_pipeline.fit(train_df).transform(train_df)
transformed_train_df.show()

transformed_test_df = preprocessing_pipeline.transform(test_df)
transformed_test_df.show()

In [None]:
# Raw columns that the pipeline has replaced with imputed / encoded versions.
replaced_cols = [
    'GENDER',
    'MEMBERSHIP_STATUS',
    'MEMBERSHIP_LENGTH',
    'AVG_SESSION_LENGTH',
    'TIME_ON_APP',
    'TIME_ON_WEBSITE',
]

# Drop the same set from both splits so their schemas stay aligned.
transformed_train_df = transformed_train_df.drop(replaced_cols)
transformed_test_df = transformed_test_df.drop(replaced_cols)

In [None]:
# Correlation matrix of the training features, returned as a pandas DataFrame.
corr_train_df = correlation(df=transformed_train_df)

# np.triu keeps the upper triangle including the diagonal; hiding it leaves
# only the lower triangle, since the matrix is symmetric.
mask = np.triu(np.ones_like(corr_train_df, dtype=bool))

plt.figure(figsize=(10, 5))
heatmap = sns.heatmap(
    corr_train_df.round(3),
    mask=mask,
    cmap="YlGnBu",
    annot=True,
    vmin=-1,
    vmax=1,
)

In [None]:
-- Scale up for the grid search. WAIT_FOR_COMPLETION=TRUE blocks until the
-- resize has finished provisioning, so the training queries issued next
-- actually run on the larger warehouse instead of the old size.
ALTER WAREHOUSE COMPUTE_WH SET WAREHOUSE_SIZE='MEDIUM' WAIT_FOR_COMPLETION=TRUE;

# 3 - Model Training

In [None]:
# Model inputs produced by the preprocessing pipeline.
feature_cols = [
    'GENDER_OHE_FEMALE',
    'GENDER_OHE_MALE',
    'MEMBERSHIP_STATUS_IMP_OE',
    'MEMBERSHIP_LENGTH_IMP',
    'AVG_SESSION_LENGTH_IMP',
    'TIME_ON_APP_IMP',
    'TIME_ON_WEBSITE_IMP'
]
label_cols = ['YEARLY_SPENT']
output_cols = ['YEARLY_SPENT_PREDICTION']

# 2 x 2 hyper-parameter grid -> 4 candidate models.
param_grid = {
    "n_estimators": [100, 200],
    "learning_rate": [0.1, 0.2],
}

grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid=param_grid,
    n_jobs=-1,
    scoring="neg_mean_absolute_percentage_error",
    input_cols=feature_cols,
    label_cols=label_cols,
    output_cols=output_cols,
)

# Train; the returned estimator is shown as the cell output.
grid_search.fit(transformed_train_df)

In [None]:
-- Training is finished: shrink the warehouse back to its small size.
alter warehouse COMPUTE_WH set warehouse_size = 'XSMALL';

# 4 - Model Evaluation

In [None]:
# Pull the cross-validation results out of the underlying sklearn search object.
gs_results = grid_search.to_sklearn().cv_results_
tried_params = gs_results["params"]
n_estimators_val = [params["n_estimators"] for params in tried_params]
learning_rate_val = [params["learning_rate"] for params in tried_params]
# Scoring was "neg_mean_absolute_percentage_error", so flip the sign to get MAPE.
mape_val = gs_results["mean_test_score"] * -1

gs_results_df = pd.DataFrame({
    "n_estimators": n_estimators_val,
    "learning_rate": learning_rate_val,
    "mape": mape_val,
})

# One line per n_estimators value: MAPE as a function of the learning rate.
sns.relplot(data=gs_results_df, x="learning_rate", y="mape", hue="n_estimators", kind="line")
plt.show()

In [None]:
# Score the held-out test split with the tuned model.
result = grid_search.predict(transformed_test_df)

# Mean absolute percentage error between actual and predicted yearly spend.
mape = mean_absolute_percentage_error(
    df=result,
    y_true_col_names="YEARLY_SPENT",
    y_pred_col_names="YEARLY_SPENT_PREDICTION",
)

actual_vs_pred = result.select("YEARLY_SPENT", "YEARLY_SPENT_PREDICTION")
actual_vs_pred.show()
print(f"Mean absolute percentage error: {mape}")

# Actual vs predicted scatter; points on the red y = x line are exact predictions.
plot_df = actual_vs_pred.to_pandas().astype("float64")
g = sns.relplot(data=plot_df, x="YEARLY_SPENT", y="YEARLY_SPENT_PREDICTION", kind="scatter")
g.ax.axline((0, 0), slope=1, color="r")
plt.show()

# 5 - Register & Run Model

In [None]:
# Create reference to model registry
reg = Registry(session=session, database_name="MACHINE_LEARNING", schema_name="PUBLIC")

# The next version name is "V<n+1>", where Vn is the most recently created
# version. Only the registry lookup is guarded. If it fails (presumably because
# the model is not registered yet) we start at V0. A malformed version name now
# raises here instead of being swallowed by a bare `except:` and silently
# turning into V0.
try:
    model_versions = reg.get_model("ECOMMERCE_SPENT_MODEL").show_versions()
except Exception:  # NOTE(review): assumed to mean "model not found"; narrow to the concrete type once confirmed
    new_version = 'V0'
else:
    idx = model_versions['created_on'].idxmax()
    most_recent_version = model_versions.loc[idx]
    new_version = 'V'+str(int(most_recent_version['name'][1:])+1)

# Register new model version, recording the test-set MAPE computed above.
registered_model = reg.log_model(
    grid_search,
    model_name="ECOMMERCE_SPENT_MODEL",
    version_name=new_version,
    comment="Model trained using GridsearchCV in Snowpark to predict customer's yearly spending.",
    metrics={"mean_abs_pct_err": mape},
    sample_input_data=transformed_train_df.select(feature_cols).limit(100)
)

# Create predictions from registered model
predictions_df = registered_model.run(transformed_test_df, function_name="predict")
predictions_df.show()

In [None]:
# Expose the preprocessed test split to SQL cells as a temporary view.
inference_view_name = 'INFERENCE_TEST'
transformed_test_df.create_or_replace_temp_view(inference_view_name)

In [None]:
-- Score the INFERENCE_TEST view with the registered model directly from SQL.
-- The model method returns an object; ['YEARLY_SPENT_PREDICTION'] extracts the prediction.
-- NOTE(review): with no explicit version this presumably calls the model's *default*
-- version, which may not be the version logged in the previous Python cell. Confirm,
-- or pin it (set the default version, or use WITH <alias> AS MODEL ... VERSION ...).
SELECT *, 
    ECOMMERCE_SPENT_MODEL!PREDICT(
        "GENDER_OHE_FEMALE",
        "GENDER_OHE_MALE",
        "MEMBERSHIP_STATUS_IMP_OE",
        "MEMBERSHIP_LENGTH_IMP",
        "AVG_SESSION_LENGTH_IMP",
        "TIME_ON_APP_IMP",
        "TIME_ON_WEBSITE_IMP")['YEARLY_SPENT_PREDICTION'] AS YEARLY_SPENT_PREDICTION
FROM INFERENCE_TEST;

# 6 - Automate Pipeline

In [None]:
-- Stage to which the pipeline's stored-procedure code is uploaded
-- (referenced as stage_location='DAG_STAGE' by the DAG definition).
create or replace stage DAG_STAGE;

In [None]:
# Define preprocessing task
def preprocess_data(session: Session) -> str:
    """Apply the notebook's fitted preprocessing pipeline to the full raw table.

    Reads ``ECOMMERCE_CUSTOMERS``, transforms it with the global
    ``preprocessing_pipeline`` (fitted on the training split earlier in the
    notebook), and overwrites ``ECOMMERCE_CUSTOMERS_PREPARED`` with the result.

    NOTE(review): ``preprocessing_pipeline`` is a notebook global; it is
    presumably captured by value when this function is registered as a stored
    procedure. Confirm it is fitted before the DAG is deployed.

    Returns:
        A status message with the row count of the prepared table.
    """
    prepared_table = 'ECOMMERCE_CUSTOMERS_PREPARED'
    raw_df = session.table('ECOMMERCE_CUSTOMERS')
    preprocessing_pipeline.transform(raw_df).write.save_as_table(prepared_table, mode='overwrite')
    row_count = session.table(prepared_table).count()
    return f"Prepared new training table with {row_count} customers."

# Define model training task
def train_model(session: Session) -> str:
    """Tune an XGBoost regressor on the prepared table and register the result.

    Intended to run as the TRAIN_MODEL stored procedure of the DAG, after
    ``preprocess_data`` has written ``ECOMMERCE_CUSTOMERS_PREPARED``. Runs a
    grid search over ``n_estimators`` x ``learning_rate`` and logs the fitted
    search as the next version of ``ECOMMERCE_SPENT_MODEL`` in
    ``MACHINE_LEARNING.PUBLIC``.

    Args:
        session: Snowpark session passed in by the stored-procedure call.

    Returns:
        A status message naming the newly registered version.
    """
    reg = Registry(session=session, database_name="MACHINE_LEARNING", schema_name="PUBLIC")
    df = session.table('ECOMMERCE_CUSTOMERS_PREPARED')
    feature_cols = [
        'GENDER_OHE_FEMALE',
        'GENDER_OHE_MALE',
        'MEMBERSHIP_STATUS_IMP_OE',
        'MEMBERSHIP_LENGTH_IMP',
        'AVG_SESSION_LENGTH_IMP',
        'TIME_ON_APP_IMP',
        'TIME_ON_WEBSITE_IMP'
    ]
    label_cols = ['YEARLY_SPENT']
    output_cols = ['YEARLY_SPENT_PREDICTION']

    # Define parameter tuning
    grid_search = GridSearchCV(
        estimator=XGBRegressor(),
        param_grid={
            "n_estimators":[100, 200],
            "learning_rate":[0.1, 0.2],
        },
        n_jobs = -1,
        scoring="neg_mean_absolute_percentage_error",
        input_cols=feature_cols,
        label_cols=label_cols,
        output_cols=output_cols
    )

    # Train
    grid_search.fit(df)

    # The next version is "V<n+1>" of the most recently created version. Like the
    # interactive registration cell, fall back to V0 when the lookup fails
    # (presumably because the model is not registered yet); previously an
    # absent model made every scheduled run fail here.
    try:
        model_versions = reg.get_model("ECOMMERCE_SPENT_MODEL").show_versions()
    except Exception:  # NOTE(review): assumed to mean "model not found"; narrow once the exact type is confirmed
        new_version = 'V0'
    else:
        idx = model_versions['created_on'].idxmax()
        most_recent_version = model_versions.loc[idx]
        new_version = 'V'+str(int(most_recent_version['name'][1:])+1)

    # Register new model version
    reg.log_model(
        grid_search,
        model_name="ECOMMERCE_SPENT_MODEL",
        version_name=new_version,
        comment="Model trained using GridsearchCV in Snowpark to predict customer's yearly spending.",
        sample_input_data=df.select(feature_cols).limit(100)
    )

    return f"Registered new model with version: {new_version}"

In [None]:
root = Root(session)


def _sproc_call(handler):
    """Wrap *handler* as a stored-procedure call whose code is uploaded to DAG_STAGE."""
    return StoredProcedureCall(
        handler,
        stage_location='DAG_STAGE',
        packages=["snowflake-snowpark-python", "snowflake-ml-python"],
    )


# Two-task pipeline scheduled every 10 minutes. NOTE(review): each run of
# TRAIN_MODEL registers a new model version, so confirm this cadence is intended.
with DAG("MY_TRAINING_PIPELINE", stage_location='DAG_STAGE', schedule=timedelta(minutes=10)) as dag:
    task_preprocess_data = DAGTask(
        "PREPROCESS_DATA",
        definition=_sproc_call(preprocess_data),
        warehouse="COMPUTE_WH",
    )
    task_train_model = DAGTask(
        "TRAIN_MODEL",
        definition=_sproc_call(train_model),
        warehouse="COMPUTE_WH",
    )

# TRAIN_MODEL is a successor of PREPROCESS_DATA.
task_preprocess_data >> task_train_model

# Create (or replace) the tasks in MACHINE_LEARNING.PUBLIC, then run the DAG once now.
schema = root.databases["MACHINE_LEARNING"].schemas["PUBLIC"]
dag_op = DAGOperation(schema)
dag_op.deploy(dag, mode=CreateMode.or_replace)
dag_op.run(dag)


# 7 - Clean Up

In [None]:
# Safety guard: this cell permanently removes the tasks, tables, model and
# stage created above. Delete the next line only when you really want to
# clean up. (Replaces the former `*STOP*` line, which only worked by being a
# confusing SyntaxError.)
raise RuntimeError("STOP: remove this guard line to run the clean-up cell.")

root = Root(session)
schema = root.databases["MACHINE_LEARNING"].schemas["PUBLIC"]

# Delete Tasks: the root task first, then the child tasks, which are named
# "<DAG name>$<task name>". Each one is suspended before it is deleted.
for task_name in (
    'MY_TRAINING_PIPELINE',
    'MY_TRAINING_PIPELINE$PREPROCESS_DATA',
    'MY_TRAINING_PIPELINE$TRAIN_MODEL',
):
    task_res = schema.tasks[task_name]
    task_res.suspend()
    task_res.delete()

# List whatever tasks remain in the schema as a sanity check.
for t in schema.tasks.iter(like='%'):
    print(t.name)

# Delete Data
for table_name in ("ECOMMERCE_CUSTOMERS_PREPARED", "ECOMMERCE_CUSTOMERS"):
    schema.tables[table_name].delete()

# Delete models
reg = Registry(session=session, database_name="MACHINE_LEARNING", schema_name="PUBLIC")
reg.delete_model("ECOMMERCE_SPENT_MODEL")

# Drop stage
session.sql('DROP STAGE DAG_STAGE').collect()