### Data Engineering Pipelines with Snowpark Python

What You'll Learn
You will learn about the following Snowflake features during this Quickstart:

Snowflake's Table Format
Data ingestion with COPY
Schema inference
Data sharing/marketplace (instead of ETL)
Streams for incremental processing (CDC)
Streams on views
Python UDFs (with third-party packages)
Python Stored Procedures
Snowpark DataFrame API
Snowpark Python programmability
Warehouse elasticity (dynamic scaling)
Visual Studio Code Snowflake native extension (PuPr, Git integration)
SnowCLI (PuPr)
Tasks (with Stream triggers)
Task Observability
GitHub Actions (CI/CD) integration

Source Data:
's3://sfquickstarts/data-engineering-with-snowpark-python/pos'
's3://sfquickstarts/data-engineering-with-snowpark-python/customer'


Tutorial: https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_python/index.html?index=..%2F..index#0

In [None]:
# Imports
import os
import sys

from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
import modin.pandas as spd  # Snowpark Pandas API
import snowflake.snowpark.modin.plugin  # Snowpark pandas plugin for modin
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


In [None]:
# Set Paths: make modules that live one directory above the notebook importable
current_dir = os.getcwd()
parent_dir = os.path.split(current_dir)[0]  # same value as os.path.dirname(current_dir)
sys.path.append(parent_dir)

In [None]:
-- Set DDL: create (or update) every object the pipeline needs.
-- Statements run top to bottom; later ones rely on the USE commands before them.

-- Warehouses
-- TEST_WH starts at XSMALL; the load/upsert steps temporarily resize it to XLARGE.
CREATE OR REPLACE WAREHOUSE TEST_WH WAREHOUSE_SIZE = XSMALL, AUTO_SUSPEND = 300, AUTO_RESUME= TRUE;
USE WAREHOUSE TEST_WH;

-- Databases
CREATE OR ALTER DATABASE data_engineering_pipelines_with_snowpark_python;
USE DATABASE data_engineering_pipelines_with_snowpark_python;

-- Schemas
--   EXTERNAL:     stage and file format for the S3 source data
--   RAW_POS:      raw point-of-sale tables copied from S3
--   RAW_CUSTOMER: raw customer tables copied from S3
--   HARMONIZED:   flattened POS view, ORDERS table and their streams
--   ANALYTICS:    unit-conversion UDFs and the DAILY_CITY_METRICS table
CREATE OR ALTER SCHEMA EXTERNAL;
CREATE OR ALTER SCHEMA RAW_POS;
CREATE OR ALTER SCHEMA RAW_CUSTOMER;
CREATE OR ALTER SCHEMA HARMONIZED;
CREATE OR ALTER SCHEMA ANALYTICS;

-- Stages
-- analytics_stage: internal stage (no URL) with a directory table; it is the
-- stage_location used by the commented Python UDF alternatives.
USE SCHEMA ANALYTICS;
CREATE OR ALTER STAGE analytics_stage 
	DIRECTORY = ( ENABLE = true ); 

-- frostbyte_raw_stage: external stage over the public quickstart S3 bucket
-- holding the raw Parquet files (sub-directories pos/ and customer/).
USE SCHEMA EXTERNAL;  
CREATE OR ALTER STAGE frostbyte_raw_stage
    URL = 's3://sfquickstarts/data-engineering-with-snowpark-python/';

-- File formats
-- PARQUET_FORMAT is referenced by the COPY INTO statements that load year 2022.
USE SCHEMA EXTERNAL;
CREATE OR ALTER FILE FORMAT PARQUET_FORMAT 
    /*Note: When we loaded raw data from Parquet we can take advantage of Snowflake's schema detection feature 
    to create a table with the same schema as the Parquet files*/
    TYPE = PARQUET
    COMPRESSION = SNAPPY;

In [None]:
# Attach to the Snowpark session the notebook is already running in
session = get_active_session()

# Query tag attached to the session's queries, for troubleshooting and monitoring
quickstart_query_tag = {
    "origin": "sf_sit-is",
    "name": "data_engineering_pipelines_with_snowpark_python",
    "version": {"major": 1, "minor": 0},
    "attributes": {"is_quickstart": 1, "source": "notebook", "vignette": "snowpark_pandas"},
}
session.query_tag = quickstart_query_tag

