From b1344e742375a47c73e678afd8b002a2f9655d4a Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Mon, 20 Jan 2025 15:52:20 +0200 Subject: [PATCH 01/20] cleanup persisted queries --- .../storage/batch/PostgresPersistedBatch.ts | 120 ++++++------------ 1 file changed, 39 insertions(+), 81 deletions(-) diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts index 8dcb7eb0d..759a2b909 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts @@ -256,33 +256,6 @@ export class PostgresPersistedBatch { protected async flushBucketData(db: lib_postgres.WrappedConnection) { if (this.bucketDataInserts.length > 0) { await db.sql` - WITH - parsed_data AS ( - SELECT - group_id, - bucket_name, - source_table, - decode(source_key, 'hex') AS source_key, -- Decode hex to bytea - table_name, - op, - row_id, - checksum, - data, - target_op - FROM - jsonb_to_recordset(${{ type: 'jsonb', value: this.bucketDataInserts }}::jsonb) AS t ( - group_id integer, - bucket_name text, - source_table text, - source_key text, -- Input as hex string - table_name text, - op text, - row_id text, - checksum bigint, - data text, - target_op bigint - ) - ) INSERT INTO bucket_data ( group_id, @@ -303,14 +276,25 @@ export class PostgresPersistedBatch { nextval('op_id_sequence'), op, source_table, - source_key, -- Already decoded + decode(source_key, 'hex') AS source_key, table_name, row_id, checksum, data, target_op FROM - parsed_data; + json_to_recordset(${{ type: 'json', value: this.bucketDataInserts }}::json) AS t ( + group_id integer, + bucket_name text, + source_table text, + source_key text, -- Input as hex string + table_name text, + op text, + row_id text, + checksum bigint, + data text, + target_op bigint + ); `.execute(); } } @@ -318,23 +302,6 @@ export class PostgresPersistedBatch { protected async flushParameterData(db: lib_postgres.WrappedConnection) { if (this.parameterDataInserts.length > 0) { await db.sql` - WITH - parsed_data AS ( - SELECT - group_id, - source_table, - decode(source_key, 'hex') AS source_key, -- Decode hex to bytea - decode(lookup, 'hex') AS lookup, -- Decode hex to bytea - bucket_parameters - FROM - jsonb_to_recordset(${{ type: 'jsonb', value: this.parameterDataInserts }}::jsonb) AS t ( - group_id integer, - source_table text, - source_key text, -- Input as hex string - lookup text, -- Input as hex string - bucket_parameters text -- Input as stringified JSON - ) - ) INSERT INTO bucket_parameters ( group_id, @@ -346,11 +313,17 @@ export class PostgresPersistedBatch { SELECT group_id, source_table, - source_key, -- Already decoded - lookup, -- Already decoded + decode(source_key, 'hex') AS source_key, -- Decode hex to bytea + decode(lookup, 'hex') AS lookup, -- Decode hex to bytea bucket_parameters FROM - parsed_data; + json_to_recordset(${{ type: 'json', value: this.parameterDataInserts }}::json) AS t ( + group_id integer, + source_table text, + source_key text, -- Input as hex string + lookup text, -- Input as hex string + bucket_parameters text -- Input as stringified JSON + ) `.execute(); } } @@ -358,33 +331,6 @@ export class PostgresPersistedBatch { protected async flushCurrentData(db: lib_postgres.WrappedConnection) { if (this.currentDataInserts.size > 0) { await db.sql` - WITH - parsed_data AS ( - SELECT - group_id, - source_table, - decode(source_key, 'hex') AS source_key, -- Decode hex to bytea - buckets::jsonb AS buckets, - decode(data, 'hex') AS data, -- Decode hex to bytea - ARRAY( - SELECT - decode((value ->> 0)::TEXT, 'hex') - FROM - jsonb_array_elements(lookups::jsonb) AS value - ) AS lookups -- Decode array of hex strings to bytea[] - FROM - jsonb_to_recordset(${{ - type: 'jsonb', - value: Array.from(this.currentDataInserts.values()) - }}::jsonb) AS t ( - group_id integer, - source_table text, - source_key text, -- Input as hex string - buckets text, - data text, -- Input as hex string - lookups text -- Input as stringified JSONB array of hex strings - ) - ) INSERT INTO current_data ( group_id, @@ -397,12 +343,24 @@ export class PostgresPersistedBatch { SELECT group_id, source_table, - source_key, -- Already decoded - buckets, - data, -- Already decoded - lookups -- Already decoded + decode(source_key, 'hex') AS source_key, -- Decode hex to bytea + buckets::jsonb AS buckets, + decode(data, 'hex') AS data, -- Decode hex to bytea + array( + SELECT + decode(element, 'hex') + FROM + unnest(lookups) AS element + ) AS lookups FROM - parsed_data + json_to_recordset(${{ type: 'json', value: Array.from(this.currentDataInserts.values()) }}::json) AS t ( + group_id integer, + source_table text, + source_key text, -- Input as hex string + buckets text, + data text, -- Input as hex string + lookups TEXT[] -- Input as stringified JSONB array of hex strings + ) ON CONFLICT (group_id, source_table, source_key) DO UPDATE SET buckets = EXCLUDED.buckets, From 7cea06baaee5420ae53deaf21c6e9e46ce84f86f Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 22 Jan 2025 09:36:29 +0200 Subject: [PATCH 02/20] minor cleanup --- .../src/storage/batch/PostgresBucketBatch.ts | 30 ++++++++++--------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index d7596b2bc..e483529e4 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -479,7 +479,7 @@ export class PostgresBucketBatch jsonb_array_elements(${{ type: 'jsonb', value: sizeLookups }}::jsonb) AS FILTER ) SELECT - pg_column_size(c.data) AS data_size, + octet_length(c.data) AS data_size, c.source_table, c.source_key FROM @@ -520,23 +520,20 @@ export class PostgresBucketBatch const current_data_lookup = new Map(); for await (const currentDataRows of db.streamRows({ statement: /* sql */ ` - WITH - filter_data AS ( - SELECT - decode(FILTER ->> 'source_key', 'hex') AS source_key, -- Decoding from hex to bytea - (FILTER ->> 'source_table') AS source_table_id - FROM - jsonb_array_elements($1::jsonb) AS FILTER - ) SELECT - --- With skipExistingRows, we only need to know whether or not the row exists. ${this.options.skip_existing_rows ? `c.source_table, c.source_key` : 'c.*'} FROM current_data c - JOIN filter_data f ON c.source_table = f.source_table_id + JOIN ( + SELECT + decode(FILTER ->> 'source_key', 'hex') AS source_key, + FILTER ->> 'source_table' AS source_table_id + FROM + jsonb_array_elements($1::jsonb) AS FILTER + ) f ON c.source_table = f.source_table_id AND c.source_key = f.source_key WHERE - c.group_id = $2 + c.group_id = $2; `, params: [ { @@ -544,7 +541,7 @@ export class PostgresBucketBatch value: lookups }, { - type: 'int8', + type: 'int4', value: this.group_id } ] @@ -601,7 +598,12 @@ export class PostgresBucketBatch await persistedBatch.flush(db); } } - return resumeBatch; + + // Don't return empty batches + if (resumeBatch?.batch.length) { + return resumeBatch; + } + return null; } protected async saveOperation( From 5aa55e9f8f23084ad8948efe233a5b2fd369f7d6 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 28 Jan 2025 13:48:58 +0200 Subject: [PATCH 03/20] Add support for same postgres cluster for replication and sync storage --- .../src/storage/MongoBucketStorage.ts | 17 +++ .../storage/PostgresBucketStorageFactory.ts | 18 ++++ modules/module-postgres/package.json | 14 +-- .../src/replication/PgManager.ts | 10 ++ .../src/replication/WalStream.ts | 102 ++++++++++++++++-- .../replication/WalStreamReplicationJob.ts | 4 +- .../service-core/src/storage/BucketStorage.ts | 19 ++++ pnpm-lock.yaml | 7 ++ 8 files changed, 174 insertions(+), 17 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 87f577eca..335192794 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -84,6 +84,23 @@ export class MongoBucketStorage return storage; } + async getSystemIdentifier(): Promise { + let id = 'N/A'; + try { + const result = await this.client + .db('local') + .collection('system.replset') + .findOne({}, { projection: { _id: 1 } }); + id = result?._id.toString() ?? id; + } catch (ex) { + logger.warn(`Could not query MongoDB replica set id`, ex); + } + return { + id, + type: lib_mongo.MONGO_CONNECTION_TYPE + }; + } + async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) { const next = await this.getNextSyncRulesContent(); const active = await this.getActiveSyncRulesContent(); diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index e2e6c4540..a470ecdde 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -157,6 +157,24 @@ export class PostgresBucketStorageFactory return newInstanceRow!.id; } + async getSystemIdentifier(): Promise { + /** + * No special permissions should be required to read this. + */ + const result = await this.db.sql` + SELECT + system_identifier + FROM + pg_control_system(); + `.first<{ system_identifier: bigint }>(); + const id = result!.system_identifier.toString(); + + return { + id, + type: lib_postgres.POSTGRES_CONNECTION_TYPE + }; + } + // TODO possibly share implementation in abstract class async configureSyncRules( sync_rules: string, diff --git a/modules/module-postgres/package.json b/modules/module-postgres/package.json index 935323c0f..424efc21e 100644 --- a/modules/module-postgres/package.json +++ b/modules/module-postgres/package.json @@ -28,23 +28,25 @@ } }, "dependencies": { - "@powersync/lib-services-framework": "workspace:*", "@powersync/lib-service-postgres": "workspace:*", + "@powersync/lib-services-framework": "workspace:*", "@powersync/service-core": "workspace:*", "@powersync/service-jpgwire": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", - "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87", "jose": "^4.15.1", + "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87", + "semver": "^7.5.4", "ts-codec": "^1.3.0", - "uuid": "^9.0.1", - "uri-js": "^4.4.1" + "uri-js": "^4.4.1", + "uuid": "^9.0.1" }, "devDependencies": { - "@types/uuid": "^9.0.4", "@powersync/service-core-tests": "workspace:*", "@powersync/service-module-mongodb-storage": "workspace:*", - "@powersync/service-module-postgres-storage": "workspace:*" + "@powersync/service-module-postgres-storage": "workspace:*", + "@types/semver": "^7.5.4", + "@types/uuid": "^9.0.4" } } diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts index ad1ab899d..03a3db4fa 100644 --- a/modules/module-postgres/src/replication/PgManager.ts +++ b/modules/module-postgres/src/replication/PgManager.ts @@ -1,4 +1,5 @@ import * as pgwire from '@powersync/service-jpgwire'; +import semver from 'semver'; import { NormalizedPostgresConnectionConfig } from '../types/types.js'; /** @@ -35,6 +36,15 @@ export class PgManager { return await p; } + /** + * @returns The Postgres server version in a parsed Semver instance + */ + async getServerVersion(): Promise { + const result = await this.pool.query(`SHOW server_version;`); + // The result is usually of the form "16.2 (Debian 16.2-1.pgdg120+2)" + return semver.coerce(result.rows[0][0].split(' ')[0]); + } + /** * Create a new standard connection, used for initial snapshot. * diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 92b8a2031..24b97fa50 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -13,6 +13,9 @@ export const ZERO_LSN = '00000000/00000000'; export const PUBLICATION_NAME = 'powersync'; export const POSTGRES_DEFAULT_SCHEMA = 'public'; +export const KEEPALIVE_CONTENT = 'ping'; +export const KEEPALIVE_BUFFER = Buffer.from(KEEPALIVE_CONTENT); + export interface WalStreamOptions { connections: PgManager; storage: storage.SyncRulesBucketStorage; @@ -26,6 +29,22 @@ interface InitResult { needsNewSlot: boolean; } +export const isKeepAliveMessage = (msg: pgwire.PgoutputMessage) => { + return ( + msg.tag == 'message' && + msg.prefix == 'powersync' && + msg.content && + Buffer.from(msg.content).equals(KEEPALIVE_BUFFER) + ); +}; + +export const sendKeepAlive = async (db: pgwire.PgClient) => { + await lib_postgres.retriedQuery(db, { + statement: `SELECT * FROM pg_logical_emit_message(false, 'powersync', $1)`, + params: [{ type: 'varchar', value: KEEPALIVE_CONTENT }] + }); +}; + export class MissingReplicationSlotError extends Error { constructor(message: string) { super(message); @@ -65,10 +84,7 @@ export class WalStream { // Ping to speed up cancellation of streaming replication // We're not using pg_snapshot here, since it could be in the middle of // an initial replication transaction. - const promise = lib_postgres.retriedQuery( - this.connections.pool, - `SELECT * FROM pg_logical_emit_message(false, 'powersync', 'ping')` - ); + const promise = sendKeepAlive(this.connections.pool); promise.catch((e) => { // Failures here are okay - this only speeds up stopping the process. logger.warn('Failed to ping connection', e); @@ -376,6 +392,15 @@ WHERE oid = $1::regclass`, await batch.commit(ZERO_LSN); } ); + /** + * Send a keepalive message after initial replication. + * In some edge cases we wait for a keepalive after the initial snapshot. + * If we don't explicitly check the contents of keepalive messages then a keepalive is detected + * rather quickly after initial replication - perhaps due to other WAL events. + * If we do explicitly check the contents of messages, we need an actual keepalive payload in order + * to advance the active sync rules LSN. + */ + await sendKeepAlive(db); } static *getQueryData(results: Iterable): Generator { @@ -592,13 +617,50 @@ WHERE oid = $1::regclass`, async streamChanges(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. + const replicationOptions: Record = { + proto_version: '1', + publication_names: PUBLICATION_NAME + }; + + /** + * Viewing the contents of logical messages emitted with `pg_logical_emit_message` + * is only supported on Postgres >= 14.0. + * https://www.postgresql.org/docs/14/protocol-logical-replication.html + */ + const version = await this.connections.getServerVersion(); + const exposesLogicalMessages = version ? version.compareMain('14.0.0') >= 0 : false; + + if (exposesLogicalMessages) { + /** + * Only add this option if the Postgres server supports it. + * Adding the option to a server that doesn't support it will throw an exception when starting logical replication. + * Error: `unrecognized pgoutput option: messages` + */ + replicationOptions['messages'] = 'true'; + } else { + const storageIdentifier = await this.storage.factory.getSystemIdentifier(); + if (storageIdentifier.type == lib_postgres.POSTGRES_CONNECTION_TYPE) { + /** + * Check if the same cluster is being used for both the sync bucket storage and the logical replication. + */ + const replicationIdentifierResult = pgwire.pgwireRows( + await lib_postgres.retriedQuery(this.connections.pool, `SELECT system_identifier FROM pg_control_system();`) + ) as Array<{ system_identifier: bigint }>; + + const replicationIdentifier = replicationIdentifierResult[0].system_identifier.toString(); + if (replicationIdentifier == storageIdentifier.id) { + throw new Error( + `Separate Postgres clusters are required for the replication source and sync bucket storage when using .` + ); + } + } + } + const replicationStream = replicationConnection.logicalReplication({ slot: this.slot_name, - options: { - proto_version: '1', - publication_names: PUBLICATION_NAME - } + options: replicationOptions }); + this.startedStreaming = true; // Auto-activate as soon as initial replication is done @@ -621,6 +683,15 @@ WHERE oid = $1::regclass`, // chunkLastLsn may come from normal messages in the chunk, // or from a PrimaryKeepalive message. const { messages, lastLsn: chunkLastLsn } = chunk; + + /** + * We can check if an explicit keepalive was sent if `exposesLogicalMessages == true`. + * If we can't check the logical messages, we should assume a keepalive if we + * receive an empty array of messages in a replication event. + */ + const assumeKeepalive = !exposesLogicalMessages; + let keepaliveDetected = false; + for (const msg of messages) { if (msg.tag == 'relation') { await this.handleRelation(batch, getPgOutputRelation(msg), true); @@ -636,12 +707,25 @@ WHERE oid = $1::regclass`, logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`); } + /** + * If we can see the contents of logical messages, then we can check if a keepalive + * message is present. We only perform a keepalive (below) if we explicitly detect a keepalive message. + * If we can't see the contents of logical messages, then we should assume a keepalive is required + * due to the default value of `assumeKeepalive`. + */ + if (exposesLogicalMessages && isKeepAliveMessage(msg)) { + keepaliveDetected = true; + } + count += 1; await this.writeChange(batch, msg); } } - if (!inTx) { + if (!inTx && (assumeKeepalive || keepaliveDetected)) { + // Reset the detection flag. + keepaliveDetected = false; + // In a transaction, we ack and commit according to the transaction progress. // Outside transactions, we use the PrimaryKeepalive messages to advance progress. // Big caveat: This _must not_ be used to skip individual messages, since this LSN diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index 40247452a..e7db0e820 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -1,6 +1,6 @@ import { container } from '@powersync/lib-services-framework'; import { PgManager } from './PgManager.js'; -import { MissingReplicationSlotError, WalStream } from './WalStream.js'; +import { MissingReplicationSlotError, sendKeepAlive, WalStream } from './WalStream.js'; import { replication } from '@powersync/service-core'; import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; @@ -37,7 +37,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob */ async keepAlive() { try { - await this.connectionManager.pool.query(`SELECT * FROM pg_logical_emit_message(false, 'powersync', 'ping')`); + await sendKeepAlive(this.connectionManager.pool); } catch (e) { this.logger.warn(`KeepAlive failed, unable to post to WAL`, e); } diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index ff7b653f5..02ec212ee 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -60,6 +60,20 @@ export interface BucketStorageFactoryListener extends DisposableListener { replicationEvent: (event: ReplicationEventPayload) => void; } +export interface BucketStorageSystemIdentifier { + /** + * A unique identifier for the system used for storage. + * For Postgres this can be the cluster `system_identifier`. + * For MongoDB this can be the replica set ID. + */ + id: string; + /** + * A unique type for the storage implementation. + * e.g. `mongodb`, `postgresql`. + */ + type: string; +} + export interface BucketStorageFactory extends AsyncDisposableObserverClient { /** * Update sync rules from configuration, if changed. @@ -143,6 +157,11 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient; + + /** + * Get a unique identifier for the system used for storage. + */ + getSystemIdentifier(): Promise; } export interface ReplicationCheckpoint { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 63a93834a..446912373 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -328,6 +328,9 @@ importers: pgwire: specifier: github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87 version: https://codeload.github.com/kagis/pgwire/tar.gz/f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87 + semver: + specifier: ^7.5.4 + version: 7.6.2 ts-codec: specifier: ^1.3.0 version: 1.3.0 @@ -347,6 +350,9 @@ importers: '@powersync/service-module-postgres-storage': specifier: workspace:* version: link:../module-postgres-storage + '@types/semver': + specifier: ^7.5.4 + version: 7.5.8 '@types/uuid': specifier: ^9.0.4 version: 9.0.8 @@ -1626,6 +1632,7 @@ packages: acorn-import-assertions@1.9.0: resolution: {integrity: sha512-cmMwop9x+8KFhxvKrKfPYmN6/pKTYYHBqLa0DfvVZcKMJWNyWLnaqND7dx/qn66R7ewM1UX5XMaDVP5wlVTaVA==} + deprecated: package has been renamed to acorn-import-attributes peerDependencies: acorn: ^8 From b3894c9f25818ad23294d1c2a919a70b22cd75bb Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 28 Jan 2025 14:24:59 +0200 Subject: [PATCH 04/20] add error code --- modules/module-postgres/src/replication/WalStream.ts | 8 ++++++-- packages/service-errors/src/codes.ts | 10 ++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index f30970446..d4747601b 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,6 +1,8 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import { container, + DatabaseConnectionError, + ErrorCode, errors, logger, ReplicationAbortedError, @@ -656,8 +658,10 @@ WHERE oid = $1::regclass`, const replicationIdentifier = replicationIdentifierResult[0].system_identifier.toString(); if (replicationIdentifier == storageIdentifier.id) { - throw new Error( - `Separate Postgres clusters are required for the replication source and sync bucket storage when using .` + throw new DatabaseConnectionError( + ErrorCode.PSYNC_S1144, + `Separate Postgres clusters are required for the replication source and sync bucket storage when using Postgres version ${version}.`, + null ); } } diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index 18901d51b..b3dfb1f1a 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -158,6 +158,16 @@ export enum ErrorCode { */ PSYNC_S1143 = 'PSYNC_S1143', + /** + * Invalid Postgres cluster configuration for replication and sync bucket storage. + * + * The same Postgres cluster, running an unsupported version of Postgres, has been configured for both replication and sync bucket storage. + * Using the same Postgres cluster is only supported on Postgres 14 and above. + * This error typically indicates that the Postgres version is below 14. + * Either upgrade the Postgres cluster to version 14 or above, or use a different Postgres cluster for sync bucket storage. + */ + PSYNC_S1144 = 'PSYNC_S1144', + // ## PSYNC_S12xx: MySQL replication issues // ## PSYNC_S13xx: MongoDB replication issues From 37e9228bc1c37fe9284162f5896830a261f25537 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 28 Jan 2025 14:32:57 +0200 Subject: [PATCH 05/20] add error cause --- modules/module-postgres/src/replication/WalStream.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index d4747601b..af6221c95 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -661,7 +661,7 @@ WHERE oid = $1::regclass`, throw new DatabaseConnectionError( ErrorCode.PSYNC_S1144, `Separate Postgres clusters are required for the replication source and sync bucket storage when using Postgres version ${version}.`, - null + new Error('Postgres version is below 14') ); } } From e63a4e1477a54ed90ca29d80155cb4703a457655 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Tue, 28 Jan 2025 18:43:20 +0200 Subject: [PATCH 06/20] update to use replica set name --- .../src/storage/MongoBucketStorage.ts | 18 +++++++++--------- .../src/replication/WalStream.ts | 17 +++++++++++++++-- 2 files changed, 24 insertions(+), 11 deletions(-) diff --git a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 49ea09bb0..ada8922eb 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -85,16 +85,16 @@ export class MongoBucketStorage } async getSystemIdentifier(): Promise { - let id = 'N/A'; - try { - const result = await this.client - .db('local') - .collection('system.replset') - .findOne({}, { projection: { _id: 1 } }); - id = result?._id.toString() ?? id; - } catch (ex) { - logger.warn(`Could not query MongoDB replica set id`, ex); + const { setName: id } = await this.db.db.command({ + hello: 1 + }); + if (id == null) { + throw new ServiceError( + ErrorCode.PSYNC_S1342, + 'Standalone MongoDB instances are not supported - use a replicaset.' + ); } + return { id, type: lib_mongo.MONGO_CONNECTION_TYPE diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index af6221c95..1124ee669 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -48,7 +48,12 @@ export const isKeepAliveMessage = (msg: pgwire.PgoutputMessage) => { export const sendKeepAlive = async (db: pgwire.PgClient) => { await lib_postgres.retriedQuery(db, { - statement: `SELECT * FROM pg_logical_emit_message(false, 'powersync', $1)`, + statement: /* sql */ ` + SELECT + * + FROM + pg_logical_emit_message(FALSE, 'powersync', $1) + `, params: [{ type: 'varchar', value: KEEPALIVE_CONTENT }] }); }; @@ -653,7 +658,15 @@ WHERE oid = $1::regclass`, * Check if the same cluster is being used for both the sync bucket storage and the logical replication. */ const replicationIdentifierResult = pgwire.pgwireRows( - await lib_postgres.retriedQuery(this.connections.pool, `SELECT system_identifier FROM pg_control_system();`) + await lib_postgres.retriedQuery( + this.connections.pool, + /* sql */ ` + SELECT + system_identifier + FROM + pg_control_system(); + ` + ) ) as Array<{ system_identifier: bigint }>; const replicationIdentifier = replicationIdentifierResult[0].system_identifier.toString(); From 4f66e95c2ac8f451e28d65dd49df7e40e41bb1cc Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 09:41:40 +0200 Subject: [PATCH 07/20] add unit test for streaming errors --- .../test/src/storage_combination.test.ts | 41 +++++++++++++++++++ .../test/src/wal_stream_utils.ts | 9 ++++ 2 files changed, 50 insertions(+) create mode 100644 modules/module-postgres/test/src/storage_combination.test.ts diff --git a/modules/module-postgres/test/src/storage_combination.test.ts b/modules/module-postgres/test/src/storage_combination.test.ts new file mode 100644 index 000000000..27baa54f1 --- /dev/null +++ b/modules/module-postgres/test/src/storage_combination.test.ts @@ -0,0 +1,41 @@ +import * as postgres_storage from '@powersync/service-module-postgres-storage'; +import { describe, expect, test } from 'vitest'; +import { env } from './env.js'; +import { WalStreamTestContext } from './wal_stream_utils.js'; + +describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - postgres', function () { + test('should allow the same Postgres cluster to be used for data and storage', async () => { + // Use the same cluster for the storage as the data source + await using context = await WalStreamTestContext.open( + postgres_storage.PostgresTestStorageFactoryGenerator({ + url: env.PG_TEST_URL + }), + { doNotClear: false } + ); + + await context.updateSyncRules(/* yaml */ + ` bucket_definitions: + global: + data: + - SELECT * FROM "test_data" `); + + const { pool, connectionManager } = context; + + const sourceVersion = await connectionManager.getServerVersion(); + + await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`); + + await context.replicateSnapshot(); + + // Perhaps we should check and throw when replicating the snapshot or somewhere else earlier. + context.startStreaming(); + + if (sourceVersion!.compareMain('14.0.0') < 0) { + console.log('waiting for throw'); + await expect(context.waitForStream()).rejects.toThrow(); + console.log('done with throw'); + } else { + await expect(context.waitForStream()).resolves.toReturn(); + } + }); +}); diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index f25d6d083..cea951341 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -41,6 +41,15 @@ export class WalStreamTestContext implements AsyncDisposable { await this.dispose(); } + async waitForStream() { + try { + await this.streamPromise; + } catch (ex) { + this.streamPromise = undefined; + throw ex; + } + } + async dispose() { this.abortController.abort(); await this.streamPromise; From 8cda6ff975ff0a0cd347b87067fb46b41b0dd275 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 09:55:01 +0200 Subject: [PATCH 08/20] throw an error on initial replication if storage is incompatible --- .../src/replication/WalStream.ts | 80 ++++++++++++------- .../test/src/storage_combination.test.ts | 11 +-- 2 files changed, 53 insertions(+), 38 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 1124ee669..c437fe0ab 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -203,6 +203,7 @@ export class WalStream { async initSlot(): Promise { await checkSourceConfiguration(this.connections.pool, PUBLICATION_NAME); + await this.ensureStorageCompatibility(); const slotName = this.slot_name; @@ -641,9 +642,9 @@ WHERE oid = $1::regclass`, * is only supported on Postgres >= 14.0. * https://www.postgresql.org/docs/14/protocol-logical-replication.html */ - const version = await this.connections.getServerVersion(); - const exposesLogicalMessages = version ? version.compareMain('14.0.0') >= 0 : false; + await this.ensureStorageCompatibility(); + const exposesLogicalMessages = await this.checkLogicalMessageSupport(); if (exposesLogicalMessages) { /** * Only add this option if the Postgres server supports it. @@ -651,33 +652,6 @@ WHERE oid = $1::regclass`, * Error: `unrecognized pgoutput option: messages` */ replicationOptions['messages'] = 'true'; - } else { - const storageIdentifier = await this.storage.factory.getSystemIdentifier(); - if (storageIdentifier.type == lib_postgres.POSTGRES_CONNECTION_TYPE) { - /** - * Check if the same cluster is being used for both the sync bucket storage and the logical replication. - */ - const replicationIdentifierResult = pgwire.pgwireRows( - await lib_postgres.retriedQuery( - this.connections.pool, - /* sql */ ` - SELECT - system_identifier - FROM - pg_control_system(); - ` - ) - ) as Array<{ system_identifier: bigint }>; - - const replicationIdentifier = replicationIdentifierResult[0].system_identifier.toString(); - if (replicationIdentifier == storageIdentifier.id) { - throw new DatabaseConnectionError( - ErrorCode.PSYNC_S1144, - `Separate Postgres clusters are required for the replication source and sync bucket storage when using Postgres version ${version}.`, - new Error('Postgres version is below 14') - ); - } - } } const replicationStream = replicationConnection.logicalReplication({ @@ -773,6 +747,54 @@ WHERE oid = $1::regclass`, replicationStream.ack(lsn); } + + /** + * Ensures that the storage is compatible with the replication connection. + * @throws {DatabaseConnectionError} If the storage is not compatible with the replication connection. + */ + protected async ensureStorageCompatibility() { + const supportsLogicalMessages = await this.checkLogicalMessageSupport(); + + if (supportsLogicalMessages) { + return; + } + + const storageIdentifier = await this.storage.factory.getSystemIdentifier(); + if (storageIdentifier.type == lib_postgres.POSTGRES_CONNECTION_TYPE) { + /** + * Check if the same cluster is being used for both the sync bucket storage and the logical replication. + */ + const replicationIdentifierResult = pgwire.pgwireRows( + await lib_postgres.retriedQuery( + this.connections.pool, + /* sql */ ` + SELECT + system_identifier + FROM + pg_control_system(); + ` + ) + ) as Array<{ system_identifier: bigint }>; + + const replicationIdentifier = replicationIdentifierResult[0].system_identifier.toString(); + if (replicationIdentifier == storageIdentifier.id) { + throw new DatabaseConnectionError( + ErrorCode.PSYNC_S1144, + `Separate Postgres clusters are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, + new Error('Postgres version is below 14') + ); + } + } + } + + /** + * Check if the replication connection Postgres server supports + * viewing the contents of logical replication messages. + */ + protected async checkLogicalMessageSupport() { + const version = await this.connections.getServerVersion(); + return version ? version.compareMain('14.0.0') >= 0 : false; + } } async function touch() { diff --git a/modules/module-postgres/test/src/storage_combination.test.ts b/modules/module-postgres/test/src/storage_combination.test.ts index 27baa54f1..2fbfe3927 100644 --- a/modules/module-postgres/test/src/storage_combination.test.ts +++ b/modules/module-postgres/test/src/storage_combination.test.ts @@ -25,17 +25,10 @@ describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - p await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`); - await context.replicateSnapshot(); - - // Perhaps we should check and throw when replicating the snapshot or somewhere else earlier. - context.startStreaming(); - if (sourceVersion!.compareMain('14.0.0') < 0) { - console.log('waiting for throw'); - await expect(context.waitForStream()).rejects.toThrow(); - console.log('done with throw'); + await expect(context.replicateSnapshot).rejects.toThrow(); } else { - await expect(context.waitForStream()).resolves.toReturn(); + await expect(context.replicateSnapshot).resolves.toReturn(); } }); }); From cc2764905d9536d4026e193a5753f6191bd37399 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 10:09:39 +0200 Subject: [PATCH 09/20] update readme --- modules/module-postgres-storage/README.md | 8 +++++++- .../module-postgres/test/src/storage_combination.test.ts | 4 ++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/modules/module-postgres-storage/README.md b/modules/module-postgres-storage/README.md index f53bc947e..c12043733 100644 --- a/modules/module-postgres-storage/README.md +++ b/modules/module-postgres-storage/README.md @@ -31,7 +31,13 @@ storage: ``` **IMPORTANT**: -A separate Postgres server is currently required for replication connections (if using Postgres for replication) and storage. Using the same server might cause unexpected results. + +A separate Postgres server is required for replication connections **if using PostgreSQL versions below 14**. + +| PostgreSQL Version | Server configuration | +| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------- | +| Below 14 | Separate servers are required for the source and sync bucket storage. Using the same server on older versions might cause unexpected results. | +| 14 and above | The same server can be used for the source and sync bucket storage. Separate servers are also supported. | ### Connection credentials diff --git a/modules/module-postgres/test/src/storage_combination.test.ts b/modules/module-postgres/test/src/storage_combination.test.ts index 2fbfe3927..452b0c2ad 100644 --- a/modules/module-postgres/test/src/storage_combination.test.ts +++ b/modules/module-postgres/test/src/storage_combination.test.ts @@ -26,9 +26,9 @@ describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - p await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`); if (sourceVersion!.compareMain('14.0.0') < 0) { - await expect(context.replicateSnapshot).rejects.toThrow(); + await expect(context.replicateSnapshot()).rejects.toThrow(); } else { - await expect(context.replicateSnapshot).resolves.toReturn(); + await expect(context.replicateSnapshot()).resolves.toReturn(); } }); }); From bfe54fb95123a2c84c1817bcf559991743236228 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 10:48:06 +0200 Subject: [PATCH 10/20] fix test --- modules/module-postgres/test/src/storage_combination.test.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/modules/module-postgres/test/src/storage_combination.test.ts b/modules/module-postgres/test/src/storage_combination.test.ts index 452b0c2ad..1183d6806 100644 --- a/modules/module-postgres/test/src/storage_combination.test.ts +++ b/modules/module-postgres/test/src/storage_combination.test.ts @@ -28,7 +28,8 @@ describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - p if (sourceVersion!.compareMain('14.0.0') < 0) { await expect(context.replicateSnapshot()).rejects.toThrow(); } else { - await expect(context.replicateSnapshot()).resolves.toReturn(); + // Should resolve + await context.replicateSnapshot(); } }); }); From e9c5f3d38eb1a4179f8763118c967f043abdfed2 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 11:46:44 +0200 Subject: [PATCH 11/20] code cleanup --- modules/module-postgres-storage/README.md | 8 ++++---- modules/module-postgres/src/replication/WalStream.ts | 10 +++++----- modules/module-postgres/test/src/wal_stream_utils.ts | 9 --------- packages/service-core/src/storage/BucketStorage.ts | 2 +- 4 files changed, 10 insertions(+), 19 deletions(-) diff --git a/modules/module-postgres-storage/README.md b/modules/module-postgres-storage/README.md index c12043733..96b10df25 100644 --- a/modules/module-postgres-storage/README.md +++ b/modules/module-postgres-storage/README.md @@ -34,10 +34,10 @@ storage: A separate Postgres server is required for replication connections **if using PostgreSQL versions below 14**. -| PostgreSQL Version | Server configuration | -| ------------------ | --------------------------------------------------------------------------------------------------------------------------------------------- | -| Below 14 | Separate servers are required for the source and sync bucket storage. Using the same server on older versions might cause unexpected results. | -| 14 and above | The same server can be used for the source and sync bucket storage. Separate servers are also supported. | +| PostgreSQL Version | Server configuration | +| ------------------ | --------------------------------------------------------------------------------------------------------------------------------- | +| Below 14 | Separate servers are required for the source and sync bucket storage. Replication will be blocked if the same server is detected. | +| 14 and above | The same server can be used for the source and sync bucket storage. Separate servers are also supported. | ### Connection credentials diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index c437fe0ab..6c0f48c0c 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -632,11 +632,6 @@ WHERE oid = $1::regclass`, async streamChanges(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. - const replicationOptions: Record = { - proto_version: '1', - publication_names: PUBLICATION_NAME - }; - /** * Viewing the contents of logical messages emitted with `pg_logical_emit_message` * is only supported on Postgres >= 14.0. @@ -644,6 +639,11 @@ WHERE oid = $1::regclass`, */ await this.ensureStorageCompatibility(); + const replicationOptions: Record = { + proto_version: '1', + publication_names: PUBLICATION_NAME + }; + const exposesLogicalMessages = await this.checkLogicalMessageSupport(); if (exposesLogicalMessages) { /** diff --git a/modules/module-postgres/test/src/wal_stream_utils.ts b/modules/module-postgres/test/src/wal_stream_utils.ts index cea951341..f25d6d083 100644 --- a/modules/module-postgres/test/src/wal_stream_utils.ts +++ b/modules/module-postgres/test/src/wal_stream_utils.ts @@ -41,15 +41,6 @@ export class WalStreamTestContext implements AsyncDisposable { await this.dispose(); } - async waitForStream() { - try { - await this.streamPromise; - } catch (ex) { - this.streamPromise = undefined; - throw ex; - } - } - async dispose() { this.abortController.abort(); await this.streamPromise; diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 02ec212ee..1a201503b 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -64,7 +64,7 @@ export interface BucketStorageSystemIdentifier { /** * A unique identifier for the system used for storage. * For Postgres this can be the cluster `system_identifier`. - * For MongoDB this can be the replica set ID. + * For MongoDB this can be the replica set name. */ id: string; /** From aad0fa13ebe670db3a45f902ee9a13529e4f459b Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 11:57:42 +0200 Subject: [PATCH 12/20] wording update --- modules/module-postgres/src/replication/WalStream.ts | 4 ++-- packages/service-errors/src/codes.ts | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 6c0f48c0c..60ef0d784 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -762,7 +762,7 @@ WHERE oid = $1::regclass`, const storageIdentifier = await this.storage.factory.getSystemIdentifier(); if (storageIdentifier.type == lib_postgres.POSTGRES_CONNECTION_TYPE) { /** - * Check if the same cluster is being used for both the sync bucket storage and the logical replication. + * Check if the same server is being used for both the sync bucket storage and the logical replication. */ const replicationIdentifierResult = pgwire.pgwireRows( await lib_postgres.retriedQuery( @@ -780,7 +780,7 @@ WHERE oid = $1::regclass`, if (replicationIdentifier == storageIdentifier.id) { throw new DatabaseConnectionError( ErrorCode.PSYNC_S1144, - `Separate Postgres clusters are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, + `Separate Postgres server are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, new Error('Postgres version is below 14') ); } diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index b3dfb1f1a..9458be92c 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -159,12 +159,12 @@ export enum ErrorCode { PSYNC_S1143 = 'PSYNC_S1143', /** - * Invalid Postgres cluster configuration for replication and sync bucket storage. + * Invalid Postgres server configuration for replication and sync bucket storage. * - * The same Postgres cluster, running an unsupported version of Postgres, has been configured for both replication and sync bucket storage. - * Using the same Postgres cluster is only supported on Postgres 14 and above. + * The same Postgres server, running an unsupported version of Postgres, has been configured for both replication and sync bucket storage. + * Using the same Postgres server is only supported on Postgres 14 and above. * This error typically indicates that the Postgres version is below 14. - * Either upgrade the Postgres cluster to version 14 or above, or use a different Postgres cluster for sync bucket storage. + * Either upgrade the Postgres server to version 14 or above, or use a different Postgres server for sync bucket storage. */ PSYNC_S1144 = 'PSYNC_S1144', From 031d7acd7d79a305e9e998dbc673b51413b884f0 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 13:38:37 +0200 Subject: [PATCH 13/20] cleanup --- modules/module-postgres-storage/README.md | 2 +- modules/module-postgres/src/replication/WalStream.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/modules/module-postgres-storage/README.md b/modules/module-postgres-storage/README.md index 96b10df25..8aab123df 100644 --- a/modules/module-postgres-storage/README.md +++ b/modules/module-postgres-storage/README.md @@ -32,7 +32,7 @@ storage: **IMPORTANT**: -A separate Postgres server is required for replication connections **if using PostgreSQL versions below 14**. +Separate Postgres servers are required for replication connections **if using PostgreSQL versions below 14**. | PostgreSQL Version | Server configuration | | ------------------ | --------------------------------------------------------------------------------------------------------------------------------- | diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 60ef0d784..56bc35afb 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -780,7 +780,7 @@ WHERE oid = $1::regclass`, if (replicationIdentifier == storageIdentifier.id) { throw new DatabaseConnectionError( ErrorCode.PSYNC_S1144, - `Separate Postgres server are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, + `Separate Postgres servers are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, new Error('Postgres version is below 14') ); } From c7329bb38806fcbdcde7dd7a1d90b659e25c89bf Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Wed, 29 Jan 2025 13:58:56 +0200 Subject: [PATCH 14/20] added changeset --- .changeset/pretty-countries-cover.md | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changeset/pretty-countries-cover.md diff --git a/.changeset/pretty-countries-cover.md b/.changeset/pretty-countries-cover.md new file mode 100644 index 000000000..7a660a19a --- /dev/null +++ b/.changeset/pretty-countries-cover.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-postgres': minor +--- + +Allowed using the same Postgres server for the replication source and sync bucket storage. This is only supported on Postgres versions newer than 14.0. From ea5c3f89ec06a394406a4ff9111e41727bbf824c Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 30 Jan 2025 09:42:49 +0200 Subject: [PATCH 15/20] fix notification bug --- .changeset/strong-tables-rescue.md | 5 ++++ .../src/db/connection/ConnectionSlot.ts | 27 +++++-------------- .../src/db/connection/DatabaseClient.ts | 6 +++-- 3 files changed, 15 insertions(+), 23 deletions(-) create mode 100644 .changeset/strong-tables-rescue.md diff --git a/.changeset/strong-tables-rescue.md b/.changeset/strong-tables-rescue.md new file mode 100644 index 000000000..b2a0c15a8 --- /dev/null +++ b/.changeset/strong-tables-rescue.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-postgres-storage': patch +--- + +Fix bug where listening to active checkpoint notifications on an ended connection could cause a crash. diff --git a/libs/lib-postgres/src/db/connection/ConnectionSlot.ts b/libs/lib-postgres/src/db/connection/ConnectionSlot.ts index b2b543b6c..ac04832e1 100644 --- a/libs/lib-postgres/src/db/connection/ConnectionSlot.ts +++ b/libs/lib-postgres/src/db/connection/ConnectionSlot.ts @@ -50,9 +50,12 @@ export class ConnectionSlot extends framework.DisposableObserver l.connectionCreated?.(connection)); - if (this.hasNotificationListener()) { - await this.configureConnectionNotifications(connection); - } + + /** + * Configure the Postgres connection to listen to notifications. + * Subscribing to notifications, even without a registered listener, should not add much overhead. + */ + await this.configureConnectionNotifications(connection); return connection; } @@ -64,11 +67,6 @@ export class ConnectionSlot extends framework.DisposableObserver): () => void { - const dispose = super.registerListener(listener); - if (this.connection && this.hasNotificationListener()) { - this.configureConnectionNotifications(this.connection); - } - return () => { - dispose(); - if (this.connection && !this.hasNotificationListener()) { - this.connection.onnotification = () => {}; - } - }; - } - protected handleNotification = (payload: pgwire.PgNotification) => { if (!this.options.notificationChannels?.includes(payload.channel)) { return; diff --git a/libs/lib-postgres/src/db/connection/DatabaseClient.ts b/libs/lib-postgres/src/db/connection/DatabaseClient.ts index 1fbd048c9..66fa0fa99 100644 --- a/libs/lib-postgres/src/db/connection/DatabaseClient.ts +++ b/libs/lib-postgres/src/db/connection/DatabaseClient.ts @@ -42,8 +42,10 @@ export class DatabaseClient extends AbstractPostgresConnection { - const slot = new ConnectionSlot({ config: options.config, notificationChannels: options.notificationChannels }); + this.connections = Array.from({ length: TRANSACTION_CONNECTION_COUNT }, (v, index) => { + // Only listen to notifications on a single (the first) connection + const notificationChannels = index == 0 ? options.notificationChannels : []; + const slot = new ConnectionSlot({ config: options.config, notificationChannels }); slot.registerListener({ connectionAvailable: () => this.processConnectionQueue(), connectionError: (ex) => this.handleConnectionError(ex), From 4c4151092dba13eade849792dea5b556fe3d765d Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 30 Jan 2025 09:43:10 +0200 Subject: [PATCH 16/20] cleanup keepAlive variables --- modules/module-postgres/src/replication/WalStream.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 56bc35afb..c11db171b 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -687,8 +687,8 @@ WHERE oid = $1::regclass`, * If we can't check the logical messages, we should assume a keepalive if we * receive an empty array of messages in a replication event. */ - const assumeKeepalive = !exposesLogicalMessages; - let keepaliveDetected = false; + const assumeKeepAlive = !exposesLogicalMessages; + let keepAliveDetected = false; for (const msg of messages) { if (msg.tag == 'relation') { @@ -712,7 +712,7 @@ WHERE oid = $1::regclass`, * due to the default value of `assumeKeepalive`. */ if (exposesLogicalMessages && isKeepAliveMessage(msg)) { - keepaliveDetected = true; + keepAliveDetected = true; } count += 1; @@ -720,9 +720,9 @@ WHERE oid = $1::regclass`, } } - if (!inTx && (assumeKeepalive || keepaliveDetected)) { + if (!inTx && (assumeKeepAlive || keepAliveDetected)) { // Reset the detection flag. - keepaliveDetected = false; + keepAliveDetected = false; // In a transaction, we ack and commit according to the transaction progress. // Outside transactions, we use the PrimaryKeepalive messages to advance progress. From 5d6da80354193030dab5440a7416abf600725ca9 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 30 Jan 2025 12:04:12 +0200 Subject: [PATCH 17/20] allow using the same database for replication source and storage. --- .changeset/smart-chairs-smoke.md | 7 +++ .../src/db/connection/DatabaseClient.ts | 3 +- .../src/utils/identifier-utils.ts | 38 +++++++++++++ libs/lib-postgres/src/utils/utils-index.ts | 1 + .../implementation/MongoBucketBatch.ts | 9 ++- modules/module-postgres-storage/README.md | 8 +-- .../storage/PostgresBucketStorageFactory.ts | 13 +---- .../src/storage/batch/PostgresBucketBatch.ts | 17 +++++- .../src/replication/WalStream.ts | 57 +++++++++---------- .../service-core/src/storage/BucketStorage.ts | 18 +++++- 10 files changed, 120 insertions(+), 51 deletions(-) create mode 100644 .changeset/smart-chairs-smoke.md create mode 100644 libs/lib-postgres/src/utils/identifier-utils.ts diff --git a/.changeset/smart-chairs-smoke.md b/.changeset/smart-chairs-smoke.md new file mode 100644 index 000000000..9bb8642eb --- /dev/null +++ b/.changeset/smart-chairs-smoke.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core': minor +--- + +Added the ability to skip creating empty sync checkpoints if no changes were present in a batch. diff --git a/libs/lib-postgres/src/db/connection/DatabaseClient.ts b/libs/lib-postgres/src/db/connection/DatabaseClient.ts index 66fa0fa99..9896a4eec 100644 --- a/libs/lib-postgres/src/db/connection/DatabaseClient.ts +++ b/libs/lib-postgres/src/db/connection/DatabaseClient.ts @@ -32,7 +32,8 @@ export const TRANSACTION_CONNECTION_COUNT = 5; export class DatabaseClient extends AbstractPostgresConnection { closed: boolean; - protected pool: pgwire.PgClient; + pool: pgwire.PgClient; + protected connections: ConnectionSlot[]; protected initialized: Promise; diff --git a/libs/lib-postgres/src/utils/identifier-utils.ts b/libs/lib-postgres/src/utils/identifier-utils.ts new file mode 100644 index 000000000..7e4c387e2 --- /dev/null +++ b/libs/lib-postgres/src/utils/identifier-utils.ts @@ -0,0 +1,38 @@ +import * as pgwire from '@powersync/service-jpgwire'; +import { retriedQuery } from './pgwire_utils.js'; + +export interface DecodedPostgresIdentifier { + server_id: string; + database_name: string; +} + +export const decodePostgresSystemIdentifier = (identifier: string): DecodedPostgresIdentifier => { + const [server_id, database_name] = identifier.split('.'); + return { server_id, database_name }; +}; + +export const encodePostgresSystemIdentifier = (decoded: DecodedPostgresIdentifier): string => { + return `${decoded.server_id}.${decoded.database_name}`; +}; + +export const queryPostgresSystemIdentifier = async ( + connection: pgwire.PgClient +): Promise => { + const result = pgwire.pgwireRows( + await retriedQuery( + connection, + /* sql */ ` + SELECT + current_database() AS database_name, + system_identifier + FROM + pg_control_system(); + ` + ) + ) as Array<{ database_name: string; system_identifier: bigint }>; + + return { + database_name: result[0].database_name, + server_id: result[0].system_identifier.toString() + }; +}; diff --git a/libs/lib-postgres/src/utils/utils-index.ts b/libs/lib-postgres/src/utils/utils-index.ts index 5bfe85120..3d6f38a57 100644 --- a/libs/lib-postgres/src/utils/utils-index.ts +++ b/libs/lib-postgres/src/utils/utils-index.ts @@ -1 +1,2 @@ +export * from './identifier-utils.js'; export * from './pgwire_utils.js'; diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 01d5577a1..2b329f233 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -9,7 +9,6 @@ import { errors, logger, ReplicationAssertionError, - ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; import { SaveOperationTag, storage, utils } from '@powersync/service-core'; @@ -616,7 +615,9 @@ export class MongoBucketBatch private lastWaitingLogThottled = 0; - async commit(lsn: string): Promise { + async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise { + const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options }; + await this.flush(); if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) { @@ -654,6 +655,10 @@ export class MongoBucketBatch return false; } + if (!createEmptyCheckpoints && this.persisted_op == null) { + return false; + } + const now = new Date(); const update: Partial = { last_checkpoint_lsn: lsn, diff --git a/modules/module-postgres-storage/README.md b/modules/module-postgres-storage/README.md index 8aab123df..8eb112b5a 100644 --- a/modules/module-postgres-storage/README.md +++ b/modules/module-postgres-storage/README.md @@ -34,10 +34,10 @@ storage: Separate Postgres servers are required for replication connections **if using PostgreSQL versions below 14**. -| PostgreSQL Version | Server configuration | -| ------------------ | --------------------------------------------------------------------------------------------------------------------------------- | -| Below 14 | Separate servers are required for the source and sync bucket storage. Replication will be blocked if the same server is detected. | -| 14 and above | The same server can be used for the source and sync bucket storage. Separate servers are also supported. | +| PostgreSQL Version | Server configuration | +| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| Below 14 | Separate servers are required for the source and sync bucket storage. Replication will be blocked if the same server is detected. | +| 14 and above | The source database and sync bucket storage database can be on the same server. Using the same database (with separate schemas) is supported but may lead to higher CPU usage. Using separate servers remains an option. | ### Connection credentials diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index a470ecdde..d78a0a5de 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -158,16 +158,9 @@ export class PostgresBucketStorageFactory } async getSystemIdentifier(): Promise { - /** - * No special permissions should be required to read this. - */ - const result = await this.db.sql` - SELECT - system_identifier - FROM - pg_control_system(); - `.first<{ system_identifier: bigint }>(); - const id = result!.system_identifier.toString(); + const id = lib_postgres.utils.encodePostgresSystemIdentifier( + await lib_postgres.utils.queryPostgresSystemIdentifier(this.db.pool) + ); return { id, diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index 1226f7a7c..60c9d24dd 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -89,7 +89,7 @@ export class PostgresBucketBatch async save(record: storage.SaveOptions): Promise { // TODO maybe share with abstract class - const { after, afterReplicaId, before, beforeReplicaId, sourceTable, tag } = record; + const { after, before, sourceTable, tag } = record; for (const event of this.getTableEvents(sourceTable)) { this.iterateListeners((cb) => cb.replicationEvent?.({ @@ -245,7 +245,10 @@ export class PostgresBucketBatch private async flushInner(): Promise { const batch = this.batch; - if (batch == null) { + // Don't flush empty batches + // This helps prevent feedback loops when using the same database for + // the source data and sync bucket storage + if (batch == null || batch.length == 0) { return null; } @@ -275,7 +278,9 @@ export class PostgresBucketBatch return { flushed_op: String(lastOp) }; } - async commit(lsn: string): Promise { + async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise { + const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options }; + await this.flush(); if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) { @@ -309,6 +314,12 @@ export class PostgresBucketBatch return false; } + + // Don't create a checkpoint if there were no changes + if (!createEmptyCheckpoints && this.persisted_op == null) { + return false; + } + const now = new Date().toISOString(); const update: Partial = { last_checkpoint_lsn: lsn, diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index c11db171b..88f062e4b 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -637,7 +637,7 @@ WHERE oid = $1::regclass`, * is only supported on Postgres >= 14.0. * https://www.postgresql.org/docs/14/protocol-logical-replication.html */ - await this.ensureStorageCompatibility(); + const { createEmptyCheckpoints } = await this.ensureStorageCompatibility(); const replicationOptions: Record = { proto_version: '1', @@ -698,7 +698,7 @@ WHERE oid = $1::regclass`, } else if (msg.tag == 'commit') { Metrics.getInstance().transactions_replicated_total.add(1); inTx = false; - await batch.commit(msg.lsn!); + await batch.commit(msg.lsn!, { createEmptyCheckpoints }); await this.ack(msg.lsn!, replicationStream); } else { if (count % 100 == 0) { @@ -752,39 +752,38 @@ WHERE oid = $1::regclass`, * Ensures that the storage is compatible with the replication connection. * @throws {DatabaseConnectionError} If the storage is not compatible with the replication connection. */ - protected async ensureStorageCompatibility() { + protected async ensureStorageCompatibility(): Promise { const supportsLogicalMessages = await this.checkLogicalMessageSupport(); - if (supportsLogicalMessages) { - return; + const storageIdentifier = await this.storage.factory.getSystemIdentifier(); + if (storageIdentifier.type != lib_postgres.POSTGRES_CONNECTION_TYPE) { + return { + // Keep the same behaviour as before allowing Postgres storage. + createEmptyCheckpoints: true + }; } - const storageIdentifier = await this.storage.factory.getSystemIdentifier(); - if (storageIdentifier.type == lib_postgres.POSTGRES_CONNECTION_TYPE) { + const parsedStorageIdentifier = lib_postgres.utils.decodePostgresSystemIdentifier(storageIdentifier.id); + /** + * Check if the same server is being used for both the sync bucket storage and the logical replication. + */ + const replicationIdentifier = await lib_postgres.utils.queryPostgresSystemIdentifier(this.connections.pool); + + if (!supportsLogicalMessages && replicationIdentifier.server_id == parsedStorageIdentifier.server_id) { + throw new DatabaseConnectionError( + ErrorCode.PSYNC_S1144, + `Separate Postgres servers are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, + new Error('Postgres version is below 14') + ); + } + + return { /** - * Check if the same server is being used for both the sync bucket storage and the logical replication. + * Don't create empty checkpoints if the same Postgres database is used for the data source + * and sync bucket storage. Creating empty checkpoints will cause WAL feedback loops. */ - const replicationIdentifierResult = pgwire.pgwireRows( - await lib_postgres.retriedQuery( - this.connections.pool, - /* sql */ ` - SELECT - system_identifier - FROM - pg_control_system(); - ` - ) - ) as Array<{ system_identifier: bigint }>; - - const replicationIdentifier = replicationIdentifierResult[0].system_identifier.toString(); - if (replicationIdentifier == storageIdentifier.id) { - throw new DatabaseConnectionError( - ErrorCode.PSYNC_S1144, - `Separate Postgres servers are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, - new Error('Postgres version is below 14') - ); - } - } + createEmptyCheckpoints: replicationIdentifier.database_name != parsedStorageIdentifier.database_name + }; } /** diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 1a201503b..6d3699ab0 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -386,6 +386,20 @@ export interface BucketBatchStorageListener extends DisposableListener { replicationEvent: (payload: ReplicationEventPayload) => void; } +export interface BucketBatchCommitOptions { + /** + * Creates a new checkpoint even if there were no persisted operations. + * Defaults to true. + */ + createEmptyCheckpoints?: boolean; +} + +export type ResolvedBucketBatchCommitOptions = Required; + +export const DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS: ResolvedBucketBatchCommitOptions = { + createEmptyCheckpoints: true +}; + export interface BucketStorageBatch extends DisposableObserverClient { /** * Save an op, and potentially flush. @@ -417,11 +431,11 @@ export interface BucketStorageBatch extends DisposableObserverClient; /** - * Flush and commit any saved ops. This creates a new checkpoint. + * Flush and commit any saved ops. This creates a new checkpoint by default. * * Only call this after a transaction. */ - commit(lsn: string): Promise; + commit(lsn: string, options?: BucketBatchCommitOptions): Promise; /** * Advance the checkpoint LSN position, without any associated op. From a65f03cfca1d830b49cba480648b7a9b999f4e11 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 30 Jan 2025 13:16:59 +0200 Subject: [PATCH 18/20] cleanup comments --- modules/module-postgres/src/replication/WalStream.ts | 10 +++++----- packages/service-core/src/storage/BucketStorage.ts | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 88f062e4b..1111219b2 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -632,11 +632,6 @@ WHERE oid = $1::regclass`, async streamChanges(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. - /** - * Viewing the contents of logical messages emitted with `pg_logical_emit_message` - * is only supported on Postgres >= 14.0. - * https://www.postgresql.org/docs/14/protocol-logical-replication.html - */ const { createEmptyCheckpoints } = await this.ensureStorageCompatibility(); const replicationOptions: Record = { @@ -644,6 +639,11 @@ WHERE oid = $1::regclass`, publication_names: PUBLICATION_NAME }; + /** + * Viewing the contents of logical messages emitted with `pg_logical_emit_message` + * is only supported on Postgres >= 14.0. + * https://www.postgresql.org/docs/14/protocol-logical-replication.html + */ const exposesLogicalMessages = await this.checkLogicalMessageSupport(); if (exposesLogicalMessages) { /** diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index 6d3699ab0..008370150 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -63,7 +63,7 @@ export interface BucketStorageFactoryListener extends DisposableListener { export interface BucketStorageSystemIdentifier { /** * A unique identifier for the system used for storage. - * For Postgres this can be the cluster `system_identifier`. + * For Postgres this can be the cluster `system_identifier` and database name. * For MongoDB this can be the replica set name. */ id: string; From f076c444d76a0db3cba96d2e6940304cd155a0d7 Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 30 Jan 2025 15:46:53 +0200 Subject: [PATCH 19/20] acknowledge more often when out of tx --- .../src/replication/WalStream.ts | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 1111219b2..04123bb90 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -720,18 +720,22 @@ WHERE oid = $1::regclass`, } } - if (!inTx && (assumeKeepAlive || keepAliveDetected)) { - // Reset the detection flag. - keepAliveDetected = false; - - // In a transaction, we ack and commit according to the transaction progress. - // Outside transactions, we use the PrimaryKeepalive messages to advance progress. - // Big caveat: This _must not_ be used to skip individual messages, since this LSN - // may be in the middle of the next transaction. - // It must only be used to associate checkpoints with LSNs. - if (await batch.keepalive(chunkLastLsn)) { - await this.ack(chunkLastLsn, replicationStream); + if (!inTx) { + if (assumeKeepAlive || keepAliveDetected) { + // Reset the detection flag. + keepAliveDetected = false; + + // In a transaction, we ack and commit according to the transaction progress. + // Outside transactions, we use the PrimaryKeepalive messages to advance progress. + // Big caveat: This _must not_ be used to skip individual messages, since this LSN + // may be in the middle of the next transaction. + // It must only be used to associate checkpoints with LSNs. + await batch.keepalive(chunkLastLsn); } + + // We receive chunks with empty messages often (about each second). + // Acknowledging here progresses the slot past these and frees up resources. + await this.ack(chunkLastLsn, replicationStream); } Metrics.getInstance().chunks_replicated_total.add(1); From 834b990d18fc5506e0f3cc63a4999b4ba317c56e Mon Sep 17 00:00:00 2001 From: stevensJourney Date: Thu, 30 Jan 2025 16:11:03 +0200 Subject: [PATCH 20/20] share keepalive statement for write checkpoints --- .../src/api/PostgresRouteAPIAdapter.ts | 4 +-- .../src/replication/WalStream.ts | 33 ++++++++++--------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index fed6ce8c8..755b3fad7 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -6,7 +6,7 @@ import * as sync_rules from '@powersync/service-sync-rules'; import * as service_types from '@powersync/service-types'; import * as replication_utils from '../replication/replication-utils.js'; import { getDebugTableInfo } from '../replication/replication-utils.js'; -import { PUBLICATION_NAME } from '../replication/WalStream.js'; +import { KEEPALIVE_STATEMENT, PUBLICATION_NAME } from '../replication/WalStream.js'; import * as types from '../types/types.js'; export class PostgresRouteAPIAdapter implements api.RouteAPI { @@ -244,7 +244,7 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`, const { results } = await lib_postgres.retriedQuery( this.pool, { statement: `SELECT pg_current_wal_lsn() as lsn` }, - { statement: `SELECT pg_logical_emit_message(false, 'powersync', 'ping')` } + KEEPALIVE_STATEMENT ); // Specifically use the lsn from the first statement, not the second one. diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 04123bb90..0950efa3c 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -17,13 +17,6 @@ import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId } from './PgRelation.js'; import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js'; -export const ZERO_LSN = '00000000/00000000'; -export const PUBLICATION_NAME = 'powersync'; -export const POSTGRES_DEFAULT_SCHEMA = 'public'; - -export const KEEPALIVE_CONTENT = 'ping'; -export const KEEPALIVE_BUFFER = Buffer.from(KEEPALIVE_CONTENT); - export interface WalStreamOptions { connections: PgManager; storage: storage.SyncRulesBucketStorage; @@ -37,6 +30,22 @@ interface InitResult { needsNewSlot: boolean; } +export const ZERO_LSN = '00000000/00000000'; +export const PUBLICATION_NAME = 'powersync'; +export const POSTGRES_DEFAULT_SCHEMA = 'public'; + +export const KEEPALIVE_CONTENT = 'ping'; +export const KEEPALIVE_BUFFER = Buffer.from(KEEPALIVE_CONTENT); +export const KEEPALIVE_STATEMENT: pgwire.Statement = { + statement: /* sql */ ` + SELECT + * + FROM + pg_logical_emit_message(FALSE, 'powersync', $1) + `, + params: [{ type: 'varchar', value: KEEPALIVE_CONTENT }] +} as const; + export const isKeepAliveMessage = (msg: pgwire.PgoutputMessage) => { return ( msg.tag == 'message' && @@ -47,15 +56,7 @@ export const isKeepAliveMessage = (msg: pgwire.PgoutputMessage) => { }; export const sendKeepAlive = async (db: pgwire.PgClient) => { - await lib_postgres.retriedQuery(db, { - statement: /* sql */ ` - SELECT - * - FROM - pg_logical_emit_message(FALSE, 'powersync', $1) - `, - params: [{ type: 'varchar', value: KEEPALIVE_CONTENT }] - }); + await lib_postgres.retriedQuery(db, KEEPALIVE_STATEMENT); }; export class MissingReplicationSlotError extends Error {