Skip to content

Commit

Permalink
fix: problem with publish callback invoked twice (#1635)
Browse files Browse the repository at this point in the history
  • Loading branch information
ogis-yamazaki committed Jul 14, 2023
1 parent debb7d9 commit 79b23a8
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 12 deletions.
46 changes: 34 additions & 12 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1047,11 +1047,12 @@ class MqttClient extends EventEmitter {
* @example client.removeOutgoingMessage(client.getLastAllocated());
*/
removeOutgoingMessage (messageId) {
const cb = this.outgoing[messageId] ? this.outgoing[messageId].cb : null
delete this.outgoing[messageId]
this.outgoingStore.del({ messageId }, function () {
cb(new Error('Message removed'))
})
if (this.outgoing[messageId]) {
const cb = this.outgoing[messageId].cb
this._removeOutgoingAndStoreMessage(messageId, function () {
cb(new Error('Message removed'))
})
}
return this
}

Expand Down Expand Up @@ -1625,12 +1626,13 @@ class MqttClient extends EventEmitter {
if (pubackRC && pubackRC > 0 && pubackRC !== 16) {
err = new Error('Publish error: ' + errors[pubackRC])
err.code = pubackRC
cb(err, packet)
this._removeOutgoingAndStoreMessage(messageId, function () {
cb(err, packet)
})
} else {
this._removeOutgoingAndStoreMessage(messageId, cb)
}
delete this.outgoing[messageId]
this.outgoingStore.del(packet, cb)
this.messageIdProvider.deallocate(messageId)
this._invokeStoreProcessingQueue()

break
}
case 'pubrec': {
Expand All @@ -1644,7 +1646,9 @@ class MqttClient extends EventEmitter {
if (pubrecRC && pubrecRC > 0 && pubrecRC !== 16) {
err = new Error('Publish error: ' + errors[pubrecRC])
err.code = pubrecRC
cb(err, packet)
this._removeOutgoingAndStoreMessage(messageId, function () {
cb(err, packet)
})
} else {
this._sendPacket(response)
}
Expand Down Expand Up @@ -1888,7 +1892,9 @@ class MqttClient extends EventEmitter {
}

_invokeStoreProcessingQueue () {
if (this._storeProcessingQueue.length > 0) {
// If _storeProcessing is true, the message is resending.
// During resend, processing is skipped to prevent new messages from interrupting. #1635
if (!this._storeProcessing && this._storeProcessingQueue.length > 0) {
const f = this._storeProcessingQueue[0]
if (f && f.invoke()) {
this._storeProcessingQueue.shift()
Expand All @@ -1909,6 +1915,22 @@ class MqttClient extends EventEmitter {
}
this._storeProcessingQueue.splice(0)
}

/**
* _removeOutgoingAndStoreMessage
* @param {Number} messageId - messageId to remove message
* @param {Function} cb - called when the message removed
* @api private
*/
_removeOutgoingAndStoreMessage (messageId, cb) {
const self = this
delete this.outgoing[messageId]
self.outgoingStore.del({ messageId }, function (err, packet) {
cb(err, packet)
self.messageIdProvider.deallocate(messageId)
self._invokeStoreProcessingQueue()
})
}
}

module.exports = MqttClient
106 changes: 106 additions & 0 deletions test/abstract_client.js
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,57 @@ module.exports = function (server, config) {
})
})

it('should fire a callback (qos 1) on error', function (done) {
// 145 = Packet Identifier in use
const pubackReasonCode = 145
const pubOpts = { qos: 1 }
let client = null

const server2 = serverBuilder(config.protocol, function (serverClient) {
serverClient.on('connect', function () {
const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
serverClient.connack(connack)
})
serverClient.on('publish', function (packet) {
if (packet.qos === 1) {
if (version === 5) {
serverClient.puback({
messageId: packet.messageId,
reasonCode: pubackReasonCode
})
} else {
serverClient.puback({ messageId: packet.messageId })
}
}
})
})

server2.listen(ports.PORTAND72, function () {
client = connect({
port: ports.PORTAND72,
host: 'localhost',
clean: true,
clientId: 'cid1',
reconnectPeriod: 0
})

client.once('connect', function () {
client.publish('a', 'b', pubOpts, function (err) {
if (version === 5) {
assert.strictEqual(err.code, pubackReasonCode)
} else {
assert.ifError(err)
}
setImmediate(function () {
client.end(() => {
server2.close(done())
})
})
})
})
})
})

it('should fire a callback (qos 2)', function (done) {
const client = connect()
const opts = { qos: 2 }
Expand All @@ -1025,6 +1076,61 @@ module.exports = function (server, config) {
})
})

it('should fire a callback (qos 2) on error', function (done) {
// 145 = Packet Identifier in use
const pubrecReasonCode = 145
const pubOpts = { qos: 2 }
let client = null

const server2 = serverBuilder(config.protocol, function (serverClient) {
serverClient.on('connect', function () {
const connack = version === 5 ? { reasonCode: 0 } : { returnCode: 0 }
serverClient.connack(connack)
})
serverClient.on('publish', function (packet) {
if (packet.qos === 2) {
if (version === 5) {
serverClient.pubrec({
messageId: packet.messageId,
reasonCode: pubrecReasonCode
})
} else {
serverClient.pubrec({ messageId: packet.messageId })
}
}
})
serverClient.on('pubrel', function (packet) {
if (!serverClient.writable) return false
serverClient.pubcomp(packet)
})
})

server2.listen(ports.PORTAND103, function () {
client = connect({
port: ports.PORTAND103,
host: 'localhost',
clean: true,
clientId: 'cid1',
reconnectPeriod: 0
})

client.once('connect', function () {
client.publish('a', 'b', pubOpts, function (err) {
if (version === 5) {
assert.strictEqual(err.code, pubrecReasonCode)
} else {
assert.ifError(err)
}
setImmediate(function () {
client.end(true, () => {
server2.close(done())
})
})
})
})
})
})

it('should support UTF-8 characters in topic', function (done) {
const client = connect()

Expand Down

0 comments on commit 79b23a8

Please sign in to comment.