Skip to content

Commit

Permalink
Adding effectiveAverageMessageRate
Browse files Browse the repository at this point in the history
  • Loading branch information
shanqing-cai committed Aug 14, 2015
1 parent 0da9ce0 commit 4c5ef9e
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 1 deletion.
10 changes: 10 additions & 0 deletions src/main/java/me/scai/utilities/WorkerPool.java
Expand Up @@ -48,6 +48,16 @@ public interface WorkerPool {
*/
public float getCurrentAverageMessageRate(String workerId);

/* Get the current average message rate, defined as n_M / (t - t0)
* where in n_M is the current total number of messages, t is the current time,
* and t0 is the time of worker registration
*
* @param worker ID
* @return current average message rate (s ^ -1)
* @throws IllegalArgumentException on invalid worker ID
*/
public float getEffectiveAverageMessageRate(String workerId);

/* Clear all workers in the pool */
public void clearWorkers();

Expand Down
13 changes: 13 additions & 0 deletions src/main/java/me/scai/utilities/WorkerPoolImpl.java
Expand Up @@ -253,4 +253,17 @@ public synchronized float getCurrentAverageMessageRate(String workerId) {
return ((float) workerMessageCounts.get(workerId)) / ((float) (t - t0)) / 1000.0f;
}


@Override
public synchronized float getEffectiveAverageMessageRate(String workerId) {
if (!workerMessageCounts.containsKey(workerId)) {
throw new IllegalArgumentException("Invalid worker ID: \"" + workerId + "\"");
}

final long t1 = getLastUseTimestamp(workerId).getTime();
final long t0 = workerCreatedTimestamps.get(workerId).getTime();

return ((float) workerMessageCounts.get(workerId)) / ((float) (t1 - t0)) / 1000.0f;
}

}
11 changes: 10 additions & 1 deletion src/test/java/me/scai/utilities/TestWorkerPool.java
Expand Up @@ -53,7 +53,7 @@ public void testInvalidWorkerId() {
/* Make sure that the right types of exceptions are thrown under invalid worker ID arguments */
invalidWorkerIdExceptionThrown = false;
try {
PoolWorker pw = wp.getWorker(fakeWorkerId):
PooledWorker pw = wp.getWorker(fakeWorkerId);
} catch (IllegalArgumentException exc) {
invalidWorkerIdExceptionThrown = true;
}
Expand Down Expand Up @@ -83,6 +83,14 @@ public void testInvalidWorkerId() {
}
assertTrue(invalidWorkerIdExceptionThrown);

invalidWorkerIdExceptionThrown = false;
try {
float emr = wp.getEffectiveAverageMessageRate(fakeWorkerId);
} catch (IllegalArgumentException exc) {
invalidWorkerIdExceptionThrown = true;
}
assertTrue(invalidWorkerIdExceptionThrown);


}

Expand Down Expand Up @@ -113,6 +121,7 @@ public void testWorkerPool() throws InterruptedException {
}

assertTrue(wp.getCurrentAverageMessageRate(wkrId) > 0.0f);
assertTrue(wp.getEffectiveAverageMessageRate(wkrId) > 0.0f);

/* Wait for a period of time that shouldn't lead to expiration */
Thread.sleep(workerTimeout / 2);
Expand Down

0 comments on commit 4c5ef9e

Please sign in to comment.