Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
"demo:e2e:json-crdt-server:http1": "ts-node src/__demos__/json-crdt-server/main-http1.ts",
"demo:e2e:json-crdt-server:uws": "ts-node src/__demos__/json-crdt-server/main-uws.ts",
"demo:ui:json": "webpack serve --config ./src/__demos__/ui-json/webpack.config.js",
"demo:ui:text": "webpack serve --config ./src/__demos__/ui-text/webpack.config.js",
"start:json-crdt-server:http1": "NODE_ENV=production PORT=80 JSON_CRDT_STORE=level pm2 start lib/__demos__/json-crdt-server/main-http1.js",
"start": "NODE_ENV=production PORT=80 JSON_CRDT_STORE=level pm2 start lib/__demos__/json-crdt-server/main-http1.js --exp-backoff-restart-delay=100",
"coverage": "yarn test --collectCoverage",
Expand Down
2 changes: 1 addition & 1 deletion src/__demos__/ui-json/main.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import {Model, Patch} from 'json-joy/lib/json-crdt';
const repo = new JsonCrdtRepo({
wsUrl: 'wss://demo-iasd8921ondk0.jsonjoy.com/rpc',
});
const id = 'block-sync-ui-demo-id';
const id = 'block-sync-ui-demo-json';
const session = repo.make(id);

const model = session.model;
Expand Down
59 changes: 59 additions & 0 deletions src/__demos__/ui-text/main.tsx
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import * as React from 'react';
import * as ReactDOM from 'react-dom/client';
import {JsonCrdtRepo} from '../../json-crdt-repo/JsonCrdtRepo';
import {bind} from 'collaborative-input';
import {Model, Patch} from 'json-joy/lib/json-crdt';

/* tslint:disable no-console */

const main = async () => {
const repo = new JsonCrdtRepo({
wsUrl: 'wss://demo-iasd8921ondk0.jsonjoy.com/rpc',
});
const id = 'block-sync-ui-demo-text-3';
const session = await repo.sessions.load({id: [id], make: {}, remote: {timeout: 1000}});
const model = session.model;
const view = model.view();
if (typeof view !== 'string') model.api.root('');

const Demo: React.FC = () => {
const [remote, setRemote] = React.useState<Model | null>(null);
const ref = React.useRef<HTMLTextAreaElement | null>(null);
React.useLayoutEffect(() => {
if (!ref.current) return;
const unbind = bind(() => model.api.str([]), ref.current);
return () => unbind();
}, []);

return (
<div style={{padding: 32}}>
<textarea ref={ref} style={{width: 600, height: 300}}></textarea>
<hr />
<button
onClick={async () => {
const {block} = await repo.remote.read(id);
const model = Model.fromBinary(block.snapshot.blob);
for (const batch of block.tip)
for (const patch of batch.patches) model.applyPatch(Patch.fromBinary(patch.blob));
setRemote(model);
}}
>
Load remote state
</button>
<br />
{!!remote && (
<code style={{fontSize: 8}}>
<pre>{remote.toString()}</pre>
</code>
)}
</div>
);
};

const div = document.createElement('div');
document.body.appendChild(div);
const root = ReactDOM.createRoot(div);
root.render(<Demo />);
};

main().catch(() => {});
35 changes: 35 additions & 0 deletions src/__demos__/ui-text/webpack.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
const path = require('path');
const HtmlWebpackPlugin = require('html-webpack-plugin');

