# In [None]:
# --- fast, memory-safe NDVI → per-polygon median time series ---
import os, re, glob
from pathlib import Path
import numpy as np
import pandas as pd
import geopandas as gpd
import xarray as xr
import rioxarray as rxr
from shapely.geometry import mapping, box
from shapely.validation import make_valid
from dask.distributed import Client
from dask import delayed, compute
from dask.diagnostics import ProgressBar

# ----------------------------
# Paths
# ----------------------------
top_dir = "/datawaha/esom/DatePalmCounting"
satellite_dir = f"{top_dir}/SatelliteData/Sentinel_2_NDVI_biweek_mean"
poly_path = f"{top_dir}/Geoportal/Datepalm/app_server/datepalms/Qassim_datepalm_fields_polygons.geojson"
OUT_DIR = Path(top_dir) / "Geoportal/Datepalm/app_server" / "ndvi_timeseries_csvs"
OUT_DIR.mkdir(parents=True, exist_ok=True)

# All years of one tile (adjust pattern if needed). Each matching file is
# treated as one year (see year_from_name). The fixed
# "-0000000000-0000000000" suffix presumably selects a single export shard of
# tile 38RLQ — verify if polygons fall outside it.
SATELLITE_GLOB = f"{satellite_dir}/*38RLQ-0000000000-0000000000.tif"
satellite_list = sorted(glob.glob(SATELLITE_GLOB))
if not satellite_list:
    # Explicit raise instead of `assert`, so the check survives `python -O`.
    raise FileNotFoundError(f"No GeoTIFFs found matching {SATELLITE_GLOB!r}.")

# ----------------------------
# Dask client (bounded workers; spills when needed)
# ----------------------------
# Leave one core free, but always use between 2 and 8 single-threaded workers.
N_WORKERS = min(8, max(2, (os.cpu_count() or 4) - 1))
client = Client(n_workers=N_WORKERS, threads_per_worker=1, processes=True)
print("Dask dashboard:", client.dashboard_link)

# ----------------------------
# Helpers (no global compute)
# ----------------------------
def band_to_date(year: int, band_index: int) -> np.datetime64:
    """Return the calendar date that a 1-based band of a yearly stack represents.

    Bands come in pairs per month: odd-numbered bands start on the 1st and
    even-numbered bands on the 16th (band 1 -> Jan 1, band 2 -> Jan 16,
    band 3 -> Feb 1, ...).
    """
    month_offset, is_second_half = divmod(band_index - 1, 2)
    day_of_month = 16 if is_second_half else 1
    return np.datetime64(f"{year:04d}-{month_offset + 1:02d}-{day_of_month:02d}")

def year_from_name(path: str | Path) -> int:
    s = Path(path).name
    m = re.search(r"(20(1[0-9]|2[0-9]))", s)  # 2010–2029
    if not m:
        raise ValueError(f"Cannot parse year from filename: {s}")
    return int(m.group(1))

def maybe_scale_ndvi_lazy(da: xr.DataArray, assume_scale=10000) -> xr.DataArray:
    """Convert an NDVI raster to float32 NDVI without any global min/max pass.

    Steps (all lazy):
      * pixels equal to the raster's nodata value (if one is declared) -> NaN;
      * integer (non-bool) input is assumed to be NDVI scaled by
        ``assume_scale`` and is divided by it; other dtypes are only cast;
      * values outside [-1.2, 1.2] -> NaN.

    Args:
        da: NDVI raster with rioxarray metadata.
        assume_scale: divisor applied to integer-typed input.

    Returns:
        float32 DataArray with invalid pixels set to NaN.
    """
    # Read nodata from the *raw* array and compare against raw values. The
    # sentinel is expressed in unscaled units. Also, arithmetic such as the
    # division below does not keep attrs by default, so reading `rio.nodata`
    # after scaling can silently yield None and skip masking altogether.
    nodata = da.rio.nodata
    is_scaled_int = np.issubdtype(da.dtype, np.integer) and da.dtype != bool

    out = da.astype("float32")
    if nodata is not None:
        out = out.where(da != nodata)
    if is_scaled_int:
        out = out / float(assume_scale)
    # keep reasonable range
    return out.where((out >= -1.2) & (out <= 1.2))

