Skip to content

Commit

Permalink
Initial commit of asynchronous checkouts.
Browse files Browse the repository at this point in the history
src/java/log4j.properties
- print out more concise timestamp (don't print the date)
- print thread at end of message

src/java/voldemort/utils/pool/KeyedResourcePool.java
- reverted all protected data members back to private
- refactored attemptGrow to do the size check to determine if it
  is worth trying to attempt to grow. This made the method more
  useful to subclasses and cleaned up the local member that
  called it.
- added an internalClose method that returns whether or not the
  caller is "the one thread" responsible for closing everything
  down

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
- first complete implementation of this class
- Four TODOs left in the code for the sake of interim code review

test/integration/voldemort/nonblocking/E2ENonblockingCheckoutTest.java
- minor tweaks to the test
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 51ab567 commit 3841081
Show file tree
Hide file tree
Showing 4 changed files with 221 additions and 128 deletions.
5 changes: 3 additions & 2 deletions src/java/log4j.properties
Expand Up @@ -5,7 +5,8 @@ log4j.rootLogger=INFO, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d %c] %p %m %n
# log4j.appender.stdout.layout.ConversionPattern=[%d %c] %p %m %n
log4j.appender.stdout.layout.ConversionPattern=[%d{ABSOLUTE} %c] %p %m [%t]%n

# Turn on all our debugging info
log4j.logger=INFO
Expand All @@ -16,4 +17,4 @@ log4j.logger.voldemort.server.niosocket=INFO
log4j.logger.voldemort.utils=INFO
log4j.logger.voldemort.client.rebalance=INFO
log4j.logger.voldemort.server=INFO
log4j.logger.krati=WARN
log4j.logger.krati=WARN
38 changes: 23 additions & 15 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -29,10 +29,10 @@ public class KeyedResourcePool<K, V> {

private static final Logger logger = Logger.getLogger(KeyedResourcePool.class.getName());

protected final ResourceFactory<K, V> objectFactory;
private final ResourceFactory<K, V> objectFactory;
private final ConcurrentMap<K, Pool<V>> resourcePoolMap;
protected final AtomicBoolean isOpen = new AtomicBoolean(true);
protected final long timeoutNs;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final long timeoutNs;
private final int poolMaxSize;
private final boolean isFair;

Expand Down Expand Up @@ -127,10 +127,8 @@ public V checkout(K key) throws Exception {
protected V attemptCheckoutGrowCheckout(K key, Pool<V> pool) throws Exception {
V resource = attemptCheckout(pool);
if(resource == null) {
if(pool.size.get() < this.poolMaxSize) {
if(attemptGrow(key, pool)) {
resource = attemptCheckout(pool);
}
if(attemptGrow(key, pool)) {
resource = attemptCheckout(pool);
}
}

Expand All @@ -154,6 +152,10 @@ protected V attemptCheckout(Pool<V> pool) throws Exception {
* checkouts may occur.)
*/
protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
if(pool.size.get() >= this.poolMaxSize) {
// "fail fast" if not worth trying to grow the pool.
return false;
}
// attempt to increment, and if the incremented value is less
// than the pool size then create a new resource
if(pool.size.incrementAndGet() <= this.poolMaxSize) {
Expand Down Expand Up @@ -231,14 +233,10 @@ public void checkin(K key, V resource) throws Exception {
}
}

/**
* Close the pool. This will destroy all checked in resource immediately.
* Once closed all attempts to checkout a new resource will fail. All
* resources checked in after close is called will be immediately destroyed.
*/
public void close() {
protected boolean internalClose() {
boolean wasOpen = isOpen.compareAndSet(true, false);
// change state to false and allow one thread.
if(isOpen.compareAndSet(true, false)) {
if(wasOpen) {
for(Entry<K, Pool<V>> entry: resourcePoolMap.entrySet()) {
Pool<V> pool = entry.getValue();
// destroy each resource in the queue
Expand All @@ -247,6 +245,16 @@ public void close() {
resourcePoolMap.remove(entry.getKey());
}
}
return wasOpen;
}

/**
* Close the pool. This will destroy all checked in resource immediately.
* Once closed all attempts to checkout a new resource will fail. All
* resources checked in after close is called will be immediately destroyed.
*/
public void close() {
internalClose();
}

/**
Expand All @@ -259,7 +267,7 @@ public void close() {
public void close(K key) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
List<V> list = resourcePool.close();
// destroy each resource currently in the queue
// destroy each resource currently in the pool
for(V value: list)
destroyResource(key, resourcePool, value);
}
Expand Down

0 comments on commit 3841081

Please sign in to comment.