Project : Supply Chain Orders ETL Pipeline with Delta Lake


**Project Scenario**: This project focuses on developing a supply chain dashboard for an online clothing brand that offers a diverse range of fashion products. The dashboard will analyze historical order data, providing insights to inform purchasing decisions and ensure sufficient inventory levels to meet demand during the holiday season.

The project involves creating and ingesting data into Delta Lake tables, utilizing Databricks notebooks with Python and SQL for data processing and transformation. The final output is a supply chain dashboard designed to support inventory and purchasing decisions. Delta Lake’s functionalities, such as merge operations and time travel, will be leveraged to build a robust and scalable data pipeline.

Upload project JSON files to Databricks file system

In [None]:
# First, check that the "DBFS File Browser" setting is enabled. Navigate to "Settings > Admin > Workspace settings" to check.

### a. Upload the ORDERS JSON files to the Databricks File System

In [None]:
## Upload the files through the UI to this path: dbfs:/FileStore/SupplyChain/ORDERS_RAW/

### b. Check loaded files

In [None]:
# Use Databricks Utilities (dbutils). Documentation : https://docs.databricks.com/dev-tools/databricks-utils.html#ls-command-dbutilsfsls
dbutils.fs.ls("dbfs:/FileStore/SupplyChain/ORDERS_RAW/")


Out[63]: [FileInfo(path='dbfs:/FileStore/SupplyChain/ORDERS_RAW/ORDERS_RAW_PART_001.json', name='ORDERS_RAW_PART_001.json', size=260483, modificationTime=1731157420000),
 FileInfo(path='dbfs:/FileStore/SupplyChain/ORDERS_RAW/ORDERS_RAW_PART_002.json', name='ORDERS_RAW_PART_002.json', size=260437, modificationTime=1731157420000),
 FileInfo(path='dbfs:/FileStore/SupplyChain/ORDERS_RAW/ORDERS_RAW_PART_003.json', name='ORDERS_RAW_PART_003.json', size=260640, modificationTime=1731157421000),
 FileInfo(path='dbfs:/FileStore/SupplyChain/ORDERS_RAW/ORDERS_RAW_PART_004.json', name='ORDERS_RAW_PART_004.json', size=4928, modificationTime=1731157421000),
 FileInfo(path='dbfs:/FileStore/SupplyChain/ORDERS_RAW/UPDATE_ORDERS_RAW.json', name='UPDATE_ORDERS_RAW.json', size=2628, modificationTime=1731162395000)]

Create Delta Table : ORDERS_RAW

### a. Read multiline json files using spark dataframe:

In [None]:
# Read the multi-line JSON part files using the Spark DataFrame API.
# The glob matches only ORDERS_RAW_PART_*.json, so UPDATE_ORDERS_RAW.json is not loaded here.

orders_raw_df = spark.read.option("multiline","true").json("dbfs:/FileStore/SupplyChain/ORDERS_RAW/ORDERS_RAW_PART_*.json")

## Show the DataFrame
orders_raw_df.show(n=5, truncate=False)

## Click on orders_raw_df to check the schema

+---------------+-------------------+-----+-----------+-------------+----------+--------+------------+-----------------+-----------------------+--------+---------------+--------+------------+----------+
|BRAND          |CATEGORY           |COLOR|CUSTOMER_ID|ORDER_COUNTRY|ORDER_DATE|ORDER_ID|ORDER_STATUS|PAYMENT_METHOD   |PRODUCT_NAME           |QUANTITY|SHIPPING_METHOD|SIZE    |SUB-CATEGORY|UNIT_PRICE|
+---------------+-------------------+-----+-----------+-------------+----------+--------+------------+-----------------+-----------------------+--------+---------------+--------+------------+----------+
|Gap            |Men's Clothing     |Navy |2348       |Germany      |2023-01-11|ORD-200 |Shipped     |Cash on Delivery |Classic Cotton T-Shirt |6       |Express        |Size L  |Tops        |24.99     |
|Adidas Kids    |Kids Clothing      |Green|2149       |Mexico       |2023-01-11|ORD-1418|Delivered   |Credit/Debit Card|Green Hooded Sweatshirt|3       |Standard       |Size 14 |Tops      

In [None]:
# Validate the loaded files: count the rows in the DataFrame. The total should be 1510.
orders_raw_df.count()

Out[65]: 1510
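
The read step above relies on the glob `ORDERS_RAW_PART_*.json` to pick up the four part files while leaving `UPDATE_ORDERS_RAW.json` for the later merge step. The sketch below checks that selection against the file names from the listing. It uses Python's `fnmatch` only as an approximation: Spark resolves globs with Hadoop's own glob syntax, which agrees with `fnmatch` for a simple `*` pattern like this one.

```python
from fnmatch import fnmatchcase

# File names copied from the dbutils.fs.ls listing above.
file_names = [
    "ORDERS_RAW_PART_001.json",
    "ORDERS_RAW_PART_002.json",
    "ORDERS_RAW_PART_003.json",
    "ORDERS_RAW_PART_004.json",
    "UPDATE_ORDERS_RAW.json",
]

# Same pattern as the one passed to spark.read.json().
pattern = "ORDERS_RAW_PART_*.json"

# Files the pattern selects, and files it leaves out.
selected = [name for name in file_names if fnmatchcase(name, pattern)]
skipped = [name for name in file_names if name not in selected]

print("selected:", selected)
print("skipped:", skipped)
```

Keeping the update file out of the initial load is what allows it to be applied later as a separate change set.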

Create Delta Table ORDERS_RAW

In [None]:
# First, Create Database SupplyChainDB if it doesn't exist
db = "SupplyChainDB"

spark.sql(f"CREATE DATABASE IF NOT EXISTS {db}")
spark.sql(f"USE {db}")

Out[66]: DataFrame[]

In [None]:
## Create the Delta table ORDERS_RAW in the metastore using the DataFrame's schema and write the data to it
## Documentation : https://docs.delta.io/latest/quick-start.html#create-a-table
orders_raw_df.write.mode("overwrite").format("delta").option("overwriteSchema","true").saveAsTable("ORDERS_RAW")

Show Created Delta Table:

In [None]:
%sql
-- Switch to a SQL cell using %sql
SHOW TABLES

-- Alternatively, you can use Python: display(spark.sql("SHOW TABLES"))

database,tableName,isTemporary
supplychaindb,inventory,False
supplychaindb,orders_gold,False
supplychaindb,orders_raw,False


Validate that the data loaded successfully into Delta table ORDERS_RAW:

In [None]:
%sql
SELECT COUNT(*) FROM ORDERS_RAW


count(1)
1510


Describe the details of the Delta table:

In [None]:
%sql

-- DESCRIBE DETAIL returns basic metadata about a Delta table (format, location, number of files, size in bytes, ...).
DESCRIBE DETAIL ORDERS_RAW

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,091bd459-3a0d-48b6-b8c8-9640eede1fe6,spark_catalog.supplychaindb.orders_raw,,dbfs:/user/hive/warehouse/supplychaindb.db/orders_raw,2024-11-09T13:12:42.821+0000,2024-11-09T15:25:16.000+0000,List(),4,57492,Map(),1,2,"List(appendOnly, invariants)",Map()


Create INVENTORY Delta table

Upload the INVENTORY.json file to DBFS

In [None]:
## Load the file using the UI to this path dbfs:/FileStore/SupplyChain/INVENTORY/

 Read the File using spark dataframe

In [None]:
inventory_df = spark.read.option("multiline","true").json("dbfs:/FileStore/SupplyChain/INVENTORY/INVENTORY.json")

## Show the DataFrame
inventory_df.show(n=5, truncate=False)

+-------+----------+--------------------------+----------+-----+
|BRAND  |COLOR     |PRODUCT_NAME              |SIZE      |STOCK|
+-------+----------+--------------------------+----------+-----+
|J.Crew |Green     |Green Cargo Pants         |Size 32x32|58   |
|Theory |Grey      |Grey Turtleneck Sweater   |Size S    |42   |
|Ray-Ban|Gold/Brown|Classic Aviator Sunglasses|One Size  |53   |
|ASOS   |Black     |Men's Faux Leather Jacket |Size M    |40   |
|Levi's |Light Blue|Distressed Denim Shorts   |Size M    |46   |
+-------+----------+--------------------------+----------+-----+
only showing top 5 rows



Create Delta Table INVENTORY

In [None]:
# First, switch to the SupplyChainDB database (it was created earlier)
db = "SupplyChainDB"
spark.sql(f"USE {db}")

Out[73]: DataFrame[]

In [None]:
## Create INVENTORY Delta Table
inventory_df.write.mode("overwrite").format("delta").saveAsTable("INVENTORY")

Show Created Delta Tables:

In [None]:
%sql
-- Switch to SQL Cell using %sql
SHOW TABLES

database,tableName,isTemporary
supplychaindb,inventory,False
supplychaindb,orders_gold,False
supplychaindb,orders_raw,False


Transform data in delta table

<a href="https://www.databricks.com/glossary/medallion-architecture" target="_blank">Medallion Architecture</a>   
</br>
<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=900/>

Read ORDERS_RAW delta table using spark Dataframe

In [None]:
# Read the Delta table into a Spark DataFrame

ORDERS_Gold_df = spark.read.table("supplychaindb.ORDERS_RAW")

ORDERS_Gold_df.show(n=5, truncate=False)
# Click on ORDERS_Gold_df to see the schema of the table.

+------------+----------------+--------------+-----------+-------------+----------+--------+------------+-----------------+---------------------+--------+---------------+-------+------------+----------+
|BRAND       |CATEGORY        |COLOR         |CUSTOMER_ID|ORDER_COUNTRY|ORDER_DATE|ORDER_ID|ORDER_STATUS|PAYMENT_METHOD   |PRODUCT_NAME         |QUANTITY|SHIPPING_METHOD|SIZE   |SUB-CATEGORY|UNIT_PRICE|
+------------+----------------+--------------+-----------+-------------+----------+--------+------------+-----------------+---------------------+--------+---------------+-------+------------+----------+
|H&M Kids    |Kids Clothing   |Pink and Green|2066       |Hong Kong    |2022-01-21|ORD-1281|Processing  |Credit/Debit Card|Pink Floral Dress    |3       |Standard       |Size 6 |Dresses     |24.99     |
|H&M         |Women's Clothing|Cream         |2254       |Spain        |2022-01-23|ORD-541 |Delivered   |Credit/Debit Card|Women's Faux Fur Coat|4       |Standard       |Size M |Outerwear 

Update ORDER_DATE Column's Data Type

In [None]:
# Use the withColumn method with to_date() to convert ORDER_DATE from a string to a date
# withColumn Documentation : https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html
# TO_DATE() Documentation : https://docs.databricks.com/sql/language-manual/functions/to_date.html

from pyspark.sql.functions import *


ORDERS_Gold_df =  ORDERS_Gold_df.withColumn("ORDER_DATE", to_date(col("ORDER_DATE"),"yyyy-MM-dd"))

Drop Rows with Null Values

In [None]:
# Count Nulls for each column
from pyspark.sql.functions import *

display(ORDERS_Gold_df.select([count(when(col(c).isNull(),c)).alias(c) for c in ORDERS_Gold_df.columns]))

BRAND,CATEGORY,COLOR,CUSTOMER_ID,ORDER_COUNTRY,ORDER_DATE,ORDER_ID,ORDER_STATUS,PAYMENT_METHOD,PRODUCT_NAME,QUANTITY,SHIPPING_METHOD,SIZE,SUB-CATEGORY,UNIT_PRICE
0,0,0,10,10,0,10,0,0,10,10,0,0,0,0


In [None]:
# Remove nulls using dropna(). By default (how="any") it drops every row that contains at least one null value.

ORDERS_Gold_df = ORDERS_Gold_df.dropna()

ORDERS_Gold_df.count()

Out[79]: 1500
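
The null counts above show 10 nulls in each of five columns, and the row count drops from 1510 to 1500 after `dropna()`, which is consistent with the same 10 rows being null in all of those columns. The plain-Python sketch below mimics the two steps (per-column null counts, then dropping any row that has a null) on a few hypothetical rows. It illustrates the semantics of the default `how="any"` only and is not how Spark executes them.

```python
# Hypothetical sample rows; None stands in for a Spark null.
rows = [
    {"ORDER_ID": "ORD-1", "CUSTOMER_ID": 2066, "QUANTITY": 3},
    {"ORDER_ID": None, "CUSTOMER_ID": None, "QUANTITY": None},
    {"ORDER_ID": "ORD-2", "CUSTOMER_ID": 2254, "QUANTITY": 4},
]


def null_counts(rows):
    """Count the null (None) values in each column."""
    columns = rows[0].keys()
    return {c: sum(1 for r in rows if r[c] is None) for c in columns}


def drop_any_null(rows):
    """Keep only rows with no null in any column (like dropna(how='any'))."""
    return [r for r in rows if all(v is not None for v in r.values())]


counts = null_counts(rows)
clean_rows = drop_any_null(rows)

print(counts)
print(len(rows), "->", len(clean_rows))
```

If only some columns should be checked, PySpark's `dropna` also accepts a `subset` argument.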

Add new Column TOTAL_ORDER

In [None]:
#Use withColumn function
#Documentation : https://spark.apache.org/docs/3.1.3/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html


ORDERS_Gold_df= ORDERS_Gold_df.withColumn("TOTAL_ORDER",round(col("QUANTITY")*col("UNIT_PRICE"),2))

# Display ORDERS_Gold_df to validate the creation of the New Column TOTAL_ORDER
display(ORDERS_Gold_df)

BRAND,CATEGORY,COLOR,CUSTOMER_ID,ORDER_COUNTRY,ORDER_DATE,ORDER_ID,ORDER_STATUS,PAYMENT_METHOD,PRODUCT_NAME,QUANTITY,SHIPPING_METHOD,SIZE,SUB-CATEGORY,UNIT_PRICE,TOTAL_ORDER
H&M Kids,Kids Clothing,Pink and Green,2066,Hong Kong,2022-01-21,ORD-1281,Processing,Credit/Debit Card,Pink Floral Dress,3,Standard,Size 6,Dresses,24.99,74.97
H&M,Women's Clothing,Cream,2254,Spain,2022-01-23,ORD-541,Delivered,Credit/Debit Card,Women's Faux Fur Coat,4,Standard,Size M,Outerwear,129.99,519.96
Canada Goose,Men's Clothing,Dark Green,2033,Hong Kong,2022-01-23,ORD-1388,Shipped,Credit/Debit Card,Men's Parka,2,Standard,Size XL,Jackets,999.99,1999.98
Zara Kids,Kids Clothing,Grey,2144,Switzerland,2022-01-24,ORD-1158,Delivered,Credit/Debit Card,Grey Hoodie,1,Standard,Size L,Sweatshirts,39.99,39.99
Canada Goose,Men's Clothing,Green,2001,Germany,2022-01-24,ORD-1351,Shipped,Credit/Debit Card,Green Parka,2,Standard,Size L,Outerwear,699.99,1399.98
Mango,Women's Clothing,Pink/White,2360,South Africa,2022-01-25,ORD-665,Delivered,Credit/Debit Card,Floral Midi Dress,2,Standard,Size 6,Dresses,148.0,296.0
H&M Kids,Kids Clothing,Orange,2128,Canada,2022-01-26,ORD-428,Cancelled,PayPal,Orange Cargo Shorts,5,Standard,Size 8,Shorts,22.99,114.95
Gap,Men's Clothing,Navy,2406,Egypt,2022-01-26,ORD-760,Shipped,Cash on Delivery,Classic Cotton T-Shirt,7,Express,Size L,Tops,24.99,174.93
Zara,Men's Clothing,Green,2365,Mexico,2022-01-26,ORD-384,Cancelled,PayPal,Green Utility Jacket,7,Standard,Size XL,Jackets,129.99,909.93
H&M Kids,Kids Clothing,Gray,2027,Italy,2022-01-27,ORD-567,Shipped,Credit/Debit Card,Gray Sweatshirt,5,Standard,Size 6,Tops,19.99,99.95
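
The TOTAL_ORDER values above can be reproduced by hand as QUANTITY × UNIT_PRICE rounded to two decimals. The sketch below does this for the first three displayed rows. It uses `Decimal` with half-up rounding, which is the rounding mode of Spark's `round`. The notebook itself computes on the numeric types inferred from the JSON, so using `Decimal` here is an assumption made to keep the arithmetic exact.

```python
from decimal import Decimal, ROUND_HALF_UP


def total_order(quantity, unit_price):
    """Return quantity * unit_price rounded half-up to 2 decimal places."""
    exact = Decimal(str(unit_price)) * Decimal(quantity)
    return exact.quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)


# (QUANTITY, UNIT_PRICE) pairs taken from the first three rows displayed above.
samples = [(3, 24.99), (4, 129.99), (2, 999.99)]
totals = [total_order(q, p) for q, p in samples]

for (q, p), t in zip(samples, totals):
    print(f"{q} x {p} = {t}")
```

The results match the TOTAL_ORDER column for those rows (74.97, 519.96, 1999.98).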


Create Delta Table ORDERS_GOLD

In [None]:
# Make sure you are using SupplyChainDB
spark.sql("USE SupplyChainDB")

## Create the Delta table ORDERS_GOLD:

ORDERS_Gold_df.write.mode("overwrite").format("delta").saveAsTable("ORDERS_GOLD")


## Validate that the table was created successfully
display(spark.sql("SHOW TABLES"))

database,tableName,isTemporary
supplychaindb,inventory,False
supplychaindb,orders_gold,False
supplychaindb,orders_raw,False


Read more about the different write options and parameters here: https://docs.delta.io/latest/delta-batch.html#write-to-a-table

* **Append**: atomically adds new data to an existing Delta table.
* **Overwrite**: atomically replaces all the data in a table.

Query Orders Delta table using SQL

### Get Familiar with Orders_Gold dataset

In [None]:
%sql
-- Get the first 30 rows to get familiar with the data
select * from supplychaindb.ORDERS_GOLD limit 30

BRAND,CATEGORY,COLOR,CUSTOMER_ID,ORDER_COUNTRY,ORDER_DATE,ORDER_ID,ORDER_STATUS,PAYMENT_METHOD,PRODUCT_NAME,QUANTITY,SHIPPING_METHOD,SIZE,SUB-CATEGORY,UNIT_PRICE,TOTAL_ORDER
H&M Kids,Kids Clothing,Pink and Green,2066,Hong Kong,2022-01-21,ORD-1281,Processing,Credit/Debit Card,Pink Floral Dress,3,Standard,Size 6,Dresses,24.99,74.97
H&M,Women's Clothing,Cream,2254,Spain,2022-01-23,ORD-541,Delivered,Credit/Debit Card,Women's Faux Fur Coat,4,Standard,Size M,Outerwear,129.99,519.96
Canada Goose,Men's Clothing,Dark Green,2033,Hong Kong,2022-01-23,ORD-1388,Shipped,Credit/Debit Card,Men's Parka,2,Standard,Size XL,Jackets,999.99,1999.98
Zara Kids,Kids Clothing,Grey,2144,Switzerland,2022-01-24,ORD-1158,Delivered,Credit/Debit Card,Grey Hoodie,1,Standard,Size L,Sweatshirts,39.99,39.99
Canada Goose,Men's Clothing,Green,2001,Germany,2022-01-24,ORD-1351,Shipped,Credit/Debit Card,Green Parka,2,Standard,Size L,Outerwear,699.99,1399.98
Mango,Women's Clothing,Pink/White,2360,South Africa,2022-01-25,ORD-665,Delivered,Credit/Debit Card,Floral Midi Dress,2,Standard,Size 6,Dresses,148.0,296.0
H&M Kids,Kids Clothing,Orange,2128,Canada,2022-01-26,ORD-428,Cancelled,PayPal,Orange Cargo Shorts,5,Standard,Size 8,Shorts,22.99,114.95
Gap,Men's Clothing,Navy,2406,Egypt,2022-01-26,ORD-760,Shipped,Cash on Delivery,Classic Cotton T-Shirt,7,Express,Size L,Tops,24.99,174.93
Zara,Men's Clothing,Green,2365,Mexico,2022-01-26,ORD-384,Cancelled,PayPal,Green Utility Jacket,7,Standard,Size XL,Jackets,129.99,909.93
H&M Kids,Kids Clothing,Gray,2027,Italy,2022-01-27,ORD-567,Shipped,Credit/Debit Card,Gray Sweatshirt,5,Standard,Size 6,Tops,19.99,99.95


### KPI-1: Quantity Sold by Country

In [None]:
%sql
-- Don't forget to filter out cancelled orders
select ORDER_COUNTRY, Sum(QUANTITY) as TOTAL_DEMAND from supplychaindb.ORDERS_GOLD WHERE ORDER_STATUS != "Cancelled" GROUP BY ORDER_COUNTRY

ORDER_COUNTRY,TOTAL_DEMAND
Germany,278
France,243
Greece,252
India,229
United States,215
China,211
Italy,181
Norway,244
Spain,213
Denmark,185


Databricks visualization. Run in Databricks to view.

### KPI-2: Sales by Division ($)

In [None]:
%sql
-- Division = CATEGORY
-- Don't forget to filter out cancelled orders
select CATEGORY, ROUND(SUM(TOTAL_ORDER),2) as Revenue from supplychaindb.ORDERS_GOLD WHERE ORDER_STATUS != "Cancelled" GROUP BY CATEGORY ORDER BY Revenue DESC

CATEGORY,Revenue
Men's Clothing,403902.19
Women's Clothing,275863.92
Accessories,99434.47
Kids Clothing,85677.18
Men's Shoes,50567.42
Men's Accessories,31758.91
Women's Accessories,21388.93
Women's Shoes,10349.13
Unisex Accessories,6899.54


Databricks visualization. Run in Databricks to view.

### KPI-3: Top-5 Popular Brands

In [None]:
%sql
-- Order by quantity sold (descending) and limit the result to 5
-- Note: unlike KPI-1 and KPI-2, this query does not exclude cancelled orders
SELECT BRAND, SUM(QUANTITY) as TOTAL_SOLD_ITEMS from supplychaindb.ORDERS_GOLD GROUP BY BRAND ORDER BY TOTAL_SOLD_ITEMS DESC LIMIT 5

BRAND,TOTAL_SOLD_ITEMS
Mango,679
Coach,436
Zara,418
Nike Kids,381
H&M Kids,365


Databricks visualization. Run in Databricks to view.

Create Dashboard

In [None]:
# Use Databricks UI
# 1- Turn results of Previous Queries into visualisations
# 2- Create Dashboard and add Visualisations

Add Monthly Sales Trend to your Dashboard

### KPI-4: Monthly Sales Trend (In QTY)

In [None]:
%sql
-- Use DATE_TRUNC()
select date_trunc('month',ORDER_DATE) as Month, SUM(QUANTITY) FROM supplychaindb.ORDERS_GOLD WHERE ORDER_STATUS != "Cancelled" GROUP BY 1 ORDER BY 1 ASC

Month,sum(QUANTITY)
2022-01-01T00:00:00.000+0000,61
2022-02-01T00:00:00.000+0000,156
2022-03-01T00:00:00.000+0000,192
2022-04-01T00:00:00.000+0000,167
2022-05-01T00:00:00.000+0000,200
2022-06-01T00:00:00.000+0000,198
2022-07-01T00:00:00.000+0000,433
2022-08-01T00:00:00.000+0000,332
2022-09-01T00:00:00.000+0000,479
2022-10-01T00:00:00.000+0000,456


Databricks visualization. Run in Databricks to view.
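
The monthly trend query groups orders by the first day of their month (`date_trunc('month', ...)`) and sums the quantities of non-cancelled orders. The plain-Python sketch below applies the same logic to a few hypothetical order rows. It is only meant to show what the grouping key looks like, and the sample data is invented.

```python
from collections import defaultdict
from datetime import date

# Hypothetical rows: (ORDER_DATE, ORDER_STATUS, QUANTITY).
orders = [
    (date(2022, 1, 21), "Processing", 3),
    (date(2022, 1, 26), "Cancelled", 5),
    (date(2022, 2, 3), "Delivered", 2),
    (date(2022, 2, 17), "Shipped", 4),
]


def month_start(d):
    """Truncate a date to the first day of its month."""
    return d.replace(day=1)


# Sum QUANTITY per month, skipping cancelled orders.
monthly_qty = defaultdict(int)
for order_date, status, quantity in orders:
    if status != "Cancelled":
        monthly_qty[month_start(order_date)] += quantity

# Sort ascending by month, like ORDER BY 1 ASC.
trend = sorted(monthly_qty.items())
for month, qty in trend:
    print(month.isoformat(), qty)
```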

Update Data in Orders table using Merge

<img src="https://databricks.com/wp-content/uploads/2020/09/delta-lake-medallion-model-scaled.jpg" width=1012/>

Upload Json files into DBFS

Use the UI to upload the file "UPDATE_ORDERS_RAW.json" into DBFS, in the same folder as before: dbfs:/FileStore/SupplyChain/ORDERS_RAW/

Read file using Spark dataframe

In [None]:
# Read the multi-line JSON file UPDATE_ORDERS_RAW.json
Update_orders_df = spark.read.option("multiline","true").json("dbfs:/FileStore/SupplyChain/ORDERS_RAW/UPDATE_ORDERS_RAW.json")

## Show the DataFrame
display(Update_orders_df)

BRAND,CATEGORY,COLOR,CUSTOMER_ID,ORDER_COUNTRY,ORDER_DATE,ORDER_ID,ORDER_STATUS,PAYMENT_METHOD,PRODUCT_NAME,QUANTITY,SHIPPING_METHOD,SIZE,SUB-CATEGORY,UNIT_PRICE
H&M Kids,Kids Clothing,Pink and Green,2066,Hong Kong,2022-01-21,ORD-1281,Delivered,Credit/Debit Card,Pink Floral Dress,4,Standard,Size 6,Dresses,24.99
Mango,Women's Clothing,Black,2023,Saudi Arabia,2022-01-28,ORD-829,Delivered,Credit/Debit Card,Women's Leather Moto Jacket,3,Standard,Size S,Jackets,199.99
Madewell,Women's Clothing,Blue,2041,Saudi Arabia,2022-01-28,ORD-193,Delivered,Cash on Delivery,Blue Denim Jacket,3,Standard,Size M,Jackets,99.99
Barbour,Men's Clothing,Navy,2074,Norway,2022-05-29,ORD-826,Cancelled,Credit/Debit Card,Men's Quilted Jacket,0,Standard,Size L,Jackets,299.99
Gap Kids,Kids Clothing,Red,2393,Saudi Arabia,2022-05-30,ORD-842,Cancelled,Credit/Debit Card,Red Graphic T-shirt,0,Standard,Size 8,Tops,14.99


-->Check the original data **BEFORE MERGE**

In [None]:
%sql
select ORDER_ID,ORDER_STATUS,Quantity from Supplychaindb.ORDERS_RAW WHERE ORDER_ID in ("ORD-1281","ORD-829","ORD-193","ORD-826","ORD-842")

ORDER_ID,ORDER_STATUS,Quantity
ORD-1281,Processing,3
ORD-829,Processing,3
ORD-193,Shipped,1
ORD-826,Processing,10
ORD-842,Processing,10


Update Orders_RAW deltatable using Merge

In [None]:
%sql
DESCRIBE DETAIL supplychaindb.ORDERS_RAW

format,id,name,description,location,createdAt,lastModified,partitionColumns,numFiles,sizeInBytes,properties,minReaderVersion,minWriterVersion,tableFeatures,statistics
delta,091bd459-3a0d-48b6-b8c8-9640eede1fe6,spark_catalog.supplychaindb.orders_raw,,dbfs:/user/hive/warehouse/supplychaindb.db/orders_raw,2024-11-09T13:12:42.821+0000,2024-11-09T15:25:16.000+0000,List(),4,57492,Map(),1,2,"List(appendOnly, invariants)",Map()


In [None]:
from delta.tables import *

# Interact with the Delta table programmatically through the delta.tables.DeltaTable class.
# forPath takes the table location reported by DESCRIBE DETAIL above.
delta_orders_raw =  DeltaTable.forPath(spark,'dbfs:/user/hive/warehouse/supplychaindb.db/orders_raw')

In [None]:
## Merge the updates into the Delta table ORDERS_RAW
# Documentation: https://docs.delta.io/latest/delta-update.html#language-python

delta_orders_raw.alias("ORDERS_RAW").merge(Update_orders_df.alias("UpdateOrders"),
                                          "ORDERS_RAW.ORDER_ID = UpdateOrders.ORDER_ID")\
                                          .whenMatchedUpdateAll()\
                                          .whenNotMatchedInsertAll()\
                                          .execute()

# A MERGE statement must contain at least one WHEN clause.

--> Check the updated rows **AFTER MERGE**

In [None]:
%sql
select ORDER_ID,ORDER_STATUS,Quantity from SUPPLYCHAINDB.ORDERS_RAW WHERE ORDER_ID in ("ORD-1281","ORD-829","ORD-193","ORD-826","ORD-842")

ORDER_ID,ORDER_STATUS,Quantity
ORD-1281,Delivered,4
ORD-829,Delivered,3
ORD-193,Delivered,3
ORD-826,Cancelled,0
ORD-842,Cancelled,0
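
The merge above behaves as an upsert keyed on ORDER_ID: source rows whose ORDER_ID already exists in the target overwrite the target row, and all other source rows are inserted. The plain-Python sketch below replays that logic on the five orders shown in the BEFORE and AFTER queries, restricted to the two columns those queries display. `merge_upsert` is a hypothetical helper written for illustration, not part of the Delta Lake API.

```python
# Target rows before the merge, keyed by ORDER_ID (values from the BEFORE MERGE query).
target = {
    "ORD-1281": {"ORDER_STATUS": "Processing", "QUANTITY": 3},
    "ORD-829": {"ORDER_STATUS": "Processing", "QUANTITY": 3},
    "ORD-193": {"ORDER_STATUS": "Shipped", "QUANTITY": 1},
    "ORD-826": {"ORDER_STATUS": "Processing", "QUANTITY": 10},
    "ORD-842": {"ORDER_STATUS": "Processing", "QUANTITY": 10},
}

# Source rows from UPDATE_ORDERS_RAW.json (same two columns only).
updates = {
    "ORD-1281": {"ORDER_STATUS": "Delivered", "QUANTITY": 4},
    "ORD-829": {"ORDER_STATUS": "Delivered", "QUANTITY": 3},
    "ORD-193": {"ORDER_STATUS": "Delivered", "QUANTITY": 3},
    "ORD-826": {"ORDER_STATUS": "Cancelled", "QUANTITY": 0},
    "ORD-842": {"ORDER_STATUS": "Cancelled", "QUANTITY": 0},
}


def merge_upsert(target, source):
    """Update rows whose key matches, insert the rest; return the new table and counts."""
    merged = dict(target)
    updated, inserted = 0, 0
    for key, row in source.items():
        if key in merged:
            updated += 1   # whenMatchedUpdateAll
        else:
            inserted += 1  # whenNotMatchedInsertAll
        merged[key] = dict(row)
    return merged, updated, inserted


merged, updated, inserted = merge_upsert(target, updates)
print(updated, "updated,", inserted, "inserted")
for order_id, row in merged.items():
    print(order_id, row["ORDER_STATUS"], row["QUANTITY"])
```

All five source rows match an existing order, so the sketch reports five updates and no inserts, in line with the AFTER MERGE result.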


In [None]:
%sql
SELECT COUNT(*)
FROM supplychaindb.orders_gold
WHERE ORDER_STATUS = "Cancelled"

count(1)
178


Query previous versions of delta table using **Time Travel**

### a. Describe Delta Table History:

In [None]:
%sql
-- Check Table History
DESCRIBE HISTORY supplychaindb.ORDERS_RAW
-- Use the UI to see Delta Table History

version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2024-11-09T15:26:03.000+0000,1178705447765075,phanvantien.sgn@gmail.com,MERGE,"Map(predicate -> [""(ORDER_ID#17385 = ORDER_ID#17119)""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2312249111136527),1109-125726-s3y4aw7u,2.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 495, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 22755, numTargetBytesRemoved -> 17717, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 5, executionTimeMs -> 5967, materializeSourceTimeMs -> 206, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 2082, numTargetRowsUpdated -> 5, numOutputRows -> 500, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 5, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 3402)",,Databricks-Runtime/12.2.x-scala2.12
2,2024-11-09T15:25:16.000+0000,1178705447765075,phanvantien.sgn@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2312249111136527),1109-125726-s3y4aw7u,1.0,WriteSerializable,False,"Map(numFiles -> 4, numOutputRows -> 1510, numOutputBytes -> 57492)",,Databricks-Runtime/12.2.x-scala2.12
1,2024-11-09T14:39:18.000+0000,1178705447765075,phanvantien.sgn@gmail.com,MERGE,"Map(predicate -> [""(ORDER_ID#8836 = ORDER_ID#8570)""], matchedPredicates -> [{""actionType"":""update""}], notMatchedPredicates -> [{""actionType"":""insert""}], notMatchedBySourcePredicates -> [])",,List(2312249111136527),1109-125726-s3y4aw7u,0.0,WriteSerializable,False,"Map(numTargetRowsCopied -> 495, numTargetRowsDeleted -> 0, numTargetFilesAdded -> 2, numTargetBytesAdded -> 22755, numTargetBytesRemoved -> 17717, numTargetDeletionVectorsAdded -> 0, numTargetRowsMatchedUpdated -> 5, executionTimeMs -> 9992, materializeSourceTimeMs -> 436, numTargetRowsInserted -> 0, numTargetRowsMatchedDeleted -> 0, scanTimeMs -> 4666, numTargetRowsUpdated -> 5, numOutputRows -> 500, numTargetDeletionVectorsRemoved -> 0, numTargetRowsNotMatchedBySourceUpdated -> 0, numTargetChangeFilesAdded -> 0, numSourceRows -> 5, numTargetFilesRemoved -> 1, numTargetRowsNotMatchedBySourceDeleted -> 0, rewriteTimeMs -> 4370)",,Databricks-Runtime/12.2.x-scala2.12
0,2024-11-09T13:12:58.000+0000,1178705447765075,phanvantien.sgn@gmail.com,CREATE OR REPLACE TABLE AS SELECT,"Map(isManaged -> true, description -> null, partitionBy -> [], properties -> {})",,List(2312249111136527),1109-125726-s3y4aw7u,,WriteSerializable,False,"Map(numFiles -> 4, numOutputRows -> 1510, numOutputBytes -> 57492)",,Databricks-Runtime/12.2.x-scala2.12


 Using SQL:

In [None]:
%sql
 select ORDER_ID,ORDER_STATUS,Quantity from SUPPLYCHAINDB.ORDERS_RAW VERSION AS OF 1 WHERE ORDER_ID in ("ORD-1281","ORD-829","ORD-193","ORD-826","ORD-842")

-- Change the version number to see different versions of the Delta table

ORDER_ID,ORDER_STATUS,Quantity
ORD-1281,Delivered,4
ORD-829,Delivered,3
ORD-193,Delivered,3
ORD-826,Cancelled,0
ORD-842,Cancelled,0
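
Conceptually, every write to a Delta table produces a new numbered version, and `VERSION AS OF n` reads the table as it was after commit n. The sketch below models that idea with a hypothetical `VersionedTable` class that keeps a full snapshot per commit. This is a simplification for illustration: Delta Lake reconstructs each version from its transaction log and data files rather than storing complete copies.

```python
class VersionedTable:
    """Toy model of a versioned table: one stored snapshot per commit."""

    def __init__(self):
        self._snapshots = []

    def commit(self, rows):
        """Store a new snapshot and return its version number (starting at 0)."""
        self._snapshots.append(dict(rows))
        return len(self._snapshots) - 1

    def read(self, version=None):
        """Read the given version, or the latest one if no version is given."""
        if version is None:
            version = len(self._snapshots) - 1
        return dict(self._snapshots[version])


table = VersionedTable()

# Version 0: initial load. Version 1: state after the merge (ORD-1281 values from above).
v0 = table.commit({"ORD-1281": ("Processing", 3)})
v1 = table.commit({"ORD-1281": ("Delivered", 4)})

print("version", v0, table.read(v0))
print("version", v1, table.read(v1))
print("latest  ", table.read())
```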


Using Spark dataframe:

In [None]:
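# Time travel: read version 1 of the table (version numbers are listed by DESCRIBE HISTORY).
# To travel by time instead, use option("timestampAsOf", <timestamp>) with a timestamp from the table history.
version_1 = spark.read.format("delta").option("versionAsOf", 1).table("SUPPLYCHAINDB.ORDERS_RAW")
display(version_1)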

BRAND,CATEGORY,COLOR,CUSTOMER_ID,ORDER_COUNTRY,ORDER_DATE,ORDER_ID,ORDER_STATUS,PAYMENT_METHOD,PRODUCT_NAME,QUANTITY,SHIPPING_METHOD,SIZE,SUB-CATEGORY,UNIT_PRICE
H&M,Women's Clothing,Cream,2254.0,Spain,2022-01-23,ORD-541,Delivered,Credit/Debit Card,Women's Faux Fur Coat,4.0,Standard,Size M,Outerwear,129.99
Canada Goose,Men's Clothing,Dark Green,2033.0,Hong Kong,2022-01-23,ORD-1388,Shipped,Credit/Debit Card,Men's Parka,2.0,Standard,Size XL,Jackets,999.99
Zara Kids,Kids Clothing,Grey,2144.0,Switzerland,2022-01-24,ORD-1158,Delivered,Credit/Debit Card,Grey Hoodie,1.0,Standard,Size L,Sweatshirts,39.99
Canada Goose,Men's Clothing,Green,2001.0,Germany,2022-01-24,ORD-1351,Shipped,Credit/Debit Card,Green Parka,2.0,Standard,Size L,Outerwear,699.99
Mango,Women's Clothing,Pink/White,2360.0,South Africa,2022-01-25,ORD-665,Delivered,Credit/Debit Card,Floral Midi Dress,2.0,Standard,Size 6,Dresses,148.0
H&M Kids,Kids Clothing,Orange,2128.0,Canada,2022-01-26,ORD-428,Cancelled,PayPal,Orange Cargo Shorts,5.0,Standard,Size 8,Shorts,22.99
Gap,Men's Clothing,Navy,2406.0,Egypt,2022-01-26,ORD-760,Shipped,Cash on Delivery,Classic Cotton T-Shirt,7.0,Express,Size L,Tops,24.99
Zara,Men's Clothing,Green,2365.0,Mexico,2022-01-26,ORD-384,Cancelled,PayPal,Green Utility Jacket,7.0,Standard,Size XL,Jackets,129.99
H&M Kids,Kids Clothing,Gray,2027.0,Italy,2022-01-27,ORD-567,Shipped,Credit/Debit Card,Gray Sweatshirt,5.0,Standard,Size 6,Tops,19.99
J.Crew,Men's Clothing,Green,2092.0,United Kingdom,2022-01-27,ORD-1590,Delivered,Credit/Debit Card,Green Cargo Pants,1.0,Standard,Size 32x32,Pants,89.99


Join the ORDERS_GOLD and INVENTORY Delta tables to find the list of Low-Stock or Out-of-Stock Items

**Goal** is to find the list of Low-Stock or Out-of-Stock Items and Add the result to your SupplyChain Dashboard<br />


In [None]:
%sql
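-- Aggregate the quantity sold per product variant and compare it with the available stock.
-- Join on all four product attributes, including O.PRODUCT_NAME = I.PRODUCT_NAME,
-- so that different products sharing a brand, color, and size are not matched to the same inventory row.
SELECT * FROM
(
  SELECT O.BRAND, O.PRODUCT_NAME, O.COLOR, O.SIZE, SUM(O.QUANTITY) AS QTY_SOLD, I.STOCK, (I.STOCK - SUM(O.QUANTITY)) AS QTY_LEFT_STOCK
  FROM supplychaindb.ORDERS_GOLD O INNER JOIN supplychaindb.INVENTORY I
  ON O.BRAND = I.BRAND AND O.PRODUCT_NAME = I.PRODUCT_NAME AND O.COLOR = I.COLOR AND O.SIZE = I.SIZE
  WHERE O.ORDER_STATUS != "Cancelled"
  GROUP BY O.BRAND, O.COLOR, O.PRODUCT_NAME, O.SIZE, I.STOCK
) AS STOCK
WHERE STOCK.QTY_LEFT_STOCK < 20
ORDER BY STOCK.QTY_LEFT_STOCK ASC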

BRAND,PRODUCT_NAME,COLOR,SIZE,QTY_SOLD,STOCK,QTY_LEFT_STOCK
ASOS,Men's Faux Leather Jacket,Black,Size M,37,40,3
Gap,Classic Cotton T-Shirt,Navy,Size L,40,44,4
Theory,Grey Turtleneck Sweater,Grey,Size S,37,42,5
J.Crew,Green Cargo Pants,Green,Size 32x32,52,58,6
Ray-Ban,Classic Aviator Sunglasses,Gold/Brown,One Size,46,53,7
Levi's,Distressed Denim Shorts,Light Blue,Size M,36,46,10
Steve Madden,Lace-Up Combat Boots,Black,Size 8,55,65,10
Coach,Leather Crossbody Bag,Black,One Size,40,53,13
Coach,Black Leather Crossbody Bag,Black,One Size,39,53,14
Nike Kids,Gray Joggers,Gray,Size 12,36,50,14


Databricks visualization. Run in Databricks to view.
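
The low-stock logic (match each order line to its inventory row on brand, product name, color, and size; sum the non-cancelled quantities; subtract from stock; keep items with fewer than 20 units left) can be sketched in plain Python. The two inventory entries below come from the INVENTORY preview earlier in the notebook. The order lines and the `low_stock` helper are hypothetical and exist only to exercise the logic.

```python
from collections import defaultdict

# Inventory keyed by (BRAND, PRODUCT_NAME, COLOR, SIZE) -> STOCK (from the INVENTORY preview).
inventory = {
    ("ASOS", "Men's Faux Leather Jacket", "Black", "Size M"): 40,
    ("Theory", "Grey Turtleneck Sweater", "Grey", "Size S"): 42,
}

# Hypothetical order lines: (BRAND, PRODUCT_NAME, COLOR, SIZE, ORDER_STATUS, QUANTITY).
orders = [
    ("ASOS", "Men's Faux Leather Jacket", "Black", "Size M", "Delivered", 30),
    ("ASOS", "Men's Faux Leather Jacket", "Black", "Size M", "Cancelled", 5),
    ("ASOS", "Men's Faux Leather Jacket", "Black", "Size M", "Shipped", 7),
    ("Theory", "Grey Turtleneck Sweater", "Grey", "Size S", "Delivered", 10),
    # Same brand, color, and size but a different product: no inventory match.
    ("Theory", "Grey Wool Coat", "Grey", "Size S", "Delivered", 20),
]


def low_stock(orders, inventory, threshold=20):
    """Return (key, qty_sold, stock, qty_left) for items with qty_left below threshold."""
    sold = defaultdict(int)
    for brand, product, color, size, status, qty in orders:
        key = (brand, product, color, size)
        # Inner join on all four attributes, ignoring cancelled orders.
        if status != "Cancelled" and key in inventory:
            sold[key] += qty
    report = [(key, qty, inventory[key], inventory[key] - qty) for key, qty in sold.items()]
    # Keep low-stock items, lowest remaining stock first.
    return sorted((r for r in report if r[3] < threshold), key=lambda r: r[3])


result = low_stock(orders, inventory)
for key, qty_sold, stock, qty_left in result:
    print(key, qty_sold, stock, qty_left)
```

Including the product name in the key matters: without it, the hypothetical "Grey Wool Coat" line would be counted against the turtleneck sweater's stock.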

### e. Turn the result into a Visualisation (Table) and Add it to SupplyChain Dashboard

In [None]:
# Use Databricks UI to Turn results into a visualisation and then add it to your SupplyChain Dashboard

# **END OF PROJECT**