# Data Pipeline Orchestration

In [None]:
import os
import sys
from pathlib import Path
from types import ModuleType
import importlib

# --- 1. Mount Google Drive ---
try:
    from google.colab import drive
    drive.mount('/content/drive')
    IN_COLAB = True
except ImportError:
    IN_COLAB = False

# --- 2. Setup Workspace ---
if IN_COLAB:
    REPO_NAME = "temp-data-pipeline"
    WORKSPACE_DIR = Path(f"/content/drive/MyDrive/{REPO_NAME}")
    REPO_URL = f"https://github.com/kyler505/{REPO_NAME}.git"
    
    # Clone if missing
    if not WORKSPACE_DIR.exists():
        print(f"Cloning {REPO_NAME} to Drive...")
        !git clone {REPO_URL} {str(WORKSPACE_DIR)}
    
    os.chdir(WORKSPACE_DIR)
    if str(WORKSPACE_DIR) not in sys.path:
        sys.path.insert(0, str(WORKSPACE_DIR))
else:
    # Local development setup
    cwd = Path.cwd().resolve()
    project_root = None
    for parent in [cwd] + list(cwd.parents):
        if (parent / "pyproject.toml").exists():
            project_root = parent
            break
    if project_root:
        os.chdir(project_root)
        if str(project_root) not in sys.path:
            sys.path.insert(0, str(project_root))
        print(f"Local environment ready: {project_root}")

# --- 3. Python 3.12 Compatibility Shim (imp module) ---
try:
    import imp
except ImportError:
    print("Applying 'imp' module shim...")
    imp_shim = ModuleType("imp")
    imp_shim.reload = importlib.reload
    sys.modules["imp"] = imp_shim

# --- 4. Run Bootstrap ---
try:
    from tempdata.utils.colab import bootstrap
    bootstrap(use_wandb=True)
except ImportError:
    print("Failed to import bootstrap utility. Ensure repo is on sys.path.")

print("Environment initialization complete.")

In [4]:
from pathlib import Path
import os

# CDS API credentials
# Get your key from: https://cds.climate.copernicus.eu/profile
CDS_API_KEY = "6ac45a56-7542-47fb-acf8-61975dbca72d"  # <-- Replace with your key (format UID:KEY)
CDS_API_URL = "https://cds.climate.copernicus.eu/api"

# Write ~/.cdsapirc if not exists
cdsapirc_path = Path.home() / ".cdsapirc"
if not cdsapirc_path.exists() and CDS_API_KEY != "YOUR_CDS_API_KEY":
    cdsapirc_path.write_text(f"url: {CDS_API_URL}\nkey: {CDS_API_KEY}\n")
    print(f"Created {cdsapirc_path}")
elif cdsapirc_path.exists():
    print(f"CDS credentials already configured at {cdsapirc_path}")
else:
    print("⚠️  Set CDS_API_KEY above to configure ERA5 access")

CDS credentials already configured at /Users/kcao/.cdsapirc


## Fetch hourly data

Configure a station and date range, then run the fetcher.

In [6]:
from tempdata.fetch.noaa_hourly import fetch_noaa_hourly

STATION_ID = "KLGA"
START_DATE = "2010-01-01"
END_DATE = "2025-01-01"  # exclusive

OUTPUT_DIR = DATA_DIR / "raw" / "noaa_hourly" / STATION_ID
CACHE_DIR = DATA_DIR / "cache" / "isd_csv" / STATION_ID

written = fetch_noaa_hourly(
    station_id=STATION_ID,
    start_date=START_DATE,
    end_date=END_DATE,
    out_dir=OUTPUT_DIR,
    cache_dir=CACHE_DIR,
)

print(f"Wrote {len(written)} parquet files:")
for path in written:
    print(f"  - {path}")

[isd] 2010: rows=13494 coverage=2010-01-01 00:00:00+00:00 -> 2010-12-31 23:51:00+00:00
[isd] 2011: rows=13955 coverage=2011-01-01 00:00:00+00:00 -> 2011-12-31 23:51:00+00:00
[isd] 2012: rows=13704 coverage=2012-01-01 00:00:00+00:00 -> 2012-12-31 23:51:00+00:00
[isd] 2013: rows=13659 coverage=2013-01-01 00:00:00+00:00 -> 2013-12-31 23:51:00+00:00
[isd] 2014: rows=13790 coverage=2014-01-01 00:00:00+00:00 -> 2014-12-31 23:51:00+00:00
[isd] 2015: rows=13668 coverage=2015-01-01 00:00:00+00:00 -> 2015-12-31 23:51:00+00:00
[isd] 2016: rows=13375 coverage=2016-01-01 00:00:00+00:00 -> 2016-12-31 23:51:00+00:00
[isd] 2017: rows=14043 coverage=2017-01-01 00:00:00+00:00 -> 2017-12-31 23:51:00+00:00
[isd] 2018: rows=14280 coverage=2018-01-01 00:00:00+00:00 -> 2018-12-31 23:51:00+00:00
[isd] 2019: rows=14081 coverage=2019-01-01 00:00:00+00:00 -> 2019-12-31 23:51:00+00:00
[isd] 2020: rows=13841 coverage=2020-01-01 00:00:00+00:00 -> 2020-12-31 23:51:00+00:00
[isd] 2021: rows=13565 coverage=2021-01-01 

## Verify outputs

Load one parquet file to confirm the fetch results.

In [7]:
import pandas as pd
from tempdata.schemas import validate_hourly_obs

parquet_files = sorted(OUTPUT_DIR.glob("*.parquet"))
if not parquet_files:
    raise FileNotFoundError(f"No parquet files found in {OUTPUT_DIR}")

# Load ALL parquet files and concatenate
dfs = []
for pf in parquet_files:
    df_year = pd.read_parquet(pf)
    dfs.append(df_year)
    print(f"Loaded {len(df_year)} rows from {pf.name}")

df = pd.concat(dfs, ignore_index=True)
print(f"\nTotal: {len(df)} rows from {len(parquet_files)} files")

# Validate schema (will raise if invalid)
validate_hourly_obs(df, require_unique_keys=False)
print("Schema validation passed")

