# HSAs → Weekly Climate Features with 20‑Day Lags (Daily‑Only Sources)


**Datasets**
- Precipitation: CHIRPS Daily
- Temperature, Dewpoint, Wind: ERA5‑Land Hourly → daily aggregates
- Evaporation proxy: ERA5‑Land Surface Latent Heat Flux → mm/day (hourly→daily)
- Soil Moisture layers (1–4): ERA5‑Land Hourly → daily aggregates
- Elevation: SRTM (static)

**Notes**
- Keys HSAs by the `FacilityName` column (`id_col` in the code).
- Uses polygon‑by‑polygon exports to avoid geometry batching errors.
- TEST MODE allows running a small subset first.


## STEP 0.1 — Install and Initialize Earth Engine (with Cloud Project)

In [None]:

# @title STEP 0.1 — Install & Initialize Earth Engine (with Cloud Project)
!pip -q install earthengine-api geemap geopandas shapely pandas numpy tqdm

import ee
PROJECT = "ee-izaslavsky"  # <-- change if needed

try:
    ee.Initialize()
except Exception:
    ee.Authenticate()
    ee.Initialize(project=PROJECT)

print(f"✓ Earth Engine initialized with project: {PROJECT}")


## STEP 0.2 — Configuration (dates, scale, toggles, test mode)

In [None]:

# @title STEP 0.2 — Config
from datetime import date, timedelta

# --- Study period: ISO dates of the first and last week anchors ---
# (an alternative, earlier start that has been used: "2019-01-07")
WEEK_START = "2022-06-27"
WEEK_END = "2024-01-29"
# NOTE(review): not referenced by any cell visible in this notebook.
MAX_LAG_DAYS = 20

# --- Test mode: restrict a run to the first few HSAs and weeks ---
TEST_MODE = True
TEST_HSA_COUNT = 5
TEST_WEEK_COUNT = 3

# --- Feature-family toggles ---
USE_CHIRPS = True
USE_ERA5_HOURLY = True
USE_ERA5_EVP = True
INCLUDE_ELEVATION = True

# Earth Engine asset ids of the source datasets.
COLL = dict(
    CHIRPS_DAILY="UCSB-CHG/CHIRPS/DAILY",
    ERA5_LAND_HOURLY="ECMWF/ERA5_LAND/HOURLY",
    SRTM="USGS/SRTMGL1_003",
)

# Short alias -> dataset band name, grouped by source.
BANDS = dict(
    CHIRPS={"precip": "precipitation"},
    ERA5={
        "t_mean": "temperature_2m",
        "td": "dewpoint_temperature_2m",
        "u10": "u_component_of_wind_10m",
        "v10": "v_component_of_wind_10m",
        "lhf": "surface_latent_heat_flux",
        # Four soil-water layers: swvl1 .. swvl4.
        **{
            f"swvl{layer}": f"volumetric_soil_water_layer_{layer}"
            for layer in range(1, 5)
        },
    },
)

# Values passed as the `scale` argument of reduceRegion for each source.
SCALE = dict(CHIRPS=5550, ERA5=9000, ELEV=30)

print("✓ Config set")


## STEP 1.1 — Load HSA GeoJSON (must contain `FacilityName`)

In [None]:

# @title STEP 1.1 — Load HSA GeoJSON
import geopandas as gpd
from google.colab import files

GEOJSON_PATH = ""  # @param {type:"string"}

# No path configured: fall back to an interactive Colab upload.
if not GEOJSON_PATH:
    print("Select your HSA GeoJSON…")
    uploaded = files.upload()
    if uploaded:
        GEOJSON_PATH = next(iter(uploaded))

# Fail with a clear message instead of letting read_file("") raise an
# unrelated I/O error when nothing was uploaded.
if not GEOJSON_PATH:
    raise ValueError("No GeoJSON provided: set GEOJSON_PATH or upload a file.")

gdf = gpd.read_file(GEOJSON_PATH)
if gdf.crs is None:
    # to_crs() cannot reproject a frame that has no CRS, so declare one
    # instead. Assumes the coordinates are already WGS84 lon/lat — confirm
    # for files that do not come from a standard GeoJSON export.
    gdf = gdf.set_crs(4326)
elif gdf.crs.to_epsg() != 4326:
    gdf = gdf.to_crs(4326)

id_col = "FacilityName"
if id_col not in gdf.columns:
    raise ValueError(f"GeoJSON must include a '{id_col}' column.")

# Drop rows with missing geometry as well as empty geometry; is_empty alone
# does not flag missing (None) geometries.
gdf = gdf[gdf.geometry.notna() & ~gdf.geometry.is_empty].copy()
print(f"✓ Loaded {len(gdf)} HSAs; using id_col = '{id_col}'")


## STEP 1.2 — Strict Shapely→EE geometry converter (rings only)

In [None]:
# ============================================
# STEP 1.2 — Build EE geometries from GeoDataFrame (server-side, robust)
# ============================================

import ee
import geemap

id_col = "FacilityName"
assert id_col in gdf.columns, f"Missing '{id_col}' in GeoJSON."

# Make sure id is string (safer for Filters)
gdf[id_col] = gdf[id_col].astype(str)

# Convert the GeoDataFrame to an EE FeatureCollection, keeping only the id
# column and the geometry. geodesic=False to avoid topology surprises.
ee_fc_hsa = geemap.gdf_to_ee(gdf[[id_col, "geometry"]], geodesic=False)

# Quick server-side sanity: count & list a few ids. The label is derived from
# id_col so it always names the column that is actually being listed.
print("EE HSAs count:", ee_fc_hsa.size().getInfo())
print(f"First 5 {id_col} values:",
      ee_fc_hsa.limit(5).reduceColumns(ee.Reducer.toList(), [id_col]).get("list").getInfo())

# Accessor: look up one HSA's EE Geometry by its id value directly in the
# server-side FeatureCollection (no Python-side geometry reconstruction).
def ee_geom_by_id(hid: str) -> ee.Geometry:
    """Return the geometry of the first feature whose ``id_col`` equals *hid*.

    The geometry type is fetched with ``getInfo()`` before returning, so a
    missing or invalid feature raises here rather than later in an export.
    """
    match = ee_fc_hsa.filter(ee.Filter.eq(id_col, hid)).first()
    geometry = ee.Geometry(
        ee.Algorithms.If(match, ee.Feature(match).geometry(), None)
    )
    # Blocking round trip on purpose: forces server-side validation now.
    geometry.type().getInfo()
    return geometry

# Build HSA_LIST / HSA_BY_ID from the server-side features, and record the
# outcome for every id in HSA_FALLBACK_LOG.
HSA_LIST, HSA_BY_ID, HSA_FALLBACK_LOG = [], {}, []
ids = gdf[id_col].tolist()
_GEOM_SOURCE = "ee_feature_geometry"

