<a href="https://colab.research.google.com/github/lawrencejesse/Sentinel2_Extractor/blob/main/Sentinel2_RasterExtractor.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Sentinel-2 AOI Exporter (Colab friendly)

Run order:
1. Package setup (Colab only)
2. Imports and globals
3. Earth Engine authentication / initialization
4. AOI upload widget
5. AOI map preview
6. Sentinel-2 helpers (cloud mask, indices, collection builder)
7. Date filtering UI (list cloud-filtered scenes)
8. Export UI (per-date NDVI / NDMI / RGB + optional means)
9. Task monitor (optional)
10. NDVI time-series quicklook (optional)


In [None]:
# --- Cell 1: Package setup (run in Colab; skip locally if already installed) ---
# Install spatial libraries required by geemap for file conversions (like KML/Shapefile to EE)
%pip install -q earthengine-api geemap ipywidgets==7.7.1 pandas fiona gdal

In [None]:

# --- Cell 2: Imports & global state ---
import os
import tempfile
from collections import OrderedDict
from datetime import datetime

import ee
import geemap
import pandas as pd
import ipywidgets as widgets
from IPython.display import display, clear_output

# strftime pattern used for scene dates and export names.
DATE_FMT = "%Y-%m-%d"
# --- Notebook-wide state shared between cells ---
# AOI geometry assigned by the upload cell; None until an AOI is loaded.
AOI_GEOMETRY = None
# Sanitized AOI label used when naming exports; None until an AOI is loaded.
AOI_NAME = None
# Sentinel-2 collection assigned by the scene-listing cell; None until scenes are fetched.
PROCESSED_COLLECTION = None
# Scene metadata dicts keyed by date string, in the order the scenes were listed.
AVAILABLE_IMAGES = OrderedDict()
# One {'label', 'task'} dict per export task started from the export cell.
EXPORT_TASKS = []

def check_aoi_status():
    """Print whether an AOI is loaded and return the answer as a bool.

    The AOI counts as loaded only when both the geometry and the name are set.
    """
    aoi_ready = AOI_GEOMETRY is not None and AOI_NAME is not None
    if aoi_ready:
        print(f"✅ AOI loaded: {AOI_NAME}")
    else:
        print("❌ AOI not loaded. Please upload an AOI file in the upload cell.")
    return aoi_ready

def validate_aoi_required(func_name):
    """Return True when an AOI is loaded; otherwise explain what is missing.

    Args:
        func_name: Human-readable name of the feature that needs the AOI,
            used in the printed message.

    Returns:
        True if both the AOI geometry and the AOI name are set, else False.
    """
    if AOI_GEOMETRY is not None and AOI_NAME is not None:
        return True

    print(f"❌ {func_name} requires an AOI to be uploaded first.")
    print("Please go back to the upload cell and upload your AOI file.")
    return False


In [None]:
# --- Cell 3: Earth Engine authentication / initialization ---
# Manually enter your Google Earth Engine project ID on the next line.
# If initialization fails, authenticate (see the printed steps below) and rerun this cell.
EE_PROJECT_ID = "jessemapping"

try:
    ee.Initialize(project=EE_PROJECT_ID)
    print(f"Earth Engine initialized with project: {EE_PROJECT_ID}")
except Exception as init_err:
    # Do not re-raise: print recovery steps so the notebook user can authenticate and retry.
    print("Earth Engine initialization failed. Authenticate before rerunning this cell.")
    print(init_err)
    print("\nSteps if running in Colab:")
    print("1. Run ee.Authenticate() in the next cell.")
    print("2. Rerun this cell to bind the session.")
    print("\nSteps if running locally:")
    print("1. Run 'earthengine authenticate' in a terminal.")
    print("2. Rerun this cell inside the notebook.")

In [None]:

# --- Cell 3b: Manual authentication helper (run only if prompted) ---
# Runs the Earth Engine authentication flow, then initializes against the
# project ID defined in Cell 3 (EE_PROJECT_ID must already exist).
ee.Authenticate()
ee.Initialize(project=EE_PROJECT_ID)
print("Earth Engine authenticated and initialized.")


In [None]:
# --- Cell 4: AOI upload using google.colab.files (KML / GeoJSON / zipped Shapefile) ---
import google.colab.files
import os
import tempfile
import ee
import geemap
import re # Import the re module

