Skip to content

Commit

Permalink
Clean up of KeyedResourcePool and significant hardening of the unit t…
Browse files Browse the repository at this point in the history
…est.

src/java/voldemort/utils/pool/KeyedResourcePool.java
- Documented the invariants (or lack thereof) guaranteed by this class.
- Documented this classes expectations of its users.
- Moved attemptGrow into the inner Pool class.
- Got rid of the attemptCheckoutGrowCheckout method. ;)

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
- tweaked to match revised attemptGrow interface

test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java
- Added a bunch of 'negative' tests. I.e., they demonstrate
  non-desirable behavior of current KeyedResourcePool.
- Added a contention test that has many threads checkout,
  possibly invalidate, and then checkin resources for some key.
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 3841081 commit ca6dcd7
Show file tree
Hide file tree
Showing 3 changed files with 366 additions and 92 deletions.
167 changes: 100 additions & 67 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -14,7 +14,6 @@

import org.apache.log4j.Logger;

import voldemort.utils.Time;
import voldemort.utils.Utils;

/**
Expand All @@ -24,24 +23,44 @@
* <li>allocates resources in FIFO order
* <li>Pools are per key and there is no global maximum pool limit.
* </ul>
*
* Invariants that this implementation does not guarantee:
* <ul>
* <li>A checked in resource was previously checked out. (I.e., user can use
* ResourceFactory and then check in a resource that this pool did not create.)
* <li>A checked out resource is checked in at most once. (I.e., a user does not
* call check in on a checked out resource more than once.)
* <li>User no longer has a reference to a checked in resource. (I.e., user can
* keep using the resource after it invokes check in.)
* <li>A resource that is checked out is eventually either checked in or
* destroyed via objectFactory.destroy(). (I.e., a user can squat on a resource
* or let its reference to the resource lapse without checking the resource in
* or destroying the resource.)
* </ul>
*
* Phrased differently, the following is expected of the user of this class:
* <ul>
* <li>A checked out resource is checked in exactly once.
* <li>A resource that is checked in was previously checked out.
* <li>A resource that is checked in is never used again. / No reference is
* retained to a checked in resource.
* <li>Also, checkout is never called after close.
* </ul>
*/
public class KeyedResourcePool<K, V> {

private static final Logger logger = Logger.getLogger(KeyedResourcePool.class.getName());

private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final ResourceFactory<K, V> objectFactory;
private final ResourcePoolConfig resourcePoolConfig;
private final ConcurrentMap<K, Pool<V>> resourcePoolMap;
private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final long timeoutNs;
private final int poolMaxSize;
private final boolean isFair;

public KeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePoolConfig config) {
public KeyedResourcePool(ResourceFactory<K, V> objectFactory,
ResourcePoolConfig resourcePoolConfig) {
this.objectFactory = Utils.notNull(objectFactory);
this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS);
this.poolMaxSize = config.getMaxPoolSize();
this.resourcePoolConfig = Utils.notNull(resourcePoolConfig);
this.resourcePoolMap = new ConcurrentHashMap<K, Pool<V>>();
this.isFair = config.isFair();
}

/**
Expand Down Expand Up @@ -81,7 +100,6 @@ public static <K, V> KeyedResourcePool<K, V> create(ResourceFactory<K, V> factor
* timeout + object creation time or throw an exception. If an exception is
* thrown, resource is guaranteed to be destroyed.
*
*
* @param key The key to checkout the resource for
* @return The resource
*/
Expand All @@ -90,51 +108,36 @@ public V checkout(K key) throws Exception {

long startNs = System.nanoTime();
Pool<V> resourcePool = getResourcePoolForKey(key);
// Always attempt to grow. This protects against running out of
// resources because they were destroyed.
attemptGrow(key, resourcePool);

V resource = null;
try {
checkNotClosed();
resource = attemptCheckoutGrowCheckout(key, resourcePool);
resource = attemptCheckout(resourcePool);

if(resource == null) {
long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs);
if(timeRemainingNs < 0)
throw new TimeoutException("Could not acquire resource in "
+ (this.timeoutNs / Time.NS_PER_MS) + " ms.");
long timeRemainingNs = resourcePoolConfig.getTimeout(TimeUnit.NANOSECONDS)
- (System.nanoTime() - startNs);
if(timeRemainingNs > 0)
resource = resourcePool.blockingGet(timeRemainingNs);

resource = resourcePool.blockingGet(timeoutNs);
if(resource == null) {
throw new TimeoutException("Timed out wait for resource after "
+ (timeoutNs / Time.NS_PER_MS) + " ms.");
}
if(resource == null)
throw new TimeoutException("Could not acquire resource in "
+ resourcePoolConfig.getTimeout(TimeUnit.MILLISECONDS)
+ " ms.");
}

if(!objectFactory.validate(key, resource))
throw new ExcessiveInvalidResourcesException(1);
} catch(Exception e) {
destroyResource(key, resourcePool, resource);
System.err.println(e.toString());
throw e;
}
return resource;
}

/*
* Checkout a free resource if one exists. If not, and there is space, try
* and create one. If you create one, try and checkout again. Returns null
* or a resource.
*/
protected V attemptCheckoutGrowCheckout(K key, Pool<V> pool) throws Exception {
V resource = attemptCheckout(pool);
if(resource == null) {
if(attemptGrow(key, pool)) {
resource = attemptCheckout(pool);
}
}

return resource;
}

