Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
884b560
feat(json-crdt): 🎸 improve LocalHistoryCrud implementation
streamich Apr 6, 2024
339826f
chore(json-crdt): 🤖 change SessionHistory dependencies
streamich Apr 7, 2024
0dd6756
refactor(json-crdt): 💡 move repo code to its own subfolder
streamich Apr 7, 2024
839ea96
Merge branch 'next' into session-history
streamich Apr 10, 2024
0d3c8c9
feat: 🎸 improve create schema
streamich Apr 10, 2024
166f209
feat: 🎸 initialize service by default
streamich Apr 10, 2024
d7aa273
feat: 🎸 improve RPC types
streamich Apr 10, 2024
05af834
chore(json-crdt): 🤖 start RemoteHistoryServer implementation
streamich Apr 10, 2024
1b17c68
feat(json-crdt): 🎸 progress on remote history implementation
streamich Apr 10, 2024
ac22950
chore: 🤖 add todos for demo server APIS
streamich Apr 10, 2024
89d4851
test(json-crdt): 💍 add remote history smoke test
streamich Apr 10, 2024
5be2b24
refactor(reactive-rpc): 💡 update block RPC command names
streamich Apr 10, 2024
be282c4
feat(reactive-rpc): 🎸 improve block store interface, return infor abo…
streamich Apr 11, 2024
e62b5ef
feat(reactive-rpc): 🎸 improve scan method
streamich Apr 11, 2024
a43f396
feat(json-crdt): 🎸 allow to construct a model from collection of patches
streamich Apr 13, 2024
ecd3a68
refactor(reactive-rpc): 💡 update blocks scan API
streamich Apr 13, 2024
11d958c
feat(reactive-rpc): 🎸 GC old blocks based on update time
streamich Apr 13, 2024
ca4d16b
feat(json-type): 🎸 add ability to extend ObjectType
streamich Apr 13, 2024
1a56146
refactor(reactive-rpc): 💡 cleanup demo server
streamich Apr 14, 2024
ae14b6c
feat(reactive-rpc): 🎸 allow to optionally load full history in "block…
streamich Apr 14, 2024
2dcdad5
feat(reactive-rpc): 🎸 improve emitted event shape
streamich Apr 14, 2024
e1343ac
style: 💄 run Prettier
streamich Apr 14, 2024
f86ed4d
feat(reactive-rpc): 🎸 cleanup RemoteHistoryDemoServer
streamich Apr 14, 2024
7d6a033
fix(json-crdt): 🐛 use right payload
streamich Apr 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import {CborEncoder} from '@jsonjoy.com/json-pack/lib/cbor/CborEncoder';
import {CborDecoder} from '@jsonjoy.com/json-pack/lib/cbor/CborDecoder';
import {LogEncoder} from '../log/codec/LogEncoder';
import {LogDecoder} from '../log/codec/LogDecoder';
import {LogEncoder} from '../json-crdt/log/codec/LogEncoder';
import {LogDecoder} from '../json-crdt/log/codec/LogDecoder';
import type {CrudApi} from 'memfs/lib/crud/types';
import type {Locks} from 'thingies/es2020/Locks';
import type {Patch} from '../../json-crdt-patch';
import type {Log} from '../log/Log';
import type {Patch} from '../json-crdt-patch';
import type {Log} from '../json-crdt/log/Log';
import type {LocalHistory} from './types';

