# Snowpark For Python -- Advertising Spend and ROI Prediction

### Objective

In this session, we will train a Linear Regression model to predict future ROI (Return On Investment) of variable ad spend budgets across multiple channels including search, video, social media, and email using Snowpark for Python and scikit-learn. 

In this Notebook, we will:

* Create a Session object and securely connect to Snowflake
* Load data from Snowflake table into Snowpark DataFrame
* Perform Exploratory Data Analysis (EDA) on Snowpark DataFrame
* Pivot and Join datasets
* Create a Python Stored Procedure to deploy model training code on Snowflake
* Create Python Scalar and Vectorized User-Defined Functions (UDF) for inference on new data
* Create a Snowflake Task to automate (re)training of the model


### Instructions

This workshop is meant to be hands-on and we've built in exercises throughout the notebook. Whenever you see **YOUR TURN**, this is your queue to complete the following exercise. If you get stuck, the solutions can be found in the *Snowpark_For_Python_Solution.ipynb* notebook in the same repository.

Additionally, when we create the **stored procedure**, **UDF**, and **Task** for model training and scoring, you'll need to add your initials or a unique identifier to the name. This is because we are using the same database and schema and don't want your functions overwriting eachother. There are reminders below as well when you get to that part of the code.

### Prerequisites

  - Log into snowflake account and switch to ACCOUNTADMIN role
    - Click on the **Billing** on the left side panel
    - Click on [Terms and Billing](https://app.snowflake.com/terms-and-billing)
    - Read and accept terms to continue with the workshop
  - Python 3.8
  - Create and Activate Conda Environment (OR, use any other Python environment with Python 3.8) 
    - conda create --name snowpark -c https://repo.anaconda.com/pkgs/snowflake python=3.8
    - conda activate snowpark
  - Install Snowpark for Python, Streamlit and other libraries in Conda environment
    - conda install -c https://repo.anaconda.com/pkgs/snowflake snowflake-snowpark-python pandas notebook scikit-learn cachetools nbformat plotly
  - Update [connection.json](connection.json) with your Snowflake account details and credentials

<div style='text-align: center'>
    <img src="assets/snowpark.png" alt="Snowpark" style="width: 75%;"/>
</div>

### Import Libraries

In [51]:
# Snowpark for Python
from snowflake.snowpark.session import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark.version import VERSION
from snowflake.snowpark.functions import col

# Misc
import json
import sys
import logging

import numpy as np
import plotly.express as px

logger = logging.getLogger("snowflake.snowpark.session")
logger.setLevel(logging.ERROR)

### Establish Secure Connection to Snowflake

Using the Snowpark API, it’s quick and easy to establish a secure connection between Snowflake and Notebook.

 *Connection options: Username/Password, MFA, OAuth, Okta, SSO*

In [42]:
#Test Code
# Create Snowflake Session object
connection_parameters = json.load(open('connection.json'))
session = Session.builder.configs(connection_parameters).create()
#session.sql_simplifier_enabled = True

#session.sql('use role snowpark_workshop_role').collect()
session.sql('use warehouse wh_snowpark_hol').collect()
#session.sql('use database DB_SNOWPARK_HOL').collect()
#session.sql('use SCHEMA ROI_PREDICTION').collect()

snowflake_environment = session.sql(
    '''select
        current_user()
        ,current_role()
        ,current_database()
        ,current_schema()
        ,current_version()
        ,current_warehouse()'''
    ).collect()

snowpark_version = VERSION

# Current Environment Details
print(f'User: {snowflake_environment[0][0]}')
print(f'Role: {snowflake_environment[0][1]}')
print(f"Database: {snowflake_environment[0][2]}")
print(f'Snowflake version: {snowflake_environment[0][4]}')
print(f"Schema: {snowflake_environment[0][3]}")
print(f"Warehouse: {snowflake_environment[0][5]}")
print(f'Snowpark for Python version: {snowpark_version[0]}.{snowpark_version[1]}.{snowpark_version[2]}')



User: JOHN
Role: SNOWPARK_WORKSHOP_ROLE
Database: DB_SNOWPARK_HOL
Snowflake version: 7.33.1
Schema: ROI_PREDICTION
Warehouse: WH_SNOWPARK_HOL
Snowpark for Python version: 1.4.0


### Load Aggregated Campaign Spend Data from Snowflake table into Snowpark DataFrame

Let's first load the campaign spend data. This table contains ad click data that has been aggregated to show daily spend across digital ad channels including search engines, social media, email and video.

NOTE: Ways to load data in a Snowpark Dataframe
* ```session.table("db.schema.table")```
* ```session.sql("select col1, col2... from tableName")```
* ```session.read.parquet("@stageName/path/to/file")```
* ```session.create_dataframe([1,2,3], schema=["col1"])```

TIP: For more information on Snowpark DataFrames, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.html#snowflake.snowpark.DataFrame).


In [52]:
snow_df_spend = session.table('CAMPAIGN_SPEND')
snow_df_spend.sort(col("DATE").desc()).limit(1).show()


-------------------------------------------------------------------------------------------
|"CAMPAIGN"       |"CHANNEL"  |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
-------------------------------------------------------------------------------------------
|youth_on_course  |email      |2022-05-12  |89              |653           |176           |
-------------------------------------------------------------------------------------------



In [45]:
snow_df_spend.order_by(cols("date").desc()).show(1)

AttributeError: 'str' object has no attribute 'desc'

<div style='text-align: center'>
    <img src="assets/snowpark_python_api.png" alt="Snowpark" style="width: 75%;"/>
</div>

In [6]:
# Action sends the DF SQL for execution
# Note: history object provides the query ID which can be helpful for debugging as well as the SQL query executed on the server
with session.query_history() as history:
    snow_df_spend.show(20)

history.queries

------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |
------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1205          |426           |
|sports_across_cultures  |video          |2012-06-02  |87              |454           |157           |
|building_community      |search_engine  |2012-06-03  |66              |354           |134           |
|world_series            |social_media   |2017-12-28  |72              |323           |149           |
|winter_sports           |email          |2018-02-09  |252             |1706          |473           |
|spring_break            |video          |2017-11-14  |162             |852           |304           |
|nba_finals              |email          |2017-11-22  |68              |4

[QueryRecord(query_id='01af405a-0604-bf7d-003d-7a870090fbfa', sql_text='SELECT  *  FROM CAMPAIGN_SPEND LIMIT 20')]

### Pandas vs Snowpark DataFrame memory comparison

Snowpark and Pandas DataFrame are similar but do have some notable differences.

One of the biggest differences is the amount of local memory consumed. Run the cell block below to see firsthand.

In [7]:
# get the size in MB of a dataframe
def get_df_memory_size(df) -> float:
    return np.round(sys.getsizeof(df) / (1024.0**2), 2)

pandas_df_spend = snow_df_spend.to_pandas()

print(f'Size in MB of Snowpark DataFrame in Memory: {get_df_memory_size(snow_df_spend)}')
print(f'Size in MB of Pandas DataFrame in Memory: {get_df_memory_size(pandas_df_spend)}')

Size in MB of Snowpark DataFrame in Memory: 0.0
Size in MB of Pandas DataFrame in Memory: 50.88


The Snowpark DataFrame consumes **zero** in memory MB - it is stored entirely within Snowflake. Whereas **the entire** Pandas DataFrame is stored in local memory

*Note: the ```.to_pandas()``` method is called to convert any Snowpark DataFrame to a Pandas DataFrame -- we'll use this method multiple times throughout the rest of the workshop*

### Creating new columns
We can add new columns to our Snowpark DataFrame using the ```.with_column()``` method.

Here, we are adding two new columns: *MONTH* and *YEAR* which represent the month and year an a campaign was run

In [8]:
snow_df_spend=snow_df_spend.with_column("YEAR", F.year(snow_df_spend["DATE"]))
snow_df_spend=snow_df_spend.with_column("MONTH", F.month(snow_df_spend["DATE"]))

snow_df_spend.show()

-------------------------------------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"ADS_SERVED"  |"YEAR"  |"MONTH"  |
-------------------------------------------------------------------------------------------------------------------------
|winter_sports           |video          |2012-06-03  |213             |1205          |426           |2012    |6        |
|sports_across_cultures  |video          |2012-06-02  |87              |454           |157           |2012    |6        |
|building_community      |search_engine  |2012-06-03  |66              |354           |134           |2012    |6        |
|world_series            |social_media   |2017-12-28  |72              |323           |149           |2017    |12       |
|winter_sports           |email          |2018-02-09  |252             |1706          |473           |2018    |2        |
|spring_break           

**YOUR TURN**: add a new column called *COST_PER_CLICK* which is the result of *TOTAL_COST / TOTAL_CLICKS*

*Expected output*:
```
    ------------------------------------------------------------------------------------------
    |"CAMPAIGN"              |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"COST_PER_CLICK"  |
    ------------------------------------------------------------------------------------------
    |winter_sports           |2012-06-03  |213             |1762          |8.272300          |
    |sports_across_cultures  |2012-06-02  |87              |678           |7.793103          |
    |building_community      |2012-06-03  |66              |471           |7.136364          |
    |world_series            |2017-12-28  |72              |591           |8.208333          |
    |winter_sports           |2018-02-09  |252             |1841          |7.305556          |
    ------------------------------------------------------------------------------------------
```

In [9]:
snow_df_spend=snow_df_spend.with_column("COST_PER_CLICK", snow_df_spend["TOTAL_COST"]/snow_df_spend["TOTAL_CLICKS"])
snow_df_spend.select("CAMPAIGN","DATE","TOTAL_CLICKS","TOTAL_COST","COST_PER_CLICK").show(5)

------------------------------------------------------------------------------------------
|"CAMPAIGN"              |"DATE"      |"TOTAL_CLICKS"  |"TOTAL_COST"  |"COST_PER_CLICK"  |
------------------------------------------------------------------------------------------
|winter_sports           |2012-06-03  |213             |1205          |5.657277          |
|sports_across_cultures  |2012-06-02  |87              |454           |5.218391          |
|building_community      |2012-06-03  |66              |354           |5.363636          |
|world_series            |2017-12-28  |72              |323           |4.486111          |
|winter_sports           |2018-02-09  |252             |1706          |6.769841          |
------------------------------------------------------------------------------------------



We can also create new columns using *if-then-else* logic by using the ```.iff()``` method.

Suppose we want to flag a campain as either being focused on **sports** or something else. We can can do so by running the code block below

In [10]:
sports_campaigns=[
    'winter_sports', 'sports_across_cultures',
    'world_series', 'nba_finals', 'uefa',
    'thanksgiving_football', 'women_in_sports',
    'winter_olympics', 'stanley_cup', 'world_cup',
    'super_bowl', 'summer_olympics'
]

snow_df_spend=snow_df_spend.with_column("IS_SPORTS", F.iff(snow_df_spend["CAMPAIGN"].isin(sports_campaigns), 1, 0))
snow_df_spend.select("CAMPAIGN","CHANNEL","IS_SPORTS").show(10)

--------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"IS_SPORTS"  |
--------------------------------------------------------
|winter_sports           |video          |1            |
|sports_across_cultures  |video          |1            |
|building_community      |search_engine  |0            |
|world_series            |social_media   |1            |
|winter_sports           |email          |1            |
|spring_break            |video          |0            |
|nba_finals              |email          |1            |
|winter_sports           |social_media   |1            |
|spring_break            |search_engine  |0            |
|uefa                    |video          |1            |
--------------------------------------------------------



Snowpark supports enhancing DataFrames in a number of different ways. The code block below demonstrates using the ```.when()``` method to add a column indicating the season in which an ad was run.

*Note: this is synonomous with a ```CASE-WHEN``` expression in SQL*

In [11]:
seasons=(
    F.when(snow_df_spend["MONTH"].isin(12, 1, 2),"winter")
    .when(snow_df_spend["MONTH"].isin(3, 4, 5),"spring")
    .when(snow_df_spend["MONTH"].isin(6, 7, 8),"summer")
    .when(snow_df_spend["MONTH"].isin(9, 10, 11),"fall")
)

snow_df_spend=snow_df_spend.with_column("SEASON", seasons)
snow_df_spend.select("CAMPAIGN","DATE","SEASON").show(12)

--------------------------------------------------
|"CAMPAIGN"              |"DATE"      |"SEASON"  |
--------------------------------------------------
|winter_sports           |2012-06-03  |summer    |
|sports_across_cultures  |2012-06-02  |summer    |
|building_community      |2012-06-03  |summer    |
|world_series            |2017-12-28  |winter    |
|winter_sports           |2018-02-09  |winter    |
|spring_break            |2017-11-14  |fall      |
|nba_finals              |2017-11-22  |fall      |
|winter_sports           |2018-03-10  |spring    |
|spring_break            |2017-08-30  |summer    |
|uefa                    |2017-09-30  |fall      |
|uefa                    |2018-01-23  |winter    |
|sports_across_cultures  |2017-10-12  |fall      |
--------------------------------------------------



**YOUR TURN**: suppose there was a reporting error in the winter season and the total cost of each ad was accidentally entered 10% too high.

Use the `.iff()` and `.with_column()` methods to overwrite the TOTAL_COST column so that is 10% lower for any ad run in the winter season.

_Expected output:_
```
    --------------------------------------------------------------------------
    |"CAMPAIGN"              |"CHANNEL"      |"SEASON"  |"TOTAL_COST"        |
    --------------------------------------------------------------------------
    |winter_sports           |video          |summer    |1205.0              |
    |sports_across_cultures  |video          |summer    |454.0               |
    |building_community      |search_engine  |summer    |354.0               |
    |world_series            |social_media   |winter    |261.63              |
    |winter_sports           |email          |winter    |1381.8600000000001  |
    |spring_break            |video          |fall      |852.0               |
    |nba_finals              |email          |fall      |417.0               |
    |winter_sports           |social_media   |spring    |917.0               |
    |spring_break            |search_engine  |summer    |1170.0              |
    |uefa                    |video          |fall      |543.0               |
    --------------------------------------------------------------------------
```

In [12]:
snow_df_spend=snow_df_spend.with_column("TOTAL_COST", F.iff(F.col("SEASON")=="season", F.col("TOTAL_COST")*0.9, F.col("TOTAL_COST")))
snow_df_spend.select("CAMPAIGN","CHANNEL","SEASON","TOTAL_COST").show(10)

--------------------------------------------------------------------
|"CAMPAIGN"              |"CHANNEL"      |"SEASON"  |"TOTAL_COST"  |
--------------------------------------------------------------------
|winter_sports           |video          |summer    |1205.0        |
|sports_across_cultures  |video          |summer    |454.0         |
|building_community      |search_engine  |summer    |354.0         |
|world_series            |social_media   |winter    |323.0         |
|winter_sports           |email          |winter    |1706.0        |
|spring_break            |video          |fall      |852.0         |
|nba_finals              |email          |fall      |417.0         |
|winter_sports           |social_media   |spring    |917.0         |
|spring_break            |search_engine  |summer    |1170.0        |
|uefa                    |video          |fall      |543.0         |
--------------------------------------------------------------------



### Aggregating Snowpark DataFrames
Now, that we have an idea for what our dataset looks like, let's use different Python and Snowpark to derive useful information from the raw data

First, we'll take a look at the total cost per ad channel

In [13]:
snow_df_spend_per_channel=(
    snow_df_spend
        .group_by("CHANNEL")
        .agg(F.sum("TOTAL_COST").as_("TOTAL_COST"))
)

snow_df_spend_per_channel.show()

--------------------------------
|"CHANNEL"      |"TOTAL_COST"  |
--------------------------------
|video          |48768300.0    |
|search_engine  |58499792.0    |
|social_media   |32057328.0    |
|email          |55729550.0    |
--------------------------------



Additionally, if we convert our Snowpark DataFrames to Pandas, we have the ability to create graphs and visualizations using any standard Python plotting library.

Here we are using the ```plotly.express``` library to create a bar chart of total cost per channel.

In [14]:
df=snow_df_spend_per_channel.to_pandas()
fig=px.bar(df, x="CHANNEL", y="TOTAL_COST")
fig.show()

##### Aggregating multiple columns
Just like with Pandas, Snowpark DataFrames support grouping and aggregating multiple columns.

This first code block creates a DataFrame which contains the total cost, average cost, and number of campaigns run for each channel

TIP: For a full list of functions, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.functions.html#module-snowflake.snowpark.functions).

In [15]:
# Stats per Month per Channel
snow_df_annual_spend_per_channel=(
    snow_df_spend
        .group_by(
            "YEAR",
            "CHANNEL"
        )
        .agg(
            F.sum("TOTAL_COST").as_("TOTAL_COST"),
            F.avg("COST_PER_CLICK").as_("AVG_TOTAL_COST_PRE_CLICK")
        )
        .sort("YEAR","CHANNEL")
)
snow_df_annual_spend_per_channel.show()

----------------------------------------------------------------------
|"YEAR"  |"CHANNEL"      |"TOTAL_COST"  |"AVG_TOTAL_COST_PRE_CLICK"  |
----------------------------------------------------------------------
|2012    |email          |3087011.0     |5.879600102857              |
|2012    |search_engine  |3243871.0     |6.158964406939              |
|2012    |social_media   |1772916.0     |3.372041736531              |
|2012    |video          |2704568.0     |5.124234212653              |
|2013    |email          |4920381.0     |6.265200765068              |
|2013    |search_engine  |5139688.0     |6.573069845616              |
|2013    |social_media   |2820923.0     |3.594761708767              |
|2013    |video          |4290952.0     |5.474877393973              |
|2014    |email          |5210519.0     |6.653672479041              |
|2014    |search_engine  |5477174.0     |6.998721637260              |
----------------------------------------------------------------------



In [16]:
df=snow_df_annual_spend_per_channel.to_pandas()

fig=px.line(
    df,
    x="YEAR",
    y="TOTAL_COST",
    color="CHANNEL",
    markers=True
)

fig.show()

**YOUR TURN**: Create a DataFrame which shows the total cost per month per channel. Be sure to include the *SEASON* column in the *group by* expression
  
*Expected output*:
```
    --------------------------------------------------------------
    |"YEAR"  |"MONTH"  |"SEASON"  |"CHANNEL"      |"TOTAL_COST"  |
    --------------------------------------------------------------
    |2012    |5        |spring    |email          |517208        |
    |2012    |5        |spring    |social_media   |517618        |
    |2012    |5        |spring    |video          |516729        |
    |2012    |5        |spring    |search_engine  |516431        |
    |2012    |6        |summer    |video          |501098        |
    |2012    |6        |summer    |search_engine  |506497        |
    |2012    |6        |summer    |social_media   |504679        |
    |2012    |6        |summer    |email          |501947        |
    |2012    |7        |summer    |video          |522762        |
    |2012    |7        |summer    |social_media   |521395        |
    --------------------------------------------------------------
```

In [17]:
snow_df_monthly_spend_per_channel=(
    snow_df_spend
        .group_by(
            "YEAR",
            "MONTH",
            "SEASON",
            "CHANNEL"
        )
        .agg(
            F.sum("TOTAL_COST").as_("TOTAL_COST"),
        )
        .sort("YEAR","MONTH")
)

snow_df_monthly_spend_per_channel.show(10)

--------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"CHANNEL"      |"TOTAL_COST"  |
--------------------------------------------------------------
|2012    |5        |spring    |email          |387879.0      |
|2012    |5        |spring    |social_media   |220327.0      |
|2012    |5        |spring    |video          |338111.0      |
|2012    |5        |spring    |search_engine  |407353.0      |
|2012    |6        |summer    |video          |328777.0      |
|2012    |6        |summer    |search_engine  |398845.0      |
|2012    |6        |summer    |social_media   |217760.0      |
|2012    |6        |summer    |email          |378347.0      |
|2012    |7        |summer    |video          |344932.0      |
|2012    |7        |summer    |social_media   |226811.0      |
--------------------------------------------------------------



### Filtering Snowpark DataFrames
By using the ```.filter()``` method, we can filter Snowpark DataFrames just like in Pandas.

Here we are filtering our data to only include the *video* channel

In [18]:
snow_df_monthly_spend_per_channel.filter(F.col("CHANNEL") == "video").show()

----------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"CHANNEL"  |"TOTAL_COST"  |
----------------------------------------------------------
|2012    |5        |spring    |video      |338111.0      |
|2012    |6        |summer    |video      |328777.0      |
|2012    |7        |summer    |video      |344932.0      |
|2012    |8        |summer    |video      |343487.0      |
|2012    |9        |fall      |video      |334327.0      |
|2012    |10       |fall      |video      |342815.0      |
|2012    |11       |fall      |video      |330795.0      |
|2012    |12       |winter    |video      |341324.0      |
|2013    |1        |winter    |video      |364365.0      |
|2013    |2        |winter    |video      |329701.0      |
----------------------------------------------------------



**YOUR TURN**: Show months where *TOTAL_COST* value was less than $510,000

*Expected output:*
```
    ---------------------------------------------------
    |"YEAR"  |"MONTH"  |"CHANNEL"      |"TOTAL_COST"  |
    ---------------------------------------------------
    |2012    |6        |video          |501098        |
    |2012    |6        |email          |501947        |
    |2012    |6        |search_engine  |506497        |
    |2012    |6        |social_media   |504679        |
    |2012    |9        |email          |507363        |
    |2012    |9        |social_media   |507404        |
    |2012    |9        |search_engine  |507211        |
    |2012    |11       |video          |505292        |
    |2012    |11       |email          |503748        |
    |2012    |11       |search_engine  |505715        |
    ---------------------------------------------------
```

In [19]:
# Filter our dataframe on total cost less than $510,000
dff=snow_df_monthly_spend_per_channel.filter(F.col("TOTAL_COST") < 510000)
dff.show()

--------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"CHANNEL"      |"TOTAL_COST"  |
--------------------------------------------------------------
|2012    |5        |spring    |email          |387879.0      |
|2012    |5        |spring    |social_media   |220327.0      |
|2012    |5        |spring    |video          |338111.0      |
|2012    |5        |spring    |search_engine  |407353.0      |
|2012    |6        |summer    |video          |328777.0      |
|2012    |6        |summer    |search_engine  |398845.0      |
|2012    |6        |summer    |social_media   |217760.0      |
|2012    |6        |summer    |email          |378347.0      |
|2012    |7        |summer    |video          |344932.0      |
|2012    |7        |summer    |social_media   |226811.0      |
--------------------------------------------------------------



**YOUR TURN**: Show months where *TOTAL_COST* was less than $510,000 only for the *social_media* channel

*Hint: multiple logical expressions can be chained together using the ```&``` and ```|``` operators*

*Expected output:*
```
    --------------------------------------------------
    |"YEAR"  |"MONTH"  |"CHANNEL"     |"TOTAL_COST"  |
    --------------------------------------------------
    |2012    |6        |social_media  |504679        |
    |2012    |9        |social_media  |507404        |
    |2012    |11       |social_media  |505221        |
    |2013    |2        |social_media  |474679        |
    |2013    |4        |social_media  |505296        |
    |2013    |6        |social_media  |502155        |
    |2013    |9        |social_media  |503883        |
    |2013    |11       |social_media  |505537        |
    |2014    |2        |social_media  |470203        |
    |2014    |4        |social_media  |501089        |
    --------------------------------------------------
```

In [20]:
# Place an additional filter on the channel column to look at social media only
dff=snow_df_monthly_spend_per_channel.filter(
    (F.col("TOTAL_COST") < 510000) & 
    (F.col("CHANNEL") == "social_media")
)

dff.show()

-------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"CHANNEL"     |"TOTAL_COST"  |
-------------------------------------------------------------
|2012    |5        |spring    |social_media  |220327.0      |
|2012    |6        |summer    |social_media  |217760.0      |
|2012    |7        |summer    |social_media  |226811.0      |
|2012    |8        |summer    |social_media  |223602.0      |
|2012    |9        |fall      |social_media  |219440.0      |
|2012    |10       |fall      |social_media  |224430.0      |
|2012    |11       |fall      |social_media  |217140.0      |
|2012    |12       |winter    |social_media  |223406.0      |
|2013    |1        |winter    |social_media  |238459.0      |
|2013    |2        |winter    |social_media  |217905.0      |
-------------------------------------------------------------



**YOUR TURN**: Create a Snowpark Dataframe which shows the total cost per year but only for video and email channels

_Expected output:_
```
    -------------------------  
    |"YEAR"  |"TOTAL_COST"  |  
    -------------------------  
    |2012    |8233054       |  
    |2013    |12268378      |  
    |2014    |12261513      |  
    |2015    |12263271      |  
    |2016    |12301574      |  
    |2017    |12266399      |  
    |2018    |12261590      |  
    |2019    |12265989      |  
    |2021    |12258125      |  
    |2022    |4425672       |  
    -------------------------  
```

In [40]:
dff=(
    snow_df_monthly_spend_per_channel
        .filter(F.col("CHANNEL").isin(["video","email"]))
        .group_by("YEAR")
        .agg(F.sum("TOTAL_COST").as_("TOTAL_COST"))
)

dff.show()

SnowparkSQLException: (1304): 390114 (08001): Authentication token has expired.  The user must authenticate again.

### Pivot on Channel

 Let's further transform the campaign spend data using the ```.pivot()``` and ```.sum()``` Snowpark functions. This transformation results in a DataFrame where **the sum of each row represents the total cost across all channels** per month.

 Pivoting our DataFrame this way enables us to join with the revenue table (which we will load in the next section) such that we will have our input features and target variable in a single table for model training.

 TIP: For a full list of functions, refer to the [docs](https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/_autosummary/snowflake.snowpark.functions.html#module-snowflake.snowpark.functions).

In [22]:
snow_df_spend_per_month_pivot=(
    snow_df_monthly_spend_per_channel
        .pivot(
            pivot_col="CHANNEL",
            values=["search_engine","social_media","video","email"]
        )
        .sum("TOTAL_COST")
        .sort("YEAR","MONTH")
    )

snow_df_spend_per_month_pivot.show()

----------------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"'search_engine'"  |"'social_media'"  |"'video'"  |"'email'"  |
----------------------------------------------------------------------------------------------
|2012    |5        |spring    |407353.0           |220327.0          |338111.0   |387879.0   |
|2012    |6        |summer    |398845.0           |217760.0          |328777.0   |378347.0   |
|2012    |7        |summer    |412642.0           |226811.0          |344932.0   |386480.0   |
|2012    |8        |summer    |410282.0           |223602.0          |343487.0   |393446.0   |
|2012    |9        |fall      |399291.0           |219440.0          |334327.0   |382305.0   |
|2012    |10       |fall      |408286.0           |224430.0          |342815.0   |389974.0   |
|2012    |11       |fall      |397088.0           |217140.0          |330795.0   |377729.0   |
|2012    |12       |winter    |410084.0           

Notice that the names of our pivoted columns are lower cased and enclosed in single quotes - this will cause some problems later on when we use this data to train our regression model.

We will rename the columns below to make them more "model-friendly"

In [23]:
snow_df_spend_per_month_pivot=(
    snow_df_spend_per_month_pivot        
        .with_column_renamed(F.col("'search_engine'"),"SEARCH_ENGINE")
        .with_column_renamed(F.col("'social_media'"),"SOCIAL_MEDIA")
        .with_column_renamed(F.col("'video'"),"VIDEO")
        .with_column_renamed(F.col("'email'"),"EMAIL")
)

snow_df_spend_per_month_pivot.show()

----------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"   |"EMAIL"   |
----------------------------------------------------------------------------------------
|2012    |5        |spring    |407353.0         |220327.0        |338111.0  |387879.0  |
|2012    |6        |summer    |398845.0         |217760.0        |328777.0  |378347.0  |
|2012    |7        |summer    |412642.0         |226811.0        |344932.0  |386480.0  |
|2012    |8        |summer    |410282.0         |223602.0        |343487.0  |393446.0  |
|2012    |9        |fall      |399291.0         |219440.0        |334327.0  |382305.0  |
|2012    |10       |fall      |408286.0         |224430.0        |342815.0  |389974.0  |
|2012    |11       |fall      |397088.0         |217140.0        |330795.0  |377729.0  |
|2012    |12       |winter    |410084.0         |223406.0        |341324.0  |390851.0  |
|2013    |1        |w

### Total Revenue per Month

Now let's load the revenue table

**YOUR TURN**: complete the code block below to load a table from Snowflake called ```monthly_revenue```. Save this table as a Snowpark DataFrame named ```snow_df_monthly_revenue```


*Expected output*:
```
    ---------------------------------
    |"YEAR"  |"MONTH"  |"REVENUE"   |
    ---------------------------------
    |2012    |5        |3264300.11  |
    |2012    |6        |3208482.33  |
    |2012    |7        |3311966.98  |
    |2012    |8        |3311752.81  |
    |2012    |9        |3208563.06  |
    |2012    |10       |3334028.46  |
    |2012    |11       |3185894.64  |
    |2012    |12       |3334570.96  |
    |2013    |1        |3316455.44  |
    |2013    |2        |2995042.21  |
    ---------------------------------
```

In [24]:
snow_df_monthly_revenue=session.table('MONTHLY_REVENUE').sort("YEAR","MONTH")
snow_df_monthly_revenue.show()

-----------------------------------------
|"YEAR"  |"MONTH"  |"REVENUE"           |
-----------------------------------------
|2012    |5        |2480868.0836        |
|2012    |6        |2438446.5708        |
|2012    |7        |2351496.5557999997  |
|2012    |8        |2483814.6075        |
|2012    |9        |2342251.0338        |
|2012    |10       |2367160.2065999997  |
|2012    |11       |2357562.0336        |
|2012    |12       |2500928.2199999997  |
|2013    |1        |2686328.9064        |
|2013    |2        |2425984.1901000002  |
-----------------------------------------



### Join Total Spend and Total Revenue per Month

Joining Snowpark DataFrames works in the same way as Pandas or SQL tables.

Next let's **join this revenue data with the transformed campaign spend data** so that our input features (i.e. cost per channel) and target variable (i.e. revenue) can be loaded into a single table for model training. 

In [25]:
snow_df_monthly_spend_and_revenue=(
    snow_df_spend_per_month_pivot.join(snow_df_monthly_revenue, ["YEAR","MONTH"])
)

snow_df_monthly_spend_and_revenue.show()

-------------------------------------------------------------------------------------------------------------
|"YEAR"  |"MONTH"  |"SEASON"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"   |"EMAIL"   |"REVENUE"           |
-------------------------------------------------------------------------------------------------------------
|2012    |5        |spring    |407353.0         |220327.0        |338111.0  |387879.0  |2480868.0836        |
|2012    |6        |summer    |398845.0         |217760.0        |328777.0  |378347.0  |2438446.5708        |
|2012    |7        |summer    |412642.0         |226811.0        |344932.0  |386480.0  |2351496.5557999997  |
|2012    |8        |summer    |410282.0         |223602.0        |343487.0  |393446.0  |2483814.6075        |
|2012    |9        |fall      |399291.0         |219440.0        |334327.0  |382305.0  |2342251.0338        |
|2012    |10       |fall      |408286.0         |224430.0        |342815.0  |389974.0  |2367160.2065999997  |
|2012    |

## >>>>>>>>>> *CHECKPOINT : PAUSE HERE BEFORE MOVING ON* <<<<<<<<<<

### Model Training in Snowflake 

#### Features and Target

At this point we are ready to perform the following actions to create features and target for model training.

* Delete rows with missing values
* Exclude columns we don't need for modeling
* Save features into a Snowflake table called MARKETING_BUDGETS_FEATURES

TIP: To see how to handle missing values in Snowpark Python, refer to this [blog](https://medium.com/snowflake/handling-missing-values-with-snowpark-for-python-part-1-4af4285d24e6).

In [26]:
inits=input("Enter your initials")

In [27]:
# Delete rows with missing values
snow_df_spend_and_revenue_per_month_clean = snow_df_monthly_spend_and_revenue.dropna()

# Exclude columns we don't need for modeling
snow_df_spend_and_revenue_per_month_clean = snow_df_spend_and_revenue_per_month_clean.drop(['YEAR'])

# Save features into a Snowflake table call MARKETING_BUDGETS_FEATURES
snow_df_spend_and_revenue_per_month_clean.write.mode('overwrite').save_as_table(f'MARKETING_BUDGETS_FEATURES_{inits}')
snow_df_spend_and_revenue_per_month_clean.show()

----------------------------------------------------------------------------------------------------
|"MONTH"  |"SEASON"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"   |"EMAIL"   |"REVENUE"           |
----------------------------------------------------------------------------------------------------
|5        |spring    |407353.0         |220327.0        |338111.0  |387879.0  |2480868.0836        |
|6        |summer    |398845.0         |217760.0        |328777.0  |378347.0  |2438446.5708        |
|7        |summer    |412642.0         |226811.0        |344932.0  |386480.0  |2351496.5557999997  |
|8        |summer    |410282.0         |223602.0        |343487.0  |393446.0  |2483814.6075        |
|9        |fall      |399291.0         |219440.0        |334327.0  |382305.0  |2342251.0338        |
|10       |fall      |408286.0         |224430.0        |342815.0  |389974.0  |2367160.2065999997  |
|11       |fall      |397088.0         |217140.0        |330795.0  |377729.0  |2357562.0336

**YOUR TURN**: How would you save another "copy" of this dataframe to a table to do further analysis?

In [28]:
# Try it using the method above (save_as_table)
snow_df_spend_and_revenue_per_month_clean.write.mode("overwrite").save_as_table(f"MARKETING_BUDGETS_FEATURES_ANALYSIS_{inits}")

#### Python function to train a Linear Regression model using scikit-learn

Let's create a Python function that uses **scikit-learn and other packages which are already included in** [Snowflake Anaconda channel](https://repo.anaconda.com/pkgs/snowflake/) and therefore available on the server-side when executing the Python function as a Stored Procedure running in Snowflake.

This function takes the following as parameters:

* _session_: Snowflake Session object.
* _features_table_: Name of the table that holds the features and target variable.
* _number_of_folds_: Number of cross validation folds used in GridSearchCV.
* _polynomial_features_degress_: PolynomialFeatures as a preprocessing step.
* _train_accuracy_threshold_: Accuracy thresholds for train dataset. This values is used to determine if the model should be saved.
* _test_accuracy_threshold_: Accuracy thresholds for test dataset. This values is used to determine if the model should be saved.
* _save_model_: Boolean that determines if the model should be saved provided the accuracy thresholds are met.

TIP: For large datasets, Snowflake offers [Snowpark-optimized Warehouses](https://docs.snowflake.com/en/user-guide/warehouses-snowpark-optimized.html) or in-memory storage



In [30]:

def train_revenue_prediction_model(
    session: Session, 
    features_table: str, 
    numeric_features: list,
    categorical_features: list,
    number_of_folds: int,
    train_accuracy_threshold: float, 
    test_accuracy_threshold: float, 
    save_model: bool) -> str:
    
    from sklearn.compose import ColumnTransformer
    from sklearn.pipeline import Pipeline
    from sklearn.preprocessing import StandardScaler, OneHotEncoder
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split, GridSearchCV

    import os
    from joblib import dump

    # Load features
    df = session.table(features_table).to_pandas()

    # Preprocess the Numeric columns
    numeric_transformer = Pipeline(
        steps=[
            ('scaler', StandardScaler())
        ]
    )

    # one hot encode categorical columns
    categorical_transformer=Pipeline(
        steps=[
            ("ohe", OneHotEncoder(handle_unknown="ignore"))
        ]
    )

    # Combine the preprocessed step together using the Column Transformer module
    preprocessor = ColumnTransformer(
        transformers=[
            ("num", numeric_transformer, numeric_features),
            ("cat", categorical_transformer, categorical_features)
        ]
    )

    # The next step is the integrate the features we just preprocessed with our Machine Learning algorithm to enable us to build a model
    pipeline = Pipeline(
        steps=[
            ("preprocessor", preprocessor),
            ("classifier", LinearRegression())
        ]
    )

    # define dataframes for features (X) and target (y)
    X = df.drop('REVENUE', axis = 1)
    y = df['REVENUE']

    # Split dataset into training and test
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state = 42)

    # Use GridSearch to find the best fitting model based on number_of_folds folds
    model = GridSearchCV(pipeline, param_grid={}, cv=number_of_folds)

    
    model.fit(X_train, y_train)
    
    train_r2_score = model.score(X_train, y_train)
    test_r2_score = model.score(X_test, y_test)

    # create a string of output text to view model results when training/testing
    train_accuracy = f"Train accuracy: {train_r2_score:.2%}; Train accuracy threshold: {train_accuracy_threshold:.2%}"
    test_accuracy = f"Test accuracy: {test_r2_score:.2%}; Test accuracy threshold: {test_accuracy_threshold:.2%}"

    if not save_model:
        save_state = f"Running in test mode...not saving model"
    else:
        if train_r2_score >= train_accuracy_threshold and test_r2_score >= test_accuracy_threshold:
            save_state = "Accuracy is acceptable! Saving model"

            # Upload trained model to a stage
            model_output_dir = '/tmp'
            model_file = os.path.join(model_output_dir, f'model_{inits}.joblib')
            dump(model, model_file)
            session.file.put(model_file, "@demo_models",overwrite=True)
        else:
            save_state = "Accuracy threshold violated...not saving"

    return (
        train_accuracy,
        test_accuracy,
        save_state
    )

#### Test Python function before deploying it as a Stored Procedure on Snowflake

To make sure our model works, we'll first call it before deploying to Snowflake

In [31]:
train_revenue_prediction_model(
    session=session,
    features_table=f"MARKETING_BUDGETS_FEATURES_{inits}",
    numeric_features = ["SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL"],
    categorical_features = ["SEASON"],
    number_of_folds=10,
    train_accuracy_threshold=0.85,
    test_accuracy_threshold=0.85,
    save_model=False
)

('Train accuracy: 94.51%; Train accuracy threshold: 85.00%',
 'Test accuracy: 93.44%; Test accuracy threshold: 85.00%',
 'Running in test mode...not saving model')

### Create Stored Procedure to deploy model training code on Snowflake

Assuming the testing is complete and we're satisfied with the model, let's **register the model training Python function as a Snowpark Python Stored Procedure** by supplying the packages (_snowflake-snowpark-python,scikit-learn, and joblib_) it will need and use during execution.

NOTE: Be sure to name your stored procedure below using your initials or another unique identifier since you are using the same database & schema!

In [32]:
# Be sure to name the stored procedure using your initials or another unique identifier such as "train_revenue_prediction_model_STR"
session.sproc.register(
    func=train_revenue_prediction_model,
    name=f"train_revenue_prediction_model_{inits}",
    packages=['snowflake-snowpark-python','scikit-learn','joblib'],
    is_permanent=True,
    stage_location="@demo_sprocs",
    replace=True)

<snowflake.snowpark.stored_procedure.StoredProcedure at 0x7f8a59723a00>

### Execute Stored Procedure to train model and deploy it on Snowflake

Now we're ready to train the model and save it onto a Snowflake stage so let's set _save_model = True_ and run/execute the Stored Procedure using _session.call()_ function. 

NOTE: Use the same stored procedure name you used previously

In [33]:
# Make sure to update the stored procedure name to what you set above
cross_validaton_folds=10
numeric_features=["SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL"]
categorical_features=["SEASON"]
polynomial_features_degrees=2
train_accuracy_threshold=0.85
test_accuracy_threshold=0.85
save_model=True

session.call(
    f"train_revenue_prediction_model_{inits}",
    f"MARKETING_BUDGETS_FEATURES_{inits}",
    numeric_features,
    categorical_features,
    cross_validaton_folds,
    train_accuracy_threshold,
    test_accuracy_threshold,
    save_model
)

"('Train accuracy: 94.51%; Train accuracy threshold: 85.00%', 'Test accuracy: 93.44%; Test accuracy threshold: 85.00%', 'Accuracy is acceptable! Saving model')"

## >>>>>>>>>> *CHECKPOINT 2 : PAUSE HERE BEFORE MOVING ON* <<<<<<<<<<

### Create Scalar User-Defined Function (UDF) for inference

Now to deploy this model for inference, let's **create and register a Snowpark Python UDF and add the trained model as a dependency**. Once registered, getting new predictions is as simple as calling the function by passing in data.

Scalar UDFs operate on a single row / set of data points and are great for online inference

NOTE: Similar to before with the stored procedure, make sure to add your initials or a unique qualifier to the UDF name below

In [34]:
# Remember to change the UDF name to include your initials or a unique identifier
session.clear_imports()
session.clear_packages()

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import(f'@demo_models/model_{inits}.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn==1.3.0')

@F.udf(name=f'predict_roi_{inits}',session=session,replace=True,is_permanent=True,stage_location='@demo_udfs')
def predict_roi(budget_allocations: list) -> float:

    import sys
    import pandas as pd
    from joblib import load

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]
    
    model_file = import_dir + f'model_{inits}.joblib.gz'
    model = load(model_file)
            
    features = ["SEASON","SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL"]
    df = pd.DataFrame([budget_allocations], columns=features)
    roi = abs(model.predict(df)[0])
    
    return roi

<div style='text-align: center'>
    <img src="assets/snowpark_python_udfs.png" alt="Snowpark" style="width: 75%;"/>
</div>

### Call Scalar User-Defined Function (UDF) for inference on new data

 Once the UDF is registered, getting new predictions is as simple as calling the _call_udf()_ Snowpark Python function and passing in new datapoints.

Let's create a SnowPark DataFrame with some sample data and call the UDF to get new predictions.

In [35]:
# define list of columns and convert to an arry
col_list = ["SEASON","SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL"]
col_array = F.array_construct(*col_list)

new_data=[
    ("winter", 250000, 300000, 200000, 19000)
]
test_df=session.create_dataframe(new_data, schema=col_list)


test_df.select(
   "SEASON","SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL", 
   F.call_udf(f"predict_roi_{inits}", col_array).as_("PREDICTED_ROI")
).show()

---------------------------------------------------------------------------------------
|"SEASON"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"    |
---------------------------------------------------------------------------------------
|winter    |250000           |300000          |200000   |19000    |2105377.943999556  |
---------------------------------------------------------------------------------------



### Create Vectorized User-Defined Function (UDF) using Batch API for inference

Here we will leverage the Python UDF Batch API to create a **vectorized** UDF which takes a Pandas Dataframe as input. This means that each call to the UDF receives a set/batch of rows compared to a Scalar UDF which gets one row as input. 

First we will create a helper function _load_model()_ that uses **cachetools** to make sure we only load the model once followed by _batch_predict_roi()_ function that does the inference. 

Vectorized UDFs are great for offline inference in batch mode.

Advantages of using the Batch API over Scalar UDFs:

* The potential for better performance if your Python code operates efficiently on batches of rows
* Less transformation logic required if you are calling into libraries that operate on Pandas DataFrames or Pandas arrays

NOTE: Like before, remember to change the UDF function name to include your initials or a unique identifier below

In [36]:
from generate_batch_data import generate_random_table

generate_random_table(
    session=session, 
    num_records=100000,
    table_name=f"random_model_data_{inits}"
)

generating random data...
saving random data...
100000 random records generated!


In [37]:
# Remember to update the UDF function name below similar to before
session.clear_imports()
session.clear_packages()

import cachetools
from snowflake.snowpark.types import PandasSeries, PandasDataFrame

# Add trained model and Python packages from Snowflake Anaconda channel available on the server-side as UDF dependencies
session.add_import(f'@demo_models/model_{inits}.joblib.gz')
session.add_packages('pandas','joblib','scikit-learn','cachetools')

@cachetools.cached(cache={})
def load_model(filename):
    import joblib
    import sys
    import os

    IMPORT_DIRECTORY_NAME = "snowflake_import_directory"
    import_dir = sys._xoptions[IMPORT_DIRECTORY_NAME]

    if import_dir:
        with open(os.path.join(import_dir, filename), 'rb') as file:
            m = joblib.load(file)
            return m

@F.udf(name=f"batch_predict_roi_{inits}",session=session,replace=True,is_permanent=True,stage_location='@demo_udfs')
def batch_predict_roi(budget_allocations_df: PandasDataFrame[str, int, int, int, int]) -> PandasSeries[float]:

    budget_allocations_df.columns=["SEASON","SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL"]
    model = load_model(f'model_{inits}.joblib.gz')

    return abs(model.predict(budget_allocations_df))

### Call Vectorized User-Defined Function (UDF) using Batch API for inference on new data

When you use the Batch API:

* You do not need to change how you write queries using Python UDFs. All batching is handled by the UDF framework rather than your own code
* NOTE: As with the non-batch / scalar API, there is no guarantee of which instances of your handler code will see which batches of input

In [38]:


batch_df=session.table(f"random_model_data_{inits}")

batch_preds=batch_df.select(
    "SEASON","SEARCH_ENGINE","SOCIAL_MEDIA","VIDEO","EMAIL", 
    F.call_udf(f"batch_predict_roi_{inits}",
    F.col("SEASON"),F.col("SEARCH_ENGINE"), F.col("SOCIAL_MEDIA"), F.col("VIDEO"), F.col("EMAIL")).as_("PREDICTED_ROI"))

batch_preds.show(10)
print(f"Num rows: {batch_preds.count()}")
print(f"Snowpark df size: {get_df_memory_size(batch_preds)}")
print(f"Pandas df size: {get_df_memory_size(batch_preds.to_pandas())}")


----------------------------------------------------------------------------------------
|"SEASON"  |"SEARCH_ENGINE"  |"SOCIAL_MEDIA"  |"VIDEO"  |"EMAIL"  |"PREDICTED_ROI"     |
----------------------------------------------------------------------------------------
|spring    |300441           |291136          |275807   |369444   |1462280.072853227   |
|fall      |292578           |357823          |366325   |363254   |1510180.0163394096  |
|winter    |166847           |104843          |183452   |178618   |976227.9950163714   |
|fall      |285117           |363008          |327769   |326596   |1532758.3663781702  |
|spring    |398292           |301389          |398193   |207101   |2813637.881185057   |
|fall      |298738           |305175          |327172   |371552   |1536313.3216123781  |
|spring    |353479           |299733          |364436   |396998   |1875538.7997542382  |
|fall      |310894           |384607          |335914   |355524   |1671039.9443023582  |
|fall      |283904   

In [39]:
batch_preds.write.mode("overwrite").save_as_table(f"BATCH_PREDICT_{inits}")

**Snowpark Stored Procedures vs User-Defined Functions**

_In general, if you're processing a large dataset in a way where each row/batch can be processed independently - UDFs are always better, because the processing is automatically parallelized/scaled across the warehouse. For example, if you already have a trained ML model, and you're doing inference using that model on billions of rows. In that case, each row/batch can be computed independently._

_If the use case requires the full dataset to be in-memory (e.g. ML training), then a stored procedure is the way to go. A stored procedure is just a Python program that runs on a single warehouse node. (With a UDF it's not possible to load the full dataset into memory because the processing is done in a streaming fashion, one batch at a time._

### Automate Model (re)training using Snowflake Tasks - Example Only

Here is an example of how to create a Snowflake (Serverless or User-managed) Task to automate (re)training of the model. For example, every hour.

In [38]:
#create_model_training_task = """
#CREATE OR REPLACE TASK sp_hourly_model_training_
#   WAREHOUSE = 'YOUR_WH'
#   SCHEDULE  = '60 MINUTE'
#AS
#   CALL train_revenue_prediction_model_('MARKETING_BUDGETS_FEATURES',10,2,0.85,0.85,True)
#"""
#session.sql(create_model_training_task).collect()

#session.sql("alter task sp_hourly_model_training_ resume").collect()

### Other Snowpark Resources

Quick Start Guides

* [Getting Started With Snowpark for Python and Streamlit](https://quickstarts.snowflake.com/guide/getting_started_with_snowpark_for_python_streamlit/index.html?index=..%2F..index#0)

* [Getting Started With Snowpark Python](https://quickstarts.snowflake.com/guide/getting_started_with_snowpark_python/index.html?index=..%2F..index#0)

* [Machine Learning with Snowpark Python](https://quickstarts.snowflake.com/guide/machine_learning_with_snowpark_python/index.html?index=..%2F..index#0)

Videos: Snowpark | A Look Under The Hood 

* [Snowpark API](https://www.youtube.com/watch?v=Me2auWdhlKk)

* [Snowpark User-Defined Functions (UDFs)](https://www.youtube.com/watch?v=-W1aL8XwvkE)

[Blogs on Medium](https://medium.com/snowflake/search?q=Snowpark)

* [Deploy Custom UDFs Using GitHub Actions](https://medium.com/snowflake/deploying-custom-python-packages-from-github-to-snowflake-f0bb396480c7)

* [Snowpark For Python Open Source: How I Contributed And So Can You](https://medium.com/snowflake/snowpark-for-python-open-source-how-i-contributed-and-so-can-you-7eb4baac355f)

[Demos on GitHub](https://github.com/Snowflake-Labs/snowpark-python-demos)

[Snowpark for Python Developer Guide](https://docs.snowflake.com/en/developer-guide/snowpark/python/index.html)
