Permalink
Browse files

Upgrades and fixes

v0.2.9
* Change SafeDBDriver default value from false to true, this fixes the
issue with multiple listeners picking up the same message since Mongo
doesn't perform record locking on updates if this isn't true.
* Fix autoStart
* Resolves #9 and #10

v0.2.8
* Upgraded code for new MongoDB Native Drivers (thanks mfrobben for
starting points)
* Readme cleanup (thanks ttezel for pointing this out and fixing it)
* Resolves #7 and #6
  • Loading branch information...
1 parent 2124bd5 commit f6617dad5adff1b755935b78c145375ba59f884d @jdarling committed Dec 12, 2012
Showing with 105 additions and 33 deletions.
  1. +22 −8 lib/MongoConnection.js
  2. +49 −16 lib/MongoMQ.js
  3. +10 −3 package.json
  4. +24 −6 readme.md
View
@@ -48,7 +48,9 @@ var defaults = {
CollectionOptions: {safe: false},
ServerHost: 'localhost',
ServerOptions: {auto_reconnect: false},
- CappedCollectionSize: 104857600
+ CappedCollectionSize: 104857600,
+ NativeParser : false,
+ SafeDBDriver : true // If this isn't true, then messages can get picked up by two different listeners
}
var MongoConnection = exports.MongoConnection = function(options, callback){
@@ -118,13 +120,17 @@ MongoConnection.prototype.open = function(callback){
}else{
serverConnection = new Server(self.server.host, self.server.port, options.serverOptions||defaults.ServerOptions);
}
-
- var db = new Db(self.database, serverConnection, {native_parser:(options.nativeParser==null?false:options.nativeParser)});
+
+ var dbOptions = {};
+ dbOptions.safe = options.safeDriver || defaults.SafeDBDriver;
+ dbOptions.native_parser = options.nativeParser || defaults.NativeParser;
+
+ var db = new Db(self.database, serverConnection, dbOptions);
db.open(function(err, _db){
self.db = db;
if(options.username&&options.password){
if(options.authenticateAgainstDb){
- adminDb.authenticate(options.username, options.password, function(err, result){
+ db.authenticate(options.username, options.password, function(err, result){
if(typeof(callback)=='function'){
callback(null, self);
}
@@ -229,7 +235,6 @@ MongoConnection.prototype.find = function(){
}
}else{
if(callback){
- //callback(null, collection.find.apply(collection, args));
collection.find.apply(collection, args);
}
}
@@ -278,17 +283,26 @@ MongoConnection.prototype.ensureCappedCollection = function(collectionName, size
self.createCollection(collectionName, collOptions, callback);
}else{
collection.isCapped(function(err, capped){
+ if(err){
+ if(typeof(callback)==='function'){
+ return callback(err, null);
+ }
+ throw err;
+ }
if(!!capped){
if(typeof(callback)==='function'){
callback(null, collection);
}
}else{
- collection.insert({workaround: 'This works around a bug with capping empty collections.'}, function(){
+ collection.insert({workaround: 'This works around a bug with capping empty collections.'}, {safe:true}, function(){
self.db.command({"convertToCapped": collectionName, size: collOptions.size}, function(err, result){
if(typeof(callback)==='function'){
if(err){
- callback(err, result);
- }else{
+ callback(err, null);
+ }else if (result.ok===0){
+ callback(new Error(result.errmsg), null);
+ }
+ else{
self.collection(collectionName, callback);
}
}
View
@@ -101,6 +101,32 @@ var MongoMQ = exports.MongoMQ = function(options, callback){
collectionName: 'queue'
};
var listeners = [], mongoConnection = options.mongoConnection;
+ var setupProperties = function(){
+ self.__defineGetter__('options', function(){
+ return options;
+ });
+ self.__defineGetter__('listeners', function(){
+ return listeners;
+ });
+ self.__defineGetter__('listening', function(){
+ return mongoConnection.active;
+ });
+ self.__defineGetter__('mongoConnection', function(){
+ return mongoConnection;
+ });
+ };
+ var doCallback = function(err){
+ if(typeof(callback)==='function'){
+ callback(err, self);
+ }
+ };
+ var checkStartDoCallback = function(){
+ if(options.autoStart){
+ self.start(doCallback);
+ }else{
+ doCallback();
+ }
+ };
if(typeof(options)==='function'){
callback = options;
options = {};
@@ -110,20 +136,15 @@ var MongoMQ = exports.MongoMQ = function(options, callback){
options.database = options.database||options.db||options.mqDB||defaults.db;
options.collectionName = options.collectionName||options.mqCollectionName||defaults.collectionName;
options.collectionSize = options.collectionSize||defaults.collectionSize;
- self.__defineGetter__('options', function(){
- return options;
- });
- self.__defineGetter__('listeners', function(){
- return listeners;
- });
- self.__defineGetter__('listening', function(){
- return mongoConnection.active;
- });
- self.__defineGetter__('mongoConnection', function(){
- return mongoConnection;
- });
if(!mongoConnection){
- mongoConnection = new MC(options, callback);
+ new MC(options, function(err, conn){
+ mongoConnection = conn;
+ setupProperties();
+ checkStartDoCallback();
+ });
+ }else{
+ setupProperties();
+ checkStartDoCallback();
}
};
@@ -140,7 +161,12 @@ MongoMQ.prototype.start = function(callback){
}
};
var ensureIndexes = function(err, collection){
- collection.ensureIndex({event: 1, handled: -1, emitted: 1, partial: -1}, {safe: true}, function(err, indexName){
+ if (err && typeof(callback) === 'function')
+ return callback(err);
+ else if (err)
+ throw err;
+ else
+ collection.ensureIndex({_id: 1, event: 1, handled: -1, emitted: 1, partial: -1}, {safe: true}, function(err, indexName){
startListeners(err, collection);
});
};
@@ -195,12 +221,19 @@ MongoMQ.prototype.emit = function(msgType, data, partialCallback, completeCallba
partial: false,
host: hostName
};
+
+ // closure expects partial callback to always exist. If caller passes completeCallback
+ // and not partialCallback, fix up the parameters for them.
+ if (!partialCallback && typeof(completeCallback) === 'function'){
+ partialCallback = completeCallback;
+ completeCallback = null;
+ }
+
if(typeof(partialCallback)==='function'){
(function(partialCallback, completeCallback){
var results = false;
var messageId = msgPkt.messageId = hostName+'::'+msgType+'::'+UUID.v4();
- var completeFilter = {event: messageId, handled: true, partial: true};//, emitted: {$gte: msgPkt.emitted}};
-//console.log('completeFilter:', completeFilter);
+ var completeFilter = {event: messageId, handled: true, partial: true};
var completeTest = function(call_err, next){
var numExpected = next&&next.msg?next.msg.numParts||0:0;
self.collection.find(completeFilter).count(function(err, count){
View
@@ -4,12 +4,19 @@
"email": "jeremy.darling@gmail.com"
},
"name": "mongomq",
- "version": "0.2.7",
+ "version": "0.2.9",
"repository": {
"type": "git",
"url": "git://github.com/jdarling/MongoMQ.git"
},
- "contributors": [],
+ "contributors": [
+ {
+ "name": "mfrobben"
+ },
+ {
+ "name": "joscha"
+ }
+ ],
"main": "./lib/MongoMQ",
"bin": {
"MongoMQ": "./bin/MongoMQ.js"
@@ -18,7 +25,7 @@
"node": ">= v0.8.2"
},
"dependencies": {
- "mongodb": ">= 1.0.2",
+ "mongodb": ">= 1.2.2",
"node-uuid": ">= 1.3.3"
},
"_id": "mongomq@0.2.3",
View
@@ -1,17 +1,25 @@
MongoMQ - Node.js MongoMQ
=========================
-Version 0.2.7 fixed a cursor leak when using passive callbacks.
-Version 0.2.6 bug fix related to relplica set configuration loading from config.json files.
-Version 0.2.5 general code cleanup and some optimizations.
+>Version 0.2.9 fixes settings so that two consumers can't pickup a message at the same time.
+>
+>Version 0.2.8 upgraded code for new MongoDB Native Drivers.
+>
+>Version 0.2.7 fixed a cursor leak when using passive callbacks.
+>
+>Version 0.2.6 bug fix related to relplica set configuration loading from config.json files.
+>
+>Version 0.2.5 general code cleanup and some optimizations.
+>
Installation
============
From GitHub
-----------
* Download from GitHub and extract.
- * npm install mongodb
+ * change to extracted directory
+ * execute "npm install"
Using NPM
---------
@@ -42,10 +50,10 @@ options
* mqDB - Database to store queue in, defaults to 'MongoMQ'
* username - Optional value of the username to validate against Mongo with
* password - Optional value of the password to validate against Mongo with
- * host - If not running against a ReplicaSet this is the server to connect to
+ * host - If not running against a ReplicaSet this is the server to connect to
* port - If not running against a ReplicaSet this is the server port to connect with
* servers[] - If connecting to a ReplicaSet then this is a collection of {host: 'hostname', port: 1234} objects defining the root entry for the ReplicaSet
- * collectionSize - The size in bytes to cap the collection at, defaults to 100000000
+ * collectionSize - The size in bytes to cap the collection at, defaults to 104857600
* serverOptions - An optional options object that will be passed to Mongo-Native when the Mongo connection is created
* nativeParser - Boolean (defaults false) to enable usage of Mongo-Native's native parser. Only use this if you install mongodb with native support
* autoStart - Boolean (defaults true) if the MongoMQ instance should start itself up when created, if set to false you need to call MongoMQ.start()
@@ -260,6 +268,16 @@ How Events are stored
Update History
==============
+v0.2.9
+ * Change SafeDBDriver default value from false to true, this fixes the issue with multiple listeners picking up the same message since Mongo doesn't perform record locking on updates if this isn't true.
+ * Fix autoStart
+ * Resolves #9 and #10
+
+v0.2.8
+ * Upgraded code for new MongoDB Native Drivers (thanks mfrobben for starting points)
+ * Readme cleanup (thanks ttezel for pointing this out and fixing it)
+ * Resolves #7 and #6
+
v0.2.7
* Fixed a cursor leak when using passive callbacks

0 comments on commit f6617da

Please sign in to comment.