# Root object of the snowflake.core API, bound to this session
root = Root(session)

In [None]:
-- Create UDF Functions: ANALYTICS
-- Unit-conversion UDFs called by DAILY_CITY_METRICS_UPSERT_SP when it aggregates weather data.

USE SCHEMA ANALYTICS;

-- INCH_TO_MILLIMETER_UDF: SQL UDF returning INCH * 25.4 as NUMBER(35,4)
CREATE OR ALTER FUNCTION INCH_TO_MILLIMETER_UDF(INCH NUMBER(35,4))
RETURNS NUMBER(35,4)
AS $$
    inch * 25.4
$$;

/* Alternative Python Version (run from a Python cell; uses `session` and
   `import snowflake.snowpark.types as T` from the cells above).
   Note: this registers a FLOAT -> FLOAT UDF, unlike the NUMBER(35,4) SQL UDF above.
def inch_to_millimeter(inch: float) -> float:
    return inch * 25.4

inch_to_millimeter_udf = session.udf.register(name="inch_to_millimeter_udf",
                                              func=inch_to_millimeter,
                                              input_types=[T.FloatType()],
                                              return_type=T.FloatType(),
                                              replace=True,
                                              is_permanent=True,
                                              stage_location='@analytics_stage')
*/

-- FAHRENHEIT_TO_CELSIUS_UDF: Python UDF returning (temp_f - 32) * 5/9 as FLOAT
CREATE OR ALTER FUNCTION FAHRENHEIT_TO_CELSIUS_UDF("temp_f" FLOAT)
RETURNS FLOAT
LANGUAGE PYTHON
RUNTIME_VERSION=3.9 -- Python runtime version
HANDLER = 'fahrenheit_to_celsius'  -- Python function in the body to call
AS $$
def fahrenheit_to_celsius(temp_f: float) -> float:
    return (float(temp_f) - 32) * (5/9)  # function definition
$$;
/* Alternative Python Version (run from a Python cell; uses `session` and
   `import snowflake.snowpark.types as T` from the cells above).
def fahrenheit_to_celsius(temp_f: float) -> float:
    return (float(temp_f) - 32) * (5/9)

fahrenheit_to_celsius_udf = session.udf.register(name="fahrenheit_to_celsius_udf",
                                                 func=fahrenheit_to_celsius,
                                                 input_types=[T.FloatType()],
                                                 return_type=T.FloatType(),
                                                 replace=True,
                                                 is_permanent=True,
                                                 stage_location='@analytics_stage')
*/


In [None]:
-- Test INCH_TO_MILLIMETER_UDF (12 -> 304.8)
-- Sanity check: 12 * 25.4 = 304.8
SELECT data_engineering_pipelines_with_snowpark_python.ANALYTICS.INCH_TO_MILLIMETER_UDF(12);


In [None]:
-- Test FAHRENHEIT_TO_CELSIUS_UDF (32 -> 0)
-- Sanity check: (32 - 32) * 5/9 = 0
SELECT data_engineering_pipelines_with_snowpark_python.ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF(32);

In [None]:
# Define load_frostbyte_raw_table and load_all_frostbyte_raw_tables

def load_frostbyte_raw_table(session, tname=None, s3dir=None, year=None, schema=None):
    """Copy one raw Parquet table from the FrostByte external stage into Snowflake.

    Args:
        session: Active Snowpark session.
        tname: Table name; also the sub-directory under ``s3dir`` holding its files.
        s3dir: Top-level directory on ``@external.frostbyte_raw_stage`` (``pos``/``customer``).
        year: Optional partition value; when given, only the ``year=<year>`` files are copied.
        schema: Schema of the target table; it becomes the session's current schema.
    """
    session.use_schema(schema)

    # Parquet location on the external stage, narrowed to one partition if requested
    pq_loc = f"@external.frostbyte_raw_stage/{s3dir}/{tname}"
    if year is not None:
        pq_loc = f"{pq_loc}/year={year}"

    # Parquet files carry their own schema, so Snowpark can infer the table's
    # columns from them.
    df = session.read.option("compression", "snappy").parquet(pq_loc)

    # Load data into raw table
    df.copy_into_table(f"{tname}")


