Skip to content

Commit

Permalink
PubSubConnectionPool initialization fixed. #550
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jul 14, 2016
1 parent 8cb409f commit 5577a2c
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 22 deletions.
Expand Up @@ -17,6 +17,7 @@


import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;


import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
Expand All @@ -33,6 +34,7 @@


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;


public class LoadBalancerManagerImpl implements LoadBalancerManager { public class LoadBalancerManagerImpl implements LoadBalancerManager {
Expand All @@ -41,25 +43,37 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {


private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap(); private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPool pubSubEntries; private final PubSubConnectionPool pubSubConnectionPool;
private final SlaveConnectionPool entries; private final SlaveConnectionPool slaveConnectionPool;


public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
entries = new SlaveConnectionPool(config, connectionManager, entry); slaveConnectionPool = new SlaveConnectionPool(config, connectionManager, entry);
pubSubEntries = new PubSubConnectionPool(config, connectionManager, entry); pubSubConnectionPool = new PubSubConnectionPool(config, connectionManager, entry);
} }


public Future<Void> add(final ClientConnectionsEntry entry) { public Future<Void> add(ClientConnectionsEntry entry) {
Future<Void> f = entries.add(entry); final Promise<Void> result = connectionManager.newPromise();
f.addListener(new FutureListener<Void>() { FutureListener<Void> listener = new FutureListener<Void>() {
AtomicInteger counter = new AtomicInteger(2);
@Override @Override
public void operationComplete(Future<Void> future) throws Exception { public void operationComplete(Future<Void> future) throws Exception {
addr2Entry.put(entry.getClient().getAddr(), entry); if (!future.isSuccess()) {
pubSubEntries.add(entry); result.tryFailure(future.cause());
return;
}
if (counter.decrementAndGet() == 0) {
addr2Entry.put(entry.getClient().getAddr(), entry);
result.setSuccess(null);
}
} }
}); };
return f;
Future<Void> slaveFuture = slaveConnectionPool.add(entry);
slaveFuture.addListener(listener);
Future<Void> pubSubFuture = pubSubConnectionPool.add(entry);
pubSubFuture.addListener(listener);
return result;
} }


public int getAvailableClients() { public int getAvailableClients() {
Expand Down Expand Up @@ -123,30 +137,30 @@ public ClientConnectionsEntry freeze(ClientConnectionsEntry connectionEntry, Fre
} }


public Future<RedisPubSubConnection> nextPubSubConnection() { public Future<RedisPubSubConnection> nextPubSubConnection() {
return pubSubEntries.get(); return pubSubConnectionPool.get();
} }


public Future<RedisConnection> getConnection(InetSocketAddress addr) { public Future<RedisConnection> getConnection(InetSocketAddress addr) {
ClientConnectionsEntry entry = addr2Entry.get(addr); ClientConnectionsEntry entry = addr2Entry.get(addr);
if (entry != null) { if (entry != null) {
return entries.get(entry); return slaveConnectionPool.get(entry);
} }
RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr); RedisConnectionException exception = new RedisConnectionException("Can't find entry for " + addr);
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
} }


public Future<RedisConnection> nextConnection() { public Future<RedisConnection> nextConnection() {
return entries.get(); return slaveConnectionPool.get();
} }


public void returnPubSubConnection(RedisPubSubConnection connection) { public void returnPubSubConnection(RedisPubSubConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
pubSubEntries.returnConnection(entry, connection); pubSubConnectionPool.returnConnection(entry, connection);
} }


public void returnConnection(RedisConnection connection) { public void returnConnection(RedisConnection connection) {
ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr()); ClientConnectionsEntry entry = addr2Entry.get(connection.getRedisClient().getAddr());
entries.returnConnection(entry, connection); slaveConnectionPool.returnConnection(entry, connection);
} }


public void shutdown() { public void shutdown() {
Expand Down
Expand Up @@ -153,12 +153,7 @@ public Future<T> get() {
} }
} }


StringBuilder errorMsg; StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " exhausted! ");
if (connectionManager.isClusterMode()) {
errorMsg = new StringBuilder("Connection pool exhausted! for slots: " + masterSlaveEntry.getSlotRanges());
} else {
errorMsg = new StringBuilder("Connection pool exhausted! ");
}
if (!freezed.isEmpty()) { if (!freezed.isEmpty()) {
errorMsg.append(" Disconnected hosts: " + freezed); errorMsg.append(" Disconnected hosts: " + freezed);
} }
Expand Down

0 comments on commit 5577a2c

Please sign in to comment.