# Forecasting Freshwater Algal Bloom Levels Using Multisource Climate and Water-Quality Data

*Course project of **STATS 402: Interdisciplinary Data Analysis**.*

**Name:** Ziyue Yin

**NetID:** zy166

**How to use this Notebook *wisely*?**

## Dataset: HydroLAKES

Lake polygons (including all attributes) in shapefile format can be found here: https://www.hydrosheds.org/products/hydrolakes. Simply scroll down to the "Data download" section and click on the link to start downloading the zipped file of:
- Lake polygons (including all attributes) in shapefileformat (820MB).

*TL;DR:* Click on https://data.hydrosheds.org/file/hydrolakes/HydroLAKES_polys_v10_shp.zip to download the data.

This dataset will be later used as a standard area for filtering out the **Five Great Lakes** of the United States. **No** preprocessing is specified or needed thus far.

## Dataset: NASA OceanColor Inland Waters (ILW)

S3Merged-ILW data: https://oceandata.sci.gsfc.nasa.gov/directdataaccess/Level-3%20Mapped/Merged-S3-ILW/.

S3B-ILW data: https://oceandata.sci.gsfc.nasa.gov/directdataaccess/Level-3%20Mapped/S3B-ILW/.

After downloading the datasets, the structure should be shown as follows:

```
datasets/
 ├── ILW/
 │    ├── S3B/2024/CONUS_MO/
 │    │      ├── S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc
 │    │      ├── S3B_OLCI_EFRNT.20240201_20240229.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc
 │    ├── Merged/2024/CONUS_DAY/
 │    │      ├── S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc
 │    │      ├── S3M_OLCI_EFRNT.20240102.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc
 │    │      ...
```

### Data Structure Exploration

First of all, let's glance at the **monthly** dataset.

In [1]:
import xarray as xr

p = "/dkucc/home/zy166/HAB-forecasting/datasets/ILW/S3B/2024/CONUS_MO/S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc"

ds = xr.open_dataset(p, engine="netcdf4", chunks="auto")
print(ds.dims)
print(list(ds.data_vars))

['rhos_400', 'rhos_412', 'rhos_443', 'rhos_490', 'rhos_510', 'rhos_560', 'rhos_620', 'rhos_665', 'rhos_674', 'rhos_681', 'rhos_709', 'rhos_754', 'rhos_865', 'rhos_884', 'CI_cyano', 'palette']


And also, the **daily** dataset.

In [2]:
import xarray as xr

p = "/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY/S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc"

ds = xr.open_dataset(p, engine="netcdf4", chunks="auto")
print(ds.dims)
print(list(ds.data_vars))

['rhos_400', 'rhos_412', 'rhos_443', 'rhos_490', 'rhos_510', 'rhos_560', 'rhos_620', 'rhos_665', 'rhos_674', 'rhos_681', 'rhos_709', 'rhos_754', 'rhos_865', 'rhos_884', 'CI_cyano', 'palette']


In [1]:
from pathlib import Path
import xarray as xr
import numpy as np

daily_dir = Path("/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY")
nc_path = sorted(daily_dir.glob("S3M_OLCI_EFRNT.*.L3m.DAY.*.nc"))[0]
print("Using file:", nc_path)

ds = xr.open_dataset(nc_path)
print("\n=== DATASET SUMMARY ===")
print(ds)

print("\n=== DATA VARS ===")
for name, da in ds.data_vars.items():
    print(f"- {name}: dims={da.dims}, attrs keys={list(da.attrs.keys())}")

print("\n=== COORDS ===")
for name, da in ds.coords.items():
    print(f"- {name}: dims={da.dims}")

Using file: /dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY/S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc

=== DATASET SUMMARY ===
<xarray.Dataset> Size: 27GB
Dimensions:   (y: 15138, x: 26328, rgb: 3, eightbitcolor: 256)
Coordinates:
    lat       (y, x) float32 2GB ...
    lon       (y, x) float32 2GB ...
Dimensions without coordinates: y, x, rgb, eightbitcolor
Data variables: (12/16)
    rhos_400  (y, x) float32 2GB ...
    rhos_412  (y, x) float32 2GB ...
    rhos_443  (y, x) float32 2GB ...
    rhos_490  (y, x) float32 2GB ...
    rhos_510  (y, x) float32 2GB ...
    rhos_560  (y, x) float32 2GB ...
    ...        ...
    rhos_709  (y, x) float32 2GB ...
    rhos_754  (y, x) float32 2GB ...
    rhos_865  (y, x) float32 2GB ...
    rhos_884  (y, x) float32 2GB ...
    CI_cyano  (y, x) float32 2GB ...
    palette   (rgb, eightbitcolor) uint8 768B ...
Attributes: (12/63)
    product_name:                      S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_C

### Target Feature Extraction

#### Utilities

In [3]:
import re
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
import geopandas as gpd
import rioxarray  # 目前不再在裁剪中使用，但保留以防其他地方需要

def infer_time_label(nc_path, ds, product="monthly"):
    """
    Return a pandas.Timestamp, try to infer from ds or filename.
    product: 'monthly' or 'daily'
    """
    # 1) Directly have time coordinate/variable
    if "time" in ds.coords or "time" in ds.variables:
        try:
            tt = pd.to_datetime(ds["time"].values)
            tt = np.array(tt).reshape(-1)[0]
            return pd.to_datetime(tt)
        except Exception:
            pass

    # 2) Global attributes (common in L3M data)
    start = ds.attrs.get("time_coverage_start") or ds.attrs.get("start_time")
    end   = ds.attrs.get("time_coverage_end")   or ds.attrs.get("end_time")
    if start and end:
        try:
            ts = pd.to_datetime(start)
            te = pd.to_datetime(end)
            if product == "monthly":
                return ts + (te - ts) / 2
            else:
                return ts
        except Exception:
            pass

    # 3) Analyze the filename
    fn = nc_path.split("/")[-1]
    if product == "monthly":
        # ...YYYYMMDD_YYYYMMDD.L3m.MO...
        m = re.search(r"\.(\d{8})_(\d{8})\.L3m\.MO\.", fn)
        if m:
            b, e = m.group(1), m.group(2)
            ts = pd.to_datetime(b, format="%Y%m%d")
            te = pd.to_datetime(e, format="%Y%m%d")
            return ts + (te - ts) / 2
    else:
        # ...YYYYMMDD.L3m.DAY...
        m = re.search(r"\.(\d{8})\.L3m\.DAY\.", fn)
        if m:
            return pd.to_datetime(m.group(1), format="%Y%m%d")

    raise ValueError("Cannot infer time from dataset or filename: " + fn)

def clean_ci(da: xr.DataArray) -> xr.DataArray:
    """
    Filter out values out of physical range and remove near-zero values.
    """
    vmin = float(da.attrs.get("valid_min", np.nan))
    vmax = float(da.attrs.get("valid_max", np.nan))
    if np.isfinite(vmin):
        da = da.where(da >= vmin)
    if np.isfinite(vmax):
        da = da.where(da <= vmax)

    # drop near-zero (background) CI
    thr = max(vmin, 5e-5) if np.isfinite(vmin) else 5e-5
    da = da.where(da > thr)

    return da

def _extract_lakes_core_from_ds(
    ds: xr.Dataset,
    nc_path: str,
    lakes_gdf: gpd.GeoDataFrame,
    lake_id_col: str,
    product: str,
    time_label: pd.Timestamp,
    engine: str,
) -> pd.DataFrame:
    """
    核心提取逻辑：针对一个 xarray.Dataset，用 lat/lon + bbox 做掩膜裁剪 CI_cyano。
    兼容曲线网格(lat(y,x), lon(y,x))。
    返回列：
      lake_id, time, product, CI_mean, CI_p90, n_valid, src, engine
    """
    if "CI_cyano" not in ds.data_vars:
        raise KeyError("CI_cyano not found in dataset data_vars")

    da = ds["CI_cyano"]
    da = clean_ci(da)

    # 期望 lat, lon 是 2D (y,x)，与 CI_cyano 维度一致
    if "lat" not in ds.variables or "lon" not in ds.variables:
        raise KeyError("lat / lon not found in dataset")

    lat = ds["lat"]
    lon = ds["lon"]

    if lat.shape != da.shape or lon.shape != da.shape:
        # 极端情况下可以做广播，但对 ILW_CONUS 来说应该是一致的
        raise ValueError(
            f"lat/lon shape {lat.shape}/{lon.shape} not matching CI_cyano {da.shape}"
        )

    rows = []
    for _, r in lakes_gdf.iterrows():
        lid = r[lake_id_col]
        geom = r.geometry
        minx, miny, maxx, maxy = geom.bounds  # 经度, 纬度

        # 使用 bbox 对 lat/lon 作初筛
        mask = (
            (lon >= minx) & (lon <= maxx) &
            (lat >= miny) & (lat <= maxy)
        )

        sub = da.where(mask)
        arr = np.asarray(sub.values)
        finite = np.isfinite(arr)
        n_valid = int(finite.sum())

        if n_valid > 0:
            vals = arr[finite].ravel()
            mean_val = float(np.nanmean(vals))
            p90      = float(np.nanquantile(vals, 0.9))
        else:
            mean_val, p90 = np.nan, np.nan

        rows.append({
            "lake_id": lid,
            "time":   pd.to_datetime(time_label),
            "product": product,
            "CI_mean": mean_val,
            "CI_p90":  p90,
            "n_valid": n_valid,
            "src":     Path(nc_path).name,
            "engine":  engine,
        })

    return pd.DataFrame(rows)

