From c68189e2d7de2c3974c2346e4d625352d4e76908 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 10 Nov 2025 15:35:23 +0200
Subject: [PATCH 1/4] Remove usage of pg_logical_slot_peek_binary_changes.

---
 .../src/replication/WalStream.ts | 185 +++++++-----------
 .../test/src/wal_stream.test.ts  |  22 ++-
 .../test/src/wal_stream_utils.ts |  27 ++-
 3 files changed, 106 insertions(+), 128 deletions(-)

diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index ce65603d..c43515a1 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -4,7 +4,6 @@ import {
   DatabaseConnectionError,
   logger as defaultLogger,
   ErrorCode,
-  errors,
   Logger,
   ReplicationAbortedError,
   ReplicationAssertionError
@@ -100,8 +99,10 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => {
 };
 
 export class MissingReplicationSlotError extends Error {
-  constructor(message: string) {
+  constructor(message: string, cause: any) {
     super(message);
+
+    this.cause = cause;
   }
 }
 
@@ -304,135 +305,50 @@ export class WalStream {
       })
     )[0];
 
+    // Previously we also used pg_catalog.pg_logical_slot_peek_binary_changes to confirm that we can query the slot.
+    // However, there were some edge cases where the query timed out and was repeated, ultimately
+    // causing high load on the source database and never recovering automatically.
+    // We now jump straight to replication if the wal_status is not "lost", and instead detect those
+    // errors during streaming replication, which is a little more robust.
+
+    // We can have:
+    // 1. needsInitialSync: true, lost slot -> MissingReplicationSlotError (starts new sync rules version).
+    //    Theoretically we could handle this the same as (2).
+    // 2. needsInitialSync: true, no slot -> create new slot
+    // 3. needsInitialSync: true, valid slot -> resume initial sync
+    // 4. needsInitialSync: false, lost slot -> MissingReplicationSlotError (starts new sync rules version)
+    // 5. needsInitialSync: false, no slot -> MissingReplicationSlotError (starts new sync rules version)
+    // 6. needsInitialSync: false, valid slot -> resume streaming replication
     if (slot != null) {
       // This checks that the slot is still valid
-      const r = await this.checkReplicationSlot(slot as any);
-      if (snapshotDone && r.needsNewSlot) {
-        // We keep the current snapshot, and create a new replication slot
-        throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
+
+      // wal_status is present in postgres 13+
+      // invalidation_reason is present in postgres 17+
+      const lost = slot.wal_status == 'lost';
+      if (lost) {
+        // Case 1 / 4
+        throw new MissingReplicationSlotError(
+          `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`,
+          undefined
+        );
       }
-      // We can have:
-      // needsInitialSync: true, needsNewSlot: true -> initial sync from scratch
-      // needsInitialSync: true, needsNewSlot: false -> resume initial sync
-      // needsInitialSync: false, needsNewSlot: true -> handled above
-      // needsInitialSync: false, needsNewSlot: false -> resume streaming replication
+      // Case 3 / 6
       return {
         needsInitialSync: !snapshotDone,
-        needsNewSlot: r.needsNewSlot
+        needsNewSlot: false
      };
     } else {
       if (snapshotDone) {
+        // Case 5
         // This will create a new slot, while keeping the current sync rules active
-        throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
+        throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`, undefined);
       }
-      // This will clear data and re-create the same slot
+      // Case 2
+      // This will clear data (if any) and re-create the same slot
       return { needsInitialSync: true, needsNewSlot: true };
     }
   }
 
-  /**
-   * If a replication slot exists, check that it is healthy.
-   */
-  private async checkReplicationSlot(slot: {
-    // postgres 13+
-    wal_status?: string;
-    // postgres 17+
-    invalidation_reason?: string | null;
-  }): Promise<{ needsNewSlot: boolean }> {
-    // Start with a placeholder error, should be replaced if there is an actual issue.
-    let last_error = new ReplicationAssertionError(`Slot health check failed to execute`);
-
-    const slotName = this.slot_name;
-
-    const lost = slot.wal_status == 'lost';
-    if (lost) {
-      this.logger.warn(
-        `Replication slot ${slotName} is invalidated. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
-      );
-      return {
-        needsNewSlot: true
-      };
-    }
-
-    // Check that replication slot exists, trying for up to 2 minutes.
-    const startAt = performance.now();
-    while (performance.now() - startAt < 120_000) {
-      this.touch();
-
-      try {
-        // We peek a large number of changes here, to make it more likely to pick up replication slot errors.
-        // For example, "publication does not exist" only occurs here if the peek actually includes changes related
-        // to the slot.
-        this.logger.info(`Checking ${slotName}`);
-
-        // The actual results can be quite large, so we don't actually return everything
-        // due to memory and processing overhead that would create.
-        const cursor = await this.connections.pool.stream({
-          statement: `SELECT 1 FROM pg_catalog.pg_logical_slot_peek_binary_changes($1, NULL, 1000, 'proto_version', '1', 'publication_names', $2)`,
-          params: [
-            { type: 'varchar', value: slotName },
-            { type: 'varchar', value: PUBLICATION_NAME }
-          ]
-        });
-
-        for await (let _chunk of cursor) {
-          // No-op, just exhaust the cursor
-        }
-
-        // Success
-        this.logger.info(`Slot ${slotName} appears healthy`);
-        return { needsNewSlot: false };
-      } catch (e) {
-        last_error = e;
-        this.logger.warn(`Replication slot error`, e);
-
-        if (this.stopped) {
-          throw e;
-        }
-
-        if (
-          /incorrect prev-link/.test(e.message) ||
-          /replication slot.*does not exist/.test(e.message) ||
-          /publication.*does not exist/.test(e.message) ||
-          // Postgres 18 - exceeded max_slot_wal_keep_size
-          /can no longer access replication slot/.test(e.message) ||
-          // Postgres 17 - exceeded max_slot_wal_keep_size
-          /can no longer get changes from replication slot/.test(e.message)
-        ) {
-          // Fatal error. In most cases since Postgres 13+, the `wal_status == 'lost'` check should pick this up, but this
-          // works as a fallback.
-
-          container.reporter.captureException(e, {
-            level: errors.ErrorSeverity.WARNING,
-            metadata: {
-              replication_slot: slotName
-            }
-          });
-          // Sample: record with incorrect prev-link 10000/10000 at 0/18AB778
-          // Seen during development. Some internal error, fixed by re-creating slot.
-          //
-          // Sample: publication "powersync" does not exist
-          // Happens when publication deleted or never created.
-          // Slot must be re-created in this case.
-          this.logger.info(`${slotName} is not valid anymore`);
-
-          return { needsNewSlot: true };
-        }
-        // Try again after a pause
-        await new Promise((resolve) => setTimeout(resolve, 1000));
-      }
-    }
-
-    container.reporter.captureException(last_error, {
-      level: errors.ErrorSeverity.ERROR,
-      metadata: {
-        replication_slot: slotName
-      }
-    });
-
-    throw last_error;
-  }
-
   async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise<number> {
     const results = await db.query({
       statement: `SELECT reltuples::bigint AS estimate
@@ -915,6 +831,17 @@ WHERE oid = $1::regclass`,
   }
 
   async streamChanges(replicationConnection: pgwire.PgConnection) {
+    try {
+      await this.streamChangesInternal(replicationConnection);
+    } catch (e) {
+      if (isReplicationSlotInvalidError(e)) {
+        throw new MissingReplicationSlotError(e.message, e);
+      }
+      throw e;
+    }
+  }
+
+  private async streamChangesInternal(replicationConnection: pgwire.PgConnection) {
     // When changing any logic here, check /docs/wal-lsns.md.
     const { createEmptyCheckpoints } = await this.ensureStorageCompatibility();
 
@@ -1179,3 +1106,27 @@ WHERE oid = $1::regclass`,
     });
   }
 }
+
+function isReplicationSlotInvalidError(e: any) {
+  // We could access the error code from pgwire using this:
+  // e[Symbol.for('pg.ErrorCode')]
+  // However, we typically get a generic code such as 42704 (undefined_object), which does not
+  // help much. So we check the actual error message.
+  const message = e.message ?? '';
+
+  // Sample: record with incorrect prev-link 10000/10000 at 0/18AB778
+  // Seen during development. Some internal error, fixed by re-creating slot.
+  //
+  // Sample: publication "powersync" does not exist
+  // Happens when publication deleted or never created.
+  // Slot must be re-created in this case.
+  return (
+    /incorrect prev-link/.test(message) ||
+    /replication slot.*does not exist/.test(message) ||
+    /publication.*does not exist/.test(message) ||
+    // Postgres 18 - exceeded max_slot_wal_keep_size
+    /can no longer access replication slot/.test(message) ||
+    // Postgres 17 - exceeded max_slot_wal_keep_size
+    /can no longer get changes from replication slot/.test(message)
+  );
+}
diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts
index 99d14638..0eb0bdcf 100644
--- a/modules/module-postgres/test/src/wal_stream.test.ts
+++ b/modules/module-postgres/test/src/wal_stream.test.ts
@@ -295,7 +295,7 @@ bucket_definitions:
       `INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
     );
     await context.replicateSnapshot();
-    await context.startStreaming();
+    context.startStreaming();
 
     const data = await context.getBucketData('global[]');
 
@@ -320,17 +320,25 @@ bucket_definitions:
 
     await context.loadActiveSyncRules();
 
+    // Previously, the `replicateSnapshot` call picked up on this error.
+    // Now that we have removed that check, this only comes up when we start actually streaming.
+    // We don't get the streaming response directly here, but getCheckpoint() checks for that.
+    await context.replicateSnapshot();
+    context.startStreaming();
+
     if (serverVersion!.compareMain('18.0.0') >= 0) {
-      await context.replicateSnapshot();
       // No error expected in Postgres 18. Replication keeps on working despite the
       // publication being re-created.
+      await context.getCheckpoint();
     } else {
+      // await context.getCheckpoint();
       // Postgres < 18 invalidates the replication slot when the publication is re-created.
-      // The error is handled on a higher level, which triggers
+      // In the service, this error is handled in WalStreamReplicationJob,
       // creating a new replication slot.
       await expect(async () => {
-        await context.replicateSnapshot();
+        await context.getCheckpoint();
       }).rejects.toThrowError(MissingReplicationSlotError);
+      context.clearStreamError();
     }
   }
 });
