In [1]:
# ============================================================
# 0) Config
# ============================================================

import os
from pathlib import Path

# Public OSN (S3-compatible) endpoint and bucket that host the hackathon inputs.
OSN_ENDPOINT_URL = "https://nyu1.osn.mghpcc.org"
OSN_BUCKET = "leap-pangeo-manual"
HACKATHON_PREFIX = "hackathon-2026/"
OSN_ROOT = f"s3://{OSN_BUCKET}/{HACKATHON_PREFIX}"

HRRR_PREFIX = f"{OSN_ROOT}hrrr/"

# Where to write derived outputs (writable scratch).
# Trailing slashes are stripped so that an environment value such as
# "gs://bucket/user/" does not turn into "gs://bucket/user//..." in the
# prefixes derived below.
SCRATCH_BUCKET = os.environ.get("SCRATCH_BUCKET", "gs://leap-scratch/renriviera").rstrip("/")
OUT_PREFIX = f"{SCRATCH_BUCKET}/sfincs_soundview_preproc"
WIND_OUT_PREFIX = f"{OUT_PREFIX}/forcing/wind_hrrr"

# Echo the resolved locations so the run is self-describing.
print("OSN_ROOT:", OSN_ROOT)
print("HRRR_PREFIX:", HRRR_PREFIX)
print("WIND_OUT_PREFIX:", WIND_OUT_PREFIX)


OSN_ROOT: s3://leap-pangeo-manual/hackathon-2026/
HRRR_PREFIX: s3://leap-pangeo-manual/hackathon-2026/hrrr/
WIND_OUT_PREFIX: gs://leap-scratch/renriviera/sfincs_soundview_preproc/forcing/wind_hrrr


In [2]:
# ============================================================
# 1) Dask cluster
# ============================================================

from dask.distributed import Client

# `client` ends up bound to a Dask Gateway client when a gateway is reachable,
# otherwise to a client of a small LocalCluster.
client = None

try:
    from dask_gateway import Gateway
    gw = Gateway()
    cluster = gw.new_cluster()
    cluster.scale(4)  # adjust: 2-8 workers typical
    client = cluster.get_client()
    print("‚úÖ Using Dask Gateway cluster")
    print(client)
except Exception as e:
    # Deliberate best-effort fallback, but report WHY the gateway path failed:
    # without the reason, a misconfigured gateway is indistinguishable from
    # dask_gateway simply not being installed.
    print("Gateway not available (or failed). Falling back to LocalCluster.")
    print(f"   Reason: {type(e).__name__}: {e}")
    from dask.distributed import LocalCluster
    cluster = LocalCluster(
        n_workers=2,
        threads_per_worker=2,
        memory_limit="3GB",
        dashboard_address=":8787",
    )
    client = Client(cluster)
    print("‚úÖ Using LocalCluster")
    print(client)


‚úÖ Using Dask Gateway cluster
<Client: 'tls://10.0.190.33:8786' processes=0 threads=0, memory=0 B>


In [3]:
# ============================================================
# 2) OSN S3 filesystem (anonymous)
# ============================================================

import s3fs

# Anonymous (unauthenticated) S3 filesystem pointed at the OSN endpoint.
# NOTE: the name `fs` is rebound by later cells of this notebook.
fs = s3fs.S3FileSystem(
    anon=True,
    client_kwargs={"endpoint_url": OSN_ENDPOINT_URL},
)

# Quick sanity check: the listing below is the first call that actually uses
# `fs`; the "Connected" line is printed before it.
print("‚úÖ Connected to OSN:", OSN_ENDPOINT_URL)
print("Listing HRRR prefix:")
# Show at most the first 20 entries under the HRRR prefix.
print(fs.ls(HRRR_PREFIX)[:20])


‚úÖ Connected to OSN: https://nyu1.osn.mghpcc.org
Listing HRRR prefix:
['leap-pangeo-manual/hackathon-2026/hrrr/refc', 'leap-pangeo-manual/hackathon-2026/hrrr/temp2m', 'leap-pangeo-manual/hackathon-2026/hrrr/tp', 'leap-pangeo-manual/hackathon-2026/hrrr/u10m', 'leap-pangeo-manual/hackathon-2026/hrrr/v10m']


In [4]:
# ============================================================
# 3) Open HRRR u10m/v10m Zarr stores for YEAR=2025 only (OSN S3)
# ============================================================

import xarray as xr
import fsspec

# Year whose u10m/v10m stores are opened by this cell.
YEAR = 2025

# Store names follow the pattern hrrr/<var>/hrrr<var><YEAR>.zarr under the hackathon prefix.
U10M_STORE = f"s3://{OSN_BUCKET}/{HACKATHON_PREFIX}hrrr/u10m/hrrru10m{YEAR}.zarr"
V10M_STORE = f"s3://{OSN_BUCKET}/{HACKATHON_PREFIX}hrrr/v10m/hrrrv10m{YEAR}.zarr"

print("‚úÖ Using YEAR:", YEAR)
print("U10M_STORE:", U10M_STORE)
print("V10M_STORE:", V10M_STORE)

# ------------------------------------------------------------
# IMPORTANT: OSN is S3-compatible, not GCS.
# We MUST use an S3 filesystem pointed at the OSN endpoint.
# `fs_s3` is the filesystem used by exists_zarr_s3() and
# open_zarr_safely_s3() below; access is anonymous.
# ------------------------------------------------------------
fs_s3 = fsspec.filesystem(
    "s3",
    anon=True,
    client_kwargs={"endpoint_url": OSN_ENDPOINT_URL},
)

def exists_zarr_s3(store_path: str) -> bool:
    """Tell whether ``store_path`` looks present on OSN.

    Three locations are probed with the module-level ``fs_s3`` filesystem,
    stopping at the first hit: the path itself, the Zarr v2 group marker
    (``.zgroup``) and the Zarr v3 marker (``zarr.json``).
    """
    root = store_path.rstrip("/")
    probes = (store_path, root + "/.zgroup", root + "/zarr.json")
    return any(fs_s3.exists(probe) for probe in probes)

# ---- Existence check
# Fail fast with an explicit error before any store is opened.
if not exists_zarr_s3(U10M_STORE):
    raise FileNotFoundError(f"‚ùå u10m store not found: {U10M_STORE}")

if not exists_zarr_s3(V10M_STORE):
    raise FileNotFoundError(f"‚ùå v10m store not found: {V10M_STORE}")

# Report the configured YEAR rather than a hard-coded "2025", so the message
# stays truthful when YEAR is changed.
print(f"‚úÖ Confirmed both {YEAR} Zarr stores exist on OSN.")