def _sanitize_name(name: str) -> str:
    '''Create filesystem-safe and EE-description-safe labels.'''
    # Use regex to keep only allowed characters for EE description: a-z, A-Z, 0-9, ., ,, :, ;, _, -
    # Replace disallowed characters with underscores or simply remove them. Removing is safer.
    base = os.path.splitext(name)[0]
    # Replace spaces with underscores first, then remove any characters not in the allowed set
    safe_base = base.replace(' ', '_')
    # Keep only allowed characters
    cleaned_name = re.sub(r'[^a-zA-Z0-9.,:;_-]', '', safe_base)
    # Ensure it's not empty after sanitization
    if not cleaned_name:
        return 'sanitized_aoi' # Provide a default name if sanitization results in empty string
    return cleaned_name

# Cell 4 body: prompt for an AOI file, convert it to an Earth Engine object and
# publish the result through the AOI_GEOMETRY / AOI_NAME globals.
print("<h3>Step 1 · Upload your AOI</h3>")
print("Please upload your KML, GeoJSON, or zipped Shapefile using the file picker below.")

# Colab file picker; the result is indexed below by file name to get the file content.
uploaded_files = google.colab.files.upload()

# NOTE: `global` at module (cell) level is a no-op; the names are already module globals.
global AOI_GEOMETRY, AOI_NAME
# Reset first so a failed or cancelled upload never leaves a stale AOI behind.
AOI_GEOMETRY = None
AOI_NAME = None

if not uploaded_files:
    print("No file uploaded.")
else:
    # Only the first uploaded file is used; any additional files are ignored.
    file_name = list(uploaded_files.keys())[0]
    file_content = uploaded_files[file_name]
    # The lower-cased extension selects the converter below.
    suffix = os.path.splitext(file_name)[1].lower()

    print(f"Processing AOI file: {file_name}")

    temp_path = None
    try:
        # The geemap converters take a file path, so persist the upload to a temp
        # file (delete=False keeps it on disk after the `with` block closes it).
        with tempfile.NamedTemporaryFile(delete=False, suffix=suffix) as tmp:
            tmp.write(file_content)
            temp_path = tmp.name

        if suffix == '.kml':
            try:
                ee_object = geemap.kml_to_ee(temp_path)
            except Exception as kml_err:
                print(f"❌ Error processing KML file: {kml_err}")
                import traceback
                traceback.print_exc()
                raise # Re-raise the exception to be caught by the outer block
        elif suffix == '.geojson':
            ee_object = geemap.geojson_to_ee(temp_path)
        elif suffix == '.zip':
            # The zip archive path is handed to shp_to_ee as-is (no manual extraction here).
            ee_object = geemap.shp_to_ee(temp_path)
        else:
            print('Unsupported file type. Please upload KML, GeoJSON, or zipped Shapefile.')
            raise ValueError('Unsupported file type') # Raise to skip setting AOI variables


        # Reduce the converted object to one geometry in EPSG:4326
        # (second argument 1 is the maxError passed to transform()).
        AOI_GEOMETRY = ee_object.geometry().transform('EPSG:4326', 1)
        # Sanitize the name for EE export description compatibility
        AOI_NAME = _sanitize_name(file_name)
        print(f"✅ AOI loaded successfully as: {AOI_NAME}")
        print('Run the next cell to preview the footprint on a map.')

    except Exception as err:
        # Any failure leaves the notebook in the "no AOI" state.
        AOI_GEOMETRY = None
        AOI_NAME = None
        print(f"❌ Failed to parse AOI: {err}")
        # The traceback is already printed for KML errors, avoid double printing for other errors
        if suffix != '.kml':
             import traceback
             traceback.print_exc()

    finally:
        # Always remove the temp copy of the upload, on success or failure.
        if temp_path and os.path.exists(temp_path):
            os.remove(temp_path)

In [None]:

# --- Cell 5: AOI map preview (rerun after uploading) ---
# Check AOI status first
if check_aoi_status():
    # Outline the AOI in red on a satellite basemap and centre the view on it.
    preview_map = geemap.Map(basemap='SATELLITE')
    preview_map.addLayer(
        ee.FeatureCollection(AOI_GEOMETRY),
        {'color': 'red'},
        AOI_NAME or 'AOI',
    )
    preview_map.centerObject(AOI_GEOMETRY, zoom=11)
    preview_map.addLayerControl()
    display(preview_map)
