### This is a toy example of how a medallion architecture works

In [0]:
# Make sure the demo database exists, then make it the default for this session.
spark.sql("CREATE DATABASE IF NOT EXISTS demo")
spark.sql("USE demo")

# Reset objects left over from an earlier run. Every statement uses IF EXISTS,
# so this loop is harmless when nothing has been created yet.
for t in ["orders_bronze","orders_silver","orders_gold","v_orders_silver"]:
    if t.startswith("v_"):
        # Names carrying the v_ prefix are views and are dropped with DROP VIEW.
        spark.sql(f"DROP VIEW IF EXISTS demo.{t}")
    else:
        spark.sql(f"DROP TABLE IF EXISTS demo.{t}")


In [0]:
from pyspark.sql.functions import current_timestamp

# Five hand-written order events; order_id 1002 appears twice on purpose.
raw = [
    # (order_id, ts as ISO string, customer_id, item, qty, price, status)
    (1001, "2025-01-05T10:15:00",  7, "widget", 1, 19.99, "NEW"),
    (1002, "2025-01-05T11:20:00", 11, "gizmo",  2,  9.50, "NEW"),
    (1003, "2025-01-05T12:05:00",  7, "widget", 1, 19.99, "NEW"),
    (1002, "2025-01-05T11:25:00", 11, "gizmo",  2,  9.50, "NEW"),  # repeats order_id 1002 with a later ts
    (1004, "2025-01-06T09:10:00",  8, "doodad", 3,  4.25, "CANCELLED")
]

# Attach column names to the tuples and add an _ingest_ts column holding
# the current timestamp.
df_raw = (
    spark.createDataFrame(
        raw,
        schema=["order_id","ts","customer_id","item","qty","price","status"],
    )
    .withColumn("_ingest_ts", current_timestamp())
)
display(df_raw)


**Bronze**

In [0]:
# Store the raw rows, columns untouched, as the Delta table demo.orders_bronze.
# Overwrite mode replaces whatever an earlier run left in the table.
(
    df_raw.write
    .format("delta")
    .mode("overwrite")
    .saveAsTable("demo.orders_bronze")
)

# Read the table back to see what was stored.
display(spark.table("demo.orders_bronze"))


**Silver**

In [0]:
# Use SQL for readability in class.
# Builds demo.orders_silver from demo.orders_bronze in two steps:
#   1. typed: cast each column to an explicit type, parse ts into a timestamp,
#             and upper-case status.
#   2. dedup: number the rows of each order_id from newest to oldest (by ts,
#             then _ingest_ts) and keep only row number 1.
# The final SELECT leaves out the helper columns rn and _ingest_ts.
spark.sql("""
CREATE OR REPLACE TABLE demo.orders_silver AS
WITH typed AS (
  SELECT
    CAST(order_id AS INT)        AS order_id,
    TO_TIMESTAMP(ts)             AS ts,
    CAST(customer_id AS INT)     AS customer_id,
    CAST(item AS STRING)         AS item,
    CAST(qty AS INT)             AS qty,
    CAST(price AS DOUBLE)        AS price,
    UPPER(status)                AS status,
    _ingest_ts
  FROM demo.orders_bronze
),
dedup AS (
  SELECT *
  FROM (
    SELECT
      t.*,
      ROW_NUMBER() OVER (PARTITION BY order_id ORDER BY ts DESC, _ingest_ts DESC) AS rn
    FROM typed t
  )
  WHERE rn = 1    -- keep latest record per order_id
)
SELECT order_id, ts, customer_id, item, qty, price, status
FROM dedup
""")

# Show the cleaned table sorted by order timestamp.
display(spark.table("demo.orders_silver").orderBy("ts"))


**Gold stage**

In [0]:
# Temp view (session-scoped; disappears when cluster detaches/restarts)
# Registers demo.orders_silver under the unqualified name v_orders_tmp.
spark.table("demo.orders_silver").createOrReplaceTempView("v_orders_tmp")

# Permanent view (stored in metastore)
# Selects every column of demo.orders_silver except status.
spark.sql("""
CREATE OR REPLACE VIEW demo.v_orders_silver AS
SELECT order_id, ts, customer_id, item, qty, price
FROM demo.orders_silver
""")

# Remind the reader which view name is only usable in the current session.
print("Temp views you can query in *this* notebook session: v_orders_tmp")


In [0]:
%sql
-- Read back the permanent (metastore) view, ordered by ts ascending
SELECT *
FROM demo.v_orders_silver
ORDER BY ts;
