# Snowpark For Python -- Advertising Spend and ROI Prediction

### Objective
In this session, we will train a Linear Regression model to predict future ROI (Return On Investment) of variable ad spend budgets across multiple channels including search, video, social media, and email using Snowpark for Python and scikit-learn.

In [21]:
# Snowpark for Python
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import IntegerType, StringType, StructType, FloatType, StructField, DateType, Variant
from snowflake.snowpark.functions import udf, sum, col,array_construct,month,year,call_udf,lit
from snowflake.snowpark.version import VERSION
# Misc
import json
import pandas as pd
import logging 
logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

In [22]:
# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

snowflake_environment = session.sql('select current_user(), current_role(), current_database(), current_schema(), current_version(), current_warehouse()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : '.format(snowflake_environment[0][0]))
print('Role                        : '.format(snowflake_environment[0][1]))
print('Database                    : '.format(snowflake_environment[0][2]))
print('Schema                      : '.format(snowflake_environment[0][3]))
print('Warehouse                   : '.format(snowflake_environment[0][5]))
print('Snowflake version           : '.format(snowflake_environment[0][4]))
print('Snowpark for Python version : '.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : 
Role                        : 
Database                    : 
Schema                      : 
Warehouse                   : 
Snowflake version           : 
Snowpark for Python version : 


In [3]:
snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

{'queries': ['SELECT  *  FROM (campaign_spend)'], 'post_actions': []}

In [4]:
# Action sends the DF SQL for execution
# Note: history object provides the query ID which can be helpful for debugging as well as the SQL query executed on the server
with session.query_history() as history:
    snow_df_spend.show()
history.queries

----------------------------------------------------------------------------------
|"CAMPAIGN"  |"CHANNEL"  |"DATE"  |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
----------------------------------------------------------------------------------
|            |           |        |                |              |              |
----------------------------------------------------------------------------------



[QueryRecord(query_id='01aa9247-0000-fdc7-0000-000394de817d', sql_text='SELECT  *  FROM campaign_spend LIMIT 10')]

### Total Spend per Channel per Month
Let's transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions.

In [5]:
# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

snow_df_spend_per_channel.show(10)

-----------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"  |"TOTAL_COST"  |
-----------------------------------------------
|        |         |           |              |
-----------------------------------------------



#### Pivot on Channel
Let's further transform the campaign spend data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions. This transformation will enable us to join with the revenue table such that we will have our input features and target variable in a single table for model training.

In [6]:
snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|        |         |                 |                |         |         |
---------------------------------------------------------------------------



#### Total Revenue per Month
Now let's load revenue table and transform the data into revenue per year/month using group_by and agg() functions.

In [7]:
snow_df_revenue = session.table('monthly_revenue')
snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
snow_df_revenue_per_month.show()

--------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"  |
--------------------------------
|        |         |           |
--------------------------------



#### Join Total Spend and Total Revenue per Month
Next let's join this revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training.

In [8]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])
snow_df_spend_and_revenue_per_month.show()

---------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"  |
---------------------------------------------------------------------------------------
|        |         |                 |                |         |         |           |
---------------------------------------------------------------------------------------



### >>>>>>>>>> Examine Snowpark DataFrame Query and Execution Plan <<<<<<<<<<
Snowpark makes is really convenient to look at the DataFrame query and execution plan using explain() Snowpark DataFrame function.

In [9]:
snow_df_spend_and_revenue_per_month.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH", "SEARCH_ENGINE" AS "SEARCH_ENGINE", "SOCIAL_MEDIA" AS "SOCIAL_MEDIA", "VIDEO" AS "VIDEO", "EMAIL" AS "EMAIL" FROM ( SELECT "YEAR", "MONTH", "'search_engine'" AS "SEARCH_ENGINE", "'social_media'" AS "SOCIAL_MEDIA", "'video'" AS "VIDEO", "'email'" AS "EMAIL" FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT "YEAR(DATE)" AS "YEAR", "MONTH(DATE)" AS "MONTH", "CHANNEL", "TOTAL_COST" FROM ( SELECT year("DATE") AS "YEAR(DATE)", month("DATE") AS "MONTH(DATE)", "CHANNEL", sum("TOTAL_COST") AS "TOTAL_COST" FROM ( SELECT  *  FROM campaign_spend) GROUP BY year("DATE"), month("DATE"), "CHANNEL")) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST) PIVOT (sum("TOTAL_COST") FOR "CHANNEL" IN ('search_engine', 'social_media', 'video', 'email'))) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST))) AS SNOWPARK_TEMP_TABLE_WZLR9FZ30F INNER JOIN ( SELECT 

