# Creating a Median Composite with Dask

We will query a STAC catalog for Sentinel-2 imagery and create a cloud-free median composite for a single month, using Dask to distribute the processing across a local cluster.

## Setup and Data Download

The following blocks of code install the required packages (when running on Colab), start a local Dask cluster, and download the AOI dataset to the `data` folder.

In [12]:
%%capture
if 'google.colab' in str(get_ipython()):
    !pip install pystac-client
    !apt install libspatialindex-dev
    !pip install fiona shapely pyproj rtree
    !pip install geopandas folium stackstac rioxarray mapclassify

In [13]:
import json
import geopandas as gpd
from shapely.geometry import mapping
import pandas as pd
import pystac_client
import os
import folium
from folium import Figure
import stackstac
import rioxarray
import matplotlib.pyplot as plt
import mapclassify
import dask

In [14]:
from dask.distributed import Client, progress
client = Client()  # set up local cluster on the machine
client

Client
Connection method: Cluster object
Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:55183/status
Status: running
Using processes: True
Workers: 4
Total threads: 8 (2 per worker)
Total memory: 15.79 GiB (3.95 GiB per worker)

In [15]:
data_folder = 'data'
output_folder = 'output'

# Create the folders if they do not exist yet
os.makedirs(data_folder, exist_ok=True)
os.makedirs(output_folder, exist_ok=True)

In [16]:
def download(url):
    filename = os.path.join(data_folder, os.path.basename(url))
    if not os.path.exists(filename):
        from urllib.request import urlretrieve
        local, _ = urlretrieve(url, filename)
        print('Downloaded ' + local)

download('https://github.com/spatialthoughts/python-tutorials/raw/main/data/' +
         'bangalore.geojson')

## Procedure

Let's use the Element 84 Earth Search endpoint to look for items from the `sentinel-2-c1-l2a` collection hosted on AWS.

In [17]:
catalog = pystac_client.Client.open('https://earth-search.aws.element84.com/v1')

In [18]:
aoi_file = 'bangalore.geojson'
aoi_filepath = os.path.join(data_folder, aoi_file)
aoi = gpd.read_file(aoi_filepath)

In [19]:
geometry = aoi.unary_union
geometry_geojson = json.dumps(mapping(geometry))

We search for imagery collected within the chosen month that intersects the AOI geometry. Additionally, we add filters to select only scenes with less than 30% cloud cover and from a specific MGRS grid square (`GQ`).

In [20]:
year = 2023
month = 5
time_range = f'{year}-{month:02}'

search = catalog.search(
    collections=['sentinel-2-c1-l2a'],
    intersects=geometry_geojson,
    datetime=time_range,
    query={'eo:cloud_cover': {'lt': 30},  'mgrs:grid_square': {'eq': 'GQ'}},
)
items = search.item_collection()
len(items)

3
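
The search arguments above can be collected in a small helper so that the same filters are reused for other months. This is only a sketch: `build_search_kwargs` is a hypothetical name, not part of `pystac_client`, and the point geometry below is a placeholder for illustration. The helper just assembles the keyword arguments already used in the search cell, with the `YYYY-MM` datetime string covering the whole calendar month.

```python
def build_search_kwargs(year, month, geometry_geojson,
                        max_cloud=30, grid_square='GQ'):
    """Assemble keyword arguments for catalog.search() (hypothetical helper)."""
    return {
        'collections': ['sentinel-2-c1-l2a'],
        'intersects': geometry_geojson,
        # A 'YYYY-MM' string selects the whole calendar month
        'datetime': f'{year}-{month:02}',
        'query': {
            'eo:cloud_cover': {'lt': max_cloud},
            'mgrs:grid_square': {'eq': grid_square},
        },
    }

# Placeholder geometry for illustration only; the tutorial passes the AOI GeoJSON.
example_geometry = '{"type": "Point", "coordinates": [77.59, 12.97]}'
kwargs = build_search_kwargs(2023, 5, example_geometry)
print(kwargs['datetime'])  # 2023-05
```

With the catalog opened as above, the result could then be passed on as `catalog.search(**kwargs)`.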

In [21]:
stack = stackstac.stack(items, resolution=10)
stack

| | Array | Chunk |
|---|---|---|
| Bytes | 53.10 GiB | 8.00 MiB |
| Shape | (3, 19, 11179, 11184) | (1, 1, 1024, 1024) |
| Dask graph | 6897 chunks in 3 graph layers | |
| Data type | float64 numpy.ndarray | |
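
The figures in the array summary above can be checked with a little arithmetic, which also shows why the stack is kept lazy rather than loaded into memory. The sketch below assumes the dimension order is (time, band, y, x) and uses only the shape, chunk shape, and data type reported in the output.

```python
import math

shape = (3, 19, 11179, 11184)   # (time, band, y, x) from the summary above
chunks = (1, 1, 1024, 1024)     # chunk shape from the summary above
itemsize = 8                    # bytes per float64 value

total_bytes = math.prod(shape) * itemsize
chunk_bytes = math.prod(chunks) * itemsize
# Number of chunks along each dimension, rounded up, then multiplied together
n_chunks = math.prod(math.ceil(s / c) for s, c in zip(shape, chunks))

print(f'{total_bytes / 2**30:.2f} GiB')  # 53.10 GiB
print(f'{chunk_bytes / 2**20:.2f} MiB')  # 8.00 MiB
print(n_chunks)                          # 6897
```

At roughly 53 GiB, the full stack is larger than the 15.79 GiB available to the local cluster, so clipping to the AOI and selecting bands before computing keeps the workload manageable.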


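Clip the stack to the AOI and select the red, green, and blue bands along with the scene classification (`scl`) band. The `scl` band is then used to mask out cloudy and invalid pixels before computing the median.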

In [29]:
geometry = aoi.to_crs(stack.rio.crs).geometry
clipped = stack.rio.clip(geometry)
subset = clipped.sel(band=['red', 'green', 'blue', 'scl'])

In [30]:
scl = subset.sel(band='scl')

# Keep SCL classes 4-7 and 11; all other classes are masked out
valid = ((scl >= 4) & (scl <= 7)) | (scl == 11)

subset_masked = subset.where(valid)
rgb_masked = subset_masked.sel(band=['red', 'green', 'blue'])
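
To make the mask above easier to read, the sketch below lists the Sentinel-2 L2A scene classification values and applies the same rule to plain integers. The class labels are abbreviated from the Sentinel-2 documentation, and `is_valid` is a hypothetical helper that mirrors the `valid` expression rather than anything used by the tutorial.

```python
# Sentinel-2 L2A Scene Classification (SCL) values with abbreviated labels
SCL_CLASSES = {
    0: 'No data',
    1: 'Saturated or defective',
    2: 'Dark area or topographic shadow',
    3: 'Cloud shadow',
    4: 'Vegetation',
    5: 'Not vegetated',
    6: 'Water',
    7: 'Unclassified',
    8: 'Cloud, medium probability',
    9: 'Cloud, high probability',
    10: 'Thin cirrus',
    11: 'Snow or ice',
}

def is_valid(scl_value):
    """Same rule as the mask above: keep classes 4-7 and 11."""
    return 4 <= scl_value <= 7 or scl_value == 11

kept = [name for value, name in SCL_CLASSES.items() if is_valid(value)]
print(kept)
```

Pixels in any other class (no data, shadows, clouds, cirrus) become NaN after `subset.where(valid)`.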

In [31]:
median = rgb_masked.median(dim='time')
median

| | Array | Chunk |
|---|---|---|
| Bytes | 275.32 MiB | 8.00 MiB |
| Shape | (3, 3427, 3510) | (1, 1024, 1024) |
| Dask graph | 48 chunks in 19 graph layers | |
| Data type | float64 numpy.ndarray | |
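
The median over `time` is what turns the masked scenes into a composite: for each pixel, masked (NaN) observations are skipped and the middle value of the remaining ones is kept. The pure-Python sketch below illustrates that idea for a single pixel. It assumes the NaN-skipping behaviour that xarray applies by default to floating-point data; `masked_median` is a hypothetical helper and the reflectance values are made up.

```python
import math
from statistics import median

def masked_median(values):
    """Median of one pixel's time series, ignoring masked (NaN) observations."""
    valid = [v for v in values if not math.isnan(v)]
    # A pixel that is masked in every scene stays NaN in the composite
    return median(valid) if valid else math.nan

nan = math.nan
print(masked_median([1200.0, nan, 1000.0]))  # 1100.0 (two clear observations)
print(masked_median([nan, 900.0, nan]))      # 900.0 (one clear observation)
print(masked_median([nan, nan, nan]))        # nan (no clear observation)
```

With only 3 scenes in the month, some pixels may have a single clear observation or none at all, so gaps in the output are possible.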


In [32]:
%time median = median.compute()


CPU times: total: 36.3 s
Wall time: 6min 15s


In [33]:
output_file = f'median_{year}_{month:02}.tif'
output_path = os.path.join(output_folder, output_file)
median.rio.to_raster(output_path, driver='COG')
print(f'Wrote {output_file}')

Wrote median_2023_05.tif
