# Setup

In [0]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import Window

# Notebook parameters, supplied by the calling pipeline or job through widgets.
catalog = dbutils.widgets.get("catalog")
schema = dbutils.widgets.get("schema")


def _read_table(table_name):
    """Return `table_name` from the configured catalog and schema as a DataFrame."""
    return spark.read.table(f"{catalog}.{schema}.{table_name}")


# Source tables: the three silver tables plus the order-status ranking lookup.
df_status_silver_pyspark = _read_table("status_silver_pyspark")
df_orders_silver_pyspark = _read_table("orders_silver_pyspark")
df_customers_silver_pyspark = _read_table("customers_silver_pyspark")
df_order_status_rank = _read_table("order_status_ranking")

# Orders by City and State

In [0]:
# Create or replace the gold table of order counts per (city, state).
# Orders are left-joined to customers. An order whose customer_id has no match in
# the customers table therefore gets a null city/state and is counted in that group.

df_orders_with_location = (
    df_orders_silver_pyspark.alias("orders")
    .join(
        df_customers_silver_pyspark.alias("customers"),
        on=F.col("orders.customer_id") == F.col("customers.customer_id"),
        how="left",
    )
    .select(
        F.col("orders.order_id").alias("order_id"),
        F.col("customers.city").alias("city"),
        F.col("customers.state").alias("state"),
    )
)

df_gold_orders_by_city_state = (
    df_orders_with_location
    .groupBy("city", "state")
    .agg(F.count("order_id").alias("total_orders"))
)

(
    df_gold_orders_by_city_state.write
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{schema}.gold_orders_by_city_state_pyspark")
)

# Returns by Customer

In [0]:
# Create or replace the gold table of return counts per customer.

# Order statuses that mark an order as returned.
RETURN_STATUSES = ["return requested", "return picked up", "return processed"]

# One row per order that has reached any return status. distinct() ensures an order
# with several return-status rows is counted only once.
df_returned_orders = (
    df_status_silver_pyspark
    .filter(F.col("order_status").isin(*RETURN_STATUSES))
    .select("order_id")
    .distinct()
)

df_order_customer_ids = df_orders_silver_pyspark.select("order_id", "customer_id")

df_gold_returns_by_customer = (
    df_returned_orders.alias("returned")
    .join(df_order_customer_ids.alias("ids"), F.col("returned.order_id") == F.col("ids.order_id"), "left")
    .join(df_customers_silver_pyspark.alias("customers"), F.col("ids.customer_id") == F.col("customers.customer_id"), "left")
    .select(
        # Take the id from the orders side. customers.customer_id is null whenever the
        # customer is missing from the customers table, which would merge every such
        # customer's returns into a single null-keyed row.
        F.col("ids.customer_id").alias("customer_id"),
        F.col("customers.city").alias("city"),
        F.col("customers.state").alias("state"),
        F.col("returned.order_id").alias("order_id"),
    )
    .groupBy("customer_id", "city", "state")
    .agg(F.count("order_id").alias("total_returns"))
)

df_gold_returns_by_customer.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.gold_returns_by_customer_pyspark")

# Customer Order History

In [0]:
# Create or replace the customer order-history gold table. It pairs each status
# record with its order and the order's customer.
# Both joins are inner joins. Status rows whose order is absent from the orders
# table are dropped, and so are orders whose customer is absent from the customers table.

df_customer_order_history = (
    df_orders_silver_pyspark.alias("orders")
    .join(
        df_status_silver_pyspark.alias("status"),
        F.col("orders.order_id") == F.col("status.order_id"),
    )
    .join(
        df_customers_silver_pyspark.alias("customers"),
        F.col("orders.customer_id") == F.col("customers.customer_id"),
    )
    .select(
        F.col("customers.customer_id").alias("customer_id"),
        F.col("orders.order_id").alias("order_id"),
        F.col("status.order_status").alias("order_status"),
        F.col("status.status_timestamp").alias("status_update_time"),
    )
    # Most recent status first within each order. Note that readers of the saved
    # table should not rely on this row order; sort explicitly when querying.
    .orderBy(
        F.col("customer_id").asc(),
        F.col("order_id").asc(),
        F.col("status_update_time").desc(),
    )
)

(
    df_customer_order_history.write
    .mode("overwrite")
    .saveAsTable(f"{catalog}.{schema}.gold_customer_order_history_pyspark")
)

# Current Order Status

In [0]:
# Gold current order status. For each order, keep the single status row with the
# highest rank in the order_status_ranking lookup, then attach the order's
# customer and placement timestamp.

# Attach each status row's rank. This is a left join, so statuses absent from the
# ranking table keep a null rank.
df_status_rank = (
    df_status_silver_pyspark.alias("status")
    .join(
        df_order_status_rank.alias("rank"),
        F.col("rank.order_status") == F.col("status.order_status"),
        "left",
    )
    .select("status.*", "rank.rank")
)

# The highest rank comes first, and null (unranked) statuses sort last. The
# assumption that a higher rank means a later lifecycle stage should be confirmed
# against the ranking table. Without the timestamp tie-breaker, row_number() would
# pick arbitrarily among rows sharing the top rank (e.g. a repeated status). With
# it, the latest such row wins and the result is deterministic.
current_status_window = (
    Window.partitionBy("order_id")
    .orderBy(F.desc_nulls_last("rank"), F.desc_nulls_last("status_timestamp"))
)

df_status_rank_windowed = df_status_rank.withColumn(
    "current_order_num", F.row_number().over(current_status_window)
)

# Left join so that a current status is kept even when its order is missing from
# the orders table. customer_id and order_placed_timestamp are then null.
df_current_orders = (
    df_status_rank_windowed.filter(F.col("current_order_num") == 1).alias("current_status")
    .join(
        df_orders_silver_pyspark.alias("orders"),
        F.col("current_status.order_id") == F.col("orders.order_id"),
        "left",
    )
    .select(
        F.col("current_status.order_id"),
        F.col("orders.customer_id"),
        F.col("current_status.order_status").alias("current_order_status"),
        F.col("current_status.status_timestamp").alias("current_status_timestamp"),
        F.col("orders.order_timestamp").alias("order_placed_timestamp"),
    )
)

df_current_orders.write.mode("overwrite").saveAsTable(f"{catalog}.{schema}.gold_current_orders_pyspark")