
# Transformación de Datos: Capa Silver en Azure Databricks

Este notebook lee datos desde la capa Bronze (Delta Lake gestionada) y aplica transformaciones y limpieza antes de escribir en la **capa Silver** utilizando `MERGE INTO` para control de cambios.


In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import upper
from delta.tables import DeltaTable

In [0]:
from typing import Optional
from pyspark.sql.functions import col, current_timestamp, expr, lit
from functools import reduce
from operator import and_

def leer_desde_bronze(
    nombre_tabla: str,
    catalog_name: str = "desarrollo",
    db_silver: str = "bronze_ventas",
    mode: str = "full",                 # "full" | "diff"
    ts_col: str = "FECCARGA",
    last_n_days: Optional[int] = None,  # e.g. 2 -> last 2 days
    last_n_hours: Optional[int] = None, # e.g. 6 -> last 6 hours
    since: Optional[str] = None,        # "YYYY-MM-DD" or "YYYY-MM-DD HH:mm:ss"
    until: Optional[str] = None,        # EXCLUSIVE upper bound
    drop_nulls: bool = True
):
    """Read a table from the metastore, optionally restricted to a time window.

    Args:
        nombre_tabla: Table name inside the schema.
        catalog_name: Catalog that holds the schema.
        db_silver: Schema (database) to read from. Despite the parameter name,
            the default points at the Bronze schema ``bronze_ventas``.
        mode: ``"full"`` (case-insensitive) returns the whole table; any other
            value is treated as a differential read and requires a window.
        ts_col: Timestamp column used for filtering; must be of type ``timestamp``.
        last_n_days: Keep rows whose ``ts_col`` is within the last N days.
        last_n_hours: Keep rows whose ``ts_col`` is within the last N hours.
        since: Inclusive lower bound, cast to timestamp by Spark.
        until: Exclusive upper bound, cast to timestamp by Spark.
        drop_nulls: When True, rows with a NULL ``ts_col`` are removed.

    Returns:
        The (lazily evaluated) filtered DataFrame.

    Raises:
        ValueError: If a differential read is requested without any window
            argument, if the table does not exist, or if ``ts_col`` is not a
            timestamp column.
    """
    is_full = mode.lower() == "full"

    # Validate the window before touching Spark. The NOT NULL filter alone must
    # not count as a window, otherwise a "diff" read silently returns everything.
    has_window = (
        last_n_days is not None
        or last_n_hours is not None
        or bool(since)
        or bool(until)
    )
    if not is_full and not has_window:
        raise ValueError("En mode='diff' especifica last_n_days/last_n_hours o since/until.")

    full = f"{catalog_name}.{db_silver}.{nombre_tabla}"
    if not spark.catalog.tableExists(full):
        raise ValueError(f"La tabla {full} no existe en el metastore.")

    df = spark.table(full)

    # The timestamp column must really be a timestamp (a missing column reports
    # an empty type in the message).
    dtype = dict(df.dtypes).get(ts_col, "").lower()
    if dtype != "timestamp":
        raise ValueError(f"La columna '{ts_col}' debe ser timestamp. Actual: {dtype}")

    if is_full:
        return df.filter(col(ts_col).isNotNull()) if drop_nulls else df

    # --- Differential read: AND together every requested bound ---
    conds = []
    if drop_nulls:
        conds.append(col(ts_col).isNotNull())
    if last_n_days is not None:
        conds.append(col(ts_col) >= (current_timestamp() - expr(f"INTERVAL {int(last_n_days)} DAYS")))
    if last_n_hours is not None:
        conds.append(col(ts_col) >= (current_timestamp() - expr(f"INTERVAL {int(last_n_hours)} HOURS")))
    if since:
        conds.append(col(ts_col) >= lit(since).cast("timestamp"))
    if until:
        conds.append(col(ts_col) < lit(until).cast("timestamp"))  # open upper bound

    return df.filter(reduce(and_, conds))

In [0]:
from pyspark.sql.functions import col, to_timestamp, when

# Load every Bronze source table in full; with the reader's defaults only rows
# whose FECCARGA is not NULL are kept.
# For an incremental read, pass mode="diff" together with a window, e.g.:
#   leer_desde_bronze("t_pedido", mode="diff",
#                     since="2025-08-20 00:00:00", until="2025-08-23 00:00:00")
subcategoria = leer_desde_bronze(nombre_tabla="t_subcategoria", mode="full", db_silver="bronze_ventas")
categoria = leer_desde_bronze(nombre_tabla="t_categoria", mode="full", db_silver="bronze_ventas")
producto = leer_desde_bronze(nombre_tabla="t_producto", mode="full", db_silver="bronze_ventas")
ubigeo = leer_desde_bronze(nombre_tabla="t_ubigeo", mode="full", db_silver="bronze_ventas")
segmento = leer_desde_bronze(nombre_tabla="t_segmento", mode="full", db_silver="bronze_ventas")
mercado = leer_desde_bronze(nombre_tabla="t_mercado", mode="full", db_silver="bronze_ventas")
sectoreconomico = leer_desde_bronze(nombre_tabla="t_sectoreconomico", mode="full", db_silver="bronze_ventas")
# The customer table is currently not loaded:
# cliente = leer_desde_bronze("t_cliente", db_silver="bronze_ventas", mode="full")
vendedor = leer_desde_bronze(nombre_tabla="t_vendedor", mode="full", db_silver="bronze_ventas")
modalidadenvio = leer_desde_bronze(nombre_tabla="t_modalidadenvio", mode="full", db_silver="bronze_ventas")
modalidadventa = leer_desde_bronze(nombre_tabla="t_modalidadventa", mode="full", db_silver="bronze_ventas")
moneda = leer_desde_bronze(nombre_tabla="t_moneda", mode="full", db_silver="bronze_ventas")
mediopago = leer_desde_bronze(nombre_tabla="t_mediopago", mode="full", db_silver="bronze_ventas")
prioridadpedido = leer_desde_bronze(nombre_tabla="t_prioridadpedido", mode="full", db_silver="bronze_ventas")
pedido = leer_desde_bronze(nombre_tabla="t_pedido", mode="full", db_silver="bronze_ventas")

