Skip to content

Commit

Permalink
Proxy and ip decoder (#334)
Browse files Browse the repository at this point in the history
* add trustProxy config in aedes ; protocol decoding ( proxy parser ) and ip setting at nextBatch

* lint and add proxy-protocol in dependency ( test °333 failed )

* adding proxy protocol example, and tests for protocol decoder

* add example and test for proxy protocol v2 ( count connected clients still errored, why ?)

* implement protocol-decoder in an external module, called with broker.decodeProtocol (by default) if broker.trustProxy & before client.connackSent; update tests and doc

* check buffer signature before parsing

* add tests and example for protocol decoder ( using IPv6 ); try a fix for failing tests

* remove extra comments ; extract client port property in protocol decoder
  • Loading branch information
getlarge authored and mcollina committed Dec 16, 2019
1 parent bccfca9 commit 1eada99
Show file tree
Hide file tree
Showing 11 changed files with 729 additions and 17 deletions.
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,8 @@ Options:
packet to arrive, defaults to `30000` milliseconds
* `id`: id used to identify this broker instance in `$SYS` messages,
defaults to `uuidv4()`
* `decodeProtocol`: function called when a valid buffer is received, see
[instance.decodeProtocol()](#decodeProtocol)
* `preConnect`: function called when a valid CONNECT is received, see
[instance.preConnect()](#preConnect)
* `authenticate`: function used to authenticate clients, see
Expand Down Expand Up @@ -220,7 +222,19 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings.
### instance.unsubscribe(topic, func(packet, cb), done)

The reverse of [subscribe](#subscribe).
------------------------------------------------------
<a name="decodeProtocol"></a>
### instance.decodeProtocol(client, buffer)

It will be called when aedes instance trustProxy is true and that it receives a first valid buffer from client. client object state is in default and its connected state is false. A default function parse https headers (x-real-ip | x-forwarded-for) and proxy protocol v1 and v2 to retrieve information in client.connDetails. Override to supply custom protocolDecoder logic, if it returns an object with data property, this property will be parsed as an mqtt-packet.


```js
instance.decodeProtocol = function(client, buffer) {
var protocol = yourDecoder(client, buffer)
return protocol
}
```
-------------------------------------------------------
<a name="preConnect"></a>
### instance.preConnect(client, done(err, successful))
Expand Down
16 changes: 15 additions & 1 deletion aedes.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ var Packet = require('aedes-packet')
var bulk = require('bulk-write-stream')
var reusify = require('reusify')
var Client = require('./lib/client')
var protocolDecoder = require('./lib/protocol-decoder')

module.exports = Aedes
Aedes.Server = Aedes
Expand All @@ -19,12 +20,15 @@ var defaultOptions = {
concurrency: 100,
heartbeatInterval: 60000, // 1 minute
connectTimeout: 30000, // 30 secs
decodeProtocol: defaultDecodeProtocol,
preConnect: defaultPreConnect,
authenticate: defaultAuthenticate,
authorizePublish: defaultAuthorizePublish,
authorizeSubscribe: defaultAuthorizeSubscribe,
authorizeForward: defaultAuthorizeForward,
published: defaultPublished
published: defaultPublished,
trustProxy: false,
trustedProxies: []
}

function Aedes (opts) {
Expand Down Expand Up @@ -59,6 +63,10 @@ function Aedes (opts) {
this.authorizeForward = opts.authorizeForward
this.published = opts.published

this.decodeProtocol = opts.decodeProtocol
this.trustProxy = opts.trustProxy
this.trustedProxies = opts.trustedProxies

this.clients = {}
this.brokers = {}

Expand Down Expand Up @@ -295,9 +303,15 @@ Aedes.prototype.close = function (cb = noop) {

Aedes.prototype.version = require('./package.json').version

function defaultDecodeProtocol (client, buffer) {
var proto = protocolDecoder(client, buffer)
return proto
}

function defaultPreConnect (client, callback) {
callback(null, true)
}

function defaultAuthenticate (client, username, password, callback) {
callback(null, true)
}
Expand Down
175 changes: 175 additions & 0 deletions examples/proxy/index.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
'use strict'

var aedes = require('../../aedes')
var mqemitter = require('mqemitter')
var persistence = require('aedes-persistence')
var mqttPacket = require('mqtt-packet')
var net = require('net')
var proxyProtocol = require('proxy-protocol-js')

var brokerPort = 4883

// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript
function parseIpV6 (ip) {
return ip.match(/.{1,4}/g)
.map((val) => val.replace(/^0+/, ''))
.join(':')
.replace(/0000:/g, ':')
.replace(/:{2,}/g, '::')
}

function sendProxyPacket (version = 1, ipFamily = 4) {
var packet = {
cmd: 'connect',
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: `my-client-${version}`,
keepalive: 0
}
var hostIpV4 = '0.0.0.0'
var clientIpV4 = '192.168.1.128'
var hostIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
var clientIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128]
var protocol
if (version === 1) {
if (ipFamily === 4) {
protocol = new proxyProtocol.V1BinaryProxyProtocol(
proxyProtocol.INETProtocol.TCP4,
new proxyProtocol.Peer(clientIpV4, 12345),
new proxyProtocol.Peer(hostIpV4, brokerPort),
mqttPacket.generate(packet)
).build()
} else if (ipFamily === 6) {
protocol = new proxyProtocol.V1BinaryProxyProtocol(
proxyProtocol.INETProtocol.TCP6,
new proxyProtocol.Peer(parseIpV6(Buffer.from(clientIpV6).toString('hex')), 12345),
new proxyProtocol.Peer(parseIpV6(Buffer.from(hostIpV6).toString('hex')), brokerPort),
mqttPacket.generate(packet)
).build()
}
} else if (version === 2) {
if (ipFamily === 4) {
protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.LOCAL,
proxyProtocol.TransportProtocol.STREAM,
new proxyProtocol.IPv4ProxyAddress(
proxyProtocol.IPv4Address.createFrom(clientIpV4.split('.')),
12346,
proxyProtocol.IPv4Address.createFrom(hostIpV4.split('.')),
brokerPort
),
mqttPacket.generate(packet)
).build()
} else if (ipFamily === 6) {
protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.PROXY,
proxyProtocol.TransportProtocol.STREAM,
new proxyProtocol.IPv6ProxyAddress(
proxyProtocol.IPv6Address.createFrom(clientIpV6),
12346,
proxyProtocol.IPv6Address.createFrom(hostIpV6),
brokerPort
),
mqttPacket.generate(packet)
).build()
}
}

var parsedProto = version === 1
? proxyProtocol.V1BinaryProxyProtocol.parse(protocol)
: proxyProtocol.V2ProxyProtocol.parse(protocol)
// console.log(parsedProto)

var dstPort = version === 1
? parsedProto.destination.port
: parsedProto.proxyAddress.destinationPort

var dstHost
if (version === 1) {
if (ipFamily === 4) {
dstHost = parsedProto.destination.ipAddress
} else if (ipFamily === 6) {
dstHost = parsedProto.destination.ipAddress
// console.log('ipV6 host :', parsedProto.destination.ipAddress)
}
} else if (version === 2) {
if (ipFamily === 4) {
dstHost = parsedProto.proxyAddress.destinationAddress.address.join('.')
} else if (ipFamily === 6) {
// console.log('ipV6 client :', parseIpV6(Buffer.from(clientIpV6).toString('hex')))
dstHost = parseIpV6(Buffer.from(parsedProto.proxyAddress.destinationAddress.address).toString('hex'))
}
}

console.log('Connection to :', dstHost, dstPort)
var mqttConn = net.createConnection(
{
port: dstPort,
host: dstHost,
timeout: 150
}
)

var data = protocol

mqttConn.on('timeout', function () {
mqttConn.end(data)
})
}

function startAedes () {
var broker = aedes({
mq: mqemitter({
concurrency: 100
}),
persistence: persistence(),
preConnect: function (client, done) {
console.log('Aedes preConnect check client ip:', client.connDetails)
if (client.connDetails && client.connDetails.ipAddress) {
client.ip = client.connDetails.ipAddress
}
client.close()
return done(null, true)
},
trustProxy: true
})

var server = require('net').createServer(broker.handle)

server.listen(brokerPort, function () {
console.log('Aedes listening on :', server.address())
broker.publish({ topic: 'aedes/hello', payload: "I'm broker " + broker.id })
setTimeout(() => sendProxyPacket(1), 250)
setTimeout(() => sendProxyPacket(1, 6), 500)
setTimeout(() => sendProxyPacket(2), 750)
setTimeout(() => sendProxyPacket(2, 6), 1000)
})

broker.on('subscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', broker.id)
})

broker.on('unsubscribe', function (subscriptions, client) {
console.log('MQTT client \x1b[32m' + (client ? client.id : client) +
'\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', broker.id)
})

// fired when a client connects
broker.on('client', function (client) {
console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + ' ip ' + (client ? client.ip : null) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a client disconnects
broker.on('clientDisconnect', function (client) {
console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', broker.id)
})

// fired when a message is published
broker.on('publish', async function (packet, client) {
console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + broker.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', broker.id)
})
}

startAedes()
17 changes: 17 additions & 0 deletions examples/proxy/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"name": "aedes_proxy",
"version": "1.0.0",
"description": "Testing Aedes Broker behing proxy",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1"
},
"author": "getlarge",
"license": "MIT",
"dependencies": {
"aedes": "git+https://git@github.com/getlarge/aedes.git#proxy_and_ip_decoder",
"mqemitter": "^3.0.0",
"mqtt-packet": "^6.2.1",
"proxy-protocol-js": "^4.0.2"
}
}
12 changes: 10 additions & 2 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ function Client (broker, conn, req) {

this.disconnected = false

this.connDetails = null

this.parser.on('packet', enqueue)

function nextBatch (err) {
Expand All @@ -64,8 +66,14 @@ function Client (broker, conn, req) {
that._parsingBatch = 0
var buf = empty
buf = client.conn.read(null)

if (buf) {
if (!client.connackSent && client.broker.trustProxy && buf) {
var { data } = client.broker.decodeProtocol(client, buf)
if (data) {
client.parser.parse(data)
} else {
client.parser.parse(buf)
}
} else if (buf) {
client.parser.parse(buf)
}
}
Expand Down

0 comments on commit 1eada99

Please sign in to comment.