From c5a7ea54aae5dfed7c817e5f190c0fb89910a416 Mon Sep 17 00:00:00 2001 From: jfarcand Date: Wed, 29 Jun 2011 14:16:03 -0400 Subject: [PATCH] Fix for https://issues.sonatype.org/browse/AHC-26 ("client.close() "kills" all ongoing requests, but there would be nice to have a method that cleanly shuts down but waits for reqs") --- .../http/client/HttpResponseBodyPart.java | 13 ++++++ .../apache/ApacheResponseBodyPart.java | 17 ++++++++ .../providers/jdk/ResponseBodyPart.java | 17 ++++++++ .../netty/NettyAsyncHttpProvider.java | 14 +++--- .../providers/netty/ResponseBodyPart.java | 19 +++++++- .../client/async/AsyncStreamHandlerTest.java | 43 +++++++++++++++++++ 6 files changed, 117 insertions(+), 6 deletions(-) diff --git a/src/main/java/com/ning/http/client/HttpResponseBodyPart.java b/src/main/java/com/ning/http/client/HttpResponseBodyPart.java index f038992e56..2e1301ea25 100644 --- a/src/main/java/com/ning/http/client/HttpResponseBodyPart.java +++ b/src/main/java/com/ning/http/client/HttpResponseBodyPart.java @@ -57,4 +57,17 @@ public HttpResponseBodyPart(URI uri, AsyncHttpProvider provider) { */ abstract public boolean isLast(); + /** + * Close the underlying connection once the processing has completed. Invoking that method means the + * underlying TCP connection will be closed as soon as the processing of the response is completed. That + * means the underlying connection will never get pooled. + */ + abstract public void markUnderlyingConnectionAsClosed(); + + /** + * Return true of the underlying connection will be closed once the response has been fully processed. + * @return true of the underlying connection will be closed once the response has been fully processed. + */ + abstract public boolean closeUnderlyingConnection(); + } diff --git a/src/main/java/com/ning/http/client/providers/apache/ApacheResponseBodyPart.java b/src/main/java/com/ning/http/client/providers/apache/ApacheResponseBodyPart.java index ef44296fac..af6c74f0dd 100644 --- a/src/main/java/com/ning/http/client/providers/apache/ApacheResponseBodyPart.java +++ b/src/main/java/com/ning/http/client/providers/apache/ApacheResponseBodyPart.java @@ -27,6 +27,7 @@ public class ApacheResponseBodyPart extends HttpResponseBodyPart { private final byte[] chunk; private final boolean isLast; + private boolean closeConnection; public ApacheResponseBodyPart(URI uri, byte[] chunk, AsyncHttpProvider provider, boolean last) { super(uri, provider); @@ -61,4 +62,20 @@ public ByteBuffer getBodyByteBuffer() { public boolean isLast() { return isLast; } + + /** + * {@inheritDoc} + */ + @Override + public void markUnderlyingConnectionAsClosed() { + closeConnection = true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean closeUnderlyingConnection() { + return closeConnection; + } } \ No newline at end of file diff --git a/src/main/java/com/ning/http/client/providers/jdk/ResponseBodyPart.java b/src/main/java/com/ning/http/client/providers/jdk/ResponseBodyPart.java index c80581626f..75115d70a4 100644 --- a/src/main/java/com/ning/http/client/providers/jdk/ResponseBodyPart.java +++ b/src/main/java/com/ning/http/client/providers/jdk/ResponseBodyPart.java @@ -27,6 +27,7 @@ public class ResponseBodyPart extends HttpResponseBodyPart { private final byte[] chunk; private final boolean isLast; + private boolean closeConnection; public ResponseBodyPart(URI uri, byte[] chunk, AsyncHttpProvider provider, boolean last) { super(uri, provider); @@ -61,4 +62,20 @@ public ByteBuffer getBodyByteBuffer() { public boolean isLast() { return isLast; } + + /** + * {@inheritDoc} + */ + @Override + public void markUnderlyingConnectionAsClosed() { + closeConnection = true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean closeUnderlyingConnection() { + return closeConnection; + } } \ No newline at end of file diff --git a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java index 5019e0b59d..6b11f3792f 100644 --- a/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java +++ b/src/main/java/com/ning/http/client/providers/netty/NettyAsyncHttpProvider.java @@ -1254,14 +1254,14 @@ public Object call() throws Exception { return; } else if (!response.isChunked()) { if (response.getContent().readableBytes() != 0) { - updateBodyAndInterrupt(handler, new ResponseBodyPart(future.getURI(), response, this, true)); + updateBodyAndInterrupt(future, handler, new ResponseBodyPart(future.getURI(), response, this, true)); } finishUpdate(future, ctx, false); return; } if (nettyRequest.getMethod().equals(HttpMethod.HEAD)) { - updateBodyAndInterrupt(handler, new ResponseBodyPart(future.getURI(), response, this, true)); + updateBodyAndInterrupt(future, handler, new ResponseBodyPart(future.getURI(), response, this, true)); markAsDone(future, ctx); drainChannel(ctx, future, future.getKeepAlive(), future.getURI()); } @@ -1270,7 +1270,7 @@ public Object call() throws Exception { HttpChunk chunk = (HttpChunk) e.getMessage(); if (handler != null) { - if (chunk.isLast() || updateBodyAndInterrupt(handler, new ResponseBodyPart(future.getURI(), null, this, chunk, chunk.isLast()))) { + if (chunk.isLast() || updateBodyAndInterrupt(future, handler, new ResponseBodyPart(future.getURI(), null, this, chunk, chunk.isLast()))) { if (chunk instanceof DefaultHttpChunkTrailer) { updateHeadersAndInterrupt(handler, new ResponseHeaders(future.getURI(), future.getHttpResponse(), this, (HttpChunkTrailer) chunk)); @@ -1600,8 +1600,12 @@ private final boolean updateHeadersAndInterrupt(AsyncHandler handler, HttpRespon } @SuppressWarnings("unchecked") - private final boolean updateBodyAndInterrupt(AsyncHandler handler, HttpResponseBodyPart c) throws Exception { - return handler.onBodyPartReceived(c) != STATE.CONTINUE; + private final boolean updateBodyAndInterrupt(final NettyResponseFuture future, AsyncHandler handler, HttpResponseBodyPart c) throws Exception { + boolean state = handler.onBodyPartReceived(c) != STATE.CONTINUE; + if (c.closeUnderlyingConnection()) { + future.setKeepAlive(false); + } + return state; } //Simple marker for stopping publishing bytes. diff --git a/src/main/java/com/ning/http/client/providers/netty/ResponseBodyPart.java b/src/main/java/com/ning/http/client/providers/netty/ResponseBodyPart.java index ac53210ede..8279c76d77 100644 --- a/src/main/java/com/ning/http/client/providers/netty/ResponseBodyPart.java +++ b/src/main/java/com/ning/http/client/providers/netty/ResponseBodyPart.java @@ -36,6 +36,7 @@ public class ResponseBodyPart extends HttpResponseBodyPart { private final HttpResponse response; private final AtomicReference bytes = new AtomicReference(null); private final boolean isLast; + private boolean closeConnection = false; public ResponseBodyPart(URI uri, HttpResponse response, AsyncHttpProvider provider, boolean last) { super(uri, provider); @@ -69,7 +70,7 @@ public byte[] getBodyPartBytes() { byte[] rb = new byte[read]; b.readBytes(rb); bytes.set(rb); - b.readerIndex(index); + b.readerIndex(index); return bytes.get(); } @@ -97,6 +98,22 @@ public boolean isLast() { return isLast; } + /** + * {@inheritDoc} + */ + @Override + public void markUnderlyingConnectionAsClosed() { + closeConnection = true; + } + + /** + * {@inheritDoc} + */ + @Override + public boolean closeUnderlyingConnection() { + return closeConnection; + } + protected HttpChunk chunk() { return chunk; } diff --git a/src/test/java/com/ning/http/client/async/AsyncStreamHandlerTest.java b/src/test/java/com/ning/http/client/async/AsyncStreamHandlerTest.java index 33b5142895..8e77bdd99d 100644 --- a/src/test/java/com/ning/http/client/async/AsyncStreamHandlerTest.java +++ b/src/test/java/com/ning/http/client/async/AsyncStreamHandlerTest.java @@ -22,6 +22,7 @@ import com.ning.http.client.HttpResponseBodyPart; import com.ning.http.client.HttpResponseHeaders; import com.ning.http.client.HttpResponseStatus; +import com.ning.http.client.Response; import org.testng.Assert; import org.testng.annotations.Test; @@ -504,4 +505,46 @@ public String onCompleted() throws Exception { } c.close(); } + + @Test(groups = {"standalone", "default_provider"}) + public void closeConnectionTest() throws Throwable { + final CountDownLatch l = new CountDownLatch(1); + AsyncHttpClient c = getAsyncHttpClient(null); + + Response r = c.prepareGet(getTargetUrl()).execute(new AsyncHandler() { + + private Response.ResponseBuilder builder = new Response.ResponseBuilder(); + + public STATE onHeadersReceived(HttpResponseHeaders content) throws Exception { + builder.accumulate(content); + return STATE.CONTINUE; + } + + public void onThrowable(Throwable t) { + } + + public STATE onBodyPartReceived(HttpResponseBodyPart content) throws Exception { + builder.accumulate(content); + + if (content.isLast()) { + content.markUnderlyingConnectionAsClosed(); + } + return STATE.CONTINUE; + } + + public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception { + builder.accumulate(responseStatus); + + return STATE.CONTINUE; + } + + public Response onCompleted() throws Exception { + return builder.build(); + } + }).get(); + + Assert.assertNotNull(r); + Assert.assertEquals(r.getStatusCode(), 200); + c.close(); + } }