Skip to content

Commit

Permalink
chore: websockets
Browse files Browse the repository at this point in the history
  • Loading branch information
Yoseph Maguire committed Aug 6, 2020
1 parent 963e554 commit d21d277
Show file tree
Hide file tree
Showing 5 changed files with 90 additions and 41 deletions.
42 changes: 42 additions & 0 deletions examples/ws/aedes_server.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
const aedes = require('aedes')()
const httpServer = require('http').createServer()
const WebSocket = require('ws')
const wsPort = 8080

// Here we are creating the Websocket Server that is using the HTTP Server...
const wss = new WebSocket.Server({ server: httpServer })
wss.on('connection', function connection (ws) {
const duplex = WebSocket.createWebSocketStream(ws)
aedes.handle(duplex)
})

httpServer.listen(wsPort, function () {
console.log('websocket server listening on port', wsPort)
})

aedes.on('clientError', function (client, err) {
console.log('client error', client.id, err.message, err.stack)
})

aedes.on('connectionError', function (client, err) {
console.log('client error', client, err.message, err.stack)
})

aedes.on('publish', function (packet, client) {
if (packet && packet.payload) {
console.log('publish packet:', packet.payload.toString())
}
if (client) {
console.log('message from client', client.id)
}
})

aedes.on('subscribe', function (subscriptions, client) {
if (client) {
console.log('subscribe from client', subscriptions, client.id)
}
})

aedes.on('client', function (client) {
console.log('new client', client.id)
})
22 changes: 13 additions & 9 deletions examples/wss/client.js → examples/ws/client.js
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
'use strict'

var mqtt = require('mqtt')
var mqtt = require('../../types')

var clientId = 'mqttjs_' + Math.random().toString(16).substr(2, 8)

var host = 'wss://localhost:3001/Mosca'
// This sample should be run in tandem with the aedes_server.js file.
// Simply run it:
// $ node aedes_server.js
//
// Then run this file in a separate console:
// $ node websocket_sample.js
//
var host = 'ws://localhost:8080'

var options = {
keepalive: 10,
keepalive: 30,
clientId: clientId,
protocolId: 'MQTT',
protocolVersion: 4,
Expand All @@ -20,11 +27,10 @@ var options = {
qos: 0,
retain: false
},
username: 'demo',
password: 'demo',
rejectUnauthorized: false
}

console.log('connecting mqtt client')
var client = mqtt.connect(host, options)

