In [0]:
import dlt
from pyspark.sql.functions import col, avg, max, window, round

In [0]:
# Watermark delay applied to every silver sensor stream: how long to wait past
# a window's end for late-arriving events before the window is finalised.
_WATERMARK_DELAY = "2 minutes"
# Tumbling window length used for all gold bridge metrics.
_WINDOW_DURATION = "10 minutes"


def _read_watermarked(source):
    """Stream ``source`` from the pipeline with a watermark on ``event_time``.

    The watermark bounds late data so the downstream stateful operators
    (window aggregations and stream-stream joins) can drop old state.
    """
    return dlt.readStream(source).withWatermark("event_time", _WATERMARK_DELAY)


def _windowed_agg(events, agg_expr):
    """Aggregate ``events`` per ``bridge_id`` over tumbling event-time windows.

    The ``window`` struct column is deliberately kept intact instead of being
    split into start/end here. It carries the event-time watermark metadata
    produced by the aggregation. Joining on it lets Spark evict stream-stream
    join state once the watermark passes. Joining on extracted start/end
    timestamps would keep that state forever.

    Args:
        events: Watermarked streaming DataFrame with ``event_time`` and
            ``bridge_id`` columns.
        agg_expr: One aggregate Column expression, already aliased.

    Returns:
        DataFrame with columns ``window``, ``bridge_id`` and the aggregate.
    """
    return events.groupBy(
        window(col("event_time"), _WINDOW_DURATION),
        col("bridge_id"),
    ).agg(agg_expr)


@dlt.table(name="03_gold.bridge_metrics", comment="10-min avg temperature, max vibration, max tilt per bridge with window start/end")
def gold_bridge_metrics():
    """Build the gold per-bridge, per-10-minute-window metrics table.

    Three silver sensor streams are each aggregated into tumbling windows:

    * temperature: average, rounded to 2 decimals
    * vibration: maximum
    * tilt: maximum

    The three aggregates are inner-joined on ``(bridge_id, window)``, so a row
    is emitted only for windows where all three streams produced data. The
    result is then left-joined to the bridge metadata table to attach
    ``name`` and ``location``.
    """
    temp_agg = _windowed_agg(
        _read_watermarked("02_silver.bridge_temperature"),
        round(avg(col("temperature")), 2).alias("avg_temperature"),
    )
    vibration_agg = _windowed_agg(
        _read_watermarked("02_silver.bridge_vibration"),
        max(col("vibration")).alias("max_vibration"),
    )
    tilt_agg = _windowed_agg(
        _read_watermarked("02_silver.bridge_tilt"),
        max(col("tilt_angle")).alias("max_tilt_angle"),
    )

    # Join on the watermarked `window` struct, not on start/end timestamps,
    # so the stream-stream join state is bounded by the watermark.
    join_keys = ["bridge_id", "window"]

    return (
        temp_agg
        .join(vibration_agg, on=join_keys, how="inner")
        .join(tilt_agg, on=join_keys, how="inner")
        # Non-streaming read of the metadata table (stream-static left join):
        # bridges without metadata are kept with null name/location.
        .join(
            dlt.read("02_silver.bridge_metadata").alias("m"),
            on="bridge_id",
            how="left",
        )
        .select(
            col("bridge_id"),
            col("m.name"),
            col("m.location"),
            # Split the window only at the end, after all stateful operators.
            col("window.start").alias("window_start"),
            col("window.end").alias("window_end"),
            col("avg_temperature"),
            col("max_vibration"),
            col("max_tilt_angle"),
        )
    )