Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .changeset/serious-icons-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
'@powersync/service-module-postgres': patch
'@powersync/service-core': patch
'@powersync/service-image': patch
---

[Postgres] Remove usage of pg_logical_slot_peek_binary_changes due to performance issues in some cases
187 changes: 71 additions & 116 deletions modules/module-postgres/src/replication/WalStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import {
DatabaseConnectionError,
logger as defaultLogger,
ErrorCode,
errors,
Logger,
ReplicationAbortedError,
ReplicationAssertionError
Expand Down Expand Up @@ -100,8 +99,10 @@ export const sendKeepAlive = async (db: pgwire.PgClient) => {
};

export class MissingReplicationSlotError extends Error {
constructor(message: string) {
constructor(message: string, cause?: any) {
super(message);

this.cause = cause;
}
}

Expand Down Expand Up @@ -304,135 +305,54 @@ export class WalStream {
})
)[0];

// Previously we also used pg_catalog.pg_logical_slot_peek_binary_changes to confirm that we can query the slot.
// However, there were some edge cases where the query times out, repeating the query, ultimately
// causing high load on the source database and never recovering automatically.
// We now instead jump straight to replication if the wal_status is not "lost", rather detecting those
// errors during streaming replication, which is a little more robust.

// We can have:
// 1. needsInitialSync: true, lost slot -> MissingReplicationSlotError (starts new sync rules version).
// Theoretically we could handle this the same as (2).
// 2. needsInitialSync: true, no slot -> create new slot
// 3. needsInitialSync: true, valid slot -> resume initial sync
// 4. needsInitialSync: false, lost slot -> MissingReplicationSlotError (starts new sync rules version)
// 5. needsInitialSync: false, no slot -> MissingReplicationSlotError (starts new sync rules version)
// 6. needsInitialSync: false, valid slot -> resume streaming replication
// The main advantage of MissingReplicationSlotError are:
// 1. If there was a complete snapshot already (cases 4/5), users can still sync from that snapshot while
// we do the reprocessing under a new slot name.
// 2. If there was a partial snapshot (case 1), we can start with the new slot faster by not waiting for
// the partial data to be cleared.
if (slot != null) {
// This checks that the slot is still valid
const r = await this.checkReplicationSlot(slot as any);
if (snapshotDone && r.needsNewSlot) {
// We keep the current snapshot, and create a new replication slot
throw new MissingReplicationSlotError(`Replication slot ${slotName} is not valid anymore`);

// wal_status is present in postgres 13+
// invalidation_reason is present in postgres 17+
const lost = slot.wal_status == 'lost';
if (lost) {
// Case 1 / 4
throw new MissingReplicationSlotError(
`Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
);
}
// We can have:
// needsInitialSync: true, needsNewSlot: true -> initial sync from scratch
// needsInitialSync: true, needsNewSlot: false -> resume initial sync
// needsInitialSync: false, needsNewSlot: true -> handled above
// needsInitialSync: false, needsNewSlot: false -> resume streaming replication
// Case 3 / 6
return {
needsInitialSync: !snapshotDone,
needsNewSlot: r.needsNewSlot
needsNewSlot: false
};
} else {
if (snapshotDone) {
// Case 5
// This will create a new slot, while keeping the current sync rules active
throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`);
}
// This will clear data and re-create the same slot
// Case 2
// This will clear data (if any) and re-create the same slot
return { needsInitialSync: true, needsNewSlot: true };
}
}

