Permalink
Browse files

Make sure jobs aren't left queued.

Signed-off-by: Tim Smart <tim@fostle.com>
  • Loading branch information...
tim-smart committed Jul 6, 2012
1 parent dc5653b commit 43498a5854f6e079356e7c4db269510ec03c59ca
Showing with 10 additions and 7 deletions.
  1. +10 −7 index.js
View
@@ -55,7 +55,7 @@ var Queue = function (options) {
// On blpop
queue._onPop = function onPop (error) {
if (error) return queue._done(error)
if (error) return queue._done(error, true)
queue._next()
queue.client.watch(queue._prefix + ':queued')
@@ -64,19 +64,19 @@ var Queue = function (options) {
// On zrevrange
function gotQueued (error, job_id) {
if (error) return queue._done(error)
if (error) return queue._done(error, true)
if (!job_id || !job_id[0]) return queue._done()
queue.client.hget(queue._prefix + ':jobs', job_id[0].toString(), gotJob)
}
function gotJob (error, data) {
if (error) return queue._done(error)
if (error) return queue._done(error, true)
try {
queue._processJob(queue._returnJob(data))
} catch (error) {
return queue._done(error)
return queue._done(error, true)
}
}
@@ -281,13 +281,16 @@ Queue.prototype._next = function next () {
*
* @param @optional {Error} error
*/
Queue.prototype._done = function done (error) {
Queue.prototype._done = function done (error, premature) {
var queue = this
;--queue.processing
if (error) {
queue.emit('error', error)
}
if (premature) {
queue.client.rpush(queue._prefix + ':listen', '1')
}
queue._next()
@@ -315,8 +318,8 @@ Queue.prototype._processJob = function processJob (job) {
client.publish(queue._prefix + ':running' , job.id)
client.exec(function (error, results) {
if (error) return queue._done(error)
if (results === null) return queue._done()
if (error) return queue._done(error, true)
if (results === null) return queue._done(null, true)
if (job.timeout) {
job._timer = setTimeout(job._onTimeout, job.timeout, job)

0 comments on commit 43498a5

Please sign in to comment.