From 24e4737ea9def32429936143ce1b61db99e5bae2 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Tue, 6 Aug 2019 22:58:32 +0800 Subject: [PATCH 01/11] Set keepalive after auth, save mem if auth fails --- lib/handlers/connect.js | 22 ++++++++++++++-------- 1 file changed, 14 insertions(+), 8 deletions(-) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 9470b74b..3e6ce12e 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -15,6 +15,7 @@ function ClientPacketStatus (client, packet) { var connectActions = [ authenticate, + setKeepAlive, fetchSubs, restoreSubs, storeWill, @@ -47,14 +48,6 @@ function handleConnect (client, packet, done) { clearTimeout(client._connectTimer) client._connectTimer = null - if (packet.keepalive > 0) { - client._keepaliveInterval = (packet.keepalive * 1500) + 1 - client._keepaliveTimer = retimer(function keepaliveTimeout () { - client.broker.emit('keepaliveTimeout', client) - client.emit('error', new Error('keep alive timeout')) - }, client._keepaliveInterval) - } - client.broker._series( new ClientPacketStatus(client, packet), connectActions, {}, function (err) { @@ -110,6 +103,19 @@ function authenticate (arg, done) { } } +function setKeepAlive (arg, done) { + if (this.packet.keepalive > 0) { + var client = this.client + // [MQTT-3.1.2-24] + client._keepaliveInterval = (this.packet.keepalive * 1500) + 1 + client._keepaliveTimer = retimer(function keepaliveTimeout () { + client.broker.emit('keepaliveTimeout', client) + client.emit('error', new Error('keep alive timeout')) + }, client._keepaliveInterval) + } + done() +} + function fetchSubs (arg, done) { if (!this.packet.clean) { this.client.broker.persistence.subscriptionsByClient({ From 19ac5fe6f3365fb6ad56c3b84875d600a8c67846 Mon Sep 17 00:00:00 2001 From: Gnought <1684105+gnought@users.noreply.github.com> Date: Tue, 6 Aug 2019 22:58:32 +0800 Subject: [PATCH 02/11] Enhance & Optimize connect handler - For https://github.com/mcollina/aedes/pull/260/commits/06fc8392a1ffea822d003ff6f4c01d100b0e616d, raise an "invalid protocol" callback error if there is - aedes unsupported mqtt version: CONNACK return Code = 1 - client id length > 23 [MQTT 3.1.0 only]: CONNACK return Code = 2 - Added preConnect handler in handleConnect between earliest connect checks and normal checks. This is useful for users if they want to do some earilest DDoS check before server send any responses back, in this phrase connected=false - Emit `connackSent` event and set connactSent=true if CONNACK is sent, not only after `client` event but also when normal connect checks phrase if necessary - Added packet arguments in `connackSent` event - set clientID to 'aedes_' + shortid() if empty [MQTT 3.1.1], it is better to keep it within 23 chars for better compatibility - Emit `clientReady` event after we send back all offline messages to client - Optimize negate function - Optimize doConnack function and we could re-use it - Set keepalive after authentication, save some resoures if there are plenty of failed authentication --- README.md | 9 +- aedes.js | 5 + lib/handlers/connect.js | 94 ++++++---- lib/handlers/index.js | 11 +- test/basic.js | 77 -------- test/connect.js | 379 +++++++++++++++++++++++++++++++++++++++ test/handlers/connect.js | 33 ---- test/helper.js | 4 +- test/meta.js | 9 +- types/index.d.ts | 6 +- 10 files changed, 466 insertions(+), 161 deletions(-) create mode 100644 test/connect.js delete mode 100644 test/handlers/connect.js diff --git a/README.md b/README.md index cb4598fe..fe57ca14 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,9 @@ Options: Events: -* `client`: when a new [Client](#client) connects, arguments: +* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments: + 1. `client` +* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments: 1. `client` * `clientDisconnect`: when a [Client](#client) disconnects, arguments: 1. `client` @@ -152,8 +154,9 @@ packet. [UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe) packet. 2. `client` -* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments: - 1. `client` +* `connackSent`: when a CONNACK packet is sent to a client, arguments: + 1. `packet` + 2. `client` * `closed`: when the broker is closed ------------------------------------------------------- diff --git a/aedes.js b/aedes.js index b11cdd48..f7ed7f5f 100644 --- a/aedes.js +++ b/aedes.js @@ -19,6 +19,7 @@ var defaultOptions = { concurrency: 100, heartbeatInterval: 60000, // 1 minute connectTimeout: 30000, // 30 secs + preConnect: defaultPreConnect, authenticate: defaultAuthenticate, authorizePublish: defaultAuthorizePublish, authorizeSubscribe: defaultAuthorizeSubscribe, @@ -50,6 +51,7 @@ function Aedes (opts) { this._series = series() this._enqueuers = reusify(DoEnqueues) + this.preConnect = opts.preConnect this.authenticate = opts.authenticate this.authorizePublish = opts.authorizePublish this.authorizeSubscribe = opts.authorizeSubscribe @@ -299,6 +301,9 @@ Aedes.prototype.close = function (cb = noop) { Aedes.prototype.version = require('./package.json').version +function defaultPreConnect (client) { + return true +} function defaultAuthenticate (client, username, password, callback) { callback(null, true) } diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 3e6ce12e..ae685f15 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -6,7 +6,13 @@ var write = require('../write') var QoSPacket = require('../qos-packet') var through = require('through2') var handleSubscribe = require('./subscribe') -var uuid = require('uuid') +var shortid = require('shortid') + +function Connack (arg) { + this.cmd = 'connack' + this.returnCode = arg.returnCode + this.sessionPresent = arg.sessionPresent +} function ClientPacketStatus (client, packet) { this.client = client @@ -34,15 +40,33 @@ var errorMessages = [ ] function handleConnect (client, packet, done) { + if (client.broker.preConnect(client) === false) { + return client.conn.destroy() + } client.connected = true - client.clean = packet.clean - - if (!packet.clientId && packet.protocolVersion === 3) { - client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1')) - return done() + var clientId = packet.clientId + var returnCode = 0 + // [MQTT-3.1.2-2] + if (packet.protocolVersion < 3 || packet.protocolVersion > 4) { + returnCode = 1 + } + // MQTT 3.1.0 allows <= 23 client id length + if (packet.protocolVersion === 3 && clientId.length > 23) { + returnCode = 2 + } + // console.log(returnCode) + if (returnCode > 0) { + var error = new Error(errorMessages[returnCode]) + error.errorCode = returnCode + client.broker.emit('clientError', client, error) + doConnack( + { client: client, returnCode: returnCode, sessionPresent: false }, + done.bind(this, error)) + return client.conn.end() } - client.id = packet.clientId || uuid.v4() + client.id = clientId || 'aedes_' + shortid() + client.clean = packet.clean client._will = packet.will clearTimeout(client._connectTimer) @@ -50,9 +74,11 @@ function handleConnect (client, packet, done) { client.broker._series( new ClientPacketStatus(client, packet), - connectActions, {}, function (err) { + connectActions, + { returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4] + function (err) { + this.client.broker.emit('clientReady', client) this.client.emit('connected') - client.connackSent = true done(err) }) } @@ -68,38 +94,32 @@ function authenticate (arg, done) { function negate (err, successful) { if (!client.connected) { - // a hack, sometimes close happends before authenticate comes back + // a hack, sometimes close() happened before authenticate() comes back // we stop here for not to register it and deregister it in write() return } - var errCode if (!err && successful) { return done() - } else if (err) { - if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) { - errCode = err.returnCode - write(client, { - cmd: 'connack', - returnCode: err.returnCode - }, client.close.bind(client, done.bind(this, err))) + } + + if (err) { + var errCode = err.returnCode + if (errCode && (errCode >= 1 && errCode <= 3)) { + arg.returnCode = errCode } else { // If errorCode is 4 or not a number - errCode = 4 - write(client, { - cmd: 'connack', - returnCode: 4 - }, client.close.bind(client, done.bind(this, err))) + arg.returnCode = 4 } } else { - errCode = 5 - write(client, { - cmd: 'connack', - returnCode: 5 - }, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode])))) + arg.returnCode = 5 } - var error = new Error(errorMessages[errCode]) - error.errorCode = errCode + var error = new Error(errorMessages[arg.returnCode]) + error.errorCode = arg.returnCode client.broker.emit('clientError', client, error) + arg.client = client + doConnack(arg, + // [MQTT-3.2.2-5] + client.close.bind(client, done.bind(this, error))) } } @@ -124,6 +144,7 @@ function fetchSubs (arg, done) { arg: arg }, gotSubs) } else { + arg.sessionPresent = false // [MQTT-3.2.2-1] this.client.broker.persistence.cleanSubscriptions( this.client, done) @@ -141,7 +162,9 @@ function gotSubs (err, subs, client) { function restoreSubs (arg, done) { if (arg.subs) { handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done) + arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2] } else { + arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3] done() } } @@ -164,17 +187,12 @@ function registerClient (arg, done) { done() } -function Connack (arg) { - this.cmd = 'connack' - this.returnCode = 0 - this.sessionPresent = !!arg.subs // cast to boolean -} - function doConnack (arg, done) { - var client = this.client + var client = arg.client || this.client const connack = new Connack(arg) write(client, connack, function () { - client.broker.emit('connackSent', client) + client.broker.emit('connackSent', connack, client) + client.connackSent = true done() }) } diff --git a/lib/handlers/index.js b/lib/handlers/index.js index d4db8345..00d5cd21 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -11,12 +11,17 @@ var handlePing = require('./ping') function handle (client, packet, done) { if (packet.cmd === 'connect') { - // [MQTT-3.1.0-2] - return client.connected ? client.conn.destroy() : handleConnect(client, packet, done) + if (client.connected) { + // [MQTT-3.1.0-2] + client.conn.destroy() + return done(new Error('invalid protocol')) + } + return handleConnect(client, packet, done) } if (!client.connected) { // [MQTT-3.1.0-1] - return client.conn.destroy() + client.conn.destroy() + return done(new Error('invalid protocol')) } switch (packet.cmd) { diff --git a/test/basic.js b/test/basic.js index ef02782b..0e1f0547 100644 --- a/test/basic.js +++ b/test/basic.js @@ -9,83 +9,6 @@ var connect = helper.connect var noError = helper.noError var subscribe = helper.subscribe -test('connect and connack (minimal)', function (t) { - t.plan(1) - - var s = setup() - - s.inStream.write({ - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - }) - - s.outStream.on('data', function (packet) { - t.deepEqual(packet, { - cmd: 'connack', - returnCode: 0, - length: 2, - qos: 0, - retain: false, - dup: false, - topic: null, - payload: null, - sessionPresent: false - }, 'successful connack') - t.end() - }) -}) - -test('the first Packet sent from the Client to the Server MUST be a CONNECT Packet [MQTT-3.1.0-1]', function (t) { - t.plan(1) - - var broker = aedes() - var s = setup(broker, false) - - var packet = { - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 0, - retain: false - } - s.inStream.write(packet) - setImmediate(() => { - t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT') - s.conn.destroy() - broker.close() - t.end() - }) -}) - -test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2]', function (t) { - t.plan(3) - - var broker = aedes() - var packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - } - var s = connect(setup(broker, false), { clientId: 'abcde' }, function () { - t.ok(broker.clients['abcde'].connected) - s.inStream.write(packet) - setImmediate(() => { - t.equal(broker.clients['abcde'], undefined, 'client instance is removed') - t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established') - s.conn.destroy() - broker.close() - t.end() - }) - }) -}) - test('publish QoS 0', function (t) { t.plan(2) diff --git a/test/connect.js b/test/connect.js new file mode 100644 index 00000000..9eac074d --- /dev/null +++ b/test/connect.js @@ -0,0 +1,379 @@ +'use strict' + +var test = require('tape').test +var helper = require('./helper') +var aedes = require('../') +var setup = helper.setup +var connect = helper.connect + +;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) { + test('connect and connack (minimal)', function (t) { + t.plan(1) + + var s = setup() + + s.inStream.write({ + cmd: 'connect', + protocolId: ele.id, + protocolVersion: ele.ver, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + + s.outStream.on('data', function (packet) { + t.deepEqual(packet, { + cmd: 'connack', + returnCode: 0, + length: 2, + qos: 0, + retain: false, + dup: false, + topic: null, + payload: null, + sessionPresent: false + }, 'successful connack') + t.end() + }) + }) +}) + +// [MQTT-3.1.2-2] +test('reject client requested for unacceptable protocol version', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 5, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 1, 'unacceptable protocol version') + t.equal(broker.connectedClients, 0) + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'unacceptable protocol version') + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.2-1], Guarded in mqtt-packet +test('reject client requested for unsupported protocol version', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 2, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'Invalid protocol version') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +// Guarded in mqtt-packet +test('reject clients with no clientId running on MQTT 3.1.0', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'clientId must be supplied before 3.1.1') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.3-7], Guarded in mqtt-packet +test('reject clients without clientid and clean=false on MQTT 3.1.1', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: false, + clientId: '', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'clientId must be given if cleanSession set to 0') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('clients without clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('client', function (client) { + t.ok(client.id.startsWith('aedes_')) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('clients with zero-byte clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: '', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('client', function (client) { + t.ok(client.id.startsWith('aedes_')) + }) + broker.on('closed', function () { + t.end() + }) +}) + +// [MQTT-3.1.3-7] +test('reject clients with > 23 clientId length in MQTT 3.1.0', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 2, 'identifier rejected') + t.equal(broker.connectedClients, 0) + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'identifier rejected') + }) + broker.on('closed', t.end.bind(t)) +}) + +test('connect with > 23 clientId length in MQTT 3.1.1', function (t) { + t.plan(3) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.0-1] +test('the first Packet MUST be a CONNECT Packet', function (t) { + t.plan(1) + + var broker = aedes() + var s = setup(broker, false) + + var packet = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + s.inStream.write(packet) + setImmediate(() => { + t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT') + s.conn.destroy() + broker.close() + t.end() + }) +}) + +// [MQTT-3.1.0-2] +test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client ', function (t) { + t.plan(3) + + var broker = aedes() + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + } + var s = connect(setup(broker, false), { clientId: 'abcde' }, function () { + t.ok(broker.clients['abcde'].connected) + s.inStream.write(packet) + setImmediate(() => { + t.equal(broker.clients['abcde'], undefined, 'client instance is removed') + t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established') + broker.close() + t.end() + }) + }) +}) + +// [MQTT-3.1.2-1], Guarded in mqtt-packet +test('reject clients with wrong protocol name', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT_hello', + protocolVersion: 3, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'Invalid protocolId') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('preConnect handler', function (t) { + t.plan(0) + + var broker = aedes({ + preConnect: function (client, done) { + return false + } + }) + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + broker.on('client', function (client) { + t.fail('no reach here') + }) + broker.on('clientError', function (client, err) { + t.fail('no reach here') + }) + broker.on('connectionError', function (client, err) { + t.fail('no reach here') + }) + broker.on('closed', t.end.bind(t)) +}) + +/* +- We do some checks in earliest connect phrase, and destory connections if there are invalid checks. Designed for WAF. +* checking of zero-byte client id [MQTT 3.1.0 only] +* improper MQTT version and protocol id mapping in CONNECT +* includes https://github.com/mcollina/aedes/pull/260/commits/06fc8392a1ffea822d003ff6f4c01d100b0e616d +* raised an "invalid protocol" callback error if there is + +- Normal connect checks with CONNACK responses, and raised clientError +* Unsupported aedes supported mqtt version [MQTT-3.1.2-2]: CONNACK return Code = 1 +* Empty clientid but clean=false [MQTT-3.1.3-7]: CONNACK return Code = 2 +* client id length > 23 [MQTT 3.1.0 only]: CONNACK return Code = 2 + +- Emit `connackSent` event and set connactSent=true if CONNACK is sent, not only after `client` event but also when normal connect checks phrase if necessary + +- Added packet arguments in `connackSent` event + +- Added preConnect handler in handleConnect between earliest connect checks and normal checks. This is useful for users if they want to do some earilest DDoS check before server send any responses back, in this phrase connected=false + +- set clientID to 'aedes_' + shortid() if empty [MQTT 3.1.1], it is better to keep it within 23 chars for better compatibility + +- Emit `clientReady` event after we send back all offline messages to client + +- Optimize negate function + +- Optimize doConnack function and we could re-use it + +- Set keepalive after authentication, save some resoures if there are plenty of failed authentication +*/ diff --git a/test/handlers/connect.js b/test/handlers/connect.js deleted file mode 100644 index 502ee9c7..00000000 --- a/test/handlers/connect.js +++ /dev/null @@ -1,33 +0,0 @@ -'use strict' - -var test = require('tape').test -var EE = require('events').EventEmitter -var handle = require('../../lib/handlers/index') - -test('reject clients with no clientId running on MQTT 3.1', function (t) { - t.plan(2) - - var client = new EE() - var broker = { - registerClient: function () {} - } - - client.broker = broker - client.conn = { - destroy: function () { - t.fail('should not destroy') - } - } - - client.on('error', function (err) { - t.equal(err.message, 'Empty clientIds are supported only on MQTT 3.1.1', 'error message') - }) - - handle(client, { - cmd: 'connect', - protocolVersion: 3, - protocolId: 'MQIsdp' - }, function (err) { - t.error(err, 'no error in callback') - }) -}) diff --git a/test/helper.js b/test/helper.js index d83b2e5e..262dc045 100644 --- a/test/helper.js +++ b/test/helper.js @@ -38,8 +38,8 @@ function connect (s, opts, connected) { opts = opts || {} opts.cmd = 'connect' - opts.protocolId = 'MQTT' - opts.version = 4 + opts.protocolId = opts.protocolId || 'MQTT' + opts.protocolVersion = opts.protocolVersion || 4 opts.clean = !!opts.clean opts.clientId = opts.clientId || 'my-client-' + clients++ opts.keepalive = opts.keepalive || 0 diff --git a/test/meta.js b/test/meta.js index 29cb0102..a8dadb31 100644 --- a/test/meta.js +++ b/test/meta.js @@ -238,13 +238,14 @@ test('get aedes version', function (t) { }) test('connect and connackSent event', function (t) { + t.plan(3) + t.timeoutAfter(50) + var s = setup() var clientId = 'my-client' - t.plan(2) - t.timeoutAfter(50) - - s.broker.on('connackSent', function (client) { + s.broker.on('connackSent', function (packet, client) { + t.equal(packet.returnCode, 0) t.equal(client.id, clientId, 'connackSent event and clientId matches') t.end() }) diff --git a/types/index.d.ts b/types/index.d.ts index 538993bb..425aae6a 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -34,6 +34,8 @@ declare namespace aedes { close (callback?: () => void): void } + export type PreConnectCallback = (client: Client) => boolean + export type AuthenticateError = Error & { returnCode: AuthErrorCode } export type AuthenticateCallback = ( @@ -57,6 +59,7 @@ declare namespace aedes { concurrency?: number heartbeatInterval?: number connectTimeout?: number + preConnect?: PreConnectCallback authenticate?: AuthenticateCallback authorizePublish?: AuthorizePublishCallback authorizeSubscribe?: AuthorizeSubscribeCallback @@ -67,6 +70,7 @@ declare namespace aedes { export interface Aedes extends EventEmitter { handle: (stream: Duplex) => void + preConnect: PreConnectCallback authenticate: AuthenticateCallback authorizePublish: AuthorizePublishCallback authorizeSubscribe: AuthorizeSubscribeCallback @@ -74,7 +78,7 @@ declare namespace aedes { published: PublishedCallback on (event: 'closed', cb: () => void): this - on (event: 'client' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this + on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this on (event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this on (event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this on (event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this From 3f06de4a491cd8b60817c4d6b7f021a7747663f1 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Sat, 10 Aug 2019 19:48:41 +0800 Subject: [PATCH 03/11] Clear connect timer earilest, performance-wise --- lib/handlers/connect.js | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index ae685f15..8742f34f 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -40,6 +40,9 @@ var errorMessages = [ ] function handleConnect (client, packet, done) { + clearTimeout(client._connectTimer) + client._connectTimer = null + if (client.broker.preConnect(client) === false) { return client.conn.destroy() } @@ -69,9 +72,6 @@ function handleConnect (client, packet, done) { client.clean = packet.clean client._will = packet.will - clearTimeout(client._connectTimer) - client._connectTimer = null - client.broker._series( new ClientPacketStatus(client, packet), connectActions, From ea41f12d40ed6e637e15cc23cd56b4fc96b3769d Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Sun, 11 Aug 2019 17:40:04 +0800 Subject: [PATCH 04/11] Refactored --- lib/handlers/connect.js | 38 +++++++++++++++++++------------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 8742f34f..cd5e35cd 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -57,7 +57,6 @@ function handleConnect (client, packet, done) { if (packet.protocolVersion === 3 && clientId.length > 23) { returnCode = 2 } - // console.log(returnCode) if (returnCode > 0) { var error = new Error(errorMessages[returnCode]) error.errorCode = returnCode @@ -137,18 +136,18 @@ function setKeepAlive (arg, done) { } function fetchSubs (arg, done) { + var client = this.client if (!this.packet.clean) { - this.client.broker.persistence.subscriptionsByClient({ - id: this.client.id, + return client.broker.persistence.subscriptionsByClient({ + id: client.id, done: done, arg: arg }, gotSubs) - } else { - arg.sessionPresent = false // [MQTT-3.2.2-1] - this.client.broker.persistence.cleanSubscriptions( - this.client, - done) } + arg.sessionPresent = false // [MQTT-3.2.2-1] + client.broker.persistence.cleanSubscriptions( + client, + done) } function gotSubs (err, subs, client) { @@ -163,22 +162,22 @@ function restoreSubs (arg, done) { if (arg.subs) { handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done) arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2] - } else { - arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3] - done() + return } + arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3] + done() } function storeWill (arg, done) { - this.client.will = this.client._will - if (this.client.will) { - this.client.broker.persistence.putWill( - this.client, - this.client.will, + var client = this.client + client.will = client._will + if (client.will) { + return client.broker.persistence.putWill( + client, + client.will, done) - } else { - done() } + done() } function registerClient (arg, done) { @@ -213,7 +212,6 @@ function emptyQueue (arg, done) { function emptyQueueFilter (err, client, packet) { var next = packet.writeCallback - var persistence = client.broker.persistence if (err) { client.emit('error', err) @@ -226,6 +224,8 @@ function emptyQueueFilter (err, client, packet) { authorized = client.broker.authorizeForward(client, packet) } + var persistence = client.broker.persistence + if (client.clean || !authorized) { persistence.outgoingClearMessageId(client, packet, next) } else { From 9e02e161c57fdb404201470f20a86e76566deeac Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Wed, 14 Aug 2019 03:55:43 +0800 Subject: [PATCH 05/11] Drop useless comments --- test/connect.js | 29 ----------------------------- 1 file changed, 29 deletions(-) diff --git a/test/connect.js b/test/connect.js index 9eac074d..7bc516a9 100644 --- a/test/connect.js +++ b/test/connect.js @@ -348,32 +348,3 @@ test('preConnect handler', function (t) { }) broker.on('closed', t.end.bind(t)) }) - -/* -- We do some checks in earliest connect phrase, and destory connections if there are invalid checks. Designed for WAF. -* checking of zero-byte client id [MQTT 3.1.0 only] -* improper MQTT version and protocol id mapping in CONNECT -* includes https://github.com/mcollina/aedes/pull/260/commits/06fc8392a1ffea822d003ff6f4c01d100b0e616d -* raised an "invalid protocol" callback error if there is - -- Normal connect checks with CONNACK responses, and raised clientError -* Unsupported aedes supported mqtt version [MQTT-3.1.2-2]: CONNACK return Code = 1 -* Empty clientid but clean=false [MQTT-3.1.3-7]: CONNACK return Code = 2 -* client id length > 23 [MQTT 3.1.0 only]: CONNACK return Code = 2 - -- Emit `connackSent` event and set connactSent=true if CONNACK is sent, not only after `client` event but also when normal connect checks phrase if necessary - -- Added packet arguments in `connackSent` event - -- Added preConnect handler in handleConnect between earliest connect checks and normal checks. This is useful for users if they want to do some earilest DDoS check before server send any responses back, in this phrase connected=false - -- set clientID to 'aedes_' + shortid() if empty [MQTT 3.1.1], it is better to keep it within 23 chars for better compatibility - -- Emit `clientReady` event after we send back all offline messages to client - -- Optimize negate function - -- Optimize doConnack function and we could re-use it - -- Set keepalive after authentication, save some resoures if there are plenty of failed authentication -*/ From 6bf6e20d73ae5fa1b4e3421c5581be587cedb580 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Thu, 15 Aug 2019 00:07:55 +0800 Subject: [PATCH 06/11] Removed unused done arguments --- test/connect.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/connect.js b/test/connect.js index 7bc516a9..8318788a 100644 --- a/test/connect.js +++ b/test/connect.js @@ -323,7 +323,7 @@ test('preConnect handler', function (t) { t.plan(0) var broker = aedes({ - preConnect: function (client, done) { + preConnect: function (client) { return false } }) From 7ff58626bb1720896d16fc515ed89d33468bf733 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Thu, 15 Aug 2019 00:29:14 +0800 Subject: [PATCH 07/11] Added preConnect doc in README.md --- README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/README.md b/README.md index fe57ca14..5ecc146f 100644 --- a/README.md +++ b/README.md @@ -69,6 +69,7 @@ server.listen(8883, function () { * instance.subscribe() * instance.publish() * instance.unsubscribe() + * instance.preConnect() * instance.authenticate() * instance.authorizePublish() * instance.authorizeSubscribe() @@ -107,6 +108,8 @@ Options: packet to arrive, defaults to `30000` milliseconds * `id`: id used to identify this broker instance in `$SYS` messages, defaults to `shortid()` +* `preConnect`: function called when a valid CONNECT is received, see + instance.preConnect()](#preConnect) * `authenticate`: function used to authenticate clients, see [instance.authenticate()](#authenticate) * `authorizePublish`: function used to authorize PUBLISH packets, see @@ -216,6 +219,26 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings. The reverse of [subscribe](#subscribe). +------------------------------------------------------- + +### instance.preConnect(client) + +It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic. +Some use cases: +1. Rate Limit / Throttle by `client.conn.remoteAddress` +2. Check `instance.connectedClient` to limit maximum connections +3. IP blacklisting + +```js +instance.preConnect = function(client) { + if (client.conn.remoteAddress === '::1') { + // true => ok + return true + } + // false => client connection will be destroyed + return false +} +``` ------------------------------------------------------- ### instance.authenticate(client, username, password, done(err, successful)) From 37172f9d45eec3a79386c4d0905fb7e3e545d474 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Sat, 17 Aug 2019 14:27:43 +0800 Subject: [PATCH 08/11] Used connect callback/event in unit tests --- test/basic.js | 212 ++++++++++++++++++++++++++------------------------ test/meta.js | 10 +-- test/will.js | 51 ++++++------ 3 files changed, 143 insertions(+), 130 deletions(-) diff --git a/test/basic.js b/test/basic.js index 0e1f0547..d77aa019 100644 --- a/test/basic.js +++ b/test/basic.js @@ -254,37 +254,37 @@ test('broker closes', function (t) { t.plan(3) var broker = aedes() - var client = noError(connect(setup(broker, false), { clientId: 'abcde' })) - eos(client.conn, t.pass.bind('client closes')) - - setImmediate(() => { + var client = noError(connect(setup(broker, false), { + clientId: 'abcde' + }, function () { + eos(client.conn, t.pass.bind('client closes')) broker.close(function (err) { t.error(err, 'no error') t.equal(broker.clients['abcde'], undefined, 'client instance is removed') }) - }) + })) }) test('broker closes gracefully', function (t) { t.plan(7) var broker = aedes() - var client1 = noError(connect(setup(broker, false))) - var client2 = noError(connect(setup(broker, false))) - setImmediate(() => { - t.equal(broker.connectedClients, 2, '2 connected clients') - eos(client1.conn, t.pass.bind('client1 closes')) - eos(client2.conn, t.pass.bind('client2 closes')) - - setImmediate(() => { + var client1, client2 + client1 = noError(connect(setup(broker, false), { + }, function () { + client2 = noError(connect(setup(broker, false), { + }, function () { + t.equal(broker.connectedClients, 2, '2 connected clients') + eos(client1.conn, t.pass.bind('client1 closes')) + eos(client2.conn, t.pass.bind('client2 closes')) broker.close(function (err) { t.error(err, 'no error') t.ok(broker.mq.closed, 'broker mq closes') t.ok(broker.closed, 'broker closes') t.equal(broker.connectedClients, 0, 'no connected clients') }) - }) - }) + })) + })) }) test('testing other event', function (t) { @@ -396,8 +396,6 @@ test('restore QoS 0 subscriptions not clean', function (t) { t.plan(5) var broker = aedes() - var publisher - var subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }) var expected = { cmd: 'publish', topic: 'hello', @@ -407,26 +405,30 @@ test('restore QoS 0 subscriptions not clean', function (t) { length: 12, retain: false } + var publisher + var subscriber = connect(setup(broker), { + clean: false, clientId: 'abcde' + }, function () { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.inStream.end() - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.inStream.end() - - publisher = connect(setup(broker)) - - subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, true, 'session present is set to true') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0 + publisher = connect(setup(broker), { + }, function () { + subscriber = connect(setup(broker), { clean: false, clientId: 'abcde' }, function (connect) { + t.equal(connect.sessionPresent, true, 'session present is set to true') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0 + }) + }) + subscriber.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet must match') + t.end() + }) }) }) - - subscriber.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet must match') - t.end() - }) }) }) @@ -435,31 +437,36 @@ test('do not restore QoS 0 subscriptions when clean', function (t) { var broker = aedes() var publisher - var subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }) - - subscribe(t, subscriber, 'hello', 0, function () { - subscriber.inStream.end() - t.equal(subscriber.broker.persistence._subscriptions.size, 0, 'no previous subscriptions restored') - - publisher = connect(setup(broker)) - - subscriber = connect(setup(broker), { clean: true, clientId: 'abcde' }, function (connect) { - t.equal(connect.sessionPresent, false, 'session present is set to false') - publisher.inStream.write({ - cmd: 'publish', - topic: 'hello', - payload: 'world', - qos: 0 + var subscriber = connect(setup(broker), { + clean: true, clientId: 'abcde' + }, function () { + subscribe(t, subscriber, 'hello', 0, function () { + subscriber.inStream.end() + subscriber.broker.persistence.subscriptionsByClient(broker.clients['abcde'], function (_, subs, client) { + t.equal(subs, null, 'no previous subscriptions restored') + }) + publisher = connect(setup(broker), { + }, function () { + subscriber = connect(setup(broker), { + clean: true, clientId: 'abcde' + }, function (connect) { + t.equal(connect.sessionPresent, false, 'session present is set to false') + publisher.inStream.write({ + cmd: 'publish', + topic: 'hello', + payload: 'world', + qos: 0 + }) + }) + subscriber.outStream.once('data', function (packet) { + t.fail('packet received') + t.end() + }) + eos(subscriber.conn, function () { + t.equal(subscriber.broker.connectedClients, 0, 'no connected clients') + t.end() + }) }) - }) - - subscriber.outStream.once('data', function (packet) { - t.fail('packet received') - t.end() - }) - eos(subscriber.conn, function () { - t.equal(subscriber.broker.connectedClients, 0, 'no connected clients') - t.end() }) }) }) @@ -467,7 +474,6 @@ test('do not restore QoS 0 subscriptions when clean', function (t) { test('double sub does not double deliver', function (t) { t.plan(7) - var s = connect(setup()) var expected = { cmd: 'publish', topic: 'hello', @@ -477,22 +483,24 @@ test('double sub does not double deliver', function (t) { qos: 0, retain: false } - - subscribe(t, s, 'hello', 0, function () { + var s = connect(setup(), { + }, function () { subscribe(t, s, 'hello', 0, function () { - s.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet matches') - s.outStream.on('data', function () { - t.fail('double deliver') + subscribe(t, s, 'hello', 0, function () { + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + s.outStream.on('data', function () { + t.fail('double deliver') + }) + // wait for a tick, so it will double deliver + setImmediate(t.end.bind(t)) }) - // wait for a tick, so it will double deliver - setImmediate(t.end.bind(t)) - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) }) @@ -501,7 +509,6 @@ test('double sub does not double deliver', function (t) { test('overlapping sub does not double deliver', function (t) { t.plan(7) - var s = connect(setup()) var expected = { cmd: 'publish', topic: 'hello', @@ -511,22 +518,24 @@ test('overlapping sub does not double deliver', function (t) { qos: 0, retain: false } - - subscribe(t, s, 'hello', 0, function () { - subscribe(t, s, 'hello/#', 0, function () { - s.outStream.once('data', function (packet) { - t.deepEqual(packet, expected, 'packet matches') - s.outStream.on('data', function () { - t.fail('double deliver') + var s = connect(setup(), { + }, function () { + subscribe(t, s, 'hello', 0, function () { + subscribe(t, s, 'hello/#', 0, function () { + s.outStream.once('data', function (packet) { + t.deepEqual(packet, expected, 'packet matches') + s.outStream.on('data', function () { + t.fail('double deliver') + }) + // wait for a tick, so it will double deliver + setImmediate(t.end.bind(t)) }) - // wait for a tick, so it will double deliver - setImmediate(t.end.bind(t)) - }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }) }) }) }) @@ -535,23 +544,24 @@ test('overlapping sub does not double deliver', function (t) { test('clear drain', function (t) { t.plan(4) - var s = connect(setup()) + var s = connect(setup(), { + }, function () { + subscribe(t, s, 'hello', 0, function () { + // fake a busy socket + s.conn.write = function (chunk, enc, cb) { + return false + } - subscribe(t, s, 'hello', 0, function () { - // fake a busy socket - s.conn.write = function (chunk, enc, cb) { - return false - } + s.broker.publish({ + cmd: 'publish', + topic: 'hello', + payload: 'world' + }, function () { + t.pass('callback called') + }) - s.broker.publish({ - cmd: 'publish', - topic: 'hello', - payload: 'world' - }, function () { - t.pass('callback called') + s.conn.destroy() }) - - s.conn.destroy() }) }) diff --git a/test/meta.js b/test/meta.js index a8dadb31..d847c253 100644 --- a/test/meta.js +++ b/test/meta.js @@ -14,14 +14,12 @@ test('count connected clients', function (t) { t.equal(broker.connectedClients, 0, 'no connected clients') - connect(setup(broker)) - - process.nextTick(function () { + connect(setup(broker), { + }, function () { t.equal(broker.connectedClients, 1, 'one connected clients') - var last = connect(setup(broker)) - - process.nextTick(function () { + var last = connect(setup(broker), { + }, function () { t.equal(broker.connectedClients, 2, 'two connected clients') last.conn.destroy() diff --git a/test/will.js b/test/will.js index 7c3b2fb7..ca9c3584 100644 --- a/test/will.js +++ b/test/will.js @@ -24,7 +24,11 @@ test('delivers a will', function (t) { var opts = {} // willConnect populates opts with a will - var s = willConnect(setup(), opts) + var s = willConnect(setup(), + opts, + function () { + s.conn.destroy() + }) s.broker.mq.on('mywill', function (packet, cb) { t.equal(packet.topic, opts.will.topic, 'topic matches') @@ -34,10 +38,6 @@ test('delivers a will', function (t) { cb() t.end() }) - - process.nextTick(() => { - s.conn.destroy() - }) }) test('calling close two times should not deliver two wills', function (t) { @@ -179,9 +179,13 @@ test('delivers a will with authorization', function (t) { let authorized = false var opts = {} - var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } }) // willConnect populates opts with a will - var s = willConnect(setup(broker), opts) + var s = willConnect( + setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(null) } })), + opts, + function () { + s.conn.destroy() + }) s.broker.on('clientDisconnect', function (client) { t.equal(client.connected, false) @@ -197,10 +201,7 @@ test('delivers a will with authorization', function (t) { cb() }) - process.nextTick(function () { - s.conn.destroy() - }) - broker.on('closed', t.end.bind(t)) + s.broker.on('closed', t.end.bind(t)) }) test('delivers a will waits for authorization', function (t) { @@ -208,9 +209,13 @@ test('delivers a will waits for authorization', function (t) { let authorized = false var opts = {} - var broker = aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; setImmediate(() => { callback(null) }) } }) // willConnect populates opts with a will - var s = willConnect(setup(broker), opts) + var s = willConnect( + setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; setImmediate(() => { callback(null) }) } })), + opts, + function () { + s.conn.destroy() + }) s.broker.on('clientDisconnect', function () { t.pass('client is disconnected') @@ -225,10 +230,7 @@ test('delivers a will waits for authorization', function (t) { cb() }) - process.nextTick(function () { - s.conn.destroy() - }) - broker.on('closed', t.end.bind(t)) + s.broker.on('closed', t.end.bind(t)) }) test('does not deliver a will without authorization', function (t) { @@ -237,7 +239,12 @@ test('does not deliver a will without authorization', function (t) { let authorized = false var opts = {} // willConnect populates opts with a will - var s = willConnect(setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(new Error()) } })), opts) + var s = willConnect( + setup(aedes({ authorizePublish: (_1, _2, callback) => { authorized = true; callback(new Error()) } })), + opts, + function () { + s.conn.destroy() + }) s.broker.on('clientDisconnect', function () { t.equal(authorized, true, 'authorization called') @@ -248,10 +255,6 @@ test('does not deliver a will without authorization', function (t) { t.fail('received will without authorization') cb() }) - - process.nextTick(function () { - s.conn.destroy() - }) }) test('does not deliver a will without authentication', function (t) { @@ -260,7 +263,9 @@ test('does not deliver a will without authentication', function (t) { let authenticated = false var opts = {} // willConnect populates opts with a will - var s = willConnect(setup(aedes({ authenticate: (_1, _2, _3, callback) => { authenticated = true; callback(new Error(), false) } })), opts) + var s = willConnect( + setup(aedes({ authenticate: (_1, _2, _3, callback) => { authenticated = true; callback(new Error(), false) } })), + opts) s.broker.once('clientError', function () { t.equal(authenticated, true, 'authentication called') From fcc8ddc5771eec4442e45bfc9e3856685555a3d3 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Sat, 17 Aug 2019 14:33:16 +0800 Subject: [PATCH 09/11] Make preConnect() having a callback --- README.md | 18 +++++++++--------- aedes.js | 4 ++-- lib/handlers/connect.js | 19 ++++++++++++++++--- 3 files changed, 27 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index 5ecc146f..d861f5fe 100644 --- a/README.md +++ b/README.md @@ -109,7 +109,7 @@ Options: * `id`: id used to identify this broker instance in `$SYS` messages, defaults to `shortid()` * `preConnect`: function called when a valid CONNECT is received, see - instance.preConnect()](#preConnect) + [instance.preConnect()](#preConnect) * `authenticate`: function used to authenticate clients, see [instance.authenticate()](#authenticate) * `authorizePublish`: function used to authorize PUBLISH packets, see @@ -221,7 +221,7 @@ The reverse of [subscribe](#subscribe). ------------------------------------------------------- -### instance.preConnect(client) +### instance.preConnect(client, done(err, successful)) It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic. Some use cases: @@ -230,13 +230,13 @@ Some use cases: 3. IP blacklisting ```js -instance.preConnect = function(client) { - if (client.conn.remoteAddress === '::1') { - // true => ok - return true - } - // false => client connection will be destroyed - return false +instance.preConnect = function(client, callback) { + callback(null, client.conn.remoteAddress === '::1') { +} +``` +```js +instance.preConnect = function(client, callback) { + callback(new Error('connection error'), client.conn.remoteAddress !== '::1') { } ``` ------------------------------------------------------- diff --git a/aedes.js b/aedes.js index f7ed7f5f..d85def5f 100644 --- a/aedes.js +++ b/aedes.js @@ -301,8 +301,8 @@ Aedes.prototype.close = function (cb = noop) { Aedes.prototype.version = require('./package.json').version -function defaultPreConnect (client) { - return true +function defaultPreConnect (client, callback) { + callback(null, true) } function defaultAuthenticate (client, username, password, callback) { callback(null, true) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index cd5e35cd..dd5b8f9a 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -43,9 +43,21 @@ function handleConnect (client, packet, done) { clearTimeout(client._connectTimer) client._connectTimer = null - if (client.broker.preConnect(client) === false) { - return client.conn.destroy() + client.broker.preConnect(client, negate) + + function negate (err, successful) { + if (!err && successful === true) { + setImmediate(init, client, packet, done) + return + } + if (err) { + client.broker.emit('connectionError', client, err) + } + client.conn.destroy() } +} + +function init (client, packet, done) { client.connected = true var clientId = packet.clientId var returnCode = 0 @@ -64,7 +76,8 @@ function handleConnect (client, packet, done) { doConnack( { client: client, returnCode: returnCode, sessionPresent: false }, done.bind(this, error)) - return client.conn.end() + client.conn.end() + return } client.id = clientId || 'aedes_' + shortid() From 83371d815cea702f9bbd2bb7a32e12c2c8368bcc Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Sat, 17 Aug 2019 14:33:53 +0800 Subject: [PATCH 10/11] Fixed preConnect unit test --- test/connect.js | 65 +++++++++++++++++++++++++++++-------------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/test/connect.js b/test/connect.js index 8318788a..cdffc03a 100644 --- a/test/connect.js +++ b/test/connect.js @@ -319,32 +319,45 @@ test('reject clients with wrong protocol name', function (t) { broker.on('closed', t.end.bind(t)) }) -test('preConnect handler', function (t) { - t.plan(0) - - var broker = aedes({ - preConnect: function (client) { - return false - } - }) - var s = setup(broker) +;[[0, null, false], [1, null, true], [1, new Error('connection banned'), false], [1, new Error('connection banned'), true]].forEach(function (ele) { + var plan = ele[0] + var err = ele[1] + var ok = ele[2] + test('preConnect handler', function (t) { + t.plan(plan) + + var broker = aedes({ + preConnect: function (client, done) { + return done(err, ok) + } + }) + var s = setup(broker) - s.inStream.write({ - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - }) - broker.on('client', function (client) { - t.fail('no reach here') - }) - broker.on('clientError', function (client, err) { - t.fail('no reach here') - }) - broker.on('connectionError', function (client, err) { - t.fail('no reach here') + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + broker.on('client', function (client) { + if (ok) { + t.pass('connect ok') + } else { + t.fail('no reach here') + } + }) + broker.on('clientError', function (client, err) { + t.fail('no client error') + }) + broker.on('connectionError', function (client, err) { + if (err) { + t.equal(err.message, 'connection banned') + } else { + t.fail('no connection error') + } + }) + broker.on('closed', t.end.bind(t)) }) - broker.on('closed', t.end.bind(t)) }) From 0f159f7c04c887a96b518710431faae850f76b71 Mon Sep 17 00:00:00 2001 From: gnought <1684105+gnought@users.noreply.github.com> Date: Sat, 17 Aug 2019 14:55:27 +0800 Subject: [PATCH 11/11] Split return in favour of V8 optimization --- lib/client.js | 3 ++- lib/handlers/connect.js | 6 ++++-- lib/handlers/index.js | 3 ++- lib/handlers/pubrec.js | 3 ++- lib/handlers/unsubscribe.js | 3 ++- 5 files changed, 12 insertions(+), 6 deletions(-) diff --git a/lib/client.js b/lib/client.js index c4a73f8e..03cf8336 100644 --- a/lib/client.js +++ b/lib/client.js @@ -48,7 +48,8 @@ function Client (broker, conn) { function nextBatch (err) { if (err) { - return that.emit('error', err) + that.emit('error', err) + return } var buf = empty diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index dd5b8f9a..ead7dd3c 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -151,11 +151,12 @@ function setKeepAlive (arg, done) { function fetchSubs (arg, done) { var client = this.client if (!this.packet.clean) { - return client.broker.persistence.subscriptionsByClient({ + client.broker.persistence.subscriptionsByClient({ id: client.id, done: done, arg: arg }, gotSubs) + return } arg.sessionPresent = false // [MQTT-3.2.2-1] client.broker.persistence.cleanSubscriptions( @@ -185,10 +186,11 @@ function storeWill (arg, done) { var client = this.client client.will = client._will if (client.will) { - return client.broker.persistence.putWill( + client.broker.persistence.putWill( client, client.will, done) + return } done() } diff --git a/lib/handlers/index.js b/lib/handlers/index.js index 00d5cd21..dcf3b2bd 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -16,7 +16,8 @@ function handle (client, packet, done) { client.conn.destroy() return done(new Error('invalid protocol')) } - return handleConnect(client, packet, done) + handleConnect(client, packet, done) + return } if (!client.connected) { // [MQTT-3.1.0-1] diff --git a/lib/handlers/pubrec.js b/lib/handlers/pubrec.js index 852b451c..82c2f5ff 100644 --- a/lib/handlers/pubrec.js +++ b/lib/handlers/pubrec.js @@ -21,7 +21,8 @@ function handlePubrec (client, packet, done) { function reply (err) { if (err) { // TODO is this ok? - return client._onError(err) + client._onError(err) + return } write(client, pubrel, done) diff --git a/lib/handlers/unsubscribe.js b/lib/handlers/unsubscribe.js index 5a5b8ac9..d6c5458e 100644 --- a/lib/handlers/unsubscribe.js +++ b/lib/handlers/unsubscribe.js @@ -65,7 +65,8 @@ function completeUnsubscribe (err) { var done = this.finish if (err) { - return client.emit('error', err) + client.emit('error', err) + return } if ((!packet.close || client.clean === true) && packet.unsubscriptions.length > 0) {