# 01 Ingest Flood Hazard Feeds (NOAA + NWS)

Stage: `01_ingest_hazard`
Discipline: hazard data generation and normalization.

Outputs written to:
- `JupyterNotebooks/outputs/index_pipeline/01_ingest/noaa_station_catalog.csv`
- `JupyterNotebooks/outputs/index_pipeline/01_ingest/noaa_station_timeseries.csv`
- `JupyterNotebooks/outputs/index_pipeline/01_ingest/flood_station_latest_features.csv`
- `JupyterNotebooks/outputs/index_pipeline/01_ingest/nws_alerts_enriched.csv`


In [None]:
# Cell 1: Setup
import importlib.util
import subprocess
import sys
import logging
import os
import time
from datetime import datetime, timedelta, timezone
from pathlib import Path


def ensure_packages(packages):
    """Install any of *packages* that cannot currently be imported.

    Each entry is used both as the importable module name (checked with
    ``importlib.util.find_spec``) and as the pip distribution name. All
    missing entries are installed with a single quiet ``pip install`` run
    by the current interpreter; nothing is executed when all are present.
    """
    to_install = []
    for name in packages:
        if importlib.util.find_spec(name) is None:
            to_install.append(name)
    if not to_install:
        return
    pip_cmd = [sys.executable, "-m", "pip", "install", "--quiet"] + to_install
    subprocess.check_call(pip_cmd)


# Install the third-party packages this notebook needs (if they are not
# importable yet) before importing them.
ensure_packages(["pandas", "numpy", "requests"])

import numpy as np
import pandas as pd
import requests

# Notebook-wide logging format: "timestamp | LEVEL | message" at INFO level.
logging.basicConfig(level=logging.INFO, format="%(asctime)s | %(levelname)s | %(message)s")
# Named logger used by the helper functions and ingest cell of this stage.
logger = logging.getLogger("index-pipeline-stage01")


def find_repo_root():
    """Locate the repository root by walking upward from the working directory.

    Returns the nearest directory, starting with the resolved current working
    directory itself, that contains a ``JupyterNotebooks`` entry. If no such
    ancestor exists, the resolved working directory is returned.
    """
    start = Path.cwd().resolve()
    candidates = (start, *start.parents)
    return next(
        (folder for folder in candidates if (folder / "JupyterNotebooks").exists()),
        start,
    )


# Resolve where this stage writes its CSV files and make sure the folder exists.
REPO_ROOT = find_repo_root()
OUTPUT_DIR = REPO_ROOT.joinpath("JupyterNotebooks", "outputs", "index_pipeline", "01_ingest")
OUTPUT_DIR.mkdir(exist_ok=True, parents=True)

print("Repo root: " + str(REPO_ROOT))
print("Output dir: " + str(OUTPUT_DIR))

# Use IPython's rich display when available; otherwise fall back to print().
try:
    from IPython.display import display
except ImportError:
    display = print


In [None]:
# Cell 2: Configuration
# Run-time settings. Everything except the state code, the bounding box and the
# API endpoints can be overridden through environment variables.
NOAA_STATE = "PR"
NOAA_PRODUCT = os.environ.get("NOAA_PRODUCT", "water_level")
NOAA_DATUM = os.environ.get("NOAA_DATUM", "MLLW")
NOAA_TIME_ZONE = os.environ.get("NOAA_TIME_ZONE", "gmt")
NOAA_UNITS = os.environ.get("NOAA_UNITS", "metric")
LOOKBACK_HOURS = int(os.environ.get("LOOKBACK_HOURS", "72"))

# Comma-separated station ID lists (empty string means "not set").
RAW_STATION_IDS = os.environ.get("NOAA_STATION_IDS", "").strip()
EXCLUDE_STATION_IDS = os.environ.get("NOAA_EXCLUDE_STATION_IDS", "").strip()
MAX_ACTIVE_STATIONS = int(os.environ.get("MAX_ACTIVE_STATIONS", "100"))
# Whitespace is stripped so values like "true " are still recognized.
REQUIRE_TIDAL_FOR_DATUM = os.environ.get("REQUIRE_TIDAL_FOR_DATUM", "true").strip().lower() in ("1", "true", "yes")

