In [0]:
%fs ls dbfs:/public/

path,name,size,modificationTime
dbfs:/public/retail_db/,retail_db/,0,1747124194777
dbfs:/public/retail_db_bronze/,retail_db_bronze/,0,0
dbfs:/public/retail_db_gold/,retail_db_gold/,0,0


In [0]:
from pyspark.sql.functions import col, sum, round

In [0]:
orders = spark.read.csv("dbfs:/public/retail_db/orders", schema = 'order_id INT, order_date TIMESTAMP, order_customer_id INT, order_status STRING')

In [0]:
order_items = spark.read.csv("dbfs:/public/retail_db/order_items", schema = 'order_item_id INT, order_item_order_id INT, order_item_product_id INT, order_item_quantity INT, order_item_subtotal FLOAT, order_item_product_price FLOAT')

In [0]:
daily_product_revenue = (orders\
                         .filter('''order_status in ('COMPLETE', 'CLOSED')''')\
                         .join(order_items, orders.order_id == order_items.order_item_order_id)\
                         .groupBy(orders.order_date, order_items.order_item_product_id)\
                         .agg(round(sum(order_items.order_item_subtotal), 2).alias('revenue')))

In [0]:
daily_product_revenue.write.mode('overwrite').parquet("dbfs:/public/retail_db/daily_product_revenue")

In [0]:
daily_product_revenue.explain(True)

== Parsed Logical Plan ==
'Aggregate [order_date#37, order_item_product_id#46], [order_date#37, order_item_product_id#46, 'round('sum(order_item_subtotal#48), 2) AS revenue#88]
+- Join Inner, (order_id#36 = order_item_order_id#45)
   :- Filter order_status#39 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#36,order_date#37,order_customer_id#38,order_status#39] csv
   +- Relation [order_item_id#44,order_item_order_id#45,order_item_product_id#46,order_item_quantity#47,order_item_subtotal#48,order_item_product_price#49] csv

== Analyzed Logical Plan ==
order_date: timestamp, order_item_product_id: int, revenue: double
Aggregate [order_date#37, order_item_product_id#46], [order_date#37, order_item_product_id#46, round(sum(order_item_subtotal#48), 2) AS revenue#88]
+- Join Inner, (order_id#36 = order_item_order_id#45)
   :- Filter order_status#39 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#36,order_date#37,order_customer_id#38,order_status#39] csv
   +- Relation [order_item_id#44,orde

In [0]:
daily_product_revenue.explain('extended')

== Parsed Logical Plan ==
'Aggregate [order_date#37, order_item_product_id#46], [order_date#37, order_item_product_id#46, 'round('sum(order_item_subtotal#48), 2) AS revenue#88]
+- Join Inner, (order_id#36 = order_item_order_id#45)
   :- Filter order_status#39 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#36,order_date#37,order_customer_id#38,order_status#39] csv
   +- Relation [order_item_id#44,order_item_order_id#45,order_item_product_id#46,order_item_quantity#47,order_item_subtotal#48,order_item_product_price#49] csv

== Analyzed Logical Plan ==
order_date: timestamp, order_item_product_id: int, revenue: double
Aggregate [order_date#37, order_item_product_id#46], [order_date#37, order_item_product_id#46, round(sum(order_item_subtotal#48), 2) AS revenue#88]
+- Join Inner, (order_id#36 = order_item_order_id#45)
   :- Filter order_status#39 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#36,order_date#37,order_customer_id#38,order_status#39] csv
   +- Relation [order_item_id#44,orde