From 884b560b6b44e8107565743bee688a2d5c93ff1c Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 7 Apr 2024 01:43:07 +0200 Subject: [PATCH 01/23] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20improve?= =?UTF-8?q?=20LocalHistoryCrud=20implementation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/history/LocalHistoryCrud.ts | 14 +++++--- .../__tests__/LocalHistoryCrud.spec.ts | 34 +++++++++++++++++++ 2 files changed, 43 insertions(+), 5 deletions(-) diff --git a/src/json-crdt/history/LocalHistoryCrud.ts b/src/json-crdt/history/LocalHistoryCrud.ts index b19ba39140..67d1048e24 100644 --- a/src/json-crdt/history/LocalHistoryCrud.ts +++ b/src/json-crdt/history/LocalHistoryCrud.ts @@ -30,9 +30,8 @@ export class LocalHistoryCrud implements LocalHistory { protected readonly locks: Locks, ) {} - public async create(collection: string[], log: Log): Promise<{id: string}> { + public async create(collection: string[], log: Log, id: string = genId()): Promise<{id: string}> { const blob = this.encode(log); - const id = genId(); await this.lock(collection, id, async () => { await this.crud.put([...collection, id], STATE_FILE_NAME, blob, {throwIf: 'exists'}); }); @@ -54,12 +53,17 @@ export class LocalHistoryCrud implements LocalHistory { const {frontier} = this.decoder.decode(blob, {format: 'seq.cbor', frontier: true}); return { log: frontier!, - cursor: '', + cursor: '1', }; } - public readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> { - throw new Error('Method not implemented.'); + public async readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> { + const blob = await this.crud.get([...collection, id], STATE_FILE_NAME); + const {history} = this.decoder.decode(blob, {format: 'seq.cbor', history: true}); + return { + log: history!, + cursor: '', + }; } public async update(collection: string[], id: string, patches: Patch[]): Promise { diff --git a/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts b/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts index cf6443657d..4faaa1adc1 100644 --- a/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts +++ b/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts @@ -60,3 +60,37 @@ test('can delete a document', async () => { expect((err as Error).message).toBe(`Collection /test/${id} does not exist`); } }); + +test('can update document', async () => { + const {local} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({ + foo: 'spam', + }); + const log = Log.fromNewModel(model); + const {id} = await local.create(['test'], log); + const {log: log2} = await local.read(['test'], id); + log2.end.api.obj([]).set({ + bar: 'eggs', + }); + const patch = log2.end.api.flush(); + await local.update(['test'], id, [patch]); + const {log: log3} = await local.read(['test'], id); + expect(log3.end.view()).toStrictEqual({ + foo: 'spam', + bar: 'eggs', + }); +}); + +test('can delete document', async () => { + const {local} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({ + foo: 'spam', + }); + const log = Log.fromNewModel(model); + const {id} = await local.create(['test'], log); + await local.read(['test'], id); + await local.delete(['test'], id); + expect(() => local.read(['test'], id)).rejects.toThrow(`Collection /test/${id} does not exist`); +}); From 339826f854b1db83c50400198e8c06422b2010d9 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 7 Apr 2024 14:53:40 +0200 Subject: [PATCH 02/23] =?UTF-8?q?chore(json-crdt):=20=F0=9F=A4=96=20change?= =?UTF-8?q?=20SessionHistory=20dependencies?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/history/SessionHistory.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/json-crdt/history/SessionHistory.ts b/src/json-crdt/history/SessionHistory.ts index 16622973ba..759ae9a507 100644 --- a/src/json-crdt/history/SessionHistory.ts +++ b/src/json-crdt/history/SessionHistory.ts @@ -5,6 +5,7 @@ import {ValNode} from '../nodes'; import {toSchema} from '../schema/toSchema'; import {Log} from '../log/Log'; import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack'; +import type {LocalHistory} from './types'; class Undo implements UndoItem { constructor(public readonly undo: () => Redo) {} @@ -15,7 +16,11 @@ class Redo implements RedoItem { } export class SessionHistory { - constructor(public readonly log: Log) {} + constructor( + public readonly collection: string[], + public readonly id: string, + protected readonly local: LocalHistory, + ) {} private readonly __onPatchRace = createRace(); From 0dd6756c3abfe5a19607fc9376284c01a675188d Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 7 Apr 2024 19:31:17 +0200 Subject: [PATCH 03/23] =?UTF-8?q?refactor(json-crdt):=20=F0=9F=92=A1=20mov?= =?UTF-8?q?e=20repo=20code=20to=20its=20own=20subfolder?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../LocalHistoryCrud.ts | 12 +-- src/json-crdt-repo/SessionHistory.ts | 78 +++++++++++++++++++ .../UndoRedoStack.ts | 0 .../__tests__/LocalHistoryCrud.spec.ts | 4 +- src/json-crdt-repo/remote/types.ts | 38 +++++++++ .../history => json-crdt-repo}/types.ts | 6 +- src/json-crdt/history/SessionHistory.ts | 77 ------------------ 7 files changed, 127 insertions(+), 88 deletions(-) rename src/{json-crdt/history => json-crdt-repo}/LocalHistoryCrud.ts (89%) create mode 100644 src/json-crdt-repo/SessionHistory.ts rename src/{json-crdt/history => json-crdt-repo}/UndoRedoStack.ts (100%) rename src/{json-crdt/history => json-crdt-repo}/__tests__/LocalHistoryCrud.spec.ts (96%) create mode 100644 src/json-crdt-repo/remote/types.ts rename src/{json-crdt/history => json-crdt-repo}/types.ts (91%) delete mode 100644 src/json-crdt/history/SessionHistory.ts diff --git a/src/json-crdt/history/LocalHistoryCrud.ts b/src/json-crdt-repo/LocalHistoryCrud.ts similarity index 89% rename from src/json-crdt/history/LocalHistoryCrud.ts rename to src/json-crdt-repo/LocalHistoryCrud.ts index 67d1048e24..1b0a6359c6 100644 --- a/src/json-crdt/history/LocalHistoryCrud.ts +++ b/src/json-crdt-repo/LocalHistoryCrud.ts @@ -1,11 +1,11 @@ -import {CborEncoder} from '../../json-pack/cbor/CborEncoder'; -import {CborDecoder} from '../../json-pack/cbor/CborDecoder'; -import {LogEncoder} from '../log/codec/LogEncoder'; -import {LogDecoder} from '../log/codec/LogDecoder'; +import {CborEncoder} from '../json-pack/cbor/CborEncoder'; +import {CborDecoder} from '../json-pack/cbor/CborDecoder'; +import {LogEncoder} from '../json-crdt/log/codec/LogEncoder'; +import {LogDecoder} from '../json-crdt/log/codec/LogDecoder'; import type {CrudApi} from 'memfs/lib/crud/types'; import type {Locks} from 'thingies/es2020/Locks'; -import type {Patch} from '../../json-crdt-patch'; -import type {Log} from '../log/Log'; +import type {Patch} from '../json-crdt-patch'; +import type {Log} from '../json-crdt/log/Log'; import type {LocalHistory} from './types'; export const genId = (octets: number = 8): string => { diff --git a/src/json-crdt-repo/SessionHistory.ts b/src/json-crdt-repo/SessionHistory.ts new file mode 100644 index 0000000000..6bbc7ef660 --- /dev/null +++ b/src/json-crdt-repo/SessionHistory.ts @@ -0,0 +1,78 @@ +import {createRace} from 'thingies/es2020/createRace'; +import {FanOutUnsubscribe} from 'thingies/es2020/fanout'; +import {InsValOp, Patch} from '../json-crdt-patch'; +import {ValNode} from '../json-crdt/nodes'; +import {toSchema} from '../json-crdt/schema/toSchema'; +import {Log} from '../json-crdt/log/Log'; +import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack'; +import type {LocalHistory} from './types'; + +class Undo implements UndoItem { + constructor(public readonly undo: () => Redo) {} +} + +class Redo implements RedoItem { + constructor(public readonly redo: () => Undo) {} +} + +export class SessionHistory { + constructor( + public readonly collection: string[], + public readonly id: string, + protected readonly local: LocalHistory, + ) {} + + private readonly __onPatchRace = createRace(); + + public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe { + // const onBeforePatch = (patch: Patch) => { + // this.__onPatchRace(() => { + // const undo = this.createUndo(patch); + // stack.push(undo); + // }); + // }; + // const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch); + // return unsubscribe; + throw new Error('Method not implemented.'); + } + + public createUndo(patch: Patch): Undo { + const undoTasks: Array<() => void> = []; + const ops = patch.ops; + const length = ops.length; + for (let i = length - 1; i >= 0; i--) { + const op = ops[i]; + switch (op.name()) { + case 'ins_val': { + // const insOp = op as InsValOp; + // const valNode = this.log.end.index.get(insOp.obj); + // if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE'); + // const copy = toSchema(valNode.node()); + // const valNodeId = valNode.id; + // const task = () => { + // const end = this.log.end; + // const valNode = end.index.get(valNodeId); + // if (!valNode) return; + // end.api.wrap(valNode).asVal().set(copy); + // }; + // undoTasks.push(task); + } + } + } + const undo = new Undo(() => { + this.__onPatchRace(() => { + for (const task of undoTasks) task(); + }); + return new Redo(() => { + const undo = this.__onPatchRace(() => { + // // TODO: This line needs to be changed: + // const redoPatch = Patch.fromBinary(patch.toBinary()); + // this.log.end.api.builder.patch = redoPatch; + // return this.createUndo(redoPatch); + }); + return undo!; + }); + }); + return undo; + } +} diff --git a/src/json-crdt/history/UndoRedoStack.ts b/src/json-crdt-repo/UndoRedoStack.ts similarity index 100% rename from src/json-crdt/history/UndoRedoStack.ts rename to src/json-crdt-repo/UndoRedoStack.ts diff --git a/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts b/src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts similarity index 96% rename from src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts rename to src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts index 4faaa1adc1..234b206db3 100644 --- a/src/json-crdt/history/__tests__/LocalHistoryCrud.spec.ts +++ b/src/json-crdt-repo/__tests__/LocalHistoryCrud.spec.ts @@ -2,8 +2,8 @@ import {memfs} from 'memfs'; import {NodeCrud} from 'memfs/lib/node-to-crud'; import {Locks} from 'thingies/es2020/Locks'; import {LocalHistoryCrud} from '../LocalHistoryCrud'; -import {Log} from '../../log/Log'; -import {Model} from '../../model'; +import {Log} from '../../json-crdt/log/Log'; +import {Model} from '../../json-crdt/model'; const setup = async () => { const {fs, vol} = memfs(); diff --git a/src/json-crdt-repo/remote/types.ts b/src/json-crdt-repo/remote/types.ts new file mode 100644 index 0000000000..53c8404e10 --- /dev/null +++ b/src/json-crdt-repo/remote/types.ts @@ -0,0 +1,38 @@ +import type {Patch} from '../../json-crdt-patch'; +import type {Model} from '../../json-crdt/model'; + +/** + * A history of patches that have been applied to a model, stored on the + * "remote": (1) server; (2) content addressable storage; or (3) somewhere in a + * peer-to-peer network. + * + * Cases: + * + * - `RemoteHistoryServer` + * - `RemoteHistoryServerIdempotent` + * - `RemoteHistoryCAS` + * - `RemoteHistoryP2P` + */ +export interface RemoteHistory { + create(id: string, patches: Patch[], start?: Model): Promise; + + /** + * Load latest state of the model, and any unmerged "tip" of patches + * it might have. + * + * @todo Maybe `state` and `tip` should be serialized to JSON? + */ + loadLatest(id: string): Promise<[cursor: Cursor, state: Model]>; + + loadAfter(id: string, cursor: Cursor): Promise<[cursor: Cursor, tip: Patch[]]>; + + loadBefore(id: string, cursor: Cursor): Promise<[cursor: Cursor, state: Model, tip: Patch[]]>; + + apply(id: string, patches: Patch[]): Promise; + + /** + * Subscribe to the latest changes to the model. + * @param callback + */ + subscribe(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; +} diff --git a/src/json-crdt/history/types.ts b/src/json-crdt-repo/types.ts similarity index 91% rename from src/json-crdt/history/types.ts rename to src/json-crdt-repo/types.ts index 59fcce0054..556859e1e0 100644 --- a/src/json-crdt/history/types.ts +++ b/src/json-crdt-repo/types.ts @@ -1,6 +1,6 @@ -import type {Patch} from '../../json-crdt-patch'; -import type {Log} from '../log/Log'; -import type {Model} from '../model'; +import type {Patch} from '../json-crdt-patch'; +import type {Log} from '../json-crdt/log/Log'; +import type {Model} from '../json-crdt/model'; /** * A history of patches that have been applied to a model, stored on the diff --git a/src/json-crdt/history/SessionHistory.ts b/src/json-crdt/history/SessionHistory.ts deleted file mode 100644 index 759ae9a507..0000000000 --- a/src/json-crdt/history/SessionHistory.ts +++ /dev/null @@ -1,77 +0,0 @@ -import {createRace} from 'thingies/es2020/createRace'; -import {FanOutUnsubscribe} from 'thingies/es2020/fanout'; -import {InsValOp, Patch} from '../../json-crdt-patch'; -import {ValNode} from '../nodes'; -import {toSchema} from '../schema/toSchema'; -import {Log} from '../log/Log'; -import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack'; -import type {LocalHistory} from './types'; - -class Undo implements UndoItem { - constructor(public readonly undo: () => Redo) {} -} - -class Redo implements RedoItem { - constructor(public readonly redo: () => Undo) {} -} - -export class SessionHistory { - constructor( - public readonly collection: string[], - public readonly id: string, - protected readonly local: LocalHistory, - ) {} - - private readonly __onPatchRace = createRace(); - - public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe { - const onBeforePatch = (patch: Patch) => { - this.__onPatchRace(() => { - const undo = this.createUndo(patch); - stack.push(undo); - }); - }; - const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch); - return unsubscribe; - } - - public createUndo(patch: Patch): Undo { - const undoTasks: Array<() => void> = []; - const ops = patch.ops; - const length = ops.length; - for (let i = length - 1; i >= 0; i--) { - const op = ops[i]; - switch (op.name()) { - case 'ins_val': { - const insOp = op as InsValOp; - const valNode = this.log.end.index.get(insOp.obj); - if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE'); - const copy = toSchema(valNode.node()); - const valNodeId = valNode.id; - const task = () => { - const end = this.log.end; - const valNode = end.index.get(valNodeId); - if (!valNode) return; - end.api.wrap(valNode).asVal().set(copy); - }; - undoTasks.push(task); - } - } - } - const undo = new Undo(() => { - this.__onPatchRace(() => { - for (const task of undoTasks) task(); - }); - return new Redo(() => { - const undo = this.__onPatchRace(() => { - // TODO: This line needs to be changed: - const redoPatch = Patch.fromBinary(patch.toBinary()); - this.log.end.api.builder.patch = redoPatch; - return this.createUndo(redoPatch); - }); - return undo!; - }); - }); - return undo; - } -} From 0d3c8c9148f2db4bb25408bc054e16ad8acc8d44 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Wed, 10 Apr 2024 11:34:04 +0200 Subject: [PATCH 04/23] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20improve=20create?= =?UTF-8?q?=20schema?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/blocks/schema.ts | 23 ++++++++++++++++++++++- 1 file changed, 22 insertions(+), 1 deletion(-) diff --git a/src/server/routes/blocks/schema.ts b/src/server/routes/blocks/schema.ts index 3f9e09f8b4..7fb9483b5a 100644 --- a/src/server/routes/blocks/schema.ts +++ b/src/server/routes/blocks/schema.ts @@ -16,6 +16,8 @@ export const BlockSeq = t.num.options({ }); export type TBlock = ResolveType; + +// prettier-ignore export const Block = t.Object( t.prop('id', t.Ref('BlockId')), t.prop('seq', t.Ref('BlockSeq')), @@ -25,4 +27,23 @@ export const Block = t.Object( ); export type TBlockPatch = ResolveType; -export const BlockPatch = t.Object(t.prop('seq', t.num), t.prop('created', t.num), t.prop('blob', t.bin)); + +// prettier-ignore +export const BlockPatch = t.Object( + t.prop('seq', t.num).options({ + title: 'Patch Sequence Number', + description: 'The sequence number of the patch in the block. A monotonically increasing integer, starting from 0.', + }), + t.prop('created', t.num).options({ + title: 'Patch Creation Time', + description: 'The time when the patch was created, in milliseconds since the Unix epoch.' + + '\n\n' + + 'This time is set by the server when the patch received and stored on the server. If you ' + + 'want to also store the time when the patch was created by the user, you can include this ' + + 'information in the patch blob itself.', + }), + t.prop('blob', t.bin).options({ + title: 'Patch Blob', + description: 'The binary data of the patch. The format of the data is defined by the patch type.', + }), +); From 166f20999e92de2dee2dc12e54c9a1ca4f50c36c Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Wed, 10 Apr 2024 11:34:20 +0200 Subject: [PATCH 05/23] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20initialize=20servi?= =?UTF-8?q?ce=20by=20default?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/routes/index.ts b/src/server/routes/index.ts index 8d51c1f401..812c108169 100644 --- a/src/server/routes/index.ts +++ b/src/server/routes/index.ts @@ -4,7 +4,7 @@ import {RpcValue} from '../../reactive-rpc/common/messages/Value'; import {ObjectValueCaller} from '../../reactive-rpc/common/rpc/caller/ObjectValueCaller'; import {system} from './system'; import {ObjectValue} from '../../json-type-value/ObjectValue'; -import type {Services} from '../services/Services'; +import {Services} from '../services/Services'; import type {RouteDeps} from './types'; export const createRouter = (services: Services) => { @@ -18,7 +18,7 @@ export const createRouter = (services: Services) => { return routes(deps)(router); }; -export const createCaller = (services: Services) => { +export const createCaller = (services: Services = new Services()) => { const router = createRouter(services); const caller = new ObjectValueCaller({ router, From d7aa2733be684516f427a8b8ad985c79bd8d9f25 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Wed, 10 Apr 2024 11:34:58 +0200 Subject: [PATCH 06/23] =?UTF-8?q?feat:=20=F0=9F=8E=B8=20improve=20RPC=20ty?= =?UTF-8?q?pes?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/reactive-rpc/common/index.ts | 1 + .../common/testing/buildE2eClient.ts | 25 +++---------------- src/reactive-rpc/common/types.ts | 19 ++++++++++++++ src/server/index.ts | 2 ++ 4 files changed, 25 insertions(+), 22 deletions(-) create mode 100644 src/reactive-rpc/common/types.ts diff --git a/src/reactive-rpc/common/index.ts b/src/reactive-rpc/common/index.ts index d988e9bd86..0859fd73d9 100644 --- a/src/reactive-rpc/common/index.ts +++ b/src/reactive-rpc/common/index.ts @@ -1,3 +1,4 @@ +export * from './types'; export * from './messages'; export * from './channel'; export * from './rpc'; diff --git a/src/reactive-rpc/common/testing/buildE2eClient.ts b/src/reactive-rpc/common/testing/buildE2eClient.ts index 937a200fb4..0958b5d727 100644 --- a/src/reactive-rpc/common/testing/buildE2eClient.ts +++ b/src/reactive-rpc/common/testing/buildE2eClient.ts @@ -6,14 +6,8 @@ import {RpcCodecs} from '../codec/RpcCodecs'; import {RpcMessageCodecs} from '../codec/RpcMessageCodecs'; import {ReactiveRpcClientMessage, ReactiveRpcMessage, ReactiveRpcServerMessage} from '../messages'; import {RpcMessageStreamProcessor, StreamingRpcClient, TypedRpcClient} from '../rpc'; -import type {FunctionStreamingType, FunctionType} from '../../../json-type/type/classes'; -import type {Observable} from 'rxjs'; -import type {ResolveType} from '../../../json-type'; -import type {TypeRouter} from '../../../json-type/system/TypeRouter'; -import type {TypeRouterCaller} from '../rpc/caller/TypeRouterCaller'; import type {RpcCaller} from '../rpc/caller/RpcCaller'; -import type {ObjectValueCaller} from '../rpc/caller/ObjectValueCaller'; -import type {ObjectValue, ObjectValueToTypeMap, UnObjectType} from '../../../json-type-value/ObjectValue'; +import type {CallerToMethods} from '../types'; export interface BuildE2eClientOptions { /** @@ -67,7 +61,7 @@ export interface BuildE2eClientOptions { token?: string; } -export const buildE2eClient = >(caller: Caller, opt: BuildE2eClientOptions) => { +export const buildE2eClient = >(caller: Caller, opt: BuildE2eClientOptions = {}) => { const writer = opt.writer ?? new Writer(Fuzzer.randomInt2(opt.writerDefaultBufferKb ?? [4, 4]) * 1024); const codecs = new RpcCodecs(new Codecs(writer), new RpcMessageCodecs()); const ctx = new ConnectionContext( @@ -103,21 +97,8 @@ export const buildE2eClient = >(caller: Caller, op bufferSize: Fuzzer.randomInt2(opt.clientBufferSize ?? [1, 1]), bufferTime: Fuzzer.randomInt2(opt.clientBufferTime ?? [0, 0]), }); - type Router = UnTypeRouterCaller; - type Routes = UnTypeRouter; - type Methods = {[K in keyof Routes]: UnwrapFunction}; - const typedClient = client as TypedRpcClient; + const typedClient = client as TypedRpcClient>; return { client: typedClient, }; }; - -type UnTypeRouterCaller = T extends TypeRouterCaller ? R : T extends ObjectValueCaller ? R : never; -type UnTypeRouter = - T extends TypeRouter ? R : T extends ObjectValue ? ObjectValueToTypeMap> : never; -type UnwrapFunction = - F extends FunctionType - ? (req: ResolveType) => Promise> - : F extends FunctionStreamingType - ? (req$: Observable>) => Observable> - : never; diff --git a/src/reactive-rpc/common/types.ts b/src/reactive-rpc/common/types.ts new file mode 100644 index 0000000000..a1b2f96f0e --- /dev/null +++ b/src/reactive-rpc/common/types.ts @@ -0,0 +1,19 @@ +import type {Observable} from "rxjs"; +import type {FunctionStreamingType, FunctionType, ResolveType} from "../../json-type"; +import type {ObjectValue, ObjectValueToTypeMap, UnObjectType} from "../../json-type-value/ObjectValue"; +import type {TypeRouter} from "../../json-type/system/TypeRouter"; +import type {ObjectValueCaller} from "./rpc/caller/ObjectValueCaller"; +import type {RpcCaller} from "./rpc/caller/RpcCaller"; +import type {TypeRouterCaller} from "./rpc/caller/TypeRouterCaller"; + +export type CallerToMethods> = {[K in keyof UnTypeRouter>]: UnwrapFunction>[K]>}; + +type UnTypeRouterCaller = T extends TypeRouterCaller ? R : T extends ObjectValueCaller ? R : never; +type UnTypeRouter = + T extends TypeRouter ? R : T extends ObjectValue ? ObjectValueToTypeMap> : never; +type UnwrapFunction = + F extends FunctionType + ? (req: ResolveType) => Promise> + : F extends FunctionStreamingType + ? (req$: Observable>) => Observable> + : never; diff --git a/src/server/index.ts b/src/server/index.ts index 0aef51667e..dfe587f508 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -7,6 +7,8 @@ import {Services} from './services/Services'; import type {MyCtx} from './services/types'; import {RpcServer} from '../reactive-rpc/server/http1/RpcServer'; +export type JsonJoyDemoRpcCaller = ReturnType['caller']; + const app = new RpcApp({ uws: App({}), caller: createCaller(new Services()).caller, From 05af834cafae1fdd4f0df6b2a3300431aaa6b676 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Wed, 10 Apr 2024 11:35:19 +0200 Subject: [PATCH 07/23] =?UTF-8?q?chore(json-crdt):=20=F0=9F=A4=96=20start?= =?UTF-8?q?=20RemoteHistoryServer=20implementation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../remote/RemoteHistoryServer.ts | 68 +++++++++++++++++++ .../__tests__/RemoteHistoryServer.spec.ts | 26 +++++++ src/json-crdt-repo/remote/types.ts | 23 +++---- src/json-crdt-repo/types.ts | 34 ---------- 4 files changed, 103 insertions(+), 48 deletions(-) create mode 100644 src/json-crdt-repo/remote/RemoteHistoryServer.ts create mode 100644 src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts diff --git a/src/json-crdt-repo/remote/RemoteHistoryServer.ts b/src/json-crdt-repo/remote/RemoteHistoryServer.ts new file mode 100644 index 0000000000..5f89a5a009 --- /dev/null +++ b/src/json-crdt-repo/remote/RemoteHistoryServer.ts @@ -0,0 +1,68 @@ +import {Patch} from "../../json-crdt-patch"; +import {Log} from "../../json-crdt/log/Log"; +import {JsonJoyDemoRpcCaller} from '../../server'; +import {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common'; +import type {RemoteHistory} from "./types"; +import {Model} from "../../json-crdt/model"; + +type Methods = CallerToMethods; + +export type Cursor = number; + +export class RemoteHistoryServer implements RemoteHistory { + constructor (protected readonly client: TypedRpcClient) {} + + public async create(id: string, patches: Patch[]): Promise { + await this.client.call('blocks.create', { + id, + patches: patches.map((patch, seq) => ({ + // TODO: seq and created can be skipped in create call. + seq, + created: Date.now(), + blob: patch.toBinary(), + })), + }); + } + + /** + * Load latest state of the model, and any unmerged "tip" of patches + * it might have. + * + * @todo Maybe `state` and `tip` should be serialized to JSON? + */ + public async read(id: string): Promise<{cursor: Cursor, log: Log}> { + const {block, patches} = await this.client.call('blocks.get', {id}); + const log = new Log(() => Model.fromBinary(block.blob)); + for (const patch of patches) log.end.applyPatch(Patch.fromBinary(patch.blob)); + // TODO: Preserver block metadata: block.created, block.updated, block.seq. + // TODO: Preserver patch metadata: patch.created, patch.seq. + return { + cursor: block.seq, + log, + }; + } + + public async scanAhead(id: string, cursor: Cursor): Promise<{cursor: Cursor, tip: Patch[]}> { + throw new Error('Method not implemented.'); + } + + public async scanBehind(id: string, cursor: Cursor): Promise<{cursor: Cursor, log: Log}> { + throw new Error('Method not implemented.'); + } + + public async update(id: string, cursor: Cursor, patches: Patch[]): Promise { + throw new Error('Method not implemented.'); + } + + public async delete?(id: string): Promise { + throw new Error('Method not implemented.'); + } + + /** + * Subscribe to the latest changes to the model. + * @param callback + */ + public listen(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void { + throw new Error('Method not implemented.'); + } +} diff --git a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts new file mode 100644 index 0000000000..8351776f76 --- /dev/null +++ b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts @@ -0,0 +1,26 @@ +import {Model} from '../../../json-crdt/model'; +import {buildE2eClient} from '../../../reactive-rpc/common/testing/buildE2eClient'; +import {createCaller} from '../../../server/routes/index'; +import {RemoteHistoryServer} from '../RemoteHistoryServer'; + +const setup = () => { + const {caller, router} = createCaller(); + const {client} = buildE2eClient(caller); + const remote = new RemoteHistoryServer(client); + + return { + caller, + router, + client, + remote, + }; +}; + +test('...', async () => { + const {remote} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({foo: 'bar'}); + const patch = model.api.flush(); + + await remote.create('1234567890', [patch]); +}); diff --git a/src/json-crdt-repo/remote/types.ts b/src/json-crdt-repo/remote/types.ts index 53c8404e10..ffef8b67ad 100644 --- a/src/json-crdt-repo/remote/types.ts +++ b/src/json-crdt-repo/remote/types.ts @@ -1,20 +1,13 @@ +import {Log} from '../../json-crdt/log/Log'; import type {Patch} from '../../json-crdt-patch'; -import type {Model} from '../../json-crdt/model'; /** * A history of patches that have been applied to a model, stored on the * "remote": (1) server; (2) content addressable storage; or (3) somewhere in a * peer-to-peer network. - * - * Cases: - * - * - `RemoteHistoryServer` - * - `RemoteHistoryServerIdempotent` - * - `RemoteHistoryCAS` - * - `RemoteHistoryP2P` */ export interface RemoteHistory { - create(id: string, patches: Patch[], start?: Model): Promise; + create(id: string, patches: Patch[]): Promise; /** * Load latest state of the model, and any unmerged "tip" of patches @@ -22,17 +15,19 @@ export interface RemoteHistory { * * @todo Maybe `state` and `tip` should be serialized to JSON? */ - loadLatest(id: string): Promise<[cursor: Cursor, state: Model]>; + read(id: string): Promise<{cursor: string, log: Log}>; - loadAfter(id: string, cursor: Cursor): Promise<[cursor: Cursor, tip: Patch[]]>; + scanAhead(id: string, cursor: Cursor): Promise<{cursor: Cursor, tip: Patch[]}>; - loadBefore(id: string, cursor: Cursor): Promise<[cursor: Cursor, state: Model, tip: Patch[]]>; + scanBehind(id: string, cursor: Cursor): Promise<{cursor: Cursor, log: Log}>; - apply(id: string, patches: Patch[]): Promise; + update(id: string, cursor: Cursor, patches: Patch[]): Promise; + + delete?(id: string): Promise; /** * Subscribe to the latest changes to the model. * @param callback */ - subscribe(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; + listen(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; } diff --git a/src/json-crdt-repo/types.ts b/src/json-crdt-repo/types.ts index 556859e1e0..fa8a7a576f 100644 --- a/src/json-crdt-repo/types.ts +++ b/src/json-crdt-repo/types.ts @@ -2,40 +2,6 @@ import type {Patch} from '../json-crdt-patch'; import type {Log} from '../json-crdt/log/Log'; import type {Model} from '../json-crdt/model'; -/** - * A history of patches that have been applied to a model, stored on the - * "remote": (1) server; (2) content addressable storage; or (3) peer-to-peer - * network. - * - * Cases: - * - * - `RemoteHistoryServer` - * - `RemoteHistoryServerIdempotent` - * - `RemoteHistoryCAS` - * - `RemoteHistoryP2P` - */ -export interface RemoteHistory { - /** - * Load latest state of the model, and any unmerged "tip" of patches - * it might have. - * - * @todo Maybe `state` and `tip` should be serialized to JSON? - */ - loadLatest(id: string): Promise<[cursor: Cursor, state: Model]>; - - loadAfter(id: string, cursor: Cursor): Promise<[cursor: Cursor, tip: Patch[]]>; - - loadBefore(id: string, cursor: Cursor): Promise<[cursor: Cursor, state: Model, tip: Patch[]]>; - - apply(id: string, patches: Patch[]): Promise; - - /** - * Subscribe to the latest changes to the model. - * @param callback - */ - subscribe(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; -} - export interface LocalHistory { create(collection: string[], log: Log): Promise<{id: string}>; read(collection: string[], id: string): Promise<{log: Log; cursor: string}>; From 1b17c684c124ab42e008f7eef71a0300d900001a Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 10 Apr 2024 14:26:01 +0200 Subject: [PATCH 08/23] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20progres?= =?UTF-8?q?s=20on=20remote=20history=20implementation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../remote/RemoteHistoryDemoServer.ts | 97 +++++++++++++++++++ .../remote/RemoteHistoryServer.ts | 68 ------------- .../__tests__/RemoteHistoryServer.spec.ts | 4 +- src/json-crdt-repo/remote/types.ts | 25 +++-- 4 files changed, 114 insertions(+), 80 deletions(-) create mode 100644 src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts delete mode 100644 src/json-crdt-repo/remote/RemoteHistoryServer.ts diff --git a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts new file mode 100644 index 0000000000..94e6586c27 --- /dev/null +++ b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts @@ -0,0 +1,97 @@ +import type {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common'; +import type {JsonJoyDemoRpcCaller} from '../../server'; +import type {RemoteHistory, RemoteModel, RemotePatch} from "./types"; + +type Methods = CallerToMethods; + +export type Cursor = number; + +export interface RemoteServerModel extends RemoteModel { + seq: number; + created: number; + updated: number; +} + +export interface RemoteServerPatch extends RemotePatch { + seq: number; +} + +export class RemoteHistoryDemoServer implements RemoteHistory { + constructor (protected readonly client: TypedRpcClient) {} + + public async create(id: string, patches: RemotePatch[]): Promise { + await this.client.call('blocks.create', { + id, + patches: patches.map((patch, seq) => ({ + // TODO: seq and created should be set on server. (And returned back?) + seq, + created: Date.now(), + blob: patch.blob, + })), + }); + } + + /** + * Load latest state of the model, and any unmerged "tip" of patches + * it might have. + */ + public async read(id: string): Promise<{cursor: Cursor, model: RemoteServerModel, patches: RemoteServerPatch[]}> { + const {block, patches} = await this.client.call('blocks.get', {id}); + return { + cursor: block.seq, + model: block, + patches, + }; + } + + public async scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor, patches: RemoteServerPatch[]}> { + const limit = 100; + const res = await this.client.call('blocks.history', { + id, + min: cursor, + max: cursor + limit, + }); + if (res.patches.length === 0) { + return { + cursor, + patches: [], + }; + } + return { + cursor: res.patches[res.patches.length - 1].seq, + patches: res.patches, + }; + } + + public async scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor, model: RemoteServerModel, patches: RemoteServerPatch[]}> { + throw new Error('The "blocks.history" should be able to return starting model.'); + } + + public async update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor, patches: RemoteServerPatch[]}> { + + const res = await this.client.call('blocks.edit', { + id, + patches: patches.map((patch, seq) => ({ + seq, + created: Date.now(), + blob: patch.blob, + })), + }); + return { + cursor: res.patches.length ? res.patches[res.patches.length - 1].seq : cursor, + patches: res.patches, + }; + } + + public async delete?(id: string): Promise { + await this.client.call('blocks.remove', {id}); + } + + /** + * Subscribe to the latest changes to the model. + * @param callback + */ + public listen(id: string, cursor: Cursor, callback: (changes: RemoteServerPatch[]) => void): void { + throw new Error('Method not implemented.'); + } +} diff --git a/src/json-crdt-repo/remote/RemoteHistoryServer.ts b/src/json-crdt-repo/remote/RemoteHistoryServer.ts deleted file mode 100644 index 5f89a5a009..0000000000 --- a/src/json-crdt-repo/remote/RemoteHistoryServer.ts +++ /dev/null @@ -1,68 +0,0 @@ -import {Patch} from "../../json-crdt-patch"; -import {Log} from "../../json-crdt/log/Log"; -import {JsonJoyDemoRpcCaller} from '../../server'; -import {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common'; -import type {RemoteHistory} from "./types"; -import {Model} from "../../json-crdt/model"; - -type Methods = CallerToMethods; - -export type Cursor = number; - -export class RemoteHistoryServer implements RemoteHistory { - constructor (protected readonly client: TypedRpcClient) {} - - public async create(id: string, patches: Patch[]): Promise { - await this.client.call('blocks.create', { - id, - patches: patches.map((patch, seq) => ({ - // TODO: seq and created can be skipped in create call. - seq, - created: Date.now(), - blob: patch.toBinary(), - })), - }); - } - - /** - * Load latest state of the model, and any unmerged "tip" of patches - * it might have. - * - * @todo Maybe `state` and `tip` should be serialized to JSON? - */ - public async read(id: string): Promise<{cursor: Cursor, log: Log}> { - const {block, patches} = await this.client.call('blocks.get', {id}); - const log = new Log(() => Model.fromBinary(block.blob)); - for (const patch of patches) log.end.applyPatch(Patch.fromBinary(patch.blob)); - // TODO: Preserver block metadata: block.created, block.updated, block.seq. - // TODO: Preserver patch metadata: patch.created, patch.seq. - return { - cursor: block.seq, - log, - }; - } - - public async scanAhead(id: string, cursor: Cursor): Promise<{cursor: Cursor, tip: Patch[]}> { - throw new Error('Method not implemented.'); - } - - public async scanBehind(id: string, cursor: Cursor): Promise<{cursor: Cursor, log: Log}> { - throw new Error('Method not implemented.'); - } - - public async update(id: string, cursor: Cursor, patches: Patch[]): Promise { - throw new Error('Method not implemented.'); - } - - public async delete?(id: string): Promise { - throw new Error('Method not implemented.'); - } - - /** - * Subscribe to the latest changes to the model. - * @param callback - */ - public listen(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void { - throw new Error('Method not implemented.'); - } -} diff --git a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts index 8351776f76..cf0623d132 100644 --- a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts +++ b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts @@ -1,12 +1,12 @@ import {Model} from '../../../json-crdt/model'; import {buildE2eClient} from '../../../reactive-rpc/common/testing/buildE2eClient'; import {createCaller} from '../../../server/routes/index'; -import {RemoteHistoryServer} from '../RemoteHistoryServer'; +import {RemoteHistoryDemoServer} from '../RemoteHistoryServer'; const setup = () => { const {caller, router} = createCaller(); const {client} = buildE2eClient(caller); - const remote = new RemoteHistoryServer(client); + const remote = new RemoteHistoryDemoServer(client); return { caller, diff --git a/src/json-crdt-repo/remote/types.ts b/src/json-crdt-repo/remote/types.ts index ffef8b67ad..2e8ac571a4 100644 --- a/src/json-crdt-repo/remote/types.ts +++ b/src/json-crdt-repo/remote/types.ts @@ -1,13 +1,10 @@ -import {Log} from '../../json-crdt/log/Log'; -import type {Patch} from '../../json-crdt-patch'; - /** * A history of patches that have been applied to a model, stored on the * "remote": (1) server; (2) content addressable storage; or (3) somewhere in a * peer-to-peer network. */ -export interface RemoteHistory { - create(id: string, patches: Patch[]): Promise; +export interface RemoteHistory { + create(id: string, patches: RemotePatch[]): Promise; /** * Load latest state of the model, and any unmerged "tip" of patches @@ -15,13 +12,13 @@ export interface RemoteHistory { * * @todo Maybe `state` and `tip` should be serialized to JSON? */ - read(id: string): Promise<{cursor: string, log: Log}>; + read(id: string): Promise<{cursor: Cursor, model: M, patches: P[]}>; - scanAhead(id: string, cursor: Cursor): Promise<{cursor: Cursor, tip: Patch[]}>; + scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor, patches: P[]}>; - scanBehind(id: string, cursor: Cursor): Promise<{cursor: Cursor, log: Log}>; + scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor, model: M, patches: P[]}>; - update(id: string, cursor: Cursor, patches: Patch[]): Promise; + update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor, patches: P[]}>; delete?(id: string): Promise; @@ -29,5 +26,13 @@ export interface RemoteHistory { * Subscribe to the latest changes to the model. * @param callback */ - listen(id: string, cursor: Cursor, callback: (changes: Patch[]) => void): void; + listen(id: string, cursor: Cursor, callback: (patches: P[]) => void): void; +} + +export interface RemoteModel { + blob: Uint8Array; +} + +export interface RemotePatch { + blob: Uint8Array; } From ac22950e6ab3cc90700eb0f7cbe6a4d2f5beeb6e Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 10 Apr 2024 14:26:31 +0200 Subject: [PATCH 09/23] =?UTF-8?q?chore:=20=F0=9F=A4=96=20add=20todos=20for?= =?UTF-8?q?=20demo=20server=20APIS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/routes/blocks/methods/create.ts | 1 + src/server/routes/blocks/methods/edit.ts | 1 + src/server/routes/blocks/methods/get.ts | 3 ++- src/server/routes/blocks/methods/history.ts | 1 + src/server/routes/blocks/methods/remove.ts | 1 + src/server/routes/routes.ts | 1 + 6 files changed, 7 insertions(+), 1 deletion(-) diff --git a/src/server/routes/blocks/methods/create.ts b/src/server/routes/blocks/methods/create.ts index 2f7dec6121..303dbf153a 100644 --- a/src/server/routes/blocks/methods/create.ts +++ b/src/server/routes/blocks/methods/create.ts @@ -1,6 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch} from '../schema'; +// TODO: Rename to "new", like "block.new"? export const create = ({t, services}: RouteDeps) => (r: Router) => { diff --git a/src/server/routes/blocks/methods/edit.ts b/src/server/routes/blocks/methods/edit.ts index cea15b0ebd..c4c938a325 100644 --- a/src/server/routes/blocks/methods/edit.ts +++ b/src/server/routes/blocks/methods/edit.ts @@ -1,6 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch} from '../schema'; +// TODO: Rename to "set"? export const edit = ({t, services}: RouteDeps) => (r: Router) => { diff --git a/src/server/routes/blocks/methods/get.ts b/src/server/routes/blocks/methods/get.ts index 7ff48d2d8a..ccd496774f 100644 --- a/src/server/routes/blocks/methods/get.ts +++ b/src/server/routes/blocks/methods/get.ts @@ -12,7 +12,8 @@ export const get = ); const Response = t.Object( - t.prop('block', t.Ref('Block').options({})), + // TODO: Rename this field to `model` or `state`. + t.prop('block', t.Ref('Block')), t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ title: 'Patches', description: 'The list of all patches.', diff --git a/src/server/routes/blocks/methods/history.ts b/src/server/routes/blocks/methods/history.ts index ea71ef4f2e..cdab447288 100644 --- a/src/server/routes/blocks/methods/history.ts +++ b/src/server/routes/blocks/methods/history.ts @@ -1,6 +1,7 @@ import type {BlockPatch, BlockId} from '../schema'; import type {RouteDeps, Router, RouterBase} from '../../types'; +// TODO: Rename to "scan". export const history = ({t, services}: RouteDeps) => (r: Router) => { diff --git a/src/server/routes/blocks/methods/remove.ts b/src/server/routes/blocks/methods/remove.ts index ae3400d397..e6a7cda856 100644 --- a/src/server/routes/blocks/methods/remove.ts +++ b/src/server/routes/blocks/methods/remove.ts @@ -1,6 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId} from '../schema'; +// TODO: rename to "del". export const remove = ({t, services}: RouteDeps) => (r: Router) => { diff --git a/src/server/routes/routes.ts b/src/server/routes/routes.ts index 3459273faf..5f2fc47fe8 100644 --- a/src/server/routes/routes.ts +++ b/src/server/routes/routes.ts @@ -11,5 +11,6 @@ export const routes = (d: RouteDeps) => >(r: ObjectVal ( util(d) ( pubsub(d) ( presence(d) + // TODO: rename "blocks" to "block", in all methods. ( blocks(d) ( r ))))); From 89d4851621ad0f5105fb65d115bebd8de3780f19 Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 11 Apr 2024 01:34:01 +0200 Subject: [PATCH 10/23] =?UTF-8?q?test(json-crdt):=20=F0=9F=92=8D=20add=20r?= =?UTF-8?q?emote=20history=20smoke=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../__tests__/RemoteHistoryServer.spec.ts | 28 +++++++++++++------ src/json-type-value/ObjectValue.ts | 2 +- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts index cf0623d132..9cb3e3460c 100644 --- a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts +++ b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts @@ -1,7 +1,7 @@ import {Model} from '../../../json-crdt/model'; import {buildE2eClient} from '../../../reactive-rpc/common/testing/buildE2eClient'; import {createCaller} from '../../../server/routes/index'; -import {RemoteHistoryDemoServer} from '../RemoteHistoryServer'; +import {RemoteHistoryDemoServer} from '../RemoteHistoryDemoServer'; const setup = () => { const {caller, router} = createCaller(); @@ -9,18 +9,28 @@ const setup = () => { const remote = new RemoteHistoryDemoServer(client); return { - caller, router, + caller, client, remote, }; }; -test('...', async () => { - const {remote} = await setup(); - const model = Model.withLogicalClock(); - model.api.root({foo: 'bar'}); - const patch = model.api.flush(); - - await remote.create('1234567890', [patch]); +let cnt = 0; +const genId = () => Math.random().toString(36).slice(2) + '-' + Date.now().toString(36) + '-' + cnt++; + +describe('.create()', () => { + test('can create a block with a simple patch', async () => { + const {remote, caller} = await setup(); + const model = Model.withLogicalClock(); + model.api.root({foo: 'bar'}); + const patch = model.api.flush(); + const blob = patch.toBinary(); + const id = genId(); + await remote.create(id, [{blob}]); + const {data} = await caller.call('blocks.get', {id}, {}); + // console.log(data.patches); + const model2 = Model.fromBinary(data.block.blob); + expect(model2.view()).toEqual({foo: 'bar'}); + }); }); diff --git a/src/json-type-value/ObjectValue.ts b/src/json-type-value/ObjectValue.ts index 85127538b5..795b74e936 100644 --- a/src/json-type-value/ObjectValue.ts +++ b/src/json-type-value/ObjectValue.ts @@ -4,7 +4,7 @@ import {TypeSystem} from '../json-type/system/TypeSystem'; import type {ResolveType} from '../json-type'; import type * as classes from '../json-type/type'; import type * as ts from '../json-type/typescript/types'; -import {TypeBuilder} from '../json-type/type/TypeBuilder'; +import type {TypeBuilder} from '../json-type/type/TypeBuilder'; export type UnObjectType = T extends classes.ObjectType ? U : never; export type UnObjectValue = T extends ObjectValue ? U : never; From 5be2b24cb624ca9a20d12f9318097cf7d29de908 Mon Sep 17 00:00:00 2001 From: streamich Date: Thu, 11 Apr 2024 01:52:30 +0200 Subject: [PATCH 11/23] =?UTF-8?q?refactor(reactive-rpc):=20=F0=9F=92=A1=20?= =?UTF-8?q?update=20block=20RPC=20command=20names?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../remote/RemoteHistoryDemoServer.ts | 10 +-- .../__tests__/RemoteHistoryServer.spec.ts | 2 +- .../{blocks.spec.ts => block.spec.ts} | 90 +++++++++---------- src/server/routes/{blocks => block}/index.ts | 18 ++-- .../remove.ts => block/methods/del.ts} | 5 +- .../routes/{blocks => block}/methods/get.ts | 2 +- .../{blocks => block}/methods/listen.ts | 2 +- .../create.ts => block/methods/new.ts} | 5 +- .../history.ts => block/methods/scan.ts} | 5 +- .../methods/edit.ts => block/methods/upd.ts} | 5 +- src/server/routes/{blocks => block}/schema.ts | 0 src/server/routes/routes.ts | 4 +- 12 files changed, 72 insertions(+), 76 deletions(-) rename src/server/__tests__/{blocks.spec.ts => block.spec.ts} (83%) rename src/server/routes/{blocks => block}/index.ts (68%) rename src/server/routes/{blocks/methods/remove.ts => block/methods/del.ts} (85%) rename src/server/routes/{blocks => block}/methods/get.ts (94%) rename src/server/routes/{blocks => block}/methods/listen.ts (96%) rename src/server/routes/{blocks/methods/create.ts => block/methods/new.ts} (87%) rename src/server/routes/{blocks/methods/history.ts => block/methods/scan.ts} (90%) rename src/server/routes/{blocks/methods/edit.ts => block/methods/upd.ts} (93%) rename src/server/routes/{blocks => block}/schema.ts (100%) diff --git a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts index 94e6586c27..57314b18c6 100644 --- a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts +++ b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts @@ -20,7 +20,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory) {} public async create(id: string, patches: RemotePatch[]): Promise { - await this.client.call('blocks.create', { + await this.client.call('block.new', { id, patches: patches.map((patch, seq) => ({ // TODO: seq and created should be set on server. (And returned back?) @@ -36,7 +36,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { - const {block, patches} = await this.client.call('blocks.get', {id}); + const {block, patches} = await this.client.call('block.get', {id}); return { cursor: block.seq, model: block, @@ -46,7 +46,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { const limit = 100; - const res = await this.client.call('blocks.history', { + const res = await this.client.call('block.scan', { id, min: cursor, max: cursor + limit, @@ -69,7 +69,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { - const res = await this.client.call('blocks.edit', { + const res = await this.client.call('block.upd', { id, patches: patches.map((patch, seq) => ({ seq, @@ -84,7 +84,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { - await this.client.call('blocks.remove', {id}); + await this.client.call('block.del', {id}); } /** diff --git a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts index 9cb3e3460c..07c065d4aa 100644 --- a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts +++ b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts @@ -28,7 +28,7 @@ describe('.create()', () => { const blob = patch.toBinary(); const id = genId(); await remote.create(id, [{blob}]); - const {data} = await caller.call('blocks.get', {id}, {}); + const {data} = await caller.call('block.get', {id}, {}); // console.log(data.patches); const model2 = Model.fromBinary(data.block.blob); expect(model2.view()).toEqual({foo: 'bar'}); diff --git a/src/server/__tests__/blocks.spec.ts b/src/server/__tests__/block.spec.ts similarity index 83% rename from src/server/__tests__/blocks.spec.ts rename to src/server/__tests__/block.spec.ts index a000e4639e..08f3eb992c 100644 --- a/src/server/__tests__/blocks.spec.ts +++ b/src/server/__tests__/block.spec.ts @@ -3,12 +3,12 @@ import {RpcErrorCodes} from '../../reactive-rpc/common/rpc/caller'; import {setup} from './setup'; import {tick, until} from '../../__tests__/util'; -describe('blocks.*', () => { - describe('blocks.create', () => { +describe('block.*', () => { + describe('block.new', () => { test('can create an empty block', async () => { const {call} = setup(); - await call('blocks.create', {id: 'my-block', patches: []}); - const {block} = await call('blocks.get', {id: 'my-block'}); + await call('block.new', {id: 'my-block', patches: []}); + const {block} = await call('block.get', {id: 'my-block'}); expect(block).toMatchObject({ id: 'my-block', seq: -1, @@ -32,7 +32,7 @@ describe('blocks.*', () => { age: 26, }); const patch2 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id: '123412341234', patches: [ { @@ -47,7 +47,7 @@ describe('blocks.*', () => { }, ], }); - const {block} = await call('blocks.get', {id: '123412341234'}); + const {block} = await call('block.get', {id: '123412341234'}); expect(block).toMatchObject({ id: '123412341234', seq: 1, @@ -63,15 +63,15 @@ describe('blocks.*', () => { }); }); - describe('blocks.remove', () => { + describe('block.remove', () => { test('can remove an existing block', async () => { const {call} = setup(); - await call('blocks.create', {id: 'my-block', patches: []}); - const {block} = await call('blocks.get', {id: 'my-block'}); + await call('block.new', {id: 'my-block', patches: []}); + const {block} = await call('block.get', {id: 'my-block'}); expect(block.id).toBe('my-block'); - await call('blocks.remove', {id: 'my-block'}); + await call('block.del', {id: 'my-block'}); try { - await call('blocks.get', {id: 'my-block'}); + await call('block.get', {id: 'my-block'}); throw new Error('not this error'); } catch (err: any) { expect(err.errno).toBe(RpcErrorCodes.NOT_FOUND); @@ -79,7 +79,7 @@ describe('blocks.*', () => { }); }); - describe('blocks.edit', () => { + describe('block.upd', () => { test('can edit a document sequentially', async () => { const {call} = setup(); const id = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'; @@ -88,7 +88,7 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id, patches: [ { @@ -102,7 +102,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); model.api.str(['text']).ins(5, ' World'); const patch3 = model.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -117,7 +117,7 @@ describe('blocks.*', () => { }, ], }); - const block2 = await call('blocks.get', {id}); + const block2 = await call('block.get', {id}); expect(Model.fromBinary(block2.block.blob).view()).toStrictEqual({ text: 'Hello World', }); @@ -125,7 +125,7 @@ describe('blocks.*', () => { const patch4 = model.api.flush(); model.api.str(['text']).ins(12, '!'); const patch5 = model.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -140,7 +140,7 @@ describe('blocks.*', () => { }, ], }); - const block3 = await call('blocks.get', {id}); + const block3 = await call('block.get', {id}); expect(Model.fromBinary(block3.block.blob).view()).toStrictEqual({ text: 'Hello, World!', }); @@ -156,7 +156,7 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id, patches: [ { @@ -168,11 +168,11 @@ describe('blocks.*', () => { }); // User 2 - const block2 = await call('blocks.get', {id}); + const block2 = await call('block.get', {id}); const model2 = Model.fromBinary(block2.block.blob).fork(); model2.api.str(['text']).ins(4, ' yeah!'); const patch2User2 = model2.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -184,7 +184,7 @@ describe('blocks.*', () => { }); expect(model2.view()).toStrictEqual({text: 'Hell yeah!'}); - const block3 = await call('blocks.get', {id}); + const block3 = await call('block.get', {id}); const model3 = Model.fromBinary(block3.block.blob).fork(); expect(model3.view()).toStrictEqual({text: 'Hell yeah!'}); @@ -193,7 +193,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); model.api.str(['text']).ins(5, ' World'); const patch3 = model.api.flush(); - const {patches} = await call('blocks.edit', { + const {patches} = await call('block.upd', { id, patches: [ { @@ -209,7 +209,7 @@ describe('blocks.*', () => { ], }); - const block4 = await call('blocks.get', {id}); + const block4 = await call('block.get', {id}); const model4 = Model.fromBinary(block4.block.blob).fork(); expect(model4.view()).not.toStrictEqual({text: 'Hell yeah!'}); }); @@ -224,7 +224,7 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await call('blocks.create', { + await call('block.new', { id, patches: [ { @@ -236,11 +236,11 @@ describe('blocks.*', () => { }); // User 2 - const block2 = await call('blocks.get', {id}); + const block2 = await call('block.get', {id}); const model2 = Model.fromBinary(block2.block.blob).fork(); model2.api.str(['text']).ins(4, ' yeah!'); const patch2User2 = model2.api.flush(); - await call('blocks.edit', { + await call('block.upd', { id, patches: [ { @@ -256,7 +256,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); model.api.str(['text']).ins(5, ' World'); const patch3 = model.api.flush(); - const {patches} = await call('blocks.edit', { + const {patches} = await call('block.upd', { id, patches: [ { @@ -280,13 +280,13 @@ describe('blocks.*', () => { }); }); - describe('blocks.listen', () => { + describe('block.listen', () => { test('can listen for block changes', async () => { const {client} = setup(); - await client.call('blocks.create', {id: 'my-block', patches: []}); + await client.call('block.new', {id: 'my-block', patches: []}); await tick(11); const emits: any[] = []; - client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -294,7 +294,7 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [{seq: 0, created: Date.now(), blob: patch1.toBinary()}], }); @@ -308,7 +308,7 @@ describe('blocks.*', () => { const patch2 = model.api.flush(); await tick(12); expect(emits.length).toBe(1); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [{seq: 1, created: Date.now(), blob: patch2.toBinary()}], }); @@ -321,7 +321,7 @@ describe('blocks.*', () => { test('can subscribe before block is created', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); const model = Model.withLogicalClock(); model.api.root({ text: 'Hell', @@ -329,7 +329,7 @@ describe('blocks.*', () => { const patch1 = model.api.flush(); await tick(12); expect(emits.length).toBe(0); - await client.call('blocks.create', { + await client.call('block.new', { id: 'my-block', patches: [ { @@ -349,18 +349,18 @@ describe('blocks.*', () => { test('can receive deletion events', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('blocks.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); - await client.call('blocks.create', {id: 'my-block', patches: []}); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + await client.call('block.new', {id: 'my-block', patches: []}); await until(() => emits.length === 1); expect(emits[0].block.seq).toBe(-1); await tick(3); - await client.call('blocks.remove', {id: 'my-block'}); + await client.call('block.del', {id: 'my-block'}); await until(() => emits.length === 2); expect(emits[1].deleted).toBe(true); }); }); - describe('blocks.history', () => { + describe('block.history', () => { test('can retrieve change history', async () => { const {client} = setup(); const model = Model.withLogicalClock(); @@ -368,7 +368,7 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await client.call('blocks.create', { + await client.call('block.new', { id: 'my-block', patches: [ { @@ -385,7 +385,7 @@ describe('blocks.*', () => { age: 26, }); const patch3 = model.api.flush(); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [ { @@ -400,7 +400,7 @@ describe('blocks.*', () => { }, ], }); - const history = await client.call('blocks.history', {id: 'my-block', min: 0, max: 2}); + const history = await client.call('block.scan', {id: 'my-block', min: 0, max: 2}); expect(history).toMatchObject({ patches: [ { @@ -423,7 +423,7 @@ describe('blocks.*', () => { }); }); - describe('blocks.get', () => { + describe('block.get', () => { test('returns whole history when block is loaded', async () => { const {client} = setup(); const model = Model.withLogicalClock(); @@ -431,7 +431,7 @@ describe('blocks.*', () => { text: 'Hell', }); const patch1 = model.api.flush(); - await client.call('blocks.create', { + await client.call('block.new', { id: 'my-block', patches: [ { @@ -447,7 +447,7 @@ describe('blocks.*', () => { age: 26, }); const patch3 = model.api.flush(); - await client.call('blocks.edit', { + await client.call('block.upd', { id: 'my-block', patches: [ { @@ -462,7 +462,7 @@ describe('blocks.*', () => { }, ], }); - const result = await client.call('blocks.get', {id: 'my-block'}); + const result = await client.call('block.get', {id: 'my-block'}); expect(result).toMatchObject({ block: expect.any(Object), patches: [ diff --git a/src/server/routes/blocks/index.ts b/src/server/routes/block/index.ts similarity index 68% rename from src/server/routes/blocks/index.ts rename to src/server/routes/block/index.ts index b219a7ce78..9306885880 100644 --- a/src/server/routes/blocks/index.ts +++ b/src/server/routes/block/index.ts @@ -1,13 +1,13 @@ -import {create} from './methods/create'; +import {new_} from './methods/new'; import {get} from './methods/get'; -import {remove} from './methods/remove'; -import {edit} from './methods/edit'; +import {upd} from './methods/upd'; +import {del} from './methods/del'; +import {scan} from './methods/scan'; import {listen} from './methods/listen'; import {Block, BlockId, BlockPatch, BlockSeq} from './schema'; -import {history} from './methods/history'; import type {RouteDeps, Router, RouterBase} from '../types'; -export const blocks = +export const block = (d: RouteDeps) => (r: Router) => { const {system} = d; @@ -19,11 +19,11 @@ export const blocks = // prettier-ignore return ( - ( create(d) + ( new_(d) ( get(d) - ( remove(d) - ( edit(d) + ( upd(d) + ( del(d) ( listen(d) - ( history(d) + ( scan(d) ( r )))))))); }; diff --git a/src/server/routes/blocks/methods/remove.ts b/src/server/routes/block/methods/del.ts similarity index 85% rename from src/server/routes/blocks/methods/remove.ts rename to src/server/routes/block/methods/del.ts index e6a7cda856..f3e28617c8 100644 --- a/src/server/routes/blocks/methods/remove.ts +++ b/src/server/routes/block/methods/del.ts @@ -1,8 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId} from '../schema'; -// TODO: rename to "del". -export const remove = +export const del = ({t, services}: RouteDeps) => (r: Router) => { const Request = t.Object( @@ -20,7 +19,7 @@ export const remove = description: 'Fetches a block by ID.', }); - return r.prop('blocks.remove', Func, async ({id}) => { + return r.prop('block.del', Func, async ({id}) => { await services.blocks.remove(id); return {}; }); diff --git a/src/server/routes/blocks/methods/get.ts b/src/server/routes/block/methods/get.ts similarity index 94% rename from src/server/routes/blocks/methods/get.ts rename to src/server/routes/block/methods/get.ts index ccd496774f..2da94b63a0 100644 --- a/src/server/routes/blocks/methods/get.ts +++ b/src/server/routes/block/methods/get.ts @@ -26,7 +26,7 @@ export const get = description: 'Fetches a block by ID.', }); - return r.prop('blocks.get', Func, async ({id}) => { + return r.prop('block.get', Func, async ({id}) => { const {block, patches} = await services.blocks.get(id); return { block, diff --git a/src/server/routes/blocks/methods/listen.ts b/src/server/routes/block/methods/listen.ts similarity index 96% rename from src/server/routes/blocks/methods/listen.ts rename to src/server/routes/block/methods/listen.ts index f9272228af..7d6f9b1ce7 100644 --- a/src/server/routes/blocks/methods/listen.ts +++ b/src/server/routes/block/methods/listen.ts @@ -34,7 +34,7 @@ export const listen = description: 'Subscribe to a block to receive updates when it changes.', }); - return r.prop('blocks.listen', Func, (req$) => { + return r.prop('block.listen', Func, (req$) => { return req$.pipe(switchMap(({id}) => services.pubsub.listen$(`__block:${id}`))) as any; }); }; diff --git a/src/server/routes/blocks/methods/create.ts b/src/server/routes/block/methods/new.ts similarity index 87% rename from src/server/routes/blocks/methods/create.ts rename to src/server/routes/block/methods/new.ts index 303dbf153a..e0ac20641b 100644 --- a/src/server/routes/blocks/methods/create.ts +++ b/src/server/routes/block/methods/new.ts @@ -1,8 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch} from '../schema'; -// TODO: Rename to "new", like "block.new"? -export const create = +export const new_ = ({t, services}: RouteDeps) => (r: Router) => { const Request = t.Object( @@ -24,7 +23,7 @@ export const create = description: 'Creates a new block or applies patches to it.', }); - return r.prop('blocks.create', Func, async ({id, patches}) => { + return r.prop('block.new', Func, async ({id, patches}) => { const {block} = await services.blocks.create(id, patches); return {}; }); diff --git a/src/server/routes/blocks/methods/history.ts b/src/server/routes/block/methods/scan.ts similarity index 90% rename from src/server/routes/blocks/methods/history.ts rename to src/server/routes/block/methods/scan.ts index cdab447288..5af677180b 100644 --- a/src/server/routes/blocks/methods/history.ts +++ b/src/server/routes/block/methods/scan.ts @@ -1,8 +1,7 @@ import type {BlockPatch, BlockId} from '../schema'; import type {RouteDeps, Router, RouterBase} from '../../types'; -// TODO: Rename to "scan". -export const history = +export const scan = ({t, services}: RouteDeps) => (r: Router) => { const Request = t.Object( @@ -33,7 +32,7 @@ export const history = description: 'Returns a list of specified change patches for a block.', }); - return r.prop('blocks.history', Func, async ({id, min, max}) => { + return r.prop('block.scan', Func, async ({id, min, max}) => { const {patches} = await services.blocks.history(id, min, max); return {patches}; }); diff --git a/src/server/routes/blocks/methods/edit.ts b/src/server/routes/block/methods/upd.ts similarity index 93% rename from src/server/routes/blocks/methods/edit.ts rename to src/server/routes/block/methods/upd.ts index c4c938a325..987854f5f5 100644 --- a/src/server/routes/blocks/methods/edit.ts +++ b/src/server/routes/block/methods/upd.ts @@ -1,8 +1,7 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch} from '../schema'; -// TODO: Rename to "set"? -export const edit = +export const upd = ({t, services}: RouteDeps) => (r: Router) => { const PatchType = t.Ref('BlockPatch'); @@ -39,7 +38,7 @@ export const edit = description: 'Applies patches to an existing document and returns the latest concurrent changes.', }); - return r.prop('blocks.edit', Func, async ({id, patches}) => { + return r.prop('block.upd', Func, async ({id, patches}) => { const res = await services.blocks.edit(id, patches); return { patches: res.patches, diff --git a/src/server/routes/blocks/schema.ts b/src/server/routes/block/schema.ts similarity index 100% rename from src/server/routes/blocks/schema.ts rename to src/server/routes/block/schema.ts diff --git a/src/server/routes/routes.ts b/src/server/routes/routes.ts index 5f2fc47fe8..2d5733147c 100644 --- a/src/server/routes/routes.ts +++ b/src/server/routes/routes.ts @@ -1,7 +1,7 @@ import {util} from './util'; import {pubsub} from './pubsub'; import {presence} from './presence'; -import {blocks} from './blocks'; +import {block} from './block'; import type {RouteDeps} from './types'; import type {ObjectValue} from '../../json-type-value/ObjectValue'; import type {ObjectType} from '../../json-type'; @@ -12,5 +12,5 @@ export const routes = (d: RouteDeps) => >(r: ObjectVal ( pubsub(d) ( presence(d) // TODO: rename "blocks" to "block", in all methods. - ( blocks(d) + ( block(d) ( r ))))); From be282c41219be77fd48105080236f5f3edd37b5b Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Thu, 11 Apr 2024 10:42:35 +0200 Subject: [PATCH 12/23] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20block=20store=20interface,=20return=20infor=20about=20rem?= =?UTF-8?q?oval?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/services/blocks/MemoryStore.ts | 8 ++-- src/server/services/blocks/types.ts | 48 ++++++++++++++++++++++- 2 files changed, 51 insertions(+), 5 deletions(-) diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index 85c060a7d4..f8acb6685d 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -70,14 +70,14 @@ export class MemoryStore implements types.Store { return patches.slice(min, max + 1); } - public async remove(id: string): Promise { + public async remove(id: string): Promise { await new Promise((resolve) => setImmediate(resolve)); - this.removeSync(id); + return this.removeSync(id); } - private removeSync(id: string): void { + private removeSync(id: string): boolean { this.blocks.delete(id); - this.patches.delete(id); + return this.patches.delete(id); } public stats(): {blocks: number; patches: number} { diff --git a/src/server/services/blocks/types.ts b/src/server/services/blocks/types.ts index d6932bed34..a7a0fa693a 100644 --- a/src/server/services/blocks/types.ts +++ b/src/server/services/blocks/types.ts @@ -13,11 +13,57 @@ export interface StorePatch { } export interface Store { + /** + * Create a new block. + * + * @param id Block ID. + * @param patches Initial patches to apply to a new block. + * @returns Newly created block data. + */ create(id: string, patches: StorePatch[]): Promise; + + /** + * Retrieve an existing block. + * + * @param id Block ID. + * @returns Block data, or `undefined` if the block does not exist. + */ get(id: string): Promise; + + /** + * Retrieve the sequence number of a block. + * + * @param id Block ID. + * @returns Block sequence number, or `undefined` if the block does not exist. + */ + seq(id: string): Promise; + + /** + * Edit an existing block by applying new patches. + * + * @param id Block ID. + * @param patches Patches to apply to the block. + * @returns Updated block data. + */ edit(id: string, patches: StorePatch[]): Promise; + + /** + * Retrieve the history of patches for a block. + * + * @param id Block ID. + * @param min Minimum sequence number. + * @param max Maximum sequence number. + * @returns List of patches. + */ history(id: string, min: number, max: number): Promise; - remove(id: string): Promise; + + /** + * Remove a block. + * + * @param id Block ID. + * @returns `true` if the block was removed, `false` if the block did not exist. + */ + remove(id: string): Promise; } export interface StoreGetResult { From e62b5efa38e8fb44be2ce8a297f0c45d8ee82e30 Mon Sep 17 00:00:00 2001 From: Vadim Dalecky Date: Thu, 11 Apr 2024 11:02:10 +0200 Subject: [PATCH 13/23] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20scan=20method?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/rpc/caller/error/RpcError.ts | 4 +-- src/server/routes/block/methods/scan.ts | 27 ++++++++++++------- src/server/services/blocks/BlocksServices.ts | 21 +++++++++++++-- src/server/services/blocks/MemoryStore.ts | 19 ++++++++----- 4 files changed, 52 insertions(+), 19 deletions(-) diff --git a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts index b272370634..3941d4e3ad 100644 --- a/src/reactive-rpc/common/rpc/caller/error/RpcError.ts +++ b/src/reactive-rpc/common/rpc/caller/error/RpcError.ts @@ -53,8 +53,8 @@ export class RpcError extends Error implements IRpcError { return RpcError.fromCode(RpcErrorCodes.INTERNAL_ERROR, message, undefined, originalError); } - public static badRequest(): RpcError { - return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, 'Bad Request'); + public static badRequest(message = 'Bad Request'): RpcError { + return RpcError.fromCode(RpcErrorCodes.BAD_REQUEST, message); } public static validation(message: string, meta?: unknown): RpcError { diff --git a/src/server/routes/block/methods/scan.ts b/src/server/routes/block/methods/scan.ts index 5af677180b..c051401c88 100644 --- a/src/server/routes/block/methods/scan.ts +++ b/src/server/routes/block/methods/scan.ts @@ -9,14 +9,22 @@ export const scan = title: 'Block ID', description: 'The ID of the block.', }), - t.prop('max', t.num.options({format: 'u32'})).options({ - title: 'Max', - description: 'The maximum sequence number to return.', + t.prop('seq', t.num.options({format: 'u32'})).options({ + title: 'Starting Sequence Number', + description: 'The sequence number to start from. Defaults to the latest sequence number.', }), - t.prop('min', t.num.options({format: 'u32'})).options({ - title: 'Min', - description: 'The minimum sequence number to return.', + t.prop('len', t.num.options({format: 'u32'})).options({ + title: 'Number of Patches', + description: 'The minimum number of patches to return. Defaults to 10. ' + + 'When positive, returns the patches ahead of the starting sequence number. ' + + 'When negative, returns the patches behind the starting sequence number.', }), + t.prop('model', t.bool) + .options({ + title: 'With Model', + description: 'Whether to include the model in the response. ' + + 'Defaults to `false`, when `len` is positive; and, defaults to `true`, when `len` is negative.', + }), ); const Response = t.Object( @@ -32,8 +40,9 @@ export const scan = description: 'Returns a list of specified change patches for a block.', }); - return r.prop('block.scan', Func, async ({id, min, max}) => { - const {patches} = await services.blocks.history(id, min, max); - return {patches}; + return r.prop('block.scan', Func, async ({id, seq, len, model}) => { + // const {patches} = await services.blocks.history(id, min, max); + // return {patches}; + throw new Error('Not implemented'); }); }; diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 22f139dc03..7419fb003c 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -39,9 +39,12 @@ export class BlocksServices { if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND); const patches = await store.history(id, 0, result.block.seq); const {block} = result; + // TODO: should not return `patches`, only the "tip". return {block, patches}; } + public async getSeq(id: string, seq: number) {} + public async remove(id: string) { await this.store.remove(id); this.services.pubsub.publish(`__block:${id}`, {deleted: true}).catch((error) => { @@ -50,10 +53,24 @@ export class BlocksServices { }); } - public async history(id: string, min: number, max: number) { + public async scan(id: string, offset: number | undefined, limit: number | undefined = 10) { const {store} = this; + if (typeof offset !== 'number') offset = await store.seq(id); + let min: number = 0, max: number = 0; + if (!limit || (Math.round(limit) !== limit)) throw RpcError.badRequest('INVALID_LIMIT'); + if (limit > 0) { + min = Number(offset) || 0; + max = min + limit; + } else { + max = Number(offset) || 0; + min = max - limit; + } + if (min < 0) { + min = 0; + max = Math.abs(limit); + } const patches = await store.history(id, min, max); - return {patches}; + // return {patches}; } public async edit(id: string, patches: StorePatch[]) { diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index f8acb6685d..af2d67fba9 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -2,19 +2,26 @@ import {Model} from '../../../json-crdt'; import {Patch} from '../../../json-crdt-patch'; import type * as types from './types'; +const tick = new Promise((resolve) => setImmediate(resolve)); + export class MemoryStore implements types.Store { protected readonly blocks = new Map(); protected readonly patches = new Map(); public async get(id: string): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; const block = this.blocks.get(id); if (!block) return; return {block}; } + public async seq(id: string): Promise { + await tick; + return this.blocks.get(id)?.seq; + } + public async create(id: string, patches: types.StorePatch[]): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; if (!Array.isArray(patches)) throw new Error('NO_PATCHES'); if (this.blocks.has(id)) throw new Error('BLOCK_EXISTS'); const model = Model.withLogicalClock(); @@ -41,7 +48,7 @@ export class MemoryStore implements types.Store { } public async edit(id: string, patches: types.StorePatch[]): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; if (!Array.isArray(patches) || !patches.length) throw new Error('NO_PATCHES'); const block = this.blocks.get(id); const existingPatches = this.patches.get(id); @@ -64,14 +71,14 @@ export class MemoryStore implements types.Store { } public async history(id: string, min: number, max: number): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; const patches = this.patches.get(id); if (!patches) return []; return patches.slice(min, max + 1); } public async remove(id: string): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; return this.removeSync(id); } @@ -88,7 +95,7 @@ export class MemoryStore implements types.Store { } public async removeOlderThan(ts: number): Promise { - await new Promise((resolve) => setImmediate(resolve)); + await tick; for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); } } From a43f396dc1d05e3efc4bb8b18507c7c48ce4e8d2 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 13 Apr 2024 12:23:43 +0200 Subject: [PATCH 14/23] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20allow?= =?UTF-8?q?=20to=20construct=20a=20model=20from=20collection=20of=20patche?= =?UTF-8?q?s?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/model/Model.ts | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index 6dbcd1872d..49409d3099 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -65,6 +65,25 @@ export class Model> implements Printable { return decoder.decode(data); } + /** + * Instantiates a model from a collection of patches. The patches are applied + * to the model in the order they are provided. The session ID of the model is + * set to the session ID of the first patch. + * + * @param patches A collection of initial patches to apply to the model. + * @returns A model with the patches applied. + */ + public static fromPatches(patches: Patch[]): Model { + const length = patches.length; + if (!length) throw new Error('NO_PATCHES'); + const first = patches[0]; + const sid = first.getId()!.sid; + if (!sid) throw new Error('NO_SID'); + const model = Model.withLogicalClock(sid); + model.applyBatch(patches); + return model; + } + /** * Root of the JSON document is implemented as Last Write Wins Register, * so that the JSON document does not necessarily need to be an object. The From ecd3a68609eec93db6a7794e78419c1345f4241d Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 13 Apr 2024 12:24:45 +0200 Subject: [PATCH 15/23] =?UTF-8?q?refactor(reactive-rpc):=20=F0=9F=92=A1=20?= =?UTF-8?q?update=20blocks=20scan=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/block.spec.ts | 4 ++-- src/server/routes/block/methods/scan.ts | 15 ++++++++------- src/server/services/blocks/BlocksServices.ts | 19 ++++++++++++++++--- src/server/services/blocks/MemoryStore.ts | 2 ++ 4 files changed, 28 insertions(+), 12 deletions(-) diff --git a/src/server/__tests__/block.spec.ts b/src/server/__tests__/block.spec.ts index 08f3eb992c..7e9bbc1d91 100644 --- a/src/server/__tests__/block.spec.ts +++ b/src/server/__tests__/block.spec.ts @@ -360,7 +360,7 @@ describe('block.*', () => { }); }); - describe('block.history', () => { + describe('block.scan', () => { test('can retrieve change history', async () => { const {client} = setup(); const model = Model.withLogicalClock(); @@ -400,7 +400,7 @@ describe('block.*', () => { }, ], }); - const history = await client.call('block.scan', {id: 'my-block', min: 0, max: 2}); + const history = await client.call('block.scan', {id: 'my-block', seq: 0, limit: 3}); expect(history).toMatchObject({ patches: [ { diff --git a/src/server/routes/block/methods/scan.ts b/src/server/routes/block/methods/scan.ts index c051401c88..d437cc6952 100644 --- a/src/server/routes/block/methods/scan.ts +++ b/src/server/routes/block/methods/scan.ts @@ -9,17 +9,17 @@ export const scan = title: 'Block ID', description: 'The ID of the block.', }), - t.prop('seq', t.num.options({format: 'u32'})).options({ + t.propOpt('seq', t.num.options({format: 'u32'})).options({ title: 'Starting Sequence Number', description: 'The sequence number to start from. Defaults to the latest sequence number.', }), - t.prop('len', t.num.options({format: 'u32'})).options({ + t.propOpt('limit', t.num.options({format: 'u32'})).options({ title: 'Number of Patches', description: 'The minimum number of patches to return. Defaults to 10. ' + 'When positive, returns the patches ahead of the starting sequence number. ' + 'When negative, returns the patches behind the starting sequence number.', }), - t.prop('model', t.bool) + t.propOpt('model', t.bool) .options({ title: 'With Model', description: 'Whether to include the model in the response. ' + @@ -32,6 +32,7 @@ export const scan = title: 'Patches', description: 'The list of patches.', }), + t.propOpt('modelBlob', t.bin), ); const Func = t.Function(Request, Response).options({ @@ -40,9 +41,9 @@ export const scan = description: 'Returns a list of specified change patches for a block.', }); - return r.prop('block.scan', Func, async ({id, seq, len, model}) => { - // const {patches} = await services.blocks.history(id, min, max); - // return {patches}; - throw new Error('Not implemented'); + return r.prop('block.scan', Func, async ({id, seq, limit = 10, model: returnModel = limit > 0}) => { + const {patches, model} = await services.blocks.scan(id, seq, limit, returnModel); + const modelBlob: Uint8Array | undefined = model?.toBinary(); + return {patches, modelBlob}; }); }; diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 7419fb003c..18884f5796 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -2,6 +2,7 @@ import {MemoryStore} from './MemoryStore'; import {StorePatch} from './types'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; import type {Services} from '../Services'; +import {Model, Patch} from '../../../json-crdt'; const BLOCK_TTL = 1000 * 60 * 60; // 1 hour @@ -43,7 +44,12 @@ export class BlocksServices { return {block, patches}; } - public async getSeq(id: string, seq: number) {} + public async getAtSeq(id: string, seq: number) { + const {store} = this; + const patches = await store.history(id, 0, seq); + const model = Model.fromPatches(patches.map(p => Patch.fromBinary(p.blob))); + return model; + } public async remove(id: string) { await this.store.remove(id); @@ -53,7 +59,7 @@ export class BlocksServices { }); } - public async scan(id: string, offset: number | undefined, limit: number | undefined = 10) { + public async scan(id: string, offset: number | undefined, limit: number | undefined = 10, returnStartModel: boolean = limit < 0) { const {store} = this; if (typeof offset !== 'number') offset = await store.seq(id); let min: number = 0, max: number = 0; @@ -70,7 +76,14 @@ export class BlocksServices { max = Math.abs(limit); } const patches = await store.history(id, min, max); - // return {patches}; + let model: Model | undefined; + if (returnStartModel) { + const startPatches = await store.history(id, 0, min); + if (startPatches.length) { + model = Model.fromPatches(startPatches.map(p => Patch.fromBinary(p.blob))); + } + } + return {patches, model}; } public async edit(id: string, patches: StorePatch[]) { diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index af2d67fba9..69d10362cd 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -98,4 +98,6 @@ export class MemoryStore implements types.Store { await tick; for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); } + + // TODO: GC by update timestamp instead } From 11d958caa2e9fe0cc78189fe540619c22a8ec2a4 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 13 Apr 2024 12:37:09 +0200 Subject: [PATCH 16/23] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20GC?= =?UTF-8?q?=20old=20blocks=20based=20on=20update=20time?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/services/blocks/BlocksServices.ts | 4 ++-- src/server/services/blocks/MemoryStore.ts | 5 ++++- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 18884f5796..46ae8be878 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -1,8 +1,8 @@ import {MemoryStore} from './MemoryStore'; import {StorePatch} from './types'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; -import type {Services} from '../Services'; import {Model, Patch} from '../../../json-crdt'; +import type {Services} from '../Services'; const BLOCK_TTL = 1000 * 60 * 60; // 1 hour @@ -125,6 +125,6 @@ export class BlocksServices { private async gc(): Promise { const ts = Date.now() - BLOCK_TTL; const {store} = this; - await store.removeOlderThan(ts); + await store.removeAccessedBefore(ts); } } diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index 69d10362cd..48e87f42b5 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -99,5 +99,8 @@ export class MemoryStore implements types.Store { for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); } - // TODO: GC by update timestamp instead + public async removeAccessedBefore(ts: number): Promise { + await tick; + for (const [id, block] of this.blocks) if (block.updated < ts) this.removeSync(id); + } } From ca4d16b8bb1533985b8f66bdf1e33a8d456be4b7 Mon Sep 17 00:00:00 2001 From: streamich Date: Sat, 13 Apr 2024 14:27:13 +0200 Subject: [PATCH 17/23] =?UTF-8?q?feat(json-type):=20=F0=9F=8E=B8=20add=20a?= =?UTF-8?q?bility=20to=20extend=20ObjectType?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-type/type/classes/ObjectType.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/json-type/type/classes/ObjectType.ts b/src/json-type/type/classes/ObjectType.ts index 38e22ece42..9029cc229b 100644 --- a/src/json-type/type/classes/ObjectType.ts +++ b/src/json-type/type/classes/ObjectType.ts @@ -136,6 +136,10 @@ export class ObjectType[] = ObjectFieldType< return this.fields.find((f) => f.key === key); } + public extend[]>(o: ObjectType): ObjectType<[...F, ...F2]> { + return new ObjectType([...this.fields, ...o.fields]); + } + public validateSchema(): void { const schema = this.getSchema(); validateTType(schema, 'obj'); From 1a561460a309e10539f91690737fb9d6dab0d36b Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 14 Apr 2024 11:00:54 +0200 Subject: [PATCH 18/23] =?UTF-8?q?refactor(reactive-rpc):=20=F0=9F=92=A1=20?= =?UTF-8?q?cleanup=20demo=20server?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/block.spec.ts | 93 +++++++++----------- src/server/routes/block/index.ts | 15 +++- src/server/routes/block/methods/get.ts | 7 +- src/server/routes/block/methods/listen.ts | 9 +- src/server/routes/block/methods/new.ts | 16 ++-- src/server/routes/block/schema.ts | 25 ++++-- src/server/services/blocks/BlocksServices.ts | 80 +++++++++++------ src/server/services/blocks/MemoryStore.ts | 47 +++------- src/server/services/blocks/types.ts | 8 +- 9 files changed, 162 insertions(+), 138 deletions(-) diff --git a/src/server/__tests__/block.spec.ts b/src/server/__tests__/block.spec.ts index 7e9bbc1d91..7a6724c8fe 100644 --- a/src/server/__tests__/block.spec.ts +++ b/src/server/__tests__/block.spec.ts @@ -8,16 +8,16 @@ describe('block.*', () => { test('can create an empty block', async () => { const {call} = setup(); await call('block.new', {id: 'my-block', patches: []}); - const {block} = await call('block.get', {id: 'my-block'}); - expect(block).toMatchObject({ + const {model} = await call('block.get', {id: 'my-block'}); + expect(model).toMatchObject({ id: 'my-block', seq: -1, blob: expect.any(Uint8Array), created: expect.any(Number), updated: expect.any(Number), }); - const model = Model.fromBinary(block.blob); - expect(model.view()).toBe(undefined); + const model2 = Model.fromBinary(model.blob); + expect(model2.view()).toBe(undefined); }); test('can create a block with value', async () => { @@ -36,26 +36,22 @@ describe('block.*', () => { id: '123412341234', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, { - seq: 1, - created: Date.now(), blob: patch2.toBinary(), }, ], }); - const {block} = await call('block.get', {id: '123412341234'}); - expect(block).toMatchObject({ + const res = await call('block.get', {id: '123412341234'}); + expect(res.model).toMatchObject({ id: '123412341234', seq: 1, blob: expect.any(Uint8Array), created: expect.any(Number), updated: expect.any(Number), }); - const model2 = Model.fromBinary(block.blob); + const model2 = Model.fromBinary(res.model.blob); expect(model2.view()).toStrictEqual({ name: 'Super Woman', age: 26, @@ -67,8 +63,8 @@ describe('block.*', () => { test('can remove an existing block', async () => { const {call} = setup(); await call('block.new', {id: 'my-block', patches: []}); - const {block} = await call('block.get', {id: 'my-block'}); - expect(block.id).toBe('my-block'); + const {model} = await call('block.get', {id: 'my-block'}); + expect(model.id).toBe('my-block'); await call('block.del', {id: 'my-block'}); try { await call('block.get', {id: 'my-block'}); @@ -92,8 +88,6 @@ describe('block.*', () => { id, patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -118,7 +112,7 @@ describe('block.*', () => { ], }); const block2 = await call('block.get', {id}); - expect(Model.fromBinary(block2.block.blob).view()).toStrictEqual({ + expect(Model.fromBinary(block2.model.blob).view()).toStrictEqual({ text: 'Hello World', }); model.api.str(['text']).del(5, 1).ins(5, ', '); @@ -141,7 +135,7 @@ describe('block.*', () => { ], }); const block3 = await call('block.get', {id}); - expect(Model.fromBinary(block3.block.blob).view()).toStrictEqual({ + expect(Model.fromBinary(block3.model.blob).view()).toStrictEqual({ text: 'Hello, World!', }); }); @@ -160,8 +154,6 @@ describe('block.*', () => { id, patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -169,7 +161,7 @@ describe('block.*', () => { // User 2 const block2 = await call('block.get', {id}); - const model2 = Model.fromBinary(block2.block.blob).fork(); + const model2 = Model.fromBinary(block2.model.blob).fork(); model2.api.str(['text']).ins(4, ' yeah!'); const patch2User2 = model2.api.flush(); await call('block.upd', { @@ -185,7 +177,7 @@ describe('block.*', () => { expect(model2.view()).toStrictEqual({text: 'Hell yeah!'}); const block3 = await call('block.get', {id}); - const model3 = Model.fromBinary(block3.block.blob).fork(); + const model3 = Model.fromBinary(block3.model.blob).fork(); expect(model3.view()).toStrictEqual({text: 'Hell yeah!'}); // User 1 @@ -210,7 +202,7 @@ describe('block.*', () => { }); const block4 = await call('block.get', {id}); - const model4 = Model.fromBinary(block4.block.blob).fork(); + const model4 = Model.fromBinary(block4.model.blob).fork(); expect(model4.view()).not.toStrictEqual({text: 'Hell yeah!'}); }); @@ -228,8 +220,6 @@ describe('block.*', () => { id, patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -237,7 +227,7 @@ describe('block.*', () => { // User 2 const block2 = await call('block.get', {id}); - const model2 = Model.fromBinary(block2.block.blob).fork(); + const model2 = Model.fromBinary(block2.model.blob).fork(); model2.api.str(['text']).ins(4, ' yeah!'); const patch2User2 = model2.api.flush(); await call('block.upd', { @@ -333,8 +323,6 @@ describe('block.*', () => { id: 'my-block', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -349,10 +337,13 @@ describe('block.*', () => { test('can receive deletion events', async () => { const {client} = setup(); const emits: any[] = []; - client.call$('block.listen', {id: 'my-block'}).subscribe((data) => emits.push(data)); + client.call$('block.listen', {id: 'my-block'}).subscribe((data) => { + console.log('data', data); + emits.push(data); + }); await client.call('block.new', {id: 'my-block', patches: []}); await until(() => emits.length === 1); - expect(emits[0].block.seq).toBe(-1); + expect(emits[0].model.seq).toBe(-1); await tick(3); await client.call('block.del', {id: 'my-block'}); await until(() => emits.length === 2); @@ -372,8 +363,6 @@ describe('block.*', () => { id: 'my-block', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -435,8 +424,6 @@ describe('block.*', () => { id: 'my-block', patches: [ { - seq: 0, - created: Date.now(), blob: patch1.toBinary(), }, ], @@ -463,26 +450,26 @@ describe('block.*', () => { ], }); const result = await client.call('block.get', {id: 'my-block'}); - expect(result).toMatchObject({ - block: expect.any(Object), - patches: [ - { - seq: 0, - created: expect.any(Number), - blob: patch1.toBinary(), - }, - { - seq: 1, - created: expect.any(Number), - blob: patch2.toBinary(), - }, - { - seq: 2, - created: expect.any(Number), - blob: patch3.toBinary(), - }, - ], - }); + // expect(result).toMatchObject({ + // block: expect.any(Object), + // // patches: [ + // // { + // // seq: 0, + // // created: expect.any(Number), + // // blob: patch1.toBinary(), + // // }, + // // { + // // seq: 1, + // // created: expect.any(Number), + // // blob: patch2.toBinary(), + // // }, + // // { + // // seq: 2, + // // created: expect.any(Number), + // // blob: patch3.toBinary(), + // // }, + // // ], + // }); }); }); }); diff --git a/src/server/routes/block/index.ts b/src/server/routes/block/index.ts index 9306885880..cda905e8fa 100644 --- a/src/server/routes/block/index.ts +++ b/src/server/routes/block/index.ts @@ -4,7 +4,16 @@ import {upd} from './methods/upd'; import {del} from './methods/del'; import {scan} from './methods/scan'; import {listen} from './methods/listen'; -import {Block, BlockId, BlockPatch, BlockSeq} from './schema'; +import { + Block, + BlockPartial, + BlockPartialReturn, + BlockId, + BlockPatch, + BlockPatchPartial, + BlockPatchPartialReturn, + BlockSeq, +} from './schema'; import type {RouteDeps, Router, RouterBase} from '../types'; export const block = @@ -15,7 +24,11 @@ export const block = system.alias('BlockId', BlockId); system.alias('BlockSeq', BlockSeq); system.alias('Block', Block); + system.alias('BlockPartial', BlockPartial); + system.alias('BlockPartialReturn', BlockPartialReturn); system.alias('BlockPatch', BlockPatch); + system.alias('BlockPatchPartial', BlockPatchPartial); + system.alias('BlockPatchPartialReturn', BlockPatchPartialReturn); // prettier-ignore return ( diff --git a/src/server/routes/block/methods/get.ts b/src/server/routes/block/methods/get.ts index 2da94b63a0..081466b2b9 100644 --- a/src/server/routes/block/methods/get.ts +++ b/src/server/routes/block/methods/get.ts @@ -12,8 +12,7 @@ export const get = ); const Response = t.Object( - // TODO: Rename this field to `model` or `state`. - t.prop('block', t.Ref('Block')), + t.prop('model', t.Ref('Block')), t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ title: 'Patches', description: 'The list of all patches.', @@ -27,9 +26,9 @@ export const get = }); return r.prop('block.get', Func, async ({id}) => { - const {block, patches} = await services.blocks.get(id); + const {model, patches} = await services.blocks.get(id); return { - block, + model, patches, }; }); diff --git a/src/server/routes/block/methods/listen.ts b/src/server/routes/block/methods/listen.ts index 7d6f9b1ce7..1235cadd96 100644 --- a/src/server/routes/block/methods/listen.ts +++ b/src/server/routes/block/methods/listen.ts @@ -1,4 +1,4 @@ -import {switchMap} from 'rxjs'; +import {switchMap, tap} from 'rxjs'; import type {RouteDeps, Router, RouterBase} from '../../types'; import type {BlockId, BlockPatch, Block} from '../schema'; @@ -14,17 +14,18 @@ export const listen = }), ); + // TODO: Use TLV encoding. const Response = t.Object( t.propOpt('deleted', t.Boolean()).options({ title: 'Deleted', description: 'Emitted only when the block is deleted.', }), - t.propOpt('block', t.Ref('Block')).options({ + t.propOpt('model', t.Ref('Block')).options({ title: 'Block', description: 'The whole block object, emitted only when the block is created.', }), - t.propOpt('patches', t.Array(PatchType)).options({ - title: 'Latest patches', + t.propOpt('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Latest Patches', description: 'Patches that have been applied to the block.', }), ); diff --git a/src/server/routes/block/methods/new.ts b/src/server/routes/block/methods/new.ts index e0ac20641b..b270460417 100644 --- a/src/server/routes/block/methods/new.ts +++ b/src/server/routes/block/methods/new.ts @@ -1,5 +1,5 @@ import type {RouteDeps, Router, RouterBase} from '../../types'; -import type {BlockId, BlockPatch} from '../schema'; +import type {Block, BlockId, BlockPatchPartial, BlockPatchPartialReturn} from '../schema'; export const new_ = ({t, services}: RouteDeps) => @@ -9,13 +9,19 @@ export const new_ = title: 'New block ID', description: 'The ID of the new block.', }), - t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + t.prop('patches', t.Array(t.Ref('BlockPatchPartial'))).options({ title: 'Patches', description: 'The patches to apply to the document.', }), ); - const Response = t.obj; + const Response = t.Object( + t.prop('model', t.Ref('Block')), + t.prop('patches', t.Array(t.Ref('BlockPatchPartialReturn'))).options({ + title: 'Patches', + description: 'The list of all patches.', + }), + ); const Func = t.Function(Request, Response).options({ title: 'Create Block', @@ -24,7 +30,7 @@ export const new_ = }); return r.prop('block.new', Func, async ({id, patches}) => { - const {block} = await services.blocks.create(id, patches); - return {}; + const res = await services.blocks.create(id, patches); + return res; }); }; diff --git a/src/server/routes/block/schema.ts b/src/server/routes/block/schema.ts index 7fb9483b5a..6d12c3eaf1 100644 --- a/src/server/routes/block/schema.ts +++ b/src/server/routes/block/schema.ts @@ -18,18 +18,31 @@ export const BlockSeq = t.num.options({ export type TBlock = ResolveType; // prettier-ignore -export const Block = t.Object( +export const BlockPartial = t.Object( + t.prop('blob', t.bin), +); + +export const BlockPartialReturn = t.Object( t.prop('id', t.Ref('BlockId')), t.prop('seq', t.Ref('BlockSeq')), t.prop('created', t.num), t.prop('updated', t.num), - t.prop('blob', t.bin), ); +export const Block = BlockPartial.extend(BlockPartialReturn); + export type TBlockPatch = ResolveType; // prettier-ignore -export const BlockPatch = t.Object( +export const BlockPatchPartial = t.Object( + t.prop('blob', t.bin).options({ + title: 'Patch Blob', + description: 'The binary data of the patch. The format of the data is defined by the patch type.', + }), +); + +// prettier-ignore +export const BlockPatchPartialReturn = t.Object( t.prop('seq', t.num).options({ title: 'Patch Sequence Number', description: 'The sequence number of the patch in the block. A monotonically increasing integer, starting from 0.', @@ -42,8 +55,6 @@ export const BlockPatch = t.Object( 'want to also store the time when the patch was created by the user, you can include this ' + 'information in the patch blob itself.', }), - t.prop('blob', t.bin).options({ - title: 'Patch Blob', - description: 'The binary data of the patch. The format of the data is defined by the patch type.', - }), ); + +export const BlockPatch = BlockPatchPartial.extend(BlockPatchPartialReturn); diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 46ae8be878..3b7d062f71 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -1,15 +1,16 @@ import {MemoryStore} from './MemoryStore'; -import {StorePatch} from './types'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; import {Model, Patch} from '../../../json-crdt'; +import type {StoreModel, StorePatch} from './types'; import type {Services} from '../Services'; +import {SESSION} from '../../../json-crdt-patch/constants'; -const BLOCK_TTL = 1000 * 60 * 60; // 1 hour +const BLOCK_TTL = 1000 * 60 * 30; // 30 minutes -const validatePatches = (patches: StorePatch[]) => { +const validatePatches = (patches: Pick[]) => { for (const patch of patches) { if (patch.blob.length > 2000) throw RpcError.validation('patch blob too large'); - if (patch.seq > 500_000) throw RpcError.validation('patch seq too large'); + // if (patch.seq > 500_000) throw RpcError.validation('patch seq too large'); } }; @@ -18,37 +19,62 @@ export class BlocksServices { constructor(protected readonly services: Services) {} - public async create(id: string, patches: StorePatch[]) { + public async create(id: string, partialPatches: Pick[]) { this.maybeGc(); - const {store} = this; - validatePatches(patches); - const {block} = await store.create(id, patches); - const data = { - block, - patches, + validatePatches(partialPatches); + if (!Array.isArray(partialPatches)) throw new Error('INVALID_PATCHES'); + const length = partialPatches.length; + const now = Date.now(); + if (!length) { + const rawModel = Model.withLogicalClock(SESSION.GLOBAL); + const model: StoreModel = { + id, + seq: -1, + blob: rawModel.toBinary(), + created: now, + updated: now, + }; + return await this.__create(id, model, []); + } + const rawPatches: Patch[] = []; + const patches: StorePatch[] = []; + let seq = 0; + for (; seq < length; seq++) { + const blob = partialPatches[seq].blob; + rawPatches.push(Patch.fromBinary(blob)); + patches.push({seq, created: now, blob}); + } + const rawModel = Model.fromPatches(rawPatches); + const model: StoreModel = { + id, + seq: seq - 1, + blob: rawModel.toBinary(), + created: now, + updated: now, }; - this.services.pubsub.publish(`__block:${id}`, data).catch((error) => { + return await this.__create(id, model, patches);; + } + + private async __create(id: string, model: StoreModel, patches: StorePatch[]) { + await this.store.create(id, model, patches); + this.services.pubsub.publish(`__block:${id}`, {model, patches}).catch((error) => { // tslint:disable-next-line:no-console console.error('Error publishing block patches', error); }); - return {block}; + return { + model, + patches, + }; } public async get(id: string) { const {store} = this; const result = await store.get(id); if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND); - const patches = await store.history(id, 0, result.block.seq); - const {block} = result; + // const patches = await store.history(id, 0, result.block.seq); + const {model} = result; // TODO: should not return `patches`, only the "tip". - return {block, patches}; - } - - public async getAtSeq(id: string, seq: number) { - const {store} = this; - const patches = await store.history(id, 0, seq); - const model = Model.fromPatches(patches.map(p => Patch.fromBinary(p.blob))); - return model; + return {model, patches: []}; } public async remove(id: string) { @@ -93,19 +119,19 @@ export class BlocksServices { const seq = patches[0].seq; const {store} = this; validatePatches(patches); - const {block} = await store.edit(id, patches); + const {model} = await store.edit(id, patches); this.services.pubsub.publish(`__block:${id}`, {patches}).catch((error) => { // tslint:disable-next-line:no-console console.error('Error publishing block patches', error); }); const expectedBlockSeq = seq + patches.length - 1; - const hadConcurrentEdits = block.seq !== expectedBlockSeq; + const hadConcurrentEdits = model.seq !== expectedBlockSeq; let patchesBack: StorePatch[] = []; if (hadConcurrentEdits) { - patchesBack = await store.history(id, seq, block.seq); + patchesBack = await store.history(id, seq, model.seq); } return { - block, + model, patches: patchesBack, }; } diff --git a/src/server/services/blocks/MemoryStore.ts b/src/server/services/blocks/MemoryStore.ts index 48e87f42b5..491c7fec13 100644 --- a/src/server/services/blocks/MemoryStore.ts +++ b/src/server/services/blocks/MemoryStore.ts @@ -5,52 +5,33 @@ import type * as types from './types'; const tick = new Promise((resolve) => setImmediate(resolve)); export class MemoryStore implements types.Store { - protected readonly blocks = new Map(); + protected readonly models = new Map(); protected readonly patches = new Map(); public async get(id: string): Promise { await tick; - const block = this.blocks.get(id); - if (!block) return; - return {block}; + const model = this.models.get(id); + if (!model) return; + return {model}; } public async seq(id: string): Promise { await tick; - return this.blocks.get(id)?.seq; + return this.models.get(id)?.seq; } - public async create(id: string, patches: types.StorePatch[]): Promise { + public async create(id: string, model: types.StoreModel, patches: types.StorePatch[]): Promise { await tick; if (!Array.isArray(patches)) throw new Error('NO_PATCHES'); - if (this.blocks.has(id)) throw new Error('BLOCK_EXISTS'); - const model = Model.withLogicalClock(); - let seq = -1; - const now = Date.now(); - if (patches.length) { - for (const patch of patches) { - seq++; - if (seq !== patch.seq) throw new Error('PATCHES_OUT_OF_ORDER'); - model.applyPatch(Patch.fromBinary(patch.blob)); - if (patch.created > now) patch.created = now; - } - } - const block: types.StoreBlock = { - id, - seq: seq, - blob: model.toBinary(), - created: now, - updated: now, - }; - this.blocks.set(id, block); + if (this.models.has(id)) throw new Error('BLOCK_EXISTS'); + this.models.set(id, model); this.patches.set(id, patches); - return {block}; } public async edit(id: string, patches: types.StorePatch[]): Promise { await tick; if (!Array.isArray(patches) || !patches.length) throw new Error('NO_PATCHES'); - const block = this.blocks.get(id); + const block = this.models.get(id); const existingPatches = this.patches.get(id); if (!block || !existingPatches) throw new Error('BLOCK_NOT_FOUND'); let seq = patches[0].seq; @@ -67,7 +48,7 @@ export class MemoryStore implements types.Store { block.blob = model.toBinary(); block.updated = Date.now(); for (const patch of patches) existingPatches.push(patch); - return {block}; + return {model: block}; } public async history(id: string, min: number, max: number): Promise { @@ -83,24 +64,24 @@ export class MemoryStore implements types.Store { } private removeSync(id: string): boolean { - this.blocks.delete(id); + this.models.delete(id); return this.patches.delete(id); } public stats(): {blocks: number; patches: number} { return { - blocks: this.blocks.size, + blocks: this.models.size, patches: [...this.patches.values()].reduce((acc, v) => acc + v.length, 0), }; } public async removeOlderThan(ts: number): Promise { await tick; - for (const [id, block] of this.blocks) if (block.created < ts) this.removeSync(id); + for (const [id, block] of this.models) if (block.created < ts) this.removeSync(id); } public async removeAccessedBefore(ts: number): Promise { await tick; - for (const [id, block] of this.blocks) if (block.updated < ts) this.removeSync(id); + for (const [id, block] of this.models) if (block.updated < ts) this.removeSync(id); } } diff --git a/src/server/services/blocks/types.ts b/src/server/services/blocks/types.ts index a7a0fa693a..89a22c4404 100644 --- a/src/server/services/blocks/types.ts +++ b/src/server/services/blocks/types.ts @@ -1,4 +1,4 @@ -export interface StoreBlock { +export interface StoreModel { id: string; seq: number; created: number; @@ -20,7 +20,7 @@ export interface Store { * @param patches Initial patches to apply to a new block. * @returns Newly created block data. */ - create(id: string, patches: StorePatch[]): Promise; + create(id: string, model: StoreModel, patches: StorePatch[]): Promise; /** * Retrieve an existing block. @@ -67,9 +67,9 @@ export interface Store { } export interface StoreGetResult { - block: StoreBlock; + model: StoreModel; } export interface StoreApplyResult { - block: StoreBlock; + model: StoreModel; } From ae14b6cd176dc873fb3e9c97c620bbdf4aa005df Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 14 Apr 2024 11:07:57 +0200 Subject: [PATCH 19/23] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20allo?= =?UTF-8?q?w=20to=20optionally=20load=20full=20history=20in=20"block.get"?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/block.spec.ts | 49 +++++++++++--------- src/server/routes/block/methods/get.ts | 21 ++++++--- src/server/services/blocks/BlocksServices.ts | 4 +- 3 files changed, 42 insertions(+), 32 deletions(-) diff --git a/src/server/__tests__/block.spec.ts b/src/server/__tests__/block.spec.ts index 7a6724c8fe..d3c2177455 100644 --- a/src/server/__tests__/block.spec.ts +++ b/src/server/__tests__/block.spec.ts @@ -338,7 +338,6 @@ describe('block.*', () => { const {client} = setup(); const emits: any[] = []; client.call$('block.listen', {id: 'my-block'}).subscribe((data) => { - console.log('data', data); emits.push(data); }); await client.call('block.new', {id: 'my-block', patches: []}); @@ -449,27 +448,33 @@ describe('block.*', () => { }, ], }); - const result = await client.call('block.get', {id: 'my-block'}); - // expect(result).toMatchObject({ - // block: expect.any(Object), - // // patches: [ - // // { - // // seq: 0, - // // created: expect.any(Number), - // // blob: patch1.toBinary(), - // // }, - // // { - // // seq: 1, - // // created: expect.any(Number), - // // blob: patch2.toBinary(), - // // }, - // // { - // // seq: 2, - // // created: expect.any(Number), - // // blob: patch3.toBinary(), - // // }, - // // ], - // }); + const result = await client.call('block.get', {id: 'my-block', history: true}); + expect(result).toMatchObject({ + model: { + id: 'my-block', + seq: 2, + blob: expect.any(Uint8Array), + created: expect.any(Number), + updated: expect.any(Number), + }, + patches: [ + { + seq: 0, + created: expect.any(Number), + blob: patch1.toBinary(), + }, + { + seq: 1, + created: expect.any(Number), + blob: patch2.toBinary(), + }, + { + seq: 2, + created: expect.any(Number), + blob: patch3.toBinary(), + }, + ], + }); }); }); }); diff --git a/src/server/routes/block/methods/get.ts b/src/server/routes/block/methods/get.ts index 081466b2b9..2135db2748 100644 --- a/src/server/routes/block/methods/get.ts +++ b/src/server/routes/block/methods/get.ts @@ -1,3 +1,4 @@ +import {ResolveType} from '../../../../json-type'; import type {RouteDeps, Router, RouterBase} from '../../types'; import type {Block, BlockId, BlockPatch} from '../schema'; @@ -9,11 +10,15 @@ export const get = title: 'Block ID', description: 'The ID of the block to retrieve.', }), + t.propOpt('history', t.bool).options({ + title: 'With History', + description: 'Whether to include the full history of patches in the response. Defaults to `false`.', + }), ); const Response = t.Object( t.prop('model', t.Ref('Block')), - t.prop('patches', t.Array(t.Ref('BlockPatch'))).options({ + t.propOpt('patches', t.Array(t.Ref('BlockPatch'))).options({ title: 'Patches', description: 'The list of all patches.', }), @@ -25,11 +30,13 @@ export const get = description: 'Fetches a block by ID.', }); - return r.prop('block.get', Func, async ({id}) => { - const {model, patches} = await services.blocks.get(id); - return { - model, - patches, - }; + return r.prop('block.get', Func, async ({id, history}) => { + const {model} = await services.blocks.get(id); + const response: ResolveType = {model}; + if (history) { + const {patches} = await services.blocks.scan(id, 0, model.seq); + response.patches = patches; + } + return response; }); }; diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 3b7d062f71..47a3a67195 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -71,10 +71,8 @@ export class BlocksServices { const {store} = this; const result = await store.get(id); if (!result) throw RpcError.fromCode(RpcErrorCodes.NOT_FOUND); - // const patches = await store.history(id, 0, result.block.seq); const {model} = result; - // TODO: should not return `patches`, only the "tip". - return {model, patches: []}; + return {model}; } public async remove(id: string) { From 2dcdad5fb0211d649d899da85ce20c36ca903025 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 14 Apr 2024 11:33:59 +0200 Subject: [PATCH 20/23] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20impr?= =?UTF-8?q?ove=20emitted=20event=20shape?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/server/__tests__/block.spec.ts | 21 +++++++------ src/server/routes/block/methods/listen.ts | 31 ++++++++++---------- src/server/services/blocks/BlocksServices.ts | 27 +++++++++-------- 3 files changed, 41 insertions(+), 38 deletions(-) diff --git a/src/server/__tests__/block.spec.ts b/src/server/__tests__/block.spec.ts index d3c2177455..4a52c1760c 100644 --- a/src/server/__tests__/block.spec.ts +++ b/src/server/__tests__/block.spec.ts @@ -290,8 +290,9 @@ describe('block.*', () => { }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].patches.length).toBe(1); - expect(emits[0].patches[0].seq).toBe(0); + expect(emits[0][0]).toBe('upd'); + expect(emits[0][1].patches.length).toBe(1); + expect(emits[0][1].patches[0].seq).toBe(0); model.api.root({ text: 'Hello', }); @@ -304,8 +305,9 @@ describe('block.*', () => { }); await until(() => emits.length === 2); expect(emits.length).toBe(2); - expect(emits[1].patches.length).toBe(1); - expect(emits[1].patches[0].seq).toBe(1); + expect(emits[1][0]).toBe('upd'); + expect(emits[1][1].patches.length).toBe(1); + expect(emits[1][1].patches[0].seq).toBe(1); }); test('can subscribe before block is created', async () => { @@ -329,9 +331,10 @@ describe('block.*', () => { }); await until(() => emits.length === 1); expect(emits.length).toBe(1); - expect(emits[0].patches.length).toBe(1); - expect(emits[0].patches[0].seq).toBe(0); - expect(emits[0].patches[0].blob).toStrictEqual(patch1.toBinary()); + expect(emits[0][0]).toBe('upd'); + expect(emits[0][1].patches.length).toBe(1); + expect(emits[0][1].patches[0].seq).toBe(0); + expect(emits[0][1].patches[0].blob).toStrictEqual(patch1.toBinary()); }); test('can receive deletion events', async () => { @@ -342,11 +345,11 @@ describe('block.*', () => { }); await client.call('block.new', {id: 'my-block', patches: []}); await until(() => emits.length === 1); - expect(emits[0].model.seq).toBe(-1); + expect(emits[0][1].model.seq).toBe(-1); await tick(3); await client.call('block.del', {id: 'my-block'}); await until(() => emits.length === 2); - expect(emits[1].deleted).toBe(true); + expect(emits[1][0]).toBe('del'); }); }); diff --git a/src/server/routes/block/methods/listen.ts b/src/server/routes/block/methods/listen.ts index 1235cadd96..e522e0e8a0 100644 --- a/src/server/routes/block/methods/listen.ts +++ b/src/server/routes/block/methods/listen.ts @@ -5,8 +5,6 @@ import type {BlockId, BlockPatch, Block} from '../schema'; export const listen = ({t, services}: RouteDeps) => (r: Router) => { - const PatchType = t.Ref('BlockPatch'); - const Request = t.Object( t.prop('id', t.Ref('BlockId')).options({ title: 'Block ID', @@ -14,20 +12,21 @@ export const listen = }), ); - // TODO: Use TLV encoding. - const Response = t.Object( - t.propOpt('deleted', t.Boolean()).options({ - title: 'Deleted', - description: 'Emitted only when the block is deleted.', - }), - t.propOpt('model', t.Ref('Block')).options({ - title: 'Block', - description: 'The whole block object, emitted only when the block is created.', - }), - t.propOpt('patches', t.Array(t.Ref('BlockPatch'))).options({ - title: 'Latest Patches', - description: 'Patches that have been applied to the block.', - }), + const Response = t.Or( + t.Tuple(t.Const('del')), + t.Tuple( + t.Const('upd'), + t.Object( + t.propOpt('model', t.Ref('Block')).options({ + title: 'Block', + description: 'The whole block object, emitted only when the block is created.', + }), + t.propOpt('patches', t.Array(t.Ref('BlockPatch'))).options({ + title: 'Latest Patches', + description: 'Patches that have been applied to the block.', + }), + ) + ), ); const Func = t.Function$(Request, Response).options({ diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index 47a3a67195..aeb2770c0f 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -1,9 +1,9 @@ import {MemoryStore} from './MemoryStore'; import {RpcError, RpcErrorCodes} from '../../../reactive-rpc/common/rpc/caller'; import {Model, Patch} from '../../../json-crdt'; +import {SESSION} from '../../../json-crdt-patch/constants'; import type {StoreModel, StorePatch} from './types'; import type {Services} from '../Services'; -import {SESSION} from '../../../json-crdt-patch/constants'; const BLOCK_TTL = 1000 * 60 * 30; // 30 minutes @@ -57,16 +57,21 @@ export class BlocksServices { private async __create(id: string, model: StoreModel, patches: StorePatch[]) { await this.store.create(id, model, patches); - this.services.pubsub.publish(`__block:${id}`, {model, patches}).catch((error) => { - // tslint:disable-next-line:no-console - console.error('Error publishing block patches', error); - }); + this.__emitUpd(id, model, patches); return { model, patches, }; } + private __emitUpd(id: string, model: StoreModel, patches: StorePatch[]) { + const msg = ['upd', {model, patches}]; + this.services.pubsub.publish(`__block:${id}`, msg).catch((error) => { + // tslint:disable-next-line:no-console + console.error('Error publishing block patches', error); + }); + } + public async get(id: string) { const {store} = this; const result = await store.get(id); @@ -77,7 +82,8 @@ export class BlocksServices { public async remove(id: string) { await this.store.remove(id); - this.services.pubsub.publish(`__block:${id}`, {deleted: true}).catch((error) => { + const msg = ['del']; + this.services.pubsub.publish(`__block:${id}`, msg).catch((error) => { // tslint:disable-next-line:no-console console.error('Error publishing block deletion', error); }); @@ -118,16 +124,11 @@ export class BlocksServices { const {store} = this; validatePatches(patches); const {model} = await store.edit(id, patches); - this.services.pubsub.publish(`__block:${id}`, {patches}).catch((error) => { - // tslint:disable-next-line:no-console - console.error('Error publishing block patches', error); - }); + this.__emitUpd(id, model, patches); const expectedBlockSeq = seq + patches.length - 1; const hadConcurrentEdits = model.seq !== expectedBlockSeq; let patchesBack: StorePatch[] = []; - if (hadConcurrentEdits) { - patchesBack = await store.history(id, seq, model.seq); - } + if (hadConcurrentEdits) patchesBack = await store.history(id, seq, model.seq); return { model, patches: patchesBack, From e1343ac92d112146d6ec6af56721f87a69379273 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 14 Apr 2024 11:34:53 +0200 Subject: [PATCH 21/23] =?UTF-8?q?style:=20=F0=9F=92=84=20run=20Prettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../remote/RemoteHistoryDemoServer.ts | 20 ++++++++++++------- src/json-crdt-repo/remote/types.ts | 8 ++++---- src/reactive-rpc/common/types.ts | 18 +++++++++-------- src/server/routes/block/methods/listen.ts | 2 +- src/server/routes/block/methods/scan.ts | 19 +++++++++--------- src/server/services/blocks/BlocksServices.ts | 16 ++++++++++----- 6 files changed, 49 insertions(+), 34 deletions(-) diff --git a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts index 57314b18c6..1bc8656ab1 100644 --- a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts +++ b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts @@ -1,6 +1,6 @@ import type {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common'; import type {JsonJoyDemoRpcCaller} from '../../server'; -import type {RemoteHistory, RemoteModel, RemotePatch} from "./types"; +import type {RemoteHistory, RemoteModel, RemotePatch} from './types'; type Methods = CallerToMethods; @@ -17,7 +17,7 @@ export interface RemoteServerPatch extends RemotePatch { } export class RemoteHistoryDemoServer implements RemoteHistory { - constructor (protected readonly client: TypedRpcClient) {} + constructor(protected readonly client: TypedRpcClient) {} public async create(id: string, patches: RemotePatch[]): Promise { await this.client.call('block.new', { @@ -35,7 +35,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { + public async read(id: string): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> { const {block, patches} = await this.client.call('block.get', {id}); return { cursor: block.seq, @@ -44,7 +44,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { + public async scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> { const limit = 100; const res = await this.client.call('block.scan', { id, @@ -63,12 +63,18 @@ export class RemoteHistoryDemoServer implements RemoteHistory { + public async scanBwd( + id: string, + cursor: Cursor, + ): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> { throw new Error('The "blocks.history" should be able to return starting model.'); } - public async update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor, patches: RemoteServerPatch[]}> { - + public async update( + id: string, + cursor: Cursor, + patches: RemotePatch[], + ): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> { const res = await this.client.call('block.upd', { id, patches: patches.map((patch, seq) => ({ diff --git a/src/json-crdt-repo/remote/types.ts b/src/json-crdt-repo/remote/types.ts index 2e8ac571a4..2f76a1037a 100644 --- a/src/json-crdt-repo/remote/types.ts +++ b/src/json-crdt-repo/remote/types.ts @@ -12,13 +12,13 @@ export interface RemoteHistory; + read(id: string): Promise<{cursor: Cursor; model: M; patches: P[]}>; - scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor, patches: P[]}>; + scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: P[]}>; - scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor, model: M, patches: P[]}>; + scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; model: M; patches: P[]}>; - update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor, patches: P[]}>; + update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor; patches: P[]}>; delete?(id: string): Promise; diff --git a/src/reactive-rpc/common/types.ts b/src/reactive-rpc/common/types.ts index a1b2f96f0e..b9ee65d320 100644 --- a/src/reactive-rpc/common/types.ts +++ b/src/reactive-rpc/common/types.ts @@ -1,12 +1,14 @@ -import type {Observable} from "rxjs"; -import type {FunctionStreamingType, FunctionType, ResolveType} from "../../json-type"; -import type {ObjectValue, ObjectValueToTypeMap, UnObjectType} from "../../json-type-value/ObjectValue"; -import type {TypeRouter} from "../../json-type/system/TypeRouter"; -import type {ObjectValueCaller} from "./rpc/caller/ObjectValueCaller"; -import type {RpcCaller} from "./rpc/caller/RpcCaller"; -import type {TypeRouterCaller} from "./rpc/caller/TypeRouterCaller"; +import type {Observable} from 'rxjs'; +import type {FunctionStreamingType, FunctionType, ResolveType} from '../../json-type'; +import type {ObjectValue, ObjectValueToTypeMap, UnObjectType} from '../../json-type-value/ObjectValue'; +import type {TypeRouter} from '../../json-type/system/TypeRouter'; +import type {ObjectValueCaller} from './rpc/caller/ObjectValueCaller'; +import type {RpcCaller} from './rpc/caller/RpcCaller'; +import type {TypeRouterCaller} from './rpc/caller/TypeRouterCaller'; -export type CallerToMethods> = {[K in keyof UnTypeRouter>]: UnwrapFunction>[K]>}; +export type CallerToMethods> = { + [K in keyof UnTypeRouter>]: UnwrapFunction>[K]>; +}; type UnTypeRouterCaller = T extends TypeRouterCaller ? R : T extends ObjectValueCaller ? R : never; type UnTypeRouter = diff --git a/src/server/routes/block/methods/listen.ts b/src/server/routes/block/methods/listen.ts index e522e0e8a0..bd3f9c8405 100644 --- a/src/server/routes/block/methods/listen.ts +++ b/src/server/routes/block/methods/listen.ts @@ -25,7 +25,7 @@ export const listen = title: 'Latest Patches', description: 'Patches that have been applied to the block.', }), - ) + ), ), ); diff --git a/src/server/routes/block/methods/scan.ts b/src/server/routes/block/methods/scan.ts index d437cc6952..ce5080199f 100644 --- a/src/server/routes/block/methods/scan.ts +++ b/src/server/routes/block/methods/scan.ts @@ -15,16 +15,17 @@ export const scan = }), t.propOpt('limit', t.num.options({format: 'u32'})).options({ title: 'Number of Patches', - description: 'The minimum number of patches to return. Defaults to 10. ' + - 'When positive, returns the patches ahead of the starting sequence number. ' + - 'When negative, returns the patches behind the starting sequence number.', + description: + 'The minimum number of patches to return. Defaults to 10. ' + + 'When positive, returns the patches ahead of the starting sequence number. ' + + 'When negative, returns the patches behind the starting sequence number.', + }), + t.propOpt('model', t.bool).options({ + title: 'With Model', + description: + 'Whether to include the model in the response. ' + + 'Defaults to `false`, when `len` is positive; and, defaults to `true`, when `len` is negative.', }), - t.propOpt('model', t.bool) - .options({ - title: 'With Model', - description: 'Whether to include the model in the response. ' + - 'Defaults to `false`, when `len` is positive; and, defaults to `true`, when `len` is negative.', - }), ); const Response = t.Object( diff --git a/src/server/services/blocks/BlocksServices.ts b/src/server/services/blocks/BlocksServices.ts index aeb2770c0f..dad9f11715 100644 --- a/src/server/services/blocks/BlocksServices.ts +++ b/src/server/services/blocks/BlocksServices.ts @@ -52,7 +52,7 @@ export class BlocksServices { created: now, updated: now, }; - return await this.__create(id, model, patches);; + return await this.__create(id, model, patches); } private async __create(id: string, model: StoreModel, patches: StorePatch[]) { @@ -89,11 +89,17 @@ export class BlocksServices { }); } - public async scan(id: string, offset: number | undefined, limit: number | undefined = 10, returnStartModel: boolean = limit < 0) { + public async scan( + id: string, + offset: number | undefined, + limit: number | undefined = 10, + returnStartModel: boolean = limit < 0, + ) { const {store} = this; if (typeof offset !== 'number') offset = await store.seq(id); - let min: number = 0, max: number = 0; - if (!limit || (Math.round(limit) !== limit)) throw RpcError.badRequest('INVALID_LIMIT'); + let min: number = 0, + max: number = 0; + if (!limit || Math.round(limit) !== limit) throw RpcError.badRequest('INVALID_LIMIT'); if (limit > 0) { min = Number(offset) || 0; max = min + limit; @@ -110,7 +116,7 @@ export class BlocksServices { if (returnStartModel) { const startPatches = await store.history(id, 0, min); if (startPatches.length) { - model = Model.fromPatches(startPatches.map(p => Patch.fromBinary(p.blob))); + model = Model.fromPatches(startPatches.map((p) => Patch.fromBinary(p.blob))); } } return {patches, model}; From f86ed4df3af32087e32e4540b13ec0937ea5f485 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 14 Apr 2024 11:56:23 +0200 Subject: [PATCH 22/23] =?UTF-8?q?feat(reactive-rpc):=20=F0=9F=8E=B8=20clea?= =?UTF-8?q?nup=20RemoteHistoryDemoServer?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../remote/RemoteHistoryDemoServer.ts | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts index 1bc8656ab1..27accc7c8b 100644 --- a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts +++ b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts @@ -36,11 +36,11 @@ export class RemoteHistoryDemoServer implements RemoteHistory { - const {block, patches} = await this.client.call('block.get', {id}); + const {model, patches} = await this.client.call('block.get', {id}); return { - cursor: block.seq, - model: block, - patches, + cursor: model.seq, + model, + patches: [], }; } @@ -48,8 +48,8 @@ export class RemoteHistoryDemoServer implements RemoteHistory { + public async delete(id: string): Promise { await this.client.call('block.del', {id}); } From 7d6a033ed929348cf13568123d850e73b59ce390 Mon Sep 17 00:00:00 2001 From: streamich Date: Sun, 14 Apr 2024 12:47:14 +0200 Subject: [PATCH 23/23] =?UTF-8?q?fix(json-crdt):=20=F0=9F=90=9B=20use=20ri?= =?UTF-8?q?ght=20payload?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts | 5 +---- .../remote/__tests__/RemoteHistoryServer.spec.ts | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts index 27accc7c8b..f3ed21b557 100644 --- a/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts +++ b/src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts @@ -22,10 +22,7 @@ export class RemoteHistoryDemoServer implements RemoteHistory { await this.client.call('block.new', { id, - patches: patches.map((patch, seq) => ({ - // TODO: seq and created should be set on server. (And returned back?) - seq, - created: Date.now(), + patches: patches.map((patch) => ({ blob: patch.blob, })), }); diff --git a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts index 07c065d4aa..31b85a7b14 100644 --- a/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts +++ b/src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts @@ -30,7 +30,7 @@ describe('.create()', () => { await remote.create(id, [{blob}]); const {data} = await caller.call('block.get', {id}, {}); // console.log(data.patches); - const model2 = Model.fromBinary(data.block.blob); + const model2 = Model.fromBinary(data.model.blob); expect(model2.view()).toEqual({foo: 'bar'}); }); });