Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ST] Add NodePools to all of the test-cases, create NodePoolsConverter, add possibility to specify NP roles for all STs #9668

Merged
merged 5 commits into from
Feb 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 14 additions & 0 deletions systemtest/src/main/java/io/strimzi/systemtest/Environment.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import io.fabric8.kubernetes.api.model.Service;
import io.strimzi.systemtest.enums.ClusterOperatorInstallType;
import io.strimzi.systemtest.enums.NodePoolsRoleMode;
import io.strimzi.systemtest.utils.TestKafkaVersion;
import io.strimzi.test.TestUtils;
import io.strimzi.test.k8s.KubeClusterResource;
Expand Down Expand Up @@ -141,6 +142,11 @@ public class Environment {
*/
public static final String STRIMZI_USE_KRAFT_IN_TESTS_ENV = "STRIMZI_USE_KRAFT_IN_TESTS";

/**
* Switch for changing NodePool roles in STs - separate roles or mixed roles
*/
public static final String STRIMZI_NODE_POOLS_ROLE_MODE_ENV = "STRIMZI_NODE_POOLS_ROLE_MODE";

/**
* CO PodSet-only reconciliation env variable <br>
* Only SPS will be reconciled, when this env variable will be true
Expand Down Expand Up @@ -213,6 +219,7 @@ public class Environment {
public static final String STRIMZI_RBAC_SCOPE = getOrDefault(STRIMZI_RBAC_SCOPE_ENV, STRIMZI_RBAC_SCOPE_DEFAULT);
public static final String STRIMZI_FEATURE_GATES = getOrDefault(STRIMZI_FEATURE_GATES_ENV, STRIMZI_FEATURE_GATES_DEFAULT);
public static final boolean STRIMZI_USE_KRAFT_IN_TESTS = getOrDefault(STRIMZI_USE_KRAFT_IN_TESTS_ENV, Boolean::parseBoolean, false);
public static final NodePoolsRoleMode STRIMZI_NODE_POOLS_ROLE_MODE = getOrDefault(STRIMZI_NODE_POOLS_ROLE_MODE_ENV, value -> NodePoolsRoleMode.valueOf(value.toUpperCase(Locale.ENGLISH)), NodePoolsRoleMode.SEPARATE);

// variables for kafka client app images
private static final String TEST_CLIENTS_VERSION = getOrDefault(TEST_CLIENTS_VERSION_ENV, TEST_CLIENTS_VERSION_DEFAULT);
Expand Down Expand Up @@ -294,6 +301,13 @@ public static boolean isUnidirectionalTopicOperatorEnabled() {
return !STRIMZI_FEATURE_GATES.contains(TestConstants.DONT_USE_UNIDIRECTIONAL_TOPIC_OPERATOR);
}

/**
* Determine whether separate roles mode for KafkaNodePools is used or not
*/
public static boolean isSeparateRolesMode() {
    // Enum constants are singletons, so identity comparison is equivalent to equals() here
    return NodePoolsRoleMode.SEPARATE == STRIMZI_NODE_POOLS_ROLE_MODE;
}

/**
* Provides boolean information, if testing environment support shared memory (i.e., environment, where all
* components share memory). In general, we use {@link Environment#RESOURCE_ALLOCATION_STRATEGY_DEFAULT} if env {@link Environment#RESOURCE_ALLOCATION_STRATEGY_ENV}
Expand Down
31 changes: 22 additions & 9 deletions systemtest/src/main/java/io/strimzi/systemtest/TestConstants.java
Original file line number Diff line number Diff line change
Expand Up @@ -444,9 +444,15 @@ public interface TestConstants {

String TEST_NAME_KEY = "TEST_NAME";
String CLUSTER_KEY = "CLUSTER_NAME";
String KAFKA_NODE_POOL_KEY = "KAFKA_NODE_POOL";
String BROKER_POOL_KEY = "BROKER_POOL";
String CONTROLLER_POOL_KEY = "CONTROLLER_POOL";
String MIXED_POOL_KEY = "MIXED_POOL";
String SOURCE_CLUSTER_KEY = "SOURCE_CLUSTER_NAME";
String SOURCE_BROKER_POOL_KEY = "SOURCE_BROKER_POOL";
String SOURCE_CONTROLLER_POOL_KEY = "SOURCE_CONTROLLER_POOL";
String TARGET_CLUSTER_KEY = "TARGET_CLUSTER_NAME";
String TARGET_BROKER_POOL_KEY = "TARGET_BROKER_POOL";
String TARGET_CONTROLLER_POOL_KEY = "TARGET_CONTROLLER_POOL";
String TOPIC_KEY = "TOPIC_NAME";
String TARGET_TOPIC_KEY = "TARGET_TOPIC_NAME";
String MIRRORED_SOURCE_TOPIC_KEY = "MIRRORED_SOURCE_TOPIC_NAME";
Expand All @@ -460,12 +466,17 @@ public interface TestConstants {
String TARGET_USER_NAME_KEY = "TARGET_USER_NAME";
String KAFKA_USER_NAME_KEY = "KAFKA_USER_NAME";
String ENTITY_OPERATOR_NAME_KEY = "ENTITY_OPERATOR_NAME";
String KAFKA_STATEFULSET_NAME_KEY = "KAFKA_STATEFULSET_NAME";
String ZOOKEEPER_STATEFULSET_NAME_KEY = "ZOOKEEPER_STATEFULSET_NAME";
String BROKER_COMPONENT_NAME_KEY = "BROKER_COMPONENT_NAME";
String CONTROLLER_COMPONENT_NAME_KEY = "CONTROLLER_COMPONENT_NAME";
String MIXED_COMPONENT_NAME_KEY = "MIXED_COMPONENT_NAME";
String SCRAPER_POD_KEY = "SCRAPER_POD_NAME";
String KAFKA_TRACING_CLIENT_KEY = "KAFKA_TRACING_CLIENT";
String KAFKA_SELECTOR_KEY = "KAFKA_SELECTOR";
String ZOOKEEPER_SELECTOR_KEY = "ZOOKEEPER_SELECTOR";
String BROKER_SELECTOR_KEY = "BROKER_SELECTOR";
String BROKER_POOL_SELECTOR_KEY = "BROKER_POOL_SELECTOR";
String CONTROLLER_POOL_SELECTOR_KEY = "CONTROLLER_POOL_SELECTOR";
String MIXED_POOL_SELECTOR_KEY = "MIXED_POOL_SELECTOR";
String CONTROLLER_SELECTOR_KEY = "CONTROLLER_SELECTOR";
String MIXED_SELECTOR_KEY = "MIXED_SELECTOR";
String KAFKA_CONNECT_SELECTOR_KEY = "KAFKA_CONNECT_SELECTOR";
String MM2_SELECTOR_KEY = "MM2_SELECTOR";
String MESSAGE_COUNT_KEY = "MESSAGE_COUNT";
Expand Down Expand Up @@ -498,12 +509,14 @@ public interface TestConstants {
String ST_CONNECT_BUILD_IMAGE_NAME = "strimzi-sts-connect-build";

/**
* KafkaNodePools constants
* Persistent Volume related
*/
String KAFKA_NODE_POOL_PREFIX = "kafka-";
String PVC_PHASE_BOUND = "Bound";

/**
* Persistent Volume related
* NodePool's name prefix based on role
*/
String PVC_PHASE_BOUND = "Bound";
String MIXED_ROLE_PREFIX = "mixed-";
String BROKER_ROLE_PREFIX = "broker-";
String CONTROLLER_ROLE_PREFIX = "control-";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.systemtest.enums;

/**
 * Enum for determining in which mode we would like to run the Kafka and KafkaNodePools.
 * In case of {@link #SEPARATE}, we have a different NodePool for the "broker" role and for the "controller" role.
 * In the {@link #MIXED} case, there is one and only one NodePool with both "broker" and "controller" roles.
 */
public enum NodePoolsRoleMode {
    /** Separate NodePools: one with the "broker" role and another with the "controller" role. */
    SEPARATE,
    /** A single NodePool carrying both the "broker" and "controller" roles. */
    MIXED
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.strimzi.systemtest.resources.crd.KafkaConnectResource;
import io.strimzi.systemtest.resources.crd.KafkaMirrorMaker2Resource;
import io.strimzi.systemtest.resources.crd.KafkaResource;
import io.strimzi.systemtest.resources.crd.StrimziPodSetResource;
import io.strimzi.test.TestUtils;
import io.strimzi.test.executor.Exec;
import org.apache.logging.log4j.LogManager;
Expand Down Expand Up @@ -165,7 +166,7 @@ protected MetricsCollector(Builder builder) {
private LabelSelector getLabelSelectorForResource() {
switch (this.componentType) {
case Kafka:
return KafkaResource.getLabelSelector(componentName, KafkaResources.kafkaComponentName(componentName));
return KafkaResource.getLabelSelector(componentName, StrimziPodSetResource.getBrokerComponentName(componentName));
case Zookeeper:
return KafkaResource.getLabelSelector(componentName, KafkaResources.zookeeperComponentName(componentName));
case KafkaConnect:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.systemtest.resources;

import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.systemtest.Environment;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;

/**
* Class responsible for handling {@link io.strimzi.api.kafka.model.nodepool.KafkaNodePool} resources in test execution.
* Based on {@link io.strimzi.systemtest.Environment#STRIMZI_FEATURE_GATES} and {@link io.strimzi.systemtest.Environment#STRIMZI_NODE_POOLS_ROLE_MODE},
* it determines how the NodePools should look and whether both of them, just one, or none should be applied.
* For example, in case that:
* - we are using KafkaNodePools feature gate, but KRaft is disabled, we apply just Broker NodePool (just with broker role)
* - we are using KafkaNodePools feature gate, KRaft is enabled, then the Broker and Controller NodePools are applied with:
* - separate roles (so as a Broker and Controller) if {@link Environment#isSeparateRolesMode()} is true
* - mixed roles, the NodePool names are renamed (so they are not Brokers and Controllers), otherwise
* - we are not using KafkaNodePools feature gate, KRaft is disabled, we are not applying any of the NodePools
*
* This handler is needed in STs to switch between modes without a problem - and also to make the whole process less confusing.
*/
public class NodePoolsConverter {

    /**
     * Method that converts each of the NodePools passed to it, based on the mode.
     * There are two steps, where the NodePools are changed:
     * - change NodePools to mixed based on {@link Environment#isSeparateRolesMode()} and {@link Environment#isKRaftModeEnabled()}
     * - filter out NodePools that are not relevant to a particular mode - ZK mode without NodePools, ZK mode with NodePools, KRaft
     *
     * @param nodePoolsToBeConverted NodePools that should be converted
     * @return array of updated and filtered NodePools
     */
    public static KafkaNodePool[] convertNodePoolsIfNeeded(KafkaNodePool... nodePoolsToBeConverted) {
        // Copy into a mutable list: Arrays.asList returns a fixed-size view backed by the
        // array, which throws UnsupportedOperationException on removeIf (used below when
        // converting to mixed roles).
        List<KafkaNodePool> nodePools = new ArrayList<>(Arrays.asList(nodePoolsToBeConverted));

        changeNodePoolsToHaveMixedRoles(nodePools);

        return removeNodePoolsFromArrayIfNeeded(nodePools).toArray(new KafkaNodePool[0]);
    }

    /**
     * Method that changes NodePools with broker role to have both roles (broker, controller).
     * In case that we run in {@link io.strimzi.systemtest.enums.NodePoolsRoleMode#MIXED} mode and {@link Environment#isKRaftModeEnabled()} is true,
     * it does the following:
     * - removes all controller NodePools (so we have just one NodePool instead of two - easier handling in STs)
     * - for each NodePool that is left (broker NodePools) adds {@link ProcessRoles#CONTROLLER} role to its `spec`
     *
     * @param nodePools list of NodePools that should be changed to have mixed roles; mutated in place
     */
    private static void changeNodePoolsToHaveMixedRoles(List<KafkaNodePool> nodePools) {
        if (!Environment.isSeparateRolesMode() && Environment.isKRaftModeEnabled()) {
            // remove controller NodePools, so we have just one NodePool for the mixed mode
            nodePools.removeIf(nodePool -> nodePool.getSpec().getRoles().stream().anyMatch(ProcessRoles.CONTROLLER::equals));
            nodePools.forEach(nodePool -> nodePool.getSpec().setRoles(List.of(ProcessRoles.BROKER, ProcessRoles.CONTROLLER)));
        }
    }

    /**
     * Method that returns a filtered list of NodePools based on the mode.
     * It does the following:
     * - ZK mode without NodePools - returns empty list, so nothing is applied
     * - ZK mode with NodePools - returns list of NodePools from {@param nodePools}, without NodePools containing {@link ProcessRoles#CONTROLLER} role
     * - KRaft mode - returns list of NodePools without any other update
     *
     * @param nodePools list of NodePools that should be filtered based on mode
     * @return filtered/empty/full list of NodePools
     */
    private static List<KafkaNodePool> removeNodePoolsFromArrayIfNeeded(List<KafkaNodePool> nodePools) {
        if (Environment.isKafkaNodePoolsEnabled()) {
            if (Environment.isKRaftModeEnabled()) {
                return nodePools;
            }

            // ZK mode with NodePools enabled: controllers make no sense, keep broker-only pools
            return nodePools
                .stream()
                .filter(kafkaNodePool -> kafkaNodePool.getSpec().getRoles().stream().noneMatch(role -> role.equals(ProcessRoles.CONTROLLER)))
                .collect(Collectors.toList());
        }

        return Collections.emptyList();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,9 @@
import io.strimzi.api.kafka.model.connector.KafkaConnector;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.Status;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.topic.KafkaTopic;
import io.strimzi.api.kafka.model.user.KafkaUser;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.enums.ConditionStatus;
Expand Down Expand Up @@ -198,10 +196,6 @@ private <T extends HasMetadata> void createResource(boolean waitReady, T... reso
}
resource.getMetadata().setAnnotations(annotations);
}

// in case when Kafka contains "strimzi.io/node-pools: enabled" annotation, we want to create KafkaNodePool
// and configure it as Kafka resource
createKafkaNodePoolIfNeeded((Kafka) resource);
}

if (Environment.isKRaftModeEnabled() && !Environment.isUnidirectionalTopicOperatorEnabled()) {
Expand Down Expand Up @@ -248,45 +242,6 @@ private <T extends HasMetadata> void createResource(boolean waitReady, T... reso
}
}

private void createKafkaNodePoolIfNeeded(Kafka resource) {
Map<String, String> annotations = resource.getMetadata().getAnnotations();

if (annotations.get(Annotations.ANNO_STRIMZI_IO_NODE_POOLS) != null && annotations.get(Annotations.ANNO_STRIMZI_IO_NODE_POOLS).equals("enabled")) {
KafkaNodePool nodePool = KafkaNodePoolResource.convertKafkaResourceToKafkaNodePool(resource);

setNamespaceInResource(nodePool);

boolean nodePoolAlreadyExists = KafkaNodePoolResource.kafkaNodePoolClient().inNamespace(resource.getMetadata().getNamespace()).list().getItems()
.stream()
.anyMatch(knp -> {
Map<String, String> labels = knp.getMetadata().getLabels();
return labels.containsKey(Labels.STRIMZI_CLUSTER_LABEL) && labels.get(Labels.STRIMZI_CLUSTER_LABEL).equals(resource.getMetadata().getName());
});

if (nodePoolAlreadyExists) {
LOGGER.info("Node pool will not be created as part of process of creation of Kafka instance as it already exists");
return;
}

labelResource(nodePool);

ResourceType<KafkaNodePool> nodePoolType = findResourceType(nodePool);
nodePoolType.create(nodePool);

// add it to stack
synchronized (this) {
STORED_RESOURCES.computeIfAbsent(getTestContext().getDisplayName(), k -> new Stack<>());
STORED_RESOURCES.get(getTestContext().getDisplayName()).push(
new ResourceItem<>(
() -> deleteResource(nodePool),
nodePool
));
}

nodePoolType.waitForReadiness(nodePool);
}
}

private <T extends HasMetadata> void labelResource(T resource) {
// If we are create resource in test case we annotate it with label. This is needed for filtering when
// we collect logs from Pods, ReplicaSets, Deployments etc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,15 @@
import io.strimzi.api.kafka.Crds;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolList;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.systemtest.Environment;
import io.strimzi.systemtest.TestConstants;
import io.strimzi.systemtest.resources.ResourceManager;
import io.strimzi.systemtest.resources.ResourceType;
import io.strimzi.systemtest.templates.crd.KafkaNodePoolTemplates;
import io.strimzi.systemtest.utils.kubeUtils.objects.PersistentVolumeClaimUtils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

Expand Down Expand Up @@ -81,41 +76,6 @@ public static void replaceKafkaNodePoolResourceInSpecificNamespace(String resour
ResourceManager.replaceCrdResource(KafkaNodePool.class, KafkaNodePoolList.class, resourceName, editor, namespaceName);
}

public static KafkaNodePool convertKafkaResourceToKafkaNodePool(Kafka resource) {
List<ProcessRoles> nodeRoles = new ArrayList<>();
nodeRoles.add(ProcessRoles.BROKER);

if (Environment.isKRaftModeEnabled()) {
nodeRoles.add(ProcessRoles.CONTROLLER);
}

String nodePoolName = TestConstants.KAFKA_NODE_POOL_PREFIX + hashStub(resource.getMetadata().getName());

KafkaNodePoolBuilder builder = KafkaNodePoolTemplates.defaultKafkaNodePool(resource.getMetadata().getNamespace(), nodePoolName, resource.getMetadata().getName(), resource.getSpec().getKafka().getReplicas())
.editOrNewMetadata()
.withNamespace(resource.getMetadata().getNamespace())
.addToLabels(resource.getMetadata().getLabels())
.endMetadata()
.editOrNewSpec()
.withRoles(nodeRoles)
.withStorage(resource.getSpec().getKafka().getStorage())
.withJvmOptions(resource.getSpec().getKafka().getJvmOptions())
.withResources(resource.getSpec().getKafka().getResources())
.endSpec();

if (resource.getSpec().getKafka().getTemplate() != null) {
builder = builder
.editOrNewSpec()
.editOrNewTemplate()
.withPersistentVolumeClaim(resource.getSpec().getKafka().getTemplate().getPersistentVolumeClaim())
.withPod(resource.getSpec().getKafka().getTemplate().getPod())
.endTemplate()
.endSpec();
}

return builder.build();
}

public static LabelSelector getLabelSelector(String clusterName, String poolName, ProcessRoles processRole) {
Map<String, String> matchLabels = new HashMap<>();

Expand All @@ -133,4 +93,16 @@ public static LabelSelector getLabelSelector(String clusterName, String poolName
.withMatchLabels(matchLabels)
.build();
}

/**
 * Builds the name of the broker-role NodePool for the given Kafka cluster.
 */
public static String getBrokerPoolName(String clusterName) {
    return TestConstants.BROKER_ROLE_PREFIX.concat(hashStub(clusterName));
}

/**
 * Builds the name of the controller-role NodePool for the given Kafka cluster.
 */
public static String getControllerPoolName(String clusterName) {
    final String clusterHash = hashStub(clusterName);
    return TestConstants.CONTROLLER_ROLE_PREFIX + clusterHash;
}

/**
 * Builds the name of the mixed-role NodePool for the given Kafka cluster.
 */
public static String getMixedPoolName(String clusterName) {
    return String.format("%s%s", TestConstants.MIXED_ROLE_PREFIX, hashStub(clusterName));
}
}