# Forecasting Freshwater Algal Bloom Levels Using Multisource Climate and Water-Quality Data

*This is the course project of **STATS 402: Interdisciplinary Data Analysis**.*

**Name:** Ziyue Yin

**NetID:** zy166

## Dataset: HydroLAKES

Lake polygons (including all attributes) in shapefileformat: https://www.hydrosheds.org/products/hydrolakes.

## Dataset: NASA OceanColor Inland Waters (ILW)

S3Merged-ILW data: https://oceandata.sci.gsfc.nasa.gov/directdataaccess/Level-3%20Mapped/Merged-S3-ILW/.

S3B-ILW data: https://oceandata.sci.gsfc.nasa.gov/directdataaccess/Level-3%20Mapped/S3B-ILW/.

After downloading the datasets, the structure should be shown as follows:

```
datasets/
 ├── ILW/
 │    ├── S3B/2024/CONUS_MO/
 │    │      ├── S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc
 │    │      ├── S3B_OLCI_EFRNT.20240201_20240229.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc
 │    ├── Merged/2024/CONUS_DAY/
 │    │      ├── S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc
 │    │      ├── S3M_OLCI_EFRNT.20240102.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc
 │    │      ...
```

### Data Structure Exploration

First of all, let's glance at the monthly dataset.

In [1]:
import xarray as xr

p = "/dkucc/home/zy166/HAB-forcasting/datasets/ILW/S3B/2024/CONUS_MO/S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc"

ds = xr.open_dataset(p, engine="netcdf4", chunks="auto")
print(ds.dims)
print(list(ds.data_vars))

['rhos_400', 'rhos_412', 'rhos_443', 'rhos_490', 'rhos_510', 'rhos_560', 'rhos_620', 'rhos_665', 'rhos_674', 'rhos_681', 'rhos_709', 'rhos_754', 'rhos_865', 'rhos_884', 'CI_cyano', 'palette']


And also, the daily dataset.

In [2]:
import xarray as xr

p = "/dkucc/home/zy166/HAB-forcasting/datasets/ILW/Merged/2024/CONUS_DAY/S3M_OLCI_EFRNT.20240101.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc"

ds = xr.open_dataset(p, engine="netcdf4", chunks="auto")
print(ds.dims)
print(list(ds.data_vars))

['rhos_400', 'rhos_412', 'rhos_443', 'rhos_490', 'rhos_510', 'rhos_560', 'rhos_620', 'rhos_665', 'rhos_674', 'rhos_681', 'rhos_709', 'rhos_754', 'rhos_865', 'rhos_884', 'CI_cyano', 'palette']


### Target Feature Extraction

#### Utilities

In [3]:
import re
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
import geopandas as gpd
import rioxarray

Time Extraction: coords -> attrs -> file names

In [4]:
def infer_time_label(nc_path, ds, product="monthly"):
    """
    Return a pandas.Timestamp, try to infer from ds or filename.
    product: 'monthly' or 'daily'
    """
    # 1) Directly have time coordinate/variable
    for k in ("time",):
        if k in ds.coords or k in ds.variables:
            try:
                return pd.to_datetime(ds[k].values).to_pydatetime()
            except Exception:
                pass

    # 2) Global attributes (common in L3M data)
    start = ds.attrs.get("time_coverage_start") or ds.attrs.get("start_time")
    end   = ds.attrs.get("time_coverage_end")   or ds.attrs.get("end_time")
    if start and end:
        try:
            ts = pd.to_datetime(start)
            te = pd.to_datetime(end)
            if product == "monthly":
                return ts + (te - ts) / 2
            else:
                return ts
        except Exception:
            pass

    # 3) Analyze the filename
    fn = nc_path.split("/")[-1]
    if product == "monthly":
        # ...YYYYMMDD_YYYYMMDD.L3m.MO...
        m = re.search(r"\.(\d{8})_(\d{8})\.L3m\.MO\.", fn)
        if m:
            b, e = m.group(1), m.group(2)
            ts = pd.to_datetime(b, format="%Y%m%d")
            te = pd.to_datetime(e, format="%Y%m%d")
            return ts + (te - ts) / 2
    else:
        # ...YYYYMMDD.L3m.DAY...
        m = re.search(r"\.(\d{8})\.L3m\.DAY\.", fn)
        if m:
            return pd.to_datetime(m.group(1), format="%Y%m%d")

    raise ValueError("Cannot infer time from dataset or filename: " + fn)

Quality Prune

In [None]:
def clean_ci(da: xr.DataArray) -> xr.DataArray:
    """
    Filter out values out of physical range and remove near-zero values.
    """
    vmin = float(da.attrs.get("valid_min", np.nan))
    vmax = float(da.attrs.get("valid_max", np.nan))
    if np.isfinite(vmin):
        da = da.where(da >= vmin)
    if np.isfinite(vmax):
        da = da.where(da <= vmax)

    thr = max(vmin, 5e-5) if np.isfinite(vmin) else 5e-5
    da = da.where(da > thr)

    return da

For a single .nc file, get all the lakes

