From df928fe20f12549d1e7b627becd99e18505a3ef1 Mon Sep 17 00:00:00 2001 From: Nikita Date: Mon, 19 Oct 2015 20:20:37 +0300 Subject: [PATCH] Connection pool further optimization. #223 --- .../java/org/redisson/client/RedisClient.java | 21 +++++- .../redisson/connection/ConnectionEntry.java | 55 +++++++++++----- .../MasterSlaveConnectionManager.java | 41 ++++++------ .../connection/SubscribesConnectionEntry.java | 20 ++++-- .../org/redisson/misc/ConnectionPool.java | 66 ++++++++++++------- .../redisson/misc/PubSubConnectionPoll.java | 3 +- 6 files changed, 135 insertions(+), 71 deletions(-) diff --git a/src/main/java/org/redisson/client/RedisClient.java b/src/main/java/org/redisson/client/RedisClient.java index 623d89e2a83..6baf9faa23b 100644 --- a/src/main/java/org/redisson/client/RedisClient.java +++ b/src/main/java/org/redisson/client/RedisClient.java @@ -88,7 +88,7 @@ public RedisConnection connect() { future.syncUninterruptibly(); return new RedisConnection(this, future.channel()); } catch (Exception e) { - throw new RedisConnectionException("unable to connect", e); + throw new RedisConnectionException("Unable to connect to " + addr, e); } } @@ -115,10 +115,27 @@ public RedisPubSubConnection connectPubSub() { future.syncUninterruptibly(); return new RedisPubSubConnection(this, future.channel()); } catch (Exception e) { - throw new RedisConnectionException("unable to connect", e); + throw new RedisConnectionException("Unable to connect to " + addr, e); } } + public Future connectPubSubAsync() { + final Promise f = bootstrap.group().next().newPromise(); + ChannelFuture channelFuture = bootstrap.connect(); + channelFuture.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) throws Exception { + if (future.isSuccess()) { + RedisPubSubConnection c = new RedisPubSubConnection(RedisClient.this, future.channel()); + f.setSuccess(c); + } else { + f.setFailure(future.cause()); + } + } + }); + return f; + } + public void shutdown() { shutdownAsync().syncUninterruptibly(); } diff --git a/src/main/java/org/redisson/connection/ConnectionEntry.java b/src/main/java/org/redisson/connection/ConnectionEntry.java index d965f98d021..5f83125682d 100644 --- a/src/main/java/org/redisson/connection/ConnectionEntry.java +++ b/src/main/java/org/redisson/connection/ConnectionEntry.java @@ -28,6 +28,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + public class ConnectionEntry { final Logger log = LoggerFactory.getLogger(getClass()); @@ -82,19 +85,27 @@ public void releaseConnection(RedisConnection connection) { connections.add(connection); } - public RedisConnection connect(final MasterSlaveServersConfig config) { - RedisConnection conn = client.connect(); - log.debug("new connection created: {}", conn); - - prepareConnection(config, conn); - conn.setReconnectListener(new ReconnectListener() { + public Future connect(final MasterSlaveServersConfig config) { + Future future = client.connectAsync(); + future.addListener(new FutureListener() { @Override - public void onReconnect(RedisConnection conn) { + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + RedisConnection conn = future.getNow(); + log.debug("new connection created: {}", conn); + prepareConnection(config, conn); + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn) { + prepareConnection(config, conn); + } + }); } }); - - return conn; + return future; } private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) { @@ -109,19 +120,27 @@ private void prepareConnection(MasterSlaveServersConfig config, RedisConnection } } - public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) { - RedisPubSubConnection conn = client.connectPubSub(); - log.debug("new pubsub connection created: {}", conn); - - prepareConnection(config, conn); - conn.setReconnectListener(new ReconnectListener() { + public Future connectPubSub(final MasterSlaveServersConfig config) { + Future future = client.connectPubSubAsync(); + future.addListener(new FutureListener() { @Override - public void onReconnect(RedisConnection conn) { + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + RedisPubSubConnection conn = future.getNow(); + log.debug("new pubsub connection created: {}", conn); + prepareConnection(config, conn); + conn.setReconnectListener(new ReconnectListener() { + @Override + public void onReconnect(RedisConnection conn) { + prepareConnection(config, conn); + } + }); } }); - - return conn; + return future; } @Override diff --git a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java index 640e05f463e..9f61bf53f03 100644 --- a/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java +++ b/src/main/java/org/redisson/connection/MasterSlaveConnectionManager.java @@ -52,6 +52,7 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; +import io.netty.util.internal.PlatformDependent; /** * @@ -73,7 +74,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager { protected Class socketChannelClass; - protected final ConcurrentMap name2PubSubConnection = new ConcurrentHashMap(); + protected final ConcurrentMap name2PubSubConnection = PlatformDependent.newConcurrentHashMap(); protected MasterSlaveServersConfig config; @@ -395,28 +396,24 @@ public void subscribe(final RedisPubSubListener listener, final String channelNa final int slot = 0; Future connFuture = nextPubSubConnection(slot); - connFuture.addListener(new FutureListener() { - @Override - public void operationComplete(Future future) throws Exception { - RedisPubSubConnection conn = future.getNow(); - PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); - entry.tryAcquire(); - PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); - if (oldEntry != null) { - releaseSubscribeConnection(slot, entry); - return; - } - synchronized (entry) { - if (!entry.isActive()) { - entry.release(); - subscribe(listener, channelName); - return; - } - entry.subscribe(codec, listener, channelName); - return; - } + connFuture.syncUninterruptibly(); + RedisPubSubConnection conn = connFuture.getNow(); + PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection()); + entry.tryAcquire(); + PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry); + if (oldEntry != null) { + releaseSubscribeConnection(slot, entry); + return; + } + synchronized (entry) { + if (!entry.isActive()) { + entry.release(); + subscribe(listener, channelName); + return; } - }).syncUninterruptibly(); + entry.subscribe(codec, listener, channelName); + return; + } } diff --git a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java index 48f66a0f73d..4a2c88982e9 100644 --- a/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java +++ b/src/main/java/org/redisson/connection/SubscribesConnectionEntry.java @@ -23,6 +23,9 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisPubSubConnection; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + public class SubscribesConnectionEntry extends ConnectionEntry { private final Queue allSubscribeConnections = new ConcurrentLinkedQueue(); @@ -65,10 +68,19 @@ public void releaseSubscribeConnection() { connectionsCounter.incrementAndGet(); } - public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) { - RedisPubSubConnection conn = super.connectPubSub(config); - allSubscribeConnections.add(conn); - return conn; + public Future connectPubSub(MasterSlaveServersConfig config) { + Future future = super.connectPubSub(config); + future.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + return; + } + RedisPubSubConnection conn = future.getNow(); + allSubscribeConnections.add(conn); + } + }); + return future; } diff --git a/src/main/java/org/redisson/misc/ConnectionPool.java b/src/main/java/org/redisson/misc/ConnectionPool.java index 7c0c3603ad7..568e0c2830f 100644 --- a/src/main/java/org/redisson/misc/ConnectionPool.java +++ b/src/main/java/org/redisson/misc/ConnectionPool.java @@ -16,20 +16,20 @@ package org.redisson.misc; import java.util.List; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import org.redisson.MasterSlaveServersConfig; import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnectionException; -import org.redisson.client.RedisException; import org.redisson.connection.LoadBalancer; import org.redisson.connection.SubscribesConnectionEntry; import io.netty.channel.EventLoopGroup; import io.netty.util.concurrent.EventExecutor; import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.Promise; -import io.netty.util.internal.OneTimeTask; public class ConnectionPool { @@ -41,6 +41,8 @@ public class ConnectionPool { LoadBalancer loadBalancer; + final ConcurrentLinkedQueue> promises = new ConcurrentLinkedQueue>(); + public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) { this.config = config; this.loadBalancer = loadBalancer; @@ -49,6 +51,7 @@ public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer public void add(SubscribesConnectionEntry entry) { entries.add(entry); + handleQueue(entry); } public void remove(SubscribesConnectionEntry entry) { @@ -70,8 +73,9 @@ public Future get() { } } - RedisConnectionException exception = new RedisConnectionException("Connection pool exhausted!"); - return executor.newFailedFuture(exception); + Promise promise = executor.newPromise(); + promises.add(promise); + return promise; } public Future get(SubscribesConnectionEntry entry) { @@ -93,35 +97,36 @@ protected T poll(SubscribesConnectionEntry entry) { return (T) entry.pollConnection(); } - protected T connect(SubscribesConnectionEntry entry) { - return (T) entry.connect(config); + protected Future connect(SubscribesConnectionEntry entry) { + return (Future) entry.connect(config); } - private Future connect(final SubscribesConnectionEntry entry, final Promise promise) { + private void connect(final SubscribesConnectionEntry entry, final Promise promise) { T conn = poll(entry); if (conn != null) { if (!promise.trySuccess(conn)) { releaseConnection(entry, conn); releaseConnection(entry); } - } else { - executor.execute(new OneTimeTask() { - @Override - public void run() { - try { - T conn = connect(entry); - if (!promise.trySuccess(conn)) { - releaseConnection(entry, conn); - releaseConnection(entry); - } - } catch (RedisException e) { - releaseConnection(entry); - promise.setFailure(e); - } - } - }); + return; } - return promise; + + Future connFuture = connect(entry); + connFuture.addListener(new FutureListener() { + @Override + public void operationComplete(Future future) throws Exception { + if (!future.isSuccess()) { + releaseConnection(entry); + promise.setFailure(future.cause()); + return; + } + T conn = future.getNow(); + if (!promise.trySuccess(conn)) { + releaseConnection(entry, conn); + releaseConnection(entry); + } + } + }); } public void returnConnection(SubscribesConnectionEntry entry, T connection) { @@ -138,6 +143,19 @@ public void returnConnection(SubscribesConnectionEntry entry, T connection) { protected void releaseConnection(SubscribesConnectionEntry entry) { entry.releaseConnection(); + + handleQueue(entry); + } + + private void handleQueue(SubscribesConnectionEntry entry) { + Promise promise = promises.poll(); + if (promise != null) { + if (!entry.isFreezed() && tryAcquireConnection(entry)) { + connect(entry, promise); + } else { + promises.add(promise); + } + } } protected void releaseConnection(SubscribesConnectionEntry entry, T conn) { diff --git a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java index d5119087c74..67f6fa8ddef 100644 --- a/src/main/java/org/redisson/misc/PubSubConnectionPoll.java +++ b/src/main/java/org/redisson/misc/PubSubConnectionPoll.java @@ -21,6 +21,7 @@ import org.redisson.connection.SubscribesConnectionEntry; import io.netty.channel.EventLoopGroup; +import io.netty.util.concurrent.Future; public class PubSubConnectionPoll extends ConnectionPool { @@ -35,7 +36,7 @@ protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) { } @Override - protected RedisPubSubConnection connect(SubscribesConnectionEntry entry) { + protected Future connect(SubscribesConnectionEntry entry) { return entry.connectPubSub(config); }