Skip to content

Commit

Permalink
The QoS of retained messages should depend on subscribers (#288)
Browse files Browse the repository at this point in the history
* Enhance retain test
broker not store zero-byte retained messages

* Send retained packets should incr broker.counter.

* clientSub.qos=0 should not be evaluated as false

* Retained messages delivery QoS should depend on subscribers
[MQTT-3.3.1-5] If the RETAIN flag is set to 1, in a PUBLISH Packet sent by a Client to a Server, the Server MUST store the Application Message and its QoS, so that it can be delivered to future subscribers whose subscriptions match its topic name
We store the retained message QoS in a persistence store. If the retain message is QoS 1
Sub (QoS 2) will get the msg in QoS 1
Sub (QoS 1) will get the msg in QoS 1
Sub (QoS 0) will get the msg in QoS 0 (downgraded)

* Refactored & standardized

* Added comment

* Fixed lint

* Fixed test that subscription happen after retained
  • Loading branch information
gnought authored and mcollina committed Aug 9, 2019
1 parent 3683101 commit cb3c3a6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 19 deletions.
4 changes: 2 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -119,11 +119,11 @@ function Client (broker, conn) {
var packet = new QoSPacket(toForward, that)
// Downgrading to client subscription qos if needed
var clientSub = that.subscriptions[packet.topic]
if (clientSub && clientSub.qos && clientSub.qos < packet.qos) {
if (clientSub && (clientSub.qos || 0) < packet.qos) {
packet.qos = clientSub.qos
}
packet.writeCallback = cb
if (that.clean) {
if (that.clean || packet.retain) {
writeQoS(null, that, packet)
} else {
broker.persistence.outgoingUpdate(that, packet, writeQoS)
Expand Down
10 changes: 8 additions & 2 deletions lib/handlers/subscribe.js
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,16 @@ function completeSubscribe (err) {
}
var stream = persistence.createRetainedStreamCombi(topics)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
packet = new Packet(packet)
packet = new Packet({
cmd: packet.cmd,
qos: packet.qos,
topic: packet.topic,
payload: packet.payload,
retain: true
}, broker)
// this should not be deduped
packet.brokerId = null
client.deliver0(packet, cb)
client.deliverQoS(packet, cb)
}))
}

Expand Down
96 changes: 81 additions & 15 deletions test/retain.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@

var Buffer = require('safe-buffer').Buffer
var test = require('tape').test
var through = require('through2')
var helper = require('./helper')
var aedes = require('../')
var setup = helper.setup
var connect = helper.connect
var noError = helper.noError
var subscribe = helper.subscribe

// [MQTT-3.3.1-9]
test('live retain packets', function (t) {
t.plan(5)
var expected = {
Expand Down Expand Up @@ -160,8 +162,8 @@ test('reconnected subscriber will not receive retained messages when QoS 0 and c
})

// [MQTT-3.3.1-6]
test('new subscribers receive retained messages when QoS 0 and clean', function (t) {
t.plan(8)
test('new QoS 0 subscribers receive QoS 0 retained messages when clean', function (t) {
t.plan(9)

var broker = aedes()
var publisher = connect(setup(broker), { clean: true })
Expand Down Expand Up @@ -193,7 +195,49 @@ test('new subscribers receive retained messages when QoS 0 and clean', function
t.deepEqual(packet, expected, 'packet must match')
})
})
broker.on('closed', t.end.bind(t))
broker.on('closed', function () {
t.equal(broker.counter, 9)
t.end()
})
})

// [MQTT-3.3.1-5]
test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clean', function (t) {
t.plan(6)

var broker = aedes()
var publisher = connect(setup(broker), { clean: true })
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: true,
dup: false,
length: 12
}
publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
retain: true,
messageId: 42
})
publisher.outStream.on('data', function (packet) {
var subscriber = connect(setup(broker, false), { clean: true })
subscribe(t, subscriber, 'hello', 0, function () {
subscriber.outStream.on('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId should not be the same')
delete packet.messageId
t.deepEqual(packet, expected, 'packet must match')
})
})
})
broker.on('closed', function () {
t.equal(broker.counter, 6)
t.end()
})
})

// [MQTT-3.3.1-10]
Expand Down Expand Up @@ -223,6 +267,29 @@ test('clean retained messages', function (t) {
broker.on('closed', t.end.bind(t))
})

// [MQTT-3.3.1-11]
test('broker not store zero-byte retained messages', function (t) {
var broker = aedes()
var s = connect(setup(broker))

s.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: '',
retain: true
})
s.broker.on('publish', function (packet, client) {
if (packet.topic.startsWith('$SYS/')) {
return
}
var stream = s.broker.persistence.createRetainedStream(packet.topic)
stream.pipe(through.obj(function sendRetained (packet, enc, cb) {
t.fail('not store zero-byte retained messages')
}))
})
s.broker.on('closed', t.end.bind(t))
})

test('fail to clean retained messages without retain flag', function (t) {
t.plan(4)

Expand Down Expand Up @@ -251,9 +318,9 @@ test('fail to clean retained messages without retain flag', function (t) {
qos: 0,
retain: false
})
var subscriber1 = connect(setup(broker, false), { clean: true })
subscribe(t, subscriber1, 'hello', 0, function () {
subscriber1.outStream.on('data', function (packet) {
var subscriber = connect(setup(broker, false), { clean: true })
subscribe(t, subscriber, 'hello', 0, function () {
subscriber.outStream.on('data', function (packet) {
t.deepEqual(packet, expected, 'packet must match')
})
})
Expand Down Expand Up @@ -288,26 +355,26 @@ test('only get the last retained messages in same topic', function (t) {
qos: 0,
retain: true
})
var subscriber1 = connect(setup(broker, false), { clean: true })
subscribe(t, subscriber1, 'hello', 0, function () {
subscriber1.outStream.on('data', function (packet) {
var subscriber = connect(setup(broker, false), { clean: true })
subscribe(t, subscriber, 'hello', 0, function () {
subscriber.outStream.on('data', function (packet) {
t.deepEqual(packet, expected, 'packet must match')
})
})
broker.on('closed', t.end.bind(t))
})

test('deliver QoS 1 retained messages', function (t) {
test('deliver QoS 1 retained messages to new subscriptions', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
qos: 1,
dup: false,
length: 12,
length: 14,
retain: true
}

Expand All @@ -323,14 +390,15 @@ test('deliver QoS 1 retained messages', function (t) {
publisher.outStream.on('data', function (packet) {
subscribe(t, subscriber, 'hello', 1, function () {
subscriber.outStream.once('data', function (packet) {
delete packet.messageId
t.deepEqual(packet, expected, 'packet must match')
t.end()
})
})
})
})

test('deliver QoS 1 retained messages', function (t) {
test('deliver QoS 1 retained messages to established subscriptions', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
Expand Down Expand Up @@ -447,10 +515,8 @@ test('not clean and retain messages with QoS 1', function (t) {

subscriber.outStream.once('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId must differ')
t.equal(packet.qos, 0, 'qos degraded to 0 for retained')
var prevId = packet.messageId
delete packet.messageId
packet.qos = 1
packet.length = 14
t.deepEqual(packet, expected, 'packet must match')

Expand Down

0 comments on commit cb3c3a6

Please sign in to comment.