In [6]:
def extract_lakes_from_nc(nc_path: str,
                          lakes_gdf: gpd.GeoDataFrame,
                          lake_id_col: str,
                          product: str) -> pd.DataFrame:
    """
    nc_path: A single NetCDF file (S3B monthly or S3M daily)
    lakes_gdf: A GeoDataFrame containing `lake_id` and `geometry` (EPSG:4326)
    product: 'monthly' | 'daily'
    Returns: One row per lake (timestamp of the file)
    """
    ds = xr.open_dataset(nc_path, engine="netcdf4", chunks="auto")
    t  = infer_time_label(nc_path, ds, product=product)

    da = ds["CI_cyano"]
    da = set_spatial_dims_safe(da)
    da = clean_ci(da)

    rows = []
    for _, row in lakes_gdf.iterrows():
        lid  = row[lake_id_col]
        geom = [row.geometry]  # rioxarray.clip needs a list

        try:
            clipped = da.rio.clip(geom, lakes_gdf.crs, drop=True)
            valid   = clipped.where(np.isfinite(clipped))
            n_valid = int(valid.count().compute().values)
            if n_valid == 0:
                mean_val = np.nan
                p90      = np.nan
            else:
                mean_val = float(valid.mean().compute().values)
                p90      = float(valid.quantile(0.9).compute().values)
        except Exception:
            mean_val, p90, n_valid = np.nan, np.nan, 0

        rows.append({
            "lake_id": lid,
            "time":   pd.to_datetime(t),
            "product": product,
            "CI_mean": mean_val,
            "CI_p90":  p90,
            "n_valid": n_valid,
            "src":     Path(nc_path).name,
        })

    ds.close()
    return pd.DataFrame(rows)

Process monthly data in batches

In [7]:
def run_monthly(monthly_dir: str,
                lakes_fp: str,
                lake_id_col: str,
                out_parquet: str):
    """
    monthly_dir: Directory containing files like S3B_OLCI_EFRNT.*.L3m.MO.ILW_CONUS...nc
    lakes_fp:    Lake boundaries (gpkg/shp, must be EPSG:4326)
    """
    gdf = gpd.read_file(lakes_fp)
    if gdf.crs is None:
        raise ValueError("The lake file is missing CRS, please ensure it is EPSG:4326")
    gdf = gdf.to_crs(4326)[[lake_id_col, "geometry"]].dropna()

    out_rows = []
    for fp in sorted(Path(monthly_dir).glob("S3B_OLCI_EFRNT.*.L3m.MO.*.nc")):
        df_one = extract_lakes_from_nc(str(fp), gdf, lake_id_col, product="monthly")
        out_rows.append(df_one)

    if not out_rows:
        print("No monthly files found.")
        return

    df_all = pd.concat(out_rows, ignore_index=True)
    Path(out_parquet).parent.mkdir(parents=True, exist_ok=True)
    df_all.to_parquet(out_parquet, index=False)
    print(f"[monthly] saved → {out_parquet}  ({len(df_all)} rows)")

Process daily data in batches

In [8]:
def run_daily(daily_dir: str,
              lakes_fp: str,
              lake_id_col: str,
              out_parquet: str):
    """
    daily_dir: Directory containing files like S3M_OLCI_EFRNT.*.L3m.DAY.ILW_CONUS...nc
    """
    gdf = gpd.read_file(lakes_fp)
    if gdf.crs is None:
        raise ValueError("The lake file is missing CRS, please ensure it is EPSG:4326")
    gdf = gdf.to_crs(4326)[[lake_id_col, "geometry"]].dropna()

    out_rows = []
    for fp in sorted(Path(daily_dir).glob("S3M_OLCI_EFRNT.*.L3m.DAY.*.nc")):
        df_one = extract_lakes_from_nc(str(fp), gdf, lake_id_col, product="daily")
        out_rows.append(df_one)

    if not out_rows:
        print("No daily files found.")
        return

    df_all = pd.concat(out_rows, ignore_index=True)
    Path(out_parquet).parent.mkdir(parents=True, exist_ok=True)
    df_all.to_parquet(out_parquet, index=False)
    print(f"[daily] saved → {out_parquet}  ({len(df_all)} rows)")

Spatial Coordination

In [9]:
def set_spatial_dims_safe(da: xr.DataArray) -> xr.DataArray:
    """
    Try to set the spatial dimensions and CRS for L3m grids.
    First use `x/y`; if failed, try `lon/lat`; if still failed, degrade to the last two dimensions as `x/y`.
    """
    if "x" in da.dims and "y" in da.dims:
        out = da.rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=False)
        return out

    if "lon" in da.dims and "lat" in da.dims:
        out = da.rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="lon", y_dim="lat", inplace=False)
        return out

    if len(da.dims) >= 2:
        dims = list(da.dims)
        ydim, xdim = dims[-2], dims[-1]
        out = da.rename({xdim: "x", ydim: "y"})
        out = out.rio.write_crs(4326)
        out = out.rio.set_spatial_dims(x_dim="x", y_dim="y", inplace=False)
        return out

    raise ValueError("Cannot determine spatial dims for CI_cyano")

#### Scale 1: In general

##### Monthly

Here, we use the **S3B Monthly** data. One month per row.

In [10]:
import glob, numpy as np, pandas as pd, xarray as xr
from pathlib import Path

