
Commit

Inform about reverted scale-down or role-change in .status section (#9676)

Signed-off-by: Jakub Scholz <www@scholzj.com>
scholzj committed Feb 14, 2024
1 parent 5c78743 commit 764de7a
Showing 3 changed files with 163 additions and 28 deletions.
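The change threads a KafkaStatus parameter through KafkaClusterCreator.prepareKafkaCluster() so that any scale-down or role-change reverts performed during reconciliation are reported as warning conditions in the Kafka custom resource's .status section. Below is a minimal standalone sketch of that mechanism using the same helpers the diff relies on; it assumes the strimzi-api and operator-common modules are on the classpath, and the class name, pool name, and replica count are illustrative only:

```java
import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.operator.common.model.StatusUtils;

import java.util.ArrayList;
import java.util.List;

public class WarningConditionSketch {
    public static void main(String[] args) {
        // Collect warnings while reverting problematic changes, as KafkaClusterCreator now does
        List<Condition> warningConditions = new ArrayList<>();
        warningConditions.add(StatusUtils.buildWarningCondition(
                "ScaleDownPreventionCheck",
                "Reverting scale-down of KafkaNodePool pool-a by changing number of replicas to 5"));

        // At the end of a successful reconciliation, attach the warnings to the status
        // that is written back to the Kafka custom resource's .status section
        KafkaStatus kafkaStatus = new KafkaStatus();
        if (!warningConditions.isEmpty()) {
            kafkaStatus.addConditions(warningConditions);
        }

        kafkaStatus.getConditions().forEach(c ->
                System.out.println(c.getType() + " / " + c.getReason() + ": " + c.getMessage()));
    }
}
```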
@@ -630,7 +630,7 @@ Future<KafkaReconciler> kafkaReconciler() {
}

return new KafkaClusterCreator(vertx, reconciliation, config, supplier)
.prepareKafkaCluster(kafkaAssembly, nodePools, oldStorage, currentPods, versionChange, true)
.prepareKafkaCluster(kafkaAssembly, nodePools, oldStorage, currentPods, versionChange, kafkaStatus, true)
.compose(kafkaCluster -> {
// We store this for use with Cruise Control later. As these configurations might
// not be exactly the same as in the original custom resource (for example because
@@ -4,8 +4,10 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
@@ -23,6 +25,7 @@
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.common.model.StatusUtils;
import io.strimzi.operator.common.operator.resource.SecretOperator;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
@@ -54,6 +57,7 @@ public class KafkaClusterCreator {
// State
private boolean scaleDownCheckFailed = false;
private boolean usedToBeBrokersCheckFailed = false;
private final List<Condition> warningConditions = new ArrayList<>();

/**
* Constructor
@@ -90,18 +94,19 @@ public KafkaClusterCreator(
* @param oldStorage Old storage configuration
* @param currentPods Existing Kafka pods
* @param versionChange Version Change object describing any possible upgrades / downgrades
* @param kafkaStatus The KafkaStatus where any possible warnings will be added
* @param tryToFixProblems Flag indicating whether recoverable configuration issues should be fixed or not
*
* @return New Kafka Cluster instance
*/
public Future<KafkaCluster> prepareKafkaCluster(
Kafka kafkaCr,
List<KafkaNodePool> nodePools,
Map<String, Storage> oldStorage,
Map<String, List<String>> currentPods,
KafkaVersionChange versionChange,
boolean tryToFixProblems
) {
KafkaStatus kafkaStatus,
boolean tryToFixProblems) {
return createKafkaCluster(kafkaCr, nodePools, oldStorage, currentPods, versionChange)
.compose(kafka -> brokerRemovalCheck(kafkaCr, kafka))
.compose(kafka -> {
@@ -110,7 +115,7 @@ public Future<KafkaCluster> prepareKafkaCluster(
// Once we fix it, we call this method again, but this time with tryToFixProblems set to false
return revertScaleDown(kafka, kafkaCr, nodePools)
.compose(kafkaAndNodePools -> revertRoleChange(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs()))
.compose(kafkaAndNodePools -> prepareKafkaCluster(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs(), oldStorage, currentPods, versionChange, false));
.compose(kafkaAndNodePools -> prepareKafkaCluster(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs(), oldStorage, currentPods, versionChange, kafkaStatus, false));
} else if (checkFailed()) {
// We have a failure, but we should not try to fix it
List<String> errors = new ArrayList<>();
@@ -126,6 +131,11 @@
return Future.failedFuture(new InvalidResourceException("Following errors were found when processing the Kafka custom resource: " + errors));
} else {
// If everything succeeded, we return the KafkaCluster object
// If any warning conditions exist from the reverted changes, we add them to the status
if (!warningConditions.isEmpty()) {
kafkaStatus.addConditions(warningConditions);
}

return Future.succeededFuture(kafka);
}
});
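The chain above implements a fix-once pattern: on the first pass tryToFixProblems is true, recoverable problems are reverted and prepareKafkaCluster() calls itself again with tryToFixProblems set to false, so a repeated failure becomes a hard error. The following is a simplified, self-contained sketch of that control flow using plain Vert.x futures; names and replica counts are illustrative, and the real code uses compose chains with check flags rather than recover:

```java
import io.vertx.core.Future;

public class FixOnceSketch {
    /** Pretend validation: fails while the desired replica count is below the current one. */
    static Future<Integer> validate(int desiredReplicas, int currentReplicas) {
        return desiredReplicas < currentReplicas
                ? Future.failedFuture(new IllegalStateException("Scale-down not allowed"))
                : Future.succeededFuture(desiredReplicas);
    }

    /** Mirrors the prepareKafkaCluster() idea: fix a recoverable problem once, then re-run the check. */
    static Future<Integer> prepare(int desiredReplicas, int currentReplicas, boolean tryToFixProblems) {
        return validate(desiredReplicas, currentReplicas)
                .recover(error -> {
                    if (tryToFixProblems) {
                        // "Revert" the scale-down by keeping the current replica count,
                        // then call ourselves again with tryToFixProblems = false
                        return prepare(currentReplicas, currentReplicas, false);
                    } else {
                        // Second failure => give up, as the operator does with InvalidResourceException
                        return Future.failedFuture(error);
                    }
                });
    }

    public static void main(String[] args) {
        prepare(3, 5, true)
                .onSuccess(replicas -> System.out.println("Reconciling with " + replicas + " replicas"))
                .onFailure(error -> System.out.println("Failed: " + error.getMessage()));
    }
}
```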
@@ -209,6 +219,7 @@ private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafk
if (nodePoolCrs == null || nodePoolCrs.isEmpty()) {
// There are no node pools => the Kafka CR is used
int newReplicasCount = kafkaCr.getSpec().getKafka().getReplicas() + kafka.removedNodes().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of Kafka " + kafkaCr.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of Kafka {} by changing number of replicas to {}", kafkaCr.getMetadata().getName(), newReplicasCount);

Kafka newKafkaCr = new KafkaBuilder(kafkaCr)
@@ -230,6 +241,7 @@
&& nodePool.getStatus().getNodeIds() != null
&& nodePool.getSpec().getReplicas() < nodePool.getStatus().getNodeIds().size()) {
int newReplicasCount = nodePool.getStatus().getNodeIds().size();
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting scale-down of KafkaNodePool " + nodePool.getMetadata().getName() + " by changing number of replicas to " + newReplicasCount));
LOGGER.warnCr(reconciliation, "Reverting scale-down of KafkaNodePool {} by changing number of replicas to {}", nodePool.getMetadata().getName(), newReplicasCount);
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
@@ -266,6 +278,7 @@ private Future<KafkaAndNodePools> revertRoleChange(Kafka kafkaCr, List<KafkaNode
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& !nodePool.getSpec().getRoles().contains(ProcessRoles.BROKER)) {
warningConditions.add(StatusUtils.buildWarningCondition("ScaleDownPreventionCheck", "Reverting role change of KafkaNodePool " + nodePool.getMetadata().getName() + " by adding the broker role to it"));
LOGGER.warnCr(reconciliation, "Reverting role change of KafkaNodePool {} by adding the broker role to it", nodePool.getMetadata().getName());
newNodePools.add(
new KafkaNodePoolBuilder(nodePool)
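StatusUtils.buildWarningCondition() itself is not part of this diff. As a rough stand-in for what such a helper presumably produces, here is a sketch built with the generated ConditionBuilder from strimzi-api; the condition's exact shape (type "Warning", status "True", timestamp format) is an assumption, not the actual StatusUtils implementation:

```java
import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.common.ConditionBuilder;

import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

public class WarningConditionBuilderSketch {
    /** Approximation of a warning condition like the ones added by revertScaleDown() and revertRoleChange(). */
    static Condition buildWarningCondition(String reason, String message) {
        return new ConditionBuilder()
                .withType("Warning")
                .withStatus("True")
                .withReason(reason)
                .withMessage(message)
                .withLastTransitionTime(ZonedDateTime.now().format(DateTimeFormatter.ISO_OFFSET_DATE_TIME))
                .build();
    }

    public static void main(String[] args) {
        Condition warning = buildWarningCondition(
                "ScaleDownPreventionCheck",
                "Reverting role change of KafkaNodePool pool-a by adding the broker role to it");
        System.out.println(warning.getType() + "/" + warning.getStatus() + " "
                + warning.getReason() + ": " + warning.getMessage());
    }
}
```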
