Skip to content

Commit

Permalink
Fix for wire protocol parser for corner situation where the message i…
Browse files Browse the repository at this point in the history
…s larger than the maximum socket buffer in node.js (Issue #464, #461, #447)
  • Loading branch information
christkv committed Dec 30, 2011
1 parent 3884a78 commit 673fc2b
Show file tree
Hide file tree
Showing 5 changed files with 48 additions and 14 deletions.
1 change: 1 addition & 0 deletions HISTORY
@@ -1,6 +1,7 @@
* Moved BSON_BINARY_SUBTYPE_DEFAULT from BSON object to Binary object and removed the BSON_BINARY_ prefixes
* Removed Native BSON types, C++ parser uses JS types (faster due to cost of crossing the JS-C++ barrier for each call)
* Added build fix for 0.4.X branch of Node.js where GetOwnPropertyNames is not defined in v8
* Fix for wire protocol parser for corner situation where the message is larger than the maximum socket buffer in node.js (Issue #464, #461, #447)

0.9.7-2-5 2011-12-22
* Brand spanking new Streaming Cursor support Issue #458 (https://github.com/christkv/node-mongodb-native/pull/458) thanks to Mr Aaron Heckmann
Expand Down
16 changes: 13 additions & 3 deletions lib/mongodb/connection/connection.js
Expand Up @@ -21,8 +21,7 @@ var Connection = exports.Connection = function(id, socketOptions) {
//
// Connection parsing state
//

this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : global.DEFAULT_MAX_BSON_SIZE;
this.maxBsonSize = socketOptions.maxBsonSize ? socketOptions.maxBsonSize : Connection.DEFAULT_MAX_BSON_SIZE;
// Contains the current message bytes
this.buffer = null;
// Contains the current message size
Expand Down Expand Up @@ -85,7 +84,7 @@ Connection.prototype.start = function() {
// Create a new stream
this.connection = new net.Stream();
// // Create new connection instance
// this.connection = new net.Socket();
this.connection = new net.Socket();
// Set options on the socket
this.connection.setTimeout(this.socketOptions.timeout);
this.connection.setNoDelay(this.socketOptions.noDelay);
Expand Down Expand Up @@ -179,6 +178,9 @@ var createDataHandler = exports.Connection.createDataHandler = function(self) {
// We need to handle the parsing of the data
// and emit the messages when there is a complete one
return function(data) {
// console.log("----------------------------------------------------------- DATA :: " + data.length)
// console.dir(data.toString('hex'))

// Parse until we are done with the data
while(data.length > 0) {
// If we still have bytes to read on the current message
Expand Down Expand Up @@ -255,6 +257,9 @@ var createDataHandler = exports.Connection.createDataHandler = function(self) {
if(data.length > 4) {
// Retrieve the message size
var sizeOfMessage = binaryutils.decodeUInt32(data, 0);
// console.log("=================================================== parse sizeOfMessage")
// console.dir(sizeOfMessage)

// If we have a negative sizeOfMessage emit error and return
if(sizeOfMessage < 0 || sizeOfMessage > self.maxBsonSize) {
var errorObject = {err:"socketHandler", trace:'', bin:self.buffer, parseState:{
Expand All @@ -267,6 +272,11 @@ var createDataHandler = exports.Connection.createDataHandler = function(self) {
return;
}

// console.log("--------------------------------------------------------------------")
// console.dir(sizeOfMessage)
// console.dir(self.maxBsonSize)
// console.dir(data.length)

// Ensure that the size of message is larger than 0 and less than the max allowed
if(sizeOfMessage > 4 && sizeOfMessage < self.maxBsonSize && sizeOfMessage > data.length) {
self.buffer = new Buffer(sizeOfMessage);
Expand Down
2 changes: 1 addition & 1 deletion lib/mongodb/connection/connection_pool.js
Expand Up @@ -52,7 +52,7 @@ var ConnectionPool = exports.ConnectionPool = function(host, port, poolSize, bso
inherits(ConnectionPool, EventEmitter);

ConnectionPool.prototype.setMaxBsonSize = function(maxBsonSize) {
if (typeof maxBsonSize == null){
if(maxBsonSize == null){
maxBsonSize = Connection.DEFAULT_MAX_BSON_SIZE;
}

Expand Down
6 changes: 3 additions & 3 deletions lib/mongodb/cursor.js
Expand Up @@ -170,7 +170,7 @@ Cursor.prototype.each = function(callback) {
//FIX: stack overflow (on deep callback) (cred: https://github.com/limp/node-mongodb-native/commit/27da7e4b2af02035847f262b29837a94bbbf6ce2)
process.nextTick(function(){
// Fetch the next object until there is no more objects
self.nextObject(function(err, item) {
self.nextObject(function(err, item) {
if(err != null) return callback(err, null);

if(item != null) {
Expand Down Expand Up @@ -506,7 +506,7 @@ Cursor.prototype.nextObject = function(callback) {
Cursor.prototype.getMore = function(callback) {
var self = this;
var limit = 0;

if (!self.tailable && self.limitValue > 0) {
limit = self.limitValue - self.totalNumberOfRecords;
if (limit < 1) {
Expand All @@ -529,7 +529,7 @@ Cursor.prototype.getMore = function(callback) {
var excessResult = self.totalNumberOfRecords - self.limitValue;

if (excessResult > 0) {
result.documents.splice(-1*excessResult, excessResult);
result.documents.splice(-1 * excessResult, excessResult);
}
}

Expand Down
37 changes: 30 additions & 7 deletions test/find_test.js
Expand Up @@ -601,7 +601,7 @@ var tests = testCase({
test.equal(3, updated_doc.a);
test.equal(2, updated_doc.b);

// // Let's upsert!
// Let's upsert!
collection.findAndModify({'a':4}, [], {'$set':{'b':3}}, {'new': true, upsert: true}, function(err, updated_doc) {
test.equal(4, updated_doc.a);
test.equal(3, updated_doc.b);
Expand Down Expand Up @@ -1101,16 +1101,39 @@ var tests = testCase({
if(running) process.nextTick(insert);
});
}

// while(running) {
// process.nextTick(function() {
// collection.insert({a:1});
// })
// }
});
});
},

shouldCorrectlyIterateOverCollection : function(test) {
var p_client = new Db(MONGODB, new Server("127.0.0.1", 27017, {auto_reconnect: true, poolSize:1}), {native_parser: (process.env['TEST_NATIVE'] != null)});
var numberOfSteps = 0;

// Open db connection
p_client.open(function(err, p_client) {
// Create a collection
p_client.createCollection('shouldCorrectlyIterateOverCollection', function(err, collection) {
for(var i = 0; i < 1000; i++) {
collection.insert({a:1, b:2, c:{d:3, f:'sfdsffffffffffffffffffffffffffffff'}});
}

collection.find({}, {}, function(err,cursor) {
cursor.count(function(err,count) {
cursor.each(function(err, obj) {
if (obj == null) {
p_client.close();
test.equal(1000, numberOfSteps);
test.done();
} else {
numberOfSteps = numberOfSteps + 1;
}
});
});
});
});
});
},

noGlobalsLeaked : function(test) {
var leaks = gleak.detectNew();
test.equal(0, leaks.length, "global var leak detected: " + leaks.join(', '));
Expand Down

0 comments on commit 673fc2b

Please sign in to comment.