# Snowpark For Python -- Advertising Spend and ROI Prediction

In [1]:
import json
import logging

import pandas as pd
from snowflake.snowpark.functions import (array_construct, call_udf, col, lit,
                                          month, sum, udf, year)
from snowflake.snowpark.session import Session
from snowflake.snowpark.types import (DateType, FloatType, IntegerType,
                                      StringType, StructField, StructType,
                                      Variant)
from snowflake.snowpark.version import VERSION

logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

## Establish Secure Connection to Snowflake

In [2]:
# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
session.sql_simplifier_enabled = True

Create objects to use for this demonstration.

In [5]:
# session.sql(
#     """CREATE WAREHOUSE IF NOT EXISTS SNOWPARK_DEMO_WH 
#            WITH WAREHOUSE_SIZE = 'MEDIUM' 
#                 WAREHOUSE_TYPE = 'STANDARD' 
#                 AUTO_SUSPEND = 60 
#                 AUTO_RESUME = TRUE 
#                 INITIALLY_SUSPENDED = TRUE;"""
# ).collect()
session.sql("CREATE DATABASE IF NOT EXISTS SNOWPARK_ROI_DEMO;").collect()
session.sql("DROP SCHEMA IF EXISTS SNOWPARK_ROI_DEMO.PUBLIC;").collect()
session.sql("CREATE SCHEMA IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA;").collect()
session.sql(
    """CREATE FILE FORMAT IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA.CSVFORMAT 
           SKIP_HEADER = 1 
           TYPE = 'CSV';"""
).collect()
session.sql(
    """CREATE STAGE IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA.CAMPAIGN_DATA_STAGE 
           FILE_FORMAT = SNOWPARK_ROI_DEMO.AD_DATA.CSVFORMAT  
           URL = 's3://sfquickstarts/Summit 2022 Keynote Demo/campaign_spend/';"""
).collect()
session.sql(
    """CREATE STAGE IF NOT EXISTS SNOWPARK_ROI_DEMO.AD_DATA.MONTHLY_REVENUE_DATA_STAGE 
           FILE_FORMAT = SNOWPARK_ROI_DEMO.AD_DATA.CSVFORMAT  
           URL = 's3://sfquickstarts/Summit 2022 Keynote Demo/monthly_revenue/';"""
).collect()
session.sql("CREATE OR REPLACE STAGE SNOWPARK_ROI_DEMO.AD_DATA.PYTHON_MODELS").collect()
session.sql("CREATE OR REPLACE STAGE SNOWPARK_ROI_DEMO.AD_DATA.PYTHON_CODE").collect()

[Row(status='Stage area PYTHON_CODE successfully created.')]

Change the current context.

In [4]:
# session.use_warehouse("SNOWPARK_DEMO_WH")
session.use_database("SNOWPARK_ROI_DEMO")
session.use_schema("AD_DATA")

In [5]:
snowflake_environment = session.sql('select current_user(), current_role(), current_database(), current_schema(), current_version(), current_warehouse()').collect()
snowpark_version = VERSION

# Current Environment Details
print('User                        : {}'.format(snowflake_environment[0][0]))
print('Role                        : {}'.format(snowflake_environment[0][1]))
print('Database                    : {}'.format(snowflake_environment[0][2]))
print('Schema                      : {}'.format(snowflake_environment[0][3]))
print('Warehouse                   : {}'.format(snowflake_environment[0][5]))
print('Snowflake version           : {}'.format(snowflake_environment[0][4]))
print('Snowpark for Python version : {}.{}.{}'.format(snowpark_version[0],snowpark_version[1],snowpark_version[2]))

User                        : RFAJRI27
Role                        : ACCOUNTADMIN
Database                    : SNOWPARK_ROI_DEMO
Schema                      : AD_DATA
Warehouse                   : COMPUTE_WH
Snowflake version           : 7.30.0
Snowpark for Python version : 1.7.0


View the files in the external stage.

