Skip to content

Commit

Permalink
pr changes
Browse files Browse the repository at this point in the history
Signed-off-by: Avraham Shalev <8184528+avrahams@users.noreply.github.com>
  • Loading branch information
avrahams committed Aug 24, 2023
1 parent dde7022 commit 968bd5a
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 5 deletions.
21 changes: 17 additions & 4 deletions pulsar/connector/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

type createConsumerOptions struct {
Topic TopicName
Topics []string
Topics []TopicName
SubscriptionName string
MaxDeliveryAttempts uint32
RedeliveryDelay time.Duration
Expand All @@ -31,6 +31,9 @@ func (opt *createConsumerOptions) validate() error {
if opt.Topic == "" && len(opt.Topics) == 0 {
return fmt.Errorf("topic or topics must be specified")
}
if opt.Topic != "" && len(opt.Topics) != 0 {
return fmt.Errorf("cannot specify both topic and topics")
}
if opt.SubscriptionName == "" {
return fmt.Errorf("subscription name must be specified")
}
Expand Down Expand Up @@ -74,7 +77,7 @@ func WithTopic(topic TopicName) CreateConsumerOption {
}
}

func WithTopics(topics []string) CreateConsumerOption {
func WithTopics(topics []TopicName) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.Topics = topics
}
Expand Down Expand Up @@ -105,9 +108,19 @@ func CreateSharedConsumer(pulsarClient pulsar.Client, createConsumerOpts ...Crea
if opts.MaxDeliveryAttempts != 0 {
dlq = NewDlq(opts.Topic, opts.MaxDeliveryAttempts)
}
var topic string
var topics []string
if opts.Topic != "" {
topic = GetTopic(opts.Topic)
} else {
topics = make([]string, len(opts.Topics))
for i, t := range opts.Topics {
topics[i] = GetTopic(t)
}
}
return pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: GetTopic(opts.Topic),
Topics: opts.Topics,
Topic: topic,
Topics: topics,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
MessageChannel: opts.MessageChannel,
Expand Down
2 changes: 1 addition & 1 deletion pulsar/connector/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (suite *MainTestSuite) TestCreateConsumer() {
//create consumer
chan1 := make(chan pulsar.ConsumerMessage)

consumer, err := CreateSharedConsumer(suite.pulsarClient, WithMessageChannel(chan1), WithTopic("test-topic"), WithSubscriptionName("test-subscription"), WithTopics([]string{"test-topic"}))
consumer, err := CreateSharedConsumer(suite.pulsarClient, WithMessageChannel(chan1), WithTopic("test-topic"), WithSubscriptionName("test-subscription"))
if err != nil {
suite.FailNow("failed to create consumer", err.Error())
}
Expand Down

0 comments on commit 968bd5a

Please sign in to comment.