Permalink
Browse files

Address Issue:257. Connection pool race condition

  • Loading branch information...
1 parent 76f2a53 commit c87e2d81234c49ea20c3dcc010d1da17421dea70 @patricioe patricioe committed Sep 3, 2011
Showing with 77 additions and 60 deletions.
  1. +77 −60 core/src/main/java/me/prettyprint/cassandra/connection/ConcurrentHClientPool.java
@@ -20,12 +20,13 @@
private static final Logger log = LoggerFactory.getLogger(ConcurrentHClientPool.class);
- private static final Object PRESENT = new Object();
private final ArrayBlockingQueue<HThriftClient> availableClientQueue;
- private final ConcurrentMap<HThriftClient, Object> activeClients;
+ private final AtomicInteger activeClientsCount;
+ private final AtomicInteger realActiveClientsCount;
private final CassandraHost cassandraHost;
- //private final CassandraClientMonitor monitor;
+
+ /** Total threads waiting for connections */
private final AtomicInteger numBlocked;
private final AtomicBoolean active;
@@ -35,15 +36,18 @@ public ConcurrentHClientPool(CassandraHost host) {
this.cassandraHost = host;
availableClientQueue = new ArrayBlockingQueue<HThriftClient>(cassandraHost.getMaxActive(), true);
- activeClients = new ConcurrentHashMap<HThriftClient, Object>();
+ // This counter can be offset by as much as the number of threads.
+ activeClientsCount = new AtomicInteger(0);
+ realActiveClientsCount = new AtomicInteger(0);
numBlocked = new AtomicInteger();
active = new AtomicBoolean(true);
maxWaitTimeWhenExhausted = cassandraHost.getMaxWaitTimeWhenExhausted() < 0 ? 0 : cassandraHost.getMaxWaitTimeWhenExhausted();
- for (int i = 0; i < cassandraHost.getMaxActive()/3; i++) {
+ for (int i = 0; i < cassandraHost.getMaxActive() / 3; i++) {
availableClientQueue.add(new HThriftClient(cassandraHost).open());
}
+
if ( log.isDebugEnabled() ) {
log.debug("Concurrent Host pool started with {} active clients; max: {} exhausted wait: {}",
new Object[]{getNumIdle(),
@@ -53,80 +57,91 @@ public ConcurrentHClientPool(CassandraHost host) {
}
-@Override
-public HThriftClient borrowClient() throws HectorException {
+ @Override
+ public HThriftClient borrowClient() throws HectorException {
if ( !active.get() ) {
throw new HectorException("Attempt to borrow on in-active pool: " + getName());
}
- HThriftClient cassandraClient;
- int currentActive = activeClients.size();
- int tillExhausted = cassandraHost.getMaxActive() - currentActive;
- numBlocked.incrementAndGet();
- try
- {
- cassandraClient = availableClientQueue.poll();
+ HThriftClient cassandraClient = availableClientQueue.poll();
+ int currentActiveClients = activeClientsCount.incrementAndGet();
+
+ try {
+
if ( cassandraClient == null ) {
- if ( tillExhausted > 0 ) {
- // if we start with #of threads == getMaxActive, we could trigger this condition
- // replace addClientToPoolGently(new HThriftClient(cassandraHost).open()) with immediate acquisition
- return greedyCreate();
- }
- // blocked take on the queue if we are configured to wait forever
- if ( log.isDebugEnabled() ) {
- log.debug("blocking on queue - current block count {}", numBlocked.get());
- }
- // wait and catch, creating a new one if the counts have changed. Infinite wait should just recurse.
- if ( maxWaitTimeWhenExhausted == 0 ) {
- while (cassandraClient == null && active.get() ) {
- try {
- cassandraClient = availableClientQueue.poll(100, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ie) {
- log.error("InterruptedException poll operation on retry forever", ie);
- break;
- }
- }
+
+ if (currentActiveClients <= cassandraHost.getMaxActive()) {
+ cassandraClient = createClient();
} else {
-
- try {
- cassandraClient = availableClientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
- if ( cassandraClient == null ) {
- throw new PoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s",
- new Object[]{
- Thread.currentThread().getName(),
- cassandraHost.getName()}
- ));
- }
- } catch (InterruptedException ie) {
- //monitor.incCounter(Counter.POOL_EXHAUSTED);
- log.error("Cassandra client acquisition interrupted",ie);
- }
+ // We can't grow so let's wait for a connection to become available.
+ cassandraClient = waitForConnection();
}
-
+
}
+
if ( cassandraClient == null ) {
throw new HectorException("HConnectionManager returned a null client after aquisition - are we shutting down?");
}
- activeClients.put(cassandraClient, PRESENT);
+ } catch (RuntimeException e) {
+ activeClientsCount.decrementAndGet();
+ }
+
+ realActiveClientsCount.incrementAndGet();
+ return cassandraClient;
+ }
+
+
+ private HThriftClient waitForConnection() {
+ HThriftClient cassandraClient = null;
+ numBlocked.incrementAndGet();
+
+ // blocked take on the queue if we are configured to wait forever
+ if ( log.isDebugEnabled() ) {
+ log.debug("blocking on queue - current block count {}", numBlocked.get());
+ }
+
+ try {
+ // wait and catch, creating a new one if the counts have changed. Infinite wait should just recurse.
+ if (maxWaitTimeWhenExhausted == 0) {
+ while (cassandraClient == null && active.get()) {
+ try {
+ cassandraClient = availableClientQueue.poll(100, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ie) {
+ log.error("InterruptedException poll operation on retry forever", ie);
+ break;
+ }
+ }
+ } else {
+ try {
+ cassandraClient = availableClientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
+ if (cassandraClient == null) {
+ throw new PoolExhaustedException(String.format(
+ "maxWaitTimeWhenExhausted exceeded for thread %s on host %s",
+ new Object[] { Thread.currentThread().getName(), cassandraHost.getName() }));
+ }
+ } catch (InterruptedException ie) {
+ // monitor.incCounter(Counter.POOL_EXHAUSTED);
+ log.error("Cassandra client acquisition interrupted", ie);
+ }
+ }
} finally {
numBlocked.decrementAndGet();
}
return cassandraClient;
}
-
- /**
+
+
+/**
* Used when we still have room to grow. Return an HThriftClient without
* having to wait on polling logic. (But still increment all the counters)
* @return
*/
- private HThriftClient greedyCreate() {
+ private HThriftClient createClient() {
if ( log.isDebugEnabled() ) {
- log.debug("Greedy creation of new client");
+ log.debug("Creation of new client");
}
- HThriftClient client = new HThriftClient(cassandraHost).open();
- activeClients.put(client, PRESENT);
- return client;
+ return new HThriftClient(cassandraHost).open();
}
/**
@@ -165,13 +180,13 @@ public String getName() {
@Override
public int getNumActive() {
- return activeClients.size();
+ return realActiveClientsCount.get();
}
@Override
public int getNumBeforeExhausted() {
- return cassandraHost.getMaxActive() - activeClients.size();
+ return cassandraHost.getMaxActive() - realActiveClientsCount.get();
}
@@ -210,7 +225,6 @@ public String getStatusAsString() {
@Override
public void releaseClient(HThriftClient client) throws HectorException {
- activeClients.remove(client);
boolean open = client.isOpen();
if ( open ) {
if ( active.get() ) {
@@ -223,6 +237,9 @@ public void releaseClient(HThriftClient client) throws HectorException {
addClientToPoolGently(new HThriftClient(cassandraHost).open());
}
+ realActiveClientsCount.decrementAndGet();
+ activeClientsCount.decrementAndGet();
+
if ( log.isDebugEnabled() ) {
log.debug("Status of releaseClient {} to queue: {}", client.toString(), open);
}
@@ -238,7 +255,7 @@ private void addClientToPoolGently(HThriftClient client) {
try {
availableClientQueue.add(client);
} catch (IllegalStateException ise) {
- log.error("Capacity hit adding client back to queue. Closing extra.");
+ log.error("Capacity hit adding client back to queue. Closing extra");
client.close();
}
}

0 comments on commit c87e2d8

Please sign in to comment.