Skip to content

Commit

Permalink
Connection pool refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Jan 30, 2016
1 parent ba18a3c commit b23b6ad
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 24 deletions.
Expand Up @@ -33,7 +33,7 @@
import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.balancer.LoadBalancerManager; import org.redisson.connection.balancer.LoadBalancerManager;
import org.redisson.connection.balancer.LoadBalancerManagerImpl; import org.redisson.connection.balancer.LoadBalancerManagerImpl;
import org.redisson.misc.MasterConnectionPool; import org.redisson.connection.pool.MasterConnectionPool;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand Down
7 changes: 3 additions & 4 deletions src/main/java/org/redisson/connection/SingleEntry.java
Expand Up @@ -25,20 +25,19 @@
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.cluster.ClusterSlotRange; import org.redisson.cluster.ClusterSlotRange;
import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.misc.ConnectionPool; import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;


public class SingleEntry extends MasterSlaveEntry { public class SingleEntry extends MasterSlaveEntry {


final ConnectionPool<RedisPubSubConnection> pubSubConnectionHolder; final PubSubConnectionPool pubSubConnectionHolder;


public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) { public SingleEntry(Set<ClusterSlotRange> slotRanges, ConnectionManager connectionManager, MasterSlaveServersConfig config) {
super(slotRanges, connectionManager, config); super(slotRanges, connectionManager, config);
pubSubConnectionHolder = new PubSubConnectionPoll(config, connectionManager, this) { pubSubConnectionHolder = new PubSubConnectionPool(config, connectionManager, this) {
protected ClientConnectionsEntry getEntry() { protected ClientConnectionsEntry getEntry() {
return entries.get(0); return entries.get(0);
} }
Expand Down
Expand Up @@ -27,11 +27,11 @@
import org.redisson.client.RedisConnectionException; import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry.FreezeReason; import org.redisson.connection.pool.PubSubConnectionPool;
import org.redisson.misc.ConnectionPool; import org.redisson.connection.pool.SlaveConnectionPool;
import org.redisson.misc.PubSubConnectionPoll;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -45,13 +45,13 @@ public class LoadBalancerManagerImpl implements LoadBalancerManager {


private final ConnectionManager connectionManager; private final ConnectionManager connectionManager;
private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap(); private final Map<InetSocketAddress, ClientConnectionsEntry> addr2Entry = PlatformDependent.newConcurrentHashMap();
private final PubSubConnectionPoll pubSubEntries; private final PubSubConnectionPool pubSubEntries;
private final ConnectionPool<RedisConnection> entries; private final SlaveConnectionPool entries;


public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) { public LoadBalancerManagerImpl(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry entry) {
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
entries = new ConnectionPool<RedisConnection>(config, connectionManager, entry); entries = new SlaveConnectionPool(config, connectionManager, entry);
pubSubEntries = new PubSubConnectionPoll(config, connectionManager, entry); pubSubEntries = new PubSubConnectionPool(config, connectionManager, entry);
} }


public Future<Void> add(final ClientConnectionsEntry entry) { public Future<Void> add(final ClientConnectionsEntry entry) {
Expand Down
Expand Up @@ -13,8 +13,10 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.connection.pool;


import java.net.InetSocketAddress;
import java.util.LinkedList;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
Expand All @@ -29,14 +31,18 @@
import org.redisson.connection.ClientConnectionsEntry.NodeType; import org.redisson.connection.ClientConnectionsEntry.NodeType;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;


import io.netty.util.Timeout; import io.netty.util.Timeout;
import io.netty.util.TimerTask; import io.netty.util.TimerTask;
import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise; import io.netty.util.concurrent.Promise;


public class ConnectionPool<T extends RedisConnection> { abstract class ConnectionPool<T extends RedisConnection> {

private final Logger log = LoggerFactory.getLogger(getClass());


protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>(); protected final List<ClientConnectionsEntry> entries = new CopyOnWriteArrayList<ClientConnectionsEntry>();


Expand Down Expand Up @@ -107,9 +113,7 @@ public void operationComplete(Future<T> future) throws Exception {
} }
} }


protected int getMinimumIdleSize(ClientConnectionsEntry entry) { protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
return config.getSlaveConnectionMinimumIdleSize();
}


protected ClientConnectionsEntry getEntry() { protected ClientConnectionsEntry getEntry() {
return config.getLoadBalancer().getEntry(entries); return config.getLoadBalancer().getEntry(entries);
Expand All @@ -123,8 +127,26 @@ public Future<T> get() {
} }
} }


RedisConnectionException exception = new RedisConnectionException( List<InetSocketAddress> zeroConnectionsAmount = new LinkedList<InetSocketAddress>();
"Can't aquire connection from pool! " + entries); List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
for (ClientConnectionsEntry entry : entries) {
if (entry.isFreezed()) {
freezed.add(entry.getClient().getAddr());
} else {
zeroConnectionsAmount.add(entry.getClient().getAddr());
}
}

StringBuilder errorMsg = new StringBuilder("Connection pool exhausted!");
if (!freezed.isEmpty()) {
errorMsg.append(" disconnected hosts: " + freezed);
}
if (!zeroConnectionsAmount.isEmpty()) {
errorMsg.append(" hosts with (available connections amount) = 0 : " + zeroConnectionsAmount);
}
errorMsg.append(" Try to increase connection pool size.");

RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
return connectionManager.newFailedFuture(exception); return connectionManager.newFailedFuture(exception);
} }


Expand Down Expand Up @@ -238,9 +260,11 @@ private void checkForReconnect(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(), connectionManager.slaveDown(masterSlaveEntry, entry.getClient().getAddr().getHostName(),
entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.warn("slave {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry); scheduleCheck(entry);
} else { } else {
if (entry.freezeMaster(FreezeReason.RECONNECT)) { if (entry.freezeMaster(FreezeReason.RECONNECT)) {
log.warn("host {} disconnected due to failedAttempts={} limit reached", entry.getClient().getAddr(), config.getFailedAttempts());
scheduleCheck(entry); scheduleCheck(entry);
} }
} }
Expand Down Expand Up @@ -297,11 +321,13 @@ public void operationComplete(Future<Void> future)
throws Exception { throws Exception {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT); masterSlaveEntry.slaveUp(entry.getClient().getAddr().getHostName(), entry.getClient().getAddr().getPort(), FreezeReason.RECONNECT);
log.info("slave {} successfully reconnected", entry.getClient().getAddr());
} else { } else {
synchronized (entry) { synchronized (entry) {
if (entry.getFreezeReason() == FreezeReason.RECONNECT) { if (entry.getFreezeReason() == FreezeReason.RECONNECT) {
entry.setFreezed(false); entry.setFreezed(false);
entry.setFreezeReason(null); entry.setFreezeReason(null);
log.info("host {} successfully reconnected", entry.getClient().getAddr());
} }
} }
} }
Expand Down
Expand Up @@ -13,16 +13,16 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.connection.pool;


import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.connection.ConnectionManager; import org.redisson.connection.ConnectionManager;
import org.redisson.connection.MasterSlaveEntry; import org.redisson.connection.MasterSlaveEntry;
import org.redisson.connection.ClientConnectionsEntry; import org.redisson.connection.ClientConnectionsEntry;


public class MasterConnectionPool extends ConnectionPool<RedisConnection> {


public class MasterConnectionPool extends ConnectionPool<RedisConnection> {
public MasterConnectionPool(MasterSlaveServersConfig config, public MasterConnectionPool(MasterSlaveServersConfig config,
ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry); super(config, connectionManager, masterSlaveEntry);
Expand Down
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package org.redisson.misc; package org.redisson.connection.pool;


import org.redisson.MasterSlaveServersConfig; import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
Expand All @@ -23,9 +23,9 @@


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;


public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection> { public class PubSubConnectionPool extends ConnectionPool<RedisPubSubConnection> {


public PubSubConnectionPoll(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) { public PubSubConnectionPool(MasterSlaveServersConfig config, ConnectionManager connectionManager, MasterSlaveEntry masterSlaveEntry) {
super(config, connectionManager, masterSlaveEntry); super(config, connectionManager, masterSlaveEntry);
} }


Expand Down

0 comments on commit b23b6ad

Please sign in to comment.