Skip to content

Commit

Permalink
Update execute logic to make the connection async.
Browse files Browse the repository at this point in the history
  • Loading branch information
rlubke committed Sep 17, 2011
1 parent 71d54d8 commit 7f466b4
Showing 1 changed file with 90 additions and 12 deletions.
Expand Up @@ -167,10 +167,41 @@ public <T> ListenableFuture<T> execute(final Request request,
final GrizzlyResponseFuture<T> future =
new GrizzlyResponseFuture<T>(this, request, handler);
future.setDelegate(SafeFutureImpl.<T>create());
final Connection c;
final CompletionHandler<Connection> connectHandler = new CompletionHandler<Connection>() {
@Override
public void cancelled() {
future.cancel(true);
}

@Override
public void failed(final Throwable throwable) {
future.abort(throwable);
}

@Override
public void completed(final Connection c) {
try {
execute(c, request, handler, future);
} catch (Exception e) {
if (e instanceof RuntimeException) {
failed(e);
} else if (e instanceof IOException) {
failed(e);
}
if (LOGGER.isWarnEnabled()) {
LOGGER.warn(e.toString(), e);
}
}
}

@Override
public void updated(final Connection c) {
// no-op
}
};

try {
c = connectionManager.obtainTrackedConnection(request, future);
execute(c, request, handler, future);
connectionManager.doAsyncTrackedConnection(request, future, connectHandler);
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
Expand All @@ -185,9 +216,6 @@ public <T> ListenableFuture<T> execute(final Request request,
return future;
}




/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -1926,7 +1954,25 @@ static boolean isConnectionCacheable(final Connection c) {
return ((canCache != null) ? canCache : false);
}

private void doAsyncTrackedConnection(final Request request,
final GrizzlyResponseFuture requestFuture,
final CompletionHandler<Connection> connectHandler)
throws IOException, ExecutionException, InterruptedException {
final String url = request.getUrl();
Connection c = pool.poll(AsyncHttpProviderUtils.getBaseUrl(url));
if (c == null) {
if (!connectionMonitor.acquire()) {
throw new IOException("Max connections exceeded");
}
doAsyncConnect(url, request, requestFuture, connectHandler);
} else {
provider.touchConnection(c, request);
connectHandler.completed(c);
}

}

/*
Connection obtainTrackedConnection(final Request request,
final GrizzlyResponseFuture requestFuture)
throws IOException, ExecutionException, InterruptedException {
Expand All @@ -1945,7 +1991,7 @@ Connection obtainTrackedConnection(final Request request,
return c;
}

*/

Connection obtainConnection(final Request request,
final GrizzlyResponseFuture requestFuture)
Expand All @@ -1959,6 +2005,23 @@ Connection obtainConnection(final Request request,

}

private void doAsyncConnect(final String url,
final Request request,
final GrizzlyResponseFuture requestFuture,
final CompletionHandler<Connection> connectHandler)
throws IOException, ExecutionException, InterruptedException {

final URI uri = AsyncHttpProviderUtils.createUri(url);
ProxyServer proxy = getProxyServer(request);
if (ProxyUtils.avoidProxy(proxy, request)) {
proxy = null;
}
String host = ((proxy != null) ? proxy.getHost() : uri.getHost());
int port = ((proxy != null) ? proxy.getPort() : uri.getPort());
connectionHandler.connect(new InetSocketAddress(host, getPort(uri, port)),
createConnectionCompletionHandler(request, requestFuture, connectHandler));

}

private Connection obtainConnection0(final String url,
final Request request,
Expand All @@ -1973,7 +2036,7 @@ private Connection obtainConnection0(final String url,
String host = ((proxy != null) ? proxy.getHost() : uri.getHost());
int port = ((proxy != null) ? proxy.getPort() : uri.getPort());
return connectionHandler.connect(new InetSocketAddress(host, getPort(uri, port)),
createConnectionCompletionHandler(request, requestFuture)).get();
createConnectionCompletionHandler(request, requestFuture, null)).get();

}

Expand Down Expand Up @@ -2012,23 +2075,38 @@ void destroy() {
}

CompletionHandler<Connection> createConnectionCompletionHandler(final Request request,
final GrizzlyResponseFuture future) {
final GrizzlyResponseFuture future,
final CompletionHandler<Connection> wrappedHandler) {
return new CompletionHandler<Connection>() {
public void cancelled() {
future.cancel(true);
if (wrappedHandler != null) {
wrappedHandler.cancelled();
} else {
future.cancel(true);
}
}

public void failed(Throwable throwable) {
future.abort(throwable);
if (wrappedHandler != null) {
wrappedHandler.failed(throwable);
} else {
future.abort(throwable);
}
}

public void completed(Connection connection) {
future.setConnection(connection);
provider.touchConnection(connection, request);
if (wrappedHandler != null) {
connection.addCloseListener(connectionMonitor);
wrappedHandler.completed(connection);
}
}

public void updated(Connection result) {
// no-op
if (wrappedHandler != null) {
wrappedHandler.updated(result);
}
}
};
}
Expand Down

0 comments on commit 7f466b4

Please sign in to comment.