In [None]:
# ========= Parameters =========
# Table that holds the raw staleness rows; it is read and then purged of the
# rows that were summarized.
source_table = "cdf_staleness_table"
# Table that receives the hourly summary; created on first run if missing.
target_table = "cdf_staleness_table_summarized"
# Rows become eligible for summarization once their first_SinkCreatedOn is
# more than this many hours old. Must be at least 1.
lookback_hours = 3
# Time zone used to derive the Date / HourOfDay grouping keys ("UTC" keeps
# the timestamps as stored).
grouping_time_zone = "UTC"
# ==============================

StatementMeta(, 5303a7df-58c4-47b4-878a-34385b0f8404, 5, Finished, Available, Finished)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql import types as T


# ------------ Safety checks ------------
# Fail fast, before any table is created or modified.
if not spark.catalog.tableExists(source_table):
    raise RuntimeError(f"Source table '{source_table}' does not exist in the current Lakehouse.")

# The check accepts 1, so the message states "at least 1" (it previously
# claimed the value had to be greater than 1, contradicting the condition).
if lookback_hours < 1:
    raise RuntimeError(f"Lookback hours parameter must be at least 1. Current value: {lookback_hours}")

# Make sure the summary table exists before merging into it. The statement is
# a no-op when the table is already present, so an existing table keeps its
# data and schema.
# Columns: the three grouping keys (TableName, HourOfDay, Date), then the
# avg/min/max staleness in minutes, then one row count per staleness bucket.
# NOTE: the last column really is spelled "StanelessO6_Over60"; the
# aggregation and MERGE later in this script use the same spelling.
create_target_ddl = f"""
    CREATE TABLE IF NOT EXISTS {target_table} (
    TableName STRING,
    HourOfDay INT,
    Date STRING,
    StalenessAvg DOUBLE,
    StalenessMin DOUBLE,
    StalenessMax DOUBLE,
    Staleness05_00to05min BIGINT,
    Staleness10_05to10min BIGINT,
    Staleness15_10to15min BIGINT,
    Staleness20_15to20min BIGINT,
    Staleness25_20to25min BIGINT,
    Staleness30_25to30min BIGINT,
    Staleness60_30to60min BIGINT,
    StanelessO6_Over60 BIGINT
    ) USING delta
"""
spark.sql(create_target_ddl)

# ------------ Filter source by lookback ------------
# The cutoff is resolved to one concrete instant here and reused by both the
# read filter and the final DELETE. current_timestamp() is only constant
# within a single query; re-evaluating it in the DELETE (which runs after the
# MERGE) would move the cutoff forward and delete rows that were never
# summarized.
#
# The cutoff is also rounded DOWN to the start of the hour in
# grouping_time_zone, mirroring how HourOfDay is derived below. A run
# therefore only picks up whole (Date, HourOfDay) buckets; with a mid-hour
# cutoff the same bucket would be split across two runs and the second run's
# MERGE would overwrite the first run's aggregates, whose source rows are
# already deleted.
raw_cutoff = F.expr(f"current_timestamp() - INTERVAL {lookback_hours} HOURS")
aligned_cutoff = F.to_utc_timestamp(
    F.date_trunc("hour", F.from_utc_timestamp(raw_cutoff, grouping_time_zone)),
    grouping_time_zone,
)
cutoff_epoch_seconds = (
    spark.range(1)
    .select(F.unix_timestamp(aligned_cutoff).alias("cutoff_epoch_seconds"))
    .first()["cutoff_epoch_seconds"]
)
# The cutoff is hour-aligned, so whole epoch seconds represent it exactly.
# The same SQL fragment is used for the DataFrame filter and the DELETE.
lookback_cutoff_sql = f"timestamp_seconds({cutoff_epoch_seconds})"
lookback_cutoff_expr = F.expr(lookback_cutoff_sql)

src = (
    spark.table(source_table)
    .filter(F.col("first_SinkCreatedOn") < lookback_cutoff_expr)
    .withColumn("first_SinkCreatedOn", F.col("first_SinkCreatedOn").cast("timestamp"))
)

