Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
language: node_js
sudo: false
node_js:
- 7
- 6
- 5
- 4
- iojs-v3
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ will emit:
Parse a given `Buffer` and emits synchronously all the MQTT packets that
are included. Returns the number of bytes left to parse.

If an error happens, an `error` event will be emitted, but no `packet` events
will be emitted after that. Calling `parse()` again clears the error and
previous buffer as if you created a new `Parser`.

Packets
-------

Expand Down
78 changes: 46 additions & 32 deletions parser.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,37 +10,35 @@ function Parser() {
return new Parser()
}

this._list = bl()
this._newPacket()

this._states = [
'_parseHeader'
, '_parseLength'
, '_parsePayload'
, '_newPacket'
]
this._stateCounter = 0

this._resetState()
}

inherits(Parser, EE)

Parser.prototype._newPacket = function () {
if (this.packet) {
this._list.consume(this.packet.length)
this.emit('packet', this.packet)
}

Parser.prototype._resetState = function () {
this.packet = new Packet()

return true
this.error = null
this._list = bl()
this._stateCounter = 0
}

Parser.prototype.parse = function (buf) {
if (this.error) {
this._resetState()
}

this._list.append(buf)

while ((this.packet.length != -1 || this._list.length > 0) &&
this[this._states[this._stateCounter]]()) {
this[this._states[this._stateCounter]]() &&
!this.error) {
this._stateCounter++

if (this._stateCounter >= this._states.length) {
Expand Down Expand Up @@ -140,7 +138,7 @@ Parser.prototype._parsePayload = function () {
// these are empty, nothing to do
break
default:
this.emit('error', new Error('not supported'))
this._emitError(new Error('not supported'))
}

result = true
Expand All @@ -162,29 +160,29 @@ Parser.prototype._parseConnect = function () {
// Parse constants id
protocolId = this._parseString()
if (protocolId === null)
return this.emit('error', new Error('cannot parse protocol id'))
return this._emitError(new Error('cannot parse protocol id'))

if (protocolId != 'MQTT' && protocolId != 'MQIsdp') {

return this.emit('error', new Error('invalid protocol id'))
return this._emitError(new Error('invalid protocol id'))
}

packet.protocolId = protocolId

// Parse constants version number
if(this._pos >= this._list.length)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))

packet.protocolVersion = this._list.readUInt8(this._pos)

if(packet.protocolVersion != 3 && packet.protocolVersion != 4) {

return this.emit('error', new Error('invalid protocol version'))
return this._emitError(new Error('invalid protocol version'))
}

this._pos++
if(this._pos >= this._list.length)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))

// Parse connect flags
flags.username = (this._list.readUInt8(this._pos) & constants.USERNAME_MASK)
Expand All @@ -204,41 +202,41 @@ Parser.prototype._parseConnect = function () {
// Parse keepalive
packet.keepalive = this._parseNum()
if(packet.keepalive === -1)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))

// Parse client ID
clientId = this._parseString()
if(clientId === null)
return this.emit('error', new Error('packet too short'))
return this._emitError(new Error('packet too short'))
packet.clientId = clientId

if (flags.will) {
// Parse will topic
topic = this._parseString()
if (topic === null)
return this.emit('error', new Error('cannot parse will topic'))
return this._emitError(new Error('cannot parse will topic'))
packet.will.topic = topic

// Parse will payload
payload = this._parseBuffer()
if (payload === null)
return this.emit('error', new Error('cannot parse will payload'))
return this._emitError(new Error('cannot parse will payload'))
packet.will.payload = payload
}

// Parse username
if (flags.username) {
username = this._parseString()
if(username === null)
return this.emit('error', new Error('cannot parse username'))
return this._emitError(new Error('cannot parse username'))
packet.username = username
}

// Parse password
if(flags.password) {
password = this._parseBuffer()
if(password === null)
return this.emit('error', new Error('cannot parse username'))
return this._emitError(new Error('cannot parse username'))
packet.password = password
}

Expand All @@ -252,15 +250,15 @@ Parser.prototype._parseConnack = function () {
packet.sessionPresent = !!(this._list.readUInt8(this._pos++) & constants.SESSIONPRESENT_MASK)
packet.returnCode = this._list.readUInt8(this._pos)
if(packet.returnCode === -1)
return this.emit('error', new Error('cannot parse return code'))
return this._emitError(new Error('cannot parse return code'))
}

