# End-to-End Use of the Medallion Architecture

In [0]:
# Fetch the values later used as the OAuth client id, tenant id and client
# secret from the Databricks secret scope, so none of them appear in the notebook.
secret_scope = "databricksVolt2323"
application_id = dbutils.secrets.get(scope=secret_scope, key="app-id")
tenant_id = dbutils.secrets.get(scope=secret_scope, key="tenant-id")
secret = dbutils.secrets.get(scope=secret_scope, key="secret")

In [0]:
# Storage account, container and DBFS mount point used by the mount cell below.
# To mount the silver container instead, set container_name to "silver"
# (the mount point then becomes "/mnt/silver").
account_name = "databricksfiles2323"
container_name = "gold"
mount_point = f"/mnt/{container_name}"

In [0]:
# OAuth client-credentials settings passed to the ABFS driver as extra mount
# configuration; the id, secret and tenant come from the secrets cell above.
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": application_id,
          "fs.azure.account.oauth2.client.secret": secret,
          "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"}

# dbutils.fs.mount raises if the mount point is already in use, which made this
# cell fail on every re-run. Only mount when the mount point is not present yet.
already_mounted = any(mount.mountPoint == mount_point for mount in dbutils.fs.mounts())

if not already_mounted:
    dbutils.fs.mount(
        source=f"abfss://{container_name}@{account_name}.dfs.core.windows.net/",
        mount_point=mount_point,
        extra_configs=configs)

True

In [0]:
# Location of the raw orders extract in the bronze layer.
orders_path = '/mnt/bronze/orders.csv'

# Spark SQL types needed to declare explicit schemas.
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

# ORDER_DATETIME is read as a string because the raw values are not in a format
# Spark recognises as a timestamp; it is converted in a later cell.
# Every field is declared with nullable=False (the third StructField argument).
# NOTE(review): the CSV reader does not appear to enforce this flag - stores.csv
# is loaded with empty values under the same setting - confirm before relying on it.
orders_fields = [
    ("ORDER_ID", IntegerType()),
    ("ORDER_DATETIME", StringType()),
    ("CUSTOMER_ID", IntegerType()),
    ("ORDER_STATUS", StringType()),
    ("STORE_ID", IntegerType()),
]
orders_schema = StructType([StructField(name, dtype, False) for name, dtype in orders_fields])

# The file has a header row, so it is skipped rather than read as data.
orders = spark.read.csv(orders_path, schema=orders_schema, header=True)

In [0]:
# Preview the raw orders DataFrame as read from the bronze layer.
display(orders)

ORDER_ID,ORDER_DATETIME,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,06-JAN-22 09.35.42.00,355,COMPLETE,1
448,06-JAN-22 10.23.14.00,155,COMPLETE,1
449,06-JAN-22 01.21.54.00,242,COMPLETE,1
450,06-JAN-22 05.57.04.00,49,COMPLETE,1
451,06-JAN-22 10.39.07.00,204,COMPLETE,1
452,07-JAN-22 01.11.46.00,216,COMPLETE,1
453,07-JAN-22 06.53.06.00,4,COMPLETE,4
454,07-JAN-22 03.55.15.00,388,COMPLETE,1
455,07-JAN-22 06.38.38.00,291,COMPLETE,1
456,08-JAN-22 12.52.12.00,272,COMPLETE,1


In [0]:
# Importing the to_timestamp function
from pyspark.sql.functions import to_timestamp

In [0]:
# Parse the ORDER_DATETIME strings (e.g. "06-JAN-22 09.35.42.00") into a proper
# timestamp column named ORDER_TIMESTAMP; all other columns pass through unchanged.
# NOTE(review): `kk` is the 1-24 clock-hour pattern letter and the raw values
# carry no AM/PM marker - confirm the source really uses a 24-hour clock.
order_datetime_format = "dd-MMM-yy kk.mm.ss.SS"
order_timestamp = to_timestamp(orders['order_datetime'], order_datetime_format).alias('ORDER_TIMESTAMP')

