
Performance issue; preprocessing produces ungodly amount of dask tasks #58

Closed
jbusecke opened this issue Oct 16, 2020 · 12 comments
Labels
upstream issue Problems that are better addressed in the upstream repos (xarray, dask etc)

Comments

@jbusecke
Owner

jbusecke commented Oct 16, 2020

I just discovered a concerning behavior of combined_preprocessing: it seems to create a lot more dask tasks for each dataset when I enable the new automatic slicing for large arrays.

Consider this example:

import intake
import warnings
from cmip6_preprocessing.preprocessing import combined_preprocessing

import dask
dask.config.set(**{'array.slicing.split_large_chunks': True, "array.chunk-size": "256 MiB",})

url = "https://raw.githubusercontent.com/NCAR/intake-esm-datastore/master/catalogs/pangeo-cmip6.json"
col = intake.open_esm_datastore(url)

cat = col.search(
    table_id='Omon',
    grid_label='gn',
    experiment_id='piControl',
    variable_id='o2',
    source_id=['CanESM5'])

Now let's load this single model into a dictionary, with and without using preprocess:

ddict_raw = cat.to_dataset_dict(
                zarr_kwargs={"consolidated": True, "decode_times":False}, 
            )
ddict_raw['CMIP.CCCma.CanESM5.piControl.Omon.gn'].o2.data

(screenshot: dask array repr of o2 without preprocessing, ~30k tasks)

ddict = cat.to_dataset_dict(
                zarr_kwargs={"consolidated": True, "decode_times":False},
                preprocess=combined_preprocessing
            )
ddict['CMIP.CCCma.CanESM5.piControl.Omon.gn'].o2.data

(screenshot: dask array repr of o2 with combined_preprocessing, >9 million tasks)

The tasks increased from ~30k to more than 9 million! This seems to hit the limit of what dask can handle.

I dug a little deeper, and it seems that the increase is happening during this step, specifically the call to .sortby('x') here.

If I deactivate the new automatic slicing for large arrays, I get this:

import dask
with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    ddict_test = cat.to_dataset_dict(
                zarr_kwargs={"consolidated": True, "decode_times":False},
                preprocess=combined_preprocessing
            )
ddict_test['CMIP.CCCma.CanESM5.piControl.Omon.gn'].o2.data

(screenshot: dask array repr of o2 with split_large_chunks disabled)

This prevents the x dimension from being rechunked into single-value chunks.

Unfortunately I don't understand enough about these xarray/dask internals. @dcherian, is this something that you know more about? I'll try to come up with a simpler example and crosspost over at xarray. Just wanted to document this behavior here first.

@jbusecke
Owner Author

Seems like this is related: pydata/xarray#4428

Should I deactivate the auto slicing for the whole combined_preprocessing? Or is there a more dask-friendly way to sort values along a dimension? (In this case it is not chunked; I have a local version where it is chunked... that causes even more trouble.)

I am also unclear on why these chunks are split in the first place. They are smaller than dask.config.get('array.chunk-size'), which is 128MB. Is there an internal step that creates chunks larger than the ones in the input/output?
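
For reference, a rough way to compare the largest actual chunk against the configured limit looks something like this (just a sketch, reusing ddict from the snippet above):

import dask
import numpy as np

data = ddict['CMIP.CCCma.CanESM5.piControl.Omon.gn'].o2.data

# configured limit, e.g. "256 MiB" -> bytes
limit = dask.utils.parse_bytes(dask.config.get('array.chunk-size'))

# size in bytes of the largest single chunk
max_chunk_bytes = np.prod([max(c) for c in data.chunks]) * data.dtype.itemsize

print(f"limit: {limit} bytes, largest chunk: {max_chunk_bytes} bytes")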

@dcherian

Is there an internal step that creates chunks larger than the ones in the input/output?

sort becomes an isel which triggers that behaviour. But clearly your chunks are becoming too small. Can you produce a minimal example?
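
Roughly speaking (a sketch, not the exact xarray code path), for any DataArray da with an unsorted x coordinate the sort boils down to positional indexing:

import numpy as np

order = np.argsort(da.x.values)  # positions that would sort the coordinate
da_sorted = da.isel(x=order)     # fancy indexing; this is where dask may decide to split chunks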

@jbusecke
Owner Author

Working on it. Do you think that is better posted over at xarray or here?

@jbusecke
Owner Author

jbusecke commented Oct 16, 2020

I think I got a simpler example with the same issue:

import xarray as xr
import numpy as np

import dask
dask.config.set(**{'array.slicing.split_large_chunks': True,
                   "array.chunk-size": "24 MiB",})

da = xr.DataArray(
    np.random.rand(10),
    dims=['x'],
    coords={'x': [3, 4, 5, 6, 7, 9, 8, 0, 2, 1]},
).chunk({'x': -1}).expand_dims(y=1000, time=2000).chunk({'y': -1, 'time': 200})
da

(screenshot: repr of da)

Note that I set the array.chunk-size limit to be larger than the actual chunks of the array.

If I now sort it by x, I get much smaller chunks:

da.sortby('x')

(screenshot: repr of da.sortby('x') showing much smaller chunks)

If I deactivate the auto slicing, I get the expected result (plus a few extra tasks, but not 3x as many):

with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    da_sorted = da.sortby('x')
da_sorted

(screenshot: repr of da_sorted with the original chunk sizes)

I don't know enough about these internals, but that seems unintuitive to me.
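
For completeness, here is the same comparison without relying on the reprs (sketch, using the da defined above):

# chunk layout along each dimension, with and without the splitting behaviour
print("original:         ", da.data.chunks)

with dask.config.set(**{'array.slicing.split_large_chunks': True}):
    print("sorted (split):   ", da.sortby('x').data.chunks)

with dask.config.set(**{'array.slicing.split_large_chunks': False}):
    print("sorted (no split):", da.sortby('x').data.chunks)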

EDIT: Here are the versions installed:

INSTALLED VERSIONS
------------------
commit: None
python: 3.7.8 | packaged by conda-forge | (default, Jul 31 2020, 02:25:08) 
[GCC 7.5.0]
python-bits: 64
OS: Linux
OS-release: 4.19.112+
machine: x86_64
processor: x86_64
byteorder: little
LC_ALL: C.UTF-8
LANG: C.UTF-8
LOCALE: en_US.UTF-8
libhdf5: 1.10.6
libnetcdf: 4.7.4

xarray: 0.16.1
pandas: 1.1.3
numpy: 1.19.1
scipy: 1.5.2
netCDF4: 1.5.4
pydap: installed
h5netcdf: 0.8.1
h5py: 2.10.0
Nio: None
zarr: 2.4.0
cftime: 1.2.1
nc_time_axis: 1.2.0
PseudoNetCDF: None
rasterio: 1.1.7
cfgrib: 0.9.8.4
iris: None
bottleneck: 1.3.2
dask: 2.30.0
distributed: 2.30.0
matplotlib: 3.3.2
cartopy: 0.18.0
seaborn: None
numbagg: None
pint: 0.16.1
setuptools: 49.6.0.post20200917
pip: 20.2.3
conda: None
pytest: 6.1.1
IPython: 7.18.1
sphinx: 3.2.1

@jbusecke
Owner Author

I just tried it, and it seems I need to go to "array.chunk-size": "256 MiB" for these to not be chunked differently.

@jbusecke
Owner Author

I have changed the chunk-size limit to dask.config.set(**{'array.slicing.split_large_chunks': True, "array.chunk-size": "128 GB",}) and the problem goes away 🤣. I still feel that this value should be more representative of what the chunks actually end up being...

@jbusecke jbusecke added the upstream issue Problems that are better addressed in the upstream repos (xarray, dask etc) label Oct 16, 2020
@aaronspring
Contributor

Another option to circumvent this: use cmip6_preprocessing after intake-esm, rather than via preprocess in intake-esm.

@jbusecke
Owner Author

You mean with aggregate=False, @aaronspring?

@aaronspring
Contributor

You mean with aggregate=False, @aaronspring?

Maybe. I usually don't use aggregate. I just meant using the functions from your package directly on xarray objects after intake-esm.

@jbusecke
Owner Author

I am experimenting with this and will report back. This is a good suggestion, and maybe it should become our default recommendation, so that people are not bound to intake-esm but can still use it if desired.

@aaronspring
Contributor

The functions work as expected on datasets. When used on extracted DataArrays, the missing attrs may cause some functions to fail.

@jbusecke
Owner Author

jbusecke commented Jun 1, 2021

I have updated the recommended workflow to use aggregate=False here, which should avoid this problem. Closing this.
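
For anyone landing here, the updated workflow looks roughly like this (a sketch; the exact catalog keys depend on your search):

from cmip6_preprocessing.preprocessing import combined_preprocessing

# load the raw datasets without aggregation and without preprocess=...
ddict_raw = cat.to_dataset_dict(
    zarr_kwargs={"consolidated": True, "decode_times": False},
    aggregate=False,
)

# apply the preprocessing afterwards, on plain xarray datasets
ddict = {key: combined_preprocessing(ds) for key, ds in ddict_raw.items()}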

@jbusecke jbusecke closed this as completed Jun 1, 2021