Skip to content

Commit

Permalink
Dedupe subscriptions (#313)
Browse files Browse the repository at this point in the history
* Dedupe subscriptions

* Fixed typo

* Refactored from forEach to for loop
  • Loading branch information
gnought authored and mcollina committed Sep 2, 2019
1 parent 7d3f8c0 commit 81e75ac
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 2 deletions.
16 changes: 15 additions & 1 deletion lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,20 @@ function SubscribeState (client, packet, finish, granted) {
this.subsIndex = 0
}

// if same subscribed topic in subs array, we pick up the last one
function _dedupe (subs) {
var dedupeSubs = {}
for (var i = 0; i < subs.length; i++) {
var sub = subs[i]
dedupeSubs[sub.topic] = sub
}
var ret = []
for (var key in dedupeSubs) {
ret.push(dedupeSubs[key])
}
return ret
}

function handleSubscribe (client, packet, done) {
var broker = client.broker
var subs = packet.subscriptions
Expand All @@ -27,7 +41,7 @@ function handleSubscribe (client, packet, done) {
broker._series(
new SubscribeState(client, packet, done, granted),
doSubscribe,
subs,
subs.length === 1 ? subs : _dedupe(subs),
completeSubscribe)
}

Expand Down
57 changes: 57 additions & 0 deletions test/auth.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ var aedes = require('../')
var eos = require('end-of-stream')
var setup = helper.setup
var subscribe = helper.subscribe
var subscribeMultiple = helper.subscribeMultiple
var connect = helper.connect

test('authenticate successfully a client with username and password', function (t) {
Expand Down Expand Up @@ -489,6 +490,62 @@ test('authorize subscribe', function (t) {
subscribe(t, s, 'hello', 0)
})

test('authorize subscribe multiple same topics with same qos', function (t) {
t.plan(4)

var s = connect(setup())

s.broker.authorizeSubscribe = function (client, sub, cb) {
t.deepEqual(sub, {
topic: 'hello',
qos: 0
}, 'topic matches')
cb(null, sub)
}

subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'hello', qos: 0 }], [0])
})

test('authorize subscribe multiple same topics with different qos', function (t) {
t.plan(4)

var s = connect(setup())

s.broker.authorizeSubscribe = function (client, sub, cb) {
t.deepEqual(sub, {
topic: 'hello',
qos: 1
}, 'topic matches')
cb(null, sub)
}

subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'hello', qos: 1 }], [1])
})

test('authorize subscribe multiple different topics', function (t) {
t.plan(7)

var s = connect(setup())

s.broker.authorizeSubscribe = function (client, sub, cb) {
t.ok(client, 'client exists')
if (sub.topic === 'hello') {
t.deepEqual(sub, {
topic: 'hello',
qos: 0
}, 'topic matches')
} else if (sub.topic === 'foo') {
t.deepEqual(sub, {
topic: 'foo',
qos: 0
}, 'topic matches')
}
cb(null, sub)
}

subscribeMultiple(t, s, [{ topic: 'hello', qos: 0 }, { topic: 'foo', qos: 0 }], [0, 0])
})

test('authorize subscribe from config options', function (t) {
t.plan(5)

Expand Down
22 changes: 21 additions & 1 deletion test/helper.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,9 +95,29 @@ function subscribe (t, subscriber, topic, qos, done) {
})
}

// subs: [{topic:, qos:}]
function subscribeMultiple (t, subscriber, subs, expectedGranted, done) {
subscriber.inStream.write({
cmd: 'subscribe',
messageId: 24,
subscriptions: subs
})

subscriber.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'suback')
t.deepEqual(packet.granted, expectedGranted)
t.equal(packet.messageId, 24)

if (done) {
done(null, packet)
}
})
}

module.exports = {
setup: setup,
connect: connect,
noError: noError,
subscribe: subscribe
subscribe: subscribe,
subscribeMultiple: subscribeMultiple
}

0 comments on commit 81e75ac

Please sign in to comment.