# ---- Open with consolidated fallback
def open_zarr_safely_s3(store_path: str):
    """Open the Zarr store at ``store_path`` through ``fs_s3``.

    Consolidated metadata is tried first; if that attempt raises anything,
    the store is opened once more with ``consolidated=False``. An error from
    the second attempt propagates to the caller.
    """
    store = fs_s3.get_mapper(store_path)
    try:
        dataset = xr.open_zarr(store, consolidated=True)
    except Exception:
        # Any failure of the consolidated attempt triggers the plain retry.
        dataset = xr.open_zarr(store, consolidated=False)
    return dataset

# Open both wind-component stores for the configured year.
ds_u = open_zarr_safely_s3(U10M_STORE)
ds_v = open_zarr_safely_s3(V10M_STORE)

# Print each dataset and (at most) its first 30 data-variable names.
print("\n--- ds_u ---")
print(ds_u)
print("u vars:", list(ds_u.data_vars)[:30])

print("\n--- ds_v ---")
print(ds_v)
print("v vars:", list(ds_v.data_vars)[:30])

# ---- Quick time coverage sanity check
def time_range(ds):
    """Summarise the ``time`` coordinate of ``ds``.

    Returns ``(first, last, n)`` where ``first``/``last`` are the first and
    last entries of ``ds["time"].values`` and ``n`` is ``ds.sizes["time"]``
    (``None`` if absent). Returns ``None`` when ``ds`` has no ``time`` coord.
    """
    if "time" in ds.coords:
        stamps = ds["time"].values
        return stamps[0], stamps[-1], ds.sizes.get("time")
    return None

u_t, v_t = time_range(ds_u), time_range(ds_v)

print("\n--- Time coverage ---")
print("u10m time:", u_t)
print("v10m time:", v_t)

# Compare only when both datasets expose a time axis; the tuples hold
# (first, last, count) and must agree element by element.
if u_t and v_t:
    if all(u_item == v_item for u_item, v_item in zip(u_t, v_t)):
        print("‚úÖ u/v time ranges match.")
    else:
        print("‚ö†Ô∏è WARNING: u/v time ranges differ (we will intersect later).")





‚úÖ Using YEAR: 2025
U10M_STORE: s3://leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2025.zarr
V10M_STORE: s3://leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2025.zarr
‚úÖ Confirmed both 2025 Zarr stores exist on OSN.

--- ds_u ---
<xarray.Dataset> Size: 59GB
Dimensions:              (time: 7800, y: 1059, x: 1799)
Coordinates:
  * time                 (time) datetime64[ns] 62kB 2025-01-01 ... 2025-11-21...
    gribfile_projection  float64 8B ...
    heightAboveGround    float64 8B ...
    latitude             (y, x) float64 15MB dask.array<chunksize=(1059, 1799), meta=np.ndarray>
    longitude            (y, x) float64 15MB dask.array<chunksize=(1059, 1799), meta=np.ndarray>
    step                 timedelta64[ns] 8B ...
    valid_time           (time) datetime64[ns] 62kB dask.array<chunksize=(24,), meta=np.ndarray>
Dimensions without coordinates: y, x
Data variables:
    u10                  (time, y, x) float32 59GB dask.array<chunksize=(24, 1059, 1799), meta=np.ndarray>


In [5]:
# ============================================================
# 4) Extract u10m and v10m variables + standardize names
# ============================================================

import xarray as xr
import numpy as np

def pick_main_var(ds, prefer_substrings=("u10", "u10m", "v10", "v10m", "wind")):
    """Choose the data variable of ``ds`` to treat as the wind component.

    The first variable (in ``ds.data_vars`` order) whose lower-cased name
    contains any of ``prefer_substrings`` wins; if none matches, the first
    data variable is returned.

    Raises:
        RuntimeError: if ``ds`` has no data variables.
    """
    names = list(ds.data_vars)
    if not names:
        raise RuntimeError("Dataset has no data variables.")
    wind_like = (
        name for name in names
        if any(fragment in name.lower() for fragment in prefer_substrings)
    )
    return next(wind_like, names[0])

# Pick the variable name for each component; the substring lists differ
# between u and v so each dataset is matched against its own component names.
u_var = pick_main_var(ds_u, prefer_substrings=("u10", "u10m", "ugrd", "wind"))
v_var = pick_main_var(ds_v, prefer_substrings=("v10", "v10m", "vgrd", "wind"))

print("Selected u var:", u_var)
print("Selected v var:", v_var)

# Standardize the variable names regardless of what the stores call them.
u10 = ds_u[u_var].rename("wind10_u")
v10 = ds_v[v_var].rename("wind10_v")

# Force consistent coordinates (time/x/y)
# join="inner" keeps only the index labels that both arrays share.
u10, v10 = xr.align(u10, v10, join="inner")

# Combine both components and derive the wind speed magnitude sqrt(u^2 + v^2).
wind = xr.Dataset({"wind10_u": u10, "wind10_v": v10})
wind["wind10_speed"] = np.sqrt(wind["wind10_u"]**2 + wind["wind10_v"]**2)

print("\n‚úÖ wind dataset:")
print(wind)



Selected u var: u10
Selected v var: v10


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.



‚úÖ wind dataset:
<xarray.Dataset> Size: 178GB
Dimensions:              (y: 1059, x: 1799, time: 7800)
Coordinates:
  * time                 (time) datetime64[ns] 62kB 2025-01-01 ... 2025-11-21...
    gribfile_projection  float64 8B nan
    heightAboveGround    float64 8B 10.0
    latitude             (y, x) float64 15MB 21.14 21.15 21.15 ... 47.85 47.84
    longitude            (y, x) float64 15MB 237.3 237.3 237.3 ... 299.0 299.1
    step                 timedelta64[ns] 8B 00:00:00
    valid_time           (time) datetime64[ns] 62kB 2025-01-01 ... 2025-11-21...
Dimensions without coordinates: y, x
Data variables:
    wind10_u             (time, y, x) float32 59GB dask.array<chunksize=(24, 1059, 1799), meta=np.ndarray>
    wind10_v             (time, y, x) float32 59GB dask.array<chunksize=(24, 1059, 1799), meta=np.ndarray>
    wind10_speed         (time, y, x) float32 59GB dask.array<chunksize=(24, 1059, 1799), meta=np.ndarray>


In [6]:
# ============================================================
# 5) Define ROI (Soundview) + subset HRRR winds using 2D lat/lon
#     Works even when latitude/longitude are (y,x) 2D arrays.
# ============================================================

import numpy as np
import xarray as xr

