Skip to content

Commit

Permalink
Added Jmx interfaces for all queue stats we now track. Updated Client…
Browse files Browse the repository at this point in the history
…SocketStatsTest as well.

Added a big TODO expressing concern over how statistics are tracked with suggestions for improvements.
  • Loading branch information
jayjwylie committed Oct 9, 2012
1 parent 233ceb1 commit e1fc74c
Show file tree
Hide file tree
Showing 5 changed files with 438 additions and 115 deletions.
Expand Up @@ -167,6 +167,8 @@ public ClientRequestExecutor checkout(SocketDestination destination) {
long end = System.nanoTime();
if(stats != null) {
stats.recordCheckoutTimeUs(destination, (end - start) / Time.NS_PER_US);
stats.recordCheckoutQueueLength(destination,
queuedPool.getBlockingGetsCount(destination));
}
}
return clientRequestExecutor;
Expand All @@ -187,26 +189,6 @@ public void checkin(SocketDestination destination, ClientRequestExecutor clientR
}
}

// TODO: remove this helper method once JmxUtils are all updated to track
// queue stats of interest.
/**
* This method is useful for printing out QueuedKeyedResourcePool statistics
* if debugging issues with the underlying QueuedKeyedResourcePool and
* KeyedResourcePool.
*
* @param tag A tag to be printed out in debugger output.
* @param destination The socket destination to print pool statistics for.
*/
private void printPoolStats(String tag, SocketDestination destination) {
if(logger.isDebugEnabled()) {
logger.debug("CREP::" + tag + " : " + destination.toString() + " --- AQ QLen = "
+ queuedPool.getRegisteredResourceRequestCount(destination)
+ " --- SQ QLen = " + queuedPool.getBlockingGetsCount(destination)
+ ", ChkIn = " + queuedPool.getCheckedInResourcesCount(destination)
+ ", Tot = " + queuedPool.getTotalResourceCount(destination));
}
}

