In [1]:
# Orchestration notebook cell: Lake Huron FVCOM files → rename → normalize → fix times → verify → filelist → window check

import calendar
import os, re, glob, shutil, sys
from pathlib import Path
from datetime import datetime, timezone
from typing import Iterable, Sequence
import numpy as np
import xarray as xr
import netCDF4

# -----------------------------
# CONFIG — EDIT AS NEEDED
# -----------------------------
# Directory scanned (non-recursively) for .nc files; the file list is also written here.
ROOT = Path("/mnt/hydroglg/Data/External_Models/Outputs/GLCFS/LakeHuron/test")
# Ensemble tags (the "nNNN" filename component) to include in the final file list.
ENSEMBLES: Sequence[str] = ["n000"]            # e.g., ["n000"] or ["n000","n001",...]
FILELIST_NAME = "fvcom_files.txt"             # file list name, created under ROOT
# Optional window check: it only runs when both values are truthy.
CHECK_START = "2024-09-01T00:00:00"            # ISO-like string or None
CHECK_END   = "2024-10-30T00:00:00"            # ISO-like string or None
# Safety switches
# DRY_RUN: when True, renames and NetCDF rewrites are skipped; the file list is still written.
DRY_RUN = False
# STOP_ON_CORRUPT: when True, step 3 raises RuntimeError on an unreadable file instead of skipping it.
STOP_ON_CORRUPT = False

# Introduction
Some of the FVCOM files do not follow the same naming convention. In addition, from September 2024 through the end of the year, `Itime` is not represented correctly in the data, so PyLag is unable to run the model. This code fixes the resulting "Time out of range" issue.

# -----------------------------
# HELPERS
# -----------------------------

In [2]:
def log(msg: str) -> None:
    """Print ``msg`` and flush the output stream immediately."""
    print(msg, flush=True)

def list_nc_files(root: Path) -> list[Path]:
    """Return the regular files directly inside ``root`` whose suffix is ``.nc``.

    The suffix comparison is case-insensitive and the result is sorted.
    Directories (or other non-file entries) that merely carry a ``.nc``
    suffix are excluded so that later steps never try to open or rename them.

    Args:
        root: Directory to scan (not recursive).

    Returns:
        Sorted list of matching file paths.
    """
    return sorted(
        entry
        for entry in root.iterdir()
        if entry.is_file() and entry.suffix.lower() == ".nc"
    )

def safe_move(src: Path, dst: Path) -> bool:
    """Rename ``src`` to ``dst`` without ever overwriting an existing file.

    Args:
        src: Current path of the file.
        dst: Desired path of the file.

    Returns:
        False when ``src`` and ``dst`` are equal or ``dst`` already exists;
        True when the move was performed, or would have been performed in
        dry-run mode.
    """
    if src == dst:
        return False
    if dst.exists():
        log(f"skip (exists): {dst.name}")
        return False
    if DRY_RUN:
        # Nothing is touched in dry-run mode, so do not claim the rename happened.
        log(f"[dry-run] would rename: {src.name} -> {dst.name}")
        return True
    dst.parent.mkdir(parents=True, exist_ok=True)
    shutil.move(str(src), str(dst))
    log(f"renamed: {src.name} -> {dst.name}")
    return True

def is_corrupt_nc(path: Path) -> bool:
    """Report whether ``path`` cannot be opened and enumerated as a NetCDF file.

    Any exception raised while opening the dataset or listing its variables
    counts as "corrupt" for this pipeline; that includes HDF-level errors as
    well as every other failure, so no distinction is made between them.

    Args:
        path: File to probe.

    Returns:
        True if opening or listing variables failed, False otherwise.
    """
    try:
        with xr.open_dataset(path, decode_times=False, engine="netcdf4") as ds:
            # Touch the variable table so a broken header surfaces here.
            list(ds.variables)
    except Exception:
        return True
    return False

