Skip to content

Commit

Permalink
Enhance & Optimize connect handler
Browse files Browse the repository at this point in the history
- For 06fc839, raise an "invalid protocol" callback error if there is

- aedes unsupported mqtt version: CONNACK return Code = 1
- client id length > 23 [MQTT 3.1.0 only]: CONNACK return Code = 2

- Added preConnect handler in handleConnect between earliest connect checks and normal checks. This is useful for users if they want to do some earilest DDoS check before server send any responses back, in this phrase connected=false

- Emit `connackSent` event and set connactSent=true if CONNACK is sent, not only after `client` event but also when normal connect checks phrase if necessary

- Added packet arguments in `connackSent` event

- set clientID to 'aedes_' + shortid() if empty [MQTT 3.1.1], it is better to keep it within 23 chars for better compatibility

- Emit `clientReady` event after we send back all offline messages to client

- Optimize negate function

- Optimize doConnack function and we could re-use it

- Set keepalive after authentication, save some resoures if there are plenty of failed authentication
  • Loading branch information
gnought committed Jul 29, 2019
1 parent 820ed91 commit dc650dc
Show file tree
Hide file tree
Showing 10 changed files with 466 additions and 159 deletions.
9 changes: 6 additions & 3 deletions README.md
Expand Up @@ -120,7 +120,9 @@ Options:

Events:

* `client`: when a new [Client](#client) connects, arguments:
* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments:
1. `client`
* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments:
1. `client`
* `clientDisconnect`: when a [Client](#client) disconnects, arguments:
1. `client`
Expand Down Expand Up @@ -152,8 +154,9 @@ packet.
[UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe)
packet.
2. `client`
* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments:
1. `client`
* `connackSent`: when a CONNACK packet is sent to a client, arguments:
1. `packet`
2. `client`
* `closed`: when the broker is closed

-------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions aedes.js
Expand Up @@ -19,6 +19,7 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
Expand Down Expand Up @@ -50,6 +51,7 @@ function Aedes (opts) {
this._series = series()
this._enqueuers = reusify(DoEnqueues)

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
this.authorizePublish = opts.authorizePublish
this.authorizeSubscribe = opts.authorizeSubscribe
Expand Down Expand Up @@ -295,6 +297,9 @@ Aedes.prototype.close = function (cb = noop) {
}
}

function defaultPreConnect (client) {
return true
}
function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
94 changes: 56 additions & 38 deletions lib/handlers/connect.js
Expand Up @@ -6,7 +6,13 @@ var write = require('../write')
var QoSPacket = require('../qos-packet')
var through = require('through2')
var handleSubscribe = require('./subscribe')
var uuid = require('uuid')
var shortid = require('shortid')

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = arg.returnCode
this.sessionPresent = arg.sessionPresent
}

function ClientPacketStatus (client, packet) {
this.client = client
Expand Down Expand Up @@ -34,25 +40,45 @@ var errorMessages = [
]

function handleConnect (client, packet, done) {
if (client.broker.preConnect(client) === false) {
return client.conn.destroy()
}
client.connected = true
client.clean = packet.clean

if (!packet.clientId && packet.protocolVersion === 3) {
client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1'))
return done()
var clientId = packet.clientId
var returnCode = 0
// [MQTT-3.1.2-2]
if (packet.protocolVersion < 3 || packet.protocolVersion > 4) {
returnCode = 1
}
// MQTT 3.1.0 allows <= 23 client id length
if (packet.protocolVersion === 3 && clientId.length > 23) {
returnCode = 2
}
// console.log(returnCode)
if (returnCode > 0) {
var error = new Error(errorMessages[returnCode])
error.errorCode = returnCode
client.broker.emit('clientError', client, error)
doConnack(
{ client: client, returnCode: returnCode, sessionPresent: false },
done.bind(this, error))
return client.conn.end()
}

client.id = packet.clientId || uuid.v4()
client.id = clientId || 'aedes_' + shortid()
client.clean = packet.clean
client._will = packet.will

clearTimeout(client._connectTimer)
client._connectTimer = null

client.broker._series(
new ClientPacketStatus(client, packet),
connectActions, {}, function (err) {
connectActions,
{ returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4]
function (err) {
this.client.broker.emit('clientReady', client)
this.client.emit('connected')
client.connackSent = true
done(err)
})
}
Expand All @@ -68,38 +94,32 @@ function authenticate (arg, done) {

function negate (err, successful) {
if (!client.connected) {
// a hack, sometimes close happends before authenticate comes back
// a hack, sometimes close() happened before authenticate() comes back
// we stop here for not to register it and deregister it in write()
return
}
var errCode
if (!err && successful) {
return done()
} else if (err) {
if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) {
errCode = err.returnCode
write(client, {
cmd: 'connack',
returnCode: err.returnCode
}, client.close.bind(client, done.bind(this, err)))
}

if (err) {
var errCode = err.returnCode
if (errCode && (errCode >= 1 && errCode <= 3)) {
arg.returnCode = errCode
} else {
// If errorCode is 4 or not a number
errCode = 4
write(client, {
cmd: 'connack',
returnCode: 4
}, client.close.bind(client, done.bind(this, err)))
arg.returnCode = 4
}
} else {
errCode = 5
write(client, {
cmd: 'connack',
returnCode: 5
}, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode]))))
arg.returnCode = 5
}
var error = new Error(errorMessages[errCode])
error.errorCode = errCode
var error = new Error(errorMessages[arg.returnCode])
error.errorCode = arg.returnCode
client.broker.emit('clientError', client, error)
arg.client = client
doConnack(arg,
// [MQTT-3.2.2-5]
client.close.bind(client, done.bind(this, error)))
}
}

