Skip to content

Commit

Permalink
feat(rpc-client): adds client
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Oct 25, 2019
1 parent 045954b commit 6c17da7
Show file tree
Hide file tree
Showing 6 changed files with 374 additions and 0 deletions.
80 changes: 80 additions & 0 deletions packages/rpc-client/src/client/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
import { Observable } from 'rxjs';
import WebSocket from 'isomorphic-ws';
import { RPCClientStatus, RPCClientOptions } from './types';
import { ClientManager } from './client-manager';
import { defaultOptions } from './defaults';
import { handleUnaryRequest, handleStreamRequest } from './helpers';

export default class RPCClient {
private id: number;
private options: Required<RPCClientOptions>;
private manager: ClientManager;
constructor(
address: string,
wsco?: WebSocket.ClientOptions | null,
options?: RPCClientOptions | null
) {
this.id = 1;
this.options = defaultOptions(options);
this.manager = new ClientManager(
address,
wsco || {},
this.options.attempts,
this.options.timeouts.connect || 0,
this.options.timeouts.response || 0,
this.options.policies.subscribe !== 'fail'
);
}
public get status(): RPCClientStatus {
return this.manager.status;
}
public get errors(): Error[] {
return this.manager.errors;
}
public get status$(): Observable<RPCClientStatus> {
return this.manager.status$;
}
public get errors$(): Observable<Error> {
return this.manager.errors$;
}
public close(): void {
return this.manager.close();
}
public query(route: string, data?: any): Promise<any> {
return handleUnaryRequest(
{
id: this.nextId(),
action: 'query',
route,
data
},
this.manager,
this.options.timeouts.response || 0
);
}
public mutation(route: string, data?: any): Promise<any> {
return handleUnaryRequest(
{
id: this.nextId(),
action: 'mutate',
route,
data
},
this.manager,
this.options.timeouts.response || 0
);
}
public subscription(route: string, data?: any): Observable<any> {
return handleStreamRequest(
{ route, data },
this.manager,
this.options.timeouts.response || 0,
this.options.policies.subscribe === 'no-fail',
this.options.policies.unsubscribe || 'complete',
() => this.nextId()
);
}
private nextId(): string {
return String(this.id++);
}
}
260 changes: 260 additions & 0 deletions packages/rpc-client/src/client/helpers/handle-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
import { UnsubscribePolicy } from '../types';
import { RPCSubscribeRequest } from '@karmic/rpc-adapter';
import { ClientManager } from '../client-manager';
import { Observable, Subscription } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import { resolvableWait, ResolvableWait } from './resolvable-wait';
import { Promist, deferred } from 'promist';

export const ONLY_FAIL_ON_CLOSE_RETRY_WAIT = 5000;

export function handleStreamRequest(
partial: Pick<RPCSubscribeRequest, 'route' | 'data'>,
manager: ClientManager,
responseTimeout: number,
subscribeOnlyFailOnClose: boolean,
unsubscribePolicy: UnsubscribePolicy,
nextId: () => string
): Observable<any> {
let count = 0;
let explicitUnsubscribe = false;
let current: null | {
id: string;
isUnsubscribed: boolean;
observable: Observable<any>;
} = null;

const create = (): {
id: string;
isUnsubscribed: boolean;
observable: Observable<any>;
} => {
let isUnsubscribed = false;
const id = nextId();
const a = manager.stream({
id,
action: 'subscribe',
route: partial.route,
data: partial.data
});
const b = withResponseTimeout(a, manager, responseTimeout, () => {
if (!isUnsubscribed) {
isUnsubscribed = true;
manager.unsubscribe(
{ id, action: 'unsubscribe' },
Error(`Request reached timeout: ${responseTimeout}ms`)
);
}
});
const c = onlyFailOnClose(
b,
partial,
manager,
responseTimeout,
subscribeOnlyFailOnClose,
unsubscribePolicy,
nextId
);
return {
id,
get isUnsubscribed() {
return c.didError || isUnsubscribed;
},
observable: c.observable
};
};

return new Observable((self) => {
if (current === null) {
if (explicitUnsubscribe && unsubscribePolicy === 'complete') {
self.complete();
return;
}
explicitUnsubscribe = false;
current = create();
}

count++;
let done = false;
const subscription = current.observable.subscribe({
next: (value) => {
self.next(value);
},
error: (err) => {
done = true;
self.error(err);
},
complete: () => {
done = true;
self.complete();
}
});

return () => {
count--;
subscription.unsubscribe();
if (count > 0 || done) return;

if (unsubscribePolicy !== 'keep-alive') {
if (current && !current.isUnsubscribed) {
manager.unsubscribe({ id: current.id, action: 'unsubscribe' });
}
explicitUnsubscribe = true;
current = null;
}
};
});
}