# -----------------------------
# STEP 1 — strip leading "nos." if present
# -----------------------------
def rename_strip_nos(root: Path) -> int:
    """Drop the leading ``nos.`` from every ``nos.lmhofs*`` NetCDF file in ``root``.

    Returns the number of files for which ``safe_move`` reported success.
    """
    prefix_len = len("nos.")
    renamed_count = 0
    candidates = (f for f in list_nc_files(root) if f.name.startswith("nos.lmhofs"))
    for nc_path in candidates:
        # The name is known to start with "nos.", so slicing removes exactly that prefix.
        stripped = nc_path.with_name(nc_path.name[prefix_len:])
        if safe_move(nc_path, stripped):
            renamed_count += 1
    log(f"[step1] nos.* → (stripped): {renamed_count} file(s)")
    return renamed_count

# -----------------------------
# STEP 2 — normalize names to: lmhofs.fields.nNNN.YYYYMMDD.tHHz.nc
# -----------------------------


In [3]:
# "Reversed" layout: lmhofs.tHHz.YYYYMMDD.fields.nNNN.nc
# Groups: (prefix, HH, YYYYMMDD, nNNN). Matching is case-insensitive.
PAT_REVERSED = re.compile(r"^(lmhofs)\.t(\d{2})z\.(\d{8})\.fields\.(n\d+)\.nc$", re.IGNORECASE)
# Canonical layout: lmhofs.fields.nNNN.YYYYMMDD.tHHz.nc
# Groups: (prefix, nNNN, YYYYMMDD, HH).
# NOTE(review): not referenced by the code visible in this notebook — confirm before removing.
PAT_CANONICAL = re.compile(r"^(lmhofs)\.fields\.(n\d+)\.(\d{8})\.t(\d{2})z\.nc$", re.IGNORECASE)

def normalize_names(root: Path) -> int:
    """Rename reversed-layout files in ``root`` to the canonical layout.

    Files matching ``PAT_REVERSED`` are renamed to
    ``<prefix>.fields.nNNN.YYYYMMDD.tHHz.nc``. Returns how many renames
    ``safe_move`` reported as done.
    """
    renamed_count = 0
    for nc_path in list_nc_files(root):
        hit = PAT_REVERSED.match(nc_path.name)
        if hit is None:
            continue
        prefix, hour, date, member = hit.groups()
        canonical = ".".join([prefix, "fields", member, date, f"t{hour}z", "nc"])
        if canonical == nc_path.name:
            continue
        if safe_move(nc_path, nc_path.with_name(canonical)):
            renamed_count += 1
    log(f"[step2] normalized names: {renamed_count} file(s)")
    return renamed_count

# -----------------------------
# STEP 3 — fix time variables from filename (write CF time + FVCOM Itime/Itime2)
#         CF epoch: 1970-01-01 (seconds)
#         FVCOM epoch (MJD): 1858-11-17
# -----------------------------

In [4]:
FN_CANON = re.compile(r"^lmhofs\.fields\.(n\d+)\.(\d{8})\.t(\d{2})z\.nc$", re.IGNORECASE)
MJD_EPOCH64 = np.datetime64("1858-11-17T00:00:00", "s")

def parse_filename_dt64(name: str) -> np.datetime64 | None:
    m = FN_CANON.match(name)
    if not m: 
        return None
    nnn, ymd, hh = m.groups()
    Y, M, D = int(ymd[:4]), int(ymd[4:6]), int(ymd[6:8])
    H = int(hh)
    return np.datetime64(f"{Y:04d}-{M:02d}-{D:02d}T{H:02d}:00:00", "s")

def to_mjd_parts(dt64: np.datetime64) -> tuple[int, int]:
    """Split ``dt64`` into (whole days since ``MJD_EPOCH64``, seconds into that day)."""
    elapsed_seconds = (dt64 - MJD_EPOCH64).astype("timedelta64[s]").astype(np.int64)
    # Floor division/modulo, so instants before the epoch still give 0 <= seconds < 86400.
    whole_days, second_of_day = divmod(int(elapsed_seconds), 86400)
    return whole_days, second_of_day

