PROYECTO PROVIA. Autoría VEGALSA-EROSKI, Torusware & CITIC-UDC. Este notebook se conecta a la API de Open-Meteo, utiliza las coordenadas de determinados municipios para descargar datos meteorológicos —como temperaturas, lluvias o radiación solar— tanto del pasado como de los próximos días, y guarda toda esa información en una tabla

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType
from pyspark.sql import functions as F
from datetime import date, timedelta, datetime
import requests
import time
from delta.tables import DeltaTable
from concurrent.futures import ThreadPoolExecutor, as_completed
from pyspark.sql import Window

In [0]:
table_name = "pre_silver.previsionaux.clima"

In [0]:
coords_manual = {
    # "SIERO": (43.3923, -5.6633, "Asturias", "SIERO"),
    # "VILA DE CRUCES": (42.7190, -8.2355, "Galicia", "VILA DE CRUCES"),
    # "PONTES DE GARCÍA RODRÍGUEZ, AS": (43.4527,-7.8486,"Galicia","PONTES DE GARCÍA RODRÍGUEZ, AS"),
    # "LEÓN": (42.5987, -5.5671, "Castilla y León", "LEÓN"),
    # "CARBALLIÑO, O": (42.4316, -8.0790, "Galicia", "CARBALLIÑO, O"),
    # "GOZÓN": (43.6403, -5.8186, "Asturias", "GOZÓN"),
    # "BAÑEZA, LA": (42.2983, -5.9039, "Castilla y León", "BAÑEZA, LA"),
    # "POBRA DO CARAMIÑAL, A": (42.6025, -8.9323, "Galicia", "POBRA DO CARAMIÑAL, A"),
    # "LARACHA, A": (43.2538, -8.5853, "Galicia", "LARACHA, A"),
    # "AMES": (42.9029, -8.6368, "Galicia", "AMES"),
    # "NARÓN": (43.5017, -8.1908, "Galicia", "NARÓN"),
    # "RÚA, A": (42.3900, -7.1093, "Galicia", "RÚA, A"),
    # "BARCO DE VALDEORRAS, O": (42.4162, -6.9899, "Galicia", "BARCO DE VALDEORRAS, O"),
    # "OLEIROS": (43.3328, -8.3127, "Galicia", "OLEIROS"),
    # "GROVE, O": (42.4950, -8.8650, "Galicia", "GROVE, O"),
    # "SADA": (43.3553, -8.2534, "Galicia", "SADA"),
    # "MARÍN": (42.3913, -8.6988, "Galicia", "MARÍN"),
    # "BAIONA": (42.1184, -8.8492, "Galicia", "BAIONA"),
    # "ESTRADA, A": (42.6872, -8.4938, "Galicia", "ESTRADA, A"),
    # "GUARDA, A": (41.9024, -8.8738, "Galicia", "GUARDA, A"),
    # "SILLEDA": (42.6994, -8.2366, "Galicia", "SILLEDA"),
    # "CEE": (42.9490, -9.1902, "Galicia", "CEE"),
    "CORUÑA, A": (43.3623, -8.4115, "Galicia", "CORUÑA, A"),
}

In [0]:
schema = StructType(
    [
        StructField("time", StringType(), True),
        StructField("provincia", StringType(), True),
        StructField("tavg", DoubleType(), True),
        StructField("tmax", DoubleType(), True),
        StructField("tmin", DoubleType(), True),
        StructField("prcp_mm", DoubleType(), True),
        StructField("solar_mj", DoubleType(), True),
        StructField("municipio", StringType(), True),
    ]
)

In [0]:
def get_coordinates(name):
    """
    Busca las coordenadas (latitud, longitud y provincia) de un municipio usando la API de Open-Meteo.
    Si el municipio está guardado en el diccionario coords_manual, usa esos datos directamente
    """
    if name in coords_manual:
        return coords_manual[name]

    params = {"name": name, "count": 1, "language": "es", "country": "ES"}
    r = safe_get("https://geocoding-api.open-meteo.com/v1/search",
                 retries=3, delay=5,
    ) if False else requests.get("https://geocoding-api.open-meteo.com/v1/search",
                                 params=params, timeout=15).json()
    res = r.get("results") if r else None
    if not res:
        print(f"No coords para: {name}")
        return None
    lat, lon = res[0]["latitude"], res[0]["longitude"]
    prov = res[0].get("admin1", "")
    return lat, lon, prov, name

In [0]:
def build_url(lat, lon, mode, start=None, end=None):
    """
    Construye la dirección URL de la API de Open-Meteo según si se piden datos históricos (“archive”) o de previsión (“forecast”).
    """
    base = (
        "https://api.open-meteo.com/v1"
        if mode == "forecast"
        else "https://archive-api.open-meteo.com/v1"
    )
    path = "forecast" if mode == "forecast" else "archive"
    daily = "temperature_2m_max,temperature_2m_min,precipitation_sum,shortwave_radiation_sum"
    url = f"{base}/{path}?latitude={lat}&longitude={lon}&daily={daily}"
    if mode == "forecast":
        url += "&forecast_days=14&model=ecmwf_ifs05"
    else:
        url += f"&start_date={start}&end_date={end}"
    return url + "&timezone=UTC"

