Skip to content

Commit

Permalink
[RESTEASY-3349] Merge pull request #3709 from crankydillo/RESTEASY-33…
Browse files Browse the repository at this point in the history
…49_reactor-processing-exceptions_6.2

Reactor-Netty client: Make sure most exceptions are ProcessingExceptions.
  • Loading branch information
jamezp committed Aug 2, 2023
2 parents d725a99 + ec2e222 commit 4b99018
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ public ReactorNettyClientHttpEngine(final HttpClient httpClient,
public <T> Mono<T> submitRx(final ClientInvocation request,
final boolean buffered,
final ResultExtractor<T> extractor) {

final Mono<ClientResponse> responseMono = send(request)
.responseSingle((response, bytes) -> bytes
.asInputStream()
Expand Down Expand Up @@ -146,7 +147,7 @@ public <T> Mono<T> submitRx(final ClientInvocation request,
return requestTimeout
.map(responseMono::timeout)
.orElse(responseMono)
.handle((response, sink) -> {
.<T> handle((response, sink) -> {
try {
sink.next(extractor.extractResult(response));
} catch (final Exception e) {
Expand All @@ -164,7 +165,7 @@ public <T> Mono<T> submitRx(final ClientInvocation request,
}
sink.error(e);
}
});
}).onErrorMap(err -> clientException(err, null));
}

/**
Expand Down Expand Up @@ -327,7 +328,7 @@ public void close() {
}
}

static RuntimeException clientException(Throwable ex, Response clientResponse) {
static RuntimeException clientException(final Throwable ex, final Response clientResponse) {
RuntimeException ret;
if (ex == null) {
ret = new ProcessingException(new NullPointerException());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package org.jboss.resteasy.client.jaxrs.engines;

import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.*;

import java.io.IOException;
import java.io.OutputStream;
Expand Down Expand Up @@ -57,10 +55,12 @@
import io.netty.handler.codec.http.QueryStringDecoder;
import io.netty.util.concurrent.DefaultEventExecutor;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.DisposableServer;
import reactor.netty.NettyOutbound;
import reactor.netty.http.HttpResources;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.PrematureCloseException;
import reactor.netty.http.server.HttpServer;
import reactor.netty.http.server.HttpServerRequest;
import reactor.netty.http.server.HttpServerResponse;
Expand Down Expand Up @@ -162,7 +162,11 @@ private static void setupMockServer() {
.post("/headers/content-encoding",
(req, resp) -> headerEcho(req, resp, HttpHeaderNames.CONTENT_ENCODING.toString()))
.post("/headers/content-type",
(req, resp) -> allHeaderEcho(req, resp, HttpHeaderNames.CONTENT_TYPE.toString())))
(req, resp) -> allHeaderEcho(req, resp, HttpHeaderNames.CONTENT_TYPE.toString()))
.get("/kill-conn", (req, resp) -> {
req.withConnection(Connection::disposeNow);
return Mono.empty();
}))
.bindNow();
}

Expand Down Expand Up @@ -305,32 +309,36 @@ public void failed(Throwable throwable) {
}

@Test
public void testAsyncGetWithInvocationCallbackFailed() throws ExecutionException, InterruptedException {
public void testAsyncGetWithInvocationCallbackFailed() {
final Client client = setupClient(HttpClient.create().baseUrl("invalid"));
final AtomicReference<String> entity = new AtomicReference<>();
final AtomicReference<Throwable> entity = new AtomicReference<>();
final Future<String> future = client.target("/hello")
.request()
.async()
.get(new InvocationCallback<String>() {
@Override
public void completed(String s) {
entity.set(s);
entity.set(new RuntimeException("should have failed!"));
}

@Override
public void failed(Throwable throwable) {
if (throwable instanceof CompletionException) {
entity.set(throwable.getCause().getClass().getName());
entity.set(throwable.getCause());
} else {
entity.set(throwable.getClass().getName());
entity.set(throwable);
}
}
});

while (!future.isDone()) {
// Wait till the result is ready.
}
assertEquals(UnknownHostException.class.getName(), entity.get());

final Throwable err = entity.get();
assertEquals(ProcessingException.class, err.getClass());
assertEquals("Expected cause to be an UnknownHostException",
UnknownHostException.class, err.getCause().getClass());
}

@Test
Expand Down Expand Up @@ -711,6 +719,19 @@ public void test500ResponseBodyClosedState() throws Exception {
}
}

@Test
public void testConnKilledBeforeResponse() throws Exception {
try {
final Response response = Mono.fromCompletionStage(client.target(url("/kill-conn")).request()
.rx()
.get()).block();
fail("An exception should have been thrown.");
} catch (final ProcessingException pe) {
assertEquals("Expected ProcessingException with cause: PrematureCloseException",
pe.getCause().getClass(), PrematureCloseException.class);
}
}

private static String incrementAge(final String json) {
final int length = json.length();
final String age = json.substring(length - 2, length - 1);
Expand Down

0 comments on commit 4b99018

Please sign in to comment.