## Script Modificado

In [1]:
# Librerias a usar
import requests
import pandas as pd
import os
from pyspark.sql.functions import col, when, lit, to_date, concat, lpad
from io import StringIO
from pyspark.sql.functions import col
from functools import reduce
from delta.tables import DeltaTable
from pyspark.sql.window import Window
from pyspark.sql.utils import AnalysisException
from pyspark.sql.functions import row_number, monotonically_increasing_id, col, max as spark_max
requests.packages.urllib3.disable_warnings()

StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 3, Finished, Available, Finished)

In [2]:
# Definir periodo de tiempo
start_year = 2020
current_year = 2025
years = list(range(start_year, current_year + 1))
ANOS_ACTUALIZAR = [2023, 2024, 2025]

# Ruta de la tabla Hechos
ruta_hechos = "Tables/Hechos"

#Definir columnas de análisis
columnas_requeridas = [
    'ANO_EJE', 'MES_EJE', 'NIVEL_GOBIERNO_NOMBRE', 'SECTOR_NOMBRE',
    'PLIEGO_NOMBRE', 'EJECUTORA_NOMBRE', 'DEPARTAMENTO_EJECUTORA_NOMBRE',
    'PROVINCIA_EJECUTORA_NOMBRE', 'DISTRITO_EJECUTORA_NOMBRE',
    'PROGRAMA_PPTO_NOMBRE', 'TIPO_ACT_PROY_NOMBRE',
    'PRODUCTO_PROYECTO_NOMBRE', 'ACTIVIDAD_ACCION_OBRA_NOMBRE', 'MONTO_PIA',
    'MONTO_PIM', 'MONTO_CERTIFICADO', 'MONTO_COMPROMETIDO_ANUAL',
    'MONTO_COMPROMETIDO', 'MONTO_DEVENGADO', 'MONTO_GIRADO'
]

StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 4, Finished, Available, Finished)

In [3]:
# Verificar si la tabla ya existe
try:
    hechos_delta = DeltaTable.forPath(spark, ruta_hechos)
    tabla_existe = True
    print("📌 La tabla Hechos ya existe. Solo se descargarán los datos de 2023, 2024 y 2025.")
    years = ANOS_ACTUALIZAR  # Limitar la descarga a estos años
except AnalysisException:
    print("⚠️ La tabla Hechos aún no existe. Se procesarán todos los años desde 2020.")
    tabla_existe = False

dfs = []

for year in years:
    url_csv = f'https://fs.datosabiertos.mef.gob.pe/datastorefiles/{year}-Gasto-COVID-19.csv'
    
    try:
        response = requests.get(url_csv, verify=False, timeout=10)
        response.raise_for_status()
        
        # Leer CSV en Pandas
        df_pandas = pd.read_csv(StringIO(response.text), usecols=columnas_requeridas, low_memory=False)
        
        # Convertir Pandas a Spark DataFrame
        df_spark = spark.createDataFrame(df_pandas)
        
        dfs.append(df_spark)
        print(f"Datos de {year} descargados correctamente. Registros: {df_spark.count()}")
    except Exception as e:
        print(f"Error al descargar {year}: {e}")


StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 5, Finished, Available, Finished)

📌 La tabla Hechos ya existe. Solo se descargarán los datos de 2023, 2024 y 2025.
Datos de 2023 descargados correctamente. Registros: 35426
Datos de 2024 descargados correctamente. Registros: 3302
Datos de 2025 descargados correctamente. Registros: 370


In [4]:
# Unir todos los dataframes
if dfs:
    df = reduce(lambda df1, df2: df1.union(df2), dfs)
    print(f"Datos totales obtenidos: {df.count()} registros.")

# Ver estructura
#df.printSchema()

# Ver registros
#display(df)

StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 6, Finished, Available, Finished)

Datos totales obtenidos: 39098 registros.