orders = orders.select('ORDER_ID', order_timestamp, 'CUSTOMER_ID', 'ORDER_STATUS', 'STORE_ID')

In [0]:
# Confirm the column data types after the conversion: ORDER_TIMESTAMP should be
# reported as 'timestamp' rather than 'string'.
orders.dtypes

[('ORDER_ID', 'int'),
 ('ORDER_TIMESTAMP', 'timestamp'),
 ('CUSTOMER_ID', 'int'),
 ('ORDER_STATUS', 'string'),
 ('STORE_ID', 'int')]

In [0]:
# Review the orders DataFrame now that ORDER_TIMESTAMP has been parsed.
display(orders)

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,ORDER_STATUS,STORE_ID
447,2022-01-06T09:35:42Z,355,COMPLETE,1
448,2022-01-06T10:23:14Z,155,COMPLETE,1
449,2022-01-06T01:21:54Z,242,COMPLETE,1
450,2022-01-06T05:57:04Z,49,COMPLETE,1
451,2022-01-06T10:39:07Z,204,COMPLETE,1
452,2022-01-07T01:11:46Z,216,COMPLETE,1
453,2022-01-07T06:53:06Z,4,COMPLETE,4
454,2022-01-07T03:55:15Z,388,COMPLETE,1
455,2022-01-07T06:38:38Z,291,COMPLETE,1
456,2022-01-08T12:52:12Z,272,COMPLETE,1


In [0]:
# Keep only the orders whose status is 'COMPLETE' and rebind `orders` to the result.
is_complete = orders['order_status'] == "COMPLETE"
orders = orders.where(is_complete)

In [0]:
# Read the stores CSV file from the bronze layer with an explicit schema.

stores_path = "/mnt/bronze/stores.csv"

# WEB_ADDRESS, LATITUDE and LONGITUDE are declared nullable because the file
# really contains empty values for them: the "Online" store has no coordinates
# and the physical stores have no web address. The previous schema marked all
# five fields as non-nullable, which contradicted the data.
# NOTE(review): a store name with a non-ASCII character shows up garbled
# ("S?o Paulo"-style) in the gold output, which may mean this file is not UTF-8;
# confirm the file encoding and pass `encoding=` to the reader if so.
stores_schema = StructType([
                    StructField("STORE_ID", IntegerType(), False),
                    StructField("STORE_NAME", StringType(), False),
                    StructField("WEB_ADDRESS", StringType(), True),
                    StructField("LATITUDE", DoubleType(), True),
                    StructField("LONGITUDE", DoubleType(), True)
                    ]
                    )

stores=spark.read.csv(path=stores_path, header=True, schema=stores_schema)

In [0]:
# Preview the stores DataFrame.
display(stores)

STORE_ID,STORE_NAME,WEB_ADDRESS,LATITUDE,LONGITUDE
1,Online,https://www.example.com,,
2,San Francisco,,37.529395,-122.267237
3,Seattle,,47.6053,-122.33221
4,New York City,,40.745216,-73.980518
5,Chicago,,41.878751,-87.636675
6,London,,51.519281,-0.087296
7,Bucharest,,44.43225,26.10626
8,Berlin,,52.5161,13.3873
9,Utrecht,,52.103263,5.061644
10,Madrid,,40.4929,-3.8737


In [0]:
# Left join with orders as the left table, so every order is kept and gains the
# STORE_NAME of its store. Only the columns required in the silver layer are
# selected, and the result is assigned back to `orders`.
same_store = orders['store_id'] == stores['store_id']
orders_with_store = orders.join(stores, same_store, 'left')

orders = orders_with_store.select('ORDER_ID', 'ORDER_TIMESTAMP', 'CUSTOMER_ID', 'STORE_NAME')

In [0]:
# Final check: this is the shape of the orders data that goes to the silver layer.
display(orders)

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,STORE_NAME
447,2022-01-06T09:35:42Z,355,Online
448,2022-01-06T10:23:14Z,155,Online
449,2022-01-06T01:21:54Z,242,Online
450,2022-01-06T05:57:04Z,49,Online
451,2022-01-06T10:39:07Z,204,Online
452,2022-01-07T01:11:46Z,216,Online
453,2022-01-07T06:53:06Z,4,New York City
454,2022-01-07T03:55:15Z,388,Online
455,2022-01-07T06:38:38Z,291,Online
456,2022-01-08T12:52:12Z,272,Online


