In [None]:
# Import necessary functions
import streamlit as st
from snowflake.snowpark.context import get_active_session
session = get_active_session()

# Stream the overview image from the stage (no decompression) and keep its raw bytes
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/End-to-end_demo.png",
    decompress=False,
).read()

# Render the bytes in the notebook at a fixed width
st.image(image, width=1000)

## Background Information

Tasty Bytes is one of the largest food truck networks in the world with localized menu options spread across 30 major cities in 15 countries. **Tasty Bytes is aiming to achieve 25% YoY sales growth over 5 years.**

As Tasty Bytes Data Scientists, we have been asked to support this goal by helping our food truck drivers more intelligently pick where to park for shifts. 

**We want to direct our trucks to locations that are expected to have the highest sales on a given shift.
This will maximize our daily revenue across our fleet of trucks.**

To provide this insight, we will use historical shift sales at each location to build a model. This data has been made available to us in Snowflake. Our model will provide the predicted sales at each location for the upcoming shift.



In [None]:
# Read the problem-overview image from the stage and display it
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/problem_overview.png",
    decompress=False,
).read()
st.image(image, width=1000)

## Import Packages

Just like the Python packages we are importing, we will import the Snowpark modules that we need.

**Value**: Snowflake modules provide efficient ways to work with data and functions in Snowflake.



In [None]:
# Snowpark Imports
from snowflake.snowpark import Session
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.version import VERSION
import snowflake.snowpark.functions as F
from snowflake.snowpark.exceptions import SnowparkSessionException
from snowflake.snowpark.functions import sproc
from snowflake.snowpark import types as T
from snowflake.snowpark.functions import col
from snowflake.snowpark.window import Window

# Snowpark ML
from snowflake.ml.modeling.impute import SimpleImputer
from snowflake.ml.modeling.preprocessing import OrdinalEncoder, OneHotEncoder
from snowflake.ml.modeling.pipeline import Pipeline
from snowflake.ml.modeling.xgboost import XGBRegressor
from snowflake.ml.modeling.model_selection import GridSearchCV
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error
from snowflake.ml.registry import Registry
from snowflake.ml.feature_store import FeatureStore, FeatureView, Entity, CreationMode

# Snowflake Task API
from snowflake.core import Root
from snowflake.core.database import Database
from snowflake.core.schema import Schema
from snowflake.core.warehouse import Warehouse
from snowflake.core import Root
from snowflake.core.task import StoredProcedureCall
from snowflake.core.task.dagv1 import DAG, DAGTask, DAGOperation
from snowflake.core._common import CreateMode

# Streamlit
import streamlit as st

# Other Imports
from datetime import timedelta
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import sys
import json

# Suppress every Python warning for the rest of the notebook run
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Grab the notebook's active session and switch on the SQL simplifier
session = get_active_session()
session.sql_simplifier_enabled = True

# One round trip for the user and server version; the client version comes from the package
snowflake_environment = session.sql('SELECT current_user(), current_version()').collect()
snowpark_version = VERSION

# Report the connection context
print('\nConnection Established with the following parameters:')
print(f'User                        : {snowflake_environment[0][0]}')
print(f'Role                        : {session.get_current_role()}')
print(f'Database                    : {session.get_current_database()}')
print(f'Schema                      : {session.get_current_schema()}')
print(f'Warehouse                   : {session.get_current_warehouse()}')
print(f'Snowflake version           : {snowflake_environment[0][1]}')
print(f'Snowpark for Python version : {snowpark_version[0]}.{snowpark_version[1]}.{snowpark_version[2]}')

# Part 1 - Use Snowpark to access and prepare data for modeling

In [None]:
# Read the Part 1 banner from the stage and display it
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Part1.png",
    decompress=False,
).read()
st.image(image, width=1000)

