Skip to content

Commit

Permalink
improve auth reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
fransiskusx committed Jun 3, 2016
1 parent 645b124 commit a3c495f
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 5 deletions.
Expand Up @@ -19,6 +19,7 @@
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;


import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.ReconnectListener; import org.redisson.client.ReconnectListener;
import org.redisson.client.RedisClient; import org.redisson.client.RedisClient;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
Expand Down Expand Up @@ -186,6 +187,10 @@ public void operationComplete(Future<T> future) throws Exception {
}); });
} }


public MasterSlaveServersConfig getConfig() {
return connectionManager.getConfig();
}

public Future<RedisPubSubConnection> connectPubSub() { public Future<RedisPubSubConnection> connectPubSub() {
final Promise<RedisPubSubConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise(); final Promise<RedisPubSubConnection> connectionFuture = ImmediateEventExecutor.INSTANCE.newPromise();
Future<RedisPubSubConnection> future = client.connectPubSubAsync(); Future<RedisPubSubConnection> future = client.connectPubSubAsync();
Expand Down
28 changes: 23 additions & 5 deletions src/main/java/org/redisson/connection/pool/ConnectionPool.java
Expand Up @@ -326,13 +326,12 @@ public void operationComplete(Future<RedisConnection> future) throws Exception {
return; return;
} }


Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING); final FutureListener<String> pingListener = new FutureListener<String>() {
f.addListener(new FutureListener<String>() {
@Override @Override
public void operationComplete(Future<String> future) throws Exception { public void operationComplete(Future<String> future) throws Exception {
try { try {
if (entry.getFreezeReason() != FreezeReason.RECONNECT if (entry.getFreezeReason() != FreezeReason.RECONNECT
|| !entry.isFreezed()) { || !entry.isFreezed()) {
return; return;
} }


Expand All @@ -342,7 +341,7 @@ public void operationComplete(Future<String> future) throws Exception {
promise.addListener(new FutureListener<Void>() { promise.addListener(new FutureListener<Void>() {
@Override @Override
public void operationComplete(Future<Void> future) public void operationComplete(Future<Void> future)
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr()); log.info("slave {} successfully reconnected", entry.getClient().getAddr());
Expand All @@ -365,13 +364,32 @@ public void operationComplete(Future<Void> future)
c.closeAsync(); c.closeAsync();
} }
} }
}); };

if (entry.getConfig().getPassword() != null) {
Future<Void> temp = c.asyncWithTimeout(null, RedisCommands.AUTH, config.getPassword());

FutureListener<Void> listener = new FutureListener<Void> () {
@Override public void operationComplete (Future < Void > future)throws Exception {
ping(c, pingListener);
}
};

temp.addListener(listener);
} else {
ping(c, pingListener);
}
} }
}); });
} }
}, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS); }, config.getReconnectionTimeout(), TimeUnit.MILLISECONDS);
} }


private void ping(RedisConnection c, final FutureListener<String> pingListener) {
Future<String> f = c.asyncWithTimeout(null, RedisCommands.PING);
f.addListener(pingListener);
}

public void returnConnection(ClientConnectionsEntry entry, T connection) { public void returnConnection(ClientConnectionsEntry entry, T connection) {
if (entry.isFreezed()) { if (entry.isFreezed()) {
connection.closeAsync(); connection.closeAsync();
Expand Down

0 comments on commit a3c495f

Please sign in to comment.