monthly_dir = Path("/dkucc/home/zy166/HAB-forcasting/datasets/ILW/S3B/2024/CONUS_MO")
out_csv = monthly_dir/"ci_cyano_monthly_mean.csv"

rows = []
for fp in sorted(monthly_dir.glob("S3B_OLCI_EFRNT.*.L3m.MO.ILW_CONUS.V5.all.CONUS.300m.nc")):
    ds = xr.open_dataset(fp, engine="netcdf4", chunks="auto")
    da = ds["CI_cyano"]
    da = clean_ci(da)

    vmin = float(da.attrs.get("valid_min", np.nan))
    vmax = float(da.attrs.get("valid_max", np.nan))
    if np.isfinite(vmin): da = da.where(da >= vmin)
    if np.isfinite(vmax): da = da.where(da <= vmax)

    m   = float(da.where(np.isfinite(da)).mean().compute().values)
    p90 = float(da.where(np.isfinite(da)).quantile(0.9).compute().values)
    t   = infer_time_label(str(fp), ds, product="monthly")

    rows.append({"time": pd.to_datetime(t), "CI_mean": m, "CI_p90": p90,
                 "n_valid": int(da.count().compute().values)})
    ds.close()

df_mo = pd.DataFrame(rows).sort_values("time").reset_index(drop=True)
df_mo.to_csv(out_csv, index=False)
df_mo.head()

Unnamed: 0,time,CI_mean,CI_p90,n_valid
0,2024-01-16 16:53:28.500000+00:00,0.000591,0.000472,2030281
1,2024-02-15 17:12:11.500000+00:00,0.000539,0.000351,2204191
2,2024-03-16 16:44:09+00:00,0.000492,0.000166,1810493
3,2024-04-16 04:52:02+00:00,0.000496,0.00054,2151149
4,2024-05-16 17:01:29+00:00,0.0005,0.000836,2586974


##### Daily

Here, we use the **S3M Daily** data. One day per row.

In [11]:
# daily_dir = Path("/dkucc/home/zy166/HAB-forcasting/datasets/ILW/Merged/2024/CONUS_DAY")
# out_csv = daily_dir/"ci_cyano_daily_mean.csv"

# rows = []
# for fp in sorted(daily_dir.glob("S3M_OLCI_EFRNT.*.L3m.DAY.ILW_CONUS.V5.all.CONUS.300m.nc")):
#     ds = xr.open_dataset(fp, engine="netcdf4", chunks="auto")
#     da = ds["CI_cyano"]
#     da = clean_ci(da)

#     vmin = float(da.attrs.get("valid_min", np.nan))
#     vmax = float(da.attrs.get("valid_max", np.nan))
#     if np.isfinite(vmin): da = da.where(da >= vmin)
#     if np.isfinite(vmax): da = da.where(da <= vmax)

#     m   = float(da.where(np.isfinite(da)).mean().compute().values)
#     p90 = float(da.where(np.isfinite(da)).quantile(0.9).compute().values)
#     t   = infer_time_label(str(fp), ds, product="daily")

#     rows.append({"date": pd.to_datetime(t), "CI_mean": m, "CI_p90": p90,
#                  "n_valid": int(da.count().compute().values)})
#     ds.close()

# df_day = pd.DataFrame(rows).sort_values("date").reset_index(drop=True)
# df_day.to_csv(out_csv, index=False)
# df_day.head()

#### Scale 2: Five Great Lakes

First of all, we get the five Great Lakes out explicitly.

In [12]:
from pathlib import Path
import geopandas as gpd

src = "/dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes.gpkg"
gdf = gpd.read_file(src)

keep_names = ["Superior","Michigan","Huron","Erie","Ontario"]
gdf5 = gdf[gdf["Lake_name"].str.fullmatch("|".join(keep_names), case=False)].copy()

# Dissolve into five polygons
gdf5 = gdf5.dissolve(by="Lake_name", as_index=False)

# Clean and rename the columns
gdf5 = gdf5.rename(columns={"Lake_name":"lake_name"})
gdf5 = gdf5.reset_index(drop=True)

# Directly overwrite/create the `lake_id` column
gdf5["lake_id"] = [f"GL-{i+1}" for i in range(len(gdf5))]   # GL-1..GL-5

# Buffer the shoreline by 300 m, first to equidistant projection, then buffer, then back to 4326
gdf5m = gdf5.to_crs(5070)
gdf5m["geometry"] = gdf5m.buffer(-300)
gdf5 = gdf5m.to_crs(4326)

out = Path(src).with_name("lakes_greatlakes_5poly.gpkg")
gdf5.to_file(out, driver="GPKG")
print("Saved:", out, "features:", len(gdf5))

Saved: /dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg features: 5


##### Monthly

In [13]:
# Monthly
run_monthly(
    monthly_dir="/dkucc/home/zy166/HAB-forcasting/datasets/ILW/S3B/2024/CONUS_MO",
    lakes_fp="/dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg",
    lake_id_col="lake_id",
    out_parquet="/dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_monthly.parquet"
)

[monthly] saved → /dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_monthly.parquet  (55 rows)


##### Daily

