# ECOSTRESS ET Processing Pipeline

This notebook implements a complete QC and processing pipeline for ECOSTRESS L3T JET evapotranspiration tiles downloaded for Iowa (2019–2023). The output is a spatially aligned, cloud-masked, temporally organized stack of daily ET rasters ready for analysis.

**Input:** Raw `ETdaily.tif` and `cloud.tif` GeoTIFFs in `data/raw/ECOSTRESS/`  
**Output:** Processed rasters in `data/processed/ECOSTRESS/`

---

## Pipeline

| Step | Description |
|------|-------------|
| **1** | **Verify Downloads** — confirm ETdaily + cloud layers present, check 2019–2023 temporal coverage |
| **2** | **Apply QC Flags** — remove scenes obstructed by ISS solar panels or flagged for poor geolocation |
| **3** | **Mosaic Tiles** — merge overlapping tiles per acquisition date, apply cloud mask + scale factor |
| **4** | **Reproject & Clip** — reproject to WGS84, clip to Iowa AOI, resample to NLDAS 0.125° grid |
| **5** | **Structure Temporally** — sort dates, export processed inventory CSV |

---

## 1. Environment Setup

**Libraries:**
- `rasterio` / `rioxarray`: Raster I/O, reprojection, clipping
- `rasterio.merge`: Mosaicking tiles per date
- `geopandas`: Iowa AOI geometry
- `numpy` / `pandas`: Array operations and tabular summaries
- `pathlib`: Cross-platform file paths

In [None]:
import re
import warnings
from collections import defaultdict
from datetime import datetime
from pathlib import Path

import geopandas as gpd
import matplotlib.pyplot as plt
import matplotlib.ticker as ticker
import numpy as np
import pandas as pd
import rasterio
import rioxarray
from rasterio.merge import merge
from rasterio.mask import mask as rasterio_mask
from rasterio.warp import Resampling, calculate_default_transform, reproject
from rasterio.transform import from_bounds

warnings.filterwarnings('ignore')
print("Libraries loaded.")
print(f"  rasterio: {rasterio.__version__}")
print(f"  rioxarray: {rioxarray.__version__}")
print(f"  geopandas: {gpd.__version__}")

---

## 2. Configuration

All paths and processing parameters are defined here.

In [None]:
# ── Paths ──────────────────────────────────────────────────────────────────
NOTEBOOK_DIR = Path().resolve()
PROJECT_ROOT = NOTEBOOK_DIR.parent.parent  # SIF-Analysis/

RAW_DIR        = PROJECT_ROOT / "data" / "raw" / "ECOSTRESS"
AOI_PATH       = PROJECT_ROOT / "data" / "aoi" / "iowa.geojson"
QC_OBST_FILE   = PROJECT_ROOT / "src" / "scripts" / "obst_all_sort.txt"
QC_GEO_FILE    = PROJECT_ROOT / "src" / "scripts" / "qa_20250423-present.txt"

OUT_MOSAIC_DIR = PROJECT_ROOT / "data" / "processed" / "ECOSTRESS" / "mosaics"
OUT_CLIP_DIR   = PROJECT_ROOT / "data" / "processed" / "ECOSTRESS" / "clipped"

OUT_MOSAIC_DIR.mkdir(parents=True, exist_ok=True)
OUT_CLIP_DIR.mkdir(parents=True, exist_ok=True)

# ── QC Settings ────────────────────────────────────────────────────────────
# Geolocation QA values to ACCEPT (reject 'Suspect' and 'NotFound')
ACCEPT_GEOLOC_QA = {"Best", "Good"}

# ── ECOSTRESS Scale Factor ─────────────────────────────────────────────────
# ETdaily is stored as uint16; multiply by 0.1 to get mm/day
ET_SCALE_FACTOR  = 0.1
ET_NODATA_RAW    = 0      # raw NoData value before scaling
ET_NODATA_SCALED = np.nan

# ── Target CRS & Resolution ────────────────────────────────────────────────
# WGS84 to match SIF (OCO-2) and NLDAS coordinate systems
TARGET_CRS = "EPSG:4326"