module.exports = {
mode: 'development',
devtool: 'inline-source-map',
entry: {
bundle: __dirname + '/main',
},
plugins: [
new HtmlWebpackPlugin({
title: 'Development',
}),
],
module: {
rules: [
{
test: /\.tsx?$/,
exclude: /node_modules/,
loader: 'ts-loader',
},
],
},
resolve: {
extensions: ['.tsx', '.ts', '.js'],
},
output: {
filename: '[name].js',
path: path.resolve(__dirname, '../../dist'),
},
devServer: {
port: 9949,
hot: false,
},
};
27 changes: 20 additions & 7 deletions src/json-crdt-repo/local/level/LevelLocalRepo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ export class LevelLocalRepo implements LocalRepo {
} catch (error) {
if (error instanceof Error && error.message === 'EXISTS')
// TODO: make sure reset does not happen, if models are the same.
// TODO: Check for `req.time` in `_syncRead`.
return await this._syncRead(id);
throw error;
}
Expand All @@ -732,6 +733,16 @@ export class LevelLocalRepo implements LocalRepo {

private async _syncMerge(req: LocalRepoSyncRequest): Promise<LocalRepoSyncResponse> {
const {id, patches} = req;
let lastKnownTime: number = 0;
const reqTime = req.time;
if (typeof reqTime === 'number') {
lastKnownTime = reqTime;
const firstPatch = patches?.[0];
if (firstPatch?.getId()?.sid === SESSION.GLOBAL) lastKnownTime = firstPatch.getId()!.time + firstPatch.span() - 1;
} else if (patches?.length) {
const firstPatchTime = patches?.[0]?.getId()?.time;
if (typeof firstPatchTime === 'number') lastKnownTime = firstPatchTime - 1;
}
const keyBase = await this.blockKeyBase(id);
if (!patches || !patches.length) throw new Error('EMPTY_BATCH');
const writtenPatches: Uint8Array[] = [];
Expand All @@ -740,10 +751,12 @@ export class LevelLocalRepo implements LocalRepo {
// TODO: Return correct response.
// TODO: Check that remote state is in sync, too.
let needsReset = false;
let cursorBehind = false;
const didPush = await this.lockBlock(keyBase, async () => {
const [tip, meta] = await Promise.all([this.readFrontierTip(keyBase), this.readMeta(keyBase)]);
if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) needsReset = true;
if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) cursorBehind = true;
let nextTick = meta.time + 1;
if (lastKnownTime < meta.time) needsReset = true;
cursor = meta.seq;
if (tip) {
const tipTime = tip.getId()?.time ?? 0;
Expand Down Expand Up @@ -775,18 +788,18 @@ export class LevelLocalRepo implements LocalRepo {
const op: BinStrLevelOperation = {type: 'put', key: patchKey, value: uint8};
ops.push(op);
}
if (writtenPatches.length) {
this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session});
}
if (ops.length) {
await this.kv.batch(ops);
return true;
}
return false;
});
if (writtenPatches.length) {
this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session});
}
if (!didPush && !needsReset) {
const merge = await this.readFrontier0(keyBase);
return {cursor, merge};
return {cursor, merge, cursorBehind};
}
const remote = this.markDirtyAndSync(keyBase, id)
.then(() => {})
Expand All @@ -796,9 +809,9 @@ export class LevelLocalRepo implements LocalRepo {
});
if (needsReset) {
const {cursor, model} = await this._syncRead0(keyBase);
return {cursor, model, remote};
return {cursor, model, remote, cursorBehind};
}
return {cursor, remote};
return {cursor, remote, cursorBehind};
}

