From e5d2f3333116b1b659c2a5d2922919391d6807c1 Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 17:50:02 +0100 Subject: [PATCH 01/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add?= =?UTF-8?q?=20util.schema=20route?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/index.ts | 6 +++--- src/server/routes/types.ts | 2 ++ src/server/routes/util/index.ts | 19 ++++++++++++++++++- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index 7196fba41e..e214200633 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -9,9 +9,9 @@ import {Value} from '../../reactive-rpc/common/messages/Value'; export const createRouter = (services: Services) => { const system = new TypeSystem(); - const r = new TypeRouter({system, routes: {}}); - const deps: RouteDeps = {services}; - return routes(deps)(r); + const router = new TypeRouter({system, routes: {}}); + const deps: RouteDeps = {services, router}; + return routes(deps)(router); }; export const createCaller = (services: Services) => { diff --git a/src/server/routes/types.ts b/src/server/routes/types.ts index 03f0e10d1b..c6cea26e88 100644 --- a/src/server/routes/types.ts +++ b/src/server/routes/types.ts @@ -1,5 +1,7 @@ +import type {TypeRouter} from '../../json-type/system/TypeRouter'; import type {Services} from '../services/Services'; export interface RouteDeps { services: Services; + router: TypeRouter; } diff --git a/src/server/routes/util/index.ts b/src/server/routes/util/index.ts index 47d5238666..890e9d95ba 100644 --- a/src/server/routes/util/index.ts +++ b/src/server/routes/util/index.ts @@ -53,9 +53,26 @@ export const info = return router.fn('util.info', Func); }; +export const schema = + (deps: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + const Request = t.any; + const Response = t.Object( + t.prop('typescript', t.str), + ); + const Func = t.Function(Request, Response).implement(async () => { + return { + typescript: deps.router.toString(), + }; + }); + return router.fn('util.schema', Func); + }; + // prettier-ignore export const util = (deps: RouteDeps) => (r: TypeRouter) => ( ping(deps) ( echo(deps) ( info(deps) - ( r )))); + ( schema(deps) + ( r ))))); From dc8110c7a2b589ffb0d65092ce55919354f66b85 Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 21:49:20 +0100 Subject: [PATCH 02/25] =?UTF-8?q?feat(json-type):=20=F0=9F=8E=B8=20emit=20?= =?UTF-8?q?function=20TypeScript=20ASTs?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../type/__tests__/toTypeScriptAst.spec.ts | 83 +++++++++++++++++++ src/json-type/type/classes.ts | 57 ++++++++++++- src/json-type/typescript/toText.ts | 9 +- src/json-type/typescript/types.ts | 18 +++- 4 files changed, 158 insertions(+), 9 deletions(-) diff --git a/src/json-type/type/__tests__/toTypeScriptAst.spec.ts b/src/json-type/type/__tests__/toTypeScriptAst.spec.ts index 58f44bb0ab..396e631dfd 100644 --- a/src/json-type/type/__tests__/toTypeScriptAst.spec.ts +++ b/src/json-type/type/__tests__/toTypeScriptAst.spec.ts @@ -244,3 +244,86 @@ describe('or', () => { `); }); }); + +describe('fn', () => { + test('can emit reference AST', () => { + const system = new TypeSystem(); + const {t} = system; + const type = system.t.Function(t.str, t.num); + expect(type.toTypeScriptAst()).toMatchInlineSnapshot(` + { + "node": "FunctionType", + "parameters": [ + { + "name": { + "name": "request", + "node": "Identifier", + }, + "node": "Parameter", + "type": { + "node": "StringKeyword", + }, + }, + ], + "type": { + "node": "TypeReference", + "typeArguments": [ + { + "node": "NumberKeyword", + }, + ], + "typeName": { + "name": "Promise", + "node": "Identifier", + }, + }, + } + `); + }); +}); + +describe('fn$', () => { + test('can emit reference AST', () => { + const system = new TypeSystem(); + const {t} = system; + const type = system.t.Function$(t.str, t.num); + expect(type.toTypeScriptAst()).toMatchInlineSnapshot(` + { + "node": "FunctionType", + "parameters": [ + { + "name": { + "name": "request$", + "node": "Identifier", + }, + "node": "Parameter", + "type": { + "node": "TypeReference", + "typeArguments": [ + { + "node": "StringKeyword", + }, + ], + "typeName": { + "name": "Observable", + "node": "Identifier", + }, + }, + }, + ], + "type": { + "node": "TypeReference", + "typeArguments": [ + { + "node": "NumberKeyword", + }, + ], + "typeName": { + "name": "Observable", + "node": "Identifier", + }, + }, + } + `); + }); +}); diff --git a/src/json-type/type/classes.ts b/src/json-type/type/classes.ts index f818c4ebd9..15e4e09ec3 100644 --- a/src/json-type/type/classes.ts +++ b/src/json-type/type/classes.ts @@ -2228,8 +2228,29 @@ export class FunctionType extends AbstractTy return this; } - public toTypeScriptAst(): ts.TsUnionType { - throw new Error('Method not implemented.'); + public toTypeScriptAst(): ts.TsFunctionType { + const node: ts.TsFunctionType = { + node: 'FunctionType', + parameters: [ + { + node: 'Parameter', + name: { + node: 'Identifier', + name: 'request', + }, + type: this.req.toTypeScriptAst(), + }, + ], + type: { + node: 'TypeReference', + typeName: { + node: 'Identifier', + name: 'Promise', + }, + typeArguments: [this.res.toTypeScriptAst()], + }, + }; + return node; } public toString(tab: string = ''): string { @@ -2289,8 +2310,36 @@ export class FunctionStreamingType extends A return this; } - public toTypeScriptAst(): ts.TsUnionType { - throw new Error('Method not implemented.'); + public toTypeScriptAst(): ts.TsFunctionType { + const node: ts.TsFunctionType = { + node: 'FunctionType', + parameters: [ + { + node: 'Parameter', + name: { + node: 'Identifier', + name: 'request$', + }, + type: { + node: 'TypeReference', + typeName: { + node: 'Identifier', + name: 'Observable', + }, + typeArguments: [this.req.toTypeScriptAst()], + }, + }, + ], + type: { + node: 'TypeReference', + typeName: { + node: 'Identifier', + name: 'Observable', + }, + typeArguments: [this.res.toTypeScriptAst()], + }, + }; + return node; } public toString(tab: string = ''): string { diff --git a/src/json-type/typescript/toText.ts b/src/json-type/typescript/toText.ts index 8a46bbbdd8..db1512ebe2 100644 --- a/src/json-type/typescript/toText.ts +++ b/src/json-type/typescript/toText.ts @@ -1,5 +1,5 @@ import {wordWrap} from '../../util/strings/wordWrap'; -import {TsNode} from './types'; +import {TsIdentifier, TsNode} from './types'; import {TAB, isSimpleType, normalizeKey} from './util'; const formatComment = (comment: string | undefined, __: string): string => { @@ -8,7 +8,7 @@ const formatComment = (comment: string | undefined, __: string): string => { return __ + '/**\n' + __ + ' * ' + lines.join('\n' + __ + ' * ') + '\n' + __ + ' */\n'; }; -export const toText = (node: TsNode | TsNode[], __: string = ''): string => { +export const toText = (node: TsNode | TsNode[] | TsIdentifier, __: string = ''): string => { if (Array.isArray(node)) return node.map((s) => toText(s, __)).join('\n'); const ____ = __ + TAB; @@ -96,7 +96,10 @@ export const toText = (node: TsNode | TsNode[], __: string = ''): string => { return node.types.map((t) => toText(t, ____)).join(' | '); } case 'TypeReference': { - return node.typeName; + return typeof node.typeName === 'string' ? node.typeName : toText(node.typeName, __); + } + case 'Identifier': { + return node.name; } } diff --git a/src/json-type/typescript/types.ts b/src/json-type/typescript/types.ts index 8f759ffb95..af3f3ae541 100644 --- a/src/json-type/typescript/types.ts +++ b/src/json-type/typescript/types.ts @@ -134,7 +134,20 @@ export interface TsGenericTypeAnnotation { /** A reference to a type alias, e.g. "foo: Reference". */ export interface TsTypeReference { node: 'TypeReference'; - typeName: string; + typeName: string | TsIdentifier; + typeArguments?: TsType[]; +} + +export interface TsFunctionType { + node: 'FunctionType'; + parameters: TsParameter[]; + type: TsType; +} + +export interface TsParameter { + node: 'Parameter'; + name: TsIdentifier; + type: TsType; } /** All type annotations. */ @@ -155,7 +168,8 @@ export type TsType = | TsUnknownKeyword | TsUnionType | TsTypeReference - | TsGenericTypeAnnotation; + | TsGenericTypeAnnotation + | TsFunctionType; /** Any possible TypeScript AST node. */ export type TsNode = TsDeclaration | TsType | TsPropertySignature | TsIndexSignature; From d9234d0e0c023cc5d77449bd46a9b39bb538cf79 Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 22:16:41 +0100 Subject: [PATCH 03/25] =?UTF-8?q?feat(json-type):=20=F0=9F=8E=B8=20export?= =?UTF-8?q?=20router=20to=20TypeScript=20type=20annotations=20text?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-type/system/TypeRouter.ts | 24 +++++++++++++++++++ .../system/__tests__/toTypeScript.spec.ts | 18 ++++++++++++++ src/json-type/typescript/toText.ts | 21 ++++++++++++---- src/json-type/typescript/types.ts | 18 +++++++------- 4 files changed, 68 insertions(+), 13 deletions(-) diff --git a/src/json-type/system/TypeRouter.ts b/src/json-type/system/TypeRouter.ts index 63a385054f..e0c5d0901b 100644 --- a/src/json-type/system/TypeRouter.ts +++ b/src/json-type/system/TypeRouter.ts @@ -1,5 +1,7 @@ import * as classes from '../type/classes'; import {TypeSystem} from './TypeSystem'; +import {toText} from '../typescript/toText'; +import type * as ts from '../typescript/types'; import type {TypeBuilder} from '../type/TypeBuilder'; export interface TypeRouterOptions { @@ -60,6 +62,28 @@ export class TypeRouter { this.routes[name] = type; return this; } + + public toTypeScriptAst(): ts.TsTypeLiteral { + const node: ts.TsTypeLiteral = { + node: 'TypeLiteral', + members: [], + }; + for (const [name, type] of Object.entries(this.routes)) { + const schema = type.getSchema(); + const property: ts.TsPropertySignature = { + node: 'PropertySignature', + name, + type: type.toTypeScriptAst(), + }; + if (schema.title) property.comment = schema.title; + node.members.push(property); + } + return node; + } + + public toTypeScript(): string { + return toText(this.toTypeScriptAst()); + } } export type RoutesBase = Record | classes.FunctionStreamingType>; diff --git a/src/json-type/system/__tests__/toTypeScript.spec.ts b/src/json-type/system/__tests__/toTypeScript.spec.ts index 7b8cd8dd5f..265d481c3a 100644 --- a/src/json-type/system/__tests__/toTypeScript.spec.ts +++ b/src/json-type/system/__tests__/toTypeScript.spec.ts @@ -1,4 +1,5 @@ import {TypeSystem} from '..'; +import {TypeRouter} from '../TypeRouter'; test('generates TypeScript source for simple string type', () => { const system = new TypeSystem(); @@ -91,3 +92,20 @@ test('type interface inside a tuple', () => { " `); }); + +test('can export whole router', () => { + const system = new TypeSystem(); + const {t} = system; + const router = new TypeRouter({system, routes: {}}).extend(() => ({ + callMe: t.Function(t.str, t.num), + subscribe: t.Function$(t.Object(t.prop('id', t.str)), t.obj), + })); + expect(router.toTypeScript()).toMatchInlineSnapshot(` + "{ + callMe: (request: string) => Promise; + subscribe: (request$: Observable<{ + id: string; + }>) => Observable<{}>; + }" + `); +}); diff --git a/src/json-type/typescript/toText.ts b/src/json-type/typescript/toText.ts index db1512ebe2..2562e4d166 100644 --- a/src/json-type/typescript/toText.ts +++ b/src/json-type/typescript/toText.ts @@ -1,5 +1,5 @@ import {wordWrap} from '../../util/strings/wordWrap'; -import {TsIdentifier, TsNode} from './types'; +import {TsIdentifier, TsNode, TsParameter} from './types'; import {TAB, isSimpleType, normalizeKey} from './util'; const formatComment = (comment: string | undefined, __: string): string => { @@ -8,7 +8,7 @@ const formatComment = (comment: string | undefined, __: string): string => { return __ + '/**\n' + __ + ' * ' + lines.join('\n' + __ + ' * ') + '\n' + __ + ' */\n'; }; -export const toText = (node: TsNode | TsNode[] | TsIdentifier, __: string = ''): string => { +export const toText = (node: TsNode | TsNode[] | TsIdentifier | TsParameter, __: string = ''): string => { if (Array.isArray(node)) return node.map((s) => toText(s, __)).join('\n'); const ____ = __ + TAB; @@ -78,7 +78,7 @@ export const toText = (node: TsNode | TsNode[] | TsIdentifier, __: string = ''): return 'unknown'; } case 'TypeLiteral': { - return `{\n${toText(node.members, ____)}\n${__}}`; + return !node.members.length ? '{}' : `{\n${toText(node.members, ____)}\n${__}}`; } case 'StringLiteral': { return JSON.stringify(node.text); @@ -96,11 +96,24 @@ export const toText = (node: TsNode | TsNode[] | TsIdentifier, __: string = ''): return node.types.map((t) => toText(t, ____)).join(' | '); } case 'TypeReference': { - return typeof node.typeName === 'string' ? node.typeName : toText(node.typeName, __); + return (typeof node.typeName === 'string' ? node.typeName : toText(node.typeName, __)) + + (node.typeArguments && node.typeArguments.length > 0 ? `<${node.typeArguments.map((t) => toText(t, __)).join(', ')}>` : ''); } case 'Identifier': { return node.name; } + case 'FunctionType': { + const {parameters, type} = node; + const params = parameters.map((p) => toText(p, __)).join(', '); + return `(${params}) => ${toText(type, __)}`; + } + case 'ObjectKeyword': { + return 'object'; + } + case 'Parameter': { + const {name, type} = node; + return `${toText(name, __)}: ${toText(type, __)}`; + } } return ''; diff --git a/src/json-type/typescript/types.ts b/src/json-type/typescript/types.ts index af3f3ae541..1e8cd6496b 100644 --- a/src/json-type/typescript/types.ts +++ b/src/json-type/typescript/types.ts @@ -152,20 +152,20 @@ export interface TsParameter { /** All type annotations. */ export type TsType = + | TsAnyKeyword + | TsUnknownKeyword + | TsNullKeyword + | TsBooleanKeyword + | TsTrueKeyword + | TsFalseKeyword + | TsNumberKeyword + | TsStringKeyword + | TsStringLiteral | TsArrayType | TsTupleType - | TsStringKeyword - | TsNumberKeyword - | TsBooleanKeyword - | TsNullKeyword | TsObjectKeyword - | TsAnyKeyword | TsTypeLiteral - | TsStringLiteral | TsNumericLiteral - | TsTrueKeyword - | TsFalseKeyword - | TsUnknownKeyword | TsUnionType | TsTypeReference | TsGenericTypeAnnotation From ddab5b2daae5fa00c1ef77a80b336e50161b042d Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 22:40:15 +0100 Subject: [PATCH 04/25] =?UTF-8?q?feat(json-type):=20=F0=9F=8E=B8=20export?= =?UTF-8?q?=20router=20as=20a=20TypeScript=20modeul?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-type/system/TypeRouter.ts | 24 +++++++++++++++++-- src/json-type/system/TypeSystem.ts | 2 +- .../system/__tests__/toTypeScript.spec.ts | 17 +++++++------ src/json-type/typescript/toText.ts | 16 ++++++------- src/json-type/typescript/types.ts | 3 +++ 5 files changed, 44 insertions(+), 18 deletions(-) diff --git a/src/json-type/system/TypeRouter.ts b/src/json-type/system/TypeRouter.ts index e0c5d0901b..accfbce46c 100644 --- a/src/json-type/system/TypeRouter.ts +++ b/src/json-type/system/TypeRouter.ts @@ -29,7 +29,7 @@ export class TypeRouter { this.system = options.system; this.t = this.system.t; this.routes = options.routes; - this.system.importTypes(this.routes); + // this.system.importTypes(this.routes); } protected merge>(router: Router): TypeRouter> { @@ -81,8 +81,28 @@ export class TypeRouter { return node; } + public toTypeScriptModuleAst(): ts.TsModuleDeclaration { + const node: ts.TsModuleDeclaration = { + node: 'ModuleDeclaration', + name: 'Router', + export: true, + statements: [ + { + node: 'TypeAliasDeclaration', + name: 'Routes', + type: this.toTypeScriptAst(), + export: true, + }, + ], + }; + for (const alias of this.system.aliases.values()) + node.statements.push(alias.toTypeScriptAst()); + return node; + } + public toTypeScript(): string { - return toText(this.toTypeScriptAst()); + this.system.exportTypes + return toText(this.toTypeScriptModuleAst()); } } diff --git a/src/json-type/system/TypeSystem.ts b/src/json-type/system/TypeSystem.ts index ec8d27d5c0..735170fcb1 100644 --- a/src/json-type/system/TypeSystem.ts +++ b/src/json-type/system/TypeSystem.ts @@ -9,7 +9,7 @@ import type {Printable} from '../../util/print/types'; export class TypeSystem implements Printable { public readonly t = new TypeBuilder(this); - protected readonly aliases: Map> = new Map(); + public readonly aliases: Map> = new Map(); /** * @todo Add ability fetch object of given type by its ID, analogous to diff --git a/src/json-type/system/__tests__/toTypeScript.spec.ts b/src/json-type/system/__tests__/toTypeScript.spec.ts index 265d481c3a..654ff453a7 100644 --- a/src/json-type/system/__tests__/toTypeScript.spec.ts +++ b/src/json-type/system/__tests__/toTypeScript.spec.ts @@ -98,14 +98,17 @@ test('can export whole router', () => { const {t} = system; const router = new TypeRouter({system, routes: {}}).extend(() => ({ callMe: t.Function(t.str, t.num), - subscribe: t.Function$(t.Object(t.prop('id', t.str)), t.obj), + 'block.subscribe': t.Function$(t.Object(t.prop('id', t.str)), t.obj), })); expect(router.toTypeScript()).toMatchInlineSnapshot(` - "{ - callMe: (request: string) => Promise; - subscribe: (request$: Observable<{ - id: string; - }>) => Observable<{}>; - }" + "export namespace Router { + export type Routes = { + callMe: (request: string) => Promise; + "block.subscribe": (request$: Observable<{ + id: string; + }>) => Observable<{}>; + }; + } + " `); }); diff --git a/src/json-type/typescript/toText.ts b/src/json-type/typescript/toText.ts index 2562e4d166..73f48caa87 100644 --- a/src/json-type/typescript/toText.ts +++ b/src/json-type/typescript/toText.ts @@ -16,7 +16,7 @@ export const toText = (node: TsNode | TsNode[] | TsIdentifier | TsParameter, __: switch (node.node) { case 'ModuleDeclaration': { let out: string = ''; - out += `${__}namespace ${node.name} {\n`; + out += `${__}${node.export ? 'export ' : ''}namespace ${node.name} {\n`; out += toText(node.statements, ____); out += `${__}}\n`; return out; @@ -25,11 +25,17 @@ export const toText = (node: TsNode | TsNode[] | TsIdentifier | TsParameter, __: const {name, members, comment} = node; let out: string = ''; out += formatComment(comment, __); - out += `${__}interface ${name} {\n`; + out += `${__}${node.export ? 'export ' : ''}interface ${name} {\n`; out += toText(members, ____); out += `\n${__}}\n`; return out; } + case 'TypeAliasDeclaration': { + let out: string = ''; + out += formatComment(node.comment, __); + out += `${__}${node.export ? 'export ' : ''}type ${node.name} = ${toText(node.type, __)};\n`; + return out; + } case 'PropertySignature': { const name = normalizeKey(node.name); let out: string = ''; @@ -39,12 +45,6 @@ export const toText = (node: TsNode | TsNode[] | TsIdentifier | TsParameter, __: case 'IndexSignature': { return `${__}[key: string]: ${toText(node.type, __)};`; } - case 'TypeAliasDeclaration': { - let out: string = ''; - out += formatComment(node.comment, __); - out += `${__}type ${node.name} = ${toText(node.type)};\n`; - return out; - } case 'ArrayType': { const simple = isSimpleType(node.elementType); const inner = toText(node.elementType, __); diff --git a/src/json-type/typescript/types.ts b/src/json-type/typescript/types.ts index 1e8cd6496b..83398a51be 100644 --- a/src/json-type/typescript/types.ts +++ b/src/json-type/typescript/types.ts @@ -4,6 +4,7 @@ export interface TsModuleDeclaration { name: string; statements: TsDeclaration[]; comment?: string; + export?: boolean; } /** An interface declaration, e.g. "interface Bar {". */ @@ -12,6 +13,7 @@ export interface TsInterfaceDeclaration { name: string; members: Array; comment?: string; + export?: boolean; } /** A property of an interface type. */ @@ -35,6 +37,7 @@ export interface TsTypeAliasDeclaration { name: string; type: TsType; comment?: string; + export?: boolean; } /** All possible declarations that can be statements of a module. */ From 7d8a45a5e5b8ab8327af1bd40ee191327149c819 Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 22:43:17 +0100 Subject: [PATCH 05/25] =?UTF-8?q?test(json-type):=20=F0=9F=92=8D=20check?= =?UTF-8?q?=20that=20type=20aliases=20are=20exported?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-type/system/TypeRouter.ts | 2 +- .../system/__tests__/toTypeScript.spec.ts | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+), 1 deletion(-) diff --git a/src/json-type/system/TypeRouter.ts b/src/json-type/system/TypeRouter.ts index accfbce46c..158aece4d8 100644 --- a/src/json-type/system/TypeRouter.ts +++ b/src/json-type/system/TypeRouter.ts @@ -96,7 +96,7 @@ export class TypeRouter { ], }; for (const alias of this.system.aliases.values()) - node.statements.push(alias.toTypeScriptAst()); + node.statements.push({...alias.toTypeScriptAst(), export: true}); return node; } diff --git a/src/json-type/system/__tests__/toTypeScript.spec.ts b/src/json-type/system/__tests__/toTypeScript.spec.ts index 654ff453a7..1b443730ca 100644 --- a/src/json-type/system/__tests__/toTypeScript.spec.ts +++ b/src/json-type/system/__tests__/toTypeScript.spec.ts @@ -112,3 +112,29 @@ test('can export whole router', () => { " `); }); + +test('can export whole router and aliases', () => { + const system = new TypeSystem(); + const {t} = system; + system.alias('Document', t.Object(t.prop('id', t.str), t.prop('title', t.str)).options({title: 'The document'})); + const router = new TypeRouter({system, routes: {}}).extend(() => ({ + callMe: t.Function(t.str, t.num), + 'block.subscribe': t.Function$(t.Object(t.prop('id', t.str)), t.Ref('Document')), + })); + expect(router.toTypeScript()).toMatchInlineSnapshot(` + "export namespace Router { + export type Routes = { + callMe: (request: string) => Promise; + "block.subscribe": (request$: Observable<{ + id: string; + }>) => Observable; + }; + + export interface Document { + id: string; + title: string; + } + } + " + `); +}); From a40d36f3a4349ded122f8d3faf7b095ce6af826b Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 22:49:29 +0100 Subject: [PATCH 06/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20retu?= =?UTF-8?q?rn=20TS=20types=20from=20util.schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/util/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/server/routes/util/index.ts b/src/server/routes/util/index.ts index 890e9d95ba..079f12b4a2 100644 --- a/src/server/routes/util/index.ts +++ b/src/server/routes/util/index.ts @@ -63,7 +63,7 @@ export const schema = ); const Func = t.Function(Request, Response).implement(async () => { return { - typescript: deps.router.toString(), + typescript: deps.router.toTypeScript(), }; }); return router.fn('util.schema', Func); From 9bdd15c2d372384e2b43e5f57ee59fce8686716e Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 16 Nov 2023 22:50:07 +0100 Subject: [PATCH 07/25] =?UTF-8?q?style:=20=F0=9F=92=84=20run=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-type/system/TypeRouter.ts | 5 ++--- src/json-type/typescript/toText.ts | 8 ++++++-- src/server/routes/util/index.ts | 4 +--- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/src/json-type/system/TypeRouter.ts b/src/json-type/system/TypeRouter.ts index 158aece4d8..40d9688be6 100644 --- a/src/json-type/system/TypeRouter.ts +++ b/src/json-type/system/TypeRouter.ts @@ -95,13 +95,12 @@ export class TypeRouter { }, ], }; - for (const alias of this.system.aliases.values()) - node.statements.push({...alias.toTypeScriptAst(), export: true}); + for (const alias of this.system.aliases.values()) node.statements.push({...alias.toTypeScriptAst(), export: true}); return node; } public toTypeScript(): string { - this.system.exportTypes + this.system.exportTypes; return toText(this.toTypeScriptModuleAst()); } } diff --git a/src/json-type/typescript/toText.ts b/src/json-type/typescript/toText.ts index 73f48caa87..4e3710eb57 100644 --- a/src/json-type/typescript/toText.ts +++ b/src/json-type/typescript/toText.ts @@ -96,8 +96,12 @@ export const toText = (node: TsNode | TsNode[] | TsIdentifier | TsParameter, __: return node.types.map((t) => toText(t, ____)).join(' | '); } case 'TypeReference': { - return (typeof node.typeName === 'string' ? node.typeName : toText(node.typeName, __)) + - (node.typeArguments && node.typeArguments.length > 0 ? `<${node.typeArguments.map((t) => toText(t, __)).join(', ')}>` : ''); + return ( + (typeof node.typeName === 'string' ? node.typeName : toText(node.typeName, __)) + + (node.typeArguments && node.typeArguments.length > 0 + ? `<${node.typeArguments.map((t) => toText(t, __)).join(', ')}>` + : '') + ); } case 'Identifier': { return node.name; diff --git a/src/server/routes/util/index.ts b/src/server/routes/util/index.ts index 079f12b4a2..d4f085a514 100644 --- a/src/server/routes/util/index.ts +++ b/src/server/routes/util/index.ts @@ -58,9 +58,7 @@ export const schema = (router: TypeRouter) => { const t = router.t; const Request = t.any; - const Response = t.Object( - t.prop('typescript', t.str), - ); + const Response = t.Object(t.prop('typescript', t.str)); const Func = t.Function(Request, Response).implement(async () => { return { typescript: deps.router.toTypeScript(), From 4f9f8953359ce7e829fbab9c26df5f967ab50fdd Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 01:29:10 +0100 Subject: [PATCH 08/25] =?UTF-8?q?chore(reactive-rpc):=20=F0=9F=A4=96=20rep?= =?UTF-8?q?licate=20.ping=20error=20responseee?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 2 +- src/__tests__/reactive-rpc/README.md | 2 +- .../uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 77 +++++++++++++++++++ src/reactive-rpc/common/channel/channel.ts | 1 + .../common/rpc/RpcPersistentClient.ts | 8 +- src/server/test.ts | 50 ++++++++++++ yarn.lock | 2 +- 7 files changed, 138 insertions(+), 4 deletions(-) create mode 100644 src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts create mode 100644 src/server/test.ts diff --git a/package.json b/package.json index 54aba6e369..932122037c 100644 --- a/package.json +++ b/package.json @@ -158,7 +158,7 @@ "webpack": "^5.84.1", "webpack-cli": "^5.1.1", "webpack-dev-server": "^4.15.0", - "ws": "^8.6.0", + "ws": "^8.14.2", "yjs": "13.6.8", "ywasm": "0.16.10" }, diff --git a/src/__tests__/reactive-rpc/README.md b/src/__tests__/reactive-rpc/README.md index 5e271d1bb8..e9c7f2aa79 100644 --- a/src/__tests__/reactive-rpc/README.md +++ b/src/__tests__/reactive-rpc/README.md @@ -22,5 +22,5 @@ yarn test:reactive-rpc:jest To run a specific test suite use this command: ``` -TEST_E2E=1 npx jest --no-cache src/__tests__/reactive-rpc/ws-binary.spec.ts +TEST_E2E=1 npx jest --no-cache src/__tests__/reactive-rpc/uws/http/rx-rpc/FetchRpcClient.spec.ts ``` diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts new file mode 100644 index 0000000000..62e4ee5a83 --- /dev/null +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -0,0 +1,77 @@ +/** + * @jest-environment node + */ + +import {ApiTestSetup, runApiTests} from '../../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; +import WebSocket from "ws"; +import {RpcCodecs} from '../../../../../reactive-rpc/common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../../reactive-rpc/common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../../util/buffers/Writer'; +import {Codecs} from '../../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../../reactive-rpc/common/codec/types'; +import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; +import {FetchRpcClient} from '../../../../../reactive-rpc/common/rpc/client/FetchRpcClient'; +import {RpcCodec} from '../../../../../reactive-rpc/common/codec/RpcCodec'; +import {RpcPersistentClient, WebSocketChannel} from '../../../../../reactive-rpc/common'; + +if (process.env.TEST_E2E) { + const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); + const {binary, compact, jsonRpc2} = codecs.messages; + const {json, cbor, msgpack} = codecs.value; + const cases: [specifier: string, protocol: RpcMessageCodec, req: JsonValueCodec, res: JsonValueCodec][] = [ + ['rpc.rx.compact.json', compact, json, json], + // ['rpc.rx.compact.cbor', compact, cbor, cbor], + // ['rpc.rx.compact.msgpack', compact, msgpack, msgpack], + // ['rpc.rx.compact.json-cbor', compact, json, cbor], + // ['rpc.rx.compact.json-msgpack', compact, json, msgpack], + // ['rpc.rx.compact.cbor-json', compact, cbor, json], + // ['rpc.rx.compact.cbor-msgpack', compact, cbor, msgpack], + // ['rpc.rx.compact.msgpack-json', compact, msgpack, json], + // ['rpc.rx.compact.msgpack-cbor', compact, msgpack, cbor], + + // ['rpc.rx.binary.cbor', binary, cbor, cbor], + // ['rpc.rx.binary.msgpack', binary, msgpack, msgpack], + // ['rpc.rx.binary.json', binary, json, json], + // ['rpc.rx.binary.json-cbor', binary, json, cbor], + // ['rpc.rx.binary.json-msgpack', binary, json, msgpack], + // ['rpc.rx.binary.cbor-json', binary, cbor, json], + // ['rpc.rx.binary.cbor-msgpack', binary, cbor, msgpack], + // ['rpc.rx.binary.msgpack-json', binary, msgpack, json], + // ['rpc.rx.binary.msgpack-cbor', binary, msgpack, cbor], + + // ['rpc.json2.verbose.json', jsonRpc2, json, json], + // ['rpc.json2.verbose.cbor', jsonRpc2, cbor, cbor], + // ['rpc.json2.verbose.msgpack', jsonRpc2, msgpack, msgpack], + // ['rpc.json2.verbose.json-cbor', jsonRpc2, json, cbor], + // ['rpc.json2.verbose.json-msgpack', jsonRpc2, json, msgpack], + // ['rpc.json2.verbose.cbor-json', jsonRpc2, cbor, json], + // ['rpc.json2.verbose.cbor-msgpack', jsonRpc2, cbor, msgpack], + // ['rpc.json2.verbose.msgpack-json', jsonRpc2, msgpack, json], + // ['rpc.json2.verbose.msgpack-cbor', jsonRpc2, msgpack, cbor], + ]; + + for (const [protocolSpecifier, msgCodec, reqCodec, resCodec] of cases) { + const setup: ApiTestSetup = async () => { + const port = +(process.env.PORT || 9999); + const url = `ws://localhost:${port}/rpc`; + const codec = new RpcCodec(reqCodec, msgCodec); + const client = new RpcPersistentClient({ + codec, + channel: { + newChannel: () => + new WebSocketChannel({ + newSocket: () => new WebSocket(url, { + protocol: protocolSpecifier, + }) as any, + }), + }, + }); + return {client}; + }; + describe(`protocol: application/x.${protocolSpecifier}`, () => { + runApiTests(setup, {staticOnly: true}); + }); + } +} else { + test.skip('set TEST_E2E=1 env var to run this test suite', () => {}); +} diff --git a/src/reactive-rpc/common/channel/channel.ts b/src/reactive-rpc/common/channel/channel.ts index 10690944f1..93947400b2 100644 --- a/src/reactive-rpc/common/channel/channel.ts +++ b/src/reactive-rpc/common/channel/channel.ts @@ -221,6 +221,7 @@ export class PersistentChannel !!channel), switchMap((channel) => channel!.message$), + tap(msg => console.log('->', msg)) ); /** Number of times we have attempted to reconnect. */ diff --git a/src/reactive-rpc/common/rpc/RpcPersistentClient.ts b/src/reactive-rpc/common/rpc/RpcPersistentClient.ts index 85ddd9fcab..50870669ce 100644 --- a/src/reactive-rpc/common/rpc/RpcPersistentClient.ts +++ b/src/reactive-rpc/common/rpc/RpcPersistentClient.ts @@ -20,6 +20,12 @@ export interface RpcPersistentClientParams { * not send ping messages. */ ping?: number; + + /** + * The notification method name that is used for ping keep-alive messages, if + * not specified, defaults to ".ping". + */ + pingMethod?: string; } export class RpcPersistentClient { @@ -68,7 +74,7 @@ export class RpcPersistentClient { timer(ping, ping) .pipe(takeUntil(close$)) .subscribe(() => { - duplex.notify('.ping', undefined); + duplex.notify(params.pingMethod || '.ping', undefined); }); } diff --git a/src/server/test.ts b/src/server/test.ts new file mode 100644 index 0000000000..9bb9e5a001 --- /dev/null +++ b/src/server/test.ts @@ -0,0 +1,50 @@ +// Run: npx ts-node src/server/test.ts + +import WebSocket from "ws"; +import {CborJsonValueCodec} from "../json-pack/codecs/cbor"; +import {RpcPersistentClient, WebSocketChannel} from "../reactive-rpc/common"; +import {RpcCodec} from "../reactive-rpc/common/codec/RpcCodec"; +import {BinaryRpcMessageCodec} from "../reactive-rpc/common/codec/binary"; +import {Writer} from "../util/buffers/Writer"; +import {JsonJsonValueCodec} from "../json-pack/codecs/json"; +import {CompactRpcMessageCodec} from "../reactive-rpc/common/codec/compact"; + +const writer = new Writer(); +const codec = new RpcCodec(new CborJsonValueCodec(writer), new BinaryRpcMessageCodec()); +// const codec = new RpcCodec(new JsonJsonValueCodec(writer), new CompactRpcMessageCodec()); +const client = new RpcPersistentClient({ + codec, + channel: { + newChannel: () => + new WebSocketChannel({ + newSocket: () => new WebSocket('ws://127.0.0.1:9999/rpc', { + // protocol: 'rpc.rx.compact.json', + protocol: 'rpc.rx.binary.cbor', + perMessageDeflate: false, + }) as any, + // newSocket: () => new WebSocket(this.wsHost, 'rpc.rx.compact.json'), + }), + }, +}); + +client.start(); + +console.log('call'); +client.notify('.ping', {}); +client.call('util.ping', {}).then((value) => { + console.log('then', value); +}).catch((error) => { + console.log('catch', error); +}); + +// const ws = new WebSocket('ws://127.0.0.1:9999/rpc', { +// protocol: 'rpc.rx.compact.json', +// }); + +// ws.onmessage = (msg) => console.log('msg', msg.data); + +// ws.onopen = () => { +// console.log('open'); +// ws.send(JSON.stringify([1,1,'util.ping',{}])); +// }; + diff --git a/yarn.lock b/yarn.lock index 64d325de9d..41b83e33d4 100644 --- a/yarn.lock +++ b/yarn.lock @@ -5737,7 +5737,7 @@ write-file-atomic@^4.0.2: imurmurhash "^0.1.4" signal-exit "^3.0.7" -ws@^8.13.0, ws@^8.6.0: +ws@^8.13.0, ws@^8.14.2: version "8.14.2" resolved "https://registry.yarnpkg.com/ws/-/ws-8.14.2.tgz#6c249a806eb2db7a20d26d51e7709eab7b2e6c7f" integrity sha512-wEBG1ftX4jcglPxgFCMJmZ2PLtSbJ2Peg6TmpJFTbe9GZYOQCDPdMYu/Tm0/bGZkw8paZnJY45J4K2PZrLYq8g== From a11c5c2583fda43c666e2cd6f7b904a6054dc719 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 10:15:56 +0100 Subject: [PATCH 09/25] =?UTF-8?q?refactor(reactive-rpc):=20=F0=9F=92=A1=20?= =?UTF-8?q?improve=20error=20method=20name?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/codec/compact/CompactRpcMessageCodec.ts | 2 +- .../codec/json-rpc-2/JsonRpc2RpcMessageCodec.ts | 12 ++++++------ src/reactive-rpc/common/rpc/caller/RpcCaller.ts | 2 +- src/reactive-rpc/common/rpc/caller/error/RpcError.ts | 3 +-- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/src/reactive-rpc/common/codec/compact/CompactRpcMessageCodec.ts b/src/reactive-rpc/common/codec/compact/CompactRpcMessageCodec.ts index 41242c7ecb..1b86e0461a 100644 --- a/src/reactive-rpc/common/codec/compact/CompactRpcMessageCodec.ts +++ b/src/reactive-rpc/common/codec/compact/CompactRpcMessageCodec.ts @@ -95,7 +95,7 @@ export class CompactRpcMessageCodec implements RpcMessageCodec { public decodeBatch(jsonCodec: JsonValueCodec, uint8: Uint8Array): msg.ReactiveRpcMessage[] { const decoder = jsonCodec.decoder; const value = decoder.read(uint8); - if (!(value instanceof Array)) throw RpcError.invalidRequest(); + if (!(value instanceof Array)) throw RpcError.badRequest(); if (typeof value[0] === 'number') return [fromJson(value as unknown[])]; const result: msg.ReactiveRpcMessage[] = []; const length = value.length; diff --git a/src/reactive-rpc/common/codec/json-rpc-2/JsonRpc2RpcMessageCodec.ts b/src/reactive-rpc/common/codec/json-rpc-2/JsonRpc2RpcMessageCodec.ts index e7e3edd668..c04c8f5f75 100644 --- a/src/reactive-rpc/common/codec/json-rpc-2/JsonRpc2RpcMessageCodec.ts +++ b/src/reactive-rpc/common/codec/json-rpc-2/JsonRpc2RpcMessageCodec.ts @@ -114,12 +114,12 @@ export class JsonRpc2RpcMessageCodec implements RpcMessageCodec { return messages; } catch (error) { if (error instanceof RpcError) throw error; - throw RpcError.invalidRequest(); + throw RpcError.badRequest(); } } public fromJson(message: schema.JsonRpc2Message): msg.ReactiveRpcMessage { - if (!message || typeof message !== 'object') throw RpcError.invalidRequest(); + if (!message || typeof message !== 'object') throw RpcError.badRequest(); if ((message as any).id === undefined) { const notification = message as schema.JsonRpc2NotificationMessage; const data = notification.params; @@ -130,12 +130,12 @@ export class JsonRpc2RpcMessageCodec implements RpcMessageCodec { const request = message as schema.JsonRpc2RequestMessage; const data = request.params; const value = data === undefined ? undefined : new Value(request.params, undefined); - if (typeof request.id !== 'number') throw RpcError.invalidRequest(); + if (typeof request.id !== 'number') throw RpcError.badRequest(); return new msg.RequestCompleteMessage(request.id, request.method, value); } if ((message as schema.JsonRpc2ResponseMessage).result !== undefined) { const response = message as schema.JsonRpc2ResponseMessage; - if (typeof response.id !== 'number') throw RpcError.invalidRequest(); + if (typeof response.id !== 'number') throw RpcError.badRequest(); const data = response.result; const value = data === undefined ? undefined : new Value(response.result, undefined); return new msg.ResponseCompleteMessage(response.id, value); @@ -143,9 +143,9 @@ export class JsonRpc2RpcMessageCodec implements RpcMessageCodec { if ((message as schema.JsonRpc2ErrorMessage).error !== undefined) { const response = message as schema.JsonRpc2ErrorMessage; const value = new Value(response.error.data, undefined); - if (typeof response.id !== 'number') throw RpcError.invalidRequest(); + if (typeof response.id !== 'number') throw RpcError.badRequest(); return new msg.ResponseErrorMessage(response.id, value); } - throw RpcError.invalidRequest(); + throw RpcError.badRequest(); } } diff --git a/src/reactive-rpc/common/rpc/caller/RpcCaller.ts b/src/reactive-rpc/common/rpc/caller/RpcCaller.ts index 7ed6cac788..d1987f1ccd 100644 --- a/src/reactive-rpc/common/rpc/caller/RpcCaller.ts +++ b/src/reactive-rpc/common/rpc/caller/RpcCaller.ts @@ -22,7 +22,7 @@ export interface RpcApiCallerOptions { wrapInternalError?: (error: unknown) => unknown; } -const INVALID_REQUEST_ERROR_VALUE = RpcError.value(RpcError.invalidRequest()); +const INVALID_REQUEST_ERROR_VALUE = RpcError.value(RpcError.badRequest()); const defaultWrapInternalError = (error: unknown) => RpcError.valueFrom(error); diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index c1711862f8..30d2de254a 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -48,8 +48,7 @@ export class RpcError extends Error implements IRpcError { return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message); } - /** @todo Rename to "badRequest". */ - public static invalidRequest(): RpcError { + public static badRequest(): RpcError { return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, 'Bad Request'); } From eb1a5f9e9cbe8a9ebe21a84964f3cdbc44c5684a Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 10:16:31 +0100 Subject: [PATCH 10/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20decoding=20error=20handling?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/server/uws/RpcApp.ts | 16 +++++++++++----- src/reactive-rpc/server/uws/types.ts | 2 +- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 8bc255ff39..7c50d73026 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -129,7 +129,7 @@ export class RpcApp { if (res.aborted) return; res.end(buf); } catch (err: any) { - if (typeof err === 'object' && err) if (err.message === 'Invalid JSON') throw RpcError.invalidRequest(); + if (typeof err === 'object' && err) if (err.message === 'Invalid JSON') throw RpcError.badRequest(); throw RpcError.from(err); } }); @@ -139,6 +139,7 @@ export class RpcApp { public enableWsRpc(path: string = '/rpc'): this { const maxBackpressure = 4 * 1024 * 1024; const augmentContext = this.options.augmentContext ?? noop; + const logger = this.options.logger ?? console; this.app.ws(path, { idleTimeout: 0, maxPayloadLength: 4 * 1024 * 1024, @@ -180,10 +181,15 @@ export class RpcApp { const uint8 = new Uint8Array(buf); const rpc = ws.rpc!; try { - const messages = msgCodec.decodeBatch(reqCodec, uint8); - rpc.onMessages(messages as ReactiveRpcClientMessage[], ctx); + const messages = msgCodec.decodeBatch(reqCodec, uint8) as ReactiveRpcClientMessage[]; + try { + rpc.onMessages(messages, ctx); + } catch (error) { + logger.error('RX_RPC_PROCESSING_ERROR', error, Buffer.from(uint8).toString()); + return; + } } catch (error) { - rpc.sendNotification('.err', RpcError.value(RpcError.invalidRequest())); + logger.error('RX_RPC_DECODING_ERROR', error, Buffer.from(uint8).toString()); } }, close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => { @@ -239,7 +245,7 @@ export class RpcApp { }); return; } - logger.error(err); + logger.error('UWS_ROUTER_INTERNAL_ERROR', err); res.cork(() => { res.writeStatus(HDR_INTERNAL_SERVER_ERROR); res.end(RpcErrorType.encode(responseCodec, ERR_INTERNAL)); diff --git a/src/reactive-rpc/server/uws/types.ts b/src/reactive-rpc/server/uws/types.ts index 54d2c38095..dce0c2121b 100644 --- a/src/reactive-rpc/server/uws/types.ts +++ b/src/reactive-rpc/server/uws/types.ts @@ -27,5 +27,5 @@ export interface RpcWebSocket export interface ServerLogger { log(msg: unknown): void; - error(msg: unknown): void; + error(kind: string, error?: Error | unknown | null, meta?: unknown): void; } From 294d3b9c4e4063f54a425878d3b9accba065980f Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 10:48:18 +0100 Subject: [PATCH 11/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20error=20reporting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/rpc/RpcMessageStreamProcessor.ts | 20 +++++-------------- src/reactive-rpc/server/uws/RpcApp.ts | 4 ++-- 2 files changed, 7 insertions(+), 17 deletions(-) diff --git a/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts b/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts index 19fadb29a5..e2e1a0466e 100644 --- a/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts +++ b/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts @@ -67,34 +67,24 @@ export class RpcMessageStreamProcessor { /** * Processes a single incoming Reactive-RPC message. * - * This method will not throw. - * * @param message A single Reactive-RPC message. * @param ctx Server context. */ public onMessage(message: msg.ReactiveRpcClientMessage, ctx: Ctx): void { - try { - /** @todo perf: could switch statement with message.constructor help here? */ - if (message instanceof msg.RequestDataMessage) this.onRequestDataMessage(message, ctx); - else if (message instanceof msg.RequestCompleteMessage) this.onRequestCompleteMessage(message, ctx); - else if (message instanceof msg.RequestErrorMessage) this.onRequestErrorMessage(message, ctx); - else if (message instanceof msg.NotificationMessage) this.onNotificationMessage(message, ctx); - else if (message instanceof msg.ResponseUnsubscribeMessage) this.onUnsubscribeMessage(message); - } catch (error) { - this.sendNotification('.err', RpcError.valueFrom(error)); - } + if (message instanceof msg.RequestDataMessage) this.onRequestDataMessage(message, ctx); + else if (message instanceof msg.RequestCompleteMessage) this.onRequestCompleteMessage(message, ctx); + else if (message instanceof msg.RequestErrorMessage) this.onRequestErrorMessage(message, ctx); + else if (message instanceof msg.NotificationMessage) this.onNotificationMessage(message, ctx); + else if (message instanceof msg.ResponseUnsubscribeMessage) this.onUnsubscribeMessage(message); } /** * Receives a list of all incoming messages from the client to process. * - * This method will not throw. - * * @param messages A list of received messages. * @param ctx Server context. */ public onMessages(messages: msg.ReactiveRpcClientMessage[], ctx: Ctx): void { - // This method should not throw. const length = messages.length; for (let i = 0; i < length; i++) this.onMessage(messages[i], ctx); } diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 7c50d73026..b73a246bc0 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -185,11 +185,11 @@ export class RpcApp { try { rpc.onMessages(messages, ctx); } catch (error) { - logger.error('RX_RPC_PROCESSING_ERROR', error, Buffer.from(uint8).toString()); + logger.error('RX_RPC_PROCESSING_ERROR', error, messages); return; } } catch (error) { - logger.error('RX_RPC_DECODING_ERROR', error, Buffer.from(uint8).toString()); + logger.error('RX_RPC_DECODING_ERROR', error, {codec: reqCodec.id, buf: Buffer.from(uint8).toString()}); } }, close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => { From 409c7bef60e15e7ab2d6b85a7a135b08fb2169d2 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 10:53:49 +0100 Subject: [PATCH 12/25] =?UTF-8?q?chore:=20=F0=9F=A4=96=20run=20Prettier=20?= =?UTF-8?q?and=20remove=20old=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 9 ++-- src/reactive-rpc/common/channel/channel.ts | 2 +- .../RpcMessageStreamProcessor.spec.ts | 39 ------------------ src/server/test.ts | 41 ++++++++++--------- 4 files changed, 28 insertions(+), 63 deletions(-) diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts index 62e4ee5a83..4dfe9d151d 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -3,7 +3,7 @@ */ import {ApiTestSetup, runApiTests} from '../../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; -import WebSocket from "ws"; +import WebSocket from 'ws'; import {RpcCodecs} from '../../../../../reactive-rpc/common/codec/RpcCodecs'; import {RpcMessageCodecs} from '../../../../../reactive-rpc/common/codec/RpcMessageCodecs'; import {Writer} from '../../../../../util/buffers/Writer'; @@ -60,9 +60,10 @@ if (process.env.TEST_E2E) { channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket(url, { - protocol: protocolSpecifier, - }) as any, + newSocket: () => + new WebSocket(url, { + protocol: protocolSpecifier, + }) as any, }), }, }); diff --git a/src/reactive-rpc/common/channel/channel.ts b/src/reactive-rpc/common/channel/channel.ts index 93947400b2..e434c6e0fe 100644 --- a/src/reactive-rpc/common/channel/channel.ts +++ b/src/reactive-rpc/common/channel/channel.ts @@ -221,7 +221,7 @@ export class PersistentChannel !!channel), switchMap((channel) => channel!.message$), - tap(msg => console.log('->', msg)) + tap((msg) => console.log('->', msg)), ); /** Number of times we have attempted to reconnect. */ diff --git a/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts b/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts index 501af7941e..a5fa6f537b 100644 --- a/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts +++ b/src/reactive-rpc/common/rpc/__tests__/RpcMessageStreamProcessor.spec.ts @@ -146,45 +146,6 @@ test('does not execute any methods on initialization', async () => { expect(send).toHaveBeenCalledTimes(0); }); -test('sends error notification on empty notification name', async () => { - const {server, send} = setup(); - expect(send).toHaveBeenCalledTimes(0); - server.onMessages([new NotificationMessage('', val(new Uint8Array([1])))], undefined); - expect(send).toHaveBeenCalledTimes(1); - expect(send.mock.calls[0][0][0]).toBeInstanceOf(NotificationMessage); - expect(send.mock.calls[0][0][0].value.data).toEqual(RpcError.fromCode(RpcErrorCodes.INVALID_METHOD)); -}); - -test('sends error notification when notification name longer than 128 chars', async () => { - const {server, send} = setup(); - expect(send).toHaveBeenCalledTimes(0); - server.onMessages( - [ - new NotificationMessage( - '012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678901234567890123456789012345678', - val(new Uint8Array([1])), - ), - ], - undefined, - ); - expect(send).toHaveBeenCalledTimes(1); - expect(send.mock.calls[0][0][0]).toBeInstanceOf(NotificationMessage); - expect(send.mock.calls[0][0][0].value.data).toEqual(RpcError.fromCode(RpcErrorCodes.INVALID_METHOD)); -}); - -// test('sends error notification when "notify" callback throws', async () => { -// const {server, send} = setup({}); -// const name = 'aga'; -// expect(send).toHaveBeenCalledTimes(0); -// const msg = [new NotificationMessage(name, val(new Uint8Array([1])))]; -// server.onMessages(msg, undefined); -// expect(send).toHaveBeenCalledTimes(1); -// // expect(send.mock.calls[0][0][0]).toBeInstanceOf(NotificationMessage); -// // expect(send.mock.calls[0][0][0].value).toEqual(val({ -// // message: 'test', -// // })); -// }); - test('if RPC method throws, sends back error message', async () => { const {server, send, caller, ctx} = setup(); jest.spyOn(caller, 'call'); diff --git a/src/server/test.ts b/src/server/test.ts index 9bb9e5a001..f81cb17c2f 100644 --- a/src/server/test.ts +++ b/src/server/test.ts @@ -1,13 +1,13 @@ // Run: npx ts-node src/server/test.ts -import WebSocket from "ws"; -import {CborJsonValueCodec} from "../json-pack/codecs/cbor"; -import {RpcPersistentClient, WebSocketChannel} from "../reactive-rpc/common"; -import {RpcCodec} from "../reactive-rpc/common/codec/RpcCodec"; -import {BinaryRpcMessageCodec} from "../reactive-rpc/common/codec/binary"; -import {Writer} from "../util/buffers/Writer"; -import {JsonJsonValueCodec} from "../json-pack/codecs/json"; -import {CompactRpcMessageCodec} from "../reactive-rpc/common/codec/compact"; +import WebSocket from 'ws'; +import {CborJsonValueCodec} from '../json-pack/codecs/cbor'; +import {RpcPersistentClient, WebSocketChannel} from '../reactive-rpc/common'; +import {RpcCodec} from '../reactive-rpc/common/codec/RpcCodec'; +import {BinaryRpcMessageCodec} from '../reactive-rpc/common/codec/binary'; +import {Writer} from '../util/buffers/Writer'; +import {JsonJsonValueCodec} from '../json-pack/codecs/json'; +import {CompactRpcMessageCodec} from '../reactive-rpc/common/codec/compact'; const writer = new Writer(); const codec = new RpcCodec(new CborJsonValueCodec(writer), new BinaryRpcMessageCodec()); @@ -17,11 +17,12 @@ const client = new RpcPersistentClient({ channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket('ws://127.0.0.1:9999/rpc', { - // protocol: 'rpc.rx.compact.json', - protocol: 'rpc.rx.binary.cbor', - perMessageDeflate: false, - }) as any, + newSocket: () => + new WebSocket('ws://127.0.0.1:9999/rpc', { + // protocol: 'rpc.rx.compact.json', + protocol: 'rpc.rx.binary.cbor', + perMessageDeflate: false, + }) as any, // newSocket: () => new WebSocket(this.wsHost, 'rpc.rx.compact.json'), }), }, @@ -31,11 +32,14 @@ client.start(); console.log('call'); client.notify('.ping', {}); -client.call('util.ping', {}).then((value) => { - console.log('then', value); -}).catch((error) => { - console.log('catch', error); -}); +client + .call('util.ping', {}) + .then((value) => { + console.log('then', value); + }) + .catch((error) => { + console.log('catch', error); + }); // const ws = new WebSocket('ws://127.0.0.1:9999/rpc', { // protocol: 'rpc.rx.compact.json', @@ -47,4 +51,3 @@ client.call('util.ping', {}).then((value) => { // console.log('open'); // ws.send(JSON.stringify([1,1,'util.ping',{}])); // }; - From 83166bb6d6bc5aa0f375b75d7bed90e7159901df Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 12:17:14 +0100 Subject: [PATCH 13/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20how=20WebSocket=20context=20is=20constructed?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/channel/channel.ts | 1 - src/reactive-rpc/server/context.ts | 74 +++++++++++++++------- src/reactive-rpc/server/uws/RpcApp.ts | 2 +- 3 files changed, 53 insertions(+), 24 deletions(-) diff --git a/src/reactive-rpc/common/channel/channel.ts b/src/reactive-rpc/common/channel/channel.ts index e434c6e0fe..10690944f1 100644 --- a/src/reactive-rpc/common/channel/channel.ts +++ b/src/reactive-rpc/common/channel/channel.ts @@ -221,7 +221,6 @@ export class PersistentChannel !!channel), switchMap((channel) => channel!.message$), - tap((msg) => console.log('->', msg)), ); /** Number of times we have attempted to reconnect. */ diff --git a/src/reactive-rpc/server/context.ts b/src/reactive-rpc/server/context.ts index 83750be6d7..12600d6832 100644 --- a/src/reactive-rpc/server/context.ts +++ b/src/reactive-rpc/server/context.ts @@ -12,16 +12,13 @@ const X_AUTH_PARAM_LENGTH = X_AUTH_PARAM.length; const CODECS_REGEX = /rpc.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/; export class ConnectionContext> { - public static fromReqRes( - req: HttpRequest, - res: HttpResponse, - params: string[] | null, - app: RpcApp, - ): ConnectionContext { - const ip = - req.getHeader('x-forwarded-for') || + private static findIp(req: HttpRequest, res: HttpResponse): string { + return req.getHeader('x-forwarded-for') || req.getHeader('x-real-ip') || Buffer.from(res.getRemoteAddressAsText()).toString(); + } + + private static findToken(req: HttpRequest, params: string[] | null): string { let token: string = req.getHeader('authorization') || ''; if (!token) { const query = req.getQuery(); @@ -29,20 +26,53 @@ export class ConnectionContext> { token = params.get('access_token') || ''; if (!token) token = params.get('token') || ''; } - let secWebSocketProtocol: string = ''; - if (!token) { - secWebSocketProtocol = String(req.getHeader('sec-websocket-protocol')) || ''; - if (secWebSocketProtocol) { - const protocols = secWebSocketProtocol.split(','); - const length = protocols.length; - for (let i = 0; i < length; i++) { - let protocol = protocols[i].trim(); - if (protocol.startsWith(X_AUTH_PARAM)) { - protocol = protocol.slice(X_AUTH_PARAM_LENGTH); - if (protocol) { - token = Buffer.from(protocol, 'base64').toString(); - break; - } + return token; + } + + public static fromReqRes( + req: HttpRequest, + res: HttpResponse, + params: string[] | null, + app: RpcApp, + ): ConnectionContext { + const ip = ConnectionContext.findIp(req, res); + const token: string = ConnectionContext.findToken(req, params); + const codecs = app.codecs; + const valueCodecs = codecs.value; + const ctx = new ConnectionContext( + ip, + token, + params, + new NullObject(), + valueCodecs.json, + valueCodecs.json, + codecs.messages.compact, + res, + ); + const contentType = req.getHeader('content-type'); + if (contentType) ctx.setCodecs(contentType, codecs); + return ctx; + } + + public static fromWs( + req: HttpRequest, + res: HttpResponse, + secWebSocketProtocol: string, + params: string[] | null, + app: RpcApp, + ): ConnectionContext { + const ip = ConnectionContext.findIp(req, res); + let token: string = ConnectionContext.findToken(req, params); + if (!token && secWebSocketProtocol) { + const protocols = secWebSocketProtocol.split(','); + const length = protocols.length; + for (let i = 0; i < length; i++) { + let protocol = protocols[i].trim(); + if (protocol.startsWith(X_AUTH_PARAM)) { + protocol = protocol.slice(X_AUTH_PARAM_LENGTH); + if (protocol) { + token = Buffer.from(protocol, 'base64').toString(); + break; } } } diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index b73a246bc0..20f3c1e7e1 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -147,7 +147,7 @@ export class RpcApp { const secWebSocketKey = req.getHeader('sec-websocket-key'); const secWebSocketProtocol = req.getHeader('sec-websocket-protocol'); const secWebSocketExtensions = req.getHeader('sec-websocket-extensions'); - const ctx = ConnectionContext.fromReqRes(req, res, null, this); + const ctx = ConnectionContext.fromWs(req, res, secWebSocketProtocol, null, this); augmentContext(ctx); /* This immediately calls open handler, you must not use res after this call */ res.upgrade({ctx}, secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions, context); From e018a72b56dc3a5b5c7dc590b74d2c8ee4bcc247 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 12:46:09 +0100 Subject: [PATCH 14/25] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20make?= =?UTF-8?q?=20E2E=20Websocket=20tests=20run?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 22 ++++---- src/reactive-rpc/__demos__/server.ts | 34 ++---------- .../common/rpc/RpcMessageStreamProcessor.ts | 3 +- src/server/test.ts | 53 ------------------- 4 files changed, 15 insertions(+), 97 deletions(-) delete mode 100644 src/server/test.ts diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts index 4dfe9d151d..34fce8ebcf 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -10,7 +10,6 @@ import {Writer} from '../../../../../util/buffers/Writer'; import {Codecs} from '../../../../../json-pack/codecs/Codecs'; import {RpcMessageCodec} from '../../../../../reactive-rpc/common/codec/types'; import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; -import {FetchRpcClient} from '../../../../../reactive-rpc/common/rpc/client/FetchRpcClient'; import {RpcCodec} from '../../../../../reactive-rpc/common/codec/RpcCodec'; import {RpcPersistentClient, WebSocketChannel} from '../../../../../reactive-rpc/common'; @@ -20,8 +19,8 @@ if (process.env.TEST_E2E) { const {json, cbor, msgpack} = codecs.value; const cases: [specifier: string, protocol: RpcMessageCodec, req: JsonValueCodec, res: JsonValueCodec][] = [ ['rpc.rx.compact.json', compact, json, json], - // ['rpc.rx.compact.cbor', compact, cbor, cbor], - // ['rpc.rx.compact.msgpack', compact, msgpack, msgpack], + ['rpc.rx.compact.cbor', compact, cbor, cbor], + ['rpc.rx.compact.msgpack', compact, msgpack, msgpack], // ['rpc.rx.compact.json-cbor', compact, json, cbor], // ['rpc.rx.compact.json-msgpack', compact, json, msgpack], // ['rpc.rx.compact.cbor-json', compact, cbor, json], @@ -29,8 +28,8 @@ if (process.env.TEST_E2E) { // ['rpc.rx.compact.msgpack-json', compact, msgpack, json], // ['rpc.rx.compact.msgpack-cbor', compact, msgpack, cbor], - // ['rpc.rx.binary.cbor', binary, cbor, cbor], - // ['rpc.rx.binary.msgpack', binary, msgpack, msgpack], + ['rpc.rx.binary.cbor', binary, cbor, cbor], + ['rpc.rx.binary.msgpack', binary, msgpack, msgpack], // ['rpc.rx.binary.json', binary, json, json], // ['rpc.rx.binary.json-cbor', binary, json, cbor], // ['rpc.rx.binary.json-msgpack', binary, json, msgpack], @@ -39,9 +38,9 @@ if (process.env.TEST_E2E) { // ['rpc.rx.binary.msgpack-json', binary, msgpack, json], // ['rpc.rx.binary.msgpack-cbor', binary, msgpack, cbor], - // ['rpc.json2.verbose.json', jsonRpc2, json, json], - // ['rpc.json2.verbose.cbor', jsonRpc2, cbor, cbor], - // ['rpc.json2.verbose.msgpack', jsonRpc2, msgpack, msgpack], + ['rpc.json2.verbose.json', jsonRpc2, json, json], + ['rpc.json2.verbose.cbor', jsonRpc2, cbor, cbor], + ['rpc.json2.verbose.msgpack', jsonRpc2, msgpack, msgpack], // ['rpc.json2.verbose.json-cbor', jsonRpc2, json, cbor], // ['rpc.json2.verbose.json-msgpack', jsonRpc2, json, msgpack], // ['rpc.json2.verbose.cbor-json', jsonRpc2, cbor, json], @@ -61,16 +60,15 @@ if (process.env.TEST_E2E) { newChannel: () => new WebSocketChannel({ newSocket: () => - new WebSocket(url, { - protocol: protocolSpecifier, - }) as any, + new WebSocket(url, [protocolSpecifier]) as any, }), }, }); + client.start(); return {client}; }; describe(`protocol: application/x.${protocolSpecifier}`, () => { - runApiTests(setup, {staticOnly: true}); + runApiTests(setup); }); } } else { diff --git a/src/reactive-rpc/__demos__/server.ts b/src/reactive-rpc/__demos__/server.ts index 76692195a0..7f71178f21 100644 --- a/src/reactive-rpc/__demos__/server.ts +++ b/src/reactive-rpc/__demos__/server.ts @@ -3,38 +3,10 @@ import {App} from 'uWebSockets.js'; import {createCaller} from '../common/rpc/__tests__/sample-api'; import {RpcApp} from '../server/uws/RpcApp'; -import {Codecs} from '../../json-pack/codecs/Codecs'; -import {Writer} from '../../util/buffers/Writer'; -const uws = App({}); -const caller = createCaller(); -const codecs = new Codecs(new Writer()); const app = new RpcApp({ - uws, - caller, - codecs, - maxRequestBodySize: 1024 * 1024, - augmentContext: (ctx) => ctx, + uws: App({}), + caller: createCaller(), }); -app.enableCors(); -app.enableHttpPing(); -app.route('POST', '/echo', async (ctx) => { - const json = await ctx.requestBodyJson(1024); - return json; -}); -app.enableHttpRpc(); -app.enableWsRpc(); -app.startRouting(); - -const port = +(process.env.PORT || 9999); - -uws.listen(port, (token) => { - if (token) { - // tslint:disable-next-line no-console - console.log({msg: 'SERVER_STARTED', url: `http://localhost:${port}`}); - } else { - // tslint:disable-next-line no-console - console.error(`Failed to listen on ${port} port.`); - } -}); +app.startWithDefaults(); diff --git a/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts b/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts index e2e1a0466e..b90f3652a0 100644 --- a/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts +++ b/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts @@ -242,6 +242,7 @@ export class RpcMessageStreamProcessor { public onNotificationMessage(message: msg.NotificationMessage, ctx: Ctx): void { const {method, value} = message; if (!method || method.length > 128) throw RpcError.fromCode(RpcErrorCodes.INVALID_METHOD); - this.caller.notification(method, value.data, ctx).catch((error: unknown) => {}); + const request = value && (typeof value === 'object') ? value?.data : undefined; + this.caller.notification(method, request, ctx).catch((error: unknown) => {}); } } diff --git a/src/server/test.ts b/src/server/test.ts deleted file mode 100644 index f81cb17c2f..0000000000 --- a/src/server/test.ts +++ /dev/null @@ -1,53 +0,0 @@ -// Run: npx ts-node src/server/test.ts - -import WebSocket from 'ws'; -import {CborJsonValueCodec} from '../json-pack/codecs/cbor'; -import {RpcPersistentClient, WebSocketChannel} from '../reactive-rpc/common'; -import {RpcCodec} from '../reactive-rpc/common/codec/RpcCodec'; -import {BinaryRpcMessageCodec} from '../reactive-rpc/common/codec/binary'; -import {Writer} from '../util/buffers/Writer'; -import {JsonJsonValueCodec} from '../json-pack/codecs/json'; -import {CompactRpcMessageCodec} from '../reactive-rpc/common/codec/compact'; - -const writer = new Writer(); -const codec = new RpcCodec(new CborJsonValueCodec(writer), new BinaryRpcMessageCodec()); -// const codec = new RpcCodec(new JsonJsonValueCodec(writer), new CompactRpcMessageCodec()); -const client = new RpcPersistentClient({ - codec, - channel: { - newChannel: () => - new WebSocketChannel({ - newSocket: () => - new WebSocket('ws://127.0.0.1:9999/rpc', { - // protocol: 'rpc.rx.compact.json', - protocol: 'rpc.rx.binary.cbor', - perMessageDeflate: false, - }) as any, - // newSocket: () => new WebSocket(this.wsHost, 'rpc.rx.compact.json'), - }), - }, -}); - -client.start(); - -console.log('call'); -client.notify('.ping', {}); -client - .call('util.ping', {}) - .then((value) => { - console.log('then', value); - }) - .catch((error) => { - console.log('catch', error); - }); - -// const ws = new WebSocket('ws://127.0.0.1:9999/rpc', { -// protocol: 'rpc.rx.compact.json', -// }); - -// ws.onmessage = (msg) => console.log('msg', msg.data); - -// ws.onopen = () => { -// console.log('open'); -// ws.send(JSON.stringify([1,1,'util.ping',{}])); -// }; From 10ef9ebbaec5953c072936ae1cb3217ea24e9295 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 13:18:05 +0100 Subject: [PATCH 15/25] =?UTF-8?q?fix(reactive-rpc):=20=F0=9F=90=9B=20alway?= =?UTF-8?q?s=20use=20binary=20encoding=20for=20Websocket=20messages?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 2 +- src/reactive-rpc/server/uws/RpcApp.ts | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts index 34fce8ebcf..6669d0c40d 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -30,7 +30,7 @@ if (process.env.TEST_E2E) { ['rpc.rx.binary.cbor', binary, cbor, cbor], ['rpc.rx.binary.msgpack', binary, msgpack, msgpack], - // ['rpc.rx.binary.json', binary, json, json], + ['rpc.rx.binary.json', binary, json, json], // ['rpc.rx.binary.json-cbor', binary, json, cbor], // ['rpc.rx.binary.json-msgpack', binary, json, msgpack], // ['rpc.rx.binary.cbor-json', binary, cbor, json], diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 20f3c1e7e1..dd47fa0ec6 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -158,7 +158,6 @@ export class RpcApp { const resCodec = ctx.resCodec; const msgCodec = ctx.msgCodec; const encoder = resCodec.encoder; - const isBinary = resCodec.format !== EncodingFormat.Json || msgCodec.format === RpcMessageFormat.Binary; ws.rpc = new RpcMessageStreamProcessor({ caller: this.options.caller, send: (messages: ReactiveRpcMessage[]) => { @@ -167,7 +166,7 @@ export class RpcApp { writer.reset(); msgCodec.encodeBatch(resCodec, messages); const encoded = writer.flush(); - ws.send(encoded, isBinary, false); + ws.send(encoded, true, false); }, bufferSize: 1, bufferTime: 0, From 3f43171ec51e25dafbc6206cae2614608424ea34 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 13:18:31 +0100 Subject: [PATCH 16/25] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 3 +-- src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts | 2 +- src/reactive-rpc/server/context.ts | 6 ++++-- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts index 6669d0c40d..18907fc7a5 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -59,8 +59,7 @@ if (process.env.TEST_E2E) { channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => - new WebSocket(url, [protocolSpecifier]) as any, + newSocket: () => new WebSocket(url, [protocolSpecifier]) as any, }), }, }); diff --git a/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts b/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts index b90f3652a0..8371e7b4c1 100644 --- a/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts +++ b/src/reactive-rpc/common/rpc/RpcMessageStreamProcessor.ts @@ -242,7 +242,7 @@ export class RpcMessageStreamProcessor { public onNotificationMessage(message: msg.NotificationMessage, ctx: Ctx): void { const {method, value} = message; if (!method || method.length > 128) throw RpcError.fromCode(RpcErrorCodes.INVALID_METHOD); - const request = value && (typeof value === 'object') ? value?.data : undefined; + const request = value && typeof value === 'object' ? value?.data : undefined; this.caller.notification(method, request, ctx).catch((error: unknown) => {}); } } diff --git a/src/reactive-rpc/server/context.ts b/src/reactive-rpc/server/context.ts index 12600d6832..06a2db7e00 100644 --- a/src/reactive-rpc/server/context.ts +++ b/src/reactive-rpc/server/context.ts @@ -13,9 +13,11 @@ const CODECS_REGEX = /rpc.(\w{0,32})\.(\w{0,32})\.(\w{0,32})(?:\-(\w{0,32}))?/; export class ConnectionContext> { private static findIp(req: HttpRequest, res: HttpResponse): string { - return req.getHeader('x-forwarded-for') || + return ( + req.getHeader('x-forwarded-for') || req.getHeader('x-real-ip') || - Buffer.from(res.getRemoteAddressAsText()).toString(); + Buffer.from(res.getRemoteAddressAsText()).toString() + ); } private static findToken(req: HttpRequest, params: string[] | null): string { From fe690d8b12e9df98621597877cb418bca8e450d5 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 13:36:01 +0100 Subject: [PATCH 17/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add?= =?UTF-8?q?=20ability=20to=20store=20response=20codec=20in=20RpcCodec?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 4 ++-- src/reactive-rpc/common/codec/RpcCodec.ts | 16 +++++++++++----- .../rpc/__tests__/RpcPersistentClient.spec.ts | 2 +- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts index 18907fc7a5..d75a0f6b4d 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -53,13 +53,13 @@ if (process.env.TEST_E2E) { const setup: ApiTestSetup = async () => { const port = +(process.env.PORT || 9999); const url = `ws://localhost:${port}/rpc`; - const codec = new RpcCodec(reqCodec, msgCodec); + const codec = new RpcCodec(msgCodec, reqCodec, resCodec); const client = new RpcPersistentClient({ codec, channel: { newChannel: () => new WebSocketChannel({ - newSocket: () => new WebSocket(url, [protocolSpecifier]) as any, + newSocket: () => new WebSocket(url, [codec.specifier()]) as any, }), }, }); diff --git a/src/reactive-rpc/common/codec/RpcCodec.ts b/src/reactive-rpc/common/codec/RpcCodec.ts index 02de481b45..9dcbbda632 100644 --- a/src/reactive-rpc/common/codec/RpcCodec.ts +++ b/src/reactive-rpc/common/codec/RpcCodec.ts @@ -1,24 +1,30 @@ +import type {RpcSpecifier} from '../rpc'; import type {ReactiveRpcMessage} from '../messages'; import type {JsonValueCodec} from '../../../json-pack/codecs/types'; import type {RpcMessageCodec} from './types'; export class RpcCodec { - constructor(public readonly valueCodec: JsonValueCodec, public readonly messageCodec: RpcMessageCodec) {} + constructor(public readonly msg: RpcMessageCodec, public readonly req: JsonValueCodec, public readonly res: JsonValueCodec) {} + + public specifier(): RpcSpecifier { + const specifier = `rpc.${this.msg.id}.${this.req.id}` + (this.req.id !== this.res.id ? `-${this.res.id}` : ''); + return specifier as RpcSpecifier; + } public encode(messages: ReactiveRpcMessage[]): Uint8Array { - const valueCodec = this.valueCodec; + const valueCodec = this.res; const encoder = valueCodec.encoder; const writer = encoder.writer; writer.reset(); - this.messageCodec.encodeBatch(valueCodec, messages); + this.msg.encodeBatch(valueCodec, messages); return writer.flush(); } public decode(data: Uint8Array): ReactiveRpcMessage[] { - const valueCodec = this.valueCodec; + const valueCodec = this.req; const decoder = valueCodec.decoder; const reader = decoder.reader; reader.reset(data); - return this.messageCodec.decodeBatch(valueCodec, data); + return this.msg.decodeBatch(valueCodec, data); } } diff --git a/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts b/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts index d88b237ee4..92125d1aee 100644 --- a/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts +++ b/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts @@ -15,7 +15,7 @@ test('on remote method execution, sends message over WebSocket only once', async const ws = new Ws(''); const valueCodecs = new Codecs(new Writer(128)); const messageCodecs = new RpcMessageCodecs(); - const codec = new RpcCodec(valueCodecs.cbor, messageCodecs.compact); + const codec = new RpcCodec(messageCodecs.compact, valueCodecs.cbor, valueCodecs.cbor); const client = new RpcPersistentClient({ channel: { newChannel: () => From 803401bf3f24a441402a9a58460095adcd3172a7 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 13:58:29 +0100 Subject: [PATCH 18/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20supp?= =?UTF-8?q?ort=20different=20request/response=20codecs=20in=20Websocket=20?= =?UTF-8?q?rpc?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../uws/ws/rx-rpc/RpcPersistentClient.spec.ts | 36 +++++++++---------- src/reactive-rpc/common/codec/RpcCodec.ts | 6 ++-- .../common/rpc/RpcPersistentClient.ts | 13 +++++-- .../rpc/__tests__/RpcPersistentClient.spec.ts | 2 +- 4 files changed, 31 insertions(+), 26 deletions(-) diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts index d75a0f6b4d..d9977813e1 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts @@ -21,32 +21,32 @@ if (process.env.TEST_E2E) { ['rpc.rx.compact.json', compact, json, json], ['rpc.rx.compact.cbor', compact, cbor, cbor], ['rpc.rx.compact.msgpack', compact, msgpack, msgpack], - // ['rpc.rx.compact.json-cbor', compact, json, cbor], - // ['rpc.rx.compact.json-msgpack', compact, json, msgpack], - // ['rpc.rx.compact.cbor-json', compact, cbor, json], - // ['rpc.rx.compact.cbor-msgpack', compact, cbor, msgpack], - // ['rpc.rx.compact.msgpack-json', compact, msgpack, json], - // ['rpc.rx.compact.msgpack-cbor', compact, msgpack, cbor], + ['rpc.rx.compact.json-cbor', compact, json, cbor], + ['rpc.rx.compact.json-msgpack', compact, json, msgpack], + ['rpc.rx.compact.cbor-json', compact, cbor, json], + ['rpc.rx.compact.cbor-msgpack', compact, cbor, msgpack], + ['rpc.rx.compact.msgpack-json', compact, msgpack, json], + ['rpc.rx.compact.msgpack-cbor', compact, msgpack, cbor], ['rpc.rx.binary.cbor', binary, cbor, cbor], ['rpc.rx.binary.msgpack', binary, msgpack, msgpack], ['rpc.rx.binary.json', binary, json, json], - // ['rpc.rx.binary.json-cbor', binary, json, cbor], - // ['rpc.rx.binary.json-msgpack', binary, json, msgpack], - // ['rpc.rx.binary.cbor-json', binary, cbor, json], - // ['rpc.rx.binary.cbor-msgpack', binary, cbor, msgpack], - // ['rpc.rx.binary.msgpack-json', binary, msgpack, json], - // ['rpc.rx.binary.msgpack-cbor', binary, msgpack, cbor], + ['rpc.rx.binary.json-cbor', binary, json, cbor], + ['rpc.rx.binary.json-msgpack', binary, json, msgpack], + ['rpc.rx.binary.cbor-json', binary, cbor, json], + ['rpc.rx.binary.cbor-msgpack', binary, cbor, msgpack], + ['rpc.rx.binary.msgpack-json', binary, msgpack, json], + ['rpc.rx.binary.msgpack-cbor', binary, msgpack, cbor], ['rpc.json2.verbose.json', jsonRpc2, json, json], ['rpc.json2.verbose.cbor', jsonRpc2, cbor, cbor], ['rpc.json2.verbose.msgpack', jsonRpc2, msgpack, msgpack], - // ['rpc.json2.verbose.json-cbor', jsonRpc2, json, cbor], - // ['rpc.json2.verbose.json-msgpack', jsonRpc2, json, msgpack], - // ['rpc.json2.verbose.cbor-json', jsonRpc2, cbor, json], - // ['rpc.json2.verbose.cbor-msgpack', jsonRpc2, cbor, msgpack], - // ['rpc.json2.verbose.msgpack-json', jsonRpc2, msgpack, json], - // ['rpc.json2.verbose.msgpack-cbor', jsonRpc2, msgpack, cbor], + ['rpc.json2.verbose.json-cbor', jsonRpc2, json, cbor], + ['rpc.json2.verbose.json-msgpack', jsonRpc2, json, msgpack], + ['rpc.json2.verbose.cbor-json', jsonRpc2, cbor, json], + ['rpc.json2.verbose.cbor-msgpack', jsonRpc2, cbor, msgpack], + ['rpc.json2.verbose.msgpack-json', jsonRpc2, msgpack, json], + ['rpc.json2.verbose.msgpack-cbor', jsonRpc2, msgpack, cbor], ]; for (const [protocolSpecifier, msgCodec, reqCodec, resCodec] of cases) { diff --git a/src/reactive-rpc/common/codec/RpcCodec.ts b/src/reactive-rpc/common/codec/RpcCodec.ts index 9dcbbda632..37cbafc9c5 100644 --- a/src/reactive-rpc/common/codec/RpcCodec.ts +++ b/src/reactive-rpc/common/codec/RpcCodec.ts @@ -11,8 +11,7 @@ export class RpcCodec { return specifier as RpcSpecifier; } - public encode(messages: ReactiveRpcMessage[]): Uint8Array { - const valueCodec = this.res; + public encode(messages: ReactiveRpcMessage[], valueCodec: JsonValueCodec): Uint8Array { const encoder = valueCodec.encoder; const writer = encoder.writer; writer.reset(); @@ -20,8 +19,7 @@ export class RpcCodec { return writer.flush(); } - public decode(data: Uint8Array): ReactiveRpcMessage[] { - const valueCodec = this.req; + public decode(data: Uint8Array, valueCodec: JsonValueCodec): ReactiveRpcMessage[] { const decoder = valueCodec.decoder; const reader = decoder.reader; reader.reset(data); diff --git a/src/reactive-rpc/common/rpc/RpcPersistentClient.ts b/src/reactive-rpc/common/rpc/RpcPersistentClient.ts index 50870669ce..1e4f3aa1b8 100644 --- a/src/reactive-rpc/common/rpc/RpcPersistentClient.ts +++ b/src/reactive-rpc/common/rpc/RpcPersistentClient.ts @@ -12,6 +12,10 @@ export interface RpcPersistentClientParams { channel: PersistentChannelParams; codec: RpcCodec; client?: Omit; + + /** + * @todo Remove this option. Remove server from here. + */ server?: Omit, 'send'>; /** @@ -28,6 +32,9 @@ export interface RpcPersistentClientParams { pingMethod?: string; } +/** + * RPC client which automatically reconnects if disconnected. + */ export class RpcPersistentClient { public channel: PersistentChannel; public rpc?: RpcDuplex; @@ -45,7 +52,7 @@ export class RpcPersistentClient { client: new StreamingRpcClient({ ...(params.client || {}), send: (messages: msg.ReactiveRpcClientMessage[]): void => { - const encoded = codec.encode(messages); + const encoded = codec.encode(messages, codec.req); this.channel.send$(encoded).subscribe(); }, }), @@ -57,7 +64,7 @@ export class RpcPersistentClient { onNotification: () => {}, }), send: (messages: (msg.ReactiveRpcServerMessage | msg.NotificationMessage)[]): void => { - const encoded = codec.encode(messages); + const encoded = codec.encode(messages, codec.req); this.channel.send$(encoded).subscribe(); }, }), @@ -65,7 +72,7 @@ export class RpcPersistentClient { this.channel.message$.pipe(takeUntil(close$)).subscribe((data) => { const encoded = typeof data === 'string' ? textEncoder.encode(data) : new Uint8Array(data); - const messages = codec.decode(encoded); + const messages = codec.decode(encoded, codec.res); duplex.onMessages((messages instanceof Array ? messages : [messages]) as msg.ReactiveRpcMessage[], {} as Ctx); }); diff --git a/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts b/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts index 92125d1aee..80bd1abf1f 100644 --- a/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts +++ b/src/reactive-rpc/common/rpc/__tests__/RpcPersistentClient.spec.ts @@ -36,7 +36,7 @@ test('on remote method execution, sends message over WebSocket only once', async await until(() => onSend.mock.calls.length === 1); expect(onSend).toHaveBeenCalledTimes(1); const message = onSend.mock.calls[0][0]; - const decoded = codec.decode(message); + const decoded = codec.decode(message, codec.req); const messageDecoded = decoded[0]; expect(messageDecoded).toBeInstanceOf(RequestCompleteMessage); expect(messageDecoded).toMatchObject(new RequestCompleteMessage(1, 'foo.bar', new Value({foo: 'bar'}, undefined))); From acd6379f567f2fa5316b4033b1bf7da0994249e9 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:00:34 +0100 Subject: [PATCH 19/25] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20move?= =?UTF-8?q?=20folders=20of=20E2E=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../http/{rx-rpc => }/FetchRpcClient.spec.ts | 17 ++++++++--------- .../{rx-rpc => }/StreamingRpcClient.spec.ts | 16 ++++++++-------- .../{rx-rpc => }/RpcPersistentClient.spec.ts | 18 +++++++++--------- 3 files changed, 25 insertions(+), 26 deletions(-) rename src/__tests__/reactive-rpc/uws/http/{rx-rpc => }/FetchRpcClient.spec.ts (79%) rename src/__tests__/reactive-rpc/uws/http/{rx-rpc => }/StreamingRpcClient.spec.ts (84%) rename src/__tests__/reactive-rpc/uws/ws/{rx-rpc => }/RpcPersistentClient.spec.ts (81%) diff --git a/src/__tests__/reactive-rpc/uws/http/rx-rpc/FetchRpcClient.spec.ts b/src/__tests__/reactive-rpc/uws/http/FetchRpcClient.spec.ts similarity index 79% rename from src/__tests__/reactive-rpc/uws/http/rx-rpc/FetchRpcClient.spec.ts rename to src/__tests__/reactive-rpc/uws/http/FetchRpcClient.spec.ts index 1e3bd2ddc2..bb7244677b 100644 --- a/src/__tests__/reactive-rpc/uws/http/rx-rpc/FetchRpcClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/http/FetchRpcClient.spec.ts @@ -2,15 +2,14 @@ * @jest-environment node */ -import {ApiTestSetup, runApiTests} from '../../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; - -import {RpcCodecs} from '../../../../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../../../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {Writer} from '../../../../../util/buffers/Writer'; -import {Codecs} from '../../../../../json-pack/codecs/Codecs'; -import {RpcMessageCodec} from '../../../../../reactive-rpc/common/codec/types'; -import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; -import {FetchRpcClient} from '../../../../../reactive-rpc/common/rpc/client/FetchRpcClient'; +import {ApiTestSetup, runApiTests} from '../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; +import {RpcCodecs} from '../../../../reactive-rpc/common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../reactive-rpc/common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../util/buffers/Writer'; +import {Codecs} from '../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../reactive-rpc/common/codec/types'; +import {JsonValueCodec} from '../../../../json-pack/codecs/types'; +import {FetchRpcClient} from '../../../../reactive-rpc/common/rpc/client/FetchRpcClient'; if (process.env.TEST_E2E) { const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); diff --git a/src/__tests__/reactive-rpc/uws/http/rx-rpc/StreamingRpcClient.spec.ts b/src/__tests__/reactive-rpc/uws/http/StreamingRpcClient.spec.ts similarity index 84% rename from src/__tests__/reactive-rpc/uws/http/rx-rpc/StreamingRpcClient.spec.ts rename to src/__tests__/reactive-rpc/uws/http/StreamingRpcClient.spec.ts index 4797896b3b..599bc89c04 100644 --- a/src/__tests__/reactive-rpc/uws/http/rx-rpc/StreamingRpcClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/http/StreamingRpcClient.spec.ts @@ -2,14 +2,14 @@ * @jest-environment node */ -import {ApiTestSetup, runApiTests} from '../../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; -import {StreamingRpcClient} from '../../../../../reactive-rpc/common'; -import {RpcCodecs} from '../../../../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../../../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {Writer} from '../../../../../util/buffers/Writer'; -import {Codecs} from '../../../../../json-pack/codecs/Codecs'; -import {RpcMessageCodec} from '../../../../../reactive-rpc/common/codec/types'; -import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; +import {ApiTestSetup, runApiTests} from '../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; +import {StreamingRpcClient} from '../../../../reactive-rpc/common'; +import {RpcCodecs} from '../../../../reactive-rpc/common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../reactive-rpc/common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../util/buffers/Writer'; +import {Codecs} from '../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../reactive-rpc/common/codec/types'; +import {JsonValueCodec} from '../../../../json-pack/codecs/types'; if (process.env.TEST_E2E) { const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); diff --git a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts b/src/__tests__/reactive-rpc/uws/ws/RpcPersistentClient.spec.ts similarity index 81% rename from src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts rename to src/__tests__/reactive-rpc/uws/ws/RpcPersistentClient.spec.ts index d9977813e1..c43d70cbf3 100644 --- a/src/__tests__/reactive-rpc/uws/ws/rx-rpc/RpcPersistentClient.spec.ts +++ b/src/__tests__/reactive-rpc/uws/ws/RpcPersistentClient.spec.ts @@ -2,16 +2,16 @@ * @jest-environment node */ -import {ApiTestSetup, runApiTests} from '../../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; +import {ApiTestSetup, runApiTests} from '../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; import WebSocket from 'ws'; -import {RpcCodecs} from '../../../../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../../../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {Writer} from '../../../../../util/buffers/Writer'; -import {Codecs} from '../../../../../json-pack/codecs/Codecs'; -import {RpcMessageCodec} from '../../../../../reactive-rpc/common/codec/types'; -import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; -import {RpcCodec} from '../../../../../reactive-rpc/common/codec/RpcCodec'; -import {RpcPersistentClient, WebSocketChannel} from '../../../../../reactive-rpc/common'; +import {RpcCodecs} from '../../../../reactive-rpc/common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../reactive-rpc/common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../util/buffers/Writer'; +import {Codecs} from '../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../reactive-rpc/common/codec/types'; +import {JsonValueCodec} from '../../../../json-pack/codecs/types'; +import {RpcCodec} from '../../../../reactive-rpc/common/codec/RpcCodec'; +import {RpcPersistentClient, WebSocketChannel} from '../../../../reactive-rpc/common'; if (process.env.TEST_E2E) { const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); From 2fcdf69b0c2842a2d160b267ba0af385aef3a51f Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:08:53 +0100 Subject: [PATCH 20/25] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20move?= =?UTF-8?q?=20Reactive=20RPC=20E2E=20tests=20under=20the=20/src/reactive-r?= =?UTF-8?q?pc=20folder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- package.json | 4 ++-- .../__tests__/e2e}/README.md | 2 +- .../__tests__/e2e}/run.ts | 10 ++++------ .../e2e}/uws/http/FetchRpcClient.spec.ts | 16 ++++++++-------- .../e2e}/uws/http/StreamingRpcClient.spec.ts | 16 ++++++++-------- .../e2e}/uws/ws/RpcPersistentClient.spec.ts | 18 +++++++++--------- 6 files changed, 32 insertions(+), 34 deletions(-) rename src/{__tests__/reactive-rpc => reactive-rpc/__tests__/e2e}/README.md (80%) rename src/{__tests__/reactive-rpc => reactive-rpc/__tests__/e2e}/run.ts (89%) rename src/{__tests__/reactive-rpc => reactive-rpc/__tests__/e2e}/uws/http/FetchRpcClient.spec.ts (80%) rename src/{__tests__/reactive-rpc => reactive-rpc/__tests__/e2e}/uws/http/StreamingRpcClient.spec.ts (84%) rename src/{__tests__/reactive-rpc => reactive-rpc/__tests__/e2e}/uws/ws/RpcPersistentClient.spec.ts (81%) diff --git a/package.json b/package.json index 932122037c..7cc86267ef 100644 --- a/package.json +++ b/package.json @@ -53,8 +53,8 @@ "test:cli:pointer": "./bin/json-pointer-test.js ./bin/json-pointer.js", "test:cli:patch": "./bin/json-patch-test.js ./bin/json-patch.js", "test:cli:pack": "./bin/json-pack-test.js ./bin/json-pack.js", - "test:reactive-rpc": "node -r ts-node/register/transpile-only src/__tests__/reactive-rpc/run.ts", - "test:reactive-rpc:jest": "TEST_E2E=1 jest --maxWorkers 1 --no-cache src/__tests__/reactive-rpc/", + "test:reactive-rpc": "node -r ts-node/register/transpile-only src/reactive-rpc/__tests__/e2e/run.ts", + "test:reactive-rpc:jest": "TEST_E2E=1 jest --maxWorkers 1 --no-cache src/reactive-rpc/__tests__/e2e/", "demo:json-patch": "ts-node src/json-patch/__demos__/json-patch.ts", "demo:json-pointer": "ts-node src/json-pointer/__demos__/json-pointer.ts", "demo:reactive-rpc:server": "ts-node src/reactive-rpc/__demos__/server.ts", diff --git a/src/__tests__/reactive-rpc/README.md b/src/reactive-rpc/__tests__/e2e/README.md similarity index 80% rename from src/__tests__/reactive-rpc/README.md rename to src/reactive-rpc/__tests__/e2e/README.md index e9c7f2aa79..6f4e23fcd4 100644 --- a/src/__tests__/reactive-rpc/README.md +++ b/src/reactive-rpc/__tests__/e2e/README.md @@ -22,5 +22,5 @@ yarn test:reactive-rpc:jest To run a specific test suite use this command: ``` -TEST_E2E=1 npx jest --no-cache src/__tests__/reactive-rpc/uws/http/rx-rpc/FetchRpcClient.spec.ts +TEST_E2E=1 npx jest --no-cache src/reactive-rpc/__tests__/e2e/uws/ws/RpcPersistentClient.spec.ts ``` diff --git a/src/__tests__/reactive-rpc/run.ts b/src/reactive-rpc/__tests__/e2e/run.ts similarity index 89% rename from src/__tests__/reactive-rpc/run.ts rename to src/reactive-rpc/__tests__/e2e/run.ts index 0e5c994b96..9f50727240 100644 --- a/src/__tests__/reactive-rpc/run.ts +++ b/src/reactive-rpc/__tests__/e2e/run.ts @@ -1,5 +1,5 @@ import {spawn} from 'child_process'; -import {Defer} from '../../util/Defer'; +import {Defer} from '../../../util/Defer'; const startServer = async () => { const started = new Defer(); @@ -59,11 +59,9 @@ const runTests = async () => { const server = await startServer(); await server.started; let exitCode = 0; - for (let i = 0; i < 3; i++) { - const jest = await runTests(); - exitCode = await jest.exitCode; - if (exitCode !== 0) throw exitCode; - } + const jest = await runTests(); + exitCode = await jest.exitCode; + if (exitCode !== 0) throw exitCode; process.exit(exitCode); } catch (error) { // tslint:disable-next-line no-console diff --git a/src/__tests__/reactive-rpc/uws/http/FetchRpcClient.spec.ts b/src/reactive-rpc/__tests__/e2e/uws/http/FetchRpcClient.spec.ts similarity index 80% rename from src/__tests__/reactive-rpc/uws/http/FetchRpcClient.spec.ts rename to src/reactive-rpc/__tests__/e2e/uws/http/FetchRpcClient.spec.ts index bb7244677b..b989653b43 100644 --- a/src/__tests__/reactive-rpc/uws/http/FetchRpcClient.spec.ts +++ b/src/reactive-rpc/__tests__/e2e/uws/http/FetchRpcClient.spec.ts @@ -2,14 +2,14 @@ * @jest-environment node */ -import {ApiTestSetup, runApiTests} from '../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; -import {RpcCodecs} from '../../../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {Writer} from '../../../../util/buffers/Writer'; -import {Codecs} from '../../../../json-pack/codecs/Codecs'; -import {RpcMessageCodec} from '../../../../reactive-rpc/common/codec/types'; -import {JsonValueCodec} from '../../../../json-pack/codecs/types'; -import {FetchRpcClient} from '../../../../reactive-rpc/common/rpc/client/FetchRpcClient'; +import {ApiTestSetup, runApiTests} from '../../../../common/rpc/__tests__/runApiTests'; +import {RpcCodecs} from '../../../../common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../../util/buffers/Writer'; +import {Codecs} from '../../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../common/codec/types'; +import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; +import {FetchRpcClient} from '../../../../common/rpc/client/FetchRpcClient'; if (process.env.TEST_E2E) { const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); diff --git a/src/__tests__/reactive-rpc/uws/http/StreamingRpcClient.spec.ts b/src/reactive-rpc/__tests__/e2e/uws/http/StreamingRpcClient.spec.ts similarity index 84% rename from src/__tests__/reactive-rpc/uws/http/StreamingRpcClient.spec.ts rename to src/reactive-rpc/__tests__/e2e/uws/http/StreamingRpcClient.spec.ts index 599bc89c04..924d564e86 100644 --- a/src/__tests__/reactive-rpc/uws/http/StreamingRpcClient.spec.ts +++ b/src/reactive-rpc/__tests__/e2e/uws/http/StreamingRpcClient.spec.ts @@ -2,14 +2,14 @@ * @jest-environment node */ -import {ApiTestSetup, runApiTests} from '../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; -import {StreamingRpcClient} from '../../../../reactive-rpc/common'; -import {RpcCodecs} from '../../../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {Writer} from '../../../../util/buffers/Writer'; -import {Codecs} from '../../../../json-pack/codecs/Codecs'; -import {RpcMessageCodec} from '../../../../reactive-rpc/common/codec/types'; -import {JsonValueCodec} from '../../../../json-pack/codecs/types'; +import {ApiTestSetup, runApiTests} from '../../../../common/rpc/__tests__/runApiTests'; +import {StreamingRpcClient} from '../../../../common'; +import {RpcCodecs} from '../../../../common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../../util/buffers/Writer'; +import {Codecs} from '../../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../common/codec/types'; +import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; if (process.env.TEST_E2E) { const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); diff --git a/src/__tests__/reactive-rpc/uws/ws/RpcPersistentClient.spec.ts b/src/reactive-rpc/__tests__/e2e/uws/ws/RpcPersistentClient.spec.ts similarity index 81% rename from src/__tests__/reactive-rpc/uws/ws/RpcPersistentClient.spec.ts rename to src/reactive-rpc/__tests__/e2e/uws/ws/RpcPersistentClient.spec.ts index c43d70cbf3..73c63d20f8 100644 --- a/src/__tests__/reactive-rpc/uws/ws/RpcPersistentClient.spec.ts +++ b/src/reactive-rpc/__tests__/e2e/uws/ws/RpcPersistentClient.spec.ts @@ -2,16 +2,16 @@ * @jest-environment node */ -import {ApiTestSetup, runApiTests} from '../../../../reactive-rpc/common/rpc/__tests__/runApiTests'; +import {ApiTestSetup, runApiTests} from '../../../../common/rpc/__tests__/runApiTests'; import WebSocket from 'ws'; -import {RpcCodecs} from '../../../../reactive-rpc/common/codec/RpcCodecs'; -import {RpcMessageCodecs} from '../../../../reactive-rpc/common/codec/RpcMessageCodecs'; -import {Writer} from '../../../../util/buffers/Writer'; -import {Codecs} from '../../../../json-pack/codecs/Codecs'; -import {RpcMessageCodec} from '../../../../reactive-rpc/common/codec/types'; -import {JsonValueCodec} from '../../../../json-pack/codecs/types'; -import {RpcCodec} from '../../../../reactive-rpc/common/codec/RpcCodec'; -import {RpcPersistentClient, WebSocketChannel} from '../../../../reactive-rpc/common'; +import {RpcCodecs} from '../../../../common/codec/RpcCodecs'; +import {RpcMessageCodecs} from '../../../../common/codec/RpcMessageCodecs'; +import {Writer} from '../../../../../util/buffers/Writer'; +import {Codecs} from '../../../../../json-pack/codecs/Codecs'; +import {RpcMessageCodec} from '../../../../common/codec/types'; +import {JsonValueCodec} from '../../../../../json-pack/codecs/types'; +import {RpcCodec} from '../../../../common/codec/RpcCodec'; +import {RpcPersistentClient, WebSocketChannel} from '../../../../common'; if (process.env.TEST_E2E) { const codecs = new RpcCodecs(new Codecs(new Writer()), new RpcMessageCodecs()); From 4ee7d5c434c64c814b3145f026995c171592c32d Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:09:41 +0100 Subject: [PATCH 21/25] =?UTF-8?q?style(reactive-rpc):=20=F0=9F=92=84=20run?= =?UTF-8?q?=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/codec/RpcCodec.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/reactive-rpc/common/codec/RpcCodec.ts b/src/reactive-rpc/common/codec/RpcCodec.ts index 37cbafc9c5..7dba4e63d6 100644 --- a/src/reactive-rpc/common/codec/RpcCodec.ts +++ b/src/reactive-rpc/common/codec/RpcCodec.ts @@ -4,7 +4,11 @@ import type {JsonValueCodec} from '../../../json-pack/codecs/types'; import type {RpcMessageCodec} from './types'; export class RpcCodec { - constructor(public readonly msg: RpcMessageCodec, public readonly req: JsonValueCodec, public readonly res: JsonValueCodec) {} + constructor( + public readonly msg: RpcMessageCodec, + public readonly req: JsonValueCodec, + public readonly res: JsonValueCodec, + ) {} public specifier(): RpcSpecifier { const specifier = `rpc.${this.msg.id}.${this.req.id}` + (this.req.id !== this.res.id ? `-${this.res.id}` : ''); From a16547e0e57ebdf5586595daec87c865da0b8bab Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:25:55 +0100 Subject: [PATCH 22/25] =?UTF-8?q?test(reactive-rpc):=20=F0=9F=92=8D=20exit?= =?UTF-8?q?=20testing=20process=20after=20E2E=20tests=20are=20done?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 3 +-- package.json | 2 +- .../common/rpc/__tests__/runApiTests.ts | 15 ++++++++++++++- .../common/rpc/client/FetchRpcClient.ts | 2 ++ 4 files changed, 18 insertions(+), 4 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index be55d61a93..c6109c669a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -59,5 +59,4 @@ jobs: node-version: ${{ matrix.node-version }} cache: yarn - run: yarn install --frozen-lockfile - - run: yarn build:all - - run: PORT=9999 yarn test:reactive-rpc + - run: yarn test:reactive-rpc diff --git a/package.json b/package.json index 7cc86267ef..30e71c27bc 100644 --- a/package.json +++ b/package.json @@ -47,7 +47,7 @@ "build": "yarn build:es2020", "jest": "node -r ts-node/register ./node_modules/.bin/jest", "test": "jest --maxWorkers 7", - "test:all": "yarn lint && yarn test && yarn test:cli:pointer && yarn test:cli:patch && yarn test:cli:pack && yarn test:reactive-rpc && yarn demo:json-patch && yarn demo:json-pointer", + "test:all": "yarn lint && yarn test && yarn build:all && yarn test:cli:pointer && yarn test:cli:patch && yarn test:cli:pack && yarn test:reactive-rpc && yarn demo:json-patch && yarn demo:json-pointer", "test:ci": "yarn jest --maxWorkers 3 --no-cache", "test:cli": "yarn test:cli:pointer && yarn test:cli:patch && yarn test:cli:pack", "test:cli:pointer": "./bin/json-pointer-test.js ./bin/json-pointer.js", diff --git a/src/reactive-rpc/common/rpc/__tests__/runApiTests.ts b/src/reactive-rpc/common/rpc/__tests__/runApiTests.ts index 0d22b11c8c..a4e43ac47e 100644 --- a/src/reactive-rpc/common/rpc/__tests__/runApiTests.ts +++ b/src/reactive-rpc/common/rpc/__tests__/runApiTests.ts @@ -4,7 +4,7 @@ import {of} from '../../util/of'; import {RpcError} from '../caller'; export interface ApiTestSetupResult { - client: Pick; + client: Pick; } export type ApiTestSetup = () => ApiTestSetupResult | Promise; @@ -15,18 +15,21 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} const {client} = await setup(); const result = await firstValueFrom(client.call$('ping', {})); expect(result).toBe('pong'); + await client.stop(); }, 15_000); test('can execute without payload', async () => { const {client} = await setup(); const result = await firstValueFrom(client.call$('ping', undefined)); expect(result).toBe('pong'); + await client.stop(); }); test('can execute with unexpected payload', async () => { const {client} = await setup(); const result = await firstValueFrom(client.call$('ping', 'VERY_UNEXPECTED')); expect(result).toBe('pong'); + await client.stop(); }); }); @@ -35,6 +38,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} const {client} = await setup(); const result = await firstValueFrom(client.call$('double', {num: 1.2})); expect(result).toEqual({num: 2.4}); + await client.stop(); }); test('can execute two request in parallel', async () => { @@ -44,6 +48,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} const [res1, res2] = await Promise.all([promise1, promise2]); expect(res1[0]).toEqual({num: 2}); expect(res2[0]).toEqual({num: 4}); + await client.stop(); }); test('throws error when validation fails', async () => { @@ -51,6 +56,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} const [, error] = await of(firstValueFrom(client.call$('double', {num: {}}))); expect((error as RpcError).code).toBe('BAD_REQUEST'); expect((error as RpcError).message).toBe('Payload .num field missing.'); + await client.stop(); }); }); @@ -59,6 +65,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} const {client} = await setup(); const [, error] = await of(firstValueFrom(client.call$('error', {}))); expect(error).toMatchObject({message: 'this promise can throw'}); + await client.stop(); }); }); @@ -67,6 +74,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} const {client} = await setup(); const [, error] = await of(lastValueFrom(client.call$('streamError', {}))); expect(error).toMatchObject({message: 'Stream always errors'}); + await client.stop(); }); }); @@ -79,6 +87,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} commit: 'AAAAAAAAAAAAAAAAAAA', sha1: 'BBBBBBBBBBBBBBBBBBB', }); + await client.stop(); }); }); @@ -89,6 +98,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} expect(result).toEqual({ bar: 'aa', }); + await client.stop(); }); test('throws on invalid data', async () => { @@ -97,6 +107,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} expect(error).toMatchObject({ message: '"foo" property missing.', }); + await client.stop(); }); }); @@ -110,6 +121,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} expect(result).toEqual({ bar: 'aa', }); + await client.stop(); }); test('throws on invalid data', async () => { @@ -118,6 +130,7 @@ export const runApiTests = (setup: ApiTestSetup, params: {staticOnly?: boolean} expect(error).toMatchObject({ message: '"foo" property missing.', }); + await client.stop(); }); }); } diff --git a/src/reactive-rpc/common/rpc/client/FetchRpcClient.ts b/src/reactive-rpc/common/rpc/client/FetchRpcClient.ts index 9372b622ce..9082e6a873 100644 --- a/src/reactive-rpc/common/rpc/client/FetchRpcClient.ts +++ b/src/reactive-rpc/common/rpc/client/FetchRpcClient.ts @@ -59,4 +59,6 @@ export class FetchRpcClient implements RpcClient { public notify(method: string, data: undefined | unknown): void { this.notify(method, data); } + + public stop() {} } From 50d3b0a3590b6cbcf2e0a34cc0f7308e0b7e5c0f Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:30:12 +0100 Subject: [PATCH 23/25] =?UTF-8?q?test:=20=F0=9F=92=8D=20add=20.stop()=20me?= =?UTF-8?q?thods=20for=20RPC=20clients?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .github/workflows/ci.yml | 2 +- .../common/rpc/caller/__tests__/RpcApiCaller.spec.ts | 1 + src/reactive-rpc/common/rpc/client/StaticRpcClient.ts | 2 ++ 3 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c6109c669a..e896c77cad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -46,7 +46,7 @@ jobs: - run: yarn test:cli:pack - run: yarn demo:json-patch - run: yarn demo:json-pointer - rx-rpc: + e2e-rx-rpc: runs-on: ubuntu-latest strategy: matrix: diff --git a/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts b/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts index 6239819ae0..485b1ad9e5 100644 --- a/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts +++ b/src/reactive-rpc/common/rpc/caller/__tests__/RpcApiCaller.spec.ts @@ -114,6 +114,7 @@ describe('smoke tests', () => { throw error.data; }), ), + stop: () => {}, }, }; }); diff --git a/src/reactive-rpc/common/rpc/client/StaticRpcClient.ts b/src/reactive-rpc/common/rpc/client/StaticRpcClient.ts index 3c9f0f14fe..d6398d0964 100644 --- a/src/reactive-rpc/common/rpc/client/StaticRpcClient.ts +++ b/src/reactive-rpc/common/rpc/client/StaticRpcClient.ts @@ -97,4 +97,6 @@ export class StaticRpcClient implements RpcClient { const value = new Value(data, undefined); this.buffer.push(new msg.NotificationMessage(method, value)); } + + public stop() {} } From 32f17916b814cc24a45aa6b87136c4ab4b2afcb1 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:39:02 +0100 Subject: [PATCH 24/25] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20add?= =?UTF-8?q?=20convenience=20methods=20for=20RPC=20client=20instantiation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../browser/createBinaryWsRpcClient.ts | 23 +++++++++++++++++++ .../browser/createJsonWsRpcClient.ts | 23 +++++++++++++++++++ 2 files changed, 46 insertions(+) create mode 100644 src/reactive-rpc/browser/createBinaryWsRpcClient.ts create mode 100644 src/reactive-rpc/browser/createJsonWsRpcClient.ts diff --git a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts new file mode 100644 index 0000000000..9ceb194839 --- /dev/null +++ b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts @@ -0,0 +1,23 @@ +import {CborJsonValueCodec} from "../../json-pack/codecs/cbor"; +import {Writer} from "../../util/buffers/Writer"; +import {RpcPersistentClient, WebSocketChannel} from "../common"; +import {RpcCodec} from "../common/codec/RpcCodec"; +import {BinaryRpcMessageCodec} from "../common/codec/binary"; + +export const createBinaryWsRpcClient = (url: string) => { + const writer = new Writer(1024 * 4); + const msg = new BinaryRpcMessageCodec(); + const req = new CborJsonValueCodec(writer); + const codec = new RpcCodec(msg, req, req); + const client = new RpcPersistentClient({ + codec, + channel: { + newChannel: () => + new WebSocketChannel({ + newSocket: () => new WebSocket(url, [codec.specifier()]), + }), + }, + }); + client.start(); + return client; +}; diff --git a/src/reactive-rpc/browser/createJsonWsRpcClient.ts b/src/reactive-rpc/browser/createJsonWsRpcClient.ts new file mode 100644 index 0000000000..53bd5202b9 --- /dev/null +++ b/src/reactive-rpc/browser/createJsonWsRpcClient.ts @@ -0,0 +1,23 @@ +import {JsonJsonValueCodec} from "../../json-pack/codecs/json"; +import {Writer} from "../../util/buffers/Writer"; +import {RpcPersistentClient, WebSocketChannel} from "../common"; +import {RpcCodec} from "../common/codec/RpcCodec"; +import {CompactRpcMessageCodec} from "../common/codec/compact"; + +export const createJsonWsRpcClient = (url: string) => { + const writer = new Writer(1024 * 4); + const msg = new CompactRpcMessageCodec(); + const req = new JsonJsonValueCodec(writer); + const codec = new RpcCodec(msg, req, req); + const client = new RpcPersistentClient({ + codec, + channel: { + newChannel: () => + new WebSocketChannel({ + newSocket: () => new WebSocket(url, [codec.specifier()]), + }), + }, + }); + client.start(); + return client; +}; From c9e186c5940c3395352063b6799c099c06631991 Mon Sep 17 00:00:00 2001 From: streamich Date: Fri, 17 Nov 2023 14:43:40 +0100 Subject: [PATCH 25/25] =?UTF-8?q?refactor(reactive-rpc):=20=F0=9F=92=A1=20?= =?UTF-8?q?remove=20server=20part=20from=20RpcPersistentClient?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../browser/createBinaryWsRpcClient.ts | 10 ++-- .../browser/createJsonWsRpcClient.ts | 10 ++-- .../common/rpc/RpcPersistentClient.ts | 57 ++++++------------- 3 files changed, 27 insertions(+), 50 deletions(-) diff --git a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts index 9ceb194839..60b65b4c0b 100644 --- a/src/reactive-rpc/browser/createBinaryWsRpcClient.ts +++ b/src/reactive-rpc/browser/createBinaryWsRpcClient.ts @@ -1,8 +1,8 @@ -import {CborJsonValueCodec} from "../../json-pack/codecs/cbor"; -import {Writer} from "../../util/buffers/Writer"; -import {RpcPersistentClient, WebSocketChannel} from "../common"; -import {RpcCodec} from "../common/codec/RpcCodec"; -import {BinaryRpcMessageCodec} from "../common/codec/binary"; +import {CborJsonValueCodec} from '../../json-pack/codecs/cbor'; +import {Writer} from '../../util/buffers/Writer'; +import {RpcPersistentClient, WebSocketChannel} from '../common'; +import {RpcCodec} from '../common/codec/RpcCodec'; +import {BinaryRpcMessageCodec} from '../common/codec/binary'; export const createBinaryWsRpcClient = (url: string) => { const writer = new Writer(1024 * 4); diff --git a/src/reactive-rpc/browser/createJsonWsRpcClient.ts b/src/reactive-rpc/browser/createJsonWsRpcClient.ts index 53bd5202b9..3e50e4abff 100644 --- a/src/reactive-rpc/browser/createJsonWsRpcClient.ts +++ b/src/reactive-rpc/browser/createJsonWsRpcClient.ts @@ -1,8 +1,8 @@ -import {JsonJsonValueCodec} from "../../json-pack/codecs/json"; -import {Writer} from "../../util/buffers/Writer"; -import {RpcPersistentClient, WebSocketChannel} from "../common"; -import {RpcCodec} from "../common/codec/RpcCodec"; -import {CompactRpcMessageCodec} from "../common/codec/compact"; +import {JsonJsonValueCodec} from '../../json-pack/codecs/json'; +import {Writer} from '../../util/buffers/Writer'; +import {RpcPersistentClient, WebSocketChannel} from '../common'; +import {RpcCodec} from '../common/codec/RpcCodec'; +import {CompactRpcMessageCodec} from '../common/codec/compact'; export const createJsonWsRpcClient = (url: string) => { const writer = new Writer(1024 * 4); diff --git a/src/reactive-rpc/common/rpc/RpcPersistentClient.ts b/src/reactive-rpc/common/rpc/RpcPersistentClient.ts index 1e4f3aa1b8..1ad960ae5e 100644 --- a/src/reactive-rpc/common/rpc/RpcPersistentClient.ts +++ b/src/reactive-rpc/common/rpc/RpcPersistentClient.ts @@ -1,23 +1,15 @@ +import * as msg from '../messages'; import {firstValueFrom, Observable, ReplaySubject, timer} from 'rxjs'; import {filter, first, share, switchMap, takeUntil} from 'rxjs/operators'; -import * as msg from '../messages'; import {StreamingRpcClient, StreamingRpcClientOptions} from './client/StreamingRpcClient'; -import {ApiRpcCaller} from './caller/ApiRpcCaller'; -import {RpcDuplex} from '../rpc/RpcDuplex'; -import {RpcMessageStreamProcessor, RpcMessageStreamProcessorOptions} from './RpcMessageStreamProcessor'; import {PersistentChannel, PersistentChannelParams} from '../channel'; -import {RpcCodec} from '../codec/RpcCodec'; +import type {RpcCodec} from '../codec/RpcCodec'; -export interface RpcPersistentClientParams { +export interface RpcPersistentClientParams { channel: PersistentChannelParams; codec: RpcCodec; client?: Omit; - /** - * @todo Remove this option. Remove server from here. - */ - server?: Omit, 'send'>; - /** * Number of milliseconds to periodically send keep-alive ".ping" notification * messages. If not specified, will default to 15,000 (15 seconds). If 0, will @@ -35,45 +27,30 @@ export interface RpcPersistentClientParams { /** * RPC client which automatically reconnects if disconnected. */ -export class RpcPersistentClient { +export class RpcPersistentClient { public channel: PersistentChannel; - public rpc?: RpcDuplex; - public readonly rpc$ = new ReplaySubject>(1); + public rpc?: StreamingRpcClient; + public readonly rpc$ = new ReplaySubject(1); - constructor(params: RpcPersistentClientParams) { + constructor(params: RpcPersistentClientParams) { const ping = params.ping ?? 15000; const codec = params.codec; const textEncoder = new TextEncoder(); this.channel = new PersistentChannel(params.channel); this.channel.open$.pipe(filter((open) => open)).subscribe(() => { const close$ = this.channel.open$.pipe(filter((open) => !open)); - - const duplex = new RpcDuplex({ - client: new StreamingRpcClient({ - ...(params.client || {}), - send: (messages: msg.ReactiveRpcClientMessage[]): void => { - const encoded = codec.encode(messages, codec.req); - this.channel.send$(encoded).subscribe(); - }, - }), - server: new RpcMessageStreamProcessor({ - ...(params.server || { - caller: new ApiRpcCaller({ - api: {}, - }), - onNotification: () => {}, - }), - send: (messages: (msg.ReactiveRpcServerMessage | msg.NotificationMessage)[]): void => { - const encoded = codec.encode(messages, codec.req); - this.channel.send$(encoded).subscribe(); - }, - }), + const client = new StreamingRpcClient({ + ...(params.client || {}), + send: (messages: msg.ReactiveRpcClientMessage[]): void => { + const encoded = codec.encode(messages, codec.req); + this.channel.send$(encoded).subscribe(); + }, }); this.channel.message$.pipe(takeUntil(close$)).subscribe((data) => { const encoded = typeof data === 'string' ? textEncoder.encode(data) : new Uint8Array(data); const messages = codec.decode(encoded, codec.res); - duplex.onMessages((messages instanceof Array ? messages : [messages]) as msg.ReactiveRpcMessage[], {} as Ctx); + client.onMessages((messages instanceof Array ? messages : [messages]) as msg.ReactiveRpcServerMessage[]); }); // Send ping notifications to keep the connection alive. @@ -81,13 +58,13 @@ export class RpcPersistentClient { timer(ping, ping) .pipe(takeUntil(close$)) .subscribe(() => { - duplex.notify(params.pingMethod || '.ping', undefined); + client.notify(params.pingMethod || '.ping', undefined); }); } if (this.rpc) this.rpc.disconnect(); - this.rpc = duplex; - this.rpc$.next(duplex); + this.rpc = client; + this.rpc$.next(client); }); }