Permalink
Browse files

Merge branch 'release_2_2'

  • Loading branch information...
2 parents 7e00fd6 + f967a41 commit ff3bfddec1f4a4197cebf35dc3bf619073c354b7 Stephan Zuercher committed May 11, 2012
@@ -152,18 +152,26 @@ abstract class KestrelHandler(
// will do a continuous fetch on a queue until time runs out or read buffer is full.
final def monitorUntil(key: String, timeLimit: Option[Time], maxItems: Int, opening: Boolean)(f: (Option[QItem], Option[Long]) => Unit) {
log.debug("monitor -> q=%s t=%s max=%d open=%s", key, timeLimit, maxItems, opening)
- if (maxItems == 0 || (timeLimit.isDefined && timeLimit.get <= Time.now) || countPendingReads(key) >= maxOpenReads) {
- f(None, None)
- } else {
- queues.remove(key, timeLimit, opening, false).onSuccess {
- case None =>
- f(None, None)
- case x @ Some(item) =>
- val xidContext = if (opening) addPendingRead(key, item.xid) else None
- f(x, xidContext)
- monitorUntil(key, timeLimit, maxItems - 1, opening)(f)
+ Stats.incr("cmd_monitor")
+
+ def monitorLoop(maxItems: Int) {
+ log.debug("monitor loop -> q=%s t=%s max=%d open=%s", key, timeLimit, maxItems, opening)
+ if (maxItems == 0 || (timeLimit.isDefined && timeLimit.get <= Time.now) || countPendingReads(key) >= maxOpenReads) {
+ f(None, None)
+ } else {
+ Stats.incr("cmd_monitor_get")
+ queues.remove(key, timeLimit, opening, false).onSuccess {
+ case None =>
+ f(None, None)
+ case x @ Some(item) =>
+ val xidContext = if (opening) addPendingRead(key, item.xid) else None
+ f(x, xidContext)
+ monitorLoop(maxItems - 1)
+ }
}
}
+
+ monitorLoop(maxItems)
}
def getItem(key: String, timeout: Option[Time], opening: Boolean, peeking: Boolean): Future[Option[QItem]] = {
@@ -99,6 +99,8 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
Stats.getCounter("cmd_get")() mustEqual 0
Stats.getCounter("cmd_set")() mustEqual 0
+ Stats.getCounter("cmd_monitor")() mustEqual 0
+ Stats.getCounter("cmd_monitor_get")() mustEqual 0
Stats.getCounter("get_hits")() mustEqual 0
Stats.getCounter("get_misses")() mustEqual 0
@@ -117,6 +119,64 @@ class KestrelHandlerSpec extends Specification with TempFolder with TestLogging
Stats.getCounter("cmd_get")() mustEqual 2
Stats.getCounter("get_hits")() mustEqual 1
Stats.getCounter("get_misses")() mustEqual 1
+ Stats.getCounter("cmd_monitor")() mustEqual 0
+ Stats.getCounter("cmd_monitor_get")() mustEqual 0
+ }
+ }
+
+ "track monitor stats" in {
+ withTempFolder {
+ Stats.clearAll()
+ queues = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ val handler = new FakeKestrelHandler(queues, 10)
+
+ handler.setItem("test", 0, None, "one".getBytes)
+ handler.setItem("test", 0, None, "two".getBytes)
+ handler.setItem("test", 0, None, "three".getBytes)
+ Stats.getCounter("cmd_set")() mustEqual 3
+ Stats.getCounter("cmd_get")() mustEqual 0
+ Stats.getCounter("cmd_monitor")() mustEqual 0
+ Stats.getCounter("cmd_monitor_get")() mustEqual 0
+
+ val items = new mutable.ListBuffer[Option[QItem]]()
+ def addItem(item: Option[QItem], xid: Option[Long]) { items.append(item) }
+
+ handler.monitorUntil("test", Some(1.hour.fromNow), 2, false)(addItem)
+ items.size mustEqual 3
+ items(0) must beString("one")
+ items(1) must beString("two")
+ items(2) mustEqual None
+ Stats.getCounter("cmd_set")() mustEqual 3
+ Stats.getCounter("cmd_get")() mustEqual 0
+ Stats.getCounter("cmd_monitor")() mustEqual 1
+ Stats.getCounter("cmd_monitor_get")() mustEqual 2
+ Stats.getCounter("get_hits")() mustEqual 2
+ Stats.getCounter("get_misses")() mustEqual 0
+
+ items.clear()
+ handler.monitorUntil("test", Some(1.second.fromNow), 2, false)(addItem)
+ timer.timeout()
+ items.size mustEqual 2
+ items(0) must beString("three")
+ items(1) mustEqual None
+ Stats.getCounter("cmd_set")() mustEqual 3
+ Stats.getCounter("cmd_get")() mustEqual 0
+ Stats.getCounter("cmd_monitor")() mustEqual 2
+ Stats.getCounter("cmd_monitor_get")() mustEqual 4
+ Stats.getCounter("get_hits")() mustEqual 3
+ Stats.getCounter("get_misses")() mustEqual 1
+
+ items.clear()
+ handler.monitorUntil("test", Some(1.second.fromNow), 2, false)(addItem)
+ timer.timeout()
+ items.size mustEqual 1
+ items(0) mustEqual None
+ Stats.getCounter("cmd_set")() mustEqual 3
+ Stats.getCounter("cmd_get")() mustEqual 0
+ Stats.getCounter("cmd_monitor")() mustEqual 3
+ Stats.getCounter("cmd_monitor_get")() mustEqual 5
+ Stats.getCounter("get_hits")() mustEqual 3
+ Stats.getCounter("get_misses")() mustEqual 2
}
}

0 comments on commit ff3bfdd

Please sign in to comment.