In [8]:
session.sql("LS @CAMPAIGN_DATA_STAGE").collect()

[Row(name='s3://sfquickstarts/Summit 2022 Keynote Demo/campaign_spend/campaign_spend.csv', size=13684943, md5='1d87f70421662a7666d3918b16b81daa', last_modified='Fri, 5 Aug 2022 20:22:18 GMT')]

## Load Aggregated Campaign Spend Data from Snowflake table into Snowpark DataFrame
Let's first load the campaign spend data. This table contains ad click data that has been aggregated to show daily spend across digital ad channels including search engines, social media, email and video.

NOTE: Ways to load data in a Snowpark Dataframe

* session.table("db.schema.table")
* session.sql("select col1, col2... from tableName")
* session.read.parquet("@stageName/path/to/file")
* session.create_dataframe([1,2,3], schema=["col1"])

TIP: For more information on Snowpark DataFrames, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.html#snowflake.snowpark.DataFrame).

Query and preview the CSV file in the stage.

In [9]:
df = session.sql(
    """SELECT $1::VARCHAR(60) AS CAMPAIGN, 
              $2::VARCHAR(60) AS CHANNEL, 
              $3::DATE AS DATE, 
              $4::NUMBER(38, 0) AS TOTAL_CLICKS, 
              $5::NUMBER(38, 0) AS TOTAL_COST, 
              $6::NUMBER(38, 0) AS ADS_SERVED 
       FROM @CAMPAIGN_DATA_STAGE""")

df.show()

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

Write this DataFrame to a Snowflake table named `CAMPAIGN_SPEND`.

In [10]:
df.write.save_as_table("CAMPAIGN_SPEND", mode="overwrite")

In [6]:
snow_df_spend = session.table('campaign_spend')
snow_df_spend.queries

{'queries': ['SELECT  *  FROM (campaign_spend)'], 'post_actions': []}

In [7]:
# Action sends the DF SQL for execution
# Note: history object provides the query ID which can be helpful for debugging as well as the SQL query executed on the server
with session.query_history() as history:
    snow_df_spend.show(20)
history.queries

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1762          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |678           |157           |
|building_community      |search_engine  |2012-06-03  |66              |471           |134           |
|world_series            |social_media   |2017-12-28  |72              |591           |149           |
|winter_sports           |email          |2018-02-09  |252             |1841          |473           |
|spring_break            |video          |2017-11-14  |162             |1155          |304           |
|nba_finals              |email          |2017-11-22  |68              |4

[QueryRecord(query_id='01aeb816-0000-0f1b-0000-000013319a25', sql_text='SELECT  *  FROM campaign_spend LIMIT 20')]

### Total Spend per Channel per Month
Let's transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions.

In [8]:
# Stats per Month per Channel
snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
    with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

snow_df_spend_per_channel.show(10)

---------------------------------------------------
|"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
---------------------------------------------------
|2012    |5        |search_engine  |516431        |
|2012    |5        |video          |516729        |
|2012    |5        |email          |517208        |
|2012    |5        |social_media   |517618        |
|2012    |6        |video          |501098        |
|2012    |6        |search_engine  |506497        |
|2012    |6        |social_media   |504679        |
|2012    |6        |email          |501947        |
|2012    |7        |search_engine  |522780        |
|2012    |7        |email          |518405        |
---------------------------------------------------



### Pivot on Channel
Let's further transform the campaign spend data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions. This transformation will enable us to join with the revenue table such that we will have our input features and target variable in a single table for model training.

In [9]:
snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
snow_df_spend_per_month = snow_df_spend_per_month.select(
    col("YEAR"),
    col("MONTH"),
    col("'search_engine'").as_("SEARCH_ENGINE"),
    col("'social_media'").as_("SOCIAL_MEDIA"),
    col("'video'").as_("VIDEO"),
    col("'email'").as_("EMAIL")
)
snow_df_spend_per_month.show()

