Skip to content

Commit

Permalink
feat(rpc-client): adds client connect
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Oct 25, 2019
1 parent 18b3e1b commit 6f780bf
Show file tree
Hide file tree
Showing 9 changed files with 305 additions and 1 deletion.
13 changes: 13 additions & 0 deletions packages/rpc-client/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion packages/rpc-client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@
"ws": "^7.1.2"
},
"dependencies": {
"isomorphic-ws": "^4.0.1"
"isomorphic-ws": "^4.0.1",
"promist": "^1.0.0"
},
"peerDependencies": {
"@karmic/core": "0.0.0",
Expand Down
101 changes: 101 additions & 0 deletions packages/rpc-client/src/client/connect/connect.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { Subject } from 'rxjs';
import WebSocket from 'isomorphic-ws';
import { connectEach } from './each';
import {
ConnectionActions,
ConnectionStatus,
ConnectionEvent,
Connection
} from './types';
import { createTracker } from './tracker';

export const RECONNECT_DELAY = 5000;

export function connect(
address: string,
wsco: WebSocket.ClientOptions,
attempts: number,
timeout: number
): Connection {
let retries = 0;
const events$ = new Subject<ConnectionEvent>();

let child = trunk();
function trunk(
delay?: number
): { actions: ConnectionActions; status: ConnectionStatus } {
retries++;
let active = true;
// Reset retries when a request and a response have been successful
const tracker = createTracker(() => {
retries = 0;
});

const connection = connectEach(address, wsco, timeout, delay);
const subscription = connection.events$.subscribe({
next(value) {
if (!active) return;
events$.next(value);

if (value.event === 'message') tracker.response();
else if (value.event === 'close') close(true);
},
error(error) {
if (!active) return;
events$.next({ event: 'close', data: error });
close(true);
},
complete() {
if (!active) return;
events$.next({
event: 'close',
data: Error(
`Inner connection completed without an explicit call or an error`
)
});
close(true);
}
});

function close(retry: boolean): void {
if (!active) return;
active = false;

connection.actions.close();
setTimeout(() => subscription.unsubscribe(), 0);

if (retry && (attempts <= 0 || attempts > retries)) {
child = trunk(RECONNECT_DELAY);
} else {
events$.complete();
}
}

return {
get status(): ConnectionStatus {
return active ? connection.status : 'close';
},
actions: {
close: () => {
if (!active) return;
events$.next({ event: 'close', data: null });
close(false);
},
send: (data: string) => {
return connection.actions.send(data).then(() => tracker.request());
}
}
};
}

return {
get status() {
return child.status;
},
actions: {
send: (data: string) => child.actions.send(data),
close: () => child.actions.close()
},
events$: events$.asObservable()
};
}
111 changes: 111 additions & 0 deletions packages/rpc-client/src/client/connect/each.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import WebSocket from 'isomorphic-ws';
import { BehaviorSubject } from 'rxjs';
import { deferred } from 'promist';
import { filter, take } from 'rxjs/operators';
import { Connection, ConnectionStatus, ConnectionEvent } from './types';
import { resolvableWait } from '../helpers';

export function connectEach(
address: string,
wsco: WebSocket.ClientOptions,
timeout?: number,
delay?: number
): Connection {
let socket: null | WebSocket = null;
let status: ConnectionStatus = 'pending';
const events$ = new BehaviorSubject<ConnectionEvent>({ event: 'pending' });

const earlyClose = deferred<ConnectionEvent>();

const delayWait = resolvableWait(delay ? Math.max(0, delay) : 0);
earlyClose.then(() => delayWait.resolve());

delayWait.promise.then(() => {
if (status === 'close') return;

socket = new WebSocket(address, wsco);
if (timeout && timeout > 0) {
const timeoutWait = resolvableWait(timeout);
earlyClose.then(() => timeoutWait.resolve());
timeoutWait.promise.then(() => {
if (status !== 'pending') return;
close(
Error(`Connection didn't open by timeout: ${timeout}ms`),
true,
false
);
});
}

socket.on('open', () => {
if (status !== 'pending') return;
status = 'open';
events$.next({ event: 'open' });
});
socket.on('error', (err) => {
close(err, true, false);
});
socket.on('close', () => {
close(Error(`Connection closed by server`), false, false);
});
socket.on('message', (message) => {
if (status !== 'open') return;
events$.next({ event: 'message', data: message });
});
});

function close(error: Error | null, explicit: boolean, early: boolean): void {
if (status === 'close') return;

status = 'close';
events$.next({ event: 'close', data: error });
events$.complete();
if (!early) earlyClose.resolve({ event: 'close', data: error });
if (socket) {
try {
socket.removeAllListeners();
if (explicit) socket.close();
} catch (err) {}
}
}

const ready = Promise.race([
earlyClose,
events$
.pipe(
filter((x) => x.event === 'open' || x.event === 'close'),
take(1)
)
.toPromise()
]);

return {
get status() {
return status;
},
actions: {
send: (data) => {
return new Promise((resolve, reject) => {
ready
.then((x) => {
if (socket && status === 'open') {
socket.send(data, (err) => (err ? reject(err) : resolve()));
} else {
if (x.event === 'close' && x.data) reject(x.data);
else reject(Error(`Can't request over a closed socket`));
}
})
.catch((err) => reject(err));
});
},
close: () => {
earlyClose.resolve({
event: 'close',
data: Error(`Connection closed by client`)
});
close(null, true, true);
}
},
events$: events$.asObservable()
};
}
2 changes: 2 additions & 0 deletions packages/rpc-client/src/client/connect/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export * from './connect';
export * from './types';
29 changes: 29 additions & 0 deletions packages/rpc-client/src/client/connect/tracker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
export interface Tracker {
request: () => void;
response: () => void;
}

export function createTracker(action: () => void): Tracker {
let hasRun = false;
let request = false;
let response = false;

function run(): void {
if (!request || !response) return;
hasRun = true;
action();
}

return {
request() {
if (hasRun) return;
request = true;
run();
},
response() {
if (hasRun) return;
response = true;
run();
}
};
}
22 changes: 22 additions & 0 deletions packages/rpc-client/src/client/connect/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Observable } from 'rxjs';
import WebSocket from 'isomorphic-ws';
import { RPCClientStatus } from '../types';

export interface Connection {
status: ConnectionStatus;
actions: ConnectionActions;
events$: Observable<ConnectionEvent>;
}

export type ConnectionStatus = RPCClientStatus;

export interface ConnectionActions {
send: (data: string) => Promise<void>;
close: () => void;
}

export type ConnectionEvent =
| { event: 'pending' }
| { event: 'open' }
| { event: 'close'; data: Error | null }
| { event: 'message'; data: WebSocket.Data };
1 change: 1 addition & 0 deletions packages/rpc-client/src/client/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './resolvable-wait';
24 changes: 24 additions & 0 deletions packages/rpc-client/src/client/helpers/resolvable-wait.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import { deferred } from 'promist';

export interface ResolvableWait {
promise: Promise<void>;
resolve: () => void;
}

export function resolvableWait(ms: number): ResolvableWait {
const promise = deferred<void>();
let timer: null | NodeJS.Timer = setTimeout(resolve, ms);

function resolve(): void {
if (!timer) return;

clearTimeout(timer);
timer = null;
promise.resolve();
}

return {
promise: promise.then((x) => x),
resolve: resolve
};
}

0 comments on commit 6f780bf

Please sign in to comment.