**[Snowflake ML](https://docs.snowflake.com/en/developer-guide/snowpark-ml/overview)** is an integrated set of capabilities for end-to-end machine learning in a single platform on top of your governed data. Snowflake ML can be used for both fully custom and out-of-the-box workflows.

For custom ML, data scientists and ML engineers can easily and securely develop and productionize scalable features and models without any data movement, silos, or governance tradeoffs. These custom ML capabilities can be accessed through Python APIs from the Snowpark ML library.

Capabilities for custom ML include:

- **[Snowflake Notebooks](https://docs.snowflake.com/en/user-guide/ui-snowsight/notebooks)** for a familiar, easy-to-use notebook interface that blends Python, SQL, and Markdown
- **Container Runtimes** for distributed CPU and GPU processing out of the box from Snowflake Notebooks
- **[Snowpark ML Modeling](https://docs.snowflake.com/en/developer-guide/snowpark-ml/modeling)** for feature engineering and model training with familiar Python frameworks
- **[Snowflake Feature Store](https://docs.snowflake.com/en/developer-guide/snowpark-ml/feature-store/overview)** for continuous, automated refreshes on batch or streaming data
- **[Snowflake Model Registry](https://docs.snowflake.com/en/developer-guide/snowpark-ml/model-registry/overview)** to manage models and their metadata
- **ML Lineage** to trace end-to-end feature and model lineage (currently in private preview)

In [None]:
# Read the Snowpark ML overview image from the stage and display it
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/snowpark_ML.png",
    decompress=False,
).read()
st.image(image, width=1000)

## Snowpark DataFrame

Let's create a Snowpark DataFrame containing our shift sales data from **SHIFT_SALES** in our Snowflake account using the Snowpark `session.table` function. A DataFrame is a data structure that contains rows and columns, similar to a SQL table.

**Value:** Familiar representation of data for Python users.



In [None]:
# Snowpark DataFrame over SHIFT_SALES; the name is unqualified, so it is resolved
# against the session's current database and schema.
snowpark_df = session.table("SHIFT_SALES")

## Preview the Data

With our Snowpark DataFrame defined, let’s use the .show() function to take a look at the first 10 rows.

**Value:** Instant access to data.



In [None]:
# Print the first 10 rows of the DataFrame (10 is also the default for .show())
snowpark_df.show(n=10)

## Select, Filter, Sort

Notice the Null values for "shift_sales". Let's look at a single location. To do this, we will make another Snowpark DataFrame, location_df, from the above DataFrame and we will:

1. Select columns
2. Filter to a single location ID
3. Sort by date

**Value**: Efficient transformation pipelines using Python syntax and chained logic.



In [None]:
# Select -> filter -> sort, expressed as one chained pipeline
location_df = (
    snowpark_df
    .select("date", "shift", "shift_sales", "location_id", "city")
    .filter(F.col("location_id") == 1135)                # single location
    .order_by(["date", "shift"], ascending=[0, 0])       # both keys descending
)

# Display the first 20 rows
location_df.show(n=20)

We can see that shift sales are populated 8 days prior to the latest date in the data. The **missing values** represent future dates that do not have shift sales yet.

## Snowpark ML Modeling 
Supports data preprocessing, feature engineering, and model training in Snowflake using popular machine learning frameworks, such as scikit-learn, xgboost, and lightgbm. This API also includes a preprocessing module that can use compute resources provided by a Snowpark-optimized warehouse to provide scalable data transformations.

In [None]:
# Load both illustrations from the stage up front
image1 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Snowpark_ML_API.png",
    decompress=False,
).read()
image2 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/SnowparkValue.png",
    decompress=False,
).read()

# API overview first, then the value summary under its own subheader
st.image(image1, width=1000)
st.subheader("Here's the value of using Snowpark:")
st.image(image2, width=1000)

## Explain the Query

Let's look at what was executed in Snowflake to create our location_df DataFrame.

The translated SQL query can be seen in the Snowsight interface under _Activity_ in the _Query History_ or directly in our notebook by using the explain() function. 

**Value:** Transparent execution and compute usage.

In [None]:
# Print the query information behind location_df (the select/filter/sort built above in this notebook)
location_df.explain()

## Compare DataFrame Size

Let's bring a sample of our Snowflake dataset to our Python environment in a pandas DataFrame using the to_pandas() function. We will compare how much memory is used for the pandas DataFrame compared to the Snowpark DataFrame. As we will see, no memory is used for the Snowpark DataFrame in our Python environment. All data in the Snowpark DataFrame remains on Snowflake.
**Value:** No copies or movement of data when working with Snowpark DataFrames.



In [None]:
# Pull a 10,000-row sample out of Snowflake into a local pandas DataFrame
pandas_df = snowpark_df.limit(10000).to_pandas()

# sys.getsizeof() reports bytes; dividing by 2**20 expresses the size in MB.
# First the Snowpark DataFrame object held in this Python process...
snowpark_size = sys.getsizeof(snowpark_df) / 2**20
print(f"Snowpark DataFrame Size (snowpark_df): {snowpark_size:.2f} MB")

# ...then the pandas DataFrame holding the sampled rows
pandas_size = sys.getsizeof(pandas_df) / 2**20
print(f"Pandas DataFrame Size (pandas_df): {pandas_size:.2f} MB")

## Data Exploration

Here, we will use Snowpark to explore our data. A common pattern for exploration is to use Snowpark to manipulate our data and then bring an aggregate table to our Python environment for visualization.

**Value:** - Native Snowflake performance and scale for aggregating large datasets. - Easy transfer of aggregate data to the client-side environment for visualization.
As we explore our data, we will highlight what is being done in Snowflake and what we are transferring to our client-side environment (Python notebook environment) for visualization.



## How many rows are in our data?

This will give us an idea of how we might need to approach working with this data. Do we have enough data to build a meaningful model? What compute might be required? Will we need to sample the data?

**What's happening where?:** Rows counted in Snowflake. No data transfer.



In [None]:
# Count the rows of the DataFrame with .count(); as the last expression of the cell,
# the resulting number is shown as the cell output.
snowpark_df.count()

In [None]:
# Labels shown on the slider; the slider value itself is the index 0-6.
# NOTE(review): assumes day_of_week == 0 means Monday in the data - confirm.
days_of_week = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday']
selected_day = st.select_slider(
    "Select Day of the Week",
    options=list(range(len(days_of_week))),
    format_func=days_of_week.__getitem__,
)

# Keep only the rows for the chosen weekday index
filtered_df = snowpark_df.filter(F.col("day_of_week") == selected_day)

# Average shift sales per city
df = filtered_df.group_by("city").agg(F.mean("shift_sales").alias("avg_shift_sales"))

# Bar chart of the per-city averages
st.bar_chart(data=df, x='CITY', y='AVG_SHIFT_SALES')

## Let's calculate some descriptive statistics.

We use the Snowpark describe() function to calculate summary statistics and then bring the aggregate results into a pandas DataFrame to visualize in a formatted table.

**What's happening where?:** Summary statistics calculated in Snowflake. Transfer aggregate summary statistics for client-side visualization.



In [None]:
# Use the Snowpark DataFrame .describe() function, then convert the summary to a
# pandas DataFrame so the notebook renders it as a formatted table.
snowpark_df.describe().to_pandas()

## Feature Engineering

Now let's keep relevant columns and transform columns to create features needed for our prediction model. To make some of our features more useful, we will normalize them using standard preprocessing techniques, such as Ordinal Encoding and MinMaxScaling. With SnowparkML, you can use a standard sklearn-style API to execute fully distributed feature engineering preprocessing tasks on Snowflake compute, with zero data movement. Let's fit a scaler and encoder to our data, then use it to transform the data, producing new feature columns.


**Value:** The Snowpark syntax makes pipelines easy to implement and understand. The syntax also allows for easy migration of Spark pipelines to Snowflake.


**All transformations for feature engineering in this notebook will be executed on Snowflake compute.**

Notice what we haven't had to do? No tuning, maintenance, or operational overhead. We just need a role, warehouse, and access to the data.

**Value**: Near-zero maintenance. Focus on the work that brings value.



In [None]:
# Part 1b banner
image1 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Part1b.png",
    decompress=False,
).read()
st.image(image1, width=1000)

# Feature Store overview
image2 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/SFFeatureStore.png",
    decompress=False,
).read()
st.image(image2, width=1000)