for hid in ids:
    try:
        geom = ee_geom_by_id(hid)
    except Exception as e:
        # Conversion failed: report it and keep going with the other HSAs.
        print(f"❌ {hid}: {e}")
        HSA_FALLBACK_LOG.append((hid, f"failed:{e}"))
    else:
        # Separate dict objects for the list and the lookup table.
        HSA_LIST.append({"id": hid, "geom": geom, "fallback": _GEOM_SOURCE})
        HSA_BY_ID[hid] = {"id": hid, "geom": geom, "fallback": _GEOM_SOURCE}
        HSA_FALLBACK_LOG.append((hid, _GEOM_SOURCE))

print(f"HSAs converted: {len(HSA_LIST)}")
print("Examples:", HSA_FALLBACK_LOG[:5])

# OPTIONAL: one-polygon micro-test (CHIRPS weekly sum, averaged over the
# polygon) to prove geometry works before exporting.
if HSA_LIST:
    # Use the first HSA that actually converted; ids[0] may have failed above
    # and would then be missing from HSA_BY_ID.
    _test_id = HSA_LIST[0]["id"]
    _test_week = "2023-01-02"  # rainy season week in Jordan
    _g = HSA_BY_ID[_test_id]["geom"]
    start = ee.Date(_test_week)
    # Dataset id and scale come from the STEP 0.2 config so this check
    # exercises the same source the exports use.
    img = (ee.ImageCollection(COLL["CHIRPS_DAILY"])
           .filterDate(start, start.advance(7, 'day'))
           .select("precipitation")
           .sum())
    print(f"CHIRPS sum (mm) for '{_test_id}' week {_test_week}:",
          img.reduceRegion(ee.Reducer.mean(), _g, SCALE["CHIRPS"], maxPixels=1e9).getInfo())
else:
    print("⚠️  Micro-test skipped: no HSA geometries were converted.")


## STEP 2.1 — Compute Monday‑anchored weeks

In [None]:
# STEP 2.1 — Build anchor_mondays (all Mondays, inclusive)
from datetime import date, timedelta

# Take the range from the STEP 0.2 config instead of repeating the literals,
# so changing WEEK_START / WEEK_END there actually changes the weeks built here.
START_ISO = WEEK_START  # must be a Monday
END_ISO   = WEEK_END    # must be a Monday

start = date.fromisoformat(START_ISO)
end   = date.fromisoformat(END_ISO)

# sanity: both endpoints must be Monday (Monday=0). Raised explicitly rather
# than asserted so the check also runs when assertions are disabled.
if start.weekday() != 0 or end.weekday() != 0:
    raise ValueError("Start/End must be Mondays")
if start > end:
    # Would otherwise produce an empty list and an IndexError in the print.
    raise ValueError(f"Start ({START_ISO}) must not be after End ({END_ISO})")

# Both endpoints are Mondays, so the span is a whole number of weeks.
_n_weeks = (end - start).days // 7 + 1
anchor_mondays = [(start + timedelta(weeks=i)).isoformat() for i in range(_n_weeks)]

print(f"anchor_mondays: {len(anchor_mondays)} weeks ({anchor_mondays[0]} → {anchor_mondays[-1]})")


## STEP 3 — Feature families (weekly + selected daily lags d‑1..d‑14 and weekly lag windows)

In [None]:
def family_precip_week_and_lags_IMPROVED(fc, week_iso):
    """Attach CHIRPS precipitation features for one week to each feature of *fc*.

    Builds one multi-band image holding weekly statistics for the 7 days
    starting at *week_iso*, sum/mean/heavy-day counts for each of the three
    preceding 7-day windows, and single-day values for selected daily lags,
    then takes the polygon mean of every band.

    Args:
        fc: ee.FeatureCollection whose geometries are reduced over.
        week_iso: Week start, in any form ``ee.Date`` accepts.

    Returns:
        *fc* unchanged when ``USE_CHIRPS`` is false; otherwise *fc* mapped so
        each feature carries one property per band plus ``week_start``.
    """
    if not USE_CHIRPS:
        return fc
    start = ee.Date(week_iso)
    week_str = start.format('YYYY-MM-dd')

    def _chirps(d0, d1):
        # Daily precipitation in [d0, d1), with masked pixels counted as 0.
        return (ee.ImageCollection(COLL["CHIRPS_DAILY"])
                .filterDate(d0, d1)
                .select("precipitation")
                .map(lambda im: im.unmask(0)))

    ic = _chirps(start, start.advance(7, 'day'))

    # Weekly features
    bands = [
        ic.mean().rename('P_mean_week'),
        ic.sum().rename('P_total_week'),
        ic.map(lambda im: im.gt(1.0)).mean().rename('wetday_frac_week'),
        ic.map(lambda im: im.gt(20.0)).sum().rename('heavy_days_week'),
        ic.reduce(ee.Reducer.percentile([95])).rename(['P95_week']),
        ic.reduce(ee.Reducer.max()).rename('P_max_day_week'),
    ]

    # Cumulative lag windows: the three 7-day windows before this week.
    # NOTE(review): heavy days use > 10 here but > 20 for the current week —
    # confirm the difference is intended.
    for w in [1, 2, 3]:
        d0 = start.advance(-w * 7, 'day')
        lag_ic = _chirps(d0, d0.advance(7, 'day'))
        bands.append(lag_ic.sum().rename(f'P_sum_lag_w-{w}'))
        bands.append(lag_ic.mean().rename(f'P_mean_lag_w-{w}'))
        bands.append(lag_ic.map(lambda im: im.gt(10)).sum().rename(f'P_heavy_days_lag_w-{w}'))

    # Selected single-day lags.
    for k in [1, 2, 3, 5, 7, 10, 14]:
        d0 = start.advance(-k, 'day')
        bands.append(_chirps(d0, d0.advance(1, 'day')).mean().rename(f'P_d-{k}'))

    # Concatenate with Image.cat rather than ImageCollection.toBands(): toBands
    # prefixes every band with its image index, which would turn the property
    # names into e.g. '0_P_sum_lag_w-1'. Starting from the bands themselves
    # also avoids the placeholder 'constant' band that ee.Image() contributes.
    img = ee.Image.cat(*bands)

    def reduce_one(f):
        r = img.reduceRegion(ee.Reducer.mean(), f.geometry(), SCALE["CHIRPS"], maxPixels=1e9)
        return f.set(r).set({'week_start': week_str})

    return fc.map(reduce_one)


