Permalink
Browse files

Workaround for MongoDB going away and tailed cursors not getting upda…

…ted.

In QueueListener see +var monitor = function(what){
+  what._monitor = setTimeout(function(){
+    if(!what.active){
+      what.start();
+    }else{
+      return monitor(what);
+    }
+  }, 100);
+};
  • Loading branch information...
1 parent d6942ae commit 348a29500d867ce0d275ddbbb95342a314d16a14 @jdarling committed Jan 7, 2013
Showing with 133 additions and 63 deletions.
  1. +111 −62 lib/MongoConnection.js
  2. +19 −0 lib/QueueListener.js
  3. +1 −1 package.json
  4. +2 −0 readme.md
View
@@ -95,10 +95,29 @@ var MongoConnection = exports.MongoConnection = function(options, callback){
}
};
+MongoConnection.prototype.checkOpen = function(callback){
+ var self = this;
+ if(self._open){
+ if(!self.active){
+ self.open(function(err, conn){
+ callback(err, !!conn);
+ });
+ }else{
+ callback(null, true);
+ }
+ }else{
+ throw errors.E_CONNCLOSED;
+ }
+};
+
MongoConnection.prototype.open = function(callback){
var self = this, options = self.options;
var serverConnection;
+ var setupOpen = function(){
+ self._open = true;
+ };
+
if(self.active){
if(typeof(callback)=='function'){
callback(null, self);
@@ -132,20 +151,23 @@ MongoConnection.prototype.open = function(callback){
if(options.authenticateAgainstDb){
db.authenticate(options.username, options.password, function(err, result){
if(typeof(callback)=='function'){
+ setupOpen();
callback(null, self);
}
});
}else{
db.admin(function(err, adminDb){
adminDb.authenticate(options.username, options.password, function(err, result){
if(typeof(callback)=='function'){
+ setupOpen();
callback(null, self);
}
});
});
}
}else{
if(typeof(callback)=='function'){
+ setupOpen();
callback(null, self);
}
}
@@ -154,6 +176,9 @@ MongoConnection.prototype.open = function(callback){
MongoConnection.prototype.close = function(callback){
var self = this;
+ if(self._open){
+ self._open = false;
+ }
if(self.active){
self.db.close(true, function(err, result){
self.db = false;
@@ -166,18 +191,22 @@ MongoConnection.prototype.close = function(callback){
MongoConnection.prototype.collection = function(collectionName, options, callback){
var self = this;
+ /*
if(!self.active){
throw errors.E_CONNCLOSED;
}
- if(typeof(options)==='function'){
- callback = options;
- options = defaults.CollectionOptions;
- }
- if(typeof(callback)==='function'){
- self.db.collection(collectionName, options||defaults.CollectionOptions, callback);
- }else{
- return self.db.collection(collectionName, options||defaults.CollectionOptions);
- }
+ */
+ self.checkOpen(function(){
+ if(typeof(options)==='function'){
+ callback = options;
+ options = defaults.CollectionOptions;
+ }
+ if(typeof(callback)==='function'){
+ self.db.collection(collectionName, options||defaults.CollectionOptions, callback);
+ }else{
+ return self.db.collection(collectionName, options||defaults.CollectionOptions);
+ }
+ });
};
MongoConnection.prototype.tailedCursor = function(collectionName, filter, sort, callback){
@@ -338,93 +367,113 @@ MongoConnection.prototype.remove = function(fromCollection, selector, options, c
MongoConnection.prototype.createCollection = function(collectionName, options, callback){
var self = this;
+ /*
if(!self.active){
throw errors.E_CONNCLOSED;
}
- if(typeof(options)=='function'){
- callback = options;
- options = {};
- }
- callback = callback || function(){};
- self.db.createCollection(collectionName, options, callback);
+ */
+ self.checkOpen(function(){
+ if(typeof(options)=='function'){
+ callback = options;
+ options = {};
+ }
+ callback = callback || function(){};
+ self.db.createCollection(collectionName, options, callback);
+ });
};
MongoConnection.prototype.dropCollection = function(collectionName, callback){
var self = this;
+ /*
if(!self.active){
throw errors.E_CONNCLOSED;
}
- callback = callback || function(){};
- self.db.dropCollection(collectionName, callback);
+ */
+ self.checkOpen(function(){
+ callback = callback || function(){};
+ self.db.dropCollection(collectionName, callback);
+ });
};
MongoConnection.prototype.ensureIndex = function(collectionName, index, options, callback){
var self = this;
+ /*
if(!self.active){
throw errors.E_CONNCLOSED;
}
- if(typeof(options)=='function'){
- callback = options;
- options = {};
- }
- callback = callback || function(){};
- if(typeof(index)!=='object'){
- throw errors.E_INVALIDINDEX;
- }
- self.db.ensureIndex(collectionName, index, options, callback);
+ */
+ self.checkOpen(function(){
+ if(typeof(options)=='function'){
+ callback = options;
+ options = {};
+ }
+ callback = callback || function(){};
+ if(typeof(index)!=='object'){
+ throw errors.E_INVALIDINDEX;
+ }
+ self.db.ensureIndex(collectionName, index, options, callback);
+ });
};
MongoConnection.prototype.readGridFS = function(fileName, callback){
var self = this;
+ /*
if(!self.active){
throw errors.E_CONNCLOSED;
}
- GridStore.exist(self.db, fileName, function(err, exists){
- if(exists===false){
- callback(err, false);
- }else{
- var gridStore = new GridStore(self.db, fileName, 'r');
- gridStore.open(function(err, gridStore) {
- gridStore.read(function(){
- callback.apply(self, arguments);
- gridStore.close(function(){});
+ */
+ self.checkOpen(function(){
+ GridStore.exist(self.db, fileName, function(err, exists){
+ if(exists===false){
+ callback(err, false);
+ }else{
+ var gridStore = new GridStore(self.db, fileName, 'r');
+ gridStore.open(function(err, gridStore) {
+ gridStore.read(function(){
+ callback.apply(self, arguments);
+ gridStore.close(function(){});
+ });
});
- });
- }
+ }
+ });
});
};
MongoConnection.prototype.streamGridFS = function(fileName, callback){
var self = this;
+ /*
if(!self.active){
throw errors.E_CONNCLOSED;
}
- GridStore.exist(self.db, fileName, function(err, exists){
- if(!exists){
- callback(err, false);
- }else{
- var gridStore = new GridStore(self.db, fileName, 'r');
- var doput = function(done){
- return function(data){
- if(data){
- callback(null, data.toString());
- }
- if(done){
- callback(null, null);
- }
+ */
+ self.checkOpen(function(){
+ GridStore.exist(self.db, fileName, function(err, exists){
+ if(!exists){
+ callback(err, false);
+ }else{
+ var gridStore = new GridStore(self.db, fileName, 'r');
+ var doput = function(done){
+ return function(data){
+ if(data){
+ callback(null, data.toString());
+ }
+ if(done){
+ callback(null, null);
+ }
+ };
};
- };
- var doerror = function(done){
- return function(err){
- callback(err, null);
+ var doerror = function(done){
+ return function(err){
+ callback(err, null);
+ };
};
- };
- gridStore.open(function(err, gridStore) {
- var stream = gridStore.stream(true);
- stream.on('data', doput(false));
- stream.on('error', doerror(true));
- stream.on('end', doput(true));
- });
- }
+ gridStore.open(function(err, gridStore) {
+ var stream = gridStore.stream(true);
+ stream.on('data', doput(false));
+ stream.on('error', doerror(true));
+ stream.on('end', doput(true));
+ });
+ }
+ });
});
};
View
@@ -153,6 +153,20 @@ var QueueListener = exports.QueueListener = function(options, callback){
}
};
+// NOTE: This is a workaround for when the servers go away,
+// shouldn't need it but do since auto_reconnect doesn't
+// reconnect tailedCursor's
+
+var monitor = function(what){
+ what._monitor = setTimeout(function(){
+ if(!what.active){
+ what.start();
+ }else{
+ return monitor(what);
+ }
+ }, 100);
+};
+
QueueListener.prototype.start = function(callback){
var self = this;
if(self.active){
@@ -163,6 +177,7 @@ QueueListener.prototype.start = function(callback){
self.mongoConnection.tailedCursor(self.options.collectionName, self.options.selector, {$natural: 1}, function(err, cursor){
self.cursor = cursor;
self.next();
+ monitor(self);
if(typeof(callback)=='function'){
callback(null, self);
}
@@ -173,6 +188,10 @@ QueueListener.prototype.start = function(callback){
QueueListener.prototype.stop = function(callback){
var self = this;
if(self.cursor){
+ if(self._monitor){
+ clearTimeout(self._monitor);
+ self._monitor = false;
+ }
self.cursor.close();
if(typeof(self.options.closedHandler)==='function'){
self.options.closedHandler(null, self);
View
@@ -4,7 +4,7 @@
"email": "jeremy.darling@gmail.com"
},
"name": "mongomq",
- "version": "0.2.9",
+ "version": "0.2.10",
"repository": {
"type": "git",
"url": "git://github.com/jdarling/MongoMQ.git"
View
@@ -1,6 +1,8 @@
MongoMQ - Node.js MongoMQ
=========================
+>Version 0.2.10 introduces a work around in QueneListener for when the Mongo server goes away.
+>
>Version 0.2.9 fixes settings so that two consumers can't pickup a message at the same time.
>
>Version 0.2.8 upgraded code for new MongoDB Native Drivers.

0 comments on commit 348a295

Please sign in to comment.