diff --git a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java index ffea521678..b124833944 100644 --- a/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java +++ b/systemtest/src/main/java/io/strimzi/systemtest/utils/kafkaUtils/KafkaUtils.java @@ -101,7 +101,7 @@ public static void waitUntilKafkaStatusConditionContainsMessage(String clusterNa List conditions = KafkaResource.kafkaClient().inNamespace(namespace).withName(clusterName).get().getStatus().getConditions(); for (Condition condition : conditions) { String conditionMessage = condition.getMessage(); - if (conditionMessage.matches(pattern)) { + if (conditionMessage != null && conditionMessage.matches(pattern)) { return true; } } diff --git a/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/KafkaRollerST.java b/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/KafkaRollerST.java index 3704d63877..47875b12fe 100644 --- a/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/KafkaRollerST.java +++ b/systemtest/src/test/java/io/strimzi/systemtest/rollingupdate/KafkaRollerST.java @@ -15,6 +15,7 @@ import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder; import io.fabric8.kubernetes.api.model.Pod; import io.strimzi.api.kafka.model.KafkaResources; +import io.strimzi.api.kafka.model.KafkaTopic; import io.strimzi.api.kafka.model.StrimziPodSet; import io.strimzi.api.kafka.model.template.KafkaClusterTemplate; import io.strimzi.api.kafka.model.template.KafkaClusterTemplateBuilder; @@ -56,6 +57,7 @@ import static io.strimzi.systemtest.k8s.Events.Scheduled; import static io.strimzi.systemtest.k8s.Events.Started; import static io.strimzi.systemtest.matchers.Matchers.hasAllOfReasons; +import static io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils.waitForPodsReady; import static io.strimzi.test.k8s.KubeClusterResource.kubeClient; import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.CoreMatchers.not; @@ -71,80 +73,56 @@ public class KafkaRollerST extends AbstractST { @ParallelNamespaceTest @KRaftWithoutUTONotSupported - void testKafkaRollsWhenTopicIsUnderReplicated(ExtensionContext extensionContext) { + void testKafkaDoesNotRollsWhenTopicIsUnderReplicated(ExtensionContext extensionContext) { final TestStorage testStorage = storageMap.get(extensionContext); - final String namespaceName = StUtils.getNamespaceBasedOnRbac(Environment.TEST_SUITE_NAMESPACE, extensionContext); - final String clusterName = testStorage.getClusterName(); - final String topicName = testStorage.getTopicName(); - final String kafkaStsName = KafkaResources.kafkaStatefulSetName(clusterName); - final LabelSelector kafkaSelector = KafkaResource.getLabelSelector(clusterName, kafkaStsName); - Instant startTime = Instant.now(); - // We need to start with 3 replicas / brokers, - // so that KafkaStreamsTopicStore topic gets set/distributed on this first 3 [0, 1, 2], - // since this topic has replication-factor 3 and minISR 2. - - // We have disabled the broker scale down check for now since the test fails at the moment - // due to partition replicas being present on the broker during scale down. We can enable this check - // once the issue is resolved - // https://github.com/strimzi/strimzi-kafka-operator/issues/9134 - resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(clusterName, 3) - .editMetadata() - .addToAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true")) - .endMetadata() - .editSpec() - .editKafka() - .addToConfig("auto.create.topics.enable", "false") - .endKafka() - .endSpec() - .build()); + final int initialBrokerReplicaCount = 3; + final int scaledUpBrokerReplicaCount = 4; - LOGGER.info("Running kafkaScaleUpScaleDown {}", clusterName); - final int initialReplicas = kubeClient(namespaceName).listPods(kafkaSelector).size(); - assertEquals(3, initialReplicas); + resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), initialBrokerReplicaCount).build()); - // Now that KafkaStreamsTopicStore topic is set on the first 3 brokers, lets spin-up another one. - int scaledUpReplicas = 4; + LOGGER.info("Verify expected number of replicas '{}' is present in in Kafka Cluster: {}/{}", initialBrokerReplicaCount, testStorage.getNamespaceName(), testStorage.getClusterName()); + final int observedReplicas = kubeClient(testStorage.getNamespaceName()).listPods(testStorage.getKafkaSelector()).size(); + assertEquals(initialBrokerReplicaCount, observedReplicas); + LOGGER.info("Scale Kafka up from 3 to 4 brokers"); if (Environment.isKafkaNodePoolsEnabled()) { - KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(clusterName), knp -> knp.getSpec().setReplicas(scaledUpReplicas), namespaceName); + KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(testStorage.getClusterName()), knp -> knp.getSpec().setReplicas(scaledUpBrokerReplicaCount), testStorage.getNamespaceName()); } else { - KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, k -> k.getSpec().getKafka().setReplicas(scaledUpReplicas), namespaceName); + KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> k.getSpec().getKafka().setReplicas(scaledUpBrokerReplicaCount), testStorage.getNamespaceName()); } + RollingUpdateUtils.waitForComponentScaleUpOrDown(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), scaledUpBrokerReplicaCount); - RollingUpdateUtils.waitForComponentScaleUpOrDown(namespaceName, kafkaSelector, scaledUpReplicas); - - resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(clusterName, topicName, 4, 4, 4, namespaceName).build()); + LOGGER.info("Create kafkaTopic: {}/{} with replica on each broker", testStorage.getNamespaceName(), testStorage.getTopicName()); + KafkaTopic kafkaTopic = KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 4, 4, 4, testStorage.getNamespaceName()).build(); + resourceManager.createResourceWithWait(extensionContext, kafkaTopic); //Test that the new pod does not have errors or failures in events - String uid = kubeClient(namespaceName).getPodUid(KafkaResource.getKafkaPodName(clusterName, 3)); - List events = kubeClient(namespaceName).listEventsByResourceUid(uid); + String uid = kubeClient(testStorage.getNamespaceName()).getPodUid(KafkaResource.getKafkaPodName(testStorage.getClusterName(), 3)); + List events = kubeClient(testStorage.getNamespaceName()).listEventsByResourceUid(uid); assertThat(events, hasAllOfReasons(Scheduled, Pulled, Created, Started)); - // TODO scaling down of Kafka Cluster (with 4 replicas which has KafkaTopic with 4 replicas) can be forbidden in a future due to would-be Topic under-replication (https://github.com/strimzi/strimzi-kafka-operator/pull/9042) - // scale down - final int scaledDownReplicas = 3; - LOGGER.info("Scaling down to {}", scaledDownReplicas); - + LOGGER.info("Scaling down to {}", initialBrokerReplicaCount); if (Environment.isKafkaNodePoolsEnabled()) { - KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(clusterName), knp -> knp.getSpec().setReplicas(scaledDownReplicas), namespaceName); + KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(testStorage.getClusterName()), knp -> knp.getSpec().setReplicas(initialBrokerReplicaCount), testStorage.getNamespaceName()); } else { - KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, k -> k.getSpec().getKafka().setReplicas(scaledDownReplicas), namespaceName); + KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> k.getSpec().getKafka().setReplicas(initialBrokerReplicaCount), testStorage.getNamespaceName()); } - Map kafkaPods = RollingUpdateUtils.waitForComponentScaleUpOrDown(namespaceName, kafkaSelector, scaledDownReplicas); + kubeClient().listPodsByPrefixInName(Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()).size(); + LOGGER.info("Waiting for warning regarding preventing Kafka from scaling down when the broker to be scaled down have some partitions"); + KafkaUtils.waitUntilKafkaStatusConditionContainsMessage(testStorage.getClusterName(), testStorage.getNamespaceName(), "Cannot scale down broker.*"); + waitForPodsReady(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), scaledUpBrokerReplicaCount, false); - PodUtils.verifyThatRunningPodsAreStable(namespaceName, clusterName); - - // set annotation to trigger Kafka rolling update - StrimziPodSetUtils.annotateStrimziPodSet(namespaceName, KafkaResource.getStrimziPodSetName(clusterName), Collections.singletonMap(Annotations.ANNO_STRIMZI_IO_MANUAL_ROLLING_UPDATE, "true")); + LOGGER.info("Remove Topic, thereby remove all partitions located on broker to be scaled down"); + resourceManager.deleteResource(kafkaTopic); + RollingUpdateUtils.waitForComponentScaleUpOrDown(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), initialBrokerReplicaCount); - RollingUpdateUtils.waitTillComponentHasRolled(namespaceName, kafkaSelector, scaledDownReplicas, kafkaPods); //Test that CO doesn't have any exceptions in log Instant endTime = Instant.now(); long duration = Duration.between(startTime, endTime).toSeconds(); - assertNoCoErrorsLogged(namespaceName, duration); + assertNoCoErrorsLogged(testStorage.getNamespaceName(), duration); } @ParallelNamespaceTest