print(df.head())
print(f"Date range: {df['ts_utc'].min()} to {df['ts_utc'].max()}")

Loaded 13375 rows from 2016.parquet
Loaded 14043 rows from 2017.parquet
Loaded 14280 rows from 2018.parquet
Loaded 14081 rows from 2019.parquet
Loaded 13841 rows from 2020.parquet
Loaded 13565 rows from 2021.parquet
Loaded 13653 rows from 2022.parquet
Loaded 13647 rows from 2023.parquet
Loaded 13414 rows from 2024.parquet
Loaded 8808 rows from 2025.parquet
Loaded 13494 rows from isd_2010.parquet
Loaded 13955 rows from isd_2011.parquet
Loaded 13704 rows from isd_2012.parquet
Loaded 13659 rows from isd_2013.parquet
Loaded 13790 rows from isd_2014.parquet
Loaded 13668 rows from isd_2015.parquet
Loaded 13375 rows from isd_2016.parquet
Loaded 14043 rows from isd_2017.parquet
Loaded 14280 rows from isd_2018.parquet
Loaded 14081 rows from isd_2019.parquet
Loaded 13841 rows from isd_2020.parquet
Loaded 13565 rows from isd_2021.parquet
Loaded 13653 rows from isd_2022.parquet
Loaded 13647 rows from isd_2023.parquet
Loaded 13414 rows from isd_2024.parquet

Total: 338876 rows from 25 files
Schema 

## Clean hourly data

Apply the cleaning pipeline to the fetched data:
- Validate input schema (early fail on malformed data)
- Sort and deduplicate by (ts_utc, station_id)
- Flag missing temperature values
- Flag and nullify out-of-range temperatures
- Detect hour-to-hour spikes

In [8]:
from tempdata.clean import clean_hourly_obs

# Clean the fetched data
# This applies: deduplication, missing value flags, out-of-range handling, spike detection
df_clean = clean_hourly_obs(df)

print(f"\nCleaned DataFrame shape: {df_clean.shape}")
print(df_clean.head())

[clean] Cleaning summary:
  Total rows: 338876 -> 214686 (124190 duplicates removed)
  Rows with QC flags: 5770
    QC_MISSING_VALUE: 5769
    QC_SPIKE_DETECTED: 1
  Temp range (valid): -17.2C to 39.4C

Cleaned DataFrame shape: (214686, 7)
                     ts_utc station_id       lat       lon  temp_c source  \
0 2010-01-01 00:00:00+00:00       KLGA  40.77944 -73.88035     1.1    isd   
1 2010-01-01 00:51:00+00:00       KLGA  40.77944 -73.88035     1.1    isd   
2 2010-01-01 01:36:00+00:00       KLGA  40.77944 -73.88035     1.0    isd   
3 2010-01-01 01:51:00+00:00       KLGA  40.77944 -73.88035     1.1    isd   
4 2010-01-01 02:01:00+00:00       KLGA  40.77944 -73.88035     1.0    isd   

   qc_flags  
0         0  
1         0  
2         0  
3         0  
4         0  


## Aggregate to Daily Tmax

Convert cleaned hourly observations to daily maximum temperature (Tmax).

Key design principles:
- **Market-aligned**: Tmax is computed per station-local calendar day, not UTC
- **QC-aware**: Hours with `QC_OUT_OF_RANGE` are excluded from Tmax calculation
- **Spike-inclusive**: Spike-flagged values ARE included (to avoid removing real heat spikes)
- **Transparent**: Every day carries `coverage_hours` and propagated `qc_flags`

In [9]:
from tempdata.aggregate.build_daily_tmax import build_daily_tmax
from tempdata.schemas.daily_tmax import validate_daily_tmax

# Station timezone (KLGA is in Eastern time)
STATION_TZ = "America/New_York"

# Build daily Tmax from cleaned hourly data
df_daily = build_daily_tmax(df_clean, station_tz=STATION_TZ)

# Validate the output schema
validate_daily_tmax(df_daily)
print("Daily Tmax schema validation passed")

print(f"\nAggregated {len(df_clean)} hourly obs -> {len(df_daily)} daily records")
print(f"Date range: {df_daily['date_local'].min().date()} to {df_daily['date_local'].max().date()}")

print("\nDaily Tmax summary:")
print(df_daily[["date_local", "tmax_c", "tmax_f", "coverage_hours", "qc_flags"]].head(10))

Daily Tmax schema validation passed

Aggregated 214686 hourly obs -> 5718 daily records
Date range: 2009-12-31 to 2025-08-26

Daily Tmax summary:
                 date_local  tmax_c  tmax_f  coverage_hours  qc_flags
0 2009-12-31 00:00:00-05:00     1.1    34.0               5        17
1 2010-01-01 00:00:00-05:00     3.9    39.0              24         1
2 2010-01-02 00:00:00-05:00     0.0    32.0              24         1
3 2010-01-03 00:00:00-05:00    -5.0    23.0              24         1
4 2010-01-04 00:00:00-05:00    -0.6    30.9              24         1
5 2010-01-05 00:00:00-05:00    -0.6    30.9              24         1
6 2010-01-06 00:00:00-05:00     1.1    34.0              24         1
7 2010-01-07 00:00:00-05:00     3.3    37.9              24         1
8 2010-01-08 00:00:00-05:00     1.1    34.0              24         1
9 2010-01-09 00:00:00-05:00    -0.6    30.9              24         1


## Coverage and Quality Analysis

Check data quality metrics for the aggregated daily Tmax.

In [10]:
from tempdata.schemas.qc_flags import QC_LOW_COVERAGE, QC_INCOMPLETE_DAY, QC_SPIKE_DETECTED

# Coverage statistics
print("Coverage Statistics:")
print(f"  Min coverage: {df_daily['coverage_hours'].min()} hours")
print(f"  Max coverage: {df_daily['coverage_hours'].max()} hours")
print(f"  Mean coverage: {df_daily['coverage_hours'].mean():.1f} hours")
print(f"  Days with 24h coverage: {(df_daily['coverage_hours'] == 24).sum()}")

