diff --git a/README.md b/README.md index cb4598fe..d861f5fe 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ server.listen(8883, function () { * instance.subscribe() * instance.publish() * instance.unsubscribe() + * instance.preConnect() * instance.authenticate() * instance.authorizePublish() * instance.authorizeSubscribe() @@ -107,6 +108,8 @@ Options: packet to arrive, defaults to `30000` milliseconds * `id`: id used to identify this broker instance in `$SYS` messages, defaults to `shortid()` +* `preConnect`: function called when a valid CONNECT is received, see + [instance.preConnect()](#preConnect) * `authenticate`: function used to authenticate clients, see [instance.authenticate()](#authenticate) * `authorizePublish`: function used to authorize PUBLISH packets, see @@ -120,7 +123,9 @@ Options: Events: -* `client`: when a new [Client](#client) connects, arguments: +* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments: + 1. `client` +* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments: 1. `client` * `clientDisconnect`: when a [Client](#client) disconnects, arguments: 1. `client` @@ -152,8 +157,9 @@ packet. [UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe) packet. 2. `client` -* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments: - 1. `client` +* `connackSent`: when a CONNACK packet is sent to a client, arguments: + 1. `packet` + 2. `client` * `closed`: when the broker is closed ------------------------------------------------------- @@ -213,6 +219,26 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings. The reverse of [subscribe](#subscribe). +------------------------------------------------------- + +### instance.preConnect(client, done(err, successful)) + +It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic. +Some use cases: +1. Rate Limit / Throttle by `client.conn.remoteAddress` +2. Check `instance.connectedClient` to limit maximum connections +3. IP blacklisting + +```js +instance.preConnect = function(client, callback) { + callback(null, client.conn.remoteAddress === '::1') { +} +``` +```js +instance.preConnect = function(client, callback) { + callback(new Error('connection error'), client.conn.remoteAddress !== '::1') { +} +``` ------------------------------------------------------- ### instance.authenticate(client, username, password, done(err, successful)) diff --git a/aedes.js b/aedes.js index b11cdd48..d85def5f 100644 --- a/aedes.js +++ b/aedes.js @@ -19,6 +19,7 @@ var defaultOptions = { concurrency: 100, heartbeatInterval: 60000, // 1 minute connectTimeout: 30000, // 30 secs + preConnect: defaultPreConnect, authenticate: defaultAuthenticate, authorizePublish: defaultAuthorizePublish, authorizeSubscribe: defaultAuthorizeSubscribe, @@ -50,6 +51,7 @@ function Aedes (opts) { this._series = series() this._enqueuers = reusify(DoEnqueues) + this.preConnect = opts.preConnect this.authenticate = opts.authenticate this.authorizePublish = opts.authorizePublish this.authorizeSubscribe = opts.authorizeSubscribe @@ -299,6 +301,9 @@ Aedes.prototype.close = function (cb = noop) { Aedes.prototype.version = require('./package.json').version +function defaultPreConnect (client, callback) { + callback(null, true) +} function defaultAuthenticate (client, username, password, callback) { callback(null, true) } diff --git a/lib/client.js b/lib/client.js index c4a73f8e..03cf8336 100644 --- a/lib/client.js +++ b/lib/client.js @@ -48,7 +48,8 @@ function Client (broker, conn) { function nextBatch (err) { if (err) { - return that.emit('error', err) + that.emit('error', err) + return } var buf = empty diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 9470b74b..ead7dd3c 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -6,7 +6,13 @@ var write = require('../write') var QoSPacket = require('../qos-packet') var through = require('through2') var handleSubscribe = require('./subscribe') -var uuid = require('uuid') +var shortid = require('shortid') + +function Connack (arg) { + this.cmd = 'connack' + this.returnCode = arg.returnCode + this.sessionPresent = arg.sessionPresent +} function ClientPacketStatus (client, packet) { this.client = client @@ -15,6 +21,7 @@ function ClientPacketStatus (client, packet) { var connectActions = [ authenticate, + setKeepAlive, fetchSubs, restoreSubs, storeWill, @@ -33,33 +40,57 @@ var errorMessages = [ ] function handleConnect (client, packet, done) { - client.connected = true - client.clean = packet.clean - - if (!packet.clientId && packet.protocolVersion === 3) { - client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1')) - return done() - } - - client.id = packet.clientId || uuid.v4() - client._will = packet.will - clearTimeout(client._connectTimer) client._connectTimer = null - if (packet.keepalive > 0) { - client._keepaliveInterval = (packet.keepalive * 1500) + 1 - client._keepaliveTimer = retimer(function keepaliveTimeout () { - client.broker.emit('keepaliveTimeout', client) - client.emit('error', new Error('keep alive timeout')) - }, client._keepaliveInterval) + client.broker.preConnect(client, negate) + + function negate (err, successful) { + if (!err && successful === true) { + setImmediate(init, client, packet, done) + return + } + if (err) { + client.broker.emit('connectionError', client, err) + } + client.conn.destroy() } +} + +function init (client, packet, done) { + client.connected = true + var clientId = packet.clientId + var returnCode = 0 + // [MQTT-3.1.2-2] + if (packet.protocolVersion < 3 || packet.protocolVersion > 4) { + returnCode = 1 + } + // MQTT 3.1.0 allows <= 23 client id length + if (packet.protocolVersion === 3 && clientId.length > 23) { + returnCode = 2 + } + if (returnCode > 0) { + var error = new Error(errorMessages[returnCode]) + error.errorCode = returnCode + client.broker.emit('clientError', client, error) + doConnack( + { client: client, returnCode: returnCode, sessionPresent: false }, + done.bind(this, error)) + client.conn.end() + return + } + + client.id = clientId || 'aedes_' + shortid() + client.clean = packet.clean + client._will = packet.will client.broker._series( new ClientPacketStatus(client, packet), - connectActions, {}, function (err) { + connectActions, + { returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4] + function (err) { + this.client.broker.emit('clientReady', client) this.client.emit('connected') - client.connackSent = true done(err) }) } @@ -75,53 +106,62 @@ function authenticate (arg, done) { function negate (err, successful) { if (!client.connected) { - // a hack, sometimes close happends before authenticate comes back + // a hack, sometimes close() happened before authenticate() comes back // we stop here for not to register it and deregister it in write() return } - var errCode if (!err && successful) { return done() - } else if (err) { - if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) { - errCode = err.returnCode - write(client, { - cmd: 'connack', - returnCode: err.returnCode - }, client.close.bind(client, done.bind(this, err))) + } + + if (err) { + var errCode = err.returnCode + if (errCode && (errCode >= 1 && errCode <= 3)) { + arg.returnCode = errCode } else { // If errorCode is 4 or not a number - errCode = 4 - write(client, { - cmd: 'connack', - returnCode: 4 - }, client.close.bind(client, done.bind(this, err))) + arg.returnCode = 4 } } else { - errCode = 5 - write(client, { - cmd: 'connack', - returnCode: 5 - }, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode])))) + arg.returnCode = 5 } - var error = new Error(errorMessages[errCode]) - error.errorCode = errCode + var error = new Error(errorMessages[arg.returnCode]) + error.errorCode = arg.returnCode client.broker.emit('clientError', client, error) + arg.client = client + doConnack(arg, + // [MQTT-3.2.2-5] + client.close.bind(client, done.bind(this, error))) } } +function setKeepAlive (arg, done) { + if (this.packet.keepalive > 0) { + var client = this.client + // [MQTT-3.1.2-24] + client._keepaliveInterval = (this.packet.keepalive * 1500) + 1 + client._keepaliveTimer = retimer(function keepaliveTimeout () { + client.broker.emit('keepaliveTimeout', client) + client.emit('error', new Error('keep alive timeout')) + }, client._keepaliveInterval) + } + done() +} + function fetchSubs (arg, done) { + var client = this.client if (!this.packet.clean) { - this.client.broker.persistence.subscriptionsByClient({ - id: this.client.id, + client.broker.persistence.subscriptionsByClient({ + id: client.id, done: done, arg: arg }, gotSubs) - } else { - this.client.broker.persistence.cleanSubscriptions( - this.client, - done) + return } + arg.sessionPresent = false // [MQTT-3.2.2-1] + client.broker.persistence.cleanSubscriptions( + client, + done) } function gotSubs (err, subs, client) { @@ -135,21 +175,24 @@ function gotSubs (err, subs, client) { function restoreSubs (arg, done) { if (arg.subs) { handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done) - } else { - done() + arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2] + return } + arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3] + done() } function storeWill (arg, done) { - this.client.will = this.client._will - if (this.client.will) { - this.client.broker.persistence.putWill( - this.client, - this.client.will, + var client = this.client + client.will = client._will + if (client.will) { + client.broker.persistence.putWill( + client, + client.will, done) - } else { - done() + return } + done() } function registerClient (arg, done) { @@ -158,17 +201,12 @@ function registerClient (arg, done) { done() } -function Connack (arg) { - this.cmd = 'connack' - this.returnCode = 0 - this.sessionPresent = !!arg.subs // cast to boolean -} - function doConnack (arg, done) { - var client = this.client + var client = arg.client || this.client const connack = new Connack(arg) write(client, connack, function () { - client.broker.emit('connackSent', client) + client.broker.emit('connackSent', connack, client) + client.connackSent = true done() }) } @@ -189,7 +227,6 @@ function emptyQueue (arg, done) { function emptyQueueFilter (err, client, packet) { var next = packet.writeCallback - var persistence = client.broker.persistence if (err) { client.emit('error', err) @@ -202,6 +239,8 @@ function emptyQueueFilter (err, client, packet) { authorized = client.broker.authorizeForward(client, packet) } + var persistence = client.broker.persistence + if (client.clean || !authorized) { persistence.outgoingClearMessageId(client, packet, next) } else { diff --git a/lib/handlers/index.js b/lib/handlers/index.js index d4db8345..dcf3b2bd 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -11,12 +11,18 @@ var handlePing = require('./ping') function handle (client, packet, done) { if (packet.cmd === 'connect') { - // [MQTT-3.1.0-2] - return client.connected ? client.conn.destroy() : handleConnect(client, packet, done) + if (client.connected) { + // [MQTT-3.1.0-2] + client.conn.destroy() + return done(new Error('invalid protocol')) + } + handleConnect(client, packet, done) + return } if (!client.connected) { // [MQTT-3.1.0-1] - return client.conn.destroy() + client.conn.destroy() + return done(new Error('invalid protocol')) } switch (packet.cmd) { diff --git a/lib/handlers/pubrec.js b/lib/handlers/pubrec.js index 852b451c..82c2f5ff 100644 --- a/lib/handlers/pubrec.js +++ b/lib/handlers/pubrec.js @@ -21,7 +21,8 @@ function handlePubrec (client, packet, done) { function reply (err) { if (err) { // TODO is this ok? - return client._onError(err) + client._onError(err) + return } write(client, pubrel, done) diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 5a5b8ac9..d6c5458e 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -65,7 +65,8 @@ function completeUnsubscribe (err) { var done = this.finish if (err) { - return client.emit('error', err) + client.emit('error', err) + return } if ((!packet.close || client.clean === true) && packet.unsubscriptions.length > 0) { diff --git a/test/basic.js b/test/basic.js index ef02782b..d77aa019 100644 --- a/test/basic.js +++ b/test/basic.js @@ -9,83 +9,6 @@ var connect = helper.connect var noError = helper.noError var subscribe = helper.subscribe -test('connect and connack (minimal)', function (t) { - t.plan(1) - - var s = setup() - - s.inStream.write({ - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - }) - - s.outStream.on('data', function (packet) { - t.deepEqual(packet, { - cmd: 'connack', - returnCode: 0, - length: 2, - qos: 0, - retain: false, - dup: false, - topic: null, - payload: null, - sessionPresent: false - }, 'successful connack') - t.end() - }) -}) - -test('the first Packet sent from the Client to the Server MUST be a CONNECT Packet [MQTT-3.1.0-1]', function (t) { - t.plan(1) - - var broker = aedes() - var s = setup(broker, false) - - var packet = { - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 0, - retain: false - } - s.inStream.write(packet) - setImmediate(() => { - t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT') - s.conn.destroy() - broker.close() - t.end() - }) -}) - -test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2]', function (t) { - t.plan(3) - - var broker = aedes() - var packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - } - var s = connect(setup(broker, false), { clientId: 'abcde' }, function () { - t.ok(broker.clients['abcde'].connected) - s.inStream.write(packet) - setImmediate(() => { - t.equal(broker.clients['abcde'], undefined, 'client instance is removed') - t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established') - s.conn.destroy() - broker.close() - t.end() - }) - }) -}) - test('publish QoS 0', function (t) { t.plan(2) @@ -331,37 +254,37 @@ test('broker closes', function (t) { t.plan(3) var broker = aedes() - var client = noError(connect(setup(broker, false), { clientId: 'abcde' })) - eos(client.conn, t.pass.bind('client closes')) - - setImmediate(() => { + var client = noError(connect(setup(broker, false), { + clientId: 'abcde' + }, function () { + eos(client.conn, t.pass.bind('client closes')) broker.close(function (err) { t.error(err, 'no error') t.equal(broker.clients['abcde'], undefined, 'client instance is removed') }) - }) + })) }) test('broker closes gracefully', function (t) { t.plan(7) var broker = aedes() - var client1 = noError(connect(setup(broker, false))) - var client2 = noError(connect(setup(broker, false))) - setImmediate(() => { - t.equal(broker.connectedClients, 2, '2 connected clients') - eos(client1.conn, t.pass.bind('client1 closes')) - eos(client2.conn, t.pass.bind('client2 closes')) - - setImmediate(() => { + var client1, client2 + client1 = noError(connect(setup(broker, false), { + }, function () { + client2 = noError(connect(setup(broker, false), { + }, function () { + t.equal(broker.connectedClients, 2, '2 connected clients') + eos(client1.conn, t.pass.bind('client1 closes')) + eos(client2.conn, t.pass.bind('client2 closes')) broker.close(function (err) { t.error(err, 'no error') t.ok(broker.mq.closed, 'broker mq closes') t.ok(broker.closed, 'broker closes') t.equal(broker.connectedClients, 0, 'no connected clients') }) - }) - }) + })) + })) }) test('testing other event', function (t) { @@ -473,8 +396,6 @@ test('restore QoS 0 subscriptions not clean', function (t) { t.plan(5) var broker = aedes() - var publisher - var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) var expected = { cmd: 'publish', topic: 'hello', @@ -484,26 +405,30 @@ test('restore QoS 0 subscriptions not clean', function (t) { length: 12, retain: false } + var publisher + var subscriber = connect(setup(broker), { + clean: false, clientId: 'abcde' + }, function () { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.inStream.end() - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.inStream.end() - - publisher = connect(setup(broker)) - - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, true, 'session present is set to true') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0 + publisher = connect(setup(broker), { + }, function () { + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0 + }) + }) + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) }) }) - - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - t.end() - }) }) }) @@ -512,31 +437,36 @@ test('do not restore QoS 0 subscriptions when clean', function (t) { var broker = aedes() var publisher - var subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }) - - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.inStream.end() - t.equal(subscriber.broker.persistence._subscriptions.size, 0, 'no previous subscriptions restored') - - publisher = connect(setup(broker)) - - subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, false, 'session present is set to false') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0 + var subscriber = connect(setup(broker), { + clean: true, clientId: 'abcde' + }, function () { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.inStream.end() + subscriber.broker.persistence.subscriptionsByClient(broker.clients['abcde'], function (_, subs, client) { + t.equal(subs, null, 'no previous subscriptions restored') + }) + publisher = connect(setup(broker), { + }, function () { + subscriber = connect(setup(broker), { + clean: true, clientId: 'abcde' + }, function (connect) { + t.equal(connect.sessionPresent, false, 'session present is set to false') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0 + }) + }) + subscriber.outStream.once('data', function (packet) { + t.fail('packet received') + t.end() + }) + eos(subscriber.conn, function () { + t.equal(subscriber.broker.connectedClients, 0, 'no connected clients') + t.end() + }) }) - }) - - subscriber.outStream.once('data', function (packet) { - t.fail('packet received') - t.end() - }) - eos(subscriber.conn, function () { - t.equal(subscriber.broker.connectedClients, 0, 'no connected clients') - t.end() }) }) }) @@ -544,7 +474,6 @@ test('do not restore QoS 0 subscriptions when clean', function (t) { test('double sub does not double deliver', function (t) { t.plan(7) - var s = connect(setup()) var expected = { cmd: 'publish', topic: 'hello', @@ -554,22 +483,24 @@ test('double sub does not double deliver', function (t) { qos: 0, retain: false } - - subscribe(t, s, 'hello', 0, function () { + var s = connect(setup(), { + }, function () { subscribe(t, s, 'hello', 0, function () { - s.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet matches') - s.outStream.on('data', function () { - t.fail('double deliver') + subscribe(t, s, 'hello', 0, function () { + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + s.outStream.on('data', function () { + t.fail('double deliver') + }) + // wait for a tick, so it will double deliver + setImmediate(t.end.bind(t)) }) - // wait for a tick, so it will double deliver - setImmediate(t.end.bind(t)) - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) }) @@ -578,7 +509,6 @@ test('double sub does not double deliver', function (t) { test('overlapping sub does not double deliver', function (t) { t.plan(7) - var s = connect(setup()) var expected = { cmd: 'publish', topic: 'hello', @@ -588,22 +518,24 @@ test('overlapping sub does not double deliver', function (t) { qos: 0, retain: false } - - subscribe(t, s, 'hello', 0, function () { - subscribe(t, s, 'hello/#', 0, function () { - s.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet matches') - s.outStream.on('data', function () { - t.fail('double deliver') + var s = connect(setup(), { + }, function () { + subscribe(t, s, 'hello', 0, function () { + subscribe(t, s, 'hello/#', 0, function () { + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + s.outStream.on('data', function () { + t.fail('double deliver') + }) + // wait for a tick, so it will double deliver + setImmediate(t.end.bind(t)) }) - // wait for a tick, so it will double deliver - setImmediate(t.end.bind(t)) - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) }) @@ -612,23 +544,24 @@ test('overlapping sub does not double deliver', function (t) { test('clear drain', function (t) { t.plan(4) - var s = connect(setup()) + var s = connect(setup(), { + }, function () { + subscribe(t, s, 'hello', 0, function () { + // fake a busy socket + s.conn.write = function (chunk, enc, cb) { + return false + } - subscribe(t, s, 'hello', 0, function () { - // fake a busy socket - s.conn.write = function (chunk, enc, cb) { - return false - } + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }, function () { + t.pass('callback called') + }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' - }, function () { - t.pass('callback called') + s.conn.destroy() }) - - s.conn.destroy() }) }) diff --git a/test/connect.js b/test/connect.js new file mode 100644 index 00000000..cdffc03a --- /dev/null +++ b/test/connect.js @@ -0,0 +1,363 @@ +'use strict' + +var test = require('tape').test +var helper = require('./helper') +var aedes = require('../') +var setup = helper.setup +var connect = helper.connect + +;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) { + test('connect and connack (minimal)', function (t) { + t.plan(1) + + var s = setup() + + s.inStream.write({ + cmd: 'connect', + protocolId: ele.id, + protocolVersion: ele.ver, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + + s.outStream.on('data', function (packet) { + t.deepEqual(packet, { + cmd: 'connack', + returnCode: 0, + length: 2, + qos: 0, + retain: false, + dup: false, + topic: null, + payload: null, + sessionPresent: false + }, 'successful connack') + t.end() + }) + }) +}) + +// [MQTT-3.1.2-2] +test('reject client requested for unacceptable protocol version', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 5, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 1, 'unacceptable protocol version') + t.equal(broker.connectedClients, 0) + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'unacceptable protocol version') + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.2-1], Guarded in mqtt-packet +test('reject client requested for unsupported protocol version', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 2, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'Invalid protocol version') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +// Guarded in mqtt-packet +test('reject clients with no clientId running on MQTT 3.1.0', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'clientId must be supplied before 3.1.1') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.3-7], Guarded in mqtt-packet +test('reject clients without clientid and clean=false on MQTT 3.1.1', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: false, + clientId: '', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'clientId must be given if cleanSession set to 0') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('clients without clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('client', function (client) { + t.ok(client.id.startsWith('aedes_')) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('clients with zero-byte clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: '', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('client', function (client) { + t.ok(client.id.startsWith('aedes_')) + }) + broker.on('closed', function () { + t.end() + }) +}) + +// [MQTT-3.1.3-7] +test('reject clients with > 23 clientId length in MQTT 3.1.0', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 2, 'identifier rejected') + t.equal(broker.connectedClients, 0) + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'identifier rejected') + }) + broker.on('closed', t.end.bind(t)) +}) + +test('connect with > 23 clientId length in MQTT 3.1.1', function (t) { + t.plan(3) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.0-1] +test('the first Packet MUST be a CONNECT Packet', function (t) { + t.plan(1) + + var broker = aedes() + var s = setup(broker, false) + + var packet = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + s.inStream.write(packet) + setImmediate(() => { + t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT') + s.conn.destroy() + broker.close() + t.end() + }) +}) + +// [MQTT-3.1.0-2] +test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client ', function (t) { + t.plan(3) + + var broker = aedes() + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + } + var s = connect(setup(broker, false), { clientId: 'abcde' }, function () { + t.ok(broker.clients['abcde'].connected) + s.inStream.write(packet) + setImmediate(() => { + t.equal(broker.clients['abcde'], undefined, 'client instance is removed') + t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established') + broker.close() + t.end() + }) + }) +}) + +// [MQTT-3.1.2-1], Guarded in mqtt-packet +test('reject clients with wrong protocol name', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT_hello', + protocolVersion: 3, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'Invalid protocolId') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) { + var plan = ele[0] + var err = ele[1] + var ok = ele[2] + test('preConnect handler', function (t) { + t.plan(plan) + + var broker = aedes({ + preConnect: function (client, done) { + return done(err, ok) + } + }) + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + broker.on('client', function (client) { + if (ok) { + t.pass('connect ok') + } else { + t.fail('no reach here') + } + }) + broker.on('clientError', function (client, err) { + t.fail('no client error') + }) + broker.on('connectionError', function (client, err) { + if (err) { + t.equal(err.message, 'connection banned') + } else { + t.fail('no connection error') + } + }) + broker.on('closed', t.end.bind(t)) + }) +}) diff --git a/test/handlers/connect.js b/test/handlers/connect.js deleted file mode 100644 index 502ee9c7..00000000 --- a/test/handlers/connect.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict' - -var test = require('tape').test -var EE = require('events').EventEmitter -var handle = require('../../lib/handlers/index') - -test('reject clients with no clientId running on MQTT 3.1', function (t) { - t.plan(2) - - var client = new EE() - var broker = { - registerClient: function () {} - } - - client.broker = broker - client.conn = { - destroy: function () { - t.fail('should not destroy') - } - } - - client.on('error', function (err) { - t.equal(err.message, 'Empty clientIds are supported only on MQTT 3.1.1', 'error message') - }) - - handle(client, { - cmd: 'connect', - protocolVersion: 3, - protocolId: 'MQIsdp' - }, function (err) { - t.error(err, 'no error in callback') - }) -}) diff --git a/test/helper.js b/test/helper.js index d83b2e5e..262dc045 100644 --- a/test/helper.js +++ b/test/helper.js @@ -38,8 +38,8 @@ function connect (s, opts, connected) { opts = opts || {} opts.cmd = 'connect' - opts.protocolId = 'MQTT' - opts.version = 4 + opts.protocolId = opts.protocolId || 'MQTT' + opts.protocolVersion = opts.protocolVersion || 4 opts.clean = !!opts.clean opts.clientId = opts.clientId || 'my-client-' + clients++ opts.keepalive = opts.keepalive || 0 diff --git a/test/meta.js b/test/meta.js index 29cb0102..d847c253 100644 --- a/test/meta.js +++ b/test/meta.js @@ -14,14 +14,12 @@ test('count connected clients', function (t) { t.equal(broker.connectedClients, 0, 'no connected clients') - connect(setup(broker)) - - process.nextTick(function () { + connect(setup(broker), { + }, function () { t.equal(broker.connectedClients, 1, 'one connected clients') - var last = connect(setup(broker)) - - process.nextTick(function () { + var last = connect(setup(broker), { + }, function () { t.equal(broker.connectedClients, 2, 'two connected clients') last.conn.destroy() @@ -238,13 +236,14 @@ test('get aedes version', function (t) { }) test('connect and connackSent event', function (t) { + t.plan(3) + t.timeoutAfter(50) + var s = setup() var clientId = 'my-client' - t.plan(2) - t.timeoutAfter(50) - - s.broker.on('connackSent', function (client) { + s.broker.on('connackSent', function (packet, client) { + t.equal(packet.returnCode, 0) t.equal(client.id, clientId, 'connackSent event and clientId matches') t.end() }) diff --git a/test/will.js b/test/will.js index 7c3b2fb7..ca9c3584 100644 --- a/test/will.js +++ b/test/will.js @@ -24,7 +24,11 @@ test('delivers a will', function (t) { var opts = {} // willConnect populates opts with a will - var s = willConnect(setup(), opts) + var s = willConnect(setup(), + opts, + function () { + s.conn.destroy() + }) s.broker.mq.on('mywill', function (packet, cb) { t.equal(packet.topic, opts.will.topic, 'topic matches') @@ -34,10 +38,6 @@ test('delivers a will', function (t) { cb() t.end() }) - - process.nextTick(() => { - s.conn.destroy() - }) }) test('calling close two times should not deliver two wills', function (t) { @@ -179,9 +179,13 @@ test('delivers a will with authorization', function (t) { let authorized = false var opts = {} - var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } }) // willConnect populates opts with a will - var s = willConnect(setup(broker), opts) + var s = willConnect( + setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } })), + opts, + function () { + s.conn.destroy() + }) s.broker.on('clientDisconnect', function (client) { t.equal(client.connected, false) @@ -197,10 +201,7 @@ test('delivers a will with authorization', function (t) { cb() }) - process.nextTick(function () { - s.conn.destroy() - }) - broker.on('closed', t.end.bind(t)) + s.broker.on('closed', t.end.bind(t)) }) test('delivers a will waits for authorization', function (t) { @@ -208,9 +209,13 @@ test('delivers a will waits for authorization', function (t) { let authorized = false var opts = {} - var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; setImmediate(() => { callback(null) }) } }) // willConnect populates opts with a will - var s = willConnect(setup(broker), opts) + var s = willConnect( + setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; setImmediate(() => { callback(null) }) } })), + opts, + function () { + s.conn.destroy() + }) s.broker.on('clientDisconnect', function () { t.pass('client is disconnected') @@ -225,10 +230,7 @@ test('delivers a will waits for authorization', function (t) { cb() }) - process.nextTick(function () { - s.conn.destroy() - }) - broker.on('closed', t.end.bind(t)) + s.broker.on('closed', t.end.bind(t)) }) test('does not deliver a will without authorization', function (t) { @@ -237,7 +239,12 @@ test('does not deliver a will without authorization', function (t) { let authorized = false var opts = {} // willConnect populates opts with a will - var s = willConnect(setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(new Error()) } })), opts) + var s = willConnect( + setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(new Error()) } })), + opts, + function () { + s.conn.destroy() + }) s.broker.on('clientDisconnect', function () { t.equal(authorized, true, 'authorization called') @@ -248,10 +255,6 @@ test('does not deliver a will without authorization', function (t) { t.fail('received will without authorization') cb() }) - - process.nextTick(function () { - s.conn.destroy() - }) }) test('does not deliver a will without authentication', function (t) { @@ -260,7 +263,9 @@ test('does not deliver a will without authentication', function (t) { let authenticated = false var opts = {} // willConnect populates opts with a will - var s = willConnect(setup(aedes({ authenticate: (_1, _2, _3, callback) => { authenticated = true; callback(new Error(), false) } })), opts) + var s = willConnect( + setup(aedes({ authenticate: (_1, _2, _3, callback) => { authenticated = true; callback(new Error(), false) } })), + opts) s.broker.once('clientError', function () { t.equal(authenticated, true, 'authentication called') diff --git a/types/index.d.ts b/types/index.d.ts index 538993bb..425aae6a 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -34,6 +34,8 @@ declare namespace aedes { close (callback?: () => void): void } + export type PreConnectCallback = (client: Client) => boolean + export type AuthenticateError = Error & { returnCode: AuthErrorCode } export type AuthenticateCallback = ( @@ -57,6 +59,7 @@ declare namespace aedes { concurrency?: number heartbeatInterval?: number connectTimeout?: number + preConnect?: PreConnectCallback authenticate?: AuthenticateCallback authorizePublish?: AuthorizePublishCallback authorizeSubscribe?: AuthorizeSubscribeCallback @@ -67,6 +70,7 @@ declare namespace aedes { export interface Aedes extends EventEmitter { handle: (stream: Duplex) => void + preConnect: PreConnectCallback authenticate: AuthenticateCallback authorizePublish: AuthorizePublishCallback authorizeSubscribe: AuthorizeSubscribeCallback @@ -74,7 +78,7 @@ declare namespace aedes { published: PublishedCallback on (event: 'closed', cb: () => void): this - on (event: 'client' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this + on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this on (event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this on (event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this on (event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this