[ST] Kafka Roller: enable pod pending test for kraft controller #9584

Merged: 4 commits, Feb 2, 2024
@@ -13,11 +13,15 @@
import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.strimzi.api.kafka.model.common.ProbeBuilder;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetricsBuilder;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.Annotations;
import io.strimzi.systemtest.AbstractST;
@@ -34,6 +38,7 @@
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
import io.strimzi.systemtest.templates.crd.KafkaUserTemplates;
@@ -72,6 +77,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@Tag(REGRESSION)
@Tag(INTERNAL_CLIENTS_USED)
@@ -80,33 +87,50 @@ class RollingUpdateST extends AbstractST {
private static final Logger LOGGER = LogManager.getLogger(RollingUpdateST.class);
private static final Pattern ZK_SERVER_STATE = Pattern.compile("zk_server_state\\s+(leader|follower)");

/**
* @description This test case checks recovery during a Kafka rolling update in a ZooKeeper-based Kafka cluster.
*
* @steps
* 1. - Deploy Kafka Cluster with 3 replicas
* 2. - Deploy Kafka producer and send messages targeting created KafkaTopic
* 3. - Modify Zookeeper to unreasonable CPU request causing Rolling Update
* - One of Zookeeper Pods is in Pending state
* 4. - Consume messages from KafkaTopic created previously
* 5. - Modify Zookeeper to reasonable CPU request
* - Zookeeper Pods are rolled including previously pending Pod.
* 6. - Modify Kafka to unreasonable CPU request causing Rolling Update
* - One of Kafka Pods is in Pending state
* 7. - Consume messages from KafkaTopic created previously
* 8. - Modify Kafka to reasonable CPU request
* - Pods are rolled including previously pending Pod.
* 9. - Create new KafkaTopic and transmit messages using this topic
* - Topic is created and messages transmitted, verifying that both ZooKeeper and Kafka work
*
* @usecase
* - kafka
* - zookeeper
* - rolling-update
*/
@ParallelNamespaceTest
@Tag(ROLLING_UPDATE)
@KRaftNotSupported("ZooKeeper is not supported by KRaft mode and is used in this test case")
void testRecoveryDuringZookeeperRollingUpdate(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE);
void testRecoveryDuringZookeeperBasedRollingUpdate(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);

resourceManager.createResourceWithWait(extensionContext,
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3, 3).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build(),
KafkaUserTemplates.tlsUser(testStorage).build()
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build()
);

KafkaClients clients = new KafkaClientsBuilder()
.withProducerName(testStorage.getProducerName())
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName()))
.withNamespaceName(testStorage.getNamespaceName())
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withUsername(testStorage.getUsername())
KafkaClients clients = ClientUtils.getDefaultClientBuilder(testStorage)
.build();

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.producerStrimzi());
ClientUtils.waitForProducerClientSuccess(testStorage);

LOGGER.info("Update resources for Pods");
// zookeeper recovery

LOGGER.info("Update resources for Pods");
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
.getZookeeper()
@@ -115,8 +139,8 @@ void testRecoveryDuringZookeeperRollingUpdate(ExtensionContext extensionContext)
.build());
}, testStorage.getNamespaceName());

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForProducerClientSuccess(testStorage);
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

PodUtils.waitForPendingPod(testStorage.getNamespaceName(), testStorage.getZookeeperStatefulSetName());
LOGGER.info("Verifying stability of ZooKeeper Pods except the one, which is in pending phase");
@@ -139,9 +163,36 @@ void testRecoveryDuringZookeeperRollingUpdate(ExtensionContext extensionContext)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

// Kafka recovery

KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
.getKafka()
.setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("100000m"))
.build());
}, testStorage.getNamespaceName());

PodUtils.waitForPendingPod(testStorage.getNamespaceName(), testStorage.getKafkaStatefulSetName());

clients = new KafkaClientsBuilder(clients)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
.getKafka()
.setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("200m"))
.build());
}, testStorage.getNamespaceName());

// Create a new topic to ensure that ZK is working properly
String newTopicName = KafkaTopicUtils.generateRandomNameOfTopic();

@@ -152,109 +203,72 @@ void testRecoveryDuringZookeeperRollingUpdate(ExtensionContext extensionContext)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()), clients.consumerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.producerStrimzi(), clients.consumerStrimzi());
ClientUtils.waitForClientsSuccess(testStorage);
}

/**
* @description This test case checks recovery during a Kafka rolling update in a KRaft-based Kafka cluster.
*
* @steps
* 1. - Deploy Kafka Cluster with 2 KafkaNodePools, the first with the broker role and the second with the controller role
* 2. - Deploy Kafka producer and send messages targeting created KafkaTopic
* 3. - Modify controller KafkaNodePool to unreasonable CPU request causing Rolling Update
* - One of controller KafkaNodePool Pods is in Pending state
* 4. - Modify controller KafkaNodePool to some reasonable CPU request
* - Pods are rolled including previously pending Pod.
* 5. - Modify broker KafkaNodePool to unreasonable CPU request causing Rolling Update
* - One of broker KafkaNodePool Pods is in Pending state
* 6. - Modify broker KafkaNodePool to some reasonable CPU request
* - Pods are rolled including previously pending Pod.
* 7. - Consume messages from KafkaTopic created previously
*
* @usecase
* - kafka
* - kraft
* - rolling-update
*/
@ParallelNamespaceTest
@Tag(ROLLING_UPDATE)
void testRecoveryDuringKafkaRollingUpdate(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE);
void testRecoveryDuringKRaftRollingUpdate(ExtensionContext extensionContext) {
assumeTrue(Environment.isKRaftModeEnabled());
assumeFalse(Environment.isOlmInstall() || Environment.isHelmInstall());

// Kafka cluster with one broker node pool and one controller node pool
final TestStorage testStorage = new TestStorage(extensionContext);
final String brokerPoolName = testStorage.getKafkaNodePoolName() + "-b";
final String controllerPoolName = testStorage.getKafkaNodePoolName() + "-c";

final Kafka kafka = KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 1, 1).build();
final KafkaNodePool brokerPool = KafkaNodePoolTemplates.kafkaBasedNodePoolWithBrokerRole(brokerPoolName, kafka, 3).build();
final KafkaNodePool controllerPool = KafkaNodePoolTemplates.kafkaBasedNodePoolWithControllerRole(controllerPoolName, kafka, 3).build();

final LabelSelector brokerPoolSelector = KafkaNodePoolResource.getLabelSelector(testStorage.getClusterName(), brokerPoolName, ProcessRoles.BROKER);
final LabelSelector controllerPoolSelector = KafkaNodePoolResource.getLabelSelector(testStorage.getClusterName(), controllerPoolName, ProcessRoles.CONTROLLER);

resourceManager.createResourceWithWait(extensionContext,
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3, 3).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build(),
KafkaUserTemplates.tlsUser(testStorage).build()
controllerPool, brokerPool, kafka,
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build()
);

KafkaClients clients = new KafkaClientsBuilder()
.withProducerName(testStorage.getProducerName())
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName()))
.withNamespaceName(testStorage.getNamespaceName())
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withUsername(testStorage.getUsername())
.build();
KafkaClients clients = ClientUtils.getDefaultClientBuilder(testStorage).build();

resourceManager.createResourceWithWait(extensionContext,
clients.producerTlsStrimzi(testStorage.getClusterName()),
clients.consumerTlsStrimzi(testStorage.getClusterName())
clients.producerStrimzi(), clients.consumerStrimzi()
);
ClientUtils.waitForClientsSuccess(testStorage);

LOGGER.info("Update resources for Pods");
// change the controller node pool to an unreasonable CPU request, triggering a rolling update, then recover it with a second modification
modifyNodePoolToUnscheduledAndRecover(controllerPoolName, controllerPoolSelector, testStorage);

if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getKafkaNodePoolName(), knp ->
knp.getSpec().setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("100000m"))
.build()), testStorage.getNamespaceName());
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
.getKafka()
.setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("100000m"))
.build());
}, testStorage.getNamespaceName());
}

PodUtils.waitForPendingPod(testStorage.getNamespaceName(), testStorage.getKafkaStatefulSetName());
// change the broker node pool to an unreasonable CPU request, triggering a rolling update
modifyNodePoolToUnscheduledAndRecover(brokerPoolName, brokerPoolSelector, testStorage);

clients = new KafkaClientsBuilder(clients)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

LOGGER.info("Verifying stability of Kafka Pods except the one, which is in pending phase");
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), testStorage.getKafkaStatefulSetName());

if (!Environment.isKRaftModeEnabled()) {
LOGGER.info("Verifying stability of ZooKeeper Pods");
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), testStorage.getZookeeperStatefulSetName());
}

if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getKafkaNodePoolName(), knp ->
knp.getSpec().setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("200m"))
.build()), testStorage.getNamespaceName());
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
.getKafka()
.setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("200m"))
.build());
}, testStorage.getNamespaceName());
}

// This might need to wait for the previous reconciliation and the KafkaRoller to time out.
// Therefore we use a longer timeout.
RollingUpdateUtils.waitForComponentAndPodsReady(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), 3);

clients = new KafkaClientsBuilder(clients)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForConsumerClientSuccess(testStorage);

// Create a new topic to ensure that ZK is working properly
String newTopicName = KafkaTopicUtils.generateRandomNameOfTopic();

resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(testStorage.getClusterName(), newTopicName, testStorage.getNamespaceName()).build());

clients = new KafkaClientsBuilder(clients)
.withTopicName(newTopicName)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()), clients.consumerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForClientsSuccess(testStorage);
}
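
/*
 * A minimal, self-contained sketch of the JUnit 5 Assumptions pattern used at
 * the top of testRecoveryDuringKRaftRollingUpdate above: a failed assumption
 * throws TestAbortedException, so the test is reported as skipped rather than
 * failed. The system property read below is a hypothetical stand-in for
 * Environment.isKRaftModeEnabled(); everything else is standard JUnit 5.
 */
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

class AssumptionSketchTest {
    // Hypothetical flag; the real test consults Strimzi's Environment class instead.
    private static final boolean KRAFT_ENABLED = Boolean.getBoolean("kraft.enabled");

    @Test
    void runsOnlyInKRaftMode() {
        // When the flag is false, assumeTrue throws TestAbortedException and
        // the test counts as skipped, not failed.
        assumeTrue(KRAFT_ENABLED, "KRaft mode is not enabled; skipping");
        // ... test body that requires a KRaft-based cluster ...
    }
}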

/**
Expand Down Expand Up @@ -862,6 +876,40 @@ void testMetricsChange(ExtensionContext extensionContext) throws JsonProcessingE
zkCollector.collectMetricsFromPodsWithoutWait().values().forEach(value -> assertThat(value, is("")));
}

/**
* Modifies a Kafka node pool to have an unreasonably large CPU request, triggering a rolling update,
* and then recovers it to a normal state. The CPU request is first increased, causing a single Pod
* to enter the Pending state. The method then waits for the remaining Pods to stabilize before
* reducing the CPU request back to a reasonable amount, allowing the node pool to recover.
*/
private static void modifyNodePoolToUnscheduledAndRecover(String poolName, LabelSelector poolSelector, TestStorage testStorage) {
// change the node pool to an unreasonable CPU request, triggering a rolling update
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(poolName,
knp -> {
knp
.getSpec()
.setResources(
new ResourceRequirements(null, null, Map.of("cpu", new Quantity("100000m")))
);
},
testStorage.getNamespaceName());

PodUtils.waitForPendingPod(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), poolName));
LOGGER.info("Verifying stability of {}/{} Pods except the one in Pending phase", testStorage.getNamespaceName(), poolName);
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), poolName));

// recover by lowering the CPU request back to a schedulable amount
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(poolName,
knp -> {
knp
.getSpec()
.setResources(
new ResourceRequirements(null, null, Map.of("cpu", new Quantity("100m")))
);
},
testStorage.getNamespaceName());
RollingUpdateUtils.waitForComponentAndPodsReady(testStorage.getNamespaceName(), poolSelector, 3);
}
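
/*
 * A standalone sketch of the fabric8 builder calls this test uses to force and
 * then clear the Pending state: 100000m is 100 full CPUs, far beyond the
 * allocatable CPU of a typical node, so the scheduler cannot place the newly
 * rolled Pod until the request is lowered. The quantities come from the test
 * above; the class and method names are illustrative only.
 */
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;

final class CpuRequestSketch {
    // Requests more CPU than any node can allocate -> the rolled Pod stays Pending.
    static ResourceRequirements unschedulable() {
        return new ResourceRequirementsBuilder()
                .addToRequests("cpu", new Quantity("100000m"))
                .build();
    }

    // A modest request every node can satisfy -> the rolling update completes.
    static ResourceRequirements schedulable() {
        return new ResourceRequirementsBuilder()
                .addToRequests("cpu", new Quantity("100m"))
                .build();
    }
}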

@BeforeAll
void setup(ExtensionContext extensionContext) {
this.clusterOperator = this.clusterOperator