Skip to content

Commit

Permalink
fix(server): wrong doc response branch tests (#6250)
Browse files Browse the repository at this point in the history
  • Loading branch information
forehalo authored and EYHN committed Mar 22, 2024
1 parent 4d6a19c commit 6aaf83a
Show file tree
Hide file tree
Showing 41 changed files with 524 additions and 205 deletions.
4 changes: 2 additions & 2 deletions packages/backend/server/src/core/doc/manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
if (result) {
if ('doc' in result) {
return result;
} else if ('snapshot' in result) {
} else {
const doc = await this.recoverDoc(result.binary);

return {
Expand All @@ -420,7 +420,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
binary: Buffer.from(encodeStateAsUpdate(result.doc)),
timestamp: result.timestamp,
};
} else if ('snapshot' in result) {
} else {
return result;
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/common/infra/src/initialization/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ export async function buildShowcaseWorkspace(

const { workspace, release } = workspaceManager.open(meta);

await workspace.engine.doc.waitForReady(meta.id);
await workspace.engine.waitForRootDocReady();

const pageRecordList = workspace.services.get(PageRecordList);

Expand Down
6 changes: 3 additions & 3 deletions packages/common/infra/src/page/record-list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ export class PageRecordList {
[]
);

public readonly isReady = this.workspace.engine.doc
.docState(this.workspace.id)
.map(state => !state.syncing);
public readonly isReady = this.workspace.engine.rootDocState.map(
state => !state.syncing
);

public record(id: string) {
return this.records.map(record => record.find(record => record.id === id));
Expand Down
22 changes: 21 additions & 1 deletion packages/common/infra/src/storage/kv.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ export interface ByteKV extends ByteKVBehavior {
export interface ByteKVBehavior {
get(key: string): Promise<Uint8Array | null> | Uint8Array | null;
set(key: string, value: Uint8Array): Promise<void> | void;
del(key: string): Promise<void> | void;
keys(): Promise<string[]> | string[];
clear(): Promise<void> | void;
}

export class MemoryByteKV implements ByteKV {
Expand All @@ -17,7 +19,7 @@ export class MemoryByteKV implements ByteKV {

async transaction<T>(cb: (transaction: ByteKVBehavior) => Promise<T>) {
using _lock = await this.lock.acquire();
return cb({
return await cb({
get: async key => {
return this.db.get(key) ?? null;
},
Expand All @@ -27,6 +29,12 @@ export class MemoryByteKV implements ByteKV {
keys: async () => {
return Array.from(this.db.keys());
},
del: async key => {
this.db.delete(key);
},
clear: async () => {
this.db.clear();
},
});
}
get(key: string) {
Expand All @@ -38,6 +46,12 @@ export class MemoryByteKV implements ByteKV {
keys() {
return this.transaction(async tx => tx.keys());
}
clear() {
return this.transaction(async tx => tx.clear());
}
del(key: string) {
return this.transaction(async tx => tx.del(key));
}
}

export class ReadonlyByteKV extends MemoryByteKV implements ByteKV {
Expand All @@ -49,4 +63,10 @@ export class ReadonlyByteKV extends MemoryByteKV implements ByteKV {
override set(_key: string, _value: Uint8Array): Promise<void> {
return Promise.resolve();
}
override del(key: string): Promise<void> {
return super.del(key);
}
override clear(): Promise<void> {
return super.clear();
}
}
18 changes: 18 additions & 0 deletions packages/common/infra/src/storage/memento.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ export interface Memento {
get<T>(key: string): T | null;
watch<T>(key: string): Observable<T | null>;
set<T>(key: string, value: T | null): void;
del(key: string): void;
clear(): void;
keys(): string[];
}

Expand Down Expand Up @@ -58,6 +60,12 @@ export class MemoryMemento implements Memento {
keys(): string[] {
return Array.from(this.data.keys());
}
clear(): void {
this.data.clear();
}
del(key: string): void {
this.data.delete(key);
}
}

export function wrapMemento(memento: Memento, prefix: string): Memento {
Expand All @@ -77,5 +85,15 @@ export function wrapMemento(memento: Memento, prefix: string): Memento {
.filter(k => k.startsWith(prefix))
.map(k => k.slice(prefix.length));
},
clear() {
memento.keys().forEach(k => {
if (k.startsWith(prefix)) {
memento.del(k);
}
});
},
del(key: string): void {
memento.del(prefix + key);
},
};
}
127 changes: 127 additions & 0 deletions packages/common/infra/src/workspace/engine/doc/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
# DocEngine

The synchronization algorithm for yjs docs.

```
┌─────────┐ ┌───────────┐ ┌────────┐
│ Storage ◄──┤ DocEngine ├──► Server │
└─────────┘ └───────────┘ └────────┘
```

# Core Components

## DocStorage

```ts
export interface DocStorage {
eventBus: DocEventBus;
doc: ByteKV;
syncMetadata: ByteKV;
serverClock: ByteKV;
}
```

Represents the local storage used, Specific implementations are replaceable, such as `IndexedDBDocStorage` on the `browser` and `SqliteDocStorage` on the `desktop`.

### DocEventBus

Each `DocStorage` contains a `DocEventBus`, which is used to communicate with other engines that share the same storage.

With `DocEventBus` we can sync updates between engines without connecting to the server.

For example, on the `browser`, we have multiple tabs, all tabs share the same `IndexedDBDocStorage`, so we use `BroadcastChannel` to implement `DocEventBus`, which allows us to broadcast events to all tabs.

On the `desktop` app, if we have multiple Windows sharing the same `SqliteDocStorage`, we must build a mechanism to broadcast events between all Windows (currently not implemented).

## DocServer

```ts
export interface DocServer {
pullDoc(
docId: string,
stateVector: Uint8Array
): Promise<{
data: Uint8Array;
serverClock: number;
stateVector?: Uint8Array;
} | null>;

pushDoc(docId: string, data: Uint8Array): Promise<{ serverClock: number }>;

subscribeAllDocs(cb: (updates: { docId: string; data: Uint8Array; serverClock: number }) => void): Promise<() => void>;

loadServerClock(after: number): Promise<Map<string, number>>;

waitForConnectingServer(signal: AbortSignal): Promise<void>;
disconnectServer(): void;
onInterrupted(cb: (reason: string) => void): void;
}
```

Represents the server we want to synchronize, there is a simulated implementation in `tests/sync.spec.ts`, and the real implementation is in `packages/backend/server`.

### ServerClock

`ServerClock` is a clock generated after each updates is stored in the Server. It is used to determine the order in which updates are stored in the Server.

The `DocEngine` decides whether to pull updates from the server based on the `ServerClock`.

The `ServerClock` written later must be **greater** than all previously. So on the client side, we can use `loadServerClock(the largest ServerClock previously received)` to obtain all changed `ServerClock`.

## DocEngine

The `DocEngine` is where all the synchronization logic actually happens.

Due to the complexity of the implementation, we divide it into 2 parts.

## DocEngine - LocalPart

Synchronizing **the `YDoc` instance** and **storage**.

The typical workflow is:

1. load data from storage, apply to `YDoc` instance.
2. track `YDoc` changes
3. write the changes back to storage.

### SeqNum

There is a `SeqNum` on each Doc data in `Storage`. Every time `LocalPart` writes data, `SeqNum` will be +1.

There is also a `PushedSeqNum`, which is used for RemotePart later.

## DocEngine - RemotePart

Synchronizing `Storage` and `Server`.

The typical workflow is:

1. Connect with the server, Load `ServerClocks` for all docs, Start subscribing to server-side updates.

2. Check whether each doc requires `push` and `pull`

3. Execute all push and pull

4. Listen for updates from `LocalPart` and push the updates to the server

5. Listen for server-side updates and write them to storage.

### PushedSeqNum

Each Doc will record a `PushedSeqNum`, used to determine whether the doc has unpush updates.

After each `push` is completed, `PushedSeqNum` + 1

If `PushedSeqNum` and `SeqNum` are still different after we complete the push (usually means the previous `push` failed)

Then do a full pull and push and set `pushedSeqNum` = `SeqNum`

### PulledServerClock

Each Doc also record `PulledServerClock`, Used to compare with ServerClock to determine whether to `pull` doc.

When the `pull` is completed, set `PulledServerClock` = `ServerClock` returned by the server.

### Retry

The `RemotePart` may fail at any time, and `RemotePart`'s built-in retry mechanism will restart the process in 5 seconds after failure.
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ import { diffUpdate, encodeStateVectorFromUpdate, mergeUpdates } from 'yjs';

import { AsyncLock } from '../../../../utils';
import { DocEngine } from '..';
import { MemoryEventBus } from '../event';
import type { Server } from '../server';
import type { DocServer } from '../server';
import { MemoryStorage } from '../storage';
import { isEmptyUpdate } from '../utils';

Expand All @@ -27,7 +26,7 @@ class MiniServer {
}
}

class MiniServerClient implements Server {
class MiniServerClient implements DocServer {
constructor(
private readonly id: string,
private readonly server: MiniServer
Expand Down Expand Up @@ -107,7 +106,7 @@ class MiniServerClient implements Server {
};
}

async waitingForConnectServer(): Promise<void> {}
async waitForConnectingServer(): Promise<void> {}
disconnectServer(): void {}
onInterrupted(_cb: (reason: string) => void): void {}
}
Expand All @@ -116,11 +115,7 @@ describe('sync', () => {
test('basic sync', async () => {
const storage = new MemoryStorage();
const server = new MiniServer();
const engine = new DocEngine(
storage,
new MemoryEventBus(),
server.client()
).start();
const engine = new DocEngine(storage, server.client()).start();
const doc = new YDoc({ guid: 'a' });
engine.addDoc(doc);
const map = doc.getMap('aaa');
Expand All @@ -136,7 +131,6 @@ describe('sync', () => {
{
const engine = new DocEngine(
new MemoryStorage(),
new MemoryEventBus(),
server.client()
).start();
const doc = new YDoc({ guid: 'a' });
Expand All @@ -149,7 +143,6 @@ describe('sync', () => {
{
const engine = new DocEngine(
new MemoryStorage(),
new MemoryEventBus(),
server.client()
).start();
const doc = new YDoc({ guid: 'a' });
Expand All @@ -165,7 +158,6 @@ describe('sync', () => {
(async () => {
const engine = new DocEngine(
new MemoryStorage(),
new MemoryEventBus(),
server.client()
).start();
const doc = new YDoc({ guid: 'a' });
Expand All @@ -179,7 +171,6 @@ describe('sync', () => {
(async () => {
const engine = new DocEngine(
new MemoryStorage(),
new MemoryEventBus(),
server.client()
).start();
const doc = new YDoc({ guid: 'a' });
Expand All @@ -196,15 +187,10 @@ describe('sync', () => {
test('2 client share storage and eventBus (simulate different tabs in same browser)', async () => {
const server = new MiniServer();
const storage = new MemoryStorage();
const eventBus = new MemoryEventBus();

await Promise.all([
(async () => {
const engine = new DocEngine(
storage,
eventBus,
server.client()
).start();
const engine = new DocEngine(storage, server.client()).start();
const doc = new YDoc({ guid: 'a' });
engine.addDoc(doc);

Expand All @@ -213,11 +199,7 @@ describe('sync', () => {
await vitest.waitUntil(() => map.get('b') === 2);
})(),
(async () => {
const engine = new DocEngine(
storage,
eventBus,
server.client()
).start();
const engine = new DocEngine(storage, server.client()).start();
const doc = new YDoc({ guid: 'a' });
engine.addDoc(doc);
const map = doc.getMap('aaa');
Expand All @@ -240,11 +222,7 @@ describe('sync', () => {
await storage.doc.set('a', encodeStateAsUpdate(doc));
}

const engine = new DocEngine(
storage,
new MemoryEventBus(),
server.client()
).start();
const engine = new DocEngine(storage, server.client()).start();
const doc = new YDoc({ guid: 'a' });
engine.addDoc(doc);

Expand Down
Loading

0 comments on commit 6aaf83a

Please sign in to comment.