Skip to content

Commit

Permalink
Removed the commented out implementations of distinct policies for as…
Browse files Browse the repository at this point in the history
…ync socket checkout.
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent f1cd8ef commit 233ceb1
Show file tree
Hide file tree
Showing 5 changed files with 10 additions and 128 deletions.
Expand Up @@ -187,6 +187,8 @@ public void checkin(SocketDestination destination, ClientRequestExecutor clientR
}
}

// TODO: remove this helper method once JmxUtils are all updated to track
// queue stats of interest.
/**
* This method is useful for printing out QueuedKeyedResourcePool statistics
* if debugging issues with the underlying QueuedKeyedResourcePool and
Expand Down Expand Up @@ -261,7 +263,6 @@ private class AsyncSocketDestinationRequest<T> implements
public final String operationName;

private final long startTimeNs;
private final long startTimeMs;

public AsyncSocketDestinationRequest(SocketDestination destination,
ClientRequest<T> delegate,
Expand All @@ -275,7 +276,6 @@ public AsyncSocketDestinationRequest(SocketDestination destination,
this.operationName = operationName;

this.startTimeNs = System.nanoTime();
this.startTimeMs = System.currentTimeMillis();
}

public void useResource(ClientRequestExecutor clientRequestExecutor) {
Expand Down Expand Up @@ -328,14 +328,6 @@ public void handleException(Exception e) {
public long getDeadlineNs() {
return startTimeNs + TimeUnit.MILLISECONDS.toNanos(timeoutMs);
}

public long getStartTimeNs() {
return startTimeNs;
}

public long getStartTimeMs() {
return startTimeMs;
}
}

private class NonblockingStoreCallbackClientRequest<T> implements ClientRequest<T> {
Expand Down
3 changes: 0 additions & 3 deletions src/java/voldemort/utils/pool/AsyncResourceRequest.java
Expand Up @@ -33,7 +33,4 @@ public interface AsyncResourceRequest<V> {
*/
long getDeadlineNs();

long getStartTimeNs();

long getStartTimeMs();
}
51 changes: 7 additions & 44 deletions src/java/voldemort/utils/pool/KeyedResourcePool.java
Expand Up @@ -11,7 +11,6 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.log4j.Logger;

