Permalink
Browse files

Fixed to retain cursor and add onAny handler

Fixed the code that the cursor gets reused, this really should be in its
own closure and will get refactored eventually for this.
Add an onAny handler.
Added a better command line app for monitoring the queue (MongoMQ)
Updated npm version to 0.1.1.
Still need to update readme.md
  • Loading branch information...
1 parent a12e994 commit 203e609c207d8166006ddb13ee71c3e89b9bb5f7 @jdarling committed Jul 13, 2012
Showing with 99 additions and 9 deletions.
  1. +48 −0 bin/MongoMQ.js
  2. +14 −0 bin/test.js
  3. +30 −7 lib/MongoMQ.js
  4. +2 −2 package.json
  5. +5 −0 readme.md
View
@@ -0,0 +1,48 @@
+var MongoMQ = require('../lib/MongoMQ').MongoMQ;
+var repl = require('repl');
+
+var l = process.argv.length, tmp, names, reCmdLineStrip=/^(\-|\\|\/)*/i, opts = {};
+for(var i = 2; i < l; i++){
+ tmp = process.argv[i].replace(reCmdLineStrip, '').split('=');
+ name = tmp.shift();
+ if(tmp.length>0) val = tmp.join('=');
+ else val = true;
+ tmp = opts;
+ names = name.split('.');
+ while(names.length>1){
+ name = names.shift();
+ tmp = tmp[name]=tmp[name]||{};
+ }
+ tmp[names.shift()]=val;
+}
+
+var queue = new MongoMQ(opts);
+
+var r = repl.start({
+ prompt: "MongoMQ>"
+ });
+
+r.on('exit', function(){
+ queue.close();
+});
+
+var funcName, value;
+for(funcName in queue){
+ value = queue[funcName];
+ if(typeof(value)=='function'){
+ r.context[funcName] = (function(f){
+ return function(){
+ f.apply(queue, arguments);
+ };
+ })(value);
+ }
+}
+
+r.context.logAny = function(){
+ queue.onAny(function(err, data, next){
+ console.log(data);
+ next();
+ });
+};
+
+r.context.queue = queue;
View
@@ -31,4 +31,18 @@ r.context.load = function(){
}
};
+r.context.logAny = function(){
+ queue.onAny(function(err, data, next){
+ console.log(data);
+ next();
+ });
+};
+
+r.context.eatTest = function(){
+ queue.on('test', function(err, data, next){
+ console.log('eat: ', data);
+ next();
+ });
+};
+
r.context.queue = queue;
View
@@ -6,6 +6,7 @@ var MongoMQ = exports.MongoMQ = function(options){
self.mqCollectionName = options.mqCollectionName||'queue';
self.mqDB = options.mqDB||options.db||'MongoMQ';
options.db = self.mqDB;
+ self.collectionSize = options.collectionSize||100000000;
self.handlers = [];
var server = self.server = new MC(options, function(err, conn){
if(err){
@@ -43,7 +44,7 @@ MongoMQ.prototype.createCollection = function(){
self.db.dropCollection(self.mqCollectionName, function(){
self.db.createCollection(self.mqCollectionName, {
capped: true,
- size: 100000000
+ size: self.collectionSize
}, function(err, collection){
self.startListening(collection);
});
@@ -62,7 +63,8 @@ MongoMQ.prototype.startListening = function(collection){
if(self.handlers.length>0){
var l = self.handlers.length;
for(var i = 0; i<l; i++){
- self.nextMessage(self.handlers[i].msgType, self.handlers[i].passive, self.handlers[i].callback);
+ if(self.handlers[i].msgType) self.nextMessage(self.handlers[i].msgType, self.handlers[i].passive, self.handlers[i].callback);
+ else self.onAny(self.handlers[i].callback);
}
}
self.listening = true;
@@ -71,11 +73,11 @@ MongoMQ.prototype.startListening = function(collection){
}
};
-MongoMQ.prototype.getCursor = function(forMsg, passive, callback){
+MongoMQ.prototype.getCursor = function(forMsg, passive, callback, cursor){
var self = this;
var doCallback = function(err, msg){
var next = function(){
- self.getCursor(forMsg, passive, callback);
+ self.getCursor(forMsg, passive, callback, cursor);
};
if(typeof(callback)=='function') callback(err, msg?msg.pkt:false, next);
};
@@ -85,11 +87,13 @@ MongoMQ.prototype.getCursor = function(forMsg, passive, callback){
}
self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
if(passive){
- self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true}, function(err, msg){
+ cursor = cursor || self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true});
+ cursor.nextObject(function(err, msg){
if(typeof(callback)=='function') doCallback(err, (msg&&msg.length)>0?msg[0]:msg);
});
}else{
- self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true}).nextObject(function(err, msg){
+ cursor = cursor || self.collection.find({$and: [{msg: forMsg}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}, {tailable: true});
+ cursor.nextObject(function(err, msg){
self.collection.findAndModify((msg&&msg.length)>0?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {tailable: true},
function(err, data){
if(err||(!data)) self.getCursor(forMsg, passive, callback); // someone else picked it up
@@ -122,6 +126,25 @@ MongoMQ.prototype.on = function(msg, passive, callback){
callback = passive;
passive = false;
}
- self.handlers.push({msgType: msg, passive: !!passive, handler: callback});
if(self.listening) self.nextMessage(msg, passive, callback);
+ else self.handlers.push({msgType: msg, passive: !!passive, handler: callback});
};
+
+MongoMQ.prototype.onAny = function(callback, cursor){
+ var self = this;
+ if(!self.listening) self.handlers.push({msgType: msg, passive: !!passive, handler: callback});
+ else{
+ var doCallback = function(err, msg){
+ var next = function(){
+ self.onAny(callback, cursor);
+ };
+ if(typeof(callback)=='function') callback(err, msg?msg.pkt:false, next);
+ };
+ self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true});
+ var clause = [{emitted: {$gte: new Date()}}];
+ cursor = cursor || self.collection.find({$and: clause}, {tailable: true});
+ cursor.nextObject(function(err, msg){
+ if(typeof(callback)=='function') doCallback(err, (msg&&msg.length)>0?msg[0]:msg);
+ });
+ }
+};
View
@@ -4,7 +4,7 @@
"email": "jeremy.darling@gmail.com"
},
"name": "mongomq",
- "version": "0.1.0",
+ "version": "0.1.1",
"repository": {
"type": "git",
"url": "git://github.com/jdarling/MongoMQ.git"
@@ -13,7 +13,7 @@
],
"main": "./lib/MongoMQ",
"bin": {
- "MongoMQ": "./bin/test.js"
+ "MongoMQ": "./bin/MongoMQ.js"
},
"engines": {
"node": ">= v0.6.0"
View
@@ -30,9 +30,14 @@ new MongoMQ(options)
options
* mqCollectionName - Collection to store queue messages in, defaults to 'queue'
* mqDB - Database to store queue in, defaults to 'MongoMQ'
+ * username - Optional value of the username to validate against Mongo with
+ * password - Optional value of the password to validate against Mongo with
* server - If not running against a ReplicaSet this is the server to connect to
* port - If not running against a ReplicaSet this is the server port to connect with
* servers[] - If connecting to a ReplicaSet then this is a collection of {host: 'hostname', port: 1234} objects defining the root entry for the ReplicaSet
+ * collectionSize - The size in bytes to cap the collection at, defaults to 100000000
+ * serverOptions - An optional options object that will be passed to Mongo-Native when the Mongo connection is created
+ * nativeParser - Boolean (defaults false) to enable usage of Mongo-Native's native parser. Only use this if you install mongodb with native support
MongoMQ.on(msgType, passive, callback);
---------------------------------------

0 comments on commit 203e609

Please sign in to comment.