
async in zarr #536

Closed
rabernat opened this issue Feb 14, 2020 · 93 comments

@rabernat (Contributor) commented Feb 14, 2020

I think there are some places where zarr would benefit immensely from some async capabilities when reading and writing data. I will try to illustrate this with the simplest example I can.

Let's consider a zarr array stored in a public S3 bucket, which we can read with fsspec's HTTPFileSystem interface (no special S3 API needed, just regular http calls).

import fsspec
import zarr

url_base = 'https://mur-sst.s3.us-west-2.amazonaws.com/zarr/time'
mapper = fsspec.get_mapper(url_base)
za = zarr.open(mapper)
za.info

[screenshot: za.info output]

Note that this is a highly sub-optimal choice of chunks. The 1D array of shape (6443,) is stored in chunks of only (5,) items, resulting in over 1000 tiny chunks. Reading this data takes forever: over 5 minutes.

%prun tdata = za[:]
         20312192 function calls (20310903 primitive calls) in 342.624 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     1289  139.941    0.109  140.077    0.109 {built-in method _openssl.SSL_do_handshake}
     2578   99.914    0.039   99.914    0.039 {built-in method _openssl.SSL_read}
     1289   68.375    0.053   68.375    0.053 {method 'connect' of '_socket.socket' objects}
     1289    9.252    0.007    9.252    0.007 {built-in method _openssl.SSL_CTX_load_verify_locations}
     1289    7.857    0.006    7.868    0.006 {built-in method _socket.getaddrinfo}
     1289    1.619    0.001    1.828    0.001 connectionpool.py:455(close)
   930658    0.980    0.000    2.103    0.000 os.py:674(__getitem__)
...

I believe fsspec is introducing some major overhead by not reusing a connectionpool. But regardless, zarr is iterating synchronously over each chunk to load the data:

zarr-python/zarr/core.py, lines 1023 to 1028 at 994f244:

# iterate over chunks
for chunk_coords, chunk_selection, out_selection in indexer:
    # load chunk selection into output array
    self._chunk_getitem(chunk_coords, chunk_selection, out, out_selection,
                        drop_axes=indexer.drop_axes, fields=fields)

As a lower bound on how fast this approach could be, we bypass zarr and fsspec and just fetch all the chunks with requests:

import requests
s = requests.Session()

def get_chunk_http(n):
    r = s.get(url_base + f'/{n}')
    r.raise_for_status()
    return r.content

%prun all_data = [get_chunk_http(n) for n in range(za.shape[0] // za.chunks[0])] 
         12550435 function calls (12549147 primitive calls) in 98.508 seconds

   Ordered by: internal time

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
     2576   87.798    0.034   87.798    0.034 {built-in method _openssl.SSL_read}
       13    1.436    0.110    1.437    0.111 {built-in method _openssl.SSL_do_handshake}
   929936    1.042    0.000    2.224    0.000 os.py:674(__getitem__)

As expected, reusing a connection pool sped things up, but it still takes 100 s to read the array.

Finally, we can try the same thing with asyncio (the top-level `await` below requires an already-running event loop, e.g. IPython/Jupyter):

import asyncio
import aiohttp
import time

async def get_chunk_http_async(n, session):
    url = url_base + f'/{n}'
    async with session.get(url) as r:
        r.raise_for_status()
        data = await r.read()
    return data

async with aiohttp.ClientSession() as session:
    tic = time.time()
    all_data = await asyncio.gather(*[get_chunk_http_async(n, session)
                                    for n in range(za.shape[0] // za.chunks[0])])
    print(f"{time.time() - tic} seconds")

# > 1.7969944477081299 seconds

This is a MAJOR speedup!

I am aware that using dask could possibly help me here. But I don't have big data here, and I don't want to use dask. I want zarr to support asyncio natively.

I am quite new to async programming and have no idea how hard / complicated it would be to do this. But based on this experiment, I am quite sure there are major performance benefits to be had, particularly when using zarr with remote storage protocols.

Thoughts?

cc @cgentemann

@jakirkham (Member)

Have been wondering the same thing. Thanks for this nice write-up and performing this experiment. 😄

@gzuidhof commented Feb 17, 2020

This is supported in zarr.js, for the same reasons as you wrote here. Here's the relevant PR and issue which also explains why it was a good idea (with some timings for less pathological cases where even there the speedup can be large).

Pinging @manzt who implemented above functionality, the JS implementation follows Python pretty closely here, so a similar implementation approach may be possible

@manzt (Member) commented Feb 17, 2020

Thanks for pinging me!

@alimanfoo (Member) commented Feb 17, 2020 via email

@manzt (Member) commented Feb 17, 2020

I'm not that familiar with Python's asyncio, but I'm assuming that there is overlap. Calling Web APIs (like fetch) returns a Promise rather than a value, so requests are non-blocking and can be made concurrently.

In the example above, `futures` is a list of Python futures (similar to promises in JavaScript). These are awaitable objects that represent the eventual result of an async operation. Using the session, multiple requests are fired off, and awaiting `asyncio.gather(*futures)` waits until all of the futures have resolved.

async with aiohttp.ClientSession() as session:
    tic = time.time()
    futures = [get_chunk_http_async(n, session) for n in range(za.shape[0] // za.chunks[0])]
    all_data = await asyncio.gather(*futures)
    print(f"{time.time() - tic} seconds")

To handle concurrency, in zarr.js we use a dependency called p-queue to limit the number of concurrent requests to 10 by default. This way we can dynamically add to the queue while iterating through the projections. Hopefully this looks somewhat similar to zarr.

const queue = new PQueue({ concurrency: concurrencyLimit });
for (const proj of indexer.iter()) {
    const chunkValue = this.getChunkValue(proj, indexer, value, selectionShape); // returns promise
    queue.add(() => this.chunkSetItem(proj.chunkCoords, proj.chunkSelection, chunkValue)); // returns promise that queue resolves
}
// guarantees that all work on queue has finished
await queue.onIdle();

I think asyncio.Queue exists but that's about my limit of knowledge.
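For reference, Python's closest stdlib analog to p-queue's concurrency limit is probably `asyncio.Semaphore`. Here's a minimal sketch of the idea, not zarr or fsspec API; `fetch_chunk` is a placeholder standing in for an HTTP chunk fetch:

```python
import asyncio

async def bounded_gather(coro_fns, limit=10):
    # Run the given zero-argument coroutine functions with at most
    # `limit` of them in flight at any time, preserving input order.
    sem = asyncio.Semaphore(limit)

    async def run_one(fn):
        async with sem:
            return await fn()

    return await asyncio.gather(*(run_one(fn) for fn in coro_fns))

async def fetch_chunk(n):
    # Stand-in for a network fetch; just yields to the event loop.
    await asyncio.sleep(0)
    return n * 2

results = asyncio.run(bounded_gather([lambda n=n: fetch_chunk(n) for n in range(5)]))
print(results)  # [0, 2, 4, 6, 8]
```

Unlike p-queue's dynamic `queue.add`, this builds the whole task list up front, but the concurrency cap behaves the same way.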

One concern I have here is that the API for accessing a slice in an array in zarr.js is naturally awaitable since everything in javascript is async. We need to wrap our requests in async functions and await the responses.

This in zarr-python,

arr = za[:] 

is the same as this in zarr.js,

const arr = await za.get(null);

regardless of the store type. Ideally you'd hide all the async fetching from the end user and have the same API independent of whether the array is on disk or over HTTP. Someone smarter than me probably knows if this is possible.

@jakirkham (Member)

To Alistair's point, if this is just about requests, I wonder if simply a Store leveraging asyncio internally would be sufficient to give you the same benefits. Not to say it wouldn't be interesting to add asyncio to all of Zarr. Just curious what is needed to get the performance boost.

@rabernat (Contributor, Author)

I wonder if simply a Store leveraging asyncio internally would be sufficient to give you the same benefits

This is what I had in mind. I think the main performance benefit is when reading / writing many smallish chunks concurrently. We definitely don't want to force zarr users to use async.

Maybe a store could declare itself to use async for certain operations, and then zarr would know to use asyncio when performing loops with I/O operations. Otherwise it would default to the current, synchronous behavior.

I think there are cases where asyncio would hurt us. I did a few tests with files and found that, in some cases, asyncio was slower. I assume this is because issuing serial reads is more efficient than concurrent reads for some disks.

@alimanfoo (Member) commented Feb 18, 2020 via email

@jakirkham (Member)

Yeah this came up in issue ( #384 ). Agree this would be a good solution to this problem (amongst others).

@clbarnes (Contributor) commented Mar 3, 2020

@alimanfoo would you envision the stores' API being synchronous, and just using asyncio under the hood for multi-gets? I've found asyncio quite difficult to work with because it "poisons" the entire stack above it, forcing the entire pipeline into peppering async/await annotations everywhere. You can get the event loop and resync inside a function call but it's not pretty.

@alimanfoo (Member)

@alimanfoo would you envision the stores' API being synchronous, and just using asyncio under the hood for multi-gets?

Yes that was my thought.
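A toy illustration of that shape, where the public `getitems` is synchronous but fans out with `asyncio.gather` internally (class and method names are hypothetical; a real implementation would keep one event loop alive, e.g. in a background thread, rather than calling `asyncio.run` per batch):

```python
import asyncio

class AsyncBackedStore:
    # Hypothetical store: synchronous public API, asyncio inside.
    def __init__(self, data):
        self._data = data

    async def _aget(self, key):
        await asyncio.sleep(0)  # stand-in for a network round trip
        return self._data[key]

    async def _amultiget(self, keys):
        # All fetches are in flight concurrently.
        return await asyncio.gather(*(self._aget(k) for k in keys))

    def getitems(self, keys):
        # Callers never see a coroutine; the event loop is an
        # implementation detail of the store.
        values = asyncio.run(self._amultiget(keys))
        return dict(zip(keys, values))

store = AsyncBackedStore({"0": b"a", "1": b"b"})
print(store.getitems(["0", "1"]))  # {'0': b'a', '1': b'b'}
```

Zarr would keep calling a plain method; only the store knows the requests overlapped.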

@mzjp2 (Member) commented Mar 5, 2020

If we just want to parallelise network requests, we don't need to go full asyncio, right? We could do thread-based parallelism. Not fully convinced we need to buy into the whole asyncio package just to parallelise a few network requests.

@jakirkham (Member)

That's a good point. We can probably live with something as simple as ThreadPoolExecutor's map.

TBH mapping with async, threads, or otherwise could just be specified as part of the API of this function (multigetitem? or other appropriate name).
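A thread-based version of that idea might look like the following (the `multigetitem` name is just the placeholder suggested above; any mapping with `__getitem__` works as the store):

```python
from concurrent.futures import ThreadPoolExecutor

def multigetitem(store, keys, max_workers=10):
    # Fetch each key via store.__getitem__ on a thread pool and
    # return a key -> value mapping. Threads release the GIL while
    # blocked on network I/O, so the requests overlap in time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return dict(zip(keys, pool.map(store.__getitem__, keys)))

store = {"x": b"1", "y": b"2"}
print(multigetitem(store, ["x", "y"]))  # {'x': b'1', 'y': b'2'}
```

No asyncio anywhere in the call stack, at the cost of one OS thread per in-flight request.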

@rabernat (Contributor, Author) commented Mar 5, 2020

Async and parallel are not the same thing. From my point of view, I use dask to handle actual parallel I/O. The example I posted here is one where I don't actually want dask involved. It's a tiny amount of data. When using xarray with zarr, the dimension coordinates are always read eagerly and without dask. That's the use case that motivated this issue.

Furthermore, I would worry about any internal multithreading within zarr conflicting with dask's own parallelism. This would have to be handled carefully.

@alimanfoo (Member) commented Mar 5, 2020 via email

@nbren12 commented Mar 30, 2020

I ran across this thread because I was having some very slow write performance for small data using gcsfs's mapper object. I cobbled together this mutable mapping class in a few hours today that seems to get 4x improvements compared to gcsfs. Right now, it only supports asynchronous writes by flushing a cache whenever it exceeds a specified size. It would be pretty straightforward to write a multiget method. Is multiget supported by zarr?

@rabernat (Contributor, Author)

@nbren12 - thanks for sharing! I would personally encourage us to be trying to make upstream performance improvements to gcsfs. My limited experience with these cloud filesystem-spec implementations suggests that they do a lot of unnecessary checking / caching (e.g. fsspec/s3fs#285). Are your 4x performance enhancements due to async, or something else?

It would be pretty straightforward to write a multiget method. Is multiget supported by zarr?

I believe this is what is being worked on in #534.

@nbren12 commented Mar 30, 2020

I'm definitely open to contributing this to gcsfs, but just wanted to put it out in the wild, since I personally just need the mapping capability, not the "fs". I believe the performance enhancement is due to async from examining the debugging level output for the example code in the README. This output shows the HTTP requests made for each call. fsspec does about 10 HTTP operations each taking about .5s to complete in sequence. OTOH, the async code I wrote seems to complete these same requests...well...asynchronously. I didn't notice any gratuitous list operations like #534, and the directory only had about 2 items in it to begin with.

@nbren12 commented Mar 30, 2020

Here's an example of this output: https://gist.github.com/nbren12/10bed8494d067c3ff6c880c368561602. Part of the problem is that gcsfs seems to be using resumable uploads (requiring 3 HTTP requests for each chunk), but I still suspect async is speeding things up dramatically. The chunk-uploading only takes 0.3-1 seconds with async, but 13 seconds w/o.

@rabernat (Contributor, Author)

Very interesting, thanks for sharing. Pinging @martindurant for the gcsfs perspective.

@alimanfoo (Member)

Thanks @nbren12 for sharing. There isn't support for multiget internally within zarr yet, it's just an idea at the moment, but FWIW I think it's worth pursuing. It will require some re-engineering of zarr internals though, likely a substantial piece of work.

@martindurant (Member)

Note that fsspec would, in general, love async support for all backends where this is feasible (perhaps not for a few like FTP). However, it is also true that zarr/xarray is used a lot with Dask, so there is some form of parallelism in operation already - for those cases, async may not be helpful.

@martindurant (Member)

Quick comment on the resumable upload in gcsfs: s3fs has implemented a shortcut to defer creation of the resumable upload and use a one-call version when the total file size is small enough (<5 MB). The method for the single-call version exists in GCSFile, but the shortcut to execute it is not in place. It could be copied from s3fs.

@nbren12 commented Mar 30, 2020

Yah it would be great to have generic async support in fsspec. I expect async will have a lot less overhead than a dask worker for small data, but don't have the benchmarking to back this up.

@rabernat (Contributor, Author) commented Mar 30, 2020 via email

@nbren12 commented Mar 31, 2020

Yah. I will just echo Ryan. Basically this is similar to my workflow, dask distributed for coarse-grained parallelism, and then async for faster I/O.

@clbarnes (Contributor) commented Mar 31, 2020

Dask just indexes into the array with normal __getitem__, right? I struggle a little with the concepts of asyncio, but my understanding is that within __getitem__, you can just run a bunch of async chunk-fetch calls, blocking until you have all of them, and then assemble them synchronously, so that the async is entirely under the hood: dask doesn't need to know. We can have lots more async tasks than CPUs because we're just waiting for IO.

Alternatively, it looks like dask can work with asyncio, so you could feasibly use the same event loop to prevent just forking and forking and forking. Then it's up to the executor to decide how many jobs to run concurrently; zarr using asyncio just defines the dependency graph of the operation, not how it's executed.

If possible, exposing an async Array API would be great, and it seems doable with asyncio.Futures. This async array stuff (which does all the nuts and bolts of converting indices into chunk indexes, and the returned chunks into a single array) would be exposed to users and used under the hood, but the current Array would stay synchronous. e.g.

# has a member called `aio` with async API
array = Array(...)

# - get chunk idxs
# - do `await sync_to_async(store.__getitem__)(chunk_key)` for each
# - assemble chunks into output array and return
out = await array.aio[:]

# - basically just calls async_to_sync(self.aio.__getitem__)(key)
out2 = array[:]

There may be some overhead in doing this, of course.
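That dual-API sketch can be made concrete in a few lines. A toy version with an `aio` attribute holding the async path and `__getitem__` delegating to it (all names hypothetical; a string key stands in for real slice indexing, and `asyncio.run` stands in for a proper async-to-sync bridge):

```python
import asyncio

class AioView:
    # Hypothetical async facade: `await array.aio.get(key)`.
    def __init__(self, array):
        self._array = array

    async def get(self, key):
        await asyncio.sleep(0)  # stand-in for async store I/O
        return self._array._data[key]

class Array:
    def __init__(self, data):
        self._data = data
        self.aio = AioView(self)  # async API lives here

    def __getitem__(self, key):
        # The sync path just drives the async one to completion.
        return asyncio.run(self.aio.get(key))

arr = Array({"0": b"chunk"})
print(arr["0"])  # b'chunk'
```

Async-aware callers use `arr.aio`; everyone else keeps the familiar synchronous indexing.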

@alimanfoo (Member)

Dask just indexes into the array with normal __getitem__, right? I struggle a little with the concepts of asyncio, but my understanding is that within __getitem__, you can just run a bunch of async block-fetch calls, blocking until you have all of them, and then assemble them synchronously, so that the async is entirely under the hood: dask doesn't need to know. We can have lots more async tasks than CPUs because we're just waiting for IO.

This captures my understanding very clearly.

@martindurant (Member)

Of course not! I only mention that this is probably the case that you are most interested in.
I would expect that async functions would most likely be picked up by library developers rather than users, and xarray is a major one.

@Carreau (Contributor) commented Jun 22, 2020

I believe it is easier to create an async API and syncify it than the opposite, and especially in the case of zarr we have to expose a sync API at some point anyway, since get/setitem on arrays is sync.

The draft of spec v3 is (so far) completely async and exposes a sync API, like what @martindurant did in fsspec.

I think it's more a question of laying down the groundwork for zarr to be async than making it fully async: have the internal APIs be `async def` so that a potential async backend could work.

@martindurant (Member)

From the point of view of fsspec (not zarr), the availability of the sync API is essential, both because that is the only one currently available, and because some backends will not be async-able. When that is done, zarr may choose to first use a sync multiget with internal async, and then move to fully async; or whatever works. Is there anything missing in the HTTP POC for zarr to be able to make use of both of these avenues?
(note that the POC doesn't have writing, and the mapper interface has no way to execute the multiget yet, only the one file system implementation is affected)

@martindurant (Member)

Update:

  • HTTP is async/concurrent for getitems in fsspec master
  • gcsfs is async/concurrent for getitems/setitems/delitems in master
  • s3fs likewise, but not yet merged.

@bilts commented Aug 10, 2020

What's the current status of an implementation that calls getitems instead of __getitem__ in the Zarr library? People seem to be mentioning making use of it, but I haven't seen an actual implementation. Is it still in the cards?

@martindurant (Member)

Currently, zarr calls getitem on each thing it needs, in sequence. To make use of my getitems and any concurrency, the code would need to be changed. I'm not aware of anyone having done this.
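The dispatch itself could be tiny. A sketch of how zarr's chunk loop might prefer a batched `getitems` when the store provides one (the `read_chunks` helper is hypothetical, and fsspec's actual `getitems` signature is simplified here):

```python
def read_chunks(store, chunk_keys):
    # Prefer a batched, possibly concurrent getitems when the
    # store offers one (as fsspec mappers now do); otherwise
    # fall back to one synchronous getitem per key, which is
    # zarr's current behavior.
    if hasattr(store, "getitems"):
        return store.getitems(chunk_keys)
    return {key: store[key] for key in chunk_keys}

# A plain dict has no getitems, so this takes the sequential path.
print(read_chunks({"0.0": b"a", "0.1": b"b"}, ["0.0", "0.1"]))
```

Stores that can't batch keep working unchanged, while concurrent stores get the speedup.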

@jakirkham (Member)

I think this is still of interest. Is this something that you would be willing to contribute, @bilts? 🙂

@bilts commented Aug 10, 2020

@jakirkham I would be / have been willing. I have a prototype implementation in a fork mentioned above for people to try it out. It's proven way, way faster for our needs. I think making a more robust change may require someone more able to make choices about zarr-python's internal organization than I am.

@martindurant (Member)

It's proven way, way faster for our needs.

Hurray!

@jakirkham (Member)

I think making a more robust change may require someone more able to make choices about zarr-python's internal organization than I am.

That's probably easier/faster to address through the normal review process in a PR 😉

@rabernat (Contributor, Author) commented Nov 3, 2020

I believe we can close this issue now that #630 and #606 have both been merged!

Just to close the loop, I just did a simple benchmarking exercise from Pangeo Cloud in Google Cloud.

import zarr
import os
import fsspec
import gcsfs
import numpy as np
import uuid

# get a temporary place to write to GCS
uid = uuid.uuid1()
path = os.environ['PANGEO_SCRATCH'] + str(uid) + '.zarr'
gcs = gcsfs.GCSFileSystem()
mapper = gcs.get_mapper(path)

# create a zarr array with many small chunks
shape = (100, 1000)
chunks = (1, 1000)
arr = zarr.create(shape, chunks=chunks, store=mapper)

# time write
%time arr[:] = 9

# time read
%time _ = arr[:]

My test environment included the following versions:

  • zarr 2.5.1.dev30
  • gcsfs 0.7.1+4.g77b5993
  • fsspec 0.8.4+42.g67b2e6f

My old (pre-async) environment was

  • zarr 2.4.0
  • gcsfs 0.7.1

Here is a comparison of the speeds

|                | write  | read   |
| -------------- | ------ | ------ |
| old            | 5.65 s | 7.27 s |
| new (w/ async) | 1.32 s | 245 ms |

This is a fantastic improvement, particularly for reading (over 20x)!

👏 👏 👏 for all the effort by @martindurant and the other devs who helped make this happen!

@shoyer (Contributor) commented Apr 2, 2021

One thing I'll add: parallel fetching of chunks in Zarr (as is supported here by getitems) is valuable even in the case where Dask is also used for parallelism. This is because the optimal number of threads for IO is often much higher than the optimal number of threads for computation, so it's a best practice to use a separate thread pool anyways. See pangeo-forge/pangeo-forge-recipes#89 for more details.

@tasansal (Contributor) commented Sep 11, 2021

@shoyer I have a question about this.

I am currently fetching 500 GB to 2 TB chunked datasets (~1 MB chunks), running on a machine with 48 physical / 96 logical cores.
Data is on Google Cloud Storage (GCS).

I am using dask with a larger chunk size than the underlying array, i.e. from ~1 MB chunks to ~100 MB chunks. So each dask task should pull multiple chunks?

I am using a LocalCluster client.
One thing I noticed is that if I run a dask Client with ONLY threads (i.e. Client(n_workers=1, threads_per_worker=48)), I get terrible read performance, almost equal to a single process (400 MB-ish).

When I recreate the Client with 48 workers (processes) and 1 thread per worker, I can read up to 5GB/s from GCP. However, this adds some latency because of multiple processes.

Another example: 8 workers with 12 threads each scales up to 8x the single-worker performance, but is still much slower than 48 workers. So it only seems to scale with processes, not threads?

I was wondering if I am doing something wrong? It sounds like the thread only client is NOT fetching multiple parts / chunks at the same time.

I am not using any asynchronous=xxx kwargs anywhere, maybe I need to do that?

gcsfs, fsspec, zarr, and dask are the combo I am using.

@jakirkham (Member)

@tasansal can you please raise a new issue? Also please include a minimal reproducer

@tasansal (Contributor)

@jakirkham, of course, wasn't sure if it should be an issue of its own. Will do that.

@jakirkham (Member)

Thanks! It just makes it easier for us to keep track of these things :)
