Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Addressed most of the feedback from the code review.

- Renamed many variables, methods & classes
- Addressed most of the TODOs in my changes based on the feedback
  • Loading branch information...
commit 4b05dcfb574605475cbcdca0cc51305c26d541e9 1 parent 714cd46
@jayjwylie jayjwylie authored
View
2  src/java/voldemort/client/TimeoutConfig.java
@@ -1,6 +1,6 @@
package voldemort.client;
-import voldemort.utils.OpTimeMap;
+import voldemort.common.OpTimeMap;
/**
* Encapsulates the timeouts, in ms, for various Voldemort operations
View
2  src/java/voldemort/client/protocol/admin/SocketPool.java
@@ -126,7 +126,7 @@ public void checkin(SocketDestination destination, SocketAndStreams socket) {
public void close(SocketDestination destination) {
socketFactory.setLastClosedTimestamp(destination);
- pool.close(destination);
+ pool.reset(destination);
}
/**
View
5 src/java/voldemort/utils/OpTimeMap.java → src/java/voldemort/common/OpTimeMap.java
@@ -1,11 +1,10 @@
-package voldemort.utils;
+package voldemort.common;
import java.util.HashMap;
-import voldemort.common.VoldemortOpCode;
/**
- * Encapsulates time to operation mapping
+ * Encapsulates time to Voldemort operation mapping
*
*/
public class OpTimeMap {
View
2  src/java/voldemort/server/VoldemortConfig.java
@@ -25,6 +25,7 @@
import voldemort.client.TimeoutConfig;
import voldemort.client.protocol.RequestFormatType;
import voldemort.cluster.failuredetector.FailureDetectorConfig;
+import voldemort.common.OpTimeMap;
import voldemort.common.VoldemortOpCode;
import voldemort.server.scheduler.slop.StreamingSlopPusherJob;
import voldemort.store.bdb.BdbStorageConfiguration;
@@ -34,7 +35,6 @@
import voldemort.store.readonly.BinarySearchStrategy;
import voldemort.store.readonly.ReadOnlyStorageConfiguration;
import voldemort.utils.ConfigurationException;
-import voldemort.utils.OpTimeMap;
import voldemort.utils.Props;
import voldemort.utils.Time;
import voldemort.utils.UndefinedPropertyException;
View
55 src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java
@@ -40,15 +40,15 @@
import voldemort.utils.JmxUtils;
import voldemort.utils.Time;
import voldemort.utils.Utils;
-import voldemort.utils.pool.KeyedResourcePool;
+import voldemort.utils.pool.AsyncResourceRequest;
import voldemort.utils.pool.QueuedKeyedResourcePool;
import voldemort.utils.pool.ResourcePoolConfig;
-import voldemort.utils.pool.ResourceRequest;
/**
* A pool of {@link ClientRequestExecutor} keyed off the
- * {@link SocketDestination}. This is a wrapper around {@link KeyedResourcePool}
- * that translates exceptions as well as providing some JMX access.
+ * {@link SocketDestination}. This is a wrapper around
+ * {@link QueuedKeyedResourcePool} that translates exceptions, provides some JMX
+ * access, and handles asynchronous requests for SocketDestinations.
*
* <p/>
*
@@ -58,7 +58,7 @@
public class ClientRequestExecutorPool implements SocketStoreFactory {
- private final QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
+ private final QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> queuedPool;
private final ClientRequestExecutorFactory factory;
private final ClientSocketStats stats;
private final boolean jmxEnabled;
@@ -96,10 +96,10 @@ public ClientRequestExecutorPool(int selectors,
socketBufferSize,
socketKeepAlive,
stats);
- this.pool = new QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory,
- config);
+ this.queuedPool = new QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor>(factory,
+ config);
if(stats != null) {
- this.stats.setPool(pool);
+ this.stats.setPool(queuedPool);
}
}
@@ -159,7 +159,7 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
long start = System.nanoTime();
ClientRequestExecutor clientRequestExecutor;
try {
- clientRequestExecutor = pool.checkout(destination);
+ clientRequestExecutor = queuedPool.checkout(destination);
} catch(Exception e) {
throw new UnreachableStoreException("Failure while checking out socket for "
+ destination + ": ", e);
@@ -180,7 +180,7 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
*/
public void checkin(SocketDestination destination, ClientRequestExecutor clientRequestExecutor) {
try {
- pool.checkin(destination, clientRequestExecutor);
+ queuedPool.checkin(destination, clientRequestExecutor);
} catch(Exception e) {
throw new VoldemortException("Failure while checking in socket for " + destination
+ ": ", e);
@@ -189,7 +189,7 @@ public void checkin(SocketDestination destination, ClientRequestExecutor clientR
public void close(SocketDestination destination) {
factory.setLastClosedTimestamp(destination);
- pool.close(destination);
+ queuedPool.reset(destination);
}
/**
@@ -207,7 +207,7 @@ public void close() {
stats.close();
}
factory.close();
- pool.close();
+ queuedPool.close();
}
public ClientSocketStats getStats() {
@@ -219,12 +219,12 @@ public ClientSocketStats getStats() {
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
- AsyncRequest<T> asyncRequest = new AsyncRequest<T>(destination,
- delegate,
- callback,
- timeoutMs,
- operationName);
- pool.requestResource(destination, asyncRequest);
+ AsyncSocketDestinationRequest<T> asyncSocketDestinationRequest = new AsyncSocketDestinationRequest<T>(destination,
+ delegate,
+ callback,
+ timeoutMs,
+ operationName);
+ queuedPool.registerResourceRequest(destination, asyncSocketDestinationRequest);
return;
}
@@ -232,7 +232,8 @@ public ClientSocketStats getStats() {
* Wrap up an asynchronous request and actually issue it once a
* SocketDestination is checked out.
*/
- private class AsyncRequest<T> implements ResourceRequest<ClientRequestExecutor> {
+ private class AsyncSocketDestinationRequest<T> implements
+ AsyncResourceRequest<ClientRequestExecutor> {
private final SocketDestination destination;
public final ClientRequest<T> delegate;
@@ -242,11 +243,11 @@ public ClientSocketStats getStats() {
private final long startTimeNs;
- public AsyncRequest(SocketDestination destination,
- ClientRequest<T> delegate,
- NonblockingStoreCallback callback,
- long timeoutMs,
- String operationName) {
+ public AsyncSocketDestinationRequest(SocketDestination destination,
+ ClientRequest<T> delegate,
+ NonblockingStoreCallback callback,
+ long timeoutMs,
+ String operationName) {
this.destination = destination;
this.delegate = delegate;
this.callback = callback;
@@ -263,8 +264,10 @@ public void useResource(ClientRequestExecutor clientRequestExecutor) {
+ " requestRef: "
+ System.identityHashCode(delegate)
+ " time: "
- // TODO: output startTimeNs instead?
- + System.currentTimeMillis()
+ // Output time (ms) includes queueing delay (i.e.,
+ // time between when registerResourceRequest is
+ // called and time when useResource is invoked).
+ + (this.startTimeNs / Time.NS_PER_MS)
+ " server: "
+ clientRequestExecutor.getSocketChannel()
.socket()
View
9 src/java/voldemort/store/stats/ClientSocketStats.java
@@ -25,7 +25,7 @@
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.clientrequest.ClientRequestExecutor;
import voldemort.utils.JmxUtils;
-import voldemort.utils.pool.KeyedResourcePool;
+import voldemort.utils.pool.QueuedKeyedResourcePool;
/**
* Some convenient statistics to track about the client requests
@@ -37,7 +37,7 @@
private final ClientSocketStats parent;
private final ConcurrentMap<SocketDestination, ClientSocketStats> statsMap;
private final SocketDestination destination;
- private KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
+ private QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
private final AtomicInteger monitoringInterval = new AtomicInteger(10000);
private final Histogram checkoutTimeUsHistogram = new Histogram(20000, 100);
@@ -59,8 +59,7 @@
*/
public ClientSocketStats(ClientSocketStats parent,
SocketDestination destination,
- KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool,
- int jmxId) {
+ QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool, int jmxId) {
this.parent = parent;
this.statsMap = null;
this.destination = destination;
@@ -212,7 +211,7 @@ public int getMonitoringInterval() {
return this.monitoringInterval.get();
}
- public void setPool(KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool) {
+ public void setPool(QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool) {
this.pool = pool;
}
View
2  ...voldemort/utils/pool/ResourceRequest.java → ...mort/utils/pool/AsyncResourceRequest.java
@@ -5,7 +5,7 @@
* useResource, handleTimeout, or handleException expected to be invoked within
* deadline specified by getDeadlineNs.
*/
-public interface ResourceRequest<V> {
+public interface AsyncResourceRequest<V> {
/**
* To be invoked with resource to use.
View
21 src/java/voldemort/utils/pool/KeyedResourcePool.java
@@ -164,9 +164,11 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
protected Pool<V> getResourcePoolForKey(K key) {
Pool<V> resourcePool = resourcePoolMap.get(key);
if(resourcePool == null) {
- resourcePool = new Pool<V>(this.resourcePoolConfig);
- resourcePoolMap.putIfAbsent(key, resourcePool);
- resourcePool = resourcePoolMap.get(key);
+ Pool<V> newResourcePool = new Pool<V>(this.resourcePoolConfig);
+ resourcePool = resourcePoolMap.putIfAbsent(key, newResourcePool);
+ if(resourcePool == null) {
+ resourcePool = newResourcePool;
+ }
}
return resourcePool;
}
@@ -245,16 +247,15 @@ public void close() {
}
/**
- * "Close" a specific resource pool by destroying all the resources in the
- * pool. This method does not affect whether any pool is "open" in the sense
- * of permitting new resources to be added to it.
+ * Reset a specific resource pool. Destroys all the resources in the pool.
+ * This method does not affect whether the pool is "open" in the sense of
+ * permitting new resources to be added to it.
*
- * @param key The key for the pool to close.
+ * @param key The key for the pool to reset.
*/
- public void close(K key) {
+ public void reset(K key) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
List<V> list = resourcePool.close();
- // destroy each resource currently in the pool
for(V value: list)
destroyResource(key, resourcePool, value);
}
@@ -362,8 +363,6 @@ public Pool(ResourcePoolConfig resourcePoolConfig) {
if(resource != null) {
if(!nonBlockingPut(resource)) {
this.size.decrementAndGet();
- // TODO: Do we need to destroy the non-null,
- // non-enqueued resource?
objectFactory.destroy(key, resource);
return false;
}
View
85 src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
@@ -35,11 +35,11 @@
private static final Logger logger = Logger.getLogger(QueuedKeyedResourcePool.class.getName());
- private final ConcurrentMap<K, Queue<ResourceRequest<V>>> requestQueueMap;
+ private final ConcurrentMap<K, Queue<AsyncResourceRequest<V>>> requestQueueMap;
public QueuedKeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePoolConfig config) {
super(objectFactory, config);
- requestQueueMap = new ConcurrentHashMap<K, Queue<ResourceRequest<V>>>();
+ requestQueueMap = new ConcurrentHashMap<K, Queue<AsyncResourceRequest<V>>>();
}
/**
@@ -83,14 +83,12 @@ public QueuedKeyedResourcePool(ResourceFactory<K, V> objectFactory, ResourcePool
* @return The resource
*
*/
- public void requestResource(K key, ResourceRequest<V> resourceRequest) {
+ public void registerResourceRequest(K key, AsyncResourceRequest<V> resourceRequest) {
checkNotClosed();
- // Non-blocking checkout attempt iff requestQueue is empty. If
- // requestQueue is not empty and we attempted non-blocking checkout,
- // then FIFO at risk.
- Queue<ResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
+ Queue<AsyncResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
if(requestQueue.isEmpty()) {
+ // Attempt non-blocking checkout iff requestQueue is empty.
Pool<V> resourcePool = getResourcePoolForKey(key);
try {
attemptGrow(key, resourcePool);
@@ -123,8 +121,8 @@ public void requestResource(K key, ResourceRequest<V> resourceRequest) {
*
* @return null or a valid ResourceRequest
*/
- private ResourceRequest<V> getNextUnexpiredResourceRequest(Queue<ResourceRequest<V>> requestQueue) {
- ResourceRequest<V> resourceRequest = requestQueue.poll();
+ private AsyncResourceRequest<V> getNextUnexpiredResourceRequest(Queue<AsyncResourceRequest<V>> requestQueue) {
+ AsyncResourceRequest<V> resourceRequest = requestQueue.poll();
while(resourceRequest != null) {
if(resourceRequest.getDeadlineNs() < System.nanoTime()) {
resourceRequest.handleTimeout();
@@ -144,7 +142,7 @@ public void requestResource(K key, ResourceRequest<V> resourceRequest) {
* @return true iff an item was processed from the Queue.
*/
private boolean processQueue(K key) throws Exception {
- Queue<ResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
+ Queue<AsyncResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
if(requestQueue.isEmpty()) {
return false;
}
@@ -165,7 +163,7 @@ private boolean processQueue(K key) throws Exception {
}
// With resource in hand, process the resource requests
- ResourceRequest<V> resourceRequest = getNextUnexpiredResourceRequest(requestQueue);
+ AsyncResourceRequest<V> resourceRequest = getNextUnexpiredResourceRequest(requestQueue);
if(resourceRequest == null) {
// Did not use the resource! Directly check in via super to avoid
// circular call to processQueue().
@@ -194,7 +192,7 @@ public void checkin(K key, V resource) throws Exception {
/*
* A safe wrapper to destroy the given resource request.
*/
- protected void destroyRequest(ResourceRequest<V> resourceRequest) {
+ protected void destroyRequest(AsyncResourceRequest<V> resourceRequest) {
if(resourceRequest != null) {
try {
Exception e = new UnreachableStoreException("Resource request destroyed before resource checked out.");
@@ -211,8 +209,8 @@ protected void destroyRequest(ResourceRequest<V> resourceRequest) {
* @param requestQueue The queue for which all resource requests are to be
* destroyed.
*/
- private void destroyRequestQueue(Queue<ResourceRequest<V>> requestQueue) {
- ResourceRequest<V> resourceRequest = requestQueue.poll();
+ private void destroyRequestQueue(Queue<AsyncResourceRequest<V>> requestQueue) {
+ AsyncResourceRequest<V> resourceRequest = requestQueue.poll();
while(resourceRequest != null) {
destroyRequest(resourceRequest);
resourceRequest = requestQueue.poll();
@@ -224,8 +222,8 @@ protected boolean internalClose() {
// wasOpen ensures only one thread destroys everything.
boolean wasOpen = super.internalClose();
if(wasOpen) {
- for(Entry<K, Queue<ResourceRequest<V>>> entry: requestQueueMap.entrySet()) {
- Queue<ResourceRequest<V>> requestQueue = entry.getValue();
+ for(Entry<K, Queue<AsyncResourceRequest<V>>> entry: requestQueueMap.entrySet()) {
+ Queue<AsyncResourceRequest<V>> requestQueue = entry.getValue();
destroyRequestQueue(requestQueue);
requestQueueMap.remove(entry.getKey());
}
@@ -242,46 +240,35 @@ public void close() {
}
/**
- * "Close" a specific resource pool and request queue by destroying all the
- * resources in the pool and all the requests in the queue. This method does
- * not affect whether any pool or queue is "open" in the sense of permitting
- * new resources to be added or requests to be enqueued.
+ * Reset a specific resource pool and resource request queue. First,
+ * "destroy" all registered resource requests. Second, destroy all resources
+ * in the pool.
*
- * @param key The key for the pool to close.
+ * @param key The key for the pool to reset.
*/
@Override
- public void close(K key) {
- // TODO: The close method in the super class is not documented at all.
- // super.close(key) is called by ClientRequestExecutorPool.close which
- // is called by SocketStoreclientFactory. Given the super class does not
- // set any closed bit, unclear what the semantics of this.close(key)
- // ought to be.
- //
- // Also, super.close(key) does nothing to protect against multiple
- // threads accessing the method at the same time. And, super.close(key)
- // does not remove the affected pool from super.resourcePoolMap. The
- // semantics of super.close(key) are truly unclear.
-
- // Destroy enqueued resource requests (if any exist) first.
- Queue<ResourceRequest<V>> requestQueue = requestQueueMap.get(key);
+ public void reset(K key) {
+ // First, destroy enqueued resource requests (if any exist).
+ Queue<AsyncResourceRequest<V>> requestQueue = requestQueueMap.get(key);
if(requestQueue != null) {
destroyRequestQueue(requestQueue);
- // TODO: requestQueueMap.remove(entry.getKey()); ?
}
- // Destroy resources in the pool second.
- super.close(key);
+ // Second, destroy resources in the pool.
+ super.reset(key);
}
/*
* Get the queue of work for the given key. If no queue exists, create one.
*/
- protected Queue<ResourceRequest<V>> getRequestQueueForKey(K key) {
- Queue<ResourceRequest<V>> requestQueue = requestQueueMap.get(key);
+ protected Queue<AsyncResourceRequest<V>> getRequestQueueForKey(K key) {
+ Queue<AsyncResourceRequest<V>> requestQueue = requestQueueMap.get(key);
if(requestQueue == null) {
- requestQueue = new ConcurrentLinkedQueue<ResourceRequest<V>>();
- requestQueueMap.putIfAbsent(key, requestQueue);
- requestQueue = requestQueueMap.get(key);
+ Queue<AsyncResourceRequest<V>> newRequestQueue = new ConcurrentLinkedQueue<AsyncResourceRequest<V>>();
+ requestQueue = requestQueueMap.putIfAbsent(key, newRequestQueue);
+ if(requestQueue == null) {
+ requestQueue = newRequestQueue;
+ }
}
return requestQueue;
}
@@ -289,8 +276,8 @@ public void close(K key) {
/*
* Get the pool for the given key. If no pool exists, throw an exception.
*/
- protected Queue<ResourceRequest<V>> getRequestQueueForExistingKey(K key) {
- Queue<ResourceRequest<V>> requestQueue = requestQueueMap.get(key);
+ protected Queue<AsyncResourceRequest<V>> getRequestQueueForExistingKey(K key) {
+ Queue<AsyncResourceRequest<V>> requestQueue = requestQueueMap.get(key);
if(requestQueue == null) {
throw new IllegalArgumentException("Invalid key '" + key
+ "': no request queue exists for that key.");
@@ -304,8 +291,8 @@ public void close(K key) {
* @param key The key
* @return The count
*/
- public int getQueuedResourceRequestCount(K key) {
- Queue<ResourceRequest<V>> requestQueue = getRequestQueueForExistingKey(key);
+ public int getRegisteredResourceRequestCount(K key) {
+ Queue<AsyncResourceRequest<V>> requestQueue = getRequestQueueForExistingKey(key);
// FYI: .size() is not constant time in the next call. ;)
return requestQueue.size();
}
@@ -315,9 +302,9 @@ public int getQueuedResourceRequestCount(K key) {
*
* @return The count of resources
*/
- public int getQueuedResourceRequestCount() {
+ public int getRegisteredResourceRequestCount() {
int count = 0;
- for(Entry<K, Queue<ResourceRequest<V>>> entry: this.requestQueueMap.entrySet()) {
+ for(Entry<K, Queue<AsyncResourceRequest<V>>> entry: this.requestQueueMap.entrySet()) {
// FYI: .size() is not constant time in the next call. ;)
count += entry.getValue().size();
}
View
2  test/integration/voldemort/store/slow/SlowStorageConfiguration.java
@@ -16,12 +16,12 @@
package voldemort.store.slow;
import voldemort.VoldemortException;
+import voldemort.common.OpTimeMap;
import voldemort.server.VoldemortConfig;
import voldemort.store.StorageConfiguration;
import voldemort.store.StorageEngine;
import voldemort.store.StoreDefinition;
import voldemort.utils.ByteArray;
-import voldemort.utils.OpTimeMap;
/**
* A storage engine that wraps InMemoryStorageEngine with delays.
View
2  test/integration/voldemort/store/slow/SlowStorageEngine.java
@@ -21,12 +21,12 @@
import java.util.concurrent.TimeUnit;
import voldemort.VoldemortException;
+import voldemort.common.OpTimeMap;
import voldemort.common.VoldemortOpCode;
import voldemort.store.StorageEngine;
import voldemort.store.StoreCapabilityType;
import voldemort.store.memory.InMemoryStorageEngine;
import voldemort.utils.ClosableIterator;
-import voldemort.utils.OpTimeMap;
import voldemort.utils.Pair;
import voldemort.versioning.Version;
import voldemort.versioning.Versioned;
View
2  test/unit/voldemort/store/memory/SlowStorageEngineTest.java
@@ -26,13 +26,13 @@
import org.apache.log4j.Logger;
import voldemort.TestUtils;
+import voldemort.common.OpTimeMap;
import voldemort.common.VoldemortOpCode;
import voldemort.store.AbstractStorageEngineTest;
import voldemort.store.StorageEngine;
import voldemort.store.slow.SlowStorageEngine;
import voldemort.utils.ByteArray;
import voldemort.utils.ByteUtils;
-import voldemort.utils.OpTimeMap;
import voldemort.utils.pool.KeyedResourcePool;
import voldemort.versioning.ObsoleteVersionException;
import voldemort.versioning.VectorClock;
View
4 test/unit/voldemort/store/stats/ClientSocketStatsTest.java
@@ -26,7 +26,7 @@
import voldemort.client.protocol.RequestFormatType;
import voldemort.store.socket.SocketDestination;
import voldemort.store.socket.clientrequest.ClientRequestExecutor;
-import voldemort.utils.pool.KeyedResourcePool;
+import voldemort.utils.pool.QueuedKeyedResourcePool;
public class ClientSocketStatsTest {
@@ -34,7 +34,7 @@
private int port;
private SocketDestination dest1;
private SocketDestination dest2;
- private KeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
+ private QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;
@Before
public void setUp() throws Exception {
View
88 test/unit/voldemort/utils/pool/QueuedKeyedResourcePoolTest.java
@@ -47,31 +47,31 @@ public void testQueuingOccurs() throws Exception {
assertEquals(0, this.factory.getCreated());
assertEquals(0, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(0, resources.size());
// Submit initial POOL_SIZE requests
for(int i = 0; i < POOL_SIZE; i++) {
- this.queuedPool.requestResource("a", resourceRequests.poll());
+ this.queuedPool.registerResourceRequest("a", resourceRequests.poll());
}
// Confirm initial requests were handled in nonblocking manner
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(POOL_SIZE, resources.size());
// Submit additional POOL_SIZE requests
for(int i = 0; i < POOL_SIZE; i++) {
- this.queuedPool.requestResource("a", resourceRequests.poll());
+ this.queuedPool.registerResourceRequest("a", resourceRequests.poll());
}
// Confirm additional requests are queued
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(POOL_SIZE, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(POOL_SIZE, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(POOL_SIZE, resources.size());
// Check in initial resources and confirm that this consumes queued
@@ -82,7 +82,7 @@ public void testQueuingOccurs() throws Exception {
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(POOL_SIZE - i - 1, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(POOL_SIZE - i - 1, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(POOL_SIZE, resources.size());
}
@@ -93,14 +93,14 @@ public void testQueuingOccurs() throws Exception {
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(i + 1, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(POOL_SIZE - i - 1, resources.size());
}
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(POOL_SIZE, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(0, resources.size());
assertEquals(POOL_SIZE * 2, TestResourceRequest.usedResourceCount.get());
@@ -109,6 +109,53 @@ public void testQueuingOccurs() throws Exception {
}
@Test
+ public void testQueuingStats() throws Exception {
+ Queue<TestResource> resources = new LinkedList<TestResource>();
+ Queue<TestResourceRequest> resourceRequests = new LinkedList<TestResourceRequest>();
+
+ long deadlineNs = System.nanoTime()
+ + TimeUnit.MILLISECONDS.toNanos(KeyedResourcePoolTest.TIMEOUT_MS);
+ for(int i = 0; i < POOL_SIZE * 10001; i++) {
+ resourceRequests.add(new TestResourceRequest(deadlineNs, resources));
+ }
+
+ assertEquals(0, this.factory.getCreated());
+ assertEquals(0, this.queuedPool.getTotalResourceCount());
+ assertEquals(0, this.queuedPool.getCheckedInResourceCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
+ assertEquals(0, resources.size());
+
+ // Submit initial POOL_SIZE requests
+ for(int i = 0; i < POOL_SIZE; i++) {
+ this.queuedPool.registerResourceRequest("a", resourceRequests.poll());
+ }
+
+ // Confirm initial requests were handled in nonblocking manner
+ assertEquals(POOL_SIZE, this.factory.getCreated());
+ assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
+ assertEquals(0, this.queuedPool.getCheckedInResourceCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
+ assertEquals(POOL_SIZE, resources.size());
+
+ // Register five order of magnitude more resource requests.
+ for(int i = 0; i < POOL_SIZE * 10000; i++) {
+ this.queuedPool.registerResourceRequest("a", resourceRequests.poll());
+ }
+
+ long startNs = System.nanoTime();
+ assertEquals(POOL_SIZE * 10000, this.queuedPool.getRegisteredResourceRequestCount());
+ assertTrue("O(n) count of queue is too slow",
+ System.nanoTime() - startNs < TimeUnit.MILLISECONDS.toNanos(10));
+
+ // Confirm additional requests are queued
+ assertEquals(POOL_SIZE, this.factory.getCreated());
+ assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
+ assertEquals(0, this.queuedPool.getCheckedInResourceCount());
+ assertEquals(POOL_SIZE, resources.size());
+
+ }
+
+ @Test
public void testTimeoutInQueue() throws Exception {
Queue<TestResource> resources = new LinkedList<TestResource>();
Queue<TestResourceRequest> resourceRequests = new LinkedList<TestResourceRequest>();
@@ -122,17 +169,17 @@ public void testTimeoutInQueue() throws Exception {
assertEquals(0, this.factory.getCreated());
assertEquals(0, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(0, resources.size());
// Submit initial POOL_SIZE requests
for(int i = 0; i < POOL_SIZE; i++) {
- this.queuedPool.requestResource("a", resourceRequests.poll());
+ this.queuedPool.registerResourceRequest("a", resourceRequests.poll());
}
// Submit additional POOL_SIZE requests that queue up.
for(int i = 0; i < POOL_SIZE; i++) {
- this.queuedPool.requestResource("a", resourceRequests.poll());
+ this.queuedPool.registerResourceRequest("a", resourceRequests.poll());
}
// Force deadline (timeout) to expire
@@ -144,7 +191,7 @@ public void testTimeoutInQueue() throws Exception {
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(1, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(POOL_SIZE - 1, resources.size());
assertEquals(POOL_SIZE, TestResourceRequest.usedResourceCount.get());
@@ -160,7 +207,7 @@ public void testTimeoutInQueue() throws Exception {
assertEquals(POOL_SIZE, this.factory.getCreated());
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(POOL_SIZE, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(0, resources.size());
assertEquals(POOL_SIZE, TestResourceRequest.usedResourceCount.get());
@@ -182,15 +229,15 @@ public void testExceptionInQueue() throws Exception {
assertEquals(0, this.factory.getCreated());
assertEquals(0, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(0, resources.size());
- this.queuedPool.requestResource("a", trr);
+ this.queuedPool.registerResourceRequest("a", trr);
assertEquals(0, this.factory.getCreated());
assertEquals(0, this.queuedPool.getTotalResourceCount());
assertEquals(0, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(0, resources.size());
assertEquals(0, TestResourceRequest.usedResourceCount.get());
@@ -224,7 +271,7 @@ public void contendForQueue() throws Exception {
waitForEnqueuers.await();
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(POOL_SIZE, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(numEnqueuers * numEnqueues, TestResourceRequest.usedResourceCount.get());
assertEquals(0, TestResourceRequest.handledTimeoutCount.get());
@@ -270,7 +317,7 @@ public void contendForQueueAndPool() throws Exception {
waitForThreadsEnd.await();
assertEquals(POOL_SIZE, this.queuedPool.getTotalResourceCount());
assertEquals(POOL_SIZE, this.queuedPool.getCheckedInResourceCount());
- assertEquals(0, this.queuedPool.getQueuedResourceRequestCount());
+ assertEquals(0, this.queuedPool.getRegisteredResourceRequestCount());
assertEquals(numEnqueuers * numEnqueues, TestResourceRequest.usedResourceCount.get());
assertEquals(0, TestResourceRequest.handledTimeoutCount.get());
@@ -343,7 +390,8 @@ public void run() {
long deadlineNs = System.nanoTime()
+ TimeUnit.MILLISECONDS.toNanos(config.getTimeout(TimeUnit.NANOSECONDS));
- queuedPool.requestResource(key, new TestResourceRequest(deadlineNs, resources));
+ queuedPool.registerResourceRequest(key, new TestResourceRequest(deadlineNs,
+ resources));
Thread.yield();
processAtMostOneEnqueuedResource();
@@ -359,7 +407,7 @@ public void run() {
}
}
- protected static class TestResourceRequest implements ResourceRequest<TestResource> {
+ protected static class TestResourceRequest implements AsyncResourceRequest<TestResource> {
private AtomicBoolean usedResource;
private AtomicBoolean handledTimeout;
Please sign in to comment.
Something went wrong with that request. Please try again.