In [0]:
# Sanity check: list the data root so we can confirm every required folder has been added.
data_root_listing = dbutils.fs.ls("dbfs:/FileStore/tables/Data/")
display(data_root_listing)

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/Customers/,Customers/,0,0
dbfs:/FileStore/tables/Data/Order_Items/,Order_Items/,0,0
dbfs:/FileStore/tables/Data/Orders/,Orders/,0,0
dbfs:/FileStore/tables/Data/denormalized/,denormalized/,0,0
dbfs:/FileStore/tables/Data/final/,final/,0,0


In [0]:
# Create DataFrames for Customers, Orders and Order_Items.
# No `header` option is passed to the reader, so Spark treats every line as data
# and takes the column names and types from the DDL schema strings below.

# customer_zipcode is read as STRING rather than INT: a ZIP code is an identifier,
# not a quantity, and parsing it as an integer would drop any leading zeros.
customers_schema = (
    "customer_id INT,"
    "customer_fname STRING,"
    "customer_lname STRING,"
    "customer_email STRING,"
    "customer_password STRING,"
    "customer_street STRING,"
    "customer_city STRING,"
    "customer_state STRING,"
    "customer_zipcode STRING"
)
customers_df = spark.read.csv('dbfs:/FileStore/tables/Data/Customers/part_00000', schema=customers_schema)

orders_schema = (
    "order_id INT,"
    "order_date DATE,"
    "order_customer_id INT,"
    "order_status STRING"
)
orders_df = spark.read.csv('dbfs:/FileStore/tables/Data/Orders/part_00000', schema=orders_schema)

order_items_schema = (
    "order_item_id INT,"
    "order_item_order_id INT,"
    "order_item_product_id INT,"
    "order_item_quantity INT,"
    "order_item_subtotal FLOAT,"
    "order_item_product_price FLOAT"
)
order_items_df = spark.read.csv('dbfs:/FileStore/tables/Data/Order_Items/part_00000', schema=order_items_schema)

In [0]:
# Preview the first two rows of each source DataFrame, in load order.
for source_df in (customers_df, orders_df, order_items_df):
    source_df.show(2)

+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_city|customer_state|customer_zipcode|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
|          1|       Richard|     Hernandez|     XXXXXXXXX|        XXXXXXXXX|  6303 Heather Plaza|  Brownsville|            TX|           78521|
|          2|          Mary|       Barrett|     XXXXXXXXX|        XXXXXXXXX|9526 Noble Embers...|    Littleton|            CO|           80126|
+-----------+--------------+--------------+--------------+-----------------+--------------------+-------------+--------------+----------------+
only showing top 2 rows

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|

### Join the tables into a new DataFrame (Order_Details) to create a denormalized DataFrame.
##### Join1: Customer with Orders

In [0]:
# Pair every customer with each of their orders. No join type is given, so
# Spark's default (inner) join applies.
customer_order_match = customers_df["customer_id"] == orders_df["order_customer_id"]
customers_orders_df = customers_df.join(orders_df, on=customer_order_match)

##### Select The Joined Data

In [0]:
# Display the first 20 joined rows, ordered by customer, keeping only the
# columns that identify the customer and the order.
joined_columns = ['customer_id', 'customer_fname', 'customer_lname', 'order_id', 'order_date', 'order_status']
(
    customers_orders_df
    .select(*joined_columns)
    .orderBy('customer_id')
    .show(20)
)

+-----------+--------------+--------------+--------+----------+---------------+
|customer_id|customer_fname|customer_lname|order_id|order_date|   order_status|
+-----------+--------------+--------------+--------+----------+---------------+
|          1|       Richard|     Hernandez|   22945|2013-12-13|       COMPLETE|
|          2|          Mary|       Barrett|   15192|2013-10-29|PENDING_PAYMENT|
|          2|          Mary|       Barrett|   33865|2014-02-18|       COMPLETE|
|          2|          Mary|       Barrett|   57963|2013-08-02|        ON_HOLD|
|          2|          Mary|       Barrett|   67863|2013-11-30|       COMPLETE|
|          3|           Ann|         Smith|   22646|2013-12-11|       COMPLETE|
|          3|           Ann|         Smith|   23662|2013-12-19|       COMPLETE|
|          3|           Ann|         Smith|   35158|2014-02-26|       COMPLETE|
|          3|           Ann|         Smith|   46399|2014-05-09|     PROCESSING|
|          3|           Ann|         Smi