def load_all_frostbyte_raw_tables(session):
    """Load every FrostByte raw table, the year-partitioned ones one year at a time.

    ORDER_HEADER and ORDER_DETAIL are stored in ``year=<yyyy>`` partitions. Only
    2019-2021 are loaded here, so 2022 can be appended later to exercise the
    incremental (stream-driven) part of the pipeline. TEST_WH is scaled up for the
    load and always scaled back to XSMALL, even if a load fails.
    """
    pos_tables = ['country', 'franchise', 'location', 'menu', 'truck', 'order_header', 'order_detail']
    customer_tables = ['customer_loyalty']
    table_dict = {
        "pos": {"schema": "RAW_POS", "tables": pos_tables},
        "customer": {"schema": "RAW_CUSTOMER", "tables": customer_tables}
    }
    # Tables whose files are partitioned by year on the stage. These must match the
    # table names above; loading them un-partitioned would also pick up 2022.
    year_partitioned_tables = {'order_header', 'order_detail'}
    order_years = ['2019', '2020', '2021']  # Note: 2022 will be added later (see -- Add year 2022 to order_header_df)

    # Temporarily increase warehouse size
    session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()
    try:
        for s3dir, data in table_dict.items():
            schema = data['schema']
            for tname in data['tables']:
                print(f"Loading {schema}.{tname}")
                if tname in year_partitioned_tables:
                    for year in order_years:
                        print(f'\tLoading year {year}')
                        load_frostbyte_raw_table(session, tname=tname, s3dir=s3dir, year=year, schema=schema)
                else:
                    load_frostbyte_raw_table(session, tname=tname, s3dir=s3dir, schema=schema)
    finally:
        # Return warehouse to original size, even if a load failed
        session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XSMALL").collect()

In [None]:
# Run load_all_frostbyte_raw_tables: copy every raw POS and customer table from S3
load_all_frostbyte_raw_tables(session=session)

### Load Weather

Connect to the "Weather Source LLC: frostbyte" feed from Weather Source in the Snowflake Data Marketplace by following these steps:

    -> Snowsight Home Button
         -> Marketplace
             -> Search: "Weather Source LLC: frostbyte" (and click on tile in results)
                 -> Click the blue "Get" button
                     -> Under "Options", adjust the Database name to read "FROSTBYTE_WEATHERSOURCE" (all capital letters)
                        -> Grant to "AccountAdmin"
    
That's it... we don't have to do anything from here to keep this data updated.
The provider does that for us, and data sharing means we always see
whatever they have published.

In [None]:
-- View Weather data
-- Preview the Marketplace share; requires the FROSTBYTE_WEATHERSOURCE database from the step above.
SELECT * FROM FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES LIMIT 100;


In [None]:
# Define Create "HARMONIZED.POS_FLATTENED_V" view and stream

