#LIBRERIAS

In [0]:
%run "../Configuraciones/variables_configuraciones"

In [0]:
print(BUCKET_S3)

In [0]:
import os
import boto3
import json
import pandas as pd

from pyspark.sql import functions as F, types as T
from pyspark.sql import types as T
from pyspark.sql import  Window


from io import TextIOWrapper, BytesIO
import csv
import unicodedata
from datetime import datetime

# CONFIGURACION

#DESARROLLO

## Cargar Tabla estado bronze

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS externo

In [0]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.catalog import VolumeType

catalog = "workspace"
schema = "externo"
volume = "tfm_s3_datalake_externo"
PATH_VOLUMEN = F"/Volumes/{catalog}/{schema}/{volume}"

w = WorkspaceClient()

# Ver si el volume ya existe
volumes = [v.name for v in w.volumes.list(catalog_name=catalog, schema_name=schema)]

if volume not in volumes:
    print("Creando volume…")
    w.volumes.create(
        name=volume,
        catalog_name=catalog,
        schema_name=schema,
        volume_type=VolumeType.EXTERNAL,
        storage_location= BUCKET_S3
    )
else:
    print("El volume ya existe.")

In [0]:
from pyspark.sql import functions as F

path = f"dbfs:/Volumes/{catalog}/{schema}/{volume}/metadata/vehiculos/state.json"

raw_df = (
    spark.read
         .option("multiline", "true")
         .json(path)
)

years = raw_df.columns
expr = ", ".join([f"'{y}', `{y}`" for y in years])
stack_expr = f"stack({len(years)}, {expr}) as (year, data)"

sdf_bronce_state = (
    raw_df
    .selectExpr(stack_expr)  # year, data (struct)
    .select("year", "data.*")
)

display(sdf_bronce_state)

In [0]:
%skip

#from pyspark.sql import functions as F, types as T

sdf_bronce_state = spark.createDataFrame(json_data); 



In [0]:
sdf_bronce_state = (sdf_bronce_state
  .withColumn("year", F.col("year").cast("int"))
  .withColumn("last_metadata_update", F.to_timestamp("last_metadata_update"))
  .withColumn("last_status",          F.col("last_status"))
  .withColumn("last_checked_ts",      F.to_timestamp("last_checked_ts"))
  .withColumn("last_file_path",       F.col("last_file_path"))
  .withColumn("last_sha256",          F.col("last_sha256"))
  .select("year","last_metadata_update","last_status","last_checked_ts","last_file_path","last_sha256")
)

In [0]:
sdf_bronce_state.write.mode("overwrite").saveAsTable("tfm_bronze.state_files")

In [0]:
print("[INFO] tfm_bronze.state_files actualizado desde state.json")
display(spark.table("tfm_bronze.state_files").orderBy(F.col("year")))

## Verificar Archivos cambiados o nuevos

In [0]:
#from pyspark.sql import functions as F

# Alias para las tablas Bronze (B) y Silver (P)
B = spark.table("tfm_bronze.state_files").alias("b")
P = spark.table("tfm_silver.state_files_processed").alias("p")

# Unimos por año (left join: mantiene todos los registros de Bronze)
joined = B.join(P, on=B["year"] == P["year"], how="left")

# Aplicamos las condiciones para detectar archivos nuevos o modificados:
#   - No existe en Silver
#   - El hash SHA256 cambió (contenido diferente)
#   - La fecha de metadatos cambió
#   - Existe un error registrado en Silver
to_process = joined.where(
    (P["year"].isNull()) |
    (B["last_sha256"] != P["last_sha256"]) |
    #(F.to_timestamp(B["last_metadata_update"]) != F.to_timestamp(P["last_metadata_update"])) |
    (P["error_message"].isNotNull())
).select(
    B["year"],
    B["last_metadata_update"],
    B["last_status"],
    B["last_checked_ts"],
    B["last_file_path"],
    B["last_sha256"],
    P["last_sha256"].alias("last_sha256_silver")
)

# Resultado final: lista de archivos que deben procesarse hacia Silver
print("[INFO] Archivos NUEVOS o CAMBIADOS que deben procesarse:")
display(to_process.orderBy("year"))

#Funciones

In [0]:
def obtener_dataframe_csv(key_path: str):
    """
    Crea un dataframe de spark del archivo almacenado en el volumen
    """
    df = (
        spark.read
            .option("header", True)
            .option("sep", ";") 
            .option("charset", "latin1")
            .option("inferSchema", True)
            .csv(key_path)
    )

    return df