##### Consolidate order_id, order_date and order_status into a single struct column

In [0]:
from pyspark.sql.functions import struct

# Pack the three order columns into one struct column named order_details,
# then preview ten rows without truncating the struct's text.
order_details_col = struct('order_id', 'order_date', 'order_status').alias('order_details')
(
    customers_orders_df
    .select('customer_id', order_details_col)
    .orderBy('customer_id')
    .show(10, truncate=False)
)

+-----------+------------------------------------+
|customer_id|order_details                       |
+-----------+------------------------------------+
|1          |{22945, 2013-12-13, COMPLETE}       |
|2          |{33865, 2014-02-18, COMPLETE}       |
|2          |{15192, 2013-10-29, PENDING_PAYMENT}|
|2          |{57963, 2013-08-02, ON_HOLD}        |
|2          |{67863, 2013-11-30, COMPLETE}       |
|3          |{35158, 2014-02-26, COMPLETE}       |
|3          |{57617, 2014-07-24, COMPLETE}       |
|3          |{22646, 2013-12-11, COMPLETE}       |
|3          |{23662, 2013-12-19, COMPLETE}       |
|3          |{56178, 2014-07-15, PENDING}        |
+-----------+------------------------------------+
only showing top 10 rows



##### Generate an array of structs from order_details
##### by grouping on customer_id and collecting each customer's order_details into an array

In [0]:
from pyspark.sql.functions import collect_list

# One row per (customer, order), with the order columns folded into a struct.
customer_order_struct = customers_orders_df.select(
    'customer_id',
    struct('order_id', 'order_date', 'order_status').alias('order_details'),
)

# Collapse to one row per customer, gathering that customer's order structs
# into an array column, then sort the result by customer.
# NOTE(review): collect_list does not guarantee the order of elements inside
# each array, so the array order may differ between runs.
orders_per_customer = customer_order_struct.groupBy('customer_id').agg(
    collect_list('order_details').alias('order_details')
)
final_df = orders_per_customer.orderBy('customer_id')

In [0]:
# Preview two customers with their full (untruncated) order arrays.
final_df.show(n=2, truncate=False)

+-----------+----------------------------------------------------------------------------------------------------------------------------------+
|customer_id|order_details                                                                                                                     |
+-----------+----------------------------------------------------------------------------------------------------------------------------------+
|1          |[{22945, 2013-12-13, COMPLETE}]                                                                                                   |
|2          |[{15192, 2013-10-29, PENDING_PAYMENT}, {33865, 2014-02-18, COMPLETE}, {57963, 2013-08-02, ON_HOLD}, {67863, 2013-11-30, COMPLETE}]|
+-----------+----------------------------------------------------------------------------------------------------------------------------------+
only showing top 2 rows



##### Export Dataframe to JSON

In [0]:
# Write the per-customer order arrays as JSON. coalesce(1) reduces the data to a
# single partition before writing, so the output is produced as one part file.
# mode('overwrite') makes the cell re-runnable: with the default save mode the
# write raises an error when the target directory already exists.
final_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/final')

In [0]:
# List the data root again to confirm the export directory is present.
data_root_after_export = dbutils.fs.ls("dbfs:/FileStore/tables/Data/")
display(data_root_after_export)

path,name,size,modificationTime
dbfs:/FileStore/tables/Data/Customers/,Customers/,0,0
dbfs:/FileStore/tables/Data/Order_Items/,Order_Items/,0,0
dbfs:/FileStore/tables/Data/Orders/,Orders/,0,0
dbfs:/FileStore/tables/Data/final/,final/,0,0


