Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add warnings when unused fields are set when Node Pools or Kraft are used #9553

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@
*/
package io.strimzi.operator.cluster.model;

import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaSpec;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpec;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.common.model.StatusUtils;
import org.apache.kafka.server.common.MetadataVersion;

import java.util.HashSet;
Expand Down Expand Up @@ -106,4 +109,44 @@ public static void validateKafkaCrForZooKeeper(KafkaSpec kafkaSpec, boolean node
throw new InvalidResourceException("Kafka configuration is not valid: " + errors);
}
}

/**
* Generates Kafka CR status warnings about the fields ignored in Kraft mode if they are set - the ZooKeeper section
* and Kafka replicas and storage configuration.
*
* @param kafkaCr The Kafka custom resource
* @param kafkaStatus The Kafka Status to add the warnings to
*/
public static void kraftWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
if (kafkaCr.getSpec().getZookeeper() != null) {
kafkaStatus.addCondition(StatusUtils.buildWarningCondition("UnusedZooKeeperConfiguration",
"The .spec.zookeeper section in the Kafka custom resource is ignored in KRaft mode and " +
"should be removed from the custom resource."));
}

nodePoolWarnings(kafkaCr, kafkaStatus);
}

/**
* Generates Kafka CR status warnings about the fields ignored when node pools are used if they are set - the
* replicas and storage configuration.
*
* @param kafkaCr The Kafka custom resource
* @param kafkaStatus The Kafka Status to add the warnings to
*/
public static void nodePoolWarnings(Kafka kafkaCr, KafkaStatus kafkaStatus) {
if (kafkaCr.getSpec().getKafka() != null
&& kafkaCr.getSpec().getKafka().getReplicas() > 0) {
kafkaStatus.addCondition(StatusUtils.buildWarningCondition("UnusedReplicasConfiguration",
"The .spec.kafka.replicas property in the Kafka custom resource is ignored when node pools " +
"are used and should be removed from the custom resource."));
}

if (kafkaCr.getSpec().getKafka() != null
&& kafkaCr.getSpec().getKafka().getStorage() != null) {
kafkaStatus.addCondition(StatusUtils.buildWarningCondition("UnusedStorageConfiguration",
"The .spec.kafka.storage section in the Kafka custom resource is ignored when node pools " +
"are used and should be removed from the custom resource."));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -220,13 +220,18 @@ Future<Void> reconcile(ReconciliationState reconcileState) {
// Validates features which are currently not supported in KRaft mode
try {
KRaftUtils.validateKafkaCrForKRaft(reconcileState.kafkaAssembly.getSpec(), featureGates.unidirectionalTopicOperatorEnabled());
KRaftUtils.kraftWarnings(reconcileState.kafkaAssembly, reconcileState.kafkaStatus);
} catch (InvalidResourceException e) {
return Future.failedFuture(e);
}
} else {
// Validates the properties required for a ZooKeeper based Kafka cluster
try {
KRaftUtils.validateKafkaCrForZooKeeper(reconcileState.kafkaAssembly.getSpec(), nodePoolsEnabled);

if (nodePoolsEnabled) {
KRaftUtils.nodePoolWarnings(reconcileState.kafkaAssembly, reconcileState.kafkaStatus);
}
} catch (InvalidResourceException e) {
return Future.failedFuture(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
*/
package io.strimzi.operator.cluster.model;

import io.strimzi.api.kafka.model.common.Condition;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaSpec;
import io.strimzi.api.kafka.model.kafka.KafkaSpecBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaStatus;
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpec;
import io.strimzi.api.kafka.model.kafka.entityoperator.EntityOperatorSpecBuilder;
Expand Down Expand Up @@ -274,4 +278,97 @@ public void testZooBasedClusterWithMissingReplicasAndStorage() {
assertThat(e.getMessage(), containsString("The .spec.kafka.replicas property of the Kafka custom resource is missing. This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."));
assertThat(e.getMessage(), containsString("The .spec.kafka.storage section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."));
}

@ParallelTest
public void testKRaftWarnings() {
Kafka kafka = new KafkaBuilder()
.withNewSpec()
.withNewZookeeper()
.withReplicas(3)
.withNewEphemeralStorage()
.endEphemeralStorage()
.endZookeeper()
.withNewKafka()
.withReplicas(3)
.withListeners(new GenericKafkaListenerBuilder()
.withName("listener")
.withPort(9092)
.withTls(true)
.withType(KafkaListenerType.INTERNAL)
.withNewKafkaListenerAuthenticationTlsAuth()
.endKafkaListenerAuthenticationTlsAuth()
.build())
.withNewEphemeralStorage()
.endEphemeralStorage()
.withNewKafkaAuthorizationOpa()
.withUrl("http://opa:8080")
.endKafkaAuthorizationOpa()
.endKafka()
.endSpec()
.build();

KafkaStatus status = new KafkaStatus();
KRaftUtils.kraftWarnings(kafka, status);

assertThat(status.getConditions().size(), is(3));

Condition condition = status.getConditions().stream().filter(c -> "UnusedZooKeeperConfiguration".equals(c.getReason())).findFirst().orElseThrow();
assertThat(condition.getMessage(), is("The .spec.zookeeper section in the Kafka custom resource is ignored in KRaft mode and should be removed from the custom resource."));
assertThat(condition.getType(), is("Warning"));
assertThat(condition.getStatus(), is("True"));

condition = status.getConditions().stream().filter(c -> "UnusedReplicasConfiguration".equals(c.getReason())).findFirst().orElseThrow();
assertThat(condition.getMessage(), is("The .spec.kafka.replicas property in the Kafka custom resource is ignored when node pools are used and should be removed from the custom resource."));
assertThat(condition.getType(), is("Warning"));
assertThat(condition.getStatus(), is("True"));

condition = status.getConditions().stream().filter(c -> "UnusedStorageConfiguration".equals(c.getReason())).findFirst().orElseThrow();
assertThat(condition.getMessage(), is("The .spec.kafka.storage section in the Kafka custom resource is ignored when node pools are used and should be removed from the custom resource."));
assertThat(condition.getType(), is("Warning"));
assertThat(condition.getStatus(), is("True"));
}

@ParallelTest
public void testZooKeeperWarnings() {
Kafka kafka = new KafkaBuilder()
.withNewSpec()
.withNewZookeeper()
.withReplicas(3)
.withNewEphemeralStorage()
.endEphemeralStorage()
.endZookeeper()
.withNewKafka()
.withReplicas(3)
.withListeners(new GenericKafkaListenerBuilder()
.withName("listener")
.withPort(9092)
.withTls(true)
.withType(KafkaListenerType.INTERNAL)
.withNewKafkaListenerAuthenticationTlsAuth()
.endKafkaListenerAuthenticationTlsAuth()
.build())
.withNewEphemeralStorage()
.endEphemeralStorage()
.withNewKafkaAuthorizationOpa()
.withUrl("http://opa:8080")
.endKafkaAuthorizationOpa()
.endKafka()
.endSpec()
.build();

KafkaStatus status = new KafkaStatus();
KRaftUtils.nodePoolWarnings(kafka, status);

assertThat(status.getConditions().size(), is(2));

Condition condition = status.getConditions().stream().filter(c -> "UnusedReplicasConfiguration".equals(c.getReason())).findFirst().orElseThrow();
assertThat(condition.getMessage(), is("The .spec.kafka.replicas property in the Kafka custom resource is ignored when node pools are used and should be removed from the custom resource."));
assertThat(condition.getType(), is("Warning"));
assertThat(condition.getStatus(), is("True"));

condition = status.getConditions().stream().filter(c -> "UnusedStorageConfiguration".equals(c.getReason())).findFirst().orElseThrow();
assertThat(condition.getMessage(), is("The .spec.kafka.storage section in the Kafka custom resource is ignored when node pools are used and should be removed from the custom resource."));
assertThat(condition.getType(), is("Warning"));
assertThat(condition.getStatus(), is("True"));
}
}