-
-
Notifications
You must be signed in to change notification settings - Fork 527
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Discard messages for toppars that saw a seek operation during a fetch or batch processing #367
Conversation
…one stale by a seek operation
…if it has since gone stale through a seek operation
Typically with these kinds of issues I'd start with a test, but I've had some trouble figuring out the best way to approach this. I've tried implementing an integration test in |
Not sure how that test failure is triggered by these changes 🤔. |
We have a lot of integration tests; they can be flaky some times. I am re-running the broken step. I am a bit busy this week; I will try to review this as soon as I can. |
Meanwhile, I thought I'd add some context to how we ran into this, it might help in figuring out an integration test. In stream processing, upon ever rebalance, there is a setup phase for each assignment taken on by the consumer. As this might take a while, consumption for that assignment is immediately paused This setup triggers the issue described by this pull request. Upon receiving the rebalance the |
It took me a while, but eventually managed to implement an integration test that isolates the issue. Leveraging a higher |
Before writing the additional tests to also cover the state batches I'm keen to get an initial review. Once everyone is happy with the direction I'll get onto the other tests. Those should be easier to isolate, as we have control over the consumption loop 😅! |
I will try to look at it this week. :D |
src/consumer/runner.js
Outdated
@@ -187,6 +187,7 @@ module.exports = class Runner { | |||
}, | |||
uncommittedOffsets: () => this.consumerGroup.uncommittedOffsets(), | |||
isRunning: () => this.running, | |||
isStale: ({ topic, partition }) => this.consumerGroup.hasSeekOffset({ topic, partition }), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe isStale
isn't clear enough, what do you think? I can't offer a better name now, let me think about this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Naming is always hard :/ I tried to reason from a person implementing eachBatch
and how the name of the method should hint at what they can or cannot do. In this case we'd want to imply that one could continue processing, but that messages have gone outdated. Maybe isCanceled
makes more sense?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sitting on isCanceled
for a bit, I think it might be confusing in combination with isRunning
. What would be the difference between the two? While isStale
in isolation is maybe less obvious, I do think it avoids that confusion.
@JaapRood I like the PR, the changes make sense. |
I added tests to verify both As far as I'm concerned this is ready to be merged. Pleased to hear otherwise :) |
@JaapRood yes, this PR is ready. I will merge it 🎉 |
After calling
consumer.seek
, any new messages processed should be from the sought to offset.Before attempting a fetch, the
consumerGroup
of theconsumer
takes into account anyconsumer.seek
calls, fetching messages at the right offset. However, since fetching is an async operation, ifconsumer.seek
is called while the fetch is in progress, the fetched messages are still returned for processing withconsumer.run
.To fix this, we can get pretty far by filtering the messages once again after the response of a fetch is received. By checking whether the topic and partition of a batch has a pending seek operation, we know to discard the messages.
Another situation is that a batch of messages might currently be processed. To get the behaviour we're after, we'll have to check for each message whether a seek operation is pending (similar to how you check whether we're still running). In the case of
eachMessage
we can do this for the user, foreachBatch
we expose aisStale
function (likeisRunning)
.