[ST] Kafka Roller: enable pod pending test for kraft controller #9584

Merged: 4 commits, Feb 2, 2024
@@ -13,11 +13,15 @@
import io.fabric8.kubernetes.api.model.ConfigMapKeySelectorBuilder;
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirements;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.strimzi.api.kafka.model.common.ProbeBuilder;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetrics;
import io.strimzi.api.kafka.model.common.metrics.JmxPrometheusExporterMetricsBuilder;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.operator.common.Annotations;
import io.strimzi.systemtest.AbstractST;
@@ -34,6 +38,7 @@
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTemplates;
import io.strimzi.systemtest.templates.crd.KafkaTopicTemplates;
import io.strimzi.systemtest.templates.crd.KafkaUserTemplates;
@@ -72,6 +77,8 @@
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assumptions.assumeFalse;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

@Tag(REGRESSION)
@Tag(INTERNAL_CLIENTS_USED)
@@ -80,33 +87,51 @@ class RollingUpdateST extends AbstractST {
private static final Logger LOGGER = LogManager.getLogger(RollingUpdateST.class);
private static final Pattern ZK_SERVER_STATE = Pattern.compile("zk_server_state\\s+(leader|follower)");

/**
* @description This test case checks recovery during a Kafka rolling update in a ZooKeeper-based Kafka cluster. A condensed sketch of the CPU-request tweak driving the steps below follows this description.
*
* @steps
* 1. - Deploy Kafka cluster with 3 replicas
* 2. - Deploy Kafka producer and send messages targeting the created KafkaTopic
* 3. - Modify ZooKeeper with an unreasonable CPU request, causing a Rolling Update
* - One of the ZooKeeper Pods is in Pending state
* 4. - Consume messages from the KafkaTopic created previously
* 5. - Modify ZooKeeper back to a reasonable CPU request
* - ZooKeeper Pods are rolled, including the previously pending Pod
* 6. - Modify Kafka with an unreasonable CPU request to trigger a Rolling Update
* - One of the Kafka Pods is in Pending state
* 7. - Consume messages from the KafkaTopic created previously
* 8. - Modify Kafka back to a reasonable CPU request
* - Pods are rolled, including the previously pending Pod
* 9. - Create a new KafkaTopic and transmit messages using this topic
* - Topic is created and messages transmitted, verifying that both ZooKeeper and Kafka work
*
* @usecase
* - kafka
* - zookeeper
* - rolling-update
*/
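The heart of the recovery flow described above is a CPU request that no node can satisfy, followed by one that restores schedulability. A minimal sketch, reusing the `KafkaResource` helper and fabric8 builders that appear in this diff (the wrapper class and the quantities are illustrative, not part of the PR):

```java
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.strimzi.systemtest.resources.crd.KafkaResource;

final class ZkRecoverySketch {
    // Force a Pending Pod: request more CPU than any node in the cluster offers.
    static void makeUnschedulable(String clusterName, String namespaceName) {
        KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, k ->
                k.getSpec().getZookeeper().setResources(new ResourceRequirementsBuilder()
                        .addToRequests("cpu", new Quantity("100000m")) // illustrative, unschedulable
                        .build()), namespaceName);
    }

    // Recover: drop the request to a schedulable value so the roller can finish the update.
    static void recover(String clusterName, String namespaceName) {
        KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, k ->
                k.getSpec().getZookeeper().setResources(new ResourceRequirementsBuilder()
                        .addToRequests("cpu", new Quantity("200m")) // illustrative, schedulable
                        .build()), namespaceName);
    }
}
```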
@ParallelNamespaceTest
@Tag(ROLLING_UPDATE)
@KRaftNotSupported("ZooKeeper is not supported by KRaft mode and is used in this test case")
void testRecoveryDuringZookeeperRollingUpdate(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE);
void testRecoveryDuringZookeeperBasedRollingUpdate(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext);

resourceManager.createResourceWithWait(extensionContext,
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3, 3).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build(),
KafkaUserTemplates.tlsUser(testStorage).build()
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build()
);

KafkaClients clients = new KafkaClientsBuilder()
.withProducerName(testStorage.getProducerName())
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName()))
.withNamespaceName(testStorage.getNamespaceName())
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withUsername(testStorage.getUsername())
KafkaClients clients = ClientUtils.getDefaultClientBuilder(testStorage)
.build();

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.producerStrimzi());
ClientUtils.waitForProducerClientSuccess(testStorage);

LOGGER.info("Update resources for Pods");
// zookeeper recovery

LOGGER.info("Update resources for Pods");
// modify the ZooKeeper resources to an unreasonable value
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
.getZookeeper()
@@ -115,8 +140,8 @@
.build());
}, testStorage.getNamespaceName());

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForProducerClientSuccess(testStorage);
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

PodUtils.waitForPendingPod(testStorage.getNamespaceName(), testStorage.getZookeeperStatefulSetName());
LOGGER.info("Verifying stability of ZooKeeper Pods except the one, which is in pending phase");
@@ -139,57 +164,20 @@ void testRecoveryDuringZookeeperRollingUpdate(ExtensionContext extensionContext)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

// Create a new topic to ensure that ZK is working properly
String newTopicName = KafkaTopicUtils.generateRandomNameOfTopic();

resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(testStorage.getClusterName(), newTopicName, testStorage.getNamespaceName()).build());

clients = new KafkaClientsBuilder(clients)
.withTopicName(newTopicName)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()), clients.consumerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForClientsSuccess(testStorage);
}

@ParallelNamespaceTest
@Tag(ROLLING_UPDATE)
void testRecoveryDuringKafkaRollingUpdate(ExtensionContext extensionContext) {
final TestStorage testStorage = new TestStorage(extensionContext, Environment.TEST_SUITE_NAMESPACE);

resourceManager.createResourceWithWait(extensionContext,
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 3, 3).build(),
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build(),
KafkaUserTemplates.tlsUser(testStorage).build()
);

KafkaClients clients = new KafkaClientsBuilder()
.withProducerName(testStorage.getProducerName())
.withConsumerName(testStorage.getConsumerName())
.withBootstrapAddress(KafkaResources.tlsBootstrapAddress(testStorage.getClusterName()))
.withNamespaceName(testStorage.getNamespaceName())
.withTopicName(testStorage.getTopicName())
.withMessageCount(testStorage.getMessageCount())
.withUsername(testStorage.getUsername())
.build();

resourceManager.createResourceWithWait(extensionContext,
clients.producerTlsStrimzi(testStorage.getClusterName()),
clients.consumerTlsStrimzi(testStorage.getClusterName())
);
ClientUtils.waitForClientsSuccess(testStorage);

LOGGER.info("Update resources for Pods");
// Kafka recovery

// change Kafka to an unreasonable CPU request, triggering a Rolling Update, and recover via a second modification
// if Kafka node pools are enabled, change the specification directly in the KNP CR, as changing it in Kafka would have no effect when resources are already specified in the KNP
if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getKafkaNodePoolName(), knp ->
knp.getSpec().setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("100000m"))
.build()), testStorage.getNamespaceName());
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getKafkaNodePoolName(), knp -> {
knp.getSpec()
.setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("100000m"))
.build());
}, testStorage.getNamespaceName());
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
@@ -206,22 +194,18 @@ void testRecoveryDuringKafkaRollingUpdate(ExtensionContext extensionContext) {
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);

LOGGER.info("Verifying stability of Kafka Pods except the one, which is in pending phase");
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), testStorage.getKafkaStatefulSetName());

if (!Environment.isKRaftModeEnabled()) {
LOGGER.info("Verifying stability of ZooKeeper Pods");
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), testStorage.getZookeeperStatefulSetName());
}

LOGGER.info("Recover Kafka {}/{} from pending state by modifying its resource request to realistic value", testStorage.getClusterName(), testStorage.getNamespaceName());
// if kafka node pool is enabled change specification directly in KNP CR as changing it in kafka would have no impact in case it is already specified in KNP
if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getKafkaNodePoolName(), knp ->
knp.getSpec().setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("200m"))
.build()), testStorage.getNamespaceName());
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getKafkaNodePoolName(), knp -> {
knp.getSpec()
.setResources(new ResourceRequirementsBuilder()
.addToRequests("cpu", new Quantity("200m"))
.build());
}, testStorage.getNamespaceName());
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> {
k.getSpec()
@@ -232,17 +216,6 @@ void testRecoveryDuringKafkaRollingUpdate(ExtensionContext extensionContext) {
}, testStorage.getNamespaceName());
}

// This might need to wait for the previous reconciliation to timeout and for the KafkaRoller to timeout.
// Therefore we use longer timeout.
RollingUpdateUtils.waitForComponentAndPodsReady(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), 3);

clients = new KafkaClientsBuilder(clients)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.consumerTlsStrimzi(testStorage.getClusterName()));
ClientUtils.waitForConsumerClientSuccess(testStorage);

// Create a new topic to ensure that ZK is working properly
String newTopicName = KafkaTopicUtils.generateRandomNameOfTopic();

@@ -253,10 +226,74 @@
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();

resourceManager.createResourceWithWait(extensionContext, clients.producerTlsStrimzi(testStorage.getClusterName()), clients.consumerTlsStrimzi(testStorage.getClusterName()));
resourceManager.createResourceWithWait(extensionContext, clients.producerStrimzi(), clients.consumerStrimzi());
ClientUtils.waitForClientsSuccess(testStorage);
}

/**
* @description This test case checks recovery during a Kafka rolling update in a KRaft-based Kafka cluster. A sketch of the pool label selector the test relies on follows this description.
*
* @steps
* 1. - Deploy a Kafka cluster with 2 KafkaNodePools, the first with the broker role and the second with the controller role
* 2. - Deploy a Kafka producer and send messages targeting the created KafkaTopic
* 3. - Modify the controller KafkaNodePool with an unreasonable CPU request, causing a Rolling Update
* - One of the controller KafkaNodePool Pods is in Pending state
* 4. - Modify the controller KafkaNodePool back to a reasonable CPU request
* - Pods are rolled, including the previously pending Pod
* 5. - Modify the broker KafkaNodePool with an unreasonable CPU request, causing a Rolling Update
* - One of the broker KafkaNodePool Pods is in Pending state
* 6. - Modify the broker KafkaNodePool back to a reasonable CPU request
* - Pods are rolled, including the previously pending Pod
* 7. - Consume messages from the KafkaTopic created previously
*
* @usecase
* - kafka
* - kraft
* - rolling-update
*/
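The per-pool selectors fetched below via `KafkaNodePoolResource.getLabelSelector(...)` are expected to match the pool's Pods through Strimzi-managed labels. A rough standalone equivalent, for orientation only; the label keys below are an assumption for illustration, not taken from this PR:

```java
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.api.model.LabelSelectorBuilder;

final class PoolSelectorSketch {
    // Roughly what the resolved selector is assumed to match for a given pool.
    static LabelSelector forPool(String clusterName, String poolName) {
        return new LabelSelectorBuilder()
                .addToMatchLabels("strimzi.io/cluster", clusterName) // assumed label key
                .addToMatchLabels("strimzi.io/pool-name", poolName)  // assumed label key
                .build();
    }
}
```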
@ParallelNamespaceTest
@Tag(ROLLING_UPDATE)
void testRecoveryDuringKRaftRollingUpdate(ExtensionContext extensionContext) {
assumeTrue(Environment.isKRaftModeEnabled());
assumeFalse(Environment.isOlmInstall() || Environment.isHelmInstall());

// Kafka with one broker KafkaNodePool and one controller KafkaNodePool
final TestStorage testStorage = new TestStorage(extensionContext);
final String brokerPoolName = testStorage.getKafkaNodePoolName() + "-b";
final String controllerPoolName = testStorage.getKafkaNodePoolName() + "-c";

final Kafka kafka = KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 1, 1).build();
final KafkaNodePool brokerPool = KafkaNodePoolTemplates.kafkaBasedNodePoolWithBrokerRole(brokerPoolName, kafka, 3).build();
final KafkaNodePool controllerPool = KafkaNodePoolTemplates.kafkaBasedNodePoolWithControllerRole(controllerPoolName, kafka, 3).build();

final LabelSelector brokerPoolSelector = KafkaNodePoolResource.getLabelSelector(testStorage.getClusterName(), brokerPoolName, ProcessRoles.BROKER);
final LabelSelector controllerPoolSelector = KafkaNodePoolResource.getLabelSelector(testStorage.getClusterName(), controllerPoolName, ProcessRoles.CONTROLLER);

resourceManager.createResourceWithWait(extensionContext,
controllerPool, brokerPool, kafka,
KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 2, 2, testStorage.getNamespaceName()).build()
);

KafkaClients clients = ClientUtils.getDefaultClientBuilder(testStorage).build();

resourceManager.createResourceWithWait(extensionContext,
clients.producerStrimzi(), clients.consumerStrimzi()
);
ClientUtils.waitForClientsSuccess(testStorage);

// change the controller KNP to an unreasonable CPU request, triggering a Rolling Update, and recover via a second modification
modifyNodePoolToUnscheduledAndRecover(controllerPoolName, controllerPoolSelector, testStorage);

// change the broker KNP to an unreasonable CPU request, triggering a Rolling Update, and recover it the same way
modifyNodePoolToUnscheduledAndRecover(brokerPoolName, brokerPoolSelector, testStorage);

clients = new KafkaClientsBuilder(clients)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
.build();
resourceManager.createResourceWithWait(extensionContext, clients.consumerStrimzi());
ClientUtils.waitForConsumerClientSuccess(testStorage);
}

/**
* @description This test case checks scaling Kafka up and down, and that the cluster works correctly during this process.
*
Expand Down Expand Up @@ -862,6 +899,40 @@ void testMetricsChange(ExtensionContext extensionContext) throws JsonProcessingE
zkCollector.collectMetricsFromPodsWithoutWait().values().forEach(value -> assertThat(value, is("")));
}

/**
* Modifies a Kafka node pool to have an unreasonable CPU request, triggering a rolling update,
* and then recovers it to a normal state. The CPU request is first increased, causing a single
* Pod to enter the Pending state. The method then waits for the remaining Pods to stabilize
* before reducing the CPU request back to a reasonable amount, allowing the node pool to recover.
* (A conceptual sketch of the pending-Pod check follows this method.)
*/
private static void modifyNodePoolToUnscheduledAndRecover(final String poolName, final LabelSelector poolSelector, final TestStorage testStorage) {
// change the node pool to an unreasonable CPU request, triggering a Rolling Update
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(poolName,
knp -> {
knp
.getSpec()
.setResources(
new ResourceRequirements(null, null, Map.of("cpu", new Quantity("100000m")))
);
},
testStorage.getNamespaceName());

PodUtils.waitForPendingPod(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), poolName));
LOGGER.info("Verifying stability of {}/{} Pods except the one that is in Pending phase", poolName, testStorage.getNamespaceName());
PodUtils.verifyThatRunningPodsAreStable(testStorage.getNamespaceName(), KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), poolName));

// recover by reducing the CPU request back to a schedulable value
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(poolName,
knp -> {
knp
.getSpec()
.setResources(
new ResourceRequirements(null, null, Map.of("cpu", new Quantity("100m")))
);
},
testStorage.getNamespaceName());
RollingUpdateUtils.waitForComponentAndPodsReady(testStorage.getNamespaceName(), poolSelector, 3);
}
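`PodUtils.waitForPendingPod` and `PodUtils.verifyThatRunningPodsAreStable` are test-suite helpers whose bodies are not part of this diff. Conceptually, the pending-Pod condition the helper above waits for amounts to something like this sketch (the name-prefix matching and the client wiring are assumptions, not the real PodUtils implementation):

```java
import io.fabric8.kubernetes.client.KubernetesClient;

final class PendingPodSketch {
    // True once some Pod belonging to the given StrimziPodSet reports phase "Pending".
    static boolean hasPendingPod(KubernetesClient client, String namespace, String podSetName) {
        return client.pods().inNamespace(namespace)
                .list().getItems().stream()
                .filter(p -> p.getMetadata().getName().startsWith(podSetName)) // name-prefix match is an assumption
                .anyMatch(p -> "Pending".equals(p.getStatus().getPhase()));
    }
}
```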

@BeforeAll
void setup(ExtensionContext extensionContext) {
this.clusterOperator = this.clusterOperator