diff --git a/lib/cmap/message_stream.js b/lib/cmap/message_stream.js index 4c64895724..c8f458e53a 100644 --- a/lib/cmap/message_stream.js +++ b/lib/cmap/message_stream.js @@ -39,32 +39,7 @@ class MessageStream extends Duplex { const buffer = this[kBuffer]; buffer.append(chunk); - while (buffer.length >= 4) { - const sizeOfMessage = buffer.readInt32LE(0); - if (sizeOfMessage < 0) { - callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); - return; - } - - if (sizeOfMessage > this.maxBsonMessageSize) { - callback( - new MongoParseError( - `Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}` - ) - ); - return; - } - - if (sizeOfMessage > buffer.length) { - callback(); - return; - } - - const messageBuffer = buffer.slice(0, sizeOfMessage); - buffer.consume(sizeOfMessage); - - processMessage(this, messageBuffer, callback); - } + processIncomingData(this, callback); } _read(/* size */) { @@ -125,7 +100,36 @@ function canCompress(command) { return !uncompressibleCommands.has(commandName); } -function processMessage(stream, message, callback) { +function processIncomingData(stream, callback) { + const buffer = stream[kBuffer]; + if (buffer.length < 4) { + callback(); + return; + } + + const sizeOfMessage = buffer.readInt32LE(0); + if (sizeOfMessage < 0) { + callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); + return; + } + + if (sizeOfMessage > stream.maxBsonMessageSize) { + callback( + new MongoParseError( + `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}` + ) + ); + return; + } + + if (sizeOfMessage > buffer.length) { + callback(); + return; + } + + const message = buffer.slice(0, sizeOfMessage); + buffer.consume(sizeOfMessage); + const messageHeader = { length: message.readInt32LE(0), requestId: message.readInt32LE(4), @@ -142,7 +146,12 @@ function processMessage(stream, message, callback) { new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) ); - callback(); + if (buffer.length >= 4) { + processIncomingData(stream, callback); + } else { + callback(); + } + return; } @@ -176,7 +185,11 @@ function processMessage(stream, message, callback) { new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) ); - callback(); + if (buffer.length >= 4) { + processIncomingData(stream, callback); + } else { + callback(); + } }); } diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index 6487dcf57e..aed67e2c4e 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -8,7 +8,12 @@ const expect = require('chai').expect; function bufferToStream(buffer) { const stream = new Readable(); - stream.push(buffer); + if (Array.isArray(buffer)) { + buffer.forEach(b => stream.push(b)); + } else { + stream.push(buffer); + } + stream.push(null); return stream; } @@ -24,6 +29,31 @@ describe('Message Stream', function() { ), documents: [{ ismaster: 1 }] }, + { + description: 'valid multiple OP_REPLY', + expectedMessageCount: 4, + data: Buffer.from( + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', + 'hex' + ), + documents: [{ ismaster: 1 }] + }, + { + description: 'valid OP_REPLY (partial)', + data: [ + Buffer.from('37', 'hex'), + Buffer.from('0000', 'hex'), + Buffer.from( + '000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', + 'hex' + ) + ], + documents: [{ ismaster: 1 }] + }, + { description: 'valid OP_MSG', data: Buffer.from( @@ -32,6 +62,19 @@ describe('Message Stream', function() { ), documents: [{ $db: 'admin', ismaster: 1 }] }, + { + description: 'valid multiple OP_MSG', + expectedMessageCount: 4, + data: Buffer.from( + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000', + 'hex' + ), + documents: [{ $db: 'admin', ismaster: 1 }] + }, + { description: 'Invalid message size (negative)', data: Buffer.from('ffffffff', 'hex'), @@ -46,10 +89,13 @@ describe('Message Stream', function() { it(test.description, function(done) { const bson = new BSON(); const error = test.error; + const expectedMessageCount = test.expectedMessageCount || 1; const inputStream = bufferToStream(test.data); const messageStream = new MessageStream({ bson }); + let messageCount = 0; messageStream.on('message', msg => { + messageCount++; if (error) { done(new Error(`expected error: ${error}`)); return; @@ -63,7 +109,9 @@ describe('Message Stream', function() { .that.deep.equals(test.documents); } - done(); + if (messageCount === expectedMessageCount) { + done(); + } }); messageStream.on('error', err => {