/*
* Get a free resource if one exists. This method does not block. It either
* returns null or a resource.
Expand All @@ -152,25 +155,7 @@ protected V attemptCheckout(Pool<V> pool) throws Exception {
* checkouts may occur.)
*/
protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
if(pool.size.get() >= this.poolMaxSize) {
// "fail fast" if not worth trying to grow the pool.
return false;
}
// attempt to increment, and if the incremented value is less
// than the pool size then create a new resource
if(pool.size.incrementAndGet() <= this.poolMaxSize) {
try {
V resource = objectFactory.create(key);
pool.nonBlockingPut(resource);
} catch(Exception e) {
pool.size.decrementAndGet();
throw e;
}
} else {
pool.size.decrementAndGet();
return false;
}
return true;
return pool.attemptGrow(key, this.objectFactory);
}

/*
Expand All @@ -179,7 +164,7 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
protected Pool<V> getResourcePoolForKey(K key) {
Pool<V> resourcePool = resourcePoolMap.get(key);
if(resourcePool == null) {
resourcePool = new Pool<V>(this.poolMaxSize, this.isFair);
resourcePool = new Pool<V>(this.resourcePoolConfig);
resourcePoolMap.putIfAbsent(key, resourcePool);
resourcePool = resourcePoolMap.get(key);
}
Expand All @@ -200,15 +185,17 @@ protected Pool<V> getResourcePoolForExistingKey(K key) {

/*
* A safe wrapper to destroy the given resource that catches any user
* exceptions
* exceptions.
*/
protected void destroyResource(K key, Pool<V> resourcePool, V resource) {
if(resource != null) {
try {
objectFactory.destroy(key, resource);
} catch(Exception e) {
logger.error("Exception while destorying invalid resource:", e);
logger.error("Exception while destroying invalid resource:", e);
} finally {
// Assumes destroyed resource was in fact checked out of the
// pool.
resourcePool.size.decrementAndGet();
}
}
Expand All @@ -226,7 +213,7 @@ public void checkin(K key, V resource) throws Exception {
boolean success = resourcePool.nonBlockingPut(resource);
if(!success) {
destroyResource(key, resourcePool, resource);
throw new IllegalStateException("Checkin failed is the pool already full?");
throw new IllegalStateException("Checkin failed. Is the pool already full?");
}
} else {
destroyResource(key, resourcePool, resource);
Expand Down Expand Up @@ -320,8 +307,7 @@ public int getCheckedInResourceCount() {
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().queue.size();
// count is approximate in the case of concurrency since .queue.size()
// for
// various entries can change while other entries are being counted.
// for various entries can change while other entries are being counted.
return count;
}

Expand All @@ -335,15 +321,62 @@ protected void checkNotClosed() {
}

/**
* A simple pool that uses an ArrayBlockingQueue
* A fixed size pool that uses an ArrayBlockingQueue. The pool grows to no
* more than some specified maxPoolSize. The pool creates new resources in
* the face of existing resources being destroyed.
*
*/
protected static class Pool<V> {

final BlockingQueue<V> queue;
final AtomicInteger size = new AtomicInteger(0);
final private AtomicInteger size = new AtomicInteger(0);
final private int maxPoolSize;
final private BlockingQueue<V> queue;

public Pool(ResourcePoolConfig resourcePoolConfig) {
this.maxPoolSize = resourcePoolConfig.getMaxPoolSize();
queue = new ArrayBlockingQueue<V>(this.maxPoolSize, resourcePoolConfig.isFair());
}

public Pool(int defaultPoolSize, boolean isFair) {
queue = new ArrayBlockingQueue<V>(defaultPoolSize, isFair);
/**
* If there is room in the pool, attempt to to create a new resource and
* add it to the pool. This method is cheap to call even if the pool is
* full (i.e., the first thing it does is looks a the current size of
* the pool relative to the max pool size.
*
* @param key
* @param objectFactory
* @return True if and only if a resource was successfully added to the
* pool.
* @throws Exception if there are issues creating a new object, or
* destroying newly created object that could not be added to
* the pool.
*
*/
public <K> boolean attemptGrow(K key, ResourceFactory<K, V> objectFactory) throws Exception {
if(this.size.get() >= this.maxPoolSize) {
return false;
}
if(this.size.incrementAndGet() <= this.maxPoolSize) {
try {
V resource = objectFactory.create(key);
if(resource != null) {
if(!nonBlockingPut(resource)) {
this.size.decrementAndGet();
// TODO: Do we need to destroy the non-null,
// non-enqueued resource?
objectFactory.destroy(key, resource);
return false;
}
}
} catch(Exception e) {
this.size.decrementAndGet();
throw e;
}
} else {
this.size.decrementAndGet();
return false;
}
return true;
}

public V nonBlockingGet() {
Expand Down
9 changes: 8 additions & 1 deletion src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
Expand Up @@ -96,10 +96,17 @@ public void requestResource(K key, ResourceRequest<V> resourceRequest) {
Queue<ResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
if(requestQueue.isEmpty()) {
Pool<V> resourcePool = getResourcePoolForKey(key);
try {
attemptGrow(key, resourcePool);
} catch(Exception e) {
resourceRequest.handleException(e);
return;
}

V resource = null;

try {
resource = attemptCheckoutGrowCheckout(key, resourcePool);
resource = attemptCheckout(resourcePool);
} catch(Exception e) {
super.destroyResource(key, resourcePool, resource);
resourceRequest.handleException(e);
Expand Down

0 comments on commit ca6dcd7

Please sign in to comment.