In the **[Snowflake Feature Store](https://docs.snowflake.com/en/developer-guide/snowpark-ml/feature-store/overview)**, as typical of other Feature Store solutions:

- **Entities** - define the business-entity and the level that we want to gather data and develop ML models at. (e.g. store or/and product key etc).
- **Features** are defined and grouped within **FeatureViews**. In Snowflake Feature Store features are columns, or column-expressions defined via the Snowpark for Python dataframe api, or via SQL directly.
- **FeatureViews** are associated (defined) for one or more **Entities**. A **FeatureView** can be defined with 1:n Entities, but typically only one. Several (many) **FeatureViews** may contain Features for the same Entity. FeatureViews tend to get defined based on the data-source they are derived from, the data's refresh or calculation frequency. A **FeatureView** is defined via a Snowpark Dataframe (or SQL expression) enabling a complex pipeline to be used.
- The **Entity** (key columns) are used to join **FeatureViews** together when needed to gather features from multiple **FeatureViews** within a single training or inference dataset, or derive new **FeatureViews**.
- A **FeatureSlice** provides a way of creating a subset of the Features from a single **FeatureView** when needed. It can be used within the API, pretty much anywhere a **FeatureView** can be used.
- **FeatureViews** and **FeatureSlices** can be merged (via merge_features) to gather features together and create a new **FeatureView**. For example, all the features for a given **Entity** could be gathered via the merge into a single **FeatureView**.

### Creating and Registering the Feature Store and Entity
Before we can calculate and register our features, we need to create a feature store and define an entity that encapsulates the keys used to join features.
- **Feature Store Creation:** The feature store is created within a specified database and schema, and it is configured to create if it does not already exist.
- **Entity Definition:** An entity named AGGREGATE_WINDOW is defined, which includes the join keys location_id and shift that will be used for feature lookups.
- **Entity Registration:** The entity is registered in the feature store if it doesn't already exist. If it does exist, the existing entity is retrieved.

In [None]:
# Import necessary libraries and create session
from snowflake.ml.feature_store import FeatureStore, CreationMode

# Create a Feature Store handle bound to the current session context:
# the current database, with the current schema name used as the feature store name,
# and the current warehouse as its default warehouse.
fs = FeatureStore(
    session=session,
    database=session.get_current_database(),
    name=session.get_current_schema(),
    default_warehouse=session.get_current_warehouse(),
    # Presumably a no-op when the feature store already exists, so the cell can be re-run - confirm
    creation_mode=CreationMode.CREATE_IF_NOT_EXIST,
)

print("FeatureStore created or referenced successfully.")

In [None]:
# Name and join keys of the entity that feature lookups are keyed on
aggregate_window_entity_name = "AGGREGATE_WINDOW"
aggregate_window_entity_join_keys = ["location_id", "shift"]

# Distinct names of the entities already in the feature store, aggregated to a
# JSON array server-side and decoded into a Python list here
existing_entities = json.loads(
    fs.list_entities()
    .select(F.to_json(F.array_agg("NAME", True)))
    .collect()[0][0]
)

if aggregate_window_entity_name in existing_entities:
    # Already registered: reuse the stored entity
    aggregate_window_entity = fs.get_entity(aggregate_window_entity_name)
else:
    # Not there yet: build the entity and register it
    aggregate_window_entity = Entity(
        name=aggregate_window_entity_name,
        join_keys=aggregate_window_entity_join_keys,
        desc="Aggregate window for rolling shift average by location",
    )
    fs.register_entity(aggregate_window_entity)

### Create a new FeatureView and materialize the feature pipeline
Now we construct a Feature View with the above snowpark_df DataFrame. First, we create a draft feature view (fv). We set the refresh_freq to 30 days, which will create a Snowflake dynamic table. At this point, the draft feature view will not take effect because it is not registered yet. Then we register the feature view via register_feature_view. It will materialize to the Snowflake backend. Incremental maintenance will start if the query is supported.

### Aggregating Across the Window
Next, we'll define a rolling average feature that calculates the average shift sales across all previous days for each location and shift. This feature will be used in our ML model development.
- Window Definition: A window function is defined to calculate the rolling average of shift sales for each location and shift.
- Feature Creation: The rolling average is calculated using the Snowpark DataFrame API and added as a new column in the DataFrame.
- Feature View Definition: A feature view is created to encapsulate this feature, and it is registered in the feature store. The timestamp_col ensures that the feature is treated as a time-series feature, and the refresh_freq determines how often it gets updated.

In [None]:
from snowflake.ml.feature_store import FeatureView
from snowflake.snowpark import Window
import snowflake.snowpark.functions as F

# Define the window for calculating the rolling shift average by location:
# one partition per (LOCATION_ID, SHIFT), ordered by DATE.
window_by_location_all_days = (
    Window.partition_by('"LOCATION_ID"', '"SHIFT"')
    .order_by('"DATE"')
    # Frame ends one row before the current row, so the current shift's own sales are
    # presumably excluded (relies on Window.CURRENT_ROW being 0 - confirm)
    .rows_between(Window.UNBOUNDED_PRECEDING, Window.CURRENT_ROW - 1)
)

# Use the window to create an aggregate across the window.
# snowpark_df is rebound, so later cells see the extra avg_location_shift_sales column.
snowpark_df = snowpark_df.with_column(
    "avg_location_shift_sales", 
    F.avg("shift_sales").over(window_by_location_all_days)
)

# Define the (draft) feature view over the full DataFrame, including the new column
feature_view = FeatureView(
    name="AGGREGATE_WINDOW_FV",
    entities=[aggregate_window_entity],   # Entity defined in the previous cell (join keys location_id, shift)
    feature_df=snowpark_df,               # Snowpark DataFrame that defines the features
    timestamp_col="DATE",                 # Timestamp column of the feature data
    refresh_freq="30 days",               # Refresh schedule for the materialized feature view
    desc="TastyBytes rolling shift average by location_id refreshed on a schedule"
)

# Register the feature view under version "1"
registered_fv = fs.register_feature_view(
    feature_view=feature_view,
    version="1",
    block=True
)

In [None]:
# Feature store illustration
image1 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/feature_store.png",
    decompress=False,
).read()
st.image(image1, width=1000)

# Show what the hands-on lab result should look like
st.subheader("Here is what your Snowpark Feature should look like:")
image2 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/HOL_Example.png",
    decompress=False,
).read()
st.image(image2, width=1000)

## Impute Missing Values

The rolling average feature we just created is missing if there are no prior shift sales at that location. We will replace those missing values with 0.



In [None]:
# Fill missing values in the avg_location_shift_sales column with 0.
# Only that column is touched (subset=...); snowpark_df is rebound to the result.
snowpark_df = snowpark_df.fillna(value=0, subset=["avg_location_shift_sales"])

# Show the DataFrame to check the filled values
snowpark_df.show()

## Leverage Snowpark ML Modeling API to create features

Snowpark ML provides APIs to support each stage of an end-to-end machine learning development and deployment process and includes two key components: [Snowpark ML Modeling](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-modeling) and [Snowpark ML Ops](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-mlops-model-registry).

[Snowpark ML Modeling](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-modeling) supports data preprocessing, feature engineering, and model training in Snowflake using popular machine learning frameworks, such as scikit-learn, xgboost, and lightgbm. This API also includes a preprocessing module that can use compute resources provided by a Snowpark-optimized warehouse to provide scalable data transformations.

