<a href="https://colab.research.google.com/github/sunnyday2/flight_on_time/blob/desarrollo/flightontime_hackaton_e32_alura_latam.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Kaggle Notebook para un pre-procesamiento pesado**

---

Dado a que Colab no tiene suficientes recursos para elaborar el dataset a base de millones registros, usa Kaggle para:
* Lectura de datasets grandes (Parquet, CSV > varios GB)
* Limpieza inicial
* Feature engineering
* Agregaciones
* Generaci√≥n de datasets intermedios

**Ventajas**
* Datasets montados nativamente (sin bugs de disco)
* M√°s I/O estable
* Entorno reproducible
* Ideal para EDA y pipelines de datos

## **1. üìö Librer√≠as y configuraci√≥n**
---

### Instalar las librer√≠as necesarias para leer datos y construir el dataset

In [None]:
!pip install kagglehub pyarrow requests

### Consolidar todas las librer√≠as en una sola secci√≥n para evitar imports repetidos

In [None]:
import pandas as pd
import numpy as np
import kagglehub
import pyarrow as pa
import pyarrow.parquet as pq
import os
import requests
import time
import math
import fcntl  # En Windows reemplaza por msvcrt.locking si lo necesitas

## **2. ‚¨áÔ∏è Descargar el dataset presentado por Rafael para el analisis y confeci√≥n de uno mas completo**

---

### Extraer porciones de data por a√±os y cargar por bloques

In [None]:
path = kagglehub.dataset_download("arvindnagaonkar/flight-delay")
print("Dataset descargado en:")
print(path)

# Lista los archivos disponibles dentro del dataset descargado
os.listdir(path)

# Construimos la ruta al archivo parquet
# ParquetFile permite leer el dataset sin cargarlo entero en memoria
PARQUET_FILE = os.path.join(path, "Flight_Delay.parquet")

OUT_FILE = "df_sample_equal_year.parquet"

BATCH_SIZE = 200_000
YEAR_COL = "Year"

# -------- 1) Primer pase: contar filas por Year (streaming) --------
pf = pq.ParquetFile(PARQUET_FILE)
counts = {}

for batch in pf.iter_batches(batch_size=BATCH_SIZE, columns=[YEAR_COL]):
    years = batch.column(0).to_numpy()
    uniq, cnt = np.unique(years, return_counts=True)
    for y, c in zip(uniq, cnt):
        counts[int(y)] = counts.get(int(y), 0) + int(c)

min_per_year = min(counts.values())  # mismo tama√±o por Year

# (opcional) si quieres una fracci√≥n del m√≠nimo:
# SAMPLE_FRAC = 0.10
# min_per_year = int(min_per_year * SAMPLE_FRAC)

print("Cantidad de registros en a√±o:", counts)
print("Registros por A√±o a guardar:", min_per_year)

# -------- 2) Segundo pase: construir salida con N filas por Year --------
pf = pq.ParquetFile(PARQUET_FILE)

kept = {y: 0 for y in counts.keys()}
writer = None

for batch in pf.iter_batches(batch_size=BATCH_SIZE):  # lee todas las columnas
    df_chunk = batch.to_pandas()

    # seleccionar filas manteniendo cupo por year
    parts = []
    for y, g in df_chunk.groupby(YEAR_COL, sort=False):
        y = int(y)
        remaining = min_per_year - kept.get(y, 0)
        if remaining <= 0:
            continue
        take = g.iloc[:remaining]  # determin√≠stico (primeras filas)
        kept[y] = kept.get(y, 0) + len(take)
        parts.append(take)

    if not parts:
        # si no hay nada que guardar de este batch, seguir
        continue

    out_df = pd.concat(parts, ignore_index=True)

    out_table = pa.Table.from_pandas(out_df, preserve_index=False)

    if writer is None:
        writer = pq.ParquetWriter(OUT_FILE, out_table.schema)
    writer.write_table(out_table)

    # early stop: si ya completaste todos los a√±os, corta
    if all(v >= min_per_year for v in kept.values()):
        break