# QC flag breakdown
print("\nQC Flag Analysis:")
low_coverage_days = ((df_daily['qc_flags'] & QC_LOW_COVERAGE) != 0).sum()
incomplete_days = ((df_daily['qc_flags'] & QC_INCOMPLETE_DAY) != 0).sum()
spike_days = ((df_daily['qc_flags'] & QC_SPIKE_DETECTED) != 0).sum()

print(f"  Days with QC_LOW_COVERAGE: {low_coverage_days}")
print(f"  Days with QC_INCOMPLETE_DAY: {incomplete_days}")
print(f"  Days with QC_SPIKE_DETECTED: {spike_days}")
print(f"  Days with no QC issues: {(df_daily['qc_flags'] == 0).sum()}")

# Temperature range
print("\nTemperature Range:")
print(f"  Min Tmax: {df_daily['tmax_c'].min():.1f}°C ({df_daily['tmax_f'].min():.1f}°F)")
print(f"  Max Tmax: {df_daily['tmax_c'].max():.1f}°C ({df_daily['tmax_f'].max():.1f}°F)")
print(f"  Mean Tmax: {df_daily['tmax_c'].mean():.1f}°C ({df_daily['tmax_f'].mean():.1f}°F)")

Coverage Statistics:
  Min coverage: 5 hours
  Max coverage: 24 hours
  Mean coverage: 24.0 hours
  Days with 24h coverage: 5696

QC Flag Analysis:
  Days with QC_LOW_COVERAGE: 1
  Days with QC_INCOMPLETE_DAY: 0
  Days with QC_SPIKE_DETECTED: 1
  Days with no QC issues: 19

Temperature Range:
  Min Tmax: -10.0°C (14.0°F)
  Max Tmax: 39.4°C (102.9°F)
  Mean Tmax: 17.6°C (63.6°F)


## Save Daily Tmax

Write the daily Tmax data to parquet for downstream use (backtesting, model training, trading validation).

In [11]:
from tempdata.aggregate.build_daily_tmax import write_daily_tmax

# Output paths - partition daily Tmax by year like hourly data
DAILY_TMAX_DIR = DATA_DIR / "clean" / "daily_tmax" / STATION_ID
DAILY_TMAX_DIR.mkdir(parents=True, exist_ok=True)

# Determine year range for partitioning
years = df_daily["date_local"].dt.year.unique()
for year in years:
    year_df = df_daily[df_daily["date_local"].dt.year == year]
    year_path = DAILY_TMAX_DIR / f"{year}.parquet"
    year_df.to_parquet(year_path, index=False)
    print(f"[aggregate] Wrote {len(year_df)} rows to {year_path}")

# Also save cleaned hourly data for reference
HOURLY_CLEAN_DIR = DATA_DIR / "clean" / "hourly_obs" / STATION_ID
HOURLY_CLEAN_DIR.mkdir(parents=True, exist_ok=True)

# Determine year range for partitioning
years = df_clean["ts_utc"].dt.year.unique()
for year in years:
    year_df = df_clean[df_clean["ts_utc"].dt.year == year]
    year_path = HOURLY_CLEAN_DIR / f"{year}.parquet"
    year_df.to_parquet(year_path, index=False)
    print(f"[clean] Wrote {len(year_df)} rows to {year_path}")

print(f"\nPipeline complete!")
print(f"  Daily Tmax: {DAILY_TMAX_DIR}")
print(f"  Cleaned hourly: {HOURLY_CLEAN_DIR}")

