diff --git a/.changeset/wet-grapes-study.md b/.changeset/wet-grapes-study.md new file mode 100644 index 000000000..323beb700 --- /dev/null +++ b/.changeset/wet-grapes-study.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core': patch +'@powersync/service-image': patch +--- + +Avoid frequent write checkpoint lookups when the user does not have one. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 18ab73ea5..8853d99a6 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -714,7 +714,10 @@ export class MongoSyncBucketStorage let lastCheckpoint: ReplicationCheckpoint | null = null; const iter = this.sharedIter[Symbol.asyncIterator](options.signal); + let writeCheckpoint: bigint | null = null; + // true if we queried the initial write checkpoint, even if it doesn't exist + let queriedInitialWriteCheckpoint = false; for await (const nextCheckpoint of iter) { // lsn changes are not important by itself. @@ -722,14 +725,17 @@ export class MongoSyncBucketStorage // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user - if (nextCheckpoint.lsn != null) { - writeCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + if (nextCheckpoint.lsn != null && !queriedInitialWriteCheckpoint) { + // Lookup the first write checkpoint for the user when we can. + // There will not actually be one in all cases. + writeCheckpoint = await this.writeCheckpointAPI.lastWriteCheckpoint({ sync_rules_id: this.group_id, user_id: options.user_id, heads: { '1': nextCheckpoint.lsn } }); + queriedInitialWriteCheckpoint = true; } if ( @@ -739,12 +745,13 @@ export class MongoSyncBucketStorage ) { // No change - wait for next one // In some cases, many LSNs may be produced in a short time. - // Add a delay to throttle the write checkpoint lookup a bit. + // Add a delay to throttle the loop a bit. await timers.setTimeout(20 + 10 * Math.random()); continue; } if (lastCheckpoint == null) { + // First message for this stream - "INVALIDATE_ALL" means it will lookup all data yield { base: nextCheckpoint, writeCheckpoint, @@ -758,7 +765,9 @@ export class MongoSyncBucketStorage let updatedWriteCheckpoint = updates.updatedWriteCheckpoints.get(options.user_id) ?? null; if (updates.invalidateWriteCheckpoints) { - updatedWriteCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({ + // Invalidated means there were too many updates to track the individual ones, + // so we switch to "polling" (querying directly in each stream). + updatedWriteCheckpoint = await this.writeCheckpointAPI.lastWriteCheckpoint({ sync_rules_id: this.group_id, user_id: options.user_id, heads: { @@ -768,6 +777,9 @@ export class MongoSyncBucketStorage } if (updatedWriteCheckpoint != null && (writeCheckpoint == null || updatedWriteCheckpoint > writeCheckpoint)) { writeCheckpoint = updatedWriteCheckpoint; + // If it happened that we haven't queried a write checkpoint at this point, + // then we don't need to anymore, since we got an updated one. + queriedInitialWriteCheckpoint = true; } yield { diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts index 4431cd994..4baf65e7d 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoWriteCheckpointAPI.ts @@ -12,12 +12,10 @@ export type MongoCheckpointAPIOptions = { export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI { readonly db: PowerSyncMongo; private _mode: storage.WriteCheckpointMode; - private sync_rules_id: number; constructor(options: MongoCheckpointAPIOptions) { this.db = options.db; this._mode = options.mode; - this.sync_rules_id = options.sync_rules_id; } get writeCheckpointMode() {