# NLDAS spatial resolution (degrees) for final resampling
NLDAS_RES = 0.125

# ── Study Period ───────────────────────────────────────────────────────────
START_YEAR = 2019
END_YEAR   = 2023

print(f"Project root:  {PROJECT_ROOT}")
print(f"Raw data dir:  {RAW_DIR}  (exists: {RAW_DIR.exists()})")
print(f"Iowa AOI:      {AOI_PATH}  (exists: {AOI_PATH.exists()})")
print(f"Mosaic output: {OUT_MOSAIC_DIR}")
print(f"Clipped output:{OUT_CLIP_DIR}")

---

## Step 1 — Verify Downloads

Scan all TIFFs in `data/raw/ECOSTRESS/`, parse metadata from filenames, and verify that:
- Both `ETdaily.tif` and `cloud.tif` are present for each scene
- Coverage spans 2019–2023
- No obvious gaps exist in the record

**ECOSTRESS filename structure:**  
`ECOv002_L3T_JET_{ORB}_{SCN}_{TILE}_{DATETIME}_{BUILD}_{VER}_{LAYER}.tif`
- `ORB` = 5-digit orbit number  
- `SCN` = 3-digit scene number  
- `TILE` = MGRS tile ID (e.g., `15TVG`)  
- `DATETIME` = `YYYYMMDDTHHMMSS`  
- `LAYER` = `ETdaily` or `cloud`

In [None]:
def parse_ecostress_filename(path):
    """Parse metadata from an ECOSTRESS L3T JET filename.
    
    Returns a dict with keys: orbit, scene, tile, datetime, layer, stem.
    Returns None if filename does not match expected pattern.
    """
    p = Path(path)
    parts = p.stem.split('_')  # strip .tif, then split
    # Expected: ECOv002_L3T_JET_NNNNN_SSS_TILE_DATETIME_BUILD_VER_LAYER
    if len(parts) < 10 or parts[0] != 'ECOv002':
        return None
    try:
        orbit = int(parts[3])
        scene = int(parts[4])
        tile  = parts[5]
        dt    = datetime.strptime(parts[6], "%Y%m%dT%H%M%S")
        layer = parts[9]  # 'ETdaily' or 'cloud'
        return {
            'path':     p,
            'orbit':    orbit,
            'scene':    scene,
            'tile':     tile,
            'datetime': dt,
            'date':     dt.date(),
            'year':     dt.year,
            'month':    dt.month,
            'layer':    layer,
            'stem':     p.stem,
        }
    except (ValueError, IndexError):
        return None


# ── Scan all TIFFs ─────────────────────────────────────────────────────────
all_tifs = sorted(RAW_DIR.glob("*.tif"))
print(f"Total TIF files found: {len(all_tifs)}")

records = [parse_ecostress_filename(f) for f in all_tifs]
records = [r for r in records if r is not None]
df_all  = pd.DataFrame(records)

et_df    = df_all[df_all['layer'] == 'ETdaily'].copy()
cloud_df = df_all[df_all['layer'] == 'cloud'].copy()

print(f"  ETdaily files: {len(et_df)}")
print(f"  cloud   files: {len(cloud_df)}")
print(f"  Unrecognized:  {len(all_tifs) - len(records)}")

In [None]:
# ── Check ETdaily / cloud pairing ──────────────────────────────────────────
# Build a scene key from orbit + scene number to check for matching pairs
et_keys    = set(zip(et_df['orbit'],    et_df['scene'],    et_df['datetime']))
cloud_keys = set(zip(cloud_df['orbit'], cloud_df['scene'], cloud_df['datetime']))

paired    = et_keys & cloud_keys
et_only   = et_keys - cloud_keys
cloud_only= cloud_keys - et_keys

print("Scene pairing summary:")
print(f"  Fully paired (ETdaily + cloud): {len(paired):,}")
print(f"  ETdaily only (no cloud file):   {len(et_only):,}")
print(f"  cloud only   (no ETdaily):      {len(cloud_only):,}")

