Skip to content

Commit

Permalink
Reflect the current roles when rolling Kafka nodes in KafkaRoller - C…
Browse files Browse the repository at this point in the history
…loses #9434

Signed-off-by: Jakub Scholz <www@scholzj.com>
  • Loading branch information
scholzj committed Feb 14, 2024
1 parent 891624a commit 3861b0d
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 21 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Remove support for Apache Kafka 3.5.0 and 3.5.1
* The `UseKRaft` feature gate moves to beta stage and is enabled by default.
If needed, `UseKRaft` can be disabled in the feature gates configuration in the Cluster Operator.
* Add support for moving from dedicated controller-only KRaft nodes to mixed KRaft nodes
* Fix NullPointerException from missing listenerConfig when using custom auth
* Added support for Kafka Exporter `offset.show-all` parameter
* Prevent removal of the `broker` process role from KRaft mixed-nodes that have assigned partition-replicas
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.fabric8.kubernetes.api.model.ContainerStateWaiting;
import io.fabric8.kubernetes.api.model.ContainerStatus;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.Secret;
import io.fabric8.kubernetes.client.KubernetesClientException;
Expand All @@ -24,6 +25,7 @@
import io.strimzi.operator.common.ReconciliationLogger;
import io.strimzi.operator.common.Util;
import io.strimzi.operator.common.VertxUtil;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.model.OrderedProperties;
import io.strimzi.operator.common.operator.resource.PodOperator;
import io.vertx.core.Future;
Expand Down Expand Up @@ -458,16 +460,21 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)

restartContext.restartReasons = podNeedsRestart.apply(pod);

// We try to detect the current roles. If we fail to do so, we optimistically assume the roles did not
// change and the desired roles still apply.
boolean isBroker = Optional.ofNullable(isAlreadyBroker(pod)).orElse(nodeRef.broker());
boolean isController = Optional.ofNullable(isAlreadyController(pod)).orElse(nodeRef.controller());

