Skip to content

Commit

Permalink
Merge pull request #1299 from rmartinc/UNDERTOW-2027
Browse files Browse the repository at this point in the history
[UNDERTOW-2027] Spurious buffer leak error in the test-suite (HttpClientTestCase.testReadTimeout)
  • Loading branch information
fl4via committed Mar 11, 2022
2 parents 7133a92 + f1b4ba4 commit 4314944
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 38 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,18 @@ private Integer getTimeout() {
return timeout;
}

@Override
public void resumeReads() {
super.resumeReads();
if (handle == null) {
try {
handleReadTimeout(1);
} catch (IOException e) {
// impossible as 1 is passed
}
}
}

@Override
public void terminateReads() throws IOException {
checkExpired();
Expand Down
85 changes: 56 additions & 29 deletions core/src/test/java/io/undertow/client/http/HttpClientTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.xnio.Options;
import org.xnio.Xnio;
import org.xnio.XnioWorker;
import org.xnio.channels.ReadTimeoutException;
import org.xnio.channels.StreamSinkChannel;
import org.xnio.ssl.XnioSsl;

Expand All @@ -76,6 +77,7 @@ public class HttpClientTestCase {
private static final String message = "Hello World!";
public static final String MESSAGE = "/message";
public static final String READTIMEOUT = "/readtimeout";
public static final String READTIMEOUT_AT_INIT = "/readtimeout-init";
public static final String POST = "/post";
private static XnioWorker worker;

Expand Down Expand Up @@ -126,19 +128,35 @@ public void handleRequest(HttpServerExchange exchange) throws Exception {
public void handleRequest(HttpServerExchange exchange) throws Exception {
exchange.setStatusCode(StatusCodes.OK);
exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, 5 + "");
StreamSinkChannel responseChannel = exchange.getResponseChannel();
responseChannel.write(ByteBuffer.wrap(new byte[]{'a', 'b', 'c'}));
responseChannel.flush();
try (StreamSinkChannel responseChannel = exchange.getResponseChannel()) {
responseChannel.write(ByteBuffer.wrap(new byte[]{'a', 'b', 'c'}));
responseChannel.flush();
try {
//READ_TIMEOUT set as 600ms on the client side
//On the server side intentionally sleep 2000ms
//to make READ_TIMEOUT happening at client side
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
responseChannel.write(ByteBuffer.wrap(new byte[]{'d', 'e'}));
}
}
})
.addExactPath(READTIMEOUT_AT_INIT, new HttpHandler() {
@Override
public void handleRequest(HttpServerExchange exchange) throws Exception {
try {
//READ_TIMEOUT set as 600ms on the client side
//On the server side intentionally sleep 2000ms
//to make READ_TIMEOUT happening at client side
// Do the sleep before sending any data to the client
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
responseChannel.write(ByteBuffer.wrap(new byte[]{'d', 'e'}));
responseChannel.close();
exchange.setStatusCode(StatusCodes.OK);
exchange.getResponseHeaders().put(Headers.CONTENT_LENGTH, "5");
try (StreamSinkChannel responseChannel = exchange.getResponseChannel()) {
responseChannel.write(ByteBuffer.wrap(new byte[]{'a', 'b', 'c', 'd', 'e'}));
}
}
})
.addExactPath(POST, new HttpHandler() {
Expand Down Expand Up @@ -347,34 +365,44 @@ public void testReadTimeout() throws Exception {
OptionMap.Builder builder = OptionMap.builder();
builder.set(Options.READ_TIMEOUT, 600);
final ClientConnection connection = client.connect(ADDRESS, worker, DefaultServer.getBufferPool(), builder.getMap()).get();
final ClientRequest request = new ClientRequest().setMethod(Methods.GET).setPath(READTIMEOUT);
request.getRequestHeaders().put(Headers.HOST, DefaultServer.getHostAddress());
connection.sendRequest(request, createClientCallback(responses, latch, false));
try {
connection.getIoThread().execute(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 1; i++) {
final ClientRequest request = new ClientRequest().setMethod(Methods.GET).setPath(READTIMEOUT);
request.getRequestHeaders().put(Headers.HOST, DefaultServer.getHostAddress());
connection.sendRequest(request, createClientCallback(responses, latch, false));
}
}

connection.getIoThread().execute(() -> {
final ClientRequest request = new ClientRequest().setMethod(Methods.GET).setPath(READTIMEOUT);
request.getRequestHeaders().put(Headers.HOST, DefaultServer.getHostAddress());
connection.sendRequest(request, createClientCallback(responses, latch));
});
latch.await(10, TimeUnit.SECONDS);
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
//exception expected because of read timeout
Assert.assertNotNull(exception);
Assert.assertTrue(exception instanceof ReadTimeoutException);
} finally {
IoUtils.safeClose(connection);
connection.getIoThread().execute(() -> IoUtils.safeClose(connection));
}
}

private ClientCallback<ClientExchange> createClientCallback(final List<ClientResponse> responses, final CountDownLatch latch) {
return createClientCallback(responses, latch, true);
@Test
public void testReadTimeoutAtInit() throws Exception {
final UndertowClient client = createClient();
exception = null;

final List<ClientResponse> responses = new CopyOnWriteArrayList<>();
final CountDownLatch latch = new CountDownLatch(1);
OptionMap.Builder builder = OptionMap.builder().set(Options.READ_TIMEOUT, 600);
final ClientConnection connection = client.connect(ADDRESS, worker, DefaultServer.getBufferPool(), builder.getMap()).get();
try {
connection.getIoThread().execute(() -> {
final ClientRequest request = new ClientRequest().setMethod(Methods.GET).setPath(READTIMEOUT_AT_INIT);
request.getRequestHeaders().put(Headers.HOST, DefaultServer.getHostAddress());
connection.sendRequest(request, createClientCallback(responses, latch));
});
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
//exception expected because of read timeout
Assert.assertTrue(exception instanceof ReadTimeoutException);
} finally {
connection.getIoThread().execute(() -> IoUtils.safeClose(connection));
}
}

private ClientCallback<ClientExchange> createClientCallback(final List<ClientResponse> responses, final CountDownLatch latch, final boolean expectedResponse) {
private ClientCallback<ClientExchange> createClientCallback(final List<ClientResponse> responses, final CountDownLatch latch) {
return new ClientCallback<ClientExchange>() {
@Override
public void completed(ClientExchange result) {
Expand All @@ -389,8 +417,7 @@ protected void stringDone(String string) {
// add response only if there is a string or error, or else
// we risk adding keep alive messages in timeout tests
responses.add(result.getResponse());
if (expectedResponse)
latch.countDown();
latch.countDown();
}

@Override
Expand Down
23 changes: 14 additions & 9 deletions core/src/test/java/io/undertow/testutils/DefaultServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -387,22 +387,24 @@ public void testStarted(Description description) throws Exception {

@Override
public void testFinished(Description description) throws Exception {

if (!DebuggingSlicePool.BUFFERS.isEmpty()) {
boolean empty = DebuggingSlicePool.BUFFERS.isEmpty();
if (!empty) {
try {
Thread.sleep(200);
long end = System.currentTimeMillis() + 20000;
while (!DebuggingSlicePool.BUFFERS.isEmpty() && System.currentTimeMillis() < end) {
do {
Thread.sleep(200);
}
empty = DebuggingSlicePool.BUFFERS.isEmpty();
} while (!empty && System.currentTimeMillis() < end);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
for (DebuggingSlicePool.DebuggingBuffer b : DebuggingSlicePool.BUFFERS) {
b.getAllocationPoint().printStackTrace();
notifier.fireTestFailure(new Failure(description, new RuntimeException("Buffer Leak " + b.getLabel(), b.getAllocationPoint())));
if (!empty) {
for (DebuggingSlicePool.DebuggingBuffer b : DebuggingSlicePool.BUFFERS) {
b.getAllocationPoint().printStackTrace();
notifier.fireTestFailure(new Failure(description, new RuntimeException("Buffer Leak " + b.getLabel(), b.getAllocationPoint())));
}
DebuggingSlicePool.BUFFERS.clear();
}
DebuggingSlicePool.BUFFERS.clear();
}
super.testFinished(description);
}
Expand Down Expand Up @@ -576,6 +578,9 @@ public static final void stopServer() {
if (server != null) {
server.close();
}
if (proxyServer != null) {
proxyServer.close();
}
stopSSLServer();
if (worker != null) {
stopWorker(worker);
Expand Down

0 comments on commit 4314944

Please sign in to comment.