Skip to content

Commit

Permalink
Merge 9c66cfb into e3aa214
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando committed Apr 21, 2021
2 parents e3aa214 + 9c66cfb commit 136a3fa
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 11 deletions.
16 changes: 10 additions & 6 deletions aedes.js
Expand Up @@ -12,6 +12,7 @@ const Packet = require('aedes-packet')
const memory = require('aedes-persistence')
const mqemitter = require('mqemitter')
const Client = require('./lib/client')
const { $SYS_PREFIX } = require('./lib/utils')

module.exports = Aedes.Server = Aedes

Expand Down Expand Up @@ -78,7 +79,7 @@ function Aedes (opts) {
this.clients = {}
this.brokers = {}

const heartbeatTopic = '$SYS/' + that.id + '/heartbeat'
const heartbeatTopic = $SYS_PREFIX + that.id + '/heartbeat'
this._heartbeatInterval = setInterval(heartbeat, opts.heartbeatInterval)

const bufId = Buffer.from(that.id, 'utf8')
Expand Down Expand Up @@ -138,12 +139,12 @@ function Aedes (opts) {
}
}

this.mq.on('$SYS/+/heartbeat', function storeBroker (packet, done) {
this.mq.on($SYS_PREFIX + '+/heartbeat', function storeBroker (packet, done) {
that.brokers[packet.payload.toString()] = Date.now()
done()
})

this.mq.on('$SYS/+/new/clients', function closeSameClients (packet, done) {
this.mq.on($SYS_PREFIX + '+/new/clients', function closeSameClients (packet, done) {
const serverId = packet.topic.split('/')[1]
const clientId = packet.payload.toString()

Expand Down Expand Up @@ -206,7 +207,7 @@ function DoEnqueues () {
return
}

if (that.topic.indexOf('$SYS') === 0) {
if (that.topic.indexOf($SYS_PREFIX) === 0) {
subs = subs.filter(removeSharp)
}

Expand Down Expand Up @@ -281,7 +282,7 @@ Aedes.prototype._finishRegisterClient = function (client) {
this.clients[client.id] = client
this.emit('client', client)
this.publish({
topic: '$SYS/' + this.id + '/new/clients',
topic: $SYS_PREFIX + this.id + '/new/clients',
payload: Buffer.from(client.id, 'utf8')
}, noop)
}
Expand All @@ -291,7 +292,7 @@ Aedes.prototype.unregisterClient = function (client) {
delete this.clients[client.id]
this.emit('clientDisconnect', client)
this.publish({
topic: '$SYS/' + this.id + '/disconnect/clients',
topic: $SYS_PREFIX + this.id + '/disconnect/clients',
payload: Buffer.from(client.id, 'utf8')
}, noop)
}
Expand Down Expand Up @@ -326,6 +327,9 @@ function defaultAuthenticate (client, username, password, callback) {
}

function defaultAuthorizePublish (client, packet, callback) {
if (packet.topic.startsWith($SYS_PREFIX)) {
return callback(new Error($SYS_PREFIX + ' topic is reserved'))
}
callback(null)
}

Expand Down
11 changes: 11 additions & 0 deletions docs/Aedes.md
Expand Up @@ -315,6 +315,17 @@ aedes.authorizePublish = function (client, packet, callback) {
}
```
By default `authorizePublish` throws errors in case a client publish to `$SYS/` topics to prevent possible DoS (see #597). If you write your own implementation of authorizePublish we suggest you to add a check for this. Default:
```js
function defaultAuthorizePublish (client, packet, callback) {
if (packet.topic.startsWith($SYS_PREFIX)) {
return callback(new Error($SYS_PREFIX + ' topic is reserved'))
}
callback(null)
}
```
## Handler: authorizeSubscribe (client, subscription, callback)
- client: [`<Client>`](./Client.md)
Expand Down
4 changes: 2 additions & 2 deletions lib/handlers/subscribe.js
Expand Up @@ -3,7 +3,7 @@
const fastfall = require('fastfall')
const Packet = require('aedes-packet')
const { through } = require('../utils')
const { validateTopic } = require('../utils')
const { validateTopic, $SYS_PREFIX } = require('../utils')
const write = require('../write')

const subscribeTopicActions = fastfall([
Expand Down Expand Up @@ -207,7 +207,7 @@ function completeSubscribe (err) {

broker.emit('subscribe', subs, client)
broker.publish({
topic: '$SYS/' + broker.id + '/new/subscribes',
topic: $SYS_PREFIX + broker.id + '/new/subscribes',
payload: Buffer.from(JSON.stringify({
clientId: client.id,
subs: subs
Expand Down
4 changes: 2 additions & 2 deletions lib/handlers/unsubscribe.js
@@ -1,7 +1,7 @@
'use strict'

const write = require('../write')
const { validateTopic } = require('../utils')
const { validateTopic, $SYS_PREFIX } = require('../utils')

function UnSubAck (packet) {
this.cmd = 'unsuback'
Expand Down Expand Up @@ -90,7 +90,7 @@ function completeUnsubscribe (err) {
if ((!client.closed || client.clean === true) && packet.unsubscriptions.length > 0) {
client.broker.emit('unsubscribe', packet.unsubscriptions, client)
client.broker.publish({
topic: '$SYS/' + client.broker.id + '/new/unsubscribes',
topic: $SYS_PREFIX + client.broker.id + '/new/unsubscribes',
payload: Buffer.from(JSON.stringify({
clientId: client.id,
subs: packet.unsubscriptions
Expand Down
3 changes: 2 additions & 1 deletion lib/utils.js
Expand Up @@ -39,5 +39,6 @@ function through (transform) {

module.exports = {
validateTopic,
through
through,
$SYS_PREFIX: '$SYS/'
}
17 changes: 17 additions & 0 deletions test/basic.js
Expand Up @@ -95,6 +95,23 @@ test('publish empty topic throws error', function (t) {
})
})

test('publish to $SYS topic throws error', function (t) {
t.plan(1)

const s = connect(setup())
t.teardown(s.broker.close.bind(s.broker))

s.inStream.write({
cmd: 'publish',
topic: '$SYS/not/allowed',
payload: 'world'
})

s.broker.on('clientError', function (client, err) {
t.pass('should emit error')
})
})

;[{ qos: 0, clean: false }, { qos: 0, clean: true }, { qos: 1, clean: false }, { qos: 1, clean: true }].forEach(function (ele) {
test('subscribe a single topic in QoS ' + ele.qos + ' [clean=' + ele.clean + ']', function (t) {
t.plan(5)
Expand Down

0 comments on commit 136a3fa

Please sign in to comment.