diff --git a/.changeset/angry-mice-hide.md b/.changeset/angry-mice-hide.md
new file mode 100644
index 000000000..bd763a1b0
--- /dev/null
+++ b/.changeset/angry-mice-hide.md
@@ -0,0 +1,9 @@
+---
+'@powersync/service-module-postgres-storage': minor
+'@powersync/service-module-mongodb-storage': minor
+'@powersync/service-core-tests': minor
+'@powersync/service-core': minor
+'@powersync/service-image': minor
+---
+
+[MongoDB Storage] Compact action now also compacts parameter lookup storage.
diff --git a/docs/parameters-lookups.md b/docs/parameters-lookups.md
new file mode 100644
index 000000000..12477bfc8
--- /dev/null
+++ b/docs/parameters-lookups.md
@@ -0,0 +1,122 @@
+# Parameter Lookup Implementation
+
+Most of the other docs focus on bucket data, but parameter lookup data also has some tricky bits.
+
+## Basic requirements
+
+The essence of what we do when syncing data is:
+
+1. Get the latest checkpoint.
+2. Evaluate all parameter queries _at the state of the checkpoint_.
+3. Return bucket data for the checkpoint.
+
+This doc focuses on point 2.
+
+## Current lookup implementation
+
+We effectively store an "index" for exact lookups on parameter query tables.
+
+The format in MongoDB storage is:
+
+    _id: OpId # auto-incrementing op-id, using the same sequence as checkpoints
+    key: {g: <sync rules group id>, t: <source table id>, k: RowReplicationId } # uniquely identifies the source row
+    lookup: doc # lookup entry for this source row
+    bucket_parameters: data # results returned by the parameter query
+
+If one row evaluates to multiple lookups, each is stored as a separate document with the same key.
+
+When a row is deleted, we empty `bucket_parameters` for the same (key, lookup) combinations.
+
+To query, we:
+
+1. Filter by sync rules version: key.g.
+2. Filter by lookup.
+3. Filter by checkpoint: \_id <= checkpoint.
+4. Return the last parameter data for each (key, lookup) combination (highest \_id).
+
+## Compacting
+
+In many cases, parameter query tables are updated infrequently, and compacting is not important. However, there are cases where parameter query tables are updated regularly (for example by cron jobs), and the resulting unbounded storage growth causes significant query overhead and other issues.
+
+To handle this, we compact older data. For each (key.g, key, lookup) combination, we only need to keep the last copy (highest \_id). And if the last one is a remove operation (empty `bucket_parameters`), we can remove it completely.
+
+One big consideration is that sync clients may still need some of that data. To cover this, parameter lookup queries should specifically use a _snapshot_ query mode, querying at the same snapshot that was used for the checkpoint lookup. This is different from the "Future Option: Snapshot queries" point below: we're not using a snapshot at the time the checkpoint was created, but rather a snapshot at the time the checkpoint was read. This means we always use a fresh snapshot.
+
+# Alternatives
+
+## Future option: Incremental compacting
+
+Right now, compacting scans through the entire collection to compact data. It should be possible to make this more incremental, only scanning through documents added since the last compact.
+
+## Future Option: Snapshot queries
+
+If we could do a snapshot query with a snapshot matching the checkpoint, the rest of the implementation could become quite simple. We could "just" replicate the latest copy of parameter tables, and run arbitrary parameter queries on them.
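+
+A rough sketch of what such a query could look like with the Node driver (names here are hypothetical; `setSessionSnapshotTime` is the workaround added in `util.ts`, and `checkpointClusterTime` is assumed to be a cluster time stored alongside the checkpoint):
+
+```ts
+// Hypothetical: run an arbitrary parameter query pinned to the checkpoint's cluster time.
+const rows = await client.withSession({ snapshot: true }, async (session) => {
+  // Pin the snapshot to the checkpoint, instead of letting the first read pick its own time.
+  setSessionSnapshotTime(session, checkpointClusterTime);
+  return await db
+    .collection('replicated_parameter_table') // hypothetical replicated copy of the source table
+    .find({ user_id: userId }, { session, readConcern: 'snapshot' })
+    .toArray();
+});
+```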
+
+Unfortunately, running snapshot queries for specific checkpoints is not that simple. Tricky parts include associating a snapshot with a specific checkpoint, and snapshots typically expiring after a short duration. Nonetheless, this remains an option to consider in the future.
+
+To implement this with MongoDB:
+
+1. Every time we `commit()` in the replication process, store the current clusterTime (we can use `$$CLUSTER_TIME` for this).
+2. When we query for data, use that clusterTime.
+3. _Make sure we commit at least once every 5 minutes_, ideally every minute.
+
+The last point means that replication issues could also turn into query issues:
+
+1. The replication process being down for 5 minutes means queries stop working.
+2. Being more than 5 minutes behind in replication is not an issue, as long as we keep doing new commits.
+3. Taking longer than 5 minutes to complete replication for a _single transaction_ will cause API failures. This includes operations such as adding or removing tables.
+
+In theory, we could take this even further and run parameter queries directly on the _source_ database, without replicating.
+
+## Compacting - Implementation alternatives
+
+Instead of snapshot queries, some other alternatives are listed below. These are not used, just listed here in case we ever need to re-evaluate the implementation.
+
+### 1. Last active checkpoint
+
+Compute a "last active" checkpoint - a checkpoint that started being active at least 5 minutes ago, meaning that we can clean up data that is only used for checkpoints older than that.
+
+The issues here are:
+
+1. We don't store older checkpoints, so it can be tricky to find an older checkpoint without waiting 5 minutes.
+2. It is difficult to build in hard guarantees for parameter queries here, without relying on time-based heuristics.
+3. Keeping track of checkpoints used in the API service can be quite tricky.
+
+### 2. Merge / invalidate lookups
+
+Instead of deleting older parameter lookup records, we can merge them.
+
+Say we have two records with the same key and lookup, and \_id of A and B (A < B). The above approach would just delete A, if A < lastActiveCheckpoint.
+
+What we can do instead is merge into:
+
+    _id: A
+    parameter_data: B.parameter_data
+    not_valid_before: B
+
+The key here is the `not_valid_before` field: When we query for parameter data, we filter by \_id as usual. But if `checkpoint < not_valid_before`, we need to discard that checkpoint.
+
+Now we still need to try to avoid merging recent parameter lookup records, otherwise we may keep on invalidating checkpoints as fast as we generate them. But this could function as a final safety check, giving us proper consistency guarantees.
+
+This roughly matches the design of `target_op` in MOVE operations.
+
+This still does not cover deleted data: With this approach alone, we can never fully remove records after the source row was deleted, since we need that `not_valid_before` field. So this is not a complete solution.
+
+### 3. Globally invalidate checkpoints
+
+Another alternative is to globally invalidate checkpoints when compacting. So:
+
+1. We pick a `lastActiveCheckpoint`.
+2. Persist `noCheckpointBefore: lastActiveCheckpoint` in the sync_rules collection.
+3. At some point between doing the parameter lookups and sending a `checkpoint_complete` message, we look up the `noCheckpointBefore` value, and invalidate the checkpoint if required.
+
+This allows us to cleanly delete older checkpoints, at the expense of needing to run another query.
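+
+As a sketch only (this alternative is not implemented, and the collection shape and field name below are hypothetical), the check in step 3 could look roughly like:
+
+```ts
+// Hypothetical check, run between evaluating parameter lookups and sending checkpoint_complete.
+async function checkpointStillValid(db: mongo.Db, groupId: number, checkpoint: bigint): Promise<boolean> {
+  const doc = await db
+    .collection<{ _id: number; no_checkpoint_before?: bigint }>('sync_rules')
+    .findOne({ _id: groupId }, { projection: { no_checkpoint_before: 1 } });
+  const noCheckpointBefore = doc?.no_checkpoint_before ?? 0n;
+  // If compacting invalidated this checkpoint, the client has to wait for a newer one.
+  return checkpoint >= noCheckpointBefore;
+}
+```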
+ +This could also replace the current logic we have for `target_op` in MOVE operations. + +To do the lookup very efficiently, we can apply some workarounds: + +1. For each parameter query (and data query?), store the clusterTime of the results. +2. Right before sending checkpointComplete, query for the noCheckpointBefore value, using `afterClusterTime`. +3. _We can cache those results_, re-using it for other clients. As long as the `afterClusterTime` condition is satisfied, we can use the cached value. diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoParameterCompactor.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoParameterCompactor.ts new file mode 100644 index 000000000..3b7f6add6 --- /dev/null +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoParameterCompactor.ts @@ -0,0 +1,105 @@ +import { logger } from '@powersync/lib-services-framework'; +import { bson, CompactOptions, InternalOpId } from '@powersync/service-core'; +import { LRUCache } from 'lru-cache'; +import { PowerSyncMongo } from './db.js'; +import { mongo } from '@powersync/lib-service-mongodb'; +import { BucketParameterDocument } from './models.js'; + +/** + * Compacts parameter lookup data (the bucket_parameters collection). + * + * This scans through the entire collection to find data to compact. + * + * For background, see the `/docs/parameters-lookups.md` file. + */ +export class MongoParameterCompactor { + constructor( + private db: PowerSyncMongo, + private group_id: number, + private checkpoint: InternalOpId, + private options: CompactOptions + ) {} + + async compact() { + logger.info(`Compacting parameters for group ${this.group_id} up to checkpoint ${this.checkpoint}`); + // This is the currently-active checkpoint. + // We do not remove any data that may be used by this checkpoint. + // snapshot queries ensure that if any clients are still using older checkpoints, they would + // not be affected by this compaction. + const checkpoint = this.checkpoint; + + // Index on {'key.g': 1, lookup: 1, _id: 1} + // In theory, we could let MongoDB do more of the work here, by grouping by (key, lookup) + // in MongoDB already. However, that risks running into cases where MongoDB needs to process + // very large amounts of data before returning results, which could lead to timeouts. + const cursor = this.db.bucket_parameters.find( + { + 'key.g': this.group_id + }, + { + sort: { lookup: 1, _id: 1 }, + batchSize: 10_000, + projection: { _id: 1, key: 1, lookup: 1, bucket_parameters: 1 } + } + ); + + // The index doesn't cover sorting by key, so we keep our own cache of the last seen key. + let lastByKey = new LRUCache({ + max: this.options.compactParameterCacheLimit ?? 
10_000 + }); + let removeIds: InternalOpId[] = []; + let removeDeleted: mongo.AnyBulkWriteOperation[] = []; + + const flush = async (force: boolean) => { + if (removeIds.length >= 1000 || (force && removeIds.length > 0)) { + const results = await this.db.bucket_parameters.deleteMany({ _id: { $in: removeIds } }); + logger.info(`Removed ${results.deletedCount} (${removeIds.length}) superseded parameter entries`); + removeIds = []; + } + + if (removeDeleted.length > 10 || (force && removeDeleted.length > 0)) { + const results = await this.db.bucket_parameters.bulkWrite(removeDeleted); + logger.info(`Removed ${results.deletedCount} (${removeDeleted.length}) deleted parameter entries`); + removeDeleted = []; + } + }; + + while (await cursor.hasNext()) { + const batch = cursor.readBufferedDocuments(); + for (let doc of batch) { + if (doc._id >= checkpoint) { + continue; + } + const uniqueKey = ( + bson.serialize({ + k: doc.key, + l: doc.lookup + }) as Buffer + ).toString('base64'); + const previous = lastByKey.get(uniqueKey); + if (previous != null && previous < doc._id) { + // We have a newer entry for the same key, so we can remove the old one. + removeIds.push(previous); + } + lastByKey.set(uniqueKey, doc._id); + + if (doc.bucket_parameters?.length == 0) { + // This is a delete operation, so we can remove it completely. + // For this we cannot remove the operation itself only: There is a possibility that + // there is still an earlier operation with the same key and lookup, that we don't have + // in the cache due to cache size limits. So we need to explicitly remove all earlier operations. + removeDeleted.push({ + deleteMany: { + filter: { 'key.g': doc.key.g, lookup: doc.lookup, _id: { $lte: doc._id }, key: doc.key } + } + }); + } + } + + await flush(false); + } + + await flush(true); + logger.info('Parameter compaction completed'); + } +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index bd3166247..58e58b4dd 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -2,11 +2,9 @@ import * as lib_mongo from '@powersync/lib-service-mongodb'; import { mongo } from '@powersync/lib-service-mongodb'; import { BaseObserver, - ErrorCode, logger, ReplicationAbortedError, - ServiceAssertionError, - ServiceError + ServiceAssertionError } from '@powersync/lib-services-framework'; import { BroadcastIterable, @@ -30,18 +28,12 @@ import { LRUCache } from 'lru-cache'; import * as timers from 'timers/promises'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { PowerSyncMongo } from './db.js'; -import { - BucketDataDocument, - BucketDataKey, - BucketStateDocument, - SourceKey, - SourceTableDocument, - SyncRuleCheckpointState -} from './models.js'; +import { BucketDataDocument, BucketDataKey, BucketStateDocument, SourceKey, SourceTableDocument } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; -import { idPrefixFilter, mapOpEntry, readSingleBatch } from './util.js'; +import { idPrefixFilter, mapOpEntry, readSingleBatch, setSessionSnapshotTime } from './util.js'; +import { MongoParameterCompactor } from './MongoParameterCompactor.js'; export class MongoSyncBucketStorage 
extends BaseObserver @@ -106,22 +98,44 @@ export class MongoSyncBucketStorage } async getCheckpoint(): Promise { - const doc = await this.db.sync_rules.findOne( - { _id: this.group_id }, - { - projection: { last_checkpoint: 1, last_checkpoint_lsn: 1, snapshot_done: 1 } + return (await this.getCheckpointInternal()) ?? new EmptyReplicationCheckpoint(); + } + + async getCheckpointInternal(): Promise { + return await this.db.client.withSession({ snapshot: true }, async (session) => { + const doc = await this.db.sync_rules.findOne( + { _id: this.group_id }, + { + session, + projection: { _id: 1, state: 1, last_checkpoint: 1, last_checkpoint_lsn: 1, snapshot_done: 1 } + } + ); + if (!doc?.snapshot_done || !['ACTIVE', 'ERRORED'].includes(doc.state)) { + // Sync rules not active - return null + return null; } - ); - if (!doc?.snapshot_done) { - return { - checkpoint: 0n, - lsn: null - }; - } - return { - checkpoint: doc?.last_checkpoint ?? 0n, - lsn: doc?.last_checkpoint_lsn ?? null - }; + + // Specifically using operationTime instead of clusterTime + // There are 3 fields in the response: + // 1. operationTime, not exposed for snapshot sessions (used for causal consistency) + // 2. clusterTime (used for connection management) + // 3. atClusterTime, which is session.snapshotTime + // We use atClusterTime, to match the driver's internal snapshot handling. + // There are cases where clusterTime > operationTime and atClusterTime, + // which could cause snapshot queries using this as the snapshotTime to timeout. + // This was specifically observed on MongoDB 6.0 and 7.0. + const snapshotTime = (session as any).snapshotTime as bson.Timestamp | undefined; + if (snapshotTime == null) { + throw new ServiceAssertionError('Missing snapshotTime in getCheckpoint()'); + } + return new MongoReplicationCheckpoint( + this, + // null/0n is a valid checkpoint in some cases, for example if the initial snapshot was empty + doc.last_checkpoint ?? 0n, + doc.last_checkpoint_lsn ?? null, + snapshotTime + ); + }); } async startBatch( @@ -262,38 +276,67 @@ export class MongoSyncBucketStorage return result!; } - async getParameterSets(checkpoint: utils.InternalOpId, lookups: ParameterLookup[]): Promise { - const lookupFilter = lookups.map((lookup) => { - return storage.serializeLookup(lookup); - }); - const rows = await this.db.bucket_parameters - .aggregate([ - { - $match: { - 'key.g': this.group_id, - lookup: { $in: lookupFilter }, - _id: { $lte: checkpoint } - } - }, - { - $sort: { - _id: -1 - } - }, - { - $group: { - _id: { key: '$key', lookup: '$lookup' }, - bucket_parameters: { - $first: '$bucket_parameters' + async getParameterSets(checkpoint: MongoReplicationCheckpoint, lookups: ParameterLookup[]): Promise { + return this.db.client.withSession({ snapshot: true }, async (session) => { + // Set the session's snapshot time to the checkpoint's snapshot time. + // An alternative would be to create the session when the checkpoint is created, but managing + // the session lifetime would become more complex. + // Starting and ending sessions are cheap (synchronous when no transactions are used), + // so this should be fine. + // This is a roundabout way of setting {readConcern: {atClusterTime: clusterTime}}, since + // that is not exposed directly by the driver. + // Future versions of the driver may change the snapshotTime behavior, so we need tests to + // validate that this works as expected. We test this in the compacting tests. 
+ setSessionSnapshotTime(session, checkpoint.snapshotTime); + const lookupFilter = lookups.map((lookup) => { + return storage.serializeLookup(lookup); + }); + // This query does not use indexes super efficiently, apart from the lookup filter. + // From some experimentation I could do individual lookups more efficient using an index + // on {'key.g': 1, lookup: 1, 'key.t': 1, 'key.k': 1, _id: -1}, + // but could not do the same using $group. + // For now, just rely on compacting to remove extraneous data. + // For a description of the data format, see the `/docs/parameters-lookups.md` file. + const rows = await this.db.bucket_parameters + .aggregate( + [ + { + $match: { + 'key.g': this.group_id, + lookup: { $in: lookupFilter }, + _id: { $lte: checkpoint.checkpoint } + } + }, + { + $sort: { + _id: -1 + } + }, + { + $group: { + _id: { key: '$key', lookup: '$lookup' }, + bucket_parameters: { + $first: '$bucket_parameters' + } + } } + ], + { + session, + readConcern: 'snapshot', + // Limit the time for the operation to complete, to avoid getting connection timeouts + maxTimeMS: lib_mongo.db.MONGO_OPERATION_TIMEOUT_MS } - } - ]) - .toArray(); - const groupedParameters = rows.map((row) => { - return row.bucket_parameters; + ) + .toArray() + .catch((e) => { + throw lib_mongo.mapQueryError(e, 'while evaluating parameter queries'); + }); + const groupedParameters = rows.map((row) => { + return row.bucket_parameters; + }); + return groupedParameters.flat(); }); - return groupedParameters.flat(); } async *getBucketDataBatch( @@ -658,14 +701,11 @@ export class MongoSyncBucketStorage } async compact(options?: storage.CompactOptions) { - return new MongoCompactor(this.db, this.group_id, options).compact(); - } - - private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) { - return { - checkpoint: doc?.last_checkpoint ?? 0n, - lsn: doc?.last_checkpoint_lsn ?? null - }; + const checkpoint = await this.getCheckpointInternal(); + await new MongoCompactor(this.db, this.group_id, options).compact(); + if (checkpoint != null && options?.compactParameterData) { + await new MongoParameterCompactor(this.db, this.group_id, checkpoint.checkpoint, options).compact(); + } } /** @@ -687,33 +727,13 @@ export class MongoSyncBucketStorage break; } - const doc = await this.db.sync_rules.findOne( - { - _id: this.group_id, - state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } - }, - { - limit: 1, - projection: { - _id: 1, - state: 1, - last_checkpoint: 1, - last_checkpoint_lsn: 1 - } - } - ); - - if (doc == null) { - // Sync rules not present or not active. - // Abort the connections - clients will have to retry later. - throw new ServiceError(ErrorCode.PSYNC_S2302, 'No active sync rules available'); - } else if (doc.state != storage.SyncRuleState.ACTIVE && doc.state != storage.SyncRuleState.ERRORED) { + const op = await this.getCheckpointInternal(); + if (op == null) { // Sync rules have changed - abort and restart. 
// We do a soft close of the stream here - no error break; } - const op = this.makeActiveCheckpoint(doc); // Check for LSN / checkpoint changes - ignore other metadata changes if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) { lastOp = op; @@ -980,3 +1000,25 @@ interface InternalCheckpointChanges extends CheckpointChanges { updatedWriteCheckpoints: Map; invalidateWriteCheckpoints: boolean; } + +class MongoReplicationCheckpoint implements ReplicationCheckpoint { + constructor( + private storage: MongoSyncBucketStorage, + public readonly checkpoint: InternalOpId, + public readonly lsn: string | null, + public snapshotTime: mongo.Timestamp + ) {} + + async getParameterSets(lookups: ParameterLookup[]): Promise { + return this.storage.getParameterSets(this, lookups); + } +} + +class EmptyReplicationCheckpoint implements ReplicationCheckpoint { + readonly checkpoint: InternalOpId = 0n; + readonly lsn: string | null = null; + + async getParameterSets(lookups: ParameterLookup[]): Promise { + return []; + } +} diff --git a/modules/module-mongodb-storage/src/storage/implementation/util.ts b/modules/module-mongodb-storage/src/storage/implementation/util.ts index 84e94466b..d6a4f489e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/util.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/util.ts @@ -7,6 +7,7 @@ import { storage, utils } from '@powersync/service-core'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument } from './models.js'; +import { ServiceAssertionError } from '@powersync/lib-services-framework'; export function idPrefixFilter(prefix: Partial, rest: (keyof T)[]): mongo.Condition { let filter = { @@ -117,3 +118,15 @@ export const connectMongoForTests = (url: string, isCI: boolean) => { }); return new PowerSyncMongo(client); }; + +export function setSessionSnapshotTime(session: mongo.ClientSession, time: bson.Timestamp) { + // This is a workaround for the lack of direct support for snapshot reads in the MongoDB driver. 
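+  // Pre-setting snapshotTime makes the driver send readConcern { level: 'snapshot', atClusterTime: time }
+  // on this session's subsequent reads, instead of deriving the time from the first read's response.
+  // See the corresponding note in MongoSyncBucketStorage.getParameterSets().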
+ if (!session.snapshotEnabled) { + throw new ServiceAssertionError(`Session must be a snapshot session`); + } + if ((session as any).snapshotTime == null) { + (session as any).snapshotTime = time; + } else { + throw new ServiceAssertionError(`Session snapshotTime is already set`); + } +} diff --git a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts index 285b7232c..a02a10811 100644 --- a/modules/module-mongodb-storage/test/src/storage_compacting.test.ts +++ b/modules/module-mongodb-storage/test/src/storage_compacting.test.ts @@ -3,3 +3,5 @@ import { describe } from 'vitest'; import { INITIALIZED_MONGO_STORAGE_FACTORY } from './util.js'; describe('Mongo Sync Bucket Storage Compact', () => register.registerCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY)); +describe('Mongo Sync Parameter Storage Compact', () => + register.registerParameterCompactTests(INITIALIZED_MONGO_STORAGE_FACTORY)); diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index c7176fd93..21d8d448d 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -8,6 +8,7 @@ import { internalToExternalOpId, LastValueSink, maxLsn, + ReplicationCheckpoint, storage, utils, WatchWriteCheckpointOptions @@ -136,10 +137,11 @@ export class PostgresSyncRulesStorage .decoded(pick(models.SyncRules, ['last_checkpoint', 'last_checkpoint_lsn'])) .first(); - return { - checkpoint: checkpointRow?.last_checkpoint ?? 0n, - lsn: checkpointRow?.last_checkpoint_lsn ?? null - }; + return new PostgresReplicationCheckpoint( + this, + checkpointRow?.last_checkpoint ?? 0n, + checkpointRow?.last_checkpoint_lsn ?? null + ); } async resolveTable(options: storage.ResolveTableOptions): Promise { @@ -351,7 +353,7 @@ export class PostgresSyncRulesStorage } async getParameterSets( - checkpoint: utils.InternalOpId, + checkpoint: ReplicationCheckpoint, lookups: sync_rules.ParameterLookup[] ): Promise { const rows = await this.db.sql` @@ -374,7 +376,7 @@ export class PostgresSyncRulesStorage value: lookups.map((l) => storage.serializeLookupBuffer(l).toString('hex')) }}) AS FILTER ) - AND id <= ${{ type: 'int8', value: checkpoint }} + AND id <= ${{ type: 'int8', value: checkpoint.checkpoint }} ORDER BY lookup, source_table, @@ -834,9 +836,18 @@ export class PostgresSyncRulesStorage } private makeActiveCheckpoint(row: models.ActiveCheckpointDecoded | null) { - return { - checkpoint: row?.last_checkpoint ?? 0n, - lsn: row?.last_checkpoint_lsn ?? null - } satisfies storage.ReplicationCheckpoint; + return new PostgresReplicationCheckpoint(this, row?.last_checkpoint ?? 0n, row?.last_checkpoint_lsn ?? 
null); + } +} + +class PostgresReplicationCheckpoint implements storage.ReplicationCheckpoint { + constructor( + private storage: PostgresSyncRulesStorage, + public readonly checkpoint: utils.InternalOpId, + public readonly lsn: string | null + ) {} + + getParameterSets(lookups: sync_rules.ParameterLookup[]): Promise { + return this.storage.getParameterSets(this, lookups); } } diff --git a/packages/service-core-tests/src/tests/register-compacting-tests.ts b/packages/service-core-tests/src/tests/register-compacting-tests.ts index 31985f477..c4d975db6 100644 --- a/packages/service-core-tests/src/tests/register-compacting-tests.ts +++ b/packages/service-core-tests/src/tests/register-compacting-tests.ts @@ -4,17 +4,6 @@ import * as test_utils from '../test-utils/test-utils-index.js'; const TEST_TABLE = test_utils.makeTestTable('test', ['id']); -/** - * @example - * ```TypeScript - * // Test with the default options - large batch sizes - * describe('compacting buckets - default options', () => registerCompactTests(() => new MongoStorageFactory(), {})); - * - * // Also test with the miniumum batch sizes, forcing usage of multiple batches internally - * describe('compacting buckets - batched', () => - * compactTests(() => new MongoStorageFactory(), { clearBatchLimit: 2, moveBatchLimit: 1, moveBatchQueryLimit: 1 })); - * ``` - */ export function registerCompactTests(generateStorageFactory: storage.TestStorageFactory) { test('compacting (1)', async () => { const sync_rules = test_utils.testRules(` diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index 57e56db8c..3eec6dab2 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -34,18 +34,19 @@ const normalizeOplogData = (data: OplogEntry['data']) => { */ export function registerDataStorageTests(generateStorageFactory: storage.TestStorageFactory) { test('save and load parameters', async () => { - const sync_rules = test_utils.testRules(` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = token_parameters.user_id - data: [] - `); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -69,11 +70,12 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('t1') }); + + await batch.commit('1/1'); }); - const parameters = await bucketStorage.getParameterSets(result!.flushed_op, [ - ParameterLookup.normalized('mybucket', '1', ['user1']) - ]); + const checkpoint = await bucketStorage.getCheckpoint(); + const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); expect(parameters).toEqual([ { group_id: 'group1a' @@ -82,20 +84,19 @@ bucket_definitions: }); test('it should use the latest version', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = 
await factory.updateSyncRules({ + content: ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE id = token_parameters.user_id - data: [] + data: [] ` - ); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -105,8 +106,10 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('user1') }); + await batch.commit('1/1'); }); - const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + const checkpoint1 = await bucketStorage.getCheckpoint(); + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -116,11 +119,11 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('user1') }); + await batch.commit('1/2'); }); + const checkpoint2 = await bucketStorage.getCheckpoint(); - const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [ - ParameterLookup.normalized('mybucket', '1', ['user1']) - ]); + const parameters = await checkpoint2.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); expect(parameters).toEqual([ { group_id: 'group2' @@ -128,9 +131,7 @@ bucket_definitions: ]); // Use the checkpoint to get older data if relevant - const parameters2 = await bucketStorage.getParameterSets(result1!.flushed_op, [ - ParameterLookup.normalized('mybucket', '1', ['user1']) - ]); + const parameters2 = await checkpoint1.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); expect(parameters2).toEqual([ { group_id: 'group1' @@ -139,20 +140,19 @@ bucket_definitions: }); test('it should use the latest version after updates', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: mybucket: parameters: - SELECT id AS todo_id FROM todos WHERE list_id IN token_parameters.list_id - data: [] + data: [] ` - ); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); const table = test_utils.makeTestTable('todos', ['id', 'list_id']); @@ -176,9 +176,11 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('todo2') }); + + await batch.commit('1/1'); }); - const result2 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Update the second todo item to now belong to list 1 await batch.save({ sourceTable: table, @@ -189,12 +191,15 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('todo2') }); + + await batch.commit('1/1'); }); // We specifically request the todo_ids for both lists. 
// There removal operation for the association of `list2`::`todo2` should not interfere with the new // association of `list1`::`todo2` - const parameters = await bucketStorage.getParameterSets(result2!.flushed_op, [ + const checkpoint = await bucketStorage.getCheckpoint(); + const parameters = await checkpoint.getParameterSets([ ParameterLookup.normalized('mybucket', '1', ['list1']), ParameterLookup.normalized('mybucket', '1', ['list2']) ]); @@ -210,20 +215,19 @@ bucket_definitions: }); test('save and load parameters with different number types', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE n1 = token_parameters.n1 and f2 = token_parameters.f2 and f3 = token_parameters.f3 data: [] ` - ); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -236,21 +240,23 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('t1') }); + + await batch.commit('1/1'); }); const TEST_PARAMS = { group_id: 'group1' }; - const checkpoint = result!.flushed_op; + const checkpoint = await bucketStorage.getCheckpoint(); - const parameters1 = await bucketStorage.getParameterSets(checkpoint, [ + const parameters1 = await checkpoint.getParameterSets([ ParameterLookup.normalized('mybucket', '1', [314n, 314, 3.14]) ]); expect(parameters1).toEqual([TEST_PARAMS]); - const parameters2 = await bucketStorage.getParameterSets(checkpoint, [ + const parameters2 = await checkpoint.getParameterSets([ ParameterLookup.normalized('mybucket', '1', [314, 314n, 3.14]) ]); expect(parameters2).toEqual([TEST_PARAMS]); - const parameters3 = await bucketStorage.getParameterSets(checkpoint, [ + const parameters3 = await checkpoint.getParameterSets([ ParameterLookup.normalized('mybucket', '1', [314n, 314, 3]) ]); expect(parameters3).toEqual([]); @@ -261,20 +267,19 @@ bucket_definitions: // This specific case tested here cannot happen with postgres in practice, but we still // test this to ensure correct deserialization. 
- const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE n1 = token_parameters.n1 data: [] ` - ); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: TEST_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -298,31 +303,33 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('t1') }); + + await batch.commit('1/1'); }); const TEST_PARAMS = { group_id: 'group1' }; - const checkpoint = result!.flushed_op; + const checkpoint = await bucketStorage.getCheckpoint(); - const parameters1 = await bucketStorage.getParameterSets(checkpoint, [ + const parameters1 = await checkpoint.getParameterSets([ ParameterLookup.normalized('mybucket', '1', [1152921504606846976n]) ]); expect(parameters1).toEqual([TEST_PARAMS]); }); test('removing row', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id, description FROM "%" ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -339,9 +346,10 @@ bucket_definitions: tag: storage.SaveOperationTag.DELETE, beforeReplicaId: test_utils.rid('test1') }); + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].chunkData.data.map((d) => { @@ -373,8 +381,9 @@ bucket_definitions: test('save and load parameters with workspaceId', async () => { const WORKSPACE_TABLE = test_utils.makeTestTable('workspace', ['id']); - const sync_rules_content = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: by_workspace: parameters: @@ -382,13 +391,11 @@ bucket_definitions: workspace."userId" = token_parameters.user_id data: [] ` - ); - const sync_rules = sync_rules_content.parsed(test_utils.PARSE_OPTIONS).sync_rules; - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules_content); + }); + const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: WORKSPACE_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -398,9 +405,9 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('workspace1') }); + await 
batch.commit('1/1'); }); - - const checkpoint = result!.flushed_op; + const checkpoint = await bucketStorage.getCheckpoint(); const parameters = new RequestParameters({ sub: 'u1' }, {}); @@ -409,12 +416,12 @@ bucket_definitions: const lookups = q1.getLookups(parameters); expect(lookups).toEqual([ParameterLookup.normalized('by_workspace', '1', ['u1'])]); - const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups); + const parameter_sets = await checkpoint.getParameterSets(lookups); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }]); const buckets = await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({ getParameterSets(lookups) { - return bucketStorage.getParameterSets(checkpoint, lookups); + return checkpoint.getParameterSets(lookups); } }); expect(buckets).toEqual([{ bucket: 'by_workspace["workspace1"]', priority: 3 }]); @@ -423,8 +430,9 @@ bucket_definitions: test('save and load parameters with dynamic global buckets', async () => { const WORKSPACE_TABLE = test_utils.makeTestTable('workspace'); - const sync_rules_content = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: by_public_workspace: parameters: @@ -432,13 +440,11 @@ bucket_definitions: workspace.visibility = 'public' data: [] ` - ); - const sync_rules = sync_rules_content.parsed(test_utils.PARSE_OPTIONS).sync_rules; - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules_content); + }); + const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: WORKSPACE_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -468,9 +474,11 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('workspace3') }); + + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const checkpoint = await bucketStorage.getCheckpoint(); const parameters = new RequestParameters({ sub: 'unknown' }, {}); @@ -479,13 +487,13 @@ bucket_definitions: const lookups = q1.getLookups(parameters); expect(lookups).toEqual([ParameterLookup.normalized('by_public_workspace', '1', [])]); - const parameter_sets = await bucketStorage.getParameterSets(checkpoint, lookups); + const parameter_sets = await checkpoint.getParameterSets(lookups); parameter_sets.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); expect(parameter_sets).toEqual([{ workspace_id: 'workspace1' }, { workspace_id: 'workspace3' }]); const buckets = await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({ getParameterSets(lookups) { - return bucketStorage.getParameterSets(checkpoint, lookups); + return checkpoint.getParameterSets(lookups); } }); buckets.sort((a, b) => a.bucket.localeCompare(b.bucket)); @@ -498,8 +506,9 @@ bucket_definitions: test('multiple parameter queries', async () => { const WORKSPACE_TABLE = test_utils.makeTestTable('workspace'); - const sync_rules_content = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: by_workspace: parameters: @@ -509,13 +518,11 @@ bucket_definitions: workspace.user_id = 
token_parameters.user_id data: [] ` - ); - const sync_rules = sync_rules_content.parsed(test_utils.PARSE_OPTIONS).sync_rules; - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules_content); + }); + const sync_rules = syncRules.parsed(test_utils.PARSE_OPTIONS).sync_rules; + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable: WORKSPACE_TABLE, tag: storage.SaveOperationTag.INSERT, @@ -557,9 +564,11 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('workspace4') }); + + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const checkpoint = await bucketStorage.getCheckpoint(); const parameters = new RequestParameters({ sub: 'u1' }, {}); @@ -568,7 +577,7 @@ bucket_definitions: const lookups1 = q1.getLookups(parameters); expect(lookups1).toEqual([ParameterLookup.normalized('by_workspace', '1', [])]); - const parameter_sets1 = await bucketStorage.getParameterSets(checkpoint, lookups1); + const parameter_sets1 = await checkpoint.getParameterSets(lookups1); parameter_sets1.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); expect(parameter_sets1).toEqual([{ workspace_id: 'workspace1' }]); @@ -576,7 +585,7 @@ bucket_definitions: const lookups2 = q2.getLookups(parameters); expect(lookups2).toEqual([ParameterLookup.normalized('by_workspace', '2', ['u1'])]); - const parameter_sets2 = await bucketStorage.getParameterSets(checkpoint, lookups2); + const parameter_sets2 = await checkpoint.getParameterSets(lookups2); parameter_sets2.sort((a, b) => JSON.stringify(a).localeCompare(JSON.stringify(b))); expect(parameter_sets2).toEqual([{ workspace_id: 'workspace3' }]); @@ -584,7 +593,7 @@ bucket_definitions: const buckets = ( await sync_rules.getBucketParameterQuerier(parameters).queryDynamicBucketDescriptions({ getParameterSets(lookups) { - return bucketStorage.getParameterSets(checkpoint, lookups); + return checkpoint.getParameterSets(lookups); } }) ).map((e) => e.bucket); @@ -593,20 +602,19 @@ bucket_definitions: }); test('changing client ids', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT client_id as id, description FROM "%" ` - ); - await using factory = await generateStorageFactory(); - - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); const sourceTable = TEST_TABLE; - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ sourceTable, tag: storage.SaveOperationTag.INSERT, @@ -638,8 +646,10 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('test2') }); + + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].chunkData.data.map((d) => { return { @@ -657,16 +667,16 @@ bucket_definitions: }); test('re-apply delete', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await 
generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id, description FROM "%" ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; @@ -690,9 +700,11 @@ bucket_definitions: tag: storage.SaveOperationTag.DELETE, beforeReplicaId: test_utils.rid('test1') }); + + await batch.commit('1/1'); }); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -702,7 +714,7 @@ bucket_definitions: }); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); const data = batch[0].chunkData.data.map((d) => { @@ -732,16 +744,16 @@ bucket_definitions: }); test('re-apply update + delete', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id, description FROM "%" ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; @@ -785,9 +797,11 @@ bucket_definitions: tag: storage.SaveOperationTag.DELETE, beforeReplicaId: test_utils.rid('test1') }); + + await batch.commit('1/1'); }); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; await batch.save({ @@ -815,9 +829,11 @@ bucket_definitions: tag: storage.SaveOperationTag.DELETE, beforeReplicaId: test_utils.rid('test1') }); + + await batch.commit('2/1'); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); const batch = await test_utils.fromAsync(bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]))); @@ -850,18 +866,17 @@ bucket_definitions: }); test('truncate parameters', async () => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: mybucket: parameters: - SELECT group_id FROM test WHERE id1 = token_parameters.user_id OR id2 = token_parameters.user_id data: [] ` - ); - - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -879,11 +894,9 @@ bucket_definitions: await batch.truncate([TEST_TABLE]); }); - const { checkpoint } = await bucketStorage.getCheckpoint(); + const checkpoint = await bucketStorage.getCheckpoint(); - const parameters = await bucketStorage.getParameterSets(checkpoint, [ - ParameterLookup.normalized('mybucket', '1', ['user1']) - ]); + 
const parameters = await checkpoint.getParameterSets([ParameterLookup.normalized('mybucket', '1', ['user1'])]); expect(parameters).toEqual([]); }); @@ -896,16 +909,16 @@ bucket_definitions: // 1. Not getting the correct "current_data" state for each operation. // 2. Output order not being correct. - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id, description FROM "test" ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); // Pre-setup const result1 = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { @@ -1048,14 +1061,6 @@ bucket_definitions: }); test('changed data with replica identity full', async () => { - const sync_rules = test_utils.testRules( - ` -bucket_definitions: - global: - data: - - SELECT id, description FROM "test" -` - ); function rid2(id: string, description: string) { return getUuidReplicaIdentityBson({ id, description }, [ { name: 'id', type: 'VARCHAR', typeId: 25 }, @@ -1063,7 +1068,15 @@ bucket_definitions: ]); } await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test" +` + }); + const bucketStorage = factory.getInstance(syncRules); const sourceTable = test_utils.makeTestTable('test', ['id', 'description']); @@ -1155,14 +1168,6 @@ bucket_definitions: }); test('unchanged data with replica identity full', async () => { - const sync_rules = test_utils.testRules( - ` -bucket_definitions: - global: - data: - - SELECT id, description FROM "test" -` - ); function rid2(id: string, description: string) { return getUuidReplicaIdentityBson({ id, description }, [ { name: 'id', type: 'VARCHAR', typeId: 25 }, @@ -1171,7 +1176,15 @@ bucket_definitions: } await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test" +` + }); + const bucketStorage = factory.getInstance(syncRules); const sourceTable = test_utils.makeTestTable('test', ['id', 'description']); @@ -1260,18 +1273,18 @@ bucket_definitions: // but large enough in size to be split over multiple returned batches. // The specific batch splits is an implementation detail of the storage driver, // and the test will have to updated when other implementations are added. 
- const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id, description FROM "%" ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; const largeDescription = '0123456789'.repeat(12_000_00); @@ -1316,9 +1329,11 @@ bucket_definitions: }, afterReplicaId: test_utils.rid('test3') }); + + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); const options: storage.BucketDataBatchOptions = { chunkLimitBytes: 16 * 1024 * 1024 @@ -1367,18 +1382,18 @@ bucket_definitions: test('long batch', async () => { // Test syncing a batch of data that is limited by count. - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id, description FROM "%" ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; for (let i = 1; i <= 6; i++) { @@ -1392,9 +1407,11 @@ bucket_definitions: afterReplicaId: `test${i}` }); } + + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); const batch1 = await test_utils.oneFromAsync( bucketStorage.getBucketDataBatch(checkpoint, new Map([['global[]', 0n]]), { limit: 4 }) @@ -1441,8 +1458,9 @@ bucket_definitions: describe('batch has_more', () => { const setup = async (options: BucketDataBatchOptions) => { - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global1: data: @@ -1451,11 +1469,10 @@ bucket_definitions: data: - SELECT id, description FROM test WHERE bucket = 'global2' ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); - const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { const sourceTable = TEST_TABLE; for (let i = 1; i <= 10; i++) { @@ -1470,9 +1487,11 @@ bucket_definitions: afterReplicaId: `test${i}` }); } + + await batch.commit('1/1'); }); - const checkpoint = result!.flushed_op; + const { checkpoint } = await bucketStorage.getCheckpoint(); return await test_utils.fromAsync( bucketStorage.getBucketDataBatch( checkpoint, @@ -1612,8 +1631,9 @@ bucket_definitions: }); test('invalidate cached parsed sync rules', async () => { - const sync_rules_content = test_utils.testRules( - ` + await using bucketStorageFactory = await generateStorageFactory(); + const syncRules = await 
bucketStorageFactory.updateSyncRules({ + content: ` bucket_definitions: by_workspace: parameters: @@ -1621,10 +1641,8 @@ bucket_definitions: workspace."userId" = token_parameters.user_id data: [] ` - ); - - await using bucketStorageFactory = await generateStorageFactory(); - const syncBucketStorage = bucketStorageFactory.getInstance(sync_rules_content); + }); + const syncBucketStorage = bucketStorageFactory.getInstance(syncRules); const parsedSchema1 = syncBucketStorage.getParsedSyncRules({ defaultSchema: 'public' @@ -1653,7 +1671,7 @@ bucket_definitions: content: ` bucket_definitions: mybucket: - data: [] + data: [] `, validate: false }); @@ -1913,17 +1931,17 @@ bucket_definitions: // Test syncing a batch of data that is small in count, // but large enough in size to be split over multiple returned chunks. // Similar to the above test, but splits over 1MB chunks. - const sync_rules = test_utils.testRules( - ` + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` bucket_definitions: global: data: - SELECT id FROM test - SELECT id FROM test_ignore WHERE false ` - ); - await using factory = await generateStorageFactory(); - const bucketStorage = factory.getInstance(sync_rules); + }); + const bucketStorage = factory.getInstance(syncRules); const sourceTable = test_utils.makeTestTable('test', ['id']); const sourceTableIgnore = test_utils.makeTestTable('test_ignore', ['id']); diff --git a/packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts b/packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts new file mode 100644 index 000000000..59499fa02 --- /dev/null +++ b/packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts @@ -0,0 +1,172 @@ +import { storage } from '@powersync/service-core'; +import { ParameterLookup } from '@powersync/service-sync-rules'; +import { expect, test } from 'vitest'; +import * as test_utils from '../test-utils/test-utils-index.js'; + +const TEST_TABLE = test_utils.makeTestTable('test', ['id']); + +export function registerParameterCompactTests(generateStorageFactory: storage.TestStorageFactory) { + test('compacting parameters', async () => { + await using factory = await generateStorageFactory(); + const syncRules = await factory.updateSyncRules({ + content: ` +bucket_definitions: + test: + parameters: select id from test where id = request.user_id() + data: [] + ` + }); + const bucketStorage = factory.getInstance(syncRules); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't1' + }, + afterReplicaId: 't1' + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.INSERT, + after: { + id: 't2' + }, + afterReplicaId: 't2' + }); + + await batch.commit('1/1'); + }); + + const lookup = ParameterLookup.normalized('test', '1', ['t1']); + + const checkpoint1 = await bucketStorage.getCheckpoint(); + const parameters1 = await checkpoint1.getParameterSets([lookup]); + expect(parameters1).toEqual([{ id: 't1' }]); + + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.UPDATE, + before: { + id: 't1' + }, + beforeReplicaId: 't1', + after: { + id: 't1' + }, + afterReplicaId: 't1' + }); + + await batch.save({ + sourceTable: TEST_TABLE, + tag: storage.SaveOperationTag.DELETE, + before: { + 
diff --git a/packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts b/packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts
new file mode 100644
index 000000000..59499fa02
--- /dev/null
+++ b/packages/service-core-tests/src/tests/register-parameter-compacting-tests.ts
@@ -0,0 +1,172 @@
+import { storage } from '@powersync/service-core';
+import { ParameterLookup } from '@powersync/service-sync-rules';
+import { expect, test } from 'vitest';
+import * as test_utils from '../test-utils/test-utils-index.js';
+
+const TEST_TABLE = test_utils.makeTestTable('test', ['id']);
+
+export function registerParameterCompactTests(generateStorageFactory: storage.TestStorageFactory) {
+  test('compacting parameters', async () => {
+    await using factory = await generateStorageFactory();
+    const syncRules = await factory.updateSyncRules({
+      content: `
+bucket_definitions:
+  test:
+    parameters: select id from test where id = request.user_id()
+    data: []
+    `
+    });
+    const bucketStorage = factory.getInstance(syncRules);
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 't1'
+        },
+        afterReplicaId: 't1'
+      });
+
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.INSERT,
+        after: {
+          id: 't2'
+        },
+        afterReplicaId: 't2'
+      });
+
+      await batch.commit('1/1');
+    });
+
+    const lookup = ParameterLookup.normalized('test', '1', ['t1']);
+
+    const checkpoint1 = await bucketStorage.getCheckpoint();
+    const parameters1 = await checkpoint1.getParameterSets([lookup]);
+    expect(parameters1).toEqual([{ id: 't1' }]);
+
+    await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.UPDATE,
+        before: {
+          id: 't1'
+        },
+        beforeReplicaId: 't1',
+        after: {
+          id: 't1'
+        },
+        afterReplicaId: 't1'
+      });
+
+      await batch.save({
+        sourceTable: TEST_TABLE,
+        tag: storage.SaveOperationTag.DELETE,
+        before: {
+          id: 't1'
+        },
+        beforeReplicaId: 't1'
+      });
+      await batch.commit('1/2');
+    });
+    const checkpoint2 = await bucketStorage.getCheckpoint();
+    const parameters2 = await checkpoint2.getParameterSets([lookup]);
+    expect(parameters2).toEqual([]);
+
+    const statsBefore = await bucketStorage.factory.getStorageMetrics();
+    await bucketStorage.compact({ compactParameterData: true });
+
+    // Check consistency
+    const parameters1b = await checkpoint1.getParameterSets([lookup]);
+    const parameters2b = await checkpoint2.getParameterSets([lookup]);
+    expect(parameters1b).toEqual([{ id: 't1' }]);
+    expect(parameters2b).toEqual([]);
+
+    // Check storage size
+    const statsAfter = await bucketStorage.factory.getStorageMetrics();
+    expect(statsAfter.parameters_size_bytes).toBeLessThan(statsBefore.parameters_size_bytes);
+  });
+
+  for (let cacheLimit of [1, 10]) {
+    test(`compacting deleted parameters with cache size ${cacheLimit}`, async () => {
+      await using factory = await generateStorageFactory();
+      const syncRules = await factory.updateSyncRules({
+        content: `
+bucket_definitions:
+  test:
+    parameters: select id from test where uid = request.user_id()
+    data: []
+    `
+      });
+      const bucketStorage = factory.getInstance(syncRules);
+
+      await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: 't1',
+            uid: 'u1'
+          },
+          afterReplicaId: 't1'
+        });
+        // Interleave with another operation, to evict the other cache entry when compacting.
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.INSERT,
+          after: {
+            id: 't2',
+            uid: 'u1'
+          },
+          afterReplicaId: 't2'
+        });
+
+        await batch.commit('1/1');
+      });
+
+      await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.DELETE,
+          before: {
+            id: 't1',
+            uid: 'u1'
+          },
+          beforeReplicaId: 't1'
+        });
+        await batch.commit('2/1');
+      });
+
+      await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => {
+        await batch.save({
+          sourceTable: TEST_TABLE,
+          tag: storage.SaveOperationTag.UPDATE,
+          after: {
+            id: 't2',
+            uid: 'u2'
+          },
+          afterReplicaId: 't2'
+        });
+        await batch.commit('3/1');
+      });
+
+      const lookup = ParameterLookup.normalized('test', '1', ['u1']);
+
+      const checkpoint1 = await bucketStorage.getCheckpoint();
+      const parameters1 = await checkpoint1.getParameterSets([lookup]);
+      expect(parameters1).toEqual([]);
+
+      const statsBefore = await bucketStorage.factory.getStorageMetrics();
+      await bucketStorage.compact({ compactParameterData: true, compactParameterCacheLimit: cacheLimit });
+
+      // Check consistency
+      const parameters1b = await checkpoint1.getParameterSets([lookup]);
+      expect(parameters1b).toEqual([]);
+
+      // Check storage size
+      const statsAfter = await bucketStorage.factory.getStorageMetrics();
+      expect(statsAfter.parameters_size_bytes).toBeLessThan(statsBefore.parameters_size_bytes);
+    });
+  }
+}
diff --git a/packages/service-core-tests/src/tests/tests-index.ts b/packages/service-core-tests/src/tests/tests-index.ts
index 558f8c5a1..1917a43c0 100644
--- a/packages/service-core-tests/src/tests/tests-index.ts
+++ b/packages/service-core-tests/src/tests/tests-index.ts
@@ -1,5 +1,6 @@
 export * from './register-bucket-validation-tests.js';
 export * from './register-compacting-tests.js';
+export * from './register-parameter-compacting-tests.js';
 export * from './register-data-storage-tests.js';
 export * from './register-migration-tests.js';
 export * from './register-sync-tests.js';
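The new suite is exported through `tests-index.ts` above, so each storage module can run it against its own factory. A hypothetical wiring in a module's test file might look like the sketch below; the import path assumes the package root re-exports `tests-index.ts`, and `MY_TEST_STORAGE_FACTORY` is a placeholder for that module's `storage.TestStorageFactory`, not something defined in this diff:

    import { describe } from 'vitest';
    import { registerParameterCompactTests } from '@powersync/service-core-tests';

    describe('parameter compacting', () => {
      // MY_TEST_STORAGE_FACTORY: the module's own test storage factory (placeholder).
      registerParameterCompactTests(MY_TEST_STORAGE_FACTORY);
    });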
diff --git a/packages/service-core/src/entry/commands/compact-action.ts b/packages/service-core/src/entry/commands/compact-action.ts
index 0c2275b79..ce1a67781 100644
--- a/packages/service-core/src/entry/commands/compact-action.ts
+++ b/packages/service-core/src/entry/commands/compact-action.ts
@@ -59,7 +59,15 @@ export function registerCompactAction(program: Command) {
         return;
       }
       logger.info('Performing compaction...');
-      await active.compact({ memoryLimitMB: COMPACT_MEMORY_LIMIT_MB, compactBuckets: buckets });
+      if (buckets != null) {
+        await active.compact({
+          memoryLimitMB: COMPACT_MEMORY_LIMIT_MB,
+          compactBuckets: buckets,
+          compactParameterData: false
+        });
+      } else {
+        await active.compact({ memoryLimitMB: COMPACT_MEMORY_LIMIT_MB, compactParameterData: true });
+      }
       logger.info('Successfully compacted storage.');
     } catch (e) {
       logger.error(`Failed to compact: ${e.toString()}`);
diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
index 6b27e3c7c..8ad18df89 100644
--- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts
+++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts
@@ -66,11 +66,6 @@ export interface SyncRulesBucketStorage
   getCheckpoint(): Promise<ReplicationCheckpoint>;
 
-  /**
-   * Used to resolve "dynamic" parameter queries.
-   */
-  getParameterSets(checkpoint: util.InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>;
-
   /**
    * Given two checkpoints, return the changes in bucket data and parameters that may have occurred
    * in that period.
@@ -198,6 +193,8 @@ export interface CompactOptions {
    */
   compactBuckets?: string[];
 
+  compactParameterData?: boolean;
+
   /** Minimum of 2 */
   clearBatchLimit?: number;
 
@@ -206,6 +203,11 @@ export interface CompactOptions {
 
   /** Minimum of 1 */
   moveBatchQueryLimit?: number;
+
+  /**
+   * Internal/testing use: Cache size for compacting parameters.
+   */
+  compactParameterCacheLimit?: number;
 }
 
 export interface ClearStorageOptions {
@@ -243,6 +245,13 @@ export interface SyncBucketDataChunk {
 export interface ReplicationCheckpoint {
   readonly checkpoint: util.InternalOpId;
   readonly lsn: string | null;
+
+  /**
+   * Used to resolve "dynamic" parameter queries.
+   *
+   * This gets parameter sets specific to this checkpoint.
+   */
+  getParameterSets(lookups: ParameterLookup[]): Promise<SqliteJsonRow[]>;
 }
 
 export interface WatchWriteCheckpointOptions {
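Taken together, the interface changes above move dynamic parameter resolution off `SyncRulesBucketStorage` and onto the `ReplicationCheckpoint` returned by `getCheckpoint()`, so lookups are always scoped to a specific checkpoint; `compactParameterData` opts parameter lookup storage into the same compact run, and `compactParameterCacheLimit` is for internal/testing use only. A minimal caller-side sketch, using only the members declared above (the lookup values are illustrative):

    // bucketStorage: SyncRulesBucketStorage
    const checkpoint = await bucketStorage.getCheckpoint();
    const rows = await checkpoint.getParameterSets([
      ParameterLookup.normalized('by_workspace', '1', ['user-id'])
    ]);
    // Equivalent to the removed storage.getParameterSets(checkpoint.checkpoint, lookups),
    // but resolved against this checkpoint.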
diff --git a/packages/service-core/src/sync/BucketChecksumState.ts b/packages/service-core/src/sync/BucketChecksumState.ts
index 2bbd7c8cd..f94720279 100644
--- a/packages/service-core/src/sync/BucketChecksumState.ts
+++ b/packages/service-core/src/sync/BucketChecksumState.ts
@@ -440,7 +440,7 @@ export class BucketParameterState {
     if (hasParameterChange || this.cachedDynamicBuckets == null || this.cachedDynamicBucketSet == null) {
       dynamicBuckets = await querier.queryDynamicBucketDescriptions({
         getParameterSets(lookups) {
-          return storage.getParameterSets(checkpoint.base.checkpoint, lookups);
+          return checkpoint.base.getParameterSets(lookups);
         }
       });
       this.cachedDynamicBuckets = dynamicBuckets;
@@ -501,7 +501,7 @@ export interface CheckpointLine {
 }
 
 // Use a more specific type to simplify testing
-export type BucketChecksumStateStorage = Pick;
+export type BucketChecksumStateStorage = Pick;
 
 function limitedBuckets(buckets: string[] | { bucket: string }[], limit: number) {
   buckets = buckets.map((b) => {
diff --git a/packages/service-core/test/src/sync/BucketChecksumState.test.ts b/packages/service-core/test/src/sync/BucketChecksumState.test.ts
index a53a9f199..312a7dd6c 100644
--- a/packages/service-core/test/src/sync/BucketChecksumState.test.ts
+++ b/packages/service-core/test/src/sync/BucketChecksumState.test.ts
@@ -5,11 +5,12 @@ import {
   CHECKPOINT_INVALIDATE_ALL,
   ChecksumMap,
   InternalOpId,
+  ReplicationCheckpoint,
   SyncContext,
   WatchFilterEvent
 } from '@/index.js';
 import { JSONBig } from '@powersync/service-jsonbig';
-import { RequestParameters, SqliteJsonRow, ParameterLookup, SqlSyncRules } from '@powersync/service-sync-rules';
+import { ParameterLookup, RequestParameters, SqliteJsonRow, SqlSyncRules } from '@powersync/service-sync-rules';
 import { describe, expect, test } from 'vitest';
 
 describe('BucketChecksumState', () => {
@@ -67,7 +68,7 @@ bucket_definitions:
     });
 
     const line = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     }))!;
@@ -97,7 +98,7 @@ bucket_definitions:
     // Now we get a new line
     const line2 = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 2n, lsn: '2' },
+      base: storage.makeCheckpoint(2n),
       writeCheckpoint: null,
       update: {
         updatedDataBuckets: new Set(['global[]']),
@@ -136,7 +137,7 @@ bucket_definitions:
     });
 
     const line = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     }))!;
@@ -172,7 +173,7 @@ bucket_definitions:
     });
 
     const line = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     }))!;
@@ -202,7 +203,7 @@ bucket_definitions:
     storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 });
 
     const line2 = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 2n, lsn: '2' },
+      base: storage.makeCheckpoint(2n),
       writeCheckpoint: null,
       update: {
         ...CHECKPOINT_INVALIDATE_ALL,
@@ -241,7 +242,7 @@ bucket_definitions:
     storage.updateTestChecksum({ bucket: 'global[]', checksum: 1, count: 1 });
 
     const line = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n),
      writeCheckpoint: null,
      update: CHECKPOINT_INVALIDATE_ALL
    }))!;
@@ -281,7 +282,7 @@ bucket_definitions:
     // storage.filter = state.checkpointFilter;
 
     const line = await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     });
@@ -293,7 +294,7 @@ bucket_definitions:
     storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 });
 
     const line2 = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 2n, lsn: '2' },
+      base: storage.makeCheckpoint(2n),
       writeCheckpoint: null,
       update: {
         ...CHECKPOINT_INVALIDATE_ALL,
@@ -337,7 +338,7 @@ bucket_definitions:
     storage.updateTestChecksum({ bucket: 'global[2]', checksum: 1, count: 1 });
 
     const line = await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     });
@@ -348,7 +349,7 @@ bucket_definitions:
     storage.updateTestChecksum({ bucket: 'global[2]', checksum: 2, count: 2 });
 
     const line2 = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 2n, lsn: '2' },
+      base: storage.makeCheckpoint(2n),
       writeCheckpoint: null,
       // Invalidate the state - will re-check all buckets
       update: CHECKPOINT_INVALIDATE_ALL
@@ -384,7 +385,7 @@ bucket_definitions:
     });
 
     const line = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 3n, lsn: '3' },
+      base: storage.makeCheckpoint(3n),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     }))!;
@@ -426,7 +427,7 @@ bucket_definitions:
     storage.updateTestChecksum({ bucket: 'global[1]', checksum: 4, count: 4 });
 
     const line2 = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 4n, lsn: '4' },
+      base: storage.makeCheckpoint(4n),
       writeCheckpoint: null,
       update: {
         ...CHECKPOINT_INVALIDATE_ALL,
@@ -484,17 +485,11 @@ bucket_definitions:
       bucketStorage: storage
     });
 
-    storage.getParameterSets = async (
-      checkpoint: InternalOpId,
-      lookups: ParameterLookup[]
-    ): Promise<SqliteJsonRow[]> => {
-      expect(checkpoint).toEqual(1n);
-      expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
-      return [{ id: 1 }, { id: 2 }];
-    };
-
     const line = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 1n, lsn: '1' },
+      base: storage.makeCheckpoint(1n, (lookups) => {
+        expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
+        return [{ id: 1 }, { id: 2 }];
+      }),
       writeCheckpoint: null,
       update: CHECKPOINT_INVALIDATE_ALL
     }))!;
@@ -531,18 +526,12 @@ bucket_definitions:
     line.updateBucketPosition({ bucket: 'by_project[1]', nextAfter: 1n, hasMore: false });
     line.updateBucketPosition({ bucket: 'by_project[2]', nextAfter: 1n, hasMore: false });
 
-    storage.getParameterSets = async (
-      checkpoint: InternalOpId,
-      lookups: ParameterLookup[]
-    ): Promise<SqliteJsonRow[]> => {
-      expect(checkpoint).toEqual(2n);
-      expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
-      return [{ id: 1 }, { id: 2 }, { id: 3 }];
-    };
-
     // Now we get a new line
     const line2 = (await state.buildNextCheckpointLine({
-      base: { checkpoint: 2n, lsn: '2' },
+      base: storage.makeCheckpoint(2n, (lookups) => {
+        expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
+        return [{ id: 1 }, { id: 2 }, { id: 3 }];
+      }),
       writeCheckpoint: null,
       update: {
         invalidateDataBuckets: false,
@@ -595,7 +584,19 @@ class MockBucketChecksumStateStorage implements BucketChecksumStateStorage {
     );
   }
 
-  async getParameterSets(checkpoint: InternalOpId, lookups: ParameterLookup[]): Promise<SqliteJsonRow[]> {
-    throw new Error('Method not implemented.');
+  makeCheckpoint(
+    opId: InternalOpId,
+    parameters?: (lookups: ParameterLookup[]) => SqliteJsonRow[]
+  ): ReplicationCheckpoint {
+    return {
+      checkpoint: opId,
+      lsn: String(opId),
+      getParameterSets: async (lookups: ParameterLookup[]) => {
+        if (parameters == null) {
+          throw new Error(`getParameterSets not defined for checkpoint ${opId}`);
+        }
+        return parameters(lookups);
+      }
+    };
   }
 }
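For reference, the mock's new `makeCheckpoint` helper replaces the per-test `storage.getParameterSets` override removed above: tests that expect dynamic parameter lookups attach a callback to the checkpoint they pass in, and a checkpoint created without one throws if `getParameterSets` is called. A usage sketch mirroring the assertions in the hunks above:

    const line = (await state.buildNextCheckpointLine({
      base: storage.makeCheckpoint(1n, (lookups) => {
        // Only checkpoints that resolve dynamic buckets need a parameters callback.
        expect(lookups).toEqual([ParameterLookup.normalized('by_project', '1', ['u1'])]);
        return [{ id: 1 }, { id: 2 }];
      }),
      writeCheckpoint: null,
      update: CHECKPOINT_INVALIDATE_ALL
    }))!;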