Skip to content

Commit

Permalink
feat: runtime info for health probes (#1594)
Browse files Browse the repository at this point in the history
  • Loading branch information
csviri committed Nov 23, 2022
1 parent 5552723 commit 641f38c
Show file tree
Hide file tree
Showing 25 changed files with 486 additions and 39 deletions.
12 changes: 12 additions & 0 deletions docs/documentation/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -699,6 +699,18 @@ leader left off should one of them become elected leader.
See sample configuration in the [E2E test](https://github.com/java-operator-sdk/java-operator-sdk/blob/8865302ac0346ee31f2d7b348997ec2913d5922b/sample-operators/leader-election/src/main/java/io/javaoperatorsdk/operator/sample/LeaderElectionTestOperator.java#L21-L23)
.

## Runtime Info

[RuntimeInfo](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/RuntimeInfo.java#L16-L16)
is used mainly to check the actual health of event sources. Based on this information it is easy to implement custom
liveness probes.

See also the [stopOnInformerErrorDuringStartup](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java#L168-L168)
setting; this flag usually needs to be set to false in order to control the exact liveness properties.

See also an example implementation in the
[WebPage sample](https://github.com/java-operator-sdk/java-operator-sdk/blob/3e2e7c4c834ef1c409d636156b988125744ca911/sample-operators/webpage/src/main/java/io/javaoperatorsdk/operator/sample/WebPageOperator.java#L38-L43)

## Monitoring with Micrometer

## Automatic Generation of CRDs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,11 @@ public KubernetesClient getKubernetesClient() {
* where there is no obvious entrypoint to the application which can trigger the injection process
* and start the cluster monitoring processes.
*/
public void start() {
public synchronized void start() {
try {
if (started) {
return;
}
started = true;
controllerManager.shouldStart();
final var version = ConfigurationServiceProvider.instance().getVersion();
log.info(
Expand All @@ -101,6 +100,7 @@ public void start() {
// the leader election would start subsequently the processor if on
controllerManager.start(!leaderElectionManager.isLeaderElectionEnabled());
leaderElectionManager.start();
started = true;
} catch (Exception e) {
log.error("Error starting operator", e);
stop();
Expand Down Expand Up @@ -208,4 +208,11 @@ public int getRegisteredControllersNumber() {
return controllerManager.size();
}

/**
* Creates a {@link RuntimeInfo} backed by this operator. A new instance is created on every call.
*
* @return runtime information for this operator
*/
public RuntimeInfo getRuntimeInfo() {
return new RuntimeInfo(this);
}

/**
* Package-private accessor for the {@code started} flag.
*
* @return the current value of the {@code started} flag
*/
boolean isStarted() {
return started;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,12 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.health.ControllerHealthInfo;

/**
* View of a registered controller that exposes its configuration and the health information of its
* event sources.
*
* @param <P> the resource type the controller is configured for
*/
public interface RegisteredController<P extends HasMetadata> extends NamespaceChangeable {

/**
* @return the configuration of this controller
*/
ControllerConfiguration<P> getConfiguration();

/**
* @return health information about the event sources of this controller
*/
ControllerHealthInfo getControllerHealthInfo();

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
package io.javaoperatorsdk.operator;

import java.util.*;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;

/**
 * Read-only view over the controllers registered with an {@link Operator} and the health of their
 * event sources. The information is in general only reliable once the operator is fully started;
 * use {@link #isStarted()} to check that. Accessors called before that point log a warning but
 * still return a result.
 */
@SuppressWarnings("rawtypes")
public class RuntimeInfo {

  private static final Logger log = LoggerFactory.getLogger(RuntimeInfo.class);

  private final Set<RegisteredController> registeredControllers;
  private final Operator operator;

  /**
   * @param operator the operator to report on; its registered controllers are read once, here
   */
  public RuntimeInfo(Operator operator) {
    this.operator = operator;
    this.registeredControllers = operator.getRegisteredControllers();
  }

  /**
   * @return whether the underlying operator reports itself as started
   */
  public boolean isStarted() {
    return operator.isStarted();
  }

  /**
   * @return the controllers that were obtained from the operator when this instance was created
   */
  public Set<RegisteredController> getRegisteredControllers() {
    checkIfStarted();
    return registeredControllers;
  }

  /** Logs a warning (and nothing more) when runtime info is accessed before the operator started. */
  private void checkIfStarted() {
    if (isStarted()) {
      return;
    }
    log.warn(
        "Operator not started yet while accessing runtime info, this might lead to an unreliable behavior");
  }

  /**
   * @return {@code true} if no registered controller reports any unhealthy event source
   */
  public boolean allEventSourcesAreHealthy() {
    checkIfStarted();
    return registeredControllers.stream()
        .noneMatch(
            controller -> !controller.getControllerHealthInfo().unhealthyEventSources().isEmpty());
  }

  /**
   * @return map keyed by controller name; each value holds that controller's unhealthy event
   *         sources keyed by event source name. Controllers without unhealthy event sources are
   *         present with an empty map.
   */
  public Map<String, Map<String, EventSourceHealthIndicator>> unhealthyEventSources() {
    checkIfStarted();
    final Map<String, Map<String, EventSourceHealthIndicator>> byController = new HashMap<>();
    registeredControllers.forEach(controller -> byController.put(
        controller.getConfiguration().getName(),
        controller.getControllerHealthInfo().unhealthyEventSources()));
    return byController;
  }

  /**
   * @return map keyed by controller name; each value holds that controller's unhealthy event
   *         sources that wrap an informer, keyed by event source name. Thus, either a
   *         {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource}
   *         or an
   *         {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
   */
  public Map<String, Map<String, InformerWrappingEventSourceHealthIndicator>> unhealthyInformerWrappingEventSourceHealthIndicator() {
    checkIfStarted();
    final Map<String, Map<String, InformerWrappingEventSourceHealthIndicator>> byController =
        new HashMap<>();
    registeredControllers.forEach(controller -> byController.put(
        controller.getConfiguration().getName(),
        controller.getControllerHealthInfo().unhealthyInformerEventSourceHealthIndicators()));
    return byController;
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.javaoperatorsdk.operator.health;

import java.util.Map;
import java.util.stream.Collectors;

import io.javaoperatorsdk.operator.processing.event.EventSourceManager;

/**
 * Health view over the event sources of a single controller, backed by that controller's
 * {@link EventSourceManager}. Every accessor queries the event source manager anew and returns a
 * freshly collected map keyed by event source name.
 */
@SuppressWarnings("rawtypes")
public class ControllerHealthInfo {

  // final: assigned exactly once in the constructor, which also guarantees safe publication when
  // this object is read from another thread (e.g. one serving a health probe).
  private final EventSourceManager<?> eventSourceManager;

  /**
   * @param eventSourceManager the manager whose event sources are reported on
   */
  public ControllerHealthInfo(EventSourceManager eventSourceManager) {
    this.eventSourceManager = eventSourceManager;
  }

  /**
   * @return all event sources of the controller, viewed as health indicators
   */
  public Map<String, EventSourceHealthIndicator> eventSourceHealthIndicators() {
    return eventSourceManager.allEventSources().entrySet().stream()
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * @return the event sources whose status is exactly {@link Status#UNHEALTHY}; sources reporting
   *         {@link Status#UNKNOWN} are not included
   */
  public Map<String, EventSourceHealthIndicator> unhealthyEventSources() {
    return eventSourceManager.allEventSources().entrySet().stream()
        .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * @return the event sources that implement {@link InformerWrappingEventSourceHealthIndicator},
   *         regardless of their status
   */
  public Map<String, InformerWrappingEventSourceHealthIndicator> informerEventSourceHealthIndicators() {
    return eventSourceManager.allEventSources().entrySet().stream()
        .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
        .collect(Collectors.toMap(Map.Entry::getKey,
            e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
  }

  /**
   * @return Map with unhealthy event sources that wrap an informer. Thus, either a
   *         {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource}
   *         or an
   *         {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource}.
   */
  public Map<String, InformerWrappingEventSourceHealthIndicator> unhealthyInformerEventSourceHealthIndicators() {
    return eventSourceManager.allEventSources().entrySet().stream()
        .filter(e -> e.getValue().getStatus() == Status.UNHEALTHY)
        .filter(e -> e.getValue() instanceof InformerWrappingEventSourceHealthIndicator)
        .collect(Collectors.toMap(Map.Entry::getKey,
            e -> (InformerWrappingEventSourceHealthIndicator) e.getValue()));
  }

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package io.javaoperatorsdk.operator.health;

/**
* Reports the health {@link Status} of an event source.
*/
public interface EventSourceHealthIndicator {

/**
* @return the current health status
*/
Status getStatus();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package io.javaoperatorsdk.operator.health;

/**
 * Health indicator of an informer. The default {@link #getStatus()} derives the status from
 * {@link #isRunning()}, {@link #hasSynced()} and {@link #isWatching()}.
 */
public interface InformerHealthIndicator extends EventSourceHealthIndicator {

  /**
   * @return whether the informer has synced; required for a {@link Status#HEALTHY} status
   */
  boolean hasSynced();

  /**
   * @return whether the informer is watching; required for a {@link Status#HEALTHY} status
   */
  boolean isWatching();

  /**
   * @return whether the informer is running; required for a {@link Status#HEALTHY} status
   */
  boolean isRunning();

  /**
   * @return {@link Status#HEALTHY} only if the informer is running, has synced and is watching;
   *         {@link Status#UNHEALTHY} otherwise
   */
  @Override
  default Status getStatus() {
    // Checked in the order running, synced, watching; evaluation stops at the first failing check.
    if (!isRunning() || !hasSynced() || !isWatching()) {
      return Status.UNHEALTHY;
    }
    return Status.HEALTHY;
  }

  /**
   * @return the namespace targeted by the informer
   */
  String getTargetNamespace();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.javaoperatorsdk.operator.health;

import java.util.Map;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;

/**
 * Health indicator of an event source that wraps informers. The default {@link #getStatus()}
 * aggregates the statuses of the individual {@link InformerHealthIndicator}s.
 *
 * @param <R> the resource type of the informer configuration
 */
public interface InformerWrappingEventSourceHealthIndicator<R extends HasMetadata>
    extends EventSourceHealthIndicator {

  /**
   * @return the health indicators of the wrapped informers, keyed by name
   */
  Map<String, InformerHealthIndicator> informerHealthIndicators();

  /**
   * @return {@link Status#UNHEALTHY} if at least one wrapped informer reports a status other than
   *         {@link Status#HEALTHY}; {@link Status#HEALTHY} otherwise, including when there are no
   *         informers
   */
  @Override
  default Status getStatus() {
    final boolean anyNotHealthy = informerHealthIndicators().values().stream()
        .anyMatch(indicator -> indicator.getStatus() != Status.HEALTHY);
    if (anyNotHealthy) {
      return Status.UNHEALTHY;
    }
    return Status.HEALTHY;
  }

  /**
   * @return the configuration of the wrapped informer(s)
   */
  ResourceConfiguration<R> getInformerConfiguration();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.javaoperatorsdk.operator.health;

/**
* Health status reported by an {@link EventSourceHealthIndicator}.
*/
public enum Status {

HEALTHY, UNHEALTHY,
/**
* For event sources where it cannot be determined if it is healthy or not.
*/
UNKNOWN

}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceProvider;
import io.javaoperatorsdk.operator.api.reconciler.dependent.EventSourceReferencer;
import io.javaoperatorsdk.operator.api.reconciler.dependent.managed.DefaultManagedDependentResourceContext;
import io.javaoperatorsdk.operator.health.ControllerHealthInfo;
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflow;
import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowCleanupResult;
import io.javaoperatorsdk.operator.processing.event.EventProcessor;
Expand Down Expand Up @@ -67,6 +68,7 @@ public class Controller<P extends HasMetadata>

private final GroupVersionKind associatedGVK;
private final EventProcessor<P> eventProcessor;
private final ControllerHealthInfo controllerHealthInfo;

public Controller(Reconciler<P> reconciler,
ControllerConfiguration<P> configuration,
Expand All @@ -86,6 +88,7 @@ public Controller(Reconciler<P> reconciler,
eventSourceManager = new EventSourceManager<>(this);
eventProcessor = new EventProcessor<>(eventSourceManager);
eventSourceManager.postProcessDefaultEventSourcesAfterProcessorInitializer();
controllerHealthInfo = new ControllerHealthInfo(eventSourceManager);
}

@Override
Expand Down Expand Up @@ -285,6 +288,11 @@ public ControllerConfiguration<P> getConfiguration() {
return configuration;
}

/**
* @return the {@link ControllerHealthInfo} that was created in the constructor from this
* controller's event source manager
*/
@Override
public ControllerHealthInfo getControllerHealthInfo() {
return controllerHealthInfo;
}

public KubernetesClient getClient() {
return kubernetesClient;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
package io.javaoperatorsdk.operator.processing.event;

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;

import org.slf4j.Logger;
Expand Down Expand Up @@ -174,6 +171,11 @@ public Set<EventSource> getRegisteredEventSources() {
.collect(Collectors.toCollection(LinkedHashSet::new));
}

/**
 * Collects every named event source into a map from name to the original event source.
 * NOTE(review): {@code Collectors.toMap} throws {@link IllegalStateException} on duplicate keys,
 * so this presumably relies on event source names being unique - confirm.
 *
 * @return a newly collected map of event source name to event source
 */
public Map<String, EventSource> allEventSources() {
  final var namedSources = eventSources.allNamedEventSources();
  return namedSources.collect(
      Collectors.toMap(source -> source.name(), source -> source.original()));
}

public ControllerResourceEventSource<P> getControllerResourceEventSource() {
return eventSources.controllerResourceEventSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ public Stream<NamedEventSource> additionalNamedEventSources() {
flatMappedSources());
}

/**
 * Streams every named event source: first the controller resource event source, then the
 * retry/reschedule timer event source, followed by the flat-mapped sources.
 *
 * @return a new stream over all named event sources
 */
public Stream<NamedEventSource> allNamedEventSources() {
  // Created in the same order as the original argument evaluation.
  final NamedEventSource controllerSource = namedControllerResourceEventSource();
  final NamedEventSource timerSource = new NamedEventSource(retryAndRescheduleTimerEventSource,
      RETRY_RESCHEDULE_TIMER_EVENT_SOURCE_NAME);
  final Stream<NamedEventSource> builtInSources = Stream.of(controllerSource, timerSource);
  return Stream.concat(builtInSources, flatMappedSources());
}

Stream<EventSource> additionalEventSources() {
return Stream.concat(
Stream.of(retryEventSource()).filter(Objects::nonNull),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.javaoperatorsdk.operator.processing.event.source;

import io.javaoperatorsdk.operator.health.EventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
import io.javaoperatorsdk.operator.processing.event.EventHandler;

Expand All @@ -10,7 +12,7 @@
* your reconciler implement
* {@link io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer}.
*/
public interface EventSource extends LifecycleAware {
public interface EventSource extends LifecycleAware, EventSourceHealthIndicator {

/**
* Sets the {@link EventHandler} that is linked to your reconciler when this EventSource is
Expand All @@ -23,4 +25,9 @@ public interface EventSource extends LifecycleAware {
default EventSourceStartPriority priority() {
return EventSourceStartPriority.DEFAULT;
}

/**
* Reports {@link Status#UNKNOWN} by default; implementations that can determine their own health
* may override this.
*
* @return {@link Status#UNKNOWN}
*/
@Override
default Status getStatus() {
return Status.UNKNOWN;
}
}

0 comments on commit 641f38c

Please sign in to comment.