diff --git a/README.md b/README.md index 42a38333..882e381c 100644 --- a/README.md +++ b/README.md @@ -110,6 +110,8 @@ Options: packet to arrive, defaults to `30000` milliseconds * `id`: id used to identify this broker instance in `$SYS` messages, defaults to `uuidv4()` +* `decodeProtocol`: function called when a valid buffer is received, see + [instance.decodeProtocol()](#decodeProtocol) * `preConnect`: function called when a valid CONNECT is received, see [instance.preConnect()](#preConnect) * `authenticate`: function used to authenticate clients, see @@ -220,7 +222,19 @@ Both `topic` and `payload` can be `Buffer` objects instead of strings. ### instance.unsubscribe(topic, func(packet, cb), done) The reverse of [subscribe](#subscribe). +------------------------------------------------------ + +### instance.decodeProtocol(client, buffer) +It will be called when aedes instance trustProxy is true and that it receives a first valid buffer from client. client object state is in default and its connected state is false. A default function parse https headers (x-real-ip | x-forwarded-for) and proxy protocol v1 and v2 to retrieve information in client.connDetails. Override to supply custom protocolDecoder logic, if it returns an object with data property, this property will be parsed as an mqtt-packet. + + +```js +instance.decodeProtocol = function(client, buffer) { + var protocol = yourDecoder(client, buffer) + return protocol +} +``` ------------------------------------------------------- ### instance.preConnect(client, done(err, successful)) diff --git a/aedes.js b/aedes.js index 8036c135..e2d5c268 100644 --- a/aedes.js +++ b/aedes.js @@ -11,6 +11,7 @@ var Packet = require('aedes-packet') var bulk = require('bulk-write-stream') var reusify = require('reusify') var Client = require('./lib/client') +var protocolDecoder = require('./lib/protocol-decoder') module.exports = Aedes Aedes.Server = Aedes @@ -19,12 +20,15 @@ var defaultOptions = { concurrency: 100, heartbeatInterval: 60000, // 1 minute connectTimeout: 30000, // 30 secs + decodeProtocol: defaultDecodeProtocol, preConnect: defaultPreConnect, authenticate: defaultAuthenticate, authorizePublish: defaultAuthorizePublish, authorizeSubscribe: defaultAuthorizeSubscribe, authorizeForward: defaultAuthorizeForward, - published: defaultPublished + published: defaultPublished, + trustProxy: false, + trustedProxies: [] } function Aedes (opts) { @@ -59,6 +63,10 @@ function Aedes (opts) { this.authorizeForward = opts.authorizeForward this.published = opts.published + this.decodeProtocol = opts.decodeProtocol + this.trustProxy = opts.trustProxy + this.trustedProxies = opts.trustedProxies + this.clients = {} this.brokers = {} @@ -295,9 +303,15 @@ Aedes.prototype.close = function (cb = noop) { Aedes.prototype.version = require('./package.json').version +function defaultDecodeProtocol (client, buffer) { + var proto = protocolDecoder(client, buffer) + return proto +} + function defaultPreConnect (client, callback) { callback(null, true) } + function defaultAuthenticate (client, username, password, callback) { callback(null, true) } diff --git a/examples/proxy/index.js b/examples/proxy/index.js new file mode 100644 index 00000000..971dfcd6 --- /dev/null +++ b/examples/proxy/index.js @@ -0,0 +1,175 @@ +'use strict' + +var aedes = require('../../aedes') +var mqemitter = require('mqemitter') +var persistence = require('aedes-persistence') +var mqttPacket = require('mqtt-packet') +var net = require('net') +var proxyProtocol = require('proxy-protocol-js') + +var brokerPort = 4883 + +// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript +function parseIpV6 (ip) { + return ip.match(/.{1,4}/g) + .map((val) => val.replace(/^0+/, '')) + .join(':') + .replace(/0000:/g, ':') + .replace(/:{2,}/g, '::') +} + +function sendProxyPacket (version = 1, ipFamily = 4) { + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: `my-client-${version}`, + keepalive: 0 + } + var hostIpV4 = '0.0.0.0' + var clientIpV4 = '192.168.1.128' + var hostIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0] + var clientIpV6 = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] + var protocol + if (version === 1) { + if (ipFamily === 4) { + protocol = new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP4, + new proxyProtocol.Peer(clientIpV4, 12345), + new proxyProtocol.Peer(hostIpV4, brokerPort), + mqttPacket.generate(packet) + ).build() + } else if (ipFamily === 6) { + protocol = new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP6, + new proxyProtocol.Peer(parseIpV6(Buffer.from(clientIpV6).toString('hex')), 12345), + new proxyProtocol.Peer(parseIpV6(Buffer.from(hostIpV6).toString('hex')), brokerPort), + mqttPacket.generate(packet) + ).build() + } + } else if (version === 2) { + if (ipFamily === 4) { + protocol = new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.LOCAL, + proxyProtocol.TransportProtocol.STREAM, + new proxyProtocol.IPv4ProxyAddress( + proxyProtocol.IPv4Address.createFrom(clientIpV4.split('.')), + 12346, + proxyProtocol.IPv4Address.createFrom(hostIpV4.split('.')), + brokerPort + ), + mqttPacket.generate(packet) + ).build() + } else if (ipFamily === 6) { + protocol = new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.PROXY, + proxyProtocol.TransportProtocol.STREAM, + new proxyProtocol.IPv6ProxyAddress( + proxyProtocol.IPv6Address.createFrom(clientIpV6), + 12346, + proxyProtocol.IPv6Address.createFrom(hostIpV6), + brokerPort + ), + mqttPacket.generate(packet) + ).build() + } + } + + var parsedProto = version === 1 + ? proxyProtocol.V1BinaryProxyProtocol.parse(protocol) + : proxyProtocol.V2ProxyProtocol.parse(protocol) + // console.log(parsedProto) + + var dstPort = version === 1 + ? parsedProto.destination.port + : parsedProto.proxyAddress.destinationPort + + var dstHost + if (version === 1) { + if (ipFamily === 4) { + dstHost = parsedProto.destination.ipAddress + } else if (ipFamily === 6) { + dstHost = parsedProto.destination.ipAddress + // console.log('ipV6 host :', parsedProto.destination.ipAddress) + } + } else if (version === 2) { + if (ipFamily === 4) { + dstHost = parsedProto.proxyAddress.destinationAddress.address.join('.') + } else if (ipFamily === 6) { + // console.log('ipV6 client :', parseIpV6(Buffer.from(clientIpV6).toString('hex'))) + dstHost = parseIpV6(Buffer.from(parsedProto.proxyAddress.destinationAddress.address).toString('hex')) + } + } + + console.log('Connection to :', dstHost, dstPort) + var mqttConn = net.createConnection( + { + port: dstPort, + host: dstHost, + timeout: 150 + } + ) + + var data = protocol + + mqttConn.on('timeout', function () { + mqttConn.end(data) + }) +} + +function startAedes () { + var broker = aedes({ + mq: mqemitter({ + concurrency: 100 + }), + persistence: persistence(), + preConnect: function (client, done) { + console.log('Aedes preConnect check client ip:', client.connDetails) + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + } + client.close() + return done(null, true) + }, + trustProxy: true + }) + + var server = require('net').createServer(broker.handle) + + server.listen(brokerPort, function () { + console.log('Aedes listening on :', server.address()) + broker.publish({ topic: 'aedes/hello', payload: "I'm broker " + broker.id }) + setTimeout(() => sendProxyPacket(1), 250) + setTimeout(() => sendProxyPacket(1, 6), 500) + setTimeout(() => sendProxyPacket(2), 750) + setTimeout(() => sendProxyPacket(2, 6), 1000) + }) + + broker.on('subscribe', function (subscriptions, client) { + console.log('MQTT client \x1b[32m' + (client ? client.id : client) + + '\x1b[0m subscribed to topics: ' + subscriptions.map(s => s.topic).join('\n'), 'from broker', broker.id) + }) + + broker.on('unsubscribe', function (subscriptions, client) { + console.log('MQTT client \x1b[32m' + (client ? client.id : client) + + '\x1b[0m unsubscribed to topics: ' + subscriptions.join('\n'), 'from broker', broker.id) + }) + + // fired when a client connects + broker.on('client', function (client) { + console.log('Client Connected: \x1b[33m' + (client ? client.id : client) + ' ip ' + (client ? client.ip : null) + '\x1b[0m', 'to broker', broker.id) + }) + + // fired when a client disconnects + broker.on('clientDisconnect', function (client) { + console.log('Client Disconnected: \x1b[31m' + (client ? client.id : client) + '\x1b[0m', 'to broker', broker.id) + }) + + // fired when a message is published + broker.on('publish', async function (packet, client) { + console.log('Client \x1b[31m' + (client ? client.id : 'BROKER_' + broker.id) + '\x1b[0m has published', packet.payload.toString(), 'on', packet.topic, 'to broker', broker.id) + }) +} + +startAedes() diff --git a/examples/proxy/package.json b/examples/proxy/package.json new file mode 100644 index 00000000..d7ff8d91 --- /dev/null +++ b/examples/proxy/package.json @@ -0,0 +1,17 @@ +{ + "name": "aedes_proxy", + "version": "1.0.0", + "description": "Testing Aedes Broker behing proxy", + "main": "index.js", + "scripts": { + "test": "echo \"Error: no test specified\" && exit 1" + }, + "author": "getlarge", + "license": "MIT", + "dependencies": { + "aedes": "git+https://git@github.com/getlarge/aedes.git#proxy_and_ip_decoder", + "mqemitter": "^3.0.0", + "mqtt-packet": "^6.2.1", + "proxy-protocol-js": "^4.0.2" + } +} diff --git a/lib/client.js b/lib/client.js index b1be41c8..ddd2e796 100644 --- a/lib/client.js +++ b/lib/client.js @@ -45,6 +45,8 @@ function Client (broker, conn, req) { this.disconnected = false + this.connDetails = null + this.parser.on('packet', enqueue) function nextBatch (err) { @@ -64,8 +66,14 @@ function Client (broker, conn, req) { that._parsingBatch = 0 var buf = empty buf = client.conn.read(null) - - if (buf) { + if (!client.connackSent && client.broker.trustProxy && buf) { + var { data } = client.broker.decodeProtocol(client, buf) + if (data) { + client.parser.parse(data) + } else { + client.parser.parse(buf) + } + } else if (buf) { client.parser.parse(buf) } } diff --git a/lib/protocol-decoder.js b/lib/protocol-decoder.js new file mode 100644 index 00000000..3b466419 --- /dev/null +++ b/lib/protocol-decoder.js @@ -0,0 +1,129 @@ +'use strict' + +var proxyProtocol = require('proxy-protocol-js') + +var v1ProxyProtocolSignature = Buffer.from('PROXY ', 'utf8') +var v2ProxyProtocolSignature = Buffer.from([ + 0x0d, + 0x0a, + 0x0d, + 0x0a, + 0x00, + 0x0d, + 0x0a, + 0x51, + 0x55, + 0x49, + 0x54, + 0x0a +]) + +function isValidV1ProxyProtocol (buffer) { + for (var i = 0; i < v1ProxyProtocolSignature.length; i++) { + if (buffer[i] !== v1ProxyProtocolSignature[i]) { + return false + } + } + return true +} + +function isValidV2ProxyProtocol (buffer) { + for (var i = 0; i < v2ProxyProtocolSignature.length; i++) { + if (buffer[i] !== v2ProxyProtocolSignature[i]) { + return false + } + } + return true +} + +// from https://stackoverflow.com/questions/57077161/how-do-i-convert-hex-buffer-to-ipv6-in-javascript +function parseIpV6Array (ip) { + var ipHex = Buffer.from(ip).toString('hex') + return ipHex.match(/.{1,4}/g) + .map((val) => val.replace(/^0+/, '')) + .join(':') + .replace(/0000:/g, ':') + .replace(/:{2,}/g, '::') +} + +function protocolDecoder (client, data) { + var proto = {} + if (!data) return proto + var trustProxy = client.broker.trustProxy + var ipFamily + var conn = client.conn + var socket = conn.socket || conn + proto.isProxy = 0 + proto.isWebsocket = false + if (trustProxy) { + var headers = client.req && client.req.headers ? client.req.headers : null + var proxyProto + if (headers) { + if (headers['x-real-ip']) proto.ipAddress = headers['x-real-ip'] + else if (headers['x-forwarded-for']) proto.ipAddress = headers['x-forwarded-for'] + proto.port = socket._socket.remotePort + proto.isWebsocket = true + } + if (isValidV1ProxyProtocol(data)) { + proxyProto = proxyProtocol.V1BinaryProxyProtocol.parse(data) + if (proxyProto && proxyProto.source && proxyProto.data) { + ipFamily = proxyProto.inetProtocol + proto.ipAddress = proxyProto.source.ipAddress + proto.port = proxyProto.source.port + proto.data = proxyProto.data + proto.isProxy = 1 + } + } else if (isValidV2ProxyProtocol(data)) { + proxyProto = proxyProtocol.V2ProxyProtocol.parse(data) + if (proxyProto && proxyProto.proxyAddress && proxyProto.data) { + if (proxyProto.proxyAddress instanceof proxyProtocol.IPv4ProxyAddress) { + ipFamily = 'IPv4' + proto.ipAddress = proxyProto.proxyAddress.sourceAddress.address.join('.') + proto.port = proxyProto.proxyAddress.sourceAddress.address.port + } else if (proxyProto.proxyAddress instanceof proxyProtocol.IPv6ProxyAddress) { + ipFamily = 'IPv6' + proto.ipAddress = parseIpV6Array(proxyProto.proxyAddress.sourceAddress.address) + proto.port = proxyProto.proxyAddress.sourceAddress.address.port + } + proto.isProxy = 2 + if (Buffer.isBuffer(proxyProto.data)) { + proto.data = proxyProto.data + } else { + proto.data = Buffer.from(proxyProto.data) + } + } + } + } + if (!proto.ipAddress) { + if (socket._socket && socket._socket.address) { + proto.isWebsocket = true + proto.ipAddress = socket._socket.remoteAddress + proto.port = socket._socket.remotePort + ipFamily = socket._socket.remoteFamily + } else if (socket.address) { + proto.ipAddress = socket.remoteAddress + proto.port = socket.remotePort + ipFamily = socket.remoteFamily + } + } + if (ipFamily && ipFamily.endsWith('4')) { + proto.ipFamily = 4 + } else if (ipFamily && ipFamily.endsWith('6')) { + proto.ipFamily = 6 + } else { + proto.ipFamily = 0 + } + if (!client.connDetails) client.connDetails = {} + if (proto.ipAddress) { + client.connDetails.ipAddress = proto.ipAddress + } + if (proto.port) { + client.connDetails.port = proto.port + } + client.connDetails.ipFamily = proto.ipFamily + client.connDetails.isProxy = proto.isProxy + client.connDetails.isWebsocket = proto.isWebsocket + return proto +} + +module.exports = protocolDecoder diff --git a/package.json b/package.json index 04f501a6..23a715aa 100644 --- a/package.json +++ b/package.json @@ -98,6 +98,7 @@ "from2": "^2.3.0", "mqemitter": "^3.0.0", "mqtt-packet": "^6.2.1", + "proxy-protocol-js": "^4.0.2", "pump": "^3.0.0", "retimer": "^2.0.0", "reusify": "^1.0.4", diff --git a/test/basic.js b/test/basic.js index 88bfe47a..0f48e50f 100644 --- a/test/basic.js +++ b/test/basic.js @@ -157,13 +157,15 @@ test('unsubscribe without subscribe', function (t) { }) test('unsubscribe on disconnect for a clean=true client', function (t) { - t.plan(6) + t.plan(7) var opts = { clean: true } - var s = noError(connect(setup(), opts), t) + var s = connect(setup(), opts) subscribe(t, s, 'hello', 0, function () { - s.conn.emit('close') + s.conn.destroy(null, function () { + t.pass('closed streams') + }) s.outStream.on('data', function () { t.fail('should not receive any more messages') }) @@ -185,13 +187,15 @@ test('unsubscribe on disconnect for a clean=true client', function (t) { }) test('unsubscribe on disconnect for a clean=false client', function (t) { - t.plan(5) + t.plan(6) var opts = { clean: false } - var s = noError(connect(setup(), opts), t) + var s = connect(setup(), opts) subscribe(t, s, 'hello', 0, function () { - s.conn.emit('close') + s.conn.destroy(null, function () { + t.pass('closed streams') + }) s.outStream.on('data', function () { t.fail('should not receive any more messages') }) diff --git a/test/connect.js b/test/connect.js index c2c5e1f6..5eefd9ae 100644 --- a/test/connect.js +++ b/test/connect.js @@ -8,6 +8,9 @@ var connect = helper.connect var http = require('http') var ws = require('websocket-stream') var mqtt = require('mqtt') +var mqttPacket = require('mqtt-packet') +var net = require('net') +var proxyProtocol = require('proxy-protocol-js') ;[{ ver: 3, id: 'MQIsdp' }, { ver: 4, id: 'MQTT' }].forEach(function (ele) { test('connect and connack (minimal)', function (t) { @@ -410,6 +413,352 @@ test('websocket clients have access to the request object', function (t) { function finish () { clearTimeout(timer) + client.end() + broker.close() + server.close() + t.end() + } +}) + +// test ipAddress property presence when trustProxy is enabled +test('tcp clients have access to the ipAddress from the socket', function (t) { + t.plan(2) + + var port = 4883 + var broker = aedes({ + preConnect: function (client, done) { + if (client && client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal('::ffff:127.0.0.1', client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = net.createServer(broker.handle) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect({ + port, + keepalive: 0, + clientId: 'mqtt-client', + clean: false + }) + + function finish () { + client.end() + broker.close() + server.close() + t.end() + } +}) + +test('tcp proxied (protocol v1) clients have access to the ipAddress(v4)', function (t) { + t.plan(2) + + var port = 4883 + var clientIp = '192.168.0.140' + var packet = { + cmd: 'connect', + protocolId: 'MQIsdp', + protocolVersion: 3, + clean: true, + clientId: 'my-client-proxyV1', + keepalive: 0 + } + + var buf = mqttPacket.generate(packet) + var src = new proxyProtocol.Peer(clientIp, 12345) + var dst = new proxyProtocol.Peer('127.0.0.1', port) + var protocol = new proxyProtocol.V1BinaryProxyProtocol( + proxyProtocol.INETProtocol.TCP4, + src, + dst, + buf + ).build() + + var broker = aedes({ + preConnect: function (client, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = net.createServer(broker.handle) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = net.connect({ + port, + timeout: 0 + }, function () { + client.write(protocol) + }) + + function finish () { + client.end() + broker.close() + server.close() + t.end() + } +}) + +test('tcp proxied (protocol v2) clients have access to the ipAddress(v4)', function (t) { + t.plan(2) + + var port = 4883 + var clientIp = '192.168.0.140' + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client-proxyV2' + } + + var protocol = new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.LOCAL, + proxyProtocol.TransportProtocol.DGRAM, + new proxyProtocol.IPv4ProxyAddress( + proxyProtocol.IPv4Address.createFrom(clientIp.split('.')), + 12345, + proxyProtocol.IPv4Address.createFrom([127, 0, 0, 1]), + port + ), + mqttPacket.generate(packet) + ).build() + + var broker = aedes({ + preConnect: function (client, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = net.createServer(broker.handle) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = net.createConnection( + { + port, + timeout: 0 + }, function () { + client.write(Buffer.from(protocol)) + } + ) + + function finish () { + client.end() + broker.close() + server.close() + t.end() + } +}) + +test('tcp proxied (protocol v2) clients have access to the ipAddress(v6)', function (t) { + t.plan(2) + + var port = 4883 + var clientIpArray = [0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 192, 168, 1, 128] + var clientIp = '::ffff:c0a8:180:' + var packet = { + cmd: 'connect', + protocolId: 'MQTT', + protocolVersion: 4, + clean: true, + clientId: 'my-client-proxyV2' + } + + var protocol = new proxyProtocol.V2ProxyProtocol( + proxyProtocol.Command.PROXY, + proxyProtocol.TransportProtocol.STREAM, + new proxyProtocol.IPv6ProxyAddress( + proxyProtocol.IPv6Address.createFrom(clientIpArray), + 12345, + proxyProtocol.IPv6Address.createWithEmptyAddress(), + port + ), + mqttPacket.generate(packet) + ).build() + + var broker = aedes({ + preConnect: function (client, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = net.createServer(broker.handle) + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = net.createConnection( + { + port, + timeout: 0 + }, function () { + client.write(Buffer.from(protocol)) + } + ) + + function finish () { + client.end() + broker.close() + server.close() + t.end() + } +}) + +test('websocket clients have access to the ipAddress from the socket (if no ip header)', function (t) { + t.plan(2) + + var clientIp = '::ffff:127.0.0.1' + var port = 4883 + var broker = aedes({ + preConnect: function (client, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = http.createServer() + ws.createServer({ + server: server + }, broker.handle) + + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect(`ws://localhost:${port}`) + + function finish () { + broker.close() + server.close() + client.end() + t.end() + } +}) + +test('websocket proxied clients have access to the ipAddress from x-real-ip header', function (t) { + t.plan(2) + + var clientIp = '192.168.0.140' + var port = 4883 + var broker = aedes({ + preConnect: function (client, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = http.createServer() + ws.createServer({ + server: server + }, broker.handle) + + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect(`ws://localhost:${port}`, { + wsOptions: { + headers: { + 'X-Real-Ip': clientIp + } + } + }) + + function finish () { + broker.close() + server.close() + client.end() + t.end() + } +}) + +test('websocket proxied clients have access to the ipAddress from x-forwarded-for header', function (t) { + t.plan(2) + + var clientIp = '192.168.0.140' + var port = 4883 + var broker = aedes({ + preConnect: function (client, done) { + if (client.connDetails && client.connDetails.ipAddress) { + client.ip = client.connDetails.ipAddress + t.equal(clientIp, client.ip) + } else { + t.fail('no ip address present') + } + done(null, true) + setImmediate(finish) + }, + trustProxy: true + }) + + var server = http.createServer() + ws.createServer({ + server: server + }, broker.handle) + + server.listen(port, function (err) { + t.error(err, 'no error') + }) + + var client = mqtt.connect(`ws://localhost:${port}`, { + wsOptions: { + headers: { + 'X-Forwarded-For': clientIp + } + } + }) + + function finish () { broker.close() server.close() client.end() diff --git a/test/meta.js b/test/meta.js index cc670847..4c335e68 100644 --- a/test/meta.js +++ b/test/meta.js @@ -26,7 +26,7 @@ test('count connected clients', function (t) { // needed because destroy() will do the trick before // the next tick - process.nextTick(function () { + setImmediate(function () { t.equal(broker.connectedClients, 1, 'one connected clients') }) }) diff --git a/test/will.js b/test/will.js index 28b1a638..71473475 100644 --- a/test/will.js +++ b/test/will.js @@ -305,17 +305,18 @@ test('does not deliver will if keepalive is triggered during authentication', fu // [MQTT-3.14.4-1] test('does not deliver will when client sends a DISCONNECT', function (t) { + t.plan(0) + var broker = aedes() - var s = willConnect(setup(broker), {}, - function () { - s.inStream.end({ - cmd: 'disconnect' - }) - } - ) + var s = willConnect(setup(broker), {}, function () { + s.inStream.end({ + cmd: 'disconnect' + }) + }) s.broker.mq.on('mywill', function (packet, cb) { t.fail(packet) }) + broker.on('closed', t.end.bind(t)) })