diff --git a/src/json-crdt/history/LocalHistoryCrud.ts b/src/json-crdt-repo/LocalHistoryCrud.ts similarity index 80% rename from src/json-crdt/history/LocalHistoryCrud.ts rename to src/json-crdt-repo/LocalHistoryCrud.ts index 7baf866950..a886f3ba21 100644 --- a/src/json-crdt/history/LocalHistoryCrud.ts +++ b/src/json-crdt-repo/LocalHistoryCrud.ts @@ -1,11 +1,11 @@ import {CborEncoder} from '@jsonjoy.com/json-pack/lib/cbor/CborEncoder'; import {CborDecoder} from '@jsonjoy.com/json-pack/lib/cbor/CborDecoder'; -import {LogEncoder} from '../log/codec/LogEncoder'; -import {LogDecoder} from '../log/codec/LogDecoder'; +import {LogEncoder} from '../json-crdt/log/codec/LogEncoder'; +import {LogDecoder} from '../json-crdt/log/codec/LogDecoder'; import type {CrudApi} from 'memfs/lib/crud/types'; import type {Locks} from 'thingies/es2020/Locks'; -import type {Patch} from '../../json-crdt-patch'; -import type {Log} from '../log/Log'; +import type {Patch} from '../json-crdt-patch'; +import type {Log} from '../json-crdt/log/Log'; import type {LocalHistory} from './types'; export const genId = (octets: number = 8): string => { @@ -30,9 +30,8 @@ export class LocalHistoryCrud implements LocalHistory { protected readonly locks: Locks, ) {} - public async create(collection: string[], log: Log): Promise<{id: string}> { + public async create(collection: string[], log: Log, id: string = genId()): Promise<{id: string}> { const blob = this.encode(log); - const id = genId(); await this.lock(collection, id, async () => { await this.crud.put([...collection, id], STATE_FILE_NAME, blob, {throwIf: 'exists'}); }); @@ -54,12 +53,17 @@ export class LocalHistoryCrud implements LocalHistory { const {frontier} = this.decoder.decode(blob, {format: 'seq.cbor', frontier: true}); return { log: frontier!, - cursor: '', + cursor: '1', }; } - public readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> { - throw new Error('Method not implemented.'); + public async readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> { + const blob = await this.crud.get([...collection, id], STATE_FILE_NAME); + const {history} = this.decoder.decode(blob, {format: 'seq.cbor', history: true}); + return { + log: history!, + cursor: '', + }; } public async update(collection: string[], id: string, patches: Patch[]): Promise { diff --git a/src/json-crdt-repo/SessionHistory.ts b/src/json-crdt-repo/SessionHistory.ts new file mode 100644 index 0000000000..6bbc7ef660 --- /dev/null +++ b/src/json-crdt-repo/SessionHistory.ts @@ -0,0 +1,78 @@ +import {createRace} from 'thingies/es2020/createRace'; +import {FanOutUnsubscribe} from 'thingies/es2020/fanout'; +import {InsValOp, Patch} from '../json-crdt-patch'; +import {ValNode} from '../json-crdt/nodes'; +import {toSchema} from '../json-crdt/schema/toSchema'; +import {Log} from '../json-crdt/log/Log'; +import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack'; +import type {LocalHistory} from './types'; + +class Undo implements UndoItem { + constructor(public readonly undo: () => Redo) {} +} + +class Redo implements RedoItem { + constructor(public readonly redo: () => Undo) {} +} + +export class SessionHistory { + constructor( + public readonly collection: string[], + public readonly id: string, + protected readonly local: LocalHistory, + ) {} + + private readonly __onPatchRace = createRace(); + + public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe { + // const onBeforePatch = (patch: Patch) => { + // this.__onPatchRace(() => { + // const undo = this.createUndo(patch); + // stack.push(undo); + // }); + // }; + // const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch); + // return unsubscribe; + throw new Error('Method not implemented.'); + } + + public createUndo(patch: Patch): Undo { + const undoTasks: Array<() => void> = []; + const ops = patch.ops; + const length = ops.length; + for (let i = length - 1; i >= 0; i--) { + const op = ops[i]; + switch (op.name()) { + case 'ins_val': { + // const insOp = op as InsValOp; + // const valNode = this.log.end.index.get(insOp.obj); + // if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE'); + // const copy = toSchema(valNode.node()); + // const valNodeId = valNode.id; + // const task = () => { + // const end = this.log.end; + // const valNode = end.index.get(valNodeId); + // if (!valNode) return; + // end.api.wrap(valNode).asVal().set(copy); + // }; + // undoTasks.push(task); + } + } + } + const undo = new Undo(() => { + this.__onPatchRace(() => { + for (const task of undoTasks) task(); + }); + return new Redo(() => { + const undo = this.__onPatchRace(() => { + // // TODO: This line needs to be changed: + // const redoPatch = Patch.fromBinary(patch.toBinary()); + // this.log.end.api.builder.patch = redoPatch; + // return this.createUndo(redoPatch); + }); + return undo!; + }); + }); + return undo; + } +} diff --git a/src/json-crdt/history/UndoRedoStack.ts b/src/json-crdt-repo/UndoRedoStack.ts similarity index 100% rename from src/json-crdt/history/UndoRedoStack.ts rename to src/json-crdt-repo/UndoRedoStack.ts diff --git a/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts b/src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts similarity index 60% rename from src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts rename to src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts index cf6443657d..234b206db3 100644 --- a/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts +++ b/src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts @@ -2,8 +2,8 @@ import {memfs} from 'memfs'; import {NodeCrud} from 'memfs/lib/node-to-crud'; import {Locks} from 'thingies/es2020/Locks'; import {LocalHistoryCrud} from '../LocalHistoryCrud'; -import {Log} from '../../log/Log'; -import {Model} from '../../model'; +import {Log} from '../../json-crdt/log/Log'; +import {Model} from '../../json-crdt/model'; const setup = async () => { const {fs, vol} = memfs(); @@ -60,3 +60,37 @@ test('can delete a document', async () => { expect((err as Error).message).toBe(`Collection /test/${id} does not exist`); } }); + +test('can update document', async () => { + const {local} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({ + foo: 'spam', + }); + const log = Log.fromNewModel(model); + const {id} = await local.create(['test'], log); + const {log: log2} = await local.read(['test'], id); + log2.end.api.obj([]).set({ + bar: 'eggs', + }); + const patch = log2.end.api.flush(); + await local.update(['test'], id, [patch]); + const {log: log3} = await local.read(['test'], id); + expect(log3.end.view()).toStrictEqual({ + foo: 'spam', + bar: 'eggs', + }); +}); + +test('can delete document', async () => { + const {local} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({ + foo: 'spam', + }); + const log = Log.fromNewModel(model); + const {id} = await local.create(['test'], log); + await local.read(['test'], id); + await local.delete(['test'], id); + expect(() => local.read(['test'], id)).rejects.toThrow(`Collection /test/${id} does not exist`); +}); diff --git a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts new file mode 100644 index 0000000000..f3ed21b557 --- /dev/null +++ b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts @@ -0,0 +1,100 @@ +import type {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common'; +import type {JsonJoyDemoRpcCaller} from '../../server'; +import type {RemoteHistory, RemoteModel, RemotePatch} from './types'; + +type Methods = CallerToMethods; + +export type Cursor = number; + +export interface RemoteServerModel extends RemoteModel { + seq: number; + created: number; + updated: number; +} + +export interface RemoteServerPatch extends RemotePatch { + seq: number; +} + +export class RemoteHistoryDemoServer implements RemoteHistory { + constructor(protected readonly client: TypedRpcClient) {} + + public async create(id: string, patches: RemotePatch[]): Promise { + await this.client.call('block.new', { + id, + patches: patches.map((patch) => ({ + blob: patch.blob, + })), + }); + } + + /** + * Load latest state of the model, and any unmerged "tip" of patches + * it might have. + */ + public async read(id: string): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> { + const {model, patches} = await this.client.call('block.get', {id}); + return { + cursor: model.seq, + model, + patches: [], + }; + } + + public async scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> { + const limit = 100; + const res = await this.client.call('block.scan', { + id, + seq: cursor, + limit: cursor + limit, + }); + if (res.patches.length === 0) { + return { + cursor, + patches: [], + }; + } + return { + cursor: res.patches[res.patches.length - 1].seq, + patches: res.patches, + }; + } + + public async scanBwd( + id: string, + cursor: Cursor, + ): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> { + throw new Error('The "blocks.history" should be able to return starting model.'); + } + + public async update( + id: string, + cursor: Cursor, + patches: RemotePatch[], + ): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> { + const res = await this.client.call('block.upd', { + id, + patches: patches.map((patch, seq) => ({ + seq, + created: Date.now(), + blob: patch.blob, + })), + }); + return { + cursor: res.patches.length ? res.patches[res.patches.length - 1].seq : cursor, + patches: res.patches, + }; + } + + public async delete(id: string): Promise { + await this.client.call('block.del', {id}); + } + + /** + * Subscribe to the latest changes to the model. + * @param callback + */ + public listen(id: string, cursor: Cursor, callback: (changes: RemoteServerPatch[]) => void): void { + throw new Error('Method not implemented.'); + } +} diff --git a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts new file mode 100644 index 0000000000..31b85a7b14 --- /dev/null +++ b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts @@ -0,0 +1,36 @@ +import {Model} from '../../../json-crdt/model'; +import {buildE2eClient} from '../../../reactive-rpc/common/testing/buildE2eClient'; +import {createCaller} from '../../../server/routes/index'; +import {RemoteHistoryDemoServer} from '../RemoteHistoryDemoServer'; + +const setup = () => { + const {caller, router} = createCaller(); + const {client} = buildE2eClient(caller); + const remote = new RemoteHistoryDemoServer(client); + + return { + router, + caller, + client, + remote, + }; +}; + +let cnt = 0; +const genId = () => Math.random().toString(36).slice(2) + '-' + Date.now().toString(36) + '-' + cnt++; + +describe('.create()', () => { + test('can create a block with a simple patch', async () => { + const {remote, caller} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({foo: 'bar'}); + const patch = model.api.flush(); + const blob = patch.toBinary(); + const id = genId(); + await remote.create(id, [{blob}]); + const {data} = await caller.call('block.get', {id}, {}); + // console.log(data.patches); + const model2 = Model.fromBinary(data.model.blob); + expect(model2.view()).toEqual({foo: 'bar'}); + }); +}); diff --git a/src/json-crdt-repo/remote/types.ts b/src/json-crdt-repo/remote/types.ts new file mode 100644 index 0000000000..2f76a1037a --- /dev/null +++ b/src/json-crdt-repo/remote/types.ts @@ -0,0 +1,38 @@ +/** + * A history of patches that have been applied to a model, stored on the + * "remote": (1) server; (2) content addressable storage; or (3) somewhere in a + * peer-to-peer network. + */ +export interface RemoteHistory { + create(id: string, patches: RemotePatch[]): Promise; + + /** + * Load latest state of the model, and any unmerged "tip" of patches + * it might have. + * + * @todo Maybe `state` and `tip` should be serialized to JSON? + */ + read(id: string): Promise<{cursor: Cursor; model: M; patches: P[]}>; + + scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: P[]}>; + + scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; model: M; patches: P[]}>; + + update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor; patches: P[]}>; + + delete?(id: string): Promise; + + /** + * Subscribe to the latest changes to the model. + * @param callback + */ + listen(id: string, cursor: Cursor, callback: (patches: P[]) => void): void; +} + +export interface RemoteModel { + blob: Uint8Array; +} + +export interface RemotePatch { + blob: Uint8Array; +} diff --git a/src/json-crdt-repo/types.ts b/src/json-crdt-repo/types.ts new file mode 100644 index 0000000000..fa8a7a576f --- /dev/null +++ b/src/json-crdt-repo/types.ts @@ -0,0 +1,18 @@ +import type {Patch} from '../json-crdt-patch'; +import type {Log} from '../json-crdt/log/Log'; +import type {Model} from '../json-crdt/model'; + +export interface LocalHistory { + create(collection: string[], log: Log): Promise<{id: string}>; + read(collection: string[], id: string): Promise<{log: Log; cursor: string}>; + readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}>; + update(collection: string[], id: string, patches: Patch[]): Promise; + delete(collection: string[], id: string): Promise; +} + +export interface EditingSessionHistory { + load(id: string): Promise; + loadHistory(id: string): Promise; + undo(id: string): Promise; + redo(id: string): Promise; +} diff --git a/src/json-crdt/history/SessionHistory.ts b/src/json-crdt/history/SessionHistory.ts deleted file mode 100644 index 16622973ba..0000000000 --- a/src/json-crdt/history/SessionHistory.ts +++ /dev/null @@ -1,72 +0,0 @@ -import {createRace} from 'thingies/es2020/createRace'; -import {FanOutUnsubscribe} from 'thingies/es2020/fanout'; -import {InsValOp, Patch} from '../../json-crdt-patch'; -import {ValNode} from '../nodes'; -import {toSchema} from '../schema/toSchema'; -import {Log} from '../log/Log'; -import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack'; - -class Undo implements UndoItem { - constructor(public readonly undo: () => Redo) {} -} - -class Redo implements RedoItem { - constructor(public readonly redo: () => Undo) {} -} - -export class SessionHistory { - constructor(public readonly log: Log) {} - - private readonly __onPatchRace = createRace(); - - public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe { - const onBeforePatch = (patch: Patch) => { - this.__onPatchRace(() => { - const undo = this.createUndo(patch); - stack.push(undo); - }); - }; - const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch); - return unsubscribe; - } - - public createUndo(patch: Patch): Undo { - const undoTasks: Array<() => void> = []; - const ops = patch.ops; - const length = ops.length; - for (let i = length - 1; i >= 0; i--) { - const op = ops[i]; - switch (op.name()) { - case 'ins_val': { - const insOp = op as InsValOp; - const valNode = this.log.end.index.get(insOp.obj); - if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE'); - const copy = toSchema(valNode.node()); - const valNodeId = valNode.id; - const task = () => { - const end = this.log.end; - const valNode = end.index.get(valNodeId); - if (!valNode) return; - end.api.wrap(valNode).asVal().set(copy); - }; - undoTasks.push(task); - } - } - } - const undo = new Undo(() => { - this.__onPatchRace(() => { - for (const task of undoTasks) task(); - }); - return new Redo(() => { - const undo = this.__onPatchRace(() => { - // TODO: This line needs to be changed: - const redoPatch = Patch.fromBinary(patch.toBinary()); - this.log.end.api.builder.patch = redoPatch; - return this.createUndo(redoPatch); - }); - return undo!; - }); - }); - return undo; - } -} diff --git a/src/json-crdt/history/types.ts b/src/json-crdt/history/types.ts deleted file mode 100644 index 59fcce0054..0000000000 --- a/src/json-crdt/history/types.ts +++ /dev/null @@ -1,52 +0,0 @@ -import type {Patch} from '../../json-crdt-patch'; -import type {Log} from '../log/Log'; -import type {Model} from '../model'; - -/** - * A history of patches that have been applied to a model, stored on the - * "remote": (1) server; (2) content addressable storage; or (3) peer-to-peer - * network. - * - * Cases: - * - * - `RemoteHistoryServer` - * - `RemoteHistoryServerIdempotent` - * - `RemoteHistoryCAS` - * - `RemoteHistoryP2P` - */ -export interface RemoteHistory { - /** - * Load latest state of the model, and any unmerged "tip" of patches - * it might have. - * - * @todo Maybe `state` and `tip` should be serialized to JSON? - */ - loadLatest(id: string): Promise<[cursor: Cursor, state: Model]>; - - loadAfter(id: string, cursor: Cursor): Promise<[cursor: Cursor, tip: Patch[]]>; - - loadBefore(id: string, cursor: Cursor): Promise<[cursor: Cursor, state: Model, tip: Patch[]]>; - - apply(id: string, patches: Patch[]): Promise; - - /** - * Subscribe to the latest changes to the model. - * @param callback - */ - subscribe(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; -} - -export interface LocalHistory { - create(collection: string[], log: Log): Promise<{id: string}>; - read(collection: string[], id: string): Promise<{log: Log; cursor: string}>; - readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}>; - update(collection: string[], id: string, patches: Patch[]): Promise; - delete(collection: string[], id: string): Promise; -} - -export interface EditingSessionHistory { - load(id: string): Promise; - loadHistory(id: string): Promise; - undo(id: string): Promise; - redo(id: string): Promise; -} diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index 6dbcd1872d..49409d3099 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -65,6 +65,25 @@ export class Model> implements Printable { return decoder.decode(data); } + /** + * Instantiates a model from a collection of patches. The patches are applied + * to the model in the order they are provided. The session ID of the model is + * set to the session ID of the first patch. + * + * @param patches A collection of initial patches to apply to the model. + * @returns A model with the patches applied. + */ + public static fromPatches(patches: Patch[]): Model { + const length = patches.length; + if (!length) throw new Error('NO_PATCHES'); + const first = patches[0]; + const sid = first.getId()!.sid; + if (!sid) throw new Error('NO_SID'); + const model = Model.withLogicalClock(sid); + model.applyBatch(patches); + return model; + } + /** * Root of the JSON document is implemented as Last Write Wins Register, * so that the JSON document does not necessarily need to be an object. The diff --git a/src/json-type-value/ObjectValue.ts b/src/json-type-value/ObjectValue.ts index 85127538b5..795b74e936 100644 --- a/src/json-type-value/ObjectValue.ts +++ b/src/json-type-value/ObjectValue.ts @@ -4,7 +4,7 @@ import {TypeSystem} from '../json-type/system/TypeSystem'; import type {ResolveType} from '../json-type'; import type * as classes from '../json-type/type'; import type * as ts from '../json-type/typescript/types'; -import {TypeBuilder} from '../json-type/type/TypeBuilder'; +import type {TypeBuilder} from '../json-type/type/TypeBuilder'; export type UnObjectType = T extends classes.ObjectType ? U : never; export type UnObjectValue = T extends ObjectValue ? U : never; diff --git a/src/json-type/type/classes/ObjectType.ts b/src/json-type/type/classes/ObjectType.ts index 38e22ece42..9029cc229b 100644 --- a/src/json-type/type/classes/ObjectType.ts +++ b/src/json-type/type/classes/ObjectType.ts @@ -136,6 +136,10 @@ export class ObjectType[] = ObjectFieldType< return this.fields.find((f) => f.key === key); } + public extend[]>(o: ObjectType): ObjectType<[...F, ...F2]> { + return new ObjectType([...this.fields, ...o.fields]); + } + public validateSchema(): void { const schema = this.getSchema(); validateTType(schema, 'obj'); diff --git a/src/reactive-rpc/common/index.ts b/src/reactive-rpc/common/index.ts index d988e9bd86..0859fd73d9 100644 --- a/src/reactive-rpc/common/index.ts +++ b/src/reactive-rpc/common/index.ts @@ -1,3 +1,4 @@ +export * from './types'; export * from './messages'; export * from './channel'; export * from './rpc'; diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index b272370634..3941d4e3ad 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -53,8 +53,8 @@ export class RpcError extends Error implements IRpcError { return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError); } - public static badRequest(): RpcError { - return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, 'Bad Request'); + public static badRequest(message = 'Bad Request'): RpcError { + return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message); } public static validation(message: string, meta?: unknown): RpcError { diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index 937a200fb4..0958b5d727 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -6,14 +6,8 @@ import {RpcCodecs} from '../codec/RpcCodecs'; import {RpcMessageCodecs} from '../codec/RpcMessageCodecs'; import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from '../messages'; import {RpcMessageStreamProcessor, StreamingRpcClient, TypedRpcClient} from '../rpc'; -import type {FunctionStreamingType, FunctionType} from '../../../json-type/type/classes'; -import type {Observable} from 'rxjs'; -import type {ResolveType} from '../../../json-type'; -import type {TypeRouter} from '../../../json-type/system/TypeRouter'; -import type {TypeRouterCaller} from '../rpc/caller/TypeRouterCaller'; import type {RpcCaller} from '../rpc/caller/RpcCaller'; -import type {ObjectValueCaller} from '../rpc/caller/ObjectValueCaller'; -import type {ObjectValue, ObjectValueToTypeMap, UnObjectType} from '../../../json-type-value/ObjectValue'; +import type {CallerToMethods} from '../types'; export interface BuildE2eClientOptions { /** @@ -67,7 +61,7 @@ export interface BuildE2eClientOptions { token?: string; } -export const buildE2eClient = >(caller: Caller, opt: BuildE2eClientOptions) => { +export const buildE2eClient = >(caller: Caller, opt: BuildE2eClientOptions = {}) => { const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); const ctx = new ConnectionContext( @@ -103,21 +97,8 @@ export const buildE2eClient = >(caller: Caller, op bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]), bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]), }); - type Router = UnTypeRouterCaller; - type Routes = UnTypeRouter; - type Methods = {[K in keyof Routes]: UnwrapFunction}; - const typedClient = client as TypedRpcClient; + const typedClient = client as TypedRpcClient>; return { client: typedClient, }; }; - -type UnTypeRouterCaller = T extends TypeRouterCaller ? R : T extends ObjectValueCaller ? R : never; -type UnTypeRouter = - T extends TypeRouter ? R : T extends ObjectValue ? ObjectValueToTypeMap> : never; -type UnwrapFunction = - F extends FunctionType - ? (req: ResolveType) => Promise> - : F extends FunctionStreamingType - ? (req$: Observable>) => Observable> - : never; diff --git a/src/reactive-rpc/common/types.ts b/src/reactive-rpc/common/types.ts new file mode 100644 index 0000000000..b9ee65d320 --- /dev/null +++ b/src/reactive-rpc/common/types.ts @@ -0,0 +1,21 @@ +import type {Observable} from 'rxjs'; +import type {FunctionStreamingType, FunctionType, ResolveType} from '../../json-type'; +import type {ObjectValue, ObjectValueToTypeMap, UnObjectType} from '../../json-type-value/ObjectValue'; +import type {TypeRouter} from '../../json-type/system/TypeRouter'; +import type {ObjectValueCaller} from './rpc/caller/ObjectValueCaller'; +import type {RpcCaller} from './rpc/caller/RpcCaller'; +import type {TypeRouterCaller} from './rpc/caller/TypeRouterCaller'; + +export type CallerToMethods> = { + [K in keyof UnTypeRouter>]: UnwrapFunction>[K]>; +}; + +type UnTypeRouterCaller = T extends TypeRouterCaller ? R : T extends ObjectValueCaller ? R : never; +type UnTypeRouter = + T extends TypeRouter ? R : T extends ObjectValue ? ObjectValueToTypeMap> : never; +type UnwrapFunction = + F extends FunctionType + ? (req: ResolveType) => Promise> + : F extends FunctionStreamingType + ? (req$: Observable>) => Observable> + : never; diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/block.spec.ts similarity index 72% rename from src/server/__tests__/blocks.spec.ts rename to src/server/__tests__/block.spec.ts index a000e4639e..4a52c1760c 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/block.spec.ts @@ -3,21 +3,21 @@ import {RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; -describe('blocks.*', () => { - describe('blocks.create', () => { +describe('block.*', () => { + describe('block.new', () => { test('can create an empty block', async () => { const {call} = setup(); - await call('blocks.create', {id: 'my-block', patches: []}); - const {block} = await call('blocks.get', {id: 'my-block'}); - expect(block).toMatchObject({ + await call('block.new', {id: 'my-block', patches: []}); + const {model} = await call('block.get', {id: 'my-block'}); + expect(model).toMatchObject({ id: 'my-block', seq: -1, blob: expect.any(Uint8Array), created: expect.any(Number), updated: expect.any(Number), }); - const model = Model.fromBinary(block.blob); - expect(model.view()).toBe(undefined); + const model2 = Model.fromBinary(model.blob); + expect(model2.view()).toBe(undefined); }); test('can create a block with value', async () => { @@ -32,30 +32,26 @@ describe('blocks.*', () => { age: 26, }); const patch2 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id: '123412341234', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, { - seq: 1, - created: Date.now(), blob: patch2.toBinary(), }, ], }); - const {block} = await call('blocks.get', {id: '123412341234'}); - expect(block).toMatchObject({ + const res = await call('block.get', {id: '123412341234'}); + expect(res.model).toMatchObject({ id: '123412341234', seq: 1, blob: expect.any(Uint8Array), created: expect.any(Number), updated: expect.any(Number), }); - const model2 = Model.fromBinary(block.blob); + const model2 = Model.fromBinary(res.model.blob); expect(model2.view()).toStrictEqual({ name: 'Super Woman', age: 26, @@ -63,15 +59,15 @@ describe('blocks.*', () => { }); }); - describe('blocks.remove', () => { + describe('block.remove', () => { test('can remove an existing block', async () => { const {call} = setup(); - await call('blocks.create', {id: 'my-block', patches: []}); - const {block} = await call('blocks.get', {id: 'my-block'}); - expect(block.id).toBe('my-block'); - await call('blocks.remove', {id: 'my-block'}); + await call('block.new', {id: 'my-block', patches: []}); + const {model} = await call('block.get', {id: 'my-block'}); + expect(model.id).toBe('my-block'); + await call('block.del', {id: 'my-block'}); try { - await call('blocks.get', {id: 'my-block'}); + await call('block.get', {id: 'my-block'}); throw new Error('not this error'); } catch (err: any) { expect(err.errno).toBe(RpcErrorCodes.NOT_FOUND); @@ -79,7 +75,7 @@ describe('blocks.*', () => { }); }); - describe('blocks.edit', () => { + describe('block.upd', () => { test('can edit a document sequentially', async () => { const {call} = setup(); const id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'; @@ -88,12 +84,10 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id, patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -102,7 +96,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); model.api.str(['text']).ins(5, ' World'); const patch3 = model.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -117,15 +111,15 @@ describe('blocks.*', () => { }, ], }); - const block2 = await call('blocks.get', {id}); - expect(Model.fromBinary(block2.block.blob).view()).toStrictEqual({ + const block2 = await call('block.get', {id}); + expect(Model.fromBinary(block2.model.blob).view()).toStrictEqual({ text: 'Hello World', }); model.api.str(['text']).del(5, 1).ins(5, ', '); const patch4 = model.api.flush(); model.api.str(['text']).ins(12, '!'); const patch5 = model.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -140,8 +134,8 @@ describe('blocks.*', () => { }, ], }); - const block3 = await call('blocks.get', {id}); - expect(Model.fromBinary(block3.block.blob).view()).toStrictEqual({ + const block3 = await call('block.get', {id}); + expect(Model.fromBinary(block3.model.blob).view()).toStrictEqual({ text: 'Hello, World!', }); }); @@ -156,23 +150,21 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id, patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], }); // User 2 - const block2 = await call('blocks.get', {id}); - const model2 = Model.fromBinary(block2.block.blob).fork(); + const block2 = await call('block.get', {id}); + const model2 = Model.fromBinary(block2.model.blob).fork(); model2.api.str(['text']).ins(4, ' yeah!'); const patch2User2 = model2.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -184,8 +176,8 @@ describe('blocks.*', () => { }); expect(model2.view()).toStrictEqual({text: 'Hell yeah!'}); - const block3 = await call('blocks.get', {id}); - const model3 = Model.fromBinary(block3.block.blob).fork(); + const block3 = await call('block.get', {id}); + const model3 = Model.fromBinary(block3.model.blob).fork(); expect(model3.view()).toStrictEqual({text: 'Hell yeah!'}); // User 1 @@ -193,7 +185,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); model.api.str(['text']).ins(5, ' World'); const patch3 = model.api.flush(); - const {patches} = await call('blocks.edit', { + const {patches} = await call('block.upd', { id, patches: [ { @@ -209,8 +201,8 @@ describe('blocks.*', () => { ], }); - const block4 = await call('blocks.get', {id}); - const model4 = Model.fromBinary(block4.block.blob).fork(); + const block4 = await call('block.get', {id}); + const model4 = Model.fromBinary(block4.model.blob).fork(); expect(model4.view()).not.toStrictEqual({text: 'Hell yeah!'}); }); @@ -224,23 +216,21 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id, patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], }); // User 2 - const block2 = await call('blocks.get', {id}); - const model2 = Model.fromBinary(block2.block.blob).fork(); + const block2 = await call('block.get', {id}); + const model2 = Model.fromBinary(block2.model.blob).fork(); model2.api.str(['text']).ins(4, ' yeah!'); const patch2User2 = model2.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -256,7 +246,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); model.api.str(['text']).ins(5, ' World'); const patch3 = model.api.flush(); - const {patches} = await call('blocks.edit', { + const {patches} = await call('block.upd', { id, patches: [ { @@ -280,13 +270,13 @@ describe('blocks.*', () => { }); }); - describe('blocks.listen', () => { + describe('block.listen', () => { test('can listen for block changes', async () => { const {client} = setup(); - await client.call('blocks.create', {id: 'my-block', patches: []}); + await client.call('block.new', {id: 'my-block', patches: []}); await tick(11); const emits: any[] = []; - client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -294,34 +284,36 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}], }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].patches.length).toBe(1); - expect(emits[0].patches[0].seq).toBe(0); + expect(emits[0][0]).toBe('upd'); + expect(emits[0][1].patches.length).toBe(1); + expect(emits[0][1].patches[0].seq).toBe(0); model.api.root({ text: 'Hello', }); const patch2 = model.api.flush(); await tick(12); expect(emits.length).toBe(1); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}], }); await until(() => emits.length === 2); expect(emits.length).toBe(2); - expect(emits[1].patches.length).toBe(1); - expect(emits[1].patches[0].seq).toBe(1); + expect(emits[1][0]).toBe('upd'); + expect(emits[1][1].patches.length).toBe(1); + expect(emits[1][1].patches[0].seq).toBe(1); }); test('can subscribe before block is created', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -329,38 +321,39 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await client.call('blocks.create', { + await client.call('block.new', { id: 'my-block', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].patches.length).toBe(1); - expect(emits[0].patches[0].seq).toBe(0); - expect(emits[0].patches[0].blob).toStrictEqual(patch1.toBinary()); + expect(emits[0][0]).toBe('upd'); + expect(emits[0][1].patches.length).toBe(1); + expect(emits[0][1].patches[0].seq).toBe(0); + expect(emits[0][1].patches[0].blob).toStrictEqual(patch1.toBinary()); }); test('can receive deletion events', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); - await client.call('blocks.create', {id: 'my-block', patches: []}); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => { + emits.push(data); + }); + await client.call('block.new', {id: 'my-block', patches: []}); await until(() => emits.length === 1); - expect(emits[0].block.seq).toBe(-1); + expect(emits[0][1].model.seq).toBe(-1); await tick(3); - await client.call('blocks.remove', {id: 'my-block'}); + await client.call('block.del', {id: 'my-block'}); await until(() => emits.length === 2); - expect(emits[1].deleted).toBe(true); + expect(emits[1][0]).toBe('del'); }); }); - describe('blocks.history', () => { + describe('block.scan', () => { test('can retrieve change history', async () => { const {client} = setup(); const model = Model.withLogicalClock(); @@ -368,12 +361,10 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await client.call('blocks.create', { + await client.call('block.new', { id: 'my-block', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -385,7 +376,7 @@ describe('blocks.*', () => { age: 26, }); const patch3 = model.api.flush(); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [ { @@ -400,7 +391,7 @@ describe('blocks.*', () => { }, ], }); - const history = await client.call('blocks.history', {id: 'my-block', min: 0, max: 2}); + const history = await client.call('block.scan', {id: 'my-block', seq: 0, limit: 3}); expect(history).toMatchObject({ patches: [ { @@ -423,7 +414,7 @@ describe('blocks.*', () => { }); }); - describe('blocks.get', () => { + describe('block.get', () => { test('returns whole history when block is loaded', async () => { const {client} = setup(); const model = Model.withLogicalClock(); @@ -431,12 +422,10 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await client.call('blocks.create', { + await client.call('block.new', { id: 'my-block', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -447,7 +436,7 @@ describe('blocks.*', () => { age: 26, }); const patch3 = model.api.flush(); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [ { @@ -462,9 +451,15 @@ describe('blocks.*', () => { }, ], }); - const result = await client.call('blocks.get', {id: 'my-block'}); + const result = await client.call('block.get', {id: 'my-block', history: true}); expect(result).toMatchObject({ - block: expect.any(Object), + model: { + id: 'my-block', + seq: 2, + blob: expect.any(Uint8Array), + created: expect.any(Number), + updated: expect.any(Number), + }, patches: [ { seq: 0, diff --git a/src/server/index.ts b/src/server/index.ts index 0aef51667e..dfe587f508 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -7,6 +7,8 @@ import {Services} from './services/Services'; import type {MyCtx} from './services/types'; import {RpcServer} from '../reactive-rpc/server/http1/RpcServer'; +export type JsonJoyDemoRpcCaller = ReturnType['caller']; + const app = new RpcApp({ uws: App({}), caller: createCaller(new Services()).caller, diff --git a/src/server/routes/block/index.ts b/src/server/routes/block/index.ts new file mode 100644 index 0000000000..cda905e8fa --- /dev/null +++ b/src/server/routes/block/index.ts @@ -0,0 +1,42 @@ +import {new_} from './methods/new'; +import {get} from './methods/get'; +import {upd} from './methods/upd'; +import {del} from './methods/del'; +import {scan} from './methods/scan'; +import {listen} from './methods/listen'; +import { + Block, + BlockPartial, + BlockPartialReturn, + BlockId, + BlockPatch, + BlockPatchPartial, + BlockPatchPartialReturn, + BlockSeq, +} from './schema'; +import type {RouteDeps, Router, RouterBase} from '../types'; + +export const block = + (d: RouteDeps) => + (r: Router) => { + const {system} = d; + + system.alias('BlockId', BlockId); + system.alias('BlockSeq', BlockSeq); + system.alias('Block', Block); + system.alias('BlockPartial', BlockPartial); + system.alias('BlockPartialReturn', BlockPartialReturn); + system.alias('BlockPatch', BlockPatch); + system.alias('BlockPatchPartial', BlockPatchPartial); + system.alias('BlockPatchPartialReturn', BlockPatchPartialReturn); + + // prettier-ignore + return ( + ( new_(d) + ( get(d) + ( upd(d) + ( del(d) + ( listen(d) + ( scan(d) + ( r )))))))); + }; diff --git a/src/server/routes/blocks/methods/remove.ts b/src/server/routes/block/methods/del.ts similarity index 88% rename from src/server/routes/blocks/methods/remove.ts rename to src/server/routes/block/methods/del.ts index ae3400d397..f3e28617c8 100644 --- a/src/server/routes/blocks/methods/remove.ts +++ b/src/server/routes/block/methods/del.ts @@ -1,7 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId} from '../schema'; -export const remove = +export const del = ({t, services}: RouteDeps) => (r: Router) => { const Request = t.Object( @@ -19,7 +19,7 @@ export const remove = description: 'Fetches a block by ID.', }); - return r.prop('blocks.remove', Func, async ({id}) => { + return r.prop('block.del', Func, async ({id}) => { await services.blocks.remove(id); return {}; }); diff --git a/src/server/routes/blocks/methods/get.ts b/src/server/routes/block/methods/get.ts similarity index 50% rename from src/server/routes/blocks/methods/get.ts rename to src/server/routes/block/methods/get.ts index 7ff48d2d8a..2135db2748 100644 --- a/src/server/routes/blocks/methods/get.ts +++ b/src/server/routes/block/methods/get.ts @@ -1,3 +1,4 @@ +import {ResolveType} from '../../../../json-type'; import type {RouteDeps, Router, RouterBase} from '../../types'; import type {Block, BlockId, BlockPatch} from '../schema'; @@ -9,11 +10,15 @@ export const get = title: 'Block ID', description: 'The ID of the block to retrieve.', }), + t.propOpt('history', t.bool).options({ + title: 'With History', + description: 'Whether to include the full history of patches in the response. Defaults to `false`.', + }), ); const Response = t.Object( - t.prop('block', t.Ref('Block').options({})), - t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + t.prop('model', t.Ref('Block')), + t.propOpt('patches', t.Array(t.Ref('BlockPatch'))).options({ title: 'Patches', description: 'The list of all patches.', }), @@ -25,11 +30,13 @@ export const get = description: 'Fetches a block by ID.', }); - return r.prop('blocks.get', Func, async ({id}) => { - const {block, patches} = await services.blocks.get(id); - return { - block, - patches, - }; + return r.prop('block.get', Func, async ({id, history}) => { + const {model} = await services.blocks.get(id); + const response: ResolveType = {model}; + if (history) { + const {patches} = await services.blocks.scan(id, 0, model.seq); + response.patches = patches; + } + return response; }); }; diff --git a/src/server/routes/blocks/methods/listen.ts b/src/server/routes/block/methods/listen.ts similarity index 51% rename from src/server/routes/blocks/methods/listen.ts rename to src/server/routes/block/methods/listen.ts index f9272228af..bd3f9c8405 100644 --- a/src/server/routes/blocks/methods/listen.ts +++ b/src/server/routes/block/methods/listen.ts @@ -1,12 +1,10 @@ -import {switchMap} from 'rxjs'; +import {switchMap, tap} from 'rxjs'; import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch, Block} from '../schema'; export const listen = ({t, services}: RouteDeps) => (r: Router) => { - const PatchType = t.Ref('BlockPatch'); - const Request = t.Object( t.prop('id', t.Ref('BlockId')).options({ title: 'Block ID', @@ -14,19 +12,21 @@ export const listen = }), ); - const Response = t.Object( - t.propOpt('deleted', t.Boolean()).options({ - title: 'Deleted', - description: 'Emitted only when the block is deleted.', - }), - t.propOpt('block', t.Ref('Block')).options({ - title: 'Block', - description: 'The whole block object, emitted only when the block is created.', - }), - t.propOpt('patches', t.Array(PatchType)).options({ - title: 'Latest patches', - description: 'Patches that have been applied to the block.', - }), + const Response = t.Or( + t.Tuple(t.Const('del')), + t.Tuple( + t.Const('upd'), + t.Object( + t.propOpt('model', t.Ref('Block')).options({ + title: 'Block', + description: 'The whole block object, emitted only when the block is created.', + }), + t.propOpt('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Latest Patches', + description: 'Patches that have been applied to the block.', + }), + ), + ), ); const Func = t.Function$(Request, Response).options({ @@ -34,7 +34,7 @@ export const listen = description: 'Subscribe to a block to receive updates when it changes.', }); - return r.prop('blocks.listen', Func, (req$) => { + return r.prop('block.listen', Func, (req$) => { return req$.pipe(switchMap(({id}) => services.pubsub.listen$(`__block:${id}`))) as any; }); }; diff --git a/src/server/routes/blocks/methods/create.ts b/src/server/routes/block/methods/new.ts similarity index 51% rename from src/server/routes/blocks/methods/create.ts rename to src/server/routes/block/methods/new.ts index 2f7dec6121..b270460417 100644 --- a/src/server/routes/blocks/methods/create.ts +++ b/src/server/routes/block/methods/new.ts @@ -1,7 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; -import type {BlockId, BlockPatch} from '../schema'; +import type {Block, BlockId, BlockPatchPartial, BlockPatchPartialReturn} from '../schema'; -export const create = +export const new_ = ({t, services}: RouteDeps) => (r: Router) => { const Request = t.Object( @@ -9,13 +9,19 @@ export const create = title: 'New block ID', description: 'The ID of the new block.', }), - t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + t.prop('patches', t.Array(t.Ref('BlockPatchPartial'))).options({ title: 'Patches', description: 'The patches to apply to the document.', }), ); - const Response = t.obj; + const Response = t.Object( + t.prop('model', t.Ref('Block')), + t.prop('patches', t.Array(t.Ref('BlockPatchPartialReturn'))).options({ + title: 'Patches', + description: 'The list of all patches.', + }), + ); const Func = t.Function(Request, Response).options({ title: 'Create Block', @@ -23,8 +29,8 @@ export const create = description: 'Creates a new block or applies patches to it.', }); - return r.prop('blocks.create', Func, async ({id, patches}) => { - const {block} = await services.blocks.create(id, patches); - return {}; + return r.prop('block.new', Func, async ({id, patches}) => { + const res = await services.blocks.create(id, patches); + return res; }); }; diff --git a/src/server/routes/block/methods/scan.ts b/src/server/routes/block/methods/scan.ts new file mode 100644 index 0000000000..ce5080199f --- /dev/null +++ b/src/server/routes/block/methods/scan.ts @@ -0,0 +1,50 @@ +import type {BlockPatch, BlockId} from '../schema'; +import type {RouteDeps, Router, RouterBase} from '../../types'; + +export const scan = + ({t, services}: RouteDeps) => + (r: Router) => { + const Request = t.Object( + t.prop('id', t.Ref('BlockId')).options({ + title: 'Block ID', + description: 'The ID of the block.', + }), + t.propOpt('seq', t.num.options({format: 'u32'})).options({ + title: 'Starting Sequence Number', + description: 'The sequence number to start from. Defaults to the latest sequence number.', + }), + t.propOpt('limit', t.num.options({format: 'u32'})).options({ + title: 'Number of Patches', + description: + 'The minimum number of patches to return. Defaults to 10. ' + + 'When positive, returns the patches ahead of the starting sequence number. ' + + 'When negative, returns the patches behind the starting sequence number.', + }), + t.propOpt('model', t.bool).options({ + title: 'With Model', + description: + 'Whether to include the model in the response. ' + + 'Defaults to `false`, when `len` is positive; and, defaults to `true`, when `len` is negative.', + }), + ); + + const Response = t.Object( + t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Patches', + description: 'The list of patches.', + }), + t.propOpt('modelBlob', t.bin), + ); + + const Func = t.Function(Request, Response).options({ + title: 'Block History', + intro: 'Fetch block history.', + description: 'Returns a list of specified change patches for a block.', + }); + + return r.prop('block.scan', Func, async ({id, seq, limit = 10, model: returnModel = limit > 0}) => { + const {patches, model} = await services.blocks.scan(id, seq, limit, returnModel); + const modelBlob: Uint8Array | undefined = model?.toBinary(); + return {patches, modelBlob}; + }); + }; diff --git a/src/server/routes/blocks/methods/edit.ts b/src/server/routes/block/methods/upd.ts similarity index 95% rename from src/server/routes/blocks/methods/edit.ts rename to src/server/routes/block/methods/upd.ts index cea15b0ebd..987854f5f5 100644 --- a/src/server/routes/blocks/methods/edit.ts +++ b/src/server/routes/block/methods/upd.ts @@ -1,7 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch} from '../schema'; -export const edit = +export const upd = ({t, services}: RouteDeps) => (r: Router) => { const PatchType = t.Ref('BlockPatch'); @@ -38,7 +38,7 @@ export const edit = description: 'Applies patches to an existing document and returns the latest concurrent changes.', }); - return r.prop('blocks.edit', Func, async ({id, patches}) => { + return r.prop('block.upd', Func, async ({id, patches}) => { const res = await services.blocks.edit(id, patches); return { patches: res.patches, diff --git a/src/server/routes/block/schema.ts b/src/server/routes/block/schema.ts new file mode 100644 index 0000000000..6d12c3eaf1 --- /dev/null +++ b/src/server/routes/block/schema.ts @@ -0,0 +1,60 @@ +import {type ResolveType} from '../../../json-type'; +import {t} from '../system'; + +export type TBlockId = ResolveType; +export const BlockId = t.str.options({ + title: 'Block ID', + min: 6, + max: 40, +}); + +export type TBlockSeq = ResolveType; +export const BlockSeq = t.num.options({ + title: 'Block Sequence Number', + gte: 0, + format: 'i32', +}); + +export type TBlock = ResolveType; + +// prettier-ignore +export const BlockPartial = t.Object( + t.prop('blob', t.bin), +); + +export const BlockPartialReturn = t.Object( + t.prop('id', t.Ref('BlockId')), + t.prop('seq', t.Ref('BlockSeq')), + t.prop('created', t.num), + t.prop('updated', t.num), +); + +export const Block = BlockPartial.extend(BlockPartialReturn); + +export type TBlockPatch = ResolveType; + +// prettier-ignore +export const BlockPatchPartial = t.Object( + t.prop('blob', t.bin).options({ + title: 'Patch Blob', + description: 'The binary data of the patch. The format of the data is defined by the patch type.', + }), +); + +// prettier-ignore +export const BlockPatchPartialReturn = t.Object( + t.prop('seq', t.num).options({ + title: 'Patch Sequence Number', + description: 'The sequence number of the patch in the block. A monotonically increasing integer, starting from 0.', + }), + t.prop('created', t.num).options({ + title: 'Patch Creation Time', + description: 'The time when the patch was created, in milliseconds since the Unix epoch.' + + '\n\n' + + 'This time is set by the server when the patch received and stored on the server. If you ' + + 'want to also store the time when the patch was created by the user, you can include this ' + + 'information in the patch blob itself.', + }), +); + +export const BlockPatch = BlockPatchPartial.extend(BlockPatchPartialReturn); diff --git a/src/server/routes/blocks/index.ts b/src/server/routes/blocks/index.ts deleted file mode 100644 index b219a7ce78..0000000000 --- a/src/server/routes/blocks/index.ts +++ /dev/null @@ -1,29 +0,0 @@ -import {create} from './methods/create'; -import {get} from './methods/get'; -import {remove} from './methods/remove'; -import {edit} from './methods/edit'; -import {listen} from './methods/listen'; -import {Block, BlockId, BlockPatch, BlockSeq} from './schema'; -import {history} from './methods/history'; -import type {RouteDeps, Router, RouterBase} from '../types'; - -export const blocks = - (d: RouteDeps) => - (r: Router) => { - const {system} = d; - - system.alias('BlockId', BlockId); - system.alias('BlockSeq', BlockSeq); - system.alias('Block', Block); - system.alias('BlockPatch', BlockPatch); - - // prettier-ignore - return ( - ( create(d) - ( get(d) - ( remove(d) - ( edit(d) - ( listen(d) - ( history(d) - ( r )))))))); - }; diff --git a/src/server/routes/blocks/methods/history.ts b/src/server/routes/blocks/methods/history.ts deleted file mode 100644 index ea71ef4f2e..0000000000 --- a/src/server/routes/blocks/methods/history.ts +++ /dev/null @@ -1,39 +0,0 @@ -import type {BlockPatch, BlockId} from '../schema'; -import type {RouteDeps, Router, RouterBase} from '../../types'; - -export const history = - ({t, services}: RouteDeps) => - (r: Router) => { - const Request = t.Object( - t.prop('id', t.Ref('BlockId')).options({ - title: 'Block ID', - description: 'The ID of the block.', - }), - t.prop('max', t.num.options({format: 'u32'})).options({ - title: 'Max', - description: 'The maximum sequence number to return.', - }), - t.prop('min', t.num.options({format: 'u32'})).options({ - title: 'Min', - description: 'The minimum sequence number to return.', - }), - ); - - const Response = t.Object( - t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ - title: 'Patches', - description: 'The list of patches.', - }), - ); - - const Func = t.Function(Request, Response).options({ - title: 'Block History', - intro: 'Fetch block history.', - description: 'Returns a list of specified change patches for a block.', - }); - - return r.prop('blocks.history', Func, async ({id, min, max}) => { - const {patches} = await services.blocks.history(id, min, max); - return {patches}; - }); - }; diff --git a/src/server/routes/blocks/schema.ts b/src/server/routes/blocks/schema.ts deleted file mode 100644 index 3f9e09f8b4..0000000000 --- a/src/server/routes/blocks/schema.ts +++ /dev/null @@ -1,28 +0,0 @@ -import {type ResolveType} from '../../../json-type'; -import {t} from '../system'; - -export type TBlockId = ResolveType; -export const BlockId = t.str.options({ - title: 'Block ID', - min: 6, - max: 40, -}); - -export type TBlockSeq = ResolveType; -export const BlockSeq = t.num.options({ - title: 'Block Sequence Number', - gte: 0, - format: 'i32', -}); - -export type TBlock = ResolveType; -export const Block = t.Object( - t.prop('id', t.Ref('BlockId')), - t.prop('seq', t.Ref('BlockSeq')), - t.prop('created', t.num), - t.prop('updated', t.num), - t.prop('blob', t.bin), -); - -export type TBlockPatch = ResolveType; -export const BlockPatch = t.Object(t.prop('seq', t.num), t.prop('created', t.num), t.prop('blob', t.bin)); diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index 8d51c1f401..812c108169 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -4,7 +4,7 @@ import {RpcValue} from '../../reactive-rpc/common/messages/Value'; import {ObjectValueCaller} from '../../reactive-rpc/common/rpc/caller/ObjectValueCaller'; import {system} from './system'; import {ObjectValue} from '../../json-type-value/ObjectValue'; -import type {Services} from '../services/Services'; +import {Services} from '../services/Services'; import type {RouteDeps} from './types'; export const createRouter = (services: Services) => { @@ -18,7 +18,7 @@ export const createRouter = (services: Services) => { return routes(deps)(router); }; -export const createCaller = (services: Services) => { +export const createCaller = (services: Services = new Services()) => { const router = createRouter(services); const caller = new ObjectValueCaller({ router, diff --git a/src/server/routes/routes.ts b/src/server/routes/routes.ts index 3459273faf..2d5733147c 100644 --- a/src/server/routes/routes.ts +++ b/src/server/routes/routes.ts @@ -1,7 +1,7 @@ import {util} from './util'; import {pubsub} from './pubsub'; import {presence} from './presence'; -import {blocks} from './blocks'; +import {block} from './block'; import type {RouteDeps} from './types'; import type {ObjectValue} from '../../json-type-value/ObjectValue'; import type {ObjectType} from '../../json-type'; @@ -11,5 +11,6 @@ export const routes = (d: RouteDeps) => >(r: ObjectVal ( util(d) ( pubsub(d) ( presence(d) - ( blocks(d) + // TODO: rename "blocks" to "block", in all methods. + ( block(d) ( r ))))); diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 22f139dc03..dad9f11715 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -1,14 +1,16 @@ import {MemoryStore} from './MemoryStore'; -import {StorePatch} from './types'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; +import {Model, Patch} from '../../../json-crdt'; +import {SESSION} from '../../../json-crdt-patch/constants'; +import type {StoreModel, StorePatch} from './types'; import type {Services} from '../Services'; -const BLOCK_TTL = 1000 * 60 * 60; // 1 hour +const BLOCK_TTL = 1000 * 60 * 30; // 30 minutes -const validatePatches = (patches: StorePatch[]) => { +const validatePatches = (patches: Pick[]) => { for (const patch of patches) { if (patch.blob.length > 2000) throw RpcError.validation('patch blob too large'); - if (patch.seq > 500_000) throw RpcError.validation('patch seq too large'); + // if (patch.seq > 500_000) throw RpcError.validation('patch seq too large'); } }; @@ -17,43 +19,107 @@ export class BlocksServices { constructor(protected readonly services: Services) {} - public async create(id: string, patches: StorePatch[]) { + public async create(id: string, partialPatches: Pick[]) { this.maybeGc(); - const {store} = this; - validatePatches(patches); - const {block} = await store.create(id, patches); - const data = { - block, + validatePatches(partialPatches); + if (!Array.isArray(partialPatches)) throw new Error('INVALID_PATCHES'); + const length = partialPatches.length; + const now = Date.now(); + if (!length) { + const rawModel = Model.withLogicalClock(SESSION.GLOBAL); + const model: StoreModel = { + id, + seq: -1, + blob: rawModel.toBinary(), + created: now, + updated: now, + }; + return await this.__create(id, model, []); + } + const rawPatches: Patch[] = []; + const patches: StorePatch[] = []; + let seq = 0; + for (; seq < length; seq++) { + const blob = partialPatches[seq].blob; + rawPatches.push(Patch.fromBinary(blob)); + patches.push({seq, created: now, blob}); + } + const rawModel = Model.fromPatches(rawPatches); + const model: StoreModel = { + id, + seq: seq - 1, + blob: rawModel.toBinary(), + created: now, + updated: now, + }; + return await this.__create(id, model, patches); + } + + private async __create(id: string, model: StoreModel, patches: StorePatch[]) { + await this.store.create(id, model, patches); + this.__emitUpd(id, model, patches); + return { + model, patches, }; - this.services.pubsub.publish(`__block:${id}`, data).catch((error) => { + } + + private __emitUpd(id: string, model: StoreModel, patches: StorePatch[]) { + const msg = ['upd', {model, patches}]; + this.services.pubsub.publish(`__block:${id}`, msg).catch((error) => { // tslint:disable-next-line:no-console console.error('Error publishing block patches', error); }); - return {block}; } public async get(id: string) { const {store} = this; const result = await store.get(id); if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND); - const patches = await store.history(id, 0, result.block.seq); - const {block} = result; - return {block, patches}; + const {model} = result; + return {model}; } public async remove(id: string) { await this.store.remove(id); - this.services.pubsub.publish(`__block:${id}`, {deleted: true}).catch((error) => { + const msg = ['del']; + this.services.pubsub.publish(`__block:${id}`, msg).catch((error) => { // tslint:disable-next-line:no-console console.error('Error publishing block deletion', error); }); } - public async history(id: string, min: number, max: number) { + public async scan( + id: string, + offset: number | undefined, + limit: number | undefined = 10, + returnStartModel: boolean = limit < 0, + ) { const {store} = this; + if (typeof offset !== 'number') offset = await store.seq(id); + let min: number = 0, + max: number = 0; + if (!limit || Math.round(limit) !== limit) throw RpcError.badRequest('INVALID_LIMIT'); + if (limit > 0) { + min = Number(offset) || 0; + max = min + limit; + } else { + max = Number(offset) || 0; + min = max - limit; + } + if (min < 0) { + min = 0; + max = Math.abs(limit); + } const patches = await store.history(id, min, max); - return {patches}; + let model: Model | undefined; + if (returnStartModel) { + const startPatches = await store.history(id, 0, min); + if (startPatches.length) { + model = Model.fromPatches(startPatches.map((p) => Patch.fromBinary(p.blob))); + } + } + return {patches, model}; } public async edit(id: string, patches: StorePatch[]) { @@ -63,19 +129,14 @@ export class BlocksServices { const seq = patches[0].seq; const {store} = this; validatePatches(patches); - const {block} = await store.edit(id, patches); - this.services.pubsub.publish(`__block:${id}`, {patches}).catch((error) => { - // tslint:disable-next-line:no-console - console.error('Error publishing block patches', error); - }); + const {model} = await store.edit(id, patches); + this.__emitUpd(id, model, patches); const expectedBlockSeq = seq + patches.length - 1; - const hadConcurrentEdits = block.seq !== expectedBlockSeq; + const hadConcurrentEdits = model.seq !== expectedBlockSeq; let patchesBack: StorePatch[] = []; - if (hadConcurrentEdits) { - patchesBack = await store.history(id, seq, block.seq); - } + if (hadConcurrentEdits) patchesBack = await store.history(id, seq, model.seq); return { - block, + model, patches: patchesBack, }; } @@ -95,6 +156,6 @@ export class BlocksServices { private async gc(): Promise { const ts = Date.now() - BLOCK_TTL; const {store} = this; - await store.removeOlderThan(ts); + await store.removeAccessedBefore(ts); } } diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index 85c060a7d4..491c7fec13 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -2,48 +2,36 @@ import {Model} from '../../../json-crdt'; import {Patch} from '../../../json-crdt-patch'; import type * as types from './types'; +const tick = new Promise((resolve) => setImmediate(resolve)); + export class MemoryStore implements types.Store { - protected readonly blocks = new Map(); + protected readonly models = new Map(); protected readonly patches = new Map(); public async get(id: string): Promise { - await new Promise((resolve) => setImmediate(resolve)); - const block = this.blocks.get(id); - if (!block) return; - return {block}; + await tick; + const model = this.models.get(id); + if (!model) return; + return {model}; + } + + public async seq(id: string): Promise { + await tick; + return this.models.get(id)?.seq; } - public async create(id: string, patches: types.StorePatch[]): Promise { - await new Promise((resolve) => setImmediate(resolve)); + public async create(id: string, model: types.StoreModel, patches: types.StorePatch[]): Promise { + await tick; if (!Array.isArray(patches)) throw new Error('NO_PATCHES'); - if (this.blocks.has(id)) throw new Error('BLOCK_EXISTS'); - const model = Model.withLogicalClock(); - let seq = -1; - const now = Date.now(); - if (patches.length) { - for (const patch of patches) { - seq++; - if (seq !== patch.seq) throw new Error('PATCHES_OUT_OF_ORDER'); - model.applyPatch(Patch.fromBinary(patch.blob)); - if (patch.created > now) patch.created = now; - } - } - const block: types.StoreBlock = { - id, - seq: seq, - blob: model.toBinary(), - created: now, - updated: now, - }; - this.blocks.set(id, block); + if (this.models.has(id)) throw new Error('BLOCK_EXISTS'); + this.models.set(id, model); this.patches.set(id, patches); - return {block}; } public async edit(id: string, patches: types.StorePatch[]): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; if (!Array.isArray(patches) || !patches.length) throw new Error('NO_PATCHES'); - const block = this.blocks.get(id); + const block = this.models.get(id); const existingPatches = this.patches.get(id); if (!block || !existingPatches) throw new Error('BLOCK_NOT_FOUND'); let seq = patches[0].seq; @@ -60,35 +48,40 @@ export class MemoryStore implements types.Store { block.blob = model.toBinary(); block.updated = Date.now(); for (const patch of patches) existingPatches.push(patch); - return {block}; + return {model: block}; } public async history(id: string, min: number, max: number): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; const patches = this.patches.get(id); if (!patches) return []; return patches.slice(min, max + 1); } - public async remove(id: string): Promise { - await new Promise((resolve) => setImmediate(resolve)); - this.removeSync(id); + public async remove(id: string): Promise { + await tick; + return this.removeSync(id); } - private removeSync(id: string): void { - this.blocks.delete(id); - this.patches.delete(id); + private removeSync(id: string): boolean { + this.models.delete(id); + return this.patches.delete(id); } public stats(): {blocks: number; patches: number} { return { - blocks: this.blocks.size, + blocks: this.models.size, patches: [...this.patches.values()].reduce((acc, v) => acc + v.length, 0), }; } public async removeOlderThan(ts: number): Promise { - await new Promise((resolve) => setImmediate(resolve)); - for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); + await tick; + for (const [id, block] of this.models) if (block.created < ts) this.removeSync(id); + } + + public async removeAccessedBefore(ts: number): Promise { + await tick; + for (const [id, block] of this.models) if (block.updated < ts) this.removeSync(id); } } diff --git a/src/server/services/blocks/types.ts b/src/server/services/blocks/types.ts index d6932bed34..89a22c4404 100644 --- a/src/server/services/blocks/types.ts +++ b/src/server/services/blocks/types.ts @@ -1,4 +1,4 @@ -export interface StoreBlock { +export interface StoreModel { id: string; seq: number; created: number; @@ -13,17 +13,63 @@ export interface StorePatch { } export interface Store { - create(id: string, patches: StorePatch[]): Promise; + /** + * Create a new block. + * + * @param id Block ID. + * @param patches Initial patches to apply to a new block. + * @returns Newly created block data. + */ + create(id: string, model: StoreModel, patches: StorePatch[]): Promise; + + /** + * Retrieve an existing block. + * + * @param id Block ID. + * @returns Block data, or `undefined` if the block does not exist. + */ get(id: string): Promise; + + /** + * Retrieve the sequence number of a block. + * + * @param id Block ID. + * @returns Block sequence number, or `undefined` if the block does not exist. + */ + seq(id: string): Promise; + + /** + * Edit an existing block by applying new patches. + * + * @param id Block ID. + * @param patches Patches to apply to the block. + * @returns Updated block data. + */ edit(id: string, patches: StorePatch[]): Promise; + + /** + * Retrieve the history of patches for a block. + * + * @param id Block ID. + * @param min Minimum sequence number. + * @param max Maximum sequence number. + * @returns List of patches. + */ history(id: string, min: number, max: number): Promise; - remove(id: string): Promise; + + /** + * Remove a block. + * + * @param id Block ID. + * @returns `true` if the block was removed, `false` if the block did not exist. + */ + remove(id: string): Promise; } export interface StoreGetResult { - block: StoreBlock; + model: StoreModel; } export interface StoreApplyResult { - block: StoreBlock; + model: StoreModel; }