Snowpark ML Operations (MLOps), featuring the [Snowpark ML Model Registry](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-mlops-model-registry), complements the Snowpark ML Development API. The model registry allows secure deployment and management of models in Snowflake, and supports models trained both inside and outside of Snowflake.

In [None]:
# Read the data-engineering feature overview from the stage
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Snowpark4DE.png",
    decompress=False,
).read()

# Caption, then the image itself
st.subheader("Here are a Snowpark Features commonly used for Data Engineering tasks:")
st.image(image, width=1000)

In [None]:
# Resize the session's current warehouse to LARGE before the preprocessing/training cells
session.sql(
    f"ALTER WAREHOUSE {session.get_current_warehouse()} SET WAREHOUSE_SIZE = 'LARGE'"
).collect()

In [None]:
# Import Snowpark ML: Machine Learning Toolkit for Snowflake
import snowflake.ml.modeling.preprocessing as snowmlpp

# Define our scaler and ordinal encoding functions

# MinMaxScaler rescales a feature into a fixed range (usually 0 to 1) without changing
# the shape of its distribution. Here it is fitted for "CITY_POPULATION", whose scaled
# values are written to "CITY_POPULATION_NORM".

def fit_scaler(session, df):
    """Fit a MinMaxScaler that maps CITY_POPULATION to CITY_POPULATION_NORM.

    Args:
        session: Not used by this function; kept so existing calls keep working.
        df: DataFrame containing the CITY_POPULATION column to fit on.

    Returns:
        The fitted ``snowmlpp.MinMaxScaler``.
    """
    scaler = snowmlpp.MinMaxScaler(
        input_cols=["CITY_POPULATION"],
        output_cols=["CITY_POPULATION_NORM"],
    )
    scaler.fit(df)
    return scaler

# Ordinal encoding replaces each category of a categorical column with a numeric code,
# giving the model a numeric feature to work with.
# Here "SHIFT" (currently AM or PM) is encoded into the numeric column "SHIFT_OE".

def fit_oe(session, df):
    """Fit an OrdinalEncoder that maps SHIFT to SHIFT_OE.

    Args:
        session: Not used by this function; kept so existing calls keep working.
        df: DataFrame containing the SHIFT column to fit on.

    Returns:
        The fitted ``snowmlpp.OrdinalEncoder``.
    """
    encoder = snowmlpp.OrdinalEncoder(
        input_cols=["SHIFT"],
        output_cols=["SHIFT_OE"],
    )
    encoder.fit(df)
    return encoder

In [None]:
# Run Snowpark ML preprocessing functions against our feature data

# For the Tasty_Bytes data, use MinMaxScaler to normalize "CITY_POPULATION" into "CITY_POPULATION_NORM" with values between 0 and 1.
snowml_mms = fit_scaler(session, snowpark_df)
normed_df = snowml_mms.transform(snowpark_df)

# For the Tasty_Bytes data, use OrdinalEncoder (fit_oe) to encode "SHIFT", which is currently AM or PM, into the numeric column "SHIFT_OE".
# The encoder is fitted on and applied to the already-scaled DataFrame, so oe_df carries both new columns.
snowml_oe = fit_oe(session, normed_df)
oe_df = snowml_oe.transform(normed_df)
oe_df.show()

## Filter to Historical Data

Our data includes placeholders for future data with missing shift sales. The future data represents the next 7 days of shifts for all locations. The historical data has shift sales for all locations where a food truck parked during a shift. We will only use historical data when training our model and will filter out the dates where the shift_sales column is missing.



In [None]:
# Data Science best practice: always check the quality of the training set, e.g. remove rows with null or invalid values in the label.

# Specifically for Tasty_Bytes data, rows where "shift_sales" is null are future dates whose sales still need to be predicted.
# Filter out these future dates so these records will not be used in model training.
historical_df = oe_df.filter(F.col("shift_sales").is_not_null())

## Persist Transformations

If we want to save the changes we can either save it as a table, meaning the SQL generated by the DataFrame is executed and the result is stored in a table or as a view where the DataFrame SQL will be the definition of the view.
**save_as_table** saves the result in a table, if **mode='overwrite'** then it will also replace the data that is in it.



In [None]:
# Persist the historical (label-complete) data as a table, replacing any previous contents,
# then read the table back and preview it.
historical_df.write.save_as_table(table_name='DATASCIENCECOLLEGE.PUBLIC.INPUT_DATA', mode='overwrite')
session.table('DATASCIENCECOLLEGE.PUBLIC.INPUT_DATA').show()

# Part 2 - Use Snowpark to train a model

In [None]:
# Read the Part 2 banner from the stage
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Part2.png",
    decompress=False,
).read()

# This banner is rendered narrower (600) than the other images
st.image(image, width=600)

## Drop Columns

Let's return to the original prepared table, with all cities listed, and drop columns that will not be used in the model.



In [None]:
# Drop the columns that are not passed to the model: the identifier, date and city columns,
# plus the raw CITY_POPULATION and SHIFT columns (their CITY_POPULATION_NORM / SHIFT_OE versions stay).
prepared_df = historical_df.drop("location_id", "city_population", "shift", "city", "date")
prepared_df.show()

## Build a simple XGBoost Regression Model on Snowflake

We will now use our training data to train an XGBoost regression model on Snowflake. Recall from above, the two main ways that Snowpark works:

1. Snowpark code translated and executed as SQL on Snowflake
2. Python functions deployed in a secure sandbox in Snowflake

We will be leveraging the deployment of Python functions into Snowflake for training and model deployment.



In [None]:
# Column roles for the model: the label, the prediction output, and the input features
LABEL_COLUMNS = ["SHIFT_SALES"]
OUTPUT_COLUMNS = ["PRED_SHIFT_SALES"]
FEATURE_COLUMN_NAMES = [
    "SHIFT_OE",
    "CITY_POPULATION_NORM",
    "MONTH",
    "DAY_OF_WEEK",
    "LATITUDE",
    "LONGITUDE",
    "AVG_LOCATION_SHIFT_SALES",
]

# Model input: label first, then the features
input_df = prepared_df.select(LABEL_COLUMNS + FEATURE_COLUMN_NAMES)
input_df.show()

