Skip to content

Commit

Permalink
fix(ws-client): fixes reconnect reaching max call stack
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Nov 2, 2019
1 parent 0c2e60b commit db2f859
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 81 deletions.
82 changes: 46 additions & 36 deletions packages/ws-client/src/connect/connect.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@ import { connectEach } from './each';
import { createTracker } from './tracker';
import {
RPCClientConnectionEvent,
RPCClientConnectionActions,
RPCClientConnectionStatus,
RPCClientConnection,
DataOutput
} from '@karmic/rpc';
import { until } from 'promist';
import { until, Promist } from 'promist';

export const RECONNECT_DELAY = 5000;

Expand All @@ -21,22 +19,18 @@ export function connect(
): RPCClientConnection {
let retries = 0;
const events$ = new Subject<RPCClientConnectionEvent>();
let timer: null | NodeJS.Timer = null;
let current = new Promist<Pick<RPCClientConnection, 'actions' | 'status'>>();

let child = trunk();
function trunk(
delay?: number
): {
actions: RPCClientConnectionActions;
status: RPCClientConnectionStatus;
} {
function trunk(): void {
retries++;
let active = true;
// Reset retries when a request and a response have been successful
const tracker = createTracker(() => {
retries = 0;
});

const connection = connectEach(address, wsco, timeout, delay);
const connection = connectEach(address, wsco, timeout);
const subscription = connection.events$.subscribe({
next(value) {
if (!active) return;
Expand All @@ -62,6 +56,23 @@ export function connect(
}
});

current.resolve({
get status() {
return active ? connection.status : 'close';
},
actions: {
close: () => {
if (!active) return;
events$.next({ event: 'close', data: null });
close(false);
},
async send(data: DataOutput) {
await connection.actions.send(data);
tracker.request();
}
}
});

function close(retry: boolean): void {
if (!active) return;
active = false;
Expand All @@ -72,41 +83,40 @@ export function connect(
});

if (retry && (attempts <= 0 || attempts > retries)) {
// FIXME: callback stack error when too many reconnects
// (calls too deep)
child = trunk(RECONNECT_DELAY);
current = new Promist();
timer = setTimeout(() => {
timer = null;
trunk();
}, Math.max(RECONNECT_DELAY, 50));
} else {
events$.complete();
}
}

const send = async (data: DataOutput): Promise<void> => {
return connection.actions.send(data);
};
return {
get status() {
return active ? connection.status : 'close';
},
actions: {
close: () => {
if (!active) return;
events$.next({ event: 'close', data: null });
close(false);
},
send: (data: DataOutput) => {
return send(data).then(() => tracker.request());
}
}
};
}

trunk();
let closed = false;
return {
get status() {
return child.status;
return current.value && !closed ? current.value.status : 'close';
},
actions: {
send: (data: DataOutput) => child.actions.send(data),
close: () => child.actions.close()
async send(data: DataOutput) {
if (closed) throw Error(`Can't request over a closed socket`);
return current.then(({ actions }) => actions.send(data));
},
close: () => {
if (closed) return;
closed = true;

if (current.value) {
return current.value.actions.close();
}
if (timer) {
clearTimeout(timer);
return events$.complete();
}
}
},
events$: events$.asObservable()
};
Expand Down
80 changes: 35 additions & 45 deletions packages/ws-client/src/connect/each.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,50 +12,42 @@ import { Promist } from 'promist';
export function connectEach(
address: string,
wsco: WebSocket.ClientOptions,
timeout?: number,
delay?: number
timeout?: number
): RPCClientConnection {
let active = true;
let socket: null | WebSocket = null;
let status: RPCClientConnectionStatus = 'close';
const events$ = new Subject<RPCClientConnectionEvent>();

const socket = new WebSocket(address, wsco);
const earlyClose = new Promist<RPCClientConnectionEvent>();
const delayWait = Promist.wait(delay ? Math.max(0, delay) : 0);
earlyClose.then(() => delayWait.resolve());

delayWait.then(() => {
if (!active) return;

socket = new WebSocket(address, wsco);
if (timeout && timeout > 0) {
const timeoutWait = Promist.wait(timeout);
earlyClose.then(() => timeoutWait.resolve());
timeoutWait.then(() => {
if (!active || status === 'open') return;
close(
Error(`Connection didn't open by timeout: ${timeout}ms`),
true,
false
);
});
}

socket.on('open', () => {
if (!active) return;
status = 'open';
events$.next({ event: 'open' });
});
socket.on('error', (err) => {
close(err, true, false);
});
socket.on('close', () => {
close(Error(`Connection closed by server`), false, false);
});
socket.on('message', (message) => {
if (status !== 'open') return;
events$.next({ event: 'data', data: message });
if (timeout && timeout > 0) {
const timeoutWait = Promist.wait(timeout);
earlyClose.then(() => timeoutWait.resolve());
timeoutWait.then(() => {
if (!active || status === 'open') return;
close(
Error(`Connection didn't open by timeout: ${timeout}ms`),
true,
false
);
});
}

socket.on('open', () => {
if (!active) return;
status = 'open';
events$.next({ event: 'open' });
});
socket.on('error', (err) => {
close(err, true, false);
});
socket.on('close', () => {
close(Error(`Connection closed by server`), false, false);
});
socket.on('message', (message) => {
if (status !== 'open') return;
events$.next({ event: 'data', data: message });
});

function close(error: Error | null, explicit: boolean, early: boolean): void {
Expand All @@ -66,15 +58,13 @@ export function connectEach(
events$.next({ event: 'close', data: error });
events$.complete();
if (!early) earlyClose.resolve({ event: 'close', data: error });
if (socket) {
try {
socket.removeAllListeners();
if (explicit) socket.close();
} catch (err) {}
}
try {
socket.removeAllListeners();
if (explicit) socket.close();
} catch (err) {}
}

const ready = Promise.race([
const ready: Promise<RPCClientConnectionEvent | void> = Promise.race([
earlyClose,
events$
.pipe(
Expand All @@ -93,10 +83,10 @@ export function connectEach(
return new Promise((resolve, reject) => {
ready
.then((x) => {
if (socket && status === 'open') {
if (status === 'open') {
socket.send(data, (err) => (err ? reject(err) : resolve()));
} else {
if (x.event === 'close' && x.data) reject(x.data);
if (x && x.event === 'close' && x.data) reject(x.data);
else reject(Error(`Can't request over a closed socket`));
}
})
Expand Down

0 comments on commit db2f859

Please sign in to comment.