Skip to content

Commit

Permalink
added Queue#changes
Browse files Browse the repository at this point in the history
now based on latest master
  • Loading branch information
fb55 committed Nov 2, 2013
1 parent e072380 commit 611b5ed
Showing 1 changed file with 62 additions and 0 deletions.
62 changes: 62 additions & 0 deletions queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,68 @@ Queue.prototype.receive = function(opts, callback) {
})
}

Queue.prototype.changes = function(opts) {
var self = this

if(typeof opts != 'object')
opts = {}

var prefix = 'CQS/' + self.name + '/'
var feed = self.db.changes(opts)
var limit = opts.ConcurrencyLimit || 1e3
var count = 0
var vis_timeout = opts.VisibilityTimeout || self.VisibilityTimeout

assert.ok(vis_timeout >= 0, "Visibility timeout is too low")
assert.ok(vis_timeout <= 43200, "Visibility timeout is too high")

//TODO this is a rather ugly solution (as it consumes _all_ changes)
//follow also allows `filter` to be based on a view, which is probably much better suited
feed.filter = function(doc, req) {
return doc._id.substr(0, prefix.length) === prefix
&& Date.parse(doc.visible_at) <= Date.now()
}

feed.on('change', function(change) {
if(count++ > limit) feed.pause()

var msg_opts = {}
lib.copy(change.doc, msg_opts, 'uppercase')
msg_opts.MessageId = change.id.substr(prefix.length)

var msg = new message.Message(msg_opts)
msg.queue = self
msg.VisibilityTimeout = vis_timeout
msg.is_heartbeat = false
msg.mvcc = { _id: change.doc._id, _rev: change.doc._rev }
msg.receive(function(err) {
if(err) {
if(err.statusCode == 409 && err.error == 'conflict');
else feed.emit('error', err)
reduceCount()
} else {
feed.emit('message', msg, callback)
}

var called = false
function callback() {
if(!called){
called = true
reduceCount()
}
}
})

function reduceCount() {
count -= 1
if(feed.is_paused) feed.resume()
}
})

feed.follow()

return feed
}

Queue.prototype.scrub = function(opts, callback) {
var self = this;
Expand Down

0 comments on commit 611b5ed

Please sign in to comment.