if writer is not None:
    writer.close()

print("La generaci√≥n del archivo se ha terminado. Archivo creado: ", OUT_FILE)
print("Cantidades de las muestras seleccionadas: ", kept)

## **3. üßæ Leer el archivo de muestras**

---

### Convertir las muestras en un `DataFrame`

In [None]:
print ('Espere. Estoy leendo el archivo...')
df = pd.read_parquet(OUT_FILE)

print("La lectura ha terminado.")

### Cambiar los nombres de las columnas a min√∫sculas

In [None]:
df.columns = df.columns.str.lower()

## **4. üëÄ Inspecci√≥n visual de los datos**

---

### Visualizar las primeras 5 filas

In [None]:
df.head()

### ¬øCu√°ntas filas y columnas hay?

In [None]:
print(f"Cantidad de columnas y filas: {df.shape}")

### Qu√© tipo de datos contiene cada una: ¬øson n√∫meros, fechas, texto?

In [None]:
print(df.info())

### Que distribuci√≥n de data hay por cada a√±o

In [None]:
df["year"].value_counts(normalize=True)

### Estad√≠stica b√°sica: medias, medianas y desviaciones est√°ndar para entender la distribuci√≥n de los n√∫meros

In [None]:
df.describe()

## **5. üß© Creaci√≥n de nuevas variables (Feature Engineering)**

---



### Se crea la variable temporal `hour`, la variable objetivo `dalayed` si el retraso en salida esta igual o may√≥r de 15 minutos

In [None]:
# Crear variable temporal 'hour' a partir de la hora programada
df["hour"] = df.select_dtypes(include='number')["crsdeptime"] // 100  # solo la hora

# Variable objetivo 'delayed': 1 si el retraso en salida >= 15 min
df["delayed"] = (df.select_dtypes(include='number')["depdelay"] >= 15).astype(int)

# Probabilidad de delay en la muestra
delay_rate = df["delayed"].mean()
print(f"Tasa de retraso: {delay_rate:.4f}")

# Ver valores √∫nicos de la variable 'delayed'
unique_values = df["delayed"].unique()
print("Valores √∫nicos en 'delayed':", unique_values)

### Revisamos la probilidad de retraso en diferentes horas

In [None]:
# Agrupar por hora y calcular la probabilidad de retraso
hour_delay = (
    df.groupby("hour")["delayed"]
    .mean()
    .sort_index()
)

# Mostrar la tabla de probabilidades por hora
print(hour_delay)

hour_std = hour_delay.std()
print(f"\nLa probabilidad de retraso de los vuelos en promedio\na lo largo de las diferentes horas del d√≠a: {hour_std:.2%}")

### Crear bins de distancia

In [None]:
# Crear bins de distancia (5 quintiles)
df["distance_bin"] = pd.qcut(df.select_dtypes(include='number')["distance"], q=5)

# Calcular probabilidad de retraso por rango de distancia
distance_delay = (
    df.groupby("distance_bin", observed=True)["delayed"]
    .mean()
)

print(distance_delay)

### Crear columna con dia de la semana del vuelo

In [None]:
# Convertir a datetime (ya hecho en df, pero aseguramos si df_numeric no lo tiene)
# Asumimos que df_numeric y df tienen el mismo √≠ndice y n√∫mero de filas
# y que df['flightdate'] ya es datetime de un paso anterior
df["day_of_week"] = pd.to_datetime(df["flightdate"]).dt.dayofweek

print("Distribuci√≥n de vuelos puntuales y atrasados:")
df["delayed"].value_counts(normalize=True)

### Extraer c√≥digo de estado `state_clean` y de la `state_clean` ciudad a partir de `origincityname` para una fusi√≥n con las ubicaciones de los aeropuertos

In [None]:
# "Dallas, TX" -> "Dallas"
# Necesario para el geocoding

tmp = (
    df["origincityname"]
    .astype(str)
    .str.split(",", n=1, expand=True)
)