def create_pos_view(session):
    """Create (or replace) the flattened point-of-sale view HARMONIZED.POS_FLATTENED_V.

    Each row is one order line from RAW_POS.ORDER_DETAIL, enriched with its order
    header, truck, franchise, location and menu item.
    """
    session.use_schema('HARMONIZED')

    # Source tables, opened in the same order as before
    truck, franchise, order_header, location, order_detail, menu = (
        session.table(f"RAW_POS.{name}")
        for name in ("TRUCK", "FRANCHISE", "ORDER_HEADER", "LOCATION", "ORDER_DETAIL", "MENU")
    )

    # Add the order date and give the franchisee name columns unambiguous names
    order_header = order_header.with_column("ORDER_TS_DATE", F.to_date(F.col("ORDER_TS")))
    franchise = franchise.rename({
        F.col("FIRST_NAME"): "FRANCHISEE_FIRST_NAME",
        F.col("LAST_NAME"): "FRANCHISEE_LAST_NAME",
    })

    # truck + franchise; header + (truck, franchise) + location; detail + all of it + menu
    truck_with_franchise = truck.join(
        franchise, truck['FRANCHISE_ID'] == franchise['FRANCHISE_ID'], rsuffix='_f'
    )
    header_enriched = (
        order_header
        .join(truck_with_franchise, order_header['TRUCK_ID'] == truck_with_franchise['TRUCK_ID'], rsuffix='_t')
        .join(location, order_header['LOCATION_ID'] == location['LOCATION_ID'], rsuffix='_l')
    )
    lines_enriched = (
        order_detail
        .join(header_enriched, order_detail['ORDER_ID'] == header_enriched['ORDER_ID'], rsuffix='_oh')
        .join(menu, order_detail['MENU_ITEM_ID'] == menu['MENU_ITEM_ID'], rsuffix='_m')
    )

    # Columns exposed by the view, in view order
    view_columns = (
        "ORDER_ID", "TRUCK_ID", "ORDER_TS", "ORDER_TS_DATE", "ORDER_DETAIL_ID",
        "LINE_NUMBER", "TRUCK_BRAND_NAME", "MENU_TYPE", "PRIMARY_CITY", "REGION",
        "COUNTRY", "FRANCHISE_FLAG", "FRANCHISE_ID", "FRANCHISEE_FIRST_NAME",
        "FRANCHISEE_LAST_NAME", "LOCATION_ID", "MENU_ITEM_ID", "MENU_ITEM_NAME",
        "QUANTITY", "UNIT_PRICE", "PRICE", "ORDER_AMOUNT", "ORDER_TAX_AMOUNT",
        "ORDER_DISCOUNT_AMOUNT", "ORDER_TOTAL",
    )
    flattened = lines_enriched.select(*[F.col(column) for column in view_columns])
    flattened.create_or_replace_view('POS_FLATTENED_V')

def create_pos_view_stream(session):
    """(Re)create HARMONIZED.POS_FLATTENED_V_STREAM, a stream on the flattened POS view.

    The DDL uses SHOW_INITIAL_ROWS = TRUE, so the first consumer of the stream also
    receives the rows that already exist in the view.
    """
    session.use_schema('HARMONIZED')
    stream_ddl = 'CREATE OR REPLACE STREAM POS_FLATTENED_V_STREAM \
                        ON VIEW POS_FLATTENED_V \
                        SHOW_INITIAL_ROWS = TRUE'
    session.sql(stream_ddl).collect()

def test_pos_view(session):
    """Print the first 5 rows of HARMONIZED.POS_FLATTENED_V as a quick visual check."""
    session.use_schema('HARMONIZED')
    preview = session.table('POS_FLATTENED_V').limit(5)
    preview.show()

In [None]:
# Run Create "HARMONIZED.POS_FLATTENED_V" view and stream: build the view,
# put a stream on it, then preview a few rows.

for pos_step in (create_pos_view, create_pos_view_stream, test_pos_view):
    pos_step(session)

In [None]:
# Python source of the ORDERS_UPSERT_SP handler; substituted into the
# CREATE PROCEDURE cell below via the notebook's templating.
orders_upsert_sp_script = """
import snowflake.snowpark.functions as F


def orders_upsert(session):
    '''Merge pending changes from HARMONIZED.POS_FLATTENED_V_STREAM into HARMONIZED.ORDERS.

    On first run, creates HARMONIZED.ORDERS (the view's columns plus
    META_UPDATED_AT) and HARMONIZED.ORDERS_STREAM on it. TEST_WH is scaled up for
    the merge and always returned to XSMALL, even if the merge fails.

    Returns a status message (the procedure is declared RETURNS string).
    '''
    # Create the target table and its stream on first run
    schema = 'HARMONIZED'
    table = 'ORDERS'
    exists = session.sql(
        "SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}') AS TABLE_EXISTS"
    ).collect()[0]['TABLE_EXISTS']
    if not exists:
        session.sql("CREATE TABLE HARMONIZED.ORDERS LIKE HARMONIZED.POS_FLATTENED_V").collect()
        session.sql("ALTER TABLE HARMONIZED.ORDERS ADD COLUMN META_UPDATED_AT TIMESTAMP").collect()
        session.sql("CREATE STREAM HARMONIZED.ORDERS_STREAM ON TABLE HARMONIZED.ORDERS").collect()

    # Temporarily increase warehouse size
    session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()
    try:
        source_df = session.table('HARMONIZED.POS_FLATTENED_V_STREAM')
        target_df = session.table('HARMONIZED.ORDERS')

        # Skip the stream's METADATA columns; they are not part of ORDERS.
        cols_to_update = {c: source_df[c] for c in source_df.schema.names if "METADATA" not in c}
        records = {**cols_to_update, "META_UPDATED_AT": F.current_timestamp()}
        target_df.merge(
            source_df,
            target_df['ORDER_DETAIL_ID'] == source_df['ORDER_DETAIL_ID'],
            [F.when_matched().update(records), F.when_not_matched().insert(records)],
        )
    finally:
        # Return warehouse to original size, even if the merge failed
        session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XSMALL").collect()

    return "Successfully processed ORDERS"
"""