# Size/time guards passed to api_get_json for NOAA requests.
CATALOG_TIMEOUT_SECONDS = int(os.environ.get("CATALOG_TIMEOUT_SECONDS", "90"))
CATALOG_MAX_BYTES = int(os.environ.get("CATALOG_MAX_BYTES", "5000000"))
CATALOG_MAX_ROWS = int(os.environ.get("CATALOG_MAX_ROWS", "5000"))

# Fail fast on settings that would otherwise only surface much later as a
# confusing "No station hazard rows were generated" error.
if LOOKBACK_HOURS <= 0:
    raise ValueError(f"LOOKBACK_HOURS must be a positive integer, got {LOOKBACK_HOURS}")
if MAX_ACTIVE_STATIONS <= 0:
    raise ValueError(f"MAX_ACTIVE_STATIONS must be a positive integer, got {MAX_ACTIVE_STATIONS}")
if NOAA_TIME_ZONE.lower() != "gmt":
    # BEGIN_DATE/END_DATE below are UTC clock times, and the ingest cell parses
    # returned timestamps as UTC, so any other time zone shifts the data.
    logger.warning(
        "NOAA_TIME_ZONE=%s is not 'gmt'; the request window is computed in UTC and "
        "returned timestamps are parsed as UTC, so times will be misaligned.",
        NOAA_TIME_ZONE,
    )

# Geographic clip applied to the station catalog (decimal degrees).
PR_BBOX = {
    "min_lon": -68.5,
    "max_lon": -65.0,
    "min_lat": 17.5,
    "max_lat": 18.9,
}

MDAPI_BASE = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi"
DATAGETTER_URL = "https://api.tidesandcurrents.noaa.gov/api/prod/datagetter"
NWS_ALERTS_URL = "https://api.weather.gov/alerts/active"
NWS_HEADERS = {
    "User-Agent": "gmu-daen-index-pipeline/1.0",
    "Accept": "application/geo+json",
}

# Request window: the last LOOKBACK_HOURS up to "now", formatted for datagetter.
RUN_UTC = datetime.now(timezone.utc)
BEGIN_UTC = RUN_UTC - timedelta(hours=LOOKBACK_HOURS)
BEGIN_DATE = BEGIN_UTC.strftime("%Y%m%d %H:%M")
END_DATE = RUN_UTC.strftime("%Y%m%d %H:%M")

print("Configuration ready")
print(f"  NOAA_DATUM={NOAA_DATUM} LOOKBACK_HOURS={LOOKBACK_HOURS}")


In [None]:
# Cell 3: Helper functions

class _NonRetryableRequestError(RuntimeError):
    """A request failure that retrying cannot fix (client error or oversized body)."""


def _http_error_message(resp):
    """Return a short, best-effort description of an HTTP error response.

    Prefers ``error.message`` (NOAA style), then ``detail``/``title``/``message``
    (problem+json style, e.g. NWS), and finally the first 300 characters of the
    raw body text.
    """
    try:
        payload = resp.json()
    except ValueError:
        payload = None
    if isinstance(payload, dict):
        err = payload.get("error")
        if isinstance(err, dict) and err.get("message"):
            return str(err["message"])
        if isinstance(err, str) and err:
            return err
        for key in ("detail", "title", "message"):
            if payload.get(key):
                return str(payload[key])
    return (resp.text or "")[:300]


