From 20fa1b6f5bebb13526270c0327f42378dbce6ae1 Mon Sep 17 00:00:00 2001 From: Jeremy Darling Date: Wed, 16 Jan 2013 19:42:58 -0600 Subject: [PATCH] Staging changes for onAny and Monitor --- bin/MongoMQ.js | 2 +- lib/MongoMQ.js | 18 +++++++++++++++- lib/QueueMonitor.js | 52 +++++++++++++++++++++++++++++++++------------ 3 files changed, 56 insertions(+), 16 deletions(-) diff --git a/bin/MongoMQ.js b/bin/MongoMQ.js index 0fa8b12..64977cf 100644 --- a/bin/MongoMQ.js +++ b/bin/MongoMQ.js @@ -1,7 +1,7 @@ var MongoMQ = require('../lib/MongoMQ'); var repl = require('repl'); -var l = process.argv.length, tmp, names, reCmdLineStrip=/^(\-|\\|\/)*/i, opts = {}, +var l = process.argv.length, tmp, names, reCmdLineStrip=/^(\-|\\|\/)*/i, opts = {autoStart: false}, alias = { db: 'databaseName', collection: 'queueCollection', diff --git a/lib/MongoMQ.js b/lib/MongoMQ.js index 47f1c43..5fdc56c 100644 --- a/lib/MongoMQ.js +++ b/lib/MongoMQ.js @@ -7,7 +7,8 @@ var QueueMonitor = require('./QueueMonitor'); var defaults = { autoStart: true, - queueCollection: 'queue' + queueCollection: 'queue', + databaseName: 'mongomq' }; var MongoMQ = module.exports = function(options, callback){ @@ -267,6 +268,21 @@ MongoMQ.prototype.stopListeners = function(callback){ }; MongoMQ.prototype.stopListeners.description = 'Stops the queue listeners that have not yet been stopped.'; + +MongoMQ.prototype.onAny = function(options, callback){ + var self = this; + if(typeof(options)==='function'){ + callback = options; + options = {}; + } + options=options||{}; + options.listenerType = options.listenerType||'streams'; + options.hereOnOut = typeof(options.hereOnOut)!=='undefined'?options.hereOnOut:true; + options.passive = true; + self.on('*', options, callback); +}; +MongoMQ.prototype.onAny.description = 'Listens for any message on the message queue in a passive mode.'; + MongoMQ.prototype.on = MongoMQ.prototype.addListener = function(event, options, callback){ var self = this; var list = self.monitors[event]=self.monitors[event]||[], i, l=list.length, index=false; diff --git a/lib/QueueMonitor.js b/lib/QueueMonitor.js index c2aa134..8fdccb8 100644 --- a/lib/QueueMonitor.js +++ b/lib/QueueMonitor.js @@ -35,13 +35,32 @@ var QueueMonitor = module.exports = function(mq, event, options, callback){ self.start(); }; +QueueMonitor.prototype.buildSelector = function(cursorFixId){ + var self = this, event = self.event, dt = new Date(), afterDt = new Date(dt-self.mq.serverTimeOffset), $or, i, l; + if(event!=='*'){ + $or = [{handled: false, event: event}, {handled: {$exists: false}, event: event}, {fix: cursorFixId}]; + }else{ + $or = [{handled: false}, {handled: {$exists: false}, fix: {$exists: false}}, {fix: cursorFixId}]; + } + if(self.options.hereOnOut){ + l = $or.length; + for(i=0; i