diff --git a/docs/Aedes.md b/docs/Aedes.md index 5926fcd0..7c952e28 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -369,6 +369,8 @@ aedes.authorizeSubscribe = function (client, sub, callback) { } ``` +__ATTENTION__: When a subscription is negated, your client will receive back a SubAck with `qos: 128` that means `Failure` and your client will never receive packets published to that topic. + ## Handler: authorizeForward (client, packet) - client: [``](./Client.md) diff --git a/lib/handlers/subscribe.js b/lib/handlers/subscribe.js index 886eab1b..3131eba5 100644 --- a/lib/handlers/subscribe.js +++ b/lib/handlers/subscribe.js @@ -19,6 +19,7 @@ const restoreTopicActions = fastfall([ function SubAck (packet, granted) { this.cmd = 'suback' this.messageId = packet.messageId + // the qos granted this.granted = granted } @@ -27,7 +28,7 @@ function Subscription (qos, func) { this.func = func } -function SubscribeState (client, packet, restore, finish, granted) { +function SubscribeState (client, packet, restore, finish) { this.client = client this.packet = packet this.actions = restore ? restoreTopicActions : subscribeTopicActions @@ -56,11 +57,11 @@ function _dedupe (subs) { } function handleSubscribe (client, packet, restore, done) { - const subs = packet.subscriptions + packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions) client.broker._parallel( new SubscribeState(client, packet, restore, done), doSubscribe, - subs.length === 1 ? subs : _dedupe(subs), + packet.subscriptions, restore ? done : completeSubscribe) } @@ -92,6 +93,7 @@ function blockDollarSignTopics (func) { function storeSubscriptions (sub, done) { if (!sub || typeof sub !== 'object') { + // means failure: MQTT 3.1.1 specs > 3.9.3 Payload this.granted = 128 return done(null, null) } @@ -161,11 +163,19 @@ function completeSubscribe (err) { } else { done() } - this.subState = [] const broker = client.broker const subs = packet.subscriptions + var topics = [] + + for (var i = 0; i < subs.length; i++) { + topics.push(subs[i].topic) + subs.qos = this.subState[i].granted + } + + this.subState = [] + broker.emit('subscribe', subs, client) // Conform to MQTT 3.1.1 section 3.1.2.4 @@ -173,10 +183,6 @@ function completeSubscribe (err) { // Retained message should be only fetched from SUBSCRIBE. const persistence = broker.persistence - var topics = [] - for (var i = 0; i < subs.length; i++) { - topics.push(subs[i].topic) - } const stream = persistence.createRetainedStreamCombi(topics) stream.pipe(through.obj(function sendRetained (packet, enc, cb) { packet = new Packet({