From 976218855eec2111a6db1275e5753423b4b39b7b Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 08:28:41 +0100 Subject: [PATCH 1/3] feat: Max clients Id length option for MQTT 3.1.0 #434 --- aedes.d.ts | 45 +++++++++++++++++++++-------------------- aedes.js | 4 +++- lib/handlers/connect.js | 2 +- test/connect.js | 26 ++++++++++++++++++++++++ 4 files changed, 53 insertions(+), 24 deletions(-) diff --git a/aedes.d.ts b/aedes.d.ts index a33f065f..ccbf7590 100644 --- a/aedes.d.ts +++ b/aedes.d.ts @@ -11,7 +11,7 @@ import { Socket } from 'net' import { IncomingMessage } from 'http' import EventEmitter = NodeJS.EventEmitter -declare function aedes (options?: aedes.AedesOptions): aedes.Aedes +declare function aedes(options?: aedes.AedesOptions): aedes.Aedes // eslint-disable-next-line no-redeclare declare namespace aedes { @@ -69,6 +69,7 @@ declare namespace aedes { authorizeForward?: AuthorizeForwardHandler published?: PublishedHandler queueLimit?: number + maxClientsIdLength?: number } interface Client extends EventEmitter { id: string @@ -79,16 +80,16 @@ declare namespace aedes { connected: boolean closed: boolean - on (event: 'connected', listener: () => void): this - on (event: 'error', listener: (error: Error) => void): this + on(event: 'connected', listener: () => void): this + on(event: 'error', listener: (error: Error) => void): this - publish (message: PublishPacket, callback?: (error?: Error) => void): void - subscribe ( + publish(message: PublishPacket, callback?: (error?: Error) => void): void + subscribe( subscriptions: Subscriptions | Subscription | Subscription[] | SubscribePacket, callback?: (error?: Error) => void ): void - unsubscribe (topicObjects: Subscriptions | Subscription | Subscription[] | UnsubscribePacket, callback?: (error?: Error) => void): void - close (callback?: () => void): void + unsubscribe(topicObjects: Subscriptions | Subscription | Subscription[] | UnsubscribePacket, callback?: (error?: Error) => void): void + close(callback?: () => void): void } interface Aedes extends EventEmitter { @@ -98,34 +99,34 @@ declare namespace aedes { handle: (stream: Connection) => Client - on (event: 'closed', listener: () => void): this - on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout', listener: (client: Client) => void): this - on (event: 'clientError' | 'connectionError', listener: (client: Client, error: Error) => void): this - on (event: 'connackSent', listener: (packet: ConnackPacket, client: Client) => void): this - on (event: 'ping', listener: (packet: PingreqPacket, client: Client) => void): this - on (event: 'publish', listener: (packet: AedesPublishPacket, client: Client) => void): this - on (event: 'ack', listener: (packet: PublishPacket | PubrelPacket, client: Client) => void): this - on (event: 'subscribe', listener: (subscriptions: Subscription[], client: Client) => void): this - on (event: 'unsubscribe', listener: (unsubscriptions: string[], client: Client) => void): this - - publish ( + on(event: 'closed', listener: () => void): this + on(event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout', listener: (client: Client) => void): this + on(event: 'clientError' | 'connectionError', listener: (client: Client, error: Error) => void): this + on(event: 'connackSent', listener: (packet: ConnackPacket, client: Client) => void): this + on(event: 'ping', listener: (packet: PingreqPacket, client: Client) => void): this + on(event: 'publish', listener: (packet: AedesPublishPacket, client: Client) => void): this + on(event: 'ack', listener: (packet: PublishPacket | PubrelPacket, client: Client) => void): this + on(event: 'subscribe', listener: (subscriptions: Subscription[], client: Client) => void): this + on(event: 'unsubscribe', listener: (unsubscriptions: string[], client: Client) => void): this + + publish( packet: PublishPacket, callback: (error?: Error) => void ): void - subscribe ( + subscribe( topic: string, deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void, callback: () => void ): void - unsubscribe ( + unsubscribe( topic: string, deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void, callback: () => void ): void - close (callback?: () => void): void + close(callback?: () => void): void } - function Server (options?: aedes.AedesOptions): aedes.Aedes + function Server(options?: aedes.AedesOptions): aedes.Aedes } export = aedes diff --git a/aedes.js b/aedes.js index f71e1b38..2f1f46f1 100644 --- a/aedes.js +++ b/aedes.js @@ -28,7 +28,8 @@ const defaultOptions = { published: defaultPublished, trustProxy: false, trustedProxies: [], - queueLimit: 42 + queueLimit: 42, + maxClientsIdLength: 23 } function Aedes (opts) { @@ -44,6 +45,7 @@ function Aedes (opts) { this.counter = 0 this.queueLimit = opts.queueLimit this.connectTimeout = opts.connectTimeout + this.maxClientsIdLength = opts.maxClientsIdLength this.mq = opts.mq || mqemitter(opts) this.handle = function handle (conn, req) { conn.setMaxListeners(opts.concurrency * 2) diff --git a/lib/handlers/connect.js b/lib/handlers/connect.js index b0af2bfc..d53db1ad 100644 --- a/lib/handlers/connect.js +++ b/lib/handlers/connect.js @@ -66,7 +66,7 @@ function init (client, packet, done) { returnCode = 1 } // MQTT 3.1.0 allows <= 23 client id length - if (packet.protocolVersion === 3 && clientId.length > 23) { + if (packet.protocolVersion === 3 && clientId.length > client.broker.maxClientsIdLength) { returnCode = 2 } if (returnCode > 0) { diff --git a/test/connect.js b/test/connect.js index 179f3e16..83b855ff 100644 --- a/test/connect.js +++ b/test/connect.js @@ -304,6 +304,32 @@ test('reject clients with > 23 clientId length in MQTT 3.1.0', function (t) { }) }) +test('connect clients with > 23 clientId length using aedes maxClientsIdLength option in MQTT 3.1.0', function (t) { + t.plan(3) + + const broker = aedes({ maxClientsIdLength: 26 }) + t.tearDown(broker.close.bind(broker)) + + const s = setup(broker) + + s.inStream.write({ + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 3, + clean: true, + clientId: 'abcdefghijklmnopqrstuvwxyz', + keepalive: 0 + }) + s.outStream.on('data', function (packet) { + t.equal(packet.cmd, 'connack') + t.equal(packet.returnCode, 0) + t.equal(broker.connectedClients, 1) + }) + broker.on('connectionError', function (client, err) { + t.error(err, 'no error') + }) +}) + test('connect with > 23 clientId length in MQTT 3.1.1', function (t) { t.plan(3) From 91b9f2a5a9e6716cb9e91b5e1c635e92a6449f69 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 08:32:19 +0100 Subject: [PATCH 2/3] docs: maxClientsIdLength --- docs/Aedes.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/docs/Aedes.md b/docs/Aedes.md index 76112857..315685f0 100644 --- a/docs/Aedes.md +++ b/docs/Aedes.md @@ -23,7 +23,7 @@ - [aedes.subscribe (topic, deliverfunc, callback)](#aedessubscribe-topic-deliverfunc-callback) - [aedes.unsubscribe (topic, deliverfunc, callback)](#aedesunsubscribe-topic-deliverfunc-callback) - [aedes.publish (packet, callback)](#aedespublish-packet-callback) - - [aedes.close (callback)](#aedesclose-callback) + - [aedes.close ([callback])](#aedesclose-callback) - [Handler: decodeProtocol (client, buffer)](#handler-decodeprotocol-client-buffer) - [Handler: preConnect (client, callback)](#handler-preconnect-client-callback) - [Handler: authenticate (client, username, password, callback)](#handler-authenticate-client-username-password-callback) @@ -39,6 +39,7 @@ - `concurrency` `` maximum number of concurrent messages delivered by `mq`. __Default__: `100` - `persistence` [``](../README.md#persistence) a middleware stores _QoS > 0, retained, will_ packets and subscriptions. __Default__: `aedes-persistence` - `queueLimit` `` maximum number of queued messages before client session is established. If number of queued items exceeds, `connectionError` throws an error `Client queue limit reached`. __Default__: `42` + - `maxClientsIdLength` option to override MQTT 3.1.0 clients Id length limit. __Default__: `23` - `heartbeatInterval` `` an interval in millisconds at which server beats its health signal in `$SYS//heartbeat` topic. __Default__: `60000` - `connectTimeout` `` maximum waiting time in milliseconds waiting for a [`CONNECT`][CONNECT] packet. __Default__: `30000` - Returns `` From 6374a5243e40fee8ff5b5cd8f0f68d59fe237451 Mon Sep 17 00:00:00 2001 From: Daniel Lando Date: Thu, 20 Feb 2020 08:33:58 +0100 Subject: [PATCH 3/3] fixed formatting --- aedes.d.ts | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/aedes.d.ts b/aedes.d.ts index ccbf7590..9c39601b 100644 --- a/aedes.d.ts +++ b/aedes.d.ts @@ -11,7 +11,7 @@ import { Socket } from 'net' import { IncomingMessage } from 'http' import EventEmitter = NodeJS.EventEmitter -declare function aedes(options?: aedes.AedesOptions): aedes.Aedes +declare function aedes (options?: aedes.AedesOptions): aedes.Aedes // eslint-disable-next-line no-redeclare declare namespace aedes { @@ -80,16 +80,16 @@ declare namespace aedes { connected: boolean closed: boolean - on(event: 'connected', listener: () => void): this - on(event: 'error', listener: (error: Error) => void): this + on (event: 'connected', listener: () => void): this + on (event: 'error', listener: (error: Error) => void): this - publish(message: PublishPacket, callback?: (error?: Error) => void): void - subscribe( + publish (message: PublishPacket, callback?: (error?: Error) => void): void + subscribe ( subscriptions: Subscriptions | Subscription | Subscription[] | SubscribePacket, callback?: (error?: Error) => void ): void - unsubscribe(topicObjects: Subscriptions | Subscription | Subscription[] | UnsubscribePacket, callback?: (error?: Error) => void): void - close(callback?: () => void): void + unsubscribe (topicObjects: Subscriptions | Subscription | Subscription[] | UnsubscribePacket, callback?: (error?: Error) => void): void + close (callback?: () => void): void } interface Aedes extends EventEmitter { @@ -99,34 +99,34 @@ declare namespace aedes { handle: (stream: Connection) => Client - on(event: 'closed', listener: () => void): this - on(event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout', listener: (client: Client) => void): this - on(event: 'clientError' | 'connectionError', listener: (client: Client, error: Error) => void): this - on(event: 'connackSent', listener: (packet: ConnackPacket, client: Client) => void): this - on(event: 'ping', listener: (packet: PingreqPacket, client: Client) => void): this - on(event: 'publish', listener: (packet: AedesPublishPacket, client: Client) => void): this - on(event: 'ack', listener: (packet: PublishPacket | PubrelPacket, client: Client) => void): this - on(event: 'subscribe', listener: (subscriptions: Subscription[], client: Client) => void): this - on(event: 'unsubscribe', listener: (unsubscriptions: string[], client: Client) => void): this - - publish( + on (event: 'closed', listener: () => void): this + on (event: 'client' | 'clientReady' | 'clientDisconnect' | 'keepaliveTimeout', listener: (client: Client) => void): this + on (event: 'clientError' | 'connectionError', listener: (client: Client, error: Error) => void): this + on (event: 'connackSent', listener: (packet: ConnackPacket, client: Client) => void): this + on (event: 'ping', listener: (packet: PingreqPacket, client: Client) => void): this + on (event: 'publish', listener: (packet: AedesPublishPacket, client: Client) => void): this + on (event: 'ack', listener: (packet: PublishPacket | PubrelPacket, client: Client) => void): this + on (event: 'subscribe', listener: (subscriptions: Subscription[], client: Client) => void): this + on (event: 'unsubscribe', listener: (unsubscriptions: string[], client: Client) => void): this + + publish ( packet: PublishPacket, callback: (error?: Error) => void ): void - subscribe( + subscribe ( topic: string, deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void, callback: () => void ): void - unsubscribe( + unsubscribe ( topic: string, deliverfunc: (packet: AedesPublishPacket, callback: () => void) => void, callback: () => void ): void - close(callback?: () => void): void + close (callback?: () => void): void } - function Server(options?: aedes.AedesOptions): aedes.Aedes + function Server (options?: aedes.AedesOptions): aedes.Aedes } export = aedes