Skip to content

Commit

Permalink
Merge 0f159f7 into 992cce1
Browse files Browse the repository at this point in the history
  • Loading branch information
gnought committed Aug 17, 2019
2 parents 992cce1 + 0f159f7 commit 033742c
Show file tree
Hide file tree
Showing 14 changed files with 671 additions and 321 deletions.
32 changes: 29 additions & 3 deletions README.md
Expand Up @@ -69,6 +69,7 @@ server.listen(8883, function () {
* <a href="#subscribe"><code>instance.<b>subscribe()</b></code></a>
* <a href="#publish"><code>instance.<b>publish()</b></code></a>
* <a href="#unsubscribe"><code>instance.<b>unsubscribe()</b></code></a>
* <a href="#preConnect"><code>instance.<b>preConnect()</b></code></a>
* <a href="#authenticate"><code>instance.<b>authenticate()</b></code></a>
* <a href="#authorizePublish"><code>instance.<b>authorizePublish()</b></code></a>
* <a href="#authorizeSubscribe"><code>instance.<b>authorizeSubscribe()</b></code></a>
Expand Down Expand Up @@ -107,6 +108,8 @@ Options:
packet to arrive, defaults to `30000` milliseconds
* `id`: id used to identify this broker instance in `$SYS` messages,
defaults to `shortid()`
* `preConnect`: function called when a valid CONNECT is received, see
[instance.preConnect()](#preConnect)
* `authenticate`: function used to authenticate clients, see
[instance.authenticate()](#authenticate)
* `authorizePublish`: function used to authorize PUBLISH packets, see
Expand All @@ -120,7 +123,9 @@ Options:

Events:

* `client`: when a new [Client](#client) connects, arguments:
* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments:
1. `client`
* `clientReady`: when a new [Client](#client) received all its offline messages, it is ready, arguments:
1. `client`
* `clientDisconnect`: when a [Client](#client) disconnects, arguments:
1. `client`
Expand Down Expand Up @@ -152,8 +157,9 @@ packet.
[UNSUBSCRIBE](https://github.com/mqttjs/mqtt-packet#unsubscribe)
packet.
2. `client`
* `connackSent`: when a CONNACK packet is sent to a client [Client](#client) (happens after `'client'`), arguments:
1. `client`
* `connackSent`: when a CONNACK packet is sent to a client, arguments:
1. `packet`
2. `client`
* `closed`: when the broker is closed

-------------------------------------------------------
Expand Down Expand Up @@ -213,6 +219,26 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.

The reverse of [subscribe](#subscribe).

-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))

It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic.
Some use cases:
1. Rate Limit / Throttle by `client.conn.remoteAddress`
2. Check `instance.connectedClient` to limit maximum connections
3. IP blacklisting

```js
instance.preConnect = function(client, callback) {
callback(null, client.conn.remoteAddress === '::1') {
}
```
```js
instance.preConnect = function(client, callback) {
callback(new Error('connection error'), client.conn.remoteAddress !== '::1') {
}
```
-------------------------------------------------------
<a name="authenticate"></a>
### instance.authenticate(client, username, password, done(err, successful))
Expand Down
5 changes: 5 additions & 0 deletions aedes.js
Expand Up @@ -19,6 +19,7 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
Expand Down Expand Up @@ -50,6 +51,7 @@ function Aedes (opts) {
this._series = series()
this._enqueuers = reusify(DoEnqueues)

this.preConnect = opts.preConnect
this.authenticate = opts.authenticate
this.authorizePublish = opts.authorizePublish
this.authorizeSubscribe = opts.authorizeSubscribe
Expand Down Expand Up @@ -299,6 +301,9 @@ Aedes.prototype.close = function (cb = noop) {

Aedes.prototype.version = require('./package.json').version

function defaultPreConnect (client, callback) {
callback(null, true)
}
function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
3 changes: 2 additions & 1 deletion lib/client.js
Expand Up @@ -48,7 +48,8 @@ function Client (broker, conn) {

function nextBatch (err) {
if (err) {
return that.emit('error', err)
that.emit('error', err)
return
}

var buf = empty
Expand Down
169 changes: 104 additions & 65 deletions lib/handlers/connect.js
Expand Up @@ -6,7 +6,13 @@ var write = require('../write')
var QoSPacket = require('../qos-packet')
var through = require('through2')
var handleSubscribe = require('./subscribe')
var uuid = require('uuid')
var shortid = require('shortid')

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = arg.returnCode
this.sessionPresent = arg.sessionPresent
}

function ClientPacketStatus (client, packet) {
this.client = client
Expand All @@ -15,6 +21,7 @@ function ClientPacketStatus (client, packet) {

var connectActions = [
authenticate,
setKeepAlive,
fetchSubs,
restoreSubs,
storeWill,
Expand All @@ -33,33 +40,57 @@ var errorMessages = [
]

function handleConnect (client, packet, done) {
client.connected = true
client.clean = packet.clean

if (!packet.clientId && packet.protocolVersion === 3) {
client.emit('error', new Error('Empty clientIds are supported only on MQTT 3.1.1'))
return done()
}

client.id = packet.clientId || uuid.v4()
client._will = packet.will

clearTimeout(client._connectTimer)
client._connectTimer = null

if (packet.keepalive > 0) {
client._keepaliveInterval = (packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
client.broker.preConnect(client, negate)

function negate (err, successful) {
if (!err && successful === true) {
setImmediate(init, client, packet, done)
return
}
if (err) {
client.broker.emit('connectionError', client, err)
}
client.conn.destroy()
}
}

function init (client, packet, done) {
client.connected = true
var clientId = packet.clientId
var returnCode = 0
// [MQTT-3.1.2-2]
if (packet.protocolVersion < 3 || packet.protocolVersion > 4) {
returnCode = 1
}
// MQTT 3.1.0 allows <= 23 client id length
if (packet.protocolVersion === 3 && clientId.length > 23) {
returnCode = 2
}
if (returnCode > 0) {
var error = new Error(errorMessages[returnCode])
error.errorCode = returnCode
client.broker.emit('clientError', client, error)
doConnack(
{ client: client, returnCode: returnCode, sessionPresent: false },
done.bind(this, error))
client.conn.end()
return
}

client.id = clientId || 'aedes_' + shortid()
client.clean = packet.clean
client._will = packet.will

client.broker._series(
new ClientPacketStatus(client, packet),
connectActions, {}, function (err) {
connectActions,
{ returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4]
function (err) {
this.client.broker.emit('clientReady', client)
this.client.emit('connected')
client.connackSent = true
done(err)
})
}
Expand All @@ -75,53 +106,62 @@ function authenticate (arg, done) {

function negate (err, successful) {
if (!client.connected) {
// a hack, sometimes close happends before authenticate comes back
// a hack, sometimes close() happened before authenticate() comes back
// we stop here for not to register it and deregister it in write()
return
}
var errCode
if (!err && successful) {
return done()
} else if (err) {
if (err.returnCode && (err.returnCode >= 1 && err.returnCode <= 3)) {
errCode = err.returnCode
write(client, {
cmd: 'connack',
returnCode: err.returnCode
}, client.close.bind(client, done.bind(this, err)))
}

if (err) {
var errCode = err.returnCode
if (errCode && (errCode >= 1 && errCode <= 3)) {
arg.returnCode = errCode
} else {
// If errorCode is 4 or not a number
errCode = 4
write(client, {
cmd: 'connack',
returnCode: 4
}, client.close.bind(client, done.bind(this, err)))
arg.returnCode = 4
}
} else {
errCode = 5
write(client, {
cmd: 'connack',
returnCode: 5
}, client.close.bind(client, done.bind(this, new Error(errorMessages[errCode]))))
arg.returnCode = 5
}
var error = new Error(errorMessages[errCode])
error.errorCode = errCode
var error = new Error(errorMessages[arg.returnCode])
error.errorCode = arg.returnCode
client.broker.emit('clientError', client, error)
arg.client = client
doConnack(arg,
// [MQTT-3.2.2-5]
client.close.bind(client, done.bind(this, error)))
}
}

function setKeepAlive (arg, done) {
if (this.packet.keepalive > 0) {
var client = this.client
// [MQTT-3.1.2-24]
client._keepaliveInterval = (this.packet.keepalive * 1500) + 1
client._keepaliveTimer = retimer(function keepaliveTimeout () {
client.broker.emit('keepaliveTimeout', client)
client.emit('error', new Error('keep alive timeout'))
}, client._keepaliveInterval)
}
done()
}

function fetchSubs (arg, done) {
var client = this.client
if (!this.packet.clean) {
this.client.broker.persistence.subscriptionsByClient({
id: this.client.id,
client.broker.persistence.subscriptionsByClient({
id: client.id,
done: done,
arg: arg
}, gotSubs)
} else {
this.client.broker.persistence.cleanSubscriptions(
this.client,
done)
return
}
arg.sessionPresent = false // [MQTT-3.2.2-1]
client.broker.persistence.cleanSubscriptions(
client,
done)
}

function gotSubs (err, subs, client) {
Expand All @@ -135,21 +175,24 @@ function gotSubs (err, subs, client) {
function restoreSubs (arg, done) {
if (arg.subs) {
handleSubscribe(this.client, { subscriptions: arg.subs, restore: true }, done)
} else {
done()
arg.sessionPresent = !!arg.subs // cast to boolean, [MQTT-3.2.2-2]
return
}
arg.sessionPresent = false // [MQTT-3.2.2-1], [MQTT-3.2.2-3]
done()
}

function storeWill (arg, done) {
this.client.will = this.client._will
if (this.client.will) {
this.client.broker.persistence.putWill(
this.client,
this.client.will,
var client = this.client
client.will = client._will
if (client.will) {
client.broker.persistence.putWill(
client,
client.will,
done)
} else {
done()
return
}
done()
}

function registerClient (arg, done) {
Expand All @@ -158,17 +201,12 @@ function registerClient (arg, done) {
done()
}

function Connack (arg) {
this.cmd = 'connack'
this.returnCode = 0
this.sessionPresent = !!arg.subs // cast to boolean
}

function doConnack (arg, done) {
var client = this.client
var client = arg.client || this.client
const connack = new Connack(arg)
write(client, connack, function () {
client.broker.emit('connackSent', client)
client.broker.emit('connackSent', connack, client)
client.connackSent = true
done()
})
}
Expand All @@ -189,7 +227,6 @@ function emptyQueue (arg, done) {

function emptyQueueFilter (err, client, packet) {
var next = packet.writeCallback
var persistence = client.broker.persistence

if (err) {
client.emit('error', err)
Expand All @@ -202,6 +239,8 @@ function emptyQueueFilter (err, client, packet) {
authorized = client.broker.authorizeForward(client, packet)
}

var persistence = client.broker.persistence

if (client.clean || !authorized) {
persistence.outgoingClearMessageId(client, packet, next)
} else {
Expand Down
12 changes: 9 additions & 3 deletions lib/handlers/index.js
Expand Up @@ -11,12 +11,18 @@ var handlePing = require('./ping')

function handle (client, packet, done) {
if (packet.cmd === 'connect') {
// [MQTT-3.1.0-2]
return client.connected ? client.conn.destroy() : handleConnect(client, packet, done)
if (client.connected) {
// [MQTT-3.1.0-2]
client.conn.destroy()
return done(new Error('invalid protocol'))
}
handleConnect(client, packet, done)
return
}
if (!client.connected) {
// [MQTT-3.1.0-1]
return client.conn.destroy()
client.conn.destroy()
return done(new Error('invalid protocol'))
}

switch (packet.cmd) {
Expand Down
3 changes: 2 additions & 1 deletion lib/handlers/pubrec.js
Expand Up @@ -21,7 +21,8 @@ function handlePubrec (client, packet, done) {
function reply (err) {
if (err) {
// TODO is this ok?
return client._onError(err)
client._onError(err)
return
}

write(client, pubrel, done)
Expand Down

0 comments on commit 033742c

Please sign in to comment.