def api_get_json(url, params=None, timeout=60, retries=3, backoff_seconds=2, max_bytes=None, headers=None):
    """GET *url* and return the decoded JSON body, retrying transient failures.

    Args:
        url: Endpoint to request.
        params: Optional query-string parameters for ``requests.get``.
        timeout: Per-attempt timeout in seconds.
        retries: Maximum number of attempts; values below 1 are treated as 1.
        backoff_seconds: Base of the exponential back-off; the wait after
            attempt ``k`` is ``backoff_seconds ** k`` seconds.
        max_bytes: Optional cap on the response body size in bytes.
        headers: Optional HTTP headers.

    Returns:
        The parsed JSON payload.

    Raises:
        RuntimeError: when all attempts fail, or right away on a non-retryable
            failure (HTTP 4xx other than 408/429, or a body larger than
            *max_bytes*). The message reports how many attempts were made.
    """
    max_attempts = max(1, int(retries))
    last_err = None
    attempts_made = 0
    for attempt in range(1, max_attempts + 1):
        attempts_made = attempt
        try:
            resp = requests.get(url, params=params, timeout=timeout, headers=headers)
            # An oversized payload will be just as large on every retry.
            if max_bytes is not None and len(resp.content or b"") > max_bytes:
                raise _NonRetryableRequestError(f"Payload exceeded max bytes for {url}")

            if resp.status_code >= 400:
                msg = _http_error_message(resp)
                # 408 (timeout) and 429 (rate limit) are transient; other 4xx are not.
                if 400 <= resp.status_code < 500 and resp.status_code not in (408, 429):
                    raise _NonRetryableRequestError(f"Non-retryable HTTP {resp.status_code}: {msg}")
                resp.raise_for_status()

            return resp.json()
        except _NonRetryableRequestError as exc:
            last_err = exc
            break
        except Exception as exc:
            last_err = exc
            if attempt < max_attempts:
                wait_s = backoff_seconds ** attempt
                logger.warning("Request failed (%s/%s). Retrying in %ss. %s", attempt, max_attempts, wait_s, exc)
                time.sleep(wait_s)
    raise RuntimeError(f"Request failed after {attempts_made} attempt(s): {last_err}") from last_err


def parse_station_csv(value):
    """Split a comma-separated string into a list of trimmed, non-empty tokens.

    *value* is converted with ``str()`` first. Token order and duplicates are
    preserved.
    """
    tokens = (token.strip() for token in str(value).split(","))
    return [token for token in tokens if token]


def get_pr_station_catalog(state="PR"):
    """Fetch the MDAPI station catalog for *state*, clipped to ``PR_BBOX``.

    Returns a DataFrame with the available columns among id, name, state,
    lat, lng, shefcode and tidal. IDs are strings and rows are deduplicated
    and sorted by id. Stations whose coordinates are missing or outside the
    bounding box are dropped.

    Raises:
        RuntimeError: if the catalog is empty or exceeds ``CATALOG_MAX_ROWS``
            (request failures from ``api_get_json`` are RuntimeError too).
    """
    catalog = api_get_json(
        f"{MDAPI_BASE}/stations.json",
        params={"state": state},
        timeout=CATALOG_TIMEOUT_SECONDS,
        max_bytes=CATALOG_MAX_BYTES,
    )
    station_records = catalog.get("stations", [])
    if not station_records:
        raise RuntimeError("No station rows returned from NOAA catalog")
    n_records = len(station_records)
    if n_records > CATALOG_MAX_ROWS:
        raise RuntimeError(f"Catalog row count {n_records} exceeds configured limit {CATALOG_MAX_ROWS}")

    frame = pd.DataFrame(station_records)
    frame["id"] = frame["id"].astype(str)
    for coord in ("lat", "lng"):
        frame[coord] = pd.to_numeric(frame[coord], errors="coerce")

    # Inclusive bounds; NaN coordinates compare False and are dropped.
    in_lon = frame["lng"].between(PR_BBOX["min_lon"], PR_BBOX["max_lon"])
    in_lat = frame["lat"].between(PR_BBOX["min_lat"], PR_BBOX["max_lat"])
    frame = frame.loc[in_lon & in_lat].copy()

    wanted = ("id", "name", "state", "lat", "lng", "shefcode", "tidal")
    columns = [col for col in wanted if col in frame.columns]
    deduped = frame[columns].drop_duplicates(subset=["id"])
    return deduped.sort_values("id").reset_index(drop=True)


