Added support for dask arrays in GridInterface #2305

Merged
merged 1 commit into master from dask_grid on Mar 7, 2018

Conversation

@philippjfr
Member

philippjfr commented Feb 5, 2018

As requested in #2176, this PR adds support for using dask arrays with the gridded interface. This turned out to be completely trivial (as you can see, most of the new code is tests) and lets users use dask arrays without relying on the heavier xarray dependency. @jakirkham Would you mind testing this and seeing whether it addresses your requirements?

@jakirkham

Contributor

jakirkham commented Feb 12, 2018

Thanks @philippjfr. Sorry for getting back to you so late. How would I use this to, say, instantiate an hv.Dataset using a Dask Array?

@philippjfr

Member

philippjfr commented Feb 12, 2018

It should work something like this:

darr = da.from_array(np.random.rand(10, 20))

# Tuple constructor
hv.Dataset((range(20), range(10), darr), ['x', 'y'], 'z')

# Dict constructor
hv.Dataset({'x': range(20), 'y': range(10), 'z': darr}, ['x', 'y'], 'z')

Basically, you declare coordinate arrays describing the values along each axis of your dask array and then pass everything in as a tuple or dictionary. The order of key dimensions should be the reverse of the order of the array's axes.

@jakirkham

Contributor

jakirkham commented Feb 13, 2018

Thanks for the tips. Managed to get that to work.

Minor note: from_array needs a chunks argument
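For reference, a self-contained version of the earlier snippet with the chunks argument included might look like this (the chunk sizes here are arbitrary, just an illustration):

import numpy as np
import dask.array as da
import holoviews as hv

# 10 rows (y) by 20 columns (x), split into a few chunks
darr = da.from_array(np.random.rand(10, 20), chunks=(5, 10))

# Key dimensions are declared in reverse order of the array axes:
# the last axis (length 20) maps to 'x', the first (length 10) to 'y'.
ds = hv.Dataset((range(20), range(10), darr), ['x', 'y'], 'z')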

@jakirkham

Contributor

jakirkham commented Feb 26, 2018

Anything else needed before this can go in?

@philippjfr

Member

philippjfr commented Feb 26, 2018

The Dataset tests for the aggregate method were highlighting some issues with using numpy functions to aggregate the dask arrays. I haven't actually looked into what the problem is, though.

@philippjfr

Member

philippjfr commented Feb 26, 2018

Looks like the errors were in the test code itself:

  File "/home/travis/build/ioam/holoviews/tests/testdataset.py", line 1522, in test_dataset_2D_aggregate_partial_hm
    Dataset({'x':self.xs, 'z': np.mean(array, axis=0).compute()},
  File "/home/travis/miniconda/envs/test-environment/lib/python2.7/site-packages/numpy/core/fromnumeric.py", line 2937, in mean
    return mean(axis=axis, dtype=dtype, out=out, **kwargs)
TypeError: mean() got an unexpected keyword argument 'out'
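Presumably the fix on the test side (a sketch, not the actual change) is to call the reduction as a method of the dask array, so numpy does not forward the unsupported out= keyword:

# np.mean(array, axis=0) dispatches through numpy's fromnumeric.mean,
# which forwards out=None; calling the dask method directly avoids that.
expected = array.mean(axis=0).compute()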

@philippjfr

Member

philippjfr commented Mar 7, 2018

This is ready to merge once tests pass.

@philippjfr philippjfr added this to the v1.10 milestone Mar 7, 2018

@@ -301,6 +310,8 @@ def values(cls, dataset, dim, expanded=True, flat=True):
         if dim in dataset.vdims or dataset.data[dim.name].ndim > 1:
             data = dataset.data[dim.name]
             data = cls.canonicalize(dataset, data)
+            if da and isinstance(data, da.Array):
+                data = data.compute()

@jakirkham

jakirkham Mar 7, 2018

Contributor

When does this get called, and on what data is it being called?

@philippjfr

philippjfr Mar 7, 2018

Member

This should only be called during plotting, and only on the subset of the data that you are actually displaying. If you use a datashader operation, this won't end up being called at all, and all the processing should happen out of core.
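As a rough illustration of that datashader path (the regrid operation and the element choice here are my assumptions, not part of this PR), something along these lines keeps the aggregation out of core:

import numpy as np
import dask.array as da
import holoviews as hv
from holoviews.operation.datashader import regrid

darr = da.from_array(np.random.rand(1000, 2000), chunks=(250, 500))
ds = hv.Dataset((range(2000), range(1000), darr), ['x', 'y'], 'z')

# regrid aggregates the dask-backed Image with datashader at display
# time, so (per the comment above) the compute() call in the diff
# should never be hit for the full array.
img = regrid(ds.to(hv.Image, ['x', 'y']))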

@jakirkham

jakirkham Mar 7, 2018

Contributor

Ok, thanks for clarifying. Sounds reasonable.

Does this get cached?

@philippjfr

philippjfr Mar 7, 2018

Member

No, but if you've got ideas about caching I'd love to hear them. I'm currently running into issues with some remote datasets, which end up being downloaded multiple times.

@mrocklin

mrocklin Mar 7, 2018

For the local scheduler there is http://dask.pydata.org/en/latest/caching.html

No opportunistic caching exists today for the distributed scheduler. It wouldn't be particularly hard to do, though, if someone wants to dive in. Relevant issue here: dask/distributed#681

@jakirkham

jakirkham Mar 7, 2018

Contributor

Naively, I was thinking we might be able to call persist here. It would still be blocking in some cases (single-machine scheduler), but no worse than calling compute. Though, given we would want the array as well (in cases where we would get Futures), we would need somewhere to hang on to the Futures. Perhaps in Holoviews (i.e. an object of some kind), a Distributed Client (if using Distributed), or somewhere else?

@philippjfr

philippjfr Mar 7, 2018

Member

Is the use case here a multi-dimensional array which should be loaded into memory incrementally as you are exploring it? For example, let's say you had a stack of images with x, y and z axes and you are browsing through the z-stack: do you want each 2D array to persist in memory once it has been displayed? I think that would be a very useful option, but I'd have to think about the best approach to support it.

Currently our interfaces are stateless, but I could imagine passing a persist option through to the interfaces in the constructor, which would tell them to call .persist() before calling .compute(). To make this concrete:

darr = da.from_array(np.random.rand(30, 20, 20), (1, 1, 3))
ds = hv.Dataset((range(20), range(20), range(30), darr),
                ['x', 'y', 'z'], 'Value', persist=True)

z_stack = ds.to(hv.Image, ['x', 'y'])
z_stack[10]['Value']

In this example we create the 3D array, use .to to group it by the z-axis, fetch the 2D array corresponding to z-value 10, and then use getitem to load the 'Value' array into memory. If we add the persist flag, the array would then stay in memory (ignoring the fact that this example is in memory anyway) and subsequent access to z_stack[10]['Value'] would be much faster. Is this what you had in mind? If so, we'll need to discuss whether interfaces could/should have state for backend-specific options like this.
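Purely as a sketch of that idea (the persist flag and the helper below are hypothetical, not existing HoloViews API):

import dask.array as da

def materialize(arr, persist=False):
    # Hypothetical helper: optionally keep the dask array resident
    # before turning the requested data into a concrete numpy array.
    if isinstance(arr, da.Array):
        if persist:
            arr = arr.persist()  # chunks stay in (distributed) memory
        return arr.compute()
    return arr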

@mrocklin

mrocklin Mar 7, 2018

I think that medium-term this is the sort of thing that should be handled on the Dask side. We're well-positioned to solve this with minimal code-complexity elsewhere. Again, you can probably do this on the local (non-distributed) scheduler today.

@mrocklin

mrocklin Mar 7, 2018

It would be interesting to see how calling these lines ahead of time affected performance:

from dask.cache import Cache

cache = Cache(2e9)  # opportunistic cache using up to ~2 GB of memory
cache.register()    # register() returns None, so keep the Cache object separately

@jakirkham

jakirkham Mar 7, 2018

Contributor

To answer your question, @philippjfr, yes, basically as you described. We are looking at time instead of z (and the axis order differs), but for all intents and purposes this difference is merely semantic as far as Holoviews is concerned.

FWIW, as @mrocklin points out, we do have other options on the Dask side that could be helpful here or could be extended a bit (e.g. Cache support on Distributed). Issue dask/distributed#681 outlines this problem a bit more. Alternatively, there are some existing Dask options which could allow this to be solved in user code.

There's a separate discussion that is a bit more interesting, which essentially involves having something like Holoviews drive computational priority in Dask. IOW, one requests that a large array be persisted (in memory, with persist, or on disk, with return_stored=True), but would like to prioritize computations based on what a user wants to look at first. This is outlined a bit more in dask/distributed#1753.

@jakirkham

Contributor

jakirkham commented Mar 7, 2018

Thanks @philippjfr. Had one question above. Otherwise LGTM.

@jlstevens

Member

jlstevens commented Mar 7, 2018

Looks good to me as well and I am happy to see a good number of unit tests. I'm also happy to see that compute only needs to be called in one place. Merging.

@jlstevens jlstevens merged commit 8771c81 into master Mar 7, 2018

4 checks passed

continuous-integration/travis-ci/pr: The Travis CI build passed
continuous-integration/travis-ci/push: The Travis CI build passed
coverage/coveralls: First build on dask_grid at 82.383%
s3-reference-data-cache: Test data is cached.
@philippjfr

Member

philippjfr commented Mar 7, 2018

happy to see a good number of unit tests

Note that what you are seeing is only a small fraction of the unit tests; it's running the entire dataset test suite, which now includes about 130 tests.

@jakirkham

Contributor

jakirkham commented Mar 7, 2018

Thanks all. :)

@jakirkham

Contributor

jakirkham commented Mar 7, 2018

@mrocklin, this may be of interest. ;)

@philippjfr

Member

philippjfr commented Mar 7, 2018

Going to have to follow up on this; it seems the groupby implementation is loading all the data into memory.

@@ -491,7 +502,10 @@ def sample(cls, dataset, samples=[]):
                 data[d].append(arr)
             for vdim, array in zip(dataset.vdims, arrays):
                 flat_index = np.ravel_multi_index(tuple(int_inds)[::-1], array.shape)
-                data[vdim.name].append(array.flat[flat_index])
+                if da and isinstance(array, da.Array):
+                    data[vdim.name].append(array.flatten()[tuple(flat_index)])

@jakirkham

jakirkham Mar 7, 2018

Contributor

I think the indexing is the issue here. If you switch to vindex (IIUC what flat_index means), this should be lazy.
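Roughly what that suggestion amounts to (assuming flat_index holds flat positions into array, as in the hunk above):

import numpy as np

# Unravel the flat positions into per-axis coordinates and use vindex
# for pointwise selection; dask handles this lazily, without the
# expensive reshape implied by .flatten().
idx = np.unravel_index(flat_index, array.shape)
sampled = array.vindex[idx]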

@philippjfr

philippjfr Mar 9, 2018

Member

Thanks, the same issue also applied in select, which was causing most of the problems. Followed up in #2424.