In [None]:
-- Create ORDERS_UPSERT_SP
-- Python stored procedure whose body is the orders_upsert_sp_script variable from
-- the previous Python cell, substituted in via the notebook's double-brace templating.
USE SCHEMA HARMONIZED;
CREATE OR REPLACE PROCEDURE ORDERS_UPSERT_SP()
    RETURNS string
    LANGUAGE PYTHON
    RUNTIME_VERSION=3.9
    PACKAGES=('snowflake-snowpark-python')
    HANDLER = 'orders_upsert'  -- python function to execute from script
    AS $$
        {{orders_upsert_sp_script}} 
    $$;
/*Alternative Python Version
NOTE(review): orders_upsert is only defined inside the orders_upsert_sp_script string,
so it must first be defined in the Python session (e.g. exec(orders_upsert_sp_script)).
Snowpark registers a temporary procedure unless is_permanent=True (with a
stage_location) is passed.
session.sproc.register(name="ORDERS_UPSERT_SP", 
                       func=orders_upsert,  
                       packages=['snowflake-snowpark-python'])

*/

In [None]:
-- Call ORDERS_UPSERT_SP
-- Manual run: creates HARMONIZED.ORDERS and ORDERS_STREAM on first call, then merges
-- the rows currently in POS_FLATTENED_V_STREAM into ORDERS.
CALL ORDERS_UPSERT_SP();
/*Alternative Python Version
session.call('ORDERS_UPSERT_SP')
*/

