Skip to content

Commit d93d39e

Browse files
committed
feat: add editor record (#7938)
fix CLOUD-58, CLOUD-61, CLOUD-62, PD-1607, PD-1608
1 parent d9cedf8 commit d93d39e

File tree

33 files changed

+622
-55
lines changed

33 files changed

+622
-55
lines changed
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
-- AlterTable
2+
ALTER TABLE "snapshot_histories" ADD COLUMN "created_by" VARCHAR;
3+
4+
-- AlterTable
5+
ALTER TABLE "snapshots" ADD COLUMN "created_by" VARCHAR,
6+
ADD COLUMN "updated_by" VARCHAR;
7+
8+
-- AlterTable
9+
ALTER TABLE "updates" ADD COLUMN "created_by" VARCHAR DEFAULT 'system';
10+
11+
-- AddForeignKey
12+
ALTER TABLE "snapshots" ADD CONSTRAINT "snapshots_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;
13+
14+
-- AddForeignKey
15+
ALTER TABLE "snapshots" ADD CONSTRAINT "snapshots_updated_by_fkey" FOREIGN KEY ("updated_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;
16+
17+
-- AddForeignKey
18+
ALTER TABLE "updates" ADD CONSTRAINT "updates_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;
19+
20+
-- AddForeignKey
21+
ALTER TABLE "snapshot_histories" ADD CONSTRAINT "snapshot_histories_created_by_fkey" FOREIGN KEY ("created_by") REFERENCES "users"("id") ON DELETE SET NULL ON UPDATE CASCADE;

packages/backend/server/schema.prisma

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ model User {
3333
aiSessions AiSession[]
3434
updatedRuntimeConfigs RuntimeConfig[]
3535
userSnapshots UserSnapshot[]
36+
createdSnapshot Snapshot[] @relation("createdSnapshot")
37+
updatedSnapshot Snapshot[] @relation("updatedSnapshot")
38+
createdUpdate Update[] @relation("createdUpdate")
39+
createdHistory SnapshotHistory[] @relation("createdHistory")
3640
3741
@@index([email])
3842
@@map("users")
@@ -241,9 +245,16 @@ model Snapshot {
241245
// the `updated_at` field will not record the time of record changed,
242246
// but the created time of last seen update that has been merged into snapshot.
243247
updatedAt DateTime @map("updated_at") @db.Timestamptz(3)
248+
createdBy String? @map("created_by") @db.VarChar
249+
updatedBy String? @map("updated_by") @db.VarChar
250+
251+
// should not delete origin snapshot even if user is deleted
252+
// we only delete the snapshot if the workspace is deleted
253+
createdByUser User? @relation(name: "createdSnapshot", fields: [createdBy], references: [id], onDelete: SetNull)
254+
updatedByUser User? @relation(name: "updatedSnapshot", fields: [updatedBy], references: [id], onDelete: SetNull)
244255
245256
// @deprecated use updatedAt only
246-
seq Int? @default(0) @db.Integer
257+
seq Int? @default(0) @db.Integer
247258
248259
// we need to clear all hanging updates and snapshots before enable the foreign key on workspaceId
249260
// workspace Workspace @relation(fields: [workspaceId], references: [id], onDelete: Cascade)
@@ -274,9 +285,14 @@ model Update {
274285
id String @map("guid") @db.VarChar
275286
blob Bytes @db.ByteA
276287
createdAt DateTime @map("created_at") @db.Timestamptz(3)
288+
// TODO(@darkskygit): fullfill old update, remove default value in next release
289+
createdBy String? @default("system") @map("created_by") @db.VarChar
290+
291+
// will delete createor record if createor's account is deleted
292+
createdByUser User? @relation(name: "createdUpdate", fields: [createdBy], references: [id], onDelete: SetNull)
277293
278294
// @deprecated use createdAt only
279-
seq Int? @db.Integer
295+
seq Int? @db.Integer
280296
281297
@@id([workspaceId, id, createdAt])
282298
@@map("updates")
@@ -289,6 +305,10 @@ model SnapshotHistory {
289305
blob Bytes @db.ByteA
290306
state Bytes? @db.ByteA
291307
expiredAt DateTime @map("expired_at") @db.Timestamptz(3)
308+
createdBy String? @map("created_by") @db.VarChar
309+
310+
// will delete createor record if creator's account is deleted
311+
createdByUser User? @relation(name: "createdHistory", fields: [createdBy], references: [id], onDelete: SetNull)
292312
293313
@@id([workspaceId, id, timestamp])
294314
@@map("snapshot_histories")

packages/backend/server/src/core/doc/adapters/userspace.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,12 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
4545
return this.getDocSnapshot(spaceId, docId);
4646
}
4747

48-
async pushDocUpdates(userId: string, docId: string, updates: Uint8Array[]) {
48+
async pushDocUpdates(
49+
userId: string,
50+
docId: string,
51+
updates: Uint8Array[],
52+
editorId?: string
53+
) {
4954
if (!updates.length) {
5055
return 0;
5156
}
@@ -67,6 +72,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
6772
docId,
6873
bin,
6974
timestamp,
75+
editor: editorId,
7076
});
7177

7278
return timestamp;
@@ -135,6 +141,7 @@ export class PgUserspaceDocStorageAdapter extends DocStorageAdapter {
135141
docId,
136142
bin: snapshot.blob,
137143
timestamp: snapshot.updatedAt.getTime(),
144+
editor: snapshot.userId,
138145
};
139146
}
140147

packages/backend/server/src/core/doc/adapters/workspace.ts

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
3838
async pushDocUpdates(
3939
workspaceId: string,
4040
docId: string,
41-
updates: Uint8Array[]
41+
updates: Uint8Array[],
42+
editorId?: string
4243
) {
4344
if (!updates.length) {
4445
return 0;
@@ -82,6 +83,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
8283
blob: Buffer.from(update),
8384
seq,
8485
createdAt: new Date(createdAt),
86+
createdBy: editorId || null,
8587
};
8688
}),
8789
});
@@ -113,6 +115,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
113115
return rows.map(row => ({
114116
bin: row.blob,
115117
timestamp: row.createdAt.getTime(),
118+
editor: row.createdBy || undefined,
116119
}));
117120
}
118121

