Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove global ,format Topics , consumer options and base test suite #2

Merged
merged 10 commits into from
Aug 27, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions pulsar/common/topics/topicnames.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package topics

import "github.com/kubescape/messaging/pulsar/connector"

// Known topics names
const (
//Event broadcaster topic
TopicNameBroadcastEvent connector.TopicName = "broadcast-event-V1"
)
126 changes: 126 additions & 0 deletions pulsar/connector/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package connector

import (
"testing"
"time"
)

func TestNackBackoffPolicy(t *testing.T) {
tests := []struct {
name string
minMultiplier uint32
maxMultiplier uint32
baseDelay time.Duration
redeliveryCount uint32
expectedDelay time.Duration
expectedValidationErr bool
}{
{
name: "Base delay 100ms, Redelivery count 1",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 100 * time.Millisecond,
redeliveryCount: 1,
expectedDelay: 200 * time.Millisecond,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count 2",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 2,
expectedDelay: 3 * time.Second,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count 5",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 5,
expectedDelay: 5 * time.Second,
expectedValidationErr: false,
},
// Test cases targeting maxMultiplier
{
name: "Base delay 100ms, Redelivery count at maxMultiplier",
minMultiplier: 1,
maxMultiplier: 3,
baseDelay: 100 * time.Millisecond,
redeliveryCount: 3,
expectedDelay: 300 * time.Millisecond,
expectedValidationErr: false,
},

{
name: "Base delay 1s, Redelivery count at maxMultiplier",
minMultiplier: 1,
maxMultiplier: 4,
baseDelay: 1 * time.Second,
redeliveryCount: 4,
expectedDelay: 4 * time.Second,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count beyond maxMultiplier",
minMultiplier: 1,
maxMultiplier: 2,
baseDelay: 1 * time.Second,
redeliveryCount: 3,
expectedDelay: 2 * time.Second, // Should cap at maxMultiplier
expectedValidationErr: false,
},
// Additional test cases
{
name: "Base delay 100ms, Redelivery count 0",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 100 * time.Millisecond,
redeliveryCount: 0,
expectedDelay: 100 * time.Millisecond,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count at maxMultiplier",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 5,
expectedDelay: 5 * time.Second,
expectedValidationErr: false,
},
{
name: "Base delay 1s, Redelivery count beyond maxMultiplier",
minMultiplier: 1,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 6,
expectedDelay: 5 * time.Second,
expectedValidationErr: false,
},
{
name: "Invalid configuration: minMultiplier > maxMultiplier",
minMultiplier: 6,
maxMultiplier: 5,
baseDelay: 1 * time.Second,
redeliveryCount: 1,
expectedValidationErr: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
nbp, err := NewNackBackoffPolicy(tt.minMultiplier, tt.maxMultiplier, tt.baseDelay)
if (err != nil) != tt.expectedValidationErr {
t.Fatalf("Expected validation error: %v, got: %v", tt.expectedValidationErr, err)
}
if err == nil {
delay := nbp.Next(tt.redeliveryCount)
if delay != tt.expectedDelay {
t.Fatalf("Expected delay: %v, got: %v", tt.expectedDelay, delay)
}
}
})
}
}
80 changes: 23 additions & 57 deletions pulsar/connector/basic_conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (
"time"

"github.com/apache/pulsar-client-go/pulsar"
"github.com/kubescape/messaging/pulsar/common/tracer"
"github.com/kubescape/messaging/pulsar/common/utils"
"github.com/kubescape/messaging/pulsar/config"
)