Expand Down Expand Up @@ -107,7 +106,6 @@ public static <K, V> KeyedResourcePool<K, V> create(ResourceFactory<K, V> factor
public V checkout(K key) throws Exception {
checkNotClosed();

// TODO: clean up nanotime BS
long startNs = System.nanoTime();
Pool<V> resourcePool = getResourcePoolForKey(key);
// Always attempt to grow. This protects against running out of
Expand All @@ -123,7 +121,7 @@ public V checkout(K key) throws Exception {
long timeRemainingNs = resourcePoolConfig.getTimeout(TimeUnit.NANOSECONDS)
- (System.nanoTime() - startNs);
if(timeRemainingNs > 0)
resource = resourcePool.blockingGet(timeRemainingNs, System.currentTimeMillis());
resource = resourcePool.blockingGet(timeRemainingNs);

if(resource == null)
throw new TimeoutException("Could not acquire resource in "
Expand All @@ -140,7 +138,7 @@ public V checkout(K key) throws Exception {
return resource;
}

/*
/**
* Get a free resource if one exists. This method does not block. It either
* returns null or a resource.
*/
Expand All @@ -149,7 +147,7 @@ protected V attemptCheckout(Pool<V> pool) throws Exception {
return resource;
}

/*
/**
* Attempt to create a new object and add it to the pool--this only happens
* if there is room for the new object. This method does not block. This
* method returns true if it adds a resource to the pool. (Returning true
Expand All @@ -160,7 +158,7 @@ protected boolean attemptGrow(K key, Pool<V> pool) throws Exception {
return pool.attemptGrow(key, this.objectFactory);
}

/*
/**
* Get the pool for the given key. If no pool exists, create one.
*/
protected Pool<V> getResourcePoolForKey(K key) {
Expand All @@ -175,7 +173,7 @@ protected Pool<V> getResourcePoolForKey(K key) {
return resourcePool;
}

/*
/**
* Get the pool for the given key. If no pool exists, throw an exception.
*/
protected Pool<V> getResourcePoolForExistingKey(K key) {
Expand Down Expand Up @@ -379,7 +377,7 @@ public int getBlockingGetsCount() {
return count;
}

/*
/**
* Check that the pool is not closed, and throw an IllegalStateException if
* it is.
*/
Expand All @@ -388,24 +386,6 @@ protected void checkNotClosed() {
throw new IllegalStateException("Pool is closed!");
}

// OPTION I: Attempt to provide FIFO between sync and async requests.
/*-
protected long getLastTimeMs(K key) {
long rc = 0;
if(!resourcePoolMap.containsKey(key)) {
return rc;
}
try {
Pool<V> resourcePool = getResourcePoolForExistingKey(key);
rc = resourcePool.lastTimeMs.get();
} catch(IllegalArgumentException iae) {
logger.debug("getBlockingGetsCount called on invalid key: ", iae);
}
return rc;
}
// */

/**
* A fixed size pool that uses an ArrayBlockingQueue. The pool grows to no
* more than some specified maxPoolSize. The pool creates new resources in
Expand All @@ -416,7 +396,6 @@ protected static class Pool<V> {

final private AtomicInteger size = new AtomicInteger(0);
final private AtomicInteger blockingGets = new AtomicInteger(0);
private final AtomicLong lastTimeMs = new AtomicLong(0);
final private int maxPoolSize;
final private BlockingQueue<V> queue;

Expand Down Expand Up @@ -470,14 +449,11 @@ public V nonBlockingGet() {
return this.queue.poll();
}

public V blockingGet(long timeoutNs, long startTimeMs) throws InterruptedException {
public V blockingGet(long timeoutNs) throws InterruptedException {
V v;
try {
blockingGets.incrementAndGet();
v = this.queue.poll(timeoutNs, TimeUnit.NANOSECONDS);
// OPTION I: Attempt to provide FIFO between sync and async
// requests.
// updateLastTimeMs(startTimeMs);
} finally {
blockingGets.decrementAndGet();
}
Expand All @@ -494,18 +470,5 @@ public List<V> close() {
return list;
}

// OPTION I: Attempt to provide FIFO between sync and async requests.
/*-
private void updateLastTimeMs(long startTimeMs) {
long curLastTimeMs = lastTimeMs.get();
while(startTimeMs > curLastTimeMs) {
if(lastTimeMs.compareAndSet(curLastTimeMs, startTimeMs)) {
break;
}
curLastTimeMs = lastTimeMs.get();
}
}
// */

}
}
64 changes: 1 addition & 63 deletions src/java/voldemort/utils/pool/QueuedKeyedResourcePool.java
Expand Up @@ -159,8 +159,6 @@ private boolean processQueue(K key) {
try {
// Always attempt to grow to deal with destroyed resources.
attemptGrow(key, resourcePool);
// TODO: Concerned about mixing the bare poll() in attemptCheckout
// with queueing poll(timeout) in KeyedResourcePool...
resource = attemptCheckout(resourcePool);
} catch(Exception e) {
destroyResource(key, resourcePool, resource);
Expand All @@ -184,22 +182,9 @@ private boolean processQueue(K key) {
}

resourceRequest.useResource(resource);
// TODO: remove resourceRequest.getStartTimeNS()
return true;
}

private long peekNextStartTimeMs(K key) {
Queue<AsyncResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
if(requestQueue.isEmpty()) {
return -1;
}
AsyncResourceRequest<V> resourceRequest = requestQueue.peek();
if(resourceRequest == null) {
return -1;
}
return resourceRequest.getStartTimeMs();
}

/**
* Attempts to repeatedly process enqueued resource requests. Tries until no
* more progress is possible without blocking.
Expand All @@ -218,61 +203,14 @@ private void processQueueLoop(K key) {
*/
@Override
public void checkin(K key, V resource) throws Exception {
// OPTION I: Attempt to provide FIFO between sync and async requests.
/*-
long nextStartTime = peekNextStartTimeMs(key);
if(nextStartTime != -1) {
if(nextStartTime < getLastTimeMs(key)) {
if(isOpenAndValid(key, resource)) {
Queue<AsyncResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
AsyncResourceRequest<V> resourceRequest = getNextUnexpiredResourceRequest(requestQueue);
if(resourceRequest != null) {
resourceRequest.useResource(resource);
return;
}
} else {
resource = null; // twas destroyed
}
}
}
// */

// OPTION II: Strictly prefer async requests over sync requets.
/*-
if(isOpenAndValid(key, resource)) {
Queue<AsyncResourceRequest<V>> requestQueue = getRequestQueueForKey(key);
AsyncResourceRequest<V> resourceRequest = getNextUnexpiredResourceRequest(requestQueue);
if(resourceRequest != null) {
resourceRequest.useResource(resource);
return;
}
} else {
// Must null out resource since a side effect of a failed call to
// isOpenAndValid is to call KeyedResourcePool::destroyResource
// which can only safely be invoked once because its finally clause
// decrements KeyedResourcePool's size.
resource = null; // twas destroyed
}
*/

// For either Option I or II: only checkin if resource is not null, to
// avoid side-effect of invoking destroyResource multiple times on the
// same resource.
/*-
if(resource != null)
super.checkin(key, resource);
// */

// Option III:
super.checkin(key, resource);

// NB: Blocking checkout calls for synchronous requests get the resource
// checked in above before processQueueLoop() attempts checkout below.
// There is therefore a risk that asynchronous requests will be starved.
processQueueLoop(key);
}

/*
/**
* A safe wrapper to destroy the given resource request.
*/
protected void destroyRequest(AsyncResourceRequest<V> resourceRequest) {
Expand Down
Expand Up @@ -459,13 +459,5 @@ public void handleException(Exception e) {
public long getDeadlineNs() {
return deadlineNs;
}

public long getStartTimeNs() {
return 0;
}

public long getStartTimeMs() {
return 0;
}
}
}

0 comments on commit 233ceb1

Please sign in to comment.