Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
("client.close() "kills" all ongoing requests, but there would be nice to have a method that cleanly shuts down but waits for reqs")
  • Loading branch information
jfarcand committed Jun 29, 2011
1 parent acf5085 commit c5a7ea5
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 6 deletions.
13 changes: 13 additions & 0 deletions src/main/java/com/ning/http/client/HttpResponseBodyPart.java
Expand Up @@ -57,4 +57,17 @@ public HttpResponseBodyPart(URI uri, AsyncHttpProvider provider) {
*/
abstract public boolean isLast();

/**
* Close the underlying connection once the processing has completed. Invoking that method means the
* underlying TCP connection will be closed as soon as the processing of the response is completed. That
* means the underlying connection will never get pooled.
*/
abstract public void markUnderlyingConnectionAsClosed();

/**
* Return true of the underlying connection will be closed once the response has been fully processed.
* @return true of the underlying connection will be closed once the response has been fully processed.
*/
abstract public boolean closeUnderlyingConnection();

}
Expand Up @@ -27,6 +27,7 @@ public class ApacheResponseBodyPart extends HttpResponseBodyPart {

private final byte[] chunk;
private final boolean isLast;
private boolean closeConnection;

public ApacheResponseBodyPart(URI uri, byte[] chunk, AsyncHttpProvider provider, boolean last) {
super(uri, provider);
Expand Down Expand Up @@ -61,4 +62,20 @@ public ByteBuffer getBodyByteBuffer() {
public boolean isLast() {
return isLast;
}

/**
* {@inheritDoc}
*/
@Override
public void markUnderlyingConnectionAsClosed() {
closeConnection = true;
}

/**
* {@inheritDoc}
*/
@Override
public boolean closeUnderlyingConnection() {
return closeConnection;
}
}
Expand Up @@ -27,6 +27,7 @@ public class ResponseBodyPart extends HttpResponseBodyPart {

private final byte[] chunk;
private final boolean isLast;
private boolean closeConnection;

public ResponseBodyPart(URI uri, byte[] chunk, AsyncHttpProvider provider, boolean last) {
super(uri, provider);
Expand Down Expand Up @@ -61,4 +62,20 @@ public ByteBuffer getBodyByteBuffer() {
public boolean isLast() {
return isLast;
}

/**
* {@inheritDoc}
*/
@Override
public void markUnderlyingConnectionAsClosed() {
closeConnection = true;
}

/**
* {@inheritDoc}
*/
@Override
public boolean closeUnderlyingConnection() {
return closeConnection;
}
}
Expand Up @@ -1254,14 +1254,14 @@ public Object call() throws Exception {
return;
} else if (!response.isChunked()) {
if (response.getContent().readableBytes() != 0) {
updateBodyAndInterrupt(handler, new ResponseBodyPart(future.getURI(), response, this, true));
updateBodyAndInterrupt(future, handler, new ResponseBodyPart(future.getURI(), response, this, true));
}
finishUpdate(future, ctx, false);
return;
}

if (nettyRequest.getMethod().equals(HttpMethod.HEAD)) {
updateBodyAndInterrupt(handler, new ResponseBodyPart(future.getURI(), response, this, true));
updateBodyAndInterrupt(future, handler, new ResponseBodyPart(future.getURI(), response, this, true));
markAsDone(future, ctx);
drainChannel(ctx, future, future.getKeepAlive(), future.getURI());
}
Expand All @@ -1270,7 +1270,7 @@ public Object call() throws Exception {
HttpChunk chunk = (HttpChunk) e.getMessage();

if (handler != null) {
if (chunk.isLast() || updateBodyAndInterrupt(handler, new ResponseBodyPart(future.getURI(), null, this, chunk, chunk.isLast()))) {
if (chunk.isLast() || updateBodyAndInterrupt(future, handler, new ResponseBodyPart(future.getURI(), null, this, chunk, chunk.isLast()))) {
if (chunk instanceof DefaultHttpChunkTrailer) {
updateHeadersAndInterrupt(handler, new ResponseHeaders(future.getURI(),
future.getHttpResponse(), this, (HttpChunkTrailer) chunk));
Expand Down Expand Up @@ -1600,8 +1600,12 @@ private final boolean updateHeadersAndInterrupt(AsyncHandler handler, HttpRespon
}

@SuppressWarnings("unchecked")
private final boolean updateBodyAndInterrupt(AsyncHandler handler, HttpResponseBodyPart c) throws Exception {
return handler.onBodyPartReceived(c) != STATE.CONTINUE;
private final boolean updateBodyAndInterrupt(final NettyResponseFuture<?> future, AsyncHandler handler, HttpResponseBodyPart c) throws Exception {
boolean state = handler.onBodyPartReceived(c) != STATE.CONTINUE;
if (c.closeUnderlyingConnection()) {
future.setKeepAlive(false);
}
return state;
}

//Simple marker for stopping publishing bytes.
Expand Down
Expand Up @@ -36,6 +36,7 @@ public class ResponseBodyPart extends HttpResponseBodyPart {
private final HttpResponse response;
private final AtomicReference<byte[]> bytes = new AtomicReference(null);
private final boolean isLast;
private boolean closeConnection = false;

public ResponseBodyPart(URI uri, HttpResponse response, AsyncHttpProvider provider, boolean last) {
super(uri, provider);
Expand Down Expand Up @@ -69,7 +70,7 @@ public byte[] getBodyPartBytes() {
byte[] rb = new byte[read];
b.readBytes(rb);
bytes.set(rb);
b.readerIndex(index);
b.readerIndex(index);
return bytes.get();
}

Expand Down Expand Up @@ -97,6 +98,22 @@ public boolean isLast() {
return isLast;
}

/**
* {@inheritDoc}
*/
@Override
public void markUnderlyingConnectionAsClosed() {
closeConnection = true;
}

/**
* {@inheritDoc}
*/
@Override
public boolean closeUnderlyingConnection() {
return closeConnection;
}

protected HttpChunk chunk() {
return chunk;
}
Expand Down
Expand Up @@ -22,6 +22,7 @@
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;
import com.ning.http.client.Response;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -504,4 +505,46 @@ public String onCompleted() throws Exception {
}
c.close();
}

@Test(groups = {"standalone", "default_provider"})
public void closeConnectionTest() throws Throwable {
final CountDownLatch l = new CountDownLatch(1);
AsyncHttpClient c = getAsyncHttpClient(null);

Response r = c.prepareGet(getTargetUrl()).execute(new AsyncHandler<Response>() {

private Response.ResponseBuilder builder = new Response.ResponseBuilder();

public STATE onHeadersReceived(HttpResponseHeaders content) throws Exception {
builder.accumulate(content);
return STATE.CONTINUE;
}

public void onThrowable(Throwable t) {
}

public STATE onBodyPartReceived(HttpResponseBodyPart content) throws Exception {
builder.accumulate(content);

if (content.isLast()) {
content.markUnderlyingConnectionAsClosed();
}
return STATE.CONTINUE;
}

public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
builder.accumulate(responseStatus);

return STATE.CONTINUE;
}

public Response onCompleted() throws Exception {
return builder.build();
}
}).get();

Assert.assertNotNull(r);
Assert.assertEquals(r.getStatusCode(), 200);
c.close();
}
}

0 comments on commit c5a7ea5

Please sign in to comment.