diff --git a/.changeset/pretty-countries-cover.md b/.changeset/pretty-countries-cover.md new file mode 100644 index 000000000..7a660a19a --- /dev/null +++ b/.changeset/pretty-countries-cover.md @@ -0,0 +1,6 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-postgres': minor +--- + +Allowed using the same Postgres server for the replication source and sync bucket storage. This is only supported on Postgres versions newer than 14.0. diff --git a/.changeset/smart-chairs-smoke.md b/.changeset/smart-chairs-smoke.md new file mode 100644 index 000000000..9bb8642eb --- /dev/null +++ b/.changeset/smart-chairs-smoke.md @@ -0,0 +1,7 @@ +--- +'@powersync/service-module-postgres-storage': minor +'@powersync/service-module-mongodb-storage': minor +'@powersync/service-core': minor +--- + +Added the ability to skip creating empty sync checkpoints if no changes were present in a batch. diff --git a/.changeset/strong-tables-rescue.md b/.changeset/strong-tables-rescue.md new file mode 100644 index 000000000..b2a0c15a8 --- /dev/null +++ b/.changeset/strong-tables-rescue.md @@ -0,0 +1,5 @@ +--- +'@powersync/service-module-postgres-storage': patch +--- + +Fix bug where listening to active checkpoint notifications on an ended connection could cause a crash. diff --git a/libs/lib-postgres/src/db/connection/ConnectionSlot.ts b/libs/lib-postgres/src/db/connection/ConnectionSlot.ts index b2b543b6c..ac04832e1 100644 --- a/libs/lib-postgres/src/db/connection/ConnectionSlot.ts +++ b/libs/lib-postgres/src/db/connection/ConnectionSlot.ts @@ -50,9 +50,12 @@ export class ConnectionSlot extends framework.DisposableObserver l.connectionCreated?.(connection)); - if (this.hasNotificationListener()) { - await this.configureConnectionNotifications(connection); - } + + /** + * Configure the Postgres connection to listen to notifications. + * Subscribing to notifications, even without a registered listener, should not add much overhead. 
+ */ + await this.configureConnectionNotifications(connection); return connection; } @@ -64,11 +67,6 @@ export class ConnectionSlot extends framework.DisposableObserver): () => void { - const dispose = super.registerListener(listener); - if (this.connection && this.hasNotificationListener()) { - this.configureConnectionNotifications(this.connection); - } - return () => { - dispose(); - if (this.connection && !this.hasNotificationListener()) { - this.connection.onnotification = () => {}; - } - }; - } - protected handleNotification = (payload: pgwire.PgNotification) => { if (!this.options.notificationChannels?.includes(payload.channel)) { return; diff --git a/libs/lib-postgres/src/db/connection/DatabaseClient.ts b/libs/lib-postgres/src/db/connection/DatabaseClient.ts index 1fbd048c9..9896a4eec 100644 --- a/libs/lib-postgres/src/db/connection/DatabaseClient.ts +++ b/libs/lib-postgres/src/db/connection/DatabaseClient.ts @@ -32,7 +32,8 @@ export const TRANSACTION_CONNECTION_COUNT = 5; export class DatabaseClient extends AbstractPostgresConnection { closed: boolean; - protected pool: pgwire.PgClient; + pool: pgwire.PgClient; + protected connections: ConnectionSlot[]; protected initialized: Promise; @@ -42,8 +43,10 @@ export class DatabaseClient extends AbstractPostgresConnection { - const slot = new ConnectionSlot({ config: options.config, notificationChannels: options.notificationChannels }); + this.connections = Array.from({ length: TRANSACTION_CONNECTION_COUNT }, (v, index) => { + // Only listen to notifications on a single (the first) connection + const notificationChannels = index == 0 ? options.notificationChannels : []; + const slot = new ConnectionSlot({ config: options.config, notificationChannels }); slot.registerListener({ connectionAvailable: () => this.processConnectionQueue(), connectionError: (ex) => this.handleConnectionError(ex), diff --git a/libs/lib-postgres/src/utils/identifier-utils.ts b/libs/lib-postgres/src/utils/identifier-utils.ts new file mode 100644 index 000000000..7e4c387e2 --- /dev/null +++ b/libs/lib-postgres/src/utils/identifier-utils.ts @@ -0,0 +1,38 @@ +import * as pgwire from '@powersync/service-jpgwire'; +import { retriedQuery } from './pgwire_utils.js'; + +export interface DecodedPostgresIdentifier { + server_id: string; + database_name: string; +} + +export const decodePostgresSystemIdentifier = (identifier: string): DecodedPostgresIdentifier => { + const [server_id, database_name] = identifier.split('.'); + return { server_id, database_name }; +}; + +export const encodePostgresSystemIdentifier = (decoded: DecodedPostgresIdentifier): string => { + return `${decoded.server_id}.${decoded.database_name}`; +}; + +export const queryPostgresSystemIdentifier = async ( + connection: pgwire.PgClient +): Promise => { + const result = pgwire.pgwireRows( + await retriedQuery( + connection, + /* sql */ ` + SELECT + current_database() AS database_name, + system_identifier + FROM + pg_control_system(); + ` + ) + ) as Array<{ database_name: string; system_identifier: bigint }>; + + return { + database_name: result[0].database_name, + server_id: result[0].system_identifier.toString() + }; +}; diff --git a/libs/lib-postgres/src/utils/utils-index.ts b/libs/lib-postgres/src/utils/utils-index.ts index 5bfe85120..3d6f38a57 100644 --- a/libs/lib-postgres/src/utils/utils-index.ts +++ b/libs/lib-postgres/src/utils/utils-index.ts @@ -1 +1,2 @@ +export * from './identifier-utils.js'; export * from './pgwire_utils.js'; diff --git 
a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts index 908872cc3..ada8922eb 100644 --- a/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts +++ b/modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts @@ -84,6 +84,23 @@ export class MongoBucketStorage return storage; } + async getSystemIdentifier(): Promise { + const { setName: id } = await this.db.db.command({ + hello: 1 + }); + if (id == null) { + throw new ServiceError( + ErrorCode.PSYNC_S1342, + 'Standalone MongoDB instances are not supported - use a replicaset.' + ); + } + + return { + id, + type: lib_mongo.MONGO_CONNECTION_TYPE + }; + } + async configureSyncRules(sync_rules: string, options?: { lock?: boolean }) { const next = await this.getNextSyncRulesContent(); const active = await this.getActiveSyncRulesContent(); diff --git a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts index 01d5577a1..2b329f233 100644 --- a/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts +++ b/modules/module-mongodb-storage/src/storage/implementation/MongoBucketBatch.ts @@ -9,7 +9,6 @@ import { errors, logger, ReplicationAssertionError, - ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; import { SaveOperationTag, storage, utils } from '@powersync/service-core'; @@ -616,7 +615,9 @@ export class MongoBucketBatch private lastWaitingLogThottled = 0; - async commit(lsn: string): Promise { + async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise { + const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options }; + await this.flush(); if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) { @@ -654,6 +655,10 @@ export class MongoBucketBatch return false; } + if (!createEmptyCheckpoints && this.persisted_op == null) { + return false; + } + const now = new Date(); const update: Partial = { last_checkpoint_lsn: lsn, diff --git a/modules/module-postgres-storage/README.md b/modules/module-postgres-storage/README.md index f53bc947e..8eb112b5a 100644 --- a/modules/module-postgres-storage/README.md +++ b/modules/module-postgres-storage/README.md @@ -31,7 +31,13 @@ storage: ``` **IMPORTANT**: -A separate Postgres server is currently required for replication connections (if using Postgres for replication) and storage. Using the same server might cause unexpected results. + +Separate Postgres servers are required for replication connections **if using PostgreSQL versions below 14**. + +| PostgreSQL Version | Server configuration | +| ------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ | +| Below 14 | Separate servers are required for the source and sync bucket storage. Replication will be blocked if the same server is detected. | +| 14 and above | The source database and sync bucket storage database can be on the same server. Using the same database (with separate schemas) is supported but may lead to higher CPU usage. Using separate servers remains an option. 
| ### Connection credentials diff --git a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts index e2e6c4540..d78a0a5de 100644 --- a/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts +++ b/modules/module-postgres-storage/src/storage/PostgresBucketStorageFactory.ts @@ -157,6 +157,17 @@ export class PostgresBucketStorageFactory return newInstanceRow!.id; } + async getSystemIdentifier(): Promise { + const id = lib_postgres.utils.encodePostgresSystemIdentifier( + await lib_postgres.utils.queryPostgresSystemIdentifier(this.db.pool) + ); + + return { + id, + type: lib_postgres.POSTGRES_CONNECTION_TYPE + }; + } + // TODO possibly share implementation in abstract class async configureSyncRules( sync_rules: string, diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts index 1ee75697a..60c9d24dd 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresBucketBatch.ts @@ -89,7 +89,7 @@ export class PostgresBucketBatch async save(record: storage.SaveOptions): Promise { // TODO maybe share with abstract class - const { after, afterReplicaId, before, beforeReplicaId, sourceTable, tag } = record; + const { after, before, sourceTable, tag } = record; for (const event of this.getTableEvents(sourceTable)) { this.iterateListeners((cb) => cb.replicationEvent?.({ @@ -245,7 +245,10 @@ export class PostgresBucketBatch private async flushInner(): Promise { const batch = this.batch; - if (batch == null) { + // Don't flush empty batches + // This helps prevent feedback loops when using the same database for + // the source data and sync bucket storage + if (batch == null || batch.length == 0) { return null; } @@ -275,7 +278,9 @@ export class PostgresBucketBatch return { flushed_op: String(lastOp) }; } - async commit(lsn: string): Promise { + async commit(lsn: string, options?: storage.BucketBatchCommitOptions): Promise { + const { createEmptyCheckpoints } = { ...storage.DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS, ...options }; + await this.flush(); if (this.last_checkpoint_lsn != null && lsn < this.last_checkpoint_lsn) { @@ -309,6 +314,12 @@ export class PostgresBucketBatch return false; } + + // Don't create a checkpoint if there were no changes + if (!createEmptyCheckpoints && this.persisted_op == null) { + return false; + } + const now = new Date().toISOString(); const update: Partial = { last_checkpoint_lsn: lsn, @@ -488,7 +499,7 @@ export class PostgresBucketBatch jsonb_array_elements(${{ type: 'jsonb', value: sizeLookups }}::jsonb) AS FILTER ) SELECT - pg_column_size(c.data) AS data_size, + octet_length(c.data) AS data_size, c.source_table, c.source_key FROM @@ -529,23 +540,20 @@ export class PostgresBucketBatch const current_data_lookup = new Map(); for await (const currentDataRows of db.streamRows({ statement: /* sql */ ` - WITH - filter_data AS ( - SELECT - decode(FILTER ->> 'source_key', 'hex') AS source_key, -- Decoding from hex to bytea - (FILTER ->> 'source_table') AS source_table_id - FROM - jsonb_array_elements($1::jsonb) AS FILTER - ) SELECT - --- With skipExistingRows, we only need to know whether or not the row exists. ${this.options.skip_existing_rows ? 
`c.source_table, c.source_key` : 'c.*'} FROM current_data c - JOIN filter_data f ON c.source_table = f.source_table_id + JOIN ( + SELECT + decode(FILTER ->> 'source_key', 'hex') AS source_key, + FILTER ->> 'source_table' AS source_table_id + FROM + jsonb_array_elements($1::jsonb) AS FILTER + ) f ON c.source_table = f.source_table_id AND c.source_key = f.source_key WHERE - c.group_id = $2 + c.group_id = $2; `, params: [ { @@ -553,7 +561,7 @@ export class PostgresBucketBatch value: lookups }, { - type: 'int8', + type: 'int4', value: this.group_id } ] @@ -610,7 +618,12 @@ export class PostgresBucketBatch await persistedBatch.flush(db); } } - return resumeBatch; + + // Don't return empty batches + if (resumeBatch?.batch.length) { + return resumeBatch; + } + return null; } protected async saveOperation( diff --git a/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts b/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts index 8dcb7eb0d..759a2b909 100644 --- a/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts +++ b/modules/module-postgres-storage/src/storage/batch/PostgresPersistedBatch.ts @@ -256,33 +256,6 @@ export class PostgresPersistedBatch { protected async flushBucketData(db: lib_postgres.WrappedConnection) { if (this.bucketDataInserts.length > 0) { await db.sql` - WITH - parsed_data AS ( - SELECT - group_id, - bucket_name, - source_table, - decode(source_key, 'hex') AS source_key, -- Decode hex to bytea - table_name, - op, - row_id, - checksum, - data, - target_op - FROM - jsonb_to_recordset(${{ type: 'jsonb', value: this.bucketDataInserts }}::jsonb) AS t ( - group_id integer, - bucket_name text, - source_table text, - source_key text, -- Input as hex string - table_name text, - op text, - row_id text, - checksum bigint, - data text, - target_op bigint - ) - ) INSERT INTO bucket_data ( group_id, @@ -303,14 +276,25 @@ export class PostgresPersistedBatch { nextval('op_id_sequence'), op, source_table, - source_key, -- Already decoded + decode(source_key, 'hex') AS source_key, table_name, row_id, checksum, data, target_op FROM - parsed_data; + json_to_recordset(${{ type: 'json', value: this.bucketDataInserts }}::json) AS t ( + group_id integer, + bucket_name text, + source_table text, + source_key text, -- Input as hex string + table_name text, + op text, + row_id text, + checksum bigint, + data text, + target_op bigint + ); `.execute(); } } @@ -318,23 +302,6 @@ export class PostgresPersistedBatch { protected async flushParameterData(db: lib_postgres.WrappedConnection) { if (this.parameterDataInserts.length > 0) { await db.sql` - WITH - parsed_data AS ( - SELECT - group_id, - source_table, - decode(source_key, 'hex') AS source_key, -- Decode hex to bytea - decode(lookup, 'hex') AS lookup, -- Decode hex to bytea - bucket_parameters - FROM - jsonb_to_recordset(${{ type: 'jsonb', value: this.parameterDataInserts }}::jsonb) AS t ( - group_id integer, - source_table text, - source_key text, -- Input as hex string - lookup text, -- Input as hex string - bucket_parameters text -- Input as stringified JSON - ) - ) INSERT INTO bucket_parameters ( group_id, @@ -346,11 +313,17 @@ export class PostgresPersistedBatch { SELECT group_id, source_table, - source_key, -- Already decoded - lookup, -- Already decoded + decode(source_key, 'hex') AS source_key, -- Decode hex to bytea + decode(lookup, 'hex') AS lookup, -- Decode hex to bytea bucket_parameters FROM - parsed_data; + json_to_recordset(${{ type: 'json', value: 
this.parameterDataInserts }}::json) AS t ( + group_id integer, + source_table text, + source_key text, -- Input as hex string + lookup text, -- Input as hex string + bucket_parameters text -- Input as stringified JSON + ) `.execute(); } } @@ -358,33 +331,6 @@ export class PostgresPersistedBatch { protected async flushCurrentData(db: lib_postgres.WrappedConnection) { if (this.currentDataInserts.size > 0) { await db.sql` - WITH - parsed_data AS ( - SELECT - group_id, - source_table, - decode(source_key, 'hex') AS source_key, -- Decode hex to bytea - buckets::jsonb AS buckets, - decode(data, 'hex') AS data, -- Decode hex to bytea - ARRAY( - SELECT - decode((value ->> 0)::TEXT, 'hex') - FROM - jsonb_array_elements(lookups::jsonb) AS value - ) AS lookups -- Decode array of hex strings to bytea[] - FROM - jsonb_to_recordset(${{ - type: 'jsonb', - value: Array.from(this.currentDataInserts.values()) - }}::jsonb) AS t ( - group_id integer, - source_table text, - source_key text, -- Input as hex string - buckets text, - data text, -- Input as hex string - lookups text -- Input as stringified JSONB array of hex strings - ) - ) INSERT INTO current_data ( group_id, @@ -397,12 +343,24 @@ export class PostgresPersistedBatch { SELECT group_id, source_table, - source_key, -- Already decoded - buckets, - data, -- Already decoded - lookups -- Already decoded + decode(source_key, 'hex') AS source_key, -- Decode hex to bytea + buckets::jsonb AS buckets, + decode(data, 'hex') AS data, -- Decode hex to bytea + array( + SELECT + decode(element, 'hex') + FROM + unnest(lookups) AS element + ) AS lookups FROM - parsed_data + json_to_recordset(${{ type: 'json', value: Array.from(this.currentDataInserts.values()) }}::json) AS t ( + group_id integer, + source_table text, + source_key text, -- Input as hex string + buckets text, + data text, -- Input as hex string + lookups TEXT[] -- Input as stringified JSONB array of hex strings + ) ON CONFLICT (group_id, source_table, source_key) DO UPDATE SET buckets = EXCLUDED.buckets, diff --git a/modules/module-postgres/package.json b/modules/module-postgres/package.json index e5209d8f7..1d6fcdd04 100644 --- a/modules/module-postgres/package.json +++ b/modules/module-postgres/package.json @@ -28,23 +28,25 @@ } }, "dependencies": { - "@powersync/lib-services-framework": "workspace:*", "@powersync/lib-service-postgres": "workspace:*", + "@powersync/lib-services-framework": "workspace:*", "@powersync/service-core": "workspace:*", "@powersync/service-jpgwire": "workspace:*", "@powersync/service-jsonbig": "workspace:*", "@powersync/service-sync-rules": "workspace:*", "@powersync/service-types": "workspace:*", - "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87", "jose": "^4.15.1", + "pgwire": "github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87", + "semver": "^7.5.4", "ts-codec": "^1.3.0", - "uuid": "^9.0.1", - "uri-js": "^4.4.1" + "uri-js": "^4.4.1", + "uuid": "^9.0.1" }, "devDependencies": { - "@types/uuid": "^9.0.4", "@powersync/service-core-tests": "workspace:*", "@powersync/service-module-mongodb-storage": "workspace:*", - "@powersync/service-module-postgres-storage": "workspace:*" + "@powersync/service-module-postgres-storage": "workspace:*", + "@types/semver": "^7.5.4", + "@types/uuid": "^9.0.4" } } diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index fed6ce8c8..755b3fad7 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ 
b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -6,7 +6,7 @@ import * as sync_rules from '@powersync/service-sync-rules'; import * as service_types from '@powersync/service-types'; import * as replication_utils from '../replication/replication-utils.js'; import { getDebugTableInfo } from '../replication/replication-utils.js'; -import { PUBLICATION_NAME } from '../replication/WalStream.js'; +import { KEEPALIVE_STATEMENT, PUBLICATION_NAME } from '../replication/WalStream.js'; import * as types from '../types/types.js'; export class PostgresRouteAPIAdapter implements api.RouteAPI { @@ -244,7 +244,7 @@ FROM pg_replication_slots WHERE slot_name = $1 LIMIT 1;`, const { results } = await lib_postgres.retriedQuery( this.pool, { statement: `SELECT pg_current_wal_lsn() as lsn` }, - { statement: `SELECT pg_logical_emit_message(false, 'powersync', 'ping')` } + KEEPALIVE_STATEMENT ); // Specifically use the lsn from the first statement, not the second one. diff --git a/modules/module-postgres/src/replication/PgManager.ts b/modules/module-postgres/src/replication/PgManager.ts index 586542d52..d85902d49 100644 --- a/modules/module-postgres/src/replication/PgManager.ts +++ b/modules/module-postgres/src/replication/PgManager.ts @@ -1,4 +1,5 @@ import * as pgwire from '@powersync/service-jpgwire'; +import semver from 'semver'; import { NormalizedPostgresConnectionConfig } from '../types/types.js'; /** @@ -35,6 +36,15 @@ export class PgManager { return await p; } + /** + * @returns The Postgres server version in a parsed Semver instance + */ + async getServerVersion(): Promise { + const result = await this.pool.query(`SHOW server_version;`); + // The result is usually of the form "16.2 (Debian 16.2-1.pgdg120+2)" + return semver.coerce(result.rows[0][0].split(' ')[0]); + } + /** * Create a new standard connection, used for initial snapshot. 
* diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 4e26aca31..0950efa3c 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,6 +1,8 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import { container, + DatabaseConnectionError, + ErrorCode, errors, logger, ReplicationAbortedError, @@ -15,10 +17,6 @@ import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId } from './PgRelation.js'; import { checkSourceConfiguration, getReplicationIdentityColumns } from './replication-utils.js'; -export const ZERO_LSN = '00000000/00000000'; -export const PUBLICATION_NAME = 'powersync'; -export const POSTGRES_DEFAULT_SCHEMA = 'public'; - export interface WalStreamOptions { connections: PgManager; storage: storage.SyncRulesBucketStorage; @@ -32,6 +30,35 @@ interface InitResult { needsNewSlot: boolean; } +export const ZERO_LSN = '00000000/00000000'; +export const PUBLICATION_NAME = 'powersync'; +export const POSTGRES_DEFAULT_SCHEMA = 'public'; + +export const KEEPALIVE_CONTENT = 'ping'; +export const KEEPALIVE_BUFFER = Buffer.from(KEEPALIVE_CONTENT); +export const KEEPALIVE_STATEMENT: pgwire.Statement = { + statement: /* sql */ ` + SELECT + * + FROM + pg_logical_emit_message(FALSE, 'powersync', $1) + `, + params: [{ type: 'varchar', value: KEEPALIVE_CONTENT }] +} as const; + +export const isKeepAliveMessage = (msg: pgwire.PgoutputMessage) => { + return ( + msg.tag == 'message' && + msg.prefix == 'powersync' && + msg.content && + Buffer.from(msg.content).equals(KEEPALIVE_BUFFER) + ); +}; + +export const sendKeepAlive = async (db: pgwire.PgClient) => { + await lib_postgres.retriedQuery(db, KEEPALIVE_STATEMENT); +}; + export class MissingReplicationSlotError extends Error { constructor(message: string) { super(message); @@ -71,10 +98,7 @@ export class WalStream { // Ping to speed up cancellation of streaming replication // We're not using pg_snapshot here, since it could be in the middle of // an initial replication transaction. - const promise = lib_postgres.retriedQuery( - this.connections.pool, - `SELECT * FROM pg_logical_emit_message(false, 'powersync', 'ping')` - ); + const promise = sendKeepAlive(this.connections.pool); promise.catch((e) => { // Failures here are okay - this only speeds up stopping the process. logger.warn('Failed to ping connection', e); @@ -180,6 +204,7 @@ export class WalStream { async initSlot(): Promise { await checkSourceConfiguration(this.connections.pool, PUBLICATION_NAME); + await this.ensureStorageCompatibility(); const slotName = this.slot_name; @@ -382,6 +407,15 @@ WHERE oid = $1::regclass`, await batch.commit(ZERO_LSN); } ); + /** + * Send a keepalive message after initial replication. + * In some edge cases we wait for a keepalive after the initial snapshot. + * If we don't explicitly check the contents of keepalive messages then a keepalive is detected + * rather quickly after initial replication - perhaps due to other WAL events. + * If we do explicitly check the contents of messages, we need an actual keepalive payload in order + * to advance the active sync rules LSN. + */ + await sendKeepAlive(db); } static *getQueryData(results: Iterable): Generator { @@ -599,13 +633,33 @@ WHERE oid = $1::regclass`, async streamChanges(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. 
+ const { createEmptyCheckpoints } = await this.ensureStorageCompatibility(); + + const replicationOptions: Record = { + proto_version: '1', + publication_names: PUBLICATION_NAME + }; + + /** + * Viewing the contents of logical messages emitted with `pg_logical_emit_message` + * is only supported on Postgres >= 14.0. + * https://www.postgresql.org/docs/14/protocol-logical-replication.html + */ + const exposesLogicalMessages = await this.checkLogicalMessageSupport(); + if (exposesLogicalMessages) { + /** + * Only add this option if the Postgres server supports it. + * Adding the option to a server that doesn't support it will throw an exception when starting logical replication. + * Error: `unrecognized pgoutput option: messages` + */ + replicationOptions['messages'] = 'true'; + } + const replicationStream = replicationConnection.logicalReplication({ slot: this.slot_name, - options: { - proto_version: '1', - publication_names: PUBLICATION_NAME - } + options: replicationOptions }); + this.startedStreaming = true; // Auto-activate as soon as initial replication is done @@ -628,6 +682,15 @@ WHERE oid = $1::regclass`, // chunkLastLsn may come from normal messages in the chunk, // or from a PrimaryKeepalive message. const { messages, lastLsn: chunkLastLsn } = chunk; + + /** + * We can check if an explicit keepalive was sent if `exposesLogicalMessages == true`. + * If we can't check the logical messages, we should assume a keepalive if we + * receive an empty array of messages in a replication event. + */ + const assumeKeepAlive = !exposesLogicalMessages; + let keepAliveDetected = false; + for (const msg of messages) { if (msg.tag == 'relation') { await this.handleRelation(batch, getPgOutputRelation(msg), true); @@ -636,27 +699,44 @@ WHERE oid = $1::regclass`, } else if (msg.tag == 'commit') { Metrics.getInstance().transactions_replicated_total.add(1); inTx = false; - await batch.commit(msg.lsn!); + await batch.commit(msg.lsn!, { createEmptyCheckpoints }); await this.ack(msg.lsn!, replicationStream); } else { if (count % 100 == 0) { logger.info(`${this.slot_name} replicating op ${count} ${msg.lsn}`); } + /** + * If we can see the contents of logical messages, then we can check if a keepalive + * message is present. We only perform a keepalive (below) if we explicitly detect a keepalive message. + * If we can't see the contents of logical messages, then we should assume a keepalive is required + * due to the default value of `assumeKeepalive`. + */ + if (exposesLogicalMessages && isKeepAliveMessage(msg)) { + keepAliveDetected = true; + } + count += 1; await this.writeChange(batch, msg); } } if (!inTx) { - // In a transaction, we ack and commit according to the transaction progress. - // Outside transactions, we use the PrimaryKeepalive messages to advance progress. - // Big caveat: This _must not_ be used to skip individual messages, since this LSN - // may be in the middle of the next transaction. - // It must only be used to associate checkpoints with LSNs. - if (await batch.keepalive(chunkLastLsn)) { - await this.ack(chunkLastLsn, replicationStream); + if (assumeKeepAlive || keepAliveDetected) { + // Reset the detection flag. + keepAliveDetected = false; + + // In a transaction, we ack and commit according to the transaction progress. + // Outside transactions, we use the PrimaryKeepalive messages to advance progress. + // Big caveat: This _must not_ be used to skip individual messages, since this LSN + // may be in the middle of the next transaction. 
+ // It must only be used to associate checkpoints with LSNs. + await batch.keepalive(chunkLastLsn); } + + // We receive chunks with empty messages often (about each second). + // Acknowledging here progresses the slot past these and frees up resources. + await this.ack(chunkLastLsn, replicationStream); } Metrics.getInstance().chunks_replicated_total.add(1); @@ -672,6 +752,53 @@ WHERE oid = $1::regclass`, replicationStream.ack(lsn); } + + /** + * Ensures that the storage is compatible with the replication connection. + * @throws {DatabaseConnectionError} If the storage is not compatible with the replication connection. + */ + protected async ensureStorageCompatibility(): Promise { + const supportsLogicalMessages = await this.checkLogicalMessageSupport(); + + const storageIdentifier = await this.storage.factory.getSystemIdentifier(); + if (storageIdentifier.type != lib_postgres.POSTGRES_CONNECTION_TYPE) { + return { + // Keep the same behaviour as before allowing Postgres storage. + createEmptyCheckpoints: true + }; + } + + const parsedStorageIdentifier = lib_postgres.utils.decodePostgresSystemIdentifier(storageIdentifier.id); + /** + * Check if the same server is being used for both the sync bucket storage and the logical replication. + */ + const replicationIdentifier = await lib_postgres.utils.queryPostgresSystemIdentifier(this.connections.pool); + + if (!supportsLogicalMessages && replicationIdentifier.server_id == parsedStorageIdentifier.server_id) { + throw new DatabaseConnectionError( + ErrorCode.PSYNC_S1144, + `Separate Postgres servers are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, + new Error('Postgres version is below 14') + ); + } + + return { + /** + * Don't create empty checkpoints if the same Postgres database is used for the data source + * and sync bucket storage. Creating empty checkpoints will cause WAL feedback loops. + */ + createEmptyCheckpoints: replicationIdentifier.database_name != parsedStorageIdentifier.database_name + }; + } + + /** + * Check if the replication connection Postgres server supports + * viewing the contents of logical replication messages. + */ + protected async checkLogicalMessageSupport() { + const version = await this.connections.getServerVersion(); + return version ? 
version.compareMain('14.0.0') >= 0 : false; + } } async function touch() { diff --git a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts index 40247452a..e7db0e820 100644 --- a/modules/module-postgres/src/replication/WalStreamReplicationJob.ts +++ b/modules/module-postgres/src/replication/WalStreamReplicationJob.ts @@ -1,6 +1,6 @@ import { container } from '@powersync/lib-services-framework'; import { PgManager } from './PgManager.js'; -import { MissingReplicationSlotError, WalStream } from './WalStream.js'; +import { MissingReplicationSlotError, sendKeepAlive, WalStream } from './WalStream.js'; import { replication } from '@powersync/service-core'; import { ConnectionManagerFactory } from './ConnectionManagerFactory.js'; @@ -37,7 +37,7 @@ export class WalStreamReplicationJob extends replication.AbstractReplicationJob */ async keepAlive() { try { - await this.connectionManager.pool.query(`SELECT * FROM pg_logical_emit_message(false, 'powersync', 'ping')`); + await sendKeepAlive(this.connectionManager.pool); } catch (e) { this.logger.warn(`KeepAlive failed, unable to post to WAL`, e); } diff --git a/modules/module-postgres/test/src/storage_combination.test.ts b/modules/module-postgres/test/src/storage_combination.test.ts new file mode 100644 index 000000000..1183d6806 --- /dev/null +++ b/modules/module-postgres/test/src/storage_combination.test.ts @@ -0,0 +1,35 @@ +import * as postgres_storage from '@powersync/service-module-postgres-storage'; +import { describe, expect, test } from 'vitest'; +import { env } from './env.js'; +import { WalStreamTestContext } from './wal_stream_utils.js'; + +describe.skipIf(!env.TEST_POSTGRES_STORAGE)('replication storage combination - postgres', function () { + test('should allow the same Postgres cluster to be used for data and storage', async () => { + // Use the same cluster for the storage as the data source + await using context = await WalStreamTestContext.open( + postgres_storage.PostgresTestStorageFactoryGenerator({ + url: env.PG_TEST_URL + }), + { doNotClear: false } + ); + + await context.updateSyncRules(/* yaml */ + ` bucket_definitions: + global: + data: + - SELECT * FROM "test_data" `); + + const { pool, connectionManager } = context; + + const sourceVersion = await connectionManager.getServerVersion(); + + await pool.query(`CREATE TABLE test_data(id text primary key, description text, other text)`); + + if (sourceVersion!.compareMain('14.0.0') < 0) { + await expect(context.replicateSnapshot()).rejects.toThrow(); + } else { + // Should resolve + await context.replicateSnapshot(); + } + }); +}); diff --git a/packages/service-core/src/storage/BucketStorage.ts b/packages/service-core/src/storage/BucketStorage.ts index ff7b653f5..008370150 100644 --- a/packages/service-core/src/storage/BucketStorage.ts +++ b/packages/service-core/src/storage/BucketStorage.ts @@ -60,6 +60,20 @@ export interface BucketStorageFactoryListener extends DisposableListener { replicationEvent: (event: ReplicationEventPayload) => void; } +export interface BucketStorageSystemIdentifier { + /** + * A unique identifier for the system used for storage. + * For Postgres this can be the cluster `system_identifier` and database name. + * For MongoDB this can be the replica set name. + */ + id: string; + /** + * A unique type for the storage implementation. + * e.g. `mongodb`, `postgresql`. 
+ */ + type: string; +} + export interface BucketStorageFactory extends AsyncDisposableObserverClient { /** * Update sync rules from configuration, if changed. @@ -143,6 +157,11 @@ export interface BucketStorageFactory extends AsyncDisposableObserverClient; + + /** + * Get a unique identifier for the system used for storage. + */ + getSystemIdentifier(): Promise; } export interface ReplicationCheckpoint { @@ -367,6 +386,20 @@ export interface BucketBatchStorageListener extends DisposableListener { replicationEvent: (payload: ReplicationEventPayload) => void; } +export interface BucketBatchCommitOptions { + /** + * Creates a new checkpoint even if there were no persisted operations. + * Defaults to true. + */ + createEmptyCheckpoints?: boolean; +} + +export type ResolvedBucketBatchCommitOptions = Required; + +export const DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS: ResolvedBucketBatchCommitOptions = { + createEmptyCheckpoints: true +}; + export interface BucketStorageBatch extends DisposableObserverClient { /** * Save an op, and potentially flush. @@ -398,11 +431,11 @@ export interface BucketStorageBatch extends DisposableObserverClient; /** - * Flush and commit any saved ops. This creates a new checkpoint. + * Flush and commit any saved ops. This creates a new checkpoint by default. * * Only call this after a transaction. */ - commit(lsn: string): Promise; + commit(lsn: string, options?: BucketBatchCommitOptions): Promise; /** * Advance the checkpoint LSN position, without any associated op. diff --git a/packages/service-errors/src/codes.ts b/packages/service-errors/src/codes.ts index 18901d51b..9458be92c 100644 --- a/packages/service-errors/src/codes.ts +++ b/packages/service-errors/src/codes.ts @@ -158,6 +158,16 @@ export enum ErrorCode { */ PSYNC_S1143 = 'PSYNC_S1143', + /** + * Invalid Postgres server configuration for replication and sync bucket storage. + * + * The same Postgres server, running an unsupported version of Postgres, has been configured for both replication and sync bucket storage. + * Using the same Postgres server is only supported on Postgres 14 and above. + * This error typically indicates that the Postgres version is below 14. + * Either upgrade the Postgres server to version 14 or above, or use a different Postgres server for sync bucket storage. + */ + PSYNC_S1144 = 'PSYNC_S1144', + // ## PSYNC_S12xx: MySQL replication issues // ## PSYNC_S13xx: MongoDB replication issues diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 5ea006fb8..b941e6be0 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -331,6 +331,9 @@ importers: pgwire: specifier: github:kagis/pgwire#f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87 version: https://codeload.github.com/kagis/pgwire/tar.gz/f1cb95f9a0f42a612bb5a6b67bb2eb793fc5fc87 + semver: + specifier: ^7.5.4 + version: 7.6.2 ts-codec: specifier: ^1.3.0 version: 1.3.0 @@ -350,6 +353,9 @@ importers: '@powersync/service-module-postgres-storage': specifier: workspace:* version: link:../module-postgres-storage + '@types/semver': + specifier: ^7.5.4 + version: 7.5.8 '@types/uuid': specifier: ^9.0.4 version: 9.0.8 @@ -1625,6 +1631,7 @@ packages: acorn-import-assertions@1.9.0: resolution: {integrity: sha512-cmMwop9x+8KFhxvKrKfPYmN6/pKTYYHBqLa0DfvVZcKMJWNyWLnaqND7dx/qn66R7ewM1UX5XMaDVP5wlVTaVA==} + deprecated: package has been renamed to acorn-import-attributes peerDependencies: acorn: ^8
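
Usage note (not part of the patch): a minimal sketch of how a replication consumer might call the new `createEmptyCheckpoints` commit option introduced in `packages/service-core/src/storage/BucketStorage.ts`. The `commitTransaction` helper and the `sharedDatabase` flag are illustrative assumptions; only `BucketStorageBatch.commit(lsn, options)`, `BucketBatchCommitOptions`, and the feedback-loop rationale come from this change.

```ts
import { storage } from '@powersync/service-core';

/**
 * Illustrative sketch only. Commits a replicated transaction, skipping empty
 * checkpoints when the replication source and sync bucket storage share a
 * Postgres database. Per this change, writing empty checkpoints into a shared
 * database generates new WAL, which would feed back into replication as
 * further (empty) changes.
 */
async function commitTransaction(
  batch: storage.BucketStorageBatch,
  lsn: string,
  sharedDatabase: boolean // hypothetical flag, e.g. derived from ensureStorageCompatibility()
): Promise<void> {
  // `createEmptyCheckpoints` defaults to true (DEFAULT_BUCKET_BATCH_COMMIT_OPTIONS),
  // so existing callers keep the previous behaviour of always creating a checkpoint.
  const created = await batch.commit(lsn, { createEmptyCheckpoints: !sharedDatabase });
  if (!created) {
    // No persisted operations and empty checkpoints disabled: nothing was written
    // for this LSN, and no new sync checkpoint exists yet.
  }
}
```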