# ── Temporal coverage ──────────────────────────────────────────────────────
print("\nYear-by-year scene counts (ETdaily):")
year_counts = et_df.groupby('year').size()
for yr, cnt in year_counts.items():
    print(f"  {yr}: {cnt:,} scenes")

missing_years = [y for y in range(START_YEAR, END_YEAR + 1) if y not in year_counts.index]
if missing_years:
    print(f"\nWARNING: No scenes found for years: {missing_years}")
else:
    print(f"\nAll years {START_YEAR}–{END_YEAR} present.")

# ── Monthly heatmap ────────────────────────────────────────────────────────
pivot = et_df.groupby(['year', 'month']).size().unstack(fill_value=0)
pivot = pivot.reindex(columns=range(1, 13), fill_value=0)

fig, ax = plt.subplots(figsize=(13, 3.5))
im = ax.imshow(pivot.values, aspect='auto', cmap='YlGn', vmin=0)
ax.set_xticks(range(12))
ax.set_xticklabels(['Jan','Feb','Mar','Apr','May','Jun',
                    'Jul','Aug','Sep','Oct','Nov','Dec'])
ax.set_yticks(range(len(pivot)))
ax.set_yticklabels(pivot.index)
for r, year in enumerate(pivot.index):
    for c, month in enumerate(range(1, 13)):
        val = pivot.loc[year, month]
        if val > 0:
            ax.text(c, r, str(val), ha='center', va='center', fontsize=8)
plt.colorbar(im, ax=ax, label='Scene count')
ax.set_title('ECOSTRESS ETdaily Scene Count by Year and Month')
plt.tight_layout()
plt.show()

---

## Step 2 — Apply QC Flags

Two QC issues can compromise ECOSTRESS data quality:

1. **Solar panel obstruction (`obst_all_sort.txt`)** — The ISS solar panels can obstruct the ECOSTRESS field of view, producing corrupted thermal data. All entries in this file have `FOV_OBST=YES`. These scenes are excluded.

2. **Geolocation accuracy** — Scenes with `GeolocationAccuracyQA = "Suspect"` or `"NotFound"` have unreliable pixel locations and are also excluded. This flag is embedded in `obst_all_sort.txt` for each obstructed scene, and in `qa_20250423-present.txt` for April 2025+ scenes.

> **Note:** `qa_20250423-present.txt` covers data from April 23, 2025 onwards and does not apply to the 2019–2023 study period. It is parsed here for completeness.

Scenes matching either flag are removed **before** mosaicking to avoid propagating bad data.

In [None]:
def parse_obstruction_file(path):
    """Parse obst_all_sort.txt.
    
    Returns:
        bad_scenes  : set of (orbit, scene) — FOV obstructed, exclude all
        suspect_geo : set of (orbit, scene) — also have bad geolocation
    """
    bad_scenes  = set()
    suspect_geo = set()

    orb_re = re.compile(r'ORB=(\d+)')
    scn_re = re.compile(r'SCN=(\d+)')
    geo_re = re.compile(r'GeolocationAccuracyQA="([^"]+)"')

    with open(path) as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            orb_m = orb_re.search(line)
            scn_m = scn_re.search(line)
            geo_m = geo_re.search(line)
            if not (orb_m and scn_m):
                continue
            key = (int(orb_m.group(1)), int(scn_m.group(1)))
            bad_scenes.add(key)
            if geo_m and geo_m.group(1) not in ACCEPT_GEOLOC_QA:
                suspect_geo.add(key)

    return bad_scenes, suspect_geo


def parse_geoloc_file(path):
    """Parse qa_20250423-present.txt.
    
    Returns set of (orbit, scene) with poor geolocation (Suspect/NotFound).
    These reference L1B GEO files; orbit and scene are in the filename.
    """
    suspect = set()
    fn_re   = re.compile(r'ECOv002_L1B_GEO_(\d+)_(\d+)_')
    geo_re  = re.compile(r'GeolocationAccuracyQA="([^"]+)"')

    with open(path) as f:
        for line in f:
            line = line.strip()
            fn_m  = fn_re.search(line)
            geo_m = geo_re.search(line)
            if fn_m and geo_m and geo_m.group(1) not in ACCEPT_GEOLOC_QA:
                key = (int(fn_m.group(1)), int(fn_m.group(2)))
                suspect.add(key)

    return suspect


