In [0]:

from pyspark.sql import functions as F
from pyspark.sql.functions import col, sum as pysum, row_number, lit
from pyspark.sql.window import Window
from delta.tables import DeltaTable
import os


# ---------------------------------------------------------------------------
# Pipeline configuration
# ---------------------------------------------------------------------------

# Input file locations handed to spark.read further down.
customers_path = "/FileStore/tables/customers-3.csv"
products_path = "/FileStore/tables/products.json"
orders_day1_path = "/FileStore/tables/orders_day1.csv"
orders_day2_path = "/FileStore/tables/orders_day2.csv"

# Every table of the pipeline lives in this database.
db_name = "retail_db"

# Bronze tables: one per raw input.
bronze_customers_tbl = f"{db_name}.bronze_customers"
bronze_orders_tbl = f"{db_name}.bronze_orders"
bronze_products_tbl = f"{db_name}.bronze_products"

# Silver table: the enriched order rows.
silver_orders_tbl = f"{db_name}.silver_orders"

# Gold tables: the aggregated outputs.
gold_revenue_tbl = f"{db_name}.gold_revenue_by_region"
gold_sales_tbl = f"{db_name}.gold_sales_summary"

# Create the database on first run, then make it the session default.
for ddl in (
    f"CREATE DATABASE IF NOT EXISTS {db_name}",
    f"USE {db_name}",
):
    spark.sql(ddl)


# ---------------------------------------------------------------------------
# Bronze layer: load the raw files and persist them as Delta tables
# ---------------------------------------------------------------------------

# CSV inputs carry a header row; column types are inferred by Spark.
customers_df = spark.read.csv(customers_path, header=True, inferSchema=True)
orders_df = (
    spark.read.csv(orders_day1_path, header=True, inferSchema=True)
    # Parse order_date with an explicit yyyy-MM-dd pattern.
    .withColumn("order_date", F.to_date(F.col("order_date"), "yyyy-MM-dd"))
)
products_df = spark.read.json(products_path)

# The silver select below names these columns explicitly, so any column the
# input file lacks is added here with the placeholder value "Unknown".
for col_name in ("region", "name", "email"):
    if col_name in customers_df.columns:
        continue
    customers_df = customers_df.withColumn(col_name, F.lit("Unknown"))
for col_name in ("product_id", "category"):
    if col_name in products_df.columns:
        continue
    products_df = products_df.withColumn(col_name, F.lit("Unknown"))

# Overwrite each bronze table with the freshly loaded data (same write
# options for all three: Delta format, overwrite mode, mergeSchema enabled).
for bronze_df, bronze_tbl in (
    (customers_df, bronze_customers_tbl),
    (orders_df, bronze_orders_tbl),
    (products_df, bronze_products_tbl),
):
    (
        bronze_df.write.format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .saveAsTable(bronze_tbl)
    )

# ---------------------------------------------------------------------------
# Incremental load: upsert the day-2 orders file into bronze_orders
# ---------------------------------------------------------------------------
# os.path.exists works on the driver's local filesystem, where DBFS is exposed
# under the "/dbfs" mount, so the Spark-style path has to be translated first.
# Accept both "dbfs:/FileStore/..." and "/FileStore/..." spellings: strip an
# optional "dbfs:" scheme, then prefix the mount point. (Blindly slicing off
# the first five characters mangles paths that have no scheme, which made the
# existence check always fail and silently skipped the merge.)
day2_dbfs_path = orders_day2_path
if day2_dbfs_path.startswith("dbfs:"):
    day2_dbfs_path = day2_dbfs_path[len("dbfs:"):]
day2_local_path = "/dbfs" + day2_dbfs_path