In [None]:
# Python source of the DAILY_CITY_METRICS_UPSERT_SP handler; substituted into the
# CREATE PROCEDURE cell below via the notebook's templating.
daily_city_metrics_upsert_sp_script = """
import snowflake.snowpark.types as T
import snowflake.snowpark.functions as F


def _create_daily_city_metrics_table(session):
    '''Create an empty ANALYTICS.DAILY_CITY_METRICS table from an explicit schema.'''
    # T.DecimalType() defaults to NUMBER(38, 0), which would discard the two decimals
    # kept by F.round(..., 2) below, so every numeric column declares its scale.
    # DAILY_SALES is a sum of PRICE, so it is numeric rather than a string.
    daily_city_metrics_schema = T.StructType([
        T.StructField("DATE", T.DateType()),
        T.StructField("CITY_NAME", T.StringType()),
        T.StructField("COUNTRY_DESC", T.StringType()),
        T.StructField("DAILY_SALES", T.DecimalType(38, 4)),
        T.StructField("AVG_TEMPERATURE_FAHRENHEIT", T.DecimalType(38, 2)),
        T.StructField("AVG_TEMPERATURE_CELSIUS", T.DecimalType(38, 2)),
        T.StructField("AVG_PRECIPITATION_INCHES", T.DecimalType(38, 2)),
        T.StructField("AVG_PRECIPITATION_MILLIMETERS", T.DecimalType(38, 2)),
        T.StructField("MAX_WIND_SPEED_100M_MPH", T.DecimalType(38, 2)),
        T.StructField("META_UPDATED_AT", T.TimestampType()),
    ])
    # One all-NULL row that na.drop() removes again: an empty DataFrame with the schema.
    empty_df = session.create_dataframe(
        [[None] * len(daily_city_metrics_schema.names)], schema=daily_city_metrics_schema
    ).na.drop()
    empty_df.write.mode('overwrite').save_as_table('ANALYTICS.DAILY_CITY_METRICS')


def daily_city_metrics_upsert(session):
    '''Merge per-city daily sales, enriched with weather, into ANALYTICS.DAILY_CITY_METRICS.

    Sales come from the pending changes in HARMONIZED.ORDERS_STREAM. Weather comes
    from the FROSTBYTE_WEATHERSOURCE share and is restricted to the dates in those
    changes. TEST_WH is scaled up for the work and always returned to XSMALL.

    Returns a status message (the procedure is declared RETURNS string).
    '''
    # Create the target table on first run
    schema = 'ANALYTICS'
    table = 'DAILY_CITY_METRICS'
    exists = session.sql(
        "SELECT EXISTS (SELECT * FROM INFORMATION_SCHEMA.TABLES "
        f"WHERE TABLE_SCHEMA = '{schema}' AND TABLE_NAME = '{table}') AS TABLE_EXISTS"
    ).collect()[0]['TABLE_EXISTS']
    if not exists:
        _create_daily_city_metrics_table(session)

    # Temporarily increase warehouse size
    session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()
    try:
        # Distinct order dates in the pending changes; weather is only needed for these
        orders_stream_dates_df = (
            session.table('HARMONIZED.ORDERS_STREAM')
            .select(F.col("ORDER_TS_DATE").alias("DATE"))
            .distinct()
        )

        # Daily sales per city/country from the pending order changes
        orders_df = (
            session.table("HARMONIZED.ORDERS_STREAM")
            .group_by(F.col('ORDER_TS_DATE'), F.col('PRIMARY_CITY'), F.col('COUNTRY'))
            .agg(F.sum(F.col("PRICE")).as_("price_nulls"))
            .with_column("DAILY_SALES", F.call_builtin("ZEROIFNULL", F.col("price_nulls")))
            .select(
                F.col('ORDER_TS_DATE').alias("DATE"),
                F.col("PRIMARY_CITY").alias("CITY_NAME"),
                F.col("COUNTRY").alias("COUNTRY_DESC"),
                F.col("DAILY_SALES"),
            )
        )

        # Daily weather per city for those dates
        weather_pc_df = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.POSTAL_CODES")
        countries_df = session.table("RAW_POS.COUNTRY")
        weather_df = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
        weather_df = weather_df.join(
            weather_pc_df,
            on=(weather_df['POSTAL_CODE'] == weather_pc_df['POSTAL_CODE'])
            & (weather_df['COUNTRY'] == weather_pc_df['COUNTRY']),
            rsuffix='_pc',
        )
        weather_df = weather_df.join(
            countries_df,
            on=(weather_df['COUNTRY'] == countries_df['ISO_COUNTRY'])
            & (weather_df['CITY_NAME'] == countries_df['CITY']),
            rsuffix='_c',
        )
        weather_df = (
            weather_df
            .join(orders_stream_dates_df, on=weather_df['DATE_VALID_STD'] == orders_stream_dates_df['DATE'])
            .group_by(F.col('DATE_VALID_STD'), F.col('CITY_NAME'), F.col('COUNTRY_C'))
            .agg(
                F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"),
                F.avg(F.call_udf("ANALYTICS.FAHRENHEIT_TO_CELSIUS_UDF", F.col("AVG_TEMPERATURE_AIR_2M_F"))).alias("AVG_TEMPERATURE_C"),
                F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"),
                F.avg(F.call_udf("ANALYTICS.INCH_TO_MILLIMETER_UDF", F.col("TOT_PRECIPITATION_IN"))).alias("AVG_PRECIPITATION_MM"),
                F.max(F.col("MAX_WIND_SPEED_100M_MPH")).alias("MAX_WIND_SPEED_100M_MPH"),
            )
            .select(
                F.col("DATE_VALID_STD").alias("DATE"),
                F.col("CITY_NAME"),
                F.col("COUNTRY_C").alias("COUNTRY_DESC"),
                F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"),
                F.round(F.col("AVG_TEMPERATURE_C"), 2).alias("AVG_TEMPERATURE_CELSIUS"),
                F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"),
                F.round(F.col("AVG_PRECIPITATION_MM"), 2).alias("AVG_PRECIPITATION_MILLIMETERS"),
                F.col("MAX_WIND_SPEED_100M_MPH"),
            )
        )

        # Stage data: sales left-joined to weather, so days without weather are kept
        daily_city_metrics_stg_df = orders_df.join(
            weather_df,
            on=(orders_df['DATE'] == weather_df['DATE'])
            & (orders_df['CITY_NAME'] == weather_df['CITY_NAME'])
            & (orders_df['COUNTRY_DESC'] == weather_df['COUNTRY_DESC']),
            how='left',
            rsuffix='_w',
        ).select(
            "DATE",
            "CITY_NAME",
            "COUNTRY_DESC",
            "DAILY_SALES",
            "AVG_TEMPERATURE_FAHRENHEIT",
            "AVG_TEMPERATURE_CELSIUS",
            "AVG_PRECIPITATION_INCHES",
            "AVG_PRECIPITATION_MILLIMETERS",
            "MAX_WIND_SPEED_100M_MPH",
        )

        # Upsert on (DATE, CITY_NAME, COUNTRY_DESC)
        cols_to_update = {c: daily_city_metrics_stg_df[c] for c in daily_city_metrics_stg_df.schema.names}
        records = {**cols_to_update, "META_UPDATED_AT": F.current_timestamp()}
        daily_city_metrics_df = session.table('ANALYTICS.DAILY_CITY_METRICS')
        daily_city_metrics_df.merge(
            daily_city_metrics_stg_df,
            (daily_city_metrics_df['DATE'] == daily_city_metrics_stg_df['DATE'])
            & (daily_city_metrics_df['CITY_NAME'] == daily_city_metrics_stg_df['CITY_NAME'])
            & (daily_city_metrics_df['COUNTRY_DESC'] == daily_city_metrics_stg_df['COUNTRY_DESC']),
            [F.when_matched().update(records), F.when_not_matched().insert(records)],
        )
    finally:
        # Return warehouse to original size, even if something above failed
        session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XSMALL").collect()

    return "Successfully processed DAILY_CITY_METRICS"
"""