df["state_clean"] = tmp[0].str.strip()
df["state_clean"] = tmp[1].str.strip()  # lo que va despu√©s de la coma

df[["origincityname", "city_clean", "state_clean"]].head()

### Descargar y revisar datos con ubicaci√≥n de los aeropuertos

In [None]:
import pandas as pd

URL_AIRPORTS = "https://davidmegginson.github.io/ourairports-data/airports.csv"

# 1) Descargar desde la URL (queda en memoria como DataFrame)
df_airports = pd.read_csv(URL_AIRPORTS)  # pandas permite leer CSV directo desde URL [web:497]

df_airports.head()

In [None]:
df_airports.info()

## **6. üßπ Limpieza y preparaci√≥n de datos**

---

### Dejar solo los aeropuertos de aviacion civil con vuelos programados

In [None]:
# 3) (Opcional) Filtrar para aviaci√≥n civil "normal" (excluye heliports, seaplane)
allowed_types = {"small_airport", "medium_airport", "large_airport"}

df_airports = df_airports[
    df_airports["type"].isin(allowed_types) &
    df_airports["scheduled_service"].eq("yes")
].copy()

df_airports.head()

### Buscar la informaci√≥n de unos aeropuertos por nombres

In [None]:
pattern = r"Odessa|Sheremet|Murmansk|Domodedovo International Airport|Murmansk Airport"

airport = df_airports.loc[
    df_airports["name"].astype(str).str.contains(pattern, case=False, na=False, regex=True)
]

airport

### Revisar si la informaci√≥n de los aeropuertos tiene los valores faltantes

- Si iata_code esta nulo, lo reemplazamos por un valor no nulo extraido de una de las siguentes columnas `icao_code`, `gps_code` o `local_code`

In [None]:
df_airports["airport_code"] = (
    df_airports["iata_code"]
      .fillna(df_airports["icao_code"])
      .fillna(df_airports["gps_code"])
      .fillna(df_airports["local_code"])
)

df_airports["airport_code"].isna().mean()

### Muestra data con nulos

In [None]:
mask = df_airports["airport_code"].isna()

# inspecci√≥n r√°pida
df_airports.loc[mask, ["ident","type","name","iso_country","scheduled_service",
                          "iata_code","icao_code","gps_code","local_code"]].head(20)

### Extraer solo los aeropuertos con valores no nulos en el `airport_code`

In [None]:
df_airports = df_airports.loc[~mask].copy()

In [None]:
df_airports.info()

In [None]:
df_airports["airport_code"].isna().sum()

In [None]:
df_airports["airport_code"].sample(10)

### Filtrar los aeropuertos de los EEUU

In [None]:
# airports: DataFrame ya cargado desde airports.csv (OurAirports)
# airports = pd.read_csv(URL_AIRPORTS)

# 1) Filtrar solo filas donde iso_region empieza con "US-"
df_us = df_airports.loc[
    df_airports["iso_region"].astype(str).str.startswith("US-"),
].copy()  # str.startswith para filtrar prefijos [web:526]

# 2) Crear state_clean = parte despu√©s del guion (US-AL -> AL)
df_us["state_clean"] = (
    df_us["iso_region"]
      .astype(str)
      .str.split("-", n=1, expand=True)[1]
      .str.strip()
)  # split con expand para crear columnas [web:412][web:405]

# 3) city_clean = municipality (solo renombre/copia)
df_us["city_clean"] = df_us["municipality"].astype(str)
df_us["latitude"] = df_us["latitude_deg"]
df_us["longitude"] = df_us["longitude_deg"]

# 4) Seleccionar columnas finales
df_export = df_us[[
    "type", "name", "latitude", "longitude",
    "state_clean", "city_clean", "airport_code"
]].copy()


### Revisar la muestra de c√≥odigos de aeropuertos

In [None]:
df_export.head()

### Guardar aeropuertos limpiados en un archivo CSV