@@ -352,7 +360,7 @@ bucket_definitions:
       `INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
     );
     await context.replicateSnapshot();
-    await context.startStreaming();
+    context.startStreaming();
 
     const data = await context.getBucketData('global[]');
 
@@ -415,7 +423,7 @@ bucket_definitions:
       `INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
     );
     await context.replicateSnapshot();
-    await context.startStreaming();
+    context.startStreaming();
 
     const data = await context.getBucketData('global[]');
 
@@ -572,7 +580,7 @@ config:
     );
 
     await context.replicateSnapshot();
-    await context.startStreaming();
+    context.startStreaming();
 
     await pool.query(`UPDATE test_data SET description = 'test2' WHERE id = '${test_id}'`);
 
diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts
index 5d4442f2..fd680918 100644
--- a/modules/module-postgres/test/src/wal_stream_utils.ts
+++ b/modules/module-postgres/test/src/wal_stream_utils.ts
@@ -55,12 +55,31 @@ export class WalStreamTestContext implements AsyncDisposable {
     await this.dispose();
   }
 
+  /**
+   * Clear any errors from startStreaming, to allow for a graceful dispose when streaming errors
+   * were expected.
+   */
+  async clearStreamError() {
+    if (this.streamPromise != null) {
+      this.streamPromise = this.streamPromise.catch((e) => {});
+    }
+  }
+
   async dispose() {
     this.abortController.abort();
-    await this.snapshotPromise;
-    await this.streamPromise;
-    await this.connectionManager.destroy();
-    await this.factory?.[Symbol.asyncDispose]();
+    try {
+      await this.snapshotPromise;
+      await this.streamPromise;
+      await this.connectionManager.destroy();
+      await this.factory?.[Symbol.asyncDispose]();
+    } catch (e) {
+      // Throwing here may result in SuppressedError. The underlying errors often don't show up
+      // in the test output, so we log it here.
+      // If we could get vitest to log SuppressedError.error and SuppressedError.suppressed, we
+      // could remove this.
+      console.error('Error during WalStreamTestContext dispose', e);
+      throw e;
+    }
   }
 
   get pool() {

From 85bb599ca37a611c07c8ade6cbb83694591f7c3c Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 10 Nov 2025 15:57:19 +0200
Subject: [PATCH 2/4] More comments.

---
 modules/module-postgres/src/replication/WalStream.ts | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index c43515a1..d4ef2b65 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -319,6 +319,11 @@ export class WalStream {
     // 4. needsInitialSync: false, lost slot -> MissingReplicationSlotError (starts new sync rules version)
     // 5. needsInitialSync: false, no slot -> MissingReplicationSlotError (starts new sync rules version)
     // 6. needsInitialSync: false, valid slot -> resume streaming replication
+    // The main advantages of MissingReplicationSlotError are:
+    // 1. If there was a complete snapshot already (cases 4/5), users can still sync from that snapshot while
+    //    we do the reprocessing under a new slot name.
+    // 2. If there was a partial snapshot (case 1), we can start with the new slot faster by not waiting for
+    //    the partial data to be cleared.
     if (slot != null) {
       // This checks that the slot is still valid
 

From 5d0daba1d592e5863554337e9b9f34ca6af42d4e Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 10 Nov 2025 15:58:34 +0200
Subject: [PATCH 3/4] Clean up error handling.

---
 modules/module-postgres/src/replication/WalStream.ts | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts
index d4ef2b65..5dca68d9 100644
--- a/modules/module-postgres/src/replication/WalStream.ts
+++ b/modules/module-postgres/src/replication/WalStream.ts
@@ -99,7 +99,7 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => {
 };
 
 export class MissingReplicationSlotError extends Error {
-  constructor(message: string, cause: any) {
+  constructor(message: string, cause?: any) {
     super(message);
 
     this.cause = cause;
@@ -333,8 +333,7 @@ export class WalStream {
       if (lost) {
         // Case 1 / 4
         throw new MissingReplicationSlotError(
-          `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`,
-          undefined
+          `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
         );
       }
       // Case 3 / 6
@@ -346,7 +345,7 @@ export class WalStream {
       if (snapshotDone) {
         // Case 5
         // This will create a new slot, while keeping the current sync rules active
-        throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`, undefined);
+        throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
       }
       // Case 2
       // This will clear data (if any) and re-create the same slot

From f7b752f1c5699b65c18df061d85bf3ea9f45eba3 Mon Sep 17 00:00:00 2001
From: Ralf Kistner
Date: Mon, 10 Nov 2025 16:54:47 +0200
Subject: [PATCH 4/4] Add changeset.

---
 .changeset/serious-icons-drop.md | 7 +++++++
 1 file changed, 7 insertions(+)
 create mode 100644 .changeset/serious-icons-drop.md

diff --git a/.changeset/serious-icons-drop.md b/.changeset/serious-icons-drop.md
new file mode 100644
index 00000000..c301494d
--- /dev/null
+++ b/.changeset/serious-icons-drop.md
@@ -0,0 +1,7 @@
+---
+'@powersync/service-module-postgres': patch
+'@powersync/service-core': patch
+'@powersync/service-image': patch
+---
+
+[Postgres] Remove usage of pg_logical_slot_peek_binary_changes due to performance issues in some cases