In [14]:
# Daily
# run_daily(
#     daily_dir="/dkucc/home/zy166/HAB-forcasting/datasets/ILW/Merged/2024/CONUS_DAY",
#     lakes_fp="/dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg",
#     lake_id_col="lake_id",
#     out_parquet="/dkucc/home/zy166/HAB-forcasting/data/processed/lake_ci_daily.parquet"
# )

### Data Quality Control

In [31]:
import pandas as pd
import numpy as np

df = pd.read_parquet("/dkucc/home/zy166/HAB-forcasting/datasets/processed/qc/greatlakes_monthly_clean.parquet")
print("Columns:", list(df.columns))
print("Shape:", df.shape)
print(df.head())

for c in ["CI_mean", "CI_p90", "n_valid"]:
    if c in df.columns:
        v = pd.to_numeric(df[c], errors="coerce")
        print(f"{c}: non-null={v.notna().sum()}, finite={np.isfinite(v).sum()}, mean={v.mean() if np.isfinite(v).any() else np.nan}")


Columns: ['lake_id', 'product', 'CI_mean', 'CI_p90', 'n_valid', 'src', 'date', 'area_m2', 'expected_pixels_geom', 'lake_name', 'empiric_n_valid_max', 'pct_valid_geom', 'pct_valid_emp', 'qc_low_cov_geom', 'qc_low_cov_emp', 'qc_tiny_abs_pix', 'qc_is_valid']
Shape: (55, 17)
  lake_id  product  CI_mean  CI_p90  n_valid  \
0    GL-1  monthly      NaN     NaN      NaN   
1    GL-2  monthly      NaN     NaN      NaN   
2    GL-3  monthly      NaN     NaN      NaN   
3    GL-4  monthly      NaN     NaN      NaN   
4    GL-5  monthly      NaN     NaN      NaN   

                                                 src                    date  \
0  S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CO... 2024-01-16 16:53:28.500   
1  S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CO... 2024-01-16 16:53:28.500   
2  S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CO... 2024-01-16 16:53:28.500   
3  S3B_OLCI_EFRNT.20240101_20240131.L3m.MO.ILW_CO... 2024-01-16 16:53:28.500   
4  S3B_OLCI_EFRNT.20240101_20240131.L3m

In [28]:
"""
QC & Completeness checks for NASA ILW Cyanobacteria Index (CI_cyano)
- Regional (CONUS) daily & monthly time series
- Lake-level (Great Lakes) daily & monthly time series

Inputs (already prepared by you):
1) /dkucc/home/zy166/HAB-forcasting/datasets/ILW/Merged/2024/CONUS_DAY/ci_cyano_daily_mean.csv
2) /dkucc/home/zy166/HAB-forcasting/datasets/ILW/S3B/2024/CONUS_MO/ci_cyano_monthly_mean.csv
3) /dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg
4) /dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_monthly.parquet
5) /dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_daily.parquet

Outputs:
- Cleaned regional daily/monthly CSVs with QC flags
- Cleaned lake-level daily/monthly Parquet with QC flags and completeness metrics
- Simple summary CSVs per product (row counts, missing rates, clipping, etc.)
"""

from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd

# ----------------------------
# Parameters (tune as needed)
# ----------------------------
# Resolution of ILW L3m grid (meters)
PIX_RES_M = 300.0

# Minimal valid pixel ratio when judged by geometry (n_valid / expected_pixels)
MIN_PCT_VALID_GEOM = 0.10   # drop if coverage < 10%

# Minimal absolute valid pixels per lake-time
MIN_ABS_PIX = 50            # drop very tiny coverage

# Empirical coverage threshold relative to best observed coverage for that lake
MIN_PCT_VALID_EMP = 0.10    # drop if n_valid < 10% of lake's max observed n_valid

# CI cleaning thresholds
NEAR_ZERO_THRESHOLD = 5e-5  # drop near-zero values (already applied in earlier step, kept as doc)
CLIP_QUANTILE_LOW  = 0.001  # lower clip for outliers
CLIP_QUANTILE_HIGH = 0.999  # upper clip for outliers

# Interpolation limits for regional (optional smoothing to fill short gaps)
INTERP_LIMIT_DAYS = 3

# ----------------------------
# Paths
# ----------------------------
P_CONUS_DAILY   = Path("/dkucc/home/zy166/HAB-forcasting/datasets/ILW/Merged/2024/CONUS_DAY/ci_cyano_daily_mean.csv")
P_CONUS_MONTHLY = Path("/dkucc/home/zy166/HAB-forcasting/datasets/ILW/S3B/2024/CONUS_MO/ci_cyano_monthly_mean.csv")
P_LAKES_GPKG    = Path("/dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg")
P_LAKE_DAILY    = Path("/dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_daily.parquet")
P_LAKE_MONTHLY  = Path("/dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_monthly.parquet")

OUT_DIR = Path("/dkucc/home/zy166/HAB-forcasting/datasets/processed/qc")
OUT_DIR.mkdir(parents=True, exist_ok=True)

# ----------------------------
# Utilities
# ----------------------------
def _clip_series_q(s: pd.Series, qlow=CLIP_QUANTILE_LOW, qhigh=CLIP_QUANTILE_HIGH) -> pd.Series:
    """Clip a numeric series by quantiles; preserve NaNs."""
    if s.dropna().empty:
        return s
    lo = s.quantile(qlow)
    hi = s.quantile(qhigh)
    return s.clip(lower=lo, upper=hi)

