Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
Merge pull request hector-client#237 from sbridges/afdf30b87588a9cf4b…
Browse files Browse the repository at this point in the history
…42b82993d23c20cf3496c2

fix race in numBlocked
  • Loading branch information
Nate McCall committed Jul 12, 2011
2 parents 3e12206 + afdf30b commit 072b257
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 59 deletions.
Expand Up @@ -96,27 +96,22 @@ public void run() {
log.debug("Retry service fired... nothing to do.");
return;
}
Set<CassandraHost> alreadyTried = Sets.newHashSet();
Iterator<CassandraHost> iter = downedHostQueue.iterator();
while( iter.hasNext() ) {
CassandraHost cassandraHost = iter.next();
if( cassandraHost == null ) {
continue;
}
if( !alreadyTried.add(cassandraHost) ) {
iter.remove();
continue;
}
boolean reconnected = verifyConnection(cassandraHost);
log.info("Downed Host retry status {} with host: {}", reconnected, cassandraHost.getName());
if ( reconnected ) {
connectionManager.addCassandraHost(cassandraHost);
//we can't call iter.remove() based on return value of connectionManager.addCassandraHost, since
//that returns false if an error occurs, or if the host already exists
if(connectionManager.getHosts().contains(cassandraHost)) {
iter.remove();
}
}
CassandraHost cassandraHost = iter.next();
if( cassandraHost == null ) {
continue;
}
boolean reconnected = verifyConnection(cassandraHost);
log.info("Downed Host retry status {} with host: {}", reconnected, cassandraHost.getName());
if ( reconnected ) {
connectionManager.addCassandraHost(cassandraHost);
//we can't call iter.remove() based on return value of connectionManager.addCassandraHost, since
//that returns false if an error occurs, or if the host already exists
if(connectionManager.getHosts().contains(cassandraHost)) {
iter.remove();
}
}
}
}

Expand Down
Expand Up @@ -61,51 +61,54 @@ public HThriftClient borrowClient() throws HectorException {
int tillExhausted = cassandraHost.getMaxActive() - currentActive;

numBlocked.incrementAndGet();
cassandraClient = availableClientQueue.poll();
if ( cassandraClient == null ) {
if ( tillExhausted > 0 ) {
// if we start with #of threads == getMaxActive, we could trigger this condition
// replace addClientToPoolGently(new HThriftClient(cassandraHost).open()) with immediate acquisition
return greedyCreate();
}
// blocked take on the queue if we are configured to wait forever
if ( log.isDebugEnabled() ) {
log.debug("blocking on queue - current block count {}", numBlocked.get());
}
// wait and catch, creating a new one if the counts have changed. Infinite wait should just recurse.
if ( maxWaitTimeWhenExhausted == 0 ) {
while (cassandraClient == null && active.get() ) {
try
{
cassandraClient = availableClientQueue.poll();
if ( cassandraClient == null ) {
if ( tillExhausted > 0 ) {
// if we start with #of threads == getMaxActive, we could trigger this condition
// replace addClientToPoolGently(new HThriftClient(cassandraHost).open()) with immediate acquisition
return greedyCreate();
}
// blocked take on the queue if we are configured to wait forever
if ( log.isDebugEnabled() ) {
log.debug("blocking on queue - current block count {}", numBlocked.get());
}
// wait and catch, creating a new one if the counts have changed. Infinite wait should just recurse.
if ( maxWaitTimeWhenExhausted == 0 ) {
while (cassandraClient == null && active.get() ) {
try {
cassandraClient = availableClientQueue.poll(100, TimeUnit.MILLISECONDS);
} catch (InterruptedException ie) {
log.error("InterruptedException poll operation on retry forever", ie);
break;
}
}
} else {

try {
cassandraClient = availableClientQueue.poll(100, TimeUnit.MILLISECONDS);
cassandraClient = availableClientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
if ( cassandraClient == null ) {
throw new PoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s",
new Object[]{
Thread.currentThread().getName(),
cassandraHost.getName()}
));
}
} catch (InterruptedException ie) {
log.error("InterruptedException poll operation on retry forever", ie);
break;
}
}
} else {

try {
cassandraClient = availableClientQueue.poll(maxWaitTimeWhenExhausted, TimeUnit.MILLISECONDS);
if ( cassandraClient == null ) {
numBlocked.decrementAndGet();
throw new PoolExhaustedException(String.format("maxWaitTimeWhenExhausted exceeded for thread %s on host %s",
new Object[]{
Thread.currentThread().getName(),
cassandraHost.getName()}
));
//monitor.incCounter(Counter.POOL_EXHAUSTED);
log.error("Cassandra client acquisition interrupted",ie);
}
} catch (InterruptedException ie) {
//monitor.incCounter(Counter.POOL_EXHAUSTED);
log.error("Cassandra client acquisition interrupted",ie);
}

}

}
if ( cassandraClient == null ) {
throw new HectorException("HConnectionManager returned a null client after aquisition - are we shutting down?");
if ( cassandraClient == null ) {
throw new HectorException("HConnectionManager returned a null client after aquisition - are we shutting down?");
}
activeClients.add(cassandraClient);
} finally {
numBlocked.decrementAndGet();
}
activeClients.add(cassandraClient);
numBlocked.decrementAndGet();

return cassandraClient;
}
Expand All @@ -121,7 +124,6 @@ private HThriftClient greedyCreate() {
}
HThriftClient client = new HThriftClient(cassandraHost).open();
activeClients.add(client);
numBlocked.decrementAndGet();
return client;
}

Expand Down

0 comments on commit 072b257

Please sign in to comment.