# Weather API Data Export
Configure a location and date span, then export per-day CSV files for each weather API client at the highest available granularity.

Install dependencies if needed (e.g. `pip install pandas requests`).

In [None]:
import datetime as dt
import json
from pathlib import Path
import sys

PROJECT_ROOT = Path('..').resolve()
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

import pandas as pd

from clients.tomorrow_io_client import TomorrowIOClient, ApiError as TomorrowApiError
from clients.open_meteo_client import OpenMeteoClient
from clients.visual_crossing_client import VisualCrossingClient
from clients.noaa_access_client import NoaaIsdClient, NoaaLcdClient
from clients.meteostat_client import MeteostatClient

from clients.noaa_access_client import NoaaIsdClient, NoaaLcdClient, ApiError as NoaaAccessApiError
from clients.weatherapi_com_client import WeatherApiClient, ApiError as WeatherApiError

from clients.openweather_client import OpenWeatherClient, ApiError as OpenWeatherApiError

from clients.weatherbit_client import WeatherbitClient, ApiError as WeatherbitApiError

from clients.copernicus_cds_client import CopernicusCdsClient

In [None]:
# Location and time span configuration
CONFIG_PATH = Path('../weather_config.json')
# Decode explicitly as UTF-8 so parsing does not depend on the platform locale.
CONFIG = json.loads(CONFIG_PATH.read_text(encoding='utf-8'))
# Mapping of location key -> settings dict. ``or {}`` also covers an explicit
# ``"locations": null`` entry, which would otherwise fail on ``.items()``.
LOCATIONS = CONFIG.get('locations') or {}
# Validated (key, lat, lon) tuples, in config order.
LOCATION_ITEMS = []
for key, coords in LOCATIONS.items():
    try:
        lat = float(coords['lat'])
        lon = float(coords['lon'])
    except (KeyError, TypeError, ValueError) as exc:
        raise ValueError(f"Invalid coordinates for location '{key}'. Provide numeric 'lat' and 'lon'.") from exc
    LOCATION_ITEMS.append((key, lat, lon))
if not LOCATION_ITEMS:
    raise ValueError("Define at least one location under 'locations' in weather_config.json.")
# Inclusive export span.
START_DATE = dt.date(2000, 1, 1)
END_DATE = dt.date(2025, 11, 5)
# Enable/disable providers
USE_TOMORROW_IO = True
USE_OPEN_METEO = True
USE_VISUAL_CROSSING = True
USE_NOAA_ISD = True
USE_NOAA_LCD = False
USE_OPENWEATHER = True
USE_WEATHERBIT = True
USE_WEATHERAPI_COM = True
# The Meteostat, NASA POWER and IEM ASOS cells test these flags; without a
# definition here those cells stop with a NameError before doing anything.
USE_METEOSTAT = True
# NOTE(review): NasaPowerClient and IemAsosClient are not imported in the
# import cell, so these two stay off until the matching imports are added.
USE_NASA_POWER = False
USE_IEM_ASOS = False
USE_COPERNICUS_ERA5_SINGLE = True
USE_COPERNICUS_ERA5_LAND = False
USE_COPERNICUS_ERA5_PRESSURE = False
USE_COPERNICUS_ERA5_LAND_TS = False
# Root folder for all exports; every provider writes into its own subfolder.
DATA_ROOT = Path('../data')
DATA_ROOT.mkdir(parents=True, exist_ok=True)
UTC = dt.timezone.utc
def iter_locations():
    """Yield every entry of ``LOCATION_ITEMS`` in its stored order."""
    yield from LOCATION_ITEMS
def iter_days(start: dt.date, end: dt.date):
    """Yield each day from ``start`` through ``end``, both ends included.

    Nothing is yielded when ``end`` is earlier than ``start``.
    """
    # The number of steps is computed up front instead of advancing a cursor
    # one day past ``end``; that extra step overflowed when ``end`` was
    # ``dt.date.max``.
    span_days = (end - start).days
    for offset in range(span_days + 1):
        yield start + dt.timedelta(days=offset)
# Every day of the configured span, materialised once and reused by all export cells.
DAY_RANGE = [*iter_days(START_DATE, END_DATE)]
# Cell preview: the locations plus the first and last three days of the span
# (slicing an empty list already yields an empty list).
(LOCATION_ITEMS, DAY_RANGE[:3], DAY_RANGE[-3:])


## Tomorrow.io export

In [None]:
if not USE_TOMORROW_IO:
    print('USE_TOMORROW_IO: disabled, skipping export.')
else:
    try:
        tomorrow_dir = DATA_ROOT / 'tomorrow_io'
        tomorrow_dir.mkdir(parents=True, exist_ok=True)

        TOMORROW_FIELDS = [
          'temperature', 'temperatureApparent', 'humidity', 'dewPoint', 'pressureSurfaceLevel',
          'pressureMeanSeaLevel', 'visibility', 'cloudCover', 'uvIndex', 'windSpeed', 'windGust',
          'windDirection', 'precipitationIntensity', 'precipitationProbability', 'precipitationType',
          'rainIntensity', 'snowIntensity', 'iceAccumulation', 'solarGHI', 'weatherCode'
        ]
        TOMORROW_TIMESTEPS = ['5m', '1h']

        tomorrow_client = TomorrowIOClient(config_path='../weather_config.json')
        today = dt.date.today()

        def _tomorrow_payload_to_df(payload: dict, day: dt.date) -> pd.DataFrame:
            rows = []
            for timeline in payload.get('data', {}).get('timelines', []):
                timestep = timeline.get('timestep')
                for interval in timeline.get('intervals', []):
                    values = interval.get('values', {})
                    row = {'timestamp': interval.get('startTime'), 'timestep': timestep}
                    row.update(values)
                    rows.append(row)
            if not rows:
                return pd.DataFrame()
            df = pd.DataFrame(rows)
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            return df.sort_values('timestamp')

        def _write_tomorrow_results(directory: Path, prefix: str, days, payloads):
            """Write one ``<day>.csv`` into ``directory`` per (day, payload) pair.

            A payload that is an ``Exception`` instance is reported as a failed
            request. A day whose converted frame is empty is reported and no
            file is written. Any other error is printed and the next day is
            processed.
            """
            for day, payload in zip(days, payloads):
                if isinstance(payload, Exception):
                    print(f'{prefix}: request failed for {day}: {payload}')
                    continue
                try:
                    frame = _tomorrow_payload_to_df(payload, day)
                    if not frame.empty:
                        output_path = directory / f'{day.isoformat()}.csv'
                        frame.to_csv(output_path, index=False)
                        print(f'{prefix}: wrote {output_path}')
                    else:
                        print(f'{prefix}: no data for {day}')
                except Exception as exc:
                    print(f"{prefix}: unexpected error for {day}: {exc}")

        for location_key, lat, lon in iter_locations():
            prefix = f'Tomorrow.io[{location_key}]'
            location_dir = tomorrow_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            # Days before today go to the historical batch call; today and
            # later go to the forecast batch call. Each days list stays
            # index-aligned with its requests list.
            historical_days, historical_requests = [], []
            forecast_days, forecast_requests = [], []
            one_day = dt.timedelta(days=1)
            oldest_allowed = today - one_day

            for day in DAY_RANGE:
                if day < oldest_allowed:
                    print(f'{prefix}: skipping {day} (plan permits only last 24 hours).')
                    continue

                output_path = location_dir / f'{day.isoformat()}.csv'
                # Today's file is always refreshed; other existing files are kept.
                if day != today and output_path.exists():
                    print(f'{prefix}: skipping {day} (already exported).')
                    continue

                window_start = dt.datetime.combine(day, dt.time(0, 0), tzinfo=UTC)
                request = {
                    'location': (lat, lon),
                    'start_time': window_start,
                    'end_time': window_start + one_day,
                    'fields': TOMORROW_FIELDS,
                    'timesteps': TOMORROW_TIMESTEPS,
                    'timezone': 'UTC',
                }

                is_past = day < today
                (historical_days if is_past else forecast_days).append(day)
                (historical_requests if is_past else forecast_requests).append(request)

            historical_payloads = tomorrow_client.get_historical_batch(historical_requests)
            forecast_payloads = tomorrow_client.get_forecast_batch(forecast_requests)

            _write_tomorrow_results(location_dir, prefix, historical_days, historical_payloads)
            _write_tomorrow_results(location_dir, prefix, forecast_days, forecast_payloads)
    except Exception as exc:
        print(f"Tomorrow.io: unexpected error: {exc}")


