Add an async disruptor/ring-buffer queue abstraction for buffered communication between producer/consumer coroutines #17

Closed
lewissbaker opened this issue Jun 19, 2017 · 4 comments

Comments

@lewissbaker
Owner

lewissbaker commented Jun 19, 2017

We have async_generator<T>, which producer/consumer coroutines can use to communicate. However, it has no buffering capability, so elements can only be generated and processed one at a time.

We should look at something similar to https://github.com/lewissbaker/disruptorplus but that uses co_await and coroutine suspension as a wait-strategy for handling the full/empty buffer cases.

  • Add support for a single-threaded producer as well as a multi-threaded producer.
  • Add a sequence_barrier abstraction.
  • Needs integration with schedulers to allow producer to resume consumer asynchronously.

I have some code kicking around in a side-project that I can port to cppcoro.

@lewissbaker lewissbaker changed the title Add an async disruptor/ring-buffer queue abstraction for communication between producer/consumer Add an async disruptor/ring-buffer queue abstraction for buffered communication between producer/consumer coroutines Jun 19, 2017
@lewissbaker
Owner Author

I've added some initial abstractions for this to the 'disruptor' branch:
https://github.com/lewissbaker/cppcoro/tree/disruptor

This has the sequence_barrier, single_producer_sequencer and multi_producer_sequencer abstractions for dealing with sequence numbers.

Some simple multi_producer_sequencer tests on my (slow) dev machine show a throughput of around 20M items (64-bit integers) per second, with two producer coroutines each enqueuing one item at a time and a single consumer consuming the items in batches and simply summing the values. Higher throughput can be achieved if producers are able to batch-enqueue items.

Still need to add a ring_buffer helper class to simplify indexing into the buffer.
Also need to add some documentation.
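
For illustration, the kind of indexing a ring_buffer helper would simplify can be sketched as below. This is only a minimal sketch under assumptions: the class name `ring_buffer_sketch`, the power-of-two capacity requirement, and the `operator[]` interface are hypothetical and are not taken from the 'disruptor' branch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper (not the cppcoro API): maps an ever-increasing
// sequence number onto a slot in a fixed-size buffer.
template <typename T>
class ring_buffer_sketch {
public:
    // Assumption: capacity is a power of two, so indexing is a single mask.
    explicit ring_buffer_sketch(std::size_t capacity)
        : mask_(capacity - 1), slots_(capacity) {
        assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    }

    std::size_t size() const { return slots_.size(); }

    // Sequence numbers wrap around the buffer: seq and seq + size()
    // refer to the same slot.
    T& operator[](std::uint64_t seq) { return slots_[seq & mask_]; }
    const T& operator[](std::uint64_t seq) const { return slots_[seq & mask_]; }

private:
    std::size_t mask_;
    std::vector<T> slots_;
};
```

With a helper like this, producers and consumers can index by sequence number directly (for example `buffer[seq] = value;`) and leave it to the sequencer to ensure that a slot is not overwritten before it has been consumed.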

@akoolenbourke

Interesting. I ported the disruptor to C++ about a year ago and found it to be pretty damned good, so long as you use the correct wait strategies for your requirements. Can't wait to try this out, as we will need a messaging system at some point, and if it can work with cppcoro and coroutines, all the better!

@lewissbaker
Owner Author

The coroutine-based wait strategy will not quite be able to match the performance of a spin-wait strategy, which avoids any synchronisation other than acquire-reads and release-stores.
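
For comparison, a spin-wait strategy of the kind described above might look roughly like this. It is a sketch under assumptions, not code from cppcoro or disruptorplus: `spin_queue_sketch` is a hypothetical name, it supports exactly one producer thread and one consumer thread, and the capacity is assumed to be a power of two.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical single-producer/single-consumer queue that spin-waits
// instead of suspending. Only acquire-loads and release-stores are used
// for cross-thread synchronisation.
class spin_queue_sketch {
public:
    explicit spin_queue_sketch(std::size_t capacity)
        : mask_(capacity - 1), slots_(capacity) {
        assert(capacity != 0 && (capacity & (capacity - 1)) == 0);
    }

    // Producer side: spin while the buffer is full, write the slot,
    // then publish it with a release-store.
    void push(std::uint64_t value) {
        const std::uint64_t seq = published_.load(std::memory_order_relaxed);
        while (seq - consumed_.load(std::memory_order_acquire) >= slots_.size()) {
            // Buffer full: busy-wait for the consumer to free a slot.
        }
        slots_[seq & mask_] = value;
        published_.store(seq + 1, std::memory_order_release);
    }

    // Consumer side: spin while the buffer is empty, read the slot,
    // then release it back to the producer.
    std::uint64_t pop() {
        const std::uint64_t seq = consumed_.load(std::memory_order_relaxed);
        while (published_.load(std::memory_order_acquire) == seq) {
            // Buffer empty: busy-wait for the producer to publish.
        }
        const std::uint64_t value = slots_[seq & mask_];
        consumed_.store(seq + 1, std::memory_order_release);
        return value;
    }

private:
    std::size_t mask_;
    std::vector<std::uint64_t> slots_;
    std::atomic<std::uint64_t> published_{0};  // number of items written so far
    std::atomic<std::uint64_t> consumed_{0};   // number of items read so far
};
```

The trade-off is that a waiting thread burns CPU in the loop rather than giving the thread back to other work, which is what coroutine suspension avoids.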

The coroutine wait-strategy requires a seq_cst memory barrier whenever a coroutine needs to suspend in a wait_until_published() operation. It also needs a seq_cst memory barrier for every publish() operation.
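
One way to see where these barriers come from: the waiter writes "I am waiting" and then re-reads the published sequence, while the publisher writes the published sequence and then reads "is anyone waiting?". At least one side must observe the other's write, and acquire/release ordering alone does not guarantee that. The sketch below illustrates the handshake under loud assumptions: `barrier_sketch` is a hypothetical name and not the cppcoro sequence_barrier, a `std::function` stands in for the coroutine handle, there is a single publisher and at most one waiter, and sequence numbers are compared with a plain `>=` with no wrap-around handling.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>

// Hypothetical single-publisher, single-waiter barrier.
class barrier_sketch {
public:
    struct waiter {
        std::uint64_t target;          // sequence number being waited for
        std::function<void()> resume;  // stand-in for resuming a coroutine
    };

    // Returns true if w.target is already published, so the caller can
    // continue without suspending. Returns false if the caller must
    // suspend; publish() will then call w.resume() once, and w must stay
    // alive until that happens.
    bool wait_until_published(waiter& w) {
        if (published_.load(std::memory_order_acquire) >= w.target) return true;

        // Register as the waiter (seq_cst read-modify-write).
        waiter* expected = nullptr;
        const bool registered = waiter_.compare_exchange_strong(
            expected, &w, std::memory_order_seq_cst);
        assert(registered && "this sketch supports a single waiter");
        (void)registered;

        // Re-check: the publisher may have published between the first
        // check and the registration. If so, try to take the registration
        // back; if the publisher already took it, it will resume us.
        if (published_.load(std::memory_order_seq_cst) >= w.target &&
            waiter_.exchange(nullptr, std::memory_order_seq_cst) == &w) {
            return true;
        }
        return false;
    }

    // Publishes seq, then resumes the waiter if its target has been reached.
    void publish(std::uint64_t seq) {
        published_.store(seq, std::memory_order_seq_cst);
        waiter* w = waiter_.exchange(nullptr, std::memory_order_seq_cst);
        if (w == nullptr) return;
        if (w->target <= seq) {
            w->resume();
        } else {
            // Target not reached yet: put the waiter back.
            waiter_.store(w, std::memory_order_seq_cst);
        }
    }

private:
    std::atomic<std::uint64_t> published_{0};
    std::atomic<waiter*> waiter_{nullptr};
};
```

The fast path (target already published) needs only an acquire-load, which matches the point above that the seq_cst cost on the waiting side is paid only when a coroutine actually has to suspend, while the publishing side pays it on every publish().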

This should still be more efficient than a wait-strategy that uses std::mutex and std::condition_variable and blocks the thread, though.

@lewissbaker
Owner Author

This has now been merged (finally).

See PR #92

tavi-cacina pushed a commit to tavi-cacina/cppcoro that referenced this issue Dec 2, 2020

2 participants