In [0]:
def _build_persistence_metrics(df_execution_pairs):
    """Compare the evidences of each execution N against its predecessor N-1.

    Args:
        df_execution_pairs: DataFrame with one row per pair, holding at least
            the columns ``current_execution_id`` and ``prev_execution_id``.

    Returns:
        DataFrame with one row per ``(execution_id, validation_id)`` of the
        *current* executions and three integer counters:

        * ``new_failures``: keys present in N but not in N-1.
        * ``persistent_failures``: keys present in both N and N-1.
        * ``resolved_failures``: keys present in N-1 but not in N.

    NOTE(review): this assumes each evidence row identifies one failing record
    (``table_pk``) of a validation and that ``validation_id`` is stable across
    executions of the same table -- confirm against the evidence table
    contract. Rows with a NULL ``table_pk`` never match across executions
    because the comparison is an equi-join.
    """
    evidences = (
        spark.table(EVIDENCES_TABLE)
        .select("execution_id", "validation_id", "table_pk")
        .distinct()
    )

    # Rename the pair columns to "execution_id" so both lookups are plain
    # equi-joins instead of an OR condition (which forces a nested-loop join).
    current_keys = df_execution_pairs.select(
        col("current_execution_id").alias("execution_id")
    ).distinct()
    previous_keys = df_execution_pairs.select(
        col("prev_execution_id").alias("execution_id"),
        "current_execution_id",
    ).distinct()

    # Each side keeps a non-null marker column so that, after the full outer
    # join, a NULL marker means "this key was absent on that side".
    current_failures = evidences.join(current_keys, "execution_id").select(
        col("execution_id").alias("current_execution_id"),
        "validation_id",
        "table_pk",
        col("execution_id").alias("seen_in_current"),
    )
    previous_failures = evidences.join(previous_keys, "execution_id").select(
        "current_execution_id",
        "validation_id",
        "table_pk",
        col("execution_id").alias("seen_in_previous"),
    )

    compared = current_failures.join(
        previous_failures,
        ["current_execution_id", "validation_id", "table_pk"],
        "full_outer",
    )

    in_current = col("seen_in_current").isNotNull()
    in_previous = col("seen_in_previous").isNotNull()

    # Metrics are always attributed to the current execution, so rows of the
    # previous execution are never rewritten by the merge.
    return (
        compared.select(
            col("current_execution_id").alias("execution_id"),
            "validation_id",
            when(in_current & ~in_previous, 1).otherwise(0).alias("new_failures"),
            when(in_current & in_previous, 1).otherwise(0).alias("persistent_failures"),
            when(~in_current & in_previous, 1).otherwise(0).alias("resolved_failures"),
        )
        .groupBy("execution_id", "validation_id")
        .agg(
            sum("new_failures").alias("new_failures"),
            sum("persistent_failures").alias("persistent_failures"),
            sum("resolved_failures").alias("resolved_failures"),
        )
    )


def main():
    """Compute failure-persistence metrics for validations that lack them.

    Steps:
        1. Find executions having PASSED/FAILED validations whose
           ``persistent_failures`` is still NULL.
        2. Pair each of those executions (N) with the previous SUCCESS
           execution of the same table (N-1), ordered by
           ``execution_timestamp``.
        3. Count new / persistent / resolved failures per validation and
           merge them into the validations table.
        4. Default any metric that is still NULL to 0 (e.g. validations
           without evidences, or executions with no predecessor).

    Returns:
        A status message string.

    Raises:
        Exception: any error raised by Spark/Delta is logged and re-raised.
    """
    try:
        # --- 1. Pending validations ---
        df_validations_to_process = (
            spark.table(VALIDATIONS_TABLE)
            .filter((col("status").isin("PASSED", "FAILED")) & col("persistent_failures").isNull())
            .select("execution_id", "validation_id", "rule_id")
            .distinct()
        )

        df_executions_to_process = df_validations_to_process.select("execution_id").distinct()
        if df_executions_to_process.isEmpty():
            print("No hay nuevas ejecuciones para procesar.")
            return "Éxito: No hay nuevas ejecuciones para procesar."
        print(f"Ejecuciones a procesar: {df_executions_to_process.count()}")

        # --- 2. N vs N-1 pairs ---
        # The lag is computed over *all* successful executions before the
        # join, so N-1 may be an execution that is not itself pending.
        window_prev_exec = Window.partitionBy("table_id").orderBy("execution_timestamp")
        df_execution_pairs = (
            spark.table(EXECUTION_TABLE)
            .filter(col("status") == "SUCCESS")
            .withColumn("prev_execution_id", lag("execution_id").over(window_prev_exec))
            .join(df_executions_to_process, "execution_id")
            .filter(col("prev_execution_id").isNotNull())
            .select(
                col("execution_id").alias("current_execution_id"),
                "prev_execution_id",
                "table_id",
            )
        )

        delta_validations_table = DeltaTable.forName(spark, VALIDATIONS_TABLE)

        # --- 3. Compute the metrics and merge them ---
        if df_execution_pairs.isEmpty():
            print("No hay pares N vs N-1 para procesar.")
        else:
            df_for_merge = _build_persistence_metrics(df_execution_pairs)
            # Update only the three metric columns: the source does not carry
            # the remaining target columns, so whenMatchedUpdateAll() would
            # fail to resolve them.
            delta_validations_table.alias("target").merge(
                df_for_merge.alias("source"),
                (col("target.execution_id") == col("source.execution_id")) &
                (col("target.validation_id") == col("source.validation_id")),
            ).whenMatchedUpdate(
                set={
                    "new_failures": col("source.new_failures"),
                    "persistent_failures": col("source.persistent_failures"),
                    "resolved_failures": col("source.resolved_failures"),
                }
            ).execute()
            print("Métricas de persistencia actualizadas en un solo paso.")

        # --- 4. Fill remaining NULLs ---
        # DeltaTable.update only accepts SQL-expression strings or Columns as
        # values; coalesce keeps any metric that is already populated.
        delta_validations_table.update(
            condition=(
                col("status").isin("PASSED", "FAILED") &
                (col("persistent_failures").isNull() |
                 col("new_failures").isNull() |
                 col("resolved_failures").isNull())
            ),
            set={
                "persistent_failures": "coalesce(persistent_failures, 0)",
                "new_failures": "coalesce(new_failures, 0)",
                "resolved_failures": "coalesce(resolved_failures, 0)",
            },
        )
        print("Nulos restantes rellenados.")

        return "Cálculo de persistencia finalizado."

    except Exception as e:
        print(f"Error fatal durante el cálculo de persistencia: {e}")
        # Bare raise re-raises the active exception with its original traceback.
        raise
