# Data Engineering Pipelines with Snowpark Python


Are you interested in unleashing the power of Snowpark Python to build data engineering pipelines? Well then, this Hands-on Lab is for you! The focus here will be on building data engineering pipelines with Python, and not on data science. For examples of doing data science with Snowpark Python, please check out our [Machine Learning with Snowpark Python: - Credit Card Approval Prediction](https://quickstarts.snowflake.com/guide/getting_started_snowpark_machine_learning/index.html?index=..%2F..index#0) Quickstart.

This Lab is a modified version of [this Quickstart](https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_python/index.html) and captures its content in a convenient Snowflake Notebook that runs directly in your Snowflake account.

Run the next cell to see a visual overview of what we're going to build:

In [None]:
import streamlit as st
st.image("https://raw.githubusercontent.com/Snowflake-Labs/sfguide-data-engineering-with-snowpark-python/main/images/demo_overview.png",width=800)


### Prerequisites
* This lab assumes at least introductory experience with Python and Snowflake
* **A Snowflake Account**
* **A Snowflake user created with ACCOUNTADMIN permissions**. This user will be used to set things up in Snowflake.
* **Anaconda Terms & Conditions accepted**. See Getting Started section in [Third-Party Packages](https://docs.snowflake.com/en/developer-guide/udf/python/udf-python-packages.html#getting-started).

### What You’ll Learn
You will learn about the following Snowflake features during this Quickstart:

* Snowflake's Table Format
* Data ingestion with COPY
* Schema inference
* Data sharing/marketplace (instead of ETL)
* Streams for incremental processing (CDC)
* Streams on views
* Python UDFs (with third-party packages)
* Python Stored Procedures
* Snowpark DataFrame API
* Snowpark Python programmability
* Warehouse elasticity (dynamic scaling)
* SnowCLI (PuPr)
* Tasks (with Stream triggers)
* Task Observability

### What You’ll Build
During this Quickstart you will accomplish the following things:

* Load Parquet data to Snowflake using schema inference
* Set up access to Snowflake Marketplace data
* Create a Python UDF to convert temperature
* Create a data engineering pipeline with Python stored procedures to incrementally process data
* Orchestrate the pipelines with tasks
* Monitor the pipelines with Snowsight
* Deploy the Snowpark Python stored procedures via a CI/CD pipeline

### Infographic explaining the [Tasty Bytes Food Truck Business](https://quickstarts.snowflake.com/guide/tasty_bytes_introduction/img/a51f501137dea3c5.png)


## Setup Script
**Important:** *Read through the next two cells to understand the objects involved in the Lab*

Once you've read the code, run each cell by clicking the 'play' button in its top right corner, or by pressing cmd/ctrl+Enter while the cell is active.

In [None]:
-- ----------------------------------------------------------------------------
-- Create the account level objects
-- ----------------------------------------------------------------------------
USE ROLE ACCOUNTADMIN;

-- Roles
SET MY_USER = CURRENT_USER(); 
CREATE OR REPLACE ROLE HOL_ROLE;
GRANT ROLE HOL_ROLE TO ROLE SYSADMIN;
GRANT ROLE HOL_ROLE TO USER IDENTIFIER($MY_USER);

CREATE STAGE IF NOT EXISTS SCRIPTS 
	DIRECTORY = ( ENABLE = true );

GRANT EXECUTE TASK ON ACCOUNT TO ROLE HOL_ROLE;
GRANT MONITOR EXECUTION ON ACCOUNT TO ROLE HOL_ROLE;
GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE HOL_ROLE;

-- Databases
CREATE OR REPLACE DATABASE NB_HOL_DB;
GRANT OWNERSHIP ON DATABASE NB_HOL_DB TO ROLE HOL_ROLE;

-- Warehouses
CREATE OR REPLACE WAREHOUSE HOL_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE;
GRANT OWNERSHIP ON WAREHOUSE HOL_WH TO ROLE HOL_ROLE;

In [None]:
-- ----------------------------------------------------------------------------
-- Create the database level objects
-- ----------------------------------------------------------------------------
USE ROLE HOL_ROLE;
USE WAREHOUSE HOL_WH;
USE DATABASE NB_HOL_DB;

-- Schemas
CREATE OR REPLACE SCHEMA EXTERNAL;
CREATE OR REPLACE SCHEMA RAW_POS;
CREATE OR REPLACE SCHEMA RAW_CUSTOMER;
CREATE OR REPLACE SCHEMA HARMONIZED;
CREATE OR REPLACE SCHEMA ANALYTICS;

-- External Frostbyte objects
USE SCHEMA EXTERNAL;
CREATE OR REPLACE FILE FORMAT PARQUET_FORMAT
    TYPE = PARQUET
    COMPRESSION = SNAPPY;
    
CREATE OR REPLACE STAGE FROSTBYTE_RAW_STAGE
    URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/';

-- ANALYTICS objects
USE SCHEMA ANALYTICS;

CREATE OR REPLACE FUNCTION ANALYTICS.INCH_TO_MILLIMETER_UDF(INCH NUMBER(35,4))
RETURNS NUMBER(35,4)
    AS
$$
    inch * 25.4
$$;

## Load Raw

During this step we will be loading the raw Tasty Bytes POS and Customer loyalty data from raw Parquet files in `s3://sfquickstarts/data-engineering-with-snowpark-python/` to our `RAW_POS` and `RAW_CUSTOMER` schemas in Snowflake. To put this in context, we are on step **#2** in our data flow overview:


In [None]:
import streamlit as st
st.image("https://raw.githubusercontent.com/Snowflake-Labs/sfguide-data-engineering-with-snowpark-python/main/images/demo_overview.png",width=800)

The following cell contains three functions: two to load raw data and one to validate the raw tables.

In [None]:
import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
#import snowflake.snowpark.functions as F


POS_TABLES = ['country', 'franchise', 'location', 'menu', 'truck', 'order_header', 'order_detail']
CUSTOMER_TABLES = ['customer_loyalty']
TABLE_DICT = {
    "pos": {"schema": "RAW_POS", "tables": POS_TABLES},
    "customer": {"schema": "RAW_CUSTOMER", "tables": CUSTOMER_TABLES}
}

# SNOWFLAKE ADVANTAGE: Schema detection
# SNOWFLAKE ADVANTAGE: Data ingestion with COPY
# SNOWFLAKE ADVANTAGE: Snowflake Tables (not file-based)

def load_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
    session.use_schema(schema)
    if year is None:
        location = "@external.frostbyte_raw_stage/{}/{}".format(s3dir, tname)
    else:
        print('\tLoading year {}'.format(year)) 
        location = "@external.frostbyte_raw_stage/{}/{}/year={}".format(s3dir, tname, year)
    
    # we can infer schema using the parquet read option
    df = session.read.option("compression", "snappy") \
                            .parquet(location)
    df.copy_into_table("{}".format(tname))

# SNOWFLAKE ADVANTAGE: Warehouse elasticity (dynamic scaling)

def load_all_raw_tables(session):
    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

    for s3dir, data in TABLE_DICT.items():
        tnames = data['tables']
        schema = data['schema']
        for tname in tnames:
            print("Loading {}".format(tname))
            # The order tables are partitioned by year. The full lab loads the first
            # 3 years (2019-2021) here; we will load the 2022 data later in the lab
            if tname in ['order_header', 'order_detail']:
                # To keep this run short, only 2019 is loaded. Add '2020' and '2021'
                # to the list to load the remaining years.
                for year in ['2019']:
                    load_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
            else:
                load_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)

    _ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()

def validate_raw_tables(session):
    # check column names from the inferred schema
    for tname in POS_TABLES:
        print('{}: \n\t{}\n'.format(tname, session.table('RAW_POS.{}'.format(tname)).columns))

    for tname in CUSTOMER_TABLES:
        print('{}: \n\t{}\n'.format(tname, session.table('RAW_CUSTOMER.{}'.format(tname)).columns))
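
Because `load_raw_table()` only builds a stage path and hands the actual load to Snowflake, the client-side part of this work can be dry-run without a session. The sketch below uses hypothetical helpers, `stage_location()` and `plan_raw_loads()` (not part of the lab), that mirror the path logic above and list the loads `load_all_raw_tables()` would issue:

```python
# Hypothetical dry-run of the load plan; no Snowflake session is needed.
# The layout mirrors TABLE_DICT and the stage path format used in load_raw_table().
RAW_LAYOUT = {
    "pos": {"schema": "RAW_POS",
            "tables": ['country', 'franchise', 'location', 'menu', 'truck',
                       'order_header', 'order_detail']},
    "customer": {"schema": "RAW_CUSTOMER", "tables": ['customer_loyalty']},
}
YEAR_PARTITIONED = {'order_header', 'order_detail'}
STAGE = "@external.frostbyte_raw_stage"


def stage_location(s3dir, tname, year=None):
    """Return the stage path load_raw_table() would read for one table."""
    location = "{}/{}/{}".format(STAGE, s3dir, tname)
    if year is not None:
        # Order tables are stored in year=YYYY sub-folders
        location += "/year={}".format(year)
    return location


def plan_raw_loads(layout, years=('2019',)):
    """List (schema, table, stage_location) tuples in load order."""
    plan = []
    for s3dir, data in layout.items():
        for tname in data['tables']:
            if tname in YEAR_PARTITIONED:
                plan.extend((data['schema'], tname, stage_location(s3dir, tname, y))
                            for y in years)
            else:
                plan.append((data['schema'], tname, stage_location(s3dir, tname)))
    return plan


for schema, tname, location in plan_raw_loads(RAW_LAYOUT):
    print(schema, tname, location)
```

Passing `years=('2019', '2020', '2021')` shows the extra partitions that would be loaded if you uncomment the other years.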

In [None]:
from snowflake.snowpark.context import get_active_session
session = get_active_session()

In [None]:
# Call the function created above to load all raw tables
# Monitor the output and wait until the cell is finished running before continuing
load_all_raw_tables(session)

In [None]:
# Once raw data is loaded, validate the customer and POS tables with the function created previously
validate_raw_tables(session)

### Viewing What Happened in Snowflake
---
#### `ACTION:` Duplicate this browser tab (or open a new window and navigate to your account URL) and take a quick look at the query history in the left pane. You can see the SQL that was generated by the Snowpark API. This will help you better understand what the API is doing and will help you debug any issues you may run into.
---
The [Query History](https://docs.snowflake.com/en/user-guide/ui-snowsight-activity.html#query-history) in Snowflake is a powerful feature. It logs every query run against your Snowflake account, no matter which tool or process initiated it. And this is especially helpful when working with client tools and APIs.

The Python script you just ran did a small amount of work locally, basically just orchestrating the process by looping through each table and issuing the command to Snowflake to load the data. But all of the heavy lifting ran inside Snowflake! This push-down is a hallmark of the Snowpark API and allows you to leverage the scalability and compute power of Snowflake.


### Schema Inference
One very helpful feature in Snowflake is the ability to infer the schema of files in a stage that you wish to work with. In SQL this is done with the [`INFER_SCHEMA()`](https://docs.snowflake.com/en/sql-reference/functions/infer_schema.html) function. The Snowpark Python API does this for you automatically when you read staged files through `session.read`. Can you find the `session.read.option(...).parquet()` call in the code we just ran?
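
For comparison, here is a sketch of the SQL you could run yourself to see what Snowflake infers for one of the staged folders. The helper name `infer_schema_sql()` is hypothetical; the stage and the `EXTERNAL.PARQUET_FORMAT` file format are the ones created in the setup script, and the query shape follows the `INFER_SCHEMA` documentation:

```python
def infer_schema_sql(location, file_format="EXTERNAL.PARQUET_FORMAT"):
    """Build a query that shows the column names and types Snowflake infers for staged files."""
    return ("SELECT COLUMN_NAME, TYPE, NULLABLE "
            "FROM TABLE(INFER_SCHEMA(LOCATION => '{}', FILE_FORMAT => '{}'))"
            .format(location, file_format))


sql = infer_schema_sql("@external.frostbyte_raw_stage/pos/country")
print(sql)
# In the notebook (with NB_HOL_DB as the current database) you could then run:
#   session.sql(sql).show()
```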

### Data Ingestion with COPY
To load the data into a Snowflake table we used the `copy_into_table()` method on a DataFrame. This method creates the target table using the inferred schema (if it doesn't already exist), and then runs Snowflake's highly optimized `COPY INTO <table>` [command](https://docs.snowflake.com/en/sql-reference/sql/copy-into-table.html).
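
A rough pure-SQL equivalent of `read.parquet()` plus `copy_into_table()` is to create the table from the inferred schema with `CREATE TABLE ... USING TEMPLATE` and then run `COPY INTO` with `MATCH_BY_COLUMN_NAME`. The sketch below only builds those two statements; `load_with_sql()` is a hypothetical helper, and this is not necessarily the exact SQL Snowpark generates (Query History shows what actually ran):

```python
def load_with_sql(table, location, file_format="EXTERNAL.PARQUET_FORMAT"):
    """Return the two statements of a pure-SQL load: create from inferred schema, then COPY."""
    infer = ("TABLE(INFER_SCHEMA(LOCATION => '{}', FILE_FORMAT => '{}'))"
             .format(location, file_format))
    # USING TEMPLATE turns the INFER_SCHEMA output into column definitions
    create_sql = ("CREATE TABLE IF NOT EXISTS {} USING TEMPLATE "
                  "(SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*)) FROM {})".format(table, infer))
    # MATCH_BY_COLUMN_NAME maps Parquet fields onto the table columns by name
    copy_sql = ("COPY INTO {} FROM {} FILE_FORMAT = (FORMAT_NAME = '{}') "
                "MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE".format(table, location, file_format))
    return [create_sql, copy_sql]


for stmt in load_with_sql("RAW_POS.COUNTRY", "@external.frostbyte_raw_stage/pos/country"):
    print(stmt)
    # In the notebook you could run each one with: session.sql(stmt).collect()
```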

### Snowflake's Table Format
One of the major advantages of Snowflake is that it eliminates the need to manage a file-based data lake, and Snowflake was designed for this purpose from the beginning. In the step above we loaded the raw data into structured, Snowflake-managed tables. Snowflake tables natively support both structured and semi-structured data, and are stored in Snowflake's mature cloud table format.

### Warehouse Elasticity (Dynamic Scaling)
With Snowflake there is only one type of user-defined compute cluster, the [Virtual Warehouse](https://docs.snowflake.com/en/user-guide/warehouses.html), regardless of the language you use to process that data (SQL, Python, Java, Scala, JavaScript, etc.). This makes working with data much simpler in Snowflake.

These virtual warehouses can be resized on demand, even from within your code, to add capacity so that a compute-heavy section of code runs in a fraction of the time, and then resized back down to release that extra capacity.

**Here is this code snippet as an example of how this works:**

```python
_ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

# Some data processing code

_ = session.sql("ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL").collect()
```

Note the `WAIT_FOR_COMPLETION` parameter in the first `ALTER WAREHOUSE` statement. Setting this parameter to `TRUE` will block the return of the `ALTER WAREHOUSE` command until the resize has finished provisioning all its compute resources. This way we make sure that the full cluster is available before processing any data with it.

We will use this pattern a few more times during this Lab, so it's important to understand.
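
One caveat with this pattern: if the processing code raises an exception, the second `ALTER WAREHOUSE` never runs and the warehouse stays XLARGE. Below is a sketch of one way to guard against that, assuming you are happy to wrap the work in a context manager. `scaled_warehouse()` and `RecordingSession` are hypothetical names for illustration; `RecordingSession` only records SQL so the logic can be tried without Snowflake, and in the notebook you would pass the real `session` instead.

```python
from contextlib import contextmanager


@contextmanager
def scaled_warehouse(session, warehouse="HOL_WH", size="XLARGE", restore_size="XSMALL"):
    """Scale a warehouse up for the duration of a `with` block, then back down."""
    session.sql("ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = {} WAIT_FOR_COMPLETION = TRUE"
                .format(warehouse, size)).collect()
    try:
        yield session
    finally:
        # Runs even if the block raises, so the warehouse is always scaled back down
        session.sql("ALTER WAREHOUSE {} SET WAREHOUSE_SIZE = {}"
                    .format(warehouse, restore_size)).collect()


class RecordingSession:
    """Local stand-in for a Snowpark Session that only records the SQL it is given."""

    def __init__(self):
        self.statements = []

    def sql(self, query):
        self.statements.append(query)
        return self  # lets .collect() be chained, as on a Snowpark DataFrame

    def collect(self):
        return []


fake = RecordingSession()
with scaled_warehouse(fake):
    pass  # the heavy data processing would go here
print(fake.statements)
```

In the notebook this would read `with scaled_warehouse(session): ...`, keeping the scale-up and scale-down in one place instead of two separate `session.sql()` calls.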


## Load Weather


During this step we will be "loading" the raw weather data to Snowflake, but "loading" is really the wrong word here. Because we're using Snowflake's unique data sharing capability, we don't actually need to copy the data into our Snowflake account. Instead we can directly access the data shared by Weather Source in the Snowflake Data Marketplace. To put this in context, we are on step **#3** in our data flow overview.


### Snowflake Data Marketplace
Let's connect to the `Weather Source LLC: frostbyte` feed from Weather Source in the Snowflake Data Marketplace by following these steps:

* Log in to Snowsight again in another browser tab
* Click on the `Marketplace` link in the left navigation bar
* Enter "Weather Source LLC: frostbyte" in the search box and press Return
* Click on the "Weather Source LLC: frostbyte" listing tile
* Click the blue "Get" button
    * Expand the "Options" dialog
    * Change the "Database name" to read "FROSTBYTE_WEATHERSOURCE" (all capital letters)
    * Select the "HOL_ROLE" role to have access to the new database
* Click on the blue "Get" button

That's it... we don't have to do anything from here to keep this data updated. The provider does that for us, and data sharing means we always see whatever they have published. How amazing is that? Just think of all the things you didn't have to do here to get access to an always up-to-date, third-party dataset!

In [None]:
-- Grant HOL_ROLE access to the shared database (needed if HOL_ROLE was not selected in the "Get" dialog)
USE ROLE ACCOUNTADMIN;
GRANT IMPORTED PRIVILEGES ON DATABASE IDENTIFIER('"FROSTBYTE_WEATHERSOURCE"') TO ROLE IDENTIFIER('"HOL_ROLE"');
USE ROLE HOL_ROLE;
-- Preview Weathersource data
SELECT * FROM FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES LIMIT 100; 

## Create POS View

During this step we will be creating a view to simplify the raw POS schema by joining together 6 different tables and picking only the columns we need. But what's really cool is that we're going to define that view with the Snowpark DataFrame API! Then we're going to create a Snowflake stream on that view so that we can incrementally process changes to any of the POS tables. To put this in context, we are on step **#4** in our data flow overview.

### Run the Script
*To create the view and stream, execute the following code:*

In [None]:
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F

def create_pos_view(session):
    session.use_schema('HARMONIZED')
    order_detail = session.table("RAW_POS.ORDER_DETAIL").select(F.col("ORDER_DETAIL_ID"), \
                                                                F.col("LINE_NUMBER"), \
                                                                F.col("MENU_ITEM_ID"), \
                                                                F.col("QUANTITY"), \
                                                                F.col("UNIT_PRICE"), \
                                                                F.col("PRICE"), \
                                                                F.col("ORDER_ID"))
    order_header = session.table("RAW_POS.ORDER_HEADER").select(F.col("ORDER_ID"), \
                                                                F.col("TRUCK_ID"), \
                                                                F.col("ORDER_TS"), \
                                                                F.to_date(F.col("ORDER_TS")).alias("ORDER_TS_DATE"), \
                                                                F.col("ORDER_AMOUNT"), \
                                                                F.col("ORDER_TAX_AMOUNT"), \
                                                                F.col("ORDER_DISCOUNT_AMOUNT"), \
                                                                F.col("LOCATION_ID"), \
                                                                F.col("ORDER_TOTAL"))
    truck = session.table("RAW_POS.TRUCK").select(F.col("TRUCK_ID"), \
                                                F.col("PRIMARY_CITY"), \
                                                F.col("REGION"), \
                                                F.col("COUNTRY"), \
                                                F.col("FRANCHISE_FLAG"), \
                                                F.col("FRANCHISE_ID"))
    menu = session.table("RAW_POS.MENU").select(F.col("MENU_ITEM_ID"), \
                                                F.col("TRUCK_BRAND_NAME"), \
                                                F.col("MENU_TYPE"), \
                                                F.col("MENU_ITEM_NAME"))
    franchise = session.table("RAW_POS.FRANCHISE").select(F.col("FRANCHISE_ID"), \
                                                        F.col("FIRST_NAME").alias("FRANCHISEE_FIRST_NAME"), \
                                                        F.col("LAST_NAME").alias("FRANCHISEE_LAST_NAME"))
    location = session.table("RAW_POS.LOCATION").select(F.col("LOCATION_ID"))

    t_with_f = truck.join(franchise, truck['FRANCHISE_ID'] == franchise['FRANCHISE_ID'], rsuffix='_f')
    oh_w_t_and_l = order_header.join(t_with_f, order_header['TRUCK_ID'] == t_with_f['TRUCK_ID'], rsuffix='_t') \
                                .join(location, order_header['LOCATION_ID'] == location['LOCATION_ID'], rsuffix='_l')
    final_df = order_detail.join(oh_w_t_and_l, order_detail['ORDER_ID'] == oh_w_t_and_l['ORDER_ID'], rsuffix='_oh') \
                            .join(menu, order_detail['MENU_ITEM_ID'] == menu['MENU_ITEM_ID'], rsuffix='_m')
    final_df = final_df.select(F.col("ORDER_ID"), \
                            F.col("TRUCK_ID"), \
                            F.col("ORDER_TS"), \
                            F.col('ORDER_TS_DATE'), \
                            F.col("ORDER_DETAIL_ID"), \
                            F.col("LINE_NUMBER"), \
                            F.col("TRUCK_BRAND_NAME"), \
                            F.col("MENU_TYPE"), \
                            F.col("PRIMARY_CITY"), \
                            F.col("REGION"), \
                            F.col("COUNTRY"), \
                            F.col("FRANCHISE_FLAG"), \
                            F.col("FRANCHISE_ID"), \
                            F.col("FRANCHISEE_FIRST_NAME"), \
                            F.col("FRANCHISEE_LAST_NAME"), \
                            F.col("LOCATION_ID"), \
                            F.col("MENU_ITEM_ID"), \
                            F.col("MENU_ITEM_NAME"), \
                            F.col("QUANTITY"), \
                            F.col("UNIT_PRICE"), \
                            F.col("PRICE"), \
                            F.col("ORDER_AMOUNT"), \
                            F.col("ORDER_TAX_AMOUNT"), \
                            F.col("ORDER_DISCOUNT_AMOUNT"), \
                            F.col("ORDER_TOTAL"))
    final_df.create_or_replace_view('POS_FLATTENED_V')

def create_pos_view_stream(session):
    session.use_schema('HARMONIZED')
    _ = session.sql('CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM \
                        ON VIEW POS_FLATTENED_V \
                        SHOW_INITIAL_ROWS = TRUE').collect()

def test_pos_view(session):
    session.use_schema('HARMONIZED')
    tv = session.table('POS_FLATTENED_V')
    tv.limit(5).show()

In [None]:
create_pos_view(session)

In [None]:
create_pos_view_stream(session)

### Snowpark DataFrame API
You'll notice in the `create_pos_view()` function that we define the Snowflake view using the Snowpark DataFrame API. After defining the final DataFrame, which captures all the logic we want in the view, we can simply call the Snowpark `create_or_replace_view()` method. 

For more details about the Snowpark Python DataFrame API, please check out our [Working with DataFrames in Snowpark Python](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes.html) page.

### Streams for Incremental Processing (CDC)
Snowflake makes processing data incrementally very easy. Traditionally the data engineer had to keep track of a high watermark (usually a datetime column) in order to process only new records in a table. This involved tracking and persisting that watermark somewhere and then using it in any query against the source table. But with Snowflake streams all the heavy lifting is done for you by Snowflake. For more details please check out our [Change Tracking Using Table Streams](https://docs.snowflake.com/en/user-guide/streams.html) user guide.
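
To make the contrast concrete, here is a minimal pure-Python sketch of the traditional high-watermark approach that streams replace. The `incremental_batch()` helper and the sample rows are hypothetical. It also illustrates two gaps of the approach: a late-arriving row with an older timestamp is silently skipped, and updates or deletes to already-processed rows are never seen.

```python
from datetime import datetime


def incremental_batch(rows, watermark):
    """Return the rows newer than `watermark` and the new watermark to persist."""
    new_rows = [r for r in rows if r["ORDER_TS"] > watermark]
    # The watermark only moves forward; keep the old one if nothing is new
    new_watermark = max((r["ORDER_TS"] for r in new_rows), default=watermark)
    return new_rows, new_watermark


rows = [
    {"ORDER_ID": 1, "ORDER_TS": datetime(2019, 1, 1, 10, 0)},
    {"ORDER_ID": 2, "ORDER_TS": datetime(2019, 1, 1, 11, 0)},
]
batch, watermark = incremental_batch(rows, datetime.min)   # first run: both rows
rows.append({"ORDER_ID": 3, "ORDER_TS": datetime(2019, 1, 1, 9, 30)})  # arrives late
late_batch, watermark = incremental_batch(rows, watermark)  # order 3 is missed
print([r["ORDER_ID"] for r in batch], [r["ORDER_ID"] for r in late_batch])
```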

All you need to do is create a [`STREAM`](https://docs.snowflake.com/en/sql-reference/sql/create-stream.html) object in Snowflake on your base table or view, then query that stream just like any table in Snowflake. The stream returns only the records that have changed since the stream was last consumed by a DML statement (which advances its offset). To help you work with the changed records, Snowflake streams supply the following metadata columns along with the base table or view columns:

* METADATA$ACTION
* METADATA$ISUPDATE
* METADATA$ROW_ID

For more details about these stream metadata columns please check out the [Stream Columns](https://docs.snowflake.com/en/user-guide/streams-intro.html#stream-columns) section in our documentation.
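
As a rough illustration of how these columns combine, the sketch below sorts stream rows into net inserts, updates, and deletes. The helper name `summarize_changes()` and the sample rows are hypothetical; it relies on the documented behavior that an update appears in a stream as a DELETE/INSERT pair, both with `METADATA$ISUPDATE = TRUE`.

```python
def summarize_changes(stream_rows):
    """Split stream rows into (inserts, updates, deletes) using the metadata columns."""
    inserts, updates, deletes = [], [], []
    for row in stream_rows:
        action, is_update = row["METADATA$ACTION"], row["METADATA$ISUPDATE"]
        if action == "INSERT" and is_update:
            updates.append(row)   # new values of an updated row
        elif action == "INSERT":
            inserts.append(row)   # brand-new row
        elif not is_update:
            deletes.append(row)   # DELETE that is not half of an update
        # A DELETE with ISUPDATE = TRUE is the old image of an update; skip it
    return inserts, updates, deletes


sample = [
    {"ORDER_DETAIL_ID": 1, "QUANTITY": 2, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": False},
    {"ORDER_DETAIL_ID": 7, "QUANTITY": 1, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": True},
    {"ORDER_DETAIL_ID": 7, "QUANTITY": 3, "METADATA$ACTION": "INSERT", "METADATA$ISUPDATE": True},
    {"ORDER_DETAIL_ID": 9, "QUANTITY": 5, "METADATA$ACTION": "DELETE", "METADATA$ISUPDATE": False},
]
ins, upd, dels = summarize_changes(sample)
print(len(ins), len(upd), len(dels))
```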

### Streams on views
What's really cool about Snowflake's incremental/CDC stream capability is the ability to create a stream on a view! In this example we are creating a stream on a view which joins together 6 of the raw POS tables. Here is the code to do that:

```python
def create_pos_view_stream(session):
    session.use_schema('HARMONIZED')
    _ = session.sql('CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM \
                        ON VIEW POS_FLATTENED_V \
                        SHOW_INITIAL_ROWS = TRUE').collect()
```

Now when we query the `POS_FLATTENED_V_STREAM` stream to find changed records, Snowflake actually looks for changed records in any of the 6 tables included in the view. If you have ever tried to build incremental/CDC processes around denormalized schemas like this, you will appreciate how powerful this feature is.

For more details please check out the [Streams on Views](https://docs.snowflake.com/en/user-guide/streams-intro.html#streams-on-views) section in our documentation.


## Fahrenheit to Celsius UDF

During this step we will be creating and deploying a Snowpark Python user-defined function (UDF). A UDF packages a calculation so it can be reused the same way across users and pipelines, which keeps results consistent and reduces development effort.

To put this in context, we are on step **#5** in our data flow overview.

In [None]:
CREATE OR REPLACE FUNCTION NB_HOL_DB.ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF("temp_f" FLOAT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION = '3.11'
HANDLER = 'main'
AS '
def main(temp_f: float) -> float:
    return (float(temp_f) - 32) * (5/9)
';
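
Because the handler is plain Python, you can sanity-check its logic locally before (or after) deploying it. The sketch below copies the handler body into a function renamed `fahrenheit_to_celsius` (a hypothetical name, chosen so it doesn't clash with the `main()` functions defined later in this notebook) and checks a few well-known reference points:

```python
import math


def fahrenheit_to_celsius(temp_f: float) -> float:
    # Same body as the UDF handler above
    return (float(temp_f) - 32) * (5/9)


# Reference points: freezing, boiling, and the point where both scales agree
for f, c in [(32, 0.0), (212, 100.0), (-40, -40.0)]:
    assert math.isclose(fahrenheit_to_celsius(f), c, abs_tol=1e-9)

print(round(fahrenheit_to_celsius(35), 4))
```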

### Running the UDF in Snowflake
There are a few ways to run the UDF in Snowflake. The simplest is to invoke it inline through SQL, as follows; later we'll call it as part of our pipeline:

In [None]:
SELECT NB_HOL_DB.ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF(35);

## Orders Update Stored Procedure

During this step we will be creating and deploying our first Snowpark Python stored procedure (or sproc) to Snowflake. This sproc will merge changes from the `HARMONIZED.POS_FLATTENED_V_STREAM` stream into our target `HARMONIZED.ORDERS` table. To put this in context, we are on step **#6** in our data flow overview.

### Testing the Sproc Locally
First, to test the procedure locally, we will write the code inline and then call the Python functions directly here in our notebook.

Go ahead and define the Python functions:

In [None]:

import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_orders_table(session):
    _ = session.sql("CREATE TABLE HARMONIZED.ORDERS LIKE HARMONIZED.POS_FLATTENED_V").collect()
    _ = session.sql("ALTER TABLE HARMONIZED.ORDERS ADD COLUMN META_UPDATED_AT TIMESTAMP").collect()

def create_orders_stream(session):
    _ = session.sql("CREATE STREAM HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS").collect()

def merge_order_updates(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    source = session.table('HARMONIZED.POS_FLATTENED_V_STREAM')
    target = session.table('HARMONIZED.ORDERS')
    print("{} records in stream".format(session.table('HARMONIZED.POS_FLATTENED_V_STREAM').count()))

    # Copy every source column except the stream's METADATA$ columns,
    # and stamp each merged row with the time it was processed
    cols_to_update = {c: source[c] for c in source.schema.names if "METADATA" not in c}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    # Upsert into HARMONIZED.ORDERS, matching rows on ORDER_DETAIL_ID
    target.merge(source, target['ORDER_DETAIL_ID'] == source['ORDER_DETAIL_ID'], \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the ORDERS table and ORDERS_STREAM stream if they don't exist
    if not table_exists(session, schema='HARMONIZED', name='ORDERS'):
        create_orders_table(session)
        create_orders_stream(session)

    # Process data incrementally
    merge_order_updates(session)
#    session.table('HARMONIZED.ORDERS').limit(5).show()

    return f"Successfully processed ORDERS"

Then execute the `main()` function you just defined:

In [None]:
session.use_database('NB_HOL_DB')
main(session)

Take a look at the data we just loaded during our test.

Note the META_UPDATED_AT time: it was set by `CURRENT_TIMESTAMP()` during the merge, so it should be just a few seconds ago!

In [None]:
SELECT ORDER_ID,ORDER_TS,META_UPDATED_AT,TRUCK_BRAND_NAME,PRIMARY_CITY,MENU_ITEM_NAME,QUANTITY,UNIT_PRICE 
FROM HARMONIZED.ORDERS 
LIMIT 5;

Now that we know our script works, we'll store it in a Python string variable named `script` and reference that variable when we deploy the sproc in the next cell:

In [None]:
script = '''
import time
from snowflake.snowpark import Session
#import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_orders_table(session):
    _ = session.sql("CREATE TABLE HARMONIZED.ORDERS LIKE HARMONIZED.POS_FLATTENED_V").collect()
    _ = session.sql("ALTER TABLE HARMONIZED.ORDERS ADD COLUMN META_UPDATED_AT TIMESTAMP").collect()

def create_orders_stream(session):
    _ = session.sql("CREATE STREAM HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS").collect()

def merge_order_updates(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    source = session.table('HARMONIZED.POS_FLATTENED_V_STREAM')
    target = session.table('HARMONIZED.ORDERS')
    print("{} records in stream".format(session.table('HARMONIZED.POS_FLATTENED_V_STREAM').count()))

    # Copy every source column except the stream's METADATA$ columns,
    # and stamp each merged row with the time it was processed
    cols_to_update = {c: source[c] for c in source.schema.names if "METADATA" not in c}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    # Upsert into HARMONIZED.ORDERS, matching rows on ORDER_DETAIL_ID
    target.merge(source, target['ORDER_DETAIL_ID'] == source['ORDER_DETAIL_ID'], \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the ORDERS table and ORDERS_STREAM stream if they don't exist
    if not table_exists(session, schema='HARMONIZED', name='ORDERS'):
        create_orders_table(session)
        create_orders_stream(session)

    # Process data incrementally
    merge_order_updates(session)
#    session.table('HARMONIZED.ORDERS').limit(5).show()

    return f"Successfully processed ORDERS"
'''

Here is the SQL query to deploy the procedure:

In [None]:
USE NB_HOL_DB.HARMONIZED;
CREATE OR REPLACE PROCEDURE orders_update_sp()
 RETURNS string
 LANGUAGE PYTHON
 RUNTIME_VERSION = '3.11'
 PACKAGES=('snowflake-snowpark-python','toml')
 HANDLER = 'main'
 AS $$
 {{script}}
 $$;

Now, instead of calling the `main()` function directly, we'll use SQL to `CALL` the deployed sproc:

In [None]:
USE NB_HOL_DB.HARMONIZED;
CALL ORDERS_UPDATE_SP();

### More on the Snowpark API
In this step we're starting to really use the Snowpark DataFrame API for data transformations. To begin, you need a Snowpark session object. When running in a Snowflake Notebook, the session is provisioned for you automatically (we retrieved it earlier with `get_active_session()`). And when building a Snowpark Python sproc, the contract is that the first argument to the entry point (or handler) function is a Snowpark session.

You'll notice in the script that we have some functions which use SQL to create objects in Snowflake and to check object status. To issue a SQL statement to Snowflake with the Snowpark API you use the `session.sql()` function. Here's one example:

```python
def create_orders_stream(session):
    _ = session.sql("CREATE STREAM HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS").collect()
```

The second thing to point out is how we're using DataFrames to merge changes from the source view to the target table. The Snowpark DataFrame API provides a `merge()` method which will ultimately generate a `MERGE` command in Snowflake.
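
To make the `MERGE` semantics concrete, here is a minimal dict-based sketch of what `merge_order_updates()` asks Snowflake to do: update target rows whose `ORDER_DETAIL_ID` matches, insert the rest, drop the stream's `METADATA$` columns, and stamp `META_UPDATED_AT`. `merge_upsert()` and the sample data are hypothetical and run entirely locally; in the lab, the real work happens inside Snowflake's `MERGE`.

```python
from datetime import datetime, timezone


def merge_upsert(target, source_rows, key="ORDER_DETAIL_ID", now=None):
    """Upsert source rows into `target` (a dict keyed by `key`), like MERGE."""
    now = now or datetime.now(timezone.utc)
    for row in source_rows:
        # Same filter as merge_order_updates(): skip the stream's METADATA$ columns
        values = {c: v for c, v in row.items() if "METADATA" not in c}
        values["META_UPDATED_AT"] = now
        if row[key] in target:
            target[row[key]].update(values)  # WHEN MATCHED THEN UPDATE
        else:
            target[row[key]] = values        # WHEN NOT MATCHED THEN INSERT
    return target


orders = {1: {"ORDER_DETAIL_ID": 1, "QUANTITY": 1, "META_UPDATED_AT": None}}
changes = [
    {"ORDER_DETAIL_ID": 1, "QUANTITY": 4, "METADATA$ACTION": "INSERT"},
    {"ORDER_DETAIL_ID": 2, "QUANTITY": 2, "METADATA$ACTION": "INSERT"},
]
merge_upsert(orders, changes)
print(orders)
```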

Again, for more details about the Snowpark Python DataFrame API, please check out our [Working with DataFrames in Snowpark Python](https://docs.snowflake.com/en/developer-guide/snowpark/python/working-with-dataframes.html) page.


## Daily City Metrics Update Sproc
During this step we will be creating and deploying our second Snowpark Python sproc to Snowflake. This sproc will join the `HARMONIZED.ORDERS` data with the Weather Source data to create a final, aggregated table for analysis named `ANALYTICS.DAILY_CITY_METRICS`. We will process the data incrementally from the `HARMONIZED.ORDERS` table using another Snowflake Stream. And we will again use the Snowpark DataFrame `merge()` method to merge/upsert the data. To put this in context, we are on step **#7** in our data flow overview.
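
The aggregation this sproc performs can be sketched in pure Python to show how the `GROUP BY`, `SUM()`, and `ZEROIFNULL` steps interact. `daily_sales()` and the sample rows are hypothetical; the grouping keys (`ORDER_TS_DATE`, `PRIMARY_CITY`, `COUNTRY`) and the NULL handling mirror the `orders` DataFrame in the sproc code: `SUM` ignores NULLs and returns NULL for a group with no non-NULL prices, which `ZEROIFNULL` turns into 0.

```python
from datetime import date
from decimal import Decimal


def daily_sales(order_rows):
    """Total PRICE per (date, city, country), mimicking SUM + ZEROIFNULL."""
    sums = {}
    for r in order_rows:
        key = (r["ORDER_TS_DATE"], r["PRIMARY_CITY"], r["COUNTRY"])
        sums.setdefault(key, None)           # every group appears, as with GROUP BY
        if r["PRICE"] is not None:           # SUM() skips NULL values
            sums[key] = (sums[key] or 0) + r["PRICE"]
    # ZEROIFNULL: a group whose prices were all NULL reports 0 instead of NULL
    return {key: (total if total is not None else 0) for key, total in sums.items()}


rows = [
    {"ORDER_TS_DATE": date(2019, 1, 1), "PRIMARY_CITY": "Seattle", "COUNTRY": "United States", "PRICE": Decimal("7.50")},
    {"ORDER_TS_DATE": date(2019, 1, 1), "PRIMARY_CITY": "Seattle", "COUNTRY": "United States", "PRICE": Decimal("2.50")},
    {"ORDER_TS_DATE": date(2019, 1, 1), "PRIMARY_CITY": "Paris", "COUNTRY": "France", "PRICE": None},
]
for key, total in daily_sales(rows).items():
    print(key, total)
```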

Once again, we'll start by defining the functions inline for testing, and then call the `main()` function directly here in the notebook.
Run the function definitions:

In [None]:
import time
from snowflake.snowpark import Session
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_daily_city_metrics_table(session):
    # Explicit schema for the final analytics table. T.DecimalType() with no arguments is
    # NUMBER(38,0), so an explicit scale of 2 keeps the ROUND(..., 2) results intact.
    SHARED_COLUMNS = [T.StructField("DATE", T.DateType()),
                      T.StructField("CITY_NAME", T.StringType()),
                      T.StructField("COUNTRY_DESC", T.StringType()),
                      T.StructField("DAILY_SALES", T.DecimalType(38, 2)),
                      T.StructField("AVG_TEMPERATURE_FAHRENHEIT", T.DecimalType(38, 2)),
                      T.StructField("AVG_TEMPERATURE_CELSIUS", T.DecimalType(38, 2)),
                      T.StructField("AVG_PRECIPITATION_INCHES", T.DecimalType(38, 2)),
                      T.StructField("AVG_PRECIPITATION_MILLIMETERS", T.DecimalType(38, 2)),
                      T.StructField("MAX_WIND_SPEED_100M_MPH", T.DecimalType(38, 2)),
                     ]
    DAILY_CITY_METRICS_COLUMNS = [*SHARED_COLUMNS, T.StructField("META_UPDATED_AT", T.TimestampType())]
    DAILY_CITY_METRICS_SCHEMA = T.StructType(DAILY_CITY_METRICS_COLUMNS)

    # Create an empty table with this schema: build a one-row all-NULL DataFrame,
    # drop that row with na.drop(), then save the (empty) result as the table
    session.create_dataframe([[None]*len(DAILY_CITY_METRICS_SCHEMA.names)], schema=DAILY_CITY_METRICS_SCHEMA) \
        .na.drop() \
        .write.mode('overwrite').save_as_table('ANALYTICS.DAILY_CITY_METRICS')


def merge_daily_city_metrics(session):
    # Scale the warehouse up for the heavy joins and aggregations (it is scaled back down at the end)
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    # Querying the stream does not advance its offset; only the DML (the MERGE below) does
    print("{} records in stream".format(session.table('HARMONIZED.ORDERS_STREAM').count()))
    # Distinct order dates in the stream, used to limit the weather data to just those dates
    orders_stream_dates = session.table('HARMONIZED.ORDERS_STREAM').select(F.col("ORDER_TS_DATE").alias("DATE")).distinct()

    # Daily sales per date/city/country; ZEROIFNULL turns a NULL sum into 0
    orders = session.table("HARMONIZED.ORDERS_STREAM").group_by(F.col('ORDER_TS_DATE'), F.col('PRIMARY_CITY'), F.col('COUNTRY')) \
                                        .agg(F.sum(F.col("PRICE")).as_("price_nulls")) \
                                        .with_column("DAILY_SALES", F.call_builtin("ZEROIFNULL", F.col("price_nulls"))) \
                                        .select(F.col('ORDER_TS_DATE').alias("DATE"), F.col("PRIMARY_CITY").alias("CITY_NAME"), \
                                        F.col("COUNTRY").alias("COUNTRY_DESC"), F.col("DAILY_SALES"))
#    orders.limit(5).show()

    weather_pc = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES")
    countries = session.table("RAW_POS.COUNTRY")
    weather = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
    weather = weather.join(weather_pc, (weather['POSTAL_CODE'] == weather_pc['POSTAL_CODE']) & (weather['COUNTRY'] == weather_pc['COUNTRY']), rsuffix='_pc')
    weather = weather.join(countries, (weather['COUNTRY'] == countries['ISO_COUNTRY']) & (weather['CITY_NAME'] == countries['CITY']), rsuffix='_c')
    weather = weather.join(orders_stream_dates, weather['DATE_VALID_STD'] == orders_stream_dates['DATE'])

    weather_agg = weather.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('COUNTRY_C')) \
                        .agg( \
                            F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"), \
                            F.avg(F.call_udf("ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF", F.col("AVG_TEMPERATURE_AIR_2M_F"))).alias("AVG_TEMPERATURE_C"), \
                            F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"), \
                            F.avg(F.call_udf("ANALYTICS.INCH_TO_MILLIMETER_UDF", F.col("TOT_PRECIPITATION_IN"))).alias("AVG_PRECIPITATION_MM"), \
                            F.max(F.col("MAX_WIND_SPEED_100M_MPH")).alias("MAX_WIND_SPEED_100M_MPH") \
                        ) \
                        .select(F.col("DATE_VALID_STD").alias("DATE"), F.col("CITY_NAME"), F.col("COUNTRY_C").alias("COUNTRY_DESC"), \
                            F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"), \
                            F.round(F.col("AVG_TEMPERATURE_C"), 2).alias("AVG_TEMPERATURE_CELSIUS"), \
                            F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \
                            F.round(F.col("AVG_PRECIPITATION_MM"), 2).alias("AVG_PRECIPITATION_MILLIMETERS"), \
                            F.col("MAX_WIND_SPEED_100M_MPH")
                            )
#    weather_agg.limit(5).show()

    daily_city_metrics_stg = orders.join(weather_agg, (orders['DATE'] == weather_agg['DATE']) & (orders['CITY_NAME'] == weather_agg['CITY_NAME']) & (orders['COUNTRY_DESC'] == weather_agg['COUNTRY_DESC']), \
                        how='left', rsuffix='_w') \
                    .select("DATE", "CITY_NAME", "COUNTRY_DESC", "DAILY_SALES", \
                        "AVG_TEMPERATURE_FAHRENHEIT", "AVG_TEMPERATURE_CELSIUS", \
                        "AVG_PRECIPITATION_INCHES", "AVG_PRECIPITATION_MILLIMETERS", \
                        "MAX_WIND_SPEED_100M_MPH")
#    daily_city_metrics_stg.limit(5).show()

    # Every staged column plus an audit timestamp is written on both update and insert
    cols_to_update = {c: daily_city_metrics_stg[c] for c in daily_city_metrics_stg.schema.names}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    # Upsert on the (DATE, CITY_NAME, COUNTRY_DESC) key; this runs as a MERGE statement,
    # which also consumes (advances) HARMONIZED.ORDERS_STREAM
    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')
    dcm.merge(daily_city_metrics_stg, (dcm['DATE'] == daily_city_metrics_stg['DATE']) & (dcm['CITY_NAME'] == daily_city_metrics_stg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == daily_city_metrics_stg['COUNTRY_DESC']), \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    # Scale the warehouse back down
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the DAILY_CITY_METRICS table if it doesn't exist
    if not table_exists(session, schema='ANALYTICS', name='DAILY_CITY_METRICS'):
        create_daily_city_metrics_table(session)
    
    merge_daily_city_metrics(session)
#    session.table('ANALYTICS.DAILY_CITY_METRICS').limit(5).show()

    return f"Successfully processed DAILY_CITY_METRICS"

Now let's run the `main()` function to test our code before plunking it into a string variable for deployment:

In [None]:
main(session)

### Deploying the Sproc to Snowflake
The test code looks good, so let's copy it into a string:

In [None]:
# Raw string (r'''...''') so the backslash line continuations are kept exactly as tested above
script = r'''
import time
from snowflake.snowpark import Session
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def table_exists(session, schema='', name=''):
    exists = session.sql("SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS".format(schema, name)).collect()[0]['TABLE_EXISTS']
    return exists

def create_daily_city_metrics_table(session):
    # Explicit schema for the final analytics table. T.DecimalType() with no arguments is
    # NUMBER(38,0), so an explicit scale of 2 keeps the ROUND(..., 2) results intact.
    SHARED_COLUMNS = [T.StructField("DATE", T.DateType()),
                      T.StructField("CITY_NAME", T.StringType()),
                      T.StructField("COUNTRY_DESC", T.StringType()),
                      T.StructField("DAILY_SALES", T.DecimalType(38, 2)),
                      T.StructField("AVG_TEMPERATURE_FAHRENHEIT", T.DecimalType(38, 2)),
                      T.StructField("AVG_TEMPERATURE_CELSIUS", T.DecimalType(38, 2)),
                      T.StructField("AVG_PRECIPITATION_INCHES", T.DecimalType(38, 2)),
                      T.StructField("AVG_PRECIPITATION_MILLIMETERS", T.DecimalType(38, 2)),
                      T.StructField("MAX_WIND_SPEED_100M_MPH", T.DecimalType(38, 2)),
                     ]
    DAILY_CITY_METRICS_COLUMNS = [*SHARED_COLUMNS, T.StructField("META_UPDATED_AT", T.TimestampType())]
    DAILY_CITY_METRICS_SCHEMA = T.StructType(DAILY_CITY_METRICS_COLUMNS)

    # Create an empty table with this schema: build a one-row all-NULL DataFrame,
    # drop that row with na.drop(), then save the (empty) result as the table
    session.create_dataframe([[None]*len(DAILY_CITY_METRICS_SCHEMA.names)], schema=DAILY_CITY_METRICS_SCHEMA) \
        .na.drop() \
        .write.mode('overwrite').save_as_table('ANALYTICS.DAILY_CITY_METRICS')


def merge_daily_city_metrics(session):
    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE').collect()

    print("{} records in stream".format(session.table('HARMONIZED.ORDERS_STREAM').count()))
    orders_stream_dates = session.table('HARMONIZED.ORDERS_STREAM').select(F.col("ORDER_TS_DATE").alias("DATE")).distinct()
#   orders_stream_dates.limit(5).show()

    orders = session.table("HARMONIZED.ORDERS_STREAM").group_by(F.col('ORDER_TS_DATE'), F.col('PRIMARY_CITY'), F.col('COUNTRY')) \
                                        .agg(F.sum(F.col("PRICE")).as_("price_nulls")) \
                                        .with_column("DAILY_SALES", F.call_builtin("ZEROIFNULL", F.col("price_nulls"))) \
                                        .select(F.col('ORDER_TS_DATE').alias("DATE"), F.col("PRIMARY_CITY").alias("CITY_NAME"), \
                                        F.col("COUNTRY").alias("COUNTRY_DESC"), F.col("DAILY_SALES"))
#    orders.limit(5).show()

    weather_pc = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES")
    countries = session.table("RAW_POS.COUNTRY")
    weather = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
    weather = weather.join(weather_pc, (weather['POSTAL_CODE'] == weather_pc['POSTAL_CODE']) & (weather['COUNTRY'] == weather_pc['COUNTRY']), rsuffix='_pc')
    weather = weather.join(countries, (weather['COUNTRY'] == countries['ISO_COUNTRY']) & (weather['CITY_NAME'] == countries['CITY']), rsuffix='_c')
    weather = weather.join(orders_stream_dates, weather['DATE_VALID_STD'] == orders_stream_dates['DATE'])

    weather_agg = weather.group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('COUNTRY_C')) \
                        .agg( \
                            F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"), \
                            F.avg(F.call_udf("ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF", F.col("AVG_TEMPERATURE_AIR_2M_F"))).alias("AVG_TEMPERATURE_C"), \
                            F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"), \
                            F.avg(F.call_udf("ANALYTICS.INCH_TO_MILLIMETER_UDF", F.col("TOT_PRECIPITATION_IN"))).alias("AVG_PRECIPITATION_MM"), \
                            F.max(F.col("MAX_WIND_SPEED_100M_MPH")).alias("MAX_WIND_SPEED_100M_MPH") \
                        ) \
                        .select(F.col("DATE_VALID_STD").alias("DATE"), F.col("CITY_NAME"), F.col("COUNTRY_C").alias("COUNTRY_DESC"), \
                            F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"), \
                            F.round(F.col("AVG_TEMPERATURE_C"), 2).alias("AVG_TEMPERATURE_CELSIUS"), \
                            F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"), \
                            F.round(F.col("AVG_PRECIPITATION_MM"), 2).alias("AVG_PRECIPITATION_MILLIMETERS"), \
                            F.col("MAX_WIND_SPEED_100M_MPH")
                            )
#    weather_agg.limit(5).show()

    daily_city_metrics_stg = orders.join(weather_agg, (orders['DATE'] == weather_agg['DATE']) & (orders['CITY_NAME'] == weather_agg['CITY_NAME']) & (orders['COUNTRY_DESC'] == weather_agg['COUNTRY_DESC']), \
                        how='left', rsuffix='_w') \
                    .select("DATE", "CITY_NAME", "COUNTRY_DESC", "DAILY_SALES", \
                        "AVG_TEMPERATURE_FAHRENHEIT", "AVG_TEMPERATURE_CELSIUS", \
                        "AVG_PRECIPITATION_INCHES", "AVG_PRECIPITATION_MILLIMETERS", \
                        "MAX_WIND_SPEED_100M_MPH")
#    daily_city_metrics_stg.limit(5).show()

    cols_to_update = {c: daily_city_metrics_stg[c] for c in daily_city_metrics_stg.schema.names}
    metadata_col_to_update = {"META_UPDATED_AT": F.current_timestamp()}
    updates = {**cols_to_update, **metadata_col_to_update}

    dcm = session.table('ANALYTICS.DAILY_CITY_METRICS')
    dcm.merge(daily_city_metrics_stg, (dcm['DATE'] == daily_city_metrics_stg['DATE']) & (dcm['CITY_NAME'] == daily_city_metrics_stg['CITY_NAME']) & (dcm['COUNTRY_DESC'] == daily_city_metrics_stg['COUNTRY_DESC']), \
                        [F.when_matched().update(updates), F.when_not_matched().insert(updates)])

    _ = session.sql('ALTER WAREHOUSE HOL_WH SET WAREHOUSE_SIZE = XSMALL').collect()

def main(session: Session) -> str:
    # Create the DAILY_CITY_METRICS table if it doesn't exist
    if not table_exists(session, schema='ANALYTICS', name='DAILY_CITY_METRICS'):
        create_daily_city_metrics_table(session)
    
    merge_daily_city_metrics(session)
#    session.table('ANALYTICS.DAILY_CITY_METRICS').limit(5).show()

    return f"Successfully processed DAILY_CITY_METRICS"
'''

Then deploy our sproc:

In [None]:
CREATE OR REPLACE PROCEDURE DAILY_CITY_METRICS_UPDATE_SP()
 RETURNS string
 LANGUAGE PYTHON
 RUNTIME_VERSION=3.8
 PACKAGES=('snowflake-snowpark-python','toml')
 HANDLER = 'main'
 AS $$
 {{script}}
 $$;

### Running the Sproc in Snowflake
And finally, let's run it! To put this in context, we are on step **#7** in our data flow overview.


In [None]:
CALL DAILY_CITY_METRICS_UPDATE_SP();

### Data Modeling Best Practice
When modeling data for analysis, a best practice is to clearly define and manage the schema of each table. In step 2, when we loaded raw data from Parquet, we took advantage of Snowflake's schema detection feature to create a table with the same schema as the Parquet files. In this step we are explicitly defining the schema in DataFrame syntax and using it to create the table.
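The hypothetical sketch below shows what explicitly defining the schema looks like. It writes the table's columns out as `(name, type)` pairs and renders a `CREATE TABLE` statement from them. The column types are assumptions chosen for illustration. The sketch also shows why a numeric column's scale matters. Snowpark's `T.DecimalType()` with no arguments means `NUMBER(38,0)`, so a value rounded to two decimals would lose its fractional part when stored in such a column. `store_with_scale` models that rounding with Python's `decimal` module, rounding ties away from zero.

```python
from decimal import Decimal, ROUND_HALF_UP
from typing import List, Tuple

# Hypothetical explicit schema for ANALYTICS.DAILY_CITY_METRICS as (column, Snowflake type)
DAILY_CITY_METRICS_DDL_COLUMNS: List[Tuple[str, str]] = [
    ("DATE", "DATE"),
    ("CITY_NAME", "VARCHAR"),
    ("COUNTRY_DESC", "VARCHAR"),
    ("DAILY_SALES", "NUMBER(38,2)"),
    ("AVG_TEMPERATURE_FAHRENHEIT", "NUMBER(38,2)"),
    ("AVG_TEMPERATURE_CELSIUS", "NUMBER(38,2)"),
    ("AVG_PRECIPITATION_INCHES", "NUMBER(38,2)"),
    ("AVG_PRECIPITATION_MILLIMETERS", "NUMBER(38,2)"),
    ("MAX_WIND_SPEED_100M_MPH", "NUMBER(38,2)"),
    ("META_UPDATED_AT", "TIMESTAMP"),
]


def create_table_ddl(table: str, columns: List[Tuple[str, str]]) -> str:
    """Render a CREATE TABLE statement from explicit (name, type) pairs."""
    body = ",\n    ".join(f"{name} {col_type}" for name, col_type in columns)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n    {body}\n)"


def store_with_scale(value: str, scale: int) -> Decimal:
    """Round a value to a column's scale, with ties rounded away from zero."""
    return Decimal(value).quantize(Decimal(1).scaleb(-scale), rounding=ROUND_HALF_UP)


print(create_table_ddl("ANALYTICS.DAILY_CITY_METRICS", DAILY_CITY_METRICS_DDL_COLUMNS))
print(store_with_scale("72.46", 2))  # 72.46
print(store_with_scale("72.46", 0))  # 72
```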

### Complex Aggregation Query
The `merge_daily_city_metrics()` function contains a complex aggregation query that joins and aggregates the POS and Weather Source data. Take a look at the series of joins and aggregations it expresses, and notice that it even leverages the Snowpark UDF we created in step #5!
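The pure-Python sketch below walks through the same steps on small in-memory lists, so you can see the shape of that logic without a Snowflake connection:

1. Sales are grouped by date, city and country, with `ZEROIFNULL` semantics.
2. Weather is averaged and converted per group, then rounded to two decimals.
3. The two results are left-joined on the full key.

It is an illustration only, under these assumptions:

- The two conversion functions are assumed to match what `ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF` and `ANALYTICS.INCH_TO_MILLIMETER_UDF` compute.
- The input field names (`CITY`, `COUNTRY`, `PRICE`, `TEMP_F`, `PRECIP_IN`, `WIND_MPH`) are simplified stand-ins for the real POS and Weather Source columns.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Optional, Tuple

Key = Tuple[str, str, str]  # (DATE, CITY_NAME, COUNTRY_DESC)
WEATHER_COLS = ["AVG_TEMPERATURE_FAHRENHEIT", "AVG_TEMPERATURE_CELSIUS",
                "AVG_PRECIPITATION_INCHES", "AVG_PRECIPITATION_MILLIMETERS",
                "MAX_WIND_SPEED_100M_MPH"]


def fahrenheit_to_celsius(temp_f: float) -> float:
    # Assumed equivalent of ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF
    return (temp_f - 32) * 5 / 9


def inch_to_millimeter(inches: float) -> float:
    # Assumed equivalent of ANALYTICS.INCH_TO_MILLIMETER_UDF
    return inches * 25.4


def daily_sales(orders: List[dict]) -> Dict[Key, float]:
    """GROUP BY date/city/country, SUM(PRICE), then ZEROIFNULL."""
    groups: Dict[Key, List[Optional[float]]] = defaultdict(list)
    for o in orders:
        groups[(o["DATE"], o["CITY"], o["COUNTRY"])].append(o["PRICE"])
    result: Dict[Key, float] = {}
    for key, prices in groups.items():
        non_null = [p for p in prices if p is not None]
        # SUM skips NULLs and yields NULL for an all-NULL group; ZEROIFNULL turns that into 0
        result[key] = sum(non_null) if non_null else 0.0
    return result


def weather_metrics(weather: List[dict]) -> Dict[Key, dict]:
    """GROUP BY date/city/country with AVG/MAX, rounded to 2 decimals."""
    groups: Dict[Key, List[dict]] = defaultdict(list)
    for w in weather:
        groups[(w["DATE"], w["CITY"], w["COUNTRY"])].append(w)
    result: Dict[Key, dict] = {}
    for key, rows in groups.items():
        # Python's round() sends exact ties to the even digit; Snowflake's ROUND rounds
        # half away from zero, so rare tie cases can differ
        result[key] = {
            "AVG_TEMPERATURE_FAHRENHEIT": round(mean(r["TEMP_F"] for r in rows), 2),
            "AVG_TEMPERATURE_CELSIUS": round(mean(fahrenheit_to_celsius(r["TEMP_F"]) for r in rows), 2),
            "AVG_PRECIPITATION_INCHES": round(mean(r["PRECIP_IN"] for r in rows), 2),
            "AVG_PRECIPITATION_MILLIMETERS": round(mean(inch_to_millimeter(r["PRECIP_IN"]) for r in rows), 2),
            "MAX_WIND_SPEED_100M_MPH": max(r["WIND_MPH"] for r in rows),
        }
    return result


def daily_city_metrics(orders: List[dict], weather: List[dict]) -> List[dict]:
    """LEFT join sales to weather on the full key: every sales group is kept."""
    weather_by_key = weather_metrics(weather)
    no_weather = dict.fromkeys(WEATHER_COLS)  # all None, like unmatched LEFT JOIN columns
    return [
        {"DATE": d, "CITY_NAME": city, "COUNTRY_DESC": country, "DAILY_SALES": total,
         **weather_by_key.get((d, city, country), no_weather)}
        for (d, city, country), total in daily_sales(orders).items()
    ]


orders = [
    {"DATE": "2022-01-01", "CITY": "Tokyo", "COUNTRY": "Japan", "PRICE": 10.0},
    {"DATE": "2022-01-01", "CITY": "Tokyo", "COUNTRY": "Japan", "PRICE": None},
    {"DATE": "2022-01-01", "CITY": "Tokyo", "COUNTRY": "Japan", "PRICE": 5.5},
    {"DATE": "2022-01-01", "CITY": "Paris", "COUNTRY": "France", "PRICE": None},
]
weather = [
    {"DATE": "2022-01-01", "CITY": "Tokyo", "COUNTRY": "Japan", "TEMP_F": 50.0, "PRECIP_IN": 0.1, "WIND_MPH": 12.0},
    {"DATE": "2022-01-01", "CITY": "Tokyo", "COUNTRY": "Japan", "TEMP_F": 60.0, "PRECIP_IN": 0.3, "WIND_MPH": 20.0},
]
for row in daily_city_metrics(orders, weather):
    print(row)
```

In this example, Paris keeps its row with `DAILY_SALES` of 0.0 and all weather columns set to `None`. That mirrors the `how='left'` join in `merge_daily_city_metrics()`.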

The result of this aggregation query is then merged into the final analytics table using the Snowpark `merge()` method. If you haven't already, check your Snowflake Query History to see which queries the Snowpark API generated. In this case you will see that the Snowpark API took all the complex logic, including the merge, and created a single Snowflake query to execute!
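The logic ends up in one query because Snowpark DataFrames are evaluated lazily. Transformations such as joins, `group_by()` and `agg()` only build up a query, and SQL is sent to Snowflake when an action runs, for example `collect()`, `count()`, `save_as_table()` or `merge()`. The toy class below illustrates the idea. It is entirely hypothetical and far simpler than Snowpark's real query planner: chained calls nest into a single SQL string, and nothing is "sent" until `collect()`.

```python
from typing import List


class LazyFrame:
    """Toy, hypothetical model of lazy evaluation. Each transformation only
    wraps the previous SQL as a subquery; nothing runs until collect()."""

    def __init__(self, sql: str, sent: List[str]):
        self.sql = sql
        self.sent = sent  # shared log of statements "sent" to the server

    @classmethod
    def table(cls, name: str, sent: List[str]) -> "LazyFrame":
        return cls(f"SELECT * FROM {name}", sent)

    def filter(self, condition: str) -> "LazyFrame":
        return LazyFrame(f"SELECT * FROM ({self.sql}) WHERE {condition}", self.sent)

    def group_by_sum(self, keys: List[str], col: str, alias: str) -> "LazyFrame":
        k = ", ".join(keys)
        return LazyFrame(f"SELECT {k}, SUM({col}) AS {alias} FROM ({self.sql}) GROUP BY {k}", self.sent)

    def collect(self) -> str:
        # The only action: the whole chain is sent as one statement
        self.sent.append(self.sql)
        return self.sql


sent: List[str] = []
df = (LazyFrame.table("HARMONIZED.ORDERS_STREAM", sent)
      .filter("PRICE IS NOT NULL")
      .group_by_sum(["ORDER_TS_DATE", "PRIMARY_CITY"], "PRICE", "DAILY_SALES"))
print(len(sent))  # 0
print(df.collect())
print(len(sent))  # 1
```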


## That's it for part 1 of the Lab!
Proceed to part 2 of the lab in a separate notebook.