Skip to content

Commit

Permalink
Add tests
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj committed Jan 26, 2024
1 parent e9ed687 commit ff5eb9f
Show file tree
Hide file tree
Showing 12 changed files with 952 additions and 331 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.strimzi.api.kafka.model.kafka.Storage;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.KafkaPool;
Expand Down Expand Up @@ -103,11 +104,12 @@ public Future<KafkaCluster> prepareKafkaCluster(
.compose(kafka -> {
if (scaleDownCheckFailed && tryToFixProblems) {
// We have a failure, and should try to fix issues
// Once we fix it, we call this method again, but this time with tryToFixProblems set to false
return revertScaleDown(kafka, kafkaCr, nodePools)
.compose(kafkaAndNodePools -> prepareKafkaCluster(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePools(), oldStorage, currentPods, versionChange, false));
.compose(kafkaAndNodePools -> prepareKafkaCluster(kafkaAndNodePools.kafkaCr(), kafkaAndNodePools.nodePoolCrs(), oldStorage, currentPods, versionChange, false));
} else if (scaleDownCheckFailed) {
// We have a failure, but we should not try to fix it
return Future.failedFuture(new InvalidResourceException("Cannot scale-down Kafka brokers because they have assigned partition-replicas."));
return Future.failedFuture(new InvalidResourceException("Cannot scale-down Kafka brokers " + kafka.removedNodes() + " because they have assigned partition-replicas."));
} else {
// If something else failed, we just re-throw it
return Future.succeededFuture(kafka);
Expand All @@ -119,7 +121,7 @@ public Future<KafkaCluster> prepareKafkaCluster(
* Creates a new Kafka cluster
*
* @param kafkaCr Kafka custom resource
* @param nodePools List with KafkaNodePool custom resources
* @param nodePoolCrs List with KafkaNodePool custom resources
* @param oldStorage Old storage configuration
* @param currentPods Current Kafka pods
* @param versionChange Version change descriptor containing any upgrade / downgrade changes
Expand All @@ -128,12 +130,12 @@ public Future<KafkaCluster> prepareKafkaCluster(
*/
private Future<KafkaCluster> createKafkaCluster(
Kafka kafkaCr,
List<KafkaNodePool> nodePools,
List<KafkaNodePool> nodePoolCrs,
Map<String, Storage> oldStorage,
Map<String, List<String>> currentPods,
KafkaVersionChange versionChange
) {
return Future.succeededFuture(createKafkaCluster(reconciliation, kafkaCr, nodePools, oldStorage, currentPods, versionChange, useKRaftFGEnabled, versions, sharedEnvironmentProvider));
return Future.succeededFuture(createKafkaCluster(reconciliation, kafkaCr, nodePoolCrs, oldStorage, currentPods, versionChange, useKRaftFGEnabled, versions, sharedEnvironmentProvider));
}

/**
Expand Down Expand Up @@ -169,12 +171,12 @@ private Future<KafkaCluster> brokerScaleDownCheck(Kafka kafkaCr, KafkaCluster ka
*
* @param kafka Instance of the Kafka cluster model that contains information needed to revert the changes
* @param kafkaCr Kafka custom resource
* @param nodePools List with KafkaNodePool custom resources
* @param nodePoolCrs List with KafkaNodePool custom resources
*
* @return Future with KafkaAndNodePools record containing the fixed Kafka and KafkaNodePool CRs
*/
private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafkaCr, List<KafkaNodePool> nodePools) {
if (nodePools == null || nodePools.isEmpty()) {
private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) {
if (nodePoolCrs == null || nodePoolCrs.isEmpty()) {
// There are no node pools => the Kafka CR is used
int newReplicasCount = kafkaCr.getSpec().getKafka().getReplicas() + kafka.removedNodes().size();
LOGGER.warnCr(reconciliation, "Reverting scale-down of Kafka {} by changing number of replicas to {}", kafkaCr.getMetadata().getName(), newReplicasCount);
Expand All @@ -187,13 +189,14 @@ private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafk
.endSpec()
.build();

return Future.succeededFuture(new KafkaAndNodePools(newKafkaCr, nodePools));
return Future.succeededFuture(new KafkaAndNodePools(newKafkaCr, nodePoolCrs));
} else {
// Node pools are used -> we have to fix scale down in the KafkaNodePools
List<KafkaNodePool> newNodePools = new ArrayList<>();

for (KafkaNodePool nodePool : nodePools) {
for (KafkaNodePool nodePool : nodePoolCrs) {
if (nodePool.getStatus() != null
&& nodePool.getStatus().getRoles().contains(ProcessRoles.BROKER)
&& nodePool.getStatus().getNodeIds() != null
&& nodePool.getSpec().getReplicas() < nodePool.getStatus().getNodeIds().size()) {
int newReplicasCount = nodePool.getStatus().getNodeIds().size();
Expand Down Expand Up @@ -230,7 +233,7 @@ private static boolean skipBrokerScaleDownCheck(Kafka kafkaCr) {
*
* @param reconciliation Reconciliation marker
* @param kafkaCr Kafka custom resource
* @param nodePools KafkaNodePool custom resources
* @param nodePoolCrs KafkaNodePool custom resources
* @param oldStorage Old storage configuration
* @param currentPods List of current Kafka pods
* @param versionChange Version change descriptor containing any upgrade / downgrade changes
Expand All @@ -240,10 +243,10 @@ private static boolean skipBrokerScaleDownCheck(Kafka kafkaCr) {
*
* @return New KafkaCluster object
*/
static KafkaCluster createKafkaCluster(
public static KafkaCluster createKafkaCluster(
Reconciliation reconciliation,
Kafka kafkaCr,
List<KafkaNodePool> nodePools,
List<KafkaNodePool> nodePoolCrs,
Map<String, Storage> oldStorage,
Map<String, List<String>> currentPods,
KafkaVersionChange versionChange,
Expand All @@ -252,16 +255,16 @@ static KafkaCluster createKafkaCluster(
SharedEnvironmentProvider sharedEnvironmentProvider
) {
boolean isKRaftEnabled = useKRaftEnabled && ReconcilerUtils.kraftEnabled(kafkaCr);
List<KafkaPool> pools = NodePoolUtils.createKafkaPools(reconciliation, kafkaCr, nodePools, oldStorage, currentPods, isKRaftEnabled, sharedEnvironmentProvider);
String clusterId = isKRaftEnabled ? NodePoolUtils.getOrGenerateKRaftClusterId(kafkaCr, nodePools) : NodePoolUtils.getClusterIdIfSet(kafkaCr, nodePools);
List<KafkaPool> pools = NodePoolUtils.createKafkaPools(reconciliation, kafkaCr, nodePoolCrs, oldStorage, currentPods, isKRaftEnabled, sharedEnvironmentProvider);
String clusterId = isKRaftEnabled ? NodePoolUtils.getOrGenerateKRaftClusterId(kafkaCr, nodePoolCrs) : NodePoolUtils.getClusterIdIfSet(kafkaCr, nodePoolCrs);
return KafkaCluster.fromCrd(reconciliation, kafkaCr, pools, versions, versionChange, isKRaftEnabled, clusterId, sharedEnvironmentProvider);
}

/**
* Utility record to pass fixed custom resources between methods
*
* @param kafkaCr Kafka custom resource
* @param nodePools List of KafkaNodePool resources
* @param nodePoolCrs List of KafkaNodePool resources
*/
record KafkaAndNodePools(Kafka kafkaCr, List<KafkaNodePool> nodePools) { }
record KafkaAndNodePools(Kafka kafkaCr, List<KafkaNodePool> nodePoolCrs) { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import io.strimzi.operator.cluster.model.KafkaCluster;
import io.strimzi.operator.cluster.model.KafkaConfiguration;
import io.strimzi.operator.cluster.model.KafkaPool;
import io.strimzi.operator.cluster.model.KafkaVersionChange;
import io.strimzi.operator.cluster.model.ListenersUtils;
import io.strimzi.operator.cluster.model.MetricsAndLogging;
import io.strimzi.operator.cluster.model.NodeRef;
Expand Down Expand Up @@ -220,53 +219,6 @@ public KafkaReconciler(
this.adminClientProvider = supplier.adminClientProvider;
}

/**
* Constructs the Kafka reconciler
*
* @param reconciliation Reconciliation marker
* @param kafkaCr The Kafka custom resource
* @param nodePools List of KafkaNodePool resources belonging to this cluster
* @param oldStorage Maps with old storage configurations, where the key is the name of the controller
* resource (e.g. my-cluster-pool-a) and the value is the current storage configuration.
* @param currentPods Map with current pods, where the key is the name of the controller resource
* (e.g. my-cluster-pool-a) and the value is a list with Pod names
* @param clusterCa The Cluster CA instance
* @param clientsCa The Clients CA instance
* @param versionChange Description of Kafka upgrade / downgrade state
* @param config Cluster Operator Configuration
* @param supplier Supplier with Kubernetes Resource Operators
* @param pfa PlatformFeaturesAvailability describing the environment we run in
* @param vertx Vert.x instance
*/
public KafkaReconciler(
Reconciliation reconciliation,
Kafka kafkaCr,
List<KafkaNodePool> nodePools,
Map<String, Storage> oldStorage,
Map<String, List<String>> currentPods,
ClusterCa clusterCa,
ClientsCa clientsCa,
KafkaVersionChange versionChange,
ClusterOperatorConfig config,
ResourceOperatorSupplier supplier,
PlatformFeaturesAvailability pfa,
Vertx vertx
) {
// TODO: This constructor will be removed once tests are fixed
this(
reconciliation,
kafkaCr,
nodePools,
KafkaClusterCreator.createKafkaCluster(reconciliation, kafkaCr, nodePools, oldStorage, currentPods, versionChange, config.featureGates().useKRaftEnabled(), config.versions(), supplier.sharedEnvironmentProvider),
clusterCa,
clientsCa,
config,
supplier,
pfa,
vertx
);
}

/**
* The main reconciliation method which triggers the whole reconciliation pipeline. This is the method which is
* expected to be called from the outside to trigger the reconciliation.
Expand All @@ -279,7 +231,6 @@ public KafkaReconciler(
*/
public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
return modelWarnings(kafkaStatus)
//.compose(i -> brokerScaleDownCheck())
.compose(i -> manualPodCleaning())
.compose(i -> networkPolicy())
.compose(i -> manualRollingUpdate())
Expand Down Expand Up @@ -309,11 +260,6 @@ public Future<Void> reconcile(KafkaStatus kafkaStatus, Clock clock) {
.compose(i -> updateKafkaVersion(kafkaStatus));
}

protected Future<Void> brokerScaleDownCheck() {
// TODO: This will be removed once the tests are fixed
return Future.succeededFuture();
}

/**
* Takes the warning conditions from the Model and adds them in the KafkaStatus
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,9 +178,8 @@ public void testNoManualRollingUpdateWithPodSets(VertxTestContext context) {
supplier,
new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
kafka,
null, VERSION_CHANGE,
Map.of(),
Map.of(),
null,
kafkaCluster,
null,
null);

Expand Down Expand Up @@ -280,9 +279,8 @@ public void testManualRollingUpdateWithPodSets(VertxTestContext context) {
supplier,
new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
kafka,
null, VERSION_CHANGE,
Map.of(),
Map.of(),
null,
kafkaCluster,
null,
null);

Expand Down Expand Up @@ -396,9 +394,8 @@ public void testManualPodRollingUpdateWithPodSets(VertxTestContext context) {
supplier,
new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
kafka,
null, VERSION_CHANGE,
Map.of(),
Map.of(),
null,
kafkaCluster,
null,
null);

Expand Down Expand Up @@ -546,9 +543,7 @@ public void testManualPodRollingUpdateWithNodePools(VertxTestContext context) {
new PlatformFeaturesAvailability(false, KUBERNETES_VERSION),
kafka,
List.of(poolA, poolB),
VERSION_CHANGE,
Map.of(),
Map.of(),
kafkaCluster,
null,
null);

Expand Down Expand Up @@ -660,8 +655,8 @@ static class MockKafkaReconciler extends KafkaReconciler {
Function<Pod, RestartReasons> kafkaRestartReasons = null;
List<String> kafkaNodesNeedRestart = new ArrayList<>();

public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, PlatformFeaturesAvailability pfa, Kafka kafkaAssembly, List<KafkaNodePool> nodePools, KafkaVersionChange versionChange, Map<String, Storage> oldStorage, Map<String, List<String>> currentPods, ClusterCa clusterCa, ClientsCa clientsCa) {
super(reconciliation, kafkaAssembly, nodePools, oldStorage, currentPods, clusterCa, clientsCa, versionChange, config, supplier, pfa, vertx);
public MockKafkaReconciler(Reconciliation reconciliation, Vertx vertx, ClusterOperatorConfig config, ResourceOperatorSupplier supplier, PlatformFeaturesAvailability pfa, Kafka kafkaAssembly, List<KafkaNodePool> nodePools, KafkaCluster kafkaCluster, ClusterCa clusterCa, ClientsCa clientsCa) {
super(reconciliation, kafkaAssembly, nodePools, kafkaCluster, clusterCa, clientsCa, config, supplier, pfa, vertx);
}

@Override
Expand Down

0 comments on commit ff5eb9f

Please sign in to comment.