Skip to content

Commit

Permalink
Remove pool exhausted exception #680
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita committed Oct 25, 2016
1 parent 64fc4d9 commit c73c760
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 96 deletions.
Expand Up @@ -27,23 +27,23 @@
import org.redisson.client.RedisPubSubConnection; import org.redisson.client.RedisPubSubConnection;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.misc.RPromise; import org.redisson.misc.RPromise;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


import io.netty.util.concurrent.Future; import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.FutureListener; import io.netty.util.concurrent.FutureListener;
import io.netty.util.concurrent.Promise;


public class ClientConnectionsEntry { public class ClientConnectionsEntry {


final Logger log = LoggerFactory.getLogger(getClass()); final Logger log = LoggerFactory.getLogger(getClass());


private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>(); private final Queue<RedisPubSubConnection> allSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>(); private final Queue<RedisPubSubConnection> freeSubscribeConnections = new ConcurrentLinkedQueue<RedisPubSubConnection>();
private final AtomicInteger freeSubscribeConnectionsCounter = new AtomicInteger(); private final AsyncSemaphore freeSubscribeConnectionsCounter;


private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>(); private final Queue<RedisConnection> freeConnections = new ConcurrentLinkedQueue<RedisConnection>();
private final AtomicInteger freeConnectionsCounter = new AtomicInteger(); private final AsyncSemaphore freeConnectionsCounter;


public enum FreezeReason {MANAGER, RECONNECT, SYSTEM} public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}


Expand All @@ -59,10 +59,10 @@ public enum FreezeReason {MANAGER, RECONNECT, SYSTEM}
public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize, public ClientConnectionsEntry(RedisClient client, int poolMinSize, int poolMaxSize, int subscribePoolMinSize, int subscribePoolMaxSize,
ConnectionManager connectionManager, NodeType serverMode) { ConnectionManager connectionManager, NodeType serverMode) {
this.client = client; this.client = client;
this.freeConnectionsCounter.set(poolMaxSize); this.freeConnectionsCounter = new AsyncSemaphore(poolMaxSize);
this.connectionManager = connectionManager; this.connectionManager = connectionManager;
this.nodeType = serverMode; this.nodeType = serverMode;
this.freeSubscribeConnectionsCounter.set(subscribePoolMaxSize); this.freeSubscribeConnectionsCounter = new AsyncSemaphore(subscribePoolMaxSize);


if (subscribePoolMaxSize > 0) { if (subscribePoolMaxSize > 0) {
connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter); connectionManager.getConnectionWatcher().add(subscribePoolMinSize, subscribePoolMaxSize, freeSubscribeConnections, freeSubscribeConnectionsCounter);
Expand Down Expand Up @@ -107,27 +107,15 @@ public void setFreezed(boolean freezed) {
} }


public int getFreeAmount() { public int getFreeAmount() {
return freeConnectionsCounter.get(); return freeConnectionsCounter.getCounter();
} }


private boolean tryAcquire(AtomicInteger counter) { public void acquireConnection(Runnable runnable) {
while (true) { freeConnectionsCounter.acquire(runnable);
int value = counter.get();
if (value == 0) {
return false;
}
if (counter.compareAndSet(value, value - 1)) {
return true;
}
}
}

public boolean tryAcquireConnection() {
return tryAcquire(freeConnectionsCounter);
} }


public void releaseConnection() { public void releaseConnection() {
freeConnectionsCounter.incrementAndGet(); freeConnectionsCounter.release();
} }


public RedisConnection pollConnection() { public RedisConnection pollConnection() {
Expand Down Expand Up @@ -228,12 +216,12 @@ public void releaseSubscribeConnection(RedisPubSubConnection connection) {
freeSubscribeConnections.add(connection); freeSubscribeConnections.add(connection);
} }


public boolean tryAcquireSubscribeConnection() { public void acquireSubscribeConnection(Runnable runnable) {
return tryAcquire(freeSubscribeConnectionsCounter); freeSubscribeConnectionsCounter.acquire(runnable);
} }


public void releaseSubscribeConnection() { public void releaseSubscribeConnection() {
freeSubscribeConnectionsCounter.incrementAndGet(); freeSubscribeConnectionsCounter.release();
} }


public boolean freezeMaster(FreezeReason reason) { public boolean freezeMaster(FreezeReason reason) {
Expand Down
Expand Up @@ -19,10 +19,10 @@
import java.util.Queue; import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;


import org.redisson.client.RedisConnection; import org.redisson.client.RedisConnection;
import org.redisson.config.MasterSlaveServersConfig; import org.redisson.config.MasterSlaveServersConfig;
import org.redisson.pubsub.AsyncSemaphore;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;


Expand All @@ -38,10 +38,10 @@ public static class Entry {


private final int minimumAmount; private final int minimumAmount;
private final int maximumAmount; private final int maximumAmount;
private final AtomicInteger freeConnectionsCounter; private final AsyncSemaphore freeConnectionsCounter;
private final Collection<? extends RedisConnection> connections; private final Collection<? extends RedisConnection> connections;


public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) { public Entry(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AsyncSemaphore freeConnectionsCounter) {
super(); super();
this.minimumAmount = minimumAmount; this.minimumAmount = minimumAmount;
this.maximumAmount = maximumAmount; this.maximumAmount = maximumAmount;
Expand Down Expand Up @@ -84,10 +84,10 @@ public void operationComplete(Future<Void> future) throws Exception {
} }


private boolean validateAmount(Entry entry) { private boolean validateAmount(Entry entry) {
return entry.maximumAmount - entry.freeConnectionsCounter.get() + entry.connections.size() > entry.minimumAmount; return entry.maximumAmount - entry.freeConnectionsCounter.getCounter() + entry.connections.size() > entry.minimumAmount;
} }


public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AtomicInteger freeConnectionsCounter) { public void add(int minimumAmount, int maximumAmount, Collection<? extends RedisConnection> connections, AsyncSemaphore freeConnectionsCounter) {
entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter)); entries.add(new Entry(minimumAmount, maximumAmount, connections, freeConnectionsCounter));
} }


Expand Down
Expand Up @@ -104,39 +104,51 @@ private void createConnection(final boolean checkFreezed, final AtomicInteger re
initPromise.tryFailure(cause); initPromise.tryFailure(cause);
return; return;
} }

RFuture<T> promise = createConnection(entry); acquireConnection(entry, new Runnable() {
promise.addListener(new FutureListener<T>() {
@Override @Override
public void operationComplete(Future<T> future) throws Exception { public void run() {
if (future.isSuccess()) { RFuture<T> promise = createConnection(entry, null);
T conn = future.getNow(); promise.addListener(new FutureListener<T>() {
releaseConnection(entry, conn); @Override
} public void operationComplete(Future<T> future) throws Exception {
if (future.isSuccess()) {
T conn = future.getNow();


releaseConnection(entry); releaseConnection(entry, conn);
}


if (!future.isSuccess()) { releaseConnection(entry);
Throwable cause = new RedisConnectionException(
"Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: "
+ entry.getClient().getAddr(), future.cause());
initPromise.tryFailure(cause);
return;
}


int value = initializedConnections.decrementAndGet(); if (!future.isSuccess()) {
if (value == 0) { Throwable cause = new RedisConnectionException(
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr()); "Can't init enough connections amount! Only " + (minimumIdleSize - initializedConnections.get()) + " from " + minimumIdleSize + " were initialized. Server: "
if (!initPromise.trySuccess(null)) { + entry.getClient().getAddr(), future.cause());
throw new IllegalStateException(); initPromise.tryFailure(cause);
} return;
} else if (value > 0 && !initPromise.isDone()) { }
if (requests.incrementAndGet() <= minimumIdleSize) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections); int value = initializedConnections.decrementAndGet();
if (value == 0) {
log.info("{} connections initialized for {}", minimumIdleSize, entry.getClient().getAddr());
if (!initPromise.trySuccess(null)) {
throw new IllegalStateException();
}
} else if (value > 0 && !initPromise.isDone()) {
if (requests.incrementAndGet() <= minimumIdleSize) {
createConnection(checkFreezed, requests, entry, initPromise, minimumIdleSize, initializedConnections);
}
}
} }
} });
} }
}); });

}

protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
entry.acquireConnection(runnable);
} }


protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry); protected abstract int getMinimumIdleSize(ClientConnectionsEntry entry);
Expand All @@ -148,27 +160,35 @@ protected ClientConnectionsEntry getEntry() {
public RFuture<T> get() { public RFuture<T> get() {
for (int j = entries.size() - 1; j >= 0; j--) { for (int j = entries.size() - 1; j >= 0; j--) {
ClientConnectionsEntry entry = getEntry(); ClientConnectionsEntry entry = getEntry();
if (!entry.isFreezed() && tryAcquireConnection(entry)) { if (!entry.isFreezed()
return connectTo(entry); && tryAcquireConnection(entry)) {
final RPromise<T> result = connectionManager.newPromise();
acquireConnection(entry, new Runnable() {
@Override
public void run() {
connectTo(entry, result);
}
});
return result;
} }
} }

List<InetSocketAddress> zeroConnectionsAmount = new LinkedList<InetSocketAddress>(); List<InetSocketAddress> failedAttempts = new LinkedList<InetSocketAddress>();
List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>(); List<InetSocketAddress> freezed = new LinkedList<InetSocketAddress>();
for (ClientConnectionsEntry entry : entries) { for (ClientConnectionsEntry entry : entries) {
if (entry.isFreezed()) { if (entry.isFreezed()) {
freezed.add(entry.getClient().getAddr()); freezed.add(entry.getClient().getAddr());
} else { } else {
zeroConnectionsAmount.add(entry.getClient().getAddr()); failedAttempts.add(entry.getClient().getAddr());
} }
} }


StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " exhausted! "); StringBuilder errorMsg = new StringBuilder(getClass().getSimpleName() + " no available Redis entries. ");
if (!freezed.isEmpty()) { if (!freezed.isEmpty()) {
errorMsg.append(" Disconnected hosts: " + freezed); errorMsg.append(" Disconnected hosts: " + freezed);
} }
if (!zeroConnectionsAmount.isEmpty()) { if (!failedAttempts.isEmpty()) {
errorMsg.append(" Hosts with fully busy connections: " + zeroConnectionsAmount); errorMsg.append(" Hosts disconnected due to `failedAttempts` limit reached: " + failedAttempts);
} }


RedisConnectionException exception = new RedisConnectionException(errorMsg.toString()); RedisConnectionException exception = new RedisConnectionException(errorMsg.toString());
Expand All @@ -178,7 +198,9 @@ public RFuture<T> get() {
public RFuture<T> get(ClientConnectionsEntry entry) { public RFuture<T> get(ClientConnectionsEntry entry) {
if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed()) if (((entry.getNodeType() == NodeType.MASTER && entry.getFreezeReason() == FreezeReason.SYSTEM) || !entry.isFreezed())
&& tryAcquireConnection(entry)) { && tryAcquireConnection(entry)) {
return connectTo(entry); RPromise<T> result = connectionManager.newPromise();
connectTo(entry, result);
return result;
} }


RedisConnectionException exception = new RedisConnectionException( RedisConnectionException exception = new RedisConnectionException(
Expand All @@ -187,7 +209,7 @@ && tryAcquireConnection(entry)) {
} }


protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { protected boolean tryAcquireConnection(ClientConnectionsEntry entry) {
return entry.getFailedAttempts() < config.getFailedAttempts() && entry.tryAcquireConnection(); return entry.getFailedAttempts() < config.getFailedAttempts();
} }


protected T poll(ClientConnectionsEntry entry) { protected T poll(ClientConnectionsEntry entry) {
Expand All @@ -198,21 +220,29 @@ protected RFuture<T> connect(ClientConnectionsEntry entry) {
return (RFuture<T>) entry.connect(); return (RFuture<T>) entry.connect();
} }


private RFuture<T> connectTo(ClientConnectionsEntry entry) { private void connectTo(ClientConnectionsEntry entry, RPromise<T> promise) {
T conn = poll(entry); T conn = poll(entry);
if (conn != null) { if (conn != null) {
if (!conn.isActive()) { if (!conn.isActive()) {
return promiseFailure(entry, conn); promiseFailure(entry, promise, conn);
return;
} }


return promiseSuccessful(entry, conn); entry.resetFailedAttempts();
promise.trySuccess(conn);
return;
} }


return createConnection(entry); createConnection(entry, promise);
} }


private RFuture<T> createConnection(final ClientConnectionsEntry entry) { private RFuture<T> createConnection(final ClientConnectionsEntry entry, RPromise<T> ppromise) {
final RPromise<T> promise = connectionManager.newPromise(); final RPromise<T> promise;
if (ppromise != null) {
promise = ppromise;
} else {
promise = connectionManager.newPromise();
}
RFuture<T> connFuture = connect(entry); RFuture<T> connFuture = connect(entry);
connFuture.addListener(new FutureListener<T>() { connFuture.addListener(new FutureListener<T>() {
@Override @Override
Expand Down Expand Up @@ -242,11 +272,6 @@ private void connectedSuccessful(ClientConnectionsEntry entry, RPromise<T> promi
} }
} }


private RFuture<T> promiseSuccessful(ClientConnectionsEntry entry, T conn) {
entry.resetFailedAttempts();
return (RFuture<T>) conn.getAcquireFuture();
}

private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) { private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, Throwable cause) {
if (entry.incFailedAttempts() == config.getFailedAttempts()) { if (entry.incFailedAttempts() == config.getFailedAttempts()) {
checkForReconnect(entry); checkForReconnect(entry);
Expand Down Expand Up @@ -274,23 +299,6 @@ private void promiseFailure(ClientConnectionsEntry entry, RPromise<T> promise, T
promise.tryFailure(cause); promise.tryFailure(cause);
} }


private RFuture<T> promiseFailure(ClientConnectionsEntry entry, T conn) {
int attempts = entry.incFailedAttempts();
if (attempts == config.getFailedAttempts()) {
conn.closeAsync();
checkForReconnect(entry);
} else if (attempts < config.getFailedAttempts()) {
releaseConnection(entry, conn);
} else {
conn.closeAsync();
}

releaseConnection(entry);

RedisConnectionException cause = new RedisConnectionException(conn + " is not active!");
return connectionManager.newFailedFuture(cause);
}

private void checkForReconnect(ClientConnectionsEntry entry) { private void checkForReconnect(ClientConnectionsEntry entry) {
if (entry.getNodeType() == NodeType.SLAVE) { if (entry.getNodeType() == NodeType.SLAVE) {
masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(), masterSlaveEntry.slaveDown(entry.getClient().getAddr().getHostName(),
Expand Down
Expand Up @@ -50,10 +50,10 @@ protected RFuture<RedisPubSubConnection> connect(ClientConnectionsEntry entry) {
} }


@Override @Override
protected boolean tryAcquireConnection(ClientConnectionsEntry entry) { protected void acquireConnection(ClientConnectionsEntry entry, Runnable runnable) {
return entry.tryAcquireSubscribeConnection(); entry.acquireSubscribeConnection(runnable);
} }

@Override @Override
protected void releaseConnection(ClientConnectionsEntry entry) { protected void releaseConnection(ClientConnectionsEntry entry) {
entry.releaseSubscribeConnection(); entry.releaseSubscribeConnection();
Expand Down
Expand Up @@ -74,6 +74,10 @@ public boolean remove(Runnable listener) {
} }
} }


public int getCounter() {
return counter;
}

public void release() { public void release() {
Runnable runnable = null; Runnable runnable = null;


Expand Down

0 comments on commit c73c760

Please sign in to comment.