From 2e5b7eb914292fb00f2d536a1703b984464363ba Mon Sep 17 00:00:00 2001 From: Jeremy Darling Date: Tue, 31 Jul 2012 18:10:00 -0500 Subject: [PATCH] Added emit callback's and bug fixes v0.2.2 * Completed code to allow for callbacks and partial callbacks to be issued back to emit statements * Complteed refactoring of code to properly seperate functionality into objects --- .project | 12 + bin/MongoMQ.js | 2 +- bin/test.js | 59 ++++- lib/MongoConnection.js | 295 ++++++++++++++++----- lib/MongoMQ.js | 575 ++++++++++++++++++++++------------------- lib/MongoMQ_bu.js | 181 ------------- lib/QueueListener.js | 175 +++++++++++++ package.json | 5 +- readme.md | 15 +- 9 files changed, 801 insertions(+), 518 deletions(-) create mode 100644 .project delete mode 100644 lib/MongoMQ_bu.js create mode 100644 lib/QueueListener.js diff --git a/.project b/.project new file mode 100644 index 0000000..785e798 --- /dev/null +++ b/.project @@ -0,0 +1,12 @@ + + + MongoMQ + + + + + + + com.aptana.projects.webnature + + diff --git a/bin/MongoMQ.js b/bin/MongoMQ.js index 1395fde..a157ddf 100644 --- a/bin/MongoMQ.js +++ b/bin/MongoMQ.js @@ -41,7 +41,7 @@ for(funcName in queue){ r.context.logAny = function(){ queue.onAny(function(err, data, next){ - console.log(data); + console.log(err, data); next(); }); }; diff --git a/bin/test.js b/bin/test.js index 949508f..fb486d7 100644 --- a/bin/test.js +++ b/bin/test.js @@ -18,6 +18,11 @@ r.context.send = function(){ msgidx++; }; +r.context.send2 = function(){ + queue.emit('test2', msgidx); + msgidx++; +}; + r.context.load = function(){ for(var i = 0; i<100; i++){ queue.emit('test', msgidx); @@ -25,6 +30,13 @@ r.context.load = function(){ } }; +r.context.load2 = function(){ + for(var i = 0; i<100; i++){ + queue.emit('test2', msgidx); + msgidx++; + } +}; + var logMsg = function(err, data, next){ console.log('LOG: ', data); next(); @@ -33,6 +45,10 @@ var eatTest = function(err, data, next){ console.log('eat: ', data); next(); }; +var eatTest2 = function(err, data, next){ + console.log('eat2: ', data); + next(); + }; r.context.logAny = function(){ queue.onAny(logMsg); @@ -42,6 +58,10 @@ r.context.eatTest = function(){ queue.on('test', eatTest); }; +r.context.eatTest2 = function(){ + queue.on('test2', eatTest2); +}; + r.context.start = function(cb){ queue.start(cb); }; @@ -50,6 +70,27 @@ r.context.stop = function(){ queue.stop(); }; +r.context.testOnce = function(){ + var handlers = queue.once('once', function(err, data, next){ + console.log('testOnce: ', data); + next(); + }); + queue.emit('once', {foo: 'bar3'}); +}; + +r.context.testAll = function(){ + r.context.logAny(); + r.context.eatTest(); + r.context.eatTest2(); + r.context.testOnce(); + r.context.send(); + r.context.send2(); + r.context.load(); + r.context.load2(); + r.context.testOnce(); + r.context.testOnce(); +}; + r.context.help = function(){ console.log('Built in test methods:\r\n'+ ' help() - shows this message\r\n'+ @@ -65,12 +106,22 @@ r.context.help = function(){ return ''; }; -/* +r.context.tmp = function(){ + queue.on('foo', function(err, data, next){ + console.log('foo got: ', err||data); + next({some: 'data', bar: data.bar}); + next({more: 'data', bar: data.bar}); + next({done: 'data', bar: data.bar}, true); + }); + queue.emit('foo', {bar: 'none'}, function(err, data, next){console.log('test: ', data); next();}); + queue.emit('foo', {bar: 'some'}, function(err, data, next){console.log('partial: ', data); next();}, function(err, data, next){console.log('complete: ', data); next();}); +}; + +//* queue.start(function(){ - r.context.eatTest(); }); -*/ +//*/ r.context.queue = queue; - +//process.stdout.write('\u001B[2J\u001B[0;0f'); r.context.help(); \ No newline at end of file diff --git a/lib/MongoConnection.js b/lib/MongoConnection.js index 9cae457..70d04fb 100644 --- a/lib/MongoConnection.js +++ b/lib/MongoConnection.js @@ -1,94 +1,265 @@ -var mongodb = require('mongodb'); -var key; +/* + MongoConnection(, ): + options: + : + : + : + : + : + : + : + : + : + : + callback: -for(key in mongodb){ - if(hasOwnProperty.call(mongodb, key)){ - global[key] = exports[key] = mongodb[key]; - } -} + MongoConnection.open() + MongoConnection.close() + MongoConnection.collection(collectionName, , ) + MongoConnection.tailedCursor(collectionName, filter, , callback) + MongoConnection.insert(intoCollection, document, , ) + MongoConnection.remove(fromCollection, , , ) + MongoConnection.createCollection(collectionName, , ) + MongoConnection.dropCollection(collectionName, ) + MongoConnection.ensureIndex(collectionName, index, , ) + MongoConnection.findAndModify(inCollection, query, sort, doc, options, callback) +*/ + +var mongodb = require("mongodb"); -exports.ObjectId = exports.ObjectID; +var Connection = mongodb.Connection; +var Server = mongodb.Server; +var Db = mongodb.Db; + +var errors = { + E_CONNCLOSED: 'No active server connection!', + E_NODB: 'No database name provided!', + E_INVALIDFILTER: 'Invalid or no filter provided!', + E_NOCALLBACK: 'No callback provided, callback is required!', + E_INVALIDINDEX: 'Invalid or no index provided!', + E_INVALIDCURSORCOLLECTION: 'Supplied collection is not capped, tailed cursors only work on capped collections!' + }; +var defaults = { + CollectionOptions: {safe: false}, + ServerHost: 'localhost', + ServerOptions: {auto_reconnect: false} + } var MongoConnection = exports.MongoConnection = function(options, callback){ - options = options||{}; var self = this; - if(options.servers instanceof Array){ - var servers = [], host, port, serverOptions, l = options.servers.length; + options = options || {}; + if(typeof(options)=='string'){ + options = {database: options}; + } + if(options.servers){ + var l = options.servers.length, server, serverConfig; + self.servers = []; for(var i = 0; i, ): + options: + : + : + : + : + : + : + : + : + : + : + : + : + : -var E_NOTLISTENING = 'Queue interface not started!'; -var E_INVALIDHANDLER = 'Must provide a valid function as handler!'; + MongoMQ.start(callback): + MongoMQ.stop(callback): + MongoMQ.emit(msgType, data, partialCallback, completeCallback): + MongoMQ.once(msgType, options, handler): + MongoMQ.on(msgType, options, handler): + MongoMQ.onAny(handler): + MongoMQ.indexOfListener(event, handler): + MongoMQ.getListenerFor(event, handler): + MongoMQ.removeListener(event, handler): + MongoMQ.removeListeners(event, handler): + MongoMQ.removeAllListeners(): +*/ -var startListener = function(passive, collection, filter, handler, endHandler){ - if(typeof(handler)!=='function') throw E_INVALIDHANDLER; - var cursor = collection.find(filter, {tailable: true}), - _cursor = { - stop: function(){ - cursor.close(function(callback){ - if(endHandler) endHandler(_cursor, callback); - }); - }, - start: function(){}, - raw: function(){ - return cursor; - } - }; - _cursor.__defineGetter__('handler', function(){ - return handler; - }); - if(filter.event){ - _cursor.__defineGetter__('event', function(){ - return filter.event; - }); - }else{ - _cursor.__defineGetter__('event', function(){ - return '*'; - }); - } - _cursor.__defineGetter__('active', function(){ - return true; - }); - var next, - callHandler = function(err, msg){ - handler(err, msg?msg.data:false, next); - }; - if(passive){ - next = function(closeCursor){ - closeCursor = closeCursor || (!collection.db.openCalled); - if(!closeCursor){ - cursor.nextObject(callHandler); - }else{ - cursor.close(function(){ - if(endHandler) endHandler(_cursor); - }); - } +var MC = require('./MongoConnection').MongoConnection; +var UUID = require('node-uuid'); +var QueueListener = require('./QueueListener').QueueListener; + +var defaults = { + collectionName: 'queue', + db: 'mongomq', + collectionSize: 100000000, + onOptions: { + passive: false + } }; - }else{ - next = function(closeCursor){ - closeCursor = closeCursor || (!collection.db.openCalled); - if(!closeCursor){ - cursor.nextObject(function(err, msg){ - if(msg){ - collection.findAndModify((msg&&msg.length>0)?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {}, - function(err, data){ - if(!data) next(); - else callHandler(err, data); - }); - }else next(); - }); - }else{ - cursor.close(function(){ - if(endHandler) endHandler(_cursor); - }); - } +var errors = { + E_NOTLISTENING: 'Queue interface not started!', + E_INVALIDHANDLER: 'Must provide a valid function as handler!', + E_INVALIDEVENTTYPE: 'Must provide an event type!' }; - } - next(); - return _cursor; -}; -var createCollection = function(db, collectionName, collectionSize, callback){ - db.dropCollection(collectionName, function(){ - db.createCollection(collectionName, { - capped : true, - autoIndexId : true, - size : collectionSize - }, function(err, collection){ - if(collection) collection.insert({ignore: 'This works around the bug in mongo-native where capped collections must have at least 1 record before you can setup a tailed cursor.'}); - callback(err, collection); - }); - }); -}; - -var openQueue = function(db, collectionName, collectionSize, callback){ - var checkDoCreate = function(createColl, collection){ - if(createColl) createCollection(db, collectionName, collectionSize, callback); - else callback(null, collection); - }; - db.collection(collectionName, {safe: true}, function(err, collection){ - if(err&&collection) throw err; - if(collection){ - collection.isCapped(function(err, isCapped){ - checkDoCreate(!isCapped, collection); - }); - }else{ - createCollection(db, collectionName, collectionSize, callback); +var validateListenerArguments = function(msgType, options, handler){ + if(typeof(msgType)!=='string'){ + throw errors.E_INVALIDEVENTTYPE; + } + if(typeof(options)==='function'){ + handler = options; + options = {}; + var key; + for(key in defaults.onOptions){ + options[key] = defaults.onOptions[key]; } - }); + }else if(typeof(options)==='boolean'){ + options = {passive: options}; + } + if(options.hereOnOut){ + options.after = new Date(); + } + var selector = {}; + if(msgType!=='*'){ + selector.event = msgType; + }else{ + options.after = options.after||new Date(); + options.passive = true; + } + if(options.after){ + selector.emitted = {$gte: options.after}; + } + if(options.partialOnly){ + selector.partial = true; + } + if(options.completeOnly){ + selector.partial = false; + } + if(options.passive){ + selector.emitted = {$gte: options.after}; + }else if(msgType!=='*'){ + selector.handled = false; + } + if(typeof(handler)!=='function'){ + throw E_INVALIDHANDLER; + } + return {options: options, handler: handler, selector: selector}; }; var MongoMQ = exports.MongoMQ = function(options, callback){ - var self = this, listeners = []; - options = options || {}; - self.mqCollectionName = options.mqCollectionName||'queue'; - self.mqDB = options.mqDB||options.db||'MongoMQ'; + var self = this;//, listening = false; + var listeners = [], mongoConnection = options.mongoConnection; + if(typeof(options)==='function'){ + callback = options; + options = {}; + }else{ + options = options || {}; + } + options.database = options.database||options.db||options.mqDB||defaults.db; + options.collectionName = options.collectionName||options.mqCollectionName||defaults.collectionName; + options.collectionSize = options.collectionSize||defaults.collectionSize; + self.__defineGetter__('options', function(){ + return options; + }); self.__defineGetter__('listeners', function(){ return listeners; }); - if(!(options.host||options.servers)) options.host = 'localhost'; - options.db = self.mqDB; - self.collectionSize = options.collectionSize||100000000; - self.listening = false; - self.options = options; - if(options.autoStart||(typeof(options.autoStart)=='undefined')) self.start(callback); - else if(typeof(callback)=='function') callback(null, self); + self.__defineGetter__('listening', function(){ + return mongoConnection.active;//&&listening; + }); + self.__defineGetter__('mongoConnection', function(){ + return mongoConnection; + }); + if(!mongoConnection){ + mongoConnection = new MC(options, callback); + } }; MongoMQ.prototype.start = function(callback){ var self = this; - var defaultServerOptions = {auto_reconnect: true}; - - if(self.listening){ - if(typeof(callback)=='function') callback(null, self); - }else{ - var open = function(p_db, mqDB, collectionSize){ - openQueue(p_db, mqDB, collectionSize, function(err, collection){ + var startListeners = function(err, collection){ self.collection = collection; - self.listening = true; - if(self.listeners.length){ - var l = self.listeners.length; - for(i = 0; i-1; i--){ - if(self.listeners[i].active){ - self.listeners[i].stop(); - } - } - if(typeof(callback)=='function') callback(null, self); - }); + var l = self.listeners.length; + for(var i = 0; i0){ + process.nextTick(function(){ + completeTest(call_err, data, next); + }); + }else{ + partialListener.stop(); + completeCallback(call_err, data, next); + } + }); + }; + var doComplete = function(err, data, next){ + completeTest(err, data, next); + }; + if(typeof(completeCallback)!=='function'){ + completeCallback = partialCallback; + results = [], errors = []; + partialCallback = function(err, data, next){ + if(err) errors.push(err); + results.push(data); + next(); + }; + doComplete = function(err, data, next){ + if(err) errors.push(err); + results.push(data); + completeTest(errors.length>0?errors:null, results, next); + }; + } + var tmpOptions = validateListenerArguments(conversationId, {partialOnly: true, hereOnOut: true}, partialCallback); + var partialListener = new QueueListener({ + event: conversationId, + autoStart: true, + mongoConnection: self.mongoConnection, + collectionName: self.options.collectionName, + handler: partialCallback, + selector: tmpOptions.selector, + passive: tmpOptions.options.passive + }); + tmpOptions = validateListenerArguments(conversationId, {completeOnly: true, hereOnOut: true}, completeCallback); + var completeListener = new QueueListener({ + event: conversationId, + autoStart: true, + mongoConnection: self.mongoConnection, + collectionName: self.options.collectionName, + handler: doComplete, + selector: tmpOptions.selector, + passive: tmpOptions.options.passive + }); + })(partialCallback, completeCallback); + } + self.collection.insert(msgPkt, function(){}); } - placeholder.__defineGetter__('options', function(){ - return options; - }); - placeholder.__defineGetter__('handler', function(){ - return handler; - }); - placeholder.__defineGetter__('event', function(){ - return msgType; - }); - placeholder.__defineGetter__('active', function(){ - return false; - }); - return placeholder; }; -MongoMQ.prototype.on = function(msgType, options, handler){ - var self = this; - if(self.listening){ - var filter = {event: msgType, handled: false}; - if(typeof(options)=='function'){ - handler = options; - options = {passive: false}; - } - if(typeof(options)=='boolean') options = {passive: options}; - if(options.hereOnOut) filter.emitted = {$gte: new Date()}; - var listener = startListener(options.passive, self.collection, filter, handler, function(listener, callback){ - var idx; - if((idx = self.indexOfListener(msgType, handler))>-1){ - self.listeners[idx] = getEventPlaceholder(self, msgType, handler); - } - if(typeof(callback)=='function') callback(idx); +MongoMQ.prototype.once = function(msgType, options, handler){ + var self = this, tmpOptions = validateListenerArguments(msgType, options, handler); + var idx = self.indexOfListener(msgType, handler), listener; + listener = new QueueListener({ + event: msgType, + autoStart: true, + mongoConnection: self.mongoConnection, + collectionName: self.options.collectionName, + handler: function(err, data, next){ + tmpOptions.handler(err, data, function(){ + next(true); + }); + }, + selector: tmpOptions.selector, + passive: tmpOptions.options.passive }); + return listener; +}; + +MongoMQ.prototype.on = function(msgType, options, handler){ + var self = this, tmpOptions = validateListenerArguments(msgType, options, handler); + if(typeof(options)=='function'){ + handler = options; + options = {}; + } + var idx = self.indexOfListener(msgType, handler), listener; + if(idx===-1){ + listener = new QueueListener({ + event: msgType, + autoStart: true, + mongoConnection: self.mongoConnection, + collectionName: self.options.collectionName, + handler: tmpOptions.handler, + selector: tmpOptions.selector, + passive: tmpOptions.options.passive + }, function(err, listener){ + self.listeners.push(listener); + }); }else{ - var listener = getEventPlaceholder(self, msgType, options, handler); + listener = self.listeners[idx]; + listener.start(); } - var idx; - if((idx = self.indexOfListener(msgType, handler)) == -1) self.listeners.push(listener); - else self.listeners[idx] = listener; + return listener; }; MongoMQ.prototype.onAny = function(handler){ - var self = this; - if(self.listening){ - var filter = {emitted: {$gte: new Date()}}; - var listener = startListener(true, self.collection, filter, handler, function(listener){ - var idx; - if((idx = self.indexOfListener('*', handler))>-1){ - self.listeners[idx] = getEventPlaceholder(self, '*', handler); - } - if(typeof(callback)=='function') callback(idx); - }); - }else{ - var listener = getEventPlaceholder(self, '*', {}, handler); - } - var idx; - if((idx = self.indexOfListener('*', handler)) == -1) self.listeners.push(listener); - else self.listeners[idx] = listener; + return this.on('*', handler); }; MongoMQ.prototype.indexOfListener = function(event, handler){ var self = this, l = self.listeners.length; - if(typeof(event)=='function'){ + if(typeof(event)==='function'){ handler = event; event = false; } - if(!(event||handler)) return false; + if(!(event||handler)){ + return false; + } var found = false, listener; for(var i = 0; i-1){ return self.listeners[idx]; } return false; @@ -311,21 +351,27 @@ MongoMQ.prototype.getListenerFor = function(event, handler){ MongoMQ.prototype.removeListener = function(event, handler){ var self = this, idx = self.indexOfListener(event, handler); - if((idx!==false)&&idx>-1){ - if(self.listeners[idx].active){ - self.listeners[idx].stop(function(idx){ - if(idx>-1) self.listeners.splice(idx, 1); - }); - }else{ + var listenerStopped = function(err, listener){ + var idx = self.listeners.indexOf(listener); + if((idx!==false)&&(idx>-1)){ self.listeners.splice(idx, 1); } + }; + if((idx!==false)&&(idx>-1)){ + self.listeners[i].stop(listenerStopped); return true; } return false; }; -MongoMQ.prototype.removeListeners = function(event){ +MongoMQ.prototype.removeListeners = function(event, handler){ var self = this, l = self.listeners.length, numRemoved = 0; + var listenerStopped = function(err, listener){ + var idx = self.listeners.indexOf(listener); + if((idx!==false)&&(idx>-1)){ + self.listeners.splice(idx, 1); + } + }; if(typeof(event)=='function'){ handler = event; event = false; @@ -338,9 +384,7 @@ MongoMQ.prototype.removeListeners = function(event){ if(event) found = (event==listener.event)||(listener.event=='*'); if(handler) found = found && listener.handler == handler; if(found){ - self.listeners[i].stop(function(idx){ - if(idx>-1) self.listeners.splice(idx, 1); - }); + listener.stop(listenerStopped); numRemoved++; } } @@ -348,12 +392,19 @@ MongoMQ.prototype.removeListeners = function(event){ }; MongoMQ.prototype.removeAllListeners = function(){ - var self = this, l = self.listeners.length; - for(var i = l-1; i>-1; i--){ - self.listeners[i].stop(function(idx){ - if(idx>-1) self.listeners.splice(idx, 1); - }); + try{ + var self = this, l = self.listeners.length; + var listenerStopped = function(err, listener){ + var idx = self.listeners.indexOf(listener); + if((idx!==false)&&(idx>-1)){ + self.listeners.splice(idx, 1); + } + }; + for(var i = l-1; i>-1; i--){ + self.listeners[i].stop(listenerStopped); + } + return true; + }catch(e){ + return false; } - return true; }; - diff --git a/lib/MongoMQ_bu.js b/lib/MongoMQ_bu.js deleted file mode 100644 index 65baf79..0000000 --- a/lib/MongoMQ_bu.js +++ /dev/null @@ -1,181 +0,0 @@ -var MC = require("./MongoConnection").MongoConnection; - -var MongoMQ = exports.MongoMQ = function(options){ - var self = this; - options = options || {}; - self.mqCollectionName = options.mqCollectionName||'queue'; - self.mqDB = options.mqDB||options.db||'MongoMQ'; - options.db = self.mqDB; - self.collectionSize = options.collectionSize||100000000; - self.handlers = []; - self.listening = false; - self.options = options; - if(options.autoStart||(typeof(options.autoStart)=='undefined')) self.start(); -}; - -MongoMQ.prototype.start = function(){ - var self = this; - if(self.listening) return; - var server = self.server = new MC(self.options, function(err, conn){ - if(err){ - console.log(err); - }else{ - self.db = conn._db; - self.db.collection(self.mqCollectionName, {safe: true}, function(err, collection){ - if(err){ - if(collection){ - throw err; - }else{ - self.createCollection(); - } - }else{ - self.startListening(collection); - } - }); - } - }); -}; - -MongoMQ.prototype.close = function(force, callback){ - var self = this; - if(!self.listening) return; - self.db.close(typeof(force)!=='undefined'?force:true, callback||function(){}); - self.collection = false; - self.listening = false; -}; - -MongoMQ.prototype.createCollection = function(){ - var self = this; - self.db.dropCollection(self.mqCollectionName, function(){ - self.db.createCollection(self.mqCollectionName, { - capped: true, - autoIndexId: true, - size: self.collectionSize - }, function(err, collection){ - self.startListening(collection); - }); - }); -}; - -MongoMQ.prototype.startListening = function(collection){ - var self = this; - if(self.listening) return; - collection = collection||self.db.collection(self.mqCollectionName, {safe: true}); - if(!collection) self.createCollection(); - else{ - collection.isCapped(function(err, isCapped){ - if(!isCapped) self.createCollection(); - else{ - self.collection = collection; - self.listening = true; - if(self.handlers.length>0){ - setTimeout(function(){ - var l = self.handlers.length, handler; - for(var i = 0; i0?msg[0]:msg, {emitted: -1}, {$set: {handled: true}}, {}, - function(err, data){ - if(err||(!data)){ - if(self.listening) next(); // someone else picked it up - }else doCallback(err, data); - }); - }); - }; - } - next(); - return next; -}; - -MongoMQ.prototype.nextMessage = function(msgType, passive, fromDT, callback){ - var self = this, filter = {$and: [{msg: msgType}, {$or: [{handled: false}, {handled: {$exists: false}}]}]}; - if(fromDT) filter.$and.push({emitted: {$gte: fromDT}}); - startListener(self, filter, passive||false, callback); -}; - -MongoMQ.prototype.emit = function(msg, pkt){ - var self = this; - if(!self.listening) throw "Queue Listener not started!"; - self.collection = self.collection||self.db.collection(self.mqCollectionName, {safe: true}); - msg = { - msg: msg, - pkt: pkt, - handled: false, - emitted: new Date() - }; - self.collection.insert(msg, function(){}); -}; - -MongoMQ.prototype.addHandler = function(info){ - var self = this, l = self.handlers.length, matches, key, handler; - for(var i = 0; i-1; i--){ - if(self.handlers[i].handler===callback) self.handlers.splice(i, 1); - } -}; - -MongoMQ.prototype.clearListeners = function(){ - var self = this; - if(self.listening) throw "Can't clear while listening"; - self.handlers = []; -}; \ No newline at end of file diff --git a/lib/QueueListener.js b/lib/QueueListener.js new file mode 100644 index 0000000..8dc4fe7 --- /dev/null +++ b/lib/QueueListener.js @@ -0,0 +1,175 @@ +/* + QueueListener(options, callback): + options: + mongoConnection: + collectionName: + handler(err, msg): + selector: + : + : + : + : + : + QueueListener.start(callback): + QueueListener.stop(callback): + QueueListener.next(data, closeCursor): +*/ + +var UUID = require('node-uuid'); + +var errors = { + E_CONNCLOSED: 'No active server connection!', + E_NODB: 'No database name provided!', + E_INVALIDFILTER: 'Invalid or no filter provided!', + E_NOCALLBACK: 'No callback provided, callback is required!', + E_INVALIDINDEX: 'Invalid or no index provided!', + E_INVALIDHANDLER: 'Must provide a valid function as handler!', + E_MISSINGCONSTRUCTOROPTIONS: 'Must provide options to constructor!', + E_INVALIDCONSTRUCTOROPTIONS: 'Constructor options must contain db, collectionName, handler, and selector!', + E_NOTACTIVE: 'Listener is not active!' + }; + +var passiveNext = function(err, msg){ + var self = this; + var next = function(){ + self.next.apply(self, arguments); + }; + var data = (msg&&msg.length>0)?msg[0]:msg; + if(data) data = data.data; + self.handler(err, data, next); +}; + +var activeNext = function(err, msg){ + var self = this; + var record = (msg&&msg.length>0)?msg[0]:msg; + var conversationId = msg.conversationId||false; + var next = function(){ + if(conversationId){ + var args = Array.prototype.slice.apply(arguments), l = args.length, done = false, data; +//console.log('next: ', conversationId, args); +/* + next(data, done); + next(data); + next(done); + next(); +*/ + switch(args.length){ + case 0: + done = true; + break; + case 1: + if(typeof(args[0])==='boolean'){ + done = args[0]; + }else{ + done = false; + data = args[0]; + } + break; + default: + data = args[0]; + done = !!args[1]; + break; + } + + var msgPkt = { + event: conversationId, + partial: !done, + data: data, + handled: false, + emitted: new Date(), + }; + + self.mongoConnection.insert(self.options.collectionName, msgPkt, function(){}); + self.next.call(self, done); + }else{ + self.next.apply(self, arguments); + } + }; + self.mongoConnection.findAndModify(self.options.collectionName, record, {emitted: -1}, {$set: {handled: true}}, {}, + function(err, data){ + if(!data) next(); + else self.handler(err, data.data, next); + }); +}; + +var QueueListener = exports.QueueListener = function(options, callback){ + var self = this, nextHandler; + if(typeof(options)!=='object'){ + throw errors.E_MISSINGCONSTRUCTOROPTIONS; + } + if(!(options.mongoConnection&&options.collectionName&&options.handler&&(typeof(options.selector)==='object'))){ + throw errors.E_INVALIDCONSTRUCTOROPTIONS; + } + self.options = options; + self.__defineGetter__('active', function(){ + return (this.options.mongoConnection.active)&&(!!this.cursor); + }); + self.__defineGetter__('nextHandler', function(){ + return nextHandler; + }); + self.__defineGetter__('event', function(){ + return options.event; + }); + self.__defineGetter__('handler', function(){ + return options.handler; + }); + self.__defineGetter__('mongoConnection', function(){ + return options.mongoConnection; + }); + if(options.passive){ + nextHandler = passiveNext; + }else{ + nextHandler = activeNext; + } + if(options.autoStart&&options.mongoConnection.db&&options.mongoConnection.db.openCalled){ + self.start(callback); + }else if(typeof(callback)==='function'){ + callback(null, self); + } +}; + +QueueListener.prototype.start = function(callback){ + var self = this; + if(self.active){ + if(typeof(callback)=='function'){ + callback(null, self); + } + }else{ + self.mongoConnection.tailedCursor(self.options.collectionName, self.options.selector, {$natural: 1}, function(err, cursor){ + self.cursor = cursor; + self.next(); + if(typeof(callback)=='function'){ + callback(null, self); + } + }); + } +}; + +QueueListener.prototype.stop = function(callback){ + var self = this; + if(self.cursor){ + self.cursor.close(); + if(typeof(self.options.closedHandler)==='function'){ + self.options.closedHandler(null, self); + } + self.cursor = false; + } + if(typeof(callback)==='function') callback(null, self); +}; + +QueueListener.prototype.next = function(closeCursor){ + var self = this; + if(closeCursor||(!self.active)){ + self.stop(); + }else{ + self.cursor.nextObject(function(err, msg){ + if(msg){ + self.nextHandler.apply(self, arguments); + }else{ + process.nextTick(function(){ + self.next(); + }); + } + }); + } +}; diff --git a/package.json b/package.json index c4906a4..9de086a 100644 --- a/package.json +++ b/package.json @@ -4,7 +4,7 @@ "email": "jeremy.darling@gmail.com" }, "name": "mongomq", - "version": "0.2.1", + "version": "0.2.2", "repository": { "type": "git", "url": "git://github.com/jdarling/MongoMQ.git" @@ -19,6 +19,7 @@ "node": ">= v0.6.0" }, "dependencies": { - "mongodb": ">= 1.0.2" + "mongodb": ">= 1.0.2", + "node-uuid": ">= 1.3.3" } } diff --git a/readme.md b/readme.md index c89239d..c6172b3 100644 --- a/readme.md +++ b/readme.md @@ -1,7 +1,7 @@ MongoMQ - Node.js MongoMQ ========================= -Version 0.2.1 presents what will hopefully be the final API, but is not feature complete yet. Hopefully version 0.2.2 will be feature complete. +Version 0.2.2 presents what will hopefully be the final API, but is not feature complete yet. Hopefully version 0.2.2 will be feature complete. Installation ============ @@ -69,13 +69,11 @@ MongoMQ.emit(msgType, data, [partialCallback], [completeCallback]) Places the a message of msgTye on the queue with the provided data for handlers to consume. -NOTE: partialCallback and completeCallback are not yet implemented, they are here as a prototype of how this functionality may be implemented. - Params: * msgType - The message type to emit. * data - a JSON serializeable collection of data to be sent. -* partialCallback - NOT IMPLEMENTED - Will be called for large or long running result sets to return partial data back. Optional and if not present then completeCallback will be called with buffered results once all operations have completed. -* completeCallback - NOT IMPLEMENTED - Will be called once all remote processing has been completed. If partialCallback is not provided and completeCallback is provided a temporary buffer will be setup and the final result set will be sent to completeCallback. +* partialCallback - Will be called for large or long running result sets to return partial data back. Optional and if not present then completeCallback will be called with buffered results once all operations have completed. +* completeCallback - Will be called once all remote processing has been completed. If partialCallback is not provided and completeCallback is provided a temporary buffer will be setup and the final result set will be sent to completeCallback. MongoMQ.on(msgType, [passive||options], handler) --------------------------------------- @@ -86,7 +84,8 @@ Params: * msgType - The message type to listen for * passive - If true will not mark the message as handled when a message is consumed from the queue * options - additional options that can be passed -* callback(err, messageContents, next) - Use next() to look for another message in the queue, don't call next() if you only want a one time listener +* handler(err, messageContents, next) - Use next() to look for another message in the queue, don't call next() if you only want a one time listener + * If you want to send back data or partial data use next(data, complete) where complete should be true if you have sent all of your responses, see test.js r.context.tmp for a simple example. options - * passive - If true will not mark the message as handled when a message is consumed from the queue @@ -247,6 +246,10 @@ Planned Improvements Update History ============== +v0.2.2 + * Completed code to allow for callbacks and partial callbacks to be issued back to emit statements + * Complteed refactoring of code to properly seperate functionality into objects + v0.2.1 * Majorly refactored code * Added autoIndexId: true to queue collection creation