Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
track number of transactional get requests and cancellations per queue
Browse files Browse the repository at this point in the history
RB_ID=81821
  • Loading branch information
Stephan Zuercher committed Aug 22, 2012
1 parent ba5b029 commit 9afe5c4
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 1 deletion.
4 changes: 4 additions & 0 deletions docs/guide.md
Expand Up @@ -421,6 +421,10 @@ For each queue, the following stats are also reported:
- `waiters` - number of clients waiting for an item from this queue (using
`GET/t`)
- `open_transactions` - items read with `/open` but not yet confirmed
- `transactions` - number of transactional get requests (irrespective of whether an
item was read or not)
- `canceled_transactions` - number of transactional get requests canceled (for any
reason)
- `total_flushes` - total number of times this queue has been flushed
- `age_msec` - age of the last item read from the queue
- `create_time` - the time that the queue was created (in milliseconds since epoch)
Expand Down
16 changes: 15 additions & 1 deletion src/main/scala/net/lag/kestrel/PersistentQueue.scala
Expand Up @@ -71,6 +71,12 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
Stats.removeCounter(statNamed("get_items_miss"))
Stats.makeCounter(statNamed("get_items_miss"), getItemsMiss)

// # of transactions attempted/canceled
val totalTransactions = Stats.getCounter(statNamed("transactions"))
totalTransactions.reset()
val totalCanceledTransactions = Stats.getCounter(statNamed("canceled_transactions"))
totalCanceledTransactions.reset()

// # of items that were expired by the time they were read:
val totalExpired = Stats.getCounter(statNamed("expired_items"))
totalExpired.reset()
Expand Down Expand Up @@ -136,6 +142,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
("discarded", totalDiscarded().toString),
("waiters", waiterCount.toString),
("open_transactions", openTransactionCount.toString),
("transactions", totalTransactions().toString),
("canceled_transactions", totalCanceledTransactions().toString),
("total_flushes", totalFlushes().toString)
)
}
Expand Down Expand Up @@ -285,6 +293,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
if (closed || paused || queueLength == 0) {
None
} else {
if (transaction) totalTransactions.incr()
val item = _remove(transaction, None)
if (config.keepJournal && item.isDefined) {
if (transaction) journal.removeTentative(item.get.xid) else journal.remove()
Expand Down Expand Up @@ -375,7 +384,10 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
synchronized {
if (!closed) {
if (config.keepJournal) journal.unremove(xid)
_unremove(xid)
_unremove(xid) match {
case Some(_) => totalCanceledTransactions.incr()
case None => ()
}
waiters.trigger()
}
}
Expand Down Expand Up @@ -440,6 +452,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
Stats.removeCounter(statNamed("put_bytes"))
Stats.removeCounter(statNamed("put_items"))
Stats.removeCounter(statNamed("expired_items"))
Stats.removeCounter(statNamed("transactions"))
Stats.removeCounter(statNamed("canceled_transactions"))
Stats.removeCounter(statNamed("discarded"))
Stats.removeCounter(statNamed("total_flushes"))
Stats.clearGauge(statNamed("items"))
Expand Down

0 comments on commit 9afe5c4

Please sign in to comment.