In [0]:
# STEP 0 — Setup
# These variables store the paths for each stage of the pipeline

# Landing folders: raw files exactly as they arrive
LANDING_ORDERS    = "dbfs:/FileStore/tables/dlt/landing/orders"
LANDING_CUSTOMERS = "dbfs:/FileStore/tables/dlt/landing/customers"

# Silver folder: cleaned data in Delta format
DELTA_SILVER_PATH = "dbfs:/tmp/delta/sil_orders"

# SQL table name pointing to the silver Delta folder
DELTA_TABLE_NAME  = "sil_orders_tbl"

print("We will store data in:")
print(f"Landing Orders folder:    {LANDING_ORDERS}")
print(f"Landing Customers folder: {LANDING_CUSTOMERS}")
print(f"Silver Delta folder:      {DELTA_SILVER_PATH}")
print(f"SQL Table name:           {DELTA_TABLE_NAME}")

We will store data in:
Landing Orders folder:    dbfs:/FileStore/tables/dlt/landing/orders
Landing Customers folder: dbfs:/FileStore/tables/dlt/landing/customers
Silver Delta folder:      dbfs:/tmp/delta/sil_orders
SQL Table name:           sil_orders_tbl


In [0]:
from delta import *
import pyspark.sql.functions as F
from pyspark.sql import types as T

print("STEP 1: Seeding inline data to landing (JSON) ...")

orders_rows = [
    (1, "C001", "2025-08-08 09:00:00", 12000, "placed"),
    (2, "C002", "2025-08-08 09:05:00",  4500, "placed"),
    (3, "C001", "2025-08-08 09:10:00", 22000, "cancelled"),
    (4, "C003", "2025-08-08 09:15:00",   800, "placed")
]
customers_rows = [
    ("C001", "Ananya", "Bengaluru"),
    ("C002", "Rahul",  "Hyderabad"),
    ("C003", "Meera",  "Pune")
]

orders_schema = T.StructType([
    T.StructField("order_id",    T.IntegerType()),
    T.StructField("customer_id", T.StringType()),
    T.StructField("order_ts",    T.StringType()),
    T.StructField("amount",      T.IntegerType()),
    T.StructField("status",      T.StringType())
])
cust_schema = T.StructType([
    T.StructField("customer_id", T.StringType()),
    T.StructField("name",        T.StringType()),
    T.StructField("city",        T.StringType())
])

orders_df = (spark.createDataFrame(orders_rows, orders_schema)
             .withColumn("order_ts", F.to_timestamp("order_ts")))
customers_df = spark.createDataFrame(customers_rows, cust_schema)

orders_df.write.mode("overwrite").json(LANDING_ORDERS)
customers_df.write.mode("overwrite").json(LANDING_CUSTOMERS)

print("✅ Seeded landing JSON:")
print(f"  {LANDING_ORDERS}")
print(f"  {LANDING_CUSTOMERS}")


STEP 1: Seeding inline data to landing (JSON) ...
✅ Seeded landing JSON:
  dbfs:/FileStore/tables/dlt/landing/orders
  dbfs:/FileStore/tables/dlt/landing/customers


**BRONZE DATA**

In [0]:
print("STEP 2: BRONZE - Reading raw landing data")

bron_orders = spark.read.json(LANDING_ORDERS)
bron_customers = spark.read.json(LANDING_CUSTOMERS)

print("Bronze Orders - schema & sample: ")
bron_orders.printSchema()
bron_orders.show(truncate=False)

print("Bronze Customers - schema & sample: ")
bron_customers.printSchema()
bron_customers.show(truncate=False)

