Skip to content
Browse files

Release v0.2.1 - Bug fixes and refactoring

v0.2.1
* Majorly refactored code
* Added autoIndexId: true to queue collection creation
* Better MongoMQ application with help()
* Updated test application
* Added an exception to emit() when you try to emit before start() has
been called
* fix to onAny so it will restart listeners after a close() and start()
re-issue
* Added remove*() methods
* Changed close() to stop()
* hereOnOut options - allows listeners to only pay attention to messages
posted after they have been started up
* Added ability to register listeners (via on and onAny) when queue is
not started
  • Loading branch information...
1 parent a6bd9e0 commit 5669f5e277dfd344bab0622105df4d395b80774d @jdarling committed Jul 17, 2012
Showing with 766 additions and 180 deletions.
  1. +24 −3 bin/MongoMQ.js
  2. +48 −20 bin/test.js
  3. +0 −1 lib/MongoConnection.js
  4. +317 −108 lib/MongoMQ.js
  5. +181 −0 lib/MongoMQ_bu.js
  6. +1 −1 package.json
  7. +195 −47 readme.md
View
27 bin/MongoMQ.js
@@ -23,13 +23,14 @@ var r = repl.start({
});
r.on('exit', function(){
- queue.close();
+ queue.stop(); // force a close
});
-var funcName, value;
+var funcName, value, funcs = [];
for(funcName in queue){
value = queue[funcName];
if(typeof(value)=='function'){
+ funcs.push(funcName);
r.context[funcName] = (function(f){
return function(){
f.apply(queue, arguments);
@@ -45,4 +46,24 @@ r.context.logAny = function(){
});
};
-r.context.queue = queue;
+r.context.help = function(){
+ msg = 'Built in test methods:\r\n'+
+ ' help() - shows this message\r\n'+
+ ' logAny() - logs any message to the console\r\n'+
+ ' listeners() - Alias of the queue.listeners property surfaced as a method\r\n';
+ var l = funcs.length;
+ for(var i = 0; i<l; i++){
+ msg += ' '+funcs[i]+'() - Alias of the queue.'+funcs[i]+' method\r\n';
+ }
+ msg += '\r\nInstance Data\r\n'+
+ ' queue - the global MongoMQ instance\r\n'
+ ;
+ console.log(msg);
+ return '';
+};
+
+r.context.listeners = function(){
+ return queue.listeners;
+};
+
+r.context.queue = queue;
View
68 bin/test.js
@@ -1,23 +1,17 @@
var MongoMQ = require('../lib/MongoMQ').MongoMQ;
var repl = require('repl');
-var queue = new MongoMQ();
+var queue = new MongoMQ({
+ autoStart: false
+});
var r = repl.start({
- prompt: ">"
+ prompt: "testbed>"
});
-
r.on('exit', function(){
- queue.close();
+ queue.stop();
});
-r.context.listen = function(){
- queue.on('test', function(err, data, next){
- console.log('got: ', data);
- next();
- });
-};
-
var msgidx = 0;
r.context.send = function(){
queue.emit('test', msgidx);
@@ -31,18 +25,52 @@ r.context.load = function(){
}
};
+var logMsg = function(err, data, next){
+ console.log('LOG: ', data);
+ next();
+ };
+var eatTest = function(err, data, next){
+ console.log('eat: ', data);
+ next();
+ };
+
r.context.logAny = function(){
- queue.onAny(function(err, data, next){
- console.log(data);
- next();
- });
+ queue.onAny(logMsg);
};
r.context.eatTest = function(){
- queue.on('test', function(err, data, next){
- console.log('eat: ', data);
- next();
- });
+ queue.on('test', eatTest);
+};
+
+r.context.start = function(cb){
+ queue.start(cb);
};
-r.context.queue = queue;
+r.context.stop = function(){
+ queue.stop();
+};
+
+r.context.help = function(){
+ console.log('Built in test methods:\r\n'+
+ ' help() - shows this message\r\n'+
+ ' logAny() - logs any message to the console\r\n'+
+ ' eatTest() - consumes next available "test" message from the queue\r\n'+
+ ' send() - places a "test" message on the queue\r\n'+
+ ' load() - places 100 "test" messages on the queue\r\n'+
+ ' start() - start the queue listener\r\n'+
+ ' stop() - stop the queue listener\r\n'+
+ '\r\nInstance Data\r\n'+
+ ' queue - the global MongoMQ instance\r\n'
+ );
+ return '';
+};
+
+/*
+queue.start(function(){
+ r.context.eatTest();
+});
+*/
+
+r.context.queue = queue;
+
+r.context.help();
View
1 lib/MongoConnection.js
@@ -3,7 +3,6 @@ var key;
for(key in mongodb){
if(hasOwnProperty.call(mongodb, key)){
- //console.log('Importing: ', key);
global[key] = exports[key] = mongodb[key];
}
}
View
425 lib/MongoMQ.js
@@ -1,150 +1,359 @@
-var MC = require("./MongoConnection").MongoConnection;
+var mongodb = require('mongodb');
-var MongoMQ = exports.MongoMQ = function(options){
- var self = this;
- options = options || {};
- self.mqCollectionName = options.mqCollectionName||'queue';
- self.mqDB = options.mqDB||options.db||'MongoMQ';
- options.db = self.mqDB;
- self.collectionSize = options.collectionSize||100000000;
- self.handlers = [];
- var server = self.server = new MC(options, function(err, conn){
- if(err){
- console.log(err);
+var E_NOTLISTENING = 'Queue interface not started!';
+var E_INVALIDHANDLER = 'Must provide a valid function as handler!';
+
+var startListener = function(passive, collection, filter, handler, endHandler){
+ if(typeof(handler)!=='function') throw E_INVALIDHANDLER;
+ var cursor = collection.find(filter, {tailable: true}),
+ _cursor = {
+ stop: function(){
+ cursor.close(function(callback){
+ if(endHandler) endHandler(_cursor, callback);
+ });
+ },
+ start: function(){},
+ raw: function(){
+ return cursor;
+ }
+ };
+ _cursor.__defineGetter__('handler', function(){
+ return handler;
+ });
+ if(filter.event){
+ _cursor.__defineGetter__('event', function(){
+ return filter.event;
+ });
+ }else{
+ _cursor.__defineGetter__('event', function(){
+ return '*';
+ });
+ }
+ _cursor.__defineGetter__('active', function(){
+ return true;
+ });
+ var next,
+ callHandler = function(err, msg){
+ handler(err, msg?msg.data:false, next);
+ };
+ if(passive){
+ next = function(closeCursor){
+ closeCursor = closeCursor || (!collection.db.openCalled);
+ if(!closeCursor){
+ cursor.nextObject(callHandler);
}else{
- self.db = conn._db;
- self.start();
+ cursor.close(function(){
+ if(endHandler) endHandler(_cursor);
+ });
}
+ };
+ }else{
+ next = function(closeCursor){
+ closeCursor = closeCursor || (!collection.db.openCalled);
+ if(!closeCursor){
+ cursor.nextObject(function(err, msg){
+ if(msg){
+ collection.findAndModify((msg&&msg.length>0)?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {},
+ function(err, data){
+ if(!data) next();
+ else callHandler(err, data);
+ });
+ }else next();
+ });
+ }else{
+ cursor.close(function(){
+ if(endHandler) endHandler(_cursor);
+ });
+ }
+ };
+ }
+ next();
+ return _cursor;
+};
+
+var createCollection = function(db, collectionName, collectionSize, callback){
+ db.dropCollection(collectionName, function(){
+ db.createCollection(collectionName, {
+ capped : true,
+ autoIndexId : true,
+ size : collectionSize
+ }, function(err, collection){
+ if(collection) collection.insert({ignore: 'This works around the bug in mongo-native where capped collections must have at least 1 record before you can setup a tailed cursor.'});
+ callback(err, collection);
});
+ });
};
-MongoMQ.prototype.start = function(){
- var self = this;
- self.db.collection(self.mqCollectionName, {safe: true}, function(err, collection){
- var okToStart = true;
- if(err){
- if(collection){
- okToStart = false;
- }else{
- self.createCollection();
- }
+var openQueue = function(db, collectionName, collectionSize, callback){
+ var checkDoCreate = function(createColl, collection){
+ if(createColl) createCollection(db, collectionName, collectionSize, callback);
+ else callback(null, collection);
+ };
+ db.collection(collectionName, {safe: true}, function(err, collection){
+ if(err&&collection) throw err;
+ if(collection){
+ collection.isCapped(function(err, isCapped){
+ checkDoCreate(!isCapped, collection);
+ });
}else{
- self.startListening(collection);
+ createCollection(db, collectionName, collectionSize, callback);
}
});
};
-MongoMQ.prototype.close = function(){
- var self = this;
- self.db.close();
+var MongoMQ = exports.MongoMQ = function(options, callback){
+ var self = this, listeners = [];
+ options = options || {};
+ self.mqCollectionName = options.mqCollectionName||'queue';
+ self.mqDB = options.mqDB||options.db||'MongoMQ';
+ self.__defineGetter__('listeners', function(){
+ return listeners;
+ });
+ if(!(options.host||options.servers)) options.host = 'localhost';
+ options.db = self.mqDB;
+ self.collectionSize = options.collectionSize||100000000;
+ self.listening = false;
+ self.options = options;
+ if(options.autoStart||(typeof(options.autoStart)=='undefined')) self.start(callback);
+ else if(typeof(callback)=='function') callback(null, self);
};
-MongoMQ.prototype.createCollection = function(){
+MongoMQ.prototype.start = function(callback){
var self = this;
- self.db.dropCollection(self.mqCollectionName, function(){
- self.db.createCollection(self.mqCollectionName, {
- capped: true,
- size: self.collectionSize
- }, function(err, collection){
- self.startListening(collection);
+ var defaultServerOptions = {auto_reconnect: true};
+
+ if(self.listening){
+ if(typeof(callback)=='function') callback(null, self);
+ }else{
+ var open = function(p_db, mqDB, collectionSize){
+ openQueue(p_db, mqDB, collectionSize, function(err, collection){
+ self.collection = collection;
+ self.listening = true;
+ if(self.listeners.length){
+ var l = self.listeners.length;
+ for(i = 0; i<l; i++){
+ self.listeners[i].start();
+ }
+ }
+ if(typeof(callback)=='function') callback(null, self);
});
- });
+ };
+
+ options = self.options;
+ if(options.servers instanceof Array){
+ var servers = [], host, port, serverOptions, l = options.servers.length;
+ for(var i = 0; i<l; i++){
+ if(typeof(options.servers[i])=='string'){
+ host = options.servers[i];
+ port = mongodb.Connection.DEFAULT_PORT;
+ serverOptions = options.serverOptions||defaultServerOptions;
+ }else{
+ host = options.servers[i].host||options.host||'localhost';
+ port = options.servers[i].port||options.port||mongodb.Connection.DEFAULT_PORT;
+ serverOptions = options.servers[i].serverOptions||options.serverOptions||defaultServerOptions;
+ }
+ servers.push(new mongodb.Server(host, port, options));
+ }
+ self.server = new mongodb.ReplSetServers(servers);
+ }else self.server = new mongodb.Server(options.host||'localhost', options.port||mongodb.Connection.DEFAULT_PORT, options.serverOptions||defaultServerOptions);
+ var db = self.dbConnection = new mongodb.Db(options.db, self.server, options.dbOptions||{native_parser:(options.nativeParser==null?false:options.nativeParser)});
+ db.open(function(err, p_db){
+ self.p_db = p_db;
+ if(options.username&&options.password){
+ db.admin(function(err, adminDb){
+ adminDb.authenticate(options.username, options.password, function(err, result){
+ if(result){
+ open(p_db, self.mqDB, self.collectionSize);
+ }else{
+ self.p_db.close();
+ delete self.p_db;
+ self.listening = false;
+ }
+ });
+ });
+ }else{
+ open(p_db, self.mqDB, self.collectionSize);
+ }
+ });
+ }
};
-MongoMQ.prototype.startListening = function(collection){
+MongoMQ.prototype.stop = function(callback){
var self = this;
- collection = collection||self.db.collection(self.mqCollectionName, {safe: true});
- if(!collection) self.createCollection();
- else{
- collection.isCapped(function(err, isCapped){
- if(!isCapped) self.createCollection();
- else{
- self.collection = collection;
- if(self.handlers.length>0){
- var l = self.handlers.length;
- for(var i = 0; i<l; i++){
- if(self.handlers[i].msgType) self.nextMessage(self.handlers[i].msgType, self.handlers[i].passive, self.handlers[i].callback);
- else self.onAny(self.handlers[i].callback);
- }
+ if(self.listening){
+ self.p_db.close(true, function(){
+ delete self.p_db;
+ self.listening = false;
+ var l = self.listeners.length;
+ for(var i = l-1; i>-1; i--){
+ if(self.listeners[i].active){
+ self.listeners[i].stop();
}
- self.listening = true;
}
+ if(typeof(callback)=='function') callback(null, self);
});
+ }else{
+ if(typeof(callback)=='function') callback(null, self);
}
};
-MongoMQ.prototype.getCursor = function(forMsg, passive, callback, cursor){
+MongoMQ.prototype.emit = function(msgType, data, partialCallback, completeCallback){
var self = this;
- var doCallback = function(err, msg){
- var next = function(){
- self.getCursor(forMsg, passive, callback, cursor);
+ if(!self.listening) throw E_NOTLISTENING;
+ var msgPkt = {
+ event: msgType,
+ data: data,
+ handled: false,
+ emitted: new Date()
+ };
+ self.collection.insert(msgPkt, function(){});
+};
+
+var getEventPlaceholder = function(self, msgType, options, handler){
+ var placeholder = {
+ stop: function(callback){
+ if(typeof(callback)=='function') callback(self.indexOfListener(msgType, handler));
+ }
};
- if(typeof(callback)=='function') callback(err, msg?msg.pkt:false, next);
+ if(msgType=='*'){
+ placeholder.start=function(){
+ self.onAny(handler);
+ };
+ }else{
+ placeholder.start=function(){
+ self.on(msgType, options, handler);
};
- if(typeof(passive)=='function'){
- callback = passive;
- passive = false;
}
- self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
- if(passive){
- cursor = cursor || self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true});
- cursor.nextObject(function(err, msg){
- if(typeof(callback)=='function') doCallback(err, (msg&&msg.length)>0?msg[0]:msg);
+ placeholder.__defineGetter__('options', function(){
+ return options;
+ });
+ placeholder.__defineGetter__('handler', function(){
+ return handler;
+ });
+ placeholder.__defineGetter__('event', function(){
+ return msgType;
+ });
+ placeholder.__defineGetter__('active', function(){
+ return false;
+ });
+ return placeholder;
+};
+
+MongoMQ.prototype.on = function(msgType, options, handler){
+ var self = this;
+ if(self.listening){
+ var filter = {event: msgType, handled: false};
+ if(typeof(options)=='function'){
+ handler = options;
+ options = {passive: false};
+ }
+ if(typeof(options)=='boolean') options = {passive: options};
+ if(options.hereOnOut) filter.emitted = {$gte: new Date()};
+ var listener = startListener(options.passive, self.collection, filter, handler, function(listener, callback){
+ var idx;
+ if((idx = self.indexOfListener(msgType, handler))>-1){
+ self.listeners[idx] = getEventPlaceholder(self, msgType, handler);
+ }
+ if(typeof(callback)=='function') callback(idx);
});
}else{
- cursor = cursor || self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true});
- cursor.nextObject(function(err, msg){
- self.collection.findAndModify((msg&&msg.length)>0?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {tailable: true},
- function(err, data){
- if(err||(!data)) self.getCursor(forMsg, passive, callback); // someone else picked it up
- else doCallback(err, data);
- });
- });
+ var listener = getEventPlaceholder(self, msgType, options, handler);
}
+ var idx;
+ if((idx = self.indexOfListener(msgType, handler)) == -1) self.listeners.push(listener);
+ else self.listeners[idx] = listener;
};
-MongoMQ.prototype.nextMessage = function(msgType, passive, callback){
+MongoMQ.prototype.onAny = function(handler){
var self = this;
- self.getCursor(msgType, passive, callback);
+ if(self.listening){
+ var filter = {emitted: {$gte: new Date()}};
+ var listener = startListener(true, self.collection, filter, handler, function(listener){
+ var idx;
+ if((idx = self.indexOfListener('*', handler))>-1){
+ self.listeners[idx] = getEventPlaceholder(self, '*', handler);
+ }
+ if(typeof(callback)=='function') callback(idx);
+ });
+ }else{
+ var listener = getEventPlaceholder(self, '*', {}, handler);
+ }
+ var idx;
+ if((idx = self.indexOfListener('*', handler)) == -1) self.listeners.push(listener);
+ else self.listeners[idx] = listener;
};
-MongoMQ.prototype.emit = function(msg, pkt){
- var self = this;
- self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
- msg = {
- msg: msg,
- pkt: pkt,
- handled: false,
- emitted: new Date()
- };
- self.collection.insert(msg, function(){});
+MongoMQ.prototype.indexOfListener = function(event, handler){
+ var self = this, l = self.listeners.length;
+ if(typeof(event)=='function'){
+ handler = event;
+ event = false;
+ }
+ if(!(event||handler)) return false;
+ var found = false, listener;
+ for(var i = 0; i<l; i++){
+ found = false;
+ listener = self.listeners[i];
+ if(event) found = (event==listener.event)||(listener.event=='*');
+ if(handler) found = found && listener.handler == handler;
+ if(found) return i;
+ }
+ return -1;
};
-MongoMQ.prototype.on = function(msg, passive, callback){
- var self = this;
- if(typeof(passive)=='function'){
- callback = passive;
- passive = false;
+MongoMQ.prototype.getListenerFor = function(event, handler){
+ var self = this, idx = self.indexOfListener(event, handler);
+ if(idx){
+ return self.listeners[idx];
}
- if(self.listening) self.nextMessage(msg, passive, callback);
- else self.handlers.push({msgType: msg, passive: !!passive, handler: callback});
+ return false;
};
-MongoMQ.prototype.onAny = function(callback, cursor){
- var self = this;
- if(!self.listening) self.handlers.push({msgType: msg, passive: !!passive, handler: callback});
- else{
- var doCallback = function(err, msg){
- var next = function(){
- self.onAny(callback, cursor);
- };
- if(typeof(callback)=='function') callback(err, msg?msg.pkt:false, next);
- };
- self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
- var clause = [{emitted: {$gte: new Date()}}];
- cursor = cursor || self.collection.find({$and: clause}, {tailable: true});
- cursor.nextObject(function(err, msg){
- if(typeof(callback)=='function') doCallback(err, (msg&&msg.length)>0?msg[0]:msg);
- });
+MongoMQ.prototype.removeListener = function(event, handler){
+ var self = this, idx = self.indexOfListener(event, handler);
+ if((idx!==false)&&idx>-1){
+ if(self.listeners[idx].active){
+ self.listeners[idx].stop(function(idx){
+ if(idx>-1) self.listeners.splice(idx, 1);
+ });
+ }else{
+ self.listeners.splice(idx, 1);
+ }
+ return true;
+ }
+ return false;
+};
+
+MongoMQ.prototype.removeListeners = function(event){
+ var self = this, l = self.listeners.length, numRemoved = 0;
+ if(typeof(event)=='function'){
+ handler = event;
+ event = false;
+ }
+ if(!(event||handler)) return false;
+ var found = false, listener;
+ for(var i = l; i>-1; i--){
+ found = false;
+ listener = self.listeners[i];
+ if(event) found = (event==listener.event)||(listener.event=='*');
+ if(handler) found = found && listener.handler == handler;
+ if(found){
+ self.listeners[i].stop(function(idx){
+ if(idx>-1) self.listeners.splice(idx, 1);
+ });
+ numRemoved++;
+ }
}
-};
+ return numRemoved;
+};
+
+MongoMQ.prototype.removeAllListeners = function(){
+ var self = this, l = self.listeners.length;
+ for(var i = l-1; i>-1; i--){
+ self.listeners[i].stop(function(idx){
+ if(idx>-1) self.listeners.splice(idx, 1);
+ });
+ }
+ return true;
+};
+
View
181 lib/MongoMQ_bu.js
@@ -0,0 +1,181 @@
+var MC = require("./MongoConnection").MongoConnection;
+
+var MongoMQ = exports.MongoMQ = function(options){
+ var self = this;
+ options = options || {};
+ self.mqCollectionName = options.mqCollectionName||'queue';
+ self.mqDB = options.mqDB||options.db||'MongoMQ';
+ options.db = self.mqDB;
+ self.collectionSize = options.collectionSize||100000000;
+ self.handlers = [];
+ self.listening = false;
+ self.options = options;
+ if(options.autoStart||(typeof(options.autoStart)=='undefined')) self.start();
+};
+
+MongoMQ.prototype.start = function(){
+ var self = this;
+ if(self.listening) return;
+ var server = self.server = new MC(self.options, function(err, conn){
+ if(err){
+ console.log(err);
+ }else{
+ self.db = conn._db;
+ self.db.collection(self.mqCollectionName, {safe: true}, function(err, collection){
+ if(err){
+ if(collection){
+ throw err;
+ }else{
+ self.createCollection();
+ }
+ }else{
+ self.startListening(collection);
+ }
+ });
+ }
+ });
+};
+
+MongoMQ.prototype.close = function(force, callback){
+ var self = this;
+ if(!self.listening) return;
+ self.db.close(typeof(force)!=='undefined'?force:true, callback||function(){});
+ self.collection = false;
+ self.listening = false;
+};
+
+MongoMQ.prototype.createCollection = function(){
+ var self = this;
+ self.db.dropCollection(self.mqCollectionName, function(){
+ self.db.createCollection(self.mqCollectionName, {
+ capped: true,
+ autoIndexId: true,
+ size: self.collectionSize
+ }, function(err, collection){
+ self.startListening(collection);
+ });
+ });
+};
+
+MongoMQ.prototype.startListening = function(collection){
+ var self = this;
+ if(self.listening) return;
+ collection = collection||self.db.collection(self.mqCollectionName, {safe: true});
+ if(!collection) self.createCollection();
+ else{
+ collection.isCapped(function(err, isCapped){
+ if(!isCapped) self.createCollection();
+ else{
+ self.collection = collection;
+ self.listening = true;
+ if(self.handlers.length>0){
+ setTimeout(function(){
+ var l = self.handlers.length, handler;
+ for(var i = 0; i<l; i++){
+ handler = self.handlers[i];
+ if(handler.msgType){
+ //self.nextMessage(self.handlers[i].msgType, self.handlers[i].passive, self.handlers[i].fromDT, self.handlers[i].callback);
+ self.nextMessage(handler.msgType, handler.passive, handler.fromDT, handler.callback);
+ }else{
+ //startListener(self, {$and: [{emitted: {$gte: self.handlers[i].fromDT}}]}, true, self.handlers[i].callback);
+ self.onAny(handler.callback);
+ }
+ }
+ }, 5000);
+ }
+ }
+ });
+ }
+};
+
+var startListener = function(self, filter, isPassive, callback){
+ console.log(filter);
+ self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
+ callback = callback||function(){};
+ var cursor = self.collection.find(filter, {tailable: true}), next;
+ var doCallback = function(err, msg){
+ callback(err, msg?msg.pkt:false, next);
+ };
+ if(isPassive){
+ next = function(){
+ if(self.listening) cursor.nextObject(doCallback);
+ };
+ }else{
+ next = function(){
+ cursor.nextObject(function(err, msg){
+ if(self.collection) self.collection.findAndModify((msg&&msg.length)>0?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {},
+ function(err, data){
+ if(err||(!data)){
+ if(self.listening) next(); // someone else picked it up
+ }else doCallback(err, data);
+ });
+ });
+ };
+ }
+ next();
+ return next;
+};
+
+MongoMQ.prototype.nextMessage = function(msgType, passive, fromDT, callback){
+ var self = this, filter = {$and: [{msg: msgType}, {$or: [{handled: false}, {handled: {$exists: false}}]}]};
+ if(fromDT) filter.$and.push({emitted: {$gte: fromDT}});
+ startListener(self, filter, passive||false, callback);
+};
+
+MongoMQ.prototype.emit = function(msg, pkt){
+ var self = this;
+ if(!self.listening) throw "Queue Listener not started!";
+ self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
+ msg = {
+ msg: msg,
+ pkt: pkt,
+ handled: false,
+ emitted: new Date()
+ };
+ self.collection.insert(msg, function(){});
+};
+
+MongoMQ.prototype.addHandler = function(info){
+ var self = this, l = self.handlers.length, matches, key, handler;
+ for(var i = 0; i<l; i++){
+ matches = true;
+ handler = self.handlers[i];
+ for(key in info){
+ matches = matches && (key in handler);
+ if(matches) return;
+ }
+ }
+ self.handlers.push(info);
+};
+
+MongoMQ.prototype.on = function(msg, options, callback){
+ var self = this, passive = false, hereOnOut = false;
+ if(typeof(options)=='function'){
+ callback = options;
+ options = {passive: false};
+ }
+ passive = options.passive || passive;
+ hereOnOut = (options.hereOnOut || hereOnOut)?new Date():false;
+ if(self.listening) self.nextMessage(msg, passive, hereOnOut, callback);
+ self.addHandler({msgType: msg, passive: passive, handler: callback, fromDT: hereOnOut});
+};
+
+MongoMQ.prototype.onAny = function(callback){
+ var self = this, fromDT = new Date();
+ if(self.listening) startListener(self, {$and: [{emitted: {$gte: fromDT}}]}, true, callback);
+ self.addHandler({msgType: false, passive: true, handler: callback, fromDT: fromDT});
+};
+
+MongoMQ.prototype.forget = function(callback){
+ var self = this, l = self.handlers.length;
+ if(self.listening) throw "Can't forget while listening";
+ for(var i = l; i>-1; i--){
+ if(self.handlers[i].handler===callback) self.handlers.splice(i, 1);
+ }
+};
+
+MongoMQ.prototype.clearListeners = function(){
+ var self = this;
+ if(self.listening) throw "Can't clear while listening";
+ self.handlers = [];
+};
View
2 package.json
@@ -4,7 +4,7 @@
"email": "jeremy.darling@gmail.com"
},
"name": "mongomq",
- "version": "0.1.1",
+ "version": "0.2.1",
"repository": {
"type": "git",
"url": "git://github.com/jdarling/MongoMQ.git"
View
242 readme.md
@@ -1,5 +1,8 @@
MongoMQ - Node.js MongoMQ
=========================
+
+Version 0.2.1 presents what will hopefully be the final API, but is not feature complete yet. Hopefully version 0.2.2 will be feature complete.
+
Installation
============
@@ -11,7 +14,12 @@ From GitHub
Using NPM
---------
* npm install mongomq
-
+
+Requirements
+============
+ * Node.js - v0.8.2 or better (really only for the MongoMQ and Test.js scripts REPL support)
+ * MongoDB - v2.0.2 or better (for Tailable Cursors, autoIndexId bug fix, and ReplicaSet fixes)
+
What is MongoMQ?
================
@@ -38,32 +46,100 @@ options
* collectionSize - The size in bytes to cap the collection at, defaults to 100000000
* serverOptions - An optional options object that will be passed to Mongo-Native when the Mongo connection is created
* nativeParser - Boolean (defaults false) to enable usage of Mongo-Native's native parser. Only use this if you install mongodb with native support
+ * autoStart - Boolean (defaults true) if the MongoMQ instance should start itself up when created, if set to false you need to call MongoMQ.start()
-MongoMQ.on(msgType, passive, callback);
----------------------------------------
-msgType
- * The message type to listen for
-
-passive
- * If true will not mark the message as handled and will
-
-callback(err, messageContents, next)
- * Use next() to look for another message in the queue, don't call next() if you only want a one time listener
+MongoMQ.start([callback])
+-------------------------
+
+Starts the queue listener and sets up the emitter.
-MongoMQ.onAny(callback);
+Params:
+* callback - will be called once the queue is opened and all registered listeners are setup
+
+MongoMQ.stop([callback])
------------------------
-NOTE: Passive by default and only reacts to events after handler is registered
-callback(err, messageContents, next)
- * Use next() to look for another message in the queue, don't call next() if you only want a one time listener
+Stops listening for messages and closes the emitter.
+
+Params:
+* callback - will be called once the queue is completely close and all registered listeners have been shut down
+
+MongoMQ.emit(msgType, data, [partialCallback], [completeCallback])
+------------------------------------------------------------------
+
+Places the a message of msgTye on the queue with the provided data for handlers to consume.
+
+NOTE: partialCallback and completeCallback are not yet implemented, they are here as a prototype of how this functionality may be implemented.
-MongoMQ.emit(msgType, messageContents);
+Params:
+* msgType - The message type to emit.
+* data - a JSON serializeable collection of data to be sent.
+* partialCallback - NOT IMPLEMENTED - Will be called for large or long running result sets to return partial data back. Optional and if not present then completeCallback will be called with buffered results once all operations have completed.
+* completeCallback - NOT IMPLEMENTED - Will be called once all remote processing has been completed. If partialCallback is not provided and completeCallback is provided a temporary buffer will be setup and the final result set will be sent to completeCallback.
+
+MongoMQ.on(msgType, [passive||options], handler)
---------------------------------------
-msgType
- * The message type to send
-
-messageContents
- * What to send
+
+Sets up a listener for a specific message type.
+
+Params:
+* msgType - The message type to listen for
+* passive - If true will not mark the message as handled when a message is consumed from the queue
+* options - additional options that can be passed
+* callback(err, messageContents, next) - Use next() to look for another message in the queue, don't call next() if you only want a one time listener
+
+options -
+ * passive - If true will not mark the message as handled when a message is consumed from the queue
+ * hereOnOut - Boolean (defaults false) will only recieve messages from the time the listener was registered instead of all unconsumed messages if set to true
+
+
+MongoMQ.onAny(handler)
+----------------------
+
+Sets up a passive listener for all messages that are placed on the queue.
+
+Params:
+* callback(err, messageContents, next) - Use next() to look for another message in the queue, don't call next() if you only want a one time listener
+
+MongoMQ.indexOfListener([event], [handler])
+-------------------------------------------
+
+Provides back the index of the first listener that matches the supplied event and/or handler. One or both of event and handler must be supplied. Returns BOOLEAN false if no handler is found.
+
+Params:
+* event - The name of the event to look for.
+* handler - The specific handler function to look for.
+
+MongoMQ.getListenerFor([event], [handler])
+------------------------------------------
+
+Provides back the first listener that matches the supplied event and/or handler. One or both of event and handler must be supplied. Returns BOOLEAN false if no handler is found.
+
+Params:
+* event - The name of the event to look for.
+* handler - The specific handler function to look for.
+
+MongoMQ.removeListener([event], [handler])
+------------------------------------------
+
+Shuts down the first listener that matches the supplied event and/or handler and removes it from the listeners list.
+
+Params:
+* event - The name of the event to look for.
+* handler - The specific handler function to look for.
+
+MongoMQ.removeListeners(event)
+------------------------------
+
+Shuts down ALL listeners for the specified event and removes it from the listeners list.
+
+Params:
+* event - The name of the event to look for.
+
+MongoMQ.removeAllListeners()
+----------------------------
+
+Shuts down ALL listeners and clears the listeners list.
How does MongoMQ work?
======================
@@ -85,37 +161,109 @@ You should see the two listeners pickup messages one at a time with whoever has
bin/test.js
===========
- var MongoMQ = require('../lib/MongoMQ').MongoMQ;
- var repl = require('repl');
+```javascript
+var MongoMQ = require('../lib/MongoMQ').MongoMQ;
+var repl = require('repl');
- var queue = new MongoMQ();
+var queue = new MongoMQ({
+ autoStart: false
+});
- var r = repl.start({
- prompt: ">"
- });
-
- r.on('exit', function(){
- queue.close();
+var r = repl.start({
+ prompt: "testbed>"
});
+r.on('exit', function(){
+ queue.stop();
+});
- r.context.listen = function(){
- queue.on('test', function(err, data, next){
- console.log('got: ', data);
- next();
- });
- };
+var msgidx = 0;
+r.context.send = function(){
+ queue.emit('test', msgidx);
+ msgidx++;
+};
- var msgidx = 0;
- r.context.send = function(){
- queue.emit('test', msgidx);
- msgidx++;
- };
+r.context.load = function(){
+ for(var i = 0; i<100; i++){
+ queue.emit('test', msgidx);
+ msgidx++;
+ }
+};
- r.context.load = function(){
- for(var i = 0; i<100; i++){
- queue.emit('test', msgidx);
- msgidx++;
- }
+var logMsg = function(err, data, next){
+ console.log('LOG: ', data);
+ next();
};
+var eatTest = function(err, data, next){
+ console.log('eat: ', data);
+ next();
+ };
+
+r.context.logAny = function(){
+ queue.onAny(logMsg);
+};
+
+r.context.eatTest = function(){
+ queue.on('test', eatTest);
+};
+
+r.context.start = function(cb){
+ queue.start(cb);
+};
+
+r.context.stop = function(){
+ queue.stop();
+};
+
+r.context.help = function(){
+ console.log('Built in test methods:\r\n'+
+ ' help() - shows this message\r\n'+
+ ' logAny() - logs any message to the console\r\n'+
+ ' eatTest() - consumes next available "test" message from the queue\r\n'+
+ ' send() - places a "test" message on the queue\r\n'+
+ ' load() - places 100 "test" messages on the queue\r\n'+
+ ' start() - start the queue listener\r\n'+
+ ' stop() - stop the queue listener\r\n'+
+ '\r\nInstance Data\r\n'+
+ ' queue - the global MongoMQ instance\r\n'
+ );
+ return '';
+};
+
+/*
+queue.start(function(){
+ r.context.eatTest();
+});
+*/
- r.context.queue = queue;
+r.context.queue = queue;
+
+r.context.help();
+```
+Planned Improvements
+====================
+
+ * Update the emit method and handler callbacks to allow placing an event on to the queue collection and receiving a response or responses from it.
+
+Update History
+==============
+
+v0.2.1
+ * Majorly refactored code
+ * Added autoIndexId: true to queue collection creation
+ * Better MongoMQ application with help()
+ * Updated test application
+ * Added an exception to emit() when you try to emit before start() has been called
+ * fix to onAny so it will restart listeners after a close() and start() re-issue
+ * Added remove*() methods
+ * Changed close() to stop()
+ * hereOnOut options - allows listeners to only pay attention to messages posted after they have been started up
+ * Added ability to register listeners (via on and onAny) when queue is not started
+
+v0.1.1
+ * Bug fixes to on event
+ * Added in new onAny register
+ * Migrated code to retain cursor
+
+v0.1.0
+ * Initial release
+ * More of a proof of concept

0 comments on commit 5669f5e

Please sign in to comment.
Something went wrong with that request. Please try again.