Skip to content

Commit

Permalink
Merged in and resolved issues from master
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Jan 25, 2013
2 parents 4102e48 + fbfd68a commit 63e1db7
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 44 deletions.
2 changes: 1 addition & 1 deletion docs/articles/NodeKOArticle1.md
Expand Up @@ -17,7 +17,7 @@ So there is an important thing to keep in mind when working with Mongo DB, and t
* **Integers** is a bit trickier due to the fact that Javascript represents all Numbers as 64 bit floats meaning that the maximum integer value is at a 53 bit. Mongo has two types for integers, a 32 bit and a 64 bit. The driver will try to fit the value into 32 bits if it can and promote it to 64 bits if it has to. Similarly it will deserialize attempting to fit it into 53 bits if it can. If it cannot it will return an instance of **Long** to avoid loosing precession.
* **Long class** a special class that let's you store 64 bit integers and also let's you operate on the 64 bits integers.
* **Date** maps directly to a Javascript Date
* **RegEp** maps directly to a Javascript RegExp
* **RegExp** maps directly to a Javascript RegExp
* **String** maps directly to a Javascript String (encoded in utf8)
* **Binary class** a special class that let's you store data in Mongo DB
* **Code class** a special class that let's you store javascript functions in Mongo DB, can also provide a scope to run the method in
Expand Down
113 changes: 70 additions & 43 deletions lib/mongodb/connection/repl_set.js
Expand Up @@ -208,9 +208,9 @@ var ReplSet = exports.ReplSet = function(servers, options) {

// Enabled ha
this.haEnabled = this.options['ha'] == null ? true : this.options['ha'];
// this.haEnabled = false
this._haServer = null;
// How often are we checking for new servers in the replicaset
this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 1000 : this.options['haInterval'];
this.replicasetStatusCheckInterval = this.options['haInterval'] == null ? 5000 : this.options['haInterval'];
this._replicasetTimeoutId = null;

// Connection timeout
Expand Down Expand Up @@ -342,57 +342,84 @@ ReplSet.prototype._enableHA = function () {

function ping () {
if("disconnected" == self._serverState) return;
// Create a list of all servers we can send the ismaster command to
var allServers = self._state.master != null ? [self._state.master] : [];

if(Object.keys(self._state.addresses).length == 0) return;
var selectedServer = self._state.addresses[Object.keys(self._state.addresses)[self.updateHealthServerTry++]];
if(self.updateHealthServerTry >= Object.keys(self._state.addresses).length) self.updateHealthServerTry = 0;
if(selectedServer == null) return check();
// Secondary keys
var keys = Object.keys(self._state.secondaries);
// Add all secondaries
for(var i = 0; i < keys.length; i++) {
allServers.push(self._state.secondaries[keys[i]]);
}

// If no servers quit as are probably connecting
if(allServers.length == 0) return;
// Pick one of the servers
self.updateHealthServerTry = self.updateHealthServerTry++ % allServers.length;
var selectedServer = allServers[self.updateHealthServerTry];

// If we have an active db instance
if(self.dbInstances.length > 0) {
var db = self.dbInstances[0];

// Create a new master connection
var _server = new Server(selectedServer.host, selectedServer.port, {
auto_reconnect: false,
returnIsMasterResults: true,
slaveOk: true,
poolSize: 1,
socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
ssl: self.ssl,
ssl_validate: self.ssl_validate,
ssl_ca: self.ssl_ca,
ssl_cert: self.ssl_cert,
ssl_key: self.ssl_key,
ssl_pass: self.ssl_pass
});

// Connect using the new _server connection to not impact the driver
// behavior on any errors we could possibly run into
_server.connect(db, function(err, result, _server) {
if(err) {
if(_server.close) _server.close();
return check();
}

// Create is master command
var cmd = DbCommand.createIsMasterCommand(db);
// Execute is master command
db._executeQueryCommand(cmd, {failFast:true, connection: _server.checkoutWriter()}, function(err, res) {
// Close the connection used
_server.close();
// If error let's set perform another check
if(err) return check();
// Validate the replicaset
self._validateReplicaset(res, db.auths, function() {
check();
});
// We have an instance already
if(self._haServer == null) {
// Create a new master connection
self._haServer = new Server(selectedServer.host, selectedServer.port, {
auto_reconnect: false,
returnIsMasterResults: true,
slaveOk: true,
poolSize: 1,
socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
ssl: self.ssl,
ssl_validate: self.ssl_validate,
ssl_ca: self.ssl_ca,
ssl_cert: self.ssl_cert,
ssl_key: self.ssl_key,
ssl_pass: self.ssl_pass
});
});

// Add close handler
self.on("close", function() {
self._haServer = null;
});

// Connect using the new _server connection to not impact the driver
// behavior on any errors we could possibly run into
self._haServer.connect(db, function(err, result, _server) {
if(err) {
if(_server.close) _server.close();
return check();
}

executeMasterCommand(db, _server);
});
} else {
executeMasterCommand(db, self._haServer);
}
}
}

function check () {
function executeMasterCommand(db, _server) {
try {
// Create is master command
var cmd = DbCommand.createIsMasterCommand(db);
// Execute is master command
db._executeQueryCommand(cmd, {failFast:true, connection: _server.checkoutReader()}, function(err, res) {
// If error let's set perform another check
if(err) return check();
// Validate the replicaset
self._validateReplicaset(res, db.auths, function() {
check();
});
});
} catch(err) {
self._haServer = null;
check();
}
}

function check() {
self._haTimer = setTimeout(ping, self.replicasetStatusCheckInterval);
}
}
Expand Down

0 comments on commit 63e1db7

Please sign in to comment.