@@ -216,6 +219,12 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
216219
const histories = await this.db.snapshotHistory.findMany({
217220
select: {
218221
timestamp: true,
222+
createdByUser: {
223+
select: {
224+
name: true,
225+
avatarUrl: true,
226+
},
227+
},
219228
},
220229
where: {
221230
workspaceId,
@@ -230,7 +239,10 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
230239
take: query.limit,
231240
});
232241

233-
return histories.map(h => h.timestamp.getTime());
242+
return histories.map(h => ({
243+
timestamp: h.timestamp.getTime(),
244+
editor: h.createdByUser,
245+
}));
234246
}
235247

236248
async getDocHistory(workspaceId: string, docId: string, timestamp: number) {
@@ -253,13 +265,15 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
253265
docId,
254266
bin: history.blob,
255267
timestamp,
268+
editor: history.createdBy || undefined,
256269
};
257270
}
258271

259272
override async rollbackDoc(
260273
spaceId: string,
261274
docId: string,
262-
timestamp: number
275+
timestamp: number,
276+
editorId?: string
263277
): Promise<void> {
264278
await using _lock = await this.lockDocForUpdate(spaceId, docId);
265279
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
@@ -274,7 +288,14 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
274288
}
275289

276290
// force create a new history record after rollback
277-
await this.createDocHistory(fromSnapshot, true);
291+
await this.createDocHistory(
292+
{
293+
...fromSnapshot,
294+
// override the editor to the one who requested the rollback
295+
editor: editorId,
296+
},
297+
true
298+
);
278299
// WARN:
279300
// we should never do the snapshot updating in recovering,
280301
// which is not the solution in CRDT.
@@ -331,6 +352,7 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
331352
id: snapshot.docId,
332353
timestamp: new Date(snapshot.timestamp),
333354
blob: Buffer.from(snapshot.bin),
355+
createdBy: snapshot.editor,
334356
expiredAt: new Date(
335357
Date.now() + (await this.options.historyMaxAge(snapshot.spaceId))
336358
),
@@ -374,6 +396,8 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
374396
docId,
375397
bin: snapshot.blob,
376398
timestamp: snapshot.updatedAt.getTime(),
399+
// creator and editor may null if their account is deleted
400+
editor: snapshot.updatedBy || snapshot.createdBy || undefined,
377401
};
378402
}
379403

