From 0070aa1b35becbb1096432e34491515ef4f7286b Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 13:35:33 +0200 Subject: [PATCH 01/16] Refactor autoActivate logic to execute on the first consistent commit. --- .../implementation/MongoBucketBatch.ts | 57 +++++++++++++++++++ .../implementation/MongoSyncBucketStorage.ts | 35 ------------ .../src/replication/ChangeStream.ts | 21 +++++-- .../src/replication/BinLogStream.ts | 2 - .../src/storage/PostgresSyncRulesStorage.ts | 37 ------------ .../src/storage/batch/PostgresBucketBatch.ts | 56 ++++++++++++++++++ .../src/replication/WalStream.ts | 4 -- .../src/tests/register-sync-tests.ts | 15 ----- .../src/storage/SyncRulesBucketStorage.ts | 2 - 9 files changed, 129 insertions(+), 100 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 6ad06a327..38f9701ed 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -99,6 +99,8 @@ export class MongoBucketBatch */ public last_flushed_op: InternalOpId | null = null; + private needsActivation = true; + constructor(options: MongoBucketBatchOptions) { super(); this.logger = options.logger ?? defaultLogger; @@ -685,6 +687,7 @@ export class MongoBucketBatch if (!createEmptyCheckpoints && this.persisted_op == null) { // Nothing to commit - also return true + await this.autoActivate(lsn); return true; } @@ -729,12 +732,65 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.autoActivate(lsn); await this.db.notifyCheckpoint(); this.persisted_op = null; this.last_checkpoint_lsn = lsn; return true; } + /** + * Switch from processing -> active if relevant. + * + * Called on new commits. + */ + private async autoActivate(lsn: string) { + if (!this.needsActivation) { + return; + } + + // Activate the batch, so it can start processing. + // This is done automatically when the first save() is called. + + const session = this.session; + let activated = false; + await session.withTransaction(async () => { + const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session }); + if (doc && doc.state == 'PROCESSING') { + await this.db.sync_rules.updateOne( + { + _id: this.group_id + }, + { + $set: { + state: storage.SyncRuleState.ACTIVE + } + }, + { session } + ); + + await this.db.sync_rules.updateMany( + { + _id: { $ne: this.group_id }, + state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } + }, + { + $set: { + state: storage.SyncRuleState.STOP + } + }, + { session } + ); + activated = true; + } + }); + if (activated) { + this.logger.info(`Activated new sync rules at ${lsn}`); + await this.db.notifyCheckpoint(); + } + this.needsActivation = false; + } + async keepalive(lsn: string): Promise { if (this.last_checkpoint_lsn != null && lsn <= this.last_checkpoint_lsn) { // No-op @@ -782,6 +838,7 @@ export class MongoBucketBatch }, { session: this.session } ); + await this.autoActivate(lsn); await this.db.notifyCheckpoint(); this.last_checkpoint_lsn = lsn; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index d069d28f6..4797ce150 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -640,41 +640,6 @@ export class MongoSyncBucketStorage ); } - async autoActivate(): Promise { - await this.db.client.withSession(async (session) => { - await session.withTransaction(async () => { - const doc = await this.db.sync_rules.findOne({ _id: this.group_id }, { session }); - if (doc && doc.state == 'PROCESSING') { - await this.db.sync_rules.updateOne( - { - _id: this.group_id - }, - { - $set: { - state: storage.SyncRuleState.ACTIVE - } - }, - { session } - ); - - await this.db.sync_rules.updateMany( - { - _id: { $ne: this.group_id }, - state: { $in: [storage.SyncRuleState.ACTIVE, storage.SyncRuleState.ERRORED] } - }, - { - $set: { - state: storage.SyncRuleState.STOP - } - }, - { session } - ); - await this.db.notifyCheckpoint(); - } - }); - }); - } - async reportError(e: any): Promise { const message = String(e.message ?? 'Replication failure'); await this.db.sync_rules.updateOne( diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 9d0b19821..36ec7567f 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -218,6 +218,11 @@ export class ChangeStream { return await db.collection(table.table).estimatedDocumentCount(); } + /** + * This gets a LSN before starting a snapshot, which we can resume streaming from after the snapshot. + * + * This LSN can survive initial replication restarts. + */ private async getSnapshotLsn(): Promise { const hello = await this.defaultDb.command({ hello: 1 }); // Basic sanity check @@ -292,6 +297,9 @@ export class ChangeStream { ); } + /** + * Given a snapshot LSN, validate that we can read from it, by opening a change stream. + */ private async validateSnapshotLsn(lsn: string) { await using streamManager = this.openChangeStream({ lsn: lsn, maxAwaitTimeMs: 0 }); const { stream } = streamManager; @@ -362,8 +370,13 @@ export class ChangeStream { await touch(); } - this.logger.info(`Snapshot commit at ${snapshotLsn}`); - await batch.commit(snapshotLsn); + // The checkpoint here is a marker - we need to replicate up to at least this + // point before the data can be considered consistent. + // We could do this for each individual table, but may as well just do it once for the entire snapshot. + const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); + await batch.markSnapshotDone([], checkpoint); + + this.logger.info(`Snapshot done. Need to replicate from ${snapshotLsn} to ${checkpoint} to be consistent`); } ); } @@ -757,14 +770,12 @@ export class ChangeStream { } async streamChangesInternal() { - // Auto-activate as soon as initial replication is done - await this.storage.autoActivate(); - await this.storage.startBatch( { logger: this.logger, zeroLSN: MongoLSN.ZERO.comparable, defaultSchema: this.defaultDb.databaseName, + // We get a complete postimage for every change, so we don't need to store the current data. storeCurrentData: false }, async (batch) => { diff --git a/modules/module-mysql/src/replication/BinLogStream.ts b/modules/module-mysql/src/replication/BinLogStream.ts index 6907e842c..3a4b76d71 100644 --- a/modules/module-mysql/src/replication/BinLogStream.ts +++ b/modules/module-mysql/src/replication/BinLogStream.ts @@ -430,8 +430,6 @@ AND table_type = 'BASE TABLE';`, } async streamChanges() { - // Auto-activate as soon as initial replication is done - await this.storage.autoActivate(); const serverId = createRandomServerId(this.storage.group_id); const connection = await this.connections.getConnection(); diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index c3eeb641b..c692d4ef6 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -644,43 +644,6 @@ export class PostgresSyncRulesStorage `.execute(); } - async autoActivate(): Promise { - await this.db.transaction(async (db) => { - const syncRulesRow = await db.sql` - SELECT - state - FROM - sync_rules - WHERE - id = ${{ type: 'int4', value: this.group_id }} - ` - .decoded(pick(models.SyncRules, ['state'])) - .first(); - - if (syncRulesRow && syncRulesRow.state == storage.SyncRuleState.PROCESSING) { - await db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} - WHERE - id = ${{ type: 'int4', value: this.group_id }} - `.execute(); - } - - await db.sql` - UPDATE sync_rules - SET - state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} - WHERE - ( - state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} - OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} - ) - AND id != ${{ type: 'int4', value: this.group_id }} - `.execute(); - }); - } - private async getChecksumsInternal(batch: storage.FetchPartialBucketChecksum[]): Promise { if (batch.length == 0) { return new Map(); diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index ab0c942c8..85752e328 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -73,6 +73,7 @@ export class PostgresBucketBatch protected batch: OperationBatch | null; private lastWaitingLogThrottled = 0; private markRecordUnavailable: BucketStorageMarkRecordUnavailable | undefined; + private needsActivation = true; constructor(protected options: PostgresBucketBatchOptions) { super(); @@ -321,6 +322,7 @@ export class PostgresBucketBatch // Don't create a checkpoint if there were no changes if (!createEmptyCheckpoints && this.persisted_op == null) { // Nothing to commit - return true + await this.autoActivate(lsn); return true; } @@ -363,6 +365,7 @@ export class PostgresBucketBatch .decoded(StatefulCheckpoint) .first(); + await this.autoActivate(lsn); await notifySyncRulesUpdate(this.db, doc!); this.persisted_op = null; @@ -916,6 +919,59 @@ export class PostgresBucketBatch return result; } + /** + * Switch from processing -> active if relevant. + * + * Called on new commits. + */ + private async autoActivate(lsn: string): Promise { + if (!this.needsActivation) { + // Already activated + return; + } + + let didActivate = false; + await this.db.transaction(async (db) => { + const syncRulesRow = await db.sql` + SELECT + state + FROM + sync_rules + WHERE + id = ${{ type: 'int4', value: this.group_id }} + ` + .decoded(pick(models.SyncRules, ['state'])) + .first(); + + if (syncRulesRow && syncRulesRow.state == storage.SyncRuleState.PROCESSING) { + await db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.ACTIVE }} + WHERE + id = ${{ type: 'int4', value: this.group_id }} + `.execute(); + didActivate = true; + } + + await db.sql` + UPDATE sync_rules + SET + state = ${{ type: 'varchar', value: storage.SyncRuleState.STOP }} + WHERE + ( + state = ${{ value: storage.SyncRuleState.ACTIVE, type: 'varchar' }} + OR state = ${{ value: storage.SyncRuleState.ERRORED, type: 'varchar' }} + ) + AND id != ${{ type: 'int4', value: this.group_id }} + `.execute(); + }); + if (didActivate) { + this.logger.info(`Activated new sync rules at ${lsn}`); + } + this.needsActivation = false; + } + /** * Gets relevant {@link SqlEventDescriptor}s for the given {@link SourceTable} * TODO maybe share this with an abstract class diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 01c99082e..219f7cdda 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -837,7 +837,6 @@ WHERE oid = $1::regclass`, async streamChanges(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. - const { createEmptyCheckpoints } = await this.ensureStorageCompatibility(); const replicationOptions: Record = { @@ -867,9 +866,6 @@ WHERE oid = $1::regclass`, this.startedStreaming = true; - // Auto-activate as soon as initial replication is done - await this.storage.autoActivate(); - let resnapshot: { table: storage.SourceTable; key: PrimaryKeyValue }[] = []; const markRecordUnavailable = (record: SaveUpdate) => { diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 99e989512..173b1ea85 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -54,7 +54,6 @@ export function registerSyncTests(factory: storage.TestStorageFactory) { }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -116,7 +115,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -178,7 +176,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Initial data: Add one priority row and 10k low-priority rows. @@ -289,7 +286,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Initial data: Add one priority row and 10k low-priority rows. @@ -431,7 +427,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // Initial data: Add one priority row and 10k low-priority rows. @@ -561,7 +556,6 @@ bucket_definitions: content: BASIC_SYNC_RULES }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -626,7 +620,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const result = await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -671,7 +664,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const stream = sync.streamResponse({ syncContext, @@ -699,7 +691,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const stream = sync.streamResponse({ syncContext, @@ -770,7 +761,6 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const stream = sync.streamResponse({ syncContext, @@ -833,7 +823,6 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -911,7 +900,6 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const stream = sync.streamResponse({ syncContext, @@ -974,7 +962,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); const exp = Date.now() / 1000 + 0.1; @@ -1016,7 +1003,6 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { await batch.save({ @@ -1157,7 +1143,6 @@ bucket_definitions: }); const bucketStorage = f.getInstance(syncRules); - await bucketStorage.autoActivate(); await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { // <= the managed write checkpoint LSN below diff --git a/packages/service-core/src/storage/SyncRulesBucketStorage.ts b/packages/service-core/src/storage/SyncRulesBucketStorage.ts index 2317b830f..6b27e3c7c 100644 --- a/packages/service-core/src/storage/SyncRulesBucketStorage.ts +++ b/packages/service-core/src/storage/SyncRulesBucketStorage.ts @@ -50,8 +50,6 @@ export interface SyncRulesBucketStorage */ clear(options?: ClearStorageOptions): Promise; - autoActivate(): Promise; - /** * Record a replication error. * From 07251e594fe9af6a7e99d052e7a60603f56a6208 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 14:48:11 +0200 Subject: [PATCH 02/16] Refactor resuming of replication. --- .../implementation/MongoBucketBatch.ts | 14 ++++++++++ .../implementation/MongoSyncBucketStorage.ts | 4 ++- .../src/replication/ChangeStream.ts | 15 +++++++--- .../test/src/change_stream.test.ts | 1 + .../test/src/change_stream_utils.ts | 28 +++++++++++++------ .../test/src/BinlogStreamUtils.ts | 1 - .../src/storage/PostgresSyncRulesStorage.ts | 7 +++-- .../src/storage/batch/PostgresBucketBatch.ts | 4 +++ .../test/src/large_batch.test.ts | 1 - .../test/src/slow_tests.test.ts | 2 -- .../test/src/wal_stream_utils.ts | 1 - .../src/tests/register-data-storage-tests.ts | 6 ---- .../src/tests/register-sync-tests.ts | 16 +++++++++++ .../src/storage/BucketStorageBatch.ts | 7 +++++ packages/service-core/src/util/util-index.ts | 1 + 15 files changed, 82 insertions(+), 26 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 38f9701ed..5cfe1b802 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -49,6 +49,7 @@ export interface MongoBucketBatchOptions { lastCheckpointLsn: string | null; keepaliveOp: InternalOpId | null; noCheckpointBeforeLsn: string; + resumeFromLsn: string | null; storeCurrentData: boolean; /** * Set to true for initial replication. @@ -99,6 +100,18 @@ export class MongoBucketBatch */ public last_flushed_op: InternalOpId | null = null; + /** + * lastCheckpointLsn is the last consistent commit. + * + * While that is generally a "safe" point to resume from, there are cases where we may want to resume from a different point: + * 1. After an initial snapshot, we don't have a consistent commit yet, but need to resume from the snapshot LSN. + * 2. If "no_checkpoint_before_lsn" is set far in advance, it may take a while to reach that point. We + * may want to resume at incremental points before that. + * + * This is set when creating the batch, but may not be updated afterwards. + */ + public resumeFromLsn: string | null = null; + private needsActivation = true; constructor(options: MongoBucketBatchOptions) { @@ -109,6 +122,7 @@ export class MongoBucketBatch this.group_id = options.groupId; this.last_checkpoint_lsn = options.lastCheckpointLsn; this.no_checkpoint_before_lsn = options.noCheckpointBeforeLsn; + this.resumeFromLsn = options.resumeFromLsn; this.session = this.client.startSession(); this.slot_name = options.slotName; this.sync_rules = options.syncRules; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts index 4797ce150..6a0cfcf04 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoSyncBucketStorage.ts @@ -16,6 +16,7 @@ import { GetCheckpointChangesOptions, InternalOpId, internalToExternalOpId, + maxLsn, ProtocolOpId, ReplicationCheckpoint, storage, @@ -131,7 +132,7 @@ export class MongoSyncBucketStorage { _id: this.group_id }, - { projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1 } } + { projection: { last_checkpoint_lsn: 1, no_checkpoint_before: 1, keepalive_op: 1, snapshot_lsn: 1 } } ); const checkpoint_lsn = doc?.last_checkpoint_lsn ?? null; @@ -142,6 +143,7 @@ export class MongoSyncBucketStorage groupId: this.group_id, slotName: this.slot_name, lastCheckpointLsn: checkpoint_lsn, + resumeFromLsn: maxLsn(checkpoint_lsn, doc?.snapshot_lsn), noCheckpointBeforeLsn: doc?.no_checkpoint_before ?? options.zeroLSN, keepaliveOp: doc?.keepalive_op ? BigInt(doc.keepalive_op) : null, storeCurrentData: options.storeCurrentData, diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 36ec7567f..97551967c 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -376,6 +376,11 @@ export class ChangeStream { const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); await batch.markSnapshotDone([], checkpoint); + // We cannot create a consistent commit at this point. We previously had + // commit(snapshotLsn), but since snapshotLsn < checkpoint, it does no more + // than a flush(). + await batch.flush(); + this.logger.info(`Snapshot done. Need to replicate from ${snapshotLsn} to ${checkpoint} to be consistent`); } ); @@ -669,7 +674,6 @@ export class ChangeStream { try { // If anything errors here, the entire replication process is halted, and // all connections automatically closed, including this one. - await this.initReplication(); await this.streamChanges(); } catch (e) { @@ -779,8 +783,11 @@ export class ChangeStream { storeCurrentData: false }, async (batch) => { - const { lastCheckpointLsn } = batch; - const lastLsn = MongoLSN.fromSerialized(lastCheckpointLsn!); + const { resumeFromLsn } = batch; + if (resumeFromLsn == null) { + throw new ReplicationAssertionError(`No LSN found to resume from`); + } + const lastLsn = MongoLSN.fromSerialized(resumeFromLsn); const startAfter = lastLsn?.timestamp; // It is normal for this to be a minute or two old when there is a low volume @@ -789,7 +796,7 @@ export class ChangeStream { this.logger.info(`Resume streaming at ${startAfter?.inspect()} / ${lastLsn} | Token age: ${tokenAgeSeconds}s`); - await using streamManager = this.openChangeStream({ lsn: lastCheckpointLsn }); + await using streamManager = this.openChangeStream({ lsn: resumeFromLsn }); const { stream, filters } = streamManager; if (this.abort_signal.aborted) { await stream.close(); diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index 2aa524e77..c0f03b0e2 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -512,6 +512,7 @@ bucket_definitions: await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); await context.replicateSnapshot(); + await context.markSnapshotConsistent(); // Simulate an error await context.storage!.reportError(new Error('simulated error')); diff --git a/modules/module-mongodb/test/src/change_stream_utils.ts b/modules/module-mongodb/test/src/change_stream_utils.ts index cc8d17732..b79d12fb4 100644 --- a/modules/module-mongodb/test/src/change_stream_utils.ts +++ b/modules/module-mongodb/test/src/change_stream_utils.ts @@ -17,7 +17,7 @@ import { MongoManager } from '@module/replication/MongoManager.js'; import { createCheckpoint, STANDALONE_CHECKPOINT_ID } from '@module/replication/MongoRelation.js'; import { NormalizedMongoConnectionConfig } from '@module/types/types.js'; -import { TEST_CONNECTION_OPTIONS, clearTestDb } from './util.js'; +import { clearTestDb, TEST_CONNECTION_OPTIONS } from './util.js'; export class ChangeStreamTestContext { private _walStream?: ChangeStream; @@ -119,7 +119,20 @@ export class ChangeStreamTestContext { async replicateSnapshot() { await this.walStream.initReplication(); - await this.storage!.autoActivate(); + } + + /** + * A snapshot is not consistent until streaming replication has caught up. + * We simulate that for tests. + * Do not use if there are any writes performed while doing the snapshot, as that + * would result in inconsistent data. + */ + async markSnapshotConsistent() { + const checkpoint = await createCheckpoint(this.client, this.db, STANDALONE_CHECKPOINT_ID); + + await this.storage!.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive(checkpoint); + }); } startStreaming() { @@ -195,12 +208,11 @@ export async function getClientCheckpoint( while (Date.now() - start < timeout) { const storage = await storageFactory.getActiveStorage(); const cp = await storage?.getCheckpoint(); - if (cp == null) { - throw new Error('No sync rules available'); - } - lastCp = cp; - if (cp.lsn && cp.lsn >= lsn) { - return cp.checkpoint; + if (cp != null) { + lastCp = cp; + if (cp.lsn && cp.lsn >= lsn) { + return cp.checkpoint; + } } await new Promise((resolve) => setTimeout(resolve, 30)); } diff --git a/modules/module-mysql/test/src/BinlogStreamUtils.ts b/modules/module-mysql/test/src/BinlogStreamUtils.ts index 575f50aea..b83aa2bd8 100644 --- a/modules/module-mysql/test/src/BinlogStreamUtils.ts +++ b/modules/module-mysql/test/src/BinlogStreamUtils.ts @@ -112,7 +112,6 @@ export class BinlogStreamTestContext { async replicateSnapshot() { await this.binlogStream.initReplication(); - await this.storage!.autoActivate(); this.replicationDone = true; } diff --git a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts index c692d4ef6..df050372b 100644 --- a/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts +++ b/modules/module-postgres-storage/src/storage/PostgresSyncRulesStorage.ts @@ -7,6 +7,7 @@ import { InternalOpId, internalToExternalOpId, LastValueSink, + maxLsn, storage, utils, WatchWriteCheckpointOptions @@ -310,13 +311,14 @@ export class PostgresSyncRulesStorage SELECT last_checkpoint_lsn, no_checkpoint_before, - keepalive_op + keepalive_op, + snapshot_lsn FROM sync_rules WHERE id = ${{ type: 'int4', value: this.group_id }} ` - .decoded(pick(models.SyncRules, ['last_checkpoint_lsn', 'no_checkpoint_before', 'keepalive_op'])) + .decoded(pick(models.SyncRules, ['last_checkpoint_lsn', 'no_checkpoint_before', 'keepalive_op', 'snapshot_lsn'])) .first(); const checkpoint_lsn = syncRules?.last_checkpoint_lsn ?? null; @@ -330,6 +332,7 @@ export class PostgresSyncRulesStorage last_checkpoint_lsn: checkpoint_lsn, keep_alive_op: syncRules?.keepalive_op, no_checkpoint_before_lsn: syncRules?.no_checkpoint_before ?? options.zeroLSN, + resumeFromLsn: maxLsn(syncRules?.snapshot_lsn, checkpoint_lsn), store_current_data: options.storeCurrentData, skip_existing_rows: options.skipExistingRows ?? false, batch_limits: this.options.batchLimits, diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index 85752e328..90105cd2e 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -31,6 +31,7 @@ export interface PostgresBucketBatchOptions { no_checkpoint_before_lsn: string; store_current_data: boolean; keep_alive_op?: InternalOpId | null; + resumeFromLsn: string | null; /** * Set to true for initial replication. */ @@ -61,6 +62,8 @@ export class PostgresBucketBatch public last_flushed_op: InternalOpId | null = null; + public resumeFromLsn: string | null; + protected db: lib_postgres.DatabaseClient; protected group_id: number; protected last_checkpoint_lsn: string | null; @@ -82,6 +85,7 @@ export class PostgresBucketBatch this.group_id = options.group_id; this.last_checkpoint_lsn = options.last_checkpoint_lsn; this.no_checkpoint_before_lsn = options.no_checkpoint_before_lsn; + this.resumeFromLsn = options.resumeFromLsn; this.write_checkpoint_batch = []; this.sync_rules = options.sync_rules; this.markRecordUnavailable = options.markRecordUnavailable; diff --git a/modules/module-postgres/test/src/large_batch.test.ts b/modules/module-postgres/test/src/large_batch.test.ts index e084b65a7..777662f12 100644 --- a/modules/module-postgres/test/src/large_batch.test.ts +++ b/modules/module-postgres/test/src/large_batch.test.ts @@ -87,7 +87,6 @@ function defineBatchTests(factory: storage.TestStorageFactory) { const start = Date.now(); await context.replicateSnapshot(); - await context.storage!.autoActivate(); context.startStreaming(); const checkpoint = await context.getCheckpoint({ timeout: 100_000 }); diff --git a/modules/module-postgres/test/src/slow_tests.test.ts b/modules/module-postgres/test/src/slow_tests.test.ts index 551eea9fe..ae5294887 100644 --- a/modules/module-postgres/test/src/slow_tests.test.ts +++ b/modules/module-postgres/test/src/slow_tests.test.ts @@ -97,7 +97,6 @@ bucket_definitions: await pool.query(`ALTER TABLE test_data REPLICA IDENTITY FULL`); await walStream.initReplication(replicationConnection); - await storage.autoActivate(); let abort = false; streamPromise = walStream.streamChanges(replicationConnection).finally(() => { abort = true; @@ -348,7 +347,6 @@ bucket_definitions: let initialReplicationDone = false; streamPromise = (async () => { await walStream.initReplication(replicationConnection); - await storage.autoActivate(); initialReplicationDone = true; await walStream.streamChanges(replicationConnection); })() diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index aa1170279..f18631c43 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -122,7 +122,6 @@ export class WalStreamTestContext implements AsyncDisposable { const promise = (async () => { this.replicationConnection = await this.connectionManager.replicationConnection(); await this.walStream.initReplication(this.replicationConnection); - await this.storage!.autoActivate(); })(); this.snapshotPromise = promise.catch((e) => e); await promise; diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index a766fef47..e0ee721e4 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1603,7 +1603,6 @@ bucket_definitions: const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false }); const storage = f.getInstance(r.persisted_sync_rules!); - await storage.autoActivate(); const metrics2 = await f.getStorageMetrics(); expect(metrics2).toMatchSnapshot(); @@ -1656,7 +1655,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); const abortController = new AbortController(); context.onTestFinished(() => abortController.abort()); @@ -1697,7 +1695,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); const abortController = new AbortController(); context.onTestFinished(() => abortController.abort()); @@ -1760,7 +1757,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); const abortController = new AbortController(); @@ -1801,7 +1797,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); const abortController = new AbortController(); @@ -1845,7 +1840,6 @@ bucket_definitions: validate: false }); const bucketStorage = factory.getInstance(r.persisted_sync_rules!); - await bucketStorage.autoActivate(); bucketStorage.setWriteCheckpointMode(storage.WriteCheckpointMode.CUSTOM); const abortController = new AbortController(); diff --git a/packages/service-core-tests/src/tests/register-sync-tests.ts b/packages/service-core-tests/src/tests/register-sync-tests.ts index 173b1ea85..fc661dd48 100644 --- a/packages/service-core-tests/src/tests/register-sync-tests.ts +++ b/packages/service-core-tests/src/tests/register-sync-tests.ts @@ -691,6 +691,10 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const stream = sync.streamResponse({ syncContext, @@ -761,6 +765,10 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const stream = sync.streamResponse({ syncContext, @@ -900,6 +908,10 @@ bucket_definitions: const listsTable = test_utils.makeTestTable('lists', ['id']); const bucketStorage = await f.getInstance(syncRules); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const stream = sync.streamResponse({ syncContext, @@ -962,6 +974,10 @@ bucket_definitions: }); const bucketStorage = await f.getInstance(syncRules); + // Activate + await bucketStorage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('0/0'); + }); const exp = Date.now() / 1000 + 0.1; diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts index 9499af594..22c5a8760 100644 --- a/packages/service-core/src/storage/BucketStorageBatch.ts +++ b/packages/service-core/src/storage/BucketStorageBatch.ts @@ -72,6 +72,13 @@ export interface BucketStorageBatch extends ObserverClient; updateTableProgress(table: SourceTable, progress: Partial): Promise; diff --git a/packages/service-core/src/util/util-index.ts b/packages/service-core/src/util/util-index.ts index acdd09c4a..56e9cbc0e 100644 --- a/packages/service-core/src/util/util-index.ts +++ b/packages/service-core/src/util/util-index.ts @@ -1,5 +1,6 @@ export * from './alerting.js'; export * from './env.js'; +export * from './lsn.js'; export * from './memory-tracking.js'; export * from './Mutex.js'; export * from './protocol-types.js'; From 47bbae7a64234bb7325ec146dcab10a814300c5c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 14:52:20 +0200 Subject: [PATCH 03/16] Fix some postgres tests. --- modules/module-postgres/test/src/checkpoints.test.ts | 2 ++ modules/module-postgres/test/src/util.ts | 6 ++---- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/modules/module-postgres/test/src/checkpoints.test.ts b/modules/module-postgres/test/src/checkpoints.test.ts index fbae9f5e8..70431f613 100644 --- a/modules/module-postgres/test/src/checkpoints.test.ts +++ b/modules/module-postgres/test/src/checkpoints.test.ts @@ -36,6 +36,8 @@ const checkpointTests = (factory: TestStorageFactory) => { await context.replicateSnapshot(); context.startStreaming(); + // Wait for a consistent checkpoint before we start. + await context.getCheckpoint(); const storage = context.storage!; const controller = new AbortController(); diff --git a/modules/module-postgres/test/src/util.ts b/modules/module-postgres/test/src/util.ts index 7a75db622..130b70fe9 100644 --- a/modules/module-postgres/test/src/util.ts +++ b/modules/module-postgres/test/src/util.ts @@ -90,10 +90,8 @@ export async function getClientCheckpoint( while (Date.now() - start < timeout) { const storage = await storageFactory.getActiveStorage(); const cp = await storage?.getCheckpoint(); - if (cp == null) { - throw new Error('No sync rules available'); - } - if (cp.lsn && cp.lsn >= lsn) { + + if (cp?.lsn != null && cp.lsn >= lsn) { logger.info(`Got write checkpoint: ${lsn} : ${cp.checkpoint}`); return cp.checkpoint; } From 7f8d2cdeda00e1b9be08e68c3868a8885f8ec2c7 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 14:53:31 +0200 Subject: [PATCH 04/16] Add missing file. --- packages/service-core/src/util/lsn.ts | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 packages/service-core/src/util/lsn.ts diff --git a/packages/service-core/src/util/lsn.ts b/packages/service-core/src/util/lsn.ts new file mode 100644 index 000000000..4b1175628 --- /dev/null +++ b/packages/service-core/src/util/lsn.ts @@ -0,0 +1,8 @@ +/** + * Return the larger of two LSNs. + */ +export function maxLsn(a: string | null | undefined, b: string | null | undefined): string | null { + if (a == null) return b ?? null; + if (b == null) return a; + return a > b ? a : b; +} From 5bcd4e810c8804994a446013acdf5f2eebe5fdd1 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 15:04:42 +0200 Subject: [PATCH 05/16] Postgres storage: activate on keepalive. --- modules/module-mongodb/test/src/change_stream.test.ts | 4 +++- .../src/storage/batch/PostgresBucketBatch.ts | 1 + 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index c0f03b0e2..a43394ccc 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -516,7 +516,9 @@ bucket_definitions: // Simulate an error await context.storage!.reportError(new Error('simulated error')); - expect((await context.factory.getActiveSyncRulesContent())?.last_fatal_error).toEqual('simulated error'); + const syncRules = await context.factory.getActiveSyncRulesContent(); + expect(syncRules).toBeTruthy(); + expect(syncRules?.last_fatal_error).toEqual('simulated error'); // startStreaming() should automatically clear the error. context.startStreaming(); diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index 90105cd2e..1f83d4d53 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -413,6 +413,7 @@ export class PostgresBucketBatch .decoded(StatefulCheckpoint) .first(); + await this.autoActivate(lsn); await notifySyncRulesUpdate(this.db, updated!); this.last_checkpoint_lsn = lsn; From 18e2a2f0edd20b249aeb1834d6d6d7791f09eecf Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 15:44:53 +0200 Subject: [PATCH 06/16] We do need a commit after snapshots. --- modules/module-mongodb/src/replication/ChangeStream.ts | 7 +++---- modules/module-mongodb/test/src/change_stream.test.ts | 3 +++ 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 97551967c..31a76af69 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -376,10 +376,9 @@ export class ChangeStream { const checkpoint = await createCheckpoint(this.client, this.defaultDb, STANDALONE_CHECKPOINT_ID); await batch.markSnapshotDone([], checkpoint); - // We cannot create a consistent commit at this point. We previously had - // commit(snapshotLsn), but since snapshotLsn < checkpoint, it does no more - // than a flush(). - await batch.flush(); + // This will not create a consistent checkpoint yet, but will persist the op. + // Actual checkpoint will be created when streaming replication caught up. + await batch.commit(snapshotLsn); this.logger.info(`Snapshot done. Need to replicate from ${snapshotLsn} to ${checkpoint} to be consistent`); } diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index a43394ccc..ba647a7e8 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -354,6 +354,9 @@ bucket_definitions: const test_id = result.insertedId.toHexString(); await context.replicateSnapshot(); + // Note: snapshot is only consistent some time into the streaming request. + // At the point that we get the first acknowledged checkpoint, as is required + // for getBucketData(), the data should be consistent. context.startStreaming(); const data = await context.getBucketData('global[]'); From 7b4a91f6a55deae39b1e566ed5229fda1703e3a4 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 16:12:35 +0200 Subject: [PATCH 07/16] Only trigger resnapshot if needed. --- .../src/storage/implementation/MongoBucketBatch.ts | 3 ++- modules/module-postgres/src/replication/WalStream.ts | 3 +++ modules/module-postgres/test/src/wal_stream.test.ts | 4 ++-- 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 5cfe1b802..90c2885bf 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -16,6 +16,7 @@ import { BucketStorageMarkRecordUnavailable, deserializeBson, InternalOpId, + isCompleteRow, SaveOperationTag, storage, utils @@ -348,7 +349,7 @@ export class MongoBucketBatch // Not an error if we re-apply a transaction existing_buckets = []; existing_lookups = []; - if (this.storeCurrentData) { + if (!isCompleteRow(this.storeCurrentData, after!)) { if (this.markRecordUnavailable != null) { // This will trigger a "resnapshot" of the record. // This is not relevant if storeCurrentData is false, since we'll get the full row diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 219f7cdda..c00df274c 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -737,6 +737,9 @@ WHERE oid = $1::regclass`, rows.map((r) => r.key) ); } + // Even with resnapshot, we need to wait until we get a new consistent checkpoint + // after the snapshot, so we need to send a keepalive message. + await sendKeepAlive(db); } finally { await db.end(); } diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 2293c54c3..011061889 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -179,8 +179,8 @@ bucket_definitions: ) ); - // This update may fail replicating with: - // Error: Update on missing record public.test_data:074a601e-fc78-4c33-a15d-f89fdd4af31d :: {"g":1,"t":"651e9fbe9fec6155895057ec","k":"1a0b34da-fb8c-5e6f-8421-d7a3c5d4df4f"} + // Since we don't have an old copy of the record with the new primary key, this + // may trigger a "resnapshot". await pool.query(`UPDATE test_data SET description = 'test2b' WHERE id = '${test_id2}'`); // Re-use old id again From 39260a07039d4a48d5096d4f80db7bbaff781a7c Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 16:30:54 +0200 Subject: [PATCH 08/16] Improve test stability. --- modules/module-mongodb/test/src/change_stream.test.ts | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/modules/module-mongodb/test/src/change_stream.test.ts b/modules/module-mongodb/test/src/change_stream.test.ts index ba647a7e8..c6aacee0a 100644 --- a/modules/module-mongodb/test/src/change_stream.test.ts +++ b/modules/module-mongodb/test/src/change_stream.test.ts @@ -23,7 +23,9 @@ describe('change stream', () => { function defineChangeStreamTests(factory: storage.TestStorageFactory) { test('replicating basic values', async () => { - await using context = await ChangeStreamTestContext.open(factory); + await using context = await ChangeStreamTestContext.open(factory, { + mongoOptions: { postImages: PostImagesOption.READ_ONLY } + }); const { db } = context; await context.updateSyncRules(` bucket_definitions: @@ -32,7 +34,7 @@ bucket_definitions: - SELECT _id as id, description, num FROM "test_data"`); await db.createCollection('test_data', { - changeStreamPreAndPostImages: { enabled: false } + changeStreamPreAndPostImages: { enabled: true } }); const collection = db.collection('test_data'); @@ -42,11 +44,8 @@ bucket_definitions: const result = await collection.insertOne({ description: 'test1', num: 1152921504606846976n }); const test_id = result.insertedId; - await setTimeout(30); await collection.updateOne({ _id: test_id }, { $set: { description: 'test2' } }); - await setTimeout(30); await collection.replaceOne({ _id: test_id }, { description: 'test3' }); - await setTimeout(30); await collection.deleteOne({ _id: test_id }); const data = await context.getBucketData('global[]'); From d58f60115449c3dd58aadebe1ee1e132f4b1b5ba Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 16:33:04 +0200 Subject: [PATCH 09/16] Fix metrics test. --- .../src/tests/register-data-storage-tests.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/packages/service-core-tests/src/tests/register-data-storage-tests.ts b/packages/service-core-tests/src/tests/register-data-storage-tests.ts index e0ee721e4..57e56db8c 100644 --- a/packages/service-core-tests/src/tests/register-data-storage-tests.ts +++ b/packages/service-core-tests/src/tests/register-data-storage-tests.ts @@ -1603,6 +1603,9 @@ bucket_definitions: const r = await f.configureSyncRules({ content: 'bucket_definitions: {}', validate: false }); const storage = f.getInstance(r.persisted_sync_rules!); + await storage.startBatch(test_utils.BATCH_OPTIONS, async (batch) => { + await batch.keepalive('1/0'); + }); const metrics2 = await f.getStorageMetrics(); expect(metrics2).toMatchSnapshot(); From 954cc8f43687d81ef068c2cfea02489fb13f5009 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 16:34:58 +0200 Subject: [PATCH 10/16] Improve GA labels. --- .github/workflows/test.yml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 6af3a0552..1cee0f71b 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -168,10 +168,10 @@ jobs: shell: bash run: pnpm build - - name: Test + - name: Test Replication run: pnpm test --filter='./modules/module-postgres' - - name: Test + - name: Test Storage run: pnpm test --filter='./modules/module-postgres-storage' run-mysql-tests: @@ -252,7 +252,7 @@ jobs: shell: bash run: pnpm build - - name: Test + - name: Test Replication run: pnpm test --filter='./modules/module-mysql' run-mongodb-tests: @@ -320,7 +320,7 @@ jobs: shell: bash run: pnpm build - - name: Test + - name: Test Replication run: pnpm test --filter='./modules/module-mongodb' - name: Test Storage From 9a85cd6aad959cd9b7753cebff2e424c46be07cf Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 16:55:34 +0200 Subject: [PATCH 11/16] Add changeset. --- .changeset/shiny-pugs-train.md | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .changeset/shiny-pugs-train.md diff --git a/.changeset/shiny-pugs-train.md b/.changeset/shiny-pugs-train.md new file mode 100644 index 000000000..52277e778 --- /dev/null +++ b/.changeset/shiny-pugs-train.md @@ -0,0 +1,11 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core-tests': minor +'@powersync/service-module-postgres': minor +'@powersync/service-module-mongodb': minor +'@powersync/service-core': minor +'@powersync/service-module-mysql': minor +--- + +Delay switching over to new sync rules until we have a consistent checkpoint. From 4668cd0bc9bfea99f2469eccc58e868cfd878653 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 17:26:12 +0200 Subject: [PATCH 12/16] Further test stability improvements. --- .../test/src/wal_stream.test.ts | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 011061889..f575fd2cf 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -35,12 +35,13 @@ bucket_definitions: ); await context.replicateSnapshot(); + context.startStreaming(); + // Make sure we're up to date + await context.getCheckpoint(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); - const [{ test_id }] = pgwireRows( await pool.query( `INSERT INTO test_data(description, num) VALUES('test1', 1152921504606846976) returning id as test_id` @@ -169,6 +170,9 @@ bucket_definitions: await context.replicateSnapshot(); context.startStreaming(); + // Make sure we're up to date + await context.getCheckpoint(); + const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) ); @@ -266,14 +270,14 @@ bucket_definitions: await context.replicateSnapshot(); + context.startStreaming(); + // Make sure we're up to date + await context.getCheckpoint(); + const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); - - const [{ test_id }] = pgwireRows( - await pool.query(`INSERT INTO test_donotsync(description) VALUES('test1') returning id as test_id`) - ); + await pool.query(`INSERT INTO test_donotsync(description) VALUES('test1') returning id as test_id`); const data = await context.getBucketData('global[]'); From f4d9a40363681af621446df666ee96e42e4f085f Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Mon, 21 Jul 2025 17:53:50 +0200 Subject: [PATCH 13/16] And more test fixes. --- .../test/src/wal_stream.test.ts | 24 ++++--------------- .../test/src/wal_stream_utils.ts | 10 ++++++++ 2 files changed, 15 insertions(+), 19 deletions(-) diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index f575fd2cf..634376793 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -34,10 +34,7 @@ bucket_definitions: `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)` ); - await context.replicateSnapshot(); - context.startStreaming(); - // Make sure we're up to date - await context.getCheckpoint(); + await context.initializeReplication(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; @@ -70,13 +67,11 @@ bucket_definitions: await pool.query(`DROP TABLE IF EXISTS "test_DATA"`); await pool.query(`CREATE TABLE "test_DATA"(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); + await context.initializeReplication(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; - context.startStreaming(); - const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO "test_DATA"(description) VALUES('test1') returning id as test_id`) ); @@ -144,8 +139,7 @@ bucket_definitions: await pool.query(`DROP TABLE IF EXISTS test_data`); await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); - context.startStreaming(); + await context.initializeReplication(); const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) @@ -167,11 +161,7 @@ bucket_definitions: await pool.query(`DROP TABLE IF EXISTS test_data`); await pool.query(`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); - context.startStreaming(); - - // Make sure we're up to date - await context.getCheckpoint(); + await context.initializeReplication(); const [{ test_id }] = pgwireRows( await pool.query(`INSERT INTO test_data(description) VALUES('test1') returning id as test_id`) @@ -268,11 +258,7 @@ bucket_definitions: await pool.query(`CREATE TABLE test_donotsync(id uuid primary key default uuid_generate_v4(), description text)`); - await context.replicateSnapshot(); - - context.startStreaming(); - // Make sure we're up to date - await context.getCheckpoint(); + await context.initializeReplication(); const startRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const startTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index f18631c43..b94c73c5a 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -118,6 +118,16 @@ export class WalStreamTestContext implements AsyncDisposable { return this._walStream!; } + /** + * Replicate a snapshot, start streaming, and wait for a consistent checkpoint. + */ + async initializeReplication() { + await this.replicateSnapshot(); + this.startStreaming(); + // Make sure we're up to date + await this.getCheckpoint(); + } + async replicateSnapshot() { const promise = (async () => { this.replicationConnection = await this.connectionManager.replicationConnection(); From 2f69538778b4ae21c84b663804d8eaba813e12b3 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 22 Jul 2025 09:08:54 +0200 Subject: [PATCH 14/16] Periodically persist replication progress in absense of commits. --- .../implementation/MongoBucketBatch.ts | 2 +- .../src/storage/implementation/models.ts | 6 ++++++ .../src/replication/ChangeStream.ts | 20 +++++++++++++++++-- .../src/storage/batch/PostgresBucketBatch.ts | 2 +- .../src/storage/BucketStorageBatch.ts | 8 ++++++-- 5 files changed, 32 insertions(+), 6 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 90c2885bf..ceee3542e 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -860,7 +860,7 @@ export class MongoBucketBatch return true; } - async setSnapshotLsn(lsn: string): Promise { + async setResumeLsn(lsn: string): Promise { const update: Partial = { snapshot_lsn: lsn }; diff --git a/modules/module-mongodb-storage/src/storage/implementation/models.ts b/modules/module-mongodb-storage/src/storage/implementation/models.ts index 181d50fff..33eac22d8 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/models.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/models.ts @@ -118,9 +118,15 @@ export interface SyncRuleDocument { snapshot_done: boolean; /** + * This is now used for "resumeLsn". + * * If snapshot_done = false, this may be the lsn at which we started the snapshot. * * This can be used for resuming the snapshot after a restart. + * + * If snapshot_done is true, this is treated as the point to restart replication from. + * + * More specifically, we resume replication from max(snapshot_lsn, last_checkpoint_lsn). */ snapshot_lsn: string | undefined; diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 31a76af69..53a7013f4 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -329,7 +329,7 @@ export class ChangeStream { if (snapshotLsn == null) { // First replication attempt - get a snapshot and store the timestamp snapshotLsn = await this.getSnapshotLsn(); - await batch.setSnapshotLsn(snapshotLsn); + await batch.setResumeLsn(snapshotLsn); this.logger.info(`Marking snapshot at ${snapshotLsn}`); } else { this.logger.info(`Resuming snapshot at ${snapshotLsn}`); @@ -814,6 +814,7 @@ export class ChangeStream { let splitDocument: mongo.ChangeStreamDocument | null = null; let flexDbNameWorkaroundLogged = false; + let changesSinceLastCheckpoint = 0; let lastEmptyResume = performance.now(); @@ -983,6 +984,7 @@ export class ChangeStream { if (didCommit) { this.oldestUncommittedChange = null; this.isStartingReplication = false; + changesSinceLastCheckpoint = 0; } } else if ( changeDocument.operationType == 'insert' || @@ -1005,7 +1007,21 @@ export class ChangeStream { if (this.oldestUncommittedChange == null && changeDocument.clusterTime != null) { this.oldestUncommittedChange = timestampToDate(changeDocument.clusterTime); } - await this.writeChange(batch, table, changeDocument); + const flushResult = await this.writeChange(batch, table, changeDocument); + changesSinceLastCheckpoint += 1; + if (flushResult != null && changesSinceLastCheckpoint >= 20_000) { + // When we are catching up replication after an initial snapshot, there may be a very long delay + // before we do a commit(). In that case, we need to periodically persist the resume LSN, so + // we don't restart from scratch if we restart replication. + // The same could apply if we need to catch up on replication after some downtime. + const { comparable: lsn } = new MongoLSN({ + timestamp: changeDocument.clusterTime!, + resume_token: changeDocument._id + }); + this.logger.info(`Updating resume LSN to ${lsn} after ${changesSinceLastCheckpoint} changes`); + await batch.setResumeLsn(lsn); + changesSinceLastCheckpoint = 0; + } } } else if (changeDocument.operationType == 'drop') { const rel = getMongoRelation(changeDocument.ns); diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index 1f83d4d53..aa0eb0125 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -420,7 +420,7 @@ export class PostgresBucketBatch return true; } - async setSnapshotLsn(lsn: string): Promise { + async setResumeLsn(lsn: string): Promise { await this.db.sql` UPDATE sync_rules SET diff --git a/packages/service-core/src/storage/BucketStorageBatch.ts b/packages/service-core/src/storage/BucketStorageBatch.ts index 22c5a8760..62db7dd43 100644 --- a/packages/service-core/src/storage/BucketStorageBatch.ts +++ b/packages/service-core/src/storage/BucketStorageBatch.ts @@ -60,12 +60,16 @@ export interface BucketStorageBatch extends ObserverClient; /** - * Set the LSN for a snapshot, before starting replication. + * Set the LSN that replication should resume from. + * + * This can be used for: + * 1. Setting the LSN for a snapshot, before starting replication. + * 2. Setting the LSN to resume from after a replication restart, without advancing the checkpoint LSN via a commit. * * Not required if the source database keeps track of this, for example with * PostgreSQL logical replication slots. */ - setSnapshotLsn(lsn: string): Promise; + setResumeLsn(lsn: string): Promise; /** * Get the last checkpoint LSN, from either commit or keepalive. From d30d3b549b11cb8d1f38069684a5c0919d480369 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 22 Jul 2025 10:29:55 +0200 Subject: [PATCH 15/16] Avoid waiting for probes.touch(). --- .../src/replication/ChangeStream.ts | 25 +++++++++++-------- .../src/replication/WalStream.ts | 19 +++++++------- 2 files changed, 24 insertions(+), 20 deletions(-) diff --git a/modules/module-mongodb/src/replication/ChangeStream.ts b/modules/module-mongodb/src/replication/ChangeStream.ts index 53a7013f4..21d83d27a 100644 --- a/modules/module-mongodb/src/replication/ChangeStream.ts +++ b/modules/module-mongodb/src/replication/ChangeStream.ts @@ -367,7 +367,7 @@ export class ChangeStream { await this.snapshotTable(batch, table); await batch.markSnapshotDone([table], MongoLSN.ZERO.comparable); - await touch(); + this.touch(); } // The checkpoint here is a marker - we need to replicate up to at least this @@ -509,7 +509,7 @@ export class ChangeStream { this.logger.info( `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} in ${duration.toFixed(0)}ms` ); - await touch(); + this.touch(); } // In case the loop was interrupted, make sure we await the last promise. await nextChunkPromise; @@ -849,7 +849,7 @@ export class ChangeStream { if (waitForCheckpointLsn == null && performance.now() - lastEmptyResume > 60_000) { const { comparable: lsn, timestamp } = MongoLSN.fromResumeToken(stream.resumeToken); await batch.keepalive(lsn); - await touch(); + this.touch(); lastEmptyResume = performance.now(); // Log the token update. This helps as a general "replication is still active" message in the logs. // This token would typically be around 10s behind. @@ -861,7 +861,7 @@ export class ChangeStream { continue; } - await touch(); + this.touch(); if (startAfter != null && originalChangeDocument.clusterTime?.lte(startAfter)) { continue; @@ -1069,13 +1069,18 @@ export class ChangeStream { } return Date.now() - this.oldestUncommittedChange.getTime(); } -} -async function touch() { - // FIXME: The hosted Kubernetes probe does not actually check the timestamp on this. - // FIXME: We need a timeout of around 5+ minutes in Kubernetes if we do start checking the timestamp, - // or reduce PING_INTERVAL here. - return container.probes.touch(); + private lastTouchedAt = performance.now(); + + private touch() { + if (performance.now() - this.lastTouchedAt > 1_000) { + this.lastTouchedAt = performance.now(); + // Update the probes, but don't wait for it + container.probes.touch().catch((e) => { + this.logger.error(`Failed to touch the container probe: ${e.message}`, e); + }); + } + } } function mapChangeStreamError(e: any) { diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index c00df274c..022426694 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -321,7 +321,7 @@ export class WalStream { // Check that replication slot exists for (let i = 120; i >= 0; i--) { - await touch(); + this.touch(); if (i == 0) { container.reporter.captureException(last_error, { @@ -479,7 +479,7 @@ WHERE oid = $1::regclass`, for (let table of tablesWithStatus) { await this.snapshotTableInTx(batch, db, table); - await touch(); + this.touch(); } // Always commit the initial snapshot at zero. @@ -628,7 +628,7 @@ WHERE oid = $1::regclass`, at += rows.length; this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length); - await touch(); + this.touch(); } // Important: flush before marking progress @@ -910,7 +910,7 @@ WHERE oid = $1::regclass`, let count = 0; for await (const chunk of replicationStream.pgoutputDecode()) { - await touch(); + this.touch(); if (this.abort_signal.aborted) { break; @@ -1090,11 +1090,10 @@ WHERE oid = $1::regclass`, } return Date.now() - this.oldestUncommittedChange.getTime(); } -} -async function touch() { - // FIXME: The hosted Kubernetes probe does not actually check the timestamp on this. - // FIXME: We need a timeout of around 5+ minutes in Kubernetes if we do start checking the timestamp, - // or reduce PING_INTERVAL here. - return container.probes.touch(); + private touch() { + container.probes.touch().catch((e) => { + this.logger.error(`Error touching probe`, e); + }); + } } From ff8de76825c1892cc742faf4c9ca19a5a655bba5 Mon Sep 17 00:00:00 2001 From: Ralf Kistner Date: Tue, 22 Jul 2025 11:03:38 +0200 Subject: [PATCH 16/16] Another attempt at making tests more stable. --- modules/module-postgres/test/src/wal_stream.test.ts | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index 634376793..53f426171 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -51,7 +51,8 @@ bucket_definitions: const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const endTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; expect(endRowCount - startRowCount).toEqual(1); - expect(endTxCount - startTxCount).toEqual(1); + // In some rare cases there may be additional empty transactions, so we allow for that. + expect(endTxCount - startTxCount).toBeGreaterThanOrEqual(1); }); test('replicating case sensitive table', async () => { @@ -82,7 +83,7 @@ bucket_definitions: const endRowCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.ROWS_REPLICATED)) ?? 0; const endTxCount = (await METRICS_HELPER.getMetricValueForTests(ReplicationMetric.TRANSACTIONS_REPLICATED)) ?? 0; expect(endRowCount - startRowCount).toEqual(1); - expect(endTxCount - startTxCount).toEqual(1); + expect(endTxCount - startTxCount).toBeGreaterThanOrEqual(1); }); test('replicating TOAST values', async () => { @@ -273,7 +274,7 @@ bucket_definitions: // There was a transaction, but we should not replicate any actual data expect(endRowCount - startRowCount).toEqual(0); - expect(endTxCount - startTxCount).toEqual(1); + expect(endTxCount - startTxCount).toBeGreaterThanOrEqual(1); }); test('reporting slot issues', async () => {