# Step 4 — Turn imagery into anomaly signals

This notebook:
- builds a seasonal baseline per corridor segment (break buffer)
- computes z-score anomalies for NDVI / NDWI (plus NDRE / RENDVI)
- aggregates anomalies per segment (max, mean, persistence)

Input: CSV exports from GEE in `data/imagery/sentinel2_time_series/`
Output: `data/features/anomaly_features.parquet`


In [None]:
from pathlib import Path

try:
    import pandas as pd
    import numpy as np
except ImportError as exc:
    raise SystemExit('Missing deps. Run: pip install pandas pyarrow') from exc


In [None]:
# Load all exported CSVs from GEE
data_dir = Path('data/imagery/sentinel2_time_series')
csvs = sorted(data_dir.glob('*.csv'))
if not csvs:
    raise SystemExit('No CSVs found. Download the export(s) to data/imagery/sentinel2_time_series/.')

df = pd.concat([pd.read_csv(p) for p in csvs], ignore_index=True)

# Expect columns: break_id, break_date, month (YYYY-MM), kind, NDVI, NDWI, NDRE, RENDVI
df['month'] = pd.to_datetime(df['month'] + '-01')
df['month_num'] = df['month'].dt.month
df['month_idx'] = df['month'].dt.year * 12 + df['month'].dt.month

metrics = ['NDVI', 'NDWI', 'NDRE', 'RENDVI']
print('Rows:', len(df))
df.head()


In [None]:
# Build seasonal baseline stats using baseline rows (same-month, prior years)
baseline = df[df['kind'] == 'baseline'].copy()
if baseline.empty:
    raise SystemExit('No baseline rows found. Ensure your export includes baseline samples.')

agg = {}
for m in metrics:
    agg[m] = ['mean', 'std']

baseline_stats = baseline.groupby(['break_id', 'month_num']).agg(agg)
baseline_stats.columns = [f'{col}_{stat}' for col, stat in baseline_stats.columns]
baseline_stats = baseline_stats.reset_index()

baseline_stats.head()


In [None]:
# Compute z-score anomalies for window rows (6 months before -> 2 months after)
window = df[df['kind'] == 'window'].copy()
if window.empty:
    raise SystemExit('No window rows found. Ensure your export includes the break window samples.')

window = window.merge(baseline_stats, on=['break_id', 'month_num'], how='left')

for m in metrics:
    std = window[f'{m}_std']
    valid_std = std.where(std > 1e-6)
    window[f'{m}_z'] = (window[m] - window[f'{m}_mean']) / valid_std

window[[f'{m}_z' for m in metrics]].head()


In [None]:
# Aggregate anomalies per segment (break buffer)
threshold = 2.0

def max_consecutive(month_idx, flags):
    if len(month_idx) == 0:
        return 0
    order = np.argsort(month_idx)
    months = np.array(month_idx)[order]
    flags = np.array(flags)[order]

    max_run = 0
    run = 0
    last_m = None
    for m, f in zip(months, flags):
        if f and (last_m is None or m == last_m + 1):
            run += 1
        elif f:
            run = 1
        else:
            run = 0
        max_run = max(max_run, run)
        last_m = m
    return int(max_run)

def per_segment(group):
    out = {
        'break_id': group['break_id'].iloc[0],
        'segment_id': group['break_id'].iloc[0]
    }
    for m in metrics:
        z = group[f'{m}_z']
        out[f'{m.lower()}_z_max'] = z.max()
        out[f'{m.lower()}_z_mean'] = z.mean()
        out[f'{m.lower()}_persistence_months'] = max_consecutive(
            group['month_idx'].tolist(), (z.abs() >= threshold).fillna(False).tolist()
        )
        out[f'{m.lower()}_persistence_days'] = out[f'{m.lower()}_persistence_months'] * 30
    return pd.Series(out)

features = window.groupby('break_id', sort=False).apply(per_segment).reset_index(drop=True)

break_meta = df[['break_id', 'break_date']].drop_duplicates()
features = features.merge(break_meta, on='break_id', how='left')

features.head()


In [None]:
# Save features
out_path = Path('data/features/anomaly_features.parquet')
out_path.parent.mkdir(parents=True, exist_ok=True)

try:
    features.to_parquet(out_path, index=False)
    print(f'Wrote {out_path}')
except Exception as exc:
    raise SystemExit('Failed to write parquet. Install pyarrow: pip install pyarrow') from exc
