In [0]:
# Librerías 

from pyspark.sql.functions import (
    col, from_json, explode, arrays_zip, to_timestamp, date_format, hour, expr, current_timestamp, lit, round, coalesce, regexp_replace, 
)
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType
import logging
import pandas as pd

In [0]:
# Logger para avisar

logging.basicConfig(level=logging.INFO, format='%(asctime)s | %(levelname)s | %(message)s', force=True)
logger = logging.getLogger("ETL_SILVER")

# Rutas de las tablas Bronze

TABLE_BRONZE_CLIMA = "fire_risk_project.01_bronze.bronze_open_meteo"

# Ruta tabla Silver

TABLE_SILVER_CLIMA = "fire_risk_project.02_silver.silver_open_meteo"
TABLE_SILVER_GRID = "fire_risk_project.02_silver.silver_grid_master" # Necesito transportar el id_grid

In [0]:
# Transformación 
logger.info(f"Procesando Silver: {TABLE_BRONZE_CLIMA} -> {TABLE_SILVER_CLIMA}")

df_clima_raw = spark.read.table(TABLE_BRONZE_CLIMA)

# C. Desagrupado de filas por hora. Explode
df_clima_exploded = (df_clima_raw
    .select(
        col("source_filename"),
        round(col("request_latitude").cast(DoubleType()), 1).alias("latitude"),
        round(col("request_longitude").cast(DoubleType()), 1).alias("longitude"),
        round(col("elevation_val").cast(DoubleType()), 2).alias("elevation"),
        # EXPLODE + ARRAYS_ZIP
        explode(arrays_zip(
            # Al poner .alias aqui dentro, controlamos el nombre en el struct resultante
            coalesce(col("hourly.time"), col("hourly.date")).alias("time_unified"), 
            col("hourly.temperature_2m"),
            col("hourly.relative_humidity_2m"),
            col("hourly.vapour_pressure_deficit"),
            col("hourly.precipitation"),
            col("hourly.wind_speed_10m"),
            col("hourly.wind_direction_10m"),
            col("hourly.wind_gusts_10m"),
            col("hourly.et0_fao_evapotranspiration"),
            col("hourly.soil_moisture_0_to_7cm"),
            col("hourly.soil_moisture_28_to_100cm"),
            col("hourly.weather_code"),
            col("hourly.snow_depth")
        )).alias("zipped")
    )

    # SELECCIÓN FINAL (Aquí corregimos el error de índices)
    .select(
        col("source_filename"),
        col("latitude"),
        col("longitude"),
        col("elevation"),
        # Accedemos por NOMBRE, no por número (excepto el que renombramos)
        col("zipped.time_unified").alias("timestamp_clima_str"),
        col("zipped.temperature_2m"),            # Spark conservó el nombre original
        col("zipped.relative_humidity_2m"),
        col("zipped.vapour_pressure_deficit"),
        col("zipped.precipitation"),
        col("zipped.wind_speed_10m"),
        col("zipped.wind_direction_10m"),
        col("zipped.wind_gusts_10m"),
        col("zipped.et0_fao_evapotranspiration"),
        col("zipped.soil_moisture_0_to_7cm"),
        col("zipped.soil_moisture_28_to_100cm"),
        col("zipped.weather_code"),
        col("zipped.snow_depth")
    )
    
    # Limpieza de fecha
    .withColumn("timestamp_clima", to_timestamp(regexp_replace(col("timestamp_clima_str"), "T", " ")))
    .withColumn("fecha_join", col("timestamp_clima").cast("date"))
    .withColumn("hora_join", hour(col("timestamp_clima")))
    .drop("timestamp_str")
)

# Escritura
df_clima_exploded.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(TABLE_SILVER_CLIMA)

logger.info(f"Tabla Silver Clima guardada en {TABLE_SILVER_CLIMA}")

In [0]:
%sql 
-- Agregamos el grid_id al frame
CREATE OR REPLACE TABLE fire_risk_project.02_silver.silver_open_meteo AS
SELECT w.*, g.grid_id FROM fire_risk_project.02_silver.silver_open_meteo w
LEFT JOIN fire_risk_project.02_silver.silver_grid_master g
ON w.latitude = g.latitude_centroid AND w.longitude = g.longitude_centroid

In [0]:
# Verificación 

df_clima_check = spark.read.table(TABLE_SILVER_CLIMA)
display(df_clima_check.limit(5))

In [0]:
# Ordenamiento y limpieza del df

df_clima_final = df_clima_check.select(
    # 1. Claves Principales
    col("grid_id"),
    col("timestamp_clima"),
    col("fecha_join"),
    col("hora_join"),
    
    # 2. Features
    col("temperature_2m"),
    col("relative_humidity_2m"),
    col("vapour_pressure_deficit"),
    col("precipitation"),
    col("wind_speed_10m"),
    col("wind_direction_10m"),
    col("wind_gusts_10m"),
    col("et0_fao_evapotranspiration"),
    col("soil_moisture_0_to_7cm"),
    col("soil_moisture_28_to_100cm"),
    col("weather_code"),
    col("snow_depth"),
    
    # 3. Metadatos y Referencias
    col("elevation"),
    col("latitude"),
    col("longitude"),
    col("source_filename") 
)

