# Uganda Flood Fraction — Local JupyterLab (Earth Engine)  
This notebook runs **locally** (JupyterLab / Jupyter Notebook) and produces **per-Sentinel-1-image × per-district** flood metrics for Uganda (2020–2025).  
Outputs are exported **server-side to your Google Drive** using `ee.batch.Export.table.toDrive()` (no `.getInfo()` for the per-image tables).  

**Requirements (install locally):**
- Python 3.8+  
- `earthengine-api`, `geemap`, `geopandas`, `pandas`, `shapely`, `fiona`, `rasterio` installed in your environment.  
- Google account with Earth Engine access (register at https://earthengine.google.com/).  

**Notes:**
- Export tasks run on Earth Engine servers and are listed in the *Tasks* tab of the Code Editor; the resulting CSVs appear in your Google Drive, not on your local disk.
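- For large runs, submit exports in batches (see `CHUNK_SIZE`) to stay within Earth Engine task-queue and quota limits.
- The district polygons are uploaded from a local shapefile and embedded in each Earth Engine request. If you see `Request payload size exceeds the limit`, simplify the geometries or upload the shapefile as an Earth Engine asset.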


## Environment Setup

In [1]:
import datetime
import json
import os
import time
from pathlib import Path
from time import sleep

import ee, geemap
import geopandas as gpd
import numpy as np
import pandas as pd
from shapely.geometry import mapping

## Folder Structure

In [4]:
# Local data directories, relative to the notebook's working directory.
raw_data_dir = Path("..") / "data" / "raw"
proc_data_dir = Path("..") / "data" / "processed" / "climate"
# Create the processed-output tree if missing (no-op when it already exists).
proc_data_dir.mkdir(exist_ok=True, parents=True)

## Parameters

In [41]:
# Input district boundaries (shapefile under the raw-data directory)
DISTRICT_SHP_PATH = raw_data_dir / "geographies" / "uganda_herbert" / "uganda_districts.shp"

# Study window and baseline (reference) window, as 'YYYY-MM-DD' strings
START_DATE, END_DATE = '2020-01-01', '2025-08-31'
BASELINE_START, BASELINE_END = '2015-01-01', '2019-12-31'

# Flood detection: a pixel is flagged when its VV anomaly (image minus the
# baseline median) is at or below ANOMALY_THRESHOLD, unless its JRC water
# occurrence is at least PERM_WATER_OCCURRENCE_PCT percent.
ANOMALY_THRESHOLD = -3.0  # dB
PERM_WATER_OCCURRENCE_PCT = 50

# Earth Engine source collection
S1_COLLECTION_ID = 'COPERNICUS/S1_GRD'

# Local path of the synced Google Drive folder.
# NOTE(review): Export.table.toDrive expects a Drive folder *name*; pass
# DRIVE_FOLDER.name to EE rather than this local Path.
DRIVE_FOLDER = Path(
    '/Users/paoich/Library/CloudStorage/'
    'GoogleDrive-andrichpaolo@gmail.com/My Drive/data/flood_files'
)

# Throttling / batching knobs
CHUNK_SIZE = 50             # images submitted per chunk (tune to your EE quotas)
SCALE = 10                  # metres; reduceRegion scale for pixelArea sums
BATCH_SIZE = 20             # image metadata items fetched per getInfo() call
SLEEP_BETWEEN_BATCHES = 1   # seconds to pause between metadata batches
SLEEP_BETWEEN_EXPS = 0.5    # seconds to pause between export submissions

# Submission log (newline-delimited JSON) stored next to the Drive exports,
# and whether previously failed submissions should be retried.
PROCESSED_LOG = str(DRIVE_FOLDER.joinpath('processed_images.jsonl').resolve())
RESUBMIT_FAILED = False

## Functions

In [39]:
def load_processed_ids(logfile=None):
    """Load the JSONL submission log into a dict keyed by image id.

    Each non-blank line is expected to hold one JSON object. When an image id
    appears more than once (e.g. after a resubmit), the last record wins.
    Blank lines, lines that are not valid JSON, non-object records and
    records without an ``image_id`` are skipped.

    Args:
        logfile: Path of the log file. Defaults to ``PROCESSED_LOG``, looked
            up at call time so re-running the Parameters cell takes effect.

    Returns:
        dict mapping ``image_id`` -> most recent record; empty when the file
        does not exist.
    """
    import json
    import os

    if logfile is None:
        logfile = PROCESSED_LOG
    processed = {}
    if not os.path.exists(logfile):
        return processed
    with open(logfile, 'r', encoding='utf-8') as fh:
        for raw_line in fh:
            line = raw_line.strip()
            if not line:
                continue  # tolerate blank / trailing lines
            try:
                rec = json.loads(line)
            except json.JSONDecodeError:
                # A corrupt/partial line should not block resuming the run.
                continue
            if not isinstance(rec, dict):
                continue
            imgid = rec.get('image_id')
            if imgid:
                # keep last record per image (in case of resubmits)
                processed[imgid] = rec
    return processed

# Helper: append a record to the log in JSONL format
def append_log(record, logfile=None):
    """Append one record to the JSONL submission log.

    The record is serialised with ``json.dumps`` and terminated by a real
    newline, so each record occupies exactly one line. The file is flushed
    and fsync'd before returning, so the record is on disk even if the
    notebook dies right after a task submission.

    Args:
        record: JSON-serialisable dict (normally containing ``image_id``).
        logfile: Path of the log. Defaults to ``PROCESSED_LOG``, looked up at
            call time. Missing parent directories are created.
    """
    import json
    import os

    if logfile is None:
        logfile = PROCESSED_LOG
    parent = os.path.dirname(os.fspath(logfile))
    if parent:
        os.makedirs(parent, exist_ok=True)
    # Single small write in append mode followed by flush + fsync (durable,
    # though not formally atomic).
    with open(logfile, 'a', encoding='utf-8') as fh:
        fh.write(json.dumps(record) + "\n")
        fh.flush()
        os.fsync(fh.fileno())

# Helper: create and start an Export.table.toDrive task for one image
# (server-side export of the per-district flood table).
def export_image_to_drive(image_id, image_time_str=None):
    """Start a server-side CSV export of per-district flood metrics for one S1 image.

    For every feature of ``districts_fc`` the image's VV band is compared with
    the median VV of images from BASELINE_START..BASELINE_END (inclusive)
    acquired in the same calendar month over that district. Pixels whose
    anomaly is <= ANOMALY_THRESHOLD and that are not permanent water (JRC
    occurrence >= PERM_WATER_OCCURRENCE_PCT) are counted as flooded.

    Args:
        image_id: ``system:index`` of a COPERNICUS/S1_GRD image. A full asset
            id (``'<S1_COLLECTION_ID>/<index>'``) is also accepted.
        image_time_str: Currently unused; optional for backward compatibility.

    Returns:
        The started ``ee.batch.Task``.
    """
    collection_prefix = S1_COLLECTION_ID + '/'
    if image_id.startswith(collection_prefix):
        # getInfo() metadata exposes the full asset id; strip it so the
        # asset path below is not doubled.
        image_id = image_id[len(collection_prefix):]

    img = ee.Image(collection_prefix + image_id).select('VV')
    acq_date = ee.Date(img.get('system:time_start'))
    acq_time = acq_date.format('YYYY-MM-dd')
    month = ee.Number(acq_date.get('month'))

    # JRC occurrence is masked where water was never observed; unmask to 0 so
    # those pixels count as "not permanent water" instead of masking the
    # flood mask out everywhere except historical water bodies.
    perm_water = (
        ee.Image('JRC/GSW1_3/GlobalSurfaceWater')
        .select('occurrence')
        .unmask(0)
        .gte(PERM_WATER_OCCURRENCE_PCT)
    )

    # Baseline filters that do not depend on the district (hoisted out of map).
    # filterDate's end is exclusive, so advance BASELINE_END by one day.
    baseline_base = (
        ee.ImageCollection(S1_COLLECTION_ID)
        .filterDate(BASELINE_START, ee.Date(BASELINE_END).advance(1, 'day'))
        .filter(ee.Filter.eq('instrumentMode', 'IW'))
        .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
        .filter(ee.Filter.calendarRange(month, month, 'month'))
        .select('VV')
    )
    # Fully masked 'VV' image merged into each baseline so median() still
    # yields a (masked) 'VV' band when no baseline image exists, instead of a
    # band-less image that makes subtract() fail.
    empty_vv = ee.Image.constant(0).toFloat().rename('VV').updateMask(ee.Image.constant(0))
    pixel_area = ee.Image.pixelArea()

    def masked_area_m2(mask, geom):
        """Sum pixel area (m²) under `mask` within `geom`; null results become 0."""
        total = (
            pixel_area
            .updateMask(mask)
            .reduceRegion(
                reducer=ee.Reducer.sum(),
                geometry=geom,
                scale=SCALE,
                maxPixels=1e13,
            )
            .get('area')
        )
        # ee.Algorithms.If treats null (and 0) as false, so this maps null -> 0.
        return ee.Number(ee.Algorithms.If(total, total, 0))

    def per_feature(feature):
        geom = feature.geometry()

        # Baseline collection for this district and calendar month
        baseline_col = baseline_base.filterBounds(geom)
        baseline_exists = baseline_col.size().gt(0)
        baseline = (
            baseline_col
            .merge(ee.ImageCollection([empty_vv]))
            .median()
            .clip(geom)
        )

        img_clip = img.clip(geom)
        anomaly = img_clip.subtract(baseline)
        flood_mask = anomaly.lte(ANOMALY_THRESHOLD).And(perm_water.Not())

        flooded_num = masked_area_m2(flood_mask, geom)
        valid_num = masked_area_m2(img_clip.mask(), geom)

        district_area = ee.Number(feature.get('area_m2'))
        has_area = district_area.gt(0)
        coverage_pct = ee.Algorithms.If(
            has_area, valid_num.divide(district_area).multiply(100), ee.Number(0)
        )
        flood_fraction = ee.Algorithms.If(
            has_area, flooded_num.divide(district_area), ee.Number(0)
        )

        return feature.set({
            'image_id': image_id,
            'acq_time': acq_time,
            'flooded_m2': flooded_num,
            'district_area_m2': district_area,
            'flood_fraction': flood_fraction,
            'coverage_pct': coverage_pct,
            'baseline_exists': baseline_exists,
        })

    results_fc = districts_fc.map(per_feature)

    # Export.table.toDrive takes a Drive folder *name*; DRIVE_FOLDER may be a
    # local Path to the synced folder, so use its last component.
    drive_folder = getattr(DRIVE_FOLDER, 'name', DRIVE_FOLDER)
    task_description = f"export_{image_id}"
    file_name = f"{image_id}_district_floods"
    task = ee.batch.Export.table.toDrive(
        collection=results_fc,
        description=task_description,
        folder=drive_folder,
        fileNamePrefix=file_name,
        fileFormat='CSV',
        # Only the metric columns; omits the large '.geo' polygon column.
        selectors=[
            'district', 'image_id', 'acq_time', 'flooded_m2',
            'district_area_m2', 'flood_fraction', 'coverage_pct',
            'baseline_exists',
        ],
    )
    task.start()
    print('Started export task for', image_id, '->', file_name)
    return task


## Setup Google Earth Engine

In [2]:
# Run the Earth Engine authentication flow for this machine (the recorded run returned True).
ee.Authenticate()

True

In [3]:
# Open an Earth Engine session against this Google Cloud project.
EE_PROJECT = "divine-catalyst-330916"
ee.Initialize(project=EE_PROJECT)

## Import Data

### Geometries

In [10]:
# District boundaries: normalise names, compute true areas, then lighten the
# geometries before they are sent to Earth Engine.
EQUAL_AREA_EPSG = 6933          # WGS 84 / NSIDC EASE-Grid 2.0 Global (equal-area, metres)
SIMPLIFY_TOLERANCE_DEG = 0.001  # ~110 m at the equator; set to None to disable

gdf = (
    gpd
    .read_file(DISTRICT_SHP_PATH)
    .rename(columns=str.lower)
    .assign(
        district=lambda x: np.where(
            x["district"] == "SSEMBABULE",
            "Sembabule",
            x["district"].str.title(),
        ),
        # Area in m² from an equal-area projection, valid whatever CRS the
        # shapefile is stored in (a geographic CRS would yield degrees²).
        area_m2=lambda x: x["geometry"].to_crs(epsg=EQUAL_AREA_EPSG).area,
    )
    .to_crs(epsg=4326)
    [[
        "district",
        "area_m2",
        "geometry",
    ]]
)

# The district polygons are converted to an ee.FeatureCollection client-side
# and travel inside every EE request; full-resolution boundaries exceeded the
# 10 MB request limit. Simplify them (area_m2 above stays based on the exact
# boundaries, so flood_fraction may differ very slightly from exact values).
if SIMPLIFY_TOLERANCE_DEG:
    gdf["geometry"] = gdf["geometry"].simplify(SIMPLIFY_TOLERANCE_DEG, preserve_topology=True)

In [11]:
# Convert the GeoDataFrame to an ee.FeatureCollection. NOTE(review): this is
# presumably built client-side from GeoJSON, so its geometry is embedded in
# every request that uses districts_fc.
districts_fc = geemap.geopandas_to_ee(gdf)


def ensure_props(f):
    """Re-set the 'district' and 'area_m2' properties of a feature from itself.

    Each property is copied onto itself, so this cannot create values that
    are not already present on the feature.
    """
    wanted = ('district', 'area_m2')
    return f.set({name: f.get(name) for name in wanted})


districts_fc = districts_fc.map(ensure_props)

## Processing

In [44]:
# Whether to attach each image footprint's centroid as the feature geometry.
# False exports properties only (smallest output); True adds a point per row.
INCLUDE_CENTROID = False

# Month windows (month-start timestamps)
months = pd.date_range(START_DATE, END_DATE, freq='MS')

# Export.table.toDrive expects a Drive folder *name*, not a local path.
drive_folder = getattr(DRIVE_FOLDER, 'name', DRIVE_FOLDER)


def make_feature(img):
    """Map an S1 image to a lightweight feature holding its id, start time and orbit pass."""
    props = {
        'image_id': img.get('system:index'),
        'time_start': img.get('system:time_start'),
        'orbit_pass': img.get('orbitProperties_pass'),  # optional
    }
    if INCLUDE_CENTROID:
        # centroid of the footprint, 1 m error tolerance
        return ee.Feature(img.geometry().centroid(1), props)
    # Feature with no geometry (table export will contain properties only)
    return ee.Feature(None, props)


for m in months:
    m_start = m.strftime('%Y-%m-%d')
    m_end = (m + pd.offsets.MonthEnd(1)).strftime('%Y-%m-%d')
    # filterDate's end bound is exclusive: stop at next month's first day so
    # images acquired on the last day of this month are included.
    m_next = (m + pd.offsets.MonthBegin(1)).strftime('%Y-%m-%d')
    print('Preparing metadata export for', m_start, '->', m_end)

    coll = (
        ee
            .ImageCollection("COPERNICUS/S1_GRD")
            .filterDate(m_start, m_next)
            .filter(ee.Filter.eq('instrumentMode', 'IW'))
            .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
            .filterBounds(districts_fc.geometry())
    )

    # Skip months without images (one small scalar request)
    if coll.size().eq(0).getInfo():
        print('  No images in this month; skipping.')
        continue

    task_desc = f'metadata_{m_start}'
    out_name = f's1_metadata_{m_start}'
    # The Python API takes keyword arguments; a dict as the first positional
    # argument would be treated as the collection.
    task = ee.batch.Export.table.toDrive(
        collection=coll.map(make_feature),
        description=task_desc,
        folder=drive_folder,
        fileNamePrefix=out_name,
        fileFormat='CSV',
    )
    task.start()
    print('Export started:', task_desc)


Preparing metadata export for 2020-01-01 -> 2020-01-31


EEException: Request payload size exceeds the limit: 10485760 bytes.

In [42]:
months = pd.date_range(start=START_DATE, end=END_DATE, freq="MS")  # first day of each month

In [43]:
# Main loop: month by month, page through image metadata in small batches and
# submit one per-district export per image. Every submission is recorded in
# PROCESSED_LOG so re-running this cell resumes where it stopped.
# NOTE: 'submitted' only means the task was started; this loop never checks
# whether the task later succeeded on the EE servers.


def _image_id_and_date(img_meta):
    """Return (system:index, 'YYYY-MM-DD' UTC acquisition date or None) from getInfo() metadata."""
    if not isinstance(img_meta, dict):
        return None, None
    props = img_meta.get('properties') or {}
    image_id = props.get('system:index')
    if image_id is None:
        # 'id'/'name' hold the full asset path; keep only the last component.
        full_id = img_meta.get('id') or img_meta.get('name')
        if full_id:
            image_id = full_id.rsplit('/', 1)[-1]
    t_ms = props.get('system:time_start')
    image_date = None
    if t_ms is not None:
        image_date = (
            datetime.datetime
            .fromtimestamp(t_ms / 1000, tz=datetime.timezone.utc)
            .strftime('%Y-%m-%d')
        )
    return image_id, image_date


def _utc_now_iso():
    """Current UTC time as ISO-8601 with a trailing 'Z'."""
    return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None).isoformat() + 'Z'


# The on-disk log is the source of truth for what was already submitted.
processed_map = load_processed_ids()

for m in months:
    m_start = m.strftime('%Y-%m-%d')
    m_end = (m + pd.offsets.MonthEnd(1)).strftime('%Y-%m-%d')
    # filterDate's end bound is exclusive: use next month's first day.
    m_next = (m + pd.offsets.MonthBegin(1)).strftime('%Y-%m-%d')
    print(f"== Month window: {m_start} to {m_end} ==")

    s1_month_col = (
        ee
            .ImageCollection('COPERNICUS/S1_GRD')
            .filterDate(m_start, m_next)
            .filter(ee.Filter.eq('instrumentMode', 'IW'))
            .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
            .filterBounds(districts_fc.geometry())
    )

    offset = 0
    while True:
        # fetch only BATCH_SIZE metadata items per request
        batch_list = s1_month_col.toList(BATCH_SIZE, offset).getInfo()
        if not batch_list:
            print(f"No more images for {m_start} (offset {offset}).")
            break

        for img_meta in batch_list:
            image_id, image_date = _image_id_and_date(img_meta)
            if image_id is None:
                print("WARNING: could not extract image id; skipping meta:", img_meta)
                continue

            # Never resubmit successful submissions; retry failures only on request.
            state = (processed_map.get(image_id) or {}).get('status')
            if state == 'submitted':
                print(f"  Skipping {image_id} (already submitted).")
                continue
            if state == 'failed' and not RESUBMIT_FAILED:
                print(f"  Skipping {image_id} (previously failed). Set RESUBMIT_FAILED=True to retry.")
                continue

            try:
                task = export_image_to_drive(image_id, image_date)
            except Exception as e:
                # Log failure so it can be retried later
                rec = {
                    'image_id': image_id,
                    'task_description': None,
                    'task_id': None,
                    'time_utc': _utc_now_iso(),
                    'status': 'failed',
                    'error': str(e),
                }
                print(f"  ERROR submitting {image_id}: {e}")
            else:
                # ee.batch.Task keeps its description in task.config
                task_desc = getattr(task, 'description', None)
                if task_desc is None:
                    task_desc = (getattr(task, 'config', None) or {}).get('description')
                rec = {
                    'image_id': image_id,
                    'task_description': task_desc,
                    'task_id': getattr(task, 'id', None),
                    'time_utc': _utc_now_iso(),
                    'status': 'submitted',
                }
                print(f"Submitted export for {image_id} -> task {rec['task_description']}")
            append_log(rec)
            processed_map[image_id] = rec

            sleep(SLEEP_BETWEEN_EXPS)

        if len(batch_list) < BATCH_SIZE:
            # A short page means the month is exhausted; skip the extra request.
            print(f"No more images for {m_start} (offset {offset + len(batch_list)}).")
            break

        offset += BATCH_SIZE
        print(f"  Fetched offset {offset}; sleeping {SLEEP_BETWEEN_BATCHES}s before next batch...")
        sleep(SLEEP_BETWEEN_BATCHES)

print("All month windows processed. Review", PROCESSED_LOG, "for submission log and task ids.")

== Month window: 2020-01-01 to 2020-01-31 ==


EEException: Request payload size exceeds the limit: 10485760 bytes.

### Sentinel-1

In [24]:
# Sentinel-1 IW scenes with a VV band, acquired in the study window over the
# listing region. Only IDs and timestamps are fetched from it below.
# NOTE(review): the listing was restricted to one district (apparently a test
# run); set LISTING_DISTRICT = None to cover every district in districts_fc.
LISTING_DISTRICT = "Gulu"

_listing_fc = (
    districts_fc
    if LISTING_DISTRICT is None
    else districts_fc.filter(ee.Filter.eq("district", LISTING_DISTRICT))
)

S1_col = (
    ee
        .ImageCollection('COPERNICUS/S1_GRD')
        # END_DATE is meant inclusively; filterDate's end bound is exclusive.
        .filterDate(START_DATE, ee.Date(END_DATE).advance(1, 'day'))
        .filter(ee.Filter.eq('instrumentMode', 'IW'))
        .filter(ee.Filter.listContains('transmitterReceiverPolarisation', 'VV'))
        .filterBounds(_listing_fc.geometry())
        .select("VV")
)

In [25]:
# Fetch [system:index, system:time_start] for every image in one small request.
info = (
    S1_col
        .reduceColumns(
            ee.Reducer.toList(2),
            ['system:index', 'system:time_start']
        )
        .getInfo()
)
# With tupleSize=2, toList returns one [index, time] pair per image, not two
# parallel lists, so unzip the pairs here.
pairs = info.get('list') or []
image_ids = [pair[0] for pair in pairs]
image_times = [pair[1] for pair in pairs]  # epoch milliseconds
print('Found', len(image_ids), 'Sentinel-1 images in the period')

EEException: Request payload size exceeds the limit: 10485760 bytes.

In [None]:
# Submit export tasks in chunks of CHUNK_SIZE to avoid overloading the EE task queue.
n_images = len(image_ids)
print('Total images:', n_images)

for start_idx in range(0, n_images, CHUNK_SIZE):
    end_idx = min(start_idx + CHUNK_SIZE, n_images)
    batch_ids = image_ids[start_idx:end_idx]
    print(f'Submitting images {start_idx}..{end_idx-1} (count {len(batch_ids)})')
    tasks = []
    for iid in batch_ids:
        tasks.append(export_image_to_drive(iid, None))
        sleep(SLEEP_BETWEEN_EXPS)  # small delay to be polite
    print('Submitted batch. Monitoring task states (brief)...')
    for task in tasks:
        # ee.batch.Task keeps its description in task.config, not as an attribute
        desc = (getattr(task, 'config', None) or {}).get('description')
        print(desc, task.status())
    # wait before the next batch (not after the last one)
    if end_idx < n_images:
        print('Sleeping 30s before next batch (increase if needed)')
        sleep(30)

print('All batches submitted (or attempted). Check Earth Engine Tasks in the Code Editor and your Drive folder.')

In [None]:
# OPTIONAL: list all EE tasks and their state.
# Task.list() stores each description in task.config; status() costs one
# request per task, so it is only called for failed tasks (to show the error).
for t in ee.batch.Task.list():
    desc = (getattr(t, 'config', None) or {}).get('description')
    if t.state == 'FAILED':
        print(t.id, t.state, desc, t.status().get('error_message'))
    else:
        print(t.id, t.state, desc)

## After exports
- Exported CSVs will appear in your Google Drive in the folder you specified (DRIVE_FOLDER).  
- Each CSV corresponds to one image and contains one row per district with columns:
  image_id, acq_time, district, flooded_m2, district_area_m2, flood_fraction, coverage_pct, baseline_exists  
- To assemble a single master CSV locally: download the CSVs from Drive and concatenate (example code provided below).

Example code to merge downloaded CSVs locally:
```python
import glob, pandas as pd
files = glob.glob('/path/to/downloaded_csvs/*_district_floods.csv')
df = pd.concat([pd.read_csv(f) for f in files], ignore_index=True)
df.to_csv('uganda_flood_master.csv', index=False)
```
