Skip to content

Commit

Permalink
Merge 7eef5a7 into ae63c0e
Browse files Browse the repository at this point in the history
  • Loading branch information
robertsLando committed Apr 10, 2020
2 parents ae63c0e + 7eef5a7 commit 27aa4ca
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 13 deletions.
4 changes: 2 additions & 2 deletions docs/Aedes.md
Expand Up @@ -138,7 +138,7 @@ Emitted when `client` sends a `PINGREQ`.

Emitted when `client` successfully subscribe the `subscriptions` in server.

`subscriptions` are an array of `{ topic: topic, qos: qos }`
`subscriptions` is an array of `{ topic: topic, qos: qos }`. The array excludes duplicated topics and includes negated subscriptions where `qos` equals to `128`. See more on [authorizeSubscribe](#handler-authorizesubscribe-client-subscription-callback)

## Event: unsubscribe

Expand Down Expand Up @@ -360,7 +360,7 @@ aedes.authorizeSubscribe = function (client, sub, callback) {
}
```
To negate a subscription, set the subscription to `null`:
To negate a subscription, set the subscription to `null`. Aedes ignores the negated subscription and the `qos` in `SubAck` is set to `128` based on [MQTT 3.11 spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html#_Toc385349323):
```js
aedes.authorizeSubscribe = function (client, sub, callback) {
Expand Down
22 changes: 14 additions & 8 deletions lib/handlers/subscribe.js
Expand Up @@ -19,6 +19,7 @@ const restoreTopicActions = fastfall([
function SubAck (packet, granted) {
this.cmd = 'suback'
this.messageId = packet.messageId
// the qos granted
this.granted = granted
}

Expand All @@ -27,7 +28,7 @@ function Subscription (qos, func) {
this.func = func
}

function SubscribeState (client, packet, restore, finish, granted) {
function SubscribeState (client, packet, restore, finish) {
this.client = client
this.packet = packet
this.actions = restore ? restoreTopicActions : subscribeTopicActions
Expand Down Expand Up @@ -56,11 +57,11 @@ function _dedupe (subs) {
}

function handleSubscribe (client, packet, restore, done) {
const subs = packet.subscriptions
packet.subscriptions = packet.subscriptions.length === 1 ? packet.subscriptions : _dedupe(packet.subscriptions)
client.broker._parallel(
new SubscribeState(client, packet, restore, done),
doSubscribe,
subs.length === 1 ? subs : _dedupe(subs),
packet.subscriptions,
restore ? done : completeSubscribe)
}

Expand Down Expand Up @@ -92,6 +93,7 @@ function blockDollarSignTopics (func) {

function storeSubscriptions (sub, done) {
if (!sub || typeof sub !== 'object') {
// means failure: MQTT 3.1.1 specs > 3.9.3 Payload
this.granted = 128
return done(null, null)
}
Expand Down Expand Up @@ -161,22 +163,26 @@ function completeSubscribe (err) {
} else {
done()
}
this.subState = []

const broker = client.broker
const subs = packet.subscriptions

var topics = []

for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
subs.qos = this.subState[i].granted
}

this.subState = []

broker.emit('subscribe', subs, client)

// Conform to MQTT 3.1.1 section 3.1.2.4
// Restored sessions should not contain any retained message.
// Retained message should be only fetched from SUBSCRIBE.

const persistence = broker.persistence
var topics = []
for (var i = 0; i < subs.length; i++) {
topics.push(subs[i].topic)
}
const stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet({
Expand Down
15 changes: 12 additions & 3 deletions test/client-pub-sub.js
Expand Up @@ -714,7 +714,7 @@ test('get message when client disconnects', function (t) {
})

test('should not receive a message on negated subscription', function (t) {
t.plan(2)
t.plan(4)

const broker = aedes()
t.tearDown(broker.close.bind(broker))
Expand All @@ -731,15 +731,24 @@ test('should not receive a message on negated subscription', function (t) {
retain: true
}, function (err) {
t.error(err, 'no error')
client.subscribe({
client.subscribe([{
topic: 'hello',
qos: 0
}, function (err) {
},
{
topic: 'hello',
qos: 0
}], function (err) {
t.error(err, 'no error')
})
})
})

broker.on('subscribe', function (subs) {
t.pass(subs.length, 1, 'Should dedupe subs')
t.pass(subs[0].qos, 128, 'Qos should be 128 (Fail)')
})

const s = connect(setup(broker))
s.outStream.once('data', function (packet) {
t.fail('Packet should not be received')
Expand Down

0 comments on commit 27aa4ca

Please sign in to comment.