In [0]:
# 03_gold_processing: Windowed production line metrics + machine health KPI
# DLT reads from Silver tables using dlt.read_stream() - dependencies handled automatically

import dlt
from pyspark.sql.functions import col, window, avg, max, round, when, least

# Streaming settings shared by every Silver sensor stream read below.
_WATERMARK_DELAY = "2 minutes"   # tolerated event-time lateness before state is finalized
_WINDOW_DURATION = "10 minutes"  # tumbling window length for all metrics
_JOIN_KEYS = ["machine_id", "window_start", "window_end"]


def _windowed_sensor_agg(table_name, value_col, agg_fn, alias, extra_keys=()):
    """Aggregate one Silver sensor stream into tumbling windows per machine.

    Args:
        table_name: Silver DLT table to read as a stream. Reading it through
            ``dlt.read_stream`` is what lets DLT infer the pipeline dependency.
        value_col: Sensor column to aggregate, named as produced by Silver
            (e.g. ``power_kw``, ``pressure_bar``).
        agg_fn: PySpark aggregate function applied to ``value_col``
            (``avg`` or ``max`` here).
        alias: Output name for the aggregated column.
        extra_keys: Additional grouping columns carried through to the output.

    Returns:
        Streaming DataFrame with columns ``machine_id``, ``*extra_keys``,
        ``window_start``, ``window_end`` and ``alias``.
    """
    key_cols = [col("machine_id"), *(col(k) for k in extra_keys)]
    return (
        dlt.read_stream(table_name)
           .withWatermark("event_time", _WATERMARK_DELAY)
           .groupBy(window("event_time", _WINDOW_DURATION), *key_cols)
           .agg(agg_fn(value_col).alias(alias))
           .select(
               *key_cols,
               col("window.start").alias("window_start"),
               col("window.end").alias("window_end"),
               col(alias),
           )
    )


def _clamped_ratio(column_name, ceiling):
    """Return ``column_name / ceiling`` clamped to [0, 1], or null if the input is null.

    Spark's ``least``/``greatest`` skip nulls (``least(1.0, NULL)`` is 1.0), which
    would score a metric missing from a left join as the worst possible reading.
    A ``when`` chain propagates the null instead: both comparisons evaluate to
    null and ``otherwise`` returns the null ratio unchanged.
    """
    ratio = col(column_name) / ceiling
    return when(ratio < 0, 0.0).when(ratio > 1, 1.0).otherwise(ratio)


@dlt.table(
    name="gold_machine_metrics",
    comment="10-minute windowed production line metrics and machine health KPI",
    table_properties={"quality": "gold"}
)
def gold_machine_metrics():
    """Build per-machine, 10-minute windowed sensor metrics plus a health KPI.

    Each Silver sensor stream is watermarked and aggregated per machine and
    window. The aggregates are left-joined onto the temperature aggregate, which
    is the only stream carrying ``machine_name``. A row is produced only for
    windows that have temperature data.

    Health KPI: temperature, vibration and load (power) are each scaled by a
    reference ceiling and clamped to [0, 1]. The score is
    100 * (0.3*(1-temp) + 0.4*(1-vib) + 0.3*(1-load)), rounded to 2 decimals.
    The status is HEALTHY (>= 80), WARNING (>= 50) or CRITICAL. It is UNKNOWN
    when any of the three inputs is missing for the window, so that absent data
    is not reported as a critical machine state.
    """
    # Temperature drives the join and also contributes machine_name.
    temp_agg = _windowed_sensor_agg(
        "silver_temperature", "temperature", avg, "avg_temperature",
        extra_keys=("machine_name",),
    )

    # Column names below are the ones produced by the Silver layer.
    sensor_aggs = [
        _windowed_sensor_agg("silver_vibration", "vibration", max, "max_vibration"),
        _windowed_sensor_agg("silver_power", "power_kw", avg, "avg_power"),
        _windowed_sensor_agg("silver_pressure", "pressure_bar", max, "max_pressure"),
        _windowed_sensor_agg("silver_flow_rate", "flow_rate_l_min", avg, "avg_flow_rate"),
        _windowed_sensor_agg("silver_rpm", "rpm", max, "max_rpm"),
        _windowed_sensor_agg("silver_speed", "speed_m_s", avg, "avg_speed"),
        _windowed_sensor_agg("silver_current", "current_a", avg, "avg_current"),
    ]

    # NOTE(review): these are stream-stream left outer joins between windowed
    # aggregations. They rely on the runtime supporting chained stateful
    # operators and on watermark propagation through window_start/window_end.
    # Confirm the DLT/Spark version in use accepts this plan.
    metrics = temp_agg
    for sensor_agg in sensor_aggs:
        metrics = metrics.join(sensor_agg, _JOIN_KEYS, "left")

    # Reference ceilings at which each score saturates at 1.0.
    # Units are those of the Silver columns.
    health_score = round(
        100 * (
            0.3 * (1 - col("temperature_score"))
            + 0.4 * (1 - col("vibration_score"))
            + 0.3 * (1 - col("load_score"))
        ),
        2,
    )
    # A null score means at least one input metric was missing for the window.
    health_status = (
        when(col("machine_health_score").isNull(), "UNKNOWN")
        .when(col("machine_health_score") >= 80, "HEALTHY")
        .when(col("machine_health_score") >= 50, "WARNING")
        .otherwise("CRITICAL")
    )

    return (
        metrics
          .withColumn("temperature_score", _clamped_ratio("avg_temperature", 90))
          .withColumn("vibration_score", _clamped_ratio("max_vibration", 0.08))
          .withColumn("load_score", _clamped_ratio("avg_power", 50))
          .withColumn("machine_health_score", health_score)
          .withColumn("machine_health_status", health_status)
          .select(
              col("machine_id"),
              col("machine_name"),
              col("window_start"),
              col("window_end"),
              round(col("avg_temperature"), 2).alias("avg_temperature"),
              col("max_vibration"),
              round(col("avg_power"), 2).alias("avg_power"),
              col("max_pressure"),
              round(col("avg_flow_rate"), 2).alias("avg_flow_rate"),
              col("max_rpm"),
              round(col("avg_speed"), 2).alias("avg_speed"),
              round(col("avg_current"), 2).alias("avg_current"),
              col("machine_health_score"),
              col("machine_health_status"),
          )
    )