# ── Load QC flags ──────────────────────────────────────────────────────────
bad_obstruction, suspect_geoloc_obst = parse_obstruction_file(QC_OBST_FILE)
suspect_geoloc_geo = parse_geoloc_file(QC_GEO_FILE)

# Union: all scenes to exclude
# (1) solar panel obstruction — exclude all
# (2) bad geolocation from either file
all_bad_geoloc = suspect_geoloc_obst | suspect_geoloc_geo
all_excluded   = bad_obstruction | all_bad_geoloc

print("QC flag summary:")
print(f"  Obstruction file:     {len(bad_obstruction):,} (orbit, scene) pairs flagged")
print(f"    → of which, also bad geoloc: {len(suspect_geoloc_obst):,}")
print(f"  Geoloc file (2025+): {len(suspect_geoloc_geo):,} pairs with poor geolocation")
print(f"  Total unique excluded scenes:  {len(all_excluded):,}")

In [None]:
# ── Apply QC flags to ETdaily file list ────────────────────────────────────
et_df['qc_key']        = list(zip(et_df['orbit'], et_df['scene']))
et_df['obstructed']    = et_df['qc_key'].isin(bad_obstruction)
et_df['bad_geoloc']    = et_df['qc_key'].isin(all_bad_geoloc)
et_df['excluded']      = et_df['qc_key'].isin(all_excluded)

et_pass = et_df[~et_df['excluded']].copy()
et_fail = et_df[et_df['excluded']].copy()

print("QC filtering results (ETdaily scenes):")
print(f"  Total ETdaily scenes:     {len(et_df):,}")
print(f"  Flagged — obstructed:     {et_df['obstructed'].sum():,}")
print(f"  Flagged — bad geoloc:     {et_df['bad_geoloc'].sum():,}")
print(f"  Total excluded:           {len(et_fail):,} ({100*len(et_fail)/len(et_df):.1f}%)")
print(f"  Passed QC:                {len(et_pass):,} ({100*len(et_pass)/len(et_df):.1f}%)")

# Year-by-year breakdown after QC
print("\nScenes passing QC per year:")
for yr, cnt in et_pass.groupby('year').size().items():
    total_yr = len(et_df[et_df['year'] == yr])
    print(f"  {yr}: {cnt:,} / {total_yr:,} ({100*cnt/total_yr:.1f}%)")

---

## Step 3 — Mosaic Tiles by Acquisition Date

Multiple ECOSTRESS tiles may cover Iowa on the same acquisition pass. This step:

1. Groups QC-passed ETdaily files by **date** (`YYYYMMDD`)
2. For each scene, applies the paired cloud mask (pixels with cloud value > 0 are set to NaN)
3. Applies the **scale factor** (×0.1) to convert raw uint16 to mm/day
4. Merges all tiles for that date into a single mosaicked GeoTIFF
5. Saves to `data/processed/ECOSTRESS/mosaics/`

> **Cloud mask values:** By default, pixels with cloud mask > 0 are treated as cloudy. The first available cloud file is inspected to confirm the encoding.

In [None]:
# ── Inspect cloud mask encoding ────────────────────────────────────────────
sample_cloud = cloud_df['path'].iloc[0] if len(cloud_df) > 0 else None

if sample_cloud and sample_cloud.exists():
    with rasterio.open(sample_cloud) as src:
        cloud_sample = src.read(1)
        unique_vals  = np.unique(cloud_sample)
    print(f"Cloud mask sample: {sample_cloud.name}")
    print(f"  Unique values: {unique_vals}")
    print(f"  dtype: {cloud_sample.dtype}")
    print("  Interpretation: values > 0 will be treated as cloudy (masked).")
else:
    print("No cloud files found — cloud masking will be skipped.")

In [None]:
# ── Build cloud file lookup: (orbit, scene, datetime) -> cloud path ─────────
cloud_lookup = {
    (r['orbit'], r['scene'], r['datetime']): r['path']
    for _, r in cloud_df.iterrows()
}


