
Support concurrent loading of variables #8965

Open
dcherian opened this issue Apr 23, 2024 · 4 comments

Comments

@dcherian
Contributor

dcherian commented Apr 23, 2024

Is your feature request related to a problem?

Today, if users want to load multiple variables in a DataArray or Dataset concurrently, they have to use dask.

It struck me that it'd be pretty easy for .load to gain an executor kwarg that accepts anything following the concurrent.futures.Executor interface, and to parallelize this loop:

# load everything else sequentially
for k, v in self.variables.items():
    if k not in lazy_data:
        v.load()

@rabernat
Contributor

Would that be compatible with async stores?

@TomNicholas
Contributor

This idea of passing an arbitrary concurrent executor to xarray seems potentially related to #7810, which suggests allowing open_mfdataset(parallel=True) to use something other than dask.delayed to parallelize the opening of the files.
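As a rough illustration of that alternative, file opening could be fanned out over a thread pool instead of dask.delayed. This is a hypothetical sketch, not the #7810 implementation; `open_files_concurrently` and `open_func` (a stand-in for xr.open_dataset) are invented names.

```python
from concurrent.futures import ThreadPoolExecutor


def open_files_concurrently(paths, open_func, max_workers=8):
    """Open many files concurrently with a thread pool (illustrative only)."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so results are ready for concat/combine
        return list(pool.map(open_func, paths))
```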

@rabernat
Contributor

This is a tricky issue. One problem we have in our stack is that we currently outsource nearly all actual parallelism to Dask. (The one exception to this is fsspec's async capabilities, which are hidden behind a separate thread housing an async event loop.)

Ideally, there would be one single runtime responsible for actually implementing concurrent data access and I/O. If all the libraries implemented async methods, then concurrency could be left entirely to the user, i.e. you could write code like

async def my_processing_function():
    await xr.open_dataset(...)
    # which would call
    await zarr.open_group(...)
    # which would call
    await object_store.get_object(...)

The user would be responsible for starting an event loop and running the coroutine. The event loop would manage the concurrency for the whole stack and everything would be fine.
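Concretely, "starting an event loop and running the coroutine" amounts to something like this minimal sketch. The coroutine body is a stand-in: the xr.open_dataset / zarr.open_group calls above are not awaitable today.

```python
import asyncio


# Stand-in for the hypothetical async stack sketched above.
async def my_processing_function():
    await asyncio.sleep(0)  # placeholder for real awaited I/O
    return "dataset"


# The user starts the event loop, which then manages concurrency
# for the whole stack.
result = asyncio.run(my_processing_function())
```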

In Zarr we are in the process of adding the async methods. That raises the question: should Xarray add them too?

If not, then Xarray has to decide how to call async code. It could use the fsspec approach of managing an async event loop on another thread. It could manage a threadpool of its own. How would these interact with Dask / fsspec / Zarr / etc.? The futures approach proposed here is one example of how to add concurrency within Xarray.
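The fsspec-style approach mentioned here, an event loop hidden on a helper thread, can be sketched roughly as below. `sync` and `fetch` are illustrative names, not fsspec's API.

```python
import asyncio
import threading

# A dedicated event loop running forever on a daemon thread, roughly the
# pattern fsspec uses internally.
_loop = asyncio.new_event_loop()
threading.Thread(target=_loop.run_forever, daemon=True).start()


def sync(coro):
    """Submit a coroutine to the background loop and block for its result."""
    return asyncio.run_coroutine_threadsafe(coro, _loop).result()


async def fetch(x):
    # stand-in for an async store read
    await asyncio.sleep(0.01)
    return x * 2


print(sync(fetch(21)))  # prints 42
```

Synchronous callers then use `sync(...)` without ever touching the loop, which is exactly what makes composing this with Dask workers or a user's own event loop awkward.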

I feel like this conundrum really illustrates the limitations of the modularity that we value so much from our stack. I have no idea what the "right" answer is. However, my perspective has been greatly influenced by writing Tokio Rust code, which does not suffer from this delegation problem. It's a very different situation from Python.

@dcherian
Contributor Author

FWIW this appears to do what I wanted with Zarr at least, i.e. issue concurrent loads per variable.

import numpy as np
import xarray as xr

def concurrent_compute(ds: xr.Dataset) -> xr.Dataset:
    from concurrent.futures import ThreadPoolExecutor, as_completed

    copy = ds.copy()

    def load_variable_data(name: str, var: xr.Variable) -> tuple[str, np.ndarray]:
        # compute() loads the data; return the name so results can be
        # matched back to variables as futures complete
        return (name, var.compute().data)

    with ThreadPoolExecutor(max_workers=None) as executor:
        futures = [
            executor.submit(load_variable_data, k, v)
            for k, v in copy.variables.items()
        ]
        for future in as_completed(futures):
            name, loaded = future.result()
            copy.variables[name].data = loaded
    return copy

concurrent_compute(ds)
