Skip to content

Commit

Permalink
fix(message-stream): support multiple inbound message packets
Browse files Browse the repository at this point in the history
If there was enough data buffered to read multiple messages, the
MessageStream would currently callback to the underlaying Writable
callback multiple times. Also fixed here is that the callback would
not be called in the event that there wasn't enough data to process
a single message

NODE-2437
  • Loading branch information
mbroadst committed Jan 27, 2020
1 parent fa4b01b commit 8388443
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 31 deletions.
71 changes: 42 additions & 29 deletions lib/cmap/message_stream.js
Expand Up @@ -39,32 +39,7 @@ class MessageStream extends Duplex {
const buffer = this[kBuffer];
buffer.append(chunk);

while (buffer.length >= 4) {
const sizeOfMessage = buffer.readInt32LE(0);
if (sizeOfMessage < 0) {
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
return;
}

if (sizeOfMessage > this.maxBsonMessageSize) {
callback(
new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${this.maxBsonMessageSize}`
)
);
return;
}

if (sizeOfMessage > buffer.length) {
callback();
return;
}

const messageBuffer = buffer.slice(0, sizeOfMessage);
buffer.consume(sizeOfMessage);

processMessage(this, messageBuffer, callback);
}
processIncomingData(this, callback);
}

_read(/* size */) {
Expand Down Expand Up @@ -125,7 +100,36 @@ function canCompress(command) {
return !uncompressibleCommands.has(commandName);
}

function processMessage(stream, message, callback) {
function processIncomingData(stream, callback) {
const buffer = stream[kBuffer];
if (buffer.length < 4) {
callback();
return;
}

const sizeOfMessage = buffer.readInt32LE(0);
if (sizeOfMessage < 0) {
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
return;
}

if (sizeOfMessage > stream.maxBsonMessageSize) {
callback(
new MongoParseError(
`Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
)
);
return;
}

if (sizeOfMessage > buffer.length) {
callback();
return;
}

const message = buffer.slice(0, sizeOfMessage);
buffer.consume(sizeOfMessage);

const messageHeader = {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
Expand All @@ -142,7 +146,12 @@ function processMessage(stream, message, callback) {
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
);

callback();
if (buffer.length >= 4) {
processIncomingData(stream, callback);
} else {
callback();
}

return;
}

Expand Down Expand Up @@ -176,7 +185,11 @@ function processMessage(stream, message, callback) {
new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
);

callback();
if (buffer.length >= 4) {
processIncomingData(stream, callback);
} else {
callback();
}
});
}

Expand Down
52 changes: 50 additions & 2 deletions test/unit/cmap/message_stream.test.js
Expand Up @@ -8,7 +8,12 @@ const expect = require('chai').expect;

function bufferToStream(buffer) {
const stream = new Readable();
stream.push(buffer);
if (Array.isArray(buffer)) {
buffer.forEach(b => stream.push(b));
} else {
stream.push(buffer);
}

stream.push(null);
return stream;
}
Expand All @@ -24,6 +29,31 @@ describe('Message Stream', function() {
),
documents: [{ ismaster: 1 }]
},
{
description: 'valid multiple OP_REPLY',
expectedMessageCount: 4,
data: Buffer.from(
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000' +
'370000000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
'hex'
),
documents: [{ ismaster: 1 }]
},
{
description: 'valid OP_REPLY (partial)',
data: [
Buffer.from('37', 'hex'),
Buffer.from('0000', 'hex'),
Buffer.from(
'000100000001000000010000000000000000000000000000000000000001000000130000001069736d6173746572000100000000',
'hex'
)
],
documents: [{ ismaster: 1 }]
},

{
description: 'valid OP_MSG',
data: Buffer.from(
Expand All @@ -32,6 +62,19 @@ describe('Message Stream', function() {
),
documents: [{ $db: 'admin', ismaster: 1 }]
},
{
description: 'valid multiple OP_MSG',
expectedMessageCount: 4,
data: Buffer.from(
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000' +
'370000000100000000000000dd0700000000000000220000001069736d6173746572000100000002246462000600000061646d696e0000',
'hex'
),
documents: [{ $db: 'admin', ismaster: 1 }]
},

{
description: 'Invalid message size (negative)',
data: Buffer.from('ffffffff', 'hex'),
Expand All @@ -46,10 +89,13 @@ describe('Message Stream', function() {
it(test.description, function(done) {
const bson = new BSON();
const error = test.error;
const expectedMessageCount = test.expectedMessageCount || 1;
const inputStream = bufferToStream(test.data);
const messageStream = new MessageStream({ bson });

let messageCount = 0;
messageStream.on('message', msg => {
messageCount++;
if (error) {
done(new Error(`expected error: ${error}`));
return;
Expand All @@ -63,7 +109,9 @@ describe('Message Stream', function() {
.that.deep.equals(test.documents);
}

done();
if (messageCount === expectedMessageCount) {
done();
}
});

messageStream.on('error', err => {
Expand Down

0 comments on commit 8388443

Please sign in to comment.