In [0]:
%fs ls dbfs:/FileStore/tables/Data/

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/customer/,customer/,0,0
dbfs:/FileStore/tables/Data/order_items/,order_items/,0,0
dbfs:/FileStore/tables/Data/orders/,orders/,0,0


In [0]:
# Load the three source tables from CSV part files using explicit DDL schemas.
# No header option is set, so every line of each file is treated as data.
#
# customer_zipcode is declared STRING rather than INT: a ZIP code is an
# identifier, not a quantity, and an integer cast silently drops leading zeros
# (a value such as 00725 would be read back as 725).
customer_df=spark.read.csv(
    'dbfs:/FileStore/tables/Data/customer/part_00000',
    schema="""customer_id INT,customer_fname STRING,customer_lname STRING,customer_email STRING,customer_password STRING,customer_street STRING,customer_city STRING,customer_state STRING,customer_zipcode STRING""")

# Orders: one row per order, linked to a customer through order_customer_id.
orders_df=spark.read.csv(
    'dbfs:/FileStore/tables/Data/orders/part_00000',
    schema="""order_id INT,order_date DATE,order_customer_id INT,order_status STRING""")

# Order items: one row per line item, linked to an order through order_item_order_id.
order_items_df=spark.read.csv(
    'dbfs:/FileStore/tables/Data/order_items/part_00000',
    schema="""order_item_id INT,order_item_order_id INT,order_item_product_id INT,order_item_quantity INT,order_item_subtotal FLOAT,order_item_product_price FLOAT""")

In [0]:
# Inner join (the default join type, spelled out here): one row per order,
# carrying the columns of the customer who placed it.
customers_orders_df = customer_df.join(
    orders_df,
    on=customer_df['customer_id'] == orders_df['order_customer_id'],
    how='inner',
)

In [0]:
# Peek at a single joined row to check the combined column layout.
customers_orders_df.show(n=1)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+--------+----------+-----------------+------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|order_id|order_date|order_customer_id|order_status|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+--------+----------+-----------------+------------+
|      11599|          Mary|        Malone|     XXXXXXXXX|        XXXXXXXXX|8708 Indian Horse...|      Hickory|            NC|           28601|       1|2013-07-25|            11599|      CLOSED|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+--------+----------+-----------------+------------+
only showing top 1 row



In [0]:
# Preview the order columns of the joined frame, lowest customer ids first.
(
    customers_orders_df
    .select('customer_id', 'order_id', 'order_date', 'order_status')
    .orderBy('customer_id')
    .show(n=10)
)

+-----------+--------+----------+---------------+
|customer_id|order_id|order_date|   order_status|
+-----------+--------+----------+---------------+
|          1|   22945|2013-12-13|       COMPLETE|
|          2|   15192|2013-10-29|PENDING_PAYMENT|
|          2|   33865|2014-02-18|       COMPLETE|
|          2|   57963|2013-08-02|        ON_HOLD|
|          2|   67863|2013-11-30|       COMPLETE|
|          3|   35158|2014-02-26|       COMPLETE|
|          3|   22646|2013-12-11|       COMPLETE|
|          3|   57617|2014-07-24|       COMPLETE|
|          3|   23662|2013-12-19|       COMPLETE|
|          3|   56178|2014-07-15|        PENDING|
+-----------+--------+----------+---------------+
only showing top 10 rows



In [0]:
from pyspark.sql.functions import struct

In [0]:
# Same preview, but with the three order columns packed into a single struct
# column named order_details.
(
    customers_orders_df
    .select(
        'customer_id',
        struct('order_id', 'order_date', 'order_status').alias('order_details'),
    )
    .orderBy('customer_id')
    .show(n=10)
)