@@ -396,10 +420,10 @@ export class PgWorkspaceDocStorageAdapter extends DocStorageAdapter {
396420
// ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
397421
try {
398422
const result: { updatedAt: Date }[] = await this.db.$queryRaw`
399-
INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at")
400-
VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt})
423+
INSERT INTO "snapshots" ("workspace_id", "guid", "blob", "created_at", "updated_at", "created_by", "updated_by")
424+
VALUES (${spaceId}, ${docId}, ${bin}, DEFAULT, ${updatedAt}, ${snapshot.editor}, ${snapshot.editor})
401425
ON CONFLICT ("workspace_id", "guid")
402-
DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt}
426+
DO UPDATE SET "blob" = ${bin}, "updated_at" = ${updatedAt}, "updated_by" = ${snapshot.editor}
403427
WHERE "snapshots"."workspace_id" = ${spaceId} AND "snapshots"."guid" = ${docId} AND "snapshots"."updated_at" <= ${updatedAt}
404428
RETURNING "snapshots"."workspace_id" as "workspaceId", "snapshots"."guid" as "id", "snapshots"."updated_at" as "updatedAt"
405429
`;

packages/backend/server/src/core/doc/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,4 +22,4 @@ import { DocStorageOptions } from './options';
2222
export class DocStorageModule {}
2323
export { PgUserspaceDocStorageAdapter, PgWorkspaceDocStorageAdapter };
2424

25-
export { DocStorageAdapter } from './storage';
25+
export { DocStorageAdapter, type Editor } from './storage';

packages/backend/server/src/core/doc/storage/doc.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,25 @@ export interface DocRecord {
1616
docId: string;
1717
bin: Uint8Array;
1818
timestamp: number;
19+
editor?: string;
1920
}
2021

2122
export interface DocUpdate {
2223
bin: Uint8Array;
2324
timestamp: number;
25+
editor?: string;
2426
}
2527

2628
export interface HistoryFilter {
2729
before?: number;
2830
limit?: number;
2931
}
3032

33+
export interface Editor {
34+
name: string;
35+
avatarUrl: string | null;
36+
}
37+
3138
export interface DocStorageOptions {
3239
mergeUpdates?: (updates: Uint8Array[]) => Promise<Uint8Array> | Uint8Array;
3340
}
@@ -61,7 +68,7 @@ export abstract class DocStorageAdapter extends Connection {
6168
const updates = await this.getDocUpdates(spaceId, docId);
6269

6370
if (updates.length) {
64-
const { timestamp, bin } = await this.squash(
71+
const { timestamp, bin, editor } = await this.squash(
6572
snapshot ? [snapshot, ...updates] : updates
6673
);
6774

@@ -70,6 +77,7 @@ export abstract class DocStorageAdapter extends Connection {
7077
docId,
7178
bin,
7279
timestamp,
80+
editor,
7381
};
7482

7583
const success = await this.setDocSnapshot(newSnapshot);
@@ -91,15 +99,17 @@ export abstract class DocStorageAdapter extends Connection {
9199
abstract pushDocUpdates(
92100
spaceId: string,
93101
docId: string,
94-
updates: Uint8Array[]
102+
updates: Uint8Array[],
103+
editorId?: string
95104
): Promise<number>;
96105

97106
abstract deleteDoc(spaceId: string, docId: string): Promise<void>;
98107
abstract deleteSpace(spaceId: string): Promise<void>;
99108
async rollbackDoc(
100109
spaceId: string,
101110
docId: string,
102-
timestamp: number
111+
timestamp: number,
112+
editorId?: string
103113
): Promise<void> {
104114
await using _lock = await this.lockDocForUpdate(spaceId, docId);
105115
const toSnapshot = await this.getDocHistory(spaceId, docId, timestamp);
@@ -114,7 +124,7 @@ export abstract class DocStorageAdapter extends Connection {
114124
}
115125

116126
const change = this.generateChangeUpdate(fromSnapshot.bin, toSnapshot.bin);
117-
await this.pushDocUpdates(spaceId, docId, [change]);
127+
await this.pushDocUpdates(spaceId, docId, [change], editorId);
118128
// force create a new history record after rollback
119129
await this.createDocHistory(fromSnapshot, true);
120130
}
@@ -127,7 +137,7 @@ export abstract class DocStorageAdapter extends Connection {
127137
spaceId: string,
128138
docId: string,
129139
query: { skip?: number; limit?: number }
130-
): Promise<number[]>;
140+
): Promise<{ timestamp: number; editor: Editor | null }[]>;
131141
abstract getDocHistory(
132142
spaceId: string,
133143
docId: string,
@@ -173,6 +183,7 @@ export abstract class DocStorageAdapter extends Connection {
173183
return {
174184
bin: finalUpdate,
175185
timestamp: lastUpdate.timestamp,
186+
editor: lastUpdate.editor,
176187
};
177188
}
178189

packages/backend/server/src/core/doc/storage/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,6 @@ export {
2828
DocStorageAdapter,
2929
type DocStorageOptions,
3030
type DocUpdate,
31+
type Editor,
3132
type HistoryFilter,
3233
} from './doc';

packages/backend/server/src/core/sync/gateway.ts

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -264,9 +264,11 @@ export class SpaceSyncGateway
264264
};
265265
}
266266

267+
@Auth()
267268
@SubscribeMessage('space:push-doc-updates')
268269
async onReceiveDocUpdates(
269270
@ConnectedSocket() client: Socket,
271+
@CurrentUser() user: CurrentUser,
270272
@MessageBody()
271273
message: PushDocUpdatesMessage
272274
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
@@ -277,7 +279,8 @@ export class SpaceSyncGateway
277279
const timestamp = await adapter.push(
278280
spaceId,
279281
docId,
280-
updates.map(update => Buffer.from(update, 'base64'))
282+
updates.map(update => Buffer.from(update, 'base64')),
283+
user.id
281284
);
282285

283286
// could be put in [adapter.push]
@@ -448,8 +451,10 @@ export class SpaceSyncGateway
448451
});
449452
}
450453

454+
@Auth()
451455
@SubscribeMessage('client-update-v2')
452456
async handleClientUpdateV2(
457+
@CurrentUser() user: CurrentUser,
453458
@MessageBody()
454459
{
455460
workspaceId,
@@ -462,7 +467,7 @@ export class SpaceSyncGateway
462467
},
463468
@ConnectedSocket() client: Socket
464469
): Promise<EventResponse<{ accepted: true; timestamp?: number }>> {
465-
return this.onReceiveDocUpdates(client, {
470+
return this.onReceiveDocUpdates(client, user, {
466471
spaceType: SpaceType.Workspace,
467472
spaceId: workspaceId,
468473
docId: guid,
@@ -596,9 +601,9 @@ abstract class SyncSocketAdapter {
596601
permission?: Permission
597602
): Promise<void>;
598603

599-
push(spaceId: string, docId: string, updates: Buffer[]) {
604+
push(spaceId: string, docId: string, updates: Buffer[], editorId: string) {
600605
this.assertIn(spaceId);
601-
return this.storage.pushDocUpdates(spaceId, docId, updates);
606+
return this.storage.pushDocUpdates(spaceId, docId, updates, editorId);
602607
}
603608

604609
get(spaceId: string, docId: string) {
@@ -621,9 +626,14 @@ class WorkspaceSyncAdapter extends SyncSocketAdapter {
621626
super(SpaceType.Workspace, client, storage);
622627
}
623628

624-
override push(spaceId: string, docId: string, updates: Buffer[]) {
629+
override push(
630+
spaceId: string,
631+
docId: string,
632+
updates: Buffer[],
633+
editorId: string
634+
) {
625635
const id = new DocID(docId, spaceId);
626-
return super.push(spaceId, id.guid, updates);
636+
return super.push(spaceId, id.guid, updates, editorId);
627637
}
628638

629639
override get(spaceId: string, docId: string) {

0 commit comments

Comments
 (0)