---------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |
---------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |
|2012    |6        |506497           |504679          |501098   |501947   |
|2012    |7        |522780           |521395          |522762   |518405   |
|2012    |8        |519959           |520537          |520685   |521584   |
|2012    |9        |507211           |507404          |511364   |507363   |
|2012    |10       |518942           |520863          |522768   |519950   |
|2012    |11       |505715           |505221          |505292   |503748   |
|2012    |12       |520148           |520711          |521427   |520724   |
|2013    |1        |522151           |518635          |520583   |521167   |
|2013    |2        |467736           |474679          |469856   |469784   |
------------

### Total Revenue per Month
Now let's load revenue table and transform the data into revenue per year/month using group_by and agg() functions.

In [16]:
df = session.sql(
    """SELECT $1::NUMBER(38, 0) AS YEAR, 
              $2::NUMBER(38, 0) AS MONTH, 
              $3::FLOAT AS REVENUE
       FROM @MONTHLY_REVENUE_DATA_STAGE""")

df.write.save_as_table("MONTHLY_REVENUE", mode="overwrite")

In [10]:
snow_df_revenue = session.table('monthly_revenue')
snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')
snow_df_revenue_per_month.show()

---------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"   |
---------------------------------
|2012    |5        |3264300.11  |
|2012    |6        |3208482.33  |
|2012    |7        |3311966.98  |
|2012    |8        |3311752.81  |
|2012    |9        |3208563.06  |
|2012    |10       |3334028.46  |
|2012    |11       |3185894.64  |
|2012    |12       |3334570.96  |
|2013    |1        |3316455.44  |
|2013    |2        |2995042.21  |
---------------------------------



### Join Total Spend and Total Revenue per Month
Next let's **join this revenue data with the transformed campaign spend data** so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training.

In [11]:
snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])
snow_df_spend_and_revenue_per_month.show()

----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
----------------------------------------------------------------------------------------
|2012    |5        |516431           |517618          |516729   |517208   |3264300.11  |
|2012    |6        |506497           |504679          |501098   |501947   |3208482.33  |
|2012    |7        |522780           |521395          |522762   |518405   |3311966.98  |
|2012    |8        |519959           |520537          |520685   |521584   |3311752.81  |
|2012    |9        |507211           |507404          |511364   |507363   |3208563.06  |
|2012    |10       |518942           |520863          |522768   |519950   |3334028.46  |
|2012    |11       |505715           |505221          |505292   |503748   |3185894.64  |
|2012    |12       |520148           |520711          |521427   |520724   |3334570.96  |
|2013    |1        |5

## >>> Examine Snowpark DataFrame Query and Execution Plan <<<
Snowpark makes is really convenient to look at the DataFrame query and execution plan using explain() Snowpark DataFrame function.

