Only fetch for partitions with initialized offsets #582
Conversation
@JaapRood this is a new approach to the problem described in the Lock PR.
To reproduce the issue, I ran the branch. I produced a bunch of messages to that topic, then started a consumer and consumed a bit. Then I killed my consumer and kept producing until the broker did a cleanup of old segments (this takes minutes, by the way). When I started the consumer again, it would break because the old committed offset was now invalid, as described in #578. With this fix, what happened instead was that we detected that some partitions didn't have a valid offset and didn't fetch for them. Shortly after, each partition had recovered and fetches continued as usual.
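For context, this is roughly what the fix amounts to: a minimal, self-contained sketch rather than the actual diff (the map shape and the '-1' sentinel are assumptions, not kafkajs API):

```js
// Hypothetical helper, sketching the idea only (not the actual PR code):
// skip partitions whose consumer offset hasn't been initialized yet, so we
// never issue a fetch for an unknown (and possibly invalid) offset.
// Assumes committedOffsets is a { [topic]: { [partition]: offset } } map and
// that '-1' marks "not initialized"; both are assumptions, not kafkajs API.
const filterInitializedPartitions = (committedOffsets, topic, partitions) =>
  partitions.filter(partition => {
    const offset = committedOffsets[topic] && committedOffsets[topic][partition]
    return offset != null && Number(offset) >= 0
  })

// Example: partition 1 is skipped until its offset has been resolved.
const committedOffsets = { 'my-topic': { 0: '42', 1: '-1' } }
console.log(filterInitializedPartitions(committedOffsets, 'my-topic', [0, 1]))
// -> [ 0 ]
```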
@@ -368,13 +368,34 @@ module.exports = class ConsumerGroup {
      )

      const leaders = keys(partitionsPerLeader)
      const committedOffsets = this.offsetManager.committedOffsets()
Is there a reason why we're using `committedOffsets` here, instead of just the resolved offsets? An invalid offset could come from attempting to resume from a committed offset, but also from a `consumer.seek`. Your great comment touches on how both are cleared, so I'm not sure whether there would be any actual difference in behaviour, but as future changes are made this subtle difference might be harder to spot while becoming more consequential.
Initially I was actually using `resolvedOffsets`, but that didn't work because `OffsetManager.resolveOffsets` actually just sets the initialized consumer offsets in `committedOffsets`, not in `resolvedOffsets` (did someone mention that our naming is confusing...? 😅). When the consumer first boots, it doesn't actually have any resolved offsets, so the only source of offsets is the initialized offsets in `committedOffsets`.
Regarding the seek behavior, I would expect it to work the same way, no? Seek would commit (potentially invalid) offsets and then clear both `committedOffsets` and `resolvedOffsets` using `OffsetManager.clearOffsets`. In the fetch loop we'd get the consumer offsets from the brokers, and from there on it's the same.
Maybe I'm missing something?
> Regarding the seek behavior, I would expect it to work the same way, no? Seek would commit (potentially invalid) offsets

Seeking shouldn't commit, only move the "playhead" (see #395), so relying on that behaviour is probably not what we want.
> Initially I was actually using `resolvedOffsets`, but that didn't work because `OffsetManager.resolveOffsets` actually just sets the initialized consumer offsets in `committedOffsets`, not in `resolvedOffsets` (did someone mention that our naming is confusing...? 😅).
I guess having to use `committedOffsets` is a symptom of there being an issue in there, then. Conceptually, it's the `resolvedOffsets` (which I understand to be the "next to consume" offset, the "playhead" for reading the log) that should always exist, and the committed offset that is optional, as using Kafka for committing offsets is (or should be) totally optional (see #395).
Since that seems like a different issue, maybe it's an idea to create a separate issue for it and tag that in a comment. Expecting our future selves (or others) to spot, outside the context of these changes, that we conceptually want the resolved offsets there rather than the committed ones might be a lot to ask 😅.
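A rough sketch of the distinction being drawn, with purely illustrative names (this is not the actual shape of OffsetManager's state):

```js
// Illustrative only, not OffsetManager's actual shape: per-partition reading
// state as described above. The "playhead" must always exist for a partition
// we're consuming, while committing it back to Kafka is optional.
const partitionState = {
  resolvedOffset: '101',  // next offset to consume (the playhead), always present
  committedOffset: null,  // last offset committed to Kafka, optional
}
```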
> Since that seems like a different issue, maybe it's an idea to create a separate issue for it and tag that in a comment.
That sounds like a good idea. I would prefer to do that kind of holistic refactoring in a PR that doesn't actually change any behavior, rather than squeezing it into a bugfix. Could you create that issue?
Created the issue, trying to preserve the context of this conversation properly: #585. To help the audit suggested in there, I'd suggest linking that issue in a comment above where `committedOffsets()` is called.
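For example, something along these lines (the call is taken from the diff above; the comment wording is only a suggestion):

```js
// See #585: conceptually this should use the resolved ("playhead") offsets
// rather than the committed ones; kept as committedOffsets() for now so this
// PR stays a pure bugfix.
const committedOffsets = this.offsetManager.committedOffsets()
```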
Pragmatic fix, no locks, 🙌.
The pre-release version
Second attempt at solving the issue with concurrent recovery from out-of-range offsets, this time without locks.
Fixes #555
Closes #578