Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refined Topic Alias support. (Implement #1300) #1301

Merged
merged 5 commits into from
Oct 4, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 37 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,41 @@ the final connection when it drops.
The default value is 1000 ms which means it will try to reconnect 1 second
after losing the connection.

<a name="topicalias"></a>
## About Topic Alias management
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: caps on Management

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. I will fix it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.


### Enabling automatic Topic Alias using
If the client sets the option `autoUseTopicAlias:true` then MQTT.js uses existing topic alias automatically.

example scenario:
```
1. PUBLISH topic:'t1', ta:1 (register)
2. PUBLISH topic:'t1' -> topic:'', ta:1 (auto use existing map entry)
3. PUBLISH topic:'t2', ta:1 (register overwrite)
4. PUBLISH topic:'t2' -> topic:'', ta:1 (auto use existing map entry based on the receent map)
5. PUBLISH topic:'t1' (t1 is no longer mapped to ta:1)
```

User doesn't need to manage which topic is mapped to which topic alias.
If the user want to register topic alias, then publish topic with topic alias.
If the user want to use topic alias, then publish topic without topic alias. If there is a mapped topic alias then added it as a property and update the topic to empty string.

### Enabling automatic Topic Alias assign

If the client sets the option `autoAssignTopicAlias:true` then MQTT.js uses existing topic alias automatically.
If no topic alias exists, then assign a new vacant topic alias automatically. If topic alias is fully used, then LRU(Least Recently Used) topic-alias entry is overwritten.

example scenario:
```
The broker returns CONNACK (TopicAliasMaximum:3)
1. PUBLISH topic:'t1' -> 't1', ta:1 (auto assign t1:1 and register)
2. PUBLISH topic:'t1' -> '' , ta:1 (auto use existing map entry)
3. PUBLISH topic:'t2' -> 't2', ta:2 (auto assign t1:2 and register. 2 was vacant)
4. PUBLISH topic:'t3' -> 't3', ta:3 (auto assign t1:3 and register. 3 was vacant)
5. PUBLISH topic:'t4' -> 't4', ta:1 (LRU entry is overwritten)
```

Also user can manually register topic-alias pair using PUBLISH topic:'some', ta:X. It works well with automatic topic alias assign.

<a name="api"></a>
## API
Expand Down Expand Up @@ -291,6 +325,8 @@ the `connect` event. Typically a `net.Socket`.
```js
customHandleAcks: function(topic, message, packet, done) {/*some logic wit colling done(error, reasonCode)*/}
```
* `autoUseTopicAlias`: enabling automatic Topic Alias using functionality
* `autoAssignTopicAlias`: enabling automatic Topic Alias assign functionality
* `properties`: properties MQTT 5.0.
`object` that supports the following properties:
* `sessionExpiryInterval`: representing the Session Expiry Interval in seconds `number`,
Expand Down Expand Up @@ -661,7 +697,7 @@ npm install browserify
npm install tinyify
cd node_modules/mqtt/
npm install .
npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
npx browserify mqtt.js -s mqtt >browserMqtt.js // use script tag
# show size for compressed browser transfer
gzip <browserMqtt.js | wc -c
```
Expand Down
191 changes: 172 additions & 19 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
*/
var EventEmitter = require('events').EventEmitter
var Store = require('./store')
var TopicAliasRecv = require('./topic-alias-recv')
var TopicAliasSend = require('./topic-alias-send')
var mqttPacket = require('mqtt-packet')
var DefaultMessageIdProvider = require('./default-message-id-provider')
var Writable = require('readable-stream').Writable
var inherits = require('inherits')
var reInterval = require('reinterval')
var clone = require('rfdc/default')
var validations = require('./validations')
var xtend = require('xtend')
var debug = require('debug')('mqttjs:client')
Expand Down Expand Up @@ -88,9 +91,86 @@ function defaultId () {
return 'mqttjs_' + Math.random().toString(16).substr(2, 8)
}

function applyTopicAlias (client, packet) {
if (client.options.protocolVersion === 5) {
if (packet.cmd === 'publish') {
var alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
var topic = packet.topic.toString()
if (client.topicAliasSend) {
if (alias) {
if (topic.length !== 0) {
// register topic alias
debug('applyTopicAlias :: register topic: %s - alias: %d', topic, alias)
if (!client.topicAliasSend.put(topic, alias)) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
} else {
if (topic.length !== 0) {
if (client.options.autoAssignTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = {...(packet.properties), topicAlias: alias}
debug('applyTopicAlias :: auto assign(use) topic: %s - alias: %d', topic, alias)
} else {
alias = client.topicAliasSend.getLruAlias()
client.topicAliasSend.put(topic, alias)
packet.properties = {...(packet.properties), topicAlias: alias}
debug('applyTopicAlias :: auto assign topic: %s - alias: %d', topic, alias)
}
} else if (client.options.autoUseTopicAlias) {
alias = client.topicAliasSend.getAliasByTopic(topic)
if (alias) {
packet.topic = ''
packet.properties = {...(packet.properties), topicAlias: alias}
debug('applyTopicAlias :: auto use topic: %s - alias: %d', topic, alias)
}
}
}
}
} else if (alias) {
debug('applyTopicAlias :: error out of range. topic: %s - alias: %d', topic, alias)
return new Error('Sending Topic Alias out of range')
}
}
}
}

function removeTopicAlias (client, packet) {
// remove topic alias because it shouldn't be used on re-sending
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add a bit of a better comment here. What's the use for this function? Is it removing a topic alias because some other topic has overridden the topic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a kind of utility function for internal use.
I think that the commend should be removed here.
This function removes Topic Alias property and recover Topic Name from the Topic Alias.
Perhaps the function name should be removeTopicAliasAndRecoverTopicName. Is this too long?
I think that the comment should be written the caller side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

var alias
if (packet.properties) {
alias = packet.properties.topicAlias
}

var topic = packet.topic.toString()
if (topic.length === 0) {
// restore topic from alias
if (typeof alias === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
topic = client.topicAliasSend.getTopicByAlias(alias)
if (typeof topic === 'undefined') {
return new Error('Unregistered Topic Alias')
} else {
packet.topic = topic
}
}
}
if (alias) {
delete packet.properties.topicAlias
}
}

function sendPacket (client, packet, cb) {
debug('sendPacket :: packet: %O', packet)
debug('sendPacket :: emitting `packetsend`')

client.emit('packetsend', packet)

debug('sendPacket :: writing to stream')
Expand Down Expand Up @@ -131,7 +211,16 @@ function flushVolatile (queue) {

function storeAndSend (client, packet, cb, cbStorePut) {
debug('storeAndSend :: store packet with cmd %s to outgoingStore', packet.cmd)
client.outgoingStore.put(packet, function storedPacket (err) {
var storePacket = packet
var err
if (storePacket.cmd === 'publish') {
storePacket = clone(packet)
YoDaMa marked this conversation as resolved.
Show resolved Hide resolved
err = removeTopicAlias(client, storePacket)
if (err) {
return cb && cb(err)
}
}
client.outgoingStore.put(storePacket, function storedPacket (err) {
if (err) {
return cb && cb(err)
}
Expand Down Expand Up @@ -176,6 +265,7 @@ function MqttClient (streamBuilder, options) {
debug('MqttClient :: options.keepalive', options.keepalive)
debug('MqttClient :: options.reconnectPeriod', options.reconnectPeriod)
debug('MqttClient :: options.rejectUnauthorized', options.rejectUnauthorized)
debug('MqttClient :: options.topicAliasMaximum', options.topicAliasMaximum)

this.options.clientId = (typeof options.clientId === 'string') ? options.clientId : defaultId()

Expand Down Expand Up @@ -225,6 +315,14 @@ function MqttClient (streamBuilder, options) {
// True if connection is first time.
this._firstConnection = true

if (options.topicAliasMaximum > 0) {
if (options.topicAliasMaximum > 0xffff) {
debug('MqttClient :: options.topicAliasMaximum is out of range')
} else {
this.topicAliasRecv = new TopicAliasRecv(options.topicAliasMaximum)
}
}

// Send queued packets
this.on('connect', function () {
var queue = this.queue
Expand Down Expand Up @@ -282,6 +380,10 @@ function MqttClient (streamBuilder, options) {
that.pingTimer = null
}

if (this.topicAliasRecv) {
this.topicAliasRecv.clear()
}

debug('close :: calling _setupReconnect')
this._setupReconnect()
})
Expand Down Expand Up @@ -378,6 +480,14 @@ MqttClient.prototype._setupStream = function () {
debug('_setupStream: sending packet `connect`')
connectPacket = Object.create(this.options)
connectPacket.cmd = 'connect'
if (this.topicAliasRecv) {
if (!connectPacket.properties) {
connectPacket.properties = {}
}
if (this.topicAliasRecv) {
connectPacket.properties.topicAliasMaximum = this.topicAliasRecv.max
}
}
// avoid message queue
sendPacket(this, connectPacket)

Expand Down Expand Up @@ -526,17 +636,6 @@ MqttClient.prototype.publish = function (topic, message, opts, callback) {

if (options.protocolVersion === 5) {
packet.properties = opts.properties
if ((!options.properties && packet.properties && packet.properties.topicAlias) || ((opts.properties && options.properties) &&
((opts.properties.topicAlias && options.properties.topicAliasMaximum && opts.properties.topicAlias > options.properties.topicAliasMaximum) ||
(!options.properties.topicAliasMaximum && opts.properties.topicAlias)))) {
/*
if we are don`t setup topic alias or
topic alias maximum less than topic alias or
server don`t give topic alias maximum,
we are removing topic alias from packet
*/
delete packet.properties.topicAlias
}
}

debug('publish :: qos', opts.qos)
Expand Down Expand Up @@ -1102,6 +1201,13 @@ MqttClient.prototype._cleanUp = function (forced, done) {
MqttClient.prototype._sendPacket = function (packet, cb, cbStorePut) {
debug('_sendPacket :: (%s) :: start', this.options.clientId)
cbStorePut = cbStorePut || nop
cb = cb || nop

var err = applyTopicAlias(this, packet)
if (err) {
cb(err)
return
}

if (!this.connected) {
debug('_sendPacket :: client not connected. Storing packet offline.')
Expand Down Expand Up @@ -1154,12 +1260,20 @@ MqttClient.prototype._storePacket = function (packet, cb, cbStorePut) {
debug('_storePacket :: cb? %s', !!cb)
cbStorePut = cbStorePut || nop

var storePacket = packet
if (storePacket.cmd === 'publish') {
storePacket = clone(packet)
var err = removeTopicAlias(this, storePacket)
if (err) {
return cb && cb(err)
}
}
// check that the packet is not a qos of 0, or that the command is not a publish
if (((packet.qos || 0) === 0 && this.queueQoSZero) || packet.cmd !== 'publish') {
this.queue.push({ packet: packet, cb: cb })
} else if (packet.qos > 0) {
cb = this.outgoing[packet.messageId] ? this.outgoing[packet.messageId].cb : null
this.outgoingStore.put(packet, function (err) {
if (((storePacket.qos || 0) === 0 && this.queueQoSZero) || storePacket.cmd !== 'publish') {
this.queue.push({ packet: storePacket, cb: cb })
} else if (storePacket.qos > 0) {
cb = this.outgoing[storePacket.messageId] ? this.outgoing[storePacket.messageId].cb : null
this.outgoingStore.put(storePacket, function (err) {
if (err) {
return cb && cb(err)
}
Expand Down Expand Up @@ -1237,11 +1351,17 @@ MqttClient.prototype._handleConnack = function (packet) {
var rc = version === 5 ? packet.reasonCode : packet.returnCode

clearTimeout(this.connackTimer)
delete this.topicAliasSend

if (packet.properties) {
if (packet.properties.topicAliasMaximum) {
if (!options.properties) { options.properties = {} }
options.properties.topicAliasMaximum = packet.properties.topicAliasMaximum
if (packet.properties.topicAliasMaximum > 0xffff) {
this.emit('error', new Error('topicAliasMaximum from broker is out of range'))
return
}
if (packet.properties.topicAliasMaximum > 0) {
this.topicAliasSend = new TopicAliasSend(packet.properties.topicAliasMaximum)
}
}
if (packet.properties.serverKeepAlive && options.keepalive) {
options.keepalive = packet.properties.serverKeepAlive
Expand Down Expand Up @@ -1303,6 +1423,39 @@ MqttClient.prototype._handlePublish = function (packet, done) {
var that = this
var options = this.options
var validReasonCodes = [0, 16, 128, 131, 135, 144, 145, 151, 153]
if (this.options.protocolVersion === 5) {
var alias
if (packet.properties) {
alias = packet.properties.topicAlias
}
if (typeof alias !== 'undefined') {
if (topic.length === 0) {
if (alias > 0 && alias <= 0xffff) {
var gotTopic = this.topicAliasRecv.getTopicByAlias(alias)
if (gotTopic) {
topic = gotTopic
debug('_handlePublish :: topic complemented by alias. topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: unregistered topic alias. alias: %d', alias)
this.emit('error', new Error('Received unregistered Topic Alias'))
return
}
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
} else {
if (this.topicAliasRecv.put(topic, alias)) {
debug('_handlePublish :: registered topic: %s - alias: %d', topic, alias)
} else {
debug('_handlePublish :: topic alias out of range. alias: %d', alias)
this.emit('error', new Error('Received Topic Alias is out of range'))
return
}
}
}
}
debug('_handlePublish: qos %d', qos)
switch (qos) {
case 2: {
Expand Down
47 changes: 47 additions & 0 deletions lib/topic-alias-recv.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
'use strict'

/**
* Topic Alias receiving manager
* This holds alias to topic map
* @param {Number} [max] - topic alias maximum entries
*/
function TopicAliasRecv (max) {
if (!(this instanceof TopicAliasRecv)) {
return new TopicAliasRecv(max)
}
this.aliasToTopic = {}
this.max = max
}

/**
* Insert or update topic - alias entry.
* @param {String} [topic] - topic
* @param {Number} [alias] - topic alias
* @returns {Boolean} - if success return true otherwise false
*/
TopicAliasRecv.prototype.put = function (topic, alias) {
if (alias === 0 || alias > this.max) {
return false
}
this.aliasToTopic[alias] = topic
this.length = Object.keys(this.aliasToTopic).length
return true
}

/**
* Get topic by alias
* @param {String} [topic] - topic
* @returns {Number} - if mapped topic exists return topic alias, otherwise return undefined
*/
TopicAliasRecv.prototype.getTopicByAlias = function (alias) {
return this.aliasToTopic[alias]
}

/**
* Clear all entries
*/
TopicAliasRecv.prototype.clear = function () {
this.aliasToTopic = {}
}

module.exports = TopicAliasRecv
Loading