Skip to content

Commit

Permalink
feat(rpc/client): adds RPCClient
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Oct 28, 2019
1 parent 2d07c91 commit 21f7a66
Show file tree
Hide file tree
Showing 5 changed files with 358 additions and 0 deletions.
66 changes: 66 additions & 0 deletions packages/rpc/src/client/RPCClient/RPCClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { ClientManager } from './ClientManager';
import {
RPCClientConnection,
RPCClientOptions,
RPCClientStatus
} from '../types';
import { createDefaults } from './defaults';
import { Observable } from 'rxjs';
import { handleUnaryRequest } from './handle-unary';
import { handleStreamRequest } from './handle-stream';

export class RPCClient {
private id: number;
private options: Required<RPCClientOptions>;
private manager: ClientManager;
public constructor(
connection: RPCClientConnection,
options?: RPCClientOptions
) {
this.id = 1;
this.options = Object.assign(createDefaults(), options);
this.manager = new ClientManager(
connection,
this.options.parser,
this.options.subscribePolicy !== 'fail'
);
}
public get status(): RPCClientStatus {
return this.manager.status;
}
public get errors(): Error[] {
return this.manager.errors;
}
public get status$(): Observable<RPCClientStatus> {
return this.manager.status$;
}
public get errors$(): Observable<Error> {
return this.manager.errors$;
}
public close(): void {
return this.manager.close();
}
public unary(route: string, data?: any): Promise<any> {
return handleUnaryRequest(
route,
data || null,
() => this.nextId(),
this.manager,
this.options.responseTimeout || 0
);
}
public stream(route: string, data?: any): Observable<any> {
return handleStreamRequest(
route,
data || null,
() => this.nextId(),
this.manager,
this.options.responseTimeout || 0,
this.options.subscribePolicy === 'no-fail',
this.options.unsubscribePolicy || 'complete'
);
}
private nextId(): number | string {
return this.id++;
}
}
261 changes: 261 additions & 0 deletions packages/rpc/src/client/RPCClient/handle-stream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
import { UnsubscribePolicy } from '../types';
import { Observable, Subscription } from 'rxjs';
import { filter, take } from 'rxjs/operators';
import { resolvableWait, ResolvableWait, safeTrigger } from '@karmic/core';
import { Promist, deferred } from 'promist';
import { ClientManager } from './ClientManager';

export const ONLY_FAIL_ON_CLOSE_RETRY_WAIT = 5000;

export function handleStreamRequest(
method: string,
params: object | null,
nextId: () => string | number,
manager: ClientManager,
responseTimeout: number,
subscribeOnlyFailOnClose: boolean,
unsubscribePolicy: UnsubscribePolicy
): Observable<any> {
let count = 0;
let explicitUnsubscribe = false;
let current: null | {
id: string | number;
isUnsubscribed: boolean;
observable: Observable<any>;
} = null;

const create = (): {
id: string | number;
isUnsubscribed: boolean;
observable: Observable<any>;
} => {
let isUnsubscribed = false;
const id = nextId();
const a = manager.stream(id, method, params);
const b = withResponseTimeout(a, manager, responseTimeout, () => {
if (!isUnsubscribed) {
isUnsubscribed = true;
manager.unsubscribe(
id,
Error(`Request reached timeout: ${responseTimeout}ms`)
);
}
});
const c = onlyFailOnClose(
b,
method,
params,
nextId,
manager,
responseTimeout,
subscribeOnlyFailOnClose,
unsubscribePolicy
);
return {
id,
get isUnsubscribed() {
return c.didError || isUnsubscribed;
},
observable: c.observable
};
};

return new Observable((self) => {
if (current === null) {
if (explicitUnsubscribe && unsubscribePolicy === 'complete') {
self.complete();
return;
}
explicitUnsubscribe = false;
current = create();
}

count++;
let done = false;
const subscription = current.observable.subscribe({
next: (value) => {
self.next(value);
},
error: (err) => {
done = true;
self.error(err);
},
complete: () => {
done = true;
self.complete();
}
});

return () => {
count--;
subscription.unsubscribe();
if (count > 0 || done) return;

if (unsubscribePolicy !== 'keep-alive') {
if (current && !current.isUnsubscribed) {
manager.unsubscribe(current.id);
}
explicitUnsubscribe = true;
current = null;
}
};
});
}

