# Creating a Median Composite with Dask

## Introduction

When working with optical satellite imagery, one of the first pre-processing step is to apply a cloud-mask on individual satellite scenes and create a cloud-free composite image for the chosen time period. Doing this in the traditional way requires you to download large satellite scenes, read them into memory and iteratively process them. This is not very efficient and does not leverage all the resources your computer has to offer. We can instead leverage modern cloud-native data formats, such a Cloud-Optimized GeoTIFFs (COGs) that can efficiently stream only the required pixels, process the time-series of scenes using XArray that uses vectorized operations to efficiently process the whole stack, and use Dask to run the computation in parallel across all the CPU cores. This modern approach makes data processing tasks easy, efficient and fun.

## Overview of the Task

We will query a STAC catalog for Sentinel-2 imagery over the city of Bengaluru, India, apply a cloud-mask and create a median composite image using distributed processing on a local machine.


**Input Layers**:

* `bangalore.geojson`: A GeoJSON file representing the municipal boundary for the city of Bengaluru, India.

**Output Layers**:
*   `composite.tif` : A Cloud-optimized GeoTIFF (COG) of Sentinel-2 composite image for January 2025.

**Data Credit**:
* Bangalore Ward Maps Provided by Spatial Data of Municipalities (Maps) Project by Data{Meet}.
* Sentinel-2 Level 2A Scenes: Contains modified Copernicus Sentinel data (2025-02)

## Setup and Data Download

The following blocks of code will install the required packages and download the datasets to your Colab environment.

In [12]:
%%capture
if 'google.colab' in str(get_ipython()):
    !pip install pystac-client odc-stac rioxarray dask

In [13]:
from odc.stac import stac_load
import geopandas as gpd
import matplotlib.pyplot as plt
import os
import pystac_client
import rioxarray as rxr
import xarray as xr

In [15]:
data_folder = 'data'
output_folder = 'output'

if not os.path.exists(data_folder):
    os.mkdir(data_folder)
if not os.path.exists(output_folder):
    os.mkdir(output_folder)

In [16]:
def download(url):
    filename = os.path.join(data_folder, os.path.basename(url))
    if not os.path.exists(filename):
        from urllib.request import urlretrieve
        local, _ = urlretrieve(url, filename)
        print('Downloaded ' + local)

data_url = 'https://github.com/spatialthoughts/geopython-tutorials/releases/download/data/'
filename = 'bangalore.geojson'

download(data_url + filename)

Setup a local Dask cluster. This distributes the computation across multiple workers on your computer.

> Note: If you are running this notebook in Colab, you will not be able to access the Daskboard link.

In [None]:
from dask.distributed import Client, progress
client = Client()  # set up local cluster on the machine
client

## Procedure

Let's use Element84 search endpoint to look for items from the `sentinel-2-c1-l2a` collection on AWS

In [17]:
catalog = pystac_client.Client.open('https://earth-search.aws.element84.com/v1')

In [18]:
aoi_file = 'bangalore.geojson'
aoi_filepath = os.path.join(data_folder, aoi_file)
aoi = gpd.read_file(aoi_filepath)

In [19]:
geometry = aoi.unary_union
geometry_geojson = json.dumps(mapping(geometry))

We search for the imagery collected within the date range and intersecting the AOI geometry. Additionally we add filters to select imagery with less cloud cover and over a specific MGRS tile.

In [20]:
year = 2023
month = 5
time_range = f'{year}-{month:02}'

search = catalog.search(
    collections=['sentinel-2-c1-l2a'],
    intersects=geometry_geojson,
    datetime=time_range,
    query={'eo:cloud_cover': {'lt': 30},  'mgrs:grid_square': {'eq': 'GQ'}},
)
items = search.item_collection()
len(items)

3

In [21]:
stack = stackstac.stack(items, resolution=10)
stack

  times = pd.to_datetime(


Unnamed: 0,Array,Chunk
Bytes,53.10 GiB,8.00 MiB
Shape,"(3, 19, 11179, 11184)","(1, 1, 1024, 1024)"
Dask graph,6897 chunks in 3 graph layers,6897 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 53.10 GiB 8.00 MiB Shape (3, 19, 11179, 11184) (1, 1, 1024, 1024) Dask graph 6897 chunks in 3 graph layers Data type float64 numpy.ndarray",3  1  11184  11179  19,

Unnamed: 0,Array,Chunk
Bytes,53.10 GiB,8.00 MiB
Shape,"(3, 19, 11179, 11184)","(1, 1, 1024, 1024)"
Dask graph,6897 chunks in 3 graph layers,6897 chunks in 3 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


Clip and select the subset of bands.

In [29]:
geometry = aoi.to_crs(stack.rio.crs).geometry
clipped = stack.rio.clip(geometry)
subset = clipped.sel(band=['red', 'green', 'blue', 'scl'])

In [30]:
scl = subset.sel(band='scl')

valid = ((scl >= 4) & (scl <= 7) | (scl==11))
         
subset_masked = subset.where(valid)
rgb_masked = subset_masked.sel(band=['red', 'green', 'blue'])

In [31]:
median = rgb_masked.median(dim='time')
median

Unnamed: 0,Array,Chunk
Bytes,275.32 MiB,8.00 MiB
Shape,"(3, 3427, 3510)","(1, 1024, 1024)"
Dask graph,48 chunks in 19 graph layers,48 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray
"Array Chunk Bytes 275.32 MiB 8.00 MiB Shape (3, 3427, 3510) (1, 1024, 1024) Dask graph 48 chunks in 19 graph layers Data type float64 numpy.ndarray",3510  3427  3,

Unnamed: 0,Array,Chunk
Bytes,275.32 MiB,8.00 MiB
Shape,"(3, 3427, 3510)","(1, 1024, 1024)"
Dask graph,48 chunks in 19 graph layers,48 chunks in 19 graph layers
Data type,float64 numpy.ndarray,float64 numpy.ndarray


In [32]:
%time median = median.compute()

This may cause some slowdown.
Consider scattering data ahead of time and using futures.


CPU times: total: 36.3 s
Wall time: 6min 15s


In [33]:
output_file = f'median_{year}_{month:02}.tif'
output_path = os.path.join(output_folder, output_file)
median.rio.to_raster(output_path, driver='COG')
print(f'Wrote {output_file}')

Wrote median_2023_05.tif