type TestPayload interface {
Expand Down Expand Up @@ -44,18 +42,17 @@ func (p TestPayloadImplInterface) GetId() string {
return fmt.Sprintf("%v", p.Id)
}

const TestTopicName = "test-topic"
const TestTopicName TopicName = "test-topic"
const TestSubscriptionName = "test-consumer"
const testProducerName = "test-producer"

func (suite *MainTestSuite) TestConsumerAndProducer() {
testConf := suite.defaultTestConfig
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

//create producer to input test payloads
pubsubCtx := utils.NewContextWithValues(ctx, "testConsumer")
producer, err := CreateTestProducer(pubsubCtx, &testConf)
producer, err := CreateTestProducer(pubsubCtx, suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error(), "create producer")
}
Expand All @@ -65,7 +62,7 @@ func (suite *MainTestSuite) TestConsumerAndProducer() {
defer producer.Close()

//create consumer to get actual payloads
consumer, err := CreateTestConsumer(pubsubCtx, &testConf)
consumer, err := CreateTestConsumer(pubsubCtx, suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error())
}
Expand All @@ -82,17 +79,9 @@ func (suite *MainTestSuite) TestConsumerAndProducer() {
}

// CreateTestProducer creates a producer
func CreateTestProducer(ctx context.Context, config *config.PulsarConfig) (pulsar.Producer, error) {
client, err := GetClientOnce(WithConfig(config))
if err != nil {
return nil, err
}
func CreateTestProducer(ctx context.Context, client PulsarClient) (pulsar.Producer, error) {

producer, err := client.CreateProducer(pulsar.ProducerOptions{
Topic: GetTopic(TestTopicName),
Name: testProducerName,
Interceptors: tracer.NewProducerInterceptors(ctx),
})
producer, err := CreateProducer(client, WithProducerTopic(TestTopicName))

if err != nil && utils.IsProducerNameExistsError(testProducerName, err) {
//other instance became the producer
Expand All @@ -105,57 +94,34 @@ func CreateTestProducer(ctx context.Context, config *config.PulsarConfig) (pulsa
return producer, err
}

func CreateTestConsumer(ctx context.Context, config *config.PulsarConfig) (pulsar.Consumer, error) {
client, err := GetClientOnce(WithConfig(config))
if err != nil {
return nil, err
}
func CreateTestConsumer(ctx context.Context, client PulsarClient) (pulsar.Consumer, error) {
return CreateSharedConsumer(client,
WithTopic(TestTopicName),
WithSubscriptionName(TestSubscriptionName),
WithRedeliveryDelay(time.Duration(client.GetConfig().RedeliveryDelaySeconds)*time.Second),
WithDLQ(uint32(client.GetConfig().MaxDeliveryAttempts)),
WithDefaultBackoffPolicy(),
)

topic := GetTopic(TestTopicName)
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: TestSubscriptionName,
Type: pulsar.Shared,
DLQ: NewDlq(topic, ctx),
Interceptors: tracer.NewConsumerInterceptors(ctx),
NackRedeliveryDelay: time.Duration(config.RedeliveryDelaySeconds) * time.Second,
EnableDefaultNackBackoffPolicy: true,
})
if consumer != nil && err == nil {
utils.SetContextConsumer(ctx, consumer)
}
return consumer, err
}

func CreateTestDlqConsumer(config *config.PulsarConfig) (pulsar.Consumer, error) {
client, err := GetClientOnce(WithConfig(config))
if err != nil {
return nil, err
}

topic := GetTopic(TestTopicName + "-dlq")
consumer, err := client.Subscribe(pulsar.ConsumerOptions{
Topic: topic,
SubscriptionName: "Test",
Type: pulsar.Shared,
})
if err != nil {
return nil, err
}

return consumer, nil
func CreateTestDlqConsumer(client PulsarClient) (pulsar.Consumer, error) {
return CreateSharedConsumer(client,
WithTopic(TestTopicName),
WithSubscriptionName(TestSubscriptionName+"-dlq"),
WithRedeliveryDelay(0),
WithDLQ(0),
)
}

func (suite *MainTestSuite) TestDLQ() {
//set test config
testConf := suite.defaultTestConfig
//start tenant check
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

//create producer to input test payloads
pubsubCtx := utils.NewContextWithValues(ctx, "testConsumer")
producer, err := CreateTestProducer(pubsubCtx, &testConf)
producer, err := CreateTestProducer(pubsubCtx, suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error(), "create producer")
}
Expand All @@ -164,12 +130,12 @@ func (suite *MainTestSuite) TestDLQ() {
}
defer producer.Close()
//create consumer to get actual payloads
consumer, err := CreateTestConsumer(pubsubCtx, &testConf)
consumer, err := CreateTestConsumer(pubsubCtx, suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error())
}
defer consumer.Close()
dlqConsumer, err := CreateTestDlqConsumer(&testConf)
dlqConsumer, err := CreateTestDlqConsumer(suite.pulsarClient)
if err != nil {
suite.FailNow(err.Error())
}
Expand Down
Loading
Loading