def interp_extend(df: pd.DataFrame) -> pd.DataFrame:
    """Gap-fill the ``ndvi_median`` column of a per-date NDVI table.

    Returns a new frame sorted by ``date`` with a fresh 0..n-1 index; the input
    frame is not modified. Exact zeros are treated as missing. Gaps are filled
    by time-weighted interpolation (in both directions), then forward/backward
    filled, so NaNs remain only if the whole column was missing. Values are
    clipped to [-1, 1].
    """
    ordered = df.sort_values("date").reset_index(drop=True)
    series = ordered.set_index("date")["ndvi_median"]
    series = series.replace(0, np.nan)
    series = series.interpolate(method="time", limit_direction="both")
    filled = series.ffill().bfill().clip(-1.0, 1.0)
    ordered["ndvi_median"] = filled.to_numpy()
    return ordered

# ----------------------------
# 1) Build LAZY time-cube (no compute, no persist)
# ----------------------------
# Chunk sizes refer to the raw (band, y, x) layout returned by open_rasterio;
# "band": 1 puts each band (one date) in its own chunk.
CHUNKS = {"band": 1, "y": 1024, "x": 1024}   # smaller chunks -> lower peak RAM

# One lazily-opened DataArray per GeoTIFF. The year is parsed from the file
# name and each band index is mapped to a date by band_to_date (1st/16th of
# each month).
da_list = []
for tif in satellite_list:
    year = year_from_name(tif)
    da = rxr.open_rasterio(tif, chunks=CHUNKS)  # (band,y,x) dask-backed

    # Rename band→time & assign dates (no conflict)
    dates = [band_to_date(year, b) for b in range(1, int(da.sizes["band"]) + 1)]
    da = da.rename({"band": "time"})
    da = da.assign_coords(time=("time", np.array(dates, dtype="datetime64[D]")))
    da.name = "ndvi"
    da_list.append(da)

# Concatenate lazily along time, then order all years chronologically.
# NOTE(review): xr.concat aligns the remaining dims (y, x) with an outer join
# by default. Files on different grids would therefore be NaN-padded rather
# than rejected, and two files parsed to the same year would give duplicate
# timestamps. This assumes every file is the same tile/grid and a distinct
# year — confirm.
NDVI = xr.concat(da_list, dim="time").sortby("time")
# Scale lazily (no .min/.max)
NDVI = maybe_scale_ndvi_lazy(NDVI)
# DO NOT .compute() or .persist() here – keep it lazy to avoid huge RAM use.

# ----------------------------
# 2) Load polygons (match CRS exactly)
# ----------------------------
polys = gpd.read_file(poly_path)
# Use the first recognised ID column; otherwise synthesize one from row order.
ID_COL = next(
    (col for col in ("Field_id", "field_id", "poly_id") if col in polys.columns),
    None,
)
if ID_COL is None:
    polys = polys.reset_index(names="field_id")
    ID_COL = "field_id"

# Ensure CRS matches cube (your earlier code forced EPSG:32638; we’ll align to raster instead)
# Explicit raise instead of `assert`, so the check survives `python -O`.
if NDVI.rio.crs is None:
    raise ValueError("Raster has no CRS.")
if polys.crs != NDVI.rio.crs:
    polys = polys.to_crs(NDVI.rio.crs)

# Drop features without usable geometry: make_valid() and the per-polygon
# worker (geom.bounds, clipping) cannot handle None/empty shapes, and a single
# failure would abort the whole batch. This runs after the ID step so that
# synthesized IDs still reflect the original row order.
has_geometry = polys.geometry.notna() & ~polys.geometry.is_empty
n_without_geometry = int((~has_geometry).sum())
if n_without_geometry:
    print(f"Skipping {n_without_geometry} feature(s) with missing/empty geometry.")
polys = polys.loc[has_geometry].copy()

# Fix invalid geometries once
polys["geometry"] = polys.geometry.map(make_valid)

# Optional: subset for testing
# polys = polys.iloc[0:500]

# ----------------------------
# 3) Define per-polygon worker (bbox→clip→median→interpolate→CSV)
# ----------------------------
# Header-only CSV: marks a polygon as processed but without usable data.
_EMPTY_CSV = "date,ndvi_median\n"


def _publish_atomically(out_csv: Path, write) -> None:
    """Run ``write(tmp_path)``, then atomically rename the temp file to *out_csv*.

    A crash mid-write then never leaves a partial CSV behind. This matters
    because an existing CSV makes the worker return "skip" on the next run.
    """
    tmp = out_csv.with_name(out_csv.name + ".tmp")
    write(tmp)
    os.replace(tmp, out_csv)