In [None]:
# 5) Exportar a CSV
df_export.to_csv("airports_us_clean.csv", index=False)

## **7. üõ¨ Cargar los aeropuertos depurados en un DataFrame y validarlos**

---

### Leer el archivo de aeropuertos

In [None]:
df_airports = pd.read_csv("airports_us_clean.csv")
df_airports.head()

### Unir el dataset de vuelos con las ubicaciones de los aeropuertos

In [None]:
#merge coordenadas y dataset

df = df.merge(df_airports, on=["city_clean", "state_clean"], how="left")
df.head()

In [None]:
df.shape

In [None]:
df.info()

### Eliminar valores nulos en los siguientes campos: `type`, `name`, `latitude`, `longitude`

In [None]:
cols_req = ["type", "name", "latitude", "longitude"]

print("Antes:", df.shape)
df_base = df.dropna(subset=cols_req, how="any").reset_index(drop=True)
print("Despu√©s:", df_base.shape)

### Tomamar una fracci√≥n del dataset por cada a√±o, porque consultar la API del clima es costoso en tiempo de ejecuci√≥n

In [None]:
# Subsample MVP
N = 20_000
frac = N / len(df)

df_base = (
    df.groupby("year", group_keys=False)
      .sample(frac=frac, random_state=42)
)

df_base.head()


### Guardamos esa fracci√≥n en un archivo CSV

In [None]:
# 5) Exportar a CSV
df_base.to_csv("df_base_merged_sample_and_airports.csv", index=False)

## **8. üå¶Ô∏è Consultar la API externa de clima hist√≥rico y generar un dataset de entrenamiento enriquecido**

---

### Enriquecer el dataset de entrenamiento con informaci√≥n clim√°tica

Guardamos los resultados en bloques de 500 filas dentro del archivo `openmeteo_daily_cache.parquet`.

Como este archivo queda en el almacenamiento temporal del notebook, es recomendable descargarlo peri√≥dicamente para evitar p√©rdidas si la ejecuci√≥n se interrumpe y as√≠ poder retomar el proceso sin repetir consultas a la API del clima.

Para continuar, vuelve a subir `openmeteo_daily_cache.parquet` al entorno temporal del notebook y ejecuta nuevamente las celdas siguientes, de modo que el proceso siga escribiendo sobre la cach√© existente.

La constante `HARD_CAP = 5_000` indica el n√∫mero m√°ximo total de consultas permitidas a la API del clima; as√≠ evitas ejecuciones muy largas o consumir demasiada cuota.

Asigna el valor seg√∫n tus necesidades (por ejemplo, `HARD_CAP = 5_000` o `HARD_CAP = 15_000`) y ajusta en base al tiempo/costo que est√°s dispuesto a asumir; una buena pr√°ctica es partir conservador y luego aumentar si hace falta.

In [None]:
# c√≥digo completo incorporando selecci√≥n balanceada por a√±o para keys_missing,
# de modo que cuando pidas max_api_calls=15000 no se vaya todo al primer a√±o (p. ej. 2018),
# sino que tome una cantidad similar por a√±o (y si un a√±o tiene pocas claves faltantes, toma todas y redistribuye el resto).
#  La parte clave es usar groupby(...).apply(lambda g: g.sample(...)) para muestrear por grupo/a√±o.

BASE_URL = "https://archive-api.open-meteo.com/v1/archive"
HARD_CAP = 20_000          # l√≠mite duro (tu tope global) ‚Äî respeta tu configuraci√≥n
DEFAULT_TIMEOUT = 15
MAX_RETRIES = 3
BACKOFF_BASE = 0.75        # segundos
WRITE_BATCH_SIZE = 500     # cada cu√°ntas respuestas persistimos


def _safe_sleep(last_call_ts, min_interval_s):
    now = time.time()
    wait = (last_call_ts + min_interval_s) - now
    if wait > 0:
        time.sleep(wait)
    return time.time()