def fix_time_one(path: Path) -> bool:
    """Rewrite the time variables of one file from the timestamp in its name.

    The file is opened without time decoding, ``time`` is replaced by integer
    seconds since 1970-01-01, and ``Itime``/``Itime2`` are replaced by the day
    count since 1858-11-17 and the seconds into that day. The result is written
    to a sibling ``*.tmp`` file that then replaces the original.

    Args:
        path: NetCDF file with a canonical name.

    Returns:
        True when the file was rewritten (or would have been, in dry-run
        mode); False when it was skipped or the rewrite failed.

    Raises:
        RuntimeError: if the file is unreadable and ``STOP_ON_CORRUPT`` is set.
    """
    # quick corruption filter
    if is_corrupt_nc(path):
        msg = f"[step3] CORRUPT → skip: {path.name}"
        if STOP_ON_CORRUPT:
            raise RuntimeError(msg)
        log(msg)
        return False

    try:
        target_dt = parse_filename_dt64(path.name)
    except ValueError:
        # A name can match the pattern yet encode an impossible date/hour.
        target_dt = None
    if target_dt is None:
        log(f"[step3] skip (unrecognized name): {path.name}")
        return False

    it, it2 = to_mjd_parts(target_dt)
    # CF seconds since epoch
    cf_secs = int((target_dt - np.datetime64("1970-01-01T00:00:00","s")).astype("timedelta64[s]").astype(np.int64))

    tmp = path.with_suffix(path.suffix + ".tmp")
    try:
        with xr.open_dataset(path, decode_times=False, engine="netcdf4") as ds:
            # Ensure a time dimension exists (length >= 1).
            if "time" not in ds.dims:
                ds = ds.expand_dims({"time": 1})
            # Size the replacement arrays to the actual dimension length so a
            # file with several time steps gets the same stamp on every step
            # instead of failing on a length mismatch.
            n_time = ds.sizes["time"]

            # Overwrite CF time
            ds = ds.assign_coords(time=("time", np.full(n_time, cf_secs, dtype="int64")))
            ds["time"].attrs.update({
                "standard_name": "time",
                "long_name": "time",
                "units": "seconds since 1970-01-01 00:00:00",
                "calendar": "gregorian",
            })

            # Overwrite FVCOM Itime/Itime2
            # NOTE(review): Itime2 is written in seconds here; stock FVCOM output
            # is believed to use milliseconds — confirm what the downstream reader expects.
            ds["Itime"]  = ("time", np.full(n_time, it, dtype="int32"))
            ds["Itime2"] = ("time", np.full(n_time, it2, dtype="int32"))
            ds["Itime"].attrs.update({
                "long_name": "integer days since 1858-11-17 00:00:00",
                "units": "days since 1858-11-17 00:00:00",
            })
            ds["Itime2"].attrs.update({
                "long_name": "seconds since start of day",
                "units": "seconds",
            })

            enc = {"time": {"dtype":"int64"}, "Itime": {"dtype":"int32"}, "Itime2": {"dtype":"int32"}}
            if not DRY_RUN:
                ds.to_netcdf(tmp, encoding=enc)
        # Swap only after the source dataset has been closed.
        if not DRY_RUN:
            os.replace(tmp, path)
        return True
    except Exception as e:
        log(f"[step3] ❌ {path.name}: {e}")
        if not DRY_RUN:
            # Do not leave a partially written temp file next to the data.
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass
        return False

def fix_times_all(root: Path) -> tuple[int,int]:
    """Run ``fix_time_one`` on every canonically named file in ``root``.

    Returns ``(ok, bad)``: the counts of files fixed and of files skipped or failed.
    """
    outcomes = [
        fix_time_one(nc_path)
        for nc_path in list_nc_files(root)
        if FN_CANON.match(nc_path.name)
    ]
    ok = sum(1 for fixed in outcomes if fixed)
    bad = len(outcomes) - ok
    log(f"[step3] time fixed: ok={ok}, skipped/failed={bad}")
    return ok, bad

# -----------------------------
# STEP 4 — verify consistency (filename vs decoded time; cadence; duplicates)
# -----------------------------

In [5]:
def parse_expected_from_name(path: Path) -> np.datetime64 | None:
    """Return the timestamp encoded in ``path``'s file name, or None if it is not canonical."""
    # Same pattern and same date/hour formatting as parse_filename_dt64,
    # so the name parsing lives in one place.
    return parse_filename_dt64(path.name)