In [None]:
-- Create DAILY_CITY_METRICS_UPSERT_SP
-- Python stored procedure whose body is the daily_city_metrics_upsert_sp_script variable
-- from the previous Python cell, substituted in via the notebook's double-brace templating.
-- It lives in HARMONIZED but writes ANALYTICS.DAILY_CITY_METRICS.
USE SCHEMA HARMONIZED;
CREATE OR REPLACE PROCEDURE DAILY_CITY_METRICS_UPSERT_SP()
    RETURNS string
    LANGUAGE PYTHON
    RUNTIME_VERSION=3.9
    PACKAGES=('snowflake-snowpark-python')
    HANDLER = 'daily_city_metrics_upsert'
    AS $$
        {{daily_city_metrics_upsert_sp_script}}
    $$;
/*Alternative Python Version
NOTE(review): daily_city_metrics_upsert is only defined inside the
daily_city_metrics_upsert_sp_script string, so it must first be defined in the Python
session (e.g. exec(daily_city_metrics_upsert_sp_script)). Snowpark registers a
temporary procedure unless is_permanent=True (with a stage_location) is passed.
session.sproc.register(name="DAILY_CITY_METRICS_UPSERT_SP", 
                       func=daily_city_metrics_upsert,  
                       packages=['snowflake-snowpark-python'])

*/

In [None]:
-- Call DAILY_CITY_METRICS_UPSERT_SP
-- Manual run: merges the rows currently in HARMONIZED.ORDERS_STREAM, joined with
-- weather, into ANALYTICS.DAILY_CITY_METRICS.
CALL DAILY_CITY_METRICS_UPSERT_SP();
/*Alternative Python Version
session.call('DAILY_CITY_METRICS_UPSERT_SP')
*/

In [None]:
-- Tasks
-- Two-task graph in HARMONIZED: ORDERS_UPSERT_TASK (root) -> DAILY_CITY_METRICS_UPSERT_TASK.

-- ----------------------------------------------------------------------------
-- Step #1: Create the tasks that trigger based on new data pushes
-- ----------------------------------------------------------------------------

-- ORDERS_UPSERT_TASK: root task; calls ORDERS_UPSERT_SP only when
-- POS_FLATTENED_V_STREAM reports pending data.
USE SCHEMA HARMONIZED;
CREATE OR ALTER TASK ORDERS_UPSERT_TASK
WAREHOUSE = TEST_WH
WHEN
  SYSTEM$STREAM_HAS_DATA('POS_FLATTENED_V_STREAM')