[Snowpark ML Modeling](https://docs.snowflake.com/en/developer-guide/snowpark-ml/modeling) also includes metric calculations such as correlations, and more. We will use the correlation method from [snowflake.ml.modeling.metrics](https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/index) on our input dataframe to identify any features that are linearly correlated with the output. We'll also use matplotlib to plot the resulting matrix. Notice that all of the correlation calculations are pushed down to Snowflake!



In [None]:
import seaborn as sns
import matplotlib.pyplot as plt
from snowflake.ml.modeling.metrics.correlation import correlation
# correlation() already returns the column-by-column correlation matrix as a pandas DataFrame
corr_df = correlation(df=input_df)

# A single 10x8-inch Axes. plt.subplots(10, 8) would instead build a 10x8 grid of Axes,
# and the resulting array cannot be passed as `ax`.
fig, ax = plt.subplots(figsize=(10, 8))

# Plot the matrix as returned; calling .corr() on it again would correlate the
# correlation coefficients rather than the features.
sns.heatmap(corr_df, ax=ax, annot=True)

What's great about this, is that we are using a lot of Snowpark components under the hood- the dataframe API, SQL, Python stored procedures and more. But with the new [Snowpark ML API](https://docs.snowflake.com/en/developer-guide/snowpark-ml/reference/latest/index), data scientists can take advantage of all that Snowpark affords them, while using common, familiar APIs that match how they do their work today.

Now that we have our feature data, let's actually fit an XGBoost model to our features to attempt to predict future sales. We'll fit several different models with different hyperparameters, and then show how we can use the Snowpark Model Registry to select our best-fit model.



In [None]:
# Split the data into train and test sets with 0.9 / 0.1 weights; the fixed seed is given so the split can be repeated
train_df, test_df = input_df.random_split(weights=[0.9, 0.1], seed=98)

## What's happening when you leverage Snowpark ML Modeling API?

Let's run our training job using the SnowparkML Modeling API. This will push down our model training to run on Snowflake, and you'll notice that the type of the model object returned is a SnowparkML XGBRegressor. This has some benefits, but it is also fully compatible with the standard sklearn/xgboost model objects.

- The model.fit() function actually creates a temporary stored procedure in the background. This also means that the model training is a single-node operation. Be sure to use a Snowpark Optimized Warehouse if you need more memory. We are just using an XS Standard Virtual Warehouse here, which we created at the beginning of this quickstart.
- The model.predict() function actually creates a temporary vectorized UDF in the background, which means the input DataFrame is batched as Pandas DataFrames and inference is parallelized across the batches of data. You can check the query history once you execute the following cell to check.

The Snowpark ML Modeling API enables the use of popular Python ML frameworks, such as scikit-learn and XGBoost, for feature engineering and model training without the need to move data out of Snowflake. 

Benefits of Snowpark ML Modeling: 
- **Feature engineering and preprocessing:** Improve performance and scalability with distributed execution for common scikit-learn preprocessing functions. 
- **Model training:** Accelerate model training for scikit-learn, XGBoost and LightGBM models without the need to manually create stored procedures or user-defined functions (UDFs), and leverage distributed hyperparameter optimization.

Behind the scenes, Snowpark ML parallelizes data processing operations by taking advantage of Snowflake’s scalable computing platform.

In [None]:
# Read the distributed-processing diagram from the stage
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/DistributedDataProcessing.png",
    decompress=False,
).read()

# Render it at the standard width
st.image(image, width=1000)

In [None]:
from snowflake.ml.modeling.xgboost import XGBRegressor
# XGBoost regressor wired to the feature, label and prediction columns defined earlier
regressor = XGBRegressor(
    input_cols=FEATURE_COLUMN_NAMES,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Fit on the training split; the return value is discarded so the cell prints nothing.
# Prediction on the test split happens in the following cells.
_ = regressor.fit(train_df)

In [None]:
# Just to illustrate, we can also pass in a pandas DataFrame to Snowpark ML's model.predict():
# the test split is converted with to_pandas() first, and the prediction result is shown as the cell output.
regressor.predict(test_df.to_pandas())

In [None]:
# Let's analyze the results using Snowpark ML's MAPE
# Use Snowpark ML metrics to calculate
from snowflake.ml.modeling.metrics import mean_absolute_percentage_error, mean_squared_error

# Score the held-out test split
results = regressor.predict(test_df)

# Both metrics compare the label column against the prediction column of `results`
mape = mean_absolute_percentage_error(
    df=results,
    y_true_col_names=LABEL_COLUMNS,
    y_pred_col_names=OUTPUT_COLUMNS,
)
mse = mean_squared_error(
    df=results,
    y_true_col_names=LABEL_COLUMNS,
    y_pred_col_names=OUTPUT_COLUMNS,
)

# Actual vs. predicted side by side, followed by the two scores
results.select(LABEL_COLUMNS + OUTPUT_COLUMNS).show()
print(f"Mean absolute percentage error: {mape}")
print(f"Mean squared error: {mse}")

In [None]:
# Scatter of actual vs. predicted shift sales; both columns are pulled to pandas as float64
g = sns.relplot(
    data=results.select("SHIFT_SALES", "PRED_SHIFT_SALES").to_pandas().astype("float64"),
    x="SHIFT_SALES",
    y="PRED_SHIFT_SALES",
    kind="scatter",
)
# Red y = x reference line: points on it are perfect predictions
g.ax.axline((0, 0), slope=1, color="r")

## Snowpark ML's GridSearchCV()

Now, let's use Snowpark ML's GridSearchCV() function to find optimal model parameters.



In [None]:
from snowflake.ml.modeling.model_selection import GridSearchCV

# Search a 2 x 2 grid of XGBoost hyperparameters, scored by negative MAPE
grid_search = GridSearchCV(
    estimator=XGBRegressor(),
    param_grid={
        "n_estimators": [25, 50],
        "learning_rate": [0.4, 0.5],
    },
    scoring="neg_mean_absolute_percentage_error",
    n_jobs=-1,
    input_cols=FEATURE_COLUMN_NAMES,
    label_cols=LABEL_COLUMNS,
    output_cols=OUTPUT_COLUMNS,
)

# Fit on the training split; the return value is discarded so the cell prints nothing
_ = grid_search.fit(train_df)

In [None]:
import warnings

# Suppress FutureWarning output (this filter stays active for later cells too)
warnings.simplefilter(action='ignore', category=FutureWarning)

# Pull the cross-validation results out of the underlying scikit-learn object
gs_results = grid_search.to_sklearn().cv_results_

# One entry per evaluated parameter combination
n_estimators_val = [params["n_estimators"] for params in gs_results["params"]]
learning_rate_val = [params["learning_rate"] for params in gs_results["params"]]

# The search scores with *negative* MAPE, so flip the sign to get MAPE itself
mape_val = gs_results["mean_test_score"] * -1

gs_results_df = pd.DataFrame(data={
    "n_estimators": n_estimators_val,
    "learning_rate": learning_rate_val,
    "mape": mape_val
})

# Convert inf values to NaN. A float NaN (rather than pd.NA) keeps the "mape"
# column numeric; pd.NA would turn it into an object column, which the plot
# below cannot reliably handle.
gs_results_df = gs_results_df.replace([float("inf"), float("-inf")], float("nan"))

# Plot the results: MAPE against learning rate, one line per n_estimators value
g2 = sns.relplot(data=gs_results_df, x="learning_rate", y="mape", hue="n_estimators", kind="line")

In [None]:
# Capture the winning estimator from the grid search together with its hyperparameters
best_estimator = grid_search.to_sklearn().best_estimator_
optimal_model = best_estimator
optimal_n_estimators = best_estimator.n_estimators
optimal_learning_rate = best_estimator.learning_rate

# Look up the MAPE recorded for that (n_estimators, learning_rate) combination
is_best_row = (gs_results_df['n_estimators'] == optimal_n_estimators) & (
    gs_results_df['learning_rate'] == optimal_learning_rate
)
optimal_mape = gs_results_df.loc[is_best_row, 'mape'].values[0]

for value in (optimal_model, optimal_mape, optimal_n_estimators, optimal_learning_rate):
    print(value)

## **Scale down your assigned Snowflake compute warehouse.**



In [None]:
# Scale the session's current warehouse down to XSMALL
current_warehouse = session.get_current_warehouse()
resize_sql = "ALTER WAREHOUSE " + current_warehouse + " SET WAREHOUSE_SIZE = 'XSMALL'"
session.sql(resize_sql).collect()

# Part 3 - Use Snowpark for MLOps

In [None]:
# Read the Part 3 banner image from the ASSETS stage
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Part3.png",
    decompress=False,
).read()

