diff --git a/.changeset/cold-dodos-cheer.md b/.changeset/cold-dodos-cheer.md new file mode 100644 index 00000000..f3cbe0cb --- /dev/null +++ b/.changeset/cold-dodos-cheer.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-module-postgres': patch +'@powersync/lib-service-postgres': patch +--- + +Improve replication slot health detection, automatically re-creating "lost" slots. diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 227f00a6..ce65603d 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -2,12 +2,12 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import { container, DatabaseConnectionError, + logger as defaultLogger, ErrorCode, errors, Logger, - logger as defaultLogger, - ReplicationAssertionError, - ReplicationAbortedError + ReplicationAbortedError, + ReplicationAssertionError } from '@powersync/lib-services-framework'; import { BucketStorageBatch, @@ -33,10 +33,10 @@ import { toSyncRulesRow } from '@powersync/service-sync-rules'; +import { ReplicationMetric } from '@powersync/service-types'; import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js'; import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js'; -import { ReplicationMetric } from '@powersync/service-types'; import { ChunkedSnapshotQuery, IdSnapshotQuery, @@ -295,15 +295,18 @@ export class WalStream { } // Check if replication slot exists - const rs = await this.connections.pool.query({ - statement: 'SELECT 1 FROM pg_replication_slots WHERE slot_name = $1', - params: [{ type: 'varchar', value: slotName }] - }); - const slotExists = rs.rows.length > 0; + const slot = pgwire.pgwireRows( + await this.connections.pool.query({ + // We specifically want wal_status and invalidation_reason, but it's not available 
on older versions, + // so we just query *. + statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: slotName }] + }) + )[0]; - if (slotExists) { + if (slot != null) { // This checks that the slot is still valid - const r = await this.checkReplicationSlot(); + const r = await this.checkReplicationSlot(slot as any); if (snapshotDone && r.needsNewSlot) { // We keep the current snapshot, and create a new replication slot throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`); @@ -330,24 +333,32 @@ export class WalStream { /** * If a replication slot exists, check that it is healthy. */ - private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> { - let last_error = null; + private async checkReplicationSlot(slot: { + // postgres 13+ + wal_status?: string; + // postgres 17+ + invalidation_reason?: string | null; + }): Promise<{ needsNewSlot: boolean }> { + // Start with a placeholder error, should be replaced if there is an actual issue. + let last_error = new ReplicationAssertionError(`Slot health check failed to execute`); + const slotName = this.slot_name; - // Check that replication slot exists - for (let i = 120; i >= 0; i--) { - this.touch(); + const lost = slot.wal_status == 'lost'; + if (lost) { + this.logger.warn( + `Replication slot ${slotName} is invalidated. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}` + ); + return { + needsNewSlot: true + }; + } - if (i == 0) { - container.reporter.captureException(last_error, { - level: errors.ErrorSeverity.ERROR, - metadata: { - replication_slot: slotName - } - }); + // Check that replication slot exists, trying for up to 2 minutes. + const startAt = performance.now(); + while (performance.now() - startAt < 120_000) { + this.touch(); - throw last_error; - } try { // We peek a large number of changes here, to make it more likely to pick up replication slot errors. 
// For example, "publication does not exist" only occurs here if the peek actually includes changes related @@ -379,18 +390,21 @@ export class WalStream { throw e; } - // Could also be `publication "powersync" does not exist`, although this error may show up much later - // in some cases. - if ( /incorrect prev-link/.test(e.message) || /replication slot.*does not exist/.test(e.message) || - /publication.*does not exist/.test(e.message) + /publication.*does not exist/.test(e.message) || + // Postgres 18 - exceeded max_slot_wal_keep_size + /can no longer access replication slot/.test(e.message) || + // Postgres 17 - exceeded max_slot_wal_keep_size + /can no longer get changes from replication slot/.test(e.message) ) { + // Fatal error. In most cases since Postgres 13+, the `wal_status == 'lost'` check should pick this up, but this + // works as a fallback. + container.reporter.captureException(e, { level: errors.ErrorSeverity.WARNING, metadata: { - try_index: i, replication_slot: slotName } }); @@ -409,7 +423,14 @@ export class WalStream { } } - throw new ReplicationAssertionError('Unreachable'); + container.reporter.captureException(last_error, { + level: errors.ErrorSeverity.ERROR, + metadata: { + replication_slot: slotName + } + }); + + throw last_error; } async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { diff --git a/modules/module-postgres/test/src/wal_stream.test.ts b/modules/module-postgres/test/src/wal_stream.test.ts index f5cb622d..99d14638 100644 --- a/modules/module-postgres/test/src/wal_stream.test.ts +++ b/modules/module-postgres/test/src/wal_stream.test.ts @@ -4,9 +4,10 @@ import { METRICS_HELPER, putOp, removeOp } from '@powersync/service-core-tests'; import { pgwireRows } from '@powersync/service-jpgwire'; import { ReplicationMetric } from '@powersync/service-types'; import * as crypto from 'crypto'; -import { describe, expect, test } from 'vitest'; +import { afterAll, beforeAll, describe, expect, test } from 
'vitest'; import { describeWithStorage } from './util.js'; -import { WalStreamTestContext } from './wal_stream_utils.js'; +import { WalStreamTestContext, withMaxWalSize } from './wal_stream_utils.js'; +import { JSONBig } from '@powersync/service-jsonbig'; const BASIC_SYNC_RULES = ` bucket_definitions: @@ -321,8 +322,8 @@ bucket_definitions: if (serverVersion!.compareMain('18.0.0') >= 0) { await context.replicateSnapshot(); - // No error expected in Postres 18 - // TODO: introduce new test scenario for Postgres 18 that _does_ invalidate the replication slot. + // No error expected in Postgres 18. Replication keeps on working despite the + // publication being re-created. } else { // Postgres < 18 invalidates the replication slot when the publication is re-created. // The error is handled on a higher level, which triggers @@ -386,6 +387,89 @@ bucket_definitions: } }); + test('replication slot lost', async () => { + await using baseContext = await WalStreamTestContext.open(factory, { doNotClear: true }); + + const serverVersion = await baseContext.connectionManager.getServerVersion(); + if (serverVersion!.compareMain('13.0.0') < 0) { + console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`); + return; + } + + // Configure max_slot_wal_keep_size for the test, reverting afterwards. 
+ await using s = await withMaxWalSize(baseContext.pool, '100MB'); + + { + await using context = await WalStreamTestContext.open(factory); + const { pool } = context; + await context.updateSyncRules(` +bucket_definitions: + global: + data: + - SELECT id, description FROM "test_data"`); + + await pool.query( + `CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)` + ); + await pool.query( + `INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id` + ); + await context.replicateSnapshot(); + await context.startStreaming(); + + const data = await context.getBucketData('global[]'); + + expect(data).toMatchObject([ + putOp('test_data', { + id: '8133cd37-903b-4937-a022-7c8294015a3a', + description: 'test1' + }) + ]); + + expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true }); + } + + { + await using context = await WalStreamTestContext.open(factory, { doNotClear: true }); + const { pool } = context; + const storage = await context.factory.getActiveStorage(); + const slotName = storage?.slot_name!; + + // Here, we write data to the WAL until the replication slot is lost. + const TRIES = 100; + for (let i = 0; i < TRIES; i++) { + // Write something to the WAL. + await pool.query(`select pg_logical_emit_message(true, 'test', 'x')`); + // Switch WAL file. With default settings, each WAL file is around 16MB. + await pool.query(`select pg_switch_wal()`); + // Checkpoint command forces the old WAL files to be archived/removed. + await pool.query(`checkpoint`); + // Now check if the slot is still active. 
+ const slot = pgwireRows( + await context.pool.query({ + statement: `select slot_name, wal_status, safe_wal_size, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag from pg_replication_slots where slot_name = $1`, + params: [{ type: 'varchar', value: slotName }] + }) + )[0]; + if (slot.wal_status == 'lost') { + break; + } else if (i == TRIES - 1) { + throw new Error( + `Could not generate test conditions to expire replication slot. Current status: ${JSONBig.stringify(slot)}` + ); + } + } + + await context.loadActiveSyncRules(); + + // The error is handled on a higher level, which triggers + // creating a new replication slot. + await expect(async () => { + await context.replicateSnapshot(); + }).rejects.toThrowError(MissingReplicationSlotError); + } + }); + test('old date format', async () => { await using context = await WalStreamTestContext.open(factory); await context.updateSyncRules(BASIC_SYNC_RULES); diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index 40ea5aab..5d4442f2 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -203,3 +203,25 @@ export class WalStreamTestContext implements AsyncDisposable { return batches[0]?.chunkData.data ?? []; } } + +export async function withMaxWalSize(db: pgwire.PgClient, size: string) { + try { + const r1 = await db.query(`SHOW max_slot_wal_keep_size`); + + await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${size}'`); + await db.query(`SELECT pg_reload_conf()`); + + const oldSize = r1.results[0].rows[0][0]; + + return { + [Symbol.asyncDispose]: async () => { + await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = '${oldSize}'`); + await db.query(`SELECT pg_reload_conf()`); + } + }; + } catch (e) { + const err = new Error(`Failed to configure max_slot_wal_keep_size for test`); + err.cause = e; + throw err; + } +}