def _retry_get(session, url, params, timeout=DEFAULT_TIMEOUT, max_retries=MAX_RETRIES):
    for attempt in range(1, max_retries + 1):
        try:
            r = session.get(url, params=params, timeout=timeout)
            r.raise_for_status()
            return r
        except requests.RequestException:
            if attempt == max_retries:
                raise
            back = BACKOFF_BASE * (2 ** (attempt - 1))
            time.sleep(back)
    raise RuntimeError("Exhausted retries unexpectedly")


def fetch_daily_weather_for_point_date(lat, lon, date_str, timezone="UTC", session=None, timeout=DEFAULT_TIMEOUT):
    params = {
        "latitude": float(lat),
        "longitude": float(lon),
        "start_date": date_str,
        "end_date": date_str,
        "daily": "temperature_2m_max,temperature_2m_min,precipitation_sum,windspeed_10m_max",
        "timezone": timezone,
    }
    s = session or requests.Session()
    r = _retry_get(s, BASE_URL, params=params, timeout=timeout, max_retries=MAX_RETRIES)
    data = r.json()

    d = data.get("daily", {})
    row = {
        "flightdate": date_str,
        "temp_max": d.get("temperature_2m_max", [None])[0],
        "temp_min": d.get("temperature_2m_min", [None])[0],
        "precipitation_sum": d.get("precipitation_sum", [None])[0],
        "wind_speed_max": d.get("windspeed_10m_max", [None])[0],
    }
    if row["temp_max"] is not None and row["temp_min"] is not None:
        row["temp_mean"] = (float(row["temp_max"]) + float(row["temp_min"])) / 2.0
    else:
        row["temp_mean"] = None
    return row


def _ensure_dir(path):
    d = os.path.dirname(os.path.abspath(path))
    if d and not os.path.exists(d):
        os.makedirs(d, exist_ok=True)


def _lock_file(path, mode="a+b"):
    f = open(path, mode)
    try:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
    except Exception:
        f.close()
        raise
    return f


def _write_parquet_atomic(df: pd.DataFrame, path: str, tmp_suffix=".tmp"):
    tmp_path = f"{path}{tmp_suffix}"
    df.to_parquet(tmp_path, index=False)
    os.replace(tmp_path, path)


def _load_cache(cache_path: str) -> pd.DataFrame:
    cols = ["_lat_r","_lon_r","flightdate","temp_max","temp_min","temp_mean","precipitation_sum","wind_speed_max"]
    if os.path.exists(cache_path):
        try:
            df_cache = pd.read_parquet(cache_path)
            for c in cols:
                if c not in df_cache.columns:
                    df_cache[c] = pd.Series(dtype="float64" if c.startswith("temp") or c in ["precipitation_sum","wind_speed_max"] else "object")
            df_cache = df_cache.drop_duplicates(subset=["_lat_r", "_lon_r", "flightdate"], keep="last")
            return df_cache
        except Exception:
            corrupt = cache_path + ".corrupt"
            os.replace(cache_path, corrupt)
            print(f"[cache] Archivo corrupto movido a: {corrupt}. Se reinicia cache.")
    return pd.DataFrame(columns=cols)