def _ensure_datetime(df: pd.DataFrame,
                     prefer_cols=("date", "time")) -> pd.DataFrame:
    """
    Robust datetime parsing:
    - try 'date' then 'time' (or any column that contains these names)
    - use pandas 'mixed' parser and coerce to UTC tz-aware -> drop tz
    - drop NaT rows
    """
    # find a candidate column
    col = None
    for cand in prefer_cols:
        if cand in df.columns:
            col = cand
            break
    if col is None:
        # try fuzzy match by column name
        for c in df.columns:
            lc = str(c).lower()
            if "date" in lc or "time" in lc or "datetime" in lc or "timestamp" in lc:
                col = c
                break
    if col is None:
        raise ValueError("No recognizable datetime column (expect 'date'/'time').")

    # mixed-format tolerant parsing; force UTC then remove tz
    # errors='coerce' will set unparsable entries to NaT (we drop them)
    dt = pd.to_datetime(df[col], utc=True, format="mixed", errors="coerce")
    dt = dt.dt.tz_convert("UTC").dt.tz_localize(None)
    df = df.copy()
    df["date"] = dt
    # drop rows that failed to parse
    before = len(df)
    df = df.dropna(subset=["date"]).reset_index(drop=True)
    if len(df) < before:
        print(f"[QC] Dropped {before - len(df)} rows with unparseable datetime in column '{col}'")
    return df

def _summarize_basic(df: pd.DataFrame, tag: str) -> pd.DataFrame:
    """Basic counts and missingness summary for quick logging."""
    out = {
        "tag": tag,
        "rows": len(df),
        "n_missing_CI_mean": int(df["CI_mean"].isna().sum()) if "CI_mean" in df.columns else None,
        "n_missing_CI_p90":  int(df["CI_p90"].isna().sum()) if "CI_p90" in df.columns else None,
        "n_missing_n_valid": int(df["n_valid"].isna().sum()) if "n_valid" in df.columns else None,
    }
    return pd.DataFrame([out])

# ----------------------------
# QC for regional (CONUS) time series
# ----------------------------
def qc_conus_timeseries(csv_path: Path, freq: str, out_prefix: str) -> None:
    df = pd.read_csv(csv_path)

    expected = {"CI_mean", "CI_p90", "n_valid"}
    missing = expected - set(df.columns)
    if missing:
        print(f"[WARN] {csv_path.name} is missing columns: {missing}")

    # robust parse + drop NaT
    df = _ensure_datetime(df, prefer_cols=("date", "time"))

    # normalize timestamps to start-of-period
    if freq.upper() == "D":
        df["date"] = df["date"].dt.floor("D")
    elif freq.upper() == "MS":
        # start-of-month via how='start'
        df["date"] = df["date"].dt.to_period("M").dt.to_timestamp(how="start")
    else:
        # fallback: floor to given freq
        df["date"] = df["date"].dt.to_period(freq).dt.start_time

    df = df.sort_values("date").drop_duplicates(subset=["date"]).reset_index(drop=True)

    # build regular index over the observed span
    idx = pd.date_range(df["date"].min(), df["date"].max(), freq=freq)
    df = df.set_index("date").reindex(idx).rename_axis("date").reset_index()

    # short-gap interpolation on CI columns
    for c in ["CI_mean", "CI_p90"]:
        if c in df.columns:
            df[c] = df[c].interpolate(limit=INTERP_LIMIT_DAYS if freq.upper()=="D" else 1)

    # robust quantile clipping
    for c in ["CI_mean", "CI_p90"]:
        if c in df.columns:
            df[c] = _clip_series_q(df[c])

    if "n_valid" in df.columns:
        nv_thr = df["n_valid"].quantile(0.05) if df["n_valid"].notna().any() else 0
        df["qc_low_coverage"] = (df["n_valid"] < nv_thr).astype(int)

    out_csv = OUT_DIR / f"{out_prefix}_clean.csv"
    df.to_csv(out_csv, index=False)
    _summarize_basic(df, tag=f"{out_prefix}").to_csv(OUT_DIR / f"{out_prefix}_summary.csv", index=False)
    print(f"[OK] Saved regional QC → {out_csv}")

# ----------------------------
# Lake geometry → expected pixel count
# ----------------------------
def compute_expected_pixels_per_lake(lakes_gpkg: Path,
                                     inset_m: float = 300.0) -> pd.DataFrame:
    """
    Compute expected pixel counts per lake from geometry area, after optional inward buffer
    to reduce shoreline contamination. Done in equal-area CRS (EPSG:5070) then back to 4326.

    Returns a DataFrame: lake_id, lake_name (if present), expected_pixels_geom, area_m2
    """
    g = gpd.read_file(lakes_gpkg)
    # Normalize lake_id and names
    cols = list(g.columns)
    if "lake_id" not in cols:
        raise ValueError("Expect 'lake_id' in lakes_greatlakes_5poly.gpkg")

    name_col = None
    for cand in ["lake_name", "Lake_name", "name", "Name"]:
        if cand in cols:
            name_col = cand
            break

    # Reproject to equal-area CRS for stable buffering & area
    gm = g.to_crs(5070)

    # Optional inward buffer to avoid shoreline pixels
    if inset_m and inset_m > 0:
        gm["geometry"] = gm.buffer(-abs(inset_m))

    gm["area_m2"] = gm.geometry.area
    # Expected pixel count at given resolution
    gm["expected_pixels_geom"] = (gm["area_m2"] / (PIX_RES_M ** 2)).round().astype("Int64")

    df = gm.to_crs(4326)[["lake_id", "area_m2", "expected_pixels_geom"]].copy()
    if name_col:
        df[name_col] = g[name_col].values

    return df

