Permalink
Browse files

Implementations of various async vs sync queueing policies for socket…

… checkout. Two commented out policies are included in this commit.

src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java
- a few TODOs to be investigated before completing work on async checkouts

/src/java/voldemort/utils/pool/AsyncResourceRequest.java
- a couple helper methods for implementing/debugging queueing policies

src/java/voldemort/utils/pool/KeyedResourcePool.java
- refactor to clean up checkin method
- TODOs for further code cleanup
- cleaned up all methods for tracking stats, added stats tracking of length of synchronous queue
- various aspects of (commented out) socket checkout queuing policies

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
- fixes to async socket checkout
- various aspects of (commented out) socket checkout queuing policies
- TODOs for further code cleanup
- cleaned up stats tracking for async queue length

*Test.java
- minor tweaks/cleanup
  • Loading branch information...
1 parent 8f7c4a7 commit f1cd8ef0883172d3eecab81b82cc72adef6e21bb @jayjwylie jayjwylie committed Oct 4, 2012
@@ -187,6 +187,24 @@ public void checkin(SocketDestination destination, ClientRequestExecutor clientR
}
}
+ /**
+ * This method is useful for printing out QueuedKeyedResourcePool statistics
+ * if debugging issues with the underlying QueuedKeyedResourcePool and
+ * KeyedResourcePool.
+ *
+ * @param tag A tag to be printed out in debugger output.
+ * @param destination The socket destination to print pool statistics for.
+ */
+ private void printPoolStats(String tag, SocketDestination destination) {
+ if(logger.isDebugEnabled()) {
+ logger.debug("CREP::" + tag + " : " + destination.toString() + " --- AQ QLen = "
+ + queuedPool.getRegisteredResourceRequestCount(destination)
+ + " --- SQ QLen = " + queuedPool.getBlockingGetsCount(destination)
+ + ", ChkIn = " + queuedPool.getCheckedInResourcesCount(destination)
+ + ", Tot = " + queuedPool.getTotalResourceCount(destination));
+ }
+ }
+
public void close(SocketDestination destination) {
factory.setLastClosedTimestamp(destination);
queuedPool.reset(destination);
@@ -219,6 +237,7 @@ public ClientSocketStats getStats() {
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {
+
AsyncSocketDestinationRequest<T> asyncSocketDestinationRequest = new AsyncSocketDestinationRequest<T>(destination,
delegate,
callback,
@@ -242,6 +261,7 @@ public ClientSocketStats getStats() {
public final String operationName;
private final long startTimeNs;
+ private final long startTimeMs;
public AsyncSocketDestinationRequest(SocketDestination destination,
ClientRequest<T> delegate,
@@ -255,6 +275,7 @@ public AsyncSocketDestinationRequest(SocketDestination destination,
this.operationName = operationName;
this.startTimeNs = System.nanoTime();
+ this.startTimeMs = System.currentTimeMillis();
}
public void useResource(ClientRequestExecutor clientRequestExecutor) {
@@ -295,6 +316,8 @@ public void handleException(Exception e) {
e = new UnreachableStoreException("Failure in " + operationName + ": "
+ e.getMessage(), e);
try {
+ // TODO: when can callback end up being null? HAs something to
+ // do with destroying resources. --JJW
callback.requestComplete(e, 0);
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
@@ -305,6 +328,14 @@ public void handleException(Exception e) {
public long getDeadlineNs() {
return startTimeNs + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
}
+
+ public long getStartTimeNs() {
+ return startTimeNs;
+ }
+
+ public long getStartTimeMs() {
+ return startTimeMs;
+ }
}
private class NonblockingStoreCallbackClientRequest<T> implements ClientRequest<T> {
@@ -367,6 +398,8 @@ public void complete() {
} catch(Exception e) {
invokeCallback(e, (System.nanoTime() - startNs) / Time.NS_PER_MS);
} finally {
+ // TODO: checkin can throw an exception. should "iscomplete" be
+ // set before the call to checkin?
checkin(destination, clientRequestExecutor);
isComplete = true;
}
@@ -32,4 +32,8 @@
* invoked.
*/
long getDeadlineNs();
+
+ long getStartTimeNs();
+
+ long getStartTimeMs();
}
@@ -11,6 +11,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -106,6 +107,7 @@ public KeyedResourcePool(ResourceFactory<K, V> objectFactory,
public V checkout(K key) throws Exception {
checkNotClosed();
+ // TODO: clean up nanotime BS
long startNs = System.nanoTime();
Pool<V> resourcePool = getResourcePoolForKey(key);
// Always attempt to grow. This protects against running out of
@@ -121,7 +123,7 @@ public V checkout(K key) throws Exception {
long timeRemainingNs = resourcePoolConfig.getTimeout(TimeUnit.NANOSECONDS)
- (System.nanoTime() - startNs);
if(timeRemainingNs > 0)
- resource = resourcePool.blockingGet(timeRemainingNs);
+ resource = resourcePool.blockingGet(timeRemainingNs, System.currentTimeMillis());
if(resource == null)
throw new TimeoutException("Could not acquire resource in "
@@ -189,12 +191,14 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
* A safe wrapper to destroy the given resource that catches any user
* exceptions.
*/
+ // TODO: If this method is called multiple times on the same resource, the
+ // resourcePool size gets decremented too many times and badness ensues.
protected void destroyResource(K key, Pool<V> resourcePool, V resource) {
if(resource != null) {
try {
objectFactory.destroy(key, resource);
} catch(Exception e) {
- logger.error("Exception while destroying invalid resource:", e);
+ logger.error("Exception while destroying invalid resource: ", e);
} finally {
// Assumes destroyed resource was in fact checked out of the
// pool.
@@ -210,15 +214,25 @@ protected void destroyResource(K key, Pool<V> resourcePool, V resource) {
* @param resource The resource
*/
public void checkin(K key, V resource) throws Exception {
- Pool<V> resourcePool = getResourcePoolForExistingKey(key);
- if(isOpen.get() && objectFactory.validate(key, resource)) {
+ if(isOpenAndValid(key, resource)) {
+ Pool<V> resourcePool = getResourcePoolForExistingKey(key);
boolean success = resourcePool.nonBlockingPut(resource);
if(!success) {
destroyResource(key, resourcePool, resource);
- throw new IllegalStateException("Checkin failed. Is the pool already full?");
+ throw new IllegalStateException("Checkin failed. Is the pool already full? (NB: see if KeyedResourcePool::destroyResource is being called multiple times.)");
}
+ }
+ }
+
+ // This method may be made protected in the future for the benefit of
+ // classes which extend from KeyedResourcePool.
+ protected boolean isOpenAndValid(K key, V resource) throws Exception {
+ if(isOpen.get() && objectFactory.validate(key, resource)) {
+ return true;
} else {
+ Pool<V> resourcePool = getResourcePoolForExistingKey(key);
destroyResource(key, resourcePool, resource);
+ return false;
}
}
@@ -261,54 +275,107 @@ public void reset(K key) {
}
/**
- * Return the total number of resources for the given key whether they are
- * currently checked in or checked out.
+ * Count the number of existing resources for a specific pool.
*
* @param key The key
- * @return The count
+ * @return The count of existing resources. Returns 0 if no pool exists for
+ * given key.
*/
public int getTotalResourceCount(K key) {
- Pool<V> resourcePool = getResourcePoolForExistingKey(key);
- return resourcePool.size.get();
+ int rc = 0;
+ if(!resourcePoolMap.containsKey(key)) {
+ return rc;
+ }
+ try {
+ Pool<V> resourcePool = getResourcePoolForExistingKey(key);
+ rc = resourcePool.size.get();
+ } catch(IllegalArgumentException iae) {
+ logger.debug("getTotalResourceCount called on invalid key: ", iae);
+ }
+ return rc;
}
/**
- * Get the count of all resources for all pools.
+ * Count the total number of existing resources for all pools. The result is
+ * "approximate" in the face of concurrency since individual pools can
+ * change size during the aggregate count.
*
- * @return The count of resources.
+ * @return The (approximate) aggregate count of existing resources.
*/
public int getTotalResourceCount() {
int count = 0;
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().size.get();
- // count is approximate in the case of concurrency since .size.get() for
- // various entries can change while other entries are being counted.
return count;
}
/**
- * Return the number of resources for the given key that are currently
- * sitting idle in the pool waiting to be checked out.
+ * Count the number of checked in (idle) resources for a specific pool.
*
* @param key The key
- * @return The count
+ * @return The count of checked in resources. Returns 0 if no pool exists
+ * for given key.
*/
public int getCheckedInResourcesCount(K key) {
- Pool<V> resourcePool = getResourcePoolForExistingKey(key);
- return resourcePool.queue.size();
+ int rc = 0;
+ if(!resourcePoolMap.containsKey(key)) {
+ return rc;
+ }
+ try {
+ Pool<V> resourcePool = getResourcePoolForExistingKey(key);
+ rc = resourcePool.queue.size();
+ } catch(IllegalArgumentException iae) {
+ logger.debug("getCheckedInResourceCount called on invalid key: ", iae);
+ }
+ return rc;
}
/**
- * Get the count of resources for all pools currently checkedin
+ * Count the total number of checked in (idle) resources across all pools.
+ * The result is "approximate" in the face of concurrency since individual
+ * pools can have resources checked in, or out, during the aggregate count.
*
- * @return The count of resources
+ * @return The (approximate) aggregate count of checked in resources.
*/
public int getCheckedInResourceCount() {
int count = 0;
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().queue.size();
- // count is approximate in the case of concurrency since .queue.size()
- // for various entries can change while other entries are being counted.
+ return count;
+ }
+
+ /**
+ * Count the number of blocking gets for a specific key.
+ *
+ * @param key The key
+ * @return The count of blocking gets. Returns 0 if no pool exists for given
+ * key.
+ */
+ public int getBlockingGetsCount(K key) {
+ int rc = 0;
+ if(!resourcePoolMap.containsKey(key)) {
+ return rc;
+ }
+ try {
+ Pool<V> resourcePool = getResourcePoolForExistingKey(key);
+ rc = resourcePool.blockingGets.get();
+ } catch(IllegalArgumentException iae) {
+ logger.debug("getBlockingGetsCount called on invalid key: ", iae);
+ }
+ return rc;
+ }
+
+ /**
+ * Count the total number of blocking gets across all pools. The result is
+ * "approximate" in the face of concurrency since blocking gets for
+ * individual pools can be issued or serviced during the aggregate count.
+ *
+ * @return The (approximate) aggregate count of blocking gets.
+ */
+ public int getBlockingGetsCount() {
+ int count = 0;
+ for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
+ count += entry.getValue().blockingGets.get();
return count;
}
@@ -321,6 +388,24 @@ protected void checkNotClosed() {
throw new IllegalStateException("Pool is closed!");
}
+ // OPTION I: Attempt to provide FIFO between sync and async requests.
+ /*-
+ protected long getLastTimeMs(K key) {
+ long rc = 0;
+ if(!resourcePoolMap.containsKey(key)) {
+ return rc;
+ }
+ try {
+ Pool<V> resourcePool = getResourcePoolForExistingKey(key);
+ rc = resourcePool.lastTimeMs.get();
+ } catch(IllegalArgumentException iae) {
+ logger.debug("getBlockingGetsCount called on invalid key: ", iae);
+ }
+ return rc;
+ }
+
+ // */
+
/**
* A fixed size pool that uses an ArrayBlockingQueue. The pool grows to no
* more than some specified maxPoolSize. The pool creates new resources in
@@ -330,6 +415,8 @@ protected void checkNotClosed() {
protected static class Pool<V> {
final private AtomicInteger size = new AtomicInteger(0);
+ final private AtomicInteger blockingGets = new AtomicInteger(0);
+ private final AtomicLong lastTimeMs = new AtomicLong(0);
final private int maxPoolSize;
final private BlockingQueue<V> queue;
@@ -357,6 +444,7 @@ public Pool(ResourcePoolConfig resourcePoolConfig) {
if(this.size.get() >= this.maxPoolSize) {
return false;
}
+
if(this.size.incrementAndGet() <= this.maxPoolSize) {
try {
V resource = objectFactory.create(key);
@@ -382,8 +470,18 @@ public V nonBlockingGet() {
return this.queue.poll();
}
- public V blockingGet(long timeoutNs) throws InterruptedException {
- return this.queue.poll(timeoutNs, TimeUnit.NANOSECONDS);
+ public V blockingGet(long timeoutNs, long startTimeMs) throws InterruptedException {
+ V v;
+ try {
+ blockingGets.incrementAndGet();
+ v = this.queue.poll(timeoutNs, TimeUnit.NANOSECONDS);
+ // OPTION I: Attempt to provide FIFO between sync and async
+ // requests.
+ // updateLastTimeMs(startTimeMs);
+ } finally {
+ blockingGets.decrementAndGet();
+ }
+ return v;
}
public boolean nonBlockingPut(V v) {
@@ -396,5 +494,18 @@ public boolean nonBlockingPut(V v) {
return list;
}
+ // OPTION I: Attempt to provide FIFO between sync and async requests.
+ /*-
+ private void updateLastTimeMs(long startTimeMs) {
+ long curLastTimeMs = lastTimeMs.get();
+ while(startTimeMs > curLastTimeMs) {
+ if(lastTimeMs.compareAndSet(curLastTimeMs, startTimeMs)) {
+ break;
+ }
+ curLastTimeMs = lastTimeMs.get();
+ }
+ }
+ // */
+
}
}
Oops, something went wrong.

0 comments on commit f1cd8ef

Please sign in to comment.