Skip to content

Commit

Permalink
Merge pull request #96 from embertec-eng/varbyteint-fixes-#95
Browse files Browse the repository at this point in the history
Fix: error handling for Variable Byte Integer #95
  • Loading branch information
Yoseph Maguire committed Nov 19, 2020
2 parents 2dcc70c + 909cbd3 commit b6de745
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 11 deletions.
1 change: 1 addition & 0 deletions constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ protocol.RETAIN_MASK = 0x01
/* Length */
protocol.VARBYTEINT_MASK = 0x7F
protocol.VARBYTEINT_FIN_MASK = 0x80
protocol.VARBYTEINT_MAX = 268435455

/* Connack */
protocol.SESSIONPRESENT_MASK = 0x01
Expand Down
9 changes: 6 additions & 3 deletions parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,6 @@ class Parser extends EventEmitter {
if (result) {
this.packet.length = result.value
this._list.consume(result.bytes)
} else {
this._emitError(new Error('Invalid length'))
}
debug('_parseLength %d', result.value)
return !!result
Expand Down Expand Up @@ -542,14 +540,15 @@ class Parser extends EventEmitter {

_parseVarByteNum (fullInfoFlag) {
debug('_parseVarByteNum')
const maxBytes = 4
let bytes = 0
let mul = 1
let value = 0
let result = false
let current
const padding = this._pos ? this._pos : 0

while (bytes < 5) {
while (bytes < maxBytes) {
current = this._list.readUInt8(padding + bytes++)
value += mul * (current & constants.VARBYTEINT_MASK)
mul *= 0x80
Expand All @@ -563,6 +562,10 @@ class Parser extends EventEmitter {
}
}

if (!result && bytes === maxBytes && this._list.length >= bytes) {
this._emitError(new Error('Invalid variable byte integer'))
}

if (padding) {
this._pos += bytes
}
Expand Down
157 changes: 150 additions & 7 deletions test.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
const util = require('util')

const test = require('tape')
const mqtt = require('./')
const WS = require('readable-stream').Writable
Expand Down Expand Up @@ -32,7 +34,24 @@ function testParseGenerate (name, object, buffer, opts) {
})

test(`${name} generate`, t => {
t.equal(mqtt.generate(object, opts).toString('hex'), buffer.toString('hex'))
// For really large buffers, the expanded hex string can be so long as to
// generate an error in nodejs 14.x, so only do the test with extra output
// for relatively small buffers.
const bigLength = 10000
const generatedBuffer = mqtt.generate(object, opts)
if (generatedBuffer.length < bigLength && buffer.length < bigLength) {
t.equal(generatedBuffer.toString('hex'), buffer.toString('hex'))
} else {
const bufferOkay = generatedBuffer.equals(buffer)
if (bufferOkay) {
t.pass()
} else {
// Output abbreviated representations of the buffers.
t.comment('Expected:\n' + util.inspect(buffer))
t.comment('Got:\n' + util.inspect(generatedBuffer))
t.fail('Large buffers not equal')
}
}
t.end()
})

Expand Down Expand Up @@ -205,9 +224,26 @@ test('disabled numbers cache', t => {
testGenerateError('Unknown command', {})

testParseError('Not supported', Buffer.from([0, 1, 0]), {})
testParseError('Invalid length', Buffer.from(

// Length header field
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 128]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 1]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 127]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 128]
), {})
testParseError('Invalid variable byte integer', Buffer.from(
[16, 255, 255, 255, 255, 255, 1]
), {})

testParseGenerate('minimal connect', {
cmd: 'connect',
Expand Down Expand Up @@ -1134,6 +1170,31 @@ testParseGenerate('publish MQTT 5 properties with 0-4 byte varbyte', {
116, 101, 115, 116 // Payload (test)
]), { protocolVersion: 5 })

testParseGenerate('publish MQTT 5 properties with max value varbyte', {
cmd: 'publish',
retain: true,
qos: 2,
dup: true,
length: 22,
topic: 'test',
payload: Buffer.from('test'),
messageId: 10,
properties: {
payloadFormatIndicator: false,
subscriptionIdentifier: [1, 268435455]
}
}, Buffer.from([
61, 22, // Header
0, 4, // Topic length
116, 101, 115, 116, // Topic (test)
0, 10, // Message ID
9, // properties length
1, 0, // payloadFormatIndicator
11, 1, // subscriptionIdentifier
11, 255, 255, 255, 127, // subscriptionIdentifier (max value)
116, 101, 115, 116 // Payload (test)
]), { protocolVersion: 5 })

; (() => {
const buffer = Buffer.alloc(2048)
testParseGenerate('2KB publish packet', {
Expand All @@ -1149,18 +1210,21 @@ testParseGenerate('publish MQTT 5 properties with 0-4 byte varbyte', {
0, 4, // Topic length
116, 101, 115, 116 // Topic (test)
]), buffer]))
})(); (() => {
const buffer = Buffer.alloc(2 * 1024 * 1024)
testParseGenerate('2MB publish packet', {
})()

; (() => {
const maxLength = 268435455
const buffer = Buffer.alloc(maxLength - 6)
testParseGenerate('Max payload publish packet', {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: 6 + 2 * 1024 * 1024,
length: maxLength,
topic: 'test',
payload: buffer
}, Buffer.concat([Buffer.from([
48, 134, 128, 128, 1, // Header
48, 255, 255, 255, 127, // Header
0, 4, // Topic length
116, 101, 115, 116 // Topic (test)
]), buffer]))
Expand Down Expand Up @@ -1250,6 +1314,85 @@ test('splitted publish parse', t => {
])), 0, 'remaining bytes')
})

test('split publish longer', t => {
t.plan(3)

const length = 255
const topic = 'test'
// Minus two bytes for the topic length specifier
const payloadLength = length - topic.length - 2

const parser = mqtt.parser()
const expected = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: length,
topic: topic,
payload: Buffer.from('a'.repeat(payloadLength))
}

parser.on('packet', packet => {
t.deepLooseEqual(packet, expected, 'expected packet')
})

t.equal(parser.parse(Buffer.from([
48, 255, 1, // Header
0, topic.length, // Topic length
116, 101, 115, 116 // Topic (test)
])), 6, 'remaining bytes')

t.equal(parser.parse(Buffer.from(Array(payloadLength).fill(97))),
0, 'remaining bytes')
})

