diff --git a/package.json b/package.json index e1e77637..c04fc067 100644 --- a/package.json +++ b/package.json @@ -56,6 +56,7 @@ "demo:e2e:json-crdt-server:http1": "ts-node src/__demos__/json-crdt-server/main-http1.ts", "demo:e2e:json-crdt-server:uws": "ts-node src/__demos__/json-crdt-server/main-uws.ts", "demo:ui:json": "webpack serve --config ./src/__demos__/ui-json/webpack.config.js", + "demo:ui:text": "webpack serve --config ./src/__demos__/ui-text/webpack.config.js", "start:json-crdt-server:http1": "NODE_ENV=production PORT=80 JSON_CRDT_STORE=level pm2 start lib/__demos__/json-crdt-server/main-http1.js", "start": "NODE_ENV=production PORT=80 JSON_CRDT_STORE=level pm2 start lib/__demos__/json-crdt-server/main-http1.js --exp-backoff-restart-delay=100", "coverage": "yarn test --collectCoverage", diff --git a/src/__demos__/ui-json/main.tsx b/src/__demos__/ui-json/main.tsx index 370448bc..26488a7a 100644 --- a/src/__demos__/ui-json/main.tsx +++ b/src/__demos__/ui-json/main.tsx @@ -9,7 +9,7 @@ import {Model, Patch} from 'json-joy/lib/json-crdt'; const repo = new JsonCrdtRepo({ wsUrl: 'wss://demo-iasd8921ondk0.jsonjoy.com/rpc', }); -const id = 'block-sync-ui-demo-id'; +const id = 'block-sync-ui-demo-json'; const session = repo.make(id); const model = session.model; diff --git a/src/__demos__/ui-text/main.tsx b/src/__demos__/ui-text/main.tsx new file mode 100644 index 00000000..ec88240e --- /dev/null +++ b/src/__demos__/ui-text/main.tsx @@ -0,0 +1,59 @@ +import * as React from 'react'; +import * as ReactDOM from 'react-dom/client'; +import {JsonCrdtRepo} from '../../json-crdt-repo/JsonCrdtRepo'; +import {bind} from 'collaborative-input'; +import {Model, Patch} from 'json-joy/lib/json-crdt'; + +/* tslint:disable no-console */ + +const main = async () => { + const repo = new JsonCrdtRepo({ + wsUrl: 'wss://demo-iasd8921ondk0.jsonjoy.com/rpc', + }); + const id = 'block-sync-ui-demo-text-3'; + const session = await repo.sessions.load({id: [id], make: {}, remote: {timeout: 1000}}); + const model = session.model; + const view = model.view(); + if (typeof view !== 'string') model.api.root(''); + + const Demo: React.FC = () => { + const [remote, setRemote] = React.useState(null); + const ref = React.useRef(null); + React.useLayoutEffect(() => { + if (!ref.current) return; + const unbind = bind(() => model.api.str([]), ref.current); + return () => unbind(); + }, []); + + return ( +
+ +
+ +
+ {!!remote && ( + +
{remote.toString()}
+
+ )} +
+ ); + }; + + const div = document.createElement('div'); + document.body.appendChild(div); + const root = ReactDOM.createRoot(div); + root.render(); +}; + +main().catch(() => {}); diff --git a/src/__demos__/ui-text/webpack.config.js b/src/__demos__/ui-text/webpack.config.js new file mode 100644 index 00000000..202a1346 --- /dev/null +++ b/src/__demos__/ui-text/webpack.config.js @@ -0,0 +1,35 @@ +const path = require('path'); +const HtmlWebpackPlugin = require('html-webpack-plugin'); + +module.exports = { + mode: 'development', + devtool: 'inline-source-map', + entry: { + bundle: __dirname + '/main', + }, + plugins: [ + new HtmlWebpackPlugin({ + title: 'Development', + }), + ], + module: { + rules: [ + { + test: /\.tsx?$/, + exclude: /node_modules/, + loader: 'ts-loader', + }, + ], + }, + resolve: { + extensions: ['.tsx', '.ts', '.js'], + }, + output: { + filename: '[name].js', + path: path.resolve(__dirname, '../../dist'), + }, + devServer: { + port: 9949, + hot: false, + }, +}; diff --git a/src/json-crdt-repo/local/level/LevelLocalRepo.ts b/src/json-crdt-repo/local/level/LevelLocalRepo.ts index f590136c..609c817e 100644 --- a/src/json-crdt-repo/local/level/LevelLocalRepo.ts +++ b/src/json-crdt-repo/local/level/LevelLocalRepo.ts @@ -715,6 +715,7 @@ export class LevelLocalRepo implements LocalRepo { } catch (error) { if (error instanceof Error && error.message === 'EXISTS') // TODO: make sure reset does not happen, if models are the same. + // TODO: Check for `req.time` in `_syncRead`. return await this._syncRead(id); throw error; } @@ -732,6 +733,16 @@ export class LevelLocalRepo implements LocalRepo { private async _syncMerge(req: LocalRepoSyncRequest): Promise { const {id, patches} = req; + let lastKnownTime: number = 0; + const reqTime = req.time; + if (typeof reqTime === 'number') { + lastKnownTime = reqTime; + const firstPatch = patches?.[0]; + if (firstPatch?.getId()?.sid === SESSION.GLOBAL) lastKnownTime = firstPatch.getId()!.time + firstPatch.span() - 1; + } else if (patches?.length) { + const firstPatchTime = patches?.[0]?.getId()?.time; + if (typeof firstPatchTime === 'number') lastKnownTime = firstPatchTime - 1; + } const keyBase = await this.blockKeyBase(id); if (!patches || !patches.length) throw new Error('EMPTY_BATCH'); const writtenPatches: Uint8Array[] = []; @@ -740,10 +751,12 @@ export class LevelLocalRepo implements LocalRepo { // TODO: Return correct response. // TODO: Check that remote state is in sync, too. let needsReset = false; + let cursorBehind = false; const didPush = await this.lockBlock(keyBase, async () => { const [tip, meta] = await Promise.all([this.readFrontierTip(keyBase), this.readMeta(keyBase)]); - if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) needsReset = true; + if (meta.seq > -1 && (typeof req.cursor !== 'number' || req.cursor < meta.seq)) cursorBehind = true; let nextTick = meta.time + 1; + if (lastKnownTime < meta.time) needsReset = true; cursor = meta.seq; if (tip) { const tipTime = tip.getId()?.time ?? 0; @@ -775,18 +788,18 @@ export class LevelLocalRepo implements LocalRepo { const op: BinStrLevelOperation = {type: 'put', key: patchKey, value: uint8}; ops.push(op); } + if (writtenPatches.length) { + this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session}); + } if (ops.length) { await this.kv.batch(ops); return true; } return false; }); - if (writtenPatches.length) { - this.pubsub.pub({type: 'rebase', id, patches: writtenPatches, session: req.session}); - } if (!didPush && !needsReset) { const merge = await this.readFrontier0(keyBase); - return {cursor, merge}; + return {cursor, merge, cursorBehind}; } const remote = this.markDirtyAndSync(keyBase, id) .then(() => {}) @@ -796,9 +809,9 @@ export class LevelLocalRepo implements LocalRepo { }); if (needsReset) { const {cursor, model} = await this._syncRead0(keyBase); - return {cursor, model, remote}; + return {cursor, model, remote, cursorBehind}; } - return {cursor, remote}; + return {cursor, remote, cursorBehind}; } protected async readLocal0(keyBase: string): Promise<[model: Model, cursor: LevelLocalRepoCursor]> { diff --git a/src/json-crdt-repo/local/types.ts b/src/json-crdt-repo/local/types.ts index 45992f50..86327d92 100644 --- a/src/json-crdt-repo/local/types.ts +++ b/src/json-crdt-repo/local/types.ts @@ -87,6 +87,12 @@ export interface LocalRepoSyncRequest { */ id: BlockId; + /** + * Logical clock time of the local operations which the client has caught up + * to. + */ + time?: number; + /** * The last known cursor returned in the `.sync()` call response. The cursor * should be omitted in the first `.sync()` call, and then set to the value @@ -115,6 +121,13 @@ export interface LocalRepoSyncResponse { */ cursor: undefined | unknown; + /** + * Set to true if the client is behind the remote. When true, the client + * should call `.getIf()` after a short wait period to check if the remote + * is indeed ahead. + */ + cursorBehind?: boolean; + /** * Model snapshot that the client should reset its "start" state to. The * `Model` is sent when the *sync* call detects that the client is behind the diff --git a/src/json-crdt-repo/session/EditSession.ts b/src/json-crdt-repo/session/EditSession.ts index 5d9fbece..eb4b161c 100644 --- a/src/json-crdt-repo/session/EditSession.ts +++ b/src/json-crdt-repo/session/EditSession.ts @@ -87,7 +87,14 @@ export class EditSession> { const length = patches.length; // TODO: After async call check that sync state is still valid. New patches, might have been added. if (length || this.cursor === undefined) { - const res = await this.repo.sync({id: this.id, patches, cursor: this.cursor, session: this.session}); + const time = this.start.clock.time - 1; + const res = await this.repo.sync({ + id: this.id, + patches, + time, + cursor: this.cursor, + session: this.session, + }); if (this._stopped) return null; // TODO: After sync call succeeds, remove the patches from the log. if (length) { @@ -96,16 +103,33 @@ export class EditSession> { if (lastId) this.log.advanceTo(lastId); this.start.applyBatch(patches); } - if (typeof res.cursor !== undefined) this.cursor = res.cursor; + // "cursor" shall not be returned from .sync() call. The cursor shall update + // only when remote model changes are received, during local .sync() write + // only the local model is updated. + if (typeof res.cursor !== undefined) { + this.cursor = res.cursor; + } if (res.model) { this._syncRace(() => { this.reset(res.model!); }); - } else if (res.merge) { + } else if (res.merge && res.merge) { this._syncRace(() => { this.merge(res.merge!); }); } + if (res.cursorBehind) { + setTimeout(async () => { + if (this._stopped) return; + const get = await this.repo.getIf({ + id: this.id, + cursor: this.cursor, + }); + if (this._stopped) return; + if (!get) return; + this.reset(get.model); + }, 50); + } return {remote: res.remote}; } else { const res = await this.repo.getIf({id: this.id, time: this.model.clock.time - 1, cursor: this.cursor}); @@ -125,9 +149,13 @@ export class EditSession> { public syncLog(): void { if (!this.log.patches.size()) return; this._syncRace(() => { - this.sync().then((error) => { - this.onsyncerror?.(error); - }); + this.sync() + .then((error) => { + this.onsyncerror?.(error); + }) + .catch((error) => { + this.onsyncerror?.(error); + }); }); } @@ -209,6 +237,12 @@ export class EditSession> { private onEvent = (event: LocalRepoEvent): void => { if (this._stopped) return; + if ((event as LocalRepoMergeEvent).merge) { + const cursor = (event as LocalRepoMergeEvent).cursor; + if (cursor !== undefined) { + this.cursor = cursor; + } + } if ((event as LocalRepoRebaseEvent).rebase) { if ((event as LocalRepoRebaseEvent).session === this.session) return; } diff --git a/src/json-crdt-repo/session/EditSessionFactory.ts b/src/json-crdt-repo/session/EditSessionFactory.ts index 6a411aef..11af3d96 100644 --- a/src/json-crdt-repo/session/EditSessionFactory.ts +++ b/src/json-crdt-repo/session/EditSessionFactory.ts @@ -66,9 +66,14 @@ export class EditSessionFactory { session.log.end.api.autoFlush(); return session; } catch (error) { - if (error instanceof Error && error.message === 'TIMEOUT') { + if (!!error && typeof error === 'object' && (error as Record).message === 'TIMEOUT') { if (!opts.make) throw error; - } else if (error instanceof Error && error.message === 'NOT_FOUND') { + } else if ( + !!error && + typeof error === 'object' && + ((error as Record).message === 'NOT_FOUND' || + (error as Record).code === 'NOT_FOUND') + ) { if (remote.throwIf === 'missing') throw error; } else throw error; } diff --git a/src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts b/src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts index 974a9c6c..efad9e73 100644 --- a/src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts +++ b/src/json-crdt-repo/session/__tests__/EditSession.sync.spec.ts @@ -118,6 +118,38 @@ describe('sync', () => { await kit.stop(); }); + test('sessions created just after the first one, converges in state', async () => { + const kit = await setup(); + const schema = s.obj({a: s.con('a')}); + const {session: session1} = kit.sessions.make({id: kit.blockId, schema, session: 1}); + const {session: session2} = kit.sessions.make({id: kit.blockId, schema, session: 2}); + await session1.sync(); + await session2.sync(); + expect(session1.log.patches.size()).toBe(0); + session1.model.api.obj([]).set({b: 'b'}); + session1.model.api.obj([]).set({c: 'c'}); + await tick(5); + session1.model.api.obj([]).set({d: 'd'}); + const {session: session3} = kit.sessions.make({id: kit.blockId, pull: true, schema, session: 3}); + await tick(5); + session1.model.api.obj([]).set({e: 'e'}); + await tick(5); + session1.model.api.obj([]).set({f: 'f'}); + await session1.sync(); + await until(async () => { + try { + expect(session1.model.view()).toEqual(session3.model.view()); + return true; + } catch { + return false; + } + }); + await session1.dispose(); + await session2.dispose(); + await session3.dispose(); + await kit.stop(); + }); + test('sessions converge to the same view', async () => { const kit = await setup(); const schema = s.obj({a: s.con('a')}); @@ -141,7 +173,21 @@ describe('sync', () => { await until(async () => { try { expect(session1.model.view()).toEqual(session2.model.view()); + return true; + } catch { + return false; + } + }); + await until(async () => { + try { expect(session1.model.view()).toEqual(session3.model.view()); + return true; + } catch { + return false; + } + }); + await until(async () => { + try { expect(session1.model.view()).toEqual(session4.model.view()); return true; } catch { @@ -155,6 +201,7 @@ describe('sync', () => { await session1.dispose(); await session2.dispose(); await session3.dispose(); + await session4.dispose(); await kit.stop(); }); });