protected async readLocal0(keyBase: string): Promise<[model: Model, cursor: LevelLocalRepoCursor]> {
Expand Down
13 changes: 13 additions & 0 deletions src/json-crdt-repo/local/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ export interface LocalRepoSyncRequest {
*/
id: BlockId;

/**
* Logical clock time of the local operations which the client has caught up
* to.
*/
time?: number;

/**
* The last known cursor returned in the `.sync()` call response. The cursor
* should be omitted in the first `.sync()` call, and then set to the value
Expand Down Expand Up @@ -115,6 +121,13 @@ export interface LocalRepoSyncResponse {
*/
cursor: undefined | unknown;

/**
* Set to true if the client is behind the remote. When true, the client
* should call `.getIf()` after a short wait period to check if the remote
* is indeed ahead.
*/
cursorBehind?: boolean;

/**
* Model snapshot that the client should reset its "start" state to. The
* `Model` is sent when the *sync* call detects that the client is behind the
Expand Down
46 changes: 40 additions & 6 deletions src/json-crdt-repo/session/EditSession.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,14 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
const length = patches.length;
// TODO: After async call check that sync state is still valid. New patches, might have been added.
if (length || this.cursor === undefined) {
const res = await this.repo.sync({id: this.id, patches, cursor: this.cursor, session: this.session});
const time = this.start.clock.time - 1;
const res = await this.repo.sync({
id: this.id,
patches,
time,
cursor: this.cursor,
session: this.session,
});
if (this._stopped) return null;
// TODO: After sync call succeeds, remove the patches from the log.
if (length) {
Expand All @@ -96,16 +103,33 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
if (lastId) this.log.advanceTo(lastId);
this.start.applyBatch(patches);
}
if (typeof res.cursor !== undefined) this.cursor = res.cursor;
// "cursor" shall not be returned from .sync() call. The cursor shall update
// only when remote model changes are received, during local .sync() write
// only the local model is updated.
if (typeof res.cursor !== undefined) {
this.cursor = res.cursor;
}
if (res.model) {
this._syncRace(() => {
this.reset(<any>res.model!);
});
} else if (res.merge) {
} else if (res.merge && res.merge) {
this._syncRace(() => {
this.merge(<any>res.merge!);
});
}
if (res.cursorBehind) {
setTimeout(async () => {
if (this._stopped) return;
const get = await this.repo.getIf({
id: this.id,
cursor: this.cursor,
});
if (this._stopped) return;
if (!get) return;
this.reset(<any>get.model);
}, 50);
}
return {remote: res.remote};
} else {
const res = await this.repo.getIf({id: this.id, time: this.model.clock.time - 1, cursor: this.cursor});
Expand All @@ -125,9 +149,13 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {
public syncLog(): void {
if (!this.log.patches.size()) return;
this._syncRace(() => {
this.sync().then((error) => {
this.onsyncerror?.(error);
});
this.sync()
.then((error) => {
this.onsyncerror?.(error);
})
.catch((error) => {
this.onsyncerror?.(error);
});
});
}

Expand Down Expand Up @@ -209,6 +237,12 @@ export class EditSession<N extends JsonNode = JsonNode<any>> {

private onEvent = (event: LocalRepoEvent): void => {
if (this._stopped) return;
if ((event as LocalRepoMergeEvent).merge) {
const cursor = (event as LocalRepoMergeEvent).cursor;
if (cursor !== undefined) {
this.cursor = cursor;
}
}
if ((event as LocalRepoRebaseEvent).rebase) {
if ((event as LocalRepoRebaseEvent).session === this.session) return;
}
Expand Down
9 changes: 7 additions & 2 deletions src/json-crdt-repo/session/EditSessionFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,14 @@ export class EditSessionFactory {
session.log.end.api.autoFlush();
return session;
} catch (error) {
if (error instanceof Error && error.message === 'TIMEOUT') {
if (!!error && typeof error === 'object' && (error as Record<string, unknown>).message === 'TIMEOUT') {
if (!opts.make) throw error;
} else if (error instanceof Error && error.message === 'NOT_FOUND') {
} else if (
!!error &&
typeof error === 'object' &&
((error as Record<string, unknown>).message === 'NOT_FOUND' ||
(error as Record<string, unknown>).code === 'NOT_FOUND')
) {
if (remote.throwIf === 'missing') throw error;
} else throw error;
}
Expand Down
47 changes: 47 additions & 0 deletions src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,38 @@ describe('sync', () => {
await kit.stop();
});

test('sessions created just after the first one, converges in state', async () => {
const kit = await setup();
const schema = s.obj({a: s.con('a')});
const {session: session1} = kit.sessions.make({id: kit.blockId, schema, session: 1});
const {session: session2} = kit.sessions.make({id: kit.blockId, schema, session: 2});
await session1.sync();
await session2.sync();
expect(session1.log.patches.size()).toBe(0);
session1.model.api.obj([]).set({b: 'b'});
session1.model.api.obj([]).set({c: 'c'});
await tick(5);
session1.model.api.obj([]).set({d: 'd'});
const {session: session3} = kit.sessions.make({id: kit.blockId, pull: true, schema, session: 3});
await tick(5);
session1.model.api.obj([]).set({e: 'e'});
await tick(5);
session1.model.api.obj([]).set({f: 'f'});
await session1.sync();
await until(async () => {
try {
expect(session1.model.view()).toEqual(session3.model.view());
return true;
} catch {
return false;
}
});
await session1.dispose();
await session2.dispose();
await session3.dispose();
await kit.stop();
});

test('sessions converge to the same view', async () => {
const kit = await setup();
const schema = s.obj({a: s.con('a')});
Expand All @@ -141,7 +173,21 @@ describe('sync', () => {
await until(async () => {
try {
expect(session1.model.view()).toEqual(session2.model.view());
return true;
} catch {
return false;
}
});
await until(async () => {
try {
expect(session1.model.view()).toEqual(session3.model.view());
return true;
} catch {
return false;
}
});
await until(async () => {
try {
expect(session1.model.view()).toEqual(session4.model.view());
return true;
} catch {
Expand All @@ -155,6 +201,7 @@ describe('sync', () => {
await session1.dispose();
await session2.dispose();
await session3.dispose();
await session4.dispose();
await kit.stop();
});
});