From 83884432ce86bf320aae5aec45c92e451430911c Mon Sep 17 00:00:00 2001 From: Matt Broadstone Date: Mon, 27 Jan 2020 08:57:22 -0500 Subject: [PATCH] fix(message-stream): support multiple inbound message packets If there was enough data buffered to read multiple messages, the MessageStream would currently callback to the underlaying Writable callback multiple times. Also fixed here is that the callback would not be called in the event that there wasn't enough data to process a single message NODE-2437 --- lib/cmap/message_stream.js | 71 ++++++++++++++++----------- test/unit/cmap/message_stream.test.js | 52 +++++++++++++++++++- 2 files changed, 92 insertions(+), 31 deletions(-) diff --git a/lib/cmap/message_stream.js b/lib/cmap/message_stream.js index 4c64895724..c8f458e53a 100644 --- a/lib/cmap/message_stream.js +++ b/lib/cmap/message_stream.js @@ -39,32 +39,7 @@ class MessageStream extends Duplex { const buffer = this[kBuffer]; buffer.append(chunk); - while (buffer.length >= 4) { - const sizeOfMessage = buffer.readInt32LE(0); - if (sizeOfMessage < 0) { - callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); - return; - } - - if (sizeOfMessage > this.maxBsonMessageSize) { - callback( - new MongoParseError( - `Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}` - ) - ); - return; - } - - if (sizeOfMessage > buffer.length) { - callback(); - return; - } - - const messageBuffer = buffer.slice(0, sizeOfMessage); - buffer.consume(sizeOfMessage); - - processMessage(this, messageBuffer, callback); - } + processIncomingData(this, callback); } _read(/* size */) { @@ -125,7 +100,36 @@ function canCompress(command) { return !uncompressibleCommands.has(commandName); } -function processMessage(stream, message, callback) { +function processIncomingData(stream, callback) { + const buffer = stream[kBuffer]; + if (buffer.length < 4) { + callback(); + return; + } + + const sizeOfMessage = buffer.readInt32LE(0); + if (sizeOfMessage < 0) { + callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`)); + return; + } + + if (sizeOfMessage > stream.maxBsonMessageSize) { + callback( + new MongoParseError( + `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}` + ) + ); + return; + } + + if (sizeOfMessage > buffer.length) { + callback(); + return; + } + + const message = buffer.slice(0, sizeOfMessage); + buffer.consume(sizeOfMessage); + const messageHeader = { length: message.readInt32LE(0), requestId: message.readInt32LE(4), @@ -142,7 +146,12 @@ function processMessage(stream, message, callback) { new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) ); - callback(); + if (buffer.length >= 4) { + processIncomingData(stream, callback); + } else { + callback(); + } + return; } @@ -176,7 +185,11 @@ function processMessage(stream, message, callback) { new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions) ); - callback(); + if (buffer.length >= 4) { + processIncomingData(stream, callback); + } else { + callback(); + } }); } diff --git a/test/unit/cmap/message_stream.test.js b/test/unit/cmap/message_stream.test.js index 6487dcf57e..aed67e2c4e 100644 --- a/test/unit/cmap/message_stream.test.js +++ b/test/unit/cmap/message_stream.test.js @@ -8,7 +8,12 @@ const expect = require('chai').expect; function bufferToStream(buffer) { const stream = new Readable(); - stream.push(buffer); + if (Array.isArray(buffer)) { + buffer.forEach(b => stream.push(b)); + } else { + stream.push(buffer); + } + stream.push(null); return stream; } @@ -24,6 +29,31 @@ describe('Message Stream', function() { ), documents: [{ ismaster: 1 }] }, + { + description: 'valid multiple OP_REPLY', + expectedMessageCount: 4, + data: Buffer.from( + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' + + '370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', + 'hex' + ), + documents: [{ ismaster: 1 }] + }, + { + description: 'valid OP_REPLY (partial)', + data: [ + Buffer.from('37', 'hex'), + Buffer.from('0000', 'hex'), + Buffer.from( + '000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000', + 'hex' + ) + ], + documents: [{ ismaster: 1 }] + }, + { description: 'valid OP_MSG', data: Buffer.from( @@ -32,6 +62,19 @@ describe('Message Stream', function() { ), documents: [{ $db: 'admin', ismaster: 1 }] }, + { + description: 'valid multiple OP_MSG', + expectedMessageCount: 4, + data: Buffer.from( + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' + + '370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000', + 'hex' + ), + documents: [{ $db: 'admin', ismaster: 1 }] + }, + { description: 'Invalid message size (negative)', data: Buffer.from('ffffffff', 'hex'), @@ -46,10 +89,13 @@ describe('Message Stream', function() { it(test.description, function(done) { const bson = new BSON(); const error = test.error; + const expectedMessageCount = test.expectedMessageCount || 1; const inputStream = bufferToStream(test.data); const messageStream = new MessageStream({ bson }); + let messageCount = 0; messageStream.on('message', msg => { + messageCount++; if (error) { done(new Error(`expected error: ${error}`)); return; @@ -63,7 +109,9 @@ describe('Message Stream', function() { .that.deep.equals(test.documents); } - done(); + if (messageCount === expectedMessageCount) { + done(); + } }); messageStream.on('error', err => {