In [0]:
def normalizacion_cabecera(df, year: int):
    """
    Normaliza los encabezados de un DataFrame según las reglas definidas 
    en la tabla de configuración `tfm_config.header_mappings`. 
    
    Esta función asegura que las columnas del DataFrame de entrada 
    (diferentes entre años o archivos) se ajusten a una nomenclatura 
    estándar, aplicando las reglas vigentes al momento de ejecución.

    Parámetros
    ----------
    df : pyspark.sql.DataFrame
        DataFrame de entrada con encabezados originales del archivo CSV.
    year : int
        Año de la fuente de datos, usado para aplicar las reglas específicas 
        de ese período en `header_mappings`.

    Retorna
    -------
    df : pyspark.sql.DataFrame
        DataFrame con los encabezados normalizados y alineados 
        al esquema canónico del dataset “vehiculos”.
    """

    # ---------------------------------------------------------------------
    # CONFIGURACIÓN BASE Y CONTEXTO TEMPORAL
    # ---------------------------------------------------------------------
    dataset_id = "vehiculos"

    # Marca temporal de referencia del proceso (fecha/hora actual)
    #event_ts = F.current_timestamp()

    # Obtención de un valor literal de timestamp (no columna Spark)
    event_ts_lit = spark.range(1).select(F.current_timestamp().alias("ts")).collect()[0]["ts"]

    #print("Columnas Originales:", df.columns)
    # Registro de control para auditoría (puede eliminarse en producción)
    #print(event_ts)
    #print(event_ts_lit)

    # ---------------------------------------------------------------------
    # LECTURA DE LA TABLA DE CONFIGURACIÓN DE ENCABEZADOS
    # ---------------------------------------------------------------------
    # Se carga la tabla de mapeo que define cómo traducir los nombres 
    # de columnas originales ("header_source") hacia los nombres estándar 
    # o canónicos ("header_canonical").
    #
    # Solo se aplican reglas activas (effective_start <= ahora < effective_end)
    # para el dataset "vehiculos".
    # ---------------------------------------------------------------------
    mappings = (
        spark.table("tfm_config.header_mappings")
        .where(
            (F.col("dataset_id") == dataset_id)
            & (F.col("effective_start") <= F.lit(event_ts_lit))
            & (
                (F.col("effective_end").isNull())
                | (F.col("effective_end") > F.lit(event_ts_lit))
            )
        )
    )

    # ---------------------------------------------------------------------
    # FILTRADO POR AÑO DE FUENTE
    # ---------------------------------------------------------------------
    # Si existen reglas específicas por año (source_year), se priorizan 
    # sobre las reglas generales (source_year = NULL).
    # ---------------------------------------------------------------------
    mappings = mappings.where(
        (F.col("source_year").isNull()) | (F.col("source_year") == F.lit(year))
    )

    # ---------------------------------------------------------------------
    # RESOLUCIÓN DE CONFLICTOS ENTRE REGLAS
    # ---------------------------------------------------------------------
    # Si existen múltiples mapeos para el mismo header_source, 
    # se selecciona el de menor prioridad (columna “priority”).
    # ---------------------------------------------------------------------
    w = Window.partitionBy("header_source").orderBy(F.col("priority").asc())
    mappings_resolved = (
        mappings.withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .select("header_source", "header_canonical")
    )

    # ---------------------------------------------------------------------
    # CONSTRUCCIÓN DEL DICCIONARIO DE MAPEOS
    # ---------------------------------------------------------------------
    # Convierte las reglas resueltas en un diccionario Python 
    # para fácil aplicación en el DataFrame:
    # Ejemplo: {"MARCA VEHICULO": "MARCA", "MODELO VEHICULO": "MODELO"}
    # ---------------------------------------------------------------------
    map_pairs = [(r["header_source"], r["header_canonical"]) for r in mappings_resolved.collect()]
    map_dict = dict(map_pairs)

    #print("mappings:", map_dict)


    # ---------------------------------------------------------------------
    # NORMALIZACIÓN DE ENCABEZADOS DEL DATAFRAME
    # ---------------------------------------------------------------------
    # 1. Se eliminan acentos y caracteres especiales.
    # 2. Se convierten a mayúsculas.
    # 3. Se renombran según el mapeo configurado.
    # ---------------------------------------------------------------------
    for c in df.columns:
        normalizado = (
            unicodedata.normalize("NFKD", c)
            .encode("ASCII", "ignore")
            .decode()
            .upper()
        )
        #print("norma: ", normalizado)
        target = map_dict.get(normalizado)
        if target and target != c:
            #print(f"Columna {c} renombrada a {target}")
            df = df.withColumnRenamed(c, target)


    #print("Columnas Normalizadas:", df.columns)

    # ---------------------------------------------------------------------
    # VALIDACIÓN DE COLUMNAS OBLIGATORIAS
    # ---------------------------------------------------------------------
    # Se asegura que todas las columnas esperadas existan en el DataFrame.
    # Si falta alguna, se crea con valor NULL (tipo string).
    # ---------------------------------------------------------------------
    columnas = [
        "CODIGO_VEHICULO",
        "FECHA_PROCESO",
        "FECHA_COMPRA",
        "CATEGORIA",
        "TIPO_TRANSACCION",
        "MARCA",
        "MODELO",
        "PAIS",
        "ANIO_MODELO",
        "CLASE",
        "SUBCLASE",
        "TIPO_VEHICULO",
        "AVALUO",
        "TIPO_SERVICIO",
        "CILINDRAJE",
        "TIPO_COMBUSTIBLE",
        "CANTON",
        "COLOR1",
        "COLOR2",
        "TIPO_COMPRADOR"
    ]

    for col_name in columnas:
        if col_name not in df.columns:
            df = df.withColumn(col_name, F.lit(None).cast("string"))

    # ---------------------------------------------------------------------
    # REORDENAMIENTO FINAL DE COLUMNAS
    # ---------------------------------------------------------------------
    # Se fuerza el orden estándar de columnas para garantizar consistencia 
    # con la estructura esperada en la capa Silver.
    # ---------------------------------------------------------------------
    df = df.select(*columnas)

    #print("Columnas Final:", df.columns)

    # ---------------------------------------------------------------------
    # RETORNO DEL DATAFRAME NORMALIZADO
    # ---------------------------------------------------------------------
    return df

