From 6aaf83af7d96892baf2eead3493c2e36834ab796 Mon Sep 17 00:00:00 2001 From: liuyi Date: Thu, 21 Mar 2024 16:33:12 +0800 Subject: [PATCH] fix(server): wrong doc response branch tests (#6250) --- .../backend/server/src/core/doc/manager.ts | 4 +- .../common/infra/src/initialization/index.ts | 2 +- packages/common/infra/src/page/record-list.ts | 6 +- packages/common/infra/src/storage/kv.ts | 22 ++- packages/common/infra/src/storage/memento.ts | 18 +++ .../infra/src/workspace/engine/doc/README.md | 127 ++++++++++++++++++ .../engine/doc/__tests__/sync.spec.ts | 36 +---- .../infra/src/workspace/engine/doc/event.ts | 24 ++-- .../infra/src/workspace/engine/doc/index.ts | 84 ++++++------ .../infra/src/workspace/engine/doc/local.ts | 13 +- .../infra/src/workspace/engine/doc/remote.ts | 39 +++--- .../infra/src/workspace/engine/doc/server.ts | 4 +- .../infra/src/workspace/engine/doc/storage.ts | 66 +++++++-- .../infra/src/workspace/engine/index.ts | 17 ++- packages/common/infra/src/workspace/index.ts | 18 +-- .../common/infra/src/workspace/manager.ts | 2 +- .../common/infra/src/workspace/testing.ts | 11 +- .../new-workspace-setting-detail/export.tsx | 2 +- .../new-workspace-setting-detail/index.tsx | 26 ++++ .../new-workspace-setting-detail/profile.tsx | 8 +- .../hooks/affine/use-doc-engine-status.tsx | 3 +- .../src/modules/infra-web/storage/index.ts | 10 ++ .../src/pages/share/share-detail-page.tsx | 15 +-- .../workspace/detail-page/detail-page.tsx | 3 + .../core/src/pages/workspace/index.tsx | 6 +- packages/frontend/core/src/testing.ts | 2 +- packages/frontend/electron/package.json | 1 + packages/frontend/electron/renderer/index.tsx | 1 + .../electron/renderer/polyfill/dispose.ts | 2 + packages/frontend/i18n/src/resources/en.json | 4 +- packages/frontend/web/package.json | 1 + packages/frontend/web/src/index.tsx | 1 + packages/frontend/web/src/polyfill/dispose.ts | 2 + .../frontend/workspace-impl/src/cloud/doc.ts | 6 +- .../src/cloud/workspace-factory.ts | 6 +- .../workspace-impl/src/local-state.ts | 8 ++ .../src/local/doc-broadcast-channel.ts | 10 +- .../workspace-impl/src/local/doc-indexeddb.ts | 50 ++++++- .../workspace-impl/src/local/doc-sqlite.ts | 44 +++++- .../src/local/workspace-factory.ts | 16 +-- yarn.lock | 9 ++ 41 files changed, 524 insertions(+), 205 deletions(-) create mode 100644 packages/common/infra/src/workspace/engine/doc/README.md create mode 100644 packages/frontend/electron/renderer/polyfill/dispose.ts create mode 100644 packages/frontend/web/src/polyfill/dispose.ts diff --git a/packages/backend/server/src/core/doc/manager.ts b/packages/backend/server/src/core/doc/manager.ts index ae7f48e75b22d..cce9c9cdb9d52 100644 --- a/packages/backend/server/src/core/doc/manager.ts +++ b/packages/backend/server/src/core/doc/manager.ts @@ -393,7 +393,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { if (result) { if ('doc' in result) { return result; - } else if ('snapshot' in result) { + } else { const doc = await this.recoverDoc(result.binary); return { @@ -420,7 +420,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy { binary: Buffer.from(encodeStateAsUpdate(result.doc)), timestamp: result.timestamp, }; - } else if ('snapshot' in result) { + } else { return result; } } diff --git a/packages/common/infra/src/initialization/index.ts b/packages/common/infra/src/initialization/index.ts index f114d411885b4..ce8748ecc0331 100644 --- a/packages/common/infra/src/initialization/index.ts +++ b/packages/common/infra/src/initialization/index.ts @@ -105,7 +105,7 @@ export async function buildShowcaseWorkspace( const { workspace, release } = workspaceManager.open(meta); - await workspace.engine.doc.waitForReady(meta.id); + await workspace.engine.waitForRootDocReady(); const pageRecordList = workspace.services.get(PageRecordList); diff --git a/packages/common/infra/src/page/record-list.ts b/packages/common/infra/src/page/record-list.ts index fa436dfb85329..417f6ef1b08ba 100644 --- a/packages/common/infra/src/page/record-list.ts +++ b/packages/common/infra/src/page/record-list.ts @@ -31,9 +31,9 @@ export class PageRecordList { [] ); - public readonly isReady = this.workspace.engine.doc - .docState(this.workspace.id) - .map(state => !state.syncing); + public readonly isReady = this.workspace.engine.rootDocState.map( + state => !state.syncing + ); public record(id: string) { return this.records.map(record => record.find(record => record.id === id)); diff --git a/packages/common/infra/src/storage/kv.ts b/packages/common/infra/src/storage/kv.ts index 388a44a4b8d7b..09d87654da395 100644 --- a/packages/common/infra/src/storage/kv.ts +++ b/packages/common/infra/src/storage/kv.ts @@ -7,7 +7,9 @@ export interface ByteKV extends ByteKVBehavior { export interface ByteKVBehavior { get(key: string): Promise | Uint8Array | null; set(key: string, value: Uint8Array): Promise | void; + del(key: string): Promise | void; keys(): Promise | string[]; + clear(): Promise | void; } export class MemoryByteKV implements ByteKV { @@ -17,7 +19,7 @@ export class MemoryByteKV implements ByteKV { async transaction(cb: (transaction: ByteKVBehavior) => Promise) { using _lock = await this.lock.acquire(); - return cb({ + return await cb({ get: async key => { return this.db.get(key) ?? null; }, @@ -27,6 +29,12 @@ export class MemoryByteKV implements ByteKV { keys: async () => { return Array.from(this.db.keys()); }, + del: async key => { + this.db.delete(key); + }, + clear: async () => { + this.db.clear(); + }, }); } get(key: string) { @@ -38,6 +46,12 @@ export class MemoryByteKV implements ByteKV { keys() { return this.transaction(async tx => tx.keys()); } + clear() { + return this.transaction(async tx => tx.clear()); + } + del(key: string) { + return this.transaction(async tx => tx.del(key)); + } } export class ReadonlyByteKV extends MemoryByteKV implements ByteKV { @@ -49,4 +63,10 @@ export class ReadonlyByteKV extends MemoryByteKV implements ByteKV { override set(_key: string, _value: Uint8Array): Promise { return Promise.resolve(); } + override del(key: string): Promise { + return super.del(key); + } + override clear(): Promise { + return super.clear(); + } } diff --git a/packages/common/infra/src/storage/memento.ts b/packages/common/infra/src/storage/memento.ts index 525c5ad5ef078..0dc6b3aabb666 100644 --- a/packages/common/infra/src/storage/memento.ts +++ b/packages/common/infra/src/storage/memento.ts @@ -10,6 +10,8 @@ export interface Memento { get(key: string): T | null; watch(key: string): Observable; set(key: string, value: T | null): void; + del(key: string): void; + clear(): void; keys(): string[]; } @@ -58,6 +60,12 @@ export class MemoryMemento implements Memento { keys(): string[] { return Array.from(this.data.keys()); } + clear(): void { + this.data.clear(); + } + del(key: string): void { + this.data.delete(key); + } } export function wrapMemento(memento: Memento, prefix: string): Memento { @@ -77,5 +85,15 @@ export function wrapMemento(memento: Memento, prefix: string): Memento { .filter(k => k.startsWith(prefix)) .map(k => k.slice(prefix.length)); }, + clear() { + memento.keys().forEach(k => { + if (k.startsWith(prefix)) { + memento.del(k); + } + }); + }, + del(key: string): void { + memento.del(prefix + key); + }, }; } diff --git a/packages/common/infra/src/workspace/engine/doc/README.md b/packages/common/infra/src/workspace/engine/doc/README.md new file mode 100644 index 0000000000000..dcd49f3948657 --- /dev/null +++ b/packages/common/infra/src/workspace/engine/doc/README.md @@ -0,0 +1,127 @@ +# DocEngine + +The synchronization algorithm for yjs docs. + +``` + ┌─────────┐ ┌───────────┐ ┌────────┐ + │ Storage ◄──┤ DocEngine ├──► Server │ + └─────────┘ └───────────┘ └────────┘ +``` + +# Core Components + +## DocStorage + +```ts +export interface DocStorage { + eventBus: DocEventBus; + doc: ByteKV; + syncMetadata: ByteKV; + serverClock: ByteKV; +} +``` + +Represents the local storage used, Specific implementations are replaceable, such as `IndexedDBDocStorage` on the `browser` and `SqliteDocStorage` on the `desktop`. + +### DocEventBus + +Each `DocStorage` contains a `DocEventBus`, which is used to communicate with other engines that share the same storage. + +With `DocEventBus` we can sync updates between engines without connecting to the server. + +For example, on the `browser`, we have multiple tabs, all tabs share the same `IndexedDBDocStorage`, so we use `BroadcastChannel` to implement `DocEventBus`, which allows us to broadcast events to all tabs. + +On the `desktop` app, if we have multiple Windows sharing the same `SqliteDocStorage`, we must build a mechanism to broadcast events between all Windows (currently not implemented). + +## DocServer + +```ts +export interface DocServer { + pullDoc( + docId: string, + stateVector: Uint8Array + ): Promise<{ + data: Uint8Array; + serverClock: number; + stateVector?: Uint8Array; + } | null>; + + pushDoc(docId: string, data: Uint8Array): Promise<{ serverClock: number }>; + + subscribeAllDocs(cb: (updates: { docId: string; data: Uint8Array; serverClock: number }) => void): Promise<() => void>; + + loadServerClock(after: number): Promise>; + + waitForConnectingServer(signal: AbortSignal): Promise; + disconnectServer(): void; + onInterrupted(cb: (reason: string) => void): void; +} +``` + +Represents the server we want to synchronize, there is a simulated implementation in `tests/sync.spec.ts`, and the real implementation is in `packages/backend/server`. + +### ServerClock + +`ServerClock` is a clock generated after each updates is stored in the Server. It is used to determine the order in which updates are stored in the Server. + +The `DocEngine` decides whether to pull updates from the server based on the `ServerClock`. + +The `ServerClock` written later must be **greater** than all previously. So on the client side, we can use `loadServerClock(the largest ServerClock previously received)` to obtain all changed `ServerClock`. + +## DocEngine + +The `DocEngine` is where all the synchronization logic actually happens. + +Due to the complexity of the implementation, we divide it into 2 parts. + +## DocEngine - LocalPart + +Synchronizing **the `YDoc` instance** and **storage**. + +The typical workflow is: + +1. load data from storage, apply to `YDoc` instance. +2. track `YDoc` changes +3. write the changes back to storage. + +### SeqNum + +There is a `SeqNum` on each Doc data in `Storage`. Every time `LocalPart` writes data, `SeqNum` will be +1. + +There is also a `PushedSeqNum`, which is used for RemotePart later. + +## DocEngine - RemotePart + +Synchronizing `Storage` and `Server`. + +The typical workflow is: + +1. Connect with the server, Load `ServerClocks` for all docs, Start subscribing to server-side updates. + +2. Check whether each doc requires `push` and `pull` + +3. Execute all push and pull + +4. Listen for updates from `LocalPart` and push the updates to the server + +5. Listen for server-side updates and write them to storage. + +### PushedSeqNum + +Each Doc will record a `PushedSeqNum`, used to determine whether the doc has unpush updates. + +After each `push` is completed, `PushedSeqNum` + 1 + +If `PushedSeqNum` and `SeqNum` are still different after we complete the push (usually means the previous `push` failed) + +Then do a full pull and push and set `pushedSeqNum` = `SeqNum` + +### PulledServerClock + +Each Doc also record `PulledServerClock`, Used to compare with ServerClock to determine whether to `pull` doc. + +When the `pull` is completed, set `PulledServerClock` = `ServerClock` returned by the server. + +### Retry + +The `RemotePart` may fail at any time, and `RemotePart`'s built-in retry mechanism will restart the process in 5 seconds after failure. diff --git a/packages/common/infra/src/workspace/engine/doc/__tests__/sync.spec.ts b/packages/common/infra/src/workspace/engine/doc/__tests__/sync.spec.ts index ee09eae5fcfdc..45bcf27cc196b 100644 --- a/packages/common/infra/src/workspace/engine/doc/__tests__/sync.spec.ts +++ b/packages/common/infra/src/workspace/engine/doc/__tests__/sync.spec.ts @@ -5,8 +5,7 @@ import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs'; import { AsyncLock } from '../../../../utils'; import { DocEngine } from '..'; -import { MemoryEventBus } from '../event'; -import type { Server } from '../server'; +import type { DocServer } from '../server'; import { MemoryStorage } from '../storage'; import { isEmptyUpdate } from '../utils'; @@ -27,7 +26,7 @@ class MiniServer { } } -class MiniServerClient implements Server { +class MiniServerClient implements DocServer { constructor( private readonly id: string, private readonly server: MiniServer @@ -107,7 +106,7 @@ class MiniServerClient implements Server { }; } - async waitingForConnectServer(): Promise {} + async waitForConnectingServer(): Promise {} disconnectServer(): void {} onInterrupted(_cb: (reason: string) => void): void {} } @@ -116,11 +115,7 @@ describe('sync', () => { test('basic sync', async () => { const storage = new MemoryStorage(); const server = new MiniServer(); - const engine = new DocEngine( - storage, - new MemoryEventBus(), - server.client() - ).start(); + const engine = new DocEngine(storage, server.client()).start(); const doc = new YDoc({ guid: 'a' }); engine.addDoc(doc); const map = doc.getMap('aaa'); @@ -136,7 +131,6 @@ describe('sync', () => { { const engine = new DocEngine( new MemoryStorage(), - new MemoryEventBus(), server.client() ).start(); const doc = new YDoc({ guid: 'a' }); @@ -149,7 +143,6 @@ describe('sync', () => { { const engine = new DocEngine( new MemoryStorage(), - new MemoryEventBus(), server.client() ).start(); const doc = new YDoc({ guid: 'a' }); @@ -165,7 +158,6 @@ describe('sync', () => { (async () => { const engine = new DocEngine( new MemoryStorage(), - new MemoryEventBus(), server.client() ).start(); const doc = new YDoc({ guid: 'a' }); @@ -179,7 +171,6 @@ describe('sync', () => { (async () => { const engine = new DocEngine( new MemoryStorage(), - new MemoryEventBus(), server.client() ).start(); const doc = new YDoc({ guid: 'a' }); @@ -196,15 +187,10 @@ describe('sync', () => { test('2 client share storage and eventBus (simulate different tabs in same browser)', async () => { const server = new MiniServer(); const storage = new MemoryStorage(); - const eventBus = new MemoryEventBus(); await Promise.all([ (async () => { - const engine = new DocEngine( - storage, - eventBus, - server.client() - ).start(); + const engine = new DocEngine(storage, server.client()).start(); const doc = new YDoc({ guid: 'a' }); engine.addDoc(doc); @@ -213,11 +199,7 @@ describe('sync', () => { await vitest.waitUntil(() => map.get('b') === 2); })(), (async () => { - const engine = new DocEngine( - storage, - eventBus, - server.client() - ).start(); + const engine = new DocEngine(storage, server.client()).start(); const doc = new YDoc({ guid: 'a' }); engine.addDoc(doc); const map = doc.getMap('aaa'); @@ -240,11 +222,7 @@ describe('sync', () => { await storage.doc.set('a', encodeStateAsUpdate(doc)); } - const engine = new DocEngine( - storage, - new MemoryEventBus(), - server.client() - ).start(); + const engine = new DocEngine(storage, server.client()).start(); const doc = new YDoc({ guid: 'a' }); engine.addDoc(doc); diff --git a/packages/common/infra/src/workspace/engine/doc/event.ts b/packages/common/infra/src/workspace/engine/doc/event.ts index 17d51ed884811..4dc5bac4383a9 100644 --- a/packages/common/infra/src/workspace/engine/doc/event.ts +++ b/packages/common/infra/src/workspace/engine/doc/event.ts @@ -1,4 +1,4 @@ -export type Event = +export type DocEvent = | { type: 'ClientUpdateCommitted'; clientId: string; @@ -18,14 +18,14 @@ export type Event = update: Uint8Array; }; -export interface EventBus { - emit(event: Event): void; - on(cb: (event: Event) => void): () => void; +export interface DocEventBus { + emit(event: DocEvent): void; + on(cb: (event: DocEvent) => void): () => void; } -export class MemoryEventBus implements EventBus { - listeners = new Set<(event: Event) => void>(); - emit(event: Event): void { +export class MemoryDocEventBus implements DocEventBus { + listeners = new Set<(event: DocEvent) => void>(); + emit(event: DocEvent): void { for (const listener of this.listeners) { try { listener(event); @@ -34,7 +34,7 @@ export class MemoryEventBus implements EventBus { } } } - on(cb: (event: Event) => void): () => void { + on(cb: (event: DocEvent) => void): () => void { this.listeners.add(cb); return () => { this.listeners.delete(cb); @@ -42,14 +42,14 @@ export class MemoryEventBus implements EventBus { } } -export class EventBusInner implements EventBus { - constructor(private readonly eventBusBehavior: EventBus) {} +export class DocEventBusInner implements DocEventBus { + constructor(private readonly eventBusBehavior: DocEventBus) {} - emit(event: Event) { + emit(event: DocEvent) { this.eventBusBehavior.emit(event); } - on(cb: (event: Event) => void) { + on(cb: (event: DocEvent) => void) { return this.eventBusBehavior.on(cb); } } diff --git a/packages/common/infra/src/workspace/engine/doc/index.ts b/packages/common/infra/src/workspace/engine/doc/index.ts index 821a6aeea63ac..09316f70868d4 100644 --- a/packages/common/infra/src/workspace/engine/doc/index.ts +++ b/packages/common/infra/src/workspace/engine/doc/index.ts @@ -6,40 +6,31 @@ import type { Doc as YDoc } from 'yjs'; import { createIdentifier } from '../../../di'; import { LiveData } from '../../../livedata'; import { MANUALLY_STOP } from '../../../utils'; -import { type EventBus, EventBusInner } from './event'; import { DocEngineLocalPart } from './local'; import { DocEngineRemotePart } from './remote'; -import type { Server } from './server'; -import { type Storage, StorageInner } from './storage'; +import type { DocServer } from './server'; +import { type DocStorage, DocStorageInner } from './storage'; const logger = new DebugLogger('doc-engine'); -export type { - Event as DocEngineEvent, - EventBus as DocEngineEventBus, -} from './event'; -export { MemoryEventBus as DocEngineMemoryEventBus } from './event'; -export type { Server as DocEngineServer } from './server'; -export type { Storage as DocEngineStorage } from './storage'; +export type { DocEvent, DocEventBus } from './event'; +export { MemoryDocEventBus } from './event'; +export type { DocServer } from './server'; +export type { DocStorage } from './storage'; export { - MemoryStorage as DocEngineMemoryStorage, - ReadonlyStorage as DocEngineReadonlyStorage, + MemoryStorage as MemoryDocStorage, + ReadonlyStorage as ReadonlyDocStorage, } from './storage'; -export const DocEngineEventBusImpl = - createIdentifier('DocEngineEventBus'); +export const DocServerImpl = createIdentifier('DocServer'); -export const DocEngineServerImpl = createIdentifier('DocEngineServer'); - -export const DocEngineStorageImpl = - createIdentifier('DocEngineStorage'); +export const DocStorageImpl = createIdentifier('DocStorage'); export class DocEngine { localPart: DocEngineLocalPart; remotePart: DocEngineRemotePart | null; - storage: StorageInner; - eventBus: EventBusInner; + storage: DocStorageInner; engineState = LiveData.computed(get => { const localState = get(this.localPart.engineState); @@ -77,31 +68,15 @@ export class DocEngine { } constructor( - storage: Storage, - eventBus: EventBus, - private readonly server?: Server | null, - doc?: YDoc + storage: DocStorage, + private readonly server?: DocServer | null ) { const clientId = nanoid(); - this.storage = new StorageInner(storage); - this.eventBus = new EventBusInner(eventBus); - this.localPart = new DocEngineLocalPart( - clientId, - this.storage, - this.eventBus - ); + this.storage = new DocStorageInner(storage); + this.localPart = new DocEngineLocalPart(clientId, this.storage); this.remotePart = this.server - ? new DocEngineRemotePart( - clientId, - this.storage, - this.server, - this.eventBus - ) + ? new DocEngineRemotePart(clientId, this.storage, this.server) : null; - - if (doc) { - this.addDoc(doc); - } } abort = new AbortController(); @@ -125,6 +100,12 @@ export class DocEngine { this.abort.abort(MANUALLY_STOP); } + async resetSyncStatus() { + this.stop(); + await this.storage.clearSyncMetadata(); + await this.storage.clearServerClock(); + } + addDoc(doc: YDoc, withSubDocs = true) { this.localPart.actions.addDoc(doc); this.remotePart?.actions.addDoc(doc.guid); @@ -147,6 +128,10 @@ export class DocEngine { this.remotePart?.setPriority(docId, priority); } + /** + * ## Saved: + * YDoc changes have been saved to storage, and the browser can be safely closed without losing data. + */ waitForSaved() { return new Promise(resolve => { this.engineState @@ -159,6 +144,10 @@ export class DocEngine { }); } + /** + * ## Synced: + * is fully synchronized with the server + */ waitForSynced() { return new Promise(resolve => { this.engineState @@ -171,6 +160,19 @@ export class DocEngine { }); } + /** + * ## Ready: + * + * means that the doc has been loaded and the data can be modified. + * (is not force, you can still modify it if you know you are creating some new data) + * + * this is a temporary solution to deal with the yjs overwrite issue. + * + * if content is loaded from storage + * or if content is pulled from the server, it will be true, otherwise be false. + * + * For example, when opening a doc that is not in storage, ready = false until the content is pulled from the server. + */ waitForReady(docId: string) { return new Promise(resolve => { this.docState(docId) diff --git a/packages/common/infra/src/workspace/engine/doc/local.ts b/packages/common/infra/src/workspace/engine/doc/local.ts index 68cc7a53f8eca..3e07ee82bcef2 100644 --- a/packages/common/infra/src/workspace/engine/doc/local.ts +++ b/packages/common/infra/src/workspace/engine/doc/local.ts @@ -8,8 +8,8 @@ import { applyUpdate, encodeStateAsUpdate, mergeUpdates } from 'yjs'; import { LiveData } from '../../../livedata'; import { throwIfAborted } from '../../../utils'; import { AsyncPriorityQueue } from './async-priority-queue'; -import type { Event, EventBusInner } from './event'; -import type { StorageInner } from './storage'; +import type { DocEvent } from './event'; +import type { DocStorageInner } from './storage'; import { isEmptyUpdate } from './utils'; type Job = @@ -97,12 +97,11 @@ export class DocEngineLocalPart { constructor( private readonly clientId: string, - private readonly storage: StorageInner, - private readonly eventBus: EventBusInner + private readonly storage: DocStorageInner ) {} async mainLoop(signal?: AbortSignal) { - const dispose = this.eventBus.on(event => { + const dispose = this.storage.eventBus.on(event => { const handler = this.events[event.type]; if (handler) { handler(event as any); @@ -213,7 +212,7 @@ export class DocEngineLocalPart { merged, signal ); - this.eventBus.emit({ + this.storage.eventBus.emit({ type: 'ClientUpdateCommitted', seqNum: newSeqNum, docId: docId, @@ -234,7 +233,7 @@ export class DocEngineLocalPart { }; readonly events: { - [key in Event['type']]?: (event: Event & { type: key }) => void; + [key in DocEvent['type']]?: (event: DocEvent & { type: key }) => void; } = { ServerUpdateCommitted: ({ docId, update, clientId }) => { this.schedule({ diff --git a/packages/common/infra/src/workspace/engine/doc/remote.ts b/packages/common/infra/src/workspace/engine/doc/remote.ts index 6a198e4aaecbc..6869bec411abb 100644 --- a/packages/common/infra/src/workspace/engine/doc/remote.ts +++ b/packages/common/infra/src/workspace/engine/doc/remote.ts @@ -7,9 +7,9 @@ import { LiveData } from '../../../livedata'; import { throwIfAborted } from '../../../utils'; import { AsyncPriorityQueue } from './async-priority-queue'; import { ClockMap } from './clock'; -import type { Event, EventBusInner } from './event'; -import type { Server } from './server'; -import type { StorageInner } from './storage'; +import type { DocEvent } from './event'; +import type { DocServer } from './server'; +import type { DocStorageInner } from './storage'; import { isEmptyUpdate } from './utils'; const logger = new DebugLogger('doc-engine:remote'); @@ -67,9 +67,8 @@ export class DocEngineRemotePart { constructor( private readonly clientId: string, - private readonly storage: StorageInner, - private readonly server: Server, - private readonly eventBus: EventBusInner + private readonly storage: DocStorageInner, + private readonly server: DocServer ) {} private status: Status = { @@ -204,29 +203,23 @@ export class DocEngineRemotePart { stateVector: serverStateVector, serverClock, } = serverData; + await this.storage.saveServerClock( + new Map([[docId, serverClock]]), + signal + ); + this.actions.updateServerClock(docId, serverClock); await this.storage.commitDocAsServerUpdate( docId, newData, serverClock, signal ); - this.eventBus.emit({ + this.storage.eventBus.emit({ type: 'ServerUpdateCommitted', docId, clientId: this.clientId, update: newData, }); - this.eventBus.emit({ - type: 'ServerUpdateCommitted', - docId, - clientId: this.clientId, - update: newData, - }); - await this.storage.saveServerClock( - new Map([[docId, serverClock]]), - signal - ); - this.actions.updateServerClock(docId, serverClock); const diff = data && serverStateVector && serverStateVector.length > 0 ? diffUpdate(data, serverStateVector) @@ -275,7 +268,7 @@ export class DocEngineRemotePart { serverClock, signal ); - this.eventBus.emit({ + this.storage.eventBus.emit({ type: 'ServerUpdateCommitted', docId, clientId: this.clientId, @@ -311,7 +304,7 @@ export class DocEngineRemotePart { serverClock, signal ); - this.eventBus.emit({ + this.storage.eventBus.emit({ type: 'ServerUpdateCommitted', docId, clientId: this.clientId, @@ -338,7 +331,7 @@ export class DocEngineRemotePart { }; readonly events: { - [key in Event['type']]?: (event: Event & { type: key }) => void; + [key in DocEvent['type']]?: (event: DocEvent & { type: key }) => void; } = { ClientUpdateCommitted: ({ clientId, docId, seqNum, update }) => { if (clientId !== this.clientId) { @@ -411,7 +404,7 @@ export class DocEngineRemotePart { try { disposes.push( - this.eventBus.on(event => { + this.storage.eventBus.on(event => { const handler = this.events[event.type]; handler?.(event as any); }) @@ -433,7 +426,7 @@ export class DocEngineRemotePart { abort.abort(reason); }); await Promise.race([ - this.server.waitingForConnectServer(signal), + this.server.waitForConnectingServer(signal), new Promise((_, reject) => { setTimeout(() => { reject(new Error('Connect to server timeout')); diff --git a/packages/common/infra/src/workspace/engine/doc/server.ts b/packages/common/infra/src/workspace/engine/doc/server.ts index ed191a8273bcf..fb73fdf6fd26e 100644 --- a/packages/common/infra/src/workspace/engine/doc/server.ts +++ b/packages/common/infra/src/workspace/engine/doc/server.ts @@ -1,4 +1,4 @@ -export interface Server { +export interface DocServer { pullDoc( docId: string, stateVector: Uint8Array @@ -20,7 +20,7 @@ export interface Server { }) => void ): Promise<() => void>; - waitingForConnectServer(signal: AbortSignal): Promise; + waitForConnectingServer(signal: AbortSignal): Promise; disconnectServer(): void; onInterrupted(cb: (reason: string) => void): void; } diff --git a/packages/common/infra/src/workspace/engine/doc/storage.ts b/packages/common/infra/src/workspace/engine/doc/storage.ts index 6b0299fc79f7a..f87717d8395b1 100644 --- a/packages/common/infra/src/workspace/engine/doc/storage.ts +++ b/packages/common/infra/src/workspace/engine/doc/storage.ts @@ -6,9 +6,12 @@ import { wrapMemento, } from '../../../storage'; import { AsyncLock, mergeUpdates, throwIfAborted } from '../../../utils'; +import type { DocEventBus } from '.'; +import { DocEventBusInner, MemoryDocEventBus } from './event'; import { isEmptyUpdate } from './utils'; -export interface Storage { +export interface DocStorage { + eventBus: DocEventBus; doc: ByteKV; syncMetadata: ByteKV; serverClock: ByteKV; @@ -36,8 +39,9 @@ const Values = { }, }; -export class StorageInner { - constructor(public readonly behavior: Storage) {} +export class DocStorageInner { + public readonly eventBus = new DocEventBusInner(this.behavior.eventBus); + constructor(public readonly behavior: DocStorage) {} async loadServerClock(signal?: AbortSignal): Promise> { throwIfAborted(signal); @@ -212,23 +216,33 @@ export class StorageInner { return await this.saveDocSeqNum(docId, true); } + + clearSyncMetadata() { + return this.behavior.syncMetadata.clear(); + } + + async clearServerClock() { + return this.behavior.serverClock.clear(); + } } -export class ReadonlyStorage implements Storage { +export class ReadonlyStorage implements DocStorage { constructor( private readonly map: { [key: string]: Uint8Array; } ) {} + eventBus = new MemoryDocEventBus(); doc = new ReadonlyByteKV(new Map(Object.entries(this.map))); serverClock = new ReadonlyByteKV(); syncMetadata = new ReadonlyByteKV(); } -export class MemoryStorage implements Storage { +export class MemoryStorage implements DocStorage { constructor(private readonly memo: Memento = new MemoryMemento()) {} + eventBus = new MemoryDocEventBus(); lock = new AsyncLock(); readonly docDb = wrapMemento(this.memo, 'doc:'); readonly syncMetadataDb = wrapMemento(this.memo, 'syncMetadata:'); @@ -237,7 +251,7 @@ export class MemoryStorage implements Storage { readonly doc = { transaction: async cb => { using _lock = await this.lock.acquire(); - return cb({ + return await cb({ get: async key => { return this.docDb.get(key) ?? null; }, @@ -247,6 +261,12 @@ export class MemoryStorage implements Storage { keys: async () => { return Array.from(this.docDb.keys()); }, + clear: () => { + this.docDb.clear(); + }, + del: key => { + this.docDb.del(key); + }, }); }, get(key) { @@ -258,12 +278,18 @@ export class MemoryStorage implements Storage { keys() { return this.transaction(async tx => tx.keys()); }, + clear() { + return this.transaction(async tx => tx.clear()); + }, + del(key) { + return this.transaction(async tx => tx.del(key)); + }, } satisfies ByteKV; readonly syncMetadata = { transaction: async cb => { using _lock = await this.lock.acquire(); - return cb({ + return await cb({ get: async key => { return this.syncMetadataDb.get(key) ?? null; }, @@ -273,6 +299,12 @@ export class MemoryStorage implements Storage { keys: async () => { return Array.from(this.syncMetadataDb.keys()); }, + clear: () => { + this.syncMetadataDb.clear(); + }, + del: key => { + this.syncMetadataDb.del(key); + }, }); }, get(key) { @@ -284,12 +316,18 @@ export class MemoryStorage implements Storage { keys() { return this.transaction(async tx => tx.keys()); }, + clear() { + return this.transaction(async tx => tx.clear()); + }, + del(key) { + return this.transaction(async tx => tx.del(key)); + }, } satisfies ByteKV; readonly serverClock = { transaction: async cb => { using _lock = await this.lock.acquire(); - return cb({ + return await cb({ get: async key => { return this.serverClockDb.get(key) ?? null; }, @@ -299,6 +337,12 @@ export class MemoryStorage implements Storage { keys: async () => { return Array.from(this.serverClockDb.keys()); }, + clear: () => { + this.serverClockDb.clear(); + }, + del: key => { + this.serverClockDb.del(key); + }, }); }, get(key) { @@ -310,5 +354,11 @@ export class MemoryStorage implements Storage { keys() { return this.transaction(async tx => tx.keys()); }, + clear() { + return this.transaction(async tx => tx.clear()); + }, + del(key) { + return this.transaction(async tx => tx.del(key)); + }, } satisfies ByteKV; } diff --git a/packages/common/infra/src/workspace/engine/index.ts b/packages/common/infra/src/workspace/engine/index.ts index 9eb1293c2271f..0d90f23abe4c7 100644 --- a/packages/common/infra/src/workspace/engine/index.ts +++ b/packages/common/infra/src/workspace/engine/index.ts @@ -1,4 +1,5 @@ import { Slot } from '@blocksuite/global/utils'; +import type { Doc as YDoc } from 'yjs'; import { throwIfAborted } from '../../utils/throw-if-aborted'; import type { AwarenessEngine } from './awareness'; @@ -30,7 +31,8 @@ export class WorkspaceEngine { constructor( public blob: BlobEngine, public doc: DocEngine, - public awareness: AwarenessEngine + public awareness: AwarenessEngine, + private readonly yDoc: YDoc ) { this._status = { blob: blob.status, @@ -40,6 +42,7 @@ export class WorkspaceEngine { blob: status, }; }); + this.doc.addDoc(yDoc); } start() { @@ -63,6 +66,18 @@ export class WorkspaceEngine { this.awareness.disconnect(); this.blob.stop(); } + + docEngineState = this.doc.engineState; + + rootDocState = this.doc.docState(this.yDoc.guid); + + waitForSynced() { + return this.doc.waitForSynced(); + } + + waitForRootDocReady() { + return this.doc.waitForReady(this.yDoc.guid); + } } export * from './awareness'; diff --git a/packages/common/infra/src/workspace/index.ts b/packages/common/infra/src/workspace/index.ts index 01c715b782b28..15b02d8663ae5 100644 --- a/packages/common/infra/src/workspace/index.ts +++ b/packages/common/infra/src/workspace/index.ts @@ -24,9 +24,8 @@ import { AwarenessProvider, BlobEngine, DocEngine, - DocEngineEventBusImpl, - DocEngineServerImpl, - DocEngineStorageImpl, + DocServerImpl, + DocStorageImpl, LocalBlobStorage, RemoteBlobStorage, WorkspaceEngine, @@ -64,15 +63,18 @@ export function configureWorkspaceServices(services: ServiceCollection) { WorkspaceUpgradeController, ServiceProvider, ]) - .add(WorkspaceEngine, [BlobEngine, DocEngine, AwarenessEngine]) + .add(WorkspaceEngine, [ + BlobEngine, + DocEngine, + AwarenessEngine, + RootYDocContext, + ]) .add(AwarenessEngine, [[AwarenessProvider]]) .add(BlobEngine, [LocalBlobStorage, [RemoteBlobStorage]]) .addImpl(DocEngine, services => { return new DocEngine( - services.get(DocEngineStorageImpl), - services.get(DocEngineEventBusImpl), - services.getOptional(DocEngineServerImpl), - services.get(RootYDocContext) + services.get(DocStorageImpl), + services.getOptional(DocServerImpl) ); }) .add(WorkspaceUpgradeController, [ diff --git a/packages/common/infra/src/workspace/manager.ts b/packages/common/infra/src/workspace/manager.ts index 1a3d52c6dc185..da3ad835cd170 100644 --- a/packages/common/infra/src/workspace/manager.ts +++ b/packages/common/infra/src/workspace/manager.ts @@ -126,7 +126,7 @@ export class WorkspaceManager { async transformLocalToCloud(local: Workspace): Promise { assertEquals(local.flavour, WorkspaceFlavour.LOCAL); - await local.engine.doc.waitForSynced(); + await local.engine.waitForSynced(); const newId = await this.list.create( WorkspaceFlavour.AFFINE_CLOUD, diff --git a/packages/common/infra/src/workspace/testing.ts b/packages/common/infra/src/workspace/testing.ts index 547399608e564..6f6c664601113 100644 --- a/packages/common/infra/src/workspace/testing.ts +++ b/packages/common/infra/src/workspace/testing.ts @@ -10,12 +10,10 @@ import { WorkspaceMetadataContext } from './context'; import { AwarenessProvider, type BlobStorage, - DocEngineEventBusImpl, - DocEngineMemoryStorage, - DocEngineStorageImpl, + DocStorageImpl, LocalBlobStorage, + MemoryDocStorage, } from './engine'; -import { MemoryEventBus } from './engine/doc/event'; import { MemoryStorage } from './engine/doc/storage'; import type { WorkspaceFactory } from './factory'; import { globalBlockSuiteSchema } from './global-schema'; @@ -30,7 +28,7 @@ export class TestingLocalWorkspaceListProvider implements WorkspaceListProvider { name = WorkspaceFlavour.LOCAL; - docStorage = new DocEngineMemoryStorage(this.state); + docStorage = new MemoryDocStorage(this.state); constructor(private readonly state: Memento) {} @@ -138,8 +136,7 @@ export class TestingLocalWorkspaceFactory implements WorkspaceFactory { WorkspaceMetadataContext, GlobalState, ]) - .addImpl(DocEngineStorageImpl, MemoryStorage, [GlobalState]) - .addImpl(DocEngineEventBusImpl, MemoryEventBus) + .addImpl(DocStorageImpl, MemoryStorage, [GlobalState]) .addImpl(AwarenessProvider, TestingAwarenessProvider); } diff --git a/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/export.tsx b/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/export.tsx index 9a46da955fbdb..86df117517e6d 100644 --- a/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/export.tsx +++ b/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/export.tsx @@ -31,7 +31,7 @@ export const ExportPanel = ({ setSaving(true); try { if (isOnline) { - await workspace.engine.doc.waitForSynced(); + await workspace.engine.waitForSynced(); await workspace.engine.blob.sync(); } diff --git a/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/index.tsx b/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/index.tsx index 2d178dd85e44d..76d7afe3d8ee0 100644 --- a/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/index.tsx +++ b/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/index.tsx @@ -8,6 +8,8 @@ import { useWorkspace } from '@affine/core/hooks/use-workspace'; import { useWorkspaceInfo } from '@affine/core/hooks/use-workspace-info'; import { UNTITLED_WORKSPACE_NAME } from '@affine/env/constant'; import { useAFFiNEI18N } from '@affine/i18n/hooks'; +import { ArrowRightSmallIcon } from '@blocksuite/icons'; +import { useCallback } from 'react'; import { DeleteLeaveWorkspace } from './delete-leave-workspace'; import { EnableCloudPanel } from './enable-cloud'; @@ -29,6 +31,17 @@ export const WorkspaceSettingDetail = (props: WorkspaceSettingDetailProps) => { const workspaceInfo = useWorkspaceInfo(workspaceMetadata); + const handleResetSyncStatus = useCallback(() => { + workspace?.engine.doc + .resetSyncStatus() + .then(() => { + window.location.reload(); + }) + .catch(err => { + console.error(err); + }); + }, [workspace]); + return ( <> { )} + + {t['com.affine.resetSyncStatus.button']()} + + } + desc={t['com.affine.resetSyncStatus.description']()} + style={{ cursor: 'pointer' }} + onClick={handleResetSyncStatus} + data-testid="reset-sync-status" + > + + ); diff --git a/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/profile.tsx b/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/profile.tsx index d17554f10dd23..7d57995ce8c30 100644 --- a/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/profile.tsx +++ b/packages/frontend/core/src/components/affine/setting-modal/workspace-setting/new-workspace-setting-detail/profile.tsx @@ -31,9 +31,7 @@ export const ProfilePanel = ({ isOwner, workspace }: ProfilePanelProps) => { const t = useAFFiNEI18N(); const pushNotification = useSetAtom(pushNotificationAtom); - const workspaceIsLoading = !useLiveData( - workspace?.engine.doc.docState(workspace.id) - )?.ready; + const workspaceIsReady = useLiveData(workspace?.engine.rootDocState)?.ready; const [avatarBlob, setAvatarBlob] = useState(null); const [name, setName] = useState(''); @@ -153,7 +151,7 @@ export const ProfilePanel = ({ isOwner, workspace }: ProfilePanelProps) => { [pushNotification, setWorkspaceAvatar] ); - const canAdjustAvatar = !workspaceIsLoading && avatarUrl && isOwner; + const canAdjustAvatar = workspaceIsReady && avatarUrl && isOwner; return (
@@ -189,7 +187,7 @@ export const ProfilePanel = ({ isOwner, workspace }: ProfilePanelProps) => {
{t['Workspace Name']()}
{ ]) .addImpl(RemoteBlobStorage('static'), StaticBlobStorage) .addImpl( - DocEngineStorageImpl, - new DocEngineReadonlyStorage({ + DocStorageImpl, + new ReadonlyDocStorage({ [workspaceId]: new Uint8Array(workspaceArrayBuffer), [pageId]: new Uint8Array(pageArrayBuffer), }) - ) - .addImpl(DocEngineEventBusImpl, new DocEngineMemoryEventBus()); + ); } ); - workspace.engine.doc + workspace.engine .waitForSynced() .then(() => { const { page } = workspace.services.get(PageManager).open(pageId); diff --git a/packages/frontend/core/src/pages/workspace/detail-page/detail-page.tsx b/packages/frontend/core/src/pages/workspace/detail-page/detail-page.tsx index 127f2e25627ca..b4d675250798d 100644 --- a/packages/frontend/core/src/pages/workspace/detail-page/detail-page.tsx +++ b/packages/frontend/core/src/pages/workspace/detail-page/detail-page.tsx @@ -291,6 +291,9 @@ export const DetailPage = ({ pageId }: { pageId: string }): ReactElement => { // set sync engine priority target useEffect(() => { currentWorkspace.setPriorityLoad(pageId, 10); + return () => { + currentWorkspace.setPriorityLoad(pageId, 5); + }; }, [currentWorkspace, pageId]); const jumpOnce = useLiveData(pageRecord?.meta.map(meta => meta.jumpOnce)); diff --git a/packages/frontend/core/src/pages/workspace/index.tsx b/packages/frontend/core/src/pages/workspace/index.tsx index 541db9e5627c8..b5bb1db848674 100644 --- a/packages/frontend/core/src/pages/workspace/index.tsx +++ b/packages/frontend/core/src/pages/workspace/index.tsx @@ -70,8 +70,8 @@ export const Component = (): ReactElement => { }, [meta, workspaceManager, workspace, currentWorkspaceService]); // avoid doing operation, before workspace is loaded - const isRootDocLoaded = - useLiveData(workspace?.engine.doc.docState(workspace?.id))?.ready ?? false; + const isRootDocReady = + useLiveData(workspace?.engine.rootDocState)?.ready ?? false; // if listLoading is false, we can show 404 page, otherwise we should show loading page. if (listLoading === false && meta === undefined) { @@ -82,7 +82,7 @@ export const Component = (): ReactElement => { return ; } - if (!isRootDocLoaded) { + if (!isRootDocReady) { return ( diff --git a/packages/frontend/core/src/testing.ts b/packages/frontend/core/src/testing.ts index 77232b3c5b8ab..577068a353f47 100644 --- a/packages/frontend/core/src/testing.ts +++ b/packages/frontend/core/src/testing.ts @@ -34,7 +34,7 @@ export async function configureTestingEnvironment() { }) ); - await workspace.engine.doc.waitForSynced(); + await workspace.engine.waitForSynced(); const { page } = workspace.services.get(PageManager).open('page0'); diff --git a/packages/frontend/electron/package.json b/packages/frontend/electron/package.json index 5e2f5d3d04586..db2ba68f3c17f 100644 --- a/packages/frontend/electron/package.json +++ b/packages/frontend/electron/package.json @@ -46,6 +46,7 @@ "@toeverything/infra": "workspace:*", "@types/uuid": "^9.0.8", "builder-util-runtime": "^9.2.4", + "core-js": "^3.36.1", "cross-env": "^7.0.3", "electron": "^29.0.1", "electron-log": "^5.1.1", diff --git a/packages/frontend/electron/renderer/index.tsx b/packages/frontend/electron/renderer/index.tsx index 5a425e0098672..77bcf95f09c4a 100644 --- a/packages/frontend/electron/renderer/index.tsx +++ b/packages/frontend/electron/renderer/index.tsx @@ -1,3 +1,4 @@ +import './polyfill/dispose'; // Side effect import, "declare global" import '@affine/env/constant'; diff --git a/packages/frontend/electron/renderer/polyfill/dispose.ts b/packages/frontend/electron/renderer/polyfill/dispose.ts new file mode 100644 index 0000000000000..615ed233c788b --- /dev/null +++ b/packages/frontend/electron/renderer/polyfill/dispose.ts @@ -0,0 +1,2 @@ +import 'core-js/modules/esnext.symbol.async-dispose'; +import 'core-js/modules/esnext.symbol.dispose'; diff --git a/packages/frontend/i18n/src/resources/en.json b/packages/frontend/i18n/src/resources/en.json index 3590122d8d9d2..397549947d7c3 100644 --- a/packages/frontend/i18n/src/resources/en.json +++ b/packages/frontend/i18n/src/resources/en.json @@ -1165,5 +1165,7 @@ "com.affine.delete-tags.count_other": "{{count}} tags deleted", "com.affine.workbench.split-view-menu.keep-this-one": "Solo View", "com.affine.workbench.split-view.page-menu-open": "Open in split view", - "com.affine.search-tags.placeholder": "Type here ..." + "com.affine.search-tags.placeholder": "Type here ...", + "com.affine.resetSyncStatus.button": "Reset Sync", + "com.affine.resetSyncStatus.description": "This operation may fix some synchronization issues." } diff --git a/packages/frontend/web/package.json b/packages/frontend/web/package.json index afa91f82eb2a0..719213d6a1986 100644 --- a/packages/frontend/web/package.json +++ b/packages/frontend/web/package.json @@ -14,6 +14,7 @@ "@affine/core": "workspace:*", "@affine/env": "workspace:*", "@juggle/resize-observer": "^3.4.0", + "core-js": "^3.36.1", "intl-segmenter-polyfill-rs": "^0.1.7", "react": "^18.2.0", "react-dom": "^18.2.0" diff --git a/packages/frontend/web/src/index.tsx b/packages/frontend/web/src/index.tsx index a2b6ed2296c35..7833e9228dc89 100644 --- a/packages/frontend/web/src/index.tsx +++ b/packages/frontend/web/src/index.tsx @@ -1,3 +1,4 @@ +import './polyfill/dispose'; import './polyfill/intl-segmenter'; import './polyfill/request-idle-callback'; import './polyfill/resize-observer'; diff --git a/packages/frontend/web/src/polyfill/dispose.ts b/packages/frontend/web/src/polyfill/dispose.ts new file mode 100644 index 0000000000000..615ed233c788b --- /dev/null +++ b/packages/frontend/web/src/polyfill/dispose.ts @@ -0,0 +1,2 @@ +import 'core-js/modules/esnext.symbol.async-dispose'; +import 'core-js/modules/esnext.symbol.dispose'; diff --git a/packages/frontend/workspace-impl/src/cloud/doc.ts b/packages/frontend/workspace-impl/src/cloud/doc.ts index 45655833d1c6a..ee0253cada41d 100644 --- a/packages/frontend/workspace-impl/src/cloud/doc.ts +++ b/packages/frontend/workspace-impl/src/cloud/doc.ts @@ -1,5 +1,5 @@ import { DebugLogger } from '@affine/debug'; -import { type DocEngineServer, throwIfAborted } from '@toeverything/infra'; +import { type DocServer, throwIfAborted } from '@toeverything/infra'; import type { Socket } from 'socket.io-client'; import { getIoManager } from '../utils/affine-io'; @@ -9,7 +9,7 @@ import { base64ToUint8Array, uint8ArrayToBase64 } from '../utils/base64'; const logger = new DebugLogger('affine-cloud-doc-engine-server'); -export class AffineCloudDocEngineServer implements DocEngineServer { +export class AffineCloudDocEngineServer implements DocServer { socket = null as unknown as Socket; interruptCb: ((reason: string) => void) | null = null; SEND_TIMEOUT = 30000; @@ -135,7 +135,7 @@ export class AffineCloudDocEngineServer implements DocEngineServer { this.socket.off('server-updates', handleUpdate); }; } - async waitingForConnectServer(signal: AbortSignal): Promise { + async waitForConnectingServer(signal: AbortSignal): Promise { const socket = getIoManager().socket('/'); this.socket = socket; this.socket.on('server-version-rejected', this.handleVersionRejected); diff --git a/packages/frontend/workspace-impl/src/cloud/workspace-factory.ts b/packages/frontend/workspace-impl/src/cloud/workspace-factory.ts index e5a080e36e591..60da818741637 100644 --- a/packages/frontend/workspace-impl/src/cloud/workspace-factory.ts +++ b/packages/frontend/workspace-impl/src/cloud/workspace-factory.ts @@ -3,7 +3,7 @@ import type { WorkspaceFactory } from '@toeverything/infra'; import { AwarenessContext, AwarenessProvider, - DocEngineServerImpl, + DocServerImpl, RemoteBlobStorage, WorkspaceIdContext, WorkspaceScope, @@ -28,9 +28,7 @@ export class CloudWorkspaceFactory implements WorkspaceFactory { .addImpl(RemoteBlobStorage('affine-cloud'), AffineCloudBlobStorage, [ WorkspaceIdContext, ]) - .addImpl(DocEngineServerImpl, AffineCloudDocEngineServer, [ - WorkspaceIdContext, - ]) + .addImpl(DocServerImpl, AffineCloudDocEngineServer, [WorkspaceIdContext]) .addImpl( AwarenessProvider('affine-cloud'), AffineCloudAwarenessProvider, diff --git a/packages/frontend/workspace-impl/src/local-state.ts b/packages/frontend/workspace-impl/src/local-state.ts index e280f7a13b00f..fa4abf94300b2 100644 --- a/packages/frontend/workspace-impl/src/local-state.ts +++ b/packages/frontend/workspace-impl/src/local-state.ts @@ -27,4 +27,12 @@ export class WorkspaceLocalStateImpl implements WorkspaceLocalState { set(key: string, value: T | null): void { return this.wrapped.set(key, value); } + + del(key: string): void { + return this.wrapped.del(key); + } + + clear(): void { + return this.wrapped.clear(); + } } diff --git a/packages/frontend/workspace-impl/src/local/doc-broadcast-channel.ts b/packages/frontend/workspace-impl/src/local/doc-broadcast-channel.ts index 4cf3a0032830d..e04a9d9f3dc90 100644 --- a/packages/frontend/workspace-impl/src/local/doc-broadcast-channel.ts +++ b/packages/frontend/workspace-impl/src/local/doc-broadcast-channel.ts @@ -1,4 +1,4 @@ -import type { DocEngineEvent, DocEngineEventBus } from '@toeverything/infra'; +import type { DocEvent, DocEventBus } from '@toeverything/infra'; type LegacyChannelMessage = { type: 'db-updated'; @@ -9,7 +9,7 @@ type LegacyChannelMessage = { __from_new_doc_engine?: boolean; }; -export class BroadcastChannelDocEngineEventBus implements DocEngineEventBus { +export class BroadcastChannelDocEventBus implements DocEventBus { legacyChannel = new BroadcastChannel('indexeddb:' + this.workspaceId); senderChannel = new BroadcastChannel('doc:' + this.workspaceId); constructor(private readonly workspaceId: string) { @@ -29,7 +29,7 @@ export class BroadcastChannelDocEngineEventBus implements DocEngineEventBus { } ); } - emit(event: DocEngineEvent): void { + emit(event: DocEvent): void { if ( event.type === 'ClientUpdateCommitted' || event.type === 'ServerUpdateCommitted' @@ -46,8 +46,8 @@ export class BroadcastChannelDocEngineEventBus implements DocEngineEventBus { this.senderChannel.postMessage(event); } - on(cb: (event: DocEngineEvent) => void): () => void { - const listener = (event: MessageEvent) => { + on(cb: (event: DocEvent) => void): () => void { + const listener = (event: MessageEvent) => { cb(event.data); }; const channel = new BroadcastChannel('doc:' + this.workspaceId); diff --git a/packages/frontend/workspace-impl/src/local/doc-indexeddb.ts b/packages/frontend/workspace-impl/src/local/doc-indexeddb.ts index a30843f394dff..c774caa21e465 100644 --- a/packages/frontend/workspace-impl/src/local/doc-indexeddb.ts +++ b/packages/frontend/workspace-impl/src/local/doc-indexeddb.ts @@ -1,8 +1,4 @@ -import type { - ByteKV, - ByteKVBehavior, - DocEngineStorage, -} from '@toeverything/infra'; +import type { ByteKV, ByteKVBehavior, DocStorage } from '@toeverything/infra'; import { type DBSchema, type IDBPDatabase, @@ -11,6 +7,8 @@ import { } from 'idb'; import { mergeUpdates } from 'yjs'; +import { BroadcastChannelDocEventBus } from './doc-broadcast-channel'; + function isEmptyUpdate(binary: Uint8Array) { return ( binary.byteLength === 0 || @@ -18,8 +16,9 @@ function isEmptyUpdate(binary: Uint8Array) { ); } -export class IndexedDBDocStorage implements DocEngineStorage { +export class IndexedDBDocStorage implements DocStorage { constructor(private readonly workspaceId: string) {} + eventBus = new BroadcastChannelDocEventBus(this.workspaceId); readonly doc = new Doc(); readonly syncMetadata = new KV(`${this.workspaceId}:sync-metadata`); readonly serverClock = new KV(`${this.workspaceId}:server-clock`); @@ -38,7 +37,7 @@ interface DocDBSchema extends DBSchema { }; } -type DocType = DocEngineStorage['doc']; +type DocType = DocStorage['doc']; class Doc implements DocType { dbName = 'affine-local'; dbPromise: Promise> | null = null; @@ -100,6 +99,14 @@ class Doc implements DocType { return store.getAllKeys(); } + clear(): void | Promise { + return; + } + + del(_key: string): void | Promise { + return; + } + async transaction( cb: (transaction: ByteKVBehavior) => Promise ): Promise { @@ -130,6 +137,12 @@ class Doc implements DocType { updates: rows, }); }, + async clear() { + return await store.clear(); + }, + async del(key) { + return store.delete(key); + }, }); } } @@ -185,6 +198,16 @@ class KV implements ByteKV { const store = db.transaction('kv', 'readwrite').objectStore('kv'); return new KVBehavior(store).keys(); } + async clear() { + const db = await this.getDb(); + const store = db.transaction('kv', 'readwrite').objectStore('kv'); + return new KVBehavior(store).clear(); + } + async del(key: string) { + const db = await this.getDb(); + const store = db.transaction('kv', 'readwrite').objectStore('kv'); + return new KVBehavior(store).del(key); + } } class KVBehavior implements ByteKVBehavior { @@ -207,4 +230,17 @@ class KVBehavior implements ByteKVBehavior { async keys(): Promise { return await this.store.getAllKeys(); } + async del(key: string) { + if (this.store.delete === undefined) { + throw new Error('Cannot set in a readonly transaction'); + } + return await this.store.delete(key); + } + + async clear() { + if (this.store.clear === undefined) { + throw new Error('Cannot set in a readonly transaction'); + } + return await this.store.clear(); + } } diff --git a/packages/frontend/workspace-impl/src/local/doc-sqlite.ts b/packages/frontend/workspace-impl/src/local/doc-sqlite.ts index f09b8280473ff..8ef4507cdc8a9 100644 --- a/packages/frontend/workspace-impl/src/local/doc-sqlite.ts +++ b/packages/frontend/workspace-impl/src/local/doc-sqlite.ts @@ -3,7 +3,8 @@ import { AsyncLock, type ByteKV, type ByteKVBehavior, - type DocEngineStorage, + type DocStorage, + MemoryDocEventBus, } from '@toeverything/infra'; import { type DBSchema, @@ -12,14 +13,15 @@ import { openDB, } from 'idb'; -export class SqliteDocStorage implements DocEngineStorage { +export class SqliteDocStorage implements DocStorage { constructor(private readonly workspaceId: string) {} + eventBus = new MemoryDocEventBus(); readonly doc = new Doc(this.workspaceId); readonly syncMetadata = new KV(`${this.workspaceId}:sync-metadata`); readonly serverClock = new KV(`${this.workspaceId}:server-clock`); } -type DocType = DocEngineStorage['doc']; +type DocType = DocStorage['doc']; class Doc implements DocType { lock = new AsyncLock(); @@ -33,8 +35,7 @@ class Doc implements DocType { cb: (transaction: ByteKVBehavior) => Promise ): Promise { using _lock = await this.lock.acquire(); - const result = await cb(this); - return result; + return await cb(this); } keys(): string[] | Promise { @@ -74,6 +75,14 @@ class Doc implements DocType { this.workspaceId === docId ? undefined : docId ); } + + clear(): void | Promise { + return; + } + + del(): void | Promise { + return; + } } interface KvDBSchema extends DBSchema { @@ -127,12 +136,23 @@ class KV implements ByteKV { const store = db.transaction('kv', 'readwrite').objectStore('kv'); return new KVBehavior(store).keys(); } + async clear() { + const db = await this.getDb(); + const store = db.transaction('kv', 'readwrite').objectStore('kv'); + return new KVBehavior(store).clear(); + } + async del(key: string) { + const db = await this.getDb(); + const store = db.transaction('kv', 'readwrite').objectStore('kv'); + return new KVBehavior(store).del(key); + } } class KVBehavior implements ByteKVBehavior { constructor( private readonly store: IDBPObjectStore ) {} + async get(key: string): Promise { const value = await this.store.get(key); return value?.val ?? null; @@ -149,4 +169,18 @@ class KVBehavior implements ByteKVBehavior { async keys(): Promise { return await this.store.getAllKeys(); } + + async del(key: string) { + if (this.store.delete === undefined) { + throw new Error('Cannot set in a readonly transaction'); + } + return await this.store.delete(key); + } + + async clear() { + if (this.store.clear === undefined) { + throw new Error('Cannot set in a readonly transaction'); + } + return await this.store.clear(); + } } diff --git a/packages/frontend/workspace-impl/src/local/workspace-factory.ts b/packages/frontend/workspace-impl/src/local/workspace-factory.ts index e7e8765ee0ded..f755eb9ae289d 100644 --- a/packages/frontend/workspace-impl/src/local/workspace-factory.ts +++ b/packages/frontend/workspace-impl/src/local/workspace-factory.ts @@ -2,8 +2,7 @@ import type { ServiceCollection, WorkspaceFactory } from '@toeverything/infra'; import { AwarenessContext, AwarenessProvider, - DocEngineEventBusImpl, - DocEngineStorageImpl, + DocStorageImpl, LocalBlobStorage, RemoteBlobStorage, WorkspaceIdContext, @@ -14,7 +13,6 @@ import { BroadcastChannelAwarenessProvider } from './awareness'; import { IndexedDBBlobStorage } from './blob-indexeddb'; import { SQLiteBlobStorage } from './blob-sqlite'; import { StaticBlobStorage } from './blob-static'; -import { BroadcastChannelDocEngineEventBus } from './doc-broadcast-channel'; import { IndexedDBDocStorage } from './doc-indexeddb'; import { SqliteDocStorage } from './doc-sqlite'; @@ -25,20 +23,12 @@ export class LocalWorkspaceFactory implements WorkspaceFactory { services .scope(WorkspaceScope) .addImpl(LocalBlobStorage, SQLiteBlobStorage, [WorkspaceIdContext]) - .addImpl(DocEngineStorageImpl, SqliteDocStorage, [WorkspaceIdContext]) - .addImpl(DocEngineEventBusImpl, BroadcastChannelDocEngineEventBus, [ - WorkspaceIdContext, - ]); + .addImpl(DocStorageImpl, SqliteDocStorage, [WorkspaceIdContext]); } else { services .scope(WorkspaceScope) .addImpl(LocalBlobStorage, IndexedDBBlobStorage, [WorkspaceIdContext]) - .addImpl(DocEngineStorageImpl, IndexedDBDocStorage, [ - WorkspaceIdContext, - ]) - .addImpl(DocEngineEventBusImpl, BroadcastChannelDocEngineEventBus, [ - WorkspaceIdContext, - ]); + .addImpl(DocStorageImpl, IndexedDBDocStorage, [WorkspaceIdContext]); } services diff --git a/yarn.lock b/yarn.lock index 879a1d861bb09..d69854816b5bf 100644 --- a/yarn.lock +++ b/yarn.lock @@ -468,6 +468,7 @@ __metadata: "@types/uuid": "npm:^9.0.8" async-call-rpc: "npm:^6.4.0" builder-util-runtime: "npm:^9.2.4" + core-js: "npm:^3.36.1" cross-env: "npm:^7.0.3" electron: "npm:^29.0.1" electron-log: "npm:^5.1.1" @@ -820,6 +821,7 @@ __metadata: "@juggle/resize-observer": "npm:^3.4.0" "@types/react": "npm:^18.2.60" "@types/react-dom": "npm:^18.2.19" + core-js: "npm:^3.36.1" intl-segmenter-polyfill-rs: "npm:^0.1.7" react: "npm:^18.2.0" react-dom: "npm:^18.2.0" @@ -18377,6 +18379,13 @@ __metadata: languageName: node linkType: hard +"core-js@npm:^3.36.1": + version: 3.36.1 + resolution: "core-js@npm:3.36.1" + checksum: 10/ce1e1bfc1034b6f2ff7c91077319e8abdd650ee606ffe6e80073e64ab9d8aad2d6a6d953461b01f331a6f796ad2fd766a3386b88aa371b45d44fa7c0b9913ce6 + languageName: node + linkType: hard + "core-util-is@npm:~1.0.0": version: 1.0.3 resolution: "core-util-is@npm:1.0.3"