def resolve_station_ids(catalog_df):
    """Choose which catalog stations to ingest.

    Selection rules:
      1. If ``RAW_STATION_IDS`` lists IDs, keep the ones present in the catalog,
         in the order given (duplicates removed). If none are present, fall
         back to the whole catalog.
      2. Otherwise use the whole catalog, sorted by ID.
      3. Remove any IDs listed in ``EXCLUDE_STATION_IDS``.
      4. Truncate to ``MAX_ACTIVE_STATIONS``.

    Args:
        catalog_df: DataFrame with an ``id`` column.

    Returns:
        Tuple ``(selected_ids, invalid_requested_ids, sorted_excluded_ids, mode)``,
        where ``mode`` is a human-readable label for the rule that applied.
    """
    catalog_set = set(catalog_df["id"].astype(str))
    catalog_ids = sorted(catalog_set)
    # Deduplicate while preserving order: a repeated ID would otherwise be
    # fetched twice and produce duplicate rows in every output file.
    requested = list(dict.fromkeys(parse_station_csv(RAW_STATION_IDS)))
    excluded = set(parse_station_csv(EXCLUDE_STATION_IDS))
    invalid = [sid for sid in requested if sid not in catalog_set]

    if requested:
        selected = [sid for sid in requested if sid in catalog_set]
        mode = "manual intersect live catalog"
        if not selected:
            logger.warning(
                "None of the requested station IDs %s are in the live catalog; falling back to the full catalog",
                requested,
            )
            selected = catalog_ids
            mode = "auto fallback (full live catalog)"
    else:
        selected = catalog_ids
        mode = "auto (full live catalog)"

    if excluded:
        selected = [sid for sid in selected if sid not in excluded]

    if len(selected) > MAX_ACTIVE_STATIONS:
        logger.warning("Truncating station list %s -> %s due to MAX_ACTIVE_STATIONS", len(selected), MAX_ACTIVE_STATIONS)
        selected = selected[:MAX_ACTIVE_STATIONS]

    return selected, invalid, sorted(excluded), mode


def fetch_station_metadata(station_id):
    """Return the MDAPI metadata record for a single station.

    Requests the ``floodlevels``, ``details`` and ``sensors`` expansions and
    returns the first element of the response's ``stations`` list.

    Raises:
        RuntimeError: if the response contains no station entries (request
            failures from ``api_get_json`` are RuntimeError as well).
    """
    response = api_get_json(
        f"{MDAPI_BASE}/stations/{station_id}.json",
        params={"expand": "floodlevels,details,sensors"},
        timeout=90,
        max_bytes=CATALOG_MAX_BYTES,
    )
    records = response.get("stations", [])
    if records:
        return records[0]
    raise RuntimeError(f"No metadata for station {station_id}")


def fetch_station_series(station_id):
    """Download the configured NOAA product for one station over the run window.

    Uses the module-level ``NOAA_*`` settings and the ``BEGIN_DATE``/``END_DATE``
    window.

    Returns:
        A DataFrame of the raw datagetter rows. Columns ``v`` and ``s`` are
        coerced to numeric when present, and non-numeric entries become NaN.
        If NOAA reports no data for the window, an empty frame with columns
        t/v/s/f/q is returned.

    Raises:
        RuntimeError: for any other API-reported error or request failure.
    """
    payload = api_get_json(
        DATAGETTER_URL,
        params={
            "product": NOAA_PRODUCT,
            "application": "GMU_INDEX_PIPELINE",
            "begin_date": BEGIN_DATE,
            "end_date": END_DATE,
            "datum": NOAA_DATUM,
            "station": station_id,
            "time_zone": NOAA_TIME_ZONE,
            "units": NOAA_UNITS,
            "format": "json",
        },
        timeout=120,
        max_bytes=CATALOG_MAX_BYTES,
    )

    if "error" in payload:
        err = payload["error"]
        # Usually {"error": {"message": ...}}; also accept a bare string so the
        # real message is not hidden behind an AttributeError.
        if isinstance(err, dict):
            msg = str(err.get("message") or "")
        else:
            msg = str(err or "")
        if "No data was found" in msg:
            return pd.DataFrame(columns=["t", "v", "s", "f", "q"])
        raise RuntimeError(msg or f"NOAA datagetter returned an error for station {station_id}")

    # Observation products return rows under "data"; the "predictions"
    # product (selectable via NOAA_PRODUCT) uses a "predictions" key instead.
    rows = payload.get("data") or payload.get("predictions") or []
    if not rows:
        return pd.DataFrame(columns=["t", "v", "s", "f", "q"])

    df = pd.DataFrame(rows)
    for c in ["v", "s"]:
        if c in df.columns:
            df[c] = pd.to_numeric(df[c], errors="coerce")
    return df


