Skip to content

Commit

Permalink
Optimize Connect handler (#301)
Browse files Browse the repository at this point in the history
* Set keepalive after auth, save mem if auth fails

* Enhance & Optimize connect handler
- For 06fc839, raise an "invalid protocol" callback error if there is

- aedes unsupported mqtt version: CONNACK return Code = 1
- client id length > 23 [MQTT 3.1.0 only]: CONNACK return Code = 2

- Added preConnect handler in handleConnect between earliest connect checks and normal checks. This is useful for users if they want to do some earilest DDoS check before server send any responses back, in this phrase connected=false

- Emit `connackSent` event and set connactSent=true if CONNACK is sent, not only after `client` event but also when normal connect checks phrase if necessary

- Added packet arguments in `connackSent` event

- set clientID to 'aedes_' + shortid() if empty [MQTT 3.1.1], it is better to keep it within 23 chars for better compatibility

- Emit `clientReady` event after we send back all offline messages to client

- Optimize negate function

- Optimize doConnack function and we could re-use it

- Set keepalive after authentication, save some resoures if there are plenty of failed authentication

* Clear connect timer earilest, performance-wise

* Refactored

* Drop useless comments

* Removed unused done arguments

* Added preConnect doc in README.md

* Used connect callback/event in unit tests

* Make preConnect() having a callback

* Fixed preConnect unit test

* Split return in favour of V8 optimization
  • Loading branch information
gnought authored and mcollina committed Aug 23, 2019
1 parent 992cce1 commit 1a87b45
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 321 deletions.
32 changes: 29 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ server.listen(8883, function () {
* <a href="#subscribe"><code>instance.<b>subscribe()</b></code></a>
* <a href="#publish"><code>instance.<b>publish()</b></code></a>
* <a href="#unsubscribe"><code>instance.<b>unsubscribe()</b></code></a>
* <a href="#preConnect"><code>instance.<b>preConnect()</b></code></a>
* <a href="#authenticate"><code>instance.<b>authenticate()</b></code></a>
* <a href="#authorizePublish"><code>instance.<b>authorizePublish()</b></code></a>
* <a href="#authorizeSubscribe"><code>instance.<b>authorizeSubscribe()</b></code></a>
Expand Down Expand Up @@ -107,6 +108,8 @@ Options:
packet to arrive, defaults to `30000` milliseconds
* `id`: id used to identify this broker instance in `$SYS` messages,
defaults to `shortid()`
* `preConnect`: function called when a valid CONNECT is received, see
[instance.preConnect()](#preConnect)
* `authenticate`: function used to authenticate clients, see
[instance.authenticate()](#authenticate)
* `authorizePublish`: function used to authorize PUBLISH packets, see
Expand All @@ -120,7 +123,9 @@ Options:

Events:

* `client`: when a new [Client](#client) connects, arguments:
* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments:
1. `client`
* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments:
1. `client`
* `clientDisconnect`: when a [Client](#client) disconnects, arguments:
1. `client`
Expand Down Expand Up @@ -152,8 +157,9 @@ packet.
[UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe)
packet.
2. `client`
* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments:
1. `client`
* `connackSent`: when a CONNACK packet is sent to a client, arguments:
1. `packet`
2. `client`
* `closed`: when the broker is closed

-------------------------------------------------------
Expand Down Expand Up @@ -213,6 +219,26 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.

The reverse of [subscribe](#subscribe).

-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))

It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic.
Some use cases:
1. Rate Limit / Throttle by `client.conn.remoteAddress`
2. Check `instance.connectedClient` to limit maximum connections
3. IP blacklisting

```js
instance.preConnect = function(client, callback) {
callback(null, client.conn.remoteAddress === '::1') {
}
```
```js
instance.preConnect = function(client, callback) {
callback(new Error('connection error'), client.conn.remoteAddress !== '::1') {
}
```
-------------------------------------------------------
<a name="authenticate"></a>
### instance.authenticate(client, username, password, done(err, successful))
Expand Down
5 changes: 5 additions & 0 deletions aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
Expand Down Expand Up @@ -50,6 +51,7 @@ function Aedes (opts) {
this._series = series()
this._enqueuers = reusify(DoEnqueues)

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
this.authorizePublish = opts.authorizePublish
this.authorizeSubscribe = opts.authorizeSubscribe
Expand Down Expand Up @@ -299,6 +301,9 @@ Aedes.prototype.close = function (cb = noop) {

Aedes.prototype.version = require('./package.json').version

function defaultPreConnect (client, callback) {
callback(null, true)
}
function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
3 changes: 2 additions & 1 deletion lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ function Client (broker, conn) {

function nextBatch (err) {
if (err) {
return that.emit('error', err)
that.emit('error', err)
return
}

var buf = empty
Expand Down
169 changes: 104 additions & 65 deletions lib/handlers/connect.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,13 @@ var write = require('../write')
var QoSPacket = require('../qos-packet')
var through = require('through2')
var handleSubscribe = require('./subscribe')
var uuid = require('uuid')
var shortid = require('shortid')

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = arg.returnCode
this.sessionPresent = arg.sessionPresent
}

function ClientPacketStatus (client, packet) {
this.client = client
Expand All @@ -15,6 +21,7 @@ function ClientPacketStatus (client, packet) {

var connectActions = [
authenticate,
setKeepAlive,
fetchSubs,
restoreSubs,
storeWill,
Expand All @@ -33,33 +40,57 @@ var errorMessages = [
]

function handleConnect (client, packet, done) {
client.connected = true
client.clean = packet.clean

if (!packet.clientId && packet.protocolVersion === 3) {
client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1'))
return done()
}

client.id = packet.clientId || uuid.v4()
client._will = packet.will

clearTimeout(client._connectTimer)
client._connectTimer = null

if (packet.keepalive > 0) {
client._keepaliveInterval = (packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
client.broker.preConnect(client, negate)

function negate (err, successful) {
if (!err && successful === true) {
setImmediate(init, client, packet, done)
return
}
if (err) {
client.broker.emit('connectionError', client, err)
}
client.conn.destroy()
}
}

function init (client, packet, done) {
client.connected = true
var clientId = packet.clientId
var returnCode = 0
// [MQTT-3.1.2-2]
if (packet.protocolVersion < 3 || packet.protocolVersion > 4) {
returnCode = 1
}
// MQTT 3.1.0 allows <= 23 client id length
if (packet.protocolVersion === 3 && clientId.length > 23) {
returnCode = 2
}
if (returnCode > 0) {
var error = new Error(errorMessages[returnCode])
error.errorCode = returnCode
client.broker.emit('clientError', client, error)
doConnack(
{ client: client, returnCode: returnCode, sessionPresent: false },
done.bind(this, error))
client.conn.end()
return
}

client.id = clientId || 'aedes_' + shortid()
client.clean = packet.clean
client._will = packet.will

client.broker._series(
new ClientPacketStatus(client, packet),
connectActions, {}, function (err) {
connectActions,
{ returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4]
function (err) {
this.client.broker.emit('clientReady', client)
this.client.emit('connected')
client.connackSent = true
done(err)
})
}
Expand All @@ -75,53 +106,62 @@ function authenticate (arg, done) {

function negate (err, successful) {
if (!client.connected) {
// a hack, sometimes close happends before authenticate comes back
// a hack, sometimes close() happened before authenticate() comes back
// we stop here for not to register it and deregister it in write()
return
}
var errCode
if (!err && successful) {
return done()
} else if (err) {
if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) {
errCode = err.returnCode
write(client, {
cmd: 'connack',
returnCode: err.returnCode
}, client.close.bind(client, done.bind(this, err)))
}

if (err) {
var errCode = err.returnCode
if (errCode && (errCode >= 1 && errCode <= 3)) {
arg.returnCode = errCode
} else {
// If errorCode is 4 or not a number
errCode = 4
write(client, {
cmd: 'connack',
returnCode: 4
}, client.close.bind(client, done.bind(this, err)))
arg.returnCode = 4
}
} else {
errCode = 5
write(client, {
cmd: 'connack',
returnCode: 5
}, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode]))))
arg.returnCode = 5
}
var error = new Error(errorMessages[errCode])
error.errorCode = errCode
var error = new Error(errorMessages[arg.returnCode])
error.errorCode = arg.returnCode
client.broker.emit('clientError', client, error)
arg.client = client
doConnack(arg,
// [MQTT-3.2.2-5]
client.close.bind(client, done.bind(this, error)))
}
}

function setKeepAlive (arg, done) {
if (this.packet.keepalive > 0) {
var client = this.client
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
}
done()
}

function fetchSubs (arg, done) {
var client = this.client
if (!this.packet.clean) {
this.client.broker.persistence.subscriptionsByClient({
id: this.client.id,
client.broker.persistence.subscriptionsByClient({
id: client.id,
done: done,
arg: arg
}, gotSubs)
} else {
this.client.broker.persistence.cleanSubscriptions(
this.client,
done)
return
}
arg.sessionPresent = false // [MQTT-3.2.2-1]
client.broker.persistence.cleanSubscriptions(
client,
done)
}

function gotSubs (err, subs, client) {
Expand All @@ -135,21 +175,24 @@ function gotSubs (err, subs, client) {
function restoreSubs (arg, done) {
if (arg.subs) {
handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done)
} else {
done()
arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2]
return
}
arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3]
done()
}

function storeWill (arg, done) {
this.client.will = this.client._will
if (this.client.will) {
this.client.broker.persistence.putWill(
this.client,
this.client.will,
var client = this.client
client.will = client._will
if (client.will) {
client.broker.persistence.putWill(
client,
client.will,
done)
} else {
done()
return
}
done()
}

function registerClient (arg, done) {
Expand All @@ -158,17 +201,12 @@ function registerClient (arg, done) {
done()
}

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = 0
this.sessionPresent = !!arg.subs // cast to boolean
}

function doConnack (arg, done) {
var client = this.client
var client = arg.client || this.client
const connack = new Connack(arg)
write(client, connack, function () {
client.broker.emit('connackSent', client)
client.broker.emit('connackSent', connack, client)
client.connackSent = true
done()
})
}
Expand All @@ -189,7 +227,6 @@ function emptyQueue (arg, done) {

function emptyQueueFilter (err, client, packet) {
var next = packet.writeCallback
var persistence = client.broker.persistence

if (err) {
client.emit('error', err)
Expand All @@ -202,6 +239,8 @@ function emptyQueueFilter (err, client, packet) {
authorized = client.broker.authorizeForward(client, packet)
}

var persistence = client.broker.persistence

if (client.clean || !authorized) {
persistence.outgoingClearMessageId(client, packet, next)
} else {
Expand Down
12 changes: 9 additions & 3 deletions lib/handlers/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ var handlePing = require('./ping')

function handle (client, packet, done) {
if (packet.cmd === 'connect') {
// [MQTT-3.1.0-2]
return client.connected ? client.conn.destroy() : handleConnect(client, packet, done)
if (client.connected) {
// [MQTT-3.1.0-2]
client.conn.destroy()
return done(new Error('invalid protocol'))
}
handleConnect(client, packet, done)
return
}
if (!client.connected) {
// [MQTT-3.1.0-1]
return client.conn.destroy()
client.conn.destroy()
return done(new Error('invalid protocol'))
}

switch (packet.cmd) {
Expand Down
3 changes: 2 additions & 1 deletion lib/handlers/pubrec.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ function handlePubrec (client, packet, done) {
function reply (err) {
if (err) {
// TODO is this ok?
return client._onError(err)
client._onError(err)
return
}

write(client, pubrel, done)
Expand Down
Loading

0 comments on commit 1a87b45

Please sign in to comment.