Skip to content

Commit

Permalink
feat(rpc/server): takes in stream batch requests
Browse files Browse the repository at this point in the history
  • Loading branch information
rafamel committed Oct 31, 2019
1 parent 46d878c commit 00a7bc7
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 73 deletions.
4 changes: 2 additions & 2 deletions packages/rpc/src/server/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {
import { RPCServerOptions, RPCServerConnection } from './types';
import { createDefaults } from './defaults';
import { ServerManager } from './ServerManager';
import { createEnsureError } from './errors';
import { createErrorProvider } from './errors';

export class RPCServer {
public declaration: CollectionTreeDeclaration;
Expand Down Expand Up @@ -47,7 +47,7 @@ export class RPCServer {
...app.flatten(':')
},
opts.parser,
createEnsureError(this.declaration, opts.complete)
createErrorProvider(this.declaration, opts.complete)
);
}
public connect(connection: RPCServerConnection): () => void {
Expand Down
26 changes: 15 additions & 11 deletions packages/rpc/src/server/ServerManager/ChannelManager.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { Subscription, Observable } from 'rxjs';
import { PublicError } from '@karmic/core';
import { RPCNotification, RPCErrorResponse, RPCSingleResponse } from '~/types';
import { getError, EnsureErrorType, GetErrorType } from '../errors';
import {
RPCNotification,
RPCErrorResponse,
RPCSingleResponse,
RPCError
} from '~/types';
import { ErrorProvider } from '../errors';

export class ChannelManager {
private active: { [key: string]: boolean };
private subscriptions: { [key: string]: Subscription };
private ensure: (error: EnsureErrorType) => PublicError;
public constructor(ensure: (error: EnsureErrorType) => PublicError) {
private errors: ErrorProvider;
public constructor(errors: ErrorProvider) {
this.active = {};
this.subscriptions = {};
this.ensure = ensure;
this.errors = errors;
}
public exists(id: string | number): boolean {
return Object.hasOwnProperty.call(this.active, this.toStringId(id));
Expand All @@ -20,13 +24,13 @@ export class ChannelManager {
}
public error(
id: string | number | null,
error: GetErrorType | Error,
provide: (errors: ErrorProvider) => RPCError,
cb: (data: RPCErrorResponse) => void
): void {
cb({
jsonrpc: '2.0',
id,
error: getError(typeof error === 'string' ? error : this.ensure(error))
error: provide(this.errors)
});
if (typeof id !== 'object') {
this.setActive(id, true);
Expand Down Expand Up @@ -55,7 +59,7 @@ export class ChannelManager {
cb({
jsonrpc: '2.0',
id,
error: getError(this.ensure(err))
error: this.errors.core(err)
});
this.close(id);
});
Expand Down Expand Up @@ -86,7 +90,7 @@ export class ChannelManager {
cb({
jsonrpc: '2.0',
id,
error: getError(this.ensure(err))
error: this.errors.core(err)
});
this.close(id);
},
Expand All @@ -104,7 +108,7 @@ export class ChannelManager {
cb({
jsonrpc: '2.0',
id,
error: getError(this.ensure('EarlyComplete'))
error: this.errors.core('EarlyComplete')
});
}
this.close(id);
Expand Down
12 changes: 6 additions & 6 deletions packages/rpc/src/server/ServerManager/ServerManager.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
import { ApplicationServices, PublicError } from '@karmic/core';
import { ApplicationServices } from '@karmic/core';
import { RPCServerConnection } from '../types';
import { DataInput, DataParser } from '~/types';
import { ChannelManager } from './ChannelManager';
import { resolve } from './resolve';
import { LazyPromist, until } from 'promist';
import { EnsureErrorType } from '../errors';
import { ErrorProvider } from '../errors';

export class ServerManager {
private id: number;
private routes: ApplicationServices;
private parser: DataParser;
private ensure: (error: EnsureErrorType) => PublicError;
private errors: ErrorProvider;
private disconnects: { [key: string]: () => void };
public constructor(
routes: ApplicationServices,
parser: DataParser,
ensure: (error: EnsureErrorType) => PublicError
errors: ErrorProvider
) {
this.id = 0;
this.routes = routes;
this.parser = parser;
this.ensure = ensure;
this.errors = errors;
this.disconnects = {};
}
public connect(connection: RPCServerConnection): () => void {
const disconnects = this.disconnects;
const channels = new ChannelManager(this.ensure);
const channels = new ChannelManager(this.errors);
const context = LazyPromist.from(connection.context || (() => ({})));

let open = true;
Expand Down
2 changes: 1 addition & 1 deletion packages/rpc/src/server/ServerManager/resolve/handlers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ export function handleStream(

const service = routes[request.method];
if (!isServiceSubscription(service.declaration)) {
return channels.error(request.id, 'InvalidRequest', cb);
return channels.error(request.id, ({ spec }) => spec('InvalidRequest'), cb);
}

return channels.stream(
Expand Down
4 changes: 2 additions & 2 deletions packages/rpc/src/server/ServerManager/resolve/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,12 @@ export function resolve(
.catch((err: Error) => {
return channels.error(
hasNonNullId(request) ? request.id : null,
err,
({ core }) => core(err),
send
);
});
})
.catch(() => {
return channels.error(null, 'ParseError', send);
return channels.error(null, ({ spec }) => spec('ParseError'), send);
});
}
50 changes: 39 additions & 11 deletions packages/rpc/src/server/ServerManager/resolve/resolve.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { RPCResponse, RPCSingleResponse, RPCNotification } from '~/types';
import { handleNotification, handleUnary, handleStream } from './handlers';
import { validate } from '~/validate';
import { isValidRequest, hasNonNullId } from './helpers';
import { Promist } from 'promist';

export function resolveSingle(
request: object,
Expand All @@ -18,7 +19,7 @@ export function resolveSingle(

if (isValidRequest(request)) {
if (channels.exists(request.id)) {
return channels.error(null, 'InvalidRequest', send);
return channels.error(null, ({ spec }) => spec('InvalidRequest'), send);
}

return request.stream
Expand All @@ -28,7 +29,7 @@ export function resolveSingle(

return channels.error(
hasNonNullId(request) ? request.id : null,
'InvalidRequest',
({ spec }) => spec('InvalidRequest'),
send
);
}
Expand All @@ -38,32 +39,57 @@ export async function resolveBatch(
context: any,
channels: ChannelManager,
routes: ApplicationServices,
send: (data: RPCResponse) => void
send: (data: RPCResponse | RPCNotification) => void
): Promise<void> {
if (!requests.length) {
return channels.error(null, 'InvalidRequest', send);
return channels.error(null, ({ spec }) => spec('InvalidRequest'), send);
}

const promises: Array<Promise<RPCSingleResponse>> = [];
const promist = new Promist<void>();
let onall = Promise.resolve(promist);

for (const request of requests) {
if (validate.notification(request)) {
handleNotification(request, channels);
} else {
promises.push(
new Promise((resolve: (data: RPCSingleResponse) => void, reject) => {
new Promise<RPCSingleResponse>((resolve, reject) => {
try {
if (!isValidRequest(request) || request.stream) {
if (!isValidRequest(request)) {
return channels.error(
hasNonNullId(request) ? request.id : null,
'InvalidRequest',
({ spec }) => spec('InvalidRequest'),
resolve
);
}
if (channels.exists(request.id)) {
return channels.error(
null,
({ spec }) => spec('InvalidRequest'),
resolve
);
}
if (!request.stream) {
return handleUnary(request, context, channels, routes, resolve);
}

return channels.exists(request.id)
? channels.error(null, 'InvalidRequest', resolve)
: handleUnary(request, context, channels, routes, resolve);
let first = true;
handleStream(request, context, channels, routes, (data) => {
if (first) {
first = false;
return validate.notification(data)
? // :complete should never be the first response for a stream
channels.error(
request.id,
({ spec }) => spec('InternalError'),
resolve
)
: resolve(data);
} else {
onall = onall.then(() => send(data));
}
});
} catch (err) {
reject(err);
}
Expand All @@ -72,5 +98,7 @@ export async function resolveBatch(
}
}

await Promise.all(promises).then((arr) => send(arr));
await Promise.all(promises)
.then((arr) => send(arr))
.then(() => promist.resolve());
}
73 changes: 38 additions & 35 deletions packages/rpc/src/server/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import {
ErrorLabel,
CollectionError,
CollectionTree,
GeneralError,
ElementItem,
ErrorType
} from '@karmic/core';
Expand All @@ -30,55 +29,59 @@ export const hash: { [P in ErrorLabel]: number } = {
ServerTimeout: -32064
};

export type EnsureErrorType = Error | 'Server' | 'EarlyComplete';
export type GetErrorType =
export interface ErrorProvider {
core: (error: PublicErrorProviderType) => RPCError;
spec: (name: RPCErrorProviderType) => RPCError;
}

export type PublicErrorProviderType = Error | 'Server' | 'EarlyComplete';

export type RPCErrorProviderType =
| 'ParseError'
| 'InvalidRequest'
| 'InternalError'
| PublicError;
| 'InternalError';

export function createEnsureError(
export function createErrorProvider(
collection: CollectionTree,
complete: ElementItem<ErrorType>
): (error: EnsureErrorType) => PublicError {
const id: GeneralError = 'ServerError';
return function ensureError(error) {
): ErrorProvider {
function ensure(error: PublicErrorProviderType): PublicError {
return error instanceof PublicError
? error
: new CollectionError(
collection,
error === 'EarlyComplete' ? complete.name : id,
error === 'EarlyComplete' ? complete.name : 'ServerError',
null,
true
);
};
}

export function getError(err: GetErrorType): RPCError {
if (typeof err === 'string') {
const arr = Object.hasOwnProperty.call(ErrorCodes, err)
? ErrorCodes[err]
: ErrorCodes.InternalError;
return {
code: arr[0],
message: arr[1]
};
}

if (!Object.hasOwnProperty.call(hash, err.label)) {
return {
code: ErrorCodes.InternalError[0],
message: ErrorCodes.InternalError[1]
};
}

return {
code: hash[err.label],
message: `Server implementation specific error: ${err.label}`,
data: {
id: err.id,
label: err.label,
description: err.message
core(error: PublicErrorProviderType): RPCError {
const err = ensure(error);
return Object.hasOwnProperty.call(hash, err.label)
? {
code: hash[err.label],
message: `Server implementation specific error: ${err.label}`,
data: {
id: err.id,
label: err.label,
description: err.message
}
}
: {
code: ErrorCodes.InternalError[0],
message: ErrorCodes.InternalError[1]
};
},
spec(error: RPCErrorProviderType): RPCError {
const arr = Object.hasOwnProperty.call(ErrorCodes, error)
? ErrorCodes[error]
: ErrorCodes.InternalError;
return {
code: arr[0],
message: arr[1]
};
}
};
}
1 change: 0 additions & 1 deletion packages/rpc/src/server/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,2 @@
// TODO allow stream on batch calls
export * from './RPCServer';
export * from './types';
5 changes: 1 addition & 4 deletions packages/rpc/src/types/protocol/request.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { RPCSpecRequest } from './specification';
import { RPCNotification } from './notification';

export type RPCBatchRequest = Array<RPCUnaryRequest | RPCNotification>;
export type RPCBatchRequest = Array<RPCRequest | RPCNotification>;

/**
* An extension for the *request object* as specified by the
Expand Down Expand Up @@ -32,9 +32,6 @@ export interface RPCRequest extends RPCSpecRequest {
stream?: boolean;
}

/**
* Streaming requests **must not** be part of a batch request.
*/
export interface RPCUnaryRequest extends RPCRequest {
stream?: false;
}
Expand Down

0 comments on commit 00a7bc7

Please sign in to comment.