7 changes: 7 additions & 0 deletions .changeset/mean-zoos-retire.md
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
---

Avoid re-compacting recently compacted buckets.
7 changes: 7 additions & 0 deletions .changeset/shaggy-queens-cheat.md
@@ -0,0 +1,7 @@
---
'@powersync/service-module-mongodb-storage': patch
'@powersync/service-core-tests': patch
'@powersync/service-core': patch
---

Reduce batch size for checksum pre-calculations to reduce timeouts.
@@ -169,7 +169,7 @@ export class MongoChecksums {
// Limit the number of buckets we query for at a time.
const bucketBatchLimit = this.options?.bucketBatchLimit ?? DEFAULT_BUCKET_BATCH_LIMIT;

if (batch.length < bucketBatchLimit) {
if (batch.length <= bucketBatchLimit) {
// Single batch - no need for splitting the batch and merging results
return await this.computePartialChecksumsInternal(batch);
}
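The comparison fix above is a boundary condition: a batch of exactly `bucketBatchLimit` buckets previously fell through to the split-and-merge path even though it fits in a single query. A minimal sketch of the intended behavior (the helper below is illustrative, not the actual MongoChecksums implementation):

```ts
// Sketch: split a bucket list into query-sized batches.
// A batch of exactly `limit` buckets should stay as one query.
function splitIntoBatches<T>(batch: T[], limit: number): T[][] {
  if (batch.length <= limit) {
    // Single batch - no need for splitting and merging results.
    return [batch];
  }
  const chunks: T[][] = [];
  for (let i = 0; i < batch.length; i += limit) {
    chunks.push(batch.slice(i, i + limit));
  }
  return chunks;
}

// With the old `<` comparison, a 200-bucket batch at a limit of 200 took
// the split path unnecessarily; with `<=` it is handled as one query.
console.assert(splitIntoBatches(new Array(200).fill('b'), 200).length === 1);
console.assert(splitIntoBatches(new Array(201).fill('b'), 200).length === 2);
```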
@@ -61,6 +61,7 @@ export interface MongoCompactOptions extends storage.CompactOptions {}
const DEFAULT_CLEAR_BATCH_LIMIT = 5000;
const DEFAULT_MOVE_BATCH_LIMIT = 2000;
const DEFAULT_MOVE_BATCH_QUERY_LIMIT = 10_000;
const DEFAULT_MIN_BUCKET_CHANGES = 10;

/** This default is primarily for tests. */
const DEFAULT_MEMORY_LIMIT_MB = 64;
@@ -73,6 +74,7 @@ export class MongoCompactor {
private moveBatchLimit: number;
private moveBatchQueryLimit: number;
private clearBatchLimit: number;
private minBucketChanges: number;
private maxOpId: bigint;
private buckets: string[] | undefined;
private signal?: AbortSignal;
@@ -88,6 +90,7 @@
this.moveBatchLimit = options?.moveBatchLimit ?? DEFAULT_MOVE_BATCH_LIMIT;
this.moveBatchQueryLimit = options?.moveBatchQueryLimit ?? DEFAULT_MOVE_BATCH_QUERY_LIMIT;
this.clearBatchLimit = options?.clearBatchLimit ?? DEFAULT_CLEAR_BATCH_LIMIT;
this.minBucketChanges = options?.minBucketChanges ?? DEFAULT_MIN_BUCKET_CHANGES;
this.maxOpId = options?.maxOpId ?? 0n;
this.buckets = options?.compactBuckets;
this.signal = options?.signal;
@@ -113,14 +116,26 @@

private async compactDirtyBuckets() {
// Process all buckets with 10 or more changes since last time.
// We exclude the last 100 compacted buckets, to avoid repeatedly re-compacting the same buckets over and over
// if they are modified while compacting.
const TRACK_RECENTLY_COMPACTED_NUMBER = 100;

let recentlyCompacted: string[] = [];
while (!this.signal?.aborted) {
// Process all buckets with 1 or more changes since last time
const buckets = await this.dirtyBucketBatch({ minBucketChanges: 1 });
const buckets = await this.dirtyBucketBatch({
minBucketChanges: this.minBucketChanges,
exclude: recentlyCompacted
});
if (buckets.length == 0) {
// All done
break;
}
for (let bucket of buckets) {
for (let { bucket } of buckets) {
await this.compactSingleBucket(bucket);
recentlyCompacted.push(bucket);
}
if (recentlyCompacted.length > TRACK_RECENTLY_COMPACTED_NUMBER) {
recentlyCompacted = recentlyCompacted.slice(-TRACK_RECENTLY_COMPACTED_NUMBER);
}
}
}
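With the exclusion list held across loop iterations, the `$nin` filter stays bounded: after each batch, only the most recent `TRACK_RECENTLY_COMPACTED_NUMBER` bucket names are kept. A self-contained sketch of that capped-list behavior (the helper is illustrative; names follow the diff):

```ts
// Sketch: bounded tracking of recently-compacted buckets, as in the loop above.
const TRACK_RECENTLY_COMPACTED_NUMBER = 100;

function trackBatch(recentlyCompacted: string[], compacted: string[]): string[] {
  recentlyCompacted.push(...compacted);
  if (recentlyCompacted.length > TRACK_RECENTLY_COMPACTED_NUMBER) {
    // Keep only the most recently compacted bucket names for the $nin filter.
    return recentlyCompacted.slice(-TRACK_RECENTLY_COMPACTED_NUMBER);
  }
  return recentlyCompacted;
}

let recent: string[] = [];
for (let batch = 0; batch < 3; batch++) {
  const compacted = Array.from({ length: 60 }, (_, i) => `by_user["u${batch * 60 + i}"]`);
  recent = trackBatch(recent, compacted);
}
// 180 buckets compacted in total, but only the last 100 are excluded.
console.assert(recent.length === TRACK_RECENTLY_COMPACTED_NUMBER);
console.assert(recent[0] === 'by_user["u80"]');
```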
@@ -482,10 +497,20 @@ export class MongoCompactor {
break;
}
const start = Date.now();
logger.info(`Calculating checksums for batch of ${buckets.length} buckets, starting at ${buckets[0]}`);
logger.info(`Calculating checksums for batch of ${buckets.length} buckets`);

await this.updateChecksumsBatch(buckets);
logger.info(`Updated checksums for batch of ${buckets.length} buckets in ${Date.now() - start}ms`);
// Filter the batch by estimated bucket size, to reduce the possibility of timeouts.
let checkBuckets: typeof buckets = [];
let totalCountEstimate = 0;
for (let bucket of buckets) {
checkBuckets.push(bucket);
totalCountEstimate += bucket.estimatedCount;
if (totalCountEstimate > 50_000) {
break;
}
}
await this.updateChecksumsBatch(checkBuckets.map((b) => b.bucket));
logger.info(`Updated checksums for batch of ${checkBuckets.length} buckets in ${Date.now() - start}ms`);
count += buckets.length;
}
return { buckets: count };
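Instead of checksumming all (up to 200) returned buckets at once, the loop now cuts the batch off once the combined `estimatedCount` passes 50,000 operations, so a handful of large buckets can't push a single checksum query past the Mongo operation timeout. A minimal sketch of the greedy cut-off (the threshold and shape follow the diff; the standalone function is illustrative):

```ts
interface DirtyBucket {
  bucket: string;
  estimatedCount: number;
}

// Sketch: take buckets in order until the estimated operation count passes
// the cap. The bucket that crosses the cap is still included, so progress is
// made even when a single bucket exceeds the cap on its own.
function capByEstimatedSize(buckets: DirtyBucket[], cap = 50_000): DirtyBucket[] {
  const taken: DirtyBucket[] = [];
  let totalCountEstimate = 0;
  for (const bucket of buckets) {
    taken.push(bucket);
    totalCountEstimate += bucket.estimatedCount;
    if (totalCountEstimate > cap) {
      break;
    }
  }
  return taken;
}

const batch = capByEstimatedSize([
  { bucket: 'a', estimatedCount: 30_000 },
  { bucket: 'b', estimatedCount: 30_000 },
  { bucket: 'c', estimatedCount: 30_000 }
]);
// 'a' + 'b' cross the 50k cap, so 'c' waits for the next batch.
console.assert(batch.length === 2);
```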
@@ -497,7 +522,10 @@
* This cannot be used to iterate on its own - the client is expected to process these buckets and
* set estimate_since_compact.count: 0 when done, before fetching the next batch.
*/
private async dirtyBucketBatch(options: { minBucketChanges: number }): Promise<string[]> {
private async dirtyBucketBatch(options: {
minBucketChanges: number;
exclude?: string[];
}): Promise<{ bucket: string; estimatedCount: number }[]> {
if (options.minBucketChanges <= 0) {
throw new ReplicationAssertionError('minBucketChanges must be >= 1');
}
@@ -506,22 +534,28 @@
.find(
{
'_id.g': this.group_id,
'estimate_since_compact.count': { $gte: options.minBucketChanges }
'estimate_since_compact.count': { $gte: options.minBucketChanges },
'_id.b': { $nin: options.exclude ?? [] }
},
{
projection: {
_id: 1
_id: 1,
estimate_since_compact: 1,
compacted_state: 1
},
sort: {
'estimate_since_compact.count': -1
},
limit: 5_000,
limit: 200,
maxTimeMS: MONGO_OPERATION_TIMEOUT_MS
}
)
.toArray();

return dirtyBuckets.map((bucket) => bucket._id.b);
return dirtyBuckets.map((bucket) => ({
bucket: bucket._id.b,
estimatedCount: bucket.estimate_since_compact!.count + (bucket.compacted_state?.count ?? 0)
}));
}

private async updateChecksumsBatch(buckets: string[]) {
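The `estimatedCount` returned per bucket is the sum of changes accumulated since the last compact and the already-compacted operation count: a rough upper bound on the operations the checksum query must scan. Assuming `bucket_state` documents shaped the way the projection above suggests, the derivation is:

```ts
// Assumed shape of the projected bucket_state document (illustrative; field
// names follow the projection in dirtyBucketBatch above).
interface BucketStateDoc {
  _id: { g: number; b: string };
  estimate_since_compact?: { count: number };
  compacted_state?: { count: number };
}

// Rough total of operations a checksum query over this bucket must scan:
// changes since the last compact plus the compacted baseline.
function estimatedCount(doc: BucketStateDoc): number {
  return (doc.estimate_since_compact?.count ?? 0) + (doc.compacted_state?.count ?? 0);
}

console.assert(
  estimatedCount({
    _id: { g: 1, b: 'global[]' },
    estimate_since_compact: { count: 12 },
    compacted_state: { count: 4_000 }
  }) === 4_012
);
```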
@@ -62,6 +62,7 @@ bucket_definitions:
clearBatchLimit: 200,
moveBatchLimit: 10,
moveBatchQueryLimit: 10,
minBucketChanges: 1,
maxOpId: checkpoint,
signal: null as any
});
18 changes: 12 additions & 6 deletions packages/service-core-tests/src/tests/register-compacting-tests.ts
@@ -80,7 +80,8 @@ bucket_definitions:
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
moveBatchQueryLimit: 1,
minBucketChanges: 1
});

const batchAfter = await test_utils.oneFromAsync(
@@ -207,7 +208,8 @@ bucket_definitions:
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
moveBatchQueryLimit: 1,
minBucketChanges: 1
});

const batchAfter = await test_utils.oneFromAsync(
@@ -300,7 +302,8 @@ bucket_definitions:
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
moveBatchQueryLimit: 1,
minBucketChanges: 1
});

const batchAfter = await test_utils.oneFromAsync(
@@ -412,7 +415,8 @@ bucket_definitions:
await bucketStorage.compact({
clearBatchLimit: 100,
moveBatchLimit: 100,
moveBatchQueryLimit: 100 // Larger limit for a larger window of operations
moveBatchQueryLimit: 100, // Larger limit for a larger window of operations
minBucketChanges: 1
});

const batchAfter = await test_utils.fromAsync(
@@ -498,7 +502,8 @@ bucket_definitions:
await bucketStorage.compact({
clearBatchLimit: 2,
moveBatchLimit: 1,
moveBatchQueryLimit: 1
moveBatchQueryLimit: 1,
minBucketChanges: 1
});

const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
@@ -572,7 +577,8 @@ bucket_definitions:
await bucketStorage.compact({
clearBatchLimit: 20,
moveBatchLimit: 10,
moveBatchQueryLimit: 10
moveBatchQueryLimit: 10,
minBucketChanges: 1
});

const checkpoint2 = result2!.flushed_op;
4 changes: 3 additions & 1 deletion packages/service-core-tests/src/tests/register-sync-tests.ts
@@ -1141,7 +1141,9 @@ bucket_definitions:
await batch.commit('0/2');
});

await bucketStorage.compact();
await bucketStorage.compact({
minBucketChanges: 1
});

const lines2 = await getCheckpointLines(iter, { consume: true });

5 changes: 5 additions & 0 deletions packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -217,6 +217,11 @@ export interface CompactOptions {
/** Minimum of 1 */
moveBatchQueryLimit?: number;

/**
* Minimum number of changes in a bucket since its last compact before the
* bucket is considered dirty and compacted again. Minimum of 1, default of 10.
*/
minBucketChanges?: number;

/**
* Internal/testing use: Cache size for compacting parameters.
*/
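The tests above pass `minBucketChanges: 1` explicitly to keep the old compact-everything behavior; callers that omit the option get the new default of 10, skipping buckets with only a handful of changes. A hedged usage sketch (the `compact` call and option names are from this PR; `bucketStorage` and `checkpoint` stand in for the surrounding test context):

```ts
// Default: only buckets with >= 10 changes since their last compact are
// treated as dirty and re-compacted.
await bucketStorage.compact({ maxOpId: checkpoint });

// Opt back in to the previous behavior: compact any bucket with at least
// one change, as the updated tests do.
await bucketStorage.compact({
  maxOpId: checkpoint,
  minBucketChanges: 1
});
```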