Skip to content

Commit

Permalink
feat: custom websocket support (#1696)
Browse files Browse the repository at this point in the history
  • Loading branch information
helios57 committed Sep 19, 2023
1 parent 827daa8 commit d6fd3a8
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 7 deletions.
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,28 @@ use the latest auth token, you must have some outside mechanism running that
handles application-level authentication refreshing so that the websocket
connection can simply grab the latest valid token or signed url.

#### Customize Websockets with `createWebsocket` (Websocket Only)

When you need to add a custom websocket subprotocol or header to open a connection
through a proxy with custom authentication this callback allows you to create your own
instance of a websocket which will be used in the mqtt client.

```js
const createWebsocket = createWebsocket(url, websocketSubProtocols, options) => {
const subProtocols = [
websocketSubProtocols[0],
'myCustomSubprotocolOrOAuthToken',
]
return new WebSocket(url, subProtocols)
}

const connection = await mqtt.connectAsync(<wss url>, {
...,
createWebsocket: createWebsocket,
});
```


#### Enabling Reconnection with `reconnectPeriod` option

To ensure that the mqtt client automatically tries to reconnect when the
Expand Down Expand Up @@ -429,6 +451,8 @@ The arguments are:
- `transformWsUrl` : optional `(url, options, client) => url` function
For ws/wss protocols only. Can be used to implement signing
urls which upon reconnect can have become expired.
- `createWebsocket` : optional `url, websocketSubProtocols, options) => Websocket` function
For ws/wss protocols only. Can be used to implement a custom websocket subprotocol or implementation.
- `resubscribe` : if connection is broken and reconnects,
subscribed topics are automatically subscribed again (default `true`)
- `messageIdProvider`: custom messageId provider. when `new UniqueMessageIdProvider()` is set, then non conflict messageId is provided.
Expand Down
7 changes: 7 additions & 0 deletions src/lib/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,13 @@ export interface IClientOptions extends ISecureClientOptions {
client: MqttClient,
) => string

/** when defined this function will be called to create the Websocket instance, used to add custom protocols or websocket implementations */
createWebsocket?: (
url: string,
websocketSubProtocols: string[],
options: IClientOptions,
) => any

/** Custom message id provider */
messageIdProvider?: IMessageIdProvider

Expand Down
22 changes: 16 additions & 6 deletions src/lib/connect/ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,16 @@ function createWebSocket(
debug(
`creating new Websocket for url: ${url} and protocol: ${websocketSubProtocol}`,
)
const socket = new WS(
url,
[websocketSubProtocol],
opts.wsOptions as ClientOptions,
)
let socket: WS
if (opts.createWebsocket) {
socket = opts.createWebsocket(url, [websocketSubProtocol], opts)
} else {
socket = new WS(
url,
[websocketSubProtocol],
opts.wsOptions as ClientOptions,
)
}
return socket
}

Expand All @@ -123,7 +128,12 @@ function createBrowserWebSocket(client: MqttClient, opts: IClientOptions) {
: 'mqtt'

const url = buildUrl(opts, client)
const socket = new WebSocket(url, [websocketSubProtocol])
let socket: WebSocket
if (opts.createWebsocket) {
socket = opts.createWebsocket(url, [websocketSubProtocol], opts)
} else {
socket = new WebSocket(url, [websocketSubProtocol])
}
socket.binaryType = 'arraybuffer'
return socket
}
Expand Down
34 changes: 33 additions & 1 deletion test/websocket_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ import { IClientOptions } from '../src/lib/client'

const port = 9999
const httpServer = http.createServer()

let lastProcotols = new Set<string>()
function attachWebsocketServer(httpServer2) {
const webSocketServer = new WebSocket.Server({
server: httpServer2,
handleProtocols: (protocols: Set<string>, request: any) => {
lastProcotols = protocols
return [...protocols][0]
},
perMessageDeflate: false,
})

Expand Down Expand Up @@ -132,6 +136,34 @@ describe('Websocket Client', () => {
})
})

it('should be able to create custom Websocket instance', function test(done) {
const baseUrl = 'ws://localhost:9999/mqtt'
let urlInCallback: string
const opts = makeOptions({
path: '/mqtt',
createWebsocket(
url: string,
websocketSubProtocols: string[],
options: IClientOptions,
) {
urlInCallback = url
assert.equal(url, baseUrl)
const subProtocols = [
websocketSubProtocols[0],
'myCustomSubprotocol',
]
return new WebSocket(url, subProtocols)
},
})
const client = mqtt.connect(opts)
client.on('connect', () => {
assert.equal((client.stream as any).url, urlInCallback)
assert.equal(baseUrl, urlInCallback)
assert.equal('myCustomSubprotocol', [...lastProcotols][1])
client.end(true, (err) => done(err))
})
})

it('should use mqttv3.1 as the protocol if using v3.1', function test(done) {
httpServer.once('client', (client) => {
assert.strictEqual(client.protocol, 'mqttv3.1')
Expand Down

0 comments on commit d6fd3a8

Please sign in to comment.