In [None]:
def _daily_td_w_IMPROVED(date_iso):
    """Return daily temperature / dewpoint / wind bands for one day.

    Aggregates the ERA5-Land hourly images in the 24 h starting at *date_iso*
    into a nine-band image: ``T_mean_C``, ``T_min_C``, ``T_max_C``, ``Td_C``,
    ``wind_speed_ms``, ``DTR_C``, ``hours_above_30C``, ``hours_above_35C`` and
    ``heat_index_C``. Temperatures are converted by subtracting 273.15.

    Args:
        date_iso: Day start, in any form ``ee.Date`` accepts.
    """
    d0 = ee.Date(date_iso)
    d1 = d0.advance(1, 'day')
    day = ee.ImageCollection(COLL["ERA5_LAND_HOURLY"]).filterDate(d0, d1)

    t  = day.select("temperature_2m")
    td = day.select("dewpoint_temperature_2m")
    u  = day.select("u_component_of_wind_10m")
    v  = day.select("v_component_of_wind_10m")

    t_mean = t.mean().subtract(273.15).rename('T_mean_C')
    t_min  = t.min().subtract(273.15).rename('T_min_C')
    t_max  = t.max().subtract(273.15).rename('T_max_C')
    td_c   = td.mean().subtract(273.15).rename('Td_C')
    # Magnitude of the day-mean wind vector (not the mean of hourly speeds).
    wspd   = u.mean().hypot(v.mean()).rename('wind_speed_ms')

    # Diurnal temperature range (a difference, so no unit offset is needed).
    dtr = t.max().subtract(t.min()).rename('DTR_C')

    # Heat stress hours. `t` is an ImageCollection, which has no subtract();
    # the conversion therefore has to happen per image inside map().
    hours_above_30 = t.map(lambda img: img.subtract(273.15).gt(30)).sum().rename('hours_above_30C')
    hours_above_35 = t.map(lambda img: img.subtract(273.15).gt(35)).sum().rename('hours_above_35C')

    # Simplified heat index: T_mean + 0.4 * (Td - T_mean), a quick estimate.
    t_mean_k = t.mean()
    td_mean_k = td.mean()
    heat_index_approx = (t_mean_k.add(td_mean_k.subtract(t_mean_k).multiply(0.4))
                         .subtract(273.15).rename('heat_index_C'))

    # Concatenate the bands directly; starting from ee.Image() would prepend
    # a placeholder 'constant' band and shift positional renames in callers.
    return ee.Image.cat(t_mean, t_min, t_max, td_c, wspd, dtr,
                        hours_above_30, hours_above_35, heat_index_approx)

def family_temp_dewpoint_wind_week_and_lags(fc, week_iso):
    """Attach weekly and lagged temperature / dewpoint / wind features to *fc*.

    The weekly bands are the mean over the seven days starting at *week_iso*
    of the daily bands from ``_daily_td_w_IMPROVED``; the lag bands are the
    daily values for selected days before the week start. Every band is then
    averaged over each feature's geometry.

    Args:
        fc: ee.FeatureCollection whose geometries are reduced over.
        week_iso: Week start, in any form ``ee.Date`` accepts.

    Returns:
        *fc* unchanged when ``USE_ERA5_HOURLY`` is false; otherwise *fc* mapped
        so each feature carries one property per band plus ``week_start``.
    """
    if not USE_ERA5_HOURLY:
        return fc
    start = ee.Date(week_iso)
    week_str = start.format('YYYY-MM-dd')

    # The five daily bands this family exports. They are picked by NAME, so
    # extra bands produced by the daily helper cannot break the renaming.
    daily_bands = ['T_mean_C', 'T_min_C', 'T_max_C', 'Td_C', 'wind_speed_ms']

    days = ee.List.sequence(0, 6)
    # _daily_td_w_IMPROVED is the daily helper defined in this notebook; the
    # name `_daily_td_w` used previously is not defined anywhere in it.
    weekly = (ee.ImageCollection(days.map(lambda i: _daily_td_w_IMPROVED(start.advance(i, 'day'))))
              .mean()
              .select(daily_bands,
                      ['T_mean_week_C', 'T_min_week_C', 'T_max_week_C', 'Td_week_C', 'wind_speed_week_ms']))

    lags = []
    for k in [1, 2, 3, 5, 7, 10, 14]:
        d = start.advance(-k, 'day')
        lags.append(_daily_td_w_IMPROVED(d).select(
            daily_bands,
            [f'T_mean_d-{k}_C', f'T_min_d-{k}_C', f'T_max_d-{k}_C', f'Td_d-{k}_C', f'wind_speed_d-{k}_ms']
        ))

    # Image.cat keeps the band names as given; ImageCollection.toBands() would
    # prefix each one with its image index.
    full = ee.Image.cat(weekly, *lags)

    def reduce_one(f):
        r = full.reduceRegion(ee.Reducer.mean(), f.geometry(), SCALE["ERA5"], maxPixels=1e9)
        return f.set(r).set({'week_start': week_str})

    return fc.map(reduce_one)


In [None]:
def family_water_balance_week(fc, week_iso):
    """Attach a weekly water-deficit feature to each feature of *fc*.

    ``water_deficit_mm_week`` is the week's summed evaporation proxy minus the
    week's summed CHIRPS precipitation, averaged over each feature's geometry.

    Args:
        fc: ee.FeatureCollection whose geometries are reduced over.
        week_iso: Week start, in any form ``ee.Date`` accepts.

    Returns:
        *fc* unchanged unless both ``USE_CHIRPS`` and ``USE_ERA5_EVP`` are set
        (the metric needs both sources); otherwise *fc* mapped so each feature
        carries ``water_deficit_mm_week`` and ``week_start``.
    """
    # Same toggle convention as the other family_* functions.
    if not (USE_CHIRPS and USE_ERA5_EVP):
        return fc

    start = ee.Date(week_iso)
    week_str = start.format('YYYY-MM-dd')

    # Precipitation; masked pixels count as 0 mm, as in the precipitation
    # family, so they do not mask out the whole deficit.
    precip_week = (ee.ImageCollection(COLL["CHIRPS_DAILY"])
                   .filterDate(start, start.advance(7, 'day'))
                   .select("precipitation")
                   .map(lambda im: im.unmask(0))
                   .sum())

    # Evaporation proxy: day-mean latent heat flux scaled by 86400 / 2.45e6.
    # NOTE(review): this treats the band as a flux in W/m² and ignores its
    # sign convention — confirm both against the dataset catalog.
    def _daily_evap(date_iso):
        d0 = ee.Date(date_iso)
        d1 = d0.advance(1, 'day')
        lhf = (ee.ImageCollection(COLL["ERA5_LAND_HOURLY"])
               .filterDate(d0, d1)
               .select("surface_latent_heat_flux").mean())
        SEC_PER_DAY = ee.Number(86400.0)
        LAMBDA = ee.Number(2.45e6)
        W_TO_MM_PER_DAY = SEC_PER_DAY.divide(LAMBDA)
        return lhf.multiply(W_TO_MM_PER_DAY)

    days = ee.List.sequence(0, 6)
    evap_week = ee.ImageCollection(days.map(lambda i: _daily_evap(start.advance(i, 'day')))).sum()

    # Water balance
    water_deficit = evap_week.subtract(precip_week).rename('water_deficit_mm_week')

    def reduce_one(f):
        r = water_deficit.reduceRegion(ee.Reducer.mean(), f.geometry(), SCALE["ERA5"], maxPixels=1e9)
        return f.set(r).set({'week_start': week_str})

    return fc.map(reduce_one)

