# Best practices for Cloud-Optimized GeoTIFFs


**Part 3. Dask LocalCluster**

As the number of COGs grows, you can quickly exceed the amount of RAM on your system. This is where a Dask cluster can be extremely useful. A LocalCluster can utilize all your CPUs and manages your RAM so that you shouldn't get 'out of memory' errors when running computations. Often this amount of parallelism is all you need for efficient data exploration and analysis.

In this notebook we'll focus on computing the temporal mean for a stack of COGs that exceeds our notebook memory (8 GB).
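
To see why 100 COGs do not fit in 8 GB, a rough size estimate helps. The sketch below assumes each tile is a single-band 5490 x 5490 array (the shape reported later in this notebook) stored as 32-bit floats. The dtype is an assumption, chosen because it reproduces the ~12 GB figure quoted further down.

```python
# Back-of-envelope size of the uncompressed stack.
# Assumptions: 5490 x 5490 pixels per COG, float32 (4 bytes per pixel).
N_IMAGES = 100
HEIGHT = WIDTH = 5490
BYTES_PER_PIXEL = 4          # assumed float32
NOTEBOOK_RAM_BYTES = 8e9     # 8 GB notebook

bytes_per_image = HEIGHT * WIDTH * BYTES_PER_PIXEL
total_bytes = N_IMAGES * bytes_per_image

print(f"one COG:  {bytes_per_image / 1e6:.1f} MB")
print(f"100 COGs: {total_bytes / 1e9:.1f} GB")
print("fits in notebook RAM:", total_bytes < NOTEBOOK_RAM_BYTES)
```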

In [None]:
import xarray as xr
import s3fs
import pandas as pd
import os 

import dask
from dask.distributed import Client, LocalCluster, progress

In [None]:
env = dict(GDAL_DISABLE_READDIR_ON_OPEN='EMPTY_DIR', 
           AWS_NO_SIGN_REQUEST='YES',
           GDAL_MAX_RAW_BLOCK_CACHE_SIZE='200000000',
           GDAL_SWATH_SIZE='200000000',
           VSI_CURL_CACHE_SIZE='200000000')
os.environ.update(env)

In [None]:
%%time 

s3 = s3fs.S3FileSystem(anon=True)
objects = s3.glob('sentinel-s1-rtc-indigo/tiles/RTC/1/IW/10/T/ET/**Gamma0_VV.tif')
images = ['s3://' + obj for obj in objects]
print(len(images))
images.sort(key=lambda x: x[-32:-24])  # sort list in place by the date in the filename
images[:6]  # January 2020 scenes

In [None]:
# Let's use first 100 images for simplicity
images = images[:100]
dates = [pd.to_datetime(x[-32:-24]) for x in images]
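
The `x[-32:-24]` slice relies on every path having exactly the same number of characters after the date. A slightly more defensive variant, sketched here with the standard library only, searches for an eight-digit date instead. The example path and the helper name `date_from_path` are hypothetical, and the pattern assumes the date is the only eight-digit run delimited by underscores.

```python
import re
from datetime import datetime

# Hypothetical path; real keys in the bucket may differ.
example = "s3://bucket/tiles/RTC/1/IW/10/T/ET/2020/S1B_20200106_10TET_ASC/Gamma0_VV.tif"

DATE_PATTERN = re.compile(r"_(\d{8})_")

def date_from_path(path):
    """Return the YYYYMMDD date embedded in a path shaped like `example`."""
    match = DATE_PATTERN.search(path)
    if match is None:
        raise ValueError(f"no YYYYMMDD date found in {path!r}")
    return datetime.strptime(match.group(1), "%Y%m%d")

# On this path the fixed-offset slice and the regex pick out the same string.
assert example[-32:-24] == "20200106"
print(date_from_path(example))
```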

### Load in series (no dask)

Skip these cells and go to the Dask sections if you want to avoid local caching from previous steps.

In [None]:
%%time
# ~8.5s

# 100 images, 12 GB uncompressed
# (the full set of ~275 images would be 33 GB uncompressed)


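# NOTE: xr.open_rasterio is deprecated in newer xarray releases; rioxarray.open_rasterio is the replacement
dataArrays = [xr.open_rasterio(url, chunks={}) for url in images]
# join='override' skips coordinate alignment, which is safe because we know these COGs share the same coordinates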
da = xr.concat(dataArrays, dim='band', join='override', combine_attrs='drop').rename(band='time')
da['time'] = dates
da

In [None]:
# As a benchmark, say we want to calculate the spatial mean of each of these COGs.
# Looping over all 100 images at ~1.5 s each (based on the single-COG notebook) should take ~2.5 min:
100 * 1.5 / 60

# We should be able to do better than this, though, since each COG can be operated on independently...

### Load in parallel (dask w/ threads)

In [None]:
cluster = LocalCluster(processes=False, local_directory='/tmp') 
client = Client(cluster) 
client
# NOTES:
# dask workers write to SSD (/tmp) rather than the home directory NFS mount
# 1 worker, 4 cores --> 1 process w/ 4 threads
# Open 'Graph', 'Task Stream', and 'Workers' from the Dask labextension to watch computation progress

In [None]:
@dask.delayed
def lazy_open(href):
    chunks=dict(band=1, x=2745, y=2745)
    return xr.open_rasterio(href, chunks=chunks) 

In [None]:
%%time 

# ~10s ... basically loading in series (file locks?)
# picks up cache if run again (300ms)

dataArrays = dask.compute(*[lazy_open(href) for href in images])
da = xr.concat(dataArrays, dim='band', join='override', combine_attrs='drop').rename(band='time')
da['time'] = dates
da

In [None]:
# Let's say we want the spatial mean of each COG. We can operate on each of these 100 files simultaneously ("embarrassingly parallel"):
# workers should be able to operate on each COG in isolation and just return a single result

# It can be helpful to look at the task graph for a single COG like so:
da.isel(time=0).mean(dim=['x','y']).data.visualize(optimize_graph=True, rankdir='LR')

In [None]:
%%time

# 2min 25s
# spatial mean of each COG (output = 100x1 vector, one value per image)
# the task stream shows that this actually runs in series (due to the xr.open_rasterio file lock?)

da.mean(dim=['x','y']).compute()

In [None]:
%%time

# ~ 2 min 32s
# temporal mean of all COGs (output = 5490x5490 array)
# this workflow requires pulling (nCOGS x chunk size) into worker RAM to get mean through time for each chunk (3GB)

da.mean(dim='time').compute()

In [None]:
# GOTCHA: the following is not a good idea because the output is the full uncompressed DataArray in local memory,
# so we eventually hit RAM limits and either start spilling bytes to disk or the computation fails

#scaled = da + 100
#scaled.compute() 
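
One way to catch this before it happens is to check the size of a result while it is still lazy. The sketch below uses a synthetic `dask.array` with the same shape and chunking as the COG stack (float32 is an assumption); the names `stack`, `scaled`, and `temporal_mean` are illustrative. Nothing is computed, so no pixel data is allocated.

```python
import dask.array as dsa

# Lazy stand-in for the COG stack: 100 images of 5490 x 5490 pixels.
stack = dsa.zeros((100, 5490, 5490), chunks=(1, 2745, 2745), dtype="float32")

scaled = stack + 100                 # same shape as the input
temporal_mean = stack.mean(axis=0)   # reduced to a single 5490 x 5490 array

# .nbytes is derived from shape and dtype alone, so no data is loaded here.
print(f"scaled:        {scaled.nbytes / 1e9:.1f} GB")
print(f"temporal mean: {temporal_mean.nbytes / 1e9:.2f} GB")
```

An xarray `DataArray` exposes the same `.nbytes` attribute, so the size of `scaled` can be inspected before deciding whether `.compute()` is safe.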

#### recap

* The initial load of this dataset is slow b/c each thread reads metadata sequentially
* Subsequent calls to `da` are an order of magnitude faster b/c the file handles and metadata are cached locally
* Computations can be slow (maybe due to file read locks preventing simultaneous operations)
* Threads are good for computations where memory needs to be shared by tasks (e.g. temporal mean for many COGs)
* You might want to experiment with chunk sizes (30 --> 100 MB); larger chunks should mean fewer network requests
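
The chunk-size trade-off in the last bullet can be put in numbers. This sketch assumes float32 pixels (an assumption, consistent with the ~30 MB chunks and the ~3 GB temporal stack mentioned above) and compares the 2745 x 2745 chunks used in `lazy_open` with one chunk per 5490 x 5490 tile. The helper name `chunk_stats` is illustrative.

```python
import math

BYTES_PER_PIXEL = 4  # assumed float32
TILE = 5490          # pixels per side of one COG
N_IMAGES = 100

def chunk_stats(chunk_side):
    """Return (MB per chunk, chunks per image, GB per temporal stack of one chunk)."""
    chunk_bytes = chunk_side * chunk_side * BYTES_PER_PIXEL
    chunks_per_image = math.ceil(TILE / chunk_side) ** 2
    stack_bytes = N_IMAGES * chunk_bytes
    return chunk_bytes / 1e6, chunks_per_image, stack_bytes / 1e9

for side in (2745, 5490):
    mb, n_chunks, stack_gb = chunk_stats(side)
    print(f"{side}px chunks: {mb:.0f} MB each, {n_chunks} per image, "
          f"{stack_gb:.1f} GB per temporal stack")
```

Larger chunks mean fewer chunks per image, at the cost of a proportionally larger stack behind each output chunk of the temporal mean.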

### Load in parallel (dask w/ processes)

Restart the kernel before running this section so that cached reads don't affect the timings, then re-run the import, environment, image list, and `lazy_open` cells.

In [None]:
# processes=True lets us open COGs in parallel, circumventing locks; it should be faster by a factor of nCores
# (we have 4 cores by default on this machine)
cluster = LocalCluster(local_directory='/tmp')  # processes=True by default
client = Client(cluster) 
client

In [None]:
%%time 

# ~3.7s on the first run, 1.6s on subsequent runs (caching, but maybe the cache is separate per process?)
dataArrays = dask.compute(*[lazy_open(href) for href in images])
da = xr.concat(dataArrays, dim='band', join='override', combine_attrs='drop').rename(band='time')
da['time'] = dates
da

In [None]:
%%time

# 42.8s

da.mean(dim=['x','y']).compute()
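
The 42.8 s timing is close to what a simple model predicts. As a rough sketch, assuming ~1.5 s per COG (the single-COG figure used earlier) and perfect scaling across 4 worker processes:

```python
SECONDS_PER_COG = 1.5   # single-COG estimate used earlier in this notebook
N_IMAGES = 100
N_WORKERS = 4           # assumed: one worker process per core
OBSERVED_S = 42.8       # timing reported for the spatial mean with processes

serial_s = N_IMAGES * SECONDS_PER_COG
ideal_parallel_s = serial_s / N_WORKERS
efficiency = ideal_parallel_s / OBSERVED_S

print(f"serial estimate:     {serial_s:.0f} s")
print(f"ideal on 4 workers:  {ideal_parallel_s:.1f} s")
print(f"parallel efficiency: {efficiency:.0%}")
```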

In [None]:
%%time

# Kernel dies :(

# temporal mean of all COGs (output = 5490x5490 array)
# this workflow requires pulling (nCOGS x chunk size) into worker RAM to get mean through time for each chunk (3GB)
# because each process uses its own RAM with a max of 2GB, we are forced to spill to disk, which is very slow.

da.mean(dim='time').compute() # task stream is very inefficient here with high memory use!

#### recap

* `processes=True` is great for opening a bunch of datasets with `dask.delayed`
* It's also great for tasks where workers don't need to communicate information
* It is really bad if tasks need to store a lot of intermediate results in memory (workers start writing to disk instead of RAM)
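
The last bullet follows from how the available memory is divided among workers. A rough sketch of the budget, assuming the 8 GB notebook is split evenly, with 1 worker in thread mode and 4 workers in process mode, and using the ~3 GB temporal stack noted in the cell above:

```python
TOTAL_RAM_GB = 8.0   # notebook memory
STACK_GB = 3.0       # approx. (nCOGs x chunk size) needed per output chunk

def per_worker_limit(n_workers):
    """Memory available to each worker when RAM is split evenly."""
    return TOTAL_RAM_GB / n_workers

for label, n_workers in [("threads (1 worker)", 1), ("processes (4 workers)", 4)]:
    limit = per_worker_limit(n_workers)
    verdict = "fits" if STACK_GB <= limit else "spills to disk"
    print(f"{label}: {limit:.0f} GB per worker -> 3 GB stack {verdict}")
```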

### Best of both worlds?

It turns out you can mix and match Dask cluster configurations within a single workflow. For example:

In [None]:
%%time 

with LocalCluster(local_directory='/tmp') as cluster, Client(cluster) as client:
    dataArrays = dask.compute(*[lazy_open(href) for href in images])
    da = xr.concat(dataArrays, dim='band', join='override', combine_attrs='drop').rename(band='time')
    da['time'] = dates
    spatial_means = da.mean(dim=['x','y']).compute()

with LocalCluster(processes=False, local_directory='/tmp') as cluster, Client(cluster) as client:
    temporal_mean = da.mean(dim='time').compute()

#### recap

* If performance is what you're going for, you might need to mix and match Dask settings with processes and threads
* While COGs are loaded as Dask arrays via xarray, references to files and file locks can complicate parallelization