+-----------+--------------------+
|customer_id|       order_details|
+-----------+--------------------+
|          1|{22945, 2013-12-1...|
|          2|{15192, 2013-10-2...|
|          2|{33865, 2014-02-1...|
|          2|{57963, 2013-08-0...|
|          2|{67863, 2013-11-3...|
|          3|{35158, 2014-02-2...|
|          3|{22646, 2013-12-1...|
|          3|{57617, 2014-07-2...|
|          3|{23662, 2013-12-1...|
|          3|{56178, 2014-07-1...|
+-----------+--------------------+
only showing top 10 rows



In [0]:
# Keep the struct-packed projection (customer_id + order_details) for the
# aggregation step that follows.
customer_order_struct = customers_orders_df.select(
    'customer_id',
    struct('order_id', 'order_date', 'order_status').alias('order_details'),
)

In [0]:
from pyspark.sql.functions import collect_list

In [0]:
# Collapse to one row per customer: every order struct of that customer is
# gathered into an array column, and the result is sorted by customer_id.
# NOTE: collect_list does not guarantee the order of elements inside the array.
final_df = (
    customer_order_struct
    .groupBy('customer_id')
    .agg(collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
# Show the first two customers with untruncated cells so the full order
# arrays are visible.
final_df.show(n=2, truncate=False)

+-----------+----------------------------------------------------------------------------------------------------------------------------------+
|customer_id|order_details                                                                                                                     |
+-----------+----------------------------------------------------------------------------------------------------------------------------------+
|1          |[{22945, 2013-12-13, COMPLETE}]                                                                                                   |
|2          |[{15192, 2013-10-29, PENDING_PAYMENT}, {33865, 2014-02-18, COMPLETE}, {57963, 2013-08-02, ON_HOLD}, {67863, 2013-11-30, COMPLETE}]|
+-----------+----------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows



In [0]:
# Persist the per-customer order arrays as JSON, coalesced to one partition so
# a single part file is produced.
# mode('overwrite') replaces any earlier output. Without it the writer uses its
# default mode, which raises an error when the target path already exists, so
# this cell failed on every re-run of the notebook.
final_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/final')

In [0]:
%fs ls dbfs:/FileStore/tables/Data/final

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/final/_SUCCESS,_SUCCESS,0,1666591534000
dbfs:/FileStore/tables/Data/final/_committed_6795189959550936524,_committed_6795189959550936524,113,1666591533000
dbfs:/FileStore/tables/Data/final/_started_6795189959550936524,_started_6795189959550936524,0,1666591531000
dbfs:/FileStore/tables/Data/final/part-00000-tid-6795189959550936524-4d64a36c-78bd-41bb-8fb5-784e13047dbc-32-1-c000.json,part-00000-tid-6795189959550936524-4d64a36c-78bd-41bb-8fb5-784e13047dbc-32-1-c000.json,5475226,1666591532000


In [0]:
# Three-way join: customers -> orders -> order items. Both joins use the
# default (inner) join type, giving one row per order item with its order and
# customer columns attached.
customer_details = (
    customer_df
    .join(orders_df, customer_df['customer_id'] == orders_df['order_customer_id'])
    .join(order_items_df, orders_df['order_id'] == order_items_df['order_item_order_id'])
)

In [0]:
from pyspark.sql.functions import struct, collect_list

In [0]:
# Build a two-level nested (denormalized) view: customer -> orders -> order items.
#
# Step 1: pack each order item into a struct and collect the items of every
#         order into an array (one row per customer/order).
# Step 2: pack each order, together with its item array, into a struct and
#         collect the orders of every customer into an array (one row per
#         customer).
#
# The result is sorted once, at the end. An earlier version also sorted between
# the two aggregations; that sort was removed because the second groupBy
# reshuffles the rows and discards the ordering, so it only added a redundant
# global sort. Element order inside the collected arrays is not guaranteed by
# collect_list either way.
denorm_df = (
    customer_details
    .select(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        'order_id', 'order_date', 'order_status',
        struct('order_item_id', 'order_item_product_id', 'order_item_subtotal').alias('order_item_details'),
    )
    .groupBy(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        'order_id', 'order_date', 'order_status',
    )
    .agg(collect_list('order_item_details').alias('order_item_details'))
    .select(
        'customer_id', 'customer_fname', 'customer_lname', 'customer_email',
        struct('order_id', 'order_date', 'order_status', 'order_item_details').alias('order_details'),
    )
    .groupBy('customer_id', 'customer_fname', 'customer_lname', 'customer_email')
    .agg(collect_list('order_details').alias('order_details'))
    .orderBy('customer_id')
)

In [0]:
# Print the schema of denorm_df as a tree to check the nesting
# (order_details array -> order struct -> order_item_details array).
denorm_df.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_fname: string (nullable = true)
 |-- customer_lname: string (nullable = true)
 |-- customer_email: string (nullable = true)
 |-- order_details: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- order_id: integer (nullable = true)
 |    |    |-- order_date: date (nullable = true)
 |    |    |-- order_status: string (nullable = true)
 |    |    |-- order_item_details: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- order_item_id: integer (nullable = true)
 |    |    |    |    |-- order_item_product_id: integer (nullable = true)
 |    |    |    |    |-- order_item_subtotal: float (nullable = true)



In [0]:
# Write the nested frame as JSON through a single partition (one part file),
# replacing whatever an earlier run left at the target path.
(
    denorm_df
    .coalesce(1)
    .write
    .json('dbfs:/FileStore/tables/Data/denorm', mode='overwrite')
)

In [0]:
%fs ls dbfs:/FileStore/tables/Data/denorm

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/denorm/_SUCCESS,_SUCCESS,0,1666687404000
dbfs:/FileStore/tables/Data/denorm/_committed_4357456608139543307,_committed_4357456608139543307,114,1666600396000
dbfs:/FileStore/tables/Data/denorm/_committed_7013245287221208694,_committed_7013245287221208694,213,1666687403000
dbfs:/FileStore/tables/Data/denorm/_committed_vacuum7584468855395819167,_committed_vacuum7584468855395819167,96,1666687405000
dbfs:/FileStore/tables/Data/denorm/_started_7013245287221208694,_started_7013245287221208694,0,1666687399000
dbfs:/FileStore/tables/Data/denorm/part-00000-tid-7013245287221208694-e5750a45-1f05-4695-9456-9237f24b0c0c-46-1-c000.json,part-00000-tid-7013245287221208694-e5750a45-1f05-4695-9456-9237f24b0c0c-46-1-c000.json,20949617,1666687402000


In [0]:
# Read the denormalized JSON back, letting Spark infer the schema.
# The output directory is read rather than one specific part file: the part
# file name embeds a transaction id that changes on every write, so a
# hard-coded file name stops existing as soon as the overwrite cell is re-run.
# When a directory is read, Spark skips the bookkeeping files whose names
# start with "_" (_SUCCESS, _committed_*, _started_*).
json_df=spark.read.json('dbfs:/FileStore/tables/Data/denorm')

In [0]:
from pyspark.sql.functions import explode,filter

In [0]:
# Orders placed on 2014-01-01: explode the order array to one row per order,
# keep rows whose (string) order_date starts with that day, sort by customer
# and pull the order fields up to top-level columns.
# The cell ends on the DataFrame expression itself, so only its repr is shown.
(
    json_df
    .select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))
    .filter('order_details.order_date LIKE "2014-01-01%"')
    .orderBy('customer_id')
    .select(
        'customer_id',
        'customer_fname',
        'order_details.order_id',
        'order_details.order_date',
        'order_details.order_status',
    )
)

Out[58]: DataFrame[customer_id: bigint, customer_fname: string, order_id: bigint, order_date: string, order_status: string]

In [0]:
from pyspark.sql.functions import date_format,col

In [0]:
# Undo both nesting levels so that each row is a single order item:
#   1. explode order_details            -> one row per order
#   2. explode its order_item_details   -> one row per order item
#   3. lift the item struct fields to top-level columns
flatten = (
    json_df
    .select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))
    .select(
        'customer_id',
        'customer_fname',
        col('order_details.order_date').alias('order_date'),
        col('order_details.order_id').alias('order_id'),
        col('order_details.order_status').alias('order_status'),
        explode('order_details.order_item_details').alias('order_item_details'),
    )
    .select(
        'customer_id',
        'customer_fname',
        'order_date',
        'order_id',
        'order_status',
        'order_item_details.order_item_id',
        'order_item_details.order_item_product_id',
        'order_item_details.order_item_subtotal',
    )
)


In [0]:
# Spot-check the first ten flattened order-item rows.
flatten.show(n=10)

+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|customer_id|customer_fname|order_date|order_id|   order_status|order_item_id|order_item_product_id|order_item_subtotal|
+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|          1|       Richard|2013-12-13|   22945|       COMPLETE|        57439|                  191|             499.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145023|                 1014|             149.94|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145022|                 1014|              99.96|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145021|                  627|             199.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145020|                 1073|             199.99|
|          2|          Mary|2013

In [0]:
from pyspark.sql.functions import to_date

In [0]:
from pyspark.sql import Row
from pyspark.sql.functions import sum as _sum

In [0]:
# Monthly revenue from COMPLETE and CLOSED orders:
# parse the string order_date into a date, keep only the two statuses, bucket
# by year-month and sum the item subtotals; show the first ten months.
(
    flatten
    .select(
        'customer_id',
        'customer_fname',
        col("order_date"),
        to_date(col("order_date"), "yyyy-MM-dd").alias("order_date_converted"),
        'order_status',
        'order_item_subtotal',
    )
    .filter("order_status IN ('COMPLETE','CLOSED')")
    .groupBy(date_format('order_date_converted', 'yyyy-MM').alias('order_month'))
    .agg(_sum('order_item_subtotal').alias('Revenue'))
    .orderBy('order_month')
    .show(n=10)
)

+-----------+------------------+
|order_month|           Revenue|
+-----------+------------------+
|    2013-07| 333465.4500000003|
|    2013-08|1221828.9000000027|
|    2013-09|1302255.8000000028|
|    2013-10|1171686.9200000018|
|    2013-11|1379935.3300000008|
|    2013-12| 1277719.600000003|
|    2014-01|1230221.7400000019|
|    2014-02|1217770.0900000029|
|    2014-03|1271350.9700000028|
|    2014-04|1249723.5200000028|
+-----------+------------------+
only showing top 10 rows

