Skip to content

Commit

Permalink
Fixes to test running, fixes to connection handling on dieing connnec…
Browse files Browse the repository at this point in the history
…tions
  • Loading branch information
christkv committed Nov 23, 2011
1 parent 2571244 commit 9b59bbc
Show file tree
Hide file tree
Showing 11 changed files with 52 additions and 62 deletions.
2 changes: 2 additions & 0 deletions HISTORY
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
* keepAlive = Set if keepAlive is used (default 0, which means no keepAlive, set higher than 0 for keepAlive)
* encoding = ['ascii', 'utf8', or 'base64'] (default null)
* Fixes for handling of errors during shutdown off a socket connection
* Correctly applies socket options including timeout
* Cleanup of test management code to close connections correctly

0.9.7 2011-11-10
* Added priority setting to replicaset manager
Expand Down
26 changes: 13 additions & 13 deletions lib/mongodb/connection/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,17 @@ Connection.prototype.start = function() {
// Create new connection instance
// this.connection = net.createConnection(this.socketOptions.port, this.socketOptions.host);
this.connection = new net.Socket();
// Set options on the socket
this.connection.setTimeout(this.socketOptions.timeout);
this.connection.setNoDelay(this.socketOptions.noDelay);
// Set keep alive if defined
if(this.socketOptions.keepAlive > 0) {
this.connection.setKeepAlive(true, this.socketOptions.keepAlive);
} else {
this.connection.setKeepAlive(false);
}

// Add handlers
this.connection.on("error", errorHandler(this));
// Add all handlers to the socket to manage it
this.connection.on("connect", connectHandler(this));
Expand Down Expand Up @@ -108,18 +119,7 @@ var resetHandlers = function(self, clearListeners) {

// Connect handler
var connectHandler = function(self) {
return function() {
// Set options on the socket
// this.setEncoding(self.socketOptions.encoding);
this.setTimeout(self.socketOptions.timeout);
this.setNoDelay(self.socketOptions.noDelay);
// Set keep alive if defined
if(self.socketOptions.keepAlive > 0) {
this.setKeepAlive(true, self.socketOptions.keepAlive);
} else {
this.setKeepAlive(false);
}

return function() {
// Set connected
self.connected = true;
// Emit the connect event with no error
Expand Down Expand Up @@ -288,7 +288,7 @@ var endHandler = function(self) {
}
}

var timeoutHandler = function(self) {
var timeoutHandler = function(self) {
return function() {
self.emit("error", {err: 'connection to [' + self.socketOptions.host + ':' + self.socketOptions.port + '] timed out'}, self);
}
Expand Down
2 changes: 1 addition & 1 deletion lib/mongodb/connection/connection_pool.js
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ var connectHandler = function(self) {
if(Object.keys(self.waitingToOpen).length == 0 && performedOperation) {
// If we have any errors notify the application, only fire if we don't have the element already in
// errors
if(Object.keys(self.connectionsWithErrors).length > 0 && fireError) {
if(Object.keys(self.connectionsWithErrors).length > 0 && Object.keys(self.openConnections).length == 0 && fireError) {
// Set pool type to disconnected
self._poolState = 'disconnected';
// Emit error
Expand Down
2 changes: 2 additions & 0 deletions lib/mongodb/connection/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,8 @@ Server.prototype.connect = function(dbInstance, options, callback) {
if(server._serverState === 'connecting' && firstCall) {
// Set server state to connected
server._serverState = 'disconnected';
// Shut down the pool
connectionPool.stop();
// Only do a callback if we have a valid callback function, on retries this might not be true
if(typeof callback === 'function') callback(new Error(message && message.err ? message.err : message));
} else {
Expand Down
2 changes: 2 additions & 0 deletions test/insert_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var ISODate = function (string) {
var tests = testCase({
setUp: function(callback) {
client.open(function(err, db_p) {
if(err != null) throw err;

if(numberOfTestsRun == Object.keys(tests).length) {
// If first test drop the db
client.dropDatabase(function(err, done) {
Expand Down
4 changes: 2 additions & 2 deletions test/replicaset/connect_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ module.exports = testCase({
},

tearDown: function(callback) {
RS.restartKilledNodes(function(err, result) {
// RS.restartKilledNodes(function(err, result) {
callback();
});
// });
},

shouldCorrectlyConnectWithDefaultReplicaset : function(test) {
Expand Down
8 changes: 3 additions & 5 deletions test/replicaset/count_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ var ensureConnection = function(test, numberOfTries, callback) {

// Open the db
db.open(function(err, p_db) {
db.close();
if(err != null) {
db.close();
// Wait for a sec and retry
setTimeout(function() {
numberOfTries = numberOfTries - 1;
Expand Down Expand Up @@ -70,9 +70,9 @@ module.exports = testCase({
},

tearDown: function(callback) {
RS.restartKilledNodes(function(err, result) {
// RS.restartKilledNodes(function(err, result) {
callback();
});
// });
},

shouldRetrieveCorrectCountAfterInsertionReconnect : function(test) {
Expand Down Expand Up @@ -119,9 +119,7 @@ module.exports = testCase({
// Do inserts
ensureConnection(test, retries, function(err, p_db) {
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));

test.ok(err == null);
test.equal(true, p_db.serverConfig.isConnected());

p_db.collection('testsets', function(err, collection) {
if(err != null) debug("shouldRetrieveCorrectCountAfterInsertionReconnect :: " + inspect(err));
Expand Down
6 changes: 3 additions & 3 deletions test/replicaset/insert_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ module.exports = testCase({
},

tearDown: function(callback) {
RS.restartKilledNodes(function(err, result) {
// RS.restartKilledNodes(function(err, result) {
callback();
});
// });
},

shouldCorrectlyWaitForReplicationToServersOnInserts : function(test) {
Expand Down Expand Up @@ -232,7 +232,7 @@ module.exports = testCase({
setTimeout(function() {
// console.log("--------------------------------------------------------------------- -1")
// Kill the primary
RS.killPrimary(2, {killNodeWaitTime:10}, function(node) {
RS.killPrimary(2, {killNodeWaitTime:1}, function(node) {
// console.log("--------------------------------------------------------------------- 0")
// Attempt insert (should fail)
collection.insert({a:30}, {safe: {w:2, wtimeout: 10000}}, function(err, r) {
Expand Down
35 changes: 9 additions & 26 deletions test/replicaset/query_secondaries_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ var RS = RS == null ? null : RS;
var ensureConnection = function(test, numberOfTries, callback) {
// Replica configuration
var replSet = new ReplSetServers( [
// new Server( RS.host, RS.ports[1], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[1], { auto_reconnect: true } ),
new Server( RS.host, RS.ports[0], { auto_reconnect: true } ),
// new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
new Server( RS.host, RS.ports[2], { auto_reconnect: true } )
],
{rs_name:RS.name}
);
Expand All @@ -38,8 +38,8 @@ var ensureConnection = function(test, numberOfTries, callback) {

// Open the db
db.open(function(err, p_db) {
db.close();
if(err != null) {
db.close();
// Wait for a sec and retry
setTimeout(function() {
numberOfTries = numberOfTries - 1;
Expand Down Expand Up @@ -71,10 +71,10 @@ module.exports = testCase({
},

tearDown: function(callback) {
RS.restartKilledNodes(function(err, result) {
if(err != null) throw err;
// RS.restartKilledNodes(function(err, result) {
// if(err != null) throw err;
callback();
})
// })
},

shouldReadPrimary : function(test) {
Expand All @@ -93,9 +93,9 @@ module.exports = testCase({
// Drop collection on replicaset
p_db.dropCollection('testsets', function(err, r) {
if(err != null) debug("shouldReadPrimary :: " + inspect(err));

test.equal(false, p_db.serverConfig.isReadPrimary());
test.equal(false, p_db.serverConfig.isPrimary());
p_db.close();
test.done();
});
})
Expand All @@ -122,6 +122,7 @@ module.exports = testCase({
test.ok(p_db.serverConfig.primary != null);
test.ok(p_db.serverConfig.read != null);
test.ok(p_db.serverConfig.primary.port != p_db.serverConfig.read.port);
p_db.close();
test.done();
});
})
Expand Down Expand Up @@ -156,6 +157,7 @@ module.exports = testCase({
collection.find().toArray(function(err, items) {
test.equal(null, err);
test.equal(3, items.length);
p_db.close();
test.done();
});
});
Expand All @@ -172,25 +174,6 @@ module.exports = testCase({
}
})

var retryEnsure = function(numberOfRetries, execute, callback) {
execute(function(done) {
if(done) {
return callback(null, null);
} else {
numberOfRetries = numberOfRetries - 1;

if(numberOfRetries <= 0) {
return callback(new Error("Failed to execute command"), null);
} else {
setTimeout(function() {
retryEnsure(numberOfRetries, execute, callback);
}, 1000);
}
}
});
}





Expand Down
18 changes: 10 additions & 8 deletions test/replicaset/tags_test.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ module.exports = testCase({
},

tearDown: function(callback) {
RS.restartKilledNodes(function(err, result) {
// RS.restartKilledNodes(function(err, result) {
callback();
});
// });
},

'Should Correctly Connect With Default Replicaset And Insert Document For Tag Dc:NY' : function(test) {
Expand All @@ -98,9 +98,9 @@ module.exports = testCase({

// Do a read for the value
collection.findOne({a:20}, function(err, item) {
p_db.close();
test.equal(20, item.a);
test.done();
p_db.close();
})
});
});
Expand Down Expand Up @@ -133,8 +133,8 @@ module.exports = testCase({
var primaryAddress = replSet._state.master.host + ":" + replSet._state.master.port;
test.equal(primaryAddress, readerAddress);
// End test and close db
test.done();
p_db.close();
test.done();
})
},

Expand Down Expand Up @@ -163,8 +163,8 @@ module.exports = testCase({
// Check that it's in the list of primary servers
test.ok(replSet._state.secondaries[readerAddress] != null);
// End test and close db
test.done();
p_db.close();
test.done();
})
},

Expand Down Expand Up @@ -194,8 +194,8 @@ module.exports = testCase({
test.equal(1, replSet._state.byTags['dc1']['ny'].length);
test.equal(1, replSet._state.byTags['dc2']['sf'].length);
// End test and close db
test.done();
p_db.close();
test.done();
})
},

Expand All @@ -220,8 +220,8 @@ module.exports = testCase({
// Locate server instance associated with this id
var serverInstance = replSet._state.addresses[readerAddress];
test.deepEqual({ dc2: 'sf' }, serverInstance.tags)
test.done();
p_db.close();
test.done();
})
},

Expand Down Expand Up @@ -250,8 +250,8 @@ module.exports = testCase({
test.ok(server.queryStats.standardDeviation >= 0);
}

test.done();
p_db.close();
test.done();
}, 5000)
})
},
Expand Down Expand Up @@ -281,6 +281,7 @@ module.exports = testCase({
collection.find().toArray(function(err, items) {
test.equal(null, err);
test.equal(4, items.length);
p_db.close();
test.done();
});
});
Expand Down Expand Up @@ -329,6 +330,7 @@ module.exports = testCase({
totalNumberOfStrategyEntries += server.queryStats.numDataValues;
}

p_db.close();
test.equal(4, totalNumberOfStrategyEntries);
test.done();
});
Expand Down
9 changes: 5 additions & 4 deletions test/replicaset/two_server_tests.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ var ensureConnection = function(test, numberOfTries, callback) {

var db = new Db('integration_test_', replSet);
db.open(function(err, p_db) {
db.close();
if(err != null) {
db.close();
// Wait for a sec and retry
setTimeout(function() {
numberOfTries = numberOfTries - 1;
Expand Down Expand Up @@ -66,10 +66,10 @@ module.exports = testCase({
},

tearDown: function(callback) {
RS.restartKilledNodes(function(err, result) {
if(err != null) throw err;
// RS.restartKilledNodes(function(err, result) {
// if(err != null) throw err;
callback();
})
// })
},

shouldCorrectlyExecuteSafeFindAndModify : function(test) {
Expand Down Expand Up @@ -100,6 +100,7 @@ module.exports = testCase({
collection.findAndModify({'a':20}, [['a', 1]], {'$set':{'b':3}}, {'new':true, safe: {w:7, wtimeout: 10000}}, function(err, updated_doc) {
test.equal('timeout', err.err)
test.equal(true, err.wtimeout)
p_db.close();
test.done();
});
});
Expand Down

0 comments on commit 9b59bbc

Please sign in to comment.