def load_et_with_cloud_mask(et_path, cloud_path=None):
    """Read ETdaily TIF, apply cloud mask, apply scale factor.
    
    Returns:
        data    : float32 array, mm/day, NaN where cloudy or NoData
        profile : rasterio profile of the source file
    """
    with rasterio.open(et_path) as src:
        raw     = src.read(1).astype(np.float32)
        profile = src.profile.copy()

    # Apply scale factor and NoData mask
    data = raw * ET_SCALE_FACTOR
    data[raw == ET_NODATA_RAW] = np.nan

    # Apply cloud mask if available
    if cloud_path is not None and Path(cloud_path).exists():
        with rasterio.open(cloud_path) as csrc:
            cloud = csrc.read(1)
        data[cloud > 0] = np.nan

    profile.update(dtype='float32', nodata=np.nan, count=1)
    return data, profile


def mosaic_date(date_rows, cloud_lookup, out_dir):
    """Mosaic all QC-passed tiles for a single acquisition date.
    
    Parameters
    ----------
    date_rows    : DataFrame rows for one date
    cloud_lookup : dict mapping (orbit, scene, datetime) to cloud TIF path
    out_dir      : output directory for mosaic TIFs
    
    Returns
    -------
    out_path : Path to saved mosaic, or None if all tiles were empty
    """
    date_str  = date_rows.iloc[0]['date'].strftime("%Y%m%d")
    out_path  = out_dir / f"ECOSTRESS_ET_{date_str}.tif"

    if out_path.exists():
        return out_path  # skip already-processed dates

    mem_files = []
    tmp_paths = []

    for _, row in date_rows.iterrows():
        cloud_key  = (row['orbit'], row['scene'], row['datetime'])
        cloud_path = cloud_lookup.get(cloud_key)

        data, profile = load_et_with_cloud_mask(row['path'], cloud_path)

        # Skip if tile is entirely NaN
        if np.all(np.isnan(data)):
            continue

        # Write to a temp in-memory file for merging
        tmp = rasterio.MemoryFile()
        with tmp.open(**profile) as dst:
            dst.write(data[np.newaxis, :, :])
        mem_files.append(tmp)
        tmp_paths.append(tmp)

    if not mem_files:
        return None  # all tiles empty after masking

    # Open the in-memory files for merging
    opened = [mf.open() for mf in mem_files]
    mosaic, out_transform = merge(opened, method='first', nodata=np.nan)
    out_profile = opened[0].profile.copy()
    out_profile.update(
        height=mosaic.shape[1],
        width=mosaic.shape[2],
        transform=out_transform,
        compress='lzw'
    )

    with rasterio.open(out_path, 'w', **out_profile) as dst:
        dst.write(mosaic)

    for ds in opened:
        ds.close()
    for mf in mem_files:
        mf.close()

    return out_path


print("Mosaic functions defined.")

In [None]:
# ── Run mosaicking for all acquisition dates ───────────────────────────────
# Group QC-passed ETdaily scenes by date
grouped = et_pass.groupby('date')
dates   = sorted(grouped.groups.keys())

print(f"Processing {len(dates)} unique acquisition dates...")
print("(Dates already mosaicked will be skipped.)\n")

mosaic_log = []  # track results

for i, date in enumerate(dates):
    date_rows = grouped.get_group(date)
    out_path  = mosaic_date(date_rows, cloud_lookup, OUT_MOSAIC_DIR)

    mosaic_log.append({
        'date':       date,
        'n_tiles':    len(date_rows),
        'mosaicked':  out_path is not None,
        'path':       str(out_path) if out_path else None,
    })

    if (i + 1) % 100 == 0 or (i + 1) == len(dates):
        done = sum(r['mosaicked'] for r in mosaic_log)
        print(f"  [{i+1}/{len(dates)}] {done} mosaics written, "
              f"{len(mosaic_log)-done} empty/skipped")

