Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/wet-grapes-study.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

Avoid frequent write checkpoint lookups when the user does not have one.
Original file line number Diff line number Diff line change
Expand Up @@ -714,22 +714,28 @@ export class MongoSyncBucketStorage
let lastCheckpoint: ReplicationCheckpoint | null = null;

const iter = this.sharedIter[Symbol.asyncIterator](options.signal);

let writeCheckpoint: bigint | null = null;
// true if we queried the initial write checkpoint, even if it doesn't exist
let queriedInitialWriteCheckpoint = false;

for await (const nextCheckpoint of iter) {
// lsn changes are not important by itself.
// What is important is:
// 1. checkpoint (op_id) changes.
// 2. write checkpoint changes for the specific user

if (nextCheckpoint.lsn != null) {
writeCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({
if (nextCheckpoint.lsn != null && !queriedInitialWriteCheckpoint) {
// Lookup the first write checkpoint for the user when we can.
// There will not actually be one in all cases.
writeCheckpoint = await this.writeCheckpointAPI.lastWriteCheckpoint({
sync_rules_id: this.group_id,
user_id: options.user_id,
heads: {
'1': nextCheckpoint.lsn
}
});
queriedInitialWriteCheckpoint = true;
}

if (
Expand All @@ -739,12 +745,13 @@ export class MongoSyncBucketStorage
) {
// No change - wait for next one
// In some cases, many LSNs may be produced in a short time.
// Add a delay to throttle the write checkpoint lookup a bit.
// Add a delay to throttle the loop a bit.
await timers.setTimeout(20 + 10 * Math.random());
continue;
}

if (lastCheckpoint == null) {
// First message for this stream - "INVALIDATE_ALL" means it will lookup all data
yield {
base: nextCheckpoint,
writeCheckpoint,
Expand All @@ -758,7 +765,9 @@ export class MongoSyncBucketStorage

let updatedWriteCheckpoint = updates.updatedWriteCheckpoints.get(options.user_id) ?? null;
if (updates.invalidateWriteCheckpoints) {
updatedWriteCheckpoint ??= await this.writeCheckpointAPI.lastWriteCheckpoint({
// Invalidated means there were too many updates to track the individual ones,
// so we switch to "polling" (querying directly in each stream).
updatedWriteCheckpoint = await this.writeCheckpointAPI.lastWriteCheckpoint({
sync_rules_id: this.group_id,
user_id: options.user_id,
heads: {
Expand All @@ -768,6 +777,9 @@ export class MongoSyncBucketStorage
}
if (updatedWriteCheckpoint != null && (writeCheckpoint == null || updatedWriteCheckpoint > writeCheckpoint)) {
writeCheckpoint = updatedWriteCheckpoint;
// If it happened that we haven't queried a write checkpoint at this point,
// then we don't need to anymore, since we got an updated one.
queriedInitialWriteCheckpoint = true;
}

yield {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,10 @@ export type MongoCheckpointAPIOptions = {
export class MongoWriteCheckpointAPI implements storage.WriteCheckpointAPI {
readonly db: PowerSyncMongo;
private _mode: storage.WriteCheckpointMode;
private sync_rules_id: number;

constructor(options: MongoCheckpointAPIOptions) {
this.db = options.db;
this._mode = options.mode;
this.sync_rules_id = options.sync_rules_id;
}

get writeCheckpointMode() {
Expand Down