From 49f0825b6d85d4110017adfbf0e4bc21d01b4317 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Tue, 6 Aug 2019 22:58:30 +0800 Subject: [PATCH 1/2] Enhance test `resend publish on non-clean reconnect QoS 1` --- test/qos1.js | 22 +++++++++++++++++++--- 1 file changed, 19 insertions(+), 3 deletions(-) diff --git a/test/qos1.js b/test/qos1.js index 984e7628..dc739ce5 100644 --- a/test/qos1.js +++ b/test/qos1.js @@ -1,5 +1,6 @@ 'use strict' +var concat = require('concat-stream') var Buffer = require('safe-buffer').Buffer var test = require('tape').test var helper = require('./helper') @@ -273,12 +274,15 @@ test('remove stored subscriptions if connected with clean=true', function (t) { }) test('resend publish on non-clean reconnect QoS 1', function (t) { - t.plan(6) + t.plan(8) var broker = aedes() var publisher var opts = { clean: false, clientId: 'abcde' } var subscriber = connect(setup(broker), opts) + var subscriberClient = { + id: opts.clientId + } var expected = { cmd: 'publish', topic: 'hello', @@ -301,7 +305,13 @@ test('resend publish on non-clean reconnect QoS 1', function (t) { qos: 1, messageId: 42 }) - + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world world', + qos: 1, + messageId: 42 + }) publisher.outStream.once('data', function (packet) { t.equal(packet.cmd, 'puback') @@ -315,7 +325,13 @@ test('resend publish on non-clean reconnect QoS 1', function (t) { t.notEqual(packet.messageId, 42, 'messageId must differ') delete packet.messageId t.deepEqual(packet, expected, 'packet must match') - t.end() + setImmediate(() => { + var stream = broker.persistence.outgoingStream(subscriberClient) + stream.pipe(concat(function (list) { + t.equal(list.length, 1, 'should remain one item in queue') + t.deepEqual(list[0].payload, Buffer.from('world world'), 'packet must match') + })) + }) }) }) }) From 5d3528274179472cf720b01250e57b83cfc5f0a0 Mon Sep 17 00:00:00 2001 From: Gnought <1684105+gnought@users.noreply.github.com> Date: Tue, 6 Aug 2019 22:58:31 +0800 Subject: [PATCH 2/2] Add a qos 2 test, 'multiple publish and store one' --- test/qos2.js | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/test/qos2.js b/test/qos2.js index 5314db59..1bb7d573 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -425,3 +425,35 @@ test('publish after disconnection', function (t) { }) }) }) + +test('multiple publish and store one', function (t) { + t.plan(2) + + var broker = aedes() + var sid = { + id: 'abcde' + } + var s = connect(setup(broker), { clientId: sid.id }) + var toPublish = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 2, + retain: false, + messageId: 42 + } + + var count = 5 + while (--count) { + s.inStream.write(toPublish) + } + broker.on('closed', function () { + broker.persistence.incomingGetPacket(sid, toPublish, function (err, origPacket) { + delete origPacket.brokerId + delete origPacket.brokerCounter + t.deepEqual(origPacket, toPublish, 'packet must match') + t.error(err) + t.end() + }) + }) +})