In [0]:
def normalizacion_columnas(df, year: int):
    """
    Normaliza y castea columnas de un DataFrame según reglas declarativas
    en `tfm_config.column_cast_rules`.

    Prioridad de selección de reglas por columna (header_canonical):
      1) Reglas con `source_year` específico sobre reglas con `source_year` NULL.
      2) Entre reglas empatadas, se elige la de `effective_start` más reciente.

    Tipos soportados en `target_type`:
      - "string" (implícito), "int"/"integer", "double", "date", "timestamp".

    Caso especial para fechas:
      - Si `date_format == 'm'`, la columna representa el **mes** y se construye
        una fecha `"01-<mes>-<year>"` para normalizarla a una fecha válida.
    """
    dataset = "vehiculos"

    # ---------------------------------------------------------------------
    # 1) Selección de reglas vigentes para el dataset y el año dado.
    #    Se filtran por:
    #      - dataset_id
    #      - (source_year == year) o (source_year IS NULL)
    #      - ventana de vigencia: effective_start <= NOW < effective_end (o end NULL)
    # ---------------------------------------------------------------------
    ref_ts = F.current_timestamp()
    base_rules = (
        spark.table("tfm_config.column_cast_rules")
        .filter(
            (F.col("dataset_id") == F.lit(dataset)) &
            ((F.col("source_year").isNull()) | (F.col("source_year") == F.lit(year))) &
            (F.col("effective_start").isNull() | (F.col("effective_start") <= ref_ts)) &
            (F.col("effective_end").isNull()   | (F.col("effective_end")   >  ref_ts))
        )
    )

    # ---------------------------------------------------------------------
    # 2) Resolución de conflictos entre reglas de una misma columna.
    #    Criterios:
    #      a) Preferir reglas con `source_year` (puntaje 1) sobre NULL (0).
    #      b) Dentro de cada grupo, elegir la de `effective_start` más reciente.
    # ---------------------------------------------------------------------
    rules_scored = (
        base_rules
        .withColumn("_has_year", F.when(F.col("source_year").isNull(), F.lit(0)).otherwise(F.lit(1)))
        .withColumn("_start_ts", F.coalesce(F.col("effective_start"), F.lit("1970-01-01").cast("timestamp")))
    )

    w = Window.partitionBy("header_canonical").orderBy(
        F.col("_has_year").desc(),     # primero las que tienen año específico
        F.col("_start_ts").desc()      # luego la más reciente por fecha de inicio
    )

    rules_df = (
        rules_scored
        .withColumn("_rank", F.row_number().over(w))
        .filter(F.col("_rank") == 1)   # nos quedamos con la regla “ganadora”
        .drop("_has_year","_start_ts","_rank")
    )

    # (Opcional) inspección de reglas aplicadas para auditoría/depuración
    #display(
    #    rules_df.select(
    #        "header_canonical","target_type","date_format","normalize_upper",
    #        "trim_all","source_year","effective_start","effective_end","note"
    #    )
    #)

    # ---------------------------------------------------------------------
    # 3) Aplicación de reglas columna por columna.
    #    Para cada `header_canonical`:
    #      - Se crean columnas ausentes como NULL (string) para mantener esquema.
    #      - Se aplican normalizaciones de texto (trim/upper) si corresponde.
    #      - Se castea al tipo objetivo controlando formatos y valores inválidos.
    # ---------------------------------------------------------------------
    for r in rules_df.collect():
        col_name    = r["header_canonical"]
        target_type = r["target_type"]
        date_fmt    = r["date_format"]
        do_upper    = r["normalize_upper"]
        do_trim     = r["trim_all"]

        # Punto de partida: la columna destino (si no existe, se crea como string NULL)
        if col_name not in df.columns:
            df = df.withColumn(col_name, F.lit(None).cast("string"))
        expr = F.col(col_name)

        # Normalizaciones textuales previas
        if do_trim:
            expr = F.trim(expr)
        if do_upper:
            expr = F.upper(expr)

        # Casting según tipo destino
        if target_type in ("int", "integer"):
            # Valida que la cadena sea numérica positiva; si no, devuelve NULL
            expr = F.when(expr.rlike(r"^[0-9]+$"), expr.cast("int")).otherwise(F.lit(None).cast("int"))

        elif target_type == "double":
            # Sustituye coma decimal por punto y castea a double
            expr = F.regexp_replace(expr, ",", ".").cast("double")

        elif target_type == "date":
            # Quita parte horaria si existiera y se queda con la fecha (antes del primer espacio)
            base = F.substring_index(expr.cast("string"), " ", 1)

            if date_fmt == 'm':
                # Construye "01-<mm>-<year>" y castea con el formato explícito
                expr = F.concat_ws(
                    "-",
                    F.lit("01"),
                    F.lpad(base, 2, "0"),
                    F.lit(str(year))
                )
                expr = F.to_date(expr, "dd-MM-yyyy")
            else:
                if("-MMM-" in date_fmt):
                    meses = {
                        "Ene": "Jan", "Feb": "Feb", "Mar": "Mar", "Abr": "Apr",
                        "May": "May", "Jun": "Jun", "Jul": "Jul", "Ago": "Aug",
                        "Sept": "Sep", "Oct": "Oct", "Nov": "Nov", "Dic": "Dec"
                    }
                    for esp, eng in meses.items():
                        base = F.regexp_replace(base, f"(?i){esp}", eng)
                # Usa formato provisto; si no hay, intenta heurísticas comunes
                expr = F.when(
                    F.lit(date_fmt).isNotNull(), F.to_date(base, date_fmt)
                ).otherwise(
                    F.coalesce(
                        F.to_date(base, "dd/MM/yyyy"),
                        F.to_date(base, "yyyy-MM-dd"),
                        F.to_date(base, "dd-MM-yyyy"),
                        F.to_date(F.to_timestamp(expr, "dd/MM/yyyy HH:mm")),
                        F.to_date(F.to_timestamp(expr, "dd/MM/yyyy HH:mm:ss")),
                        F.to_date(F.to_timestamp(expr, "yyyy-MM-dd HH:mm:ss"))
                    )
                )

        elif target_type == "timestamp":
            # Casteo a timestamp con formato dado o con heurísticas estándar
            if date_fmt:
                expr = F.to_timestamp(expr, date_fmt)
            else:
                expr = F.coalesce(
                    F.to_timestamp(expr, "dd/MM/yyyy HH:mm:ss"),
                    F.to_timestamp(expr, "dd/MM/yyyy HH:mm"),
                    F.to_timestamp(expr, "yyyy-MM-dd HH:mm:ss")
                )

        else:
            # Por defecto, mantener como string (tras normalizaciones)
            expr = F.trim(expr)
            expr = expr.cast("string")

        # Reemplaza el contenido de la columna en el DataFrame final
        df = df.withColumn(col_name, expr)

    # Retorna el DataFrame con las columnas normalizadas y casteadas
    return df