def map_alert_score(event, severity):
    """Map an NWS alert to a 0-100 hazard score.

    The event name is checked first, case-insensitively and by substring,
    in this order:

    - "flash flood warning" -> 100
    - "flood warning" -> 70
    - "flood watch" -> 40

    Any other event is scored from its severity: extreme 100, severe 80,
    moderate 60, minor 40, and 25 for "unknown", missing or unrecognized
    values.
    """
    event_lc = (event or "").lower()
    severity_lc = (severity or "").lower()

    for phrase, points in (
        ("flash flood warning", 100),
        ("flood warning", 70),
        ("flood watch", 40),
    ):
        if phrase in event_lc:
            return points

    severity_points = {
        "extreme": 100,
        "severe": 80,
        "moderate": 60,
        "minor": 40,
        "unknown": 25,
    }
    return severity_points.get(severity_lc, 25)


def normalize_linear(value, low, high):
    """Rescale *value* linearly from [low, high] onto a 0-100 score.

    Results are clamped to the 0-100 range. NaN is returned when *value* is
    missing (None/NaN) or when the range is empty or inverted (high <= low).
    """
    if pd.isna(value) or high <= low:
        return np.nan
    fraction = (value - low) / (high - low)
    return float(np.clip(fraction, 0, 1) * 100)


In [None]:
# Cell 4: Ingest NOAA + NWS and build flood hazard features


def _first_numeric(mapping, *keys):
    """Return the first value among *keys* in *mapping* that parses as a number.

    Unlike ``mapping.get(a) or mapping.get(b)``, a legitimate 0.0 value is
    kept rather than treated as missing. Returns NaN if no key yields a number.
    """
    for key in keys:
        value = pd.to_numeric(mapping.get(key), errors="coerce")
        if pd.notna(value):
            return value
    return np.nan


catalog_pull_utc = datetime.now(timezone.utc)
station_catalog_df = get_pr_station_catalog(NOAA_STATE)

# For any datum other than STND, keep only stations the catalog flags as tidal
# (the pipeline assumes non-tidal stations cannot serve tidal datums like MLLW).
if REQUIRE_TIDAL_FOR_DATUM and NOAA_DATUM.upper() != "STND" and "tidal" in station_catalog_df.columns:
    pre = len(station_catalog_df)
    station_catalog_df["tidal"] = station_catalog_df["tidal"].fillna(False).astype(bool)
    station_catalog_df = station_catalog_df[station_catalog_df["tidal"]].copy()
    logger.info("Datum compatibility filter: %s -> %s stations", pre, len(station_catalog_df))

station_ids, invalid_requested, excluded_applied, selection_mode = resolve_station_ids(station_catalog_df)
print(f"Selection mode: {selection_mode}")
print(f"Selected stations: {len(station_ids)}")
if invalid_requested:
    print(f"Ignored station IDs: {invalid_requested}")
if excluded_applied:
    print(f"Excluded station IDs: {excluded_applied}")

station_rows = []
timeseries_frames = []