#### Now we denormalize all three tables, i.e. Customers, Orders and Order_Items

In [0]:
### Join the tables
# Two chained joins with Spark's default (inner) join type:
# customers -> orders on customer id, then orders -> order items on order id.
customer_details = (
    customers_df
    .join(orders_df, customers_df["customer_id"] == orders_df["order_customer_id"])
    .join(order_items_df, orders_df["order_id"] == order_items_df["order_item_order_id"])
)

## Create a denormalized DataFrame by nesting the data in two levels:
##   1. per order    -> array of order_item_details structs
##   2. per customer -> array of order_details structs (each carrying its item array)
customer_cols = ['customer_id', 'customer_fname', 'customer_lname', 'customer_email']
order_cols = ['order_id', 'order_date', 'order_status']

denormalized_df = (
    customer_details
    # Level 1: fold each order item into a struct and collect the items per order.
    .select(
        *customer_cols,
        *order_cols,
        struct('order_item_id', 'order_item_product_id', 'order_item_subtotal').alias('order_item_details'),
    )
    .groupBy(*customer_cols, *order_cols)
    .agg(collect_list('order_item_details').alias('order_item_details'))
    # No sort is needed between the two aggregations: the next groupBy
    # re-groups the rows by customer, and sorting by customer_id alone cannot
    # define an order among the rows of a single customer.
    # Level 2: fold each order (with its item array) into a struct and collect per customer.
    .select(
        *customer_cols,
        struct(*order_cols, 'order_item_details').alias('order_details'),
    )
    .groupBy(*customer_cols)
    .agg(collect_list('order_details').alias('order_details'))
    # Single final sort so the exported rows are ordered by customer.
    .orderBy('customer_id')
)



##### Export Dataframe to JSON

In [0]:
# Write the fully denormalized data as JSON. coalesce(1) reduces the data to a
# single partition before writing, so the output is produced as one part file.
# mode('overwrite') makes the cell re-runnable: with the default save mode the
# write raises an error when the target directory already exists.
denormalized_df.coalesce(1).write.mode('overwrite').json('dbfs:/FileStore/tables/Data/denormalized')

#### Perform The Analysis On Our Data

#### Step:1 Read The Data

In [0]:
# We will read our json denormalized data first.
# Read the output directory rather than one specific part file: the part file
# name contains a generated id, so a hard-coded name stops matching as soon as
# the export cell is run again. Spark reads the data files inside the
# directory and ignores marker files whose names start with "_" or ".".
json_df = spark.read.json('dbfs:/FileStore/tables/Data/denormalized')
json_df.show(2)

+--------------+--------------+-----------+--------------+--------------------+
|customer_email|customer_fname|customer_id|customer_lname|       order_details|
+--------------+--------------+-----------+--------------+--------------------+
|     XXXXXXXXX|       Richard|          1|     Hernandez|[{2013-12-13, 229...|
|     XXXXXXXXX|          Mary|          2|       Barrett|[{2013-08-02, 579...|
+--------------+--------------+-----------+--------------+--------------------+
only showing top 2 rows



#### Step 2: Perform Analysis:
##### 1- Get the details of the orders placed by customers on December 25th, 2013 (Christmas) and December 31st, 2013 (New Year's Eve)

In [0]:
# Christmas Order Details
from pyspark.sql.functions import explode

# explode() produces one row per element of the order_details array,
# so each order can be filtered individually.
orders_by_customer = json_df.select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))

# Keep only orders whose date starts with 2013-12-25, sorted by customer.
christmas_orders = (
    orders_by_customer
    .filter('order_details.order_date LIKE "2013-12-25%"')
    .orderBy('customer_id')
)

# Lift the order fields out of the struct and show the first ten matches.
christmas_orders.select(
    'customer_id',
    'customer_fname',
    'order_details.order_id',
    'order_details.order_date',
    'order_details.order_status',
).show(10)


+-----------+--------------+--------+----------+---------------+
|customer_id|customer_fname|order_id|order_date|   order_status|
+-----------+--------------+--------+----------+---------------+
|         12|   Christopher|   24642|2013-12-25| PAYMENT_REVIEW|
|         53|     Katherine|   24704|2013-12-25|        PENDING|
|        119|          Mary|   24672|2013-12-25|     PROCESSING|
|        255|          Mary|   24640|2013-12-25|       COMPLETE|
|        258|         Aaron|   24705|2013-12-25|       COMPLETE|
|        326|       Shirley|   61759|2013-12-25|       COMPLETE|
|        347|         Bryan|   61738|2013-12-25|        PENDING|
|        454|        Robert|   61771|2013-12-25|PENDING_PAYMENT|
|        483|          Mary|   24775|2013-12-25|       COMPLETE|
|        594|          Mary|   24696|2013-12-25|PENDING_PAYMENT|
+-----------+--------------+--------+----------+---------------+
only showing top 10 rows



In [0]:
# New Year Eve Order Details
# One row per order: explode the per-customer order_details array.
exploded_orders = json_df.select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))