In [0]:
def homologar_columnas_categoricas(df):
    dominios_columnas = [
        ("tipo_transaccion",    "TIPO_TRANSACCION"),
        ("marca",               "MARCA"),
        ("pais",                "PAIS"),
        ("clase",               "CLASE"),
        ("subclase",            "SUBCLASE"),
        ("tipo_vehiculo",       "TIPO_VEHICULO"),
        ("tipo_servicio",       "TIPO_SERVICIO"),
        ("tipo_combustible",    "TIPO_COMBUSTIBLE"),
        ("tipo_comprador",      "TIPO_COMPRADOR"),
    ]

    # Tabla de homologación actual
    hmg = spark.table("tfm_silver_hmg.homologacion")

    nuevas_reglas_all = None

    for dominio, col in dominios_columnas:
        print(f"Procesando dominio '{dominio}' / columna '{col}'")

        # 1) valores distintos de esa columna en el DF base
        distinct_vals = (
            df
            .select(F.col(col).alias("valor_original"))
            .distinct()
            .filter(F.col("valor_original").isNotNull())
        )

        # 2) reglas vigentes para ese dominio/columna
        hmg_dom = (
            hmg
            .filter(
                (F.col("dominio") == F.lit(dominio)) &
                (F.col("columna_origen") == F.lit(col)) &
                (F.col("fecha_hasta").isNull())
            )
            .select("valor_original")
            .distinct()
        )

        # 3) valores que NO están aún en la homologadora (left_anti = anti join)
        nuevos_valores = distinct_vals.join(hmg_dom, on="valor_original", how="left_anti")

        #if nuevos_valores.count() == 0:
        #    print(f"  - No hay nuevos valores para {dominio}/{col}")
        #    continue

        # 4) crear filas para homologacion
        nuevas_reglas = (
            nuevos_valores
            .withColumn("dominio", F.lit(dominio))
            .withColumn("columna_origen", F.lit(col))
            .withColumn("valor_original", F.col("valor_original"))
            .withColumn("catalog_id", F.lit(None).cast("bigint"))
            .withColumn("fecha_desde", F.current_timestamp())
            .withColumn("fecha_hasta", F.lit(None).cast("timestamp"))
            .select(
                "dominio",
                "columna_origen",
                "valor_original",
                "catalog_id",
                "fecha_desde",
                "fecha_hasta",
            )
        )

        if nuevas_reglas_all is None:
            nuevas_reglas_all = nuevas_reglas
        else:
            nuevas_reglas_all = nuevas_reglas_all.unionByName(nuevas_reglas)

    # 5) Insertar todo de una sola vez
    if nuevas_reglas_all is not None and  nuevas_reglas_all.count() > 0:
        print("Insertando nuevas reglas en tfm_silver_hmg.homologacion...")
        (
            nuevas_reglas_all
            .write
            .mode("append")
            .format("delta")
            .saveAsTable("tfm_silver_hmg.homologacion")
        )
    else:
        print("No hay nuevas reglas que insertar.")

    df.createOrReplaceTempView("vehiculos_sin_hmg_ctg")

    df_con_ids = spark.sql("""
    SELECT
        v.*,
        
        --TIPO_TRANSACCION
        h_tt.regla_id                               AS tipo_transaccion_hmg_id,
        NVL(h_tt.catalog_id, 1)                     AS tipo_transaccion_id,

        -- MARCA
        h_marca.regla_id                            AS marca_hmg_id,
        NVL(h_marca.catalog_id, 1)                 AS marca_id,

        -- PAIS
        h_pais.regla_id                             AS pais_hmg_id,
        NVL(h_pais.catalog_id,1)                    AS pais_id,

        -- CLASE
        h_clase.regla_id                            AS clase_hmg_id,
        NVL(h_clase.catalog_id,1)                   AS clase_id,

        -- SUBCLASE
        h_subclase.regla_id                         AS subclase_hmg_id,
        NVL(h_subclase.catalog_id,1)                AS subclase_id,

        -- TIPO_VEHICULO
        h_tipo_veh.regla_id                         AS tipo_vehiculo_hmg_id,
        NVL(h_tipo_veh.catalog_id,1)                AS tipo_vehiculo_id,

        -- TIPO_SERVICIO
        h_tipo_srv.regla_id                         AS tipo_servicio_hmg_id,
        NVL(h_tipo_srv.catalog_id,1)                AS tipo_servicio_id,

        -- TIPO_COMBUSTIBLE
        h_comb.regla_id                             AS tipo_combustible_hmg_id,
        NVL(h_comb.catalog_id,1)                    AS tipo_combustible_id,

        -- TIPO_COMPRADOR
        h_comp.regla_id                             AS tipo_comprador_hmg_id,
        NVL(h_comp.catalog_id,1)                    AS tipo_comprador_id

    FROM vehiculos_sin_hmg_ctg v
    -- ================= TIPO TRANSCCION =================
    LEFT JOIN tfm_silver_hmg.homologacion h_tt
        ON h_tt.dominio        = 'tipo_transaccion'
    AND h_tt.columna_origen = 'TIPO_TRANSACCION'
    AND h_tt.valor_original = v.TIPO_TRANSACCION
    AND h_tt.fecha_hasta IS NULL         -- regla vigente

    -- ================= MARCA =================
    LEFT JOIN tfm_silver_hmg.homologacion h_marca
        ON h_marca.dominio        = 'marca'
    AND h_marca.columna_origen = 'MARCA'
    AND h_marca.valor_original = v.MARCA
    AND h_marca.fecha_hasta IS NULL         -- regla vigente


    -- ================= PAIS =================
    LEFT JOIN tfm_silver_hmg.homologacion h_pais
        ON h_pais.dominio        = 'pais'
    AND h_pais.columna_origen = 'PAIS'
    AND h_pais.valor_original = v.PAIS
    AND h_pais.fecha_hasta IS NULL
    

    -- ================= CLASE =================
    LEFT JOIN tfm_silver_hmg.homologacion h_clase
        ON h_clase.dominio        = 'clase'
    AND h_clase.columna_origen = 'CLASE'
    And h_clase.valor_original = v.CLASE
    AND h_clase.fecha_hasta IS NULL

    -- ================= SUBCLASE =================
    LEFT JOIN tfm_silver_hmg.homologacion h_subclase
        ON h_subclase.dominio        = 'subclase'
    AND h_subclase.columna_origen = 'SUBCLASE'
    AND h_subclase.valor_original = v.SUBCLASE
    AND h_subclase.fecha_hasta IS NULL

    -- ================= TIPO_VEHICULO =================
    LEFT JOIN tfm_silver_hmg.homologacion h_tipo_veh
        ON h_tipo_veh.dominio        = 'tipo_vehiculo'
    AND h_tipo_veh.columna_origen = 'TIPO_VEHICULO'
    AND h_tipo_veh.valor_original = v.TIPO_VEHICULO
    AND h_tipo_veh.fecha_hasta IS NULL

    -- ================= TIPO_SERVICIO =================
    LEFT JOIN tfm_silver_hmg.homologacion h_tipo_srv
        ON h_tipo_srv.dominio        = 'tipo_servicio'
    AND h_tipo_srv.columna_origen = 'TIPO_SERVICIO'
    AND h_tipo_srv.valor_original = v.TIPO_SERVICIO
    AND h_tipo_srv.fecha_hasta IS NULL

    -- ================= TIPO_COMBUSTIBLE =================
    LEFT JOIN tfm_silver_hmg.homologacion h_comb
        ON h_comb.dominio        = 'tipo_combustible'
    AND h_comb.columna_origen = 'TIPO_COMBUSTIBLE'
    AND h_comb.valor_original = v.TIPO_COMBUSTIBLE
    AND h_comb.fecha_hasta IS NULL

    -- ================= TIPO_COMPRADOR =================
    LEFT JOIN tfm_silver_hmg.homologacion h_comp
        ON h_comp.dominio        = 'tipo_comprador'
    AND h_comp.columna_origen = 'TIPO_COMPRADOR'
    AND h_comp.valor_original = v.TIPO_COMPRADOR
    AND h_comp.fecha_hasta IS NULL
    ;
    """)

    return df_con_ids