In [None]:
def _daily_evap(date_iso):
    """Return a one-band image ``E_mm_day`` for the day starting at *date_iso*.

    The band is the day's mean of the hourly ``surface_latent_heat_flux``
    multiplied by 86400 / 2.45e6.

    NOTE(review): the scaling treats the band as a flux in W/m² and ignores
    its sign convention — confirm both against the dataset catalog.
    """
    day_start = ee.Date(date_iso)
    day_end = day_start.advance(1, 'day')
    hourly = ee.ImageCollection(COLL["ERA5_LAND_HOURLY"]).filterDate(day_start, day_end)
    mean_flux = hourly.select("surface_latent_heat_flux").mean()
    # seconds per day / latent heat constant used by this notebook
    flux_to_mm_per_day = ee.Number(86400.0).divide(ee.Number(2.45e6))
    return mean_flux.multiply(flux_to_mm_per_day).rename('E_mm_day')

def family_evap_week_and_lags(fc, week_iso):
    """Attach weekly and lagged evaporation-proxy features to each feature of *fc*.

    ``E_week_mm_per_day`` is the mean of ``_daily_evap`` over the seven days
    starting at *week_iso*; ``E_d-{k}_mm_per_day`` is the daily value k days
    before the week start, for the selected lags. Every band is averaged over
    each feature's geometry.

    Args:
        fc: ee.FeatureCollection whose geometries are reduced over.
        week_iso: Week start, in any form ``ee.Date`` accepts.

    Returns:
        *fc* unchanged when ``USE_ERA5_EVP`` is false; otherwise *fc* mapped so
        each feature carries one property per band plus ``week_start``.
    """
    if not USE_ERA5_EVP:
        return fc
    start = ee.Date(week_iso)
    week_str = start.format('YYYY-MM-dd')

    days = ee.List.sequence(0, 6)
    weekly = (ee.ImageCollection(days.map(lambda i: _daily_evap(start.advance(i, 'day'))))
              .mean()
              .rename('E_week_mm_per_day'))

    lags = [
        _daily_evap(start.advance(-k, 'day')).rename(f'E_d-{k}_mm_per_day')
        for k in [1, 2, 3, 5, 7, 10, 14]
    ]

    # Image.cat keeps the band names as given; ImageCollection.toBands() would
    # prefix each one with its image index (e.g. '0_E_d-1_mm_per_day').
    full = ee.Image.cat(weekly, *lags)

    def reduce_one(f):
        r = full.reduceRegion(ee.Reducer.mean(), f.geometry(), SCALE["ERA5"], maxPixels=1e9)
        return f.set(r).set({'week_start': week_str})

    return fc.map(reduce_one)


In [None]:
def _daily_sm(date_iso):
    """Return a four-band image (SM1..SM4) of day-mean soil water for one day.

    Each band is the mean of the corresponding hourly
    ``volumetric_soil_water_layer_N`` band over the 24 h starting at *date_iso*.
    """
    day_start = ee.Date(date_iso)
    hourly = (ee.ImageCollection(COLL["ERA5_LAND_HOURLY"])
              .filterDate(day_start, day_start.advance(1, 'day')))
    layer_ids = range(1, 5)
    layers = [
        hourly.select(f"volumetric_soil_water_layer_{n}").mean().rename(f'SM{n}')
        for n in layer_ids
    ]
    # The trailing select keeps only the four soil-moisture bands.
    return ee.Image().addBands(layers).select([f'SM{n}' for n in layer_ids])

def family_soilmoist_week_and_lags(fc, week_iso):
    """Attach weekly and lagged soil-moisture features to each feature of *fc*.

    ``SM{n}_week`` is the mean of ``_daily_sm`` over the seven days starting
    at *week_iso*; ``SM{n}_d-{k}`` is the daily value k days before the week
    start, for the selected lags. Every band is averaged over each feature's
    geometry.

    Args:
        fc: ee.FeatureCollection whose geometries are reduced over.
        week_iso: Week start, in any form ``ee.Date`` accepts.

    Returns:
        *fc* unchanged when ``USE_ERA5_HOURLY`` is false; otherwise *fc* mapped
        so each feature carries one property per band plus ``week_start``.
    """
    if not USE_ERA5_HOURLY:
        return fc
    start = ee.Date(week_iso)
    week_str = start.format('YYYY-MM-dd')

    days = ee.List.sequence(0, 6)
    weekly = (ee.ImageCollection(days.map(lambda i: _daily_sm(start.advance(i, 'day'))))
              .mean()
              .rename(['SM1_week', 'SM2_week', 'SM3_week', 'SM4_week']))

    lags = [
        _daily_sm(start.advance(-k, 'day')).rename(
            [f'SM1_d-{k}', f'SM2_d-{k}', f'SM3_d-{k}', f'SM4_d-{k}']
        )
        for k in [1, 2, 3, 5, 7, 10, 14]
    ]

    # Image.cat keeps the band names as given; ImageCollection.toBands() would
    # prefix each one with its image index (e.g. '0_SM1_d-1').
    full = ee.Image.cat(weekly, *lags)

    def reduce_one(f):
        r = full.reduceRegion(ee.Reducer.mean(), f.geometry(), SCALE["ERA5"], maxPixels=1e9)
        return f.set(r).set({'week_start': week_str})

    return fc.map(reduce_one)


In [None]:
def family_elevation(fc, week_iso_str=None):
    """Attach the polygon-mean SRTM elevation (``elevation_m``) to each feature.

    When *week_iso_str* is given it is also stored as ``week_start``. Returns
    *fc* unchanged when ``INCLUDE_ELEVATION`` is false.
    """
    if not INCLUDE_ELEVATION:
        return fc

    dem = ee.Image(COLL["SRTM"]).select('elevation').rename('elevation_m')
    # Optional week tag, decided once rather than inside the mapped function.
    week_tag = {} if week_iso_str is None else {'week_start': week_iso_str}

    def _with_elevation(feature):
        stats = dem.reduceRegion(ee.Reducer.mean(), feature.geometry(), SCALE["ELEV"], maxPixels=1e9)
        return feature.set({'elevation_m': stats.get('elevation_m'), **week_tag})

    return fc.map(_with_elevation)


## STEP 4 — Map preview

