Permalink
Browse files

Fixed auth reconnect of replicaset members as well as Mongos #929

  • Loading branch information...
1 parent 3003ea4 commit 436242829acfde38fe87f2c3323b3f1c158571c8 @christkv christkv committed Apr 6, 2013
@@ -167,14 +167,14 @@ Mongos.prototype.connect = function(db, options, callback) {
// Create a new server object
var newServer = new Server(downServer.host, downServer.port, options);
// Setup the connection function
- var connectFunction = function(_db, _server, _options) {
+ var connectFunction = function(_db, _server, _options, _callback) {
return function() {
// Attempt to connect
_server.connect(_db, _options, function(err, result) {
numberOfServersLeft = numberOfServersLeft - 1;
if(err) {
- self.downServers.push(_server);
+ return _callback(err, _server);
} else {
// Set the new server settings
_server._callBackStore = self._callBackStore;
@@ -184,13 +184,11 @@ Mongos.prototype.connect = function(db, options, callback) {
_server.on("timeout", errorOrCloseHandler(_server));
_server.on("error", errorOrCloseHandler(_server));
- // Add the server to the list of available servers
- self.servers.push(_server);
-
// Get a read connection
var _connection = _server.checkoutReader();
// Get the start time
var startTime = new Date().getTime();
+
// Execute ping command to mark each server with the expected times
self.db.command({ping:1}
, {failFast:true, connection:_connection}, function(err, result) {
@@ -202,114 +200,107 @@ Mongos.prototype.connect = function(db, options, callback) {
self.servers.sort(function(a, b) {
return a.runtimeStats['pingMs'] > b.runtimeStats['pingMs'];
});
- });
- }
- if(numberOfServersLeft == 0) {
- self._haInProgress = false;
+ // Callback
+ return _callback(null, _server);
+ });
}
});
}
}
// Attempt to connect to the database
- connectFunction(self.db, newServer, options)();
+ connectFunction(self.db, newServer, options, function(err, _server) {
+ // If we have an error
+ if(err) {
+ self.downServers.push(_server);
+ }
+
+ // Connection function
+ var connectionFunction = function(_auth, _connection, _callback) {
+ var pending = _auth.length();
+
+ for(var j = 0; j < pending; j++) {
+ // Get the auth object
+ var _auth = _auth.get(j);
+ // Unpack the parameter
+ var username = _auth.username;
+ var password = _auth.password;
+ var options = {
+ authMechanism: _auth.authMechanism
+ , authSource: _auth.authdb
+ , connection: _connection
+ };
+
+ // Hold any error
+ var _error = null;
+ // Authenticate against the credentials
+ self.db.authenticate(username, password, options, function(err, result) {
+ _error = err != null ? err : _error;
+ // Adjust the pending authentication
+ pending = pending - 1;
+ // Finished up
+ if(pending == 0) _callback(_error ? _error : null, _error ? false : true);
+ });
+ }
+ }
+
+ // Run auths against the connections
+ if(self.auth.length() > 0) {
+ var connections = _server.allRawConnections();
+ var pendingAuthConn = connections.length;
+
+ // No connections we are done
+ if(connections.length == 0) {
+ // Set ha done
+ if(numberOfServersLeft == 0) {
+ self._haInProgress = false;
+ }
+ }
+
+ // Final error object
+ var finalError = null;
+ // Go over all the connections
+ for(var j = 0; j < connections.length; j++) {
+
+ // Execute against all the connections
+ connectionFunction(self.auth, connections[j], function(err, result) {
+ // Pending authentication
+ pendingAuthConn = pendingAuthConn - 1 ;
+
+ // Save error if any
+ finalError = err ? err : finalError;
+
+ // If we are done let's finish up
+ if(pendingAuthConn == 0) {
+ // Set ha done
+ if(numberOfServersLeft == 0) {
+ self._haInProgress = false;
+ }
+
+ if(finalError) {
+ return self.downServers.push(_server);
+ }
+
+ // Push to list of valid server
+ self.servers.push(_server);
+ }
+ });
+ }
+ } else {
+ self.servers.push(_server);
+ // Set ha done
+ if(numberOfServersLeft == 0) {
+ self._haInProgress = false;
+ }
+ }
+ })();
}
} else {
self._haInProgress = false;
}
}
-
- // console.log("========================================= mongos check function")
- // // If we have down servers let's attempt a reconnect
- // if(self.downServers.length > 0) {
- // var numberOfServersLeft = self.downServers.length;
- // // Attempt to reconnect
- // for(var i = 0; i < self.downServers.length; i++) {
- // var downServer = self.downServers.pop();
-
- // // Configuration
- // var options = {
- // slaveOk: true,
- // poolSize: 1,
- // socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
- // returnIsMasterResults: true
- // }
-
- // // Attemp to reconnect
- // downServer.connect(self.db, options, function(_server) {
- // // Return a function to check for the values
- // return function(err, result) {
- // console.log("========================================= mongos check function 0")
- // // Adjust the number of servers left
- // numberOfServersLeft = numberOfServersLeft - 1;
-
- // if(err != null) {
- // console.log("========================================= mongos check function 1")
- // self.downServers.push(_server);
- // } else {
- // console.log("========================================= mongos check function 2")
- // // Add server event handlers
- // _server.on("close", errorOrCloseHandler(_server));
- // _server.on("error", errorOrCloseHandler(_server));
- // _server.on("timeout", errorOrCloseHandler(_server));
- // // Add to list of servers
- // self.servers.push(_server);
- // }
-
- // if(numberOfServersLeft <= 0) {
- // // Perfom another ha
- // self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
- // }
- // }
- // }(downServer));
- // }
- // } else if(self.servers.length > 0) {
- // var numberOfServersLeft = self.servers.length;
- // var _s = new Date().getTime()
-
- // // Else let's perform a ping command
- // for(var i = 0; i < self.servers.length; i++) {
- // var executePing = function(_server) {
- // // Get a read connection
- // var _connection = _server.checkoutReader();
- // // Execute ping command
- // self.db.command({ping:1}, {failFast:true, connection:_connection}, function(err, result) {
- // var pingTime = new Date().getTime() - _s;
- // // If no server set set the first one, otherwise check
- // // the lowest ping time and assign the server if it's got a lower ping time
- // if(self.lowestPingTimeServer == null) {
- // self.lowestPingTimeServer = _server;
- // self.lowestPingTime = pingTime;
- // console.log("=========================================== ping 0")
- // self._currentMongos = _server;
- // } else if(self.lowestPingTime > pingTime
- // && (_server.host != self.lowestPingTimeServer.host || _server.port != self.lowestPingTimeServer.port)) {
- // self.lowestPingTimeServer = _server;
- // self.lowestPingTime = pingTime;
- // console.log("=========================================== ping 1")
- // self._currentMongos = _server;
- // }
-
- // // Number of servers left
- // numberOfServersLeft = numberOfServersLeft - 1;
- // // All active mongos's pinged
- // if(numberOfServersLeft == 0) {
- // // Perfom another ha
- // self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
- // }
- // })
- // }
-
- // // Execute the function
- // executePing(self.servers[i]);
- // }
- // } else {
- // self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
- // }
- // }
-
// Connect all the server instances
for(var i = 0; i < this.servers.length; i++) {
// Get the connection
@@ -447,14 +447,16 @@ ReplSet.prototype._enableHA = function () {
self._haServer = null;
return;
}
+
// If error let's set perform another check
if(err) {
// Force new server selection
self._haServer = null;
return check();
}
+
// Validate the replicaset
- self._validateReplicaset(res, db.auths, function() {
+ self._validateReplicaset(res, self.auth, function() {
check();
});
});
@@ -473,7 +475,7 @@ ReplSet.prototype._enableHA = function () {
/**
* @ignore
*/
-ReplSet.prototype._validateReplicaset = function(result, auths, cb) {
+ReplSet.prototype._validateReplicaset = function(result, auth, cb) {
var self = this;
var res = result.documents[0];
@@ -507,25 +509,25 @@ ReplSet.prototype._validateReplicaset = function(result, auths, cb) {
}
}
- connectTo(hosts, auths, self, cb);
+ connectTo(hosts, auth, self, cb);
}
/**
* Create connections to all `hosts` firing `cb` after
* connections are attempted for all `hosts`.
*
* @param {Array} hosts
- * @param {Array} [auths]
+ * @param {AuthStore} [auth]
* @param {ReplSet} replset
* @param {Function} cb
* @ignore
*/
-function connectTo(hosts, auths, replset, cb) {
+function connectTo(hosts, auth, replset, cb) {
var pending = hosts.length;
if (!pending) return cb();
for(var i = 0; i < hosts.length; ++i) {
- connectToHost(hosts[i], auths, replset, handle);
+ connectToHost(hosts[i], auth, replset, handle);
}
function handle () {
@@ -539,12 +541,12 @@ function connectTo(hosts, auths, replset, cb) {
* for the given `replset` firing `cb` when finished.
*
* @param {String} host
- * @param {Array} auths
+ * @param {AuthStore} auth
* @param {ReplSet} replset
* @param {Function} cb
* @ignore
*/
-function connectToHost(host, auths, replset, cb) {
+function connectToHost(host, auth, replset, cb) {
var server = createServer(host, replset);
var options = {
@@ -579,34 +581,60 @@ function connectToHost(host, auths, replset, cb) {
}
// authenticate if necessary
- if(!(Array.isArray(auths) && auths.length > 0)) {
+ if(auth.length() == 0) {
return complete();
}
- var pending = auths.length;
-
+ var pending = auth.length();
var connections = server.allRawConnections();
var pendingAuthConn = connections.length;
- for(var x = 0; x <connections.length; x++) {
- var connection = connections[x];
- var authDone = false;
- for(var i = 0; i < auths.length; i++) {
- var auth = auths[i];
- var options = { authdb: auth.authdb, connection: connection };
- var username = auth.username;
- var password = auth.password;
- replset.db.authenticate(username, password, options, function() {
- --pending;
- if(0 === pending) {
- authDone = true;
- --pendingAuthConn;
- if(0 === pendingAuthConn) {
- return complete();
- }
- }
+
+ // Connection function
+ var connectionFunction = function(_auth, _connection, _callback) {
+ var pending = _auth.length();
+
+ for(var j = 0; j < pending; j++) {
+ // Get the auth object
+ var _auth = _auth.get(j);
+ // Unpack the parameter
+ var username = _auth.username;
+ var password = _auth.password;
+ var options = {
+ authMechanism: _auth.authMechanism
+ , authSource: _auth.authdb
+ , connection: _connection
+ };
+
+ // Hold any error
+ var _error = null;
+ // Authenticate against the credentials
+ replset.db.authenticate(username, password, options, function(err, result) {
+ _error = err != null ? err : _error;
+ // Adjust the pending authentication
+ pending = pending - 1;
+ // Finished up
+ if(pending == 0) _callback(_error ? _error : null, _error ? false : true);
});
}
}
+
+ // Final error object
+ var finalError = null;
+ // Iterate over all the connections
+ for(var i = 0; i < connections.length; i++) {
+ connectionFunction(auth, connections[i], function(err, result) {
+ // Pending authentication
+ pendingAuthConn = pendingAuthConn - 1 ;
+
+ // Save error if any
+ finalError = err ? err : finalError;
+
+ // If we are done let's finish up
+ if(pendingAuthConn == 0) {
+ complete();
+ }
+ });
+ }
});
}
Oops, something went wrong.

0 comments on commit 4362428

Please sign in to comment.