In [0]:
def prepare_for_silver(df_in,
                       source_year: int,
                       source_file: str,
                       source_sha256: str = None):
    """
    Prepara un DataFrame proveniente de la capa Bronze para su escritura o 
    integración (MERGE) en la capa Silver.

    Esta función agrega columnas técnicas, metadatos de linaje, 
    y hashes de negocio y contenido para detectar cambios y evitar duplicados.

    Parámetros
    ----------
    df_in : pyspark.sql.DataFrame
        DataFrame de entrada ya normalizado (con FECHA_PROCESO, FECHA_COMPRA y CODIGO_VEHICULO).
    source_year : int
        Año de origen del archivo procesado (derivado de la carpeta o metadata del archivo S3).
    source_file : str
        Nombre completo del archivo fuente (por trazabilidad).
    source_sha256 : str, opcional
        Hash del archivo fuente (para control de integridad de datos).

    Retorna
    -------
    pyspark.sql.DataFrame
        DataFrame enriquecido con columnas de control, linaje, 
        y claves hash para manejo incremental en la capa Silver.
    """

    # ---------------------------------------------------------------------
    # 1) Agregar columnas técnicas y de linaje
    # ---------------------------------------------------------------------
    # - anio_proceso / mes_proceso: derivadas de FECHA_PROCESO para 
    #   facilitar particionamiento y consultas temporales.
    # - source_year / source_file / source_sha256: trazabilidad del origen.
    # - ingest_ts: timestamp del momento en que se ingesta el archivo.
    # ---------------------------------------------------------------------
    df = (
        df_in
        .withColumn("anio_proceso", F.year("FECHA_PROCESO"))
        .withColumn("mes_proceso",  F.month("FECHA_PROCESO"))
        .withColumn("source_year",  F.lit(int(source_year)))
        .withColumn("source_file",  F.lit(source_file))
        .withColumn("source_sha256", F.lit(source_sha256))
        .withColumn("ingest_ts",    F.current_timestamp())
        # NOTA: si necesitás convertir tipos explícitamente, 
        # podés habilitar las siguientes líneas:
        # .withColumn("ANIO_MODELO", F.col("ANIO_MODELO").cast(IntegerType()))
        # .withColumn("CILINDRAJE",  F.col("CILINDRAJE").cast(IntegerType()))
        # .withColumn("AVALUO",      F.regexp_replace(F.col("AVALUO"), ",", ".").cast(DoubleType()))
    )

    # ---------------------------------------------------------------------
    # 2) Generar clave de negocio (Business Key)
    # ---------------------------------------------------------------------
    # Define los campos que identifican de forma única una transacción 
    # o registro de vehículo. En este caso:
    #   (CODIGO_VEHICULO, FECHA_COMPRA)
    #   (Se excluye FECHA_PROCESO porque el mismo vehículo puede 
    #   volver a aparecer con diferentes fechas de proceso)
    #
    # Se genera un hash SHA-256 concatenando los valores (como strings),
    # usando "||" como separador para evitar ambigüedades.
    # ---------------------------------------------------------------------
    bk_cols = ["CODIGO_VEHICULO", "FECHA_COMPRA"]
    df = df.withColumn(
        "bk_hash",
        F.sha2(
            F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in bk_cols]),
            256
        )
    )

    # ---------------------------------------------------------------------
    # 3) Generar hash de contenido (Content Hash)
    # ---------------------------------------------------------------------
    # Permite detectar si un registro cambió en alguno de sus valores 
    # no clave (por ejemplo, cambio en marca, modelo, avaluo, etc.)
    # 
    # Se concatena el contenido completo de las columnas relevantes, 
    # aplicando un hash SHA-256 que representa el “estado” del registro.
    # ---------------------------------------------------------------------
    content_cols = [
        "CATEGORIA",
        #"TIPO_TRANSACCION",
        "tipo_transaccion_hmg_id",
        #"MARCA",
        "marca_hmg_id",
        "MODELO",
        #"PAIS",
        "pais_hmg_id",
        "ANIO_MODELO",
        #"CLASE",
        "clase_hmg_id",
        #"SUBCLASE",
        "subclase_hmg_id",
        #"TIPO_VEHICULO",
        "tipo_vehiculo_hmg_id",
        "AVALUO",
        #"TIPO_SERVICIO",
        "tipo_servicio_hmg_id",
        "CILINDRAJE",
        #"TIPO_COMBUSTIBLE",
        "tipo_combustible_hmg_id",
        "CANTON",
        "COLOR1",
        "COLOR2",
        #"TIPO_COMPRADOR"
        "tipo_comprador_hmg_id"
    ]

    df = df.withColumn(
        "content_hash",
        F.sha2(
            F.concat_ws("||", *[F.coalesce(F.col(c).cast("string"), F.lit("")) for c in content_cols]),
            256
        )
    )

    # ---------------------------------------------------------------------
    # 4) Retornar DataFrame enriquecido
    # ---------------------------------------------------------------------
    # El resultado contiene todas las columnas de negocio más:
    #   - anio_proceso, mes_proceso
    #   - source_year, source_file, source_sha256
    #   - ingest_ts, bk_hash, content_hash
    #
    # Este DataFrame está listo para ser:
    #   - Insertado en la tabla Silver.
    #   - Usado en un MERGE (upsert) incremental.
    # ---------------------------------------------------------------------
    return df