print("Wind dims:", wind.dims)
print("Wind coords:", list(wind.coords))

# ------------------------------------------------------------
# (A) Define your ROI in WGS84 (lon/lat)
#     Soundview Bronx (approx bbox). You can adjust these later.
#     Longitudes are given in the -180..180 convention.
# ------------------------------------------------------------
ROI_MIN_LON = -73.882
ROI_MAX_LON = -73.842
ROI_MIN_LAT = 40.807
ROI_MAX_LAT = 40.836

print("\nROI WGS84 bbox:")
print("  lon:", (ROI_MIN_LON, ROI_MAX_LON))
print("  lat:", (ROI_MIN_LAT, ROI_MAX_LAT))

# ------------------------------------------------------------
# (B) Grab 2D lat/lon from dataset
#     Both coords are required; abort early if either is missing.
# ------------------------------------------------------------
if "latitude" not in wind.coords or "longitude" not in wind.coords:
    raise RuntimeError("Expected wind coords 'latitude' and 'longitude' but did not find them.")

lat2d = wind["latitude"]
lon2d = wind["longitude"]

# Print the shapes so the expected (y, x) layout can be checked by eye;
# nothing is enforced here.
print("\nLatitude shape:", lat2d.shape, "Longitude shape:", lon2d.shape)

# ------------------------------------------------------------
# (C) Normalize longitudes if stored as 0..360
#     Any value above 180 is taken as evidence of the 0..360 convention;
#     the ROI bounds above use -180..180, so the grid is converted to match.
# ------------------------------------------------------------
lon_vals = lon2d.values
if np.nanmax(lon_vals) > 180:
    print("Detected 0..360 longitude convention -> converting to -180..180")
    lon2d_fixed = ((lon2d + 180) % 360) - 180
else:
    lon2d_fixed = lon2d

# ------------------------------------------------------------
# (D) Build ROI mask on the 2D grid
# ------------------------------------------------------------
# Boolean (y, x) mask of grid points whose lat/lon fall inside the ROI bbox
# (bounds inclusive).
mask = (
    (lat2d >= ROI_MIN_LAT) & (lat2d <= ROI_MAX_LAT) &
    (lon2d_fixed >= ROI_MIN_LON) & (lon2d_fixed <= ROI_MAX_LON)
)

# Reduce the mask once. The previous conditional expression evaluated
# mask.sum() repeatedly and both of its branches were the same int(...) call.
mask_count = int(mask.sum().values)
print("\nMask pixels inside ROI:", mask_count)

if mask_count == 0:
    # Helpful debugging: report the dataset's geographic extent in the error
    # so a bbox / longitude-convention mismatch is obvious.
    lat_min = float(np.nanmin(lat2d.values))
    lat_max = float(np.nanmax(lat2d.values))
    lon_min = float(np.nanmin(lon2d_fixed.values))
    lon_max = float(np.nanmax(lon2d_fixed.values))
    raise RuntimeError(
        "ROI mask returned 0 pixels.\n"
        f"Wind lat range: {lat_min:.4f} .. {lat_max:.4f}\n"
        f"Wind lon range: {lon_min:.4f} .. {lon_max:.4f}\n"
        "Your ROI bbox is outside the dataset coverage OR lon convention mismatch."
    )

# ------------------------------------------------------------
# (E) Convert mask -> bounding box indices (y_min..y_max, x_min..x_max)
# ------------------------------------------------------------
# Row (y) and column (x) indices of every grid point inside the ROI.
yy, xx = np.where(mask.values)

# Tight bounding window around those points (inclusive indices).
y0, y1 = int(yy.min()), int(yy.max())
x0, x1 = int(xx.min()), int(xx.max())

# Add a small pad so we don't clip tightly
# (PAD grid cells on every side, clamped to the grid extent).
PAD = 4
y0 = max(0, y0 - PAD)
x0 = max(0, x0 - PAD)
y1 = min(wind.sizes["y"] - 1, y1 + PAD)
x1 = min(wind.sizes["x"] - 1, x1 + PAD)

print("\nSubset index window:")
print("  y:", (y0, y1), "=> height:", (y1 - y0 + 1))
print("  x:", (x0, x1), "=> width :", (x1 - x0 + 1))

# ------------------------------------------------------------
# (F) Subset wind by y/x index window
#     y1/x1 are inclusive, hence the +1 on the slice stops.
# ------------------------------------------------------------
wind_roi = wind.isel(y=slice(y0, y1 + 1), x=slice(x0, x1 + 1))

print("\n‚úÖ Wind subset done.")
print("Subset dims:", wind_roi.dims)

# Replace wind with ROI subset for the rest of notebook
# NOTE: this rebinds `wind`, so re-running this cell operates on the subset
# rather than on the full grid.
wind = wind_roi

# Bare expression: rendered as the cell's rich output.
wind



Wind coords: ['gribfile_projection', 'heightAboveGround', 'latitude', 'longitude', 'step', 'time', 'valid_time']

ROI WGS84 bbox:
  lon: (-73.882, -73.842)
  lat: (40.807, 40.836)

Latitude shape: (1059, 1799) Longitude shape: (1059, 1799)
Detected 0..360 longitude convention -> converting to -180..180

Mask pixels inside ROI: 2

Subset index window:
  y: (696, 704) => height: 9
  x: (1551, 1560) => width : 10

‚úÖ Wind subset done.


Unnamed: 0,Array,Chunk
Bytes,2.68 MiB,8.44 kiB
Shape,"(7800, 9, 10)","(24, 9, 10)"
Dask graph,325 chunks in 3 graph layers,325 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.68 MiB 8.44 kiB Shape (7800, 9, 10) (24, 9, 10) Dask graph 325 chunks in 3 graph layers Data type float32 numpy.ndarray",10  9  7800,

Unnamed: 0,Array,Chunk
Bytes,2.68 MiB,8.44 kiB
Shape,"(7800, 9, 10)","(24, 9, 10)"
Dask graph,325 chunks in 3 graph layers,325 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.68 MiB,8.44 kiB
Shape,"(7800, 9, 10)","(24, 9, 10)"
Dask graph,325 chunks in 3 graph layers,325 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.68 MiB 8.44 kiB Shape (7800, 9, 10) (24, 9, 10) Dask graph 325 chunks in 3 graph layers Data type float32 numpy.ndarray",10  9  7800,