# ----------------------------
# QC for lake-level time series
# ----------------------------
def _safe_nanmedian(arr) -> float:
    """Return NaN if all values are NaN or array empty; otherwise nanmedian."""
    a = pd.Series(arr).astype(float)
    a = a[np.isfinite(a)]
    return float(np.nanmedian(a)) if len(a) else float("nan")

def _coerce_n_valid(df: pd.DataFrame) -> pd.DataFrame:
    # 1) 如果已有 n_valid 且存在非零值，直接返回
    if "n_valid" in df.columns and (df["n_valid"].fillna(0) > 0).any():
        return df

    # 2) 常见别名兜底
    for alias in ["valid_count", "count_valid", "num_valid", "n", "count"]:
        if alias in df.columns and (df[alias].fillna(0) > 0).any():
            df = df.rename(columns={alias: "n_valid"})
            return df

    # 3) 若 n_valid 不存在或全为 0 → 改成 NaN，避免误触发覆盖率判定
    if "n_valid" not in df.columns or not (df["n_valid"].fillna(0) > 0).any():
        df["n_valid"] = np.nan
        print("[WARN] n_valid is missing or all zeros in lake parquet; "
              "set n_valid = NaN so coverage flags become NA.")
    return df

def qc_lake_timeseries(parquet_path: Path, lakes_gpkg: Path, out_prefix: str) -> None:
    """
    QC for lake-level CI time series (daily or monthly parquet).
    Adds:
      - expected_pixels based on lake geometry
      - pct_valid_geom = n_valid / expected_pixels
      - pct_valid_emp = n_valid / max(n_valid) per lake
      - QC flags: low coverage (geom/emp), tiny absolute pixels, and clipped values
    Outputs cleaned Parquet + summary CSV.
    """
    # Load data
    df = pd.read_parquet(parquet_path)
    df = _coerce_n_valid(df)
    # print("== Quick diag ==")
    # print("df cols:", list(df.columns))
    # print("unique lake_id in df (first 10):", pd.Series(df["lake_id"]).drop_duplicates().head(10).tolist())
    # geom_df = compute_expected_pixels_per_lake(lakes_gpkg, inset_m=PIX_RES_M)
    # print("unique lake_id in geom (first 10):", pd.Series(geom_df["lake_id"]).drop_duplicates().head(10).tolist())
    # print("n_valid present?", "n_valid" in df.columns, 
    #     "; nonzero n_valid rows:", int((df.get("n_valid", pd.Series(dtype=float)) > 0).sum()))

    # Robust datetime parse: prefer 'date', then 'time'
    df = _ensure_datetime(df, prefer_cols=("date", "time"))

    # Ensure we end up with exactly ONE datetime column named 'date'
    if "time" in df.columns and "date" in df.columns:
        # we already parsed into df["date"], so drop the raw 'time' column
        df = df.drop(columns=["time"])
    elif "time" in df.columns and "date" not in df.columns:
        # no 'date' yet — rename 'time' -> 'date'
        df = df.rename(columns={"time": "date"})
    # else: only 'date' existed — nothing to do

    # (optional) guard: drop any accidental duplicate columns by keeping first
    if df.columns.duplicated().any():
        dup = df.columns[df.columns.duplicated()].tolist()
        print(f"[QC] Dropping duplicated columns: {dup}")
        df = df.loc[:, ~df.columns.duplicated()].copy()

    # Ensure a 'product' column exists
    if "product" not in df.columns:
        df["product"] = "unknown"

    # Expected pixels per lake from geometry
    geom_df = compute_expected_pixels_per_lake(lakes_gpkg, inset_m=PIX_RES_M)  # inset = 1 pixel
    name_col = "lake_name" if "lake_name" in geom_df.columns else ("Lake_name" if "Lake_name" in geom_df.columns else None)

    # Merge expected pixels
    merge_cols = ["lake_id"]
    df = df.merge(geom_df, on="lake_id", how="left")

    # Empirical max n_valid per lake (skip zeros/NaNs)
    if "n_valid" in df.columns:
        # Only consider strictly positive counts to define an empirical max
        emp_max = (
            df.loc[df["n_valid"].fillna(0) > 0]
            .groupby("lake_id")["n_valid"]
            .max()
            .rename("empiric_n_valid_max")
        )
        df = df.merge(emp_max, on="lake_id", how="left")

        # Coverage metrics
        # pct_valid_geom: finite only when expected_pixels_geom > 0
        df["pct_valid_geom"] = df["n_valid"] / df["expected_pixels_geom"]
        # pct_valid_emp: finite only when empiric_n_valid_max > 0
        denom = df["empiric_n_valid_max"]
        df["pct_valid_emp"] = df["n_valid"] / denom
        # clean up invalid values
        for c in ("pct_valid_geom", "pct_valid_emp"):
            if c in df.columns:
                df.loc[~np.isfinite(df[c]), c] = np.nan

        # QC coverage flags（仅当可计算时才打标，否则置 NA）
        df["qc_low_cov_geom"] = pd.Series(pd.NA, index=df.index, dtype="Int64")
        ok_geom = df["pct_valid_geom"].notna()
        df.loc[ok_geom, "qc_low_cov_geom"] = (df.loc[ok_geom, "pct_valid_geom"] < MIN_PCT_VALID_GEOM).astype("Int64")

        df["qc_low_cov_emp"] = pd.Series(pd.NA, index=df.index, dtype="Int64")
        ok_emp = df["pct_valid_emp"].notna()
        df.loc[ok_emp, "qc_low_cov_emp"] = (df.loc[ok_emp, "pct_valid_emp"] < MIN_PCT_VALID_EMP).astype("Int64")

        df["qc_tiny_abs_pix"] = (df["n_valid"] < MIN_ABS_PIX).astype("Int64")
    else:
        for c in ["pct_valid_geom", "pct_valid_emp", "qc_low_cov_geom", "qc_low_cov_emp", "qc_tiny_abs_pix"]:
            df[c] = pd.NA

    # Clip CI columns lake-wise (robust to per-lake distributions)
    for c in ["CI_mean", "CI_p90"]:
        if c in df.columns:
            df[c] = df.groupby("lake_id", group_keys=False)[c].apply(
                lambda s: _clip_series_q(s, CLIP_QUANTILE_LOW, CLIP_QUANTILE_HIGH)
            )

    # A consolidated QC validity flag
    # Valid if NOT any of (low cov geom, low cov emp, tiny abs pix)
    cov_flags = ["qc_low_cov_geom", "qc_low_cov_emp", "qc_tiny_abs_pix"]
    for c in cov_flags:
        if c not in df.columns:
            df[c] = pd.NA

    # start as valid
    df["qc_is_valid"] = 1

    # any hard-fail coverage flag → invalid
    for c in cov_flags:
        df.loc[df[c] == 1, "qc_is_valid"] = 0

    # rows with no coverage info at all → fall back to "CI_mean is finite"
    rows_no_cov = df[cov_flags].isna().all(axis=1)
    ci_ok = pd.to_numeric(df.get("CI_mean"), errors="coerce").notna().astype(int)
    df.loc[rows_no_cov, "qc_is_valid"] = ci_ok.loc[rows_no_cov].values

    # Save cleaned parquet
    out_pq = OUT_DIR / f"{out_prefix}_clean.parquet"
    df.to_parquet(out_pq, index=False)

    # Build simple summary per-lake
    summary_rows = []
    for lid, g in df.groupby("lake_id"):
        n_rows = len(g)
        n_valid_rows = int((g["qc_is_valid"] == 1).sum())
        frac_valid = n_valid_rows / n_rows if n_rows else 0.0
        row = {
            "lake_id": lid,
            "rows": n_rows,
            "rows_valid": n_valid_rows,
            "frac_valid": round(frac_valid, 4),
            "pct_valid_geom_med": _safe_nanmedian(g["pct_valid_geom"]) if "pct_valid_geom" in g.columns else np.nan,
            "pct_valid_emp_med":  _safe_nanmedian(g["pct_valid_emp"])  if "pct_valid_emp"  in g.columns else np.nan,
        }
        if name_col and name_col in g.columns:
            row["lake_name"] = g[name_col].iloc[0]
        summary_rows.append(row)

    summary_df = pd.DataFrame(summary_rows).sort_values("lake_id")
    out_summary = OUT_DIR / f"{out_prefix}_summary.csv"
    summary_df.to_csv(out_summary, index=False)

    print(f"[OK] Saved lake-level QC → {out_pq}")
    print(f"[OK] Summary → {out_summary}")

