Skip to content

Commit

Permalink
Store subscriptions before adding the listener to MQEmitter
Browse files Browse the repository at this point in the history
  • Loading branch information
mcollina committed Jun 8, 2016
1 parent 088bec3 commit 5c65006
Show file tree
Hide file tree
Showing 4 changed files with 83 additions and 29 deletions.
2 changes: 1 addition & 1 deletion lib/client.js
Expand Up @@ -209,7 +209,7 @@ Client.prototype.close = function (done) {

function finish () {
if (!that.disconnected && that.will) {
that.broker.publish(that.will)
that.broker.publish(that.will, that, nop)
}

conn.removeAllListeners('error')
Expand Down
8 changes: 7 additions & 1 deletion lib/handlers/connect.js
Expand Up @@ -147,7 +147,13 @@ function emptyQueueFilter (err, client, packet) {
return next()
}

if (client.clean) {
var authorized = true

if (packet.cmd === 'publish') {
authorized = client.broker.authorizeForward(client, packet)
}

if (client.clean || !authorized) {
persistence.outgoingClearMessageId(client, packet, next)
} else {
write(client, packet, next)
Expand Down
47 changes: 20 additions & 27 deletions lib/handlers/subscribe.js
Expand Up @@ -6,6 +6,7 @@ var Packet = require('aedes-packet')
var through = require('through2')
var topicActions = fastfall([
authorize,
storeSubscriptions,
subTopic
])

Expand Down Expand Up @@ -53,6 +54,17 @@ function Subscription (qos, func) {
this.func = func
}

function storeSubscriptions (sub, done) {
var packet = this.packet
var client = this.client
var broker = client.broker
var perst = broker.persistence

perst.addSubscriptions(client, packet.subscriptions, function (err) {
done(err, sub)
})
}

function subTopic (sub, done) {
if (!sub) {
this.granted.push(128)
Expand Down Expand Up @@ -105,29 +117,24 @@ function subTopic (sub, done) {
}
}

var completeSubscribeActions = [
storeSubscriptions,
doSuback
]
function completeSubscribe (err) {
var packet = this.packet
var client = this.client
var broker = client.broker
var granted = this.granted
var done = this.finish

if (err) {
return done(err)
}

broker._series(this, completeSubscribeActions, null, done || nop)
}

function storeSubscriptions (arg, done) {
var packet = this.packet
var client = this.client
var broker = client.broker
var perst = broker.persistence
broker.emit('subscribe', packet.subscriptions, client)

perst.addSubscriptions(client, packet.subscriptions, done)
if (packet.messageId) {
write(client, new SubAck(packet, granted), done)
} else {
done()
}
}

function SubAck (packet, granted) {
Expand All @@ -136,20 +143,6 @@ function SubAck (packet, granted) {
this.granted = granted
}

function doSuback (arg, done) {
var packet = this.packet
var client = this.client
var granted = this.granted

client.broker.emit('subscribe', packet.subscriptions, client)

if (packet.messageId) {
write(client, new SubAck(packet, granted), done)
} else {
done()
}
}

function nop () {}

module.exports = handleSubscribe
55 changes: 55 additions & 0 deletions test/qos1.js
Expand Up @@ -641,3 +641,58 @@ test('not clean and retain messages with QoS 1', function (t) {
})
})
})

test('subscribe and publish QoS 1 in parallel', function (t) {
var broker = aedes()
var s = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: new Buffer('world'),
qos: 1,
dup: false,
length: 14,
retain: false
}

broker.on('clientError', function (client, err) {
console.log(err.stack)
// t.fail('no client error')
})

s.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')
t.equal(packet.messageId, 42, 'messageId must match differ')
s.outStream.once('data', function (packet) {
s.inStream.write({
cmd: 'puback',
messageId: packet.messageId
})
delete packet.messageId
t.deepEqual(packet, expected, 'packet must match')
s.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'suback')
t.deepEqual(packet.granted, [1])
t.equal(packet.messageId, 24)
t.end()
})
})
})

s.inStream.write({
cmd: 'subscribe',
messageId: 24,
subscriptions: [{
topic: 'hello',
qos: 1
}]
})

s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 42
})
})

0 comments on commit 5c65006

Please sign in to comment.