In [None]:

# @title STEP 4 — Map
import geemap, ee

# HSA outlines: one feature per converted polygon, drawn with no fill.
wire = ee.FeatureCollection([ee.Feature(p['geom'], {'id': p['id']}) for p in HSA_LIST])
m = geemap.Map(height=520)
m.addLayer(wire.style(color='FF0000', fillColor='00000000', width=2), {}, 'HSAs')

# CHIRPS precipitation summed over the first anchor week, as a backdrop.
wk0_iso = anchor_mondays[0]
wk0 = ee.Date(wk0_iso)
chirps_wk0 = (ee.ImageCollection(COLL["CHIRPS_DAILY"])
              .filterDate(wk0, wk0.advance(7,'day'))
              .select("precipitation").sum())
# anchor_mondays already holds 'YYYY-MM-DD' strings, so the layer label needs
# no server round trip (format().getInfo()) to build.
m.addLayer(chirps_wk0, {'min':0, 'max':50}, f'CHIRPS sum {wk0_iso}')
m


## STEP 5 — Export helpers and per‑polygon driver

In [None]:
  import time
  # ============================================
  # STEP 5 — Robust driver: compute with cleaned geometry, export WITHOUT geometry
  # ============================================

  import ee

  # --- tiny, safe clean-up params (meters)
  # All four are consumed by robust_ee_geom below.
  # Tolerance passed to Geometry.simplify().
  SIMPLIFY_M = 10
  # Distance buffered outward and then back inward; a falsy value (0/None)
  # skips the buffer step entirely.
  CLEAN_BUFFER_M = 1
  # maxError argument used for the buffer, area and centroid calls.
  MAX_ERROR_M = 10
  # Buffer distance around the centroid for the fallback bounding box that is
  # used when the cleaned geometry has zero area.
  FALLBACK_BOX_M = 250

  def _force_planar(g: ee.Geometry) -> ee.Geometry:
      """Rebuild Polygon / MultiPolygon geometries with geodesic=False.

      Any other geometry type is returned as-is. The type test is done
      server-side with ee.Algorithms.If.
      """
      kind = ee.String(g.type())
      rings = g.coordinates()
      # Inner branch first: MultiPolygon, else the input unchanged.
      multi_or_same = ee.Algorithms.If(
          kind.equals('MultiPolygon'),
          ee.Geometry.MultiPolygon(rings, None, False),
          g,
      )
      planar = ee.Algorithms.If(
          kind.equals('Polygon'),
          ee.Geometry.Polygon(rings, None, False),
          multi_or_same,
      )
      return ee.Geometry(planar)

  def robust_ee_geom(hid: str) -> ee.Geometry:
      """Return a cleaned geometry for HSA *hid*, with a bounding-box fallback.

      The stored geometry is rebuilt as planar, simplified, and optionally
      buffered out and back in. If the result has zero area, the bounds of a
      buffer around the planar geometry's centroid are used instead. The
      geometry type is fetched before returning, so failures surface here.
      """
      planar = _force_planar(ee_geom_by_id(hid))

      cleaned = planar.simplify(SIMPLIFY_M)
      if CLEAN_BUFFER_M:
          grown = cleaned.buffer(CLEAN_BUFFER_M, MAX_ERROR_M)
          cleaned = grown.buffer(-CLEAN_BUFFER_M, MAX_ERROR_M)

      has_area = ee.Number(cleaned.area(MAX_ERROR_M)).gt(0)
      fallback_box = (planar.centroid(MAX_ERROR_M)
                      .buffer(FALLBACK_BOX_M, MAX_ERROR_M)
                      .bounds())

      result = ee.Geometry(ee.Algorithms.If(has_area, cleaned, fallback_box))
      # Blocking round trip on purpose: validates the geometry server-side.
      result.type().getInfo()
      return result

  def _feat(props: dict):
      """Wrap *props* in an ee.Feature that has no geometry."""
      feature = ee.Feature(None, props)
      return feature

  def _weeks_fc_map(weeks_list, fn_make_props):
      """Build a FeatureCollection with one geometry-less feature per week.

      *fn_make_props* is called with each week's 'YYYY-MM-dd' label (a
      server-side string) and returns that feature's properties; a falsy
      result yields a feature with no properties.
      """
      def _week_feature(week):
          label = ee.Date(week).format('YYYY-MM-dd')
          made = fn_make_props(label)
          if made:
              return ee.Feature(None, ee.Dictionary(made))
          return ee.Feature(None, ee.Dictionary({}))

      return ee.FeatureCollection(ee.List(weeks_list).map(_week_feature))


  def _precip_props(geom: ee.Geometry, week_iso: ee.String):
      """Return the CHIRPS precipitation properties for one polygon and week.

      Covers weekly statistics for the 7 days from *week_iso*, sum / mean /
      heavy-day counts for the three preceding 7-day windows, and single-day
      values for the selected daily lags, each averaged over *geom*. Returns
      an empty dict when ``USE_CHIRPS`` is false.
      """
      if not USE_CHIRPS:
          return {}

      week_windows = [1, 2, 3]
      day_lags = [1, 2, 3, 5, 7, 10, 14]

      def _chirps(t0, t1):
          # Daily precipitation in [t0, t1), masked pixels counted as 0.
          return (ee.ImageCollection(COLL["CHIRPS_DAILY"])
                  .filterDate(t0, t1)
                  .select("precipitation")
                  .map(lambda im: im.unmask(0)))

      start = ee.Date(week_iso)
      this_week = _chirps(start, start.advance(7, 'day'))

      stack = ee.Image().addBands([
          this_week.mean().rename('P_mean_week'),
          this_week.sum().rename('P_total_week'),
          this_week.map(lambda im: im.gt(1.0)).mean().rename('wetday_frac_week'),
          this_week.map(lambda im: im.gt(20.0)).sum().rename('heavy_days_week'),
          this_week.reduce(ee.Reducer.percentile([95])).rename(['P95_week']),
          this_week.reduce(ee.Reducer.max()).rename('P_max_day_week'),
      ])

      # Weekly lag windows: the three 7-day windows before this week.
      for w in week_windows:
          lag_start = start.advance(-w*7, 'day')
          lag_ic = _chirps(lag_start, lag_start.advance(7, 'day'))
          lag_bands = (
              lag_ic.sum().rename(f'P_sum_lag_w-{w}'),
              lag_ic.mean().rename(f'P_mean_lag_w-{w}'),
              lag_ic.map(lambda im: im.gt(10)).sum().rename(f'P_heavy_days_lag_w-{w}'),
          )
          for band in lag_bands:
              stack = stack.addBands(band)

      # Selected single-day lags.
      for k in day_lags:
          lag_day = start.advance(-k, 'day')
          one_day = _chirps(lag_day, lag_day.advance(1, 'day')).mean().rename(f'P_d-{k}')
          stack = stack.addBands(one_day)

      stats = stack.reduceRegion(ee.Reducer.mean(), geom, SCALE["CHIRPS"], maxPixels=1e9)

      # Copy the named bands out of the reduction, in a fixed order.
      names = ['P_mean_week', 'P_total_week', 'wetday_frac_week',
               'heavy_days_week', 'P95_week', 'P_max_day_week']
      for w in week_windows:
          names += [f'P_sum_lag_w-{w}', f'P_mean_lag_w-{w}', f'P_heavy_days_lag_w-{w}']
      names += [f'P_d-{k}' for k in day_lags]

      props = {'week_start': week_iso}
      props.update((name, stats.get(name)) for name in names)
      return props



  def _tdw_props(geom: ee.Geometry, week_iso: ee.String):
      """Return temperature / dewpoint / wind properties for one polygon and week.

      Nine daily bands are derived from ERA5-Land hourly data; the weekly
      values are their mean over the seven days from *week_iso*, and the lag
      values are the daily bands for the selected days before the week start.
      Every band is averaged over *geom*. Returns an empty dict when
      ``USE_ERA5_HOURLY`` is false.
      """
      if not USE_ERA5_HOURLY:
          return {}

      day_lags = [1, 2, 3, 5, 7, 10, 14]
      weekly_names = ['T_mean_week_C', 'T_min_week_C', 'T_max_week_C', 'Td_week_C',
                      'wind_speed_week_ms', 'DTR_week_C', 'hours_above_30C_week',
                      'hours_above_35C_week', 'heat_index_week_C']

      def _lag_names(k):
          # Same band order as the daily image built below.
          return [f'T_mean_d-{k}_C', f'T_min_d-{k}_C', f'T_max_d-{k}_C', f'Td_d-{k}_C',
                  f'wind_speed_d-{k}_ms', f'DTR_d-{k}_C', f'hours_above_30C_d-{k}',
                  f'hours_above_35C_d-{k}', f'heat_index_d-{k}_C']

      def _daily_bands(day):
          day_start = ee.Date(day)
          hourly = (ee.ImageCollection(COLL["ERA5_LAND_HOURLY"])
                    .filterDate(day_start, day_start.advance(1, 'day')))
          temp = hourly.select("temperature_2m")
          dew = hourly.select("dewpoint_temperature_2m")
          wind_u = hourly.select("u_component_of_wind_10m")
          wind_v = hourly.select("v_component_of_wind_10m")

          temp_mean_k = temp.mean()
          dew_mean_k = dew.mean()
          # T_mean + 0.4 * (Td - T_mean), then shifted by 273.15.
          heat_index = (temp_mean_k
                        .add(dew_mean_k.subtract(temp_mean_k).multiply(0.4))
                        .subtract(273.15)
                        .rename('heat_index_C'))

          first = temp.mean().subtract(273.15).rename('T_mean_C')
          rest = [
              temp.min().subtract(273.15).rename('T_min_C'),
              temp.max().subtract(273.15).rename('T_max_C'),
              dew.mean().subtract(273.15).rename('Td_C'),
              wind_u.mean().hypot(wind_v.mean()).rename('wind_speed_ms'),
              temp.max().subtract(temp.min()).rename('DTR_C'),
              temp.map(lambda img: img.subtract(273.15).gt(30)).sum().rename('hours_above_30C'),
              temp.map(lambda img: img.subtract(273.15).gt(35)).sum().rename('hours_above_35C'),
              heat_index,
          ]
          return first.addBands(rest)

      start = ee.Date(week_iso)
      week_days = ee.List.sequence(0, 6).map(
          lambda i: _daily_bands(ee.Date(start).advance(i, 'day'))
      )
      stack = ee.ImageCollection(week_days).mean().rename(weekly_names)

      for k in day_lags:
          lag_day = start.advance(-k, 'day').format('YYYY-MM-dd')
          stack = stack.addBands(_daily_bands(lag_day).rename(_lag_names(k)))

      stats = stack.reduceRegion(ee.Reducer.mean(), geom, SCALE["ERA5"], maxPixels=1e9)

      props = {'week_start': week_iso}
      props.update((name, stats.get(name)) for name in weekly_names)
      for k in day_lags:
          props.update((name, stats.get(name)) for name in _lag_names(k))
      return props

  def _evap_props(geom: ee.Geometry, week_iso: ee.String):
      """Return evaporation-proxy properties for one polygon and week.

      ``E_week_mm_per_day`` is the mean over the seven days from *week_iso* of
      the daily proxy (day-mean latent heat flux scaled by 86400 / 2.45e6);
      ``E_d-{k}_mm_per_day`` is the daily proxy k days before the week start.
      Every band is averaged over *geom*. Returns an empty dict when
      ``USE_ERA5_EVP`` is false.
      """
      if not USE_ERA5_EVP:
          return {}

      day_lags = [1, 2, 3, 5, 7, 10, 14]
      lag_names = [f'E_d-{k}_mm_per_day' for k in day_lags]

      def _evap_for_day(day):
          day_start = ee.Date(day)
          day_end = day_start.advance(1, 'day')
          mean_flux = (ee.ImageCollection(COLL["ERA5_LAND_HOURLY"])
                       .filterDate(day_start, day_end)
                       .select("surface_latent_heat_flux")
                       .mean())
          scale_factor = ee.Number(86400.0).divide(ee.Number(2.45e6))
          return mean_flux.multiply(scale_factor).rename('E_mm_day')

      start = ee.Date(week_iso)
      week_days = ee.List.sequence(0, 6).map(
          lambda i: _evap_for_day(ee.Date(start).advance(i, 'day'))
      )
      stack = ee.ImageCollection(week_days).mean().rename('E_week_mm_per_day')

      for k, name in zip(day_lags, lag_names):
          lag_day = start.advance(-k, 'day').format('YYYY-MM-dd')
          stack = stack.addBands(_evap_for_day(lag_day).rename(name))

      stats = stack.reduceRegion(ee.Reducer.mean(), geom, SCALE["ERA5"], maxPixels=1e9)

      props = {'week_start': week_iso,
               'E_week_mm_per_day': stats.get('E_week_mm_per_day')}
      props.update((name, stats.get(name)) for name in lag_names)
      return props


  def _soil_props(geom: ee.Geometry, week_iso: ee.String):
      """Return soil-moisture properties (layers 1-4) for one polygon and week.

      ``SM{n}_week`` is the mean over the seven days from *week_iso* of the
      day-mean soil water of layer n; ``SM{n}_d-{k}`` is the day-mean value k
      days before the week start. Every band is averaged over *geom*. Returns
      an empty dict when ``USE_ERA5_HOURLY`` is false.
      """
      if not USE_ERA5_HOURLY:
          return {}

      day_lags = [1, 2, 3, 5, 7, 10, 14]
      weekly_names = ['SM1_week', 'SM2_week', 'SM3_week', 'SM4_week']

      def _lag_names(k):
          return [f'SM1_d-{k}', f'SM2_d-{k}', f'SM3_d-{k}', f'SM4_d-{k}']

      def _soil_for_day(day):
          day_start = ee.Date(day)
          hourly = (ee.ImageCollection(COLL["ERA5_LAND_HOURLY"])
                    .filterDate(day_start, day_start.advance(1, 'day')))
          layer1 = hourly.select("volumetric_soil_water_layer_1").mean().rename('SM1')
          deeper = [
              hourly.select("volumetric_soil_water_layer_2").mean().rename('SM2'),
              hourly.select("volumetric_soil_water_layer_3").mean().rename('SM3'),
              hourly.select("volumetric_soil_water_layer_4").mean().rename('SM4'),
          ]
          return layer1.addBands(deeper)

      start = ee.Date(week_iso)
      week_days = ee.List.sequence(0, 6).map(
          lambda i: _soil_for_day(ee.Date(start).advance(i, 'day'))
      )
      stack = ee.ImageCollection(week_days).mean().rename(weekly_names)

      for k in day_lags:
          lag_day = start.advance(-k, 'day').format('YYYY-MM-dd')
          stack = stack.addBands(_soil_for_day(lag_day).rename(_lag_names(k)))

      stats = stack.reduceRegion(ee.Reducer.mean(), geom, SCALE["ERA5"], maxPixels=1e9)

      props = {'week_start': week_iso}
      props.update((name, stats.get(name)) for name in weekly_names)
      for k in day_lags:
          props.update((name, stats.get(name)) for name in _lag_names(k))
      return props


  def _water_props(geom: ee.Geometry, week_iso: ee.String):
      """Return the weekly water-deficit property for one polygon and week.

      ``water_deficit_mm_week`` is the week's summed evaporation proxy minus
      the week's summed CHIRPS precipitation (masked pixels counted as 0),
      averaged over *geom*. Returns an empty dict unless both ``USE_CHIRPS``
      and ``USE_ERA5_EVP`` are set.
      """
      if not USE_CHIRPS or not USE_ERA5_EVP:
          return {}

      start = ee.Date(week_iso)
      week_end = start.advance(7, 'day')

      rain_total = (ee.ImageCollection(COLL["CHIRPS_DAILY"])
                    .filterDate(start, week_end)
                    .select("precipitation")
                    .map(lambda im: im.unmask(0))
                    .sum())

      def _evap_for_day(day):
          day_start = ee.Date(day)
          day_end = day_start.advance(1, 'day')
          mean_flux = (ee.ImageCollection(COLL["ERA5_LAND_HOURLY"])
                       .filterDate(day_start, day_end)
                       .select("surface_latent_heat_flux")
                       .mean())
          scale_factor = ee.Number(86400.0).divide(ee.Number(2.45e6))
          return mean_flux.multiply(scale_factor)

      week_days = ee.List.sequence(0, 6).map(
          lambda i: _evap_for_day(ee.Date(start).advance(i, 'day'))
      )
      evap_total = ee.ImageCollection(week_days).sum()

      deficit = evap_total.subtract(rain_total).rename('water_deficit_mm_week')
      stats = deficit.reduceRegion(ee.Reducer.mean(), geom, SCALE["ERA5"], maxPixels=1e9)
      return {'week_start': week_iso,
              'water_deficit_mm_week': stats.get('water_deficit_mm_week')}

  def _elev_props(geom: ee.Geometry, week_iso: ee.String):
      """Return the polygon-mean SRTM elevation, tagged with the week label.

      The elevation does not depend on the week; *week_iso* is only copied
      into ``week_start``. Returns an empty dict when ``INCLUDE_ELEVATION``
      is false.
      """
      if not INCLUDE_ELEVATION:
          return {}
      dem = ee.Image(COLL["SRTM"]).select('elevation').rename('elevation_m')
      stats = dem.reduceRegion(ee.Reducer.mean(), geom, SCALE["ELEV"], maxPixels=1e9)
      mean_elevation = stats.get('elevation_m')
      return {'week_start': week_iso, 'elevation_m': mean_elevation}

  def _export_fc_to_drive(fc, description, prefix):
      """Start a CSV export of *fc* to Google Drive and return the task.

      Args:
          fc: ee.FeatureCollection to export.
          description: Task description. Callers build it from the HSA name,
              so it is sanitized before use.
          prefix: Drive file name prefix; passed through unchanged so the
              output file names stay as the callers chose them.

      Returns:
          The started export task (previous callers ignored the return value).
      """
      import re

      # Callers embed free-text facility names in the description. Earth Engine
      # is understood to accept only letters, digits and . , : ; _ - in task
      # descriptions, up to 100 characters — confirm against the Export docs.
      # Anything else is replaced so the task is not rejected at start().
      safe_description = re.sub(r'[^A-Za-z0-9.,:;_-]', '_', str(description))[:100] or 'export'
      task = ee.batch.Export.table.toDrive(collection=fc, description=safe_description,
                                           fileNamePrefix=prefix, fileFormat='CSV')
      task.start()
      print(f"✓ Export started: {safe_description} → {prefix}.csv")
      return task

  def export_one_polygon_by_family(hsa_id: str, weeks_list):
      """Start one Drive export per enabled feature family for a single HSA.

      The geometry is cleaned once with robust_ee_geom. Each family is then
      attempted independently: a failure is printed and the remaining
      families still run.
      """
      geom = robust_ee_geom(hsa_id)

      # (enabled?, label used in the failure message, property builder,
      #  task description, Drive file prefix) — in export order.
      families = (
          (USE_CHIRPS, "Precip", _precip_props,
           f"precip_{hsa_id}", f"HSA_{hsa_id}_precip_lags"),
          (USE_ERA5_HOURLY, "Temp/dew/wind", _tdw_props,
           f"tempdew_wind_{hsa_id}", f"HSA_{hsa_id}_tempdew_wind_lags"),
          (USE_ERA5_EVP, "Evap", _evap_props,
           f"evap_{hsa_id}", f"HSA_{hsa_id}_evapERA5_lags"),
          (USE_ERA5_HOURLY, "Soil moisture", _soil_props,
           f"soilmoist_{hsa_id}", f"HSA_{hsa_id}_soilmoistERA5_lags"),
          (USE_CHIRPS and USE_ERA5_EVP, "Water balance", _water_props,
           f"water_balance_{hsa_id}", f"HSA_{hsa_id}_water_balance"),
          (INCLUDE_ELEVATION, "Elevation", _elev_props,
           f"elevation_{hsa_id}", f"HSA_{hsa_id}_elevation_by_week"),
      )

      for enabled, label, props_fn, description, prefix in families:
          try:
              if enabled:
                  # Bind props_fn as a default so the lambda keeps this
                  # iteration's builder.
                  weekly_rows = _weeks_fc_map(
                      weeks_list,
                      lambda ws, fn=props_fn: {id_col: hsa_id, **fn(geom, ws)},
                  )
                  _export_fc_to_drive(weekly_rows, description, prefix)
          except Exception as e:
              print(f"⚠️  {label} export failed for '{hsa_id}': {e}")

  def run_exports_per_polygon(pause_seconds: float = 60):
      """Start the per-family exports for every HSA, one polygon at a time.

      In ``TEST_MODE`` only the first ``TEST_HSA_COUNT`` polygons and the first
      ``TEST_WEEK_COUNT`` weeks are used. A polygon that raises is reported and
      skipped.

      Args:
          pause_seconds: Wait between consecutive polygons (default 60, the
              previously hard-coded value). No wait happens before the first
              polygon; 0 or a negative value disables the pause.
      """
      id_list = ee_fc_hsa.reduceColumns(ee.Reducer.toList(), [id_col]).get('list').getInfo()
      if TEST_MODE:
          weeks = anchor_mondays[:TEST_WEEK_COUNT]
          hsa_ids = id_list[:TEST_HSA_COUNT]
          print(f"TEST RUN: {len(hsa_ids)} polygons × {len(weeks)} weeks")
      else:
          weeks = anchor_mondays
          hsa_ids = id_list
          print(f"FULL RUN: {len(hsa_ids)} polygons × {len(weeks)} weeks")
      for position, hid in enumerate(hsa_ids):
          try:
              # Pause only BETWEEN polygons; waiting before the first one just
              # delays the run.
              if position and pause_seconds > 0:
                  time.sleep(pause_seconds)
              export_one_polygon_by_family(hid, weeks)
          except Exception as e:
              print(f"⚠️  Skipping '{hid}': {e}")

  print("Driver ready. Call: run_exports_per_polygon()")


