diff --git a/src/json-crdt/log/Log.ts b/src/json-crdt/log/Log.ts index 72169a822e..2b968a35a4 100644 --- a/src/json-crdt/log/Log.ts +++ b/src/json-crdt/log/Log.ts @@ -1,7 +1,8 @@ import {AvlMap} from 'sonic-forest/lib/avl/AvlMap'; -import {first, next} from 'sonic-forest/lib/util'; +import {first, next, prev} from 'sonic-forest/lib/util'; import {printTree} from 'tree-dump/lib/printTree'; import {listToUint8} from '@jsonjoy.com/util/lib/buffers/concat'; +import {cloneBinary} from '@jsonjoy.com/util/lib/json-clone/cloneBinary'; import {Model} from '../model'; import {toSchema} from '../schema/toSchema'; import { @@ -77,7 +78,7 @@ export class Log, Metadata extends Record(compare); + public patches = new AvlMap(compare); private __onPatch: FanOutUnsubscribe; private __onFlush: FanOutUnsubscribe; @@ -124,7 +125,6 @@ export class Log, Metadata extends Record, Metadata extends Record { + const start = this.start; + const metadata = cloneBinary(this.metadata) as Metadata; + const end = this.end.clone(); + const log = new Log(start, end, metadata); + for (const {v} of this.patches.entries()) { + const patch = v.clone(); + const id = patch.getId(); + if (!id) continue; + log.patches.set(id, patch); + } + return log; + } + + // /** + // * Adds a batch of patches to the log, without applying them to the `end` + // * model. It is assumed that the patches are already applied to the `end` + // * model, this method only adds them to the internal patch collection. + // * + // * If you need to apply patches to the `end` model, use `end.applyBatch(batch)`, + // * it will apply them to the model and add them to the log automatically. + // * + // * @param batch Array of patches to add to the log. + // */ + // public add(batch: Patch[]): void { + // const patches = this.patches; + // for (const patch of batch) { + // const id = patch.getId(); + // if (id) patches.set(id, patch); + // } + // } + + /** + * Rebase a batch of patches on top of the current end of the log, or on top + * of the latest patch for a given session ID. + * + * @param batch A batch of patches to rebase. + * @param sid Session ID to find the latest patch for rebasing. If not provided, + * the latest patch in the log is used. + * @returns The rebased patches. + */ + public rebaseBatch(batch: Patch[], sid?: number): Patch[] { + const rebasePatch = sid ? this.findMax(sid) : this.patches.max?.v; + if (!rebasePatch) return batch; + const rebaseId = rebasePatch.getId(); + if (!rebaseId) return batch; + let nextTime = rebaseId.time + rebasePatch.span(); + const rebased: Patch[] = []; + const length = batch.length; + for (let i = 0; i < length; i++) { + const patch = batch[i].rebase(nextTime); + nextTime += patch.span(); + rebased.push(patch); + } + return rebased; + } + + /** + * Resets the log to the state of another log. Consumes all state fron the `to` + * log. The `to` log will be destroyed and should not be used after calling + * this method. + * + * If you want to preserve the `to` log, use `.clone()` method first. + * + * ```ts + * const log1 = new Log(); + * const log2 = new Log(); + * log1.reset(log2.clone()); + * ``` + * + * @param to The log to consume the state from. + */ + public reset(to: Log): void { + this.start = to.start; + this.metadata = to.metadata; + this.patches = to.patches; + this.end.reset(to.end); + to.destroy(); + } + /** * Creates a patch which reverts the given patch. The RGA insertion operations * are reversed just by deleting the inserted values. All other operations diff --git a/src/json-crdt/log/__tests__/Log.spec.ts b/src/json-crdt/log/__tests__/Log.spec.ts index 9407dd063a..d386a59e0b 100644 --- a/src/json-crdt/log/__tests__/Log.spec.ts +++ b/src/json-crdt/log/__tests__/Log.spec.ts @@ -1,3 +1,4 @@ +import {deepEqual} from '@jsonjoy.com/util/lib/json-equal/deepEqual'; import {type DelOp, type InsStrOp, s} from '../../../json-crdt-patch'; import {Model} from '../../model'; import {Log} from '../Log'; @@ -119,6 +120,184 @@ describe('.advanceTo()', () => { }); }); +describe('.findMax()', () => { + test('can advance the log from start', () => { + const model = Model.create(); + const sid0 = model.clock.sid; + const sid1 = Model.sid(); + model.api.set({foo: 'bar'}); + const log = Log.fromNewModel(model); + log.end.api.obj([]).set({x: 1}); + const patch1 = log.end.api.flush(); + log.end.setSid(sid1); + log.end.api.obj([]).set({y: 2}); + const patch2 = log.end.api.flush(); + log.end.setSid(sid0); + log.end.api.obj([]).set({foo: 'baz'}); + const patch3 = log.end.api.flush(); + const found0 = log.findMax(sid0); + const found1 = log.findMax(sid1); + const found2 = log.findMax(12345); + expect(found0).toBe(patch3); + expect(found1).toBe(patch2); + expect(found2).toBe(void 0); + }); +}); + +const setupTwoLogs = () => { + const model = Model.create({foo: 'bar'}); + const log1 = Log.fromNewModel(model); + log1.metadata = {time: 123}; + log1.end.api.obj([]).set({x: 1}); + log1.end.api.flush(); + log1.end.api.obj([]).set({y: 2}); + log1.end.api.flush(); + log1.end.api.obj([]).set({foo: 'baz'}); + log1.end.api.flush(); + const log2 = log1.clone(); + return {log1, log2}; +}; + +const assertLogsEqual = (log1: Log, log2: Log) => { + expect(log1.start()).not.toBe(log2.start()); + expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true); + expect(log1.start().clock.sid).toEqual(log2.start().clock.sid); + expect(log1.start().clock.time).toEqual(log2.start().clock.time); + expect(log1.end).not.toBe(log2.end); + expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true); + expect(log1.end.clock.sid).toEqual(log2.end.clock.sid); + expect(log1.end.clock.time).toEqual(log2.end.clock.time); + expect(log1.metadata).not.toBe(log2.metadata); + expect(deepEqual(log1.metadata, log2.metadata)).toBe(true); + expect(log1.patches.size()).toBe(log2.patches.size()); + expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary()); + expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary()); + expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v); + expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v); +}; + +describe('.clone()', () => { + test('start model has the same view and clock', () => { + const {log1, log2} = setupTwoLogs(); + expect(log1.start()).not.toBe(log2.start()); + expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true); + expect(log1.start().clock.sid).toEqual(log2.start().clock.sid); + expect(log1.start().clock.time).toEqual(log2.start().clock.time); + }); + + test('end model has the same view and clock', () => { + const {log1, log2} = setupTwoLogs(); + expect(log1.end).not.toBe(log2.end); + expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true); + expect(log1.end.clock.sid).toEqual(log2.end.clock.sid); + expect(log1.end.clock.time).toEqual(log2.end.clock.time); + }); + + test('metadata is the same but has different identity', () => { + const {log1, log2} = setupTwoLogs(); + expect(log1.metadata).not.toBe(log2.metadata); + expect(deepEqual(log1.metadata, log2.metadata)).toBe(true); + }); + + test('patch log is the same', () => { + const {log1, log2} = setupTwoLogs(); + expect(log1.patches.size()).toBe(log2.patches.size()); + expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary()); + expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary()); + expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v); + expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v); + }); + + test('can evolve logs independently', () => { + const {log1, log2} = setupTwoLogs(); + assertLogsEqual(log1, log2); + log1.end.api.obj([]).set({a: 1}); + log1.end.api.flush(); + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2}); + log2.end.api.obj([]).set({b: 2}); + log2.end.api.flush(); + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2}); + }); +}); + +describe('.rebaseBatch()', () => { + test('can rebase a concurrent batch onto another log', () => { + const {log1, log2} = setupTwoLogs(); + log1.end.api.obj([]).set({a: 1}); + log2.end.api.obj([]).set({b: 2}); + const patch1 = log1.end.api.flush(); + const patch2 = log2.end.api.flush(); + expect(patch1.toBinary()).not.toEqual(patch2.toBinary()); + expect(patch1.getId()?.sid).toBe(patch2.getId()?.sid); + expect(patch1.getId()?.time).toBe(patch2.getId()?.time); + expect(patch1.span()).toEqual(patch2.span()); + const [patch3] = log1.rebaseBatch([patch2]); + expect(patch1.toBinary()).not.toEqual(patch3.toBinary()); + expect(patch1.getId()?.sid).toBe(patch3.getId()?.sid); + expect(patch1.getId()!.time + patch1.span()).toBe(patch3.getId()?.time); + log1.end.applyPatch(patch3); + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1, b: 2}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2}); + expect(() => assertLogsEqual(log1, log2)).toThrow(); + log2.reset(log1.clone()); + assertLogsEqual(log1, log2); + }); + + test('can rebase a concurrent batch onto another log (multiple patches)', () => { + const {log1, log2} = setupTwoLogs(); + log1.end.api.obj([]).set({a: 1}); + log2.end.api.obj([]).set({b: 2}); + log1.end.api.flush(); + const patch2 = log2.end.api.flush(); + log1.end.api.obj([]).set({a: 2}); + log2.end.api.obj([]).set({b: 3}); + log1.end.api.flush(); + const patch4 = log2.end.api.flush(); + log2.end.api.obj([]).set({b: 3}); + const patch5 = log2.end.api.flush(); + const batch2 = [patch2, patch4, patch5]; + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3}); + const batch3 = log1.rebaseBatch(batch2); + expect(batch3[0].getId()!.time).toBe(log1.end.clock.time); + log1.end.applyBatch(batch3); + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3}); + expect(() => assertLogsEqual(log1, log2)).toThrow(); + log2.reset(log1.clone()); + assertLogsEqual(log1, log2); + }); + + test('can specify rebase sid', () => { + const {log1, log2} = setupTwoLogs(); + expect(log1.end.clock.sid).toBe(log2.end.clock.sid); + log1.end.api.obj([]).set({a: 1}); + log2.end.api.obj([]).set({b: 2}); + log1.end.api.flush(); + const patch2 = log2.end.api.flush(); + log1.end.setSid(12345); + log1.end.api.obj([]).set({a: 2}); + log2.end.api.obj([]).set({b: 3}); + log1.end.api.flush(); + const patch4 = log2.end.api.flush(); + log2.end.api.obj([]).set({b: 3}); + const patch5 = log2.end.api.flush(); + const batch2 = [patch2, patch4, patch5]; + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3}); + const batch3 = log1.rebaseBatch(batch2, log2.end.clock.sid); + expect(batch3[0].getId()!.time).not.toBe(log1.end.clock.time); + log1.end.applyBatch(batch3); + expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3}); + expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3}); + expect(() => assertLogsEqual(log1, log2)).toThrow(); + log2.reset(log1.clone()); + assertLogsEqual(log1, log2); + }); +}); + describe('.undo()', () => { describe('RGA', () => { describe('str', () => {