else:
    print('Upload an AOI in the previous cell before previewing the map.')


In [None]:
# --- Cell 6: Sentinel-2 helpers (cloud mask, indices, collection builder) ---
def mask_s2_sr(image: ee.Image) -> ee.Image:
    """Mask out pixels whose QA60 cloud bit (10) or cirrus bit (11) is set.

    Args:
        image: Image that carries a ``QA60`` band.

    Returns:
        The input image with its mask updated so only pixels with both
        bits cleared stay valid.
    """
    cloud_bit, cirrus_bit = 1 << 10, 1 << 11
    qa_band = image.select('QA60')
    cloud_free = qa_band.bitwiseAnd(cloud_bit).eq(0)
    cirrus_free = qa_band.bitwiseAnd(cirrus_bit).eq(0)
    return image.updateMask(cloud_free.And(cirrus_free))


def add_indices(image: ee.Image) -> ee.Image:
    """Append ``NDVI`` and ``NDMI`` bands to a Sentinel-2 image.

    NDVI is the normalized difference of B8 and B4. NDMI is the normalized
    difference of B8 and B11, where B11 is first bilinearly resampled and
    reprojected onto the B8 projection at a 10 m scale.

    Args:
        image: Image with bands B3, B4, B8 and B11.

    Returns:
        The original image with the two index bands added.
    """
    reflectance = image.select(['B3', 'B4', 'B8', 'B11']).multiply(0.0001)

    # Bring the SWIR band onto the NIR band's projection before differencing them.
    nir_projection = reflectance.select('B8').projection()
    swir_resampled = (
        reflectance.select('B11')
        .resample('bilinear')
        .reproject(crs=nir_projection, scale=10)
        .rename('B11_10m')
    )
    stacked = reflectance.addBands(swir_resampled, overwrite=True)

    index_bands = [
        stacked.normalizedDifference(['B8', 'B4']).rename('NDVI'),
        stacked.normalizedDifference(['B8', 'B11_10m']).rename('NDMI'),
    ]
    return image.addBands(index_bands)


def build_s2_collection(aoi_geom: ee.Geometry, start_date: str, end_date: str, max_cloud: float) -> ee.ImageCollection:
    """Build the cloud-filtered, masked and clipped Sentinel-2 SR collection.

    Args:
        aoi_geom: Area of interest; scenes are filtered to and clipped by it.
        start_date: First day of the range (``YYYY-MM-DD``), inclusive.
        end_date: Last day of the range (``YYYY-MM-DD``), inclusive.
        max_cloud: Upper bound for the scene-level ``CLOUDY_PIXEL_PERCENTAGE``.

    Returns:
        The collection with ``mask_s2_sr`` and ``add_indices`` applied, each
        image clipped to the AOI, sorted by ``system:time_start``.

    Raises:
        ValueError: If ``aoi_geom`` is None.
    """
    if aoi_geom is None:
        raise ValueError('AOI is missing. Upload an AOI before requesting imagery.')

    # filterDate() excludes its end bound. Advance the end by one day so scenes
    # acquired on end_date are kept and a single-day range (start == end) works.
    end_exclusive = ee.Date(end_date).advance(1, 'day')

    collection = (
        ee.ImageCollection('COPERNICUS/S2_SR_HARMONIZED')
        .filterBounds(aoi_geom)
        .filterDate(start_date, end_exclusive)
        .filter(ee.Filter.lte('CLOUDY_PIXEL_PERCENTAGE', max_cloud))
        .map(mask_s2_sr)
        .map(add_indices)
        .map(lambda img: img.clip(aoi_geom))
        .sort('system:time_start')
    )
    return collection