## STEP 6 — Processing

In [None]:
# run_exports_per_polygon() reads the global TEST_MODE: False selects the full
# run (all polygons × all anchor weeks) instead of the small test subset set
# up in STEP 0.2.
TEST_MODE = False
run_exports_per_polygon()

## STEP 7 — Optional status checks

In [None]:

  # Check status of recent Earth Engine tasks
  import ee

  tasks = ee.batch.Task.list()

  print("EARTH ENGINE TASK STATUS")
  print("="*80)

  from datetime import datetime, timedelta

  # Only the first N entries of the task list are inspected (no date filter is
  # applied). Raise this if more exports than that were started.
  MAX_TASKS_CHECKED = 50

  active_tasks = []
  queued_tasks = []
  completed_tasks = []
  failed_tasks = []

  for task in tasks[:MAX_TASKS_CHECKED]:
      # One status() call per task; its result is reused for both fields.
      status = task.status()
      state = status['state']
      description = status.get('description', '')

      if state == 'RUNNING':
          active_tasks.append(description)
      elif state == 'READY':
          # Submitted but not started yet. These must count as unfinished,
          # otherwise a queue of waiting exports is reported as "all finished".
          queued_tasks.append(description)
      elif state == 'COMPLETED':
          completed_tasks.append(description)
      elif state in ['FAILED', 'CANCELLED']:
          failed_tasks.append(description)

  print(f"\nACTIVE (currently running): {len(active_tasks)}")
  for desc in active_tasks[:10]:  # Show first 10
      print(f"  ⏳ {desc}")
  if len(active_tasks) > 10:
      print(f"  ... and {len(active_tasks) - 10} more")

  print(f"\nQUEUED (waiting to start): {len(queued_tasks)}")
  print(f"COMPLETED: {len(completed_tasks)}")
  print(f"FAILED: {len(failed_tasks)}")

  if failed_tasks:
      print("\nFailed tasks:")
      for desc in failed_tasks:
          print(f"  ✗ {desc}")

  print(f"\n{'='*80}")
  if not active_tasks and not queued_tasks:
      print("✓ All tasks finished! (or none were started)")
  else:
      print(f"⏳ Still running: {len(active_tasks)} tasks ({len(queued_tasks)} queued)")
      print("Wait and check again in a few minutes")

