### Data Engineering Pipelines with pandas on Snowflake - Manual Version
This notebook performs the following:

1. Create a dataframe from a Snowflake table
2. Aggregate data to create new features
3. Join dataframes
4. Save the result into a Snowflake table
5. Create a serverless task to schedule the feature engineering pipeline (Currently in Private Preview)

Source Data: `SNOWFLAKE_SAMPLE_DATA`

Tutorial: https://quickstarts.snowflake.com/guide/data_engineering_pipelines_with_snowpark_pandas/index.html?index=..%2F..index#0


In [None]:
# Imports
import streamlit as st
import matplotlib.pyplot as plt
import seaborn as sns
from snowflake.snowpark.context import get_active_session
from snowflake.core import Root
import snowflake.snowpark.modin.plugin  # Snowpark pandas plugin for modin
import modin.pandas as spd  # Snowpark Pandas API


In [None]:
# Fully qualified locations used throughout the notebook

# Where the data is read from
source_db = "SNOWFLAKE_SAMPLE_DATA"
source_schema = "TPCH_SF1"
source_data_path = ".".join((source_db, source_schema))

# Where the results are written to
target_db = "data_engineering_pipelines_with_pandas_on_snowflake_tutorial_v_manual"
target_schema = "DATA"
target_data_path = ".".join((target_db, target_schema))


In [None]:
# Reuse the session already active in this notebook and select the target schema
session = get_active_session()
session.use_schema(target_data_path)

# Tag the session's queries so they can be identified when troubleshooting and monitoring
session.query_tag = dict(
    origin="sf_sit-is",
    name="de_pipeline_pandas_on_snowflake",
    version=dict(major=1, minor=0),
    attributes=dict(is_quickstart=1, source="notebook", vignette="snowpark_pandas"),
)

# Entry point of the snowflake.core API, used later to list tables
root = Root(session)

In [None]:
# Load LINEITEM, keeping only the columns this pipeline works with

# Fully qualified name of the source table
table = f"{source_data_path}.LINEITEM"

# Columns to retain from LINEITEM
lineitem_keep_cols = [
    'L_ORDERKEY',
    'L_LINENUMBER',
    'L_PARTKEY',
    'L_RETURNFLAG',
    'L_QUANTITY',
    'L_DISCOUNT',
    'L_EXTENDEDPRICE',
]

# Read the table, then narrow it to the selected columns
lineitem_sdf = spd.read_snowflake(table)
lineitem_sdf = lineitem_sdf[lineitem_keep_cols]

In [None]:
# Preview the first five rows of lineitem_sdf
st.dataframe(lineitem_sdf.head(5))

In [None]:
# Print the shape of lineitem_sdf, followed by its info() summary
lineitem_shape = lineitem_sdf.shape
print(f"DataFrame shape: {lineitem_shape}")
lineitem_sdf.info()

In [None]:
# Filter lineitem_sdf

# View L_RETURNFLAG values
print(lineitem_sdf.L_RETURNFLAG.value_counts())

# Capture the row count BEFORE filtering; lineitem_sdf is overwritten below,
# so measuring it afterwards would report the filtered count twice.
# (Re-running this cell without re-running the load cell shows equal counts,
# because the frame is then already filtered.)
rows_before_filter = len(lineitem_sdf)

# Keep only the rows whose L_RETURNFLAG is not 'A'
lineitem_sdf = lineitem_sdf[lineitem_sdf['L_RETURNFLAG'] != 'A']

print(f"Before Filtering: {rows_before_filter} rows")
print(f"After Filtering: {len(lineitem_sdf)} rows")

# Display
st.dataframe(lineitem_sdf.head())


In [None]:
# Derive DISCOUNT_AMOUNT = L_DISCOUNT * L_QUANTITY * L_EXTENDEDPRICE for every line item
# NOTE(review): in the TPC-H sample schema L_EXTENDEDPRICE presumably already
# includes the quantity, so multiplying by L_QUANTITY again may overstate the
# discount - confirm the intended formula before relying on this value.
lineitem_sdf['DISCOUNT_AMOUNT'] = (
    lineitem_sdf['L_DISCOUNT']
    .mul(lineitem_sdf['L_QUANTITY'])
    .mul(lineitem_sdf['L_EXTENDEDPRICE'])
)

# Preview the result
st.dataframe(lineitem_sdf.head(5))

In [None]:
# Aggregate lineitem_sdf to one row per (order key, return flag)

# Grouping keys
column_groupby = ['L_ORDERKEY', 'L_RETURNFLAG']

# Source column -> aggregation functions
column_agg = dict(
    L_QUANTITY=['sum'],       # Total Items Ordered
    DISCOUNT_AMOUNT=['sum'],  # Total Discount Amount
)

# Group and aggregate; as_index=False keeps the grouping keys as regular columns
lineitem_sdf = (
    lineitem_sdf
    .groupby(by=column_groupby, as_index=False)
    .agg(column_agg)
)

