Ensure fair fetch response allocation across topic-partitions #859
When receiving a fetch request, the broker processes partitions in the order they appear in the request. The response contains up to `maxBytesPerPartition` per partition, up to a total of `maxBytes`. If `maxBytesPerPartition * partitions > maxBytes`, the partitions that appear later in the request are not allocated any data, so the consumer first has to catch up to the head of the earlier partitions before it consumes any messages from the later ones.

To ensure more balanced consumption across all partitions, we randomize the order of all partitions in the request. Over time, all partitions should then be consumed approximately evenly. Fixes #849
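To make the starvation effect concrete, here is a rough sketch. The `allocate` function is a simplified model of the broker behaviour described above, not actual broker code, and both `allocate` and this `shuffle` are hypothetical helpers written for illustration; they are not the implementation in this PR.

```javascript
// Simplified model (an assumption for illustration): partitions are served in
// request order, each gets at most maxBytesPerPartition, and the whole
// response is capped at maxBytes.
function allocate(partitions, backlog, maxBytesPerPartition, maxBytes) {
  let remaining = maxBytes
  const allocation = {}
  for (const partition of partitions) {
    const bytes = Math.min(backlog[partition], maxBytesPerPartition, remaining)
    allocation[partition] = bytes
    remaining -= bytes
  }
  return allocation
}

// Fisher-Yates shuffle that returns a new array. `random` is injectable so
// the order can be pinned down in tests.
function shuffle(items, random = Math.random) {
  const result = [...items]
  for (let i = result.length - 1; i > 0; i--) {
    const j = Math.floor(random() * (i + 1))
    const tmp = result[i]
    result[i] = result[j]
    result[j] = tmp
  }
  return result
}

const partitions = ['a-0', 'a-1', 'b-0', 'b-1']
const backlog = { 'a-0': 1000, 'a-1': 1000, 'b-0': 1000, 'b-1': 1000 }

// Fixed order: 4 partitions * 100 bytes > 200 bytes, so b-0 and b-1 get nothing.
const fixedOrder = allocate(partitions, backlog, 100, 200)
console.log(fixedOrder)

// Shuffled order: a different pair of partitions is served on each request,
// so the per-partition totals even out over many requests.
const totals = { 'a-0': 0, 'a-1': 0, 'b-0': 0, 'b-1': 0 }
for (let round = 0; round < 1000; round++) {
  const allocation = allocate(shuffle(partitions), backlog, 100, 200)
  for (const partition of partitions) {
    totals[partition] += allocation[partition]
  }
}
console.log(totals)
```

With the fixed order, the same two partitions take the whole `maxBytes` budget on every request as long as they have a backlog. With shuffling, the budget goes to whichever partitions happen to come first in that request.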
test('it should randomize the order of the requested topic-partitions', async () => {
  const topics = [createFetchTopic('topic-a', 3), createFetchTopic('topic-b', 2)]

  mockShuffle.mockImplementationOnce(() => [
I don't really love this test, since I had to module-mock `shuffle` (i.e. reach into the internals of `broker.fetch`), but it's hard to assert on things that have randomness, and in this case I really do want a specific order to check that consecutive partitions for the same topic get merged.
Totally fair, I'd say. My best attempt would be asserting that the order is different, but even that's not guaranteed: ending up with the same order is still a valid shuffle result. Mocking this order is the closest I imagine you could get to verifying the logic in `broker.fetch`.
Thinking about this a tiny bit more, in case you really didn't like the test: I guess you could spy on `shuffle` instead of mocking it, so you'd know the order of topics it returned when called by `broker.fetch`. That would verify that the interface between those two works correctly, but since the surface area is so small and straightforward, I'm pretty sure I wouldn't bother.
The problem with that is that I need shuffle to actually return a very specific result in order to be able to validate that consecutive partitions for the same topic are merged. Meaning if we have topic-partitions:
topic-a | 1
topic-a | 2
topic-b | 1
topic-b | 2
And after shuffling it looks like:
topic-a | 1
topic-b | 1
topic-b | 2
topic-a | 2
We want the resulting request to be:
topic-a | 1
topic-b | 1, 2
topic-a | 2
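
A minimal sketch of that merge step, for illustration only: `mergeConsecutive` is a hypothetical helper name and the `{ topic, partition }` entry shape is an assumption, not necessarily how `broker.fetch` represents it. The idea is to merge only adjacent entries that share a topic, so the shuffled order survives.

```javascript
// Hypothetical helper: collapse a flat, already-shuffled list of
// { topic, partition } entries into fetch topics. Only *adjacent* entries
// with the same topic are merged, which preserves the shuffled order.
function mergeConsecutive(topicPartitions) {
  const merged = []
  for (const { topic, partition } of topicPartitions) {
    const last = merged[merged.length - 1]
    if (last !== undefined && last.topic === topic) {
      last.partitions.push(partition)
    } else {
      merged.push({ topic, partitions: [partition] })
    }
  }
  return merged
}

// The shuffled order from the example above.
const shuffled = [
  { topic: 'topic-a', partition: 1 },
  { topic: 'topic-b', partition: 1 },
  { topic: 'topic-b', partition: 2 },
  { topic: 'topic-a', partition: 2 },
]

const result = mergeConsecutive(shuffled)
console.log(JSON.stringify(result))
// [{"topic":"topic-a","partitions":[1]},{"topic":"topic-b","partitions":[1,2]},{"topic":"topic-a","partitions":[2]}]
```

Note that `topic-a` appears twice in the result: grouping all partitions by topic would undo part of the shuffle, which is why the test needs a fixed shuffle result to check this.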
Looks good to me!
See #849 for details, but this fixes a big issue with uneven consumption when the consumer has fallen behind on several partitions and `maxBytesPerPartition * numPartitions > maxBytes`.