From e8fc6864dfe867a872d379eee00ae5513571bc27 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 15:33:38 +0100 Subject: [PATCH 01/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add?= =?UTF-8?q?=20typed=20rpc=20client=20interface?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/rpc/client/types.ts | 22 +++++++++++++++++++++ src/reactive-rpc/common/rpc/types.ts | 1 + 2 files changed, 23 insertions(+) diff --git a/src/reactive-rpc/common/rpc/client/types.ts b/src/reactive-rpc/common/rpc/client/types.ts index cafe069dad..1ca8edfa37 100644 --- a/src/reactive-rpc/common/rpc/client/types.ts +++ b/src/reactive-rpc/common/rpc/client/types.ts @@ -24,4 +24,26 @@ export interface RpcClient { * @param data Static payload data. */ notify(method: string, data: undefined | unknown): void; + + // start(): void; + // stop(): void; +} + +type TypedRpcClientFn = (req: Request) => Promise; +type TypedRpcClientFn$ = (req: Observable) => Observable; +type UnPromise = T extends Promise ? U : T; +type UnObservable = T extends Observable ? U : T; +type UnwrapResponse = UnPromise>; + +export interface TypedRpcClient | TypedRpcClientFn$>> + extends RpcClient { + call$( + method: K, + data: Parameters[0] | UnObservable[0]>, + ): Observable>>; + call( + method: K, + data: Parameters[0], + ): Promise>>; + notify(method: K, data: UnObservable[0]>): void; } diff --git a/src/reactive-rpc/common/rpc/types.ts b/src/reactive-rpc/common/rpc/types.ts index d1c7a6862f..96987a643a 100644 --- a/src/reactive-rpc/common/rpc/types.ts +++ b/src/reactive-rpc/common/rpc/types.ts @@ -1,3 +1,4 @@ +export * from './client/types'; export * from './methods/types'; export * from './caller/types'; From 74d2b1459cb1433a0033b48b49a10165955f4358 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 01:26:00 +0100 Subject: [PATCH 02/27] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20seque?= =?UTF-8?q?nce=20can=20be=20negative?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/blocks/schema.ts | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/server/routes/blocks/schema.ts b/src/server/routes/blocks/schema.ts index 20f57df5a1..3f9e09f8b4 100644 --- a/src/server/routes/blocks/schema.ts +++ b/src/server/routes/blocks/schema.ts @@ -1,4 +1,5 @@ -import {type ResolveType, t} from '../../../json-type'; +import {type ResolveType} from '../../../json-type'; +import {t} from '../system'; export type TBlockId = ResolveType; export const BlockId = t.str.options({ @@ -11,7 +12,7 @@ export type TBlockSeq = ResolveType; export const BlockSeq = t.num.options({ title: 'Block Sequence Number', gte: 0, - format: 'u32', + format: 'i32', }); export type TBlock = ResolveType; From 413359facffbb9125b00831c9819919655d208e3 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 01:26:34 +0100 Subject: [PATCH 03/27] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20do=20?= =?UTF-8?q?not=20swallow=20socket=20send=20errors?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/server/uws/RpcApp.ts | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index dd47fa0ec6..7736c3220b 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -161,12 +161,16 @@ export class RpcApp { ws.rpc = new RpcMessageStreamProcessor({ caller: this.options.caller, send: (messages: ReactiveRpcMessage[]) => { - if (ws.getBufferedAmount() > maxBackpressure) return; - const writer = encoder.writer; - writer.reset(); - msgCodec.encodeBatch(resCodec, messages); - const encoded = writer.flush(); - ws.send(encoded, true, false); + try { + if (ws.getBufferedAmount() > maxBackpressure) return; + const writer = encoder.writer; + writer.reset(); + msgCodec.encodeBatch(resCodec, messages); + const encoded = writer.flush(); + ws.send(encoded, true, false); + } catch (error) { + logger.error('WS_SEND', error, {messages}); + } }, bufferSize: 1, bufferTime: 0, From 4fea4778d66a00e502a695ef3b2cf060862bad64 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 01:27:23 +0100 Subject: [PATCH 04/27] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20use?= =?UTF-8?q?=20global=20JSON=20Type=20system=20for=20RPC=20types?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/index.ts | 5 ++--- src/server/routes/presence/schema.ts | 3 ++- src/server/routes/system.ts | 4 ++++ 3 files changed, 8 insertions(+), 4 deletions(-) create mode 100644 src/server/routes/system.ts diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index e214200633..eb1f380de1 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -1,14 +1,13 @@ import {routes} from './routes'; -import {TypeSystem} from '../../json-type'; +import {system} from './system'; import {TypeRouter} from '../../json-type/system/TypeRouter'; import {TypeRouterCaller} from '../../reactive-rpc/common/rpc/caller/TypeRouterCaller'; import {RpcError} from '../../reactive-rpc/common/rpc/caller'; +import {Value} from '../../reactive-rpc/common/messages/Value'; import type {Services} from '../services/Services'; import type {RouteDeps} from './types'; -import {Value} from '../../reactive-rpc/common/messages/Value'; export const createRouter = (services: Services) => { - const system = new TypeSystem(); const router = new TypeRouter({system, routes: {}}); const deps: RouteDeps = {services, router}; return routes(deps)(router); diff --git a/src/server/routes/presence/schema.ts b/src/server/routes/presence/schema.ts index 729733eb94..e8eca6d8a1 100644 --- a/src/server/routes/presence/schema.ts +++ b/src/server/routes/presence/schema.ts @@ -1,4 +1,5 @@ -import {type ResolveType, t} from '../../../json-type'; +import {type ResolveType} from '../../../json-type'; +import {t} from '../system'; export const PresenceEntry = t.Object( t.prop('id', t.str), diff --git a/src/server/routes/system.ts b/src/server/routes/system.ts new file mode 100644 index 0000000000..28d0e17ae2 --- /dev/null +++ b/src/server/routes/system.ts @@ -0,0 +1,4 @@ +import {TypeSystem} from '../../json-type'; + +export const system = new TypeSystem(); +export const t = system.t; From 6827a5afa15c920fde7a4a4ea7c01065112b24f6 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 11:48:52 +0100 Subject: [PATCH 05/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20how=20404=20method=20is=20reported?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/rpc/caller/RpcCaller.ts | 4 ++-- src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/reactive-rpc/common/rpc/caller/RpcCaller.ts b/src/reactive-rpc/common/rpc/caller/RpcCaller.ts index d1987f1ccd..3d825199ce 100644 --- a/src/reactive-rpc/common/rpc/caller/RpcCaller.ts +++ b/src/reactive-rpc/common/rpc/caller/RpcCaller.ts @@ -86,9 +86,9 @@ export class RpcCaller { * @returns Response data. */ public async call(name: string, request: unknown, ctx: Ctx): Promise> { + const method = this.getMethodStrict(name); + this.validate(method, request); try { - const method = this.getMethodStrict(name); - this.validate(method, request); const preCall = method.onPreCall; if (preCall) await preCall(ctx, request); const data = await method.call(request, ctx); diff --git a/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts b/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts index 0788db5ab5..8021c7b025 100644 --- a/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts +++ b/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts @@ -30,13 +30,11 @@ export class TypeRouterCaller, Ctx = unknown> ext public readonly req: {[K in keyof Routes]: MethodReq[K]>} = null as any; public readonly res: {[K in keyof Routes]: MethodRes[K]>} = null as any; - public get>(id: K): MethodDefinition[K]> { + public get>(id: K): MethodDefinition[K]> | undefined { let method = this.methods.get(id as string) as any; if (method) return method; const fn = this.router.routes[id as string]; - // TODO: do this check without relying on constructor and importing the `FunctionType` class. - if (!fn || !(fn instanceof FunctionType || fn instanceof FunctionStreamingType)) - throw RpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND, `Type [alias = ${id as string}] is not a function.`); + if (!fn || !(fn instanceof FunctionType || fn instanceof FunctionStreamingType)) return undefined; const validator = fn.req.validator('object'); const requestSchema = (fn.req as AbstractType).getSchema(); const isRequestVoid = requestSchema.__t === 'const' && requestSchema.value === undefined; From 1cc819a4e5036e280fb541c78b0a9d51005c2437 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 12:02:22 +0100 Subject: [PATCH 06/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20wrap?= =?UTF-8?q?=20in=20try/catch=20websocket=20routes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/server/uws/RpcApp.ts | 78 +++++++++++++++------------ 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 7736c3220b..10f084ae1f 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -139,7 +139,9 @@ export class RpcApp { public enableWsRpc(path: string = '/rpc'): this { const maxBackpressure = 4 * 1024 * 1024; const augmentContext = this.options.augmentContext ?? noop; - const logger = this.options.logger ?? console; + const options = this.options; + const logger = options.logger ?? console; + const caller = options.caller; this.app.ws(path, { idleTimeout: 0, maxPayloadLength: 4 * 1024 * 1024, @@ -153,46 +155,54 @@ export class RpcApp { res.upgrade({ctx}, secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions, context); }, open: (ws_: types.WebSocket) => { - const ws = ws_ as types.RpcWebSocket; - const ctx = ws.ctx; - const resCodec = ctx.resCodec; - const msgCodec = ctx.msgCodec; - const encoder = resCodec.encoder; - ws.rpc = new RpcMessageStreamProcessor({ - caller: this.options.caller, - send: (messages: ReactiveRpcMessage[]) => { - try { - if (ws.getBufferedAmount() > maxBackpressure) return; - const writer = encoder.writer; - writer.reset(); - msgCodec.encodeBatch(resCodec, messages); - const encoded = writer.flush(); - ws.send(encoded, true, false); - } catch (error) { - logger.error('WS_SEND', error, {messages}); - } - }, - bufferSize: 1, - bufferTime: 0, - }); + try { + const ws = ws_ as types.RpcWebSocket; + const ctx = ws.ctx; + const resCodec = ctx.resCodec; + const msgCodec = ctx.msgCodec; + const encoder = resCodec.encoder; + ws.rpc = new RpcMessageStreamProcessor({ + caller, + send: (messages: ReactiveRpcMessage[]) => { + try { + if (ws.getBufferedAmount() > maxBackpressure) return; + const writer = encoder.writer; + writer.reset(); + msgCodec.encodeBatch(resCodec, messages); + const encoded = writer.flush(); + ws.send(encoded, true, false); + } catch (error) { + logger.error('WS_SEND', error, {messages}); + } + }, + bufferSize: 1, + bufferTime: 0, + }); + } catch (error) { + logger.error('RX_WS_OPEN', error); + } }, message: (ws_: types.WebSocket, buf: ArrayBuffer, isBinary: boolean) => { - const ws = ws_ as types.RpcWebSocket; - const ctx = ws.ctx; - const reqCodec = ctx.reqCodec; - const msgCodec = ctx.msgCodec; - const uint8 = new Uint8Array(buf); - const rpc = ws.rpc!; try { - const messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[]; + const ws = ws_ as types.RpcWebSocket; + const ctx = ws.ctx; + const reqCodec = ctx.reqCodec; + const msgCodec = ctx.msgCodec; + const uint8 = new Uint8Array(buf); + const rpc = ws.rpc!; try { - rpc.onMessages(messages, ctx); + const messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[]; + try { + rpc.onMessages(messages, ctx); + } catch (error) { + logger.error('RX_RPC_PROCESSING', error, messages); + return; + } } catch (error) { - logger.error('RX_RPC_PROCESSING_ERROR', error, messages); - return; + logger.error('RX_RPC_DECODING', error, {codec: reqCodec.id, buf: Buffer.from(uint8).toString()}); } } catch (error) { - logger.error('RX_RPC_DECODING_ERROR', error, {codec: reqCodec.id, buf: Buffer.from(uint8).toString()}); + logger.error('RX_WS_MESSAGE', error); } }, close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => { From a7cc7c1e4fabd60337d2365c1e251c97e9c11828 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 12:28:49 +0100 Subject: [PATCH 07/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20error=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/rpc/RpcMessageBatchProcessor.ts | 2 +- .../RpcMessageStreamProcessor.spec.ts | 6 +- .../common/rpc/__tests__/sample-api.ts | 4 +- .../common/rpc/caller/error/RpcError.ts | 17 ++-- .../error/__tests__/RpcErrorType.spec.ts | 2 +- src/reactive-rpc/server/uws/RpcApp.ts | 95 +++++++++---------- 6 files changed, 60 insertions(+), 66 deletions(-) diff --git a/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts b/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts index 1e553c3ae7..e8f7410e32 100644 --- a/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts +++ b/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts @@ -59,7 +59,7 @@ export class RpcMessageBatchProcessor { } return result; } catch (error) { - const value = RpcError.internalErrorValue(); + const value = RpcError.internalErrorValue(error); return [new msg.ResponseErrorMessage(-1, value)]; } } diff --git a/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts b/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts index a5fa6f537b..ad33a07e5c 100644 --- a/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts +++ b/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts @@ -51,7 +51,7 @@ const setup = ( isStreaming: false, call: async () => { // tslint:disable-next-line:no-string-throw - throw RpcError.internal('this promise can throw'); + throw RpcError.internal(null, 'this promise can throw'); }, }, promiseDelay: { @@ -64,7 +64,7 @@ const setup = ( error: { isStreaming: false, call: async () => { - throw RpcError.internal('this promise can throw'); + throw RpcError.internal(null, 'this promise can throw'); }, }, emitOnceSync: { @@ -635,7 +635,7 @@ describe('pre-call checks', () => { test('fails call when pre-call checks fail', async () => { const onPreCall = jest.fn(async (request) => { - throw RpcError.internal('fail...'); + throw RpcError.internal(null, 'fail...'); }); const {server, send} = setup( {}, diff --git a/src/reactive-rpc/common/rpc/__tests__/sample-api.ts b/src/reactive-rpc/common/rpc/__tests__/sample-api.ts index 9b9ce3e7f7..5ed0e797e8 100644 --- a/src/reactive-rpc/common/rpc/__tests__/sample-api.ts +++ b/src/reactive-rpc/common/rpc/__tests__/sample-api.ts @@ -76,7 +76,7 @@ const double: IStaticRpcMethod = { const error: IStaticRpcMethod = { isStreaming: false, call: async () => { - throw new RpcError('this promise can throw', '', 0, '', undefined); + throw new RpcError('this promise can throw', '', 0, '', undefined, undefined); }, }; @@ -96,7 +96,7 @@ const streamError: IStreamingRpcMethod = { call$: () => from( (async () => { - throw RpcError.internal('Stream always errors'); + throw RpcError.internal(null, 'Stream always errors'); })(), ), }; diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index 30d2de254a..fe793a5614 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -36,16 +36,16 @@ export type RpcErrorValue = Value; export class RpcError extends Error implements IRpcError { public static from(error: unknown) { if (error instanceof RpcError) return error; - return RpcError.internal(); + return RpcError.internal(error); } - public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined): RpcError { + public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined, originalError: unknown = undefined): RpcError { const code = RpcErrorCodes[errno]; - return new RpcError(message || code, code, errno, undefined, meta || undefined); + return new RpcError(message || code, code, errno, undefined, meta || undefined, originalError); } - public static internal(message: string = 'Internal Server Error'): RpcError { - return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message); + public static internal(originalError: unknown, message: string = 'Internal Server Error'): RpcError { + return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError); } public static badRequest(): RpcError { @@ -60,7 +60,7 @@ export class RpcError extends Error implements IRpcError { return new Value(error, RpcErrorType); } - public static valueFrom(error: unknown, def = RpcError.internalErrorValue()): RpcErrorValue { + public static valueFrom(error: unknown, def = RpcError.internalErrorValue(error)): RpcErrorValue { if (error instanceof Value && error.data instanceof RpcError && error.type === RpcErrorType) return error; if (error instanceof RpcError) return RpcError.value(error); return def; @@ -70,8 +70,8 @@ export class RpcError extends Error implements IRpcError { return RpcError.value(RpcError.fromCode(errno, message)); } - public static internalErrorValue(): RpcErrorValue { - return RpcError.value(RpcError.internal()); + public static internalErrorValue(originalError: unknown): RpcErrorValue { + return RpcError.value(RpcError.internal(originalError)); } public static isRpcError(error: unknown): error is RpcError { @@ -84,6 +84,7 @@ export class RpcError extends Error implements IRpcError { public readonly errno: number, public readonly errorId: string | undefined, public readonly meta: unknown | undefined, + public readonly originalError: unknown | undefined, ) { super(message); if (message === code) this.code = undefined; diff --git a/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts b/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts index f5bc858aeb..3cb8b62dad 100644 --- a/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts +++ b/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts @@ -6,7 +6,7 @@ import {RpcErrorType} from '../RpcErrorType'; const codecs = new Codecs(new Writer(16)); test('can encode an internal error', () => { - const error = RpcError.internal(); + const error = RpcError.internal(null); const encoded = RpcErrorType.encode(codecs.json, error); // console.log(RpcErrorType.encoder(EncodingFormat.Json).toString()); const json = JSON.parse(Buffer.from(encoded).toString()); diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 10f084ae1f..10ec66bcf8 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -5,22 +5,18 @@ import {RpcError, RpcErrorCodes, RpcErrorType} from '../../common/rpc/caller/err import {ConnectionContext} from '../context'; import {RpcMessageCodecs} from '../../common/codec/RpcMessageCodecs'; import {Value} from '../../common/messages/Value'; -import {EncodingFormat} from '../../../json-pack/constants'; -import {RpcMessageFormat} from '../../common/codec/constants'; import {RpcCodecs} from '../../common/codec/RpcCodecs'; import {Codecs} from '../../../json-pack/codecs/Codecs'; +import {Writer} from '../../../util/buffers/Writer'; import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common'; import type * as types from './types'; import type {RouteHandler} from './types'; import type {RpcCaller} from '../../common/rpc/caller/RpcCaller'; import type {JsonValueCodec} from '../../../json-pack/codecs/types'; -import {Writer} from '../../../util/buffers/Writer'; const HDR_BAD_REQUEST = Buffer.from('400 Bad Request', 'utf8'); const HDR_NOT_FOUND = Buffer.from('404 Not Found', 'utf8'); -const HDR_INTERNAL_SERVER_ERROR = Buffer.from('500 Internal Server Error', 'utf8'); const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found'); -const ERR_INTERNAL = RpcError.internal(); const noop = (x: any) => {}; @@ -121,16 +117,22 @@ export class RpcApp { if (res.aborted) return; const messageCodec = ctx.msgCodec; const incomingMessages = messageCodec.decodeBatch(ctx.reqCodec, bodyUint8); - const outgoingMessages = await this.batchProcessor.onBatch(incomingMessages as IncomingBatchMessage[], ctx); - if (res.aborted) return; - const resCodec = ctx.resCodec; - messageCodec.encodeBatch(resCodec, outgoingMessages); - const buf = resCodec.encoder.writer.flush(); - if (res.aborted) return; - res.end(buf); - } catch (err: any) { - if (typeof err === 'object' && err) if (err.message === 'Invalid JSON') throw RpcError.badRequest(); - throw RpcError.from(err); + try { + const outgoingMessages = await this.batchProcessor.onBatch(incomingMessages as IncomingBatchMessage[], ctx); + if (res.aborted) return; + const resCodec = ctx.resCodec; + messageCodec.encodeBatch(resCodec, outgoingMessages); + const buf = resCodec.encoder.writer.flush(); + if (res.aborted) return; + res.end(buf); + } catch (error) { + const logger = this.options.logger ?? console; + logger.error('HTTP_RPC_PROCESSING', error, {messages: incomingMessages}); + throw RpcError.internal(error); + } + } catch (error) { + if (typeof error === 'object' && error) if ((error as any).message === 'Invalid JSON') throw RpcError.badRequest(); + throw RpcError.from(error); } }); return this; @@ -221,49 +223,40 @@ export class RpcApp { const augmentContext = options.augmentContext ?? noop; const logger = options.logger ?? console; this.app.any('/*', async (res: types.HttpResponse, req: types.HttpRequest) => { - res.onAborted(() => { - res.aborted = true; - }); - const method = req.getMethod(); - const url = req.getUrl(); try { - const match = matcher(method + url) as undefined | Match; - if (!match) { - res.cork(() => { - res.writeStatus(HDR_NOT_FOUND); - res.end(RpcErrorType.encode(responseCodec, ERR_NOT_FOUND)); - }); - return; - } - const handler = match.data as RouteHandler; - const params = match.params; - const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx; - responseCodec = ctx.resCodec; - augmentContext(ctx); - await handler(ctx); - } catch (err) { - if (err instanceof RpcError) { + res.onAborted(() => { + res.aborted = true; + }); + const method = req.getMethod(); + const url = req.getUrl(); + try { + const match = matcher(method + url) as undefined | Match; + if (!match) { + res.cork(() => { + res.writeStatus(HDR_NOT_FOUND); + res.end(RpcErrorType.encode(responseCodec, ERR_NOT_FOUND)); + }); + return; + } + const handler = match.data as RouteHandler; + const params = match.params; + const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx; + responseCodec = ctx.resCodec; + augmentContext(ctx); + await handler(ctx); + } catch (err) { + if (err instanceof Value) err = err.data; + if (!(err instanceof RpcError)) err = RpcError.from(err); const error = err; + if (error.errno === RpcErrorCodes.INTERNAL_ERROR) { + logger.error('UWS_ROUTER_INTERNAL_ERROR', error, {originalError: error.originalError ?? null}); + } res.cork(() => { res.writeStatus(HDR_BAD_REQUEST); res.end(RpcErrorType.encode(responseCodec, error)); }); - return; - } - if (err instanceof Value && err.data instanceof RpcError) { - const error = err.data; - res.cork(() => { - res.writeStatus(HDR_BAD_REQUEST); - res.end(RpcErrorType.encode(responseCodec, error)); - }); - return; } - logger.error('UWS_ROUTER_INTERNAL_ERROR', err); - res.cork(() => { - res.writeStatus(HDR_INTERNAL_SERVER_ERROR); - res.end(RpcErrorType.encode(responseCodec, ERR_INTERNAL)); - }); - } + } catch {} }); } From 7a88aae2149ece576319e854506244559cdffd11 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 12:29:31 +0100 Subject: [PATCH 08/27] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/rpc/caller/error/RpcError.ts | 7 ++++++- src/reactive-rpc/server/uws/RpcApp.ts | 3 ++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index fe793a5614..36ef63d139 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -39,7 +39,12 @@ export class RpcError extends Error implements IRpcError { return RpcError.internal(error); } - public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined, originalError: unknown = undefined): RpcError { + public static fromCode( + errno: RpcErrorCodes, + message: string = '', + meta: unknown = undefined, + originalError: unknown = undefined, + ): RpcError { const code = RpcErrorCodes[errno]; return new RpcError(message || code, code, errno, undefined, meta || undefined, originalError); } diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 10ec66bcf8..49e554f6bc 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -131,7 +131,8 @@ export class RpcApp { throw RpcError.internal(error); } } catch (error) { - if (typeof error === 'object' && error) if ((error as any).message === 'Invalid JSON') throw RpcError.badRequest(); + if (typeof error === 'object' && error) + if ((error as any).message === 'Invalid JSON') throw RpcError.badRequest(); throw RpcError.from(error); } }); From a79fe867b6e2a4326e5bccbac0aa0d0525020207 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 12:39:20 +0100 Subject: [PATCH 09/27] =?UTF-8?q?test:=20=F0=9F=92=8D=20fix=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/codec/compact/__tests__/smoke-tests.spec.ts | 2 +- .../common/rpc/caller/__tests__/RpcApiCaller.spec.ts | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts b/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts index e731420499..b4aa178ded 100644 --- a/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts +++ b/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts @@ -84,7 +84,7 @@ for (const jsonCodec of codecList) { }); test('Response Error typed', () => { - const value = RpcError.internalErrorValue(); + const value = RpcError.internalErrorValue(null); const message = new ResponseErrorMessage(123, value); const encoded = codec.encode(jsonCodec, [message]); const decoded1 = jsonCodec.decoder.read(encoded); diff --git a/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts b/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts index 485b1ad9e5..35dc828ff7 100644 --- a/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts +++ b/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts @@ -46,7 +46,7 @@ describe('static calls', () => { }, }); const [, error] = await of(caller.call('test', {}, {})); - expect(error).toEqual(RpcError.internalErrorValue()); + expect(error).toEqual(RpcError.internalErrorValue(null)); }); }); @@ -95,10 +95,10 @@ describe('streaming calls', () => { }); const [, error1] = await of(caller.call('test', {}, {})); - expect(error1).toEqual(RpcError.internalErrorValue()); + expect(error1).toEqual(RpcError.internalErrorValue(null)); const [, error2] = await of(Rx.firstValueFrom(caller.call$('test', Rx.of(undefined), {}))); - expect(error2).toEqual(RpcError.internalErrorValue()); + expect(error2).toEqual(RpcError.internalErrorValue(null)); }); }); From 6cc718236025659af6a9cfb2c7c3e536bffc2c5e Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 14:05:13 +0100 Subject: [PATCH 10/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20auth=20token=20ingestion=20specifier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/server/context.ts | 64 +++++++++++++++++------------- 1 file changed, 36 insertions(+), 28 deletions(-) diff --git a/src/reactive-rpc/server/context.ts b/src/reactive-rpc/server/context.ts index 06a2db7e00..b9c973a6a5 100644 --- a/src/reactive-rpc/server/context.ts +++ b/src/reactive-rpc/server/context.ts @@ -7,9 +7,8 @@ import type {RpcApp} from './uws/RpcApp'; import type {HttpRequest, HttpResponse} from './uws/types'; import type {RpcCodecs} from '../common/codec/RpcCodecs'; -const X_AUTH_PARAM = 'X-Authorization='; -const X_AUTH_PARAM_LENGTH = X_AUTH_PARAM.length; -const CODECS_REGEX = /rpc.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/; +const REGEX_AUTH_TOKEN_SPECIFIER = /tkn\.([a-zA-Z0-9\-_]+)(?:[^a-zA-Z0-9\-_]|$)/; +const REGEX_CODECS_SPECIFIER = /rpc\.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/; export class ConnectionContext> { private static findIp(req: HttpRequest, res: HttpResponse): string { @@ -20,14 +19,37 @@ export class ConnectionContext> { ); } - private static findToken(req: HttpRequest, params: string[] | null): string { - let token: string = req.getHeader('authorization') || ''; - if (!token) { - const query = req.getQuery(); - const params = new URLSearchParams(query); - token = params.get('access_token') || ''; - if (!token) token = params.get('token') || ''; - } + private static findTokenInText(text: string): string { + const match = REGEX_AUTH_TOKEN_SPECIFIER.exec(text); + if (!match) return ''; + return match[1] || ''; + } + + /** + * Looks for an authentication token in the following places: + * + * 1. The `Authorization` header. + * 2. The URI query parameters. + * 3. The `Cookie` header. + * 4. The `Sec-Websocket-Protocol` header. + * + * @param req HTTP request + * @returns Authentication token, if any. + */ + private static findToken(req: HttpRequest): string { + let token: string = ''; + let text: string = ''; + text = req.getHeader('authorization'); + if (text) token = ConnectionContext.findTokenInText(text); + if (token) return token; + text = req.getQuery(); + if (text) token = ConnectionContext.findTokenInText(text); + if (token) return token; + text = req.getHeader('cookie'); + if (text) token = ConnectionContext.findTokenInText(text); + if (token) return token; + text = req.getHeader('sec-websocket-protocol'); + if (text) token = ConnectionContext.findTokenInText(text); return token; } @@ -38,7 +60,7 @@ export class ConnectionContext> { app: RpcApp, ): ConnectionContext { const ip = ConnectionContext.findIp(req, res); - const token: string = ConnectionContext.findToken(req, params); + const token: string = ConnectionContext.findToken(req); const codecs = app.codecs; const valueCodecs = codecs.value; const ctx = new ConnectionContext( @@ -64,21 +86,7 @@ export class ConnectionContext> { app: RpcApp, ): ConnectionContext { const ip = ConnectionContext.findIp(req, res); - let token: string = ConnectionContext.findToken(req, params); - if (!token && secWebSocketProtocol) { - const protocols = secWebSocketProtocol.split(','); - const length = protocols.length; - for (let i = 0; i < length; i++) { - let protocol = protocols[i].trim(); - if (protocol.startsWith(X_AUTH_PARAM)) { - protocol = protocol.slice(X_AUTH_PARAM_LENGTH); - if (protocol) { - token = Buffer.from(protocol, 'base64').toString(); - break; - } - } - } - } + const token: string = ConnectionContext.findToken(req); const codecs = app.codecs; const valueCodecs = codecs.value; const ctx = new ConnectionContext( @@ -114,7 +122,7 @@ export class ConnectionContext> { * - `rpc.json2.verbose.json` for JSON-RPC 2.0 with verbose messages encoded as JSON. */ public setCodecs(specifier: string, codecs: RpcCodecs): void { - const match = CODECS_REGEX.exec(specifier); + const match = REGEX_CODECS_SPECIFIER.exec(specifier); if (!match) return; const [, protocol, messageFormat, request, response] = match; switch (protocol) { From 5907992805c6cdf0a067bfb2aa48a61416cd29ec Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 14:07:51 +0100 Subject: [PATCH 11/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20allo?= =?UTF-8?q?w=20user=20to=20provide=20authentication=20token?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/browser/createBinaryWsRpcClient.ts | 10 ++++++++-- src/reactive-rpc/browser/createJsonWsRpcClient.ts | 10 ++++++++-- 2 files changed, 16 insertions(+), 4 deletions(-) diff --git a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts index 60b65b4c0b..dbe9cec554 100644 --- a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts +++ b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts @@ -4,7 +4,13 @@ import {RpcPersistentClient, WebSocketChannel} from '../common'; import {RpcCodec} from '../common/codec/RpcCodec'; import {BinaryRpcMessageCodec} from '../common/codec/binary'; -export const createBinaryWsRpcClient = (url: string) => { +/** + * Constructs a JSON Reactive RPC client. + * @param url RPC endpoint. + * @param token Authentication token. + * @returns An RPC client. + */ +export const createBinaryWsRpcClient = (url: string, token: string) => { const writer = new Writer(1024 * 4); const msg = new BinaryRpcMessageCodec(); const req = new CborJsonValueCodec(writer); @@ -14,7 +20,7 @@ export const createBinaryWsRpcClient = (url: string) => { channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket(url, [codec.specifier()]), + newSocket: () => new WebSocket(url, [codec.specifier(), token]), }), }, }); diff --git a/src/reactive-rpc/browser/createJsonWsRpcClient.ts b/src/reactive-rpc/browser/createJsonWsRpcClient.ts index 3e50e4abff..ec8cdb07b7 100644 --- a/src/reactive-rpc/browser/createJsonWsRpcClient.ts +++ b/src/reactive-rpc/browser/createJsonWsRpcClient.ts @@ -4,7 +4,13 @@ import {RpcPersistentClient, WebSocketChannel} from '../common'; import {RpcCodec} from '../common/codec/RpcCodec'; import {CompactRpcMessageCodec} from '../common/codec/compact'; -export const createJsonWsRpcClient = (url: string) => { +/** + * Constructs a JSON Reactive RPC client. + * @param url RPC endpoint. + * @param token Authentication token. + * @returns An RPC client. + */ +export const createJsonWsRpcClient = (url: string, token: string) => { const writer = new Writer(1024 * 4); const msg = new CompactRpcMessageCodec(); const req = new JsonJsonValueCodec(writer); @@ -14,7 +20,7 @@ export const createJsonWsRpcClient = (url: string) => { channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket(url, [codec.specifier()]), + newSocket: () => new WebSocket(url, [codec.specifier(), token]), }), }, }); From fd65e497954da980140e050de88a48666bdf9047 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 14:09:36 +0100 Subject: [PATCH 12/27] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/server/context.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/reactive-rpc/server/context.ts b/src/reactive-rpc/server/context.ts index b9c973a6a5..210fb1ce6b 100644 --- a/src/reactive-rpc/server/context.ts +++ b/src/reactive-rpc/server/context.ts @@ -27,12 +27,12 @@ export class ConnectionContext> { /** * Looks for an authentication token in the following places: - * + * * 1. The `Authorization` header. * 2. The URI query parameters. * 3. The `Cookie` header. * 4. The `Sec-Websocket-Protocol` header. - * + * * @param req HTTP request * @returns Authentication token, if any. */ From c88732e0541d12005b67d8307798609899861783 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 15:08:22 +0100 Subject: [PATCH 13/27] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20copy?= =?UTF-8?q?=20websocket=20message=20payload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/server/context.ts | 7 +++++-- src/reactive-rpc/server/uws/RpcApp.ts | 5 +++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/reactive-rpc/server/context.ts b/src/reactive-rpc/server/context.ts index 210fb1ce6b..d4f7a6618a 100644 --- a/src/reactive-rpc/server/context.ts +++ b/src/reactive-rpc/server/context.ts @@ -182,8 +182,11 @@ export class ConnectionContext> { let running = 0; res.onData((ab, isLast) => { running += ab.byteLength; - if (running > max) res.end('too large'); - // Last `ab` does not need to be copied. + if (running > max) { + res.aborted = true; + res.end('too large'); + } + // Last `ab` does not need to be copied, as per docs. if (isLast) list.push(new Uint8Array(ab)), resolve(list); else list.push(copy(new Uint8Array(ab))); }); diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 49e554f6bc..de1dd40e3d 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -8,6 +8,7 @@ import {Value} from '../../common/messages/Value'; import {RpcCodecs} from '../../common/codec/RpcCodecs'; import {Codecs} from '../../../json-pack/codecs/Codecs'; import {Writer} from '../../../util/buffers/Writer'; +import {copy} from '../../../util/buffers/copy'; import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common'; import type * as types from './types'; import type {RouteHandler} from './types'; @@ -191,7 +192,7 @@ export class RpcApp { const ctx = ws.ctx; const reqCodec = ctx.reqCodec; const msgCodec = ctx.msgCodec; - const uint8 = new Uint8Array(buf); + const uint8 = copy(new Uint8Array(buf)); const rpc = ws.rpc!; try { const messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[]; @@ -275,7 +276,7 @@ export class RpcApp { if (token) { logger.log({msg: 'SERVER_STARTED', url: `http://localhost:${port}`}); } else { - logger.error(`Failed to listen on ${port} port.`); + logger.error('SERVER_START', new Error(`Failed to listen on ${port} port.`)); } }); } From d79083c2bc45f881efdb38d8a160c65c2c7d9b17 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 15:23:28 +0100 Subject: [PATCH 14/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add?= =?UTF-8?q?=20blocks.history=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/blocks.spec.ts | 58 +++++++++++++++++++- src/server/routes/blocks/index.ts | 4 +- src/server/routes/blocks/methods/history.ts | 45 +++++++++++++++ src/server/services/blocks/BlocksServices.ts | 6 ++ 4 files changed, 110 insertions(+), 3 deletions(-) create mode 100644 src/server/routes/blocks/methods/history.ts diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index 28157d0733..f1918e8dba 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -1,7 +1,6 @@ import {of} from 'rxjs'; import {Model} from '../../json-crdt'; -import {Value} from '../../reactive-rpc/common/messages/Value'; -import {RpcError, RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; +import {RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; @@ -354,4 +353,59 @@ describe('blocks.*', () => { expect(emits[1].data.deleted).toBe(true); }); }); + + describe('blocks.history', () => { + test('can retrieve change history', async () => { + const {call} = setup(); + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await call('blocks.create', {id: 'my-block', patches: [{ + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }]}); + await tick(11); + model.api.str(['text']).ins(4, 'o'); + const patch2 = model.api.flush(); + model.api.obj([]).set({ + age: 26, + }); + const patch3 = model.api.flush(); + await call('blocks.edit', {id: 'my-block', patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: Date.now(), + blob: patch3.toBinary(), + }, + ]}); + const history = await call('blocks.history', {id: 'my-block', min: 0, max: 2}); + expect(history).toMatchObject({ + patches: [ + { + seq: 0, + created: expect.any(Number), + blob: patch1.toBinary(), + }, + { + seq: 1, + created: expect.any(Number), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: expect.any(Number), + blob: patch3.toBinary(), + }, + ], + }); + }); + }); }); diff --git a/src/server/routes/blocks/index.ts b/src/server/routes/blocks/index.ts index fb39b20751..ebe2dce3c7 100644 --- a/src/server/routes/blocks/index.ts +++ b/src/server/routes/blocks/index.ts @@ -4,6 +4,7 @@ import {remove} from './methods/remove'; import {edit} from './methods/edit'; import {listen} from './methods/listen'; import {Block, BlockId, BlockPatch, BlockSeq} from './schema'; +import {history} from './methods/history'; import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; import type {RouteDeps} from '../types'; @@ -22,5 +23,6 @@ export const blocks = ( remove(d) ( edit(d) ( listen(d) - ( r ))))))); + ( history(d) + ( r )))))))); }; diff --git a/src/server/routes/blocks/methods/history.ts b/src/server/routes/blocks/methods/history.ts new file mode 100644 index 0000000000..9ac1d9dbe8 --- /dev/null +++ b/src/server/routes/blocks/methods/history.ts @@ -0,0 +1,45 @@ +import type {BlockPatch, BlockId} from '../schema'; +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; + +export const history = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Block ID', + description: 'The ID of the block.', + }), + t.prop('max', t.num.options({format: 'u32'})).options({ + title: 'Max', + description: 'The maximum sequence number to return.', + }), + t.prop('min', t.num.options({format: 'u32'})).options({ + title: 'Min', + description: 'The minimum sequence number to return.', + }), + ); + + const Response = t.Object( + t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Patches', + description: 'The list of patches.', + }) + ); + + const Func = t + .Function(Request, Response) + .options({ + title: 'Block History', + intro: 'Fetch block history.', + description: 'Returns a list of specified change patches for a block.', + }) + .implement(async ({id, min, max}) => { + const {patches} = await services.blocks.history(id, min, max); + return {patches}; + }); + + return router.fn('blocks.history', Func); + }; diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 405dba62c8..835fd68fde 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -38,6 +38,12 @@ export class BlocksServices { }); } + public async history(id: string, min: number, max: number) { + const {store} = this; + const patches = await store.history(id, min, max); + return {patches}; + } + public async edit(id: string, patches: any[]) { if (!Array.isArray(patches)) throw RpcError.validation('patches must be an array'); if (!patches.length) throw RpcError.validation('patches must not be empty'); From 2353693d60c68a0fa5c72d2e74e653a7b96434fe Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 15:50:52 +0100 Subject: [PATCH 15/27] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/blocks.spec.ts | 42 ++++++++++++--------- src/server/routes/blocks/methods/history.ts | 2 +- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index f1918e8dba..a9cf3c955a 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -362,11 +362,16 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', {id: 'my-block', patches: [{ - seq: 0, - created: Date.now(), - blob: patch1.toBinary(), - }]}); + await call('blocks.create', { + id: 'my-block', + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + ], + }); await tick(11); model.api.str(['text']).ins(4, 'o'); const patch2 = model.api.flush(); @@ -374,18 +379,21 @@ describe('blocks.*', () => { age: 26, }); const patch3 = model.api.flush(); - await call('blocks.edit', {id: 'my-block', patches: [ - { - seq: 1, - created: Date.now(), - blob: patch2.toBinary(), - }, - { - seq: 2, - created: Date.now(), - blob: patch3.toBinary(), - }, - ]}); + await call('blocks.edit', { + id: 'my-block', + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: Date.now(), + blob: patch3.toBinary(), + }, + ], + }); const history = await call('blocks.history', {id: 'my-block', min: 0, max: 2}); expect(history).toMatchObject({ patches: [ diff --git a/src/server/routes/blocks/methods/history.ts b/src/server/routes/blocks/methods/history.ts index 9ac1d9dbe8..e25f22cda3 100644 --- a/src/server/routes/blocks/methods/history.ts +++ b/src/server/routes/blocks/methods/history.ts @@ -26,7 +26,7 @@ export const history = t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ title: 'Patches', description: 'The list of patches.', - }) + }), ); const Func = t From eb5bc7f41dc59efc880cf106a2a60201dbbcaf76 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 15:54:59 +0100 Subject: [PATCH 16/27] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20setu?= =?UTF-8?q?p=20emulated=20caller?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/blocks.spec.ts | 8 +++---- src/server/__tests__/setup.ts | 36 ++++++++++++++++++++++++++++- 2 files changed, 39 insertions(+), 5 deletions(-) diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index a9cf3c955a..8d88034861 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -356,13 +356,13 @@ describe('blocks.*', () => { describe('blocks.history', () => { test('can retrieve change history', async () => { - const {call} = setup(); + const {client} = setup(); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await client.call('blocks.create', { id: 'my-block', patches: [ { @@ -379,7 +379,7 @@ describe('blocks.*', () => { age: 26, }); const patch3 = model.api.flush(); - await call('blocks.edit', { + await client.call('blocks.edit', { id: 'my-block', patches: [ { @@ -394,7 +394,7 @@ describe('blocks.*', () => { }, ], }); - const history = await call('blocks.history', {id: 'my-block', min: 0, max: 2}); + const history = await client.call('blocks.history', {id: 'my-block', min: 0, max: 2}); expect(history).toMatchObject({ patches: [ { diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts index 41f9bd0ba1..9f2b7128cf 100644 --- a/src/server/__tests__/setup.ts +++ b/src/server/__tests__/setup.ts @@ -1,3 +1,9 @@ +import {Codecs} from '../../json-pack/codecs/Codecs'; +import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage, RpcMessageBatchProcessor, RpcMessageStreamProcessor, StreamingRpcClient} from '../../reactive-rpc/common'; +import {RpcCodecs} from '../../reactive-rpc/common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../reactive-rpc/common/codec/RpcMessageCodecs'; +import {ConnectionContext} from '../../reactive-rpc/server/context'; +import {Writer} from '../../util/buffers/Writer'; import {createCaller} from '../routes'; import {Services} from '../services/Services'; @@ -5,5 +11,33 @@ export const setup = () => { const services = new Services(); const caller = createCaller(services); const call = caller.callSimple.bind(caller); - return {services, caller, call}; + const writer = new Writer(); + const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); + const batchProcessor = new RpcMessageBatchProcessor({caller}); + const ctx = new ConnectionContext('0.0.0.0', '', null, {}, codecs.value.cbor, codecs.value.cbor, codecs.messages.binary); + let client: StreamingRpcClient; + const streamProcessor = new RpcMessageStreamProcessor({ + caller, + send: (messages: ReactiveRpcMessage[]) => { + const encoded = ctx.msgCodec.encode(ctx.resCodec, messages); + setTimeout(() => { + const decoded = ctx.msgCodec.decodeBatch(ctx.resCodec, encoded); + client.onMessages(decoded as ReactiveRpcServerMessage[]); + }, 1); // TODO: Randomize this + }, + bufferSize: 1, // TODO: Randomize this + bufferTime: 0, // TODO: Randomize this + }); + client = new StreamingRpcClient({ + send: (messages: ReactiveRpcClientMessage[]) => { + const encoded = ctx.msgCodec.encode(ctx.reqCodec, messages); + setTimeout(() => { + const decoded = ctx.msgCodec.decodeBatch(ctx.reqCodec, encoded); + streamProcessor.onMessages(decoded as ReactiveRpcClientMessage[], {}); + }, 1); // TODO: Randomize this + }, + bufferSize: 1, // TODO: Randomize this + bufferTime: 0, // TODO: Randomize this + }); + return {services, caller, call, client}; }; From 7f029fba0f69114e6fbace15b740a66d8efcd22a Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 16:24:21 +0100 Subject: [PATCH 17/27] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20crea?= =?UTF-8?q?te=20E2E=20testing=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/testing/buildE2eClient.ts | 104 ++++++++++++++++++ src/server/__tests__/setup.ts | 42 ++----- src/util/Fuzzer.ts | 5 +- 3 files changed, 118 insertions(+), 33 deletions(-) create mode 100644 src/reactive-rpc/common/testing/buildE2eClient.ts diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts new file mode 100644 index 0000000000..48142522f9 --- /dev/null +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -0,0 +1,104 @@ +import {Codecs} from "../../../json-pack/codecs/Codecs"; +import {Fuzzer} from "../../../util/Fuzzer"; +import {Writer} from "../../../util/buffers/Writer"; +import {ConnectionContext} from "../../server/context"; +import {RpcCodecs} from "../codec/RpcCodecs"; +import {RpcMessageCodecs} from "../codec/RpcMessageCodecs"; +import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from "../messages"; +import {RpcClient, RpcMessageStreamProcessor, StreamingRpcClient} from "../rpc"; +import type {RpcCaller} from "../rpc/caller/RpcCaller"; + +export interface BuildE2eClientOptions { + caller: RpcCaller; + + /** + * Writer to use for encoding messages. Defaults to `new Writer(4 * 1024)`. + */ + writer?: Writer; + + /** + * Minimum and maximum size of the default buffer in kilobytes. An actual + * size will be picked randomly between these two values. Defaults to + * `[4, 4]`. Used when `writer` is not specified. + */ + writerDefaultBufferKb?: [min: number, max: number]; + + /** + * Minimum and maximum request latencies in milliseconds. An actual latency + * number will be picked randomly between these two values. Defaults to + * `[1, 1]`. + */ + requestLatency?: [min: number, max: number]; + + /** + * Minimum and maximum response latencies in milliseconds. An actual latency + * number will be picked randomly between these two values. Defaults to + * `[1, 1]`. + */ + responseLatency?: [min: number, max: number]; + + /** + * Number of messages to keep in buffer before sending them to the client. + * The actual number of messages will be picked randomly between these two + * values. Defaults to `[1, 1]`. + */ + serverBufferSize?: [min: number, max: number]; + + /** + * Time in milliseconds for how long to buffer messages before sending them + * to the client. The actual time will be picked randomly between these two + * values. Defaults to `[0, 0]`. + */ + serverBufferTime?: [min: number, max: number]; + + /** + * Number of messages to keep in buffer before sending them to the server. + * The actual number of messages will be picked randomly between these two + * values. Defaults to `[1, 1]`. + */ + clientBufferSize?: [min: number, max: number]; + + /** + * Time in milliseconds for how long to buffer messages before sending them + * to the server. The actual time will be picked randomly between these two + * values. Defaults to `[0, 0]`. + */ + clientBufferTime?: [min: number, max: number]; +} + +export const buildE2eClient = (opt: BuildE2eClientOptions) => { + const caller = opt.caller; + const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); + const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); + const ctx = new ConnectionContext('0.0.0.0', '', null, {}, codecs.value.cbor, codecs.value.cbor, codecs.messages.binary); + let client: StreamingRpcClient; + const streamProcessor = new RpcMessageStreamProcessor({ + caller, + send: (messages: ReactiveRpcMessage[]) => { + const encoded = ctx.msgCodec.encode(ctx.resCodec, messages); + const latency = opt.responseLatency ?? [1, 1]; + setTimeout(() => { + const decoded = ctx.msgCodec.decodeBatch(ctx.resCodec, encoded); + client.onMessages(decoded as ReactiveRpcServerMessage[]); + }, Fuzzer.randomInt(latency[0], latency[1])); + }, + bufferSize: Fuzzer.randomInt2(opt.serverBufferSize ?? [1, 1]), + bufferTime: Fuzzer.randomInt2(opt.serverBufferTime ?? [0, 0]), + }); + client = new StreamingRpcClient({ + send: (messages: ReactiveRpcClientMessage[]) => { + const encoded = ctx.msgCodec.encode(ctx.reqCodec, messages); + const latency = opt.requestLatency ?? [1, 1]; + setTimeout(() => { + const decoded = ctx.msgCodec.decodeBatch(ctx.reqCodec, encoded); + streamProcessor.onMessages(decoded as ReactiveRpcClientMessage[], {}); + }, Fuzzer.randomInt(latency[0], latency[1])); + }, + bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]), + bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]), + }); + const typedClient = client as T; + return { + client: typedClient, + }; +}; diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts index 9f2b7128cf..b235a37418 100644 --- a/src/server/__tests__/setup.ts +++ b/src/server/__tests__/setup.ts @@ -1,9 +1,4 @@ -import {Codecs} from '../../json-pack/codecs/Codecs'; -import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage, RpcMessageBatchProcessor, RpcMessageStreamProcessor, StreamingRpcClient} from '../../reactive-rpc/common'; -import {RpcCodecs} from '../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {ConnectionContext} from '../../reactive-rpc/server/context'; -import {Writer} from '../../util/buffers/Writer'; +import {buildE2eClient} from '../../reactive-rpc/common/testing/buildE2eClient'; import {createCaller} from '../routes'; import {Services} from '../services/Services'; @@ -11,33 +6,16 @@ export const setup = () => { const services = new Services(); const caller = createCaller(services); const call = caller.callSimple.bind(caller); - const writer = new Writer(); - const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); - const batchProcessor = new RpcMessageBatchProcessor({caller}); - const ctx = new ConnectionContext('0.0.0.0', '', null, {}, codecs.value.cbor, codecs.value.cbor, codecs.messages.binary); - let client: StreamingRpcClient; - const streamProcessor = new RpcMessageStreamProcessor({ + // const {client} = buildE2eClient>({caller}); + const {client} = buildE2eClient({ caller, - send: (messages: ReactiveRpcMessage[]) => { - const encoded = ctx.msgCodec.encode(ctx.resCodec, messages); - setTimeout(() => { - const decoded = ctx.msgCodec.decodeBatch(ctx.resCodec, encoded); - client.onMessages(decoded as ReactiveRpcServerMessage[]); - }, 1); // TODO: Randomize this - }, - bufferSize: 1, // TODO: Randomize this - bufferTime: 0, // TODO: Randomize this - }); - client = new StreamingRpcClient({ - send: (messages: ReactiveRpcClientMessage[]) => { - const encoded = ctx.msgCodec.encode(ctx.reqCodec, messages); - setTimeout(() => { - const decoded = ctx.msgCodec.decodeBatch(ctx.reqCodec, encoded); - streamProcessor.onMessages(decoded as ReactiveRpcClientMessage[], {}); - }, 1); // TODO: Randomize this - }, - bufferSize: 1, // TODO: Randomize this - bufferTime: 0, // TODO: Randomize this + writerDefaultBufferKb: [1, 32], + clientBufferSize: [1, 3], + clientBufferTime: [1, 10], + serverBufferSize: [1, 3], + serverBufferTime: [1, 10], + requestLatency: [1, 10], + responseLatency: [1, 10], }); return {services, caller, call, client}; }; diff --git a/src/util/Fuzzer.ts b/src/util/Fuzzer.ts index 1891e3bdd1..694149ba73 100644 --- a/src/util/Fuzzer.ts +++ b/src/util/Fuzzer.ts @@ -16,11 +16,14 @@ function xoshiro128ss(a: number, b: number, c: number, d: number) { } export class Fuzzer { - /** @deprecated */ public static randomInt(min: number, max: number): number { return Math.floor(Math.random() * (max - min + 1)) + min; } + public static randomInt2([min, max]: [min: number, max: number]): number { + return Math.floor(Math.random() * (max - min + 1)) + min; + } + /** @deprecated */ public static pick(elements: T[]): T { return elements[Math.floor(Math.random() * elements.length)]; From 340c749396d1956aeb4d80f597ab6cc6d48ee7ce Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 16:25:40 +0100 Subject: [PATCH 18/27] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/testing/buildE2eClient.ts | 30 ++++++++++++------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index 48142522f9..1bfc4f3655 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -1,12 +1,12 @@ -import {Codecs} from "../../../json-pack/codecs/Codecs"; -import {Fuzzer} from "../../../util/Fuzzer"; -import {Writer} from "../../../util/buffers/Writer"; -import {ConnectionContext} from "../../server/context"; -import {RpcCodecs} from "../codec/RpcCodecs"; -import {RpcMessageCodecs} from "../codec/RpcMessageCodecs"; -import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from "../messages"; -import {RpcClient, RpcMessageStreamProcessor, StreamingRpcClient} from "../rpc"; -import type {RpcCaller} from "../rpc/caller/RpcCaller"; +import {Codecs} from '../../../json-pack/codecs/Codecs'; +import {Fuzzer} from '../../../util/Fuzzer'; +import {Writer} from '../../../util/buffers/Writer'; +import {ConnectionContext} from '../../server/context'; +import {RpcCodecs} from '../codec/RpcCodecs'; +import {RpcMessageCodecs} from '../codec/RpcMessageCodecs'; +import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from '../messages'; +import {RpcClient, RpcMessageStreamProcessor, StreamingRpcClient} from '../rpc'; +import type {RpcCaller} from '../rpc/caller/RpcCaller'; export interface BuildE2eClientOptions { caller: RpcCaller; @@ -70,7 +70,15 @@ export const buildE2eClient = (opt: BuildE2eClientOptions) => { const caller = opt.caller; const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); - const ctx = new ConnectionContext('0.0.0.0', '', null, {}, codecs.value.cbor, codecs.value.cbor, codecs.messages.binary); + const ctx = new ConnectionContext( + '0.0.0.0', + '', + null, + {}, + codecs.value.cbor, + codecs.value.cbor, + codecs.messages.binary, + ); let client: StreamingRpcClient; const streamProcessor = new RpcMessageStreamProcessor({ caller, @@ -100,5 +108,5 @@ export const buildE2eClient = (opt: BuildE2eClientOptions) => { const typedClient = client as T; return { client: typedClient, - }; + }; }; From 070c4c9a7c47e99bc33c26938cc63b0cce9d0ecc Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 16:26:48 +0100 Subject: [PATCH 19/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20allo?= =?UTF-8?q?w=20to=20specify=20IP=20and=20token?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/testing/buildE2eClient.ts | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index 1bfc4f3655..605dc7cfb4 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -64,6 +64,16 @@ export interface BuildE2eClientOptions { * values. Defaults to `[0, 0]`. */ clientBufferTime?: [min: number, max: number]; + + /** + * IP address to use for the connection. Defaults to `0.0.0.0`. + */ + ip?: string; + + /** + * Authentication token to use for the connection. Defaults to empty string. + */ + token?: string; } export const buildE2eClient = (opt: BuildE2eClientOptions) => { @@ -71,8 +81,8 @@ export const buildE2eClient = (opt: BuildE2eClientOptions) => { const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); const ctx = new ConnectionContext( - '0.0.0.0', - '', + opt.ip ?? '0.0.0.0', + opt.ip ?? '', null, {}, codecs.value.cbor, From 33aed5c43abe6a252ba3199ae974527c51ebae3b Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 16:44:56 +0100 Subject: [PATCH 20/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add?= =?UTF-8?q?=20type=20safety=20to=20emulated=20E2E=20client?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/testing/buildE2eClient.ts | 26 ++++++++++++++----- src/server/__tests__/setup.ts | 6 ++--- src/server/routes/index.ts | 2 +- 3 files changed, 22 insertions(+), 12 deletions(-) diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index 605dc7cfb4..b885726c50 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -5,12 +5,14 @@ import {ConnectionContext} from '../../server/context'; import {RpcCodecs} from '../codec/RpcCodecs'; import {RpcMessageCodecs} from '../codec/RpcMessageCodecs'; import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from '../messages'; -import {RpcClient, RpcMessageStreamProcessor, StreamingRpcClient} from '../rpc'; -import type {RpcCaller} from '../rpc/caller/RpcCaller'; +import {RpcMessageStreamProcessor, StreamingRpcClient, TypedRpcClient} from '../rpc'; +import type {FunctionStreamingType, FunctionType} from '../../../json-type/type/classes'; +import type {Observable} from 'rxjs'; +import type {ResolveType} from '../../../json-type'; +import type {TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {TypeRouterCaller} from '../rpc/caller/TypeRouterCaller'; export interface BuildE2eClientOptions { - caller: RpcCaller; - /** * Writer to use for encoding messages. Defaults to `new Writer(4 * 1024)`. */ @@ -76,8 +78,7 @@ export interface BuildE2eClientOptions { token?: string; } -export const buildE2eClient = (opt: BuildE2eClientOptions) => { - const caller = opt.caller; +export const buildE2eClient = >(caller: Caller, opt: BuildE2eClientOptions) => { const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); const ctx = new ConnectionContext( @@ -115,8 +116,19 @@ export const buildE2eClient = (opt: BuildE2eClientOptions) => { bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]), bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]), }); - const typedClient = client as T; + type Router = UnTypeRouterCaller; + type Routes = UnTypeRouter; + type Methods = {[K in keyof Routes]: UnwrapFunction}; + const typedClient = client as TypedRpcClient; return { client: typedClient, }; }; + +type UnTypeRouterCaller = T extends TypeRouterCaller ? R : never; +type UnTypeRouter = T extends TypeRouter ? R : never; +type UnwrapFunction = F extends FunctionType + ? (req: ResolveType) => Promise> + : F extends FunctionStreamingType + ? (req$: Observable>) => Observable> + : never; diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts index b235a37418..c989188e38 100644 --- a/src/server/__tests__/setup.ts +++ b/src/server/__tests__/setup.ts @@ -4,11 +4,9 @@ import {Services} from '../services/Services'; export const setup = () => { const services = new Services(); - const caller = createCaller(services); + const {caller} = createCaller(services); const call = caller.callSimple.bind(caller); - // const {client} = buildE2eClient>({caller}); - const {client} = buildE2eClient({ - caller, + const {client} = buildE2eClient(caller, { writerDefaultBufferKb: [1, 32], clientBufferSize: [1, 3], clientBufferTime: [1, 10], diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index eb1f380de1..ed967ebd72 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -25,5 +25,5 @@ export const createCaller = (services: Services) => { return RpcError.valueFrom(error); }, }); - return caller; + return {router, caller}; }; From 65829f92eea051ae08d906fe8b99bab3e5d858fc Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 16:56:41 +0100 Subject: [PATCH 21/27] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20use?= =?UTF-8?q?=20client.*=20in=20blocks.*=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/blocks.spec.ts | 49 +++++++++++++++-------------- src/server/__tests__/setup.ts | 5 +-- 2 files changed, 28 insertions(+), 26 deletions(-) diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index 8d88034861..2e3c5a3ee0 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -7,9 +7,9 @@ import {tick, until} from '../../__tests__/util'; describe('blocks.*', () => { describe('blocks.create', () => { test('can create an empty block', async () => { - const {caller} = setup(); - await caller.call('blocks.create', {id: 'my-block', patches: []}, {}); - const {block} = (await caller.call('blocks.get', {id: 'my-block'}, {})).data; + const {call} = setup(); + await call('blocks.create', {id: 'my-block', patches: []}); + const {block} = (await call('blocks.get', {id: 'my-block'})); expect(block).toMatchObject({ id: 'my-block', seq: -1, @@ -283,11 +283,11 @@ describe('blocks.*', () => { describe('blocks.listen', () => { test('can listen for block changes', async () => { - const {call, caller} = setup(); - await call('blocks.create', {id: 'my-block', patches: []}); + const {client} = setup(); + await client.call('blocks.create', {id: 'my-block', patches: []}); await tick(11); const emits: any[] = []; - caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', of({id: 'my-block'})).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -295,28 +295,28 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await call('blocks.edit', {id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}]}); + await client.call('blocks.edit', {id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}]}); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].data.patches.length).toBe(1); - expect(emits[0].data.patches[0].seq).toBe(0); + expect(emits[0].patches.length).toBe(1); + expect(emits[0].patches[0].seq).toBe(0); model.api.root({ text: 'Hello', }); const patch2 = model.api.flush(); await tick(12); expect(emits.length).toBe(1); - await call('blocks.edit', {id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}]}); + await client.call('blocks.edit', {id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}]}); await until(() => emits.length === 2); expect(emits.length).toBe(2); - expect(emits[1].data.patches.length).toBe(1); - expect(emits[1].data.patches[0].seq).toBe(1); + expect(emits[1].patches.length).toBe(1); + expect(emits[1].patches[0].seq).toBe(1); }); test('can subscribe before block is created', async () => { - const {call, caller} = setup(); + const {client} = setup(); const emits: any[] = []; - caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', of({id: 'my-block'})).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -324,7 +324,7 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await call('blocks.create', { + await client.call('blocks.create', { id: 'my-block', patches: [ { @@ -336,21 +336,22 @@ describe('blocks.*', () => { }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].data.patches.length).toBe(1); - expect(emits[0].data.patches[0].seq).toBe(0); - expect(emits[0].data.patches[0].blob).toStrictEqual(patch1.toBinary()); + expect(emits[0].patches.length).toBe(1); + expect(emits[0].patches[0].seq).toBe(0); + expect(emits[0].patches[0].blob).toStrictEqual(patch1.toBinary()); }); test('can receive deletion events', async () => { - const {call, caller} = setup(); + const {client} = setup(); const emits: any[] = []; - caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); - await call('blocks.create', {id: 'my-block', patches: []}); + client.call$('blocks.listen', of({id: 'my-block'})).subscribe((data) => emits.push(data)); + await client.call('blocks.create', {id: 'my-block', patches: []}); await until(() => emits.length === 1); - expect(emits[0].data.block.seq).toBe(-1); - await call('blocks.remove', {id: 'my-block'}); + expect(emits[0].block.seq).toBe(-1); + await tick(3); + await client.call('blocks.remove', {id: 'my-block'}); await until(() => emits.length === 2); - expect(emits[1].data.deleted).toBe(true); + expect(emits[1].deleted).toBe(true); }); }); diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts index c989188e38..5c7b519d39 100644 --- a/src/server/__tests__/setup.ts +++ b/src/server/__tests__/setup.ts @@ -5,7 +5,6 @@ import {Services} from '../services/Services'; export const setup = () => { const services = new Services(); const {caller} = createCaller(services); - const call = caller.callSimple.bind(caller); const {client} = buildE2eClient(caller, { writerDefaultBufferKb: [1, 32], clientBufferSize: [1, 3], @@ -15,5 +14,7 @@ export const setup = () => { requestLatency: [1, 10], responseLatency: [1, 10], }); - return {services, caller, call, client}; + const call = client.call.bind(client); + const call$ = client.call$.bind(client); + return {services, caller, client, call, call$}; }; From 5e119851716b521cdba9f1b9cdea6f5c216122d5 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 17:21:12 +0100 Subject: [PATCH 22/27] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20move?= =?UTF-8?q?=20all=20demo=20tests=20to=20use=20E2E=20client=20implementatio?= =?UTF-8?q?n?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/presence.spec.ts | 49 +++++++++---------- src/server/__tests__/pubsub.spec.ts | 69 ++++++++++++--------------- src/server/__tests__/util.spec.ts | 20 ++++---- 3 files changed, 63 insertions(+), 75 deletions(-) diff --git a/src/server/__tests__/presence.spec.ts b/src/server/__tests__/presence.spec.ts index 04b9cbeb11..e29011ceba 100644 --- a/src/server/__tests__/presence.spec.ts +++ b/src/server/__tests__/presence.spec.ts @@ -1,15 +1,14 @@ -import {of} from 'rxjs'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; describe('presence', () => { test('can subscribe and receive published presence entries', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call( + await call( 'presence.update', { room: 'my-room', @@ -18,7 +17,6 @@ describe('presence', () => { hello: 'world', }, }, - {}, ); await until(() => emits.length === 1); expect(emits[0]).toMatchObject({ @@ -37,12 +35,12 @@ describe('presence', () => { }); test('can receive an existing record when subscribing after it was created', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call( + await call( 'presence.update', { room: 'my-room', @@ -51,12 +49,11 @@ describe('presence', () => { hello: 'world', }, }, - {}, ); await until(() => emits.length === 1); const emits2: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits2.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits2.push(res); }); await until(() => emits2.length === 1); expect(emits2[0]).toMatchObject({ @@ -75,12 +72,12 @@ describe('presence', () => { }); test('can remove existing entries', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call( + await call( 'presence.update', { room: 'my-room', @@ -89,22 +86,21 @@ describe('presence', () => { hello: 'world', }, }, - {}, ); await until(() => emits.length === 1); - await caller.call('presence.remove', {room: 'my-room', id: 'user-1'}, {}); + await call('presence.remove', {room: 'my-room', id: 'user-1'}); await until(() => emits.length === 2); const emits2: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits2.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits2.push(res); }); await tick(50); expect(emits2.length).toBe(0); }); test('emits entry deletion messages', async () => { - const {caller} = setup(); - await caller.call( + const {call, call$} = setup(); + await call( 'presence.update', { room: 'my-room', @@ -113,13 +109,12 @@ describe('presence', () => { hello: 'world', }, }, - {}, ); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call('presence.remove', {room: 'my-room', id: 'user-1'}, {}); + await call('presence.remove', {room: 'my-room', id: 'user-1'}); await until(() => emits.length === 2); expect(emits[1].entries[0]).toMatchObject({ id: 'user-1', diff --git a/src/server/__tests__/pubsub.spec.ts b/src/server/__tests__/pubsub.spec.ts index 888471685a..a81e224ace 100644 --- a/src/server/__tests__/pubsub.spec.ts +++ b/src/server/__tests__/pubsub.spec.ts @@ -1,4 +1,3 @@ -import {of} from 'rxjs'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; @@ -17,106 +16,99 @@ describe('pubsub', () => { }); test('can subscribe and receive published messages', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('pubsub.listen', of({channel: 'my-channel'}), {}).subscribe((res) => { - emits.push(res.data.message); + call$('pubsub.listen', {channel: 'my-channel'}).subscribe((res) => { + emits.push(res.message); }); - await caller.call( + await call( 'pubsub.publish', { channel: 'my-channel', message: 'hello world', }, - {}, ); await until(() => emits.length === 1); expect(emits).toStrictEqual(['hello world']); }); test('does not receive messages after un-subscription', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - const sub = caller.call$('pubsub.listen', of({channel: 'my-channel'}), {}).subscribe((res) => { - emits.push(res.data.message); + const sub = call$('pubsub.listen', {channel: 'my-channel'}).subscribe((res) => { + emits.push(res.message); }); - await caller.call( + await call( 'pubsub.publish', { channel: 'my-channel', message: 'msg1', }, - {}, ); - await caller.call( + await call( 'pubsub.publish', { channel: 'my-channel', message: 'msg2', }, - {}, ); await until(() => emits.length === 2); sub.unsubscribe(); - await caller.call( + await tick(25); + await call( 'pubsub.publish', { channel: 'my-channel', message: 'msg3', }, - {}, ); await tick(50); - expect(emits).toStrictEqual(['msg1', 'msg2']); + expect(emits.indexOf('msg1') > -1).toBe(true); + expect(emits.indexOf('msg2') > -1).toBe(true); }); test('multiple multiple subscribers can subscribe to multiple channels', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const user1: any[] = []; const user2: any[] = []; const user3: any[] = []; - caller.call$('pubsub.listen', of({channel: 'channel-1'}), {}).subscribe((res) => { - user1.push(res.data.message); + call$('pubsub.listen', {channel: 'channel-1'}).subscribe((res) => { + user1.push(res.message); }); - const sub2 = caller.call$('pubsub.listen', of({channel: 'channel-2'}), {}).subscribe((res) => { - user2.push(res.data.message); + const sub2 = call$('pubsub.listen', {channel: 'channel-2'}).subscribe((res) => { + user2.push(res.message); }); - caller.call$('pubsub.listen', of({channel: 'channel-1'}), {}).subscribe((res) => { - user3.push(res.data.message); + call$('pubsub.listen', {channel: 'channel-1'}).subscribe((res) => { + user3.push(res.message); }); - caller.call$('pubsub.listen', of({channel: 'channel-2'}), {}).subscribe((res) => { - user3.push(res.data.message); + call$('pubsub.listen', {channel: 'channel-2'}).subscribe((res) => { + user3.push(res.message); }); - await caller.call( + await call( 'pubsub.publish', { channel: 'my-channel', message: 'hello world', }, - {}, ); await tick(50); expect(user1).toStrictEqual([]); expect(user2).toStrictEqual([]); expect(user3).toStrictEqual([]); - caller - .call( + call( 'pubsub.publish', { channel: 'channel-1', message: 'msg1', }, - {}, ) .catch(() => {}); - caller - .call( + call( 'pubsub.publish', { channel: 'channel-2', message: 'msg2', }, - {}, ) .catch(() => {}); await until(() => user1.length === 1); @@ -124,21 +116,22 @@ describe('pubsub', () => { await until(() => user3.length === 2); expect(user1).toStrictEqual(['msg1']); expect(user2).toStrictEqual(['msg2']); - expect(user3).toStrictEqual(['msg1', 'msg2']); + expect(user3.indexOf('msg1') > -1).toBe(true); + expect(user3.indexOf('msg2') > -1).toBe(true); sub2.unsubscribe(); - caller - .call( + call( 'pubsub.publish', { channel: 'channel-2', message: 'msg3', }, - {}, ) .catch(() => {}); await until(() => user3.length === 3); expect(user1).toStrictEqual(['msg1']); expect(user2).toStrictEqual(['msg2']); - expect(user3).toStrictEqual(['msg1', 'msg2', 'msg3']); + expect(user3.indexOf('msg1') > -1).toBe(true); + expect(user3.indexOf('msg2') > -1).toBe(true); + expect(user3.indexOf('msg3') > -1).toBe(true); }); }); diff --git a/src/server/__tests__/util.spec.ts b/src/server/__tests__/util.spec.ts index 69c57dc977..1c6b59e123 100644 --- a/src/server/__tests__/util.spec.ts +++ b/src/server/__tests__/util.spec.ts @@ -3,30 +3,30 @@ import {setup} from './setup'; describe('util.*', () => { describe('util.ping', () => { test('returns pong', async () => { - const {caller} = setup(); - const res = await caller.call('util.ping', {}, {}); - expect(res.data).toBe('pong'); + const {call} = setup(); + const res = await call('util.ping', {}); + expect(res).toBe('pong'); }); }); describe('util.echo', () => { test('returns strings', async () => { - const {caller} = setup(); - const res = await caller.call('util.echo', 'hello world', {}); - expect(res.data).toBe('hello world'); + const {call} = setup(); + const res = await call('util.echo', 'hello world'); + expect(res).toBe('hello world'); }); test('returns objects', async () => { - const {caller} = setup(); - const res = await caller.call('util.echo', {foo: 'bar'}, {}); - expect(res.data).toStrictEqual({foo: 'bar'}); + const {call} = setup(); + const res = await call('util.echo', {foo: 'bar'}); + expect(res).toStrictEqual({foo: 'bar'}); }); }); describe('util.info', () => { test('returns stats object', async () => { const {call} = setup(); - const res = await call('util.info', {}, {}); + const res = await call('util.info', {}); expect(res).toMatchObject({ now: expect.any(Number), stats: { From 6b28610153758da1d0b42bf11342f72381900689 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 17:21:45 +0100 Subject: [PATCH 23/27] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/testing/buildE2eClient.ts | 4 +- src/server/__tests__/blocks.spec.ts | 12 ++- src/server/__tests__/presence.spec.ts | 60 +++++------- src/server/__tests__/pubsub.spec.ts | 91 +++++++------------ 4 files changed, 67 insertions(+), 100 deletions(-) diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index b885726c50..1b9873dc0c 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -130,5 +130,5 @@ type UnTypeRouter = T extends TypeRouter ? R : never; type UnwrapFunction = F extends FunctionType ? (req: ResolveType) => Promise> : F extends FunctionStreamingType - ? (req$: Observable>) => Observable> - : never; + ? (req$: Observable>) => Observable> + : never; diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index 2e3c5a3ee0..f03ca995aa 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -9,7 +9,7 @@ describe('blocks.*', () => { test('can create an empty block', async () => { const {call} = setup(); await call('blocks.create', {id: 'my-block', patches: []}); - const {block} = (await call('blocks.get', {id: 'my-block'})); + const {block} = await call('blocks.get', {id: 'my-block'}); expect(block).toMatchObject({ id: 'my-block', seq: -1, @@ -295,7 +295,10 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await client.call('blocks.edit', {id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}]}); + await client.call('blocks.edit', { + id: 'my-block', + patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}], + }); await until(() => emits.length === 1); expect(emits.length).toBe(1); expect(emits[0].patches.length).toBe(1); @@ -306,7 +309,10 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); await tick(12); expect(emits.length).toBe(1); - await client.call('blocks.edit', {id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}]}); + await client.call('blocks.edit', { + id: 'my-block', + patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}], + }); await until(() => emits.length === 2); expect(emits.length).toBe(2); expect(emits[1].patches.length).toBe(1); diff --git a/src/server/__tests__/presence.spec.ts b/src/server/__tests__/presence.spec.ts index e29011ceba..6bd66a3b0f 100644 --- a/src/server/__tests__/presence.spec.ts +++ b/src/server/__tests__/presence.spec.ts @@ -8,16 +8,13 @@ describe('presence', () => { call$('presence.listen', {room: 'my-room'}).subscribe((res) => { emits.push(res); }); - await call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - ); + }); await until(() => emits.length === 1); expect(emits[0]).toMatchObject({ time: expect.any(Number), @@ -40,16 +37,13 @@ describe('presence', () => { call$('presence.listen', {room: 'my-room'}).subscribe((res) => { emits.push(res); }); - await call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - ); + }); await until(() => emits.length === 1); const emits2: any[] = []; call$('presence.listen', {room: 'my-room'}).subscribe((res) => { @@ -77,16 +71,13 @@ describe('presence', () => { call$('presence.listen', {room: 'my-room'}).subscribe((res) => { emits.push(res); }); - await call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - ); + }); await until(() => emits.length === 1); await call('presence.remove', {room: 'my-room', id: 'user-1'}); await until(() => emits.length === 2); @@ -100,16 +91,13 @@ describe('presence', () => { test('emits entry deletion messages', async () => { const {call, call$} = setup(); - await call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - ); + }); const emits: any[] = []; call$('presence.listen', {room: 'my-room'}).subscribe((res) => { emits.push(res); diff --git a/src/server/__tests__/pubsub.spec.ts b/src/server/__tests__/pubsub.spec.ts index a81e224ace..f2c8725440 100644 --- a/src/server/__tests__/pubsub.spec.ts +++ b/src/server/__tests__/pubsub.spec.ts @@ -21,13 +21,10 @@ describe('pubsub', () => { call$('pubsub.listen', {channel: 'my-channel'}).subscribe((res) => { emits.push(res.message); }); - await call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'hello world', - }, - ); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'hello world', + }); await until(() => emits.length === 1); expect(emits).toStrictEqual(['hello world']); }); @@ -38,30 +35,21 @@ describe('pubsub', () => { const sub = call$('pubsub.listen', {channel: 'my-channel'}).subscribe((res) => { emits.push(res.message); }); - await call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'msg1', - }, - ); - await call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'msg2', - }, - ); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'msg1', + }); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'msg2', + }); await until(() => emits.length === 2); sub.unsubscribe(); await tick(25); - await call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'msg3', - }, - ); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'msg3', + }); await tick(50); expect(emits.indexOf('msg1') > -1).toBe(true); expect(emits.indexOf('msg2') > -1).toBe(true); @@ -84,33 +72,22 @@ describe('pubsub', () => { call$('pubsub.listen', {channel: 'channel-2'}).subscribe((res) => { user3.push(res.message); }); - await call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'hello world', - }, - ); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'hello world', + }); await tick(50); expect(user1).toStrictEqual([]); expect(user2).toStrictEqual([]); expect(user3).toStrictEqual([]); - call( - 'pubsub.publish', - { - channel: 'channel-1', - message: 'msg1', - }, - ) - .catch(() => {}); - call( - 'pubsub.publish', - { - channel: 'channel-2', - message: 'msg2', - }, - ) - .catch(() => {}); + call('pubsub.publish', { + channel: 'channel-1', + message: 'msg1', + }).catch(() => {}); + call('pubsub.publish', { + channel: 'channel-2', + message: 'msg2', + }).catch(() => {}); await until(() => user1.length === 1); await until(() => user2.length === 1); await until(() => user3.length === 2); @@ -119,14 +96,10 @@ describe('pubsub', () => { expect(user3.indexOf('msg1') > -1).toBe(true); expect(user3.indexOf('msg2') > -1).toBe(true); sub2.unsubscribe(); - call( - 'pubsub.publish', - { - channel: 'channel-2', - message: 'msg3', - }, - ) - .catch(() => {}); + call('pubsub.publish', { + channel: 'channel-2', + message: 'msg3', + }).catch(() => {}); await until(() => user3.length === 3); expect(user1).toStrictEqual(['msg1']); expect(user2).toStrictEqual(['msg2']); From 34368c423806a40070cb4bc9a1b62a0ac778b22c Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 17:23:11 +0100 Subject: [PATCH 24/27] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20corre?= =?UTF-8?q?ct=20initialization=20after=20refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/index.ts b/src/server/index.ts index 1a27082124..5d9bff49fe 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -8,6 +8,6 @@ import type {MyCtx} from './services/types'; const app = new RpcApp({ uws: App({}), - caller: createCaller(new Services()), + caller: createCaller(new Services()).caller, }); app.startWithDefaults(); From 7b14376e1b14e93d2023b2cc3ecc4b825c19539d Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 17:30:33 +0100 Subject: [PATCH 25/27] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20remo?= =?UTF-8?q?ve=20old=20blocks=20after=20some=20period=20of=20time?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/services/blocks/BlocksServices.ts | 18 ++++++++++++++++++ src/server/services/blocks/MemoryStore.ts | 9 +++++++++ 2 files changed, 27 insertions(+) diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 835fd68fde..f73085b4cf 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -3,12 +3,15 @@ import {StorePatch} from './types'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; import type {Services} from '../Services'; +const BLOCK_TTL = 1000 * 60 * 20; // 20 minutes + export class BlocksServices { protected readonly store = new MemoryStore(); constructor(protected readonly services: Services) {} public async create(id: string, patches: StorePatch[]) { + this.maybeGc(); const {store} = this; const {block} = await store.create(id, patches); const data = { @@ -45,6 +48,7 @@ export class BlocksServices { } public async edit(id: string, patches: any[]) { + this.maybeGc(); if (!Array.isArray(patches)) throw RpcError.validation('patches must be an array'); if (!patches.length) throw RpcError.validation('patches must not be empty'); const seq = patches[0].seq; @@ -69,4 +73,18 @@ export class BlocksServices { public stats() { return this.store.stats(); } + + private maybeGc(): void { + if (Math.random() < 0.05) + this.gc().catch((error) => { + // tslint:disable-next-line:no-console + console.error('Error running gc', error); + }); + } + + private async gc(): Promise { + const ts = Date.now() - BLOCK_TTL; + const {store} = this; + await store.removeOlderThan(ts); + } } diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index 1e21496f09..85c060a7d4 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -72,6 +72,10 @@ export class MemoryStore implements types.Store { public async remove(id: string): Promise { await new Promise((resolve) => setImmediate(resolve)); + this.removeSync(id); + } + + private removeSync(id: string): void { this.blocks.delete(id); this.patches.delete(id); } @@ -82,4 +86,9 @@ export class MemoryStore implements types.Store { patches: [...this.patches.values()].reduce((acc, v) => acc + v.length, 0), }; } + + public async removeOlderThan(ts: number): Promise { + await new Promise((resolve) => setImmediate(resolve)); + for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); + } } From b3ee10c9f17f5ff182e5341e7e8dc188167298cf Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 17:32:33 +0100 Subject: [PATCH 26/27] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20remo?= =?UTF-8?q?ve=20of()=20helper=20usage?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/blocks.spec.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index f03ca995aa..e038a9e1f2 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -1,4 +1,3 @@ -import {of} from 'rxjs'; import {Model} from '../../json-crdt'; import {RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; import {setup} from './setup'; @@ -287,7 +286,7 @@ describe('blocks.*', () => { await client.call('blocks.create', {id: 'my-block', patches: []}); await tick(11); const emits: any[] = []; - client.call$('blocks.listen', of({id: 'my-block'})).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -322,7 +321,7 @@ describe('blocks.*', () => { test('can subscribe before block is created', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('blocks.listen', of({id: 'my-block'})).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -350,7 +349,7 @@ describe('blocks.*', () => { test('can receive deletion events', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('blocks.listen', of({id: 'my-block'})).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); await client.call('blocks.create', {id: 'my-block', patches: []}); await until(() => emits.length === 1); expect(emits[0].block.seq).toBe(-1); From 2f258abbaee2cbcf1e6b44851f948dea13f19166 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 18 Nov 2023 17:36:34 +0100 Subject: [PATCH 27/27] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20remo?= =?UTF-8?q?ve=20message=20latency=20and=20reordering?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/testing/buildE2eClient.ts | 20 ++----------------- src/server/__tests__/setup.ts | 2 -- 2 files changed, 2 insertions(+), 20 deletions(-) diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index 1b9873dc0c..68c6259ef0 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -25,20 +25,6 @@ export interface BuildE2eClientOptions { */ writerDefaultBufferKb?: [min: number, max: number]; - /** - * Minimum and maximum request latencies in milliseconds. An actual latency - * number will be picked randomly between these two values. Defaults to - * `[1, 1]`. - */ - requestLatency?: [min: number, max: number]; - - /** - * Minimum and maximum response latencies in milliseconds. An actual latency - * number will be picked randomly between these two values. Defaults to - * `[1, 1]`. - */ - responseLatency?: [min: number, max: number]; - /** * Number of messages to keep in buffer before sending them to the client. * The actual number of messages will be picked randomly between these two @@ -95,11 +81,10 @@ export const buildE2eClient = >(caller: Cal caller, send: (messages: ReactiveRpcMessage[]) => { const encoded = ctx.msgCodec.encode(ctx.resCodec, messages); - const latency = opt.responseLatency ?? [1, 1]; setTimeout(() => { const decoded = ctx.msgCodec.decodeBatch(ctx.resCodec, encoded); client.onMessages(decoded as ReactiveRpcServerMessage[]); - }, Fuzzer.randomInt(latency[0], latency[1])); + }, 1); }, bufferSize: Fuzzer.randomInt2(opt.serverBufferSize ?? [1, 1]), bufferTime: Fuzzer.randomInt2(opt.serverBufferTime ?? [0, 0]), @@ -107,11 +92,10 @@ export const buildE2eClient = >(caller: Cal client = new StreamingRpcClient({ send: (messages: ReactiveRpcClientMessage[]) => { const encoded = ctx.msgCodec.encode(ctx.reqCodec, messages); - const latency = opt.requestLatency ?? [1, 1]; setTimeout(() => { const decoded = ctx.msgCodec.decodeBatch(ctx.reqCodec, encoded); streamProcessor.onMessages(decoded as ReactiveRpcClientMessage[], {}); - }, Fuzzer.randomInt(latency[0], latency[1])); + }, 1); }, bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]), bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]), diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts index 5c7b519d39..075e7eeca7 100644 --- a/src/server/__tests__/setup.ts +++ b/src/server/__tests__/setup.ts @@ -11,8 +11,6 @@ export const setup = () => { clientBufferTime: [1, 10], serverBufferSize: [1, 3], serverBufferTime: [1, 10], - requestLatency: [1, 10], - responseLatency: [1, 10], }); const call = client.call.bind(client); const call$ = client.call$.bind(client);