In [0]:
# Persist orders to the silver layer as parquet; 'overwrite' replaces any
# output left by a previous run.
orders.write.mode('overwrite').parquet("/mnt/silver/orders")

#### ORDER_ITEMS

Save a table called ORDER_ITEMS in the silver layer, it should contain the following columns:
- ORDER_ID, type INTEGER
- PRODUCT_ID, type INTEGER
- UNIT_PRICE, type DOUBLE
- QUANTITY, type INTEGER

The file should be saved in PARQUET format.

In [0]:
# Read the order items CSV file from the bronze layer with an explicit schema.
order_items_path = "/mnt/bronze/order_items.csv"

# Column name / type pairs; every field is declared with nullable=False.
order_items_fields = [
    ("ORDER_ID", IntegerType()),
    ("LINE_ITEM_ID", IntegerType()),
    ("PRODUCT_ID", IntegerType()),
    ("UNIT_PRICE", DoubleType()),
    ("QUANTITY", IntegerType()),
]
order_items_schema = StructType(
    [StructField(name, dtype, False) for name, dtype in order_items_fields]
)

# The file has a header row, so it is skipped rather than read as data.
order_items = spark.read.csv(order_items_path, schema=order_items_schema, header=True)

In [0]:
# Review the raw order items; LINE_ITEM_ID is not required in the silver table.
display(order_items)

ORDER_ID,LINE_ITEM_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,1,26,48.75,1
334,2,46,39.16,4
334,3,12,10.48,4
335,1,32,5.65,2
336,1,2,29.55,5
336,2,20,28.21,5
337,1,32,5.65,4
337,2,29,24.71,4
337,3,45,31.68,3
338,1,35,7.18,2


In [0]:
# Keep only the columns required in the silver table (everything except
# LINE_ITEM_ID) and rebind `order_items` to the result.
order_items = order_items.select('ORDER_ID', 'PRODUCT_ID', 'UNIT_PRICE', 'QUANTITY')

In [0]:
# Review the order items DataFrame after removing LINE_ITEM_ID.
display(order_items)

ORDER_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,26,48.75,1
334,46,39.16,4
334,12,10.48,4
335,32,5.65,2
336,2,29.55,5
336,20,28.21,5
337,32,5.65,4
337,29,24.71,4
337,45,31.68,3
338,35,7.18,2


In [0]:
# Persist order_items to the silver layer as parquet, replacing any previous output.
order_items.write.mode('overwrite').parquet("/mnt/silver/order_items")

#### PRODUCTS

Save a table called PRODUCTS in the silver layer, it should contain the following columns:
- PRODUCT_ID, type INTEGER
- PRODUCT_NAME, type STRING
- UNIT_PRICE, type DOUBLE

The file should be saved in PARQUET format.

In [0]:
# Read the products CSV file from the bronze layer with an explicit schema.
products_path = "/mnt/bronze/products.csv"

# Column name / type pairs; every field is declared with nullable=False.
products_fields = [
    ("PRODUCT_ID", IntegerType()),
    ("PRODUCT_NAME", StringType()),
    ("UNIT_PRICE", DoubleType()),
]
products_schema = StructType(
    [StructField(name, dtype, False) for name, dtype in products_fields]
)

# The file has a header row, so it is skipped rather than read as data.
products = spark.read.csv(products_path, schema=products_schema, header=True)

In [0]:
# Review the products records.
display(products)

PRODUCT_ID,PRODUCT_NAME,UNIT_PRICE
16,Women's Socks (Grey),39.89
17,Women's Sweater (Brown),24.46
18,Women's Jacket (Black),14.34
19,Men's Coat (Red),28.21
20,Girl's Shorts (Green),38.34
21,Girl's Pyjamas (White),39.78
22,Men's Shorts (Black),10.33
23,Men's Pyjamas (Blue),48.39
24,Boy's Sweater (Red),9.8
25,Girl's Jeans (Grey),48.75


