Permalink
Browse files

Clean up of KeyedResourcePool and significant hardening of the unit t…

…est.

src/java/voldemort/utils/pool/KeyedResourcePool.java
- Documented the invariants (or lack thereof) guaranteed by this class.
- Documented this classes expectations of its users.
- Moved attemptGrow into the inner Pool class.
- Got rid of the attemptCheckoutGrowCheckout method. ;)

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
- tweaked to match revised attemptGrow interface

test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java
- Added a bunch of 'negative' tests. I.e., they demonstrate
  non-desirable behavior of current KeyedResourcePool.
- Added a contention test that has many threads checkout,
  possibly invalidate, and then checkin resources for some key.
  • Loading branch information...
1 parent 3841081 commit ca6dcd7b2506377e0252a074dfc3507379018023 @jayjwylie jayjwylie committed Aug 29, 2012
@@ -14,7 +14,6 @@
import org.apache.log4j.Logger;
-import voldemort.utils.Time;
import voldemort.utils.Utils;
/**
@@ -24,24 +23,44 @@
* <li>allocates resources in FIFO order
* <li>Pools are per key and there is no global maximum pool limit.
* </ul>
+ *
+ * Invariants that this implementation does not guarantee:
+ * <ul>
+ * <li>A checked in resource was previously checked out. (I.e., user can use
+ * ResourceFactory and then check in a resource that this pool did not create.)
+ * <li>A checked out resource is checked in at most once. (I.e., a user does not
+ * call check in on a checked out resource more than once.)
+ * <li>User no longer has a reference to a checked in resource. (I.e., user can
+ * keep using the resource after it invokes check in.)
+ * <li>A resource that is checked out is eventually either checked in or
+ * destroyed via objectFactory.destroy(). (I.e., a user can squat on a resource
+ * or let its reference to the resource lapse without checking the resource in
+ * or destroying the resource.)
+ * </ul>
+ *
+ * Phrased differently, the following is expected of the user of this class:
+ * <ul>
+ * <li>A checked out resource is checked in exactly once.
+ * <li>A resource that is checked in was previously checked out.
+ * <li>A resource that is checked in is never used again. / No reference is
+ * retained to a checked in resource.
+ * <li>Also, checkout is never called after close.
+ * </ul>
*/
public class KeyedResourcePool<K, V> {
private static final Logger logger = Logger.getLogger(KeyedResourcePool.class.getName());
+ private final AtomicBoolean isOpen = new AtomicBoolean(true);
private final ResourceFactory<K, V> objectFactory;
+ private final ResourcePoolConfig resourcePoolConfig;
private final ConcurrentMap<K, Pool<V>> resourcePoolMap;
- private final AtomicBoolean isOpen = new AtomicBoolean(true);
- private final long timeoutNs;
- private final int poolMaxSize;
- private final boolean isFair;
- public KeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePoolConfig config) {
+ public KeyedResourcePool(ResourceFactory<K, V> objectFactory,
+ ResourcePoolConfig resourcePoolConfig) {
this.objectFactory = Utils.notNull(objectFactory);
- this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS);
- this.poolMaxSize = config.getMaxPoolSize();
+ this.resourcePoolConfig = Utils.notNull(resourcePoolConfig);
this.resourcePoolMap = new ConcurrentHashMap<K, Pool<V>>();
- this.isFair = config.isFair();
}
/**
@@ -81,7 +100,6 @@ public KeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePoolConfig
* timeout + object creation time or throw an exception. If an exception is
* thrown, resource is guaranteed to be destroyed.
*
- *
* @param key The key to checkout the resource for
* @return The resource
*/
@@ -90,52 +108,37 @@ public V checkout(K key) throws Exception {
long startNs = System.nanoTime();
Pool<V> resourcePool = getResourcePoolForKey(key);
+ // Always attempt to grow. This protects against running out of
+ // resources because they were destroyed.
+ attemptGrow(key, resourcePool);
V resource = null;
try {
checkNotClosed();
- resource = attemptCheckoutGrowCheckout(key, resourcePool);
+ resource = attemptCheckout(resourcePool);
if(resource == null) {
- long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs);
- if(timeRemainingNs < 0)
- throw new TimeoutException("Could not acquire resource in "
- + (this.timeoutNs / Time.NS_PER_MS) + " ms.");
+ long timeRemainingNs = resourcePoolConfig.getTimeout(TimeUnit.NANOSECONDS)
+ - (System.nanoTime() - startNs);
+ if(timeRemainingNs > 0)
+ resource = resourcePool.blockingGet(timeRemainingNs);
- resource = resourcePool.blockingGet(timeoutNs);
- if(resource == null) {
- throw new TimeoutException("Timed out wait for resource after "
- + (timeoutNs / Time.NS_PER_MS) + " ms.");
- }
+ if(resource == null)
+ throw new TimeoutException("Could not acquire resource in "
+ + resourcePoolConfig.getTimeout(TimeUnit.MILLISECONDS)
+ + " ms.");
}
if(!objectFactory.validate(key, resource))
throw new ExcessiveInvalidResourcesException(1);
} catch(Exception e) {
destroyResource(key, resourcePool, resource);
- System.err.println(e.toString());
throw e;
}
return resource;
}
/*
- * Checkout a free resource if one exists. If not, and there is space, try
- * and create one. If you create one, try and checkout again. Returns null
- * or a resource.
- */
- protected V attemptCheckoutGrowCheckout(K key, Pool<V> pool) throws Exception {
- V resource = attemptCheckout(pool);
- if(resource == null) {
- if(attemptGrow(key, pool)) {
- resource = attemptCheckout(pool);
- }
- }
-
- return resource;
- }
-
- /*
* Get a free resource if one exists. This method does not block. It either
* returns null or a resource.
*/
@@ -152,25 +155,7 @@ protected V attemptCheckout(Pool<V> pool) throws Exception {
* checkouts may occur.)
*/
protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
- if(pool.size.get() >= this.poolMaxSize) {
- // "fail fast" if not worth trying to grow the pool.
- return false;
- }
- // attempt to increment, and if the incremented value is less
- // than the pool size then create a new resource
- if(pool.size.incrementAndGet() <= this.poolMaxSize) {
- try {
- V resource = objectFactory.create(key);
- pool.nonBlockingPut(resource);
- } catch(Exception e) {
- pool.size.decrementAndGet();
- throw e;
- }
- } else {
- pool.size.decrementAndGet();
- return false;
- }
- return true;
+ return pool.attemptGrow(key, this.objectFactory);
}
/*
@@ -179,7 +164,7 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
protected Pool<V> getResourcePoolForKey(K key) {
Pool<V> resourcePool = resourcePoolMap.get(key);
if(resourcePool == null) {
- resourcePool = new Pool<V>(this.poolMaxSize, this.isFair);
+ resourcePool = new Pool<V>(this.resourcePoolConfig);
resourcePoolMap.putIfAbsent(key, resourcePool);
resourcePool = resourcePoolMap.get(key);
}
@@ -200,15 +185,17 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
/*
* A safe wrapper to destroy the given resource that catches any user
- * exceptions
+ * exceptions.
*/
protected void destroyResource(K key, Pool<V> resourcePool, V resource) {
if(resource != null) {
try {
objectFactory.destroy(key, resource);
} catch(Exception e) {
- logger.error("Exception while destorying invalid resource:", e);
+ logger.error("Exception while destroying invalid resource:", e);
} finally {
+ // Assumes destroyed resource was in fact checked out of the
+ // pool.
resourcePool.size.decrementAndGet();
}
}
@@ -226,7 +213,7 @@ public void checkin(K key, V resource) throws Exception {
boolean success = resourcePool.nonBlockingPut(resource);
if(!success) {
destroyResource(key, resourcePool, resource);
- throw new IllegalStateException("Checkin failed is the pool already full?");
+ throw new IllegalStateException("Checkin failed. Is the pool already full?");
}
} else {
destroyResource(key, resourcePool, resource);
@@ -320,8 +307,7 @@ public int getCheckedInResourceCount() {
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().queue.size();
// count is approximate in the case of concurrency since .queue.size()
- // for
- // various entries can change while other entries are being counted.
+ // for various entries can change while other entries are being counted.
return count;
}
@@ -335,15 +321,62 @@ protected void checkNotClosed() {
}
/**
- * A simple pool that uses an ArrayBlockingQueue
+ * A fixed size pool that uses an ArrayBlockingQueue. The pool grows to no
+ * more than some specified maxPoolSize. The pool creates new resources in
+ * the face of existing resources being destroyed.
+ *
*/
protected static class Pool<V> {
- final BlockingQueue<V> queue;
- final AtomicInteger size = new AtomicInteger(0);
+ final private AtomicInteger size = new AtomicInteger(0);
+ final private int maxPoolSize;
+ final private BlockingQueue<V> queue;
+
+ public Pool(ResourcePoolConfig resourcePoolConfig) {
+ this.maxPoolSize = resourcePoolConfig.getMaxPoolSize();
+ queue = new ArrayBlockingQueue<V>(this.maxPoolSize, resourcePoolConfig.isFair());
+ }
- public Pool(int defaultPoolSize, boolean isFair) {
- queue = new ArrayBlockingQueue<V>(defaultPoolSize, isFair);
+ /**
+ * If there is room in the pool, attempt to to create a new resource and
+ * add it to the pool. This method is cheap to call even if the pool is
+ * full (i.e., the first thing it does is looks a the current size of
+ * the pool relative to the max pool size.
+ *
+ * @param key
+ * @param objectFactory
+ * @return True if and only if a resource was successfully added to the
+ * pool.
+ * @throws Exception if there are issues creating a new object, or
+ * destroying newly created object that could not be added to
+ * the pool.
+ *
+ */
+ public <K> boolean attemptGrow(K key, ResourceFactory<K, V> objectFactory) throws Exception {
+ if(this.size.get() >= this.maxPoolSize) {
+ return false;
+ }
+ if(this.size.incrementAndGet() <= this.maxPoolSize) {
+ try {
+ V resource = objectFactory.create(key);
+ if(resource != null) {
+ if(!nonBlockingPut(resource)) {
+ this.size.decrementAndGet();
+ // TODO: Do we need to destroy the non-null,
+ // non-enqueued resource?
+ objectFactory.destroy(key, resource);
+ return false;
+ }
+ }
+ } catch(Exception e) {
+ this.size.decrementAndGet();
+ throw e;
+ }
+ } else {
+ this.size.decrementAndGet();
+ return false;
+ }
+ return true;
}
public V nonBlockingGet() {
@@ -96,10 +96,17 @@ public void requestResource(K key, ResourceRequest<V> resourceRequest) {
Queue<ResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
if(requestQueue.isEmpty()) {
Pool<V> resourcePool = getResourcePoolForKey(key);
+ try {
+ attemptGrow(key, resourcePool);
+ } catch(Exception e) {
+ resourceRequest.handleException(e);
+ return;
+ }
+
V resource = null;
try {
- resource = attemptCheckoutGrowCheckout(key, resourcePool);
+ resource = attemptCheckout(resourcePool);
} catch(Exception e) {
super.destroyResource(key, resourcePool, resource);
resourceRequest.handleException(e);
Oops, something went wrong.

0 comments on commit ca6dcd7

Please sign in to comment.