Skip to content

Commit

Permalink
feat(rpc/server): emits an error if a call subscription completes bef…
Browse files Browse the repository at this point in the history
…ore emitting any value
  • Loading branch information
rafamel committed Oct 31, 2019
1 parent 9a3680d commit 46d878c
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 22 deletions.
11 changes: 8 additions & 3 deletions packages/rpc/src/server/RPCServer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {
application,
CollectionTreeDeclaration,
services,
query
query,
collections,
types
} from '@karmic/core';
import { RPCServerOptions, RPCServerConnection } from './types';
import { createDefaults } from './defaults';
Expand All @@ -19,7 +21,10 @@ export class RPCServer {
) {
const opts = Object.assign(createDefaults(), options);
const app = application(
collection,
collections(
collection,
types({ [opts.complete.name]: opts.complete.item })
),
options && options.fallback
? { fallback: options.fallback, children: opts.children }
: { children: opts.children }
Expand All @@ -42,7 +47,7 @@ export class RPCServer {
...app.flatten(':')
},
opts.parser,
createEnsureError(this.declaration)
createEnsureError(this.declaration, opts.complete)
);
}
public connect(connection: RPCServerConnection): () => void {
Expand Down
31 changes: 22 additions & 9 deletions packages/rpc/src/server/ServerManager/ChannelManager.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Subscription, Observable } from 'rxjs';
import { PublicError } from '@karmic/core';
import { RPCNotification, RPCErrorResponse, RPCSingleResponse } from '~/types';
import { getError } from '../errors';
import { getError, EnsureErrorType, GetErrorType } from '../errors';

export class ChannelManager {
private active: { [key: string]: boolean };
private subscriptions: { [key: string]: Subscription };
private ensure: (error: Error) => PublicError;
public constructor(ensure: (error: Error) => PublicError) {
private ensure: (error: EnsureErrorType) => PublicError;
public constructor(ensure: (error: EnsureErrorType) => PublicError) {
this.active = {};
this.subscriptions = {};
this.ensure = ensure;
Expand All @@ -20,7 +20,7 @@ export class ChannelManager {
}
public error(
id: string | number | null,
error: 'ParseError' | 'InvalidRequest' | Error,
error: GetErrorType | Error,
cb: (data: RPCErrorResponse) => void
): void {
cb({
Expand Down Expand Up @@ -67,11 +67,13 @@ export class ChannelManager {
): void {
this.setActive(id, true);

let hasEmitted = false;
this.setSubscription(
id,
source.subscribe({
next: (data: any) => {
if (!this.getActive(id)) return;
hasEmitted = true;
cb({
jsonrpc: '2.0',
id,
Expand All @@ -80,6 +82,7 @@ export class ChannelManager {
},
error: (err: Error) => {
if (!this.getActive(id)) return;
hasEmitted = true;
cb({
jsonrpc: '2.0',
id,
Expand All @@ -89,11 +92,21 @@ export class ChannelManager {
},
complete: () => {
if (!this.getActive(id)) return;
cb({
jsonrpc: '2.0',
method: ':complete',
params: { id }
});
if (hasEmitted) {
cb({
jsonrpc: '2.0',
method: ':complete',
params: { id }
});
} else {
// If no values have been emitted
// before :complete, we emit an error
cb({
jsonrpc: '2.0',
id,
error: getError(this.ensure('EarlyComplete'))
});
}
this.close(id);
}
})
Expand Down
5 changes: 3 additions & 2 deletions packages/rpc/src/server/ServerManager/ServerManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,18 @@ import { DataInput, DataParser } from '~/types';
import { ChannelManager } from './ChannelManager';
import { resolve } from './resolve';
import { LazyPromist, until } from 'promist';
import { EnsureErrorType } from '../errors';

export class ServerManager {
private id: number;
private routes: ApplicationServices;
private parser: DataParser;
private ensure: (error: Error) => PublicError;
private ensure: (error: EnsureErrorType) => PublicError;
private disconnects: { [key: string]: () => void };
public constructor(
routes: ApplicationServices,
parser: DataParser,
ensure: (error: Error) => PublicError
ensure: (error: EnsureErrorType) => PublicError
) {
this.id = 0;
this.routes = routes;
Expand Down
5 changes: 5 additions & 0 deletions packages/rpc/src/server/defaults.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,14 @@
import { RPCServerOptions } from './types';
import { DataOutput, DataInput } from '~/types';
import { error } from '@karmic/core';

export function createDefaults(): Required<Omit<RPCServerOptions, 'fallback'>> {
return {
children: true,
complete: {
name: 'EarlyCompleteError',
item: error({ label: 'ServerError' })
},
parser: {
serialize(data: object): DataOutput {
return JSON.stringify(data);
Expand Down
27 changes: 20 additions & 7 deletions packages/rpc/src/server/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ import {
ErrorLabel,
CollectionError,
CollectionTree,
GeneralError
GeneralError,
ElementItem,
ErrorType
} from '@karmic/core';
import { RPCError } from '~/types';
import { ErrorCodes } from '~/errors';
Expand All @@ -28,20 +30,31 @@ export const hash: { [P in ErrorLabel]: number } = {
ServerTimeout: -32064
};

export type EnsureErrorType = Error | 'Server' | 'EarlyComplete';
export type GetErrorType =
| 'ParseError'
| 'InvalidRequest'
| 'InternalError'
| PublicError;

export function createEnsureError(
collection: CollectionTree
): (error: Error) => PublicError {
collection: CollectionTree,
complete: ElementItem<ErrorType>
): (error: EnsureErrorType) => PublicError {
const id: GeneralError = 'ServerError';
return function ensureError(error) {
return error instanceof PublicError
? error
: new CollectionError(collection, id, error, true);
: new CollectionError(
collection,
error === 'EarlyComplete' ? complete.name : id,
null,
true
);
};
}

export function getError(
err: 'ParseError' | 'InvalidRequest' | PublicError
): RPCError {
export function getError(err: GetErrorType): RPCError {
if (typeof err === 'string') {
const arr = Object.hasOwnProperty.call(ErrorCodes, err)
? ErrorCodes[err]
Expand Down
11 changes: 10 additions & 1 deletion packages/rpc/src/server/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
import { QueryServiceImplementation } from '@karmic/core';
import {
QueryServiceImplementation,
ElementItem,
ErrorType
} from '@karmic/core';
import { DataOutput, DataInput, DataParser } from '~/types';
import { Observable } from 'rxjs';

Expand All @@ -12,6 +16,11 @@ export interface RPCServerOptions {
* Serializer and deserializer
*/
parser?: DataParser;
/**
* An error to emit if a stream completes before emitting any values.
* Defaults to an *EarlyCompleteError* `ServerError`.
*/
complete?: ElementItem<ErrorType>;
/**
* A fallback service for adapters to use when the route is non existent.
* Defaults to a `ClientNotFound` error throwing service.
Expand Down

0 comments on commit 46d878c

Please sign in to comment.