def _balance_keys_by_year(keys_missing: pd.DataFrame, max_api_calls: int, random_state: int = 42, exclude_years=None) -> pd.DataFrame:
    """
    Devuelve un subconjunto de keys_missing, muestreado de forma balanceada por 'year'
    derivado desde flightdate. Usa groupby+sample. [web:696][web:712]
    """
    exclude_years = set(exclude_years or [])
    km = keys_missing.copy()
    km["year"] = pd.to_datetime(km["flightdate"], errors="coerce").dt.year

    if exclude_years:
        km = km[~km["year"].isin(exclude_years)].copy()

    # Si no hay year parseable, fallback a sample global
    years = sorted(km["year"].dropna().unique())
    if len(years) == 0:
        out = km.sample(n=min(len(km), max_api_calls), random_state=random_state)
        return out.drop(columns=["year"], errors="ignore")

    # Target por a√±o
    per_year = math.ceil(max_api_calls / len(years))

    # Sample por grupo/a√±o (sin reemplazo; si hay menos, toma todo)
    sampled = (
        km.sample(frac=1, random_state=random_state)
          .groupby("year", group_keys=False)
          .head(per_year)
    )

    # Si sobraron por ceil, recorta a max_api_calls; si faltaron, rellena con el resto
    if len(sampled) > max_api_calls:
        sampled = sampled.sample(n=max_api_calls, random_state=random_state)

    elif len(sampled) < max_api_calls:
        picked_keys = (
            sampled["_lat_r"].astype(str) + "|" +
            sampled["_lon_r"].astype(str) + "|" +
            sampled["flightdate"].astype(str)
        )
        picked_set = set(picked_keys.values)

        all_keys = (
            km["_lat_r"].astype(str) + "|" +
            km["_lon_r"].astype(str) + "|" +
            km["flightdate"].astype(str)
        )
        rest = km.loc[~all_keys.isin(picked_set)]
        need = max_api_calls - len(sampled)
        if len(rest) > 0 and need > 0:
            sampled = pd.concat(
                [sampled, rest.sample(n=min(len(rest), need), random_state=random_state)],
                ignore_index=True
            )

    return sampled.drop(columns=["year"], errors="ignore")