export function withResponseTimeout(
observable: Observable<any>,
manager: ClientManager,
timeout: number,
onTimeout?: () => void
): Observable<any> {
if (!timeout) return observable;

let state: null | {
promise: Promist<void, 'deferrable'>;
subscription: Subscription;
} = null;
let timer: null | NodeJS.Timer = null;

let count = 0;
function start(): void {
count++;
if (state) return;

const promise = deferred();
const subscription = manager.status$.subscribe((status) => {
if (status === 'open') {
stop();
timer = setTimeout(() => promise.resolve(), timeout);
} else if (status === 'close') {
stop();
}
});
state = { promise, subscription };
promise.then(() => (timer && onTimeout ? onTimeout() : null));
}
function stop(): void {
if (timer) {
clearTimeout(timer);
timer = null;
}
}
function end(): void {
count--;
if (count <= 0) {
stop();
if (state) {
state.promise.resolve();
state.subscription.unsubscribe();
state = null;
}
}
}

return new Observable((self) => {
start();
let open = true;
const subscription = observable.subscribe({
next: (value) => {
if (!open) return;
stop();
self.next(value);
},
error: (err) => {
if (!open) return;
stop();
self.error(err);
close();
},
complete: () => {
if (!open) return;
stop();
self.complete();
close();
}
});

function close(): void {
open = false;
end();
setTimeout(() => subscription.unsubscribe(), 0);
}

return () => {
if (open) close();
};
});
}

export function onlyFailOnClose(
observable: Observable<any>,
partial: Pick<RPCSubscribeRequest, 'route' | 'data'>,
manager: ClientManager,
responseTimeout: number,
subscribeOnlyFailOnClose: boolean,
unsubscribePolicy: UnsubscribePolicy,
nextId: () => string
): { didError: boolean; observable: Observable<any> } {
let obs: null | Observable<any> = null;
let sourceDidError = false;

return {
get didError(): boolean {
return sourceDidError;
},
observable: subscribeOnlyFailOnClose
? new Observable((self) => {
let unsubscribed = false;
let retryWait: null | ResolvableWait = null;
let subscription = observable.subscribe({
next: (value) => self.next(value),
error: (err) => {
sourceDidError = true;
if (manager.status === 'complete') return self.error(err);

manager.report(err);
retryWait = resolvableWait(ONLY_FAIL_ON_CLOSE_RETRY_WAIT);
retryWait.promise.then(() => {
if (unsubscribed) return;
if (manager.status === 'complete') return self.error(err);

if (!obs) {
obs = handleStreamRequest(
partial,
manager,
responseTimeout,
subscribeOnlyFailOnClose,
unsubscribePolicy,
nextId
);
}
subscription.unsubscribe();
subscription = obs.subscribe(self);
});
},
complete: () => self.complete()
});

manager.status$
.pipe(
filter((x) => x === 'complete'),
take(1)
)
.toPromise()
.then(() => {
if (retryWait) retryWait.resolve();
});

return () => {
unsubscribed = true;
subscription.unsubscribe();
if (retryWait) retryWait.resolve();
};
})
: observable
};
}
28 changes: 28 additions & 0 deletions packages/rpc-client/src/client/helpers/handle-unary.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
import { RPCUnaryRequest } from '@karmic/rpc-adapter';
import { ClientManager } from '../client-manager';
import { resolvableWait } from './resolvable-wait';

export function handleUnaryRequest(
request: RPCUnaryRequest,
manager: ClientManager,
responseTimeout: number
): Promise<any> {
const promise = manager.unary(request);
if (!responseTimeout) return promise;

const timeoutWait = resolvableWait(responseTimeout);
return Promise.race([
timeoutWait.promise.then(() =>
Promise.reject(Error(`Request reached timeout: ${responseTimeout}ms`))
),
promise
])
.then((value) => {
timeoutWait.resolve();
return value;
})
.catch((err) => {
timeoutWait.resolve();
return Promise.reject(err);
});
}
2 changes: 2 additions & 0 deletions packages/rpc-client/src/client/helpers/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
export * from './handle-unary';
export * from './handle-stream';
export * from './resolvable-wait';
2 changes: 2 additions & 0 deletions packages/rpc-client/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { default } from './RPCClient';
export * from './types';
2 changes: 2 additions & 0 deletions packages/rpc-client/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { default } from './client';
export * from './client';

0 comments on commit 6c17da7

Please sign in to comment.