[ST] KafkaRoller prevent scale down if partition present #9305

Merged
merged 2 commits into from Nov 7, 2023
@@ -101,7 +101,7 @@ public static void waitUntilKafkaStatusConditionContainsMessage(String clusterNa
List<Condition> conditions = KafkaResource.kafkaClient().inNamespace(namespace).withName(clusterName).get().getStatus().getConditions();
for (Condition condition : conditions) {
String conditionMessage = condition.getMessage();
-if (conditionMessage.matches(pattern)) {
+if (conditionMessage != null && conditionMessage.matches(pattern)) {
return true;
}
}
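The guard above matters because Condition.getMessage() can return null for a status condition that carries no message; the old check then threw a NullPointerException instead of simply skipping that condition. Below is a minimal standalone sketch of the behaviour, not part of the diff; it assumes the Condition and ConditionBuilder classes from the Strimzi api module (io.strimzi.api.kafka.model.status) are on the classpath.

import io.strimzi.api.kafka.model.status.Condition;
import io.strimzi.api.kafka.model.status.ConditionBuilder;

public class ConditionMessageGuardSketch {
    public static void main(String[] args) {
        // A condition without a message, e.g. one whose message field was never populated
        Condition condition = new ConditionBuilder()
                .withType("Ready")
                .withStatus("True")
                .build();

        String pattern = "Cannot scale down broker.*";
        String conditionMessage = condition.getMessage();   // null here

        // Old check: conditionMessage.matches(pattern) would throw NullPointerException
        // New check: null-guard first, so a message-less condition is simply skipped
        boolean matched = conditionMessage != null && conditionMessage.matches(pattern);
        System.out.println("matched: " + matched);           // prints "matched: false"
    }
}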
@@ -15,6 +15,7 @@
import io.fabric8.kubernetes.api.model.NodeSelectorTermBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.strimzi.api.kafka.model.KafkaResources;
+import io.strimzi.api.kafka.model.KafkaTopic;
import io.strimzi.api.kafka.model.StrimziPodSet;
import io.strimzi.api.kafka.model.template.KafkaClusterTemplate;
import io.strimzi.api.kafka.model.template.KafkaClusterTemplateBuilder;
@@ -56,6 +57,7 @@
import static io.strimzi.systemtest.k8s.Events.Scheduled;
import static io.strimzi.systemtest.k8s.Events.Started;
import static io.strimzi.systemtest.matchers.Matchers.hasAllOfReasons;
+import static io.strimzi.systemtest.utils.kubeUtils.objects.PodUtils.waitForPodsReady;
import static io.strimzi.test.k8s.KubeClusterResource.kubeClient;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
@@ -71,80 +73,56 @@ public class KafkaRollerST extends AbstractST {

@ParallelNamespaceTest
@KRaftWithoutUTONotSupported
-void testKafkaRollsWhenTopicIsUnderReplicated(ExtensionContext extensionContext) {
+void testKafkaDoesNotRollsWhenTopicIsUnderReplicated(ExtensionContext extensionContext) {
final TestStorage testStorage = storageMap.get(extensionContext);
-final String namespaceName = StUtils.getNamespaceBasedOnRbac(Environment.TEST_SUITE_NAMESPACE, extensionContext);
-final String clusterName = testStorage.getClusterName();
-final String topicName = testStorage.getTopicName();
-final String kafkaStsName = KafkaResources.kafkaStatefulSetName(clusterName);
-final LabelSelector kafkaSelector = KafkaResource.getLabelSelector(clusterName, kafkaStsName);

Instant startTime = Instant.now();

-// We need to start with 3 replicas / brokers,
-// so that KafkaStreamsTopicStore topic gets set/distributed on this first 3 [0, 1, 2],
-// since this topic has replication-factor 3 and minISR 2.
-
-// We have disabled the broker scale down check for now since the test fails at the moment
-// due to partition replicas being present on the broker during scale down. We can enable this check
-// once the issue is resolved
-// https://github.com/strimzi/strimzi-kafka-operator/issues/9134
-resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(clusterName, 3)
-.editMetadata()
-.addToAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_SKIP_BROKER_SCALEDOWN_CHECK, "true"))
-.endMetadata()
-.editSpec()
-.editKafka()
-.addToConfig("auto.create.topics.enable", "false")
-.endKafka()
-.endSpec()
-.build());
+final int initialBrokerReplicaCount = 3;
+final int scaledUpBrokerReplicaCount = 4;

LOGGER.info("Running kafkaScaleUpScaleDown {}", clusterName);
final int initialReplicas = kubeClient(namespaceName).listPods(kafkaSelector).size();
assertEquals(3, initialReplicas);
resourceManager.createResourceWithWait(extensionContext, KafkaTemplates.kafkaPersistent(testStorage.getClusterName(), initialBrokerReplicaCount).build());

-// Now that KafkaStreamsTopicStore topic is set on the first 3 brokers, lets spin-up another one.
-int scaledUpReplicas = 4;
+LOGGER.info("Verify expected number of replicas '{}' is present in Kafka Cluster: {}/{}", initialBrokerReplicaCount, testStorage.getNamespaceName(), testStorage.getClusterName());
+final int observedReplicas = kubeClient(testStorage.getNamespaceName()).listPods(testStorage.getKafkaSelector()).size();
+assertEquals(initialBrokerReplicaCount, observedReplicas);

LOGGER.info("Scale Kafka up from 3 to 4 brokers");
if (Environment.isKafkaNodePoolsEnabled()) {
-KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(clusterName), knp -> knp.getSpec().setReplicas(scaledUpReplicas), namespaceName);
+KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(testStorage.getClusterName()), knp -> knp.getSpec().setReplicas(scaledUpBrokerReplicaCount), testStorage.getNamespaceName());
} else {
-KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, k -> k.getSpec().getKafka().setReplicas(scaledUpReplicas), namespaceName);
+KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> k.getSpec().getKafka().setReplicas(scaledUpBrokerReplicaCount), testStorage.getNamespaceName());
}
+RollingUpdateUtils.waitForComponentScaleUpOrDown(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), scaledUpBrokerReplicaCount);

-RollingUpdateUtils.waitForComponentScaleUpOrDown(namespaceName, kafkaSelector, scaledUpReplicas);

-resourceManager.createResourceWithWait(extensionContext, KafkaTopicTemplates.topic(clusterName, topicName, 4, 4, 4, namespaceName).build());
+LOGGER.info("Create kafkaTopic: {}/{} with replica on each broker", testStorage.getNamespaceName(), testStorage.getTopicName());
+KafkaTopic kafkaTopic = KafkaTopicTemplates.topic(testStorage.getClusterName(), testStorage.getTopicName(), 4, 4, 4, testStorage.getNamespaceName()).build();
+resourceManager.createResourceWithWait(extensionContext, kafkaTopic);

//Test that the new pod does not have errors or failures in events
-String uid = kubeClient(namespaceName).getPodUid(KafkaResource.getKafkaPodName(clusterName, 3));
-List<Event> events = kubeClient(namespaceName).listEventsByResourceUid(uid);
+String uid = kubeClient(testStorage.getNamespaceName()).getPodUid(KafkaResource.getKafkaPodName(testStorage.getClusterName(), 3));
+List<Event> events = kubeClient(testStorage.getNamespaceName()).listEventsByResourceUid(uid);
assertThat(events, hasAllOfReasons(Scheduled, Pulled, Created, Started));

-// TODO scaling down of Kafka Cluster (with 4 replicas which has KafkaTopic with 4 replicas) can be forbidden in a future due to would-be Topic under-replication (https://github.com/strimzi/strimzi-kafka-operator/pull/9042)
// scale down
-final int scaledDownReplicas = 3;
-LOGGER.info("Scaling down to {}", scaledDownReplicas);

LOGGER.info("Scaling down to {}", initialBrokerReplicaCount);
if (Environment.isKafkaNodePoolsEnabled()) {
-KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(clusterName), knp -> knp.getSpec().setReplicas(scaledDownReplicas), namespaceName);
+KafkaNodePoolResource.replaceKafkaNodePoolResourceInSpecificNamespace(KafkaResource.getNodePoolName(testStorage.getClusterName()), knp -> knp.getSpec().setReplicas(initialBrokerReplicaCount), testStorage.getNamespaceName());
} else {
-KafkaResource.replaceKafkaResourceInSpecificNamespace(clusterName, k -> k.getSpec().getKafka().setReplicas(scaledDownReplicas), namespaceName);
+KafkaResource.replaceKafkaResourceInSpecificNamespace(testStorage.getClusterName(), k -> k.getSpec().getKafka().setReplicas(initialBrokerReplicaCount), testStorage.getNamespaceName());
}

-Map<String, String> kafkaPods = RollingUpdateUtils.waitForComponentScaleUpOrDown(namespaceName, kafkaSelector, scaledDownReplicas);
+kubeClient().listPodsByPrefixInName(Environment.TEST_SUITE_NAMESPACE, testStorage.getClusterName()).size();
+LOGGER.info("Waiting for warning regarding preventing Kafka from scaling down when the broker to be scaled down has some partitions");
+KafkaUtils.waitUntilKafkaStatusConditionContainsMessage(testStorage.getClusterName(), testStorage.getNamespaceName(), "Cannot scale down broker.*");
+waitForPodsReady(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), scaledUpBrokerReplicaCount, false);

-PodUtils.verifyThatRunningPodsAreStable(namespaceName, clusterName);
-
-// set annotation to trigger Kafka rolling update
-StrimziPodSetUtils.annotateStrimziPodSet(namespaceName, KafkaResource.getStrimziPodSetName(clusterName), Collections.singletonMap(Annotations.ANNO_STRIMZI_IO_MANUAL_ROLLING_UPDATE, "true"));
+LOGGER.info("Remove Topic, thereby remove all partitions located on broker to be scaled down");
+resourceManager.deleteResource(kafkaTopic);
+RollingUpdateUtils.waitForComponentScaleUpOrDown(testStorage.getNamespaceName(), testStorage.getKafkaSelector(), initialBrokerReplicaCount);

-RollingUpdateUtils.waitTillComponentHasRolled(namespaceName, kafkaSelector, scaledDownReplicas, kafkaPods);
//Test that CO doesn't have any exceptions in log
Instant endTime = Instant.now();
long duration = Duration.between(startTime, endTime).toSeconds();
-assertNoCoErrorsLogged(namespaceName, duration);
+assertNoCoErrorsLogged(testStorage.getNamespaceName(), duration);
}

@ParallelNamespaceTest
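Usage note on the reworked test flow: the KafkaTopic is created with 4 partitions and replication factor 4 on a 4-broker cluster, so broker 3 necessarily hosts partition replicas and the operator refuses the scale-down, reporting a Kafka status condition whose message matches "Cannot scale down broker.*"; only after the topic is deleted does the scale-down back to 3 brokers complete. Below is a minimal sketch, separate from the PR, of reading that condition directly with the same client calls used by waitUntilKafkaStatusConditionContainsMessage; the class name is a placeholder and the import paths are assumed from the Strimzi api and systemtest modules.

import java.util.List;

import io.strimzi.api.kafka.model.status.Condition;
import io.strimzi.systemtest.resources.crd.KafkaResource;

public class ScaleDownBlockedCheckSketch {
    // Returns true if any condition in the Kafka CR status reports that a broker
    // cannot be scaled down because it still hosts partition replicas.
    public static boolean isScaleDownBlocked(String namespaceName, String clusterName) {
        List<Condition> conditions = KafkaResource.kafkaClient()
                .inNamespace(namespaceName)
                .withName(clusterName)
                .get()
                .getStatus()
                .getConditions();

        for (Condition condition : conditions) {
            String conditionMessage = condition.getMessage();
            // Same null-guard as the fixed helper above
            if (conditionMessage != null && conditionMessage.matches("Cannot scale down broker.*")) {
                return true;
            }
        }
        return false;
    }
}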