## GOALS
### 1. ORDER_DETAILS: Create an order_details table that contains the following:
- ORDER_ID
- ORDER_DATE - This should be at the day level (e.g 1928-04-23)
- CUSTOMER_ID
- STORE_NAME
- TOTAL_ORDER_AMOUNT(2 decimal places)
#### The table should be aggregated by ORDER_ID, ORDER_DATE, CUSTOMER_ID, STORE_NAME to show the TOTAL_ORDER_AMOUNT

### 2. MONTHLY_SALES: Create an aggregated table to show the monthly sales total
The table should have two columns:
- MONTH_YEAR - This should be in the format yyyy-MM, e.g. 2020-10, sorted in descending order
- TOTAL_SALES (2 decimal places)

### 3. STORE_MONTHLY_SALES: Create an aggregated table to show the monthly sales by store name
The table should have two columns:
- MONTH_YEAR - This should be in the format yyyy-MM, e.g. 2020-10, sorted in descending order
- STORE_NAME
- TOTAL_SALES (2 decimal places)

In [0]:
# import required functions 
from pyspark.sql.functions import *

# read in silver data tablesd as dataframes
orders = spark.read.parquet('/FileStore/tables/silver/orders.parquet')
order_items = spark.read.parquet('/FileStore/tables/silver/order_items.parquet')
products = spark.read.parquet('/FileStore/tables/silver/products.parquet')


In [0]:
orders.display()

order_id,order_timestamp,customer_id,order_status,store_name
447,2022-01-06T09:35:42.000+0000,355,COMPLETE,Online
448,2022-01-06T10:23:14.000+0000,155,COMPLETE,Online
449,2022-01-06T01:21:54.000+0000,242,COMPLETE,Online
450,2022-01-06T05:57:04.000+0000,49,COMPLETE,Online
451,2022-01-06T10:39:07.000+0000,204,COMPLETE,Online
452,2022-01-07T01:11:46.000+0000,216,COMPLETE,Online
453,2022-01-07T06:53:06.000+0000,4,COMPLETE,New York City
454,2022-01-07T03:55:15.000+0000,388,COMPLETE,Online
455,2022-01-07T06:38:38.000+0000,291,COMPLETE,Online
456,2022-01-08T12:52:12.000+0000,272,COMPLETE,Online


In [0]:
# review data types
orders.dtypes

In [0]:
# change order time to order date
order_details = orders.select('order_id', to_date('order_timestamp').alias('date'), 'customer_id', 'store_name')
order_details.display()


order_id,date,customer_id,store_name
447,2022-01-06,355,Online
448,2022-01-06,155,Online
449,2022-01-06,242,Online
450,2022-01-06,49,Online
451,2022-01-06,204,Online
452,2022-01-07,216,Online
453,2022-01-07,4,New York City
454,2022-01-07,388,Online
455,2022-01-07,291,Online
456,2022-01-08,272,Online


In [0]:
# review order_items dataframe
order_items.display()

order_id,product_id,unit_price,quantity
334,26,48.75,1
334,46,39.16,4
334,12,10.48,4
335,32,5.65,2
336,2,29.55,5
336,20,28.21,5
337,32,5.65,4
337,29,24.71,4
337,45,31.68,3
338,35,7.18,2


In [0]:
# join tables 
order_details = order_details.join(order_items, order_details.order_id == order_items.order_id, 'left').\
    select(order_details.order_id, order_details.date, order_details.customer_id, order_details.store_name, order_items.unit_price, order_items.quantity)
# display order_items after joining with order_details
order_details.display()

order_id,date,customer_id,store_name,unit_price,quantity
447,2022-01-06,355,Online,49.12,4
448,2022-01-06,155,Online,5.65,2
449,2022-01-06,242,Online,28.21,4
450,2022-01-06,49,Online,37.0,2
451,2022-01-06,204,Online,10.24,4
452,2022-01-07,216,Online,49.12,2
453,2022-01-07,4,New York City,29.55,2
454,2022-01-07,388,Online,39.91,3
455,2022-01-07,291,Online,10.33,4
456,2022-01-08,272,Online,39.16,4