def extract_lakes_from_nc(nc_path: str,
                          lakes_gdf: gpd.GeoDataFrame,
                          lake_id_col: str,
                          product: str) -> pd.DataFrame:
    """
    nc_path: A single NetCDF file (S3B monthly or S3M daily)
    lakes_gdf: A GeoDataFrame containing `lake_id` and `geometry` (EPSG:4326)
    product: 'monthly' | 'daily'
    Returns: One row per lake (timestamp of the file)
    """
    nc_path = str(nc_path)
    with xr.open_dataset(nc_path, engine="netcdf4") as ds:
        t = infer_time_label(nc_path, ds, product=product)
        df = _extract_lakes_core_from_ds(
            ds=ds,
            nc_path=nc_path,
            lakes_gdf=lakes_gdf,
            lake_id_col=lake_id_col,
            product=product,
            time_label=t,
            engine="netcdf4",
        )
    return df

def set_spatial_dims_safe(da: xr.DataArray, ds: xr.Dataset | None = None) -> xr.DataArray:
    """
    [遗留函数] 现在不再用于 lake 提取，但保留以兼容其他代码。
    Make `da` geospatially aware for rioxarray:
    - Prefer real dimension names present on `da` (lon/lat or x/y or variants);
    - Only pass EXISTING dimension names to `rio.set_spatial_dims`;
    - If dataset carries 1D lon/lat arrays matching x/y lengths, bind them as coords;
    - Finally write CRS=EPSG:4326.

    NOTE: If lon/lat are 2D (curvilinear), we DO NOT pass them as dims; we keep x/y dims.
    """
    dims = list(da.dims)

    # normalize common variants
    def _has(*cands):
        return any(c in dims for c in cands)

    # Case A: dims already lon/lat
    if "lon" in dims and "lat" in dims:
        out = da.rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="lon", y_dim="lat", inplace=False)
    elif "longitude" in dims and "latitude" in dims:
        out = da.rename({"longitude": "lon", "latitude": "lat"})
        out = out.rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="lon", y_dim="lat", inplace=False)

    # Case B: dims are x/y (any case)
    elif (_has("x") and _has("y")) or (_has("X") and _has("Y")):
        # rename upper-case to lower-case to please rioxarray
        rename_map = {}
        if "X" in dims: rename_map["X"] = "x"
        if "Y" in dims: rename_map["Y"] = "y"
        out = da.rename(rename_map) if rename_map else da
        out = out.rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=False)

        # try binding 1D lon/lat coords from ds if lengths match
        if ds is not None:
            try:
                lon1d = None
                lat1d = None
                for lon_name in ("lon","longitude"):
                    if lon_name in ds.variables and ds[lon_name].ndim == 1 and ds[lon_name].sizes[ds[lon_name].dims[0]] == out.sizes["x"]:
                        lon1d = np.asarray(ds[lon_name].values)
                        break
                for lat_name in ("lat","latitude"):
                    if lat_name in ds.variables and ds[lat_name].ndim == 1 and ds[lat_name].sizes[ds[lat_name].dims[0]] == out.sizes["y"]:
                        lat1d = np.asarray(ds[lat_name].values)
                        break
                if lon1d is not None and lat1d is not None:
                    out = out.assign_coords(x=lon1d, y=lat1d)
            except Exception:
                pass

    # Case C: unknown names → rename the last two dimensions to y/x
    elif len(dims) >= 2:
        ydim, xdim = dims[-2], dims[-1]
        out = da.rename({xdim: "x", ydim: "y"}).rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=False)

    else:
        raise ValueError(f"Cannot determine spatial dims for {da.name!r}; dims={dims}")

    return out

def looks_like_hdf5(path: Path) -> bool:
    try:
        with open(path, "rb") as f:
            sig = f.read(8)
        return sig.startswith(b"\x89HDF") or sig.startswith(b"CDF")
    except Exception:
        return False

def _extract_one_with_h5netcdf(nc_path: Path,
                               lakes_gdf: gpd.GeoDataFrame,
                               lake_id_col: str,
                               product: str = "daily") -> pd.DataFrame:
    """
    兜底方案：用 h5netcdf 打开并在本函数内完成裁剪与统计。
    现在同样使用 lat/lon + bbox 掩膜，不再调用 rioxarray.clip。
    返回列同 extract_lakes_from_nc：
      lake_id, time, product, CI_mean, CI_p90, n_valid, src, engine
    """
    nc_path = Path(nc_path)
    with xr.open_dataset(nc_path, engine="h5netcdf", phony_dims="access") as ds:
        t = infer_time_label(str(nc_path), ds, product=product)
        df = _extract_lakes_core_from_ds(
            ds=ds,
            nc_path=str(nc_path),
            lakes_gdf=lakes_gdf,
            lake_id_col=lake_id_col,
            product=product,
            time_label=t,
            engine="h5netcdf",
        )
    return df

def try_open_xarray(fp: Path):
    """Try netcdf4 → h5netcdf → h5py，return (ds or None, engine_used)。"""
    try:
        ds = xr.open_dataset(fp, engine="netcdf4", chunks="auto")
        _ = ds.dims
        return ds, "netcdf4"
    except Exception as e1:
        try:
            ds = xr.open_dataset(fp, engine="h5netcdf", chunks="auto", phony_dims="access")
            _ = ds.dims
            return ds, "h5netcdf"
        except Exception as e2:
            try:
                import h5py
                with h5py.File(fp, "r") as f:
                    pass
                print(f"[WARN] {fp.name} readable by h5py but not by xarray engines (netCDF-4 layout issue?)")
            except Exception as e3:
                print(f"[WARN] {fp.name} not readable even by h5py: {e3}")
            print(f"[SKIP] {fp.name} → netcdf4:{e1} | h5netcdf:{e2}")
            return None, None

#### Scale 1: In general

##### Monthly

Here, we use the **S3B Monthly** data. One month per row.

In [4]:
import glob, numpy as np, pandas as pd, xarray as xr
from pathlib import Path

monthly_dir = Path("/dkucc/home/zy166/HAB-forecasting/datasets/ILW/S3B/2024/CONUS_MO")
out_csv = monthly_dir/"ci_cyano_monthly_mean.csv"

rows = []
for fp in sorted(monthly_dir.glob("S3B_OLCI_EFRNT.*.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc")):
    with xr.open_dataset(fp, engine="netcdf4", chunks="auto") as ds:
        da = clean_ci(ds["CI_cyano"])
        arr = np.asarray(da.data)
        mask = np.isfinite(arr)
        m   = float(np.nanmean(arr[mask])) if mask.any() else np.nan
        p90 = float(np.nanquantile(arr[mask], 0.9))    if mask.any() else np.nan
        t   = infer_time_label(str(fp), ds, product="monthly")
        rows.append({"time": pd.to_datetime(t), "CI_mean": m, "CI_p90": p90,
                     "n_valid": int(mask.sum())})

df_mo = pd.DataFrame(rows).sort_values("time").reset_index(drop=True)
df_mo.to_csv(out_csv, index=False)
df_mo.head()

Unnamed: 0,time,CI_mean,CI_p90,n_valid
0,2024-01-16 16:53:28.500000+00:00,0.000591,0.000472,2030281
1,2024-02-15 17:12:11.500000+00:00,0.000539,0.000351,2204191
2,2024-03-16 16:44:09+00:00,0.000492,0.000166,1810493
3,2024-04-16 04:52:02+00:00,0.000496,0.00054,2151149
4,2024-05-16 17:01:29+00:00,0.0005,0.000836,2586974


##### Daily

Here, we use the **S3M Daily** data. One day per row.

In [5]:
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
import h5py
import geopandas as gpd

daily_dir = Path("/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY")
out_csv = daily_dir / "ci_cyano_daily_mean.csv"

rows = []
for fp in sorted(daily_dir.glob("S3M_OLCI_EFRNT.*.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc")):
    if not looks_like_hdf5(fp):
        print(f"[WARN] Skip (not HDF5 header): {fp.name}")
        continue

    ds, eng = try_open_xarray(fp)
    if ds is None:
        continue

    try:
        da = clean_ci(ds["CI_cyano"])
        arr = np.asarray(da.data)
        mask = np.isfinite(arr)
        m   = float(np.nanmean(arr[mask])) if mask.any() else np.nan
        p90 = float(np.nanquantile(arr[mask], 0.9)) if mask.any() else np.nan

        t = infer_time_label(str(fp), ds, product="daily")

        rows.append({
            "date":   pd.to_datetime(t),
            "CI_mean": m,
            "CI_p90":  p90,
            "n_valid": int(mask.sum()),
            "src":     fp.name,
            "engine":  eng,
        })
    finally:
        ds.close()

df_day = pd.DataFrame(rows).sort_values("date").reset_index(drop=True)
df_day.to_csv(out_csv, index=False)
print(f"[OK] saved → {out_csv} (rows={len(df_day)})")
df_day.head()

[WARN] S3M_OLCI_EFRNT.20240211.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc not readable even by h5py: Unable to synchronously open file (truncated file: eof = 323158016, sblock->base_addr = 0, stored_eof = 763662772)
[SKIP] S3M_OLCI_EFRNT.20240211.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc → netcdf4:[Errno -101] NetCDF: HDF error: '/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY/S3M_OLCI_EFRNT.20240211.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc' | h5netcdf:Unable to synchronously open file (truncated file: eof = 323158016, sblock->base_addr = 0, stored_eof = 763662772)
[WARN] S3M_OLCI_EFRNT.20241016.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc not readable even by h5py: Unable to synchronously open file (truncated file: eof = 411697152, sblock->base_addr = 0, stored_eof = 978029795)
[SKIP] S3M_OLCI_EFRNT.20241016.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc → netcdf4:[Errno -101] NetCDF: HDF error: '/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY/S3M_OLCI_EFRNT.2024101

