Skip to content

Commit

Permalink
another fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Kral <lukywill16@gmail.com>
  • Loading branch information
im-konge committed Feb 14, 2024
1 parent 88d5d41 commit 503156f
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
import io.fabric8.kubernetes.api.model.LabelSelector;
import io.strimzi.api.kafka.model.connect.KafkaConnect;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.mirrormaker2.KafkaMirrorMaker2;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.TestConstants;
Expand All @@ -16,6 +15,7 @@
import io.strimzi.systemtest.resources.crd.KafkaConnectResource;
import io.strimzi.systemtest.resources.crd.KafkaMirrorMaker2Resource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.StrimziPodSetResource;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaConnectUtils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaMirrorMaker2Utils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaUtils;
Expand Down Expand Up @@ -226,7 +226,7 @@ public static Map<String, String> waitForComponentScaleUpOrDown(String namespace
public static void waitForNoKafkaAndZKRollingUpdate(String namespaceName, String clusterName, Map<String, String> brokerPods) {
int[] i = {0};

LabelSelector brokerSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaComponentName(clusterName));
LabelSelector brokerSelector = KafkaResource.getLabelSelector(clusterName, StrimziPodSetResource.getBrokerComponentName(clusterName));

TestUtils.waitFor("Kafka Pods to remain stable and rolling update not to be triggered", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.GLOBAL_TIMEOUT,
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,25 @@ public static void waitForStrimziPodSetLabelsChange(String namespaceName, String
* Wait until the SPS is ready and all of its Pods are also ready with custom timeout.
*
* @param namespaceName Namespace name
* @param spsName The name of the StrimziPodSet
* @param clusterName name of the Kafka cluster
* @param componentName The name of the StrimziPodSet
* @param expectPods The number of pods expected.
*/
public static void waitForAllStrimziPodSetAndPodsReady(String namespaceName, String spsName, String componentName, int expectPods, long timeout) {
String resourceName = componentName.contains("-kafka") ? componentName.replace("-kafka", "") : componentName.replace("-zookeeper", "");
LabelSelector labelSelector = KafkaResource.getLabelSelector(resourceName, componentName);
public static void waitForAllStrimziPodSetAndPodsReady(String namespaceName, String clusterName, String componentName, int expectPods, long timeout) {
LabelSelector labelSelector = KafkaResource.getLabelSelector(clusterName, componentName);

LOGGER.info("Waiting for StrimziPodSet: {}/{} to be ready", namespaceName, spsName);
TestUtils.waitFor("readiness of StrimziPodSet: " + namespaceName + "/" + spsName, TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS, timeout,
LOGGER.info("Waiting for StrimziPodSet: {}/{} to be ready", namespaceName, componentName);
TestUtils.waitFor("readiness of StrimziPodSet: " + namespaceName + "/" + componentName, TestConstants.POLL_INTERVAL_FOR_RESOURCE_READINESS, timeout,
() -> {
StrimziPodSetStatus podSetStatus = StrimziPodSetResource.strimziPodSetClient().inNamespace(namespaceName).withName(spsName).get().getStatus();
StrimziPodSetStatus podSetStatus = StrimziPodSetResource.strimziPodSetClient().inNamespace(namespaceName).withName(componentName).get().getStatus();
return podSetStatus.getPods() == podSetStatus.getReadyPods();
},
() -> ResourceManager.logCurrentResourceStatus(KafkaResource.kafkaClient().inNamespace(namespaceName).withName(resourceName).get()));
() -> ResourceManager.logCurrentResourceStatus(KafkaResource.kafkaClient().inNamespace(namespaceName).withName(clusterName).get()));

LOGGER.info("Waiting for {} Pod(s) of StrimziPodSet {}/{} to be ready", expectPods, namespaceName, spsName);
LOGGER.info("Waiting for {} Pod(s) of StrimziPodSet {}/{} to be ready", expectPods, namespaceName, componentName);
PodUtils.waitForPodsReady(namespaceName, labelSelector, expectPods, true,
() -> ResourceManager.logCurrentResourceStatus(KafkaResource.kafkaClient().inNamespace(namespaceName).withName(resourceName).get()));
LOGGER.info("StrimziPodSet: {}/{} is ready", namespaceName, spsName);
() -> ResourceManager.logCurrentResourceStatus(KafkaResource.kafkaClient().inNamespace(namespaceName).withName(clusterName).get()));
LOGGER.info("StrimziPodSet: {}/{} is ready", namespaceName, componentName);
}

public static void waitForAllStrimziPodSetAndPodsReady(String namespaceName, String spsName, String componentName, int expectPods) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.strimzi.systemtest.resources.crd.KafkaMirrorMakerResource;
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.StrimziPodSetResource;
import io.strimzi.systemtest.storage.TestStorage;
import io.strimzi.systemtest.templates.crd.KafkaBridgeTemplates;
import io.strimzi.systemtest.templates.crd.KafkaConnectTemplates;
Expand Down Expand Up @@ -195,11 +196,11 @@ void testKafkaLogSetting(ExtensionContext extensionContext) {
String userOperatorMap = String.format("%s-%s", LOG_SETTING_CLUSTER_NAME, "entity-user-operator-config");

String eoDepName = KafkaResources.entityOperatorDeploymentName(LOG_SETTING_CLUSTER_NAME);
String kafkaSsName = KafkaResources.kafkaComponentName(LOG_SETTING_CLUSTER_NAME);
String zkSsName = KafkaResources.zookeeperComponentName(LOG_SETTING_CLUSTER_NAME);
String brokerComponentName = StrimziPodSetResource.getBrokerComponentName(LOG_SETTING_CLUSTER_NAME);
String controllerComponentName = StrimziPodSetResource.getControllerComponentName(LOG_SETTING_CLUSTER_NAME);

LabelSelector brokerSelector = KafkaResource.getLabelSelector(LOG_SETTING_CLUSTER_NAME, kafkaSsName);
LabelSelector controllerSelector = KafkaResource.getLabelSelector(LOG_SETTING_CLUSTER_NAME, zkSsName);
LabelSelector brokerSelector = KafkaResource.getLabelSelector(LOG_SETTING_CLUSTER_NAME, brokerComponentName);
LabelSelector controllerSelector = KafkaResource.getLabelSelector(LOG_SETTING_CLUSTER_NAME, controllerComponentName);

Map<String, String> eoPods = DeploymentUtils.depSnapshot(Environment.TEST_SUITE_NAMESPACE, eoDepName);
Map<String, String> brokerPods = PodUtils.podSnapshot(Environment.TEST_SUITE_NAMESPACE, brokerSelector);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,8 +405,7 @@ void testKafkaExporterDifferentSetting() throws InterruptedException, ExecutionE
@ParallelTest
void testClusterOperatorMetrics() {
// Expected PodSet counts per component
int zooPodSetCount = Environment.isKRaftModeEnabled() ? 0 : 1;
int kafkaPodSetCount = 1;
int podSetCount = 2;

assertCoMetricResourceNotNull(clusterOperatorCollector, "strimzi_reconciliations_periodical_total", Kafka.RESOURCE_KIND);
assertCoMetricResourceNotNull(clusterOperatorCollector, "strimzi_reconciliations_duration_seconds_count", Kafka.RESOURCE_KIND);
Expand All @@ -429,7 +428,7 @@ void testClusterOperatorMetrics() {

// check StrimziPodSet metrics in CO
assertMetricCountHigherThan(clusterOperatorCollector, getResourceMetricPattern(StrimziPodSet.RESOURCE_KIND, namespaceFirst), 0);
assertCoMetricResources(clusterOperatorCollector, StrimziPodSet.RESOURCE_KIND, namespaceSecond, zooPodSetCount + kafkaPodSetCount);
assertCoMetricResources(clusterOperatorCollector, StrimziPodSet.RESOURCE_KIND, namespaceSecond, podSetCount);

assertCoMetricResourceNotNull(clusterOperatorCollector, "strimzi_reconciliations_duration_seconds_bucket", StrimziPodSet.RESOURCE_KIND);
assertCoMetricResourceNotNull(clusterOperatorCollector, "strimzi_reconciliations_duration_seconds_count", StrimziPodSet.RESOURCE_KIND);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,8 @@ void testKafkaManagementTransferToAndFromKafkaNodePool(ExtensionContext extensio

StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(
testStorage.getNamespaceName(),
testStorage.getClusterName(),
KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), kafkaNodePoolName),
KafkaResources.kafkaComponentName(testStorage.getClusterName()),
nodePoolIncreasedKafkaReplicaCount
);

Expand Down Expand Up @@ -254,8 +254,8 @@ void testKafkaManagementTransferToAndFromKafkaNodePool(ExtensionContext extensio

StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(
testStorage.getNamespaceName(),
KafkaResources.kafkaComponentName(testStorage.getClusterName()),
KafkaResources.kafkaComponentName(testStorage.getClusterName()),
testStorage.getClusterName(),
KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), kafkaNodePoolName),
originalKafkaReplicaCount
);
PodUtils.waitUntilPodStabilityReplicasCount(testStorage.getNamespaceName(), KafkaResources.kafkaComponentName(testStorage.getClusterName()), originalKafkaReplicaCount);
Expand Down Expand Up @@ -286,8 +286,8 @@ void testKafkaManagementTransferToAndFromKafkaNodePool(ExtensionContext extensio

StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(
testStorage.getNamespaceName(),
testStorage.getClusterName(),
KafkaResource.getStrimziPodSetName(testStorage.getClusterName(), kafkaNodePoolName),
KafkaResources.kafkaComponentName(testStorage.getClusterName()),
nodePoolIncreasedKafkaReplicaCount
);
PodUtils.waitUntilPodStabilityReplicasCount(testStorage.getNamespaceName(), KafkaResources.kafkaComponentName(testStorage.getClusterName()), nodePoolIncreasedKafkaReplicaCount);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ void testPodSetOnlyReconciliation(ExtensionContext extensionContext) {
RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), replicas, brokerPods);

LOGGER.info("Wait till all StrimziPodSet {}/{} status match number of ready pods", testStorage.getNamespaceName(), testStorage.getBrokerComponentName());
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(testStorage.getNamespaceName(), testStorage.getBrokerComponentName(), KafkaResources.kafkaComponentName(testStorage.getClusterName()), 3);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(testStorage.getNamespaceName(), testStorage.getClusterName(), testStorage.getBrokerComponentName(), 3);

ClientUtils.waitForClientsSuccess(testStorage);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ void testRecoveryFromKafkaStrimziPodSetDeletion() {

LOGGER.info("Waiting for recovery {}", kafkaName);
StrimziPodSetUtils.waitForStrimziPodSetRecovery(Environment.TEST_SUITE_NAMESPACE, kafkaName, kafkaUid);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, kafkaName, StrimziPodSetResource.getBrokerComponentName(sharedClusterName), KAFKA_REPLICAS);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, kafkaName, KAFKA_REPLICAS);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
Expand All @@ -86,7 +86,7 @@ void testRecoveryFromZookeeperStrimziPodSetDeletion() {

LOGGER.info("Waiting for recovery {}", zookeeperName);
StrimziPodSetUtils.waitForStrimziPodSetRecovery(Environment.TEST_SUITE_NAMESPACE, zookeeperName, zookeeperUid);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, zookeeperName, zookeeperName, ZOOKEEPER_REPLICAS);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, zookeeperName, ZOOKEEPER_REPLICAS);
}

@IsolatedTest("We need for each test case its own Cluster Operator")
Expand Down Expand Up @@ -235,8 +235,8 @@ void testRecoveryFromKafkaAndZookeeperPodDeletion() {
kafkaPodList.subList(0, kafkaPodList.size() - 1).forEach(pod -> kubeClient().deletePod(pod));
zkPodList.subList(0, zkPodList.size() - 1).forEach(pod -> kubeClient().deletePod(pod));

StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, kafkaStrimziPodSet, kafkaName, KAFKA_REPLICAS);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, zkName, zkName, ZOOKEEPER_REPLICAS);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, kafkaStrimziPodSet, KAFKA_REPLICAS);
StrimziPodSetUtils.waitForAllStrimziPodSetAndPodsReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName, zkName, ZOOKEEPER_REPLICAS);
KafkaUtils.waitForKafkaReady(Environment.TEST_SUITE_NAMESPACE, sharedClusterName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -355,11 +355,11 @@ void testKafkaPodPendingDueToRack(ExtensionContext extensionContext) {
if (Environment.isKafkaNodePoolsEnabled()) {
KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(testStorage.getBrokerPoolName(), knp ->
knp.getSpec().getTemplate().getPod().setAffinity(null), testStorage.getNamespaceName());
} else {
KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), kafka ->
kafka.getSpec().getKafka().getTemplate().getPod().setAffinity(null), testStorage.getNamespaceName());
}

KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), kafka ->
kafka.getSpec().getKafka().getTemplate().getPod().setAffinity(null), testStorage.getNamespaceName());

// kafka should get back ready in some reasonable time frame
KafkaUtils.waitForKafkaReady(testStorage.getNamespaceName(), testStorage.getClusterName());
KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(testStorage.getClusterName()).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
Expand Down Expand Up @@ -404,31 +404,31 @@ void testKafkaRollingUpdatesOfSingleRoleNodePools(final ExtensionContext extensi
KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), 1, 1).build()
);

PodUtils.waitForPodsReady(testStorage.getNamespaceName(), testStorage.getBrokerPoolSelector(), brokerPoolReplicas, true);
PodUtils.waitForPodsReady(testStorage.getNamespaceName(), testStorage.getControllerPoolSelector(), controllerPoolReplicas, true);
PodUtils.waitForPodsReady(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), brokerPoolReplicas, true);
PodUtils.waitForPodsReady(testStorage.getNamespaceName(), testStorage.getControllerSelector(), controllerPoolReplicas, true);

Map<String, String> brokerPoolPodsSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getBrokerPoolSelector());
Map<String, String> controllerPoolPodsSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getControllerPoolSelector());
Map<String, String> brokerPoolPodsSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getBrokerSelector());
Map<String, String> controllerPoolPodsSnapshot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getControllerSelector());

