Skip to content

Commit 6f9f579

Browse files
committed
feat(server): make an abstraction for ydoc storage (#7901)
1 parent 682a01e commit 6f9f579

File tree

5 files changed

+306
-0
lines changed

5 files changed

+306
-0
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { Connection } from './connection';
2+
3+
export interface BlobStorageOptions {}
4+
5+
export interface Blob {
6+
key: string;
7+
bin: Uint8Array;
8+
mimeType: string;
9+
}
10+
11+
export abstract class BlobStorageAdapter extends Connection {
12+
abstract getBlob(spaceId: string, key: string): Promise<Blob | null>;
13+
abstract setBlob(spaceId: string, blob: Blob): Promise<string>;
14+
abstract deleteBlob(spaceId: string, key: string): Promise<boolean>;
15+
abstract listBlobs(spaceId: string): Promise<Blob>;
16+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
export class Connection {
2+
protected connected: boolean = false;
3+
connect(): Promise<void> {
4+
this.connected = true;
5+
return Promise.resolve();
6+
}
7+
disconnect(): Promise<void> {
8+
this.connected = false;
9+
return Promise.resolve();
10+
}
11+
}
Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import {
2+
applyUpdate,
3+
Doc,
4+
encodeStateAsUpdate,
5+
encodeStateVector,
6+
mergeUpdates,
7+
UndoManager,
8+
} from 'yjs';
9+
10+
import { CallTimer } from '../../../fundamentals';
11+
import { Connection } from './connection';
12+
import { SingletonLocker } from './lock';
13+
14+
export interface DocRecord {
15+
spaceId: string;
16+
docId: string;
17+
bin: Uint8Array;
18+
timestamp: number;
19+
}
20+
21+
export interface DocUpdate {
22+
bin: Uint8Array;
23+
timestamp: number;
24+
}
25+
26+
export interface HistoryFilter {
27+
before?: number;
28+
limit?: number;
29+
}
30+
31+
export interface DocStorageOptions {
32+
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
33+
}
34+
35+
export abstract class DocStorageAdapter extends Connection {
36+
private readonly locker = new SingletonLocker();
37+
38+
constructor(
39+
protected readonly options: DocStorageOptions = {
40+
mergeUpdates,
41+
}
42+
) {
43+
super();
44+
}
45+
46+
// open apis
47+
isEmptyBin(bin: Uint8Array): boolean {
48+
return (
49+
bin.length === 0 ||
50+
// 0x0 for state vector
51+
(bin.length === 1 && bin[0] === 0) ||
52+
// 0x00 for update
53+
(bin.length === 2 && bin[0] === 0 && bin[1] === 0)
54+
);
55+
}
56+
57+
async getDoc(spaceId: string, docId: string): Promise<DocRecord | null> {
58+
await using _lock = await this.lockDocForUpdate(spaceId, docId);
59+
60+
const snapshot = await this.getDocSnapshot(spaceId, docId);
61+
const updates = await this.getDocUpdates(spaceId, docId);
62+
63+
if (updates.length) {
64+
const { timestamp, bin } = await this.squash(
65+
snapshot ? [snapshot, ...updates] : updates
66+
);
67+
68+
const newSnapshot = {
69+
spaceId: spaceId,
70+
docId,
71+
bin,
72+
timestamp,
73+
};
74+
75+
const success = await this.setDocSnapshot(newSnapshot);
76+
77+
// if there is old snapshot, create a new history record
78+
if (success && snapshot) {
79+
await this.createDocHistory(snapshot);
80+
}
81+
82+
// always mark updates as merged unless throws
83+
await this.markUpdatesMerged(spaceId, docId, updates);
84+
85+
return newSnapshot;
86+
}
87+
88+
return snapshot;
89+
}
90+
91+
abstract pushDocUpdates(
92+
spaceId: string,
93+
docId: string,
94+
updates: Uint8Array[]
95+
): Promise<number>;
96+
97+
abstract deleteDoc(spaceId: string, docId: string): Promise<void>;
98+
abstract deleteSpace(spaceId: string): Promise<void>;
99+
async rollbackDoc(
100+
spaceId: string,
101+
docId: string,
102+
timestamp: number
103+
): Promise<void> {
104+
await using _lock = await this.lockDocForUpdate(spaceId, docId);
105+
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
106+
if (!toSnapshot) {
107+
throw new Error('Can not find the version to rollback to.');
108+
}
109+
110+
const fromSnapshot = await this.getDocSnapshot(spaceId, docId);
111+
112+
if (!fromSnapshot) {
113+
throw new Error('Can not find the current version of the doc.');
114+
}
115+
116+
const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin);
117+
await this.pushDocUpdates(spaceId, docId, [change]);
118+
// force create a new history record after rollback
119+
await this.createDocHistory(fromSnapshot, true);
120+
}
121+
122+
abstract getSpaceDocTimestamps(
123+
spaceId: string,
124+
after?: number
125+
): Promise<Record<string, number> | null>;
126+
abstract listDocHistories(
127+
spaceId: string,
128+
docId: string,
129+
query: { skip?: number; limit?: number }
130+
): Promise<number[]>;
131+
abstract getDocHistory(
132+
spaceId: string,
133+
docId: string,
134+
timestamp: number
135+
): Promise<DocRecord | null>;
136+
137+
// api for internal usage
138+
protected abstract getDocSnapshot(
139+
spaceId: string,
140+
docId: string
141+
): Promise<DocRecord | null>;
142+
protected abstract setDocSnapshot(snapshot: DocRecord): Promise<boolean>;
143+
protected abstract getDocUpdates(
144+
spaceId: string,
145+
docId: string
146+
): Promise<DocUpdate[]>;
147+
protected abstract markUpdatesMerged(
148+
spaceId: string,
149+
docId: string,
150+
updates: DocUpdate[]
151+
): Promise<number>;
152+
153+
protected abstract createDocHistory(
154+
snapshot: DocRecord,
155+
force?: boolean
156+
): Promise<boolean>;
157+
158+
@CallTimer('doc', 'squash')
159+
protected async squash(updates: DocUpdate[]): Promise<DocUpdate> {
160+
const merge = this.options?.mergeUpdates ?? mergeUpdates;
161+
const lastUpdate = updates.at(-1);
162+
if (!lastUpdate) {
163+
throw new Error('No updates to be squashed.');
164+
}
165+
166+
// fast return
167+
if (updates.length === 1) {
168+
return lastUpdate;
169+
}
170+
171+
const finalUpdate = await merge(updates.map(u => u.bin));
172+
173+
return {
174+
bin: finalUpdate,
175+
timestamp: lastUpdate.timestamp,
176+
};
177+
}
178+
179+
protected async lockDocForUpdate(
180+
spaceId: string,
181+
docId: string
182+
): Promise<AsyncDisposable> {
183+
return this.locker.lock(`workspace:${spaceId}:update`, docId);
184+
}
185+
186+
protected generateChangeUpdate(newerBin: Uint8Array, olderBin: Uint8Array) {
187+
const newerDoc = new Doc();
188+
applyUpdate(newerDoc, newerBin);
189+
const olderDoc = new Doc();
190+
applyUpdate(olderDoc, olderBin);
191+
192+
const newerState = encodeStateVector(newerDoc);
193+
const olderState = encodeStateVector(olderDoc);
194+
195+
const diff = encodeStateAsUpdate(newerDoc, olderState);
196+
197+
const undoManager = new UndoManager(Array.from(newerDoc.share.values()));
198+
199+
applyUpdate(olderDoc, diff);
200+
201+
undoManager.undo();
202+
203+
return encodeStateAsUpdate(olderDoc, newerState);
204+
}
205+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
// TODO(@forehalo): share with frontend
2+
import type { BlobStorageAdapter } from './blob';
3+
import { Connection } from './connection';
4+
import type { DocStorageAdapter } from './doc';
5+
6+
export class SpaceStorage extends Connection {
7+
constructor(
8+
public readonly doc: DocStorageAdapter,
9+
public readonly blob: BlobStorageAdapter
10+
) {
11+
super();
12+
}
13+
14+
override async connect() {
15+
await this.doc.connect();
16+
await this.blob.connect();
17+
}
18+
19+
override async disconnect() {
20+
await this.doc.disconnect();
21+
await this.blob.disconnect();
22+
}
23+
}
24+
25+
export { BlobStorageAdapter, type BlobStorageOptions } from './blob';
26+
export {
27+
type DocRecord,
28+
DocStorageAdapter,
29+
type DocStorageOptions,
30+
type DocUpdate,
31+
type HistoryFilter,
32+
} from './doc';
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
export interface Locker {
2+
lock(domain: string, resource: string): Promise<Lock>;
3+
}
4+
5+
export class SingletonLocker implements Locker {
6+
lockedResource = new Map<string, Lock>();
7+
constructor() {}
8+
9+
async lock(domain: string, resource: string) {
10+
let lock = this.lockedResource.get(`${domain}:${resource}`);
11+
12+
if (!lock) {
13+
lock = new Lock();
14+
}
15+
16+
await lock.acquire();
17+
18+
return lock;
19+
}
20+
}
21+
22+
export class Lock {
23+
private inner: Promise<void> = Promise.resolve();
24+
private release: () => void = () => {};
25+
26+
async acquire() {
27+
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
28+
let release: () => void = null!;
29+
const nextLock = new Promise<void>(resolve => {
30+
release = resolve;
31+
});
32+
33+
await this.inner;
34+
this.inner = nextLock;
35+
this.release = release;
36+
}
37+
38+
[Symbol.asyncDispose]() {
39+
this.release();
40+
return Promise.resolve();
41+
}
42+
}

0 commit comments

Comments
 (0)