Skip to content

Commit

Permalink
Introduce connecting and closed metadata (#409)
Browse files Browse the repository at this point in the history
* Introduce connecting and closed metadata

* Fixed write fn

* Removed redundant finish call

* Set connected=true only in the completion stage

* Set connecting=false when preConnect fails

* We allow queue post connected messages

* connected=true not ready in connack(rc=0)

* Added connecting metadata test

* Added closed metdata tests

* Update README.md

* Update README.md
  • Loading branch information
gnought committed Feb 8, 2020
1 parent 03de415 commit c291ea4
Show file tree
Hide file tree
Showing 8 changed files with 109 additions and 98 deletions.
116 changes: 53 additions & 63 deletions README.md
@@ -1,3 +1,4 @@
<!-- markdownlint-disable MD024 -->
# Aedes
![](https://github.com/moscajs/aedes/workflows/ci/badge.svg)
[![Dependencies Status](https://david-dm.org/moscajs/aedes/status.svg)](https://david-dm.org/moscajs/aedes)
Expand Down Expand Up @@ -31,8 +32,8 @@ Barebone MQTT server that can run on any stream server.
## Install
To install aedes, simply use npm:

```
npm install aedes --save
```sh
npm install aedes
```

<a name="example"></a>
Expand Down Expand Up @@ -106,10 +107,6 @@ httpServer.listen(wsPort, function () {
* <a href="#published"><code>instance.<b>published()</b></code></a>
* <a href="#close"><code>instance.<b>close()</b></code></a>
* <a href="#client"><code><b>Client</b></code></a>
* <a href="#clientid"><code>client.<b>id</b></code></a>
* <a href="#clientclean"><code>client.<b>clean</b></code></a>
* <a href="#clientconn"><code>client.<b>conn</b></code></a>
* <a href="#clientreq"><code>client.<b>req</b></code></a>
* <a href="#clientpublish"><code>client.<b>publish()</b></code></a>
* <a href="#clientsubscribe"><code>client.<b>subscribe()</b></code></a>
* <a href="#clientunsubscribe"><code>client.<b>unsubscribe()</b></code></a>
Expand All @@ -121,7 +118,7 @@ httpServer.listen(wsPort, function () {

Creates a new instance of Aedes.

Options:
#### Options

* `mq`: an instance of [MQEmitter](http://npm.im/mqemitter), check [plugins](#plugins) for more mqemitters options. Used to share messages between multiple brokers instances (ex: clusters)
* `persistence`: an instance of [AedesPersistence](http://npm.im/aedes-persistence), check [plugins](#plugins) for more persistence options. It's used to store *QoS > 1*, *retained*, *will* packets and subscriptions in memory or on disk (if not specified default persistence is in memory)
Expand Down Expand Up @@ -151,7 +148,11 @@ Options:
* `published`: function called when a new packet is published, see
[instance.published()](#published)

Events:
#### Properties

* `closed`: read-only, shows the aedes closed state

#### Events

* `client`: when a new [Client](#client) successfully connects and register itself to server, [connackSent event will be come after], arguments:
1. `client`
Expand Down Expand Up @@ -248,7 +249,8 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.
### instance.unsubscribe(topic, func(packet, cb), done)

The reverse of [subscribe](#subscribe).
------------------------------------------------------

-------------------------------------------------------
<a name="decodeProtocol"></a>
### instance.decodeProtocol(client, buffer)

Expand All @@ -260,12 +262,14 @@ instance.decodeProtocol = function(client, buffer) {
return protocol
}
```

-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))

It will be called when aedes instance receives a first valid CONNECT packet from client. client object state is in default and its connected state is false. Any values in CONNECT packet (like clientId, clean flag, keepalive) will pass to client object after this call. Override to supply custom preConnect logic.
Some use cases:

1. Rate Limit / Throttle by `client.conn.remoteAddress`
2. Check `instance.connectedClient` to limit maximum connections
3. IP blacklisting
Expand All @@ -275,11 +279,13 @@ instance.preConnect = function(client, callback) {
callback(null, client.conn.remoteAddress === '::1') {
}
```
```js
instance.preConnect = function(client, callback) {
callback(new Error('connection error'), client.conn.remoteAddress !== '::1') {
}
```
-------------------------------------------------------
<a name="authenticate"></a>
### instance.authenticate(client, username, password, done(err, successful))
Expand All @@ -292,6 +298,7 @@ instance.authenticate = function (client, username, password, callback) {
callback(null, username === 'matteo')
}
```
Other return codes can passed as follows :-
```js
Expand All @@ -301,12 +308,13 @@ instance.authenticate = function (client, username, password, callback) {
callback(error, null)
}
```
The return code values and their responses which can be passed are given below:
* `1` - Unacceptable protocol version
* `2` - Identifier rejected
* `3` - Server unavailable
* `4` - Bad user name or password
* `1` - Unacceptable protocol version
* `2` - Identifier rejected
* `3` - Server unavailable
* `4` - Bad user name or password
-------------------------------------------------------
<a name="authorizePublish"></a>
Expand Down Expand Up @@ -409,40 +417,22 @@ Events:
Classes for all connected clients.
Events:
* `error`, in case something bad happended
-------------------------------------------------------
<a name="clientid"></a>
### client#id
The id of the client, as specified by the CONNECT packet, defaults to 'aedes_' + shortid()
-------------------------------------------------------
<a name="clientclean"></a>
### client#clean
`true` if the client connected (CONNECT) with `clean: true`, `false`
otherwise. Check the MQTT spec for what this means.
-------------------------------------------------------
<a name="clientconn"></a>
### client#conn
#### Properties
The client's connection stream object.
* `connecting` read-only, it is true when Client is in CONNECT phase. Aedes emits `connackSent` event will not reset `connecting` to `false` until it received all its offline messagess to the Client
* `connected`: read-only, it is true when `connected` event is emitted, and false when Client is closed
* `closed`: read-only, shows Client closed state
* `id`: Client id, specified by CONNECT packet, defaults to `'aedes_' + shortid()`. It is `null` in `preConnect` hooks unless it returns good
* `clean`: Client clean flag, specified by CONNECT packet.
* `conn`: Client connection stream object.
* In the case of `net.createServer`, `conn` passed to the `connectionlistener` function by node's [net.createServer](https://nodejs.org/api/net.html#net_net_createserver_options_connectionlistener) API
* In the case of [`websocket-stream`](https://www.npmjs.com/package/websocket-stream), it's the `stream` argument passed to the websocket `handle` function in [`websocket-stream` documentation](https://github.com/maxogden/websocket-stream/blob/e2a51644bb35132d7aa477ae1a27ff083fedbf08/readme.md#on-the-server)
* `req`: only for [`websocket-stream`](https://www.npmjs.com/package/websocket-stream). It is a HTTP Websocket upgrade request object passed to websocket `handle` function in [`websocket-stream` documentation](https://github.com/maxogden/websocket-stream/blob/e2a51644bb35132d7aa477ae1a27ff083fedbf08/readme.md#on-the-server). It gives an option for accessing headers or cookies
In the case of `net.createServer` brokers, it's the connection passed to the `connectionlistener` function by node's [net.createServer](https://nodejs.org/api/net.html#net_net_createserver_options_connectionlistener) API.
#### Events
In the case of [websocket-stream](https://www.npmjs.com/package/websocket-stream) brokers, it's the `stream` argument passed to the `handle` function described in [that library's documentation](https://github.com/maxogden/websocket-stream/blob/e2a51644bb35132d7aa477ae1a27ff083fedbf08/readme.md#on-the-server).
-------------------------------------------------------
<a name="clientreq"></a>
### client#req
The HTTP Websocket upgrade request object passed to websocket broker's `handle` function by the [`websocket-stream` library](https://github.com/maxogden/websocket-stream/blob/e2a51644bb35132d7aa477ae1a27ff083fedbf08/readme.md#on-the-server).
If your clients are connecting to aedes via websocket and you need access to headers or cookies, you can get them here. **NOTE:** this property is only present for websocket connections.
* `connected`, this is the same as aedes `clientReady` but in client-wise
* `error`, in case something bad happended
-------------------------------------------------------
<a name="clientpublish"></a>
Expand Down Expand Up @@ -496,27 +486,27 @@ Disconnects the client
You can subscribe on the following `$SYS` topics to get client presence:
- `$SYS/+/new/clients` - will inform about new clients connections
- `$SYS/+/disconnect/clients` - will inform about client disconnections.
* `$SYS/+/new/clients` - will inform about new clients connections
* `$SYS/+/disconnect/clients` - will inform about client disconnections.
The payload will contain the `clientId` of the connected/disconnected client
## Plugins
- [aedes-persistence](https://github.com/moscajs/aedes-persistence): In-memory implementation of an Aedes persistence
- [aedes-persistence-mongodb](https://github.com/moscajs/aedes-persistence-mongodb): MongoDB persistence for Aedes
- [aedes-persistence-redis](https://github.com/moscajs/aedes-persistence-redis): Redis persistence for Aedes
- [aedes-persistence-level](https://github.com/moscajs/aedes-persistence-level): LevelDB persistence for Aedes
- [aedes-persistence-nedb](https://github.com/ovhemert/aedes-persistence-nedb#readme): NeDB persistence for Aedes
- [mqemitter](https://github.com/mcollina/mqemitter): An Opinionated Message Queue with an emitter-style API
- [mqemitter-redis](https://github.com/mcollina/mqemitter-redis): Redis-powered mqemitter
- [mqemitter-mongodb](https://github.com/mcollina/mqemitter-mongodb): Mongodb based mqemitter
- [mqemitter-child-process](https://github.com/mcollina/mqemitter-child-process): Share the same mqemitter between a hierarchy of child processes
- [mqemitter-cs](https://github.com/mcollina/mqemitter-cs): Expose a MQEmitter via a simple client/server protocol
- [mqemitter-p2p](https://github.com/mcollina/mqemitter-p2p): A P2P implementation of MQEmitter, based on HyperEmitter and a Merkle DAG
- [mqemitter-aerospike](https://github.com/GavinDmello/mqemitter-aerospike): Aerospike mqemitter based on @mcollina 's mqemitter
- [aedes-logging](https://github.com/moscajs/aedes-logging): Logging module for Aedes, based on Pino
- [aedes-stats](https://github.com/moscajs/aedes-stats): Stats for Aedes
- [aedes-protocol-decoder](https://github.com/moscajs/aedes-protocol-decoder): Protocol decoder for Aedes MQTT Broker
* [aedes-persistence](https://github.com/moscajs/aedes-persistence): In-memory implementation of an Aedes persistence
* [aedes-persistence-mongodb](https://github.com/moscajs/aedes-persistence-mongodb): MongoDB persistence for Aedes
* [aedes-persistence-redis](https://github.com/moscajs/aedes-persistence-redis): Redis persistence for Aedes
* [aedes-persistence-level](https://github.com/moscajs/aedes-persistence-level): LevelDB persistence for Aedes
* [aedes-persistence-nedb](https://github.com/ovhemert/aedes-persistence-nedb#readme): NeDB persistence for Aedes
* [mqemitter](https://github.com/mcollina/mqemitter): An Opinionated Message Queue with an emitter-style API
* [mqemitter-redis](https://github.com/mcollina/mqemitter-redis): Redis-powered mqemitter
* [mqemitter-mongodb](https://github.com/mcollina/mqemitter-mongodb): Mongodb based mqemitter
* [mqemitter-child-process](https://github.com/mcollina/mqemitter-child-process): Share the same mqemitter between a hierarchy of child processes
* [mqemitter-cs](https://github.com/mcollina/mqemitter-cs): Expose a MQEmitter via a simple client/server protocol
* [mqemitter-p2p](https://github.com/mcollina/mqemitter-p2p): A P2P implementation of MQEmitter, based on HyperEmitter and a Merkle DAG
* [mqemitter-aerospike](https://github.com/GavinDmello/mqemitter-aerospike): Aerospike mqemitter based on @mcollina 's mqemitter
* [aedes-logging](https://github.com/moscajs/aedes-logging): Logging module for Aedes, based on Pino
* [aedes-stats](https://github.com/moscajs/aedes-stats): Stats for Aedes
* [aedes-protocol-decoder](https://github.com/moscajs/aedes-protocol-decoder): Protocol decoder for Aedes MQTT Broker
## Collaborators
Expand All @@ -543,7 +533,7 @@ Example benchmark test with 1000 clients sending 5000 QoS 1 messsages. Used [mqt

### Aedes

```
```sh
========= TOTAL (1000) =========
Total Ratio: 1.000 (5000000/5000000)
Total Runtime (sec): 178.495
Expand All @@ -558,7 +548,7 @@ Total Bandwidth (msg/sec): 28114.678

### Mosca

```
```sh
========= TOTAL (1000) =========
Total Ratio: 1.000 (5000000/5000000)
Total Runtime (sec): 264.934
Expand Down
32 changes: 16 additions & 16 deletions lib/client.js
Expand Up @@ -18,6 +18,8 @@ function Client (broker, conn, req) {
const that = this

// metadata
this.closed = false
this.connecting = false
this.connected = false
this.connackSent = false
this.errored = false
Expand Down Expand Up @@ -232,17 +234,18 @@ Client.prototype.unsubscribe = function (packet, done) {
}

Client.prototype.close = function (done) {
const that = this

if (this._eos === nop) {
if (this.closed) {
if (typeof done === 'function') {
done()
}
return
}

const that = this
const conn = this.conn

this.closed = true

this._parser.removeAllListeners('packet')
conn.removeAllListeners('readable')

Expand All @@ -262,17 +265,13 @@ Client.prototype.close = function (done) {
this._eos()
this._eos = nop

if (this.connected) {
handleUnsubscribe(
this,
{
close: true,
unsubscriptions: Object.keys(this.subscriptions)
},
finish)
} else {
finish()
}
handleUnsubscribe(
this,
{
close: true,
unsubscriptions: Object.keys(this.subscriptions)
},
finish)

function finish () {
if (!that._disconnected && that.will) {
Expand All @@ -297,11 +296,12 @@ Client.prototype.close = function (done) {
that.will = null // this function might be called twice
that._will = null

that.connected = false
that.connecting = false

conn.removeAllListeners('error')
conn.on('error', nop)

that.connected = false

if (that.broker.clients[that.id] && that._authorized) {
that.broker.unregisterClient(that)
}
Expand Down
8 changes: 5 additions & 3 deletions lib/handlers/connect.js
Expand Up @@ -42,14 +42,15 @@ const errorMessages = [
function handleConnect (client, packet, done) {
clearTimeout(client._connectTimer)
client._connectTimer = null

client.connecting = true
client.broker.preConnect(client, negate)

function negate (err, successful) {
if (!err && successful === true) {
setImmediate(init, client, packet, done)
return
}
client.connecting = false
if (err) {
client.broker.emit('connectionError', client, err)
}
Expand All @@ -58,7 +59,6 @@ function handleConnect (client, packet, done) {
}

function init (client, packet, done) {
client.connected = true
const clientId = packet.clientId
var returnCode = 0
// [MQTT-3.1.2-2]
Expand Down Expand Up @@ -88,7 +88,9 @@ function init (client, packet, done) {
connectActions,
{ returnCode: 0, sessionPresent: false }, // [MQTT-3.1.4-4], [MQTT-3.2.2-4]
function (err) {
this.client.connecting = false
if (!err) {
this.client.connected = true
this.client.broker.emit('clientReady', client)
this.client.emit('connected')
}
Expand All @@ -106,7 +108,7 @@ function authenticate (arg, done) {
negate)

function negate (err, successful) {
if (!client.connected || client.broker.closed) {
if (client.closed || client.broker.closed) {
// a hack, sometimes client.close() or broker.close() happened
// before authenticate() comes back
// we stop here for not to register it and deregister it in write()
Expand Down
5 changes: 3 additions & 2 deletions lib/handlers/index.js
Expand Up @@ -11,15 +11,16 @@ const handlePing = require('./ping')

function handle (client, packet, done) {
if (packet.cmd === 'connect') {
if (client.connected || !client._connectTimer) {
if (client.connecting || client.connected) {
// [MQTT-3.1.0-2]
finish(client.conn, packet, done)
return
}
handleConnect(client, packet, done)
return
}
if (!client.connected) {

if (!client.connecting && !client.connected) {
// [MQTT-3.1.0-1]
finish(client.conn, packet, done)
return
Expand Down
2 changes: 1 addition & 1 deletion lib/write.js
Expand Up @@ -3,7 +3,7 @@
const mqtt = require('mqtt-packet')

function write (client, packet, done) {
if (client.conn.writable && client.connected) {
if (client.conn.writable && (client.connecting || client.connected)) {
const result = mqtt.writeToStream(packet, client.conn)
if (!result && !client.errored && done) {
client.conn.once('drain', done)
Expand Down

0 comments on commit c291ea4

Please sign in to comment.