
Proof of concept: CloudFilesStore #767

Open
rabernat opened this issue Jun 4, 2021 · 7 comments

Comments

@rabernat
Contributor

rabernat commented Jun 4, 2021

We currently rely 100% on fsspec and its implementations for accessing cloud storage (s3fs, gcsfs, adlfs). Cloud storage is complicated, and for debugging purposes, it could be useful to have an alternative. Since I met @william-silversmith a few years ago, I have been curious about CloudFiles:

https://github.com/seung-lab/cloud-files/

CloudFiles was developed to access files from object storage without ever touching disk. The goal was to reliably and rapidly access a petabyte of image data broken down into tens to hundreds of millions of files being accessed in parallel across thousands of cores. The predecessor of CloudFiles, CloudVolume.Storage, the core of which is retained here, has been used to process dozens of images, many of which were in the hundreds-of-terabytes range. Storage has reliably read and written tens of billions of files to date.

Highlights

  1. Fast file access with transparent threading and optionally multi-process.
  2. Google Cloud Storage, Amazon S3, local filesystems, and arbitrary web servers, making hybrid or multi-cloud easy.
  3. Robust to flaky network connections. Uses exponential random window retries to avoid network collisions on a large cluster. Validates md5 for GCS and S3.
  4. gzip, brotli, and zstd compression.
  5. Supports HTTP Range reads.
  6. Supports green threads, which are important for achieving maximum performance on virtualized servers.
  7. High efficiency transfers that avoid compression/decompression cycles.
  8. High speed gzip decompression using libdeflate (compared with zlib).
  9. Bundled CLI tool.
  10. Accepts iterator and generator input.

Today I coded up a quick CloudFiles-based store for Zarr:

from cloudfiles import CloudFiles

class CloudFilesMapper:
    """Minimal MutableMapping-style Zarr store backed by CloudFiles."""
    
    def __init__(self, path, **kwargs):
        self.cf = CloudFiles(path, **kwargs)
        
    def clear(self):
        self.cf.delete(self.cf.list())
        
    def getitems(self, keys, on_error="none"):
        return {item['path']: item['content'] for item in self.cf.get(keys, raw=True)}
    
    def setitems(self, values_dict):
        self.cf.puts([(k, v) for k, v in values_dict.items()])
        
    def delitems(self, keys):
        self.cf.delete(keys)
        
    def __getitem__(self, key):
        return self.cf.get(key)
    
    def __setitem__(self, key, value):
        self.cf.put(key, value)
        
    def __iter__(self):
        for item in self.cf.list():
            yield item

    def __len__(self):
        raise NotImplementedError

    def __delitem__(self, key):
        self.cf.delete(key)

    def __contains__(self, key):
        return self.cf.exists(key)

    def listdir(self, key):
        for item in self.cf.list(key):
            # str.lstrip strips a character set, not a prefix, so slice the prefix off instead
            yield item[len(key):].lstrip('/')
            
    def rmdir(self, prefix):
        self.cf.delete(self.cf.list(prefix=prefix))

In my test with GCS, it works just fine with Zarr, Xarray, and Dask: https://nbviewer.jupyter.org/gist/rabernat/dde8b835bb7ef0590b6bf4034d5e0b2f
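
For reference, here is roughly how the mapper plugs into zarr and xarray (the bucket path below is just a placeholder; any protocol CloudFiles supports should work the same way):

import xarray as xr
import zarr

# placeholder path; CloudFiles also understands s3://, file://, https://
store = CloudFilesMapper("gs://my-bucket/my-dataset.zarr")

# zarr accepts any MutableMapping-like object as a store
group = zarr.open_group(store, mode="r")

# xarray's zarr backend takes the same store object
ds = xr.open_zarr(store, consolidated=False)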

Distributed read performance was about 50% slower than gcsfs, but my benchmark is probably biased.

It might be useful to have the option to switch between the fsspec-based stores and this one. If folks are interested, we could think about adding this to zarr-python as some kind of optional alternative to fsspec.

@william-silversmith

Thanks for this, Ryan! I'm having trouble running the notebook on my M1 (several dependencies are x86-only even though I have Rosetta 2), but I'll try again on my x86 machine later. I'm very interested to see what the profile says -- I'm always looking for ways to increase performance.

One quick tip: setitems can be written with a generator expression, avoiding the memory and construction-time overhead of building a list.

    def setitems(self, values_dict):
        self.cf.puts(( (k, v) for k, v in values_dict.items() ))

@william-silversmith

Also, it just occurred to me that you mentioned distributed read performance. I've found on virtualized servers that Python threads jump around between the different cores in a way that doesn't happen on real hardware. Try using the green threads option and you might be pleasantly surprised. You can achieve similar results using taskset on Linux machines to pin a process to a single core. I hope at some point in the next 6 months to experiment more with an async/await implementation, which would avoid the need for the gevent library and would enable HTTP/2 support.
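
Roughly, enabling green threads looks like the sketch below (the green keyword and the monkey-patching step are as recalled from the README, so double-check there for the exact spelling):

# gevent must monkey-patch the standard library before cloudfiles is imported
import gevent.monkey
gevent.monkey.patch_all(threads=False)

from cloudfiles import CloudFiles

# green=True (assumed keyword) switches CloudFiles to cooperative gevent threads
# instead of OS threads, which helps on virtualized servers
cf = CloudFiles("gs://my-bucket/my-dataset.zarr", green=True)

# core pinning alternative on Linux: taskset -c 0 python my_script.py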

@rabernat
Contributor Author

rabernat commented Jun 4, 2021

Also, it just occurred to me that you mentioned distributed read performance.

Specifically, Dask.distributed. In our group, we usually do our scaling out via Dask. One point I don't understand is how CloudFiles' thread-based parallelism would interact with Dask's multi-faceted parallelism (workers, processes, threads, etc.).

@william-silversmith

william-silversmith commented Jun 5, 2021 via email

@shoyer
Contributor

shoyer commented Jun 5, 2021

This looks like a great idea! 👍

Generally you want to use many more parallel threads for IO-bound work (like reading/writing Zarr chunks) than for compute-bound work (for which you want roughly one thread per core). So I think this nested parallelism is actually probably a good thing -- you want more threads for the IO tasks than you want for the compute tasks.
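
As a rough illustration (the numbers here are made up), you would size the Dask compute side to the core count and let each task's store reads fan out to their own IO threads:

from dask.distributed import Client

# 4 worker processes x 2 threads = 8 compute threads, roughly one per core
# on an 8-core machine (illustrative sizing only)
client = Client(n_workers=4, threads_per_worker=2)

# each task's chunk reads can then use the store's internal IO parallelism
# (CloudFiles threading, or fsspec's async fetching) without oversubscribing compute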

@jakirkham
Member

Somewhat related: a discussion from earlier today about using multiple ThreadPoolExecutors per Dask worker ( dask/distributed#4655 (comment) ).

@martindurant
Member

Note that fsspec uses asyncio to fetch multiple chunks concurrently, so you can greatly increase performance by setting each dask partition to be larger than the zarr chunk size.
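
For example (the "time" dimension and chunk sizes below are illustrative), if the zarr store is chunked at 10 steps along time, asking for 100-step dask partitions lets each task fetch ~10 zarr chunks concurrently:

import xarray as xr

# one dask partition spans ~10 zarr chunks, which fsspec fetches concurrently via asyncio
ds = xr.open_zarr("gs://my-bucket/my-dataset.zarr", chunks={"time": 100})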
