Skip to content

Commit

Permalink
Add Reactive probe (#1201)
Browse files Browse the repository at this point in the history
  • Loading branch information
OlgaMaciaszek committed Feb 2, 2023
1 parent 77f9d2b commit 6677d16
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.cloud.client.discovery;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.core.Ordered;
Expand All @@ -26,9 +29,12 @@
* Eureka or consul.io.
*
* @author Tim Ysewyn
* @author Olga Maciaszek-Sharma
*/
public interface ReactiveDiscoveryClient extends Ordered {

Log LOG = LogFactory.getLog(ReactiveDiscoveryClient.class);

/**
* Default order of the discovery client.
*/
Expand Down Expand Up @@ -60,11 +66,35 @@ public interface ReactiveDiscoveryClient extends Ordered {
* <p>
* The default implementation simply calls {@link #getServices()} - client
* implementations can override with a lighter weight operation if they choose to.
* @deprecated in favour of {@link ReactiveDiscoveryClient#reactiveProbe()}. This
* method should not be used as is, as it contains a bug - the method called within
* returns a {@link Flux}, which is not accessible for subscription or blocking from
* within. We are leaving it with a deprecation in order not to bring downstream
* implementations.
*/
@Deprecated
default void probe() {
if (LOG.isWarnEnabled()) {
LOG.warn("ReactiveDiscoveryClient#probe has been called. If you're calling this method directly, "
+ "use ReactiveDiscoveryClient#reactiveProbe instead.");
}
getServices();
}

/**
* Can be used to verify the client is still valid and able to make calls.
* <p>
* A successful invocation with no exception thrown implies the client is able to make
* calls.
* <p>
* The default implementation simply calls {@link #getServices()} and wraps it with a
* {@link Mono} - client implementations can override with a lighter weight operation
* if they choose to.
*/
default Mono<Void> reactiveProbe() {
return getServices().then();
}

/**
* Default implementation for getting order of discovery clients.
* @return order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@
import static java.util.Collections.emptyList;

/**
* A health indicator which indicates whether or not the discovery client has been
* initialized.
* A health indicator which indicates whether the discovery client has been initialized.
*
* @author Tim Ysewyn
* @author Chris Bono
* @author Olga Maciaszek-Sharma
*/
public class ReactiveDiscoveryClientHealthIndicator
implements ReactiveDiscoveryHealthIndicator, Ordered, ApplicationListener<InstanceRegisteredEvent<?>> {

private static final Log LOG = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);

private final ReactiveDiscoveryClient discoveryClient;

private final DiscoveryClientHealthIndicatorProperties properties;

private final Log log = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);

private AtomicBoolean discoveryInitialized = new AtomicBoolean(false);

private int order = Ordered.HIGHEST_PRECEDENCE;
Expand All @@ -60,14 +60,14 @@ public ReactiveDiscoveryClientHealthIndicator(ReactiveDiscoveryClient discoveryC

@Override
public void onApplicationEvent(InstanceRegisteredEvent<?> event) {
if (this.discoveryInitialized.compareAndSet(false, true)) {
this.log.debug("Discovery Client has been initialized");
if (discoveryInitialized.compareAndSet(false, true)) {
LOG.debug("Discovery Client has been initialized");
}
}

@Override
public Mono<Health> health() {
if (this.discoveryInitialized.get()) {
if (discoveryInitialized.get()) {
return doHealthCheck();
}
else {
Expand All @@ -78,38 +78,39 @@ public Mono<Health> health() {

private Mono<Health> doHealthCheck() {
// @formatter:off
return Mono.just(this.properties.isUseServicesQuery())
return Mono.just(properties.isUseServicesQuery())
.flatMap(useServices -> useServices ? doHealthCheckWithServices() : doHealthCheckWithProbe())
.onErrorResume(exception -> {
this.log.error("Error", exception);
if (LOG.isErrorEnabled()) {
LOG.error("Error", exception);
}
return Mono.just(Health.down().withException(exception).build());
});
// @formatter:on
}

private Mono<Health> doHealthCheckWithProbe() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
.flatMap(client -> {
client.probe();
return Mono.just(client);
})
.map(client -> {
String description = (this.properties.isIncludeDescription()) ? client.description() : "";
return Health.status(new Status("UP", description)).build();
});
// @formatter:on
return discoveryClient.reactiveProbe().doOnError(exception -> {
if (LOG.isErrorEnabled()) {
LOG.error("Probe has failed.", exception);
}
}).then(buildHealthUp(discoveryClient));
}

private Mono<Health> buildHealthUp(ReactiveDiscoveryClient discoveryClient) {
String description = (properties.isIncludeDescription()) ? discoveryClient.description() : "";
return Mono.just(Health.status(new Status("UP", description)).build());
}

private Mono<Health> doHealthCheckWithServices() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
return Mono.justOrEmpty(discoveryClient)
.flatMapMany(ReactiveDiscoveryClient::getServices)
.collectList()
.defaultIfEmpty(emptyList())
.map(services -> {
String description = (this.properties.isIncludeDescription()) ?
this.discoveryClient.description() : "";
String description = (properties.isIncludeDescription()) ?
discoveryClient.description() : "";
return Health.status(new Status("UP", description))
.withDetail("services", services).build();
});
Expand All @@ -123,7 +124,7 @@ public String getName() {

@Override
public int getOrder() {
return this.order;
return order;
}

public void setOrder(int order) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.client.discovery.health.DiscoveryClientHealthIndicatorProperties;
Expand All @@ -35,12 +37,12 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

/**
* @author Tim Ysewyn
* @author Chris Bono
* @author Olga Maciaszek-Sharma
*/
@ExtendWith(MockitoExtension.class)
class ReactiveDiscoveryClientHealthIndicatorTests {
Expand Down Expand Up @@ -78,6 +80,9 @@ public void shouldReturnUnknownStatusWhenNotInitialized() {
@Test
public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() {
when(properties.isUseServicesQuery()).thenReturn(false);
ReactiveDiscoveryClient discoveryClient = new TestDiscoveryClient();
ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
properties);
Health expectedHealth = Health.status(new Status(Status.UP.getCode(), "")).build();

indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
Expand All @@ -88,10 +93,10 @@ public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() {

@Test
public void shouldReturnDownStatusWhenNotUsingServicesQueryAndProbeFails() {
when(properties.isUseServicesQuery()).thenReturn(false);
RuntimeException ex = new RuntimeException("something went wrong");
doThrow(ex).when(discoveryClient).probe();
Health expectedHealth = Health.down(ex).build();
ExceptionThrowingDiscoveryClient discoveryClient = new ExceptionThrowingDiscoveryClient();
ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
properties);
Health expectedHealth = Health.down(discoveryClient.exception).build();

indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
Mono<Health> health = indicator.health();
Expand Down Expand Up @@ -140,4 +145,44 @@ public void shouldReturnDownStatusWhenUsingServicesQueryAndCallFails() {
StepVerifier.create(health).expectNext(expectedHealth).expectComplete().verify();
}

static class TestDiscoveryClient implements ReactiveDiscoveryClient {

@Override
public String description() {
return "Test";
}

@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Flux.just(new DefaultServiceInstance());
}

@Override
public Flux<String> getServices() {
return Flux.just("Test");
}

}

static class ExceptionThrowingDiscoveryClient implements ReactiveDiscoveryClient {

RuntimeException exception = new RuntimeException("something went wrong");

@Override
public String description() {
return "Exception";
}

@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
throw new RuntimeException("Test!");
}

@Override
public Flux<String> getServices() {
throw new RuntimeException("something went wrong");
}

}

}
1 change: 1 addition & 0 deletions src/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
<suppress files=".*RefreshScopeConfigurationTests.*" checks="JavadocStyle"/>
<suppress files=".*RefreshScopeConfigurationTests.*" checks="JavadocMethod"/>
<suppress files=".*CachingServiceInstanceListSupplierTests.*" checks="RegexpSinglelineJava"/>
<suppress files=".*ReactiveDiscoveryClient.*" checks="JavadocVariable"/>
</suppressions>

0 comments on commit 6677d16

Please sign in to comment.