# ----------------------------
# Main
# ----------------------------
if __name__ == "__main__":
    # 1) Regional time series QC
    if P_CONUS_DAILY.exists():
        qc_conus_timeseries(P_CONUS_DAILY,   freq="D",  out_prefix="conus_daily")
    else:
        print(f"[SKIP] Missing: {P_CONUS_DAILY}")

    if P_CONUS_MONTHLY.exists():
        qc_conus_timeseries(P_CONUS_MONTHLY, freq="MS", out_prefix="conus_monthly")
    else:
        print(f"[SKIP] Missing: {P_CONUS_MONTHLY}")

    # 2) Lake-level QC (Great Lakes)
    if P_LAKE_DAILY.exists():
        qc_lake_timeseries(P_LAKE_DAILY,  P_LAKES_GPKG, out_prefix="greatlakes_daily")
    else:
        print(f"[SKIP] Missing: {P_LAKE_DAILY}")

    if P_LAKE_MONTHLY.exists():
        qc_lake_timeseries(P_LAKE_MONTHLY, P_LAKES_GPKG, out_prefix="greatlakes_monthly")
    else:
        print(f"[SKIP] Missing: {P_LAKE_MONTHLY}")

[OK] Saved regional QC → /dkucc/home/zy166/HAB-forcasting/datasets/processed/qc/conus_daily_clean.csv
[OK] Saved regional QC → /dkucc/home/zy166/HAB-forcasting/datasets/processed/qc/conus_monthly_clean.csv
[SKIP] Missing: /dkucc/home/zy166/HAB-forcasting/datasets/processed/lake_ci_daily.parquet
[WARN] n_valid is missing or all zeros in lake parquet; set n_valid = NaN so coverage flags become NA.
[OK] Saved lake-level QC → /dkucc/home/zy166/HAB-forcasting/datasets/processed/qc/greatlakes_monthly_clean.parquet
[OK] Summary → /dkucc/home/zy166/HAB-forcasting/datasets/processed/qc/greatlakes_monthly_summary.csv