def decode_primary_time(ds: xr.Dataset) -> np.datetime64 | None:
    """Return the first timestamp stored in ``ds`` as ``datetime64[s]``.

    The CF ``time`` variable is preferred when it has "... since ..." units;
    otherwise ``Itime``/``Itime2`` (days since 1858-11-17 plus time of day)
    are used.

    Args:
        ds: Dataset opened with ``decode_times=False``.

    Returns:
        The first decoded timestamp, or None when no usable time variable is
        present or the time variables are empty.
    """
    # prefer CF time
    if "time" in ds:
        u = ds["time"].attrs.get("units", "")
        if "since" in u:
            try:
                vals = np.atleast_1d(np.asarray(ds["time"].values))
                py = np.atleast_1d(
                    netCDF4.num2date(vals, u, calendar=ds["time"].attrs.get("calendar", "standard"))
                )
                if py.size:
                    # num2date may return cftime objects, which have no
                    # .timestamp(); naive datetimes would be read as local
                    # time by .timestamp(). timetuple() + timegm works for
                    # both and always treats the value as UTC.
                    return np.datetime64(calendar.timegm(py[0].timetuple()), "s")
            except Exception:
                # Deliberate best effort: fall through to Itime below.
                pass
    # fallback Itime
    if "Itime" in ds:
        it = np.atleast_1d(np.asarray(ds["Itime"].values)).astype("int64")
        if it.size == 0:
            return None
        if "Itime2" in ds:
            it2 = np.atleast_1d(np.asarray(ds["Itime2"].values)).astype("int64")
            # Files not yet rewritten by step 3 may label Itime2 in milliseconds
            # (NOTE(review): believed to be the stock FVCOM convention — confirm).
            it2_units = str(ds["Itime2"].attrs.get("units", "")).lower()
            if it2_units.startswith(("msec", "millisec")):
                it2 = it2 // 1000
        else:
            it2 = np.zeros_like(it)
        return (MJD_EPOCH64 + it.astype("timedelta64[D]") + it2.astype("timedelta64[s]")).astype("datetime64[s]")[0]
    return None

def verify_files(root: Path) -> None:
    """Check every canonically named file in ``root`` and log a report.

    For each file the decoded time is compared with the timestamp in the file
    name. Across all files the coverage, the histogram of intervals between
    consecutive sorted timestamps, and duplicate timestamps are reported.
    At most 50 issues are printed.

    Args:
        root: Directory holding the NetCDF files.
    """
    from collections import Counter

    files = [p for p in list_nc_files(root) if FN_CANON.match(p.name)]
    times = []
    issues = []
    for p in files:
        if is_corrupt_nc(p):
            issues.append(("CORRUPT", p.name, "HDF error / unreadable"))
            continue
        try:
            with xr.open_dataset(p, decode_times=False, engine="netcdf4") as ds:
                exp = parse_expected_from_name(p)
                got = decode_primary_time(ds)
                if exp is None:
                    # The parser is declared to return None for names it cannot
                    # turn into a timestamp; report that explicitly rather than
                    # letting the subtraction below raise a TypeError.
                    issues.append(("NAME", p.name, "could not derive expected time from filename"))
                elif got is None:
                    issues.append(("DECODE", p.name, "could not decode CF/Itime time"))
                else:
                    dt = abs((got - exp).astype("timedelta64[s]").astype(int))
                    if dt != 0:
                        issues.append(("MISMATCH", p.name, f"decoded={str(got)} expected={str(exp)} Δ={dt}s"))
                    times.append(got)
        except Exception as e:
            issues.append(("OPEN", p.name, str(e)))
    # coverage & cadence
    if times:
        t = np.sort(np.array(times))
        log(f"[step4] Coverage: {str(t[0])} → {str(t[-1])}  ({len(t)} files)")
        if len(t)>1:
            diffs = ((t[1:] - t[:-1]).astype("timedelta64[s]").astype(int)).tolist()
            counts = Counter(diffs)
            log("[step4] Intervals (s → count): " + ", ".join(f"{k}:{v}" for k,v in sorted(counts.items())))
            # t is sorted, so a difference <= 0 can only be 0 (a repeated stamp).
            if any(d<=0 for d in diffs):
                issues.append(("MONOTONIC","<all>","non-increasing timestamps"))
            if len(set(t.tolist())) != len(t):
                issues.append(("DUPLICATES","<all>","duplicate timestamps"))
    if issues:
        log("[step4] Issues:")
        for kind, name, msg in issues[:50]:  # cap output
            log(f"  [{kind}] {name} — {msg}")
        if len(issues) > 50:
            log(f"  ... and {len(issues)-50} more")
    else:
        log("[step4] ✅ All checks passed")


