### Data Engineering Pipelines with pandas on Snowflake - Automated Version

This notebook creates a serverless task (currently in Private Preview) that runs the following feature engineering pipeline on a schedule:

1. Create a DataFrame from a Snowflake table
2. Aggregate data to create new features
3. Join DataFrames
4. Save the result into a Snowflake table


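Source Data: `SNOWFLAKE_SAMPLE_DATA.TPCH_SF1` (tables `LINEITEM` and `ORDERS`). The target database and schema configured below must already exist, because this notebook does not create them.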

Tutorial: https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_pandas/index.html?index=..%2F..index#0


In [None]:
# Imports
from datetime import timedelta

import streamlit as st
import matplotlib.pyplot as plt
import seaborn as sns

from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
import snowflake.snowpark.modin.plugin  # Snowpark pandas plugin for modin
import modin.pandas as spd  # Snowpark Pandas API
from snowflake.core.task import Task
from snowflake.core import CreateMode

In [None]:
# Set Paths
# Each location is a "<database>.<schema>" string; the SQL cells and the
# stored procedure build fully qualified object names from these values.

# Source: Snowflake sample database, TPCH_SF1 schema
source_db, source_schema = "SNOWFLAKE_SAMPLE_DATA", "TPCH_SF1"
source_data_path = ".".join([source_db, source_schema])

# Target: tutorial database/schema that receives the pipeline output
target_db, target_schema = (
    "data_engineering_pipelines_with_pandas_on_snowflake_tutorial_v_automatic",
    "DATA",
)
target_data_path = ".".join([target_db, target_schema])

In [None]:
-- DDL

-- Stage that holds the code of the permanent stored procedure registered later.
-- Fully qualified, so the stage is created in the target schema regardless of the
-- notebook's current schema: session.use_schema(target_data_path) only runs in a
-- later cell, yet '@create_customer_profile_stage' is resolved against that schema.
CREATE STAGE IF NOT EXISTS {{target_data_path}}.create_customer_profile_stage;

In [None]:
# Create a snowpark session
# Obtain the notebook's active session and make the target schema current, so
# unqualified object names used later (stage, stored procedure, task) resolve
# inside it.
session = get_active_session()
session.use_schema(target_data_path)

# Tag the session's queries for troubleshooting and monitoring.
session.query_tag = dict(
    origin="sf_sit-is",
    name="de_pipeline_pandas_on_snowflake",
    version=dict(major=1, minor=0),
    attributes=dict(is_quickstart=1, source="notebook", vignette="snowpark_pandas"),
)

# Entry point to the Snowflake Python API object hierarchy (databases, schemas, tasks)
root = Root(session)

In [None]:
# Define function create_customer_profile
def create_customer_profile(snf_session: snowflake.snowpark.Session, source_data_path: str, target_data_path: str) -> str:
    """Build per-customer order features from TPC-H data and save them as a table.

    Intended as the body of a stored procedure. The Snowpark pandas calls below
    (``spd.read_snowflake`` / ``to_snowflake``) work through the active session,
    so ``snf_session`` is accepted to fit the stored-procedure signature but is
    not referenced directly.

    Args:
        snf_session: Session passed in when the procedure is invoked.
        source_data_path: ``<database>.<schema>`` containing LINEITEM and ORDERS.
        target_data_path: ``<database>.<schema>`` where CUSTOMER_PROFILE is written
            (replaced if it already exists).

    Returns:
        A status message with the Modin and Snowpark versions and the saved table path.
    """
    # -----------------
    # Get LINEITEM
    # -----------------
    lineitem_keep_cols = ['L_ORDERKEY', 'L_RETURNFLAG', 'L_QUANTITY', 'L_DISCOUNT', 'L_EXTENDEDPRICE']
    lineitem_sdf = spd.read_snowflake(f"{source_data_path}.LINEITEM")[lineitem_keep_cols]
    # Filter: exclude line items whose return flag is 'A'
    lineitem_sdf = lineitem_sdf[lineitem_sdf['L_RETURNFLAG'] != 'A']
    # Discount granted on each line item. In TPC-H, L_EXTENDEDPRICE is already
    # L_QUANTITY * part price, so multiplying by L_QUANTITY again would count
    # the quantity twice.
    lineitem_sdf['DISCOUNT_AMOUNT'] = lineitem_sdf['L_EXTENDEDPRICE'] * lineitem_sdf['L_DISCOUNT']
    # Aggregate to exactly one row per order. Also grouping by L_RETURNFLAG would
    # split an order whose items carry different flags into several rows, and
    # after the join each of those rows would be counted as a separate order
    # (inflating NUMBER_OF_ORDERS and the O_TOTALPRICE statistics).
    column_groupby = ['L_ORDERKEY']
    column_agg = {
        'L_QUANTITY': ['sum'],
        'DISCOUNT_AMOUNT': ['sum'],
    }
    lineitem_sdf = lineitem_sdf.groupby(by=column_groupby, as_index=False).agg(column_agg)  # Apply the aggregation
    lineitem_sdf.columns = ['L_ORDERKEY', 'NBR_OF_ITEMS', 'TOT_DISCOUNT_AMOUNT']  # Rename the columns

    # -------------
    # Get ORDERS
    # -------------
    order_sdf = spd.read_snowflake(f"{source_data_path}.ORDERS")
    # Drop unused columns
    order_sdf = order_sdf.drop(['O_ORDERPRIORITY', 'O_CLERK', 'O_SHIPPRIORITY', 'O_COMMENT'], axis=1)

    # -------------------------
    # Join LINEITEM and ORDERS
    # -------------------------
    customer_profile_sdf = spd.merge(lineitem_sdf, order_sdf, left_on='L_ORDERKEY', right_on='O_ORDERKEY', how='left')
    # O_ORDERKEY carries the same key, so the left-side copy is redundant
    customer_profile_sdf = customer_profile_sdf.drop('L_ORDERKEY', axis=1)
    # Transform: one row per customer
    column_groupby = ['O_CUSTKEY']
    column_agg = {
        'O_ORDERKEY': ['count'],
        'O_TOTALPRICE': ['sum', 'mean', 'median'],
        'TOT_DISCOUNT_AMOUNT': ['sum'],
    }
    customer_profile_sdf = customer_profile_sdf.groupby(by=column_groupby, as_index=False).agg(column_agg)  # Apply the aggregation
    # Positional rename: order follows column_agg (count; sum, mean, median; sum)
    customer_profile_sdf.columns = ['O_CUSTKEY', 'NUMBER_OF_ORDERS', 'TOT_ORDER_AMOUNT',
                                    'AVG_ORDER_AMOUNT', 'MEDIAN_ORDER_AMOUNT', 'TOT_DISCOUNT_AMOUNT']  # Rename the columns

    # Save to a table, replace if existing
    save_path = f"{target_data_path}.customer_profile"
    customer_profile_sdf.to_snowflake(name=save_path, if_exists="replace", index=False)
    return f'Successful run with Modin:{spd.__version__}, Snowpark:{snowflake.snowpark.__version__}. Saved to {save_path}.'

In [None]:
# Register a stored proc that calls function create_customer_profile.
# is_permanent=True keeps the procedure after this session ends, which the
# scheduled task (created next) relies on; permanent registration uploads the
# function's code to stage_location.
create_customer_profile_sp = session.sproc.register(
    func=create_customer_profile,
    name="create_customer_profile_sp",
    is_permanent=True,
    replace=True,
    stage_location="@create_customer_profile_stage",
    packages=["modin", "snowflake-snowpark-python"],
)
print(f"create_customer_profile_sp: {create_customer_profile_sp}")

In [None]:
# Create a task object.
# No warehouse is given, so this is a serverless task; it calls the stored
# procedure with the source and target paths once every minute.
task_obj = Task(
    name="create_customer_profile_task",
    definition=(
        "CALL create_customer_profile_sp("
        f"'{source_data_path}', '{target_data_path}')"
    ),
    schedule=timedelta(minutes=1),
)
print(f"task_obj: {task_obj}")

# Task collection of the target schema, reached through the Root API
task_collection = root.databases[target_db].schemas[target_schema].tasks
print(f"task_collection: {task_collection}")

# Create (or replace) the task; new tasks start out suspended
task_resource = task_collection.create(task_obj, mode=CreateMode.or_replace)
print(f"task_resource: {task_resource}")

In [None]:
# Start task

# Note: By default, new tasks that are created are suspended, so the
# one-minute schedule only takes effect after resume().
# NOTE(review): the next cell suspends the task again. Under "Run All" the task
# is resumed and then immediately suspended, so it may never get to run.
task_resource.resume()

In [None]:
# Stop task

# Suspend the task so it stops running on its one-minute schedule.
# NOTE(review): run this only after the task has had time to execute; under
# "Run All" it follows the resume cell immediately.
task_resource.suspend()

In [None]:
-- View tasks defined in the target schema
show tasks in schema {{target_data_path}}
-- To list only the pipeline task:
-- show tasks like '%CUSTOMER_PROFILE%' in schema {{target_data_path}}

In [None]:
-- View task history: runs of the pipeline task reported by the
-- INFORMATION_SCHEMA.TASK_HISTORY table function of the target database
select *
from table(
    {{target_db}}.information_schema.task_history(
        task_name => 'create_customer_profile_task'
    )
)
-- To restrict to the last 10 minutes, use this filter instead:
--where scheduled_time >= current_timestamp() - interval '10 MINUTES';

In [None]:
-- View tables in the target schema (the pipeline writes CUSTOMER_PROFILE here)
show tables in {{target_data_path}};
-- To list only the pipeline output:
--show tables like 'CUSTOMER_PROFILE' in {{target_data_path}};

In [None]:
-- View customer_profile table: preview 10 rows (no ORDER BY, so which rows is unspecified)
select *
from {{target_data_path}}.customer_profile
limit 10;

In [None]:
# Visualize data

# Retrieve latest data written by the scheduled task.
# NOTE: CUSTOMER_PROFILE is only created by create_customer_profile, so this
# read may fail if the task has not completed a run yet.
customer_profile_sdf = spd.read_snowflake(f"{target_data_path}.CUSTOMER_PROFILE")

# Convert the Snowpark pandas DataFrame to a local pandas DataFrame for plotting
customer_profile_df = customer_profile_sdf.to_pandas()

colnames = ['NUMBER_OF_ORDERS', 'AVG_ORDER_AMOUNT', 'TOT_DISCOUNT_AMOUNT', 'MEDIAN_ORDER_AMOUNT']

# Set figure and axes: one panel per column, 3.75 inches wide each
# (15 x 3 inches for the four columns above). squeeze=False keeps `axes`
# an array even if only one column is plotted.
fig, axes = plt.subplots(1, len(colnames), figsize=(3.75 * len(colnames), 3), squeeze=False)

# Density histogram with a KDE overlay for each feature
for col, ax in zip(colnames, axes.flatten()):
    ax.set_title(col)
    sns.histplot(customer_profile_df, x=col, ax=ax, kde=True, stat="density", kde_kws=dict(cut=3), alpha=.4, edgecolor=(1, 1, 1, .4))
fig.tight_layout()

In [None]:
-- Teardown

-- Dropping the target database also removes the objects the pipeline placed in
-- it (stored procedure, task, CUSTOMER_PROFILE table). IF EXISTS makes re-runs safe.
DROP DATABASE IF EXISTS {{target_db}};

-- This notebook never creates a warehouse, so none is dropped here. If you
-- created a dedicated warehouse for this tutorial, drop it explicitly, e.g.:
-- DROP WAREHOUSE IF EXISTS <your_tutorial_warehouse>;