Permalink
Browse files

track queue item add time and compute duration on request

RB_ID=82735
  • Loading branch information...
1 parent e291cd2 commit 8c95afc03f748aae948f8347723d04ab6087d84b Stephan Zuercher committed Aug 24, 2012
@@ -43,8 +43,8 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
// current size of all data in the queue:
private var queueSize: Long = 0
- // age of the last item read from the queue:
- private var _currentAge: Duration = 0.milliseconds
+ // timestamp of the last item read from the queue:
+ private var _currentAge: Time = Time.epoch
// time the queue was created
private var _createTime = Time.now
@@ -115,7 +115,9 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
def maxMemoryBytes: Long = synchronized { config.maxMemorySize.inBytes }
def journalSize: Long = synchronized { journal.size }
def journalTotalSize: Long = journal.archivedSize + journalSize
- def currentAge: Duration = synchronized { if (queueSize == 0) 0.milliseconds else _currentAge }
+ def currentAge: Duration = synchronized {
+ if (queueSize == 0) 0.milliseconds else Time.now - _currentAge
+ }
def waiterCount: Long = synchronized { waiters.size }
def isClosed: Boolean = synchronized { closed || paused }
def createTime: Long = synchronized { _createTime.inSeconds }
@@ -565,7 +567,7 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
_memoryBytes -= len
queueLength -= 1
fillReadBehind()
- _currentAge = now - item.addTime
+ _currentAge = item.addTime
if (transaction) {
item.xid = xid.getOrElse { nextXid() }
openTransactions(item.xid) = item
@@ -525,6 +525,24 @@ class PersistentQueueSpec extends Specification
}
}
+ "report the age of the queue in the absence of gets" in {
+ withTempFolder {
+ Time.withCurrentTimeFrozen { time =>
+ val q = new PersistentQueue("things", folderName, new QueueBuilder().apply(), timer, scheduler)
+ q.setup
+ put(q, 128, 0)
+ put(q, 128, 0)
+ q.remove()
+
+ time.advance(10.milliseconds)
+ q.currentAge mustEqual 10.milliseconds
+
+ time.advance(10.milliseconds)
+ q.currentAge mustEqual 20.milliseconds
+ }
+ }
+ }
+
"remove all stats" in {
def stats(queueName: String): List[String] = {
val prefix = "q/" + queueName + "/"

0 comments on commit 8c95afc

Please sign in to comment.