# Mostrar la estructura del DataFrame después de la limpieza



In [0]:
# SILVER 
from typing import Dict, List, Tuple, Optional
from pyspark.sql import DataFrame, Window
from pyspark.sql.functions import (
    col, lit, when, trim, upper, lower, lpad, rpad, regexp_replace,
    to_timestamp, current_timestamp, sha2, concat_ws, row_number, length,
    coalesce
)
from pyspark.sql.types import TimestampType

def transformar_silver_df(
    df: DataFrame,
    *,
    # 1) Conforming and normalization
    rename_map: Optional[Dict[str, str]] = None,
    casts: Optional[Dict[str, str]] = None,
    normalize_whitespace_cols: Optional[List[str]] = None,
    trim_cols: Optional[List[str]] = None,
    upper_cols: Optional[List[str]] = None,
    lower_cols: Optional[List[str]] = None,
    pad_left: Optional[Dict[str, Tuple[int, str]]] = None,
    pad_right: Optional[Dict[str, Tuple[int, str]]] = None,

    # 2) Incremental & dedupe
    bk_cols: Optional[List[str]] = None,        # business key
    ts_col: Optional[str] = None,               # row timestamp
    ts_fmt: str = "yyyy-MM-dd HH:mm:ss",
    watermark_value: Optional[str] = None,      # e.g. "2025-08-01 00:00:00"
    keep_latest: bool = True,

    # 3) Light rules
    fillna_map: Optional[Dict[str, object]] = None,
    value_ranges: Optional[Dict[str, Tuple[float, float]]] = None,
    non_negative_cols: Optional[List[str]] = None,
    regex_validations: Optional[Dict[str, str]] = None,
    not_null_cols: Optional[List[str]] = None,

    # 4) Lookups (in-memory enrichment)
    # {"desc_pais": (df_ref, "cod_pais", "descripcion", "DESCONOCIDO")}
    lookups: Optional[Dict[str, Tuple[DataFrame, str, str, object]]] = None,

    # 5) Light audit columns (no surrogate key)
    etl_source: Optional[str] = None,
    etl_batch_id: Optional[str] = None,

    # 6) Soft-delete
    soft_delete_flag_col: Optional[str] = None,
    treat_deleted_as_reject: bool = False,

    # 7) Metrics
    compute_metrics: bool = True,

    # 8) Reject-reason column
    reject_reason_col: str = "_reject_reason",
) -> Tuple[DataFrame, DataFrame, Dict]:
    """Conform, validate, enrich and de-duplicate a DataFrame for the Silver layer.

    Steps run in this order: rename -> cast -> text normalization/padding ->
    watermark filter -> fillna -> clamping (non-negative, value ranges) ->
    validations (rows that fail go to the rejects frame) -> lookups ->
    de-duplication by business key -> audit columns.

    Every per-column option silently skips columns that are not present in the
    frame. Any falsy value (``None``, ``{}``, ``[]``, ``False``) disables an option.

    Args:
        df: Input frame.
        rename_map: ``{old_name: new_name}``.
        casts: ``{column: spark_type}``; a non-timestamp column cast to
            ``"timestamp"`` is parsed with ``ts_fmt``.
        normalize_whitespace_cols: Columns where runs of whitespace collapse to
            one space and the result is trimmed.
        trim_cols / upper_cols / lower_cols: Columns to trim / upper-case / lower-case.
        pad_left / pad_right: ``{column: (length, pad_char)}``; the column is
            cast to string first.
        bk_cols: Business-key columns used as the de-duplication partition.
        ts_col: Row timestamp; parsed with ``ts_fmt`` when it is not already a
            timestamp. Used by the watermark filter and the dedupe ordering.
        ts_fmt: Pattern used to parse timestamps.
        watermark_value: Keep only rows with ``ts_col >= watermark_value``.
        keep_latest: Keep the newest (True) or oldest (False) row per key.
        fillna_map: ``{column: default}`` passed to ``DataFrame.fillna``.
        value_ranges: ``{column: (min, max)}``; values outside are clamped.
        non_negative_cols: Columns whose negative values are replaced by 0.
        regex_validations: ``{column: pattern}``; rows whose value does not
            match are rejected (NULL values are not rejected by this rule).
        not_null_cols: Columns whose NULL rows are rejected.
        lookups: ``{target_col: (df_ref, key_col, value_col, default)}``;
            ``key_col`` must have the same name in ``df`` and ``df_ref``.
        etl_source / etl_batch_id: Optional values for ``_etl_source`` /
            ``_etl_batch_id``. ``_etl_loaded_at`` is always added.
        soft_delete_flag_col / treat_deleted_as_reject: When both are set, rows
            whose flag is non-zero are rejected with reason ``SOFT_DELETED``.
        compute_metrics: When True, row counts are collected after each stage
            (each count triggers a Spark action).
        reject_reason_col: Name of the column holding the reject reason.

    Returns:
        ``(valid_df, rejects_df, dq)`` where ``dq`` holds the collected row
        counts (empty when ``compute_metrics`` is False). A row that breaks
        several rules appears once per broken rule in ``rejects_df``.
    """
    dq = {}
    if compute_metrics:
        dq["rows_in"] = df.count()

    # --- Renames
    for old, new in (rename_map or {}).items():
        if old in df.columns and old != new:
            df = df.withColumnRenamed(old, new)

    # --- Casts (string -> timestamp is parsed with ts_fmt rather than plain-cast)
    for c, t in (casts or {}).items():
        if c not in df.columns:
            continue
        if t.lower() == "timestamp" and not isinstance(df.schema[c].dataType, TimestampType):
            df = df.withColumn(c, to_timestamp(col(c), ts_fmt))
        else:
            df = df.withColumn(c, col(c).cast(t))

    # Make sure the row timestamp is a real timestamp
    if ts_col and ts_col in df.columns and not isinstance(df.schema[ts_col].dataType, TimestampType):
        df = df.withColumn(ts_col, to_timestamp(col(ts_col), ts_fmt))

    # --- Normalization
    def _map_cols(frame, names, build):
        """Overwrite each listed column that exists in `frame` with build(name)."""
        for name in names or []:
            if name in frame.columns:
                frame = frame.withColumn(name, build(name))
        return frame

    df = _map_cols(df, normalize_whitespace_cols,
                   lambda c: trim(regexp_replace(col(c), r"\s+", " ")))
    df = _map_cols(df, trim_cols, lambda c: trim(col(c)))
    df = _map_cols(df, upper_cols, lambda c: upper(col(c)))
    df = _map_cols(df, lower_cols, lambda c: lower(col(c)))
    for c, (ln, ch) in (pad_left or {}).items():
        if c in df.columns:
            df = df.withColumn(c, lpad(col(c).cast("string"), ln, ch))
    for c, (ln, ch) in (pad_right or {}).items():
        if c in df.columns:
            df = df.withColumn(c, rpad(col(c).cast("string"), ln, ch))

    # --- Incremental watermark
    if watermark_value and ts_col and ts_col in df.columns:
        wm_ts = to_timestamp(lit(watermark_value), ts_fmt)
        df = df.where(col(ts_col) >= wm_ts)

    if compute_metrics:
        dq["rows_after_watermark"] = df.count()

    # --- Defaults
    if fillna_map:
        df = df.fillna({k: v for k, v in fillna_map.items() if k in df.columns})

    # --- Light rules: clamp instead of reject
    for c in non_negative_cols or []:
        if c in df.columns:
            df = df.withColumn(c, when(col(c) < 0, lit(0)).otherwise(col(c)))
    for c, (mn, mx) in (value_ranges or {}).items():
        if c in df.columns:
            df = df.withColumn(
                c,
                when(col(c) < mn, lit(mn)).when(col(c) > mx, lit(mx)).otherwise(col(c))
            )

    # --- Validations -> rejects
    # Each rule is (reason, predicate). Predicates are wrapped so they evaluate
    # to TRUE/FALSE and never NULL; that way the valid set is exactly the rows
    # for which no predicate is TRUE, and a NULL comparison never rejects a row.
    reject_rules = []
    for c in not_null_cols or []:
        if c in df.columns:
            reject_rules.append((f"NOT_NULL({c})", col(c).isNull()))
    if regex_validations:
        for c, pattern in regex_validations.items():
            if c in df.columns:
                reject_rules.append(
                    (f"REGEX_FAIL({c})", coalesce(~col(c).rlike(pattern), lit(False)))
                )
    if soft_delete_flag_col and soft_delete_flag_col in df.columns and treat_deleted_as_reject:
        is_deleted = coalesce(col(soft_delete_flag_col), lit(0)) != lit(0)
        reject_rules.append(("SOFT_DELETED", coalesce(is_deleted, lit(False))))

    rejects = None
    if reject_rules:
        any_rule_failed = None
        for reason, failed in reject_rules:
            bad = df.where(failed).withColumn(reject_reason_col, lit(reason))
            rejects = bad if rejects is None else rejects.unionByName(bad, allowMissingColumns=True)
            any_rule_failed = failed if any_rule_failed is None else (any_rule_failed | failed)
        # Filter on the predicates themselves. A row-hash anti-join is avoided
        # because concat_ws skips NULLs, so distinct rows could share a hash.
        df = df.where(~any_rule_failed)

    if compute_metrics:
        dq["rows_rejects"] = 0 if rejects is None else rejects.count()
        dq["rows_after_rejects"] = df.count()

    # --- Lookups
    # NOTE(review): the reference is de-duplicated on the (key, value) pair, so a
    # key with several values fans rows out until the BK dedupe below — confirm.
    for target_col, (df_ref, key_col, value_col, default_value) in (lookups or {}).items():
        ref = df_ref.select(col(key_col).alias(f"lk_{key_col}"),
                            col(value_col).alias(f"lk_{value_col}")).dropDuplicates()
        if key_col in df.columns:
            df = (
                df.join(ref, df[key_col] == ref[f"lk_{key_col}"], "left")
                  .withColumn(target_col, coalesce(col(f"lk_{value_col}"), lit(default_value)))
                  .drop(f"lk_{key_col}", f"lk_{value_col}")
            )
        else:
            df = df.withColumn(target_col, lit(default_value))

    # --- Dedupe by business key (+ timestamp when available)
    if bk_cols:
        if ts_col and ts_col in df.columns:
            order_col = col(ts_col).desc() if keep_latest else col(ts_col).asc()
        else:
            # Without a timestamp the ordering value is the same for every row
            # of a key, so which duplicate survives is not defined.
            order_col = length(concat_ws("§", *[col(c).cast("string") for c in bk_cols])).desc()
        w = Window.partitionBy(*[col(c) for c in bk_cols]).orderBy(order_col)
        df = df.withColumn("_rn", row_number().over(w)).where(col("_rn") == 1).drop("_rn")

    if compute_metrics:
        dq["rows_after_dedupe"] = df.count()

    # --- Minimal audit columns
    if etl_source:
        df = df.withColumn("_etl_source", lit(etl_source))
    if etl_batch_id:
        df = df.withColumn("_etl_batch_id", lit(etl_batch_id))
    df = df.withColumn("_etl_loaded_at", current_timestamp())

    if compute_metrics:
        # Only columns were added since the dedupe count, so the row count is
        # unchanged; reuse it instead of running another Spark job.
        dq["rows_out"] = dq["rows_after_dedupe"]

    if rejects is None:
        # No rule applied: return an empty frame that still carries the reason column.
        rejects = df.limit(0).withColumn(reject_reason_col, lit(None).cast("string"))

    return df, rejects, dq


