
Lock mutations to offset manager #578

Closed · wants to merge 2 commits

Conversation

tulios (Owner) commented Dec 4, 2019

Fixes #567 #555

What's missing in this PR?

  • Make the lock timeout configurable
  • Set a default based on the number of partitions assigned to the consumer

tulios requested a review from JaapRood on December 4, 2019 at 15:26
Nevon (Collaborator) commented Dec 4, 2019

For context for other readers, what we've seen happening is that a consumer group commits offsets for some partitions and then shuts down. The retention time for the topic is such that the currently committed offsets for the consumer are no longer "valid" (i.e. they have been deleted due to retention).

When the consumer starts up again, it's assigned 2 partitions and calls `resolveOffsets`. This detects that there are partitions whose current offset we don't know locally, so we call the broker and get the last offsets that were committed. The state of the local resolved offsets at this point is:

| Partition | Offset |
| --- | --- |
| 1 | 10 |
| 2 | 10 |
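
A minimal sketch of that resolution step; all names here (`resolvedOffsets`, `fetchCommittedOffsets`) are illustrative stand-ins, not the actual KafkaJS internals:

```js
// Illustrative sketch: partitions with no locally known offset get the
// last committed offset from the broker
const resolvedOffsets = { 'my-topic': {} }

// Stand-in for the real call to the group coordinator
const fetchCommittedOffsets = async (topic, partitions) =>
  partitions.map(partition => ({ partition, offset: '10' }))

const resolveOffsets = async (topic, assignedPartitions) => {
  const unresolved = assignedPartitions.filter(
    partition => resolvedOffsets[topic][partition] === undefined
  )
  for (const { partition, offset } of await fetchCommittedOffsets(topic, unresolved)) {
    resolvedOffsets[topic][partition] = offset
  }
}

// After resolveOffsets('my-topic', [1, 2]): { '1': '10', '2': '10' }
```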

Then we make parallel fetch requests to the 2 brokers. Whichever one happens first gives back an `OffsetOutOfRange` error, which causes the consumer to reset the offset. This commits the default offset for the partition (example: -1) and clears the locally resolved offset for that partition (`delete resolvedOffsets[topic][partition]`).

| Partition | Offset |
| --- | --- |
| 1 | undefined |
| 2 | 10 |
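
Continuing the sketch above, the recovery path commits the logical default and deletes the local entry (`commitOffset` is a hypothetical stand-in for the real commit request):

```js
// Stand-in for committing an offset to the group coordinator
const commitOffset = async (topic, partition, offset) => {}

const onOffsetOutOfRange = async (topic, partition) => {
  await commitOffset(topic, partition, '-1') // -1 = "latest", resolved broker-side
  delete resolvedOffsets[topic][partition]   // this leaves the local entry undefined
}
```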

Note that at this point the server has a committed offset for partition 1: the "latest offset" (-1 is a magic value meaning latest offset). It's only locally that it's undefined.

After recovering from this error, it schedules a new fetch. Again, `resolveOffsets` runs; this time it gets the updated offset from the broker, so the locally resolved offsets are now:

| Partition | Offset |
| --- | --- |
| 1 | 52 |
| 2 | 10 |

While this is happening, the second promise that's still ongoing gets the same error (this time for partition 2). It also commits the default offset and deletes the locally resolved offset. The locally resolved offsets are now:

| Partition | Offset |
| --- | --- |
| 1 | 52 |
| 2 | undefined |

Yielding back to the "new" fetch, it tries to calculate the next offsets to fetch. It does this calculation by basically taking the locally resolved offsets and incrementing them. However, the offset for partition 2 is undefined, so the increment fails and the consumer crashes.
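
To see why the increment fails, here is a sketch that assumes the offset arithmetic goes through long.js (which KafkaJS uses for offsets); the function itself is hypothetical:

```js
const Long = require('long')

// Hypothetical next-offset calculation: increment the locally resolved offset
const nextFetchOffset = offset => Long.fromValue(offset).add(1).toString()

nextFetchOffset('52')      // '53'
nextFetchOffset(undefined) // throws TypeError: Long.fromValue can't handle undefined
```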

The proposed solution at the moment is to lock mutations to the offset manager's local state. We basically have to lock between the moment you call `resolveOffsets` and `setDefaultOffsets`, so that you are either reading and using the offsets, or clearing them; once you have read the offsets, they cannot be cleared until you have used them.
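
As an illustration of that critical section, here is a minimal promise-chain mutex; this is a sketch of the approach, not this PR's actual implementation, and it reuses the hypothetical helpers from the sketches above:

```js
// Serialize access to the offset manager's local state: a fetch that has
// resolved offsets must get to use them before a reset can clear them
class Lock {
  constructor() {
    this.queue = Promise.resolve()
  }
  run(fn) {
    const result = this.queue.then(fn)
    this.queue = result.catch(() => {}) // keep the chain alive after errors
    return result
  }
}

const offsetLock = new Lock()

const fetchWithResolvedOffsets = () =>
  offsetLock.run(async () => {
    await resolveOffsets('my-topic', [1, 2])
    return { ...resolvedOffsets['my-topic'] } // read the offsets atomically with the resolve
  })

const resetOffsets = (topic, partition) =>
  offsetLock.run(() => onOffsetOutOfRange(topic, partition))
```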

Nevon (Collaborator) commented Dec 4, 2019

Trying to wrap our minds around what's going on:

[Image: IMG_20191204_170330]

JaapRood (Collaborator) left a comment
I read "lock" and alarm bells go off 🚨, even though I know there are situations (like row-level locks for snapshot isolation in databases) where they are the right tool for the job.

Just to check my understanding is right, let me try to rephrase the problem. It's the fetching of invalid offsets that's causing the crash. These invalid offsets are set as part of an async recovery mechanism. Normally a fetch is guarded by resolving any logical offsets (-1 in this case), but because this is asynchronous, there is time between this resolving process and the actual determination of which offsets to fetch, during which one of those offsets can be set back to something invalid again.

If that's right, my instinct would not be to implement locks, but to maintain 2 lists of offsets: those in need of being resolved (logical offsets) and those ready to be fetched (absolute offsets). At the beginning of a fetch, we can attempt to resolve all logical offsets in the list at that time, moving them to the absolute list. If any offsets are invalidated, they are removed from the absolute list and added to the logical list. The key is that the list of absolute offsets only ever contains exactly that: absolute offsets, moving the recovery mechanism out of it.
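
A sketch of that two-list shape (the map names and `resolveLogicalOffset` are hypothetical):

```js
// Two separate lists: logical offsets still needing resolution, and
// absolute offsets that are safe to fetch from. The fetch path only ever
// reads the absolute list, so it can never see an invalidated entry.
const logicalOffsets = new Map([[1, '-1'], [2, '-1']])
const absoluteOffsets = new Map()

// Stand-in for asking the broker what a logical offset currently points at
const resolveLogicalOffset = async (partition, logical) => '10'

const beginFetch = async () => {
  for (const [partition, logical] of logicalOffsets) {
    absoluteOffsets.set(partition, await resolveLogicalOffset(partition, logical))
    logicalOffsets.delete(partition)
  }
  return absoluteOffsets
}

// Invalidation moves the partition back to the logical list instead of
// leaving an undefined hole in the absolute list
const onInvalidated = partition => {
  absoluteOffsets.delete(partition)
  logicalOffsets.set(partition, '-1')
}
```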

Am I missing something that would make that a bad idea and necessitates locking?

tulios (Owner, Author) commented Dec 5, 2019

@JaapRood I agree with you, I think this was the first attempt after a long afternoon debugging this problem 😅

We decided to create the PR to allow others to try it out, but now that I understand the problem better I would take another route.

JaapRood (Collaborator) commented Dec 5, 2019

@tulios working on a problem and trying to bite off a chunk is almost always the way to really understand it, and by proxy help others understand it too, so we'd be nowhere without all that hard work 🙏👏
