Integration with dask/distributed (xarray backend design) #798

Closed
2 tasks done
pwolfram opened this issue Mar 21, 2016 · 59 comments

Comments

@pwolfram
Contributor

pwolfram commented Mar 21, 2016

Dask (https://github.com/dask/dask) currently provides on-node parallelism for medium-size data problems. However, analyzing large climate data sets is a big data problem that requires multi-node parallelism. A likely solution to this issue is integration of distributed (https://github.com/dask/distributed) with dask. Distributed is now integrated with dask and its benefits are already starting to be realized, e.g., see http://matthewrocklin.com/blog/work/2016/02/26/dask-distributed-part-3.

Thus, this issue is designed to identify, at a high level, the steps needed to perform this integration. As stated by @shoyer, it will

definitely require some refactoring of the xarray backend system to make this work cleanly, but that's
OK -- the xarray backend system is indicated as experimental/internal API precisely because we
hadn't figured out all the use cases yet.

To be honest, I've never been entirely happy with the design we took there (we use inheritance rather
than composition for backend classes), but we did get it to work for our use cases. Some refactoring
with an eye towards compatibility with dask distributed seems like a very worthwhile endeavor. We
do have the benefit of a pretty large test suite covering existing use cases.

Thus, we have the chance to make xarray big-data capable as well as provide improvements to the backend.

To this end, I'm starting this issue to help begin the design process following the xarray mailing list discussion some of us have been having (@shoyer, @mrocklin, @rabernat).

Task To Do List:

@pwolfram
Contributor Author

See also dask/dask#922

@mrocklin
Contributor

Copying over a comment from that issue:

Yes, so the problem as I see it is that, for serialization and open-file reasons we want to use a function like the following:

import netCDF4

def get_chunk_of_array(filename, datapath, slice):
    # Open and close the file on every call, so no open handle needs to be serialized
    with netCDF4.Dataset(filename) as f:
        return f.variables[datapath][slice]

However, this opens and closes many files, which, while robust, is slow. We can alleviate this by maintaining an LRU cache in a global variable so that it is created separately per process.

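import netCDF4
from toolz import memoize

# LRUDict is a placeholder for an LRU mapping that calls on_eviction on dropped values
cache = LRUDict(size=100, on_eviction=lambda file: file.close())

netCDF4_Dataset = memoize(netCDF4.Dataset, cache=cache)

def get_chunk_of_array(filename, datapath, slice):
    f = netCDF4_Dataset(filename)  # reuses the open file if it is still cached
    return f.variables[datapath][slice]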

I'm happy to supply the memoize function with toolz and an appropriate LRUDict object with other microprojects that I can publish if necessary.
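As a rough illustration of the caching idea above, here is a minimal, standard-library-only sketch. It is not the toolz/LRUDict code referred to in the comment: `LRUFileCache`, `file_cache`, and `get_chunk_of_file` are hypothetical names, and plain binary files stand in for netCDF datasets so the example runs without netCDF4.

```python
from collections import OrderedDict


class LRUFileCache:
    """Keep at most `maxsize` files open and close the least recently used."""

    def __init__(self, opener, maxsize=100):
        self.opener = opener          # would be netCDF4.Dataset in the real use case
        self.maxsize = maxsize
        self._files = OrderedDict()   # filename -> open handle, oldest first

    def get(self, filename):
        if filename in self._files:
            self._files.move_to_end(filename)        # mark as most recently used
            return self._files[filename]
        handle = self.opener(filename)
        self._files[filename] = handle
        while len(self._files) > self.maxsize:
            _, stale = self._files.popitem(last=False)  # evict the oldest entry
            stale.close()
        return handle


# A module-level instance gives one cache per process (assumed size of 2 for the demo).
file_cache = LRUFileCache(lambda fn: open(fn, "rb"), maxsize=2)


def get_chunk_of_file(filename, start, stop):
    """Stand-in for get_chunk_of_array: return bytes [start, stop) of a file."""
    f = file_cache.get(filename)
    f.seek(start)
    return f.read(stop - start)
```

Only the filename crosses process boundaries here; each worker process builds its own handles on first use.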

We would then need to use such a function within the dask.array and xarray codebases.

Anyway, that's one approach. Thoughts welcome.

@pwolfram
Contributor Author

Here is an example of a use case for a nanmean over ensembles in collaboration with @mrocklin and following http://matthewrocklin.com/blog/work/2016/02/26/dask-distributed-part-3: https://gist.github.com/mrocklin/566a8d5c3f6721abf36f

@pwolfram
Contributor Author

@shoyer and @mrocklin, I've updated the summary above in the PR description with a to do list. Do either of you see any obvious tasks I missed on the list in the PR description? If so, can you please update the to do list so that I can see what needs to be done to modify the backend for the dask/distributed integration?

@pwolfram
Contributor Author

Repeating @mrocklin:

Dask.array writes data to any object that supports numpy style setitem syntax like the following:

dataset[my_slice] = my_numpy_array

Objects like h5py.Dataset and netcdf objects support this syntax.

So dask.array would work today without modification if we had such an object that represented many netcdf files at once and supported numpy-style setitem syntax, placing the numpy array properly across the right files. This work could happen easily without deep knowledge of either project.
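To make the "one object over many files" idea concrete, here is a small sketch under simplifying assumptions: the data is one-dimensional, only contiguous slices are handled, and plain Python lists stand in for per-file netCDF variables. `MultiFileTarget` is a hypothetical name, not an existing dask or xarray class.

```python
class MultiFileTarget:
    """Present several 1-D stores as one long array that supports slice assignment."""

    def __init__(self, parts):
        self.parts = parts
        # Global start offset of each part; the last entry is the total length.
        self.offsets = [0]
        for part in parts:
            self.offsets.append(self.offsets[-1] + len(part))

    def __setitem__(self, key, values):
        if isinstance(key, tuple):        # accept a 1-tuple of slices as well
            (key,) = key
        start, stop, step = key.indices(self.offsets[-1])
        if step != 1:
            raise NotImplementedError("only contiguous slices are handled here")
        if len(values) != max(stop - start, 0):
            raise ValueError("values do not match the length of the slice")
        for i, part in enumerate(self.parts):
            lo, hi = self.offsets[i], self.offsets[i + 1]
            a, b = max(start, lo), min(stop, hi)   # overlap of the slice with this part
            if a < b:
                part[a - lo:b - lo] = values[a - start:b - start]
```

A write such as `target[2:5] = [7, 8, 9]` is split at the file boundary, so each backing store only receives the portion that belongs to it.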

Alternatively, we could make the dask.array.store function optionally lazy so that users (or xarray) could call store many times before triggering execution.

@pwolfram
Contributor Author

This issue of connecting to dask/distributed may also be connected with #463, #591, and #524.

@mrocklin
Contributor

I believe that robustly supporting HDF/NetCDF reads with the mechanism mentioned above will resolve most problems from a dask.array perspective. I have no doubt that other things will arise though. Switching from shared to distributed memory always comes with (surmountable) obstacles.

@shoyer
Member

shoyer commented Mar 25, 2016

I agree with @mrocklin that the LRUCache for file-like objects should take care of things from the dask.array perspective. It should also solve #463 in a very clean way. We'll just need to reorganize things a bit to make use of it.

@pwolfram
Contributor Author

Thanks @shoyer. If you can provide some guidance on bounds for the reorganization that would be really great. I want your and @jhamman's feedback on this before I try a solution. The trick is just to make the time, as always, and I may have some time this coming weekend.

@pwolfram
Contributor Author

pwolfram commented Apr 2, 2016

Another note in support of this PR, especially "robustly support HDF/NetCDF reads": I am having problems with NetCDF: HDF error as previously reported by @rabernat in #463. Thus, a solution here will save time and may arguably be on the critical path of some workflows because fewer jobs will fail and require baby-sitting/restarts, especially when dealing with running multiple jobs.

@mrocklin
Contributor

mrocklin commented Apr 2, 2016

FWIW I've uploaded a tiny LRU dict implementation to a new zict project (which also has some other stuff):

http://zict.readthedocs.org/en/latest/

pip install zict
from zict import LRU
d = LRU(100, dict())

There are a number of good alternatives out there though for LRU dictionaries.

@pwolfram
Contributor Author

pwolfram commented Apr 4, 2016

Thanks @mrocklin! This has been really helpful and was what I needed to get going.

A prelim design I'm seeing is to modify the NetCDF4DataStore class https://github.com/pydata/xarray/blob/master/xarray/backends/netCDF4_.py#L170 to meet these requirements:

  1. At __init__, try to open file via the LRU cache. I think the LRU dict has to be a global because the file restriction is an attribute of the system, correct?
  2. For each read from a file, ensure it hasn't been closed via a @ds.getter property method. If so, reopen it via the LRU cache. This is ok because for a read the file is essentially read-only. The LRU closes out stale entries to prevent the too many open file errors. Checking this should be fast.
  3. sync is only used for writes, but it seems like it should follow the above approach.

A clean way to do this is just to make sure that each time self.ds is called, it is re-validated via the LRU cache. This should be able to be implemented via property getter methods https://docs.python.org/2/library/functions.html#property.

Unless I'm missing something big, I don't think this change will require a large refactor but it is quite possible I overlooked something important. @shoyer and @mrocklin, do you see any obvious pitfalls in this scope? If not, it shouldn't be too hard to implement.

@fmaussion
Member

Sorry if I am just producing noise here (I am not a specialist), but I have two naive questions:

To 1. how will you handle concurrent access to the LRU cache if it's a global variable?

To 2. Once the file has been closed by the LRU, won't it also be erased from it? So that a simple if file in LRU: could suffice as a test if the file has been closed or not?

@shoyer
Member

shoyer commented Apr 4, 2016

I think the LRU dict has to be a global because the file restriction is an attribute of the system, correct?

Correct, the LRU dict should be global. I believe the file restriction is generally per-process, and creating a global dict should assure that works properly.
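For reference, on Unix-like systems the per-process open-file limit can be inspected with the standard `resource` module. The sketch below derives a cache size from it; `suggested_cache_size`, the `reserve` margin, and the `fallback` value are assumptions made for illustration, not part of xarray or dask.

```python
import resource  # available on Unix-like systems only


def suggested_cache_size(reserve=64, fallback=100):
    """Pick a file-cache size that stays below this process's open-file limit."""
    soft, _hard = resource.getrlimit(resource.RLIMIT_NOFILE)
    if soft == resource.RLIM_INFINITY:
        return fallback                 # no limit reported, use an arbitrary default
    return max(1, soft - reserve)       # leave room for sockets, logs, and so on
```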

For each read from a file, ensure it hasn't been closed via a @ds.getter property method. If so, reopen it via the LRU cache. This is ok because for a read the file is essentially read-only. The LRU closes out stale entries to prevent the too many open file errors. Checking this should be fast.

The challenge is that we only call the .get_variables() method (and hence self.ds) once on a DataStore when a Dataset is opened from disk. I think we need to refactor NetCDF4ArrayWrapper to take a filename instead, and use something like @mrocklin's netcdf_Dataset.
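A sketch of what "take a filename instead" could look like, under loud assumptions: `LazyFileArray` is a hypothetical stand-in for the array wrapper, and a text file with one number per line stands in for a netCDF variable. In the design discussed here, the `open` call would presumably go through a memoized opener rather than reopening the file each time.

```python
class LazyFileArray:
    """Index into a file by name; a handle exists only while data is being read."""

    def __init__(self, filename):
        self.filename = filename   # the only state, so instances pickle cleanly

    def __getitem__(self, key):
        # Opened per access in this sketch; a cached opener could be swapped in.
        with open(self.filename) as f:
            values = [float(line) for line in f]
        return values[key]
```

Since the wrapper carries no open handle, it survives `pickle.dumps` and `pickle.loads`, which is what shipping a task to another process requires.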

My bigger concern was how to make use of a method like futures_to_dask_arrays. But it looks like that may actually not be necessary, at least if we are happy to open all netCDF files (and read out the metadata) from a master process.

@pwolfram
Contributor Author

pwolfram commented Apr 4, 2016

Just to be clear, we are talking about this https://github.com/mrocklin/hdf5lazy/blob/master/hdf5lazy/core.py#L83 for @mrocklin's netcdf_Dataset, right?

@pwolfram
Contributor Author

pwolfram commented Apr 4, 2016

@fmaussion, for

  1. The LRU cache should be used serially for the read initially, but something more like @mrocklin's netcdf_Dataset appears to be needed as @shoyer points out. I need to think about this more.
  2. I was thinking we would keep track of the file name outside the LRU and only use the filename to open up datasets inside the LRU if they aren't already open. Agreed that if file in LRU should designate whether the file is open.

@pwolfram
Contributor Author

pwolfram commented Apr 4, 2016

@shoyer, if we are happy to open all netCDF files and read out the metadata from a master process that would imply that we would open a file, read the metadata, and then close it, correct?

Array access should then follow something like @mrocklin's netcdf_Dataset approach, right?

@shoyer
Member

shoyer commented Apr 4, 2016

@pwolfram I was referring to this comment for @mrocklin's netCDF4_Dataset.

@shoyer
Member

shoyer commented Apr 4, 2016

@shoyer, if we are happy to open all netCDF files and read out the metadata from a master process that would imply that we would open a file, read the metadata, and then close it, correct?

Array access should then follow something like @mrocklin's netcdf_Dataset approach, right?

Yes, this is correct.

In principle, if we have a very large number of files containing many variables each, we might want to do the read in parallel using futures, and then use something like futures_to_dask_arrays to bring them together. That seems much trickier to integrate into our current backend approach.

@mrocklin
Contributor

It's probably best to avoid futures within xarray; so far they're only in the distributed memory scheduler. I think that ideally we create graphs that can be used robustly in either. I think that the memoized netCDF4_Dataset approach can probably do this just fine. Is there anything that is needed from me to help push this forward?

@kynan

kynan commented Oct 20, 2016

Has this issue progressed since?

Being able to distribute loading of files to a dask cluster and composing an xarray Dataset from data on remote workers would be a great feature.

Is @mrocklin's blog post from Feb 2016 still the reference for remote data loading on a cluster? Adapting it to loading xarray Datasets rather than plain arrays is not straightforward since there is no way to combine futures representing Datasets out of the box.

@mrocklin
Contributor

I haven't worked on this but agree that it is important.

@pwolfram
Contributor Author

@kynan, I'm still interested in this but have not had time to advance this further. Are you interested in contributing to this too?

I view this as a key component of future climate analysis workflows. This may also be something that is addressed at the upcoming hackathon at Columbia with @rabernat early next month.

Also, I suspect that both @mrocklin and @shoyer would be willing to continue to provide key advice because this appears to be aligned with their interests too (please correct me if I'm wrong in this assessment).

@mrocklin
Contributor

Definitely happy to support from the Dask side.

I think that the LRU method described above is feasible.

@mrocklin
Contributor

If XArray devs want to chat sometime I suspect we could hammer out a plan fairly quickly. My hope is that once a plan exists then a developer will arise to implement that plan. I'm free all of today and tomorrow.

@pwolfram
Contributor Author

@mrocklin, I would be happy to chat because I am interested in seeing this happen (e.g., eventually contributing code). The question is whether we need additional expertise from @shoyer, @jhamman, @rabernat etc who likely have a greater in-depth understanding of xarray than me. Perhaps this warrants an email to the wider list?

@mrocklin
Contributor

I agree that this conversation needs expertise from a core xarray developer. I suspect that this change is more likely to happen in xarray than in dask.array. Happy to continue the conversation wherever. I do have a slight preference to switch to real-time at some point though. I suspect that we can hash this out in a moderate number of minutes.

@shoyer
Member

shoyer commented Nov 8, 2016

We have something very hacky working with #1095

I'm also going to see if I can get something working with the LRU cache, since that seems closer to the solution we want eventually.

@mrocklin
Contributor

mrocklin commented Nov 8, 2016

FYI Dask is committed to maintaining this: https://github.com/dask/zict/blob/master/zict/lru.py

@shoyer
Member

shoyer commented Nov 8, 2016

One slight subtlety is writes. We'll need to switch from 'w' to 'a' mode the second time we open a file.
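The mode switch can be sketched as a small bookkeeping helper. `WriteModeTracker` and `mode_for` are hypothetical names, and the usage note relies on ordinary Python files rather than netCDF datasets.

```python
class WriteModeTracker:
    """Hand out 'w' the first time a file is opened for writing and 'a' afterwards."""

    def __init__(self):
        self._created = set()   # filenames already opened once with 'w'

    def mode_for(self, filename, requested):
        if requested != "w":
            return requested            # reads and appends pass through unchanged
        if filename in self._created:
            return "a"                  # reopening with 'w' would truncate earlier writes
        self._created.add(filename)
        return "w"
```

With a plain file, opening twice via `open(path, tracker.mode_for(path, "w"))` keeps the data from the first write, whereas using 'w' both times would discard it.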

@shoyer
Member

shoyer commented Nov 8, 2016

A few other thoughts on thread safety with the LRU approach:

  1. We need a global lock to ensure internal consistency of the LRU cache, and so that we don't overwrite files without closing them. It probably makes sense to put this in the memoize function.
  2. We need separate, per file locks, to ensure that we don't evict files in the process of reading or writing data from them (which would cause segfaults). As a stop-gap measure, we could simply refuse to evict files until we can acquire a lock, but more broadly this suggests that strict LRU is not quite right. Instead, we want to evict the least-recently-used unlocked item.
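The two points above can be sketched with the standard library alone. This is one possible reading, not the design that was eventually merged: `LockingFileCache` is a hypothetical name, an in-use counter is added so that a file cannot be evicted between lookup and lock acquisition, and when every cached file is busy the cache is allowed to exceed `maxsize` temporarily instead of blocking.

```python
import threading
from collections import OrderedDict
from contextlib import contextmanager


class LockingFileCache:
    def __init__(self, opener, maxsize=100):
        self.opener = opener
        self.maxsize = maxsize
        self._global_lock = threading.Lock()   # protects every attribute below
        self._handles = OrderedDict()          # filename -> handle, oldest first
        self._file_locks = {}                  # filename -> per-file lock
        self._in_use = {}                      # filename -> number of active users

    @contextmanager
    def acquire(self, filename):
        with self._global_lock:
            if filename in self._handles:
                self._handles.move_to_end(filename)
            else:
                self._handles[filename] = self.opener(filename)
                self._file_locks[filename] = threading.Lock()
                self._in_use[filename] = 0
            self._in_use[filename] += 1        # pin before releasing the global lock
            handle = self._handles[filename]
            file_lock = self._file_locks[filename]
            self._evict()
        try:
            with file_lock:                    # one reader or writer per file at a time
                yield handle
        finally:
            with self._global_lock:
                self._in_use[filename] -= 1

    def _evict(self):
        # Called with the global lock held: close the oldest entries nobody is using.
        for name in list(self._handles):
            if len(self._handles) <= self.maxsize:
                break
            if self._in_use[name] == 0:
                self._handles.pop(name).close()
                del self._file_locks[name], self._in_use[name]
```

Eviction walks entries from oldest to newest and skips any that are pinned, which is the "least-recently-used unlocked item" policy rather than strict LRU.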

@kynan

kynan commented Nov 8, 2016

Great to see this moving! I take it the workshop was productive?

How does #1095 work in the scenario of a distributed scheduler with remote workers? Do I understand correctly that all workers and the client would need to see the same shared filesystem from where NetCDF files are read?

@mrocklin
Contributor

mrocklin commented Nov 8, 2016

Yes.


@kynan

kynan commented Nov 22, 2016

When using xarray with the dask.distributed scheduler it would be useful to be able to persist intermediate DataArrays / Datasets on remote workers.

There could be a persist method analogous to the compute method introduced in #1024. Potential issues with this approach are:

  1. What are the semantics of this operation for the general case where dask or distributed are not used?
  2. Is it justified to add an operation which is rather specific to the distributed scheduler?

(Could create a separate issue for this if preferred).

@mrocklin
Contributor

mrocklin commented Nov 22, 2016

One solution is to create protocols on the Dask side to enable dask.distributed.Client.persist itself to work on XArray objects. This keeps the scheduler specific details like persist on the scheduler.

@shoyer
Member

shoyer commented Nov 28, 2016

@mrocklin Any thoughts on my thread safety concerns (#798 (comment)) for the LRU cache? I suppose the simplest thing to do is to simply refuse to evict a file until the per-file lock is released, but I can see that strategy failing pretty badly in edge cases.

@mrocklin
Contributor

A lock on the LRU cache makes sense to me.

We need separate, per file locks, to ensure that we don't evict files in the process of reading or writing data from them (which would cause segfaults). As a stop-gap measure, we could simply refuse to evict files until we can acquire a lock, but more broadly this suggests that strict LRU is not quite right. Instead, we want to evict the least-recently-used unlocked item

If it were me, I would just block on the evicted file until it becomes available (the stop-gap measure), and revisit that only if it became a performance problem.

@mrocklin mrocklin mentioned this issue Dec 20, 2016
shoyer added a commit to shoyer/xarray that referenced this issue Dec 22, 2016
…writing

Fixes pydata#1172

The serializable lock will be useful for dask.distributed or multi-processing
(xref pydata#798, pydata#1173, among others).
shoyer added a commit that referenced this issue Jan 4, 2017
…ing (#1179)

* Switch to shared Lock (SerializableLock if possible) for reading and writing

Fixes #1172

The serializable lock will be useful for dask.distributed or multi-processing
(xref #798, #1173, among others).

* Test serializable lock

* Use conda-forge for builds

* remove broken/fragile .test_lock
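
The commit message above refers to a serializable lock; dask provides one as `dask.utils.SerializableLock`. The following is a standard-library imitation of the general idea, not dask's code: `TokenLock` is a hypothetical name, and creation of new registry entries is not itself guarded against races. Only a token is pickled, and unpickling in a given process re-attaches to that process's lock for the same token.

```python
import threading
import uuid
from weakref import WeakValueDictionary


class TokenLock:
    _locks = WeakValueDictionary()   # token -> lock shared within this process

    def __init__(self, token=None):
        self.token = token or str(uuid.uuid4())
        lock = TokenLock._locks.get(self.token)
        if lock is None:
            lock = threading.Lock()
            TokenLock._locks[self.token] = lock
        self.lock = lock

    def __enter__(self):
        self.lock.acquire()
        return self

    def __exit__(self, *exc_info):
        self.lock.release()

    def __getstate__(self):
        return self.token            # only the token travels with the pickle

    def __setstate__(self, token):
        self.__init__(token)         # look up or create the local lock again
```

Two copies that share a token in the same process therefore exclude each other, while copies in different processes each get an independent local lock.
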
@pwolfram
Contributor Author

@mrocklin and @shoyer, we now have dask.distributed and xarray support. Should this issue be closed?

@mrocklin
Contributor

Has anyone used XArray on NetCDF data on cluster without resorting to any tricks?

@mrocklin
Contributor

mrocklin commented Jun 1, 2017

@shoyer regarding per-file locking this probably only matters if we are writing as well, yes?

Here is a small implementation of a generic file-open cache. I haven't yet decided on an eviction policy but either LRU or random (filtered by closeable files) should work OK.

from collections import defaultdict
from contextlib import contextmanager
import threading

import h5py

class OpenCache(object):
    def __init__(self, maxsize=100):
        self.refcount = defaultdict(lambda: 0)  # key -> number of active users
        self.maxsize = maxsize
        self.cache = {}
        self.i = 0
        self.lock = threading.Lock()

    @contextmanager
    def open(self, myopen, fn, mode='r'):
        assert 'r' in mode  # reads only for now
        key = (myopen, fn, mode)
        with self.lock:
            try:
                file = self.cache[key]
            except KeyError:
                file = myopen(fn, mode=mode)
                self.cache[key] = file

            self.refcount[key] += 1

            if len(self.cache) > self.maxsize:
                # Clear old files intelligently
                pass  # eviction policy not yet decided

        try:
            yield file
        finally:
            with self.lock:
                self.refcount[key] -= 1

cache = OpenCache()
with cache.open(h5py.File, 'myfile.hdf5') as f:
    x = f['/data/x']
    y = x[:1000, :1000]
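
One way the "Clear old files intelligently" step might be filled in with the random policy mentioned above is sketched here. `evict_closeable` is a hypothetical helper, not part of the posted code; it assumes it is called while the cache lock is held, and it treats any entry whose reference count is zero as closeable.

```python
import random


def evict_closeable(cache, refcount, maxsize, rng=random):
    """Close and drop random unreferenced files until len(cache) <= maxsize."""
    closeable = [key for key in cache if refcount.get(key, 0) == 0]
    rng.shuffle(closeable)
    while len(cache) > maxsize and closeable:
        key = closeable.pop()
        cache.pop(key).close()      # close the handle before forgetting it
        refcount.pop(key, None)
```

If every cached file is still referenced, nothing is closed and the cache stays over its limit until a later call.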

Is this still useful?

I'm curious to hear from users like @pwolfram and @rabernat who may be running into the many file problem about what the current pain points are.

@jhamman
Member

jhamman commented Jan 13, 2019

Closing this old issue. The final checkbox in @pwolfram's original post was completed in #2261.