public void close(SocketDestination destination) {
factory.setLastClosedTimestamp(destination);
queuedPool.reset(destination);
Expand Down Expand Up @@ -278,7 +260,17 @@ public AsyncSocketDestinationRequest(SocketDestination destination,
this.startTimeNs = System.nanoTime();
}

protected void updateStats() {
if(stats != null) {
stats.recordResourceRequestTimeUs(destination, (System.nanoTime() - startTimeNs)
/ Time.NS_PER_US);
stats.recordResourceRequestQueueLength(destination,
queuedPool.getRegisteredResourceRequestCount(destination));
}
}

public void useResource(ClientRequestExecutor clientRequestExecutor) {
updateStats();
if(logger.isDebugEnabled()) {
logger.debug("Async request start; type: "
+ operationName
Expand Down Expand Up @@ -306,12 +298,14 @@ public void useResource(ClientRequestExecutor clientRequestExecutor) {
}

public void handleTimeout() {
// Do *not* invoke updateStats since handleException does so.
long durationNs = System.nanoTime() - startTimeNs;
handleException(new TimeoutException("Could not acquire resource in " + timeoutMs
+ " ms. (Took " + durationNs + " ns.)"));
}

public void handleException(Exception e) {
updateStats();
if(!(e instanceof UnreachableStoreException))
e = new UnreachableStoreException("Failure in " + operationName + ": "
+ e.getMessage(), e);
Expand Down
203 changes: 167 additions & 36 deletions src/java/voldemort/store/stats/ClientSocketStats.java
Expand Up @@ -32,20 +32,45 @@
*
*
*/
// TODO: This approach to stats tracking seems scary. All of the getter methods
// query current counters/histograms that are being updated. If you happen to
// use a getter soon after the monitoringInterval has rolled over, then your
// answer is likely statistically insignificant and potentially totally whacked
// out (not a technical term, sorry). Either of the following approaches seem
// like an improvement to me:
//
// (1) Effectively have two copies of all stats tracking "current" and "prev".
// All the getters would access "last". This means the responses are
// statistically meaningful, but potentially stale. reset() would copy
// "current" to "prev" and then resets "current".
//
// (2) A more general variant of (1) is to have n copies of all stats tracking.
// The getters would aggregated over all n copies of stats tracking. This
// provides a "sliding" window of statistically valid responses. reset() would
// create a new stats tracking object and delete the oldest stats trackig
// object.
public class ClientSocketStats {

private final ClientSocketStats parent;
private final ConcurrentMap<SocketDestination, ClientSocketStats> statsMap;
private final SocketDestination destination;
private QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool;

// monitoringInterval <= connectionCheckouts + resourceRequests
private final AtomicInteger monitoringInterval = new AtomicInteger(10000);
private final Histogram checkoutTimeUsHistogram = new Histogram(20000, 100);
private final AtomicLong totalCheckoutTimeUs = new AtomicLong(0);
private final AtomicLong avgCheckoutTimeUs = new AtomicLong(0);
// Connection lifecycle
private final AtomicInteger connectionsCreated = new AtomicInteger(0);
private final AtomicInteger connectionsDestroyed = new AtomicInteger(0);
private final AtomicInteger connectionsCheckedout = new AtomicInteger(0);
// "Sync checkouts" / KeyedResourcePool::checkout
private final Histogram checkoutTimeUsHistogram = new Histogram(20000, 100);
private final AtomicLong totalCheckoutTimeUs = new AtomicLong(0);
private final AtomicInteger checkoutCount = new AtomicInteger(0);
private final Histogram checkoutQueueLengthHistogram = new Histogram(250, 1);
// "Async checkouts" / QueuedKeyedResourcePool::registerResourceRequest
private final Histogram resourceRequestTimeUsHistogram = new Histogram(20000, 100);
private final AtomicLong totalResourceRequestTimeUs = new AtomicLong(0);
private final AtomicInteger resourceRequestCount = new AtomicInteger(0);
private final Histogram resourceRequestQueueLengthHistogram = new Histogram(250, 1);

private final int jmxId;

Expand Down Expand Up @@ -114,34 +139,69 @@ public void recordCheckoutTimeUs(SocketDestination dest, long checkoutTimeUs) {
recordCheckoutTimeUs(null, checkoutTimeUs);
} else {
this.totalCheckoutTimeUs.getAndAdd(checkoutTimeUs);
checkoutTimeUsHistogram.insert(checkoutTimeUs);
int checkouts = this.connectionsCheckedout.getAndIncrement();

// reset aggregated stats and all the node stats for new interval
if(parent == null && statsMap != null) {
int interval = this.monitoringInterval.get();
if(checkouts % interval == interval - 1) {
// reset all children
Iterator<SocketDestination> it = statsMap.keySet().iterator();
while(it.hasNext()) {
ClientSocketStats stats = statsMap.get(it.next());
stats.resetForInterval();
}
// reset itself
resetForInterval();
}
}
this.checkoutTimeUsHistogram.insert(checkoutTimeUs);
this.checkoutCount.getAndIncrement();

checkMonitoringInterval();
}
}

/**
* Calculate the average and reset the stats
* Record the checkout queue length
*
* @param dest Destination of the socket to checkout. Will actually record
* if null. Otherwise will call this on self and corresponding child
* with this param null.
* @param queueLength The number of entries in the "synchronous" checkout
* queue.
*/
public void resetForInterval() {
// harmless race condition:
this.totalCheckoutTimeUs.set(0);
this.connectionsCheckedout.set(0);
checkoutTimeUsHistogram.reset();
public void recordCheckoutQueueLength(SocketDestination dest, int queueLength) {
if(dest != null) {
getOrCreateNodeStats(dest).recordCheckoutQueueLength(null, queueLength);
recordCheckoutQueueLength(null, queueLength);
} else {
this.checkoutQueueLengthHistogram.insert(queueLength);
}
}

/**
* Record the resource request wait time in us
*
* @param dest Destination of the socket for which the resource was
* requested. Will actually record if null. Otherwise will call this
* on self and corresponding child with this param null.
* @param resourceRequestTimeUs The number of us to wait before getting a
* socket
*/
public void recordResourceRequestTimeUs(SocketDestination dest, long resourceRequestTimeUs) {
if(dest != null) {
getOrCreateNodeStats(dest).recordResourceRequestTimeUs(null, resourceRequestTimeUs);
recordResourceRequestTimeUs(null, resourceRequestTimeUs);
} else {
this.totalResourceRequestTimeUs.getAndAdd(resourceRequestTimeUs);
this.resourceRequestTimeUsHistogram.insert(resourceRequestTimeUs);
this.resourceRequestCount.getAndIncrement();

checkMonitoringInterval();
}
}

/**
* Record the resource request queue length
*
* @param dest Destination of the socket for which resource request is
* enqueued. Will actually record if null. Otherwise will call this
* on self and corresponding child with this param null.
* @param queueLength The number of entries in the "asynchronous" resource
* request queue.
*/
public void recordResourceRequestQueueLength(SocketDestination dest, int queueLength) {
if(dest != null) {
getOrCreateNodeStats(dest).recordResourceRequestQueueLength(null, queueLength);
recordResourceRequestQueueLength(null, queueLength);
} else {
this.resourceRequestQueueLengthHistogram.insert(queueLength);
}
}

public void connectionCreate(SocketDestination dest) {
Expand All @@ -162,7 +222,7 @@ public void connectionDestroy(SocketDestination dest) {
}
}

/* getters */
// Getters for connection life cycle stats

public int getConnectionsCreated() {
return connectionsCreated.intValue();
Expand All @@ -172,21 +232,55 @@ public int getConnectionsDestroyed() {
return connectionsDestroyed.intValue();
}

public int getConnectionsCheckedout() {
return connectionsCheckedout.intValue();
// Getters for checkout stats

public int getCheckoutCount() {
return checkoutCount.intValue();
}

public Histogram getWaitHistogram() {
public Histogram getCheckoutWaitUsHistogram() {
return this.checkoutTimeUsHistogram;
}

public long getAveWaitUs() {
long ns = totalCheckoutTimeUs.get();
int count = connectionsCheckedout.get();
this.avgCheckoutTimeUs.set(count > 0 ? (ns / count) : -1);
return this.avgCheckoutTimeUs.longValue();
/**
* @return -1 if there have been no checkout invocations
*/
public long getAvgCheckoutWaitUs() {
long count = checkoutCount.get();
if(count > 0)
return totalCheckoutTimeUs.get() / count;
return -1;
}

public Histogram getCheckoutQueueLengthHistogram() {
return this.checkoutQueueLengthHistogram;
}

// Getters for resourceRequest stats

public int resourceRequestCount() {
return resourceRequestCount.intValue();
}

public Histogram getResourceRequestWaitUsHistogram() {
return this.resourceRequestTimeUsHistogram;
}

/**
* @return -1 if there have been no resourceRequest invocations
*/
public long getAvgResourceRequestWaitUs() {
long count = resourceRequestCount.get();
if(count > 0)
return totalResourceRequestTimeUs.get() / count;
return -1;
}

public Histogram getResourceRequestQueueLengthHistogram() {
return this.resourceRequestQueueLengthHistogram;
}

// Getters for (queued)pool stats
public int getConnectionsActive(SocketDestination destination) {
if(destination == null) {
return pool.getTotalResourceCount();
Expand All @@ -203,6 +297,8 @@ public int getConnectionsInPool(SocketDestination destination) {
}
}

// Config & administrivia interfaces

public void setMonitoringInterval(int count) {
this.monitoringInterval.set(count);
}
Expand All @@ -211,6 +307,41 @@ public int getMonitoringInterval() {
return this.monitoringInterval.get();
}

protected void checkMonitoringInterval() {
int monitoringCount = this.checkoutCount.get() + this.resourceRequestCount.get();

// reset aggregated stats and all the node stats for new interval
if(parent == null && statsMap != null) {
int monitoringInterval = this.monitoringInterval.get();
if(monitoringCount % (monitoringInterval + 1) == monitoringInterval) {
// reset all children
Iterator<SocketDestination> it = statsMap.keySet().iterator();
while(it.hasNext()) {
ClientSocketStats stats = statsMap.get(it.next());
stats.resetForInterval();
}
// reset itself
resetForInterval();
}
}
}

/**
* Reset all of the stats counters
*/
protected void resetForInterval() {
// harmless race conditions amongst all of this counter resetting:
this.totalCheckoutTimeUs.set(0);
this.checkoutCount.set(0);
this.checkoutTimeUsHistogram.reset();
this.checkoutQueueLengthHistogram.reset();

this.totalResourceRequestTimeUs.set(0);
this.resourceRequestCount.set(0);
this.resourceRequestTimeUsHistogram.reset();
this.resourceRequestQueueLengthHistogram.reset();
}

public void setPool(QueuedKeyedResourcePool<SocketDestination, ClientRequestExecutor> pool) {
this.pool = pool;
}
Expand Down

0 comments on commit e1fc74c

Please sign in to comment.