Skip to content

Commit

Permalink
Connection pool further optimization. #223
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 19, 2015
1 parent 6bde9d2 commit df928fe
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 71 deletions.
21 changes: 19 additions & 2 deletions src/main/java/org/redisson/client/RedisClient.java
Expand Up @@ -88,7 +88,7 @@ public RedisConnection connect() {
future.syncUninterruptibly();
return new RedisConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
throw new RedisConnectionException("Unable to connect to " + addr, e);
}
}

Expand All @@ -115,10 +115,27 @@ public RedisPubSubConnection connectPubSub() {
future.syncUninterruptibly();
return new RedisPubSubConnection(this, future.channel());
} catch (Exception e) {
throw new RedisConnectionException("unable to connect", e);
throw new RedisConnectionException("Unable to connect to " + addr, e);
}
}

public Future<RedisPubSubConnection> connectPubSubAsync() {
final Promise<RedisPubSubConnection> f = bootstrap.group().next().newPromise();
ChannelFuture channelFuture = bootstrap.connect();
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
RedisPubSubConnection c = new RedisPubSubConnection(RedisClient.this, future.channel());
f.setSuccess(c);
} else {
f.setFailure(future.cause());
}
}
});
return f;
}

public void shutdown() {
shutdownAsync().syncUninterruptibly();
}
Expand Down
55 changes: 37 additions & 18 deletions src/main/java/org/redisson/connection/ConnectionEntry.java
Expand Up @@ -28,6 +28,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