# Keep only orders whose date starts with 2013-12-31, sorted by customer.
new_year_eve_orders = (
    exploded_orders
    .filter('order_details.order_date LIKE "2013-12-31%"')
    .orderBy('customer_id')
)

# Lift the order fields out of the struct and show the first ten matches.
new_year_eve_orders.select(
    'customer_id',
    'customer_fname',
    'order_details.order_id',
    'order_details.order_date',
    'order_details.order_status',
).show(10)

+-----------+--------------+--------+----------+---------------+
|customer_id|customer_fname|order_id|order_date|   order_status|
+-----------+--------------+--------+----------+---------------+
|         21|       William|   25802|2013-12-31|PENDING_PAYMENT|
|         42|         Ethan|   25843|2013-12-31|PENDING_PAYMENT|
|         67|      Samantha|   25722|2013-12-31|PENDING_PAYMENT|
|         79|          Mary|   25645|2013-12-31|PENDING_PAYMENT|
|         94|          Mary|   61886|2013-12-31|       COMPLETE|
|        119|          Mary|   25684|2013-12-31|         CLOSED|
|        145|          Joan|   61890|2013-12-31|         CLOSED|
|        175|        Ronald|   25835|2013-12-31|        PENDING|
|        211|          Mary|   25727|2013-12-31|       COMPLETE|
|        285|       Shirley|   25855|2013-12-31|PENDING_PAYMENT|
+-----------+--------------+--------+----------+---------------+
only showing top 10 rows



##### 2- Calculate Monthly Customer Revenue

In [0]:
# First we need to flatten our struct order details
from pyspark.sql.functions import col

# Step 1: one row per order (explode the per-customer order array).
per_order = json_df.select('customer_id', 'customer_fname', explode('order_details').alias('order_details'))

# Step 2: promote the order fields to top-level columns and explode the
# nested item array, giving one row per order item.
per_order_item = per_order.select(
    'customer_id',
    'customer_fname',
    col('order_details.order_date').alias('order_date'),
    col('order_details.order_id').alias('order_id'),
    col('order_details.order_status').alias('order_status'),
    explode('order_details.order_item_details').alias('order_item_details'),
)

# Step 3: promote the item fields as well, leaving a fully flat table.
flatten = per_order_item.select(
    'customer_id',
    'customer_fname',
    'order_date',
    'order_id',
    'order_status',
    'order_item_details.order_item_id',
    'order_item_details.order_item_product_id',
    'order_item_details.order_item_subtotal',
)

In [0]:
# Preview the first ten rows of the flattened, one-row-per-order-item table.
flatten.show(n=10)

+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|customer_id|customer_fname|order_date|order_id|   order_status|order_item_id|order_item_product_id|order_item_subtotal|
+-----------+--------------+----------+--------+---------------+-------------+---------------------+-------------------+
|          1|       Richard|2013-12-13|   22945|       COMPLETE|        57439|                  191|             499.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145023|                 1014|             149.94|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145022|                 1014|              99.96|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145021|                  627|             199.95|
|          2|          Mary|2013-08-02|   57963|        ON_HOLD|       145020|                 1073|             199.99|
|          2|          Mary|2013

