diff --git a/go.sum b/go.sum index 411e17969..78511c0c5 100644 --- a/go.sum +++ b/go.sum @@ -26,6 +26,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/kafka_test.go b/kafka_test.go index f66855cd7..a82e844aa 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -171,3 +171,17 @@ func BenchmarkUnmarshal(b *testing.B) { }) } } + +type testKafkaLogger struct { + Prefix string + T *testing.T +} + +func newTestKafkaLogger(t *testing.T, prefix string) Logger { + return &testKafkaLogger{Prefix: prefix, T: t} +} + +func (l *testKafkaLogger) Printf(msg string, args ...interface{}) { + l.T.Helper() + l.T.Logf(l.Prefix+" "+msg, args...) +} diff --git a/reader.go b/reader.go index 22e0bd55b..fbd0c563b 100644 --- a/reader.go +++ b/reader.go @@ -87,6 +87,14 @@ type Reader struct { // useConsumerGroup indicates whether the Reader is part of a consumer group. func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" } +func (r *Reader) getTopics() []string { + if len(r.config.GroupTopics) > 0 { + return r.config.GroupTopics[:] + } + + return []string{r.config.Topic} +} + // useSyncCommits indicates whether the Reader is configured to perform sync or // async commits. func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 } @@ -104,18 +112,24 @@ func (r *Reader) unsubscribe() { // another consumer to avoid such a race. } -func (r *Reader) subscribe(assignments []PartitionAssignment) { - offsetsByPartition := make(map[int]int64) - for _, assignment := range assignments { - offsetsByPartition[assignment.ID] = assignment.Offset +func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) { + offsets := make(map[topicPartition]int64) + for topic, assignments := range allAssignments { + for _, assignment := range assignments { + key := topicPartition{ + topic: topic, + partition: int32(assignment.ID), + } + offsets[key] = assignment.Offset + } } r.mutex.Lock() - r.start(offsetsByPartition) + r.start(offsets) r.mutex.Unlock() r.withLogger(func(l Logger) { - l.Printf("subscribed to partitions: %+v", offsetsByPartition) + l.Printf("subscribed to topics and partitions: %+v", offsets) }) } @@ -302,7 +316,7 @@ func (r *Reader) run(cg *ConsumerGroup) { r.stats.rebalances.observe(1) - r.subscribe(gen.Assignments[r.config.Topic]) + r.subscribe(gen.Assignments) gen.Start(func(ctx context.Context) { r.commitLoop(ctx, gen) @@ -330,6 +344,11 @@ type ReaderConfig struct { // Partition should NOT be specified e.g. 0 GroupID string + // GroupTopics allows specifying multiple topics, but can only be used in + // combination with GroupID, as it is a consumer-group feature. As such, if + // GroupID is set, then either Topic or GroupTopics must be defined. + GroupTopics []string + // The topic to read messages from. Topic string @@ -473,10 +492,6 @@ func (config *ReaderConfig) Validate() error { return errors.New("cannot create a new kafka reader with an empty list of broker addresses") } - if len(config.Topic) == 0 { - return errors.New("cannot create a new kafka reader with an empty topic") - } - if config.Partition < 0 || config.Partition >= math.MaxInt32 { return errors.New(fmt.Sprintf("partition number out of bounds: %d", config.Partition)) } @@ -489,8 +504,16 @@ func (config *ReaderConfig) Validate() error { return errors.New(fmt.Sprintf("invalid negative maximum batch size (max = %d)", config.MaxBytes)) } - if config.GroupID != "" && config.Partition != 0 { - return errors.New("either Partition or GroupID may be specified, but not both") + if config.GroupID != "" { + if config.Partition != 0 { + return errors.New("either Partition or GroupID may be specified, but not both") + } + + if len(config.Topic) == 0 && len(config.GroupTopics) == 0 { + return errors.New("either Topic or GroupTopics must be specified with GroupID") + } + } else if len(config.Topic) == 0 { + return errors.New("cannot create a new kafka reader with an empty topic") } if config.MinBytes > config.MaxBytes { @@ -660,7 +683,7 @@ func NewReader(config ReaderConfig) *Reader { ID: r.config.GroupID, Brokers: r.config.Brokers, Dialer: r.config.Dialer, - Topics: []string{r.config.Topic}, + Topics: r.getTopics(), GroupBalancers: r.config.GroupBalancers, HeartbeatInterval: r.config.HeartbeatInterval, PartitionWatchInterval: r.config.PartitionWatchInterval, @@ -754,7 +777,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) { r.mutex.Lock() if !r.closed && r.version == 0 { - r.start(map[int]int64{r.config.Partition: r.offset}) + r.start(r.getTopicPartitionOffset()) } version := r.version @@ -968,7 +991,7 @@ func (r *Reader) SetOffset(offset int64) error { r.offset = offset if r.version != 0 { - r.start(map[int]int64{r.config.Partition: r.offset}) + r.start(r.getTopicPartitionOffset()) } r.activateReadLag() @@ -1047,6 +1070,11 @@ func (r *Reader) Stats() ReaderStats { return stats } +func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 { + key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)} + return map[topicPartition]int64{key: r.offset} +} + func (r *Reader) withLogger(do func(Logger)) { if r.config.Logger != nil { do(r.config.Logger) @@ -1097,7 +1125,7 @@ func (r *Reader) readLag(ctx context.Context) { } } -func (r *Reader) start(offsetsByPartition map[int]int64) { +func (r *Reader) start(offsetsByPartition map[topicPartition]int64) { if r.closed { // don't start child reader if parent Reader is closed return @@ -1110,8 +1138,8 @@ func (r *Reader) start(offsetsByPartition map[int]int64) { r.version++ r.join.Add(len(offsetsByPartition)) - for partition, offset := range offsetsByPartition { - go func(ctx context.Context, partition int, offset int64, join *sync.WaitGroup) { + for key, offset := range offsetsByPartition { + go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) { defer join.Done() (&reader{ @@ -1119,8 +1147,8 @@ func (r *Reader) start(offsetsByPartition map[int]int64) { logger: r.config.Logger, errorLogger: r.config.ErrorLogger, brokers: r.config.Brokers, - topic: r.config.Topic, - partition: partition, + topic: key.topic, + partition: int(key.partition), minBytes: r.config.MinBytes, maxBytes: r.config.MaxBytes, maxWait: r.config.MaxWait, @@ -1132,7 +1160,7 @@ func (r *Reader) start(offsetsByPartition map[int]int64) { isolationLevel: r.config.IsolationLevel, maxAttempts: r.config.MaxAttempts, }).run(ctx, offset) - }(ctx, partition, offset, &r.join) + }(ctx, key, offset, &r.join) } } diff --git a/reader_test.go b/reader_test.go index 1324483d4..0a807276e 100644 --- a/reader_test.go +++ b/reader_test.go @@ -2,6 +2,7 @@ package kafka import ( "context" + "fmt" "io" "math/rand" "net" @@ -1342,6 +1343,180 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) { } } +func TestConsumerGroupWithTopic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conf := ReaderConfig{ + Brokers: []string{"localhost:9092"}, + GroupID: makeGroupID(), + Topic: makeTopic(), + MaxWait: time.Second, + PartitionWatchInterval: 100 * time.Millisecond, + WatchPartitionChanges: true, + Logger: newTestKafkaLogger(t, "Reader:"), + } + + r := NewReader(conf) + defer r.Close() + + recvErr := make(chan error, len(conf.GroupTopics)) + go func() { + msg, err := r.ReadMessage(ctx) + t.Log(msg) + recvErr <- err + }() + + time.Sleep(conf.MaxWait) + + client, shutdown := newLocalClientWithTopic(conf.Topic, 1) + defer shutdown() + + w := &Writer{ + Addr: TCP(r.config.Brokers...), + Topic: conf.Topic, + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, "Writer:"), + } + defer w.Close() + if err := w.WriteMessages(ctx, Message{Value: []byte(conf.Topic)}); err != nil { + t.Fatalf("write error: %+v", err) + } + + if err := <-recvErr; err != nil { + t.Fatalf("read error: %+v", err) + } + + nMsgs := r.Stats().Messages + if nMsgs != 1 { + t.Fatalf("expected to receive 1 message, but got %d", nMsgs) + } +} + +func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + conf := ReaderConfig{ + Brokers: []string{"localhost:9092"}, + GroupID: makeGroupID(), + GroupTopics: []string{makeTopic()}, + MaxWait: time.Second, + PartitionWatchInterval: 100 * time.Millisecond, + WatchPartitionChanges: true, + Logger: newTestKafkaLogger(t, "Reader:"), + } + + r := NewReader(conf) + defer r.Close() + + recvErr := make(chan error, len(conf.GroupTopics)) + go func() { + msg, err := r.ReadMessage(ctx) + t.Log(msg) + recvErr <- err + }() + + time.Sleep(conf.MaxWait) + + for i, topic := range conf.GroupTopics { + client, shutdown := newLocalClientWithTopic(topic, 1) + defer shutdown() + + w := &Writer{ + Addr: TCP(r.config.Brokers...), + Topic: topic, + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)), + } + defer w.Close() + if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil { + t.Fatalf("write error: %+v", err) + } + } + + if err := <-recvErr; err != nil { + t.Fatalf("read error: %+v", err) + } + + nMsgs := r.Stats().Messages + if nMsgs != int64(len(conf.GroupTopics)) { + t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs) + } +} + +func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + client, shutdown := newLocalClient() + defer shutdown() + + conf := ReaderConfig{ + Brokers: []string{"localhost:9092"}, + GroupID: makeGroupID(), + GroupTopics: []string{makeTopic(), makeTopic()}, + MaxWait: time.Second, + PartitionWatchInterval: 100 * time.Millisecond, + WatchPartitionChanges: true, + Logger: newTestKafkaLogger(t, "Reader:"), + } + + r := NewReader(conf) + + w := &Writer{ + Addr: TCP(r.config.Brokers...), + BatchTimeout: 10 * time.Millisecond, + BatchSize: 1, + Transport: client.Transport, + Logger: newTestKafkaLogger(t, "Writer:"), + } + defer w.Close() + + time.Sleep(time.Second) + + var msgs []Message + for _, topic := range conf.GroupTopics { + msgs = append(msgs, Message{Topic: topic}) + } + if err := w.WriteMessages(ctx, msgs...); err != nil { + t.Logf("write error: %+v", err) + } + + wg := new(sync.WaitGroup) + wg.Add(len(msgs)) + + go func() { + wg.Wait() + t.Log("closing reader") + r.Close() + }() + + for { + msg, err := r.ReadMessage(ctx) + if err != nil { + if err == io.EOF { + t.Log("reader closed") + break + } + + t.Fatalf("read error: %+v", err) + } else { + t.Logf("message read: %+v", msg) + wg.Done() + } + } + + nMsgs := r.Stats().Messages + if nMsgs != int64(len(conf.GroupTopics)) { + t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs) + } +} + func getOffsets(t *testing.T, config ReaderConfig) map[int]int64 { // minimal config required to lookup coordinator cg := ConsumerGroup{