In [0]:
from pyspark.sql import SparkSession

# Initialize the Spark session (getOrCreate reuses an already-active session).
spark = (
    SparkSession.builder
    .appName("DeltaLakeExample")
    .getOrCreate()
)

In [0]:
# List the contents of the "First" workspace folder.
first_dir = "dbfs:/Workspace/Users/databricks001@techademy9056.onmicrosoft.com/First"
display(dbutils.fs.ls(first_dir))

path,name,size,modificationTime
dbfs:/Workspace/Users/databricks001@techademy9056.onmicrosoft.com/First/Sample/,Sample/,0,1719261112000
dbfs:/Workspace/Users/databricks001@techademy9056.onmicrosoft.com/First/Temple/,Temple/,0,1719261440000


In [0]:
# Load the raw sales data from CSV: the first line is treated as the header
# and column types are inferred from the data.
path_to_file = "file:/Workspace/Users/databricks001@techademy9056.onmicrosoft.com/Third/data.csv"
raw_df = (
    spark.read
    .format("csv")
    .options(header="true", inferSchema="true")
    .load(path_to_file)
)

In [0]:
# Preview the first five rows of the raw data.
preview_rows = 5
raw_df.head(preview_rows)

[Row(transaction_id=1, product_id=101, quantity=2, price=10.5, sale_date=datetime.date(2023, 6, 1)),
 Row(transaction_id=2, product_id=102, quantity=1, price=20.0, sale_date=datetime.date(2023, 6, 1)),
 Row(transaction_id=3, product_id=103, quantity=5, price=5.0, sale_date=datetime.date(2023, 6, 2)),
 Row(transaction_id=4, product_id=101, quantity=3, price=10.5, sale_date=datetime.date(2023, 6, 2)),
 Row(transaction_id=5, product_id=104, quantity=None, price=2.0, sale_date=datetime.date(2023, 6, 3))]

In [0]:
# Persist the raw data as the bronze Delta table, overwriting any previous contents.
bronze_writer = raw_df.write.format("delta").mode("overwrite")
bronze_writer.save("/delta/bronze/sales")



In [0]:
%sh
# Print the shell's current working directory.
# The expansion is quoted so a path containing spaces or glob characters
# is printed unchanged instead of being word-split by the shell.
path=$(pwd)
echo "$path"

/Workspace/Users/databricks001@techademy9056.onmicrosoft.com/Third


In [0]:
# Load the raw sales records back from the bronze Delta table.
bronze_df = spark.read.load("/delta/bronze/sales", format="delta")

In [0]:
# Clean and transform the bronze data:
#   1. drop duplicate rows, keyed on transaction_id only (rows that share every
#      other column but carry different ids are both kept);
#   2. keep only rows where quantity, price and sale_date are all present.
clean_df = (
    bronze_df
    .dropDuplicates(["transaction_id"])
    .filter("quantity is not null and price is not null AND sale_date IS NOT NULL")
)

# Store the cleaned data as the silver Delta table.
clean_df.write.format("delta").mode("overwrite").save("/delta/silver/sales")

In [0]:
# Import the Spark SQL functions under the conventional `F` alias rather than
# importing `sum` by name, which would shadow Python's builtin `sum` for every
# cell that runs afterwards.
from pyspark.sql import functions as F

# Read the cleaned data from the silver table.
silver_df = spark.read.format("delta").load("/delta/silver/sales")

# Aggregate the data per product: total quantity sold and total sales
# (quantity * price summed over all of the product's transactions).
agg_df = silver_df.groupBy("product_id").agg(
    F.sum(F.col("quantity")).alias("total_quantity"),
    F.sum(F.col("quantity") * F.col("price")).alias("total_sales"),
)

# Write the summary to the gold table in Delta Lake, replacing previous contents.
agg_df.write.format("delta").mode("overwrite").save("/delta/gold/sales_summary")

In [0]:
# Read the per-product summary back from the gold Delta table.
gold_path = "/delta/gold/sales_summary"
gold_df = spark.read.format("delta").load(gold_path)

In [0]:
# Print the raw data as loaded from the CSV file (up to 20 rows, long values truncated).
raw_df.show(n=20, truncate=True)

+--------------+----------+--------+-----+----------+
|transaction_id|product_id|quantity|price| sale_date|
+--------------+----------+--------+-----+----------+
|             1|       101|       2| 10.5|2023-06-01|
|             2|       102|       1| 20.0|2023-06-01|
|             3|       103|       5|  5.0|2023-06-02|
|             4|       101|       3| 10.5|2023-06-02|
|             5|       104|    NULL|  2.0|2023-06-03|
|             6|       105|       7| NULL|2023-06-03|
|             7|       101|       3| 10.5|2023-06-02|
|             8|       106|       2| 15.0|2023-06-04|
|             9|       107|       1| 25.0|      NULL|
|            10|       108|       1| 30.0|2023-06-05|
+--------------+----------+--------+-----+----------+



In [0]:
# Print the bronze table contents (up to 20 rows, long values truncated).
bronze_df.show(n=20, truncate=True)

+--------------+----------+--------+-----+----------+
|transaction_id|product_id|quantity|price| sale_date|
+--------------+----------+--------+-----+----------+
|             1|       101|       2| 10.5|2023-06-01|
|             2|       102|       1| 20.0|2023-06-01|
|             3|       103|       5|  5.0|2023-06-02|
|             4|       101|       3| 10.5|2023-06-02|
|             5|       104|    NULL|  2.0|2023-06-03|
|             6|       105|       7| NULL|2023-06-03|
|             7|       101|       3| 10.5|2023-06-02|
|             8|       106|       2| 15.0|2023-06-04|
|             9|       107|       1| 25.0|      NULL|
|            10|       108|       1| 30.0|2023-06-05|
+--------------+----------+--------+-----+----------+



In [0]:
# Print the silver table contents (up to 20 rows, long values truncated).
silver_df.show(n=20, truncate=True)

+--------------+----------+--------+-----+----------+
|transaction_id|product_id|quantity|price| sale_date|
+--------------+----------+--------+-----+----------+
|             1|       101|       2| 10.5|2023-06-01|
|             3|       103|       5|  5.0|2023-06-02|
|             4|       101|       3| 10.5|2023-06-02|
|             8|       106|       2| 15.0|2023-06-04|
|             7|       101|       3| 10.5|2023-06-02|
|            10|       108|       1| 30.0|2023-06-05|
|             2|       102|       1| 20.0|2023-06-01|
+--------------+----------+--------+-----+----------+



In [0]:
# Print the gold per-product summary (up to 20 rows, long values truncated).
gold_df.show(n=20, truncate=True)

+----------+--------------+-----------+
|product_id|total_quantity|total_sales|
+----------+--------------+-----------+
|       108|             1|       30.0|
|       101|             8|       84.0|
|       103|             5|       25.0|
|       102|             1|       20.0|
|       106|             2|       30.0|
+----------+--------------+-----------+

