From f6e94a106de50680a3c1178294169e1b605709e6 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Jun 2024 15:45:23 +0200 Subject: [PATCH 01/15] Incrementally compute checksums and include zero checksums. --- .../service-core/src/storage/BucketStorage.ts | 11 ++- .../storage/mongo/MongoSyncBucketStorage.ts | 17 +++- packages/service-core/src/sync/sync.ts | 42 +++++---- packages/service-core/src/util/utils.ts | 86 +++++++++++++++---- .../test/src/__snapshots__/sync.test.ts.snap | 16 +++- .../test/src/data_storage.test.ts | 6 +- 6 files changed, 138 insertions(+), 40 deletions(-) diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 091bdda9a..99754d4af 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -230,7 +230,16 @@ export interface SyncRulesBucketStorage { options?: BucketDataBatchOptions ): AsyncIterable; - getChecksums(checkpoint: util.OpId, buckets: string[]): Promise; + /** + * Compute checksums for a given list of buckets. + * + * If fromCheckpoint is specified, the result is a diff. Otherwise, it is the full checksum. + */ + getChecksums( + checkpoint: util.OpId, + fromCheckpoint: util.OpId | null, + buckets: string[] + ): Promise; /** * Terminate the sync rules. diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 012a22f5d..d07f10ac7 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -316,10 +316,21 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { } } - async getChecksums(checkpoint: util.OpId, buckets: string[]): Promise { + async getChecksums( + checkpoint: util.OpId, + fromCheckpoint: util.OpId | null, + buckets: string[] + ): Promise { if (buckets.length == 0) { return []; } + + if (fromCheckpoint == checkpoint) { + return []; + } + + const start = fromCheckpoint ? BigInt(fromCheckpoint) : new bson.MinKey(); + const filters: any[] = []; for (let name of buckets) { filters.push({ @@ -327,7 +338,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { $gt: { g: this.group_id, b: name, - o: new bson.MinKey() + o: start }, $lte: { g: this.group_id, @@ -358,7 +369,7 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { return { bucket: doc._id, count: doc.count, - checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 4294967295 + checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff }; }); } diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 618a21a79..032eda9ba 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -78,8 +78,8 @@ async function* streamResponseInner( // This starts with the state from the client. May contain buckets that the user do not have access to (anymore). 
let dataBuckets = new Map(); - let last_checksums: util.BucketChecksum[] | null = null; - let last_write_checkpoint: bigint | null = null; + let lastChecksums: { checkpoint: util.OpId; checksums: Map } | null = null; + let lastWriteCheckpoint: bigint | null = null; const { raw_data, binary_data } = params; @@ -113,23 +113,22 @@ async function* streamResponseInner( throw new Error(`Too many buckets: ${allBuckets.length}`); } - let checksums: util.BucketChecksum[] | undefined = undefined; - let dataBucketsNew = new Map(); for (let bucket of allBuckets) { dataBucketsNew.set(bucket, dataBuckets.get(bucket) ?? '0'); } dataBuckets = dataBucketsNew; - checksums = await storage.getChecksums(checkpoint, [...dataBuckets.keys()]); + const bucketList = [...dataBuckets.keys()]; + const checksumDiff = await storage.getChecksums(checkpoint, lastChecksums?.checkpoint ?? null, bucketList); - if (last_checksums) { - const diff = util.checksumsDiff(last_checksums, checksums); + if (lastChecksums) { + const diff = util.checksumsDiff(lastChecksums.checksums, bucketList, checksumDiff); if ( - last_write_checkpoint == writeCheckpoint && - diff.removed_buckets.length == 0 && - diff.updated_buckets.length == 0 + lastWriteCheckpoint == writeCheckpoint && + diff.removedBuckets.length == 0 && + diff.updatedBuckets.length == 0 ) { // No changes - don't send anything to the client continue; @@ -137,20 +136,32 @@ async function* streamResponseInner( let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} | `; - message += `updated: ${limitedBuckets(diff.updated_buckets, 20)} | `; - message += `removed: ${limitedBuckets(diff.removed_buckets, 20)} | `; + message += `updated: ${limitedBuckets(diff.updatedBuckets, 20)} | `; + message += `removed: ${limitedBuckets(diff.removedBuckets, 20)} | `; micro.logger.info(message); const checksum_line: util.StreamingSyncCheckpointDiff = { checkpoint_diff: { last_op_id: checkpoint, write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, - ...diff + removed_buckets: diff.removedBuckets, + updated_buckets: diff.updatedBuckets } }; yield checksum_line; + + lastChecksums = { + checkpoint, + checksums: diff.nextBuckets + }; } else { + const nextBuckets = util.fillEmptyChecksums(bucketList, checksumDiff); + lastChecksums = { + checkpoint, + checksums: nextBuckets + }; + let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; micro.logger.info(message); @@ -158,14 +169,13 @@ async function* streamResponseInner( checkpoint: { last_op_id: checkpoint, write_checkpoint: writeCheckpoint ? 
String(writeCheckpoint) : undefined, - buckets: checksums + buckets: [...nextBuckets.values()] } }; yield checksum_line; } - last_checksums = checksums; - last_write_checkpoint = writeCheckpoint; + lastWriteCheckpoint = writeCheckpoint; yield* bucketDataInBatches(storage, checkpoint, dataBuckets, raw_data, binary_data, signal); diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 4a7bfe462..d5533e8da 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -30,29 +30,85 @@ export function timestampToOpId(ts: bigint): OpId { return ts.toString(10); } -export function checksumsDiff(previous: BucketChecksum[], current: BucketChecksum[]) { - const updated_buckets: BucketChecksum[] = []; +export function fillEmptyChecksums(currentBuckets: string[], checksums: BucketChecksum[]) { + // All current values + const nextBuckets = new Map(); - const previousBuckets = new Map(); - for (let checksum of previous) { - previousBuckets.set(checksum.bucket, checksum); + for (let checksum of checksums) { + // Added checksum + nextBuckets.set(checksum.bucket, checksum); } - for (let checksum of current) { - if (!previousBuckets.has(checksum.bucket)) { - updated_buckets.push(checksum); + + for (let bucket of currentBuckets) { + if (!nextBuckets.has(bucket)) { + // Empty diff - empty bucket + const checksum: BucketChecksum = { + bucket, + checksum: 0, + count: 0 + }; + nextBuckets.set(bucket, checksum); + } + } + + return nextBuckets; +} + +export function checksumsDiff( + previousBuckets: Map, + currentBuckets: string[], + checksumDiff: BucketChecksum[] +) { + // All changed ones + const updatedBuckets = new Map(); + // All current values + const nextBuckets = new Map(); + + for (let cdiff of checksumDiff) { + const p = previousBuckets.get(cdiff.bucket); + if (p == null) { + // Added + updatedBuckets.set(cdiff.bucket, cdiff); + nextBuckets.set(cdiff.bucket, cdiff); } else { - const p = previousBuckets.get(checksum.bucket); - if (p?.checksum != checksum.checksum || p?.count != checksum.count) { - updated_buckets.push(checksum); + // Updated + const checksum: BucketChecksum = { + bucket: cdiff.bucket, + count: p.count + cdiff.count, + checksum: (p.checksum + cdiff.checksum) & 0xffffffff + }; + updatedBuckets.set(checksum.bucket, checksum); + nextBuckets.set(checksum.bucket, checksum); + previousBuckets.delete(cdiff.bucket); + } + } + + for (let bucket of currentBuckets) { + if (!updatedBuckets.has(bucket)) { + // Empty diff - either empty bucket, or unchanged + const p = previousBuckets.get(bucket); + if (p == null) { + // Emtpy bucket + const checksum: BucketChecksum = { + bucket, + checksum: 0, + count: 0 + }; + updatedBuckets.set(bucket, checksum); + nextBuckets.set(bucket, checksum); + } else { + // Unchanged bucket + nextBuckets.set(bucket, p); + previousBuckets.delete(bucket); } - previousBuckets.delete(checksum.bucket); } } - const removed_buckets: string[] = [...previousBuckets.keys()]; + const removedBuckets: string[] = [...previousBuckets.keys()]; return { - updated_buckets, - removed_buckets + updatedBuckets: [...updatedBuckets.values()], + removedBuckets, + nextBuckets }; } diff --git a/packages/service-core/test/src/__snapshots__/sync.test.ts.snap b/packages/service-core/test/src/__snapshots__/sync.test.ts.snap index 561608028..17648be37 100644 --- a/packages/service-core/test/src/__snapshots__/sync.test.ts.snap +++ b/packages/service-core/test/src/__snapshots__/sync.test.ts.snap @@ -12,7 +12,13 @@ 
exports[`sync - mongodb > expiring token 1`] = ` [ { "checkpoint": { - "buckets": [], + "buckets": [ + { + "bucket": "mybucket[]", + "checksum": 0, + "count": 0, + }, + ], "last_op_id": "0", "write_checkpoint": undefined, }, @@ -135,7 +141,13 @@ exports[`sync - mongodb > sync updates to global data 1`] = ` [ { "checkpoint": { - "buckets": [], + "buckets": [ + { + "bucket": "mybucket[]", + "checksum": 0, + "count": 0, + }, + ], "last_op_id": "0", "write_checkpoint": undefined, }, diff --git a/packages/service-core/test/src/data_storage.test.ts b/packages/service-core/test/src/data_storage.test.ts index c25d32cd9..dc9cf1d13 100644 --- a/packages/service-core/test/src/data_storage.test.ts +++ b/packages/service-core/test/src/data_storage.test.ts @@ -252,7 +252,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, ['global[]']); + const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); expect(checksums).toEqual([ { bucket: 'global[]', @@ -599,7 +599,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, ['global[]']); + const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); expect(checksums).toEqual([ { bucket: 'global[]', @@ -713,7 +713,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, ['global[]']); + const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); expect(checksums).toEqual([ { bucket: 'global[]', From 7812eab9047e2c56e129eec2c57c873520f99300 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 10 Jun 2024 15:47:23 +0200 Subject: [PATCH 02/15] Fix more tests. --- packages/service-core/test/src/large_batch.test.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/service-core/test/src/large_batch.test.ts b/packages/service-core/test/src/large_batch.test.ts index 806fe8934..87b67bf64 100644 --- a/packages/service-core/test/src/large_batch.test.ts +++ b/packages/service-core/test/src/large_batch.test.ts @@ -49,7 +49,7 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint = await context.getCheckpoint({ timeout: 100_000 }); const duration = Date.now() - start; const used = Math.round(process.memoryUsage().heapUsed / 1024 / 1024); - const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + const checksum = await context.storage!.getChecksums(checkpoint, null, ['global[]']); expect(checksum[0].count).toEqual(operation_count); const perSecond = Math.round((operation_count / duration) * 1000); console.log(`${operation_count} ops in ${duration}ms ${perSecond} ops/s. 
${used}MB heap`); @@ -100,7 +100,7 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint = await context.getCheckpoint({ timeout: 100_000 }); const duration = Date.now() - start; - const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + const checksum = await context.storage!.getChecksums(checkpoint, null, ['global[]']); expect(checksum[0].count).toEqual(operation_count); const perSecond = Math.round((operation_count / duration) * 1000); console.log(`${operation_count} ops in ${duration}ms ${perSecond} ops/s.`); @@ -156,7 +156,7 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint = await context.getCheckpoint({ timeout: 50_000 }); const duration = Date.now() - start; const used = Math.round(process.memoryUsage().heapUsed / 1024 / 1024); - const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + const checksum = await context.storage!.getChecksums(checkpoint, null, ['global[]']); expect(checksum[0].count).toEqual(operationCount); const perSecond = Math.round((operationCount / duration) * 1000); // This number depends on the test machine, so we keep the test significantly @@ -173,7 +173,7 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint2 = await context.getCheckpoint({ timeout: 20_000 }); const truncateDuration = Date.now() - truncateStart; - const checksum2 = await context.storage!.getChecksums(checkpoint2, ['global[]']); + const checksum2 = await context.storage!.getChecksums(checkpoint2, null, ['global[]']); const truncateCount = checksum2[0].count - checksum[0].count; expect(truncateCount).toEqual(numTransactions * perTransaction); const truncatePerSecond = Math.round((truncateCount / truncateDuration) * 1000); From c8425324d920f67b059a1caad4fe9714b8f0e75a Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 11 Jun 2024 22:13:37 +0200 Subject: [PATCH 03/15] WIP: checksum cache. 
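Rough idea behind the cache: bucket checksums are additive modulo 2^32, so once a
full checksum for a bucket is known at some checkpoint, the checksum at a later
checkpoint can be computed by fetching only the operations in between and adding
the partial result. A minimal sketch of that property, using the
addBucketChecksums helper added to util/utils.ts in this commit (the concrete
numbers are illustrative):

```ts
import { addBucketChecksums } from '@/util/utils.js';

// Full checksum for bucket "test" at checkpoint 123, already cached:
const cached = { bucket: 'test', checksum: 1104081737, count: 123 };
// Partial checksum covering only the operations in (123, 1234], fetched now:
const partial = { bucket: 'test', checksum: 1597020602, count: 1111 };

// Adding the two (mod 2^32) yields the full checksum at checkpoint 1234,
// so the first 123 operations never have to be aggregated again:
const full = addBucketChecksums(cached, partial);
// full == { bucket: 'test', checksum: -1593864957, count: 1234 }
```

The cache itself keeps one LRU of bucket checksums per checkpoint; on a miss it
scans smaller cached checkpoints for the same bucket and issues a partial
FetchPartialBucketChecksum request for the range (start, end] instead of a full one.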
--- .../service-core/src/storage/ChecksumCache.ts | 197 ++++++++++++++++++ packages/service-core/src/util/utils.ts | 22 +- 2 files changed, 214 insertions(+), 5 deletions(-) create mode 100644 packages/service-core/src/storage/ChecksumCache.ts diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts new file mode 100644 index 000000000..731a8edfe --- /dev/null +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -0,0 +1,197 @@ +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { addBucketChecksums } from '@/util/utils.js'; +import { LRUCache } from 'lru-cache/min'; + +interface CheckpointEntry { + refs: Set; + cache: LRUCache; +} + +interface ChecksumFetchContext { + fetch(bucket: string): Promise; +} + +export interface FetchPartialBucketChecksum { + bucket: string; + start?: OpId; + end: OpId; +} + +export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; + +export class ChecksumCache { + private nextRefId = 1; + private checkpoints = new Map(); + + constructor(private fetchChecksums: FetchChecksums) {} + + async lock(checkpoint: OpId) { + const ref = this.nextRefId++; + + const existing = this.checkpoints.get(checkpoint); + if (existing != null) { + existing.refs.add(ref); + } else { + const entry: CheckpointEntry = { + refs: new Set([ref]), + cache: new LRUCache({ + maxSize: 10_000, + fetchMethod: async (bucket, staleValue, options) => { + return options.context.fetch(bucket); + } + }) + }; + this.checkpoints.set(checkpoint, entry); + } + + return () => { + const entry = this.checkpoints.get(checkpoint); + if (entry == null) { + return; + } + entry.refs.delete(ref); + if (entry.refs.size == 0) { + this.checkpoints.delete(checkpoint); + } + }; + } + + async getChecksums(checkpoint: OpId, buckets: string[]) { + let toFetch = new Set(); + let fetchResults = new Map(); + let resolveFetch!: () => void; + let rejectFetch!: (err: any) => void; + let fetchPromise = new Promise((resolve, reject) => { + resolveFetch = resolve; + rejectFetch = reject; + }); + + let entry = this.checkpoints.get(checkpoint); + if (entry == null) { + // TODO: throw new Error(`No checkpoint cache for ${checkpoint}`); + // Temporary: auto-create cache + entry = { + refs: new Set([]), + cache: new LRUCache({ + maxSize: 10_000, + fetchMethod: async (bucket, staleValue, options) => { + return options.context.fetch(bucket); + } + }) + }; + this.checkpoints.set(checkpoint, entry); + } + + let finalResults: BucketChecksum[] = []; + + const context: ChecksumFetchContext = { + async fetch(bucket) { + if (!toFetch.has(bucket)) { + throw new Error(`Expected to fetch ${bucket}`); + } + await fetchPromise; + const checksum = fetchResults.get(bucket); + if (checksum == null) { + throw new Error(`Failed to fetch checksum for bucket ${bucket}`); + } + return checksum; + } + }; + + let promises: Promise[] = []; + + try { + for (let bucket of buckets) { + let status: LRUCache.Status = {}; + const p = entry.cache.fetch(bucket, { context: context, status: status }).then((checksums) => { + if (checksums == null) { + throw new Error(`Failed to get checksums for ${bucket}`); + } + finalResults.push(checksums); + }); + promises.push(p); + if (status.fetch == 'hit' || status.fetch == 'inflight') { + // No need to fetch now + } else { + toFetch.add(bucket); + } + } + + if (toFetch.size == 0) { + // Nothing to fetch, but resolve in case + resolveFetch(); + } else { + // Find smaller checkpoints, sorted in descending order + const 
checkpoints = [...this.checkpoints.keys()] + .filter((other) => BigInt(other) < BigInt(checkpoint)) + .sort((a, b) => { + if (a == b) { + return 0; + } else if (BigInt(a) < BigInt(b)) { + return 1; + } else { + return -1; + } + }); + + let bucketRequests: FetchPartialBucketChecksum[] = []; + let add = new Map(); + + for (let bucket of toFetch) { + let bucketRequest: FetchPartialBucketChecksum | null = null; + for (let cp of checkpoints) { + const entry = this.checkpoints.get(cp); + if (entry == null) { + throw new Error(`Cannot find cached checkpoint ${cp}`); + } + + const cached = entry.cache.get(bucket); + if (cached != null) { + bucketRequest = { + bucket, + start: cp, + end: checkpoint + }; + add.set(bucket, cached); + break; + } + } + + if (bucketRequest == null) { + bucketRequest = { + bucket, + end: checkpoint + }; + add.set(bucket, { + bucket, + checksum: 0, + count: 0 + }); + } + bucketRequests.push(bucketRequest); + } + + const results = await this.fetchChecksums(bucketRequests); + for (let bucket of toFetch) { + const result = results.get(bucket); + const toAdd = add.get(bucket); + if (toAdd == null) { + throw new Error(`toAdd null for ${bucket}`); + } + const added = addBucketChecksums(toAdd, result ?? null); + fetchResults.set(bucket, added); + } + resolveFetch(); + } + } catch (e) { + rejectFetch(e); + throw e; + } + + await Promise.all(promises); + if (finalResults.length != buckets.length) { + throw new Error(`Bucket results mismatch: ${finalResults.length} != ${buckets.length}`); + } + return finalResults; + } +} diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index d5533e8da..4b881d181 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -72,11 +72,7 @@ export function checksumsDiff( nextBuckets.set(cdiff.bucket, cdiff); } else { // Updated - const checksum: BucketChecksum = { - bucket: cdiff.bucket, - count: p.count + cdiff.count, - checksum: (p.checksum + cdiff.checksum) & 0xffffffff - }; + const checksum: BucketChecksum = addBucketChecksums(p, cdiff); updatedBuckets.set(checksum.bucket, checksum); nextBuckets.set(checksum.bucket, checksum); previousBuckets.delete(cdiff.bucket); @@ -112,6 +108,22 @@ export function checksumsDiff( }; } +export function addChecksums(a: number, b: number) { + return (a + b) & 0xffffffff; +} + +export function addBucketChecksums(a: BucketChecksum, b: BucketChecksum | null): BucketChecksum { + if (b == null) { + return a; + } else { + return { + bucket: a.bucket, + count: a.count + b.count, + checksum: addChecksums(a.checksum, b.checksum) + }; + } +} + export async function getClientCheckpoint( db: pgwire.PgClient, bucketStorage: storage.BucketStorageFactory, From 5af5926ce0a1515edb25e4e3a34c6207477b2693 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Wed, 12 Jun 2024 14:00:51 +0200 Subject: [PATCH 04/15] Add a new checksum cache implementation that doesn't require locking checkpoints. 
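Instead of one cache per pinned checkpoint, this implementation uses a single
flat LRU keyed by `${checkpoint}/${bucket}`, plus an OrderedSet of cached
checkpoints per bucket, so a lookup can find the nearest smaller cached entry
and extend it incrementally. A simplified sketch of that lookup, using the same
@js-sdsl/ordered-set API as the implementation:

```ts
import { OrderedSet } from '@js-sdsl/ordered-set';

// One ordered set of cached checkpoints per bucket:
const cachedCheckpoints = new OrderedSet<bigint>([100n, 123n]);

// For a request at checkpoint 1234, find the largest cached checkpoint
// strictly below it:
const iter = cachedCheckpoints.reverseUpperBound(1234n);
if (iter.isAccessible()) {
  // Found 123n: fetch only the range (123, 1234] and add it to the cached
  // checksum, instead of aggregating the whole bucket from scratch.
  console.log(iter.pointer); // 123n
}
```

Eviction is handled by the LRU itself: the disposeAfter callback removes the
evicted checkpoint from the bucket's OrderedSet, so checkpoints no longer need
to be explicitly locked and released.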
--- packages/service-core/package.json | 7 +- .../service-core/src/storage/ChecksumCache.ts | 24 +- .../src/storage/ChecksumCacheTwo.ts | 205 ++++++++++++++ .../test/src/checksum_cache.test.ts | 256 ++++++++++++++++++ pnpm-lock.yaml | 8 + service/package.json | 12 +- 6 files changed, 497 insertions(+), 15 deletions(-) create mode 100644 packages/service-core/src/storage/ChecksumCacheTwo.ts create mode 100644 packages/service-core/test/src/checksum_cache.test.ts diff --git a/packages/service-core/package.json b/packages/service-core/package.json index 65fce0e2e..a41d0bd6f 100644 --- a/packages/service-core/package.json +++ b/packages/service-core/package.json @@ -18,16 +18,17 @@ "dependencies": { "@journeyapps-platform/micro": "^17.0.1", "@journeyapps-platform/micro-migrate": "^4.0.1", + "@js-sdsl/ordered-set": "^4.4.2", "@opentelemetry/api": "~1.8.0", - "@opentelemetry/resources": "^1.24.1", - "@opentelemetry/exporter-prometheus": "^0.51.1", "@opentelemetry/exporter-metrics-otlp-http": "^0.51.1", + "@opentelemetry/exporter-prometheus": "^0.51.1", + "@opentelemetry/resources": "^1.24.1", "@opentelemetry/sdk-metrics": "1.24.1", "@powersync/service-jpgwire": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-rsocket-router": "workspace:*", - "@powersync/service-types": "workspace:*", "@powersync/service-sync-rules": "workspace:*", + "@powersync/service-types": "workspace:*", "async-mutex": "^0.5.0", "bson": "^6.6.0", "commander": "^12.0.0", diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 731a8edfe..36ec9185f 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -19,11 +19,23 @@ export interface FetchPartialBucketChecksum { export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; -export class ChecksumCache { +export interface ChecksumCacheOptions { + fetchChecksums: FetchChecksums; + maxSize?: number; +} + +export interface ChecksumCacheInterface { + getChecksums(checkpoint: OpId, buckets: string[]): Promise; +} + +export class ChecksumCache implements ChecksumCacheInterface { private nextRefId = 1; private checkpoints = new Map(); + private fetchChecksums: FetchChecksums; - constructor(private fetchChecksums: FetchChecksums) {} + constructor(options: ChecksumCacheOptions) { + this.fetchChecksums = options.fetchChecksums; + } async lock(checkpoint: OpId) { const ref = this.nextRefId++; @@ -35,7 +47,7 @@ export class ChecksumCache { const entry: CheckpointEntry = { refs: new Set([ref]), cache: new LRUCache({ - maxSize: 10_000, + max: 10_000, fetchMethod: async (bucket, staleValue, options) => { return options.context.fetch(bucket); } @@ -73,7 +85,7 @@ export class ChecksumCache { entry = { refs: new Set([]), cache: new LRUCache({ - maxSize: 10_000, + max: 10_000, fetchMethod: async (bucket, staleValue, options) => { return options.context.fetch(bucket); } @@ -86,10 +98,10 @@ export class ChecksumCache { const context: ChecksumFetchContext = { async fetch(bucket) { + await fetchPromise; if (!toFetch.has(bucket)) { throw new Error(`Expected to fetch ${bucket}`); } - await fetchPromise; const checksum = fetchResults.get(bucket); if (checksum == null) { throw new Error(`Failed to fetch checksum for bucket ${bucket}`); @@ -145,7 +157,7 @@ export class ChecksumCache { throw new Error(`Cannot find cached checkpoint ${cp}`); } - const cached = entry.cache.get(bucket); + const cached = 
entry.cache.peek(bucket); if (cached != null) { bucketRequest = { bucket, diff --git a/packages/service-core/src/storage/ChecksumCacheTwo.ts b/packages/service-core/src/storage/ChecksumCacheTwo.ts new file mode 100644 index 000000000..c4277da72 --- /dev/null +++ b/packages/service-core/src/storage/ChecksumCacheTwo.ts @@ -0,0 +1,205 @@ +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { addBucketChecksums } from '@/util/utils.js'; +import { LRUCache } from 'lru-cache/min'; +import { OrderedSet } from '@js-sdsl/ordered-set'; +import { ChecksumCacheInterface } from './ChecksumCache.js'; + +interface ChecksumFetchContext { + fetch(bucket: string): Promise; + checkpoint: bigint; +} + +export interface FetchPartialBucketChecksum { + bucket: string; + start?: OpId; + end: OpId; +} + +export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; + +export interface ChecksumCacheOptions { + fetchChecksums: FetchChecksums; + maxSize?: number; +} + +// Approximately 5MB of memory, if we assume 50 bytes per entry +const DEFAULT_MAX_SIZE = 100_000; + +/** + * Implement a LRU cache for checksum requests. Each (bucket, checkpoint) request is cached separately, + * while the lookups occur in batches. + * + * For each bucket, we keep a separate OrderedSet of cached checkpoints. + * This allows us to do incrementally update checksums by using the last cached checksum for the same bucket. + * + * We use the LRUCache fetchMethod to deduplicate in-progress requests. + */ +export class ChecksumCache implements ChecksumCacheInterface { + /** + * The primary checksum cache, with key of `${checkpoint}/${bucket}`. + */ + private cache: LRUCache; + + private bucketCheckpoints = new Map>(); + private fetchChecksums: FetchChecksums; + + constructor(options: ChecksumCacheOptions) { + this.fetchChecksums = options.fetchChecksums; + + this.cache = new LRUCache({ + max: options.maxSize ?? 
DEFAULT_MAX_SIZE, + fetchMethod: async (cacheKey, _staleValue, options) => { + const split = cacheKey.indexOf('/'); + const bucket = cacheKey.substring(split + 1); + + const result = await options.context.fetch(bucket); + + let checkpointSet = this.bucketCheckpoints.get(bucket); + if (checkpointSet == null) { + checkpointSet = new OrderedSet(); + this.bucketCheckpoints.set(bucket, checkpointSet); + } + checkpointSet.insert(options.context.checkpoint); + return result; + }, + + disposeAfter: (value, key) => { + const split = key.indexOf('/'); + const checkpointString = key.substring(0, split); + const checkpoint = BigInt(checkpointString); + const checkpointSet = this.bucketCheckpoints.get(value.bucket); + if (checkpointSet == null) { + return; + } + checkpointSet.eraseElementByKey(checkpoint); + if (checkpointSet.length == 0) { + this.bucketCheckpoints.delete(value.bucket); + } + }, + + noDisposeOnSet: true + }); + } + + async getChecksums(checkpoint: OpId, buckets: string[]) { + let toFetch = new Set(); + let fetchResults = new Map(); + let resolveFetch!: () => void; + let rejectFetch!: (err: any) => void; + let fetchPromise = new Promise((resolve, reject) => { + resolveFetch = resolve; + rejectFetch = reject; + }); + + let finalResults: BucketChecksum[] = []; + + const context: ChecksumFetchContext = { + async fetch(bucket) { + await fetchPromise; + if (!toFetch.has(bucket)) { + // Should never happen + throw new Error(`Expected to fetch ${bucket}`); + } + const checksum = fetchResults.get(bucket); + if (checksum == null) { + // Should never happen + throw new Error(`Failed to fetch checksum for bucket ${bucket}`); + } + return checksum; + }, + checkpoint: BigInt(checkpoint) + }; + + let promises: Promise[] = []; + + try { + for (let bucket of buckets) { + const cacheKey = `${checkpoint}/${bucket}`; + let status: LRUCache.Status = {}; + const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => { + if (checksums == null) { + // Should never happen + throw new Error(`Failed to get checksums for ${cacheKey}`); + } + finalResults.push(checksums); + }); + promises.push(p); + if (status.fetch == 'hit' || status.fetch == 'inflight') { + // No need to fetch now + } else { + toFetch.add(bucket); + } + } + + if (toFetch.size == 0) { + // Nothing to fetch, but resolve in case + resolveFetch(); + } else { + // Find smaller checkpoints, sorted in descending order + + let bucketRequests: FetchPartialBucketChecksum[] = []; + let add = new Map(); + + for (let bucket of toFetch) { + let bucketRequest: FetchPartialBucketChecksum | null = null; + const checkpointSet = this.bucketCheckpoints.get(bucket); + if (checkpointSet != null) { + const iter = checkpointSet.reverseUpperBound(context.checkpoint - 1n); + while (iter.isAccessible()) { + const cp = iter.pointer; + const cacheKey = `${cp}/${bucket}`; + // peek to avoid refreshing the key + const cached = this.cache.peek(cacheKey); + if (cached != null) { + bucketRequest = { + bucket, + start: cp.toString(), + end: checkpoint + }; + add.set(bucket, cached); + break; + } + + iter.next(); + } + } + + if (bucketRequest == null) { + bucketRequest = { + bucket, + end: checkpoint + }; + add.set(bucket, { + bucket, + checksum: 0, + count: 0 + }); + } + bucketRequests.push(bucketRequest); + } + + const results = await this.fetchChecksums(bucketRequests); + for (let bucket of toFetch) { + const result = results.get(bucket); + const toAdd = add.get(bucket); + if (toAdd == null) { + // Should never happen + throw new Error(`toAdd 
null for ${bucket}`); + } + const added = addBucketChecksums(toAdd, result ?? null); + fetchResults.set(bucket, added); + } + resolveFetch(); + } + } catch (e) { + rejectFetch(e); + throw e; + } + + await Promise.all(promises); + if (finalResults.length != buckets.length) { + throw new Error(`Bucket results mismatch: ${finalResults.length} != ${buckets.length}`); + } + return finalResults; + } +} diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts new file mode 100644 index 000000000..b393b9385 --- /dev/null +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -0,0 +1,256 @@ +import { describe, expect, it } from 'vitest'; +import { + ChecksumCache, + ChecksumCacheInterface, + FetchChecksums, + FetchPartialBucketChecksum +} from '../../src/storage/ChecksumCache.js'; +import { ChecksumCache as ChecksumCacheTwo } from '../../src/storage/ChecksumCacheTwo.js'; +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import * as crypto from 'node:crypto'; +import { addBucketChecksums } from '@/util/util-index.js'; + +type CachsumCacheFactory = (fetch: FetchChecksums) => ChecksumCacheInterface; + +describe('checksum cache 1', function () { + defineChecksumCacheTests((f) => new ChecksumCache({ fetchChecksums: f })); +}); + +describe('checksum cache 2', function () { + defineChecksumCacheTests((f) => new ChecksumCacheTwo({ fetchChecksums: f })); +}); + +/** + * Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes. + */ +function testHash(bucket: string, checkpoint: OpId) { + const key = `${checkpoint}/${bucket}`; + const hash = crypto.createHash('sha256').update(key).digest().readInt32LE(0); + return hash; +} + +function testPartialHash(request: FetchPartialBucketChecksum): BucketChecksum { + if (request.start) { + const a = testHash(request.bucket, request.start); + const b = testHash(request.bucket, request.end); + return addBucketChecksums( + { + bucket: request.bucket, + checksum: b, + count: Number(request.end) + }, + { + // Subtract a + bucket: request.bucket, + checksum: -a, + count: -Number(request.start) + } + ); + } else { + return { + bucket: request.bucket, + checksum: testHash(request.bucket, request.end), + count: Number(request.end) + }; + } +} + +const TEST_123 = { + bucket: 'test', + count: 123, + checksum: 1104081737 +}; + +const TEST_1234 = { + bucket: 'test', + count: 1234, + checksum: -1593864957 +}; + +const TEST2_123 = { + bucket: 'test2', + count: 123, + checksum: 1741377449 +}; + +const TEST3_123 = { + bucket: 'test3', + count: 123, + checksum: -2085080402 +}; + +function fetchTestChecksums(batch: FetchPartialBucketChecksum[]) { + return new Map( + batch.map((v) => { + return [v.bucket, testPartialHash(v)]; + }) + ); +} + +function defineChecksumCacheTests(factory: CachsumCacheFactory) { + it('should handle a sequential lookups (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + + expect(await cache.getChecksums('123', ['test2'])).toEqual([TEST2_123]); + + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + // This should use the previous lookup + [{ bucket: 'test', start: '123', end: '1234' }], + [{ bucket: 'test2', end: '123' }] + ]); + }); + 
+ it('should handle a sequential lookups (b)', async function () { + // Reverse order of the above + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test2'])).toEqual([TEST2_123]); + + expect(await cache.getChecksums('1234', ['test'])).toEqual([TEST_1234]); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + expect(lookups).toEqual([ + // With this order, there is no option for a partial lookup + [{ bucket: 'test2', end: '123' }], + [{ bucket: 'test', end: '1234' }], + [{ bucket: 'test', end: '123' }] + ]); + }); + + it('should handle a concurrent lookups (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + const p1 = cache.getChecksums('123', ['test']); + const p2 = cache.getChecksums('1234', ['test']); + const p3 = cache.getChecksums('123', ['test2']); + + expect(await p1).toEqual([TEST_123]); + expect(await p2).toEqual([TEST_1234]); + expect(await p3).toEqual([TEST2_123]); + + // Concurrent requests, so we can't do a partial lookup for 123 -> 1234 + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', end: '1234' }], + [{ bucket: 'test2', end: '123' }] + ]); + }); + + it('should handle a concurrent lookups (b)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + const p1 = cache.getChecksums('123', ['test']); + const p2 = cache.getChecksums('123', ['test']); + + expect(await p1).toEqual([TEST_123]); + + expect(await p2).toEqual([TEST_123]); + + // The lookup should be deduplicated, even though it's in progress + expect(lookups).toEqual([[{ bucket: 'test', end: '123' }]]); + }); + + it('should handle serial + concurrent lookups', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + const p2 = cache.getChecksums('1234', ['test']); + const p3 = cache.getChecksums('1234', ['test']); + + expect(await p2).toEqual([TEST_1234]); + expect(await p3).toEqual([TEST_1234]); + + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + // This lookup is deduplicated + [{ bucket: 'test', start: '123', end: '1234' }] + ]); + }); + + it('should handle multiple buckets', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + + expect(lookups).toEqual([ + [ + // Both lookups in the same request + { bucket: 'test', end: '123' }, + { bucket: 'test2', end: '123' } + ] + ]); + }); + + it('should handle multiple buckets with partial caching (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([TEST_123, TEST2_123]); + + 
expect(lookups).toEqual([ + // Request 1 + [{ bucket: 'test', end: '123' }], + // Request 2 + [{ bucket: 'test2', end: '123' }] + ]); + }); + + it('should handle multiple buckets with partial caching (b)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + const a = cache.getChecksums('123', ['test', 'test2']); + const b = cache.getChecksums('123', ['test2', 'test3']); + + expect(await a).toEqual([TEST_123, TEST2_123]); + expect(await b).toEqual([TEST2_123, TEST3_123]); + + expect(lookups).toEqual([ + // Request a + [ + { bucket: 'test', end: '123' }, + { bucket: 'test2', end: '123' } + ], + // Request b (re-uses the checksum for test2 from request a) + [{ bucket: 'test3', end: '123' }] + ]); + }); +} diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 985490cd5..fd614e254 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -132,6 +132,9 @@ importers: '@journeyapps-platform/micro-migrate': specifier: ^4.0.1 version: 4.0.1(socks@2.8.3) + '@js-sdsl/ordered-set': + specifier: ^4.4.2 + version: 4.4.2 '@opentelemetry/api': specifier: ~1.8.0 version: 1.8.0 @@ -785,6 +788,9 @@ packages: '@js-sdsl/ordered-map@4.4.2': resolution: {integrity: sha512-iUKgm52T8HOE/makSxjqoWhe95ZJA1/G1sYsGev2JDKUSS14KAgg1LHb+Ba+IPow0xflbnSkOsZcO08C7w1gYw==} + '@js-sdsl/ordered-set@4.4.2': + resolution: {integrity: sha512-ieYQ8WlBPKYzEo81H3q0DFbd8WtFRXXABb4+vRCF0AO3WWtJZFxYvRGdipUXGrd6tlSySmqhcPuO3J6SCodCxg==} + '@jsdevtools/ono@7.1.3': resolution: {integrity: sha512-4JQNk+3mVzK3xh2rqd6RB4J46qUR19azEHBneZyTZM+c456qOrbbM/5xcR8huNCCcbVt7+UmizG6GuUvPvKUYg==} @@ -5449,6 +5455,8 @@ snapshots: '@js-sdsl/ordered-map@4.4.2': {} + '@js-sdsl/ordered-set@4.4.2': {} + '@jsdevtools/ono@7.1.3': {} '@ljharb/through@2.3.13': diff --git a/service/package.json b/service/package.json index f692cc309..c0804f100 100644 --- a/service/package.json +++ b/service/package.json @@ -13,15 +13,15 @@ "@fastify/cors": "8.4.1", "@journeyapps-platform/micro": "^17.0.1", "@journeyapps-platform/micro-migrate": "^4.0.1", - "@powersync/service-types": "workspace:*", - "@powersync/service-jpgwire": "workspace:*", - "@powersync/service-jsonbig": "workspace:*", - "@powersync/service-sync-rules": "workspace:*", - "@powersync/service-rsocket-router": "workspace:*", - "@powersync/service-core": "workspace:*", "@opentelemetry/api": "~1.6.0", "@opentelemetry/exporter-prometheus": "^0.43.0", "@opentelemetry/sdk-metrics": "^1.17.0", + "@powersync/service-core": "workspace:*", + "@powersync/service-jpgwire": "workspace:*", + "@powersync/service-jsonbig": "workspace:*", + "@powersync/service-rsocket-router": "workspace:*", + "@powersync/service-sync-rules": "workspace:*", + "@powersync/service-types": "workspace:*", "async-mutex": "^0.5.0", "bson": "^6.6.0", "commander": "^12.0.0", From 8cb01f4865e5f427c5dfc91ff63a7d374e874ef4 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 09:34:34 +0200 Subject: [PATCH 05/15] Cache fixes. 
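In summary: use dispose instead of disposeAfter so evicted entries are pruned
from bucketCheckpoints immediately, factor cache-key handling into
makeCacheKey/parseCacheKey, and fix the descending scan over cached
checkpoints. reverseUpperBound already returns the last element strictly below
the key, so the previous `checkpoint - 1n` was off by one, and the backwards
walk needs pre() with a begin() guard rather than next(). A standalone sketch
of the corrected traversal pattern:

```ts
import { OrderedSet } from '@js-sdsl/ordered-set';

const checkpoints = new OrderedSet<bigint>([100n, 123n, 200n]);
let iter = checkpoints.reverseUpperBound(150n); // last element strictly below 150n
const begin = checkpoints.begin();
while (iter.isAccessible()) {
  console.log(iter.pointer); // 123n, then 100n
  if (iter.equals(begin)) {
    break; // don't step before the first element
  }
  iter = iter.pre(); // move to the previous (smaller) checkpoint
}
```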
--- .../src/storage/ChecksumCacheTwo.ts | 34 ++++--- .../test/src/checksum_cache.test.ts | 89 ++++++++++++++++++- 2 files changed, 111 insertions(+), 12 deletions(-) diff --git a/packages/service-core/src/storage/ChecksumCacheTwo.ts b/packages/service-core/src/storage/ChecksumCacheTwo.ts index c4277da72..3fcef2be6 100644 --- a/packages/service-core/src/storage/ChecksumCacheTwo.ts +++ b/packages/service-core/src/storage/ChecksumCacheTwo.ts @@ -49,9 +49,7 @@ export class ChecksumCache implements ChecksumCacheInterface { this.cache = new LRUCache({ max: options.maxSize ?? DEFAULT_MAX_SIZE, fetchMethod: async (cacheKey, _staleValue, options) => { - const split = cacheKey.indexOf('/'); - const bucket = cacheKey.substring(split + 1); - + const { bucket } = parseCacheKey(cacheKey); const result = await options.context.fetch(bucket); let checkpointSet = this.bucketCheckpoints.get(bucket); @@ -63,9 +61,8 @@ export class ChecksumCache implements ChecksumCacheInterface { return result; }, - disposeAfter: (value, key) => { - const split = key.indexOf('/'); - const checkpointString = key.substring(0, split); + dispose: (value, key) => { + const { checkpointString } = parseCacheKey(key); const checkpoint = BigInt(checkpointString); const checkpointSet = this.bucketCheckpoints.get(value.bucket); if (checkpointSet == null) { @@ -114,7 +111,7 @@ export class ChecksumCache implements ChecksumCacheInterface { try { for (let bucket of buckets) { - const cacheKey = `${checkpoint}/${bucket}`; + const cacheKey = makeCacheKey(checkpoint, bucket); let status: LRUCache.Status = {}; const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => { if (checksums == null) { @@ -144,12 +141,17 @@ export class ChecksumCache implements ChecksumCacheInterface { let bucketRequest: FetchPartialBucketChecksum | null = null; const checkpointSet = this.bucketCheckpoints.get(bucket); if (checkpointSet != null) { - const iter = checkpointSet.reverseUpperBound(context.checkpoint - 1n); + let iter = checkpointSet.reverseUpperBound(context.checkpoint); + const begin = checkpointSet.begin(); while (iter.isAccessible()) { const cp = iter.pointer; - const cacheKey = `${cp}/${bucket}`; + const cacheKey = makeCacheKey(cp, bucket); // peek to avoid refreshing the key const cached = this.cache.peek(cacheKey); + // As long as dispose() works correctly, the checkpointset should + // match up with the cache, and `cached` should also have a value here. + // However, we handle caces where it's not present either way. + // Test by disabling the `dispose()` callback. 
if (cached != null) { bucketRequest = { bucket, @@ -160,7 +162,10 @@ export class ChecksumCache implements ChecksumCacheInterface { break; } - iter.next(); + if (iter.equals(begin)) { + break; + } + iter = iter.pre(); } } @@ -203,3 +208,12 @@ export class ChecksumCache implements ChecksumCacheInterface { return finalResults; } } + +function makeCacheKey(checkpoint: bigint | string, bucket: string) { + return `${checkpoint}/${bucket}`; +} + +function parseCacheKey(key: string) { + const index = key.indexOf('/'); + return { checkpointString: key.substring(0, index), bucket: key.substring(index + 1) }; +} diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index b393b9385..19fc95960 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -244,13 +244,98 @@ function defineChecksumCacheTests(factory: CachsumCacheFactory) { expect(await b).toEqual([TEST2_123, TEST3_123]); expect(lookups).toEqual([ - // Request a + // Request A [ { bucket: 'test', end: '123' }, { bucket: 'test2', end: '123' } ], - // Request b (re-uses the checksum for test2 from request a) + // Request B (re-uses the checksum for test2 from request a) [{ bucket: 'test3', end: '123' }] ]); }); + + it('should handle out-of-order requests', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + expect(await cache.getChecksums('125', ['test'])).toEqual([ + { + bucket: 'test', + checksum: -1865121912, + count: 125 + } + ]); + + expect(await cache.getChecksums('124', ['test'])).toEqual([ + { + bucket: 'test', + checksum: 1887460431, + count: 124 + } + ]); + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', start: '123', end: '125' }], + [{ bucket: 'test', start: '123', end: '124' }] + ]); + }); } + +describe('cache limit tests', function () { + it('should use maxSize', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = new ChecksumCacheTwo({ + fetchChecksums: async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }, + maxSize: 2 + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + expect(await cache.getChecksums('124', ['test'])).toEqual([ + { + bucket: 'test', + checksum: 1887460431, + count: 124 + } + ]); + + expect(await cache.getChecksums('125', ['test'])).toEqual([ + { + bucket: 'test', + checksum: -1865121912, + count: 125 + } + ]); + expect(await cache.getChecksums('126', ['test'])).toEqual([ + { + bucket: 'test', + checksum: -1720007310, + count: 126 + } + ]); + expect(await cache.getChecksums('124', ['test'])).toEqual([ + { + bucket: 'test', + checksum: 1887460431, + count: 124 + } + ]); + expect(await cache.getChecksums('123', ['test'])).toEqual([TEST_123]); + + expect(lookups).toEqual([ + [{ bucket: 'test', end: '123' }], + [{ bucket: 'test', start: '123', end: '124' }], + [{ bucket: 'test', start: '124', end: '125' }], + [{ bucket: 'test', start: '125', end: '126' }], + [{ bucket: 'test', end: '124' }], + [{ bucket: 'test', end: '123' }] + ]); + }); +}); From c19390379d4355233ad3e83b0ed00c58db9ce2b5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 09:50:12 +0200 Subject: [PATCH 06/15] Test and fix error handling. 
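Two problems: checksums could be returned in a different order than requested
(fixed by collecting results into a Map keyed by bucket and mapping back over
the request list), and a failed batch fetch left the per-bucket cache promises
rejecting with nobody awaiting them, which Node reports as unhandled
rejections. A minimal standalone sketch of the error-propagation pattern
(assumed shape, not the real cache API):

```ts
// Many per-key promises await one shared gate; if the gate rejects, every
// branch must still be awaited, or the rejections are reported as "uncaught".
async function gatedFetch(keys: string[], fetchBatch: (keys: string[]) => Promise<Map<string, number>>) {
  let resolveGate!: () => void;
  let rejectGate!: (e: unknown) => void;
  const gate = new Promise<void>((resolve, reject) => {
    resolveGate = resolve;
    rejectGate = reject;
  });
  let results = new Map<string, number>();
  const perKey = keys.map(async (key) => {
    await gate;
    return results.get(key)!;
  });
  try {
    results = await fetchBatch(keys);
    resolveGate();
  } catch (e) {
    rejectGate(e);
    // Wait for the rejection to reach every per-key promise before rethrowing.
    await Promise.all(perKey).catch(() => {});
    throw e;
  }
  return Promise.all(perKey);
}
```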
--- .../service-core/src/storage/ChecksumCache.ts | 15 ++++--- .../src/storage/ChecksumCacheTwo.ts | 16 +++++--- .../test/src/checksum_cache.test.ts | 40 +++++++++++++++++++ 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 36ec9185f..56f14bce3 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -94,7 +94,7 @@ export class ChecksumCache implements ChecksumCacheInterface { this.checkpoints.set(checkpoint, entry); } - let finalResults: BucketChecksum[] = []; + let finalResults = new Map(); const context: ChecksumFetchContext = { async fetch(bucket) { @@ -119,7 +119,7 @@ export class ChecksumCache implements ChecksumCacheInterface { if (checksums == null) { throw new Error(`Failed to get checksums for ${bucket}`); } - finalResults.push(checksums); + finalResults.set(bucket, checksums); }); promises.push(p); if (status.fetch == 'hit' || status.fetch == 'inflight') { @@ -197,13 +197,18 @@ export class ChecksumCache implements ChecksumCacheInterface { } } catch (e) { rejectFetch(e); + + // Wait for the above rejection to propagate, otherwise we end up with "uncaught" errors. + await Promise.all(promises).catch((_e) => {}); + throw e; } await Promise.all(promises); - if (finalResults.length != buckets.length) { - throw new Error(`Bucket results mismatch: ${finalResults.length} != ${buckets.length}`); + if (finalResults.size != buckets.length) { + throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); } - return finalResults; + // Return results in the same order as the request + return buckets.map((bucket) => finalResults.get(bucket)!); } } diff --git a/packages/service-core/src/storage/ChecksumCacheTwo.ts b/packages/service-core/src/storage/ChecksumCacheTwo.ts index 3fcef2be6..211a0232a 100644 --- a/packages/service-core/src/storage/ChecksumCacheTwo.ts +++ b/packages/service-core/src/storage/ChecksumCacheTwo.ts @@ -78,7 +78,7 @@ export class ChecksumCache implements ChecksumCacheInterface { }); } - async getChecksums(checkpoint: OpId, buckets: string[]) { + async getChecksums(checkpoint: OpId, buckets: string[]): Promise { let toFetch = new Set(); let fetchResults = new Map(); let resolveFetch!: () => void; @@ -88,7 +88,7 @@ export class ChecksumCache implements ChecksumCacheInterface { rejectFetch = reject; }); - let finalResults: BucketChecksum[] = []; + let finalResults = new Map(); const context: ChecksumFetchContext = { async fetch(bucket) { @@ -118,7 +118,7 @@ export class ChecksumCache implements ChecksumCacheInterface { // Should never happen throw new Error(`Failed to get checksums for ${cacheKey}`); } - finalResults.push(checksums); + finalResults.set(bucket, checksums); }); promises.push(p); if (status.fetch == 'hit' || status.fetch == 'inflight') { @@ -198,14 +198,18 @@ export class ChecksumCache implements ChecksumCacheInterface { } } catch (e) { rejectFetch(e); + + // Wait for the above rejection to propagate, otherwise we end up with "uncaught" errors. 
+ await Promise.all(promises).catch((_e) => {}); + throw e; } await Promise.all(promises); - if (finalResults.length != buckets.length) { - throw new Error(`Bucket results mismatch: ${finalResults.length} != ${buckets.length}`); + if (finalResults.size != buckets.length) { + throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); } - return finalResults; + return buckets.map((bucket) => finalResults.get(bucket)!); } } diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 19fc95960..9f468c877 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -284,6 +284,46 @@ function defineChecksumCacheTests(factory: CachsumCacheFactory) { [{ bucket: 'test', start: '123', end: '124' }] ]); }); + + it('should handle errors', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const TEST_ERROR = new Error('Simulated error'); + const cache = factory(async (batch) => { + lookups.push(batch); + if (lookups.length == 1) { + throw new Error('Simulated error'); + } + return fetchTestChecksums(batch); + }); + + const a = cache.getChecksums('123', ['test', 'test2']); + const b = cache.getChecksums('123', ['test2', 'test3']); + + await expect(a).rejects.toEqual(TEST_ERROR); + await expect(b).rejects.toEqual(TEST_ERROR); + + const a2 = cache.getChecksums('123', ['test', 'test2']); + const b2 = cache.getChecksums('123', ['test2', 'test3']); + + expect(await a2).toEqual([TEST_123, TEST2_123]); + expect(await b2).toEqual([TEST2_123, TEST3_123]); + + expect(lookups).toEqual([ + // Request A (fails) + [ + { bucket: 'test', end: '123' }, + { bucket: 'test2', end: '123' } + ], + // Request B (re-uses the checksum for test2 from request a) + // Even thought the full request fails, this batch succeeds + [{ bucket: 'test3', end: '123' }], + // Retry request A + [ + { bucket: 'test', end: '123' }, + { bucket: 'test2', end: '123' } + ] + ]); + }); } describe('cache limit tests', function () { From b5aea31002fb7cccc2c4bea12d35f6495895025b Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 09:55:02 +0200 Subject: [PATCH 07/15] More tests. 
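The new tests pin down two behaviours: buckets missing from a fetch result fall
back to a zero checksum ({ checksum: 0, count: 0 }), and partial results
compose correctly with previously cached values. A worked example of the
fixture arithmetic (checksums add modulo 2^32, as implemented by addChecksums
in util/utils.ts):

```ts
const addChecksums = (a: number, b: number) => (a + b) & 0xffffffff;

const full123 = 1104081737; // TEST_123.checksum
const full1234 = -1593864957; // TEST_1234.checksum

// Checksum of only the operations in (123, 1234], as asserted in the
// missing-checksums test:
const partial = addChecksums(full1234, -full123); // 1597020602

console.log(addChecksums(full123, partial) === full1234); // true
```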
--- .../test/src/checksum_cache.test.ts | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 9f468c877..bbd81d4c3 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -324,6 +324,39 @@ function defineChecksumCacheTests(factory: CachsumCacheFactory) { ] ]); }); + + it('should handle missing checksums (a)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch.filter((b) => b.bucket != 'test')); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); + expect(await cache.getChecksums('123', ['test', 'test2'])).toEqual([ + { bucket: 'test', checksum: 0, count: 0 }, + TEST2_123 + ]); + }); + + it('should handle missing checksums (b)', async function () { + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = factory(async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch.filter((b) => b.bucket != 'test' || b.end != '123')); + }); + + expect(await cache.getChecksums('123', ['test'])).toEqual([{ bucket: 'test', checksum: 0, count: 0 }]); + expect(await cache.getChecksums('1234', ['test'])).toEqual([ + { + bucket: 'test', + checksum: 1597020602, + count: 1111 + } + ]); + + expect(lookups).toEqual([[{ bucket: 'test', end: '123' }], [{ bucket: 'test', start: '123', end: '1234' }]]); + }); } describe('cache limit tests', function () { From 2e5af122f175ccf189f4a29c1bc5ace81df99430 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 10:19:03 +0200 Subject: [PATCH 08/15] Use ChecksumCache in MongoSyncBucketStorage. --- .../service-core/src/storage/BucketStorage.ts | 8 +- .../service-core/src/storage/ChecksumCache.ts | 15 ++-- .../src/storage/ChecksumCacheTwo.ts | 12 ++- .../storage/mongo/MongoSyncBucketStorage.ts | 53 +++++++------ packages/service-core/src/sync/sync.ts | 21 ++--- .../service-core/src/util/protocol-types.ts | 2 + packages/service-core/src/util/utils.ts | 76 ++++--------------- 7 files changed, 70 insertions(+), 117 deletions(-) diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 99754d4af..c1e031c0a 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -233,13 +233,9 @@ export interface SyncRulesBucketStorage { /** * Compute checksums for a given list of buckets. * - * If fromCheckpoint is specified, the result is a diff. Otherwise, it is the full checksum. + * Returns zero checksums for any buckets not found. */ - getChecksums( - checkpoint: util.OpId, - fromCheckpoint: util.OpId | null, - buckets: string[] - ): Promise; + getChecksums(checkpoint: util.OpId, buckets: string[]): Promise; /** * Terminate the sync rules. 
diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 56f14bce3..a548162d8 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -1,4 +1,4 @@ -import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; import { addBucketChecksums } from '@/util/utils.js'; import { LRUCache } from 'lru-cache/min'; @@ -17,7 +17,7 @@ export interface FetchPartialBucketChecksum { end: OpId; } -export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; +export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise; export interface ChecksumCacheOptions { fetchChecksums: FetchChecksums; @@ -68,7 +68,13 @@ export class ChecksumCache implements ChecksumCacheInterface { }; } - async getChecksums(checkpoint: OpId, buckets: string[]) { + async getChecksums(checkpoint: OpId, buckets: string[]): Promise { + const checksums = await this.getChecksumMap(checkpoint, buckets); + // Return results in the same order as the request + return buckets.map((bucket) => checksums.get(bucket)!); + } + + async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise { let toFetch = new Set(); let fetchResults = new Map(); let resolveFetch!: () => void; @@ -208,7 +214,6 @@ export class ChecksumCache implements ChecksumCacheInterface { if (finalResults.size != buckets.length) { throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); } - // Return results in the same order as the request - return buckets.map((bucket) => finalResults.get(bucket)!); + return finalResults; } } diff --git a/packages/service-core/src/storage/ChecksumCacheTwo.ts b/packages/service-core/src/storage/ChecksumCacheTwo.ts index 211a0232a..3fc3b362b 100644 --- a/packages/service-core/src/storage/ChecksumCacheTwo.ts +++ b/packages/service-core/src/storage/ChecksumCacheTwo.ts @@ -1,4 +1,4 @@ -import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; import { addBucketChecksums } from '@/util/utils.js'; import { LRUCache } from 'lru-cache/min'; import { OrderedSet } from '@js-sdsl/ordered-set'; @@ -15,7 +15,7 @@ export interface FetchPartialBucketChecksum { end: OpId; } -export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise>; +export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise; export interface ChecksumCacheOptions { fetchChecksums: FetchChecksums; @@ -79,6 +79,12 @@ export class ChecksumCache implements ChecksumCacheInterface { } async getChecksums(checkpoint: OpId, buckets: string[]): Promise { + const checksums = await this.getChecksumMap(checkpoint, buckets); + // Return results in the same order as the request + return buckets.map((bucket) => checksums.get(bucket)!); + } + + async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise> { let toFetch = new Set(); let fetchResults = new Map(); let resolveFetch!: () => void; @@ -209,7 +215,7 @@ export class ChecksumCache implements ChecksumCacheInterface { if (finalResults.size != buckets.length) { throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); } - return buckets.map((bucket) => finalResults.get(bucket)!); + return finalResults; } } diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts 
b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index d07f10ac7..6e535f584 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -22,9 +22,15 @@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleState } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, readSingleBatch, serializeLookup } from './util.js'; +import { ChecksumCache, FetchPartialBucketChecksum } from '../ChecksumCacheTwo.js'; export class MongoSyncBucketStorage implements SyncRulesBucketStorage { private readonly db: PowerSyncMongo; + private checksumCache = new ChecksumCache({ + fetchChecksums: (batch) => { + return this.getChecksumsInternal(batch); + } + }); constructor( public readonly factory: MongoBucketStorage, @@ -316,34 +322,28 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { } } - async getChecksums( - checkpoint: util.OpId, - fromCheckpoint: util.OpId | null, - buckets: string[] - ): Promise { - if (buckets.length == 0) { - return []; - } + async getChecksums(checkpoint: util.OpId, buckets: string[]): Promise { + return this.checksumCache.getChecksumMap(checkpoint, buckets); + } - if (fromCheckpoint == checkpoint) { - return []; + private async getChecksumsInternal(batch: FetchPartialBucketChecksum[]): Promise { + if (batch.length == 0) { + return new Map(); } - const start = fromCheckpoint ? BigInt(fromCheckpoint) : new bson.MinKey(); - const filters: any[] = []; - for (let name of buckets) { + for (let request of batch) { filters.push({ _id: { $gt: { g: this.group_id, - b: name, - o: start + b: request.bucket, + o: request.start ? BigInt(request.start) : new bson.MinKey() }, $lte: { g: this.group_id, - b: name, - o: BigInt(checkpoint) + b: request.bucket, + o: BigInt(request.end) } } }); @@ -365,13 +365,18 @@ export class MongoSyncBucketStorage implements SyncRulesBucketStorage { ) .toArray(); - return aggregate.map((doc) => { - return { - bucket: doc._id, - count: doc.count, - checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff - }; - }); + return new Map( + aggregate.map((doc) => { + return [ + doc._id, + { + bucket: doc._id, + count: doc.count, + checksum: Number(BigInt(doc.checksum_total) & 0xffffffffn) & 0xffffffff + } satisfies util.BucketChecksum + ]; + }) + ); } async terminate() { diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 032eda9ba..7c284ee70 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -78,7 +78,7 @@ async function* streamResponseInner( // This starts with the state from the client. May contain buckets that the user do not have access to (anymore). let dataBuckets = new Map(); - let lastChecksums: { checkpoint: util.OpId; checksums: Map } | null = null; + let lastChecksums: util.ChecksumMap | null = null; let lastWriteCheckpoint: bigint | null = null; const { raw_data, binary_data } = params; @@ -120,10 +120,10 @@ async function* streamResponseInner( dataBuckets = dataBucketsNew; const bucketList = [...dataBuckets.keys()]; - const checksumDiff = await storage.getChecksums(checkpoint, lastChecksums?.checkpoint ?? 
null, bucketList); + const checksumMap = await storage.getChecksums(checkpoint, bucketList); if (lastChecksums) { - const diff = util.checksumsDiff(lastChecksums.checksums, bucketList, checksumDiff); + const diff = util.checksumsDiff(lastChecksums, checksumMap); if ( lastWriteCheckpoint == writeCheckpoint && @@ -150,18 +150,7 @@ async function* streamResponseInner( }; yield checksum_line; - - lastChecksums = { - checkpoint, - checksums: diff.nextBuckets - }; } else { - const nextBuckets = util.fillEmptyChecksums(bucketList, checksumDiff); - lastChecksums = { - checkpoint, - checksums: nextBuckets - }; - let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; micro.logger.info(message); @@ -169,12 +158,12 @@ async function* streamResponseInner( checkpoint: { last_op_id: checkpoint, write_checkpoint: writeCheckpoint ? String(writeCheckpoint) : undefined, - buckets: [...nextBuckets.values()] + buckets: [...checksumMap.values()] } }; yield checksum_line; } - + lastChecksums = checksumMap; lastWriteCheckpoint = writeCheckpoint; yield* bucketDataInBatches(storage, checkpoint, dataBuckets, raw_data, binary_data, signal); diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index 91073f102..695f2b55e 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -186,6 +186,8 @@ export interface BucketChecksum { count: number; } +export type ChecksumMap = Map; + export function isContinueCheckpointRequest(request: SyncRequest): request is ContinueCheckpointRequest { return ( Array.isArray((request as ContinueCheckpointRequest).buckets) && diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 4b881d181..3bc76ceee 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -4,7 +4,7 @@ import { pgwireRows } from '@powersync/service-jpgwire'; import * as micro from '@journeyapps-platform/micro'; import * as storage from '@/storage/storage-index.js'; -import { BucketChecksum, OpId } from './protocol-types.js'; +import { BucketChecksum, ChecksumMap, OpId } from './protocol-types.js'; import { retriedQuery } from './pgwire_utils.js'; export function hashData(type: string, id: string, data: string): number { @@ -30,81 +30,31 @@ export function timestampToOpId(ts: bigint): OpId { return ts.toString(10); } -export function fillEmptyChecksums(currentBuckets: string[], checksums: BucketChecksum[]) { - // All current values - const nextBuckets = new Map(); - - for (let checksum of checksums) { - // Added checksum - nextBuckets.set(checksum.bucket, checksum); - } - - for (let bucket of currentBuckets) { - if (!nextBuckets.has(bucket)) { - // Empty diff - empty bucket - const checksum: BucketChecksum = { - bucket, - checksum: 0, - count: 0 - }; - nextBuckets.set(bucket, checksum); - } - } - - return nextBuckets; -} - -export function checksumsDiff( - previousBuckets: Map, - currentBuckets: string[], - checksumDiff: BucketChecksum[] -) { +export function checksumsDiff(previous: ChecksumMap, current: ChecksumMap) { // All changed ones const updatedBuckets = new Map(); - // All current values - const nextBuckets = new Map(); - for (let cdiff of checksumDiff) { - const p = previousBuckets.get(cdiff.bucket); + const toRemove = new Set(previous.keys()); + + for (let checksum of current.values()) { + const p = 
previous.get(checksum.bucket); if (p == null) { // Added - updatedBuckets.set(cdiff.bucket, cdiff); - nextBuckets.set(cdiff.bucket, cdiff); - } else { - // Updated - const checksum: BucketChecksum = addBucketChecksums(p, cdiff); updatedBuckets.set(checksum.bucket, checksum); - nextBuckets.set(checksum.bucket, checksum); - previousBuckets.delete(cdiff.bucket); - } - } - - for (let bucket of currentBuckets) { - if (!updatedBuckets.has(bucket)) { - // Empty diff - either empty bucket, or unchanged - const p = previousBuckets.get(bucket); - if (p == null) { - // Emtpy bucket - const checksum: BucketChecksum = { - bucket, - checksum: 0, - count: 0 - }; - updatedBuckets.set(bucket, checksum); - nextBuckets.set(bucket, checksum); + } else { + toRemove.delete(checksum.bucket); + if (checksum.checksum != p.checksum || checksum.count != p.count) { + // Updated + updatedBuckets.set(checksum.bucket, checksum); } else { - // Unchanged bucket - nextBuckets.set(bucket, p); - previousBuckets.delete(bucket); + // No change } } } - const removedBuckets: string[] = [...previousBuckets.keys()]; return { updatedBuckets: [...updatedBuckets.values()], - removedBuckets, - nextBuckets + removedBuckets: [...toRemove] }; } From cd7dd4005e8ec81cce66a520d94c10fdff9b3226 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 10:25:57 +0200 Subject: [PATCH 09/15] Remove initial cache POC. --- .../service-core/src/storage/ChecksumCache.ts | 177 +++++++------- .../src/storage/ChecksumCacheTwo.ts | 229 ------------------ .../storage/mongo/MongoSyncBucketStorage.ts | 2 +- .../test/src/checksum_cache.test.ts | 17 +- .../test/src/data_storage.test.ts | 6 +- .../service-core/test/src/large_batch.test.ts | 16 +- 6 files changed, 113 insertions(+), 334 deletions(-) delete mode 100644 packages/service-core/src/storage/ChecksumCacheTwo.ts diff --git a/packages/service-core/src/storage/ChecksumCache.ts index a548162d8..0fb543448 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -1,14 +1,11 @@ import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; import { addBucketChecksums } from '@/util/utils.js'; import { LRUCache } from 'lru-cache/min'; - -interface CheckpointEntry { - refs: Set; - cache: LRUCache; -} +import { OrderedSet } from '@js-sdsl/ordered-set'; interface ChecksumFetchContext { fetch(bucket: string): Promise; + checkpoint: bigint; } export interface FetchPartialBucketChecksum { @@ -28,44 +25,60 @@ export interface ChecksumCacheInterface { getChecksums(checkpoint: OpId, buckets: string[]): Promise; } +// Approximately 5MB of memory, if we assume 50 bytes per entry +const DEFAULT_MAX_SIZE = 100_000; + +/** + * Implements an LRU cache for checksum requests. Each (bucket, checkpoint) request is cached separately, + * while the lookups occur in batches. + * + * For each bucket, we keep a separate OrderedSet of cached checkpoints. + * This allows us to incrementally update checksums by using the last cached checksum for the same bucket. + * + * We use the LRUCache fetchMethod to deduplicate in-progress requests. + */ export class ChecksumCache implements ChecksumCacheInterface { - private nextRefId = 1; - private checkpoints = new Map(); + /** + * The primary checksum cache, with key of `${checkpoint}/${bucket}`.
+ */ + private cache: LRUCache; + + private bucketCheckpoints = new Map>(); private fetchChecksums: FetchChecksums; constructor(options: ChecksumCacheOptions) { this.fetchChecksums = options.fetchChecksums; - } - async lock(checkpoint: OpId) { - const ref = this.nextRefId++; - - const existing = this.checkpoints.get(checkpoint); - if (existing != null) { - existing.refs.add(ref); - } else { - const entry: CheckpointEntry = { - refs: new Set([ref]), - cache: new LRUCache({ - max: 10_000, - fetchMethod: async (bucket, staleValue, options) => { - return options.context.fetch(bucket); - } - }) - }; - this.checkpoints.set(checkpoint, entry); - } + this.cache = new LRUCache({ + max: options.maxSize ?? DEFAULT_MAX_SIZE, + fetchMethod: async (cacheKey, _staleValue, options) => { + const { bucket } = parseCacheKey(cacheKey); + const result = await options.context.fetch(bucket); - return () => { - const entry = this.checkpoints.get(checkpoint); - if (entry == null) { - return; - } - entry.refs.delete(ref); - if (entry.refs.size == 0) { - this.checkpoints.delete(checkpoint); - } - }; + let checkpointSet = this.bucketCheckpoints.get(bucket); + if (checkpointSet == null) { + checkpointSet = new OrderedSet(); + this.bucketCheckpoints.set(bucket, checkpointSet); + } + checkpointSet.insert(options.context.checkpoint); + return result; + }, + + dispose: (value, key) => { + const { checkpointString } = parseCacheKey(key); + const checkpoint = BigInt(checkpointString); + const checkpointSet = this.bucketCheckpoints.get(value.bucket); + if (checkpointSet == null) { + return; + } + checkpointSet.eraseElementByKey(checkpoint); + if (checkpointSet.length == 0) { + this.bucketCheckpoints.delete(value.bucket); + } + }, + + noDisposeOnSet: true + }); } async getChecksums(checkpoint: OpId, buckets: string[]): Promise { @@ -74,7 +87,7 @@ export class ChecksumCache implements ChecksumCacheInterface { return buckets.map((bucket) => checksums.get(bucket)!); } - async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise { + async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise> { let toFetch = new Set(); let fetchResults = new Map(); let resolveFetch!: () => void; @@ -84,46 +97,35 @@ export class ChecksumCache implements ChecksumCacheInterface { rejectFetch = reject; }); - let entry = this.checkpoints.get(checkpoint); - if (entry == null) { - // TODO: throw new Error(`No checkpoint cache for ${checkpoint}`); - // Temporary: auto-create cache - entry = { - refs: new Set([]), - cache: new LRUCache({ - max: 10_000, - fetchMethod: async (bucket, staleValue, options) => { - return options.context.fetch(bucket); - } - }) - }; - this.checkpoints.set(checkpoint, entry); - } - let finalResults = new Map(); const context: ChecksumFetchContext = { async fetch(bucket) { await fetchPromise; if (!toFetch.has(bucket)) { + // Should never happen throw new Error(`Expected to fetch ${bucket}`); } const checksum = fetchResults.get(bucket); if (checksum == null) { + // Should never happen throw new Error(`Failed to fetch checksum for bucket ${bucket}`); } return checksum; - } + }, + checkpoint: BigInt(checkpoint) }; let promises: Promise[] = []; try { for (let bucket of buckets) { + const cacheKey = makeCacheKey(checkpoint, bucket); let status: LRUCache.Status = {}; - const p = entry.cache.fetch(bucket, { context: context, status: status }).then((checksums) => { + const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => { if (checksums == null) { - throw new Error(`Failed to get 
checksums for ${bucket}`); + // Should never happen + throw new Error(`Failed to get checksums for ${cacheKey}`); } finalResults.set(bucket, checksums); }); @@ -140,38 +142,39 @@ export class ChecksumCache implements ChecksumCacheInterface { resolveFetch(); } else { // Find smaller checkpoints, sorted in descending order - const checkpoints = [...this.checkpoints.keys()] - .filter((other) => BigInt(other) < BigInt(checkpoint)) - .sort((a, b) => { - if (a == b) { - return 0; - } else if (BigInt(a) < BigInt(b)) { - return 1; - } else { - return -1; - } - }); let bucketRequests: FetchPartialBucketChecksum[] = []; let add = new Map(); for (let bucket of toFetch) { let bucketRequest: FetchPartialBucketChecksum | null = null; - for (let cp of checkpoints) { - const entry = this.checkpoints.get(cp); - if (entry == null) { - throw new Error(`Cannot find cached checkpoint ${cp}`); - } - - const cached = entry.cache.peek(bucket); - if (cached != null) { - bucketRequest = { - bucket, - start: cp, - end: checkpoint - }; - add.set(bucket, cached); - break; + const checkpointSet = this.bucketCheckpoints.get(bucket); + if (checkpointSet != null) { + let iter = checkpointSet.reverseUpperBound(context.checkpoint); + const begin = checkpointSet.begin(); + while (iter.isAccessible()) { + const cp = iter.pointer; + const cacheKey = makeCacheKey(cp, bucket); + // peek to avoid refreshing the key + const cached = this.cache.peek(cacheKey); + // As long as dispose() works correctly, the checkpoint set should + // match up with the cache, and `cached` should also have a value here. + // However, we handle cases where it's not present either way. + // Test by disabling the `dispose()` callback. + if (cached != null) { + bucketRequest = { + bucket, + start: cp.toString(), + end: checkpoint + }; + add.set(bucket, cached); + break; + } + + if (iter.equals(begin)) { + break; + } + iter = iter.pre(); } } @@ -194,6 +197,7 @@ export class ChecksumCache implements ChecksumCacheInterface { const result = results.get(bucket); const toAdd = add.get(bucket); if (toAdd == null) { + // Should never happen throw new Error(`toAdd null for ${bucket}`); } const added = addBucketChecksums(toAdd, result ??
null); @@ -217,3 +221,12 @@ export class ChecksumCache implements ChecksumCacheInterface { return finalResults; } } + +function makeCacheKey(checkpoint: bigint | string, bucket: string) { + return `${checkpoint}/${bucket}`; +} + +function parseCacheKey(key: string) { + const index = key.indexOf('/'); + return { checkpointString: key.substring(0, index), bucket: key.substring(index + 1) }; +} diff --git a/packages/service-core/src/storage/ChecksumCacheTwo.ts b/packages/service-core/src/storage/ChecksumCacheTwo.ts deleted file mode 100644 index 3fc3b362b..000000000 --- a/packages/service-core/src/storage/ChecksumCacheTwo.ts +++ /dev/null @@ -1,229 +0,0 @@ -import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; -import { addBucketChecksums } from '@/util/utils.js'; -import { LRUCache } from 'lru-cache/min'; -import { OrderedSet } from '@js-sdsl/ordered-set'; -import { ChecksumCacheInterface } from './ChecksumCache.js'; - -interface ChecksumFetchContext { - fetch(bucket: string): Promise; - checkpoint: bigint; -} - -export interface FetchPartialBucketChecksum { - bucket: string; - start?: OpId; - end: OpId; -} - -export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise; - -export interface ChecksumCacheOptions { - fetchChecksums: FetchChecksums; - maxSize?: number; -} - -// Approximately 5MB of memory, if we assume 50 bytes per entry -const DEFAULT_MAX_SIZE = 100_000; - -/** - * Implement a LRU cache for checksum requests. Each (bucket, checkpoint) request is cached separately, - * while the lookups occur in batches. - * - * For each bucket, we keep a separate OrderedSet of cached checkpoints. - * This allows us to do incrementally update checksums by using the last cached checksum for the same bucket. - * - * We use the LRUCache fetchMethod to deduplicate in-progress requests. - */ -export class ChecksumCache implements ChecksumCacheInterface { - /** - * The primary checksum cache, with key of `${checkpoint}/${bucket}`. - */ - private cache: LRUCache; - - private bucketCheckpoints = new Map>(); - private fetchChecksums: FetchChecksums; - - constructor(options: ChecksumCacheOptions) { - this.fetchChecksums = options.fetchChecksums; - - this.cache = new LRUCache({ - max: options.maxSize ?? 
DEFAULT_MAX_SIZE, - fetchMethod: async (cacheKey, _staleValue, options) => { - const { bucket } = parseCacheKey(cacheKey); - const result = await options.context.fetch(bucket); - - let checkpointSet = this.bucketCheckpoints.get(bucket); - if (checkpointSet == null) { - checkpointSet = new OrderedSet(); - this.bucketCheckpoints.set(bucket, checkpointSet); - } - checkpointSet.insert(options.context.checkpoint); - return result; - }, - - dispose: (value, key) => { - const { checkpointString } = parseCacheKey(key); - const checkpoint = BigInt(checkpointString); - const checkpointSet = this.bucketCheckpoints.get(value.bucket); - if (checkpointSet == null) { - return; - } - checkpointSet.eraseElementByKey(checkpoint); - if (checkpointSet.length == 0) { - this.bucketCheckpoints.delete(value.bucket); - } - }, - - noDisposeOnSet: true - }); - } - - async getChecksums(checkpoint: OpId, buckets: string[]): Promise { - const checksums = await this.getChecksumMap(checkpoint, buckets); - // Return results in the same order as the request - return buckets.map((bucket) => checksums.get(bucket)!); - } - - async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise> { - let toFetch = new Set(); - let fetchResults = new Map(); - let resolveFetch!: () => void; - let rejectFetch!: (err: any) => void; - let fetchPromise = new Promise((resolve, reject) => { - resolveFetch = resolve; - rejectFetch = reject; - }); - - let finalResults = new Map(); - - const context: ChecksumFetchContext = { - async fetch(bucket) { - await fetchPromise; - if (!toFetch.has(bucket)) { - // Should never happen - throw new Error(`Expected to fetch ${bucket}`); - } - const checksum = fetchResults.get(bucket); - if (checksum == null) { - // Should never happen - throw new Error(`Failed to fetch checksum for bucket ${bucket}`); - } - return checksum; - }, - checkpoint: BigInt(checkpoint) - }; - - let promises: Promise[] = []; - - try { - for (let bucket of buckets) { - const cacheKey = makeCacheKey(checkpoint, bucket); - let status: LRUCache.Status = {}; - const p = this.cache.fetch(cacheKey, { context: context, status: status }).then((checksums) => { - if (checksums == null) { - // Should never happen - throw new Error(`Failed to get checksums for ${cacheKey}`); - } - finalResults.set(bucket, checksums); - }); - promises.push(p); - if (status.fetch == 'hit' || status.fetch == 'inflight') { - // No need to fetch now - } else { - toFetch.add(bucket); - } - } - - if (toFetch.size == 0) { - // Nothing to fetch, but resolve in case - resolveFetch(); - } else { - // Find smaller checkpoints, sorted in descending order - - let bucketRequests: FetchPartialBucketChecksum[] = []; - let add = new Map(); - - for (let bucket of toFetch) { - let bucketRequest: FetchPartialBucketChecksum | null = null; - const checkpointSet = this.bucketCheckpoints.get(bucket); - if (checkpointSet != null) { - let iter = checkpointSet.reverseUpperBound(context.checkpoint); - const begin = checkpointSet.begin(); - while (iter.isAccessible()) { - const cp = iter.pointer; - const cacheKey = makeCacheKey(cp, bucket); - // peek to avoid refreshing the key - const cached = this.cache.peek(cacheKey); - // As long as dispose() works correctly, the checkpointset should - // match up with the cache, and `cached` should also have a value here. - // However, we handle caces where it's not present either way. - // Test by disabling the `dispose()` callback. 
- if (cached != null) { - bucketRequest = { - bucket, - start: cp.toString(), - end: checkpoint - }; - add.set(bucket, cached); - break; - } - - if (iter.equals(begin)) { - break; - } - iter = iter.pre(); - } - } - - if (bucketRequest == null) { - bucketRequest = { - bucket, - end: checkpoint - }; - add.set(bucket, { - bucket, - checksum: 0, - count: 0 - }); - } - bucketRequests.push(bucketRequest); - } - - const results = await this.fetchChecksums(bucketRequests); - for (let bucket of toFetch) { - const result = results.get(bucket); - const toAdd = add.get(bucket); - if (toAdd == null) { - // Should never happen - throw new Error(`toAdd null for ${bucket}`); - } - const added = addBucketChecksums(toAdd, result ?? null); - fetchResults.set(bucket, added); - } - resolveFetch(); - } - } catch (e) { - rejectFetch(e); - - // Wait for the above rejection to propagate, otherwise we end up with "uncaught" errors. - await Promise.all(promises).catch((_e) => {}); - - throw e; - } - - await Promise.all(promises); - if (finalResults.size != buckets.length) { - throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); - } - return finalResults; - } -} - -function makeCacheKey(checkpoint: bigint | string, bucket: string) { - return `${checkpoint}/${bucket}`; -} - -function parseCacheKey(key: string) { - const index = key.indexOf('/'); - return { checkpointString: key.substring(0, index), bucket: key.substring(index + 1) }; -} diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 6e535f584..b6e22ae85 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -22,7 +22,7 @@ import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleState } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, readSingleBatch, serializeLookup } from './util.js'; -import { ChecksumCache, FetchPartialBucketChecksum } from '../ChecksumCacheTwo.js'; +import { ChecksumCache, FetchPartialBucketChecksum } from '../ChecksumCache.js'; export class MongoSyncBucketStorage implements SyncRulesBucketStorage { private readonly db: PowerSyncMongo; diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index bbd81d4c3..25d7cd625 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -1,25 +1,20 @@ import { describe, expect, it } from 'vitest'; +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import * as crypto from 'node:crypto'; +import { addBucketChecksums } from '@/util/util-index.js'; import { ChecksumCache, ChecksumCacheInterface, FetchChecksums, FetchPartialBucketChecksum -} from '../../src/storage/ChecksumCache.js'; -import { ChecksumCache as ChecksumCacheTwo } from '../../src/storage/ChecksumCacheTwo.js'; -import { BucketChecksum, OpId } from '@/util/protocol-types.js'; -import * as crypto from 'node:crypto'; -import { addBucketChecksums } from '@/util/util-index.js'; +} from '@/storage/ChecksumCache.js'; type CachsumCacheFactory = (fetch: FetchChecksums) => ChecksumCacheInterface; -describe('checksum cache 1', function () { +describe('checksum cache', function () { defineChecksumCacheTests((f) => new ChecksumCache({ fetchChecksums: f 
})); }); -describe('checksum cache 2', function () { - defineChecksumCacheTests((f) => new ChecksumCacheTwo({ fetchChecksums: f })); -}); - /** * Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes. */ @@ -362,7 +357,7 @@ function defineChecksumCacheTests(factory: CachsumCacheFactory) { describe('cache limit tests', function () { it('should use maxSize', async function () { let lookups: FetchPartialBucketChecksum[][] = []; - const cache = new ChecksumCacheTwo({ + const cache = new ChecksumCache({ fetchChecksums: async (batch) => { lookups.push(batch); return fetchTestChecksums(batch); diff --git a/packages/service-core/test/src/data_storage.test.ts b/packages/service-core/test/src/data_storage.test.ts index dc9cf1d13..ab44a517c 100644 --- a/packages/service-core/test/src/data_storage.test.ts +++ b/packages/service-core/test/src/data_storage.test.ts @@ -252,7 +252,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); + const checksums = [...(await storage.getChecksums(checkpoint, ['global[]'])).values()]; expect(checksums).toEqual([ { bucket: 'global[]', @@ -599,7 +599,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); + const checksums = [...(await storage.getChecksums(checkpoint, ['global[]'])).values()]; expect(checksums).toEqual([ { bucket: 'global[]', @@ -713,7 +713,7 @@ bucket_definitions: { op: 'REMOVE', object_id: 'test1', checksum: c2 } ]); - const checksums = await storage.getChecksums(checkpoint, null, ['global[]']); + const checksums = [...(await storage.getChecksums(checkpoint, ['global[]'])).values()]; expect(checksums).toEqual([ { bucket: 'global[]', diff --git a/packages/service-core/test/src/large_batch.test.ts b/packages/service-core/test/src/large_batch.test.ts index 87b67bf64..edbc28610 100644 --- a/packages/service-core/test/src/large_batch.test.ts +++ b/packages/service-core/test/src/large_batch.test.ts @@ -49,8 +49,8 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint = await context.getCheckpoint({ timeout: 100_000 }); const duration = Date.now() - start; const used = Math.round(process.memoryUsage().heapUsed / 1024 / 1024); - const checksum = await context.storage!.getChecksums(checkpoint, null, ['global[]']); - expect(checksum[0].count).toEqual(operation_count); + const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + expect(checksum.get('global[]')!.count).toEqual(operation_count); const perSecond = Math.round((operation_count / duration) * 1000); console.log(`${operation_count} ops in ${duration}ms ${perSecond} ops/s. 
${used}MB heap`); }), @@ -100,8 +100,8 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint = await context.getCheckpoint({ timeout: 100_000 }); const duration = Date.now() - start; - const checksum = await context.storage!.getChecksums(checkpoint, null, ['global[]']); - expect(checksum[0].count).toEqual(operation_count); + const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + expect(checksum.get('global[]')!.count).toEqual(operation_count); const perSecond = Math.round((operation_count / duration) * 1000); console.log(`${operation_count} ops in ${duration}ms ${perSecond} ops/s.`); printMemoryUsage(); @@ -156,8 +156,8 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint = await context.getCheckpoint({ timeout: 50_000 }); const duration = Date.now() - start; const used = Math.round(process.memoryUsage().heapUsed / 1024 / 1024); - const checksum = await context.storage!.getChecksums(checkpoint, null, ['global[]']); - expect(checksum[0].count).toEqual(operationCount); + const checksum = await context.storage!.getChecksums(checkpoint, ['global[]']); + expect(checksum.get('global[]')!.count).toEqual(operationCount); const perSecond = Math.round((operationCount / duration) * 1000); // This number depends on the test machine, so we keep the test significantly // lower than expected numbers. @@ -173,8 +173,8 @@ function defineBatchTests(factory: StorageFactory) { const checkpoint2 = await context.getCheckpoint({ timeout: 20_000 }); const truncateDuration = Date.now() - truncateStart; - const checksum2 = await context.storage!.getChecksums(checkpoint2, null, ['global[]']); - const truncateCount = checksum2[0].count - checksum[0].count; + const checksum2 = await context.storage!.getChecksums(checkpoint2, ['global[]']); + const truncateCount = checksum2.get('global[]')!.count - checksum.get('global[]')!.count; expect(truncateCount).toEqual(numTransactions * perTransaction); const truncatePerSecond = Math.round((truncateCount / truncateDuration) * 1000); console.log(`Truncated ${truncateCount} ops in ${truncateDuration}ms ${truncatePerSecond} ops/s. ${used}MB heap`); From 1c34146471225e49fa716d0623f780bd7418b542 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 10:44:42 +0200 Subject: [PATCH 10/15] Cleanup and comments. --- .../service-core/src/storage/ChecksumCache.ts | 71 +++++++++++++++---- .../test/src/checksum_cache.test.ts | 23 ++---- 2 files changed, 64 insertions(+), 30 deletions(-) diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 0fb543448..4b6faac9e 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -17,12 +17,17 @@ export interface FetchPartialBucketChecksum { export type FetchChecksums = (batch: FetchPartialBucketChecksum[]) => Promise; export interface ChecksumCacheOptions { + /** + * Upstream checksum implementation. + * + * This fetches a batch of checksums, each covering either an entire bucket or a partial range. + */ fetchChecksums: FetchChecksums; - maxSize?: number; -} -export interface ChecksumCacheInterface { - getChecksums(checkpoint: OpId, buckets: string[]): Promise; + /** + * Maximum number of cached checksums. + */ + maxSize?: number; } // Approximately 5MB of memory, if we assume 50 bytes per entry @@ -37,13 +42,16 @@ const DEFAULT_MAX_SIZE = 100_000; * * We use the LRUCache fetchMethod to deduplicate in-progress requests.
*/ -export class ChecksumCache implements ChecksumCacheInterface { +export class ChecksumCache { /** * The primary checksum cache, with key of `${checkpoint}/${bucket}`. */ private cache: LRUCache; - + /** + * For each bucket, an ordered set of cached checkpoints. + */ private bucketCheckpoints = new Map>(); + private fetchChecksums: FetchChecksums; constructor(options: ChecksumCacheOptions) { @@ -52,9 +60,12 @@ export class ChecksumCache implements ChecksumCacheInterface { this.cache = new LRUCache({ max: options.maxSize ?? DEFAULT_MAX_SIZE, fetchMethod: async (cacheKey, _staleValue, options) => { + // Called when this checksum hasn't been cached yet. + // Pass the call back to the request, which implements batch fetching. const { bucket } = parseCacheKey(cacheKey); const result = await options.context.fetch(bucket); + // Add to the set of cached checkpoints for the bucket. let checkpointSet = this.bucketCheckpoints.get(bucket); if (checkpointSet == null) { checkpointSet = new OrderedSet(); @@ -65,6 +76,7 @@ export class ChecksumCache implements ChecksumCacheInterface { }, dispose: (value, key) => { + // Remove from the set of cached checkpoints for the bucket const { checkpointString } = parseCacheKey(key); const checkpoint = BigInt(checkpointString); const checkpointSet = this.bucketCheckpoints.get(value.bucket); @@ -87,9 +99,21 @@ export class ChecksumCache implements ChecksumCacheInterface { return buckets.map((bucket) => checksums.get(bucket)!); } - async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise> { + /** + * Get bucket checksums for a checkpoint. + * + * Any checksums not found upstream are returned as zero checksums. + * + * @returns a Map with exactly one entry for each bucket requested + */ + async getChecksumMap(checkpoint: OpId, buckets: string[]): Promise { + // Buckets that don't have a cached checksum for this checkpoint yet let toFetch = new Set(); + + // Newly fetched results let fetchResults = new Map(); + + // Promise for the batch of new fetch requests let resolveFetch!: () => void; let rejectFetch!: (err: any) => void; let fetchPromise = new Promise((resolve, reject) => { @@ -97,6 +121,7 @@ export class ChecksumCache implements ChecksumCacheInterface { rejectFetch = reject; }); + // Accumulated results - both from cached checksums, and fetched checksums let finalResults = new Map(); const context: ChecksumFetchContext = { @@ -116,7 +141,8 @@ export class ChecksumCache implements ChecksumCacheInterface { checkpoint: BigInt(checkpoint) }; - let promises: Promise[] = []; + // Individual cache fetch promises + let cacheFetchPromises: Promise[] = []; try { for (let bucket of buckets) { @@ -129,10 +155,13 @@ export class ChecksumCache implements ChecksumCacheInterface { } finalResults.set(bucket, checksums); }); - promises.push(p); + cacheFetchPromises.push(p); if (status.fetch == 'hit' || status.fetch == 'inflight') { - // No need to fetch now + // The checksum is either cached already (hit), or another request is busy + // fetching (inflight). + // In either case, we don't need to fetch a new checksum. } else { + // We need a new request for this checksum.
toFetch.add(bucket); } } @@ -141,15 +170,15 @@ export class ChecksumCache implements ChecksumCacheInterface { // Nothing to fetch, but resolve in case resolveFetch(); } else { - // Find smaller checkpoints, sorted in descending order - let bucketRequests: FetchPartialBucketChecksum[] = []; + // Partial checksum (previously cached) to add to the partial fetch let add = new Map(); for (let bucket of toFetch) { let bucketRequest: FetchPartialBucketChecksum | null = null; const checkpointSet = this.bucketCheckpoints.get(bucket); if (checkpointSet != null) { + // Find smaller checkpoints, sorted in descending order let iter = checkpointSet.reverseUpperBound(context.checkpoint); const begin = checkpointSet.begin(); while (iter.isAccessible()) { const cp = iter.pointer; @@ -162,6 +191,7 @@ export class ChecksumCache implements ChecksumCacheInterface { // However, we handle cases where it's not present either way. // Test by disabling the `dispose()` callback. if (cached != null) { + // Partial checksum found - make a partial checksum request bucketRequest = { bucket, start: cp.toString(), @@ -172,13 +202,16 @@ export class ChecksumCache implements ChecksumCacheInterface { } if (iter.equals(begin)) { + // Cannot iterate further break; } + // Iterate backwards iter = iter.pre(); } } if (bucketRequest == null) { + // No partial checksum found - make a new full checksum request bucketRequest = { bucket, end: checkpoint @@ -192,7 +225,9 @@ export class ChecksumCache implements ChecksumCacheInterface { bucketRequests.push(bucketRequest); } + // Fetch partial checksums from upstream const results = await this.fetchChecksums(bucketRequests); + for (let bucket of toFetch) { const result = results.get(bucket); const toAdd = add.get(bucket); @@ -200,22 +235,30 @@ export class ChecksumCache implements ChecksumCacheInterface { // Should never happen throw new Error(`toAdd null for ${bucket}`); } + // Compute the full checksum from the two partials. + // No results returned are treated the same as a zero result. const added = addBucketChecksums(toAdd, result ?? null); fetchResults.set(bucket, added); } + + // fetchResults is fully populated, so we resolve the Promise resolveFetch(); } } catch (e) { + // Failure when fetching checksums - reject the Promise. + // This will reject all individual cache fetch requests, and each will be retried + // on the next request. rejectFetch(e); // Wait for the above rejection to propagate, otherwise we end up with "uncaught" errors.
- await Promise.all(promises).catch((_e) => {}); + await Promise.all(cacheFetchPromises).catch((_e) => {}); throw e; } - await Promise.all(promises); + await Promise.all(cacheFetchPromises); if (finalResults.size != buckets.length) { + // Should not happen throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); } return finalResults; diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 25d7cd625..1da277b95 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -1,19 +1,8 @@ import { describe, expect, it } from 'vitest'; -import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; import * as crypto from 'node:crypto'; import { addBucketChecksums } from '@/util/util-index.js'; -import { - ChecksumCache, - ChecksumCacheInterface, - FetchChecksums, - FetchPartialBucketChecksum -} from '@/storage/ChecksumCache.js'; - -type CachsumCacheFactory = (fetch: FetchChecksums) => ChecksumCacheInterface; - -describe('checksum cache', function () { - defineChecksumCacheTests((f) => new ChecksumCache({ fetchChecksums: f })); -}); +import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum } from '@/storage/ChecksumCache.js'; /** * Create a deterministic BucketChecksum based on the bucket name and checkpoint for testing purposes. @@ -82,7 +71,11 @@ function fetchTestChecksums(batch: FetchPartialBucketChecksum[]) { ); } -function defineChecksumCacheTests(factory: CachsumCacheFactory) { +describe('checksum cache', function () { + const factory = (fetch: FetchChecksums) => { + return new ChecksumCache({ fetchChecksums: fetch }); + }; + it('should handle a sequential lookups (a)', async function () { let lookups: FetchPartialBucketChecksum[][] = []; const cache = factory(async (batch) => { @@ -352,9 +345,7 @@ function defineChecksumCacheTests(factory: CachsumCacheFactory) { expect(lookups).toEqual([[{ bucket: 'test', end: '123' }], [{ bucket: 'test', start: '123', end: '1234' }]]); }); -} -describe('cache limit tests', function () { it('should use maxSize', async function () { let lookups: FetchPartialBucketChecksum[][] = []; const cache = new ChecksumCache({ From 616779afdf71cd278de7db3cb8a278a209ce87e1 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 10:47:27 +0200 Subject: [PATCH 11/15] More cleanup. 
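Moves the ChecksumMap alias from protocol-types.ts to utils.ts, next to the
checksumsDiff() helper that consumes it. For reference, a minimal sketch of
how the two maps are diffed after this change; the bucket names and checksum
values below are illustrative only:

    import { checksumsDiff, ChecksumMap } from '@/util/utils.js';

    const previous: ChecksumMap = new Map([
      ['global[]', { bucket: 'global[]', checksum: 100, count: 10 }],
      ['by_user["u1"]', { bucket: 'by_user["u1"]', checksum: 7, count: 2 }]
    ]);
    const current: ChecksumMap = new Map([
      ['global[]', { bucket: 'global[]', checksum: 150, count: 12 }]
    ]);

    const diff = checksumsDiff(previous, current);
    // diff.updatedBuckets: [{ bucket: 'global[]', checksum: 150, count: 12 }]
    // diff.removedBuckets: ['by_user["u1"]']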
--- packages/service-core/src/storage/ChecksumCache.ts | 4 ++-- packages/service-core/src/util/protocol-types.ts | 2 -- packages/service-core/src/util/utils.ts | 4 +++- packages/service-core/test/src/checksum_cache.test.ts | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 4b6faac9e..c8a3e12f9 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -1,5 +1,5 @@ -import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; -import { addBucketChecksums } from '@/util/utils.js'; +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; +import { ChecksumMap, addBucketChecksums } from '@/util/utils.js'; import { LRUCache } from 'lru-cache/min'; import { OrderedSet } from '@js-sdsl/ordered-set'; diff --git a/packages/service-core/src/util/protocol-types.ts b/packages/service-core/src/util/protocol-types.ts index 695f2b55e..91073f102 100644 --- a/packages/service-core/src/util/protocol-types.ts +++ b/packages/service-core/src/util/protocol-types.ts @@ -186,8 +186,6 @@ export interface BucketChecksum { count: number; } -export type ChecksumMap = Map; - export function isContinueCheckpointRequest(request: SyncRequest): request is ContinueCheckpointRequest { return ( Array.isArray((request as ContinueCheckpointRequest).buckets) && diff --git a/packages/service-core/src/util/utils.ts b/packages/service-core/src/util/utils.ts index 3bc76ceee..847413623 100644 --- a/packages/service-core/src/util/utils.ts +++ b/packages/service-core/src/util/utils.ts @@ -4,9 +4,11 @@ import { pgwireRows } from '@powersync/service-jpgwire'; import * as micro from '@journeyapps-platform/micro'; import * as storage from '@/storage/storage-index.js'; -import { BucketChecksum, ChecksumMap, OpId } from './protocol-types.js'; +import { BucketChecksum, OpId } from './protocol-types.js'; import { retriedQuery } from './pgwire_utils.js'; +export type ChecksumMap = Map; + export function hashData(type: string, id: string, data: string): number { const hash = crypto.createHash('sha256'); hash.update(`put.${type}.${id}.${data}`); diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 1da277b95..0d0e154fb 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -1,5 +1,5 @@ import { describe, expect, it } from 'vitest'; -import { BucketChecksum, ChecksumMap, OpId } from '@/util/protocol-types.js'; +import { BucketChecksum, OpId } from '@/util/protocol-types.js'; import * as crypto from 'node:crypto'; import { addBucketChecksums } from '@/util/util-index.js'; import { ChecksumCache, FetchChecksums, FetchPartialBucketChecksum } from '@/storage/ChecksumCache.js'; From 4a5778787abeff139908da564be736ce7ad37b26 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 10:55:58 +0200 Subject: [PATCH 12/15] Add changeset. 
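The incremental caching relies on bucket checksums being additive: the
checksum over ops (0, end] equals the checksum over (0, start] plus the
checksum over (start, end], modulo 2^32. That property is what lets the cache
extend a previously cached checkpoint with a cheap partial fetch instead of a
full recount. A rough sketch of the combination step is below; combine() is a
hypothetical stand-in for the addBucketChecksums() helper in utils.ts,
assuming it adds counts and sums checksums as signed 32-bit integers:

    import { BucketChecksum } from '@/util/protocol-types.js';

    // Hypothetical sketch; see addBucketChecksums() in utils.ts for the real helper.
    function combine(a: BucketChecksum, b: BucketChecksum | null): BucketChecksum {
      if (b == null) {
        // No ops in (start, end] - the checksum is unchanged.
        return a;
      }
      return {
        bucket: a.bucket,
        count: a.count + b.count,
        // Sum modulo 2^32, normalized to a signed 32-bit integer.
        checksum: (a.checksum + b.checksum) | 0
      };
    }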
--- .changeset/tiny-ads-try.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/tiny-ads-try.md diff --git a/.changeset/tiny-ads-try.md b/.changeset/tiny-ads-try.md new file mode 100644 index 000000000..0d5225fe3 --- /dev/null +++ b/.changeset/tiny-ads-try.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-core': patch +'powersync-open-service': patch +--- + +- Use an LRU cache for checksum computations, improving performance and reducing MongoDB database load. +- Return zero checksums to the client instead of omitting them, to help with debugging sync issues. From c92edfaee5c38a025d698ea51a509f88d0820376 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 11:42:23 +0200 Subject: [PATCH 13/15] Filter bucket data requests. --- packages/service-core/src/sync/sync.ts | 48 +++++++++++++++----------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/packages/service-core/src/sync/sync.ts b/packages/service-core/src/sync/sync.ts index 7c284ee70..c8ed5a3b6 100644 --- a/packages/service-core/src/sync/sync.ts +++ b/packages/service-core/src/sync/sync.ts @@ -121,6 +121,8 @@ async function* streamResponseInner( const bucketList = [...dataBuckets.keys()]; const checksumMap = await storage.getChecksums(checkpoint, bucketList); + // Subset of buckets for which there may be new data in this batch. + let bucketsToFetch: string[]; if (lastChecksums) { const diff = util.checksumsDiff(lastChecksums, checksumMap); @@ -133,6 +135,7 @@ async function* streamResponseInner( // No changes - don't send anything to the client continue; } + bucketsToFetch = diff.updatedBuckets.map((c) => c.bucket); let message = `Updated checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} | `; @@ -154,6 +157,7 @@ async function* streamResponseInner( let message = `New checkpoint: ${checkpoint} | write: ${writeCheckpoint} | `; message += `buckets: ${allBuckets.length} ${limitedBuckets(allBuckets, 20)}`; micro.logger.info(message); + bucketsToFetch = allBuckets; const checksum_line: util.StreamingSyncCheckpoint = { checkpoint: { last_op_id: checkpoint, @@ -166,22 +170,28 @@ async function* streamResponseInner( lastChecksums = checksumMap; lastWriteCheckpoint = writeCheckpoint; - yield* bucketDataInBatches(storage, checkpoint, dataBuckets, raw_data, binary_data, signal); + // This incrementally updates dataBuckets with each individual bucket position. + // At the end of this, we can be sure that all buckets have data up to the checkpoint. + yield* bucketDataInBatches({ storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal }); await new Promise((resolve) => setTimeout(resolve, 10)); } } +interface BucketDataRequest { + storage: storage.SyncRulesBucketStorage; + checkpoint: string; + bucketsToFetch: string[]; + /** Bucket data position, modified by the request. */ + dataBuckets: Map; + raw_data: boolean | undefined; + binary_data: boolean | undefined; + signal: AbortSignal; +} + +async function* bucketDataInBatches(request: BucketDataRequest) { let isDone = false; - while (!signal.aborted && !isDone) { + while (!request.signal.aborted && !isDone) { // The code below is functionally the same as this for-await loop below. // However, the for-await loop appears to have a memory leak, so we avoid it.
// for await (const { done, data } of bucketDataBatch(storage, checkpoint, dataBuckets, raw_data, signal)) { // yield data; // if (done) { // } // break; // } - const iter = bucketDataBatch(storage, checkpoint, dataBuckets, raw_data, binary_data, signal); + const iter = bucketDataBatch(request); try { while (true) { const { value, done: iterDone } = await iter.next(); @@ -214,17 +224,15 @@ /** * Extracted as a separate internal function just to avoid memory leaks. */ -async function* bucketDataBatch( - storage: storage.SyncRulesBucketStorage, - checkpoint: string, - dataBuckets: Map, - raw_data: boolean | undefined, - binary_data: boolean | undefined, - signal: AbortSignal -) { +async function* bucketDataBatch(request: BucketDataRequest) { + const { storage, checkpoint, bucketsToFetch, dataBuckets, raw_data, binary_data, signal } = request; + const [_, release] = await syncSemaphore.acquire(); try { - const data = storage.getBucketDataBatch(checkpoint, dataBuckets); + // Optimization: Only fetch buckets for which the checksums have changed since the last checkpoint + // For the first batch, this will be all buckets. + const filteredBuckets = new Map(bucketsToFetch.map((bucket) => [bucket, dataBuckets.get(bucket)!])); + const data = storage.getBucketDataBatch(checkpoint, filteredBuckets); let has_more = false; From 2b818ae9006c70dfbdb285e5f0053570d88d3284 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 12:39:53 +0200 Subject: [PATCH 14/15] Fix unhandled rejection on eviction errors. --- .../service-core/src/storage/ChecksumCache.ts | 23 +++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index c8a3e12f9..1703de2ad 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -141,10 +141,13 @@ export class ChecksumCache { checkpoint: BigInt(checkpoint) }; - // Individual cache fetch promises - let cacheFetchPromises: Promise[] = []; + // One promise to await to ensure all fetch requests completed. + let settledPromise: Promise[]> | null = null; try { + // Individual cache fetch promises + let cacheFetchPromises: Promise[] = []; + for (let bucket of buckets) { const cacheKey = makeCacheKey(checkpoint, bucket); let status: LRUCache.Status = {}; @@ -165,6 +168,9 @@ export class ChecksumCache { toFetch.add(bucket); } } + // We do this directly after creating the promises, otherwise + // we could end up with weird uncaught rejection errors. + settledPromise = Promise.allSettled(cacheFetchPromises); if (toFetch.size == 0) { // Nothing to fetch, but resolve in case resolveFetch(); } else { @@ -251,12 +257,21 @@ export class ChecksumCache { rejectFetch(e); // Wait for the above rejection to propagate, otherwise we end up with "uncaught" errors. - await Promise.all(cacheFetchPromises).catch((_e) => {}); + // This promise never throws. + await settledPromise; throw e; } - await Promise.all(cacheFetchPromises); + // Wait for all cache fetch requests to complete + const settledResults = (await settledPromise) ??
[]; + // Check if any of them failed + for (let result of settledResults) { + if (result.status == 'rejected') { + throw result.reason; + } + } + if (finalResults.size != buckets.length) { // Should not happen throw new Error(`Bucket results mismatch: ${finalResults.size} != ${buckets.length}`); } return finalResults; From db9e0517726eb1d0ce498978f988d11916a82a3f Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Thu, 13 Jun 2024 12:57:11 +0200 Subject: [PATCH 15/15] Avoid cache "evicted" error. This is relevant when there are more concurrent requests than the cache size. --- .../service-core/src/storage/ChecksumCache.ts | 6 +++- .../test/src/checksum_cache.test.ts | 36 +++++++++++++++++++ 2 files changed, 41 insertions(+), 1 deletion(-) diff --git a/packages/service-core/src/storage/ChecksumCache.ts b/packages/service-core/src/storage/ChecksumCache.ts index 1703de2ad..9db834cfa 100644 --- a/packages/service-core/src/storage/ChecksumCache.ts +++ b/packages/service-core/src/storage/ChecksumCache.ts @@ -89,7 +89,11 @@ export class ChecksumCache { } }, - noDisposeOnSet: true + noDisposeOnSet: true, + + // When we have more fetches than the cache size, complete the fetches instead + // of failing with Error('evicted'). + ignoreFetchAbort: true }); } diff --git a/packages/service-core/test/src/checksum_cache.test.ts b/packages/service-core/test/src/checksum_cache.test.ts index 0d0e154fb..2ce56fd19 100644 --- a/packages/service-core/test/src/checksum_cache.test.ts +++ b/packages/service-core/test/src/checksum_cache.test.ts @@ -397,4 +397,40 @@ describe('checksum cache', function () { [{ bucket: 'test', end: '123' }] ]); }); + + it('should handle concurrent requests greater than cache size', async function () { + // This will not be cached efficiently, but at least we test that we don't get errors. + let lookups: FetchPartialBucketChecksum[][] = []; + const cache = new ChecksumCache({ + fetchChecksums: async (batch) => { + lookups.push(batch); + return fetchTestChecksums(batch); + }, + maxSize: 2 + }); + + const p3 = cache.getChecksums('123', ['test3']); + const p4 = cache.getChecksums('123', ['test4']); + const p1 = cache.getChecksums('123', ['test']); + const p2 = cache.getChecksums('123', ['test2']); + + expect(await p1).toEqual([TEST_123]); + expect(await p2).toEqual([TEST2_123]); + expect(await p3).toEqual([TEST3_123]); + expect(await p4).toEqual([ + { + bucket: 'test4', + checksum: 1004797863, + count: 123 + } + ]); + + // The lookup should be deduplicated, even though it's in progress + expect(lookups).toEqual([ + [{ bucket: 'test3', end: '123' }], + [{ bucket: 'test4', end: '123' }], + [{ bucket: 'test', end: '123' }], + [{ bucket: 'test2', end: '123' }] + ]); + }); });
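For reference, a minimal usage sketch of the ChecksumCache API as it stands at
the end of this series, based on the tests above. The in-memory fetcher below
is illustrative only; in production the cache is wired to the MongoDB-backed
fetcher in MongoSyncBucketStorage:

    import { ChecksumCache } from '@/storage/ChecksumCache.js';

    const cache = new ChecksumCache({
      maxSize: 100_000,
      fetchChecksums: async (batch) => {
        // Each request covers ops in (start, end] for one bucket; start is
        // omitted when a full-bucket checksum is needed. Returning an empty
        // map is valid: missing buckets come back as zero checksums.
        return new Map();
      }
    });

    // One checksum per requested bucket, in request order.
    const checksums = await cache.getChecksums('123', ['global[]']);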