def enrich_df_base_with_weather_daily_cached(
    df_base: pd.DataFrame,
    timezone="UTC",
    max_requests_per_second=2.0,
    max_api_calls=20000,
    round_coords_decimals=3,
    cache_path="/content/drive/MyDrive/openmeteo_daily_cache.parquet",
    show_progress=True,
    random_state=42,
    exclude_years=None,        # ej: [2018] si quieres saltarte 2018
) -> pd.DataFrame:

    max_api_calls = min(int(max_api_calls), HARD_CAP)
    _ensure_dir(cache_path)

    # Normaliza fechas y claves
    df = df_base.copy()
    df["flightdate"] = pd.to_datetime(df["flightdate"]).dt.strftime("%Y-%m-%d")
    df["_lat_r"] = df["latitude"].round(round_coords_decimals)
    df["_lon_r"] = df["longitude"].round(round_coords_decimals)

    keys = (
        df.loc[df["_lat_r"].notna() & df["_lon_r"].notna(), ["_lat_r", "_lon_r", "flightdate"]]
          .drop_duplicates()
          .reset_index(drop=True)
    )

    # Carga cache
    df_cache = _load_cache(cache_path)

    # Anti-join vectorizado
    if not df_cache.empty:
        cache_key = (
            df_cache["_lat_r"].astype(str) + "|" +
            df_cache["_lon_r"].astype(str) + "|" +
            df_cache["flightdate"].astype(str)
        )
        cached_set = set(cache_key.values)
    else:
        cached_set = set()

    keys_key = keys["_lat_r"].astype(str) + "|" + keys["_lon_r"].astype(str) + "|" + keys["flightdate"].astype(str)
    missing_mask = ~keys_key.isin(cached_set)
    keys_missing = keys.loc[missing_mask].reset_index(drop=True)

    total_missing = len(keys_missing)
    if total_missing == 0:
        print(f"[ok] No hay claves faltantes. Cache: {len(df_cache)} filas. Archivo: {os.path.abspath(cache_path)}")

    # IMPORTANTE: balancea por a√±o ANTES de aplicar el cap final
    if total_missing > 0:
        keys_missing = _balance_keys_by_year(
            keys_missing=keys_missing,
            max_api_calls=max_api_calls,
            random_state=random_state,
            exclude_years=exclude_years
        )

    to_fetch = len(keys_missing)
    print(f"[plan] √önicas totales: {len(keys)} | En cache: {len(cached_set)} | A consultar: {to_fetch} (cap m√°x: {HARD_CAP})")

    if to_fetch == 0:
        df_enriched = df.merge(
            df_cache.drop_duplicates(subset=["_lat_r","_lon_r","flightdate"], keep="last"),
            how="left",
            on=["_lat_r","_lon_r","flightdate"]
        )
        df_enriched.drop(columns=["_lat_r", "_lon_r"], inplace=True)
        return df_enriched

    # Rate limit
    min_interval_s = 1.0 / max_requests_per_second if max_requests_per_second > 0 else 0.0
    last_call_ts = 0.0
    session = requests.Session()

    # Barra de progreso opcional
    try:
        from tqdm import tqdm
        bar = tqdm(total=to_fetch, unit="req", disable=not show_progress)
    except Exception:
        bar = None
        print("[info] tqdm no disponible. Continuando sin barra de progreso.")

    new_rows = []
    written_since_last = 0
    processed = 0
    start_ts = time.time()

    def persist_incremental(df_cache_local, new_rows_batch):
        if not new_rows_batch:
            return df_cache_local
        #df_new_local = pd.DataFrame(new_rows_batch)
        #df_cache_local = pd.concat([df_cache_local, df_new_local], ignore_index=True)
        df_new_local = pd.DataFrame(new_rows_batch)
        if df_new_local.empty:
            return df_cache_local

        df_cache_local = pd.concat([df_cache_local, df_new_local], ignore_index=True)
        df_cache_local = df_cache_local.drop_duplicates(subset=["_lat_r","_lon_r","flightdate"], keep="last")
        with _lock_file(cache_path, mode="a+b"):
            _write_parquet_atomic(df_cache_local, cache_path)
        return df_cache_local

    for _, k in keys_missing.iterrows():
        if processed >= HARD_CAP:
            break

        last_call_ts = _safe_sleep(last_call_ts, min_interval_s)

        try:
            row = fetch_daily_weather_for_point_date(
                lat=k["_lat_r"], lon=k["_lon_r"], date_str=k["flightdate"],
                timezone=timezone, session=session
            )
            row["_lat_r"] = k["_lat_r"]
            row["_lon_r"] = k["_lon_r"]
        except requests.RequestException:
            row = {
                "_lat_r": k["_lat_r"],
                "_lon_r": k["_lon_r"],
                "flightdate": k["flightdate"],
                "temp_max": None,
                "temp_min": None,
                "temp_mean": None,
                "precipitation_sum": None,
                "wind_speed_max": None,
            }

        new_rows.append(row)
        processed += 1
        written_since_last += 1

        if bar:
            bar.update(1)
            elapsed = max(time.time() - start_ts, 1e-6)
            rate = processed / elapsed
            remaining = to_fetch - processed
            eta = remaining / rate if rate > 0 else float("inf")
            bar.set_postfix({"rate": f"{rate:.2f}/s", "ETA": f"{eta/60:.1f}m"})

        if written_since_last >= WRITE_BATCH_SIZE:
            df_cache = persist_incremental(df_cache, new_rows)
            print(f"[persist] Escrito batch de {written_since_last}. Total cache: {len(df_cache)}")
            new_rows.clear()
            written_since_last = 0

    if new_rows:
        df_cache = persist_incremental(df_cache, new_rows)
        print(f"[persist] Escrito batch final de {len(new_rows)}. Total cache: {len(df_cache)}")
        new_rows.clear()

    if bar:
        bar.close()

    # Merge final
    df_enriched = df.merge(
        df_cache.drop_duplicates(subset=["_lat_r","_lon_r","flightdate"], keep="last"),
        how="left",
        on=["_lat_r","_lon_r","flightdate"]
    )
    df_enriched.drop(columns=["_lat_r", "_lon_r"], inplace=True)

    print(f"[done] Enriquecidas {len(df_enriched)} filas. Cache en: {os.path.abspath(cache_path)}")
    return df_enriched

### Ejecutar la funci√≥n que consulta la API y actualiza el archivo de cach√©

