Skip to content

Commit

Permalink
Merge 2dce0d0 into 0c02081
Browse files Browse the repository at this point in the history
  • Loading branch information
avrahams committed Aug 24, 2023
2 parents 0c02081 + 2dce0d0 commit a31e9f3
Show file tree
Hide file tree
Showing 10 changed files with 572 additions and 67 deletions.
9 changes: 9 additions & 0 deletions pulsar/common/topics/topicnames.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package topics

import "github.com/kubescape/messaging/pulsar/connector"

// Known topics names
const (
//Event broadcaster topic
TopicNameBroadcastEvent connector.TopicName = "broadcast-event-V1"
)
126 changes: 126 additions & 0 deletions pulsar/connector/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package connector

import (
"testing"
"time"
)

func TestNackBackoffPolicy(t *testing.T) {
tests := []struct {
name string
minMultiplier uint32
maxMultiplier uint32
baseDelay time.Duration
redeliveryCount uint32
expectedDelay time.Duration
expectedValidationErr bool
}{
{
name: "Base delay 100ms, Redelivery count 1",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 100 * time.Millisecond,
redeliveryCount: 1,
expectedDelay: 200 * time.Millisecond,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count 2",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 2,
expectedDelay: 3 * time.Second,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count 5",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 5,
expectedDelay: 5 * time.Second,
expectedValidationErr: false,
},
// Test cases targeting maxMultiplier
{
name: "Base delay 100ms, Redelivery count at maxMultiplier",
minMultiplier: 1,
maxMultiplier: 3,
baseDelay: 100 * time.Millisecond,
redeliveryCount: 3,
expectedDelay: 300 * time.Millisecond,
expectedValidationErr: false,
},

{
name: "Base delay 1s, Redelivery count at maxMultiplier",
minMultiplier: 1,
maxMultiplier: 4,
baseDelay: 1 * time.Second,
redeliveryCount: 4,
expectedDelay: 4 * time.Second,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count beyond maxMultiplier",
minMultiplier: 1,
maxMultiplier: 2,
baseDelay: 1 * time.Second,
redeliveryCount: 3,
expectedDelay: 2 * time.Second, // Should cap at maxMultiplier
expectedValidationErr: false,
},
// Additional test cases
{
name: "Base delay 100ms, Redelivery count 0",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 100 * time.Millisecond,
redeliveryCount: 0,
expectedDelay: 100 * time.Millisecond,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count at maxMultiplier",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 5,
expectedDelay: 5 * time.Second,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count beyond maxMultiplier",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 6,
expectedDelay: 5 * time.Second,
expectedValidationErr: false,
},
{
name: "Invalid configuration: minMultiplier > maxMultiplier",
minMultiplier: 6,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 1,
expectedValidationErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nbp, err := NewNackBackoffPolicy(tt.minMultiplier, tt.maxMultiplier, tt.baseDelay)
if (err != nil) != tt.expectedValidationErr {
t.Fatalf("Expected validation error: %v, got: %v", tt.expectedValidationErr, err)
}
if err == nil {
delay := nbp.Next(tt.redeliveryCount)
if delay != tt.expectedDelay {
t.Fatalf("Expected delay: %v, got: %v", tt.expectedDelay, delay)
}
}
})
}
}
43 changes: 16 additions & 27 deletions pulsar/connector/basic_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (p TestPayloadImplInterface) GetId() string {
return fmt.Sprintf("%v", p.Id)
}

const TestTopicName = "test-topic"
const TestTopicName TopicName = "test-topic"
const TestSubscriptionName = "test-consumer"
const testProducerName = "test-producer"

Expand Down Expand Up @@ -89,7 +89,7 @@ func CreateTestProducer(ctx context.Context, config *config.PulsarConfig) (pulsa
}

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: GetTopic(TestTopicName),
Topic: BuildPersistentTopic(GetClientConfig().Tenant, GetClientConfig().Namespace, TestTopicName),
Name: testProducerName,
Interceptors: tracer.NewProducerInterceptors(ctx),
})
Expand All @@ -111,20 +111,14 @@ func CreateTestConsumer(ctx context.Context, config *config.PulsarConfig) (pulsa
return nil, err
}

topic := GetTopic(TestTopicName)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: TestSubscriptionName,
Type: pulsar.Shared,
DLQ: NewDlq(topic, ctx),
Interceptors: tracer.NewConsumerInterceptors(ctx),
NackRedeliveryDelay: time.Duration(config.RedeliveryDelaySeconds) * time.Second,
EnableDefaultNackBackoffPolicy: true,
})
if consumer != nil && err == nil {
utils.SetContextConsumer(ctx, consumer)
}
return consumer, err
return CreateSharedConsumer(client,
WithTopic(TestTopicName),
WithSubscriptionName(TestSubscriptionName),
WithRedeliveryDelay(time.Duration(config.RedeliveryDelaySeconds)*time.Second),
WithDLQ(uint32(GetClientConfig().MaxDeliveryAttempts)),
WithDefaultBackoffPolicy(),
)

}

func CreateTestDlqConsumer(config *config.PulsarConfig) (pulsar.Consumer, error) {
Expand All @@ -133,17 +127,12 @@ func CreateTestDlqConsumer(config *config.PulsarConfig) (pulsar.Consumer, error)
return nil, err
}

