Skip to content

Commit

Permalink
server state is shared with pool instead of seperate
Browse files Browse the repository at this point in the history
  • Loading branch information
christkv committed Jun 16, 2016
1 parent 412b50d commit a481a42
Show file tree
Hide file tree
Showing 26 changed files with 552 additions and 71 deletions.
52 changes: 52 additions & 0 deletions boot_auth.js
@@ -0,0 +1,52 @@
var ReplSetManager = require('mongodb-topology-manager').ReplSet,
f = require('util').format;

var rsOptions = {
server: {
keyFile: __dirname + '/test/functional/data/keyfile.txt',
auth: null,
replSet: 'rs'
},
client: {
replSet: 'rs'
}
}

// Set up the nodes
var nodes = [{
options: {
bind_ip: 'localhost', port: 31000,
dbpath: f('%s/../db/31000', __dirname),
}
}, {
options: {
bind_ip: 'localhost', port: 31001,
dbpath: f('%s/../db/31001', __dirname),
}
}, {
options: {
bind_ip: 'localhost', port: 31002,
dbpath: f('%s/../db/31002', __dirname),
}
}]

// Merge in any node start up options
for(var i = 0; i < nodes.length; i++) {
for(var name in rsOptions.server) {
nodes[i].options[name] = rsOptions.server[name];
}
}