mosaic_df = pd.DataFrame(mosaic_log)
print(f"\nMosaic complete.")
print(f"  Total dates:    {len(mosaic_df)}")
print(f"  Mosaics saved:  {mosaic_df['mosaicked'].sum()}")
print(f"  Empty (all NaN):{(~mosaic_df['mosaicked']).sum()}")

---

## Step 4 — Reproject, Clip, and Resample to Iowa / NLDAS Grid

ECOSTRESS tiles use UTM projections (multiple zones across Iowa). This step:

1. **Reprojects** each mosaic to **WGS84 (EPSG:4326)** — the CRS of the SIF (OCO-2) and NLDAS datasets
2. **Clips** to the Iowa state boundary (`data/aoi/iowa.geojson`)
3. **Resamples** from ECOSTRESS native resolution (~70m) to **NLDAS 0.125°** using area-averaging (`Resampling.average`), enabling direct comparison with NLDAS ET

Output rasters are saved to `data/processed/ECOSTRESS/clipped/`.

In [None]:
# ── Load Iowa AOI ──────────────────────────────────────────────────────────
iowa_gdf = gpd.read_file(AOI_PATH)

# Ensure it's in WGS84 for clipping and grid alignment
iowa_gdf = iowa_gdf.to_crs(TARGET_CRS)
iowa_bounds = iowa_gdf.total_bounds  # (minx, miny, maxx, maxy)

print(f"Iowa AOI loaded: {len(iowa_gdf)} feature(s)")
print(f"  CRS:    {iowa_gdf.crs}")
print(f"  Bounds: W={iowa_bounds[0]:.3f} S={iowa_bounds[1]:.3f} "
      f"E={iowa_bounds[2]:.3f} N={iowa_bounds[3]:.3f}")

# ── Define NLDAS-aligned output grid ──────────────────────────────────────
# Snap Iowa bounds to the nearest 0.125° NLDAS grid cell
res = NLDAS_RES
grid_west  = np.floor(iowa_bounds[0] / res) * res
grid_south = np.floor(iowa_bounds[1] / res) * res
grid_east  = np.ceil( iowa_bounds[2] / res) * res
grid_north = np.ceil( iowa_bounds[3] / res) * res

grid_width  = int(round((grid_east  - grid_west)  / res))
grid_height = int(round((grid_north - grid_south) / res))
grid_transform = from_bounds(grid_west, grid_south, grid_east, grid_north,
                              grid_width, grid_height)

print(f"\nNLDAS-aligned output grid:")
print(f"  Resolution: {res}°")
print(f"  Extent:     W={grid_west} S={grid_south} E={grid_east} N={grid_north}")
print(f"  Size:       {grid_width} cols × {grid_height} rows")

In [None]:
# ── Reproject + clip + resample function ───────────────────────────────────
iowa_shapes = [geom.__geo_interface__ for geom in iowa_gdf.geometry]


