Skip to content

Commit

Permalink
Implementations of various async vs sync queueing policies for socket…
Browse files Browse the repository at this point in the history
… checkout. Two commented out policies are included in this commit.

src/java/voldemort/store/socket/clientrequest/ClientRequestExecutorPool.java
- a few TODOs to be investigated before completing work on async checkouts

/src/java/voldemort/utils/pool/AsyncResourceRequest.java
- a couple helper methods for implementing/debugging queueing policies

src/java/voldemort/utils/pool/KeyedResourcePool.java
- refactor to clean up checkin method
- TODOs for further code cleanup
- cleaned up all methods for tracking stats, added stats tracking of length of synchronous queue
- various aspects of (commented out) socket checkout queuing policies

src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
- fixes to async socket checkout
- various aspects of (commented out) socket checkout queuing policies
- TODOs for further code cleanup
- cleaned up stats tracking for async queue length

*Test.java
- minor tweaks/cleanup
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 8f7c4a7 commit f1cd8ef
Show file tree
Hide file tree
Showing 7 changed files with 290 additions and 43 deletions.
Expand Up @@ -187,6 +187,24 @@ public void checkin(SocketDestination destination, ClientRequestExecutor clientR
}
}

/**
* This method is useful for printing out QueuedKeyedResourcePool statistics
* if debugging issues with the underlying QueuedKeyedResourcePool and
* KeyedResourcePool.
*
* @param tag A tag to be printed out in debugger output.
* @param destination The socket destination to print pool statistics for.
*/
private void printPoolStats(String tag, SocketDestination destination) {
if(logger.isDebugEnabled()) {
logger.debug("CREP::" + tag + " : " + destination.toString() + " --- AQ QLen = "
+ queuedPool.getRegisteredResourceRequestCount(destination)
+ " --- SQ QLen = " + queuedPool.getBlockingGetsCount(destination)
+ ", ChkIn = " + queuedPool.getCheckedInResourcesCount(destination)
+ ", Tot = " + queuedPool.getTotalResourceCount(destination));
}
}

