Description
Receiving compressed batches from a compacted topic sometimes leads to a number of errors.
In some cases (depending on the messages in the topic), a consumer that reads from the first offset can fall into endless batch reading (with rather high CPU usage), produce an empty message, or panic on a negative message count in a batch. In other cases it reads the topic normally.
Kafka Version
3.9
To Reproduce
To reproduce, send data into a topic using keys from one set, for example "A","B","C","D","E","F","G","H", with repeating keys in randomized order, then switch to another set such as "E","F","G","H","I","J","K","L" and continue sending in randomized order.
After compaction of that flow of data, Kafka returns some spread batches (batches with offset gaps left by compaction) to the consumer. Reading such spread batches can lead to any of the issues described above, or can pass without any issue.
An additional requirement for reproduction: enable compression on the producer side.
package main

import (
	"context"
	"log"

	"github.com/segmentio/kafka-go"
)

func main() {
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:       []string{<broker_address>},
		Topic:         <topic>,
		GroupID:       "test_group",
		StartOffset:   kafka.FirstOffset,
		QueueCapacity: 1000,
		Dialer:        &kafka.Dialer{},
	})
	ctx := context.Background()
	for {
		msg, err := reader.FetchMessage(ctx)
		if err != nil {
			panic(err)
		}
		log.Printf("%v", msg)
	}
}
Expected Behavior
Reading should always return all topic data without errors.
Observed Behavior
In some cases, reading from a compacted topic leads to one of the following errors:
- endless reading with high CPU consumption
- a panic on a negative count (in message_reader.go:356), as in this issue
- reading of an empty message
Additional Context
I've spent quite a lot of time on this, and by adding extra debug logging to the library I've found the cause of this strange behaviour.
The result of my research is in this MR.
Some additional explanations for the MR:
The incorrect accounting of messageReader.lengthRemain causes the special handling of compacted batches in batch.go:291 to fail (lengthRemain goes negative due to incorrect handling of a compressed batch message), so batch.offset is not set to batch.lastOffset + 1 but remains set to the last read message offset + 1 (in batch.go:251).
Using the last read message offset + 1 for the next batch read leads to the various issues above, because that offset can point to a non-existing message in a spread batch. Sometimes the read passes through to the next batch, but sometimes it endlessly reads the same batch, panics on a negative count, or returns an empty message.