Expand All @@ -124,6 +144,7 @@ function fetchSubs (arg, done) {
arg: arg
}, gotSubs)
} else {
arg.sessionPresent = false // [MQTT-3.2.2-1]
this.client.broker.persistence.cleanSubscriptions(
this.client,
done)
Expand All @@ -141,7 +162,9 @@ function gotSubs (err, subs, client) {
function restoreSubs (arg, done) {
if (arg.subs) {
handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done)
arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2]
} else {
arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3]
done()
}
}
Expand All @@ -164,17 +187,12 @@ function registerClient (arg, done) {
done()
}

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = 0
this.sessionPresent = !!arg.subs // cast to boolean
}

function doConnack (arg, done) {
var client = this.client
var client = arg.client || this.client
const connack = new Connack(arg)
write(client, connack, function () {
client.broker.emit('connackSent', client)
client.broker.emit('connackSent', connack, client)
client.connackSent = true
done()
})
}
Expand Down
11 changes: 8 additions & 3 deletions lib/handlers/index.js
Expand Up @@ -11,12 +11,17 @@ var handlePing = require('./ping')

function handle (client, packet, done) {
if (packet.cmd === 'connect') {
// [MQTT-3.1.0-2]
return client.connected ? client.conn.destroy() : handleConnect(client, packet, done)
if (client.connected) {
// [MQTT-3.1.0-2]
client.conn.destroy()
return done(new Error('invalid protocol'))
}
return handleConnect(client, packet, done)
}
if (!client.connected) {
// [MQTT-3.1.0-1]
return client.conn.destroy()
client.conn.destroy()
return done(new Error('invalid protocol'))
}

switch (packet.cmd) {
Expand Down
77 changes: 0 additions & 77 deletions test/basic.js
Expand Up @@ -9,83 +9,6 @@ var connect = helper.connect
var noError = helper.noError
var subscribe = helper.subscribe

test('connect and connack (minimal)', function (t) {
t.plan(1)

var s = setup()

s.inStream.write({
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: 'my-client',
keepalive: 0
})

s.outStream.on('data', function (packet) {
t.deepEqual(packet, {
cmd: 'connack',
returnCode: 0,
length: 2,
qos: 0,
retain: false,
dup: false,
topic: null,
payload: null,
sessionPresent: false
}, 'successful connack')
t.end()
})
})

test('the first Packet sent from the Client to the Server MUST be a CONNECT Packet [MQTT-3.1.0-1]', function (t) {
t.plan(1)

var broker = aedes()
var s = setup(broker, false)

var packet = {
cmd: 'publish',
topic: 'hello',
payload: Buffer.from('world'),
qos: 0,
retain: false
}
s.inStream.write(packet)
setImmediate(() => {
t.ok(s.conn.destroyed, 'close connection if first packet is not a CONNECT')
s.conn.destroy()
broker.close()
t.end()
})
})

test('second CONNECT Packet sent from a Client as a protocol violation and disconnect the Client [MQTT-3.1.0-2]', function (t) {
t.plan(3)

var broker = aedes()
var packet = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: 'my-client',
keepalive: 0
}
var s = connect(setup(broker, false), { clientId: 'abcde' }, function () {
t.ok(broker.clients['abcde'].connected)
s.inStream.write(packet)
setImmediate(() => {
t.equal(broker.clients['abcde'], undefined, 'client instance is removed')
t.ok(s.conn.destroyed, 'close connection if packet is a CONNECT after network is established')
s.conn.destroy()
broker.close()
t.end()
})
})
})

test('publish QoS 0', function (t) {
t.plan(2)

Expand Down

0 comments on commit dc650dc

Please sign in to comment.