@delayed
def process_one_polygon(row, out_dir=OUT_DIR, all_touched=True, pad_px=1):
    """Compute and save the per-date median NDVI of one field polygon.

    Runs as a ``dask.delayed`` task. It reads the module-level lazy ``NDVI``
    cube and ``ID_COL``, and only pixels inside the polygon's padded bounding
    box are pulled from disk.

    NOTE(review): because this function lives in the notebook's ``__main__``,
    the referenced globals (including NDVI's dask graph) are presumably
    serialized with every task. Consider scattering NDVI if task submission
    is slow.

    Args:
        row: one row of ``polys`` (needs ``ID_COL`` and ``geometry``).
        out_dir: directory that receives ``<id>.csv`` (str or Path).
        all_touched: passed to ``rio.clip``; include every touched pixel.
        pad_px: bounding-box padding, in pixels, before the precise clip.

    Returns:
        ``(pid, status)``, where status is one of:
          * ``"skip"`` — the CSV already existed;
          * ``"no_coverage"`` — the polygon lies outside the raster;
          * ``"empty"`` — no valid pixels;
          * ``"ok"``.
        The "no_coverage" and "empty" cases write a header-only CSV.
    """
    # Local import keeps the task self-contained on the worker.
    from rioxarray.exceptions import NoDataInBounds, OneDimensionalRaster

    pid = int(row[ID_COL])
    out_csv = Path(out_dir) / f"{pid}.csv"
    if out_csv.exists():
        return pid, "skip"

    def write_empty(path):
        path.write_text(_EMPTY_CSV)

    geom = row.geometry
    # quick bounding-box reject against global raster bounds
    if not box(*NDVI.rio.bounds()).intersects(geom):
        _publish_atomically(out_csv, write_empty)
        return pid, "no_coverage"

    # First trim by bbox (much cheaper), then precise clip.
    # Pad by whole pixels so thin polygons don't produce empty trims.
    transform = NDVI.rio.transform()
    dx, dy = abs(transform[0]), abs(transform[4])
    minx, miny, maxx, maxy = geom.bounds
    try:
        sub = NDVI.rio.clip_box(minx - pad_px * dx, miny - pad_px * dy,
                                maxx + pad_px * dx, maxy + pad_px * dy)
        sub = sub.rio.clip([mapping(geom)], all_touched=all_touched, drop=True)
    except (NoDataInBounds, OneDimensionalRaster):
        # e.g. slivers at the raster edge: record as empty instead of letting
        # one polygon abort the entire batch compute.
        sub = None

    if sub is None or sub.size == 0:
        _publish_atomically(out_csv, write_empty)
        return pid, "empty"

    # median over (y,x) per time -> Dask reduction, tiny output
    med_ts = sub.median(dim=("y", "x"), skipna=True)
    # Run this small graph in the current worker thread. A default .compute()
    # here would be scheduled back onto the distributed cluster from inside a
    # task. A skipna median is NaN for a time step only when every pixel at
    # that step is NaN, so an all-NaN result is exactly the old
    # `sub.isnull().all()` case — without a second full read of the pixels.
    vals = med_ts.compute(scheduler="synchronous").values  # <= len(time) numbers
    if np.isnan(vals).all():
        _publish_atomically(out_csv, write_empty)
        return pid, "empty"

    dates = pd.to_datetime(med_ts["time"].values.astype("datetime64[D]"))
    df = interp_extend(pd.DataFrame({"date": dates, "ndvi_median": vals}))
    _publish_atomically(out_csv, lambda path: df.to_csv(path, index=False))
    return pid, "ok"

# ----------------------------
# 4) Dispatch with bounded parallelism
# ----------------------------
# dask.diagnostics.ProgressBar only instruments dask's local schedulers and
# stays silent under a distributed Client. Track the futures with
# distributed's own progress display instead.
from collections import Counter
from dask.distributed import progress

tasks = [process_one_polygon(row) for _, row in polys.iterrows()]
# Concurrency is bounded by the N_WORKERS single-threaded workers of `client`.
futures = client.compute(tasks)
progress(futures)
results = client.gather(futures)

status_counts = Counter(status for _, status in results)
ok = status_counts["ok"]
print(f"✓ wrote {ok} CSVs → {OUT_DIR}")
print("status breakdown:", dict(status_counts))