In [None]:
  # List all files in Google Drive export folder
  from google.colab import drive
  import os

  # Mount Google Drive if not already mounted
  if not os.path.exists('/content/drive'):
      drive.mount('/content/drive')

  # Check the export folder
  export_folder = '/content/drive/MyDrive/'  # Adjust if you specified a different folder

  # List all HSA CSV files
  import glob
  csv_files = glob.glob(os.path.join(export_folder, 'HSA_*.csv'))

  # File-name endings written by export_one_polygon_by_family, one per family.
  FAMILY_SUFFIXES = (
      '_precip_lags',
      '_tempdew_wind_lags',
      '_evapERA5_lags',
      '_soilmoistERA5_lags',
      '_water_balance',
      '_elevation_by_week',
  )
  EXPECTED_HSAS = 18
  FILES_PER_HSA = len(FAMILY_SUFFIXES)

  print("GOOGLE DRIVE EXPORT STATUS")
  print("="*80)
  print(f"Total CSV files found: {len(csv_files)}")
  print(f"Expected: {EXPECTED_HSAS * FILES_PER_HSA} files ({EXPECTED_HSAS} HSAs × {FILES_PER_HSA} types)")

  # Group by HSA
  from collections import defaultdict
  hsa_files = defaultdict(list)

  for f in csv_files:
      basename = os.path.basename(f)
      # Strip the leading 'HSA_' and the '.csv' extension, then remove the
      # family suffix to recover the HSA name. Splitting on the last '_' is
      # not enough: every suffix contains underscores, so part of the family
      # name would stay in the key and each file would look like its own HSA.
      stem = basename[len('HSA_'):]
      if stem.endswith('.csv'):
          stem = stem[:-len('.csv')]
      for suffix in FAMILY_SUFFIXES:
          if stem.endswith(suffix):
              hsa_files[stem[:-len(suffix)]].append(basename)
              break

  print(f"\nHSAs with files: {len(hsa_files)}/{EXPECTED_HSAS}")

  # Check completeness (loop variable deliberately not called `files`, which
  # would overwrite the google.colab `files` module imported earlier).
  for hsa, found in sorted(hsa_files.items()):
      if len(found) == FILES_PER_HSA:
          print(f"  ✓ {hsa}: {len(found)}/{FILES_PER_HSA} files")
      else:
          print(f"  ⚠️  {hsa}: {len(found)}/{FILES_PER_HSA} files (INCOMPLETE)")