export const genId = (octets: number = 8): string => {
Expand All @@ -30,9 +30,8 @@ export class LocalHistoryCrud implements LocalHistory {
protected readonly locks: Locks,
) {}

public async create(collection: string[], log: Log): Promise<{id: string}> {
public async create(collection: string[], log: Log, id: string = genId()): Promise<{id: string}> {
const blob = this.encode(log);
const id = genId();
await this.lock(collection, id, async () => {
await this.crud.put([...collection, id], STATE_FILE_NAME, blob, {throwIf: 'exists'});
});
Expand All @@ -54,12 +53,17 @@ export class LocalHistoryCrud implements LocalHistory {
const {frontier} = this.decoder.decode(blob, {format: 'seq.cbor', frontier: true});
return {
log: frontier!,
cursor: '',
cursor: '1',
};
}

public readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> {
throw new Error('Method not implemented.');
public async readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}> {
const blob = await this.crud.get([...collection, id], STATE_FILE_NAME);
const {history} = this.decoder.decode(blob, {format: 'seq.cbor', history: true});
return {
log: history!,
cursor: '',
};
}

public async update(collection: string[], id: string, patches: Patch[]): Promise<void> {
Expand Down
78 changes: 78 additions & 0 deletions src/json-crdt-repo/SessionHistory.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import {createRace} from 'thingies/es2020/createRace';
import {FanOutUnsubscribe} from 'thingies/es2020/fanout';
import {InsValOp, Patch} from '../json-crdt-patch';
import {ValNode} from '../json-crdt/nodes';
import {toSchema} from '../json-crdt/schema/toSchema';
import {Log} from '../json-crdt/log/Log';
import {RedoItem, UndoItem, UndoRedoStack} from './UndoRedoStack';
import type {LocalHistory} from './types';

class Undo implements UndoItem {
constructor(public readonly undo: () => Redo) {}
}

class Redo implements RedoItem {
constructor(public readonly redo: () => Undo) {}
}

export class SessionHistory {
constructor(
public readonly collection: string[],
public readonly id: string,
protected readonly local: LocalHistory,
) {}

private readonly __onPatchRace = createRace();

public attachUndoRedo(stack: UndoRedoStack): FanOutUnsubscribe {
// const onBeforePatch = (patch: Patch) => {
// this.__onPatchRace(() => {
// const undo = this.createUndo(patch);
// stack.push(undo);
// });
// };
// const unsubscribe = this.log.end.api.onBeforePatch.listen(onBeforePatch);
// return unsubscribe;
throw new Error('Method not implemented.');
}

public createUndo(patch: Patch): Undo {
const undoTasks: Array<() => void> = [];
const ops = patch.ops;
const length = ops.length;
for (let i = length - 1; i >= 0; i--) {
const op = ops[i];
switch (op.name()) {
case 'ins_val': {
// const insOp = op as InsValOp;
// const valNode = this.log.end.index.get(insOp.obj);
// if (!(valNode instanceof ValNode)) throw new Error('INVALID_NODE');
// const copy = toSchema(valNode.node());
// const valNodeId = valNode.id;
// const task = () => {
// const end = this.log.end;
// const valNode = end.index.get(valNodeId);
// if (!valNode) return;
// end.api.wrap(valNode).asVal().set(copy);
// };
// undoTasks.push(task);
}
}
}
const undo = new Undo(() => {
this.__onPatchRace(() => {
for (const task of undoTasks) task();
});
return new Redo(() => {
const undo = this.__onPatchRace(() => {
// // TODO: This line needs to be changed:
// const redoPatch = Patch.fromBinary(patch.toBinary());
// this.log.end.api.builder.patch = redoPatch;
// return this.createUndo(redoPatch);
});
return undo!;
});
});
return undo;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ import {memfs} from 'memfs';
import {NodeCrud} from 'memfs/lib/node-to-crud';
import {Locks} from 'thingies/es2020/Locks';
import {LocalHistoryCrud} from '../LocalHistoryCrud';
import {Log} from '../../log/Log';
import {Model} from '../../model';
import {Log} from '../../json-crdt/log/Log';
import {Model} from '../../json-crdt/model';

const setup = async () => {
const {fs, vol} = memfs();
Expand Down Expand Up @@ -60,3 +60,37 @@ test('can delete a document', async () => {
expect((err as Error).message).toBe(`Collection /test/${id} does not exist`);
}
});

test('can update document', async () => {
const {local} = await setup();
const model = Model.withLogicalClock();
model.api.root({
foo: 'spam',
});
const log = Log.fromNewModel(model);
const {id} = await local.create(['test'], log);
const {log: log2} = await local.read(['test'], id);
log2.end.api.obj([]).set({
bar: 'eggs',
});
const patch = log2.end.api.flush();
await local.update(['test'], id, [patch]);
const {log: log3} = await local.read(['test'], id);
expect(log3.end.view()).toStrictEqual({
foo: 'spam',
bar: 'eggs',
});
});

test('can delete document', async () => {
const {local} = await setup();
const model = Model.withLogicalClock();
model.api.root({
foo: 'spam',
});
const log = Log.fromNewModel(model);
const {id} = await local.create(['test'], log);
await local.read(['test'], id);
await local.delete(['test'], id);
expect(() => local.read(['test'], id)).rejects.toThrow(`Collection /test/${id} does not exist`);
});
100 changes: 100 additions & 0 deletions src/json-crdt-repo/remote/RemoteHistoryDemoServer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
import type {CallerToMethods, TypedRpcClient} from '../../reactive-rpc/common';
import type {JsonJoyDemoRpcCaller} from '../../server';
import type {RemoteHistory, RemoteModel, RemotePatch} from './types';

type Methods = CallerToMethods<JsonJoyDemoRpcCaller>;

export type Cursor = number;

export interface RemoteServerModel extends RemoteModel {
seq: number;
created: number;
updated: number;
}

export interface RemoteServerPatch extends RemotePatch {
seq: number;
}

export class RemoteHistoryDemoServer implements RemoteHistory<Cursor, RemoteServerModel, RemoteServerPatch> {
constructor(protected readonly client: TypedRpcClient<Methods>) {}

public async create(id: string, patches: RemotePatch[]): Promise<void> {
await this.client.call('block.new', {
id,
patches: patches.map((patch) => ({
blob: patch.blob,
})),
});
}

/**
* Load latest state of the model, and any unmerged "tip" of patches
* it might have.
*/
public async read(id: string): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> {
const {model, patches} = await this.client.call('block.get', {id});
return {
cursor: model.seq,
model,
patches: [],
};
}

public async scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> {
const limit = 100;
const res = await this.client.call('block.scan', {
id,
seq: cursor,
limit: cursor + limit,
});
if (res.patches.length === 0) {
return {
cursor,
patches: [],
};
}
return {
cursor: res.patches[res.patches.length - 1].seq,
patches: res.patches,
};
}

public async scanBwd(
id: string,
cursor: Cursor,
): Promise<{cursor: Cursor; model: RemoteServerModel; patches: RemoteServerPatch[]}> {
throw new Error('The "blocks.history" should be able to return starting model.');
}

public async update(
id: string,
cursor: Cursor,
patches: RemotePatch[],
): Promise<{cursor: Cursor; patches: RemoteServerPatch[]}> {
const res = await this.client.call('block.upd', {
id,
patches: patches.map((patch, seq) => ({
seq,
created: Date.now(),
blob: patch.blob,
})),
});
return {
cursor: res.patches.length ? res.patches[res.patches.length - 1].seq : cursor,
patches: res.patches,
};
}

public async delete(id: string): Promise<void> {
await this.client.call('block.del', {id});
}

/**
* Subscribe to the latest changes to the model.
* @param callback
*/
public listen(id: string, cursor: Cursor, callback: (changes: RemoteServerPatch[]) => void): void {
throw new Error('Method not implemented.');
}
}
36 changes: 36 additions & 0 deletions src/json-crdt-repo/remote/__tests__/RemoteHistoryServer.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import {Model} from '../../../json-crdt/model';
import {buildE2eClient} from '../../../reactive-rpc/common/testing/buildE2eClient';
import {createCaller} from '../../../server/routes/index';
import {RemoteHistoryDemoServer} from '../RemoteHistoryDemoServer';

const setup = () => {
const {caller, router} = createCaller();
const {client} = buildE2eClient(caller);
const remote = new RemoteHistoryDemoServer(client);

return {
router,
caller,
client,
remote,
};
};

let cnt = 0;
const genId = () => Math.random().toString(36).slice(2) + '-' + Date.now().toString(36) + '-' + cnt++;

describe('.create()', () => {
test('can create a block with a simple patch', async () => {
const {remote, caller} = await setup();
const model = Model.withLogicalClock();
model.api.root({foo: 'bar'});
const patch = model.api.flush();
const blob = patch.toBinary();
const id = genId();
await remote.create(id, [{blob}]);
const {data} = await caller.call('block.get', {id}, {});
// console.log(data.patches);
const model2 = Model.fromBinary(data.model.blob);
expect(model2.view()).toEqual({foo: 'bar'});
});
});
38 changes: 38 additions & 0 deletions src/json-crdt-repo/remote/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/**
* A history of patches that have been applied to a model, stored on the
* "remote": (1) server; (2) content addressable storage; or (3) somewhere in a
* peer-to-peer network.
*/
export interface RemoteHistory<Cursor, M extends RemoteModel = RemoteModel, P extends RemotePatch = RemotePatch> {
create(id: string, patches: RemotePatch[]): Promise<void>;

/**
* Load latest state of the model, and any unmerged "tip" of patches
* it might have.
*
* @todo Maybe `state` and `tip` should be serialized to JSON?
*/
read(id: string): Promise<{cursor: Cursor; model: M; patches: P[]}>;

scanFwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; patches: P[]}>;

scanBwd(id: string, cursor: Cursor): Promise<{cursor: Cursor; model: M; patches: P[]}>;

update(id: string, cursor: Cursor, patches: RemotePatch[]): Promise<{cursor: Cursor; patches: P[]}>;

delete?(id: string): Promise<void>;

/**
* Subscribe to the latest changes to the model.
* @param callback
*/
listen(id: string, cursor: Cursor, callback: (patches: P[]) => void): void;
}

export interface RemoteModel {
blob: Uint8Array;
}

export interface RemotePatch {
blob: Uint8Array;
}
18 changes: 18 additions & 0 deletions src/json-crdt-repo/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import type {Patch} from '../json-crdt-patch';
import type {Log} from '../json-crdt/log/Log';
import type {Model} from '../json-crdt/model';

export interface LocalHistory {
create(collection: string[], log: Log): Promise<{id: string}>;
read(collection: string[], id: string): Promise<{log: Log; cursor: string}>;
readHistory(collection: string[], id: string, cursor: string): Promise<{log: Log; cursor: string}>;
update(collection: string[], id: string, patches: Patch[]): Promise<void>;
delete(collection: string[], id: string): Promise<void>;
}

export interface EditingSessionHistory {
load(id: string): Promise<Model>;
loadHistory(id: string): Promise<Log>;
undo(id: string): Promise<void>;
redo(id: string): Promise<void>;
}
Loading