From 14d22c9ba6bcb05f1cf3b72a91e022a84bd64220 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 4 Sep 2025 13:39:01 +0200 Subject: [PATCH 1/3] Minor cleanup. --- .../src/storage/implementation/MongoWriteCheckpointAPI.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 4431cd994..4baf65e7d 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -12,12 +12,10 @@ export type MongoCheckpointAPIOptions = { export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { readonly db: PowerSyncMongo; private _mode: storage.WriteCheckpointMode; - private sync_rules_id: number; constructor(options: MongoCheckpointAPIOptions) { this.db = options.db; this._mode = options.mode; - this.sync_rules_id = options.sync_rules_id; } get writeCheckpointMode() { From d448a5f10d31d3471fc56fdae5fe2dd50e191b18 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 4 Sep 2025 13:39:16 +0200 Subject: [PATCH 2/3] Only query write checkpoint once initially. --- .../implementation/MongoSyncBucketStorage.ts | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 18ab73ea5..8853d99a6 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -714,7 +714,10 @@ export class MongoSyncBucketStorage let lastCheckpoint: ReplicationCheckpoint | null = null; const iter = this.sharedIter[Symbol.asyncIterator](options.signal); + let writeCheckpoint: bigint | null = null; + // true if we queried the initial write checkpoint, even if it doesn't exist + let queriedInitialWriteCheckpoint = false; for await (const nextCheckpoint of iter) { // lsn changes are not important by itself. @@ -722,14 +725,17 @@ export class MongoSyncBucketStorage // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user - if (nextCheckpoint.lsn != null) { - writeCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + if (nextCheckpoint.lsn != null && !queriedInitialWriteCheckpoint) { + // Lookup the first write checkpoint for the user when we can. + // There will not actually be one in all cases. + writeCheckpoint = await this.writeCheckpointAPI.lastWriteCheckpoint({ sync_rules_id: this.group_id, user_id: options.user_id, heads: { '1': nextCheckpoint.lsn } }); + queriedInitialWriteCheckpoint = true; } if ( @@ -739,12 +745,13 @@ export class MongoSyncBucketStorage ) { // No change - wait for next one // In some cases, many LSNs may be produced in a short time. - // Add a delay to throttle the write checkpoint lookup a bit. + // Add a delay to throttle the loop a bit. await timers.setTimeout(20 + 10 * Math.random()); continue; } if (lastCheckpoint == null) { + // First message for this stream - "INVALIDATE_ALL" means it will lookup all data yield { base: nextCheckpoint, writeCheckpoint, @@ -758,7 +765,9 @@ export class MongoSyncBucketStorage let updatedWriteCheckpoint = updates.updatedWriteCheckpoints.get(options.user_id) ?? null; if (updates.invalidateWriteCheckpoints) { - updatedWriteCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + // Invalidated means there were too many updates to track the individual ones, + // so we switch to "polling" (querying directly in each stream). + updatedWriteCheckpoint = await this.writeCheckpointAPI.lastWriteCheckpoint({ sync_rules_id: this.group_id, user_id: options.user_id, heads: { @@ -768,6 +777,9 @@ export class MongoSyncBucketStorage } if (updatedWriteCheckpoint != null && (writeCheckpoint == null || updatedWriteCheckpoint > writeCheckpoint)) { writeCheckpoint = updatedWriteCheckpoint; + // If it happened that we haven't queried a write checkpoint at this point, + // then we don't need to anymore, since we got an updated one. + queriedInitialWriteCheckpoint = true; } yield { From 3269f76c450b2cfb012697e54293747ef871bac7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 4 Sep 2025 13:40:20 +0200 Subject: [PATCH 3/3] Changset. --- .changeset/wet-grapes-study.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/wet-grapes-study.md diff --git a/.changeset/wet-grapes-study.md b/.changeset/wet-grapes-study.md new file mode 100644 index 000000000..323beb700 --- /dev/null +++ b/.changeset/wet-grapes-study.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Avoid frequent write checkpoint lookups when the user does not have one.