### Model Training in Snowflake
#### Features and Target
At this point we are ready to perform the following actions to save features and target for model training.

Delete rows with missing values
Exclude columns we don't need for modeling
Save features into a Snowflake table called MARKETING_BUDGETS_FEATURES

In [10]:
# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

--------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"  |
--------------------------------------------------------------------
|                 |                |         |         |           |
--------------------------------------------------------------------



In [11]:
tst = session.table('MARKETING_BUDGETS_FEATURES')
tst.queries

{'queries': ['SELECT  *  FROM (MARKETING_BUDGETS_FEATURES)'],
 'post_actions': []}

In [12]:
with session.query_history() as history:
    tst.show()
history.queries

--------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"  |
--------------------------------------------------------------------
|                 |                |         |         |           |
--------------------------------------------------------------------



[QueryRecord(query_id='01aa9247-0000-fdc6-0000-000394de6315', sql_text='SELECT  *  FROM MARKETING_BUDGETS_FEATURES LIMIT 10')]

#### Python function to train a Linear Regression model using scikit-learn
Let's create a Python function that uses scikit-learn and other packages which are already included in Snowflake Anaconda channel and therefore available on the server-side when executing the Python function as a Stored Procedure running in Snowflake.

This function takes the following as parameters:

session: Snowflake Session object.
features_table: Name of the table that holds the features and target variable.
number_of_folds: Number of cross validation folds used in GridSearchCV.
polynomial_features_degress: PolynomialFeatures as a preprocessing step.
train_accuracy_threshold: Accuracy thresholds for train dataset. This values is used to determine if the model should be saved.
test_accuracy_threshold: Accuracy thresholds for test dataset. This values is used to determine if the model should be saved.
save_model: Boolean that determines if the model should be saved provided the accuracy thresholds are met.

In [13]:
def train_revenue_prediction_model(
    session: Session, 
    features_table: str, 
    number_of_folds: int, 
    polynomial_features_degrees: int, 
    train_accuracy_threshold: float, 
    test_accuracy_threshold: float, 
    save_model: bool) -> Variant:
    
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import PolynomialFeatures
    from sklearn.preprocessing import StandardScaler
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split, GridSearchCV

    import os
    from joblib import dump

    # Load features
    df = session.table(features_table).to_pandas()

    # Preprocess the Numeric columns
    # We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
    # NOTE: High degrees can cause overfitting.
    numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = polynomial_features_degrees)),('scaler', StandardScaler())])

    # Combine the preprocessed step together using the Column Transformer module
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features)])

    # The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression())])
    parameteres = {}

    X = df.drop('REVENUE', axis = 1)
    y = df['REVENUE']

    # Split dataset into training and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

    # Use GridSearch to find the best fitting model based on number_of_folds folds
    model = GridSearchCV(pipeline, param_grid=parameteres, cv=number_of_folds)

    model.fit(X_train, y_train)
    train_r2_score = model.score(X_train, y_train)
    test_r2_score = model.score(X_test, y_test)

    model_saved = False

    if save_model:
        if train_r2_score >= train_accuracy_threshold and test_r2_score >= test_accuracy_threshold:
            # Upload trained model to a stage
            model_output_dir = '/tmp'
            model_file = os.path.join(model_output_dir, 'model.joblib')
            dump(model, model_file)
            session.file.put(model_file,"@dash_models",overwrite=True)
            model_saved = True

    # Return model R2 score on train and test data
    return {"R2 score on Train": train_r2_score,
            "R2 threshold on Train": train_accuracy_threshold,
            "R2 score on Test": test_r2_score,
            "R2 threshold on Test": test_accuracy_threshold,
            "Model saved": model_saved}

