### Using Snowpark Pandas API to create a Customer profile

This demo uses the [Snowflake Sample TPC-H dataset](https://docs.snowflake.com/en/user-guide/sample-data-tpch), which should be available in a shared database named `SNOWFLAKE_SAMPLE_DATA`.

During this demo you will learn how to use the Snowpark Pandas API to:
* Create a dataframe from a Snowflake table
* Aggregate data to create new features
* Join dataframes
* Save the result into a Snowflake table
* Create a serverless task to schedule the feature engineering


Start by adding the necessary libraries using the Packages menu. The additional libraries needed for this notebook are:
* `snowflake-snowpark-python` (version 1.17 or higher)
* `modin`
* `snowflake`
* `matplotlib`
* `seaborn`

In [None]:
import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')

import streamlit as st
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Snowpark Pandas API
import modin.pandas as spd
# Import the Snowpark pandas plugin for modin
import snowflake.snowpark.modin.plugin

from snowflake.snowpark.context import get_active_session
# Create a snowpark session
session = get_active_session()

In [None]:
# Name of the sample database and the schema to be used
SOURCE_DATA_PATH = "SNOWFLAKE_SAMPLE_DATA.TPCH_SF1"
SAVE_DATA_PATH = "SNOW_PANDAS_DE_QS.DATA"
# Make sure we use the created database and schema for temp tables etc
session.use_schema(SAVE_DATA_PATH)

We will start by creating a number of features based on the customer orders using the line items.

These features come from the `LINEITEM` table, so we create a Snowpark pandas DataFrame against it, select the columns we are interested in, and then show information about the dataframe, its shape, and its first rows.

In [None]:
lineitem_keep_cols = ['L_ORDERKEY', 'L_LINENUMBER', 'L_PARTKEY', 'L_RETURNFLAG', 'L_QUANTITY', 'L_DISCOUNT', 'L_EXTENDEDPRICE']
lineitem_df = spd.read_snowflake(f"{SOURCE_DATA_PATH}.LINEITEM")[lineitem_keep_cols]

In [None]:
st.dataframe(lineitem_df.head())

In [None]:
# Get info about the dataframe
lineitem_df.info()

In [None]:
print(f"DataFrame shape: {lineitem_df.shape}")

## Data Cleaning - Filtering and Aggregation

Take a look at the different values of `L_RETURNFLAG` and include only line items that were delivered (`N`) or returned (`R`).

In [None]:
print(lineitem_df.L_RETURNFLAG.value_counts())

Add a filter to the dataframe that excludes rows with return flag `A`.

In [None]:
print(f"Before Filtering: {len(lineitem_df)} rows")
spd_lineitem = lineitem_df[lineitem_df['L_RETURNFLAG'] != 'A']
print(f"After Filtering: {len(spd_lineitem)} rows")
st.dataframe(spd_lineitem.head())

To track the actual discount a customer gets per order, we calculate it in a new column as the product of the discount (`L_DISCOUNT`), the quantity sold (`L_QUANTITY`), and the price of the item (`L_EXTENDEDPRICE`).

In [None]:
spd_lineitem['DISCOUNT_AMOUNT'] = spd_lineitem['L_DISCOUNT'] * spd_lineitem['L_QUANTITY'] * spd_lineitem['L_EXTENDEDPRICE']
st.dataframe(spd_lineitem.head())
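
As a quick sanity check of the formula, the same product can be worked out for a single row in plain Python. The values below are hypothetical and are not taken from the TPC-H data. `Decimal` is used here only to keep the arithmetic exact in the illustration.

```python
from decimal import Decimal

# Hypothetical line item values, chosen for easy arithmetic
line_item = {
    "L_DISCOUNT": Decimal("0.05"),
    "L_QUANTITY": Decimal("10"),
    "L_EXTENDEDPRICE": Decimal("200.00"),
}

# Same expression as in the notebook cell, applied to one row
discount_amount = (
    line_item["L_DISCOUNT"] * line_item["L_QUANTITY"] * line_item["L_EXTENDEDPRICE"]
)
print(discount_amount)
```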

Now we want to compute the aggregate of items and discount amount, grouped by order key and return flag.


In [None]:
# Aggregations we want to do
column_agg = {
                'L_QUANTITY':['sum'], # Total Items Ordered  
                'DISCOUNT_AMOUNT': ['sum'] # Total Discount Amount
             }

# Apply the aggregation
spd_lineitem_agg = spd_lineitem.groupby(by=['L_ORDERKEY', 'L_RETURNFLAG'], as_index=False).agg(column_agg)

# Rename the columns
spd_lineitem_agg.columns = ['L_ORDERKEY', 'L_RETURNFLAG', 'NBR_OF_ITEMS', 'TOT_DISCOUNT_AMOUNT']
st.dataframe(spd_lineitem_agg.head())
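
To see why the rename step is needed, here is a minimal local sketch using plain pandas on a hypothetical four-row dataframe. It assumes Snowpark pandas mirrors this behaviour, which the sketch itself does not verify. Passing a dict of lists to `agg` produces two-level column labels, and assigning a flat list replaces them by position.

```python
import pandas as pd

# Hypothetical miniature of the filtered line items
df = pd.DataFrame({
    "L_ORDERKEY": [1, 1, 1, 2],
    "L_RETURNFLAG": ["N", "N", "R", "N"],
    "L_QUANTITY": [5, 3, 2, 7],
    "DISCOUNT_AMOUNT": [10.0, 6.0, 4.0, 14.0],
})

agg = df.groupby(by=["L_ORDERKEY", "L_RETURNFLAG"], as_index=False).agg(
    {"L_QUANTITY": ["sum"], "DISCOUNT_AMOUNT": ["sum"]}
)
# A dict of lists yields two-level labels such as ('L_QUANTITY', 'sum')
levels_before_rename = agg.columns.nlevels

# Assigning a flat list replaces the two-level labels positionally
agg.columns = ["L_ORDERKEY", "L_RETURNFLAG", "NBR_OF_ITEMS", "TOT_DISCOUNT_AMOUNT"]
print(agg)
```

Because the rename is positional, the order of the new names has to match the order of the group keys followed by the aggregated columns.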

## Data Transformation - Pivot and reshape

We want to separate `NBR_OF_ITEMS` and `TOT_DISCOUNT_AMOUNT` by `L_RETURNFLAG` so that we have one column for each unique `L_RETURNFLAG` value.  
Using the **pivot_table** method gives us one column for each unique value in `L_RETURNFLAG`.

In [None]:
# This will make L_ORDERKEY the index
spd_lineitem_agg_pivot_df = spd_lineitem_agg.pivot_table(
                                values=['NBR_OF_ITEMS', 'TOT_DISCOUNT_AMOUNT'], 
                                index=['L_ORDERKEY'],
                                columns=['L_RETURNFLAG'], 
                                aggfunc="sum")

The **pivot_table** method returns subcolumns. By renaming the columns we get rid of those and have one unique column for each value.

In [None]:
spd_lineitem_agg_pivot_df.columns = ['NBR_OF_ITEMS_N', 'NBR_OF_ITEMS_R','TOT_DISCOUNT_AMOUNT_N','TOT_DISCOUNT_AMOUNT_R']
# Move L_ORDERKEY back to column
spd_lineitem_agg_pivot = spd_lineitem_agg_pivot_df.reset_index(names=['L_ORDERKEY'])
st.dataframe(spd_lineitem_agg_pivot.head(10))
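
The pivot and rename steps can be tried locally with plain pandas on a hypothetical three-row dataframe. This is a sketch under the assumption that Snowpark pandas behaves the same way. Instead of a hard-coded list, it builds the flat names from the two-level labels, which is one possible alternative and not what the notebook does.

```python
import pandas as pd

# Hypothetical miniature of the aggregated line items; order 2 has no returns
agg = pd.DataFrame({
    "L_ORDERKEY": [1, 1, 2],
    "L_RETURNFLAG": ["N", "R", "N"],
    "NBR_OF_ITEMS": [8, 2, 7],
    "TOT_DISCOUNT_AMOUNT": [16.0, 4.0, 14.0],
})

pivot = agg.pivot_table(
    values=["NBR_OF_ITEMS", "TOT_DISCOUNT_AMOUNT"],
    index=["L_ORDERKEY"],
    columns=["L_RETURNFLAG"],
    aggfunc="sum",
)
# Two-level labels of the form (value name, return flag)
original_labels = list(pivot.columns)

# Build flat names from the labels instead of relying on their position
pivot.columns = [f"{value}_{flag}" for value, flag in pivot.columns]
# The index is already named L_ORDERKEY, so reset_index turns it into a column
pivot = pivot.reset_index()
print(pivot)
```

Orders without any returned items end up with missing values in the `_R` columns, which is worth keeping in mind for the later aggregations.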

## Combine lineitem with orders information

Load the `ORDERS` table and join it with the dataframe containing the transformed line item information.

In [None]:
spd_order = spd.read_snowflake(f"{SOURCE_DATA_PATH}.ORDERS")
# Drop unused columns 
spd_order = spd_order.drop(['O_ORDERPRIORITY', 'O_CLERK', 'O_SHIPPRIORITY', 'O_COMMENT'], axis=1)
# Use streamlit to display the dataframe
st.dataframe(spd_order.head())

Use **merge** to join the two dataframes

In [None]:
# Join dataframes
spd_order_items = spd_lineitem_agg_pivot.merge(spd_order,
                                               left_on='L_ORDERKEY', 
                                               right_on='O_ORDERKEY', 
                                               how='left')

Drop the `L_ORDERKEY` column, since it has the same values as `O_ORDERKEY`.

In [None]:
spd_order_items.drop('L_ORDERKEY', axis=1, inplace=True)
st.write(f"DataFrame shape: {spd_order_items.shape}")
st.dataframe(spd_order_items.head())
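
The behaviour of a left join on differently named keys can be illustrated with plain pandas and two hypothetical dataframes. The sketch assumes Snowpark pandas follows the same semantics. Every row from the left side is kept, and rows without a match get missing values in the columns from the right side.

```python
import pandas as pd

# Hypothetical data: order key 3 has no matching row in orders
items = pd.DataFrame({"L_ORDERKEY": [1, 2, 3], "NBR_OF_ITEMS_N": [8, 7, 4]})
orders = pd.DataFrame({"O_ORDERKEY": [1, 2], "O_CUSTKEY": [100, 200]})

joined = items.merge(orders, left_on="L_ORDERKEY", right_on="O_ORDERKEY", how="left")

# Left rows without a match keep their own columns and get missing O_ values
unmatched = joined[joined["O_ORDERKEY"].isna()]

# Both key columns are present after the merge, so one of them can be dropped
joined = joined.drop("L_ORDERKEY", axis=1)
print(joined)
```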

More aggregations grouped by customer (`O_CUSTKEY`)
* Total items delivered by customer
* Average items delivered by customer
* Total items returned by customer
* Average items returned by customer

In [None]:
# Aggregations we want to do
column_agg = {
                'O_ORDERKEY':['count'], 
                'O_TOTALPRICE': ['sum' ,'mean', 'median'],
                'NBR_OF_ITEMS_N': ['sum' ,'mean', 'median'],
                'NBR_OF_ITEMS_R': ['sum' ,'mean', 'median'],
                'TOT_DISCOUNT_AMOUNT_N': ['sum'],
                'TOT_DISCOUNT_AMOUNT_R': ['sum']
            }

# Apply the aggregation
spd_order_profile = spd_order_items.groupby(by='O_CUSTKEY', as_index=False).agg(column_agg)

# Rename the columns
spd_order_profile.columns = ['O_CUSTKEY', 'NUMBER_OF_ORDERS', 'TOT_ORDER_AMOUNT', 'AVG_ORDER_AMOUNT', 'MEDIAN_ORDER_AMOUNT', 
                             'TOT_ITEMS_DELIVERED', 'AVG_ITEMS_DELIVERED', 'MEDIAN_ITEMS_DELIVERED', 
                             'TOT_ITEMS_RETURNED', 'AVG_ITEMS_RETURNED', 'MEDIAN_ITEMS_RETURNED',
                             'TOT_DISCOUNT_AMOUNT_N', 'TOT_DISCOUNT_AMOUNT_R']
st.dataframe(spd_order_profile.head())

Calculate the total and average discount

In [None]:
spd_order_profile['TOT_DISCOUNT'] = spd_order_profile['TOT_DISCOUNT_AMOUNT_N'] + spd_order_profile['TOT_DISCOUNT_AMOUNT_R']
spd_order_profile['AVG_DISCOUNT'] = spd_order_profile['TOT_DISCOUNT'] / spd_order_profile['NUMBER_OF_ORDERS']
st.dataframe(spd_order_profile.head())

## Saving results to Snowflake Table

We can now save our customer profile as a Snowflake table. In this case we replace the table if it already exists, and setting `index=False` means the index column is not saved.

In [None]:
spd_order_profile.to_snowflake(name=f"{SAVE_DATA_PATH}.customer_profile", if_exists="replace", index=False)

We can check using SQL that we have data in the table

In [None]:
SELECT * FROM {{SAVE_DATA_PATH}}.customer_profile LIMIT 10;

## Visualize data distribution

Plot histogram distribution for different columns in customer profile

In [None]:
spd_profile =  spd.read_snowflake(f"{SAVE_DATA_PATH}.customer_profile")  
pd_profile = spd_profile.to_pandas()

fig, axes = plt.subplots(1,4,figsize=(15,3))

colnames = ['NUMBER_OF_ORDERS', 'AVG_ORDER_AMOUNT', 'AVG_ITEMS_DELIVERED', 'AVG_ITEMS_RETURNED']
# Iterating through axes and names
for col, ax in zip(colnames, axes.flatten()):
    ax.set_title(col)
    sns.histplot(pd_profile, x=col , ax=ax, kde=True, stat="density", kde_kws=dict(cut=3), alpha=.4, edgecolor=(1, 1, 1, .4))
fig.tight_layout()

## Orchestrate Data Pipeline: Scheduling with Serverless Tasks

We have now used Snowpark Pandas API to create a Customer profile based on their purchase data.

A next step is to run this notebook regularly to update the profiles when new data arrives. This can be done by scheduling it using the schedule function in notebooks or using a CI/CD pipeline.

Another way is to create a serverless task directly in the notebook. To do that, we need to create a Python function containing all the steps we have done so far.

In [None]:
# Snowflake Python API, to be used to create a serverless task
from snowflake.core import Root
from snowflake.core.task import Task
from snowflake.core import CreateMode
from snowflake.snowpark import Session
root = Root(session)

Convert our Snowpark pandas data pipeline from earlier to a function.

In [None]:
def create_customer_profile(snf_session: snowflake.snowpark.Session, data_path: str, save_data_path: str) -> str:
    from datetime import datetime
    #  Get line item
    lineitem_keep_cols = ['L_ORDERKEY', 'L_LINENUMBER', 'L_PARTKEY', 'L_RETURNFLAG', 'L_QUANTITY', 'L_DISCOUNT', 'L_EXTENDEDPRICE']
    spd_lineitem = spd.read_snowflake(f"{data_path}.LINEITEM")[lineitem_keep_cols]
    spd_lineitem = spd_lineitem[spd_lineitem['L_RETURNFLAG'] != 'A']
    spd_lineitem['DISCOUNT_AMOUNT'] = (spd_lineitem['L_DISCOUNT'] * (spd_lineitem['L_QUANTITY'] * spd_lineitem['L_EXTENDEDPRICE']))
    
    # Aggregations we want to do on line item
    column_agg = {
                    'L_QUANTITY':['sum'], # Total Items Ordered  
                    'DISCOUNT_AMOUNT': ['sum'] # Total Discount Amount
                 }
    
    # Apply the aggregation
    spd_lineitem_agg = spd_lineitem.groupby(by=['L_ORDERKEY', 'L_RETURNFLAG'], as_index=False).agg(column_agg)
    
    # Rename the columns
    spd_lineitem_agg.columns = ['L_ORDERKEY', 'L_RETURNFLAG', 'NBR_OF_ITEMS', 'TOT_DISCOUNT_AMOUNT']
    
    # Pivot the dataframe; this makes L_ORDERKEY the index
    spd_lineitem_agg_pivot = spd_lineitem_agg.pivot_table(values=['NBR_OF_ITEMS', 'TOT_DISCOUNT_AMOUNT'], index=['L_ORDERKEY'],
                            columns=['L_RETURNFLAG'], aggfunc="sum")
    # Rename the columns to get one flat name per value and return flag
    spd_lineitem_agg_pivot.columns = ['NBR_OF_ITEMS_N', 'NBR_OF_ITEMS_R','TOT_DISCOUNT_AMOUNT_N','TOT_DISCOUNT_AMOUNT_R']
    
    # Move L_ORDERKEY back to a column
    spd_lineitem_agg_pivot.reset_index(names=['L_ORDERKEY'], inplace=True)

    # Get Orders
    spd_order = spd.read_snowflake(f"{data_path}.ORDERS")
    # Drop unused columns 
    spd_order = spd_order.drop(['O_ORDERPRIORITY', 'O_CLERK', 'O_SHIPPRIORITY', 'O_COMMENT'], axis=1)

    # Join orders with the pivoted lineitems
    spd_order_items = spd_lineitem_agg_pivot.merge(spd_order, left_on='L_ORDERKEY', right_on='O_ORDERKEY', how='left')
    spd_order_items.drop('L_ORDERKEY', axis=1, inplace=True)
    
    # Aggregations we want to do
    column_agg = {
                    'O_ORDERKEY':['count'], 
                    'O_TOTALPRICE': ['sum' ,'mean', 'median'],
                    'NBR_OF_ITEMS_N': ['sum' ,'mean', 'median'],
                    'NBR_OF_ITEMS_R': ['sum' ,'mean', 'median'],
                    'TOT_DISCOUNT_AMOUNT_N': ['sum'],
                    'TOT_DISCOUNT_AMOUNT_R': ['sum']
                }
    
    # Apply the aggregation
    spd_order_profile = spd_order_items.groupby(by='O_CUSTKEY', as_index=False).agg(column_agg)
    
    # Rename the columns
    spd_order_profile.columns = ['O_CUSTKEY', 'NUMBER_OF_ORDERS', 'TOT_ORDER_AMOUNT', 'AVG_ORDER_AMOUNT', 'MEDIAN_ORDER_AMOUNT', 
                                 'TOT_ITEMS_DELIVERED', 'AVG_ITEMS_DELIVERED', 'MEDIAN_ITEMS_DELIVERED', 
                                 'TOT_ITEMS_RETURNED', 'AVG_ITEMS_RETURNED', 'MEDIAN_ITEMS_RETURNED',
                                 'TOT_DISCOUNT_AMOUNT_N', 'TOT_DISCOUNT_AMOUNT_R']
    
    # Calculate total and average                      
    spd_order_profile['TOT_DISCOUNT'] = spd_order_profile['TOT_DISCOUNT_AMOUNT_N'] + spd_order_profile['TOT_DISCOUNT_AMOUNT_R']
    spd_order_profile['AVG_DISCOUNT'] = spd_order_profile['TOT_DISCOUNT'] / spd_order_profile['NUMBER_OF_ORDERS']
    
    # Save to a table with a timestamp suffix, replace if existing
    timestamp = datetime.now().strftime("%Y_%m_%d_%H_%M_%S")
    save_path = f"{save_data_path}.customer_profile_{timestamp}"
    spd_order_profile.to_snowflake(name=save_path, if_exists="replace", index=False)    
    return f'Successful run with Modin:{spd.__version__}, Snowpark:{snowflake.snowpark.__version__}. Saved to {save_path}.'
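
The table name used by the function can be reproduced in isolation. The helper below is a hypothetical name introduced for illustration; it only repeats the `strftime` pattern from the function and takes the time as a parameter so that the result is reproducible.

```python
from datetime import datetime


def timestamped_table_name(save_data_path: str, now: datetime) -> str:
    """Build the table name with the same pattern as create_customer_profile."""
    timestamp = now.strftime("%Y_%m_%d_%H_%M_%S")
    return f"{save_data_path}.customer_profile_{timestamp}"


# A fixed, hypothetical point in time
example = timestamped_table_name("SNOW_PANDAS_DE_QS.DATA", datetime(2024, 6, 3, 14, 5, 9))
print(example)  # SNOW_PANDAS_DE_QS.DATA.customer_profile_2024_06_03_14_05_09
```

Since the suffix has a resolution of one second, each scheduled run writes to its own table rather than overwriting the previous one.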

Create and register a stored procedure based on the function

In [None]:
CREATE STAGE IF NOT EXISTS task_code_stage;

In [None]:
sp_customer_profile = session.sproc.register(name="create_customer_profile_sp", 
                                             func=create_customer_profile, replace=True, 
                                             is_permanent=True, 
                                             packages=['modin', 'snowflake-snowpark-python'], 
                                             stage_location='@task_code_stage')

Here's an example of how you can call the stored procedure manually:
```sql
CALL create_customer_profile_sp('{{SOURCE_DATA_PATH}}', '{{SAVE_DATA_PATH}}')
```

Rather than calling this manually, we will create a serverless task that calls the stored procedure. The task is scheduled to run once every minute. Note that serverless tasks do not require us to specify a warehouse size.

In [None]:
from datetime import timedelta
# Register data pipeline function as a task
my_task = Task(name='create_customer_profile_task',
               definition=f"CALL create_customer_profile_sp('{SOURCE_DATA_PATH}', '{SAVE_DATA_PATH}')",
               schedule=timedelta(minutes=1))

In [None]:

DB_NAME = SAVE_DATA_PATH.split(".")[0]
SCHEMA_NAME = SAVE_DATA_PATH.split(".")[1]
tasks = root.databases[DB_NAME].schemas[SCHEMA_NAME].tasks
task_res = tasks.create(my_task,mode=CreateMode.or_replace)
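
The cell above relies on `SAVE_DATA_PATH` having exactly the form `DATABASE.SCHEMA`. A small helper can make that assumption explicit. `split_schema_path` is a hypothetical name, and the sketch assumes simple unquoted identifiers that do not contain dots.

```python
def split_schema_path(path: str) -> tuple[str, str]:
    """Split 'DATABASE.SCHEMA' into its two parts, failing loudly otherwise."""
    parts = path.split(".")
    if len(parts) != 2 or not all(parts):
        raise ValueError(f"Expected DATABASE.SCHEMA, got {path!r}")
    return parts[0], parts[1]


db_name, schema_name = split_schema_path("SNOW_PANDAS_DE_QS.DATA")
print(db_name, schema_name)
```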

In [None]:
SHOW TASKS LIKE '%CUSTOMER_PROFILE%' IN SCHEMA {{SAVE_DATA_PATH}}

By default, newly created tasks are suspended, so we resume the task to get it to run.

In [None]:
task_res.resume()

While we are waiting for this task to run, let's take a look at a few things: 
- Commit changes to notebook with Git integration. See [commit history](https://github.com/snowflakedb/summit-python-data-pipeline-demo/commits/main/).
- View [Task Details](https://app.snowflake.com/pm/pm_aws_us_west_2/#/data/databases/DLEE_TEST/schemas/PUBLIC/task/CREATE_CUSTOMER_PROFILE_TASK) in Snowsight 

Now let's take a look at the task history and the status on the task runs using SQL:

In [None]:
SELECT * FROM TABLE({{DB_NAME}}.information_schema.task_history(task_name=> 'create_customer_profile_task'))
WHERE SCHEDULED_TIME >= CURRENT_TIMESTAMP() - INTERVAL '10 MINUTES';

Once the runs have completed, you will see new tables created with a timestamp suffix.
Note that Notebooks also support scheduling with Tasks through the UI.

In [None]:
SHOW TABLES LIKE 'CUSTOMER_PROFILE_%' IN {{SAVE_DATA_PATH}};

## Cleaning up

Using the Python API, we can suspend the task so that it stops running on the schedule.

In [None]:
task_res.suspend()

Tear down the tables created by the task to clean up the environment.

In [None]:
tables = root.databases[DB_NAME].schemas[SCHEMA_NAME].tables.iter(like='CUSTOMER_PROFILE_%')
for table in tables:
    my_table_res = root.databases[DB_NAME].schemas[SCHEMA_NAME].tables[table.name]
    my_table_res.delete()
    print(f"Deleted {table.name}")

Verify that tables have been dropped

In [None]:
SHOW TABLES LIKE 'CUSTOMER_PROFILE_%' IN {{SAVE_DATA_PATH}};