Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

alias per-queue "total_items" to "put_items", and add "put_bytes", "g…

…et_items_hit", and "get_items_miss".
  • Loading branch information...
commit 92725400e481dc9a8f7f549792ff71a45c1cd194 1 parent 3060b0d
Robey Pointer authored
View
39 src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -21,6 +21,7 @@ import java.io._
import java.nio.{ByteBuffer, ByteOrder}
import java.nio.channels.FileChannel
import java.util.concurrent.{CountDownLatch, Executor, ScheduledExecutorService}
+import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import com.twitter.conversions.storage._
import com.twitter.conversions.time._
@@ -51,8 +52,24 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def statNamed(statName: String) = "q/" + name + "/" + statName
// # of items EVER added to the queue:
- val totalItems = Stats.getCounter(statNamed("total_items"))
- totalItems.reset()
+ val putItems = new AtomicLong(0)
+ Stats.removeCounter(statNamed("total_items"))
+ Stats.makeCounter(statNamed("total_items"), putItems)
+ Stats.removeCounter(statNamed("put_items"))
+ Stats.makeCounter(statNamed("put_items"), putItems)
+
+ // # of bytes EVER added to the queue:
+ val putBytes = new AtomicLong(0)
+ Stats.removeCounter(statNamed("put_bytes"))
+ Stats.makeCounter(statNamed("put_bytes"), putBytes)
+
+ // # of items EVER received as hit or miss:
+ val getItemsHit = new AtomicLong(0)
+ Stats.removeCounter(statNamed("get_items_hit"))
+ Stats.makeCounter(statNamed("get_items_hit"), getItemsHit)
+ val getItemsMiss = new AtomicLong(0)
+ Stats.removeCounter(statNamed("get_items_miss"))
+ Stats.makeCounter(statNamed("get_items_miss"), getItemsMiss)
// # of items that were expired by the time they were read:
val totalExpired = Stats.getCounter(statNamed("expired_items"))
@@ -110,7 +127,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
Array(
("items", length.toString),
("bytes", bytes.toString),
- ("total_items", totalItems().toString),
+ ("total_items", putItems.toString),
("logsize", journalSize.toString),
("expired_items", totalExpired().toString),
("mem_items", memoryLength.toString),
@@ -296,8 +313,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
future.setValue(None)
}
val w = waiters.add(deadline.get, onTrigger, onTimeout)
- // FIXME: use onCancellation when util-core is bumped.
- future.linkTo(new CancellableSink({ waiters.remove(w) }))
+ future.onCancellation { waiters.remove(w) }
false
} else {
true
@@ -316,13 +332,19 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
Stats.addMetric(statName, usec)
Stats.addMetric("q/" + name + "/" + statName, usec)
}
- promise
+ promise map { itemOption =>
+ if (itemOption.isDefined) getItemsHit.getAndIncrement() else getItemsMiss.getAndIncrement()
+ itemOption
+ }
}
final def waitPeek(deadline: Option[Time]): Future[Option[QItem]] = {
val promise = new Promise[Option[QItem]]()
waitOperation(peek(), Time.now, deadline, promise)
- promise
+ promise map { itemOption =>
+ if (itemOption.isDefined) getItemsHit.getAndIncrement() else getItemsMiss.getAndIncrement()
+ itemOption
+ }
}
/**
@@ -483,7 +505,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
queue += item
_memoryBytes += item.data.length
}
- totalItems.incr()
+ putItems.getAndIncrement()
+ putBytes.getAndAdd(item.data.length)
queueSize += item.data.length
queueLength += 1
}
View
4 src/main/scala/net/lag/kestrel/QueueCollection.scala
@@ -196,7 +196,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
queue(name) map { q => q.discardExpired(limit) } getOrElse(0)
}
}
-
+
def expireQueue(name: String): Unit = {
if (!shuttingDown) {
queues.get(name) map { q =>
@@ -213,7 +213,7 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
def flushAllExpired(limit: Boolean = false): Int = {
queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName, limit) }
}
-
+
def deleteExpiredQueues(): Unit = {
queueNames.map { qName => expireQueue(qName) }
}
View
34 src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -47,14 +47,15 @@ class PersistentQueueSpec extends Specification
q.setup
q.length mustEqual 0
- q.totalItems() mustEqual 0
+ q.putItems.get mustEqual 0L
q.bytes mustEqual 0
q.journalSize mustEqual 0
q.add("hello kitty".getBytes)
q.length mustEqual 1
- q.totalItems() mustEqual 1
+ q.putItems.get mustEqual 1L
+ q.putBytes.get mustEqual 11L
q.bytes mustEqual 11
q.journalSize mustEqual 32
new File(folderName, "work").length mustEqual 32
@@ -62,7 +63,8 @@ class PersistentQueueSpec extends Specification
new String(q.remove.get.data) mustEqual "hello kitty"
q.length mustEqual 0
- q.totalItems() mustEqual 1
+ q.putItems.get mustEqual 1L
+ q.putBytes.get mustEqual 11L
q.bytes mustEqual 0
q.journalSize mustEqual 33
@@ -92,7 +94,7 @@ class PersistentQueueSpec extends Specification
q.setup()
q.length mustEqual 0
- q.totalItems() mustEqual 0
+ q.putItems.get mustEqual 0L
q.bytes mustEqual 0
q.journalSize mustEqual 0
q.totalFlushes() mustEqual 0
@@ -104,7 +106,7 @@ class PersistentQueueSpec extends Specification
q.flush()
q.length mustEqual 0
-
+
q.totalFlushes() mustEqual 1
// journal should contain exactly: one unfinished transaction, 2 items.
@@ -125,20 +127,20 @@ class PersistentQueueSpec extends Specification
q.add(new Array[Byte](32))
q.add(new Array[Byte](64))
q.length mustEqual 2
- q.totalItems() mustEqual 2
+ q.putItems.get mustEqual 2L
q.bytes mustEqual 32 + 64
(q.journalTotalSize > 96) mustBe true
q.remove()
q.length mustEqual 1
- q.totalItems() mustEqual 2
+ q.putItems.get mustEqual 2L
q.bytes mustEqual 64
(q.journalTotalSize > 96) mustBe true
// now it should rotate:
q.remove()
q.length mustEqual 0
- q.totalItems() mustEqual 2
+ q.putItems.get mustEqual 2L
q.bytes mustEqual 0
(q.journalTotalSize < 10) mustBe true
}
@@ -155,13 +157,13 @@ class PersistentQueueSpec extends Specification
q.add(new Array[Byte](32))
q.add(new Array[Byte](64))
q.length mustEqual 2
- q.totalItems() mustEqual 2
+ q.putItems.get mustEqual 2
q.bytes mustEqual 32 + 64
(q.journalSize > 96) mustBe true
q.remove()
q.length mustEqual 1
- q.totalItems() mustEqual 2
+ q.putItems.get mustEqual 2
q.bytes mustEqual 64
(q.journalSize > 96) mustBe true
@@ -169,7 +171,7 @@ class PersistentQueueSpec extends Specification
q.remove(true)
q.length mustEqual 0
q.openTransactionCount mustEqual 1
- q.totalItems() mustEqual 2
+ q.putItems.get mustEqual 2
q.bytes mustEqual 0
(q.journalSize < 96) mustBe true
}
@@ -617,7 +619,7 @@ class PersistentQueueSpec extends Specification
"expire queue" in {
withTempFolder {
- Time.withCurrentTimeFrozen { time =>
+ Time.withCurrentTimeFrozen { time =>
val config = new QueueBuilder {
keepJournal = false
maxQueueAge = 90.seconds
@@ -629,18 +631,18 @@ class PersistentQueueSpec extends Specification
q.isReadyForExpiration mustEqual false
q.add("method man".getBytes, None) mustEqual true
-
+
time.advance(30.seconds)
// We aren't ready to expire yet, as it's not been long enough
q.isReadyForExpiration mustEqual false
-
+
time.advance(61.seconds)
-
+
// Still not ready, as we have items in the queue!
q.isReadyForExpiration mustEqual false
q.remove must beSomeQItem("method man") // queue is now empty
-
+
// This should be true now because the queue is 91 seconds old and
// has no items
q.isReadyForExpiration mustEqual true
Please sign in to comment.
Something went wrong with that request. Please try again.