Skip to content

Commit

Permalink
fix(ws-client): works with browser native WebSockets
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Nov 6, 2019
1 parent 90576de commit 29bb394
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 26 deletions.
4 changes: 1 addition & 3 deletions packages/ws-client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/ws-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@
"dependencies": {
"@karmic/core": "^0.2.0",
"@karmic/rpc": "^0.2.0",
"errorish": "^0.4.0",
"isomorphic-ws": "^4.0.1",
"promist": "^2.0.0"
},
Expand Down
9 changes: 6 additions & 3 deletions packages/ws-client/src/WSClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,18 @@ import { connect } from './connect';
import WebSocket from 'isomorphic-ws';

export class WSClient extends RPCClient {
/**
* `wso` options -options for `ws`- don't have any effect on browsers.
*/
public constructor(
address: string,
wsco?: WebSocket.ClientOptions | null,
options?: WSClientOptions
options?: WSClientOptions | null,
wso?: WebSocket.ClientOptions
) {
const opts = Object.assign(createDefaults(), options);
const connection = connect(
address,
wsco || {},
wso || {},
opts.attempts,
opts.connectTimeout
);
Expand Down
4 changes: 2 additions & 2 deletions packages/ws-client/src/connect/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const RECONNECT_DELAY = 5000;

export function connect(
address: string,
wsco: WebSocket.ClientOptions,
wso: WebSocket.ClientOptions,
attempts: number,
timeout: number
): RPCClientConnection {
Expand All @@ -30,7 +30,7 @@ export function connect(
retries = 0;
});

const connection = connectEach(address, wsco, timeout);
const connection = connectEach(address, wso, timeout);
const subscription = connection.events$.subscribe({
next(value) {
if (!active) return;
Expand Down
80 changes: 62 additions & 18 deletions packages/ws-client/src/connect/each.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,19 @@ import {
DataOutput
} from '@karmic/rpc';
import { Promist } from 'promist';
import { ensure } from 'errorish';

// It's duck typing day
export const isNative = !Object.hasOwnProperty.call(WebSocket, 'Server');

export function connectEach(
address: string,
wsco: WebSocket.ClientOptions,
wso: WebSocket.ClientOptions,
timeout?: number
): RPCClientConnection {
let active = true;
let status: RPCClientConnectionStatus = 'close';
const events$ = new Subject<RPCClientConnectionEvent>();

const socket = new WebSocket(address, wsco);
const earlyClose = new Promist<RPCClientConnectionEvent>();

if (timeout && timeout > 0) {
Expand All @@ -34,21 +36,52 @@ export function connectEach(
});
}

socket.on('open', () => {
let socket: null | WebSocket = null;
try {
socket = isNative ? new WebSocket(address) : new WebSocket(address, wso);
} catch (err) {
// failing asynchronously; if it fails synchronously
// events$ will complete before it is subscribed to on ./connect.ts
setTimeout(() => close(err, false, false), 0);
}
let send: (data: any) => Promise<void> = () => {
return Promise.reject(Error(`Can't request over a closed socket`));
};
if (socket) {
const fn = socket.send.bind(socket);
send = isNative
? async (data: any) => fn(data)
: (data: any) => {
return new Promise((resolve, reject) =>
fn(data, (err) => (err ? reject(err) : resolve()))
);
};
}

function onopen(): void {
if (!active) return;
status = 'open';
events$.next({ event: 'open' });
});
socket.on('error', (err) => {
close(err, true, false);
});
socket.on('close', () => {
}
function onerror(event: { error: any }): void {
if (!active) return;
close(ensure(event.error), true, false);
}
function onclose(): void {
if (!active) return;
close(Error(`Connection closed by server`), false, false);
});
socket.on('message', (message) => {
}
function onmessage(event: { data: any }): void {
if (status !== 'open') return;
events$.next({ event: 'data', data: message });
});
events$.next({ event: 'data', data: event.data });
}

if (socket) {
socket.addEventListener('open', onopen);
socket.addEventListener('error', onerror);
socket.addEventListener('close', onclose);
socket.addEventListener('message', onmessage);
}

function close(error: Error | null, explicit: boolean, early: boolean): void {
if (!active) return;
Expand All @@ -58,10 +91,19 @@ export function connectEach(
events$.next({ event: 'close', data: error });
events$.complete();
if (!early) earlyClose.resolve({ event: 'close', data: error });
try {
socket.removeAllListeners();
if (explicit) socket.close();
} catch (err) {}
if (socket) {
try {
socket.removeEventListener('open', onopen);
socket.removeEventListener('error', onerror);
socket.removeEventListener('close', onclose);
socket.removeEventListener('message', onmessage);
} catch (err) {}
if (explicit) {
try {
socket.close();
} catch (err) {}
}
}
}

const ready: Promise<RPCClientConnectionEvent | void> = Promise.race([
Expand All @@ -84,7 +126,9 @@ export function connectEach(
ready
.then((x) => {
if (status === 'open') {
socket.send(data, (err) => (err ? reject(err) : resolve()));
send(data)
.then(resolve)
.catch(reject);
} else {
if (x && x.event === 'close' && x.data) reject(x.data);
else reject(Error(`Can't request over a closed socket`));
Expand Down

0 comments on commit 29bb394

Please sign in to comment.