diff --git a/src/json-type/system/TypeRouter.ts b/src/json-type/system/TypeRouter.ts index 7e508a60cd..63a385054f 100644 --- a/src/json-type/system/TypeRouter.ts +++ b/src/json-type/system/TypeRouter.ts @@ -44,7 +44,23 @@ export class TypeRouter { const router = new TypeRouter({system: this.system, routes: routes(this)}); return this.merge(router); } + + public fn>( + name: K, + type: R, + ): TypeRouter { + this.routes[name] = type; + return this; + } + + public fn$>( + name: K, + type: R, + ): TypeRouter { + this.routes[name] = type; + return this; + } } -export type RoutesBase = Record>; +export type RoutesBase = Record | classes.FunctionStreamingType>; type TypeRouterRoutes> = R extends TypeRouter ? R2 : never; diff --git a/src/reactive-rpc/__demos__/server.ts b/src/reactive-rpc/__demos__/server.ts index c7c1b347cd..76692195a0 100644 --- a/src/reactive-rpc/__demos__/server.ts +++ b/src/reactive-rpc/__demos__/server.ts @@ -14,6 +14,7 @@ const app = new RpcApp({ caller, codecs, maxRequestBodySize: 1024 * 1024, + augmentContext: (ctx) => ctx, }); app.enableCors(); diff --git a/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts b/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts index 88ae2e70da..69d5e3ab17 100644 --- a/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts +++ b/src/reactive-rpc/common/rpc/caller/TypeRouterCaller.ts @@ -5,6 +5,8 @@ import {StaticRpcMethod, type StaticRpcMethodOptions} from '../methods/StaticRpc import {StreamingRpcMethod, type StreamingRpcMethodOptions} from '../methods/StreamingRpcMethod'; import type {Schema, SchemaOf, TypeOf, TypeSystem} from '../../../../json-type'; import type {TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {Value} from '../../messages/Value'; +import type {Observable} from 'rxjs'; export interface TypedApiCallerOptions, Ctx = unknown> extends Omit, 'getMethod'> { @@ -68,9 +70,25 @@ export class TypeRouterCaller, Ctx = unknown> ext id: K, request: MethodReq[K]>, ctx: Ctx, - ): Promise[K]>> { + ): Promise[K]>>> { return super.call(id as string, request, ctx) as any; } + + public async callSimple>( + id: K, + request: MethodReq[K]>, + ctx: Ctx = {} as any, + ): Promise[K]>> { + return (await super.call(id as string, request, ctx)).data as any; + } + + public call$>( + id: K, + request: Observable[K]>>, + ctx: Ctx, + ): Observable[K]>>> { + return super.call$(id as string, request, ctx) as any; + } } type Routes = Router extends TypeRouter ? R : never; diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index 456204edc9..c1711862f8 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -39,21 +39,21 @@ export class RpcError extends Error implements IRpcError { return RpcError.internal(); } - public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined) { + public static fromCode(errno: RpcErrorCodes, message: string = '', meta: unknown = undefined): RpcError { const code = RpcErrorCodes[errno]; return new RpcError(message || code, code, errno, undefined, meta || undefined); } - public static internal(message: string = 'Internal Server Error') { + public static internal(message: string = 'Internal Server Error'): RpcError { return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message); } /** @todo Rename to "badRequest". */ - public static invalidRequest() { + public static invalidRequest(): RpcError { return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, 'Bad Request'); } - public static validation(message: string, meta?: unknown) { + public static validation(message: string, meta?: unknown): RpcError { return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message, meta); } diff --git a/src/reactive-rpc/server/uws/RpcApp.ts b/src/reactive-rpc/server/uws/RpcApp.ts index 4fcf4be467..ba068a172f 100644 --- a/src/reactive-rpc/server/uws/RpcApp.ts +++ b/src/reactive-rpc/server/uws/RpcApp.ts @@ -1,6 +1,5 @@ import {enableCors} from './util'; import {Match, Router} from '../../../util/router'; -import {listToUint8} from '../../../util/buffers/concat'; import {IncomingBatchMessage, RpcMessageBatchProcessor} from '../../common/rpc/RpcMessageBatchProcessor'; import {RpcError, RpcErrorCodes, RpcErrorType} from '../../common/rpc/caller/error'; import {ConnectionContext} from '../context'; @@ -11,15 +10,7 @@ import {RpcMessageFormat} from '../../common/codec/constants'; import {RpcCodecs} from '../../common/codec/RpcCodecs'; import {type ReactiveRpcMessage, RpcMessageStreamProcessor, ReactiveRpcClientMessage} from '../../common'; import type {Codecs} from '../../../json-pack/codecs/Codecs'; -import type { - TemplatedApp, - HttpRequest, - HttpResponse, - HttpMethodPermissive, - JsonRouteHandler, - WebSocket, - RpcWebSocket, -} from './types'; +import type * as types from './types'; import type {RouteHandler} from './types'; import type {RpcCaller} from '../../common/rpc/caller/RpcCaller'; import type {JsonValueCodec} from '../../../json-pack/codecs/types'; @@ -31,15 +22,16 @@ const ERR_NOT_FOUND = RpcError.fromCode(RpcErrorCodes.NOT_FOUND, 'Not Found'); const ERR_INTERNAL = RpcError.internal(); export interface RpcAppOptions { - uws: TemplatedApp; + uws: types.TemplatedApp; maxRequestBodySize: number; codecs: Codecs; - caller: RpcCaller; + caller: RpcCaller; + augmentContext: (ctx: ConnectionContext) => void; } export class RpcApp { public readonly codecs: RpcCodecs; - protected readonly app: TemplatedApp; + protected readonly app: types.TemplatedApp; protected readonly maxRequestBodySize: number; protected readonly router = new Router(); protected readonly batchProcessor: RpcMessageBatchProcessor; @@ -55,12 +47,12 @@ export class RpcApp { enableCors(this.options.uws); } - public routeRaw(method: HttpMethodPermissive, path: string, handler: RouteHandler): void { - method = method.toLowerCase() as HttpMethodPermissive; + public routeRaw(method: types.HttpMethodPermissive, path: string, handler: RouteHandler): void { + method = method.toLowerCase() as types.HttpMethodPermissive; this.router.add(method + path, handler); } - public route(method: HttpMethodPermissive, path: string, handler: JsonRouteHandler): void { + public route(method: types.HttpMethodPermissive, path: string, handler: types.JsonRouteHandler): void { this.routeRaw(method, path, async (ctx: Ctx) => { const result = await handler(ctx); const res = ctx.res!; @@ -112,6 +104,7 @@ export class RpcApp { public enableWsRpc(path: string = '/rpc'): this { const maxBackpressure = 4 * 1024 * 1024; + const augmentContext = this.options.augmentContext; this.app.ws(path, { idleTimeout: 0, maxPayloadLength: 4 * 1024 * 1024, @@ -120,11 +113,12 @@ export class RpcApp { const secWebSocketProtocol = req.getHeader('sec-websocket-protocol'); const secWebSocketExtensions = req.getHeader('sec-websocket-extensions'); const ctx = ConnectionContext.fromReqRes(req, res, null, this); + augmentContext(ctx); /* This immediately calls open handler, you must not use res after this call */ res.upgrade({ctx}, secWebSocketKey, secWebSocketProtocol, secWebSocketExtensions, context); }, - open: (ws_: WebSocket) => { - const ws = ws_ as RpcWebSocket; + open: (ws_: types.WebSocket) => { + const ws = ws_ as types.RpcWebSocket; const ctx = ws.ctx; const resCodec = ctx.resCodec; const msgCodec = ctx.msgCodec; @@ -144,8 +138,8 @@ export class RpcApp { bufferTime: 0, }); }, - message: (ws_: WebSocket, buf: ArrayBuffer, isBinary: boolean) => { - const ws = ws_ as RpcWebSocket; + message: (ws_: types.WebSocket, buf: ArrayBuffer, isBinary: boolean) => { + const ws = ws_ as types.RpcWebSocket; const ctx = ws.ctx; const reqCodec = ctx.reqCodec; const msgCodec = ctx.msgCodec; @@ -158,8 +152,8 @@ export class RpcApp { rpc.sendNotification('.err', RpcError.value(RpcError.invalidRequest())); } }, - close: (ws_: WebSocket, code: number, message: ArrayBuffer) => { - const ws = ws_ as RpcWebSocket; + close: (ws_: types.WebSocket, code: number, message: ArrayBuffer) => { + const ws = ws_ as types.RpcWebSocket; ws.rpc!.stop(); }, }); @@ -170,7 +164,8 @@ export class RpcApp { const matcher = this.router.compile(); const codecs = this.codecs; let responseCodec: JsonValueCodec = codecs.value.json; - this.app.any('/*', async (res: HttpResponse, req: HttpRequest) => { + const augmentContext = this.options.augmentContext; + this.app.any('/*', async (res: types.HttpResponse, req: types.HttpRequest) => { res.onAborted(() => { res.aborted = true; }); @@ -189,6 +184,7 @@ export class RpcApp { const params = match.params; const ctx = ConnectionContext.fromReqRes(req, res, params, this) as Ctx; responseCodec = ctx.resCodec; + augmentContext(ctx); await handler(ctx); } catch (err) { if (err instanceof RpcError) { diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/blocks.spec.ts new file mode 100644 index 0000000000..45124294de --- /dev/null +++ b/src/server/__tests__/blocks.spec.ts @@ -0,0 +1,360 @@ +import {of} from 'rxjs'; +import {Model} from '../../json-crdt'; +import {Value} from '../../reactive-rpc/common/messages/Value'; +import {RpcError, RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; +import {setup} from './setup'; +import {tick, until} from '../../__tests__/util'; + +describe('blocks.*', () => { + describe('blocks.create', () => { + test('can create an empty block', async () => { + const {caller} = setup(); + await caller.call('blocks.create', {id: 'my-block', patches: []}, {}); + const {block} = (await caller.call('blocks.get', {id: 'my-block'}, {})).data; + expect(block).toMatchObject({ + id: 'my-block', + seq: -1, + blob: expect.any(Uint8Array), + created: expect.any(Number), + updated: expect.any(Number), + }); + const model = Model.fromBinary(block.blob); + expect(model.view()).toBe(undefined); + }); + + test('can create a block with value', async () => { + const {call} = setup(); + const model = Model.withLogicalClock(); + model.api.root({ + name: 'Super Woman', + age: 25, + }); + const patch1 = model.api.flush(); + model.api.obj([]).set({ + age: 26, + }); + const patch2 = model.api.flush(); + await call('blocks.create', { + id: '123412341234', + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + ], + }); + const {block} = await call('blocks.get', {id: '123412341234'}); + expect(block).toMatchObject({ + id: '123412341234', + seq: 1, + blob: expect.any(Uint8Array), + created: expect.any(Number), + updated: expect.any(Number), + }); + const model2 = Model.fromBinary(block.blob); + expect(model2.view()).toStrictEqual({ + name: 'Super Woman', + age: 26, + }); + }); + }); + + describe('blocks.remove', () => { + test('can remove an existing block', async () => { + const {call} = setup(); + await call('blocks.create', {id: 'my-block', patches: []}); + const {block} = await call('blocks.get', {id: 'my-block'}); + expect(block.id).toBe('my-block'); + await call('blocks.remove', {id: 'my-block'}); + try { + await call('blocks.get', {id: 'my-block'}); + throw new Error('not this error'); + } catch (err) { + if (!(err instanceof Value)) throw err; + const error = err.data; + if (!(error instanceof RpcError)) throw err; + expect(error.errno).toBe(RpcErrorCodes.NOT_FOUND); + } + }); + }); + + describe('blocks.edit', () => { + test('can edit a document sequentially', async () => { + const {call} = setup(); + const id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'; + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await call('blocks.create', { + id, + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + ], + }); + model.api.str(['text']).ins(4, 'o'); + const patch2 = model.api.flush(); + model.api.str(['text']).ins(5, ' World'); + const patch3 = model.api.flush(); + await call('blocks.edit', { + id, + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: Date.now(), + blob: patch3.toBinary(), + }, + ], + }); + const block2 = await call('blocks.get', {id}); + expect(Model.fromBinary(block2.block.blob).view()).toStrictEqual({ + text: 'Hello World', + }); + model.api.str(['text']).del(5, 1).ins(5, ', '); + const patch4 = model.api.flush(); + model.api.str(['text']).ins(12, '!'); + const patch5 = model.api.flush(); + await call('blocks.edit', { + id, + patches: [ + { + seq: 3, + created: Date.now(), + blob: patch4.toBinary(), + }, + { + seq: 4, + created: Date.now(), + blob: patch5.toBinary(), + }, + ], + }); + const block3 = await call('blocks.get', {id}); + expect(Model.fromBinary(block3.block.blob).view()).toStrictEqual({ + text: 'Hello, World!', + }); + }); + + test('can edit a document concurrently', async () => { + const {call} = setup(); + const id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'; + + // User 1 + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await call('blocks.create', { + id, + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + ], + }); + + // User 2 + const block2 = await call('blocks.get', {id}); + const model2 = Model.fromBinary(block2.block.blob).fork(); + model2.api.str(['text']).ins(4, ' yeah!'); + const patch2User2 = model2.api.flush(); + await call('blocks.edit', { + id, + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2User2.toBinary(), + }, + ], + }); + expect(model2.view()).toStrictEqual({text: 'Hell yeah!'}); + + const block3 = await call('blocks.get', {id}); + const model3 = Model.fromBinary(block3.block.blob).fork(); + expect(model3.view()).toStrictEqual({text: 'Hell yeah!'}); + + // User 1 + model.api.str(['text']).ins(4, 'o'); + const patch2 = model.api.flush(); + model.api.str(['text']).ins(5, ' World'); + const patch3 = model.api.flush(); + const {patches} = await call('blocks.edit', { + id, + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: Date.now(), + blob: patch3.toBinary(), + }, + ], + }); + + const block4 = await call('blocks.get', {id}); + const model4 = Model.fromBinary(block4.block.blob).fork(); + expect(model4.view()).not.toStrictEqual({text: 'Hell yeah!'}); + }); + + test('returns patches that happened concurrently', async () => { + const {call} = setup(); + const id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'; + + // User 1 + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await call('blocks.create', { + id, + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + ], + }); + + // User 2 + const block2 = await call('blocks.get', {id}); + const model2 = Model.fromBinary(block2.block.blob).fork(); + model2.api.str(['text']).ins(4, ' yeah!'); + const patch2User2 = model2.api.flush(); + await call('blocks.edit', { + id, + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2User2.toBinary(), + }, + ], + }); + + // User 1 + model.api.str(['text']).ins(4, 'o'); + const patch2 = model.api.flush(); + model.api.str(['text']).ins(5, ' World'); + const patch3 = model.api.flush(); + const {patches} = await call('blocks.edit', { + id, + patches: [ + { + seq: 1, + created: Date.now(), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: Date.now(), + blob: patch3.toBinary(), + }, + ], + }); + expect(patches.length).toBe(3); + expect(patches[0].seq).toBe(1); + expect(patches[1].seq).toBe(2); + expect(patches[2].seq).toBe(3); + expect(patches[1].blob).toStrictEqual(patch2.toBinary()); + expect(patches[2].blob).toStrictEqual(patch3.toBinary()); + }); + }); + + describe('blocks.listen', () => { + test('can listen for block changes', async () => { + const {call, caller} = setup(); + await call('blocks.create', {id: 'my-block', patches: []}); + await tick(11); + const emits: any[] = []; + caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await tick(12); + expect(emits.length).toBe(0); + await call('blocks.edit', {id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}]}); + await until(() => emits.length === 1); + expect(emits.length).toBe(1); + expect(emits[0].data.patches.length).toBe(1); + expect(emits[0].data.patches[0].seq).toBe(0); + model.api.root({ + text: 'Hello', + }); + const patch2 = model.api.flush(); + await tick(12); + expect(emits.length).toBe(1); + await call('blocks.edit', {id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}]}); + await until(() => emits.length === 2); + expect(emits.length).toBe(2); + expect(emits[1].data.patches.length).toBe(1); + expect(emits[1].data.patches[0].seq).toBe(1); + }); + + test('can subscribe before block is created', async () => { + const {call, caller} = setup(); + const emits: any[] = []; + caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + const model = Model.withLogicalClock(); + model.api.root({ + text: 'Hell', + }); + const patch1 = model.api.flush(); + await tick(12); + expect(emits.length).toBe(0); + await call('blocks.create', { + id: 'my-block', + patches: [ + { + seq: 0, + created: Date.now(), + blob: patch1.toBinary(), + }, + ], + }); + await until(() => emits.length === 1); + expect(emits.length).toBe(1); + expect(emits[0].data.patches.length).toBe(1); + expect(emits[0].data.patches[0].seq).toBe(0); + expect(emits[0].data.patches[0].blob).toStrictEqual(patch1.toBinary()); + }); + + test('can receive deletion events', async () => { + const {call, caller} = setup(); + const emits: any[] = []; + caller.call$('blocks.listen', of({id: 'my-block'}), {}).subscribe((data) => emits.push(data)); + await call('blocks.create', {id: 'my-block', patches: []}); + await until(() => emits.length === 1); + expect(emits[0].data.block.seq).toBe(-1); + await call('blocks.remove', {id: 'my-block'}); + await until(() => emits.length === 2); + expect(emits[1].data.deleted).toBe(true); + }); + }); +}); diff --git a/src/server/__tests__/presence.spec.ts b/src/server/__tests__/presence.spec.ts new file mode 100644 index 0000000000..04b9cbeb11 --- /dev/null +++ b/src/server/__tests__/presence.spec.ts @@ -0,0 +1,131 @@ +import {of} from 'rxjs'; +import {setup} from './setup'; +import {tick, until} from '../../__tests__/util'; + +describe('presence', () => { + test('can subscribe and receive published presence entries', async () => { + const {caller} = setup(); + const emits: any[] = []; + caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { + emits.push(res.data); + }); + await caller.call( + 'presence.update', + { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', + }, + }, + {}, + ); + await until(() => emits.length === 1); + expect(emits[0]).toMatchObject({ + time: expect.any(Number), + entries: [ + { + id: 'user-1', + lastSeen: expect.any(Number), + validUntil: expect.any(Number), + data: { + hello: 'world', + }, + }, + ], + }); + }); + + test('can receive an existing record when subscribing after it was created', async () => { + const {caller} = setup(); + const emits: any[] = []; + caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { + emits.push(res.data); + }); + await caller.call( + 'presence.update', + { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', + }, + }, + {}, + ); + await until(() => emits.length === 1); + const emits2: any[] = []; + caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { + emits2.push(res.data); + }); + await until(() => emits2.length === 1); + expect(emits2[0]).toMatchObject({ + time: expect.any(Number), + entries: [ + { + id: 'user-1', + lastSeen: expect.any(Number), + validUntil: expect.any(Number), + data: { + hello: 'world', + }, + }, + ], + }); + }); + + test('can remove existing entries', async () => { + const {caller} = setup(); + const emits: any[] = []; + caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { + emits.push(res.data); + }); + await caller.call( + 'presence.update', + { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', + }, + }, + {}, + ); + await until(() => emits.length === 1); + await caller.call('presence.remove', {room: 'my-room', id: 'user-1'}, {}); + await until(() => emits.length === 2); + const emits2: any[] = []; + caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { + emits2.push(res.data); + }); + await tick(50); + expect(emits2.length).toBe(0); + }); + + test('emits entry deletion messages', async () => { + const {caller} = setup(); + await caller.call( + 'presence.update', + { + room: 'my-room', + id: 'user-1', + data: { + hello: 'world', + }, + }, + {}, + ); + const emits: any[] = []; + caller.call$('presence.listen', of({room: 'my-room'}), {}).subscribe((res) => { + emits.push(res.data); + }); + await caller.call('presence.remove', {room: 'my-room', id: 'user-1'}, {}); + await until(() => emits.length === 2); + expect(emits[1].entries[0]).toMatchObject({ + id: 'user-1', + lastSeen: expect.any(Number), + validUntil: 0, + data: expect.any(Object), + }); + }); +}); diff --git a/src/server/__tests__/pubsub.spec.ts b/src/server/__tests__/pubsub.spec.ts new file mode 100644 index 0000000000..5d3514b7cd --- /dev/null +++ b/src/server/__tests__/pubsub.spec.ts @@ -0,0 +1,131 @@ +import {of} from 'rxjs'; +import {setup} from './setup'; +import {tick, until} from '../../__tests__/util'; + +describe('pubsub', () => { + test('can subscribe and receive published messages', async () => { + const {caller} = setup(); + const emits: any[] = []; + caller.call$('pubsub.listen', of({channel: 'my-channel'}), {}).subscribe((res) => { + emits.push(res.data.message); + }); + await caller.call( + 'pubsub.publish', + { + channel: 'my-channel', + message: 'hello world', + }, + {}, + ); + await until(() => emits.length === 1); + expect(emits).toStrictEqual(['hello world']); + }); + + test('does not receive messages after un-subscription', async () => { + const {caller} = setup(); + const emits: any[] = []; + const sub = caller.call$('pubsub.listen', of({channel: 'my-channel'}), {}).subscribe((res) => { + emits.push(res.data.message); + }); + await caller.call( + 'pubsub.publish', + { + channel: 'my-channel', + message: 'msg1', + }, + {}, + ); + await caller.call( + 'pubsub.publish', + { + channel: 'my-channel', + message: 'msg2', + }, + {}, + ); + await until(() => emits.length === 2); + sub.unsubscribe(); + await caller.call( + 'pubsub.publish', + { + channel: 'my-channel', + message: 'msg3', + }, + {}, + ); + await tick(50); + expect(emits).toStrictEqual(['msg1', 'msg2']); + }); + + test('multiple multiple subscribers can subscribe to multiple channels', async () => { + const {caller} = setup(); + const user1: any[] = []; + const user2: any[] = []; + const user3: any[] = []; + caller.call$('pubsub.listen', of({channel: 'channel-1'}), {}).subscribe((res) => { + user1.push(res.data.message); + }); + const sub2 = caller.call$('pubsub.listen', of({channel: 'channel-2'}), {}).subscribe((res) => { + user2.push(res.data.message); + }); + caller.call$('pubsub.listen', of({channel: 'channel-1'}), {}).subscribe((res) => { + user3.push(res.data.message); + }); + caller.call$('pubsub.listen', of({channel: 'channel-2'}), {}).subscribe((res) => { + user3.push(res.data.message); + }); + await caller.call( + 'pubsub.publish', + { + channel: 'my-channel', + message: 'hello world', + }, + {}, + ); + await tick(50); + expect(user1).toStrictEqual([]); + expect(user2).toStrictEqual([]); + expect(user3).toStrictEqual([]); + caller + .call( + 'pubsub.publish', + { + channel: 'channel-1', + message: 'msg1', + }, + {}, + ) + .catch(() => {}); + caller + .call( + 'pubsub.publish', + { + channel: 'channel-2', + message: 'msg2', + }, + {}, + ) + .catch(() => {}); + await until(() => user1.length === 1); + await until(() => user2.length === 1); + await until(() => user3.length === 2); + expect(user1).toStrictEqual(['msg1']); + expect(user2).toStrictEqual(['msg2']); + expect(user3).toStrictEqual(['msg1', 'msg2']); + sub2.unsubscribe(); + caller + .call( + 'pubsub.publish', + { + channel: 'channel-2', + message: 'msg3', + }, + {}, + ) + .catch(() => {}); + await until(() => user3.length === 3); + expect(user1).toStrictEqual(['msg1']); + expect(user2).toStrictEqual(['msg2']); + expect(user3).toStrictEqual(['msg1', 'msg2', 'msg3']); + }); +}); diff --git a/src/server/__tests__/setup.ts b/src/server/__tests__/setup.ts new file mode 100644 index 0000000000..41f9bd0ba1 --- /dev/null +++ b/src/server/__tests__/setup.ts @@ -0,0 +1,9 @@ +import {createCaller} from '../routes'; +import {Services} from '../services/Services'; + +export const setup = () => { + const services = new Services(); + const caller = createCaller(services); + const call = caller.callSimple.bind(caller); + return {services, caller, call}; +}; diff --git a/src/server/__tests__/util.spec.ts b/src/server/__tests__/util.spec.ts new file mode 100644 index 0000000000..69c57dc977 --- /dev/null +++ b/src/server/__tests__/util.spec.ts @@ -0,0 +1,50 @@ +import {setup} from './setup'; + +describe('util.*', () => { + describe('util.ping', () => { + test('returns pong', async () => { + const {caller} = setup(); + const res = await caller.call('util.ping', {}, {}); + expect(res.data).toBe('pong'); + }); + }); + + describe('util.echo', () => { + test('returns strings', async () => { + const {caller} = setup(); + const res = await caller.call('util.echo', 'hello world', {}); + expect(res.data).toBe('hello world'); + }); + + test('returns objects', async () => { + const {caller} = setup(); + const res = await caller.call('util.echo', {foo: 'bar'}, {}); + expect(res.data).toStrictEqual({foo: 'bar'}); + }); + }); + + describe('util.info', () => { + test('returns stats object', async () => { + const {call} = setup(); + const res = await call('util.info', {}, {}); + expect(res).toMatchObject({ + now: expect.any(Number), + stats: { + pubsub: { + channels: expect.any(Number), + observers: expect.any(Number), + }, + presence: { + rooms: expect.any(Number), + entries: expect.any(Number), + observers: expect.any(Number), + }, + blocks: { + blocks: expect.any(Number), + patches: expect.any(Number), + }, + }, + }); + }); + }); +}); diff --git a/src/server/index.ts b/src/server/index.ts new file mode 100644 index 0000000000..d70f661ae5 --- /dev/null +++ b/src/server/index.ts @@ -0,0 +1,44 @@ +// Run: npx ts-node src/server/index.ts + +import {App} from 'uWebSockets.js'; +import {RpcApp} from '../reactive-rpc/server/uws/RpcApp'; +import {Codecs} from '../json-pack/codecs/Codecs'; +import {Writer} from '../util/buffers/Writer'; +import {createCaller} from './routes'; +import {Services} from './services/Services'; +import type {RpcCaller} from '../reactive-rpc/common/rpc/caller/RpcCaller'; +import type {MyCtx} from './services/types'; + +const uws = App({}); +const codecs = new Codecs(new Writer()); +const services = new Services(); +const caller = createCaller(services); +const app = new RpcApp({ + uws, + caller: caller as RpcCaller, + codecs, + maxRequestBodySize: 1024 * 1024, + augmentContext: (ctx) => ctx, +}); + +app.enableCors(); +app.enableHttpPing(); +app.route('POST', '/echo', async (ctx) => { + const json = await ctx.requestBodyJson(1024); + return json; +}); +app.enableHttpRpc(); +app.enableWsRpc(); +app.startRouting(); + +const port = +(process.env.PORT || 9999); + +uws.listen(port, (token) => { + if (token) { + // tslint:disable-next-line no-console + console.log({msg: 'SERVER_STARTED', url: `http://localhost:${port}`}); + } else { + // tslint:disable-next-line no-console + console.error(`Failed to listen on ${port} port.`); + } +}); diff --git a/src/server/routes/blocks/index.ts b/src/server/routes/blocks/index.ts new file mode 100644 index 0000000000..fb39b20751 --- /dev/null +++ b/src/server/routes/blocks/index.ts @@ -0,0 +1,26 @@ +import {create} from './methods/create'; +import {get} from './methods/get'; +import {remove} from './methods/remove'; +import {edit} from './methods/edit'; +import {listen} from './methods/listen'; +import {Block, BlockId, BlockPatch, BlockSeq} from './schema'; +import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../types'; + +export const blocks = + (d: RouteDeps) => + (r: TypeRouter) => { + r.system.alias('BlockId', BlockId); + r.system.alias('BlockSeq', BlockSeq); + r.system.alias('Block', Block); + r.system.alias('BlockPatch', BlockPatch); + + // prettier-ignore + return ( + ( create(d) + ( get(d) + ( remove(d) + ( edit(d) + ( listen(d) + ( r ))))))); + }; diff --git a/src/server/routes/blocks/methods/create.ts b/src/server/routes/blocks/methods/create.ts new file mode 100644 index 0000000000..3b82a46d48 --- /dev/null +++ b/src/server/routes/blocks/methods/create.ts @@ -0,0 +1,36 @@ +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; +import type {BlockId, BlockPatch} from '../schema'; + +export const create = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'New block ID', + description: 'The ID of the new block.', + }), + t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Patches', + description: 'The patches to apply to the document.', + }), + ); + + const Response = t.obj; + + const Func = t + .Function(Request, Response) + .options({ + title: 'Create Block', + intro: 'Creates a new block or applies patches to it.', + description: 'Creates a new block or applies patches to it.', + }) + .implement(async ({id, patches}) => { + const {block} = await services.blocks.create(id, patches); + return {}; + }); + + return router.fn('blocks.create', Func); + }; diff --git a/src/server/routes/blocks/methods/edit.ts b/src/server/routes/blocks/methods/edit.ts new file mode 100644 index 0000000000..f0505d937f --- /dev/null +++ b/src/server/routes/blocks/methods/edit.ts @@ -0,0 +1,52 @@ +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; +import type {BlockId, BlockPatch} from '../schema'; + +export const edit = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + const PatchType = t.Ref('BlockPatch'); + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Document ID', + description: 'The ID of the document to apply the patch to.', + }), + // This can be inferred from the "seq" of the first patch: + // t.prop('seq', t.Ref('BlockSeq')).options({ + // title: 'Last known sequence number', + // description: + // 'The last known sequence number of the document. ' + + // 'If the document has changed since this sequence number, ' + + // 'the response will contain all the necessary patches for the client to catch up.', + // }), + t.prop('patches', t.Array(PatchType)).options({ + title: 'Patches', + description: 'The patches to apply to the document.', + }), + ); + + const Response = t.Object( + t.prop('patches', t.Array(PatchType)).options({ + title: 'Latest patches', + description: 'The list of patches that the client might have missed and should apply to the document.', + }), + ); + + const Func = t + .Function(Request, Response) + .options({ + title: 'Edit Block', + intro: 'Applies patches to an existing block.', + description: 'Applies patches to an existing document and returns the latest concurrent changes.', + }) + .implement(async ({id, patches}) => { + const res = await services.blocks.edit(id, patches); + return { + patches: res.patches, + }; + }); + + return router.fn('blocks.edit', Func); + }; diff --git a/src/server/routes/blocks/methods/get.ts b/src/server/routes/blocks/methods/get.ts new file mode 100644 index 0000000000..300db9bece --- /dev/null +++ b/src/server/routes/blocks/methods/get.ts @@ -0,0 +1,34 @@ +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; +import type {Block, BlockId} from '../schema'; + +export const get = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Block ID', + description: 'The ID of the block to retrieve.', + }), + ); + + const Response = t.Object(t.prop('block', t.Ref('Block').options({}))); + + const Func = t + .Function(Request, Response) + .options({ + title: 'Read Block', + intro: 'Retrieves a block by ID.', + description: 'Fetches a block by ID.', + }) + .implement(async ({id}) => { + const {block} = await services.blocks.get(id); + return { + block: block, + }; + }); + + return router.fn('blocks.get', Func); + }; diff --git a/src/server/routes/blocks/methods/listen.ts b/src/server/routes/blocks/methods/listen.ts new file mode 100644 index 0000000000..1801953d04 --- /dev/null +++ b/src/server/routes/blocks/methods/listen.ts @@ -0,0 +1,45 @@ +import {switchMap} from 'rxjs'; +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; +import type {BlockId, BlockPatch, Block} from '../schema'; + +export const listen = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + const PatchType = t.Ref('BlockPatch'); + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Block ID', + description: 'The ID of the block to subscribe to.', + }), + ); + + const Response = t.Object( + t.propOpt('deleted', t.Boolean()).options({ + title: 'Deleted', + description: 'Emitted only when the block is deleted.', + }), + t.propOpt('block', t.Ref('Block')).options({ + title: 'Block', + description: 'The whole block object, emitted only when the block is created.', + }), + t.propOpt('patches', t.Array(PatchType)).options({ + title: 'Latest patches', + description: 'Patches that have been applied to the block.', + }), + ); + + const Func = t + .Function$(Request, Response) + .options({ + title: 'Listen for block changes', + description: 'Subscribe to a block to receive updates when it changes.', + }) + .implement((req$) => { + return req$.pipe(switchMap(({id}) => services.pubsub.listen$(`__block:${id}`))) as any; + }); + + return router.fn$('blocks.listen', Func); + }; diff --git a/src/server/routes/blocks/methods/remove.ts b/src/server/routes/blocks/methods/remove.ts new file mode 100644 index 0000000000..1113444f1f --- /dev/null +++ b/src/server/routes/blocks/methods/remove.ts @@ -0,0 +1,32 @@ +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; +import type {BlockId} from '../schema'; + +export const remove = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Block ID', + description: 'The ID of the block to delete.', + }), + ); + + const Response = t.obj; + + const Func = t + .Function(Request, Response) + .options({ + title: 'Read Block', + intro: 'Retrieves a block by ID.', + description: 'Fetches a block by ID.', + }) + .implement(async ({id}) => { + await services.blocks.remove(id); + return {}; + }); + + return router.fn('blocks.remove', Func); + }; diff --git a/src/server/routes/blocks/schema.ts b/src/server/routes/blocks/schema.ts new file mode 100644 index 0000000000..20f57df5a1 --- /dev/null +++ b/src/server/routes/blocks/schema.ts @@ -0,0 +1,27 @@ +import {type ResolveType, t} from '../../../json-type'; + +export type TBlockId = ResolveType; +export const BlockId = t.str.options({ + title: 'Block ID', + min: 6, + max: 40, +}); + +export type TBlockSeq = ResolveType; +export const BlockSeq = t.num.options({ + title: 'Block Sequence Number', + gte: 0, + format: 'u32', +}); + +export type TBlock = ResolveType; +export const Block = t.Object( + t.prop('id', t.Ref('BlockId')), + t.prop('seq', t.Ref('BlockSeq')), + t.prop('created', t.num), + t.prop('updated', t.num), + t.prop('blob', t.bin), +); + +export type TBlockPatch = ResolveType; +export const BlockPatch = t.Object(t.prop('seq', t.num), t.prop('created', t.num), t.prop('blob', t.bin)); diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts new file mode 100644 index 0000000000..18e387a6bf --- /dev/null +++ b/src/server/routes/index.ts @@ -0,0 +1,19 @@ +import {TypeSystem} from '../../json-type'; +import {TypeRouter} from '../../json-type/system/TypeRouter'; +import {TypeRouterCaller} from '../../reactive-rpc/common/rpc/caller/TypeRouterCaller'; +import {routes} from './routes'; +import type {Services} from '../services/Services'; +import type {RouteDeps} from './types'; + +export const createRouter = (services: Services) => { + const system = new TypeSystem(); + const r = new TypeRouter({system, routes: {}}); + const deps: RouteDeps = {services}; + return routes(deps)(r); +}; + +export const createCaller = (services: Services) => { + const router = createRouter(services); + const caller = new TypeRouterCaller({router}); + return caller; +}; diff --git a/src/server/routes/presence/index.ts b/src/server/routes/presence/index.ts new file mode 100644 index 0000000000..6227153dc9 --- /dev/null +++ b/src/server/routes/presence/index.ts @@ -0,0 +1,19 @@ +import {update} from './methods/update'; +import {listen} from './methods/listen'; +import {remove} from './methods/remove'; +import {PresenceEntry} from './schema'; +import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../types'; + +export const presence = + (d: RouteDeps) => + (r: TypeRouter) => { + r.system.alias('PresenceEntry', PresenceEntry); + + // prettier-ignore + return ( + ( update(d) + ( remove(d) + ( listen(d) + ( r ))))); + }; diff --git a/src/server/routes/presence/methods/listen.ts b/src/server/routes/presence/methods/listen.ts new file mode 100644 index 0000000000..5baae8f85f --- /dev/null +++ b/src/server/routes/presence/methods/listen.ts @@ -0,0 +1,47 @@ +import {map, switchMap} from 'rxjs'; +import type {PresenceEntry, TPresenceEntry} from '../schema'; +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; + +export const listen = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('room', t.str).options({ + title: 'Room ID', + description: 'The ID of the room to subscribe to.', + }), + ); + + const Response = t.Object( + t.prop('entries', t.Array(t.Ref('PresenceEntry'))), + t.prop('time', t.num).options({ + title: 'Current time', + description: 'The current server time in milliseconds since the UNIX epoch.', + }), + ); + + const Func = t + .Function$(Request, Response) + .options({ + title: 'Subscribe to a room.', + intro: 'Subscribes to presence updates in a room.', + description: + 'This method subscribes to presence updates in a room. ' + + 'It returns an array of all current presence entries in the room, and then emits an update whenever ' + + 'a presence entry is updated or deleted. ', + }) + .implement((req$) => { + return req$.pipe( + switchMap((req) => services.presence.listen$(req.room)), + map((entries: TPresenceEntry[]) => ({ + entries, + time: Date.now(), + })), + ); + }); + + return router.fn$('presence.listen', Func); + }; diff --git a/src/server/routes/presence/methods/remove.ts b/src/server/routes/presence/methods/remove.ts new file mode 100644 index 0000000000..398570a271 --- /dev/null +++ b/src/server/routes/presence/methods/remove.ts @@ -0,0 +1,35 @@ +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; + +export const remove = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t.Object( + t.prop('room', t.str).options({ + title: 'Room ID', + description: 'The ID of the room from which to remove the entry.', + }), + t.prop('id', t.str).options({ + title: 'ID of the entry', + description: 'The ID of the entry to remove.', + }), + ); + + const Response = t.obj; + + const Func = t + .Function(Request, Response) + .options({ + title: 'Remove a presence entry.', + intro: 'Removes a presence entry from a room and notifies all listeners.', + description: 'This method removes a presence entry from a room and notifies all listeners. ', + }) + .implement(async ({room, id}) => { + await services.presence.remove(room, id); + return {}; + }); + + return router.fn('presence.remove', Func); + }; diff --git a/src/server/routes/presence/methods/update.ts b/src/server/routes/presence/methods/update.ts new file mode 100644 index 0000000000..5b300fc8a1 --- /dev/null +++ b/src/server/routes/presence/methods/update.ts @@ -0,0 +1,80 @@ +import type {ResolveType} from '../../../../json-type'; +import type {PresenceEntry} from '../schema'; +import type {RoutesBase, TypeRouter} from '../../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../../types'; + +/** Entry TLL in seconds. */ +const ttl = 30; + +export const update = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const Request = t + .Object( + t.prop('room', t.str).options({ + title: 'Room ID', + description: 'The ID of the room to update.', + }), + t.prop('id', t.str).options({ + title: 'ID of the entry', + description: 'The ID of the entry to update.', + }), + t.prop('data', t.any).options({ + title: 'Entry data', + description: 'A map of key-value pairs to update. The object is merged with the existing entry data, if any.', + }), + ) + .options({ + examples: [ + { + title: 'Update user entry', + description: + 'The data section of the entry is merged with the existing data. ' + + 'It can contain any key-value pairs. For example, the `cursor` property is used to store the ' + + 'current cursor position of the user in the room.', + value: { + room: 'my-room', + id: 'user-1', + data: { + name: 'John Doe', + cursor: [123, 456], + }, + }, + }, + ], + }); + + const Response = t + .Object( + t.prop('entry', t.Ref('PresenceEntry')), + t.prop('time', t.num).options({ + title: 'Current time', + description: 'The current server time in milliseconds since the UNIX epoch.', + }), + ) + .options({ + title: 'Presence update response', + }); + + const Func = t + .Function(Request, Response) + .options({ + title: 'Update presence entry', + intro: 'Update a presence entry in a room.', + description: + 'This method updates a presence entry in a room. ' + + `The entry is automatically removed after ${ttl} seconds. ` + + `Every time the entry is updated, the TTL is reset to ${ttl} seconds.`, + }) + .implement(async ({room, id, data}) => { + const entry = (await services.presence.update(room, id, ttl * 1000, data)) as ResolveType; + return { + entry, + time: Date.now(), + }; + }); + + return router.fn('presence.update', Func); + }; diff --git a/src/server/routes/presence/schema.ts b/src/server/routes/presence/schema.ts new file mode 100644 index 0000000000..729733eb94 --- /dev/null +++ b/src/server/routes/presence/schema.ts @@ -0,0 +1,15 @@ +import {type ResolveType, t} from '../../../json-type'; + +export const PresenceEntry = t.Object( + t.prop('id', t.str), + t.prop('lastSeen', t.num), + t.prop('validUntil', t.num), + t.prop( + 'data', + t.obj.options({ + encodeUnknownFields: true, + }), + ), +); + +export type TPresenceEntry = ResolveType; diff --git a/src/server/routes/pubsub/index.ts b/src/server/routes/pubsub/index.ts new file mode 100644 index 0000000000..aacf55c136 --- /dev/null +++ b/src/server/routes/pubsub/index.ts @@ -0,0 +1,10 @@ +import {publish} from './publish'; +import {listen} from './listen'; +import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../types'; + +// prettier-ignore +export const pubsub = (d: RouteDeps) => (r: TypeRouter) => + ( publish(d) + ( listen(d) + ( r ))); diff --git a/src/server/routes/pubsub/listen.ts b/src/server/routes/pubsub/listen.ts new file mode 100644 index 0000000000..ec8e05cd02 --- /dev/null +++ b/src/server/routes/pubsub/listen.ts @@ -0,0 +1,33 @@ +import {map, switchMap} from 'rxjs'; +import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {RouteDeps} from '../types'; + +export const listen = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const req = t.Object( + t.prop('channel', t.str).options({ + title: 'Channel name', + description: 'The name of the channel to subscribe to.', + }), + ); + + const res = t.Object( + t.prop('message', t.any).options({ + title: 'Subscription message', + description: 'A message received from the channel. Emitted every time a message is published to the channel.', + }), + ); + + const func = t.Function$(req, res).implement((req) => { + const response = req.pipe( + switchMap((req) => services.pubsub.listen$(req.channel)), + map((message: any) => ({message})), + ); + return response; + }); + + return router.fn$('pubsub.listen', func); + }; diff --git a/src/server/routes/pubsub/publish.ts b/src/server/routes/pubsub/publish.ts new file mode 100644 index 0000000000..efe6c5c348 --- /dev/null +++ b/src/server/routes/pubsub/publish.ts @@ -0,0 +1,42 @@ +import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {MyCtx} from '../../services/types'; +import type {RouteDeps} from '../types'; + +export const publish = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + + const req = t.Object( + t.prop('channel', t.str).options({ + title: 'Channel name', + description: 'The name of the channel to publish to.', + }), + t.prop('message', t.any).options({ + title: 'Message', + description: 'The message to publish to the channel. Can be any JSON/CBOR value.', + }), + ); + + const res = t.obj.options({ + title: 'Publish response', + description: 'An empty object.', + }); + + const func = t + .Function(req, res) + .options({ + title: 'Publish to channel', + intro: 'Publish a message to a channel.', + description: + 'This method publishes a message to a global channel with the given `channel` name. ' + + 'All subscribers to the channel will receive the message. The `message` can be any value. ' + + 'The most efficient way to publish a message is to send a primitive or a `Uint8Array` buffer.', + }) + .implement(async ({channel, message}) => { + services.pubsub.publish(channel, message); + return {}; + }); + + return router.fn('pubsub.publish', func); + }; diff --git a/src/server/routes/routes.ts b/src/server/routes/routes.ts new file mode 100644 index 0000000000..5a0dfd216f --- /dev/null +++ b/src/server/routes/routes.ts @@ -0,0 +1,14 @@ +import {util} from './util'; +import {pubsub} from './pubsub'; +import {presence} from './presence'; +import {blocks} from './blocks'; +import type {RoutesBase, TypeRouter} from '../../json-type/system/TypeRouter'; +import type {RouteDeps} from './types'; + +// prettier-ignore +export const routes = (d: RouteDeps) => (r: TypeRouter) => + ( util(d) + ( pubsub(d) + ( presence(d) + ( blocks(d) + ( r ))))); diff --git a/src/server/routes/types.ts b/src/server/routes/types.ts new file mode 100644 index 0000000000..03f0e10d1b --- /dev/null +++ b/src/server/routes/types.ts @@ -0,0 +1,5 @@ +import type {Services} from '../services/Services'; + +export interface RouteDeps { + services: Services; +} diff --git a/src/server/routes/util/index.ts b/src/server/routes/util/index.ts new file mode 100644 index 0000000000..47d5238666 --- /dev/null +++ b/src/server/routes/util/index.ts @@ -0,0 +1,61 @@ +import type {RoutesBase, TypeRouter} from '../../../json-type/system/TypeRouter'; +import type {MyCtx} from '../../services/types'; +import type {RouteDeps} from '../types'; + +export const ping = + (deps: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + const req = t.any; + const res = t.Const('pong'); + const func = t.Function(req, res).implement(async () => { + return 'pong'; + }); + return router.fn('util.ping', func); + }; + +export const echo = + (deps: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + const req = t.any; + const res = t.any; + const func = t.Function(req, res).implement(async (msg) => msg); + return router.fn('util.echo', func); + }; + +export const info = + ({services}: RouteDeps) => + (router: TypeRouter) => { + const t = router.t; + const Request = t.any; + const Response = t.Object( + t.prop('now', t.num), + t.prop( + 'stats', + t.Object( + t.prop('pubsub', t.Object(t.prop('channels', t.num), t.prop('observers', t.num))), + t.prop('presence', t.Object(t.prop('rooms', t.num), t.prop('entries', t.num), t.prop('observers', t.num))), + t.prop('blocks', t.Object(t.prop('blocks', t.num), t.prop('patches', t.num))), + ), + ), + ); + const Func = t.Function(Request, Response).implement(async () => { + return { + now: Date.now(), + stats: { + pubsub: services.pubsub.stats(), + presence: services.presence.stats(), + blocks: services.blocks.stats(), + }, + }; + }); + return router.fn('util.info', Func); + }; + +// prettier-ignore +export const util = (deps: RouteDeps) => (r: TypeRouter) => + ( ping(deps) + ( echo(deps) + ( info(deps) + ( r )))); diff --git a/src/server/services/PresenceService.ts b/src/server/services/PresenceService.ts new file mode 100644 index 0000000000..fca1c38a44 --- /dev/null +++ b/src/server/services/PresenceService.ts @@ -0,0 +1,101 @@ +import {Observable, Observer} from 'rxjs'; +import {TPresenceEntry} from '../routes/presence/schema'; + +export type PresenceRoom = Map; + +export class PresenceService { + private readonly rooms = new Map(); + private readonly observers = new Map[]>(); + + public async update(roomId: string, entryId: string, ttl: number, data: unknown): Promise { + const now = Date.now(); + const room = this.getRoom(roomId); + if (!data || typeof data !== 'object') throw new Error('ROOM_ENTRY_MUST_BE_OBJECT'); + const entry: TPresenceEntry = room.get(entryId) ?? { + id: entryId, + lastSeen: now, + validUntil: now + ttl, + data: {}, + }; + entry.lastSeen = now; + entry.validUntil = now + ttl; + Object.assign(entry.data, data); + room.set(entryId, entry); + this.cleanUpRoom(roomId); + await new Promise((resolve) => setImmediate(resolve)); + const observers = this.observers.get(roomId); + if (observers) for (const observer of observers) observer.next([entry]); + return entry; + } + + public async remove(roomId: string, entryId: string): Promise { + const room = this.getRoom(roomId); + room.delete(entryId); + if (!room.size) this.rooms.delete(roomId); + await new Promise((resolve) => setImmediate(resolve)); + const observers = this.observers.get(roomId); + if (observers) + for (const observer of observers) + observer.next([ + { + id: entryId, + lastSeen: Date.now(), + validUntil: 0, + data: {}, + }, + ]); + } + + public listen$(roomId: string): Observable { + return new Observable((observer) => { + this.cleanUpRoom(roomId); + if (!this.observers.has(roomId)) this.observers.set(roomId, []); + this.observers.get(roomId)!.push(observer); + const room = this.getRoom(roomId); + const entries: TPresenceEntry[] = []; + for (const entry of room.values()) { + entries.push(entry); + if (entries.length === 100) break; + } + if (entries.length) observer.next(entries); + return () => { + const observers: Observer[] = this.observers.get(roomId)!; + if (!observers) { + this.cleanUpRoom(roomId); + return; + } + const index = observers.findIndex((o) => o === observer); + if (index > -1) observers.splice(index, 1); + if (!observers.length) { + this.observers.delete(roomId); + } + }; + }); + } + + private getRoom(roomId: string): PresenceRoom { + const room = this.rooms.get(roomId); + if (room) return room; + const newRoom = new Map(); + this.rooms.set(roomId, newRoom); + return newRoom; + } + + private cleanUpRoom(roomId: string) { + const room = this.rooms.get(roomId); + if (!room) return; + const now = Date.now(); + for (const [entry, presence] of room.entries()) { + if (presence.validUntil < now) room.delete(entry); + } + if (!room.size) this.rooms.delete(roomId); + } + + public stats(): {rooms: number; entries: number; observers: number} { + return { + rooms: this.rooms.size, + entries: [...this.rooms.values()].reduce((acc, v) => acc + v.size, 0), + observers: [...this.observers.values()].reduce((acc, v) => acc + v.length, 0), + }; + } +} diff --git a/src/server/services/PubSubService.ts b/src/server/services/PubSubService.ts new file mode 100644 index 0000000000..d5a5b9b83b --- /dev/null +++ b/src/server/services/PubSubService.ts @@ -0,0 +1,36 @@ +import {Observable, Observer} from 'rxjs'; + +export class PubsubService { + private readonly observers = new Map[]>(); + + public listen$(channel: string): Observable { + return new Observable((observer) => { + if (!this.observers.has(channel)) this.observers.set(channel, []); + const observers: Observer[] = this.observers.get(channel)!; + observers.push(observer); + return () => { + const observers: Observer[] = this.observers.get(channel)!; + if (!observers) return; + const index = observers.findIndex((o) => o === observer); + if (index > -1) observers.splice(index, 1); + if (!observers.length) { + this.observers.delete(channel); + } + }; + }); + } + + public async publish(channel: string, message: unknown): Promise { + await new Promise((resolve) => setImmediate(resolve)); + const observers = this.observers.get(channel); + if (!observers) return; + for (const observer of observers) observer.next(message); + } + + public stats(): {channels: number; observers: number} { + return { + channels: this.observers.size, + observers: [...this.observers.values()].reduce((acc, v) => acc + v.length, 0), + }; + } +} diff --git a/src/server/services/Services.ts b/src/server/services/Services.ts new file mode 100644 index 0000000000..373700405b --- /dev/null +++ b/src/server/services/Services.ts @@ -0,0 +1,15 @@ +import {PresenceService} from './PresenceService'; +import {PubsubService} from './PubSubService'; +import {BlocksServices} from './blocks/BlocksServices'; + +export class Services { + public readonly pubsub: PubsubService; + public readonly presence: PresenceService; + public readonly blocks: BlocksServices; + + constructor() { + this.pubsub = new PubsubService(); + this.presence = new PresenceService(); + this.blocks = new BlocksServices(this); + } +} diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts new file mode 100644 index 0000000000..405dba62c8 --- /dev/null +++ b/src/server/services/blocks/BlocksServices.ts @@ -0,0 +1,66 @@ +import {MemoryStore} from './MemoryStore'; +import {StorePatch} from './types'; +import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; +import type {Services} from '../Services'; + +export class BlocksServices { + protected readonly store = new MemoryStore(); + + constructor(protected readonly services: Services) {} + + public async create(id: string, patches: StorePatch[]) { + const {store} = this; + const {block} = await store.create(id, patches); + const data = { + block, + patches, + }; + this.services.pubsub.publish(`__block:${id}`, data).catch((error) => { + // tslint:disable-next-line:no-console + console.error('Error publishing block patches', error); + }); + return {block}; + } + + public async get(id: string) { + const {store} = this; + const result = await store.get(id); + if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND); + const {block} = result; + return {block}; + } + + public async remove(id: string) { + await this.store.remove(id); + this.services.pubsub.publish(`__block:${id}`, {deleted: true}).catch((error) => { + // tslint:disable-next-line:no-console + console.error('Error publishing block deletion', error); + }); + } + + public async edit(id: string, patches: any[]) { + if (!Array.isArray(patches)) throw RpcError.validation('patches must be an array'); + if (!patches.length) throw RpcError.validation('patches must not be empty'); + const seq = patches[0].seq; + const {store} = this; + const {block} = await store.edit(id, patches); + this.services.pubsub.publish(`__block:${id}`, {patches}).catch((error) => { + // tslint:disable-next-line:no-console + console.error('Error publishing block patches', error); + }); + const expectedBlockSeq = seq + patches.length - 1; + const hadConcurrentEdits = block.seq !== expectedBlockSeq; + let patchesBack: StorePatch[] = []; + if (hadConcurrentEdits) { + patchesBack = await store.history(id, seq, block.seq); + } + return { + block, + patches: patchesBack, + }; + } + + public stats() { + return this.store.stats(); + } +} diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts new file mode 100644 index 0000000000..1e21496f09 --- /dev/null +++ b/src/server/services/blocks/MemoryStore.ts @@ -0,0 +1,85 @@ +import {Model} from '../../../json-crdt'; +import {Patch} from '../../../json-crdt-patch'; +import type * as types from './types'; + +export class MemoryStore implements types.Store { + protected readonly blocks = new Map(); + protected readonly patches = new Map(); + + public async get(id: string): Promise { + await new Promise((resolve) => setImmediate(resolve)); + const block = this.blocks.get(id); + if (!block) return; + return {block}; + } + + public async create(id: string, patches: types.StorePatch[]): Promise { + await new Promise((resolve) => setImmediate(resolve)); + if (!Array.isArray(patches)) throw new Error('NO_PATCHES'); + if (this.blocks.has(id)) throw new Error('BLOCK_EXISTS'); + const model = Model.withLogicalClock(); + let seq = -1; + const now = Date.now(); + if (patches.length) { + for (const patch of patches) { + seq++; + if (seq !== patch.seq) throw new Error('PATCHES_OUT_OF_ORDER'); + model.applyPatch(Patch.fromBinary(patch.blob)); + if (patch.created > now) patch.created = now; + } + } + const block: types.StoreBlock = { + id, + seq: seq, + blob: model.toBinary(), + created: now, + updated: now, + }; + this.blocks.set(id, block); + this.patches.set(id, patches); + return {block}; + } + + public async edit(id: string, patches: types.StorePatch[]): Promise { + await new Promise((resolve) => setImmediate(resolve)); + if (!Array.isArray(patches) || !patches.length) throw new Error('NO_PATCHES'); + const block = this.blocks.get(id); + const existingPatches = this.patches.get(id); + if (!block || !existingPatches) throw new Error('BLOCK_NOT_FOUND'); + let seq = patches[0].seq; + const diff = seq - block.seq - 1; + if (block.seq + 1 < seq) throw new Error('PATCH_SEQ_TOO_HIGH'); + const model = Model.fromBinary(block.blob); + for (const patch of patches) { + if (seq !== patch.seq) throw new Error('PATCHES_OUT_OF_ORDER'); + model.applyPatch(Patch.fromBinary(patch.blob)); + patch.seq -= diff; + seq++; + } + block.seq += patches.length; + block.blob = model.toBinary(); + block.updated = Date.now(); + for (const patch of patches) existingPatches.push(patch); + return {block}; + } + + public async history(id: string, min: number, max: number): Promise { + await new Promise((resolve) => setImmediate(resolve)); + const patches = this.patches.get(id); + if (!patches) return []; + return patches.slice(min, max + 1); + } + + public async remove(id: string): Promise { + await new Promise((resolve) => setImmediate(resolve)); + this.blocks.delete(id); + this.patches.delete(id); + } + + public stats(): {blocks: number; patches: number} { + return { + blocks: this.blocks.size, + patches: [...this.patches.values()].reduce((acc, v) => acc + v.length, 0), + }; + } +} diff --git a/src/server/services/blocks/types.ts b/src/server/services/blocks/types.ts new file mode 100644 index 0000000000..d6932bed34 --- /dev/null +++ b/src/server/services/blocks/types.ts @@ -0,0 +1,29 @@ +export interface StoreBlock { + id: string; + seq: number; + created: number; + updated: number; + blob: Uint8Array; +} + +export interface StorePatch { + seq: number; + created: number; + blob: Uint8Array; +} + +export interface Store { + create(id: string, patches: StorePatch[]): Promise; + get(id: string): Promise; + edit(id: string, patches: StorePatch[]): Promise; + history(id: string, min: number, max: number): Promise; + remove(id: string): Promise; +} + +export interface StoreGetResult { + block: StoreBlock; +} + +export interface StoreApplyResult { + block: StoreBlock; +} diff --git a/src/server/services/types.ts b/src/server/services/types.ts new file mode 100644 index 0000000000..8da0c546f2 --- /dev/null +++ b/src/server/services/types.ts @@ -0,0 +1,4 @@ +import type {ConnectionContext} from '../../reactive-rpc/server/context'; +import type {Services} from './Services'; + +export type MyCtx = ConnectionContext<{services: Services}>;