In [0]:
# After flattening, compute each customer's monthly revenue.
from pyspark.sql.functions import to_date,date_format,sum as _sum
# NOTE(review): Row is not used in this cell; the import is kept in case another cell relies on it.
from pyspark.sql import Row

(
    flatten
    # order_date is converted with to_date so date_format can derive the month.
    .select(
        'customer_id',
        'customer_fname',
        col("order_date"),
        to_date(col("order_date"), "yyyy-MM-dd").alias("order_date_converted"),
        'order_status',
        'order_item_subtotal',
    )
    # Only orders in COMPLETE or CLOSED status are counted as revenue.
    .filter('order_status IN ("COMPLETE", "CLOSED")')
    # 'yyyy-MM' (four-letter year) matches the "yyyy-MM-dd" pattern used above;
    # the previous 'yyy-MM' was a typo.
    .groupBy('customer_id', 'customer_fname', date_format('order_date_converted', 'yyyy-MM').alias('order_month'))
    .agg(_sum('order_item_subtotal').alias('Revenue'))
    .orderBy('order_month')
    .show()
)

+-----------+--------------+-----------+------------------+
|customer_id|customer_fname|order_month|           Revenue|
+-----------+--------------+-----------+------------------+
|       1478|          Anna|    2013-07|           1784.76|
|       1180|          Mary|    2013-07|           1129.94|
|         16|       Tiffany|    2013-07|             39.99|
|       2418|         Helen|    2013-07|1099.8400000000001|
|        943|          John|    2013-07| 829.8900000000001|
|       1104|         Linda|    2013-07|            699.96|
|       1265|        Albert|    2013-07|            199.99|
|        965|          Sean|    2013-07|494.95000000000005|
|       1932|       Shirley|    2013-07| 929.9100000000001|
|        121|          Mary|    2013-07| 609.9300000000001|
|         66|          Mary|    2013-07| 749.9300000000001|
|       2129|       William|    2013-07|            589.91|
|        137|      Jonathan|    2013-07|229.98000000000002|
|       2321|          Mary|    2013-07|

##### 3- Calculate Monthly Revenue

In [0]:
### Let's compute the monthly revenue across all customers
(
    flatten
    # order_date is converted with to_date so date_format can derive the month.
    .select(
        'customer_id',
        'customer_fname',
        col("order_date"),
        to_date(col("order_date"), "yyyy-MM-dd").alias("order_date_converted"),
        'order_status',
        'order_item_subtotal',
    )
    # Only orders in COMPLETE or CLOSED status are counted as revenue.
    .filter('order_status IN ("COMPLETE", "CLOSED")')
    # 'yyyy-MM' (four-letter year) matches the "yyyy-MM-dd" pattern used above;
    # the previous 'yyy-MM' was a typo.
    .groupBy(date_format('order_date_converted', 'yyyy-MM').alias('order_month'))
    .agg(_sum('order_item_subtotal').alias('Revenue'))
    .orderBy('order_month')
    .show()
)

+-----------+------------------+
|order_month|           Revenue|
+-----------+------------------+
|    2013-07| 333465.4500000003|
|    2013-08|1221828.9000000027|
|    2013-09|1302255.8000000028|
|    2013-10|1171686.9200000018|
|    2013-11|1379935.3300000008|
|    2013-12| 1277719.600000003|
|    2014-01|1230221.7400000019|
|    2014-02|1217770.0900000029|
|    2014-03|1271350.9700000028|
|    2014-04|1249723.5200000028|
|    2014-05|1221679.3300000022|
|    2014-06|1179754.0600000035|
|    2014-07| 955590.7700000018|
+-----------+------------------+



#### Conclusion: In this project, we denormalized the data tables and used PySpark to perform analysis on them.