Skip to content

Commit

Permalink
fix: setImmediate polyfill (#1626)
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando committed Jul 3, 2023
1 parent c8aa654 commit 0ed0754
Showing 1 changed file with 30 additions and 29 deletions.
59 changes: 30 additions & 29 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@ const xtend = require('xtend')
const debug = require('debug')('mqttjs:client')
const nextTick = process ? process.nextTick : function (callback) { setTimeout(callback, 0) }
const setImmediate = global.setImmediate || function (callback) {
// works in node v0.8
nextTick(callback)
const args = arguments.slice(1)
process.nextTick(callback.bind(null, ...args))
}

const defaultConnectOptions = {
keepalive: 60,
reschedulePings: true,
Expand Down Expand Up @@ -89,11 +90,11 @@ const errors = {
162: 'Wildcard Subscriptions not supported'
}

function defaultId () {
function defaultId() {
return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
}

function applyTopicAlias (client, packet) {
function applyTopicAlias(client, packet) {
if (client.options.protocolVersion === 5) {
if (packet.cmd === 'publish') {
let alias
Expand Down Expand Up @@ -143,7 +144,7 @@ function applyTopicAlias (client, packet) {
}
}

function removeTopicAliasAndRecoverTopicName (client, packet) {
function removeTopicAliasAndRecoverTopicName(client, packet) {
let alias
if (packet.properties) {
alias = packet.properties.topicAlias
Expand All @@ -168,7 +169,7 @@ function removeTopicAliasAndRecoverTopicName (client, packet) {
}
}

function sendPacket (client, packet, cb) {
function sendPacket(client, packet, cb) {
debug('sendPacket :: packet: %O', packet)
debug('sendPacket :: emitting `packetsend`')

Expand All @@ -186,7 +187,7 @@ function sendPacket (client, packet, cb) {
}
}

function flush (queue) {
function flush(queue) {
if (queue) {
debug('flush: queue exists? %b', !!(queue))
Object.keys(queue).forEach(function (messageId) {
Expand All @@ -200,7 +201,7 @@ function flush (queue) {
}
}

function flushVolatile (queue) {
function flushVolatile(queue) {
if (queue) {
debug('flushVolatile :: deleting volatile messages from the queue and setting their callbacks as error function')
Object.keys(queue).forEach(function (messageId) {
Expand All @@ -212,7 +213,7 @@ function flushVolatile (queue) {
}
}

function storeAndSend (client, packet, cb, cbStorePut) {
function storeAndSend(client, packet, cb, cbStorePut) {
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
let storePacket = packet
let err
Expand All @@ -226,7 +227,7 @@ function storeAndSend (client, packet, cb, cbStorePut) {
return cb && cb(err)
}
}
client.outgoingStore.put(storePacket, function storedPacket (err) {
client.outgoingStore.put(storePacket, function storedPacket(err) {
if (err) {
return cb && cb(err)
}
Expand All @@ -235,7 +236,7 @@ function storeAndSend (client, packet, cb, cbStorePut) {
})
}

function nop (error) {
function nop(error) {
debug('nop ::', error)
}

Expand All @@ -246,7 +247,7 @@ function nop (error) {
* @param {Object} [options] - connection options
* (see Connection#connect)
*/
function MqttClient (streamBuilder, options) {
function MqttClient(streamBuilder, options) {
let k
const that = this

Expand Down Expand Up @@ -338,7 +339,7 @@ function MqttClient (streamBuilder, options) {
this.on('connect', function () {
const queue = that.queue

function deliver () {
function deliver() {
const entry = queue.shift()
debug('deliver :: entry %o', entry)
let packet = null
Expand Down Expand Up @@ -426,7 +427,7 @@ MqttClient.prototype._setupStream = function () {
packets.push(packet)
})

function nextTickWork () {
function nextTickWork() {
if (packets.length) {
nextTick(work)
} else {
Expand All @@ -436,7 +437,7 @@ MqttClient.prototype._setupStream = function () {
}
}

function work () {
function work() {
debug('work :: getting next packet in queue')
const packet = packets.shift()

Expand All @@ -459,7 +460,7 @@ MqttClient.prototype._setupStream = function () {
work()
}

function streamErrorHandler (error) {
function streamErrorHandler(error) {
debug('streamErrorHandler :: error', error.message)
if (socketErrors.includes(error.code)) {
// handle error
Expand Down Expand Up @@ -747,7 +748,7 @@ MqttClient.prototype.subscribe = function () {
debug('subscribe: array topic %s', topic)
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, topic) ||
that._resubscribeTopics[topic].qos < opts.qos ||
resubscribe) {
resubscribe) {
const currentOpts = {
topic: topic,
qos: opts.qos
Expand All @@ -769,7 +770,7 @@ MqttClient.prototype.subscribe = function () {
debug('subscribe: object topic %s', k)
if (!Object.prototype.hasOwnProperty.call(that._resubscribeTopics, k) ||
that._resubscribeTopics[k].qos < obj[k].qos ||
resubscribe) {
resubscribe) {
const currentOpts = {
topic: k,
qos: obj[k].qos
Expand Down Expand Up @@ -988,7 +989,7 @@ MqttClient.prototype.end = function (force, opts, cb) {
debug('end :: cb? %s', !!cb)
cb = cb || nop

function closeStores () {
function closeStores() {
debug('end :: closeStores: closing incoming and outgoing stores')
that.disconnected = true
that.incomingStore.close(function (e1) {
Expand All @@ -1007,7 +1008,7 @@ MqttClient.prototype.end = function (force, opts, cb) {
}
}

function finish () {
function finish() {
// defer closesStores of an I/O cycle,
// just to make sure things are
// ok for websockets
Expand Down Expand Up @@ -1263,7 +1264,7 @@ MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut, noStore) {
* anyway it will result in -1 evaluation
*/
case 0:
/* falls through */
/* falls through */
default:
sendPacket(this, packet, cb)
break
Expand Down Expand Up @@ -1622,7 +1623,7 @@ MqttClient.prototype._handleAck = function (packet) {
debug('_handleAck :: packet type', type)
switch (type) {
case 'pubcomp':
// same thing as puback for QoS 2
// same thing as puback for QoS 2
case 'puback': {
const pubackRC = packet.reasonCode
// Callback - we're done
Expand Down Expand Up @@ -1685,7 +1686,7 @@ MqttClient.prototype._handleAck = function (packet) {
}

if (this.disconnecting &&
Object.keys(this.outgoing).length === 0) {
Object.keys(this.outgoing).length === 0) {
this.emit('outgoingEmpty')
}
}
Expand Down Expand Up @@ -1754,8 +1755,8 @@ MqttClient.prototype._resubscribe = function () {
debug('_resubscribe')
const _resubscribeTopicsKeys = Object.keys(this._resubscribeTopics)
if (!this._firstConnection &&
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
_resubscribeTopicsKeys.length > 0) {
(this.options.clean || (this.options.protocolVersion === 5 && !this.connackPacket.sessionPresent)) &&
_resubscribeTopicsKeys.length > 0) {
if (this.options.resubscribe) {
if (this.options.protocolVersion === 5) {
debug('_resubscribe: protocolVersion 5')
Expand Down Expand Up @@ -1796,10 +1797,10 @@ MqttClient.prototype._onConnect = function (packet) {

this.connected = true

function startStreamProcess () {
function startStreamProcess() {
let outStore = that.outgoingStore.createStream()

function clearStoreProcessing () {
function clearStoreProcessing() {
that._storeProcessing = false
that._packetIdsDuringStoreProcessing = {}
}
Expand All @@ -1812,14 +1813,14 @@ MqttClient.prototype._onConnect = function (packet) {
that.emit('error', err)
})

function remove () {
function remove() {
outStore.destroy()
outStore = null
that._flushStoreProcessingQueue()
clearStoreProcessing()
}

function storeDeliver () {
function storeDeliver() {
// edge case, we wrapped this twice
if (!outStore) {
return
Expand Down

0 comments on commit 0ed0754

Please sign in to comment.