Unnamed: 0,date,CI_mean,CI_p90,n_valid,src,engine
0,2024-01-01 13:52:31+00:00,0.000432,5e-05,1024210,S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_CONUS.V5.a...,netcdf4
1,2024-01-02 13:36:49+00:00,0.000274,5e-05,1047655,S3M_OLCI_EFRNT.20240102.L3m.DAY.ILW_CONUS.V5.a...,netcdf4
2,2024-01-03 14:02:06+00:00,0.000368,5e-05,865219,S3M_OLCI_EFRNT.20240103.L3m.DAY.ILW_CONUS.V5.a...,netcdf4
3,2024-01-04 13:40:26+00:00,0.00023,5e-05,1144789,S3M_OLCI_EFRNT.20240104.L3m.DAY.ILW_CONUS.V5.a...,netcdf4
4,2024-01-05 13:48:41+00:00,0.000273,5e-05,1152641,S3M_OLCI_EFRNT.20240105.L3m.DAY.ILW_CONUS.V5.a...,netcdf4


#### Scale 2: Five Great Lakes

First of all, we get the five Great Lakes out explicitly.

In [4]:
from pathlib import Path
import geopandas as gpd

src = "/dkucc/home/zy166/HAB-forecasting/datasets/Lakes/shapes/lakes_greatlakes.gpkg"
gdf = gpd.read_file(src)

keep_names = ["Superior","Michigan","Huron","Erie","Ontario"]
gdf = gpd.read_file(src)
gdf5 = gdf[gdf["Lake_name"].str.fullmatch("|".join(keep_names), case=False)].copy()
gdf5 = gdf5.dissolve(by="Lake_name", as_index=False).rename(columns={"Lake_name":"lake_name"})

order = ["Erie","Huron","Michigan","Ontario","Superior"]
gdf5 = gdf5.set_index("lake_name").loc[order].reset_index()

# Buffer the shoreline by 300 m, first to equidistant projection, then buffer, then back to 4326
gdf5m = gdf5.to_crs(5070)
gdf5m["geometry"] = gdf5m.buffer(-300)
gdf5m = gdf5m[~gdf5m.geometry.is_empty].copy()
gdf5  = gdf5m.to_crs(4326).reset_index(drop=True)

gdf5["lake_id"] = [f"GL-{i+1}" for i in range(len(gdf5))]

out = Path(src).with_name("lakes_greatlakes_5poly.gpkg")
gdf5.to_file(out, driver="GPKG")
print("Saved:", out, "features:", len(gdf5))

Saved: /dkucc/home/zy166/HAB-forecasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg features: 5


##### Monthly

In [5]:
from pathlib import Path
import geopandas as gpd

def run_monthly(monthly_dir: str,
                lakes_fp: str,
                lake_id_col: str,
                out_parquet: str):
    """
    monthly_dir: Directory containing files like S3B_OLCI_EFRNT.*.L3m.MO.ILW_CONUS...nc
    lakes_fp:    Lake boundaries (gpkg/shp, must be EPSG:4326)
    """
    gdf = gpd.read_file(lakes_fp)
    if gdf.crs is None:
        raise ValueError("The lake file is missing CRS, please ensure it is EPSG:4326")
    gdf = gdf.to_crs(4326)[[lake_id_col, "geometry"]].dropna()

    out_rows = []
    for fp in sorted(Path(monthly_dir).glob("S3B_OLCI_EFRNT.*.L3m.MO.*.nc")):
        df_one = extract_lakes_from_nc(str(fp), gdf, lake_id_col, product="monthly")
        out_rows.append(df_one)

    if not out_rows:
        print("No monthly files found.")
        return

    df_all = pd.concat(out_rows, ignore_index=True)
    Path(out_parquet).parent.mkdir(parents=True, exist_ok=True)
    df_all.to_parquet(out_parquet, index=False)
    print(f"[monthly] saved → {out_parquet}  ({len(df_all)} rows)")

run_monthly(
    monthly_dir="/dkucc/home/zy166/HAB-forecasting/datasets/ILW/S3B/2024/CONUS_MO",
    lakes_fp="/dkucc/home/zy166/HAB-forecasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg",
    lake_id_col="lake_id",
    out_parquet="/dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_monthly.parquet"
)

[monthly] saved → /dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_monthly.parquet  (60 rows)


##### Daily

In [None]:
from pathlib import Path
import geopandas as gpd
import numpy as np

def run_daily(daily_dir: str,
              lakes_fp: str,
              lake_id_col: str,
              out_parquet: str):
    """
    daily_dir: Directory containing files like S3M_OLCI_EFRNT.*.L3m.DAY.ILW_CONUS...nc
    """
    daily_dir = Path(daily_dir)
    gdf = gpd.read_file(lakes_fp)
    if gdf.crs is None:
        raise ValueError("The lake file is missing CRS, please ensure it is EPSG:4326")
    gdf = gdf.to_crs(4326)[[lake_id_col, "geometry"]].dropna()

    out_rows = []
    files = sorted(daily_dir.glob("S3M_OLCI_EFRNT.*.L3m.DAY.*.nc"))
    if not files:
        print(f"[WARN] No daily files found under: {daily_dir}")
        return

    for fp in files:
        # 1) 文件头快速校验
        if not looks_like_hdf5(fp):
            print(f"[WARN] Skip (not HDF5/NetCDF header): {fp.name}")
            continue

        # 2) 首选：netcdf4 引擎
        try:
            df_one = extract_lakes_from_nc(str(fp), gdf, lake_id_col, product="daily")
            # 统一列名为 date
            if "time" in df_one.columns and "date" not in df_one.columns:
                df_one = df_one.rename(columns={"time": "date"})
            df_one["src"] = fp.name
            df_one["engine"] = "netcdf4"
            out_rows.append(df_one)
            continue
        except Exception as e1:
            print(f"[WARN] netcdf4 failed on {fp.name}: {e1}")

        # 3) 兜底：h5netcdf 内联提取
        try:
            df_one = _extract_one_with_h5netcdf(fp, gdf, lake_id_col, product="daily")
            if "time" in df_one.columns and "date" not in df_one.columns:
                df_one = df_one.rename(columns={"time": "date"})
            out_rows.append(df_one)
        except Exception as e2:
            print(f"[SKIP] {fp.name}: h5netcdf fallback failed → {e2}")

    if not out_rows:
        print("No valid daily rows produced.")
        return

    df_all = pd.concat(out_rows, ignore_index=True)

    # 基本整理：日期排序、列顺序、小型诊断
    keep_cols = ["lake_id", "date", "product", "CI_mean", "CI_p90", "n_valid", "src", "engine"]
    for c in keep_cols:
        if c not in df_all.columns:
            df_all[c] = np.nan if c not in ("lake_id", "product", "src", "engine") else None
    df_all = df_all[keep_cols].sort_values(["lake_id", "date"]).reset_index(drop=True)

    Path(out_parquet).parent.mkdir(parents=True, exist_ok=True)
    df_all.to_parquet(out_parquet, index=False)
    print(f"[daily] saved → {out_parquet}  (rows={len(df_all)}, files={len(files)})")

run_daily(
    daily_dir="/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY",
    lakes_fp="/dkucc/home/zy166/HAB-forecasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg",
    lake_id_col="lake_id",
    out_parquet="/dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_daily.parquet"
)

### Data Quality Control

In [1]:
"""
QC & Completeness checks for NASA ILW Cyanobacteria Index (CI_cyano)
- Regional (CONUS) daily & monthly time series
- Lake-level (Great Lakes) daily & monthly time series

Inputs (already prepared by you):
1) /dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY/ci_cyano_daily_mean.csv
2) /dkucc/home/zy166/HAB-forecasting/datasets/ILW/S3B/2024/CONUS_MO/ci_cyano_monthly_mean.csv
3) /dkucc/home/zy166/HAB-forecasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg
4) /dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_monthly.parquet
5) /dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_daily.parquet

Outputs:
- Cleaned regional daily/monthly CSVs with QC flags
- Cleaned lake-level daily/monthly Parquet with QC flags and completeness metrics
- Simple summary CSVs per product (row counts, missing rates, clipping, etc.)
"""

from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd

# ----------------------------
# Parameters (tune as needed)
# ----------------------------
# Resolution of ILW L3m grid (meters)
PIX_RES_M = 300.0

# Minimal valid pixel ratio when judged by geometry (n_valid / expected_pixels)
MIN_PCT_VALID_GEOM = 0.10   # drop if coverage < 10%

# Minimal absolute valid pixels per lake-time
MIN_ABS_PIX = 50            # drop very tiny coverage

# Empirical coverage threshold relative to best observed coverage for that lake
MIN_PCT_VALID_EMP = 0.10    # drop if n_valid < 10% of lake's max observed n_valid

# CI cleaning thresholds
NEAR_ZERO_THRESHOLD = 5e-5  # drop near-zero values (already applied in earlier step, kept as doc)
CLIP_QUANTILE_LOW  = 0.001  # lower clip for outliers
CLIP_QUANTILE_HIGH = 0.999  # upper clip for outliers

# Interpolation limits for regional (optional smoothing to fill short gaps)
INTERP_LIMIT_DAYS = 3

# ----------------------------
# Paths
# ----------------------------
P_CONUS_DAILY   = Path("/dkucc/home/zy166/HAB-forecasting/datasets/ILW/Merged/2024/CONUS_DAY/ci_cyano_daily_mean.csv")
P_CONUS_MONTHLY = Path("/dkucc/home/zy166/HAB-forecasting/datasets/ILW/S3B/2024/CONUS_MO/ci_cyano_monthly_mean.csv")
P_LAKES_GPKG    = Path("/dkucc/home/zy166/HAB-forecasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg")
P_LAKE_DAILY    = Path("/dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_daily.parquet")
P_LAKE_MONTHLY  = Path("/dkucc/home/zy166/HAB-forecasting/datasets/processed/lake_ci_monthly.parquet")