// change Controller-only configuration inside shared Kafka configuration between KafkaNodePools and see that only controller pods rolls
KafkaUtils.updateSpecificConfiguration(testStorage.getNamespaceName(), testStorage.getClusterName(), "controller.quorum.election.timeout.ms", 10000);

// only controller-role nodes rolls
controllerPoolPodsSnapshot = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(),
testStorage.getControllerPoolSelector(), controllerPoolReplicas, controllerPoolPodsSnapshot);
testStorage.getControllerSelector(), controllerPoolReplicas, controllerPoolPodsSnapshot);

// broker-role nodes does not roll
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getBrokerPoolSelector(), brokerPoolPodsSnapshot);
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), brokerPoolPodsSnapshot);

// change Broker-only configuration inside shared Kafka configuration between KafkaNodePools and see that only broker pods rolls
KafkaUtils.updateSpecificConfiguration(testStorage.getNamespaceName(), testStorage.getClusterName(), "initial.broker.registration.timeout.ms", 33500);

// only broker-role nodes rolls
brokerPoolPodsSnapshot = RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(),
testStorage.getBrokerPoolSelector(), brokerPoolReplicas, brokerPoolPodsSnapshot);
testStorage.getBrokerSelector(), brokerPoolReplicas, brokerPoolPodsSnapshot);

