# Module 4: Managing Data with Delta Lake and Medallion — Hands-on Lab

**Datasets (upload to `dbfs:/FileStore/delta_lab/input/`):**
- `sales.csv`
- `products.csv`
- `customer_demographics.csv`

**Table names (Unity Catalog or Hive metastore):**
- `delta_lab.bronze_sales`, `delta_lab.bronze_products`, `delta_lab.bronze_customers`
- `delta_lab.silver_sales`, `delta_lab.silver_products`, `delta_lab.silver_customers`
- `delta_lab.gold_sales_item`, `delta_lab.gold_sales_customer`

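> `delta_lab` is the schema (database), not a catalog, so these two-part names work as-is with the Hive metastore. With Unity Catalog they resolve against the current catalog; prefix a catalog name only if you need to target a different one.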

## 1) Setup: Paths & Helper Config

In [0]:
# Python cell: set DBFS paths and create directories
base_path = "dbfs:/FileStore/delta_lab"
input_path = f"{base_path}/input"
bronze_path = f"{base_path}/bronze"
silver_path = f"{base_path}/silver"
gold_path = f"{base_path}/gold"
chk_path = f"{base_path}/checkpoints"

# Create directories if they don't exist
dbutils.fs.mkdirs(base_path)
dbutils.fs.mkdirs(input_path)
dbutils.fs.mkdirs(bronze_path)
dbutils.fs.mkdirs(silver_path)
dbutils.fs.mkdirs(gold_path)
dbutils.fs.mkdirs(chk_path)
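
Before ingesting, it can help to confirm that the three CSVs were actually uploaded to the input folder. The sketch below is one way to do that; `REQUIRED_FILES` and `missing_inputs` are hypothetical names introduced here, and the notebook usage in the trailing comments assumes `dbutils.fs.ls` returns entries with a `name` attribute, as it does on Databricks.

```python
# Hypothetical helper: report which required lab files are not yet uploaded.
REQUIRED_FILES = ["sales.csv", "products.csv", "customer_demographics.csv"]

def missing_inputs(present_names, required=REQUIRED_FILES):
    """Return the required file names that are absent from present_names."""
    present = set(present_names)
    return [name for name in required if name not in present]

# In a Databricks notebook (uses input_path from the cell above):
# present = [f.name for f in dbutils.fs.ls(input_path)]
# missing = missing_inputs(present)
# assert not missing, f"Upload these files first: {missing}"
```

An empty list means all three datasets are in place and the Bronze ingest can run.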

## 2) Ingest CSVs to Bronze as Delta

In [0]:
from pyspark.sql.types import (
    StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
)

# Define explicit schemas for enforcement
sales_schema = StructType([
    StructField("SalesOrderNumber", StringType(), True),
    StructField("SalesOrderLineNumber", IntegerType(), True),
    StructField("OrderDate", TimestampType(), True),
    StructField("CustomerName", StringType(), True),
    StructField("EmailAddress", StringType(), True),
    StructField("Item", StringType(), True),
    StructField("Quantity", IntegerType(), True),
    StructField("UnitPrice", DoubleType(), True),
    StructField("TaxAmount", DoubleType(), True),
])

products_schema = StructType([
    StructField("Item", StringType(), True),
    StructField("ProductName", StringType(), True),
    StructField("Category", StringType(), True),
    StructField("UnitPrice", DoubleType(), True),
])

customers_schema = StructType([
    StructField("CustomerId", StringType(), True),
    StructField("CustomerName", StringType(), True),
    StructField("EmailAddress", StringType(), True),
    StructField("City", StringType(), True),
    StructField("State", StringType(), True),
    StructField("Country", StringType(), True),
])

sales_df = (spark.read
            .format("csv")
            .schema(sales_schema)
            .option("header", "true")
            .option("timestampFormat", "yyyy-MM-dd[ HH:mm:ss]")
            .load(f"{input_path}/sales.csv"))

products_df = (spark.read
               .format("csv")
               .schema(products_schema)
               .option("header", "true")
               .load(f"{input_path}/products.csv"))

customers_df = (spark.read
                .format("csv")
                .schema(customers_schema)
                .option("header", "true")
                .load(f"{input_path}/customer_demographics.csv"))

(sales_df.write.format("delta").mode("overwrite").save(f"{bronze_path}/sales"))
(products_df.write.format("delta").mode("overwrite").save(f"{bronze_path}/products"))
(customers_df.write.format("delta").mode("overwrite").save(f"{bronze_path}/customers"))

# Register the Bronze Delta paths as external (unmanaged) tables
spark.sql("CREATE SCHEMA IF NOT EXISTS delta_lab")
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.bronze_sales USING DELTA LOCATION '{bronze_path}/sales'")
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.bronze_products USING DELTA LOCATION '{bronze_path}/products'")
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.bronze_customers USING DELTA LOCATION '{bronze_path}/customers'")
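
One caveat on the CSV reads above: with an explicit schema, Spark's CSV reader by default applies the schema by position, ignores the header names (`enforceSchema` defaults to `true`), and turns values that do not fit into nulls (`mode` defaults to `PERMISSIVE`). If stricter enforcement at ingest is wanted, a reader along these lines could be used. This is a sketch; `read_csv_strict` and its `extra_options` parameter are hypothetical names, not part of the lab.

```python
def read_csv_strict(spark, path, schema, extra_options=None):
    """Read a headered CSV and fail loudly instead of silently nulling bad data.

    Hypothetical helper: `spark` is the notebook's SparkSession.
    """
    return (spark.read
            .format("csv")
            .schema(schema)
            .option("header", "true")
            # Validate CSV header names against the schema (checked by position)
            .option("enforceSchema", "false")
            # Raise an error on any row that does not fit the schema
            .option("mode", "FAILFAST")
            .options(**(extra_options or {}))
            .load(path))

# In a Databricks notebook:
# sales_df = read_csv_strict(
#     spark, f"{input_path}/sales.csv", sales_schema,
#     extra_options={"timestampFormat": "yyyy-MM-dd[ HH:mm:ss]"})
```

Whether failing fast is appropriate for a Bronze layer is a design choice; many pipelines keep Bronze permissive and apply strict checks on the way to Silver.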

## 3) Schema Enforcement & Evolution (to Silver)

In [0]:
from pyspark.sql.functions import col, to_date, lit

# OrderDate is already a timestamp in Bronze, so no re-cast is needed.
# Derive a date key for daily grouping and a line amount (quantity * price + tax).
silver_sales_df = (spark.read.format("delta").load(f"{bronze_path}/sales")
                   .withColumn("OrderDateKey", to_date(col("OrderDate")))
                   .withColumn("Amount", col("Quantity") * col("UnitPrice") + col("TaxAmount"))
                   .select("SalesOrderNumber","SalesOrderLineNumber","OrderDate","OrderDateKey",
                           "CustomerName","EmailAddress","Item","Quantity","UnitPrice","TaxAmount","Amount"))

silver_products_df = (spark.read.format("delta").load(f"{bronze_path}/products")
                      .select("Item","ProductName","Category","UnitPrice"))

silver_customers_df = (spark.read.format("delta").load(f"{bronze_path}/customers")
                       .select("CustomerId","CustomerName","EmailAddress","City","State","Country"))

(silver_sales_df.write.format("delta").mode("overwrite").save(f"{silver_path}/sales"))
(silver_products_df.write.format("delta").mode("overwrite").save(f"{silver_path}/products"))
(silver_customers_df.write.format("delta").mode("overwrite").save(f"{silver_path}/customers"))

spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.silver_sales USING DELTA LOCATION '{silver_path}/sales'")
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.silver_products USING DELTA LOCATION '{silver_path}/products'")
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.silver_customers USING DELTA LOCATION '{silver_path}/customers'")

### Auto evolution vs explicit schema (add new column to products)
We'll simulate a new column `UnitCost` and overwrite `silver_products` with schema evolution enabled.

In [0]:
# Simulate UnitCost column as 70% of UnitPrice
products_new_cols_df = (spark.read.format("delta").load(f"{silver_path}/products")
                        .withColumn("UnitCost", (col("UnitPrice") * lit(0.7)).cast("double")))

# Explicit schema merge (per write):
(products_new_cols_df.write
 .format("delta")
 .mode("overwrite")
 .option("mergeSchema", "true")
 .save(f"{silver_path}/products"))

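# Alternative: enable automatic schema evolution for the whole Spark session
# (it stays on for later writes and MERGE statements in this notebook):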
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")
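
Before writing with `mergeSchema`, it can be useful to list which columns the write would add to the table. A minimal sketch follows; `added_columns` is a hypothetical helper, and the comparison is case-insensitive on the assumption that column names differing only in case refer to the same column.

```python
def added_columns(existing_columns, incoming_columns):
    """Return incoming column names that are not present in the existing table."""
    existing = {c.lower() for c in existing_columns}
    return [c for c in incoming_columns if c.lower() not in existing]

# Columns of silver_products before and after the simulated change
silver_products_columns = ["Item", "ProductName", "Category", "UnitPrice"]
incoming_columns = silver_products_columns + ["UnitCost"]

print(added_columns(silver_products_columns, incoming_columns))
# ['UnitCost']

# In a Databricks notebook, the same check against the live table:
# current = spark.read.format("delta").load(f"{silver_path}/products").columns
# print(added_columns(current, products_new_cols_df.columns))
```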

## 4) Time Travel, History, and Rollback

In [0]:
# Inspect the commit history (version, timestamp, operation) to find the current and earlier versions
hist_df = spark.sql("DESCRIBE HISTORY delta_lab.silver_products")
display(hist_df)

# Read a previous version (adjust version number after you inspect history)
prev_version = 0  # <-- replace with an actual older version from history
silver_products_v0 = (spark.read.format("delta")
                      .option("versionAsOf", prev_version)
                      .load(f"{silver_path}/products"))
display(silver_products_v0)
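
The cell above covers history and time travel. For the rollback part, Delta's `RESTORE TABLE ... TO VERSION AS OF` statement can be used. The wrapper below is a sketch; `restore_table` is a hypothetical name, and it expects the notebook's `spark` session to be passed in.

```python
def restore_table(spark, table_name, version):
    """Roll a Delta table back to an earlier version via RESTORE.

    Hypothetical helper. RESTORE is recorded as a new commit in the table
    history, so the rollback itself can be undone by restoring again.
    """
    version = int(version)
    if version < 0:
        raise ValueError("version must be a non-negative integer")
    return spark.sql(f"RESTORE TABLE {table_name} TO VERSION AS OF {version}")

# In a Databricks notebook, after choosing a version from DESCRIBE HISTORY:
# restore_table(spark, "delta_lab.silver_products", prev_version)
# display(spark.sql("DESCRIBE HISTORY delta_lab.silver_products"))
```

Note that restoring `silver_products` to a version written before the `UnitCost` change also restores the older schema without that column.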


## 5) Build Gold tables

In [0]:
# Join silver tables for item-level sales
silver_sales = spark.read.format("delta").load(f"{silver_path}/sales")
silver_products = spark.read.format("delta").load(f"{silver_path}/products")
silver_customers = spark.read.format("delta").load(f"{silver_path}/customers")

gold_sales_item_df = (silver_sales.alias("s")
                      .join(silver_products.alias("p"), "Item", "left")
                      .select("s.*","p.ProductName","p.Category"))

(gold_sales_item_df.write.format("delta")
 .mode("overwrite")
 .save(f"{gold_path}/gold_sales_item"))

spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.gold_sales_item USING DELTA LOCATION '{gold_path}/gold_sales_item'")

# Customer-aggregated gold (by day)
from pyspark.sql.functions import sum as _sum
gold_sales_customer_df = (silver_sales.groupBy("OrderDateKey","CustomerName","EmailAddress")
                          .agg(_sum("Amount").alias("TotalAmount"),
                               _sum("Quantity").alias("TotalQty")))

(gold_sales_customer_df.write.format("delta")
 .mode("overwrite")
 .save(f"{gold_path}/gold_sales_customer"))

spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.gold_sales_customer USING DELTA LOCATION '{gold_path}/gold_sales_customer'")

## 6) Optimizations: OPTIMIZE, Z-ORDER, VACUUM, Auto Optimize

In [0]:
# OPTIMIZE + Z-ORDER (Databricks)
spark.sql("OPTIMIZE delta_lab.gold_sales_item ZORDER BY (Item, OrderDateKey)")

# Table-level Auto Optimize (Databricks)
spark.sql("""
ALTER TABLE delta_lab.gold_sales_item SET TBLPROPERTIES (
  delta.autoOptimize.optimizeWrite = true,
  delta.autoOptimize.autoCompact = true
)
""")

# VACUUM with safe retention (e.g., 168 hours = 7 days)
spark.sql("VACUUM delta_lab.gold_sales_item RETAIN 168 HOURS")
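
VACUUM deletes data files that the table no longer references and that are older than the retention window, so time travel to versions that depend on those files stops working afterwards. By default Delta also rejects a retention below 168 hours unless `spark.databricks.delta.retentionDurationCheck.enabled` is set to `false`. The sketch below builds the statement from a retention in days and refuses unsafe values; `vacuum_sql` and `MIN_SAFE_RETENTION_HOURS` are hypothetical names.

```python
MIN_SAFE_RETENTION_HOURS = 168  # 7 days, matching Delta's default safety threshold

def vacuum_sql(table_name, retain_days):
    """Build a VACUUM statement, refusing retention windows shorter than 7 days."""
    hours = int(retain_days * 24)
    if hours < MIN_SAFE_RETENTION_HOURS:
        raise ValueError(
            f"{hours} hours is below the {MIN_SAFE_RETENTION_HOURS}-hour safety threshold"
        )
    return f"VACUUM {table_name} RETAIN {hours} HOURS"

print(vacuum_sql("delta_lab.gold_sales_item", 7))
# VACUUM delta_lab.gold_sales_item RETAIN 168 HOURS

# In a Databricks notebook:
# spark.sql(vacuum_sql("delta_lab.gold_sales_item", 7))
```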

## 7) Handling Late Arriving Data: MERGE and Streaming
We'll simulate late updates using MERGE and demonstrate watermarking in Structured Streaming.

In [0]:
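# Simulate a late-arriving upsert set for silver_sales: bump Quantity on five
# existing rows and recompute Amount so the derived column stays consistent
late_updates = (silver_sales.limit(5)
                .withColumn("Quantity", col("Quantity") + 1)
                .withColumn("Amount", col("Quantity") * col("UnitPrice") + col("TaxAmount")))

# Make sure the MERGE target is registered (a no-op if section 3 already created it)
spark.sql(f"CREATE TABLE IF NOT EXISTS delta_lab.silver_sales USING DELTA LOCATION '{silver_path}/sales'")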

late_updates.createOrReplaceTempView("updates")

spark.sql("""
MERGE INTO delta_lab.silver_sales AS tgt
USING updates AS src
ON  tgt.SalesOrderNumber = src.SalesOrderNumber
AND tgt.SalesOrderLineNumber = src.SalesOrderLineNumber
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *
""")

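The watermarking half of this section can be sketched as a streaming read of the Bronze sales path that tolerates late rows up to a threshold and drops duplicates. Everything named here is an assumption for illustration: the `start_dedup_stream` helper, the `sales_stream` target and checkpoint subfolders, and the 2-day lateness threshold are not part of the lab's tables. `OrderDate` is included in the de-duplication keys so that Spark can discard state older than the watermark.

```python
def start_dedup_stream(spark, source_path, target_path, checkpoint_path,
                       lateness="2 days"):
    """Stream a Delta path into another Delta path, de-duplicating late rows.

    Hypothetical helper; `spark` is the notebook's SparkSession.
    """
    events = (spark.readStream
              .format("delta")
              .load(source_path)
              # Accept rows arriving up to `lateness` behind the max OrderDate seen
              .withWatermark("OrderDate", lateness)
              # Event-time column in the keys lets old de-dup state be dropped
              .dropDuplicates(["SalesOrderNumber", "SalesOrderLineNumber", "OrderDate"]))
    return (events.writeStream
            .format("delta")
            .outputMode("append")
            .option("checkpointLocation", checkpoint_path)
            # Process everything currently available, then stop
            .trigger(availableNow=True)
            .start(target_path))

# In a Databricks notebook (paths from section 1):
# query = start_dedup_stream(spark,
#                            f"{bronze_path}/sales",
#                            f"{silver_path}/sales_stream",
#                            f"{chk_path}/sales_stream")
# query.awaitTermination()
```

Bronze is used as the source in this sketch because a Delta streaming source expects append-only commits; the MERGE above rewrites files in `silver_sales`, which a stream reading that table would by default treat as an error.
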
## 8) Validation queries

In [0]:
%sql
SELECT * FROM delta_lab.gold_sales_item ORDER BY OrderDateKey DESC, Item LIMIT 20;

SELECT * FROM delta_lab.gold_sales_customer ORDER BY OrderDateKey DESC, TotalAmount DESC LIMIT 20;

DESCRIBE HISTORY delta_lab.gold_sales_item;
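
Beyond eyeballing rows, a simple reconciliation check can catch join problems: the left join in section 5 keeps every sales row, but duplicate `Item` values in `silver_products` would multiply rows in `gold_sales_item`. The sketch below compares row counts between two tables; `row_counts_match` is a hypothetical helper that takes the notebook's `spark` session.

```python
def row_counts_match(spark, left_table, right_table):
    """Compare row counts of two tables; returns (match, left_count, right_count)."""
    left = spark.table(left_table).count()
    right = spark.table(right_table).count()
    return left == right, left, right

# In a Databricks notebook (Python cell):
# ok, n_silver, n_gold = row_counts_match(
#     spark, "delta_lab.silver_sales", "delta_lab.gold_sales_item")
# assert ok, f"silver_sales has {n_silver} rows but gold_sales_item has {n_gold}"
```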


In [0]:
%sql
-- Optional: show the Unity Catalog metastore attached to this workspace
SELECT current_metastore();