// Create a manager
var replicasetManager = new ReplSetManager('mongod', nodes, rsOptions.client);
// Purge the set
replicasetManager.purge().then(function() {
// Start the server
replicasetManager.start().then(function() {
process.exit(0);
}).catch(function(e) {
console.log("====== ")
console.dir(e)
// // console.dir(e);
});
});
18 changes: 12 additions & 6 deletions lib/apm.js
Expand Up @@ -110,7 +110,7 @@ var Instrumentation = function(core, options, callback) {
var db = ns.split('.')[0];

// Do we have a legacy insert/update/remove command
if(x == 'insert' && !this.lastIsMaster().maxWireVersion) {
if(x == 'insert') { //} && !this.lastIsMaster().maxWireVersion) {
commandName = 'insert';
// Get the collection
var col = ns.split('.');
Expand All @@ -127,7 +127,7 @@ var Instrumentation = function(core, options, callback) {
}

commandObj.ordered = options.ordered != undefined ? options.ordered : true;
} else if(x == 'update' && !this.lastIsMaster().maxWireVersion) {
} else if(x == 'update') { // && !this.lastIsMaster().maxWireVersion) {
commandName = 'update';

// Get the collection
Expand All @@ -145,7 +145,7 @@ var Instrumentation = function(core, options, callback) {
}

commandObj.ordered = options.ordered != undefined ? options.ordered : true;
} else if(x == 'remove' && !this.lastIsMaster().maxWireVersion) {
} else if(x == 'remove') { //&& !this.lastIsMaster().maxWireVersion) {
commandName = 'delete';

// Get the collection
Expand All @@ -163,20 +163,24 @@ var Instrumentation = function(core, options, callback) {
}

commandObj.ordered = options.ordered != undefined ? options.ordered : true;
} else if(x == 'insert' || x == 'update' || x == 'remove' && this.lastIsMaster().maxWireVersion >= 2) {
// Skip the insert/update/remove commands as they are executed as actual write commands in 2.6 or higher
return func.apply(this, args);
// } else if(x == 'insert' || x == 'update' || x == 'remove' && this.lastIsMaster().maxWireVersion >= 2) {
// // Skip the insert/update/remove commands as they are executed as actual write commands in 2.6 or higher
// return func.apply(this, args);
}

console.log("=== APM 0")

// Get the callback
var callback = args.pop();
// Set current callback operation id from the current context or create
// a new one
var ourOpId = callback.operationId || operationIdGenerator.next();
console.log("=== APM 1")

// Get a connection reference for this server instance
var connection = this.s.pool.get()

console.log("=== APM 2")
// Emit the start event for the command
var command = {
// Returns the command.
Expand All @@ -196,6 +200,8 @@ var Instrumentation = function(core, options, callback) {
connectionId: connection
};

console.log("=== APM 3")

// Filter out any sensitive commands
if(senstiveCommands.indexOf(commandName.toLowerCase())) {
command.commandObj = {};
Expand Down
7 changes: 7 additions & 0 deletions lib/collection.js
Expand Up @@ -611,6 +611,9 @@ var bulkWrite = function(self, operations, options, callback) {
var finalOptions = writeConcern(shallowClone(options), self.s.db, self, options);
var writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};

// console.log("%%%%%%%%%%%%%%%%%%%%% execute")
// console.dir(options.ordered)

// Execute the bulk
bulk.execute(writeCon, function(err, r) {
// We have connection level error
Expand All @@ -620,6 +623,10 @@ var bulkWrite = function(self, operations, options, callback) {
return callback(toError(r.getWriteErrorAt(0)), r);
}

// console.log("!!!!!! BULK EXECUTE FINISHED")
// console.dir(err)
// console.dir(r)

// if(err) return callback(err);
r.insertedCount = r.nInserted;
r.matchedCount = r.nMatched;
Expand Down
5 changes: 5 additions & 0 deletions lib/cursor.js
Expand Up @@ -645,6 +645,7 @@ define.classMethod('skip', {callback: false, promise:false, returns: [Cursor]});
Cursor.prototype.nextObject = Cursor.prototype.next;

var nextObject = function(self, callback) {
// console.log("cursor:: nextObject")
if(self.s.state == Cursor.CLOSED || self.isDead && self.isDead()) return handleCallback(callback, MongoError.create({message: "Cursor is closed", driver:true}));
if(self.s.state == Cursor.INIT && self.s.cmd.sort) {
try {
Expand Down Expand Up @@ -826,13 +827,17 @@ Cursor.prototype.toArray = function(callback) {
var toArray = function(self, callback) {
var items = [];

// console.log("!!!!!!!!!!!!! toArray :: 0")
// Reset cursor
self.rewind();
self.s.state = Cursor.INIT;
// console.log("!!!!!!!!!!!!! toArray :: 1")

// Fetch all the documents
var fetchDocs = function() {
// console.log("!!!!!!!!!!!!! toArray :: 2")
self._next(function(err, doc) {
// console.log("!!!!!!!!!!!!! toArray :: 3")
if(err) return handleCallback(callback, err);
if(doc == null) {
self.s.state = Cursor.CLOSED;
Expand Down
9 changes: 0 additions & 9 deletions lib/db.js
Expand Up @@ -147,15 +147,6 @@ var Db = function(databaseName, topology, options) {
// Ensure we have a valid db name
validateDatabaseName(self.s.databaseName);

// If we have specified the type of parser
if(typeof self.s.nativeParser == 'boolean') {
if(self.s.nativeParser) {
topology.setBSONParserType("c++");
} else {
topology.setBSONParserType("js");
}
}

// Add a read Only property
getSingleProperty(this, 'serverConfig', self.s.topology);
getSingleProperty(this, 'bufferMaxEntries', self.s.bufferMaxEntries);
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo_client.js
Expand Up @@ -216,8 +216,11 @@ var connect = function(url, options, callback) {
: new Server(object.servers[i].host, object.servers[i].port, _server_options);

var connectFunction = function(__server) {
console.log("!!!!!!!!!!!!!!!! attemptConnect")
// Attempt connect
new Db(object.dbName, __server, {w:1, native_parser:false}).open(function(err, db) {
console.log("!!!!!!!!!!!!!!!! attemptConnect 1")
console.dir(err)
// Update number of servers
totalNumberOfServers = totalNumberOfServers - 1;

Expand Down Expand Up @@ -423,6 +426,7 @@ var _finishConnecting = function(serverConfig, object, options, callback) {

// Authenticate
authentication_db.authenticate(options.user, options.password, options, function(err, success){
console.log("++++ _finishConnecting")
if(success){
process.nextTick(function() {
try {
Expand Down
4 changes: 0 additions & 4 deletions lib/mongos.js
Expand Up @@ -365,10 +365,6 @@ Mongos.prototype.cursor = function(ns, cmd, options) {

define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});

Mongos.prototype.setBSONParserType = function(type) {
return this.s.mongos.setBSONParserType(type);
}

Mongos.prototype.lastIsMaster = function() {
return this.s.mongos.lastIsMaster();
}
Expand Down
4 changes: 0 additions & 4 deletions lib/replset.js
Expand Up @@ -405,10 +405,6 @@ ReplSet.prototype.isConnected = function() {

define.classMethod('isConnected', {callback: false, promise:false, returns: [Boolean]});

ReplSet.prototype.setBSONParserType = function(type) {
return this.s.replset.setBSONParserType(type);
}

// Insert
ReplSet.prototype.cursor = function(ns, cmd, options) {
options = translateReadPreference(options);
Expand Down
11 changes: 7 additions & 4 deletions lib/server.js
Expand Up @@ -249,7 +249,10 @@ Server.prototype.connect = function(db, _options, callback) {

// Error handler
var reconnectHandler = function(err) {
// console.log("++ Server:: reconnect")
self.emit('reconnect', self);
// console.log("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! reconnectHandler")
// console.dir(self.s.store)
self.s.store.execute();
}

Expand All @@ -267,6 +270,8 @@ Server.prototype.connect = function(db, _options, callback) {
self.s.server.removeAllListeners(e);
});

// console.dir("========== connect")

// Set up listeners
self.s.server.once('timeout', errorHandler('timeout'));
self.s.server.once('error', errorHandler('error'));
Expand All @@ -291,6 +296,8 @@ Server.prototype.connect = function(db, _options, callback) {
self.s.server.on('topologyOpening', relay('topologyOpening'));
self.s.server.on('topologyClosed', relay('topologyClosed'));
self.s.server.on('topologyDescriptionChanged', relay('topologyDescriptionChanged'));
self.s.server.on('attemptReconnect', relay('attemptReconnect'));
self.s.server.on('monitoring', relay('monitoring'));

// Emit open event
self.emit('open', null, self);
Expand Down Expand Up @@ -380,10 +387,6 @@ Server.prototype.cursor = function(ns, cmd, options) {

define.classMethod('cursor', {callback: false, promise:false, returns: [Cursor, AggregationCursor, CommandCursor]});

Server.prototype.setBSONParserType = function(type) {
return this.s.server.setBSONParserType(type);
}

Server.prototype.lastIsMaster = function() {
return this.s.server.lastIsMaster();
}
Expand Down
3 changes: 3 additions & 0 deletions lib/topology_base.js
Expand Up @@ -79,6 +79,9 @@ Store.prototype.execute = function() {
while(ops.length > 0) {
var op = ops.shift();

// console.log("======= execute op")
// console.dir(op)

if(op.t == 'cursor') {
op.o[op.m].apply(op.o, op.p);
} else {
Expand Down
46 changes: 46 additions & 0 deletions test.js
@@ -0,0 +1,46 @@
var mongodb = require("./");
/*
db.createUser( {
user: "root",
pwd: "root",
roles: [ { role: "root", db: "admin" } ]
});
*/
// var uri = "mongodb://muser:mpass@ds021331-a0.mlab.com:21331,ds021331-a1.mlab.com:21331/mdb?replicaSet=rs-ds021331";
// var uri = "mongodb://root:root@localhost:31000,localhost:31001/admin?replicaSet=rs";
// var uri = "mongodb://foo:bar@ds021331-a0.mlab.com:21331,ds021331-a1.mlab.com:21331/driver-test?replicaSet=rs-ds021331";
var uri = "mongodb://root:root@localhost:27017/admin?socketTimeoutMS=5000&connectTimeoutMS=5000";
var interval = 1000;
var index = 0;
var total = 1000;

var openThenClose = function(){
var i = index;
var MongoClient = mongodb.MongoClient;
MongoClient.connect(uri, function(err, db) {
if(!err) {
console.log("open " + i);
var descriptors = db.db('test').collection("some-collection").find();
descriptors.toArray(function(err, docs){
db.close(function(){
console.log("close " + i);
});
});
}
});
};

// Run interval total times
var intervalId = setInterval(function(){
index = index + 1;

if(index > total) {
return clearInterval(intervalId);
}

openThenClose();
}, interval);

// keep alive
setInterval(function(){
}, interval * 10)
14 changes: 9 additions & 5 deletions test/functional/connection_tests.js
Expand Up @@ -11,10 +11,11 @@ exports['Should correctly start monitoring for single server connection'] = {
var db = configuration.newDbInstanceWithDomainSocket({w:1}, {poolSize: 1, host: "/tmp/mongodb-27017.sock"});
db.open(function(err, db) {
test.equal(null, err);
test.ok(db.serverConfig.s.server.s.inquireServerStateTimeout != null);

db.close();
test.done();
db.serverConfig.once('monitoring', function() {
db.close();
test.done();
});
});
}
}
Expand All @@ -30,7 +31,7 @@ exports['Should correctly disable monitoring for single server connection'] = {
var db = configuration.newDbInstanceWithDomainSocket({w:1}, {poolSize: 1, host: "/tmp/mongodb-27017.sock", monitoring: false});
db.open(function(err, db) {
test.equal(null, err);
test.equal(null, db.serverConfig.s.server.s.inquireServerStateTimeout);
test.equal(false, db.serverConfig.s.server.s.monitoring);

db.close();
test.done();
Expand Down Expand Up @@ -159,11 +160,14 @@ exports['Should fail to connect using non-domain socket with undefined port'] =
function connectionTester(test, testName, callback) {
return function(err, db) {
test.equal(err, null);

db.collection(testName, function(err, collection) {
test.equal(err, null);
var doc = {foo:123};

collection.insert({foo:123}, {w:1}, function(err, docs) {
test.equal(err, null);

db.dropDatabase(function(err, done) {
test.equal(err, null);
test.ok(done);
Expand Down Expand Up @@ -402,7 +406,7 @@ exports['Should correctly reconnect and finish query operation'] = {
test.equal(null, err);
test.equal(1, doc.a);
test.equal(2, dbReconnect);
test.equal(1, dbClose);
test.equal(2, dbClose);

db.close();
test.done();
Expand Down
8 changes: 8 additions & 0 deletions test/functional/db_tests.js
Expand Up @@ -75,10 +75,18 @@ exports.shouldCorrectlyPerformAutomaticConnect = {
var collection = automatic_connect_client.collection('test_object_id_generation_data2');
// Insert another test document and collect using ObjectId
collection.insert({"name":"Patty", "age":34}, configuration.writeConcernMax(), function(err, r) {
// console.log("----------------------------------------- TEST -1")
// console.dir(err)
// console.dir(r)
test.equal(1, r.ops.length);
test.ok(r.ops[0]._id.toHexString().length == 24);

// console.log("-------------------------------------- TEST 0")
collection.findOne({"name":"Patty"}, function(err, document) {
// console.log("-------------------------------------- TEST 1")
// console.log(r.ops[0])
// console.dir(err)
// console.log(document)
test.equal(r.ops[0]._id.toHexString(), document._id.toHexString());
// Let's close the db
automatic_connect_client.close();
Expand Down
2 changes: 2 additions & 0 deletions test/functional/error_tests.js
Expand Up @@ -8,7 +8,9 @@ exports.shouldFailInsertDueToUniqueIndex = {
var db = configuration.newDbInstance({w:1}, {poolSize:1});
db.open(function(err, db) {
var collection = db.collection('test_failing_insert_due_to_unique_index');
console.log("!!!!!!!!!!!!!!!!!!! ensureIndex 0")
collection.ensureIndex([['a', 1 ]], {unique:true, w:1}, function(err, indexName) {
console.log("!!!!!!!!!!!!!!!!!!! ensureIndex 1")
test.equal(null, err);

collection.insert({a:2}, {w: 1}, function(err, r) {
Expand Down

0 comments on commit a481a42

Please sign in to comment.