### Test Python function before deploying it as a Stored Procedure on Snowflake
Since we're in test mode, we will set save_model = False so that the model is not saved just yet.

In [14]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = False

train_revenue_prediction_model(
    session,
    'MARKETING_BUDGETS_FEATURES',
    cross_validaton_folds,
    polynomial_features_degrees,
    train_accuracy_threshold,
    test_accuracy_threshold,
    save_model)

ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.

### Create Stored Procedure to deploy model training code on Snowflake
Assuming the testing is complete and we're satisfied with the model, let's register the model training Python function as a Snowpark Python Stored Procedure by supplying the packages (snowflake-snowpark-python,scikit-learn, and joblib) it will need and use during execution.

In [15]:
session.sproc.register(
    func=train_revenue_prediction_model,
    name="train_revenue_prediction_model",
    packages=['snowflake-snowpark-python','scikit-learn','joblib'],
    is_permanent=True,
    stage_location="@dash_sprocs",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x29ce08f3d60>

### >>>>>>>>>> Examine Query History in Snowsight <<<<<<<<<<
Execute Stored Procedure to train model and deploy it on Snowflake
Now we're ready to train the model and save it onto a Snowflake stage so let's set save_model = True and run/execute the Stored Procedure using session.call() function.

In [16]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = True

print(session.call('train_revenue_prediction_model',
                    'MARKETING_BUDGETS_FEATURES',
                    cross_validaton_folds,
                    polynomial_features_degrees,
                    train_accuracy_threshold,
                    test_accuracy_threshold,
                    save_model))

Failed to execute query [queryID: 01aa9248-0000-fdc7-0000-000394de81ad] CALL train_revenue_prediction_model('MARKETING_BUDGETS_FEATURES', 10 :: INT, 2 :: INT, '0.85' :: FLOAT, '0.85' :: FLOAT, True :: BOOLEAN)
100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "_udf_code.py", line 7, in compute
  File "C:\Users\15512\AppData\Local\Temp\ipykernel_22328\2873238866.py", line 42, in train_revenue_prediction_model
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2562, in train_test_split
    n_train, n_test = _validate_shuffle_split(
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2236, in _validate_shuffle_split
    raise ValueError(
ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty

SnowparkSQLException: (1304): 01aa9248-0000-fdc7-0000-000394de81ad: 100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "_udf_code.py", line 7, in compute
  File "C:\Users\15512\AppData\Local\Temp\ipykernel_22328\2873238866.py", line 42, in train_revenue_prediction_model
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2562, in train_test_split
    n_train, n_test = _validate_shuffle_split(
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2236, in _validate_shuffle_split
    raise ValueError(
ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.
 in function TRAIN_REVENUE_PREDICTION_MODEL with handler compute

### Create Stored Procedure to deploy model training code on Snowflake
Assuming the testing is complete and we're satisfied with the model, let's register the model training Python function as a Snowpark Python Stored Procedure by supplying the packages (snowflake-snowpark-python,scikit-learn, and joblib) it will need and use during execution.

In [17]:
session.sproc.register(
    func=train_revenue_prediction_model,
    name="train_revenue_prediction_model",
    packages=['snowflake-snowpark-python','scikit-learn','joblib'],
    is_permanent=True,
    stage_location="@dash_sprocs",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x29ce14bab80>

### >>>>>>>>>> Examine Query History in Snowsight <<<<<<<<<<
Execute Stored Procedure to train model and deploy it on Snowflake
Now we're ready to train the model and save it onto a Snowflake stage so let's set save_model = True and run/execute the Stored Procedure using session.call() function.

In [18]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = True

print(session.call('train_revenue_prediction_model',
                    'MARKETING_BUDGETS_FEATURES',
                    cross_validaton_folds,
                    polynomial_features_degrees,
                    train_accuracy_threshold,
                    test_accuracy_threshold,
                    save_model))

Failed to execute query [queryID: 01aa9248-0000-fdc7-0000-000394de81c5] CALL train_revenue_prediction_model('MARKETING_BUDGETS_FEATURES', 10 :: INT, 2 :: INT, '0.85' :: FLOAT, '0.85' :: FLOAT, True :: BOOLEAN)
100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "_udf_code.py", line 7, in compute
  File "C:\Users\15512\AppData\Local\Temp\ipykernel_22328\2873238866.py", line 42, in train_revenue_prediction_model
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2562, in train_test_split
    n_train, n_test = _validate_shuffle_split(
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2236, in _validate_shuffle_split
    raise ValueError(
ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty

SnowparkSQLException: (1304): 01aa9248-0000-fdc7-0000-000394de81c5: 100357 (P0000): Python Interpreter Error:
Traceback (most recent call last):
  File "_udf_code.py", line 7, in compute
  File "C:\Users\15512\AppData\Local\Temp\ipykernel_22328\2873238866.py", line 42, in train_revenue_prediction_model
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2562, in train_test_split
    n_train, n_test = _validate_shuffle_split(
  File "/usr/lib/python_udf/c44bd28c1f111d4dac440cea14fff60ed16c45e517074f7a33374de715f9e7db/lib/python3.8/site-packages/sklearn/model_selection/_split.py", line 2236, in _validate_shuffle_split
    raise ValueError(
ValueError: With n_samples=0, test_size=0.2 and train_size=None, the resulting train set will be empty. Adjust any of the aforementioned parameters.
 in function TRAIN_REVENUE_PREDICTION_MODEL with handler compute

### >>>>>>>>>> Examine Query History in Snowsight <<<<<<<<<<
Create Scalar User-Defined Function (UDF) for inference
Now to deploy this model for inference, let's create and register a Snowpark Python UDF and add the trained model as a dependency. Once registered, getting new predictions is as simple as calling the function by passing in data.

NOTE: Scalar UDFs operate on a single row / set of data points and are great for online inference in real-time. And this UDF is called from the Streamlit App. See Snowpark_Streamlit_Revenue_Prediction.py

In [None]:
session.clear_imports()
session.clear_packages()

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@dash_models/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn==1.1.1')

@udf(name='predict_roi',session=session,replace=True,is_permanent=True,stage_location='@dash_udfs')
def predict_roi(budget_allocations: list) -> float:
    import sys
    import pandas as pd
    from joblib import load
    import sklearn

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + 'model.joblib.gz'
    model = load(model_file)
            
    features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    df = pd.DataFrame([budget_allocations], columns=features)
    roi = abs(model.predict(df)[0])
    return roi

### Call Scalar User-Defined Function (UDF) for inference on new data
Once the UDF is registered, getting new predictions is as simple as calling the call_udf() Snowpark Python function and passing in new datapoints.

Let's create a SnowPark DataFrame with some sample data and call the UDF to get new predictions.

In [19]:
test_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]], 
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("predict_roi", 
    array_construct(col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL"))).as_("PREDICTED_ROI")).show()

Failed to execute query [queryID: 01aa9248-0000-fdc7-0000-000394de81d5]  SELECT "SEARCH_ENGINE", "SOCIAL_MEDIA", "VIDEO", "EMAIL", predict_roi(array_construct("SEARCH_ENGINE", "SOCIAL_MEDIA", "VIDEO", "EMAIL")) AS "PREDICTED_ROI" FROM ( SELECT $1 AS "SEARCH_ENGINE", $2 AS "SOCIAL_MEDIA", $3 AS "VIDEO", $4 AS "EMAIL" FROM  VALUES (250000 :: INT, 250000 :: INT, 200000 :: INT, 450000 :: INT), (500000 :: INT, 500000 :: INT, 500000 :: INT, 500000 :: INT), (8500 :: INT, 9500 :: INT, 2000 :: INT, 500 :: INT)) LIMIT 10
002140 (42601): SQL compilation error:
Unknown function PREDICT_ROI


SnowparkSQLException: (1304): 01aa9248-0000-fdc7-0000-000394de81d5: 002140 (42601): SQL compilation error:
Unknown function PREDICT_ROI

### Create Vectorized User-Defined Function (UDF) using Batch API for inference
Here we will leverage the Python UDF Batch API to create a vectorized UDF which takes a Pandas Dataframe as input. This means that each call to the UDF receives a set/batch of rows compared to a Scalar UDF which gets one row as input.

First we will create a helper function load_model() that uses cachetools to make sure we only load the model once followed by batch_predict_roi() function that does the inference.

In [None]:
session.clear_imports()
session.clear_packages()

import cachetools
from snowflake.snowpark.types import PandasSeries, PandasDataFrame

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@dash_models/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn','cachetools')

@cachetools.cached(cache={})
def load_model(filename):
    import joblib
    import sys
    import os

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@udf(name='batch_predict_roi',session=session,replace=True,is_permanent=True,stage_location='@dash_udfs')
def batch_predict_roi(budget_allocations_df: PandasDataFrame[int, int, int, int]) -> PandasSeries[float]:
    import sklearn
    budget_allocations_df.columns = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    model = load_model('model.joblib.gz')
    return abs(model.predict(budget_allocations_df))

### Call Vectorized User-Defined Function (UDF) using Batch API for inference on new data

In [None]:
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("batch_predict_roi", 
    col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL")).as_("PREDICTED_ROI")).show()

-----------------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
-----------------------------------------------------------------------------
|250000           |250000          |200000   |450000   |4072491.441668165   |
|500000           |500000          |500000   |500000   |3179613.166194177   |
|8500             |9500            |2000     |500      |189866.83304722887  |
-----------------------------------------------------------------------------



### Automate Data Pipeline and Model (re)Training using Snowflake Tasks
We can also optionally create Snowflake (Serverless or User-managed) Tasks to automate data pipelining and (re)training of the model on a set schedule.

NOTE: Creating tasks using Snowpark Python API (instead of SQL) is on the roadmap. Stay tuned! Or, follow me on [Twitter]((https://twitter.com/iamontheinet)) to get the news before anyone else :)_

#### Create Python Function for Data Pipeline and Feature Engineering

In [None]:
def data_pipeline_feature_engineering(session: Session) -> str:

  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month = snow_df_spend_per_month.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_revenue = session.table('monthly_revenue')
  snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])

  # SAVE FEATURES And TARGET
  # Perform the following actions to save features and target for model training

  # Delete rows with missing values
  snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

  # Exclude columns we don't need for modeling
  snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

  # Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
  snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')

  return "SUCCESS"

### Create Stored Procedure to deploy data pipelining feature engineeering code on Snowflake

In [None]:
session.sproc.register(
    func=data_pipeline_feature_engineering,
    name="data_pipeline_feature_engineering",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="@dash_sprocs",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x1e1e4f189d0>

### Execute Stored Procedure to deploy data pipelining feature engineeering code on Snowflake

In [None]:
print(session.call('data_pipeline_feature_engineering'))

SUCCESS


### Create Root/Parent Snowflake Task: Data pipelining and feature engineeering

In [None]:
create_data_pipeline_feature_engineering_task = """
CREATE OR REPLACE TASK data_pipeline_feature_engineering_task
    WAREHOUSE = 'DASH_L'
    SCHEDULE  = '1 MINUTE'
AS
    CALL data_pipeline_feature_engineering()
"""
session.sql(create_data_pipeline_feature_engineering_task).collect()

[Row(status='Task DATA_PIPELINE_FEATURE_ENGINEERING_TASK successfully created.')]

### Create Child/Dependent Snowflake Task: Model training on Snowflake

In [None]:
create_model_training_task = """
CREATE OR REPLACE TASK model_training_task
    WAREHOUSE = 'DASH_L'
    AFTER data_pipeline_feature_engineering_task
AS
    CALL train_revenue_prediction_model('MARKETING_BUDGETS_FEATURES',10,2,0.85,0.85,True)
"""
session.sql(create_model_training_task).collect()

[Row(status='Task MODEL_TRAINING_TASK successfully created.')]

### Resume Tasks 

In [None]:
session.sql("alter task model_training_task resume").collect()
session.sql("alter task data_pipeline_feature_engineering_task resume").collect()

[Row(status='Statement executed successfully.')]

### Cleanup Resources

In [None]:
session.sql("alter task data_pipeline_feature_engineering_task suspend").collect()
session.sql("alter task model_training_task suspend").collect()

[Row(status='Statement executed successfully.')]