Skip to content

Commit

Permalink
fix(infra): fix sync issues on old ids (#6474)
Browse files Browse the repository at this point in the history
  • Loading branch information
EYHN committed Apr 9, 2024
1 parent 142896c commit 97c4ae4
Showing 1 changed file with 63 additions and 9 deletions.
72 changes: 63 additions & 9 deletions packages/common/infra/src/workspace/engine/doc/remote.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,10 @@ export class DocEngineRemotePart {
await this.jobs.pullAndPush(docId, signal);
} else {
const pulled = await this.storage.loadDocServerClockPulled(docId);
if (pulled === null || pulled !== this.status.serverClocks.get(docId)) {
if (
pulled === null ||
pulled !== this.status.serverClocks.get(normalizeServerDocId(docId))
) {
await this.jobs.pull(docId, signal);
}
}
Expand Down Expand Up @@ -204,10 +207,13 @@ export class DocEngineRemotePart {
serverClock,
} = serverData;
await this.storage.saveServerClock(
new Map([[docId, serverClock]]),
new Map([[normalizeServerDocId(docId), serverClock]]),
signal
);
this.actions.updateServerClock(docId, serverClock);
this.actions.updateServerClock(
normalizeServerDocId(docId),
serverClock
);
await this.storage.commitDocAsServerUpdate(
docId,
newData,
Expand Down Expand Up @@ -242,10 +248,13 @@ export class DocEngineRemotePart {
signal
);
await this.storage.saveServerClock(
new Map([[docId, serverClock]]),
new Map([[normalizeServerDocId(docId), serverClock]]),
signal
);
this.actions.updateServerClock(docId, serverClock);
this.actions.updateServerClock(
normalizeServerDocId(docId),
serverClock
);
}
await this.storage.saveDocPushedSeqNum(docId, seqNum, signal);
}
Expand Down Expand Up @@ -275,10 +284,10 @@ export class DocEngineRemotePart {
update: newData,
});
await this.storage.saveServerClock(
new Map([[docId, serverClock]]),
new Map([[normalizeServerDocId(docId), serverClock]]),
signal
);
this.actions.updateServerClock(docId, serverClock);
this.actions.updateServerClock(normalizeServerDocId(docId), serverClock);
},
save: async (
docId: string,
Expand All @@ -287,10 +296,10 @@ export class DocEngineRemotePart {
) => {
const serverClock = jobs.reduce((a, b) => Math.max(a, b.serverClock), 0);
await this.storage.saveServerClock(
new Map([[docId, serverClock]]),
new Map([[normalizeServerDocId(docId), serverClock]]),
signal
);
this.actions.updateServerClock(docId, serverClock);
this.actions.updateServerClock(normalizeServerDocId(docId), serverClock);
if (this.status.connectedDocs.has(docId)) {
const data = jobs
.map(j => j.update)
Expand Down Expand Up @@ -543,3 +552,48 @@ export class DocEngineRemotePart {
this.status.jobDocQueue.updatePriority(docId, priority);
}
}

// use normalized id in server clock
function normalizeServerDocId(raw: string) {
enum DocVariant {
Workspace = 'workspace',
Page = 'page',
Space = 'space',
Settings = 'settings',
Unknown = 'unknown',
}

try {
if (!raw.length) {
throw new Error('Invalid Empty Doc ID');
}

let parts = raw.split(':');

if (parts.length > 3) {
// special adapt case `wsId:space:page:pageId`
if (parts[1] === DocVariant.Space && parts[2] === DocVariant.Page) {
parts = [parts[0], DocVariant.Space, parts[3]];
} else {
throw new Error(`Invalid format of Doc ID: ${raw}`);
}
} else if (parts.length === 2) {
// `${variant}:${guid}`
throw new Error('not supported');
} else if (parts.length === 1) {
// ${ws} or ${pageId}
parts = ['', DocVariant.Unknown, parts[0]];
}

const docId = parts.at(2);

if (!docId) {
throw new Error('ID is required');
}

return docId;
} catch (err) {
logger.error('Error on normalize docId ' + raw, err);
return raw;
}
}

0 comments on commit 97c4ae4

Please sign in to comment.