Description
Describe the bug
Hi, I get an error when reading from a topic. The error occurs intermittently, not on every read. After it occurs, the kafka reader recreates the connection and continues to work.
Kafka Version
kafka version: v2.8.1
kafka-go version: v0.4.35
To Reproduce
Here are my sample kafka reader settings:
mechanism, _ := scram.Mechanism(scram.SHA512, username, password)
dialer := &kafka.Dialer{
    Timeout:       10 * time.Second,
    DualStack:     true,
    SASLMechanism: mechanism,
}
topic := kafka.TopicConfig{
    Topic:             topicName,
    NumPartitions:     3,
    ReplicationFactor: 3,
    ConfigEntries: []kafka.ConfigEntry{
        {
            ConfigName:  "retention.bytes",
            ConfigValue: strconv.Itoa(-1),
        },
        {
            ConfigName:  "retention.ms",
            ConfigValue: strconv.Itoa(int((24 * time.Hour).Milliseconds())),
        },
    },
}
nr := kafka.NewReader(kafka.ReaderConfig{
    Brokers:               []string{kafkaAddress},
    Topic:                 topic.Topic,
    GroupID:               kafkaGroupName,
    WatchPartitionChanges: true,
    Dialer:                dialer,
    MaxWait:               10 * time.Second,
    MinBytes:              10e3, // 10 KB (10,000 bytes)
    MaxBytes:              10e6, // 10 MB (10,000,000 bytes)
    CommitInterval:        time.Second,
    Logger:                &debugLogger{},
    ErrorLogger:           &errorLogger{},
})
Expected Behavior
No "i/o timeout" errors, and no reconnecting to the kafka topic.
Observed Behavior
The kafka reader logs when a connection timeout error occurs:
[2022-09-22T10:37:24.113][level=ERROR]: the kafka reader got an unknown error reading partition 2 of gt_events at offset 29055: read tcp xxx.xxx.xxx.xxx:xxxx->xxx.xxx.xxx.xxx:xxxx: i/o timeout
[2022-09-22T10:37:24.12][level=ERROR]: the kafka reader got an unknown error reading partition 1 of ht_events at offset 158: read tcp xxx.xxx.xxx.xxx:xxxx->xxx.xxx.xxx.xxx:xxxx: i/o timeout
[2022-09-22T10:37:24.213][level=DEBUG]: initializing kafka reader for partition 2 of gt_events starting at offset 29055
[2022-09-22T10:37:24.221][level=DEBUG]: initializing kafka reader for partition 1 of ht_events starting at offset 158
[2022-09-22T10:37:24.506][level=DEBUG]: the kafka reader for partition 1 of ht_events is seeking to offset 158
[2022-09-22T10:37:24.506][level=DEBUG]: the kafka reader for partition 2 of gt_events is seeking to offset 29055
[2022-09-22T10:44:14.618][level=ERROR]: the kafka reader got an unknown error reading partition 0 of rt_events at offset 35251: read tcp xxx.xxx.xxx.xxx:xxxx->xxx.xxx.xxx.xxx:xxxx: i/o timeout
[2022-09-22T10:44:14.718][level=DEBUG]: initializing kafka reader for partition 0 of rt_events starting at offset 35251
[2022-09-22T10:44:14.812][level=DEBUG]: the kafka reader for partition 0 of rt_events is seeking to offset 35251
In the normal case, when ReaderConfig.MaxWait expires, I get this message instead:
[2022-09-23T14:21:03.835][level=DEBUG]: no messages received from kafka within the allocated time for partition 0 of rt_events at offset 162
Additional Context
I was able to get the type of error from the log message. It is *net.OpError, which contains *poll.DeadlineExceededError.
I could not find where in the code the connection timeout is triggered, or which config attribute is responsible for it.
Deadline for the batch reader is set here: https://github.com/segmentio/kafka-go/blob/v0.4.35/reader.go#L1478
As far as I understand from the code, this sets the deadline for reading the message to MaxWait - time.Second: https://github.com/segmentio/kafka-go/blob/v0.4.35/conn.go#L791
How can I increase the timeout of the kafka Conn, or decrease the timeout for reading the message? I suspect the problem is that the TCP connection is closed before the read from the topic completes.
I will be glad to get any advice and suggestions :)