# -----------------------------
# STEP 5 — build filtered file list (ENSEMBLES)
# -----------------------------

In [6]:
def build_filelist(root: Path, ensembles: Sequence[str], out_name: str) -> Path:
    """Write the paths of canonical files belonging to ``ensembles`` to ``root / out_name``.

    One path per line, sorted. Returns the path of the list file.
    """
    member_alternation = "|".join(re.escape(tag) for tag in ensembles)
    name_pattern = re.compile(
        r"^lmhofs\.fields\.(" + member_alternation + r")\.\d{8}\.t\d{2}z\.nc$",
        re.IGNORECASE,
    )
    selected = [nc for nc in list_nc_files(root) if name_pattern.match(nc.name)]
    list_path = root / out_name
    with list_path.open("w") as handle:
        handle.writelines(f"{nc}\n" for nc in sorted(selected))
    log(f"[step5] filelist → {list_path}  ({len(selected)} lines)")
    return list_path

# -----------------------------
# STEP 6 — window check on filelist
# -----------------------------

In [7]:
def parse_units_epoch(units: str) -> np.datetime64 | None:
    m = re.search(r"since\s+(\d{4})-(\d{2})-(\d{2})(?:[ T](\d{2}):(\d{2}):(\d{2}))?", units or "")
    if not m: return None
    Y,M,D = map(int, m.groups()[:3])
    hh = int(m.group(4) or 0); mm = int(m.group(5) or 0); ss = int(m.group(6) or 0)
    return np.datetime64(f"{Y:04d}-{M:02d}-{D:02d}T{hh:02d}:{mm:02d}:{ss:02d}", "s")

def decode_any_time(ds: xr.Dataset) -> np.ndarray:
    """Decode all timestamps in ``ds`` as a 1-D ``datetime64[s]`` array.

    ``Itime``/``Itime2`` are tried first (day count plus time of day, relative
    to the epoch named in ``Itime``'s units or 1858-11-17 by default); the CF
    ``time`` variable is used otherwise.

    Args:
        ds: Dataset opened with ``decode_times=False``.

    Returns:
        1-D array of timestamps (possibly empty).

    Raises:
        RuntimeError: if neither ``Itime`` nor a ``time`` variable with
            "... since ..." units is present.
    """
    # Itime first (cheap), then CF
    if "Itime" in ds:
        # atleast_1d: scalar variables must still yield a 1-D result so the
        # caller can concatenate arrays from several files.
        it = np.atleast_1d(np.asarray(ds["Itime"].values)).astype("int64")
        if "Itime2" in ds:
            it2 = np.atleast_1d(np.asarray(ds["Itime2"].values)).astype("int64")
            # Files not rewritten by step 3 may label Itime2 in milliseconds
            # (NOTE(review): believed to be the stock FVCOM convention — confirm).
            it2_units = str(ds["Itime2"].attrs.get("units", "")).lower()
            if it2_units.startswith(("msec", "millisec")):
                it2 = it2 // 1000
        else:
            it2 = np.zeros_like(it)
        epoch = parse_units_epoch(ds["Itime"].attrs.get("units", ""))
        # Explicit None test: `epoch or default` would depend on the truthiness
        # of a datetime64 scalar, which is unsafe for an epoch whose integer
        # value is 0 (1970-01-01).
        if epoch is None:
            epoch = MJD_EPOCH64
        return (epoch + it.astype("timedelta64[D]") + it2.astype("timedelta64[s]")).astype("datetime64[s]")
    if "time" in ds:
        u = ds["time"].attrs.get("units", "")
        if "since" in u:
            vals = np.atleast_1d(np.asarray(ds["time"].values))
            py = netCDF4.num2date(vals, u, calendar=ds["time"].attrs.get("calendar", "standard"))
            # num2date may return cftime objects (no .timestamp()) or naive
            # datetimes (which .timestamp() would read as local time);
            # timetuple() + timegm handles both and treats the value as UTC.
            secs = np.array(
                [calendar.timegm(p.timetuple()) for p in np.atleast_1d(py)],
                dtype="int64",
            )
            return secs.astype("datetime64[s]")
    raise RuntimeError("no time variable")

