Skip to content

Commit

Permalink
feat(rpc/client): batches requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Nov 1, 2019
1 parent 95ccfff commit e5aa664
Show file tree
Hide file tree
Showing 7 changed files with 37 additions and 8 deletions.
5 changes: 5 additions & 0 deletions packages/rpc/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/rpc/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
"dependencies": {
"@karmic/core": "^0.1.0",
"ajv": "^6.10.2",
"dataloader": "^1.4.0",
"promist": "^2.0.0"
},
"peerDependencies": {
Expand Down
10 changes: 7 additions & 3 deletions packages/rpc/src/client/ClientManager/ClientManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,15 @@ export class ClientManager {
public constructor(
connection: RPCClientConnection,
parser: DataParser,
subscriptionConnectRetry: boolean
subscriptionConnectRetry: boolean,
batchRequests: boolean
) {
this.store = new ClientStore();
this.connection = new ConnectionManager(connection, parser, (data) =>
this.response(data)
this.connection = new ConnectionManager(
connection,
parser,
(data) => this.response(data),
batchRequests
);
this.subscriptionConnectRetry = subscriptionConnectRetry;
this.subscription = this.connection.status$.subscribe((status) => {
Expand Down
21 changes: 17 additions & 4 deletions packages/rpc/src/client/ClientManager/ConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { RPCClientConnection, RPCClientStatus } from '../types';
import { BehaviorSubject, Subject, Observable, Subscription } from 'rxjs';
import { DataInput, DataParser, RPCRequest, RPCNotification } from '~/types';
import { until } from 'promist';
import DataLoader from 'dataloader';

interface ConnectionManagerSubjects {
status: BehaviorSubject<RPCClientStatus>;
Expand All @@ -11,18 +12,29 @@ interface ConnectionManagerSubjects {
export class ConnectionManager {
public errors: Error[];
private connection: RPCClientConnection;
private parser: DataParser;
private loader: DataLoader<RPCRequest | RPCNotification, null | Error>;
private subjects: ConnectionManagerSubjects;
private subscription: Subscription;
public constructor(
connection: RPCClientConnection,
parser: DataParser,
onData: (data: object) => void
onData: (data: object) => void,
batchRequests: boolean
) {
this.errors = [];

this.parser = parser;
this.connection = connection;
this.loader = new DataLoader<RPCRequest | RPCNotification, null | Error>(
async (arr) => {
try {
await this.connection.actions.send(await parser.serialize(arr));
return Array(arr.length).fill(null);
} catch (err) {
return Array(arr.length).fill(err);
}
},
{ batch: batchRequests, cache: false }
);
this.subjects = {
status: new BehaviorSubject<RPCClientStatus>(connection.status),
errors: new Subject<Error>()
Expand Down Expand Up @@ -82,7 +94,8 @@ export class ConnectionManager {
}
public async send(data: RPCRequest | RPCNotification): Promise<void> {
if (this.status === 'complete') return;
await this.connection.actions.send(await this.parser.serialize(data));

await this.loader.load(data);
}
public report(error: Error): void {
if (this.status === 'complete') return;
Expand Down
3 changes: 2 additions & 1 deletion packages/rpc/src/client/RPCClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ export class RPCClient {
this.manager = new ClientManager(
connection,
this.options.parser,
this.options.subscribePolicy !== 'fail'
this.options.subscribePolicy !== 'fail',
this.options.batch
);
}
public get status(): RPCClientStatus {
Expand Down
1 change: 1 addition & 0 deletions packages/rpc/src/client/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { DataInput, DataOutput } from '~/types';

export function createDefaults(): Required<RPCClientOptions> {
return {
batch: true,
responseTimeout: 30000,
subscribePolicy: 'fail',
unsubscribePolicy: 'complete',
Expand Down
4 changes: 4 additions & 0 deletions packages/rpc/src/client/types/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ import { DataParser } from '~/types';
export type RPCClientStatus = RPCClientConnectionStatus | 'complete';

export interface RPCClientOptions {
/**
* Whether to batch requests. Default: `true`.
*/
batch?: boolean;
/**
* Timeout for request's first result when connected in milliseconds. `0` for *infinity.* It will cause all pending requests to fail. Default: `30000`.
*/
Expand Down

0 comments on commit e5aa664

Please sign in to comment.