# Render it in the notebook
st.image(image, width=1000)

## Now let's use Snowflake Model Registry (Snowpark ML Ops)

Model Registry was created to support model management operations including model registration, versioning, metadata and audit trails. Integrated deployment infrastructure for batch inference is a critical ease-of-use feature. Users can deploy ML models for batch inference from the registry directly into a Snowflake Warehouse as a vectorized UDF, or as a service to a customer-specified Compute Pool in Snowpark Container Services.

Snowflake's Model Registry supports scikit-learn, XGBoost, PyTorch, TensorFlow, and MLflow (via the pyfunc interface) models.

Model Registry allows easy deployment of pre-trained open-source models from providers such as HuggingFace. See this [blog](https://www.snowflake.com/blog/accelerate-ml-workflow-python-snowpark-ml/) for more details, or the [Snowflake Model Registry documentation](https://docs.snowflake.com/en/developer-guide/snowpark-ml/snowpark-ml-mlops-model-registry).

In [None]:
# Read the Model Registry diagram from the ASSETS stage
image = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/ModelRegistry.png",
    decompress=False,
).read()

# Render it in the notebook
st.image(image, width=1000)

In [None]:
# Open the Snowflake Model Registry in DATASCIENCECOLLEGE.PUBLIC
from snowflake.ml.registry import Registry

# Name under which the shift-sales model versions are registered
model_name = "SHIFT_SALES_PREDICTION"

registry = Registry(
    session=session,
    database_name="DATASCIENCECOLLEGE",
    schema_name="PUBLIC",
)

In [None]:
# Helper that computes the next unused version number (for version names such as V0, V1, ...) of a model in the Snowflake Model Registry
def get_next_model_version_to_be_registered(model_name:str) -> int:
    """Return the number to use for the next ``V<n>`` version of a model.

    The model is looked up through the module-level ``registry`` and its
    existing version names are scanned. Only names of the form ``V<digits>``
    (case-insensitive) are counted; any other version name is ignored rather
    than aborting the whole computation.

    Args:
        model_name: Name of the model in the registry.

    Returns:
        One more than the highest existing ``V<n>`` number, or 0 when the
        model cannot be looked up or has no ``V<n>`` versions yet.
    """
    import re

    try:
        version_names = registry.get_model(model_name).show_versions()['name'].tolist()
    except Exception:
        # Best effort, as before: if the lookup fails for any reason (for
        # example the model has not been registered yet), start at 0.
        # Unlike a bare `except`, this lets KeyboardInterrupt/SystemExit through.
        return 0

    version_pattern = re.compile(r"V(\d+)", re.IGNORECASE)
    matches = (version_pattern.fullmatch(str(name).strip()) for name in version_names)
    numbers = [int(match.group(1)) for match in matches if match]

    # default=-1 yields 0 when no V<n> version exists
    return max(numbers, default=-1) + 1

In [None]:
# Register the baseline regressor under the next free V<n> version name
next_version_name = f'V{get_next_model_version_to_be_registered(model_name)}'

# Sample input for the registry: the training frame without the SHIFT_SALES column
sample_features = train_df.drop("SHIFT_SALES")

model_ver = registry.log_model(
    model=regressor,
    model_name=model_name,
    version_name=next_version_name,
    comment="This is the initial model of the Shift Sales Price Prediction model.",
    conda_dependencies=["xgboost"],
    sample_input_data=sample_features,
    options={'relax_version': False},
)

In [None]:
# Log the optimal model (best estimator from the grid search) to the registry
# under the next free V<n> version name
model_ver2 = registry.log_model(
    model_name=model_name,
    version_name=f'V{get_next_model_version_to_be_registered(model_name)}',
    model=optimal_model,
    comment="This is the optimal model Shift Sales Price Prediction model developed using hyperparameter optimization.",
    conda_dependencies=["xgboost"],
    sample_input_data=train_df.drop("SHIFT_SALES"),
    options={'relax_version': False}
)

# Add evaluation metric. optimal_mape belongs to the tuned model, so it is
# recorded on model_ver2 (the version just logged), not on the initial model_ver.
model_ver2.set_metric(metric_name="mean_abs_pct_err", value=optimal_mape)

In [None]:
# List every version currently registered under this model name
shift_sales_model = registry.get_model(model_name)
shift_sales_model.show_versions()

In [None]:
# When several versions share one model name, the registry marks one of them
# as the default; show which version that is
default_version = registry.get_model(model_name).default
default_version.version_name

In [None]:
# Now we can use the default version of the model to perform inference.
# Asking the registry for `.default` (instead of hard-coding 'V0') keeps this
# cell in line with the default shown above and works even when no version
# named V0 exists.
model_ver = registry.get_model(model_name).default
result_sdf = model_ver.run(test_df, function_name="predict")
result_sdf.show()

