From 2226d33f1662985e5e2ea4f2d38de941d335ff7a Mon Sep 17 00:00:00 2001 From: masad-frost Date: Thu, 13 Feb 2020 18:46:38 -0800 Subject: [PATCH 1/6] Add typings to client close event --- src/client.ts | 27 +++++++++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/src/client.ts b/src/client.ts index be9ffa07..41288a17 100644 --- a/src/client.ts +++ b/src/client.ts @@ -568,3 +568,30 @@ export class Client extends EventEmitter { ping(); }; } + +/** + * Emitted when there's an error while the channel is opening + * @asMemberOf Channel + * @event + */ +declare function close(c: { closeEvent?: CloseEvent; expected: boolean }): void; + +export declare interface Client extends EventEmitter { + on(event: 'close', listener: typeof close): this; + addListener(event: 'close', listener: typeof close): this; + + once(event: 'close', listener: typeof close): this; + + prependListener(event: 'close', listener: typeof close): this; + + prependOnceListener(event: 'close', listener: typeof close): this; + + off(event: 'close', listener: typeof close): this; + removeListener(event: 'close', listener: typeof close): this; + + emit(event: 'close', ...args: Parameters): boolean; + + removeAllListeners(event?: 'close'): this; + + eventNames(): Array<'close'>; +} From d7f8963ec84e6c6164790bc94849361e884fc813 Mon Sep 17 00:00:00 2001 From: masad-frost Date: Thu, 13 Feb 2020 18:49:42 -0800 Subject: [PATCH 2/6] Simplify tryConnect logic Much less indirection. Does not include timeouts --- src/client.ts | 210 ++++++++++++++++++++++++++++++++------------------ 1 file changed, 135 insertions(+), 75 deletions(-) diff --git a/src/client.ts b/src/client.ts index 41288a17..0731d6f8 100644 --- a/src/client.ts +++ b/src/client.ts @@ -3,7 +3,6 @@ import { EventEmitter } from 'events'; import { api } from '@replit/protocol'; import { Channel } from './channel'; -import { createDeferred, Deferred } from './deferred'; import { EIOCompat } from './EIOCompat'; enum ConnectionState { @@ -85,8 +84,6 @@ const getWebSocketClass = (options: ConnectOptions) => { }; export class Client extends EventEmitter { - public containerState: api.ContainerState.State | null; - public connectionState: ConnectionState; private token: string | null; @@ -97,8 +94,6 @@ export class Client extends EventEmitter { [id: number]: Channel; }; - private deferredReady: Deferred | null; - private debug: DebugFunc; private didConnect: boolean; @@ -116,8 +111,6 @@ export class Client extends EventEmitter { this.channels = { 0: new Channel(), }; - this.deferredReady = null; - this.containerState = null; this.token = null; this.connectionState = ConnectionState.DISCONNECTED; this.debug = debug; @@ -334,44 +327,6 @@ export class Client extends EventEmitter { this.getChannel(cmd.channel).onCommand(cmd); switch (cmd.body) { - case 'containerState': - if (cmd.containerState == null || cmd.containerState.state == null) { - const err = new Error('Expected container state to have state, got null or undefined'); - - if (this.deferredReady) { - this.deferredReady.reject(err); - return; - } - - this.debug({ type: 'breadcrumb', message: 'error', data: err.message }); - throw err; - } - - this.debug({ - type: 'breadcrumb', - message: 'containerState', - data: this.containerState, - }); - - this.containerState = cmd.containerState.state; - - if (this.containerState === api.ContainerState.State.READY) { - if (this.deferredReady) { - this.deferredReady.resolve(); - this.deferredReady = null; - } - - if (this.getChannel(0).isOpen === false) { - this.getChannel(0).onOpen(0, api.OpenChannelRes.State.CREATED, this.send); - } - } - - if (this.containerState === api.ContainerState.State.SLEEP) { - this.onClose({ expected: false }); - } - - break; - case 'closeChanRes': if (cmd.closeChanRes == null) { throw new Error('Expected closeChanRes'); @@ -421,7 +376,6 @@ export class Client extends EventEmitter { private onClose = ({ closeEvent, expected }: { closeEvent?: CloseEvent; expected: boolean }) => { this.connectionState = ConnectionState.DISCONNECTED; - this.containerState = null; this.debug({ type: 'breadcrumb', @@ -461,11 +415,6 @@ export class Client extends EventEmitter { }); } - if (this.deferredReady) { - this.deferredReady.reject(new Error('Connection closed before the server was ready')); - this.deferredReady = null; - } - this.emit('close', { closeEvent, expected }); }; @@ -503,40 +452,151 @@ export class Client extends EventEmitter { ws.onclose = this.onSocketClose; this.ws = ws; - this.deferredReady = createDeferred(); + let onSuccess: () => void; + let onFailed: (err: Error) => void; - const rej = this.deferredReady.reject; + const onCommand = (cmd: api.Command) => { + if (cmd.containerState == null) { + return; + } - let timeoutId: NodeJS.Timer; - if (timeout != null) { - timeoutId = setTimeout(() => { - this.debug({ type: 'breadcrumb', message: 'timeout' }); + if (cmd.containerState.state == null) { + onFailed(new Error('Got containterState but state was not defined')); - if (this.deferredReady) { - rej(new Error('timeout')); - this.deferredReady = null; - } + return; + } - this.close(); - }, timeout); - } + const { state } = cmd.containerState; + + this.debug({ + type: 'breadcrumb', + message: 'containerState', + data: state, + }); + + const StateEnum = api.ContainerState.State; + + switch (state) { + case StateEnum.READY: + onSuccess(); - this.deferredReady.reject = (reason) => { - // Make sure we clear the timeout when rejecting - clearTimeout(timeoutId); - rej(reason); + if (this.getChannel(0).isOpen === false) { + this.getChannel(0).onOpen(0, api.OpenChannelRes.State.CREATED, this.send); + } + + return; + + case StateEnum.SLEEP: + onFailed(new Error('Got SLEEP as container state')); + + break; + + default: + } }; + const chan0 = this.getChannel(0); + chan0.on('command', onCommand); - const res = this.deferredReady.resolve; - this.deferredReady.resolve = (v) => { - this.debug({ type: 'breadcrumb', message: 'connected!' }); - this.startPing(); + const originalOnClose = this.onClose; + this.onClose = ({ expected, closeEvent }) => { + originalOnClose({ expected, closeEvent }); - clearTimeout(timeoutId); - res(v); + if (expected) { + onFailed(new Error('You called `Client.close` before you connected')); + } + }; + + const cleanup = () => { + this.onClose = originalOnClose; + chan0.off('command', onCommand); }; - return this.deferredReady.promise; + return new Promise((_res, _rej) => { + onSuccess = () => { + _res(); + cleanup(); + + this.debug({ type: 'breadcrumb', message: 'connected!' }); + this.startPing(); + }; + + onFailed = (err) => { + _rej(err); + cleanup(); + + this.debug({ type: 'breadcrumb', message: 'connect failed' }); + }; + }); + // const onSuccess = () => { + // this.connectCallback = null; + + // this.debug({ type: 'breadcrumb', message: 'connected!' }); + // this.startPing(); + + // resolve(); + // }; + + // const onError = (err: Error) => { + // this.connectCallback = null; + + // this.debug({ type: 'breadcrumb', message: 'connection failed' }); + + // reject(err); + // }; + + // // Create a callback so that we can + // this.connectCallback = (err) => { + // if (err) { + // onError(err); + + // return; + // } + + // onSuccess(); + // }; + + // if (timeout == null) { + // return; + // } + + // let timeoutId: NodeJS.Timer; + // }); + + // TODO handle timeout case + // this.deferredReady = createDeferred(); + + // const rej = this.deferredReady.reject; + + // let timeoutId: NodeJS.Timer; + // if (timeout != null) { + // timeoutId = setTimeout(() => { + // this.debug({ type: 'breadcrumb', message: 'timeout' }); + + // if (this.deferredReady) { + // rej(new Error('timeout')); + // this.deferredReady = null; + // } + + // this.close(); + // }, timeout); + // } + + // this.deferredReady.reject = (reason) => { + // // Make sure we clear the timeout when rejecting + // clearTimeout(timeoutId); + // rej(reason); + // }; + + // const res = this.deferredReady.resolve; + // this.deferredReady.resolve = (v) => { + // this.debug({ type: 'breadcrumb', message: 'connected!' }); + // this.startPing(); + + // clearTimeout(timeoutId); + // res(v); + // }; + + // return this.deferredReady.promise; }; private startPing = () => { From 3c26f9d3c395633623f2aecd9066f34bb0a224a8 Mon Sep 17 00:00:00 2001 From: masad-frost Date: Thu, 13 Feb 2020 21:09:53 -0800 Subject: [PATCH 3/6] Re-add timeout support we changed the way it works a bit in that if we ever get a message on the socket we will reset the timeout --- src/client.ts | 140 ++++++++++++++++++++++---------------------------- 1 file changed, 60 insertions(+), 80 deletions(-) diff --git a/src/client.ts b/src/client.ts index 0731d6f8..b285ec15 100644 --- a/src/client.ts +++ b/src/client.ts @@ -375,6 +375,8 @@ export class Client extends EventEmitter { }; private onClose = ({ closeEvent, expected }: { closeEvent?: CloseEvent; expected: boolean }) => { + const prevState = this.connectionState; + this.connectionState = ConnectionState.DISCONNECTED; this.debug({ @@ -415,7 +417,9 @@ export class Client extends EventEmitter { }); } - this.emit('close', { closeEvent, expected }); + if (prevState !== ConnectionState.DISCONNECTED) { + this.emit('close', { closeEvent, expected }); + } }; private onSocketClose = (closeEvent: CloseEvent) => { @@ -455,7 +459,47 @@ export class Client extends EventEmitter { let onSuccess: () => void; let onFailed: (err: Error) => void; + /** + * If the user specifies a timeout we will short circuit + * the connection if we don't get READY from the container + * within the specified timeout. + * + * Every time we get a message we reset the connection timeout + * this is because it signifies that the connection will eventually work. + */ + let resetTimeout = () => {}; + let cancelTimeout = () => {}; + if (timeout) { + let timeoutId: NodeJS.Timer; // Can also be of type `number` in the browser + + cancelTimeout = () => clearTimeout(timeoutId); + + resetTimeout = () => { + cancelTimeout(); + + timeoutId = setTimeout(() => { + this.debug({ type: 'breadcrumb', message: 'connect timeout' }); + + onFailed(new Error('timeout')); + }, timeout); + }; + } + + /** Listen to incoming commands + * Every time we get a message we reset the connection timeout (if it exists) + * this is because it signifies that the connection will eventually work. + * + * If we ever get a ContainterState READY we can officially + * say that the connection is successful and we resolve the returned promise. + * + * If we ever get ContainterState SLEEP it means that something went wrong + * and connection should be dropped + */ const onCommand = (cmd: api.Command) => { + // Everytime we get a message on channel0 + // we will reset the timeout + resetTimeout(); + if (cmd.containerState == null) { return; } @@ -484,7 +528,7 @@ export class Client extends EventEmitter { this.getChannel(0).onOpen(0, api.OpenChannelRes.State.CREATED, this.send); } - return; + break; case StateEnum.SLEEP: onFailed(new Error('Got SLEEP as container state')); @@ -497,17 +541,21 @@ export class Client extends EventEmitter { const chan0 = this.getChannel(0); chan0.on('command', onCommand); - const originalOnClose = this.onClose; - this.onClose = ({ expected, closeEvent }) => { - originalOnClose({ expected, closeEvent }); - - if (expected) { - onFailed(new Error('You called `Client.close` before you connected')); - } + /** + * The user might call `close` before we even connect + * we wanna make sure we reject the promise if that happens + * so we monkey patch our own `close` function ;) + */ + let closeCalledByUser = false; + const originalClose = this.close; + this.close = () => { + closeCalledByUser = true; + onFailed(new Error('You called `Client.close` before you connected')); }; const cleanup = () => { - this.onClose = originalOnClose; + cancelTimeout(); + this.close = originalClose; chan0.off('command', onCommand); }; @@ -524,79 +572,11 @@ export class Client extends EventEmitter { _rej(err); cleanup(); + this.onClose({ expected: closeCalledByUser }); + this.debug({ type: 'breadcrumb', message: 'connect failed' }); }; }); - // const onSuccess = () => { - // this.connectCallback = null; - - // this.debug({ type: 'breadcrumb', message: 'connected!' }); - // this.startPing(); - - // resolve(); - // }; - - // const onError = (err: Error) => { - // this.connectCallback = null; - - // this.debug({ type: 'breadcrumb', message: 'connection failed' }); - - // reject(err); - // }; - - // // Create a callback so that we can - // this.connectCallback = (err) => { - // if (err) { - // onError(err); - - // return; - // } - - // onSuccess(); - // }; - - // if (timeout == null) { - // return; - // } - - // let timeoutId: NodeJS.Timer; - // }); - - // TODO handle timeout case - // this.deferredReady = createDeferred(); - - // const rej = this.deferredReady.reject; - - // let timeoutId: NodeJS.Timer; - // if (timeout != null) { - // timeoutId = setTimeout(() => { - // this.debug({ type: 'breadcrumb', message: 'timeout' }); - - // if (this.deferredReady) { - // rej(new Error('timeout')); - // this.deferredReady = null; - // } - - // this.close(); - // }, timeout); - // } - - // this.deferredReady.reject = (reason) => { - // // Make sure we clear the timeout when rejecting - // clearTimeout(timeoutId); - // rej(reason); - // }; - - // const res = this.deferredReady.resolve; - // this.deferredReady.resolve = (v) => { - // this.debug({ type: 'breadcrumb', message: 'connected!' }); - // this.startPing(); - - // clearTimeout(timeoutId); - // res(v); - // }; - - // return this.deferredReady.promise; }; private startPing = () => { From 8200b7888a81665a78888664355ac29f0c444e90 Mon Sep 17 00:00:00 2001 From: masad-frost Date: Thu, 13 Feb 2020 21:39:54 -0800 Subject: [PATCH 4/6] Cleanup socket when connection fails --- src/client.ts | 89 ++++++++++++++++++++++++++------------------------- 1 file changed, 46 insertions(+), 43 deletions(-) diff --git a/src/client.ts b/src/client.ts index b285ec15..ab5e300c 100644 --- a/src/client.ts +++ b/src/client.ts @@ -162,7 +162,11 @@ export class Client extends EventEmitter { const completeOptions: Required = { token: options.token, - urlOptions: options.urlOptions || { secure: false, host: 'eval.repl.it', port: '80' }, + urlOptions: options.urlOptions || { + secure: false, + host: 'eval.repl.it', + port: '80', + }, timeout: options.timeout || null, // eslint-disable-next-line @typescript-eslint/ban-ts-ignore // @ts-ignore: EIOCompat is compatible with the WebSocket api but @@ -266,7 +270,11 @@ export class Client extends EventEmitter { /** * Closes the socket connection and handles cleanup */ - public close = () => this.onClose({ expected: true }); + public close = () => { + this.debug({ type: 'breadcrumb', message: 'user close' }); + + this.onClose({ expected: true }); + }; /** Gets a channel by Id */ public getChannel(id: number): Channel { @@ -375,38 +383,7 @@ export class Client extends EventEmitter { }; private onClose = ({ closeEvent, expected }: { closeEvent?: CloseEvent; expected: boolean }) => { - const prevState = this.connectionState; - - this.connectionState = ConnectionState.DISCONNECTED; - - this.debug({ - type: 'breadcrumb', - message: 'close', - data: { - expected, - closeReason: closeEvent ? closeEvent.reason : undefined, - }, - }); - - if (this.ws) { - this.ws.onmessage = null; - this.ws.onclose = null; - - if (this.ws.readyState === 0 || this.ws.readyState === 1) { - this.debug({ - type: 'breadcrumb', - message: 'wsclose', - data: { - expected, - closeReason: closeEvent ? closeEvent.reason : undefined, - }, - }); - - this.ws.close(); - } - - this.ws = null; - } + this.cleanupSocket(); if (this.didConnect) { // Only close the channels if we ever connected @@ -417,20 +394,47 @@ export class Client extends EventEmitter { }); } - if (prevState !== ConnectionState.DISCONNECTED) { + if (this.connectionState !== ConnectionState.DISCONNECTED) { this.emit('close', { closeEvent, expected }); } + + this.connectionState = ConnectionState.DISCONNECTED; }; - private onSocketClose = (closeEvent: CloseEvent) => { - if (this.connectionState !== ConnectionState.DISCONNECTED) { - this.onClose({ - closeEvent, - expected: false, + private cleanupSocket = () => { + const { ws } = this; + + if (!ws) { + return; + } + + this.ws = null; + + ws.onmessage = null; + ws.onclose = null; + + if (ws.readyState === 0 || ws.readyState === 1) { + this.debug({ + type: 'breadcrumb', + message: 'wsclose', }); + + ws.close(); } }; + private onSocketClose = (closeEvent: CloseEvent) => { + this.debug({ + type: 'breadcrumb', + message: 'wsclose', + data: { + closeReason: closeEvent ? closeEvent.reason : undefined, + }, + }); + + this.onClose({ closeEvent, expected: false }); + }; + private tryConnect = async ({ token, urlOptions, @@ -546,10 +550,9 @@ export class Client extends EventEmitter { * we wanna make sure we reject the promise if that happens * so we monkey patch our own `close` function ;) */ - let closeCalledByUser = false; const originalClose = this.close; this.close = () => { - closeCalledByUser = true; + this.debug({ type: 'breadcrumb', message: 'user close' }); onFailed(new Error('You called `Client.close` before you connected')); }; @@ -572,7 +575,7 @@ export class Client extends EventEmitter { _rej(err); cleanup(); - this.onClose({ expected: closeCalledByUser }); + this.cleanupSocket(); this.debug({ type: 'breadcrumb', message: 'connect failed' }); }; From a48f35edf1c6cbbc33b8dca87a8ae2bce54f6ccb Mon Sep 17 00:00:00 2001 From: masad-frost Date: Thu, 13 Feb 2020 22:08:37 -0800 Subject: [PATCH 5/6] Make sure we fail on abrupt socket closures --- src/client.ts | 55 ++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/src/client.ts b/src/client.ts index ab5e300c..1c6f66d3 100644 --- a/src/client.ts +++ b/src/client.ts @@ -423,18 +423,6 @@ export class Client extends EventEmitter { } }; - private onSocketClose = (closeEvent: CloseEvent) => { - this.debug({ - type: 'breadcrumb', - message: 'wsclose', - data: { - closeReason: closeEvent ? closeEvent.reason : undefined, - }, - }); - - this.onClose({ closeEvent, expected: false }); - }; - private tryConnect = async ({ token, urlOptions, @@ -457,12 +445,29 @@ export class Client extends EventEmitter { ws.binaryType = 'arraybuffer'; ws.onmessage = this.onSocketMessage; - ws.onclose = this.onSocketClose; this.ws = ws; + /** + * success is only called when we get + * ContainerState.READY command + */ let onSuccess: () => void; + /** + * Failure can happen due to a number of reasons + * 1- Abrupt socket closure + * 2- Timedout connection request + * 3- ContainerState.SLEEP command + * 4- Use calling `close` before we connect + */ let onFailed: (err: Error) => void; + /** + * Abrupt socket closures should report failed + */ + ws.onclose = () => { + onFailed(new Error('WebSocket closed before we got READY')); + }; + /** * If the user specifies a timeout we will short circuit * the connection if we don't get READY from the container @@ -556,7 +561,10 @@ export class Client extends EventEmitter { onFailed(new Error('You called `Client.close` before you connected')); }; - const cleanup = () => { + /** + * We call this as a cleanup method after we settle the connection + */ + const onFinally = () => { cancelTimeout(); this.close = originalClose; chan0.off('command', onCommand); @@ -564,16 +572,31 @@ export class Client extends EventEmitter { return new Promise((_res, _rej) => { onSuccess = () => { + onFinally(); + + // Update socket closure to do something else + ws.onclose = (closeEvent: CloseEvent) => { + this.debug({ + type: 'breadcrumb', + message: 'wsclose', + data: { + closeReason: closeEvent ? closeEvent.reason : undefined, + }, + }); + + this.onClose({ closeEvent, expected: false }); + }; + _res(); - cleanup(); this.debug({ type: 'breadcrumb', message: 'connected!' }); this.startPing(); }; onFailed = (err) => { + onFinally(); + _rej(err); - cleanup(); this.cleanupSocket(); From d6dedc5aab6722c557da6df03b71e4e367af305d Mon Sep 17 00:00:00 2001 From: masad-frost Date: Thu, 13 Feb 2020 22:15:45 -0800 Subject: [PATCH 6/6] Always close channels --- src/client.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/src/client.ts b/src/client.ts index 1c6f66d3..bea5cd22 100644 --- a/src/client.ts +++ b/src/client.ts @@ -268,7 +268,10 @@ export class Client extends EventEmitter { }; /** - * Closes the socket connection and handles cleanup + * Closes the connection. + * - If `connect` was called and not settled it will also reject the promise + * - If there's an open WebSocket connection it will be closed + * - Any open channels or channel requests are closed */ public close = () => { this.debug({ type: 'breadcrumb', message: 'user close' }); @@ -385,14 +388,9 @@ export class Client extends EventEmitter { private onClose = ({ closeEvent, expected }: { closeEvent?: CloseEvent; expected: boolean }) => { this.cleanupSocket(); - if (this.didConnect) { - // Only close the channels if we ever connected - // so that we can retry without losing queued up - // messages. - Object.keys(this.channels).forEach((id) => { - this.handleCloseChannel({ id: Number(id) }); - }); - } + Object.keys(this.channels).forEach((id) => { + this.handleCloseChannel({ id: Number(id) }); + }); if (this.connectionState !== ConnectionState.DISCONNECTED) { this.emit('close', { closeEvent, expected });