In [5]:
def create_or_update_dim_table(df, columns, id_column, table_path):    
    # Seleccionar y eliminar duplicados en los datos nuevos
    df_dim = df.select(*columns).dropDuplicates()

    # Agregar columnas solo para DimTiempo
    if "DimTiempo" in table_path:
        df_dim = df_dim.withColumn(
            "Es_Presupuesto_Inicial", when(col("MES_EJE") == 0, lit(True)).otherwise(lit(False))
        ).withColumn(
            "Fecha_Referencia",
            when(
                col("MES_EJE") == 0,
                to_date(concat(col("ANO_EJE").cast("string"), lit("-01-01")), "yyyy-MM-dd")
            ).otherwise(
                to_date(concat(col("ANO_EJE").cast("string"), lit("-"), lpad(col("MES_EJE").cast("string"), 2, "0"), lit("-01")), "yyyy-MM-dd")
            )
        )
    
    # Revisar si la Delta Table ya existe
    try:
        delta_table = DeltaTable.forPath(spark, table_path)  # Cargar la tabla Delta existente
        existing_df = delta_table.toDF().select(*columns).dropDuplicates()  # Mantener solo las columnas relevantes

        # Filtrar solo los registros nuevos
        df_new = df_dim.join(existing_df, columns, "left_anti")  

        if df_new.isEmpty():
            print(f"✅ No hay datos nuevos para insertar en {table_path}")
            return

        # Obtener el último ID generado
        last_id = delta_table.toDF().select(spark_max(id_column)).collect()[0][0]
        last_id = last_id if last_id is not None else 0

        # Asignar IDs solo a los nuevos datos
        window_spec = Window.orderBy(monotonically_increasing_id())
        df_new = df_new.withColumn(id_column, row_number().over(window_spec) + last_id)

        # Escribir solo los datos nuevos
        delta_table.alias("existing").merge(
            df_new.alias("new"),
            " AND ".join([f"existing.{col} = new.{col}" for col in columns])
        ).whenNotMatchedInsertAll().execute()

        print(f"🔄 {df_new.count()} registros nuevos insertados en {table_path}")

    except Exception as e:
        # Si la tabla no existe, crearla desde cero
        df_dim = df_dim.withColumn(id_column, row_number().over(Window.orderBy(monotonically_increasing_id())))
        df_dim.write.format("delta").mode("overwrite").save(table_path)
        
        print(f"✅ Creación exitosa en {table_path}")


StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 7, Finished, Available, Finished)

In [6]:
# Columnas de cada dimensión
dim_tables = [
    ("DimTiempo", ["ANO_EJE", "MES_EJE"], "id_tiempo"),
    ("DimUbicacion", ["DEPARTAMENTO_EJECUTORA_NOMBRE", "PROVINCIA_EJECUTORA_NOMBRE", "DISTRITO_EJECUTORA_NOMBRE"], "id_ubicacion"),
    ("DimEntidad", ["NIVEL_GOBIERNO_NOMBRE", "SECTOR_NOMBRE", "PLIEGO_NOMBRE", "EJECUTORA_NOMBRE"], "id_entidad"),
    ("DimActividad", ["PROGRAMA_PPTO_NOMBRE", "TIPO_ACT_PROY_NOMBRE", "PRODUCTO_PROYECTO_NOMBRE", "ACTIVIDAD_ACCION_OBRA_NOMBRE"], "id_actividad")
]

# Ejecutar la función para cada dimensión
for table_name, columns, id_col in dim_tables:
    table_path = f"Tables/{table_name}"
    create_or_update_dim_table(df, columns, id_col, table_path)

StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 8, Finished, Available, Finished)

✅ No hay datos nuevos para insertar en Tables/DimTiempo
✅ No hay datos nuevos para insertar en Tables/DimUbicacion
✅ No hay datos nuevos para insertar en Tables/DimEntidad
✅ No hay datos nuevos para insertar en Tables/DimActividad


In [7]:
# Verificar si la tabla Hechos ya existe
try:
    hechos_delta = DeltaTable.forPath(spark, ruta_hechos)
    tabla_existe = True
except AnalysisException:
    print("⚠️ La tabla Hechos aún no existe. Se creará por primera vez.")
    tabla_existe = False

# Si la tabla YA EXISTE: eliminar datos de los años 2023, 2024, 2025 y volver a insertar
if tabla_existe:
    print("📌 Historial de versiones de la tabla Hechos antes de la actualización:")
    hechos_delta.history().select("version", "timestamp", "operation").show(5)

    print("📌 Eliminando registros de los años 2023, 2024 y 2025 en Hechos...")

    # Cargar DimTiempo para obtener los id_tiempo de los años a actualizar
    dim_tiempo = spark.read.format("delta").load("Tables/DimTiempo")
    id_tiempos_actualizar = dim_tiempo.filter(col("ANO_EJE").isin(ANOS_ACTUALIZAR)).select("id_tiempo")

    # Convertir los IDs a una lista para eliminar en Delta Table
    id_tiempo_lista = [row["id_tiempo"] for row in id_tiempos_actualizar.collect()]

    if id_tiempo_lista:
        print(f"📌 Eliminando registros con id_tiempo en: {id_tiempo_lista}")
        hechos_delta.delete(col("id_tiempo").isin(id_tiempo_lista))
        print("✅ Registros antiguos eliminados.")

        # Insertar nueva data filtrada solo para los años a actualizar
        df_hechos_nuevo = df.filter(col("ANO_EJE").isin(ANOS_ACTUALIZAR)) \
            .join(dim_tiempo, ["ANO_EJE", "MES_EJE"], "left") \
            .join(spark.read.format("delta").load("Tables/DimUbicacion"), ["DEPARTAMENTO_EJECUTORA_NOMBRE", "PROVINCIA_EJECUTORA_NOMBRE", "DISTRITO_EJECUTORA_NOMBRE"], "left") \
            .join(spark.read.format("delta").load("Tables/DimEntidad"), ["NIVEL_GOBIERNO_NOMBRE", "SECTOR_NOMBRE", "PLIEGO_NOMBRE", "EJECUTORA_NOMBRE"], "left") \
            .join(spark.read.format("delta").load("Tables/DimActividad"), ["PROGRAMA_PPTO_NOMBRE", "TIPO_ACT_PROY_NOMBRE", "PRODUCTO_PROYECTO_NOMBRE", "ACTIVIDAD_ACCION_OBRA_NOMBRE"], "left") \
            .select(
                "id_tiempo", "id_ubicacion", "id_entidad", "id_actividad",
                "MONTO_PIA", "MONTO_PIM", "MONTO_CERTIFICADO", "MONTO_COMPROMETIDO",
                "MONTO_DEVENGADO", "MONTO_GIRADO"
            )

        # Insertar registros nuevos en la tabla Hechos
        df_hechos_nuevo.write.format("delta") \
            .mode("append") \
            .partitionBy("id_tiempo") \
            .save(ruta_hechos)

        print("✅ Tabla de hechos actualizada correctamente para los años 2023, 2024 y 2025.")
        
        # Ver historial después de la actualización
        print("📌 Historial de versiones de la tabla Hechos después de la actualización:")
        hechos_delta.history().select("version", "timestamp", "operation").show(5)
    else:
        print("⚠️ No se encontraron id_tiempo para los años seleccionados.")

