Skip to content

Commit

Permalink
Merge pull request #387 from JohnRoesler/support-clustered-kafka
Browse files Browse the repository at this point in the history
support clustered kafka brokers in the form of string delimited list
  • Loading branch information
maxekman committed Apr 15, 2022
2 parents d9133a0 + b986832 commit ba75821
Showing 1 changed file with 4 additions and 2 deletions.
6 changes: 4 additions & 2 deletions eventbus/kafka/eventbus.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"errors"
"fmt"
"log"
"strings"
"sync"
"time"

Expand All @@ -32,7 +33,7 @@ import (
// to all matching registered handlers, in order of registration.
type EventBus struct {
// TODO: Support multiple brokers.
addr string
addr string // comma delimited list of brokers
appID string
topic string
startOffset int64
Expand Down Expand Up @@ -216,8 +217,9 @@ func (b *EventBus) AddHandler(ctx context.Context, m eh.EventMatcher, h eh.Event

// Get or create the subscription.
groupID := b.appID + "_" + h.HandlerType().String()

r := kafka.NewReader(kafka.ReaderConfig{
Brokers: []string{b.addr},
Brokers: strings.Split(b.addr, ","),
Topic: b.topic,
GroupID: groupID, // Send messages to only one subscriber per group.
MaxWait: time.Second, // Allow to exit readloop in max 1s.
Expand Down

0 comments on commit ba75821

Please sign in to comment.