OUT_DIR = Path("/dkucc/home/zy166/HAB-forecasting/datasets/processed/qc")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ----------------------------
# Utilities
# ----------------------------
def _clip_series_q(s: pd.Series, qlow=CLIP_QUANTILE_LOW, qhigh=CLIP_QUANTILE_HIGH) -> pd.Series:
    """Clip a numeric series by quantiles; preserve NaNs."""
    if s.dropna().empty:
        return s
    lo = s.quantile(qlow)
    hi = s.quantile(qhigh)
    return s.clip(lower=lo, upper=hi)

def _ensure_datetime(df: pd.DataFrame,
                     prefer_cols=("date", "time")) -> pd.DataFrame:
    """
    Robust datetime parsing:
    - try 'date' then 'time' (or any column that contains these names)
    - tolerant to mixed formats; coerce to UTC tz-aware then drop tz
    - drop NaT rows
    """
    col = None
    for cand in prefer_cols:
        if cand in df.columns:
            col = cand
            break
    if col is None:
        for c in df.columns:
            lc = str(c).lower()
            if "date" in lc or "time" in lc or "datetime" in lc or "timestamp" in lc:
                col = c
                break
    if col is None:
        raise ValueError("No recognizable datetime column (expect 'date'/'time').")

    dt = pd.to_datetime(df[col], utc=True, format="mixed", errors="coerce")
    dt = dt.dt.tz_convert("UTC").dt.tz_localize(None)

    df = df.copy()
    df["date"] = dt
    before = len(df)
    df = df.dropna(subset=["date"]).reset_index(drop=True)
    if len(df) < before:
        print(f"[QC] Dropped {before - len(df)} rows with unparseable datetime in column '{col}'")
    return df

def _summarize_basic(df: pd.DataFrame, tag: str) -> pd.DataFrame:
    """Basic counts and missingness summary for quick logging."""
    out = {
        "tag": tag,
        "rows": len(df),
        "n_missing_CI_mean": int(df["CI_mean"].isna().sum()) if "CI_mean" in df.columns else None,
        "n_missing_CI_p90":  int(df["CI_p90"].isna().sum()) if "CI_p90" in df.columns else None,
        "n_missing_n_valid": int(df["n_valid"].isna().sum()) if "n_valid" in df.columns else None,
    }
    return pd.DataFrame([out])

# ----------------------------
# QC for regional (CONUS) time series
# ----------------------------
def qc_conus_timeseries(csv_path: Path, freq: str, out_prefix: str) -> None:
    df = pd.read_csv(csv_path)

    expected = {"CI_mean", "CI_p90", "n_valid"}
    missing = expected - set(df.columns)
    if missing:
        print(f"[WARN] {csv_path.name} is missing columns: {missing}")

    # robust parse + drop NaT
    df = _ensure_datetime(df, prefer_cols=("date", "time"))

    # normalize timestamps to start-of-period
    if freq.upper() == "D":
        df["date"] = df["date"].dt.floor("D")
    elif freq.upper() == "MS":
        df["date"] = df["date"].dt.to_period("M").dt.to_timestamp(how="start")
    else:
        df["date"] = df["date"].dt.to_period(freq).dt.start_time

    df = df.sort_values("date").drop_duplicates(subset=["date"]).reset_index(drop=True)

    # build regular index over the observed span
    idx = pd.date_range(df["date"].min(), df["date"].max(), freq=freq)
    df = df.set_index("date").reindex(idx).rename_axis("date").reset_index()

    # short-gap interpolation on CI columns (both directions)
    for c in ["CI_mean", "CI_p90"]:
        if c in df.columns:
            df[c] = df[c].interpolate(
                limit=INTERP_LIMIT_DAYS if freq.upper()=="D" else 1,
                limit_direction="both"
            )

    # robust quantile clipping
    for c in ["CI_mean", "CI_p90"]:
        if c in df.columns:
            df[c] = _clip_series_q(df[c])

    if "n_valid" in df.columns:
        nv_thr = df["n_valid"].quantile(0.05) if df["n_valid"].notna().any() else 0
        df["qc_low_coverage"] = (df["n_valid"] < nv_thr).astype(int)

    out_csv = OUT_DIR / f"{out_prefix}_clean.csv"
    df.to_csv(out_csv, index=False)
    _summarize_basic(df, tag=f"{out_prefix}").to_csv(OUT_DIR / f"{out_prefix}_summary.csv", index=False)
    print(f"[OK] Saved regional QC → {out_csv}")

# ----------------------------
# Lake geometry → expected pixel count
# ----------------------------
def compute_expected_pixels_per_lake(lakes_gpkg: Path,
                                     inset_m: float = 0.0) -> pd.DataFrame:
    """
    Compute expected pixel counts per lake from geometry area, after optional inward buffer
    (set inset_m=0 to avoid double-shrinking if polygons are already buffered).
    Done in equal-area CRS (EPSG:5070) then back to 4326.

    Returns a DataFrame: lake_id, lake_name (if present), expected_pixels_geom, area_m2
    """
    g = gpd.read_file(lakes_gpkg)
    cols = list(g.columns)
    if "lake_id" not in cols:
        raise ValueError("Expect 'lake_id' in lakes_greatlakes_5poly.gpkg")

    name_col = None
    for cand in ["lake_name", "Lake_name", "name", "Name"]:
        if cand in cols:
            name_col = cand
            break

    gm = g.to_crs(5070)

    if inset_m and inset_m > 0:
        gm["geometry"] = gm.buffer(-abs(inset_m))

    gm["area_m2"] = gm.geometry.area
    gm["expected_pixels_geom"] = (gm["area_m2"] / (PIX_RES_M ** 2)).round().astype("Int64")

    df = gm.to_crs(4326)[["lake_id", "area_m2", "expected_pixels_geom"]].copy()
    if name_col:
        df[name_col] = g[name_col].values

    return df

# ----------------------------
# QC for lake-level time series
# ----------------------------
def _safe_nanmedian(arr) -> float:
    """Return NaN if all values are NaN or array empty; otherwise nanmedian."""
    a = pd.Series(arr).astype(float)
    a = a[np.isfinite(a)]
    return float(np.nanmedian(a)) if len(a) else float("nan")

def _coerce_n_valid(df: pd.DataFrame) -> pd.DataFrame:
    # 1) 如果已有 n_valid 且存在非零值，直接返回
    if "n_valid" in df.columns and (df["n_valid"].fillna(0) > 0).any():
        return df

    # 2) 常见别名兜底
    for alias in ["valid_count", "count_valid", "num_valid", "n", "count"]:
        if alias in df.columns and (df[alias].fillna(0) > 0).any():
            df = df.rename(columns={alias: "n_valid"})
            return df

    # 3) 若 n_valid 不存在或全为 0 → 改成 NaN，避免误触发覆盖率判定
    if "n_valid" not in df.columns or not (df["n_valid"].fillna(0) > 0).any():
        df["n_valid"] = np.nan
        print("[WARN] n_valid is missing or all zeros in lake parquet; "
              "set n_valid = NaN so coverage flags become NA.")
    return df

