In [None]:
import streamlit as st;
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/Banner-3.png')


# Overview

Tasty Bytes is one of the largest food truck networks in the world with localized menu options spread across 30 major cities in 15 countries.
**Tasty Bytes is aiming to achieve 25% YoY sales growth over 5 years.**

As Tasty Bytes Data Scientists, we have been asked to support this goal by helping our food truck drivers more intelligently pick where to park for shifts.
**We want to direct our trucks to locations that are expected to have the highest sales on a given shift. This will maximize our daily revenue across our fleet of trucks.**

To provide this insight, we will use historical shift sales at each location to build a model. This data has been made available to us in Snowflake.
Our model will provide the predicted sales at each location for the upcoming shift.



#### **This is an introduction to Snowpark for Snowflake. We will use Snowpark to:**

- Explore the data
- Perform feature engineering
- Train a model
- Deploy the model in Snowflake

**Why Snowpark?**

- No copies or movement of data
- Maintain governance
- Leverage Snowflake scalable compute
- ...and more!



In [None]:
import streamlit as st;
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/snowpark_101.png')
st.markdown(f"Let\'s get to know Snowpark. We will see that Snowpark makes it easy for Python users to leverage the Snowflake platform. Bringing these users into the Snowflake platform will foster collaboration and streamline architecture across all users and teams.")



## Import Packages

Just like the Python packages we are importing, we will import the Snowpark modules that we need.

**Value**: Snowflake modules provide efficient ways to work with data and functions in Snowflake.





In [None]:
# Import Python packages
import pandas as pd
import json
import sys
import cachetools
#import os

# Import Streamlit and viz modules
import plotly.express as px
import streamlit as st
import matplotlib
import plotly.io as pio

# Import Snowflake modules
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
from snowflake.snowpark import Window

# Import Snowflake Modeling API
from snowflake.ml.modeling.pipeline import pipeline
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.preprocessing import OneHotEncoder
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.metrics import mean_squared_error, mean_absolute_error, r2_score
from snowflake.ml.registry import Registry

import warnings; warnings.simplefilter('ignore')

print("Imports complete.")

from snowflake.snowpark.context import get_active_session
session = get_active_session()

Imports complete.


## Getting your own environment

So that you can experiment freely without stepping on each other's toes, each attendee today will get a dedicated schema and a dedicated virtual warehouse.

To name your schema and warehouse dynamically, we will use a variable as shown below.

In addition we will create a [Zero Copy Clone](https://docs.snowflake.com/user-guide/tables-storage-considerations#label-cloning-tables) of the _Analytics_ schema, so that we get an instant copy of the data without incurring any delay or storage cost. Only data that we will add or modify in the base tables will result in actual extra storage.



**Value**: workload isolation and instant access to full datasets for feature engineering.







In [None]:
# Set this to your name; it serves as your unique ID for the lab
MY_ID = '<firstname>_<lastname>'
MY_WAREHOUSE = 'SNOWPARK_OPTIMIZED_HOL_VWH'
MY_SCHEMA = 'SCHEMA_' + MY_ID

# Use the lab warehouse
# session.sql executes SQL directly on Snowflake; today we will focus on DataFrames
session.use_warehouse(MY_WAREHOUSE)

# Create the schema as a zero-copy clone of ANALYTICS and use it
session.sql("create or replace schema " + MY_SCHEMA + " clone analytics").collect()
session.use_schema(MY_SCHEMA)

print(session.get_current_database())
print(session.get_current_schema())
print(session.get_current_warehouse())
print("------------------------------")
print("Session created.")

"FROSTBYTE_TASTY_BYTES"
"SCHEMA_MICHELLE"
"MICHELLE_WH"
------------------------------
Session created.
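
Because the cell above concatenates `MY_ID` straight into a SQL statement, it can help to check the value first. The sketch below is a minimal guard, assuming the schema name is used as an unquoted identifier (letters, digits, underscores, and dollar signs, not starting with a digit). The helper name `make_schema_name` is hypothetical and not part of Snowpark.

```python
import re

# Unquoted Snowflake identifiers start with a letter or underscore and
# continue with letters, digits, underscores, or dollar signs.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_$]*")


def make_schema_name(my_id: str, prefix: str = "SCHEMA_") -> str:
    """Hypothetical helper: build a schema name and reject unsafe characters."""
    name = f"{prefix}{my_id}".upper()
    if not _IDENTIFIER.fullmatch(name):
        raise ValueError(f"{name!r} is not a valid unquoted identifier")
    return name


print(make_schema_name("ada_lovelace"))  # SCHEMA_ADA_LOVELACE
```

Leaving the placeholder `<firstname>_<lastname>` unchanged would raise a `ValueError` here instead of producing a failing SQL statement later.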


## Snowpark DataFrame

Let's create a Snowpark DataFrame containing our shift sales data from the **shift_sales** table in our Snowflake account using the Snowpark session.table function. A DataFrame is a data structure that contains rows and columns, similar to a SQL table.

If you have used PySpark in the past, this should also sound very familiar - we even have [cheat sheets](https://www.snowflake.com/en/data-cloud/snowpark/spark-to-snowpark/) to help you transition quicker!


**Value:** Familiar representation of data for Python users.



In [None]:
snowpark_df = session.table("shift_sales")

## What happened on Snowflake?

Let's look at what was executed in Snowflake to create our snowpark_df DataFrame.

The translated SQL query can be seen in the Snowsight interface under _Activity_ in the _Query History_.

We will refer to it many times over the course of this hands-on lab, so we encourage you to open it in a new tab.

So, what happened behind the scenes?

Nothing!

Why?

Because Snowpark uses **Lazy execution**! 

Until we perform an [action](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes#performing-an-action-to-evaluate-a-dataframe) on the DataFrame, such as `show()` or `collect()`, nothing is actually pushed down to the compute.



**Value:** Efficient use of compute.
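
To make the idea of lazy execution concrete, here is a toy illustration in plain Python. It is not the Snowpark API: `LazyRows` and `read_table` are hypothetical names, and the "table" is just an in-memory list. The point is only that transformations are recorded, and the data is read when an action is called.

```python
class LazyRows:
    """Toy stand-in for a lazy DataFrame (not the Snowpark API)."""

    def __init__(self, source, steps=()):
        self._source = source        # callable that produces the rows
        self._steps = tuple(steps)   # transformations recorded so far

    def filter(self, predicate):
        # Record the step and return a new object; nothing runs yet.
        return LazyRows(self._source, self._steps + (predicate,))

    def collect(self):
        # The action: only now is the source read and the filters applied.
        rows = self._source()
        for predicate in self._steps:
            rows = [row for row in rows if predicate(row)]
        return rows


reads = []  # tracks how often the "table" is actually scanned


def read_table():
    reads.append("scan")
    return [
        {"location_id": 1637, "shift": "AM"},
        {"location_id": 1135, "shift": "PM"},
    ]


df = LazyRows(read_table).filter(lambda r: r["location_id"] == 1637)
reads_before_action = len(reads)   # 0: defining the pipeline read nothing
result = df.collect()
reads_after_action = len(reads)    # 1: the action triggered the scan
print(reads_before_action, reads_after_action, result)
```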




## Preview the Data

With our Snowpark DataFrame defined, let’s use the `.show()` function to take a look at the first 20 rows.



**Value:** Instant access to data.



In [None]:
# Not necessary, but allows us to look at the queries we generate from here on
query_history = session.query_history()

#display some data
snowpark_df.show(20)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"  |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"LATITUDE"  |"LONGITUDE"  |"COUNT_LOCATIONS_WITHIN_HALF_MILE"  |"CITY_POPULATION"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|4274           |London  |2022-11-06  |NULL           |PM       |11       |0              |51.496019   |-0.177579    |6                                   |8799800            |
|4274           |London  |2022-11-07  |NULL           |PM       |11       |1              |51.496019   |-0.177579    |6                                   |8799800            |
|4274           |London  |2022-11-04  |NULL           |PM       |11       |5              |51.496019   |-0.177579    |6 

## Select, Filter, Sort

Did you notice the Null values for "shift_sales"? 

Let's look at a single location.

To do this, we will make another Snowpark DataFrame, location_df, from the above DataFrame and we will:

1. Select columns
2. Filter to a single location ID
3. Sort by date

****

**Value**: Efficient transformation pipelines using Python syntax and chained logic.



In [None]:
# Select
location_df = snowpark_df.select("date", "shift", "shift_sales", "location_id", "city")

# Filter to one location (Vancouver = 1135, Montreal = 5319, Toronto = 1637)
location_df = location_df.filter(F.col("location_id") == 1637)
#location_df = location_df.filter(F.col("location_id") == 5319)

# Sort by date and shift, newest first
location_df = location_df.order_by(["date", "shift"], ascending=[False, False])

# Display
location_df.show(n=20)

------------------------------------------------------------------
|"DATE"      |"SHIFT"  |"SHIFT_SALES"  |"LOCATION_ID"  |"CITY"   |
------------------------------------------------------------------
|2022-11-08  |PM       |NULL           |1637           |Toronto  |
|2022-11-08  |AM       |NULL           |1637           |Toronto  |
|2022-11-07  |PM       |NULL           |1637           |Toronto  |
|2022-11-07  |AM       |NULL           |1637           |Toronto  |
|2022-11-06  |PM       |NULL           |1637           |Toronto  |
|2022-11-06  |AM       |NULL           |1637           |Toronto  |
|2022-11-05  |PM       |NULL           |1637           |Toronto  |
|2022-11-05  |AM       |NULL           |1637           |Toronto  |
|2022-11-04  |PM       |NULL           |1637           |Toronto  |
|2022-11-04  |AM       |NULL           |1637           |Toronto  |
|2022-11-03  |PM       |NULL           |1637           |Toronto  |
|2022-11-03  |AM       |NULL           |1637           |Toront

We can see that shift sales are populated 8 days prior to the latest date in the data. The **missing values** represent future dates that do not have shift sales yet.

## Snowpark works in two main ways:

1. Snowpark code translated and executed as SQL on Snowflake
2. Python functions deployed in a secure sandbox in Snowflake




In [None]:
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/snowpark_overview.png')

## SQL is also an option

Please also note that if at any point there is some code that you'd rather write in SQL, you are just a click away from adding a _SQL Cell_



In [None]:
SELECT "DATE", "SHIFT", "SHIFT_SALES", "LOCATION_ID", "CITY" 
FROM frostbyte_tasty_bytes.analytics.shift_sales 
WHERE ("LOCATION_ID" = 1637) ORDER BY "DATE" DESC, "SHIFT" DESC  
LIMIT 20;


In [None]:
session.sql("SELECT DATE, SHIFT, SHIFT_SALES, LOCATION_ID, CITY \
FROM frostbyte_tasty_bytes.analytics.shift_sales \
WHERE (LOCATION_ID = 1637) ORDER BY DATE DESC, SHIFT DESC \
LIMIT 20").show()

------------------------------------------------------------------
|"DATE"      |"SHIFT"  |"SHIFT_SALES"  |"LOCATION_ID"  |"CITY"   |
------------------------------------------------------------------
|2022-11-08  |PM       |NULL           |1637           |Toronto  |
|2022-11-08  |AM       |NULL           |1637           |Toronto  |
|2022-11-07  |PM       |NULL           |1637           |Toronto  |
|2022-11-07  |AM       |NULL           |1637           |Toronto  |
|2022-11-06  |PM       |NULL           |1637           |Toronto  |
|2022-11-06  |AM       |NULL           |1637           |Toronto  |
|2022-11-05  |PM       |NULL           |1637           |Toronto  |
|2022-11-05  |AM       |NULL           |1637           |Toronto  |
|2022-11-04  |PM       |NULL           |1637           |Toronto  |
|2022-11-04  |AM       |NULL           |1637           |Toronto  |
------------------------------------------------------------------



## Explain the Query

Everything you do in the Snowflake UI can be done in code as well. Checking the latest query is no different.

This means you don't actually need to use the Snowsight UI to see the translated SQL. You can simply inspect the latest record in the query history.

**Value:** Transparent execution and compute usage.



In [None]:
# Check the SQL that just executed on Snowflake!
query_history.queries[-1:]

[QueryRecord(query_id='01b3c0dc-3201-3053-0002-c96200e0e3ce', sql_text='SELECT  *  FROM (SELECT DATE, SHIFT, SHIFT_SALES, LOCATION_ID, CITY FROM frostbyte_tasty_bytes.analytics.shift_sales WHERE (LOCATION_ID = 1637) ORDER BY DATE DESC, SHIFT DESC LIMIT 20) LIMIT 10')]

In fact, you don't even need to run the query to understand what is going to happen!

You can use the `explain()` function to get the dataframe execution plan and ensure it looks efficient and that Snowflake will be able to prune as much data as possible.



**Value:** Transparent execution and compute usage.



In [None]:
location_df.explain()

---------DATAFRAME EXECUTION PLAN----------
Query List:
1.
SELECT "DATE", "SHIFT", "SHIFT_SALES", "LOCATION_ID", "CITY" FROM shift_sales WHERE ("LOCATION_ID" = 1637 :: INT) ORDER BY "DATE" DESC NULLS LAST, "SHIFT" DESC NULLS LAST
Logical Execution Plan:
GlobalStats:
    partitionsTotal=512
    partitionsAssigned=491
    bytesAssigned=10372608
Operations:
1:0     ->Result  SHIFT_SALES.DATE, SHIFT_SALES.SHIFT, SHIFT_SALES.SHIFT_SALES, SHIFT_SALES.LOCATION_ID, SHIFT_SALES.CITY  
1:1          ->Sort  SHIFT_SALES.DATE DESC NULLS LAST, SHIFT_SALES.SHIFT DESC NULLS LAST  
1:2               ->Filter  SHIFT_SALES.LOCATION_ID = 1637  
1:3                    ->TableScan  FROSTBYTE_TASTY_BYTES.SCHEMA_MICHELLE.SHIFT_SALES  LOCATION_ID, CITY, DATE, SHIFT_SALES, SHIFT  {partitionsTotal=512, partitionsAssigned=491, bytesAssigned=10372608}

--------------------------------------------


## Compare DataFrame Size

Let's bring a sample of our Snowflake dataset to our local environment (AKA our browser) in a pandas DataFrame using the `to_pandas()` function. We will compare how much memory is used for the pandas DataFrame compared to the Snowpark DataFrame. As we will see, no memory is used for the Snowpark DataFrame in our Python environment. All data in the Snowpark DataFrame remains on Snowflake.


**Value:** No copies or movement of data when working with Snowpark DataFrames.



In [None]:
# Bring 10,000 rows from Snowflake to pandas
pandas_df = snowpark_df.limit(10000).to_pandas()

# Get Snowpark DataFrame size
snowpark_size = sys.getsizeof(snowpark_df) / (1024*1024)
print(f"Snowpark DataFrame Size (snowpark_df): {snowpark_size:.2f} MB")

# Get pandas DataFrame size
pandas_size = sys.getsizeof(pandas_df) / (1024*1024)
print(f"Pandas DataFrame Size (pandas_df): {pandas_size:.2f} MB")

Snowpark DataFrame Size (snowpark_df): 0.00 MB
Pandas DataFrame Size (pandas_df): 0.19 MB
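
A note on what this comparison measures: `sys.getsizeof` reports the size of the Python object it is given. For a lazy DataFrame, that object describes a query rather than holding rows. The sketch below illustrates the difference with a hypothetical `TableHandle` class (not a Snowpark type) next to a list of locally held values.

```python
import sys


class TableHandle:
    """Hypothetical stand-in for a lazy DataFrame: stores a table name, not rows."""

    __slots__ = ("table_name",)

    def __init__(self, table_name):
        self.table_name = table_name


handle = TableHandle("shift_sales")
local_rows = [float(i) for i in range(10_000)]  # data materialized in Python

handle_bytes = sys.getsizeof(handle)
# getsizeof is shallow for lists, so add the size of each element as well.
local_bytes = sys.getsizeof(local_rows) + sum(sys.getsizeof(v) for v in local_rows)

print(f"handle: {handle_bytes} bytes, local data: {local_bytes} bytes")
```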


In [None]:
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/data_exploration.png')

Here we will use Snowpark to explore our data. A common pattern for exploration is to use Snowpark to manipulate our data and then bring an aggregate table to our Python environment for visualization.

**Value:**
- Native Snowflake performance and scale for aggregating large datasets.





## How many rows are in our data?

This will give us an idea of how we might need to approach working with this data. Do we have enough data to build a meaningful model? What compute might be required? Will we need to sample the data?

**What's happening where?:** Rows counted in Snowflake. No data transfer.



In [None]:
snowpark_df.count()

707540

## Let's calculate some descriptive statistics.

We use the Snowpark `describe()` function to calculate summary statistics and then bring the aggregate results into a pandas DataFrame to visualize in a formatted table.

**What's happening where?:** Summary statistics calculated in Snowflake. Transfer aggregate summary statistics for client-side visualization.



In [None]:
snowpark_df.describe().to_pandas()

Unnamed: 0,SUMMARY,LOCATION_ID,CITY,SHIFT_SALES,SHIFT,MONTH,DAY_OF_WEEK,LATITUDE,LONGITUDE,COUNT_LOCATIONS_WITHIN_HALF_MILE,CITY_POPULATION
0,count,707540.0,707540,524420.0,707540,707540.0,707540.0,707540.0,707540.0,707540.0,707540.0
1,stddev,4199.251255,,12077.623062,,3.415971,2.001384,28.384873,77.663891,51.598726,4529228.0
2,max,15517.0,Warsaw,78079.0,PM,12.0,6.0,59.486683,151.323435,290.0,16349830.0
3,mean,8195.89519,,19280.00042,,7.649166,2.998681,31.831126,-2.48671,29.874,4281827.0
4,min,1001.0,Barcelona,3.0,AM,1.0,0.0,-38.327454,-123.243134,0.0,105661.0


## What are the numeric columns?

We want to understand the data types in our data and how we might need to handle them in preparation for modeling. For numeric columns, this could include normalizing the data to the same scale or applying a transformation to change the distribution.

**What's happening where?:** The Snowflake table schema is used to get metadata information about the data. No data transfer.



In [None]:
# Define Snowflake numeric types
numeric_types = [T.DecimalType, T.DoubleType, T.FloatType, T.IntegerType, T.LongType]

# Get numeric columns
numeric_columns = [col.name for col in snowpark_df.schema.fields if type(col.datatype) in numeric_types]
numeric_columns

['LOCATION_ID',
 'SHIFT_SALES',
 'MONTH',
 'DAY_OF_WEEK',
 'LATITUDE',
 'LONGITUDE',
 'COUNT_LOCATIONS_WITHIN_HALF_MILE',
 'CITY_POPULATION']

## What are the categorical columns?

Our model requires all features to be numeric. We want to identify columns that we will need to transform to a numeric representation if we would like to use them as features in our model.

**What's happening where?:** The Snowflake table schema is used to get metadata information about the data. No data transfer.



In [None]:
# Define Snowflake categorical types
categorical_types = [T.StringType]

# Get categorical columns
categorical_columns = [col.name for col in snowpark_df.schema.fields if type(col.datatype) in categorical_types]
categorical_columns

['CITY', 'SHIFT']

## What are the average shift sales (USD) by city?

Here, we are trying to understand what a "normal" shift sale looks like. What is the span of averages across cities? Are there any outlier cities that should be removed from our training data? Is there anything unexpected in the order of cities sorted by their average shift sales?

**What's happening where?:** Average sales by city calculated in Snowflake. Transfer city averages for client-side visualization.



In [None]:
# Group by city and average shift sales
analysis_df = snowpark_df.group_by("city").agg(F.mean("shift_sales").alias("avg_shift_sales"))

# Sort by average shift sales
analysis_df = analysis_df.sort("avg_shift_sales", ascending=True)

# Pull to pandas and plot
analysis_df_pandas = analysis_df.to_pandas()
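
Since `shift_sales` contains NULLs for future shifts, it is worth remembering that SQL `AVG` skips NULL values rather than treating them as zero. The plain-Python sketch below mirrors the group-by, average, and ascending sort above on a few made-up rows; the values are illustrative only.

```python
from collections import defaultdict

rows = [
    ("Toronto", 100.0),
    ("Toronto", None),   # future shift: no sales recorded yet
    ("Toronto", 300.0),
    ("London", 50.0),
    ("London", None),
]

totals = defaultdict(lambda: [0.0, 0])  # city -> [sum, count of non-null values]
for city, sales in rows:
    if sales is not None:               # SQL AVG skips NULLs the same way
        totals[city][0] += sales
        totals[city][1] += 1

avg_by_city = {city: s / n for city, (s, n) in totals.items() if n > 0}
ordered = sorted(avg_by_city.items(), key=lambda kv: kv[1])  # ascending, like the chart
print(ordered)  # [('London', 50.0), ('Toronto', 200.0)]
```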

In [None]:
fig = px.bar(analysis_df_pandas, x='CITY',y='AVG_SHIFT_SALES',
      title="Average Shift Sales by City")
fig.update_traces(textfont_size=12, textangle=0, textposition="outside", cliponaxis=False)
st.plotly_chart(fig, theme="streamlit")

In [None]:
st.title("Select a city for further analysis")

selected_city = st.selectbox('Select your city:',snowpark_df.select("city").distinct())

In [None]:
st.markdown(f"## Looking at {selected_city}, how many locations are there?")

st.markdown("Let's get to know locations and shift sales in that city. First, we will see how many location options there are in the selected city for a food truck to park.")

st.markdown("**What's happening where?:** Data filtered, averages calculated by location, and locations counted in Snowflake. No data transfer.")



In [None]:
# Filter to selected_city
analysis_df = snowpark_df.filter(F.col("city") == selected_city)

# Group by location and average shift sales
analysis_df = analysis_df.group_by("location_id").agg(F.mean("shift_sales").alias("avg_shift_sales"))

print(f"{selected_city} location count: {analysis_df.count()}")

San Mateo location count: 372


In [None]:
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/feature_engineering.png')


## Feature Engineering

Now let's keep the relevant columns and transform columns to create the features needed for our prediction model.
To make some of our features usable by the model, we will transform them using standard preprocessing techniques, such as one-hot encoding. Let's fit an encoder to our data, then use it to transform the data, producing new feature columns.

**Value:** The Snowpark syntax makes pipelines easy to implement and understand. The syntax also allows for easy migration of Spark pipelines to Snowflake.

With SnowparkML, you can use a standard sklearn-style API to execute **fully distributed feature engineering preprocessing tasks on Snowflake compute, with zero data movement.**

Notice what we haven't had to do? No tuning, maintenance, or operational overhead. We just need a role, warehouse, and access to the data.

**Value**: Near-zero maintenance. Focus on the work that brings value.

****

## Create a Rolling Average Feature

We will use a Snowflake window function to get a **rolling shift average by location** over time. Window functions allow us to aggregate on a "moving" group of rows.

**Step 1. Create a Window**

Our window will partition the data by location and shift. It will order rows by date. It will include all rows prior to the current date of the observation it is aggregating for.



In [None]:
window_by_location_all_days = (
    Window.partition_by("location_id", "shift")
    .order_by("date")
    .rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW - 1)
)

**Step 2. Aggregate across the Window**



In [None]:
snowpark_df = snowpark_df.with_column(
    "avg_location_shift_sales", 
    F.avg("shift_sales").over(window_by_location_all_days)
)
snowpark_df.show(n=10)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"  |"DATE"      |"SHIFT_SALES"  |"SHIFT"  |"MONTH"  |"DAY_OF_WEEK"  |"LATITUDE"  |"LONGITUDE"  |"COUNT_LOCATIONS_WITHIN_HALF_MILE"  |"CITY_POPULATION"  |"AVG_LOCATION_SHIFT_SALES"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|7705           |Sydney  |2020-05-27  |29446.0        |PM       |5        |3              |-33.86184   |151.20834    |111                                 |5231147            |NULL                        |
|7705           |Sydney  |2020-06-15  |27165.0        |PM       |6        |1              |-33.86184   |151.20834    |111                                 |5231147            |29446
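
To see what this window computes, here is a plain-Python sketch of the same logic on a few made-up rows: for each row, average the sales of all earlier rows in the same location and shift, excluding the current row. The function name `prior_average` and the sample values are illustrative assumptions, not part of the lab data.

```python
from collections import defaultdict

# (location_id, shift, date, shift_sales); illustrative values only
rows = [
    (7705, "PM", "2020-05-27", 100.0),
    (7705, "PM", "2020-06-15", 300.0),
    (7705, "PM", "2020-07-01", 200.0),
    (7705, "AM", "2020-06-01", 50.0),
]


def prior_average(rows):
    """For each row, average the sales of earlier rows in the same (location, shift)."""
    history = defaultdict(list)  # partition key -> non-null sales seen so far
    out = []
    # Partition by location and shift, order by date (ISO dates sort chronologically).
    for loc, shift, date, sales in sorted(rows, key=lambda r: (r[0], r[1], r[2])):
        seen = history[(loc, shift)]
        avg = sum(seen) / len(seen) if seen else None  # no prior rows -> NULL
        out.append((loc, shift, date, sales, avg))
        if sales is not None:
            seen.append(sales)
    return out


result = prior_average(rows)
for row in result:
    print(row)
```

The first row of each partition has no history, which is why the feature is NULL there, as in the first Sydney row of the output above.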

## Impute Missing Values with Snowpark ML



The rolling average feature we just created is missing if there are no prior shift sales at that location. We will replace those missing values with 0.



In [None]:
from snowflake.ml.modeling.impute import SimpleImputer

num_cols=["AVG_LOCATION_SHIFT_SALES"]
num_cols_out=["AVG_LOCATION_SHIFT_SALES"]

impute_cols = SimpleImputer(
    input_cols=num_cols,
    output_cols=num_cols_out,
    strategy="constant",
    drop_input_cols=True
    # without specifying fill_value, default is 0
)

snowpark_df = impute_cols.fit(snowpark_df).transform(snowpark_df)
snowpark_df.show(10)
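
Conceptually, constant imputation is a simple substitution. The sketch below shows the idea in plain Python on made-up values; `impute_constant` is a hypothetical helper, not the Snowpark ML implementation.

```python
def impute_constant(values, fill_value=0.0):
    """Replace missing entries (None) with a constant."""
    return [fill_value if v is None else v for v in values]


avg_location_shift_sales = [None, 29446.0, None, 28305.5]
imputed = impute_constant(avg_location_shift_sales)
print(imputed)  # [0.0, 29446.0, 0.0, 28305.5]
```

One consequence of this choice is that "no sales history yet" and "an average of zero" become indistinguishable to the model.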


In [None]:
st.markdown("**One Hot Encoding with Snowpark ML**")
st.image('https://miro.medium.com/v2/resize:fit:1400/format:webp/0*WcPabPyXynVjGcBP')

st.markdown(f"Categorical columns need to be represented as numeric values in our model. We will use the OneHotEncoder class from the preprocessing module to encode the column 'Shift'.")


In [None]:
# binary encoding
#snowpark_df = snowpark_df.with_column("shift", F.iff(F.col("shift") == "AM", 1, 0))
# snowpark_df.show(n=10)

-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"         |"DATE"      |"SHIFT_SALES"  |"MONTH"  |"DAY_OF_WEEK"  |"LATITUDE"  |"LONGITUDE"  |"COUNT_LOCATIONS_WITHIN_HALF_MILE"  |"CITY_POPULATION"  |"AVG_LOCATION_SHIFT_SALES"  |"SHIFT"  |
-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|13096          |New York City  |2020-03-09  |30055.0        |3        |1              |40.74021    |-73.987424   |30                                  |8804190            |0.0                         |0        |
|13096          |New York City  |2020-03-18  |28108.0        |3        |3              |40.74021    |-73.987424   |30                                  |

In [None]:
from snowflake.ml.modeling.pipeline import pipeline
from snowflake.ml.modeling.preprocessing import OneHotEncoder

target_cols = ['SHIFT']
target_cols_out = ['SHIFT_OHE']
ohe = OneHotEncoder(
    input_cols=target_cols, 
    output_cols=target_cols_out
)

# Fit the encoder and transform the data in one step
snowpark_df = ohe.fit(snowpark_df).transform(snowpark_df)
snowpark_df.show(10)

# Save the features as a table for use in Streamlit in Snowflake (SiS)
snowpark_df.write.mode("overwrite").save_as_table("shift_sales_all_features")
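
For readers new to one-hot encoding, the following plain-Python sketch shows what the transformation does to a categorical column. The column names `SHIFT_OHE_AM` and `SHIFT_OHE_PM` are an assumption about the naming pattern (output column name plus category), and `one_hot` is a hypothetical helper rather than the Snowpark ML encoder.

```python
def one_hot(rows, column, prefix):
    """Add one 0/1 column per category found in `column`."""
    categories = sorted({row[column] for row in rows})
    encoded = []
    for row in rows:
        new_row = dict(row)  # keep the input column, add the indicator columns
        for category in categories:
            new_row[f"{prefix}_{category}"] = 1 if row[column] == category else 0
        encoded.append(new_row)
    return encoded


shifts = [{"SHIFT": "AM"}, {"SHIFT": "PM"}, {"SHIFT": "AM"}]
encoded = one_hot(shifts, "SHIFT", "SHIFT_OHE")
for row in encoded:
    print(row)
```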


## Filter to Historical Data

Our data includes placeholders for future data with missing shift sales. The **future data** represents the next 7 days of shifts for all locations. The **historical data** has shift sales for all locations where a food truck parked during a shift. We will only use historical data when training our model and will filter out the dates where the **shift_sales** column is missing.



In [None]:
historical_snowpark_df = snowpark_df.filter(F.col("shift_sales").is_not_null())
historical_snowpark_df.show(n=10)

------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"LOCATION_ID"  |"CITY"  |"DATE"      |"SHIFT_SALES"  |"MONTH"  |"DAY_OF_WEEK"  |"LATITUDE"  |"LONGITUDE"  |"COUNT_LOCATIONS_WITHIN_HALF_MILE"  |"CITY_POPULATION"  |"AVG_LOCATION_SHIFT_SALES"  |"SHIFT"  |
------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|12697          |London  |2020-10-15  |29339.0        |10       |4              |51.536468   |-0.003512    |0                                   |8799800            |0.0                         |0        |
|12697          |London  |2021-01-17  |50146.0        |1        |0              |51.536468   |-0.003512    |0                                   |8799800            |29339.0        

## Drop Columns

Drop ID columns that will not be used in the model.



In [None]:
historical_snowpark_df = historical_snowpark_df.drop("location_id", "city", "date","shift")
historical_snowpark_df.show(n=10)

----------------------------------------------------------------------------------------------------------------------------------------------------------------------
|"SHIFT_SALES"  |"MONTH"  |"DAY_OF_WEEK"  |"LATITUDE"  |"LONGITUDE"  |"COUNT_LOCATIONS_WITHIN_HALF_MILE"  |"CITY_POPULATION"  |"AVG_LOCATION_SHIFT_SALES"  |"SHIFT"  |
----------------------------------------------------------------------------------------------------------------------------------------------------------------------
|8171.0         |8        |6              |59.328922   |18.064892    |41                                  |978770             |0.0                         |1        |
|11863.0        |8        |2              |59.328922   |18.064892    |41                                  |978770             |8171.0                      |1        |
|8921.25        |9        |4              |59.328922   |18.064892    |41                                  |978770             |10017.0                     |1        

## Split Data into Training and Testing

We will use 80% of the data for model training and 20% for testing.



In [None]:
train_snowpark_df, test_snowpark_df = historical_snowpark_df.random_split([0.8, 0.2])
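
A random split assigns each row to a subset by chance, so the resulting sizes are only approximately 80% and 20%, and they change between runs unless a seed is fixed. The plain-Python sketch below illustrates that behaviour; it is a toy model of the idea, not Snowpark's implementation, and the local `random_split` function is defined here for illustration only.

```python
import random


def random_split(rows, weights, seed=None):
    """Assign each row to a split independently, with probability given by weights."""
    rng = random.Random(seed)
    total = sum(weights)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random() * total
        cumulative = 0.0
        for index, weight in enumerate(weights):
            cumulative += weight
            if draw < cumulative:
                splits[index].append(row)
                break
        else:
            # Guard against floating-point edge cases: fall back to the last split.
            splits[-1].append(row)
    return splits


train, test = random_split(list(range(1000)), [0.8, 0.2], seed=42)
print(len(train), len(test))  # roughly 800 and 200, not exactly
```

Fixing the seed makes the split reproducible, which matters when comparing models trained at different times.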

## Save Tables in Snowflake

We will save our training and test datasets to **our cloned schema** in our Snowflake account.

**Value:** Eliminate redundant data processing. These tables can be re-used to train more models beyond what we are training today.



In [None]:
# Save training data
train_snowpark_df.write.mode("overwrite").save_as_table("shift_sales_train")

# Save test data
test_snowpark_df.write.mode("overwrite").save_as_table("shift_sales_test")

In [None]:
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/model_training.png')
st.markdown("**Option 1 for Model Training**");
st.markdown(f"We will now use our training data to train a linear regression model on Snowflake.")
st.markdown(f"We will be leveraging the deployment of Python functions into Snowflake for training and model deployment")
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/end_to_end_ml.png')


Here, we see a typical data science workflow. We are finished **preparing** our data and now move on to **training in a Python stored procedure on Snowflake**. The model created from this stored procedure will be our tool for automating decisions around truck locations to maximize our revenue. We'll surface the predicted sales (model inference) on future data using a **Python user-defined function** to drive the decisions.

**Snowflake Stored Procedures** work well for training because they can read data, hold an entire table in memory to find patterns, and write files (e.g. model files) back to the Snowflake database.

**Snowflake User-Defined Functions** work well for inference because they return a single value for each row passed to the user-defined function. Because of this, they can easily be distributed to provide fast results.

**Value**: Effortless, scalable, and secure processing **without data movement** across compute environments.



In [None]:
st.markdown("**Simplify, accelerate and scale end-to-end AI/ML workflows**")
st.image("https://www.snowflake.com/wp-content/uploads/2023/06/Screenshot-2023-06-27-at-7.53.44-PM.png")

In [None]:
session.sql("ALTER WAREHOUSE " + MY_WAREHOUSE + " SET WAREHOUSE_SIZE = 'LARGE'").collect()

[Row(status='Statement executed successfully.')]

## Option 2:  Model Training using ML Modeling API


Let's run our Linear Regression training job using the Snowpark ML Modeling API. This will push down our model training to run on Snowflake.

- The `model.fit()` function creates a temporary stored procedure in the background. This means that model training is a single-node operation, although it can use multiple cores within the node via the `n_jobs` argument. Be sure to use a Snowpark-optimized warehouse if you need more memory.
- The `model.predict()` function creates a temporary vectorized UDF in the background, which means the input DataFrame is batched as pandas DataFrames and inference is parallelized across the batches. You can confirm this in the query history after you execute the following cell. The result is a DataFrame with the predictions appended to the input columns, rather than just the predictions as in vanilla scikit-learn.
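
The difference between the two steps can be sketched in plain Python: fitting needs all training rows at once, while prediction works row by row and can therefore be split into batches. This is a toy single-feature example with hypothetical helpers (`fit_line`, `predict_in_batches`), not what Snowpark ML runs internally.

```python
def fit_line(xs, ys):
    """Ordinary least squares for one feature; needs every row at once."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


def predict_in_batches(rows, model, batch_size=2):
    """Score rows batch by batch and append a PREDICTION field to each row."""
    slope, intercept = model
    for start in range(0, len(rows), batch_size):
        for row in rows[start:start + batch_size]:
            yield {**row, "PREDICTION": slope * row["x"] + intercept}


train = [{"x": 1.0, "y": 3.0}, {"x": 2.0, "y": 5.0}, {"x": 3.0, "y": 7.0}]
model = fit_line([r["x"] for r in train], [r["y"] for r in train])
scored = list(predict_in_batches(train, model))
print(model)      # slope and intercept of y = 2x + 1
print(scored[0])  # input columns plus PREDICTION
```

Because each batch is scored independently, batches could be handed to different workers without changing the result.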


In [None]:
from snowflake.ml.modeling.linear_model import LinearRegression
from snowflake.ml.modeling.metrics import mean_squared_error, mean_absolute_error, r2_score

#Define Linear Regression
linregression = LinearRegression(
    input_cols = session.table("shift_sales_train").drop("shift_sales").columns,
    label_cols = ['SHIFT_SALES'],
    output_cols= ['PREDICTION']
)

#Train model
linregression.fit(session.table("shift_sales_train"))

#Predict
result = linregression.predict(session.table("shift_sales_train"))

# Test the model
df_test_pred = linregression.predict(session.table("shift_sales_test"))
#df_test_pred_pd = df_test_pred.to_pandas()
actuals = 'SHIFT_SALES'
prediction = 'PREDICTION'

MSE = mean_squared_error(df=df_test_pred, y_true_col_names=actuals, y_pred_col_names=prediction)
MAE = mean_absolute_error(df=df_test_pred, y_true_col_names=actuals, y_pred_col_names=prediction)
R2 = r2_score(df=df_test_pred, y_true_col_name=actuals, y_pred_col_name=prediction)

print(f'MSE: {MSE}')
print(f'MAE: {MAE}')
print(f'R2: {R2}')
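
As a reminder of what these three metrics measure, here is a small plain-Python version using the standard definitions: MSE is the mean of squared errors, MAE is the mean of absolute errors, and R2 is one minus the ratio of residual to total sum of squares. The function `regression_metrics` and the sample numbers are illustrative only.

```python
def regression_metrics(actuals, predictions):
    """Return (MSE, MAE, R2) for paired actual and predicted values."""
    n = len(actuals)
    errors = [a - p for a, p in zip(actuals, predictions)]
    ss_res = sum(e * e for e in errors)            # residual sum of squares
    mse = ss_res / n
    mae = sum(abs(e) for e in errors) / n
    mean_actual = sum(actuals) / n
    ss_tot = sum((a - mean_actual) ** 2 for a in actuals)  # total sum of squares
    r2 = 1.0 - ss_res / ss_tot
    return mse, mae, r2


mse, mae, r2 = regression_metrics([10.0, 20.0, 30.0], [12.0, 18.0, 33.0])
print(f"MSE: {mse:.3f}  MAE: {mae:.3f}  R2: {r2:.3f}")
```

MSE penalizes large misses more heavily than MAE, so comparing the two gives a hint about whether a few shifts are predicted very badly.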

In [None]:
result.select("SHIFT_SALES", "PREDICTION").show()

--------------------------------------
|"SHIFT_SALES"  |"PREDICTION"        |
--------------------------------------
|18218.0        |19838.588844526716  |
|17071.0        |28855.686444856037  |
|25681.0        |28923.19008717186   |
|29.0           |29438.371425806647  |
|32032.0        |30257.608191137893  |
|33771.0        |31309.010707456735  |
|48511.0        |31761.002816853368  |
|16963.5        |32969.76190263197   |
|24897.0        |31773.147323830213  |
|39317.75       |27672.45426783916   |
--------------------------------------



## Model Registry

Now, with Snowpark ML's model registry, we have a Snowflake-native model versioning and deployment framework. This allows us to log models, tag parameters and metrics, track metadata, create versions, and ultimately deploy models into a Snowflake warehouse or Snowpark Container Services for batch scoring tasks.

Snowflake's Model Registry supports scikit-learn, XGBoost, PyTorch, TensorFlow, and MLflow (via the pyfunc interface) models. Model Registry also allows easy deployment of pre-trained open-source models from providers such as Hugging Face.

***
When creating a model registry, the database name is optional. If you do not specify it, MODEL_REGISTRY is the default. By using different database names, you can create multiple registries in your account for access control, lifecycle management, or other purposes.

Add a model to the registry by calling the registry’s `log_model` method. This method:

- Serializes the model and uploads it to a Snowflake stage. The model, a Python object, must be serializable (“pickleable”).
- Creates an entry in the model registry for the model, referencing the staged location.
- Adds metadata such as description and tags to the model as specified in the `log_model` call.
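
The three steps above can be pictured with a small in-memory stand-in. Everything in this sketch is hypothetical: `ToyRegistry`, its `stage` dictionary, and the path layout are illustrative assumptions and do not reflect how Snowflake stores models. It only shows the general pattern of serializing a model, storing the bytes, and recording an entry that points at them.

```python
import pickle
from dataclasses import dataclass, field
from typing import Any, Dict, Tuple


@dataclass
class ToyRegistry:
    """Hypothetical in-memory registry used only to illustrate the steps."""

    stage: Dict[str, bytes] = field(default_factory=dict)
    entries: Dict[Tuple[str, str], Dict[str, Any]] = field(default_factory=dict)

    def log_model(self, model: Any, model_name: str, version_name: str,
                  comment: str = "") -> str:
        # Step 1: serialize the model; this fails if it is not picklable.
        blob = pickle.dumps(model)
        stage_path = f"{model_name}/{version_name}/model.pkl"
        self.stage[stage_path] = blob
        # Steps 2 and 3: record an entry that references the staged bytes
        # and carries the metadata supplied by the caller.
        self.entries[(model_name, version_name)] = {
            "stage_path": stage_path,
            "comment": comment,
            "metrics": {},
        }
        return stage_path

    def load_model(self, model_name: str, version_name: str) -> Any:
        entry = self.entries[(model_name, version_name)]
        return pickle.loads(self.stage[entry["stage_path"]])


# A plain dict of coefficients stands in for a fitted model object.
toy_model = {"intercept": 1.5, "coef": [0.2, 0.4]}

toy_registry = ToyRegistry()
path = toy_registry.log_model(toy_model, "linear_regression", "V1",
                              comment="Initial model")
restored = toy_registry.load_model("linear_regression", "V1")
print(path, restored)
```

The round trip through `pickle` is the reason the model object must be serializable: whatever cannot be pickled cannot be restored later for inference.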



In [None]:
from snowflake.ml.registry import Registry
#suppressed warning

native_registry = Registry(session, database_name="FROSTBYTE_TASTY_BYTES", schema_name=MY_SCHEMA)

# Define model name
model_name = "linear_regression"

# Let's first log the very first model we trained
model_v1 = native_registry.log_model(
    model_name=model_name,
    version_name='V1',
    model=linregression
)
# Add a description
model_v1.comment = "This is the initial model of the Shift Sales Price Prediction model."

In [None]:
df_test_pred_pd = df_test_pred.to_pandas()
actuals = df_test_pred_pd[["SHIFT_SALES"]]
prediction = df_test_pred_pd[['PREDICTION']]

model_v1.set_metric(metric_name="mean_squared_error", value=MSE)
model_v1.set_metric(metric_name="mean_absolute_error", value=MAB)
model_v1.set_metric(metric_name="r2_score", value=R2)


In [None]:
model_df = native_registry.show_models()
model_df[['created_on','database_name','schema_name','owner','name','versions']]

In [None]:
# Let's confirm model(s) that were added
native_registry.get_model(model_name).show_versions()

In [None]:
# to delete models
#m = native_registry.get_model("LINEAR_REGRESSION")
#default_version = m.default
#m.default = "V2"
#m.delete_version("V1")

If the model has multiple versions, we want to deploy the version with the highest R2 value.

In [None]:
import json 
reg_df = native_registry.get_model(model_name).show_versions()
reg_df["r2_score"] = reg_df["metadata"].apply(
    lambda x: json.loads(x)["metrics"]["r2_score"]
)
best_model = reg_df.sort_values(by="r2_score", ascending=False)

deployed_version = best_model["name"].iloc[0]
deployed_version
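
The selection logic in the cell above can be tried without a Snowflake connection. The sketch below uses made-up version rows whose `metadata` field mirrors the `{"metrics": {"r2_score": ...}}` shape parsed above; the version names and scores are hypothetical. It also treats a version with no recorded `r2_score` as the worst candidate, an assumption added here so that a missing metric does not raise a `KeyError`.

```python
import json
from typing import Dict

# Hypothetical rows standing in for the output of show_versions().
versions = [
    {"name": "V1", "metadata": json.dumps({"metrics": {"r2_score": 0.71}})},
    {"name": "V2", "metadata": json.dumps({"metrics": {"r2_score": 0.78}})},
    {"name": "V3", "metadata": json.dumps({"metrics": {}})},  # no metric logged
]


def r2_of(version_row: Dict[str, str]) -> float:
    """Read r2_score from the JSON metadata, or -inf when it is missing."""
    metrics = json.loads(version_row["metadata"]).get("metrics", {})
    return metrics.get("r2_score", float("-inf"))


# Pick the version name with the highest recorded R2.
best_version = max(versions, key=r2_of)["name"]
print(best_version)
```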

**Model Inference**

Now we can use the best model to perform inference with the `run` method, specifying the name of the function to call and passing a Snowpark or pandas DataFrame that contains the inference data.

In [None]:
# Set the default version to the deployed version (best model)
# Batch Inference without needing to create a UDF

m = native_registry.get_model(model_name)
m.default = deployed_version
model_version = m.default

remote_prediction = model_version.run(session.table("shift_sales_test"), function_name="predict")
remote_prediction.show()

In [None]:
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/model_utilization.png');
st.markdown(f"Now that our model is built and deployed, let's see it in action! We will find the best place to park in Vancouver for tomorrow morning's shift.");
st.image('https://raw.githubusercontent.com/Snowflake-Labs/sfguide-tasty-bytes-snowpark-101-for-data-science/211360e75995b670b2b25d3204b59b42e48456cd//assets/problem_overview.png');

## Predict Location Sales for the Next Shift

We will filter to the morning shift in Vancouver for the rows that do not yet have recorded sales, which are the future dates in our data. We then call the model's `predict` method to get predicted shift sales at each location.



In [None]:
# Check model predictions on the holdout rows (no SHIFT_SALES yet) for locations in the selected city
date_tomorrow_df = snowpark_df.filter(
    (F.col("SHIFT_SALES").isNull())
    & (F.col("SHIFT_OHE_AM") == 1)
    & (F.col("CITY") == selected_city)
)

results_pred = linregression.predict(date_tomorrow_df)
results_pred.show(20)

## Democratize Data Access and Visualize Results on a Map

Larger dots indicate locations with higher predicted sales and smaller dots indicate lower predicted sales. We will use this insight to ensure that our drivers are parking at the high-value locations.

**Value:** Updated predictions readily available to drive towards our corporate goals.



In [None]:
from snowflake.snowpark import Window

st.title("Select a city to visualize top 20 locations on the map")

selected_city_map = st.text_input("Enter the city 👇")

snowpark_df = session.table("frostbyte_tasty_bytes.analytics.shift_sales_all_features")
# Get the date to predict
date_tomorrow = snowpark_df.filter(F.col("shift_sales").is_null()).select(F.min("date")).collect()[0][0]

# Filter to tomorrow's date and the morning shift in the selected city
location_predictions_df = snowpark_df.filter((F.col("date") == date_tomorrow) 
                                             & (F.col("shift_ohe_AM") == 1) 
                                             & (F.col("city")==selected_city_map))
# Get predictions
location_predictions_df = linregression.predict(location_predictions_df).select(
    "city",
    "location_id", 
    "latitude", 
    "longitude",
    "prediction"
)

window = Window.partitionBy(location_predictions_df['city']).orderBy(location_predictions_df['prediction'].desc())
filtered_df = location_predictions_df.select(
    "city",
    "location_id", 
    "latitude", 
    "longitude",
    "prediction",
    F.rank().over(window).alias('rank')).filter(F.col('rank') <= 20)
                                               
# Pull location predictions into a pandas DataFrame
predictions_df = filtered_df.to_pandas()

st.map(predictions_df,
    latitude='latitude',
    longitude='longitude',
    size='prediction',
)
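
One detail of the cell above is worth spelling out: `F.rank()` assigns tied predictions the same rank and then skips ahead, so filtering on `rank <= 20` can return more than 20 locations when predictions tie at the cutoff. The sketch below reproduces that ranking rule in plain Python on made-up numbers. The helper `sql_rank_desc` is hypothetical and exists only to illustrate the behavior.

```python
from typing import Dict, List, Sequence


def sql_rank_desc(values: Sequence[float]) -> List[int]:
    """Return the SQL-style RANK() of each value under descending order."""
    ordered = sorted(values, reverse=True)
    first_position: Dict[float, int] = {}
    for position, value in enumerate(ordered, start=1):
        # Tied values all take the position of their first occurrence.
        first_position.setdefault(value, position)
    return [first_position[value] for value in values]


# Made-up predictions with a tie for second place.
predictions = [900.0, 850.0, 850.0, 700.0]
ranks = sql_rank_desc(predictions)

# Keeping rank <= 2 returns three rows because of the tie.
top_2 = [p for p, r in zip(predictions, ranks) if r <= 2]
print(ranks, top_2)
```

If exactly N rows are required, a row-number style ranking that breaks ties arbitrarily is the usual alternative.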




# Next Steps



We've created features, built a base model, and deployed it - all with Snowflake. What else can we do from here?


### 1. Add new features from __Snowflake Data Marketplace__:

- [Snowflake Marketplace](https://www.snowflake.com/snowflake-marketplace/)
- [Free demo listing: Weather Source, LLC: Frostbyte](https://www.snowflake.com/datasets/weather-source-llc-frostbyte/)
- [Free demo listing: Safegraph](https://app.snowflake.com/marketplace/listing/GZSNZL1CN82/safegraph-safegraph-frostbyte?search=frostbyte)

### 2. Improve model performance using a User-Defined Table Function (UDTF) for parallel hyperparameter tuning to identify the optimal combination of hyperparameters to use in training:

- [Blog: Parallel Hyperparameter Tuning Using Snowpark](https://medium.com/snowflake/parallel-hyperparameter-tuning-using-snowpark-53cdec2faf77)
- [Documentation: Implementing User-Defined Table Functions (UDTFs) in Python](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-tabular-functions.html)

### 3. Automate predictions with __Streams & Tasks__:

- [Quickstart: Getting Started with Streams & Tasks](https://quickstarts.snowflake.com/guide/getting_started_with_streams_and_tasks/index.html?index=..%2F..index#0)
- [Documentation: Introduction to Tasks](https://docs.snowflake.com/en/user-guide/tasks-intro.html)
- [Documentation: Introduction to Streams](https://docs.snowflake.com/en/user-guide/streams-intro.html)

### 4. Create an app interface with __Streamlit__ for truck drivers to get location predictions:

- [Quickstart: Getting Started with Snowpark for Python and Streamlit](https://quickstarts.snowflake.com/guide/getting_started_with_snowpark_for_python_streamlit/index.html?index=..%2F..index#0)



# Reset

We will run the following cell to remove the objects from Snowflake that we created in this notebook. Dropping our stage will remove the model file. We will also close our Snowflake session.



In [None]:
#Drop training table
session.sql("DROP TABLE IF EXISTS shift_sales_train").collect()

#Drop testing table
session.sql("DROP TABLE IF EXISTS shift_sales_test").collect()

session.sql("DROP SCHEMA IF EXISTS "+ MY_SCHEMA).collect()


#Drop training stored procedure
session.sql("DROP PROCEDURE IF EXISTS sproc_train_linreg(varchar, array, varchar, varchar)").collect()

#Drop inference user-defined function
session.sql(
   "DROP FUNCTION IF EXISTS udf_linreg_predict_location_sales(float, float, float, float, float, float, float, float)"
).collect()

#Drop stage
session.sql("DROP STAGE IF EXISTS model_stage").collect()

#Drop the warehouse
session.sql("DROP WAREHOUSE IF EXISTS " + MY_WAREHOUSE).collect()

#Close the session
session.close()