## Librerias

In [0]:
from datetime import datetime
from typing import Dict, List, Optional, Tuple

from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, current_timestamp, lit, when
from pyspark.sql.types import StringType

## Variables

In [0]:
def variables_globales() -> dict:
    """
    Build the storage settings shared by the landing/bronze helpers.

    Each secret is read once from the ``scope-mbc`` secret scope and reused to
    compose the ADLS Gen2 ``abfss`` root URI.

    Returns
    -------
    dict
        ``container``        : container name (secret ``secret-env-container``).
        ``storage_account``  : storage account name (secret ``secret-env-storage-account``).
        ``path_base``        : ``abfss://{container}@{storage_account}.dfs.core.windows.net``.
    """
    scope = "scope-mbc"
    # Fetch each secret a single time; previously both were read twice per call.
    container = dbutils.secrets.get(scope, "secret-env-container")
    storage_account = dbutils.secrets.get(scope, "secret-env-storage-account")
    return {
        "container": container,
        "storage_account": storage_account,
        "path_base": f"abfss://{container}@{storage_account}.dfs.core.windows.net",
    }

## Funciones

In [0]:
def read_landing(path: str) -> DataFrame:
    """
    Load a Parquet dataset from the landing area with every column as string.

    Parameters
    ----------
    path : str
        Location relative to the ``path_base`` returned by ``variables_globales``.

    Returns
    -------
    DataFrame
        The dataset with its original column names, each cast to ``string``.
    """
    source_uri = "{}/{}".format(variables_globales()["path_base"], path)
    landing_df = spark.read.format("parquet").load(source_uri)
    # Keep the column names but normalise every type to string.
    return landing_df.select(
        [col(name).cast("string").alias(name) for name in landing_df.columns]
    )



In [0]:
def write_bronze(df: DataFrame, tabla: str) -> None:
    """
    Append ``df`` to the Delta table named ``tabla``.

    Parameters
    ----------
    df : DataFrame
        Rows to append.
    tabla : str
        Target table name, passed as-is to ``saveAsTable``.
    """
    # saveAsTable addresses the table by name, so no storage path (and no
    # secret lookup via variables_globales) is needed here.
    df.write.format("delta").mode("append").saveAsTable(tabla)

In [0]:
def get_metrics(
    table_name: str
) -> Tuple[Dict[str, int], str]:
    """
    Return the numeric operation metrics of the most recent MERGE on a Delta table.

    Parameters
    ----------
    table_name : str
        Name of the Delta table, as accepted by ``DeltaTable.forName``.

    Returns
    -------
    tuple of (dict, str)
        ``(metrics, table_name)``. ``metrics`` maps every all-digit entry of the
        commit's ``operationMetrics`` (e.g. ``numTargetRowsInserted``,
        ``numTargetRowsUpdated``, ``numTargetRowsDeleted``,
        ``numTargetBytesAdded``, ``executionTimeMs``) to ``int``.
        Returns ``({}, "")`` when the history has no MERGE commit or cannot
        be read.
    """
    delta_table = DeltaTable.forName(spark, table_name)
    try:
        # Only the newest MERGE commit is needed: filter and sort in Spark
        # instead of collecting the whole table history to the driver.
        latest_merge = (
            delta_table.history()
            .filter(col("operation") == "MERGE")
            .orderBy(col("version").desc())
            .select("operationMetrics")
            .first()
        )
    except Exception as exc:
        # Best effort: report the problem instead of failing silently.
        print(f"No se pudieron obtener las métricas de {table_name}: {exc}")
        return {}, ""

    if latest_merge is None:
        return {}, ""

    # operationMetrics can be null; treat that as "no metrics".
    metrics = latest_merge["operationMetrics"] or {}
    numeric_metrics = {
        k: int(v) for k, v in metrics.items() if v is not None and v.isdigit()
    }
    return numeric_metrics, table_name

In [0]:
def _primary_key_columns(table_name: str) -> List[str]:
    """Return the PRIMARY KEY columns of ``table_name`` ('schema.table') in ``lakehouse``, in key order."""
    # NOTE(review): table_name is interpolated into the SQL text; only pass trusted names.
    query = f"""
    SELECT cu.column_name
    FROM system.information_schema.key_column_usage AS cu
    INNER JOIN system.information_schema.table_constraints AS tc
      USING (constraint_catalog, constraint_schema, constraint_name)
    WHERE concat_ws(".", cu.table_schema, cu.table_name) = '{table_name}'
      AND tc.constraint_type = 'PRIMARY KEY'
      AND cu.table_catalog = 'lakehouse'
    ORDER BY ordinal_position
    """
    return [row['column_name'] for row in spark.sql(query).collect()]