for sid in station_ids:
    # Per-station failures are logged and skipped so that one bad station does
    # not abort the whole ingest.
    try:
        meta = fetch_station_metadata(sid)
        ts = fetch_station_series(sid)
    except Exception as exc:
        logger.warning("Skipping station %s: %s", sid, exc)
        continue

    if ts.empty:
        logger.info("No timeseries rows for station %s", sid)
        continue

    # NOTE(review): "t" is parsed as UTC, which is only correct when
    # NOAA_TIME_ZONE is "gmt".
    ts["time_utc"] = pd.to_datetime(ts["t"], utc=True, errors="coerce")
    # Duplicate timestamps would make the two-sample rise rate divide by ~0.
    ts = (
        ts.dropna(subset=["time_utc"])
        .sort_values("time_utc")
        .drop_duplicates(subset=["time_utc"], keep="last")
        .copy()
    )
    if ts.empty:
        # Every timestamp failed to parse; iloc[-1] below would raise IndexError.
        logger.info("No parseable timestamps for station %s", sid)
        continue

    lat = pd.to_numeric(meta.get("lat"), errors="coerce")
    lon = pd.to_numeric(meta.get("lng"), errors="coerce")
    if pd.isna(lat) or pd.isna(lon):
        logger.warning("Skipping station %s: missing coordinates in metadata", sid)
        continue

    flood = meta.get("floodlevels") or {}
    # NOTE(review): confirm the MDAPI flood levels use the same units/datum as
    # the series requested via NOAA_UNITS / NOAA_DATUM; exceed_ratio assumes so.
    minor = _first_numeric(flood, "nos_minor", "action")
    moderate = _first_numeric(flood, "nos_moderate")
    major = _first_numeric(flood, "nos_major")

    # Use the most recent rows that actually carry a value (non-numeric "v"
    # entries were coerced to NaN), so a single blank trailing reading does not
    # zero the sensor score. Fall back to all rows when none carry a value.
    valued = ts[ts["v"].notna()] if "v" in ts.columns else ts.iloc[0:0]
    basis = valued if not valued.empty else ts
    latest = basis.iloc[-1]
    prev = basis.iloc[-2] if len(basis) > 1 else latest
    latest_value = pd.to_numeric(latest.get("v"), errors="coerce")
    prev_value = pd.to_numeric(prev.get("v"), errors="coerce")

    # Rise rate from the last two samples; the 1e-6 floor avoids division by zero.
    delta_hours = max((latest["time_utc"] - prev["time_utc"]).total_seconds() / 3600.0, 1e-6)
    rise_rate = (latest_value - prev_value) / delta_hours if pd.notna(latest_value) and pd.notna(prev_value) else np.nan

    # 0 at the minor threshold, 1 at the major threshold.
    exceed_ratio = np.nan
    if pd.notna(minor) and pd.notna(major) and major > minor and pd.notna(latest_value):
        exceed_ratio = (latest_value - minor) / (major - minor)

    exceed_score = normalize_linear(exceed_ratio, 0.0, 1.0)
    # A rise of 0.30 (NOAA_UNITS) per hour or more saturates the rise score.
    rise_score = normalize_linear(rise_rate, 0.0, 0.30)
    sensor_hazard_score = np.nanmean([exceed_score, rise_score]) if (pd.notna(exceed_score) or pd.notna(rise_score)) else 0.0

    ts = ts.rename(columns={"v": "water_level", "s": "sigma", "q": "quality", "f": "flags_raw"})
    # Not every product returns sigma/flags/quality; add any missing column so
    # the fixed output schema below never raises KeyError.
    for col in ("water_level", "sigma", "quality", "flags_raw"):
        if col not in ts.columns:
            ts[col] = np.nan
    ts["flags"] = ts["flags_raw"].fillna("").astype(str)
    ts["station_id"] = sid
    ts["station_name"] = meta.get("name", sid)
    ts["lat"] = float(lat)
    ts["lon"] = float(lon)
    ts["minor"] = minor
    ts["moderate"] = moderate
    ts["major"] = major
    ts["datum"] = NOAA_DATUM
    ts["units"] = NOAA_UNITS
    ts["run_utc"] = RUN_UTC.isoformat()

    keep_cols = [
        "station_id", "station_name", "time_utc", "water_level", "sigma", "flags", "quality",
        "lat", "lon", "minor", "moderate", "major", "datum", "units", "run_utc"
    ]
    timeseries_frames.append(ts[keep_cols])

    station_rows.append({
        "station_id": sid,
        "station_name": meta.get("name", sid),
        "shefcode": meta.get("shefcode"),
        "lat": float(lat),
        "lon": float(lon),
        "latest_time_utc": latest["time_utc"],
        "latest_water_level": latest_value,
        "rise_rate_per_hour": rise_rate,
        "minor": minor,
        "moderate": moderate,
        "major": major,
        "exceed_ratio": exceed_ratio,
        "exceed_score": exceed_score,
        "rise_score": rise_score,
        "sensor_hazard_score": float(sensor_hazard_score),
    })