/**
* If a replication slot exists, check that it is healthy.
*/
private async checkReplicationSlot(slot: {
// postgres 13+
wal_status?: string;
// postgres 17+
invalidation_reason?: string | null;
}): Promise<{ needsNewSlot: boolean }> {
// Start with a placeholder error, should be replaced if there is an actual issue.
let last_error = new ReplicationAssertionError(`Slot health check failed to execute`);

const slotName = this.slot_name;

const lost = slot.wal_status == 'lost';
if (lost) {
this.logger.warn(
`Replication slot ${slotName} is invalidated. invalidation_reason: ${slot.invalidation_reason ?? 'unknown'}`
);
return {
needsNewSlot: true
};
}

// Check that replication slot exists, trying for up to 2 minutes.
const startAt = performance.now();
while (performance.now() - startAt < 120_000) {
this.touch();

try {
// We peek a large number of changes here, to make it more likely to pick up replication slot errors.
// For example, "publication does not exist" only occurs here if the peek actually includes changes related
// to the slot.
this.logger.info(`Checking ${slotName}`);

// The actual results can be quite large, so we don't actually return everything
// due to memory and processing overhead that would create.
const cursor = await this.connections.pool.stream({
statement: `SELECT 1 FROM pg_catalog.pg_logical_slot_peek_binary_changes($1, NULL, 1000, 'proto_version', '1', 'publication_names', $2)`,
params: [
{ type: 'varchar', value: slotName },
{ type: 'varchar', value: PUBLICATION_NAME }
]
});

for await (let _chunk of cursor) {
// No-op, just exhaust the cursor
}

// Success
this.logger.info(`Slot ${slotName} appears healthy`);
return { needsNewSlot: false };
} catch (e) {
last_error = e;
this.logger.warn(`Replication slot error`, e);

if (this.stopped) {
throw e;
}

if (
/incorrect prev-link/.test(e.message) ||
/replication slot.*does not exist/.test(e.message) ||
/publication.*does not exist/.test(e.message) ||
// Postgres 18 - exceeded max_slot_wal_keep_size
/can no longer access replication slot/.test(e.message) ||
// Postgres 17 - exceeded max_slot_wal_keep_size
/can no longer get changes from replication slot/.test(e.message)
) {
// Fatal error. In most cases since Postgres 13+, the `wal_status == 'lost'` check should pick this up, but this
// works as a fallback.

container.reporter.captureException(e, {
level: errors.ErrorSeverity.WARNING,
metadata: {
replication_slot: slotName
}
});
// Sample: record with incorrect prev-link 10000/10000 at 0/18AB778
// Seen during development. Some internal error, fixed by re-creating slot.
//
// Sample: publication "powersync" does not exist
// Happens when publication deleted or never created.
// Slot must be re-created in this case.
this.logger.info(`${slotName} is not valid anymore`);

return { needsNewSlot: true };
}
// Try again after a pause
await new Promise((resolve) => setTimeout(resolve, 1000));
}
}

container.reporter.captureException(last_error, {
level: errors.ErrorSeverity.ERROR,
metadata: {
replication_slot: slotName
}
});

throw last_error;
}

