Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace fetch promise all with async generator #570

Merged
merged 12 commits into from
Jan 30, 2020

Conversation

tulios
Copy link
Owner

@tulios tulios commented Nov 29, 2019

This PR fixes issue #370, and it can improve the overall performance of KafkaJS (pending benchmark).

Explaining the problem:

The fetch operation has its responsibility split into two modules: runner and consumerGroup. The runner is responsible for managing the consumer life-cycle and feeding the data to the userland code. The consumer group module abstracts the fetch operation by hiding the complexity of leadership, multiple brokers, and partitions.

Issue #370 highlighted the following problem, given a topic with 3 partitions where partition 0 hasn't received any messages in a specific t0, a call to consumerGroup#fetch would block the processing for maxWaitTimeInMs, usually 5s, even if the other partitions had data. This behavior is happening because consumerGroup#fetch uses Promise.all to wait for the requests to all partition leaders, and runner#fetch waits for the list of batches before it feeds any data to the user-land code.

Proposed solution:

consumerGroup#fetch returns an async generator that can yield new results over time, so using the previous example, the user-land code would start processing messages from the fastest partition first, without paying any upfront cost for the partitions without data.

One drawback:

We won't be able to provide numberOfBatches to the FETCH instrumentation event since the batches will be yielded overtime. I think it is a fine compromise, WDYT?

@vykimo @bobzsj87 @IvanovOleg I know this is an old issue and you might have moved away, but any input would be appreciated.

src/consumer/runner.js Outdated Show resolved Hide resolved
Copy link
Collaborator

@JaapRood JaapRood left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Really excited about this change, an async generator really seems the way to go for this use case. It further isolates the fetching of partitions, which is what should be the unit of concurrency in Kafka after all. For ones who implement a Readable Stream per partition assigned, this means we're a step closer to having separate back-pressure for each. For example, we have a processing task where results are written to an intermediary topic to then be consumed again, all within the same consumer group. Since there's a 10:1 reduction in data from input to this intermediary topic, we've definitely seen some of the latency issues described.

src/consumer/consumerGroup.js Show resolved Hide resolved
The async generators were officially released on node 10, they are 
available with the harmony flag on node 8, but the support for node 8 is 
ending soon. We might not be able to merge this change before the end of 
the LTS date.
@tulios
Copy link
Owner Author

tulios commented Nov 29, 2019

We will need to bump to node 10. Node 8 LTS reaches end of life in the end of the year, so we can release this after that.

src/consumer/runner.js Outdated Show resolved Hide resolved
@tulios
Copy link
Owner Author

tulios commented Dec 2, 2019

The tests are green; I will wait for the next release and the end of node 8 LTS support before I merge.

@andrewreineke
Copy link
Contributor

Just wanted to echo support for this change since we're currently using KafkaJS in production and have a large number of topic-partitions in anticipation of future load - and as such are paying the E2E latency tax.

Node 8 LTS is EOL as of the new year, so thanks for having this on your radar! (and also for all the work maintaining the rest of KafkaJS)

Cheers

@tulios tulios merged commit 7fc1a14 into master Jan 30, 2020
@tulios tulios deleted the replace-fetch-promise-all-with-generator branch January 30, 2020 10:01
}

await onBatch(batch)
await this.consumerGroup.heartbeat({ interval: this.heartbeatInterval })
Copy link
Contributor

@WaleedAshraf WaleedAshraf Apr 22, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tulios calling heartbeat here, will fix this issue: #708

This comment was marked as resolved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants