diff --git a/src/java/voldemort/utils/pool/KeyedResourcePool.java b/src/java/voldemort/utils/pool/KeyedResourcePool.java index 4f508fea99..ddb2841f56 100644 --- a/src/java/voldemort/utils/pool/KeyedResourcePool.java +++ b/src/java/voldemort/utils/pool/KeyedResourcePool.java @@ -14,7 +14,6 @@ import org.apache.log4j.Logger; -import voldemort.utils.Time; import voldemort.utils.Utils; /** @@ -24,24 +23,44 @@ *
  • allocates resources in FIFO order *
  • Pools are per key and there is no global maximum pool limit. * + * + * Invariants that this implementation does not guarantee: + * + * + * Phrased differently, the following is expected of the user of this class: + * */ public class KeyedResourcePool { private static final Logger logger = Logger.getLogger(KeyedResourcePool.class.getName()); + private final AtomicBoolean isOpen = new AtomicBoolean(true); private final ResourceFactory objectFactory; + private final ResourcePoolConfig resourcePoolConfig; private final ConcurrentMap> resourcePoolMap; - private final AtomicBoolean isOpen = new AtomicBoolean(true); - private final long timeoutNs; - private final int poolMaxSize; - private final boolean isFair; - public KeyedResourcePool(ResourceFactory objectFactory, ResourcePoolConfig config) { + public KeyedResourcePool(ResourceFactory objectFactory, + ResourcePoolConfig resourcePoolConfig) { this.objectFactory = Utils.notNull(objectFactory); - this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS); - this.poolMaxSize = config.getMaxPoolSize(); + this.resourcePoolConfig = Utils.notNull(resourcePoolConfig); this.resourcePoolMap = new ConcurrentHashMap>(); - this.isFair = config.isFair(); } /** @@ -81,7 +100,6 @@ public static KeyedResourcePool create(ResourceFactory factor * timeout + object creation time or throw an exception. If an exception is * thrown, resource is guaranteed to be destroyed. * - * * @param key The key to checkout the resource for * @return The resource */ @@ -90,51 +108,36 @@ public V checkout(K key) throws Exception { long startNs = System.nanoTime(); Pool resourcePool = getResourcePoolForKey(key); + // Always attempt to grow. This protects against running out of + // resources because they were destroyed. + attemptGrow(key, resourcePool); V resource = null; try { checkNotClosed(); - resource = attemptCheckoutGrowCheckout(key, resourcePool); + resource = attemptCheckout(resourcePool); if(resource == null) { - long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs); - if(timeRemainingNs < 0) - throw new TimeoutException("Could not acquire resource in " - + (this.timeoutNs / Time.NS_PER_MS) + " ms."); + long timeRemainingNs = resourcePoolConfig.getTimeout(TimeUnit.NANOSECONDS) + - (System.nanoTime() - startNs); + if(timeRemainingNs > 0) + resource = resourcePool.blockingGet(timeRemainingNs); - resource = resourcePool.blockingGet(timeoutNs); - if(resource == null) { - throw new TimeoutException("Timed out wait for resource after " - + (timeoutNs / Time.NS_PER_MS) + " ms."); - } + if(resource == null) + throw new TimeoutException("Could not acquire resource in " + + resourcePoolConfig.getTimeout(TimeUnit.MILLISECONDS) + + " ms."); } if(!objectFactory.validate(key, resource)) throw new ExcessiveInvalidResourcesException(1); } catch(Exception e) { destroyResource(key, resourcePool, resource); - System.err.println(e.toString()); throw e; } return resource; } - /* - * Checkout a free resource if one exists. If not, and there is space, try - * and create one. If you create one, try and checkout again. Returns null - * or a resource. - */ - protected V attemptCheckoutGrowCheckout(K key, Pool pool) throws Exception { - V resource = attemptCheckout(pool); - if(resource == null) { - if(attemptGrow(key, pool)) { - resource = attemptCheckout(pool); - } - } - - return resource; - } - /* * Get a free resource if one exists. This method does not block. It either * returns null or a resource. @@ -152,25 +155,7 @@ protected V attemptCheckout(Pool pool) throws Exception { * checkouts may occur.) */ protected boolean attemptGrow(K key, Pool pool) throws Exception { - if(pool.size.get() >= this.poolMaxSize) { - // "fail fast" if not worth trying to grow the pool. - return false; - } - // attempt to increment, and if the incremented value is less - // than the pool size then create a new resource - if(pool.size.incrementAndGet() <= this.poolMaxSize) { - try { - V resource = objectFactory.create(key); - pool.nonBlockingPut(resource); - } catch(Exception e) { - pool.size.decrementAndGet(); - throw e; - } - } else { - pool.size.decrementAndGet(); - return false; - } - return true; + return pool.attemptGrow(key, this.objectFactory); } /* @@ -179,7 +164,7 @@ protected boolean attemptGrow(K key, Pool pool) throws Exception { protected Pool getResourcePoolForKey(K key) { Pool resourcePool = resourcePoolMap.get(key); if(resourcePool == null) { - resourcePool = new Pool(this.poolMaxSize, this.isFair); + resourcePool = new Pool(this.resourcePoolConfig); resourcePoolMap.putIfAbsent(key, resourcePool); resourcePool = resourcePoolMap.get(key); } @@ -200,15 +185,17 @@ protected Pool getResourcePoolForExistingKey(K key) { /* * A safe wrapper to destroy the given resource that catches any user - * exceptions + * exceptions. */ protected void destroyResource(K key, Pool resourcePool, V resource) { if(resource != null) { try { objectFactory.destroy(key, resource); } catch(Exception e) { - logger.error("Exception while destorying invalid resource:", e); + logger.error("Exception while destroying invalid resource:", e); } finally { + // Assumes destroyed resource was in fact checked out of the + // pool. resourcePool.size.decrementAndGet(); } } @@ -226,7 +213,7 @@ public void checkin(K key, V resource) throws Exception { boolean success = resourcePool.nonBlockingPut(resource); if(!success) { destroyResource(key, resourcePool, resource); - throw new IllegalStateException("Checkin failed is the pool already full?"); + throw new IllegalStateException("Checkin failed. Is the pool already full?"); } } else { destroyResource(key, resourcePool, resource); @@ -320,8 +307,7 @@ public int getCheckedInResourceCount() { for(Entry> entry: this.resourcePoolMap.entrySet()) count += entry.getValue().queue.size(); // count is approximate in the case of concurrency since .queue.size() - // for - // various entries can change while other entries are being counted. + // for various entries can change while other entries are being counted. return count; } @@ -335,15 +321,62 @@ protected void checkNotClosed() { } /** - * A simple pool that uses an ArrayBlockingQueue + * A fixed size pool that uses an ArrayBlockingQueue. The pool grows to no + * more than some specified maxPoolSize. The pool creates new resources in + * the face of existing resources being destroyed. + * */ protected static class Pool { - final BlockingQueue queue; - final AtomicInteger size = new AtomicInteger(0); + final private AtomicInteger size = new AtomicInteger(0); + final private int maxPoolSize; + final private BlockingQueue queue; + + public Pool(ResourcePoolConfig resourcePoolConfig) { + this.maxPoolSize = resourcePoolConfig.getMaxPoolSize(); + queue = new ArrayBlockingQueue(this.maxPoolSize, resourcePoolConfig.isFair()); + } - public Pool(int defaultPoolSize, boolean isFair) { - queue = new ArrayBlockingQueue(defaultPoolSize, isFair); + /** + * If there is room in the pool, attempt to to create a new resource and + * add it to the pool. This method is cheap to call even if the pool is + * full (i.e., the first thing it does is looks a the current size of + * the pool relative to the max pool size. + * + * @param key + * @param objectFactory + * @return True if and only if a resource was successfully added to the + * pool. + * @throws Exception if there are issues creating a new object, or + * destroying newly created object that could not be added to + * the pool. + * + */ + public boolean attemptGrow(K key, ResourceFactory objectFactory) throws Exception { + if(this.size.get() >= this.maxPoolSize) { + return false; + } + if(this.size.incrementAndGet() <= this.maxPoolSize) { + try { + V resource = objectFactory.create(key); + if(resource != null) { + if(!nonBlockingPut(resource)) { + this.size.decrementAndGet(); + // TODO: Do we need to destroy the non-null, + // non-enqueued resource? + objectFactory.destroy(key, resource); + return false; + } + } + } catch(Exception e) { + this.size.decrementAndGet(); + throw e; + } + } else { + this.size.decrementAndGet(); + return false; + } + return true; } public V nonBlockingGet() { diff --git a/src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java b/src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java index 8e02772162..87f9d5ba5f 100644 --- a/src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java +++ b/src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java @@ -96,10 +96,17 @@ public void requestResource(K key, ResourceRequest resourceRequest) { Queue> requestQueue = getRequestQueueForKey(key); if(requestQueue.isEmpty()) { Pool resourcePool = getResourcePoolForKey(key); + try { + attemptGrow(key, resourcePool); + } catch(Exception e) { + resourceRequest.handleException(e); + return; + } + V resource = null; try { - resource = attemptCheckoutGrowCheckout(key, resourcePool); + resource = attemptCheckout(resourcePool); } catch(Exception e) { super.destroyResource(key, resourcePool, resource); resourceRequest.handleException(e); diff --git a/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java b/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java index f5b573dcae..c70830b284 100644 --- a/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java +++ b/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java @@ -1,38 +1,49 @@ package voldemort.utils.pool; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.Random; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import junit.framework.TestCase; +import org.junit.Before; +import org.junit.Test; + import voldemort.utils.Time; -public class KeyedResourcePoolTest extends TestCase { +public class KeyedResourcePoolTest { private static int POOL_SIZE = 5; private static long TIMEOUT_MS = 100; - private static int MAX_ATTEMPTS = 10; private TestResourceFactory factory; private KeyedResourcePool pool; private ResourcePoolConfig config; - @Override + @Before public void setUp() { factory = new TestResourceFactory(); config = new ResourcePoolConfig().setMaxPoolSize(POOL_SIZE) - .setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS) - .setMaxInvalidAttempts(MAX_ATTEMPTS); + .setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS); this.pool = new KeyedResourcePool(factory, config); } + @Test public void testResourcePoolConfigTimeout() { // Issue 343 assertEquals(config.getTimeout(TimeUnit.MILLISECONDS), TIMEOUT_MS); assertEquals(config.getTimeout(TimeUnit.NANOSECONDS), TIMEOUT_MS * Time.NS_PER_MS); } + @Test public void testPoolingOccurs() throws Exception { TestResource r1 = this.pool.checkout("a"); this.pool.checkin("a", r1); @@ -41,6 +52,7 @@ public void testPoolingOccurs() throws Exception { r1 == r2); } + @Test public void testFullPoolBlocks() throws Exception { for(int i = 0; i < POOL_SIZE; i++) this.pool.checkout("a"); @@ -53,42 +65,167 @@ public void testFullPoolBlocks() throws Exception { } } - public void testExceptions() throws Exception { - // we should start with an empty pool - assertEquals(0, this.pool.getTotalResourceCount()); + @Test + public void testExceptionOnDestroy() throws Exception { + assertTrue("POOL_SIZE is not big enough", POOL_SIZE >= 2); Exception toThrow = new Exception("An exception!"); - - // test exception on destroy - TestResource checkedOut = this.pool.checkout("a"); - assertEquals(1, this.pool.getTotalResourceCount()); - assertEquals(0, this.pool.getCheckedInResourceCount()); this.factory.setDestroyException(toThrow); + + assertEquals(0, this.pool.getTotalResourceCount()); try { + TestResource checkedOut = this.pool.checkout("a"); + assertFalse(checkedOut.isDestroyed()); + assertEquals(1, this.factory.getCreated()); + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + this.pool.checkin("a", checkedOut); - // checking out again should force destroy + assertEquals(1, this.factory.getCreated()); + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(1, this.pool.getCheckedInResourceCount()); + this.pool.checkout("a"); - assertTrue(checkedOut.isDestroyed()); + assertEquals(2, this.factory.getCreated()); + assertEquals(2, this.pool.getTotalResourceCount()); + assertEquals(1, this.pool.getCheckedInResourceCount()); + + for(int i = 0; i < POOL_SIZE - 1; i++) { + checkedOut = this.pool.checkout("a"); + assertFalse(checkedOut.isDestroyed()); + } + assertEquals(POOL_SIZE, this.factory.getCreated()); + assertEquals(POOL_SIZE, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + assertEquals(0, this.factory.getDestroyed()); + + checkedOut.invalidate(); + try { + // pool.checkin should catch and print out the destroy + // exception. + this.pool.checkin("a", checkedOut); + } catch(Exception caught) { + fail("No exception expected."); + } + assertEquals(POOL_SIZE - 1, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + assertEquals(0, this.factory.getDestroyed()); + + this.pool.checkout("a"); + assertEquals(POOL_SIZE + 1, this.factory.getCreated()); + assertEquals(POOL_SIZE, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + assertEquals(0, this.factory.getDestroyed()); } catch(Exception caught) { fail("No exception expected."); } - assertEquals(1, this.pool.getTotalResourceCount()); - assertEquals(0, this.pool.getCheckedInResourceCount()); + } + @Test + public void testExceptionOnCreate() throws Exception { + Exception toThrow = new Exception("An exception!"); + + assertEquals(0, this.pool.getTotalResourceCount()); this.factory.setCreateException(toThrow); try { this.pool.checkout("b"); - fail("Excpected exception!"); + fail("Expected exception!"); } catch(Exception caught) { - assertEquals("The exception thrown by the factory should propage to the caller.", + assertEquals("The exception thrown by the factory should propagate to the caller.", toThrow, caught); } - // failed checkout shouldn't effect count + // failed checkout shouldn't affect count + assertEquals(0, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + } + + @Test + public void repeatedCheckins() throws Exception { + assertEquals(0, this.pool.getTotalResourceCount()); + + TestResource resource = this.pool.checkout("a"); + assertEquals(1, this.factory.getCreated()); assertEquals(1, this.pool.getTotalResourceCount()); assertEquals(0, this.pool.getCheckedInResourceCount()); + + this.pool.checkin("a", resource); + assertEquals(1, this.factory.getCreated()); + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(1, this.pool.getCheckedInResourceCount()); + + this.pool.checkin("a", resource); + assertEquals(1, this.factory.getCreated()); + assertEquals(1, this.pool.getTotalResourceCount()); + // KeyedResourcePool does not protect against repeated checkins. It + // Should. If it did, then the commented out test below would be + // correct. + assertEquals(2, this.pool.getCheckedInResourceCount()); + // assertEquals(1, this.pool.getCheckedInResourceCount()); } + @Test + public void testExceptionOnFullCheckin() throws Exception { + assertEquals(0, this.pool.getTotalResourceCount()); + + Queue resources = new LinkedList(); + for(int i = 0; i < POOL_SIZE; i++) { + TestResource resource = this.pool.checkout("a"); + resources.add(resource); + } + assertEquals(POOL_SIZE, this.pool.getTotalResourceCount()); + + for(int i = 0; i < POOL_SIZE; i++) { + this.pool.checkin("a", resources.poll()); + } + assertEquals(POOL_SIZE, this.pool.getTotalResourceCount()); + + TestResource extraResource = this.factory.create("a"); + try { + this.pool.checkin("a", extraResource); + fail("Checking in an extra resource should throw an exception."); + } catch(IllegalStateException ise) { + // this is good + } + + // KeyedResourcePool does not protect against repeated or extraneous + // checkins. If an extraneous checkin occurs, then the checked in + // resource is destroyed and the size of the resource pool is reduced by + // one (even though it should not be in this exceptional case). + assertEquals(POOL_SIZE - 1, this.pool.getTotalResourceCount()); + // assertEquals(POOL_SIZE, this.pool.getTotalResourceCount()); + } + + @Test + public void testCheckinExtraneousResource() throws Exception { + assertEquals(0, this.pool.getTotalResourceCount()); + + TestResource resource = this.pool.checkout("a"); + this.pool.checkin("a", resource); + + TestResource extraResource = this.factory.create("a"); + // KeyedResourcePool should not permit random resources to be checked + // in. Until it protects against arbitrary resources being checked in, + // it is possible to checkin an extraneous resource. + this.pool.checkin("a", extraResource); + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(2, this.pool.getCheckedInResourceCount()); + } + + @Test + public void testNeverCheckin() throws Exception { + assertEquals(0, this.pool.getTotalResourceCount()); + + { + this.pool.checkout("a"); + } + // KeyedResourcePool does not protect against resources being checked + // out and never checked back in (or destroyed). + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + } + + @Test public void testInvalidIsDestroyed() throws Exception { TestResource r1 = this.pool.checkout("a"); r1.invalidate(); @@ -96,8 +233,10 @@ public void testInvalidIsDestroyed() throws Exception { TestResource r2 = this.pool.checkout("a"); assertTrue("Invalid objects should be destroyed.", r1 != r2); assertTrue("Invalid objects should be destroyed.", r1.isDestroyed()); + assertEquals(1, this.factory.getDestroyed()); } + @Test public void testMaxInvalidCreations() throws Exception { this.factory.setCreatedValid(false); try { @@ -108,6 +247,103 @@ public void testMaxInvalidCreations() throws Exception { } } + // This method was helpful when developing contendForResources + public void printStats(String key) { + System.err.println(""); + System.err.println("getCreated: " + this.factory.getCreated()); + System.err.println("getDestroyed: " + this.factory.getDestroyed()); + System.err.println("getTotalResourceCount(key): " + this.pool.getTotalResourceCount(key)); + System.err.println("getTotalResourceCount(): " + this.pool.getTotalResourceCount()); + System.err.println("getCheckedInResourcesCount(key): " + + this.pool.getCheckedInResourcesCount(key)); + System.err.println("getCheckedInResourceCount(): " + this.pool.getCheckedInResourceCount()); + } + + @Test + public void contendForResources() throws Exception { + int numCheckers = POOL_SIZE * 2; + int numChecks = 10 * 1000; + String key = "Key"; + float invalidationRate = (float) 0.25; + CountDownLatch waitForThreads = new CountDownLatch(numCheckers); + CountDownLatch waitForCheckers = new CountDownLatch(numCheckers); + for(int i = 0; i < numCheckers; ++i) { + new Thread(new Checkers(waitForThreads, + waitForCheckers, + key, + numChecks, + invalidationRate)).start(); + } + + try { + waitForCheckers.await(); + assertEquals(POOL_SIZE, this.pool.getTotalResourceCount()); + assertEquals(POOL_SIZE, this.pool.getCheckedInResourceCount()); + } catch(InterruptedException e) { + e.printStackTrace(); + } + + } + + public class Checkers implements Runnable { + + private final CountDownLatch startSignal; + private final CountDownLatch doneSignal; + + private final String key; + private final int checks; + + private Random random; + private float invalidationRate; + + Checkers(CountDownLatch startSignal, + CountDownLatch doneSignal, + String key, + int checks, + float invalidationRate) { + this.startSignal = startSignal; + this.doneSignal = doneSignal; + + this.key = key; + this.checks = checks; + + this.random = new Random(); + this.invalidationRate = invalidationRate; + } + + public void run() { + startSignal.countDown(); + try { + startSignal.await(); + } catch(InterruptedException e) { + e.printStackTrace(); + } + + try { + TestResource tr = null; + for(int i = 0; i < checks; ++i) { + tr = pool.checkout(key); + assertTrue(tr.isValid()); + + // Invalid some resources (except on last checkin) + float f = random.nextFloat(); + if(f < invalidationRate && i != checks - 1) { + tr.invalidate(); + } + Thread.yield(); + + pool.checkin(key, tr); + Thread.yield(); + + // if(i % 1000 == 0) { printStats(key); } + } + } catch(Exception e) { + fail(e.toString()); + } + doneSignal.countDown(); + } + } + private static class TestResource { private String value; @@ -117,7 +353,7 @@ private static class TestResource { public TestResource(String value) { this.value = value; this.isValid = new AtomicBoolean(true); - this.isDestroyed = new AtomicBoolean(true); + this.isDestroyed = new AtomicBoolean(false); } public boolean isValid() { @@ -171,12 +407,10 @@ public boolean validate(String key, TestResource value) { return value.isValid(); } - @SuppressWarnings("unused") public int getCreated() { return this.created.get(); } - @SuppressWarnings("unused") public int getDestroyed() { return this.destroyed.get(); }