diff --git a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java index 1d9c3d6da82..6faf953b819 100644 --- a/src/main/java/org/redisson/connection/ClientConnectionsEntry.java +++ b/src/main/java/org/redisson/connection/ClientConnectionsEntry.java @@ -19,6 +19,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; +import org.redisson.MasterSlaveServersConfig; import org.redisson.client.ReconnectListener; import org.redisson.client.RedisClient; import org.redisson.client.RedisConnection; @@ -186,6 +187,10 @@ public void operationComplete(Future future) throws Exception { }); } + public MasterSlaveServersConfig getConfig() { + return connectionManager.getConfig(); + } + public Future connectPubSub() { final Promise connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise(); Future future = client.connectPubSubAsync(); diff --git a/src/main/java/org/redisson/connection/pool/ConnectionPool.java b/src/main/java/org/redisson/connection/pool/ConnectionPool.java index 3c63d632bef..6d46625cb3a 100644 --- a/src/main/java/org/redisson/connection/pool/ConnectionPool.java +++ b/src/main/java/org/redisson/connection/pool/ConnectionPool.java @@ -326,13 +326,12 @@ public void operationComplete(Future future) throws Exception { return; } - Future f = c.asyncWithTimeout(null, RedisCommands.PING); - f.addListener(new FutureListener() { + final FutureListener pingListener = new FutureListener() { @Override public void operationComplete(Future future) throws Exception { try { if (entry.getFreezeReason() != FreezeReason.RECONNECT - || !entry.isFreezed()) { + || !entry.isFreezed()) { return; } @@ -342,7 +341,7 @@ public void operationComplete(Future future) throws Exception { promise.addListener(new FutureListener() { @Override public void operationComplete(Future future) - throws Exception { + throws Exception { if (entry.getNodeType() == NodeType.SLAVE) { masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); log.info("slave {} successfully reconnected", entry.getClient().getAddr()); @@ -365,13 +364,32 @@ public void operationComplete(Future future) c.closeAsync(); } } - }); + }; + + if (entry.getConfig().getPassword() != null) { + Future temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword()); + + FutureListener listener = new FutureListener () { + @Override public void operationComplete (Future < Void > future)throws Exception { + ping(c, pingListener); + } + }; + + temp.addListener(listener); + } else { + ping(c, pingListener); + } } }); } }, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS); } + private void ping(RedisConnection c, final FutureListener pingListener) { + Future f = c.asyncWithTimeout(null, RedisCommands.PING); + f.addListener(pingListener); + } + public void returnConnection(ClientConnectionsEntry entry, T connection) { if (entry.isFreezed()) { connection.closeAsync();