In [0]:
# Persist products to the silver layer as parquet, replacing any previous output.
products.write.mode('overwrite').parquet('/mnt/silver/products')

#### CUSTOMERS

Save a table called CUSTOMERS in the silver layer, it should contain the following columns:
- CUSTOMER_ID, type INTEGER
- FULL_NAME, type STRING
- EMAIL_ADDRESS, type STRING

The file should be saved in PARQUET format.

In [0]:
# Read the customers CSV file from the bronze layer with an explicit schema.
customers_path = "/mnt/bronze/customers.csv"

# Column name / type pairs; every field is declared with nullable=False.
customers_fields = [
    ("CUSTOMER_ID", IntegerType()),
    ("FULL_NAME", StringType()),
    ("EMAIL_ADDRESS", StringType()),
]
customers_schema = StructType(
    [StructField(name, dtype, False) for name, dtype in customers_fields]
)

# The file has a header row, so it is skipped rather than read as data.
customers = spark.read.csv(customers_path, schema=customers_schema, header=True)

In [0]:
# Review the customers DataFrame.
display(customers)

CUSTOMER_ID,FULL_NAME,EMAIL_ADDRESS
286,Wilfred Welch,wilfred.welch@internalmail
287,Kristina Nunez,kristina.nunez@internalmail
288,Mable Ballard,mable.ballard@internalmail
289,Diane Wilkerson,diane.wilkerson@internalmail
290,Sheryl Banks,sheryl.banks@internalmail
291,Opal Cruz,opal.cruz@internalmail
292,Dale Hughes,dale.hughes@internalmail
293,Diana Fowler,diana.fowler@internalmail
294,Travis Schwartz,travis.schwartz@internalmail
295,Anthony Boone,anthony.boone@internalmail


In [0]:
# Persist customers to the silver layer as parquet, replacing any previous output.
customers.write.mode('overwrite').parquet('/mnt/silver/customers')

# Silver to Gold


#### ORDER_DETAILS

Create an order_details table that contains the following attributes:

- ORDER ID
- ORDER DATE
- CUSTOMER ID
- STORE NAME
- TOTAL ORDER AMOUNT 

The table should be aggregated by ORDER ID, ORDER DATE, CUSTOMER ID and STORE NAME to show the TOTAL ORDER AMOUNT.

Hint: Please consider the order of operations when finding the TOTAL ORDER AMOUNT.

In [0]:
# Import only the Spark SQL functions that the gold-layer cells use, instead of
# a wildcard import that silently replaces many Python builtins (sum, max, min,
# abs, ...). `round` here is deliberately the Spark column function: the cells
# below call it on column names, so it shadows the builtin from this point on.
from pyspark.sql.functions import date_format, round, to_date

# Read the silver-layer tables back as DataFrames; the gold tables are built
# from these rather than from the in-memory bronze transformations above.
orders = spark.read.parquet('/mnt/silver/orders')
order_items = spark.read.parquet('/mnt/silver/order_items')
products = spark.read.parquet('/mnt/silver/products')
customers = spark.read.parquet('/mnt/silver/customers')

In [0]:
# Review the columns of the orders DataFrame read from the silver layer.
display(orders)

ORDER_ID,ORDER_TIMESTAMP,CUSTOMER_ID,STORE_NAME
447,2022-01-06T09:35:42Z,355,Online
448,2022-01-06T10:23:14Z,155,Online
449,2022-01-06T01:21:54Z,242,Online
450,2022-01-06T05:57:04Z,49,Online
451,2022-01-06T10:39:07Z,204,Online
452,2022-01-07T01:11:46Z,216,Online
453,2022-01-07T06:53:06Z,4,New York City
454,2022-01-07T03:55:15Z,388,Online
455,2022-01-07T06:38:38Z,291,Online
456,2022-01-08T12:52:12Z,272,Online


