Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Reactive probe #1201

Merged
merged 3 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package org.springframework.cloud.client.discovery;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.springframework.cloud.client.ServiceInstance;
import org.springframework.core.Ordered;
Expand All @@ -26,9 +29,12 @@
* Eureka or consul.io.
*
* @author Tim Ysewyn
* @author Olga Maciaszek-Sharma
*/
public interface ReactiveDiscoveryClient extends Ordered {

Log LOG = LogFactory.getLog(ReactiveDiscoveryClient.class);

/**
* Default order of the discovery client.
*/
Expand Down Expand Up @@ -60,11 +66,35 @@ public interface ReactiveDiscoveryClient extends Ordered {
* <p>
* The default implementation simply calls {@link #getServices()} - client
* implementations can override with a lighter weight operation if they choose to.
* @deprecated in favour of {@link ReactiveDiscoveryClient#reactiveProbe()}. This
* method should not be used as is, as it contains a bug - the method called within
* returns a {@link Flux}, which is not accessible for subscription or blocking from
* within. We are leaving it with a deprecation in order not to bring downstream
* implementations.
*/
@Deprecated
default void probe() {
if (LOG.isWarnEnabled()) {
LOG.warn("ReactiveDiscoveryClient#probe has been called. If you're calling this method directly, "
+ "use ReactiveDiscoveryClient#reactiveProbe instead.");
}
getServices();
}

/**
* Can be used to verify the client is still valid and able to make calls.
* <p>
* A successful invocation with no exception thrown implies the client is able to make
* calls.
* <p>
* The default implementation simply calls {@link #getServices()} and wraps it with a
* {@link Mono} - client implementations can override with a lighter weight operation
* if they choose to.
*/
default Mono<Void> reactiveProbe() {
return getServices().then();
}

/**
* Default implementation for getting order of discovery clients.
* @return order
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,21 +33,21 @@
import static java.util.Collections.emptyList;

/**
* A health indicator which indicates whether or not the discovery client has been
* initialized.
* A health indicator which indicates whether the discovery client has been initialized.
*
* @author Tim Ysewyn
* @author Chris Bono
* @author Olga Maciaszek-Sharma
*/
public class ReactiveDiscoveryClientHealthIndicator
implements ReactiveDiscoveryHealthIndicator, Ordered, ApplicationListener<InstanceRegisteredEvent<?>> {

private static final Log LOG = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);

private final ReactiveDiscoveryClient discoveryClient;

private final DiscoveryClientHealthIndicatorProperties properties;

private final Log log = LogFactory.getLog(ReactiveDiscoveryClientHealthIndicator.class);

private AtomicBoolean discoveryInitialized = new AtomicBoolean(false);

private int order = Ordered.HIGHEST_PRECEDENCE;
Expand All @@ -60,14 +60,14 @@ public ReactiveDiscoveryClientHealthIndicator(ReactiveDiscoveryClient discoveryC

@Override
public void onApplicationEvent(InstanceRegisteredEvent<?> event) {
if (this.discoveryInitialized.compareAndSet(false, true)) {
this.log.debug("Discovery Client has been initialized");
if (discoveryInitialized.compareAndSet(false, true)) {
LOG.debug("Discovery Client has been initialized");
}
}

@Override
public Mono<Health> health() {
if (this.discoveryInitialized.get()) {
if (discoveryInitialized.get()) {
return doHealthCheck();
}
else {
Expand All @@ -78,38 +78,39 @@ public Mono<Health> health() {

private Mono<Health> doHealthCheck() {
// @formatter:off
return Mono.just(this.properties.isUseServicesQuery())
return Mono.just(properties.isUseServicesQuery())
.flatMap(useServices -> useServices ? doHealthCheckWithServices() : doHealthCheckWithProbe())
.onErrorResume(exception -> {
this.log.error("Error", exception);
if (LOG.isErrorEnabled()) {
LOG.error("Error", exception);
}
return Mono.just(Health.down().withException(exception).build());
});
// @formatter:on
}

private Mono<Health> doHealthCheckWithProbe() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
.flatMap(client -> {
client.probe();
return Mono.just(client);
})
.map(client -> {
String description = (this.properties.isIncludeDescription()) ? client.description() : "";
return Health.status(new Status("UP", description)).build();
});
// @formatter:on
return discoveryClient.reactiveProbe().doOnError(exception -> {
if (LOG.isErrorEnabled()) {
LOG.error("Probe has failed.", exception);
}
}).then(buildHealthUp(discoveryClient));
}

private Mono<Health> buildHealthUp(ReactiveDiscoveryClient discoveryClient) {
String description = (properties.isIncludeDescription()) ? discoveryClient.description() : "";
return Mono.just(Health.status(new Status("UP", description)).build());
}

private Mono<Health> doHealthCheckWithServices() {
// @formatter:off
return Mono.justOrEmpty(this.discoveryClient)
return Mono.justOrEmpty(discoveryClient)
.flatMapMany(ReactiveDiscoveryClient::getServices)
.collectList()
.defaultIfEmpty(emptyList())
.map(services -> {
String description = (this.properties.isIncludeDescription()) ?
this.discoveryClient.description() : "";
String description = (properties.isIncludeDescription()) ?
discoveryClient.description() : "";
return Health.status(new Status("UP", description))
.withDetail("services", services).build();
});
Expand All @@ -123,7 +124,7 @@ public String getName() {

@Override
public int getOrder() {
return this.order;
return order;
}

public void setOrder(int order) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.Status;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.ServiceInstance;
import org.springframework.cloud.client.discovery.ReactiveDiscoveryClient;
import org.springframework.cloud.client.discovery.event.InstanceRegisteredEvent;
import org.springframework.cloud.client.discovery.health.DiscoveryClientHealthIndicatorProperties;
Expand All @@ -35,12 +37,12 @@
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.when;

/**
* @author Tim Ysewyn
* @author Chris Bono
* @author Olga Maciaszek-Sharma
*/
@ExtendWith(MockitoExtension.class)
class ReactiveDiscoveryClientHealthIndicatorTests {
Expand Down Expand Up @@ -78,6 +80,9 @@ public void shouldReturnUnknownStatusWhenNotInitialized() {
@Test
public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() {
when(properties.isUseServicesQuery()).thenReturn(false);
ReactiveDiscoveryClient discoveryClient = new TestDiscoveryClient();
ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
properties);
Health expectedHealth = Health.status(new Status(Status.UP.getCode(), "")).build();

indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
Expand All @@ -88,10 +93,10 @@ public void shouldReturnUpStatusWhenNotUsingServicesQueryAndProbeSucceeds() {

@Test
public void shouldReturnDownStatusWhenNotUsingServicesQueryAndProbeFails() {
when(properties.isUseServicesQuery()).thenReturn(false);
RuntimeException ex = new RuntimeException("something went wrong");
doThrow(ex).when(discoveryClient).probe();
Health expectedHealth = Health.down(ex).build();
ExceptionThrowingDiscoveryClient discoveryClient = new ExceptionThrowingDiscoveryClient();
ReactiveDiscoveryClientHealthIndicator indicator = new ReactiveDiscoveryClientHealthIndicator(discoveryClient,
properties);
Health expectedHealth = Health.down(discoveryClient.exception).build();

indicator.onApplicationEvent(new InstanceRegisteredEvent<>(this, null));
Mono<Health> health = indicator.health();
Expand Down Expand Up @@ -140,4 +145,44 @@ public void shouldReturnDownStatusWhenUsingServicesQueryAndCallFails() {
StepVerifier.create(health).expectNext(expectedHealth).expectComplete().verify();
}

static class TestDiscoveryClient implements ReactiveDiscoveryClient {

@Override
public String description() {
return "Test";
}

@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
return Flux.just(new DefaultServiceInstance());
}

@Override
public Flux<String> getServices() {
return Flux.just("Test");
}

}

static class ExceptionThrowingDiscoveryClient implements ReactiveDiscoveryClient {

RuntimeException exception = new RuntimeException("something went wrong");

@Override
public String description() {
return "Exception";
}

@Override
public Flux<ServiceInstance> getInstances(String serviceId) {
throw new RuntimeException("Test!");
}

@Override
public Flux<String> getServices() {
throw new RuntimeException("something went wrong");
}

}

}
1 change: 1 addition & 0 deletions src/checkstyle/checkstyle-suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,5 @@
<suppress files=".*RefreshScopeConfigurationTests.*" checks="JavadocStyle"/>
<suppress files=".*RefreshScopeConfigurationTests.*" checks="JavadocMethod"/>
<suppress files=".*CachingServiceInstanceListSupplierTests.*" checks="RegexpSinglelineJava"/>
<suppress files=".*ReactiveDiscoveryClient.*" checks="JavadocVariable"/>
</suppressions>