Unnamed: 0,Array,Chunk
Bytes,2.68 MiB,8.44 kiB
Shape,"(7800, 9, 10)","(24, 9, 10)"
Dask graph,325 chunks in 3 graph layers,325 chunks in 3 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,2.68 MiB,8.44 kiB
Shape,"(7800, 9, 10)","(24, 9, 10)"
Dask graph,325 chunks in 9 graph layers,325 chunks in 9 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 2.68 MiB 8.44 kiB Shape (7800, 9, 10) (24, 9, 10) Dask graph 325 chunks in 9 graph layers Data type float32 numpy.ndarray",10  9  7800,

Unnamed: 0,Array,Chunk
Bytes,2.68 MiB,8.44 kiB
Shape,"(7800, 9, 10)","(24, 9, 10)"
Dask graph,325 chunks in 9 graph layers,325 chunks in 9 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [7]:
# ============================================================
# Diagnose HRRR u10m/v10m time mismatch + auto-select a matching pair
# ============================================================

import s3fs
import xarray as xr
import numpy as np

# These three constants repeat the values assigned in the config cell at the
# top of the notebook; keep the two places in sync when editing.
OSN_ENDPOINT_URL = "https://nyu1.osn.mghpcc.org"
OSN_BUCKET = "leap-pangeo-manual"
HACKATHON_PREFIX = "hackathon-2026/"

# Parent prefixes that hold the per-year u10m / v10m stores.
U10M_ROOT = f"s3://{OSN_BUCKET}/{HACKATHON_PREFIX}hrrr/u10m"
V10M_ROOT = f"s3://{OSN_BUCKET}/{HACKATHON_PREFIX}hrrr/v10m"

# Anonymous S3 filesystem on the OSN endpoint; list_children() and
# open_zarr_store() in this cell read this module-level `fs`.
fs = s3fs.S3FileSystem(
    client_kwargs={"endpoint_url": OSN_ENDPOINT_URL},
    anon=True,
)

def list_children(path, max_items=50):
    """Print and return the sorted listing of ``path`` on the module-level ``fs``.

    At most ``max_items`` entries are printed, followed by a count of the
    remaining ones. Any exception is reported on stdout and turned into an
    empty list instead of propagating.
    """
    try:
        out = sorted(fs.ls(path))
        print(f"\nüìÅ {path} ({len(out)} entries)")
        for p in out[:max_items]:
            print(" -", p)
        if max_items < len(out):
            print(f" ... ({len(out)-max_items} more)")
    except Exception as e:
        print(f"‚ùå Failed to ls {path}: {type(e).__name__}: {e}")
        return []
    else:
        return out

# Sorted listings of both component prefixes (empty list if a listing failed).
u_children = list_children(U10M_ROOT)
v_children = list_children(V10M_ROOT)

def candidate_stores(children):
    """Narrow a listing down to the entries that look like Zarr stores.

    An entry qualifies when its path contains "zarr" in any letter case
    (which also covers a ".zarr" suffix). If no entry qualifies, the input
    ``children`` is returned unchanged so every child can still be probed.
    """
    zarr_like = [path for path in children if "zarr" in path.lower()]
    return zarr_like or children

# Store paths to probe for each wind component.
u_stores = candidate_stores(u_children)
v_stores = candidate_stores(v_children)

def open_zarr_store(store_path):
    """Open ``store_path`` as a Zarr dataset via the module-level ``fs``.

    Consolidated metadata is not used (``consolidated=False``).
    """
    return xr.open_zarr(fs.get_mapper(store_path), consolidated=False)

def time_range(ds):
    """Return ``(earliest, latest)`` of the dataset's time axis.

    ``valid_time`` is used when present, otherwise ``time``. The result is
    ``(None, None)`` when neither coordinate exists or when the chosen
    coordinate is empty (``np.min`` would raise on an empty array).

    NOTE: this definition shadows the earlier ``time_range`` of this notebook,
    which returns a 3-tuple ``(first, last, n)``.
    """
    for coord_name in ("valid_time", "time"):
        if coord_name in ds.coords:
            stamps = np.asarray(ds[coord_name].values)
            if stamps.size == 0:
                return None, None
            return np.min(stamps), np.max(stamps)
    return None, None

def _probe_stores(label, stores, limit=30):
    """Open up to ``limit`` stores and collect their time coverage.

    Prints one header line, then per store either its time range or an
    "open failed" line carrying the exception type. Returns a list of
    ``(path, t0, t1)`` tuples for the stores that could be opened.
    """
    print(f"\n--- Probing {label} candidate stores ---")
    infos = []
    for p in stores[:limit]:
        try:
            ds = open_zarr_store(p)
            t0, t1 = time_range(ds)
            infos.append((p, t0, t1))
            print(f"‚úÖ {label} store: {p}")
            print("   time:", t0, "->", t1)
        except Exception as e:
            # Best-effort probe: a store that cannot be opened is reported and skipped.
            print(f"‚ö†Ô∏è {label} open failed: {p} ({type(e).__name__})")
    return infos


# Same probe for both components (previously two copy-pasted loops).
u_infos = _probe_stores("u10m", u_stores)
v_infos = _probe_stores("v10m", v_stores)

def overlap(a0, a1, b0, b1):
    """Return whether the closed intervals ``[a0, a1]`` and ``[b0, b1]`` intersect.

    Touching end points count as overlapping. If any of the four bounds is
    ``None`` (unknown range) the result is ``False``; previously only the
    start bounds were checked, so a missing end bound raised ``TypeError``.
    """
    if any(bound is None for bound in (a0, a1, b0, b1)):
        return False
    return (a0 <= b1) and (b0 <= a1)

# Pick the first overlapping pair
# First (u, v) combination, in probe order, whose time ranges overlap;
# None when no combination overlaps.
chosen = next(
    (
        (u_path, v_path, u_start, u_end, v_start, v_end)
        for u_path, u_start, u_end in u_infos
        for v_path, v_start, v_end in v_infos
        if overlap(u_start, u_end, v_start, v_end)
    ),
    None,
)

if chosen is None:
    raise RuntimeError(
        "‚ùå Could not find any u10m/v10m store pair with overlapping time coverage.\n"
        "This means the OSN u10m/v10m directories contain different time partitions.\n"
        "We need to choose matching year/subfolder names manually from the printed listing above."
    )

up, vp, ut0, ut1, vt0, vt1 = chosen
print("\n‚úÖ Chosen matching pair:")
print("u10m:", up, "|", ut0, "->", ut1)
print("v10m:", vp, "|", vt0, "->", vt1)

# Open chosen stores for use downstream.
# NOTE: this rebinds ds_u / ds_v, replacing whatever an earlier cell stored
# under these names with the pair selected above.
ds_u = open_zarr_store(up)
ds_v = open_zarr_store(vp)

print("\n‚úÖ ds_u variables:", list(ds_u.data_vars)[:20])
print("‚úÖ ds_v variables:", list(ds_v.data_vars)[:20])