# If nothing to process, exit early. head(1) fetches at most one row and
# avoids converting the whole DataFrame to an RDD just to test emptiness.
if not src.head(1):
    print(f"Nothing to process: no rows with first_SinkCreatedOn older than {lookback_hours} hour(s).")
else:
    # ------------ Parse Version_Staleness: 'd.hh:mm:ss' -> total minutes (double) ------------
    # Accepts either 'd.hh:mm:ss' or 'hh:mm:ss' (missing days are treated as 0).
    # NOTE(review): a value with fractional seconds ('hh:mm:ss.fff') also
    # contains a '.', so it would be misread by the day/time split below.
    # Confirm the source never emits fractions.
    vs_str = F.col("Version_Staleness").cast("string")
    dot_split = F.split(vs_str, "\\.")                           # [days, hh:mm:ss] OR [hh:mm:ss]
    has_day = (F.size(dot_split) == F.lit(2))
    days = F.when(vs_str.isNull(), F.lit(None).cast("int")) \
             .otherwise(F.when(has_day, dot_split.getItem(0).cast("int")).otherwise(F.lit(0)))

    time_part = F.when(has_day, dot_split.getItem(1)).otherwise(dot_split.getItem(0))
    t_split = F.split(time_part, ":")                             # [hh, mm, ss]

    # A NULL input stays NULL in every component, so the total is NULL too.
    hours = F.when(vs_str.isNull(), F.lit(None).cast("int")).otherwise(t_split.getItem(0).cast("int"))
    minutes = F.when(vs_str.isNull(), F.lit(None).cast("int")).otherwise(t_split.getItem(1).cast("int"))
    seconds = F.when(vs_str.isNull(), F.lit(None).cast("int")).otherwise(t_split.getItem(2).cast("int"))

    total_minutes = (
        days.cast("double") * 1440.0 +                           # days -> minutes
        hours.cast("double") * 60.0 +
        minutes.cast("double") +
        seconds.cast("double") / 60.0
    )

    src = src.withColumn("Version_Staleness_Minutes", total_minutes)

    # ------------ Derive grouping keys ------------
    # Shift first_SinkCreatedOn into grouping_time_zone (a no-op for "UTC")
    # and take Date / HourOfDay from the shifted value.
    ts_group = F.from_utc_timestamp(F.col("first_SinkCreatedOn"), grouping_time_zone)

    df = (
        src
        .withColumn("TableName", F.col("Table_Name"))
        .withColumn("ts_group", ts_group)
        .withColumn("HourOfDay", F.hour("ts_group"))
        .withColumn("Date", F.date_format(F.to_date("ts_group"), "yyyy-MM-dd"))
    )

    st_mins = F.col("Version_Staleness_Minutes").cast("double")

    # ------------ Bucket counts ------------
    # Each bucket is (lower, upper]; the first has no lower bound and the last
    # no upper bound. Rows whose staleness is NULL fall into no bucket.
    c_00_05 = F.sum(F.when(st_mins <= 5, 1).otherwise(0)).cast("bigint")
    c_05_10 = F.sum(F.when((st_mins > 5) & (st_mins <= 10), 1).otherwise(0)).cast("bigint")
    c_10_15 = F.sum(F.when((st_mins > 10) & (st_mins <= 15), 1).otherwise(0)).cast("bigint")
    c_15_20 = F.sum(F.when((st_mins > 15) & (st_mins <= 20), 1).otherwise(0)).cast("bigint")
    c_20_25 = F.sum(F.when((st_mins > 20) & (st_mins <= 25), 1).otherwise(0)).cast("bigint")
    c_25_30 = F.sum(F.when((st_mins > 25) & (st_mins <= 30), 1).otherwise(0)).cast("bigint")
    c_30_60 = F.sum(F.when((st_mins > 30) & (st_mins <= 60), 1).otherwise(0)).cast("bigint")
    c_o60 = F.sum(F.when((st_mins > 60), 1).otherwise(0)).cast("bigint")

    agg = (
        df.groupBy("TableName", "HourOfDay", "Date")
          .agg(
              F.avg(st_mins).alias("StalenessAvg"),
              F.min(st_mins).alias("StalenessMin"),
              F.max(st_mins).alias("StalenessMax"),
              c_00_05.alias("Staleness05_00to05min"),
              c_05_10.alias("Staleness10_05to10min"),
              c_10_15.alias("Staleness15_10to15min"),
              c_15_20.alias("Staleness20_15to20min"),
              c_20_25.alias("Staleness25_20to25min"),
              c_25_30.alias("Staleness30_25to30min"),
              c_30_60.alias("Staleness60_30to60min"),
              c_o60.alias("StanelessO6_Over60"),
          )
    )

    # ------------ Upsert into target (override by composite key) ------------
    # A matched (TableName, HourOfDay, Date) row is overwritten, which keeps a
    # re-run after a failure between MERGE and DELETE idempotent.
    # NOTE(review): rows that arrive in the source after their hour was already
    # summarized will still overwrite that hour's aggregates on a later run.
    agg.createOrReplaceTempView("agg_staleness_summary_tmp")

    spark.sql(f"""
    MERGE INTO {target_table} AS tgt
    USING agg_staleness_summary_tmp AS src
    ON  tgt.TableName = src.TableName
    AND tgt.HourOfDay = src.HourOfDay
    AND tgt.Date      = src.Date
    WHEN MATCHED THEN UPDATE SET
      tgt.StalenessAvg            = src.StalenessAvg,
      tgt.StalenessMin            = src.StalenessMin,
      tgt.StalenessMax            = src.StalenessMax,
      tgt.Staleness05_00to05min   = src.Staleness05_00to05min,
      tgt.Staleness10_05to10min   = src.Staleness10_05to10min,
      tgt.Staleness15_10to15min   = src.Staleness15_10to15min,
      tgt.Staleness20_15to20min   = src.Staleness20_15to20min,
      tgt.Staleness25_20to25min   = src.Staleness25_20to25min,
      tgt.Staleness30_25to30min   = src.Staleness30_25to30min,
      tgt.Staleness60_30to60min   = src.Staleness60_30to60min,
      tgt.StanelessO6_Over60      = src.StanelessO6_Over60
    WHEN NOT MATCHED THEN INSERT (
      TableName, HourOfDay, Date,
      StalenessAvg, StalenessMin, StalenessMax,
      Staleness05_00to05min, Staleness10_05to10min, Staleness15_10to15min,
      Staleness20_15to20min, Staleness25_20to25min, Staleness30_25to30min,
      Staleness60_30to60min, StanelessO6_Over60
    ) VALUES (
      src.TableName, src.HourOfDay, src.Date,
      src.StalenessAvg, src.StalenessMin, src.StalenessMax,
      src.Staleness05_00to05min, src.Staleness10_05to10min, src.Staleness15_10to15min,
      src.Staleness20_15to20min, src.Staleness25_20to25min, src.Staleness30_25to30min,
      src.Staleness60_30to60min, src.StanelessO6_Over60
    )
    """)

    # ------------ Remove processed source rows ------------
    # Uses the SAME pinned cutoff as the read filter, so the delete covers the
    # time window that was summarized and nothing newer.
    # NOTE(review): a row written to the source during this run with a
    # first_SinkCreatedOn older than the cutoff would still be deleted
    # unsummarized; confirm writers never backdate beyond the lookback.
    spark.sql(f"""
    DELETE FROM {source_table}
    WHERE first_SinkCreatedOn < {lookback_cutoff_sql}
    """)

    print(f"Summarization complete (UTC). Processed & removed rows older than {lookback_hours} hour(s).")



StatementMeta(, 5303a7df-58c4-47b4-878a-34385b0f8404, 6, Finished, Available, Finished)

Summarization complete (UTC). Processed & removed rows older than 3 hour(s).