def summarize_collection(collection: ee.ImageCollection) -> list:
    """Fetch per-scene metadata for every image in *collection*.

    Args:
        collection: Image collection to describe (evaluated with ``getInfo``).

    Returns:
        A list of dicts with keys ``date`` (UTC day formatted with DATE_FMT),
        ``timestamp`` (``system:time_start`` in milliseconds),
        ``cloud_percent``, ``image_id`` and ``system_index``; an empty list for
        an empty collection. Images without ``system:time_start`` are skipped.
    """
    # Local import: the notebook's import cell only brings in `datetime`.
    from datetime import timezone

    size = collection.size().getInfo()
    if size == 0:
        return []

    summary = []
    for item in collection.toList(size).getInfo():
        props = item.get('properties', {})
        timestamp = props.get('system:time_start')
        if timestamp is None:
            # No acquisition time means no date label; skip instead of raising TypeError.
            continue
        # Timezone-aware replacement for the deprecated datetime.utcfromtimestamp().
        acquired = datetime.fromtimestamp(timestamp / 1000, tz=timezone.utc)
        summary.append({
            'date': acquired.strftime(DATE_FMT),
            'timestamp': timestamp,
            'cloud_percent': props.get('CLOUDY_PIXEL_PERCENTAGE', None),
            'image_id': item.get('id'),
            'system_index': props.get('system:index'),
        })
    return summary


# Confirmation that the Cell 6 helper definitions above have been executed.
print('Helper functions ready. Proceed to the next cell to configure filters.')

In [None]:

# --- Cell 7: Date filtering UI (list cloud-filtered scenes) ---
# Check AOI status first
# Warn early when no AOI is loaded; the widgets are still built so the cell can be reused.
if not check_aoi_status():
    print('Upload an AOI first so we can query imagery.')

# Seed the pickers with datetime.date values, not datetime.datetime: a picker
# returns a date once the user picks a day, and comparing a datetime default
# with a picked date raises TypeError.
start_date_widget = widgets.DatePicker(description='Start date', value=datetime(2025, 6, 1).date())
end_date_widget = widgets.DatePicker(description='End date', value=datetime(2025, 9, 1).date())
max_cloud_slider = widgets.FloatSlider(value=20.0, min=0.0, max=100.0, step=1.0, description='Max cloud %')

fetch_button = widgets.Button(description='List cloud-filtered scenes', button_style='info', icon='search')
select_all_button = widgets.Button(description='Select all dates', icon='check')
clear_selection_button = widgets.Button(description='Clear selection', icon='times')

# Separate outputs: one for status/progress messages, one for the scene table.
collection_status_output = widgets.Output()
collection_table_output = widgets.Output()

# Filled by the fetch handler; the export cell reads its selection.
date_selector = widgets.SelectMultiple(
    options=[],
    description='Available dates',
    rows=10,
    layout=widgets.Layout(width='100%')
)


def on_fetch_clicked(_):
    """Query Sentinel-2 scenes for the chosen range and fill the date selector.

    Reads the date pickers and cloud slider, builds the processed collection
    and stores the result in PROCESSED_COLLECTION and AVAILABLE_IMAGES. Status
    messages go to ``collection_status_output`` and the scene table to
    ``collection_table_output``.

    AVAILABLE_IMAGES keys are unique. The first scene of a day is keyed by its
    date (``YYYY-MM-DD``); further scenes on the same day (e.g. overlapping
    tiles) get ``_2``, ``_3`` … appended so none of them is silently dropped.
    """
    global PROCESSED_COLLECTION, AVAILABLE_IMAGES
    with collection_status_output:
        collection_status_output.clear_output()

        if not validate_aoi_required("Scene fetching"):
            return

        start_value = start_date_widget.value
        end_value = end_date_widget.value
        if start_value is None or end_value is None:
            print('Select both a start date and an end date.')
            return

        start_str = start_value.strftime(DATE_FMT)
        end_str = end_value.strftime(DATE_FMT)
        # Compare the ISO strings rather than the raw values: YYYY-MM-DD sorts
        # chronologically, and a picker may hold a datetime while the other holds
        # a date, which Python refuses to compare directly.
        if start_str > end_str:
            print('Start date must be earlier than or equal to end date.')
            return

        max_cloud = max_cloud_slider.value

        print(f'Querying Sentinel-2 SR Harmonized from {start_str} to {end_str} (<= {max_cloud}% cloud)...')

        try:
            collection = build_s2_collection(AOI_GEOMETRY, start_str, end_str, max_cloud)
            summary = summarize_collection(collection)
            PROCESSED_COLLECTION = collection if summary else None

            # Key every scene uniquely so same-day scenes do not overwrite each other.
            scenes = OrderedDict()
            for item in summary:
                key = item['date']
                duplicate_no = 2
                while key in scenes:
                    key = f"{item['date']}_{duplicate_no}"
                    duplicate_no += 1
                scenes[key] = item
            AVAILABLE_IMAGES = scenes

            if not summary:
                date_selector.options = []
                with collection_table_output:
                    collection_table_output.clear_output()
                print('No cloud-filtered scenes found for the chosen parameters.')
                return

            option_pairs = []
            for key, item in scenes.items():
                cloud = item['cloud_percent']
                cloud_label = f"cloud {cloud:.1f}%" if cloud is not None else "cloud n/a"
                option_pairs.append((f"{key} | {cloud_label}", key))
            date_selector.options = option_pairs

            with collection_table_output:
                collection_table_output.clear_output()
                df = pd.DataFrame(summary)
                df = df[['date', 'cloud_percent', 'image_id']]
                df = df.rename(columns={'cloud_percent': 'cloud_%', 'image_id': 'ee_image_id'})
                display(df)

            print(f"Found {len(summary)} scene(s). Select the dates you want to export below.")
        except Exception as err:
            # Reset all shared state so later cells never see a half-built result.
            PROCESSED_COLLECTION = None
            AVAILABLE_IMAGES = OrderedDict()
            date_selector.options = []
            with collection_table_output:
                collection_table_output.clear_output()
            print(f'Error while fetching scenes: {err}')


