
Concurrent message processing #332

Merged: 12 commits into master from concurrent-message-processing, Apr 18, 2019

Conversation

@Nevon (Collaborator) commented Apr 8, 2019

This implements a simple way to achieve partition-aware concurrency: a consumer can consume messages from multiple partitions concurrently, while still preserving ordering within each partition.

Usage

```js
consumer.run({
  partitionsConsumedConcurrently: 3, // Default: 1
  eachMessage: async ({ topic, partition, message }) => {
    // This will be called up to 3 times concurrently
  },
})
```

To benefit from this kind of concurrency, the consumer needs to be subscribed to a topic with at least 2 partitions, and the work done in `eachMessage`/`eachBatch` needs to be async (preferably I/O-bound). I added a guideline in the docs to keep people from setting this to `Number.MAX_SAFE_INTEGER` and expecting their consumer to be super fast.

This is implemented by creating a kind of queue every time we finish a fetch. Each item in the queue is processed concurrently up to the specified limit. If one throws, we reject all the pending items. I did this to behave similarly to the existing behavior where we bail out of the loop on failure, but given that each item is a separate topic/partition, this is something we might be able to revisit in order to keep processing other partitions even if one of them fails.
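
A minimal sketch of that per-fetch processing (illustrative only; `processConcurrently` and its parameters are made-up names, not the actual KafkaJS internals):

```js
// Illustrative sketch, not the actual KafkaJS code. Each item is one
// topic/partition batch; up to `concurrency` of them run at once, and a
// failure keeps pending items from being picked up (fail-fast).
const processConcurrently = async (items, concurrency, handler) => {
  const queue = [...items]
  let aborted = false

  const worker = async () => {
    while (!aborted && queue.length > 0) {
      const item = queue.shift()
      try {
        await handler(item)
      } catch (error) {
        aborted = true // leave the remaining queue items unprocessed
        throw error
      }
    }
  }

  // Start at most `concurrency` workers; Promise.all rejects on the first failure
  await Promise.all(
    Array.from({ length: Math.min(concurrency, items.length) }, worker)
  )
}
```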

Fixes #330.

From the commit messages:

- This is screwing up the console output, and if we have resources that aren't cleaned up, we should fix that instead.
- Processes each batch per fetch concurrently, up to the specified limit. This allows for simple concurrency while maintaining order within each partition.
@Nevon requested a review from tulios, April 8, 2019 13:07
```js
onChange(semaphore)

if (waiting.length > 0) {
  const fn = waiting.pop()
```
@Nevon (Collaborator, Author): This should `shift` instead, to grab from the head of the list.

@Nevon (Collaborator, Author): Fixed in 7069059.
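
For reference, a minimal sketch of a FIFO counting semaphore along these lines (the `waiting` name follows the diff above, but this is not the actual KafkaJS code):

```js
// Sketch of a counting semaphore with a FIFO wait queue (illustrative).
const semaphore = concurrency => {
  let available = concurrency
  const waiting = []

  const acquire = () => {
    if (available > 0) {
      available--
      return Promise.resolve()
    }
    // No slot free: park the caller at the tail of the queue
    return new Promise(resolve => waiting.push(resolve))
  }

  const release = () => {
    if (waiting.length > 0) {
      // shift() wakes the oldest waiter (head of the list); pop() would
      // wake the newest and starve earlier waiters, the bug fixed above
      const next = waiting.shift()
      next()
    } else {
      available++
    }
  }

  return { acquire, release }
}
```

Processing then becomes `await sem.acquire()`, do the work, and `sem.release()` in a `finally` block.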


Docs excerpt under review (Consuming.md):

> The same thing applies if you are using [`eachBatch`](Consuming.md#each-batch). Given `partitionsConsumedConcurrently > 1`, you will be able to process multiple batches concurrently.
>
> A guideline for setting `partitionsConsumedConcurrently` would be `partitions < partitionsConsumedConcurrently < CPU cores`.
@tulios (Owner): Is this right? I think it's backwards; `partitionsConsumedConcurrently` should be lower than `partitions`.

@Nevon (Collaborator, Author): I rewrote this to actually make sense.


Updated docs excerpt (Consuming.md):

> The same thing applies if you are using [`eachBatch`](Consuming.md#each-batch). Given `partitionsConsumedConcurrently > 1`, you will be able to process multiple batches concurrently.
>
> A guideline for setting `partitionsConsumedConcurrently` would be that it should not be larger than the number of partitions consumed. Depending on whether or not your workload is CPU bound, it may also not benefit you to set it to a higher number than the number of logical CPU cores. A recommendation is to start with a low number and measure if increasing leads to higher throughput.
Contributor: Is the number of CPU cores relevant for a Node application? This is still single-threaded processing.

@Nevon (Collaborator, Author): It depends on your workload. If the message processing is entirely synchronous, then this setting is irrelevant: all the work takes place on the main thread and will be executed sequentially regardless.

However, the work is usually not entirely synchronous. In many cases we call remote APIs, read from or write to a database, and so on, and some of that work, such as network I/O, takes place outside of the main thread. If each message requires a network call to some API, you would normally just sit around and wait while the remote service does its work; with this concurrency model you can instead fire off several of these requests concurrently.
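
For example (a sketch using the `consumer` from the usage example above; the remote call is simulated with a timer so the snippet stays self-contained):

```js
// `callRemoteApi` stands in for an I/O-bound call (e.g. a remote HTTP API)
const callRemoteApi = payload =>
  new Promise(resolve => setTimeout(() => resolve(payload), 100))

consumer.run({
  partitionsConsumedConcurrently: 3,
  eachMessage: async ({ topic, partition, message }) => {
    // While this "request" is pending, the event loop is free to run
    // handlers for messages from other partitions
    await callRemoteApi(message.value)
  },
})
```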

So then you might say that you would just set the concurrency limit to however many partitions you have, in order to achieve maximum concurrency, but that may not actually increase throughput, for multiple reasons. First, your downstream dependency might not appreciate getting that many concurrent requests, and its performance is likely to suffer. Even if that's fine, network and disk I/O are not infinitely parallelisable, and how far they scale correlates with the number of cores you have.

While they're talking about connection pool sizing in this article, the principles are very much the same, and it's a really good high-level explanation: https://github.com/brettwooldridge/HikariCP/wiki/About-Pool-Sizing

So the complex answer is that it really depends on your workload and your dependencies. The recommendation is pretty conservative, because otherwise people will put a gigantic number in there thinking that "more is better" and end up getting lower throughput and potentially really bursty behavior. Throughput may very well increase with higher concurrency, given the right conditions, but it's better to start low and increase it over time while measuring the effect until you hit the sweet spot.
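
One rough way to find that sweet spot (a sketch, not a rigorous benchmark; `handleMessage` is a stand-in for your real, I/O-bound work):

```js
let processed = 0

consumer.run({
  partitionsConsumedConcurrently: 2, // try 1, 2, 4, ... between runs and compare
  eachMessage: async ({ message }) => {
    await handleMessage(message) // hypothetical: your actual processing
    processed++
  },
})

// Log rough throughput once per second
setInterval(() => {
  console.log(`~${processed} messages/s`)
  processed = 0
}, 1000)
```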


Contributor: Can I set it to a high number to always process all partitions in parallel? Is the behaviour the same for any setting greater than the number of consumed partitions?

@Nevon (Collaborator, Author): Yes. You can set it to a number higher than the number of partitions, and the effective result will be the same as if you set it to the number of partitions.
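
In other words, the effective concurrency is capped (an illustrative expression, not the exact internals):

```js
const effectiveConcurrency = Math.min(
  partitionsConsumedConcurrently,
  numberOfConsumedPartitions
)
```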

@Nevon merged commit aa63b4c into master on Apr 18, 2019
@Nevon deleted the concurrent-message-processing branch on April 18, 2019 07:33
@tulios (Owner) commented Apr 23, 2019

Fixes #320

Successfully merging this pull request may close these issues.

Partition aware concurrent eachBatch/eachMessage