Skip to content

Commit

Permalink
feat(rpc/server): adds ChannelManager
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Oct 28, 2019
1 parent 8f12562 commit 81daabc
Showing 1 changed file with 143 additions and 0 deletions.
143 changes: 143 additions & 0 deletions packages/rpc/src/server/ChannelManager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import { Subscription, Observable } from 'rxjs';
import { PublicError } from '@karmic/core';
import { RPCNotification, RPCErrorResponse, RPCSingleResponse } from '~/types';
import { getError } from './errors';

export class ChannelManager {
private active: { [key: string]: boolean };
private subscriptions: { [key: string]: Subscription };
private ensure: (error: Error) => PublicError;
public constructor(ensure: (error: Error) => PublicError) {
this.active = {};
this.subscriptions = {};
this.ensure = ensure;
}
public exists(id: string | number): boolean {
return Object.hasOwnProperty.call(this.active, this.toStringId(id));
}
public isSubscription(id: string | number): boolean {
return Boolean(this.getSubscription(id));
}
public error(
id: string | number | null,
error: 'ParseError' | 'InvalidRequest' | Error,
cb: (data: RPCErrorResponse) => void
): void {
cb({
jsonrpc: '2.0',
id,
error: getError(typeof error === 'string' ? error : this.ensure(error))
});
if (typeof id !== 'object') {
this.setActive(id, true);
this.close(id);
}
}
public unary<T>(
id: string | number,
source: Promise<T>,
cb: (data: RPCSingleResponse) => void
): void {
this.setActive(id, true);

source
.then((data) => {
if (!this.getActive(id)) return;
cb({
jsonrpc: '2.0',
id,
result: data
});
this.close(id);
})
.catch((err) => {
if (!this.getActive(id)) return;
cb({
jsonrpc: '2.0',
id,
error: getError(this.ensure(err))
});
this.close(id);
});
}
public stream<T>(
id: string | number,
source: Observable<T>,
cb: (data: RPCSingleResponse | RPCNotification) => void
): void {
this.setActive(id, true);

this.setSubscription(
id,
source.subscribe({
next: (data: any) => {
if (!this.getActive(id)) return;
cb({
jsonrpc: '2.0',
id,
result: data
});
},
error: (err: Error) => {
if (!this.getActive(id)) return;
cb({
jsonrpc: '2.0',
id,
error: getError(this.ensure(err))
});
this.close(id);
},
complete: () => {
if (!this.getActive(id)) return;
cb({
jsonrpc: '2.0',
method: ':complete',
params: { id }
});
this.close(id);
}
})
);
}
public close(id: string | number): void {
if (!this.getActive(id)) return;

this.setActive(id, false);
const subscription = this.getSubscription(id);
if (subscription) {
subscription.unsubscribe();
this.setSubscription(id, null);
}
}
public destroy(): void {
for (const id of Object.keys(this.active)) {
this.close(this.fromStringId(id));
}
}
private getActive(id: string | number): boolean {
return this.active[this.toStringId(id)] || false;
}
private setActive(id: string | number, active: boolean): void {
this.active[this.toStringId(id)] = active;
}
private getSubscription(id: string | number): Subscription | null {
return this.subscriptions[this.toStringId(id)] || null;
}
private setSubscription(
id: string | number,
subscription: Subscription | null
): void {
if (subscription) {
this.subscriptions[this.toStringId(id)] = subscription;
} else {
delete this.subscriptions[this.toStringId(id)];
}
}
private toStringId(id: string | number): string {
return `${typeof id}:${id}`;
}
private fromStringId(id: string): string | number {
const [type, ...str] = id.split(':');
return type === 'number' ? parseInt(str[0]) : str.join(':');
}
}

0 comments on commit 81daabc

Please sign in to comment.