def on_select_all(_):
    """Select every entry currently offered by the date selector.

    Does nothing when the selector has no options.
    """
    available = date_selector.options
    if available:
        # Each option is a (label, value) pair; the selection holds the values.
        date_selector.value = tuple(pair[1] for pair in available)


def on_clear_selection(_):
    """Deselect every date in the date selector (button click handler)."""
    date_selector.value = ()


# Connect each button to its click handler.
fetch_button.on_click(on_fetch_clicked)
select_all_button.on_click(on_select_all)
clear_selection_button.on_click(on_clear_selection)

# Stack the Step 2 controls top to bottom: inputs, action buttons, outputs, date list.
filter_controls = widgets.VBox([
    widgets.HTML('<h3>Step 2  Choose date range & cloud threshold</h3>'),
    widgets.HBox([start_date_widget, end_date_widget]),
    max_cloud_slider,
    fetch_button,
    widgets.HBox([select_all_button, clear_selection_button]),
    collection_status_output,
    collection_table_output,
    widgets.HTML('<strong>Highlighted dates:</strong>'),
    date_selector
])

display(filter_controls)


In [None]:
# --- Cell 8: Export UI (NDVI / NDMI / RGB per-date + optional means) ---
# Per-date product toggles (one raster per selected date and enabled product).
export_ndvi_toggle = widgets.Checkbox(value=True, description='Export NDVI rasters', indent=False)
export_ndmi_toggle = widgets.Checkbox(value=False, description='Export NDMI rasters', indent=False)
export_rgb_toggle = widgets.Checkbox(value=False, description='Export RGB rasters', indent=False)
# Range-wide mean toggles (one raster for the whole listed collection).
mean_ndvi_toggle = widgets.Checkbox(value=False, description='Export mean NDVI for range', indent=False)
mean_ndmi_toggle = widgets.Checkbox(value=False, description='Export mean NDMI for range', indent=False)

# Export resolution in metres (10-60 in steps of 10) and destination Drive folder.
scale_widget = widgets.IntSlider(value=10, min=10, max=60, step=10, description='Export scale (m)')
folder_widget = widgets.Text(value='earth_engine_exports', description='Drive folder')

export_button = widgets.Button(description='Start exports', button_style='success', icon='download')
# Receives all messages printed by the export handler.
export_status_output = widgets.Output()