In [0]:
# Silver frame for the sub-category dimension: rename the Bronze columns, clean
# the description, keep rows loaded on/after the watermark and keep the latest
# row per id_subcategoria. Metrics are disabled, so `dq` comes back empty.
df_Subcategoria, df_rech_subcategoria, dq = transformar_silver_df(
    subcategoria,
    rename_map={
        "CODSUBCAT": "id_subcategoria",
        "CODCAT": "id_categoria",
        "FECCARGA": "fecha_carga",
        "NOMSUBCAT": "des_subcategoria",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_subcategoria"],
    trim_cols=["des_subcategoria"],
    # Incremental window and de-duplication
    bk_cols=["id_subcategoria"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_categoria": 0},
    non_negative_cols=["id_categoria"],
    regex_validations=None,
    not_null_cols=["id_subcategoria", "fecha_carga"],
    # Audit
    etl_source="silver_subcategoria",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the category dimension: rename the Bronze columns, clean the
# description, keep rows loaded on/after the watermark and keep the latest row
# per id_categoria. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_categoria with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Categoria, df_rech_categoria, dq = transformar_silver_df(
    categoria,
    rename_map={
        "CODCAT": "id_categoria",
        "FECCARGA": "fecha_carga",
        "NOMCAT": "des_categoria",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_categoria"],
    trim_cols=["des_categoria"],
    # Incremental window and de-duplication
    bk_cols=["id_categoria"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_categoria": 0},
    non_negative_cols=["id_categoria"],
    regex_validations=None,
    not_null_cols=["id_categoria", "fecha_carga"],
    # Audit
    etl_source="silver_categoria",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the currency dimension: rename the Bronze columns, clean the
# description, keep rows loaded on/after the watermark and keep the latest row
# per id_moneda. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_moneda with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Moneda, df_rech_moneda, dq = transformar_silver_df(
    moneda,
    rename_map={
        "CODMNDA": "id_moneda",
        "FECCARGA": "fecha_carga",
        "DESMNDA": "des_moneda",
        "DESISO": "des_abrmoneda",
        "DESSIMB": "des_simbolo",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_moneda"],
    trim_cols=["des_moneda"],
    # Incremental window and de-duplication
    bk_cols=["id_moneda"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_moneda": 0},
    non_negative_cols=["id_moneda"],
    regex_validations=None,
    not_null_cols=["id_moneda", "fecha_carga"],
    # Audit
    etl_source="silver_moneda",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the product dimension: rename the Bronze columns, clean the
# description, keep rows loaded on/after the watermark and keep the latest row
# per id_producto. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_producto with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Producto, df_rech_producto, dq = transformar_silver_df(
    producto,
    rename_map={
        "IDPRODUCTO": "id_producto",
        "FECCARGA": "fecha_carga",
        "NOMPROD": "des_producto",
        "CODPROD": "cod_producto",
        "CODSUBCAT": "id_subcategoria",
        "CODMNDA": "id_moneda",
        "MTOPRECUNIT": "mto_preciounitario",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_producto"],
    trim_cols=["des_producto"],
    # Incremental window and de-duplication
    bk_cols=["id_producto"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_producto": 0},
    non_negative_cols=["id_producto"],
    regex_validations=None,
    not_null_cols=["id_producto", "fecha_carga"],
    # Audit
    etl_source="silver_producto",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the segment dimension: rename the Bronze columns, clean the
# description, keep rows loaded on/after the watermark and keep the latest row
# per id_segmento. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_segmento with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Segmento, df_rech_segmento, dq = transformar_silver_df(
    segmento,
    rename_map={
        "CODSGMNTO": "id_segmento",
        "FECCARGA": "fecha_carga",
        "DESSGMNTO": "des_segmento",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_segmento"],
    trim_cols=["des_segmento"],
    # Incremental window and de-duplication
    bk_cols=["id_segmento"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_segmento": 0},
    non_negative_cols=["id_segmento"],
    regex_validations=None,
    not_null_cols=["id_segmento", "fecha_carga"],
    # Audit
    etl_source="silver_segmento",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the geography (ubigeo) dimension: rename the Bronze columns,
# clean the city name, keep rows loaded on/after the watermark and keep the
# latest row per id_ubigeo. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_ubigeo with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Ubigeo, df_rech_ubigeo, dq = transformar_silver_df(
    ubigeo,
    rename_map={
        "CODUBIGEO": "id_ubigeo",
        "FECCARGA": "fecha_carga",
        "NOMCIUDAD": "nom_ciudad",
        "NOMESTADO": "nom_estado",
        "NOMPAIS": "nom_pais",
        "NOMREGION": "nom_region",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["nom_ciudad"],
    trim_cols=["nom_ciudad"],
    # Incremental window and de-duplication
    bk_cols=["id_ubigeo"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_ubigeo": 0},
    non_negative_cols=["id_ubigeo"],
    regex_validations=None,
    not_null_cols=["id_ubigeo", "fecha_carga"],
    # Audit
    etl_source="silver_ubigeo",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the market dimension: rename the Bronze columns, clean the
# description, keep rows loaded on/after the watermark and keep the latest row
# per id_mercado. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_mercado with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Mercado, df_rech_mercado, dq = transformar_silver_df(
    mercado,
    rename_map={
        "CODMRCADO": "id_mercado",
        "FECCARGA": "fecha_carga",
        "DESMERCADO": "des_mercado",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_mercado"],
    trim_cols=["des_mercado"],
    # Incremental window and de-duplication
    bk_cols=["id_mercado"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_mercado": 0},
    non_negative_cols=["id_mercado"],
    regex_validations=None,
    not_null_cols=["id_mercado", "fecha_carga"],
    # Audit
    etl_source="silver_mercado",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the economic-sector dimension: rename the Bronze columns,
# clean the description, keep rows loaded on/after the watermark and keep the
# latest row per id_sectoreconomico. Metrics are disabled, so `dq` is empty.
# NOTE(review): fillna_map fills a NULL id_sectoreconomico with 0 before the
# NOT NULL rule runs (when the column is numeric), so that rule cannot reject a
# NULL key — confirm this is intended.
df_Sectoreconomico, df_rech_sectoreconomico, dq = transformar_silver_df(
    sectoreconomico,
    rename_map={
        "CODSECTECON": "id_sectoreconomico",
        "CODSGMNTO": "id_segmento",
        "FECCARGA": "fecha_carga",
        "DESSECTECON": "des_sectoreconomico",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_sectoreconomico"],
    trim_cols=["des_sectoreconomico"],
    # Incremental window and de-duplication
    bk_cols=["id_sectoreconomico"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_sectoreconomico": 0},
    non_negative_cols=["id_sectoreconomico"],
    regex_validations=None,
    not_null_cols=["id_sectoreconomico", "fecha_carga"],
    # Audit
    etl_source="silver_sectoreconomico",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# df_Cliente, df_rech_cliente, dq = transformar_silver_df(
#     cliente,
#     rename_map={"IDCLI":"id_cliente","FECCARGA":"fecha_carga", "NOMCLI":"nom_cliente", "CODCLI":"cod_cliente", "CODUBIGEO":"id_ubigeo", "CODSECTECON":"id_sectoreconomico", "CODMRCADO":"id_mercado"},
#     casts={"fecha_carga":"timestamp"},
#     normalize_whitespace_cols=["nom_cliente"],
#     trim_cols=["nom_cliente"],
#     bk_cols=["id_cliente"],
#     ts_col="fecha_carga",
#     watermark_value="2025-08-01 00:00:00",
#     fillna_map={"id_cliente":0},
#     non_negative_cols=["id_cliente"],
#     regex_validations=False,
#     not_null_cols=["id_cliente","fecha_carga"],
#     ##lookups={"id_categoria": (categoria, "codcat", "DESCONOCIDO")},
#     etl_source="silver_cliente",
#     etl_batch_id="20250824_01",
#     compute_metrics=False
# )

In [0]:
# Silver frame for the payment-method dimension: rename the Bronze columns,
# clean the description, keep rows loaded on/after the watermark and keep the
# latest row per id_mediopago. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_mediopago with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Mediopago, df_rech_mediopago, dq = transformar_silver_df(
    mediopago,
    rename_map={
        "CODMEDIOPAGO": "id_mediopago",
        "FECCARGA": "fecha_carga",
        "DESMEDIOPAGO": "des_mediopago",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_mediopago"],
    trim_cols=["des_mediopago"],
    # Incremental window and de-duplication
    bk_cols=["id_mediopago"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_mediopago": 0},
    non_negative_cols=["id_mediopago"],
    regex_validations=None,
    not_null_cols=["id_mediopago", "fecha_carga"],
    # Audit
    etl_source="silver_mediopago",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the order-priority dimension: rename the Bronze columns,
# clean the description, keep rows loaded on/after the watermark and keep the
# latest row per id_prioridadpedido. Metrics are disabled, so `dq` is empty.
# NOTE(review): fillna_map fills a NULL id_prioridadpedido with 0 before the
# NOT NULL rule runs (when the column is numeric), so that rule cannot reject a
# NULL key — confirm this is intended.
df_Prioridadpedido, df_rech_prioridadpedido, dq = transformar_silver_df(
    prioridadpedido,
    rename_map={
        "CODPRIORPEDI": "id_prioridadpedido",
        "FECCARGA": "fecha_carga",
        "DESPRIORPEDI": "des_prioridadpedido",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_prioridadpedido"],
    trim_cols=["des_prioridadpedido"],
    # Incremental window and de-duplication
    bk_cols=["id_prioridadpedido"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_prioridadpedido": 0},
    non_negative_cols=["id_prioridadpedido"],
    regex_validations=None,
    not_null_cols=["id_prioridadpedido", "fecha_carga"],
    # Audit
    etl_source="silver_prioridadpedido",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the sales-mode dimension: rename the Bronze columns, clean
# the description, keep rows loaded on/after the watermark and keep the latest
# row per id_modalidadventa. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_modalidadventa with 0 before the
# NOT NULL rule runs (when the column is numeric), so that rule cannot reject a
# NULL key — confirm this is intended.
df_Modalidadventa, df_rech_modalidadventa, dq = transformar_silver_df(
    modalidadventa,
    rename_map={
        "CODMODVALVTA": "id_modalidadventa",
        "FECCARGA": "fecha_carga",
        "DESMODVALVTA": "des_modalidadventa",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_modalidadventa"],
    trim_cols=["des_modalidadventa"],
    # Incremental window and de-duplication
    bk_cols=["id_modalidadventa"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_modalidadventa": 0},
    non_negative_cols=["id_modalidadventa"],
    regex_validations=None,
    not_null_cols=["id_modalidadventa", "fecha_carga"],
    # Audit
    etl_source="silver_modalidadventa",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the shipping-mode dimension: rename the Bronze columns,
# clean the description, keep rows loaded on/after the watermark and keep the
# latest row per id_modalidadenvio. Metrics are disabled, so `dq` is empty.
# NOTE(review): fillna_map fills a NULL id_modalidadenvio with 0 before the
# NOT NULL rule runs (when the column is numeric), so that rule cannot reject a
# NULL key — confirm this is intended.
df_Modalidadenvio, df_rech_modalidadenvio, dq = transformar_silver_df(
    modalidadenvio,
    rename_map={
        "CODMODALENV": "id_modalidadenvio",
        "FECCARGA": "fecha_carga",
        "DESMODALENV": "des_modalidadenvio",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["des_modalidadenvio"],
    trim_cols=["des_modalidadenvio"],
    # Incremental window and de-duplication
    bk_cols=["id_modalidadenvio"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_modalidadenvio": 0},
    non_negative_cols=["id_modalidadenvio"],
    regex_validations=None,
    not_null_cols=["id_modalidadenvio", "fecha_carga"],
    # Audit
    etl_source="silver_modalidadenvio",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the salesperson dimension: rename the Bronze columns, clean
# the name, keep rows loaded on/after the watermark and keep the latest row per
# id_vendedor. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_vendedor with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Vendedor, df_rech_vendedor, dq = transformar_silver_df(
    vendedor,
    rename_map={
        "CODVEND": "id_vendedor",
        "FECCARGA": "fecha_carga",
        "NOMVEND": "nom_vendedor",
        "CODMNDA": "id_moneda",
        "MTOSUELDOBASE": "mto_sueldobase",
        "PCTCOMIS": "pct_comision",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["nom_vendedor"],
    trim_cols=["nom_vendedor"],
    # Incremental window and de-duplication
    bk_cols=["id_vendedor"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_vendedor": 0},
    non_negative_cols=["id_vendedor"],
    regex_validations=None,
    not_null_cols=["id_vendedor", "fecha_carga"],
    # Audit
    etl_source="silver_vendedor",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
# Silver frame for the order fact: rename the Bronze columns, clean the order
# code, keep rows loaded on/after the watermark and keep the latest row per
# id_pedido. Metrics are disabled, so `dq` comes back empty.
# NOTE(review): fillna_map fills a NULL id_pedido with 0 before the NOT NULL
# rule runs (when the column is numeric), so that rule cannot reject a NULL key
# — confirm this is intended.
df_Pedido, df_rech_pedido, dq = transformar_silver_df(
    pedido,
    rename_map={
        # Keys and foreign keys
        "IDPEDIDO": "id_pedido",
        "FECCARGA": "fecha_carga",
        "CODPEDID": "cod_pedido",
        "CODCLI": "cod_cliente",
        "CODPROD": "cod_producto",
        "CODMODVALVTA": "id_modalidadventa",
        "CODMEDIOPAGO": "id_mediopago",
        "CODVEND": "id_vendedor",
        "CODMNDA": "id_moneda",
        "IDCLI": "id_cliente",
        "IDPRODUCTO": "id_producto",
        "CODMODALENV": "id_modalidadenvio",
        "CODPRIORPEDI": "id_prioridadpedido",
        # Measures
        "MTOVALUNIT": "mto_valorunitario",
        "CTDPEDID": "nro_pedidos",
        "MTOSUBT": "mto_subtotal",
        "MTODSCTO": "mto_descuento",
        "MTOBENEF": "mto_ganancia",
        "MTOVALVTA": "mto_valorventa",
        "MTOIGV": "mto_iva",
        "MTOSUBTPROD": "mto_subtotalproducto",
        "MTOCSTOENV": "mto_costoenvio",
        "MTOTOTPROD": "mto_totalproducto",
    },
    casts={"fecha_carga": "timestamp"},
    normalize_whitespace_cols=["cod_pedido"],
    trim_cols=["cod_pedido"],
    # Incremental window and de-duplication
    bk_cols=["id_pedido"],
    ts_col="fecha_carga",
    watermark_value="2025-08-01 00:00:00",
    # Defaults and validations (no regex rules for this table)
    fillna_map={"id_pedido": 0},
    non_negative_cols=["id_pedido"],
    regex_validations=None,
    not_null_cols=["id_pedido", "fecha_carga"],
    # Audit
    etl_source="silver_pedido",
    etl_batch_id="20250824_01",
    compute_metrics=False,
)

In [0]:
%sql
-- Make `desarrollo` the current catalog for the statements that follow.
USE CATALOG desarrollo;

-- List the schemas (databases) available in the current catalog.
SHOW DATABASES;

In [0]:
from typing import List, Optional
from delta.tables import DeltaTable

def crear_tabla_delta_merge_managed(
    nombre_df: str,
    nombre_tabla: str,
    llave_origen: List[str],
    llave_destino: List[str],
    db_name: str = "default",
    catalog_name: str = "desarrollo",
    partition_cols: Optional[List[str]] = None,
    auto_merge_schema: bool = True
) -> None:
    """Create a managed Delta table from a DataFrame, or MERGE into it if it exists.

    No explicit LOCATION is used: a new table is saved with ``saveAsTable`` under
    ``catalog_name.db_name``. When the table already exists, every source column
    is upserted (update on key match, insert otherwise).

    Args:
        nombre_df: Name of a module-level variable holding the source DataFrame;
            it is resolved through ``globals()``.
        nombre_tabla: Target table name.
        llave_origen: Key columns on the source side of the MERGE condition.
        llave_destino: Key columns on the target side, paired by position with
            ``llave_origen``.
        db_name: Target schema (database).
        catalog_name: Target catalog.
        partition_cols: Partition columns, used only when the table is created.
        auto_merge_schema: When True, enables
            ``spark.databricks.delta.schema.autoMerge.enabled`` on the session.
            The setting is not reverted afterwards.

    Raises:
        KeyError: If ``nombre_df`` is not a module-level name.
        RuntimeError: If the existing table cannot be opened as a Delta table.

    Note:
        Validation failures (mismatched or empty key lists, unknown partition
        columns) are reported with ``print`` and the function returns without
        writing anything.
    """
    # Validations
    df = globals()[nombre_df]

    if len(llave_origen) != len(llave_destino):
        print("❌ Error: La cantidad de columnas en 'llave_origen' y 'llave_destino' no coinciden.")
        return

    # An empty key list would yield an empty MERGE condition.
    if not llave_origen:
        print("❌ Error: 'llave_origen' y 'llave_destino' no pueden estar vacías.")
        return

    if partition_cols:
        faltantes = [c for c in partition_cols if c not in df.columns]
        if faltantes:
            print(f"❌ Error: Columnas de partición no existen en el DataFrame: {faltantes}")
            return

    if auto_merge_schema:
        spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

    full_name = f"{catalog_name}.{db_name}.{nombre_tabla}"

    if not spark.catalog.tableExists(full_name):
        # First load: create the managed table (no explicit LOCATION).
        writer = df.write.format("delta").mode("overwrite")
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)
        writer.saveAsTable(full_name)
        print(f"✅ Tabla gestionada creada: {full_name} (bajo LOCATION de la base '{db_name}')")
        return

    # Table exists: MERGE
    try:
        delta_tbl = DeltaTable.forName(spark, full_name)
    except Exception as e:
        # Keep the original failure attached as the explicit cause.
        raise RuntimeError(f"❌ La tabla {full_name} no es Delta o no es accesible como Delta: {e}") from e

    merge_condition = " AND ".join(
        f"tgt.`{destino}` = src.`{origen}`"
        for destino, origen in zip(llave_destino, llave_origen)
    )
    # Update and insert both assign every source column to the target column
    # of the same name, so a single mapping serves both clauses.
    assignments = {c: f"src.`{c}`" for c in df.columns}

    print(f"🔄 Ejecutando MERGE INTO {full_name} ...")
    (delta_tbl.alias("tgt")
             .merge(df.alias("src"), merge_condition)
             .whenMatchedUpdate(set=assignments)
             .whenNotMatchedInsert(values=assignments)
             .execute())
    print(f"✅ MERGE completado para {full_name}")

In [0]:
# Create silver_ventas.md_subcategoria from df_Subcategoria, or MERGE into it
# on id_subcategoria when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_subcategoria",
    nombre_df="df_Subcategoria",
    llave_origen=["id_subcategoria"],
    llave_destino=["id_subcategoria"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)



In [0]:
# Create silver_ventas.md_categoria from df_Categoria, or MERGE into it on
# id_categoria when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_categoria",
    nombre_df="df_Categoria",
    llave_origen=["id_categoria"],
    llave_destino=["id_categoria"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)


In [0]:
# Create silver_ventas.md_moneda from df_Moneda, or MERGE into it on id_moneda
# when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_moneda",
    nombre_df="df_Moneda",
    llave_origen=["id_moneda"],
    llave_destino=["id_moneda"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)


In [0]:
# Create silver_ventas.md_producto from df_Producto, or MERGE into it on
# id_producto when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_producto",
    nombre_df="df_Producto",
    llave_origen=["id_producto"],
    llave_destino=["id_producto"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_mediopago from df_Mediopago, or MERGE into it on
# id_mediopago when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_mediopago",
    nombre_df="df_Mediopago",
    llave_origen=["id_mediopago"],
    llave_destino=["id_mediopago"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_prioridadpedido from df_Prioridadpedido, or MERGE
# into it on id_prioridadpedido when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_prioridadpedido",
    nombre_df="df_Prioridadpedido",
    llave_origen=["id_prioridadpedido"],
    llave_destino=["id_prioridadpedido"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_modalidadenvio from df_Modalidadenvio, or MERGE into
# it on id_modalidadenvio when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_modalidadenvio",
    nombre_df="df_Modalidadenvio",
    llave_origen=["id_modalidadenvio"],
    llave_destino=["id_modalidadenvio"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_modalidadventa from df_Modalidadventa, or MERGE into
# it on id_modalidadventa when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_modalidadventa",
    nombre_df="df_Modalidadventa",
    llave_origen=["id_modalidadventa"],
    llave_destino=["id_modalidadventa"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_segmento from df_Segmento, or MERGE into it on
# id_segmento when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_segmento",
    nombre_df="df_Segmento",
    llave_origen=["id_segmento"],
    llave_destino=["id_segmento"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_sectoreconomico from df_Sectoreconomico, or MERGE
# into it on id_sectoreconomico when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_sectoreconomico",
    nombre_df="df_Sectoreconomico",
    llave_origen=["id_sectoreconomico"],
    llave_destino=["id_sectoreconomico"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_mercado from df_Mercado, or MERGE into it on
# id_mercado when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_mercado",
    nombre_df="df_Mercado",
    llave_origen=["id_mercado"],
    llave_destino=["id_mercado"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.md_ubigeo from df_Ubigeo, or MERGE into it on id_ubigeo
# when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_ubigeo",
    nombre_df="df_Ubigeo",
    llave_origen=["id_ubigeo"],
    llave_destino=["id_ubigeo"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# crear_tabla_delta_merge_managed(
#     nombre_df="df_Cliente",
#     nombre_tabla="md_cliente",
#     llave_origen=["id_cliente"],
#     llave_destino=["id_cliente"],
#     db_name="silver_ventas",
#     partition_cols=["fecha_carga"]  # opcional; si no quieres partición, quítalo
# )

In [0]:
# Create silver_ventas.md_vendedor from df_Vendedor, or MERGE into it on
# id_vendedor when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="md_vendedor",
    nombre_df="df_Vendedor",
    llave_origen=["id_vendedor"],
    llave_destino=["id_vendedor"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)

In [0]:
# Create silver_ventas.hd_pedido from df_Pedido, or MERGE into it on id_pedido
# when it already exists.
crear_tabla_delta_merge_managed(
    db_name="silver_ventas",
    nombre_tabla="hd_pedido",
    nombre_df="df_Pedido",
    llave_origen=["id_pedido"],
    llave_destino=["id_pedido"],
    # Optional: drop this argument to create the table without partitions.
    partition_cols=["fecha_carga"],
)