diff --git a/test/auth.js b/test/auth.js index 869cd30d..8e21a769 100644 --- a/test/auth.js +++ b/test/auth.js @@ -567,7 +567,15 @@ test('negate multiple subscriptions', function (t) { }) test('negate subscription with correct persistence', function (t) { - t.plan(7) + t.plan(6) + + var expected = [{ + topic: 'hello', + qos: 0 + }, { + topic: 'world', + qos: 0 + }] var broker = aedes() broker.authorizeSubscribe = function (client, sub, cb) { @@ -582,8 +590,9 @@ test('negate subscription with correct persistence', function (t) { s.outStream.once('data', function (packet) { t.equal(packet.cmd, 'suback') t.deepEqual(packet.granted, [128, 0]) - t.notEqual(broker.persistence._subscriptions.get('abcde'), undefined) - t.deepEqual(broker.persistence._subscriptions.get('abcde').size, 2) + broker.persistence.subscriptionsByClient(broker.clients['abcde'], function (_, subs, client) { + t.deepEqual(subs, expected) + }) t.equal(packet.messageId, 24) }) diff --git a/test/basic.js b/test/basic.js index 70fa6ca3..853cacfb 100644 --- a/test/basic.js +++ b/test/basic.js @@ -11,6 +11,8 @@ var noError = helper.noError var subscribe = helper.subscribe test('connect and connack (minimal)', function (t) { + t.plan(1) + var s = setup() s.inStream.write({ @@ -86,6 +88,8 @@ test('second CONNECT Packet sent from a Client as a protocol violation and disco }) test('publish QoS 0', function (t) { + t.plan(2) + var s = connect(setup()) var expected = { cmd: 'publish', @@ -112,6 +116,8 @@ test('publish QoS 0', function (t) { }) test('subscribe QoS 0', function (t) { + t.plan(4) + var s = connect(setup()) var expected = { cmd: 'publish', @@ -139,6 +145,7 @@ test('subscribe QoS 0', function (t) { test('does not die badly on connection error', function (t) { t.plan(3) + var s = connect(setup()) s.inStream.write({ @@ -228,6 +235,8 @@ test('unsubscribe without subscribe', function (t) { }) test('unsubscribe on disconnect for a clean=true client', function (t) { + t.plan(6) + var opts = { clean: true } var s = noError(connect(setup(), opts), t) @@ -238,10 +247,10 @@ test('unsubscribe on disconnect for a clean=true client', function (t) { }) s.broker.once('unsubscribe', function () { t.pass('should emit unsubscribe') - t.end() }) s.broker.once('closed', function () { t.ok(true) + t.end() }) s.broker.publish({ cmd: 'publish', @@ -254,6 +263,8 @@ test('unsubscribe on disconnect for a clean=true client', function (t) { }) test('unsubscribe on disconnect for a clean=false client', function (t) { + t.plan(5) + var opts = { clean: false } var s = noError(connect(setup(), opts), t) @@ -280,6 +291,8 @@ test('unsubscribe on disconnect for a clean=false client', function (t) { }) test('disconnect', function (t) { + t.plan(0) + var s = noError(connect(setup()), t) s.outStream.on('finish', function () { @@ -353,6 +366,8 @@ test('broker closes gracefully', function (t) { }) test('testing other event', function (t) { + t.plan(1) + var broker = aedes() var client = setup(broker) @@ -364,6 +379,8 @@ test('testing other event', function (t) { }) test('connect without a clientId for MQTT 3.1.1', function (t) { + t.plan(1) + var s = setup() s.inStream.write({ @@ -454,6 +471,8 @@ test('publish to $SYS/broker/new/clients', function (t) { }) test('restore QoS 0 subscriptions not clean', function (t) { + t.plan(5) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -490,6 +509,8 @@ test('restore QoS 0 subscriptions not clean', function (t) { }) test('do not restore QoS 0 subscriptions when clean', function (t) { + t.plan(6) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }) @@ -522,6 +543,8 @@ test('do not restore QoS 0 subscriptions when clean', function (t) { }) test('double sub does not double deliver', function (t) { + t.plan(7) + var s = connect(setup()) var expected = { cmd: 'publish', @@ -554,6 +577,8 @@ test('double sub does not double deliver', function (t) { }) test('overlapping sub does not double deliver', function (t) { + t.plan(7) + var s = connect(setup()) var expected = { cmd: 'publish', @@ -586,6 +611,8 @@ test('overlapping sub does not double deliver', function (t) { }) test('publish empty topic', function (t) { + t.plan(4) + var s = connect(setup()) subscribe(t, s, '#', 0, function () { @@ -608,6 +635,8 @@ test('publish empty topic', function (t) { }) test('publish invalid topic with #', function (t) { + t.plan(3) + var s = connect(setup()) subscribe(t, s, '#', 0, function () { @@ -629,6 +658,8 @@ test('publish invalid topic with #', function (t) { }) test('publish invalid topic with +', function (t) { + t.plan(3) + var s = connect(setup()) subscribe(t, s, '#', 0, function () { @@ -650,6 +681,8 @@ test('publish invalid topic with +', function (t) { ;['base/#/sub', 'base/#sub', 'base/sub#', 'base/xyz+/sub', 'base/+xyz/sub'].forEach(function (topic) { test('subscribe to invalid topic with "' + topic + '"', function (t) { + t.plan(0) + var s = connect(setup()) s.broker.on('clientError', function () { @@ -667,6 +700,8 @@ test('publish invalid topic with +', function (t) { }) test('unsubscribe to invalid topic with "' + topic + '"', function (t) { + t.plan(0) + var s = connect(setup()) s.broker.on('clientError', function () { diff --git a/test/client-pub-sub.js b/test/client-pub-sub.js index a0d5f1ac..e881312c 100644 --- a/test/client-pub-sub.js +++ b/test/client-pub-sub.js @@ -346,6 +346,7 @@ test('subscribe a client programmatically with full packet', function (t) { test('get message when client connects', function (t) { t.plan(2) + var client1 = 'gav' var broker = aedes() @@ -369,6 +370,7 @@ test('get message when client connects', function (t) { test('get message when client disconnects', function (t) { t.plan(2) + var client1 = 'gav' var client2 = 'friend' var broker = aedes() @@ -397,7 +399,7 @@ test('get message when client disconnects', function (t) { }) test('should not receive a message on negated subscription', function (t) { - t.plan(3) + t.plan(2) var broker = aedes() broker.authorizeSubscribe = function (client, sub, callback) { @@ -422,16 +424,8 @@ test('should not receive a message on negated subscription', function (t) { }) var s = connect(setup(broker)) - var receivedPacket = null s.outStream.once('data', function (packet) { - receivedPacket = packet + t.fail('Packet should not be received') }) - - setTimeout(function () { - if (receivedPacket != null) { - t.fail('Packet should not be received') - } else { - t.pass('Message not received') - } - }, 100) + broker.on('closed', t.end.bind(t)) }) diff --git a/test/events.js b/test/events.js index 25df8ea3..9861a109 100644 --- a/test/events.js +++ b/test/events.js @@ -9,6 +9,7 @@ var subscribe = helper.subscribe test('publishes an hearbeat', function (t) { t.plan(3) + var broker = aedes({ heartbeatInterval: 10 // ms }) @@ -23,6 +24,7 @@ test('publishes an hearbeat', function (t) { test('does not forward $SYS topics to # subscription', function (t) { t.plan(4) + var s = connect(setup()) subscribe(t, s, '#', 0, function () { @@ -42,6 +44,7 @@ test('does not forward $SYS topics to # subscription', function (t) { test('does not forward $SYS topics to +/# subscription', function (t) { t.plan(4) + var s = connect(setup()) subscribe(t, s, '+/#', 0, function () { @@ -89,6 +92,7 @@ test('does not store $SYS topics to QoS 1 # subscription', function (t) { test('Emit event when receives a ping', function (t) { t.plan(6) t.timeoutAfter(2000) + var broker = aedes() broker.on('ping', function (packet, client) { @@ -98,8 +102,8 @@ test('Emit event when receives a ping', function (t) { t.equal(packet.payload, null) t.equal(packet.topic, null) t.equal(packet.length, 0) - broker.close() t.pass('ended') + broker.close() } }) @@ -112,6 +116,7 @@ test('Emit event when receives a ping', function (t) { test('Emit event when broker closed', function (t) { t.plan(1) + var broker = aedes() broker.once('closed', function () { t.ok(true) diff --git a/test/keep-alive.js b/test/keep-alive.js index e7f03288..55b2c7f9 100644 --- a/test/keep-alive.js +++ b/test/keep-alive.js @@ -57,7 +57,7 @@ test('supports keep alive disconnections after a pingreq', function (t) { test('disconnect if a connect does not arrive in time', function (t) { t.plan(2) - t.timeoutAfter(200) + t.timeoutAfter(500) var s = setup(aedes({ connectTimeout: 100 diff --git a/test/not-blocking.js b/test/not-blocking.js index c2f0e971..b006adbc 100644 --- a/test/not-blocking.js +++ b/test/not-blocking.js @@ -1,12 +1,14 @@ 'use strict' var mqtt = require('mqtt') -var test = require('tape') +var test = require('tape').test var aedes = require('../') var net = require('net') var port = 4883 test('do not block after a subscription', function (t) { + t.plan(3) + var instance = aedes() var server = net.createServer(instance.handle) var total = 10000 @@ -73,6 +75,8 @@ test('do not block after a subscription', function (t) { }) test('do not block with overlapping subscription', function (t) { + t.plan(3) + var instance = aedes({ concurrency: 15 }) var server = net.createServer(instance.handle) var total = 10000 diff --git a/test/qos1.js b/test/qos1.js index 2e9aa8d4..984e7628 100644 --- a/test/qos1.js +++ b/test/qos1.js @@ -9,6 +9,8 @@ var connect = helper.connect var subscribe = helper.subscribe test('publish QoS 1', function (t) { + t.plan(1) + var s = connect(setup()) var expected = { cmd: 'puback', @@ -38,10 +40,10 @@ test('publish QoS 1 and check offline queue', function (t) { var broker = aedes() var publisher = connect(setup(broker), { clean: false }) - var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) var subscriberClient = { id: 'abcde' } + var subscriber = connect(setup(broker), { clean: false, clientId: subscriberClient.id }) var expected = { cmd: 'publish', topic: 'hello', @@ -104,6 +106,8 @@ test('publish QoS 1 and check offline queue', function (t) { }) test('subscribe QoS 1', function (t) { + t.plan(5) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -140,6 +144,8 @@ test('subscribe QoS 1', function (t) { }) test('subscribe QoS 0, but publish QoS 1', function (t) { + t.plan(4) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -170,6 +176,8 @@ test('subscribe QoS 0, but publish QoS 1', function (t) { }) test('restore QoS 1 subscriptions not clean', function (t) { + t.plan(7) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -217,6 +225,8 @@ test('restore QoS 1 subscriptions not clean', function (t) { }) test('remove stored subscriptions if connected with clean=true', function (t) { + t.plan(5) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -263,6 +273,8 @@ test('remove stored subscriptions if connected with clean=true', function (t) { }) test('resend publish on non-clean reconnect QoS 1', function (t) { + t.plan(6) + var broker = aedes() var publisher var opts = { clean: false, clientId: 'abcde' } @@ -310,6 +322,8 @@ test('resend publish on non-clean reconnect QoS 1', function (t) { }) test('do not resend QoS 1 packets at each reconnect', function (t) { + t.plan(6) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -356,17 +370,15 @@ test('do not resend QoS 1 packets at each reconnect', function (t) { subscriber2.outStream.once('data', function (packet) { t.fail('this should never happen') }) - - // TODO wait all packets to be sent - setTimeout(function () { - t.end() - }, 50) }) }) }) + broker.on('closed', t.end.bind(t)) }) test('do not resend QoS 1 packets if reconnect is clean', function (t) { + t.plan(4) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -392,16 +404,14 @@ test('do not resend QoS 1 packets if reconnect is clean', function (t) { subscriber.outStream.once('data', function (packet) { t.fail('this should never happen') }) - - // TODO wait all packets to be sent - setTimeout(function () { - t.end() - }, 50) }) }) + broker.on('closed', t.end.bind(t)) }) test('do not resend QoS 1 packets at reconnect if puback was received', function (t) { + t.plan(5) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -444,16 +454,14 @@ test('do not resend QoS 1 packets at reconnect if puback was received', function subscriber.outStream.once('data', function (packet) { t.fail('this should never happen') }) - - // TODO wait all packets to be sent - setTimeout(function () { - t.end() - }, 50) }) }) + broker.on('closed', t.end.bind(t)) }) test('remove stored subscriptions after unsubscribe', function (t) { + t.plan(5) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -513,6 +521,8 @@ test('remove stored subscriptions after unsubscribe', function (t) { }) test('upgrade a QoS 0 subscription to QoS 1', function (t) { + t.plan(8) + var s = connect(setup()) var expected = { cmd: 'publish', @@ -544,6 +554,8 @@ test('upgrade a QoS 0 subscription to QoS 1', function (t) { }) test('downgrade QoS 0 publish on QoS 1 subsciption', function (t) { + t.plan(4) + var s = connect(setup()) var expected = { cmd: 'publish', diff --git a/test/qos2.js b/test/qos2.js index 26525b1d..5314db59 100644 --- a/test/qos2.js +++ b/test/qos2.js @@ -79,6 +79,8 @@ function receive (t, subscriber, expected, done) { } test('publish QoS 2', function (t) { + t.plan(2) + var s = connect(setup()) var packet = { cmd: 'publish', @@ -91,6 +93,8 @@ test('publish QoS 2', function (t) { }) test('subscribe QoS 2', function (t) { + t.plan(8) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -113,6 +117,8 @@ test('subscribe QoS 2', function (t) { }) test('client.publish with clean=true subscribption QoS 2', function (t) { + t.plan(8) + var broker = aedes() var toPublish = { cmd: 'publish', @@ -178,9 +184,11 @@ test('call published method with client with QoS 2', function (t) { }) test('subscribe QoS 0, but publish QoS 2', function (t) { + t.plan(6) + var broker = aedes() var publisher = connect(setup(broker)) - var subscriber = connect(setup(broker)) + var subscriber = connect(setup(broker, false)) var expected = { cmd: 'publish', topic: 'hello', @@ -194,7 +202,6 @@ test('subscribe QoS 0, but publish QoS 2', function (t) { subscribe(t, subscriber, 'hello', 0, function () { subscriber.outStream.once('data', function (packet) { t.deepEqual(packet, expected, 'packet must match') - t.end() }) publish(t, publisher, { @@ -207,12 +214,15 @@ test('subscribe QoS 0, but publish QoS 2', function (t) { dup: false }) }) + broker.on('closed', t.end.bind(t)) }) test('subscribe QoS 1, but publish QoS 2', function (t) { + t.plan(6) + var broker = aedes() var publisher = connect(setup(broker)) - var subscriber = connect(setup(broker)) + var subscriber = connect(setup(broker, false)) var expected = { cmd: 'publish', topic: 'hello', @@ -227,7 +237,6 @@ test('subscribe QoS 1, but publish QoS 2', function (t) { subscriber.outStream.once('data', function (packet) { delete packet.messageId t.deepEqual(packet, expected, 'packet must match') - t.end() }) publish(t, publisher, { @@ -240,9 +249,12 @@ test('subscribe QoS 1, but publish QoS 2', function (t) { dup: false }) }) + broker.on('closed', t.end.bind(t)) }) test('restore QoS 2 subscriptions not clean', function (t) { + t.plan(9) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -272,6 +284,8 @@ test('restore QoS 2 subscriptions not clean', function (t) { }) test('resend publish on non-clean reconnect QoS 2', function (t) { + t.plan(8) + var broker = aedes() var publisher var opts = { clean: false, clientId: 'abcde' } @@ -301,6 +315,8 @@ test('resend publish on non-clean reconnect QoS 2', function (t) { }) test('resend pubrel on non-clean reconnect QoS 2', function (t) { + t.plan(9) + var broker = aedes() var publisher var opts = { clean: false, clientId: 'abcde' } @@ -375,6 +391,8 @@ test('resend pubrel on non-clean reconnect QoS 2', function (t) { }) test('publish after disconnection', function (t) { + t.plan(10) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) diff --git a/test/regr-21.js b/test/regr-21.js index a6e00b60..cc71b10c 100644 --- a/test/regr-21.js +++ b/test/regr-21.js @@ -7,6 +7,7 @@ var connect = helper.connect test('after an error, outstanding packets are discarded', function (t) { t.plan(1) + var s = connect(setup(), { keepalive: 1000 }) diff --git a/test/retain.js b/test/retain.js index aa2f589a..69f2a92c 100644 --- a/test/retain.js +++ b/test/retain.js @@ -45,6 +45,8 @@ test('live retain packets', function (t) { }) test('retain messages', function (t) { + t.plan(4) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -77,6 +79,8 @@ test('retain messages', function (t) { }) test('avoid wrong deduping of retain messages', function (t) { + t.plan(7) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -242,6 +246,8 @@ test('new QoS 0 subscribers receive downgraded QoS 1 retained messages when clea // [MQTT-3.3.1-10] test('clean retained messages', function (t) { + t.plan(3) + var broker = aedes() var publisher = connect(setup(broker), { clean: true }) publisher.inStream.write({ @@ -269,6 +275,8 @@ test('clean retained messages', function (t) { // [MQTT-3.3.1-11] test('broker not store zero-byte retained messages', function (t) { + t.plan(0) + var broker = aedes() var s = connect(setup(broker)) @@ -365,6 +373,8 @@ test('only get the last retained messages in same topic', function (t) { }) test('deliver QoS 1 retained messages to new subscriptions', function (t) { + t.plan(4) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -399,6 +409,8 @@ test('deliver QoS 1 retained messages to new subscriptions', function (t) { }) test('deliver QoS 1 retained messages to established subscriptions', function (t) { + t.plan(4) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -430,6 +442,8 @@ test('deliver QoS 1 retained messages to established subscriptions', function (t }) test('deliver QoS 0 retained message with QoS 1 subscription', function (t) { + t.plan(4) + var broker = aedes() var publisher = connect(setup(broker)) var subscriber = connect(setup(broker)) @@ -469,6 +483,8 @@ test('deliver QoS 0 retained message with QoS 1 subscription', function (t) { }) test('not clean and retain messages with QoS 1', function (t) { + t.plan(10) + var broker = aedes() var publisher var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) @@ -530,10 +546,9 @@ test('not clean and retain messages with QoS 1', function (t) { }) delete packet2.messageId t.deepEqual(packet, expected, 'packet must match') - - t.end() }) }) }) }) + broker.on('closed', t.end.bind(t)) }) diff --git a/test/will.js b/test/will.js index ac46885e..ca444f93 100644 --- a/test/will.js +++ b/test/will.js @@ -21,6 +21,8 @@ function willConnect (s, opts, connected) { } test('delivers a will', function (t) { + t.plan(4) + var opts = {} // willConnect populates opts with a will var s = willConnect(setup(), opts) @@ -41,6 +43,7 @@ test('delivers a will', function (t) { test('calling close two times should not deliver two wills', function (t) { t.plan(4) + var opts = {} var broker = aedes() @@ -67,6 +70,7 @@ test('calling close two times should not deliver two wills', function (t) { test('delivers old will in case of a crash', function (t) { t.plan(7) + var persistence = memory() var will = { topic: 'mywill', @@ -103,15 +107,17 @@ test('delivers old will in case of a crash', function (t) { broker.mq.on('mywill', function (packet) { t.fail('the will must be delivered only once') }) - setTimeout(function () { + setImmediate(function () { broker.close(t.pass.bind(t, 'server closes')) - }, 15) + }) cb() } }) }) test('store the will in the persistence', function (t) { + t.plan(5) + var opts = { clientId: 'abcde' } @@ -135,6 +141,8 @@ test('store the will in the persistence', function (t) { }) test('delete the will in the persistence after publish', function (t) { + t.plan(2) + var opts = { clientId: 'abcde' } @@ -168,6 +176,8 @@ test('delete the will in the persistence after publish', function (t) { }) test('delivers a will with authorization', function (t) { + t.plan(7) + let authorized = false var opts = {} var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } }) @@ -195,13 +205,16 @@ test('delivers a will with authorization', function (t) { }) test('delivers a will waits for authorization', function (t) { + t.plan(6) + let authorized = false var opts = {} + var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; setImmediate(() => { callback(null) }) } }) // willConnect populates opts with a will - var s = willConnect(setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; setImmediate(() => { callback(null) }) } })), opts) + var s = willConnect(setup(broker), opts) s.broker.on('clientDisconnect', function () { - t.end() + t.pass('client is disconnected') }) s.broker.mq.on('mywill', function (packet, cb) { @@ -216,9 +229,12 @@ test('delivers a will waits for authorization', function (t) { process.nextTick(function () { s.conn.destroy() }) + broker.on('closed', t.end.bind(t)) }) test('does not deliver a will without authorization', function (t) { + t.plan(1) + let authorized = false var opts = {} // willConnect populates opts with a will @@ -240,6 +256,8 @@ test('does not deliver a will without authorization', function (t) { }) test('does not deliver a will without authentication', function (t) { + t.plan(1) + let authenticated = false var opts = {} // willConnect populates opts with a will @@ -257,6 +275,8 @@ test('does not deliver a will without authentication', function (t) { }) test('does not deliver will if keepalive is triggered during authentication', function (t) { + t.plan(0) + var opts = {} opts.keepalive = 1 var broker = aedes({