Skip to content

Commit

Permalink
Merge pull request #858 from strimzi/EN417
Browse files Browse the repository at this point in the history
changing metrics setting does not need to restart pods
  • Loading branch information
sknot-rh committed Sep 21, 2018
2 parents f769af0 + 16d1e4e commit ecc2a64
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 6 deletions.
Expand Up @@ -4,6 +4,8 @@
*/
package io.strimzi.operator.cluster.operator.assembly;

import com.fasterxml.jackson.databind.JsonNode;
import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.Doneable;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
Expand All @@ -13,6 +15,7 @@
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.dsl.Resource;
import io.fabric8.zjsonpatch.JsonDiff;
import io.strimzi.certs.CertManager;
import io.strimzi.operator.cluster.InvalidConfigParameterException;
import io.strimzi.operator.common.Reconciliation;
Expand All @@ -35,6 +38,8 @@
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static io.fabric8.kubernetes.client.internal.PatchUtils.patchMapper;

/**
* <p>Abstract assembly creation, update, read, deletion, etc.</p>
*
Expand Down Expand Up @@ -292,4 +297,24 @@ private void handleResult(Reconciliation reconciliation, AsyncResult<Void> resul
}
}
}

/**
* @param current Previsous ConfigMap
* @param desired Desired ConfigMap
* @return Returns true if only metrics settings has been changed
*/
public boolean onlyMetricsSettingChanged(ConfigMap current, ConfigMap desired) {
if ((current == null && desired != null) || (current != null && desired == null)) {
// Metrics were added or deleted. We want rolling update
return false;
}
JsonNode diff = JsonDiff.asJson(patchMapper().valueToTree(current), patchMapper().valueToTree(desired));
boolean onlyMetricsSettingChanged = false;
for (JsonNode d : diff) {
if (d.get("path").asText().equals("/data/metrics-config.yml") && d.get("op").asText().equals("replace")) {
onlyMetricsSettingChanged = true;
}
}
return onlyMetricsSettingChanged && diff.size() == 1;
}
}
Expand Up @@ -57,6 +57,7 @@
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.function.BiFunction;

import static io.strimzi.operator.cluster.model.ModelUtils.findSecretWithName;

Expand Down Expand Up @@ -338,8 +339,33 @@ Future<ReconciliationState> zkHeadlessService() {
return withVoid(serviceOperations.reconcile(namespace, zkCluster.getHeadlessServiceName(), zkHeadlessService));
}

Future<ReconciliationState> getReconciliationStateOfConfigMap(AbstractModel cluster, ConfigMap configMap, BiFunction<Boolean, Future<ReconcileResult<ConfigMap>>, Future<ReconciliationState>> function) {
Future<ReconciliationState> result = Future.future();

vertx.createSharedWorkerExecutor("kubernetes-ops-pool").<Boolean>executeBlocking(
future -> {
ConfigMap current = configMapOperations.get(namespace, cluster.getAncillaryConfigName());
boolean onlyMetricsSettingChanged = onlyMetricsSettingChanged(current, configMap);
future.complete(onlyMetricsSettingChanged);
}, res -> {
if (res.succeeded()) {
boolean onlyMetricsSettingChanged = res.result();
function.apply(onlyMetricsSettingChanged, configMapOperations.reconcile(namespace, cluster.getAncillaryConfigName(), configMap)).setHandler(res2 -> {
if (res2.succeeded()) {
result.complete(res2.result());
} else {
result.fail(res2.cause());
}
});
} else {
result.fail(res.cause());
}
});
return result;
}

Future<ReconciliationState> zkAncillaryCm() {
return withZkAncillaryCmChanged(configMapOperations.reconcile(namespace, zkCluster.getAncillaryConfigName(), zkMetricsAndLogsConfigMap));
return getReconciliationStateOfConfigMap(zkCluster, zkMetricsAndLogsConfigMap, this::withZkAncillaryCmChanged);
}

Future<ReconciliationState> zkNodesSecret() {
Expand Down Expand Up @@ -370,9 +396,14 @@ Future<ReconciliationState> zkHeadlessServiceEndpointReadiness() {
return withVoid(serviceOperations.endpointReadiness(namespace, zkHeadlessService, 1_000, operationTimeoutMs));
}

Future<ReconciliationState> withZkAncillaryCmChanged(Future<ReconcileResult<ConfigMap>> r) {
Future<ReconciliationState> withZkAncillaryCmChanged(boolean onlyMetricsSettingChanged, Future<ReconcileResult<ConfigMap>> r) {
return r.map(rr -> {
this.zkForcedRestart = rr instanceof ReconcileResult.Patched;
if (onlyMetricsSettingChanged) {
log.debug("Only metrics setting changed - not triggering rolling update");
this.zkForcedRestart = false;
} else {
this.zkForcedRestart = rr instanceof ReconcileResult.Patched;
}
return this;
});
}
Expand Down Expand Up @@ -416,9 +447,14 @@ Future<ReconciliationState> withKafkaDiff(Future<ReconcileResult<StatefulSet>> r
});
}

Future<ReconciliationState> withKafkaAncillaryCmChanged(Future<ReconcileResult<ConfigMap>> r) {
Future<ReconciliationState> withKafkaAncillaryCmChanged(boolean onlyMetricsSettingChanged, Future<ReconcileResult<ConfigMap>> r) {
return r.map(rr -> {
this.kafkaForcedRestart = rr instanceof ReconcileResult.Patched;
if (onlyMetricsSettingChanged) {
log.debug("Only metrics setting changed - not triggering rolling update");
this.kafkaForcedRestart = false;
} else {
this.kafkaForcedRestart = rr instanceof ReconcileResult.Patched;
}
return this;
});
}
Expand Down Expand Up @@ -738,7 +774,7 @@ Future<ReconciliationState> kafkaGenerateCertificates() {
}

Future<ReconciliationState> kafkaAncillaryCm() {
return withKafkaAncillaryCmChanged(configMapOperations.reconcile(namespace, kafkaCluster.getAncillaryConfigName(), kafkaMetricsAndLogsConfigMap));
return getReconciliationStateOfConfigMap(kafkaCluster, kafkaMetricsAndLogsConfigMap, this::withKafkaAncillaryCmChanged);
}

Future<ReconciliationState> kafkaClientsCaSecret() {
Expand Down

0 comments on commit ecc2a64

Please sign in to comment.