In [0]:
%sql
CREATE CATALOG IF NOT EXISTS my_catalog;
CREATE SCHEMA IF NOT EXISTS my_catalog.my_schema;
CREATE VOLUME IF NOT EXISTS my_catalog.my_schema.session4new;

In [0]:
from pyspark.sql.functions import col, current_timestamp

base_path = "/Volumes/my_catalog/my_schema/session4new"
bronze_input = f"{base_path}/bronze_sales"
bronze_output = f"{base_path}/bronze_delta"
bronze_schema = f"{base_path}/bronze_schema"

bronze_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.schemaLocation", bronze_schema)
    .option("mergeSchema", "true")
    .option("header", "true")
    .load(bronze_input)
    .withColumn("ingestion_timestamp", current_timestamp())
    .withColumn("source_file", col("_metadata.file_path"))
)

(
    bronze_df.writeStream.format("delta")
    .option("checkpointLocation", f"{bronze_output}/_checkpoint")
    .trigger(availableNow=True)
    .outputMode("append")
    .start(bronze_output)
    .awaitTermination()
)


In [0]:

display(spark.read.format("delta").load(f"{base_path}/bronze_delta"))


product_id,product_name,store,quantity,price,sale_date,_rescued_data,ingestion_timestamp,source_file
101,Soap,Store_A,12,25.5,2025-10-02,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-02.csv
102,Shampoo,Store_B,7,120.0,2025-10-02,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-02.csv
103,Toothpaste,Store_B,10,55.0,2025-10-02,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-02.csv
104,Conditioner,Store_C,3,150.0,2025-10-02,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-02.csv
101,Soap,Store_A,10,25.5,2025-10-01,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-01.csv
102,Shampoo,Store_A,5,120.0,2025-10-01,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-01.csv
103,Toothpaste,Store_B,8,55.0,2025-10-01,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-01.csv
104,Conditioner,Store_C,2,150.0,2025-10-01,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-01.csv
101,Soap,Store_A,15,25.5,2025-10-03,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-03.csv
102,Shampoo,Store_B,4,120.0,2025-10-03,,2025-10-28T14:55:43.814Z,/Volumes/my_catalog/my_schema/session4new/bronze_sales/sales_2025-10-03.csv


In [0]:
from pyspark.sql.functions import col, round

# Paths
bronze_output = "/Volumes/my_catalog/my_schema/session4new/bronze_delta"
silver_output = "/Volumes/my_catalog/my_schema/session4new/silver_delta"

# 1 Read data from Bronze Delta
bronze_df = spark.read.format("delta").load(bronze_output)

# 2 Clean + Transform
silver_df = (
    bronze_df
    .dropna(subset=["product_id", "product_name", "store", "quantity", "price", "sale_date"])
    .withColumn("quantity", col("quantity").cast("int"))
    .withColumn("price", col("price").cast("double"))
    .withColumn("total_sales", col("quantity") * col("price"))
)

# 3 Write to Silver Delta table
(
    silver_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save(silver_output)
)

# 4 Display result
display(silver_df)


product_id,product_name,store,quantity,price,sale_date,_rescued_data,ingestion_timestamp,source_file,total_sales
101,Soap,Store_A,12,25.5,2025-10-02,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-02.csv,306.0
102,Shampoo,Store_B,7,120.0,2025-10-02,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-02.csv,840.0
103,Toothpaste,Store_B,10,55.0,2025-10-02,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-02.csv,550.0
104,Conditioner,Store_C,3,150.0,2025-10-02,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-02.csv,450.0
101,Soap,Store_A,10,25.5,2025-10-01,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-01.csv,255.0
102,Shampoo,Store_A,5,120.0,2025-10-01,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-01.csv,600.0
103,Toothpaste,Store_B,8,55.0,2025-10-01,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-01.csv,440.0
104,Conditioner,Store_C,2,150.0,2025-10-01,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-01.csv,300.0
101,Soap,Store_A,15,25.5,2025-10-03,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-03.csv,382.5
102,Shampoo,Store_B,4,120.0,2025-10-03,,2025-10-28T14:16:45.854Z,/Volumes/my_catalog/my_schema/session4new/sales_2025-10-03.csv,480.0


