From 2c3c94421e83e7d60a872ed795603ef7d4c39683 Mon Sep 17 00:00:00 2001 From: Jay Kreps Date: Thu, 23 Jul 2009 23:48:06 -0700 Subject: [PATCH] More tests for resource pool. --- .../voldemort/store/socket/SocketPool.java | 4 +- .../utils/pool/KeyedResourcePool.java | 55 ++++----- .../utils/pool/ResourcePoolConfig.java | 14 +-- .../performance/ResourcePoolPerfTest.java | 103 ++++++++++++++++ .../socketpool/AbstractSocketPoolTest.java | 8 +- .../socketpool/SimpleSocketPoolTest.java | 6 +- .../utils/pool/KeyedResourcePoolTest.java | 114 +++++++++++------- 7 files changed, 214 insertions(+), 90 deletions(-) create mode 100644 test/integration/voldemort/performance/ResourcePoolPerfTest.java diff --git a/src/java/voldemort/store/socket/SocketPool.java b/src/java/voldemort/store/socket/SocketPool.java index 510c533427..89349d5584 100644 --- a/src/java/voldemort/store/socket/SocketPool.java +++ b/src/java/voldemort/store/socket/SocketPool.java @@ -52,8 +52,8 @@ public SocketPool(int maxConnectionsPerNode, int soTimeoutMs, int socketBufferSize) { ResourcePoolConfig config = new ResourcePoolConfig().setIsFair(true) - .setPoolSize(maxConnectionsPerNode) - .setMaximumInvalidResourceCreationLimit(maxConnectionsPerNode) + .setMaxPoolSize(maxConnectionsPerNode) + .setMaxInvalidAttempts(maxConnectionsPerNode) .setTimeout(connectionTimeoutMs, TimeUnit.MILLISECONDS); this.socketFactory = new SocketResourceFactory(soTimeoutMs, socketBufferSize); diff --git a/src/java/voldemort/utils/pool/KeyedResourcePool.java b/src/java/voldemort/utils/pool/KeyedResourcePool.java index fded9e0546..276a87c347 100644 --- a/src/java/voldemort/utils/pool/KeyedResourcePool.java +++ b/src/java/voldemort/utils/pool/KeyedResourcePool.java @@ -31,14 +31,14 @@ public class KeyedResourcePool { private final ConcurrentMap> resourcesMap; private final AtomicBoolean isOpen = new AtomicBoolean(true); private final long timeoutNs; - private final int poolSize; + private final int poolMaxSize; private final int maxCreateAttempts; private final boolean isFair; public KeyedResourcePool(ResourceFactory objectFactory, ResourcePoolConfig config) { this.objectFactory = Utils.notNull(objectFactory); this.timeoutNs = Utils.notNull(config).getTimeout(TimeUnit.NANOSECONDS); - this.poolSize = config.getPoolSize(); + this.poolMaxSize = config.getMaxPoolSize(); this.maxCreateAttempts = config.getMaximumInvalidResourceCreationLimit(); this.resourcesMap = new ConcurrentHashMap>(); this.isFair = config.isFair(); @@ -94,7 +94,7 @@ public V checkout(K key) throws Exception { V resource = null; try { int attempts = 0; - do { + for(; attempts < this.maxCreateAttempts; attempts++) { checkNotClosed(); long timeRemainingNs = this.timeoutNs - (System.nanoTime() - startNs); if(timeRemainingNs < 0) @@ -103,13 +103,14 @@ public V checkout(K key) throws Exception { resource = checkoutOrCreateResource(key, resources, timeRemainingNs); if(objectFactory.validate(key, resource)) return resource; - } while(++attempts < this.maxCreateAttempts); + else + destroyResource(key, resources, resource); + } + throw new ExcessiveInvalidResourcesException(attempts); } catch(Exception e) { destroyResource(key, resources, resource); throw e; } - - return resource; } /* @@ -124,12 +125,11 @@ private V checkoutOrCreateResource(K key, Pool pool, long timeoutNs) throws E if(resource != null) return resource; - // okay the queue is empty, maybe we have room to create a new resource - resource = attemptCreate(key, pool); - if(resource != null) - return resource; + // okay the queue is empty, maybe we have room to expand a bit? + if(pool.size.get() < this.poolMaxSize) + attemptGrow(key, pool); - // pool has reached max size, block for next available resource + // now block for next available resource resource = pool.blockingGet(timeoutNs); if(resource == null) throw new TimeoutException("Timed out wait for resource after " @@ -139,28 +139,23 @@ private V checkoutOrCreateResource(K key, Pool pool, long timeoutNs) throws E } /* - * Attempt to create a new object in the pool if there is room. Return the - * object if created, else null. + * Attempt to create a new object and add it to the pool--this only happens + * if there is room for the new object. */ - private V attemptCreate(K key, Pool pool) throws Exception { - V resource = null; - // do a sanity check on the size - if(pool.size.get() < this.poolSize) { - // now attempt to increment, and if the incremented value is less - // than the pool size then create a new resource - if(pool.size.incrementAndGet() <= this.poolSize) { - try { - resource = objectFactory.create(key); - } catch(Exception e) { - pool.size.decrementAndGet(); - throw e; - } - } else { + private void attemptGrow(K key, Pool pool) throws Exception { + // attempt to increment, and if the incremented value is less + // than the pool size then create a new resource + if(pool.size.incrementAndGet() <= this.poolMaxSize) { + try { + V resource = objectFactory.create(key); + pool.nonBlockingPut(resource); + } catch(Exception e) { pool.size.decrementAndGet(); + throw e; } + } else { + pool.size.decrementAndGet(); } - - return resource; } /* @@ -169,7 +164,7 @@ private V attemptCreate(K key, Pool pool) throws Exception { private Pool getResourcePoolForKey(K key) { Pool pool = resourcesMap.get(key); if(pool == null) { - pool = new Pool(this.poolSize, this.isFair); + pool = new Pool(this.poolMaxSize, this.isFair); resourcesMap.putIfAbsent(key, pool); pool = resourcesMap.get(key); } diff --git a/src/java/voldemort/utils/pool/ResourcePoolConfig.java b/src/java/voldemort/utils/pool/ResourcePoolConfig.java index 4640b9d776..e5c319e85a 100644 --- a/src/java/voldemort/utils/pool/ResourcePoolConfig.java +++ b/src/java/voldemort/utils/pool/ResourcePoolConfig.java @@ -11,7 +11,7 @@ public class ResourcePoolConfig { /* Note: if you change the defaults you must update the javadoc as well. */ - private int poolSize = 100; + private int poolMaxSize = 20; private long timeoutNs = Long.MAX_VALUE; private int maxInvalidResourceCreations = Integer.MAX_VALUE; private boolean isFair = true; @@ -23,21 +23,21 @@ public ResourcePoolConfig() { /** * Get the size of the pool */ - public int getPoolSize() { - return poolSize; + public int getMaxPoolSize() { + return poolMaxSize; } /** * The size of the pool to maintain for each key. * - * The default pool size is 100 + * The default pool size is 20 * * @param poolSize The desired per-key pool size */ - public ResourcePoolConfig setPoolSize(int poolSize) { + public ResourcePoolConfig setMaxPoolSize(int poolSize) { if(poolSize <= 0) throw new IllegalArgumentException("Pool size must be a positive number."); - this.poolSize = poolSize; + this.poolMaxSize = poolSize; return this; } @@ -74,7 +74,7 @@ public ResourcePoolConfig setTimeout(long timeout, TimeUnit unit) { * * @param limit The desired limit */ - public ResourcePoolConfig setMaximumInvalidResourceCreationLimit(int limit) { + public ResourcePoolConfig setMaxInvalidAttempts(int limit) { if(limit <= 0) throw new IllegalArgumentException("Limit must be a positive number."); this.maxInvalidResourceCreations = limit; diff --git a/test/integration/voldemort/performance/ResourcePoolPerfTest.java b/test/integration/voldemort/performance/ResourcePoolPerfTest.java new file mode 100644 index 0000000000..c30bb7d0fa --- /dev/null +++ b/test/integration/voldemort/performance/ResourcePoolPerfTest.java @@ -0,0 +1,103 @@ +package voldemort.performance; + +import java.text.NumberFormat; + +import org.apache.commons.pool.KeyedObjectPool; +import org.apache.commons.pool.KeyedPoolableObjectFactory; +import org.apache.commons.pool.impl.GenericKeyedObjectPool; + +import voldemort.utils.pool.KeyedResourcePool; +import voldemort.utils.pool.ResourceFactory; +import voldemort.utils.pool.ResourcePoolConfig; + +public class ResourcePoolPerfTest { + + public static void main(String[] args) throws Exception { + + final int numKeys = 10; + final int numThreads = 10; + final int numRequests = 10000000; + NumberFormat format = NumberFormat.getInstance(); + format.setMaximumFractionDigits(2); + + for(int poolSize: new int[] { 1, 5, 10 }) { + System.out.println("Perf test for voldemort pool with numThreads = " + numThreads + + ", poolSize = " + poolSize + ":"); + final KeyedResourcePool pool = KeyedResourcePool.create(new StringResourceFactory(), + new ResourcePoolConfig().setMaxPoolSize(poolSize) + .setIsFair(true)); + PerformanceTest test = new PerformanceTest() { + + @Override + public void doOperation(int id) throws Exception { + Integer key = id % numKeys; + String s = pool.checkout(key); + pool.checkin(key, s); + } + }; + test.run(numRequests, numThreads); + test.printStats(); + System.out.println(); + } + + System.out.println("--------------------------------------"); + System.out.println(); + + for(int poolSize: new int[] { 1, 5, 10 }) { + System.out.println("Perf test for commons pool with numThreads = " + numThreads + + ", poolSize = " + poolSize + ":"); + GenericKeyedObjectPool.Config config = new GenericKeyedObjectPool.Config(); + config.maxActive = poolSize; + config.testOnBorrow = true; + config.whenExhaustedAction = GenericKeyedObjectPool.WHEN_EXHAUSTED_BLOCK; + config.maxWait = 10000; + StringPoolableObjectFactory objFactory = new StringPoolableObjectFactory(); + final KeyedObjectPool pool = new GenericKeyedObjectPool(objFactory, config); + PerformanceTest test = new PerformanceTest() { + + @Override + public void doOperation(int id) throws Exception { + Integer key = id % numKeys; + String s = (String) pool.borrowObject(key); + pool.returnObject(key, s); + } + }; + test.run(numRequests, numThreads); + test.printStats(); + System.out.println(); + } + + } + + private static class StringResourceFactory implements ResourceFactory { + + public String create(Integer key) { + return new String(key + "-val"); + } + + public void destroy(Integer key, String obj) {} + + public boolean validate(Integer key, String value) { + return true; + } + } + + private static class StringPoolableObjectFactory implements KeyedPoolableObjectFactory { + + public void activateObject(Object k, Object v) {} + + public void passivateObject(Object k, Object v) {} + + public void destroyObject(Object k, Object v) {} + + public Object makeObject(Object k) { + return new String(k + "-val"); + } + + public boolean validateObject(Object k, Object v) { + return true; + } + + } + +} diff --git a/test/integration/voldemort/socketpool/AbstractSocketPoolTest.java b/test/integration/voldemort/socketpool/AbstractSocketPoolTest.java index 1285619ab7..c665755cf8 100644 --- a/test/integration/voldemort/socketpool/AbstractSocketPoolTest.java +++ b/test/integration/voldemort/socketpool/AbstractSocketPoolTest.java @@ -50,9 +50,9 @@ public void run() { // Size assertEquals("resources In Hand(" + resourceInHand.get(key).get() + ") should be less than equal to pool size(" - + config.getPoolSize() + ")", + + config.getMaxPoolSize() + ")", true, - resourceInHand.get(key).get() <= config.getPoolSize()); + resourceInHand.get(key).get() <= config.getMaxPoolSize()); // do something doSomethingWithResource(key, resource); @@ -65,8 +65,8 @@ public void run() { } catch(TimeoutException e) { // only if alloted resources are same as pool size assertEquals("resources In Hand(" + resourceInHand.get(key).get() - + ") should be same as pool size(" + config.getPoolSize() - + ")", config.getPoolSize(), resourceInHand.get(key).get()); + + ") should be same as pool size(" + config.getMaxPoolSize() + + ")", config.getMaxPoolSize(), resourceInHand.get(key).get()); ++testStats.timeoutRequests; System.out.println("saw timeout !!"); return; diff --git a/test/integration/voldemort/socketpool/SimpleSocketPoolTest.java b/test/integration/voldemort/socketpool/SimpleSocketPoolTest.java index f3a8baef34..51337ac6f3 100644 --- a/test/integration/voldemort/socketpool/SimpleSocketPoolTest.java +++ b/test/integration/voldemort/socketpool/SimpleSocketPoolTest.java @@ -12,7 +12,7 @@ public class SimpleSocketPoolTest extends TestCase { public void testPoolLimitNoTimeout() throws Exception { final ResourcePoolConfig config = new ResourcePoolConfig().setTimeout(1000, TimeUnit.MILLISECONDS) - .setPoolSize(20); + .setMaxPoolSize(20); ResourceFactory factory = ResourcePoolTestUtils.getBasicPoolFactory(); final AbstractSocketPoolTest test = new AbstractSocketPoolTest() { @@ -36,7 +36,7 @@ protected String getRequestKey() throws Exception { public void testPoolLimitSomeTimeout() throws Exception { final ResourcePoolConfig config = new ResourcePoolConfig().setTimeout(50, TimeUnit.MILLISECONDS) - .setPoolSize(20); + .setMaxPoolSize(20); ResourceFactory factory = ResourcePoolTestUtils.getBasicPoolFactory(); final AbstractSocketPoolTest test = new AbstractSocketPoolTest() { @@ -60,7 +60,7 @@ protected String getRequestKey() throws Exception { public void testNoTimeout() throws Exception { final ResourcePoolConfig config = new ResourcePoolConfig().setTimeout(100, TimeUnit.MILLISECONDS) - .setPoolSize(20); + .setMaxPoolSize(20); ResourceFactory factory = ResourcePoolTestUtils.getBasicPoolFactory(); final AbstractSocketPoolTest test = new AbstractSocketPoolTest() { diff --git a/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java b/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java index 9f89e043e2..e4ce95f1d6 100644 --- a/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java +++ b/test/unit/voldemort/utils/pool/KeyedResourcePoolTest.java @@ -1,7 +1,5 @@ package voldemort.utils.pool; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -13,6 +11,7 @@ public class KeyedResourcePoolTest extends TestCase { private static int POOL_SIZE = 5; private static long TIMEOUT_MS = 100; + private static int MAX_ATTEMPTS = 10; private TestResourceFactory factory; private KeyedResourcePool pool; @@ -20,9 +19,10 @@ public class KeyedResourcePoolTest extends TestCase { @Override public void setUp() { factory = new TestResourceFactory(); - ResourcePoolConfig config = new ResourcePoolConfig(); - config.setPoolSize(POOL_SIZE); - config.setTimeout(TIMEOUT_MS, TimeUnit.MILLISECONDS); + ResourcePoolConfig config = new ResourcePoolConfig().setMaxPoolSize(POOL_SIZE) + .setTimeout(TIMEOUT_MS, + TimeUnit.MILLISECONDS) + .setMaxInvalidAttempts(MAX_ATTEMPTS); this.pool = new KeyedResourcePool(factory, config); } @@ -46,6 +46,42 @@ public void testFullPoolBlocks() throws Exception { } } + public void testExceptions() throws Exception { + // we should start with an empty pool + assertEquals(0, this.pool.getTotalResourceCount()); + + Exception toThrow = new Exception("An exception!"); + + // test exception on destroy + TestResource checkedOut = this.pool.checkout("a"); + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + this.factory.setDestroyException(toThrow); + try { + this.pool.checkin("a", checkedOut); + // checking out again should force destroy + this.pool.checkout("a"); + assertTrue(checkedOut.isDestroyed()); + } catch(Exception caught) { + fail("No exception expected."); + } + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + + this.factory.setCreateException(toThrow); + try { + this.pool.checkout("b"); + fail("Excpected exception!"); + } catch(Exception caught) { + assertEquals("The exception thrown by the factory should propage to the caller.", + toThrow, + caught); + } + // failed checkout shouldn't effect count + assertEquals(1, this.pool.getTotalResourceCount()); + assertEquals(0, this.pool.getCheckedInResourceCount()); + } + public void testInvalidIsDestroyed() throws Exception { TestResource r1 = this.pool.checkout("a"); r1.invalidate(); @@ -55,45 +91,14 @@ public void testInvalidIsDestroyed() throws Exception { assertTrue("Invalid objects should be destroyed.", r1.isDestroyed()); } - public void testMultithreaded() throws Exception { - int numThreads = POOL_SIZE * 2; - final String[] keys = new String[numThreads * 2]; - for(int i = 0; i < keys.length; i++) - keys[i] = Integer.toString(i); - - final AtomicInteger totalExecutions = new AtomicInteger(0); - final AtomicInteger destroyed = new AtomicInteger(0); - final AtomicBoolean isStopped = new AtomicBoolean(false); - ExecutorService executor = Executors.newFixedThreadPool(numThreads); - for(int i = 0; i < numThreads; i++) { - executor.execute(new Runnable() { - - public void run() { - while(!isStopped.get()) { - int curr = totalExecutions.getAndIncrement(); - String key = keys[curr % keys.length]; - try { - TestResource r = pool.checkout(key); - assertTrue(r.isValid()); - if(curr % 10021 == 0) { - r.invalidate(); - destroyed.getAndIncrement(); - } - pool.checkin(key, r); - } catch(Exception e) { - fail("Unexpected exception: " + e); - } - } - } - }); - } - Thread.sleep(1000); - isStopped.set(true); - Thread.sleep(200); - executor.shutdownNow(); - assertTrue(executor.awaitTermination(100, TimeUnit.MILLISECONDS)); - pool.close(); - assertEquals(factory.getCreated(), factory.getDestroyed()); + public void testMaxInvalidCreations() throws Exception { + this.factory.setCreatedValid(false); + try { + this.pool.checkout("a"); + fail("Exceeded max failed attempts without exception."); + } catch(ExcessiveInvalidResourcesException e) { + // this is expected + } } private static class TestResource { @@ -143,13 +148,22 @@ private static class TestResourceFactory implements ResourceFactory