Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Fixes to MongoS HA and ping strategy #927

  • Loading branch information...
commit 10437b1a4c04bd2fadd55e0c0e4222fcfedea8ad 1 parent 4204696
@christkv christkv authored
View
299 lib/mongodb/connection/mongos.js
@@ -1,5 +1,6 @@
var ReadPreference = require('./read_preference').ReadPreference
, Base = require('./base').Base
+ , Server = require('./server').Server
, inherits = require('util').inherits;
/**
@@ -32,8 +33,9 @@ var Mongos = function Mongos(servers, options) {
this.socketOptions = this.options.socketOptions != null ? this.options.socketOptions : {};
// Enabled ha
this.haEnabled = this.options['ha'] == null ? true : this.options['ha'];
+ this._haInProgress = false;
// How often are we checking for new servers in the replicaset
- this.mongosStatusCheckInterval = this.options['haInterval'] == null ? 2000 : this.options['haInterval'];
+ this.mongosStatusCheckInterval = this.options['haInterval'] == null ? 1000 : this.options['haInterval'];
// Save all the server connections
this.servers = servers;
// Servers we need to attempt reconnect with
@@ -58,6 +60,7 @@ var Mongos = function Mongos(servers, options) {
var keys = Object.keys(this.socketOptions);
for(var k = 0; k < keys.length;k++) socketOptions[keys[i]] = this.socketOptions[keys[i]];
}
+
// Set socket options
server.socketOptions = socketOptions;
}
@@ -89,10 +92,7 @@ Mongos.prototype.connect = function(db, options, callback) {
// Set server state to connecting
this._serverState = 'connecting';
// Number of total servers that need to initialized (known servers)
- this._numberOfServersLeftToInitialize = this.servers.length;
- // Default to the first proxy server as the first one to use
- this._currentMongos = this.servers[0];
-
+ this._numberOfServersLeftToInitialize = this.servers.length;
// Connect handler
var connectHandler = function(_server) {
return function(err, result) {
@@ -102,7 +102,8 @@ Mongos.prototype.connect = function(db, options, callback) {
// Start ha function if it exists
if(self.haEnabled) {
// Setup the ha process
- self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
+ if(self._replicasetTimeoutId != null) clearInterval(self._replicasetTimeoutId);
+ self._replicasetTimeoutId = setInterval(self.mongosCheckFunction, self.mongosStatusCheckInterval);
}
// Set the mongos to connected
@@ -118,123 +119,197 @@ Mongos.prototype.connect = function(db, options, callback) {
// Error handler
var errorOrCloseHandler = function(_server) {
return function(err, result) {
- // Create current mongos comparision
- var currentUrl = self._currentMongos.host + ":" + self._currentMongos.port;
- var serverUrl = this.host + ":" + this.port;
- // We need to check if the server that closed is the actual current proxy we are using, otherwise
- // just ignore
- if(currentUrl == serverUrl) {
- // Remove the server from the list
- if(self.servers.indexOf(_server) != -1) {
- self.servers.splice(self.servers.indexOf(_server), 1);
- }
-
- // Pick the next one on the list if there is one
- for(var i = 0; i < self.servers.length; i++) {
- // Grab the server out of the array (making sure there is no servers in the list if none available)
- var server = self.servers[i];
- // Generate url for comparision
- var serverUrl = server.host + ":" + server.port;
- // It's not the current one and connected set it as the current db
- if(currentUrl != serverUrl && server.isConnected()) {
- self._currentMongos = server;
- break;
- }
+ var validServers = [];
+ // Execute all the callbacks with errors
+ self.__executeAllCallbacksWithError(err);
+
+ // Save the down server
+ self.downServers.push(_server);
+
+ // Remove the current server from the list
+ for(var i = 0; i < self.servers.length; i++) {
+ if(!(self.servers[i].host == _server.host && self.servers[i].port == _server.port)) {
+ validServers.push(self.servers[i]);
}
}
- // Ensure we don't store the _server twice
- if(self.downServers.indexOf(_server) == -1) {
- // Add the server instances
- self.downServers.push(_server);
- }
+ // Set current list of servers
+ self.servers = validServers;
}
}
// Mongo function
this.mongosCheckFunction = function() {
- // If we have down servers let's attempt a reconnect
+ if(self._haInProgress) return;
+ // Set as not waiting for check event
+ self._haInProgress = true;
+ // Check downed servers
if(self.downServers.length > 0) {
var numberOfServersLeft = self.downServers.length;
- // Attempt to reconnect
+
+ // Iterate over all the downed servers
for(var i = 0; i < self.downServers.length; i++) {
+ // Pop a downed server
var downServer = self.downServers.pop();
-
- // Configuration
+
+ // Set up the connection options for a Mongos
var options = {
+ auto_reconnect: false,
+ returnIsMasterResults: true,
slaveOk: true,
- poolSize: 1,
- socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
- returnIsMasterResults: true
- }
-
- // Attemp to reconnect
- downServer.connect(self.db, options, function(_server) {
- // Return a function to check for the values
- return function(err, result) {
- // Adjust the number of servers left
- numberOfServersLeft = numberOfServersLeft - 1;
-
- if(err != null) {
- self.downServers.push(_server);
- } else {
- // Add server event handlers
- _server.on("close", errorOrCloseHandler(_server));
- _server.on("error", errorOrCloseHandler(_server));
- // Add to list of servers
- self.servers.push(_server);
- }
-
- if(numberOfServersLeft <= 0) {
- // Perfom another ha
- self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
- }
+ poolSize: downServer.poolSize,
+ socketOptions: {
+ connectTimeoutMS: self._connectTimeoutMS,
+ socketTimeoutMS: self._socketTimeoutMS
+ }
+ }
+
+ // Create a new server object
+ var newServer = new Server(downServer.host, downServer.port, options);
+ // Setup the connection function
+ var connectFunction = function(_db, _server, _options) {
+ return function() {
+ // Attempt to connect
+ _server.connect(_db, _options, function(err, result) {
+ numberOfServersLeft = numberOfServersLeft - 1;
+
+ if(err) {
+ self.downServers.push(_server);
+ } else {
+ // Set the new server settings
+ _server._callBackStore = self._callBackStore;
+
+ // Add server event handlers
+ _server.on("close", errorOrCloseHandler(_server));
+ _server.on("timeout", errorOrCloseHandler(_server));
+ _server.on("error", errorOrCloseHandler(_server));
+
+ // Add the server to the list of available servers
+ self.servers.push(_server);
+
+ // Get a read connection
+ var _connection = _server.checkoutReader();
+ // Get the start time
+ var startTime = new Date().getTime();
+ // Execute ping command to mark each server with the expected times
+ self.db.command({ping:1}
+ , {failFast:true, connection:_connection}, function(err, result) {
+ // Get the start time
+ var endTime = new Date().getTime();
+ // Mark the server with the ping time
+ _server.runtimeStats['pingMs'] = endTime - startTime;
+ // Sort the servers on runtime so the first server always is the closest one
+ self.servers.sort(function(a, b) {
+ return a.runtimeStats['pingMs'] > b.runtimeStats['pingMs'];
+ });
+ });
+ }
+
+ if(numberOfServersLeft == 0) {
+ self._haInProgress = false;
+ }
+ });
}
- }(downServer));
- }
- } else if(self.servers.length > 0) {
- var numberOfServersLeft = self.servers.length;
- var _s = new Date().getTime()
+ }
- // Else let's perform a ping command
- for(var i = 0; i < self.servers.length; i++) {
- var executePing = function(_server) {
- // Get a read connection
- var _connection = _server.checkoutReader();
- // Execute ping command
- self.db.command({ping:1}, {failFast:true, connection:_connection}, function(err, result) {
- var pingTime = new Date().getTime() - _s;
- // If no server set set the first one, otherwise check
- // the lowest ping time and assign the server if it's got a lower ping time
- if(self.lowestPingTimeServer == null) {
- self.lowestPingTimeServer = _server;
- self.lowestPingTime = pingTime;
- self._currentMongos = _server;
- } else if(self.lowestPingTime > pingTime
- && (_server.host != self.lowestPingTimeServer.host || _server.port != self.lowestPingTimeServer.port)) {
- self.lowestPingTimeServer = _server;
- self.lowestPingTime = pingTime;
- self._currentMongos = _server;
- }
-
- // Number of servers left
- numberOfServersLeft = numberOfServersLeft - 1;
- // All active mongos's pinged
- if(numberOfServersLeft == 0) {
- // Perfom another ha
- self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
- }
- })
- }
-
- // Execute the function
- executePing(self.servers[i]);
+ // Attempt to connect to the database
+ connectFunction(self.db, newServer, options)();
}
} else {
- self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
+ self._haInProgress = false;
}
}
+
+ // console.log("========================================= mongos check function")
+ // // If we have down servers let's attempt a reconnect
+ // if(self.downServers.length > 0) {
+ // var numberOfServersLeft = self.downServers.length;
+ // // Attempt to reconnect
+ // for(var i = 0; i < self.downServers.length; i++) {
+ // var downServer = self.downServers.pop();
+
+ // // Configuration
+ // var options = {
+ // slaveOk: true,
+ // poolSize: 1,
+ // socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
+ // returnIsMasterResults: true
+ // }
+
+ // // Attemp to reconnect
+ // downServer.connect(self.db, options, function(_server) {
+ // // Return a function to check for the values
+ // return function(err, result) {
+ // console.log("========================================= mongos check function 0")
+ // // Adjust the number of servers left
+ // numberOfServersLeft = numberOfServersLeft - 1;
+
+ // if(err != null) {
+ // console.log("========================================= mongos check function 1")
+ // self.downServers.push(_server);
+ // } else {
+ // console.log("========================================= mongos check function 2")
+ // // Add server event handlers
+ // _server.on("close", errorOrCloseHandler(_server));
+ // _server.on("error", errorOrCloseHandler(_server));
+ // _server.on("timeout", errorOrCloseHandler(_server));
+ // // Add to list of servers
+ // self.servers.push(_server);
+ // }
+
+ // if(numberOfServersLeft <= 0) {
+ // // Perfom another ha
+ // self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
+ // }
+ // }
+ // }(downServer));
+ // }
+ // } else if(self.servers.length > 0) {
+ // var numberOfServersLeft = self.servers.length;
+ // var _s = new Date().getTime()
+
+ // // Else let's perform a ping command
+ // for(var i = 0; i < self.servers.length; i++) {
+ // var executePing = function(_server) {
+ // // Get a read connection
+ // var _connection = _server.checkoutReader();
+ // // Execute ping command
+ // self.db.command({ping:1}, {failFast:true, connection:_connection}, function(err, result) {
+ // var pingTime = new Date().getTime() - _s;
+ // // If no server set set the first one, otherwise check
+ // // the lowest ping time and assign the server if it's got a lower ping time
+ // if(self.lowestPingTimeServer == null) {
+ // self.lowestPingTimeServer = _server;
+ // self.lowestPingTime = pingTime;
+ // console.log("=========================================== ping 0")
+ // self._currentMongos = _server;
+ // } else if(self.lowestPingTime > pingTime
+ // && (_server.host != self.lowestPingTimeServer.host || _server.port != self.lowestPingTimeServer.port)) {
+ // self.lowestPingTimeServer = _server;
+ // self.lowestPingTime = pingTime;
+ // console.log("=========================================== ping 1")
+ // self._currentMongos = _server;
+ // }
+
+ // // Number of servers left
+ // numberOfServersLeft = numberOfServersLeft - 1;
+ // // All active mongos's pinged
+ // if(numberOfServersLeft == 0) {
+ // // Perfom another ha
+ // self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
+ // }
+ // })
+ // }
+
+ // // Execute the function
+ // executePing(self.servers[i]);
+ // }
+ // } else {
+ // self._replicasetTimeoutId = setTimeout(self.mongosCheckFunction, self.mongosStatusCheckInterval);
+ // }
+ // }
+
// Connect all the server instances
for(var i = 0; i < this.servers.length; i++) {
// Get the connection
@@ -244,10 +319,11 @@ Mongos.prototype.connect = function(db, options, callback) {
server.on("close", errorOrCloseHandler(server));
server.on("timeout", errorOrCloseHandler(server));
server.on("error", errorOrCloseHandler(server));
+
// Configuration
var options = {
slaveOk: true,
- poolSize: 1,
+ poolSize: server.poolSize,
socketOptions: { connectTimeoutMS: self._connectTimeoutMS },
returnIsMasterResults: true
}
@@ -297,11 +373,8 @@ Mongos.prototype.isConnected = function() {
* @ignore
*/
Mongos.prototype.checkoutWriter = function() {
- // No current mongo, just pick first server
- if(this._currentMongos == null && this.servers.length > 0) {
- return this.servers[0].checkoutWriter();
- }
- return this._currentMongos.checkoutWriter();
+ if(this.servers.length == 0) return null;
+ return this.servers[0].checkoutWriter();
}
/**
@@ -316,11 +389,8 @@ Mongos.prototype.checkoutReader = function(read) {
throw new Error("Illegal readPreference mode specified, " + read);
}
- // No current mongo, just pick first server
- if(this._currentMongos == null && this.servers.length > 0) {
- return this.servers[0].checkoutReader();
- }
- return this._currentMongos.checkoutReader();
+ if(this.servers.length == 0) return null;
+ return this.servers[0].checkoutWriter();
}
/**
@@ -333,7 +403,8 @@ Mongos.prototype.close = function(callback) {
// Number of connections to close
var numberOfConnectionsToClose = self.servers.length;
// If we have a ha process running kill it
- if(self._replicasetTimeoutId != null) clearTimeout(self._replicasetTimeoutId);
+ if(self._replicasetTimeoutId != null) clearInterval(self._replicasetTimeoutId);
+ self._replicasetTimeoutId = null;
// Close all proxy connections
for(var i = 0; i < self.servers.length; i++) {
self.servers[i].close(function(err, result) {
View
3  lib/mongodb/connection/server.js
@@ -380,6 +380,9 @@ Server.prototype.connect = function(dbInstance, options, callback) {
// Callback instance
var callbackInfo = server._findHandler(mongoReply.responseTo.toString());
+ // console.log("====================== received message")
+ // console.dir(callbackInfo)
+
// The command executed another request, log the handler again under that request id
if(mongoReply.requestId > 0 && mongoReply.cursorId.toString() != "0"
&& callbackInfo && callbackInfo.info && callbackInfo.info.exhaust) {
View
1  lib/mongodb/mongo_client.js
@@ -1,5 +1,6 @@
var Db = require('./db').Db
, Server = require('./connection/server').Server
+ , Mongos = require('./connection/mongos').Mongos
, ReplSet = require('./connection/repl_set').ReplSet
, ReadPreference = require('./connection/read_preference').ReadPreference
, parse = require('./connection/url_parser').parse;
View
52 test/tests/sharded/ha_tests.js
@@ -32,7 +32,7 @@ exports['Should correctly connect and then handle a mongos failure'] = function(
});
}
- var killport = db.serverConfig._currentMongos.port;
+ var killport = db.serverConfig.servers[0].port;
// Kill the mongos proxy
configuration.killMongoS(killport, function(err, result) {
@@ -42,56 +42,6 @@ exports['Should correctly connect and then handle a mongos failure'] = function(
});
}
-// /**
-// * @ignore
-// */
-// exports.shouldCorrectlyPerformAllOperationsAgainstShardedSystem = function(configuration, test) {
-// var Mongos = configuration.getMongoPackage().Mongos
-// , MongoClient = configuration.getMongoPackage().MongoClient
-// , Server = configuration.getMongoPackage().Server
-// , Db = configuration.getMongoPackage().Db
-// , ReadPreference = configuration.getMongoPackage().ReadPreference;
-
-// return test.done();
-
-// // Set up mongos connection
-// var mongos = new Mongos([
-// new Server("localhost", 50000, { auto_reconnect: true })
-// ])
-
-// // Set up a bunch of documents
-// var docs = [];
-// for(var i = 0; i < 1000; i++) {
-// docs.push({a:i, data:new Buffer(1024)});
-// }
-
-// // Connect using the mongos connections
-// var db = new Db('integration_test_', mongos, {w:0});
-// db.open(function(err, db) {
-// test.equal(null, err);
-// test.ok(db != null);
-
-// var collection = db.collection("shard_all_operations_test");
-// collection.insert(docs, {safe:{w:1, wtimeout:1000}}, function(err, result) {
-// test.equal(null, err);
-
-// configuration.killShard(function() {
-
-// collection.find({}, {partial:true}).toArray(function(err, items) {
-// // test.equal(null, err);
-// // test.ok(items.length > 0)
-// console.log("-------------------------------------------------------------")
-// console.dir(err)
-// console.dir(items)
-
-// db.close();
-// test.done();
-// });
-// });
-// });
-// });
-// }
-
/**
* @ignore
*/
View
66 test/tests/sharded/operations_tests.js
@@ -11,9 +11,6 @@ exports.shouldCorrectlyPerformAllOperationsAgainstShardedSystem = function(confi
new Server("localhost", 50000, { auto_reconnect: true })
])
- // var mongos = new Server("localhost", 27017, { auto_reconnect: true })
- // var mongos = new Server("localhost", 50000, { auto_reconnect: true, poolSize:1 });
-
// Set up a bunch of documents
var docs = [];
for(var i = 0; i < 1000; i++) {
@@ -23,18 +20,15 @@ exports.shouldCorrectlyPerformAllOperationsAgainstShardedSystem = function(confi
// Connect using the mongos connections
var db = new Db('integration_test_', mongos, {w:0});
db.open(function(err, db) {
- // console.log("================================================ 0")
test.equal(null, err);
test.ok(db != null);
var collection = db.collection("shard_all_operations_test");
collection.insert(docs, {safe:{w:1, wtimeout:1000}}, function(err, result) {
- // console.log("================================================ 1")
test.equal(null, err);
// Perform an update
collection.update({a:0}, {$set: {c:1}}, {w:1}, function(err, result) {
- // console.log("================================================ 2")
test.equal(null, err);
var numberOfRecords = 0;
@@ -44,11 +38,9 @@ exports.shouldCorrectlyPerformAllOperationsAgainstShardedSystem = function(confi
if(item == null) {
test.equal(1000, numberOfRecords);
- // console.log("================================================ 3")
// Perform a find and each
collection.find().toArray(function(err, items) {
- // console.log("================================================ 4")
if(err) console.dir(err)
test.equal(1000, items.length);
@@ -62,4 +54,62 @@ exports.shouldCorrectlyPerformAllOperationsAgainstShardedSystem = function(confi
});
});
});
+}
+
+/**
+ * @ignore
+ */
+exports.shouldCorrectlyHandleSwitchOver = function(configuration, test) {
+ var Mongos = configuration.getMongoPackage().Mongos
+ , Server = configuration.getMongoPackage().Server
+ , Db = configuration.getMongoPackage().Db;
+
+ // Set up mongos connection
+ var mongos = new Mongos([
+ new Server("localhost", 50000, { auto_reconnect: true, socketOptions: {connectTimeoutMS: 3000, socketTimeoutMS: 3000, keepAlive:100}})
+ , new Server("localhost", 50001, { auto_reconnect: true, socketOptions: {connectTimeoutMS: 3000, socketTimeoutMS: 3000, keepAlive:100}})
+ ], {socketOptions: {connectTimeoutMS: 3000, socketTimeoutMS: 3000, keepAlive:100}});
+
+ // Var number of iterations
+ var iterations = 0;
+ var mongosUp = false;
+
+ // Connect using the mongos connections
+ var db = new Db('integration_test_', mongos, {w:0});
+ db.open(function(err, db) {
+ var intervalId = setInterval(function() {
+ db.collection('some_collection').update({a:1}, {$inc: {counter:1}}, {w:1, upsert:true}, function(err, result) {
+ iterations = iterations + 1;
+
+ if(iterations == 4) {
+ configuration.killMongoS(50000, function() {});
+ } else if(mongosUp) {
+ var connection = db.serverConfig.checkoutWriter();
+ if(connection != null && connection.socketOptions) {
+
+ // Check that we have the right fail over connection
+ if(connection.socketOptions.port == 50000) {
+ clearInterval(intervalId);
+ db.close();
+ test.done();
+ }
+ }
+ } else if(iterations > 4) {
+ var connection = db.serverConfig.checkoutWriter();
+ if(connection != null && connection.socketOptions) {
+
+ // Check that we have the right fail over connection
+ if(connection.socketOptions.port == 50001) {
+ mongosUp = true;
+ // Restart the 50000 server
+ configuration.restartMongoS(50000, function(err, result) {
+ configuration.killMongoS(50001, function(err, result) {
+ })
+ });
+ }
+ }
+ }
+ });
+ }, 1000);
+ });
}
Please sign in to comment.
Something went wrong with that request. Please try again.