diff --git a/README.md b/README.md index cb4598fe..fe57ca14 100644 --- a/README.md +++ b/README.md @@ -120,7 +120,9 @@ Options: Events: -* `client`: when a new [Client](#client) connects, arguments: +* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments: + 1. `client` +* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments: 1. `client` * `clientDisconnect`: when a [Client](#client) disconnects, arguments: 1. `client` @@ -152,8 +154,9 @@ packet. [UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe) packet. 2. `client` -* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments: - 1. `client` +* `connackSent`: when a CONNACK packet is sent to a client, arguments: + 1. `packet` + 2. `client` * `closed`: when the broker is closed ------------------------------------------------------- diff --git a/aedes.js b/aedes.js index 9fe60ad2..edeabe6e 100644 --- a/aedes.js +++ b/aedes.js @@ -19,6 +19,7 @@ var defaultOptions = { concurrency: 100, heartbeatInterval: 60000, // 1 minute connectTimeout: 30000, // 30 secs + preConnect: defaultPreConnect, authenticate: defaultAuthenticate, authorizePublish: defaultAuthorizePublish, authorizeSubscribe: defaultAuthorizeSubscribe, @@ -50,6 +51,7 @@ function Aedes (opts) { this._series = series() this._enqueuers = reusify(DoEnqueues) + this.preConnect = opts.preConnect this.authenticate = opts.authenticate this.authorizePublish = opts.authorizePublish this.authorizeSubscribe = opts.authorizeSubscribe @@ -295,6 +297,9 @@ Aedes.prototype.close = function (cb = noop) { } } +function defaultPreConnect (client) { + return true +} function defaultAuthenticate (client, username, password, callback) { callback(null, true) } diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index 3e6ce12e..ae685f15 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -6,7 +6,13 @@ var write = require('../write') var QoSPacket = require('../qos-packet') var through = require('through2') var handleSubscribe = require('./subscribe') -var uuid = require('uuid') +var shortid = require('shortid') + +function Connack (arg) { + this.cmd = 'connack' + this.returnCode = arg.returnCode + this.sessionPresent = arg.sessionPresent +} function ClientPacketStatus (client, packet) { this.client = client @@ -34,15 +40,33 @@ var errorMessages = [ ] function handleConnect (client, packet, done) { + if (client.broker.preConnect(client) === false) { + return client.conn.destroy() + } client.connected = true - client.clean = packet.clean - - if (!packet.clientId && packet.protocolVersion === 3) { - client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1')) - return done() + var clientId = packet.clientId + var returnCode = 0 + // [MQTT-3.1.2-2] + if (packet.protocolVersion < 3 || packet.protocolVersion > 4) { + returnCode = 1 + } + // MQTT 3.1.0 allows <= 23 client id length + if (packet.protocolVersion === 3 && clientId.length > 23) { + returnCode = 2 + } + // console.log(returnCode) + if (returnCode > 0) { + var error = new Error(errorMessages[returnCode]) + error.errorCode = returnCode + client.broker.emit('clientError', client, error) + doConnack( + { client: client, returnCode: returnCode, sessionPresent: false }, + done.bind(this, error)) + return client.conn.end() } - client.id = packet.clientId || uuid.v4() + client.id = clientId || 'aedes_' + shortid() + client.clean = packet.clean client._will = packet.will clearTimeout(client._connectTimer) @@ -50,9 +74,11 @@ function handleConnect (client, packet, done) { client.broker._series( new ClientPacketStatus(client, packet), - connectActions, {}, function (err) { + connectActions, + { returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4] + function (err) { + this.client.broker.emit('clientReady', client) this.client.emit('connected') - client.connackSent = true done(err) }) } @@ -68,38 +94,32 @@ function authenticate (arg, done) { function negate (err, successful) { if (!client.connected) { - // a hack, sometimes close happends before authenticate comes back + // a hack, sometimes close() happened before authenticate() comes back // we stop here for not to register it and deregister it in write() return } - var errCode if (!err && successful) { return done() - } else if (err) { - if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) { - errCode = err.returnCode - write(client, { - cmd: 'connack', - returnCode: err.returnCode - }, client.close.bind(client, done.bind(this, err))) + } + + if (err) { + var errCode = err.returnCode + if (errCode && (errCode >= 1 && errCode <= 3)) { + arg.returnCode = errCode } else { // If errorCode is 4 or not a number - errCode = 4 - write(client, { - cmd: 'connack', - returnCode: 4 - }, client.close.bind(client, done.bind(this, err))) + arg.returnCode = 4 } } else { - errCode = 5 - write(client, { - cmd: 'connack', - returnCode: 5 - }, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode])))) + arg.returnCode = 5 } - var error = new Error(errorMessages[errCode]) - error.errorCode = errCode + var error = new Error(errorMessages[arg.returnCode]) + error.errorCode = arg.returnCode client.broker.emit('clientError', client, error) + arg.client = client + doConnack(arg, + // [MQTT-3.2.2-5] + client.close.bind(client, done.bind(this, error))) } } @@ -124,6 +144,7 @@ function fetchSubs (arg, done) { arg: arg }, gotSubs) } else { + arg.sessionPresent = false // [MQTT-3.2.2-1] this.client.broker.persistence.cleanSubscriptions( this.client, done) @@ -141,7 +162,9 @@ function gotSubs (err, subs, client) { function restoreSubs (arg, done) { if (arg.subs) { handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done) + arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2] } else { + arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3] done() } } @@ -164,17 +187,12 @@ function registerClient (arg, done) { done() } -function Connack (arg) { - this.cmd = 'connack' - this.returnCode = 0 - this.sessionPresent = !!arg.subs // cast to boolean -} - function doConnack (arg, done) { - var client = this.client + var client = arg.client || this.client const connack = new Connack(arg) write(client, connack, function () { - client.broker.emit('connackSent', client) + client.broker.emit('connackSent', connack, client) + client.connackSent = true done() }) } diff --git a/lib/handlers/index.js b/lib/handlers/index.js index d4db8345..00d5cd21 100644 --- a/lib/handlers/index.js +++ b/lib/handlers/index.js @@ -11,12 +11,17 @@ var handlePing = require('./ping') function handle (client, packet, done) { if (packet.cmd === 'connect') { - // [MQTT-3.1.0-2] - return client.connected ? client.conn.destroy() : handleConnect(client, packet, done) + if (client.connected) { + // [MQTT-3.1.0-2] + client.conn.destroy() + return done(new Error('invalid protocol')) + } + return handleConnect(client, packet, done) } if (!client.connected) { // [MQTT-3.1.0-1] - return client.conn.destroy() + client.conn.destroy() + return done(new Error('invalid protocol')) } switch (packet.cmd) { diff --git a/test/basic.js b/test/basic.js index ef02782b..0e1f0547 100644 --- a/test/basic.js +++ b/test/basic.js @@ -9,83 +9,6 @@ var connect = helper.connect var noError = helper.noError var subscribe = helper.subscribe -test('connect and connack (minimal)', function (t) { - t.plan(1) - - var s = setup() - - s.inStream.write({ - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - }) - - s.outStream.on('data', function (packet) { - t.deepEqual(packet, { - cmd: 'connack', - returnCode: 0, - length: 2, - qos: 0, - retain: false, - dup: false, - topic: null, - payload: null, - sessionPresent: false - }, 'successful connack') - t.end() - }) -}) - -test('the first Packet sent from the Client to the Server MUST be a CONNECT Packet [MQTT-3.1.0-1]', function (t) { - t.plan(1) - - var broker = aedes() - var s = setup(broker, false) - - var packet = { - cmd: 'publish', - topic: 'hello', - payload: Buffer.from('world'), - qos: 0, - retain: false - } - s.inStream.write(packet) - setImmediate(() => { - t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT') - s.conn.destroy() - broker.close() - t.end() - }) -}) - -test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2]', function (t) { - t.plan(3) - - var broker = aedes() - var packet = { - cmd: 'connect', - protocolId: 'MQTT', - protocolVersion: 4, - clean: true, - clientId: 'my-client', - keepalive: 0 - } - var s = connect(setup(broker, false), { clientId: 'abcde' }, function () { - t.ok(broker.clients['abcde'].connected) - s.inStream.write(packet) - setImmediate(() => { - t.equal(broker.clients['abcde'], undefined, 'client instance is removed') - t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established') - s.conn.destroy() - broker.close() - t.end() - }) - }) -}) - test('publish QoS 0', function (t) { t.plan(2) diff --git a/test/connect.js b/test/connect.js new file mode 100644 index 00000000..9eac074d --- /dev/null +++ b/test/connect.js @@ -0,0 +1,379 @@ +'use strict' + +var test = require('tape').test +var helper = require('./helper') +var aedes = require('../') +var setup = helper.setup +var connect = helper.connect + +;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) { + test('connect and connack (minimal)', function (t) { + t.plan(1) + + var s = setup() + + s.inStream.write({ + cmd: 'connect', + protocolId: ele.id, + protocolVersion: ele.ver, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + + s.outStream.on('data', function (packet) { + t.deepEqual(packet, { + cmd: 'connack', + returnCode: 0, + length: 2, + qos: 0, + retain: false, + dup: false, + topic: null, + payload: null, + sessionPresent: false + }, 'successful connack') + t.end() + }) + }) +}) + +// [MQTT-3.1.2-2] +test('reject client requested for unacceptable protocol version', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 5, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 1, 'unacceptable protocol version') + t.equal(broker.connectedClients, 0) + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'unacceptable protocol version') + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.2-1], Guarded in mqtt-packet +test('reject client requested for unsupported protocol version', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 2, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'Invalid protocol version') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +// Guarded in mqtt-packet +test('reject clients with no clientId running on MQTT 3.1.0', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'clientId must be supplied before 3.1.1') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.3-7], Guarded in mqtt-packet +test('reject clients without clientid and clean=false on MQTT 3.1.1', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: false, + clientId: '', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'clientId must be given if cleanSession set to 0') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('clients without clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('client', function (client) { + t.ok(client.id.startsWith('aedes_')) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('clients with zero-byte clientid and clean=true on MQTT 3.1.1 will get a generated clientId', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: '', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('client', function (client) { + t.ok(client.id.startsWith('aedes_')) + }) + broker.on('closed', function () { + t.end() + }) +}) + +// [MQTT-3.1.3-7] +test('reject clients with > 23 clientId length in MQTT 3.1.0', function (t) { + t.plan(4) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 2, 'identifier rejected') + t.equal(broker.connectedClients, 0) + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'identifier rejected') + }) + broker.on('closed', t.end.bind(t)) +}) + +test('connect with > 23 clientId length in MQTT 3.1.1', function (t) { + t.plan(3) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) + broker.on('closed', t.end.bind(t)) +}) + +// [MQTT-3.1.0-1] +test('the first Packet MUST be a CONNECT Packet', function (t) { + t.plan(1) + + var broker = aedes() + var s = setup(broker, false) + + var packet = { + cmd: 'publish', + topic: 'hello', + payload: Buffer.from('world'), + qos: 0, + retain: false + } + s.inStream.write(packet) + setImmediate(() => { + t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT') + s.conn.destroy() + broker.close() + t.end() + }) +}) + +// [MQTT-3.1.0-2] +test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client ', function (t) { + t.plan(3) + + var broker = aedes() + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + } + var s = connect(setup(broker, false), { clientId: 'abcde' }, function () { + t.ok(broker.clients['abcde'].connected) + s.inStream.write(packet) + setImmediate(() => { + t.equal(broker.clients['abcde'], undefined, 'client instance is removed') + t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established') + broker.close() + t.end() + }) + }) +}) + +// [MQTT-3.1.2-1], Guarded in mqtt-packet +test('reject clients with wrong protocol name', function (t) { + t.plan(2) + + var broker = aedes() + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT_hello', + protocolVersion: 3, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.fail('no data sent') + }) + broker.on('connectionError', function (client, err) { + t.equal(err.message, 'Invalid protocolId') + t.equal(broker.connectedClients, 0) + }) + broker.on('closed', t.end.bind(t)) +}) + +test('preConnect handler', function (t) { + t.plan(0) + + var broker = aedes({ + preConnect: function (client, done) { + return false + } + }) + var s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client', + keepalive: 0 + }) + broker.on('client', function (client) { + t.fail('no reach here') + }) + broker.on('clientError', function (client, err) { + t.fail('no reach here') + }) + broker.on('connectionError', function (client, err) { + t.fail('no reach here') + }) + broker.on('closed', t.end.bind(t)) +}) + +/* +- We do some checks in earliest connect phrase, and destory connections if there are invalid checks. Designed for WAF. +* checking of zero-byte client id [MQTT 3.1.0 only] +* improper MQTT version and protocol id mapping in CONNECT +* includes https://github.com/mcollina/aedes/pull/260/commits/06fc8392a1ffea822d003ff6f4c01d100b0e616d +* raised an "invalid protocol" callback error if there is + +- Normal connect checks with CONNACK responses, and raised clientError +* Unsupported aedes supported mqtt version [MQTT-3.1.2-2]: CONNACK return Code = 1 +* Empty clientid but clean=false [MQTT-3.1.3-7]: CONNACK return Code = 2 +* client id length > 23 [MQTT 3.1.0 only]: CONNACK return Code = 2 + +- Emit `connackSent` event and set connactSent=true if CONNACK is sent, not only after `client` event but also when normal connect checks phrase if necessary + +- Added packet arguments in `connackSent` event + +- Added preConnect handler in handleConnect between earliest connect checks and normal checks. This is useful for users if they want to do some earilest DDoS check before server send any responses back, in this phrase connected=false + +- set clientID to 'aedes_' + shortid() if empty [MQTT 3.1.1], it is better to keep it within 23 chars for better compatibility + +- Emit `clientReady` event after we send back all offline messages to client + +- Optimize negate function + +- Optimize doConnack function and we could re-use it + +- Set keepalive after authentication, save some resoures if there are plenty of failed authentication +*/ diff --git a/test/handlers/connect.js b/test/handlers/connect.js deleted file mode 100644 index c0d863bd..00000000 --- a/test/handlers/connect.js +++ /dev/null @@ -1,31 +0,0 @@ -'use strict' - -var test = require('tape').test -var EE = require('events').EventEmitter -var handle = require('../../lib/handlers/index') - -test('reject clients with no clientId running on MQTT 3.1', function (t) { - t.plan(2) - - var client = new EE() - var broker = { - registerClient: function () {} - } - - client.broker = broker - client.conn = { - destroy: function () {} - } - - client.on('error', function (err) { - t.equal(err.message, 'Empty clientIds are supported only on MQTT 3.1.1', 'error message') - }) - - handle(client, { - cmd: 'connect', - protocolVersion: 3, - protocolId: 'MQIsdp' - }, function (err) { - t.error(err, 'no error in callback') - }) -}) diff --git a/test/helper.js b/test/helper.js index d83b2e5e..262dc045 100644 --- a/test/helper.js +++ b/test/helper.js @@ -38,8 +38,8 @@ function connect (s, opts, connected) { opts = opts || {} opts.cmd = 'connect' - opts.protocolId = 'MQTT' - opts.version = 4 + opts.protocolId = opts.protocolId || 'MQTT' + opts.protocolVersion = opts.protocolVersion || 4 opts.clean = !!opts.clean opts.clientId = opts.clientId || 'my-client-' + clients++ opts.keepalive = opts.keepalive || 0 diff --git a/test/meta.js b/test/meta.js index 5396daf6..c5c3774e 100644 --- a/test/meta.js +++ b/test/meta.js @@ -200,13 +200,14 @@ test('emits client', function (t) { }) test('connect and connackSent event', function (t) { + t.plan(3) + t.timeoutAfter(50) + var s = setup() var clientId = 'my-client' - t.plan(2) - t.timeoutAfter(50) - - s.broker.on('connackSent', function (client) { + s.broker.on('connackSent', function (packet, client) { + t.equal(packet.returnCode, 0) t.equal(client.id, clientId, 'connackSent event and clientId matches') t.end() }) diff --git a/types/index.d.ts b/types/index.d.ts index 538993bb..425aae6a 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -34,6 +34,8 @@ declare namespace aedes { close (callback?: () => void): void } + export type PreConnectCallback = (client: Client) => boolean + export type AuthenticateError = Error & { returnCode: AuthErrorCode } export type AuthenticateCallback = ( @@ -57,6 +59,7 @@ declare namespace aedes { concurrency?: number heartbeatInterval?: number connectTimeout?: number + preConnect?: PreConnectCallback authenticate?: AuthenticateCallback authorizePublish?: AuthorizePublishCallback authorizeSubscribe?: AuthorizeSubscribeCallback @@ -67,6 +70,7 @@ declare namespace aedes { export interface Aedes extends EventEmitter { handle: (stream: Duplex) => void + preConnect: PreConnectCallback authenticate: AuthenticateCallback authorizePublish: AuthorizePublishCallback authorizeSubscribe: AuthorizeSubscribeCallback @@ -74,7 +78,7 @@ declare namespace aedes { published: PublishedCallback on (event: 'closed', cb: () => void): this - on (event: 'client' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this + on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout' | 'connackSent', cb: (client: Client) => void): this on (event: 'clientError' | 'connectionError', cb: (client: Client, error: Error) => void): this on (event: 'ping' | 'publish' | 'ack', cb: (packet: any, client: Client) => void): this on (event: 'subscribe' | 'unsubscribe', cb: (subscriptions: ISubscription | ISubscription[] | ISubscribePacket, client: Client) => void): this