Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Expiring queues.

  • Loading branch information...
commit 7841145f7415a9465c6f1d167736c819cf47ddc8 1 parent 97ccc0c
Cory G Watson gphat authored
15 docs/guide.md
View
@@ -148,6 +148,18 @@ expired at the same time, `maxExpireSweep` limits the number of items that
will be removed by the background thread in a single round. This is primarily
useful as a throttling mechanism when using a queue as a way to delay work.
+Queue expiration
+----------------
+
+Queues can be configure to expire whole queues as well. When a queue is
+created the time is noted. It is periodically (see the aforementioned
+`expirationTimerFrequency`) checked against the current time and the
+`maxQueueAge` configuration option. If the queue is *empty* and the current time
+is greater than create time + `maxQueueAge` then the queue is ready to be
+expired and will be deleted.
+
+A `maxQueueAge` of zero, which is usually the default, means a queue never
+expires.
Fanout Queues
-------------
@@ -356,7 +368,8 @@ Global stats reported by kestrel are:
- `bytes_read` - total bytes read from clients
- `bytes_written` - total bytes written to clients
- `queue_creates` - total number of queues created
-- `queue_deletes` - total number of queues deleted
+- `queue_deletes` - total number of queues deleted (includes expires)
+- `queue_expires` - total number of queues expires
For each queue, the following stats are also reported:
1  src/main/scala/net/lag/kestrel/Kestrel.scala
View
@@ -154,6 +154,7 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
if (expired > 0) {
log.info("Expired %d item(s) from queues automatically.", expired)
}
+ Kestrel.this.queueCollection.deleteExpiredQueues()
}
}.start()
}
1  src/main/scala/net/lag/kestrel/MemcacheHandler.scala
View
@@ -201,6 +201,7 @@ extends NettyHandler[MemcacheRequest](channelGroup, queueCollection, maxOpenTran
report += (("bytes_written", Stats.getCounter("bytes_written")().toString))
report += (("queue_creates", Stats.getCounter("queue_creates")().toString))
report += (("queue_deletes", Stats.getCounter("queue_deletes")().toString))
+ report += (("queue_expires", Stats.getCounter("queue_expires")().toString))
for (qName <- queues.queueNames) {
report ++= queues.stats(qName).map { case (k, v) => ("queue_" + qName + "_" + k, v) }
20 src/main/scala/net/lag/kestrel/PersistentQueue.scala
View
@@ -44,6 +44,9 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
// age of the last item read from the queue:
private var _currentAge: Duration = 0.milliseconds
+
+ // time the queue was created
+ private var _createTime = Time.now
def statNamed(statName: String) = "q/" + name + "/" + statName
@@ -139,6 +142,23 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
}
}
+ /**
+ * Check if this Queue is eligible for expiration by way of it being empty
+ * and it's age being greater than or equal to maxQueueAge
+ */
+ def isReadyForExpiration: Boolean = {
+ // Don't even bother if the maxQueueAge is None
+ if(config.maxQueueAge.isDefined && config.maxQueueAge.get == None) {
+ false
+ } else {
+ if(queue.isEmpty && config.maxQueueAge.isDefined && Time.now > _createTime + config.maxQueueAge.get) {
+ true
+ } else {
+ false
+ }
+ }
+ }
+
// you are holding the lock, and config.keepJournal is true.
private def checkRotateJournal() {
/*
21 src/main/scala/net/lag/kestrel/QueueCollection.scala
View
@@ -196,10 +196,31 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
queue(name) map { q => q.discardExpired(q.config.maxExpireSweep) } getOrElse(0)
}
}
+
+ def expireQueue(name: String): Unit = {
+
+ if(!shuttingDown) {
+ queues.get(name) map { q =>
+ if(q.isReadyForExpiration) {
+ delete(name)
+ Stats.incr("queue_expires")
+ log.info("Expired queue %s", name)
+ }
+ }
+ }
+ return 1
+ }
def flushAllExpired(): Int = {
queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName) }
}
+
+ def deleteExpiredQueues(): Unit = {
+
+ // Now that we've cleaned out the queue, lets see if any of them are
+ // ready to be expired.
+ queueNames.map { qName => expireQueue(qName) }
+ }
def stats(key: String): Array[(String, String)] = queue(key) match {
case None => Array[(String, String)]()
15 src/main/scala/net/lag/kestrel/config/KestrelConfig.scala
View
@@ -39,14 +39,15 @@ case class QueueConfig(
syncJournal: Duration,
expireToQueue: Option[String],
maxExpireSweep: Int,
- fanoutOnly: Boolean
+ fanoutOnly: Boolean,
+ maxQueueAge: Option[Duration]
) {
override def toString() = {
("maxItems=%d maxSize=%s maxItemSize=%s maxAge=%s defaultJournalSize=%s maxMemorySize=%s " +
"maxJournalSize=%s discardOldWhenFull=%s keepJournal=%s syncJournal=%s " +
- "expireToQueue=%s maxExpireSweep=%d fanoutOnly=%s").format(maxItems, maxSize,
+ "expireToQueue=%s maxExpireSweep=%d fanoutOnly=%s maxQueueAge=%s").format(maxItems, maxSize,
maxItemSize, maxAge, defaultJournalSize, maxMemorySize, maxJournalSize, discardOldWhenFull,
- keepJournal, syncJournal, expireToQueue, maxExpireSweep, fanoutOnly)
+ keepJournal, syncJournal, expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
}
}
@@ -140,10 +141,16 @@ class QueueBuilder extends Config[QueueConfig] {
*/
var fanoutOnly: Boolean = false
+ /**
+ * Expiration time for the queue itself. If the queue is empty and older
+ * than this value then we should delete it.
+ */
+ var maxQueueAge: Option[Duration] = None
+
def apply() = {
QueueConfig(maxItems, maxSize, maxItemSize, maxAge, defaultJournalSize, maxMemorySize,
maxJournalSize, discardOldWhenFull, keepJournal, syncJournal,
- expireToQueue, maxExpireSweep, fanoutOnly)
+ expireToQueue, maxExpireSweep, fanoutOnly, maxQueueAge)
}
}
38 src/test/scala/net/lag/kestrel/PersistentQueueSpec.scala
View
@@ -611,6 +611,44 @@ class PersistentQueueSpec extends Specification
}
}
+ "PersistentQueue with expiry" should {
+ val timer = new FakeTimer()
+ val scheduler = new ScheduledThreadPoolExecutor(1)
+
+ "expire queue" in {
+ withTempFolder {
+ Time.withCurrentTimeFrozen { time =>
+ val config = new QueueBuilder {
+ keepJournal = false
+ maxQueueAge = 90.seconds
+ }.apply()
+ val q = new PersistentQueue("wu_tang", folderName, config, timer, scheduler)
+ q.setup()
+
+ // Not ready, we just got started!
+ q.isReadyForExpiration mustEqual false
+
+ q.add("method man".getBytes, None) mustEqual true
+
+ time.advance(30.seconds)
+ // We aren't ready to expire yet, as it's not been long enough
+ q.isReadyForExpiration mustEqual false
+
+ time.advance(61.seconds)
+
+ // Still not ready, as we have items in the queue!
+ q.isReadyForExpiration mustEqual false
+
+ q.remove must beSomeQItem("method man") // queue is now empty
+
+ // This should be true now because the queue is 91 seconds old and
+ // has no items
+ q.isReadyForExpiration mustEqual true
+ }
+ }
+ }
+ }
+
"PersistentQueue with item expiry" should {
val timer = new FakeTimer()
val scheduler = new ScheduledThreadPoolExecutor(1)
Please sign in to comment.
Something went wrong with that request. Please try again.