From 303528d7553c249c73845904bdeea11bc747672c Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 14 Oct 2024 16:18:12 +0200 Subject: [PATCH 01/13] rename parameters --- .../service-core/src/util/config/compound-config-collector.ts | 3 ++- packages/service-core/src/util/config/types.ts | 1 + packages/types/src/config/PowerSyncConfig.ts | 4 +++- 3 files changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/service-core/src/util/config/compound-config-collector.ts b/packages/service-core/src/util/config/compound-config-collector.ts index 78e8500cb..7c2b70c87 100644 --- a/packages/service-core/src/util/config/compound-config-collector.ts +++ b/packages/service-core/src/util/config/compound-config-collector.ts @@ -122,7 +122,8 @@ export class CompoundConfigCollector { }, // TODO maybe move this out of the connection or something // slot_name_prefix: connections[0]?.slot_name_prefix ?? 'powersync_' - slot_name_prefix: 'powersync_' + slot_name_prefix: 'powersync_', + parameters: baseConfig.parameters ?? {} }; return config; diff --git a/packages/service-core/src/util/config/types.ts b/packages/service-core/src/util/config/types.ts index e5f461e49..99829526d 100644 --- a/packages/service-core/src/util/config/types.ts +++ b/packages/service-core/src/util/config/types.ts @@ -64,4 +64,5 @@ export type ResolvedPowerSyncConfig = { /** Prefix for postgres replication slot names. May eventually be connection-specific. */ slot_name_prefix: string; + parameters: Record; }; diff --git a/packages/types/src/config/PowerSyncConfig.ts b/packages/types/src/config/PowerSyncConfig.ts index e9dff54f4..dbd33e1ae 100644 --- a/packages/types/src/config/PowerSyncConfig.ts +++ b/packages/types/src/config/PowerSyncConfig.ts @@ -135,7 +135,9 @@ export const powerSyncConfig = t.object({ disable_telemetry_sharing: t.boolean, internal_service_endpoint: t.string.optional() }) - .optional() + .optional(), + + parameters: t.record(t.number.or(t.string).or(t.boolean).or(t.Null)).optional() }); export type PowerSyncConfig = t.Decoded; From 86072e37393a9adaf66da52195cda3dab6f25a65 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 16 Oct 2024 13:29:08 +0200 Subject: [PATCH 02/13] Allow listening to active sync rules. Allow changing WriteCheckpoint mode after the fact --- .../service-core/src/storage/BucketStorage.ts | 7 ++++ .../src/storage/MongoBucketStorage.ts | 34 +++++++++++++++---- .../storage/mongo/MongoWriteCheckpointAPI.ts | 23 ++++++++----- .../src/storage/write-checkpoint.ts | 2 ++ 4 files changed, 52 insertions(+), 14 deletions(-) diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 9c12e805c..14d95fdd6 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -90,6 +90,11 @@ export interface BucketStorageFactory */ getActiveCheckpoint(): Promise; + /** + * Yields the latest sync checkpoint. + */ + watchActiveCheckpoint(signal: AbortSignal): AsyncIterable; + /** * Yields the latest user write checkpoint whenever the sync checkpoint updates. */ @@ -118,6 +123,8 @@ export interface ActiveCheckpoint { hasSyncRules(): boolean; getBucketStorage(): Promise; + + syncRules: PersistedSyncRules | null; } export interface StorageMetrics { diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index c0254da71..e86425d54 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -78,6 +78,9 @@ export class MongoBucketStorage db: PowerSyncMongo, options: { slot_name_prefix: string; + /** + * Initial Write Checkpoint Mode + */ write_checkpoint_mode?: WriteCheckpointMode; } ) { @@ -303,6 +306,10 @@ export class MongoBucketStorage return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints(checkpoints); } + setWriteCheckpointMode(mode: WriteCheckpointMode): void { + return this.writeCheckpointAPI.setWriteCheckpointMode(mode); + } + async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise { return this.writeCheckpointAPI.createCustomWriteCheckpoint(options); } @@ -408,14 +415,19 @@ export class MongoBucketStorage return null; } return (await this.storageCache.fetch(doc._id)) ?? null; - } - }; + }, + syncRules: doc + ? new MongoPersistedSyncRulesContent(this.db, doc).parsed({ + defaultSchema: '' + }) + : null + } satisfies ActiveCheckpoint; } /** * Instance-wide watch on the latest available checkpoint (op_id + lsn). */ - private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { + private async *_watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { const pipeline: mongo.Document[] = [ { $match: { @@ -428,7 +440,8 @@ export class MongoBucketStorage operationType: 1, 'fullDocument._id': 1, 'fullDocument.last_checkpoint': 1, - 'fullDocument.last_checkpoint_lsn': 1 + 'fullDocument.last_checkpoint_lsn': 1, + 'fullDocument.content': 1 } } ]; @@ -450,7 +463,8 @@ export class MongoBucketStorage projection: { _id: 1, last_checkpoint: 1, - last_checkpoint_lsn: 1 + last_checkpoint_lsn: 1, + content: 1 } } ); @@ -499,6 +513,7 @@ export class MongoBucketStorage if (doc == null) { continue; } + const op = this.makeActiveCheckpoint(doc); // Check for LSN / checkpoint changes - ignore other metadata changes if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) { @@ -510,9 +525,16 @@ export class MongoBucketStorage // Nothing is done here until a subscriber starts to iterate private readonly sharedIter = new sync.BroadcastIterable((signal) => { - return this.watchActiveCheckpoint(signal); + return this._watchActiveCheckpoint(signal); }); + /** + * Watch changes to the active sync rules and checkpoint. + */ + watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { + return wrapWithAbort(this.sharedIter, signal); + } + /** * User-specific watch on the latest checkpoint and/or write checkpoint. */ diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index 230db3153..d466173f5 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -17,11 +17,19 @@ export type MongoCheckpointAPIOptions = { export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { readonly db: PowerSyncMongo; - readonly mode: WriteCheckpointMode; + private _mode: WriteCheckpointMode; constructor(options: MongoCheckpointAPIOptions) { this.db = options.db; - this.mode = options.mode; + this._mode = options.mode; + } + + get mode() { + return this._mode; + } + + setWriteCheckpointMode(mode: WriteCheckpointMode): void { + this._mode = mode; } async batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise { @@ -29,12 +37,11 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise { - if (this.mode !== WriteCheckpointMode.CUSTOM) { - throw new framework.errors.ValidationError( - `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.mode}"` - ); - } - + /** + * Allow creating custom checkpoints even if the current mode is not `custom`. + * There might be a state where the next sync rules rely on replicating custom + * write checkpoints, but the current active sync rules uses managed checkpoints. + */ const { checkpoint, user_id, sync_rules_id } = options; const doc = await this.db.custom_write_checkpoints.findOneAndUpdate( { diff --git a/packages/service-core/src/storage/write-checkpoint.ts b/packages/service-core/src/storage/write-checkpoint.ts index 0b61fe0c1..0a45dbffd 100644 --- a/packages/service-core/src/storage/write-checkpoint.ts +++ b/packages/service-core/src/storage/write-checkpoint.ts @@ -55,6 +55,8 @@ export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; export interface WriteCheckpointAPI { + setWriteCheckpointMode(mode: WriteCheckpointMode): void; + batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise; From 6d0f671de6c9e8207efdf14cd8d5e31f5e4f253b Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 16 Oct 2024 13:42:23 +0200 Subject: [PATCH 03/13] fix tests --- packages/service-core/src/storage/MongoBucketStorage.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index e86425d54..251727a36 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -416,7 +416,7 @@ export class MongoBucketStorage } return (await this.storageCache.fetch(doc._id)) ?? null; }, - syncRules: doc + syncRules: doc?.content ? new MongoPersistedSyncRulesContent(this.db, doc).parsed({ defaultSchema: '' }) From f45b0410cfe54b595cc6daa347e6139452183442 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Wed, 16 Oct 2024 18:23:33 +0200 Subject: [PATCH 04/13] report current write checkpoint mode --- packages/service-core/src/storage/MongoBucketStorage.ts | 4 ++++ .../src/storage/mongo/MongoWriteCheckpointAPI.ts | 8 ++++---- packages/service-core/src/storage/write-checkpoint.ts | 2 ++ 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index 251727a36..b27a71b53 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -96,6 +96,10 @@ export class MongoBucketStorage }); } + get writeCheckpointMode() { + return this.writeCheckpointAPI.writeCheckpointMode; + } + getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage { let { id, slot_name } = options; if ((typeof id as any) == 'bigint') { diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index d466173f5..aa66b6f28 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -24,7 +24,7 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { this._mode = options.mode; } - get mode() { + get writeCheckpointMode() { return this._mode; } @@ -59,9 +59,9 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise { - if (this.mode !== WriteCheckpointMode.MANAGED) { + if (this.writeCheckpointMode !== WriteCheckpointMode.MANAGED) { throw new framework.errors.ValidationError( - `Creating a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.mode}"` + `Attempting to create a managed Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` ); } @@ -84,7 +84,7 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise { - switch (this.mode) { + switch (this.writeCheckpointMode) { case WriteCheckpointMode.CUSTOM: if (false == 'sync_rules_id' in filters) { throw new framework.errors.ValidationError(`Sync rules ID is required for custom Write Checkpoint filtering`); diff --git a/packages/service-core/src/storage/write-checkpoint.ts b/packages/service-core/src/storage/write-checkpoint.ts index 0a45dbffd..3a9abd216 100644 --- a/packages/service-core/src/storage/write-checkpoint.ts +++ b/packages/service-core/src/storage/write-checkpoint.ts @@ -55,6 +55,8 @@ export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; export interface WriteCheckpointAPI { + readonly writeCheckpointMode: WriteCheckpointMode; + setWriteCheckpointMode(mode: WriteCheckpointMode): void; batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; From c65820ac235c585e553d41036607888d320b27b2 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Thu, 17 Oct 2024 15:47:41 +0200 Subject: [PATCH 05/13] cleanup --- packages/service-core/src/storage/BucketStorage.ts | 2 +- packages/service-core/src/storage/MongoBucketStorage.ts | 6 +----- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 14d95fdd6..8d7df60af 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -124,7 +124,7 @@ export interface ActiveCheckpoint { getBucketStorage(): Promise; - syncRules: PersistedSyncRules | null; + syncRules: PersistedSyncRulesContent | null; } export interface StorageMetrics { diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index b27a71b53..8b44e34a3 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -420,11 +420,7 @@ export class MongoBucketStorage } return (await this.storageCache.fetch(doc._id)) ?? null; }, - syncRules: doc?.content - ? new MongoPersistedSyncRulesContent(this.db, doc).parsed({ - defaultSchema: '' - }) - : null + syncRules: doc ? new MongoPersistedSyncRulesContent(this.db, doc) : null } satisfies ActiveCheckpoint; } From eb94c0f54ea3eeee167c8dc26e589ef4848e706d Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 18 Oct 2024 09:57:04 +0200 Subject: [PATCH 06/13] changeset version does not run postversion script --- packages/sync-rules/package.json | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/sync-rules/package.json b/packages/sync-rules/package.json index bb942b1e8..727188bb6 100644 --- a/packages/sync-rules/package.json +++ b/packages/sync-rules/package.json @@ -15,10 +15,9 @@ "type": "module", "scripts": { "clean": "rm -r ./dist && tsc -b --clean", - "build": "tsc -b", + "build:tsc": "tsc -b", + "build": "pnpm build:tsc && node scripts/compile-schema.js", "build:tests": "tsc -b test/tsconfig.json", - "compile:schema": "pnpm build && node scripts/compile-schema.js", - "postversion": "pnpm compile:schema", "test": "vitest" }, "dependencies": { From 83cb22b7e3747bbb11bce845670b0dc1cae4e773 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Fri, 18 Oct 2024 10:28:42 +0200 Subject: [PATCH 07/13] fix docker build --- service/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/service/Dockerfile b/service/Dockerfile index 715931b11..d6641555e 100644 --- a/service/Dockerfile +++ b/service/Dockerfile @@ -29,6 +29,7 @@ COPY packages/jpgwire/src packages/jpgwire/src/ COPY packages/jpgwire/ca packages/jpgwire/ca/ COPY packages/jsonbig/src packages/jsonbig/src/ COPY packages/sync-rules/src packages/sync-rules/src/ +COPY packages/sync-rules/scripts packages/sync-rules/scripts/ COPY packages/rsocket-router/src packages/rsocket-router/src/ COPY packages/types/src packages/types/src/ From e969c615d0d33ab00f4af6ec749046cb8f658287 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 21 Oct 2024 16:56:30 +0200 Subject: [PATCH 08/13] move write checkpoint methods to sync bucket storage --- .../src/routes/endpoints/checkpointing.ts | 9 ++- .../service-core/src/storage/BucketStorage.ts | 18 ++--- .../src/storage/MongoBucketStorage.ts | 72 +++---------------- .../service-core/src/storage/StorageEngine.ts | 26 +------ .../src/storage/StorageProvider.ts | 10 +-- .../src/storage/mongo/MongoBucketBatch.ts | 4 +- .../src/storage/mongo/MongoStorageProvider.ts | 3 +- .../storage/mongo/MongoSyncBucketStorage.ts | 53 ++++++++++++-- .../storage/mongo/MongoWriteCheckpointAPI.ts | 11 +-- .../src/storage/write-checkpoint.ts | 40 +++++++---- 10 files changed, 111 insertions(+), 135 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/checkpointing.ts b/packages/service-core/src/routes/endpoints/checkpointing.ts index 0cfd2dc12..b95dc703b 100644 --- a/packages/service-core/src/routes/endpoints/checkpointing.ts +++ b/packages/service-core/src/routes/endpoints/checkpointing.ts @@ -1,6 +1,7 @@ import { logger, router, schema } from '@powersync/lib-services-framework'; import * as t from 'ts-codec'; +import * as framework from '@powersync/lib-services-framework'; import * as util from '../../util/util-index.js'; import { authUser } from '../auth.js'; import { routeDefinition } from '../router.js'; @@ -63,7 +64,13 @@ export const writeCheckpoint2 = routeDefinition({ storageEngine: { activeBucketStorage } } = service_context; - const writeCheckpoint = await activeBucketStorage.createManagedWriteCheckpoint({ + const active = await activeBucketStorage.getActiveSyncRulesContent(); + if (!active) { + throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`); + } + + using syncBucketStorage = activeBucketStorage.getInstance(active); + const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({ user_id: full_user_id, heads: { '1': currentCheckpoint } }); diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 61b376be0..edaa226a9 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -12,16 +12,15 @@ import * as util from '../util/util-index.js'; import { ReplicationEventPayload } from './ReplicationEventPayload.js'; import { SourceEntityDescriptor } from './SourceEntity.js'; import { SourceTable } from './SourceTable.js'; -import { BatchedCustomWriteCheckpointOptions, ReplicaId, WriteCheckpointAPI } from './storage-index.js'; +import { BatchedCustomWriteCheckpointOptions, ReplicaId } from './storage-index.js'; +import { SyncStorageWriteCheckpointAPI } from './write-checkpoint.js'; export interface BucketStorageFactoryListener extends DisposableListener { syncStorageCreated: (storage: SyncRulesBucketStorage) => void; replicationEvent: (event: ReplicationEventPayload) => void; } -export interface BucketStorageFactory - extends DisposableObserverClient, - WriteCheckpointAPI { +export interface BucketStorageFactory extends DisposableObserverClient { /** * Update sync rules from configuration, if changed. */ @@ -90,11 +89,6 @@ export interface BucketStorageFactory */ getActiveCheckpoint(): Promise; - /** - * Yields the latest sync checkpoint. - */ - watchActiveCheckpoint(signal: AbortSignal): AsyncIterable; - /** * Yields the latest user write checkpoint whenever the sync checkpoint updates. */ @@ -123,8 +117,6 @@ export interface ActiveCheckpoint { hasSyncRules(): boolean; getBucketStorage(): Promise; - - syncRules: PersistedSyncRulesContent | null; } export interface StorageMetrics { @@ -213,7 +205,9 @@ export interface SyncRulesBucketStorageListener extends DisposableListener { batchStarted: (batch: BucketStorageBatch) => void; } -export interface SyncRulesBucketStorage extends DisposableObserverClient { +export interface SyncRulesBucketStorage + extends DisposableObserverClient, + SyncStorageWriteCheckpointAPI { readonly group_id: number; readonly slot_name: string; diff --git a/packages/service-core/src/storage/MongoBucketStorage.ts b/packages/service-core/src/storage/MongoBucketStorage.ts index 46fd1e66b..f3175b915 100644 --- a/packages/service-core/src/storage/MongoBucketStorage.ts +++ b/packages/service-core/src/storage/MongoBucketStorage.ts @@ -25,16 +25,7 @@ import { PowerSyncMongo, PowerSyncMongoOptions } from './mongo/db.js'; import { SyncRuleDocument, SyncRuleState } from './mongo/models.js'; import { MongoPersistedSyncRulesContent } from './mongo/MongoPersistedSyncRulesContent.js'; import { MongoSyncBucketStorage } from './mongo/MongoSyncBucketStorage.js'; -import { MongoWriteCheckpointAPI } from './mongo/MongoWriteCheckpointAPI.js'; import { generateSlotName } from './mongo/util.js'; -import { - CustomWriteCheckpointOptions, - DEFAULT_WRITE_CHECKPOINT_MODE, - LastWriteCheckpointFilters, - ManagedWriteCheckpointOptions, - WriteCheckpointAPI, - WriteCheckpointMode -} from './write-checkpoint.js'; export interface MongoBucketStorageOptions extends PowerSyncMongoOptions {} @@ -47,10 +38,6 @@ export class MongoBucketStorage // TODO: This is still Postgres specific and needs to be reworked public readonly slot_name_prefix: string; - readonly write_checkpoint_mode: WriteCheckpointMode; - - protected readonly writeCheckpointAPI: WriteCheckpointAPI; - private readonly storageCache = new LRUCache({ max: 3, fetchMethod: async (id) => { @@ -78,10 +65,6 @@ export class MongoBucketStorage db: PowerSyncMongo, options: { slot_name_prefix: string; - /** - * Initial Write Checkpoint Mode - */ - write_checkpoint_mode?: WriteCheckpointMode; } ) { super(); @@ -89,15 +72,6 @@ export class MongoBucketStorage this.db = db; this.session = this.client.startSession(); this.slot_name_prefix = options.slot_name_prefix; - this.write_checkpoint_mode = options.write_checkpoint_mode ?? DEFAULT_WRITE_CHECKPOINT_MODE; - this.writeCheckpointAPI = new MongoWriteCheckpointAPI({ - db, - mode: this.write_checkpoint_mode - }); - } - - get writeCheckpointMode() { - return this.writeCheckpointAPI.writeCheckpointMode; } getInstance(options: PersistedSyncRulesContent): MongoSyncBucketStorage { @@ -306,26 +280,6 @@ export class MongoBucketStorage }); } - async batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise { - return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints(checkpoints); - } - - setWriteCheckpointMode(mode: WriteCheckpointMode): void { - return this.writeCheckpointAPI.setWriteCheckpointMode(mode); - } - - async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createCustomWriteCheckpoint(options); - } - - async createManagedWriteCheckpoint(options: ManagedWriteCheckpointOptions): Promise { - return this.writeCheckpointAPI.createManagedWriteCheckpoint(options); - } - - async lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise { - return this.writeCheckpointAPI.lastWriteCheckpoint(filters); - } - async getActiveCheckpoint(): Promise { const doc = await this.db.sync_rules.findOne( { @@ -436,15 +390,14 @@ export class MongoBucketStorage return null; } return (await this.storageCache.fetch(doc._id)) ?? null; - }, - syncRules: doc ? new MongoPersistedSyncRulesContent(this.db, doc) : null + } } satisfies ActiveCheckpoint; } /** * Instance-wide watch on the latest available checkpoint (op_id + lsn). */ - private async *_watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { + private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { const pipeline: mongo.Document[] = [ { $match: { @@ -457,8 +410,7 @@ export class MongoBucketStorage operationType: 1, 'fullDocument._id': 1, 'fullDocument.last_checkpoint': 1, - 'fullDocument.last_checkpoint_lsn': 1, - 'fullDocument.content': 1 + 'fullDocument.last_checkpoint_lsn': 1 } } ]; @@ -480,8 +432,7 @@ export class MongoBucketStorage projection: { _id: 1, last_checkpoint: 1, - last_checkpoint_lsn: 1, - content: 1 + last_checkpoint_lsn: 1 } } ); @@ -542,16 +493,9 @@ export class MongoBucketStorage // Nothing is done here until a subscriber starts to iterate private readonly sharedIter = new sync.BroadcastIterable((signal) => { - return this._watchActiveCheckpoint(signal); + return this.watchActiveCheckpoint(signal); }); - /** - * Watch changes to the active sync rules and checkpoint. - */ - watchActiveCheckpoint(signal: AbortSignal): AsyncIterable { - return wrapWithAbort(this.sharedIter, signal); - } - /** * User-specific watch on the latest checkpoint and/or write checkpoint. */ @@ -568,12 +512,14 @@ export class MongoBucketStorage // 1. checkpoint (op_id) changes. // 2. write checkpoint changes for the specific user const bucketStorage = await cp.getBucketStorage(); + if (!bucketStorage) { + continue; + } const lsnFilters: Record = lsn ? { 1: lsn } : {}; - const currentWriteCheckpoint = await this.lastWriteCheckpoint({ + const currentWriteCheckpoint = await bucketStorage.lastWriteCheckpoint({ user_id, - sync_rules_id: bucketStorage?.group_id, heads: { ...lsnFilters } diff --git a/packages/service-core/src/storage/StorageEngine.ts b/packages/service-core/src/storage/StorageEngine.ts index bdafeb240..a6639211a 100644 --- a/packages/service-core/src/storage/StorageEngine.ts +++ b/packages/service-core/src/storage/StorageEngine.ts @@ -1,17 +1,12 @@ import { DisposableListener, DisposableObserver, logger } from '@powersync/lib-services-framework'; import { ResolvedPowerSyncConfig } from '../util/util-index.js'; import { BucketStorageFactory } from './BucketStorage.js'; -import { ActiveStorage, BucketStorageProvider, StorageSettings } from './StorageProvider.js'; -import { DEFAULT_WRITE_CHECKPOINT_MODE } from './write-checkpoint.js'; +import { ActiveStorage, BucketStorageProvider } from './StorageProvider.js'; export type StorageEngineOptions = { configuration: ResolvedPowerSyncConfig; }; -export const DEFAULT_STORAGE_SETTINGS: StorageSettings = { - writeCheckpointMode: DEFAULT_WRITE_CHECKPOINT_MODE -}; - export interface StorageEngineListener extends DisposableListener { storageActivated: (storage: BucketStorageFactory) => void; } @@ -20,11 +15,9 @@ export class StorageEngine extends DisposableObserver { // TODO: This will need to revisited when we actually support multiple storage providers. private storageProviders: Map = new Map(); private currentActiveStorage: ActiveStorage | null = null; - private _activeSettings: StorageSettings; constructor(private options: StorageEngineOptions) { super(); - this._activeSettings = DEFAULT_STORAGE_SETTINGS; } get activeBucketStorage(): BucketStorageFactory { @@ -39,20 +32,6 @@ export class StorageEngine extends DisposableObserver { return this.currentActiveStorage; } - get activeSettings(): StorageSettings { - return { ...this._activeSettings }; - } - - updateSettings(settings: Partial) { - if (this.currentActiveStorage) { - throw new Error(`Storage is already active, settings cannot be modified.`); - } - this._activeSettings = { - ...this._activeSettings, - ...settings - }; - } - /** * Register a provider which generates a {@link BucketStorageFactory} * given the matching config specified in the loaded {@link ResolvedPowerSyncConfig} @@ -65,8 +44,7 @@ export class StorageEngine extends DisposableObserver { logger.info('Starting Storage Engine...'); const { configuration } = this.options; this.currentActiveStorage = await this.storageProviders.get(configuration.storage.type)!.getStorage({ - resolvedConfig: configuration, - ...this.activeSettings + resolvedConfig: configuration }); this.iterateListeners((cb) => cb.storageActivated?.(this.activeBucketStorage)); logger.info(`Successfully activated storage: ${configuration.storage.type}.`); diff --git a/packages/service-core/src/storage/StorageProvider.ts b/packages/service-core/src/storage/StorageProvider.ts index 7c730fb4b..385a042fd 100644 --- a/packages/service-core/src/storage/StorageProvider.ts +++ b/packages/service-core/src/storage/StorageProvider.ts @@ -1,6 +1,5 @@ import * as util from '../util/util-index.js'; import { BucketStorageFactory } from './BucketStorage.js'; -import { WriteCheckpointMode } from './write-checkpoint.js'; export interface ActiveStorage { storage: BucketStorageFactory; @@ -12,14 +11,7 @@ export interface ActiveStorage { tearDown(): Promise; } -/** - * Settings which can be modified by various modules in their initialization. - */ -export interface StorageSettings { - writeCheckpointMode: WriteCheckpointMode; -} - -export interface GetStorageOptions extends StorageSettings { +export interface GetStorageOptions { // TODO: This should just be the storage config. Update once the slot name prefix coupling has been removed from the storage resolvedConfig: util.ResolvedPowerSyncConfig; } diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index f0f95cd4e..3a4f38453 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -12,7 +12,7 @@ import { SaveOptions } from '../BucketStorage.js'; import { SourceTable } from '../SourceTable.js'; -import { CustomWriteCheckpointOptions } from '../write-checkpoint.js'; +import { BatchedCustomWriteCheckpointOptions, CustomWriteCheckpointOptions } from '../write-checkpoint.js'; import { PowerSyncMongo } from './db.js'; import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js'; import { MongoIdSequence } from './MongoIdSequence.js'; @@ -83,7 +83,7 @@ export class MongoBucketBatch extends DisposableObserver client.close(), tearDown: () => { diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index e4b57788e..829ab6ba8 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -2,7 +2,8 @@ import { SqliteJsonRow, SqliteJsonValue, SqlSyncRules } from '@powersync/service import * as bson from 'bson'; import * as mongo from 'mongodb'; -import { DisposableObserver } from '@powersync/lib-services-framework'; +import { DisposableObserver, logger } from '@powersync/lib-services-framework'; +import * as timers from 'timers/promises'; import * as db from '../../db/db-index.js'; import * as util from '../../util/util-index.js'; import { @@ -26,13 +27,19 @@ import { import { ChecksumCache, FetchPartialBucketChecksum, PartialChecksum, PartialChecksumMap } from '../ChecksumCache.js'; import { MongoBucketStorage } from '../MongoBucketStorage.js'; import { SourceTable } from '../SourceTable.js'; +import { + BatchedCustomWriteCheckpointOptions, + ManagedWriteCheckpointOptions, + SyncStorageLastWriteCheckpointFilters, + WriteCheckpointAPI, + WriteCheckpointMode +} from '../write-checkpoint.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleState } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; import { MongoCompactor } from './MongoCompactor.js'; +import { MongoWriteCheckpointAPI } from './MongoWriteCheckpointAPI.js'; import { BSON_DESERIALIZE_OPTIONS, idPrefixFilter, mapOpEntry, readSingleBatch, serializeLookup } from './util.js'; -import { logger } from '@powersync/lib-services-framework'; -import * as timers from 'timers/promises'; export class MongoSyncBucketStorage extends DisposableObserver @@ -46,15 +53,53 @@ export class MongoSyncBucketStorage }); private parsedSyncRulesCache: SqlSyncRules | undefined; + private writeCheckpointAPI: WriteCheckpointAPI; constructor( public readonly factory: MongoBucketStorage, public readonly group_id: number, private readonly sync_rules: PersistedSyncRulesContent, - public readonly slot_name: string + public readonly slot_name: string, + writeCheckpointMode: WriteCheckpointMode = WriteCheckpointMode.MANAGED ) { super(); this.db = factory.db; + this.writeCheckpointAPI = new MongoWriteCheckpointAPI({ + db: this.db, + mode: writeCheckpointMode + }); + } + + get writeCheckpointMode() { + return this.writeCheckpointAPI.writeCheckpointMode; + } + + setWriteCheckpointMode(mode: WriteCheckpointMode): void { + this.writeCheckpointAPI.setWriteCheckpointMode(mode); + } + + batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise { + return this.writeCheckpointAPI.batchCreateCustomWriteCheckpoints( + checkpoints.map((checkpoint) => ({ ...checkpoint, sync_rules_id: this.group_id })) + ); + } + + createCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): Promise { + return this.writeCheckpointAPI.createCustomWriteCheckpoint({ + ...checkpoint, + sync_rules_id: this.group_id + }); + } + + createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise { + return this.writeCheckpointAPI.createManagedWriteCheckpoint(checkpoint); + } + + lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise { + return this.writeCheckpointAPI.lastWriteCheckpoint({ + ...filters, + sync_rules_id: this.group_id + }); } getParsedSyncRules(options: ParseSyncRulesOptions): SqlSyncRules { diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index aa66b6f28..46b4266b9 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -37,11 +37,12 @@ export class MongoWriteCheckpointAPI implements WriteCheckpointAPI { } async createCustomWriteCheckpoint(options: CustomWriteCheckpointOptions): Promise { - /** - * Allow creating custom checkpoints even if the current mode is not `custom`. - * There might be a state where the next sync rules rely on replicating custom - * write checkpoints, but the current active sync rules uses managed checkpoints. - */ + if (this.writeCheckpointMode !== WriteCheckpointMode.CUSTOM) { + throw new framework.errors.ValidationError( + `Creating a custom Write Checkpoint when the current Write Checkpoint mode is set to "${this.writeCheckpointMode}"` + ); + } + const { checkpoint, user_id, sync_rules_id } = options; const doc = await this.db.custom_write_checkpoints.findOneAndUpdate( { diff --git a/packages/service-core/src/storage/write-checkpoint.ts b/packages/service-core/src/storage/write-checkpoint.ts index 3a9abd216..5fce0b18f 100644 --- a/packages/service-core/src/storage/write-checkpoint.ts +++ b/packages/service-core/src/storage/write-checkpoint.ts @@ -26,19 +26,19 @@ export interface CustomWriteCheckpointFilters extends BaseWriteCheckpointIdentif sync_rules_id: number; } -export interface CustomWriteCheckpointOptions extends CustomWriteCheckpointFilters { +export interface BatchedCustomWriteCheckpointOptions extends BaseWriteCheckpointIdentifier { /** * A supplied incrementing Write Checkpoint number */ checkpoint: bigint; } -/** - * Options for creating a custom Write Checkpoint in a batch. - * A {@link BucketStorageBatch} is already associated with a Sync Rules instance. - * The `sync_rules_id` is not required here. - */ -export type BatchedCustomWriteCheckpointOptions = Omit; +export interface CustomWriteCheckpointOptions extends BatchedCustomWriteCheckpointOptions { + /** + * Sync rules which were active when this checkpoint was created. + */ + sync_rules_id: number; +} /** * Managed Write Checkpoints are a mapping of User ID to replication HEAD @@ -52,19 +52,33 @@ export interface ManagedWriteCheckpointFilters extends BaseWriteCheckpointIdenti export type ManagedWriteCheckpointOptions = ManagedWriteCheckpointFilters; +export type SyncStorageLastWriteCheckpointFilters = BaseWriteCheckpointIdentifier | ManagedWriteCheckpointFilters; export type LastWriteCheckpointFilters = CustomWriteCheckpointFilters | ManagedWriteCheckpointFilters; -export interface WriteCheckpointAPI { +export interface BaseWriteCheckpointAPI { readonly writeCheckpointMode: WriteCheckpointMode; - setWriteCheckpointMode(mode: WriteCheckpointMode): void; + createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise; +} - batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; +/** + * Write Checkpoint API to be used in conjunction with a {@link SyncRulesBucketStorage}. + * This storage corresponds with a set of sync rules. These APIs don't require specifying a + * sync rules id. + */ +export interface SyncStorageWriteCheckpointAPI extends BaseWriteCheckpointAPI { + batchCreateCustomWriteCheckpoints(checkpoints: BatchedCustomWriteCheckpointOptions[]): Promise; + createCustomWriteCheckpoint(checkpoint: BatchedCustomWriteCheckpointOptions): Promise; + lastWriteCheckpoint(filters: SyncStorageLastWriteCheckpointFilters): Promise; +} +/** + * Write Checkpoint API which is interfaced directly with the storage layer. This requires + * sync rules identifiers for custom write checkpoints. + */ +export interface WriteCheckpointAPI extends BaseWriteCheckpointAPI { + batchCreateCustomWriteCheckpoints(checkpoints: CustomWriteCheckpointOptions[]): Promise; createCustomWriteCheckpoint(checkpoint: CustomWriteCheckpointOptions): Promise; - - createManagedWriteCheckpoint(checkpoint: ManagedWriteCheckpointOptions): Promise; - lastWriteCheckpoint(filters: LastWriteCheckpointFilters): Promise; } From 380ff8145f61f74dff9299d2a7f5e8b889277556 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 21 Oct 2024 17:00:29 +0200 Subject: [PATCH 09/13] Rename WriteCheckPointAPI file --- .../src/storage/{write-checkpoint.ts => WriteCheckpointAPI.ts} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename packages/service-core/src/storage/{write-checkpoint.ts => WriteCheckpointAPI.ts} (100%) diff --git a/packages/service-core/src/storage/write-checkpoint.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts similarity index 100% rename from packages/service-core/src/storage/write-checkpoint.ts rename to packages/service-core/src/storage/WriteCheckpointAPI.ts From 093c5a6a26669720d90b2bb4b39387b06fb3e2f9 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 21 Oct 2024 17:00:40 +0200 Subject: [PATCH 10/13] updates from rename --- packages/service-core/src/storage/BucketStorage.ts | 2 +- packages/service-core/src/storage/mongo/MongoBucketBatch.ts | 2 +- .../service-core/src/storage/mongo/MongoSyncBucketStorage.ts | 2 +- .../service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts | 2 +- packages/service-core/src/storage/storage-index.ts | 4 ++-- 5 files changed, 6 insertions(+), 6 deletions(-) diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index edaa226a9..446362a5a 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -13,7 +13,7 @@ import { ReplicationEventPayload } from './ReplicationEventPayload.js'; import { SourceEntityDescriptor } from './SourceEntity.js'; import { SourceTable } from './SourceTable.js'; import { BatchedCustomWriteCheckpointOptions, ReplicaId } from './storage-index.js'; -import { SyncStorageWriteCheckpointAPI } from './write-checkpoint.js'; +import { SyncStorageWriteCheckpointAPI } from './WriteCheckpointAPI.js'; export interface BucketStorageFactoryListener extends DisposableListener { syncStorageCreated: (storage: SyncRulesBucketStorage) => void; diff --git a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts index 3a4f38453..c24a04999 100644 --- a/packages/service-core/src/storage/mongo/MongoBucketBatch.ts +++ b/packages/service-core/src/storage/mongo/MongoBucketBatch.ts @@ -12,7 +12,7 @@ import { SaveOptions } from '../BucketStorage.js'; import { SourceTable } from '../SourceTable.js'; -import { BatchedCustomWriteCheckpointOptions, CustomWriteCheckpointOptions } from '../write-checkpoint.js'; +import { BatchedCustomWriteCheckpointOptions, CustomWriteCheckpointOptions } from '../WriteCheckpointAPI.js'; import { PowerSyncMongo } from './db.js'; import { CurrentBucket, CurrentDataDocument, SourceKey, SyncRuleDocument } from './models.js'; import { MongoIdSequence } from './MongoIdSequence.js'; diff --git a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts index 829ab6ba8..9243c0107 100644 --- a/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts +++ b/packages/service-core/src/storage/mongo/MongoSyncBucketStorage.ts @@ -33,7 +33,7 @@ import { SyncStorageLastWriteCheckpointFilters, WriteCheckpointAPI, WriteCheckpointMode -} from '../write-checkpoint.js'; +} from '../WriteCheckpointAPI.js'; import { PowerSyncMongo } from './db.js'; import { BucketDataDocument, BucketDataKey, SourceKey, SyncRuleState } from './models.js'; import { MongoBucketBatch } from './MongoBucketBatch.js'; diff --git a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts index 46b4266b9..bbcebb0de 100644 --- a/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/mongo/MongoWriteCheckpointAPI.ts @@ -7,7 +7,7 @@ import { ManagedWriteCheckpointOptions, WriteCheckpointAPI, WriteCheckpointMode -} from '../write-checkpoint.js'; +} from '../WriteCheckpointAPI.js'; import { PowerSyncMongo } from './db.js'; export type MongoCheckpointAPIOptions = { diff --git a/packages/service-core/src/storage/storage-index.ts b/packages/service-core/src/storage/storage-index.ts index b58576639..231bb3849 100644 --- a/packages/service-core/src/storage/storage-index.ts +++ b/packages/service-core/src/storage/storage-index.ts @@ -5,6 +5,7 @@ export * from './SourceEntity.js'; export * from './SourceTable.js'; export * from './StorageEngine.js'; +export * from './mongo/config.js'; export * from './mongo/db.js'; export * from './mongo/models.js'; export * from './mongo/MongoBucketBatch.js'; @@ -17,5 +18,4 @@ export * from './mongo/MongoSyncRulesLock.js'; export * from './mongo/OperationBatch.js'; export * from './mongo/PersistedBatch.js'; export * from './mongo/util.js'; -export * from './mongo/config.js'; -export * from './write-checkpoint.js'; +export * from './WriteCheckpointAPI.js'; From 93d9191058e8a89e083ce1da44706142dfa767ba Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 21 Oct 2024 17:03:24 +0200 Subject: [PATCH 11/13] rename variable --- packages/service-core/src/routes/endpoints/checkpointing.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/service-core/src/routes/endpoints/checkpointing.ts b/packages/service-core/src/routes/endpoints/checkpointing.ts index b95dc703b..3ce913ad2 100644 --- a/packages/service-core/src/routes/endpoints/checkpointing.ts +++ b/packages/service-core/src/routes/endpoints/checkpointing.ts @@ -64,12 +64,12 @@ export const writeCheckpoint2 = routeDefinition({ storageEngine: { activeBucketStorage } } = service_context; - const active = await activeBucketStorage.getActiveSyncRulesContent(); - if (!active) { + const activeSyncRules = await activeBucketStorage.getActiveSyncRulesContent(); + if (!activeSyncRules) { throw new framework.errors.ValidationError(`Cannot create Write Checkpoint since no sync rules are active.`); } - using syncBucketStorage = activeBucketStorage.getInstance(active); + using syncBucketStorage = activeBucketStorage.getInstance(activeSyncRules); const writeCheckpoint = await syncBucketStorage.createManagedWriteCheckpoint({ user_id: full_user_id, heads: { '1': currentCheckpoint } From ba682a460ae6bd80c099672ef0a58bb89df64885 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 21 Oct 2024 17:20:32 +0200 Subject: [PATCH 12/13] add changeset --- .changeset/slow-stingrays-kiss.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/slow-stingrays-kiss.md diff --git a/.changeset/slow-stingrays-kiss.md b/.changeset/slow-stingrays-kiss.md new file mode 100644 index 000000000..a93126de7 --- /dev/null +++ b/.changeset/slow-stingrays-kiss.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-core': minor +--- + +Moved Write Checkpoint APIs to SyncBucketStorage From 28fb524472727661d15b298f777d733d410e1ec5 Mon Sep 17 00:00:00 2001 From: Steven Ontong Date: Mon, 21 Oct 2024 17:38:00 +0200 Subject: [PATCH 13/13] fix typo --- packages/service-core/src/storage/WriteCheckpointAPI.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/service-core/src/storage/WriteCheckpointAPI.ts b/packages/service-core/src/storage/WriteCheckpointAPI.ts index 5fce0b18f..d38ac3979 100644 --- a/packages/service-core/src/storage/WriteCheckpointAPI.ts +++ b/packages/service-core/src/storage/WriteCheckpointAPI.ts @@ -3,7 +3,7 @@ export enum WriteCheckpointMode { * Raw mappings of `user_id` to `write_checkpoint`s should * be supplied for each set of sync rules. */ - CUSTOM = 'manual', + CUSTOM = 'custom', /** * Write checkpoints are stored as a mapping of `user_id` plus * replication HEAD (lsn in Postgres) to an automatically generated