diff --git a/.changeset/mean-zoos-retire.md b/.changeset/mean-zoos-retire.md
new file mode 100644
index 00000000..4883a414
--- /dev/null
+++ b/.changeset/mean-zoos-retire.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+---
+
+Avoid re-compacting recently compacted buckets.
diff --git a/.changeset/shaggy-queens-cheat.md b/.changeset/shaggy-queens-cheat.md
new file mode 100644
index 00000000..1fd7280a
--- /dev/null
+++ b/.changeset/shaggy-queens-cheat.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-mongodb-storage': patch
+'@powersync/service-core-tests': patch
+'@powersync/service-core': patch
+---
+
+Reduce batch size for checksum pre-calculations to reduce timeouts.
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts
index b4b8aac5..abcb1584 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoChecksums.ts
@@ -169,7 +169,7 @@ export class MongoChecksums {
 
     // Limit the number of buckets we query for at a time.
     const bucketBatchLimit = this.options?.bucketBatchLimit ?? DEFAULT_BUCKET_BATCH_LIMIT;
-    if (batch.length < bucketBatchLimit) {
+    if (batch.length <= bucketBatchLimit) {
       // Single batch - no need for splitting the batch and merging results
       return await this.computePartialChecksumsInternal(batch);
     }
diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
index f3606c83..50ee3ac1 100644
--- a/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
+++ b/modules/module-mongodb-storage/src/storage/implementation/MongoCompactor.ts
@@ -61,6 +61,7 @@ export interface MongoCompactOptions extends storage.CompactOptions {}
 const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
 const DEFAULT_MOVE_BATCH_LIMIT = 2000;
 const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000;
+const DEFAULT_MIN_BUCKET_CHANGES = 10;
 
 /** This default is primarily for tests. */
 const DEFAULT_MEMORY_LIMIT_MB = 64;
@@ -73,6 +74,7 @@ export class MongoCompactor {
   private moveBatchLimit: number;
   private moveBatchQueryLimit: number;
   private clearBatchLimit: number;
+  private minBucketChanges: number;
   private maxOpId: bigint;
   private buckets: string[] | undefined;
   private signal?: AbortSignal;
@@ -88,6 +90,7 @@
     this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
     this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
     this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
+    this.minBucketChanges = options?.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
     this.maxOpId = options?.maxOpId ?? 0n;
     this.buckets = options?.compactBuckets;
     this.signal = options?.signal;
@@ -113,14 +116,26 @@ export class MongoCompactor {
   private async compactDirtyBuckets() {
+    // Process all buckets with at least `minBucketChanges` changes since last time.
+    // We exclude the last 100 compacted buckets, to avoid repeatedly re-compacting the same buckets over and over
+    // if they are modified while compacting.
+    const TRACK_RECENTLY_COMPACTED_NUMBER = 100;
+
+    let recentlyCompacted: string[] = [];
     while (!this.signal?.aborted) {
-      // Process all buckets with 1 or more changes since last time
-      const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
+      const buckets = await this.dirtyBucketBatch({
+        minBucketChanges: this.minBucketChanges,
+        exclude: recentlyCompacted
+      });
       if (buckets.length == 0) {
         // All done
         break;
       }
-      for (let bucket of buckets) {
+      for (let { bucket } of buckets) {
         await this.compactSingleBucket(bucket);
+        recentlyCompacted.push(bucket);
+      }
+      if (recentlyCompacted.length > TRACK_RECENTLY_COMPACTED_NUMBER) {
+        recentlyCompacted = recentlyCompacted.slice(-TRACK_RECENTLY_COMPACTED_NUMBER);
       }
     }
   }
@@ -482,10 +497,20 @@ export class MongoCompactor {
         break;
       }
       const start = Date.now();
-      logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
+      logger.info(`Calculating checksums for batch of ${buckets.length} buckets`);
 
-      await this.updateChecksumsBatch(buckets);
-      logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
+      // Filter batch by estimated bucket size, to reduce possibility of timeouts
+      let checkBuckets: typeof buckets = [];
+      let totalCountEstimate = 0;
+      for (let bucket of buckets) {
+        checkBuckets.push(bucket);
+        totalCountEstimate += bucket.estimatedCount;
+        if (totalCountEstimate > 50_000) {
+          break;
+        }
+      }
+      await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket));
+      logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`);
       count += buckets.length;
     }
     return { buckets: count };
   }
@@ -497,7 +522,10 @@ export class MongoCompactor {
   /**
    * Get a batch of dirty buckets to compact, and their estimated row counts.
    *
    * This cannot be used to iterate on its own - the client is expected to process these buckets and
    * set estimate_since_compact.count: 0 when done, before fetching the next batch.
    */
-  private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
+  private async dirtyBucketBatch(options: {
+    minBucketChanges: number;
+    exclude?: string[];
+  }): Promise<{ bucket: string; estimatedCount: number }[]> {
     if (options.minBucketChanges <= 0) {
       throw new ReplicationAssertionError('minBucketChanges must be >= 1');
     }
@@ -506,22 +534,28 @@ export class MongoCompactor {
     const dirtyBuckets = await this.db.bucket_state
       .find(
         {
           '_id.g': this.group_id,
-          'estimate_since_compact.count': { $gte: options.minBucketChanges }
+          'estimate_since_compact.count': { $gte: options.minBucketChanges },
+          '_id.b': { $nin: options.exclude ?? [] }
         },
         {
           projection: {
-            _id: 1
+            _id: 1,
+            estimate_since_compact: 1,
+            compacted_state: 1
           },
           sort: {
             'estimate_since_compact.count': -1
           },
-          limit: 5_000,
+          limit: 200,
           maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
         }
       )
       .toArray();
-    return dirtyBuckets.map((bucket) => bucket._id.b);
+    return dirtyBuckets.map((bucket) => ({
+      bucket: bucket._id.b,
+      estimatedCount: bucket.estimate_since_compact!.count + (bucket.compacted_state?.count ?? 0)
+    }));
   }
 
   private async updateChecksumsBatch(buckets: string[]) {
diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
index 40ac9f0e..a1ddc094 100644
--- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
+++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts
@@ -62,6 +62,7 @@ bucket_definitions:
       clearBatchLimit: 200,
       moveBatchLimit: 10,
       moveBatchQueryLimit: 10,
+      minBucketChanges: 1,
       maxOpId: checkpoint,
       signal: null as any
     });
diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts
index 06504a60..b214a2be 100644
--- a/packages/service-core-tests/src/tests/register-compacting-tests.ts
+++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts
@@ -80,7 +80,8 @@ bucket_definitions:
     await bucketStorage.compact({
       clearBatchLimit: 2,
       moveBatchLimit: 1,
-      moveBatchQueryLimit: 1
+      moveBatchQueryLimit: 1,
+      minBucketChanges: 1
     });
 
     const batchAfter = await test_utils.oneFromAsync(
@@ -207,7 +208,8 @@ bucket_definitions:
     await bucketStorage.compact({
       clearBatchLimit: 2,
       moveBatchLimit: 1,
-      moveBatchQueryLimit: 1
+      moveBatchQueryLimit: 1,
+      minBucketChanges: 1
     });
 
     const batchAfter = await test_utils.oneFromAsync(
@@ -300,7 +302,8 @@ bucket_definitions:
     await bucketStorage.compact({
       clearBatchLimit: 2,
       moveBatchLimit: 1,
-      moveBatchQueryLimit: 1
+      moveBatchQueryLimit: 1,
+      minBucketChanges: 1
     });
 
     const batchAfter = await test_utils.oneFromAsync(
@@ -412,7 +415,8 @@ bucket_definitions:
     await bucketStorage.compact({
       clearBatchLimit: 100,
       moveBatchLimit: 100,
-      moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
+      moveBatchQueryLimit: 100, // Larger limit for a larger window of operations
+      minBucketChanges: 1
     });
 
     const batchAfter = await test_utils.fromAsync(
@@ -498,7 +502,8 @@ bucket_definitions:
     await bucketStorage.compact({
       clearBatchLimit: 2,
       moveBatchLimit: 1,
-      moveBatchQueryLimit: 1
+      moveBatchQueryLimit: 1,
+      minBucketChanges: 1
     });
 
     const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
@@ -572,7 +577,8 @@ bucket_definitions:
     await bucketStorage.compact({
       clearBatchLimit: 20,
       moveBatchLimit: 10,
-      moveBatchQueryLimit: 10
+      moveBatchQueryLimit: 10,
+      minBucketChanges: 1
     });
 
     const checkpoint2 = result2!.flushed_op;
diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts
index 56d29336..e721a2b3 100644
--- a/packages/service-core-tests/src/tests/register-sync-tests.ts
+++ b/packages/service-core-tests/src/tests/register-sync-tests.ts
@@ -1141,7 +1141,9 @@ bucket_definitions:
       await batch.commit('0/2');
     });
 
-    await bucketStorage.compact();
+    await bucketStorage.compact({
+      minBucketChanges: 1
+    });
 
     const lines2 = await getCheckpointLines(iter, { consume: true });
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index 991166c6..23edf025 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -217,6 +217,11 @@ export interface CompactOptions {
   /** Minimum of 1 */
   moveBatchQueryLimit?: number;
 
+  /**
+   * Only compact buckets with at least this many changes since they were last compacted. Minimum of 1, default of 10.
+   */
+  minBucketChanges?: number;
+
   /**
    * Internal/testing use: Cache size for compacting parameters.
    */
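For review purposes, the compaction loop introduced in `compactDirtyBuckets` can be read as the standalone sketch below. The `fetchBatch` and `compactBucket` callbacks are illustrative stand-ins for `dirtyBucketBatch` and `compactSingleBucket`; only the exclusion bookkeeping mirrors the diff.

```ts
// Sketch of the dirty-bucket compaction loop: buckets with at least
// `minBucketChanges` pending changes are compacted, and a capped list of
// recently compacted buckets is excluded from the next query so that a bucket
// modified mid-compaction is not immediately re-compacted.
interface DirtyBucket {
  bucket: string;
  estimatedCount: number;
}

// Hypothetical data source standing in for the `bucket_state` query in the diff.
type FetchDirtyBatch = (opts: { minBucketChanges: number; exclude: string[] }) => Promise<DirtyBucket[]>;

async function compactDirtyBuckets(
  fetchBatch: FetchDirtyBatch,
  compactBucket: (bucket: string) => Promise<void>,
  minBucketChanges = 10
): Promise<void> {
  const TRACK_RECENTLY_COMPACTED_NUMBER = 100;
  let recentlyCompacted: string[] = [];

  while (true) {
    const buckets = await fetchBatch({ minBucketChanges, exclude: recentlyCompacted });
    if (buckets.length == 0) {
      // Nothing dirty enough to compact.
      break;
    }
    for (let { bucket } of buckets) {
      await compactBucket(bucket);
      recentlyCompacted.push(bucket);
    }
    // Keep only the most recent entries so the exclusion filter stays small.
    if (recentlyCompacted.length > TRACK_RECENTLY_COMPACTED_NUMBER) {
      recentlyCompacted = recentlyCompacted.slice(-TRACK_RECENTLY_COMPACTED_NUMBER);
    }
  }
}
```

Capping the list at 100 entries keeps the `$nin` filter on the `bucket_state` query bounded while still preventing a just-compacted bucket from being re-selected in the very next batch.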
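Similarly, the checksum pre-calculation change limits each batch by estimated row count rather than by bucket count alone. A minimal sketch of that selection logic follows; the helper name and `BucketEstimate` type are illustrative, while the 50_000 cap is the constant used in the diff.

```ts
interface BucketEstimate {
  bucket: string;
  estimatedCount: number;
}

// Take buckets in order until the summed estimated row count exceeds the cap,
// so a single checksum update never covers an unbounded number of rows.
// The bucket that crosses the cap is still included, matching the diff.
function takeChecksumBatch(buckets: BucketEstimate[], maxTotalEstimate = 50_000): BucketEstimate[] {
  const batch: BucketEstimate[] = [];
  let totalCountEstimate = 0;
  for (const bucket of buckets) {
    batch.push(bucket);
    totalCountEstimate += bucket.estimatedCount;
    if (totalCountEstimate > maxTotalEstimate) {
      // Remaining buckets stay dirty and are picked up in a later batch.
      break;
    }
  }
  return batch;
}
```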