Queue Expiration #64

Closed
gphat opened this Issue · 13 comments

3 participants

@gphat

We've got a number of non-persistent queues that we vivify throughout the day and (inevitably) someone forgets to delete the queue before they disconnect. This leads to lots of old, stale queues around. Granted that most are programmer error…

Any thoughts on a way to expire these queues and remove them after a certain time? We OOMed our kestrel today because we had a nagios script that was not delete()ing its temporary queue. ;)

@robey
Collaborator

this is becoming a common request -- i'll add it to the roadmap. :)

@nkvoll

+1 on this.

My use case is using kestrel for distributed RPC. In this scenario, I'd like to open a "transient" queue for responses.

Clients pull from a transient queue with a unique name (UUIDs, etc.) and add the name of that response queue to outgoing messages.
Servers poll the queue the clients send to and reply by adding a message to the client's response queue. If the queue does not exist, kestrel should drop the message.

Maybe this functionality can come through the unused "fields" parameter? fields={transient:true} to create a non-persistent queue, fields={passive:true} to not create a queue if the queue doesn't already exist etc.

This is largely similar to how AMQP works, but without the exchanges, message properties and headers.
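For illustration, here is a minimal Python sketch of the request/reply flow described above. It uses a toy in-memory broker rather than a real Kestrel client, and `ToyBroker`, its methods, and the `passive` argument are hypothetical names standing in for the proposed behavior, not anything Kestrel provides.

```python
import uuid
from collections import deque


class ToyBroker:
    """In-memory stand-in for a queue server; not Kestrel's API."""

    def __init__(self):
        self.queues = {}

    def put(self, name, item, passive=False):
        # passive=True models the proposed flag: never create a missing queue.
        if name not in self.queues:
            if passive:
                return False  # nobody is listening, so drop the message
            self.queues[name] = deque()
        self.queues[name].append(item)
        return True

    def get(self, name):
        # Reading vivifies the queue, as a client opening its response queue would.
        q = self.queues.setdefault(name, deque())
        return q.popleft() if q else None

    def delete(self, name):
        self.queues.pop(name, None)


broker = ToyBroker()

# Client: open a uniquely named response queue and send a request that names it.
reply_to = "rpc-" + uuid.uuid4().hex
broker.get(reply_to)
broker.put("requests", {"reply_to": reply_to, "body": "ping"})

# Server: take the request and reply passively to the named queue.
request = broker.get("requests")
delivered = broker.put(request["reply_to"], "pong", passive=True)
reply = broker.get(reply_to)

# Client disconnects; a late reply is dropped instead of recreating the queue.
broker.delete(reply_to)
dropped = not broker.put(reply_to, "late pong", passive=True)
```

The passive write is what keeps a slow server from leaving a stale response queue behind after the client has gone away.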

@nkvoll

On second thought, I don't think the fields parameter is right for this, and maybe SET/GET should be extended instead :)

@gphat

I've got some tuits and thought I might take a shot at implementing this. Before doing so I thought it might be prudent to check in first.

Expiration seems easy at first, but after thinking it through I think it's a bit of an iceberg.

The queue already keeps up with its "age" (_currentAge, time since last read). Would expiration be the time since the last read or the overall time the queue has been in existence? The latter would require a new variable.

After solving these little nits the only remaining problem seems to be where to do it. That seems easier. The existing background process calls flushAllExpired where a check of the queue's age could result in expiration. In addition we'd probably need to check on queue operations (add, remove, peek, etc) and expire the queue if applicable.

All the above seems reasonable, but I'm suspicious that it's not as simple as creating an expire function in QueueCollection that mirrors delete in all but its counter increment.

What say you?

@robey
Collaborator

it sounds like the first use-case would be satisfied by checking that the queue is empty, and that it has an age > (some duration).

the second use-case, dropping messages that nobody's listening for anymore, would require the check to happen even if the queue isn't empty. but that use-case seems a bit more dubious to me -- i worry about lost jobs.
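A minimal sketch of the first check, in Python for illustration only. The function name and parameters are hypothetical, and treating a missing limit as "never expire" is an assumption rather than something settled in this thread.

```python
def should_expire(item_count, age_seconds, max_age_seconds):
    """Return True when a queue is empty and older than the configured limit."""
    if max_age_seconds is None:
        # Assumption: no limit configured means the queue never expires.
        return False
    return item_count == 0 and age_seconds > max_age_seconds
```

A non-empty queue is never expired here, which avoids the lost-jobs concern in the second use-case.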

@gphat

It being empty is likely legit for our case. Our messages should also have small expirations, therefore the queue should be empty.

Your first use-case is spot on!

@robey
Collaborator

oh yeah, expiring items would solve case 2. :) so i think your implementation would work well. and a check in flushAllExpired sounds like the right place.

@gphat

I've written a first pass at adding this behavior and it (sorta, see below) works for me! You can take a look here if you can look past a couple of misplaced logging statements: (Critiques welcome)

gphat/kestrel@7a68ff3

The issue is that the _currentAge is only updated on reads. Therefore if nobody reads the queue after it's passed the maximum age, it won't ever be expired.

I see two ways to fix this:

  • Update the age when an item is expired (which seems a bit hackish, as it would break what that variable means)
  • Add a val for the date the queue was created and use that for comparison (timeCreated + maxQueueAge >= Time.now)

Thoughts?

@gphat

Ping?

@robey
Collaborator

oh, i see, _currentAge doesn't change when the queue is empty, so you really want to know "how long has it been since something was read from this queue".

maybe just add a field "lastReadTime: Time" and set it whenever something is read (or expired)?
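Roughly, that suggestion could look like the Python sketch below. It is a toy model, not Kestrel's Scala code: `ToyQueue`, `last_read_time`, `max_queue_age`, and this `flush_all_expired` are hypothetical names, and the clock is passed in as a plain number so the behavior is easy to follow.

```python
from collections import deque


class ToyQueue:
    def __init__(self, now):
        self.items = deque()        # entries are (expiry_time_or_None, payload)
        self.last_read_time = now   # hypothetical field from the suggestion above

    def add(self, payload, expiry=None):
        self.items.append((expiry, payload))

    def discard_expired(self, now):
        # Drop items at the head whose expiry has passed; count that as a "read".
        while self.items and self.items[0][0] is not None and self.items[0][0] <= now:
            self.items.popleft()
            self.last_read_time = now

    def remove(self, now):
        self.discard_expired(now)
        if not self.items:
            return None
        self.last_read_time = now
        return self.items.popleft()[1]

    def is_ready_for_expiration(self, now, max_queue_age):
        # Only empty queues that have been idle longer than the limit qualify.
        return not self.items and now - self.last_read_time > max_queue_age


def flush_all_expired(queues, now, max_queue_age):
    """Background sweep: expire items first, then delete idle empty queues."""
    removed = []
    for name in list(queues):
        q = queues[name]
        q.discard_expired(now)
        if q.is_ready_for_expiration(now, max_queue_age):
            del queues[name]
            removed.append(name)
    return removed
```

Because item expiration also updates `last_read_time`, a queue that nobody reads still drains through item expiry and then ages out on a later sweep.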

@gphat

Can do! I'll be back after I finish that. ;)

@robey
Collaborator

i think this got merged in. :)

@robey closed this