### Parámetros y referencias

In [0]:
# [03][1] Metric parameters and table references.
dbutils.widgets.text("CATALOGO",      "workspace")
dbutils.widgets.text("ESQUEMA_PLATA", "silver_mb")
dbutils.widgets.text("ESQUEMA_ORO",   "gold_mb")
dbutils.widgets.text("PREFIJO_TABLA", "mb_")
dbutils.widgets.dropdown("MODO_TRIGGER","once", ["once","availableNow"])
# Streaming checkpoint for the gold query. Previously hard-coded; exposed as a
# widget (same default) so it can point at governed storage and be kept
# distinct per catalog/schema. A Structured Streaming checkpoint belongs to one
# specific query, so it should not be shared across different source/target tables.
dbutils.widgets.text("CHECKPOINT_ORO", "/tmp/_chk_oro")

CATALOGO     = dbutils.widgets.get("CATALOGO")
ESQ_PLATA    = dbutils.widgets.get("ESQUEMA_PLATA")
ESQ_ORO      = dbutils.widgets.get("ESQUEMA_ORO")
PREFIJO      = dbutils.widgets.get("PREFIJO_TABLA")
MODO_TRIGGER = dbutils.widgets.get("MODO_TRIGGER")

# Fully qualified names: silver source, gold audit log, gold running totals.
T_PLATA   = f"{CATALOGO}.{ESQ_PLATA}.{PREFIJO}events_silver"
T_LOG     = f"{CATALOGO}.{ESQ_ORO}.{PREFIJO}ingestion_log"
T_RUNNING = f"{CATALOGO}.{ESQ_ORO}.{PREFIJO}running_stats"
# A blank widget value would produce an invalid checkpointLocation; fall back
# to the historical default in that case.
CHK_ORO   = dbutils.widgets.get("CHECKPOINT_ORO").strip() or "/tmp/_chk_oro"

display({"T_PLATA": T_PLATA, "T_LOG": T_LOG, "T_RUNNING": T_RUNNING, "CHK_ORO": CHK_ORO})


### Función foreachBatch (agregación por micro-batch)

In [0]:
# [03][2] Lógica foreachBatch:
# Calcula métricas SOLO del micro-batch y actualiza:
# - T_LOG (auditoría por archivo)
# - T_RUNNING (acumulado en una sola fila id=1)

from pyspark.sql.functions import count, sum as ssum, min as smin, max as smax, lit
from delta.tables import DeltaTable

def _a_float(valor):
    """Return ``valor`` as a Python float, or None when it is None.

    SUM/MIN/MAX over a column whose values are all NULL yield NULL, and
    ``float(None)`` would raise TypeError.
    """
    return None if valor is None else float(valor)


def actualizar_metricas(batch_df, batch_id):
    """Fold one streaming micro-batch into the gold metric tables.

    Intended as the ``foreachBatch`` callback. It aggregates ONLY the rows of
    the current micro-batch and then:

    1. appends one audit row to ``T_LOG``;
    2. MERGEs the batch deltas into the single accumulator row (``id = 1``)
       of ``T_RUNNING``, so historical data is never rescanned.

    Args:
        batch_df: DataFrame holding the micro-batch rows. It must have
            ``price`` and ``ts`` columns and may have ``_batch_id``.
        batch_id: Micro-batch id supplied by Structured Streaming; used for the
            audit id when ``_batch_id`` is not present.

    NOTE(review): foreachBatch is at-least-once. If a batch is replayed after
    a failure (e.g. the MERGE committed but the checkpoint did not), T_LOG
    gets a duplicate row and T_RUNNING double-counts. Consider Delta's
    idempotent writes (txnAppId/txnVersion) if exactly-once is required.
    """
    # Local import keeps this block self-contained (used for processed_at).
    from pyspark.sql.functions import current_timestamp

    # Nothing to record for an empty micro-batch.
    if batch_df.isEmpty():
        return

    # One aggregation pass over the micro-batch only.
    m = (batch_df.agg(
            count(lit(1)).alias("rows"),
            ssum("price").alias("sum_price"),
            smin("price").alias("min_price"),
            smax("price").alias("max_price"),
            smin("ts").alias("min_ts"),
            smax("ts").alias("max_ts"),
         ).collect()[0])

    filas  = int(m["rows"])
    suma_p = _a_float(m["sum_price"])
    min_p  = _a_float(m["min_price"])
    max_p  = _a_float(m["max_price"])
    min_ts = m["min_ts"]
    max_ts = m["max_ts"]

    # Traceability: prefer the _batch_id propagated from the silver layer,
    # otherwise fall back to the streaming micro-batch id.
    # NOTE(review): limit(1) picks one arbitrary value; if a micro-batch spans
    # several silver batches only one of their ids is recorded.
    if "_batch_id" in batch_df.columns:
        bid = batch_df.select("_batch_id").limit(1).collect()[0]["_batch_id"]
    else:
        bid = f"batch_{batch_id}"

    # 1) Append the audit row to T_LOG, stamped with the processing time
    #    (previously written as NULL).
    (spark.createDataFrame(
        [(bid, filas, suma_p, min_p, max_p, min_ts, max_ts)],
        "batch_id STRING, rows_loaded BIGINT, sum_price DOUBLE, min_price DOUBLE, max_price DOUBLE, min_ts TIMESTAMP, max_ts TIMESTAMP"
     ).withColumn("processed_at", current_timestamp())
     .write.format("delta").mode("append").saveAsTable(T_LOG))

    # 2) MERGE the batch deltas into the accumulator row (no history rescan).
    delta_df = spark.createDataFrame(
        [(1, filas, suma_p, min_p, max_p, min_ts, max_ts)],
        "id INT, d_rows BIGINT, d_sum DOUBLE, d_min DOUBLE, d_max DOUBLE, d_min_ts TIMESTAMP, d_max_ts TIMESTAMP"
    )
    rt = DeltaTable.forName(spark, T_RUNNING)
    (rt.alias("t").merge(delta_df.alias("d"), "t.id = d.id")
     .whenMatchedUpdate(set={
         # coalesce() stops the running totals from collapsing to NULL if the
         # existing row holds NULL totals or the batch had only NULL prices.
         "row_count":  "coalesce(t.row_count, 0) + d.d_rows",
         "sum_price":  "coalesce(t.sum_price, 0.0) + coalesce(d.d_sum, 0.0)",
         # A NULL batch min/max makes the comparison NULL, so ELSE keeps t.*.
         "min_price":  "CASE WHEN t.min_price IS NULL OR d.d_min < t.min_price THEN d.d_min ELSE t.min_price END",
         "max_price":  "CASE WHEN t.max_price IS NULL OR d.d_max > t.max_price THEN d.d_max ELSE t.max_price END",
         "min_ts":     "CASE WHEN t.min_ts IS NULL OR d.d_min_ts < t.min_ts THEN d.d_min_ts ELSE t.min_ts END",
         "max_ts":     "CASE WHEN t.max_ts IS NULL OR d.d_max_ts > t.max_ts THEN d.d_max_ts ELSE t.max_ts END",
         "updated_at": "current_timestamp()",
     })
     .whenNotMatchedInsert(values={
         "id":         "d.id",
         "row_count":  "d.d_rows",
         "sum_price":  "coalesce(d.d_sum, 0.0)",
         "min_price":  "d.d_min",
         "max_price":  "d.d_max",
         "min_ts":     "d.d_min_ts",
         "max_ts":     "d.d_max_ts",
         "updated_at": "current_timestamp()",
     })
     .execute())


### Lanzar foreachBatch con trigger CE-safe

In [0]:
# [03][3] CE-safe execution:
# A once/availableNow trigger processes whatever is pending in the silver
# table and then stops (Community-friendly), instead of running continuously.

# Pick the trigger keyword from the widget; anything but "availableNow" -> once.
trigger_kwargs = {"availableNow": True} if MODO_TRIGGER == "availableNow" else {"once": True}

stream = (
    spark.readStream.table(T_PLATA)
    .writeStream
    .foreachBatch(actualizar_metricas)
    .option("checkpointLocation", CHK_ORO)
    .outputMode("update")
)

q = stream.trigger(**trigger_kwargs).start()
# Block the cell until the finite trigger has drained the pending input.
q.awaitTermination()
print("[INFO] Métricas acumuladas actualizadas (sin reescaneo de histórico).")
