From bb09c294d5f275bab75dd3b13ff8e932c915c601 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 09:48:23 +0200 Subject: [PATCH 1/7] Reduce batch size for bucket checksum pre-calculations. --- .../storage/implementation/MongoChecksums.ts | 2 +- .../storage/implementation/MongoCompactor.ts | 29 ++++++++++++++----- 2 files changed, 23 insertions(+), 8 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts index b4b8aac5..abcb1584 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts @@ -169,7 +169,7 @@ export class MongoChecksums { // Limit the number of buckets we query for at a time. const bucketBatchLimit = this.options?.bucketBatchLimit ?? DEFAULT_BUCKET_BATCH_LIMIT; - if (batch.length < bucketBatchLimit) { + if (batch.length <= bucketBatchLimit) { // Single batch - no need for splitting the batch and merging results return await this.computePartialChecksumsInternal(batch); } diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index f3606c83..75f99f7e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -119,7 +119,7 @@ export class MongoCompactor { // All done break; } - for (let bucket of buckets) { + for (let { bucket } of buckets) { await this.compactSingleBucket(bucket); } } @@ -482,10 +482,20 @@ export class MongoCompactor { break; } const start = Date.now(); - logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`); + logger.info(`Calculating checksums for batch of ${buckets.length} buckets`); - await this.updateChecksumsBatch(buckets); - logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`); + // Filter batch by estimated bucket size, to reduce possibility of timeouts + let checkBuckets: typeof buckets = []; + let totalCountEstimate = 0; + for (let bucket of buckets) { + checkBuckets.push(bucket); + totalCountEstimate += bucket.estimatedCount; + if (totalCountEstimate > 50_000) { + break; + } + } + await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket)); + logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`); count += buckets.length; } return { buckets: count }; @@ -497,7 +507,9 @@ export class MongoCompactor { * This cannot be used to iterate on its own - the client is expected to process these buckets and * set estimate_since_compact.count: 0 when done, before fetching the next batch. */ - private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise { + private async dirtyBucketBatch(options: { + minBucketChanges: number; + }): Promise<{ bucket: string; estimatedCount: number }[]> { if (options.minBucketChanges <= 0) { throw new ReplicationAssertionError('minBucketChanges must be >= 1'); } @@ -515,13 +527,16 @@ export class MongoCompactor { sort: { 'estimate_since_compact.count': -1 }, - limit: 5_000, + limit: 200, maxTimeMS: MONGO_OPERATION_TIMEOUT_MS } ) .toArray(); - return dirtyBuckets.map((bucket) => bucket._id.b); + return dirtyBuckets.map((bucket) => ({ + bucket: bucket._id.b, + estimatedCount: bucket.estimate_since_compact!.count + (bucket.compacted_state?.count ?? 0) + })); } private async updateChecksumsBatch(buckets: string[]) { From 1583e8a7e8ca130c59093e429a46bcbdf1c0feb8 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 09:55:47 +0200 Subject: [PATCH 2/7] Avoid repeatedly re-compacting the same buckets. --- .../storage/implementation/MongoCompactor.ts | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 75f99f7e..37854b1a 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -113,14 +113,23 @@ export class MongoCompactor { private async compactDirtyBuckets() { while (!this.signal?.aborted) { - // Process all buckets with 1 or more changes since last time - const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 }); + // Process all buckets with 10 or more changes since last time. + // We exclude the last 100 compacted buckets, to avoid repeatedly re-compacting the same buckets over and over + // if they are modified while compacting. + const TRACK_RECENTLY_COMPACTED_NUMBER = 100; + + let recentlyCompacted: string[] = []; + const buckets = await this.dirtyBucketBatch({ minBucketChanges: 10, exclude: recentlyCompacted }); if (buckets.length == 0) { // All done break; } for (let { bucket } of buckets) { await this.compactSingleBucket(bucket); + recentlyCompacted.push(bucket); + } + if (recentlyCompacted.length > TRACK_RECENTLY_COMPACTED_NUMBER) { + recentlyCompacted = recentlyCompacted.slice(-TRACK_RECENTLY_COMPACTED_NUMBER); } } } @@ -509,6 +518,7 @@ export class MongoCompactor { */ private async dirtyBucketBatch(options: { minBucketChanges: number; + exclude?: string[]; }): Promise<{ bucket: string; estimatedCount: number }[]> { if (options.minBucketChanges <= 0) { throw new ReplicationAssertionError('minBucketChanges must be >= 1'); @@ -518,7 +528,8 @@ export class MongoCompactor { .find( { '_id.g': this.group_id, - 'estimate_since_compact.count': { $gte: options.minBucketChanges } + 'estimate_since_compact.count': { $gte: options.minBucketChanges }, + '_id.b': { $nin: options.exclude ?? [] } }, { projection: { From 190972f246df3882ea5a345a7ab78a3b349f0dc3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 09:58:12 +0200 Subject: [PATCH 3/7] Fix projection. --- .../src/storage/implementation/MongoCompactor.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index 37854b1a..ffee2fb2 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -533,7 +533,9 @@ export class MongoCompactor { }, { projection: { - _id: 1 + _id: 1, + estimate_since_compact: 1, + compacted_state: 1 }, sort: { 'estimate_since_compact.count': -1 From 9cd5de73f4cee29b7503f94246ddb20331d16782 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 10:05:11 +0200 Subject: [PATCH 4/7] Fix compact tests. --- .../storage/implementation/MongoCompactor.ts | 8 +++++++- .../test/src/storage_compacting.test.ts | 1 + .../src/tests/register-compacting-tests.ts | 18 ++++++++++++------ .../src/storage/SyncRulesBucketStorage.ts | 5 +++++ 4 files changed, 25 insertions(+), 7 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts index ffee2fb2..50ee3ac1 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts @@ -61,6 +61,7 @@ export interface MongoCompactOptions extends storage.CompactOptions {} const DEFAULT_CLEAR_BATCH_LIMIT = 5000; const DEFAULT_MOVE_BATCH_LIMIT = 2000; const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000; +const DEFAULT_MIN_BUCKET_CHANGES = 10; /** This default is primarily for tests. */ const DEFAULT_MEMORY_LIMIT_MB = 64; @@ -73,6 +74,7 @@ export class MongoCompactor { private moveBatchLimit: number; private moveBatchQueryLimit: number; private clearBatchLimit: number; + private minBucketChanges: number; private maxOpId: bigint; private buckets: string[] | undefined; private signal?: AbortSignal; @@ -88,6 +90,7 @@ export class MongoCompactor { this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT; this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT; this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT; + this.minBucketChanges = options?.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES; this.maxOpId = options?.maxOpId ?? 0n; this.buckets = options?.compactBuckets; this.signal = options?.signal; @@ -119,7 +122,10 @@ export class MongoCompactor { const TRACK_RECENTLY_COMPACTED_NUMBER = 100; let recentlyCompacted: string[] = []; - const buckets = await this.dirtyBucketBatch({ minBucketChanges: 10, exclude: recentlyCompacted }); + const buckets = await this.dirtyBucketBatch({ + minBucketChanges: this.minBucketChanges, + exclude: recentlyCompacted + }); if (buckets.length == 0) { // All done break; diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts index 40ac9f0e..a1ddc094 100644 --- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts @@ -62,6 +62,7 @@ bucket_definitions: clearBatchLimit: 200, moveBatchLimit: 10, moveBatchQueryLimit: 10, + minBucketChanges: 1, maxOpId: checkpoint, signal: null as any }); diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts index 06504a60..b214a2be 100644 --- a/packages/service-core-tests/src/tests/register-compacting-tests.ts +++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts @@ -80,7 +80,8 @@ bucket_definitions: await bucketStorage.compact({ clearBatchLimit: 2, moveBatchLimit: 1, - moveBatchQueryLimit: 1 + moveBatchQueryLimit: 1, + minBucketChanges: 1 }); const batchAfter = await test_utils.oneFromAsync( @@ -207,7 +208,8 @@ bucket_definitions: await bucketStorage.compact({ clearBatchLimit: 2, moveBatchLimit: 1, - moveBatchQueryLimit: 1 + moveBatchQueryLimit: 1, + minBucketChanges: 1 }); const batchAfter = await test_utils.oneFromAsync( @@ -300,7 +302,8 @@ bucket_definitions: await bucketStorage.compact({ clearBatchLimit: 2, moveBatchLimit: 1, - moveBatchQueryLimit: 1 + moveBatchQueryLimit: 1, + minBucketChanges: 1 }); const batchAfter = await test_utils.oneFromAsync( @@ -412,7 +415,8 @@ bucket_definitions: await bucketStorage.compact({ clearBatchLimit: 100, moveBatchLimit: 100, - moveBatchQueryLimit: 100 // Larger limit for a larger window of operations + moveBatchQueryLimit: 100, // Larger limit for a larger window of operations + minBucketChanges: 1 }); const batchAfter = await test_utils.fromAsync( @@ -498,7 +502,8 @@ bucket_definitions: await bucketStorage.compact({ clearBatchLimit: 2, moveBatchLimit: 1, - moveBatchQueryLimit: 1 + moveBatchQueryLimit: 1, + minBucketChanges: 1 }); const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { @@ -572,7 +577,8 @@ bucket_definitions: await bucketStorage.compact({ clearBatchLimit: 20, moveBatchLimit: 10, - moveBatchQueryLimit: 10 + moveBatchQueryLimit: 10, + minBucketChanges: 1 }); const checkpoint2 = result2!.flushed_op; diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 991166c6..1811773f 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -217,6 +217,11 @@ export interface CompactOptions { /** Minimum of 1 */ moveBatchQueryLimit?: number; + /** + * Minimof of 1, default of 10. + */ + minBucketChanges?: number; + /** * Internal/testing use: Cache size for compacting parameters. */ From 2c1e778547c1a460edb05cd93c10366e305ed8e1 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 10:07:01 +0200 Subject: [PATCH 5/7] Changeset. --- .changeset/mean-zoos-retire.md | 7 +++++++ .changeset/shaggy-queens-cheat.md | 7 +++++++ 2 files changed, 14 insertions(+) create mode 100644 .changeset/mean-zoos-retire.md create mode 100644 .changeset/shaggy-queens-cheat.md diff --git a/.changeset/mean-zoos-retire.md b/.changeset/mean-zoos-retire.md new file mode 100644 index 00000000..4883a414 --- /dev/null +++ b/.changeset/mean-zoos-retire.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core-tests': patch +'@powersync/service-core': patch +--- + +Avoid re-compacting recently compacted buckets. diff --git a/.changeset/shaggy-queens-cheat.md b/.changeset/shaggy-queens-cheat.md new file mode 100644 index 00000000..1fd7280a --- /dev/null +++ b/.changeset/shaggy-queens-cheat.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-mongodb-storage': patch +'@powersync/service-core-tests': patch +'@powersync/service-core': patch +--- + +Reduce batch size for checksum pre-calculations to reduce timeouts. From b457d92e491b11e1bed20b2fb801994b15b7f97d Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 10:17:25 +0200 Subject: [PATCH 6/7] Fix another test. --- packages/service-core-tests/src/tests/register-sync-tests.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 56d29336..e721a2b3 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -1141,7 +1141,9 @@ bucket_definitions: await batch.commit('0/2'); }); - await bucketStorage.compact(); + await bucketStorage.compact({ + minBucketChanges: 1 + }); const lines2 = await getCheckpointLines(iter, { consume: true }); From add62d42a515ed34429cafdf41915f7e93609900 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Nov 2025 13:31:17 +0200 Subject: [PATCH 7/7] Update packages/service-core/src/storage/SyncRulesBucketStorage.ts Co-authored-by: stevensJourney <51082125+stevensJourney@users.noreply.github.com> --- packages/service-core/src/storage/SyncRulesBucketStorage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 1811773f..23edf025 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -218,7 +218,7 @@ export interface CompactOptions { moveBatchQueryLimit?: number; /** - * Minimof of 1, default of 10. + * Minimum of 1, default of 10. */ minBucketChanges?: number;