Dask-specific methods #97

Open · TomNicholas opened this issue May 19, 2022 · 6 comments · May be fixed by #196

Labels: enhancement (New feature or request), help wanted (Extra attention is needed)

@TomNicholas (Collaborator)

xr.Dataset implements a bunch of dask-specific methods, such as __dask_tokenize__ and __dask_graph__. It also obviously has public methods that involve dask such as .compute() and .load().

In DataTree on the other hand, I haven't yet implemented any methods like these, or even written any tests that involve dask! You can probably still use dask with datatree right now, but from dask's perspective the datatree is presumably merely a set of unconnected Dataset objects.

We could choose to implement methods like .load() as just a mapping over the tree, i.e.

def load(self):
    # eagerly load every node's data, one node at a time
    for node in self.subtree:
        if node.has_data:
            node.ds.load()

but really this should probably wait for #41, or be done as part of that refactor.

I don't really understand what the double-underscore methods do though yet, so would appreciate input on that.

@dcherian

understand what the double-underscore methods do though yet

https://docs.dask.org/en/stable/custom-collections.html

Xarray objects satisfy this Collections protocol, so you can do dask.tokenize(xarray_thing), dask.compute(xarray_thing) etc (visualize, persist).
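
For reference, a minimal usage sketch (not from this thread) of what satisfying the collection protocol buys you, assuming a dask-backed Dataset built from a chunked dask array:

import dask
import dask.array as da
import xarray as xr

# A dask-backed Dataset (one chunked variable)
ds = xr.Dataset({"a": (("x",), da.arange(10, chunks=5))})

# Because xr.Dataset implements the dask collection protocol
# (__dask_graph__, __dask_keys__, __dask_postcompute__, ...),
# the generic dask functions accept it directly:
token = dask.base.tokenize(ds)    # deterministic hash via __dask_tokenize__
(loaded,) = dask.compute(ds)      # builds and executes the merged graph, returns a 1-tuple
(persisted,) = dask.persist(ds)   # same, but keeps the results as in-memory dask arrays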

@pbranson

I have just discovered datatree and wonder if it helps address performance issues I have encountered with dask, mostly due to exploding task graphs when working with very large datasets stored across many netCDF files or very large, deeply nested Zarr arrays with many chunks.

My solution was to implement a basic tree in an intake driver that, for a 20-year dataset, uses monthly aggregates (of daily rasters) stored as kerchunk JSON with a delayed open_and_(optionally)load method. The complete dataset representation and graph is readily constructed at this coarser granularity, and then Dataset subsetting and loading of the monthly aggregates can occur on the workers, in a distributed fashion.

I tried to make use of finer-grained parallelism by then using the threads scheduler within a worker, but ran into issues, so I got the best performance using many single-threaded workers (a bit like the lambda examples I saw with pywren). The earlier prototype code and performance tests are in this notebook: https://github.com/pbranson/pixeldrill/blob/main/notebooks/access_via_aggindex.ipynb

Is there, in a sense, some overlap between this library and kerchunk and is there a logical point for them to interface?

Perhaps there is a more native way to handle this in dask that I need to learn about, one that encapsulates some kind of dynamic graph generation and nested distributed scheduling that doesn't need to be coordinated back to the central scheduler?

@TomNicholas (Collaborator, Author)

@pbranson I think datatree's IO is less advanced than you are imagining.

At the moment all we do is look at a netCDF file / Zarr Store, iterate over the groups, open each group using xr.open_dataset, and arrange all the groups into a tree. We don't do anything else with dask graphs, chunking, loading, or intake.
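
To illustrate that current behaviour, here is a minimal sketch, assuming a hypothetical file simulations.nc containing groups named run1, run2, etc.:

from datatree import open_datatree

# Each group in the file becomes a node whose dataset comes from a
# separate xr.open_dataset call on that group.
dt = open_datatree("simulations.nc")
print(dt)             # the tree of groups
print(dt["run1"].ds)  # the Dataset opened for group "run1"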

Is there, in a sense, some overlap between this library and kerchunk and is there a logical point for them to interface?

I am not (yet) very familiar with kerchunk, but I think they do pretty different things. My understanding is that kerchunk allows you to open data as if it were chunked in a different pattern from how it is actually chunked on disk. Datatree does nothing special with chunks, it just arranges the results of different open_dataset calls into a convenient tree structure.

If you can use kerchunk in open_dataset, we could probably also use it in datatree, but I think the logical place to ask about integration would be on the xarray main repo.

@pbranson

pbranson commented Jul 30, 2022

Thanks @TomNicholas and sorry for creating issue noise. I guess I got a bit carried away with these comments in the readme:

  • Has functions for mapping user-supplied functions over every node in the tree,
  • Automatically dispatches some of xarray.Dataset's API over every node in the tree (such as .isel),

I was thinking that maybe the datatree abstraction could be a more formalised and ultimately 'xarray native' approach to the problems that have been tackled by e.g. intake-esm and intake-thredds. Leaves in the tree could be compositions over netCDF files, which may be aggregated JSON indexes. I guess I was thinking that some sort of formalism over a nested data structure could help with dask computational graph composition. I have run into issues where the scheduler gets overloaded, or just takes forever to start, for calculations across large datasets composed with e.g. open_mfdataset.

I wonder if @andersy005, @mdurant or @rsignell-usgs have any experience or thoughts about if it makes any sense for an interface between this library and intake?

@TomNicholas (Collaborator, Author)

@pbranson you're asking great questions but I'm going to move this discussion over to #134 so as not to confuse it with the original purpose of this issue.

@darothen linked a pull request (#196) on Jan 13, 2023 that will close this issue.
@slevang (Contributor)

slevang commented Nov 15, 2023

I have a use case with a tree where all the nodes share the initial I/O ops and many intermediate compute tasks, and I noticed how slow it is to map over the nodes sequentially, which is how e.g. compute() currently operates.

I don't see this implemented in #196, so wanted to note a very simple optimization for these methods: combine all the nodes into a Python builtin collection (easily done with DataTree.to_dict()) and pass that to a single dask.compute call, e.g. something like:

def compute(self):
    # dask.compute returns a tuple of results, hence the [0]
    return self.from_dict(dask.compute(self.to_dict())[0])

Probably a little faster to first filter out non-dask nodes. This is exactly what Dataset.compute() does:
https://github.com/pydata/xarray/blob/141147434cb1f4547ffff5e28900eeb487704f08/xarray/core/dataset.py#L841-L851
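
A hedged sketch of that filtering variant, assuming to_dict()/from_dict() round-trip node datasets by path and that Dataset.chunks is non-empty only for dask-backed nodes:

import dask

def compute(self):
    # Sketch only: compute all dask-backed nodes in one dask.compute call,
    # leave already-loaded nodes untouched, then rebuild the tree.
    nodes = self.to_dict()                                        # {path: Dataset}
    lazy = {path: ds for path, ds in nodes.items() if ds.chunks}  # dask-backed only
    (computed,) = dask.compute(lazy)                              # returns a 1-tuple
    nodes.update(computed)
    return self.from_dict(nodes)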