if os.path.exists(day2_local_path):
    try:
        orders_day2_df = spark.read.csv(orders_day2_path, header=True, inferSchema=True) \
                                .withColumn("order_date", F.to_date(col("order_date"), "yyyy-MM-dd"))
        bronze_orders_delta = DeltaTable.forName(spark, bronze_orders_tbl)
        # Upsert keyed on order_id: existing orders are updated in place,
        # unseen orders are appended.
        bronze_orders_delta.alias("tgt").merge(
            orders_day2_df.alias("src"),
            "tgt.order_id = src.order_id"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        print("Merged orders_day2.csv into bronze_orders.")
    except Exception as e:
        # Best effort by design: a failed incremental load is reported but
        # must not stop the silver/gold build on the day-1 data.
        print("Incremental merge failed:", e)
else:
    # Make the skip visible instead of silently doing nothing.
    print(f"{orders_day2_path} not found; skipping incremental merge.")


# ---------------------------------------------------------------------------
# Silver layer: completed orders enriched with customer and product details
# ---------------------------------------------------------------------------

# Start from the persisted bronze tables rather than the in-memory frames.
orders_df = spark.table(bronze_orders_tbl)
customers_df = spark.table(bronze_customers_tbl)
products_df = spark.table(bronze_products_tbl)

# Only orders whose status is exactly "Completed" are carried forward.
orders_clean = orders_df.where(F.col("status") == "Completed")

# Columns exposed by the silver table, in output order.
silver_columns = [
    "order_id", "customer_id", "product", "quantity", "price", "status",
    "order_date", "total_amount", "name", "region", "email", "product_id",
    "category",
]

# Line total per order row.
silver_orders = orders_clean.withColumn(
    "total_amount", F.col("quantity") * F.col("price")
)
# Left joins keep every completed order even without a customer/product match.
silver_orders = silver_orders.join(customers_df, on="customer_id", how="left")
silver_orders = silver_orders.join(
    products_df,
    orders_clean["product"] == products_df["product_name"],
    how="left",
)
# Project the silver schema, then keep a single row per order_id.
silver_orders = silver_orders.select(*silver_columns).dropDuplicates(["order_id"])

# Defensive clean-up of a "_corrupt_record" column, should one be present.
if "_corrupt_record" in silver_orders.columns:
    silver_orders = silver_orders.drop("_corrupt_record")

# Replace the silver table (data and schema) with this run's result.
(
    silver_orders.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("mergeSchema", "true")
    .saveAsTable(silver_orders_tbl)
)

print("=== Silver Orders Table ===")
display(silver_orders)


# ---------------------------------------------------------------------------
# Gold layer (1/2): total revenue per customer region
# ---------------------------------------------------------------------------

# Aggregate from the persisted silver table.
silver = spark.table(silver_orders_tbl)

# Sum of order totals within each region.
region_revenue = F.sum("total_amount").alias("total_revenue")
revenue_by_region = silver.groupBy("region").agg(region_revenue)

# Replace the gold table (data and schema) with this run's aggregate.
(
    revenue_by_region.write.format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .option("mergeSchema", "true")
    .saveAsTable(gold_revenue_tbl)
)

print("=== Gold Revenue by Region ===")
display(revenue_by_region)


# ---------------------------------------------------------------------------
# Gold layer (2/2): per-product sales totals, ranked by units sold
# ---------------------------------------------------------------------------

# Units sold and revenue per product, computed from the silver table.
product_sales = silver.groupBy("product").agg(
    pysum("quantity").alias("total_quantity"),
    pysum("total_amount").alias("total_amount"),
)

# Rank 1 = highest total_quantity. row_number() hands out arbitrary positions
# to rows that compare equal, so two products with the same quantity could
# swap ranks between runs. After the groupBy there is one row per product, so
# ordering by product as a second key makes the ranking fully deterministic.
# The window has no partitionBy because the rank is global across all products.
window = Window.orderBy(col("total_quantity").desc(), col("product").asc())
product_sales_ranked = product_sales.withColumn("rank", row_number().over(window))

# Replace the gold table (data and schema) with this run's ranking.
product_sales_ranked.write.format("delta").mode("overwrite") \
    .option("overwriteSchema", "true") \
    .option("mergeSchema", "true") \
    .saveAsTable(gold_sales_tbl)

print("=== Gold Product Sales Summary ===")
display(product_sales_ranked)

# ---------------------------------------------------------------------------
# Delta features: commit history and time travel on the silver table
# ---------------------------------------------------------------------------

# Show the commit log of the silver table.
silver_table = DeltaTable.forName(spark, silver_orders_tbl)
silver_history = silver_table.history()
print("=== Silver Orders History ===")
display(silver_history)

# Time travel: read the table as of its very first commit (version 0).
silver_v0 = (
    spark.read.format("delta")
    .option("versionAsOf", 0)
    .table(silver_orders_tbl)
)
print("=== Silver Orders Version 0 ===")
display(silver_v0)


# ---------------------------------------------------------------------------
# Table maintenance: OPTIMIZE the gold tables, VACUUM silver and gold
# ---------------------------------------------------------------------------

for gold_tbl in (gold_revenue_tbl, gold_sales_tbl):
    spark.sql(f"OPTIMIZE {gold_tbl}")

# VACUUM with an explicit retention of 168 hours (7 days).
for vacuum_tbl in (silver_orders_tbl, gold_revenue_tbl):
    spark.sql(f"VACUUM {vacuum_tbl} RETAIN 168 HOURS")
# Deliberately left as the cell's bare trailing statement (not folded into the
# loop): the recorded cell output ends with this call's DataFrame repr.
spark.sql(f"VACUUM {gold_sales_tbl} RETAIN 168 HOURS")


=== Silver Orders Table ===


order_id,customer_id,product,quantity,price,status,order_date,total_amount,name,region,email,product_id,category
1001,1,Laptop,2,55000,Completed,2024-01-15,110000,Arjun Rao,North,arjun@example.com,P001,Electronics
1002,2,Mobile,3,25000,Completed,2024-01-16,75000,Sneha Patel,South,sneha@example.com,P002,Electronics
1004,1,Headphones,5,3000,Completed,2024-01-17,15000,Arjun Rao,North,arjun@example.com,P004,Accessories


=== Gold Revenue by Region ===


region,total_revenue
South,75000
North,125000




=== Gold Product Sales Summary ===


product,total_quantity,total_amount,rank
Headphones,5,15000,1
Mobile,3,75000,2
Laptop,2,110000,3


=== Silver Orders History ===


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
5,2025-10-13T10:45:59Z,145385977523810,azuser4795_mml.local@techademy.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(1272255375366537),1003-070720-nihwdk0g,4.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 3419, numOutputRows -> 3, numOutputBytes -> 3419)",,Databricks-Runtime/16.4.x-photon-scala2.12
4,2025-10-13T10:41:07Z,145385977523810,azuser4795_mml.local@techademy.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(1272255375366537),1003-070720-nihwdk0g,3.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 2899, numOutputRows -> 3, numOutputBytes -> 3419)",,Databricks-Runtime/16.4.x-photon-scala2.12
3,2025-10-13T10:28:20Z,145385977523810,azuser4795_mml.local@techademy.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2831202045588650),1003-070720-nihwdk0g,2.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 3360, numOutputRows -> 3, numOutputBytes -> 2899)",,Databricks-Runtime/16.4.x-photon-scala2.12
2,2025-10-13T10:13:20Z,145385977523810,azuser4795_mml.local@techademy.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(1272255375366537),1003-070720-nihwdk0g,1.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 2899, numOutputRows -> 3, numOutputBytes -> 3360)",,Databricks-Runtime/16.4.x-photon-scala2.12
1,2025-10-13T10:06:42Z,145385977523810,azuser4795_mml.local@techademy.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(2831202045588650),1003-070720-nihwdk0g,0.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 2899, numOutputRows -> 3, numOutputBytes -> 2899)",,Databricks-Runtime/16.4.x-photon-scala2.12
0,2025-10-13T10:01:04Z,145385977523810,azuser4795_mml.local@techademy.com,CREATE OR REPLACE TABLE AS SELECT,"Map(partitionBy -> [], clusterBy -> [], description -> null, isManaged -> true, properties -> {""delta.enableDeletionVectors"":""true""}, statsOnLoad -> true)",,List(1272255375366537),1003-070720-nihwdk0g,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numOutputRows -> 3, numOutputBytes -> 2899)",,Databricks-Runtime/16.4.x-photon-scala2.12


=== Silver Orders Version 0 ===


order_id,customer_id,product,quantity,price,status,order_date,total_amount,name,product_id,category
1002,2,Mobile,3,25000,Completed,2024-01-16,75000,Priya Singh,P002,Electronics
1001,1,Laptop,2,55000,Completed,2024-01-15,110000,Rahul Sharma,P001,Electronics
1004,1,Headphones,5,3000,Completed,2024-01-17,15000,Rahul Sharma,P004,Accessories


DataFrame[path: string]