In [0]:
from pyspark.sql.functions import col, sum as _sum, when

# Paths
silver_output = "/Volumes/my_catalog/my_schema/session4new/silver_delta"
gold_output = "/Volumes/my_catalog/my_schema/session4new/gold_delta"

# 1 Read from Silver Delta table
silver_df = spark.read.format("delta").load(silver_output)

# 2 Aggregate total sales per store & date
gold_df = (
    silver_df.groupBy("store", "sale_date")
    .agg(_sum("total_sales").alias("daily_total_sales"))
    .withColumn(
        "performance_flag",
        when(col("daily_total_sales") > 50000, "High Performing").otherwise("Normal")
    )
)

# 3 Write Gold Layer Delta table (partition by store)
(
    gold_df.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("store")
    .save(gold_output)
)

# 4 Display Gold table result
display(gold_df)


store,sale_date,daily_total_sales,performance_flag
Store_C,2025-10-03,1245.0,Normal
Store_A,2025-10-01,855.0,Normal
Store_C,2025-10-01,300.0,Normal
Store_B,2025-10-03,480.0,Normal
Store_C,2025-10-02,450.0,Normal
Store_A,2025-10-02,306.0,Normal
Store_A,2025-10-03,382.5,Normal
Store_B,2025-10-02,1390.0,Normal
Store_B,2025-10-01,440.0,Normal


In [0]:
%sql
OPTIMIZE delta.`/Volumes/my_catalog/my_schema/session4new/gold_delta`;

VACUUM delta.`/Volumes/my_catalog/my_schema/session4new/gold_delta` RETAIN 169 HOURS;


path
dbfs:/Volumes/my_catalog/my_schema/session4new/gold_delta


In [0]:
%sql
DESCRIBE HISTORY delta.`/Volumes/my_catalog/my_schema/session4new/gold_delta`;


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
0,2025-10-28T14:23:21.000Z,70429301836869,rajatsaini9460@gmail.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [""store""])",,List(4249874690713960),1028-134830-1c4jevt-v2n,,WriteSerializable,False,"Map(numFiles -> 3, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 9, numOutputBytes -> 3673)",,Databricks-Runtime/17.2.x-aarch64-photon-scala2.13


In [0]:
%sql
SELECT * FROM delta.`/Volumes/my_catalog/my_schema/session4new/gold_delta@v0`;


store,sale_date,daily_total_sales,performance_flag
Store_A,2025-10-01,855.0,Normal
Store_A,2025-10-02,306.0,Normal
Store_A,2025-10-03,382.5,Normal
Store_C,2025-10-03,1245.0,Normal
Store_C,2025-10-01,300.0,Normal
Store_C,2025-10-02,450.0,Normal
Store_B,2025-10-03,480.0,Normal
Store_B,2025-10-02,1390.0,Normal
Store_B,2025-10-01,440.0,Normal


In [0]:
spark.sql("""
CREATE TABLE IF NOT EXISTS my_catalog.my_schema.bronze_sales
USING DELTA
AS SELECT * FROM delta.`/Volumes/my_catalog/my_schema/session4new/bronze_delta`
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS my_catalog.my_schema.silver_sales
USING DELTA
AS SELECT * FROM delta.`/Volumes/my_catalog/my_schema/session4new/silver_delta`
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS my_catalog.my_schema.gold_sales
USING DELTA
AS SELECT * FROM delta.`/Volumes/my_catalog/my_schema/session4new/gold_delta`
""")


DataFrame[num_affected_rows: bigint, num_inserted_rows: bigint]

In [0]:
%sql
SELECT * FROM my_catalog.my_schema.gold_sales;


store,sale_date,daily_total_sales,performance_flag
Store_A,2025-10-01,855.0,Normal
Store_A,2025-10-02,306.0,Normal
Store_A,2025-10-03,382.5,Normal
Store_B,2025-10-03,480.0,Normal
Store_B,2025-10-02,1390.0,Normal
Store_B,2025-10-01,440.0,Normal
Store_C,2025-10-03,1245.0,Normal
Store_C,2025-10-01,300.0,Normal
Store_C,2025-10-02,450.0,Normal
