106 changes: 103 additions & 3 deletions src/json-crdt/log/Log.ts
@@ -1,7 +1,8 @@
import {AvlMap} from 'sonic-forest/lib/avl/AvlMap';
import {first, next} from 'sonic-forest/lib/util';
import {first, next, prev} from 'sonic-forest/lib/util';
import {printTree} from 'tree-dump/lib/printTree';
import {listToUint8} from '@jsonjoy.com/util/lib/buffers/concat';
import {cloneBinary} from '@jsonjoy.com/util/lib/json-clone/cloneBinary';
import {Model} from '../model';
import {toSchema} from '../schema/toSchema';
import {
@@ -77,7 +78,7 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
*
* @readonly
*/
public readonly patches = new AvlMap<ITimestampStruct, Patch>(compare);
public patches = new AvlMap<ITimestampStruct, Patch>(compare);

private __onPatch: FanOutUnsubscribe;
private __onFlush: FanOutUnsubscribe;
@@ -124,7 +125,6 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
public destroy() {
this.__onPatch();
this.__onFlush();
this.patches.clear();
}

/**
@@ -182,6 +182,106 @@ export class Log<N extends JsonNode = JsonNode<any>, Metadata extends Record<str
};
}

/**
* Finds the latest patch for a given session ID.
*
* @param sid Session ID to find the latest patch for.
* @returns The latest patch for the given session ID, or `undefined` if no
* such patch exists.
*/
public findMax(sid: number): Patch | undefined {
let curr = this.patches.max;
while (curr) {
if (curr.k.sid === sid) return curr.v;
curr = prev(curr);
}
return;
}
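
For illustration (not part of the diff), a minimal usage sketch of `findMax`, using only the `Model` and `Log` APIs exercised in the tests below:

```ts
import {Model} from '../../model';
import {Log} from '../Log';

// Create a log with one flushed patch for the model's session.
const model = Model.create();
model.api.set({foo: 'bar'});
const log = Log.fromNewModel(model);
log.end.api.obj([]).set({x: 1});
const patch = log.end.api.flush();

// The latest patch for this session is the one just flushed;
// an unknown session ID yields `undefined`.
log.findMax(model.clock.sid); // patch
log.findMax(12345); // undefined
```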

/**
* @returns A deep clone of the log, including the start function, metadata,
* patches, and the end model.
*/
public clone(): Log<N, Metadata> {
const start = this.start;
const metadata = cloneBinary(this.metadata) as Metadata;
const end = this.end.clone();
const log = new Log(start, end, metadata);
for (const {v} of this.patches.entries()) {
const patch = v.clone();
const id = patch.getId();
if (!id) continue;
log.patches.set(id, patch);
}
return log;
}
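
Continuing the sketch above, a short illustration (mirroring the `.clone()` tests below) of how a cloned log evolves independently of the original:

```ts
const copy = log.clone();

// Same views, clocks, metadata, and patch log, but distinct objects.
copy.end.view();     // deep-equals log.end.view()
copy.patches.size(); // equals log.patches.size()

// Edits to the clone do not leak back into the original.
copy.end.api.obj([]).set({b: 2});
copy.end.api.flush();
// log.end.view() is unchanged.
```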

// /**
// * Adds a batch of patches to the log, without applying them to the `end`
// * model. It is assumed that the patches are already applied to the `end`
// * model, this method only adds them to the internal patch collection.
// *
// * If you need to apply patches to the `end` model, use `end.applyBatch(batch)`,
// * it will apply them to the model and add them to the log automatically.
// *
// * @param batch Array of patches to add to the log.
// */
// public add(batch: Patch[]): void {
// const patches = this.patches;
// for (const patch of batch) {
// const id = patch.getId();
// if (id) patches.set(id, patch);
// }
// }

/**
* Rebase a batch of patches on top of the current end of the log, or on top
* of the latest patch for a given session ID.
*
* @param batch A batch of patches to rebase.
* @param sid Session ID to find the latest patch for rebasing. If not provided,
* the latest patch in the log is used.
* @returns The rebased patches.
*/
public rebaseBatch(batch: Patch[], sid?: number): Patch[] {
const rebasePatch = sid ? this.findMax(sid) : this.patches.max?.v;
if (!rebasePatch) return batch;
const rebaseId = rebasePatch.getId();
if (!rebaseId) return batch;
let nextTime = rebaseId.time + rebasePatch.span();
const rebased: Patch[] = [];
const length = batch.length;
for (let i = 0; i < length; i++) {
const patch = batch[i].rebase(nextTime);
nextTime += patch.span();
rebased.push(patch);
}
return rebased;
}
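
An illustrative sketch of the rebase arithmetic (the names `logA`, `logB`, `local`, and `remote` are hypothetical): the first rebased patch starts at `rebaseId.time + rebasePatch.span()`, and each subsequent patch continues where the previous one ends. This mirrors the `.rebaseBatch()` tests below:

```ts
import {Model} from '../../model';
import {Log} from '../Log';

// Two logs diverge from a shared start.
const model = Model.create({foo: 'bar'});
const logA = Log.fromNewModel(model);
const logB = logA.clone();

// Each flushes one concurrent patch (same sid, same starting time).
logA.end.api.obj([]).set({a: 1});
const local = logA.end.api.flush();
logB.end.api.obj([]).set({b: 2});
const remote = logB.end.api.flush();

// Rebase the remote patch so it starts after logA's latest patch:
// its new time is local.getId()!.time + local.span().
const [rebased] = logA.rebaseBatch([remote]);
logA.end.applyPatch(rebased);
// logA.end.view() now includes both {a: 1} and {b: 2}.
```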

/**
* Resets the log to the state of another log. Consumes all state from the `to`
* log. The `to` log will be destroyed and should not be used after calling
* this method.
*
* If you want to preserve the `to` log, use `.clone()` method first.
*
* ```ts
* const log1 = new Log();
* const log2 = new Log();
* log1.reset(log2.clone());
* ```
*
* @param to The log to consume the state from.
*/
public reset(to: Log<N, Metadata>): void {
this.start = to.start;
this.metadata = to.metadata;
this.patches = to.patches;
this.end.reset(to.end);
to.destroy();
}

/**
* Creates a patch which reverts the given patch. The RGA insertion operations
* are reversed just by deleting the inserted values. All other operations
179 changes: 179 additions & 0 deletions src/json-crdt/log/__tests__/Log.spec.ts
@@ -1,3 +1,4 @@
import {deepEqual} from '@jsonjoy.com/util/lib/json-equal/deepEqual';
import {type DelOp, type InsStrOp, s} from '../../../json-crdt-patch';
import {Model} from '../../model';
import {Log} from '../Log';
@@ -119,6 +120,184 @@ describe('.advanceTo()', () => {
});
});

describe('.findMax()', () => {
test('finds the latest patch for each session ID', () => {
const model = Model.create();
const sid0 = model.clock.sid;
const sid1 = Model.sid();
model.api.set({foo: 'bar'});
const log = Log.fromNewModel(model);
log.end.api.obj([]).set({x: 1});
const patch1 = log.end.api.flush();
log.end.setSid(sid1);
log.end.api.obj([]).set({y: 2});
const patch2 = log.end.api.flush();
log.end.setSid(sid0);
log.end.api.obj([]).set({foo: 'baz'});
const patch3 = log.end.api.flush();
const found0 = log.findMax(sid0);
const found1 = log.findMax(sid1);
const found2 = log.findMax(12345);
expect(found0).toBe(patch3);
expect(found1).toBe(patch2);
expect(found2).toBe(void 0);
});
});

const setupTwoLogs = () => {
const model = Model.create({foo: 'bar'});
const log1 = Log.fromNewModel(model);
log1.metadata = {time: 123};
log1.end.api.obj([]).set({x: 1});
log1.end.api.flush();
log1.end.api.obj([]).set({y: 2});
log1.end.api.flush();
log1.end.api.obj([]).set({foo: 'baz'});
log1.end.api.flush();
const log2 = log1.clone();
return {log1, log2};
};

const assertLogsEqual = (log1: Log<any, any>, log2: Log<any, any>) => {
expect(log1.start()).not.toBe(log2.start());
expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true);
expect(log1.start().clock.sid).toEqual(log2.start().clock.sid);
expect(log1.start().clock.time).toEqual(log2.start().clock.time);
expect(log1.end).not.toBe(log2.end);
expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true);
expect(log1.end.clock.sid).toEqual(log2.end.clock.sid);
expect(log1.end.clock.time).toEqual(log2.end.clock.time);
expect(log1.metadata).not.toBe(log2.metadata);
expect(deepEqual(log1.metadata, log2.metadata)).toBe(true);
expect(log1.patches.size()).toBe(log2.patches.size());
expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary());
expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary());
expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v);
expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v);
};

describe('.clone()', () => {
test('start model has the same view and clock', () => {
const {log1, log2} = setupTwoLogs();
expect(log1.start()).not.toBe(log2.start());
expect(deepEqual(log1.start().view(), log2.start().view())).toBe(true);
expect(log1.start().clock.sid).toEqual(log2.start().clock.sid);
expect(log1.start().clock.time).toEqual(log2.start().clock.time);
});

test('end model has the same view and clock', () => {
const {log1, log2} = setupTwoLogs();
expect(log1.end).not.toBe(log2.end);
expect(deepEqual(log1.end.view(), log2.end.view())).toBe(true);
expect(log1.end.clock.sid).toEqual(log2.end.clock.sid);
expect(log1.end.clock.time).toEqual(log2.end.clock.time);
});

test('metadata is the same but has different identity', () => {
const {log1, log2} = setupTwoLogs();
expect(log1.metadata).not.toBe(log2.metadata);
expect(deepEqual(log1.metadata, log2.metadata)).toBe(true);
});

test('patch log is the same', () => {
const {log1, log2} = setupTwoLogs();
expect(log1.patches.size()).toBe(log2.patches.size());
expect(log1.patches.min!.v.toBinary()).toEqual(log2.patches.min!.v.toBinary());
expect(log1.patches.max!.v.toBinary()).toEqual(log2.patches.max!.v.toBinary());
expect(log1.patches.min!.v).not.toBe(log2.patches.min!.v);
expect(log1.patches.max!.v).not.toBe(log2.patches.max!.v);
});

test('can evolve logs independently', () => {
const {log1, log2} = setupTwoLogs();
assertLogsEqual(log1, log2);
log1.end.api.obj([]).set({a: 1});
log1.end.api.flush();
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2});
log2.end.api.obj([]).set({b: 2});
log2.end.api.flush();
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2});
});
});

describe('.rebaseBatch()', () => {
test('can rebase a concurrent batch onto another log', () => {
const {log1, log2} = setupTwoLogs();
log1.end.api.obj([]).set({a: 1});
log2.end.api.obj([]).set({b: 2});
const patch1 = log1.end.api.flush();
const patch2 = log2.end.api.flush();
expect(patch1.toBinary()).not.toEqual(patch2.toBinary());
expect(patch1.getId()?.sid).toBe(patch2.getId()?.sid);
expect(patch1.getId()?.time).toBe(patch2.getId()?.time);
expect(patch1.span()).toEqual(patch2.span());
const [patch3] = log1.rebaseBatch([patch2]);
expect(patch1.toBinary()).not.toEqual(patch3.toBinary());
expect(patch1.getId()?.sid).toBe(patch3.getId()?.sid);
expect(patch1.getId()!.time + patch1.span()).toBe(patch3.getId()?.time);
log1.end.applyPatch(patch3);
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 1, b: 2});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 2});
expect(() => assertLogsEqual(log1, log2)).toThrow();
log2.reset(log1.clone());
assertLogsEqual(log1, log2);
});

test('can rebase a concurrent batch onto another log (multiple patches)', () => {
const {log1, log2} = setupTwoLogs();
log1.end.api.obj([]).set({a: 1});
log2.end.api.obj([]).set({b: 2});
log1.end.api.flush();
const patch2 = log2.end.api.flush();
log1.end.api.obj([]).set({a: 2});
log2.end.api.obj([]).set({b: 3});
log1.end.api.flush();
const patch4 = log2.end.api.flush();
log2.end.api.obj([]).set({b: 3});
const patch5 = log2.end.api.flush();
const batch2 = [patch2, patch4, patch5];
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
const batch3 = log1.rebaseBatch(batch2);
expect(batch3[0].getId()!.time).toBe(log1.end.clock.time);
log1.end.applyBatch(batch3);
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
expect(() => assertLogsEqual(log1, log2)).toThrow();
log2.reset(log1.clone());
assertLogsEqual(log1, log2);
});

test('can specify rebase sid', () => {
const {log1, log2} = setupTwoLogs();
expect(log1.end.clock.sid).toBe(log2.end.clock.sid);
log1.end.api.obj([]).set({a: 1});
log2.end.api.obj([]).set({b: 2});
log1.end.api.flush();
const patch2 = log2.end.api.flush();
log1.end.setSid(12345);
log1.end.api.obj([]).set({a: 2});
log2.end.api.obj([]).set({b: 3});
log1.end.api.flush();
const patch4 = log2.end.api.flush();
log2.end.api.obj([]).set({b: 3});
const patch5 = log2.end.api.flush();
const batch2 = [patch2, patch4, patch5];
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
const batch3 = log1.rebaseBatch(batch2, log2.end.clock.sid);
expect(batch3[0].getId()!.time).not.toBe(log1.end.clock.time);
log1.end.applyBatch(batch3);
expect(log1.end.view()).toEqual({foo: 'baz', x: 1, y: 2, a: 2, b: 3});
expect(log2.end.view()).toEqual({foo: 'baz', x: 1, y: 2, b: 3});
expect(() => assertLogsEqual(log1, log2)).toThrow();
log2.reset(log1.clone());
assertLogsEqual(log1, log2);
});
});

describe('.undo()', () => {
describe('RGA', () => {
describe('str', () => {