# 1a — Fetch streamflow for ausvic gauges

Fetches daily mean flow (ML/day) from both data providers for all 12 Maribyrnong gauges,
converts to mm/day, and writes per-gauge CSVs ready for the Part 2 postprocessing notebook.

**Run this notebook locally** (no Google Earth Engine required).

## What it does

1. Loads gauge config from `caravan_maribyrnong_gee/gauges_ausvic.json` (same file as 0a / 0b)
2. Fetches 7 Melbourne Water gauges from `api.melbournewater.com.au`
3. Fetches 5 Victorian Water gauges from `data.water.vic.gov.au` (Hydstra API)
4. Converts ML/day → mm/day using `area_km2` from the JSON
5. Drops negative flows and bad-quality records, then sorts chronologically and deduplicates by date
6. Saves one CSV per gauge to `caravan_maribyrnong/timeseries/csv/ausvic/{gauge_id}.csv`

## Output CSV format

Each file has two columns: `date` (YYYY-MM-DD) and `streamflow` (mm/day, empty string for missing).
The date index is **daily** with no gaps — required by the Part 2 validation check.
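The "no gaps" requirement can be checked with a few lines of pandas. The sketch below is illustrative only: `sample_csv` is a made-up three-day file in the format described above, and `is_gapless_daily` is a hypothetical helper, not the actual Part 2 validation check.

```python
import io

import pandas as pd

# Hypothetical sample in the output format; the middle day has a missing value.
sample_csv = "date,streamflow\n2020-01-01,0.1234\n2020-01-02,\n2020-01-03,0.0500\n"


def is_gapless_daily(df):
    """Return True if the 'date' column covers every calendar day exactly once, in order."""
    dates = pd.to_datetime(df["date"])
    expected = pd.date_range(dates.min(), dates.max(), freq="D")
    return bool(len(dates) == len(expected)
                and (dates.values == expected.values).all())


df = pd.read_csv(io.StringIO(sample_csv))
print(is_gapless_daily(df))          # True: a missing value is still a row
print(is_gapless_daily(df.drop(1)))  # False: a missing row is a gap
```

The distinction matters: a day with no measurement must still appear as a row with an empty `streamflow` field, rather than being left out of the file.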

## Station ID mapping

| gauge_id | Station ID | API |
|----------|-----------|-----|
| ausvic_230119 | 230119A | Melbourne Water |
| ausvic_230100 | 230100A | Melbourne Water |
| ausvic_230102 | 230102A | Melbourne Water |
| ausvic_230211 | 230211A | Melbourne Water |
| ausvic_230107 | 230107A | Melbourne Water |
| ausvic_230237 | 230237A | Melbourne Water |
| ausvic_230106 | 230106A | Melbourne Water |
| ausvic_230200 | 230200  | Victorian Water |
| ausvic_230206 | 230206  | Victorian Water |
| ausvic_230202 | 230202  | Victorian Water |
| ausvic_230213 | 230213  | Victorian Water |
| ausvic_230227 | 230227  | Victorian Water |

In [1]:
import json
import time
import urllib.parse
import urllib.request
from datetime import date, timedelta
from pathlib import Path

import pandas as pd

# ── Resolve project root ───────────────────────────────────────────────────
# Works whether the kernel CWD is the project root OR the notebooks/ folder.
_cwd = Path('.').resolve()
PROJECT_ROOT = _cwd.parent if _cwd.name == 'notebooks' else _cwd

# ── Paths ──────────────────────────────────────────────────────────────────
GAUGES_JSON = PROJECT_ROOT / 'caravan_maribyrnong_gee' / 'gauges_ausvic.json'
OUT_DIR     = PROJECT_ROOT / 'caravan_maribyrnong' / 'timeseries' / 'csv' / 'ausvic'
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ── Load gauge config ─────────────────────────────────────────────────────
with open(GAUGES_JSON) as f:
    GAUGES = json.load(f)

# Station ID mapping: gauge_id → (raw station ID, API source)
# Melbourne Water station IDs have a trailing letter (always 'A' in this network).
STATION_IDS = {
    'ausvic_230119': ('230119A', 'melbwater'),
    'ausvic_230100': ('230100A', 'melbwater'),
    'ausvic_230102': ('230102A', 'melbwater'),
    'ausvic_230211': ('230211A', 'melbwater'),
    'ausvic_230107': ('230107A', 'melbwater'),
    'ausvic_230237': ('230237A', 'melbwater'),
    'ausvic_230106': ('230106A', 'melbwater'),
    'ausvic_230200': ('230200',  'hydstra'),
    'ausvic_230206': ('230206',  'hydstra'),
    'ausvic_230202': ('230202',  'hydstra'),
    'ausvic_230213': ('230213',  'hydstra'),
    'ausvic_230227': ('230227',  'hydstra'),
}

# Earliest date to keep per gauge (inclusive). Rows before this date are dropped.
# Rationale for 230202: Hydstra returns data from 1908 but values 1908-1959 are
# ~0.25 ML/day artefacts (level-derived, no proper rating curve). The gauge was
# properly operational from 1960. Including pre-1960 data would mislead flood
# models (ERA5 forcing exists 1950-1960 but streamflow is near-zero artefact).
FETCH_START = {
    'ausvic_230202': '1960-01-01',
}

print(f'Project root : {PROJECT_ROOT}')
print(f'Loaded {len(GAUGES)} gauges from {GAUGES_JSON}')
print(f'Output dir   : {OUT_DIR}')
print(f'fetch_start overrides: {FETCH_START}')

Project root : C:\Users\leela\FloodHubMaribyrnong
Loaded 12 gauges from C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong_gee\gauges_ausvic.json
Output dir   : C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic
fetch_start overrides: {'ausvic_230202': '1960-01-01'}


## Melbourne Water API

Endpoint: `https://api.melbournewater.com.au/rainfall-river-level/{station_id}/river-flow/daily/range`  
Parameters: `fromDate=YYYY-MM-DD&toDate=YYYY-MM-DD`  
Response key: `dailyRiverFlowsData` → list of `{dateTime, meanRiverFlow}`

Fetched in 5-year chunks (the API may impose request limits).
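The chunking can be looked at in isolation. The sketch below mirrors the `yr + 4` stepping used by `fetch_mw_flow`, but `year_chunks` is a hypothetical helper written for illustration, and the fixed "today" of 2026-02-28 is an assumption taken from the last date in the captured output.

```python
from datetime import date


def year_chunks(start_year, today, span=5):
    """Yield (fromDate, toDate) ISO strings covering start_year..today in blocks of `span` years."""
    yr = start_year
    while yr <= today.year:
        yr_end = min(yr + span - 1, today.year)
        # The final block ends today rather than on 31 December.
        to_dt = f"{yr_end}-12-31" if yr_end < today.year else today.isoformat()
        yield f"{yr}-01-01", to_dt
        yr = yr_end + 1


chunks = list(year_chunks(1990, date(2026, 2, 28)))
print(len(chunks))   # 8
print(chunks[0])     # ('1990-01-01', '1994-12-31')
print(chunks[-1])    # ('2025-01-01', '2026-02-28')
```

Each tuple maps onto the `fromDate` and `toDate` query parameters of one request, so a fetch starting in 1990 needs eight requests per station.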

In [2]:
MELBWATER_BASE = 'https://api.melbournewater.com.au/rainfall-river-level'

MW_HEADERS = {
    'User-Agent': 'Mozilla/5.0',
    'Accept':     'application/json',
    'Origin':     'https://www.melbournewater.com.au',
    'Referer':    'https://www.melbournewater.com.au/',
}

def fetch_mw_flow(station_id, start_year=1990):
    """
    Fetch all available daily flow data for a Melbourne Water station.

    Returns a list of (iso_date_str, ml_per_day) tuples.
    Filters out negative flows.
    dateTime field is Melbourne local time 'YYYY-MM-DD HH:MM:SS' — date part only.
    """
    rows = []
    today = date.today()
    yr = start_year

    while yr <= today.year:
        yr_end = min(yr + 4, today.year)  # 5-year chunks
        from_dt = f'{yr}-01-01'
        to_dt   = f'{yr_end}-12-31' if yr_end < today.year else today.isoformat()

        url = (f'{MELBWATER_BASE}/{station_id}/river-flow/daily/range'
               f'?fromDate={from_dt}&toDate={to_dt}')
        req = urllib.request.Request(url, headers=MW_HEADERS)
        try:
            with urllib.request.urlopen(req, timeout=30) as resp:
                data = json.loads(resp.read().decode())
        except Exception as e:
            print(f'    [warn] {station_id} {from_dt}–{to_dt}: {e}')
            yr = yr_end + 1
            time.sleep(0.5)
            continue

        for rec in data.get('dailyRiverFlowsData', []):
            flow = rec.get('meanRiverFlow')
            dt   = rec.get('dateTime', '')
            if flow is None or not dt:
                continue
            if float(flow) < 0:
                continue
            rows.append((dt[:10], float(flow)))

        yr = yr_end + 1
        time.sleep(0.3)

    return rows

print('Melbourne Water fetch function defined.')

Melbourne Water fetch function defined.


## Victorian Water / Hydstra API

Endpoint: `https://data.water.vic.gov.au/cgi/webservice.exe`  
Function: `get_ts_traces`, variables `varfrom=100.00` → `varto=141.00` (discharge in ML/day), datasource `PUBLISH`  
Response: `return.traces[0].trace` → list of `{t, v, q}`  
Quality flag 255 = bad data — filtered out.
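To make the filtering rules concrete, here is a minimal sketch that parses a hand-written payload. The `sample` dictionary is hypothetical: it only follows the `return.traces[0].trace` shape described above and is not a recorded API response. `parse_trace` is an illustrative helper name.

```python
# Hypothetical payload following the documented shape; values are made up.
sample = {"return": {"traces": [{"trace": [
    {"t": 20200101000000, "v": "12.5", "q": 2},    # kept
    {"t": 20200102000000, "v": "3.0",  "q": 255},  # dropped: bad-quality flag
    {"t": 20200103000000, "v": "-1.0", "q": 2},    # dropped: negative flow
    {"t": 20200104000000, "v": "",     "q": 2},    # dropped: empty value
]}]}}


def parse_trace(payload):
    """Return (iso_date, ml_per_day) tuples, skipping bad-quality, empty and negative points."""
    traces = payload.get("return", {}).get("traces", [])
    if not traces:
        return []
    rows = []
    for pt in traces[0].get("trace", []):
        if pt.get("q") == 255 or pt.get("v") in ("", None):
            continue
        value = float(pt["v"])
        ts = str(pt["t"])  # YYYYMMDD[HHMMSS]; only the date part is used
        if value < 0 or len(ts) < 8:
            continue
        rows.append((f"{ts[:4]}-{ts[4:6]}-{ts[6:8]}", value))
    return rows


parsed = parse_trace(sample)
print(parsed)  # [('2020-01-01', 12.5)]
```

Dropped points are not lost days: they show up later as missing values once the series is reindexed to a full daily calendar.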

In [3]:
HYDSTRA_BASE = 'https://data.water.vic.gov.au/cgi/webservice.exe'

def fetch_vw_flow(station_id):
    """
    Fetch all available daily mean discharge (141.00 = ML/day) for a Victorian Water station.

    Returns a list of (iso_date_str, ml_per_day) tuples.
    Filters quality flag 255 (bad data) and negative values.
    Timestamp field t is YYYYMMDD[HHMMSS] — date part is first 8 chars.
    """
    params = {
        'function':   'get_ts_traces',
        'version':    '2',
        'site_list':  station_id,
        'datasource': 'PUBLISH',
        'varfrom':    '100.00',
        'varto':      '141.00',
        'start_time': '19000101000000',
        'end_time':   date.today().strftime('%Y%m%d235959'),
        'interval':   'day',
        'multiplier': '1',
        'data_type':  'mean',
    }

    url  = f'{HYDSTRA_BASE}?{urllib.parse.urlencode(params)}'
    with urllib.request.urlopen(url, timeout=60) as resp:
        data = json.loads(resp.read().decode())

    traces = data.get('return', {}).get('traces', [])
    if not traces:
        print(f'  [warn] No traces returned for {station_id}')
        return []

    rows = []
    for pt in traces[0].get('trace', []):
        if pt.get('q') == 255 or pt.get('v') in ('', None):
            continue
        raw = float(pt['v'])
        if raw < 0:
            continue
        ts = str(pt['t'])
        if len(ts) < 8:
            continue
        rows.append((f'{ts[:4]}-{ts[4:6]}-{ts[6:8]}', raw))

    return rows

print('Victorian Water fetch function defined.')

Victorian Water fetch function defined.


## Build per-gauge daily DataFrames

For each gauge:
1. Fetch raw ML/day from the appropriate API
2. Sort by date, deduplicate (keep the first record per date), and drop rows before any `FETCH_START` cutoff
3. Convert ML/day → mm/day: `flow_mm = flow_ML / area_km2`
4. Build a complete daily date index (no gaps) — required by Part 2
5. Missing days get empty string (Caravan convention)
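The conversion in step 3 works because the unit factors cancel: 1 ML is 1,000 m³ and 1 km² is 1,000,000 m², so dividing gives a depth of 10⁻³ m, which is 1 mm. The sketch below spells that out step by step. The function name is hypothetical, the flow value is invented, and the area is the 226.1 km² printed for `ausvic_230119` in the summary table.

```python
import math


def ml_per_day_to_mm_per_day(flow_ml, area_km2):
    """Convert a daily volume (ML/day) over a catchment (km2) to a runoff depth (mm/day)."""
    volume_m3 = flow_ml * 1_000.0        # 1 ML = 1,000 m3
    area_m2 = area_km2 * 1_000_000.0     # 1 km2 = 1,000,000 m2
    depth_m = volume_m3 / area_m2        # metres of water spread over the catchment
    return depth_m * 1_000.0             # metres -> millimetres


flow_ml = 50.0     # illustrative value, not measured data
area_km2 = 226.1   # area shown for ausvic_230119 in the summary table

explicit = ml_per_day_to_mm_per_day(flow_ml, area_km2)
shortcut = flow_ml / area_km2            # the one-line form used in the notebook

print(math.isclose(explicit, shortcut))  # True
```

A handy sanity check: a flow in ML/day numerically equal to the catchment area in km² corresponds to 1 mm/day of runoff.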

In [4]:
def build_daily_series(rows_ml, area_km2, fetch_start=None):
    """
    Given a list of (iso_date, ml_per_day) tuples, build a complete daily
    pandas Series in mm/day with no date gaps.

    fetch_start: optional 'YYYY-MM-DD' string — rows before this date are dropped.
    Missing days → NaN (written as empty string to CSV).
    Deduplicates: keeps first occurrence after chronological sort.
    """
    if not rows_ml:
        return pd.Series(dtype=float, name='streamflow')

    df = (pd.DataFrame(rows_ml, columns=['date', 'flow_ml'])
            .assign(date=lambda d: pd.to_datetime(d['date']))
            # Stable sort so that 'keep first' means the first record fetched for that date
            .sort_values('date', kind='stable')
            .drop_duplicates('date', keep='first')
            .set_index('date'))

    # Apply fetch_start cutoff if specified
    if fetch_start:
        df = df.loc[fetch_start:]

    if df.empty:
        return pd.Series(dtype=float, name='streamflow')

    # Convert ML/day → mm/day  (ML/day ÷ area_km2 = mm/day)
    df['streamflow'] = df['flow_ml'] / area_km2

    # Reindex to a gapless daily spine
    full_idx = pd.date_range(df.index.min(), df.index.max(), freq='D')
    series = df['streamflow'].reindex(full_idx)
    series.index.name = 'date'
    series.name = 'streamflow'
    return series


results = {}  # gauge_id → pd.Series

for g in GAUGES:
    gid      = g['gauge_id']
    area     = g['area_km2']
    sid, src = STATION_IDS[gid]
    start    = FETCH_START.get(gid)

    print(f'  Fetching {gid} ({sid}, {src}) ...', end=' ', flush=True)

    try:
        if src == 'melbwater':
            rows = fetch_mw_flow(sid, start_year=1990)
        else:
            rows = fetch_vw_flow(sid)
    except Exception as e:
        print(f'ERROR: {e}')
        results[gid] = pd.Series(dtype=float, name='streamflow')
        continue

    series = build_daily_series(rows, area, fetch_start=start)
    results[gid] = series

    n_valid   = series.notna().sum()
    n_total   = len(series)
    pct_miss  = 100 * series.isna().sum() / n_total if n_total > 0 else 0
    date_rng  = f'{series.index.min().date()} – {series.index.max().date()}' if n_total > 0 else 'no data'
    flag      = f'  [start clipped to {start}]' if start else ''
    print(f'{n_valid:>6} valid days / {n_total} total | {pct_miss:.1f}% missing | {date_rng}{flag}')

  Fetching ausvic_230119 (230119A, melbwater) ...   8094 valid days / 10749 total | 24.7% missing | 1996-09-25 – 2026-02-28
  Fetching ausvic_230100 (230100A, melbwater) ...   8094 valid days / 10749 total | 24.7% missing | 1996-09-25 – 2026-02-28
  Fetching ausvic_230102 (230102A, melbwater) ...   8094 valid days / 10749 total | 24.7% missing | 1996-09-25 – 2026-02-28
  Fetching ausvic_230211 (230211A, melbwater) ...   6504 valid days / 6507 total | 0.0% missing | 2008-05-07 – 2026-02-28
  Fetching ausvic_230107 (230107A, melbwater) ...   8078 valid days / 10749 total | 24.8% missing | 1996-09-25 – 2026-02-28
  Fetching ausvic_230237 (230237A, melbwater) ...   6611 valid days / 6702 total | 1.4% missing | 2007-10-25 – 2026-02-28
  Fetching ausvic_230200 (230200, hydstra) ...  36490 valid days / 43126 total | 15.4% missing | 1908-02-02 – 2026-02-27
  Fetching ausvic_230106 (230106A, melbwater) ...   8078 valid days / 10749 total | 24.8% missing | 1996-09-25 – 2026-02-28
  Fetching ausv

## Save CSVs

Each CSV: `date` (index), `streamflow` (mm/day or empty string for missing).  
The Part 2 notebook reads these with `pd.to_numeric(..., errors='coerce')` which converts
empty strings back to NaN.
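The empty-string round trip can be demonstrated in memory. This is a sketch under assumptions: the three-day series is invented, and the `read_csv` options (`dtype=str`, `keep_default_na=False`) are chosen here only so that the empty field reaches `pd.to_numeric` as a literal empty string. The Part 2 notebook may read the file differently.

```python
import io

import pandas as pd

# Invented three-day series with one missing day.
series = pd.Series(
    [0.1234, float("nan"), 0.05],
    index=pd.date_range("2020-01-01", periods=3, freq="D", name="date"),
    name="streamflow",
)

# Same NaN -> '' formatting as the save cell.
out = series.to_frame()
out["streamflow"] = out["streamflow"].map(
    lambda x: "" if pd.isna(x) else f"{x:.4f}"
)

buf = io.StringIO()
out.to_csv(buf)
lines = buf.getvalue().splitlines()
print(lines)  # ['date,streamflow', '2020-01-01,0.1234', '2020-01-02,', '2020-01-03,0.0500']

# Read back as raw strings, then coerce: '' becomes NaN again.
back = pd.read_csv(io.StringIO(buf.getvalue()), dtype=str, keep_default_na=False)
back["streamflow"] = pd.to_numeric(back["streamflow"], errors="coerce")
print(back["streamflow"].isna().tolist())  # [False, True, False]
```

Formatting with a fixed `.4f` keeps every value at four decimal places, so the files stay byte-stable between runs when the upstream data has not changed.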

In [5]:
saved = []

for gid, series in results.items():
    if len(series) == 0:
        print(f'  [skip] {gid} — no data fetched')
        continue

    # Convert NaN → empty string (Caravan missing-value convention)
    out = series.round(4).to_frame()
    out['streamflow'] = out['streamflow'].map(
        lambda x: '' if pd.isna(x) else f'{x:.4f}'
    )

    out_path = OUT_DIR / f'{gid}.csv'
    out.to_csv(out_path)
    saved.append(gid)
    print(f'  Saved {out_path}  ({len(out)} rows)')

print(f'\n{len(saved)} / {len(GAUGES)} gauges saved.')

  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230119.csv  (10749 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230100.csv  (10749 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230102.csv  (10749 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230211.csv  (6507 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230107.csv  (10749 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230237.csv  (6702 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230200.csv  (43126 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\timeseries\csv\ausvic\ausvic_230106.csv  (10749 rows)
  Saved C:\Users\leela\FloodHubMaribyrnong\caravan_maribyrnong\tim

## Summary

In [6]:
print(f'{"gauge_id":<22} {"station":>8}  {"source":<12} {"area km2":>9}  {"valid days":>10}  {"from":>12} {"to":>12}  {"% missing":>10}')
print('-' * 110)

for g in GAUGES:
    gid      = g['gauge_id']
    area     = g['area_km2']
    sid, src = STATION_IDS[gid]
    series   = results.get(gid, pd.Series(dtype=float))

    if len(series) == 0:
        print(f'{gid:<22} {sid:>8}  {src:<12} {area:>9.1f}  {"NO DATA":>10}')
        continue

    n_valid  = int(series.notna().sum())
    n_total  = len(series)
    pct_miss = 100 * series.isna().sum() / n_total
    d_from   = str(series.index.min().date())
    d_to     = str(series.index.max().date())

    print(f'{gid:<22} {sid:>8}  {src:<12} {area:>9.1f}  {n_valid:>10}  {d_from:>12} {d_to:>12}  {pct_miss:>9.1f}%')

print()
print('NOTE: Chifley Drive (230106A) is tidal — expect many missing days.')
print('NOTE: Keilor (230200) should have records from 1907.')

gauge_id                station  source        area km2  valid days          from           to   % missing
--------------------------------------------------------------------------------------------------------------
ausvic_230119           230119A  melbwater        226.1        8094    1996-09-25   2026-02-28       24.7%
ausvic_230100           230100A  melbwater        481.5        8094    1996-09-25   2026-02-28       24.7%
ausvic_230102           230102A  melbwater        857.5        8094    1996-09-25   2026-02-28       24.7%
ausvic_230211           230211A  melbwater         94.9        6504    2008-05-07   2026-02-28        0.0%
ausvic_230107           230107A  melbwater        618.0        8078    1996-09-25   2026-02-28       24.8%
ausvic_230237           230237A  melbwater       1278.1        6611    2007-10-25   2026-02-28        1.4%
ausvic_230200            230200  hydstra         1305.4       36490    1908-02-02   2026-02-27       15.4%
ausvic_230106           230106A  

## Next step

Run **`2-Caravan_part2_local_postprocessing_ausvic.ipynb`** to merge these streamflow CSVs
with the ERA5-Land forcing data from the `caravan_maribyrnong_gee/batch*.csv` files.