In [0]:
# create aggregated table of order_details with new column named "total_sales_amount"
order_details = order_details.withColumn('total_sales_amount', round(order_details.unit_price * order_details.quantity, 2))
order_details.display()

order_id,date,customer_id,store_name,unit_price,quantity,total_sales_amount
447,2022-01-06,355,Online,49.12,4,196.48
448,2022-01-06,155,Online,5.65,2,11.3
449,2022-01-06,242,Online,28.21,4,112.84
450,2022-01-06,49,Online,37.0,2,74.0
451,2022-01-06,204,Online,10.24,4,40.96
452,2022-01-07,216,Online,49.12,2,98.24
453,2022-01-07,4,New York City,29.55,2,59.1
454,2022-01-07,388,Online,39.91,3,119.73
455,2022-01-07,291,Online,10.33,4,41.32
456,2022-01-08,272,Online,39.16,4,156.64


In [0]:
# group the order_details dataframe and taking sum of the total amount, renaming this to 'total_order_amount'
# assign this to a new dataframe called 'order_details'
order_details = order_details.groupBy('order_id', 'date', 'customer_id', 'store_name') \
    .sum('total_sales_amount') \
    .withColumnRenamed('sum(total_sales_amount)', 'total_order_amount') \
    .withColumn('total_order_amount', round('total_order_amount', 2))
# display the update
order_details.display()


order_id,date,customer_id,store_name,total_order_amount
524,2022-01-26,287,Online,28.68
158,2021-10-25,387,Online,200.03
294,2021-11-28,202,Online,220.74
343,2021-12-12,277,Online,213.29
1432,2022-07-14,337,Online,71.74
1544,2022-07-30,234,San Francisco,77.07
1721,2022-08-29,162,Perth,193.34
1748,2022-09-03,176,Online,163.54
746,2022-03-13,263,Online,45.66
802,2022-03-22,381,Online,101.16


In [0]:
# write the order_details dataframe as a parquet file in the gold layer
order_details.write.parquet('/FileStore/tables/gold/order_details.parquet', mode='overwrite')

In [0]:
# create new column that extracts month and year from date
# assign the dataframe result back to the month_of_sales variable
month_of_sales = order_details.withColumn('month_year', date_format('date', 'yyyy-MM'))

In [0]:
# create a new table with selected columns 
monthly_sales = month_of_sales.groupBy('month_year').sum('total_order_amount').\
    withColumn('total_sales', round('sum(total_order_amount)',2)).sort(month_of_sales.month_year.desc()).\
    select('month_year', 'total_sales')

In [0]:
monthly_sales.display()

month_year,total_sales
2022-10,8214.0
2022-09,23970.29
2022-08,26180.9
2022-07,32217.2
2022-06,27184.09
2022-05,26848.93
2022-04,23223.25
2022-03,25424.28
2022-02,21488.51
2022-01,18116.82


In [0]:
monthly_sales.write.parquet('/FileStore/tables/gold/monthly_sales.parquet', mode='overwrite')

In [0]:
store_monthly_sales = month_of_sales.groupBy('month_year','store_name').sum('total_order_amount').\
    withColumn('total_sales', round('sum(total_order_amount)',2)).sort(month_of_sales.month_year.desc()).\
    select('month_year', 'store_name', 'total_sales')
store_monthly_sales.display()

month_year,store_name,total_sales
2022-10,Buenos Aires,462.63
2022-10,S�o Paulo,268.81
2022-10,Tel Aviv,3484.76
2022-10,Mumbai,96.2
2022-10,Online,933.25
2022-10,Bejing,601.05
2022-10,Seattle,31.44
2022-10,Tokyo,1770.14
2022-10,Mexico City,565.72
2022-09,New Dehli,490.0


In [0]:
# write the store_monthly_sales dataframe as a parquet file in the gold layer
store_monthly_sales.write.parquet('/FileStore/tables/gold/store_monthly_sales', mode='overwrite')