In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F

In [0]:
dbutils.widgets.text("catalogo", "catalog_dev")
dbutils.widgets.text("esquema_source", "silver")
dbutils.widgets.text("esquema_sink", "golden")

In [0]:
catalogo = dbutils.widgets.get("catalogo")
esquema_source = dbutils.widgets.get("esquema_source")
esquema_sink = dbutils.widgets.get("esquema_sink")

In [0]:
df_bitacora = spark.table(f"{catalogo}.{esquema_source}.bitacora_clasificacion")

In [0]:
# Obtener timestamp en hora de Colombia
fecha_ejecucion_col = from_utc_timestamp(current_timestamp(), "America/Bogota")

In [0]:
# Función para generar 4 dataframes con datos agregados basados en la sábana de datos inicial

def generar_agregaciones_gold(df):
    # 1. Incidentes por zona
    df_zona_estado = df.groupBy("Zona") \
        .agg(F.count("*").alias("Total_Incidentes_Zona")) \
        .withColumn("Fecha_Ejecucion", fecha_ejecucion_col)

    # 2. Clasificación por tipo de vehículo
    df_clasificacion_vehiculo = df.groupBy("Tipo_Vehiculo", "Clasificacion") \
        .agg(F.count("*").alias("Cantidad")) \
        .withColumn("Fecha_Ejecucion", fecha_ejecucion_col)

    # 3. Tendencia diaria
    df_tendencia_diaria = df.groupBy("Fecha_Ejecucion") \
        .agg(F.count("*").alias("Incidentes_por_dia")) \
        .withColumn("Fecha_Ejecucion", fecha_ejecucion_col)

    # 4. Ranking de líderes técnicos
    df_ranking_lideres = df.groupBy("Lider_Tecnico") \
        .agg(F.count("*").alias("Total_Incidentes")) \
        .orderBy(F.desc("Total_Incidentes")) \
        .withColumn("Fecha_Ejecucion", fecha_ejecucion_col)

    return {
        "zona_estado": df_zona_estado,
        "clasificacion_vehiculo": df_clasificacion_vehiculo,
        "tendencia_diaria": df_tendencia_diaria,
        "ranking_lideres": df_ranking_lideres
    }



In [0]:
# Borra los datos de la fecha actual de las tablas gold, antes de volver a escribir los datos de la última ejecución

spark.sql("""
DELETE FROM catalog_dev.golden.clasificacion_vehiculo
WHERE DATE(from_utc_timestamp(Fecha_Ejecucion, 'America/Bogota')) = DATE(from_utc_timestamp(current_timestamp(), 'America/Bogota'))
""")

spark.sql("""
DELETE FROM catalog_dev.golden.ranking_lideres
WHERE DATE(from_utc_timestamp(Fecha_Ejecucion, 'America/Bogota')) = DATE(from_utc_timestamp(current_timestamp(), 'America/Bogota'))
""")

spark.sql("""
DELETE FROM catalog_dev.golden.tendencia_diaria
WHERE DATE(from_utc_timestamp(Fecha_Ejecucion, 'America/Bogota')) = DATE(from_utc_timestamp(current_timestamp(), 'America/Bogota'))
""")

spark.sql("""
DELETE FROM catalog_dev.golden.zona_estado
WHERE DATE(from_utc_timestamp(Fecha_Ejecucion, 'America/Bogota')) = DATE(from_utc_timestamp(current_timestamp(), 'America/Bogota'))
""")


DataFrame[num_affected_rows: bigint]

In [0]:
# carga los datos de los 4 dataframes a las tablas gold
agregaciones = generar_agregaciones_gold(df_bitacora)

for nombre, df_agregado in agregaciones.items():
    (
        df_agregado.write
        .format("delta")
        .option("mergeSchema", "true")
        .mode("append")
        .saveAsTable(f"{catalogo}.{esquema_sink}.{nombre}")
    )