In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# One-time setup: a Delta table holding one high-water mark per (pipeline, entity).
# Both statements are written to be safe to re-run.
create_watermarks_sql = """
create table if not exists workspace.default.etl_watermarks(
pipeline string,
entity string,
watermark_ts timestamp
)       
using delta   
"""

# Seed demo_pipeline/orders with a far-past sentinel so the first incremental run
# treats every bronze row as new; the NOT EXISTS guard leaves an already-advanced
# watermark untouched on re-runs.
seed_watermark_sql = """
INSERT into workspace.default.etl_watermarks 
SELECT 'demo_pipeline', 'orders', TIMESTAMP('1900-01-01 00:00:00')
WHERE NOT EXISTS (
  SELECT 1 FROM workspace.default.etl_watermarks
  WHERE pipeline='demo_pipeline' AND entity='orders'
)
"""

# Order matters: the table must exist before it can be seeded.
for setup_stmt in (create_watermarks_sql, seed_watermark_sql):
    spark.sql(setup_stmt)

display(spark.table("workspace.default.etl_watermarks"))


In [0]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window

# Load the high-water mark left by the previous successful run; only bronze rows
# with `_ingested_at` strictly after it are processed further down.
wm_row = spark.sql("""
SELECT watermark_ts
FROM workspace.default.etl_watermarks
WHERE pipeline='demo_pipeline' AND entity='orders'
""").first()

# Fail fast with an actionable message. Without this check:
#   - a missing seed row gives `TypeError: 'NoneType' object is not subscriptable`;
#   - a NULL watermark makes the `_ingested_at > wm` filter silently match nothing.
if wm_row is None or wm_row[0] is None:
    raise ValueError(
        "No usable watermark for pipeline='demo_pipeline', entity='orders' in "
        "workspace.default.etl_watermarks; run the setup cell first."
    )
wm = wm_row[0]

print("Current watermark:", wm)
# Simulated incoming batch: (CustomerId, OrderId, Amount, OrderDate).
new_orders_data = [
    (1, 107, 155.0, "2025-12-08"),  # update existing order
    (2, 110,  95.0, "2025-12-13"),  # new order
    (6, 111,  40.0, "2025-12-14")   # new customer order
]

# Stamp each row with lineage metadata. `_ingested_at` is the column the watermark tracks.
# current_timestamp() is evaluated per action, so the value shown by display() may
# differ slightly from the one actually written below.
incoming = (spark.createDataFrame(new_orders_data, ["CustomerId","OrderId","Amount","OrderDate"])
            .withColumn("OrderDate", F.to_date("OrderDate"))
            .withColumn("_ingested_at", F.current_timestamp())
            .withColumn("_source", F.lit("day9_batch"))
)
display(incoming)

# Bronze is the raw, append-only history. Appending (instead of overwriting) keeps
# earlier batches, so the watermark filter below is what isolates the new rows.
# It also means a run that fails before advancing the watermark is simply
# re-processed next time.
incoming.write.format("delta").mode("append").saveAsTable("workspace.default.bronze_orders")

bronze_orders = spark.table("workspace.default.bronze_orders")
bronze_new = bronze_orders.filter(F.col("_ingested_at") > wm)
display(bronze_new)

# Keep only the most recently ingested version of each (CustomerId, OrderId).
# NOTE(review): current_timestamp() returns one value per query, so rows from the
# same bronze batch share `_ingested_at`; duplicate keys inside a single batch are
# broken arbitrarily by row_number().
w = Window.partitionBy("CustomerId","OrderId").orderBy(F.col("_ingested_at").desc())
silver_incr = (bronze_new
  .withColumn("CustomerId", F.col("CustomerId").cast("long"))
  .withColumn("OrderId", F.col("OrderId").cast("long"))
  .withColumn("Amount", F.col("Amount").cast("double"))
  .withColumn("OrderDate", F.col("OrderDate").cast("date"))
  .withColumn("UpdatedAt", F.current_timestamp())
  .withColumn("rn", F.row_number().over(w))
  .filter(F.col("rn") == 1)
  .drop("rn")
)

display(silver_incr)
silver_incr.createOrReplaceTempView("src")

# Upsert into silver on the business key.
# - Matched orders get the new values; UpdatedAt is refreshed too, so it reflects
#   the last modification rather than the original insert.
# - Unseen orders are inserted with every column.
# Assumes workspace.default.silver_orders already exists (it is not created in this
# notebook section) and has an UpdatedAt column; confirm against its DDL.
spark.sql("""
MERGE INTO workspace.default.silver_orders AS tgt
USING src
ON tgt.CustomerId = src.CustomerId AND tgt.OrderId = src.OrderId
WHEN MATCHED THEN UPDATE SET
  tgt.Amount = src.Amount,
  tgt.OrderDate = src.OrderDate,
  tgt.UpdatedAt = src.UpdatedAt,
  tgt._ingested_at = src._ingested_at,
  tgt._source = src._source
WHEN NOT MATCHED THEN INSERT *
""")

display(spark.table("workspace.default.silver_orders").orderBy("CustomerId","OrderDate","OrderId"))

# Highest `_ingested_at` among the rows processed in this run (None if bronze_new was empty).
new_wm = bronze_new.agg(F.max("_ingested_at").alias("max_ts")).first()["max_ts"]

if new_wm is None:
    # Nothing new was processed, so keep the existing watermark.
    # Formatting None into the UPDATE would yield TIMESTAMP('None'). That either fails
    # (ANSI mode) or stores NULL, after which `_ingested_at > wm` matches no rows ever again.
    print("No new bronze rows; watermark unchanged:", wm)
else:
    # NOTE(review): str(new_wm) is a naive Python datetime that TIMESTAMP('...')
    # re-parses in the Spark session time zone. Confirm the driver and session time
    # zones agree, or the stored watermark will be shifted.
    spark.sql(f"""
UPDATE workspace.default.etl_watermarks
SET watermark_ts = TIMESTAMP('{new_wm}')
WHERE pipeline='demo_pipeline' AND entity='orders'
""")
    print("Updated watermark to:", new_wm)

