# End-to-end Snowpark jaffle_shop in Python

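For funsies: the dbt jaffle_shop example rebuilt with Snowpark DataFrames in Python — raw sources, then staging models, then final models.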

## Imports

In [1]:
# stdlib
from pathlib import Path

# tracking
import mlflow

# pydata/ml
import sklearn as sklearn

import numpy as np
import pandas as pd
import lightgbm as lgb

from sklearn.decomposition import PCA
from sklearn.model_selection import train_test_split

# viz
import seaborn as sns
import matplotlib as mpl
import matplotlib.pyplot as plt

# snowflake
import yaml
import snowflake.snowpark

from snowflake.snowpark import types
from snowflake.snowpark import Session
from snowflake.snowpark.functions import col, udf, sql_expr

## Setup

In [2]:
# Plot defaults: seaborn theme with a 16x18 figure size and the "darkgrid"
# style, then matplotlib's "dark_background" style sheet on top.
figure_rc = {"figure.figsize": (16, 18)}
sns.set_theme()
sns.set(rc=figure_rc)
sns.set_style("darkgrid")
plt.style.use(["dark_background"])

## Snowpark session

In [3]:
# Locate the dbt profile under the current user's home directory rather than a
# hard-coded absolute path, so the notebook does not depend on one username.
PROFILES_PATH = Path.home() / ".dbt" / "profiles.yml"

# Keys copied from the dbt "dev" output into the Snowpark connection config.
CONNECTION_KEYS = (
    "account",
    "user",
    "role",
    "warehouse",
    "database",
    "schema",
    "authenticator",
)

with open(PROFILES_PATH, "r") as f:
    profiles = yaml.safe_load(f)

# A missing profile/output/key raises KeyError here, same as before.
dev_profile = profiles["snowflake"]["outputs"]["dev"]

conn_params = {key: dev_profile[key] for key in CONNECTION_KEYS}

# NOTE(review): echoing conn_params stores the account and user identifiers in
# the saved notebook output; clear or redact the output before sharing.
conn_params

{'account': 'ska67070',
 'user': 'cody.peterson@fishtownanalytics.com',
 'role': 'transformer',
 'warehouse': 'transforming',
 'database': 'analytics',
 'schema': 'dbt_cody',
 'authenticator': 'externalbrowser'}

In [4]:
# Configure a session builder from conn_params, then open the session.
session_builder = Session.builder.configs(conn_params)
s = session_builder.create()

Initiating login request with your identity provider. A browser window should have opened for you to complete the login. If you can't see it, check existing browser windows, or your OS settings. Press CTRL+C to abort and try again...


## Raw data (sources)

In [5]:
# Source table: customers. Preview the first five rows.
raw_customers = s.table("raw_customers")
raw_customers.show(n=5)

-------------------------------------
|"ID"  |"FIRST_NAME"  |"LAST_NAME"  |
-------------------------------------
|1     |Michael       |P.           |
|2     |Shawn         |M.           |
|3     |Kathleen      |P.           |
|4     |Jimmy         |C.           |
|5     |Katherine     |R.           |
-------------------------------------



In [6]:
# Source table: orders. Preview the first five rows.
raw_orders = s.table("raw_orders")
raw_orders.show(n=5)

-----------------------------------------------
|"ID"  |"USER_ID"  |"ORDER_DATE"  |"STATUS"   |
-----------------------------------------------
|1     |1          |2018-01-01    |returned   |
|2     |3          |2018-01-02    |completed  |
|3     |94         |2018-01-04    |completed  |
|4     |50         |2018-01-05    |completed  |
|5     |64         |2018-01-05    |completed  |
-----------------------------------------------



In [7]:
# Source table: payments. Preview the first five rows.
raw_payments = s.table("raw_payments")
raw_payments.show(n=5)

---------------------------------------------------
|"ID"  |"ORDER_ID"  |"PAYMENT_METHOD"  |"AMOUNT"  |
---------------------------------------------------
|1     |1           |credit_card       |1000      |
|2     |2           |credit_card       |2000      |
|3     |3           |coupon            |100       |
|4     |4           |coupon            |2500      |
|5     |5           |bank_transfer     |1700      |
---------------------------------------------------



## Staging data

In [8]:
# Column renames for the customers staging model. Both sides are upper-cased
# so the keys match the column names Snowpark reports (e.g. "ID").
customers_rename = {"id": "customer_id"}
customers_rename = dict(
    (old.upper(), new.upper()) for old, new in customers_rename.items()
)

# pandas equivalent: stg_customers = raw_customers.rename(customers_rename)
# Start from every raw column, then apply the renames one at a time.
stg_customers = raw_customers.select("*")
for old_name, new_name in customers_rename.items():
    stg_customers = stg_customers.rename(stg_customers[old_name], new_name)

stg_customers.show(n=5)

----------------------------------------------
|"CUSTOMER_ID"  |"FIRST_NAME"  |"LAST_NAME"  |
----------------------------------------------
|1              |Michael       |P.           |
|2              |Shawn         |M.           |
|3              |Kathleen      |P.           |
|4              |Jimmy         |C.           |
|5              |Katherine     |R.           |
----------------------------------------------



In [9]:
# Column renames for the orders staging model. Both sides are upper-cased so
# the keys match the column names Snowpark reports (e.g. "USER_ID").
orders_rename = {"id": "order_id", "user_id": "customer_id"}
orders_rename = dict(
    (old.upper(), new.upper()) for old, new in orders_rename.items()
)

# pandas equivalent: stg_orders = raw_orders.rename(columns=orders_rename)
# Start from every raw column, then apply the renames one at a time.
stg_orders = raw_orders.select("*")
for old_name, new_name in orders_rename.items():
    stg_orders = stg_orders.rename(stg_orders[old_name], new_name)

stg_orders.show(n=5)

---------------------------------------------------------
|"ORDER_ID"  |"CUSTOMER_ID"  |"ORDER_DATE"  |"STATUS"   |
---------------------------------------------------------
|1           |1              |2018-01-01    |returned   |
|2           |3              |2018-01-02    |completed  |
|3           |94             |2018-01-04    |completed  |
|4           |50             |2018-01-05    |completed  |
|5           |64             |2018-01-05    |completed  |
---------------------------------------------------------



In [10]:
# Column renames for the payments staging model. Both sides are upper-cased so
# the keys match the column names Snowpark reports (e.g. "ID").
payments_rename = {"id": "payment_id"}
payments_rename = dict(
    (old.upper(), new.upper()) for old, new in payments_rename.items()
)

# pandas equivalent:
#   stg_payments = raw_payments.rename(columns=payments_rename)
#   stg_payments["amount"] /= 100  # this makes cents into dollars

# Pass every column except AMOUNT through, renaming the mapped ones.
passthrough_cols = []
for name in raw_payments.schema.names:
    if name == "AMOUNT":
        continue
    if name in payments_rename:
        passthrough_cols.append(col(name).as_(payments_rename[name]))
    else:
        passthrough_cols.append(name)

# AMOUNT is re-added last, divided by 100 (cents into dollars).
amount_dollars = (raw_payments["amount"] / 100).as_("amount")

stg_payments = raw_payments.select(*passthrough_cols, amount_dollars)

stg_payments.show(n=5)

------------------------------------------------------------
|"PAYMENT_ID"  |"ORDER_ID"  |"PAYMENT_METHOD"  |"AMOUNT"   |
------------------------------------------------------------
|1             |1           |credit_card       |10.000000  |
|2             |2           |credit_card       |20.000000  |
|3             |3           |coupon            |1.000000   |
|4             |4           |coupon            |25.000000  |
|5             |5           |bank_transfer     |17.000000  |
------------------------------------------------------------



## Final models

In [11]:
# Per-customer order summary: earliest order date, latest order date and
# number of orders.
#
# pandas equivalent, for reference:
#   customer_orders = (
#       stg_orders.groupby("customer_id")
#       .agg(
#           first_order=("order_date", "min"),
#           most_recent_order=("order_date", "max"),
#           number_of_orders=("order_id", "count"),
#       )
#       .reset_index()
#   )
order_aggregations = [
    (stg_orders["order_date"], "min"),
    (stg_orders["order_date"], "max"),
    (stg_orders["order_id"], "count"),
]

orders_by_customer = stg_orders.group_by("customer_id")
customer_orders = orders_by_customer.agg(order_aggregations)

customer_orders.show(n=5)

-----------------------------------------------------------------------------
|"CUSTOMER_ID"  |"MIN(ORDER_DATE)"  |"MAX(ORDER_DATE)"  |"COUNT(ORDER_ID)"  |
-----------------------------------------------------------------------------
|1              |2018-01-01         |2018-02-10         |2                  |
|3              |2018-01-02         |2018-03-11         |3                  |
|94             |2018-01-04         |2018-01-29         |2                  |
|64             |2018-01-05         |2018-01-20         |2                  |
|54             |2018-01-07         |2018-03-24         |5                  |
-----------------------------------------------------------------------------



In [12]:
# Re-display the staged orders for reference before joining payments to them.
stg_orders.show(n=5)

---------------------------------------------------------
|"ORDER_ID"  |"CUSTOMER_ID"  |"ORDER_DATE"  |"STATUS"   |
---------------------------------------------------------
|1           |1              |2018-01-01    |returned   |
|2           |3              |2018-01-02    |completed  |
|3           |94             |2018-01-04    |completed  |
|4           |50             |2018-01-05    |completed  |
|5           |64             |2018-01-05    |completed  |
---------------------------------------------------------



In [13]:
# Per-customer payment total: attach each payment to its order (left join on
# order_id, so every payment row is kept), then sum AMOUNT per customer_id.
# The pandas version would be a merge on "order_id" followed by a groupby/sum.
payments_with_orders = stg_payments.join(
    stg_orders, using_columns=["order_id"], join_type="left"
)

payment_aggregations = [(stg_payments["amount"], "sum")]
customer_payments = payments_with_orders.group_by("customer_id").agg(
    payment_aggregations
)

customer_payments.show(n=5)

---------------------------------
|"CUSTOMER_ID"  |"SUM(AMOUNT)"  |
---------------------------------
|1              |33.000000      |
|3              |65.000000      |
|94             |24.000000      |
|50             |47.000000      |
|64             |30.000000      |
---------------------------------



In [17]:
# Final customers model: staged customers plus their order summary and
# lifetime payment total.

# Rename the aggregate column coming from customer_payments. Both sides are
# upper-cased so the key matches the reported column name "SUM(AMOUNT)".
customers_rename = {"sum(amount)": "customer_lifetime_value"}
customers_rename = {
    key.upper(): value.upper() for key, value in customers_rename.items()
}

# Join on the shared key by name (using_columns) instead of an equality
# expression, so the result has a single CUSTOMER_ID column rather than one
# per input. Left joins keep customers that have no orders or payments; the
# default join type would drop them.
customers = stg_customers.join(
    customer_orders, using_columns=["customer_id"], join_type="left"
).join(customer_payments, using_columns=["customer_id"], join_type="left")

for col_name in customers_rename:
    customers = customers.rename(customers[col_name], customers_rename[col_name])

customers.show(5)

SnowparkColumnException: (1105): The DataFrame does not contain the column named TOTAL_AMOUNT.

In [None]:
# Per-order payment breakdown: one "<method>_amount" column per payment method
# plus the order's total amount.
payment_methods = ["credit_card", "coupon", "bank_transfer", "gift_card"]

# payment_method value -> name of its per-order amount column
order_payments_renames = {
    f"{payment_method}": f"{payment_method}_amount"
    for payment_method in payment_methods
}

# The earlier draft of this cell used pandas-only calls (keyword-style agg,
# reset_index, pivot(index=...), merge) on Snowpark DataFrames. Here a single
# Snowpark aggregation does the same job: one conditional sum per payment
# method, and an overall sum for the total. Orders with no payment of a given
# method get 0 in that column.
# The SQL text is built only from the hard-coded payment_methods list above.
method_amounts = [
    sql_expr(f"sum(iff(payment_method = '{payment_method}', amount, 0))").as_(
        amount_col
    )
    for payment_method, amount_col in order_payments_renames.items()
]
total_amount = sql_expr("sum(amount)").as_("total_amount")

order_payments = stg_payments.group_by("order_id").agg(
    [*method_amounts, total_amount]
)

order_payments.show(5)

In [None]:
# Final orders model: staged orders joined to their payment breakdown, with
# the order total exposed as AMOUNT.

# Both sides are upper-cased so the key matches the column name Snowpark
# reports for total_amount.
orders_renames = {"total_amount": "amount"}
orders_renames = {
    key.upper(): value.upper() for key, value in orders_renames.items()
}

# Snowpark DataFrames are joined with join(...) and renamed one column at a
# time; the pandas-style merge(...) / rename(columns=...) calls used in the
# earlier draft are replaced. The left join keeps orders that have no payments.
orders = stg_orders.join(
    order_payments, using_columns=["order_id"], join_type="left"
)
for col_name in orders_renames:
    orders = orders.rename(orders[col_name], orders_renames[col_name])

orders.show(5)