In [1]:
import os
import shutil
from pathlib import Path

import numpy as np
import pandas as pd
import xarray as xr

# Root of the local climate-data store. Set the CLIMATE_DATA_ROOT environment
# variable to run elsewhere; the default is the original WSL mount point.
CLIMATE_DATA_ROOT = Path(os.environ.get("CLIMATE_DATA_ROOT", "/mnt/d/climate_data"))

HPD_PATH = CLIMATE_DATA_ROOT / "hpd" / "data"
GHCND_PATH = CLIMATE_DATA_ROOT / "ghcnd" / "data"
DATA_DOC_PATH = Path("./data/dataset_docs/")  # relative to the notebook's working directory
OE_PATH = CLIMATE_DATA_ROOT / "HPD_CONUS_OEVENTS"

# ERA5 GRIB inputs read by load_era5_data()
VIMD_PATH = CLIMATE_DATA_ROOT / "ERA5_CONUS_VIMD"
W500_PATH = CLIMATE_DATA_ROOT / "ERA5_CONUS"

# Output directory for the per-station ERA5 CSVs produced by this notebook
ERA5_PATH = CLIMATE_DATA_ROOT / "ERA5_CONUS_STATIONS"
ERA5_PATH.mkdir(parents=True, exist_ok=True)

In [2]:
def _open_yearly_grib(directory, prefix, start_year, end_year):
    """Open and time-concatenate the ``{prefix}*.grib`` files for a year range.

    A file is selected when the first four characters of the last
    ``_``-separated part of its name (the year) fall within
    ``[start_year, end_year]``. Files are opened with the cfgrib engine and
    concatenated along ``time`` in sorted filename order.

    ``xr.concat`` returns a new Dataset that does not own the per-file
    datasets, so calling ``.close()`` on it would otherwise be a no-op. A closer
    is attached so that closing the returned dataset closes every source file.

    Raises
    ------
    FileNotFoundError
        If no file in ``directory`` matches the year range. Without this check,
        ``xr.concat([])`` raises an uninformative ValueError.
    """
    files = sorted(
        f for f in directory.glob(f"{prefix}*.grib")
        if start_year <= int(f.name.split("_")[-1][:4]) <= end_year
    )
    if not files:
        raise FileNotFoundError(
            f"No {prefix}*.grib files for {start_year}-{end_year} in {directory}"
        )

    parts = []

    def _close_parts():
        for part in parts:
            part.close()

    try:
        for f in files:
            parts.append(xr.open_dataset(f, engine="cfgrib"))
        combined = xr.concat(parts, dim="time")
    except Exception:
        # Don't leave already-opened files behind when a later file fails.
        _close_parts()
        raise

    combined.set_close(_close_parts)
    return combined


def load_era5_data(start_year, end_year):
    """Load the W500 and VIMD ERA5 GRIB files for ``start_year``..``end_year``.

    Parameters
    ----------
    start_year, end_year : int
        Inclusive year range, matched against the year in each file name.

    Returns
    -------
    (xarray.Dataset, xarray.Dataset)
        ``(w500_ds, vimd_ds)``, each concatenated along ``time``. Call
        ``.close()`` on both when done; this releases the underlying files.
    """
    print(f"Loading ERA5 data from {start_year} to {end_year}...")

    w500_ds = _open_yearly_grib(W500_PATH, "ERA5_CONUS_W500_", start_year, end_year)
    try:
        vimd_ds = _open_yearly_grib(VIMD_PATH, "ERA5_CONUS_VIMD_", start_year, end_year)
    except Exception:
        w500_ds.close()
        raise

    print(f"ERA5 data ({start_year}-{end_year}) loaded successfully.")
    return w500_ds, vimd_ds


def _lon_on_grid(lon, grid_lon):
    """Return ``lon`` expressed in the longitude convention used by ``grid_lon``.

    ``sel(..., method="nearest")`` silently snaps to the grid edge when a
    -180..180 station longitude is looked up on a 0..360 grid (or vice versa),
    so the convention is matched before selecting.
    """
    grid_lon = np.asarray(grid_lon)
    if grid_lon.size == 0:
        return lon
    if grid_lon.max() > 180:  # grid uses 0..360
        return lon % 360
    if grid_lon.min() < 0 and lon > 180:  # grid uses -180..180
        return lon - 360
    return lon