if not station_rows:
    raise RuntimeError("No station hazard rows were generated. Check NOAA API settings.")

station_latest_df = pd.DataFrame(station_rows)
noaa_timeseries_df = pd.concat(timeseries_frames, ignore_index=True)

# NWS alerts ingest (a failure here stops the stage rather than silently
# reporting "no alerts").
alerts_payload = api_get_json(
    NWS_ALERTS_URL,
    params={"area": "PR"},
    timeout=60,
    retries=2,
    max_bytes=CATALOG_MAX_BYTES,
    headers=NWS_HEADERS,
)

alert_rows = []
for feat in alerts_payload.get("features") or []:
    props = (feat or {}).get("properties") or {}
    event = props.get("event")
    severity = props.get("severity")
    score = map_alert_score(event, severity)
    geocodes = props.get("geocode") or {}
    ugc = geocodes.get("UGC") or []
    area_desc = props.get("areaDesc")

    alert_rows.append({
        "id": props.get("id"),
        "event": event,
        "severity": severity,
        "urgency": props.get("urgency"),
        "certainty": props.get("certainty"),
        "headline": props.get("headline"),
        "sent": props.get("sent"),
        "onset": props.get("onset"),
        "ends": props.get("ends"),
        "status": props.get("status"),
        "area_desc": area_desc,
        "ugc_list": ",".join(ugc) if isinstance(ugc, list) else str(ugc),
        "alert_score": score,
    })

nws_alerts_df = pd.DataFrame(alert_rows)
# Only operational alerts drive the hazard score. CAP also defines
# Exercise/System/Test/Draft messages. A missing status is treated as "Actual".
# NOTE(review): non-flood events are also scored (by severity) in
# map_alert_score, so they too can raise this territory-wide score.
if nws_alerts_df.empty:
    global_alert_score = 0.0
else:
    is_actual = nws_alerts_df["status"].fillna("Actual").astype(str).str.lower().eq("actual")
    global_alert_score = float(nws_alerts_df.loc[is_actual, "alert_score"].max()) if is_actual.any() else 0.0

station_latest_df["nws_global_alert_score"] = global_alert_score
# Final hazard = max(sensor score clipped to 0-100, territory-wide alert score).
# Rows whose sensor component is NaN take the alert score.
sensor_component = station_latest_df["sensor_hazard_score"].clip(0, 100)
station_latest_df["flood_hazard_final"] = sensor_component.where(sensor_component >= global_alert_score, global_alert_score)
station_latest_df["catalog_pull_utc"] = catalog_pull_utc.isoformat()

# Save outputs
station_catalog_out = OUTPUT_DIR / "noaa_station_catalog.csv"
noaa_series_out = OUTPUT_DIR / "noaa_station_timeseries.csv"
station_latest_out = OUTPUT_DIR / "flood_station_latest_features.csv"
nws_alerts_out = OUTPUT_DIR / "nws_alerts_enriched.csv"

station_catalog_df.to_csv(station_catalog_out, index=False)
noaa_timeseries_df.to_csv(noaa_series_out, index=False)
station_latest_df.to_csv(station_latest_out, index=False)
nws_alerts_df.to_csv(nws_alerts_out, index=False)

print("Stage 01 outputs:")
print(f"  {station_catalog_out}")
print(f"  {noaa_series_out}")
print(f"  {station_latest_out}")
print(f"  {nws_alerts_out}")

print("\nPreview flood station features:")
display(station_latest_df.head(10))