In [0]:
def safe_get(url: str, retries=5):
    """
    Descarga los datos de una dirección web con varios retrys por si hay errores.
    """
    for n in range(1, retries + 1):
        try:
            start = time.time()
            response = requests.get(url, timeout=10)
            duration = time.time() - start

            if duration > 10:
                print(f"Advertencia: respuesta lenta ({duration:.1f}s). Esperando 60s...")
                time.sleep(60)

            return response.json()

        except Exception as e:
            if n == retries:
                print(f"Fallo definitivo {url} – {e}")
            else:
                print(f"Fallo {n}/{retries} – esperando 60s antes de reintentar")
                time.sleep(60)
    return None

In [0]:
def build_weather_rows(daily_data, provincia, municipio):
    """
    Convierte los datos diarios que devuelve la API en filas
    """
    rows = []
    for i, d in enumerate(daily_data["time"]):
        tmax  = daily_data["temperature_2m_max"][i]
        tmin  = daily_data["temperature_2m_min"][i]
        if tmax is None or tmin is None:
            continue
        rain  = float(daily_data["precipitation_sum"][i] or 0.0)
        solar = float(daily_data["shortwave_radiation_sum"][i] or 0.0)
        tavg  = round((tmax + tmin) / 2, 1)
        rows.append((
            f"{d}T00:00:00Z", provincia,
            tavg, tmax, tmin,
            rain, solar,
            municipio
        ))
    return rows

In [0]:
def get_weather_data(lat, lon, provincia, municipio, start_date=None):
    """
    Usa las funciones anteriores para descargar tanto los datos históricos como la previsión del tiempo para un municipio.
    Devuelve todas las filas listas para guardar en una tabla.
    """
    today = date.today()
    start = start_date if start_date else date(2021, 1, 1)
    url_hist = build_url(lat, lon, "archive", start=start, end=today)
    url_fc = build_url(lat, lon, "forecast")

    hist = safe_get(url_hist)
    fc = safe_get(url_fc)

    hist_daily = build_weather_rows(hist["daily"], provincia, municipio) if hist and "daily" in hist else []
    fc_daily = build_weather_rows(fc["daily"], provincia, municipio) if fc and "daily" in fc else []

    return hist_daily + fc_daily

Comprobamos si la tabla de clima ya existe y obtenemos la última fecha registrada por cada municipio

In [0]:
if spark.catalog.tableExists(table_name):
    clima_actual = spark.table(table_name)
    clima_max_fecha = (
        clima_actual
        .groupBy("municipio")
        .agg(F.max("time").alias("max_fecha"))
        .withColumn("desde_fecha", F.date_sub("max_fecha", 14))
    )
else:
    clima_max_fecha = spark.createDataFrame([], schema=StructType([
        StructField("municipio", StringType(), True),
        StructField("max_fecha", StringType(), True),
        StructField("desde_fecha", StringType(), True),
    ]))

fechas_dict = {row["municipio"]: row["desde_fecha"] for row in clima_max_fecha.collect()}

Definimos la lista de municipios a procesar y descargamos sus datos meteorológicos (históricos y previsiones) desde la API de Open-Meteo

In [0]:
municipios_list = ['CORUÑA, A']

In [0]:
rows_all, errores = [], []

for muni in municipios_list:
    coords = get_coordinates(muni)
    print(f"{muni} → {coords}")
    if not coords:
        errores.append(muni)
        continue

    lat, lon, prov, _ = coords
    start_date = fechas_dict.get(muni, date(2021, 1, 1))
    if isinstance(start_date, str):
        start_date = datetime.strptime(start_date, "%Y-%m-%d").date()

    datos = get_weather_data(lat, lon, prov, muni, start_date=start_date)
    rows_all.extend(datos)
    # time.sleep(60)
    print(f"Datos para {muni} desde {start_date} ({len(datos)} filas)")

In [0]:
if errores:
    print("Municipios sin datos por timeout:", errores)

Convertimos los resultados en un df y creamos o actualizamos la tabla delta con la información más reciente

In [0]:
clima_df = spark.createDataFrame(rows_all, schema=schema)
clima_df = clima_df.withColumn("time", F.to_date("time"))

In [0]:
window = Window.partitionBy("municipio", "provincia", "time").orderBy(F.desc("prcp_mm"))
# solo nos quedamos con el último registro por día que tenga precipitaciones distintas a 0
# esto es debido a que la api me devuelve filas superpuestas de historico + prevision
clima_df = clima_df.withColumn("rn", F.row_number().over(window)).filter("rn = 1").drop("rn") 
clima_df = clima_df.dropDuplicates(["municipio", "provincia", "time"])

In [0]:
if not spark.catalog.tableExists(table_name):
    clima_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
else:
    delta_target = DeltaTable.forName(spark, table_name)
    (
        delta_target.alias("target")
        .merge(
            clima_df.alias("source"),
            """
            target.municipio = source.municipio AND
            target.time = source.time AND
            target.provincia = source.provincia
            """
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
if errores:
    print("Municipios con errores:", errores)