## Open-Meteo export

In [None]:
if not USE_OPEN_METEO:
    print('USE_OPEN_METEO: disabled, skipping export.')
else:
    try:
        open_meteo_dir = DATA_ROOT / 'open_meteo'
        open_meteo_dir.mkdir(parents=True, exist_ok=True)

        OPEN_METEO_HOURLY = [
          'temperature_2m', 'relative_humidity_2m', 'dew_point_2m', 'apparent_temperature',
          'pressure_msl', 'surface_pressure', 'cloud_cover', 'cloud_cover_low', 'cloud_cover_mid',
          'cloud_cover_high', 'wind_speed_10m', 'wind_direction_10m', 'wind_gusts_10m',
          'shortwave_radiation', 'direct_radiation', 'diffuse_radiation', 'global_tilted_irradiance',
          'sunshine_duration', 'precipitation', 'rain', 'snowfall', 'weather_code',
          'soil_temperature_0cm', 'soil_moisture_0_1cm'
        ]

        open_meteo_client = OpenMeteoClient(config_path='../weather_config.json')
        today = dt.date.today()

        def _open_meteo_payload_to_df(payload: dict, day: dt.date) -> pd.DataFrame:
            hourly = payload.get('hourly', {})
            if not hourly:
                return pd.DataFrame()
            df = pd.DataFrame(hourly)
            if 'time' not in df:
                return pd.DataFrame()
            df.rename(columns={'time': 'timestamp'}, inplace=True)
            df['timestamp'] = pd.to_datetime(df['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            return df.sort_values('timestamp')

        def _write_open_meteo_results(directory: Path, prefix: str, days, payloads):
            """Write one ``<day>.csv`` into ``directory`` per (day, payload) pair.

            A payload that is an ``Exception`` instance is reported as a failed
            request. A day whose converted frame is empty is reported and no
            file is written. Any other error is printed and the next day is
            processed.
            """
            for day, payload in zip(days, payloads):
                if isinstance(payload, Exception):
                    print(f'{prefix}: request failed for {day}: {payload}')
                    continue
                try:
                    frame = _open_meteo_payload_to_df(payload, day)
                    if not frame.empty:
                        output_path = directory / f'{day.isoformat()}.csv'
                        frame.to_csv(output_path, index=False)
                        print(f'{prefix}: wrote {output_path}')
                    else:
                        print(f'{prefix}: no data for {day}')
                except Exception as exc:
                    print(f"{prefix}: unexpected error for {day}: {exc}")

        for location_key, lat, lon in iter_locations():
            prefix = f'Open-Meteo[{location_key}]'
            location_dir = open_meteo_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            # Days before today go to the historical batch call; today and
            # later go to the forecast batch call. Each days list stays
            # index-aligned with its requests list.
            historical_days, historical_requests = [], []
            forecast_days, forecast_requests = [], []

            for day in DAY_RANGE:
                output_path = location_dir / f'{day.isoformat()}.csv'
                # Today's file is always refreshed; other existing files are kept.
                if day != today and output_path.exists():
                    print(f'{prefix}: skipping {day} (already exported).')
                    continue

                # One request per day: start and end date are the same day.
                request = {
                    'location': (lat, lon),
                    'start_date': day,
                    'end_date': day,
                    'hourly': OPEN_METEO_HOURLY,
                    'timezone': 'UTC',
                }
                is_past = day < today
                (historical_days if is_past else forecast_days).append(day)
                (historical_requests if is_past else forecast_requests).append(request)

            historical_payloads = open_meteo_client.get_historical_batch(historical_requests)
            forecast_payloads = open_meteo_client.get_forecast_batch(forecast_requests)

            _write_open_meteo_results(location_dir, prefix, historical_days, historical_payloads)
            _write_open_meteo_results(location_dir, prefix, forecast_days, forecast_payloads)
    except Exception as exc:
        print(f"Open-Meteo: unexpected error: {exc}")


## Visual Crossing export

In [None]:
if not USE_VISUAL_CROSSING:
    print('USE_VISUAL_CROSSING: disabled, skipping export.')
else:
    try:
        visual_crossing_dir = DATA_ROOT / 'visual_crossing'
        visual_crossing_dir.mkdir(parents=True, exist_ok=True)

        visual_crossing_client = VisualCrossingClient(config_path='../weather_config.json')
        today = dt.date.today()
        max_forecast_day = today + dt.timedelta(days=15)

        def _visual_crossing_payload_to_df(payload: dict, day: dt.date) -> pd.DataFrame:
            hours = []
            for daily in payload.get('days', []):
                day_date = daily.get('datetime')
                for entry in daily.get('hours', []):
                    row = dict(entry)
                    row['parent_day'] = day_date
                    epoch = entry.get('datetimeEpoch')
                    if epoch is not None:
                        row['timestamp'] = pd.to_datetime(epoch, unit='s', utc=True)
                    elif entry.get('datetime') is not None and day_date is not None:
                        row['timestamp'] = pd.to_datetime(f"{day_date}T{entry['datetime']}")
                    hours.append(row)
            if not hours:
                return pd.DataFrame()
            df = pd.DataFrame(hours)
            if 'timestamp' not in df:
                return pd.DataFrame()
            df['timestamp'] = pd.to_datetime(df['timestamp'], utc=True)
            df = df[df['timestamp'].dt.date == day]
            return df.sort_values('timestamp')

        def _write_visual_crossing_results(directory: Path, prefix: str, days, payloads):
            """Write one ``<day>.csv`` into ``directory`` per (day, payload) pair.

            A payload that is an ``Exception`` instance is reported as a failed
            request. A day whose converted frame is empty is reported and no
            file is written. Any other error is printed and the next day is
            processed.
            """
            for day, payload in zip(days, payloads):
                if isinstance(payload, Exception):
                    print(f'{prefix}: request failed for {day}: {payload}')
                    continue
                try:
                    frame = _visual_crossing_payload_to_df(payload, day)
                    if not frame.empty:
                        output_path = directory / f'{day.isoformat()}.csv'
                        frame.to_csv(output_path, index=False)
                        print(f'{prefix}: wrote {output_path}')
                    else:
                        print(f'{prefix}: no data for {day}')
                except Exception as exc:
                    print(f"{prefix}: unexpected error for {day}: {exc}")

        for location_key, lat, lon in iter_locations():
            prefix = f'Visual Crossing[{location_key}]'
            location_dir = visual_crossing_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            # Days before today go to the historical batch call; today and
            # later go to the forecast batch call. Each days list stays
            # index-aligned with its requests list.
            historical_days, historical_requests = [], []
            forecast_days, forecast_requests = [], []

            for day in DAY_RANGE:
                if day > max_forecast_day:
                    print(f'{prefix}: skipping {day} (beyond forecast horizon).')
                    continue

                output_path = location_dir / f'{day.isoformat()}.csv'
                # Today's file is always refreshed; other existing files are kept.
                if day != today and output_path.exists():
                    print(f'{prefix}: skipping {day} (already exported).')
                    continue

                request = {
                    'location': f'{lat},{lon}',
                    'start': day,
                    'end': day + dt.timedelta(days=1),
                    'include': ['hours'],
                    'unit_group': 'metric',
                }

                is_past = day < today
                (historical_days if is_past else forecast_days).append(day)
                (historical_requests if is_past else forecast_requests).append(request)

            historical_payloads = visual_crossing_client.get_historical_batch(historical_requests)
            forecast_payloads = visual_crossing_client.get_forecast_batch(forecast_requests)

            _write_visual_crossing_results(location_dir, prefix, historical_days, historical_payloads)
            _write_visual_crossing_results(location_dir, prefix, forecast_days, forecast_payloads)
    except Exception as exc:
        print(f"Visual Crossing: unexpected error: {exc}")


## NOAA ISD export


In [None]:

if not USE_NOAA_ISD:
    print('USE_NOAA_ISD: disabled, skipping export.')
else:
    try:
        noaa_isd_dir = DATA_ROOT / 'noaa_isd'
        noaa_isd_dir.mkdir(parents=True, exist_ok=True)

        noaa_isd_client = NoaaIsdClient(config_path='../weather_config.json')
        today = dt.date.today()

        def _parse_isd_temp(value):
            if not value or value in {'+9999,9', '-9999,9'}:
                return None
            token = value.strip()
            if not token:
                return None
            sign = -1 if token.startswith('-') else 1
            digits = ''.join(ch for ch in token if ch.isdigit())
            if not digits:
                return None
            try:
                numeric = int(digits[:4])
            except ValueError:
                return None
            return sign * numeric / 10.0

        def _noaa_isd_payload_to_df(payload: list[dict], day: dt.date) -> pd.DataFrame:
            if not payload:
                return pd.DataFrame()
            df = pd.DataFrame(payload)
            if 'DATE' not in df:
                return pd.DataFrame()
            df['timestamp'] = pd.to_datetime(df['DATE'], errors='coerce')
            df = df.dropna(subset=['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            if df.empty:
                return df
            df = df.sort_values('timestamp')
            if 'TMP' in df.columns:
                df['temperature_c'] = df['TMP'].apply(_parse_isd_temp)
            if 'DEW' in df.columns:
                df['dewpoint_c'] = df['DEW'].apply(_parse_isd_temp)
            return df

        for location_key, *_coords in iter_locations():
            # ISD is station based: each location must name its station.
            station_id = LOCATIONS.get(location_key, {}).get('noaaIsdStation')
            if not station_id:
                print(f"{location_key}: missing 'noaaIsdStation'; skipping.")
                continue

            for day in DAY_RANGE:
                # Days after today are never requested.
                if day > today:
                    continue
                output_path = noaa_isd_dir / location_key / f'{day.isoformat()}.csv'
                # Today's file is rewritten on every run; other existing files are kept.
                if day != today and output_path.exists():
                    continue
                output_path.parent.mkdir(parents=True, exist_ok=True)
                start = dt.datetime.combine(day, dt.time(0, 0), tzinfo=UTC)
                end = start + dt.timedelta(days=1)
                # The request is inside the per-day try so one failed call is
                # reported and skipped instead of aborting every remaining day
                # and location through the cell-level handler.
                try:
                    payload = noaa_isd_client.get_observations(
                        station_id=station_id,
                        start_time=start,
                        end_time=end,
                    )
                    df_day = _noaa_isd_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f"{location_key}: no ISD data for {day}")
                        continue
                    df_day.to_csv(output_path, index=False)
                    print(f"{location_key}: wrote {output_path}")
                except Exception as exc:  # noqa: BLE001
                    print(f"{location_key}: unexpected ISD error for {day}: {exc}")
    except Exception as exc:  # noqa: BLE001
        print(f"NOAA ISD: unexpected error: {exc}")


## NOAA LCD export


In [None]:

if not USE_NOAA_LCD:
    print('USE_NOAA_LCD: disabled, skipping export.')
else:
    try:
        noaa_lcd_dir = DATA_ROOT / 'noaa_lcd'
        noaa_lcd_dir.mkdir(parents=True, exist_ok=True)

        noaa_lcd_client = NoaaLcdClient(config_path='../weather_config.json')
        today = dt.date.today()

        def _fahrenheit_to_celsius(value):
            if value is None or (isinstance(value, str) and not value.strip()):
                return None
            try:
                numeric = float(value)
            except ValueError:
                return None
            return (numeric - 32.0) * 5.0 / 9.0

        def _noaa_lcd_payload_to_df(payload: list[dict], day: dt.date) -> pd.DataFrame:
            if not payload:
                return pd.DataFrame()
            df = pd.DataFrame(payload)
            if 'DATE' not in df:
                return pd.DataFrame()
            df['timestamp'] = pd.to_datetime(df['DATE'], errors='coerce')
            df = df.dropna(subset=['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            if df.empty:
                return df
            df = df.sort_values('timestamp')
            if 'HourlyDryBulbTemperature' in df.columns:
                df['dry_bulb_f'] = pd.to_numeric(df['HourlyDryBulbTemperature'], errors='coerce')
                df['dry_bulb_c'] = df['dry_bulb_f'].apply(_fahrenheit_to_celsius)
            if 'HourlyDewPointTemperature' in df.columns:
                df['dewpoint_f'] = pd.to_numeric(df['HourlyDewPointTemperature'], errors='coerce')
                df['dewpoint_c'] = df['dewpoint_f'].apply(_fahrenheit_to_celsius)
            return df

        for location_key, *_coords in iter_locations():
            # LCD is station based: each location must name its station.
            station_id = LOCATIONS.get(location_key, {}).get('noaaLcdStation')
            if not station_id:
                print(f"{location_key}: missing 'noaaLcdStation'; skipping.")
                continue

            for day in DAY_RANGE:
                # Days after today are never requested.
                if day > today:
                    continue
                output_path = noaa_lcd_dir / location_key / f'{day.isoformat()}.csv'
                # Today's file is rewritten on every run; other existing files are kept.
                if day != today and output_path.exists():
                    continue
                output_path.parent.mkdir(parents=True, exist_ok=True)
                start = dt.datetime.combine(day, dt.time(0, 0), tzinfo=UTC)
                end = start + dt.timedelta(days=1)
                # The request is inside the per-day try so one failed call is
                # reported and skipped instead of aborting every remaining day
                # and location through the cell-level handler.
                try:
                    payload = noaa_lcd_client.get_observations(
                        station_id=station_id,
                        start_time=start,
                        end_time=end,
                    )
                    df_day = _noaa_lcd_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f"{location_key}: no LCD data for {day}")
                        continue
                    df_day.to_csv(output_path, index=False)
                    print(f"{location_key}: wrote {output_path}")
                except Exception as exc:  # noqa: BLE001
                    print(f"{location_key}: unexpected LCD error for {day}: {exc}")
    except Exception as exc:  # noqa: BLE001
        print(f"NOAA LCD: unexpected error: {exc}")


## Meteostat export


In [None]:
if not USE_METEOSTAT:
    print('USE_METEOSTAT: disabled, skipping export.')
else:
    try:
        meteostat_dir = DATA_ROOT / 'meteostat'
        meteostat_dir.mkdir(parents=True, exist_ok=True)
        meteostat_client = MeteostatClient(config_path='../weather_config.json')
        def _meteostat_payload_to_df(payload, day):
            if not payload:
                return pd.DataFrame()
            df = pd.DataFrame(payload)
            if 'timestamp' not in df.columns:
                if 'time' in df.columns:
                    df['timestamp'] = pd.to_datetime(df['time'], errors='coerce')
                else:
                    return pd.DataFrame()
            else:
                df['timestamp'] = pd.to_datetime(df['timestamp'], errors='coerce')
            df = df.dropna(subset=['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            return df.sort_values('timestamp')
        # Resolve "today" once so every day in the run is compared against the
        # same date, even if the export crosses midnight.
        today = dt.date.today()
        for location_key, lat, lon in iter_locations():
            prefix = f'Meteostat[{location_key}]'
            location_dir = meteostat_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)
            for day in DAY_RANGE:
                output_path = location_dir / f'{day.isoformat()}.csv'
                # Today's file is rewritten on every run; other existing files are kept.
                if day != today and output_path.exists():
                    continue
                # NOTE(review): these datetimes are naive, unlike the UTC-aware
                # ones used for the NOAA cells — confirm what the client expects.
                start = dt.datetime.combine(day, dt.time(0, 0))
                end = start + dt.timedelta(days=1)
                # The request is inside the per-day try so one failed call is
                # reported and skipped instead of aborting every remaining day
                # and location through the cell-level handler.
                try:
                    payload = meteostat_client.get_hourly(
                        location=(lat, lon),
                        start_time=start,
                        end_time=end,
                    )
                    df_day = _meteostat_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f"{prefix}: no Meteostat data for {day}")
                        continue
                    df_day.to_csv(output_path, index=False)
                    print(f"{prefix}: wrote {output_path}")
                except Exception as exc:  # noqa: BLE001
                    print(f"{prefix}: unexpected Meteostat error for {day}: {exc}")
    except Exception as exc:  # noqa: BLE001
        print(f"Meteostat: unexpected error: {exc}")


## NASA POWER export


In [None]:
if not USE_NASA_POWER:
    print('USE_NASA_POWER: disabled, skipping export.')
else:
    try:
        nasa_power_dir = DATA_ROOT / 'nasa_power'
        nasa_power_dir.mkdir(parents=True, exist_ok=True)

        nasa_power_client = NasaPowerClient(config_path='../weather_config.json')

        def _nasa_power_payload_to_df(payload, day):
            props = payload.get('properties', {}) if isinstance(payload, dict) else {}
            parameters = props.get('parameter', {})
            if not parameters:
                return pd.DataFrame()
            rows = {}
            for param, series in parameters.items():
                if not isinstance(series, dict):
                    continue
                for stamp, value in series.items():
                    rows.setdefault(stamp, {})[param] = value
            records = []
            for stamp, values in rows.items():
                try:
                    timestamp = dt.datetime.strptime(str(stamp), '%Y%m%d%H').replace(tzinfo=dt.timezone.utc)
                except ValueError:
                    continue
                if timestamp.date() != day:
                    continue
                record = dict(values)
                record['timestamp'] = timestamp
                records.append(record)
            if not records:
                return pd.DataFrame()
            df = pd.DataFrame(records).sort_values('timestamp')
            rename_map = {
                'T2M': 'temperature_c',
                'T2MDEW': 'dewpoint_c',
                'RH2M': 'rel_humidity_pct',
                'WS10M': 'wind_speed_10m_m_s',
                'WD10M': 'wind_dir_10m_deg',
                'PRECTOTCORR': 'precip_mm_hr',
                'PS': 'surface_pressure_kpa',
            }
            df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns}, inplace=True)
            return df

        # Resolve "today" once so every day in the run is compared against the
        # same date, even if the export crosses midnight.
        today = dt.date.today()
        for location_key, lat, lon in iter_locations():
            prefix = f'NASA POWER[{location_key}]'
            location_dir = nasa_power_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            for day in DAY_RANGE:
                output_path = location_dir / f'{day.isoformat()}.csv'
                # Today's file is rewritten on every run; other existing files are kept.
                if day != today and output_path.exists():
                    continue
                # The request is inside the per-day try so one failed call is
                # reported and skipped instead of aborting every remaining day
                # and location through the cell-level handler.
                try:
                    payload = nasa_power_client.get_hourly(
                        location=(lat, lon),
                        start_time=day,
                        end_time=day,
                    )
                    df_day = _nasa_power_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f"{prefix}: no data for {day}")
                        continue
                    df_day.to_csv(output_path, index=False)
                    print(f"{prefix}: wrote {output_path}")
                except Exception as exc:  # noqa: BLE001
                    print(f"{prefix}: unexpected NASA POWER error for {day}: {exc}")
    except Exception as exc:  # noqa: BLE001
        print(f"NASA POWER: unexpected error: {exc}")


## IEM ASOS export


In [None]:
if not USE_IEM_ASOS:
    print('USE_IEM_ASOS: disabled, skipping export.')
else:
    try:
        iem_dir = DATA_ROOT / 'iem_asos'
        iem_dir.mkdir(parents=True, exist_ok=True)

        iem_client = IemAsosClient(config_path='../weather_config.json')

        def _iem_payload_to_df(payload, day):
            if payload is None or payload.empty:
                return pd.DataFrame()
            df = payload.copy()
            if 'timestamp' not in df.columns:
                valid_col = next((col for col in df.columns if col.lower().startswith('valid')), None)
                if valid_col:
                    df['timestamp'] = pd.to_datetime(df[valid_col], utc=True, errors='coerce')
            df = df.dropna(subset=['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            if df.empty:
                return df
            df = df.sort_values('timestamp')
            rename_map = {
                'tmpf': 'temperature_f',
                'dwpf': 'dewpoint_f',
                'sknt': 'wind_speed_knots',
                'drct': 'wind_dir_deg',
                'gust_sknt': 'wind_gust_knots',
                'precip': 'precip_in',
                'pres1': 'station_pressure_inhg',
            }
            df = df.rename(columns={k: v for k, v in rename_map.items() if k in df.columns})
            if 'temperature_f' in df.columns:
                df['temperature_c'] = (df['temperature_f'] - 32.0) * 5.0 / 9.0
            if 'dewpoint_f' in df.columns:
                df['dewpoint_c'] = (df['dewpoint_f'] - 32.0) * 5.0 / 9.0
            if 'wind_speed_knots' in df.columns:
                df['wind_speed_mps'] = df['wind_speed_knots'] * 0.514444
            if 'wind_gust_knots' in df.columns:
                df['wind_gust_mps'] = df['wind_gust_knots'] * 0.514444
            if 'precip_in' in df.columns:
                df['precip_mm'] = df['precip_in'] * 25.4
            return df

        # Resolve "today" once so every day in the run is compared against the
        # same date, even if the export crosses midnight.
        today = dt.date.today()
        for location_key, *_coords in iter_locations():
            # IEM is station based: each location must name a station and network.
            station = LOCATIONS.get(location_key, {}).get('iemStation')
            network = LOCATIONS.get(location_key, {}).get('iemNetwork')
            if not station or not network:
                print(f"{location_key}: missing IEM metadata; skipping.")
                continue

            for day in DAY_RANGE:
                output_path = iem_dir / location_key / f'{day.isoformat()}.csv'
                # Today's file is rewritten on every run; other existing files are kept.
                if day != today and output_path.exists():
                    continue
                output_path.parent.mkdir(parents=True, exist_ok=True)
                # NOTE(review): these datetimes are naive, unlike the UTC-aware
                # ones used for the NOAA cells — confirm what the client expects.
                start = dt.datetime.combine(day, dt.time(0, 0))
                end = start + dt.timedelta(days=1)
                # The request is inside the per-day try so one failed call is
                # reported and skipped instead of aborting every remaining day
                # and location through the cell-level handler.
                try:
                    payload = iem_client.get_observations(
                        station=station,
                        network=network,
                        start_time=start,
                        end_time=end,
                    )
                    df_day = _iem_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f"{station}: no IEM ASOS data for {day}")
                        continue
                    df_day.to_csv(output_path, index=False)
                    print(f"{station}: wrote {output_path}")
                except Exception as exc:
                    print(f"{station}: unexpected IEM error for {day}: {exc}")
    except Exception as exc:
        print(f"IEM ASOS: unexpected error: {exc}")


## Copernicus ERA5 exports
The exporter now treats each ERA5 dataset (single levels, ERA5-Land, ERA5 pressure levels, and ERA5-Land time-series) as a standalone provider. Enable any combination via the corresponding `USE_COPERNICUS_*` flags before running the cell below.

In [None]:
import datetime as dt

# One settings dict per ERA5 dataset. Each source row below is:
# (label, provider slug, location-config key holding the area or None,
#  enable flag, variant tag). The slug doubles as the output directory name.
COPERNICUS_EXPORTS = [
    {
        "label": label,
        "provider": slug,
        "dir_name": slug,
        "area_attr": area_attr,
        "enabled": enabled,
        "variant": variant,
    }
    for label, slug, area_attr, enabled, variant in (
        (
            "ERA5 single levels",
            "copernicus_era5_single",
            "copernicusEra5Area",
            USE_COPERNICUS_ERA5_SINGLE,
            "single",
        ),
        (
            "ERA5-Land",
            "copernicus_era5_land",
            "copernicusEra5LandArea",
            USE_COPERNICUS_ERA5_LAND,
            "land",
        ),
        (
            "ERA5 pressure levels",
            "copernicus_era5_pressure",
            "copernicusEra5Area",
            USE_COPERNICUS_ERA5_PRESSURE,
            "pressure",
        ),
        (
            "ERA5-Land time-series",
            "copernicus_era5_land_timeseries",
            None,
            USE_COPERNICUS_ERA5_LAND_TS,
            "land_ts",
        ),
    )
]


def _filter_day_df(payload, day):
    """Return the rows of *payload* whose ``timestamp`` falls on *day*.

    An empty ``DataFrame`` is returned when *payload* is ``None``, empty, or
    has no ``timestamp`` column.  Matching rows are sorted by ``timestamp``
    and re-indexed from zero; when nothing matches, the empty selection is
    returned as-is (its columns are kept).  *payload* itself is not modified.
    """
    unusable = payload is None or payload.empty or 'timestamp' not in payload.columns
    if unusable:
        return pd.DataFrame()

    frame = payload.copy()
    on_requested_day = frame['timestamp'].dt.date == day
    selected = frame[on_requested_day]
    if selected.empty:
        return selected
    ordered = selected.sort_values('timestamp')
    return ordered.reset_index(drop=True)


def _transform_payload(df, variant):
    """Return *df* with derived convenience columns for one ERA5 variant.

    Columns are only added when their source column is present:

    * ``single`` / ``land`` / ``land_ts``: ``temperature_c`` from ``t2m``
      (minus 273.15) and ``precip_mm`` from ``tp`` (times 1000).
    * ``single`` / ``land_ts``: ``wind_u10_mps`` / ``wind_v10_mps`` copied
      from ``u10`` / ``v10``.
    * ``pressure``: ``temperature_c`` from ``temperature`` (minus 273.15).

    The input frame is never modified: a non-empty frame is copied before the
    new columns are attached, and an empty frame is returned unchanged.
    """
    if df.empty:
        return df

    # Copy first so the caller's frame (possibly a filtered slice of a larger
    # one) does not silently gain columns.
    out = df.copy()
    columns = out.columns
    is_surface = variant in ('single', 'land', 'land_ts')
    has_wind = variant in ('single', 'land_ts')

    # NOTE(review): the offsets/scales assume Kelvin temperatures and
    # precipitation in metres, per ERA5 conventions -- confirm against the client.
    if is_surface and 't2m' in columns:
        out['temperature_c'] = out['t2m'] - 273.15
    if is_surface and 'tp' in columns:
        out['precip_mm'] = out['tp'] * 1000.0
    if has_wind and 'u10' in columns:
        out['wind_u10_mps'] = out['u10']
    if has_wind and 'v10' in columns:
        out['wind_v10_mps'] = out['v10']
    if variant == 'pressure' and 'temperature' in columns:
        out['temperature_c'] = out['temperature'] - 273.15
    return out

# Export one CSV per location and day for every enabled ERA5 dataset.
for cfg in COPERNICUS_EXPORTS:
    if not cfg['enabled']:
        print(f"{cfg['label']}: disabled, skipping export block.")
        continue

    # The banner's leading blank line must be the escape sequence "\n": a raw
    # line break inside a single-quoted f-string is a syntax error.
    print(f"\n=== {cfg['label']} ===")
    provider_dir = DATA_ROOT / cfg['dir_name']
    provider_dir.mkdir(parents=True, exist_ok=True)

    # A client that cannot be built only disables this dataset; the remaining
    # datasets are still attempted.
    try:
        client = CopernicusCdsClient(config_path='../weather_config.json', provider=cfg['provider'])
    except Exception as exc:  # noqa: BLE001
        print(f"{cfg['label']}: client init failed -> {exc}")
        continue

    for location_key, lat, lon in iter_locations():
        # Datasets that declare an `area_attr` read the request area from the
        # location's entry in the config; without it the location is skipped.
        extras = LOCATIONS.get(location_key, {})
        area_attr = cfg.get('area_attr')
        area = extras.get(area_attr) if area_attr else None
        if area_attr and not area:
            print(f"{cfg['label']} -> {location_key}: missing {area_attr}; skipping location.")
            continue

        for day in DAY_RANGE:
            output_path = provider_dir / location_key / f'{day.isoformat()}.csv'
            # Existing files are treated as a cache, except for today's file,
            # which is always re-fetched.
            if day != dt.date.today() and output_path.exists():
                continue
            output_path.parent.mkdir(parents=True, exist_ok=True)
            # Best-effort per day: a failure is reported and the loop moves on.
            try:
                payload = client.get_dataset(
                    area=area,
                    start_date=day,
                    end_date=day,
                    latitude=lat,
                    longitude=lon,
                )
                df_day = _transform_payload(_filter_day_df(payload, day), cfg['variant'])
                if df_day.empty:
                    print(f"{cfg['label']} -> {location_key}: no data for {day}")
                    continue
                df_day.to_csv(output_path, index=False)
                print(f"{cfg['label']} -> {location_key}: wrote {output_path}")
            except Exception as exc:  # noqa: BLE001
                print(f"{cfg['label']} -> {location_key}: error on {day}: {exc}")


## OpenWeather export


In [None]:
# Export hourly OpenWeather history, one CSV per location and UTC day.
if not USE_OPENWEATHER:
    print('USE_OPENWEATHER: disabled, skipping export.')
else:
    try:
        openweather_dir = DATA_ROOT / 'openweather'
        openweather_dir.mkdir(parents=True, exist_ok=True)

        openweather_client = OpenWeatherClient(config_path='../weather_config.json')
        now_utc = dt.datetime.now(dt.timezone.utc)
        # Request windows, row filtering and file names below are all keyed by
        # UTC day, so "today" must be the UTC date too.  Using the machine-local
        # date could mark a still-incomplete UTC day as already exported.
        today = now_utc.date()

        for location_key, lat, lon in iter_locations():
            prefix = f'OpenWeather[{location_key}]'
            location_dir = openweather_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            # Parallel lists: request_days[i] is the day served by request_payloads[i].
            request_days = []
            request_payloads = []

            for day in DAY_RANGE:
                if day > today:
                    print(f'{prefix}: skipping {day} (future dates unsupported).')
                    continue

                # Window is [UTC midnight, next UTC midnight), clipped to "now"
                # for the current day.
                start = dt.datetime.combine(day, dt.time(0, 0), tzinfo=UTC)
                end = min(start + dt.timedelta(days=1), now_utc)
                if end <= start:
                    print(f'{prefix}: skipping {day} (no elapsed time yet).')
                    continue

                # Existing files act as a cache; today's file is always refreshed.
                output_path = location_dir / f'{day.isoformat()}.csv'
                if day != today and output_path.exists():
                    print(f'{prefix}: skipping {day} (already exported).')
                    continue

                request_payloads.append({
                    'location': (lat, lon),
                    'start_time': start,
                    'end_time': end,
                    'interval_type': 'hour',
                    'units': 'metric',
                })
                request_days.append(day)

            payloads = openweather_client.get_historical_batch(request_payloads)

            for day, payload in zip(request_days, payloads):
                # A batch entry may be an exception object instead of a payload.
                if isinstance(payload, Exception):
                    print(f'{prefix}: request failed for {day}: {payload}')
                    continue
                try:
                    records = payload.get('list', [])
                    if not records:
                        print(f'{prefix}: no data for {day}')
                        continue

                    df = pd.json_normalize(records)
                    # Prefer the epoch-seconds field; fall back to a 'time' column.
                    if 'dt' in df.columns:
                        df['timestamp'] = pd.to_datetime(df['dt'], unit='s', utc=True)
                    elif 'time' in df.columns:
                        df['timestamp'] = pd.to_datetime(df['time'], utc=True)
                    else:
                        print(f'{prefix}: unable to determine timestamp for {day}')
                        continue

                    # Keep only rows that belong to the requested UTC day.
                    df = df[df['timestamp'].dt.date == day]
                    if df.empty:
                        print(f'{prefix}: no data for {day}')
                        continue

                    output_path = location_dir / f'{day.isoformat()}.csv'
                    df.to_csv(output_path, index=False)
                    print(f'{prefix}: wrote {output_path}')
                except Exception as exc:
                    print(f'{prefix}: unexpected error for {day}: {exc}')
    except Exception as exc:
        print(f'OpenWeather: unexpected error: {exc}')


## Weatherbit export


In [None]:
# Export Weatherbit data, one CSV per location and UTC day: past days come
# from the historical batch call, today and later from the forecast batch call.
if not USE_WEATHERBIT:
    print('USE_WEATHERBIT: disabled, skipping export.')
else:
    try:
        weatherbit_dir = DATA_ROOT / 'weatherbit'
        weatherbit_dir.mkdir(parents=True, exist_ok=True)

        weatherbit_client = WeatherbitClient(config_path='../weather_config.json')
        # Request windows and row filtering are keyed by UTC day, so the
        # historical/forecast split and the cache check must use the UTC date.
        # With the machine-local date, a machine ahead of UTC would fetch the
        # unfinished UTC day as "historical" and never refresh it.
        today = dt.datetime.now(dt.timezone.utc).date()

        def _weatherbit_payload_to_df(payload: dict, day: dt.date) -> pd.DataFrame:
            """Return the payload's ``data`` rows that fall on *day*, sorted by time.

            The timestamp is taken from ``timestamp_utc``, else ``ts`` (epoch
            seconds), else ``datetime`` with unparseable values dropped.  An
            empty frame is returned when the payload has no ``data`` rows.
            """
            data = payload.get('data', [])
            if not data:
                return pd.DataFrame()
            df = pd.DataFrame(data)
            if 'timestamp_utc' in df.columns:
                df['timestamp'] = pd.to_datetime(df['timestamp_utc'], utc=True)
            elif 'ts' in df.columns:
                df['timestamp'] = pd.to_datetime(df['ts'], unit='s', utc=True)
            else:
                # Missing column -> empty series -> every row becomes NaT and is dropped.
                df['timestamp'] = pd.to_datetime(df.get('datetime', pd.Series(dtype=str)), utc=True, errors='coerce')
            df = df.dropna(subset=['timestamp'])
            df = df[df['timestamp'].dt.date == day]
            return df.sort_values('timestamp')

        def _write_weatherbit_results(directory: Path, prefix: str, days, payloads):
            """Write one ``<day>.csv`` into *directory* per (day, payload) pair.

            Entries that are exceptions, yield no rows, or fail while being
            converted are reported with *prefix* and skipped.
            """
            for day, payload in zip(days, payloads):
                if isinstance(payload, Exception):
                    print(f'{prefix}: request failed for {day}: {payload}')
                    continue
                try:
                    df_day = _weatherbit_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f'{prefix}: no data for {day}')
                        continue
                    output_path = directory / f'{day.isoformat()}.csv'
                    df_day.to_csv(output_path, index=False)
                    print(f'{prefix}: wrote {output_path}')
                except Exception as exc:
                    print(f"{prefix}: unexpected error for {day}: {exc}")

        for location_key, lat, lon in iter_locations():
            prefix = f'Weatherbit[{location_key}]'
            location_dir = weatherbit_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            # Parallel lists: *_days[i] is the day served by *_requests[i].
            historical_days = []
            historical_requests = []
            forecast_days = []
            forecast_requests = []

            for day in DAY_RANGE:
                # Existing files act as a cache; today's file is always refreshed.
                output_path = location_dir / f'{day.isoformat()}.csv'
                if day != today and output_path.exists():
                    print(f'{prefix}: skipping {day} (already exported).')
                    continue

                start = dt.datetime.combine(day, dt.time(0, 0), tzinfo=UTC)
                end = start + dt.timedelta(days=1)
                if day < today:
                    historical_days.append(day)
                    historical_requests.append({
                        'location': (lat, lon),
                        'start_time': start,
                        'end_time': end,
                        'units': 'M',
                    })
                else:
                    # Forecast requests carry no date; the rows for *day* are
                    # selected afterwards when the payload is filtered.
                    forecast_days.append(day)
                    forecast_requests.append({
                        'location': (lat, lon),
                        'hours': 48,
                        'units': 'M',
                    })

            historical_payloads = weatherbit_client.get_historical_batch(historical_requests)
            forecast_payloads = weatherbit_client.get_forecast_batch(forecast_requests)

            _write_weatherbit_results(location_dir, prefix, historical_days, historical_payloads)
            _write_weatherbit_results(location_dir, prefix, forecast_days, forecast_payloads)
    except Exception as exc:
        print(f"Weatherbit: unexpected error: {exc}")


## WeatherAPI.com export


In [None]:
# Export WeatherAPI.com data, one CSV per location and day: days before
# `today` go through the historical batch call, `today` and later (up to 14
# days ahead) through the forecast batch call.
if not USE_WEATHERAPI_COM:
    print('USE_WEATHERAPI_COM: disabled, skipping export.')
else:
    try:
        weatherapi_dir = DATA_ROOT / 'weatherapi_com'
        weatherapi_dir.mkdir(parents=True, exist_ok=True)

        weatherapi_client = WeatherApiClient(config_path='../weather_config.json')
        # NOTE(review): `today` is the machine-local date, while rows are later
        # matched on the UTC date of their timestamp -- confirm this agrees with
        # the calendar the provider uses for `date`.
        today = dt.date.today()
        forecast_horizon = today + dt.timedelta(days=14)

        def _weatherapi_rows(block: dict) -> list:
            """Flatten one ``forecastday`` block into a list of per-hour dicts.

            Each hour dict is copied, given a ``timestamp`` (from ``time_epoch``
            when present, otherwise from ``time``) and tagged with the block's
            ``date`` as ``forecast_date``.  Hours with neither field get no
            ``timestamp`` key.
            """
            rows = []
            for hour in block.get('hour', []):
                row = dict(hour)
                if 'time_epoch' in hour:
                    row['timestamp'] = pd.to_datetime(hour['time_epoch'], unit='s', utc=True)
                elif 'time' in hour:
                    # NOTE(review): the 'time' string is interpreted as UTC here --
                    # confirm it is not location-local time.
                    row['timestamp'] = pd.to_datetime(hour['time'], utc=True)
                row['forecast_date'] = block.get('date')
                rows.append(row)
            return rows

        def _weatherapi_payload_to_df(payload: dict, day: dt.date) -> pd.DataFrame:
            """Return the payload's hourly rows that fall on *day*, sorted by time.

            Rows are gathered from every block under ``forecast.forecastday``.
            An empty frame is returned when there are no rows or no usable
            timestamp column.
            """
            forecast = payload.get('forecast', {})
            forecast_days = forecast.get('forecastday', [])
            rows = []
            for block in forecast_days:
                rows.extend(_weatherapi_rows(block))
            if not rows:
                return pd.DataFrame()
            df = pd.DataFrame(rows)
            # Fallback for the case where no row received a 'timestamp' key.
            if 'timestamp' not in df:
                if 'time_epoch' in df.columns:
                    df['timestamp'] = pd.to_datetime(df['time_epoch'], unit='s', utc=True)
                elif 'time' in df.columns:
                    df['timestamp'] = pd.to_datetime(df['time'], utc=True)
                else:
                    return pd.DataFrame()
            df = df[df['timestamp'].dt.date == day]
            return df.sort_values('timestamp')

        def _write_weatherapi_results(directory: Path, prefix: str, days, payloads):
            """Write one ``<day>.csv`` into *directory* per (day, payload) pair.

            Entries that are exceptions, yield no rows, or fail while being
            converted are reported with *prefix* and skipped.
            """
            for day, payload in zip(days, payloads):
                if isinstance(payload, Exception):
                    print(f'{prefix}: request failed for {day}: {payload}')
                    continue
                try:
                    df_day = _weatherapi_payload_to_df(payload, day)
                    if df_day.empty:
                        print(f'{prefix}: no data for {day}')
                        continue
                    output_path = directory / f'{day.isoformat()}.csv'
                    df_day.to_csv(output_path, index=False)
                    print(f'{prefix}: wrote {output_path}')
                except Exception as exc:
                    print(f"{prefix}: unexpected error for {day}: {exc}")

        for location_key, lat, lon in iter_locations():
            prefix = f'WeatherAPI.com[{location_key}]'
            location_dir = weatherapi_dir / location_key
            location_dir.mkdir(parents=True, exist_ok=True)

            # Parallel lists: *_days[i] is the day served by *_requests[i].
            historical_days = []
            historical_requests = []
            forecast_days = []
            forecast_requests = []

            for day in DAY_RANGE:
                if day > forecast_horizon:
                    print(f'{prefix}: skipping {day} (beyond 14-day forecast window).')
                    continue

                # Existing files act as a cache; today's file is always refreshed.
                output_path = location_dir / f'{day.isoformat()}.csv'
                if day != today and output_path.exists():
                    print(f'{prefix}: skipping {day} (already exported).')
                    continue

                if day < today:
                    historical_days.append(day)
                    historical_requests.append({
                        'location': (lat, lon),
                        'date': day,
                        'aqi': 'yes',
                    })
                else:
                    forecast_days.append(day)
                    forecast_requests.append({
                        'location': (lat, lon),
                        'days': 1,
                        'start_date': day,
                        'end_date': day,
                        'aqi': 'yes',
                        'alerts': 'yes',
                    })

            # Both batch calls are issued even when a request list is empty.
            historical_payloads = weatherapi_client.get_historical_batch(historical_requests)
            forecast_payloads = weatherapi_client.get_forecast_batch(forecast_requests)

            _write_weatherapi_results(location_dir, prefix, historical_days, historical_payloads)
            _write_weatherapi_results(location_dir, prefix, forecast_days, forecast_payloads)
    except Exception as exc:
        # Anything not handled per day (client construction, batch calls)
        # aborts the remaining locations for this provider.
        print(f"WeatherAPI.com: unexpected error: {exc}")


## Data Caching Process

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.colors import ListedColormap
# Display name per provider key, used in the coverage chart titles.  The keys
# are also the per-provider directory names looked up under DATA_ROOT.
PROVIDER_LABELS = {
    'tomorrow_io': 'Tomorrow.io',
    'open_meteo': 'Open-Meteo',
    'visual_crossing': 'Visual Crossing',
    'noaa_isd': 'NOAA ISD',
    'noaa_lcd': 'NOAA LCD',
    'meteostat': 'Meteostat',
    'nasa_power': 'NASA POWER',
    'iem_asos': 'IEM ASOS',
    'copernicus_era5_single': 'Copernicus ERA5 (single)',
    'copernicus_era5_land': 'Copernicus ERA5-Land',
    'copernicus_era5_pressure': 'Copernicus ERA5 (pressure)',
    'copernicus_era5_land_timeseries': 'Copernicus ERA5-Land TS',
    'openweather': 'OpenWeather',
    'weatherbit': 'Weatherbit',
    'weatherapi_com': 'WeatherAPI.com',
}
# Free-text note on each provider's data cadence, appended to the chart title
# in parentheses.
# NOTE(review): these are hard-coded strings, not computed from the cache --
# entries such as 'No cached data yet' will not update once data is exported.
PROVIDER_RESOLUTION = {
    'tomorrow_io': 'No cached data yet (expected 5m/1h timeline)',
    'open_meteo': 'Hourly cadence (1h)',
    'visual_crossing': 'Hourly cadence (1h)',
    'noaa_isd': 'Sub-hourly METAR (median ~53 min)',
    'noaa_lcd': 'Sub-hourly LCD (median ~53 min)',
    'meteostat': 'Hourly multi-source blend (1h)',
    'nasa_power': 'Hourly NASA POWER (satellite/model)',
    'iem_asos': '1-min ASOS observations',
    'copernicus_era5_single': 'Hourly ERA5 single levels',
    'copernicus_era5_land': 'Hourly ERA5-Land (0.1 deg)',
    'copernicus_era5_pressure': 'Hourly ERA5 pressure levels (0.25 deg)',
    'copernicus_era5_land_timeseries': 'Hourly ERA5-Land point series',
    'openweather': 'Hourly observations (1h)',
    'weatherbit': 'No cached data yet',
    'weatherapi_com': 'Hourly forecast/history (1h)',
}
# (provider key, enable flag) pairs; the list order is the top-to-bottom order
# of the subplots in the coverage chart.
provider_flags = [
    ('tomorrow_io', USE_TOMORROW_IO),
    ('open_meteo', USE_OPEN_METEO),
    ('visual_crossing', USE_VISUAL_CROSSING),
    ('noaa_isd', USE_NOAA_ISD),
    ('noaa_lcd', USE_NOAA_LCD),
    ('meteostat', USE_METEOSTAT),
    ('nasa_power', USE_NASA_POWER),
    ('iem_asos', USE_IEM_ASOS),
    ('copernicus_era5_single', USE_COPERNICUS_ERA5_SINGLE),
    ('copernicus_era5_land', USE_COPERNICUS_ERA5_LAND),
    ('copernicus_era5_pressure', USE_COPERNICUS_ERA5_PRESSURE),
    ('copernicus_era5_land_timeseries', USE_COPERNICUS_ERA5_LAND_TS),
    ('openweather', USE_OPENWEATHER),
    ('weatherbit', USE_WEATHERBIT),
    ('weatherapi_com', USE_WEATHERAPI_COM),
]
# Draw one heat-map per enabled provider: rows are locations, columns are the
# days between START_DATE and END_DATE, and a cell is "Cached" when the
# per-day CSV exists on disk.
date_index = pd.date_range(START_DATE, END_DATE, freq='D')
active_providers = [key for key, enabled in provider_flags if enabled and key in PROVIDER_LABELS]
if not active_providers:
    print('No providers enabled; skipping cached coverage chart.')
else:
    cmap = ListedColormap(['#f0f0f0', '#2ca02c'])
    fig, axes = plt.subplots(len(active_providers), 1, figsize=(16, 2.5 * len(active_providers)), sharex=True)
    if len(active_providers) == 1:
        # A single subplot comes back as a bare Axes; wrap it so it can be zipped.
        axes = [axes]
    # The per-day file names are the same for every provider and location.
    day_file_names = [f"{day.date().isoformat()}.csv" for day in date_index]
    # Stays None when no subplot drew an image, in which case there is nothing
    # for the colorbar to describe.
    im = None
    for ax, provider_key in zip(axes, active_providers):
        provider_dir = DATA_ROOT / provider_key
        coverage_rows = []
        location_names = []
        for location_name, *_coords in iter_locations():
            location_dir = provider_dir / location_name
            coverage_rows.append([(location_dir / file_name).exists() for file_name in day_file_names])
            location_names.append(location_name)
        if not coverage_rows:
            ax.text(0.5, 0.5, 'No location data available', ha='center', va='center')
            ax.set_axis_off()
            continue
        data = np.array(coverage_rows, dtype=int)
        im = ax.imshow(data, aspect='auto', interpolation='nearest', cmap=cmap, vmin=0, vmax=1)
        ax.set_yticks(range(len(location_names)))
        ax.set_yticklabels(location_names)
        # At most ten evenly spaced date ticks keep the x axis readable.
        tick_count = min(len(date_index), 10)
        if tick_count > 0:
            tick_positions = np.linspace(0, len(date_index) - 1, tick_count, dtype=int)
            ax.set_xticks(tick_positions)
            ax.set_xticklabels([date_index[i].date().isoformat() for i in tick_positions], rotation=45, ha='right')
        label = PROVIDER_LABELS.get(provider_key, provider_key)
        resolution = PROVIDER_RESOLUTION.get(provider_key, '')
        title_suffix = f" ({resolution})" if resolution else ''
        ax.set_title(f"{label}{title_suffix}")
        ax.set_ylabel('Location')
    axes[-1].set_xlabel('Date')
    # The dash between the two dates was previously a mis-encoded byte sequence.
    fig.suptitle(f"Cached coverage {START_DATE.isoformat()} — {END_DATE.isoformat()}")
    plt.tight_layout(rect=(0, 0, 1, 0.97))
    if im is not None:
        cbar = fig.colorbar(im, ax=axes, orientation='horizontal', fraction=0.025, pad=0.08)
        cbar.set_ticks([0, 1])
        cbar.set_ticklabels(['Missing', 'Cached'])