In [0]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

def control_duplicados(df):
    """
    Elimina registros duplicados en un DataFrame antes de su inserción en la capa Silver.
    Optimizada para ejecución distribuida en PySpark.

    Reglas:
      1. Elimina duplicados exactos basados en (bk_hash, content_hash, FECHA_PROCESO).
      2. Si existen varias versiones con la misma clave (bk_hash + content_hash),
         conserva únicamente la más antigua (FECHA_PROCESO más baja).
    """

    # ---------------------------------------------------------------------
    # 1) Eliminación directa de duplicados exactos
    # ---------------------------------------------------------------------
    # dropDuplicates() ya es distribuido y no requiere un shuffle completo.
    df = df.dropDuplicates(["bk_hash", "content_hash", "FECHA_PROCESO"])

    # ---------------------------------------------------------------------
    # 2) Selección de la versión más antigua por hash
    # ---------------------------------------------------------------------
    # En lugar de usar una ventana + row_number (que genera un shuffle caro),
    # podemos usar un groupBy + agg(min()) que es más eficiente y escalable.
    # Esto logra el mismo efecto: conservar la fila con la FECHA_PROCESO mínima.
    # ---------------------------------------------------------------------
    df = df.repartition("bk_hash")  # agrupa por hash y reduce shuffle posterior
    min_dates = (
        df.groupBy("bk_hash", "content_hash")
          .agg(F.min("FECHA_PROCESO").alias("min_fecha_proceso"))
    )

    # Join distribuido (sin broadcast, Spark manejará la estrategia óptima)
    df = (
        df.join(
            min_dates,
            on=[
                df["bk_hash"] == min_dates["bk_hash"],
                df["content_hash"] == min_dates["content_hash"],
                df["FECHA_PROCESO"] == min_dates["min_fecha_proceso"],
            ],
            how="inner"
        )
        .select(df["*"])  # mantener el esquema original
    )

    return df

