diff --git a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts index 60b65b4c0b..dbe9cec554 100644 --- a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts +++ b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts @@ -4,7 +4,13 @@ import {RpcPersistentClient, WebSocketChannel} from '../common'; import {RpcCodec} from '../common/codec/RpcCodec'; import {BinaryRpcMessageCodec} from '../common/codec/binary'; -export const createBinaryWsRpcClient = (url: string) => { +/** + * Constructs a JSON Reactive RPC client. + * @param url RPC endpoint. + * @param token Authentication token. + * @returns An RPC client. + */ +export const createBinaryWsRpcClient = (url: string, token: string) => { const writer = new Writer(1024 * 4); const msg = new BinaryRpcMessageCodec(); const req = new CborJsonValueCodec(writer); @@ -14,7 +20,7 @@ export const createBinaryWsRpcClient = (url: string) => { channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket(url, [codec.specifier()]), + newSocket: () => new WebSocket(url, [codec.specifier(), token]), }), }, }); diff --git a/src/reactive-rpc/browser/createJsonWsRpcClient.ts b/src/reactive-rpc/browser/createJsonWsRpcClient.ts index 3e50e4abff..ec8cdb07b7 100644 --- a/src/reactive-rpc/browser/createJsonWsRpcClient.ts +++ b/src/reactive-rpc/browser/createJsonWsRpcClient.ts @@ -4,7 +4,13 @@ import {RpcPersistentClient, WebSocketChannel} from '../common'; import {RpcCodec} from '../common/codec/RpcCodec'; import {CompactRpcMessageCodec} from '../common/codec/compact'; -export const createJsonWsRpcClient = (url: string) => { +/** + * Constructs a JSON Reactive RPC client. + * @param url RPC endpoint. + * @param token Authentication token. + * @returns An RPC client. + */ +export const createJsonWsRpcClient = (url: string, token: string) => { const writer = new Writer(1024 * 4); const msg = new CompactRpcMessageCodec(); const req = new JsonJsonValueCodec(writer); @@ -14,7 +20,7 @@ export const createJsonWsRpcClient = (url: string) => { channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket(url, [codec.specifier()]), + newSocket: () => new WebSocket(url, [codec.specifier(), token]), }), }, }); diff --git a/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts b/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts index e731420499..b4aa178ded 100644 --- a/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts +++ b/src/reactive-rpc/common/codec/compact/__tests__/smoke-tests.spec.ts @@ -84,7 +84,7 @@ for (const jsonCodec of codecList) { }); test('Response Error typed', () => { - const value = RpcError.internalErrorValue(); + const value = RpcError.internalErrorValue(null); const message = new ResponseErrorMessage(123, value); const encoded = codec.encode(jsonCodec, [message]); const decoded1 = jsonCodec.decoder.read(encoded); diff --git a/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts b/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts index 1e553c3ae7..e8f7410e32 100644 --- a/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts +++ b/src/reactive-rpc/common/rpc/RpcMessageBatchProcessor.ts @@ -59,7 +59,7 @@ export class RpcMessageBatchProcessor { } return result; } catch (error) { - const value = RpcError.internalErrorValue(); + const value = RpcError.internalErrorValue(error); return [new msg.ResponseErrorMessage(-1, value)]; } } diff --git a/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts b/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts index a5fa6f537b..ad33a07e5c 100644 --- a/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts +++ b/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts @@ -51,7 +51,7 @@ const setup = ( isStreaming: false, call: async () => { // tslint:disable-next-line:no-string-throw - throw RpcError.internal('this promise can throw'); + throw RpcError.internal(null, 'this promise can throw'); }, }, promiseDelay: { @@ -64,7 +64,7 @@ const setup = ( error: { isStreaming: false, call: async () => { - throw RpcError.internal('this promise can throw'); + throw RpcError.internal(null, 'this promise can throw'); }, }, emitOnceSync: { @@ -635,7 +635,7 @@ describe('pre-call checks', () => { test('fails call when pre-call checks fail', async () => { const onPreCall = jest.fn(async (request) => { - throw RpcError.internal('fail...'); + throw RpcError.internal(null, 'fail...'); }); const {server, send} = setup( {}, diff --git a/src/reactive-rpc/common/rpc/__tests__/sample-api.ts b/src/reactive-rpc/common/rpc/__tests__/sample-api.ts index 9b9ce3e7f7..5ed0e797e8 100644 --- a/src/reactive-rpc/common/rpc/__tests__/sample-api.ts +++ b/src/reactive-rpc/common/rpc/__tests__/sample-api.ts @@ -76,7 +76,7 @@ const double: IStaticRpcMethod = { const error: IStaticRpcMethod = { isStreaming: false, call: async () => { - throw new RpcError('this promise can throw', '', 0, '', undefined); + throw new RpcError('this promise can throw', '', 0, '', undefined, undefined); }, }; @@ -96,7 +96,7 @@ const streamError: IStreamingRpcMethod = { call$: () => from( (async () => { - throw RpcError.internal('Stream always errors'); + throw RpcError.internal(null, 'Stream always errors'); })(), ), }; diff --git a/src/reactive-rpc/common/rpc/caller/RpcCaller.ts b/src/reactive-rpc/common/rpc/caller/RpcCaller.ts index d1987f1ccd..3d825199ce 100644 --- a/src/reactive-rpc/common/rpc/caller/RpcCaller.ts +++ b/src/reactive-rpc/common/rpc/caller/RpcCaller.ts @@ -86,9 +86,9 @@ export class RpcCaller { * @returns Response data. */ public async call(name: string, request: unknown, ctx: Ctx): Promise> { + const method = this.getMethodStrict(name); + this.validate(method, request); try { - const method = this.getMethodStrict(name); - this.validate(method, request); const preCall = method.onPreCall; if (preCall) await preCall(ctx, request); const data = await method.call(request, ctx); diff --git a/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts b/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts index 0788db5ab5..8021c7b025 100644 --- a/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts +++ b/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts @@ -30,13 +30,11 @@ export class TypeRouterCaller, Ctx = unknown> ext public readonly req: {[K in keyof Routes]: MethodReq[K]>} = null as any; public readonly res: {[K in keyof Routes]: MethodRes[K]>} = null as any; - public get>(id: K): MethodDefinition[K]> { + public get>(id: K): MethodDefinition[K]> | undefined { let method = this.methods.get(id as string) as any; if (method) return method; const fn = this.router.routes[id as string]; - // TODO: do this check without relying on constructor and importing the `FunctionType` class. - if (!fn || !(fn instanceof FunctionType || fn instanceof FunctionStreamingType)) - throw RpcError.valueFromCode(RpcErrorCodes.METHOD_NOT_FOUND, `Type [alias = ${id as string}] is not a function.`); + if (!fn || !(fn instanceof FunctionType || fn instanceof FunctionStreamingType)) return undefined; const validator = fn.req.validator('object'); const requestSchema = (fn.req as AbstractType).getSchema(); const isRequestVoid = requestSchema.__t === 'const' && requestSchema.value === undefined; diff --git a/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts b/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts index 485b1ad9e5..35dc828ff7 100644 --- a/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts +++ b/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts @@ -46,7 +46,7 @@ describe('static calls', () => { }, }); const [, error] = await of(caller.call('test', {}, {})); - expect(error).toEqual(RpcError.internalErrorValue()); + expect(error).toEqual(RpcError.internalErrorValue(null)); }); }); @@ -95,10 +95,10 @@ describe('streaming calls', () => { }); const [, error1] = await of(caller.call('test', {}, {})); - expect(error1).toEqual(RpcError.internalErrorValue()); + expect(error1).toEqual(RpcError.internalErrorValue(null)); const [, error2] = await of(Rx.firstValueFrom(caller.call$('test', Rx.of(undefined), {}))); - expect(error2).toEqual(RpcError.internalErrorValue()); + expect(error2).toEqual(RpcError.internalErrorValue(null)); }); }); diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index 30d2de254a..36ef63d139 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -36,16 +36,21 @@ export type RpcErrorValue = Value; export class RpcError extends Error implements IRpcError { public static from(error: unknown) { if (error instanceof RpcError) return error; - return RpcError.internal(); + return RpcError.internal(error); } - public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined): RpcError { + public static fromCode( + errno: RpcErrorCodes, + message: string = '', + meta: unknown = undefined, + originalError: unknown = undefined, + ): RpcError { const code = RpcErrorCodes[errno]; - return new RpcError(message || code, code, errno, undefined, meta || undefined); + return new RpcError(message || code, code, errno, undefined, meta || undefined, originalError); } - public static internal(message: string = 'Internal Server Error'): RpcError { - return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message); + public static internal(originalError: unknown, message: string = 'Internal Server Error'): RpcError { + return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError); } public static badRequest(): RpcError { @@ -60,7 +65,7 @@ export class RpcError extends Error implements IRpcError { return new Value(error, RpcErrorType); } - public static valueFrom(error: unknown, def = RpcError.internalErrorValue()): RpcErrorValue { + public static valueFrom(error: unknown, def = RpcError.internalErrorValue(error)): RpcErrorValue { if (error instanceof Value && error.data instanceof RpcError && error.type === RpcErrorType) return error; if (error instanceof RpcError) return RpcError.value(error); return def; @@ -70,8 +75,8 @@ export class RpcError extends Error implements IRpcError { return RpcError.value(RpcError.fromCode(errno, message)); } - public static internalErrorValue(): RpcErrorValue { - return RpcError.value(RpcError.internal()); + public static internalErrorValue(originalError: unknown): RpcErrorValue { + return RpcError.value(RpcError.internal(originalError)); } public static isRpcError(error: unknown): error is RpcError { @@ -84,6 +89,7 @@ export class RpcError extends Error implements IRpcError { public readonly errno: number, public readonly errorId: string | undefined, public readonly meta: unknown | undefined, + public readonly originalError: unknown | undefined, ) { super(message); if (message === code) this.code = undefined; diff --git a/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts b/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts index f5bc858aeb..3cb8b62dad 100644 --- a/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts +++ b/src/reactive-rpc/common/rpc/caller/error/__tests__/RpcErrorType.spec.ts @@ -6,7 +6,7 @@ import {RpcErrorType} from '../RpcErrorType'; const codecs = new Codecs(new Writer(16)); test('can encode an internal error', () => { - const error = RpcError.internal(); + const error = RpcError.internal(null); const encoded = RpcErrorType.encode(codecs.json, error); // console.log(RpcErrorType.encoder(EncodingFormat.Json).toString()); const json = JSON.parse(Buffer.from(encoded).toString()); diff --git a/src/reactive-rpc/common/rpc/client/types.ts b/src/reactive-rpc/common/rpc/client/types.ts index cafe069dad..1ca8edfa37 100644 --- a/src/reactive-rpc/common/rpc/client/types.ts +++ b/src/reactive-rpc/common/rpc/client/types.ts @@ -24,4 +24,26 @@ export interface RpcClient { * @param data Static payload data. */ notify(method: string, data: undefined | unknown): void; + + // start(): void; + // stop(): void; +} + +type TypedRpcClientFn = (req: Request) => Promise; +type TypedRpcClientFn$ = (req: Observable) => Observable; +type UnPromise = T extends Promise ? U : T; +type UnObservable = T extends Observable ? U : T; +type UnwrapResponse = UnPromise>; + +export interface TypedRpcClient | TypedRpcClientFn$>> + extends RpcClient { + call$( + method: K, + data: Parameters[0] | UnObservable[0]>, + ): Observable>>; + call( + method: K, + data: Parameters[0], + ): Promise>>; + notify(method: K, data: UnObservable[0]>): void; } diff --git a/src/reactive-rpc/common/rpc/types.ts b/src/reactive-rpc/common/rpc/types.ts index d1c7a6862f..96987a643a 100644 --- a/src/reactive-rpc/common/rpc/types.ts +++ b/src/reactive-rpc/common/rpc/types.ts @@ -1,3 +1,4 @@ +export * from './client/types'; export * from './methods/types'; export * from './caller/types'; diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts new file mode 100644 index 0000000000..68c6259ef0 --- /dev/null +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -0,0 +1,118 @@ +import {Codecs} from '../../../json-pack/codecs/Codecs'; +import {Fuzzer} from '../../../util/Fuzzer'; +import {Writer} from '../../../util/buffers/Writer'; +import {ConnectionContext} from '../../server/context'; +import {RpcCodecs} from '../codec/RpcCodecs'; +import {RpcMessageCodecs} from '../codec/RpcMessageCodecs'; +import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from '../messages'; +import {RpcMessageStreamProcessor, StreamingRpcClient, TypedRpcClient} from '../rpc'; +import type {FunctionStreamingType, FunctionType} from '../../../json-type/type/classes'; +import type {Observable} from 'rxjs'; +import type {ResolveType} from '../../../json-type'; +import type {TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {TypeRouterCaller} from '../rpc/caller/TypeRouterCaller'; + +export interface BuildE2eClientOptions { + /** + * Writer to use for encoding messages. Defaults to `new Writer(4 * 1024)`. + */ + writer?: Writer; + + /** + * Minimum and maximum size of the default buffer in kilobytes. An actual + * size will be picked randomly between these two values. Defaults to + * `[4, 4]`. Used when `writer` is not specified. + */ + writerDefaultBufferKb?: [min: number, max: number]; + + /** + * Number of messages to keep in buffer before sending them to the client. + * The actual number of messages will be picked randomly between these two + * values. Defaults to `[1, 1]`. + */ + serverBufferSize?: [min: number, max: number]; + + /** + * Time in milliseconds for how long to buffer messages before sending them + * to the client. The actual time will be picked randomly between these two + * values. Defaults to `[0, 0]`. + */ + serverBufferTime?: [min: number, max: number]; + + /** + * Number of messages to keep in buffer before sending them to the server. + * The actual number of messages will be picked randomly between these two + * values. Defaults to `[1, 1]`. + */ + clientBufferSize?: [min: number, max: number]; + + /** + * Time in milliseconds for how long to buffer messages before sending them + * to the server. The actual time will be picked randomly between these two + * values. Defaults to `[0, 0]`. + */ + clientBufferTime?: [min: number, max: number]; + + /** + * IP address to use for the connection. Defaults to `0.0.0.0`. + */ + ip?: string; + + /** + * Authentication token to use for the connection. Defaults to empty string. + */ + token?: string; +} + +export const buildE2eClient = >(caller: Caller, opt: BuildE2eClientOptions) => { + const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); + const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); + const ctx = new ConnectionContext( + opt.ip ?? '0.0.0.0', + opt.ip ?? '', + null, + {}, + codecs.value.cbor, + codecs.value.cbor, + codecs.messages.binary, + ); + let client: StreamingRpcClient; + const streamProcessor = new RpcMessageStreamProcessor({ + caller, + send: (messages: ReactiveRpcMessage[]) => { + const encoded = ctx.msgCodec.encode(ctx.resCodec, messages); + setTimeout(() => { + const decoded = ctx.msgCodec.decodeBatch(ctx.resCodec, encoded); + client.onMessages(decoded as ReactiveRpcServerMessage[]); + }, 1); + }, + bufferSize: Fuzzer.randomInt2(opt.serverBufferSize ?? [1, 1]), + bufferTime: Fuzzer.randomInt2(opt.serverBufferTime ?? [0, 0]), + }); + client = new StreamingRpcClient({ + send: (messages: ReactiveRpcClientMessage[]) => { + const encoded = ctx.msgCodec.encode(ctx.reqCodec, messages); + setTimeout(() => { + const decoded = ctx.msgCodec.decodeBatch(ctx.reqCodec, encoded); + streamProcessor.onMessages(decoded as ReactiveRpcClientMessage[], {}); + }, 1); + }, + bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]), + bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]), + }); + type Router = UnTypeRouterCaller; + type Routes = UnTypeRouter; + type Methods = {[K in keyof Routes]: UnwrapFunction}; + const typedClient = client as TypedRpcClient; + return { + client: typedClient, + }; +}; + +type UnTypeRouterCaller = T extends TypeRouterCaller ? R : never; +type UnTypeRouter = T extends TypeRouter ? R : never; +type UnwrapFunction = F extends FunctionType + ? (req: ResolveType) => Promise> + : F extends FunctionStreamingType + ? (req$: Observable>) => Observable> + : never; diff --git a/src/reactive-rpc/server/context.ts b/src/reactive-rpc/server/context.ts index 06a2db7e00..d4f7a6618a 100644 --- a/src/reactive-rpc/server/context.ts +++ b/src/reactive-rpc/server/context.ts @@ -7,9 +7,8 @@ import type {RpcApp} from './uws/RpcApp'; import type {HttpRequest, HttpResponse} from './uws/types'; import type {RpcCodecs} from '../common/codec/RpcCodecs'; -const X_AUTH_PARAM = 'X-Authorization='; -const X_AUTH_PARAM_LENGTH = X_AUTH_PARAM.length; -const CODECS_REGEX = /rpc.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/; +const REGEX_AUTH_TOKEN_SPECIFIER = /tkn\.([a-zA-Z0-9\-_]+)(?:[^a-zA-Z0-9\-_]|$)/; +const REGEX_CODECS_SPECIFIER = /rpc\.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/; export class ConnectionContext> { private static findIp(req: HttpRequest, res: HttpResponse): string { @@ -20,14 +19,37 @@ export class ConnectionContext> { ); } - private static findToken(req: HttpRequest, params: string[] | null): string { - let token: string = req.getHeader('authorization') || ''; - if (!token) { - const query = req.getQuery(); - const params = new URLSearchParams(query); - token = params.get('access_token') || ''; - if (!token) token = params.get('token') || ''; - } + private static findTokenInText(text: string): string { + const match = REGEX_AUTH_TOKEN_SPECIFIER.exec(text); + if (!match) return ''; + return match[1] || ''; + } + + /** + * Looks for an authentication token in the following places: + * + * 1. The `Authorization` header. + * 2. The URI query parameters. + * 3. The `Cookie` header. + * 4. The `Sec-Websocket-Protocol` header. + * + * @param req HTTP request + * @returns Authentication token, if any. + */ + private static findToken(req: HttpRequest): string { + let token: string = ''; + let text: string = ''; + text = req.getHeader('authorization'); + if (text) token = ConnectionContext.findTokenInText(text); + if (token) return token; + text = req.getQuery(); + if (text) token = ConnectionContext.findTokenInText(text); + if (token) return token; + text = req.getHeader('cookie'); + if (text) token = ConnectionContext.findTokenInText(text); + if (token) return token; + text = req.getHeader('sec-websocket-protocol'); + if (text) token = ConnectionContext.findTokenInText(text); return token; } @@ -38,7 +60,7 @@ export class ConnectionContext> { app: RpcApp, ): ConnectionContext { const ip = ConnectionContext.findIp(req, res); - const token: string = ConnectionContext.findToken(req, params); + const token: string = ConnectionContext.findToken(req); const codecs = app.codecs; const valueCodecs = codecs.value; const ctx = new ConnectionContext( @@ -64,21 +86,7 @@ export class ConnectionContext> { app: RpcApp, ): ConnectionContext { const ip = ConnectionContext.findIp(req, res); - let token: string = ConnectionContext.findToken(req, params); - if (!token && secWebSocketProtocol) { - const protocols = secWebSocketProtocol.split(','); - const length = protocols.length; - for (let i = 0; i < length; i++) { - let protocol = protocols[i].trim(); - if (protocol.startsWith(X_AUTH_PARAM)) { - protocol = protocol.slice(X_AUTH_PARAM_LENGTH); - if (protocol) { - token = Buffer.from(protocol, 'base64').toString(); - break; - } - } - } - } + const token: string = ConnectionContext.findToken(req); const codecs = app.codecs; const valueCodecs = codecs.value; const ctx = new ConnectionContext( @@ -114,7 +122,7 @@ export class ConnectionContext> { * - `rpc.json2.verbose.json` for JSON-RPC 2.0 with verbose messages encoded as JSON. */ public setCodecs(specifier: string, codecs: RpcCodecs): void { - const match = CODECS_REGEX.exec(specifier); + const match = REGEX_CODECS_SPECIFIER.exec(specifier); if (!match) return; const [, protocol, messageFormat, request, response] = match; switch (protocol) { @@ -174,8 +182,11 @@ export class ConnectionContext> { let running = 0; res.onData((ab, isLast) => { running += ab.byteLength; - if (running > max) res.end('too large'); - // Last `ab` does not need to be copied. + if (running > max) { + res.aborted = true; + res.end('too large'); + } + // Last `ab` does not need to be copied, as per docs. if (isLast) list.push(new Uint8Array(ab)), resolve(list); else list.push(copy(new Uint8Array(ab))); }); diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index dd47fa0ec6..de1dd40e3d 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -5,22 +5,19 @@ import {RpcError, RpcErrorCodes, RpcErrorType} from '../../common/rpc/caller/err import {ConnectionContext} from '../context'; import {RpcMessageCodecs} from '../../common/codec/RpcMessageCodecs'; import {Value} from '../../common/messages/Value'; -import {EncodingFormat} from '../../../json-pack/constants'; -import {RpcMessageFormat} from '../../common/codec/constants'; import {RpcCodecs} from '../../common/codec/RpcCodecs'; import {Codecs} from '../../../json-pack/codecs/Codecs'; +import {Writer} from '../../../util/buffers/Writer'; +import {copy} from '../../../util/buffers/copy'; import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common'; import type * as types from './types'; import type {RouteHandler} from './types'; import type {RpcCaller} from '../../common/rpc/caller/RpcCaller'; import type {JsonValueCodec} from '../../../json-pack/codecs/types'; -import {Writer} from '../../../util/buffers/Writer'; const HDR_BAD_REQUEST = Buffer.from('400 Bad Request', 'utf8'); const HDR_NOT_FOUND = Buffer.from('404 Not Found', 'utf8'); -const HDR_INTERNAL_SERVER_ERROR = Buffer.from('500 Internal Server Error', 'utf8'); const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found'); -const ERR_INTERNAL = RpcError.internal(); const noop = (x: any) => {}; @@ -121,16 +118,23 @@ export class RpcApp { if (res.aborted) return; const messageCodec = ctx.msgCodec; const incomingMessages = messageCodec.decodeBatch(ctx.reqCodec, bodyUint8); - const outgoingMessages = await this.batchProcessor.onBatch(incomingMessages as IncomingBatchMessage[], ctx); - if (res.aborted) return; - const resCodec = ctx.resCodec; - messageCodec.encodeBatch(resCodec, outgoingMessages); - const buf = resCodec.encoder.writer.flush(); - if (res.aborted) return; - res.end(buf); - } catch (err: any) { - if (typeof err === 'object' && err) if (err.message === 'Invalid JSON') throw RpcError.badRequest(); - throw RpcError.from(err); + try { + const outgoingMessages = await this.batchProcessor.onBatch(incomingMessages as IncomingBatchMessage[], ctx); + if (res.aborted) return; + const resCodec = ctx.resCodec; + messageCodec.encodeBatch(resCodec, outgoingMessages); + const buf = resCodec.encoder.writer.flush(); + if (res.aborted) return; + res.end(buf); + } catch (error) { + const logger = this.options.logger ?? console; + logger.error('HTTP_RPC_PROCESSING', error, {messages: incomingMessages}); + throw RpcError.internal(error); + } + } catch (error) { + if (typeof error === 'object' && error) + if ((error as any).message === 'Invalid JSON') throw RpcError.badRequest(); + throw RpcError.from(error); } }); return this; @@ -139,7 +143,9 @@ export class RpcApp { public enableWsRpc(path: string = '/rpc'): this { const maxBackpressure = 4 * 1024 * 1024; const augmentContext = this.options.augmentContext ?? noop; - const logger = this.options.logger ?? console; + const options = this.options; + const logger = options.logger ?? console; + const caller = options.caller; this.app.ws(path, { idleTimeout: 0, maxPayloadLength: 4 * 1024 * 1024, @@ -153,42 +159,54 @@ export class RpcApp { res.upgrade({ctx}, secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions, context); }, open: (ws_: types.WebSocket) => { - const ws = ws_ as types.RpcWebSocket; - const ctx = ws.ctx; - const resCodec = ctx.resCodec; - const msgCodec = ctx.msgCodec; - const encoder = resCodec.encoder; - ws.rpc = new RpcMessageStreamProcessor({ - caller: this.options.caller, - send: (messages: ReactiveRpcMessage[]) => { - if (ws.getBufferedAmount() > maxBackpressure) return; - const writer = encoder.writer; - writer.reset(); - msgCodec.encodeBatch(resCodec, messages); - const encoded = writer.flush(); - ws.send(encoded, true, false); - }, - bufferSize: 1, - bufferTime: 0, - }); + try { + const ws = ws_ as types.RpcWebSocket; + const ctx = ws.ctx; + const resCodec = ctx.resCodec; + const msgCodec = ctx.msgCodec; + const encoder = resCodec.encoder; + ws.rpc = new RpcMessageStreamProcessor({ + caller, + send: (messages: ReactiveRpcMessage[]) => { + try { + if (ws.getBufferedAmount() > maxBackpressure) return; + const writer = encoder.writer; + writer.reset(); + msgCodec.encodeBatch(resCodec, messages); + const encoded = writer.flush(); + ws.send(encoded, true, false); + } catch (error) { + logger.error('WS_SEND', error, {messages}); + } + }, + bufferSize: 1, + bufferTime: 0, + }); + } catch (error) { + logger.error('RX_WS_OPEN', error); + } }, message: (ws_: types.WebSocket, buf: ArrayBuffer, isBinary: boolean) => { - const ws = ws_ as types.RpcWebSocket; - const ctx = ws.ctx; - const reqCodec = ctx.reqCodec; - const msgCodec = ctx.msgCodec; - const uint8 = new Uint8Array(buf); - const rpc = ws.rpc!; try { - const messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[]; + const ws = ws_ as types.RpcWebSocket; + const ctx = ws.ctx; + const reqCodec = ctx.reqCodec; + const msgCodec = ctx.msgCodec; + const uint8 = copy(new Uint8Array(buf)); + const rpc = ws.rpc!; try { - rpc.onMessages(messages, ctx); + const messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[]; + try { + rpc.onMessages(messages, ctx); + } catch (error) { + logger.error('RX_RPC_PROCESSING', error, messages); + return; + } } catch (error) { - logger.error('RX_RPC_PROCESSING_ERROR', error, messages); - return; + logger.error('RX_RPC_DECODING', error, {codec: reqCodec.id, buf: Buffer.from(uint8).toString()}); } } catch (error) { - logger.error('RX_RPC_DECODING_ERROR', error, {codec: reqCodec.id, buf: Buffer.from(uint8).toString()}); + logger.error('RX_WS_MESSAGE', error); } }, close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => { @@ -207,49 +225,40 @@ export class RpcApp { const augmentContext = options.augmentContext ?? noop; const logger = options.logger ?? console; this.app.any('/*', async (res: types.HttpResponse, req: types.HttpRequest) => { - res.onAborted(() => { - res.aborted = true; - }); - const method = req.getMethod(); - const url = req.getUrl(); try { - const match = matcher(method + url) as undefined | Match; - if (!match) { - res.cork(() => { - res.writeStatus(HDR_NOT_FOUND); - res.end(RpcErrorType.encode(responseCodec, ERR_NOT_FOUND)); - }); - return; - } - const handler = match.data as RouteHandler; - const params = match.params; - const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx; - responseCodec = ctx.resCodec; - augmentContext(ctx); - await handler(ctx); - } catch (err) { - if (err instanceof RpcError) { + res.onAborted(() => { + res.aborted = true; + }); + const method = req.getMethod(); + const url = req.getUrl(); + try { + const match = matcher(method + url) as undefined | Match; + if (!match) { + res.cork(() => { + res.writeStatus(HDR_NOT_FOUND); + res.end(RpcErrorType.encode(responseCodec, ERR_NOT_FOUND)); + }); + return; + } + const handler = match.data as RouteHandler; + const params = match.params; + const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx; + responseCodec = ctx.resCodec; + augmentContext(ctx); + await handler(ctx); + } catch (err) { + if (err instanceof Value) err = err.data; + if (!(err instanceof RpcError)) err = RpcError.from(err); const error = err; + if (error.errno === RpcErrorCodes.INTERNAL_ERROR) { + logger.error('UWS_ROUTER_INTERNAL_ERROR', error, {originalError: error.originalError ?? null}); + } res.cork(() => { res.writeStatus(HDR_BAD_REQUEST); res.end(RpcErrorType.encode(responseCodec, error)); }); - return; - } - if (err instanceof Value && err.data instanceof RpcError) { - const error = err.data; - res.cork(() => { - res.writeStatus(HDR_BAD_REQUEST); - res.end(RpcErrorType.encode(responseCodec, error)); - }); - return; } - logger.error('UWS_ROUTER_INTERNAL_ERROR', err); - res.cork(() => { - res.writeStatus(HDR_INTERNAL_SERVER_ERROR); - res.end(RpcErrorType.encode(responseCodec, ERR_INTERNAL)); - }); - } + } catch {} }); } @@ -267,7 +276,7 @@ export class RpcApp { if (token) { logger.log({msg: 'SERVER_STARTED', url: `http://localhost:${port}`}); } else { - logger.error(`Failed to listen on ${port} port.`); + logger.error('SERVER_START', new Error(`Failed to listen on ${port} port.`)); } }); } diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts index 28157d0733..e038a9e1f2 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/blocks.spec.ts @@ -1,16 +1,14 @@ -import {of} from 'rxjs'; import {Model} from '../../json-crdt'; -import {Value} from '../../reactive-rpc/common/messages/Value'; -import {RpcError, RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; +import {RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; describe('blocks.*', () => { describe('blocks.create', () => { test('can create an empty block', async () => { - const {caller} = setup(); - await caller.call('blocks.create', {id: 'my-block', patches: []}, {}); - const {block} = (await caller.call('blocks.get', {id: 'my-block'}, {})).data; + const {call} = setup(); + await call('blocks.create', {id: 'my-block', patches: []}); + const {block} = await call('blocks.get', {id: 'my-block'}); expect(block).toMatchObject({ id: 'my-block', seq: -1, @@ -284,11 +282,11 @@ describe('blocks.*', () => { describe('blocks.listen', () => { test('can listen for block changes', async () => { - const {call, caller} = setup(); - await call('blocks.create', {id: 'my-block', patches: []}); + const {client} = setup(); + await client.call('blocks.create', {id: 'my-block', patches: []}); await tick(11); const emits: any[] = []; - caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -296,28 +294,34 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await call('blocks.edit', {id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}]}); + await client.call('blocks.edit', { + id: 'my-block', + patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}], + }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].data.patches.length).toBe(1); - expect(emits[0].data.patches[0].seq).toBe(0); + expect(emits[0].patches.length).toBe(1); + expect(emits[0].patches[0].seq).toBe(0); model.api.root({ text: 'Hello', }); const patch2 = model.api.flush(); await tick(12); expect(emits.length).toBe(1); - await call('blocks.edit', {id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}]}); + await client.call('blocks.edit', { + id: 'my-block', + patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}], + }); await until(() => emits.length === 2); expect(emits.length).toBe(2); - expect(emits[1].data.patches.length).toBe(1); - expect(emits[1].data.patches[0].seq).toBe(1); + expect(emits[1].patches.length).toBe(1); + expect(emits[1].patches[0].seq).toBe(1); }); test('can subscribe before block is created', async () => { - const {call, caller} = setup(); + const {client} = setup(); const emits: any[] = []; - caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -325,7 +329,7 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await call('blocks.create', { + await client.call('blocks.create', { id: 'my-block', patches: [ { @@ -337,21 +341,85 @@ describe('blocks.*', () => { }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].data.patches.length).toBe(1); - expect(emits[0].data.patches[0].seq).toBe(0); - expect(emits[0].data.patches[0].blob).toStrictEqual(patch1.toBinary()); + expect(emits[0].patches.length).toBe(1); + expect(emits[0].patches[0].seq).toBe(0); + expect(emits[0].patches[0].blob).toStrictEqual(patch1.toBinary()); }); test('can receive deletion events', async () => { - const {call, caller} = setup(); + const {client} = setup(); const emits: any[] = []; - caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); - await call('blocks.create', {id: 'my-block', patches: []}); + client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + await client.call('blocks.create', {id: 'my-block', patches: []}); await until(() => emits.length === 1); - expect(emits[0].data.block.seq).toBe(-1); - await call('blocks.remove', {id: 'my-block'}); + expect(emits[0].block.seq).toBe(-1); + await tick(3); + await client.call('blocks.remove', {id: 'my-block'}); await until(() => emits.length === 2); - expect(emits[1].data.deleted).toBe(true); + expect(emits[1].deleted).toBe(true); + }); + }); + + describe('blocks.history', () => { + test('can retrieve change history', async () => { + const {client} = setup(); + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await client.call('blocks.create', { + id: 'my-block', + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + ], + }); + await tick(11); + model.api.str(['text']).ins(4, 'o'); + const patch2 = model.api.flush(); + model.api.obj([]).set({ + age: 26, + }); + const patch3 = model.api.flush(); + await client.call('blocks.edit', { + id: 'my-block', + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: Date.now(), + blob: patch3.toBinary(), + }, + ], + }); + const history = await client.call('blocks.history', {id: 'my-block', min: 0, max: 2}); + expect(history).toMatchObject({ + patches: [ + { + seq: 0, + created: expect.any(Number), + blob: patch1.toBinary(), + }, + { + seq: 1, + created: expect.any(Number), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: expect.any(Number), + blob: patch3.toBinary(), + }, + ], + }); }); }); }); diff --git a/src/server/__tests__/presence.spec.ts b/src/server/__tests__/presence.spec.ts index 04b9cbeb11..6bd66a3b0f 100644 --- a/src/server/__tests__/presence.spec.ts +++ b/src/server/__tests__/presence.spec.ts @@ -1,25 +1,20 @@ -import {of} from 'rxjs'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; describe('presence', () => { test('can subscribe and receive published presence entries', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - {}, - ); + }); await until(() => emits.length === 1); expect(emits[0]).toMatchObject({ time: expect.any(Number), @@ -37,26 +32,22 @@ describe('presence', () => { }); test('can receive an existing record when subscribing after it was created', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - {}, - ); + }); await until(() => emits.length === 1); const emits2: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits2.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits2.push(res); }); await until(() => emits2.length === 1); expect(emits2[0]).toMatchObject({ @@ -75,51 +66,43 @@ describe('presence', () => { }); test('can remove existing entries', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - {}, - ); + }); await until(() => emits.length === 1); - await caller.call('presence.remove', {room: 'my-room', id: 'user-1'}, {}); + await call('presence.remove', {room: 'my-room', id: 'user-1'}); await until(() => emits.length === 2); const emits2: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits2.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits2.push(res); }); await tick(50); expect(emits2.length).toBe(0); }); test('emits entry deletion messages', async () => { - const {caller} = setup(); - await caller.call( - 'presence.update', - { - room: 'my-room', - id: 'user-1', - data: { - hello: 'world', - }, + const {call, call$} = setup(); + await call('presence.update', { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', }, - {}, - ); + }); const emits: any[] = []; - caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { - emits.push(res.data); + call$('presence.listen', {room: 'my-room'}).subscribe((res) => { + emits.push(res); }); - await caller.call('presence.remove', {room: 'my-room', id: 'user-1'}, {}); + await call('presence.remove', {room: 'my-room', id: 'user-1'}); await until(() => emits.length === 2); expect(emits[1].entries[0]).toMatchObject({ id: 'user-1', diff --git a/src/server/__tests__/pubsub.spec.ts b/src/server/__tests__/pubsub.spec.ts index 888471685a..f2c8725440 100644 --- a/src/server/__tests__/pubsub.spec.ts +++ b/src/server/__tests__/pubsub.spec.ts @@ -1,4 +1,3 @@ -import {of} from 'rxjs'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; @@ -17,128 +16,95 @@ describe('pubsub', () => { }); test('can subscribe and receive published messages', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - caller.call$('pubsub.listen', of({channel: 'my-channel'}), {}).subscribe((res) => { - emits.push(res.data.message); + call$('pubsub.listen', {channel: 'my-channel'}).subscribe((res) => { + emits.push(res.message); + }); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'hello world', }); - await caller.call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'hello world', - }, - {}, - ); await until(() => emits.length === 1); expect(emits).toStrictEqual(['hello world']); }); test('does not receive messages after un-subscription', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const emits: any[] = []; - const sub = caller.call$('pubsub.listen', of({channel: 'my-channel'}), {}).subscribe((res) => { - emits.push(res.data.message); + const sub = call$('pubsub.listen', {channel: 'my-channel'}).subscribe((res) => { + emits.push(res.message); + }); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'msg1', + }); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'msg2', }); - await caller.call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'msg1', - }, - {}, - ); - await caller.call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'msg2', - }, - {}, - ); await until(() => emits.length === 2); sub.unsubscribe(); - await caller.call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'msg3', - }, - {}, - ); + await tick(25); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'msg3', + }); await tick(50); - expect(emits).toStrictEqual(['msg1', 'msg2']); + expect(emits.indexOf('msg1') > -1).toBe(true); + expect(emits.indexOf('msg2') > -1).toBe(true); }); test('multiple multiple subscribers can subscribe to multiple channels', async () => { - const {caller} = setup(); + const {call, call$} = setup(); const user1: any[] = []; const user2: any[] = []; const user3: any[] = []; - caller.call$('pubsub.listen', of({channel: 'channel-1'}), {}).subscribe((res) => { - user1.push(res.data.message); + call$('pubsub.listen', {channel: 'channel-1'}).subscribe((res) => { + user1.push(res.message); }); - const sub2 = caller.call$('pubsub.listen', of({channel: 'channel-2'}), {}).subscribe((res) => { - user2.push(res.data.message); + const sub2 = call$('pubsub.listen', {channel: 'channel-2'}).subscribe((res) => { + user2.push(res.message); }); - caller.call$('pubsub.listen', of({channel: 'channel-1'}), {}).subscribe((res) => { - user3.push(res.data.message); + call$('pubsub.listen', {channel: 'channel-1'}).subscribe((res) => { + user3.push(res.message); }); - caller.call$('pubsub.listen', of({channel: 'channel-2'}), {}).subscribe((res) => { - user3.push(res.data.message); + call$('pubsub.listen', {channel: 'channel-2'}).subscribe((res) => { + user3.push(res.message); + }); + await call('pubsub.publish', { + channel: 'my-channel', + message: 'hello world', }); - await caller.call( - 'pubsub.publish', - { - channel: 'my-channel', - message: 'hello world', - }, - {}, - ); await tick(50); expect(user1).toStrictEqual([]); expect(user2).toStrictEqual([]); expect(user3).toStrictEqual([]); - caller - .call( - 'pubsub.publish', - { - channel: 'channel-1', - message: 'msg1', - }, - {}, - ) - .catch(() => {}); - caller - .call( - 'pubsub.publish', - { - channel: 'channel-2', - message: 'msg2', - }, - {}, - ) - .catch(() => {}); + call('pubsub.publish', { + channel: 'channel-1', + message: 'msg1', + }).catch(() => {}); + call('pubsub.publish', { + channel: 'channel-2', + message: 'msg2', + }).catch(() => {}); await until(() => user1.length === 1); await until(() => user2.length === 1); await until(() => user3.length === 2); expect(user1).toStrictEqual(['msg1']); expect(user2).toStrictEqual(['msg2']); - expect(user3).toStrictEqual(['msg1', 'msg2']); + expect(user3.indexOf('msg1') > -1).toBe(true); + expect(user3.indexOf('msg2') > -1).toBe(true); sub2.unsubscribe(); - caller - .call( - 'pubsub.publish', - { - channel: 'channel-2', - message: 'msg3', - }, - {}, - ) - .catch(() => {}); + call('pubsub.publish', { + channel: 'channel-2', + message: 'msg3', + }).catch(() => {}); await until(() => user3.length === 3); expect(user1).toStrictEqual(['msg1']); expect(user2).toStrictEqual(['msg2']); - expect(user3).toStrictEqual(['msg1', 'msg2', 'msg3']); + expect(user3.indexOf('msg1') > -1).toBe(true); + expect(user3.indexOf('msg2') > -1).toBe(true); + expect(user3.indexOf('msg3') > -1).toBe(true); }); }); diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts index 41f9bd0ba1..075e7eeca7 100644 --- a/src/server/__tests__/setup.ts +++ b/src/server/__tests__/setup.ts @@ -1,9 +1,18 @@ +import {buildE2eClient} from '../../reactive-rpc/common/testing/buildE2eClient'; import {createCaller} from '../routes'; import {Services} from '../services/Services'; export const setup = () => { const services = new Services(); - const caller = createCaller(services); - const call = caller.callSimple.bind(caller); - return {services, caller, call}; + const {caller} = createCaller(services); + const {client} = buildE2eClient(caller, { + writerDefaultBufferKb: [1, 32], + clientBufferSize: [1, 3], + clientBufferTime: [1, 10], + serverBufferSize: [1, 3], + serverBufferTime: [1, 10], + }); + const call = client.call.bind(client); + const call$ = client.call$.bind(client); + return {services, caller, client, call, call$}; }; diff --git a/src/server/__tests__/util.spec.ts b/src/server/__tests__/util.spec.ts index 69c57dc977..1c6b59e123 100644 --- a/src/server/__tests__/util.spec.ts +++ b/src/server/__tests__/util.spec.ts @@ -3,30 +3,30 @@ import {setup} from './setup'; describe('util.*', () => { describe('util.ping', () => { test('returns pong', async () => { - const {caller} = setup(); - const res = await caller.call('util.ping', {}, {}); - expect(res.data).toBe('pong'); + const {call} = setup(); + const res = await call('util.ping', {}); + expect(res).toBe('pong'); }); }); describe('util.echo', () => { test('returns strings', async () => { - const {caller} = setup(); - const res = await caller.call('util.echo', 'hello world', {}); - expect(res.data).toBe('hello world'); + const {call} = setup(); + const res = await call('util.echo', 'hello world'); + expect(res).toBe('hello world'); }); test('returns objects', async () => { - const {caller} = setup(); - const res = await caller.call('util.echo', {foo: 'bar'}, {}); - expect(res.data).toStrictEqual({foo: 'bar'}); + const {call} = setup(); + const res = await call('util.echo', {foo: 'bar'}); + expect(res).toStrictEqual({foo: 'bar'}); }); }); describe('util.info', () => { test('returns stats object', async () => { const {call} = setup(); - const res = await call('util.info', {}, {}); + const res = await call('util.info', {}); expect(res).toMatchObject({ now: expect.any(Number), stats: { diff --git a/src/server/index.ts b/src/server/index.ts index 1a27082124..5d9bff49fe 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -8,6 +8,6 @@ import type {MyCtx} from './services/types'; const app = new RpcApp({ uws: App({}), - caller: createCaller(new Services()), + caller: createCaller(new Services()).caller, }); app.startWithDefaults(); diff --git a/src/server/routes/blocks/index.ts b/src/server/routes/blocks/index.ts index fb39b20751..ebe2dce3c7 100644 --- a/src/server/routes/blocks/index.ts +++ b/src/server/routes/blocks/index.ts @@ -4,6 +4,7 @@ import {remove} from './methods/remove'; import {edit} from './methods/edit'; import {listen} from './methods/listen'; import {Block, BlockId, BlockPatch, BlockSeq} from './schema'; +import {history} from './methods/history'; import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; import type {RouteDeps} from '../types'; @@ -22,5 +23,6 @@ export const blocks = ( remove(d) ( edit(d) ( listen(d) - ( r ))))))); + ( history(d) + ( r )))))))); }; diff --git a/src/server/routes/blocks/methods/history.ts b/src/server/routes/blocks/methods/history.ts new file mode 100644 index 0000000000..e25f22cda3 --- /dev/null +++ b/src/server/routes/blocks/methods/history.ts @@ -0,0 +1,45 @@ +import type {BlockPatch, BlockId} from '../schema'; +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; + +export const history = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Block ID', + description: 'The ID of the block.', + }), + t.prop('max', t.num.options({format: 'u32'})).options({ + title: 'Max', + description: 'The maximum sequence number to return.', + }), + t.prop('min', t.num.options({format: 'u32'})).options({ + title: 'Min', + description: 'The minimum sequence number to return.', + }), + ); + + const Response = t.Object( + t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Patches', + description: 'The list of patches.', + }), + ); + + const Func = t + .Function(Request, Response) + .options({ + title: 'Block History', + intro: 'Fetch block history.', + description: 'Returns a list of specified change patches for a block.', + }) + .implement(async ({id, min, max}) => { + const {patches} = await services.blocks.history(id, min, max); + return {patches}; + }); + + return router.fn('blocks.history', Func); + }; diff --git a/src/server/routes/blocks/schema.ts b/src/server/routes/blocks/schema.ts index 20f57df5a1..3f9e09f8b4 100644 --- a/src/server/routes/blocks/schema.ts +++ b/src/server/routes/blocks/schema.ts @@ -1,4 +1,5 @@ -import {type ResolveType, t} from '../../../json-type'; +import {type ResolveType} from '../../../json-type'; +import {t} from '../system'; export type TBlockId = ResolveType; export const BlockId = t.str.options({ @@ -11,7 +12,7 @@ export type TBlockSeq = ResolveType; export const BlockSeq = t.num.options({ title: 'Block Sequence Number', gte: 0, - format: 'u32', + format: 'i32', }); export type TBlock = ResolveType; diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index e214200633..ed967ebd72 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -1,14 +1,13 @@ import {routes} from './routes'; -import {TypeSystem} from '../../json-type'; +import {system} from './system'; import {TypeRouter} from '../../json-type/system/TypeRouter'; import {TypeRouterCaller} from '../../reactive-rpc/common/rpc/caller/TypeRouterCaller'; import {RpcError} from '../../reactive-rpc/common/rpc/caller'; +import {Value} from '../../reactive-rpc/common/messages/Value'; import type {Services} from '../services/Services'; import type {RouteDeps} from './types'; -import {Value} from '../../reactive-rpc/common/messages/Value'; export const createRouter = (services: Services) => { - const system = new TypeSystem(); const router = new TypeRouter({system, routes: {}}); const deps: RouteDeps = {services, router}; return routes(deps)(router); @@ -26,5 +25,5 @@ export const createCaller = (services: Services) => { return RpcError.valueFrom(error); }, }); - return caller; + return {router, caller}; }; diff --git a/src/server/routes/presence/schema.ts b/src/server/routes/presence/schema.ts index 729733eb94..e8eca6d8a1 100644 --- a/src/server/routes/presence/schema.ts +++ b/src/server/routes/presence/schema.ts @@ -1,4 +1,5 @@ -import {type ResolveType, t} from '../../../json-type'; +import {type ResolveType} from '../../../json-type'; +import {t} from '../system'; export const PresenceEntry = t.Object( t.prop('id', t.str), diff --git a/src/server/routes/system.ts b/src/server/routes/system.ts new file mode 100644 index 0000000000..28d0e17ae2 --- /dev/null +++ b/src/server/routes/system.ts @@ -0,0 +1,4 @@ +import {TypeSystem} from '../../json-type'; + +export const system = new TypeSystem(); +export const t = system.t; diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 405dba62c8..f73085b4cf 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -3,12 +3,15 @@ import {StorePatch} from './types'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; import type {Services} from '../Services'; +const BLOCK_TTL = 1000 * 60 * 20; // 20 minutes + export class BlocksServices { protected readonly store = new MemoryStore(); constructor(protected readonly services: Services) {} public async create(id: string, patches: StorePatch[]) { + this.maybeGc(); const {store} = this; const {block} = await store.create(id, patches); const data = { @@ -38,7 +41,14 @@ export class BlocksServices { }); } + public async history(id: string, min: number, max: number) { + const {store} = this; + const patches = await store.history(id, min, max); + return {patches}; + } + public async edit(id: string, patches: any[]) { + this.maybeGc(); if (!Array.isArray(patches)) throw RpcError.validation('patches must be an array'); if (!patches.length) throw RpcError.validation('patches must not be empty'); const seq = patches[0].seq; @@ -63,4 +73,18 @@ export class BlocksServices { public stats() { return this.store.stats(); } + + private maybeGc(): void { + if (Math.random() < 0.05) + this.gc().catch((error) => { + // tslint:disable-next-line:no-console + console.error('Error running gc', error); + }); + } + + private async gc(): Promise { + const ts = Date.now() - BLOCK_TTL; + const {store} = this; + await store.removeOlderThan(ts); + } } diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index 1e21496f09..85c060a7d4 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -72,6 +72,10 @@ export class MemoryStore implements types.Store { public async remove(id: string): Promise { await new Promise((resolve) => setImmediate(resolve)); + this.removeSync(id); + } + + private removeSync(id: string): void { this.blocks.delete(id); this.patches.delete(id); } @@ -82,4 +86,9 @@ export class MemoryStore implements types.Store { patches: [...this.patches.values()].reduce((acc, v) => acc + v.length, 0), }; } + + public async removeOlderThan(ts: number): Promise { + await new Promise((resolve) => setImmediate(resolve)); + for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); + } } diff --git a/src/util/Fuzzer.ts b/src/util/Fuzzer.ts index 1891e3bdd1..694149ba73 100644 --- a/src/util/Fuzzer.ts +++ b/src/util/Fuzzer.ts @@ -16,11 +16,14 @@ function xoshiro128ss(a: number, b: number, c: number, d: number) { } export class Fuzzer { - /** @deprecated */ public static randomInt(min: number, max: number): number { return Math.floor(Math.random() * (max - min + 1)) + min; } + public static randomInt2([min, max]: [min: number, max: number]): number { + return Math.floor(Math.random() * (max - min + 1)) + min; + } + /** @deprecated */ public static pick(elements: T[]): T { return elements[Math.floor(Math.random() * elements.length)];