topic := GetTopic(TestTopicName + "-dlq")
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "Test",
Type: pulsar.Shared,
})
if err != nil {
return nil, err
}

return consumer, nil
return CreateSharedConsumer(client,
WithTopic(TestTopicName),
WithSubscriptionName(TestSubscriptionName+"-dlq"),
WithRedeliveryDelay(0),
WithDLQ(0),
)
}

func (suite *MainTestSuite) TestDLQ() {
Expand Down
123 changes: 109 additions & 14 deletions pulsar/connector/consumer.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,98 @@
package connector

import (
"fmt"
"time"

"github.com/apache/pulsar-client-go/pulsar"
)

type createConsumerOptions struct {
Topic string
Topics []string
SubscriptionName string
MessageChannel chan pulsar.ConsumerMessage
Topic TopicName
Topics []TopicName
SubscriptionName string
MaxDeliveryAttempts uint32
RedeliveryDelay time.Duration
MessageChannel chan pulsar.ConsumerMessage
DefaultBackoffPolicy bool
BackoffPolicy pulsar.NackBackoffPolicy
Tenant string
Namespace string
}

func (opt *createConsumerOptions) defaults() {
if opt.MaxDeliveryAttempts == 0 {
opt.MaxDeliveryAttempts = uint32(GetClientConfig().MaxDeliveryAttempts)
}
if opt.RedeliveryDelay == 0 {
opt.RedeliveryDelay = time.Duration(GetClientConfig().RedeliveryDelaySeconds)
}
if opt.Tenant == "" {
opt.Tenant = GetClientConfig().Tenant
}
if opt.Namespace == "" {
opt.Namespace = GetClientConfig().Namespace
}
}

func (opt *createConsumerOptions) validate() error {
if opt.Topic == "" && len(opt.Topics) == 0 {
return fmt.Errorf("topic or topics must be specified")
}
if opt.Topic != "" && len(opt.Topics) != 0 {
return fmt.Errorf("cannot specify both topic and topics")
}
if opt.SubscriptionName == "" {
return fmt.Errorf("subscription name must be specified")
}
if opt.DefaultBackoffPolicy && opt.BackoffPolicy != nil {
return fmt.Errorf("cannot specify both default backoff policy and backoff policy")
}
return nil
}

func WithNamespace(tenant, namespace string) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.Tenant = tenant
o.Namespace = namespace
}
}

type CreateConsumerOption func(*createConsumerOptions)

func WithTopic(topic string) CreateConsumerOption {
func WithRedeliveryDelay(redeliveryDelay time.Duration) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.RedeliveryDelay = redeliveryDelay
}
}

func WithBackoffPolicy(backoffPolicy pulsar.NackBackoffPolicy) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.BackoffPolicy = backoffPolicy
}
}

func WithDefaultBackoffPolicy() CreateConsumerOption {
return func(o *createConsumerOptions) {
o.DefaultBackoffPolicy = true
}
}

// maxDeliveryAttempts before sending to DLQ - 0 means no DLQ
// by default, maxDeliveryAttempts is 5
func WithDLQ(maxDeliveryAttempts uint32) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.MaxDeliveryAttempts = maxDeliveryAttempts
}
}

func WithTopic(topic TopicName) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.Topic = topic
}
}

func WithTopics(topics []string) CreateConsumerOption {
func WithTopics(topics []TopicName) CreateConsumerOption {
return func(o *createConsumerOptions) {
o.Topics = topics
}
Expand All @@ -37,19 +110,41 @@ func WithMessageChannel(messageChannel chan pulsar.ConsumerMessage) CreateConsum
}
}

func CreateConsumer(pulsarClient pulsar.Client, createConsumerOpts ...CreateConsumerOption) (pulsar.Consumer, error) {
func CreateSharedConsumer(pulsarClient pulsar.Client, createConsumerOpts ...CreateConsumerOption) (pulsar.Consumer, error) {
opts := &createConsumerOptions{}
opts.defaults()
for _, o := range createConsumerOpts {
o(opts)
}
if err := opts.validate(); err != nil {
return nil, err
}
var dlq *pulsar.DLQPolicy
if opts.MaxDeliveryAttempts != 0 {
dlq = NewDlq(opts.Tenant, opts.Namespace, opts.Topic, opts.MaxDeliveryAttempts)
}
var topic string
var topics []string
if opts.Topic != "" {
topic = BuildPersistentTopic(opts.Tenant, opts.Namespace, opts.Topic)
} else {
topics = make([]string, len(opts.Topics))
for i, t := range opts.Topics {
topics[i] = BuildPersistentTopic(opts.Tenant, opts.Namespace, t)
}
}
return pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
Topics: topics,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
MessageChannel: opts.MessageChannel,
DLQ: dlq,
EnableDefaultNackBackoffPolicy: opts.DefaultBackoffPolicy,

consumer, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
Topic: opts.Topic,
Topics: opts.Topics,
SubscriptionName: opts.SubscriptionName,
Type: pulsar.Shared,
MessageChannel: opts.MessageChannel,
// Interceptors: tracer.NewConsumerInterceptors(ctx),
NackRedeliveryDelay: opts.RedeliveryDelay,
NackBackoffPolicy: opts.BackoffPolicy,
})

return consumer, err
}
Loading

0 comments on commit a31e9f3

Please sign in to comment.