[V3] Support for batched Store API in v3 #1806

Open
akshaysubr opened this issue Apr 22, 2024 · 17 comments
@akshaysubr

The Store API in v3 supports fetching either a single key at a time or partial values of multiple keys:

from abc import ABC, abstractmethod
from typing import List, Optional, Tuple


class Store(ABC):
    @abstractmethod
    async def get(
        self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
    ) -> Optional[bytes]:
        ...

    @abstractmethod
    async def get_partial_values(
        self, key_ranges: List[Tuple[str, Tuple[int, int]]]
    ) -> List[bytes]:
        ...

The new BatchedCodecPipeline, for instance, currently fetches data for all requested chunks concurrently by using an asyncio thread pool with each task calling the get method of the Store:

            chunk_bytes_batch = await concurrent_map(
                [(byte_getter,) for byte_getter, _, _, _ in batch_info],
                lambda byte_getter: byte_getter.get(),
                runtime_configuration.concurrency,
            )

Since there have been some concerns about the scalability of asyncio for a large number of tasks, would it make sense to move this batched fetch into the Store itself? This would allow another Store implementation to use a more performant asynchronous framework for the batched fetch, say in C++ or Rust, while looking like a single asyncio task to zarr-python.

This feature currently exists in v2 through the getitems Store API, which is used to enable GPU Direct Storage in Zarr through kvikIO.

A similar feature has also been added (provisionally) to v3 codecs, which support an encode_batch and a decode_batch with a default implementation that ships tasks off to an asyncio thread pool but allows a codec to override that in favor of another approach if needed.
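
For reference, a minimal sketch of what such a default batched codec method could look like (the Codec/decode/decode_batch names here are illustrative, not necessarily the final v3 interface): each single-chunk decode is shipped to the default thread pool via asyncio.to_thread, and a codec can override decode_batch with a different backend.

import asyncio
from abc import ABC, abstractmethod
from typing import List

class Codec(ABC):
    @abstractmethod
    def decode(self, chunk_bytes: bytes) -> bytes:
        # decode a single encoded chunk (CPU-bound)
        ...

    async def decode_batch(self, chunk_bytes_batch: List[bytes]) -> List[bytes]:
        # default: run each single-chunk decode in the default thread pool;
        # a codec can override this in favor of another approach (C++/Rust, GPU, ...)
        return await asyncio.gather(
            *(asyncio.to_thread(self.decode, cb) for cb in chunk_bytes_batch)
        )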

@akshaysubr akshaysubr changed the title Support for batched Store API in v3 [V3] Support for batched Store API in v3 Apr 24, 2024
@rabernat
Contributor

Do queues have a role to play here?

To me it makes sense that the store and the codec pipeline would communicate via a queue. The store puts compressed chunks into a queue. The codec pipeline pulls items out and processes them. Either one could do this via single items or in batches. If the queue is full, the store could stop fetching chunks until there is room (backpressure).
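
A minimal asyncio-only sketch of that backpressure idea (the fetch_chunk and decode_chunk helpers below are hypothetical stand-ins, not zarr-python APIs): a bounded asyncio.Queue makes the producer's put block once the queue is full.

import asyncio

async def fetch_chunk(key):
    # stand-in for a store fetch (I/O-bound)
    await asyncio.sleep(0.01)
    return f"compressed-{key}".encode()

def decode_chunk(chunk: bytes) -> bytes:
    # stand-in for a CPU-bound decode
    return chunk

async def store_producer(keys, q: asyncio.Queue):
    for key in keys:
        chunk = await fetch_chunk(key)
        await q.put(chunk)  # blocks once the queue is full -> backpressure

async def codec_consumer(q: asyncio.Queue):
    while True:
        chunk = await q.get()
        decode_chunk(chunk)
        q.task_done()

async def main(keys):
    q = asyncio.Queue(maxsize=16)  # bound on in-flight chunks
    consumer = asyncio.create_task(codec_consumer(q))
    await store_producer(keys, q)
    await q.join()  # wait until every enqueued chunk has been processed
    consumer.cancel()

asyncio.run(main([f"c/{i}" for i in range(100)]))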

@JackKelly
Contributor

JackKelly commented Apr 24, 2024

I'd love to see a batched Store API in Zarr-Python!

(Just to connect the dots: Here's an informal proposal I wrote for a Store.get_items method for Zarr-Python... which includes links to @akshaysubr's previous discussions about this topic!)

the store and the codec pipeline would communicate via a queue

Yes! I agree! (Ideally a multi-producer multi-consumer queue.) For reference, here's my current planned design for the light-speed-io Rust crate that I'm currently chipping away at. The relevant part for this discussion is the crossbeam::channel which sends chunks from "Layer 1 (the IO layer)" to "Layer 2: Parallel computation on chunks" 🙂

My ultimate aim is to allow Zarr-Python (and others) to offload IO (and decompression?) to Rust, if users want that.

@jhamman
Member

jhamman commented Apr 24, 2024

I've also been thinking that a producer/consumer queue is the way forward here but I'm not clear at all where in the stack these queues would go. IMO, this is the part of the equation that is missing for us to take action here.

@rabernat
Contributor

If this queue was an object which could be addressed by non-python code, then that would really open the door to the sort of interoperability we are looking for.

The Python standard library has two Queue objects:

  - queue.Queue (thread-safe, for communicating between threads)
  - asyncio.Queue (for use within an asyncio event loop)

Both of these can be limited on the number of items. But I think we want to limit on the total number of bytes stored instead.

We could also implement our own purpose-built Queue in a low-level language and talk to it from Python.
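
As a sketch of the bytes-based limit (relying on the _init/_put/_get/_qsize override hooks of CPython's queue.Queue, a common subclassing pattern; a purpose-built queue could of course do this differently):

import queue

class BytesBoundedQueue(queue.Queue):
    # maxsize is interpreted as a limit on total buffered bytes rather than item count

    def _init(self, maxsize):
        self._items = []
        self._nbytes = 0

    def _qsize(self):
        # put() blocks while _qsize() >= maxsize, so returning the byte count
        # makes the bound apply to bytes instead of items
        return self._nbytes

    def _put(self, item):
        self._items.append(item)
        self._nbytes += len(item)

    def _get(self):
        item = self._items.pop(0)
        self._nbytes -= len(item)
        return item

q = BytesBoundedQueue(maxsize=64 * 1024 * 1024)  # ~64 MiB of buffered chunks
q.put(b"x" * 1024)
assert q.get() == b"x" * 1024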

@rabernat
Contributor

Here's an example of sending items to a Python Queue from an async task and then processing those items from a thread. This is basically what we want to do between the store and the codec pipeline.

import threading
import queue

import asyncio
import time

SLEEP = 0.1

async def generate_data(item, q):
    # non-blocking sleep--represents time spent on I/O
    await asyncio.sleep(SLEEP)  
    print(f"putting item {item}")
    q.put(item)

def process_item(q):
    item = q.get()
    # blocking sleep--represents CPU-bound task
    time.sleep(SLEEP)
    print(f"processed {item}")
    q.task_done()

def worker():
    while True:
        process_item(q)

async def generate_items():
    for n in range(10):
        await generate_data(n, q)

q = queue.Queue()
threading.Thread(target=worker, daemon=True).start()

# top-level await: run in IPython/Jupyter, or wrap in asyncio.run(generate_items())
await generate_items()

@JackKelly
Contributor

JackKelly commented Apr 24, 2024

On the question of sharing a queue between Python and Rust: I like the idea. But I've searched the PyO3 GitHub organisation for "queue" and haven't found anything relevant 😞

For my main use-case, where I'd like to load and decompress on the order of 1 million chunks per second (on a single machine), I'd probably want to keep the chunk-by-chunk loading & processing in Rust-land. I'd guess that a Python queue would struggle to handle 1 million items per second, whilst still leaving enough CPU cycles for decompression and other bits and bobs (but I haven't tried and I'd love to be proved wrong!).

What I'm imagining - whilst being very ignorant of the current architecture of Zarr-Python v3 - is something like this:

The user would, in a single function call, ask Zarr-Python to load some subset of a huge array. Zarr-Python would calculate the set of chunks that need to be loaded¹. Then Zarr-Python would, in a single async function call, ask the Store to "get and decompress these million chunks, and let me know when you're done". It'd be entirely up to the Store to figure out how best to parallelise the work, and how to pass data from IO to decompression.

Could that work? (I'm sure there's some glaring problem with that overly simple sketch!) I'm guessing that doesn't fit at all with the way that Zarr v3 splits up IO and codecs?

Footnotes

  1. Although it's possible that Python would struggle to compute which million chunks we want, in the time available. So we may also want to compute the set of chunks in Rust-land. But then we've got a full Zarr implementation in Rust!

@rabernat
Contributor

In the current architecture, the Store doesn't handle decompression. That's a separate thing (the Codec Pipeline).

@JackKelly
Contributor

JackKelly commented Apr 24, 2024

ah, yes, of course... sorry.... 😳... I need to re-read the Zarr v3 spec soon 🙂 !

On the topic of sharing a queue between Rust and Python... In a recent comment, a core maintainer of the PyO3 library says:

You could wrap the receiver end of the [Rust std::sync::mpsc::]channel in a #[pyclass] to expose the receiving end to Python. You have the choice of using blocking or async to wait for messages.

@rabernat
Contributor

I need to re-read the Zarr v3 spec soon 🙂 !

This is not really covered by the spec. It's about the software architecture of Zarr Python (which is exactly what this issue is for discussing).

@rabernat
Contributor

I'd guess that a Python queue would struggle to handle 1 million items per second

Here's a quick experiment

from itertools import batched  # requires Python 3.12+
import threading
import queue

import asyncio
import time

class Timer:

    def __init__(self):
        self._log = [("start", time.perf_counter())]

    def log(self, message):
        self._log.append((message, time.perf_counter()))

    def __repr__(self):
        out = f"<Timer started at {self._log[0][1]}>\n"
        for n in range(1, len(self._log)):
            message = self._log[n][0]
            diff = self._log[n][1] - self._log[n-1][1]
            out += f" - {message}, {diff}\n"
        return out

# don't add any overhead
SLEEP = 0


async def enqueue_data(batch, q):
    # non-blocking sleep--represents time spent on I/O
    await asyncio.sleep(SLEEP)  
    for item in batch:
        q.put(item)

def process_item(worker_id, q, batch_size=1):
    # not sure if batching really optimizes anything here, since q.get() is still called per item
    for _ in range(batch_size):
        item = q.get()
        # blocking sleep--represents CPU-bound task
        time.sleep(5*SLEEP)
        q.task_done()

def worker(worker_id, q, batch_size=1):
    while True:
        process_item(worker_id, q, batch_size=batch_size)

async def generate_items(NITEMS, q, batch_size=1):
    for batch in batched(range(NITEMS), batch_size):
        await enqueue_data(batch, q)

NTHREADS = 32
NITEMS = 1_000_000

timer = Timer()
q = queue.Queue()
timer.log("create queue")

threads = [
    threading.Thread(
        target=worker,
        args=(worker_id, q,),
        kwargs={"batch_size": 100},
        daemon=True,
    )
    for worker_id in range(NTHREADS)
]
for t in threads:
    t.start()
timer.log("create threads")

await generate_items(NITEMS, q, batch_size=100)
timer.log(f"generate {NITEMS} items")
q.join()
timer.log("finish processing")

print(timer)
<Timer started at 108872.175694578>
 - create queue, 4.448898835107684e-05
 - create threads, 0.003170869007590227
 - generate 1000000 items, 0.6771369689959101
 - finish processing, 10.761253934004344

So I was able to enqueue 1_000_000 items in less than a second (working in batches of 100), but it took 10 seconds to process them (on a 32-core machine). This example took 10 minutes to cook up, so there is probably lots of room for improvement.

@akshaysubr
Author

This discussion about how to orchestrate a producer-consumer workflow between chunk readers (Store) and decompressors (Codecs) is really interesting. But I worry that it is taking away from the more fundamental question of the required Store API. Even if we use a queue to orchestrate these tasks, the current Store API only allows the store to put one item at a time into the queue rather than a batch of items at a time into the queue.

I would propose that we think about implementation of points raised in this issue in two steps:

  1. Add a batched API to the Store base class that defaults to just doing concurrent loads from multiple threads (similar to the default decode_batch in the Codec API); a minimal sketch follows this list. This way, a Store implementation can decide whether single-chunk fetching is the natural primitive, or whether the batched case should be the default with single-chunk fetches as a specialization of that.
  2. The concurrency policy is essentially encoded into the CodecPipeline. This is where I think the mapping of these tasks to a specific system architecture should be encoded. We would want different approaches for reading from a local drive vs. an object store, and similarly for computing on the CPU vs. the GPU. There are two codec pipelines currently in Reworked codec pipelines #1670: a BatchedCodecPipeline and an InterleavedCodecPipeline, depending on what is better for a specific hardware architecture. We can certainly add another DynamicCodecPipeline that uses queues.
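
To make point 1 concrete, here is a rough sketch of what such a default could look like (the get_batch name and signature are assumptions for illustration, not an actual v3 API): the base class fans out to the existing single-key get concurrently, and a Store with its own batched I/O path (kvikIO/GDS, C++, Rust) overrides it with one truly batched call.

import asyncio
from abc import ABC, abstractmethod
from typing import List, Optional, Tuple

class Store(ABC):
    @abstractmethod
    async def get(
        self, key: str, byte_range: Optional[Tuple[int, Optional[int]]] = None
    ) -> Optional[bytes]:
        ...

    async def get_batch(self, keys: List[str]) -> List[Optional[bytes]]:
        # default: issue the existing single-key gets concurrently; a store with
        # its own batched I/O layer can override this with a single batched call
        return await asyncio.gather(*(self.get(key) for key in keys))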

"get and decompress these million chunks, and let me know when you're done"

@JackKelly This should be possible by implementing a custom CodecPipeline, potentially in Rust, but it is broader in scope than the Store. The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be Python calls into the Codec API from Rust?

@JackKelly
Contributor

JackKelly commented Apr 30, 2024

The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be python calls into the Codec API from Rust?

Before I can answer properly, I need to get myself up-to-speed on the design of the CodecPipeline 🙂! Is #1670 the best place to read the latest info about the CodecPipeline? Is there an architecture diagram showing how CodecPipeline fits into zarr-python? (no worries if not! I appreciate that everyone is short on time!)

@JackKelly
Contributor

I don't quite understand how light-speed-io would interface with existing Codecs. Would those be python calls into the Codec API from Rust?

My plan is for light-speed-io to call codec libraries written in C/C++ or Rust. For example, there's a Rust wrapper for c-blosc2. Although I'll probably start with more "mainstream" codecs like lz4.

Whilst it's technically possible for Rust to call Python code (via PyO3), I'd worry that would kill any performance benefits! (Although I haven't benchmarked it!)

@normanrz
Contributor

normanrz commented Apr 30, 2024

The scope of light-speed-io seems to be aligned with the scope of CodecPipeline. Though, I don't quite understand how light-speed-io would interface with existing Codecs. Would those be python calls into the Codec API from Rust?

Before I can answer properly, I need to get myself up-to-speed on the design of the CodecPipeline 🙂! Is #1670 the best place to read the latest info about the CodecPipeline? Is there an architecture diagram showing how CodecPipeline fits into zarr-python? (no worries if not! I appreciate that everyone is short on time!)

The PR is the best place for the moment. Basically, the CodecPipelines get a list of chunks to fetch with the corresponding slices, fetch the data from the store, decode the chunks, and assemble the chunk arrays into one output array.

Currently, there are 2 implementations: batched and interleaved. I want to combine both into one.

It would not be a big change for the codec pipeline to accept a queue. Probably changes the Store API considerably, though.
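
Very roughly, that read path amounts to something like the following (a hypothetical sketch only; the real CodecPipeline interfaces in #1670 differ in the details, e.g. in how batching, partial reads, and array/bytes codecs are handled):

import asyncio
import numpy as np

async def read_selection(store, codecs, chunk_infos, out: np.ndarray) -> np.ndarray:
    # chunk_infos: (store_key, selection_within_chunk, selection_within_out) triples
    async def handle_one(key, chunk_selection, out_selection):
        encoded = await store.get(key)               # 1. fetch chunk bytes from the store
        chunk = encoded
        for codec in reversed(codecs):               # 2. decode through the codec chain
            chunk = codec.decode(chunk)              #    (the last decode yields an ndarray)
        out[out_selection] = chunk[chunk_selection]  # 3. assemble into the one output array

    await asyncio.gather(*(handle_one(k, cs, os) for k, cs, os in chunk_infos))
    return out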

@JackKelly
Contributor

JackKelly commented May 1, 2024

Sounds great! Thank you for the explanation.

I'll try to enable my little light-speed-io Rust crate to provide CodecPipeline-like functionality (fetching data from IO, decoding, assembling chunks into one output array) and a Python API. The ultimate aim is that light-speed-io could provide a drop-in CodecPipeline for zarr-python. (Although light-speed-io will be limited to local storage on Linux for a little while!) Does that sound viable and vaguely useful?

Although... I should stop mumbling about my pet project and instead help to pull the focus of this thread back to @akshaysubr's original questions about a batched Store API!....

@normanrz
Contributor

normanrz commented May 1, 2024

The ultimate aim could be that light-speed-io could provide a drop-in CodecPipeline for zarr-python. (Although light-speed-io will be limited to local storage on Linux for a little while!) Does that sound viable and vaguely useful?

Yes, that should be possible. At this point, I wouldn't consider the CodecPipeline a stable API, though. It will probably change quite a bit until the 3.0 release and maybe even afterwards.

@JackKelly
Contributor

No worries, sounds good.

When you do start the process of stabilising the CodecPipeline API, please could you tag me in the discussion? I'll probably just passively listen to the discussion, so I can have the best chance of moulding light-speed-io to play nicely with zarr-python v3 🙂

@jhamman jhamman added this to the 3.0.0 milestone May 17, 2024