üìÅ s3://leap-pangeo-manual/hackathon-2026/hrrr/u10m (6 entries)
 - leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2020.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2021.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2022.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2023.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2024.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/u10m/hrrru10m2025.zarr

üìÅ s3://leap-pangeo-manual/hackathon-2026/hrrr/v10m (6 entries)
 - leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2020.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2021.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2022.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2023.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2024.zarr
 - leap-pangeo-manual/hackathon-2026/hrrr/v10m/hrrrv10m2025.zarr

--- Probing u10m candidate stores ---
‚úÖ u10m store: leap-pangeo-manual/hackathon-20

In [9]:
# ============================================================
# 6) Subset u10/v10 to ROI (lat/lon mask) + Build FEWS wind forcing dataset
#    Output: x,y,time + amu/amv  (for HydroMT-SFINCS meteo forcing)
# ============================================================

import numpy as np
import xarray as xr
from pyproj import Transformer

# Units string attached to the integer time axis of the output dataset.
FEWS_TIME_UNITS = "minutes since 1970-01-01 00:00:00.0 +0000"
# Value substituted for non-finite wind samples in the output.
WIND_NODATA = -9999.0

# ------------------------------------------------------------
# IMPORTANT: this notebook is separate from DEM notebook
# so TARGET_CRS might NOT exist here.
# For NYC / Soundview, UTM Zone 18N = EPSG:26918
# The default is applied only when no TARGET_CRS is already defined in the
# kernel namespace, so a previously set value wins.
# ------------------------------------------------------------
if "TARGET_CRS" not in globals():
    TARGET_CRS = "EPSG:26918"
print("‚úÖ Using TARGET_CRS:", TARGET_CRS)

# ---- ROI bbox (WGS84) ----
# Same numeric bounds as ROI_MIN_LON/ROI_MAX_LON/ROI_MIN_LAT/ROI_MAX_LAT in the
# earlier ROI cell, under different names; keep both in sync.
ROI_LON_MIN, ROI_LON_MAX = -73.882, -73.842
ROI_LAT_MIN, ROI_LAT_MAX =  40.807,  40.836

# ------------------------------------------------------------
# 1) Robustly pick u/v DataArrays from whatever exists
# ------------------------------------------------------------
def pick_first_var(ds: xr.Dataset) -> xr.DataArray:
    """Return the first data variable of ``ds``, in ``ds.data_vars`` order.

    Raises ``IndexError`` when the dataset has no data variables.
    """
    return ds[list(ds.data_vars)[0]]

u = v = None

# For each component, take the first candidate name that is bound to an
# xarray Dataset in the notebook namespace (subset variants are tried first).
for cand in ("ds_u_roi", "ds_u_sub", "ds_u"):
    if isinstance(globals().get(cand), xr.Dataset):
        u = pick_first_var(globals()[cand])
        print(f"‚úÖ picked u from {cand}[{u.name}]")
        break

for cand in ("ds_v_roi", "ds_v_sub", "ds_v"):
    if isinstance(globals().get(cand), xr.Dataset):
        v = pick_first_var(globals()[cand])
        print(f"‚úÖ picked v from {cand}[{v.name}]")
        break

# Identity checks only: comparing a DataArray with == would be element-wise.
if any(component is None for component in (u, v)):
    raise RuntimeError("‚ùå Could not find ds_u / ds_v (or their subset versions).")

print("Raw u dims:", u.dims, "shape:", u.shape)
print("Raw v dims:", v.dims, "shape:", v.shape)

# ------------------------------------------------------------
# 2) Force dim order to (time, y, x)
# ------------------------------------------------------------
def to_time_y_x(da: xr.DataArray) -> xr.DataArray:
    """Return ``da`` with its dimensions ordered as ``(time, y, x)``.

    Raises:
        RuntimeError: if any of the three dimensions is missing from ``da``.
    """
    needed = ("time", "y", "x")
    if set(needed) - set(da.dims):
        raise RuntimeError(f"Expected dims {needed}, got {da.dims}")
    return da.transpose(*needed)

# Both components now share the (time, y, x) dimension order.
u = to_time_y_x(u)
v = to_time_y_x(v)

# ------------------------------------------------------------
# 3) Drop nondim coords that break merge/alignment (valid_time/step/etc.)
# ------------------------------------------------------------
def drop_nondim_coords(da: xr.DataArray) -> xr.DataArray:
    """Return ``da`` without the coordinates that are not dimension names.

    Every coordinate whose name is not one of ``da.dims`` is removed; this
    includes auxiliary coordinates such as ``latitude``/``longitude`` when
    they are not themselves dimensions.
    """
    extras = [name for name in da.coords if name not in da.dims]
    return da.drop_vars(extras, errors="ignore")

# Capture the 2D lat/lon grids that travel with `u` BEFORE the non-dimension
# coords are dropped. drop_nondim_coords() removes every coord that is not a
# dimension name, latitude/longitude included, so looking them up on `u`
# afterwards can never succeed and the code would always fall back to the
# full `ds_u` grid, even when `u` was picked from a subset dataset.
lat2d = None
lon2d = None
if ("latitude" in u.coords) and ("longitude" in u.coords):
    lat2d = np.asarray(u["latitude"].values)
    lon2d = np.asarray(u["longitude"].values)

u = drop_nondim_coords(u)
v = drop_nondim_coords(v)

# ------------------------------------------------------------
# 4) Align time axis by intersection (robust)
# ------------------------------------------------------------
t_u = np.asarray(u["time"].values)
t_v = np.asarray(v["time"].values)
# Sorted, unique timestamps present in both components.
t_common = np.intersect1d(t_u, t_v)

if t_common.size == 0:
    raise RuntimeError(
        "‚ùå No overlapping timestamps between u and v.\n"
        "You probably opened mismatched years (e.g. u=2020, v=2025)."
    )

u = u.sel(time=t_common)
v = v.sel(time=t_common)
print("‚úÖ common time:", u.sizes["time"], "| first/last:", u.time.values[0], u.time.values[-1])

# ------------------------------------------------------------
# 5) Build ROI mask using 2D latitude/longitude
# ------------------------------------------------------------
# fallback: use the full dataset if it exists
if (lat2d is None or lon2d is None) and ("ds_u" in globals()):
    if ("latitude" in globals()["ds_u"].coords) and ("longitude" in globals()["ds_u"].coords):
        lat2d = np.asarray(globals()["ds_u"]["latitude"].values)
        lon2d = np.asarray(globals()["ds_u"]["longitude"].values)

