Skip to content

Commit

Permalink
Fixes to stop random shutdowns
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Feb 2, 2012
1 parent 1c5a983 commit a665f15
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 48 deletions.
134 changes: 87 additions & 47 deletions lib/mongodb/connection/repl_set_servers.js
Expand Up @@ -6,6 +6,7 @@ var Connection = require('./connection').Connection,
inherits = require('util').inherits,
inspect = require('util').inspect,
Server = require('./server').Server,
format = require('util').format,
PingStrategy = require('./strategies/ping_strategy').PingStrategy,
StatisticsStrategy = require('./strategies/statistics_strategy').StatisticsStrategy;

Expand Down Expand Up @@ -246,6 +247,7 @@ ReplSetServers.prototype.isReconnecting = function() {
}

ReplSetServers.prototype.isConnected = function() {
console.log("%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% isConnected = " + (this.primary != null && this._state.master != null && this._state.master.isConnected() && !this.replicaSetChanged))
// Return the state of the replicaset server
return this.primary != null && this._state.master != null && this._state.master.isConnected() && !this.replicaSetChanged;
}
Expand Down Expand Up @@ -348,6 +350,7 @@ var __executeAllCallbacksWithError = function(dbInstance, error) {
}

ReplSetServers.prototype.connect = function(parent, options, callback) {
// console.log("$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$$ ReplSetServers.connect")
var self = this;
var dateStamp = new Date().getTime();
if('function' === typeof options) callback = options, options = {};
Expand Down Expand Up @@ -375,38 +378,43 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
self.db.admin().command({replSetGetStatus:1}, {connection:connection}, function(err, result) {
// If no error let's look at the members
if(err == null) {
console.log("---------------------------------------------- CHANGE 0")
// For all members check if we have the same ones in the current set
var members = result.documents[0].members;
// Peform a set of check to see if the replicaset has changed
if(self._state.addresses != null && Object.keys(self._state.addresses).length != members.length) {
console.log("---------------------------------------------- CHANGE 0:1")
// We have changed update replicaset object to restart connections
self.replicaSetChanged = true;
} else {
// We need to dig a bit deeper and check if the roles of different servers have changed
// Located members must have the same setup (being secondary etc) for it to be equivalent
var numberOfFoundMembersWithSameSetup = 0;

// Check for the member to see if it's anywhere
for(var i = 0; i < members.length; i++) {
// Get member
var member = members[i];
// If it's a primary check that is a primary in the current set aswell
if(member.state == STATE_PRIMARY && self._state.master != null && (self._state.master.host + ":" + self._state.master.port) == member.name) {
numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1;
} else if(member.state == STATE_SECONDARY && (self._state.secondaries[member.name] != null || self._state.passives[member.name] != null)) {
numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1;
} else if(member.state == STATE_ARBITER && self._state.arbiters[member.name] != null) {
numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1;
}
}

if(numberOfFoundMembersWithSameSetup != members.length) {
// We have changed update replicaset object to restart connections
self.replicaSetChanged = true;
} else {
setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey);
}
}

// } else {
// console.log("---------------------------------------------- CHANGE 0:2")
// // We need to dig a bit deeper and check if the roles of different servers have changed
// // Located members must have the same setup (being secondary etc) for it to be equivalent
// var numberOfFoundMembersWithSameSetup = 0;

// // Check for the member to see if it's anywhere
// for(var i = 0; i < members.length; i++) {
// // Get member
// var member = members[i];
// // If it's a primary check that is a primary in the current set aswell
// if(member.state == STATE_PRIMARY && self._state.master != null && (self._state.master.host + ":" + self._state.master.port) == member.name) {
// numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1;
// } else if(member.state == STATE_SECONDARY && (self._state.secondaries[member.name] != null || self._state.passives[member.name] != null)) {
// numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1;
// } else if(member.state == STATE_ARBITER && self._state.arbiters[member.name] != null) {
// numberOfFoundMembersWithSameSetup = numberOfFoundMembersWithSameSetup + 1;
// }
// }

// if(numberOfFoundMembersWithSameSetup != members.length) {
// // We have changed update replicaset object to restart connections
// self.replicaSetChanged = true;
// } else {
// setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey);
// }
// }
} else {
setTimeout(checkStateOfReplicasetFunction, replicasetCheckFrequencey);
}
Expand Down Expand Up @@ -513,28 +521,47 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {

// Handle an error
var errorHandler = function(err, server) {
// console.log("================================================ ERROR")
// console.log("--------------------------------- " + callback)
parent._state = 'disconnected';
// Shut down the replicaset for now and Fire off all the callbacks sitting with no reply
if(replSetSelf._serverState == 'connected') {
replSetSelf.close(function() {
__executeAllCallbacksWithError(parent, err);
// console.log("--------------------------------- " + callback)
// Ensure single callback only
if(callback != null) {
// Single callback only
var internalCallback = callback;
callback = null;
// Return the error
internalCallback(err, null);
} else {
// If the parent has listeners trigger an event
if(parent.listeners("error").length > 0) {
parent.emit("error", err);
// console.log("================================================ ERROR ReplSetServers")
// console.dir(err)

if(numberOfServersLeftToInitialize > 0) numberOfServersLeftToInitialize = numberOfServersLeftToInitialize - 1;

var closeServers = function() {
// Set the state to disconnected
parent._state = 'disconnected';
// Shut down the replicaset for now and Fire off all the callbacks sitting with no reply
if(replSetSelf._serverState == 'connected') {
replSetSelf.close(function() {
__executeAllCallbacksWithError(parent, err);
// Ensure single callback only
if(callback != null) {
// Single callback only
var internalCallback = callback;
callback = null;
// Return the error
internalCallback(err, null);
} else {
// If the parent has listeners trigger an event
if(parent.listeners("error").length > 0) {
parent.emit("error", err);
}
}
}
});
});
}
}

// Check if this is the primary server, then disconnect otherwise keep going
if(replSetSelf._serverState.master != null) {
// console.dir("---------------------------------------------------------- ReplSetServers::0")
var primaryAddress = format("%s:%s", replSetSelf._serverState.master.host, replSetSelf._serverState.master.port);
var errorServerAddress = format("%s:%s", server.host, server.port);

// Only shut down the set if we have a primary server error
if(primaryAddress == errorServerAddress) {
closeServers();
}
} else {
closeServers();
}
}

Expand Down Expand Up @@ -614,6 +641,8 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {

// Let's go throught all the "possible" servers in the replicaset
var candidateServers = hosts.concat(arbiters).concat(passives);
console.log("*************************************************************************************************8")
console.dir(candidateServers)

// If we have new servers let's add them
for(var i = 0; i < candidateServers.length; i++) {
Expand All @@ -635,6 +664,8 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
for(var i = 0; i < keys.length;i++) socketOptions[keys[i]] = replSetSelf.socketOptions[keys[i]];
}

console.log("---------------------------------- ADDING NEW SERVER " + candidateServerString)

// Add host information to socket options
socketOptions['host'] = parts[0];
socketOptions['port'] = parseInt(parts[1]);
Expand Down Expand Up @@ -666,35 +697,41 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
}
}

// console.log("================================================= numberOfServersLeftToInitialize :: " + numberOfServersLeftToInitialize);
console.log("================================================= numberOfServersLeftToInitialize :: " + numberOfServersLeftToInitialize);

// If done finish up
if((numberOfServersLeftToInitialize == 0) && replSetSelf._serverState === 'connecting' && replSetSelf._state.errorMessages.length == 0) {
console.log("####################################################### connect::0")
// Set db as connected
replSetSelf._serverState = 'connected';
// If we don't expect a master let's call back, otherwise we need a master before
// the connection is successful
if(replSetSelf.masterNotNeeded || replSetSelf._state.master != null) {
console.log("####################################################### connect::1")
// If we have a read strategy boot it
if(replSetSelf.strategyInstance != null) {
console.log("####################################################### connect::1:0")
// Ensure we have a proper replicaset defined
replSetSelf.strategyInstance.replicaset = replSetSelf;
// Start strategy
replSetSelf.strategyInstance.start(function(err) {
console.log("####################################################### connect::1:0:0")
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
// Perform callback
internalCallback(null, parent);
})
} else {
console.log("####################################################### connect::1:1")
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
// Perform callback
internalCallback(null, parent);
}
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length > 0) {
console.log("####################################################### connect::2")
// If we have a read strategy boot it
if(replSetSelf.strategyInstance != null) {
// Ensure we have a proper replicaset defined
Expand All @@ -715,6 +752,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
internalCallback(null, parent);
}
} else if(replSetSelf.readSecondary == true && Object.keys(replSetSelf._state.secondaries).length == 0) {
console.log("####################################################### connect::3")
replSetSelf._serverState = 'disconnected';
// ensure no callbacks get called twice
var internalCallback = callback;
Expand All @@ -724,6 +762,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
// Perform callback
internalCallback(new Error("no secondary server found"), null);
} else if(typeof callback === 'function'){
console.log("####################################################### connect::4")
replSetSelf._serverState = 'disconnected';
// ensure no callbacks get called twice
var internalCallback = callback;
Expand All @@ -734,6 +773,7 @@ ReplSetServers.prototype.connect = function(parent, options, callback) {
internalCallback(new Error("no primary server found"), null);
}
} else if((numberOfServersLeftToInitialize == 0) && replSetSelf._state.errorMessages.length > 0 && replSetSelf._serverState != 'disconnected') {
console.log("####################################################### connect::5")
// Set done
replSetSelf._serverState = 'disconnected';
// ensure no callbacks get called twice
Expand Down
10 changes: 9 additions & 1 deletion lib/mongodb/connection/server.js
Expand Up @@ -180,6 +180,7 @@ Server.prototype.isSetMember = function() {
}

Server.prototype.connect = function(dbInstance, options, callback) {
//console.log("--------------------------------------------- Server:connect")
if('function' === typeof options) callback = options, options = {};
if(options == null) options = {};
if(!('function' === typeof callback)) callback = null;
Expand Down Expand Up @@ -390,27 +391,33 @@ Server.prototype.connect = function(dbInstance, options, callback) {

// Handle errors
connectionPool.on("error", function(message) {
console.log("----------------------------------------- ERROR");
console.dir(message)
// If pool connection is already closed
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
server._serverState = 'disconnected';
// Close the pool
connectionPool.stop();
// If we have a callback return the error
if(typeof callback === 'function') {
if(typeof callback === 'function' && !server.isSetMember()) {
console.log("------------------------- server::error:0")
// ensure no callbacks get called twice
var internalCallback = callback;
callback = null;
// Perform callback
internalCallback(new Error(message && message.err ? message.err : message), null);
} else if(server.isSetMember()) {
console.log("------------------------- server::error:1")
server.emit("error", new Error(message && message.err ? message.err : message), server);
} else {
console.log("------------------------- server::error:2")
eventReceiver.emit("error", new Error(message && message.err ? message.err : message), server);
}

// If we are a single server connection fire errors correctly
if(!server.isSetMember()) {
console.log("------------------------- server::error:3")
// Fire all callback errors
_fireCallbackErrors(server, new Error(message && message.err ? message.err : message));
// Emit error
Expand All @@ -420,6 +427,7 @@ Server.prototype.connect = function(dbInstance, options, callback) {

// Handle close events
connectionPool.on("close", function() {
console.log("----------------------------------------- CLOSE");
// If pool connection is already closed
if(server._serverState === 'disconnected') return;
// Set server state to disconnected
Expand Down

0 comments on commit a665f15

Please sign in to comment.