# Xarray / Dask Climatology Benchmark

Notebook designed to debug the issue described in <https://github.com/dask/distributed/issues/2602>

This has been tested with Dask 2020.12.0 and Dask 2021.07.1.

In [None]:
# Show the installed Dask version so the timings recorded further down can be
# matched to the release they were measured with.
import dask
dask.__version__

## Dask Cluster Settings

In [None]:
# Number of workers requested from the cluster; also the count the client waits
# for before any computation is started.
nworkers = 30
# Per-worker memory, assigned to the gateway's `worker_memory` cluster option
# (presumably GiB — TODO confirm against the gateway's option definition).
worker_memory = 8
# Per-worker core count, assigned to the gateway's `worker_cores` cluster option.
worker_cores = 1
# When True, the cluster is created with MALLOC_TRIM_THRESHOLD_=0 in its
# environment option. The synthetic benchmark notes that workers die without it.
use_MALLOC_TRIM_THRESHOLD = True

In [None]:
from dask_gateway import Gateway
# Gateway() is called without arguments, so no connection details are set in this
# notebook (presumably they come from the deployment's dask-gateway configuration).
g = Gateway()
options = g.cluster_options()
# set the options programmatically, or through their HTML repr
options.worker_memory = worker_memory
options.worker_cores = worker_cores
if use_MALLOC_TRIM_THRESHOLD:
    # NOTE(review): this assigns a fresh dict, so any environment entries already
    # present on `options` would be replaced rather than extended — confirm intended.
    options.environment = {"MALLOC_TRIM_THRESHOLD_": "0"}

# `display` is not imported here; it is the helper IPython/Jupyter injects into the
# notebook namespace, so this cell only runs inside a notebook kernel.
display(options)
# Create the cluster with the options chosen above and show its repr.
cluster = g.new_cluster(options)
cluster

In [None]:
# Request `nworkers` workers, connect a client, and wait for that many workers
# before continuing (presumably so the timings below exclude worker start-up).
cluster.scale(nworkers)
client = cluster.get_client()
client.wait_for_workers(nworkers)
client

## Synthetic Data Example

In [None]:
import dask.array as dsa
import numpy as np
import xarray as xr

# Problem size for the synthetic benchmark. The time-axis length is needed by both
# the array shape and the `day` coordinate, so it is defined once to keep the two
# in sync when the benchmark is resized.
n_time = 10_000    # number of time steps; the array is chunked one step at a time
n_x = 1_000_000    # points along `x`; every chunk spans the full `x` axis
n_days = 100       # period of the synthetic repeating `day` label

# Dask-backed random array of shape (n_time, n_x) with chunks of shape (1, n_x).
data = dsa.random.random((n_time, n_x), chunks=(1, n_x))
# `day` is a coordinate on the `time` dimension cycling through 0..n_days-1;
# it is the label the groupby operations below group on.
da = xr.DataArray(data, dims=['time', 'x'],
                  coords={'day': ('time', np.arange(n_time) % n_days)})
# Climatology: mean over `time` within each `day` group.
clim = da.groupby('day').mean(dim='time')
# Anomaly: the grouped data minus the per-`day` climatology.
anom = da.groupby('day') - clim
# Mean of the anomalies over `time`, leaving a result along `x` only.
anom_mean = anom.mean(dim='time')

In [None]:
# Recorded wall-clock results for the synthetic example:
# without use_MALLOC_TRIM_THRESHOLD, workers die
# with use_MALLOC_TRIM_THRESHOLD:
#  Dask 2020.12.0: 1min 46s
#  Dask 2021.07.1: 1min 41s

# NOTE(review): xarray documents `.load()` as loading the data in place, so
# re-running this cell without rebuilding `anom_mean` would presumably not repeat
# the computation — rerun the cell that defines `anom_mean` before re-timing.
%time anom_mean.load()

## Real Data Example

In [None]:
from intake import open_catalog
# Open the Pangeo ocean catalog straight from its GitHub URL (needs network access).
cat = open_catalog("https://raw.githubusercontent.com/pangeo-data/pangeo-datastore/master/intake-catalogs/ocean.yaml")
# Materialise the "sea_surface_height" entry; presumably a lazily loaded,
# Dask-backed xarray Dataset (TODO confirm) — the cells below read `ds.sla` from it.
ds  = cat["sea_surface_height"].to_dask()
ds

In [None]:
# Same climatology/anomaly pipeline as the synthetic example, applied to the
# `sla` variable of the dataset opened above.
sla = ds["sla"]
# Group the time steps by their day of year.
sla_gb = sla.groupby("time.dayofyear")
# Climatology: mean over `time` within each day-of-year group.
sla_clim = sla_gb.mean(dim="time")
# Anomaly: grouped data minus the climatology of the matching day of year.
sla_anom = sla_gb - sla_clim
# Standard deviation of the anomalies along `time`.
sla_anom_std = sla_anom.std(dim="time")
sla_anom_std

In [None]:
# Recorded wall-clock results for the real-data example:
# With Dask 2020.12.0: 2min 58s
# With Dask 2021.07.1: 2min 6s

# The result loaded here is the array the following cell plots.
%time sla_anom_std.load()

In [None]:
# Plot the standard deviation of the day-of-year anomalies on a 20 x 12 figure.
sla_anom_std.plot(figsize=(20, 12))

In [None]:
# Tear down in order: disconnect the client, scale the cluster to zero workers,
# then close the cluster itself.
client.close()
cluster.scale(0)
cluster.close()