Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e8fc686
feat(reactive-rpc): 🎸 add typed rpc client interface
streamich Nov 17, 2023
74d2b14
fix(reactive-rpc): 🐛 sequence can be negative
streamich Nov 18, 2023
413359f
fix(reactive-rpc): 🐛 do not swallow socket send errors
streamich Nov 18, 2023
4fea477
fix(reactive-rpc): 🐛 use global JSON Type system for RPC types
streamich Nov 18, 2023
6827a5a
feat(reactive-rpc): 🎸 improve how 404 method is reported
streamich Nov 18, 2023
1cc819a
feat(reactive-rpc): 🎸 wrap in try/catch websocket routes
streamich Nov 18, 2023
a7cc7c1
feat(reactive-rpc): 🎸 improve error handling
streamich Nov 18, 2023
7a88aae
style(reactive-rpc): 💄 run Prettier
streamich Nov 18, 2023
a79fe86
test: 💍 fix tests
streamich Nov 18, 2023
6cc7182
feat(reactive-rpc): 🎸 improve auth token ingestion specifier
streamich Nov 18, 2023
5907992
feat(reactive-rpc): 🎸 allow user to provide authentication token
streamich Nov 18, 2023
fd65e49
style(reactive-rpc): 💄 run Prettier
streamich Nov 18, 2023
c88732e
fix(reactive-rpc): 🐛 copy websocket message payload
streamich Nov 18, 2023
d79083c
feat(reactive-rpc): 🎸 add blocks.history method
streamich Nov 18, 2023
2353693
style(reactive-rpc): 💄 run Prettier
streamich Nov 18, 2023
eb5bc7f
test(reactive-rpc): 💍 setup emulated caller
streamich Nov 18, 2023
7f029fb
test(reactive-rpc): 💍 create E2E testing client
streamich Nov 18, 2023
340c749
style(reactive-rpc): 💄 run Prettier
streamich Nov 18, 2023
070c4c9
feat(reactive-rpc): 🎸 allow to specify IP and token
streamich Nov 18, 2023
33aed5c
feat(reactive-rpc): 🎸 add type safety to emulated E2E client
streamich Nov 18, 2023
65829f9
test(reactive-rpc): 💍 use client.* in blocks.* tests
streamich Nov 18, 2023
5e11985
test(reactive-rpc): 💍 move all demo tests to use E2E client implement…
streamich Nov 18, 2023
6b28610
style(reactive-rpc): 💄 run Prettier
streamich Nov 18, 2023
34368c4
fix(reactive-rpc): 🐛 correct initialization after refactor
streamich Nov 18, 2023
7b14376
feat(reactive-rpc): 🎸 remove old blocks after some period of time
streamich Nov 18, 2023
b3ee10c
test(reactive-rpc): 💍 remove of() helper usage
streamich Nov 18, 2023
2f258ab
test(reactive-rpc): 💍 remove message latency and reordering
streamich Nov 18, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions src/reactive-rpc/browser/createBinaryWsRpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import {RpcPersistentClient, WebSocketChannel} from '../common';
import {RpcCodec} from '../common/codec/RpcCodec';
import {BinaryRpcMessageCodec} from '../common/codec/binary';