public class ConnectionEntry {

final Logger log = LoggerFactory.getLogger(getClass());
Expand Down Expand Up @@ -82,19 +85,27 @@ public void releaseConnection(RedisConnection connection) {
connections.add(connection);
}

public RedisConnection connect(final MasterSlaveServersConfig config) {
RedisConnection conn = client.connect();
log.debug("new connection created: {}", conn);

prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
public Future<RedisConnection> connect(final MasterSlaveServersConfig config) {
Future<RedisConnection> future = client.connectAsync();
future.addListener(new FutureListener<RedisConnection>() {
@Override
public void onReconnect(RedisConnection conn) {
public void operationComplete(Future<RedisConnection> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RedisConnection conn = future.getNow();
log.debug("new connection created: {}", conn);

prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
}
});

return conn;
return future;
}

private void prepareConnection(MasterSlaveServersConfig config, RedisConnection conn) {
Expand All @@ -109,19 +120,27 @@ private void prepareConnection(MasterSlaveServersConfig config, RedisConnection
}
}

public RedisPubSubConnection connectPubSub(final MasterSlaveServersConfig config) {
RedisPubSubConnection conn = client.connectPubSub();
log.debug("new pubsub connection created: {}", conn);

prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
public Future<RedisPubSubConnection> connectPubSub(final MasterSlaveServersConfig config) {
Future<RedisPubSubConnection> future = client.connectPubSubAsync();
future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void onReconnect(RedisConnection conn) {
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RedisPubSubConnection conn = future.getNow();
log.debug("new pubsub connection created: {}", conn);

prepareConnection(config, conn);
conn.setReconnectListener(new ReconnectListener() {
@Override
public void onReconnect(RedisConnection conn) {
prepareConnection(config, conn);
}
});
}
});

return conn;
return future;
}

@Override
Expand Down
Expand Up @@ -52,6 +52,7 @@
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.PlatformDependent;

/**
*
Expand All @@ -73,7 +74,7 @@ public class MasterSlaveConnectionManager implements ConnectionManager {

protected Class<? extends SocketChannel> socketChannelClass;

protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = new ConcurrentHashMap<String, PubSubConnectionEntry>();
protected final ConcurrentMap<String, PubSubConnectionEntry> name2PubSubConnection = PlatformDependent.newConcurrentHashMap();

protected MasterSlaveServersConfig config;

Expand Down Expand Up @@ -395,28 +396,24 @@ public void subscribe(final RedisPubSubListener listener, final String channelNa

final int slot = 0;
Future<RedisPubSubConnection> connFuture = nextPubSubConnection(slot);
connFuture.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
RedisPubSubConnection conn = future.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
}
entry.subscribe(codec, listener, channelName);
return;
}
connFuture.syncUninterruptibly();
RedisPubSubConnection conn = connFuture.getNow();
PubSubConnectionEntry entry = new PubSubConnectionEntry(conn, config.getSubscriptionsPerConnection());
entry.tryAcquire();
PubSubConnectionEntry oldEntry = name2PubSubConnection.putIfAbsent(channelName, entry);
if (oldEntry != null) {
releaseSubscribeConnection(slot, entry);
return;
}
synchronized (entry) {
if (!entry.isActive()) {
entry.release();
subscribe(listener, channelName);
return;
}
}).syncUninterruptibly();
entry.subscribe(codec, listener, channelName);
return;
}

}

Expand Down
Expand Up @@ -23,6 +23,9 @@
import org.redisson.client.RedisClient;
import org.redisson.client.RedisPubSubConnection;

import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;

public class SubscribesConnectionEntry extends ConnectionEntry {

private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
Expand Down Expand Up @@ -65,10 +68,19 @@ public void releaseSubscribeConnection() {
connectionsCounter.incrementAndGet();
}

public RedisPubSubConnection connectPubSub(MasterSlaveServersConfig config) {
RedisPubSubConnection conn = super.connectPubSub(config);
allSubscribeConnections.add(conn);
return conn;
public Future<RedisPubSubConnection> connectPubSub(MasterSlaveServersConfig config) {
Future<RedisPubSubConnection> future = super.connectPubSub(config);
future.addListener(new FutureListener<RedisPubSubConnection>() {
@Override
public void operationComplete(Future<RedisPubSubConnection> future) throws Exception {
if (!future.isSuccess()) {
return;
}
RedisPubSubConnection conn = future.getNow();
allSubscribeConnections.add(conn);
}
});
return future;
}


Expand Down
66 changes: 42 additions & 24 deletions src/main/java/org/redisson/misc/ConnectionPool.java
Expand Up @@ -16,20 +16,20 @@
package org.redisson.misc;

import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;

import org.redisson.MasterSlaveServersConfig;
import org.redisson.client.RedisConnection;
import org.redisson.client.RedisConnectionException;
import org.redisson.client.RedisException;
import org.redisson.connection.LoadBalancer;
import org.redisson.connection.SubscribesConnectionEntry;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.EventExecutor;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;
import io.netty.util.internal.OneTimeTask;

public class ConnectionPool<T extends RedisConnection> {

Expand All @@ -41,6 +41,8 @@ public class ConnectionPool<T extends RedisConnection> {

LoadBalancer loadBalancer;

final ConcurrentLinkedQueue<Promise<T>> promises = new ConcurrentLinkedQueue<Promise<T>>();

public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer, EventLoopGroup eventLoopGroup) {
this.config = config;
this.loadBalancer = loadBalancer;
Expand All @@ -49,6 +51,7 @@ public ConnectionPool(MasterSlaveServersConfig config, LoadBalancer loadBalancer

public void add(SubscribesConnectionEntry entry) {
entries.add(entry);
handleQueue(entry);
}

public void remove(SubscribesConnectionEntry entry) {
Expand All @@ -70,8 +73,9 @@ public Future<T> get() {
}
}

RedisConnectionException exception = new RedisConnectionException("Connection pool exhausted!");
return executor.newFailedFuture(exception);
Promise<T> promise = executor.newPromise();
promises.add(promise);
return promise;
}

public Future<T> get(SubscribesConnectionEntry entry) {
Expand All @@ -93,35 +97,36 @@ protected T poll(SubscribesConnectionEntry entry) {
return (T) entry.pollConnection();
}

protected T connect(SubscribesConnectionEntry entry) {
return (T) entry.connect(config);
protected Future<T> connect(SubscribesConnectionEntry entry) {
return (Future<T>) entry.connect(config);
}

private Future<T> connect(final SubscribesConnectionEntry entry, final Promise<T> promise) {
private void connect(final SubscribesConnectionEntry entry, final Promise<T> promise) {
T conn = poll(entry);
if (conn != null) {
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
} else {
executor.execute(new OneTimeTask() {
@Override
public void run() {
try {
T conn = connect(entry);
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
} catch (RedisException e) {
releaseConnection(entry);
promise.setFailure(e);
}
}
});
return;
}
return promise;

Future<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() {
@Override
public void operationComplete(Future<T> future) throws Exception {
if (!future.isSuccess()) {
releaseConnection(entry);
promise.setFailure(future.cause());
return;
}
T conn = future.getNow();
if (!promise.trySuccess(conn)) {
releaseConnection(entry, conn);
releaseConnection(entry);
}
}
});
}

public void returnConnection(SubscribesConnectionEntry entry, T connection) {
Expand All @@ -138,6 +143,19 @@ public void returnConnection(SubscribesConnectionEntry entry, T connection) {

protected void releaseConnection(SubscribesConnectionEntry entry) {
entry.releaseConnection();

handleQueue(entry);
}

private void handleQueue(SubscribesConnectionEntry entry) {
Promise<T> promise = promises.poll();
if (promise != null) {
if (!entry.isFreezed() && tryAcquireConnection(entry)) {
connect(entry, promise);
} else {
promises.add(promise);
}
}
}

protected void releaseConnection(SubscribesConnectionEntry entry, T conn) {
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/org/redisson/misc/PubSubConnectionPoll.java
Expand Up @@ -21,6 +21,7 @@
import org.redisson.connection.SubscribesConnectionEntry;

import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.Future;

public class PubSubConnectionPoll extends ConnectionPool<RedisPubSubConnection> {

Expand All @@ -35,7 +36,7 @@ protected RedisPubSubConnection poll(SubscribesConnectionEntry entry) {
}

@Override
protected RedisPubSubConnection connect(SubscribesConnectionEntry entry) {
protected Future<RedisPubSubConnection> connect(SubscribesConnectionEntry entry) {
return entry.connectPubSub(config);
}

Expand Down

0 comments on commit df928fe

Please sign in to comment.