[ST] Node Pools role changes. Role change and scale down prevention if replicas present. (#9693)

Signed-off-by: hzrncik <hzrncik@redhat.com>
henryZrncik committed Feb 23, 2024
1 parent 6975ee3 commit c58e67c
Showing 5 changed files with 112 additions and 28 deletions.
@@ -118,6 +118,7 @@ public static Pod mapToPod(Map<String, Object> map) {
}

public static void annotateStrimziPodSet(String namespaceName, String resourceName, Map<String, String> annotations) {
LOGGER.info("Annotating StrimziPodSet {}/{} with annotations: {}", namespaceName, resourceName, annotations);
StrimziPodSetResource.replaceStrimziPodSetInSpecificNamespace(resourceName,
strimziPodSet -> strimziPodSet.getMetadata().setAnnotations(annotations), namespaceName);
}
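For context, this helper is the trigger used elsewhere in this commit for manual rolling updates; a minimal usage sketch follows (the StrimziPodSet name variable is illustrative; the annotation constant resolves to strimzi.io/manual-rolling-update):

    // Sketch based on the call sites added later in this commit: annotating a StrimziPodSet
    // with strimzi.io/manual-rolling-update=true makes the Cluster Operator roll its pods
    // on the next reconciliation.
    StrimziPodSetUtils.annotateStrimziPodSet(
            testStorage.getNamespaceName(),
            strimziPodSetName,   // name of the target StrimziPodSet (illustrative)
            Collections.singletonMap(Annotations.ANNO_STRIMZI_IO_MANUAL_ROLLING_UPDATE, "true"));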
@@ -447,36 +447,30 @@ void testCruiseControlIntraBrokerBalancingWithoutSpecifyingJBODStorage() {
}

@IsolatedTest
@KRaftNotSupported("Scale-up / scale-down not working in KRaft mode - https://github.com/strimzi/strimzi-kafka-operator/issues/6862")
void testCruiseControlDuringBrokerScaleUpAndDown() {
TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());
TestStorage testStorage = new TestStorage(ResourceManager.getTestContext(), TestConstants.CO_NAMESPACE);
final int initialReplicas = 3;
final int scaleTo = 5;

resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), initialReplicas).build(),
KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), initialReplicas).build()
)
);
resourceManager.createResourceWithWait(
KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), initialReplicas, initialReplicas)
// if we run test in kraft mode test makes sense only with separated roles
KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), initialReplicas).build(),
KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), initialReplicas)
.editOrNewMetadata()
.withNamespace(Environment.TEST_SUITE_NAMESPACE)
// controllers have Ids set in order to keep default ordering for brokers only (once we scale broker KNP)
.withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NEXT_NODE_IDS, "[100-103]"))
.endMetadata()
.build(),
.build()
);
resourceManager.createResourceWithWait(
KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), initialReplicas, initialReplicas).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 10, 3, testStorage.getNamespaceName()).build(),
ScraperTemplates.scraperPod(testStorage.getNamespaceName(), testStorage.getScraperName()).build()
);

String scraperPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), testStorage.getScraperName()).get(0).getMetadata().getName();

LOGGER.info("Checking that Topic: {} has replicas on first 3 brokers", testStorage.getTopicName());
List<String> topicReplicas = KafkaTopicUtils.getKafkaTopicReplicasForEachPartition(testStorage.getNamespaceName(), testStorage.getTopicName(), scraperPodName, KafkaResources.plainBootstrapAddress(testStorage.getClusterName()));
assertEquals(0, (int) topicReplicas.stream().filter(line -> line.contains("3") || line.contains("4")).count());
final String scraperPodName = kubeClient().listPodsByPrefixInName(testStorage.getNamespaceName(), testStorage.getScraperName()).get(0).getMetadata().getName();

LOGGER.info("Scaling Kafka up to {}", scaleTo);

if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getBrokerPoolName(), knp ->
knp.getSpec().setReplicas(scaleTo), testStorage.getNamespaceName());
@@ -491,9 +485,6 @@ void testCruiseControlDuringBrokerScaleUpAndDown() {
// when using add_brokers mode, we can hit `ProposalReady` right after KR creation - that's why `waitReady` is set to `false` here
resourceManager.createResourceWithoutWait(
KafkaRebalanceTemplates.kafkaRebalance(testStorage.getClusterName())
.editOrNewMetadata()
.withNamespace(Environment.TEST_SUITE_NAMESPACE)
.endMetadata()
.editOrNewSpec()
.withMode(KafkaRebalanceMode.ADD_BROKERS)
.withBrokers(3, 4)
@@ -506,17 +497,14 @@
KafkaRebalanceResource.kafkaRebalanceClient().inNamespace(testStorage.getNamespaceName()).withName(testStorage.getClusterName()).delete();

LOGGER.info("Checking that Topic: {} has replicas on one of the new brokers (or both)", testStorage.getTopicName());
topicReplicas = KafkaTopicUtils.getKafkaTopicReplicasForEachPartition(testStorage.getNamespaceName(), testStorage.getTopicName(), scraperPodName, KafkaResources.plainBootstrapAddress(testStorage.getClusterName()));
List<String> topicReplicas = KafkaTopicUtils.getKafkaTopicReplicasForEachPartition(testStorage.getNamespaceName(), testStorage.getTopicName(), scraperPodName, KafkaResources.plainBootstrapAddress(testStorage.getClusterName()));
assertTrue(topicReplicas.stream().anyMatch(line -> line.contains("3") || line.contains("4")));

LOGGER.info("Creating KafkaRebalance with remove_brokers mode - it needs to be done before actual scaling down of Kafka Pods");

// when using remove_brokers mode, we can hit `ProposalReady` right after KR creation - that's why `waitReady` is set to `false` here
resourceManager.createResourceWithoutWait(
KafkaRebalanceTemplates.kafkaRebalance(testStorage.getClusterName())
.editOrNewMetadata()
.withNamespace(Environment.TEST_SUITE_NAMESPACE)
.endMetadata()
.editOrNewSpec()
.withMode(KafkaRebalanceMode.REMOVE_BROKERS)
.withBrokers(3, 4)
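The hunk above is cut off by the diff view. Reassembled from the visible builder calls, the REMOVE_BROKERS request amounts to roughly the following sketch (the closing endSpec()/build() is assumed, since it is not shown in the hunk):

    // Sketch: request a Cruise Control rebalance that moves replicas off brokers 3 and 4
    // before they are scaled down; created without waiting, because the proposal can be
    // Ready immediately after creation.
    resourceManager.createResourceWithoutWait(
            KafkaRebalanceTemplates.kafkaRebalance(testStorage.getClusterName())
                .editOrNewSpec()
                    .withMode(KafkaRebalanceMode.REMOVE_BROKERS)
                    .withBrokers(3, 4)
                .endSpec()
                .build()
    );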
@@ -4,6 +4,8 @@
*/
package io.strimzi.systemtest.kafka;


import io.fabric8.kubernetes.api.model.LabelSelector;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.Annotations;
@@ -13,14 +15,18 @@
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaClients;
import io.strimzi.systemtest.resources.NodePoolsConverter;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
import io.strimzi.systemtest.utils.ClientUtils;
import io.strimzi.systemtest.utils.RollingUpdateUtils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaNodePoolUtils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils;
import io.strimzi.systemtest.utils.kubeUtils.controllers.StrimziPodSetUtils;
import io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
@@ -29,6 +35,7 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Random;

@@ -38,6 +45,7 @@
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;


@Tag(REGRESSION)
public class KafkaNodePoolST extends AbstractST {
private static final Logger LOGGER = LogManager.getLogger(KafkaNodePoolST.class);
@@ -151,6 +159,91 @@ void testKafkaNodePoolBrokerIdsManagementUsingAnnotations() {
KafkaNodePoolUtils.getCurrentKafkaNodePoolIds(testStorage.getNamespaceName(), nodePoolNameB).equals(Arrays.asList(0, 2, 3, 5)));
}

/**
* @description This test case verifies changing the roles of a Kafka Node Pool.
*
* @steps
* 1. - Deploy a Kafka cluster with Node Pools enabled: stable broker and controller pools, plus one mixed-role pool that will be modified.
* 2. - Create a KafkaTopic whose replication factor requires all Kafka brokers to be present.
* 3. - Annotate the mixed-role Node Pool to perform a manual Rolling Update.
*    - Rolling Update starts.
* 4. - Change the role of the Node Pool from mixed to controller only.
*    - The role change is prevented because the previously created KafkaTopic still has replicas on the nodes that would lose the broker role, and the original Rolling Update is still in progress.
* 5. - The original Rolling Update finishes successfully.
* 6. - Delete the previously created KafkaTopic.
*    - The KafkaTopic is deleted, the pending role change proceeds, and the Node Pool rolls into nodes with the expected role.
* 7. - Change the role of the Node Pool from controller only back to mixed.
*    - The Node Pool rolls again and its nodes take the mixed role.
* 8. - Produce and consume messages on a newly created KafkaTopic whose replica count requires the restored broker nodes to be present.
*
* @usecase
* - kafka-node-pool
*
* (A condensed sketch of this role-change flow follows the test method below.)
*/
@ParallelNamespaceTest
void testNodePoolsRolesChanging() {
assumeTrue(Environment.isKRaftModeEnabled());
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

// volatile KNP which will be transitioned from mixed to controller-only role and afterwards back to mixed role
final String volatileRolePoolName = testStorage.getMixedPoolName() + "-volatile";
final String volatileSPSComponentName = KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), volatileRolePoolName);
final LabelSelector volatilePoolLabelSelector = KafkaNodePoolResource.getLabelSelector(testStorage.getClusterName(), volatileRolePoolName, ProcessRoles.CONTROLLER);


// Stable Node Pools, ensuring at least 3 brokers and 3 controllers are present at all times.
resourceManager.createResourceWithWait(
NodePoolsConverter.convertNodePoolsIfNeeded(
KafkaNodePoolTemplates.brokerPool(testStorage.getNamespaceName(), testStorage.getBrokerPoolName(), testStorage.getClusterName(), 3).build(),
KafkaNodePoolTemplates.controllerPool(testStorage.getNamespaceName(), testStorage.getControllerPoolName(), testStorage.getClusterName(), 3).build()
)
);

resourceManager.createResourceWithWait(
KafkaNodePoolTemplates.mixedPoolPersistentStorage(testStorage.getNamespaceName(), volatileRolePoolName, testStorage.getClusterName(), 3).build(),
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 1, 1).build()
);

LOGGER.info("Create KafkaTopic {}/{} with 6 replicas, spawning across all brokers", testStorage.getNamespaceName(), testStorage.getTopicName());
final KafkaTopic kafkaTopic = KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 1, 6, testStorage.getNamespaceName()).build();
resourceManager.createResourceWithWait(kafkaTopic);

LOGGER.info("wait for Kafka pods stability");
PodUtils.waitUntilPodStabilityReplicasCount(testStorage.getNamespaceName(), volatileSPSComponentName, 3);

LOGGER.info("Start rolling update");
Map<String, String> volatilePoolPodsSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), volatilePoolLabelSelector);
StrimziPodSetUtils.annotateStrimziPodSet(testStorage.getNamespaceName(), volatileSPSComponentName, Collections.singletonMap(Annotations.ANNO_STRIMZI_IO_MANUAL_ROLLING_UPDATE, "true"));
RollingUpdateUtils.waitTillComponentHasStartedRolling(testStorage.getNamespaceName(), volatilePoolLabelSelector, volatilePoolPodsSnapshot);

LOGGER.info("Change role in {}/{}, from mixed to broker only resulting in revert", testStorage.getNamespaceName(), volatileRolePoolName);
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(volatileRolePoolName, knp -> {
knp.getSpec().setRoles(List.of(ProcessRoles.CONTROLLER));
}, testStorage.getNamespaceName());

LOGGER.info("Wait for warning message in Kafka {}/{}", testStorage.getNamespaceName(), testStorage.getClusterName());
KafkaUtils.waitUntilKafkaStatusConditionContainsMessage(testStorage.getClusterName(), testStorage.getNamespaceName(), ".*Reverting role change.*");

LOGGER.info("Wait for (original) Rolling Update to finish successfully");
volatilePoolPodsSnapshot = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), volatilePoolLabelSelector, 3, volatilePoolPodsSnapshot);

// remove the topic that blocks the role change (dropping the broker role decreases the number of available broker nodes)
LOGGER.info("Delete Kafka Topic {}/{}", testStorage.getNamespaceName(), testStorage.getTopicName());
resourceManager.deleteResource(kafkaTopic);
KafkaTopicUtils.waitForKafkaTopicDeletion(testStorage.getNamespaceName(), testStorage.getTopicName());

// wait for the roll that finally applies the pending role change
LOGGER.info("Waiting for the roll that changes the role of KNP {}/{} from mixed to controller-only", testStorage.getNamespaceName(), volatileRolePoolName);
volatilePoolPodsSnapshot = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), volatilePoolLabelSelector, 3, volatilePoolPodsSnapshot);

LOGGER.info("Change role in {}/{}, from broker only to mixed", testStorage.getNamespaceName(), volatileRolePoolName);
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(volatileRolePoolName, knp -> {
knp.getSpec().setRoles(List.of(ProcessRoles.CONTROLLER, ProcessRoles.BROKER));
}, testStorage.getNamespaceName());
RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), volatilePoolLabelSelector, 3, volatilePoolPodsSnapshot);

transmitMessagesWithNewTopicAndClean(testStorage, 5);
}
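Condensed, the role-change flow exercised above boils down to the following sketch (it reuses the utilities from this test; the plain variable names are illustrative):

    // 1. Switch the mixed pool to controller-only roles.
    KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(poolName,
            knp -> knp.getSpec().setRoles(List.of(ProcessRoles.CONTROLLER)), namespaceName);

    // 2. While a topic still has replicas on the pool's broker nodes, the operator refuses the
    //    change and reports it in the Kafka resource status.
    KafkaUtils.waitUntilKafkaStatusConditionContainsMessage(clusterName, namespaceName,
            ".*Reverting role change.*");

    // 3. Once the blocking topic is gone, the pending role change proceeds and the pool rolls.
    KafkaTopicUtils.waitForKafkaTopicDeletion(namespaceName, topicName);
    RollingUpdateUtils.waitTillComponentHasRolled(namespaceName, poolSelector, 3, podsSnapshot);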

/**
* @description This test case verifies possibility of adding and removing Kafka Node Pools into existing Kafka cluster.
*
@@ -15,7 +15,6 @@
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.AbstractST;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.annotations.KRaftNotSupported;
import io.strimzi.systemtest.annotations.KRaftWithoutUTONotSupported;
import io.strimzi.systemtest.annotations.ParallelNamespaceTest;
import io.strimzi.systemtest.enums.CustomResourceStatus;
@@ -67,7 +66,6 @@ public class ReconciliationST extends AbstractST {
@ParallelNamespaceTest
@Tag(CONNECT)
@Tag(CONNECT_COMPONENTS)
@KRaftNotSupported("Probably bug - https://github.com/strimzi/strimzi-kafka-operator/issues/6862")
void testPauseReconciliationInKafkaAndKafkaConnectWithConnector() {
final TestStorage testStorage = new TestStorage(ResourceManager.getTestContext());

@@ -68,7 +68,6 @@
import static io.strimzi.systemtest.k8s.Events.Scheduled;
import static io.strimzi.systemtest.k8s.Events.Started;
import static io.strimzi.systemtest.matchers.Matchers.hasAllOfReasons;
import static io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils.waitForPodsReady;
import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
@@ -170,7 +169,12 @@ void testKafkaDoesNotRollsWhenTopicIsUnderReplicated() {

LOGGER.info("Scale-down should have been reverted and the cluster should be still Ready");
KafkaUtils.waitForKafkaReady(testStorage.getNamespaceName(), testStorage.getClusterName());
waitForPodsReady(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), scaledUpBrokerReplicaCount, false);
KafkaUtils.waitUntilKafkaStatusConditionContainsMessage(testStorage.getClusterName(), testStorage.getNamespaceName(), ".*Reverting scale-down.*");

// Try to perform a rolling update while the scale-down is being prevented.
Map<String, String> kafkaPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getBrokerSelector());
StrimziPodSetUtils.annotateStrimziPodSet(testStorage.getNamespaceName(), testStorage.getBrokerComponentName(), Collections.singletonMap(Annotations.ANNO_STRIMZI_IO_MANUAL_ROLLING_UPDATE, "true"));
kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), scaledUpBrokerReplicaCount, kafkaPods);

LOGGER.info("Remove Topic, thereby remove all partitions located on broker to be scaled down");
resourceManager.deleteResource(kafkaTopicWith4Replicas);