def merge(
    table_name: str,
    df: DataFrame,
    identity_column: Optional[List[str]] = None,
    enable_delete: bool = False
) -> DataFrame:
    """
    Run a dynamic MERGE into a Delta table keyed on its declared primary key.

    The primary-key columns are read from ``system.information_schema``
    (catalog ``lakehouse``). Matched rows update every target column except the
    key columns and ``FechaAuditoriaCreacion``. Unmatched source rows insert
    every target column except those listed in ``identity_column``.

    Parameters
    ----------
    table_name : str
        Target Delta table as ``'schema.table'``.
    df : DataFrame
        Source rows to insert or update. It must contain every target column
        referenced by the update/insert mappings.
    identity_column : list of str, optional
        Target columns left out of the INSERT (presumably identity /
        auto-generated columns). Defaults to no exclusions.
    enable_delete : bool, optional
        If True, target rows with no match in ``df`` are deleted. Default False.

    Returns
    -------
    DataFrame
        One-row metrics DataFrame built by ``insert_metrics`` from the MERGE commit.

    Raises
    ------
    ValueError
        If no primary key is declared for ``table_name``.
    """
    # Avoid a shared mutable default argument.
    identity_column = [] if identity_column is None else identity_column

    columns_key = _primary_key_columns(table_name)
    if not columns_key:
        # Without keys the merge condition would be an empty string.
        raise ValueError(
            f"No se encontró clave primaria para la tabla {table_name}; no se puede ejecutar el merge"
        )

    merge_conditions = " AND ".join(f"m.{c} = in.{c}" for c in columns_key)

    delta_table = DeltaTable.forName(spark, table_name)
    target_columns = delta_table.toDF().columns

    # Keys never change on update; the creation audit timestamp is kept from the original row.
    exclusion_list_update = set(columns_key) | {"FechaAuditoriaCreacion"}
    exclusion_list_insert = set(identity_column)

    columns_to_update = {
        c: f"in.{c}" for c in target_columns if c not in exclusion_list_update
    }
    columns_to_insert = {
        c: f"in.{c}" for c in target_columns if c not in exclusion_list_insert
    }

    merge_builder = (
        delta_table.alias("m")
            .merge(df.alias("in"), merge_conditions)
            .whenMatchedUpdate(set=columns_to_update)
            .whenNotMatchedInsert(values=columns_to_insert)
    )

    if enable_delete:
        merge_builder = merge_builder.whenNotMatchedBySourceDelete()

    merge_builder.execute()

    print(f"Merge ejecutado en la tabla {table_name}")

    return insert_metrics(get_metrics(table_name))

In [0]:
def insert_metrics(metrics_tuple: Tuple[Dict[str, int], str]) -> DataFrame:
    """
    Build the one-row control/audit record for a MERGE from its Delta metrics.

    Despite its name, this function does not write anything. It returns the
    record as a DataFrame, presumably for the caller to append to
    ``auditoria.ingestion_log`` (TODO confirm with callers).

    Job identifiers and the job start time are read from the notebook widgets
    ``JobId``, ``JobRunId``, ``TaskRunId`` and ``StartTime``.

    Parameters
    ----------
    metrics_tuple : tuple of (dict, str)
        ``(metrics, table_name)`` as returned by ``get_metrics``.
        ``metrics`` must contain ``numTargetRowsInserted``,
        ``numTargetRowsUpdated``, ``numTargetRowsDeleted``,
        ``numTargetBytesAdded`` and ``executionTimeMs``.
        ``table_name`` must be ``layer.table`` or ``catalog.layer.table``.

    Returns
    -------
    DataFrame
        A single row with job ids, timings, status, table/layer and row counts.

    Raises
    ------
    ValueError
        If ``metrics`` is empty or ``table_name`` is not a qualified name
        (e.g. the ``({}, "")`` failure result of ``get_metrics``).
    """
    metrics, table_name = metrics_tuple

    name_parts = table_name.split('.') if table_name else []
    if not metrics or len(name_parts) < 2:
        raise ValueError(
            f"Métricas no disponibles para la tabla '{table_name}'; "
            "no se puede construir el registro de auditoría"
        )
    # Use the last two parts so both 'layer.table' and 'catalog.layer.table' work.
    layer, table = name_parts[-2], name_parts[-1]

    job_id = dbutils.widgets.get('JobId')
    job_run_id = dbutils.widgets.get('JobRunId')
    task_run_id = dbutils.widgets.get('TaskRunId')
    start_time = dbutils.widgets.get('StartTime')

    df_metrics = (
        spark.createDataFrame([metrics])
            .withColumn('job_id', lit(int(job_id)))
            .withColumn('job_run_id', lit(int(job_run_id)))
            .withColumn('task_run_id', lit(int(task_run_id)))
            .withColumn('job_start_time', lit(start_time).cast("timestamp"))
            .withColumn('job_end_time', current_timestamp())
            # Subtract epoch seconds. Casting the timestamp difference (an
            # interval) straight to long is not guaranteed to be allowed.
            .withColumn(
                'job_duration_seconds',
                col("job_end_time").cast('long') - col("job_start_time").cast('long'),
            )
            .withColumn('table', lit(table))
            .withColumn('layer', lit(layer))
            # Only the bronze layer reports its inserted rows as incoming rows.
            .withColumn('rows_in', when(col("layer") != "bronze", lit(0)).otherwise(col('numTargetRowsInserted')))
            .withColumn('rows_inserted', col("numTargetRowsInserted"))
            .withColumn('rows_updated', col("numTargetRowsUpdated"))
            .withColumn('rows_deleted', col("numTargetRowsDeleted"))
            .withColumn('file_bytes', col("numTargetBytesAdded"))
            .withColumn('merge_duration_seconds', col("executionTimeMs") / 1000)
            # NOTE(review): a MERGE that adds no bytes (e.g. nothing changed) is
            # reported as "Failed" — confirm this is intended.
            .withColumn('job_status', when(col("file_bytes") > 0, lit("Success")).otherwise(lit("Failed")))
    ).select(
        "job_id",
        "job_run_id",
        "task_run_id",
        "job_start_time",
        "job_end_time",
        "job_duration_seconds",
        "job_status",
        "table",
        "layer",
        "rows_in",
        "rows_inserted",
        "rows_updated",
        "rows_deleted",
        "file_bytes",
        "merge_duration_seconds",
    )

    print(f"Metricas insertadas en la tabla auditoria.ingestion_log")

    return df_metrics