In [0]:
spark

In [0]:
%fs ls dbfs:/public/retail_db/orders

path,name,size,modificationTime
dbfs:/public/retail_db/orders/part-00000,part-00000,2999944,1689009709052


In [0]:
orders_df =  spark.read.csv(
    'dbfs:/public/retail_db/orders', 
    schema='''
    order_id INT, 
    order_date DATE, 
    order_customer_id INT, 
    order_status STRING
    '''
)

order_items_df = spark.read.csv(
    'dbfs:/public/retail_db/order_items',
    schema='''
        order_item_id INT,
        order_item_order_id INT,
        order_item_product_id INT,
        order_item_quatity INT,
        order_item_subtotal FLOAT,
        order_item_product_price FLOAT
    '''
)

In [0]:
display(
    orders_df
)

order_id,order_date,order_customer_id,order_status
1,2013-07-25,11599,CLOSED
2,2013-07-25,256,PENDING_PAYMENT
3,2013-07-25,12111,COMPLETE
4,2013-07-25,8827,CLOSED
5,2013-07-25,11318,COMPLETE
6,2013-07-25,7130,COMPLETE
7,2013-07-25,4530,COMPLETE
8,2013-07-25,2911,PROCESSING
9,2013-07-25,5657,PENDING_PAYMENT
10,2013-07-25,5648,PENDING_PAYMENT


In [0]:
display(
    order_items_df
)

order_item_id,order_item_order_id,order_item_product_id,order_item_quatity,order_item_subtotal,order_item_product_price
1,1,957,1,299.98,299.98
2,2,1073,1,199.99,199.99
3,2,502,5,250.0,50.0
4,2,403,1,129.99,129.99
5,4,897,2,49.98,24.99
6,4,365,5,299.95,59.99
7,4,502,3,150.0,50.0
8,4,1014,4,199.92,49.98
9,5,957,1,299.98,299.98
10,5,365,5,299.95,59.99


In [0]:
from pyspark.sql.functions import *

In [0]:
daily_product_revenue_df = orders_df\
    .filter("order_status IN ('COMPLETE','CLOSED')")\
    .join(order_items_df, orders_df['order_id'] == order_items_df['order_item_order_id'])\
    .groupBy(orders_df['order_date'], order_items_df['order_item_order_id'])\
    .agg(round(sum('order_item_subtotal'), 2).alias('revenue'))

In [0]:
daily_product_revenue_df\
    .write\
        .mode('overwrite')\
            .parquet('/public/retail_db/daily_product_revenue_df')