def extract_era5_at_station(lat, lon, utc_offset, w500_ds, vimd_ds):
    """Extract hourly W500 and VIMC at the ERA5 grid point nearest a station.

    Parameters
    ----------
    lat, lon : float
        Station coordinates. ``lon`` may be in -180..180 or 0..360; it is
        converted to each dataset's ``longitude`` convention before the
        nearest-neighbour lookup.
    utc_offset : pandas.Timedelta
        Added to the UTC timestamps to express them in station local time.
    w500_ds : xarray.Dataset
        Dataset with variable ``w`` (hourly) and a ``longitude`` coordinate.
    vimd_ds : xarray.Dataset
        Short-forecast dataset whose ``vimd`` variable, once a single grid
        point is selected, has exactly the dims ``time`` and ``step``.

    Returns
    -------
    pandas.DataFrame
        Indexed by local time (outer join of both series), with columns
        ``W500`` (``w`` as stored) and ``VIMC`` (``-vimd``).

    Notes
    -----
    ERA5 short forecast data like VIMD are recorded as forecast steps (1-12 hours)
    from two daily forecast start times (06 and 18 UTC). Since files are stored
    monthly, the forecast steps at the boundaries (first and last day of the month)
    may point to times outside the file's time range. This causes NaN values at
    the start/end of each month.

    Instead of globally cleaning the full dataset, which is large, these
    NaNs are removed locally per station after selecting and flattening the relevant series
    for memory efficiency.
    """
    # W500 (direct hourly data)
    w500_series = (
        w500_ds.w.sel(
            longitude=_lon_on_grid(lon, w500_ds.longitude.values),
            latitude=lat,
            method="nearest",
        )
        .to_series()
        .rename("W500")
    )
    w500_series.index += utc_offset  # UTC → local time

    # VIMD (short forecast). Fix the axis order explicitly so the flattened
    # values line up element-for-element with the (time, step) valid-time grid.
    vimd_point = vimd_ds.vimd.sel(
        longitude=_lon_on_grid(lon, vimd_ds.longitude.values),
        latitude=lat,
        method="nearest",
    ).transpose("time", "step")
    vimd_values = vimd_point.values.ravel()
    valid_time = (vimd_ds.time.values[:, None] + vimd_ds.step.values[None, :]).ravel()

    # Drop the month-boundary NaN slots described in the Notes.
    has_data = ~np.isnan(vimd_values)
    vimc_series = (
        pd.Series(vimd_values[has_data], index=pd.to_datetime(valid_time[has_data]))
        .mul(-1)  # VIMD → VIMC (convergence = -divergence)
        .rename("VIMC")
    )
    vimc_series.index += utc_offset  # UTC → local time

    return pd.concat([w500_series, vimc_series], axis=1)


# Extract data in batches

In [3]:
# Station metadata table. Alternative input used previously:
# DATA_DOC_PATH / "potential_pt_stations.csv"
potential_stations = pd.read_csv(DATA_DOC_PATH / "hpd_hadisd.csv")

# Scratch directory for the per-batch CSVs written during extraction.
temp_dir = ERA5_PATH / "temp"
temp_dir.mkdir(parents=True, exist_ok=True)

# Resume support: a station is considered done once its merged CSV exists,
# so only (index, row) pairs without that file are scheduled.
stations_to_process = [
    (idx, row)
    for idx, row in potential_stations.iterrows()
    if not (ERA5_PATH / f"{row['StnID']}.csv").exists()
]

print(f"Processing {len(stations_to_process)} out of {len(potential_stations)} stations")

Processing 604 out of 1324 stations