AS 
CALL HARMONIZED.ORDERS_UPSERT_SP();

-- DAILY_CITY_METRICS_UPSERT_TASK: runs after ORDERS_UPSERT_TASK and calls
-- DAILY_CITY_METRICS_UPSERT_SP only when ORDERS_STREAM reports pending data.
USE SCHEMA HARMONIZED;
CREATE OR ALTER TASK DAILY_CITY_METRICS_UPSERT_TASK
WAREHOUSE = TEST_WH
AFTER ORDERS_UPSERT_TASK
WHEN
  SYSTEM$STREAM_HAS_DATA('ORDERS_STREAM')
AS
CALL HARMONIZED.DAILY_CITY_METRICS_UPSERT_SP();

-- ----------------------------------------------------------------------------
-- Step #2: Execute the tasks
-- ----------------------------------------------------------------------------

-- The child task is resumed so that it runs as part of the graph. The root task is
-- not resumed here; instead it is run once on demand with EXECUTE TASK.
ALTER TASK DAILY_CITY_METRICS_UPSERT_TASK RESUME;

EXECUTE TASK ORDERS_UPSERT_TASK;

In [None]:
-- Add year 2022 to RAW_POS.ORDER_HEADER and RAW_POS.ORDER_DETAIL
-- (the initial load is meant to cover only 2019-2021). The new rows should then show
-- up in HARMONIZED.POS_FLATTENED_V_STREAM for the incremental pipeline.
-- NOTE(review): ORDERS_UPSERT_TASK is never resumed in this notebook, so presumably it
-- must be executed again (EXECUTE TASK) to process these rows; confirm.

-- Set schema
USE SCHEMA RAW_POS;

-- Temporarily increase warehouse size
ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE;

-- List the 2022 ORDER_HEADER files on the stage before loading them
LS @external.frostbyte_raw_stage/pos/order_header/year=2022;

-- Append the 2022 order headers; Parquet columns are matched to table columns by name
COPY INTO order_header 
FROM @external.frostbyte_raw_stage/pos/order_header/year=2022
FILE_FORMAT = (FORMAT_NAME = EXTERNAL.PARQUET_FORMAT)
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;

-- Append the 2022 order details the same way
COPY INTO order_detail 
FROM @external.frostbyte_raw_stage/pos/order_detail/year=2022
FILE_FORMAT = (FORMAT_NAME = EXTERNAL.PARQUET_FORMAT)
MATCH_BY_COLUMN_NAME = CASE_SENSITIVE;

-- Return warehouse to original size
ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XSMALL;

/* Alternative Python Version (run from a Python cell)
schema = 'RAW_POS'
tables = ['order_header', 'order_detail']
year = 2022

# Set schema
session.use_schema(schema)

# Temporarily increase warehouse size
session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XLARGE WAIT_FOR_COMPLETION = TRUE").collect()

for tname in tables:
    # Set parquet location path
    location = f"@external.frostbyte_raw_stage/pos/{tname}/year={year}"

    # Extract the data from the stage.
    # Note: Parquet files carry their own schema, so Snowpark can infer the columns.
    df = session.read.option("compression", "snappy").parquet(location)

    # Load data into raw table
    df.copy_into_table(f"{tname}")

# Return warehouse to original size
session.sql("ALTER WAREHOUSE TEST_WH SET WAREHOUSE_SIZE = XSMALL").collect()
*/

In [None]:
-- Teardown
-- Dropping the database removes everything this notebook created inside it
-- (schemas, tables, views, streams, stages, UDFs, procedures, tasks).
-- TEST_WH is an account-level object created by this notebook, so it is dropped separately.
-- IF EXISTS keeps the cell safe to re-run.
DROP DATABASE IF EXISTS data_engineering_pipelines_with_snowpark_python;
DROP WAREHOUSE IF EXISTS TEST_WH;

-- Optional: also remove the Marketplace weather database obtained manually above.
-- DROP DATABASE IF EXISTS FROSTBYTE_WEATHERSOURCE;