def qc_lake_timeseries(parquet_path: Path, lakes_gpkg: Path, out_prefix: str) -> None:
    """
    QC for lake-level CI time series (daily or monthly parquet).
    Adds:
      - expected_pixels based on lake geometry
      - pct_valid_geom = n_valid / expected_pixels
      - pct_valid_emp = n_valid / max(n_valid) per lake
      - QC flags: low coverage (geom/emp), tiny absolute pixels
    Outputs cleaned Parquet + summary CSV.
    """
    # Load data
    df = pd.read_parquet(parquet_path)
    df = _coerce_n_valid(df)

    # Robust datetime parse: prefer 'date', then 'time'
    df = _ensure_datetime(df, prefer_cols=("date", "time"))

    # Ensure single datetime column named 'date'
    if "time" in df.columns and "date" in df.columns:
        df = df.drop(columns=["time"])
    elif "time" in df.columns and "date" not in df.columns:
        df = df.rename(columns={"time": "date"})

    # Drop duplicated columns if any
    if df.columns.duplicated().any():
        dup = df.columns[df.columns.duplicated()].tolist()
        print(f"[QC] Dropping duplicated columns: {dup}")
        df = df.loc[:, ~df.columns.duplicated()].copy()

    # Ensure a 'product' column exists
    if "product" not in df.columns:
        df["product"] = "unknown"

    # Expected pixels per lake from geometry (no extra inset here)
    geom_df = compute_expected_pixels_per_lake(lakes_gpkg, inset_m=0.0)
    name_col = "lake_name" if "lake_name" in geom_df.columns else ("Lake_name" if "Lake_name" in geom_df.columns else None)

    # Merge expected pixels
    df = df.merge(geom_df, on="lake_id", how="left")

    # Empirical max n_valid per lake (skip zeros/NaNs)
    if "n_valid" in df.columns:
        emp_max = (
            df.loc[df["n_valid"].fillna(0) > 0]
              .groupby("lake_id")["n_valid"]
              .max()
              .rename("empiric_n_valid_max")
        )
        df = df.merge(emp_max, on="lake_id", how="left")

        # Coverage metrics
        df["pct_valid_geom"] = df["n_valid"] / df["expected_pixels_geom"]
        df["pct_valid_emp"]  = df["n_valid"] / df["empiric_n_valid_max"]

        for c in ("pct_valid_geom", "pct_valid_emp"):
            df.loc[~np.isfinite(df[c]), c] = np.nan

        # Coverage flags
        df["qc_low_cov_geom"] = pd.Series(pd.NA, index=df.index, dtype="Int64")
        ok_geom = df["pct_valid_geom"].notna()
        df.loc[ok_geom, "qc_low_cov_geom"] = (df.loc[ok_geom, "pct_valid_geom"] < MIN_PCT_VALID_GEOM).astype("Int64")

        df["qc_low_cov_emp"] = pd.Series(pd.NA, index=df.index, dtype="Int64")
        ok_emp = df["pct_valid_emp"].notna()
        df.loc[ok_emp, "qc_low_cov_emp"] = (df.loc[ok_emp, "pct_valid_emp"] < MIN_PCT_VALID_EMP).astype("Int64")

        df["qc_tiny_abs_pix"] = (pd.to_numeric(df["n_valid"], errors="coerce") < MIN_ABS_PIX).astype("Int64")
    else:
        for c in ["pct_valid_geom", "pct_valid_emp", "qc_low_cov_geom", "qc_low_cov_emp", "qc_tiny_abs_pix"]:
            df[c] = pd.NA

    # Clip CI columns lake-wise (robust to per-lake distributions)
    for c in ["CI_mean", "CI_p90"]:
        if c in df.columns:
            df[c] = df.groupby("lake_id", group_keys=False)[c].apply(
                lambda s: _clip_series_q(s, CLIP_QUANTILE_LOW, CLIP_QUANTILE_HIGH)
            )

    # Consolidated validity flag
    cov_flags = ["qc_low_cov_geom", "qc_low_cov_emp", "qc_tiny_abs_pix"]
    for c in cov_flags:
        if c not in df.columns:
            df[c] = pd.NA

    df["qc_is_valid"] = 1  # start as valid
    for c in cov_flags:
        df.loc[df[c] == 1, "qc_is_valid"] = 0

    # rows with no coverage info → fall back to "CI_mean is finite"
    rows_no_cov = df[cov_flags].isna().all(axis=1)
    ci_ok = pd.to_numeric(df.get("CI_mean"), errors="coerce")
    ci_ok = ci_ok.apply(lambda x: int(np.isfinite(x)))
    df.loc[rows_no_cov, "qc_is_valid"] = ci_ok.loc[rows_no_cov].values

    # Optional quick diagnostics (helps when a lake looks empty in plots)
    diag = (
        df.assign(CI_mean_finite = pd.to_numeric(df["CI_mean"], errors="coerce").apply(np.isfinite))
          .groupby("lake_id")
          .agg(n_rows=("lake_id","size"),
               n_CI_finite=("CI_mean_finite","sum"),
               n_valid_pos=("n_valid", lambda s: int((pd.to_numeric(s, errors="coerce")>0).sum())),
               qc_valid=("qc_is_valid","sum"))
          .reset_index()
    )
    print("\n[DIAG] per-lake counts:\n", diag.to_string(index=False))

    # Save cleaned parquet
    out_pq = OUT_DIR / f"{out_prefix}_clean.parquet"
    df.to_parquet(out_pq, index=False)

    # Build simple summary per-lake
    summary_rows = []
    for lid, g in df.groupby("lake_id"):
        n_rows = len(g)
        n_valid_rows = int((g["qc_is_valid"] == 1).sum())
        frac_valid = n_valid_rows / n_rows if n_rows else 0.0
        row = {
            "lake_id": lid,
            "rows": n_rows,
            "rows_valid": n_valid_rows,
            "frac_valid": round(frac_valid, 4),
            "pct_valid_geom_med": _safe_nanmedian(g["pct_valid_geom"]) if "pct_valid_geom" in g.columns else np.nan,
            "pct_valid_emp_med":  _safe_nanmedian(g["pct_valid_emp"])  if "pct_valid_emp"  in g.columns else np.nan,
        }
        if name_col and name_col in g.columns:
            row["lake_name"] = g[name_col].iloc[0]
        summary_rows.append(row)

    summary_df = pd.DataFrame(summary_rows).sort_values("lake_id")
    out_summary = OUT_DIR / f"{out_prefix}_summary.csv"
    summary_df.to_csv(out_summary, index=False)

    print(f"[OK] Saved lake-level QC → {out_pq}")
    print(f"[OK] Summary → {out_summary}")

# ----------------------------
# Main
# ----------------------------
if __name__ == "__main__":
    # 1) Regional time series QC
    if P_CONUS_DAILY.exists():
        qc_conus_timeseries(P_CONUS_DAILY,   freq="D",  out_prefix="conus_daily")
    else:
        print(f"[SKIP] Missing: {P_CONUS_DAILY}")

    if P_CONUS_MONTHLY.exists():
        qc_conus_timeseries(P_CONUS_MONTHLY, freq="MS", out_prefix="conus_monthly")
    else:
        print(f"[SKIP] Missing: {P_CONUS_MONTHLY}")

    # 2) Lake-level QC (Great Lakes)
    if P_LAKE_DAILY.exists():
        qc_lake_timeseries(P_LAKE_DAILY,  P_LAKES_GPKG, out_prefix="greatlakes_daily")
    else:
        print(f"[SKIP] Missing: {P_LAKE_DAILY}")

    if P_LAKE_MONTHLY.exists():
        qc_lake_timeseries(P_LAKE_MONTHLY, P_LAKES_GPKG, out_prefix="greatlakes_monthly")
    else:
        print(f"[SKIP] Missing: {P_LAKE_MONTHLY}")

[OK] Saved regional QC → /dkucc/home/zy166/HAB-forecasting/datasets/processed/qc/conus_daily_clean.csv
[OK] Saved regional QC → /dkucc/home/zy166/HAB-forecasting/datasets/processed/qc/conus_monthly_clean.csv

[DIAG] per-lake counts:
 lake_id  n_rows  n_CI_finite  n_valid_pos  qc_valid
   GL-1     361          336          336       147
   GL-2     361          343          343       122
   GL-3     361          346          346       128
   GL-4     361          326          326       136
   GL-5     361          352          352       114
[OK] Saved lake-level QC → /dkucc/home/zy166/HAB-forecasting/datasets/processed/qc/greatlakes_daily_clean.parquet
[OK] Summary → /dkucc/home/zy166/HAB-forecasting/datasets/processed/qc/greatlakes_daily_summary.csv

[DIAG] per-lake counts:
 lake_id  n_rows  n_CI_finite  n_valid_pos  qc_valid
   GL-1      12           12           12        11
   GL-2      12           12           12        10
   GL-3      12           12           12        10
   GL-

### Dataset Visualization: ILW Monthly & Daily for Five Great Lakes

In [2]:
"""
Visualization for ILW CI_cyano after QC.
Generates:
- CONUS daily & monthly time-series plots
- Per-lake (Great Lakes) daily & monthly time series with coverage
- Faceted (5-lake) small multiples
- Validity heatmap (lake x date)
"""

from pathlib import Path
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# -----------------------------
# Paths
# -----------------------------
ROOT = Path("/dkucc/home/zy166/HAB-forecasting")
QC_DIR = ROOT / "datasets/processed/qc"
FIG_DIR = ROOT / "visualization/ILW"
FIG_DIR.mkdir(parents=True, exist_ok=True)

P_CONUS_DAILY_CLEAN   = QC_DIR / "conus_daily_clean.csv"
P_CONUS_MONTHLY_CLEAN = QC_DIR / "conus_monthly_clean.csv"
P_GL_DAILY_CLEAN      = QC_DIR / "greatlakes_daily_clean.parquet"
P_GL_MONTHLY_CLEAN    = QC_DIR / "greatlakes_monthly_clean.parquet"

