From 07b3deebc8580090b15747e244c5f63b47da4149 Mon Sep 17 00:00:00 2001 From: Stephan Zuercher Date: Wed, 23 Nov 2011 10:26:31 -0800 Subject: [PATCH] move get_{hit,miss}_latency_usec into PersisteneQueue; introduce delivery_latency_msec to queue latency --- .../net/lag/kestrel/KestrelHandler.scala | 15 -------- .../net/lag/kestrel/PersistentQueue.scala | 35 ++++++++++++++----- 2 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/main/scala/net/lag/kestrel/KestrelHandler.scala b/src/main/scala/net/lag/kestrel/KestrelHandler.scala index 294a5864..80352e34 100644 --- a/src/main/scala/net/lag/kestrel/KestrelHandler.scala +++ b/src/main/scala/net/lag/kestrel/KestrelHandler.scala @@ -194,21 +194,6 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio waitingFor = Some(future) future.map { itemOption => waitingFor = None - timeout match { - case None => { - val usec = (Time.now - startTime).inMicroseconds.toInt - val statName = if (itemOption.isDefined) "get_hit_latency_usec" else "get_miss_latency_usec" - Stats.addMetric(statName, usec) - Stats.addMetric("q/" + key + "/" + statName, usec) - } - case Some(_) => { - if (!itemOption.isDefined) { - val msec = (Time.now - startTime).inMilliseconds.toInt - Stats.addMetric("get_timeout_msec", msec) - Stats.addMetric("q/" + key + "/get_timeout_msec", msec) - } - } - } itemOption.foreach { item => log.debug("get <- %s", item) if (opening) pendingTransactions.add(key, item.xid) diff --git a/src/main/scala/net/lag/kestrel/PersistentQueue.scala b/src/main/scala/net/lag/kestrel/PersistentQueue.scala index f359573a..7075c0ac 100644 --- a/src/main/scala/net/lag/kestrel/PersistentQueue.scala +++ b/src/main/scala/net/lag/kestrel/PersistentQueue.scala @@ -240,6 +240,13 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c if (transaction) journal.removeTentative(item.get.xid) else journal.remove() checkRotateJournal() } + + item.foreach { qItem => + val usec = (Time.now - qItem.addTime).inMilliseconds.toInt max 0 + Stats.addMetric("delivery_latency_msec", usec) + Stats.addMetric("q/" + name + "/delivery_latency_msec", usec) + } + item } } @@ -250,12 +257,20 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c */ def remove(): Option[QItem] = remove(false) - private def waitOperation(op: => Option[QItem], deadline: Option[Time], future: Promise[Option[QItem]]) { + private def waitOperation(op: => Option[QItem], startTime: Time, deadline: Option[Time], + future: Promise[Option[QItem]]) { val item = op if (synchronized { if (!item.isDefined && !closed && !paused && deadline.isDefined && deadline.get > Time.now) { // if we get woken up, try again with the same deadline. - val w = waiters.add(deadline.get, { () => waitOperation(op, deadline, future) }, { () => future.setValue(None) }) + def onTrigger() = waitOperation(op, startTime, deadline, future) + def onTimeout() { + val msec = (Time.now - startTime).inMilliseconds.toInt + Stats.addMetric("get_timeout_msec", msec) + Stats.addMetric("q/" + name + "/get_timeout_msec", msec) + future.setValue(None) + } + val w = waiters.add(deadline.get, onTrigger, onTimeout) // FIXME: use onCancellation when util-core is bumped. future.linkTo(new CancellableSink({ waiters.remove(w) })) false @@ -266,20 +281,22 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c } final def waitRemove(deadline: Option[Time], transaction: Boolean): Future[Option[QItem]] = { + val startTime = Time.now val promise = new Promise[Option[QItem]]() - waitOperation(remove(transaction), deadline, promise) - // if an item was handed off immediately, track latency from the "put" to "get". - if (promise.isDefined && promise().isDefined) { - val usec = (Time.now - promise().get.addTime).inMicroseconds.toInt max 0 - Stats.addMetric("get_hit_latency_usec", usec) - Stats.addMetric("q/" + name + "/get_hit_latency_usec", usec) + waitOperation(remove(transaction), startTime, deadline, promise) + // if an item was handed off immediately, track latency of the "get" operation + if (promise.isDefined) { + val statName = if (promise().isDefined) "get_hit_latency_usec" else "get_miss_latency_usec" + val usec = (Time.now - startTime).inMicroseconds.toInt max 0 + Stats.addMetric(statName, usec) + Stats.addMetric("q/" + name + "/" + statName, usec) } promise } final def waitPeek(deadline: Option[Time]): Future[Option[QItem]] = { val promise = new Promise[Option[QItem]]() - waitOperation(peek(), deadline, promise) + waitOperation(peek(), Time.now, deadline, promise) promise }