## Investigating Dask Performance

Relates to GitHub issue [#544](https://github.com/dcs4cop/xcube/issues/544).

See the `# TODO: expensive stmt` comments in [`xcube/core/gridmapping/coords.py`](https://github.com/dcs4cop/xcube/blob/forman-544-spat_resampl_too_slow/xcube/core/gridmapping/coords.py) on the `forman-544-spat_resampl_too_slow` branch.

In [None]:
import dask
import dask.array as da
import numpy as np
import time

from xcube.core.gridmapping.coords import new_grid_mapping_from_coords

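Set up a Dask distributed client so that the Dask dashboard can be used (e.g. via the [dask-labextension](https://github.com/dask/dask-labextension) for JupyterLab):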

In [None]:
# Start a local distributed cluster so the Dask dashboard can be used to
# inspect task streams while the cells below run.
# Creating a Client registers it as the default scheduler, which overrides
# any earlier `dask.config.set(scheduler=...)`. A separate 'threads' setting
# would therefore have no effect, so none is set here.
from distributed import Client

client = Client()
client.dashboard_link

Open a dataset with a 1D grid mapping (1D x/y coordinates):

In [None]:
from xcube.core.store import new_data_store

# Anonymous S3 access (anon=True): no credentials are needed to read the bucket.
store = new_data_store(
    "s3",
    root='agriculture-vlab-data-staging',
    max_depth=5,
    storage_options={'anon': True},
)
ds_1d = store.open_data('avl/l3b/bel/S2_L3B_LAI_31UFS.zarr')
ds_1d

In [None]:
from xcube.core.gridmapping.cfconv import get_dataset_grid_mapping_proxies

# Take the first grid-mapping proxy in the returned mapping's iteration order.
# NOTE(review): if a dataset defines several grid mappings, only the first is used.
grid_mapping_proxies = get_dataset_grid_mapping_proxies(ds_1d)
gmp_1d = [*grid_mapping_proxies.values()][0]
gmp_1d

In [None]:
%%time
# Time building a grid mapping from the 1D x/y coordinates, for comparison
# with the 2D case timed further below.
new_grid_mapping_from_coords(gmp_1d.coords.x, gmp_1d.coords.y, gmp_1d.crs)

Open a dataset with a 2D grid mapping (2D x/y coordinates):

In [None]:
from xcube_cci.zarraccess import CciZarrDataStore

# Example with 2D lon/lat coordinates. Judging by its id, this is an ESA CCI
# sea-ice thickness product on an EASE2 grid.
cci_zarr_store = CciZarrDataStore()
ds_2d = cci_zarr_store.open_data(
    'ESACCI-SEAICE-L3C-SITHICK-SIRAL_CRYOSAT2-NH25KMEASE2-2010-2017-fv2.0.zarr'
)
ds_2d

In [None]:
# Same selection as for the 1D dataset: the first grid-mapping proxy found.
grid_mapping_proxies = get_dataset_grid_mapping_proxies(ds_2d)
gmp_2d = [*grid_mapping_proxies.values()][0]
gmp_2d

In [None]:
%%time
# Time the same call for the 2D x/y coordinates, the case this notebook
# investigates.
new_grid_mapping_from_coords(gmp_2d.coords.x, gmp_2d.coords.y, gmp_2d.crs)

In [None]:
def _prepare_dask_arrays(ds):
    """Build the lazy Dask arrays used to benchmark 2D grid-mapping detection.

    The returned arrays are lazy Dask arrays. The reductions in the following
    cells can therefore be timed either one by one or batched into a single
    ``dask.compute`` call.

    Parameters
    ----------
    ds
        Dataset exposing 2D ``lon`` and ``lat`` variables, assumed to be in
        degrees (they go through ``da.radians`` below).

    Returns
    -------
    tuple
        ``(x, y, xy_areas, x_y_diff, y_x_diff, x_x_diff)``:

        * ``x``, ``y``: ``ds.lon`` / ``ds.lat`` as Dask arrays.
        * ``xy_areas``: flattened approximate pixel areas in square metres
          on a sphere of radius 6371 km. Non-positive or NaN areas are NaN.
        * ``x_y_diff``: absolute ``x`` differences along axis 0.
        * ``y_x_diff``: absolute ``y`` differences along axis 1.
        * ``x_x_diff``: absolute ``x`` differences along axis 1.

        In all three diff arrays, NaN and near-zero values are set to 0, and
        the differenced axis is one element shorter than in ``x``/``y``.
    """
    earth_radius_m = 6371000

    x = da.asarray(ds.lon)
    y = da.asarray(ds.lat)

    # Absolute neighbour differences along axis 1 (x_*) and axis 0 (*_y).
    x_x_diff = _abs_no_nan(da.diff(x, axis=1))
    x_y_diff = _abs_no_nan(da.diff(x, axis=0))
    y_x_diff = _abs_no_nan(da.diff(y, axis=1))
    y_y_diff = _abs_no_nan(da.diff(y, axis=0))

    # da.diff shortens the differenced axis by one. Repeat the last
    # column/row so every padded diff has the same shape as x and y.
    x_x_diff_c = da.concatenate([x_x_diff, x_x_diff[:, -1:]], axis=1)
    y_x_diff_c = da.concatenate([y_x_diff, y_x_diff[:, -1:]], axis=1)
    x_y_diff_c = da.concatenate([x_y_diff, x_y_diff[-1:, :]], axis=0)
    y_y_diff_c = da.concatenate([y_y_diff, y_y_diff[-1:, :]], axis=0)

    # Per-pixel step length in degrees, combining both grid axes.
    x_abs_diff = da.sqrt(da.square(x_x_diff_c) + da.square(x_y_diff_c))
    y_abs_diff = da.sqrt(da.square(y_x_diff_c) + da.square(y_y_diff_c))

    # Convert degree steps to approximate metres on a sphere:
    # - an east-west step of d(lon) radians spans R * cos(lat) * d(lon);
    # - a north-south step of d(lat) radians spans R * d(lat).
    # x_abs_diff has the same shape as y, so cos(lat) is taken pixel-wise.
    x_abs_diff_m = earth_radius_m * da.cos(da.radians(y)) * da.radians(x_abs_diff)
    y_abs_diff_m = earth_radius_m * da.radians(y_abs_diff)

    xy_areas = (x_abs_diff_m * y_abs_diff_m).flatten()
    # Mark degenerate (<= 0) and NaN areas as NaN so that the later
    # nanargmin/nanargmax reductions ignore them.
    xy_areas = da.where(xy_areas > 0, xy_areas, np.nan)

    return x, y, xy_areas, x_y_diff, y_x_diff, x_x_diff

def _abs_no_nan(array):
    array = np.fabs(array)
    return np.where(np.logical_or(np.isnan(array), 
                                  np.isclose(array, 0)), 0, array)

def _call_separately(x, y, xy_areas, x_y_diff, y_x_diff, x_x_diff):
    """Compute each benchmark reduction with its own ``.compute()`` call.

    This is the baseline for the timing comparison. Every ``.compute()``
    builds and executes an independent graph, so inputs shared between
    reductions (e.g. ``xy_areas``) are evaluated again for every call.

    The parameters are the lazy arrays returned by ``_prepare_dask_arrays``.

    Returns
    -------
    tuple
        The eight computed values, in the same order as the arguments of the
        batched ``dask.compute`` cell. Returning them lets you check that the
        separate and batched approaches give the same results.
    """
    return (
        da.nanargmin(xy_areas).compute(),
        da.nanargmax(xy_areas).compute(),
        da.all(x_y_diff == 0).compute(),
        da.all(y_x_diff == 0).compute(),
        # Longitude jumps > 180 degrees between neighbours; presumably an
        # anti-meridian-crossing check (TODO confirm against the library).
        da.any(da.max(x_x_diff) > 180).compute(),
        da.any(da.max(x_y_diff) > 180).compute(),
        x.min().compute(),
        y.min().compute(),
    )

In [None]:
(x, y, xy_areas,
 x_y_diff, y_x_diff, x_x_diff) = _prepare_dask_arrays(ds_2d)

# Render the task graph of a single reduction (requires graphviz).
y.min().visualize()

In [None]:
%%time
# Baseline timing: build the lazy arrays, then compute each reduction with
# its own .compute() call (eight separate graph executions).
x, y, xy_areas, x_y_diff, y_x_diff, x_x_diff = _prepare_dask_arrays(ds_2d)
_call_separately(x, y, xy_areas, x_y_diff, y_x_diff, x_x_diff)

In [None]:
# Optimize all eight reductions together. dask.optimize merges their graphs
# and returns equivalent collections, in argument order, that share one
# optimized graph.
optimized_graph = dask.optimize(
    da.nanargmin(xy_areas),
    da.nanargmax(xy_areas),
    da.all(x_y_diff == 0),
    da.all(y_x_diff == 0),
    da.any(da.max(x_x_diff) > 180),
    da.any(da.max(x_y_diff) > 180),
    x.min(),
    y.min(),
)
optimized_graph

In [None]:
# Index 7 is the optimized y.min(), the last expression passed to
# dask.optimize in the previous cell. Rendering requires graphviz.
optimized_graph[7].visualize()

In [None]:
%%time
# Execute the pre-optimized collections in a single dask.compute call.
# Graph construction and optimization happened in earlier cells, so they
# are not part of this timing.
results = dask.compute(*optimized_graph)
results

In [None]:
%%time
# The same eight reductions, not pre-optimized, computed in one dask.compute
# call. Their graphs are merged, so shared inputs are evaluated once;
# compare with the separate .compute() timing above.
results = dask.compute(da.nanargmin(xy_areas),
                       da.nanargmax(xy_areas),
                       da.all(x_y_diff == 0),
                       da.all(y_x_diff == 0),
                       da.any(da.max(x_x_diff) > 180),
                       da.any(da.max(x_y_diff) > 180),
                       x.min(),
                       y.min())
results

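Problem 1: inside `new_grid_mapping_from_coords`, some intermediate results must be computed before the remaining graphs can be constructed. The whole computation therefore cannot be expressed as one lazy Dask graph and optimized in one go.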

In [None]:
%%time
# Illustrates Problem 1. new_grid_mapping_from_coords(...) is called eagerly
# before dask.optimize runs, so dask.optimize receives an already-computed
# result rather than a lazy graph.
gm_graph = dask.optimize(new_grid_mapping_from_coords(gmp_2d.coords.x, gmp_2d.coords.y, gmp_2d.crs))
gm_graph

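Problem 2: `dask.compute` and `dask.optimize` only act on Dask collections. Any other object passed to them, such as the already-computed result of `new_grid_mapping_from_coords`, is returned unchanged. The work done inside that function is therefore not optimized any further.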

Next attempt: wrap the call in `dask.delayed` instead.

In [None]:
%%time
# dask.delayed only builds a lazy Delayed object here. This timing excludes
# the grid-mapping computation itself, which runs on .compute() below.
gmp_delayed = dask.delayed(new_grid_mapping_from_coords)(gmp_2d.coords.x, 
                                                        gmp_2d.coords.y, 
                                                        gmp_2d.crs)
gmp_delayed

In [None]:
# Execute the delayed call built in the previous cell (not timed).
gmp_delayed.compute()

In [None]:
%%time
# Time the private helper called eagerly. The import adds little to the
# timing because xcube.core.gridmapping.coords was already imported in the
# first code cell.
from xcube.core.gridmapping.coords import _get_cls_params_2d

_get_cls_params_2d(gmp_2d.coords.x, 
                   gmp_2d.coords.y, 
                   is_lon_360=False, 
                   crs=gmp_2d.crs, 
                   tile_size=None,
                   tolerance=1e-5)

In [None]:
# Wrap the helper in dask.delayed. Nothing runs until .compute() is called.
# NOTE(review): dask.delayed computes Dask-backed arguments before calling
# the function. Confirm whether gmp_2d.coords.x / .y are Dask-backed, since
# that affects what the timed .compute() below actually measures.
cls_params_2d = dask.delayed(_get_cls_params_2d)(
    gmp_2d.coords.x,
    gmp_2d.coords.y,
    is_lon_360=False,
    crs=gmp_2d.crs,
    tile_size=None,
    tolerance=1e-5,
)
cls_params_2d

In [None]:
%%time
# Execute the delayed helper built in the previous cell.
cls_params_2d.compute()

In [None]:
%%time
# Time new_grid_mapping_from_coords_optimized on the 2D coordinates.
# Presumably (from its name) this is the reworked implementation under
# test on this branch; compare with the 2D timing near the top.
from xcube.core.gridmapping.coords import new_grid_mapping_from_coords_optimized

gmo = new_grid_mapping_from_coords_optimized(gmp_2d.coords.x, 
                                             gmp_2d.coords.y, 
                                             gmp_2d.crs)
gmo

In [None]:
%%time
# Same timing for the 1D coordinates. The import repeats the one in the
# previous cell so this cell can also be run on its own.
from xcube.core.gridmapping.coords import new_grid_mapping_from_coords_optimized

gmo_1d = new_grid_mapping_from_coords_optimized(gmp_1d.coords.x, 
                                                gmp_1d.coords.y, 
                                                gmp_1d.crs)
gmo_1d

In [None]:
# _get_cls_params_2d runs eagerly here. dask.optimize then only rewrites the
# Dask collections it finds in that result; other objects are returned
# unchanged (see Problem 2 above).
dask.optimize(
    _get_cls_params_2d(
        gmp_2d.coords.x,
        gmp_2d.coords.y,
        is_lon_360=False,
        crs=gmp_2d.crs,
        tile_size=None,
        tolerance=1e-5,
    )
)