if lat2d is None or lon2d is None:
    raise RuntimeError("‚ùå Could not find latitude/longitude coords (need them for ROI masking).")

# The index window derived from lat/lon is applied to u/v further down, so the
# grids must describe exactly the (y, x) grid of `u`.
expected_shape = (u.sizes["y"], u.sizes["x"])
if lat2d.shape != expected_shape or lon2d.shape != expected_shape:
    raise RuntimeError(
        f"latitude/longitude shape {lat2d.shape}/{lon2d.shape} does not match "
        f"the (y, x) grid of u {expected_shape}; cannot build the ROI mask."
    )

# Convert 0..360 -> -180..180 if needed
# (the ROI bounds are expressed in the -180..180 convention).
if np.nanmax(lon2d) > 180:
    lon2d = ((lon2d + 180) % 360) - 180

# Boolean (y, x) mask of grid points inside the ROI bbox, bounds inclusive.
inside = (
    (lon2d >= ROI_LON_MIN) & (lon2d <= ROI_LON_MAX) &
    (lat2d >= ROI_LAT_MIN) & (lat2d <= ROI_LAT_MAX)
)

n_inside = int(np.sum(inside))
print("Mask pixels inside ROI:", n_inside)
if n_inside == 0:
    raise RuntimeError("‚ùå ROI mask selected 0 pixels. Check bbox or lon convention.")

# Tight, inclusive index window around the masked points.
# NOTE(review): unlike the earlier ROI cell (PAD = 4), no padding is applied
# here, so the window can be as small as a single row or column; confirm that
# this is what the downstream consumer expects.
ys, xs = np.where(inside)
y0, y1 = int(ys.min()), int(ys.max())
x0, x1 = int(xs.min()), int(xs.max())

print("Subset index window:")
print(f"  y: ({y0}, {y1}) -> height: {y1-y0+1}")
print(f"  x: ({x0}, {x1}) -> width : {x1-x0+1}")

# Apply subset window to u/v (y1/x1 are inclusive, hence the +1)
u = u.isel(y=slice(y0, y1 + 1), x=slice(x0, x1 + 1))
v = v.isel(y=slice(y0, y1 + 1), x=slice(x0, x1 + 1))

# Subset lat/lon too (for projection)
lat_roi = lat2d[y0:y1 + 1, x0:x1 + 1]
lon_roi = lon2d[y0:y1 + 1, x0:x1 + 1]

print("‚úÖ u subset shape:", u.shape, "| v subset shape:", v.shape)

# ------------------------------------------------------------
# 6) Build projected x/y coordinates (meters) from lat/lon ROI window
# ------------------------------------------------------------
# lon/lat (always_xy=True, i.e. x=lon, y=lat) -> TARGET_CRS coordinates.
transformer = Transformer.from_crs("EPSG:4326", str(TARGET_CRS), always_xy=True)
x2d, y2d = (
    np.asarray(projected, dtype="float64")
    for projected in transformer.transform(lon_roi, lat_roi)
)

# Collapse the 2D grids to 1D axes: x averaged over rows, y over columns.
x_1d = np.nanmean(x2d, axis=0)
y_1d = np.nanmean(y2d, axis=1)

# If an axis has any decreasing step, reverse it together with the data.
if (np.diff(x_1d) < 0).any():
    x_1d = x_1d[::-1]
    u, v = u.isel(x=slice(None, None, -1)), v.isel(x=slice(None, None, -1))

if (np.diff(y_1d) < 0).any():
    y_1d = y_1d[::-1]
    u, v = u.isel(y=slice(None, None, -1)), v.isel(y=slice(None, None, -1))

print("‚úÖ final x/y sizes:", len(x_1d), len(y_1d))

# ------------------------------------------------------------
# 7) Convert time -> FEWS minutes since epoch
# ------------------------------------------------------------
# Epoch matching FEWS_TIME_UNITS; timestamps become whole minutes since it.
t0 = np.datetime64("1970-01-01T00:00:00")
t_minutes = ((u["time"].values - t0) / np.timedelta64(1, "m")).astype("int64")

# ------------------------------------------------------------
# 8) Build FEWS dataset: amu/amv + (time,x,y)
# ------------------------------------------------------------
amu = u.astype("float32")
amv = v.astype("float32")

# Replace NaN/inf samples with the nodata sentinel.
amu = amu.where(np.isfinite(amu), WIND_NODATA)
amv = amv.where(np.isfinite(amv), WIND_NODATA)

# `.values` materializes the arrays in memory; the dataset is rebuilt from
# plain arrays so it carries only the time/y/x coordinates defined here.
wind_fews = xr.Dataset(
    data_vars={
        "amu": (("time", "y", "x"), amu.values),
        "amv": (("time", "y", "x"), amv.values),
    },
    coords={
        "time": ("time", t_minutes),
        "x": ("x", x_1d.astype("float64")),
        "y": ("y", y_1d.astype("float64")),
    },
)

# The time axis is stored as integers, so its meaning lives in the units attribute.
wind_fews["time"].attrs["units"] = FEWS_TIME_UNITS
wind_fews["amu"].attrs.update({"long_name": "x_wind", "units": "m s-1"})
wind_fews["amv"].attrs.update({"long_name": "y_wind", "units": "m s-1"})

# Do NOT set _FillValue in attrs (NetCDF writer wants it in encoding)
wind_fews.attrs["crs"] = str(TARGET_CRS)

print("\n‚úÖ FEWS wind dataset built:")
print(wind_fews)
print("Sizes:", dict(wind_fews.sizes))



‚úÖ Using TARGET_CRS: EPSG:26918
‚úÖ picked u from ds_u[u10]
‚úÖ picked v from ds_v[v10]
Raw u dims: ('time', 'y', 'x') shape: (8784, 1059, 1799)
Raw v dims: ('time', 'y', 'x') shape: (8784, 1059, 1799)
‚úÖ common time: 8784 | first/last: 2020-01-01T00:00:00.000000000 2020-12-31T23:00:00.000000000
Mask pixels inside ROI: 2
Subset index window:
  y: (700, 700) -> height: 1
  x: (1555, 1556) -> width : 2
‚úÖ u subset shape: (8784, 1, 2) | v subset shape: (8784, 1, 2)
‚úÖ final x/y sizes: 2 1

‚úÖ FEWS wind dataset built:
<xarray.Dataset> Size: 211kB
Dimensions:  (time: 8784, y: 1, x: 2)
Coordinates:
  * time     (time) int64 70kB 26297280 26297340 26297400 ... 26824200 26824260
  * y        (y) float64 8B 4.52e+06
  * x        (x) float64 16B 5.946e+05 5.975e+05
