Skip to content
This repository has been archived by the owner on Sep 18, 2021. It is now read-only.

Commit

Permalink
Modify various bit after review by robey.
Browse files Browse the repository at this point in the history
  • Loading branch information
gphat committed Feb 16, 2012
1 parent 7841145 commit 52c5fac
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 19 deletions.
11 changes: 4 additions & 7 deletions docs/guide.md
Expand Up @@ -151,12 +151,10 @@ useful as a throttling mechanism when using a queue as a way to delay work.
Queue expiration Queue expiration
---------------- ----------------


Queues can be configure to expire whole queues as well. When a queue is Whole queues can be configured to expire as well. If `maxQueueAge` is set
created the time is noted. It is periodically (see the aforementioned `expirationTimerFrequency` is used to check the queue age. If the queue is
`expirationTimerFrequency`) checked against the current time and the empty, and it has been longer than `maxQueueAge` since it was created then
`maxQueueAge` configuration option. If the queue is *empty* and the current time the queue will be deleted.
is greater than create time + `maxQueueAge` then the queue is ready to be
expired and will be deleted.


A `maxQueueAge` of zero, which is usually the default, means a queue never A `maxQueueAge` of zero, which is usually the default, means a queue never
expires. expires.
Expand All @@ -180,7 +178,6 @@ is created, and it will start receiving new items written to the parent queue.
Existing items are not copied over. A fanout queue can be deleted to stop it Existing items are not copied over. A fanout queue can be deleted to stop it
from receiving new items. from receiving new items.



Memcache commands Memcache commands
----------------- -----------------


Expand Down
2 changes: 2 additions & 0 deletions src/main/scala/net/lag/kestrel/Kestrel.scala
Expand Up @@ -154,6 +154,8 @@ class Kestrel(defaultQueueConfig: QueueConfig, builders: List[QueueBuilder],
if (expired > 0) { if (expired > 0) {
log.info("Expired %d item(s) from queues automatically.", expired) log.info("Expired %d item(s) from queues automatically.", expired)
} }
// Now that we've cleaned out the queue, lets see if any of them are
// ready to be expired.
Kestrel.this.queueCollection.deleteExpiredQueues() Kestrel.this.queueCollection.deleteExpiredQueues()
} }
}.start() }.start()
Expand Down
10 changes: 3 additions & 7 deletions src/main/scala/net/lag/kestrel/PersistentQueue.scala
Expand Up @@ -148,14 +148,10 @@ class PersistentQueue(val name: String, persistencePath: String, @volatile var c
*/ */
def isReadyForExpiration: Boolean = { def isReadyForExpiration: Boolean = {
// Don't even bother if the maxQueueAge is None // Don't even bother if the maxQueueAge is None
if(config.maxQueueAge.isDefined && config.maxQueueAge.get == None) { if(config.maxQueueAge.isDefined && queue.isEmpty && Time.now > _createTime + config.maxQueueAge.get) {
false true
} else { } else {
if(queue.isEmpty && config.maxQueueAge.isDefined && Time.now > _createTime + config.maxQueueAge.get) { false
true
} else {
false
}
} }
} }


Expand Down
7 changes: 2 additions & 5 deletions src/main/scala/net/lag/kestrel/QueueCollection.scala
Expand Up @@ -198,10 +198,9 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S
} }


def expireQueue(name: String): Unit = { def expireQueue(name: String): Unit = {

if (!shuttingDown) {
if(!shuttingDown) {
queues.get(name) map { q => queues.get(name) map { q =>
if(q.isReadyForExpiration) { if (q.isReadyForExpiration) {
delete(name) delete(name)
Stats.incr("queue_expires") Stats.incr("queue_expires")
log.info("Expired queue %s", name) log.info("Expired queue %s", name)
Expand All @@ -217,8 +216,6 @@ class QueueCollection(queueFolder: String, timer: Timer, journalSyncScheduler: S


def deleteExpiredQueues(): Unit = { def deleteExpiredQueues(): Unit = {


// Now that we've cleaned out the queue, lets see if any of them are
// ready to be expired.
queueNames.map { qName => expireQueue(qName) } queueNames.map { qName => expireQueue(qName) }
} }


Expand Down

0 comments on commit 52c5fac

Please sign in to comment.