def _start_export_task(image: ee.Image, description: str, folder: str, scale: int) -> ee.batch.Task:
    """Start a Google Drive GeoTIFF export of *image* over the AOI.

    Args:
        image: Image to export.
        description: Task description, also used as the output file name prefix.
        folder: Destination Google Drive folder.
        scale: Export scale passed to Earth Engine.

    Returns:
        The started task, which is also recorded in EXPORT_TASKS under ``description``.
    """
    export_kwargs = {
        'image': image,
        'description': description,
        'folder': folder,
        'fileNamePrefix': description,
        'region': AOI_GEOMETRY,
        'scale': scale,
        'crs': 'EPSG:4326',
        'fileFormat': 'GeoTIFF',
    }
    drive_task = ee.batch.Export.image.toDrive(**export_kwargs)
    drive_task.start()
    # Keep a handle so the task monitor cell can poll the status later.
    EXPORT_TASKS.append({'label': description, 'task': drive_task})
    return drive_task


def on_export_clicked(_):
    """Launch Drive exports for the selected dates/products and optional means.

    Reads the export toggles, scale, folder and date selection, then starts one
    Earth Engine export task per (date, product) pair plus one per enabled mean
    product. All messages, including validation failures, are written to
    ``export_status_output``. Started tasks are recorded in EXPORT_TASKS by
    ``_start_export_task``.
    """
    with export_status_output:
        export_status_output.clear_output()

        # Validate inside the output context so the message shows up next to the button.
        if not validate_aoi_required("Export"):
            return

        if PROCESSED_COLLECTION is None or not AVAILABLE_IMAGES:
            print('Run the previous cell to list available scenes before exporting.')
            return

        selected_dates = list(date_selector.value)

        # (label, enabled, band selection) for every per-date product.
        product_specs = [
            ('NDVI', export_ndvi_toggle.value, 'NDVI'),
            ('NDMI', export_ndmi_toggle.value, 'NDMI'),
            ('RGB', export_rgb_toggle.value, ['B4', 'B3', 'B2']),
        ]
        active_products = [(label, bands) for label, enabled, bands in product_specs if enabled]

        # Index bands whose mean over the whole listed range was requested.
        mean_bands = [
            band for band, enabled in [
                ('NDVI', mean_ndvi_toggle.value),
                ('NDMI', mean_ndmi_toggle.value),
            ] if enabled
        ]

        if not selected_dates and not mean_bands:
            print('Select at least one date or enable a mean export option.')
            return

        if not active_products and not mean_bands:
            print('Enable at least one product toggle (NDVI, NDMI, or RGB).')
            return

        folder_name = folder_widget.value.strip() or 'earth_engine_exports'
        scale_value = int(scale_widget.value)
        aoi_label = AOI_NAME or 'AOI'
        start_str = start_date_widget.value.strftime(DATE_FMT) if start_date_widget.value else 'start'
        end_str = end_date_widget.value.strftime(DATE_FMT) if end_date_widget.value else 'end'

        tasks_started = []

        print(f'Export destination: Google Drive folder "{folder_name}" at {scale_value} m resolution.')

        for band in mean_bands:
            mean_image = PROCESSED_COLLECTION.select(band).mean().clip(AOI_GEOMETRY)
            desc = f'{aoi_label}_{band}_MEAN_{start_str}_{end_str}'
            task = _start_export_task(mean_image, desc, folder_name, scale_value)
            tasks_started.append((desc, task.id))
            print(f'Started mean {band} export: {desc}')

        for date_key in selected_dates:
            meta = AVAILABLE_IMAGES.get(date_key)
            if meta is None:
                print(f'Skipping {date_key}: metadata not found.')
                continue

            # Prefer system:index, which identifies one image within the collection;
            # fall back to the acquisition time only when no index was recorded.
            scene_index = meta.get('system_index')
            if scene_index is not None:
                scene_filter = ee.Filter.eq('system:index', scene_index)
            else:
                scene_filter = ee.Filter.eq('system:time_start', meta['timestamp'])
            # first() returns a server-side object, never Python None, so there is
            # nothing to test for here; a missing image surfaces as a task failure.
            image = PROCESSED_COLLECTION.filter(scene_filter).first()

            base_name = f"{aoi_label}_{date_key}".replace(':', '-')

            for label, bands in active_products:
                desc = f'{base_name}_{label}'
                task = _start_export_task(image.select(bands), desc, folder_name, scale_value)
                tasks_started.append((desc, task.id))
                print(f'Started {label} export for {date_key}')

        if tasks_started:
            print('Export tasks launched:')
            for name, task_id in tasks_started:
                print(f'- {name}  task {task_id}')
        else:
            print('No export tasks were started. Check selections and try again.')


