Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
14 changes: 14 additions & 0 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,3 +171,17 @@ func BenchmarkUnmarshal(b *testing.B) {
})
}
}

type testKafkaLogger struct {
Prefix string
T *testing.T
}

func newTestKafkaLogger(t *testing.T, prefix string) Logger {
return &testKafkaLogger{Prefix: prefix, T: t}
}

func (l *testKafkaLogger) Printf(msg string, args ...interface{}) {
l.T.Helper()
l.T.Logf(l.Prefix+" "+msg, args...)
}
72 changes: 50 additions & 22 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,14 @@ type Reader struct {
// useConsumerGroup indicates whether the Reader is part of a consumer group.
func (r *Reader) useConsumerGroup() bool { return r.config.GroupID != "" }

func (r *Reader) getTopics() []string {
if len(r.config.GroupTopics) > 0 {
return r.config.GroupTopics[:]
}

return []string{r.config.Topic}
}

// useSyncCommits indicates whether the Reader is configured to perform sync or
// async commits.
func (r *Reader) useSyncCommits() bool { return r.config.CommitInterval == 0 }
Expand All @@ -104,18 +112,24 @@ func (r *Reader) unsubscribe() {
// another consumer to avoid such a race.
}

func (r *Reader) subscribe(assignments []PartitionAssignment) {
offsetsByPartition := make(map[int]int64)
for _, assignment := range assignments {
offsetsByPartition[assignment.ID] = assignment.Offset
func (r *Reader) subscribe(allAssignments map[string][]PartitionAssignment) {
offsets := make(map[topicPartition]int64)
for topic, assignments := range allAssignments {
for _, assignment := range assignments {
key := topicPartition{
topic: topic,
partition: int32(assignment.ID),
}
offsets[key] = assignment.Offset
}
}

r.mutex.Lock()
r.start(offsetsByPartition)
r.start(offsets)
r.mutex.Unlock()

r.withLogger(func(l Logger) {
l.Printf("subscribed to partitions: %+v", offsetsByPartition)
l.Printf("subscribed to topics and partitions: %+v", offsets)
})
}

Expand Down Expand Up @@ -302,7 +316,7 @@ func (r *Reader) run(cg *ConsumerGroup) {

r.stats.rebalances.observe(1)

r.subscribe(gen.Assignments[r.config.Topic])
r.subscribe(gen.Assignments)

gen.Start(func(ctx context.Context) {
r.commitLoop(ctx, gen)
Expand Down Expand Up @@ -330,6 +344,11 @@ type ReaderConfig struct {
// Partition should NOT be specified e.g. 0
GroupID string

// GroupTopics allows specifying multiple topics, but can only be used in
// combination with GroupID, as it is a consumer-group feature. As such, if
// GroupID is set, then either Topic or GroupTopics must be defined.
GroupTopics []string

// The topic to read messages from.
Topic string

Expand Down Expand Up @@ -473,10 +492,6 @@ func (config *ReaderConfig) Validate() error {
return errors.New("cannot create a new kafka reader with an empty list of broker addresses")
}

if len(config.Topic) == 0 {
return errors.New("cannot create a new kafka reader with an empty topic")
}

if config.Partition < 0 || config.Partition >= math.MaxInt32 {
return errors.New(fmt.Sprintf("partition number out of bounds: %d", config.Partition))
}
Expand All @@ -489,8 +504,16 @@ func (config *ReaderConfig) Validate() error {
return errors.New(fmt.Sprintf("invalid negative maximum batch size (max = %d)", config.MaxBytes))
}

if config.GroupID != "" && config.Partition != 0 {
return errors.New("either Partition or GroupID may be specified, but not both")
if config.GroupID != "" {
if config.Partition != 0 {
return errors.New("either Partition or GroupID may be specified, but not both")
}

if len(config.Topic) == 0 && len(config.GroupTopics) == 0 {
return errors.New("either Topic or GroupTopics must be specified with GroupID")
}
} else if len(config.Topic) == 0 {
return errors.New("cannot create a new kafka reader with an empty topic")
}

if config.MinBytes > config.MaxBytes {
Expand Down Expand Up @@ -660,7 +683,7 @@ func NewReader(config ReaderConfig) *Reader {
ID: r.config.GroupID,
Brokers: r.config.Brokers,
Dialer: r.config.Dialer,
Topics: []string{r.config.Topic},
Topics: r.getTopics(),
GroupBalancers: r.config.GroupBalancers,
HeartbeatInterval: r.config.HeartbeatInterval,
PartitionWatchInterval: r.config.PartitionWatchInterval,
Expand Down Expand Up @@ -754,7 +777,7 @@ func (r *Reader) FetchMessage(ctx context.Context) (Message, error) {
r.mutex.Lock()

if !r.closed && r.version == 0 {
r.start(map[int]int64{r.config.Partition: r.offset})
r.start(r.getTopicPartitionOffset())
}

version := r.version
Expand Down Expand Up @@ -968,7 +991,7 @@ func (r *Reader) SetOffset(offset int64) error {
r.offset = offset

if r.version != 0 {
r.start(map[int]int64{r.config.Partition: r.offset})
r.start(r.getTopicPartitionOffset())
}

r.activateReadLag()
Expand Down Expand Up @@ -1047,6 +1070,11 @@ func (r *Reader) Stats() ReaderStats {
return stats
}

func (r *Reader) getTopicPartitionOffset() map[topicPartition]int64 {
key := topicPartition{topic: r.config.Topic, partition: int32(r.config.Partition)}
return map[topicPartition]int64{key: r.offset}
}

func (r *Reader) withLogger(do func(Logger)) {
if r.config.Logger != nil {
do(r.config.Logger)
Expand Down Expand Up @@ -1097,7 +1125,7 @@ func (r *Reader) readLag(ctx context.Context) {
}
}

func (r *Reader) start(offsetsByPartition map[int]int64) {
func (r *Reader) start(offsetsByPartition map[topicPartition]int64) {
if r.closed {
// don't start child reader if parent Reader is closed
return
Expand All @@ -1110,17 +1138,17 @@ func (r *Reader) start(offsetsByPartition map[int]int64) {
r.version++

r.join.Add(len(offsetsByPartition))
for partition, offset := range offsetsByPartition {
go func(ctx context.Context, partition int, offset int64, join *sync.WaitGroup) {
for key, offset := range offsetsByPartition {
go func(ctx context.Context, key topicPartition, offset int64, join *sync.WaitGroup) {
defer join.Done()

(&reader{
dialer: r.config.Dialer,
logger: r.config.Logger,
errorLogger: r.config.ErrorLogger,
brokers: r.config.Brokers,
topic: r.config.Topic,
partition: partition,
topic: key.topic,
partition: int(key.partition),
minBytes: r.config.MinBytes,
maxBytes: r.config.MaxBytes,
maxWait: r.config.MaxWait,
Expand All @@ -1132,7 +1160,7 @@ func (r *Reader) start(offsetsByPartition map[int]int64) {
isolationLevel: r.config.IsolationLevel,
maxAttempts: r.config.MaxAttempts,
}).run(ctx, offset)
}(ctx, partition, offset, &r.join)
}(ctx, key, offset, &r.join)
}
}

Expand Down
175 changes: 175 additions & 0 deletions reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package kafka

import (
"context"
"fmt"
"io"
"math/rand"
"net"
Expand Down Expand Up @@ -1342,6 +1343,180 @@ func TestConsumerGroupWithMissingTopic(t *testing.T) {
}
}

func TestConsumerGroupWithTopic(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conf := ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: makeGroupID(),
Topic: makeTopic(),
MaxWait: time.Second,
PartitionWatchInterval: 100 * time.Millisecond,
WatchPartitionChanges: true,
Logger: newTestKafkaLogger(t, "Reader:"),
}

r := NewReader(conf)
defer r.Close()

recvErr := make(chan error, len(conf.GroupTopics))
go func() {
msg, err := r.ReadMessage(ctx)
t.Log(msg)
recvErr <- err
}()

time.Sleep(conf.MaxWait)

client, shutdown := newLocalClientWithTopic(conf.Topic, 1)
defer shutdown()

w := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: conf.Topic,
BatchTimeout: 10 * time.Millisecond,
BatchSize: 1,
Transport: client.Transport,
Logger: newTestKafkaLogger(t, "Writer:"),
}
defer w.Close()
if err := w.WriteMessages(ctx, Message{Value: []byte(conf.Topic)}); err != nil {
t.Fatalf("write error: %+v", err)
}

if err := <-recvErr; err != nil {
t.Fatalf("read error: %+v", err)
}

nMsgs := r.Stats().Messages
if nMsgs != 1 {
t.Fatalf("expected to receive 1 message, but got %d", nMsgs)
}
}

func TestConsumerGroupWithGroupTopicsSingle(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

conf := ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: makeGroupID(),
GroupTopics: []string{makeTopic()},
MaxWait: time.Second,
PartitionWatchInterval: 100 * time.Millisecond,
WatchPartitionChanges: true,
Logger: newTestKafkaLogger(t, "Reader:"),
}

r := NewReader(conf)
defer r.Close()

recvErr := make(chan error, len(conf.GroupTopics))
go func() {
msg, err := r.ReadMessage(ctx)
t.Log(msg)
recvErr <- err
}()

time.Sleep(conf.MaxWait)

for i, topic := range conf.GroupTopics {
client, shutdown := newLocalClientWithTopic(topic, 1)
defer shutdown()

w := &Writer{
Addr: TCP(r.config.Brokers...),
Topic: topic,
BatchTimeout: 10 * time.Millisecond,
BatchSize: 1,
Transport: client.Transport,
Logger: newTestKafkaLogger(t, fmt.Sprintf("Writer(%d):", i)),
}
defer w.Close()
if err := w.WriteMessages(ctx, Message{Value: []byte(topic)}); err != nil {
t.Fatalf("write error: %+v", err)
}
}

if err := <-recvErr; err != nil {
t.Fatalf("read error: %+v", err)
}

nMsgs := r.Stats().Messages
if nMsgs != int64(len(conf.GroupTopics)) {
t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs)
}
}

func TestConsumerGroupWithGroupTopicsMultple(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

client, shutdown := newLocalClient()
defer shutdown()

conf := ReaderConfig{
Brokers: []string{"localhost:9092"},
GroupID: makeGroupID(),
GroupTopics: []string{makeTopic(), makeTopic()},
MaxWait: time.Second,
PartitionWatchInterval: 100 * time.Millisecond,
WatchPartitionChanges: true,
Logger: newTestKafkaLogger(t, "Reader:"),
}

r := NewReader(conf)

w := &Writer{
Addr: TCP(r.config.Brokers...),
BatchTimeout: 10 * time.Millisecond,
BatchSize: 1,
Transport: client.Transport,
Logger: newTestKafkaLogger(t, "Writer:"),
}
defer w.Close()

time.Sleep(time.Second)

var msgs []Message
for _, topic := range conf.GroupTopics {
msgs = append(msgs, Message{Topic: topic})
}
if err := w.WriteMessages(ctx, msgs...); err != nil {
t.Logf("write error: %+v", err)
}

wg := new(sync.WaitGroup)
wg.Add(len(msgs))

go func() {
wg.Wait()
t.Log("closing reader")
r.Close()
}()

for {
msg, err := r.ReadMessage(ctx)
if err != nil {
if err == io.EOF {
t.Log("reader closed")
break
}

t.Fatalf("read error: %+v", err)
} else {
t.Logf("message read: %+v", msg)
wg.Done()
}
}

nMsgs := r.Stats().Messages
if nMsgs != int64(len(conf.GroupTopics)) {
t.Fatalf("expected to receive %d messages, but got %d", len(conf.GroupTopics), nMsgs)
}
}

func getOffsets(t *testing.T, config ReaderConfig) map[int]int64 {
// minimal config required to lookup coordinator
cg := ConsumerGroup{
Expand Down