Parser.prototype._parsePublish = function () {
var packet = this.packet
packet.topic = this._parseString()

if(packet.topic === null)
return this.emit('error', new Error('cannot parse topic'))
return this._emitError(new Error('cannot parse topic'))

// Parse message ID
if (packet.qos > 0) {
Expand All @@ -276,7 +274,7 @@ Parser.prototype._parseSubscribe = function() {
, qos

if (packet.qos != 1) {
return this.emit('error', new Error('wrong subscribe header'))
return this._emitError(new Error('wrong subscribe header'))
}

packet.subscriptions = []
Expand All @@ -288,7 +286,7 @@ Parser.prototype._parseSubscribe = function() {
// Parse topic
topic = this._parseString()
if (topic === null)
return this.emit('error', new Error('Parse error - cannot parse topic'))
return this._emitError(new Error('Parse error - cannot parse topic'))

qos = this._list.readUInt8(this._pos++)

Expand Down Expand Up @@ -322,7 +320,7 @@ Parser.prototype._parseUnsubscribe = function() {
// Parse topic
topic = this._parseString()
if (topic === null)
return this.emit('error', new Error('cannot parse topic'))
return this._emitError(new Error('cannot parse topic'))

// Push topic to unsubscriptions
packet.unsubscriptions.push(topic);
Expand All @@ -331,7 +329,7 @@ Parser.prototype._parseUnsubscribe = function() {

Parser.prototype._parseUnsuback = function() {
if (!this._parseMessageId())
return this.emit('error', new Error('cannot parse message id'))
return this._emitError(new Error('cannot parse message id'))
}

Parser.prototype._parseMessageId = function() {
Expand All @@ -340,7 +338,7 @@ Parser.prototype._parseMessageId = function() {
packet.messageId = this._parseNum()

if(packet.messageId === null) {
this.emit('error', new Error('cannot parse message id'))
this._emitError(new Error('cannot parse message id'))
return false
}

Expand Down Expand Up @@ -385,4 +383,20 @@ Parser.prototype._parseNum = function() {
return result
}

Parser.prototype._newPacket = function () {
if (this.packet) {
this._list.consume(this.packet.length)
this.emit('packet', this.packet)
}

this.packet = new Packet()

return true
}

Parser.prototype._emitError = function(err) {
this.error = err
this.emit('error', err)
}

module.exports = Parser
77 changes: 77 additions & 0 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ function testParseError(expected, fixture) {
t.equal(err.message, expected, 'expected error message')
})

parser.on('packet', function() {
t.fail('parse errors should not be followed by packet events')
})

parser.parse(fixture)
})
}
Expand Down Expand Up @@ -916,3 +920,76 @@ testParseError('cannot parse protocol id', new Buffer([
77, 81, 73, 115, 100, 112,
77, 81, 73, 115, 100, 112
]))

test('stops parsing after first error', function(t) {
t.plan(4)

var parser = mqtt.parser()

var packetCount = 0
var errorCount = 0
var expectedPackets = 1
var expectedErrors = 1

parser.on('packet', function(packet) {
t.ok(++packetCount <= expectedPackets, 'expected <= ' + expectedPackets + ' packets')
})

parser.on('error', function(err) {
t.ok(++errorCount <= expectedErrors, 'expected <= ' + expectedErrors + ' errors')
})

parser.parse(new Buffer([
// first, a valid connect packet:

16, 12, // Header
0, 4, // Protocol id length
77, 81, 84, 84, // Protocol id
4, // Protocol version
2, // Connect flags
0, 30, // Keepalive
0, 0, //Client id length

// then an invalid subscribe packet:

128, 9, // Header (subscribe, qos=0, length=9)
0, 6, // message id (6)
0, 4, // topic length,
116, 101, 115, 116, // Topic (test)
0, // qos (0)

// and another invalid subscribe packet:

128, 9, // Header (subscribe, qos=0, length=9)
0, 6, // message id (6)
0, 4, // topic length,
116, 101, 115, 116, // Topic (test)
0, // qos (0)

// finally, a valid disconnect packet:

224, 0, // Header
]))

// calling parse again clears the error and continues parsing
packetCount = 0
errorCount = 0
expectedPackets = 2
expectedErrors = 0

parser.parse(new Buffer([
// connect:

16, 12, // Header
0, 4, // Protocol id length
77, 81, 84, 84, // Protocol id
4, // Protocol version
2, // Connect flags
0, 30, // Keepalive
0, 0, //Client id length

// disconnect:

224, 0, // Header
]))
})