Skip to content

Commit

Permalink
Merge 462695f into ae63c0e
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando committed Apr 9, 2020
2 parents ae63c0e + 462695f commit dec64a7
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
2 changes: 2 additions & 0 deletions docs/Aedes.md
Expand Up @@ -369,6 +369,8 @@ aedes.authorizeSubscribe = function (client, sub, callback) {
}
```
__ATTENTION__: When a subscription is negated, your client will receive back a SubAck with `qos: 128` that means `Failure` and your client will never receive packets published to that topic.
## Handler: authorizeForward (client, packet)
- client: [`<Client>`](./Client.md)
Expand Down
22 changes: 14 additions & 8 deletions lib/handlers/subscribe.js
Expand Up @@ -19,6 +19,7 @@ const restoreTopicActions = fastfall([
function SubAck (packet, granted) {
this.cmd = 'suback'
this.messageId = packet.messageId
// the qos granted
this.granted = granted
}

Expand All @@ -27,7 +28,7 @@ function Subscription (qos, func) {
this.func = func
}

function SubscribeState (client, packet, restore, finish, granted) {
function SubscribeState (client, packet, restore, finish) {
this.client = client
this.packet = packet
this.actions = restore ? restoreTopicActions : subscribeTopicActions
Expand Down Expand Up @@ -56,11 +57,11 @@ function _dedupe (subs) {
}

function handleSubscribe (client, packet, restore, done) {
const subs = packet.subscriptions
packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions)
client.broker._parallel(
new SubscribeState(client, packet, restore, done),
doSubscribe,
subs.length === 1 ? subs : _dedupe(subs),
packet.subscriptions,
restore ? done : completeSubscribe)
}

Expand Down Expand Up @@ -92,6 +93,7 @@ function blockDollarSignTopics (func) {

function storeSubscriptions (sub, done) {
if (!sub || typeof sub !== 'object') {
// means failure: MQTT 3.1.1 specs > 3.9.3 Payload
this.granted = 128
return done(null, null)
}
Expand Down Expand Up @@ -161,22 +163,26 @@ function completeSubscribe (err) {
} else {
done()
}
this.subState = []

const broker = client.broker
const subs = packet.subscriptions

var topics = []

for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
subs.qos = this.subState[i].granted
}

this.subState = []

broker.emit('subscribe', subs, client)

// Conform to MQTT 3.1.1 section 3.1.2.4
// Restored sessions should not contain any retained message.
// Retained message should be only fetched from SUBSCRIBE.

const persistence = broker.persistence
var topics = []
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
}
const stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet({
Expand Down

0 comments on commit dec64a7

Please sign in to comment.