export const createBinaryWsRpcClient = (url: string) => {
/**
* Constructs a JSON Reactive RPC client.
* @param url RPC endpoint.
* @param token Authentication token.
* @returns An RPC client.
*/
export const createBinaryWsRpcClient = (url: string, token: string) => {
const writer = new Writer(1024 * 4);
const msg = new BinaryRpcMessageCodec();
const req = new CborJsonValueCodec(writer);
Expand All @@ -14,7 +20,7 @@ export const createBinaryWsRpcClient = (url: string) => {
channel: {
newChannel: () =>
new WebSocketChannel({
newSocket: () => new WebSocket(url, [codec.specifier()]),
newSocket: () => new WebSocket(url, [codec.specifier(), token]),
}),
},
});
Expand Down
10 changes: 8 additions & 2 deletions src/reactive-rpc/browser/createJsonWsRpcClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,13 @@ import {RpcPersistentClient, WebSocketChannel} from '../common';
import {RpcCodec} from '../common/codec/RpcCodec';
import {CompactRpcMessageCodec} from '../common/codec/compact';

export const createJsonWsRpcClient = (url: string) => {
/**
* Constructs a JSON Reactive RPC client.
* @param url RPC endpoint.
* @param token Authentication token.
* @returns An RPC client.
*/
export const createJsonWsRpcClient = (url: string, token: string) => {
const writer = new Writer(1024 * 4);
const msg = new CompactRpcMessageCodec();
const req = new JsonJsonValueCodec(writer);
Expand All @@ -14,7 +20,7 @@ export const createJsonWsRpcClient = (url: string) => {
channel: {
newChannel: () =>
new WebSocketChannel({
newSocket: () => new WebSocket(url, [codec.specifier()]),
newSocket: () => new WebSocket(url, [codec.specifier(), token]),
}),
},
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ for (const jsonCodec of codecList) {
});

test('Response Error typed', () => {
const value = RpcError.internalErrorValue();
const value = RpcError.internalErrorValue(null);
const message = new ResponseErrorMessage(123, value);
const encoded = codec.encode(jsonCodec, [message]);
const decoded1 = jsonCodec.decoder.read(encoded);
Expand Down
2 changes: 1 addition & 1 deletion src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ export class RpcMessageBatchProcessor<Ctx = unknown> {
}
return result;
} catch (error) {
const value = RpcError.internalErrorValue();
const value = RpcError.internalErrorValue(error);
return [new msg.ResponseErrorMessage(-1, value)];
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const setup = (
isStreaming: false,
call: async () => {
// tslint:disable-next-line:no-string-throw
throw RpcError.internal('this promise can throw');
throw RpcError.internal(null, 'this promise can throw');
},
},
promiseDelay: {
Expand All @@ -64,7 +64,7 @@ const setup = (
error: {
isStreaming: false,
call: async () => {
throw RpcError.internal('this promise can throw');
throw RpcError.internal(null, 'this promise can throw');
},
},
emitOnceSync: {
Expand Down Expand Up @@ -635,7 +635,7 @@ describe('pre-call checks', () => {

test('fails call when pre-call checks fail', async () => {
const onPreCall = jest.fn(async (request) => {
throw RpcError.internal('fail...');
throw RpcError.internal(null, 'fail...');
});
const {server, send} = setup(
{},
Expand Down
4 changes: 2 additions & 2 deletions src/reactive-rpc/common/rpc/__tests__/sample-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ const double: IStaticRpcMethod<object, {num: number}, {num: number}> = {
const error: IStaticRpcMethod<object, void, void> = {
isStreaming: false,
call: async () => {
throw new RpcError('this promise can throw', '', 0, '', undefined);
throw new RpcError('this promise can throw', '', 0, '', undefined, undefined);
},
};

Expand All @@ -96,7 +96,7 @@ const streamError: IStreamingRpcMethod<object, void, void> = {
call$: () =>
from(
(async () => {
throw RpcError.internal('Stream always errors');
throw RpcError.internal(null, 'Stream always errors');
})(),
),
};
Expand Down
4 changes: 2 additions & 2 deletions src/reactive-rpc/common/rpc/caller/RpcCaller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ export class RpcCaller<Ctx = unknown> {
* @returns Response data.
*/
public async call(name: string, request: unknown, ctx: Ctx): Promise<Value<unknown>> {
const method = this.getMethodStrict(name);
this.validate(method, request);
try {
const method = this.getMethodStrict(name);
this.validate(method, request);
const preCall = method.onPreCall;
if (preCall) await preCall(ctx, request);
const data = await method.call(request, ctx);
Expand Down
6 changes: 2 additions & 4 deletions src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,11 @@ export class TypeRouterCaller<Router extends TypeRouter<any>, Ctx = unknown> ext
public readonly req: {[K in keyof Routes<Router>]: MethodReq<Routes<Router>[K]>} = null as any;
public readonly res: {[K in keyof Routes<Router>]: MethodRes<Routes<Router>[K]>} = null as any;

public get<K extends keyof Routes<Router>>(id: K): MethodDefinition<Ctx, Routes<Router>[K]> {
public get<K extends keyof Routes<Router>>(id: K): MethodDefinition<Ctx, Routes<Router>[K]> | undefined {
let method = this.methods.get(id as string) as any;
if (method) return method;
const fn = this.router.routes[id as string];
// TODO: do this check without relying on constructor and importing the `FunctionType` class.
if (!fn || !(fn instanceof FunctionType || fn instanceof FunctionStreamingType))
throw RpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND, `Type [alias = ${id as string}] is not a function.`);
if (!fn || !(fn instanceof FunctionType || fn instanceof FunctionStreamingType)) return undefined;
const validator = fn.req.validator('object');
const requestSchema = (fn.req as AbstractType<Schema>).getSchema();
const isRequestVoid = requestSchema.__t === 'const' && requestSchema.value === undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ describe('static calls', () => {
},
});
const [, error] = await of(caller.call('test', {}, {}));
expect(error).toEqual(RpcError.internalErrorValue());
expect(error).toEqual(RpcError.internalErrorValue(null));
});
});

Expand Down Expand Up @@ -95,10 +95,10 @@ describe('streaming calls', () => {
});

const [, error1] = await of(caller.call('test', {}, {}));
expect(error1).toEqual(RpcError.internalErrorValue());
expect(error1).toEqual(RpcError.internalErrorValue(null));

const [, error2] = await of(Rx.firstValueFrom(caller.call$('test', Rx.of(undefined), {})));
expect(error2).toEqual(RpcError.internalErrorValue());
expect(error2).toEqual(RpcError.internalErrorValue(null));
});
});

Expand Down
22 changes: 14 additions & 8 deletions src/reactive-rpc/common/rpc/caller/error/RpcError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ export type RpcErrorValue = Value<RpcError>;
export class RpcError extends Error implements IRpcError {
public static from(error: unknown) {
if (error instanceof RpcError) return error;
return RpcError.internal();
return RpcError.internal(error);
}

public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined): RpcError {
public static fromCode(
errno: RpcErrorCodes,
message: string = '',
meta: unknown = undefined,
originalError: unknown = undefined,
): RpcError {
const code = RpcErrorCodes[errno];
return new RpcError(message || code, code, errno, undefined, meta || undefined);
return new RpcError(message || code, code, errno, undefined, meta || undefined, originalError);
}

public static internal(message: string = 'Internal Server Error'): RpcError {
return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message);
public static internal(originalError: unknown, message: string = 'Internal Server Error'): RpcError {
return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError);
}

public static badRequest(): RpcError {
Expand All @@ -60,7 +65,7 @@ export class RpcError extends Error implements IRpcError {
return new Value(error, RpcErrorType);
}

public static valueFrom(error: unknown, def = RpcError.internalErrorValue()): RpcErrorValue {
public static valueFrom(error: unknown, def = RpcError.internalErrorValue(error)): RpcErrorValue {
if (error instanceof Value && error.data instanceof RpcError && error.type === RpcErrorType) return error;
if (error instanceof RpcError) return RpcError.value(error);
return def;
Expand All @@ -70,8 +75,8 @@ export class RpcError extends Error implements IRpcError {
return RpcError.value(RpcError.fromCode(errno, message));
}

public static internalErrorValue(): RpcErrorValue {
return RpcError.value(RpcError.internal());
public static internalErrorValue(originalError: unknown): RpcErrorValue {
return RpcError.value(RpcError.internal(originalError));
}

public static isRpcError(error: unknown): error is RpcError {
Expand All @@ -84,6 +89,7 @@ export class RpcError extends Error implements IRpcError {
public readonly errno: number,
public readonly errorId: string | undefined,
public readonly meta: unknown | undefined,
public readonly originalError: unknown | undefined,
) {
super(message);
if (message === code) this.code = undefined;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {RpcErrorType} from '../RpcErrorType';
const codecs = new Codecs(new Writer(16));

test('can encode an internal error', () => {
const error = RpcError.internal();
const error = RpcError.internal(null);
const encoded = RpcErrorType.encode(codecs.json, error);
// console.log(RpcErrorType.encoder(EncodingFormat.Json).toString());
const json = JSON.parse(Buffer.from(encoded).toString());
Expand Down
22 changes: 22 additions & 0 deletions src/reactive-rpc/common/rpc/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,26 @@ export interface RpcClient {
* @param data Static payload data.
*/
notify(method: string, data: undefined | unknown): void;

// start(): void;
// stop(): void;
}

type TypedRpcClientFn<Request, Response> = (req: Request) => Promise<Response>;
type TypedRpcClientFn$<Request, Response> = (req: Observable<Request>) => Observable<Response>;
type UnPromise<T> = T extends Promise<infer U> ? U : T;
type UnObservable<T> = T extends Observable<infer U> ? U : T;
type UnwrapResponse<T> = UnPromise<UnObservable<T>>;

export interface TypedRpcClient<Routes extends Record<string, TypedRpcClientFn<any, any> | TypedRpcClientFn$<any, any>>>
extends RpcClient {
call$<K extends keyof Routes>(
method: K,
data: Parameters<Routes[K]>[0] | UnObservable<Parameters<Routes[K]>[0]>,
): Observable<UnwrapResponse<ReturnType<Routes[K]>>>;
call<K extends keyof Routes>(
method: K,
data: Parameters<Routes[K]>[0],
): Promise<UnwrapResponse<ReturnType<Routes[K]>>>;
notify<K extends keyof Routes>(method: K, data: UnObservable<Parameters<Routes[K]>[0]>): void;
}
1 change: 1 addition & 0 deletions src/reactive-rpc/common/rpc/types.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './client/types';
export * from './methods/types';
export * from './caller/types';

Expand Down
118 changes: 118 additions & 0 deletions src/reactive-rpc/common/testing/buildE2eClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import {Codecs} from '../../../json-pack/codecs/Codecs';
import {Fuzzer} from '../../../util/Fuzzer';
import {Writer} from '../../../util/buffers/Writer';
import {ConnectionContext} from '../../server/context';
import {RpcCodecs} from '../codec/RpcCodecs';
import {RpcMessageCodecs} from '../codec/RpcMessageCodecs';
import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from '../messages';
import {RpcMessageStreamProcessor, StreamingRpcClient, TypedRpcClient} from '../rpc';
import type {FunctionStreamingType, FunctionType} from '../../../json-type/type/classes';
import type {Observable} from 'rxjs';
import type {ResolveType} from '../../../json-type';
import type {TypeRouter} from '../../../json-type/system/TypeRouter';
import type {TypeRouterCaller} from '../rpc/caller/TypeRouterCaller';

export interface BuildE2eClientOptions {
/**
* Writer to use for encoding messages. Defaults to `new Writer(4 * 1024)`.
*/
writer?: Writer;

/**
* Minimum and maximum size of the default buffer in kilobytes. An actual
* size will be picked randomly between these two values. Defaults to
* `[4, 4]`. Used when `writer` is not specified.
*/
writerDefaultBufferKb?: [min: number, max: number];

/**
* Number of messages to keep in buffer before sending them to the client.
* The actual number of messages will be picked randomly between these two
* values. Defaults to `[1, 1]`.
*/
serverBufferSize?: [min: number, max: number];

/**
* Time in milliseconds for how long to buffer messages before sending them
* to the client. The actual time will be picked randomly between these two
* values. Defaults to `[0, 0]`.
*/
serverBufferTime?: [min: number, max: number];

/**
* Number of messages to keep in buffer before sending them to the server.
* The actual number of messages will be picked randomly between these two
* values. Defaults to `[1, 1]`.
*/
clientBufferSize?: [min: number, max: number];

/**
* Time in milliseconds for how long to buffer messages before sending them
* to the server. The actual time will be picked randomly between these two
* values. Defaults to `[0, 0]`.
*/
clientBufferTime?: [min: number, max: number];

/**
* IP address to use for the connection. Defaults to `0.0.0.0`.
*/
ip?: string;

/**
* Authentication token to use for the connection. Defaults to empty string.
*/
token?: string;
}

export const buildE2eClient = <Caller extends TypeRouterCaller<any>>(caller: Caller, opt: BuildE2eClientOptions) => {
const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024);
const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs());
const ctx = new ConnectionContext(
opt.ip ?? '0.0.0.0',
opt.ip ?? '',
null,
{},
codecs.value.cbor,
codecs.value.cbor,
codecs.messages.binary,
);
let client: StreamingRpcClient;
const streamProcessor = new RpcMessageStreamProcessor({
caller,
send: (messages: ReactiveRpcMessage[]) => {
const encoded = ctx.msgCodec.encode(ctx.resCodec, messages);
setTimeout(() => {
const decoded = ctx.msgCodec.decodeBatch(ctx.resCodec, encoded);
client.onMessages(decoded as ReactiveRpcServerMessage[]);
}, 1);
},
bufferSize: Fuzzer.randomInt2(opt.serverBufferSize ?? [1, 1]),
bufferTime: Fuzzer.randomInt2(opt.serverBufferTime ?? [0, 0]),
});
client = new StreamingRpcClient({
send: (messages: ReactiveRpcClientMessage[]) => {
const encoded = ctx.msgCodec.encode(ctx.reqCodec, messages);
setTimeout(() => {
const decoded = ctx.msgCodec.decodeBatch(ctx.reqCodec, encoded);
streamProcessor.onMessages(decoded as ReactiveRpcClientMessage[], {});
}, 1);
},
bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]),
bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]),
});
type Router = UnTypeRouterCaller<Caller>;
type Routes = UnTypeRouter<Router>;
type Methods = {[K in keyof Routes]: UnwrapFunction<Routes[K]>};
const typedClient = client as TypedRpcClient<Methods>;
return {
client: typedClient,
};
};

type UnTypeRouterCaller<T> = T extends TypeRouterCaller<infer R> ? R : never;
type UnTypeRouter<T> = T extends TypeRouter<infer R> ? R : never;
type UnwrapFunction<F> = F extends FunctionType<infer Req, infer Res>
? (req: ResolveType<Req>) => Promise<ResolveType<Res>>
: F extends FunctionStreamingType<infer Req, infer Res>
? (req$: Observable<ResolveType<Req>>) => Observable<ResolveType<Res>>
: never;
Loading