In [12]:
snow_df_spend_and_revenue_per_month.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT  *  FROM (( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH", "SEARCH_ENGINE" AS "SEARCH_ENGINE", "SOCIAL_MEDIA" AS "SOCIAL_MEDIA", "VIDEO" AS "VIDEO", "EMAIL" AS "EMAIL" FROM ( SELECT "YEAR", "MONTH", "'search_engine'" AS "SEARCH_ENGINE", "'social_media'" AS "SOCIAL_MEDIA", "'video'" AS "VIDEO", "'email'" AS "EMAIL" FROM ( SELECT  *  FROM ( SELECT  *  FROM ( SELECT "YEAR(DATE)" AS "YEAR", "MONTH(DATE)" AS "MONTH", "CHANNEL", "TOTAL_COST" FROM ( SELECT year("DATE") AS "YEAR(DATE)", month("DATE") AS "MONTH(DATE)", "CHANNEL", sum("TOTAL_COST") AS "TOTAL_COST" FROM ( SELECT  *  FROM campaign_spend) GROUP BY year("DATE"), month("DATE"), "CHANNEL")) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST) PIVOT (sum("TOTAL_COST") FOR "CHANNEL" IN ('search_engine', 'social_media', 'video', 'email'))) ORDER BY "YEAR" ASC NULLS FIRST, "MONTH" ASC NULLS FIRST)) AS SNOWPARK_LEFT INNER JOIN ( SELECT "YEAR" AS "YEAR", "MONTH" AS "MONTH"

## Model Training in Snowflake
Features and Target
At this point we are ready to perform the following actions to save features and target for model training.

Delete rows with missing values
Exclude columns we don't need for modeling
Save features into a Snowflake table called MARKETING_BUDGETS_FEATURES
TIP: To see how to handle missing values in Snowpark Python, refer to this [blog](https://medium.com/snowflake/handling-missing-values-with-snowpark-for-python-part-1-4af4285d24e6).

In [13]:
# Delete rows with missing values
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')
snow_df_spend_and_revenue_per_month.show()

---------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"REVENUE"   |
---------------------------------------------------------------------
|516431           |517618          |516729   |517208   |3264300.11  |
|506497           |504679          |501098   |501947   |3208482.33  |
|522780           |521395          |522762   |518405   |3311966.98  |
|519959           |520537          |520685   |521584   |3311752.81  |
|507211           |507404          |511364   |507363   |3208563.06  |
|518942           |520863          |522768   |519950   |3334028.46  |
|505715           |505221          |505292   |503748   |3185894.64  |
|520148           |520711          |521427   |520724   |3334570.96  |
|522151           |518635          |520583   |521167   |3316455.44  |
|467736           |474679          |469856   |469784   |2995042.21  |
---------------------------------------------------------------------



## Python function to train a Linear Regression model using scikit-learn
Let's create a Python function that uses scikit-learn and other packages which are already included in Snowflake Anaconda channel and therefore available on the server-side when executing the Python function as a Stored Procedure running in Snowflake.

This function takes the following as parameters:
* session: Snowflake Session object.
* features_table: Name of the table that holds the features and target variable.
* number_of_folds: Number of cross validation folds used in GridSearchCV.
* polynomial_features_degress: PolynomialFeatures as a preprocessing step.
* train_accuracy_threshold: Accuracy thresholds for train dataset. This values is used to determine if the model should be saved.
* test_accuracy_threshold: Accuracy thresholds for test dataset. This values is used to determine if the model should be saved.
* save_model: Boolean that determines if the model should be saved provided the accuracy thresholds are met.

TIP: For large datasets, Snowflake offers [Snowpark-optimized Warehouses](https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized.html) which are in Public Preview as of Nov 2022.

In [24]:
def train_revenue_prediction_model(
    session: Session, 
    features_table: str, 
    number_of_folds: int, 
    polynomial_features_degrees: int, 
    train_accuracy_threshold: float, 
    test_accuracy_threshold: float, 
    save_model: bool) -> Variant:
    
    import os

    from joblib import dump
    from sklearn.compose import ColumnTransformer
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import GridSearchCV, train_test_split
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import PolynomialFeatures, StandardScaler

    # Load features
    df = session.table(features_table).to_pandas()

    # Preprocess the Numeric columns
    # We apply PolynomialFeatures and StandardScaler preprocessing steps to the numeric columns
    # NOTE: High degrees can cause overfitting.
    numeric_features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    numeric_transformer = Pipeline(steps=[('poly',PolynomialFeatures(degree = polynomial_features_degrees)),('scaler', StandardScaler())])

    # Combine the preprocessed step together using the Column Transformer module
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numeric_transformer, numeric_features)])

    # The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
    pipeline = Pipeline(steps=[('preprocessor', preprocessor),('classifier', LinearRegression())])
    parameteres = {}

    X = df.drop('REVENUE', axis = 1)
    y = df['REVENUE']

    # Split dataset into training and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

    # Use GridSearch to find the best fitting model based on number_of_folds folds
    model = GridSearchCV(pipeline, param_grid=parameteres, cv=number_of_folds)

    model.fit(X_train, y_train)
    train_r2_score = model.score(X_train, y_train)
    test_r2_score = model.score(X_test, y_test)

    model_saved = False
    pipeline

    if save_model:
        if train_r2_score >= train_accuracy_threshold and test_r2_score >= test_accuracy_threshold:
            # Upload trained model to a stage
            model_output_dir = '/tmp'
            model_file = os.path.join(model_output_dir, 'model.joblib')
            dump(model, model_file)
            session.file.put(model_file,"@PYTHON_MODELS",overwrite=True)
            model_saved = True

    # Return model R2 score on train and test data
    return {"R2 score on Train": train_r2_score,
            "R2 threshold on Train": train_accuracy_threshold,
            "R2 score on Test": test_r2_score,
            "R2 threshold on Test": test_accuracy_threshold,
            "Model saved": model_saved}

