Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

track queue item add time and compute duration on request

RB_ID=82735
  • Loading branch information...
commit 8c95afc03f748aae948f8347723d04ab6087d84b 1 parent e291cd2
@zuercher zuercher authored
View
10 src/main/scala/net/lag/kestrel/PersistentQueue.scala
@@ -43,8 +43,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
// current size of all data in the queue:
private var queueSize: Long = 0
- // age of the last item read from the queue:
- private var _currentAge: Duration = 0.milliseconds
+ // timestamp of the last item read from the queue:
+ private var _currentAge: Time = Time.epoch
// time the queue was created
private var _createTime = Time.now
@@ -115,7 +115,9 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def maxMemoryBytes: Long = synchronized { config.maxMemorySize.inBytes }
def journalSize: Long = synchronized { journal.size }
def journalTotalSize: Long = journal.archivedSize + journalSize
- def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge }
+ def currentAge: Duration = synchronized {
+ if (queueSize == 0) 0.milliseconds else Time.now - _currentAge
+ }
def waiterCount: Long = synchronized { waiters.size }
def isClosed: Boolean = synchronized { closed || paused }
def createTime: Long = synchronized { _createTime.inSeconds }
@@ -565,7 +567,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
_memoryBytes -= len
queueLength -= 1
fillReadBehind()
- _currentAge = now - item.addTime
+ _currentAge = item.addTime
if (transaction) {
item.xid = xid.getOrElse { nextXid() }
openTransactions(item.xid) = item
View
18 src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
@@ -525,6 +525,24 @@ class PersistentQueueSpec extends Specification
}
}
+ "report the age of the queue in the absence of gets" in {
+ withTempFolder {
+ Time.withCurrentTimeFrozen { time =>
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
+ q.setup
+ put(q, 128, 0)
+ put(q, 128, 0)
+ q.remove()
+
+ time.advance(10.milliseconds)
+ q.currentAge mustEqual 10.milliseconds
+
+ time.advance(10.milliseconds)
+ q.currentAge mustEqual 20.milliseconds
+ }
+ }
+ }
+
"remove all stats" in {
def stats(queueName: String): List[String] = {
val prefix = "q/" + queueName + "/"
Please sign in to comment.
Something went wrong with that request. Please try again.