
reader doesn't recover (quickly) from OffsetOutOfRange "Offset Out Of Range" #891

Closed
wwade opened this issue Apr 21, 2022 · 1 comment · Fixed by #917
wwade commented Apr 21, 2022

Describe the bug

I had a case where the Kafka topic was recreated. I am not using a consumer group for this topic, so it reports, e.g.

error initializing the kafka reader for partition 0 of SOMETOPIC: [1] Offset Out Of Range: the requested offset is outside the range of offsets maintained by the server for the given topic/partition

Kafka Version

  • What version(s) of Kafka are you testing against?

3.0.0

  • What version of kafka-go are you using?

0.4.30

To Reproduce

  1. docker-compose up -d
  2. go run ./consumer.go
  3. wait for the messages to get processed, leave it running
  4. docker-compose rm -vfs kafka; docker-compose up -d
  5. the topic was recreated, and the reader is now in a perpetual error loop
  6. produce a single message; it will not be received, since its offset is 0 again

Resources to reproduce the behavior:

docker-compose.yaml

---
version: '3.4'

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    network_mode: host
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_ADMIN_SERVER_PORT: 10880

  kafka:
    hostname: localhost
    depends_on:
      - zookeeper
    image: confluentinc/cp-kafka:latest
    network_mode: host
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: localhost:2181
      KAFKA_ADVERTISED_LISTENERS: PRIMARY://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PRIMARY:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PRIMARY
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

consumer.go

package main

import (
	"context"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/segmentio/kafka-go"
)

const broker = "localhost:9092"
const topic = "my-topic"

func produce(ctx context.Context) error {
	writer := kafka.NewWriter(kafka.WriterConfig{
		Brokers:   []string{broker},
		Topic:     topic,
		BatchSize: 1,
		Logger:    kafka.LoggerFunc(log.New(log.Writer(), "[kafka writer] ", 0).Printf),
	})

	const count = 100
	msgs := make([]kafka.Message, count)
	for i := 0; i < count; i++ {
		msgs[i] = kafka.Message{
			Partition: 0,
			Key:       []byte{byte(i)},
			Value:     []byte(fmt.Sprintf("message %v", i)),
		}
	}
	return writer.WriteMessages(ctx, msgs...)
}

func runner() int {
	ctx, cancel := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer cancel()

	if err := produce(ctx); err != nil {
		log.Printf("produce: %v", err)
		return 1
	}

	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{broker},
		Topic:     topic,
		Partition: 0,
		Logger:    kafka.LoggerFunc(log.New(log.Writer(), "[kafka reader] ", 0).Printf),
	})

	defer func() {
		if err := reader.Close(); err != nil {
			panic(err)
		}
	}()
	for {
		select {
		case <-ctx.Done():
			return 0
		default:
		}
		msg, err := reader.ReadMessage(ctx)
		if err != nil {
			log.Printf("ReadMessage: %v", err)
			time.Sleep(1 * time.Second)
			continue
		}
		log.Printf("offset=%v key=%v value=%q", msg.Offset, msg.Key, string(msg.Value))
	}
}

func main() {
	os.Exit(runner())
}

Expected Behavior


Either the ReadMessage call should return with an error, or the reader should gracefully restart at a valid offset.

Observed Behavior


Perpetual output of the error initializing the kafka reader for partition 0 message; the reader never resumes reading until the recreated topic's offsets catch up to the stale stored offset (which could take days).

[kafka reader] initializing kafka reader for partition 0 of my-topic starting at offset 100
[kafka reader] the kafka reader for partition 0 of my-topic is seeking to offset 100
[kafka reader] error initializing the kafka reader for partition 0 of my-topic: [1] Offset Out Of Range: the requested offset is outside the range of offsets maintained by the server for the given topic/partition
[kafka reader] initializing kafka reader for partition 0 of my-topic starting at offset 100
[kafka reader] the kafka reader for partition 0 of my-topic is seeking to offset 100
[kafka reader] error initializing the kafka reader for partition 0 of my-topic: [1] Offset Out Of Range: the requested offset is outside the range of offsets maintained by the server for the given topic/partition
...

Additional Context
Problematic code from reader.go:

func (r *reader) run(ctx context.Context, offset int64) {
...
		conn, start, err := r.initialize(ctx, offset)
		switch err {
		case nil:
		case OffsetOutOfRange:
		// This would happen if the requested offset is past the last
		// offset on the partition leader. In that case we're just going
		// to retry later, hoping that enough data has been produced.
			r.withErrorLogger(func(log Logger) {
				log.Printf("error initializing the kafka reader for partition %d of %s: %s", r.partition, r.topic, OffsetOutOfRange)
			})
			continue
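
The failure mode, and the reset behavior the issue asks for, can be sketched self-contained. This is a hypothetical model only: `partition`, `fetch`, `readFrom`, and `errOffsetOutOfRange` are illustrative stand-ins, not kafka-go API.

```go
package main

import (
	"errors"
	"fmt"
)

// errOffsetOutOfRange stands in for kafka-go's OffsetOutOfRange error.
var errOffsetOutOfRange = errors.New("offset out of range")

// partition models a topic partition with a range of retained offsets.
type partition struct {
	first, last int64
}

// fetch fails when the requested offset lies outside the retained range,
// mirroring the broker's OffsetOutOfRange response.
func (p partition) fetch(offset int64) error {
	if offset < p.first || offset > p.last {
		return errOffsetOutOfRange
	}
	return nil
}

// readFrom retries on OffsetOutOfRange. With reset=true it falls back to
// the first retained offset instead of retrying the same stale offset,
// which is the recovery behavior this issue requests.
func readFrom(p partition, offset int64, reset bool, maxAttempts int) (int64, error) {
	for i := 0; i < maxAttempts; i++ {
		err := p.fetch(offset)
		if err == nil {
			return offset, nil
		}
		if errors.Is(err, errOffsetOutOfRange) && reset {
			offset = p.first // reset to a valid offset and retry
			continue
		}
		// Without a reset policy, the loop retries the same invalid
		// offset, reproducing the perpetual error loop above.
	}
	return offset, errOffsetOutOfRange
}

func main() {
	p := partition{first: 0, last: 9} // topic recreated: only offsets 0..9 exist
	if _, err := readFrom(p, 100, false, 3); err != nil {
		fmt.Println("no reset:", err)
	}
	off, err := readFrom(p, 100, true, 3)
	fmt.Println("with reset:", off, err)
}
```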

@wwade wwade added the bug label Apr 21, 2022
wwade added a commit to wwade/kafka-go that referenced this issue Apr 21, 2022
Provide a new configuration `AutoOffsetReset` that can be used to
reset the non-group-id reader's offset when the topic has been
recreated (or the current offset is no longer valid for any reason).
The `AutoOffsetReset` can have a value of 0 (no action), or
FirstOffset / LastOffset.

Added a new test case in `TestReader` to verify that, on setting the
offset initially past the last offset in the topic, the reader will
restart from the beginning once it detects this OffsetOutOfRange
error, instead of perpetually reporting OffsetOutOfRange and waiting
for the producer to catch up.

Fixes segmentio#891.
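
Based on that commit message, configuring the proposed option might look like the fragment below. This is a sketch only: the `AutoOffsetReset` field comes from the linked PR #917 and is not part of the released kafka-go API.

```go
reader := kafka.NewReader(kafka.ReaderConfig{
	Brokers:   []string{broker},
	Topic:     topic,
	Partition: 0,
	// Proposed in #917: fall back to the first retained offset when the
	// broker reports OffsetOutOfRange (e.g. after the topic is recreated).
	AutoOffsetReset: kafka.FirstOffset,
})
```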
wwade added a commit to wwade/kafka-go that referenced this issue Apr 30, 2022
@dominicbarnes
Contributor

What's the use-case for having a topic re-created on the fly? This seems like it might be mostly useful during development, but that is just speculation on my part.

With that in mind, I'm more inclined to return an error (allowing the program to crash and restart) than to include logic that will automatically recover, as it seems to be an edge-case.

I'll bring this up on the PR as well; we can have further discussion there.