In [0]:
# Check the column types: ORDER_TIMESTAMP is a 'timestamp' here, whereas the
# ORDER_DETAILS table needs an order date, so it is converted in the next cell.
orders.dtypes

[('ORDER_ID', 'int'),
 ('ORDER_TIMESTAMP', 'timestamp'),
 ('CUSTOMER_ID', 'int'),
 ('STORE_NAME', 'string')]

In [0]:
# Start ORDER_DETAILS from orders, truncating ORDER_TIMESTAMP to a calendar date
# (exposed as DATE) with to_date; the remaining columns are carried over as-is.
order_date = to_date(orders['ORDER_TIMESTAMP']).alias('DATE')

order_details = orders.select('ORDER_ID', order_date, 'CUSTOMER_ID', 'STORE_NAME')

In [0]:
# Review order_details now that the timestamp has been reduced to a date.
display(order_details)

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME
447,2022-01-06,355,Online
448,2022-01-06,155,Online
449,2022-01-06,242,Online
450,2022-01-06,49,Online
451,2022-01-06,204,Online
452,2022-01-07,216,Online
453,2022-01-07,4,New York City
454,2022-01-07,388,Online
455,2022-01-07,291,Online
456,2022-01-08,272,Online


In [0]:
# Review the order_items columns that are available for the join.
display(order_items)

ORDER_ID,PRODUCT_ID,UNIT_PRICE,QUANTITY
334,26,48.75,1
334,46,39.16,4
334,12,10.48,4
335,32,5.65,2
336,2,29.55,5
336,20,28.21,5
337,32,5.65,4
337,29,24.71,4
337,45,31.68,3
338,35,7.18,2


In [0]:
# Left join order_details to order_items on ORDER_ID, giving one row per order
# line. Only the order attributes plus UNIT_PRICE and QUANTITY are kept, and the
# result is stored back in `order_details`.
same_order = order_items['order_id'] == order_details['order_id']
order_lines = order_details.join(order_items, same_order, 'left')

order_details = order_lines.select(
    order_details['ORDER_ID'],
    order_details['DATE'],
    order_details['CUSTOMER_ID'],
    order_details['STORE_NAME'],
    order_items['UNIT_PRICE'],
    order_items['QUANTITY'],
)

In [0]:
# Review order_details after the join: one row per order line.
display(order_details)

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,UNIT_PRICE,QUANTITY
447,2022-01-06,355,Online,49.12,4
447,2022-01-06,355,Online,22.98,2
448,2022-01-06,155,Online,5.65,2
449,2022-01-06,242,Online,28.21,4
449,2022-01-06,242,Online,38.34,4
449,2022-01-06,242,Online,31.68,4
450,2022-01-06,49,Online,37.0,2
451,2022-01-06,204,Online,10.24,4
452,2022-01-07,216,Online,49.12,2
452,2022-01-07,216,Online,16.64,2


In [0]:
# Compute the amount of each order line (unit price x quantity) before any
# aggregation, so the later sum adds up per-line totals.
line_amount = order_details['UNIT_PRICE'] * order_details['QUANTITY']
order_details = order_details.withColumn('TOTAL_SALES_AMOUNT', line_amount)

In [0]:
# Review order_details with the per-line TOTAL_SALES_AMOUNT column.
display(order_details)

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,UNIT_PRICE,QUANTITY,TOTAL_SALES_AMOUNT
447,2022-01-06,355,Online,49.12,4,196.48
447,2022-01-06,355,Online,22.98,2,45.96
448,2022-01-06,155,Online,5.65,2,11.3
449,2022-01-06,242,Online,28.21,4,112.84
449,2022-01-06,242,Online,38.34,4,153.36
449,2022-01-06,242,Online,31.68,4,126.72
450,2022-01-06,49,Online,37.0,2,74.0
451,2022-01-06,204,Online,10.24,4,40.96
452,2022-01-07,216,Online,49.12,2,98.24
452,2022-01-07,216,Online,16.64,2,33.28


