Permalink
Browse files

Merge branch 'master' of github.com:robey/kestrel

  • Loading branch information...
2 parents ec9b68d + 97ccc0c commit 5e3ff4887c0d7e7af0e9350a67b051018f565ca9 Robey Pointer committed Feb 14, 2012
View
@@ -355,6 +355,8 @@ Global stats reported by kestrel are:
- `get_misses` - total `GET` requests on an empty queue
- `bytes_read` - total bytes read from clients
- `bytes_written` - total bytes written to clients
+- `queue_creates` - total number of queues created
+- `queue_deletes` - total number of queues deleted
For each queue, the following stats are also reported:
@@ -376,6 +378,7 @@ For each queue, the following stats are also reported:
- `waiters` - number of clients waiting for an item from this queue (using
`GET/t`)
- `open_transactions` - items read with `/open` but not yet confirmed
+- `total_flushes` total number of times this queue has been flushed
Kestrel as a library
@@ -199,6 +199,8 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
report += (("get_misses", Stats.getCounter("get_misses")().toString))
report += (("bytes_read", Stats.getCounter("bytes_read")().toString))
report += (("bytes_written", Stats.getCounter("bytes_written")().toString))
+ report += (("queue_creates", Stats.getCounter("queue_creates")().toString))
+ report += (("queue_deletes", Stats.getCounter("queue_deletes")().toString))
for (qName <- queues.queueNames) {
report ++= queues.stats(qName).map { case (k, v) => ("queue_" + qName + "_" + k, v) }
@@ -59,6 +59,10 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
val totalDiscarded = Stats.getCounter(statNamed("discarded"))
totalDiscarded.reset()
+ // # of times this queue has been flushed:
+ val totalFlushes = Stats.getCounter(statNamed("total_flushes"))
+ totalFlushes.reset()
+
// # of items in the queue (including those not in memory)
private var queueLength: Long = 0
@@ -110,7 +114,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
("age", currentAge.inMilliseconds.toString),
("discarded", totalDiscarded().toString),
("waiters", waiterCount.toString),
- ("open_transactions", openTransactionCount.toString)
+ ("open_transactions", openTransactionCount.toString),
+ ("total_flushes", totalFlushes().toString)
)
}
@@ -327,6 +332,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def flush() {
while (remove(false).isDefined) { }
+ totalFlushes.incr()
}
/**
@@ -371,6 +377,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
Stats.removeCounter(statNamed("total_items"))
Stats.removeCounter(statNamed("expired_items"))
Stats.removeCounter(statNamed("discarded"))
+ Stats.removeCounter(statNamed("total_flushes"))
Stats.clearGauge(statNamed("items"))
Stats.clearGauge(statNamed("bytes"))
Stats.clearGauge(statNamed("journal_size"))
@@ -54,6 +54,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
}
val config = queueConfigMap.getOrElse(name, defaultQueueConfig)
log.info("Setting up queue %s: %s", realName, config)
+ Stats.incr("queue_creates")
new PersistentQueue(realName, path, config, timer, journalSyncScheduler, Some(this.apply))
}
@@ -178,6 +179,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
q.destroyJournal()
q.removeStats()
queues.remove(name)
+ Stats.incr("queue_deletes")
}
if (name contains '+') {
val master = name.split('+')(0)
@@ -95,6 +95,7 @@ class PersistentQueueSpec extends Specification
q.totalItems() mustEqual 0
q.bytes mustEqual 0
q.journalSize mustEqual 0
+ q.totalFlushes() mustEqual 0
q.add("alpha".getBytes)
q.add("beta".getBytes)
@@ -103,6 +104,8 @@ class PersistentQueueSpec extends Specification
q.flush()
q.length mustEqual 0
+
+ q.totalFlushes() mustEqual 1
// journal should contain exactly: one unfinished transaction, 2 items.
q.close
@@ -47,6 +47,8 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
Stats.clearAll()
qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
qc.queueNames mustEqual Nil
+ Stats.getCounter("queue_creates")() mustEqual 0
+ Stats.getCounter("queue_deletes")() mustEqual 0
qc.add("work1", "stuff".getBytes)
qc.add("work2", "other stuff".getBytes)
@@ -55,6 +57,8 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
qc.currentBytes mustEqual 16
qc.currentItems mustEqual 2
Stats.getCounter("total_items")() mustEqual 2
+ Stats.getCounter("queue_creates")() mustEqual 2
+ Stats.getCounter("queue_deletes")() mustEqual 0
qc.remove("work1")() must beSomeQItem("stuff")
qc.remove("work1")() mustEqual None
@@ -164,8 +168,11 @@ class QueueCollectionSpec extends Specification with TempFolder with TestLogging
new File(folderName + "/apples").createNewFile()
new File(folderName + "/oranges").createNewFile()
qc = new QueueCollection(folderName, timer, scheduler, config, Nil)
+ Stats.getCounter("queue_deletes")() mustEqual 0
qc.loadQueues()
qc.delete("oranges")
+
+ Stats.getCounter("queue_deletes")() mustEqual 1
new File(folderName).list().toList.sorted mustEqual List("apples")
qc.queueNames.sorted mustEqual List("apples")

0 comments on commit 5e3ff48

Please sign in to comment.