feat(server): add timestamp in load doc event (#6248)
forehalo authored and EYHN committed Mar 22, 2024
1 parent 1f91486 commit 4d6a19c
Showing 4 changed files with 91 additions and 45 deletions.
packages/backend/server/src/core/doc/manager.ts (54 additions, 19 deletions)
@@ -55,6 +55,16 @@ export function isEmptyBuffer(buf: Buffer): boolean {
const MAX_SEQ_NUM = 0x3fffffff; // u31
const UPDATES_QUEUE_CACHE_KEY = 'doc:manager:updates';

+interface DocResponse {
+  doc: Doc;
+  timestamp: number;
+}
+
+interface BinaryResponse {
+  binary: Buffer;
+  timestamp: number;
+}
+
/**
* Since we can't directly save all client updates into database, in which way the database will overload,
* we need to buffer the updates and merge them to reduce db write.
@@ -332,8 +342,8 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* Get latest timestamp of all docs in the workspace.
*/
-@CallTimer('doc', 'get_stats')
-async getStats(workspaceId: string, after: number | undefined = 0) {
+@CallTimer('doc', 'get_doc_timestamps')
+async getDocTimestamps(workspaceId: string, after: number | undefined = 0) {
const snapshots = await this.db.snapshot.findMany({
where: {
workspaceId,
@@ -378,13 +388,18 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* get the latest doc with all update applied.
*/
-async get(workspaceId: string, guid: string): Promise<Doc | null> {
+async get(workspaceId: string, guid: string): Promise<DocResponse | null> {
const result = await this._get(workspaceId, guid);
if (result) {
if ('doc' in result) {
-return result.doc;
+return result;
} else if ('snapshot' in result) {
-return this.recoverDoc(result.snapshot);
+const doc = await this.recoverDoc(result.binary);
+
+return {
+  doc,
+  timestamp: result.timestamp,
+};
}
}

@@ -394,13 +409,19 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* get the latest doc binary with all update applied.
*/
-async getBinary(workspaceId: string, guid: string): Promise<Buffer | null> {
+async getBinary(
+  workspaceId: string,
+  guid: string
+): Promise<BinaryResponse | null> {
const result = await this._get(workspaceId, guid);
if (result) {
if ('doc' in result) {
-return Buffer.from(encodeStateAsUpdate(result.doc));
+return {
+  binary: Buffer.from(encodeStateAsUpdate(result.doc)),
+  timestamp: result.timestamp,
+};
} else if ('snapshot' in result) {
-return result.snapshot;
+return result;
}
}

@@ -410,16 +431,27 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
/**
* get the latest doc state vector with all update applied.
*/
-async getState(workspaceId: string, guid: string): Promise<Buffer | null> {
+async getDocState(
+  workspaceId: string,
+  guid: string
+): Promise<BinaryResponse | null> {
const snapshot = await this.getSnapshot(workspaceId, guid);
const updates = await this.getUpdates(workspaceId, guid);

if (updates.length) {
-const doc = await this.squash(snapshot, updates);
-return Buffer.from(encodeStateVector(doc));
+const { doc, timestamp } = await this.squash(snapshot, updates);
+return {
+  binary: Buffer.from(encodeStateVector(doc)),
+  timestamp,
+};
}

-return snapshot ? snapshot.state : null;
+return snapshot?.state
+  ? {
+      binary: snapshot.state,
+      timestamp: snapshot.updatedAt.getTime(),
+    }
+  : null;
}

/**
@@ -587,25 +619,28 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
private async _get(
workspaceId: string,
guid: string
-): Promise<{ doc: Doc } | { snapshot: Buffer } | null> {
+): Promise<DocResponse | BinaryResponse | null> {
const snapshot = await this.getSnapshot(workspaceId, guid);
const updates = await this.getUpdates(workspaceId, guid);

if (updates.length) {
-return {
-  doc: await this.squash(snapshot, updates),
-};
+return this.squash(snapshot, updates);
}

-return snapshot ? { snapshot: snapshot.blob } : null;
+return snapshot
+  ? { binary: snapshot.blob, timestamp: snapshot.updatedAt.getTime() }
+  : null;
}

/**
* Squash updates into a single update and save it as snapshot,
* and delete the updates records at the same time.
*/
@CallTimer('doc', 'squash')
-private async squash(snapshot: Snapshot | null, updates: Update[]) {
+private async squash(
+  snapshot: Snapshot | null,
+  updates: Update[]
+): Promise<DocResponse> {
if (!updates.length) {
throw new Error('No updates to squash');
}
@@ -664,7 +699,7 @@ export class DocManager implements OnModuleInit, OnModuleDestroy {
await this.updateCachedUpdatesCount(workspaceId, id, -count);
}

-return doc;
+return { doc, timestamp: last.createdAt.getTime() };
}

private async getUpdateSeq(workspaceId: string, guid: string, batch = 1) {
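
The new DocResponse/BinaryResponse shapes above mean every read path now returns the doc or binary together with an epoch-millisecond timestamp. A minimal consumer sketch, assuming an already-constructed DocManager instance and a hypothetical import path; only the get() signature and the DocResponse shape come from this commit:

    import { encodeStateAsUpdate } from 'yjs';
    import type { DocManager } from './manager'; // hypothetical import path

    // Resolve the merged update plus its server-side modification time, or null if unknown.
    async function loadDocWithTimestamp(
      manager: DocManager,
      workspaceId: string,
      guid: string
    ) {
      const res = await manager.get(workspaceId, guid); // DocResponse | null
      if (!res) return null;

      return {
        update: Buffer.from(encodeStateAsUpdate(res.doc)),
        lastModified: new Date(res.timestamp), // time of the latest update or snapshot
      };
    }
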
packages/backend/server/src/core/sync/events/events.gateway.ts (9 additions, 6 deletions)
@@ -246,7 +246,10 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
): Promise<EventResponse<Record<string, number>>> {
this.assertInWorkspace(client, Sync(workspaceId));

-const stats = await this.docManager.getStats(workspaceId, timestamp);
+const stats = await this.docManager.getDocTimestamps(
+  workspaceId,
+  timestamp
+);

return {
data: stats,
@@ -308,27 +311,27 @@ export class EventsGateway implements OnGatewayConnection, OnGatewayDisconnect {
this.assertInWorkspace(client, Sync(workspaceId));

const docId = new DocID(guid, workspaceId);
-const doc = await this.docManager.get(docId.workspace, docId.guid);
+const res = await this.docManager.get(docId.workspace, docId.guid);

-if (!doc) {
+if (!res) {
return {
error: new DocNotFoundError(workspaceId, docId.guid),
};
}

const missing = Buffer.from(
encodeStateAsUpdate(
-doc,
+res.doc,
stateVector ? Buffer.from(stateVector, 'base64') : undefined
)
).toString('base64');
-const state = Buffer.from(encodeStateVector(doc)).toString('base64');
+const state = Buffer.from(encodeStateVector(res.doc)).toString('base64');

return {
data: {
missing,
state,
-timestamp: Date.now(),
+timestamp: res.timestamp,
},
};
}
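
With the gateway returning res.timestamp instead of Date.now(), the doc-load ack now carries the doc's actual server-side modification time. A hedged Node-style client sketch; the event name, endpoint, and lastSeen map are placeholders, and only the { missing, state, timestamp } payload shape comes from the handler above:

    import { io } from 'socket.io-client';
    import { applyUpdate, Doc } from 'yjs';

    const socket = io('https://server.example.com'); // hypothetical endpoint
    const lastSeen = new Map<string, number>(); // illustrative bookkeeping

    function loadDoc(workspaceId: string, guid: string, doc: Doc) {
      socket.emit(
        'doc-load', // placeholder event name, not taken from the commit
        { workspaceId, guid },
        (res: { data?: { missing: string; state: string; timestamp: number } }) => {
          if (!res.data) return;

          // Apply the updates the server says we are missing.
          applyUpdate(doc, Buffer.from(res.data.missing, 'base64'));

          // Record the server-side doc time rather than a response-time clock reading,
          // so later delta requests can ask for changes after this point.
          lastSeen.set(guid, res.data.timestamp);
        }
      );
    }
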
packages/backend/server/src/core/workspaces/controller.ts (13 additions, 6 deletions)
@@ -51,7 +51,7 @@ export class WorkspacesController {
// metadata should always exists if body is not null
if (metadata) {
res.setHeader('content-type', metadata.contentType);
-res.setHeader('last-modified', metadata.lastModified.toISOString());
+res.setHeader('last-modified', metadata.lastModified.toUTCString());
res.setHeader('content-length', metadata.contentLength);
} else {
this.logger.warn(`Blob ${workspaceId}/${name} has no metadata`);
@@ -83,9 +83,12 @@
throw new ForbiddenException('Permission denied');
}

-const update = await this.docManager.getBinary(docId.workspace, docId.guid);
+const binResponse = await this.docManager.getBinary(
+  docId.workspace,
+  docId.guid
+);

-if (!update) {
+if (!binResponse) {
throw new NotFoundException('Doc not found');
}

@@ -106,8 +109,12 @@
}

res.setHeader('content-type', 'application/octet-stream');
-res.setHeader('cache-control', 'no-cache');
-res.send(update);
+res.setHeader(
+  'last-modified',
+  new Date(binResponse.timestamp).toUTCString()
+);
+res.setHeader('cache-control', 'private, max-age=2592000');
+res.send(binResponse.binary);
}

@Get('/:id/docs/:guid/histories/:timestamp')
@@ -142,7 +149,7 @@

if (history) {
res.setHeader('content-type', 'application/octet-stream');
-res.setHeader('cache-control', 'public, max-age=2592000, immutable');
+res.setHeader('cache-control', 'private, max-age=2592000, immutable');
res.send(history.blob);
} else {
throw new NotFoundException('Doc history not found');
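
Two HTTP details sit behind this file's changes: the Last-Modified header must be an HTTP-date (IMF-fixdate), which Date.prototype.toUTCString() produces and toISOString() does not, and the doc binary moves from no-cache to a private, time-limited cache policy now that a reliable modification time accompanies it. A small illustration, with a made-up timestamp (not taken from the commit):

    // Format difference that motivates toISOString() -> toUTCString():
    const lastModified = new Date(Date.UTC(2024, 2, 22, 7, 55, 0)); // hypothetical value

    lastModified.toISOString(); // '2024-03-22T07:55:00.000Z'    (ISO 8601, not a valid HTTP-date)
    lastModified.toUTCString(); // 'Fri, 22 Mar 2024 07:55:00 GMT' (IMF-fixdate expected by Last-Modified)
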
packages/backend/server/tests/doc.spec.ts (15 additions, 14 deletions)
@@ -127,7 +127,7 @@ test('should merge update when intervel due', async t => {
await manager.autoSquash();

t.deepEqual(
-(await manager.getBinary(ws.id, '1'))?.toString('hex'),
+(await manager.getBinary(ws.id, '1'))?.binary.toString('hex'),
Buffer.from(update.buffer).toString('hex')
);

@@ -150,7 +150,7 @@ test('should merge update when intervel due', async t => {
await manager.autoSquash();

t.deepEqual(
-(await manager.getBinary(ws.id, '1'))?.toString('hex'),
+(await manager.getBinary(ws.id, '1'))?.binary.toString('hex'),
Buffer.from(encodeStateAsUpdate(doc)).toString('hex')
);
});
@@ -275,20 +275,21 @@ test('should throw if meet max retry times', async t => {
test('should be able to insert the snapshot if it is new created', async t => {
const manager = m.get(DocManager);

-const doc = new YDoc();
-const text = doc.getText('content');
-text.insert(0, 'hello');
-const update = encodeStateAsUpdate(doc);
-
-await manager.push('1', '1', Buffer.from(update));
+{
+  const doc = new YDoc();
+  const text = doc.getText('content');
+  text.insert(0, 'hello');
+  const update = encodeStateAsUpdate(doc);
+
+  await manager.push('1', '1', Buffer.from(update));
+}
const updates = await manager.getUpdates('1', '1');
t.is(updates.length, 1);
// @ts-expect-error private
-const snapshot = await manager.squash(null, updates);
+const { doc } = await manager.squash(null, updates);

-t.truthy(snapshot);
-t.is(snapshot.getText('content').toString(), 'hello');
+t.truthy(doc);
+t.is(doc.getText('content').toString(), 'hello');

const restUpdates = await manager.getUpdates('1', '1');

@@ -315,14 +316,14 @@ test('should be able to merge updates into snapshot', async t => {
{
await manager.batchPush('1', '1', updates.slice(0, 2));
// do the merge
-const doc = (await manager.get('1', '1'))!;
+const { doc } = (await manager.get('1', '1'))!;

t.is(doc.getText('content').toString(), 'helloworld');
}

{
await manager.batchPush('1', '1', updates.slice(2));
-const doc = (await manager.get('1', '1'))!;
+const { doc } = (await manager.get('1', '1'))!;

t.is(doc.getText('content').toString(), 'hello world!');
}
@@ -372,7 +373,7 @@ test('should not update snapshot if doc is outdated', async t => {
const updateRecords = await manager.getUpdates('2', '1');

// @ts-expect-error private
-const doc = await manager.squash(snapshot, updateRecords);
+const { doc } = await manager.squash(snapshot, updateRecords);

// all updated will merged into doc not matter it's timestamp is outdated or not,
// but the snapshot record will not be updated
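
The updated tests destructure { doc } from calls that previously returned a bare Y.Doc or Buffer. A sketch, not part of the commit, of how the new timestamp field itself could be asserted in this AVA suite, reusing the m/DocManager setup the spec file already has:

    test('should return a timestamp alongside the merged binary', async t => {
      const manager = m.get(DocManager);

      const doc = new YDoc();
      doc.getText('content').insert(0, 'hi');
      await manager.push('1', '1', Buffer.from(encodeStateAsUpdate(doc)));

      const res = await manager.getBinary('1', '1');

      t.truthy(res);
      // The timestamp should be a concrete epoch-milliseconds value.
      t.true(res!.timestamp > 0);
    });
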
