Thread based concurrency #315
Conversation
Concurrently runs a number of `Racecar::Runner` instances in a fixed-size thread pool. Each thread starts a single `Racecar::Runner` with a `Racecar::Consumer` class instance. All threads run the same consumer class, share the same config, and consume partitions from the same topic(s).

`ThreadPoolRunnerProxy` can be combined with `ParallelRunner` to run both forks and threads. `ParallelRunner` is not used or battle-tested (at Zendesk), so combining the two is still not recommended.

Racecar does not yet implement a health-check mechanism, so an uncaught error in a single worker thread will cause the process to start a graceful shutdown of all threads before exiting with an error.

Other inclusions:

- Signal handling has been moved up one level, to the CLI
- The runner-like object interface has been standardized to `#run`, `#stop`, and `#running?`
- Tests can be run locally with Docker: `export LOCAL=1`
- Some test bugs have been fixed; connections now always close, and orphaned processes raise an exception
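To make the shape of this concrete, here is a minimal sketch of a proxy like the one described, assuming a runner factory and the `#run`/`#stop`/`#running?` interface mentioned above; names and details beyond `Racecar::Runner` are illustrative, not the PR's actual code:

```ruby
# Hypothetical sketch of the thread-pool proxy described above;
# structure and names are assumptions, not the code in this PR.
class ThreadPoolRunnerProxy
  def initialize(runner_factory, threads:)
    @runner_factory = runner_factory # builds one Racecar::Runner per thread
    @threads = threads
    @runners = []
    @pool = []
  end

  def run
    @threads.times do
      runner = @runner_factory.call
      @runners << runner
      @pool << Thread.new { runner.run } # each thread runs its own consumer
    end
    @pool.each(&:join) # block until all runner threads finish
  end

  def stop
    @runners.each(&:stop) # ask every runner to shut down gracefully
  end

  def running?
    @pool.any?(&:alive?)
  end
end
```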
```diff
@@ -223,7 +236,8 @@ def load_consumer_class(consumer_class)
       consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase,
     ].compact.join

-    self.parallel_workers = consumer_class.parallel_workers
+    self.forks = consumer_class.forks
```
I think we should avoid breaking changes unless there's a really good reason.
Agreed, it's actually aliased, which you can see further down the PR 👇
```diff
@@ -223,7 +236,8 @@ def load_consumer_class(consumer_class)
       consumer_class.name.gsub(/[a-z][A-Z]/) { |str| "#{str[0]}-#{str[1]}" }.downcase,
     ].compact.join

-    self.parallel_workers = consumer_class.parallel_workers
+    self.forks = consumer_class.forks
+    self.threads = consumer_class.threads
```
I think this should be configurable with the config itself as well; could we add a new config key and have this be e.g. `self.threads = consumer_class.threads || self.threads`?
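For illustration, a sketch of what that suggestion might look like, assuming Racecar's KingKonf-based `Config` class; the `threads` key here is the reviewer's proposal, not merged code:

```ruby
require "king_konf"

module Racecar
  class Config < KingKonf::Config
    # Hypothetical new key, per the suggestion above.
    desc "Number of consumer threads to run per process"
    integer :threads, default: 1
  end
end

# Then in load_consumer_class, prefer the class-level setting but
# fall back to whatever the config already holds:
self.threads = consumer_class.threads || self.threads
```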
```ruby
# For deprecation
def parallel_workers=(forks)
  self.forks = forks
end
```
Ah, I see there's backwards compatibility; all good!
Thanks for the PR!
I love the idea of having multi-thread support in Racecar, but I think the design needs to be changed a bit.
Specifically, I'm not so sure about having one runner per thread; I think the real value would be to have a single runner per process, but having the runner be multithreaded. This would mean a shared message fetch loop, with the loop fanning out message batches to worker threads, perhaps using bounded queues.
This would mean that each worker process would be a consumer in the consumer group, rather than each thread. And we would increase our processing concurrency without reducing the efficiency of the fetches.
My pleasure! That sounds a little more complex but totally doable. This is very much MVP concurrency, changing as little of the existing code as possible. Unless someone already has that work in progress, I can give it a try. Is this enough of an improvement to consider merging? If so, I think we could iterate towards your proposed design without breaking the API. It should also be marked experimental anyway.
I'm not sure it provides enough value as it stands – with copy-on-write, the forking approach seems better if we're using separate consumer instances anyway. I think the shared-main-loop approach should be pretty doable; if you look into Runner, you can see how batches are being processed sequentially – we would change that to an async model, with a fixed partition number -> worker thread mapping probably.
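As a rough illustration of that proposal, here is a hedged sketch of a single fetch loop fanning batches out to worker threads over bounded queues, with a fixed partition-to-thread mapping; `fetch_next_batch`, `process_batch`, and `shutting_down?` are hypothetical stand-ins, not Racecar APIs:

```ruby
# Hypothetical sketch of the proposed design: one fetch loop per process
# (so the process stays a single consumer in the group), with worker
# threads doing the processing. Not code from this PR.
NUM_WORKERS = 4
QUEUE_BOUND = 10 # bounded queues apply back-pressure to the fetch loop

queues = Array.new(NUM_WORKERS) { SizedQueue.new(QUEUE_BOUND) }

workers = queues.map do |queue|
  Thread.new do
    while (batch = queue.pop) # nil signals shutdown
      process_batch(batch)    # the consumer's own processing code
    end
  end
end

# Single shared fetch loop keeps fetches efficient.
until shutting_down?
  batch = fetch_next_batch # e.g. a poll against the underlying Kafka client
  # The same partition always maps to the same thread, preserving
  # per-partition processing order.
  queues[batch.partition % NUM_WORKERS] << batch
end

queues.each { |q| q << nil } # drain and stop workers
workers.each(&:join)
```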