try {
checkIfRestartOrReconfigureRequired(nodeRef, restartContext);
checkIfRestartOrReconfigureRequired(nodeRef, isController, isBroker, restartContext);
if (restartContext.forceRestart) {
LOGGER.debugCr(reconciliation, "Pod {} can be rolled now", nodeRef);
restartAndAwaitReadiness(pod, operationTimeoutMs, TimeUnit.MILLISECONDS, restartContext);
} else if (restartContext.needsRestart || restartContext.needsReconfig) {
if (deferController(nodeRef, restartContext)) {
LOGGER.debugCr(reconciliation, "Pod {} is the active controller and there are other pods to verify first.", nodeRef);
throw new ForceableProblem("Pod " + nodeRef.podName() + " is the active controller and there are other pods to verify first");
} else if (!canRoll(nodeRef, 60, TimeUnit.SECONDS, false, restartContext)) {
} else if (!canRoll(nodeRef.nodeId(), isController, isBroker, 60, TimeUnit.SECONDS, false, restartContext)) {
LOGGER.debugCr(reconciliation, "Pod {} cannot be updated right now", nodeRef);
throw new UnforceableProblem("Pod " + nodeRef.podName() + " cannot be updated right now.");
} else {
Expand All @@ -491,7 +498,7 @@ private void restartIfNecessary(NodeRef nodeRef, RestartContext restartContext)
} catch (ForceableProblem e) {
if (restartContext.podStuck || restartContext.backOff.done() || e.forceNow) {

if (canRoll(nodeRef, 60_000, TimeUnit.MILLISECONDS, true, restartContext)) {
if (canRoll(nodeRef.nodeId(), isController, isBroker, 60_000, TimeUnit.MILLISECONDS, true, restartContext)) {
String errorMsg = e.getMessage();

if (e.getCause() != null) {
Expand Down Expand Up @@ -589,7 +596,7 @@ private void markRestartContextWithForceRestart(RestartContext restartContext) {
* Determine whether the pod should be restarted, or the broker reconfigured.
*/
@SuppressWarnings("checkstyle:CyclomaticComplexity")
private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, boolean isController, boolean isBroker, RestartContext restartContext) throws ForceableProblem, InterruptedException, FatalProblem, UnforceableProblem {
RestartReasons reasonToRestartPod = restartContext.restartReasons;
if (restartContext.podStuck && !reasonToRestartPod.contains(RestartReason.POD_HAS_OLD_REVISION)) {
// If the pod is unschedulable then deleting it, or trying to open an Admin client to it will make no difference
Expand All @@ -609,8 +616,8 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
KafkaBrokerConfigurationDiff brokerConfigDiff = null;
KafkaBrokerLoggingConfigurationDiff brokerLoggingDiff = null;
boolean needsReconfig = false;
if (nodeRef.controller()) {

if (isController) {
if (maybeInitControllerAdminClient()) {
String controllerQuorumFetchTimeout = CONTROLLER_QUORUM_FETCH_TIMEOUT_MS_CONFIG_DEFAULT;
String desiredConfig = kafkaConfigProvider.apply(nodeRef.nodeId());
Expand All @@ -624,7 +631,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
} else {
//TODO When https://github.com/strimzi/strimzi-kafka-operator/issues/8593 is complete
// we should change this logic to immediately restart this pod because we cannot connect to it.
if (nodeRef.broker()) {
if (isBroker) {
// If it is a combined node (controller and broker) and the admin client cannot be initialised,
// restart this pod. There is no reason to continue as we won't be able to
// connect an admin client to this pod for other checks later.
Expand All @@ -641,8 +648,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
}
}

if (nodeRef.broker()) {

if (isBroker) {
if (!maybeInitBrokerAdminClient()) {
LOGGER.infoCr(reconciliation, "Pod {} needs to be restarted, because it does not seem to responding to connection attempts", nodeRef);
reasonToRestartPod.add(RestartReason.POD_UNRESPONSIVE);
Expand Down Expand Up @@ -700,7 +706,7 @@ private void checkIfRestartOrReconfigureRequired(NodeRef nodeRef, RestartContext
* @param nodeRef The reference of the broker.
* @return a Future which completes with the config of the given broker.
*/
protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
/* test */ Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, InterruptedException {
ConfigResource resource = new ConfigResource(ConfigResource.Type.BROKER, String.valueOf(nodeRef.nodeId()));
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
30, TimeUnit.SECONDS,
Expand All @@ -713,15 +719,15 @@ protected Config brokerConfig(NodeRef nodeRef) throws ForceableProblem, Interrup
* @param brokerId The id of the broker.
* @return a Future which completes with the logging of the given broker.
*/
protected Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
/* test */ Config brokerLogging(int brokerId) throws ForceableProblem, InterruptedException {
ConfigResource resource = Util.getBrokersLogging(brokerId);
return await(VertxUtil.kafkaFutureToVertxFuture(reconciliation, vertx, brokerAdminClient.describeConfigs(singletonList(resource)).values().get(resource)),
30, TimeUnit.SECONDS,
error -> new ForceableProblem("Error getting broker logging", error)
);
}

protected void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
/* test */ void dynamicUpdateBrokerConfig(NodeRef nodeRef, Admin ac, KafkaBrokerConfigurationDiff configurationDiff, KafkaBrokerLoggingConfigurationDiff logDiff)
throws ForceableProblem, InterruptedException {
Map<ConfigResource, Collection<AlterConfigOp>> updatedConfig = new HashMap<>(2);
var podId = nodeRef.nodeId();
Expand Down Expand Up @@ -804,20 +810,20 @@ public FatalProblem(String message) {
}
}

private boolean canRoll(NodeRef nodeRef, long timeout, TimeUnit unit, boolean ignoreSslError, RestartContext restartContext)
private boolean canRoll(int nodeId, boolean isController, boolean isBroker, long timeout, TimeUnit unit, boolean ignoreSslError, RestartContext restartContext)
throws ForceableProblem, InterruptedException, UnforceableProblem {
try {
if (nodeRef.broker() && nodeRef.controller()) {
boolean canRollController = await(restartContext.quorumCheck.canRollController(nodeRef.nodeId()), timeout, unit,
if (isBroker && isController) {
boolean canRollController = await(restartContext.quorumCheck.canRollController(nodeId), timeout, unit,
t -> new UnforceableProblem("An error while trying to determine the possibility of updating Kafka controller pods", t));
boolean canRollBroker = await(availability(brokerAdminClient).canRoll(nodeRef.nodeId()), timeout, unit,
boolean canRollBroker = await(availability(brokerAdminClient).canRoll(nodeId), timeout, unit,
t -> new ForceableProblem("An error while trying to determine the possibility of updating Kafka broker pods", t));
return canRollController && canRollBroker;
} else if (nodeRef.controller()) {
return await(restartContext.quorumCheck.canRollController(nodeRef.nodeId()), timeout, unit,
} else if (isController) {
return await(restartContext.quorumCheck.canRollController(nodeId), timeout, unit,
t -> new UnforceableProblem("An error while trying to determine the possibility of updating Kafka controller pods", t));
} else {
return await(availability(brokerAdminClient).canRoll(nodeRef.nodeId()), timeout, unit,
return await(availability(brokerAdminClient).canRoll(nodeId), timeout, unit,
t -> new ForceableProblem("An error while trying to determine the possibility of updating Kafka broker pods", t));
}
} catch (ForceableProblem | UnforceableProblem e) {
Expand Down Expand Up @@ -909,7 +915,7 @@ protected Future<Void> restart(Pod pod, RestartContext restartContext) {
* Returns an AdminClient instance bootstrapped from the given nodes. If nodes is an
* empty set, use the brokers service to bootstrap the client.
*/
protected Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
/* test */ Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws ForceableProblem, FatalProblem {
// If no nodes are passed initialize the admin client using the brokers service
// TODO when https://github.com/strimzi/strimzi-kafka-operator/issues/8593 is completed review whether
// this function can be reverted to expect nodes to be non empty
Expand All @@ -935,11 +941,11 @@ protected Admin adminClient(Set<NodeRef> nodes, boolean ceShouldBeFatal) throws
}
}

protected KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
/* test */ KafkaQuorumCheck quorumCheck(Admin ac, long controllerQuorumFetchTimeoutMs) {
return new KafkaQuorumCheck(reconciliation, ac, vertx, controllerQuorumFetchTimeoutMs);
}

protected KafkaAvailability availability(Admin ac) {
/* test */ KafkaAvailability availability(Admin ac) {
return new KafkaAvailability(reconciliation, ac);
}

Expand Down Expand Up @@ -1047,5 +1053,49 @@ protected Future<Void> isReady(String namespace, String podName) {
return Future.failedFuture(error);
});
}

/**
 * Checks from the Pod labels whether the Kafka node currently runs as a broker.
 *
 * @param pod   Current Pod (may be null)
 *
 * @return  True if the Pod is already a broker, False if it is currently not a broker, or null if we don't know
 *          (e.g. the Pod or the role label is missing)
 */
/* test */ static Boolean isAlreadyBroker(Pod pod) {
    return checkBooleanLabel(pod, Labels.STRIMZI_BROKER_ROLE_LABEL);
}

/**
 * Checks from the Pod labels whether the Kafka node currently runs as a controller.
 *
 * @param pod   Current Pod (may be null)
 *
 * @return  True if the Pod is already a controller, False if it is currently not a controller, or null if we don't
 *          know (e.g. the Pod or the role label is missing)
 */
/* test */ static Boolean isAlreadyController(Pod pod) {
    return checkBooleanLabel(pod, Labels.STRIMZI_CONTROLLER_ROLE_LABEL);
}

/**
 * Generic method to extract a boolean value from Kubernetes resource labels
 *
 * @param resource  Kube resource with metadata (may be null)
 * @param labelKey  Name of the label for which we want to extract the boolean value
 *
 * @return  True if the label is present and is set to `true` (case-insensitive). False if it is present and not
 *          set to `true`. Null if the resource, its metadata, its labels, or the label itself are missing.
 */
private static Boolean checkBooleanLabel(HasMetadata resource, String labelKey) {
    if (resource != null
            && resource.getMetadata() != null
            && resource.getMetadata().getLabels() != null
            && resource.getMetadata().getLabels().containsKey(labelKey)) {
        // Any value other than "true" (ignoring case) is treated as false
        return "true".equalsIgnoreCase(resource.getMetadata().getLabels().get(labelKey));
    } else {
        // Missing label (or missing metadata) => we cannot tell, so return null rather than guessing
        return null;
    }
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.strimzi.operator.common.BackOff;
import io.strimzi.operator.common.DefaultAdminClientProvider;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.Labels;
import io.strimzi.operator.common.operator.resource.PodOperator;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
Expand Down Expand Up @@ -68,6 +69,7 @@
import static java.util.Collections.singletonList;
import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
Expand Down Expand Up @@ -704,6 +706,33 @@ podOps, noException(), null, noException(), noException(), noException(),
asList(7, 4, 3, 5, 6, 8, 1, 0, 2)); //Rolls in order: unready controllers, ready controllers, unready brokers, ready brokers
}

@Test
public void testExistingRoles() {
    // No pod at all => role is unknown (null)
    assertThat(KafkaRoller.isAlreadyBroker(null), is(nullValue()));
    assertThat(KafkaRoller.isAlreadyController(null), is(nullValue()));

    // Pod exists but has no role labels => role is unknown (null)
    Pod pod = new PodBuilder().withNewMetadata().withName("my-pod").endMetadata().build();
    assertThat(KafkaRoller.isAlreadyBroker(pod), is(nullValue()));
    assertThat(KafkaRoller.isAlreadyController(pod), is(nullValue()));

    // Labels set to an unparseable value => anything other than "true" is treated as false
    pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "grr", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "meh")).endMetadata().build();
    assertThat(KafkaRoller.isAlreadyBroker(pod), is(false));
    assertThat(KafkaRoller.isAlreadyController(pod), is(false));

    // Labels explicitly set to "false" => false
    pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "false", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "false")).endMetadata().build();
    assertThat(KafkaRoller.isAlreadyBroker(pod), is(false));
    assertThat(KafkaRoller.isAlreadyController(pod), is(false));

    // Labels set to "true" => true
    pod = new PodBuilder().withNewMetadata().withName("my-pod").withLabels(Map.of(Labels.STRIMZI_BROKER_ROLE_LABEL, "true", Labels.STRIMZI_CONTROLLER_ROLE_LABEL, "true")).endMetadata().build();
    assertThat(KafkaRoller.isAlreadyBroker(pod), is(true));
    assertThat(KafkaRoller.isAlreadyController(pod), is(true));
}

private TestingKafkaRoller rollerWithControllers(PodOperator podOps, int... controllers) {
return new TestingKafkaRoller(null, null, addPodNames(KafkaRollerTest.REPLICAS), podOps,
noException(), null, noException(), noException(), noException(),
Expand Down

0 comments on commit 3861b0d

Please sign in to comment.