Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Integer instead of int in KafkaClusterSpec replicas #9663

Merged
merged 3 commits into from
Feb 11, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public class KafkaClusterSpec implements HasConfigurableMetrics, HasConfigurable
private String brokerRackInitImage;
private Rack rack;
private Logging logging;
private int replicas;
private Integer replicas;
private String image;
private ResourceRequirements resources;
private Probe livenessProbe;
Expand Down Expand Up @@ -164,11 +164,11 @@ public void setLogging(Logging logging) {
@Description("The number of pods in the cluster. " +
"This property is required when node pools are not used.")
@Minimum(1)
public int getReplicas() {
public Integer getReplicas() {
return replicas;
}

public void setReplicas(int replicas) {
public void setReplicas(Integer replicas) {
this.replicas = replicas;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public static void validateKafkaCrForZooKeeper(KafkaSpec kafkaSpec, boolean node
}

if (!nodePoolsEnabled) {
if (kafkaSpec.getKafka().getReplicas() == 0) {
if (kafkaSpec.getKafka().getReplicas() == null || kafkaSpec.getKafka().getReplicas() == 0) {
errors.add("The .spec.kafka.replicas property of the Kafka custom resource is missing. " +
"This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools.");
}
Expand Down Expand Up @@ -136,6 +136,7 @@ public static void kraftWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
*/
public static void nodePoolWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
if (kafkaCr.getSpec().getKafka() != null
&& kafkaCr.getSpec().getKafka().getReplicas() != null
&& kafkaCr.getSpec().getKafka().getReplicas() > 0) {
kafkaStatus.addCondition(StatusUtils.buildWarningCondition("UnusedReplicasConfiguration",
"The .spec.kafka.replicas property in the Kafka custom resource is ignored when node pools " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;

import java.util.List;
import java.util.Optional;
import java.util.stream.IntStream;

/**
Expand Down Expand Up @@ -48,7 +49,7 @@ public static KafkaNodePool convertKafkaToVirtualNodePool(Kafka kafka, Integer e
.withLabels(kafka.getMetadata().getLabels())
.endMetadata()
.withNewSpec()
.withReplicas(kafka.getSpec().getKafka().getReplicas())
.withReplicas(Optional.ofNullable(kafka.getSpec().getKafka().getReplicas()).orElse(0))
im-konge marked this conversation as resolved.
Show resolved Hide resolved
.withStorage(kafka.getSpec().getKafka().getStorage())
.withRoles(List.of(ProcessRoles.BROKER)) // We do not need to care about the controller role here since this is only used with ZooKeeper based clusters
.withResources(kafka.getSpec().getKafka().getResources())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -208,7 +209,7 @@ private Future<KafkaAndNodePools> revertScaleDown(KafkaCluster kafka, Kafka kafk
if (scaleDownCheckFailed) {
if (nodePoolCrs == null || nodePoolCrs.isEmpty()) {
// There are no node pools => the Kafka CR is used
int newReplicasCount = kafkaCr.getSpec().getKafka().getReplicas() + kafka.removedNodes().size();
int newReplicasCount = Optional.ofNullable(kafkaCr.getSpec().getKafka().getReplicas()).orElse(0) + kafka.removedNodes().size();
im-konge marked this conversation as resolved.
Show resolved Hide resolved
LOGGER.warnCr(reconciliation, "Reverting scale-down of Kafka {} by changing number of replicas to {}", kafkaCr.getMetadata().getName(), newReplicasCount);

Kafka newKafkaCr = new KafkaBuilder(kafkaCr)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public static Secret createInitialCaKeySecret(String clusterNamespace, String cl
.build();
}

public static Kafka createKafka(String namespace, String name, int replicas,
public static Kafka createKafka(String namespace, String name, Integer replicas,
im-konge marked this conversation as resolved.
Show resolved Hide resolved
String image, int healthDelay, int healthTimeout,
MetricsConfig metricsConfig,
Map<String, Object> kafkaConfigurationJson,
Expand All @@ -251,7 +251,7 @@ healthTimeout, metricsConfig, kafkaConfigurationJson, emptyMap()))
}

@SuppressWarnings({"checkstyle:ParameterNumber"})
public static Kafka createKafka(String namespace, String name, int replicas,
public static Kafka createKafka(String namespace, String name, Integer replicas,
String image, int healthDelay, int healthTimeout,
MetricsConfig metricsConfig,
Map<String, Object> kafkaConfiguration,
Expand Down