In [0]:
# Databricks notebook source
# ==========================================================
# UDV - ESTADIOS
# Proyecto: Liga 1 Perú
# Autor: Oscar García Del Águila
# ==========================================================

from env_setup import *
from utils_liga1 import setup_adls, get_dbutils, generar_udv_json, get_entity_data, get_abfss_path, is_dataframe_empty, get_predecesor,get_pipeline_params, get_yaml_from_param, write_delta_udv, log, read_udv_table, get_pipeline, extract_entity_name
from pyspark.sql import SparkSession
from md_estadios import carga_final
import traceback
import sys

# ----------------------------------------------------------
# CONFIGURACIÓN INICIAL
# ----------------------------------------------------------

log("Inicio de ejecución del pipeline UDV", "INFO", "md_estadios")

spark = SparkSession.builder.getOrCreate()
setup_adls()
dbutils = get_dbutils()

In [0]:
# ----------------------------------------------------------
# PARÁMETROS Y PREDECESORES
# ----------------------------------------------------------
try:
    # Para pruebas puedes dejar un valor por defecto; luego ADF lo setea
    dbutils.widgets.text("prm_pipelineid", "7")
    prm_pipelineid = int(dbutils.widgets.get("prm_pipelineid"))

    pipeline_name = get_pipeline(prm_pipelineid)
    entity_name = pipeline_name["pipeline"]               # ej: 'md_estadios'

    dict_predecesores = get_predecesor(prm_pipelineid)
    dict_params       = get_pipeline_params(prm_pipelineid)

    prm_ruta_tabla_pred = dict_predecesores["RutaTabla"]  # ej: 'tb_udv.md_catalogo_equipos'

    prm_capa_udv     = dict_params["CAPA_UDV"]
    prm_ruta_base    = dict_params["RUTA_BASE"]
    prm_ruta_tabla   = dict_params["RUTA_TABLA"]
    prm_formato      = dict_params["FORMATO_SALIDA"]
    prm_schema_tb    = dict_params["SCHEMA_TABLA"]
    prm_tabla_output = dict_params["NOMBRE_TABLA"]
    prm_ruta_yaml    = dict_params["YAML_PATH"]

    prm_ruta_tabla_output = f"{prm_capa_udv}/{prm_ruta_base}/{prm_ruta_tabla}"
    ruta_delta_udv = get_abfss_path(prm_ruta_tabla_output)

    log("Parámetros cargados correctamente", "INFO", entity_name)

except Exception as e:
    log(f"Error al cargar parámetros o predecesores: {e}", "ERROR", "md_estadios")
    print(traceback.format_exc())
    raise

In [0]:
# ----------------------------------------------------------
# LECTURA YAML
# ----------------------------------------------------------
try:
    yaml_conf = get_yaml_from_param(prm_ruta_yaml)

    prm_cols_catalogo_equipos    = yaml_conf[entity_name]["cols_catalogo_equipos"]
    prm_cols_estadios            = yaml_conf[entity_name]["cols_estadios"]
    prm_drop_duplicates_estadios = yaml_conf[entity_name]["drop_duplicates_estadios"]
    prm_rename_columns           = yaml_conf[entity_name]["rename_columns"]
    prm_schema                   = yaml_conf[entity_name]["schema"]
    prm_dedup_cols               = yaml_conf[entity_name]["dedup_cols"]

    log("YAML cargado correctamente", "INFO", entity_name)

except Exception as e:
    log(f"Error al leer YAML {prm_ruta_yaml}: {e}", "ERROR", entity_name)
    print(traceback.format_exc())
    raise

In [0]:
# ----------------------------------------------------------
# LECTURA PREDECESORES (CATÁLOGO + RAW)
# ----------------------------------------------------------
try:
    log("Lectura desde UDV/RAW", "INFO", entity_name)

    # 1) Catálogo de equipos (md_catalogo_equipos) vía RutaTabla
    df_catalogo_equipos = read_udv_table(prm_ruta_tabla_pred)

    if is_dataframe_empty(df_catalogo_equipos):
        raise Exception(f"No se encontró data en la tabla predecesora: {prm_ruta_tabla_pred}")

    # 2) RAW estadios desde tbl_paths (flg_udv = 'N')
    entity_raw = extract_entity_name(entity_name)  # 'md_estadios' -> 'estadios'

    df_raw_estadios = get_entity_data(entity_raw,dedup_cols=prm_dedup_cols)

    if is_dataframe_empty(df_raw_estadios):
        raise Exception(f"No se encontró data en RAW para la entidad: {entity_raw}")

    log("Predecesores completados correctamente", "INFO", entity_name)

except Exception as e:
    log(f"Error en lectura de predecesores/RAW: {e}", "ERROR", entity_name)
    print(traceback.format_exc())
    raise

log("Finalización de la etapa de lectura", "INFO", entity_name)

In [0]:
from pyspark.sql.functions import col, current_timestamp, date_format, lower,trim
from utils_liga1 import cast_dataframe_schema, rename_columns

# 1) Selección de catálogo
df_catalogo_equipos_select = df_catalogo_equipos.select(*prm_cols_catalogo_equipos)
display(df_catalogo_equipos_select)

# 2) Selección y deduplicación de RAW estadios
df_raw_estadios_select = (
    df_raw_estadios
    .select(*prm_cols_estadios)
    .dropDuplicates(prm_drop_duplicates_estadios)
    .select(
        *[col(c) for c in prm_cols_estadios],
        lower(col("club") ).alias("club_lower")
    )
)

display(df_raw_estadios_select)
# 3) JOIN catálogo vs estadios
df_join = (
    df_catalogo_equipos_select.alias("a")
    .join(
        df_raw_estadios_select.alias("b"),
        trim(lower(col("a.nombre_transfermarkt"))) == trim(col("b.club_lower")),
        "left"
    )
    .select(
        col("a.id_equipo"),
        col("a.nombre_equipo"),
        col("b.club_lower"),
        col("b.estadio"),
        col("b.capacidad"),
        col("b.aforo"),
        col("b.fuente"),
        current_timestamp().alias("fecha_carga"),
        date_format(current_timestamp(), "yyyyMMdd").alias("periododia")
    )
)

# 4) Renombrar columnas según YAML (club_lower → club_raw, fuente → fuente_estadio)
df_rename = rename_columns(df_join, prm_rename_columns)

# 5) Castear y ordenar columnas según schema
df_cast = cast_dataframe_schema(df_rename, prm_schema)

display(df_cast)


In [0]:
# # ----------------------------------------------------------
# # EJECUCIÓN PRINCIPAL
# # ----------------------------------------------------------
# try:
#     log("Inicio de Ejecución Principal", "INFO", entity_name)

#     df_final = carga_final(
#         df_catalogo_equipos,
#         df_raw_estadios,
#         prm_cols_catalogo_equipos,
#         prm_cols_estadios,
#         prm_drop_duplicates_estadios,
#         prm_rename_columns,
#         prm_schema
#     )

#     # MERGE simple (UPSERT) sobre id_equipo
#     write_delta_udv(
#         spark,
#         df_final,
#         prm_schema_tb,
#         prm_tabla_output,
#         mode="merge",
#         merge_condition="delta.id_equipo = df.id_equipo"
#     )

#     log("Proceso completado correctamente", "SUCCESS", entity_name)

# except Exception as e:
#     log(f"Error en ejecución principal: {e}", "ERROR", entity_name)
#     print(traceback.format_exc())
#     raise

# log("Finalización del pipeline UDV", "INFO", entity_name)