public void close(SocketDestination destination) {
factory.setLastClosedTimestamp(destination);
queuedPool.reset(destination);
Expand Down Expand Up @@ -219,6 +237,7 @@ public <T> void submitAsync(SocketDestination destination,
NonblockingStoreCallback callback,
long timeoutMs,
String operationName) {

AsyncSocketDestinationRequest<T> asyncSocketDestinationRequest = new AsyncSocketDestinationRequest<T>(destination,
delegate,
callback,
Expand All @@ -242,6 +261,7 @@ private class AsyncSocketDestinationRequest<T> implements
public final String operationName;

private final long startTimeNs;
private final long startTimeMs;

public AsyncSocketDestinationRequest(SocketDestination destination,
ClientRequest<T> delegate,
Expand All @@ -255,6 +275,7 @@ public AsyncSocketDestinationRequest(SocketDestination destination,
this.operationName = operationName;

this.startTimeNs = System.nanoTime();
this.startTimeMs = System.currentTimeMillis();
}

public void useResource(ClientRequestExecutor clientRequestExecutor) {
Expand Down Expand Up @@ -295,6 +316,8 @@ public void handleException(Exception e) {
e = new UnreachableStoreException("Failure in " + operationName + ": "
+ e.getMessage(), e);
try {
// TODO: when can callback end up being null? HAs something to
// do with destroying resources. --JJW
callback.requestComplete(e, 0);
} catch(Exception ex) {
if(logger.isEnabledFor(Level.WARN))
Expand All @@ -305,6 +328,14 @@ public void handleException(Exception e) {
public long getDeadlineNs() {
return startTimeNs + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
}

public long getStartTimeNs() {
return startTimeNs;
}

public long getStartTimeMs() {
return startTimeMs;
}
}

private class NonblockingStoreCallbackClientRequest<T> implements ClientRequest<T> {
Expand Down Expand Up @@ -367,6 +398,8 @@ public void complete() {
} catch(Exception e) {
invokeCallback(e, (System.nanoTime() - startNs) / Time.NS_PER_MS);
} finally {
// TODO: checkin can throw an exception. should "iscomplete" be
// set before the call to checkin?
checkin(destination, clientRequestExecutor);
isComplete = true;
}
Expand Down
4 changes: 4 additions & 0 deletions src/java/voldemort/utils/pool/AsyncResourceRequest.java
Expand Up @@ -32,4 +32,8 @@ public interface AsyncResourceRequest<V> {
* invoked.
*/
long getDeadlineNs();

long getStartTimeNs();

long getStartTimeMs();
}
161 changes: 136 additions & 25 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -11,6 +11,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -106,6 +107,7 @@ public static <K, V> KeyedResourcePool<K, V> create(ResourceFactory<K, V> factor
public V checkout(K key) throws Exception {
checkNotClosed();

// TODO: clean up nanotime BS
long startNs = System.nanoTime();
Pool<V> resourcePool = getResourcePoolForKey(key);
// Always attempt to grow. This protects against running out of
Expand All @@ -121,7 +123,7 @@ public V checkout(K key) throws Exception {
long timeRemainingNs = resourcePoolConfig.getTimeout(TimeUnit.NANOSECONDS)
- (System.nanoTime() - startNs);
if(timeRemainingNs > 0)
resource = resourcePool.blockingGet(timeRemainingNs);
resource = resourcePool.blockingGet(timeRemainingNs, System.currentTimeMillis());

if(resource == null)
throw new TimeoutException("Could not acquire resource in "
Expand Down Expand Up @@ -189,12 +191,14 @@ protected Pool<V> getResourcePoolForExistingKey(K key) {
* A safe wrapper to destroy the given resource that catches any user
* exceptions.
*/
// TODO: If this method is called multiple times on the same resource, the
// resourcePool size gets decremented too many times and badness ensues.
protected void destroyResource(K key, Pool<V> resourcePool, V resource) {
if(resource != null) {
try {
objectFactory.destroy(key, resource);
} catch(Exception e) {
logger.error("Exception while destroying invalid resource:", e);
logger.error("Exception while destroying invalid resource: ", e);
} finally {
// Assumes destroyed resource was in fact checked out of the
// pool.
Expand All @@ -210,15 +214,25 @@ protected void destroyResource(K key, Pool<V> resourcePool, V resource) {
* @param resource The resource
*/
public void checkin(K key, V resource) throws Exception {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
if(isOpen.get() && objectFactory.validate(key, resource)) {
if(isOpenAndValid(key, resource)) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
boolean success = resourcePool.nonBlockingPut(resource);
if(!success) {
destroyResource(key, resourcePool, resource);
throw new IllegalStateException("Checkin failed. Is the pool already full?");
throw new IllegalStateException("Checkin failed. Is the pool already full? (NB: see if KeyedResourcePool::destroyResource is being called multiple times.)");
}
}
}

// This method may be made protected in the future for the benefit of
// classes which extend from KeyedResourcePool.
protected boolean isOpenAndValid(K key, V resource) throws Exception {
if(isOpen.get() && objectFactory.validate(key, resource)) {
return true;
} else {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
destroyResource(key, resourcePool, resource);
return false;
}
}

Expand Down Expand Up @@ -261,54 +275,107 @@ public void reset(K key) {
}

/**
* Return the total number of resources for the given key whether they are
* currently checked in or checked out.
* Count the number of existing resources for a specific pool.
*
* @param key The key
* @return The count
* @return The count of existing resources. Returns 0 if no pool exists for
* given key.
*/
public int getTotalResourceCount(K key) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
return resourcePool.size.get();
int rc = 0;
if(!resourcePoolMap.containsKey(key)) {
return rc;
}
try {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
rc = resourcePool.size.get();
} catch(IllegalArgumentException iae) {
logger.debug("getTotalResourceCount called on invalid key: ", iae);
}
return rc;
}

/**
* Get the count of all resources for all pools.
* Count the total number of existing resources for all pools. The result is
* "approximate" in the face of concurrency since individual pools can
* change size during the aggregate count.
*
* @return The count of resources.
* @return The (approximate) aggregate count of existing resources.
*/
public int getTotalResourceCount() {
int count = 0;
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().size.get();
// count is approximate in the case of concurrency since .size.get() for
// various entries can change while other entries are being counted.
return count;
}

/**
* Return the number of resources for the given key that are currently
* sitting idle in the pool waiting to be checked out.
* Count the number of checked in (idle) resources for a specific pool.
*
* @param key The key
* @return The count
* @return The count of checked in resources. Returns 0 if no pool exists
* for given key.
*/
public int getCheckedInResourcesCount(K key) {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
return resourcePool.queue.size();
int rc = 0;
if(!resourcePoolMap.containsKey(key)) {
return rc;
}
try {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
rc = resourcePool.queue.size();
} catch(IllegalArgumentException iae) {
logger.debug("getCheckedInResourceCount called on invalid key: ", iae);
}
return rc;
}

/**
* Get the count of resources for all pools currently checkedin
* Count the total number of checked in (idle) resources across all pools.
* The result is "approximate" in the face of concurrency since individual
* pools can have resources checked in, or out, during the aggregate count.
*
* @return The count of resources
* @return The (approximate) aggregate count of checked in resources.
*/
public int getCheckedInResourceCount() {
int count = 0;
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().queue.size();
// count is approximate in the case of concurrency since .queue.size()
// for various entries can change while other entries are being counted.
return count;
}

/**
* Count the number of blocking gets for a specific key.
*
* @param key The key
* @return The count of blocking gets. Returns 0 if no pool exists for given
* key.
*/
public int getBlockingGetsCount(K key) {
int rc = 0;
if(!resourcePoolMap.containsKey(key)) {
return rc;
}
try {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
rc = resourcePool.blockingGets.get();
} catch(IllegalArgumentException iae) {
logger.debug("getBlockingGetsCount called on invalid key: ", iae);
}
return rc;
}

/**
* Count the total number of blocking gets across all pools. The result is
* "approximate" in the face of concurrency since blocking gets for
* individual pools can be issued or serviced during the aggregate count.
*
* @return The (approximate) aggregate count of blocking gets.
*/
public int getBlockingGetsCount() {
int count = 0;
for(Entry<K, Pool<V>> entry: this.resourcePoolMap.entrySet())
count += entry.getValue().blockingGets.get();
return count;
}

Expand All @@ -321,6 +388,24 @@ protected void checkNotClosed() {
throw new IllegalStateException("Pool is closed!");
}

// OPTION I: Attempt to provide FIFO between sync and async requests.
/*-
protected long getLastTimeMs(K key) {
long rc = 0;
if(!resourcePoolMap.containsKey(key)) {
return rc;
}
try {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
rc = resourcePool.lastTimeMs.get();
} catch(IllegalArgumentException iae) {
logger.debug("getBlockingGetsCount called on invalid key: ", iae);
}
return rc;
}
// */

/**
* A fixed size pool that uses an ArrayBlockingQueue. The pool grows to no
* more than some specified maxPoolSize. The pool creates new resources in
Expand All @@ -330,6 +415,8 @@ protected void checkNotClosed() {
protected static class Pool<V> {

final private AtomicInteger size = new AtomicInteger(0);
final private AtomicInteger blockingGets = new AtomicInteger(0);
private final AtomicLong lastTimeMs = new AtomicLong(0);
final private int maxPoolSize;
final private BlockingQueue<V> queue;

Expand Down Expand Up @@ -357,6 +444,7 @@ public <K> boolean attemptGrow(K key, ResourceFactory<K, V> objectFactory) throw
if(this.size.get() >= this.maxPoolSize) {
return false;
}

if(this.size.incrementAndGet() <= this.maxPoolSize) {
try {
V resource = objectFactory.create(key);
Expand All @@ -382,8 +470,18 @@ public V nonBlockingGet() {
return this.queue.poll();
}

public V blockingGet(long timeoutNs) throws InterruptedException {
return this.queue.poll(timeoutNs, TimeUnit.NANOSECONDS);
public V blockingGet(long timeoutNs, long startTimeMs) throws InterruptedException {
V v;
try {
blockingGets.incrementAndGet();
v = this.queue.poll(timeoutNs, TimeUnit.NANOSECONDS);
// OPTION I: Attempt to provide FIFO between sync and async
// requests.
// updateLastTimeMs(startTimeMs);
} finally {
blockingGets.decrementAndGet();
}
return v;
}

public boolean nonBlockingPut(V v) {
Expand All @@ -396,5 +494,18 @@ public List<V> close() {
return list;
}

// OPTION I: Attempt to provide FIFO between sync and async requests.
/*-
private void updateLastTimeMs(long startTimeMs) {
long curLastTimeMs = lastTimeMs.get();
while(startTimeMs > curLastTimeMs) {
if(lastTimeMs.compareAndSet(curLastTimeMs, startTimeMs)) {
break;
}
curLastTimeMs = lastTimeMs.get();
}
}
// */

}
}

0 comments on commit f1cd8ef

Please sign in to comment.