// controller-role nodes does not roll
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getControllerPoolSelector(), controllerPoolPodsSnapshot);
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getControllerSelector(), controllerPoolPodsSnapshot);

// 2nd Rolling update triggered by PodAffinity

Expand Down Expand Up @@ -457,10 +457,10 @@ void testKafkaRollingUpdatesOfSingleRoleNodePools(final ExtensionContext extensi

// Expect a rolling update on the controller nodes due to the affinity change
RollingUpdateUtils.waitTillComponentHasRolledAndPodsReady(testStorage.getNamespaceName(),
testStorage.getControllerPoolSelector(), controllerPoolReplicas, controllerPoolPodsSnapshot);
testStorage.getControllerSelector(), controllerPoolReplicas, controllerPoolPodsSnapshot);

// Verify that broker nodes do not roll due to the controller node pool affinity change
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getBrokerPoolSelector(), brokerPoolPodsSnapshot);
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), brokerPoolPodsSnapshot);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -280,10 +280,10 @@ void testRecoveryDuringKRaftRollingUpdate(ExtensionContext extensionContext) {
ClientUtils.waitForClientsSuccess(testStorage);

// change controller knp to unreasonable CPU request causing trigger of Rolling update and recover by second modification
modifyNodePoolToUnscheduledAndRecover(testStorage.getControllerPoolName(), testStorage.getControllerPoolSelector(), testStorage);
modifyNodePoolToUnscheduledAndRecover(testStorage.getControllerPoolName(), testStorage.getControllerSelector(), testStorage);

// change broker knp to unreasonable CPU request causing trigger of Rolling update
modifyNodePoolToUnscheduledAndRecover(testStorage.getBrokerPoolName(), testStorage.getBrokerPoolSelector(), testStorage);
modifyNodePoolToUnscheduledAndRecover(testStorage.getBrokerPoolName(), testStorage.getBrokerSelector(), testStorage);

clients = new KafkaClientsBuilder(clients)
.withConsumerGroup(ClientUtils.generateRandomConsumerGroup())
Expand Down

0 comments on commit 503156f

Please sign in to comment.