### Data Visualization

Available at independent python file `HAB-forcasting/data_visualization_ILW.py`.

## Dataset: Daymet v4 Daily Surface Weather Data (ORNL DAAC)

Read Great Lakes Polygon data (GPKG)

In [26]:
import geopandas as gpd
import pydaymet as daymet
import xarray as xr
import pandas as pd
from pathlib import Path

GPKG = "/dkucc/home/zy166/HAB-forcasting/datasets/Lakes/shapes/lakes_greatlakes_5poly.gpkg"

lakes = gpd.read_file(GPKG)
# 确保是经纬度坐标（EPSG:4326）；PyDaymet也支持其它CRS，传参loc_crs即可
lakes = lakes.to_crs(4326)

# 准备日期与变量清单
# DATES = ("2016-01-01", "2025-12-31")
DATES = ("2024-01-01", "2024-12-31")
VARS  = ["tmin","tmax","prcp","srad","vp","dayl"]

# 如果没有湖名列，就造一个 id
if "name" not in lakes.columns:
    lakes["name"] = [f"lake_{i}" for i in range(len(lakes))]

逐湖请求 Daymet 网格并做“湖面平均”

In [27]:
out_dir = Path("/dkucc/home/zy166/HAB-forcasting/datasets/Daymet/glakes_nc")
out_dir.mkdir(parents=True, exist_ok=True)

dfs_daily = []   # 存各湖“日尺度湖面平均”的表

for idx, row in lakes.iterrows():
    geom   = row.geometry
    lakeid = row["name"]

    # 取 Daymet 栅格（daily）
    ds = daymet.get_bygeom(
        geom, DATES, variables=VARS,  # time_scale 默认 daily
        # 如你的 GPKG 不是4326，可加 loc_crs=你的EPSG
    )

    # 按空间维度(y,x)做湖面平均；skipna 处理缺测
    ds_mean = ds[VARS].mean(dim=["y","x"], skipna=True)

    # 存 netCDF 以便复用
    ds.to_netcdf(out_dir / f"daymet_{lakeid}_daily.nc")

    # 转成 DataFrame（带时间索引）
    df = ds_mean.to_dataframe().reset_index()
    df = df.rename(columns={"time": "date"})
    df["lake_id"] = lakeid
    dfs_daily.append(df)

daymet_daily = pd.concat(dfs_daily, ignore_index=True)

ServiceError: Service returned the following error message:
URL: https://thredds.daac.ornl.gov/thredds/ncss/ornldaac/2129/daymet_v4_daily_na_vp_2024.nc?var=vp&north=42.902360&west=-83.475523&east=-78.857244&south=41.383397&disableProjSubset=on&horizStride=1&time_start=2024-01-01T12%3A00%3A00Z&time_end=2024-12-30T12%3A00%3A00Z&timeStride=1&addLatLon=true&accept=netcdf
ERROR: 400, message='', url='https://opendap.earthdata.nasa.gov/thredds/ncss/ornldaac/2129/daymet_v4_daily_na_dayl_2024.nc?var=dayl&north=42.902360&west=-83.475523&east=-78.857244&south=41.383397&disableProjSubset=on&horizStride=1&time_start=2024-01-01T12:00:00Z&time_end=2024-12-30T12:00:00Z&timeStride=1&addLatLon=true&accept=netcdf'


与 ILW 时间步对齐：周均 / 28 天滑动均值

In [None]:
# 先确保日期是 datetime
daymet_daily["date"] = pd.to_datetime(daymet_daily["date"])

# --- 周聚合（自然周，对齐周一/周日可改 label/closed） ---
wk = (daymet_daily
      .set_index("date")
      .groupby("lake_id")
      [VARS].resample("7D").mean()
      .reset_index()
     )

# --- 28天移动平均（滑窗，不跳步；末端可能因窗口不满而NA） ---
daymet_daily = daymet_daily.sort_values(["lake_id","date"])
roll28 = (daymet_daily
          .groupby("lake_id")[VARS]
          .rolling("28D", on=daymet_daily["date"])
          .mean()
          .reset_index()
         )
roll28 = roll28.rename(columns={"level_1":"date"})

SyntaxError: invalid syntax (1453463249.py, line 1)