Skip to content

Commit

Permalink
change label selectors, component names, etc.
Browse files Browse the repository at this point in the history
Signed-off-by: Lukas Kral <lukywill16@gmail.com>
  • Loading branch information
im-konge committed Feb 12, 2024
1 parent 59d88f5 commit d6aefb5
Show file tree
Hide file tree
Showing 38 changed files with 328 additions and 326 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,10 @@
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaResources;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolList;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.ResourceType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,12 @@ public static LabelSelector getLabelSelector(String clusterName, String componen
Map<String, String> matchLabels = new HashMap<>();
matchLabels.put(Labels.STRIMZI_CLUSTER_LABEL, clusterName);
matchLabels.put(Labels.STRIMZI_KIND_LABEL, Kafka.RESOURCE_KIND);
matchLabels.put(Labels.STRIMZI_NAME_LABEL, componentName);

if (Environment.isKafkaNodePoolsEnabled() && !componentName.contains("zookeeper")) {
matchLabels.put(Labels.STRIMZI_CONTROLLER_NAME_LABEL, componentName);
} else {
matchLabels.put(Labels.STRIMZI_NAME_LABEL, componentName);
}

return new LabelSelectorBuilder()
.withMatchLabels(matchLabels)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
*/
package io.strimzi.systemtest.resources.crd;

import io.fabric8.kubernetes.api.model.LabelSelector;
import io.fabric8.kubernetes.client.dsl.MixedOperation;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.strimzi.api.kafka.Crds;
Expand Down Expand Up @@ -91,8 +90,8 @@ public static String getControllerComponentName(String clusterName) {
* @return component name of broker
*/
public static String getBrokerComponentName(String clusterName) {
if (Environment.isKRaftModeEnabled()) {
if (!Environment.isSeparateRolesMode()) {
if (Environment.isKafkaNodePoolsEnabled()) {
if (Environment.isKRaftModeEnabled() && !Environment.isSeparateRolesMode()) {
return KafkaResource.getStrimziPodSetName(clusterName, KafkaNodePoolResource.getMixedPoolName(clusterName));
}
return KafkaResource.getStrimziPodSetName(clusterName, KafkaNodePoolResource.getBrokerPoolName(clusterName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.kafkaclients.internalClients.KafkaTracingClients;
import io.strimzi.systemtest.resources.crd.*;
import io.strimzi.systemtest.resources.crd.KafkaConnectResource;
import io.strimzi.systemtest.resources.crd.KafkaMirrorMaker2Resource;
import io.strimzi.systemtest.resources.crd.KafkaNodePoolResource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.StrimziPodSetResource;
import io.strimzi.systemtest.utils.StUtils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaTopicUtils;
import io.strimzi.systemtest.utils.kafkaUtils.KafkaUserUtils;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.logging.log4j.Logger;

import java.util.Map;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.BooleanSupplier;

Expand Down Expand Up @@ -72,7 +73,8 @@ public static boolean componentHasRolled(String namespaceName, LabelSelector sel
*/
public static Map<String, String> waitTillComponentHasRolled(String namespaceName, LabelSelector selector, Map<String, String> snapshot) {
String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
String componentName = Objects.requireNonNullElse(selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL),
selector.getMatchLabels().get(Labels.STRIMZI_CONTROLLER_NAME_LABEL));

componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

Expand All @@ -93,7 +95,8 @@ public static Map<String, String> waitTillComponentHasRolled(String namespaceNam

public static Map<String, String> waitTillComponentHasRolledAndPodsReady(String namespaceName, LabelSelector selector, int expectedPods, Map<String, String> snapshot) {
String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
String componentName = Objects.requireNonNullElse(selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL),
selector.getMatchLabels().get(Labels.STRIMZI_CONTROLLER_NAME_LABEL));

componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

Expand Down Expand Up @@ -163,7 +166,8 @@ public static Map<String, String> waitTillComponentHasStartedRolling(String name

public static void waitForComponentAndPodsReady(String namespaceName, LabelSelector selector, int expectedPods) {
final String clusterName = selector.getMatchLabels().get(Labels.STRIMZI_CLUSTER_LABEL);
String componentName = selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL);
String componentName = Objects.requireNonNullElse(selector.getMatchLabels().get(Labels.STRIMZI_NAME_LABEL),
selector.getMatchLabels().get(Labels.STRIMZI_CONTROLLER_NAME_LABEL));

componentName = componentName == null ? clusterName + "-" + selector.getMatchLabels().get(Labels.STRIMZI_POOL_NAME_LABEL) : componentName;

Expand Down Expand Up @@ -219,19 +223,19 @@ public static Map<String, String> waitForComponentScaleUpOrDown(String namespace
return PodUtils.podSnapshot(namespaceName, selector);
}

public static void waitForNoKafkaAndZKRollingUpdate(String namespaceName, String clusterName, Map<String, String> kafkaPods) {
public static void waitForNoKafkaAndZKRollingUpdate(String namespaceName, String clusterName, Map<String, String> brokerPods) {
int[] i = {0};

LabelSelector kafkaSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaComponentName(clusterName));
LabelSelector brokerSelector = KafkaResource.getLabelSelector(clusterName, KafkaResources.kafkaComponentName(clusterName));

TestUtils.waitFor("Kafka Pods to remain stable and rolling update not to be triggered", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.GLOBAL_TIMEOUT,
() -> {
boolean kafkaRolled = componentHasRolled(namespaceName, kafkaSelector, kafkaPods);
boolean kafkaRolled = componentHasRolled(namespaceName, brokerSelector, brokerPods);

if (!kafkaRolled) {
LOGGER.info("Kafka Pods did not roll. Must remain stable for: {} second(s)", TestConstants.GLOBAL_RECONCILIATION_COUNT - i[0]);
} else {
throw new RuntimeException(kafkaPods.toString() + " Pods are rolling!");
throw new RuntimeException(brokerPods.toString() + " Pods are rolling!");
}

return i[0]++ == TestConstants.GLOBAL_RECONCILIATION_COUNT;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,28 +180,25 @@ public static void waitForKafkaSecretAndStatusCertsMatches(Supplier<String> kafk

@SuppressWarnings("unchecked")
public static void waitForClusterStability(String namespaceName, String clusterName) {
LabelSelector kafkaSelector = KafkaResource.getLabelSelector(clusterName, kafkaComponentName(clusterName));
LabelSelector zkSelector = KafkaResource.getLabelSelector(clusterName, zookeeperComponentName(clusterName));
LabelSelector brokerSelector = KafkaResource.getLabelSelector(clusterName, kafkaComponentName(clusterName));
LabelSelector controllerSelector = KafkaResource.getLabelSelector(clusterName, zookeeperComponentName(clusterName));

Map<String, String>[] zkPods = new Map[1];
Map<String, String>[] kafkaPods = new Map[1];
Map<String, String>[] controllerPods = new Map[1];
Map<String, String>[] brokerPods = new Map[1];
Map<String, String>[] eoPods = new Map[1];

LOGGER.info("Waiting for cluster stability");

int[] count = {0};

kafkaPods[0] = PodUtils.podSnapshot(namespaceName, kafkaSelector);

if (!Environment.isKRaftModeEnabled()) {
zkPods[0] = PodUtils.podSnapshot(namespaceName, zkSelector);
}
brokerPods[0] = PodUtils.podSnapshot(namespaceName, brokerSelector);
controllerPods[0] = PodUtils.podSnapshot(namespaceName, controllerSelector);
eoPods[0] = DeploymentUtils.depSnapshot(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName));

TestUtils.waitFor("Cluster to be stable and ready", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.TIMEOUT_FOR_CLUSTER_STABLE, () -> {
Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(namespaceName, kafkaSelector);
Map<String, String> kafkaSnapshot = PodUtils.podSnapshot(namespaceName, brokerSelector);
Map<String, String> eoSnapshot = DeploymentUtils.depSnapshot(namespaceName, KafkaResources.entityOperatorDeploymentName(clusterName));
boolean kafkaSameAsLast = kafkaSnapshot.equals(kafkaPods[0]);
boolean kafkaSameAsLast = kafkaSnapshot.equals(brokerPods[0]);
boolean eoSameAsLast = eoSnapshot.equals(eoPods[0]);

if (!kafkaSameAsLast) {
Expand All @@ -212,9 +209,9 @@ public static void waitForClusterStability(String namespaceName, String clusterN
}

if (!Environment.isKRaftModeEnabled()) {
Map<String, String> zkSnapshot = PodUtils.podSnapshot(namespaceName, zkSelector);
Map<String, String> zkSnapshot = PodUtils.podSnapshot(namespaceName, controllerSelector);

boolean zkSameAsLast = zkSnapshot.equals(zkPods[0]);
boolean zkSameAsLast = zkSnapshot.equals(controllerPods[0]);

if (!zkSameAsLast) {
LOGGER.warn("ZK Cluster not stable");
Expand All @@ -228,7 +225,7 @@ public static void waitForClusterStability(String namespaceName, String clusterN
}
return false;
}
zkPods[0] = zkSnapshot;
controllerPods[0] = zkSnapshot;
} else {
if (kafkaSameAsLast && eoSameAsLast) {
int c = count[0]++;
Expand All @@ -240,7 +237,7 @@ public static void waitForClusterStability(String namespaceName, String clusterN
return false;
}
}
kafkaPods[0] = kafkaSnapshot;
brokerPods[0] = kafkaSnapshot;
eoPods[0] = eoSnapshot;

count[0] = 0;
Expand Down Expand Up @@ -306,10 +303,10 @@ public synchronized static boolean verifyCrDynamicConfiguration(final String nam
*/
public synchronized static boolean verifyPodDynamicConfiguration(final String namespaceName, String scraperPodName, String bootstrapServer, String kafkaPodNamePrefix, String brokerConfigName, Object value) {

List<Pod> kafkaPods = kubeClient().listPodsByPrefixInName(namespaceName, kafkaPodNamePrefix);
List<Pod> brokerPods = kubeClient().listPodsByPrefixInName(namespaceName, kafkaPodNamePrefix);
int[] brokerId = {0};

for (Pod pod : kafkaPods) {
for (Pod pod : brokerPods) {

TestUtils.waitFor("dyn.configuration to change", TestConstants.GLOBAL_POLL_INTERVAL, TestConstants.RECONCILIATION_INTERVAL + Duration.ofSeconds(10).toMillis(),
() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public static void waitForPersistentVolumeClaimDeletion(TestStorage testStorage,
LOGGER.info("Waiting for PVC(s): {}/{} to reach expected amount: {}", testStorage.getClusterName(), testStorage.getNamespaceName(), expectedNum);
TestUtils.waitFor("PVC(s) to be created/deleted", TestConstants.GLOBAL_POLL_INTERVAL_MEDIUM, TestConstants.GLOBAL_TIMEOUT,
() -> KubeClusterResource.kubeClient().listPersistentVolumeClaims(testStorage.getNamespaceName(), testStorage.getClusterName()).stream()
.filter(pvc -> pvc.getMetadata().getName().contains("data-") && pvc.getMetadata().getName().contains(testStorage.getKafkaStatefulSetName())).collect(Collectors.toList()).size() == expectedNum
.filter(pvc -> pvc.getMetadata().getName().contains("data-") && pvc.getMetadata().getName().contains(testStorage.getBrokerComponentName())).collect(Collectors.toList()).size() == expectedNum
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -316,10 +316,10 @@ public static void waitForPodContainerReady(String namespaceName, String podName
*/
public static List<Pod> getKafkaClusterPods(final TestStorage testStorage) {
List<Pod> kafkaClusterPods = kubeClient(testStorage.getNamespaceName())
.listPodsByPrefixInName(testStorage.getKafkaStatefulSetName());
.listPodsByPrefixInName(testStorage.getBrokerComponentName());
// zk pods
kafkaClusterPods.addAll(kubeClient(testStorage.getNamespaceName())
.listPodsByPrefixInName(testStorage.getZookeeperStatefulSetName()));
.listPodsByPrefixInName(testStorage.getControllerComponentName()));
// eo pod
kafkaClusterPods.addAll(kubeClient(testStorage.getNamespaceName())
.listPodsByPrefixInName(testStorage.getEoDeploymentName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -441,7 +441,7 @@ protected void assertNoCoErrorsLogged(String namespaceName, long sinceSeconds) {
}

protected void testDockerImagesForKafkaCluster(String clusterName, String clusterOperatorNamespaceName, String kafkaNamespaceName,
int kafkaPods, int zkPods, boolean rackAwareEnabled) {
int brokerPods, int controllerPods, boolean rackAwareEnabled) {
LOGGER.info("Verifying docker image names");
//Verifying docker image for cluster-operator

Expand All @@ -454,14 +454,14 @@ protected void testDockerImagesForKafkaCluster(String clusterName, String cluste

if (!Environment.isKRaftModeEnabled()) {
//Verifying docker image for zookeeper pods
for (int i = 0; i < zkPods; i++) {
for (int i = 0; i < controllerPods; i++) {
String imgFromPod = PodUtils.getContainerImageNameFromPod(kafkaNamespaceName, KafkaResources.zookeeperPodName(clusterName, i), "zookeeper");
assertThat("ZooKeeper Pod: " + i + " uses wrong image", imgFromPod, containsString(TestUtils.parseImageMap(imgFromDeplConf.get(KAFKA_IMAGE_MAP)).get(kafkaVersion)));
}
}

//Verifying docker image for kafka pods
for (int i = 0; i < kafkaPods; i++) {
for (int i = 0; i < brokerPods; i++) {
String imgFromPod = PodUtils.getContainerImageNameFromPod(kafkaNamespaceName, KafkaResource.getKafkaPodName(clusterName, KafkaNodePoolResource.getBrokerPoolName(clusterName), i), "kafka");
assertThat("Kafka Pod: " + i + " uses wrong image", imgFromPod, containsString(TestUtils.parseImageMap(imgFromDeplConf.get(KAFKA_IMAGE_MAP)).get(kafkaVersion)));
if (rackAwareEnabled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,14 @@ void testDeployAndUnDeployCruiseControl(ExtensionContext extensionContext) throw
);
resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 3).build());

Map<String, String> kafkaPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getKafkaSelector());
Map<String, String> brokerPods = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getBrokerSelector());

KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), kafka -> {
LOGGER.info("Removing CruiseControl from Kafka");
kafka.getSpec().setCruiseControl(null);
}, testStorage.getNamespaceName());

kafkaPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), 3, kafkaPods);
brokerPods = RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), 3, brokerPods);

LOGGER.info("Verifying that in {} is not present in the Kafka cluster", TestConstants.CRUISE_CONTROL_NAME);
assertThat(KafkaResource.kafkaClient().inNamespace(testStorage.getNamespaceName()).withName(testStorage.getClusterName()).get().getSpec().getCruiseControl(), nullValue());
Expand All @@ -93,7 +93,7 @@ void testDeployAndUnDeployCruiseControl(ExtensionContext extensionContext) throw
kafka.getSpec().setCruiseControl(new CruiseControlSpec());
}, testStorage.getNamespaceName());

RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), 3, kafkaPods);
RollingUpdateUtils.waitTillComponentHasRolled(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), 3, brokerPods);

LOGGER.info("Verifying that configuration of CruiseControl metric reporter is present in Kafka ConfigMap");
CruiseControlUtils.verifyCruiseControlMetricReporterConfigurationInKafkaConfigMapIsPresent(CruiseControlUtils.getKafkaCruiseControlMetricsReporterConfiguration(testStorage.getNamespaceName(), testStorage.getClusterName()));
Expand All @@ -118,7 +118,7 @@ void testConfigurationUpdate(ExtensionContext extensionContext) throws IOExcepti
);
resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaWithCruiseControl(testStorage.getClusterName(), 3, 3).build());

Map<String, String> kafkaSnapShot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getKafkaSelector());
Map<String, String> kafkaSnapShot = PodUtils.podSnapshot(testStorage.getNamespaceName(), testStorage.getBrokerSelector());
Map<String, String> cruiseControlSnapShot = DeploymentUtils.depSnapshot(testStorage.getNamespaceName(), CruiseControlResources.componentName(testStorage.getClusterName()));
Map<String, Object> performanceTuningOpts = new HashMap<String, Object>() {{
put(CruiseControlConfigurationParameters.CONCURRENT_INTRA_PARTITION_MOVEMENTS.getValue(), 2);
Expand All @@ -138,7 +138,7 @@ void testConfigurationUpdate(ExtensionContext extensionContext) throws IOExcepti
DeploymentUtils.waitTillDepHasRolled(testStorage.getNamespaceName(), CruiseControlResources.componentName(testStorage.getClusterName()), 1, cruiseControlSnapShot);

LOGGER.info("Verifying that Kafka Pods did not roll");
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), kafkaSnapShot);
RollingUpdateUtils.waitForNoRollingUpdate(testStorage.getNamespaceName(), testStorage.getBrokerSelector(), kafkaSnapShot);

LOGGER.info("Verifying new configuration in the Kafka CR");
ConfigMap configMap = kubeClient(testStorage.getNamespaceName()).getConfigMap(testStorage.getNamespaceName(), CruiseControlResources.configMapName(testStorage.getClusterName()));
Expand Down

0 comments on commit d6aefb5

Please sign in to comment.