# Sentinel 2 on AWS

Python libraries: [satsearch](https://github.com/sat-utils/sat-search), [intake-stac](https://github.com/intake/intake-stac), [geopandas](https://github.com/geopandas/geopandas), [xarray](https://github.com/pydata/xarray),  [dask](https://github.com/dask/dask), [holoviz](https://holoviz.org)

[Cloud Native Geoprocessing of Earth Observation Satellite Data with Pangeo](https://medium.com/pangeo/cloud-native-geoprocessing-of-earth-observation-satellite-data-with-pangeo-997692d91ca2)

## Finding data on the Cloud 

The SpatioTemporal Asset Catalog (STAC): metadata catalog standard

https://www.element84.com/earth-search/ allows us to programmatically and visually search for data on AWS! Here we will use the [satsearch Python library](https://github.com/sat-utils/sat-search) for searching Sentinel2 on AWS

In [28]:
# Suppress library deprecation warnings
import warnings
warnings.filterwarnings('ignore')

In [29]:
import satsearch
print(satsearch.__version__)
print(satsearch.config.API_URL)
from satsearch import Search

0.2.3
https://earth-search-legacy.aws.element84.com


In [18]:
search = Search(bbox=[30.5, 1, 32, 2.5],
               datetime='2019-04-12/2019-04-13',
               query=["eo:cloud_cover<25"],
               collection='sentinel-2-l1c'
               )
print('%s items' % search.found())

items = search.items()
print('%s items' % len(items))
print('%s collections' % len(items._collections))
print(items._collections)

items.save('subset.geojson')
# for item in items:
#     print(item)

6 items
6 items
1 collections
[sentinel-2-l1c]


In [30]:
# NOTE this STAC API endpoint does not currently search the entire catalog

bbox = (30.5, 1, 32, 2.5) # (min lon, min lat, max lon, max lat) 

timeRange = '2019-04-12/2019-04-30'

# STAC metadata properties
properties =  ['sentinel:latitude_band=N',
               'sentinel:grid_square=TG'] 

results = Search.search(collection='sentinel-2-l1c', 
                        bbox=bbox,
                        datetime=timeRange,
                        property=properties,
                        sort=['<datetime'],
                        query=["eo:cloud_cover<25"]
                        )

print('%s items' % results.found())
items = results.items()
print(items._collections)
items.save('my-s2-cogs.json')
type(items)

4 items
[sentinel-2-l1c]


satstac.itemcollection.ItemCollection

In [31]:
# Remember that it is easy to load geojson with geopandas!
import geopandas as gpd

gf = gpd.read_file('my-s2-cogs.json')
gf.head()

Unnamed: 0,id,collection,eo:gsd,eo:instrument,eo:off_nadir,eo:bands,datetime,eo:platform,eo:cloud_cover,sentinel:utm_zone,sentinel:latitude_band,sentinel:grid_square,sentinel:sequence,sentinel:product_id,geometry
0,S2B_36NTG_20190413_0,sentinel-2-l1c,10,MSI,0,"[ { ""name"": ""B01"", ""common_name"": ""coastal"", ""...",2019-04-13T08:39:57.837002+00:00,sentinel-2b,96.75,36,N,TG,0,S2B_MSIL1C_20190413T080609_N0207_R078_T36NTG_2...,"POLYGON ((30.30453 0.81553, 30.30347 1.80779, ..."
1,S2A_36NTG_20190418_0,sentinel-2-l1c,10,MSI,0,"[ { ""name"": ""B01"", ""common_name"": ""coastal"", ""...",2019-04-18T08:36:11.969000+00:00,sentinel-2a,3.4,36,N,TG,0,S2A_MSIL1C_20190418T080611_N0207_R078_T36NTG_2...,"POLYGON ((30.30453 0.81553, 30.30347 1.80779, ..."
2,S2B_36NTG_20190423_0,sentinel-2-l1c,10,MSI,0,"[ { ""name"": ""B01"", ""common_name"": ""coastal"", ""...",2019-04-23T08:35:02.388000+00:00,sentinel-2b,34.98,36,N,TG,0,S2B_MSIL1C_20190423T080619_N0207_R078_T36NTG_2...,"POLYGON ((30.30453 0.81553, 30.30347 1.80779, ..."
3,S2A_36NTG_20190428_0,sentinel-2-l1c,10,MSI,0,"[ { ""name"": ""B01"", ""common_name"": ""coastal"", ""...",2019-04-28T08:29:52.180000+00:00,sentinel-2a,17.28,36,N,TG,0,S2A_MSIL1C_20190428T080611_N0207_R078_T36NTG_2...,"POLYGON ((30.30453 0.81553, 30.30347 1.80779, ..."


In [32]:
# Tidy display of band information from the 'eo:bands column'
import ast
import pandas as pd
band_info = pd.DataFrame(ast.literal_eval(gf.iloc[0]['eo:bands']))
band_info

Unnamed: 0,name,common_name,gsd,center_wavelength,full_width_half_max
0,B01,coastal,60,0.4439,0.027
1,B02,blue,10,0.4966,0.098
2,B03,green,10,0.56,0.045
3,B04,red,10,0.6645,0.038
4,B05,,20,0.7039,0.019
5,B06,,20,0.7402,0.018
6,B07,,20,0.7825,0.028
7,B08,nir,10,0.8351,0.145
8,B8A,,20,0.8648,0.033
9,B09,,60,0.945,0.026


In [33]:
# Plot search AOI and frames on a map using Holoviz Libraries (more on these later)
import geoviews as gv
import hvplot.pandas

cols = gf.loc[:,('id','geometry')]

footprints = cols.hvplot(geo=True, line_color='k', alpha=0.1, title='Sentinel')
tiles = gv.tile_sources.CartoEco.options(width=700, height=500) 
labels = gv.tile_sources.StamenLabels.options(level='annotation')
tiles * footprints * labels

## ipywidgets

[ipywidgets](https://ipywidgets.readthedocs.io/en/latest/) provide another convenient approach to custom visualizations within JupyterLab. The function below allows us to browse through all the image thumbnails for a group of images (more specifically a specific Landsat8 path and row). 

In [9]:
from ipywidgets import interact
from IPython.display import display, Image

def browse_images(items):
    n = len(items)

    def view_image(i=0):
        item = items[i]
        print(f"id={item.id}\tdate={item.datetime}\tcloud%={item['eo:cloud_cover']}")
        display(Image(item.asset('thumbnail')['href']))
    
    interact(view_image, i=(0,n-1))

In [10]:
browse_images(items)

interactive(children=(IntSlider(value=0, description='i', max=3), Output()), _dom_classes=('widget-interact',)…

### Intake-STAC

the intake-stac library allows us to easily load these scenes described with STAC metadata into xarray DataArrays! NOTE this library is very new and will likely undergo changes in the near future. https://github.com/pangeo-data/intake-stac

In [34]:
# if you've installed intake-STAC, it will automatically import alongside intake
import intake
catalog = intake.open_stac_item_collection(items)

In [35]:
list(catalog)

['S2B_36NTG_20190413_0',
 'S2A_36NTG_20190418_0',
 'S2B_36NTG_20190423_0',
 'S2A_36NTG_20190428_0']

In [50]:
item = catalog['S2A_36NTG_20190428_0']
# list(item)
item.B02.raster

ValueError: No plugins loaded for this entry: image/jp2
A listing of installable plugins can be found at https://intake.readthedocs.io/en/latest/plugin-directory.html .

In [43]:
type(item['B02'])

ValueError: No plugins loaded for this entry: image/jp2
A listing of installable plugins can be found at https://intake.readthedocs.io/en/latest/plugin-directory.html .

In [39]:
# Let's work with the Geotiffs using Xarray
# NOTE that you don't have to specify the URL or filePath!

import xarray as xr

da = item.B04(chunks=dict(band=1, y=2048, x=2048)).to_dask()
da

ValueError: No plugins loaded for this entry: image/jp2
A listing of installable plugins can be found at https://intake.readthedocs.io/en/latest/plugin-directory.html .

## Dask Chunks and Cloud Optimized Geotiffs

Since we didn't specify chunk sizes, everything is read as one chunk. When we load larger sets of imagery
we can change these chunk sizes to use dask. It's best to align dask chunks with the way image chunks (typically called "tiles" are stored on disk or cloud storage buckets. The landsat data is stored on AWS S3 in a tiled Geotiff format where tiles are 512x512, so we should pick som multiple of that, and typically aim for chunksizes of ~100Mb (although this is subjective). You can read more about dask chunks here: https://docs.dask.org/en/latest/array-best-practices.html

Also check out this documentation about the Cloud-optimized Geotiff format, it is an excellent choice for putting satellite raster data on Cloud storage: https://www.cogeo.org/

In [13]:
# Intake-STAC Item to chunked dask array
da = item.B2(chunks=dict(band=1, x=2048, y=2048)).to_dask()
da.data

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(1, 7971, 7871)","(1, 2048, 2048)"
Count,17 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (1, 7971, 7871) (1, 2048, 2048) Count 17 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971  1,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(1, 7971, 7871)","(1, 2048, 2048)"
Count,17 Tasks,16 Chunks
Type,uint16,numpy.ndarray


In [14]:
# Let's load all the bands into an xarray dataset!
# Stick to bands that have the same Ground Sample Distance for simplicity
bands = band_info.query('gsd == 30').common_name.to_list()
bands

['coastal', 'blue', 'green', 'red', 'nir', 'swir16', 'swir22', 'cirrus']

In [15]:
stacked = item.stack_bands(bands)
da = stacked(chunks=dict(band=1, x=2048, y=2048)).to_dask()
da

Unnamed: 0,Array,Chunk
Bytes,1.00 GB,8.39 MB
Shape,"(8, 7971, 7871)","(1, 2048, 2048)"
Count,264 Tasks,128 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 1.00 GB 8.39 MB Shape (8, 7971, 7871) (1, 2048, 2048) Count 264 Tasks 128 Chunks Type uint16 numpy.ndarray",7871  7971  8,

Unnamed: 0,Array,Chunk
Bytes,1.00 GB,8.39 MB
Shape,"(8, 7971, 7871)","(1, 2048, 2048)"
Count,264 Tasks,128 Chunks
Type,uint16,numpy.ndarray


In [16]:
# If we want we can convert this to an xarray dataset, with variable names corresponding to common names
# This is all very fast because we are only reading metadata
da['band'] = bands
ds = da.to_dataset(dim='band')
print('Dataset size: [Gb]', ds.nbytes/1e9)
ds

Dataset size: [Gb] 1.003962592


Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray
"Array Chunk Bytes 125.48 MB 8.39 MB Shape (7971, 7871) (2048, 2048) Count 280 Tasks 16 Chunks Type uint16 numpy.ndarray",7871  7971,

Unnamed: 0,Array,Chunk
Bytes,125.48 MB,8.39 MB
Shape,"(7971, 7871)","(2048, 2048)"
Count,280 Tasks,16 Chunks
Type,uint16,numpy.ndarray


In [17]:
# Computations or visualizations trigger the streaming of bytes from S3 into memory
# Here we will compute the mean for a number of pixels for a box defined by easting and northings
zonal_mean = ds['nir'].sel(x=slice(4.99e5, 5.03e5), y=slice(5.244e6, 5.238e6)).mean()
zonal_mean.compute()

## Hvplot

HoloViz is a coordinated effort to make browser-based data visualization in Python easier to use, easier to learn, and more powerful! https://holoviz.org One particularly powerful library is `hvplot`, which allows for interactive visualizations of pandas dataframes or xarray DataArrays. With this tool you pull data on-the-fly only as required by zoom level and band selection.

In [18]:
import hvplot.xarray

da.hvplot.image(groupby='band', rasterize=True, dynamic=True, cmap='magma',
                width=700, height=500,  widget_location='left')

## Dask-Gateway Cluster

If we don't specify a specific cluster, dask will use the cores on the machine we are running our notebook on instead, lets connect to a Dask-Gateway cluster. You can read more about this cluster at https://gateway.dask.org/. **It can take a few minutes for the cluster to become ready for computing. This is because EC2 machines are being created for you behind the scenes**. Monitor the 'Dashboard' link to see cluster activity.

In [19]:
from dask_gateway import GatewayCluster
from distributed import Client

cluster = GatewayCluster()
cluster.adapt(minimum=2, maximum=20) #Keep a minimum of 2 workers, but allow for scaling up to 20 if there is RAM and CPU pressure
client = Client(cluster) #Make sure dask knows to use this cluster for computations
cluster

ValueError: No dask-gateway address provided or found in configuration

In [None]:
%%time

# First let's construct a large dataset with all the scenes in our search so that we
# have a time dimension
# Loop through geopandas geodataframe (each row is a STAC ITEM)
import dask

@dask.delayed
def stacitem_to_dataset(item):
    print(item.id)
    stacked = catalog[item.id].stack_bands(bands)
    da = stacked(chunks=dict(band=1, x=8000, y=2048)).to_dask()
    da['band'] = bands # use common names
    da = da.expand_dims(time=[pd.to_datetime(item.datetime)])
    ds = da.to_dataset(dim='band')
    return ds

lazy_datasets = []
for i,item in gf.iterrows():
    ds = stacitem_to_dataset(item)
    lazy_datasets.append(ds)
    
datasets = dask.compute(*lazy_datasets)

In [None]:
DS = xr.concat(datasets, dim='time')

In [None]:
print('Dataset size: [Gb]', DS.nbytes/1e9)
DS

## Distributed computations

We'll calculate the classic NDVI index with all our data

NOTE that you should be using Landsat ARD data (https://www.usgs.gov/land-resources/nli/landsat/us-landsat-analysis-ready-data) for this with atmospheric corrections! 
this is just to illustrate the intuitive syntax of xarray


In [None]:
NDVI = (DS['nir'] - DS['red']) / (DS['nir'] + DS['red'])
NDVI

In [None]:
NDVI.data

### A note on distributed versus local memory

In [None]:
#ndvi_my_memory = NDVI.compute() # compute pulls computation results into notebook process
NDVI = NDVI.persist() # persist keeps computation results on the dask cluster

In [None]:
# Plotting pulls data from distributed cluster memory on-demand
NDVI.hvplot.image(groupby='time', x='x',y='y', 
                  cmap='BrBG', clim=(-1,1),
                  rasterize=True, dynamic=True, 
                  width=700, height=500)

In [None]:
# Grab a subset and save as a netcdf file
sub = NDVI.sel(x=slice(4.5e5,5.0e5), y=slice(5.25e6,5.2e6)).mean(dim=['time'])
sub.hvplot.image(rasterize=True, dynamic=True, width=700, height=500, cmap='greens')

In [None]:
myda = sub.compute() #pull subset to local memory first, some formats allow distributed writing too
myda.to_netcdf(path='myndvi.nc', engine='h5netcdf')

In [None]:
round_trip = xr.load_dataarray('myndvi.nc')

In [None]:
round_trip

In [None]:
# Be a good citizen and always explicitly shut down computing resources when you're not using them!
# client.close()
# cluster.close()