Permalink
Browse files

A method to assist with high-speed purging of messages which match so…

…me predicate
  • Loading branch information...
1 parent 1688bc4 commit 26546c00949136ed591f2f8cfa355cf44197a1f2 @jhs committed Apr 2, 2013
Showing with 81 additions and 0 deletions.
  1. +81 −0 queue.js
View
@@ -275,6 +275,87 @@ Queue.prototype.receive = function(opts, callback) {
}
+Queue.prototype.scrub = function(opts, callback) {
+ var self = this;
+
+ if(typeof callback != 'function' || typeof opts != 'object')
+ throw new Error('Need options object and callback function')
+ if(typeof opts.match != 'function')
+ throw new Error('Need options.match function to evaluate messages')
+
+ self.confirmed(function(er) {
+ if(er)
+ return callback(er)
+
+ var startkey = lib.JS([ self.name, "" ])
+ var endkey = lib.JS([ self.name, {} ]) // All messages
+
+ if(opts.descending) {
+ var _tmp = endkey
+ endkey = startkey
+ startkey = _tmp
+ }
+
+ var query = { 'reduce': false
+ , 'startkey': startkey
+ , 'endkey' : endkey
+ }
+
+ if(opts.count)
+ query.limit = opts.count
+
+ query = querystring.stringify(query)
+
+ var path = lib.enc_id(self.ddoc_id) + '/_view/visible_at?' + query
+ self.db.request(path, function(er, resp, view) {
+ if(er)
+ return callback(er)
+
+ var update = {'docs':[]}
+ for(var i = 0; i < view.rows.length; i++) {
+ var row = view.rows[i]
+ var msg = row.value.Body
+
+ try {
+ var is_match = !! opts.match(msg)
+ } catch (match_er) {
+ return callback(match_er)
+ }
+
+ // console.log('Row: %j', row)
+ if(is_match)
+ update.docs.push({'_id':row.id, '_rev':row.value._rev, '_deleted':true})
+ }
+
+ var found_rows = update.docs.length
+ // console.log('Delete %d docs', found_rows)
+
+ self.db.request({'method':'POST', 'uri':'_bulk_docs', 'json':update}, function(er, res) {
+ if(er)
+ return callback(er)
+ if(res.statusCode != 201)
+ return callback(new Error('Bad response '+res.statusCode+' to bulk update'))
+
+ var deleted = 0
+ var conflicts = 0
+
+ for(var i = 0; i < res.body.length; i++) {
+ var result = res.body[i]
+ if(result.ok)
+ deleted += 1
+ else if(result.error == 'conflict')
+ conflicts += 1
+ else
+ return callback(new Error('Unknown update: ' + JSON.stringify(result)))
+ }
+
+ return callback(null, found_rows, deleted, conflicts)
+ })
+ })
+ }) // self.confirmed
+}
+
+
Queue.prototype.set = function set_attrs(opts, callback) {
var self = this;
assert.ok(opts);

0 comments on commit 26546c0

Please sign in to comment.