In [0]:
def insertar_datos(df, bandera_insertar: bool):
    """
    Inserta los registros nuevos o actualizados desde un DataFrame temporal (vehiculos_stage)
    en la tabla Delta principal tfm_silver.vehiculos. 

    El proceso incluye los siguientes pasos:
    1. Determinar los años (particiones) presentes en la carga actual.
    2. Extraer únicamente las particiones correspondientes desde la tabla Silver.
    3. Identificar los registros nuevos o modificados mediante un anti-join.
    4. Insertar los registros resultantes en la tabla Silver.

    Args:
        df (DataFrame): DataFrame con los registros listos para inserción en Silver.
    """

    # -------------------------------------------------------------------------
    # Configuración de base de datos y tabla destino
    # -------------------------------------------------------------------------
    db_name    = "tfm_silver"
    silver_tbl = f"{db_name}.vehiculos"

    # Selecciona el esquema de trabajo
    spark.sql(f"USE {db_name}")

    # Registra el DataFrame recibido como vista temporal
    df.createOrReplaceTempView("vehiculos_stage")

    # -------------------------------------------------------------------------
    # 1. Determinación de los años (particiones) presentes en la carga actual
    # -------------------------------------------------------------------------
    # Obtiene la lista de años distintos desde el DataFrame de staging
    target_years = [
        r['anio_proceso']
        for r in spark.table("vehiculos_stage")
                      .select("anio_proceso")
                      .distinct()
                      .collect()
    ]

    # -------------------------------------------------------------------------
    # 2. Extracción de las particiones relevantes desde la tabla Silver
    # -------------------------------------------------------------------------
    # Carga únicamente las filas correspondientes a los años identificados.
    # Esta estrategia evita escanear la totalidad de la tabla Silver.
    silver_slice = (
        spark.table(silver_tbl)
        .where(F.col("anio_proceso").isin(target_years))
        .select(
            "anio_proceso",
            "CODIGO_VEHICULO",
            "FECHA_PROCESO",
            "FECHA_COMPRA",
            "content_hash"
        )
    )

    # Carga de la vista temporal como DataFrame de staging
    stage = spark.table("vehiculos_stage")

    # -------------------------------------------------------------------------
    # 3. Anti-join para identificar registros nuevos o modificados
    # -------------------------------------------------------------------------
    # Se comparan las filas de staging con la porción relevante de Silver
    # utilizando la clave compuesta y el hash de contenido.
    # Se retienen únicamente los registros que no existen en Silver.
    to_insert = (
        stage.alias("s")
        .join(
            silver_slice.alias("t"),
            on=[
                F.col("t.anio_proceso")    == F.col("s.anio_proceso"),
                F.col("t.CODIGO_VEHICULO") == F.col("s.CODIGO_VEHICULO"),
                F.col("t.FECHA_PROCESO")   == F.col("s.FECHA_PROCESO"),
                F.col("t.FECHA_COMPRA")    == F.col("s.FECHA_COMPRA"),
                F.col("t.content_hash")    == F.col("s.content_hash")
            ],
            how="left_anti"
        )
        # Reparticiona los datos por año de proceso para mejorar la escritura
        .repartition("anio_proceso")
    )

    schema_cols = spark.table(silver_tbl).columns
    to_insert = to_insert.select(*schema_cols)
    
    #to_insert = to_insert.persist()
    numero = to_insert.count()
    #print("Numero registros nuevos:", numero)
    #numero = 99999999

    # -------------------------------------------------------------------------
    # 4. Inserción en la tabla Delta particionada (Silver)
    # -------------------------------------------------------------------------
    # Los registros resultantes del anti-join se escriben en la tabla Delta.
    # Se utiliza modo append dado que la tabla ya está particionada por anio_proceso.
    if bandera_insertar == True:
        (
            to_insert.write
            .format("delta")
            .mode("append")
            .saveAsTable(silver_tbl)
        )

    return numero

# PROCESO LOOP

In [0]:
# Guardar los valores de cada columna de to_process en variables y mostrarlos

import time


if to_process.count() == 0:
    print("No hay registros por procesar")

bandera_insertar = True
    
for row in to_process.collect():
    year = row["year"]
    last_metadata_update = row["last_metadata_update"]
    last_status = row["last_status"]
    last_checked_ts = row["last_checked_ts"]
    last_file_path = row["last_file_path"]
    last_sha256 = row["last_sha256"]
    last_sha256_silver = row["last_sha256_silver"]

    ruta_archivo = 'raw/vehiculos/'
    file_name = os.path.basename(last_file_path)

    print(f"\n============   {year}  ============")
    print(f"Inicio Archivo: {file_name}")
    print(f"Fecha Proceso: {last_metadata_update}")
    print(f"Estado: {last_status}")
    print(f"Fecha Proceso: {last_checked_ts}")
    print(f"Archivo: {last_file_path}")
    print(f"Hash: {last_sha256}")

    print("\n--------------")
    print("Obtener df de csv")
    inicio = time.time()
    df_csv = None
    df_csv = obtener_dataframe_csv(f"{PATH_VOLUMEN}/{ruta_archivo}year={year}/{file_name}")
    #display(df_csv.limit(5))
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Obtener df de csv: {duracion:.2f} segundos")

    print("\n--------------")
    print("Normalizar cabeceras")
    inicio = time.time()
    df_cabeceras = None
    df_cabeceras = normalizacion_cabecera(df_csv, year)
    #display(df_cabeceras.limit(5))
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Normalizar cabeceras: {duracion:.2f} segundos")

    print("\n--------------")
    print("Normalizar columnas")
    inicio = time.time()
    df_columnas_normalizadas = None
    df_columnas_normalizadas = normalizacion_columnas(df_cabeceras, year)
    #display(df_columnas_normalizadas.limit(5))
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Normalizar columnas: {duracion:.2f} segundos")

    print("\n--------------")
    print("Homologar y Categorizar columnas")
    inicio = time.time()
    df_columnas_homologadas_catalogos = None
    df_columnas_homologadas_catalogos= homologar_columnas_categoricas(df_columnas_normalizadas)
    #display(df_columnas_homologadas_catalogos.limit(5))
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Homologar y catalogos columnas: {duracion:.2f} segundos")

    print("\n--------------")
    print("Agregar Columnas")
    inicio = time.time()
    df_silver = None
    df_silver = prepare_for_silver(df_columnas_homologadas_catalogos, year, last_file_path, last_sha256)
    #display(df_silver.limit(5))
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Agregar Columnas: {duracion:.2f} segundos")

    print("\n--------------")
    print("Control duplicidad")
    inicio = time.time()
    #print("Conteo Original: ", df_silver.count())
    df_sin_dupplicados = control_duplicados(df_silver)
    #print("Conteo Sin duplicados: ", df_sin_dupplicados.count())
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Control duplicidad: {duracion:.2f} segundos")

    print("\n--------------")
    print("Insertar Registros")
    inicio = time.time()
    numero_nuevos_registros = insertar_datos(df_sin_dupplicados, bandera_insertar)
    fin = time.time()
    duracion = fin - inicio
    print(f"[INFO] Tiempo de ejecución de Insertar Registro: {duracion:.2f} segundos")

    if (numero_nuevos_registros > 0) and bandera_insertar:
        print(f"Se insertaron {numero_nuevos_registros} nuevos registros")
        print("\n--------------")
        print("Insertar state_files_processed Silver")

        process_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        spark.sql(f"""
            DELETE FROM tfm_silver.state_files_processed
            WHERE last_sha256 = '{last_sha256_silver}'
            AND year = {year};
        """)

        spark.sql(f"""
            INSERT INTO tfm_silver.state_files_processed VALUES (
                {year},
                '{last_metadata_update}',
                '{last_status}',
                '{last_checked_ts}',
                '{last_file_path}',
                '{last_sha256}',
                '{process_ts}',
                NULL
            );
        """)
    else:
        print("No se insertaron nuevos registros")
        print("\n--------------")

# Pruebas Manuales

In [0]:
%skip
import time

bandera_insertar = True
    
row =  to_process.collect()[0]

In [0]:
%skip
year = row["year"]
last_metadata_update = row["last_metadata_update"]
last_status = row["last_status"]
last_checked_ts = row["last_checked_ts"]
last_file_path = row["last_file_path"]
last_sha256 = row["last_sha256"]
last_sha256_silver = row["last_sha256_silver"]

ruta_archivo = 'raw/vehiculos/'
file_name = os.path.basename(last_file_path)

print(f"\n============   {year}  ============")
print(f"Inicio Archivo: {file_name}")
print(f"Fecha Proceso: {last_metadata_update}")
print(f"Estado: {last_status}")
print(f"Fecha Proceso: {last_checked_ts}")
print(f"Archivo: {last_file_path}")
print(f"Hash: {last_sha256}")

In [0]:
%skip
print("\n--------------")
print("Obtener df de csv")
inicio = time.time()
df_csv = None
df_csv = obtener_dataframe_csv(f"{PATH_VOLUMEN}/{ruta_archivo}year={year}/{file_name}")
display(df_csv.limit(5))
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Obtener df de csv: {duracion:.2f} segundos")

In [0]:
%skip
print("\n--------------")
print("Normalizar cabeceras")
inicio = time.time()
df_cabeceras = None
df_cabeceras = normalizacion_cabecera(df_csv, year)
display(df_cabeceras.limit(5))
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Normalizar cabeceras: {duracion:.2f} segundos")

In [0]:
%skip
print("\n--------------")
print("Normalizar columnas")
inicio = time.time()
df_columnas_normalizadas = None
df_columnas_normalizadas = normalizacion_columnas(df_cabeceras, year)
display(df_columnas_normalizadas.limit(5))
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Normalizar columnas: {duracion:.2f} segundos")

In [0]:
%skip
print("\n--------------")
print("Homologar y Categorizar columnas")
inicio = time.time()
df_columnas_homologadas_catalogos = None
df_columnas_homologadas_catalogos= homologar_columnas_categoricas(df_columnas_normalizadas)
display(df_columnas_homologadas_catalogos.limit(5))
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Homologar y catalogos columnas: {duracion:.2f} segundos")

In [0]:
%skip
print("\n--------------")
print("Agregar Columnas")
inicio = time.time()
df_silver = None
df_silver = prepare_for_silver(df_columnas_homologadas_catalogos, year, last_file_path, last_sha256)
display(df_silver.limit(5))
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Agregar Columnas: {duracion:.2f} segundos")

In [0]:
%skip
print("\n--------------")
print("Control duplicidad")
inicio = time.time()
print("Conteo Original: ", df_silver.count())
df_sin_dupplicados = control_duplicados(df_silver)
print("Conteo Sin duplicados: ", df_sin_dupplicados.count())
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Control duplicidad: {duracion:.2f} segundos")

In [0]:
%skip
print("\n--------------")
print("Insertar Registros")
inicio = time.time()
numero_nuevos_registros = insertar_datos(df_sin_dupplicados, True)
fin = time.time()
duracion = fin - inicio
print(f"[INFO] Tiempo de ejecución de Insertar Registro: {duracion:.2f} segundos")

In [0]:
%skip
if (numero_nuevos_registros > 0) and bandera_insertar:
    print(f"Se insertaron {numero_nuevos_registros} nuevos registros")
    print("\n--------------")
    print("Insertar state_files_processed Silver")

    process_ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    spark.sql(f"""
        DELETE FROM tfm_silver.state_files_processed
        WHERE last_sha256 = '{last_sha256_silver}'
        AND year = {year};
    """)

    spark.sql(f"""
        INSERT INTO tfm_silver.state_files_processed VALUES (
            {year},
            '{last_metadata_update}',
            '{last_status}',
            '{last_checked_ts}',
            '{last_file_path}',
            '{last_sha256}',
            '{process_ts}',
            NULL
        );
    """)
else:
    print("No se insertaron nuevos registros")
    print("\n--------------")