## View Registered Models

Let's look at the Snowflake Model Registry UI. It is a unified place in Snowsight UI where all models registered in the Model Registry can be found and explored. 

To navigate to the Model Registry, click on “AI & ML” in the left-nav, and select “Models”. This will display all available models in the Model Registry across all DBs/Schemas that your role has access to.

Currently, Model Registry will display user-created models from Snowpark ML, Snowpark, or models sourced from external ML platforms that are registered in Snowflake. In the future, Model Registry will also support other model types such as Cortex ML, Doc AI, Cortex Fine Tuning etc. 

To explore the details of a model, click on the corresponding row in the listing. This will open the details page for the selected model as shown below. 

Key model details such as description, any tags that are applied to the model, and a table of all versions of the model are displayed. 

Model Version metadata such as metrics can be found here. This page also shows the available model methods for invocation along with the inputs and outputs of those functions.


In [None]:
# Read both Model Registry UI screenshots from the ASSETS stage
stage_paths = (
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/ModelRegistered.png",
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Model.png",
)
image1, image2 = [
    session.file.get_stream(stage_path, decompress=False).read()
    for stage_path in stage_paths
]

# Render them one below the other
for screenshot in (image1, image2):
    st.image(screenshot, width=1000)


Now that our model is built and deployed, let's see it in action! We will find the best place to park in Vancouver for tomorrow morning's shift.

In [None]:
# Select the rows to score: Vancouver locations with shift_oe == 1 whose
# shift_sales value is still missing (null)
has_no_sales_yet = F.col("shift_sales").is_null()
is_selected_shift = F.col("shift_oe") == 1
is_vancouver = F.col("city") == "Vancouver"
date_tomorrow_df = oe_df.filter(has_no_sales_yet & is_selected_shift & is_vancouver)

# Predict SHIFT_SALES for those locations and preview the result
result_sdf = regressor.predict(date_tomorrow_df)
result_sdf.show()

## Visualize on a Map

The red and yellow areas indicate higher predicted sales locations and the green zones indicate lower predicted sales. We will use this insight to ensure that our drivers are parking at the high-value locations. Value: Updated predictions readily available to drive towards our corporate goals.



In [None]:
# Pull location predictions into a pandas DataFrame
predictions_df = result_sdf.to_pandas()

# Preview the first rows. A bare `predictions_df.head()` in the middle of a
# cell is evaluated and thrown away, so render the preview explicitly.
st.dataframe(predictions_df.head())

# Visualize on a map
st.map(predictions_df)

# Part 4 - Create a SiS application to use predicted outputs

Create a SiS application for local managers to identify where to place daily food trucks. 

See completed [SiS App](https://app.snowflake.com/sfsenorthamerica/sfhol/#/streamlit-apps/HOL.SCHEMA0.ZXGOF2KZL26CGAA8?ref=snowsight_shared) using role=PUBLIC



In [None]:
# Read the Part 4 banner and the SiS app screenshot from the ASSETS stage
image1 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/Part4.png",
    decompress=False,
).read()
image2 = session.file.get_stream(
    "@DATASCIENCECOLLEGE.PUBLIC.ASSETS/SiSapp.png",
    decompress=False,
).read()

# Banner first
st.image(image1, width=600)

# Then the captioned screenshot of the app
st.subheader("Here's a picture of the running SiS app:")
st.image(image2, width=600)

- Create new SiS app.
- Replace existing code with SiS_application.py code listed below.
- Update schema variable in line 29.
- Add the required `pydeck` and `snowflake-ml-python` packages.
- Run the SiS application.