def reproject_clip_resample(mosaic_path, out_dir):
    """Reproject mosaic to WGS84, clip to Iowa, resample to NLDAS grid.
    
    Parameters
    ----------
    mosaic_path : Path to input mosaic TIF (UTM projection)
    out_dir     : Output directory
    
    Returns
    -------
    out_path : Path to saved clipped TIF, or None on error
    """
    out_path = out_dir / mosaic_path.name
    if out_path.exists():
        return out_path

    try:
        # Step 1: Reproject UTM → WGS84 at native resolution
        with rasterio.open(mosaic_path) as src:
            if src.crs is None:
                return None

            # Calculate intermediate WGS84 transform (native res)
            wgs_transform, wgs_width, wgs_height = calculate_default_transform(
                src.crs, TARGET_CRS, src.width, src.height, *src.bounds
            )
            wgs_data = np.full((wgs_height, wgs_width), np.nan, dtype=np.float32)
            reproject(
                source=rasterio.band(src, 1),
                destination=wgs_data,
                src_transform=src.transform,
                src_crs=src.crs,
                dst_transform=wgs_transform,
                dst_crs=TARGET_CRS,
                resampling=Resampling.bilinear,
                src_nodata=np.nan,
                dst_nodata=np.nan,
            )
            wgs_profile = src.profile.copy()
            wgs_profile.update(
                crs=TARGET_CRS,
                transform=wgs_transform,
                width=wgs_width,
                height=wgs_height,
                dtype='float32',
                nodata=np.nan,
            )

        # Step 2: Resample to NLDAS 0.125° grid using area averaging
        nldas_data = np.full((grid_height, grid_width), np.nan, dtype=np.float32)
        with rasterio.MemoryFile() as memfile:
            with memfile.open(**wgs_profile) as tmp:
                tmp.write(wgs_data[np.newaxis, :, :])
            with memfile.open() as tmp:
                reproject(
                    source=rasterio.band(tmp, 1),
                    destination=nldas_data,
                    src_transform=wgs_transform,
                    src_crs=TARGET_CRS,
                    dst_transform=grid_transform,
                    dst_crs=TARGET_CRS,
                    resampling=Resampling.average,
                    src_nodata=np.nan,
                    dst_nodata=np.nan,
                )

        # Step 3: Clip to Iowa state boundary
        nldas_profile = wgs_profile.copy()
        nldas_profile.update(
            transform=grid_transform,
            width=grid_width,
            height=grid_height,
        )
        with rasterio.MemoryFile() as memfile:
            with memfile.open(**nldas_profile) as tmp:
                tmp.write(nldas_data[np.newaxis, :, :])
            with memfile.open() as tmp:
                clipped, clip_transform = rasterio_mask(
                    tmp, iowa_shapes, crop=True, nodata=np.nan, filled=True
                )
                clip_profile = tmp.profile.copy()
                clip_profile.update(
                    transform=clip_transform,
                    width=clipped.shape[2],
                    height=clipped.shape[1],
                    compress='lzw',
                )

        with rasterio.open(out_path, 'w', **clip_profile) as dst:
            dst.write(clipped)

        return out_path

    except Exception as e:
        print(f"  ERROR processing {mosaic_path.name}: {e}")
        return None


print("Reproject/clip function defined.")

In [None]:
# ── Run reproject + clip for all mosaics ──────────────────────────────────
mosaic_files = sorted(OUT_MOSAIC_DIR.glob("ECOSTRESS_ET_*.tif"))
print(f"Reprojecting and clipping {len(mosaic_files)} mosaics...")
print("(Already processed files will be skipped.)\n")

clip_log = []

for i, mosaic_path in enumerate(mosaic_files):
    out_path = reproject_clip_resample(mosaic_path, OUT_CLIP_DIR)
    clip_log.append({
        'mosaic': mosaic_path.name,
        'clipped': out_path is not None,
        'path':    str(out_path) if out_path else None,
    })

    if (i + 1) % 100 == 0 or (i + 1) == len(mosaic_files):
        done = sum(r['clipped'] for r in clip_log)
        print(f"  [{i+1}/{len(mosaic_files)}] {done} files written")

clip_df = pd.DataFrame(clip_log)
print(f"\nReproject/clip complete.")
print(f"  Processed: {clip_df['clipped'].sum()}")
print(f"  Errors:    {(~clip_df['clipped']).sum()}")

---

## Step 5 — Temporal Structure and Inventory

Build a complete inventory of processed ECOSTRESS files: one row per date with file path, year, month, day-of-year, and valid pixel coverage. Export as CSV for use in downstream analysis notebooks.

In [None]:
# ── Build temporal inventory ───────────────────────────────────────────────
clipped_files = sorted(OUT_CLIP_DIR.glob("ECOSTRESS_ET_*.tif"))
print(f"Building inventory from {len(clipped_files)} clipped files...")

inventory = []
for f in clipped_files:
    # Parse date from filename: ECOSTRESS_ET_YYYYMMDD.tif
    date_str = f.stem.split('_')[-1]  # 'YYYYMMDD'
    try:
        dt = datetime.strptime(date_str, "%Y%m%d")
    except ValueError:
        continue

    # Quick valid-pixel check
    try:
        with rasterio.open(f) as src:
            data = src.read(1)
            total  = data.size
            valid  = int(np.sum(~np.isnan(data.astype(float))))
            mean_et = float(np.nanmean(data)) if valid > 0 else np.nan
    except Exception:
        total, valid, mean_et = 0, 0, np.nan

    inventory.append({
        'date':          dt.date(),
        'year':          dt.year,
        'month':         dt.month,
        'doy':           dt.timetuple().tm_yday,
        'valid_pixels':  valid,
        'total_pixels':  total,
        'coverage_pct':  round(100 * valid / total, 1) if total > 0 else 0,
        'mean_et_mm_day': round(mean_et, 3),
        'path':          str(f),
    })