### Test Python function before deploying it as a Stored Procedure on Snowflake
Since we're in test mode, we will set save_model = False so that the model is not saved just yet.

In [25]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = False

train_revenue_prediction_model(
    session,
    "MARKETING_BUDGETS_FEATURES",
    cross_validaton_folds,
    polynomial_features_degrees,
    train_accuracy_threshold,
    test_accuracy_threshold,
    save_model)

{'R2 score on Train': 0.9954552822793987,
 'R2 threshold on Train': 0.85,
 'R2 score on Test': 0.8817971097765084,
 'R2 threshold on Test': 0.85,
 'Model saved': False}

### Create Stored Procedure to deploy model training code on Snowflake
Assuming the testing is complete and we're satisfied with the model, let's register the model training Python function as a Snowpark Python Stored Procedure by supplying the packages (snowflake-snowpark-python,scikit-learn, and joblib) it will need and use during execution.

TIP: For more information on Snowpark Python Stored Procedures, refer to the [docs](https://docs.snowflake.com/developer-guide/stored-procedure/stored-procedures-python).

In [33]:
session.sproc.register(
    func=train_revenue_prediction_model,
    name="train_revenue_prediction_model",
    packages=['snowflake-snowpark-python','scikit-learn==1.1.1','joblib'],
    is_permanent=True,
    stage_location="@PYTHON_CODE",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7fb4989458e0>

### >>> Examine Query History in Snowsight <<<
### Execute Stored Procedure to train model and deploy it on Snowflake
Now we're ready to train the model and save it onto a Snowflake stage so let's set save_model = True and run/execute the Stored Procedure using session.call() function.

In [34]:
cross_validaton_folds = 10
polynomial_features_degrees = 2
train_accuracy_threshold = 0.85
test_accuracy_threshold = 0.85
save_model = True

print(session.call('train_revenue_prediction_model',
                    'MARKETING_BUDGETS_FEATURES',
                    cross_validaton_folds,
                    polynomial_features_degrees,
                    train_accuracy_threshold,
                    test_accuracy_threshold,
                    save_model))

{
  "Model saved": true,
  "R2 score on Test": 0.8817971097765288,
  "R2 score on Train": 0.9954552822793986,
  "R2 threshold on Test": 0.85,
  "R2 threshold on Train": 0.85
}


### >>> Examine Query History in Snowsight <<<
### Create Scalar User-Defined Function (UDF) for inference
Now to deploy this model for inference, let's create and register a Snowpark Python UDF and add the trained model as a dependency. Once registered, getting new predictions is as simple as calling the function by passing in data.

NOTE: Scalar UDFs operate on a single row / set of data points and are great for online inference in real-time. And this UDF is called from the Streamlit App. See Snowpark_Streamlit_Revenue_Prediction.py

TIP: For more information on Snowpark Python User-Defined Functions, refer to the docs.

In [35]:
session.clear_imports()
session.clear_packages()

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import('@PYTHON_MODELS/model.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn==1.1.1')

@udf(name='predict_roi',session=session,replace=True,is_permanent=True,stage_location='@PYTHON_CODE')
def predict_roi(budget_allocations: list) -> float:
    import sys

    import pandas as pd
    import sklearn
    from joblib import load

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + 'model.joblib.gz'
    model = load(model_file)
            
    features = ['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL']
    df = pd.DataFrame([budget_allocations], columns=features)
    roi = abs(model.predict(df)[0])
    return roi

### Call Scalar User-Defined Function (UDF) for inference on new data
Once the UDF is registered, getting new predictions is as simple as calling the call_udf() Snowpark Python function and passing in new datapoints.

Let's create a SnowPark DataFrame with some sample data and call the UDF to get new predictions.

NOTE: This UDF is also called from the Streamlit App. See Snowpark_Streamlit_Revenue_Prediction.py

In [36]:
test_df = session.create_dataframe([[250000,250000,200000,450000],[500000,500000,500000,500000],[8500,9500,2000,500]], 
                                    schema=['SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL'])
test_df.select(
    'SEARCH_ENGINE','SOCIAL_MEDIA','VIDEO','EMAIL', 
    call_udf("predict_roi", 
    array_construct(col("SEARCH_ENGINE"), col("SOCIAL_MEDIA"), col("VIDEO"), col("EMAIL"))).as_("PREDICTED_ROI")).show()

-----------------------------------------------------------------------------
|"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
-----------------------------------------------------------------------------
|250000           |250000          |200000   |450000   |4072491.441724832   |
|500000           |500000          |500000   |500000   |3179613.166194174   |
|8500             |9500            |2000     |500      |189866.83304576762  |
-----------------------------------------------------------------------------



**Snowpark Stored Procedures vs User-Defined Functions**

In general, if you're processing a large dataset in a way where each row/batch can be processed independently - UDFs are always better, because the processing is automatically parallelized/scaled across the warehouse. For example, if you already have a trained ML model, and you're doing inference using that model on billions of rows. In that case, each row/batch can be computed independently.

If the use case requires the full dataset to be in-memory (e.g. ML training), then a stored procedure is the way to go. A stored procedure is just a Python program that runs on a single warehouse node. (With a UDF it's not possible to load the full dataset into memory because the processing is done in a streaming fashion, one batch at a time.

### Automate Data Pipeline and Model (re)Training using Snowflake Tasks
We can also optionally create Snowflake (Serverless or User-managed) Tasks to automate data pipelining and (re)training of the model on a set schedule.

NOTE: Creating tasks using Snowpark Python API (instead of SQL) is on the roadmap. Stay tuned! Or, follow me on [Twitter]((https://twitter.com/iamontheinet)) to get the news before anyone else :)_

TIP: Amongst other things, you can also configure tasks for error notification (currently in Private Preview) using cloud messaging service such as Amazon Simple Notification Service (SNS). For more information on Snowflake Tasks, refer to the docs.

#### Create Python Function for Data Pipeline and Feature Engineering

In [37]:
def data_pipeline_feature_engineering(session: Session) -> str:

  # DATA TRANSFORMATIONS
  # Perform the following actions to transform the data

  # Load the campaign spend data
  snow_df_spend = session.table('campaign_spend')

  # Transform the data so we can see total cost per year/month per channel using group_by() and agg() Snowpark DataFrame functions
  snow_df_spend_per_channel = snow_df_spend.group_by(year('DATE'), month('DATE'),'CHANNEL').agg(sum('TOTAL_COST').as_('TOTAL_COST')).\
      with_column_renamed('"YEAR(DATE)"',"YEAR").with_column_renamed('"MONTH(DATE)"',"MONTH").sort('YEAR','MONTH')

  # Transform the data so that each row will represent total cost across all channels per year/month using pivot() and sum() Snowpark DataFrame functions
  snow_df_spend_per_month = snow_df_spend_per_channel.pivot('CHANNEL',['search_engine','social_media','video','email']).sum('TOTAL_COST').sort('YEAR','MONTH')
  snow_df_spend_per_month = snow_df_spend_per_month.select(
      col("YEAR"),
      col("MONTH"),
      col("'search_engine'").as_("SEARCH_ENGINE"),
      col("'social_media'").as_("SOCIAL_MEDIA"),
      col("'video'").as_("VIDEO"),
      col("'email'").as_("EMAIL")
  )

  # Load revenue table and transform the data into revenue per year/month using group_by and agg() functions
  snow_df_revenue = session.table('monthly_revenue')
  snow_df_revenue_per_month = snow_df_revenue.group_by('YEAR','MONTH').agg(sum('REVENUE')).sort('YEAR','MONTH').with_column_renamed('SUM(REVENUE)','REVENUE')

  # Join revenue data with the transformed campaign spend data so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training
  snow_df_spend_and_revenue_per_month = snow_df_spend_per_month.join(snow_df_revenue_per_month, ["YEAR","MONTH"])

  # SAVE FEATURES And TARGET
  # Perform the following actions to save features and target for model training

  # Delete rows with missing values
  snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.dropna()

  # Exclude columns we don't need for modeling
  snow_df_spend_and_revenue_per_month = snow_df_spend_and_revenue_per_month.drop(['YEAR','MONTH'])

  # Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
  snow_df_spend_and_revenue_per_month.write.mode('overwrite').save_as_table('MARKETING_BUDGETS_FEATURES')

  return "SUCCESS"

#### Create Stored Procedure to deploy data pipelining feature engineeering code on Snowflake
TIP: For more information on Snowpark Python Stored Procedures, refer to the docs.

In [38]:
session.sproc.register(
    func=data_pipeline_feature_engineering,
    name="data_pipeline_feature_engineering",
    packages=['snowflake-snowpark-python'],
    is_permanent=True,
    stage_location="@PYTHON_CODE",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7fb4992f8a90>

#### Execute Stored Procedure to deploy data pipelining feature engineeering code on Snowflake

In [39]:
print(session.call('data_pipeline_feature_engineering'))

SUCCESS


#### Create Root/Parent Snowflake Task: Data pipelining and feature engineeering

In [41]:
create_data_pipeline_feature_engineering_task = """
CREATE OR REPLACE TASK data_pipeline_feature_engineering_task
    WAREHOUSE = 'COMPUTE_WH'
    SCHEDULE  = '1 MINUTE'
AS
    CALL data_pipeline_feature_engineering()
"""
session.sql(create_data_pipeline_feature_engineering_task).collect()

[Row(status='Task DATA_PIPELINE_FEATURE_ENGINEERING_TASK successfully created.')]

#### Create Child/Dependent Snowflake Task: Model training on Snowflake

In [43]:
create_model_training_task = """
CREATE OR REPLACE TASK model_training_task
    WAREHOUSE = 'COMPUTE_WH'
    AFTER data_pipeline_feature_engineering_task
AS
    CALL train_revenue_prediction_model('MARKETING_BUDGETS_FEATURES',10,2,0.85,0.85,True)
"""
session.sql(create_model_training_task).collect()

[Row(status='Task MODEL_TRAINING_TASK successfully created.')]

#### Resume Tasks

In [44]:
session.sql("alter task model_training_task resume").collect()
session.sql("alter task data_pipeline_feature_engineering_task resume").collect()

[Row(status='Statement executed successfully.')]

#### Cleanup Resources

In [45]:
session.sql("alter task data_pipeline_feature_engineering_task suspend").collect()
session.sql("alter task model_training_task suspend").collect()

[Row(status='Statement executed successfully.')]

In [51]:

df1 = session.create_dataframe([[1, 2, 3, 4]], schema=["a", "b", "c", "d"])
df1.show()

-------------------------
|"A"  |"B"  |"C"  |"D"  |
-------------------------
|1    |2    |3    |4    |
-------------------------

