
Very large Dask graph size with high resolution grids #127

Closed
ScottWales opened this issue Nov 1, 2021 · 9 comments · Fixed by #135

@ScottWales

Regridding from a 0.1 degree to a 0.25 degree global grid creates a very large task graph with xesmf 0.6.1, causing problems when evaluating the result.

Sample code:

import xarray
import xesmf
import dask.array

a = xesmf.util.grid_global(0.1,0.1)
b = xesmf.util.grid_global(0.25,0.25)
r = xesmf.Regridder(a,b,method='bilinear')

data = dask.array.random.random((720,a.dims['y'], a.dims['x']), chunks=(12,-1,-1))
da = xarray.DataArray(data, dims=['time','y','x'], coords={'y': a.y, 'x': a.x})

r(da).data

Evaluated versions at https://nbviewer.org/gist/ScottWales/4fe0e9a5725b5a3bf07ab94e80778846

With 0.6.0, the regridded data has 240 Dask tasks. With 0.6.1 the number has exploded to 409,841 tasks, and it appears to scale with the grid resolution; low-resolution regrid task graphs have a reasonable size.

Expected behaviour is that the Dask task graph created by xesmf is a reasonably small multiple of the number of chunks in the input data, and that it is independent of grid resolution. An overly large task graph slows down Dask as it prepares the tasks to run and causes excess memory use when the result is evaluated.
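
A quick way to compare the graph size with the number of input chunks (a sketch based on the snippet above; nothing here is xesmf API):

result = r(da)
print(da.data.npartitions)                 # number of input chunks (60 here)
print(len(result.data.__dask_graph__()))   # total number of dask tasks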

@huard
Contributor

huard commented Nov 2, 2021

Thanks @ScottWales for the detailed report and examples.

@aulemahal Any clue on what could be done about this?

@raphaeldussin
Contributor

@ScottWales thanks for the detailed issue; this is definitely problematic and we need to look into it ASAP!

@huard
Contributor

huard commented Nov 2, 2021

Here is a breakdown of the dask tasks:

Counter({'regrid_array': 60,
         'random_sample': 60,
         'array': 409600,
         'array-original': 1,
         'regrid_array_0': 60,
         'transpose': 60})
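
(For reference, a breakdown like this can be produced along these lines; a sketch, assuming `r` and `da` from the snippet in the issue description:)

from collections import Counter

graph = r(da).data.__dask_graph__()
names = [k[0] if isinstance(k, tuple) else k for k in graph]
print(Counter(name.rsplit('-', 1)[0] for name in names))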

array tasks look like this:

(<function dask.array.core.getter_inline(a, b, asarray=True, lock=None)>,
 'array-original-f73f3bf0064c05b0c2ef47d41a9077ed',
 (slice(0, 4050, None), slice(3568050, 3572100, None)),
 False,
 False)

and the array-original task is <COO: shape=(1036800, 6480000), dtype=float64, nnz=4147200, fill_value=0.0>.
So the 409600 array tasks each take a 4050x4050 slice of the COO sparse matrix: (1036800 / 4050) x (6480000 / 4050) = 256 x 1600 = 409600 slices.

My (limited) understanding is that dask chunks the sparse weights matrix as if it were dense.

@aulemahal
Collaborator

Argh... Sorry guys!

OK, so I did see that problem in the original PR about sparse, but I guess I forgot and we didn't discuss it. @huard spotted the issue: because of the tremendous size of the weight array (n_in x n_out), dask tries to rechunk it on the fly to limit the chunk size. It doesn't see that the array is sparse.

@ScottWales: the workaround is to configure dask to accept large chunks. Since the chunks are sparse, there is no memory explosion. For example:

from dask import config
config.set({'array.chunk-size': '500GiB'})

I am currently digging through dask's code and docs. I found the following:

The weights are transformed to a dask array in dask.array.apply_gufunc, which is called by xr.apply_ufunc when any input uses the dask backend. More precisely, this happens in dask.array.core.asarray.

There, dask doesn't recognize the sparse array as anything special and converts it to a dask array as if it were a numpy array. Thus, the size is computed as np.prod(shape) and the array is rechunked into smaller pieces. This is the problematic and useless part.

(This didn't happen before because we were passing the weights as a kwarg of apply_ufunc instead of as a normal input. That had the ill effect of copying the weights as many times as there were regrid_array tasks, which is why we added the sparse backend.)

The solution is to have dask measure the size of the array from the number of non-zero elements, not from its shape. I see a few ways forward:

  1. Implement something in dask to recognize "sparse" arrays (sparse, but also scipy) and choose the appropriate chunking accordingly.
  2. Implement to_dask_array() on sparse.SparseArray to handle this (dask's asarray will detect it).
  3. Handle this in xarray (in xr.core.computation.apply_variable_ufunc)
  4. Handle this ourselves, simply by detecting that the regridding input uses dask and then creating the weights array ourselves, using a single chunk.

I personally believe (2) is the cleanest way, but we have to ask people at sparse. (4) would solve our problem quickly as it is easy to implement.
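
A minimal sketch of what (4) could look like, with toy data (names here are illustrative, not xesmf's actual code): wrapping the sparse weights in a dask array with a single chunk keeps dask from slicing it as if it were dense.

import dask.array as dsa
import numpy as np
import sparse

# Toy sparse weights; in xesmf these would be the (n_out x n_in) regridding weights.
coords = np.array([[0, 1, 2], [0, 1, 2]])
weights = sparse.COO(coords, data=np.ones(3), shape=(4, 4))

# One chunk covering the whole array: no per-slice 'array' tasks are generated.
weights_dask = dsa.from_array(weights, chunks=weights.shape, asarray=False)
print(weights_dask.npartitions)  # 1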

@raphaeldussin
Contributor

@aulemahal since the issue seems to come from the transition from scipy to sparse, wouldn't a solution be to revert to scipy.sparse as the default and add sparse as an optional backend while we figure out a proper fix, either in xesmf or in the dependencies?

@aulemahal
Collaborator

If that's simpler, sure.

I think it makes no sense to use sparse as an optional backend, since it is itself backed by scipy. The main win from going to sparse is being able to keep the weights in a DataArray and pass them as an argument to the apply_ufunc call. IMO, keeping this possible (with an optional dependency, meaning two regrid_dataarray algorithms) would be far less elegant than simply working around the bug raised here ourselves.

Going back to scipy.sparse means carrying a full copy of the weights in every regrid_array task of the dask tree. Evidently, that problem is rarer than the one raised here (which is systematic).

@dcherian
Contributor

If you're going down the sparse route, I would try using dask.array.tensordot directly, or xr.dot, instead of apply_gufunc. You might run into dask/dask#7980, but in theory that could be fixed easily.

That said, this issue is pretty clearly a dask bug when it autochunks inputs to apply_gufunc, so it would be good to open an issue there.
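
A rough illustration of the tensordot call pattern, with dense toy arrays just to show the shape logic (the real weights would be sparse, which is where dask/dask#7980 may come into play):

import dask.array as dsa
import numpy as np

n_in, n_out, n_time = 8, 4, 6
weights = dsa.from_array(np.random.rand(n_out, n_in), chunks=(n_out, n_in))  # single chunk
data = dsa.random.random((n_time, n_in), chunks=(2, n_in))

# Contract over the flattened spatial dimension; result has shape (n_time, n_out).
out = dsa.tensordot(data, weights, axes=([1], [1]))
print(out.compute().shape)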

@raphaeldussin
Contributor

@dcherian thanks very much for this valuable feedback, @aulemahal do you have time to look into this?

@aulemahal
Collaborator

@dcherian @raphaeldussin This is indeed very promising! I am currently testing this new approach and am able to remove the need to call xr.apply_ufunc with dask='parallelized'. Instead, the Regridder._regrid_array function is now "polymorphic" and treats dask and numpy arrays alike. As a side effect, this forces me to wrap the weights into a dask array manually, and thus I can make it a single chunk.

I am continuing my tests, but this is interesting! I expect a somewhat larger task graph than before (0.6.0) for DataArrays, since apply_ufunc was wrapping a few operations into one task. Handling dask arrays directly will "expose" all those operations, but I don't expect this to be problematic in the way raised here. (Those operations are the reshapes in apply_weights and the extra handling when skipna=True.)
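
As an illustration only (not the actual xesmf implementation), a polymorphic helper could look roughly like this, with dense numpy weights for simplicity and the weights wrapped as a single-chunk dask array on the dask path:

import dask.array as dsa
import numpy as np

def regrid_array(indata, weights, shape_out):
    """Apply a (n_out, n_in) weight matrix to data whose last two dims are (y, x)."""
    flat = indata.reshape(indata.shape[:-2] + (-1,))   # collapse (y, x) -> n_in
    ax = flat.ndim - 1                                  # axis to contract over
    if isinstance(flat, dsa.Array):
        w = dsa.from_array(weights, chunks=weights.shape)  # keep weights as a single chunk
        out = dsa.tensordot(flat, w, axes=([ax], [1]))
    else:
        out = np.tensordot(flat, weights, axes=([ax], [1]))
    return out.reshape(out.shape[:-1] + shape_out)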
