Expiring Queues #87

Merged
merged 5 commits into from Feb 17, 2012

Conversation

Projects
None yet
2 participants

gphat commented Feb 16, 2012

I've added support for expiring queues (re: #64).

PersistentQueue now has a create time and it is checked after expired items. When used in conjunction with expiring items, this cleans up queues quiet nicely. Tests included.

I'm new to Scala so I'd love any tips and improvements that you might have for me, especially the Queue's check of it's expiry. It feels clunky, but it works.

src/main/scala/net/lag/kestrel/PersistentQueue.scala
+ */
+ def isReadyForExpiration: Boolean = {
+ // Don't even bother if the maxQueueAge is None
+ if(config.maxQueueAge.isDefined && config.maxQueueAge.get == None) {
@robey

robey Feb 16, 2012

Contributor

if config.maxQueueAge isn't defined, "get" will throw an exception. you can omit this line entirely and just start with #154 below...

src/main/scala/net/lag/kestrel/PersistentQueue.scala
+ if(config.maxQueueAge.isDefined && config.maxQueueAge.get == None) {
+ false
+ } else {
+ if(queue.isEmpty && config.maxQueueAge.isDefined && Time.now > _createTime + config.maxQueueAge.get) {
@robey

robey Feb 16, 2012

Contributor

... which i would start with "if (config.maxQueueAge.isDefined &&" to shortcut out right away unless a maxQueueAge is defined.

src/main/scala/net/lag/kestrel/QueueCollection.scala
+
+ def deleteExpiredQueues(): Unit = {
+
+ // Now that we've cleaned out the queue, lets see if any of them are
@robey

robey Feb 16, 2012

Contributor

i would move this comment to Kestrel.scala#157 where it makes more sense.

@@ -148,6 +148,18 @@ expired at the same time, `maxExpireSweep` limits the number of items that
will be removed by the background thread in a single round. This is primarily
useful as a throttling mechanism when using a queue as a way to delay work.
+Queue expiration
@robey

robey Feb 16, 2012

Contributor

sorry to fuss over the wording here, but how about something like

"Whole queues can be configured to expire as well. If maxQueueAge is set, expirationTimerFrequency is used to check the queue age. If the queue is empty, and it has been longer than maxQueueAge since it was created, then the queue will be deleted."

src/main/scala/net/lag/kestrel/QueueCollection.scala
+
+ def expireQueue(name: String): Unit = {
+
+ if(!shuttingDown) {
@robey

robey Feb 16, 2012

Contributor

here and elsewhere, use "if (" with a space, and remove the blank line at the beginning of the function.

Contributor

robey commented Feb 16, 2012

mostly nitpicky style comments, but i like this overall!

gphat commented Feb 16, 2012

Here's an updated version! Sorry for the mismatched styling. I forgot the space-between-if and wasn't noticing the extra line. I barely managed to keep the 2 space indentation.

Also, thanks for the updated guide text. I dunno how many beers I'd had when I wrote the original, it was terrible. :)

Lemme know if this passes muster!

docs/guide.md
+empty, and it has been longer than `maxQueueAge` since it was created then
+the queue will be deleted.
+
+A `maxQueueAge` of zero, which is usually the default, means a queue never
@robey

robey Feb 16, 2012

Contributor

this isn't true, since it's an Option. i'd just delete this line.

src/main/scala/net/lag/kestrel/PersistentQueue.scala
+ */
+ def isReadyForExpiration: Boolean = {
+ // Don't even bother if the maxQueueAge is None
+ if(config.maxQueueAge.isDefined && queue.isEmpty && Time.now > _createTime + config.maxQueueAge.get) {
@robey

robey Feb 16, 2012

Contributor

if (

def flushAllExpired(): Int = {
queueNames.foldLeft(0) { (sum, qName) => sum + flushExpired(qName) }
}
+
+ def deleteExpiredQueues(): Unit = {
+
@robey

robey Feb 16, 2012

Contributor

delete this line. :)

Contributor

robey commented Feb 16, 2012

good after that!

gphat commented Feb 17, 2012

Ugh, sorry for missing those. :(

gphat commented Feb 17, 2012

Hrm, I didn't really mean to include that master merge, but I don't think it hurts anything.

That last commit exposes the create time of the queue as a gauge and documents age_msec, which was already there but undocumented.

robey added a commit that referenced this pull request Feb 17, 2012

@robey robey merged commit 5b6e0b2 into twitter-archive:master Feb 17, 2012

Contributor

robey commented Feb 17, 2012

thanks! :)

gphat commented Feb 17, 2012

No problem! Thanks for the help.

When you roll this release, please update to the latest ostrich so my fixes there go in as well. ;)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment