Skip to content

Commit

Permalink
Interpret empty mono from status handler as normal response
Browse files Browse the repository at this point in the history
Prior to this commit, returning an empty mono from an exception handler
registered through ResponseSpec::onStatus would result in memory leaks
(since the response was not read) and in an empty response from bodyTo*
methods of the webclient.

As of this commit, that same empty mono is now interpreted to return
the body (and not an exception), offering a way to override the default
status handlers and return a normal response for 4xx and 5xx status
codes.
  • Loading branch information
poutsma committed Jul 16, 2019
1 parent b2b79ae commit a9b3d95
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,6 @@ public HttpHeaders getHeaders() {

private static class DefaultResponseSpec implements ResponseSpec {

private static final StatusHandler DEFAULT_STATUS_HANDLER =
new StatusHandler(HttpStatus::isError, ClientResponse::createException);

private final Mono<ClientResponse> responseMono;

private final Supplier<HttpRequest> requestSupplier;
Expand All @@ -427,72 +424,95 @@ private static class DefaultResponseSpec implements ResponseSpec {
DefaultResponseSpec(Mono<ClientResponse> responseMono, Supplier<HttpRequest> requestSupplier) {
this.responseMono = responseMono;
this.requestSupplier = requestSupplier;
this.statusHandlers.add(DEFAULT_STATUS_HANDLER);
this.statusHandlers.add(new StatusHandler(HttpStatus::isError, ClientResponse::createException));
}

@Override
public ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {

if (this.statusHandlers.size() == 1 && this.statusHandlers.get(0) == DEFAULT_STATUS_HANDLER) {
this.statusHandlers.clear();
}
this.statusHandlers.add(new StatusHandler(statusPredicate, exceptionFunction));
Assert.notNull(statusPredicate, "StatusPredicate must not be null");
Assert.notNull(exceptionFunction, "Function must not be null");

this.statusHandlers.add(0, new StatusHandler(statusPredicate, exceptionFunction));
return this;
}

@Override
public <T> Mono<T> bodyToMono(Class<T> elementClass) {
return this.responseMono.flatMap(response -> handleBody(response,
response.bodyToMono(elementClass), mono -> mono.flatMap(Mono::error)));
Assert.notNull(elementClass, "ElementClass must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementClass)));
}

@Override
public <T> Mono<T> bodyToMono(ParameterizedTypeReference<T> elementTypeRef) {
return this.responseMono.flatMap(response ->
handleBody(response, response.bodyToMono(elementTypeRef), mono -> mono.flatMap(Mono::error)));
Assert.notNull(elementTypeRef, "ElementTypeRef must not be null");
return this.responseMono.flatMap(response -> handleBodyMono(response, response.bodyToMono(elementTypeRef)));
}

private <T> Mono<T> handleBodyMono(ClientResponse response, Mono<T> bodyPublisher) {
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
Mono<T> result = statusHandlers(response);
if (result != null) {
return result.switchIfEmpty(bodyPublisher);
}
else {
return bodyPublisher;
}
}
else {
return response.createException().flatMap(Mono::error);
}
}

@Override
public <T> Flux<T> bodyToFlux(Class<T> elementClass) {
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementClass), mono -> mono.handle((t, sink) -> sink.error(t))));
Assert.notNull(elementClass, "ElementClass must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementClass)));
}

@Override
public <T> Flux<T> bodyToFlux(ParameterizedTypeReference<T> elementTypeRef) {
return this.responseMono.flatMapMany(response ->
handleBody(response, response.bodyToFlux(elementTypeRef), mono -> mono.handle((t, sink) -> sink.error(t))));
Assert.notNull(elementTypeRef, "ElementTypeRef must not be null");
return this.responseMono.flatMapMany(response -> handleBodyFlux(response, response.bodyToFlux(elementTypeRef)));
}

private <T extends Publisher<?>> T handleBody(ClientResponse response,
T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) {

private <T> Publisher<T> handleBodyFlux(ClientResponse response, Flux<T> bodyPublisher) {
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
for (StatusHandler handler : this.statusHandlers) {
if (handler.test(response.statusCode())) {
Mono<? extends Throwable> exMono;
try {
exMono = handler.apply(response);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
}
catch (Throwable ex2) {
exMono = drainBody(response, ex2);
}
T result = errorFunction.apply(exMono);
HttpRequest request = this.requestSupplier.get();
return insertCheckpoint(result, response.statusCode(), request);
}
Mono<T> result = statusHandlers(response);
if (result != null) {
return result.flux().switchIfEmpty(bodyPublisher);
}
else {
return bodyPublisher;
}
return bodyPublisher;
}
else {
return errorFunction.apply(response.createException());
return response.createException().flatMap(Mono::error);
}
}

@Nullable
private <T> Mono<T> statusHandlers(ClientResponse response) {
for (StatusHandler handler : this.statusHandlers) {
if (handler.test(response.statusCode())) {
Mono<? extends Throwable> exMono;
try {
exMono = handler.apply(response);
exMono = exMono.flatMap(ex -> drainBody(response, ex));
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
}
catch (Throwable ex2) {
exMono = drainBody(response, ex2);
}
Mono<T> result = exMono.flatMap(Mono::error);
HttpRequest request = this.requestSupplier.get();
return insertCheckpoint(result, response.statusCode(), request);
}
}
return null;
}

@SuppressWarnings("unchecked")
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
// Ensure the body is drained, even if the StatusHandler didn't consume it,
Expand All @@ -501,20 +521,11 @@ private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
.onErrorResume(ex2 -> Mono.empty()).thenReturn(ex);
}

@SuppressWarnings("unchecked")
private <T extends Publisher<?>> T insertCheckpoint(T result, HttpStatus status, HttpRequest request) {
private <T> Mono<T> insertCheckpoint(Mono<T> result, HttpStatus status, HttpRequest request) {
String httpMethod = request.getMethodValue();
URI uri = request.getURI();
String description = status + " from " + httpMethod + " " + uri + " [DefaultWebClient]";
if (result instanceof Mono) {
return (T) ((Mono<?>) result).checkpoint(description);
}
else if (result instanceof Flux) {
return (T) ((Flux<?>) result).checkpoint(description);
}
else {
return result;
}
return result.checkpoint(description);
}


Expand All @@ -527,8 +538,6 @@ private static class StatusHandler {
public StatusHandler(Predicate<HttpStatus> predicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {

Assert.notNull(predicate, "Predicate must not be null");
Assert.notNull(exceptionFunction, "Function must not be null");
this.predicate = predicate;
this.exceptionFunction = exceptionFunction;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,17 +671,21 @@ interface ResponseSpec {

/**
* Register a custom error function that gets invoked when the given {@link HttpStatus}
* predicate applies. The exception returned from the function will be returned from
* {@link #bodyToMono(Class)} and {@link #bodyToFlux(Class)}.
* <p>By default, an error handler is register that throws a
* predicate applies. Whatever exception is returned from the function (possibly using
* {@link ClientResponse#createException()}) will also be returned as error signal
* from {@link #bodyToMono(Class)} and {@link #bodyToFlux(Class)}.
* <p>By default, an error handler is registered that returns a
* {@link WebClientResponseException} when the response status code is 4xx or 5xx.
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
* applies
* To override this default (and return a non-error response from {@code bodyOn*}), register
* an exception function that returns an {@linkplain Mono#empty() empty} mono.
* <p><strong>NOTE:</strong> if the response is expected to have content,
* the exceptionFunction should consume it. If not, the content will be
* automatically drained to ensure resources are released.
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
* applies
* @param exceptionFunction the function that returns the exception
* @return this builder
* @see ClientResponse#createException()
*/
ResponseSpec onStatus(Predicate<HttpStatus> statusPredicate,
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -626,6 +626,50 @@ public void shouldApplyCustomStatusHandlerParameterizedTypeReference() {
});
}

@Test
public void emptyStatusHandlerShouldReturnBody() {
prepareResponse(response -> response.setResponseCode(500)
.setHeader("Content-Type", "text/plain").setBody("Internal Server error"));

Mono<String> result = this.webClient.get()
.uri("/greeting?name=Spring")
.retrieve()
.onStatus(HttpStatus::is5xxServerError, response -> Mono.empty())
.bodyToMono(String.class);

StepVerifier.create(result)
.expectNext("Internal Server error")
.verifyComplete();

expectRequestCount(1);
expectRequest(request -> {
assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*");
assertThat(request.getPath()).isEqualTo("/greeting?name=Spring");
});
}

@Test
public void emptyStatusHandlerShouldReturnBodyFlux() {
prepareResponse(response -> response.setResponseCode(500)
.setHeader("Content-Type", "text/plain").setBody("Internal Server error"));

Flux<String> result = this.webClient.get()
.uri("/greeting?name=Spring")
.retrieve()
.onStatus(HttpStatus::is5xxServerError, response -> Mono.empty())
.bodyToFlux(String.class);

StepVerifier.create(result)
.expectNext("Internal Server error")
.verifyComplete();

expectRequestCount(1);
expectRequest(request -> {
assertThat(request.getHeader(HttpHeaders.ACCEPT)).isEqualTo("*/*");
assertThat(request.getPath()).isEqualTo("/greeting?name=Spring");
});
}

@Test
public void shouldReceiveNotFoundEntity() {
prepareResponse(response -> response.setResponseCode(404)
Expand Down

0 comments on commit a9b3d95

Please sign in to comment.