
Fix offset when a batch ends with compacted records #813

Merged: nlsun merged 20 commits into main from fix-batch-offset-advance-nlsunsuggestion on Feb 9, 2022

Conversation

nlsun (Contributor) commented Dec 18, 2021

Based off of #807 and also related to #709

Move the fix inside `Batch.readMessage`

The reasoning is that `Batch.readMessage` is normally where we update
the batch offset so the goal is to do this additional update to the
batch offset in the same location.

Also adjusts the test case's retry logic so that the test may finish earlier
than the previous fixed sleep allowed, while also giving it more total time
to complete, to reduce flakiness.

The test case also now ensures that the final state is fully compacted
before declaring success.

The solution has since been updated to jump to the last offset indicated by the response header. In addition, a test was added to ensure that the new way of moving offsets still works when the message is truncated.
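
As a rough sketch of that idea (illustrative type and field names only, not the merged implementation):

	package main

	import "fmt"

	// batchState is an illustrative stand-in for the library's internal batch
	// bookkeeping; the real types and field names differ.
	type batchState struct {
		offset     int64 // next offset the consumer will ask for
		lastOffset int64 // firstOffset + lastOffsetDelta, taken from the batch header
	}

	// finish models the end-of-batch handling after this fix: rather than
	// advancing one record at a time, jump past every offset the header
	// claims, including trailing records that compaction removed.
	func (b *batchState) finish() {
		if b.offset <= b.lastOffset {
			b.offset = b.lastOffset + 1
		}
	}

	func main() {
		// One live record was read out of a batch whose header spans 4 offsets.
		b := batchState{offset: 101, lastOffset: 103}
		b.finish()
		fmt.Println(b.offset) // 104: the first offset of the next batch
	}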

nlsun (Contributor, Author) commented Dec 18, 2021

@iddqdeika I made a few tweaks to your original PR #807, thoughts on the changes?

achille-roussel (Contributor) left a comment

Spectacular 🙌

Thanks for your thorough investigation into this issue, @nlsun, and the fix looks great as well 👍

reader_test.go Outdated
	//
	// This test forces varying sized chunks of duplicated messages along with
	// configuring the topic with a minimal `segment.bytes` in order to
	// guarantee that at least 1 batch can be compacted down to 0 messages.
A contributor commented:

It seems to me that there cannot be a batch without messages. Our case here is that there can be a batch without new (unread by the current conn) messages at the end of the batch. If Kafka's segment were compacted down to 0 messages, it would be skipped during Fetch and Kafka would return a batch from the next segment instead.

The test case here leads to a segment being compacted down to at least 1 message, but not at the end of the segment. I tried to create fully compacted segments and they were skipped by Kafka during fetch.

nlsun (Author) replied:

Thanks for the clarification; I made some updates to the comments.
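
For context, forcing compaction in a test generally means a topic configured along these lines (a sketch using kafka-go's Conn API with illustrative values; the PR's actual test setup may differ):

	package main

	import (
		"log"

		"github.com/segmentio/kafka-go"
	)

	func main() {
		conn, err := kafka.Dial("tcp", "localhost:9092")
		if err != nil {
			log.Fatal(err)
		}
		defer conn.Close()

		// A compacted topic with a tiny segment size, so segments roll over
		// quickly and the log cleaner has closed segments it may compact.
		err = conn.CreateTopics(kafka.TopicConfig{
			Topic:             "compaction-test", // hypothetical topic name
			NumPartitions:     1,
			ReplicationFactor: 1,
			ConfigEntries: []kafka.ConfigEntry{
				{ConfigName: "cleanup.policy", ConfigValue: "compact"},
				{ConfigName: "segment.bytes", ConfigValue: "200"},
				{ConfigName: "min.cleanable.dirty.ratio", ConfigValue: "0.01"},
			},
		})
		if err != nil {
			log.Fatal(err)
		}
	}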

iddqdeika (Contributor) commented:

@nlsun thanks! It finally looks great =) I'm proud to be part of this fix.

We're very much waiting for this fix to be merged, because we need compaction for upcoming features (in our project) =D

leedavis81 commented Dec 21, 2021

@nlsun just wanted to confirm that this does resolve the initial issue I reported in #709. However, I do have an instance where a batch has multiple compacted records within it, and the consumer fails to advance (with this patch).

You can see from the headers here I have a record count of 1, but a lastOffsetDelta of 3...

	headers {firstOffset:33268272 length:7160 crc:-909961870 magic:2 v1:{attributes:0 timestamp:0} v2:{leaderEpoch:59 attributes:0 lastOffsetDelta:3 firstTimestamp:1636184302331 lastTimestamp:1636184302331 producerID:-1 producerEpoch:-1 baseSequence:0 count:1}}

It seems the problem stems from the fact that even though the offset is advanced in the first instance by `batch.offset++`, the subsequent missing record doesn't trigger an error, and the offset is moved back with `batch.offset = offset + 1` (where `offset` is returned from the `batch.msgs.readMessage` call and is always `r.header.firstOffset + offsetDelta`). Because we've only read a single message, that will always be 33268273 in this instance.

How difficult would it be to accommodate this scenario in this change?
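
To make the arithmetic above concrete, a small sketch using the numbers from the header dump (the formulas in the comments paraphrase this report, not the library internals):

	package main

	import "fmt"

	func main() {
		// Values from the header dump above.
		const (
			firstOffset     = 33268272
			lastOffsetDelta = 3
			count           = 1 // only one record survived compaction
		)

		// The surviving record carries offsetDelta 0, so after a successful
		// read the consumer computes offset = firstOffset + offsetDelta and
		// then advances to offset + 1.
		nextOffset := firstOffset + 0 + 1

		// But the batch actually spans offsets up to:
		batchEnd := firstOffset + lastOffsetDelta

		// 33268273 is still <= 33268275, so a fetch at nextOffset returns the
		// same batch again and the consumer never gets past it.
		fmt.Println(nextOffset, batchEnd)
	}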

batch.go Outdated
	// reading a message but since there were no messages to read we
	// update it now instead.
	if batch.offset == batch.conn.offset {
		batch.offset++
leedavis81 commented Dec 21, 2021:

Where a batch contains two compacted records at its tail, advancing the offset by one will cause a subsequent read to retrieve the same batch (as we've not fully passed it). When that batch is processed a second time, it will read a valid record (no error will be returned) and our `batch.offset` will be moved backwards, as we fall into the `case nil:` branch and `batch.offset = offset + 1` is applied.

I think the solution we want requires us to advance to `offset + lastOffsetDelta` as determined by the header; however, there could be a risk here of missing records should compacted entries reside mid-batch (would need to research whether that is possible).

leedavis81 commented Dec 21, 2021:

One other option could be to only advance `batch.offset` under the `err == nil` case when it is <= the returned offset (meaning all `batch.offset++` increments in the errShortRead case will be preserved):

		if batch.offset <= offset {
			batch.offset = offset + 1
		}
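
To see how that guard would compose with the errShortRead bump, a self-contained sketch (illustrative names, not kafka-go's actual internals):

	package main

	import (
		"errors"
		"fmt"
	)

	var errShortRead = errors.New("short read")

	// step models one pass through a readMessage-style switch: the guard keeps
	// a batch offset that was already bumped past the returned record offset
	// from being moved backwards on a successful re-read.
	func step(batchOffset, readOffset int64, err error) int64 {
		switch {
		case err == nil:
			if batchOffset <= readOffset { // the suggested guard
				batchOffset = readOffset + 1
			}
		case errors.Is(err, errShortRead):
			batchOffset++ // end of batch: step over a compacted trailing offset
		}
		return batchOffset
	}

	func main() {
		// Re-reading the batch after the offset was already advanced to
		// firstOffset+2: without the guard this would snap back to firstOffset+1.
		fmt.Println(step(33268274, 33268272, nil)) // prints 33268274
	}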

nlsun (Author) replied:

@leedavis81 If I understand correctly, we only enter the errShortRead state when we reach the end of a batch, so advancing the offset by 1 should advance us to the next batch.

I think (not certain) that compacted records in the middle of a batch will only show up in the offset delta and won't cause an errShortRead.

I wonder if this new scenario is a different one? Are there any more details you could dig up about where the reader is getting stuck and what the kafka log segment looks like? I tried playing around with the unit test in this PR by increasing the segment size and also varying the duplicated messages but haven't been able to get stuck anywhere else yet.

leedavis81 commented Dec 22, 2021:

@nlsun So the scenario you want is a batch with 3 records, where two are compacted at the end. What happens here is the consumer reads record 1 OK, then moves on to record 2, where an errShortRead is triggered; you fall into the case where no records remain, and batch.offset is advanced (as per the fix in this PR).

As no messages reside in the batch, attempts to read batch+3 fail and a request is made to the broker for that offset. The broker then returns the same batch (as that's where the compacted entry resides) and the consumer begins to read it (still with batch.offset set to offset+3). The first read succeeds and returns a new offset with no error; falling into the err = nil case, batch.offset is then moved back to offset+1 and the cycle repeats.

leedavis81 replied:

@nlsun this is a fix I used today to resolve the production issues we were experiencing: https://github.com/leedavis81/kafka-go/pull/1/files

leedavis81 replied:

Your understanding is spot on, and moving that to a separate issue is perfectly fine. I do intend to test your branch shortly, just haven't had the time to do it yet.

nlsun (Author) replied:

So I was talking to someone about this and they got me thinking about lastOffsetDelta again, and I remembered that in the debug branch it was always 1, which I had tried to change before but gave up on. This time I tried changing the BatchSize on the writer from 2 to 3 and bam, it got stuck on what is hopefully what you're running into. Will let you know when I fix this.
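
For anyone attempting a similar reproduction, the writer side looks roughly like this (a sketch assuming a local broker and a compacted topic like the one above; not the PR's actual test):

	package main

	import (
		"context"
		"log"

		"github.com/segmentio/kafka-go"
	)

	func main() {
		// BatchSize 3 makes produced record batches span several offsets
		// (lastOffsetDelta > 0), so compaction can hollow out a batch's tail.
		w := &kafka.Writer{
			Addr:      kafka.TCP("localhost:9092"),
			Topic:     "compaction-test", // hypothetical topic name
			BatchSize: 3,
		}
		defer w.Close()

		// Duplicate keys: after compaction only the last value per key survives.
		msgs := make([]kafka.Message, 0, 9)
		for i := 0; i < 9; i++ {
			msgs = append(msgs, kafka.Message{
				Key:   []byte("dup"),
				Value: []byte{byte(i)},
			})
		}
		if err := w.WriteMessages(context.Background(), msgs...); err != nil {
			log.Fatal(err)
		}
	}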

nlsun (Author) replied:

@leedavis81 alright, hoping the millionth time's the charm, eh? Could you try out this newly updated PR on your test case?

leedavis81 replied:

I'm running this test now, should have some feedback shortly.

leedavis81 replied:

Hi @nlsun, I can confirm this patch resolved both instances where we had single and multiple compacted records at the tail of a batch. Nice work!

This fix would close out #709 (and a handful of other PRs by the looks of it)

iddqdeika (Contributor) commented:

Seems this pull will never get merged this way =(

This pushes us to use sarama (which simply counts new messages from the batch), but sarama's API makes me sad...

nlsun (Contributor, Author) commented Jan 24, 2022

@iddqdeika if you are OK with using a branch, you can make one using the solution in https://github.com/segmentio/kafka-go/compare/fix-batch-offset-advance-nlsunsuggestion-multicompact-test?expand=1 (you'll want to remove the print statements). I'll try to finish the regression test for that branch this week, but I can't promise it.

Alternatively, you can make a branch with the other suggested solution as a temporary fix: https://github.com/leedavis81/kafka-go/pull/1/files

achille-roussel self-assigned this on Jan 28, 2022
nlsun dismissed achille-roussel's stale review on February 4, 2022:

we found a new issue that will change the implementation

nlsun force-pushed the fix-batch-offset-advance-nlsunsuggestion branch from 1571671 to f8b1f38 on February 8, 2022
achille-roussel (Contributor) left a comment

Really nice work, and the implementation is neat.

Ship it!

bolshakov and others added 17 commits February 9, 2022 10:25
…ages (because of compaction delete) as sarama does
…ages (because of compaction delete) as sarama does (comment added)
The reasoning is that `Batch.readMessage` is normally where we update
the batch offset so the goal is to do this additional update to the
batch offset in the same location.

Also adjusts the test case retry so that the test case may finish
earlier than the previous sleep time while also giving the test more
time to finish to reduce flakiness.

The test case also now ensures that the final state is fully compacted
before declaring success.
Also a test to ensure that truncation due to MaxBytes is
still handled properly.
nlsun force-pushed the fix-batch-offset-advance-nlsunsuggestion branch from 200a0d1 to cd49f8a on February 9, 2022
nlsun merged commit 999d0b3 into main on Feb 9, 2022
nlsun deleted the fix-batch-offset-advance-nlsunsuggestion branch on February 9, 2022
nlsun changed the title from "Fix batch offset when a batch has 0 messages" to "Fix offset when a batch ends with compacted records" on Feb 9, 2022
nlsun (Contributor, Author) commented Feb 10, 2022

@leedavis81 thanks for your patience and all the help testing
@iddqdeika thanks for your patience and providing the unit test
@achille-roussel thanks for the reviews and suggestions