In [0]:
# Collapse the order lines to one row per order: group by the order attributes,
# sum the per-line amounts and expose the sum as TOTAL_ORDER_AMOUNT.
order_keys = ['ORDER_ID', 'DATE', 'CUSTOMER_ID', 'STORE_NAME']

order_details = (
    order_details
    .groupBy(*order_keys)
    .agg({'TOTAL_SALES_AMOUNT': 'sum'})
    .withColumnRenamed('sum(TOTAL_SALES_AMOUNT)', 'TOTAL_ORDER_AMOUNT')
)

In [0]:
# Review the aggregated order_details: one row per order.
display(order_details)

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,TOTAL_ORDER_AMOUNT
524,2022-01-26,287,Online,28.68
158,2021-10-25,387,Online,200.03
294,2021-11-28,202,Online,220.74
343,2021-12-12,277,Online,213.29
1432,2022-07-14,337,Online,71.74
1544,2022-07-30,234,San Francisco,77.07
1721,2022-08-29,162,Perth,193.34
1748,2022-09-03,176,Online,163.54
746,2022-03-13,263,Online,45.66
802,2022-03-22,381,Online,101.16


In [0]:
# Round TOTAL_ORDER_AMOUNT to 2 decimal places, replacing the column in place.
rounded_amount = round(order_details['TOTAL_ORDER_AMOUNT'], 2)
order_details = order_details.withColumn('TOTAL_ORDER_AMOUNT', rounded_amount)

In [0]:
order_details.display()

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,TOTAL_ORDER_AMOUNT
524,2022-01-26,287,Online,28.68
158,2021-10-25,387,Online,200.03
294,2021-11-28,202,Online,220.74
343,2021-12-12,277,Online,213.29
1432,2022-07-14,337,Online,71.74
1544,2022-07-30,234,San Francisco,77.07
1721,2022-08-29,162,Perth,193.34
1748,2022-09-03,176,Online,163.54
746,2022-03-13,263,Online,45.66
802,2022-03-22,381,Online,101.16


In [0]:
# Write order_details as parquet to the gold layer. The gold container is
# mounted at /mnt/gold and the other gold tables are written under it; the
# previous target (/FileStore/tables/gold/...) is a DBFS FileStore path, not the
# mounted gold container, so the table never reached the gold layer.
order_details.write.parquet('/mnt/gold/order_details', mode='overwrite')


#### MONTHLY_SALES

Create an aggregated table to show the monthly sales total and save it in the gold layer as a parquet file called MONTHLY_SALES.

The table should have two columns:
- MONTH_YEAR - this should be in the format yyyy-MM e.g. 2020-10
- TOTAL_SALES

Display the sales total rounded to 2 dp and sorted in descending date order.

In [0]:
# The DATE column of order_details is the basis for the monthly aggregation.
display(order_details)

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,TOTAL_ORDER_AMOUNT
524,2022-01-26,287,Online,28.68
158,2021-10-25,387,Online,200.03
294,2021-11-28,202,Online,220.74
343,2021-12-12,277,Online,213.29
1432,2022-07-14,337,Online,71.74
1544,2022-07-30,234,San Francisco,77.07
1721,2022-08-29,162,Perth,193.34
1748,2022-09-03,176,Online,163.54
746,2022-03-13,263,Online,45.66
802,2022-03-22,381,Online,101.16


In [0]:
# Derive a MONTH_YEAR column ('yyyy-MM', e.g. 2022-01) from DATE with date_format
# and keep the result in `sales_with_month`, which the monthly tables build on.
# https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.date_format.html?highlight=date_format#pyspark.sql.functions.date_format
month_year = date_format(order_details['DATE'], 'yyyy-MM')
sales_with_month = order_details.withColumn('MONTH_YEAR', month_year)

In [0]:
sales_with_month.display()

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,TOTAL_ORDER_AMOUNT,MONTH_YEAR
524,2022-01-26,287,Online,28.68,2022-01
158,2021-10-25,387,Online,200.03,2021-10
294,2021-11-28,202,Online,220.74,2021-11
343,2021-12-12,277,Online,213.29,2021-12
1432,2022-07-14,337,Online,71.74,2022-07
1544,2022-07-30,234,San Francisco,77.07,2022-07
1721,2022-08-29,162,Perth,193.34,2022-08
1748,2022-09-03,176,Online,163.54,2022-09
746,2022-03-13,263,Online,45.66,2022-03
802,2022-03-22,381,Online,101.16,2022-03