In [None]:
df_base = enrich_df_base_with_weather_daily_cached(
    df_base,
    timezone="auto",
    max_requests_per_second=2.0,
    max_api_calls=15000,
    round_coords_decimals=3,
    cache_path="openmeteo_daily_cache.parquet",
    #exclude_years=[2018, 2019], # opcional. Si ya tienes info de 2018 y 2019 y quieres priorizar otros a√±os
    random_state=42
)

### Exportar el `DataFrame` a CSV: `dataset_con_meteo_out.csv`

In [None]:
df_base.to_csv("dataset_con_meteo_out.csv", index=False, encoding="utf-8")

### Cargar el CSV en un nuevo `DataFrame`

In [None]:
df = pd.read_csv("dataset_con_meteo_out.csv")

### Inspeccionar `DataFrame`


In [None]:
df.info()

In [None]:
df.head()

In [None]:
df.shape

### Renombrar las nuevas columnas agregadas previamente

In [None]:
df = df.rename(columns={
    "precipitation_sum": "precipitation",
    "wind_speed_max": "wind_speed",
})

### Revisar cu√°ntos valores nulos tienen estas columnas

In [None]:
df[["temp_mean", "precipitation", "wind_speed"]].isna().sum()

### Revisar cu√°ntos valores nulos tiene el dataset, agrupado por a√±o

In [None]:
# null por a√±os
cols = ["temp_mean", "precipitation", "wind_speed"]

mask_any_na = df[cols].isna().any(axis=1)
na_rows_by_year = df.loc[mask_any_na].groupby("year").size().sort_index()

na_rows_by_year

### Revisar cu√°ntos valores no nulos tenemos por a√±o

In [None]:
# No-null por a√±o (para columnas espec√≠ficas)

cols = ["temp_mean", "precipitation", "wind_speed"]

not_null_by_year = df.groupby("year")[cols].count().sort_index()
not_null_by_year

### Revisar una muestra de los datos con valores nulos

In [None]:
df.loc[mask_any_na, ["year", "flightdate"] + cols].sample(20)

### Eliminar todos los registros con nulos en las columnas nuevas y conservar solo los que tienen informaci√≥n meteorol√≥gica; revisar el resultado

In [None]:
# Eliminamos filas sin informaci√≥n clim√°tica

df_ml = df.dropna(
    subset=["temp_mean", "precipitation", "wind_speed"]
)

df_ml.shape

In [None]:
df_ml.info()

### Eliminar las columnas sobrantes y revisar el resultado

In [None]:
df_ml = df_ml.drop(columns=["temp_min", "temp_max"])

In [None]:
df_ml.info()

### Revisar y eliminar registros duplicados

In [None]:
print("Buscando duplicados...")
num_duplicates = df_ml.duplicated().sum()
print(f"N√∫mero de filas duplicadas: {num_duplicates}")

In [None]:
print("Registros antes de eliminar duplicados:", len(df_ml))
df_ml = df_ml.drop_duplicates(keep="first").reset_index(drop=True)
print("Registros despu√©s de eliminar duplicados:", len(df_ml))

### Asegurar que solo queden aeropuertos de aviaci√≥n civil con vuelos programados

In [None]:
allowed_types = {"small_airport", "medium_airport", "large_airport"}
df_ml = df_ml[df_ml["type"].isin(allowed_types)].copy()

In [None]:
df_ml.loc[df_ml["type"].isin(allowed_types), "type"].unique()

In [None]:
df_ml.sample(20)


In [None]:
df_ml.info()

## **9. ‚¨áÔ∏è Exportar el dataset final a CSV y forzar su descarga (solo en Colab)**

---

In [None]:
OUTPUT_CSV = "dataset_vuelos_clima_final.csv"
df_ml.to_csv(OUTPUT_CSV, index=False)

OUTPUT_CSV

# Si tienes recursos suficientes y est√°s ejecutando este notebook en Google Colab,
# puedes descomentar el bloque de abajo para forzar la descarga del archivo generado.
"""
from google.colab import files
files.download(OUTPUT_CSV)
"""