From 968bd5a24a5f0acfe6860e02f16456ab7a58a029 Mon Sep 17 00:00:00 2001 From: Avraham Shalev <8184528+avrahams@users.noreply.github.com> Date: Thu, 24 Aug 2023 14:31:18 +0300 Subject: [PATCH] pr changes Signed-off-by: Avraham Shalev <8184528+avrahams@users.noreply.github.com> --- pulsar/connector/consumer.go | 21 +++++++++++++++++---- pulsar/connector/suite_test.go | 2 +- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/pulsar/connector/consumer.go b/pulsar/connector/consumer.go index 7dd7eb0..ae2c30e 100644 --- a/pulsar/connector/consumer.go +++ b/pulsar/connector/consumer.go @@ -9,7 +9,7 @@ import ( type createConsumerOptions struct { Topic TopicName - Topics []string + Topics []TopicName SubscriptionName string MaxDeliveryAttempts uint32 RedeliveryDelay time.Duration @@ -31,6 +31,9 @@ func (opt *createConsumerOptions) validate() error { if opt.Topic == "" && len(opt.Topics) == 0 { return fmt.Errorf("topic or topics must be specified") } + if opt.Topic != "" && len(opt.Topics) != 0 { + return fmt.Errorf("cannot specify both topic and topics") + } if opt.SubscriptionName == "" { return fmt.Errorf("subscription name must be specified") } @@ -74,7 +77,7 @@ func WithTopic(topic TopicName) CreateConsumerOption { } } -func WithTopics(topics []string) CreateConsumerOption { +func WithTopics(topics []TopicName) CreateConsumerOption { return func(o *createConsumerOptions) { o.Topics = topics } @@ -105,9 +108,19 @@ func CreateSharedConsumer(pulsarClient pulsar.Client, createConsumerOpts ...Crea if opts.MaxDeliveryAttempts != 0 { dlq = NewDlq(opts.Topic, opts.MaxDeliveryAttempts) } + var topic string + var topics []string + if opts.Topic != "" { + topic = GetTopic(opts.Topic) + } else { + topics = make([]string, len(opts.Topics)) + for i, t := range opts.Topics { + topics[i] = GetTopic(t) + } + } return pulsarClient.Subscribe(pulsar.ConsumerOptions{ - Topic: GetTopic(opts.Topic), - Topics: opts.Topics, + Topic: topic, + Topics: topics, SubscriptionName: opts.SubscriptionName, Type: pulsar.Shared, MessageChannel: opts.MessageChannel, diff --git a/pulsar/connector/suite_test.go b/pulsar/connector/suite_test.go index 7cf7d52..b0ec98b 100644 --- a/pulsar/connector/suite_test.go +++ b/pulsar/connector/suite_test.go @@ -82,7 +82,7 @@ func (suite *MainTestSuite) TestCreateConsumer() { //create consumer chan1 := make(chan pulsar.ConsumerMessage) - consumer, err := CreateSharedConsumer(suite.pulsarClient, WithMessageChannel(chan1), WithTopic("test-topic"), WithSubscriptionName("test-subscription"), WithTopics([]string{"test-topic"})) + consumer, err := CreateSharedConsumer(suite.pulsarClient, WithMessageChannel(chan1), WithTopic("test-topic"), WithSubscriptionName("test-subscription")) if err != nil { suite.FailNow("failed to create consumer", err.Error()) }