In [4]:
# Extract ERA5 at every pending station one batch of years at a time, so only a
# single batch of gridded data is open at once. Each (station, batch) result is
# written to temp_dir as f"{stn_id}_temp_b{jj}.csv" and merged in the next cell.
FIRST_YEAR, LAST_YEAR = 1940, 2024  # inclusive range of ERA5 years to extract
YEARS_PER_BATCH = 1                 # years loaded per batch (memory vs. speed)

batches = [
    (start, min(start + YEARS_PER_BATCH - 1, LAST_YEAR))
    for start in range(FIRST_YEAR, LAST_YEAR + 1, YEARS_PER_BATCH)
]

if stations_to_process:
    for jj, (start_year, end_year) in enumerate(batches):
        # Load only data in the batch range
        w500_ds, vimd_ds = load_era5_data(start_year, end_year)
        try:
            for idx, row in stations_to_process:
                stn_id = row["StnID"]
                lon, lat = row["Lon"], row["Lat"]
                utc_offset = pd.Timedelta(hours=row["UTC_Offset"])

                # Hourly W500/VIMC series at the station, in local time
                era5_df = extract_era5_at_station(lat, lon, utc_offset, w500_ds, vimd_ds)

                # Save partial result for merging later
                era5_df.to_csv(temp_dir / f"{stn_id}_temp_b{jj}.csv")
        finally:
            # Release the datasets even if a station raises.
            w500_ds.close()
            vimd_ds.close()

else:
    print("All stations already processed - nothing to do!")

Loading ERA5 data from 1940 to 1940...
ERA5 data (1940-1940) loaded successfully.
Loading ERA5 data from 1941 to 1941...
ERA5 data (1941-1941) loaded successfully.
Loading ERA5 data from 1942 to 1942...
ERA5 data (1942-1942) loaded successfully.
Loading ERA5 data from 1943 to 1943...
ERA5 data (1943-1943) loaded successfully.
Loading ERA5 data from 1944 to 1944...
ERA5 data (1944-1944) loaded successfully.
Loading ERA5 data from 1945 to 1945...
ERA5 data (1945-1945) loaded successfully.
Loading ERA5 data from 1946 to 1946...
ERA5 data (1946-1946) loaded successfully.
Loading ERA5 data from 1947 to 1947...
ERA5 data (1947-1947) loaded successfully.
Loading ERA5 data from 1948 to 1948...
ERA5 data (1948-1948) loaded successfully.
Loading ERA5 data from 1949 to 1949...
ERA5 data (1949-1949) loaded successfully.
Loading ERA5 data from 1950 to 1950...
ERA5 data (1950-1950) loaded successfully.
Loading ERA5 data from 1951 to 1951...
ERA5 data (1951-1951) loaded successfully.
Loading ERA5 dat

In [5]:
# Final concatenation step: merge each station's per-batch temp files into one CSV.
if stations_to_process:
    for idx, row in stations_to_process:
        stn_id = row["StnID"]
        merged = [
            pd.read_csv(temp_dir / f"{stn_id}_temp_b{jj}.csv", index_col=0, parse_dates=True)
            for jj in range(len(batches))
        ]
        # NOTE(review): batches are assumed not to overlap in time; duplicated
        # timestamps, if any, are kept as-is by sort_index().
        full_df = pd.concat(merged).sort_index()

        # The resume check treats an existing final CSV as "done". Write to a
        # side file and rename it into place so an interrupted write never leaves
        # a truncated final file that would be skipped on the next run.
        final_file = ERA5_PATH / f"{stn_id}.csv"
        partial_file = final_file.with_name(f"{final_file.name}.part")
        full_df.to_csv(partial_file)
        partial_file.replace(final_file)

In [6]:
# Delete the per-batch temp files only once every scheduled station has its
# merged CSV; otherwise keep them so the merge can be re-run without repeating
# the (slow) extraction over all batches.
unfinished = [
    row["StnID"]
    for _, row in stations_to_process
    if not (ERA5_PATH / f"{row['StnID']}.csv").exists()
]
if unfinished:
    print(f"Keeping {temp_dir}: {len(unfinished)} stations have no merged output yet")
else:
    shutil.rmtree(temp_dir)