[aggregate] Wrote 1 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2009.parquet
[aggregate] Wrote 365 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2010.parquet
[aggregate] Wrote 365 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2011.parquet
[aggregate] Wrote 366 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2012.parquet
[aggregate] Wrote 365 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2013.parquet
[aggregate] Wrote 365 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2014.parquet
[aggregate] Wrote 365 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2015.parquet
[aggregate] Wrote 366 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2016.parquet
[aggregate] Wrote 365 rows to /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/2017.parquet
[ag

## Verify Saved Data

Reload the saved parquet to confirm it was written correctly.

In [12]:
# Reload and verify the saved daily Tmax data (partitioned by year)
daily_tmax_files = list(DAILY_TMAX_DIR.glob("*.parquet"))
daily_tmax_dfs = []
for f in daily_tmax_files:
    df_year = pd.read_parquet(f)
    daily_tmax_dfs.append(df_year)
    print(f"Loaded {len(df_year)} rows from {f.name}")

df_verify = pd.concat(daily_tmax_dfs, ignore_index=True)
print(f"\nTotal: {len(df_verify)} daily records from {len(daily_tmax_files)} files")

# Validate schema
validate_daily_tmax(df_verify)
print("Schema validation passed")

# Show sample of dataset
print("\nDaily Tmax Data (first 10 rows):")
print(df_verify.head(10).to_string(index=False))

Loaded 365 rows from 2023.parquet
Loaded 365 rows from 2015.parquet
Loaded 365 rows from 2014.parquet
Loaded 365 rows from 2022.parquet
Loaded 366 rows from 2016.parquet
Loaded 366 rows from 2020.parquet
Loaded 365 rows from 2021.parquet
Loaded 365 rows from 2017.parquet
Loaded 366 rows from 2012.parquet
Loaded 366 rows from 2024.parquet
Loaded 238 rows from 2025.parquet
Loaded 365 rows from 2013.parquet
Loaded 365 rows from 2018.parquet
Loaded 365 rows from 2011.parquet
Loaded 365 rows from 2010.parquet
Loaded 1 rows from 2009.parquet
Loaded 365 rows from 2019.parquet

Total: 5718 daily records from 17 files
Schema validation passed

Daily Tmax Data (first 10 rows):
               date_local station_id  tmax_c  tmax_f  coverage_hours   source  qc_flags                   updated_at_utc
2023-01-01 00:00:00-05:00       KLGA    12.2    54.0              24 noaa_isd         1 2026-01-21 17:37:26.131952+00:00
2023-01-02 00:00:00-05:00       KLGA    13.3    55.9              24 noaa_isd     

## Fetch Open-Meteo Historical Forecasts

Ingest **historical** daily Tmax forecasts from Open-Meteo for the same station and date range as the truth data.

This creates the **feature-side** dataset: "What did the forecast say at issue time about a target local date?"

Key concepts:
- **Issue time**: when the forecast was issued (simulated as midnight UTC of the day before target)
- **Target date**: the station-local calendar date being forecasted
- **Lead hours**: hours from issue time to target date midnight in station timezone

Using historical forecasts allows us to join forecasts to truth data for model training and backtesting.

## Fetch ERA5 Data (Deep Historical)

For dates before 2016, use ERA5 reanalysis data.

In [13]:
from tempdata.fetch.era5_hourly import fetch_era5_hourly
import pandas as pd

ERA5_CUTOFF = "2016-01-01"
if pd.Timestamp(START_DATE) < pd.Timestamp(ERA5_CUTOFF):
    era5_end = min(pd.Timestamp(END_DATE), pd.Timestamp(ERA5_CUTOFF)).strftime("%Y-%m-%d")
    print(f"Fetching ERA5 data for {START_DATE} to {era5_end}")

    try:
        era5_files = fetch_era5_hourly(
            station_id=STATION_ID,
            start_date=START_DATE,
            end_date=era5_end,
            out_dir=DATA_DIR / "raw" / "era5" / STATION_ID,
        )
        print(f"Wrote {len(era5_files)} ERA5 parquet files")
    except ImportError:
        print("Skipping ERA5: dependencies not installed (run setup cells above)")
    except Exception as e:
        print(f"Skipping ERA5: {e}")
else:
    print("No ERA5 data needed (start date >= 2016)")

Fetching ERA5 data for 2010-01-01 to 2016-01-01
[era5] 2010: fetching full year...


2026-01-21 11:37:27,827 INFO [2025-12-11T00:00:00] Please note that a dedicated catalogue entry for this dataset, post-processed and stored in Analysis Ready Cloud Optimized (ARCO) format (Zarr), is available for optimised time-series retrievals (i.e. for retrieving data from selected variables for a single point over an extended period of time in an efficient way). You can discover it [here](https://cds.climate.copernicus.eu/datasets/reanalysis-era5-single-levels-timeseries?tab=overview)
2026-01-21 11:37:27,828 INFO Request ID is a90b6b6e-5083-4793-8873-ea401274adef
2026-01-21 11:37:28,003 INFO status has been updated to accepted
2026-01-21 11:37:42,073 INFO status has been updated to running
2026-01-21 11:47:54,399 INFO status has been updated to successful


1fdc087550656c98380dc0c7acf13853.nc:   0%|          | 0.00/1.02M [00:00<?, ?B/s]

[era5] 2010: wrote 8760 rows -> /Users/kcao/Documents/temp-data-pipeline/data/raw/era5/KLGA/era5_2010.parquet
[era5] 2011: fetching full year...


2026-01-21 11:47:59,201 INFO [2025-12-11T00:00:00] Please note that a dedicated catalogue entry for this dataset, post-processed and stored in Analysis Ready Cloud Optimized (ARCO) format (Zarr), is available for optimised time-series retrievals (i.e. for retrieving data from selected variables for a single point over an extended period of time in an efficient way). You can discover it [here](https://cds.climate.copernicus.eu/datasets/reanalysis-era5-single-levels-timeseries?tab=overview)
2026-01-21 11:47:59,203 INFO Request ID is 84beb07c-4401-4d0d-9d33-b386c9f4b165
2026-01-21 11:47:59,404 INFO status has been updated to accepted
2026-01-21 11:48:08,449 INFO status has been updated to running
2026-01-21 12:00:23,933 INFO status has been updated to successful


6f71ecf4f417d0c237b3bdffea3d1f9.nc:   0%|          | 0.00/1.02M [00:00<?, ?B/s]

[era5] 2011: wrote 8760 rows -> /Users/kcao/Documents/temp-data-pipeline/data/raw/era5/KLGA/era5_2011.parquet
[era5] 2012: fetching full year...


2026-01-21 12:00:35,598 INFO [2025-12-11T00:00:00] Please note that a dedicated catalogue entry for this dataset, post-processed and stored in Analysis Ready Cloud Optimized (ARCO) format (Zarr), is available for optimised time-series retrievals (i.e. for retrieving data from selected variables for a single point over an extended period of time in an efficient way). You can discover it [here](https://cds.climate.copernicus.eu/datasets/reanalysis-era5-single-levels-timeseries?tab=overview)
2026-01-21 12:00:35,599 INFO Request ID is 5084c997-cd2b-4e7d-b64a-4d65d6d5ca35
2026-01-21 12:00:35,778 INFO status has been updated to accepted
2026-01-21 12:00:50,072 INFO status has been updated to running
2026-01-21 12:11:03,860 INFO status has been updated to successful


7573ff022761d8f448066e28f846a569.nc:   0%|          | 0.00/1.02M [00:00<?, ?B/s]

[era5] 2012: wrote 8784 rows -> /Users/kcao/Documents/temp-data-pipeline/data/raw/era5/KLGA/era5_2012.parquet
[era5] 2013: fetching full year...


2026-01-21 12:11:06,838 INFO [2025-12-11T00:00:00] Please note that a dedicated catalogue entry for this dataset, post-processed and stored in Analysis Ready Cloud Optimized (ARCO) format (Zarr), is available for optimised time-series retrievals (i.e. for retrieving data from selected variables for a single point over an extended period of time in an efficient way). You can discover it [here](https://cds.climate.copernicus.eu/datasets/reanalysis-era5-single-levels-timeseries?tab=overview)
2026-01-21 12:11:06,839 INFO Request ID is ced1c057-3b26-407c-9319-058f31b4b32c
2026-01-21 12:11:07,025 INFO status has been updated to accepted
2026-01-21 12:11:15,927 INFO status has been updated to running
Recovering from connection error [('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))], attempt 1 of 500
Retrying in 120 seconds
Recovering from connection error [HTTPSConnectionPool(host='cds.climate.copernicus.eu', port=443): Max retries exceeded with ur

9df9e41cdc70afaf89e88d36ce4482a3.nc:   0%|          | 0.00/1.02M [00:00<?, ?B/s]

Recovering from connection error [HTTPSConnectionPool(host='object-store.os-api.cci2.ecmwf.int', port=443): Read timed out. (read timeout=60)], attempt 1 of 500
Retrying in 120 seconds


9df9e41cdc70afaf89e88d36ce4482a3.nc:   0%|          | 0.00/1.02M [00:00<?, ?B/s]

[era5] 2013: wrote 8760 rows -> /Users/kcao/Documents/temp-data-pipeline/data/raw/era5/KLGA/era5_2013.parquet
[era5] 2014: fetching full year...


Recovering from connection error [HTTPSConnectionPool(host='cds.climate.copernicus.eu', port=443): Read timed out. (read timeout=60)], attempt 1 of 500
Retrying in 120 seconds
2026-01-21 13:49:35,662 INFO [2025-12-11T00:00:00] Please note that a dedicated catalogue entry for this dataset, post-processed and stored in Analysis Ready Cloud Optimized (ARCO) format (Zarr), is available for optimised time-series retrievals (i.e. for retrieving data from selected variables for a single point over an extended period of time in an efficient way). You can discover it [here](https://cds.climate.copernicus.eu/datasets/reanalysis-era5-single-levels-timeseries?tab=overview)
2026-01-21 13:49:35,664 INFO Request ID is 3ea7ab89-ef7c-42d8-88a9-c6c459f8b2da
2026-01-21 13:49:35,841 INFO status has been updated to accepted
2026-01-21 13:49:49,456 INFO status has been updated to running
Recovering from connection error [HTTPSConnectionPool(host='cds.climate.copernicus.eu', port=443): Read timed out. (read 

aaa5e4d3880eecd97408ff09ed9a0b58.nc:   0%|          | 0.00/1.02M [00:00<?, ?B/s]

[era5] 2014: wrote 8760 rows -> /Users/kcao/Documents/temp-data-pipeline/data/raw/era5/KLGA/era5_2014.parquet
[era5] 2015: using cached /Users/kcao/Documents/temp-data-pipeline/data/raw/era5/KLGA/era5_2015.parquet
Wrote 6 ERA5 parquet files


In [14]:
from tempdata.fetch.openmeteo_daily_forecast import fetch_openmeteo_historical_forecasts
from tempdata.schemas.daily_tmax_forecast import validate_daily_tmax_forecast
from datetime import datetime, timedelta
import pandas as pd

# Use the same date range as the truth data (from NOAA fetch)
OPENMETEO_MIN_DATE = "2016-01-01"
if pd.Timestamp(START_DATE) < pd.Timestamp(OPENMETEO_MIN_DATE):
    print(f"[notebook] Adjusting Open-Meteo start date from {START_DATE} to {OPENMETEO_MIN_DATE} (API limit)")
    FORECAST_START_DATE = OPENMETEO_MIN_DATE
else:
    FORECAST_START_DATE = START_DATE

FORECAST_END_DATE = END_DATE

# Adjust end date: NOAA uses exclusive end, Open-Meteo uses inclusive
end_dt = datetime.strptime(FORECAST_END_DATE, "%Y-%m-%d") - timedelta(days=1)
forecast_end_date = end_dt.strftime("%Y-%m-%d")

if pd.Timestamp(FORECAST_START_DATE) <= pd.Timestamp(forecast_end_date):
    print(f"Fetching historical forecasts for {STATION_ID}")
    print(f"Date range: {FORECAST_START_DATE} to {forecast_end_date}")

    # Output directories
    FORECAST_RAW_DIR = DATA_DIR / "raw" / "forecasts" / "openmeteo" / STATION_ID
    FORECAST_CLEAN_DIR = DATA_DIR / "clean" / "forecasts" / "openmeteo" / STATION_ID

    forecast_files, df_forecast_om = fetch_openmeteo_historical_forecasts(
        station_id=STATION_ID,
        start_date=FORECAST_START_DATE,
        end_date=forecast_end_date,
        out_raw_dir=FORECAST_RAW_DIR,
        out_parquet_dir=FORECAST_CLEAN_DIR,
        write_raw=True,  # Save raw JSON for debugging
    )

    print(f"\nWrote {len(forecast_files)} files:")
    for path in forecast_files:
        print(f"  - {path}")
else:
    print("Open-Meteo forecast range is empty (dates are pre-2016)")
    df_forecast_om = pd.DataFrame()


[notebook] Adjusting Open-Meteo start date from 2010-01-01 to 2016-01-01 (API limit)
Fetching historical forecasts for KLGA
Date range: 2016-01-01 to 2024-12-31

Wrote 2 files:
  - /Users/kcao/Documents/temp-data-pipeline/data/raw/forecasts/openmeteo/KLGA/historical_2016-01-01_to_2024-12-31.json
  - /Users/kcao/Documents/temp-data-pipeline/data/clean/forecasts/openmeteo/KLGA/historical_2016-01-01_to_2024-12-31.parquet


In [15]:
# Consolidate forecasts (ERA5 + Open-Meteo)
from tempdata.aggregate.build_daily_tmax import build_daily_tmax
from datetime import timezone

forecast_dfs = []

# 1. Add ERA5 data if available (as perfect forecast proxy)
era5_dir = DATA_DIR / "raw" / "era5" / STATION_ID
if era5_dir.exists():
    era5_files = list(era5_dir.glob("*.parquet"))
    if era5_files:
        # Load ERA5 (hourly)
        df_era5_hourly = pd.concat([pd.read_parquet(f) for f in era5_files])
        print(f"Loaded {len(df_era5_hourly)} hourly ERA5 rows")

        # Aggregate to Daily Tmax
        # Note: We rely on STATION_TZ being defined in previous cells (Cell 32)
        if 'STATION_TZ' not in locals():
            STATION_TZ = "America/New_York"
            print(f"Warning: STATION_TZ was not defined, defaulting to {STATION_TZ}")

        df_era5_daily = build_daily_tmax(df_era5_hourly, station_tz=STATION_TZ)
        print(f"Aggregated ERA5 to {len(df_era5_daily)} daily Tmax records")

        # Transform to Forecast Schema
        # We need to replicate the 'lead_hours' structure from the actual forecasts
        # to ensure the model sees consistent features.

        # Get lead hours from Open-Meteo data if available, otherwise default to [28, 29]
        if 'df_forecast_om' in locals() and not df_forecast_om.empty:
            lead_hours_list = sorted(df_forecast_om['lead_hours'].unique())
        else:
            lead_hours_list = [28, 29] # Default for daily tmax (issue ~midnight UTC prev day)

        print(f"Synthesizing ERA5 forecasts for lead hours: {lead_hours_list}")

        era5_forecasts = []
        now_utc = pd.Timestamp.now(tz=timezone.utc)

        # Get station metadata from first row/file if possible, or use STATION_ID
        # The hourly data has lat/lon
        lat = df_era5_hourly['lat'].iloc[0] if not df_era5_hourly.empty else 0.0
        lon = df_era5_hourly['lon'].iloc[0] if not df_era5_hourly.empty else 0.0

        for lead in lead_hours_list:
            # Create a copy for this lead time
            df_lead = df_era5_daily.copy()

            # Map columns
            # daily_tmax has: date_local, tmax_c, tmax_f, coverage_hours, qc_flags, source, updated_at_utc
            # forecast has: station_id, lat, lon, issue_time_utc, target_date_local, tmax_pred_c, tmax_pred_f, lead_hours, model, source, ingested_at_utc

            df_lead['station_id'] = STATION_ID
            df_lead['lat'] = lat
            df_lead['lon'] = lon
            df_lead['target_date_local'] = df_lead['date_local'].dt.tz_localize(None) # Remove tz for schema

            # Calculate issue time: target_date (midnight local) - lead_hours
            # We need to be careful with timezones.
            # The simple approximation:
            # target_date_local (as UTC timestamp representation) - lead hours?
            # No, standard is issue_time_utc.
            # Let's reverse engineering: issue_time + lead_hours = target_date (in station tz)
            # So issue_time = target_date (station tz) - lead_hours

            # df_lead['date_local'] is tz-aware (STATION_TZ)
            df_lead['issue_time_utc'] = df_lead['date_local'] - pd.to_timedelta(lead, unit='h')
            df_lead['issue_time_utc'] = df_lead['issue_time_utc'].dt.tz_convert('UTC')

            df_lead['tmax_pred_c'] = df_lead['tmax_c']
            df_lead['tmax_pred_f'] = df_lead['tmax_f']
            df_lead['lead_hours'] = lead
            df_lead['model'] = 'era5'
            df_lead['source'] = 'era5' # Overwrite 'noaa_isd' from aggregation
            df_lead['ingested_at_utc'] = now_utc

            # Select columns
            from tempdata.schemas.daily_tmax_forecast import DAILY_TMAX_FORECAST_FIELDS
            era5_forecasts.append(df_lead[DAILY_TMAX_FORECAST_FIELDS])

        if era5_forecasts:
            df_era5_final = pd.concat(era5_forecasts, ignore_index=True)
            forecast_dfs.append(df_era5_final)
            print(f"Added {len(df_era5_final)} synthentic ERA5 forecast rows")

# 2. Add Open-Meteo data
if 'df_forecast_om' in locals() and not df_forecast_om.empty:
    forecast_dfs.append(df_forecast_om)
    print(f"Loaded {len(df_forecast_om)} Open-Meteo forecast rows")

if forecast_dfs:
    df_forecast = pd.concat(forecast_dfs, ignore_index=True)
    # Sort for tidiness
    df_forecast.sort_values(['station_id', 'target_date_local', 'lead_hours'], inplace=True)
else:
    print("Warning: No forecast data found. Creating empty DataFrame.")
    from tempdata.schemas.daily_tmax_forecast import DAILY_TMAX_FORECAST_FIELDS
    df_forecast = pd.DataFrame(columns=DAILY_TMAX_FORECAST_FIELDS)

print(f"Final df_forecast: {len(df_forecast)} rows")

Loaded 54000 hourly ERA5 rows
Aggregated ERA5 to 2192 daily Tmax records
Synthesizing ERA5 forecasts for lead hours: [np.int64(28), np.int64(29)]
Added 4384 synthentic ERA5 forecast rows
Loaded 2558 Open-Meteo forecast rows
Final df_forecast: 6942 rows


## Verify Forecast Data

Load and validate the forecast parquet, then display a summary.

In [16]:
# df_forecast is already returned from fetch_openmeteo_historical_forecasts
print(f"Loaded {len(df_forecast)} forecast rows")

# Validate schema (already validated in fetch, but double-check)
validate_daily_tmax_forecast(df_forecast)
print("Schema validation passed")

# Display summary
print(f"\nForecast Summary:")
print(f"  Target dates: {df_forecast['target_date_local'].min().date()} to {df_forecast['target_date_local'].max().date()}")
print(f"  Lead hours range: {df_forecast['lead_hours'].min()} to {df_forecast['lead_hours'].max()}")
print(f"  Tmax (C): {df_forecast['tmax_pred_c'].min():.1f} to {df_forecast['tmax_pred_c'].max():.1f}")
print(f"  Tmax (F): {df_forecast['tmax_pred_f'].min():.1f} to {df_forecast['tmax_pred_f'].max():.1f}")

print("\nForecast Data (first 10 rows):")
print(df_forecast[["target_date_local", "tmax_pred_c", "tmax_pred_f", "lead_hours"]].head(10).to_string(index=False))

Loaded 6942 forecast rows
Schema validation passed

Forecast Summary:
  Target dates: 2009-12-31 to 2024-12-31
  Lead hours range: 28 to 29
  Tmax (C): -12.1 to 38.2
  Tmax (F): 10.2 to 100.8

Forecast Data (first 10 rows):
target_date_local  tmax_pred_c  tmax_pred_f  lead_hours
       2009-12-31     0.515778         32.9          28
       2009-12-31     0.515778         32.9          29
       2010-01-01     3.700043         38.7          28
       2010-01-01     3.700043         38.7          29
       2010-01-02     1.040802         33.9          28
       2010-01-02     1.040802         33.9          29
       2010-01-03    -5.446259         22.2          28
       2010-01-03    -5.446259         22.2          29
       2010-01-04    -0.885223         30.4          28
       2010-01-04    -0.885223         30.4          29


## Feature Engineering for Daily Tmax

Transform forecasts and truth data into a **model-ready training dataset**.

This section uses:
- **Real truth data** (`df_daily`) from the NOAA aggregation step above
- **Real historical forecasts** (`df_forecast`) from the Open-Meteo fetch above

The feature engineering pipeline:
1. Joins forecasts to truth on `(station_id, target_date_local)`
2. Filters low-quality truth days by coverage
3. Adds seasonal encodings: `sin_doy`, `cos_doy`, `month`
4. Computes rolling bias/error statistics: `bias_7d`, `bias_14d`, `bias_30d`, `rmse_14d`, `rmse_30d`, `sigma_lead`

All rolling features use `.shift(1)` to ensure **no lookahead** — each row's features are computed only from prior data.

In [17]:
# Use real data from previous cells:
# - df_daily: NOAA observations aggregated to daily Tmax (truth)
# - df_forecast: Open-Meteo historical forecasts

# Prepare truth data for feature engineering
df_truth_for_features = df_daily.copy()

print(f"Using real truth data: {len(df_truth_for_features)} days")
print(f"  Date range: {df_truth_for_features['date_local'].min().date()} to {df_truth_for_features['date_local'].max().date()}")

# Use the real Open-Meteo historical forecasts
df_forecast_for_features = df_forecast.copy()

print(f"\nUsing real Open-Meteo historical forecasts:")
print(f"  Forecast rows: {len(df_forecast_for_features)}")
print(f"  Lead times: {sorted(df_forecast_for_features['lead_hours'].unique())} hours")
print(f"  Target date range: {df_forecast_for_features['target_date_local'].min().date()} to {df_forecast_for_features['target_date_local'].max().date()}")

Using real truth data: 5718 days
  Date range: 2009-12-31 to 2025-08-26

Using real Open-Meteo historical forecasts:
  Forecast rows: 6942
  Lead times: [np.int64(28), np.int64(29)] hours
  Target date range: 2009-12-31 to 2024-12-31


## Build Training Dataset

Run the feature engineering pipeline to create model-ready features.

In [18]:
from tempdata.features.build_train_daily_tmax import build_train_daily_tmax
from tempdata.schemas.train_daily_tmax import validate_train_daily_tmax, TRAIN_DAILY_TMAX_FIELDS

# Build the training dataset using real truth data + real Open-Meteo forecasts
# This performs: join, seasonal features, rolling bias/error stats, validation
df_train = build_train_daily_tmax(
    forecast_df=df_forecast_for_features,
    truth_df=df_truth_for_features,
    min_coverage_hours=18,  # Filter low-quality truth days
    drop_warmup_nulls=False,  # Keep warm-up rows (they have NaN in rolling features)
    validate=True,
)

print(f"Training dataset: {len(df_train)} rows")
print(f"Columns: {list(df_train.columns)}")
print(f"\nColumn types:")
for col in df_train.columns:
    print(f"  {col}: {df_train[col].dtype}")

Training dataset: 6940 rows
Columns: ['station_id', 'issue_time_utc', 'target_date_local', 'tmax_pred_f', 'lead_hours', 'forecast_source', 'sin_doy', 'cos_doy', 'month', 'bias_7d', 'bias_14d', 'bias_30d', 'rmse_14d', 'rmse_30d', 'sigma_lead', 'tmax_actual_f']

Column types:
  station_id: object
  issue_time_utc: datetime64[ns, UTC]
  target_date_local: datetime64[ns]
  tmax_pred_f: float64
  lead_hours: int64
  forecast_source: object
  sin_doy: float64
  cos_doy: float64
  month: int32
  bias_7d: float64
  bias_14d: float64
  bias_30d: float64
  rmse_14d: float64
  rmse_30d: float64
  sigma_lead: float64
  tmax_actual_f: float64


## Inspect Features

Examine the generated features, focusing on rolling statistics and seasonal encodings.

In [19]:
# Display core features and seasonal encodings
display_cols = [
    "target_date_local", "lead_hours", "tmax_pred_f", "tmax_actual_f",
    "sin_doy", "cos_doy", "month"
]
print("Core Features & Seasonal Encodings (first 10 rows):")
print(df_train[display_cols].head(10).to_string(index=False))

# Display rolling bias/error features
rolling_cols = [
    "target_date_local", "lead_hours", "bias_7d", "bias_14d", "bias_30d",
    "rmse_14d", "rmse_30d", "sigma_lead"
]
print("\n\nRolling Bias & Error Features (rows 10-20, after warm-up):")
print(df_train[rolling_cols].iloc[10:20].to_string(index=False))

Core Features & Seasonal Encodings (first 10 rows):
target_date_local  lead_hours  tmax_pred_f  tmax_actual_f  sin_doy  cos_doy  month
       2010-01-01          28         38.7           39.0 0.017202 0.999852      1
       2010-01-02          28         33.9           32.0 0.034398 0.999408      1
       2010-01-03          28         22.2           23.0 0.051584 0.998669      1
       2010-01-04          28         30.4           30.9 0.068755 0.997634      1
       2010-01-05          28         30.3           30.9 0.085906 0.996303      1
       2010-01-06          28         33.8           34.0 0.103031 0.994678      1
       2010-01-07          28         38.0           37.9 0.120126 0.992759      1
       2010-01-08          28         32.4           34.0 0.137185 0.990545      1
       2010-01-09          28         28.9           30.9 0.154204 0.988039      1
       2010-01-10          28         27.9           28.9 0.171177 0.985240      1


Rolling Bias & Error Features (ro

## Verify No-Lookahead Property

Confirm that rolling features are computed correctly with `.shift(1)` — each row's features should only use prior data.

In [20]:
# Verify no-lookahead: The first row for each (station, lead_hours) group should have NaN bias
# because there's no prior data to compute rolling stats from

first_rows = df_train.groupby(["station_id", "lead_hours"]).first()
print("First row per (station_id, lead_hours) group — bias_7d should be NaN:")
print(first_rows[["bias_7d", "bias_14d", "bias_30d"]].to_string())

# Compute actual residual for a specific row and verify it's NOT in its own bias
# Pick a row after warm-up period
if len(df_train) > 10:
    test_idx = 10
    test_row = df_train.iloc[test_idx]
    actual_residual = test_row["tmax_pred_f"] - test_row["tmax_actual_f"]

    print(f"\n\nVerification for row {test_idx}:")
    print(f"  Actual residual (pred - actual): {actual_residual:.2f}°F")
    print(f"  bias_7d (computed from PRIOR 7 days): {test_row['bias_7d']:.2f}°F")
    print(f"  These should be different values (bias excludes current row)")

    # The bias_7d should NOT equal the current residual (unless by coincidence)
    print(f"\n  Residual == bias_7d? {abs(actual_residual - test_row['bias_7d']) < 0.01}")

First row per (station_id, lead_hours) group — bias_7d should be NaN:
                       bias_7d  bias_14d  bias_30d
station_id lead_hours                             
KLGA       28             -0.3      -0.3      -0.3
           29             -0.3      -0.3      -0.3


Verification for row 10:
  Actual residual (pred - actual): -0.40°F
  bias_7d (computed from PRIOR 7 days): -0.83°F
  These should be different values (bias excludes current row)

  Residual == bias_7d? False


## Analyze Forecast Bias by Lead Time

The rolling bias features capture systematic forecast errors that vary by lead time.

In [21]:
# Compute actual residuals for analysis
df_train["residual"] = df_train["tmax_pred_f"] - df_train["tmax_actual_f"]

# Analyze bias by lead time
print("Forecast Error Analysis by Lead Time:")
print("=" * 60)

bias_by_lead = df_train.groupby("lead_hours").agg({
    "residual": ["mean", "std", "count"],
    "bias_30d": "mean",  # Average of the rolling bias feature
    "sigma_lead": "mean",  # Average sigma_lead
}).round(2)

bias_by_lead.columns = ["Mean Error (°F)", "Std Dev (°F)", "Count", "Avg bias_30d", "Avg sigma_lead"]
print(bias_by_lead.to_string())

print("\n\nKey Insight:")
print("  - sigma_lead captures uncertainty and can be used for confidence intervals")
print("  - bias_30d provides a rolling estimate that adapts to recent forecast performance")

Forecast Error Analysis by Lead Time:
            Mean Error (°F)  Std Dev (°F)  Count  Avg bias_30d  Avg sigma_lead
lead_hours                                                                    
28                    -1.08          1.95   3857         -1.09            1.93
29                    -1.42          1.87   3083         -1.42            1.93


Key Insight:
  - sigma_lead captures uncertainty and can be used for confidence intervals
  - bias_30d provides a rolling estimate that adapts to recent forecast performance


## Save Training Dataset

Write the feature-engineered training dataset to parquet for model training.

In [22]:
from tempdata.features.build_train_daily_tmax import write_train_daily_tmax

# Create output directory
TRAIN_DIR = DATA_DIR / "train" / "daily_tmax" / STATION_ID
TRAIN_DIR.mkdir(parents=True, exist_ok=True)

# Drop rows with NaN rolling features for clean training data
df_train_clean = df_train.dropna(subset=["bias_7d", "bias_14d", "bias_30d", "rmse_14d", "rmse_30d", "sigma_lead"])
print(f"Rows after dropping warm-up NaNs: {len(df_train_clean)} (was {len(df_train)})")

# Select only the schema columns (drop residual which was for analysis)
df_train_final = df_train_clean[TRAIN_DAILY_TMAX_FIELDS].copy()

# Write to parquet
output_path = TRAIN_DIR / "train_daily_tmax.parquet"
write_train_daily_tmax(df_train_final, output_path)

print(f"\nTraining dataset saved to: {output_path}")

Rows after dropping warm-up NaNs: 6936 (was 6940)
[features] wrote 6936 rows to /Users/kcao/Documents/temp-data-pipeline/data/train/daily_tmax/KLGA/train_daily_tmax.parquet

Training dataset saved to: /Users/kcao/Documents/temp-data-pipeline/data/train/daily_tmax/KLGA/train_daily_tmax.parquet


## Feature Engineering Summary

The `train_daily_tmax` dataset is now ready for model training with:

**Input Data:**
- Truth: Real NOAA observations aggregated to daily Tmax
- Forecasts: Real Open-Meteo historical forecasts

**Core Features:**
- `tmax_pred_f`: Raw forecast (the baseline to beat)
- `lead_hours`: Forecast horizon (longer = more uncertainty)
- `forecast_source`: Model identifier for multi-model ensembles

**Seasonal Encodings:**
- `sin_doy`, `cos_doy`: Capture annual temperature cycles
- `month`: Coarse seasonal regime

**Rolling Bias/Error Statistics (key value-add):**
- `bias_7d`, `bias_14d`, `bias_30d`: Recent forecast bias (forecast - observed)
- `rmse_14d`, `rmse_30d`: Recent forecast error magnitude
- `sigma_lead`: Historical uncertainty for this lead time

**Label:**
- `tmax_actual_f`: Observed maximum temperature (ground truth)

In [23]:
# Pipeline complete - summary of outputs
print("Pipeline outputs:")
print(f"  - Raw NOAA hourly data: {OUTPUT_DIR}")
print(f"  - Cleaned hourly observations: {HOURLY_CLEAN_DIR}")
print(f"  - Daily Tmax (truth): {DAILY_TMAX_DIR / f'{STATION_ID}.parquet'}")
print(f"  - Open-Meteo forecasts: {FORECAST_CLEAN_DIR}")
print(f"  - Training dataset: {output_path}")

Pipeline outputs:
  - Raw NOAA hourly data: /Users/kcao/Documents/temp-data-pipeline/data/raw/noaa_hourly/KLGA
  - Cleaned hourly observations: /Users/kcao/Documents/temp-data-pipeline/data/clean/hourly_obs/KLGA
  - Daily Tmax (truth): /Users/kcao/Documents/temp-data-pipeline/data/clean/daily_tmax/KLGA/KLGA.parquet
  - Open-Meteo forecasts: /Users/kcao/Documents/temp-data-pipeline/data/clean/forecasts/openmeteo/KLGA
  - Training dataset: /Users/kcao/Documents/temp-data-pipeline/data/train/daily_tmax/KLGA/train_daily_tmax.parquet