# Si la tabla NO EXISTE: Crear desde cero
else:
    print("📌 Creando la tabla Hechos por primera vez...")

   # Unir IDs con la tabla de hechos
    df_hechos = df \
        .join(spark.read.format("delta").load("Tables/DimTiempo"), ["ANO_EJE", "MES_EJE"], "left") \
        .join(spark.read.format("delta").load("Tables/DimUbicacion"), ["DEPARTAMENTO_EJECUTORA_NOMBRE", "PROVINCIA_EJECUTORA_NOMBRE", "DISTRITO_EJECUTORA_NOMBRE"], "left") \
        .join(spark.read.format("delta").load("Tables/DimEntidad"), ["NIVEL_GOBIERNO_NOMBRE", "SECTOR_NOMBRE", "PLIEGO_NOMBRE", "EJECUTORA_NOMBRE"], "left") \
        .join(spark.read.format("delta").load("Tables/DimActividad"), ["PROGRAMA_PPTO_NOMBRE", "TIPO_ACT_PROY_NOMBRE", "PRODUCTO_PROYECTO_NOMBRE", "ACTIVIDAD_ACCION_OBRA_NOMBRE"], "left") \
        .select(
            "id_tiempo", "id_ubicacion", "id_entidad", "id_actividad",
            "MONTO_PIA", "MONTO_PIM", "MONTO_CERTIFICADO", "MONTO_COMPROMETIDO",
            "MONTO_DEVENGADO", "MONTO_GIRADO"
        )


    # Guardar en Delta optimizado
    df_hechos.write.format("delta") \
        .mode("overwrite") \
        .partitionBy("id_tiempo") \
        .save(ruta_hechos)

    print("✅ Tabla de hechos creada por primera vez.")


StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 9, Finished, Available, Finished)

📌 Historial de versiones de la tabla Hechos antes de la actualización:
+-------+--------------------+---------+
|version|           timestamp|operation|
+-------+--------------------+---------+
|      2|2025-03-21 23:50:...|    WRITE|
|      1|2025-03-21 23:50:...|   DELETE|
|      0|2025-03-21 21:51:...|    WRITE|
+-------+--------------------+---------+

📌 Eliminando registros de los años 2023, 2024 y 2025 en Hechos...
📌 Eliminando registros con id_tiempo en: [40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69]
✅ Registros antiguos eliminados.
✅ Tabla de hechos actualizada correctamente para los años 2023, 2024 y 2025.
📌 Historial de versiones de la tabla Hechos después de la actualización:
+-------+--------------------+---------+
|version|           timestamp|operation|
+-------+--------------------+---------+
|      4|2025-03-22 00:01:...|    WRITE|
|      3|2025-03-22 00:01:...|   DELETE|
|      2|2025-03-21 23:50

In [12]:
from datetime import datetime
import pytz
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# Obtener la fecha y hora actual en UTC
utc_now = datetime.utcnow()

# Convertir a la zona horaria de Lima (UTC-5)
peru_tz = pytz.timezone("America/Lima")
fecha_actualizacion = utc_now.replace(tzinfo=pytz.utc).astimezone(peru_tz).strftime("%Y-%m-%d %H:%M:%S")

# Crear DataFrame con la fecha de actualización
df_update = spark.createDataFrame([(fecha_actualizacion,)], ["fecha_actualizacion"])

# Guardar en el Lakehouse en la ruta Tables/update_log (sobrescribe con cada ejecución)
df_update.write.mode("overwrite").format("delta").save("Tables/update_log")

print("✅ Fecha de actualización registrada correctamente:", fecha_actualizacion)



StatementMeta(, d4a5bc0d-7cf5-4ec8-8634-dbbe7d1b5f1c, 14, Finished, Available, Finished)

✅ Fecha de actualización registrada correctamente: 2025-03-21 19:04:37