Data variables:
    amu      (time, y, x) float32 70kB 0.3772 0.4397 -0.6037 ... 3.199 3.511
    amv      (time, y, x) float32 70kB 0.8493 1.287 1.145 ... -1.981 -1.981
Attributes:
    crs:      EPSG:26918
Sizes: {'time': 8784, '

In [None]:
# ============================================================
# 7) Write FEWS netamuamvfile.nc + upload to SCRATCH_BUCKET via gcsfs
# ============================================================

import os
import tempfile
from pathlib import Path

import gcsfs

# ---- Use your preferred scratch bucket location
SCRATCH_BUCKET = os.environ.get("SCRATCH_BUCKET", "gs://leap-scratch/renriviera")
print("‚úÖ Using SCRATCH_BUCKET:", SCRATCH_BUCKET)

# Target object: <OUT_PREFIX>/forcing/wind/<file name>, e.g.
# gs://leap-scratch/renriviera/sfincs_soundview_preproc/forcing/wind/...
out_prefix = f"{OUT_PREFIX}/forcing/wind"
out_gcs = f"{out_prefix}/sfincs_netamuamv_hrrr_u10v10_soundview_2020.nc"

# A gs:// target that does not live under SCRATCH_BUCKET is redirected to
# SCRATCH_BUCKET/forcing/wind/; any other target is left untouched.
if out_gcs.startswith("gs://") and not out_gcs.startswith(SCRATCH_BUCKET):
    out_gcs = f"{SCRATCH_BUCKET}/forcing/wind/sfincs_netamuamv_hrrr_u10v10_soundview_2020.nc"

print("üìå Target scratch path:", out_gcs)

# ---- Write locally
# Stage the NetCDF in a temporary directory that is removed as soon as the
# upload has finished or failed; the mkdtemp() directory used before was
# never cleaned up.
with tempfile.TemporaryDirectory(prefix="sfincs_wind_fews_") as _staging_dir:
    local_dir = Path(_staging_dir)
    out_local = local_dir / "netamuamvfile.nc"

    print("Writing local:", out_local)

    # IMPORTANT: _FillValue must not sit in attrs when it is also set in encoding.
    for var in ["amu", "amv"]:
        wind_fews[var].attrs.pop("_FillValue", None)

    # NetCDF encoding: float32, zlib level 4, nodata sentinel as _FillValue.
    WIND_NODATA = float(WIND_NODATA)
    encoding = {
        "amu": {"dtype": "float32", "zlib": True, "complevel": 4, "_FillValue": WIND_NODATA},
        "amv": {"dtype": "float32", "zlib": True, "complevel": 4, "_FillValue": WIND_NODATA},
    }

    wind_fews.to_netcdf(out_local, encoding=encoding)
    print("‚úÖ Local netcdf written:", out_local)

    # ---- Upload with gcsfs (no gcloud/gsutil)
    fs_gcs = gcsfs.GCSFileSystem(token="cloud")

    # Convert gs://bucket/path -> bucket/path for gcsfs
    assert out_gcs.startswith("gs://")
    gcs_path_no_scheme = out_gcs.replace("gs://", "", 1)

    print("Uploading via gcsfs ->", out_gcs)
    fs_gcs.put(str(out_local), gcs_path_no_scheme)
    print("‚úÖ Uploaded netamuamvfile to:", out_gcs)

# ---- Quick existence check (the local staging copy is already gone here)
print("Exists on GCS:", fs_gcs.exists(gcs_path_no_scheme))


In [None]:
# ============================================================
# 8) Validate uploaded FEWS netamuamvfile.nc for HydroMT-SFINCS
#    Checks: existence, vars, dims, dtype, nodata, monotonic time,
#            CRS attrs, finite values, and basic range sanity.
# ============================================================

import os
import numpy as np
import xarray as xr
import gcsfs

# Scratch root: the SCRATCH_BUCKET environment variable wins over the default.
SCRATCH_BUCKET = os.getenv("SCRATCH_BUCKET", "gs://leap-scratch/renriviera")
print("‚úÖ Using SCRATCH_BUCKET:", SCRATCH_BUCKET)

# ---- Point to your uploaded file
# Reuse out_gcs when it is already defined in this kernel (e.g. by the upload
# cell); otherwise fall back to the fixed path under SCRATCH_BUCKET.
try:
    out_gcs
except NameError:
    OUT_NETCDF = f"{SCRATCH_BUCKET}/forcing/wind/sfincs_netamuamv_hrrr_u10v10_soundview_2020.nc"
else:
    OUT_NETCDF = out_gcs

print("üìå Validating:", OUT_NETCDF)

# ---- Existence check
# The scheme is stripped so the path has the "bucket/path" form passed to gcsfs.
fs = gcsfs.GCSFileSystem(token="cloud")
gcs_path_no_scheme = OUT_NETCDF.replace("gs://", "", 1)
if fs.exists(gcs_path_no_scheme):
    print("‚úÖ File exists on GCS")
else:
    raise FileNotFoundError(f"‚ùå Not found on GCS: {OUT_NETCDF}")

# ---- Open the dataset with RAW stored values (mask_and_scale=False)
# xarray's default decoding (mask_and_scale=True) replaces every cell equal to
# _FillValue with NaN. The checks below inspect the values as written to disk
# (nodata must be the fill value, not NaN), so masking/scaling is disabled.
# Time decoding is left at its default. With mask_and_scale=False the
# _FillValue is exposed in each variable's attrs rather than its encoding.
#
# The gs:// URL is tried directly first; on any failure the object is
# downloaded to a temporary directory and opened from there.
# NOTE(review): the netcdf4 engine may not be able to read gs:// URLs at all,
# in which case the fallback path is always taken -- confirm.
try:
    ds = xr.open_dataset(OUT_NETCDF, engine="netcdf4", mask_and_scale=False)
    print("‚úÖ Opened remotely with netcdf4 engine")
except Exception as e:
    print("‚ö†Ô∏è Remote open failed, caching locally. Reason:", type(e).__name__, "-", str(e)[:200])
    import tempfile
    from pathlib import Path

    local_dir = Path(tempfile.mkdtemp(prefix="sfincs_wind_validate_"))
    local_nc = local_dir / "netamuamvfile.nc"
    fs.get(gcs_path_no_scheme, str(local_nc))
    print("‚úÖ Downloaded to:", local_nc)
    ds = xr.open_dataset(local_nc, engine="netcdf4", mask_and_scale=False)
    print("‚úÖ Opened locally with netcdf4 engine")

print("\n--- Dataset summary ---")
print(ds)

# ============================================================
# Required structure for SFINCS FEWS wind forcing
# ============================================================

# Expected layout of the file: two wind variables on a (time, y, x) grid,
# with -9999.0 as the nodata marker.
REQUIRED_VARS = ["amu", "amv"]
REQUIRED_DIMS = ("time", "y", "x")
EXPECTED_FILL = -9999.0

# ---- 1) Required variables exist
missing_vars = [name for name in REQUIRED_VARS if name not in ds.data_vars]
if missing_vars:
    raise AssertionError(f"‚ùå Missing required vars: {missing_vars}")
print("‚úÖ Required vars present:", REQUIRED_VARS)

# ---- 2) Each variable has dims (time,y,x), in exactly that order
for v in REQUIRED_VARS:
    if tuple(ds[v].dims) == REQUIRED_DIMS:
        continue
    raise AssertionError(f"‚ùå {v} dims {ds[v].dims} != {REQUIRED_DIMS}")
print("‚úÖ Variable dimensions are correct:", REQUIRED_DIMS)

# ---- 3) Dtypes are numeric + float-ish
for v in REQUIRED_VARS:
    if np.issubdtype(ds[v].dtype, np.floating):
        continue
    raise AssertionError(f"‚ùå {v} dtype {ds[v].dtype} is not float")
print("‚úÖ Variable dtypes are float")

# ---- 4) Time coordinate validity
# Requires a datetime64 `time` coordinate with at least two entries whose
# values are strictly increasing.
if "time" not in ds.coords:
    raise AssertionError("‚ùå Missing time coordinate")
if ds["time"].size < 2:
    raise AssertionError("‚ùå time coord too short")
if not np.issubdtype(ds["time"].dtype, np.datetime64):
    raise AssertionError(f"‚ùå time dtype should be datetime64, got {ds['time'].dtype}")

t = ds["time"].values
# Compare the step sizes at their native resolution. Casting the differences to
# whole seconds first would truncate sub-second steps to 0 and wrongly report
# a strictly increasing axis as non-increasing.
if not np.all(np.diff(t) > np.timedelta64(0, "ns")):
    raise AssertionError("‚ùå time is not strictly increasing")
print("‚úÖ time is datetime64 and strictly increasing")

# ---- 5) Spatial coordinates exist and are 1D
# Both axes must be present as coordinates and be one-dimensional.
for coord in ("x", "y"):
    if coord not in ds.coords:
        raise AssertionError(f"‚ùå Missing coord: {coord}")
    if not ds[coord].ndim == 1:
        raise AssertionError(f"‚ùå {coord} must be 1D, got ndim={ds[coord].ndim}")
print("‚úÖ x/y are present and 1D")

# ---- 6) FillValue / missing data check
def get_fillvalue(da):
    """Return the ``_FillValue`` recorded for ``da``, or ``None`` if there is none.

    The value in ``da.encoding`` takes precedence; ``da.attrs`` is consulted
    only when the encoding has no ``_FillValue`` (or stores ``None`` for it).
    """
    from_encoding = da.encoding.get("_FillValue")
    return da.attrs.get("_FillValue") if from_encoding is None else from_encoding

# A missing _FillValue only produces a warning; a _FillValue that differs from
# EXPECTED_FILL is treated as an error.
for v in REQUIRED_VARS:
    fv = get_fillvalue(ds[v])
    if fv is None:
        print(f"‚ö†Ô∏è {v}: no _FillValue found in encoding/attrs (not always fatal)")
        continue
    if not np.isclose(float(fv), float(EXPECTED_FILL)):
        raise AssertionError(f"‚ùå {v}: _FillValue={fv} != expected {EXPECTED_FILL}")
    print(f"‚úÖ {v}: _FillValue OK ({fv})")

# ---- 7) Check for NaNs (should typically be filled, not NaN)
# NOTE: this inspects the values exactly as `ds` exposes them. If `ds` was
# opened with xarray's default mask_and_scale=True, cells equal to _FillValue
# are decoded to NaN and will be reported here as NaNs.
for v in REQUIRED_VARS:
    # Sample only the first (up to) 48 time steps to avoid loading everything.
    # The slice clamps to the available length, so no explicit size lookup is
    # needed (ds.dims[...] mapping access is deprecated in favour of ds.sizes).
    sample = ds[v].isel(time=slice(0, 48)).values
    if np.isnan(sample).any():
        raise AssertionError(f"‚ùå {v}: contains NaNs (should be filled to nodata={EXPECTED_FILL})")
print("‚úÖ No NaNs detected in early time sample")

# ---- 8) Basic range sanity (wind in m/s, very broad allowed)
# HRRR winds can spike but this catches unit mistakes like km/h or knots
for v in REQUIRED_VARS:
    # First (up to) 168 time steps -- one week if the data are hourly.
    # The slice clamps to the available length, so no explicit size lookup is
    # needed (ds.dims[...] mapping access is deprecated in favour of ds.sizes).
    sample = ds[v].isel(time=slice(0, 168)).values
    # Ignore non-finite values and explicit fill values.
    sample = sample[np.isfinite(sample)]
    sample = sample[sample != EXPECTED_FILL]
    if sample.size == 0:
        raise AssertionError(f"‚ùå {v}: no valid data found after removing fillvalues")
    # Both percentiles from a single pass over the sample.
    p01, p99 = (float(q) for q in np.quantile(sample, [0.01, 0.99]))
    print(f"Sanity {v}: p01={p01:.2f}, p99={p99:.2f} (m/s)")
    # 80 m/s is a deliberately generous bound on the 1st/99th percentiles.
    if abs(p99) > 80 or abs(p01) > 80:
        raise AssertionError(f"‚ùå {v}: suspicious wind magnitude (>80 m/s). Units wrong?")
print("‚úÖ Wind magnitude sanity checks passed")

# ---- 9) Optional CRS attribute (nice to have)
# A missing dataset-level 'crs' attribute is reported but does not fail the run.
crs_attr = ds.attrs.get("crs")
if crs_attr is not None:
    print("‚úÖ Dataset CRS attr:", crs_attr)
else:
    print("‚ö†Ô∏è Dataset attribute 'crs' missing (not always fatal).")

# ---- 10) Final verdict
# Reached only when none of the checks above raised.
print(
    "\n".join(
        [
            "\n‚úÖ FEWS netamuamvfile.nc looks HydroMT-SFINCS compatible.",
            "   - Vars: amu/amv",
            "   - Dims: (time,y,x)",
            "   - time is monotonic datetime64",
            "   - x/y are 1D coords",
            "   - FillValue handled",
        ]
    )
)
