Corruption when receiving message sets with zero messages #653

Closed
Nilix007 opened this issue May 4, 2021 · 3 comments

Nilix007 commented May 4, 2021

Describe the bug
We noticed panics very similar to #457 when reading messages with a consumer group.

Stack trace
panic: runtime error: makeslice: len out of range
goroutine 38 [running]:
github.com/segmentio/kafka-go.(*messageSetReaderV2).readMessage(0xc0004fc018, 0x79e519, 0xc0001d3a30, 0xc0001d3a20, 0xc0001d39f4, 0x2, 0x2, 0x10000000000, 0x0, 0x2, ...)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/message.go:556 +0x7ba
github.com/segmentio/kafka-go.(*messageSetReader).readMessage(0xc0004fc000, 0x79e519, 0xc0001d3a30, 0xc0001d3a20, 0x2, 0x1759d3d9915, 0x0, 0xc000318be0, 0xc000186780, 0xc0001d3a00, ...)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/message.go:140 +0x78
github.com/segmentio/kafka-go.(*Batch).readMessage(0xc0004f8100, 0xc0001d3a30, 0xc0001d3a20, 0xc0001d3a00, 0xc0001d39f0, 0x0, 0x1, 0x1, 0x1, 0x0)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/batch.go:228 +0x79
github.com/segmentio/kafka-go.(*Batch).ReadMessage(0xc0004f8100, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, ...)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/batch.go:183 +0x11a
github.com/segmentio/kafka-go.(*reader).read(0xc0001d3ed8, 0x789eb8, 0xc0001403c0, 0x79e519, 0xc00068c8c0, 0x79e50a, 0x0, 0x0)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/reader.go:1453 +0x425
github.com/segmentio/kafka-go.(*reader).run(0xc0001d3ed8, 0x789eb8, 0xc0001403c0, 0x79e50a)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/reader.go:1272 +0x2b2
github.com/segmentio/kafka-go.(*Reader).start.func1(0xc0000d2380, 0x789eb8, 0xc0001403c0, 0x730d1f, 0x1e, 0x9, 0x79e50a, 0xc0000d24b8)
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/reader.go:1173 +0x1d8
created by github.com/segmentio/kafka-go.(*Reader).start
        /go/pkg/mod/github.com/segmentio/kafka-go@v0.4.15/reader.go:1153 +0x1a5

Fortunately, the issue was reproducible, so we were able to find the culprit. Apparently, our Kafka broker sends out message sets that contain zero messages. kafka-go cannot handle these: the current code proceeds to read a message right after reading the header, which desynchronises the stream and eventually corrupts the decoded data.
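
As a hedged illustration of how a desynchronised stream can end in the `makeslice: len out of range` panic shown in the stack trace: Go raises that panic when `make` is given a negative or oversized length, which is what a length field decoded from the wrong bytes can look like. The sketch below is not kafka-go code; `allocPayload` and `errBadLength` are hypothetical names, and the idea is only to validate a decoded length against the bytes known to remain before allocating.

```go
package main

import (
	"errors"
	"fmt"
)

// errBadLength is a hypothetical error for lengths that cannot be valid.
var errBadLength = errors.New("decoded length out of range")

// allocPayload is a hypothetical helper: it allocates a buffer for a decoded
// length only if the length is non-negative and fits in the remaining bytes.
// Without this check, make would panic on a negative or huge length.
func allocPayload(length int64, remain int) ([]byte, error) {
	if length < 0 || length > int64(remain) {
		return nil, errBadLength
	}
	return make([]byte, length), nil
}

func main() {
	// A desynchronised stream can yield any value here, including negatives.
	for _, length := range []int64{3, -2, 1 << 40} {
		buf, err := allocPayload(length, 16)
		fmt.Println(length, len(buf), err)
	}
}
```

A guard like this turns the panic into an error, but it does not repair the underlying desynchronisation.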

kafka-go/message.go

Lines 466 to 520 in 8462374

func (r *messageSetReaderV2) readMessage(min int64,
	key func(*bufio.Reader, int, int) (int, error),
	val func(*bufio.Reader, int, int) (int, error),
) (offset int64, timestamp int64, headers []Header, err error) {
	if r.messageCount == 0 {
		if r.remain == 0 {
			if r.parent != nil {
				r.readerStack = r.parent
			}
		}
		if err = r.readHeader(); err != nil {
			return
		}
		if code := r.header.compression(); code != 0 {
			var codec CompressionCodec
			if codec, err = resolveCodec(code); err != nil {
				return
			}
			var batchRemain = int(r.header.length - 49)
			if batchRemain > r.remain {
				err = errShortRead
				return
			}
			var decompressed bytes.Buffer
			decompressed.Grow(4 * batchRemain)
			l := io.LimitedReader{R: r.reader, N: int64(batchRemain)}
			d := codec.NewReader(&l)
			_, err = decompressed.ReadFrom(d)
			r.remain = r.remain - (batchRemain - int(l.N))
			d.Close()
			if err != nil {
				return
			}
			r.readerStack = &readerStack{
				reader: bufio.NewReaderSize(&decompressed, 0),
				remain: decompressed.Len(),
				base:   -1, // base is unused here
				parent: r.readerStack,
			}
		}
	}
	var length int64
	if r.remain, err = readVarInt(r.reader, r.remain, &length); err != nil {
		return
	}

Kafka Version
0.11

nlsun (Contributor) commented Nov 5, 2021

Hi @Nilix007, we may have fixed this in #753, which you can try out in version v0.4.22: https://github.com/segmentio/kafka-go/releases/tag/v0.4.22

Let us know if this fixes the issue, thanks!

nlsun (Contributor) commented Feb 9, 2022

You could also try the fix in #813. It targets a different case (zero unread messages caused by compaction), and the compaction issue has never been reported to panic, but its context sounds similar to this issue.

nlsun (Contributor) commented Feb 11, 2022

@Nilix007 We've made some significant changes and bug fixes in this area of the code. Please let us know if you're still running into this issue.

nlsun closed this as completed Feb 11, 2022