Skip to content

Commit

Permalink
Fixing up some bugs for connection pooling, still missing authenticat…
Browse files Browse the repository at this point in the history
…ion for pools
  • Loading branch information
christkv committed Jun 1, 2011
1 parent f868a93 commit 038abb5
Show file tree
Hide file tree
Showing 11 changed files with 261 additions and 202 deletions.
18 changes: 17 additions & 1 deletion lib/mongodb/admin.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
var Collection = require('./collection').Collection,
Cursor = require('./cursor').Cursor,
DbCommand = require('./commands/db_command').DbCommand;
DbCommand = require('./commands/db_command').DbCommand,
debug = require('util').debug,
inspect = require('util').inspect;

var Admin = exports.Admin = function(db) {
this.db = db;
Expand All @@ -21,6 +23,7 @@ Admin.prototype.profilingLevel = function(callback) {

this.command(command, function(err, doc) {
doc = doc.documents[0];

if(err == null && (doc.ok == 1 || doc.was.constructor == Numeric)) {
var was = doc.was;
if(was == 0) {
Expand All @@ -38,6 +41,17 @@ Admin.prototype.profilingLevel = function(callback) {
});
};

Admin.prototype.authenticate = function(username, password, callback) {
var self = this;

var databaseName = this.db.databaseName;
this.db.databaseName = 'admin';
this.db.authenticate(username, password, function(err, result) {
self.db.databaseName = databaseName;
return callback(err, result);
})
}

Admin.prototype.setProfilingLevel = function(level, callback) {
var self = this;
var command = {};
Expand All @@ -56,6 +70,7 @@ Admin.prototype.setProfilingLevel = function(level, callback) {

this.command(command, function(err, doc) {
doc = doc.documents[0];

if(err == null && (doc.ok == 1 || doc.was.constructor == Numeric)) {
return callback(null, level);
} else {
Expand All @@ -78,6 +93,7 @@ Admin.prototype.profilingInfo = function(callback) {

Admin.prototype.command = function(command, callback) {
var self = this;

// Execute a command
this.db.executeDbAdminCommand(command, function(err, result) {
// Ensure change before event loop executes
Expand Down
20 changes: 12 additions & 8 deletions lib/mongodb/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ Collection.prototype.remove = function remove (selector, options, callback) {
var errorOptions = options.safe != null ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
this.db.executeCommand(deleteCommand, false, errorOptions, function (err, error) {
this.db.executeCommand(deleteCommand, {read:false, safe: errorOptions}, function (err, error) {
error = error && error.documents;
if(!callback) return;

Expand Down Expand Up @@ -272,8 +272,12 @@ Collection.prototype.insertAll = function insertAll (docs, options, callback) {
var errorOptions = options.safe != null ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;

// Insert options
var insertOptions = {read:false, safe: errorOptions};
// If we have safe set set async to false
if(errorOptions == null) insertOptions['async'] = true;
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
this.db.executeCommand(insertCommand, false, errorOptions, function (err, error) {
this.db.executeCommand(insertCommand, insertOptions, function (err, error) {
error = error && error.documents;
if(!callback) return;

Expand Down Expand Up @@ -393,7 +397,7 @@ Collection.prototype.update = function update (selector, document, options, call
var errorOptions = (options && options.safe != null) ? options.safe : null;
errorOptions = errorOptions == null && this.opts.safe != null ? this.opts.safe : errorOptions;
// Execute command with safe options (rolls up both command and safe command into one and executes them on the same connection)
this.db.executeCommand(updateCommand, false, errorOptions, function (err, error) {
this.db.executeCommand(updateCommand, {read:false, safe: errorOptions}, function (err, error) {
error = error && error.documents;
if(!callback) return;

Expand Down Expand Up @@ -461,7 +465,7 @@ Collection.prototype.distinct = function distinct (key, query, callback) {

var cmd = DbCommand.createDbCommand(this.db, mapCommandHash);

this.db.executeCommand(cmd, true, function (err, result) {
this.db.executeCommand(cmd, {read:true}, function (err, result) {
if (err) {
return callback(err);
}
Expand Down Expand Up @@ -500,7 +504,7 @@ Collection.prototype.count = function count (query, callback) {
, null
);

this.db.executeCommand(queryCommand, true, function (err, result) {
this.db.executeCommand(queryCommand, {read:true}, function (err, result) {
if (err) {
callback(err);
} else if (result.documents[0].ok != 1) {
Expand Down Expand Up @@ -751,7 +755,7 @@ Collection.prototype.findOne = function findOne (queryObject, options, callback)
, query
, fields);

this.db.executeCommand(queryCommand, true, function (err, result) {
this.db.executeCommand(queryCommand, {read:true}, function (err, result) {
if (!err && result.documents[0] && result.documents[0]['$err']) {
return callback(result.documents[0]['$err']);
}
Expand Down Expand Up @@ -870,7 +874,7 @@ Collection.prototype.mapReduce = function mapReduce (map, reduce, options, callb
var self = this;
var cmd = DbCommand.createDbCommand(this.db, mapCommandHash);

this.db.executeCommand(cmd, true, function (err, result) {
this.db.executeCommand(cmd, {read:true}, function (err, result) {
if (err) {
return callback(err);
}
Expand Down Expand Up @@ -987,7 +991,7 @@ Collection.prototype.group = function group (keys, condition, initial, reduce, c

var cmd = DbCommand.createDbCommand(this.db, selector);

this.db.executeCommand(cmd, true, function (err, result) {
this.db.executeCommand(cmd, {read:true}, function (err, result) {
if (err) return callback(err);

var document = result.documents[0];
Expand Down
5 changes: 4 additions & 1 deletion lib/mongodb/commands/db_command.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ DbCommand.createGetNonceCommand = function(db) {
DbCommand.createAuthenticationCommand = function(db, username, password, nonce) {
// Generate keys used for authentication
var hash_password = MD5.hex_md5(username + ":mongo:" + password);
// debug("=========================== hash_password :: " + hash_password)
var key = MD5.hex_md5(nonce + username + hash_password);
// debug("=========================== pre_hash_key :: " + (nonce + username + hash_password))
// debug("=========================== key :: " + key)
var selector = {'authenticate':1, 'user':username, 'nonce':nonce, 'key':key};
// Create db command
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NO_CURSOR_TIMEOUT, 0, -1, selector, null);
return new DbCommand(db, db.databaseName + "." + DbCommand.SYSTEM_COMMAND_COLLECTION, QueryCommand.OPTS_NONE, 0, -1, selector, null);
};

DbCommand.createLogoutCommand = function(db) {
Expand Down
26 changes: 17 additions & 9 deletions lib/mongodb/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,16 +46,19 @@ var setupConnectionPool = function(self, poolSize, reconnect) {
//
// Listener that handles callbacks for the connection
// Uses the internal object states to keep individual tcp connections seperate
var receiveListener = function(result) {
var receiveListener = function(result, fd) {
fd = fd == null ? this.fd : fd;

// Fetch the pool reference
var conObj = self.poolByReference[this.fd];
var conObj = self.poolByReference[fd];

// if(conObj == null) {
// debug("================================================================ failed to find connection :: " + this.fd)
// debug(inspect(self.poolByReference))
// }

// Check if we have an unfinished message
if(conObj.bytesRead > 0 && conObj.sizeOfMessage > 0) {
if(conObj != null && conObj.bytesRead > 0 && conObj.sizeOfMessage > 0) {
// Calculate remaing bytes to fetch
var remainingBytes = conObj.sizeOfMessage - conObj.bytesRead;
// Check if we have multiple packet messages and save the pieces otherwise emit the message
Expand All @@ -70,10 +73,12 @@ var setupConnectionPool = function(self, poolSize, reconnect) {
conObj.buffer = ''; conObj.bytesRead = 0; conObj.sizeOfMessage = 0;
// If message is longer than the current one, keep parsing
if(remainingBytes < result.length) {
receiveListener.call(this, result.substr(remainingBytes, (result.length - remainingBytes)));
// debug("--------------------------------------- remainingBytes < result.length :: " + this.fd)
// receiveListener.call(this, result.substr(remainingBytes, (result.length - remainingBytes)));
receiveListener(result.substr(remainingBytes, (result.length - remainingBytes)), fd);
}
}
} else {
} else if(conObj != null){
if(conObj.stubBuffer.length > 0) {
result = conObj.stubBuffer + result;
conObj.stubBuffer = '';
Expand All @@ -88,7 +93,9 @@ var setupConnectionPool = function(self, poolSize, reconnect) {
self.emit("data", result);
} else if(sizeOfMessage < result.length) {
self.emit("data", result.substr(0, sizeOfMessage));
receiveListener.call(this, result.substr(sizeOfMessage, (result.length - sizeOfMessage)));
// debug("--------------------------------------- sizeOfMessage < result.length :: " + this.fd)
// receiveListener.call(this, result.substr(sizeOfMessage, (result.length - sizeOfMessage)));
receiveListener(result.substr(sizeOfMessage, (result.length - sizeOfMessage)), fd);
}
} else {
conObj.stubBuffer = result;
Expand Down Expand Up @@ -278,15 +285,16 @@ Connection.prototype.send = function(command) {
// Send the command, if it's an array of commands execute them all on the same connection
if(Array.isArray(command)) {
for(var i = 0; i < command.length; i++) {
// debug("========================================================================= command string")
// BinaryParser.ilprint((command.constructor == String) ? command : command.toBinary())
connection.write((command[i].constructor == String) ? command[i] : command[i].toBinary(), "binary");
}
} else {
// debug("========================================================================= command string")
// BinaryParser.ilprint((command.constructor == String) ? command : command.toBinary())
connection.write((command.constructor == String) ? command : command.toBinary(), "binary");
}
} catch(err) {
// debug("======================================================= err")
// debug(inspect(err))

// Check if the connection is closed
if(connection.readyState != "open" && self.autoReconnect) {
// Add the message to the queue of messages to send
Expand Down
8 changes: 4 additions & 4 deletions lib/mongodb/cursor.js
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ Cursor.prototype.nextObject = function(callback) {
result = null;
};

self.db.executeCommand(self.generateQueryCommand(), true, commandHandler);
self.db.executeCommand(self.generateQueryCommand(), {read:true}, commandHandler);
commandHandler = null;
} catch(err) {
callback(new Error(err.toString()), null);
Expand Down Expand Up @@ -499,7 +499,7 @@ Cursor.prototype.getMore = function(callback) {
try {
var getMoreCommand = new GetMoreCommand(self.db, self.collectionName, self.limitRequest(), self.cursorId);
// Execute the command
self.db.executeCommand(getMoreCommand, true, function(err, result) {
self.db.executeCommand(getMoreCommand, {read:true}, function(err, result) {
self.cursorId = result.cursorId;
self.totalNumberOfRecords += result.numberReturned;
// Determine if there's more documents to fetch
Expand Down Expand Up @@ -574,7 +574,7 @@ Cursor.prototype.streamRecords = function(options) {
execute(queryCommand);

function execute(command) {
self.db.executeCommand(command, true, function(err,result) {
self.db.executeCommand(command, {read:true}, function(err,result) {
if (!self.queryRun && result) {
self.queryRun = true;
self.cursorId = result.cursorId;
Expand Down Expand Up @@ -624,7 +624,7 @@ Cursor.prototype.close = function(callback) {
if(this.cursorId instanceof self.db.bson_serializer.Long && this.cursorId.greaterThan(self.db.bson_serializer.Long.fromInt(0))) {
try {
var command = new KillCursorCommand(this.db, [this.cursorId]);
this.db.executeCommand(command, true, null);
this.db.executeCommand(command, {read:true}, null);
} catch(err) {}
}

Expand Down
42 changes: 30 additions & 12 deletions lib/mongodb/db.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ var Db = exports.Db = function(databaseName, serverConfig, options) {
// If we tried to instantiate the native driver
throw "Native bson parser not compiled, please compile or avoid using native_parser=true";
}
debug(inspect(options))

// this.connections = [];
// State of the db connection
Expand Down Expand Up @@ -202,7 +201,7 @@ Db.prototype.authenticate = function(username, password, callback) {
// Nonce used to make authentication request with md5 hash
var nonce = reply.documents[0].nonce;
// Execute command
self.executeCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), function(err, result) {
self.executeCommand(DbCommand.createAuthenticationCommand(self, username, password, nonce), function(err, result) {
if(err == null && result.documents[0].ok == 1) {
callback(null, true);
} else {
Expand All @@ -223,7 +222,7 @@ Db.prototype.addUser = function(username, password, callback) {
// Fetch a user collection
this.collection(DbCommand.SYSTEM_USER_COLLECTION, function(err, collection) {
// Insert the user into the system users collections
collection.insert({user: username, pwd: userPassword}, function(err, documents) {
collection.insert({user: username, pwd: userPassword}, {safe:true}, function(err, documents) {
callback(err, documents);
});
});
Expand Down Expand Up @@ -278,7 +277,7 @@ Db.prototype.createCollection = function(collectionName, options, callback) {
}

// Create a new collection and return it
self.executeCommand(DbCommand.createCreateCollectionCommand(self, collectionName, options), false, true, function(err, result) {
self.executeCommand(DbCommand.createCreateCollectionCommand(self, collectionName, options), {read:false, safe:true}, function(err, result) {
if(err == null && result.documents[0].ok == 1) {
callback(null, new Collection(self, collectionName, self.pkFactory));
} else {
Expand Down Expand Up @@ -371,7 +370,7 @@ Db.prototype.resetErrorHistory = function(callback) {
Db.prototype.createIndex = function(collectionName, fieldOrSpec, options, callback) {
if(callback == null) { callback = options; options = null; }
var command = DbCommand.createCreateIndexCommand(this, collectionName, fieldOrSpec, options);
this.executeCommand(command, false, true, function(err, result) {
this.executeCommand(command, {read:false, safe:true}, function(err, result) {
result = result && result.documents;
if (err) {
callback(err);
Expand All @@ -394,7 +393,7 @@ Db.prototype.ensureIndex = function(collectionName, fieldOrSpec, options, callba
// Check if the index allready exists
this.indexInformation(collectionName, function(err, collectionInfo) {
if(!collectionInfo[index_name]) {
self.executeCommand(command, false, true, function(err, result) {
self.executeCommand(command, {read:false, safe:true}, function(err, result) {
result = result && result.documents;
if (err) {
callback(err);
Expand Down Expand Up @@ -460,17 +459,31 @@ Db.prototype.dropDatabase = function(callback) {
/**
Execute db command
**/
Db.prototype.executeCommand = function(db_command, read, safe, callback) {
Db.prototype.executeCommand = function(db_command, options, callback) {
var self = this;

var args = Array.prototype.slice.call(arguments, 1);
callback = args.pop();
read = args.length ? args.shift() : false;
safe = args.length ? args.shift() : false;
options = args.length ? args.shift() : {};

// debug("---------------------------------------------------------------------------")
// debug("---------------------------------------------------------------------------")
//
// debug(inspect(db_command))
// debug(inspect(options))

// Options unpacking
var read = options['read'] != null ? options['read'] : false;
var safe = options['safe'] != null ? options['safe'] : false;
var executeAll = options['executeAll'] != null ? options['executeAll'] : false;

// debug("=================== read :: " + read)
// debug("=================== safe :: " + safe)
// debug("=================== callback :: " + callback)

// if()
var errorCommand = null;
if(safe != false) {
if(safe == true) {
errorCommand = DbCommand.createGetLastErrorCommand(safe, this);
}

Expand Down Expand Up @@ -538,10 +551,15 @@ Db.prototype.executeCommand = function(db_command, read, safe, callback) {
} else {
writer.send(db_command)
}
} catch(err){

// Ensure proper callback if we provide a callback but the operation is async
// if(callback instanceof Function) {
// return callback(err, null);
// }
} catch(err){
if(!(err instanceof Error)) err = new Error(err);
if(callback instanceof Function) {
delete self.notReplied[db_command.getRequestId().toString()];
if(errorCommand != null) delete self.notReplied[errorCommand.getRequestId().toString()];
return callback(err, null);
}

Expand Down
Loading

0 comments on commit 038abb5

Please sign in to comment.