Skip to content
Permalink
Browse files Browse the repository at this point in the history
Do not stack overflow if a TCP frame contains too many PUBLISH
  • Loading branch information
mcollina committed Dec 9, 2017
1 parent 800a4dd commit 403ba53
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 3 deletions.
10 changes: 7 additions & 3 deletions lib/client.js
Expand Up @@ -249,12 +249,16 @@ MqttClient.prototype._setupStream = function () {
packets.push(packet)
})

function process () {
function nextTickWork () {
process.nextTick(work)
}

function work () {
var packet = packets.shift()
var done = completeParse

if (packet) {
that._handlePacket(packet, process)
that._handlePacket(packet, nextTickWork)
} else {
completeParse = null
done()
Expand All @@ -264,7 +268,7 @@ MqttClient.prototype._setupStream = function () {
writable._write = function (buf, enc, done) {
completeParse = done
parser.parse(buf)
process()
work()
}

this.stream.pipe(writable)
Expand Down
51 changes: 51 additions & 0 deletions test/client.js
Expand Up @@ -7,6 +7,9 @@ var path = require('path')
var abstractClientTests = require('./abstract_client')
var net = require('net')
var eos = require('end-of-stream')
var mqttPacket = require('mqtt-packet')
var Buffer = require('safe-buffer').Buffer
var Duplex = require('readable-stream').Duplex
var Connection = require('mqtt-connection')
var Server = require('./server')
var port = 9876
Expand Down Expand Up @@ -148,6 +151,54 @@ describe('MqttClient', function () {
})
})
})

it('should not go overflow if the TCP frame contains a lot of PUBLISH packets', function (done) {
var parser = mqttPacket.parser()
var count = 0
var max = 1000
var duplex = new Duplex({
read: function (n) {},
write: function (chunk, enc, cb) {
parser.parse(chunk)
cb() // nothing to do
}
})
var client = new mqtt.MqttClient(function () {
return duplex
}, {})

client.on('message', function (t, p, packet) {
if (++count === max) {
done()
}
})

parser.on('packet', function (packet) {
var packets = []

if (packet.cmd === 'connect') {
duplex.push(mqttPacket.generate({
cmd: 'connack',
sessionPresent: false,
returnCode: 0
}))

for (var i = 0; i < max; i++) {
packets.push(mqttPacket.generate({
cmd: 'publish',
topic: Buffer.from('hello'),
payload: Buffer.from('world'),
retain: false,
dup: false,
messageId: i + 1,
qos: 1
}))
}

duplex.push(Buffer.concat(packets))
}
})
})
})

describe('reconnecting', function () {
Expand Down

0 comments on commit 403ba53

Please sign in to comment.