From 6677d16736de42c25525381694758c48ebf9ae8b Mon Sep 17 00:00:00 2001 From: Olga Maciaszek-Sharma Date: Thu, 2 Feb 2023 14:51:10 +0100 Subject: [PATCH] Add Reactive probe (#1201) --- .../discovery/ReactiveDiscoveryClient.java | 30 ++++++++++ ...eactiveDiscoveryClientHealthIndicator.java | 49 +++++++++-------- ...veDiscoveryClientHealthIndicatorTests.java | 55 +++++++++++++++++-- src/checkstyle/checkstyle-suppressions.xml | 1 + 4 files changed, 106 insertions(+), 29 deletions(-) diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java index 0cd1ce3b0..5ef5b3335 100644 --- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java +++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/ReactiveDiscoveryClient.java @@ -16,7 +16,10 @@ package org.springframework.cloud.client.discovery; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; import org.springframework.cloud.client.ServiceInstance; import org.springframework.core.Ordered; @@ -26,9 +29,12 @@ * Eureka or consul.io. * * @author Tim Ysewyn + * @author Olga Maciaszek-Sharma */ public interface ReactiveDiscoveryClient extends Ordered { + Log LOG = LogFactory.getLog(ReactiveDiscoveryClient.class); + /** * Default order of the discovery client. */ @@ -60,11 +66,35 @@ public interface ReactiveDiscoveryClient extends Ordered { *

* The default implementation simply calls {@link #getServices()} - client * implementations can override with a lighter weight operation if they choose to. + * @deprecated in favour of {@link ReactiveDiscoveryClient#reactiveProbe()}. This + * method should not be used as is, as it contains a bug - the method called within + * returns a {@link Flux}, which is not accessible for subscription or blocking from + * within. We are leaving it with a deprecation in order not to bring downstream + * implementations. */ + @Deprecated default void probe() { + if (LOG.isWarnEnabled()) { + LOG.warn("ReactiveDiscoveryClient#probe has been called. If you're calling this method directly, " + + "use ReactiveDiscoveryClient#reactiveProbe instead."); + } getServices(); } + /** + * Can be used to verify the client is still valid and able to make calls. + *

+ * A successful invocation with no exception thrown implies the client is able to make + * calls. + *

+ * The default implementation simply calls {@link #getServices()} and wraps it with a + * {@link Mono} - client implementations can override with a lighter weight operation + * if they choose to. + */ + default Mono reactiveProbe() { + return getServices().then(); + } + /** * Default implementation for getting order of discovery clients. * @return order diff --git a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java index d49a1412f..13111cd86 100644 --- a/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java +++ b/spring-cloud-commons/src/main/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicator.java @@ -33,21 +33,21 @@ import static java.util.Collections.emptyList; /** - * A health indicator which indicates whether or not the discovery client has been - * initialized. + * A health indicator which indicates whether the discovery client has been initialized. * * @author Tim Ysewyn * @author Chris Bono + * @author Olga Maciaszek-Sharma */ public class ReactiveDiscoveryClientHealthIndicator implements ReactiveDiscoveryHealthIndicator, Ordered, ApplicationListener> { + private static final Log LOG = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class); + private final ReactiveDiscoveryClient discoveryClient; private final DiscoveryClientHealthIndicatorProperties properties; - private final Log log = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class); - private AtomicBoolean discoveryInitialized = new AtomicBoolean(false); private int order = Ordered.HIGHEST_PRECEDENCE; @@ -60,14 +60,14 @@ public ReactiveDiscoveryClientHealthIndicator(ReactiveDiscoveryClient discoveryC @Override public void onApplicationEvent(InstanceRegisteredEvent event) { - if (this.discoveryInitialized.compareAndSet(false, true)) { - this.log.debug("Discovery Client has been initialized"); + if (discoveryInitialized.compareAndSet(false, true)) { + LOG.debug("Discovery Client has been initialized"); } } @Override public Mono health() { - if (this.discoveryInitialized.get()) { + if (discoveryInitialized.get()) { return doHealthCheck(); } else { @@ -78,38 +78,39 @@ public Mono health() { private Mono doHealthCheck() { // @formatter:off - return Mono.just(this.properties.isUseServicesQuery()) + return Mono.just(properties.isUseServicesQuery()) .flatMap(useServices -> useServices ? doHealthCheckWithServices() : doHealthCheckWithProbe()) .onErrorResume(exception -> { - this.log.error("Error", exception); + if (LOG.isErrorEnabled()) { + LOG.error("Error", exception); + } return Mono.just(Health.down().withException(exception).build()); }); // @formatter:on } private Mono doHealthCheckWithProbe() { - // @formatter:off - return Mono.justOrEmpty(this.discoveryClient) - .flatMap(client -> { - client.probe(); - return Mono.just(client); - }) - .map(client -> { - String description = (this.properties.isIncludeDescription()) ? client.description() : ""; - return Health.status(new Status("UP", description)).build(); - }); - // @formatter:on + return discoveryClient.reactiveProbe().doOnError(exception -> { + if (LOG.isErrorEnabled()) { + LOG.error("Probe has failed.", exception); + } + }).then(buildHealthUp(discoveryClient)); + } + + private Mono buildHealthUp(ReactiveDiscoveryClient discoveryClient) { + String description = (properties.isIncludeDescription()) ? discoveryClient.description() : ""; + return Mono.just(Health.status(new Status("UP", description)).build()); } private Mono doHealthCheckWithServices() { // @formatter:off - return Mono.justOrEmpty(this.discoveryClient) + return Mono.justOrEmpty(discoveryClient) .flatMapMany(ReactiveDiscoveryClient::getServices) .collectList() .defaultIfEmpty(emptyList()) .map(services -> { - String description = (this.properties.isIncludeDescription()) ? - this.discoveryClient.description() : ""; + String description = (properties.isIncludeDescription()) ? + discoveryClient.description() : ""; return Health.status(new Status("UP", description)) .withDetail("services", services).build(); }); @@ -123,7 +124,7 @@ public String getName() { @Override public int getOrder() { - return this.order; + return order; } public void setOrder(int order) { diff --git a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java index dc48098f2..0da5afdb1 100644 --- a/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java +++ b/spring-cloud-commons/src/test/java/org/springframework/cloud/client/discovery/health/reactive/ReactiveDiscoveryClientHealthIndicatorTests.java @@ -27,6 +27,8 @@ import org.springframework.boot.actuate.health.Health; import org.springframework.boot.actuate.health.Status; +import org.springframework.cloud.client.DefaultServiceInstance; +import org.springframework.cloud.client.ServiceInstance; import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient; import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent; import org.springframework.cloud.client.discovery.health.DiscoveryClientHealthIndicatorProperties; @@ -35,12 +37,12 @@ import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static org.assertj.core.api.Assertions.assertThat; -import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.when; /** * @author Tim Ysewyn * @author Chris Bono + * @author Olga Maciaszek-Sharma */ @ExtendWith(MockitoExtension.class) class ReactiveDiscoveryClientHealthIndicatorTests { @@ -78,6 +80,9 @@ public void shouldReturnUnknownStatusWhenNotInitialized() { @Test public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() { when(properties.isUseServicesQuery()).thenReturn(false); + ReactiveDiscoveryClient discoveryClient = new TestDiscoveryClient(); + ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient, + properties); Health expectedHealth = Health.status(new Status(Status.UP.getCode(), "")).build(); indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null)); @@ -88,10 +93,10 @@ public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() { @Test public void shouldReturnDownStatusWhenNotUsingServicesQueryAndProbeFails() { - when(properties.isUseServicesQuery()).thenReturn(false); - RuntimeException ex = new RuntimeException("something went wrong"); - doThrow(ex).when(discoveryClient).probe(); - Health expectedHealth = Health.down(ex).build(); + ExceptionThrowingDiscoveryClient discoveryClient = new ExceptionThrowingDiscoveryClient(); + ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient, + properties); + Health expectedHealth = Health.down(discoveryClient.exception).build(); indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null)); Mono health = indicator.health(); @@ -140,4 +145,44 @@ public void shouldReturnDownStatusWhenUsingServicesQueryAndCallFails() { StepVerifier.create(health).expectNext(expectedHealth).expectComplete().verify(); } + static class TestDiscoveryClient implements ReactiveDiscoveryClient { + + @Override + public String description() { + return "Test"; + } + + @Override + public Flux getInstances(String serviceId) { + return Flux.just(new DefaultServiceInstance()); + } + + @Override + public Flux getServices() { + return Flux.just("Test"); + } + + } + + static class ExceptionThrowingDiscoveryClient implements ReactiveDiscoveryClient { + + RuntimeException exception = new RuntimeException("something went wrong"); + + @Override + public String description() { + return "Exception"; + } + + @Override + public Flux getInstances(String serviceId) { + throw new RuntimeException("Test!"); + } + + @Override + public Flux getServices() { + throw new RuntimeException("something went wrong"); + } + + } + } diff --git a/src/checkstyle/checkstyle-suppressions.xml b/src/checkstyle/checkstyle-suppressions.xml index 9a855ed3a..d8a816088 100644 --- a/src/checkstyle/checkstyle-suppressions.xml +++ b/src/checkstyle/checkstyle-suppressions.xml @@ -17,4 +17,5 @@ +