def on_folder_change(change):
    """Restore the default Drive folder name when the text box becomes empty.

    Args:
        change: Change notification dict; only its ``'new'`` entry is read.
    """
    field_was_emptied = change['new'] == ''
    if not field_was_emptied:
        return
    folder_widget.value = 'earth_engine_exports'


# React to edits of the folder text box and to clicks on the export button.
folder_widget.observe(on_folder_change, names='value')
export_button.on_click(on_export_clicked)

# Stack the Step 3 controls: per-date toggles, mean toggles, settings, button, messages.
export_controls = widgets.VBox([
    widgets.HTML('<h3>Step 3 · Choose products & launch exports</h3>'),
    widgets.HBox([export_ndvi_toggle, export_ndmi_toggle, export_rgb_toggle]),
    widgets.HBox([mean_ndvi_toggle, mean_ndmi_toggle]),
    scale_widget,
    folder_widget,
    export_button,
    export_status_output
])

display(export_controls)

In [None]:

# --- Cell 9: Export task monitor (optional) ---
# Button that re-polls the tracked export tasks, and the output area for their statuses.
refresh_button = widgets.Button(description='Refresh task status', icon='refresh')
task_status_output = widgets.Output()


def refresh_tasks(_=None):
    """Print the current state of every tracked export task.

    Output goes to ``task_status_output``, which is cleared first. The
    argument is ignored, so the function also works as a button click handler.
    """
    with task_status_output:
        task_status_output.clear_output()
        if EXPORT_TASKS:
            for entry in EXPORT_TASKS:
                # One status request per tracked task.
                info = entry['task'].status()
                state = info.get('state')
                detail = info.get('description', 'no description')
                print(f"{entry['label']}: {state} ({detail})")
        else:
            print('No export tasks tracked yet. Launch exports in the previous cell first.')


# Statuses are only fetched on demand, when the refresh button is clicked.
refresh_button.on_click(refresh_tasks)

display(widgets.VBox([
    widgets.HTML('<h3>Step 4  Monitor export progress (optional)</h3>'),
    refresh_button,
    task_status_output
]))


In [None]:

# --- Cell 10: NDVI time-series quicklook (optional) ---
# Check AOI status first
# Chart only when an AOI is loaded and the scene-listing cell has produced a collection.
if validate_aoi_required("NDVI charting"):
    if PROCESSED_COLLECTION is None or not AVAILABLE_IMAGES:
        print('Run the scene listing cell first to build a collection before charting NDVI.')
    else:
        try:
            # NOTE(review): confirm that the installed geemap exposes
            # `geemap.chart.ImageSeries`; if it does not, the except branch below runs.
            chart = geemap.chart.ImageSeries(
                PROCESSED_COLLECTION.select('NDVI'),
                AOI_GEOMETRY,
                reducer=ee.Reducer.mean(),
                scale=30,
                x_property='system:time_start',
            )
            display(chart)
        except Exception as chart_err:
            # Charting is optional, so report the problem instead of failing the cell.
            print(f'Unable to build NDVI chart: {chart_err}')
            print('You can always export the rasters and chart NDVI locally if needed.')


In [None]:
# --- Cell 11: AOI Status Check (run anytime to check current state) ---
# Snapshot of the notebook's shared state; safe to run at any time.
print("=== Current AOI Status ===")
check_aoi_status()

if AOI_GEOMETRY is None:
    # Nothing has been loaded yet, so report fixed "empty" values.
    print("❌ No AOI loaded. Please upload an AOI file in the upload cell.")
    print("❌ Collection Status: Not available")
    print("❌ Available Images: 0 scenes")
    print("❌ Export Tasks: 0 tasks")
else:
    print("✅ AOI Geometry: Loaded")
    print(f"✅ AOI Name: {AOI_NAME}")
    print(f"✅ Collection Status: {'Not loaded' if PROCESSED_COLLECTION is None else 'Loaded'}")
    print(f"✅ Available Images: {len(AVAILABLE_IMAGES)} scenes")
    print(f"✅ Export Tasks: {len(EXPORT_TASKS)} tasks")