def check_window(filelist: Path, start_iso: str, end_iso: str):
    """Report whether [start_iso, end_iso] lies inside the time span of the listed files.

    Reads the ``.nc`` paths from ``filelist``, decodes every timestamp, and logs
    the overall coverage, the verdict, and the stamps around the requested start.
    Files that cannot be read are logged as warnings and skipped.
    """
    with filelist.open() as handle:
        entries = (line.strip() for line in handle)
        nc_paths = [Path(entry) for entry in entries if entry.endswith(".nc")]

    per_file_stamps = []
    for nc_path in nc_paths:
        try:
            with xr.open_dataset(nc_path, decode_times=False, engine="netcdf4") as ds:
                decoded = decode_any_time(ds)
            if decoded.size:
                per_file_stamps.append(decoded)
        except Exception as err:
            log(f"[step6] WARN {nc_path.name}: {err}")

    if not per_file_stamps:
        log("[step6] No timestamps decoded.")
        return

    all_stamps = np.sort(np.concatenate(per_file_stamps))
    first, last = all_stamps[0], all_stamps[-1]
    log(f"[step6] Files: {len(nc_paths)}  stamps: {all_stamps.size}")
    log(f"[step6] Coverage: {str(first)} → {str(last)}")

    start = np.datetime64(start_iso, "s")
    end = np.datetime64(end_iso, "s")
    inside = (start >= first) and (end <= last)
    log(f"[step6] Requested: {start} → {end}  within? {inside}")

    # Show up to three stamps on either side of where the start would be inserted.
    pos = int(np.searchsorted(all_stamps, start))
    lo = max(0, pos - 3)
    hi = min(all_stamps.size, pos + 3)
    log("[step6] Nearby stamps: " + ", ".join(str(all_stamps[k]) for k in range(lo, hi)))

# -----------------------------
# RUN ALL STEPS
# -----------------------------

In [8]:
# Run the steps in order against ROOT; each step logs its own summary line.
log("=== START PIPELINE ===")
# Steps 1–2: bring file names to the canonical layout.
rename_strip_nos(ROOT)
normalize_names(ROOT)
# Step 3: rewrite time variables from the file names (skipped on disk when DRY_RUN is True).
fix_times_all(ROOT)
# Step 4: report mismatches, cadence and duplicates.
verify_files(ROOT)
# Step 5: write the list of files for the configured ensembles.
fl = build_filelist(ROOT, ENSEMBLES, FILELIST_NAME)

# Step 6 runs only when both window bounds are set.
if CHECK_START and CHECK_END:
    check_window(fl, CHECK_START, CHECK_END)

log("=== DONE ===")

=== START PIPELINE ===
renamed: nos.lmhofs.fields.n000.20240901.t00z.nc -> lmhofs.fields.n000.20240901.t00z.nc
renamed: nos.lmhofs.fields.n000.20240901.t06z.nc -> lmhofs.fields.n000.20240901.t06z.nc
renamed: nos.lmhofs.fields.n000.20240901.t12z.nc -> lmhofs.fields.n000.20240901.t12z.nc
renamed: nos.lmhofs.fields.n000.20240901.t18z.nc -> lmhofs.fields.n000.20240901.t18z.nc
renamed: nos.lmhofs.fields.n000.20240902.t00z.nc -> lmhofs.fields.n000.20240902.t00z.nc
renamed: nos.lmhofs.fields.n000.20240902.t06z.nc -> lmhofs.fields.n000.20240902.t06z.nc
renamed: nos.lmhofs.fields.n000.20240902.t12z.nc -> lmhofs.fields.n000.20240902.t12z.nc
renamed: nos.lmhofs.fields.n000.20240902.t18z.nc -> lmhofs.fields.n000.20240902.t18z.nc
renamed: nos.lmhofs.fields.n000.20240903.t00z.nc -> lmhofs.fields.n000.20240903.t00z.nc
renamed: nos.lmhofs.fields.n000.20240903.t06z.nc -> lmhofs.fields.n000.20240903.t06z.nc
renamed: nos.lmhofs.fields.n000.20240903.t12z.nc -> lmhofs.fields.n000.20240903.t12z.nc
renamed: 