In [0]:
# Total the order amounts per month, round the total to 2 decimal places as
# TOTAL_SALES and list the months from most recent to oldest.
monthly_totals = sales_with_month.groupBy('MONTH_YEAR').sum('TOTAL_ORDER_AMOUNT')

monthly_sales = (
    monthly_totals
    .select('MONTH_YEAR', round('sum(TOTAL_ORDER_AMOUNT)', 2).alias('TOTAL_SALES'))
    .orderBy('MONTH_YEAR', ascending=False)
)

In [0]:
monthly_sales.display()

MONTH_YEAR,TOTAL_SALES
2022-10,8214.0
2022-09,23970.29
2022-08,26180.9
2022-07,32217.2
2022-06,27184.09
2022-05,26848.93
2022-04,23223.25
2022-03,25424.28
2022-02,21488.51
2022-01,18116.82


In [0]:
# Persist monthly_sales to the gold layer as parquet, replacing any previous output.
monthly_sales.write.mode('overwrite').parquet('/mnt/gold/monthly_sales')


#### STORE_MONTHLY_SALES

Create an aggregated table to show the monthly sales total by store and save it in the gold layer as a parquet file called STORE_MONTHLY_SALES.

The table should have three columns:
- MONTH_YEAR - this should be in the format yyyy-MM e.g. 2020-10
- STORE NAME
- TOTAL_SALES

Display the sales total rounded to 2 dp and sorted in descending date order.

In [0]:
# The intermediate sales_with_month DataFrame already holds MONTH_YEAR and
# STORE_NAME, so it can be reused for the per-store aggregation.
display(sales_with_month)

ORDER_ID,DATE,CUSTOMER_ID,STORE_NAME,TOTAL_ORDER_AMOUNT,MONTH_YEAR
524,2022-01-26,287,Online,28.68,2022-01
158,2021-10-25,387,Online,200.03,2021-10
294,2021-11-28,202,Online,220.74,2021-11
343,2021-12-12,277,Online,213.29,2021-12
1432,2022-07-14,337,Online,71.74,2022-07
1544,2022-07-30,234,San Francisco,77.07,2022-07
1721,2022-08-29,162,Perth,193.34,2022-08
1748,2022-09-03,176,Online,163.54,2022-09
746,2022-03-13,263,Online,45.66,2022-03
802,2022-03-22,381,Online,101.16,2022-03


In [0]:
# Same aggregation as the monthly totals, but grouped by STORE_NAME as well as
# MONTH_YEAR; totals are rounded to 2 decimal places and months are listed from
# most recent to oldest.
store_monthly_totals = sales_with_month.groupBy('MONTH_YEAR', 'STORE_NAME').sum('TOTAL_ORDER_AMOUNT')

store_monthly_sales = (
    store_monthly_totals
    .select('MONTH_YEAR', 'STORE_NAME', round('sum(TOTAL_ORDER_AMOUNT)', 2).alias('TOTAL_SALES'))
    .orderBy('MONTH_YEAR', ascending=False)
)

In [0]:
store_monthly_sales.display()

MONTH_YEAR,STORE_NAME,TOTAL_SALES
2022-10,S�o Paulo,268.81
2022-10,Tel Aviv,3484.76
2022-10,Online,933.25
2022-10,Tokyo,1770.14
2022-10,Buenos Aires,462.63
2022-10,Mumbai,96.2
2022-10,Bejing,601.05
2022-10,Seattle,31.44
2022-10,Mexico City,565.72
2022-09,Tel Aviv,972.43


In [0]:
# Persist store_monthly_sales to the gold layer as parquet, replacing any previous output.
store_monthly_sales.write.mode('overwrite').parquet('/mnt/gold/store_monthly_sales')