In [0]:
daily_product_revenue_df.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) Project [order_date#357, order_item_order_id#365, round(sum(order_item_subtotal#368)#440, 2) AS revenue#441]
   +- *(3) HashAggregate(keys=[order_date#357, order_item_order_id#365], functions=[finalmerge_sum(merge sum#449) AS sum(order_item_subtotal#368)#440])
      +- AQEShuffleRead coalesced
         +- ShuffleQueryStage 1, Statistics(sizeInBytes=789.6 KiB, rowCount=2.53E+4, isRuntime=true)
            +- Exchange hashpartitioning(order_date#357, order_item_order_id#365, 200), ENSURE_REQUIREMENTS, [plan_id=828]
               +- *(2) HashAggregate(keys=[order_date#357, order_item_order_id#365], functions=[merge_sum(merge sum#449) AS sum#449])
                  +- *(2) Project [order_date#357, order_item_order_id#365, sum#449]
                     +- *(2) BroadcastHashJoin [order_id#356], [order_item_order_id#365], Inner, BuildLeft, false
                        :- ShuffleQueryStage 0, Statistics(sizeIn

In [0]:
daily_product_revenue_df.explain(True)

== Parsed Logical Plan ==
'Aggregate [order_date#357, order_item_order_id#365], [order_date#357, order_item_order_id#365, round(sum('order_item_subtotal), 2) AS revenue#441]
+- Join Inner, (order_id#356 = order_item_order_id#365)
   :- Filter order_status#359 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#356,order_date#357,order_customer_id#358,order_status#359] csv
   +- Relation [order_item_id#364,order_item_order_id#365,order_item_product_id#366,order_item_quatity#367,order_item_subtotal#368,order_item_product_price#369] csv

== Analyzed Logical Plan ==
order_date: date, order_item_order_id: int, revenue: double
Aggregate [order_date#357, order_item_order_id#365], [order_date#357, order_item_order_id#365, round(sum(order_item_subtotal#368), 2) AS revenue#441]
+- Join Inner, (order_id#356 = order_item_order_id#365)
   :- Filter order_status#359 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#356,order_date#357,order_customer_id#358,order_status#359] csv
   +- Relation [order_item

In [0]:
daily_product_revenue_df.explain('extended')

== Parsed Logical Plan ==
'Aggregate [order_date#357, order_item_order_id#365], [order_date#357, order_item_order_id#365, round(sum('order_item_subtotal), 2) AS revenue#441]
+- Join Inner, (order_id#356 = order_item_order_id#365)
   :- Filter order_status#359 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#356,order_date#357,order_customer_id#358,order_status#359] csv
   +- Relation [order_item_id#364,order_item_order_id#365,order_item_product_id#366,order_item_quatity#367,order_item_subtotal#368,order_item_product_price#369] csv

== Analyzed Logical Plan ==
order_date: date, order_item_order_id: int, revenue: double
Aggregate [order_date#357, order_item_order_id#365], [order_date#357, order_item_order_id#365, round(sum(order_item_subtotal#368), 2) AS revenue#441]
+- Join Inner, (order_id#356 = order_item_order_id#365)
   :- Filter order_status#359 IN (COMPLETE,CLOSED)
   :  +- Relation [order_id#356,order_date#357,order_customer_id#358,order_status#359] csv
   +- Relation [order_item

In [0]:
%sql 

DROP DATABASE IF EXISTS itversity_retail_db CASCADE

In [0]:
%sql

create database if not exists itversity_retail_db

In [0]:
%sql

use itversity_retail_db

In [0]:
%sql

SELECT current_database()

current_database()
itversity_retail_db


In [0]:
%sql

show tables

database,tableName,isTemporary


In [0]:

%sql
CREATE TABLE orders (
  order_id INT,
  order_date DATE,
  order_customer_id INT,
  order_status STRING
) USING CSV
OPTIONS (
  path='/public/retail_db/orders'
)

In [0]:
%sql

CREATE TABLE order_items (
	order_item_id INT,
	order_item_order_id INT,
	order_item_product_id INT, 
	order_item_quantity INT,
	order_item_subtotal FLOAT,
	order_item_product_price FLOAT
) USING CSV
OPTIONS (
  path='/public/retail_db/order_items'
)

In [0]:
%sql
INSERT OVERWRITE DIRECTORY 'dbfs:/public/retail_db/daily_product_revenue_df'
USING parquet
SELECT  o.order_date,
        oi.order_item_order_id,
        ROUND(SUM(oi.order_item_subtotal), 2) AS revenue
FROM orders AS o
  JOIN order_items AS oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE','CLOSED')
GROUP BY  o.order_date,
          oi.order_item_order_id
ORDER BY 1, 3 DESC

In [0]:
%sql

explain
INSERT OVERWRITE DIRECTORY 'dbfs:/public/retail_db/daily_product_revenue_df'
USING parquet
SELECT  o.order_date,
        oi.order_item_order_id,
        ROUND(SUM(oi.order_item_subtotal), 2) AS revenue
FROM orders AS o
  JOIN order_items AS oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE','CLOSED')
GROUP BY  o.order_date,
          oi.order_item_order_id
ORDER BY 1, 3 DESC

plan
"== Physical Plan == Execute InsertIntoDataSourceDirCommand  +- InsertIntoDataSourceDirCommand Storage(Location: dbfs:/public/retail_db/daily_product_revenue_df), parquet, true  +- Sort [order_date#528 ASC NULLS FIRST, revenue#692 DESC NULLS LAST], true  +- Aggregate [order_date#528, order_item_order_id#534], [order_date#528, order_item_order_id#534, round(sum(order_item_subtotal#537), 2) AS revenue#692]  +- Filter order_status#530 IN (COMPLETE,CLOSED)  +- Join Inner, (order_id#527 = order_item_order_id#534)  :- SubqueryAlias o  : +- SubqueryAlias spark_catalog.itversity_retail_db.orders  : +- Relation spark_catalog.itversity_retail_db.orders[order_id#527,order_date#528,order_customer_id#529,order_status#530] csv  +- SubqueryAlias oi  +- SubqueryAlias spark_catalog.itversity_retail_db.order_items  +- Relation spark_catalog.itversity_retail_db.order_items[order_item_id#533,order_item_order_id#534,order_item_product_id#535,order_item_quantity#536,order_item_subtotal#537,order_item_product_price#538] csv"


In [0]:
%sql

explain
SELECT  o.order_date,
        oi.order_item_order_id,
        ROUND(SUM(oi.order_item_subtotal), 2) AS revenue
FROM orders AS o
  JOIN order_items AS oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE','CLOSED')
GROUP BY  o.order_date,
          oi.order_item_order_id
ORDER BY 1, 3 DESC

plan
"== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Sort [order_date#528 ASC NULLS FIRST, revenue#632 DESC NULLS LAST], true, 0  +- Exchange rangepartitioning(order_date#528 ASC NULLS FIRST, revenue#632 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1464]  +- Project [order_date#528, order_item_order_id#534, round(sum(order_item_subtotal#537)#646, 2) AS revenue#632]  +- HashAggregate(keys=[order_date#528, order_item_order_id#534], functions=[finalmerge_sum(merge sum#648) AS sum(order_item_subtotal#537)#646])  +- Exchange hashpartitioning(order_date#528, order_item_order_id#534, 200), ENSURE_REQUIREMENTS, [plan_id=1460]  +- HashAggregate(keys=[order_date#528, order_item_order_id#534], functions=[merge_sum(merge sum#648) AS sum#648])  +- Project [order_date#528, order_item_order_id#534, sum#648]  +- BroadcastHashJoin [order_id#527], [order_item_order_id#534], Inner, BuildLeft, false  :- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1455]  : +- Project [order_id#527, order_date#528]  : +- Filter (order_status#530 IN (COMPLETE,CLOSED) AND isnotnull(order_id#527))  : +- FileScan csv spark_catalog.itversity_retail_db.orders[order_id#527,order_date#528,order_status#530] Batched: false, DataFilters: [order_status#530 IN (COMPLETE,CLOSED), isnotnull(order_id#527)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/public/retail_db/orders], PartitionFilters: [], PushedFilters: [In(order_status, [CLOSED,COMPLETE]), IsNotNull(order_id)], ReadSchema: struct  +- HashAggregate(keys=[order_item_order_id#534], functions=[partial_sum(order_item_subtotal#537) AS sum#648])  +- Filter isnotnull(order_item_order_id#534)  +- FileScan csv spark_catalog.itversity_retail_db.order_items[order_item_order_id#534,order_item_subtotal#537] Batched: false, DataFilters: [isnotnull(order_item_order_id#534)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/public/retail_db/order_items], PartitionFilters: [], PushedFilters: [IsNotNull(order_item_order_id)], ReadSchema: struct"


In [0]:
%sql

explain extended
SELECT  o.order_date,
        oi.order_item_order_id,
        ROUND(SUM(oi.order_item_subtotal), 2) AS revenue
FROM orders AS o
  JOIN order_items AS oi
    ON o.order_id = oi.order_item_order_id
WHERE o.order_status IN ('COMPLETE','CLOSED')
GROUP BY  o.order_date,
          oi.order_item_order_id
ORDER BY 1, 3 DESC

plan
"== Parsed Logical Plan == 'Sort [1 ASC NULLS FIRST, 3 DESC NULLS LAST], true +- 'Aggregate ['o.order_date, 'oi.order_item_order_id], ['o.order_date, 'oi.order_item_order_id, 'ROUND('SUM('oi.order_item_subtotal), 2) AS revenue#662]  +- 'Filter 'o.order_status IN (COMPLETE,CLOSED)  +- 'Join Inner, ('o.order_id = 'oi.order_item_order_id)  :- 'SubqueryAlias o  : +- 'UnresolvedRelation [orders], [], false  +- 'SubqueryAlias oi  +- 'UnresolvedRelation [order_items], [], false == Analyzed Logical Plan == order_date: date, order_item_order_id: int, revenue: double Sort [order_date#528 ASC NULLS FIRST, revenue#662 DESC NULLS LAST], true +- Aggregate [order_date#528, order_item_order_id#534], [order_date#528, order_item_order_id#534, round(sum(order_item_subtotal#537), 2) AS revenue#662]  +- Filter order_status#530 IN (COMPLETE,CLOSED)  +- Join Inner, (order_id#527 = order_item_order_id#534)  :- SubqueryAlias o  : +- SubqueryAlias spark_catalog.itversity_retail_db.orders  : +- Relation spark_catalog.itversity_retail_db.orders[order_id#527,order_date#528,order_customer_id#529,order_status#530] csv  +- SubqueryAlias oi  +- SubqueryAlias spark_catalog.itversity_retail_db.order_items  +- Relation spark_catalog.itversity_retail_db.order_items[order_item_id#533,order_item_order_id#534,order_item_product_id#535,order_item_quantity#536,order_item_subtotal#537,order_item_product_price#538] csv == Optimized Logical Plan == Sort [order_date#528 ASC NULLS FIRST, revenue#662 DESC NULLS LAST], true +- Project [order_date#528, order_item_order_id#534, round(sum(order_item_subtotal#537)#676, 2) AS revenue#662]  +- AggregatePart [order_date#528, order_item_order_id#534], [finalmerge_sum(merge sum#678) AS sum(order_item_subtotal#537)#676], true  +- AggregatePart [order_date#528, order_item_order_id#534], [merge_sum(merge sum#678) AS sum#678], false  +- Project [order_date#528, order_item_order_id#534, sum#678]  +- Join Inner, (order_id#527 = order_item_order_id#534)  :- Project [order_id#527, order_date#528]  : +- Filter (order_status#530 IN (COMPLETE,CLOSED) AND isnotnull(order_id#527))  : +- Relation spark_catalog.itversity_retail_db.orders[order_id#527,order_date#528,order_customer_id#529,order_status#530] csv  +- AggregatePart [order_item_order_id#534], [partial_sum(order_item_subtotal#537) AS sum#678], false  +- Project [order_item_order_id#534, order_item_subtotal#537]  +- Filter isnotnull(order_item_order_id#534)  +- Relation spark_catalog.itversity_retail_db.order_items[order_item_id#533,order_item_order_id#534,order_item_product_id#535,order_item_quantity#536,order_item_subtotal#537,order_item_product_price#538] csv == Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- Sort [order_date#528 ASC NULLS FIRST, revenue#662 DESC NULLS LAST], true, 0  +- Exchange rangepartitioning(order_date#528 ASC NULLS FIRST, revenue#662 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1568]  +- Project [order_date#528, order_item_order_id#534, round(sum(order_item_subtotal#537)#676, 2) AS revenue#662]  +- HashAggregate(keys=[order_date#528, order_item_order_id#534], functions=[finalmerge_sum(merge sum#678) AS sum(order_item_subtotal#537)#676], output=[order_date#528, order_item_order_id#534, sum(order_item_subtotal#537)#676])  +- Exchange hashpartitioning(order_date#528, order_item_order_id#534, 200), ENSURE_REQUIREMENTS, [plan_id=1564]  +- HashAggregate(keys=[order_date#528, order_item_order_id#534], functions=[merge_sum(merge sum#678) AS sum#678], output=[order_date#528, order_item_order_id#534, sum#678])  +- Project [order_date#528, order_item_order_id#534, sum#678]  +- BroadcastHashJoin [order_id#527], [order_item_order_id#534], Inner, BuildLeft, false  :- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=1559]  : +- Project [order_id#527, order_date#528]  : +- Filter (order_status#530 IN (COMPLETE,CLOSED) AND isnotnull(order_id#527))  : +- FileScan csv spark_catalog.itversity_retail_db.orders[order_id#527,order_date#528,order_status#530] Batched: false, DataFilters: [order_status#530 IN (COMPLETE,CLOSED), isnotnull(order_id#527)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/public/retail_db/orders], PartitionFilters: [], PushedFilters: [In(order_status, [CLOSED,COMPLETE]), IsNotNull(order_id)], ReadSchema: struct  +- HashAggregate(keys=[order_item_order_id#534], functions=[partial_sum(order_item_subtotal#537) AS sum#678], output=[order_item_order_id#534, sum#678])  +- Filter isnotnull(order_item_order_id#534)  +- FileScan csv spark_catalog.itversity_retail_db.order_items[order_item_order_id#534,order_item_subtotal#537] Batched: false, DataFilters: [isnotnull(order_item_order_id#534)], Format: CSV, Location: InMemoryFileIndex(1 paths)[dbfs:/public/retail_db/order_items], PartitionFilters: [], PushedFilters: [IsNotNull(order_item_order_id)], ReadSchema: struct"