test('split length parse', t => {
t.plan(4)

const length = 255
const topic = 'test'
const payloadLength = length - topic.length - 2

const parser = mqtt.parser()
const expected = {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: length,
topic: topic,
payload: Buffer.from('a'.repeat(payloadLength))
}

parser.on('packet', packet => {
t.deepLooseEqual(packet, expected, 'expected packet')
})

t.equal(parser.parse(Buffer.from([
48, 255 // Header (partial length)
])), 1, 'remaining bytes')

t.equal(parser.parse(Buffer.from([
1, // Rest of header length
0, topic.length, // Topic length
116, 101, 115, 116 // Topic (test)
])), 6, 'remaining bytes')

t.equal(parser.parse(Buffer.from(Array(payloadLength).fill(97))),
0, 'remaining bytes')
})

testGenerateError('Invalid variable byte integer: 268435456', {
cmd: 'publish',
retain: false,
qos: 0,
dup: false,
length: (268435455 + 1),
topic: 'test',
payload: Buffer.alloc(268435455 + 1 - 6)
}, {}, 'Length var byte integer over max allowed value throws error')

testGenerateError('Invalid subscriptionIdentifier: 268435456', {
cmd: 'publish',
retain: true,
Expand Down
7 changes: 6 additions & 1 deletion writeToStream.js
Original file line number Diff line number Diff line change
Expand Up @@ -780,14 +780,19 @@ function auth (packet, stream, opts) {

const varByteIntCache = {}
function writeVarByteInt (stream, num) {
if (num > protocol.VARBYTEINT_MAX) {
stream.emit('error', new Error(`Invalid variable byte integer: ${num}`))
return false
}

let buffer = varByteIntCache[num]

if (!buffer) {
buffer = genBufVariableByteInt(num)
if (num < 16384) varByteIntCache[num] = buffer
}
debug('writeVarByteInt: writing to stream: %o', buffer)
stream.write(buffer)
return stream.write(buffer)
}

/**
Expand Down

0 comments on commit b6de745

Please sign in to comment.