inv_df = pd.DataFrame(inventory).sort_values('date').reset_index(drop=True)

# Save inventory CSV
inv_path = PROJECT_ROOT / "data" / "processed" / "ECOSTRESS" / "ecostress_inventory.csv"
inv_df.to_csv(inv_path, index=False)

print(f"\nInventory saved to: {inv_path}")
print(f"  Total processed dates: {len(inv_df)}")
print(f"  Date range: {inv_df['date'].min()} to {inv_df['date'].max()}")
print(f"  Mean coverage: {inv_df['coverage_pct'].mean():.1f}%")
print(f"  Mean ET (Iowa daily): {inv_df['mean_et_mm_day'].mean():.3f} mm/day")
print()
print("Scenes per year:")
for yr, cnt in inv_df.groupby('year').size().items():
    print(f"  {yr}: {cnt} dates")

In [None]:
# ── Summary plots ──────────────────────────────────────────────────────────
fig, axes = plt.subplots(2, 1, figsize=(13, 8))

# Top: scene count per month across all years
monthly = inv_df.groupby(['year', 'month']).size().unstack(fill_value=0)
monthly = monthly.reindex(columns=range(1, 13), fill_value=0)
im = axes[0].imshow(monthly.values, aspect='auto', cmap='YlGn', vmin=0)
axes[0].set_xticks(range(12))
axes[0].set_xticklabels(['Jan','Feb','Mar','Apr','May','Jun',
                          'Jul','Aug','Sep','Oct','Nov','Dec'])
axes[0].set_yticks(range(len(monthly)))
axes[0].set_yticklabels(monthly.index)
for r, yr in enumerate(monthly.index):
    for c, mo in enumerate(range(1, 13)):
        v = monthly.loc[yr, mo]
        if v > 0:
            axes[0].text(c, r, str(v), ha='center', va='center', fontsize=8)
plt.colorbar(im, ax=axes[0], label='# dates')
axes[0].set_title('Processed ECOSTRESS Dates per Month (after QC + mosaicking)')

# Bottom: mean daily ET time series
for yr, grp in inv_df.groupby('year'):
    axes[1].plot(grp['doy'], grp['mean_et_mm_day'], '.', alpha=0.4,
                 markersize=3, label=str(yr))
axes[1].set_xlabel('Day of Year')
axes[1].set_ylabel('Mean ET (mm/day)')
axes[1].set_title('Iowa Mean Daily ET — Processed ECOSTRESS (2019–2023)')
axes[1].legend(title='Year', loc='upper left', ncol=5)
axes[1].grid(True, alpha=0.3)

plt.tight_layout()

fig_dir = PROJECT_ROOT / "figures" / "ecostress"
fig_dir.mkdir(parents=True, exist_ok=True)
plt.savefig(fig_dir / "ecostress_processing_summary.png", dpi=150, bbox_inches='tight')
plt.show()
print("Summary figure saved.")

---

## Pipeline Summary

| Output | Location |
|--------|----------|
| Cloud-masked, mosaicked daily TIFs (UTM) | `data/processed/ECOSTRESS/mosaics/` |
| Reprojected + clipped TIFs (WGS84, NLDAS grid) | `data/processed/ECOSTRESS/clipped/` |
| Processed date inventory CSV | `data/processed/ECOSTRESS/ecostress_inventory.csv` |
| Summary figure | `figures/ecostress/ecostress_processing_summary.png` |

**Next step:** Use `data/processed/ECOSTRESS/clipped/` in the irrigation signal analysis alongside NLDAS modeled ET to compute the PET – ET residual.