From 641f38c9125889b46c44662e7c71011b36c96ad7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 23 Nov 2022 12:40:37 +0100 Subject: [PATCH] feat: runtime info for health probes (#1594) --- docs/documentation/features.md | 12 +++ .../io/javaoperatorsdk/operator/Operator.java | 11 ++- .../operator/RegisteredController.java | 5 ++ .../javaoperatorsdk/operator/RuntimeInfo.java | 81 +++++++++++++++++++ .../operator/health/ControllerHealthInfo.java | 50 ++++++++++++ .../health/EventSourceHealthIndicator.java | 6 ++ .../health/InformerHealthIndicator.java | 17 ++++ ...merWrappingEventSourceHealthIndicator.java | 22 +++++ .../operator/health/Status.java | 11 +++ .../operator/processing/Controller.java | 8 ++ .../processing/event/EventSourceManager.java | 10 ++- .../processing/event/EventSources.java | 7 ++ .../processing/event/source/EventSource.java | 9 ++- .../source/informer/InformerManager.java | 25 +++--- .../source/informer/InformerWrapper.java | 28 ++++++- .../informer/ManagedInformerEventSource.java | 23 +++++- .../PerResourcePollingEventSource.java | 2 + .../source/polling/PollingEventSource.java | 23 ++++-- .../polling/PollingEventSourceTest.java | 27 ++++++- .../operator/InformerRelatedBehaviorITS.java | 45 ++++++++++- ...InformerRelatedBehaviorTestReconciler.java | 9 ++- sample-operators/webpage/k8s/operator.yaml | 12 ++- .../operator/sample/LivenessHandler.java | 29 +++++++ .../operator/sample/StartupHandler.java | 37 +++++++++ .../operator/sample/WebPageOperator.java | 16 ++-- 25 files changed, 486 insertions(+), 39 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java create mode 100644 
operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java create mode 100644 sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java create mode 100644 sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 195c4ec55e..b4cbd1f2fd 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -699,6 +699,18 @@ leader left off should one of them become elected leader. See sample configuration in the [E2E test](https://github.com/java-operator-sdk/java-operator-sdk/blob/8865302ac0346ee31f2d7b348997ec2913d5922b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java#L21-L23) . +## Runtime Info + +[RuntimeInfo](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java#L16-L16) +is used mainly to check the actual health of event sources. Based on this information it is easy to implement custom +liveness probes. + +See also the [stopOnInformerErrorDuringStartup](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L168-L168) +setting: this flag usually needs to be set to false in order to control the exact liveness properties. 
+ +See also an example implementation in the +[WebPage sample](https://github.com/java-operator-sdk/java-operator-sdk/blob/3e2e7c4c834ef1c409d636156b988125744ca911/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java#L38-L43) + ## Monitoring with Micrometer ## Automatic Generation of CRDs diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 036db02dba..ea9999d5b0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -80,12 +80,11 @@ public KubernetesClient getKubernetesClient() { * where there is no obvious entrypoint to the application which can trigger the injection process * and start the cluster monitoring processes. */ - public void start() { + public synchronized void start() { try { if (started) { return; } - started = true; controllerManager.shouldStart(); final var version = ConfigurationServiceProvider.instance().getVersion(); log.info( @@ -101,6 +100,7 @@ public void start() { // the leader election would start subsequently the processor if on controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled()); leaderElectionManager.start(); + started = true; } catch (Exception e) { log.error("Error starting operator", e); stop(); @@ -208,4 +208,11 @@ public int getRegisteredControllersNumber() { return controllerManager.size(); } + public RuntimeInfo getRuntimeInfo() { + return new RuntimeInfo(this); + } + + boolean isStarted() { + return started; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java index 832c2df6ee..88cd0123b0 100644 --- 
a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RegisteredController.java @@ -3,7 +3,12 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; +import io.javaoperatorsdk.operator.health.ControllerHealthInfo; public interface RegisteredController

extends NamespaceChangeable { + ControllerConfiguration

getConfiguration(); + + ControllerHealthInfo getControllerHealthInfo(); + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java new file mode 100644 index 0000000000..961e519d62 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java @@ -0,0 +1,81 @@ +package io.javaoperatorsdk.operator; + +import java.util.*; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; + +/** + * RuntimeInfo in general is available when operator is fully started. You can use "isStarted" to + * check that. + */ +@SuppressWarnings("rawtypes") +public class RuntimeInfo { + + private static final Logger log = LoggerFactory.getLogger(RuntimeInfo.class); + + private final Set registeredControllers; + private final Operator operator; + + public RuntimeInfo(Operator operator) { + this.registeredControllers = operator.getRegisteredControllers(); + this.operator = operator; + } + + public boolean isStarted() { + return operator.isStarted(); + } + + public Set getRegisteredControllers() { + checkIfStarted(); + return registeredControllers; + } + + private void checkIfStarted() { + if (!isStarted()) { + log.warn( + "Operator not started yet while accessing runtime info, this might lead to an unreliable behavior"); + } + } + + public boolean allEventSourcesAreHealthy() { + checkIfStarted(); + return registeredControllers.stream() + .filter(rc -> !rc.getControllerHealthInfo().unhealthyEventSources().isEmpty()) + .findFirst().isEmpty(); + } + + /** + * @return Aggregated Map with controller related event sources. 
+ */ + + public Map> unhealthyEventSources() { + checkIfStarted(); + Map> res = new HashMap<>(); + for (var rc : registeredControllers) { + res.put(rc.getConfiguration().getName(), + rc.getControllerHealthInfo().unhealthyEventSources()); + } + return res; + } + + /** + * @return Aggregated Map with controller related event sources that wraps an informer. Thus, + * either a + * {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource} + * or an + * {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}. + */ + public Map> unhealthyInformerWrappingEventSourceHealthIndicator() { + checkIfStarted(); + Map> res = new HashMap<>(); + for (var rc : registeredControllers) { + res.put(rc.getConfiguration().getName(), rc.getControllerHealthInfo() + .unhealthyInformerEventSourceHealthIndicators()); + } + return res; + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java new file mode 100644 index 0000000000..2adb3a8508 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/ControllerHealthInfo.java @@ -0,0 +1,50 @@ +package io.javaoperatorsdk.operator.health; + +import java.util.Map; +import java.util.stream.Collectors; + +import io.javaoperatorsdk.operator.processing.event.EventSourceManager; + +@SuppressWarnings("rawtypes") +public class ControllerHealthInfo { + + private EventSourceManager eventSourceManager; + + public ControllerHealthInfo(EventSourceManager eventSourceManager) { + this.eventSourceManager = eventSourceManager; + } + + public Map eventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map unhealthyEventSources() { + return 
eventSourceManager.allEventSources().entrySet().stream() + .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + public Map informerEventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (InformerWrappingEventSourceHealthIndicator) e.getValue())); + + } + + /** + * @return Map with event sources that wraps an informer. Thus, either a + * {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource} + * or an + * {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}. + */ + public Map unhealthyInformerEventSourceHealthIndicators() { + return eventSourceManager.allEventSources().entrySet().stream() + .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY) + .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator) + .collect(Collectors.toMap(Map.Entry::getKey, + e -> (InformerWrappingEventSourceHealthIndicator) e.getValue())); + } + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java new file mode 100644 index 0000000000..e44fcb5b72 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/EventSourceHealthIndicator.java @@ -0,0 +1,6 @@ +package io.javaoperatorsdk.operator.health; + +public interface EventSourceHealthIndicator { + + Status getStatus(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java new file mode 100644 index 
0000000000..afd8b61bed --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerHealthIndicator.java @@ -0,0 +1,17 @@ +package io.javaoperatorsdk.operator.health; + +public interface InformerHealthIndicator extends EventSourceHealthIndicator { + + boolean hasSynced(); + + boolean isWatching(); + + boolean isRunning(); + + @Override + default Status getStatus() { + return isRunning() && hasSynced() && isWatching() ? Status.HEALTHY : Status.UNHEALTHY; + } + + String getTargetNamespace(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java new file mode 100644 index 0000000000..5a603ad321 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/InformerWrappingEventSourceHealthIndicator.java @@ -0,0 +1,22 @@ +package io.javaoperatorsdk.operator.health; + +import java.util.Map; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; + +public interface InformerWrappingEventSourceHealthIndicator + extends EventSourceHealthIndicator { + + Map informerHealthIndicators(); + + @Override + default Status getStatus() { + var nonUp = informerHealthIndicators().values().stream() + .filter(i -> i.getStatus() != Status.HEALTHY).findAny(); + + return nonUp.isPresent() ? 
Status.UNHEALTHY : Status.HEALTHY; + } + + ResourceConfiguration getInformerConfiguration(); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java new file mode 100644 index 0000000000..d3a300b7d8 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/health/Status.java @@ -0,0 +1,11 @@ +package io.javaoperatorsdk.operator.health; + +public enum Status { + + HEALTHY, UNHEALTHY, + /** + * For event sources where it cannot be determined if it is healthy or not. + */ + UNKNOWN + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index 1b6542b8eb..3de7d665b2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -39,6 +39,7 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider; import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer; import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext; +import io.javaoperatorsdk.operator.health.ControllerHealthInfo; import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow; import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult; import io.javaoperatorsdk.operator.processing.event.EventProcessor; @@ -67,6 +68,7 @@ public class Controller

private final GroupVersionKind associatedGVK; private final EventProcessor

eventProcessor; + private final ControllerHealthInfo controllerHealthInfo; public Controller(Reconciler

reconciler, ControllerConfiguration

configuration, @@ -86,6 +88,7 @@ public Controller(Reconciler

reconciler, eventSourceManager = new EventSourceManager<>(this); eventProcessor = new EventProcessor<>(eventSourceManager); eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer(); + controllerHealthInfo = new ControllerHealthInfo(eventSourceManager); } @Override @@ -285,6 +288,11 @@ public ControllerConfiguration

getConfiguration() { return configuration; } + @Override + public ControllerHealthInfo getControllerHealthInfo() { + return controllerHealthInfo; + } + public KubernetesClient getClient() { return kubernetesClient; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index d091d442f6..279eb78381 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -1,9 +1,6 @@ package io.javaoperatorsdk.operator.processing.event; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Objects; -import java.util.Set; +import java.util.*; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -174,6 +171,11 @@ public Set getRegisteredEventSources() { .collect(Collectors.toCollection(LinkedHashSet::new)); } + public Map allEventSources() { + return eventSources.allNamedEventSources().collect(Collectors.toMap(NamedEventSource::name, + NamedEventSource::original)); + } + public ControllerResourceEventSource

getControllerResourceEventSource() { return eventSources.controllerResourceEventSource(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index e4fabe7ff8..a4276ba8c9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -46,6 +46,13 @@ public Stream additionalNamedEventSources() { flatMappedSources()); } + public Stream allNamedEventSources() { + return Stream.concat(Stream.of(namedControllerResourceEventSource(), + new NamedEventSource(retryAndRescheduleTimerEventSource, + RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME)), + flatMappedSources()); + } + Stream additionalEventSources() { return Stream.concat( Stream.of(retryEventSource()).filter(Objects::nonNull), diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java index 76c8ca164d..ec2783f797 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/EventSource.java @@ -1,5 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.source; +import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.EventHandler; @@ -10,7 +12,7 @@ * your reconciler implement * {@link io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer}. 
*/ -public interface EventSource extends LifecycleAware { +public interface EventSource extends LifecycleAware, EventSourceHealthIndicator { /** * Sets the {@link EventHandler} that is linked to your reconciler when this EventSource is @@ -23,4 +25,9 @@ public interface EventSource extends LifecycleAware { default EventSourceStartPriority priority() { return EventSourceStartPriority.DEFAULT; } + + @Override + default Status getStatus() { + return Status.UNKNOWN; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index f5f52d1c0e..c57d805e34 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -21,15 +21,17 @@ import io.javaoperatorsdk.operator.api.config.Cloner; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; +import static io.javaoperatorsdk.operator.api.reconciler.Constants.WATCH_ALL_NAMESPACES; + public class InformerManager> implements LifecycleAware, IndexerResourceCache { - private static final String ALL_NAMESPACES_MAP_KEY = "allNamespaces"; private static final Logger log = LoggerFactory.getLogger(InformerManager.class); private final Map> sources = new ConcurrentHashMap<>(); @@ -58,7 +60,7 @@ void 
initSources(MixedOperation, Resource> clien final var filteredBySelectorClient = client.inAnyNamespace().withLabelSelector(labelSelector); final var source = - createEventSource(filteredBySelectorClient, eventHandler, ALL_NAMESPACES_MAP_KEY); + createEventSource(filteredBySelectorClient, eventHandler, WATCH_ALL_NAMESPACES); log.debug("Registered {} -> {} for any namespace", this, source); } else { targetNamespaces.forEach( @@ -96,10 +98,11 @@ public void changeNamespaces(Set namespaces) { private InformerWrapper createEventSource( FilterWatchListDeletable, Resource> filteredBySelectorClient, - ResourceEventHandler eventHandler, String key) { - var source = new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0)); + ResourceEventHandler eventHandler, String namespaceIdentifier) { + var source = + new InformerWrapper<>(filteredBySelectorClient.runnableInformer(0), namespaceIdentifier); source.addEventHandler(eventHandler); - sources.put(key, source); + sources.put(namespaceIdentifier, source); return source; } @@ -127,7 +130,7 @@ public Stream list(Predicate predicate) { @Override public Stream list(String namespace, Predicate predicate) { if (isWatchingAllNamespaces()) { - return getSource(ALL_NAMESPACES_MAP_KEY) + return getSource(WATCH_ALL_NAMESPACES) .map(source -> source.list(namespace, predicate)) .orElseGet(Stream::empty); } else { @@ -139,7 +142,7 @@ public Stream list(String namespace, Predicate predicate) { @Override public Optional get(ResourceID resourceID) { - return getSource(resourceID.getNamespace().orElse(ALL_NAMESPACES_MAP_KEY)) + return getSource(resourceID.getNamespace().orElse(WATCH_ALL_NAMESPACES)) .flatMap(source -> source.get(resourceID)) .map(cloner::clone); } @@ -150,11 +153,11 @@ public Stream keys() { } private boolean isWatchingAllNamespaces() { - return sources.containsKey(ALL_NAMESPACES_MAP_KEY); + return sources.containsKey(WATCH_ALL_NAMESPACES); } private Optional> getSource(String namespace) { - namespace = 
isWatchingAllNamespaces() || namespace == null ? ALL_NAMESPACES_MAP_KEY : namespace; + namespace = isWatchingAllNamespaces() || namespace == null ? WATCH_ALL_NAMESPACES : namespace; return Optional.ofNullable(sources.get(namespace)); } @@ -179,4 +182,8 @@ public String toString() { + configuration.getEffectiveNamespaces() + (selector != null ? " selector: " + selector : ""); } + + public Map informerHealthIndicators() { + return Collections.unmodifiableMap(sources); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 601cb0c10c..c121a29c9c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -21,21 +21,25 @@ import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; import io.javaoperatorsdk.operator.processing.LifecycleAware; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.IndexerResourceCache; class InformerWrapper - implements LifecycleAware, IndexerResourceCache { + implements LifecycleAware, IndexerResourceCache, InformerHealthIndicator { private static final Logger log = LoggerFactory.getLogger(InformerWrapper.class); private final SharedIndexInformer informer; private final Cache cache; + private final String namespaceIdentifier; - public InformerWrapper(SharedIndexInformer informer) { + public InformerWrapper(SharedIndexInformer informer, String namespaceIdentifier) { 
this.informer = informer; + this.namespaceIdentifier = namespaceIdentifier; this.cache = (Cache) informer.getStore(); + } @Override @@ -145,4 +149,24 @@ public List byIndex(String indexName, String indexKey) { public String toString() { return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')'; } + + @Override + public boolean hasSynced() { + return informer.hasSynced(); + } + + @Override + public boolean isWatching() { + return informer.isWatching(); + } + + @Override + public boolean isRunning() { + return informer.isRunning(); + } + + @Override + public String getTargetNamespace() { + return namespaceIdentifier; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index cc9af59094..51460ceea6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -19,24 +19,29 @@ import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.reconciler.dependent.RecentOperationCacheFiller; +import io.javaoperatorsdk.operator.health.InformerHealthIndicator; +import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.*; public abstract class ManagedInformerEventSource> extends AbstractResourceEventSource implements ResourceEventHandler, Cache, IndexerResourceCache, - RecentOperationCacheFiller, - 
NamespaceChangeable { + RecentOperationCacheFiller, NamespaceChangeable, + InformerWrappingEventSourceHealthIndicator { private static final Logger log = LoggerFactory.getLogger(ManagedInformerEventSource.class); protected TemporaryResourceCache temporaryResourceCache = new TemporaryResourceCache<>(this); protected InformerManager cache = new InformerManager<>(); + protected C configuration; protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { super(configuration.getResourceClass()); manager().initSources(client, configuration, this); + this.configuration = configuration; } @Override @@ -133,4 +138,18 @@ public Stream list(Predicate predicate) { return cache.list(predicate); } + @Override + public Map informerHealthIndicators() { + return cache.informerHealthIndicators(); + } + + @Override + public Status getStatus() { + return InformerWrappingEventSourceHealthIndicator.super.getStatus(); + } + + @Override + public ResourceConfiguration getInformerConfiguration() { + return configuration; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java index 44bab7a624..8bf5c50fd4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PerResourcePollingEventSource.java @@ -40,6 +40,7 @@ public class PerResourcePollingEventSource private final long period; private final Set fetchedForPrimaries = ConcurrentHashMap.newKeySet(); + public PerResourcePollingEventSource(ResourceFetcher resourceFetcher, Cache

resourceCache, long period, Class resourceClass) { this(resourceFetcher, resourceCache, period, null, resourceClass, @@ -152,4 +153,5 @@ public void stop() throws OperatorException { super.stop(); timer.cancel(); } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java index 94efbf25aa..9ef889ecb6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSource.java @@ -1,12 +1,14 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.CacheKeyMapper; import io.javaoperatorsdk.operator.processing.event.source.ExternalResourceCachingEventSource; @@ -45,6 +47,7 @@ public class PollingEventSource private final Timer timer = new Timer(); private final GenericResourceFetcher genericResourceFetcher; private final long period; + private final AtomicBoolean healthy = new AtomicBoolean(true); public PollingEventSource( GenericResourceFetcher supplier, @@ -73,11 +76,17 @@ public void start() throws OperatorException { new TimerTask() { @Override public void run() { - if (!isRunning()) { - log.debug("Event source not yet started. Will not run."); - return; + try { + if (!isRunning()) { + log.debug("Event source not yet started. 
Will not run."); + return; + } + getStateAndFillCache(); + healthy.set(true); + } catch (RuntimeException e) { + healthy.set(false); + log.error("Error during polling.", e); } - getStateAndFillCache(); } }, period, @@ -89,7 +98,6 @@ protected synchronized void getStateAndFillCache() { handleResources(values); } - public interface GenericResourceFetcher { Map> fetchResources(); } @@ -99,4 +107,9 @@ public void stop() throws OperatorException { super.stop(); timer.cancel(); } + + @Override + public Status getStatus() { + return healthy.get() ? Status.HEALTHY : Status.UNHEALTHY; + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java index 0a777f0cbd..605922f06b 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/polling/PollingEventSourceTest.java @@ -1,5 +1,6 @@ package io.javaoperatorsdk.operator.processing.event.source.polling; +import java.time.Duration; import java.util.HashMap; import java.util.Map; import java.util.Set; @@ -8,12 +9,15 @@ import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.health.Status; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource; import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; 
import static org.mockito.Mockito.*; class PollingEventSourceTest @@ -21,11 +25,12 @@ class PollingEventSourceTest AbstractEventSourceTestBase, EventHandler> { public static final int DEFAULT_WAIT_PERIOD = 100; + public static final long POLL_PERIOD = 30L; private PollingEventSource.GenericResourceFetcher resourceFetcher = mock(PollingEventSource.GenericResourceFetcher.class); private final PollingEventSource pollingEventSource = - new PollingEventSource<>(resourceFetcher, 30L, SampleExternalResource.class, + new PollingEventSource<>(resourceFetcher, POLL_PERIOD, SampleExternalResource.class, (SampleExternalResource er) -> er.getName() + "#" + er.getValue()); @BeforeEach @@ -73,6 +78,26 @@ void propagatesEventOnNewResourceForPrimary() throws InterruptedException { verify(eventHandler, times(2)).handleEvent(any()); } + @Test + void updatesHealthIndicatorBasedOnExceptionsInFetcher() throws InterruptedException { + when(resourceFetcher.fetchResources()) + .thenReturn(testResponseWithOneValue()); + pollingEventSource.start(); + assertThat(pollingEventSource.getStatus()).isEqualTo(Status.HEALTHY); + + when(resourceFetcher.fetchResources()) + // 2x - to make sure to catch the health indicator change + .thenThrow(new RuntimeException("test exception")) + .thenThrow(new RuntimeException("test exception")) + .thenReturn(testResponseWithOneValue()); + + await().pollInterval(Duration.ofMillis(POLL_PERIOD)).untilAsserted( + () -> assertThat(pollingEventSource.getStatus()).isEqualTo(Status.UNHEALTHY)); + + await() + .untilAsserted(() -> assertThat(pollingEventSource.getStatus()).isEqualTo(Status.HEALTHY)); + } + private Map> testResponseWithTwoValueForSameId() { Map> res = new HashMap<>(); res.put(primaryID1(), Set.of(testResource1(), testResource2())); diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java index 
bddfe2c428..97f5883a08 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/InformerRelatedBehaviorITS.java @@ -14,9 +14,12 @@ import io.fabric8.kubernetes.client.utils.KubernetesResourceUtil; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestCustomResource; import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler; +import static io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler.CONFIG_MAP_DEPENDENT_RESOURCE; +import static io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestReconciler.INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -73,21 +76,25 @@ void startsUpWhenNoPermissionToCustomResource() { adminClient.resource(testCustomResource()).createOrReplace(); setNoCustomResourceAccess(); - startOperator(false); + var operator = startOperator(false); assertNotReconciled(); + assertRuntimeInfoNoCRPermission(operator); setFullResourcesAccess(); waitForWatchReconnect(); assertReconciled(); + assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isTrue(); } + @Test void startsUpWhenNoPermissionToSecondaryResource() { adminClient.resource(testCustomResource()).createOrReplace(); setNoConfigMapAccess(); - startOperator(false); + var operator = startOperator(false); assertNotReconciled(); + assertRuntimeInfoForSecondaryPermission(operator); setFullResourcesAccess(); 
waitForWatchReconnect(); @@ -184,6 +191,40 @@ private void assertReconciled() { }); } + + private void assertRuntimeInfoNoCRPermission(Operator operator) { + assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isFalse(); + var unhealthyEventSources = + operator.getRuntimeInfo().unhealthyEventSources() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(unhealthyEventSources).isNotEmpty(); + assertThat(unhealthyEventSources.get(ControllerResourceEventSource.class.getSimpleName())) + .isNotNull(); + var informerHealthIndicators = operator.getRuntimeInfo() + .unhealthyInformerWrappingEventSourceHealthIndicator() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(informerHealthIndicators).isNotEmpty(); + assertThat(informerHealthIndicators.get(ControllerResourceEventSource.class.getSimpleName()) + .informerHealthIndicators()) + .hasSize(1); + } + + private void assertRuntimeInfoForSecondaryPermission(Operator operator) { + assertThat(operator.getRuntimeInfo().allEventSourcesAreHealthy()).isFalse(); + var unhealthyEventSources = + operator.getRuntimeInfo().unhealthyEventSources() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(unhealthyEventSources).isNotEmpty(); + assertThat(unhealthyEventSources.get(CONFIG_MAP_DEPENDENT_RESOURCE)).isNotNull(); + var informerHealthIndicators = operator.getRuntimeInfo() + .unhealthyInformerWrappingEventSourceHealthIndicator() + .get(INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER); + assertThat(informerHealthIndicators).isNotEmpty(); + assertThat( + informerHealthIndicators.get(CONFIG_MAP_DEPENDENT_RESOURCE).informerHealthIndicators()) + .hasSize(1); + } + KubernetesClient clientUsingServiceAccount() { KubernetesClient client = new KubernetesClientBuilder() .withConfig(new ConfigBuilder() diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java 
b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java index baeff08478..13057d547a 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/informerrelatedbehavior/InformerRelatedBehaviorTestReconciler.java @@ -10,10 +10,17 @@ import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; -@ControllerConfiguration(dependents = @Dependent(type = ConfigMapDependentResource.class)) +@ControllerConfiguration( + name = InformerRelatedBehaviorTestReconciler.INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER, + dependents = @Dependent( + name = InformerRelatedBehaviorTestReconciler.CONFIG_MAP_DEPENDENT_RESOURCE, + type = ConfigMapDependentResource.class)) public class InformerRelatedBehaviorTestReconciler implements Reconciler, TestExecutionInfoProvider { + public static final String INFORMER_RELATED_BEHAVIOR_TEST_RECONCILER = + "InformerRelatedBehaviorTestReconciler"; + public static final String CONFIG_MAP_DEPENDENT_RESOURCE = "ConfigMapDependentResource"; private final AtomicInteger numberOfExecutions = new AtomicInteger(0); private KubernetesClient client; diff --git a/sample-operators/webpage/k8s/operator.yaml b/sample-operators/webpage/k8s/operator.yaml index f4b0b027ea..d8518ab21d 100644 --- a/sample-operators/webpage/k8s/operator.yaml +++ b/sample-operators/webpage/k8s/operator.yaml @@ -25,18 +25,22 @@ spec: imagePullPolicy: Never ports: - containerPort: 80 - readinessProbe: + startupProbe: httpGet: - path: /health + path: /startup port: 8080 initialDelaySeconds: 1 + periodSeconds: 2 timeoutSeconds: 1 + failureThreshold: 10 livenessProbe: httpGet: - path: /health + path: /healthz port: 8080 - initialDelaySeconds: 30 + initialDelaySeconds: 5 
timeoutSeconds: 1 + periodSeconds: 2 + failureThreshold: 3 --- apiVersion: rbac.authorization.k8s.io/v1 diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java new file mode 100644 index 0000000000..155fc13fec --- /dev/null +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/LivenessHandler.java @@ -0,0 +1,29 @@ +package io.javaoperatorsdk.operator.sample; + +import java.io.IOException; + +import io.javaoperatorsdk.operator.Operator; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +import static io.javaoperatorsdk.operator.sample.StartupHandler.sendMessage; + +public class LivenessHandler implements HttpHandler { + + private final Operator operator; + + public LivenessHandler(Operator operator) { + this.operator = operator; + } + + // custom logic can be added here based on the health of event sources + @Override + public void handle(HttpExchange httpExchange) throws IOException { + if (operator.getRuntimeInfo().allEventSourcesAreHealthy()) { + sendMessage(httpExchange, 200, "healthy"); + } else { + sendMessage(httpExchange, 400, "an event source is not healthy"); + } + } +} diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java new file mode 100644 index 0000000000..0cbc313273 --- /dev/null +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/StartupHandler.java @@ -0,0 +1,37 @@ +package io.javaoperatorsdk.operator.sample; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import io.javaoperatorsdk.operator.Operator; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; + +public class StartupHandler implements 
HttpHandler { + + private final Operator operator; + + public StartupHandler(Operator operator) { + this.operator = operator; + } + + @Override + public void handle(HttpExchange httpExchange) throws IOException { + if (operator.getRuntimeInfo().isStarted()) { + sendMessage(httpExchange, 200, "started"); + } else { + sendMessage(httpExchange, 400, "not started yet"); + } + } + + public static void sendMessage(HttpExchange httpExchange, int code, String message) + throws IOException { + try (var outputStream = httpExchange.getResponseBody()) { + var bytes = message.getBytes(StandardCharsets.UTF_8); + httpExchange.sendResponseHeaders(code, bytes.length); + outputStream.write(bytes); + outputStream.flush(); + } + } +} diff --git a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java index 8f205ddbd2..6f6dd10539 100644 --- a/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java +++ b/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java @@ -1,17 +1,16 @@ package io.javaoperatorsdk.operator.sample; import java.io.IOException; +import java.net.InetSocketAddress; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.takes.facets.fork.FkRegex; -import org.takes.facets.fork.TkFork; -import org.takes.http.Exit; -import org.takes.http.FtBasic; import io.fabric8.kubernetes.client.*; import io.javaoperatorsdk.operator.Operator; +import com.sun.net.httpserver.HttpServer; + public class WebPageOperator { public static final String WEBPAGE_RECONCILER_ENV = "WEBPAGE_RECONCILER"; public static final String WEBPAGE_CLASSIC_RECONCILER_ENV_VALUE = "classic"; @@ -23,7 +22,7 @@ public static void main(String[] args) throws IOException { log.info("WebServer Operator starting!"); KubernetesClient client = new KubernetesClientBuilder().build(); - Operator 
operator = new Operator(client); + Operator operator = new Operator(client, o -> o.withStopOnInformerErrorDuringStartup(false)); String reconcilerEnvVar = System.getenv(WEBPAGE_RECONCILER_ENV); if (WEBPAGE_CLASSIC_RECONCILER_ENV_VALUE.equals(reconcilerEnvVar)) { operator.register(new WebPageReconciler(client)); @@ -35,6 +34,11 @@ public static void main(String[] args) throws IOException { } operator.start(); - new FtBasic(new TkFork(new FkRegex("/health", "ALL GOOD!")), 8080).start(Exit.NEVER); + HttpServer server = HttpServer.create(new InetSocketAddress(8080), 0); + server.createContext("/startup", new StartupHandler(operator)); + // we want to restart the operator if something goes wrong with (maybe just some) event sources + server.createContext("/healthz", new LivenessHandler(operator)); + server.setExecutor(null); + server.start(); } }