async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise<number> {
const results = await db.query({
statement: `SELECT reltuples::bigint AS estimate
Expand Down Expand Up @@ -915,6 +835,17 @@ WHERE oid = $1::regclass`,
}

/**
 * Run streaming replication, translating slot-invalidation failures into
 * MissingReplicationSlotError so the caller can recreate the slot under a
 * new sync rules version.
 */
async streamChanges(replicationConnection: pgwire.PgConnection) {
  try {
    await this.streamChangesInternal(replicationConnection);
  } catch (e: any) {
    // Preserve the original error as `cause` when wrapping.
    throw isReplicationSlotInvalidError(e) ? new MissingReplicationSlotError(e.message, e) : e;
  }
}

private async streamChangesInternal(replicationConnection: pgwire.PgConnection) {
// When changing any logic here, check /docs/wal-lsns.md.
const { createEmptyCheckpoints } = await this.ensureStorageCompatibility();

Expand Down Expand Up @@ -1179,3 +1110,27 @@ WHERE oid = $1::regclass`,
});
}
}

/**
 * Returns true if the error indicates that the replication slot (or its
 * publication) is permanently invalid and must be re-created.
 *
 * We could access the error code from pgwire using this:
 *   e[Symbol.for('pg.ErrorCode')]
 * However, we typically get a generic code such as 42704 (undefined_object), which does not
 * help much. So we check the actual error message.
 */
function isReplicationSlotInvalidError(e: any): boolean {
  // Null-safe: callers may theoretically surface a non-object throwable
  // (e.g. `throw undefined`); treat anything without a message as unrelated.
  const message: string = e?.message ?? '';

  // Sample: record with incorrect prev-link 10000/10000 at 0/18AB778
  // Seen during development. Some internal error, fixed by re-creating slot.
  //
  // Sample: publication "powersync" does not exist
  // Happens when publication deleted or never created.
  // Slot must be re-created in this case.
  return (
    /incorrect prev-link/.test(message) ||
    /replication slot.*does not exist/.test(message) ||
    /publication.*does not exist/.test(message) ||
    // Postgres 18 - exceeded max_slot_wal_keep_size
    /can no longer access replication slot/.test(message) ||
    // Postgres 17 - exceeded max_slot_wal_keep_size
    /can no longer get changes from replication slot/.test(message)
  );
}
22 changes: 15 additions & 7 deletions modules/module-postgres/test/src/wal_stream.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ bucket_definitions:
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();
context.startStreaming();

const data = await context.getBucketData('global[]');

Expand All @@ -320,17 +320,25 @@ bucket_definitions:

await context.loadActiveSyncRules();

// Previously, the `replicateSnapshot` call picked up on this error.
// Now, we have removed that check, this only comes up when we start actually streaming.
// We don't get the streaming response directly here, but getCheckpoint() checks for that.
await context.replicateSnapshot();
context.startStreaming();

if (serverVersion!.compareMain('18.0.0') >= 0) {
await context.replicateSnapshot();
// No error expected in Postgres 18. Replication keeps on working despite the
// publication being re-created.
await context.getCheckpoint();
} else {
// await context.getCheckpoint();
// Postgres < 18 invalidates the replication slot when the publication is re-created.
// The error is handled on a higher level: in the service, WalStreamReplicationJob
// handles it by creating a new replication slot.
await expect(async () => {
await context.replicateSnapshot();
await context.getCheckpoint();
}).rejects.toThrowError(MissingReplicationSlotError);
context.clearStreamError();
}
}
});
Expand All @@ -352,7 +360,7 @@ bucket_definitions:
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();
context.startStreaming();

const data = await context.getBucketData('global[]');

Expand Down Expand Up @@ -415,7 +423,7 @@ bucket_definitions:
`INSERT INTO test_data(id, description) VALUES('8133cd37-903b-4937-a022-7c8294015a3a', 'test1') returning id as test_id`
);
await context.replicateSnapshot();
await context.startStreaming();
context.startStreaming();

const data = await context.getBucketData('global[]');

Expand Down Expand Up @@ -572,7 +580,7 @@ config:
);

await context.replicateSnapshot();
await context.startStreaming();
context.startStreaming();

await pool.query(`UPDATE test_data SET description = 'test2' WHERE id = '${test_id}'`);

Expand Down
27 changes: 23 additions & 4 deletions modules/module-postgres/test/src/wal_stream_utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,31 @@ export class WalStreamTestContext implements AsyncDisposable {
await this.dispose();
}

/**
 * Clear any errors from startStreaming, to allow for a graceful dispose when streaming errors
 * were expected.
 */
async clearStreamError() {
  if (this.streamPromise != null) {
    // Intentionally swallow the rejection — the test has already asserted on it.
    this.streamPromise = this.streamPromise.catch(() => {});
  }
}

/**
 * Abort replication and tear down all resources owned by this context.
 *
 * Awaits any in-flight snapshot/streaming work before closing the
 * connection manager and disposing the storage factory.
 */
async dispose() {
  this.abortController.abort();
  try {
    await this.snapshotPromise;
    await this.streamPromise;
    await this.connectionManager.destroy();
    await this.factory?.[Symbol.asyncDispose]();
  } catch (e) {
    // Throwing here may result in SuppressedError. The underlying errors often don't show up
    // in the test output, so we log it here.
    // If we could get vitest to log SuppressedError.error and SuppressedError.suppressed, we
    // could remove this.
    // NOTE(review): if snapshotPromise/streamPromise rejects, connectionManager.destroy()
    // and the factory disposal are skipped — confirm this is acceptable for tests, or call
    // clearStreamError() before dispose when a streaming error is expected.
    console.error('Error during WalStreamTestContext dispose', e);
    throw e;
  }
}

get pool() {
Expand Down