df_clima_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(TABLE_SILVER_CLIMA)


In [0]:
%sql
-- Z ordering para optimizar
OPTIMIZE fire_risk_project.02_silver.silver_open_meteo ZORDER BY (grid_id, timestamp_clima);

In [0]:
%sql
-- Segunda verificación

SELECT * FROM fire_risk_project.02_silver.silver_open_meteo LIMIT 50;

In [0]:
import builtins

# Revisión final de contenidos de la tabla
TABLE_TARGET = "fire_risk_project.02_silver.silver_open_meteo"
EXPECTED_GRID_POINTS = 160
EXPECTED_DAYS = 2191 
EXPECTED_HOURS = 24
EXPECTED_TOTAL_ROWS = EXPECTED_GRID_POINTS * EXPECTED_DAYS * EXPECTED_HOURS

print(f"INICIANDO AUDITORIA DE: {TABLE_TARGET}")
print(f"OBJETIVOS: {EXPECTED_GRID_POINTS} puntos | {EXPECTED_DAYS} dias | {EXPECTED_HOURS} horas")
print(f"TOTAL FILAS ESPERADAS: {EXPECTED_TOTAL_ROWS:,}")
print("-" * 60)

# Lectura
df = spark.read.table(TABLE_TARGET)

# ESTRUCTURA Y VOLUMEN

print("\nVOLUMEN TOTAL")
total_rows_actual = df.count()
diff = total_rows_actual - EXPECTED_TOTAL_ROWS

if total_rows_actual == EXPECTED_TOTAL_ROWS:
    print(f"STATUS: OK. La tabla tiene exactamente {total_rows_actual:,} filas.")
else:
    print(f"STATUS: ALERTA. La tabla tiene {total_rows_actual:,} filas.")
    print(f"Diferencia: {diff:+,} filas.")

# GEOGRAFIA
print("\nVALIDACION GEOGRAFICA")
puntos_unicos = df.select("latitude", "longitude").distinct().count()
print(f"Cantidad de Puntos Geograficos Unicos: {puntos_unicos}")

if puntos_unicos == EXPECTED_GRID_POINTS:
    print(f"STATUS: OK. Coincide con los {EXPECTED_GRID_POINTS} puntos esperados.")
else:
    print(f"STATUS: ALERTA. Se encontraron {puntos_unicos} puntos.")

coords_nulas = df.filter(F.col("latitude").isNull() | F.col("longitude").isNull()).count()
if coords_nulas == 0:
    print("STATUS: OK. No hay coordenadas nulas.")
else:
    print(f"STATUS: ERROR CRITICO. Hay {coords_nulas} filas sin coordenadas.")

# CONTINUIDAD TEMPORAL (DIAS Y HORAS)
print("\nCONTINUIDAD TEMPORAL")

# A. Dias por Grid
print("dias por Grid ID")
df_days_check = df.groupBy("grid_id").agg(F.countDistinct("fecha_join").alias("dias_totales"))
grids_bad = df_days_check.filter(F.col("dias_totales") != EXPECTED_DAYS)
count_grids_bad = grids_bad.count()

if count_grids_bad == 0:
    print(f"STATUS: OK. Todos los puntos tienen {EXPECTED_DAYS} dias.")
else:
    print(f"STATUS: ALERTA. {count_grids_bad} puntos tienen dias incompletos.")
    display(grids_bad.limit(10))

# B. Horas por Dia
print("Horas por Dia")
df_hours_check = df.groupBy("grid_id", "fecha_join").count().withColumnRenamed("count", "horas_registradas")
days_bad = df_hours_check.filter(F.col("horas_registradas") != EXPECTED_HOURS)
count_days_bad = days_bad.count()

if count_days_bad == 0:
    print(f"STATUS: OK. Todos los dias tienen {EXPECTED_HOURS} horas.")
else:
    print(f"STATUS: ALERTA. {count_days_bad} combinaciones Grid-Dia no tienen 24 horas.")
    display(days_bad.limit(10))

# FEATURES
print("\nFEATURES")

cols_exclude = ["grid_id", "timestamp_clima", "fecha_join", "hora_join", "source_filename"]
feature_cols = [c for c in df.columns if c not in cols_exclude]

# Calculo
null_exprs = [F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in feature_cols]
results_row = df.select(null_exprs).collect()[0]

data = []
for col_name in feature_cols:
    null_count = results_row[col_name]
    null_pct = (null_count / total_rows_actual) * 100
    
    status = "OPTIMO"
    if null_pct > 0: status = "ATENCION"
    if null_pct > 5: status = "CRITICO (>5%)"
        
    data.append({
        "Variable": col_name,
        "Nulos_Cant": null_count,
        "Nulos_Pct": builtins.round(null_pct, 4),
        "Estado": status
    })

pdf_features = pd.DataFrame(data).sort_values(by="Nulos_Cant", ascending=False)
display(pdf_features)

# Conclusión
total_nulls = pdf_features["Nulos_Cant"].sum()
if total_nulls == 0:
    print("CONCLUSION: DATASET COMPLETO. Listo para usar.")
else:
    print(f"CONCLUSION: SE DETECTARON {total_nulls} NULOS. Revisar tabla anterior.")

In [0]:
# Finalizado el proceso tranformación de datos para datos Open Meteo