Skip to content

Commit

Permalink
Staging changes for onAny and Monitor
Browse files Browse the repository at this point in the history
  • Loading branch information
jdarling committed Jan 17, 2013
1 parent d66fcbc commit 20fa1b6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 16 deletions.
2 changes: 1 addition & 1 deletion bin/MongoMQ.js
@@ -1,7 +1,7 @@
var MongoMQ = require('../lib/MongoMQ'); var MongoMQ = require('../lib/MongoMQ');
var repl = require('repl'); var repl = require('repl');


var l = process.argv.length, tmp, names, reCmdLineStrip=/^(\-|\\|\/)*/i, opts = {}, var l = process.argv.length, tmp, names, reCmdLineStrip=/^(\-|\\|\/)*/i, opts = {autoStart: false},
alias = { alias = {
db: 'databaseName', db: 'databaseName',
collection: 'queueCollection', collection: 'queueCollection',
Expand Down
18 changes: 17 additions & 1 deletion lib/MongoMQ.js
Expand Up @@ -7,7 +7,8 @@ var QueueMonitor = require('./QueueMonitor');


var defaults = { var defaults = {
autoStart: true, autoStart: true,
queueCollection: 'queue' queueCollection: 'queue',
databaseName: 'mongomq'
}; };


var MongoMQ = module.exports = function(options, callback){ var MongoMQ = module.exports = function(options, callback){
Expand Down Expand Up @@ -267,6 +268,21 @@ MongoMQ.prototype.stopListeners = function(callback){
}; };
MongoMQ.prototype.stopListeners.description = 'Stops the queue listeners that have not yet been stopped.'; MongoMQ.prototype.stopListeners.description = 'Stops the queue listeners that have not yet been stopped.';



MongoMQ.prototype.onAny = function(options, callback){
var self = this;
if(typeof(options)==='function'){
callback = options;
options = {};
}
options=options||{};
options.listenerType = options.listenerType||'streams';
options.hereOnOut = typeof(options.hereOnOut)!=='undefined'?options.hereOnOut:true;
options.passive = true;
self.on('*', options, callback);
};
MongoMQ.prototype.onAny.description = 'Listens for any message on the message queue in a passive mode.';

MongoMQ.prototype.on = MongoMQ.prototype.addListener = function(event, options, callback){ MongoMQ.prototype.on = MongoMQ.prototype.addListener = function(event, options, callback){
var self = this; var self = this;
var list = self.monitors[event]=self.monitors[event]||[], i, l=list.length, index=false; var list = self.monitors[event]=self.monitors[event]||[], i, l=list.length, index=false;
Expand Down
52 changes: 38 additions & 14 deletions lib/QueueMonitor.js
Expand Up @@ -35,13 +35,32 @@ var QueueMonitor = module.exports = function(mq, event, options, callback){
self.start(); self.start();
}; };


QueueMonitor.prototype.buildSelector = function(cursorFixId){
var self = this, event = self.event, dt = new Date(), afterDt = new Date(dt-self.mq.serverTimeOffset), $or, i, l;
if(event!=='*'){
$or = [{handled: false, event: event}, {handled: {$exists: false}, event: event}, {fix: cursorFixId}];
}else{
$or = [{handled: false}, {handled: {$exists: false}, fix: {$exists: false}}, {fix: cursorFixId}];
}
if(self.options.hereOnOut){
l = $or.length;
for(i=0; i<l; i++){
if($or[i].fix!==cursorFixId){
$or[i].globalTime = {$gt: afterDt};
}
}
}
return {$or: $or};
};
QueueMonitor.prototype.buildSelector.description = 'Used internally by start_usingNextObject and start_usingStreams to build the message selector.';

QueueMonitor.prototype.start_usingNextObject = function(startupCallback){ QueueMonitor.prototype.start_usingNextObject = function(startupCallback){
// call out to setup the tailed cursor for us // call out to setup the tailed cursor for us
var self = this, cursorFixId = self.cursorFixId = UUID.v4(), mq = self.mq, callback = self.callback, event = self.event; var self = this, cursorFixId = self.cursorFixId = UUID.v4(), mq = self.mq, callback = self.callback, event = self.event;
// this is a work around so we don't get runaway handles when there is no records for us to process // this is a work around so we don't get runaway handles when there is no records for us to process
mq.collection(mq.options.queueCollection, function(err, collection){ mq.collection(mq.options.queueCollection, function(err, collection){
collection.insert({fix: cursorFixId}, {w: 1}, function(){ collection.insert({fix: cursorFixId}, {w: 1}, function(){
mq.tailedCursor(mq.options.queueCollection, {$or: [{handled: false, event: event}, {handled: {$exists: false}, event: event}, {fix: cursorFixId}]}, {$natural: 1}, function(err, cursor){ mq.tailedCursor(mq.options.queueCollection, self.buildSelector(cursorFixId), {$natural: 1}, function(err, cursor){
if(err){ if(err){
self.errorCallback(err); self.errorCallback(err);
startupCallback(err); startupCallback(err);
Expand All @@ -63,7 +82,7 @@ QueueMonitor.prototype.start_usingStreams = function(startupCallback){
mq.collection(mq.options.queueCollection, function(err, collection){ mq.collection(mq.options.queueCollection, function(err, collection){
if(collection){ if(collection){
collection.insert({fix: cursorFixId}, {w: 1}, function(){ collection.insert({fix: cursorFixId}, {w: 1}, function(){
mq.tailedCursorStream(mq.options.queueCollection, {$or: [{handled: false, event: event}, {handled: {$exists: false}, event: event}, {fix: cursorFixId}]}, {$natural: 1}, function(err, stream){ mq.tailedCursorStream(mq.options.queueCollection, self.buildSelector(cursorFixId), {$natural: 1}, function(err, stream){
if(!stream){ if(!stream){
self.start_usingStreams(startupCallback); self.start_usingStreams(startupCallback);
}else{ }else{
Expand Down Expand Up @@ -104,7 +123,7 @@ QueueMonitor.prototype.start = function(callback){
case('streams'): case('streams'):
self.start_usingStreams(started); self.start_usingStreams(started);
break; break;
case('nextObject'): case('nextobject'):
default: default:
self.start_usingNextObject(started); self.start_usingNextObject(started);
} }
Expand Down Expand Up @@ -169,18 +188,23 @@ QueueMonitor.prototype.handleResponse = function(err, record, doNext){
var dt = new Date(); var dt = new Date();
set.$set.pickedTime = new Date(dt-mq.serverTimeOffset) set.$set.pickedTime = new Date(dt-mq.serverTimeOffset)
} }
collection.findAndModify(record, {$natural: -1}, set, {w: ((mq.db||{}).serverConfig||{}).replicaSet?self.options.writeConcern||mq.db.serverConfig.servers.length:1}, function(err, data){ if(options.passive){
if(data||err){ mq.emitter.emit('recieved', data);
if(err){ callback(err, (data||{}).data, next);
self.errorCallback(err); }else{
collection.findAndModify(record, {$natural: -1}, set, {w: ((mq.db||{}).serverConfig||{}).replicaSet?self.options.writeConcern||mq.db.serverConfig.servers.length:1}, function(err, data){
if(data||err){
if(err){
self.errorCallback(err);
}
mq.emitter.emit('recieved', data);
callback(err, (data||{}).data, next);
}else{
// We didn't get the lock on the record
next();
} }
mq.emitter.emit('recieved', data); });
callback(err, (data||{}).data, next); }
}else{
// We didn't get the lock on the record
next();
}
});
} }
}); });
} }
Expand Down

0 comments on commit 20fa1b6

Please sign in to comment.