Skip to content

Commit

Permalink
fix(jetstream): Raise an error if leader not found and added v2.10 e2…
Browse files Browse the repository at this point in the history
…e coverage (kedacore#5358)

* fix(jetstream): Scaler doesn't print multiple (wrong) errors

Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>

* Add error on failed leader search

Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>

* update changelog

Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>

* .

Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>

---------

Signed-off-by: Jorge Turrado <jorge.turrado@scrm.lidl>
Signed-off-by: anton.lysina <alysina@gmail.com>
  • Loading branch information
JorTurFer authored and toniiiik committed Jan 15, 2024
1 parent 6f2d0cb commit 95af467
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ Here is an overview of all new **experimental** features:
- **Azure Pipelines**: No more HTTP 400 errors produced by poolName with spaces ([#5107](https://github.com/kedacore/keda/issues/5107))
- **GCP pubsub scaler**: Added `project_id` to filter for metrics queries ([#5256](https://github.com/kedacore/keda/issues/5256))
- **GCP pubsub scaler**: Missing use of default value of `value` added ([#5093](https://github.com/kedacore/keda/issues/5093))
- **NATS JetSteam Scaler**: Raise an error if leader not found ([#5358](https://github.com/kedacore/keda/pull/5358))
- **Pulsar scaler**: Fix panic when auth is not used ([#5271](https://github.com/kedacore/keda/issues/5271))
- **ScaledJobs**: Copy ScaledJob annotations to child Jobs ([#4594](https://github.com/kedacore/keda/issues/4594))

Expand Down
6 changes: 2 additions & 4 deletions pkg/scalers/nats_jetstream_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,10 +245,7 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
clusterUrls := jetStreamServerResp.ConnectUrls
if len(clusterUrls) == 0 {
isNodeAdvertised = false
// append current node's `server_name` to check if it is a leader
// even though `server_name` is not an url, it will be split by first . (dot)
// to get the node's name anyway
clusterUrls = append(clusterUrls, jetStreamServerResp.ServerName)
// jetStreamServerResp.Cluster.HostUrls contains all the cluster nodes
clusterUrls = append(clusterUrls, jetStreamServerResp.Cluster.HostUrls...)
}

Expand Down Expand Up @@ -311,6 +308,7 @@ func (s *natsJetStreamScaler) getNATSJetstreamMonitoringData(ctx context.Context
}
}
}
return fmt.Errorf("leader node not found for consumer %s", s.metadata.consumer)
}
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/nats_jetstream_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ var testNATSJetStreamMockResponses = []parseNATSJetStreamMockResponsesTestData{
Accounts: []accountDetail{{Name: "$G",
Streams: []*streamDetail{{Name: "mystream"}},
}},
}, false, false},
}, false, true},
}

var testNATSJetStreamServerMockResponses = map[string][]byte{
Expand Down
9 changes: 5 additions & 4 deletions tests/scalers/nats_jetstream/helper/nats_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@ type JetStreamTemplateData struct {
}

const (
NatsJetStreamName = "nats"
NatsJetStreamConsumerName = "PULL_CONSUMER"
NatsJetStreamChartVersion = "0.18.2"
NatsJetStreamServerVersion = "2.9.3"
NatsJetStreamName = "nats"
NatsJetStreamConsumerName = "PULL_CONSUMER"
Natsv2_10JetStreamChartVersion = "1.1.2"
NatsJetStreamChartVersion = "0.18.2"
NatsJetStreamServerVersion = "2.9.3"
)

type JetStreamDeploymentTemplateData struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@ const (
)

var (
testNamespace = fmt.Sprintf("%s-test-ns", testName)
natsNamespace = fmt.Sprintf("%s-nats-ns", testName)
natsAddress = fmt.Sprintf("nats://%s.%s.svc.cluster.local:4222", nats.NatsJetStreamName, natsNamespace)
natsServerMonitoringEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
natsHelmRepo = "https://nats-io.github.io/k8s/helm/charts/"
natsServerReplicas = 3
messagePublishCount = 300
deploymentName = "sub"
minReplicaCount = 0
maxReplicaCount = 2
testNamespace = fmt.Sprintf("%s-test-ns", testName)
natsNamespace = fmt.Sprintf("%s-nats-ns", testName)
natsAddress = fmt.Sprintf("nats://%s.%s.svc.cluster.local:4222", nats.NatsJetStreamName, natsNamespace)
natsServerMonitoringEndpoint = fmt.Sprintf("%s.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
natsServerHeadlessMonitoringEndpoint = fmt.Sprintf("%s-headless.%s.svc.cluster.local:8222", nats.NatsJetStreamName, natsNamespace)
natsHelmRepo = "https://nats-io.github.io/k8s/helm/charts/"
natsServerReplicas = 3
messagePublishCount = 300
deploymentName = "sub"
minReplicaCount = 0
maxReplicaCount = 2
)

func TestNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T) {
Expand Down Expand Up @@ -63,7 +64,6 @@ func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job with 3 stream replicas should be success")

testActivation(t, kc, testData)
testScaleOut(t, kc, testData)
testScaleIn(t, kc)

Expand All @@ -72,24 +72,58 @@ func testNATSJetStreamScalerClusterWithStreamReplicas(t *testing.T, noAdvertise
assert.True(t, WaitForJobCount(t, kc, testNamespace, 0, 60, 3),
"job count in namespace should be 0")

// Create stream and consumer with 2 stream replicas
// Create single replica stream with consumer
testData.NatsStream = "case2"
installStreamAndConsumer(t, 2, testData.NatsStream, testNamespace, natsAddress)
installStreamAndConsumer(t, 1, testData.NatsStream, testNamespace, natsAddress)
KubectlApplyWithTemplate(t, testData, "scaledObjectTemplate", nats.ScaledObjectTemplate)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job with 1 stream replica should be success")

testScaleOut(t, kc, testData)
testScaleIn(t, kc)

// Cleanup test namespace
removeStreamAndConsumer(t, 1, testData.NatsStream, testNamespace, natsAddress)
DeleteKubernetesResources(t, testNamespace, testData, testTemplates)

// Cleanup nats namespace
removeClusterWithJetStream(t)
DeleteNamespace(t, natsNamespace)
deleted := WaitForNamespaceDeletion(t, natsNamespace)
assert.Truef(t, deleted, "%s namespace not deleted", natsNamespace)
}

func TestNATSv2_10JetStreamScalerClusterWithStreamReplicas(t *testing.T) {
// Create k8s resources.
kc := GetKubernetesClient(t)

// Deploy NATS server.
installClusterWithJetStreaV2_10(t, kc)
assert.True(t, WaitForStatefulsetReplicaReadyCount(t, kc, nats.NatsJetStreamName, natsNamespace, natsServerReplicas, 60, 3),
"replica count should be %d after 3 minutes", minReplicaCount)

// Create k8s resources for testing.
testData, testTemplates := nats.GetJetStreamDeploymentTemplateData(testNamespace, natsAddress, natsServerHeadlessMonitoringEndpoint, messagePublishCount)
CreateKubernetesResources(t, kc, testNamespace, testData, testTemplates)

// Create 3 replica stream with consumer
testData.NatsStream = "case1"
installStreamAndConsumer(t, 3, testData.NatsStream, testNamespace, natsAddress)
KubectlApplyWithTemplate(t, testData, "scaledObjectTemplate", nats.ScaledObjectTemplate)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
"stream and consumer creation job with 2 stream replicas should be success")
"stream and consumer creation job with 3 stream replicas should be success")

testActivation(t, kc, testData)
testScaleOut(t, kc, testData)
testScaleIn(t, kc)

// Remove 2 replica stream with consumer
removeStreamAndConsumer(t, 2, testData.NatsStream, testNamespace, natsAddress)
// Remove 3 replica stream with consumer
removeStreamAndConsumer(t, 3, testData.NatsStream, testNamespace, natsAddress)
assert.True(t, WaitForJobCount(t, kc, testNamespace, 0, 60, 3),
"job count in namespace should be 0")

// Create single replica stream with consumer
testData.NatsStream = "case3"
testData.NatsStream = "case2"
installStreamAndConsumer(t, 1, testData.NatsStream, testNamespace, natsAddress)
KubectlApplyWithTemplate(t, testData, "scaledObjectTemplate", nats.ScaledObjectTemplate)
assert.True(t, WaitForJobSuccess(t, kc, "stream", testNamespace, 60, 3),
Expand Down Expand Up @@ -155,6 +189,27 @@ func installClusterWithJetStream(t *testing.T, kc *k8s.Clientset, noAdvertise bo
assert.NoErrorf(t, err, "cannot execute command - %s", err)
}

// installClusterWithJetStreaV2_10 install the nats helm chart with clustered jetstream enabled using v2.10
func installClusterWithJetStreaV2_10(t *testing.T, kc *k8s.Clientset) {
CreateNamespace(t, kc, natsNamespace)
_, err := ExecuteCommand(fmt.Sprintf("helm repo add %s %s", nats.NatsJetStreamName, natsHelmRepo))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand("helm repo update")
assert.NoErrorf(t, err, "cannot execute command - %s", err)
_, err = ExecuteCommand(fmt.Sprintf(`helm upgrade --install --version %s --set %s --set %s --set %s --set %s --set %s --set %s --set %s --wait --namespace %s %s nats/nats`,
nats.Natsv2_10JetStreamChartVersion,
"config.jetstream.enabled=true",
"config.jetstream.fileStorage.enabled=false",
"config.jetstream.memoryStore.enabled=true",
"config.cluster.enabled=true",
"service.enabled=true",
"service.ports.monitor.enabled=true",
fmt.Sprintf("config.cluster.replicas=%d", natsServerReplicas),
natsNamespace,
nats.NatsJetStreamName))
assert.NoErrorf(t, err, "cannot execute command - %s", err)
}

// removeClusterWithJetStream uninstall the nats helm chart
func removeClusterWithJetStream(t *testing.T) {
_, err := ExecuteCommand(fmt.Sprintf(`helm uninstall --wait --namespace %s %s`, natsNamespace, nats.NatsJetStreamName))
Expand Down

0 comments on commit 95af467

Please sign in to comment.