From bcb9180fc3da90c43c12104b1224f0c5a3bad530 Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Thu, 28 Jan 2021 17:24:06 -0800 Subject: [PATCH 1/6] feat(reader): allow consuming from multiple topics with a group --- go.sum | 1 + kafka_test.go | 14 +++++++++++++ reader.go | 31 +++++++++++++++++++++------- reader_test.go | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 94 insertions(+), 7 deletions(-) diff --git a/go.sum b/go.sum index 411e17969..78511c0c5 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka_test.go b/kafka_test.go index f66855cd7..a82e844aa 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -171,3 +171,17 @@ func BenchmarkUnmarshal(b *testing.B) { }) } } + +type testKafkaLogger struct { + Prefix string + T *testing.T +} + +func newTestKafkaLogger(t *testing.T, prefix string) Logger { + return &testKafkaLogger{Prefix: prefix, T: t} +} + +func (l *testKafkaLogger) Printf(msg string, args ...interface{}) { + l.T.Helper() + l.T.Logf(l.Prefix+" "+msg, args...) +} diff --git a/reader.go b/reader.go index 22e0bd55b..fbcdaded0 100644 --- a/reader.go +++ b/reader.go @@ -87,6 +87,14 @@ type Reader struct { // useConsumerGroup indicates whether the Reader is part of a consumer group. func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" } +func (r *Reader) getConusmerGroupTopics() []string { + if len(r.config.GroupTopics) > 0 { + return r.config.GroupTopics[:] + } + + return []string{r.config.Topic} +} + // useSyncCommits indicates whether the Reader is configured to perform sync or // async commits. func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 } @@ -330,6 +338,11 @@ type ReaderConfig struct { // Partition should NOT be specified e.g. 0 GroupID string + // GroupTopics allows specifying multiple topics, but can only be used in + // combination with GroupID, as it is a consumer-group feature. As such, if + // GroupID is set, then either Topic or GroupTopics must be defined. + GroupTopics []string + // The topic to read messages from. Topic string @@ -473,10 +486,6 @@ func (config *ReaderConfig) Validate() error { return errors.New("cannot create a new kafka reader with an empty list of broker addresses") } - if len(config.Topic) == 0 { - return errors.New("cannot create a new kafka reader with an empty topic") - } - if config.Partition < 0 || config.Partition >= math.MaxInt32 { return errors.New(fmt.Sprintf("partition number out of bounds: %d", config.Partition)) } @@ -489,8 +498,16 @@ func (config *ReaderConfig) Validate() error { return errors.New(fmt.Sprintf("invalid negative maximum batch size (max = %d)", config.MaxBytes)) } - if config.GroupID != "" && config.Partition != 0 { - return errors.New("either Partition or GroupID may be specified, but not both") + if config.GroupID != "" { + if config.Partition != 0 { + return errors.New("either Partition or GroupID may be specified, but not both") + } + + if len(config.Topic) == 0 && len(config.GroupTopics) == 0 { + return errors.New("either Topic or GroupTopics must be specified with GroupID") + } + } else if len(config.Topic) == 0 { + return errors.New("cannot create a new kafka reader with an empty topic") } if config.MinBytes > config.MaxBytes { @@ -660,7 +677,7 @@ func NewReader(config ReaderConfig) *Reader { ID: r.config.GroupID, Brokers: r.config.Brokers, Dialer: r.config.Dialer, - Topics: []string{r.config.Topic}, + Topics: r.getConusmerGroupTopics(), GroupBalancers: r.config.GroupBalancers, HeartbeatInterval: r.config.HeartbeatInterval, PartitionWatchInterval: r.config.PartitionWatchInterval, diff --git a/reader_test.go b/reader_test.go index 1324483d4..f1e73f3eb 100644 --- a/reader_test.go +++ b/reader_test.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "fmt" "io" "math/rand" "net" @@ -1342,6 +1343,60 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) { } } +func TestConsumerGroupWithMultpleTopics(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conf := ReaderConfig{ + Brokers: []string{"localhost:9092"}, + GroupID: makeGroupID(), + GroupTopics: []string{makeTopic()}, + MaxWait: time.Second, + PartitionWatchInterval: 100 * time.Millisecond, + WatchPartitionChanges: true, + Logger: newTestKafkaLogger(t, "Reader:"), + } + + r := NewReader(conf) + defer r.Close() + + recvErr := make(chan error, len(conf.GroupTopics)) + go func() { + msg, err := r.ReadMessage(ctx) + t.Log(msg) + recvErr <- err + }() + + time.Sleep(conf.MaxWait) + + for i, topic := range conf.GroupTopics { + client, shutdown := newLocalClientWithTopic(topic, 1) + defer shutdown() + + w := &Writer{ + Addr: TCP(r.config.Brokers...), + Topic: topic, + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)), + } + defer w.Close() + if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil { + t.Fatalf("write error: %+v", err) + } + } + + if err := <-recvErr; err != nil { + t.Fatalf("read error: %+v", err) + } + + nMsgs := r.Stats().Messages + if nMsgs != int64(len(conf.GroupTopics)) { + t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs) + } +} + func getOffsets(t *testing.T, config ReaderConfig) map[int]int64 { // minimal config required to lookup coordinator cg := ConsumerGroup{ From f77dd3a842802c63e1c12d7bb08ea84d9c7b5f7e Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Thu, 28 Jan 2021 17:59:49 -0800 Subject: [PATCH 2/6] test(reader): add test case for consumer group and single topic --- reader_test.go | 54 +++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 53 insertions(+), 1 deletion(-) diff --git a/reader_test.go b/reader_test.go index f1e73f3eb..0b5d6f246 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1343,6 +1343,58 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) { } } +func TestConsumerGroupWithSingleTopic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conf := ReaderConfig{ + Brokers: []string{"localhost:9092"}, + GroupID: makeGroupID(), + Topic: makeTopic(), + MaxWait: time.Second, + PartitionWatchInterval: 100 * time.Millisecond, + WatchPartitionChanges: true, + Logger: newTestKafkaLogger(t, "Reader:"), + } + + r := NewReader(conf) + defer r.Close() + + recvErr := make(chan error, len(conf.GroupTopics)) + go func() { + msg, err := r.ReadMessage(ctx) + t.Log(msg) + recvErr <- err + }() + + time.Sleep(conf.MaxWait) + + client, shutdown := newLocalClientWithTopic(conf.Topic, 1) + defer shutdown() + + w := &Writer{ + Addr: TCP(r.config.Brokers...), + Topic: conf.Topic, + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, "Writer:"), + } + defer w.Close() + if err := w.WriteMessages(ctx, Message{Value: []byte(conf.Topic)}); err != nil { + t.Fatalf("write error: %+v", err) + } + + if err := <-recvErr; err != nil { + t.Fatalf("read error: %+v", err) + } + + nMsgs := r.Stats().Messages + if nMsgs != 1 { + t.Fatalf("expected to receive 1 message, but got %d", nMsgs) + } +} + func TestConsumerGroupWithMultpleTopics(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -1350,7 +1402,7 @@ func TestConsumerGroupWithMultpleTopics(t *testing.T) { conf := ReaderConfig{ Brokers: []string{"localhost:9092"}, GroupID: makeGroupID(), - GroupTopics: []string{makeTopic()}, + GroupTopics: []string{makeTopic(), makeTopic()}, MaxWait: time.Second, PartitionWatchInterval: 100 * time.Millisecond, WatchPartitionChanges: true, From de6c291b07b2850e61f34e858d6544fb41e9d773 Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Thu, 28 Jan 2021 21:02:30 -0800 Subject: [PATCH 3/6] test(reader): add more test cases around topic --- reader_test.go | 58 ++++++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 56 insertions(+), 2 deletions(-) diff --git a/reader_test.go b/reader_test.go index 0b5d6f246..9bcecee0a 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1343,7 +1343,7 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) { } } -func TestConsumerGroupWithSingleTopic(t *testing.T) { +func TestConsumerGroupWithTopic(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -1395,7 +1395,61 @@ func TestConsumerGroupWithSingleTopic(t *testing.T) { } } -func TestConsumerGroupWithMultpleTopics(t *testing.T) { +func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conf := ReaderConfig{ + Brokers: []string{"localhost:9092"}, + GroupID: makeGroupID(), + GroupTopics: []string{makeTopic()}, + MaxWait: time.Second, + PartitionWatchInterval: 100 * time.Millisecond, + WatchPartitionChanges: true, + Logger: newTestKafkaLogger(t, "Reader:"), + } + + r := NewReader(conf) + defer r.Close() + + recvErr := make(chan error, len(conf.GroupTopics)) + go func() { + msg, err := r.ReadMessage(ctx) + t.Log(msg) + recvErr <- err + }() + + time.Sleep(conf.MaxWait) + + for i, topic := range conf.GroupTopics { + client, shutdown := newLocalClientWithTopic(topic, 1) + defer shutdown() + + w := &Writer{ + Addr: TCP(r.config.Brokers...), + Topic: topic, + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)), + } + defer w.Close() + if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil { + t.Fatalf("write error: %+v", err) + } + } + + if err := <-recvErr; err != nil { + t.Fatalf("read error: %+v", err) + } + + nMsgs := r.Stats().Messages + if nMsgs != int64(len(conf.GroupTopics)) { + t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs) + } +} + +func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() From bed793a1576d94c1f0c1290a80585599fa02f2cf Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Fri, 5 Feb 2021 09:25:47 -0800 Subject: [PATCH 4/6] try to implement multiple topics in the reader --- reader.go | 18 +++++++------- reader_test.go | 64 ++++++++++++++++++++++++++++++-------------------- 2 files changed, 49 insertions(+), 33 deletions(-) diff --git a/reader.go b/reader.go index fbcdaded0..3ecb333d4 100644 --- a/reader.go +++ b/reader.go @@ -112,18 +112,18 @@ func (r *Reader) unsubscribe() { // another consumer to avoid such a race. } -func (r *Reader) subscribe(assignments []PartitionAssignment) { +func (r *Reader) subscribe(topic string, assignments []PartitionAssignment) { offsetsByPartition := make(map[int]int64) for _, assignment := range assignments { offsetsByPartition[assignment.ID] = assignment.Offset } r.mutex.Lock() - r.start(offsetsByPartition) + r.start(topic, offsetsByPartition) r.mutex.Unlock() r.withLogger(func(l Logger) { - l.Printf("subscribed to partitions: %+v", offsetsByPartition) + l.Printf("subscribed to topic %s partitions: %+v", topic, offsetsByPartition) }) } @@ -310,7 +310,9 @@ func (r *Reader) run(cg *ConsumerGroup) { r.stats.rebalances.observe(1) - r.subscribe(gen.Assignments[r.config.Topic]) + for _, topic := range r.getConusmerGroupTopics() { + r.subscribe(topic, gen.Assignments[topic]) + } gen.Start(func(ctx context.Context) { r.commitLoop(ctx, gen) @@ -771,7 +773,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { r.mutex.Lock() if !r.closed && r.version == 0 { - r.start(map[int]int64{r.config.Partition: r.offset}) + r.start(r.config.Topic, map[int]int64{r.config.Partition: r.offset}) } version := r.version @@ -985,7 +987,7 @@ func (r *Reader) SetOffset(offset int64) error { r.offset = offset if r.version != 0 { - r.start(map[int]int64{r.config.Partition: r.offset}) + r.start(r.config.Topic, map[int]int64{r.config.Partition: r.offset}) } r.activateReadLag() @@ -1114,7 +1116,7 @@ func (r *Reader) readLag(ctx context.Context) { } } -func (r *Reader) start(offsetsByPartition map[int]int64) { +func (r *Reader) start(topic string, offsetsByPartition map[int]int64) { if r.closed { // don't start child reader if parent Reader is closed return @@ -1136,7 +1138,7 @@ func (r *Reader) start(offsetsByPartition map[int]int64) { logger: r.config.Logger, errorLogger: r.config.ErrorLogger, brokers: r.config.Brokers, - topic: r.config.Topic, + topic: topic, partition: partition, minBytes: r.config.MinBytes, maxBytes: r.config.MaxBytes, diff --git a/reader_test.go b/reader_test.go index 9bcecee0a..0a807276e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -1453,6 +1453,9 @@ func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() + client, shutdown := newLocalClient() + defer shutdown() + conf := ReaderConfig{ Brokers: []string{"localhost:9092"}, GroupID: makeGroupID(), @@ -1464,37 +1467,48 @@ func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { } r := NewReader(conf) - defer r.Close() - recvErr := make(chan error, len(conf.GroupTopics)) + w := &Writer{ + Addr: TCP(r.config.Brokers...), + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, "Writer:"), + } + defer w.Close() + + time.Sleep(time.Second) + + var msgs []Message + for _, topic := range conf.GroupTopics { + msgs = append(msgs, Message{Topic: topic}) + } + if err := w.WriteMessages(ctx, msgs...); err != nil { + t.Logf("write error: %+v", err) + } + + wg := new(sync.WaitGroup) + wg.Add(len(msgs)) + go func() { - msg, err := r.ReadMessage(ctx) - t.Log(msg) - recvErr <- err + wg.Wait() + t.Log("closing reader") + r.Close() }() - time.Sleep(conf.MaxWait) - - for i, topic := range conf.GroupTopics { - client, shutdown := newLocalClientWithTopic(topic, 1) - defer shutdown() + for { + msg, err := r.ReadMessage(ctx) + if err != nil { + if err == io.EOF { + t.Log("reader closed") + break + } - w := &Writer{ - Addr: TCP(r.config.Brokers...), - Topic: topic, - BatchTimeout: 10 * time.Millisecond, - BatchSize: 1, - Transport: client.Transport, - Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)), + t.Fatalf("read error: %+v", err) + } else { + t.Logf("message read: %+v", msg) + wg.Done() } - defer w.Close() - if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil { - t.Fatalf("write error: %+v", err) - } - } - - if err := <-recvErr; err != nil { - t.Fatalf("read error: %+v", err) } nMsgs := r.Stats().Messages From a4a1ae2296906ea1f460c0bfb137f02c544677a1 Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Fri, 5 Feb 2021 09:49:59 -0800 Subject: [PATCH 5/6] fix multiple topics with consumer group --- reader.go | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/reader.go b/reader.go index 3ecb333d4..514e33354 100644 --- a/reader.go +++ b/reader.go @@ -112,18 +112,24 @@ func (r *Reader) unsubscribe() { // another consumer to avoid such a race. } -func (r *Reader) subscribe(topic string, assignments []PartitionAssignment) { - offsetsByPartition := make(map[int]int64) - for _, assignment := range assignments { - offsetsByPartition[assignment.ID] = assignment.Offset +func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) { + offsets := make(map[topicPartition]int64) + for topic, assignments := range allAssignments { + for _, assignment := range assignments { + key := topicPartition{ + topic: topic, + partition: int32(assignment.ID), + } + offsets[key] = assignment.Offset + } } r.mutex.Lock() - r.start(topic, offsetsByPartition) + r.start(offsets) r.mutex.Unlock() r.withLogger(func(l Logger) { - l.Printf("subscribed to topic %s partitions: %+v", topic, offsetsByPartition) + l.Printf("subscribed to topics and partitions: %+v", offsets) }) } @@ -310,9 +316,7 @@ func (r *Reader) run(cg *ConsumerGroup) { r.stats.rebalances.observe(1) - for _, topic := range r.getConusmerGroupTopics() { - r.subscribe(topic, gen.Assignments[topic]) - } + r.subscribe(gen.Assignments) gen.Start(func(ctx context.Context) { r.commitLoop(ctx, gen) @@ -773,7 +777,8 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { r.mutex.Lock() if !r.closed && r.version == 0 { - r.start(r.config.Topic, map[int]int64{r.config.Partition: r.offset}) + key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} + r.start(map[topicPartition]int64{key: r.offset}) } version := r.version @@ -987,7 +992,8 @@ func (r *Reader) SetOffset(offset int64) error { r.offset = offset if r.version != 0 { - r.start(r.config.Topic, map[int]int64{r.config.Partition: r.offset}) + key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} + r.start(map[topicPartition]int64{key: r.offset}) } r.activateReadLag() @@ -1116,7 +1122,7 @@ func (r *Reader) readLag(ctx context.Context) { } } -func (r *Reader) start(topic string, offsetsByPartition map[int]int64) { +func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { if r.closed { // don't start child reader if parent Reader is closed return @@ -1129,8 +1135,8 @@ func (r *Reader) start(topic string, offsetsByPartition map[int]int64) { r.version++ r.join.Add(len(offsetsByPartition)) - for partition, offset := range offsetsByPartition { - go func(ctx context.Context, partition int, offset int64, join *sync.WaitGroup) { + for key, offset := range offsetsByPartition { + go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) { defer join.Done() (&reader{ @@ -1138,8 +1144,8 @@ func (r *Reader) start(topic string, offsetsByPartition map[int]int64) { logger: r.config.Logger, errorLogger: r.config.ErrorLogger, brokers: r.config.Brokers, - topic: topic, - partition: partition, + topic: key.topic, + partition: int(key.partition), minBytes: r.config.MinBytes, maxBytes: r.config.MaxBytes, maxWait: r.config.MaxWait, @@ -1151,7 +1157,7 @@ func (r *Reader) start(topic string, offsetsByPartition map[int]int64) { isolationLevel: r.config.IsolationLevel, maxAttempts: r.config.MaxAttempts, }).run(ctx, offset) - }(ctx, partition, offset, &r.join) + }(ctx, key, offset, &r.join) } } From ee33235b1894deb4a441292b200c10fd9c0b6109 Mon Sep 17 00:00:00 2001 From: Dominic Barnes Date: Fri, 5 Feb 2021 09:57:08 -0800 Subject: [PATCH 6/6] cleanup --- reader.go | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/reader.go b/reader.go index 514e33354..fbd0c563b 100644 --- a/reader.go +++ b/reader.go @@ -87,7 +87,7 @@ type Reader struct { // useConsumerGroup indicates whether the Reader is part of a consumer group. func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" } -func (r *Reader) getConusmerGroupTopics() []string { +func (r *Reader) getTopics() []string { if len(r.config.GroupTopics) > 0 { return r.config.GroupTopics[:] } @@ -683,7 +683,7 @@ func NewReader(config ReaderConfig) *Reader { ID: r.config.GroupID, Brokers: r.config.Brokers, Dialer: r.config.Dialer, - Topics: r.getConusmerGroupTopics(), + Topics: r.getTopics(), GroupBalancers: r.config.GroupBalancers, HeartbeatInterval: r.config.HeartbeatInterval, PartitionWatchInterval: r.config.PartitionWatchInterval, @@ -777,8 +777,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { r.mutex.Lock() if !r.closed && r.version == 0 { - key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} - r.start(map[topicPartition]int64{key: r.offset}) + r.start(r.getTopicPartitionOffset()) } version := r.version @@ -992,8 +991,7 @@ func (r *Reader) SetOffset(offset int64) error { r.offset = offset if r.version != 0 { - key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} - r.start(map[topicPartition]int64{key: r.offset}) + r.start(r.getTopicPartitionOffset()) } r.activateReadLag() @@ -1072,6 +1070,11 @@ func (r *Reader) Stats() ReaderStats { return stats } +func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 { + key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} + return map[topicPartition]int64{key: r.offset} +} + func (r *Reader) withLogger(do func(Logger)) { if r.config.Logger != nil { do(r.config.Logger)