export function withResponseTimeout(
observable: Observable<any>,
manager: ClientManager,
timeout: number,
onTimeout?: () => void
): Observable<any> {
if (!timeout) return observable;

let state: null | {
promise: Promist<void, 'deferrable'>;
subscription: Subscription;
} = null;
let timer: null | NodeJS.Timer = null;

let count = 0;
function start(): void {
count++;
if (state) return;

const promise = deferred();
const subscription = manager.status$.subscribe((status) => {
if (status === 'open') {
stop();
timer = setTimeout(() => promise.resolve(), timeout);
} else if (status === 'close') {
stop();
}
});
state = { promise, subscription };
promise.then(() => (timer && onTimeout ? onTimeout() : null));
}
function stop(): void {
if (timer) {
clearTimeout(timer);
timer = null;
}
}
function end(): void {
count--;
if (count <= 0) {
stop();
if (state) {
state.promise.resolve();
state.subscription.unsubscribe();
state = null;
}
}
}

return new Observable((self) => {
start();
let open = true;
const subscription = observable.subscribe({
next: (value) => {
if (!open) return;
stop();
self.next(value);
},
error: (err) => {
if (!open) return;
stop();
self.error(err);
close();
},
complete: () => {
if (!open) return;
stop();
self.complete();
close();
}
});

function close(): void {
open = false;
end();
safeTrigger(
() => Boolean(subscription),
() => subscription.unsubscribe()
);
}

return () => {
if (open) close();
};
});
}

export function onlyFailOnClose(
observable: Observable<any>,
method: string,
params: object | null,
nextId: () => string | number,
manager: ClientManager,
responseTimeout: number,
subscribeOnlyFailOnClose: boolean,
unsubscribePolicy: UnsubscribePolicy
): { didError: boolean; observable: Observable<any> } {
let obs: null | Observable<any> = null;
let sourceDidError = false;

return {
get didError(): boolean {
return sourceDidError;
},
observable: subscribeOnlyFailOnClose
? new Observable((self) => {
let unsubscribed = false;
let retryWait: null | ResolvableWait = null;
let subscription = observable.subscribe({
next: (value) => self.next(value),
error: (err) => {
sourceDidError = true;
if (manager.status === 'complete') return self.error(err);

manager.report(err);
retryWait = resolvableWait(ONLY_FAIL_ON_CLOSE_RETRY_WAIT);
retryWait.promise.then(() => {
if (unsubscribed) return;
if (manager.status === 'complete') return self.error(err);

if (!obs) {
obs = handleStreamRequest(
method,
params,
nextId,
manager,
responseTimeout,
subscribeOnlyFailOnClose,
unsubscribePolicy
);
}
subscription.unsubscribe();
subscription = obs.subscribe(self);
});
},
complete: () => self.complete()
});

manager.status$
.pipe(
filter((x) => x === 'complete'),
take(1)
)
.toPromise()
.then(() => {
if (retryWait) retryWait.resolve();
});

return () => {
unsubscribed = true;
subscription.unsubscribe();
if (retryWait) retryWait.resolve();
};
})
: observable
};
}
29 changes: 29 additions & 0 deletions packages/rpc/src/client/RPCClient/handle-unary.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { resolvableWait } from '@karmic/core';
import { ClientManager } from './ClientManager';

export function handleUnaryRequest(
method: string,
params: object | null,
nextId: () => string | number,
manager: ClientManager,
responseTimeout: number
): Promise<any> {
const promise = manager.unary(nextId(), method, params);
if (!responseTimeout) return promise;

const timeoutWait = resolvableWait(responseTimeout);
return Promise.race([
timeoutWait.promise.then(() =>
Promise.reject(Error(`Request reached timeout: ${responseTimeout}ms`))
),
promise
])
.then((value) => {
timeoutWait.resolve();
return value;
})
.catch((err) => {
timeoutWait.resolve();
return Promise.reject(err);
});
}
1 change: 1 addition & 0 deletions packages/rpc/src/client/RPCClient/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export * from './RPCClient';
1 change: 1 addition & 0 deletions packages/rpc/src/client/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
export * from './RPCClient';
export * from './types';

0 comments on commit 21f7a66

Please sign in to comment.