Skip to content
6 changes: 6 additions & 0 deletions .changeset/cold-dodos-cheer.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
'@powersync/service-module-postgres': patch
'@powersync/lib-service-postgres': patch
---

Improve replication slot health detection, automatically re-creating "lost" slots.
83 changes: 52 additions & 31 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ import * as lib_postgres from '@powersync/lib-service-postgres';
import {
container,
DatabaseConnectionError,
logger as defaultLogger,
ErrorCode,
errors,
Logger,
logger as defaultLogger,
ReplicationAssertionError,
ReplicationAbortedError
ReplicationAbortedError,
ReplicationAssertionError
} from '@powersync/lib-services-framework';
import {
BucketStorageBatch,
Expand All @@ -33,10 +33,10 @@ import {
toSyncRulesRow
} from '@powersync/service-sync-rules';

import { ReplicationMetric } from '@powersync/service-types';
import { PgManager } from './PgManager.js';
import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js';
import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js';
import { ReplicationMetric } from '@powersync/service-types';
import {
ChunkedSnapshotQuery,
IdSnapshotQuery,
Expand Down Expand Up @@ -295,15 +295,18 @@ export class WalStream {
}

// Check if replication slot exists
const rs = await this.connections.pool.query({
statement: 'SELECT 1 FROM pg_replication_slots WHERE slot_name = $1',
params: [{ type: 'varchar', value: slotName }]
});
const slotExists = rs.rows.length > 0;
const slot = pgwire.pgwireRows(
await this.connections.pool.query({
// We specifically want wal_status and invalidation_reason, but it's not available on older versions,
// so we just query *.
statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1',
params: [{ type: 'varchar', value: slotName }]
})
)[0];

if (slotExists) {
if (slot != null) {
// This checks that the slot is still valid
const r = await this.checkReplicationSlot();
const r = await this.checkReplicationSlot(slot as any);
if (snapshotDone && r.needsNewSlot) {
// We keep the current snapshot, and create a new replication slot
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);
Expand All @@ -330,24 +333,32 @@ export class WalStream {
/**
* If a replication slot exists, check that it is healthy.
*/
private async checkReplicationSlot(): Promise<{ needsNewSlot: boolean }> {
let last_error = null;
private async checkReplicationSlot(slot: {
// postgres 13+
wal_status?: string;
// postgres 17+
invalidation_reason?: string | null;
}): Promise<{ needsNewSlot: boolean }> {
// Start with a placeholder error, should be replaced if there is an actual issue.
let last_error = new ReplicationAssertionError(`Slot health check failed to execute`);

const slotName = this.slot_name;

// Check that replication slot exists
for (let i = 120; i >= 0; i--) {
this.touch();
const lost = slot.wal_status == 'lost';
if (lost) {
this.logger.warn(
`Replication slot ${slotName} is invalidated. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
);
return {
needsNewSlot: true
};
}

if (i == 0) {
container.reporter.captureException(last_error, {
level: errors.ErrorSeverity.ERROR,
metadata: {
replication_slot: slotName
}
});
// Check that replication slot exists, trying for up to 2 minutes.
const startAt = performance.now();
while (performance.now() - startAt < 120_000) {
this.touch();

throw last_error;
}
try {
// We peek a large number of changes here, to make it more likely to pick up replication slot errors.
// For example, "publication does not exist" only occurs here if the peek actually includes changes related
Expand Down Expand Up @@ -379,18 +390,21 @@ export class WalStream {
throw e;
}

// Could also be `publication "powersync" does not exist`, although this error may show up much later
// in some cases.

if (
/incorrect prev-link/.test(e.message) ||
/replication slot.*does not exist/.test(e.message) ||
/publication.*does not exist/.test(e.message)
/publication.*does not exist/.test(e.message) ||
// Postgres 18 - exceeded max_slot_wal_keep_size
/can no longer access replication slot/.test(e.message) ||
// Postgres 17 - exceeded max_slot_wal_keep_size
/can no longer get changes from replication slot/.test(e.message)
) {
// Fatal error. In most cases since Postgres 13+, the `wal_status == 'lost'` check should pick this up, but this
// works as a fallback.

container.reporter.captureException(e, {
level: errors.ErrorSeverity.WARNING,
metadata: {
try_index: i,
replication_slot: slotName
}
});
Expand All @@ -409,7 +423,14 @@ export class WalStream {
}
}

throw new ReplicationAssertionError('Unreachable');
container.reporter.captureException(last_error, {
level: errors.ErrorSeverity.ERROR,
metadata: {
replication_slot: slotName
}
});

throw last_error;
}

async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise<number> {
Expand Down
92 changes: 88 additions & 4 deletions modules/module-postgres/test/src/wal_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ import { METRICS_HELPER, putOp, removeOp } from '@powersync/service-core-tests';
import { pgwireRows } from '@powersync/service-jpgwire';
import { ReplicationMetric } from '@powersync/service-types';
import * as crypto from 'crypto';
import { describe, expect, test } from 'vitest';
import { afterAll, beforeAll, describe, expect, test } from 'vitest';
import { describeWithStorage } from './util.js';
import { WalStreamTestContext } from './wal_stream_utils.js';
import { WalStreamTestContext, withMaxWalSize } from './wal_stream_utils.js';
import { JSONBig } from '@powersync/service-jsonbig';

const BASIC_SYNC_RULES = `
bucket_definitions:
Expand Down Expand Up @@ -321,8 +322,8 @@ bucket_definitions:

if (serverVersion!.compareMain('18.0.0') >= 0) {
await context.replicateSnapshot();
// No error expected in Postres 18
// TODO: introduce new test scenario for Postgres 18 that _does_ invalidate the replication slot.
// No error expected in Postgres 18. Replication keeps on working despite the
// publication being re-created.
} else {
// Postgres < 18 invalidates the replication slot when the publication is re-created.
// The error is handled on a higher level, which triggers
Expand Down Expand Up @@ -386,6 +387,89 @@ bucket_definitions:
}
});

// Verifies that once the replication slot's wal_status becomes 'lost', replicateSnapshot()
// rejects with MissingReplicationSlotError instead of continuing with the invalid slot.
test('replication slot lost', async () => {
await using baseContext = await WalStreamTestContext.open(factory, { doNotClear: true });

const serverVersion = await baseContext.connectionManager.getServerVersion();
if (serverVersion!.compareMain('13.0.0') < 0) {
console.warn(`max_slot_wal_keep_size not supported on postgres ${serverVersion} - skipping test.`);
return;
}

// Configure max_slot_wal_keep_size for the test, reverting afterwards.
await using s = await withMaxWalSize(baseContext.pool, '100MB');

// Step 1: replicate a single row normally, so that a slot and a completed snapshot exist.
{
await using context = await WalStreamTestContext.open(factory);
const { pool } = context;
await context.updateSyncRules(`
bucket_definitions:
global:
data:
- SELECT id, description FROM "test_data"`);

await pool.query(
`CREATE TABLE test_data(id uuid primary key default uuid_generate_v4(), description text, num int8)`
);
await pool.query(
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();

const data = await context.getBucketData('global[]');

expect(data).toMatchObject([
putOp('test_data', {
id: '8133cd37-903b-4937-a022-7c8294015a3a',
description: 'test1'
})
]);

// Sanity check: the snapshot completed before we go on to invalidate the slot.
expect(await context.storage!.getStatus()).toMatchObject({ active: true, snapshot_done: true });
}

// Step 2: re-open with doNotClear (presumably keeping the state created above - confirm against
// WalStreamTestContext.open), invalidate the slot, then attempt to replicate again.
{
await using context = await WalStreamTestContext.open(factory, { doNotClear: true });
const { pool } = context;
const storage = await context.factory.getActiveStorage();
const slotName = storage?.slot_name!;

// Here, we write data to the WAL until the replication slot is lost.
const TRIES = 100;
for (let i = 0; i < TRIES; i++) {
// Write something to the WAL.
await pool.query(`select pg_logical_emit_message(true, 'test', 'x')`);
// Switch WAL file. With default settings, each WAL file is around 16MB.
await pool.query(`select pg_switch_wal()`);
// Checkpoint command forces the old WAL files to be archived/removed.
await pool.query(`checkpoint`);
// Now check if the slot is still active.
// safe_wal_size and lag are not used for the decision; they only end up in the failure message below.
const slot = pgwireRows(
await context.pool.query({
statement: `select slot_name, wal_status, safe_wal_size, pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn) as lag from pg_replication_slots where slot_name = $1`,
params: [{ type: 'varchar', value: slotName }]
})
)[0];
if (slot.wal_status == 'lost') {
break;
} else if (i == TRIES - 1) {
// Fail loudly rather than running the assertion below against a slot that is still healthy.
throw new Error(
`Could not generate test conditions to expire replication slot. Current status: ${JSONBig.stringify(slot)}`
);
}
}

await context.loadActiveSyncRules();

// The error is handled on a higher level, which triggers
// creating a new replication slot.
await expect(async () => {
await context.replicateSnapshot();
}).rejects.toThrowError(MissingReplicationSlotError);
}
});

test('old date format', async () => {
await using context = await WalStreamTestContext.open(factory);
await context.updateSyncRules(BASIC_SYNC_RULES);
Expand Down
22 changes: 22 additions & 0 deletions modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,25 @@ export class WalStreamTestContext implements AsyncDisposable {
return batches[0]?.chunkData.data ?? [];
}
}

/**
 * Temporarily overrides the server-wide `max_slot_wal_keep_size` setting for a test.
 *
 * Reads the current value with `SHOW`, applies `size` with `ALTER SYSTEM`, and reloads the
 * configuration. The returned object is an async disposable: disposing it (for example via
 * `await using`) writes the previously-read value back and reloads the configuration again.
 *
 * @param db connection used to run the configuration statements
 * @param size new value for `max_slot_wal_keep_size`, e.g. `'100MB'`
 * @returns a handle whose `Symbol.asyncDispose` restores the previous value
 * @throws Error with the underlying failure attached as `cause` if the setting could not be
 *   read or applied
 */
export async function withMaxWalSize(db: pgwire.PgClient, size: string) {
  // The value is embedded in the statement text, so quote it as a SQL string literal
  // (doubling any embedded single quotes) instead of interpolating it raw.
  const quoteLiteral = (value: unknown) => `'${String(value).replace(/'/g, "''")}'`;

  try {
    const current = await db.query(`SHOW max_slot_wal_keep_size`);
    // Extract the previous value BEFORE changing anything: if the result has an unexpected
    // shape we fail here, without leaving the system setting modified and unrestorable.
    const oldSize = current.results[0].rows[0][0];

    await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = ${quoteLiteral(size)}`);
    await db.query(`SELECT pg_reload_conf()`);

    return {
      [Symbol.asyncDispose]: async () => {
        await db.query(`ALTER SYSTEM SET max_slot_wal_keep_size = ${quoteLiteral(oldSize)}`);
        await db.query(`SELECT pg_reload_conf()`);
      }
    };
  } catch (e) {
    const err = new Error(`Failed to configure max_slot_wal_keep_size for test`);
    err.cause = e;
    throw err;
  }
}