client.on('error', function (err) {
Expand All @@ -34,12 +40,10 @@ client.on('error', function (err) {

client.on('connect', function () {
console.log('client connected:' + clientId)
client.subscribe('topic', { qos: 0 })
client.publish('topic', 'wss secure connection demo...!', { qos: 0, retain: false })
})

client.subscribe('topic', { qos: 0 })

client.publish('topic', 'wss secure connection demo...!', { qos: 0, retain: false })

client.on('message', function (topic, message, packet) {
console.log('Received Message:= ' + message.toString() + '\nOn topic:= ' + topic)
})
Expand Down
9 changes: 6 additions & 3 deletions lib/connect/ws.js
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
'use strict'

var WebSocket = require('ws')
var debug = require('debug')('mqttjs:ws')
var websocket = require('websocket-stream')
var urlModule = require('url')
var WSS_OPTIONS = [
'rejectUnauthorized',
Expand Down Expand Up @@ -51,15 +51,18 @@ function setDefaultOpts (opts) {

function createWebSocket (client, opts) {
debug('createWebSocket')
debug('protocol: ' + opts.protocolId + ' ' + opts.protocolVersion)
var websocketSubProtocol =
(opts.protocolId === 'MQIsdp') && (opts.protocolVersion === 3)
? 'mqttv3.1'
: 'mqtt'

setDefaultOpts(opts)
var url = buildUrl(opts, client)
debug('url %s protocol %s', url, websocketSubProtocol)
return websocket(url, [websocketSubProtocol], opts.wsOptions)
debug('creating new Websocket for url: ' + url + ' and protocol: ' + websocketSubProtocol)
var ws = new WebSocket(url, [websocketSubProtocol], opts.wsOptions)
var duplex = WebSocket.createWebSocketStream(ws, opts.wsOptions)
return duplex
}

function streamBuilder (client, opts) {
Expand Down
21 changes: 11 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -64,19 +64,20 @@
"dependencies": {
"base64-js": "^1.3.0",
"commist": "^1.0.0",
"concat-stream": "^1.6.2",
"concat-stream": "^2.0.0",
"debug": "^4.1.1",
"end-of-stream": "^1.4.1",
"es6-map": "^0.1.5",
"help-me": "^1.0.1",
"inherits": "^2.0.3",
"minimist": "^1.2.0",
"mqtt-packet": "^6.0.0",
"minimist": "^1.2.5",
"mqtt-packet": "^6.3.2",
"pump": "^3.0.0",
"readable-stream": "^2.3.6",
"reinterval": "^1.1.0",
"split2": "^3.1.0",
"websocket-stream": "^5.1.2",
"utf-8-validate": "^5.0.2",
"ws": "^7.3.1",
"xtend": "^4.0.1"
},
"devDependencies": {
Expand All @@ -86,10 +87,11 @@
"chai": "^4.2.0",
"codecov": "^3.0.4",
"global": "^4.3.2",
"mkdirp": "^0.5.1",
"mocha": "^4.1.0",
"aedes": "^0.42.5",
"mkdirp": "^1.0.4",
"mocha": "^7.2.0",
"mqtt-connection": "^4.0.0",
"nyc": "^15.0.0",
"nyc": "^15.0.1",
"pre-commit": "^1.2.2",
"rimraf": "^3.0.2",
"safe-buffer": "^5.1.2",
Expand All @@ -98,11 +100,10 @@
"snazzy": "^8.0.0",
"standard": "^11.0.1",
"through2": "^3.0.0",
"tslint": "^5.11.0",
"tslint": "^6.1.2",
"tslint-config-standard": "^8.0.1",
"typescript": "^3.2.2",
"uglify-es": "^3.3.9",
"ws": "^3.3.3"
"uglify-js": "^3.4.5"
},
"standard": {
"env": [
Expand Down
37 changes: 18 additions & 19 deletions test/websocket_client.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,36 @@
'use strict'

var http = require('http')
var websocket = require('websocket-stream')
var WebSocketServer = require('ws').Server
var Connection = require('mqtt-connection')
var WebSocket = require('ws')
var MQTTConnection = require('mqtt-connection')
var abstractClientTests = require('./abstract_client')
var mqtt = require('../')
var xtend = require('xtend')
var assert = require('assert')
var port = 9999
var server = http.createServer()
var httpServer = http.createServer()

function attachWebsocketServer (wsServer) {
var wss = new WebSocketServer({server: wsServer, perMessageDeflate: false})
function attachWebsocketServer (httpServer) {
var webSocketServer = new WebSocket.Server({server: httpServer, perMessageDeflate: false})

wss.on('connection', function (ws) {
var stream = websocket(ws)
var connection = new Connection(stream)

wsServer.emit('client', connection)
webSocketServer.on('connection', function (ws) {
var stream = WebSocket.createWebSocketStream(ws)
var connection = new MQTTConnection(stream)
connection.protocol = ws.protocol
httpServer.emit('client', connection)
stream.on('error', function () {})
connection.on('error', function () {})
})

return wsServer
return httpServer
}

function attachClientEventHandlers (client) {
client.on('connect', function (packet) {
if (packet.clientId === 'invalid') {
client.connack({ returnCode: 2 })
} else {
server.emit('connect', client)
httpServer.emit('connect', client)
client.connack({returnCode: 0})
}
})
Expand Down Expand Up @@ -81,9 +80,9 @@ function attachClientEventHandlers (client) {
})
}

attachWebsocketServer(server)
attachWebsocketServer(httpServer)

server.on('client', attachClientEventHandlers).listen(port)
httpServer.on('client', attachClientEventHandlers).listen(port)

describe('Websocket Client', function () {
var baseConfig = { protocol: 'ws', port: port }
Expand All @@ -94,8 +93,8 @@ describe('Websocket Client', function () {
}

it('should use mqtt as the protocol by default', function (done) {
server.once('client', function (client) {
assert.strictEqual(client.stream.socket.protocol, 'mqtt')
httpServer.once('client', function (client) {
assert.strictEqual(client.protocol, 'mqtt')
})
mqtt.connect(makeOptions()).on('connect', function () {
this.end(true, done)
Expand Down Expand Up @@ -128,7 +127,7 @@ describe('Websocket Client', function () {
})

it('should use mqttv3.1 as the protocol if using v3.1', function (done) {
server.once('client', function (client) {
httpServer.once('client', function (client) {
assert.strictEqual(client.stream.socket.protocol, 'mqttv3.1')
})

Expand All @@ -142,5 +141,5 @@ describe('Websocket Client', function () {
})
})

abstractClientTests(server, makeOptions())
abstractClientTests(httpServer, makeOptions())
})

0 comments on commit d21d277

Please sign in to comment.