In [0]:
# ===========================================================
# 03_gold_analytics.py
#
# Gold Layer - Aggregated KPIs for Production Monitoring
# -----------------------------------------------------------
#  - Adds watermarking for late IoT events
#  - Computes per-line aggregates for each machine type (feeder, drill cutter, polisher, inspector)
#  - Derives metrics for temperature stability, vibration control, and defect trends
# ===========================================================

import dlt
from pyspark.sql.functions import (
    col, window, avg, count, sum as spark_sum, stddev, max as spark_max, min as spark_min, round as spark_round,
    lit, to_timestamp
)

# ===========================================================
# 1️⃣ FEEDER KPIs (Material Input Rate)
# -----------------------------------------------------------
@dlt.table(
    name="03_gold.feeder_kpi",
    comment="Aggregated metrics for Feeders — rate of blanks fed per minute"
)
def feeder_kpi():
    """Stream per-line feeder throughput in one-minute event-time windows.

    Reads ``02_silver.feeder_enriched`` as a stream, applies a two-minute
    watermark on ``event_time``, and emits one row per ``line_id`` and
    window with the summed and averaged ``feed_count`` (both rounded to
    2 decimals) plus the number of source records in that window.
    """
    feeder_events = dlt.read_stream("02_silver.feeder_enriched")

    # Tumbling one-minute buckets keyed on event time; the resulting struct
    # column is named "window" and is flattened in the final projection.
    minute_bucket = window("event_time", "1 minute")

    per_line_minute = (
        feeder_events
        .withWatermark("event_time", "2 minutes")
        .groupBy("line_id", minute_bucket)
        .agg(
            spark_sum("feed_count").alias("total_blanks_fed"),
            avg("feed_count").alias("avg_blanks_per_interval"),
            count("*").alias("record_count"),
        )
    )

    # Output schema, in order: key, window bounds, rounded KPIs, row count.
    output_columns = [
        col("line_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        spark_round(col("total_blanks_fed"), 2).alias("total_blanks_fed"),
        spark_round(col("avg_blanks_per_interval"), 2).alias("avg_blanks_per_interval"),
        col("record_count"),
    ]
    return per_line_minute.select(*output_columns)

# ===========================================================
# 2️⃣ DRILL CUTTER KPIs (Temperature & Vibration Stability)
# -----------------------------------------------------------
@dlt.table(
    name="03_gold.drillcutter_kpi",
    comment="Aggregated KPIs for DrillCutters — average temperature & vibration per minute"
)
def drillcutter_kpi():
    """Stream per-line drill-cutter temperature and vibration statistics.

    Reads ``02_silver.drillcutter_enriched`` as a stream, applies a
    two-minute watermark on ``event_time``, and aggregates each ``line_id``
    over one-minute windows. Every output row carries the mean and standard
    deviation of ``temperature_c`` and ``vibration_mms`` together with the
    number of records in the window.
    """
    drill_events = dlt.read_stream("02_silver.drillcutter_enriched")
    watermarked = drill_events.withWatermark("event_time", "2 minutes")

    # One group per production line and one-minute event-time window.
    grouped = watermarked.groupBy("line_id", window("event_time", "1 minute"))

    stats = grouped.agg(
        avg("temperature_c").alias("avg_temp"),
        stddev("temperature_c").alias("temp_stddev"),
        avg("vibration_mms").alias("avg_vibration"),
        stddev("vibration_mms").alias("vibration_stddev"),
        count("*").alias("record_count"),
    )

    # Flatten the window struct and publish the rounded KPIs under their
    # gold-layer names (mean temperature: 2 decimals, everything else: 4).
    return stats.select(
        col("line_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        spark_round(col("avg_temp"), 2).alias("avg_temp_c"),
        spark_round(col("temp_stddev"), 4).alias("temp_variability"),
        spark_round(col("avg_vibration"), 4).alias("avg_vibration_mms"),
        spark_round(col("vibration_stddev"), 4).alias("vibration_variability"),
        col("record_count"),
    )

# ===========================================================
# 3️⃣ POLISHER KPIs (Surface Quality Stability)
# -----------------------------------------------------------
@dlt.table(
    name="03_gold.polisher_kpi",
    comment="Aggregated KPIs for Polishers — average temperature & vibration per minute"
)
def polisher_kpi():
    """Stream per-line polisher temperature and vibration statistics.

    Reads ``02_silver.polisher_enriched`` as a stream, applies a two-minute
    watermark on ``event_time``, and aggregates each ``line_id`` over
    one-minute windows. Every output row carries the mean and standard
    deviation of ``temperature_c`` and ``vibration_mms`` together with the
    number of records in the window.
    """
    polisher_events = dlt.read_stream("02_silver.polisher_enriched")

    # Aggregations computed for every (line_id, one-minute window) group.
    window_stats = [
        avg("temperature_c").alias("avg_temp"),
        stddev("temperature_c").alias("temp_stddev"),
        avg("vibration_mms").alias("avg_vibration"),
        stddev("vibration_mms").alias("vibration_stddev"),
        count("*").alias("record_count"),
    ]

    # Final projection: flatten the window struct and round the statistics
    # (mean temperature to 2 decimals, the remaining metrics to 4).
    published_columns = [
        col("line_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        spark_round(col("avg_temp"), 2).alias("avg_temp_c"),
        spark_round(col("temp_stddev"), 4).alias("temp_variability"),
        spark_round(col("avg_vibration"), 4).alias("avg_vibration_mms"),
        spark_round(col("vibration_stddev"), 4).alias("vibration_variability"),
        col("record_count"),
    ]

    return (
        polisher_events
        .withWatermark("event_time", "2 minutes")
        .groupBy("line_id", window("event_time", "1 minute"))
        .agg(*window_stats)
        .select(*published_columns)
    )

# ===========================================================
# 4️⃣ INSPECTOR KPIs (Defect Rate Trends)
# -----------------------------------------------------------
@dlt.table(
    name="03_gold.inspector_kpi",
    comment="Aggregated KPIs for Inspectors — production volume and defect rate"
)
def inspector_kpi():
    """Stream per-line production volume and defect-rate statistics.

    Reads ``02_silver.inspector_enriched`` as a stream, applies a two-minute
    watermark on ``event_time``, and aggregates each ``line_id`` over
    one-minute windows. Every output row carries the summed
    ``produced_count`` and ``defective_count`` plus the average, maximum and
    minimum of the per-record ``defect_rate`` (rounded to 4 decimals).
    """
    inspections = dlt.read_stream("02_silver.inspector_enriched")

    minute_bucket = window("event_time", "1 minute")
    per_line_minute = (
        inspections
        .withWatermark("event_time", "2 minutes")
        .groupBy("line_id", minute_bucket)
    )

    # Volume totals and the spread of the per-record defect rate.
    # NOTE: avg_defect_rate is the unweighted mean of record-level rates,
    # not defective_total / produced_total.
    rollup = per_line_minute.agg(
        spark_sum("produced_count").alias("produced_total"),
        spark_sum("defective_count").alias("defective_total"),
        avg("defect_rate").alias("avg_defect_rate"),
        spark_max("defect_rate").alias("max_defect_rate"),
        spark_min("defect_rate").alias("min_defect_rate"),
    )

    # Flatten the window struct; totals pass through unrounded.
    return rollup.select(
        col("line_id"),
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("produced_total"),
        col("defective_total"),
        spark_round(col("avg_defect_rate"), 4).alias("avg_defect_rate"),
        spark_round(col("max_defect_rate"), 4).alias("max_defect_rate"),
        spark_round(col("min_defect_rate"), 4).alias("min_defect_rate"),
    )

# ===========================================================
# 5️⃣ LINE HEALTH SUMMARY
# -----------------------------------------------------------
# Summarizes total production, total defects, and overall defect rate
# since a given start time (e.g. plant start or batch start)
# -----------------------------------------------------------
@dlt.table(
    name="03_gold.line_health_summary",
    comment="Cumulative production and quality metrics per assembly line since plant start"
)
def line_health_summary():
    """Summarize cumulative production and defects per line since plant start.

    Performs a batch (non-streaming) read of ``02_silver.inspector_enriched``,
    keeps records whose ``event_time`` is at or after ``PLANT_START``, and
    returns one row per ``line_id`` with:

    * ``total_produced``  - sum of ``produced_count``
    * ``total_defective`` - sum of ``defective_count``
    * ``defect_rate``     - ``total_defective / total_produced`` rounded to
      4 decimals, or NULL when ``total_produced`` is 0 or NULL.
    """
    # Local import: `when` is not among this file's top-level pyspark imports.
    from pyspark.sql.functions import when

    # Lower bound (inclusive) for the cumulative totals.
    PLANT_START = "2025-11-04T00:00:00Z"

    # Read silver directly: the summary needs every record since plant start,
    # not the one-minute windowed gold aggregates.
    insp = dlt.read("02_silver.inspector_enriched")

    totals = (
        insp.filter(col("event_time") >= to_timestamp(lit(PLANT_START)))
            .groupBy("line_id")
            .agg(
                spark_sum("produced_count").alias("total_produced"),
                spark_sum("defective_count").alias("total_defective")
            )
    )

    # Guard the division: with ANSI SQL mode enabled a zero divisor raises
    # DIVIDE_BY_ZERO and would fail the whole table refresh. An unmatched
    # `when` yields NULL, which is what non-ANSI division by zero returns,
    # so results are unchanged wherever the original expression succeeded.
    safe_ratio = when(
        col("total_produced") != 0,
        col("total_defective") / col("total_produced")
    )

    return (
        totals.withColumn("defect_rate", spark_round(safe_ratio, 4))
              .select(
                  "line_id",
                  "total_produced",
                  "total_defective",
                  "defect_rate"
              )
    )