diff --git a/README.md b/README.md index d7f3b46a..d99d7772 100644 --- a/README.md +++ b/README.md @@ -79,6 +79,8 @@ Options: [instance.authorizePublish()](#authorizePublish). * `authorizeSubscribe`: function used to authorize SUBSCRIBE packets, see [instance.authorizeSubscribe()](#authorizeSubscribe). +* `authorizeForward`: function used to authorize forwarded packets, see + [instance.authorizeForward()](#authorizeForward). * `published`: function called when a new packet is published, see [instance.published()](#published). @@ -201,6 +203,27 @@ instance.authorizeSubscribe = function (client, sub, cb) { callback(null, sub) } ``` +------------------------------------------------------- + +### instance.authorizeForward(clientId, packet, done(err)) + +It will be called when a client is set to recieve a message. Override to supply custom +authorization logic. + +```js +instance.authorizeForward = function (clientId, packet, callback) { + if (packet.topic === 'aaaa' && clientId === "I should not see this") { + return callback(new Error('client not allowed to recieve mesages on this topic')) + } + + if (packet.topic === 'bbb') { + packet.payload = new Buffer('overwrite packet payload') + } + + callback(null) +} +``` + ------------------------------------------------------- ### instance.published(packet, client, done()) diff --git a/aedes.js b/aedes.js index b873be5e..e216f04e 100644 --- a/aedes.js +++ b/aedes.js @@ -22,6 +22,7 @@ var defaultOptions = { authenticate: defaultAuthenticate, authorizePublish: defaultAuthorizePublish, authorizeSubscribe: defaultAuthorizeSubscribe, + authorizeForward: defaultAuthorizeForward, published: defaultPublished } @@ -52,6 +53,7 @@ function Aedes (opts) { this.authenticate = opts.authenticate this.authorizePublish = opts.authorizePublish this.authorizeSubscribe = opts.authorizeSubscribe + this.authorizeForward = opts.authorizeForward this.published = opts.published this.clients = {} @@ -292,6 +294,10 @@ function defaultAuthorizeSubscribe (client, sub, callback) { callback(null, sub) } +function defaultAuthorizeForward (client, packet, callback) { + callback(null) +} + function defaultPublished (packet, client, callback) { callback(null) } diff --git a/lib/client.js b/lib/client.js index ac587e01..b9d14678 100644 --- a/lib/client.js +++ b/lib/client.js @@ -69,20 +69,32 @@ function Client (broker, conn) { this._eos = eos(this.conn, this.close.bind(this)) this.deliver0 = function deliverQoS0 (_packet, cb) { - var packet = new Packet(_packet, broker) - packet.qos = 0 - write(that, packet, cb) + that.broker.authorizeForward(that, _packet, function (err) { + if (err != null) { + // todo add an error message + return + } + var packet = new Packet(_packet, broker) + packet.qos = 0 + write(that, packet, cb) + }) } this.deliverQoS = function deliverQoS (_packet, cb) { - // downgrade to qos0 if requested by publish - if (_packet.qos === 0) { - that.deliver0(_packet, cb) - } else { - var packet = new QoSPacket(_packet, that) - packet.writeCallback = cb - broker.persistence.outgoingUpdate(that, packet, writeQoS) - } + that.broker.authorizeForward(that, _packet, function (err) { + if (err != null) { + // todo add an error message + return + } + // downgrade to qos0 if requested by publish + if (_packet.qos === 0) { + that.deliver0(_packet, cb) + } else { + var packet = new QoSPacket(_packet, that) + packet.writeCallback = cb + broker.persistence.outgoingUpdate(that, packet, writeQoS) + } + }) } this._keepaliveTimer = null diff --git a/test/auth.js b/test/auth.js index 98f877e6..da6ce657 100644 --- a/test/auth.js +++ b/test/auth.js @@ -415,3 +415,90 @@ test('set authentication method in config options', function (t) { keepalive: 0 }) }) +test('change a topic name inside authorizeForward method', function (t) { + t.plan(3) + + var broker = aedes({ + authorizeForward: function (client, packet, cb) { + packet.payload = new Buffer('another-world') + cb(null) + } + }) + var expected = { + cmd: 'publish', + topic: 'hello', + payload: new Buffer('another-world'), + dup: false, + length: 20, + qos: 0, + retain: false + } + + broker.on('client', function (client) { + client.subscribe({ + topic: 'hello', + qos: 0 + }, function (err) { + t.error(err, 'no error') + + broker.publish({ + topic: 'hello', + payload: new Buffer('world'), + qos: 0 + }, function (err) { + t.error(err, 'no error') + }) + }) + }) + + var s = connect(setup(broker)) + + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + }) +}) + +test('change a topic name inside authorizeForward method QOS 1', function (t) { + t.plan(3) + + var broker = aedes({ + authorizeForward: function (client, packet, cb) { + packet.payload = new Buffer('another-world') + packet.messageId = 2 + cb(null) + } + }) + var expected = { + cmd: 'publish', + topic: 'hello', + payload: new Buffer('another-world'), + dup: false, + length: 22, + qos: 1, + retain: false, + messageId: 2 + } + + broker.on('client', function (client) { + client.subscribe({ + topic: 'hello', + qos: 1 + }, function (err) { + t.error(err, 'no error') + + broker.publish({ + topic: 'hello', + payload: new Buffer('world'), + qos: 1 + }, function (err) { + t.error(err, 'no error') + }) + }) + }) + + var s = connect(setup(broker)) + + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + }) +})