# -----------------------------
# Helpers
# -----------------------------
def _rolling(s, win=7):
    """Simple rolling mean with center window, preserving NaNs at edges."""
    try:
        return s.rolling(win, center=True, min_periods=max(1, win // 2)).mean()
    except Exception:
        return s

def _nice_ax(ax, title=None, xlabel=None, ylabel=None, grid=True):
    """Minimal axis styling."""
    if title:  ax.set_title(title, fontsize=12)
    if xlabel: ax.set_xlabel(xlabel)
    if ylabel: ax.set_ylabel(ylabel)
    if grid:   ax.grid(True, alpha=0.25, linestyle="--", linewidth=0.8)

def _lake_name_map(df):
    """Resolve a friendly lake name column."""
    for c in ["lake_name", "Lake_name", "name", "Name"]:
        if c in df.columns:
            return c
    return None

# -----------------------------
# 1) CONUS time series
# -----------------------------
def plot_conus_timeseries():
    """Plot CONUS daily & monthly CI time series."""
    if not P_CONUS_DAILY_CLEAN.exists() and not P_CONUS_MONTHLY_CLEAN.exists():
        print("[SKIP] No CONUS files found.")
        return

    # Daily
    if P_CONUS_DAILY_CLEAN.exists():
        d = pd.read_csv(P_CONUS_DAILY_CLEAN, parse_dates=["date"])
        d = d.sort_values("date")
        fig, ax = plt.subplots(figsize=(10, 4))
        if "CI_mean" in d.columns:
            ax.plot(d["date"], d["CI_mean"], linewidth=1.0, label="CI_mean (daily)")
            ax.plot(d["date"], _rolling(d["CI_mean"], 7), linewidth=2.0, label="CI_mean 7D MA")
        if "CI_p90" in d.columns:
            ax.plot(d["date"], _rolling(d["CI_p90"], 7), linewidth=1.5, linestyle=":", label="CI_p90 7D MA")
        _nice_ax(ax, title="CONUS — Daily CI_cyano (with 7D moving average)", xlabel="Date", ylabel="CI")
        ax.legend()
        fig.tight_layout()
        out = FIG_DIR / "conus_daily_timeseries.png"
        fig.savefig(out, dpi=180)
        plt.close(fig)
        print("[OK] Saved", out)

    # Monthly
    if P_CONUS_MONTHLY_CLEAN.exists():
        m = pd.read_csv(P_CONUS_MONTHLY_CLEAN, parse_dates=["date"])
        m = m.sort_values("date")
        fig, ax = plt.subplots(figsize=(10, 4))
        if "CI_mean" in m.columns:
            ax.plot(m["date"], m["CI_mean"], marker="o", linewidth=2.0, label="CI_mean (monthly)")
        if "CI_p90" in m.columns:
            ax.plot(m["date"], m["CI_p90"], marker="s", linewidth=1.8, linestyle="--", label="CI_p90 (monthly)")
        _nice_ax(ax, title="CONUS — Monthly CI_cyano", xlabel="Month", ylabel="CI")
        ax.legend()
        fig.tight_layout()
        out = FIG_DIR / "conus_monthly_timeseries.png"
        fig.savefig(out, dpi=180)
        plt.close(fig)
        print("[OK] Saved", out)

# -----------------------------
# 2) Great Lakes time series
# -----------------------------
def plot_lake_timeseries_daily():
    """Per-lake time series with coverage as secondary axis."""
    if not P_GL_DAILY_CLEAN.exists():
        print("[SKIP] No Great Lakes daily parquet.")
        return
    df = pd.read_parquet(P_GL_DAILY_CLEAN)
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["date"])
    name_col = _lake_name_map(df)
    lakes = sorted(df["lake_id"].unique().tolist())

    for lid in lakes:
        g = df[df["lake_id"] == lid].sort_values("date")
        title = f"Lake {g[name_col].iloc[0] if name_col else lid} — Daily CI & Coverage"
        fig, ax1 = plt.subplots(figsize=(10, 4))
        # CI curves
        has_mean = "CI_mean" in g.columns and g["CI_mean"].notna().any()
        has_p90  = "CI_p90" in g.columns and g["CI_p90"].notna().any()
        if not (has_mean or has_p90):
            ax1.text(0.5, 0.5, "no finite CI values", ha="center", va="center", transform=ax1.transAxes)
        if has_mean:
            ax1.plot(g["date"], g["CI_mean"], linewidth=1.0, label="CI_mean")
            ax1.plot(g["date"], _rolling(g["CI_mean"], 7), linewidth=2.0, label="CI_mean 7D MA")
        if has_p90:
            ax1.plot(g["date"], _rolling(g["CI_p90"], 7), linewidth=1.5, linestyle=":", label="CI_p90 7D MA")
        _nice_ax(ax1, title=title, xlabel="Date", ylabel="CI")
        # Coverage on twin axis
        ax2 = ax1.twinx()
        if "pct_valid_geom" in g.columns and g["pct_valid_geom"].notna().any():
            ax2.plot(g["date"], g["pct_valid_geom"], linewidth=1.0, alpha=0.6, label="Coverage (geom)")
            ax2.set_ylabel("Coverage ratio")
            ax2.set_ylim(0, 1.05)
        lines1, labels1 = ax1.get_legend_handles_labels()
        lines2, labels2 = ax2.get_legend_handles_labels()
        ax1.legend(lines1 + lines2, labels1 + labels2, loc="upper right")
        fig.tight_layout()
        out = FIG_DIR / f"lake_{lid}_daily_timeseries.png"
        fig.savefig(out, dpi=180)
        plt.close(fig)
        print("[OK] Saved", out)

def plot_lake_timeseries_monthly():
    """Plot monthly CI time series for each Great Lake with robust checks."""
    if not P_GL_MONTHLY_CLEAN.exists():
        print("[SKIP] No Great Lakes monthly parquet.")
        return
    df = pd.read_parquet(P_GL_MONTHLY_CLEAN)
    if "date" not in df.columns and "time" in df.columns:
        df = df.rename(columns={"time": "date"})
    df["date"] = pd.to_datetime(df["date"], errors="coerce")
    df = df.dropna(subset=["date"])

    if "qc_is_valid" in df.columns:
        before = len(df)
        df = df[df["qc_is_valid"] == 1]
        print(f"[INFO] Filtered by qc_is_valid==1: {before} -> {len(df)} rows")

    has_mean = "CI_mean" in df.columns
    has_p90 = "CI_p90" in df.columns
    if not (has_mean or has_p90):
        print("[WARN] Neither CI_mean nor CI_p90 present.")
        return

    name_col = _lake_name_map(df)
    lakes = sorted(df["lake_id"].dropna().unique().tolist())

    for lid in lakes:
        g = df[df["lake_id"] == lid].sort_values("date")
        title_name = g[name_col].iloc[0] if name_col and len(g[name_col].dropna()) else lid
        fig, ax = plt.subplots(figsize=(10, 4))
        plotted = False
        if has_mean and g["CI_mean"].notna().any():
            ax.plot(g["date"], g["CI_mean"], marker="o", linewidth=2.0, label="CI_mean (monthly)")
            plotted = True
        if has_p90 and g["CI_p90"].notna().any():
            ax.plot(g["date"], g["CI_p90"], marker="s", linewidth=1.6, linestyle="--", label="CI_p90 (monthly)")
            plotted = True
        if not plotted:
            ax.text(0.5, 0.5, "no finite CI values", ha="center", va="center", transform=ax.transAxes)
        _nice_ax(ax, title=f"Lake {title_name} — Monthly CI", xlabel="Month", ylabel="CI")
        if plotted:
            ax.legend()
        fig.tight_layout()
        out = FIG_DIR / f"lake_{lid}_monthly_timeseries.png"
        fig.savefig(out, dpi=180)
        plt.close(fig)
        print("[OK] Saved", out)

# -----------------------------
# 3) Faceted small multiples & validity heatmap
# -----------------------------
def plot_lake_facets_daily():
    """Five-lake small multiples (CI_mean 7D MA + coverage shading)."""
    if not P_GL_DAILY_CLEAN.exists():
        print("[SKIP] No Great Lakes daily parquet.")
        return
    df = pd.read_parquet(P_GL_DAILY_CLEAN)
    df["date"] = pd.to_datetime(df["date"])
    name_col = _lake_name_map(df)
    desired = ["Superior", "Michigan", "Huron", "Erie", "Ontario"]
    if name_col:
        order = []
        for nm in desired:
            subset = df[df[name_col].str.fullmatch(nm, case=False, na=False)]
            if not subset.empty:
                order.extend(subset["lake_id"].unique().tolist())
        lakes = order or sorted(df["lake_id"].unique().tolist())[:5]
    else:
        lakes = sorted(df["lake_id"].unique().tolist())[:5]

    fig, axes = plt.subplots(3, 2, figsize=(12, 9))
    axes = axes.ravel()
    for i, lid in enumerate(lakes[:5]):
        ax = axes[i]
        g = df[df["lake_id"] == lid].sort_values("date")
        label = g[name_col].iloc[0] if name_col else str(lid)
        if "pct_valid_geom" in g.columns:
            ycov = (g["pct_valid_geom"].clip(0, 1.0) * (g["CI_mean"].max() * 0.2)).fillna(0.0)
            ax.fill_between(g["date"], 0, ycov, alpha=0.15, step="pre")
        if "CI_mean" in g.columns:
            ax.plot(g["date"], _rolling(g["CI_mean"], 7), linewidth=2.0, label="CI_mean 7D MA")
        if "CI_p90" in g.columns:
            ax.plot(g["date"], _rolling(g["CI_p90"], 7), linewidth=1.2, linestyle="--", label="CI_p90 7D MA")
        _nice_ax(ax, title=label, xlabel=None, ylabel="CI")
        if i == 0:
            ax.legend(fontsize=9)
    for j in range(len(lakes), len(axes)):
        axes[j].axis("off")
    fig.suptitle("Great Lakes — Daily CI (7D MA) with coverage shading", fontsize=14)
    fig.tight_layout(rect=[0, 0.02, 1, 0.97])
    out = FIG_DIR / "greatlakes_daily_facets.png"
    fig.savefig(out, dpi=180)
    plt.close(fig)
    print("[OK] Saved", out)

def plot_validity_heatmap_daily():
    """Heatmap of qc_is_valid by (lake x date)."""
    if not P_GL_DAILY_CLEAN.exists():
        print("[SKIP] No Great Lakes daily parquet.")
        return
    df = pd.read_parquet(P_GL_DAILY_CLEAN)
    df["date"] = pd.to_datetime(df["date"])
    name_col = _lake_name_map(df)
    if name_col:
        id2name = (df.dropna(subset=[name_col])
                     .drop_duplicates(subset=["lake_id"])[["lake_id", name_col]]
                     .set_index("lake_id")[name_col].to_dict())
    else:
        id2name = {}
    mat = df.pivot_table(index="lake_id", columns="date", values="qc_is_valid", aggfunc="max")
    if id2name:
        mat.index = [f"{lid} - {id2name.get(lid, '')}" for lid in mat.index]
    mat = mat.sort_index()
    fig, ax = plt.subplots(figsize=(12, 3 + 0.35 * len(mat)))
    im = ax.imshow(mat.values, aspect="auto", interpolation="nearest", vmin=0, vmax=1)
    ax.set_yticks(np.arange(len(mat)))
    ax.set_yticklabels(mat.index, fontsize=9)
    dates = mat.columns.to_pydatetime()
    step = max(1, len(dates) // 12)
    xticks = np.arange(0, len(dates), step)
    ax.set_xticks(xticks)
    ax.set_xticklabels([dates[i].strftime("%Y-%m-%d") for i in xticks], rotation=45, ha="right", fontsize=8)
    _nice_ax(ax, title="Great Lakes — Daily validity (qc_is_valid) heatmap", xlabel="Date", ylabel="Lake")
    cbar = fig.colorbar(im, ax=ax, shrink=0.8)
    cbar.set_label("qc_is_valid (1=valid, 0=invalid)", rotation=270, labelpad=12)
    fig.tight_layout()
    out = FIG_DIR / "greatlakes_daily_validity_heatmap.png"
    fig.savefig(out, dpi=180)
    plt.close(fig)
    print("[OK] Saved", out)

# -----------------------------
# Main
# -----------------------------
if __name__ == "__main__":
    plot_conus_timeseries()
    plot_lake_timeseries_daily()
    plot_lake_facets_daily()
    plot_validity_heatmap_daily()
    plot_lake_timeseries_monthly()

[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/conus_daily_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/conus_monthly_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/lake_GL-1_daily_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/lake_GL-2_daily_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/lake_GL-3_daily_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/lake_GL-4_daily_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/lake_GL-5_daily_timeseries.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/greatlakes_daily_facets.png
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/greatlakes_daily_validity_heatmap.png
[INFO] Filtered by qc_is_valid==1: 60 -> 52 rows
[OK] Saved /dkucc/home/zy166/HAB-forecasting/visualization/ILW/lake_GL-1_monthly_timeseries.png
[OK] Saved 

## Dataset: Daymet v4 Daily Surface Weather Data (ORNL DAAC)

Daymet: **Daily** Surface Weather Data on a 1-km Grid for North America, Version 4 R1
- Dataset Description: https://www.earthdata.nasa.gov/data/catalog/ornl-cloud-daymet-daily-v4r1-2129-4.1.
- More specifically, we need the data that fits into **North America (`na`)**, they are:
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_tmin_2024.nc
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_vp_2024.nc
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_prcp_2024.nc
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_srad_2024.nc
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_tmax_2024.nc
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_dayl_2024.nc
    - https://data.ornldaac.earthdata.nasa.gov/protected/daymet/Daymet_Daily_V4R1/data/daymet_v4_daily_na_swe_2024.nc

Daymet: **Monthly** Climate Summaries on a 1-km Grid for North America, Version 4 R1
- Dataset Description: https://www.earthdata.nasa.gov/data/catalog/ornl-cloud-daymet-monthly-v4r1-2131-4.1.
- More specifically, we need the data that fits into **North America (`na`)**, they are:
    - (Links coming soon.)

### Daymet Daily

Preliminary settings for the paths.

In [None]:
from pathlib import Path

import geopandas as gpd
import xarray as xr
import numpy as np
import pandas as pd

from shapely import geometry
from shapely import vectorized  # shapely.vectorized.contains

# paths
ROOT = Path("/dkucc/home/zy166/HAB-forecasting")

GPKG = ROOT / "datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg"
DAYMET_DIR = ROOT / "datasets/Daymet/2024_north_america_daily"
OUT_DAYMET_DAILY = ROOT / "datasets/Daymet/daymet_glakes_daily.parquet"

# lakes vector, reproject to EPSG:4326 (lat/lon)
lakes = gpd.read_file(GPKG)
lakes = lakes.to_crs(4326)

if "lake_id" not in lakes.columns:
    raise ValueError("Expect 'lake_id' in lakes_greatlakes_5poly.gpkg")

# find a name column, not required
name_col = None
for cand in ["lake_name", "Lake_name", "name", "Name"]:
    if cand in lakes.columns:
        name_col = cand
        break
if name_col is None:
    lakes["lake_name"] = lakes["lake_id"]
    name_col = "lake_name"

lakes = lakes[["lake_id", name_col, "geometry"]]

print(lakes)

  lake_id lake_name                                           geometry
0    GL-1      Erie  MULTIPOLYGON (((-83.46202 41.74248, -83.46203 ...
1    GL-2     Huron  MULTIPOLYGON (((-84.7498 45.83154, -84.74973 4...
2    GL-3  Michigan  MULTIPOLYGON (((-88.03947 44.56444, -88.03912 ...
3    GL-4   Ontario  MULTIPOLYGON (((-79.88425 43.27778, -79.88423 ...
4    GL-5  Superior  MULTIPOLYGON (((-92.19317 46.6747, -92.19318 4...


Build the "mask" for each lake in the Daymet map

- There are `lat(y,x)` and `lon(y,x)` in the `.nc` file of Daymet datasets.
- For each polygon of the lake, use `shapely.vectorized.contains(geom, lon, lat)` to get a `y × x` boolean mask.
- Therefore, for eacvh variable (e.g., min/tmax/...), we can use `da.where(mask)` to take the mean of `(y,x)` easily.

In [None]:
# Use one file (e.g., tmin) to get the lat/lon grid.
f_grid = DAYMET_DIR / "daymet_v4_daily_na_tmin_2024.nc"

# Use dask chunk to reduce the memory pressure (about 1km grid of North America)
ds_grid = xr.open_dataset(
    f_grid,
    chunks={"time": 365, "y": 1000, "x": 1000},
)

print(ds_grid)

# According to the Daymet documentation, there are usually `lat(y,x)` and `lon(y,x)`
# If the names are not exactly these two, you can print ds_grid.data_vars / ds_grid.coords to see the exact names
lat = ds_grid["lat"].values  # shape: (ny, nx)
lon = ds_grid["lon"].values  # shape: (ny, nx)

ny, nx = lat.shape
print(f"[INFO] Daymet grid shape: ny={ny}, nx={nx}")

# Prepare a mask DataArray for each lake
lake_masks = {}  # lake_id -> xr.DataArray(bool[y,x])

for _, row in lakes.iterrows():
    lake_id   = row["lake_id"]
    lake_name = row[name_col]
    geom      = row.geometry

    # Some MultiPolygon / topology may have problems, buffer(0)一下
    if not isinstance(geom, geometry.base.BaseGeometry):
        geom = geometry.shape(geom)
    geom = geom.buffer(0)

    print(f"[INFO] build mask for lake {lake_id} ({lake_name}) ...")

    # shapely.vectorized.contains expects (geom, x, y) = (polygon, lon, lat)
    mask_bool = vectorized.contains(geom, lon, lat)  # shape: (ny, nx) bool

    if not mask_bool.any():
        print(f"[WARN] mask for lake {lake_id} has no True pixels, check geometry/CRS")
    lake_masks[lake_id] = xr.DataArray(
        mask_bool,
        dims=("y", "x"),
        coords={"y": ds_grid["y"], "x": ds_grid["x"]},
        name=f"mask_{lake_id}",
    )

print(f"[OK] built masks for {len(lake_masks)} lakes")
ds_grid.close()

  ds_grid = xr.open_dataset(
  ds_grid = xr.open_dataset(


<xarray.Dataset> Size: 93GB
Dimensions:                  (time: 365, nv: 2, y: 8075, x: 7814)
Coordinates:
  * time                     (time) datetime64[ns] 3kB 2024-01-01T12:00:00 .....
  * y                        (y) float32 32kB 4.984e+06 4.983e+06 ... -3.09e+06
  * x                        (x) float32 31kB -4.56e+06 -4.559e+06 ... 3.253e+06
    lat                      (y, x) float32 252MB dask.array<chunksize=(1000, 1000), meta=np.ndarray>
    lon                      (y, x) float32 252MB dask.array<chunksize=(1000, 1000), meta=np.ndarray>
Dimensions without coordinates: nv
Data variables:
    yearday                  (time) int16 730B dask.array<chunksize=(365,), meta=np.ndarray>
    time_bnds                (time, nv) datetime64[ns] 6kB dask.array<chunksize=(365, 2), meta=np.ndarray>
    lambert_conformal_conic  int16 2B ...
    tmin                     (time, y, x) float32 92GB dask.array<chunksize=(365, 1000, 1000), meta=np.ndarray>
Attributes:
    start_year:        2024
  

  mask_bool = vectorized.contains(geom, lon, lat)  # shape: (ny, nx) bool


[INFO] build mask for lake GL-2 (Huron) ...
[INFO] build mask for lake GL-3 (Michigan) ...
[INFO] build mask for lake GL-4 (Ontario) ...
[INFO] build mask for lake GL-5 (Superior) ...
[OK] built masks for 5 lakes


Take the daily average of each variable dataset.

In [None]:
# Map variable names to file names
VAR_FILES = {
    "tmin": "daymet_v4_daily_na_tmin_2024.nc",
    "tmax": "daymet_v4_daily_na_tmax_2024.nc",
    "prcp": "daymet_v4_daily_na_prcp_2024.nc",
    "srad": "daymet_v4_daily_na_srad_2024.nc",
    "vp":   "daymet_v4_daily_na_vp_2024.nc",
    "dayl": "daymet_v4_daily_na_dayl_2024.nc",
}

def compute_lake_daily_for_var(var_name: str) -> pd.DataFrame:
    """
    Compute the daily mean time series of each lake for a Daymet variable (e.g., tmin).
    Return columns: ['lake_id', 'date', var_name]
    """
    fpath = DAYMET_DIR / VAR_FILES[var_name]
    print(f"[INFO] open {var_name} from {fpath}")

    ds = xr.open_dataset(
        fpath,
        chunks={"time": 30, "y": 1000, "x": 1000},
    )
    # The variable name is usually the same as the file name, if not, change it here
    da = ds[var_name]  # dims: time, y, x

    rows = []

    for _, row in lakes.iterrows():
        lake_id   = row["lake_id"]
        lake_name = row[name_col]
        mask_da   = lake_masks[lake_id]  # bool[y,x]

        print(f"[INFO] compute {var_name} daily mean for lake {lake_id} ({lake_name}) ...")

        # broadcast mask to (time, y, x), only keep the pixels inside the lake
        masked = da.where(mask_da)

        # Take the mean of the spatial dimensions, get the time series
        series = masked.mean(dim=("y", "x"), skipna=True)  # DataArray(time)

        df = series.to_dataframe(name=var_name).reset_index()  # columns: ['time', var_name]
        df["lake_id"]   = lake_id
        df["lake_name"] = lake_name
        rows.append(df)

    ds.close()
    out = pd.concat(rows, ignore_index=True)
    # Use the same name for the time column: date (no timezone)
    out["date"] = pd.to_datetime(out["time"], utc=True).dt.tz_localize(None)
    out = out.drop(columns=["time"])

    return out[["lake_id", "lake_name", "date", var_name]]

Now, we run through all the variables and then get a merged table according to `lake_id` and `date`.

In [None]:
# Get a `df` for each variable, then merge them step by step
dfs = []
for var_name in VAR_FILES.keys():
    df_var = compute_lake_daily_for_var(var_name)
    dfs.append(df_var)

# Use the first variable as the base, then merge with other variables step by step
from functools import reduce

df_daymet = reduce(
    lambda left, right: pd.merge(
        left,
        right,
        on=["lake_id", "lake_name", "date"],
        how="outer",
    ),
    dfs,
)

# Sort it for better readability
df_daymet = df_daymet.sort_values(["lake_id", "date"]).reset_index(drop=True)

print(df_daymet.head())
print(df_daymet.describe(include="all"))

# Save as parquet
OUT_DAYMET_DAILY.parent.mkdir(parents=True, exist_ok=True)
df_daymet.to_parquet(OUT_DAYMET_DAILY, index=False)
print(f"[OK] saved daily lake-mean Daymet → {OUT_DAYMET_DAILY}, rows={len(df_daymet)}")

[INFO] open tmin from /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/2024_north_america_daily/daymet_v4_daily_na_tmin_2024.nc
[INFO] compute tmin daily mean for lake GL-1 (Erie) ...


  ds = xr.open_dataset(
  ds = xr.open_dataset(
  ds = xr.open_dataset(


[INFO] compute tmin daily mean for lake GL-2 (Huron) ...
[INFO] compute tmin daily mean for lake GL-3 (Michigan) ...
[INFO] compute tmin daily mean for lake GL-4 (Ontario) ...
[INFO] compute tmin daily mean for lake GL-5 (Superior) ...
[INFO] open tmax from /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/2024_north_america_daily/daymet_v4_daily_na_tmax_2024.nc


  ds = xr.open_dataset(
  ds = xr.open_dataset(
  ds = xr.open_dataset(


[INFO] compute tmax daily mean for lake GL-1 (Erie) ...
[INFO] compute tmax daily mean for lake GL-2 (Huron) ...
[INFO] compute tmax daily mean for lake GL-3 (Michigan) ...
[INFO] compute tmax daily mean for lake GL-4 (Ontario) ...
[INFO] compute tmax daily mean for lake GL-5 (Superior) ...
[INFO] open prcp from /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/2024_north_america_daily/daymet_v4_daily_na_prcp_2024.nc


  ds = xr.open_dataset(
  ds = xr.open_dataset(
  ds = xr.open_dataset(


[INFO] compute prcp daily mean for lake GL-1 (Erie) ...
[INFO] compute prcp daily mean for lake GL-2 (Huron) ...
[INFO] compute prcp daily mean for lake GL-3 (Michigan) ...
[INFO] compute prcp daily mean for lake GL-4 (Ontario) ...


In this following cell of the notebook, we:
- aggregate the daily Daymet data to weekly data;
- compute the 28-day rolling mean of the daily data; 
- compute the monthly Daymet data; and
- merge it with the ILW cleaned data.

In [3]:
import pandas as pd
from pathlib import Path

VARS  = ["tmin", "tmax", "prcp", "srad", "vp", "dayl"]

ROOT     = Path("/dkucc/home/zy166/HAB-forecasting")
P_DAYMET_DAILY = ROOT / "datasets/Daymet/daymet_glakes_daily.parquet"

# 1) Read the daily Daymet table
daymet_daily = pd.read_parquet(P_DAYMET_DAILY)
daymet_daily["date"] = pd.to_datetime(daymet_daily["date"])

# 2) Weekly aggregation (7D or natural week W)
# Option A: 7D rolling equal-distance window
daymet_weekly = (
    daymet_daily
    .set_index("date")
    .groupby("lake_id")[VARS]
    .resample("7D")
    .mean()
    .reset_index()
)
out_weekly_pq = P_DAYMET_DAILY.with_name("daymet_glakes_weekly_7D.parquet")
daymet_weekly.to_parquet(out_weekly_pq, index=False)
print(f"[OK] saved weekly Daymet (7D) → {out_weekly_pq}")

# Option B: Natural week (week start on Monday)
daymet_weekly_w = (
    daymet_daily
    .set_index("date")
    .groupby("lake_id")[VARS]
    .resample("W-MON")
    .mean()
    .reset_index()
)
out_weekly_w_pq = P_DAYMET_DAILY.with_name("daymet_glakes_weekly_W-MON.parquet")
daymet_weekly_w.to_parquet(out_weekly_w_pq, index=False)
print(f"[OK] saved weekly Daymet (W-MON) → {out_weekly_w_pq}")

# 3) 28-day moving average
daymet_daily = daymet_daily.sort_values(["lake_id", "date"])

roll28 = (
    daymet_daily
    .set_index("date")
    .groupby("lake_id")[VARS]
    .rolling("28D")
    .mean()
    .reset_index()
)
# After reset_index, there are 'lake_id','date',VARS
roll28 = roll28.rename(columns={"date": "date"})
out_roll28_pq = P_DAYMET_DAILY.with_name("daymet_glakes_roll28.parquet")
roll28.to_parquet(out_roll28_pq, index=False)
print(f"[OK] saved 28D rolling mean Daymet → {out_roll28_pq}")

# 4) Monthly Daymet (align with ILW monthly data)
daymet_monthly = (
    daymet_daily
    .set_index("date")
    .groupby("lake_id")[VARS]
    .resample("MS")    # Month Start, align with ILW QC's "start-of-month"
    .mean()
    .reset_index()
)
out_monthly_pq = P_DAYMET_DAILY.with_name("daymet_glakes_monthly.parquet")
daymet_monthly.to_parquet(out_monthly_pq, index=False)
print(f"[OK] saved monthly Daymet → {out_monthly_pq}")

# 5) Merge with ILW cleaned data

ROOT     = Path("/dkucc/home/zy166/HAB-forecasting")
QC_DIR   = ROOT / "datasets/processed/qc"
P_ILW_D  = QC_DIR / "greatlakes_daily_clean.parquet"
P_ILW_M  = QC_DIR / "greatlakes_monthly_clean.parquet"

# Daily merge
ilw_d = pd.read_parquet(P_ILW_D)
ilw_d["date"] = pd.to_datetime(ilw_d["date"])

# Only keep the observations with qc_is_valid == 1 (adjustable)
if "qc_is_valid" in ilw_d.columns:
    before = len(ilw_d)
    ilw_d = ilw_d[ilw_d["qc_is_valid"] == 1].copy()
    print(f"[INFO] ILW daily filter qc_is_valid==1: {before} -> {len(ilw_d)} rows")

# Merge with Daymet daily data
merged_daily = (
    ilw_d
    .merge(
        daymet_daily[["lake_id", "date"] + VARS],
        on=["lake_id", "date"],
        how="left",
    )
)
out_merged_daily = QC_DIR / "greatlakes_daily_with_daymet.parquet"
merged_daily.to_parquet(out_merged_daily, index=False)
print(f"[OK] saved ILW+Daymet daily → {out_merged_daily}, rows={len(merged_daily)}")

# Monthly merge
ilw_m = pd.read_parquet(P_ILW_M)
# ILW QC script already ensures 'date' is the first day of the month, but here we double-check
ilw_m["date"] = pd.to_datetime(ilw_m["date"])

if "qc_is_valid" in ilw_m.columns:
    before = len(ilw_m)
    ilw_m = ilw_m[ilw_m["qc_is_valid"] == 1].copy()
    print(f"[INFO] ILW monthly filter qc_is_valid==1: {before} -> {len(ilw_m)} rows")

merged_monthly = (
    ilw_m
    .merge(
        daymet_monthly[["lake_id", "date"] + VARS],
        on=["lake_id", "date"],
        how="left",
    )
)
out_merged_monthly = QC_DIR / "greatlakes_monthly_with_daymet.parquet"
merged_monthly.to_parquet(out_merged_monthly, index=False)
print(f"[OK] saved ILW+Daymet monthly → {out_merged_monthly}, rows={len(merged_monthly)}")

[OK] saved weekly Daymet (7D) → /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/daymet_glakes_weekly_7D.parquet
[OK] saved weekly Daymet (W-MON) → /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/daymet_glakes_weekly_W-MON.parquet
[OK] saved 28D rolling mean Daymet → /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/daymet_glakes_roll28.parquet
[OK] saved monthly Daymet → /dkucc/home/zy166/HAB-forecasting/datasets/Daymet/daymet_glakes_monthly.parquet
[INFO] ILW daily filter qc_is_valid==1: 1805 -> 647 rows
[OK] saved ILW+Daymet daily → /dkucc/home/zy166/HAB-forecasting/datasets/processed/qc/greatlakes_daily_with_daymet.parquet, rows=647
[INFO] ILW monthly filter qc_is_valid==1: 60 -> 52 rows
[OK] saved ILW+Daymet monthly → /dkucc/home/zy166/HAB-forecasting/datasets/processed/qc/greatlakes_monthly_with_daymet.parquet, rows=52
