Skip to content

Commit

Permalink
Enhanced retain tests and move into test/retain.js (#275)
Browse files Browse the repository at this point in the history
  • Loading branch information
gnought authored and mcollina committed Jul 20, 2019
1 parent 9bac5eb commit 308edc2
Show file tree
Hide file tree
Showing 3 changed files with 473 additions and 282 deletions.
107 changes: 0 additions & 107 deletions test/basic.js
Original file line number Diff line number Diff line change
Expand Up @@ -204,39 +204,6 @@ test('unsubscribe', function (t) {
})
})

test('live retain packets', function (t) {
t.plan(5)
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
retain: false,
dup: false,
length: 12,
qos: 0
}

var s = noError(connect(setup()), t)

subscribe(t, s, 'hello', 0, function () {
s.outStream.on('data', function (packet) {
t.deepEqual(packet, expected)
})

s.broker.publish({
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
retain: true,
dup: false,
length: 12,
qos: 0
}, function () {
t.pass('publish finished')
})
})
})

test('unsubscribe without subscribe', function (t) {
t.plan(1)

Expand Down Expand Up @@ -324,38 +291,6 @@ test('disconnect', function (t) {
})
})

test('retain messages', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
dup: false,
length: 12,
retain: true
}

broker.subscribe('hello', function (packet, cb) {
cb()

// defer this or it will receive the message which
// is being published
setImmediate(function () {
subscribe(t, subscriber, 'hello', 0, function () {
subscriber.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet must match')
t.end()
})
})
})
})

publisher.inStream.write(expected)
})

test('client closes', function (t) {
t.plan(3)

Expand Down Expand Up @@ -638,48 +573,6 @@ test('overlapping sub does not double deliver', function (t) {
})
})

test('avoid wrong deduping of retain messages', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
dup: false,
length: 12,
retain: true
}

broker.subscribe('hello', function (packet, cb) {
cb()
// subscribe and publish another topic
subscribe(t, subscriber, 'hello2', 0, function () {
cb()

publisher.inStream.write({
cmd: 'publish',
topic: 'hello2',
payload: Buffer.from('world'),
qos: 0,
dup: false
})

subscriber.outStream.once('data', function (packet) {
subscribe(t, subscriber, 'hello', 0, function () {
subscriber.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet must match')
t.end()
})
})
})
})
})

publisher.inStream.write(expected)
})

test('publish empty topic', function (t) {
var s = connect(setup())

Expand Down
175 changes: 0 additions & 175 deletions test/qos1.js
Original file line number Diff line number Diff line change
Expand Up @@ -383,109 +383,6 @@ test('do not resend QoS 1 packets at reconnect if puback was received', function
})
})

test('deliver QoS 1 retained messages', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
dup: false,
length: 12,
retain: true
}

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 42,
retain: true
})

publisher.outStream.on('data', function (packet) {
subscribe(t, subscriber, 'hello', 1, function () {
subscriber.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet must match')
t.end()
})
})
})
})

test('deliver QoS 1 retained messages', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 1,
dup: false,
length: 14,
retain: false
}

subscribe(t, subscriber, 'hello', 1, function () {
subscriber.outStream.once('data', function (packet) {
delete packet.messageId
t.deepEqual(packet, expected, 'packet must match')
t.end()
})
publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 42,
retain: true
})
})
})

test('deliver QoS 0 retained message with QoS 1 subscription', function (t) {
var broker = aedes()
var publisher = connect(setup(broker))
var subscriber = connect(setup(broker))
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
dup: false,
length: 12,
retain: true
}

broker.mq.on('hello', function (msg, cb) {
cb()

// defer this or it will receive the message which
// is being published
setImmediate(function () {
subscribe(t, subscriber, 'hello', 1, function () {
subscriber.outStream.once('data', function (packet) {
t.deepEqual(packet, expected, 'packet must match')
t.end()
})
})
})
})

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
messageId: 42,
retain: true
})
})

test('remove stored subscriptions after unsubscribe', function (t) {
var broker = aedes()
var publisher
Expand Down Expand Up @@ -602,78 +499,6 @@ test('downgrade QoS 0 publish on QoS 1 subsciption', function (t) {
})
})

test('not clean and retain messages with QoS 1', function (t) {
var broker = aedes()
var publisher
var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' })
var expected = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 1,
dup: false,
length: 14,
retain: true
}

subscribe(t, subscriber, 'hello', 1, function () {
subscriber.inStream.write({
cmd: 'disconnect'
})

subscriber.outStream.on('data', function (packet) {
console.log('original', packet)
})

publisher = connect(setup(broker))

publisher.inStream.write({
cmd: 'publish',
topic: 'hello',
payload: 'world',
qos: 1,
messageId: 42,
retain: true
})

publisher.outStream.once('data', function (packet) {
t.equal(packet.cmd, 'puback')

broker.on('clientError', function (client, err) {
t.fail('no error')
})

subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) {
t.equal(connect.sessionPresent, true, 'session present is set to true')
})

subscriber.outStream.once('data', function (packet) {
t.notEqual(packet.messageId, 42, 'messageId must differ')
t.equal(packet.qos, 0, 'qos degraded to 0 for retained')
var prevId = packet.messageId
delete packet.messageId
packet.qos = 1
packet.length = 14
t.deepEqual(packet, expected, 'packet must match')

// message is duplicated
subscriber.outStream.once('data', function (packet2) {
var curId = packet2.messageId
t.notOk(curId === prevId, 'messageId must differ')
subscriber.inStream.write({
cmd: 'puback',
messageId: curId
})
delete packet2.messageId
t.deepEqual(packet, expected, 'packet must match')

t.end()
})
})
})
})
})

test('subscribe and publish QoS 1 in parallel', function (t) {
t.plan(5)

Expand Down
Loading

0 comments on commit 308edc2

Please sign in to comment.