STEP 2: BRONZE - Reading raw landing data
Bronze Orders - schema & sample: 
root
 |-- amount: long (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- order_id: long (nullable = true)
 |-- order_ts: string (nullable = true)
 |-- status: string (nullable = true)

+------+-----------+--------+------------------------+---------+
|amount|customer_id|order_id|order_ts                |status   |
+------+-----------+--------+------------------------+---------+
|22000 |C001       |3       |2025-08-08T09:10:00.000Z|cancelled|
|12000 |C001       |1       |2025-08-08T09:00:00.000Z|placed   |
|4500  |C002       |2       |2025-08-08T09:05:00.000Z|placed   |
|800   |C003       |4       |2025-08-08T09:15:00.000Z|placed   |
+------+-----------+--------+------------------------+---------+

Bronze Customers - schema & sample: 
root
 |-- city: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- name: string (nullable = true)

+---------+-----------+------+
|city     |cu

**SILVER DATA**

In [0]:
print("Step 3: SILVER - Cleaning data & Writing to Delta")

sil_orders = (
    bron_orders
    .select("order_id", "customer_id", "order_ts", "amount", "status")
    .filter("order_id IS NOT NULL AND amount >= 0")
)

sil_orders.write.format("delta").mode("overwrite").save(DELTA_SILVER_PATH)

print("Wrote silver orders to delta path: ")
print(f" {DELTA_SILVER_PATH}")

print("Reading back from delta to verify: ")
spark.read.format("delta").load(DELTA_SILVER_PATH).show(truncate=False)

Step 3: SILVER - Cleaning data & Writing to Delta
Wrote silver orders to delta path: 
 dbfs:/tmp/delta/sil_orders
Reading back from delta to verify: 
+--------+-----------+------------------------+------+---------+
|order_id|customer_id|order_ts                |amount|status   |
+--------+-----------+------------------------+------+---------+
|3       |C001       |2025-08-08T09:10:00.000Z|22000 |cancelled|
|1       |C001       |2025-08-08T09:00:00.000Z|12000 |placed   |
|4       |C003       |2025-08-08T09:15:00.000Z|800   |placed   |
|2       |C002       |2025-08-08T09:05:00.000Z|4500  |placed   |
+--------+-----------+------------------------+------+---------+



In [0]:
print("STEP 5: GOLD - Enrich orders by joining with customers")

sil_orders_df = spark.read.format("delta").load(DELTA_SILVER_PATH)
gold_enriched = (
    sil_orders_df.alias("o")
    .join(bron_customers.alias("c"), on="customer_id", how="left")
)

print("Gold Enriched - sample:")
gold_enriched.show(truncate=False)


STEP 5: GOLD - Enrich orders by joining with customers
Gold Enriched - sample:
+-----------+--------+------------------------+------+---------+---------+------+
|customer_id|order_id|order_ts                |amount|status   |city     |name  |
+-----------+--------+------------------------+------+---------+---------+------+
|C001       |3       |2025-08-08T09:10:00.000Z|22000 |cancelled|Bengaluru|Ananya|
|C001       |1       |2025-08-08T09:00:00.000Z|12000 |placed   |Bengaluru|Ananya|
|C003       |4       |2025-08-08T09:15:00.000Z|800   |placed   |Pune     |Meera |
|C002       |2       |2025-08-08T09:05:00.000Z|4500  |placed   |Hyderabad|Rahul |
+-----------+--------+------------------------+------+---------+---------+------+



In [0]:
from delta.tables import DeltaTable
import pyspark.sql.functions as F
from pyspark.sql import Row

# Load the Delta table as a DeltaTable object
delta_table = DeltaTable.forPath(spark, DELTA_SILVER_PATH)

#UPDATE — change 'status' to 'shipped' for placed orders with amount > 5000

delta_table.update(
    condition="status = 'placed' AND amount > 5000",
    set={
        "status": F.lit("shipped")
    }
)

print("After UPDATE:")
delta_table.toDF().show()

# DELETE — remove cancelled orders

delta_table.delete("status = 'cancelled'")

print("After DELETE:")
delta_table.toDF().show()

# UPSERT (MERGE) — insert new orders or update existing ones

new_orders = [
    Row(order_id=2, customer_id="C002", order_ts="2025-08-08 10:00:00", amount=4800, status="shipped"),
    Row(order_id=5, customer_id="C004", order_ts="2025-08-08 10:05:00", amount=7000, status="placed")
]
new_df = spark.createDataFrame(new_orders) \
              .withColumn("order_ts", F.to_timestamp("order_ts"))

# Perform merge (upsert)
delta_table.alias("target").merge(
    new_df.alias("source"),
    "target.order_id = source.order_id"
).whenMatchedUpdateAll() \
 .whenNotMatchedInsertAll() \
 .execute()

print("After UPSERT:")
delta_table.toDF().show()


After UPDATE:
+--------+-----------+--------------------+------+-------+
|order_id|customer_id|            order_ts|amount| status|
+--------+-----------+--------------------+------+-------+
|       1|       C001|2025-08-08T09:00:...| 12000|shipped|
|       4|       C003|2025-08-08T09:15:...|   800| placed|
|       2|       C002| 2025-08-08 10:00:00|  4800|shipped|
|       5|       C004| 2025-08-08 10:05:00|  7000|shipped|
+--------+-----------+--------------------+------+-------+

After DELETE:
+--------+-----------+--------------------+------+-------+
|order_id|customer_id|            order_ts|amount| status|
+--------+-----------+--------------------+------+-------+
|       1|       C001|2025-08-08T09:00:...| 12000|shipped|
|       4|       C003|2025-08-08T09:15:...|   800| placed|
|       2|       C002| 2025-08-08 10:00:00|  4800|shipped|
|       5|       C004| 2025-08-08 10:05:00|  7000|shipped|
+--------+-----------+--------------------+------+-------+

After UPSERT:
+--------+--