From aa66e780406d37d9479ada27671ec5feed65c3ce Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 02:49:27 +0100 Subject: [PATCH 1/9] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20add=20.on?= =?UTF-8?q?beforechange()=20callback?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/model/Model.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index bf2d763ac8..84699e69aa 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -141,7 +141,8 @@ export class Model implements Printable { public tick: number = 0; /** - * Callback called after every `applyPatch` call. + * Callback called after every `applyPatch` call. Or after local changes + * applied through the `ModelApi`. * * When using the `.api` API, this property is set automatically by * the {@link ModelApi} class. In that case use the `mode.api.evens.on('change')` @@ -149,6 +150,11 @@ export class Model implements Printable { */ public onchange: undefined | ((type: ModelChangeType) => void) = undefined; + /** + * Callback called before every `applyPatch` call. + */ + public onbeforechange: undefined | ((type: ModelChangeType, patch: Patch) => void) = undefined; + /** * Applies a batch of patches to the document. * @@ -164,6 +170,7 @@ export class Model implements Printable { * through this method. */ public applyPatch(patch: Patch) { + this.onbeforechange?.(ModelChangeType.REMOTE, patch); const ops = patch.ops; const {length} = ops; for (let i = 0; i < length; i++) this.applyOperation(ops[i]); @@ -180,6 +187,7 @@ export class Model implements Printable { * * @param op Any JSON CRDT Patch operation * @ignore + * @internal */ public applyOperation(op: JsonCrdtPatchOperation): void { this.clock.observe(op.id, op.span()); From ded0caf563ed8c022cf253516cf2f736ac7ffd14 Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 10:28:42 +0100 Subject: [PATCH 2/9] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20preserve?= =?UTF-8?q?=20API=20nodes=20on=20reset?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/model/Model.ts | 12 ++++++++++-- .../model/__tests__/Model.cloning.spec.ts | 14 ++++++++++++++ src/json-crdt/model/api/nodes.ts | 2 +- 3 files changed, 25 insertions(+), 3 deletions(-) diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index 84699e69aa..04a6a86e96 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -334,13 +334,21 @@ export class Model implements Printable { * Resets the model to equivalent state of another model. */ public reset(to: Model): void { + const index = this.index; this.index = new AvlMap(clock.compare); const blob = to.toBinary(); decoder.decode(blob, this); this.clock = to.clock.clone(); this.ext = to.ext.clone(); - const api = this._api; - if (api) api.flush(); + this._api?.flush(); + index.forEach(({v: node}) => { + const api = node.api as NodeApi | undefined; + if (!api) return; + const newNode = this.index.get(node.id); + if (!newNode) return; + api.node = newNode; + newNode.api = api; + }); this.onchange?.(ModelChangeType.RESET); } diff --git a/src/json-crdt/model/__tests__/Model.cloning.spec.ts b/src/json-crdt/model/__tests__/Model.cloning.spec.ts index 814cfac541..d4f774384c 100644 --- a/src/json-crdt/model/__tests__/Model.cloning.spec.ts +++ b/src/json-crdt/model/__tests__/Model.cloning.spec.ts @@ -1,4 +1,5 @@ import {until} from '../../../__tests__/util'; +import {schema} from '../../../json-crdt-patch'; import {PatchBuilder} from '../../../json-crdt-patch/PatchBuilder'; import {Model} from '../Model'; @@ -219,4 +220,17 @@ describe('reset()', () => { await until(() => cnt > 0); expect(cnt).toBe(1); }); + + test('preserves API nodes when model is reset', async () => { + const doc1 = Model.withLogicalClock().setSchema(schema.obj({ + text: schema.str('hell'), + })); + const doc2 = doc1.fork(); + doc2.s.text.toApi().ins(4, 'o'); + const str = doc1.s.text.toApi(); + expect(str === doc2.s.text.toApi()).toBe(false); + expect(str.view()).toBe('hell'); + doc1.reset(doc2); + expect(str.view()).toBe('hello'); + }); }); diff --git a/src/json-crdt/model/api/nodes.ts b/src/json-crdt/model/api/nodes.ts index 6b11af34f7..6043d3060e 100644 --- a/src/json-crdt/model/api/nodes.ts +++ b/src/json-crdt/model/api/nodes.ts @@ -19,7 +19,7 @@ export type ApiPath = string | number | Path | void; * @category Local API */ export class NodeApi implements Printable { - constructor(public readonly node: N, public readonly api: ModelApi) {} + constructor(public node: N, public readonly api: ModelApi) {} /** @ignore */ private ev: undefined | NodeEvents = undefined; From 9fb273cc979567456134fb89440217a3ef163db5 Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 10:54:46 +0100 Subject: [PATCH 3/9] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20start=20e?= =?UTF-8?q?vent=20system=20refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/model/Model.ts | 40 +++++++++++++++++------------ src/json-crdt/model/api/ModelApi.ts | 5 ++++ 2 files changed, 28 insertions(+), 17 deletions(-) diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index 04a6a86e96..d7a2bbb7ae 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -141,41 +141,36 @@ export class Model implements Printable { public tick: number = 0; /** - * Callback called after every `applyPatch` call. Or after local changes - * applied through the `ModelApi`. + * Applies a batch of patches to the document. * - * When using the `.api` API, this property is set automatically by - * the {@link ModelApi} class. In that case use the `mode.api.evens.on('change')` - * to subscribe to changes. + * @param patches A batch, i.e. an array of patches. */ - public onchange: undefined | ((type: ModelChangeType) => void) = undefined; + public applyBatch(patches: Patch[]) { + const length = patches.length; + for (let i = 0; i < length; i++) this.applyPatch(patches[i]); + } /** * Callback called before every `applyPatch` call. */ - public onbeforechange: undefined | ((type: ModelChangeType, patch: Patch) => void) = undefined; + public onbeforepatch?: (patch: Patch) => void = undefined; /** - * Applies a batch of patches to the document. - * - * @param patches A batch, i.e. an array of patches. + * Callback called after every `applyPatch` call. */ - public applyBatch(patches: Patch[]) { - const length = patches.length; - for (let i = 0; i < length; i++) this.applyPatch(patches[i]); - } + public onpatch?: (patch: Patch) => void = undefined; /** * Applies a single patch to the document. All mutations to the model must go * through this method. */ public applyPatch(patch: Patch) { - this.onbeforechange?.(ModelChangeType.REMOTE, patch); + this.onbeforepatch?.(patch); const ops = patch.ops; const {length} = ops; for (let i = 0; i < length; i++) this.applyOperation(ops[i]); this.tick++; - this.onchange?.(ModelChangeType.REMOTE); + this.onpatch?.(patch); } /** @@ -330,10 +325,21 @@ export class Model implements Printable { return this.fork(this.clock.sid); } + /** + * Callback called before model isi reset using the `.reset()` method. + */ + public onbeforereset?: () => void = undefined; + + /** + * Callback called after model has been reset using the `.reset()` method. + */ + public onreset?: () => void = undefined; + /** * Resets the model to equivalent state of another model. */ public reset(to: Model): void { + this.onbeforereset?.(); const index = this.index; this.index = new AvlMap(clock.compare); const blob = to.toBinary(); @@ -349,7 +355,7 @@ export class Model implements Printable { api.node = newNode; newNode.api = api; }); - this.onchange?.(ModelChangeType.RESET); + this.onreset?.(); } /** diff --git a/src/json-crdt/model/api/ModelApi.ts b/src/json-crdt/model/api/ModelApi.ts index dba1b8baed..35bce11e1e 100644 --- a/src/json-crdt/model/api/ModelApi.ts +++ b/src/json-crdt/model/api/ModelApi.ts @@ -41,11 +41,16 @@ export class ModelApi implements SyncStore(); + public readonly onReset = new FanOut(); + /** * @param model Model instance on which the API operates. */ constructor(public readonly model: Model) { this.builder = new PatchBuilder(this.model.clock); + this.model.onbeforereset = () => this.onBeforeReset.emit(); + this.model.onreset = () => this.onReset.emit(); this.model.onchange = this.queueChange; } From 45bf8e11db7b45476d4c53513a5acfeb1a2c6750 Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 18:44:17 +0100 Subject: [PATCH 4/9] =?UTF-8?q?feat(json-crdt):=20=F0=9F=8E=B8=20introduce?= =?UTF-8?q?=20new=20event=20system?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/__demos__/events.ts | 2 +- src/json-crdt/model/Model.ts | 7 +- src/json-crdt/model/api/ModelApi.ts | 86 +++++++---------- src/json-crdt/model/api/NodeEvents.ts | 133 +++++--------------------- src/json-crdt/model/api/nodes.ts | 2 + src/json-crdt/model/api/util.ts | 125 ++++++++++++++++++++++++ 6 files changed, 190 insertions(+), 165 deletions(-) create mode 100644 src/json-crdt/model/api/util.ts diff --git a/src/json-crdt/__demos__/events.ts b/src/json-crdt/__demos__/events.ts index 7e981e0cab..19cd1cc2ff 100644 --- a/src/json-crdt/__demos__/events.ts +++ b/src/json-crdt/__demos__/events.ts @@ -25,7 +25,7 @@ const model = Model.withLogicalClock(1234); // 1234 is the session ID // DOM Level 2 node events const root = model.api.r; -root.events.on('view', () => { +root.events.onViewChange.listen(() => { console.log('Root value changed'); }); diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index d7a2bbb7ae..4a79408f18 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -296,7 +296,7 @@ export class Model implements Printable { const node = this.index.get(value); if (!node) return; const api = node.api; - if (api) (api as NodeApi).events.onDelete(); + if (api) (api as NodeApi).events.handleDelete(); node.children((child) => this.deleteNodeTree(child.id)); this.index.del(value); } @@ -351,7 +351,10 @@ export class Model implements Printable { const api = node.api as NodeApi | undefined; if (!api) return; const newNode = this.index.get(node.id); - if (!newNode) return; + if (!newNode) { + api.events.handleDelete(); + return; + } api.node = newNode; newNode.api = api; }); diff --git a/src/json-crdt/model/api/ModelApi.ts b/src/json-crdt/model/api/ModelApi.ts index 35bce11e1e..b88c15c927 100644 --- a/src/json-crdt/model/api/ModelApi.ts +++ b/src/json-crdt/model/api/ModelApi.ts @@ -1,11 +1,11 @@ import {FanOut} from 'thingies/es2020/fanout'; import {VecNode, ConNode, ObjNode, ArrNode, BinNode, StrNode, ValNode} from '../../nodes'; import {ApiPath, ArrApi, BinApi, ConApi, NodeApi, ObjApi, StrApi, VecApi, ValApi} from './nodes'; -import {Emitter} from '../../../util/events/Emitter'; import {Patch} from '../../../json-crdt-patch/Patch'; import {PatchBuilder} from '../../../json-crdt-patch/PatchBuilder'; import {ModelChangeType, type Model} from '../Model'; import {SyncStore} from '../../../util/events/sync-store'; +import {MergeFanOut, MicrotaskBufferFanOut} from './util'; import type {JsonNode, JsonNodeView} from '../../nodes'; export interface ModelApiEvents { @@ -41,53 +41,34 @@ export class ModelApi implements SyncStore(); + /** Emitted after the model is reset, using the `.reset()` method. */ public readonly onReset = new FanOut(); + /** Emitted before a patch is applied using `model.applyPatch()`. */ + public readonly onBeforePatch = new FanOut(); + /** Emitted after a patch is applied using `model.applyPatch()`. */ + public readonly onPatch = new FanOut(); + /** Emitted before local changes through `model.api` are applied. */ + public readonly onBeforeLocalChange = new FanOut(); + /** Emitted after local changes through `model.api` are applied. */ + public readonly onLocalChange = new FanOut(); + /** Emitted when the model changes. Combines `onReset`, `onPatch` and `onLocalChange`. */ + public readonly onChange = new MergeFanOut([this.onReset, this.onPatch, this.onLocalChange]); + /** Emitted when the model changes. Same as `.onChange`, but this event is emitted once per microtask. */ + public readonly onChanges = new MicrotaskBufferFanOut(this.onChange); + /** Emitted when the `model.api` builder change buffer is flushed. */ + public readonly onFlush = new FanOut(); /** * @param model Model instance on which the API operates. */ constructor(public readonly model: Model) { - this.builder = new PatchBuilder(this.model.clock); - this.model.onbeforereset = () => this.onBeforeReset.emit(); - this.model.onreset = () => this.onReset.emit(); - this.model.onchange = this.queueChange; - } - - public readonly changes = new FanOut(); - - /** @ignore */ - private queuedChanges: undefined | Set = undefined; - - /** @ignore @deprecated */ - private readonly queueChange = (changeType: ModelChangeType): void => { - this.changes.emit(changeType); - let changesQueued = this.queuedChanges; - if (changesQueued) { - changesQueued.add(changeType); - return; - } - changesQueued = this.queuedChanges = new Set(); - changesQueued.add(changeType); - queueMicrotask(() => { - let changes = this.queuedChanges || new Set(); - this.queuedChanges = undefined; - const et = this.et; - if (changes.has(ModelChangeType.RESET)) changes = new Set([ModelChangeType.RESET]); - if (et) et.emit(new CustomEvent>('change', {detail: changes})); - }); - }; - - /** @ignore @deprecated */ - private et: Emitter = new Emitter(); - - /** - * Event target for listening to {@link Model} changes. - * - * @deprecated - */ - public get events(): Emitter { - return this.et; + this.builder = new PatchBuilder(model.clock); + model.onbeforereset = () => this.onBeforeReset.emit(); + model.onreset = () => this.onReset.emit(); + model.onbeforepatch = (patch) => this.onBeforePatch.emit(patch); + model.onpatch = (patch) => this.onPatch.emit(patch); } /** @@ -245,23 +226,28 @@ export class ModelApi implements SyncStore implements SyncStore('flush', {detail: patch}); - this.events.emit(event); + this.onFlush.emit(patch); return patch; } // ---------------------------------------------------------------- SyncStore - public readonly subscribe = (callback: () => void) => { - const listener = () => callback(); - this.events.on('change', listener); - return () => this.events.off('change', listener); - }; - + public readonly subscribe = (callback: () => void) => this.onChanges.listen(() => callback()); public readonly getSnapshot = () => this.view() as any; } diff --git a/src/json-crdt/model/api/NodeEvents.ts b/src/json-crdt/model/api/NodeEvents.ts index 743d378426..d7bb19d5b4 100644 --- a/src/json-crdt/model/api/NodeEvents.ts +++ b/src/json-crdt/model/api/NodeEvents.ts @@ -1,117 +1,34 @@ -import {FanOut, FanOutListener, FanOutUnsubscribe} from 'thingies/es2020/fanout'; -import {Emitter} from '../../../util/events/Emitter'; +import {FanOut} from 'thingies/es2020/fanout'; +import {MapFanOut, OnNewFanOut} from './util'; import type {JsonNode, JsonNodeView} from '../../nodes'; import type {SyncStore, SyncStoreUnsubscribe} from '../../../util/events/sync-store'; import type {NodeApi} from './nodes'; -export interface NodeEventMap { +export class NodeEvents implements SyncStore> { /** - * Fired when the node's view has changed, checked using triple equality `===`. - * - * The strict equality identity is preserved deeply equal values, even for objects - * and arrays. So, this event will not fire if there was a change to the node's - * value, but the value is still deeply equal to the previous value. - * - * This event depends on overall Model's `change` event, which is batched using - * `queueMicrotask` and fired asynchronously. So, this event will not fire - * immediately after the node's value has changed, but rather after the next - * microtask. + * Fired on any model change, even if the node's value has not changed. The + * changes are fired once per microtask, so multiple changes in the same + * microtask are batched into a single event. */ - view: CustomEvent; -} - -class ChangesFanOut extends FanOut> { - /** @ignore */ - private _v: JsonNodeView | undefined = undefined; - /** @ignore */ - private _u: FanOutUnsubscribe | undefined = undefined; - - /** @ignore */ - constructor(private readonly api: NodeApi) { - super(); - } - - public listen(listener: FanOutListener>) { - if (!this.listeners.size) { - const api = this.api; - this._v = api.view(); - this._u = api.api.changes.listen(() => { - const view = api.view(); - if (view !== this._v) { - this._v = view; - this.emit(view); - } - }); - } - const unsubscribe = super.listen(listener); - return () => { - unsubscribe(); - if (!this.listeners.size) { - this._u?.(); - this._u = this._v = undefined; - } - }; - } + public readonly onChanges: FanOut>; /** - * @internal - * @ignore + * Similar to `.onChanges`, but fired when the node's view has changed, + * checked using triple equality `===`. + * + * The strict equality identity is preserved deeply equal values, even for + * objects and arrays. So, this event will not fire if there was a change + * to the node's value, but the value is still deeply equal to the previous + * value. + * + * This event depends on overall Model's `onChanges` event, which is + * batched using `queueMicrotask`. */ - public dispose() { - this.listeners.clear(); - this._u?.(); - this._u = this._v = undefined; - } -} - -export class NodeEvents - extends Emitter - implements SyncStore> -{ - public readonly changes: ChangesFanOut; + public readonly onViewChange: FanOut>; constructor(private readonly api: NodeApi) { - super(); - this.changes = new ChangesFanOut(api); - } - - private viewSubs: Set<(ev: NodeEventMap['view']) => any> = new Set(); - - private _view: undefined | unknown = undefined; - - private onModelChange = () => { - const _view = this._view; - const view = this.api.node.view(); - const viewHasChanged = _view !== view; - this._view = view; - if (viewHasChanged) this.emit(new CustomEvent('view')); - }; - - private setupViewEvents(): void { - this._view = this.api.node.view(); - this.api.api.events.on('change', this.onModelChange); - } - - public on( - type: K, - listener: (ev: NodeEventMap[K]) => any, - options?: boolean | AddEventListenerOptions, - ): void { - if (type === 'view') this.viewSubs.add(listener); - const shouldSubscribeToModelChanges = this.viewSubs.size === 1; - if (shouldSubscribeToModelChanges) this.setupViewEvents(); - super.on(type, listener, options); - } - - public off( - type: K, - listener: (ev: NodeEventMap[K]) => any, - options?: boolean | EventListenerOptions, - ): void { - if (type === 'view') this.viewSubs.delete(listener); - const shouldUnsubscribeFromModelChanges = this.viewSubs.size === 1; - if (shouldUnsubscribeFromModelChanges) this.api.api.events.off('change', this.onModelChange); - super.off(type, listener, options); + this.onChanges = new MapFanOut(this.api.api.onChanges, this.getSnapshot); + this.onViewChange = new OnNewFanOut(this.onChanges); } /** @@ -120,15 +37,13 @@ export class NodeEvents * @internal * @ignore */ - public onDelete() { - this.changes.dispose(); + public handleDelete() { + (this.onViewChange as OnNewFanOut>).clear(); + (this.onChanges as MapFanOut).clear(); } // ---------------------------------------------------------------- SyncStore - public readonly subscribe = (callback: () => void): SyncStoreUnsubscribe => { - return this.changes.listen(() => callback()); - }; - + public readonly subscribe = (callback: () => void): SyncStoreUnsubscribe => this.onViewChange.listen(() => callback()); public readonly getSnapshot = () => this.api.view(); } diff --git a/src/json-crdt/model/api/nodes.ts b/src/json-crdt/model/api/nodes.ts index 6043d3060e..5eea7b67ec 100644 --- a/src/json-crdt/model/api/nodes.ts +++ b/src/json-crdt/model/api/nodes.ts @@ -382,6 +382,7 @@ export class StrApi extends NodeApi { */ public ins(index: number, text: string): this { const {api, node} = this; + api.onBeforeLocalChange.emit(api.next); const builder = api.builder; builder.pad(); const nextTime = api.builder.nextTime(); @@ -402,6 +403,7 @@ export class StrApi extends NodeApi { */ public del(index: number, length: number): this { const {api, node} = this; + api.onBeforeLocalChange.emit(api.next); const builder = api.builder; builder.pad(); const spans = node.findInterval(index, length); diff --git a/src/json-crdt/model/api/util.ts b/src/json-crdt/model/api/util.ts new file mode 100644 index 0000000000..5c5395a88f --- /dev/null +++ b/src/json-crdt/model/api/util.ts @@ -0,0 +1,125 @@ +import {FanOut, FanOutUnsubscribe, FanOutListener} from 'thingies/es2020/fanout'; + +/** + * Merges multiple fanouts into a single fanout. The merged fanout emits the + * same data as the source fanouts. + */ +export class MergeFanOut extends FanOut { + private unsubs: FanOutUnsubscribe[] = []; + + constructor(private readonly fanouts: FanOut[]) { + super(); + } + + public listen(listener: FanOutListener): FanOutUnsubscribe { + if (!this.listeners.size) + this.unsubs = this.fanouts.map(fanout => fanout.listen(data => this.emit(data))); + const unsub = super.listen(listener); + return () => { + unsub(); + if (!this.listeners.size) { + this.unsubs.forEach(unsub => unsub()); + this.unsubs = []; + } + }; + } +} + +/** + * Buffers data from a fanout and emits the buffered data once per microtask. + */ +export class MicrotaskBufferFanOut extends FanOut { + private buffer: I[] = []; + private unsub?: FanOutUnsubscribe = undefined; + + constructor (private readonly source: FanOut) { + super(); + } + + public listen(listener: FanOutListener): FanOutUnsubscribe { + if (!this.unsub) { + this.unsub = this.source.listen(data => { + const buffer = this.buffer; + if (!buffer.length) { + queueMicrotask(() => { + this.emit(buffer); + this.buffer = []; + }); + } + buffer.push(data); + }); + } + const unsub = super.listen(listener); + return () => { + unsub(); + if (!this.listeners.size) this.clear(); + }; + } + + public clear() { + this.listeners.clear(); + this.buffer = []; + this.unsub?.(); + this.unsub = undefined; + } +} + +/** + * Maps the data from a fanout using a mapper function. + */ +export class MapFanOut extends FanOut { + constructor(private readonly source: FanOut, private readonly mapper: (data: I) => O) { + super(); + } + + private unsub?: FanOutUnsubscribe = undefined; + + public listen(listener: FanOutListener): FanOutUnsubscribe { + if (!this.unsub) + this.unsub = this.source.listen(data => this.emit(this.mapper(data))); + const unsub = super.listen(listener); + return () => { + unsub(); + if (!this.listeners.size) this.clear(); + }; + } + + public clear() { + this.listeners.clear(); + this.unsub?.(); + this.unsub = undefined; + } +} + +/** + * Emits only when the source fanout emits a new value. The first value is + * emitted immediately. + */ +export class OnNewFanOut extends FanOut { + private last: D | undefined = undefined; + private unsub?: FanOutUnsubscribe = undefined; + + constructor(private readonly source: FanOut) { + super(); + } + + public listen(listener: FanOutListener): FanOutUnsubscribe { + if (!this.unsub) { + this.unsub = this.source.listen(data => { + if (this.last !== data) this.emit(this.last = data) + }); + } + const unsub = super.listen(listener); + return () => { + unsub(); + if (!this.listeners.size) this.clear(); + }; + } + + public clear() { + this.listeners.clear(); + this.last = undefined; + this.unsub?.(); + this.unsub = undefined; + } +} From dec27072e1316b2b4709db94e02d5da16dd5fc6b Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 19:00:20 +0100 Subject: [PATCH 5/9] =?UTF-8?q?test(json-crdt):=20=F0=9F=92=8D=20fix=20hig?= =?UTF-8?q?h=20level=20event=20API=20tests=20after=20refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/model/Model.ts | 9 -- src/json-crdt/model/api/ModelApi.ts | 16 +--- .../api/__tests__/ModelApi.events.spec.ts | 93 +++++++++---------- 3 files changed, 43 insertions(+), 75 deletions(-) diff --git a/src/json-crdt/model/Model.ts b/src/json-crdt/model/Model.ts index 4a79408f18..c18e31ce85 100644 --- a/src/json-crdt/model/Model.ts +++ b/src/json-crdt/model/Model.ts @@ -17,15 +17,6 @@ import type {NodeApi} from './api/nodes'; export const UNDEFINED = new ConNode(ORIGIN, undefined); -export const enum ModelChangeType { - /** When operations are applied through `.applyPatch()` directly. */ - REMOTE = 0, - /** When local operations are applied through the `ModelApi`. */ - LOCAL = 1, - /** When model is reset using the `.reset()` method. */ - RESET = 2, -} - /** * In instance of Model class represents the underlying data structure, * i.e. model, of the JSON CRDT document. diff --git a/src/json-crdt/model/api/ModelApi.ts b/src/json-crdt/model/api/ModelApi.ts index b88c15c927..392b2e054f 100644 --- a/src/json-crdt/model/api/ModelApi.ts +++ b/src/json-crdt/model/api/ModelApi.ts @@ -3,25 +3,11 @@ import {VecNode, ConNode, ObjNode, ArrNode, BinNode, StrNode, ValNode} from '../ import {ApiPath, ArrApi, BinApi, ConApi, NodeApi, ObjApi, StrApi, VecApi, ValApi} from './nodes'; import {Patch} from '../../../json-crdt-patch/Patch'; import {PatchBuilder} from '../../../json-crdt-patch/PatchBuilder'; -import {ModelChangeType, type Model} from '../Model'; import {SyncStore} from '../../../util/events/sync-store'; import {MergeFanOut, MicrotaskBufferFanOut} from './util'; +import type {Model} from '../Model'; import type {JsonNode, JsonNodeView} from '../../nodes'; -export interface ModelApiEvents { - /** - * Emitted when the model changes. This event is emitted once per microtask, - * multiple changes in the same microtask are batched into a single event. The - * payload is a set of change types that occurred in the model. - */ - change: CustomEvent>; - - /** - * Emitted when the builder is flushed. The event detail is the flushed patch. - */ - flush: CustomEvent; -} - /** * Local changes API for a JSON CRDT model. This class is the main entry point * for executing local user actions on a JSON CRDT document. diff --git a/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts b/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts index 066867e27d..4a52202206 100644 --- a/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts +++ b/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts @@ -1,13 +1,14 @@ -import {Model, ModelChangeType} from '../../Model'; +import {Patch} from '../../../../json-crdt-patch'; +import {Model} from '../../Model'; -describe('DOM Level 2 events, .et.addEventListener()', () => { +describe('FanOut event API', () => { test('dispatches "change" events on document change', async () => { const doc = Model.withLogicalClock(); const api = doc.api; let cnt = 0; api.root({a: {}}); expect(cnt).toBe(0); - api.events.on('change', () => { + api.onChanges.listen(() => { cnt++; }); api.obj([]).set({gg: true}); @@ -24,7 +25,7 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { let cnt = 0; api.root({a: {}}); expect(cnt).toBe(0); - api.events.on('change', () => { + api.onChanges.listen(() => { cnt++; }); api.obj([]).set({gg: true}); @@ -39,10 +40,10 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { let cnt = 0; api.root({a: {}}); expect(cnt).toBe(0); - api.events.on('change', () => { + api.onChanges.listen(() => { cnt++; }); - api.events.on('change', () => { + api.onChanges.listen(() => { cnt++; }); expect(cnt).toBe(0); @@ -55,7 +56,7 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { it('fires "change" event when a value is set to the same value', async () => { const model = Model.withLogicalClock(); let cnt = 0; - model.api.events.on('change', () => { + model.api.onChanges.listen(() => { cnt++; }); await Promise.resolve(); @@ -68,10 +69,10 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { expect(cnt).toBe(2); }); - it('fires "change" event when a value is deleted', async () => { + it('fires change event when a value is deleted', async () => { const model = Model.withLogicalClock(); let cnt = 0; - model.api.events.on('change', () => { + model.api.onChanges.listen(() => { cnt++; }); await Promise.resolve(); @@ -85,31 +86,29 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { expect(model.view()).toStrictEqual({}); }); - test('reports LOCAL change type when a value is set locally', async () => { + test('reports local change type when a value is set locally', async () => { const model = Model.withLogicalClock(); let cnt = 0; - let set: Set | undefined; - model.api.events.on('change', (event) => { + let bufferPoint: number | undefined; + model.api.onLocalChange.listen((pointer) => { cnt++; - set = event.detail; + bufferPoint = pointer; }); await Promise.resolve(); expect(cnt).toBe(0); model.api.root(123); await Promise.resolve(); expect(cnt).toBe(1); - expect(set!.has(ModelChangeType.LOCAL)).toBe(true); - expect(set!.has(ModelChangeType.REMOTE)).toBe(false); - expect(set!.has(ModelChangeType.RESET)).toBe(false); + expect(typeof bufferPoint).toBe('number'); }); - test('reports REMOTE change type when a value is set remotely', async () => { + test('reports remote change type when a value is set remotely', async () => { const model = Model.withLogicalClock(); let cnt = 0; - let set: Set | undefined; - model.api.events.on('change', (event) => { + let patchFromEvent: Patch | undefined; + model.api.onPatch.listen((p) => { cnt++; - set = event.detail; + patchFromEvent = p; }); await Promise.resolve(); expect(cnt).toBe(0); @@ -119,18 +118,16 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { model.applyPatch(patch); await Promise.resolve(); expect(cnt).toBe(1); - expect(set!.has(ModelChangeType.LOCAL)).toBe(false); - expect(set!.has(ModelChangeType.REMOTE)).toBe(true); - expect(set!.has(ModelChangeType.RESET)).toBe(false); + expect(patchFromEvent).toBeInstanceOf(Patch); }); - test('reports REMOTE and LOCAL changes when both types are present', async () => { + test('reports remote and local changes when both types are present', async () => { const model = Model.withLogicalClock(); let cnt = 0; - let set: Set | undefined; - model.api.events.on('change', (event) => { + let set: any[] | undefined; + model.api.onChanges.listen((changes) => { cnt++; - set = event.detail; + set = changes; }); await Promise.resolve(); expect(cnt).toBe(0); @@ -141,18 +138,16 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { model.api.root(321); await Promise.resolve(); expect(cnt).toBe(1); - expect(set!.has(ModelChangeType.LOCAL)).toBe(true); - expect(set!.has(ModelChangeType.REMOTE)).toBe(true); - expect(set!.has(ModelChangeType.RESET)).toBe(false); + expect((set as any[]).length).toBe(2); + expect((set as any[])[0]).toBeInstanceOf(Patch); + expect(typeof (set as any[])[1]).toBe('number'); }); - test('reports RESET change when model is reset', async () => { + test('reports reset change when model is reset', async () => { const model = Model.withLogicalClock(); let cnt = 0; - let set: Set | undefined; - model.api.events.on('change', (event) => { + model.api.onReset.listen(() => { cnt++; - set = event.detail; }); const model2 = Model.withLogicalClock(); model2.api.root(123); @@ -161,9 +156,6 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { model.reset(model2); await Promise.resolve(); expect(cnt).toBe(1); - expect(set!.has(ModelChangeType.LOCAL)).toBe(false); - expect(set!.has(ModelChangeType.REMOTE)).toBe(false); - expect(set!.has(ModelChangeType.RESET)).toBe(true); }); test('on reset builder is flushed', async () => { @@ -177,23 +169,22 @@ describe('DOM Level 2 events, .et.addEventListener()', () => { expect(model.api.builder.patch.ops.length > 0).toBe(false); }); - test('on reset other change events are removed for the type set', async () => { + test('on reset other events are not emitted', async () => { const model = Model.withLogicalClock(); const model2 = Model.withLogicalClock(); - let cnt = 0; - let set: Set | undefined; - model.api.events.on('change', (event) => { - cnt++; - set = event.detail; - }); model2.api.root(123); model.api.root('asdf'); + let cntReset = 0; + let cntPatch = 0; + let cntLocal = 0; + model.api.onReset.listen(() => { cntReset++; }); + model.api.onPatch.listen(() => { cntPatch++; }); + model.api.onLocalChange.listen(() => { cntLocal++; }); model.reset(model2); await Promise.resolve(); - expect(cnt).toBe(1); - expect(set!.has(ModelChangeType.LOCAL)).toBe(false); - expect(set!.has(ModelChangeType.REMOTE)).toBe(false); - expect(set!.has(ModelChangeType.RESET)).toBe(true); + expect(cntReset).toBe(1); + expect(cntPatch).toBe(0); + expect(cntLocal).toBe(0); }); }); @@ -205,7 +196,7 @@ describe('fanout', () => { let cnt = 0; api.root({a: {}}); expect(cnt).toBe(0); - api.changes.listen(() => { + api.onChanges.listen(() => { cnt++; }); api.obj([]).set({gg: true}); @@ -222,10 +213,10 @@ describe('fanout', () => { let cnt = 0; api.root({a: {}}); expect(cnt).toBe(0); - api.changes.listen(() => { + api.onChanges.listen(() => { cnt++; }); - api.changes.listen(() => { + api.onChanges.listen(() => { cnt++; }); expect(cnt).toBe(0); From ca2883b8173ea54bd6d8af1382c81605e3031225 Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 19:03:04 +0100 Subject: [PATCH 6/9] =?UTF-8?q?style(json-crdt):=20=F0=9F=92=84=20run=20Pr?= =?UTF-8?q?ettier?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/__tests__/Model.cloning.spec.ts | 8 +++++--- src/json-crdt/model/api/ModelApi.ts | 2 +- src/json-crdt/model/api/NodeEvents.ts | 3 ++- .../api/__tests__/ModelApi.events.spec.ts | 12 +++++++++--- src/json-crdt/model/api/util.ts | 18 ++++++++---------- 5 files changed, 25 insertions(+), 18 deletions(-) diff --git a/src/json-crdt/model/__tests__/Model.cloning.spec.ts b/src/json-crdt/model/__tests__/Model.cloning.spec.ts index d4f774384c..3cc0bb1098 100644 --- a/src/json-crdt/model/__tests__/Model.cloning.spec.ts +++ b/src/json-crdt/model/__tests__/Model.cloning.spec.ts @@ -222,9 +222,11 @@ describe('reset()', () => { }); test('preserves API nodes when model is reset', async () => { - const doc1 = Model.withLogicalClock().setSchema(schema.obj({ - text: schema.str('hell'), - })); + const doc1 = Model.withLogicalClock().setSchema( + schema.obj({ + text: schema.str('hell'), + }), + ); const doc2 = doc1.fork(); doc2.s.text.toApi().ins(4, 'o'); const str = doc1.s.text.toApi(); diff --git a/src/json-crdt/model/api/ModelApi.ts b/src/json-crdt/model/api/ModelApi.ts index 392b2e054f..cf25e06935 100644 --- a/src/json-crdt/model/api/ModelApi.ts +++ b/src/json-crdt/model/api/ModelApi.ts @@ -223,7 +223,7 @@ export class ModelApi implements SyncStore implements SyncStore void): SyncStoreUnsubscribe => this.onViewChange.listen(() => callback()); + public readonly subscribe = (callback: () => void): SyncStoreUnsubscribe => + this.onViewChange.listen(() => callback()); public readonly getSnapshot = () => this.api.view(); } diff --git a/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts b/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts index 4a52202206..a3ff2c1983 100644 --- a/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts +++ b/src/json-crdt/model/api/__tests__/ModelApi.events.spec.ts @@ -177,9 +177,15 @@ describe('FanOut event API', () => { let cntReset = 0; let cntPatch = 0; let cntLocal = 0; - model.api.onReset.listen(() => { cntReset++; }); - model.api.onPatch.listen(() => { cntPatch++; }); - model.api.onLocalChange.listen(() => { cntLocal++; }); + model.api.onReset.listen(() => { + cntReset++; + }); + model.api.onPatch.listen(() => { + cntPatch++; + }); + model.api.onLocalChange.listen(() => { + cntLocal++; + }); model.reset(model2); await Promise.resolve(); expect(cntReset).toBe(1); diff --git a/src/json-crdt/model/api/util.ts b/src/json-crdt/model/api/util.ts index 5c5395a88f..0f48f1c9d8 100644 --- a/src/json-crdt/model/api/util.ts +++ b/src/json-crdt/model/api/util.ts @@ -12,13 +12,12 @@ export class MergeFanOut extends FanOut { } public listen(listener: FanOutListener): FanOutUnsubscribe { - if (!this.listeners.size) - this.unsubs = this.fanouts.map(fanout => fanout.listen(data => this.emit(data))); + if (!this.listeners.size) this.unsubs = this.fanouts.map((fanout) => fanout.listen((data) => this.emit(data))); const unsub = super.listen(listener); return () => { unsub(); if (!this.listeners.size) { - this.unsubs.forEach(unsub => unsub()); + this.unsubs.forEach((unsub) => unsub()); this.unsubs = []; } }; @@ -32,20 +31,20 @@ export class MicrotaskBufferFanOut extends FanOut { private buffer: I[] = []; private unsub?: FanOutUnsubscribe = undefined; - constructor (private readonly source: FanOut) { + constructor(private readonly source: FanOut) { super(); } public listen(listener: FanOutListener): FanOutUnsubscribe { if (!this.unsub) { - this.unsub = this.source.listen(data => { + this.unsub = this.source.listen((data) => { const buffer = this.buffer; if (!buffer.length) { queueMicrotask(() => { this.emit(buffer); this.buffer = []; }); - } + } buffer.push(data); }); } @@ -75,8 +74,7 @@ export class MapFanOut extends FanOut { private unsub?: FanOutUnsubscribe = undefined; public listen(listener: FanOutListener): FanOutUnsubscribe { - if (!this.unsub) - this.unsub = this.source.listen(data => this.emit(this.mapper(data))); + if (!this.unsub) this.unsub = this.source.listen((data) => this.emit(this.mapper(data))); const unsub = super.listen(listener); return () => { unsub(); @@ -105,8 +103,8 @@ export class OnNewFanOut extends FanOut { public listen(listener: FanOutListener): FanOutUnsubscribe { if (!this.unsub) { - this.unsub = this.source.listen(data => { - if (this.last !== data) this.emit(this.last = data) + this.unsub = this.source.listen((data) => { + if (this.last !== data) this.emit((this.last = data)); }); } const unsub = super.listen(listener); From 3393e03d3009fc20e894a1902ed8582200d3a11e Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 19:14:47 +0100 Subject: [PATCH 7/9] =?UTF-8?q?test(json-crdt):=20=F0=9F=92=8D=20fix=20all?= =?UTF-8?q?=20tests=20after=20refactor?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/__demos__/events.ts | 2 +- .../model/__tests__/Model.cloning.spec.ts | 2 +- .../model/__tests__/Model.events.spec.ts | 31 +++++-------------- src/json-crdt/model/api/NodeEvents.ts | 8 ++--- .../model/api/__tests__/NodeEvents.spec.ts | 5 +-- .../model/api/__tests__/StrApi.spec.ts | 8 ++--- .../model/api/__tests__/VecApi.spec.ts | 10 +++--- 7 files changed, 25 insertions(+), 41 deletions(-) diff --git a/src/json-crdt/__demos__/events.ts b/src/json-crdt/__demos__/events.ts index 19cd1cc2ff..ed699bb84a 100644 --- a/src/json-crdt/__demos__/events.ts +++ b/src/json-crdt/__demos__/events.ts @@ -25,7 +25,7 @@ const model = Model.withLogicalClock(1234); // 1234 is the session ID // DOM Level 2 node events const root = model.api.r; -root.events.onViewChange.listen(() => { +root.events.onViewChanges.listen(() => { console.log('Root value changed'); }); diff --git a/src/json-crdt/model/__tests__/Model.cloning.spec.ts b/src/json-crdt/model/__tests__/Model.cloning.spec.ts index 3cc0bb1098..443a478c66 100644 --- a/src/json-crdt/model/__tests__/Model.cloning.spec.ts +++ b/src/json-crdt/model/__tests__/Model.cloning.spec.ts @@ -215,7 +215,7 @@ describe('reset()', () => { }); doc2.api.str(['text']).ins(5, ' world'); let cnt = 0; - doc2.api.events.on('change', () => cnt++); + doc2.api.onChanges.listen(() => cnt++); doc2.reset(doc1); await until(() => cnt > 0); expect(cnt).toBe(1); diff --git a/src/json-crdt/model/__tests__/Model.events.spec.ts b/src/json-crdt/model/__tests__/Model.events.spec.ts index 939e8bcb13..02d5fd5e1d 100644 --- a/src/json-crdt/model/__tests__/Model.events.spec.ts +++ b/src/json-crdt/model/__tests__/Model.events.spec.ts @@ -1,11 +1,11 @@ import {PatchBuilder} from '../../../json-crdt-patch'; -import {Model, ModelChangeType} from '../Model'; +import {Model} from '../Model'; describe('DOM Level 0, .onchange event system', () => { it('should trigger the onchange event when a value is set', () => { const model = Model.withLogicalClock(); let cnt = 0; - model.onchange = () => { + model.onpatch = () => { cnt++; }; expect(cnt).toBe(0); @@ -26,7 +26,7 @@ describe('DOM Level 0, .onchange event system', () => { it('should trigger the onchange event when a value is set to the same value', () => { const model = Model.withLogicalClock(); let cnt = 0; - model.onchange = () => { + model.onpatch = () => { cnt++; }; expect(cnt).toBe(0); @@ -42,7 +42,7 @@ describe('DOM Level 0, .onchange event system', () => { it('should trigger the onchange event when a value is deleted', () => { const model = Model.withLogicalClock(); let cnt = 0; - model.onchange = () => { + model.onpatch = () => { cnt++; }; expect(cnt).toBe(0); @@ -60,7 +60,7 @@ describe('DOM Level 0, .onchange event system', () => { it('should trigger the onchange event when a non-existent value is deleted', () => { const model = Model.withLogicalClock(); let cnt = 0; - model.onchange = () => { + model.onpatch = () => { cnt++; }; expect(cnt).toBe(0); @@ -78,7 +78,7 @@ describe('DOM Level 0, .onchange event system', () => { it('should trigger when root value is changed', () => { const model = Model.withLogicalClock(); let cnt = 0; - model.onchange = () => { + model.onpatch = () => { cnt++; }; expect(cnt).toBe(0); @@ -96,29 +96,12 @@ describe('DOM Level 0, .onchange event system', () => { }); describe('event types', () => { - it('should trigger the onchange event with a REMOTE event type', () => { - const model = Model.withLogicalClock(); - let cnt = 0; - model.onchange = (type) => { - expect(type).toBe(ModelChangeType.REMOTE); - cnt++; - }; - const builder = new PatchBuilder(model.clock.clone()); - builder.root(builder.json({foo: 123})); - const patch = builder.flush(); - expect(cnt).toBe(0); - model.applyPatch(patch); - expect(cnt).toBe(1); - expect(model.view()).toStrictEqual({foo: 123}); - }); - it('should trigger the onchange event with a RESET event type', () => { const model1 = Model.withLogicalClock(); const model2 = Model.withLogicalClock(); model2.api.root([1, 2, 3]); let cnt = 0; - model1.onchange = (type) => { - expect(type).toBe(ModelChangeType.RESET); + model1.onreset = () => { cnt++; }; expect(cnt).toBe(0); diff --git a/src/json-crdt/model/api/NodeEvents.ts b/src/json-crdt/model/api/NodeEvents.ts index 6b4581e61d..2e8d5983aa 100644 --- a/src/json-crdt/model/api/NodeEvents.ts +++ b/src/json-crdt/model/api/NodeEvents.ts @@ -24,11 +24,11 @@ export class NodeEvents implements SyncStore>; + public readonly onViewChanges: FanOut>; constructor(private readonly api: NodeApi) { this.onChanges = new MapFanOut(this.api.api.onChanges, this.getSnapshot); - this.onViewChange = new OnNewFanOut(this.onChanges); + this.onViewChanges = new OnNewFanOut(this.onChanges); } /** @@ -38,13 +38,13 @@ export class NodeEvents implements SyncStore>).clear(); + (this.onViewChanges as OnNewFanOut>).clear(); (this.onChanges as MapFanOut).clear(); } // ---------------------------------------------------------------- SyncStore public readonly subscribe = (callback: () => void): SyncStoreUnsubscribe => - this.onViewChange.listen(() => callback()); + this.onViewChanges.listen(() => callback()); public readonly getSnapshot = () => this.api.view(); } diff --git a/src/json-crdt/model/api/__tests__/NodeEvents.spec.ts b/src/json-crdt/model/api/__tests__/NodeEvents.spec.ts index 2ce7fc3d19..4ea18b3a55 100644 --- a/src/json-crdt/model/api/__tests__/NodeEvents.spec.ts +++ b/src/json-crdt/model/api/__tests__/NodeEvents.spec.ts @@ -1,6 +1,6 @@ import {Model} from '../..'; -test('does not fire events after node is deleted', () => { +test('does not fire events after node is deleted', async () => { const model = Model.withLogicalClock(); model.api.root({ foo: { @@ -11,13 +11,14 @@ test('does not fire events after node is deleted', () => { }); const bar = model.api.obj(['foo', 'bar']); let cnt = 0; - bar.events.changes.listen(() => { + bar.events.onViewChanges.listen(() => { cnt++; }); expect(cnt).toBe(0); bar.set({ gg: 'wp', }); + await Promise.resolve(); expect(cnt).toBe(1); model.api.obj(['foo']).del(['bar']); model.api.obj(['foo']).set({gl: 'hf'}); diff --git a/src/json-crdt/model/api/__tests__/StrApi.spec.ts b/src/json-crdt/model/api/__tests__/StrApi.spec.ts index e871d3140e..d88c7ff3e7 100644 --- a/src/json-crdt/model/api/__tests__/StrApi.spec.ts +++ b/src/json-crdt/model/api/__tests__/StrApi.spec.ts @@ -75,14 +75,14 @@ describe('events', () => { const onView = () => cnt++; str.ins(0, 'aaa'); expect(cnt).toEqual(0); - str.events.on('view', onView); + const unsub = str.events.onViewChanges.listen(onView); str.ins(0, 'bbb'); await Promise.resolve(); expect(cnt).toEqual(1); str.ins(0, 'ccc'); await Promise.resolve(); expect(cnt).toEqual(2); - str.events.off('view', onView); + unsub(); str.del(1, 7); expect(cnt).toEqual(2); }); @@ -96,7 +96,7 @@ describe('events', () => { const onChange = () => cnt++; str.ins(0, 'aaa'); expect(cnt).toEqual(0); - str.events.on('view', onChange); + str.events.onViewChanges.listen(onChange); str.ins(0, 'bbb'); str.ins(0, 'ccc'); str.del(1, 7); @@ -114,7 +114,7 @@ describe('events', () => { const onView = () => cnt++; str.ins(0, 'aaa'); expect(cnt).toEqual(0); - const unsubscribe = str.events.changes.listen(onView); + const unsubscribe = str.events.onViewChanges.listen(onView); str.ins(0, 'bbb'); await Promise.resolve(); expect(cnt).toEqual(1); diff --git a/src/json-crdt/model/api/__tests__/VecApi.spec.ts b/src/json-crdt/model/api/__tests__/VecApi.spec.ts index adba25fb01..daaffd4b4b 100644 --- a/src/json-crdt/model/api/__tests__/VecApi.spec.ts +++ b/src/json-crdt/model/api/__tests__/VecApi.spec.ts @@ -49,7 +49,7 @@ describe('events', () => { let cnt = 0; const onView = () => cnt++; const tuple = api.vec([]); - tuple.events.on('view', onView); + tuple.events.onViewChanges.listen(onView); expect(cnt).toBe(0); tuple.set([[0, 1.5]]); await Promise.resolve(); @@ -69,7 +69,7 @@ describe('events', () => { let cnt = 0; const onView = () => cnt++; const tuple = api.vec([]); - tuple.events.on('view', onView); + tuple.events.onViewChanges.listen(onView); expect(cnt).toBe(0); tuple.set([[0, 1.5]]); await Promise.resolve(); @@ -92,7 +92,7 @@ describe('events', () => { let cnt = 0; const onView = () => cnt++; const tuple = api.vec([]); - tuple.events.on('view', onView); + const unsub = tuple.events.onViewChanges.listen(onView); expect(cnt).toBe(0); tuple.set([[0, 1.5]]); await Promise.resolve(); @@ -100,7 +100,7 @@ describe('events', () => { tuple.set([[0, 2.5]]); await Promise.resolve(); expect(cnt).toBe(2); - tuple.events.off('view', onView); + unsub(); tuple.set([[0, 3.5]]); await Promise.resolve(); expect(cnt).toBe(2); @@ -113,7 +113,7 @@ describe('events', () => { let cnt = 0; const onView = () => cnt++; const tuple = api.vec([]); - tuple.events.on('view', onView); + tuple.events.onViewChanges.listen(onView); expect(cnt).toBe(0); tuple.set([[0, 1.5]]); tuple.set([[1, 44]]); From 36a2062f5e1a4007106bb4aab1a409ee2af5606d Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 19:20:49 +0100 Subject: [PATCH 8/9] =?UTF-8?q?refactor(json-crdt):=20=F0=9F=92=A1=20renam?= =?UTF-8?q?e=20fanout=20utility=20file?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/json-crdt/model/api/ModelApi.ts | 2 +- src/json-crdt/model/api/NodeEvents.ts | 2 +- src/json-crdt/model/api/{util.ts => fanout.ts} | 0 3 files changed, 2 insertions(+), 2 deletions(-) rename src/json-crdt/model/api/{util.ts => fanout.ts} (100%) diff --git a/src/json-crdt/model/api/ModelApi.ts b/src/json-crdt/model/api/ModelApi.ts index cf25e06935..97b52b1060 100644 --- a/src/json-crdt/model/api/ModelApi.ts +++ b/src/json-crdt/model/api/ModelApi.ts @@ -4,7 +4,7 @@ import {ApiPath, ArrApi, BinApi, ConApi, NodeApi, ObjApi, StrApi, VecApi, ValApi import {Patch} from '../../../json-crdt-patch/Patch'; import {PatchBuilder} from '../../../json-crdt-patch/PatchBuilder'; import {SyncStore} from '../../../util/events/sync-store'; -import {MergeFanOut, MicrotaskBufferFanOut} from './util'; +import {MergeFanOut, MicrotaskBufferFanOut} from './fanout'; import type {Model} from '../Model'; import type {JsonNode, JsonNodeView} from '../../nodes'; diff --git a/src/json-crdt/model/api/NodeEvents.ts b/src/json-crdt/model/api/NodeEvents.ts index 2e8d5983aa..c82d1f5d39 100644 --- a/src/json-crdt/model/api/NodeEvents.ts +++ b/src/json-crdt/model/api/NodeEvents.ts @@ -1,5 +1,5 @@ import {FanOut} from 'thingies/es2020/fanout'; -import {MapFanOut, OnNewFanOut} from './util'; +import {MapFanOut, OnNewFanOut} from './fanout'; import type {JsonNode, JsonNodeView} from '../../nodes'; import type {SyncStore, SyncStoreUnsubscribe} from '../../../util/events/sync-store'; import type {NodeApi} from './nodes'; diff --git a/src/json-crdt/model/api/util.ts b/src/json-crdt/model/api/fanout.ts similarity index 100% rename from src/json-crdt/model/api/util.ts rename to src/json-crdt/model/api/fanout.ts From 6a1768eede78bdfc05b961f93e5f28b9eb05352b Mon Sep 17 00:00:00 2001 From: streamich Date: Wed, 29 Nov 2023 19:30:32 +0100 Subject: [PATCH 9/9] =?UTF-8?q?test(json-crdt):=20=F0=9F=92=8D=20add=20fan?= =?UTF-8?q?out=20tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../model/api/__tests__/fanout.spec.ts | 114 ++++++++++++++++++ 1 file changed, 114 insertions(+) create mode 100644 src/json-crdt/model/api/__tests__/fanout.spec.ts diff --git a/src/json-crdt/model/api/__tests__/fanout.spec.ts b/src/json-crdt/model/api/__tests__/fanout.spec.ts new file mode 100644 index 0000000000..da6ea6cf93 --- /dev/null +++ b/src/json-crdt/model/api/__tests__/fanout.spec.ts @@ -0,0 +1,114 @@ +import {MapFanOut, MergeFanOut, MicrotaskBufferFanOut, OnNewFanOut} from '../fanout'; +import {FanOut} from 'thingies/es2020/fanout'; + +describe('MergeFanOut', () => { + test('merges data from multiple fanouts', () => { + const fanout1 = new FanOut(); + const fanout2 = new FanOut(); + const fanout3 = new FanOut(); + const merged = new MergeFanOut([fanout1, fanout2, fanout3]); + const data: number[] = []; + const unsub = merged.listen((d) => data.push(d)); + fanout1.emit(1); + fanout2.emit(2); + fanout3.emit(3); + expect(data).toEqual([1, 2, 3]); + unsub(); + fanout1.emit(1); + fanout2.emit(2); + fanout3.emit(3); + expect(data).toEqual([1, 2, 3]); + merged.listen((d) => data.push(d)); + fanout3.emit(3); + fanout2.emit(2); + fanout1.emit(1); + expect(data).toEqual([1, 2, 3, 3, 2, 1]); + }); +}); + +describe('MicrotaskBufferFanOut', () => { + test('buffers multiple emissions by microtask', async () => { + const source = new FanOut(); + const merged = new MicrotaskBufferFanOut(source); + const data: number[][] = []; + const unsub = merged.listen((d) => data.push(d)); + expect(data.length).toBe(0); + source.emit(1); + source.emit(2); + source.emit(3); + expect(data.length).toBe(0); + await Promise.resolve(); + expect(data).toEqual([[1, 2, 3]]); + source.emit(1); + source.emit(2); + source.emit(3); + unsub(); + source.emit(1); + source.emit(2); + source.emit(3); + await Promise.resolve(); + expect(data).toEqual([[1, 2, 3]]); + merged.listen((d) => data.push(d)); + source.emit(1); + source.emit(2); + source.emit(3); + await Promise.resolve(); + expect(data).toEqual([ + [1, 2, 3], + [1, 2, 3], + ]); + }); +}); + +describe('MapFanOut', () => { + test('can multiply all values by 2x', () => { + const source = new FanOut(); + const merged = new MapFanOut(source, (x) => x * 2); + const data: number[] = []; + const unsub = merged.listen((d) => data.push(d)); + expect(data.length).toBe(0); + source.emit(1); + source.emit(2); + source.emit(3); + expect(data.length).toBe(3); + expect(data).toEqual([2, 4, 6]); + unsub(); + source.emit(1); + source.emit(2); + source.emit(3); + expect(data.length).toBe(3); + merged.listen((d) => data.push(d)); + source.emit(4); + expect(data).toEqual([2, 4, 6, 8]); + }); +}); + +describe('OnNewFanOut', () => { + test('emits only new value changes', () => { + const obj1 = {}; + const obj2 = {}; + const source = new FanOut(); + const merged = new OnNewFanOut(source); + const data: any[] = []; + const unsub = merged.listen((d) => data.push(d)); + expect(data.length).toBe(0); + source.emit(obj1); + expect(data.length).toBe(1); + expect(data).toEqual([obj1]); + source.emit(obj1); + expect(data.length).toBe(1); + source.emit(obj2); + source.emit(obj2); + expect(data.length).toBe(2); + expect(data).toEqual([obj1, obj2]); + unsub(); + source.emit(obj1); + source.emit(obj2); + expect(data.length).toBe(2); + expect(data).toEqual([obj1, obj2]); + merged.listen((d) => data.push(d)); + source.emit(obj2); + expect(data.length).toBe(3); + expect(data).toEqual([obj1, obj2, obj2]); + }); +});