Skip to content

Commit

Permalink
failed connection should be closed
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Sep 14, 2016
1 parent 11602d9 commit bec6ee4
Showing 1 changed file with 17 additions and 4 deletions.
Expand Up @@ -41,6 +41,13 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

/**
* Base connection pool class
*
* @author Nikita Koksharov
*
* @param <T> - connection type
*/
abstract class ConnectionPool<T extends RedisConnection> {

private final Logger log = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -211,8 +218,6 @@ private RFuture<T> createConnection(final ClientConnectionsEntry entry) {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
releaseConnection(entry);

promiseFailure(entry, promise, future.cause());
return;
}
Expand All @@ -223,13 +228,13 @@ public void operationComplete(Future<T> future) throws Exception {
return;
}

promiseSuccessful(entry, promise, conn);
connectedSuccessful(entry, promise, conn);
}
});
return promise;
}

private void promiseSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
entry.resetFailedAttempts();
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
Expand All @@ -247,15 +252,20 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T
checkForReconnect(entry);
}

releaseConnection(entry);

promise.tryFailure(cause);
}

private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}

releaseConnection(entry);
Expand All @@ -267,9 +277,12 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T
private RFuture<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}

releaseConnection(entry);
Expand Down

0 comments on commit bec6ee4

Please sign in to comment.