# Assign flat column names: the grouping keys first, then the two aggregates
lineitem_sdf.columns = [*column_groupby, 'NBR_OF_ITEMS', 'TOT_DISCOUNT_AMOUNT']
st.dataframe(lineitem_sdf.head(5))

In [None]:
# Load ORDERS from the source schema
order_sdf = spd.read_snowflake(f"{source_data_path}.ORDERS")

# Remove the columns the pipeline does not use
order_sdf = order_sdf.drop(
    columns=['O_ORDERPRIORITY', 'O_CLERK', 'O_SHIPPRIORITY', 'O_COMMENT']
)

# Preview the result
st.dataframe(order_sdf.head(5))

In [None]:
# Left-join the aggregated LINEITEM rows to ORDERS on the order key
customer_profile_sdf = lineitem_sdf.merge(
    order_sdf,
    left_on='L_ORDERKEY',
    right_on='O_ORDERKEY',
    how='left',
)

# L_ORDERKEY is the left-hand join key; drop it and keep O_ORDERKEY
customer_profile_sdf = customer_profile_sdf.drop(columns=['L_ORDERKEY'])

# Preview the result
st.dataframe(customer_profile_sdf.head(5))

In [None]:
# Aggregate customer_profile_sdf to one row per customer

# Grouping key
column_groupby = ['O_CUSTKEY']

# Source column -> aggregation functions
# NOTE(review): the joined frame has one row per (order, return flag), so an
# order with more than one remaining return flag would presumably be counted
# and summed more than once here - confirm whether that is intended.
column_agg = dict(
    O_ORDERKEY=['count'],
    O_TOTALPRICE=['sum', 'mean', 'median'],
    TOT_DISCOUNT_AMOUNT=['sum'],
)

# Group and aggregate; as_index=False keeps O_CUSTKEY as a regular column
customer_profile_sdf = (
    customer_profile_sdf
    .groupby(by=column_groupby, as_index=False)
    .agg(column_agg)
)

# Assign flat column names, in the same order as the aggregations above
customer_profile_sdf.columns = [
    'O_CUSTKEY',
    'NUMBER_OF_ORDERS',
    'TOT_ORDER_AMOUNT',
    'AVG_ORDER_AMOUNT',
    'MEDIAN_ORDER_AMOUNT',
    'TOT_DISCOUNT_AMOUNT',
]
st.dataframe(customer_profile_sdf.head(5))

In [None]:
# Write customer_profile_sdf to the target schema, replacing the table if it already exists
save_path = ".".join((target_data_path, "customer_profile"))
customer_profile_sdf.to_snowflake(
    name=save_path,
    if_exists="replace",
    index=False,
)
    

In [None]:
# List the tables in the target schema
tables = root.databases[target_db].schemas[target_schema].tables.iter()
# To list only the saved table, filter by name instead:
# tables = root.databases[target_db].schemas[target_schema].tables.iter(like='CUSTOMER_PROFILE')

# The loop variable is named `tbl` (not `table`) so it does not overwrite the
# `table` string holding the LINEITEM path that an earlier cell defines.
for tbl in tables:
    print(f"table: {tbl}")

In [None]:
-- Preview the first 10 rows of the saved customer_profile table
-- ({{target_data_path}} is filled in from the Python variable of the same name)
SELECT *
FROM {{target_data_path}}.customer_profile
LIMIT 10;

In [None]:
# Plot the distribution of selected customer features

# Materialize the result as a local pandas DataFrame for plotting
customer_profile_df = customer_profile_sdf.to_pandas()

# One row of four panels, one per feature
fig, axes = plt.subplots(nrows=1, ncols=4, figsize=(15, 3))

colnames = [
    'NUMBER_OF_ORDERS',
    'AVG_ORDER_AMOUNT',
    'TOT_DISCOUNT_AMOUNT',
    'MEDIAN_ORDER_AMOUNT',
]

# Pair each feature with its own axis and draw a density histogram with a KDE overlay
for col, ax in zip(colnames, axes.ravel()):
    ax.set_title(col)
    sns.histplot(
        customer_profile_df,
        x=col,
        ax=ax,
        kde=True,
        stat="density",
        kde_kws={"cut": 3},
        alpha=0.4,
        edgecolor=(1, 1, 1, 0.4),
    )
fig.tight_layout()

In [None]:
-- Teardown: drop the database and warehouse used by this tutorial.
-- IF EXISTS keeps the cell re-runnable: a second run, or a run where an object
-- was never created, succeeds instead of failing on the first missing object.
-- NOTE(review): TEST_WH is not created anywhere in this notebook; presumably it
-- comes from the tutorial's setup script - confirm before dropping.

DROP DATABASE IF EXISTS data_engineering_pipelines_with_pandas_on_snowflake_tutorial_v_manual;
DROP WAREHOUSE IF EXISTS TEST_WH;

