Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
move get_{hit,miss}_latency_usec into PersisteneQueue; introduce deli…
Browse files Browse the repository at this point in the history
…very_latency_msec to queue latency
  • Loading branch information
Stephan Zuercher committed Nov 23, 2011
1 parent 0f6bc7f commit 07b3dee
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 24 deletions.
15 changes: 0 additions & 15 deletions src/main/scala/net/lag/kestrel/KestrelHandler.scala
Expand Up @@ -194,21 +194,6 @@ abstract class KestrelHandler(val queues: QueueCollection, val maxOpenTransactio
waitingFor = Some(future)
future.map { itemOption =>
waitingFor = None
timeout match {
case None => {
val usec = (Time.now - startTime).inMicroseconds.toInt
val statName = if (itemOption.isDefined) "get_hit_latency_usec" else "get_miss_latency_usec"
Stats.addMetric(statName, usec)
Stats.addMetric("q/" + key + "/" + statName, usec)
}
case Some(_) => {
if (!itemOption.isDefined) {
val msec = (Time.now - startTime).inMilliseconds.toInt
Stats.addMetric("get_timeout_msec", msec)
Stats.addMetric("q/" + key + "/get_timeout_msec", msec)
}
}
}
itemOption.foreach { item =>
log.debug("get <- %s", item)
if (opening) pendingTransactions.add(key, item.xid)
Expand Down
35 changes: 26 additions & 9 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Expand Up @@ -240,6 +240,13 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
if (transaction) journal.removeTentative(item.get.xid) else journal.remove()
checkRotateJournal()
}

item.foreach { qItem =>
val usec = (Time.now - qItem.addTime).inMilliseconds.toInt max 0
Stats.addMetric("delivery_latency_msec", usec)
Stats.addMetric("q/" + name + "/delivery_latency_msec", usec)
}

item
}
}
Expand All @@ -250,12 +257,20 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
*/
def remove(): Option[QItem] = remove(false)

private def waitOperation(op: => Option[QItem], deadline: Option[Time], future: Promise[Option[QItem]]) {
private def waitOperation(op: => Option[QItem], startTime: Time, deadline: Option[Time],
future: Promise[Option[QItem]]) {
val item = op
if (synchronized {
if (!item.isDefined && !closed && !paused && deadline.isDefined && deadline.get > Time.now) {
// if we get woken up, try again with the same deadline.
val w = waiters.add(deadline.get, { () => waitOperation(op, deadline, future) }, { () => future.setValue(None) })
def onTrigger() = waitOperation(op, startTime, deadline, future)
def onTimeout() {
val msec = (Time.now - startTime).inMilliseconds.toInt
Stats.addMetric("get_timeout_msec", msec)
Stats.addMetric("q/" + name + "/get_timeout_msec", msec)
future.setValue(None)
}
val w = waiters.add(deadline.get, onTrigger, onTimeout)
// FIXME: use onCancellation when util-core is bumped.
future.linkTo(new CancellableSink({ waiters.remove(w) }))
false
Expand All @@ -266,20 +281,22 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}

final def waitRemove(deadline: Option[Time], transaction: Boolean): Future[Option[QItem]] = {
val startTime = Time.now
val promise = new Promise[Option[QItem]]()
waitOperation(remove(transaction), deadline, promise)
// if an item was handed off immediately, track latency from the "put" to "get".
if (promise.isDefined && promise().isDefined) {
val usec = (Time.now - promise().get.addTime).inMicroseconds.toInt max 0
Stats.addMetric("get_hit_latency_usec", usec)
Stats.addMetric("q/" + name + "/get_hit_latency_usec", usec)
waitOperation(remove(transaction), startTime, deadline, promise)
// if an item was handed off immediately, track latency of the "get" operation
if (promise.isDefined) {
val statName = if (promise().isDefined) "get_hit_latency_usec" else "get_miss_latency_usec"
val usec = (Time.now - startTime).inMicroseconds.toInt max 0
Stats.addMetric(statName, usec)
Stats.addMetric("q/" + name + "/" + statName, usec)
}
promise
}

final def waitPeek(deadline: Option[Time]): Future[Option[QItem]] = {
val promise = new Promise[Option[QItem]]()
waitOperation(peek(), deadline, promise)
waitOperation(peek(), Time.now, deadline, promise)
promise
}

Expand Down

0 comments on commit 07b3dee

Please sign in to comment.