```
# Import Python packages
import streamlit as st
import pydeck as pdk
import numpy as np

# Import Snowflake modules
from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
from snowflake.snowpark.context import get_active_session
from snowflake.ml.registry import Registry
import snowflake.ml.modeling.preprocessing as snowmlpp
from snowflake.ml.feature_store import FeatureStore

# Page setup: browser-tab title and icon, full-width layout
st.set_page_config(
    layout="wide",
    page_icon=":truck:",
    page_title="Streamlit App: Snowpark 101",
)

# Page heading
st.header("Predicted Shift Sales by Location")
st.subheader("Data-driven recommendations for food truck drivers.")

# Use the Snowflake session the app is already running in
session = get_active_session()

# Create input widgets for cities and shift
with st.container():
    col1, col2 = st.columns(2)
    with col1:
        # Drop down to select city, populated with the distinct cities
        # found in the SHIFT_SALES table (sorted alphabetically)
        city = st.selectbox(
            "City:",
            session.table("DATASCIENCECOLLEGE.PUBLIC.SHIFT_SALES")
            .select("city")
            .distinct()
            .sort("city"),
        )

    with col2:
        # Select AM/PM Shift
        shift = st.radio("Shift:", ("AM", "PM"), horizontal=True)

    # Number of trucks to place: 1 through 9
    n_trucks = st.selectbox('How many food trucks would you like to schedule today?', np.arange(1,10))

    if n_trucks > 1:
        # NOTE: `range` shadows the Python builtin. The name is kept because
        # the distance filter further down in this script reads it.
        range = st.slider('What is the minimum distance in kilometers between food trucks?', 0, 20, 1)
        st.write('You are requesting a minimum distance of ', range, 'km')
        st.write('Click **:blue[Update]** to get the ', n_trucks, ' highest predicted Shift_Sales food truck locations.')
    else:
        st.write('Click **:blue[Update]** to get the one food truck location predicted to have the highest Shift_Sales')
# Get predictions for city and shift time
def get_predictions(city, shift):
    """Score the next unscored date's locations for one city and shift.

    Reads version "1" of the AGGREGATE_WINDOW_FV feature view, keeps the rows
    for the given city and shift on the earliest date whose SHIFT_SALES is
    still null, applies imputation / encoding / scaling, and runs version V0
    of the registered SHIFT_SALES_PREDICTION model on the result.

    Args:
        city: City name matched against the CITY column.
        shift: Shift label matched against the SHIFT column ("AM" or "PM"
            in this app).

    Returns:
        The Snowpark DataFrame returned by the model version's ``predict``
        function for the selected feature columns.
    """
    # Connect to the Feature Store
    fs = FeatureStore(
        session=session,
        database="DATASCIENCECOLLEGE",
        name="PUBLIC"
    )

    # Retrieve the feature view from the Feature Store
    feature_view = fs.get_feature_view(
        name="AGGREGATE_WINDOW_FV",
        version="1"
    )

    # Get data and filter by city and shift
    snowpark_df = feature_view.to_snowpark_df().filter((F.col("SHIFT") == shift) & (F.col("CITY") == city))

    # "Tomorrow" is taken to be the earliest date that has no SHIFT_SALES yet
    date_tomorrow = (
        snowpark_df.filter(F.col("SHIFT_SALES").is_null())
        .select(F.min("DATE"))
        .collect()[0][0]
    )

    # Filter to tomorrow's date
    snowpark_df = snowpark_df.filter(F.col("DATE") == date_tomorrow)

    # Impute missing location averages with 0
    snowpark_df = snowpark_df.fillna(value=0, subset=["AVG_LOCATION_SHIFT_SALES"])

    # Normalize column names to upper case; names that are already
    # upper-case are left alone instead of being renamed to themselves
    for colname in snowpark_df.columns:
        new_colname = colname.upper()
        if new_colname != colname:
            snowpark_df = snowpark_df.with_column_renamed(colname, new_colname)

    # Encode the shift: PM -> 1, anything else -> 0. The previous extra
    # with_column for "AM" was overwritten right away by this one, so a
    # single expression gives the same column.
    snowpark_df = snowpark_df.with_column("SHIFT_OE", F.iff(F.col("SHIFT") == "PM", 1, 0))

    # Scale
    # NOTE(review): the scaler is fitted on the rows selected above (one city,
    # one shift, one date), not on the data the model was trained with.
    # Confirm that this matches the scaling used at training time.
    mm_target_columns = ["CITY_POPULATION"]
    mm_target_cols_out = ["CITY_POPULATION_NORM"]
    snowml_mms = snowmlpp.MinMaxScaler(input_cols=mm_target_columns,
                                       output_cols=mm_target_cols_out)
    snowml_mms.fit(snowpark_df)
    snowpark_df = snowml_mms.transform(snowpark_df)

    # Get all features
    feature_cols = ["SHIFT_OE",
                    "CITY_POPULATION_NORM",
                    "MONTH",
                    "DAY_OF_WEEK",
                    "LATITUDE",
                    "LONGITUDE",
                    "AVG_LOCATION_SHIFT_SALES",
                    "LOCATION_ID"]

    snowpark_df = snowpark_df.select(feature_cols)

    # Run the registered model (version V0) on the prepared features
    native_registry = Registry(session=session, database_name="DATASCIENCECOLLEGE", schema_name="PUBLIC")
    model_ver = native_registry.get_model("SHIFT_SALES_PREDICTION").version('v0')
    result_sdf = model_ver.run(snowpark_df, function_name="predict")
    return result_sdf

# Update predictions and plot when the "Update" button is clicked
if st.button(":blue[Update]"):
    # Get predictions
    with st.spinner("Getting predictions..."):
        predictions_sdf = get_predictions(city, shift)
        predictions = predictions_sdf.to_pandas()

    # Nothing to plot or rank: stop here instead of failing on the
    # positional lookups and idxmax() below
    if predictions.empty:
        st.warning("No predictions are available for the selected city and shift.")
        st.stop()

    # Plot on a map
    st.subheader("Predicted Shift Sales for position")
    # Floor negative predictions at 0. Assigning the result back is required:
    # an in-place clip on a column selection is not guaranteed to change the
    # DataFrame itself.
    predictions["PRED_SHIFT_SALES"] = predictions["PRED_SHIFT_SALES"].clip(lower=0)
    st.pydeck_chart(
        pdk.Deck(
            map_style=None,
            # Center the view on the first returned location
            initial_view_state=pdk.ViewState(
                latitude=predictions["LATITUDE"].iloc[0],
                longitude=predictions["LONGITUDE"].iloc[0],
                zoom=11,
                pitch=50,
            ),
            layers=[
                pdk.Layer(
                    "HexagonLayer",
                    data=predictions,
                    get_position="[LONGITUDE, LATITUDE]",
                    radius=200,
                    elevation_scale=4,
                    elevation_range=[0, 1000],
                    pickable=True,
                    extruded=True,
                ),
                pdk.Layer(
                    "ScatterplotLayer",
                    data=predictions,
                    get_position="[LONGITUDE, LATITUDE]",
                    get_color="[200, 30, 0, 160]",
                    get_radius=200,
                ),
            ],
        )
    )

    # Row with the highest predicted sales
    max_x = predictions.loc[predictions["PRED_SHIFT_SALES"].idxmax()]
    st.write("Maximum Predicted Sales are expected at the following location:", max_x)

    #st.dataframe(predictions_sdf)
    location_id = max_x["LOCATION_ID"]
    lat = max_x["LATITUDE"]
    long = max_x["LONGITUDE"]
    st.subheader("The following chart is generated using the st_point and st_distance Snowflake Geospatial features")

    if n_trucks == 1:
        st.write("Have your only food truck positioned at Location ID ", location_id, " to maximize SHIFT_SALES")
    elif n_trucks > 1:
        best_locations = [location_id]
        available_locations_sdf = predictions_sdf
        st_distance = F.function('st_distance')
        st_point = F.function('st_point')

        # Pick the remaining trucks one at a time: drop the locations closer
        # than the requested minimum distance to the truck placed last, then
        # take the best remaining prediction.
        # NOTE(review): `range` is the slider value in kilometers. Dividing the
        # distance by 1609 and the slider value by 1.609 puts both sides in
        # miles, assuming st_distance returns meters. Confirm.
        # NOTE(review): with a minimum distance of 0 the location chosen last
        # still passes the filter and can be selected again.
        for _ in np.arange(0, n_trucks - 1):
            available_locations_sdf = (
                available_locations_sdf.with_column(
                    "DISTANCE_TO_TRUCK",
                    st_distance(
                        st_point(F.lit(float(long)), F.lit(float(lat))),
                        st_point(F.col("LONGITUDE"), F.col("LATITUDE")),
                    ) / 1609,
                )
                .filter(F.col("DISTANCE_TO_TRUCK") >= range / 1.609)
                .order_by("PRED_SHIFT_SALES", ascending=False)
            )
            next_best = available_locations_sdf.limit(1).to_pandas()

            # No location left that is far enough away: stop searching
            if next_best.empty:
                break

            location_id = next_best["LOCATION_ID"].iloc[0]
            lat = next_best["LATITUDE"].iloc[0]
            long = next_best["LONGITUDE"].iloc[0]
            best_locations.append(location_id)

        selected_locations = predictions[predictions["LOCATION_ID"].isin(best_locations)]
        st.map(selected_locations)
        st.dataframe(selected_locations)
```