diff --git a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts index 6275676b..9746df10 100644 --- a/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts +++ b/modules/module-postgres/src/api/PostgresRouteAPIAdapter.ts @@ -314,7 +314,8 @@ LEFT JOIN ( AND NOT a.attisdropped AND has_column_privilege(tbl.quoted_name, a.attname, 'SELECT, INSERT, UPDATE, REFERENCES') ) -GROUP BY schemaname, tablename, quoted_name` +GROUP BY schemaname, tablename, quoted_name +ORDER BY schemaname, tablename;` ); await this.typeCache.fetchTypesForSchema(); const rows = pgwire.pgwireRows(results); diff --git a/modules/module-postgres/src/replication/PostgresSnapshotter.ts b/modules/module-postgres/src/replication/PostgresSnapshotter.ts new file mode 100644 index 00000000..bb446bda --- /dev/null +++ b/modules/module-postgres/src/replication/PostgresSnapshotter.ts @@ -0,0 +1,673 @@ +import { + container, + logger as defaultLogger, + Logger, + ReplicationAbortedError, + ReplicationAssertionError +} from '@powersync/lib-services-framework'; +import { + getUuidReplicaIdentityBson, + MetricsEngine, + RelationCache, + SourceEntityDescriptor, + SourceTable, + storage +} from '@powersync/service-core'; +import * as pgwire from '@powersync/service-jpgwire'; +import { + DatabaseInputRow, + SqliteInputRow, + SqliteRow, + SqlSyncRules, + TablePattern, + toSyncRulesRow +} from '@powersync/service-sync-rules'; + +import { ReplicationMetric } from '@powersync/service-types'; +import { PgManager } from './PgManager.js'; +import { + checkSourceConfiguration, + checkTableRls, + ensureStorageCompatibility, + getReplicationIdentityColumns +} from './replication-utils.js'; +import { + ChunkedSnapshotQuery, + IdSnapshotQuery, + PrimaryKeyValue, + SimpleSnapshotQuery, + SnapshotQuery +} from './SnapshotQuery.js'; +import { + MissingReplicationSlotError, + POSTGRES_DEFAULT_SCHEMA, + PUBLICATION_NAME, + sendKeepAlive, + WalStreamOptions, + 
ZERO_LSN +} from './WalStream.js'; +import * as timers from 'node:timers/promises'; +import pDefer, { DeferredPromise } from 'p-defer'; + +interface InitResult { + /** True if initial snapshot is not yet done. */ + needsInitialSync: boolean; + /** True if snapshot must be started from scratch with a new slot. */ + needsNewSlot: boolean; +} + +export class PostgresSnapshotter { + sync_rules: SqlSyncRules; + group_id: number; + + connection_id = 1; + + private logger: Logger; + + private readonly storage: storage.SyncRulesBucketStorage; + private readonly metrics: MetricsEngine; + private readonly slot_name: string; + + private connections: PgManager; + + private abortSignal: AbortSignal; + + private snapshotChunkLength: number; + + private relationCache = new RelationCache((relation: number | SourceTable) => { + if (typeof relation == 'number') { + return relation; + } + return relation.objectId!; + }); + + private queue = new Set(); + private initialSnapshotDone = pDefer(); + + constructor(options: WalStreamOptions) { + this.logger = options.logger ?? defaultLogger; + this.storage = options.storage; + this.metrics = options.metrics; + this.sync_rules = options.storage.getParsedSyncRules({ defaultSchema: POSTGRES_DEFAULT_SCHEMA }); + this.group_id = options.storage.group_id; + this.slot_name = options.storage.slot_name; + this.connections = options.connections; + this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000; + + this.abortSignal = options.abort_signal; + } + + async getQualifiedTableNames( + batch: storage.BucketStorageBatch, + db: pgwire.PgConnection, + tablePattern: TablePattern + ): Promise { + const schema = tablePattern.schema; + if (tablePattern.connectionTag != this.connections.connectionTag) { + return []; + } + + let tableRows: any[]; + const prefix = tablePattern.isWildcard ? 
tablePattern.tablePrefix : undefined; + + { + let query = ` + SELECT + c.oid AS relid, + c.relname AS table_name, + (SELECT + json_agg(DISTINCT a.atttypid) + FROM pg_attribute a + WHERE a.attnum > 0 AND NOT a.attisdropped AND a.attrelid = c.oid) + AS column_types + FROM pg_class c + JOIN pg_namespace n ON n.oid = c.relnamespace + WHERE n.nspname = $1 + AND c.relkind = 'r'`; + + if (tablePattern.isWildcard) { + query += ' AND c.relname LIKE $2'; + } else { + query += ' AND c.relname = $2'; + } + + const result = await db.query({ + statement: query, + params: [ + { type: 'varchar', value: schema }, + { type: 'varchar', value: tablePattern.tablePattern } + ] + }); + + tableRows = pgwire.pgwireRows(result); + } + + let result: storage.SourceTable[] = []; + + for (let row of tableRows) { + const name = row.table_name as string; + if (typeof row.relid != 'bigint') { + throw new ReplicationAssertionError(`Missing relid for ${name}`); + } + const relid = Number(row.relid as bigint); + + if (prefix && !name.startsWith(prefix)) { + continue; + } + + const rs = await db.query({ + statement: `SELECT 1 FROM pg_publication_tables WHERE pubname = $1 AND schemaname = $2 AND tablename = $3`, + params: [ + { type: 'varchar', value: PUBLICATION_NAME }, + { type: 'varchar', value: tablePattern.schema }, + { type: 'varchar', value: name } + ] + }); + if (rs.rows.length == 0) { + this.logger.info(`Skipping ${tablePattern.schema}.${name} - not part of ${PUBLICATION_NAME} publication`); + continue; + } + + try { + const result = await checkTableRls(db, relid); + if (!result.canRead) { + // We log the message, then continue anyway, since the check does not cover all cases. + this.logger.warn(result.message!); + } + } catch (e) { + // It's possible that we just don't have permission to access pg_roles - log the error and continue. 
+ this.logger.warn(`Could not check RLS access for ${tablePattern.schema}.${name}`, e); + } + + const cresult = await getReplicationIdentityColumns(db, relid); + + const columnTypes = (JSON.parse(row.column_types) as string[]).map((e) => Number(e)); + const table = await this.handleRelation({ + batch, + descriptor: { + name, + schema, + objectId: relid, + replicaIdColumns: cresult.replicationColumns + } as SourceEntityDescriptor, + referencedTypeIds: columnTypes + }); + + result.push(table); + } + return result; + } + + async checkSlot(): Promise { + await checkSourceConfiguration(this.connections.pool, PUBLICATION_NAME); + await ensureStorageCompatibility(this.connections.pool, this.storage.factory); + + const slotName = this.slot_name; + + const status = await this.storage.getStatus(); + const snapshotDone = status.snapshot_done && status.checkpoint_lsn != null; + if (snapshotDone) { + // Snapshot is done, but we still need to check the replication slot status + this.logger.info(`Initial replication already done`); + } + + // Check if replication slot exists + const slot = pgwire.pgwireRows( + await this.connections.pool.query({ + // We specifically want wal_status and invalidation_reason, but it's not available on older versions, + // so we just query *. + statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: slotName }] + }) + )[0]; + + // Previously we also used pg_catalog.pg_logical_slot_peek_binary_changes to confirm that we can query the slot. + // However, there were some edge cases where the query times out, repeating the query, ultimately + // causing high load on the source database and never recovering automatically. + // We now instead jump straight to replication if the wal_status is not "lost", rather detecting those + // errors during streaming replication, which is a little more robust. + + // We can have: + // 1. 
needsInitialSync: true, lost slot -> MissingReplicationSlotError (starts new sync rules version). + // Theoretically we could handle this the same as (2). + // 2. needsInitialSync: true, no slot -> create new slot + // 3. needsInitialSync: true, valid slot -> resume initial sync + // 4. needsInitialSync: false, lost slot -> MissingReplicationSlotError (starts new sync rules version) + // 5. needsInitialSync: false, no slot -> MissingReplicationSlotError (starts new sync rules version) + // 6. needsInitialSync: false, valid slot -> resume streaming replication + // The main advantages of MissingReplicationSlotError are: + // 1. If there was a complete snapshot already (cases 4/5), users can still sync from that snapshot while + // we do the reprocessing under a new slot name. + // 2. If there was a partial snapshot (case 1), we can start with the new slot faster by not waiting for + // the partial data to be cleared. + if (slot != null) { + // This checks that the slot is still valid + + // wal_status is present in postgres 13+ + // invalidation_reason is present in postgres 17+ + const lost = slot.wal_status == 'lost'; + if (lost) { + // Case 1 / 4 + throw new MissingReplicationSlotError( + `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 
'unknown'}` + ); + } + // Case 3 / 6 + return { + needsInitialSync: !snapshotDone, + needsNewSlot: false + }; + } else { + if (snapshotDone) { + // Case 5 + // This will create a new slot, while keeping the current sync rules active + throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`); + } + // Case 2 + // This will clear data (if any) and re-create the same slot + return { needsInitialSync: true, needsNewSlot: true }; + } + } + + async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { + const results = await db.query({ + statement: `SELECT reltuples::bigint AS estimate + FROM pg_class + WHERE oid = $1::regclass`, + params: [{ value: table.qualifiedName, type: 'varchar' }] + }); + const row = results.rows[0]; + if ((row?.[0] ?? -1n) == -1n) { + return -1; + } else { + return Number(row[0]); + } + } + + public async setupSlot(db: pgwire.PgConnection, status: InitResult) { + // If anything here errors, the entire replication process is aborted, + // and all connections are closed, including this one. + const slotName = this.slot_name; + + if (status.needsNewSlot) { + // This happens when there is no existing replication slot, or if the + // existing one is unhealthy. + // In those cases, we have to start replication from scratch. + // If there is an existing healthy slot, we can skip this and continue + // initial replication where we left off. + await this.storage.clear({ signal: this.abortSignal }); + + await db.query({ + statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1', + params: [{ type: 'varchar', value: slotName }] + }); + + // We use the replication connection here, not a pool. + // The replication slot must be created before we start snapshotting tables. 
+ const initReplicationConnection = await this.connections.replicationConnection(); + try { + await initReplicationConnection.query(`CREATE_REPLICATION_SLOT ${slotName} LOGICAL pgoutput`); + } finally { + await initReplicationConnection.end(); + } + + this.logger.info(`Created replication slot ${slotName}`); + } + } + + async replicateTable(table: SourceTable) { + const db = await this.connections.snapshotConnection(); + try { + const flushResults = await this.storage.startBatch( + { + logger: this.logger, + zeroLSN: ZERO_LSN, + defaultSchema: POSTGRES_DEFAULT_SCHEMA, + storeCurrentData: true, + skipExistingRows: true + }, + async (batch) => { + await this.snapshotTableInTx(batch, db, table); + // This commit ensures we set keepalive_op. + // It may be better if that is automatically set when flushing. + await batch.commit(ZERO_LSN); + } + ); + this.logger.info(`Flushed snapshot at ${flushResults?.flushed_op}`); + } finally { + await db.end(); + } + } + + async waitForInitialSnapshot() { + await this.initialSnapshotDone.promise; + } + + async replicationLoop() { + try { + if (this.queue.size == 0) { + // Special case where we start with no tables to snapshot + await this.markSnapshotDone(); + } + while (!this.abortSignal.aborted) { + const table = this.queue.values().next().value; + if (table == null) { + this.initialSnapshotDone.resolve(); + await timers.setTimeout(500, { signal: this.abortSignal }); + continue; + } + + await this.replicateTable(table); + this.queue.delete(table); + if (this.queue.size == 0) { + await this.markSnapshotDone(); + } + } + throw new ReplicationAbortedError(`Replication loop aborted`, this.abortSignal.reason); + } catch (e) { + // If initial snapshot already completed, this has no effect + this.initialSnapshotDone.reject(e); + throw e; + } + } + + private async markSnapshotDone() { + const db = await this.connections.snapshotConnection(); + await using _ = { [Symbol.asyncDispose]: () => db.end() }; + + const flushResults = await 
this.storage.startBatch( + { + logger: this.logger, + zeroLSN: ZERO_LSN, + defaultSchema: POSTGRES_DEFAULT_SCHEMA, + storeCurrentData: true, + skipExistingRows: true + }, + async (batch) => { + const rs = await db.query(`select pg_current_wal_lsn() as lsn`); + const globalLsnNotBefore = rs.rows[0][0]; + await batch.markAllSnapshotDone(globalLsnNotBefore); + } + ); + /** + * Send a keepalive message after initial replication. + * In some edge cases we wait for a keepalive after the initial snapshot. + * If we don't explicitly check the contents of keepalive messages then a keepalive is detected + * rather quickly after initial replication - perhaps due to other WAL events. + * If we do explicitly check the contents of messages, we need an actual keepalive payload in order + * to advance the active sync rules LSN. + */ + await sendKeepAlive(db); + + const lastOp = flushResults?.flushed_op; + if (lastOp != null) { + // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. + // TODO: only run this after initial replication, not after each table. + await this.storage.populatePersistentChecksumCache({ + // No checkpoint yet, but we do have the opId. + maxOpId: lastOp, + signal: this.abortSignal + }); + } + } + + /** + * Start initial replication. + * + * If (partial) replication was done before on this slot, this clears the state + * and starts again from scratch. 
+ */ + async queueSnapshotTables(db: pgwire.PgConnection) { + const sourceTables = this.sync_rules.getSourceTables(); + + await this.storage.startBatch( + { + logger: this.logger, + zeroLSN: ZERO_LSN, + defaultSchema: POSTGRES_DEFAULT_SCHEMA, + storeCurrentData: true, + skipExistingRows: true + }, + async (batch) => { + for (let tablePattern of sourceTables) { + const tables = await this.getQualifiedTableNames(batch, db, tablePattern); + // Pre-get counts + for (let table of tables) { + if (table.snapshotComplete) { + this.logger.info(`Skipping ${table.qualifiedName} - snapshot already done`); + continue; + } + const count = await this.estimatedCountNumber(db, table); + table = await batch.updateTableProgress(table, { totalEstimatedCount: count }); + this.relationCache.update(table); + + this.logger.info(`To replicate: ${table.qualifiedName} ${table.formatSnapshotProgress()}`); + + this.queue.add(table); + } + } + } + ); + } + + static *getQueryData(results: Iterable): Generator { + for (let row of results) { + yield toSyncRulesRow(row); + } + } + + public async queueSnapshot(batch: storage.BucketStorageBatch, table: storage.SourceTable) { + await batch.markTableSnapshotRequired(table); + this.queue.add(table); + } + + public async snapshotTableInTx( + batch: storage.BucketStorageBatch, + db: pgwire.PgConnection, + table: storage.SourceTable, + limited?: PrimaryKeyValue[] + ): Promise { + // Note: We use the default "Read Committed" isolation level here, not snapshot isolation. + // The data may change during the transaction, but that is compensated for in the streaming + // replication afterwards. + await db.query('BEGIN'); + try { + let tableLsnNotBefore: string; + await this.snapshotTable(batch, db, table, limited); + + // Get the current LSN. + // The data will only be consistent once incremental replication has passed that point. + // We have to get this LSN _after_ we have finished the table snapshot. 
+ // + // There are basically two relevant LSNs here: + // A: The LSN before the snapshot starts. We don't explicitly record this on the PowerSync side, + // but it is implicitly recorded in the replication slot. + // B: The LSN after the table snapshot is complete, which is what we get here. + // When we do the snapshot queries, the data that we get back for each chunk could match the state + // anywhere between A and B. To actually have a consistent state on our side, we need to: + // 1. Complete the snapshot. + // 2. Wait until logical replication has caught up with all the changes between A and B. + // Calling `markSnapshotDone(LSN B)` covers that. + const rs = await db.query(`select pg_current_wal_lsn() as lsn`); + tableLsnNotBefore = rs.rows[0][0]; + // Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction. + await db.query('COMMIT'); + this.logger.info(`Snapshot complete for table ${table.qualifiedName}, resume at ${tableLsnNotBefore}`); + const [resultTable] = await batch.markTableSnapshotDone([table], tableLsnNotBefore); + this.relationCache.update(resultTable); + return resultTable; + } catch (e) { + await db.query('ROLLBACK'); + throw e; + } + } + + private async snapshotTable( + batch: storage.BucketStorageBatch, + db: pgwire.PgConnection, + table: storage.SourceTable, + limited?: PrimaryKeyValue[] + ) { + let totalEstimatedCount = table.snapshotStatus?.totalEstimatedCount; + let at = table.snapshotStatus?.replicatedCount ?? 0; + let lastCountTime = 0; + let q: SnapshotQuery; + // We do streaming on two levels: + // 1. Coarse level: DECLARE CURSOR, FETCH 10000 at a time. + // 2. Fine level: Stream chunks from each fetch call. 
+ if (limited) { + q = new IdSnapshotQuery(db, table, limited); + } else if (ChunkedSnapshotQuery.supports(table)) { + // Single primary key - we can use the primary key for chunking + const orderByKey = table.replicaIdColumns[0]; + q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkLength, table.snapshotStatus?.lastKey ?? null); + if (table.snapshotStatus?.lastKey != null) { + this.logger.info( + `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming from ${orderByKey.name} > ${(q as ChunkedSnapshotQuery).lastKey}` + ); + } else { + this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resumable`); + } + } else { + // Fallback case - query the entire table + this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - not resumable`); + q = new SimpleSnapshotQuery(db, table, this.snapshotChunkLength); + at = 0; + } + await q.initialize(); + + let columns: { i: number; name: string }[] = []; + let columnMap: Record = {}; + let hasRemainingData = true; + while (hasRemainingData) { + // Fetch 10k at a time. + // The balance here is between latency overhead per FETCH call, + // and not spending too much time on each FETCH call. + // We aim for a couple of seconds on each FETCH call. + const cursor = q.nextChunk(); + hasRemainingData = false; + // pgwire streams rows in chunks. + // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. + // There are typically 100-200 rows per chunk. + for await (let chunk of cursor) { + if (chunk.tag == 'RowDescription') { + // We get a RowDescription for each FETCH call, but they should + // all be the same. 
+ let i = 0; + columns = chunk.payload.map((c) => { + return { i: i++, name: c.name }; + }); + for (let column of chunk.payload) { + columnMap[column.name] = column.typeOid; + } + continue; + } + + const rows = chunk.rows.map((row) => { + let q: DatabaseInputRow = {}; + for (let c of columns) { + q[c.name] = row[c.i]; + } + return q; + }); + if (rows.length > 0) { + hasRemainingData = true; + } + + for (const inputRecord of PostgresSnapshotter.getQueryData(rows)) { + const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord)); + // This auto-flushes when the batch reaches its size limit + await batch.save({ + tag: storage.SaveOperationTag.INSERT, + sourceTable: table, + before: undefined, + beforeReplicaId: undefined, + after: record, + afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) + }); + } + + at += rows.length; + this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length); + + this.touch(); + } + + // Important: flush before marking progress + await batch.flush(); + if (limited == null) { + let lastKey: Uint8Array | undefined; + if (q instanceof ChunkedSnapshotQuery) { + lastKey = q.getLastKeySerialized(); + } + if (lastCountTime < performance.now() - 10 * 60 * 1000) { + // Even though we're doing the snapshot inside a transaction, the transaction uses + // the default "Read Committed" isolation level. This means we can get new data + // within the transaction, so we re-estimate the count every 10 minutes when replicating + // large tables. 
+ totalEstimatedCount = await this.estimatedCountNumber(db, table); + lastCountTime = performance.now(); + } + table = await batch.updateTableProgress(table, { + lastKey: lastKey, + replicatedCount: at, + totalEstimatedCount: totalEstimatedCount + }); + this.relationCache.update(table); + + this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`); + } else { + this.logger.info(`Replicating ${table.qualifiedName} ${at}/${limited.length} for resnapshot`); + } + + if (this.abortSignal.aborted) { + // We only abort after flushing + throw new ReplicationAbortedError(`Table snapshot interrupted`, this.abortSignal.reason); + } + } + } + + async handleRelation(options: { + batch: storage.BucketStorageBatch; + descriptor: SourceEntityDescriptor; + referencedTypeIds: number[]; + }) { + const { batch, descriptor, referencedTypeIds } = options; + + if (!descriptor.objectId && typeof descriptor.objectId != 'number') { + throw new ReplicationAssertionError(`objectId expected, got ${typeof descriptor.objectId}`); + } + const result = await this.storage.resolveTable({ + group_id: this.group_id, + connection_id: this.connection_id, + connection_tag: this.connections.connectionTag, + entity_descriptor: descriptor, + sync_rules: this.sync_rules + }); + this.relationCache.update(result.table); + + // Drop conflicting tables. This includes for example renamed tables. + await batch.drop(result.dropTables); + + // Ensure we have a description for custom types referenced in the table. 
+ await this.connections.types.fetchTypes(referencedTypeIds); + + return result.table; + } + + private touch() { + container.probes.touch().catch((e) => { + this.logger.error(`Error touching probe`, e); + }); + } + + private syncRulesRecord(row: SqliteInputRow): SqliteRow; + private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined; + + private syncRulesRecord(row: SqliteInputRow | undefined): SqliteRow | undefined { + if (row == null) { + return undefined; + } + return this.sync_rules.applyRowContext(row); + } +} diff --git a/modules/module-postgres/src/replication/WalStream.ts b/modules/module-postgres/src/replication/WalStream.ts index 84da6200..9e3426d6 100644 --- a/modules/module-postgres/src/replication/WalStream.ts +++ b/modules/module-postgres/src/replication/WalStream.ts @@ -1,9 +1,7 @@ import * as lib_postgres from '@powersync/lib-service-postgres'; import { container, - DatabaseConnectionError, logger as defaultLogger, - ErrorCode, Logger, ReplicationAbortedError, ReplicationAssertionError @@ -22,28 +20,19 @@ import * as pgwire from '@powersync/service-jpgwire'; import { applyValueContext, CompatibilityContext, - DatabaseInputRow, SqliteInputRow, SqliteInputValue, SqliteRow, SqlSyncRules, - TablePattern, - ToastableSqliteRow, - toSyncRulesRow + ToastableSqliteRow } from '@powersync/service-sync-rules'; import { ReplicationMetric } from '@powersync/service-types'; import { PgManager } from './PgManager.js'; import { getPgOutputRelation, getRelId, referencedColumnTypeIds } from './PgRelation.js'; -import { checkSourceConfiguration, checkTableRls, getReplicationIdentityColumns } from './replication-utils.js'; -import { - ChunkedSnapshotQuery, - IdSnapshotQuery, - MissingRow, - PrimaryKeyValue, - SimpleSnapshotQuery, - SnapshotQuery -} from './SnapshotQuery.js'; +import { PostgresSnapshotter } from './PostgresSnapshotter.js'; +import { ensureStorageCompatibility } from './replication-utils.js'; +import { IdSnapshotQuery, MissingRow, 
PrimaryKeyValue } from './SnapshotQuery.js'; export interface WalStreamOptions { logger?: Logger; @@ -62,13 +51,6 @@ export interface WalStreamOptions { snapshotChunkLength?: number; } -interface InitResult { - /** True if initial snapshot is not yet done. */ - needsInitialSync: boolean; - /** True if snapshot must be started from scratch with a new slot. */ - needsNewSlot: boolean; -} - export const ZERO_LSN = '00000000/00000000'; export const PUBLICATION_NAME = 'powersync'; export const POSTGRES_DEFAULT_SCHEMA = 'public'; @@ -120,7 +102,11 @@ export class WalStream { private connections: PgManager; - private abort_signal: AbortSignal; + private abortController = new AbortController(); + private abortSignal: AbortSignal = this.abortController.signal; + + private initPromise: Promise | null = null; + private snapshotter: PostgresSnapshotter; private relationCache = new RelationCache((relation: number | SourceTable) => { if (typeof relation == 'number') { @@ -131,8 +117,6 @@ export class WalStream { private startedStreaming = false; - private snapshotChunkLength: number; - /** * Time of the oldest uncommitted change, according to the source db. * This is used to determine the replication lag. @@ -144,9 +128,7 @@ export class WalStream { */ private isStartingReplication = true; - private initialSnapshotPromise: Promise | null = null; - - constructor(options: WalStreamOptions) { + constructor(private options: WalStreamOptions) { this.logger = options.logger ?? defaultLogger; this.storage = options.storage; this.metrics = options.metrics; @@ -154,10 +136,17 @@ export class WalStream { this.group_id = options.storage.group_id; this.slot_name = options.storage.slot_name; this.connections = options.connections; - this.snapshotChunkLength = options.snapshotChunkLength ?? 10_000; - this.abort_signal = options.abort_signal; - this.abort_signal.addEventListener( + // We wrap in our own abort controller so we can trigger abort internally. 
+ options.abort_signal.addEventListener('abort', () => { + this.abortController.abort(options.abort_signal.reason); + }); + if (options.abort_signal.aborted) { + this.abortController.abort(options.abort_signal.reason); + } + + this.snapshotter = new PostgresSnapshotter({ ...options, abort_signal: this.abortSignal }); + this.abortSignal.addEventListener( 'abort', () => { if (this.startedStreaming) { @@ -179,470 +168,7 @@ export class WalStream { } get stopped() { - return this.abort_signal.aborted; - } - - async getQualifiedTableNames( - batch: storage.BucketStorageBatch, - db: pgwire.PgConnection, - tablePattern: TablePattern - ): Promise { - const schema = tablePattern.schema; - if (tablePattern.connectionTag != this.connections.connectionTag) { - return []; - } - - let tableRows: any[]; - const prefix = tablePattern.isWildcard ? tablePattern.tablePrefix : undefined; - - { - let query = ` - SELECT - c.oid AS relid, - c.relname AS table_name, - (SELECT - json_agg(DISTINCT a.atttypid) - FROM pg_attribute a - WHERE a.attnum > 0 AND NOT a.attisdropped AND a.attrelid = c.oid) - AS column_types - FROM pg_class c - JOIN pg_namespace n ON n.oid = c.relnamespace - WHERE n.nspname = $1 - AND c.relkind = 'r'`; - - if (tablePattern.isWildcard) { - query += ' AND c.relname LIKE $2'; - } else { - query += ' AND c.relname = $2'; - } - - const result = await db.query({ - statement: query, - params: [ - { type: 'varchar', value: schema }, - { type: 'varchar', value: tablePattern.tablePattern } - ] - }); - - tableRows = pgwire.pgwireRows(result); - } - - let result: storage.SourceTable[] = []; - - for (let row of tableRows) { - const name = row.table_name as string; - if (typeof row.relid != 'bigint') { - throw new ReplicationAssertionError(`Missing relid for ${name}`); - } - const relid = Number(row.relid as bigint); - - if (prefix && !name.startsWith(prefix)) { - continue; - } - - const rs = await db.query({ - statement: `SELECT 1 FROM pg_publication_tables WHERE pubname = $1 AND 
schemaname = $2 AND tablename = $3`, - params: [ - { type: 'varchar', value: PUBLICATION_NAME }, - { type: 'varchar', value: tablePattern.schema }, - { type: 'varchar', value: name } - ] - }); - if (rs.rows.length == 0) { - this.logger.info(`Skipping ${tablePattern.schema}.${name} - not part of ${PUBLICATION_NAME} publication`); - continue; - } - - try { - const result = await checkTableRls(db, relid); - if (!result.canRead) { - // We log the message, then continue anyway, since the check does not cover all cases. - this.logger.warn(result.message!); - } - } catch (e) { - // It's possible that we just don't have permission to access pg_roles - log the error and continue. - this.logger.warn(`Could not check RLS access for ${tablePattern.schema}.${name}`, e); - } - - const cresult = await getReplicationIdentityColumns(db, relid); - - const columnTypes = (JSON.parse(row.column_types) as string[]).map((e) => Number(e)); - const table = await this.handleRelation({ - batch, - descriptor: { - name, - schema, - objectId: relid, - replicaIdColumns: cresult.replicationColumns - } as SourceEntityDescriptor, - snapshot: false, - referencedTypeIds: columnTypes - }); - - result.push(table); - } - return result; - } - - async initSlot(): Promise { - await checkSourceConfiguration(this.connections.pool, PUBLICATION_NAME); - await this.ensureStorageCompatibility(); - - const slotName = this.slot_name; - - const status = await this.storage.getStatus(); - const snapshotDone = status.snapshot_done && status.checkpoint_lsn != null; - if (snapshotDone) { - // Snapshot is done, but we still need to check the replication slot status - this.logger.info(`Initial replication already done`); - } - - // Check if replication slot exists - const slot = pgwire.pgwireRows( - await this.connections.pool.query({ - // We specifically want wal_status and invalidation_reason, but it's not available on older versions, - // so we just query *. 
- statement: 'SELECT * FROM pg_replication_slots WHERE slot_name = $1', - params: [{ type: 'varchar', value: slotName }] - }) - )[0]; - - // Previously we also used pg_catalog.pg_logical_slot_peek_binary_changes to confirm that we can query the slot. - // However, there were some edge cases where the query times out, repeating the query, ultimately - // causing high load on the source database and never recovering automatically. - // We now instead jump straight to replication if the wal_status is not "lost", rather detecting those - // errors during streaming replication, which is a little more robust. - - // We can have: - // 1. needsInitialSync: true, lost slot -> MissingReplicationSlotError (starts new sync rules version). - // Theoretically we could handle this the same as (2). - // 2. needsInitialSync: true, no slot -> create new slot - // 3. needsInitialSync: true, valid slot -> resume initial sync - // 4. needsInitialSync: false, lost slot -> MissingReplicationSlotError (starts new sync rules version) - // 5. needsInitialSync: false, no slot -> MissingReplicationSlotError (starts new sync rules version) - // 6. needsInitialSync: false, valid slot -> resume streaming replication - // The main advantage of MissingReplicationSlotError are: - // 1. If there was a complete snapshot already (cases 4/5), users can still sync from that snapshot while - // we do the reprocessing under a new slot name. - // 2. If there was a partial snapshot (case 1), we can start with the new slot faster by not waiting for - // the partial data to be cleared. - if (slot != null) { - // This checks that the slot is still valid - - // wal_status is present in postgres 13+ - // invalidation_reason is present in postgres 17+ - const lost = slot.wal_status == 'lost'; - if (lost) { - // Case 1 / 4 - throw new MissingReplicationSlotError( - `Replication slot ${slotName} is not valid anymore. invalidation_reason: ${slot.invalidation_reason ?? 
'unknown'}` - ); - } - // Case 3 / 6 - return { - needsInitialSync: !snapshotDone, - needsNewSlot: false - }; - } else { - if (snapshotDone) { - // Case 5 - // This will create a new slot, while keeping the current sync rules active - throw new MissingReplicationSlotError(`Replication slot ${slotName} is missing`); - } - // Case 2 - // This will clear data (if any) and re-create the same slot - return { needsInitialSync: true, needsNewSlot: true }; - } - } - - async estimatedCountNumber(db: pgwire.PgConnection, table: storage.SourceTable): Promise { - const results = await db.query({ - statement: `SELECT reltuples::bigint AS estimate -FROM pg_class -WHERE oid = $1::regclass`, - params: [{ value: table.qualifiedName, type: 'varchar' }] - }); - const row = results.rows[0]; - if ((row?.[0] ?? -1n) == -1n) { - return -1; - } else { - return Number(row[0]); - } - } - - /** - * Start initial replication. - * - * If (partial) replication was done before on this slot, this clears the state - * and starts again from scratch. - */ - async startInitialReplication(replicationConnection: pgwire.PgConnection, status: InitResult) { - // If anything here errors, the entire replication process is aborted, - // and all connections are closed, including this one. - const db = await this.connections.snapshotConnection(); - - const slotName = this.slot_name; - - if (status.needsNewSlot) { - // This happens when there is no existing replication slot, or if the - // existing one is unhealthy. - // In those cases, we have to start replication from scratch. - // If there is an existing healthy slot, we can skip this and continue - // initial replication where we left off. - await this.storage.clear({ signal: this.abort_signal }); - - await db.query({ - statement: 'SELECT pg_drop_replication_slot(slot_name) FROM pg_replication_slots WHERE slot_name = $1', - params: [{ type: 'varchar', value: slotName }] - }); - - // We use the replication connection here, not a pool. 
- // The replication slot must be created before we start snapshotting tables. - await replicationConnection.query(`CREATE_REPLICATION_SLOT ${slotName} LOGICAL pgoutput`); - - this.logger.info(`Created replication slot ${slotName}`); - } - - await this.initialReplication(db); - } - - async initialReplication(db: pgwire.PgConnection) { - const sourceTables = this.sync_rules.getSourceTables(); - const flushResults = await this.storage.startBatch( - { - logger: this.logger, - zeroLSN: ZERO_LSN, - defaultSchema: POSTGRES_DEFAULT_SCHEMA, - storeCurrentData: true, - skipExistingRows: true - }, - async (batch) => { - let tablesWithStatus: SourceTable[] = []; - for (let tablePattern of sourceTables) { - const tables = await this.getQualifiedTableNames(batch, db, tablePattern); - // Pre-get counts - for (let table of tables) { - if (table.snapshotComplete) { - this.logger.info(`Skipping ${table.qualifiedName} - snapshot already done`); - continue; - } - const count = await this.estimatedCountNumber(db, table); - table = await batch.updateTableProgress(table, { totalEstimatedCount: count }); - this.relationCache.update(table); - tablesWithStatus.push(table); - - this.logger.info(`To replicate: ${table.qualifiedName} ${table.formatSnapshotProgress()}`); - } - } - - for (let table of tablesWithStatus) { - await this.snapshotTableInTx(batch, db, table); - this.touch(); - } - - // Always commit the initial snapshot at zero. - // This makes sure we don't skip any changes applied before starting this snapshot, - // in the case of snapshot retries. - // We could alternatively commit at the replication slot LSN. - - // Get the current LSN for hte snapshot. - // We could also use the LSN from the last table snapshto. - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - const noCommitBefore = rs.rows[0][0]; - - await batch.markAllSnapshotDone(noCommitBefore); - await batch.commit(ZERO_LSN); - } - ); - /** - * Send a keepalive message after initial replication. 
- * In some edge cases we wait for a keepalive after the initial snapshot. - * If we don't explicitly check the contents of keepalive messages then a keepalive is detected - * rather quickly after initial replication - perhaps due to other WAL events. - * If we do explicitly check the contents of messages, we need an actual keepalive payload in order - * to advance the active sync rules LSN. - */ - await sendKeepAlive(db); - - const lastOp = flushResults?.flushed_op; - if (lastOp != null) { - // Populate the cache _after_ initial replication, but _before_ we switch to this sync rules. - await this.storage.populatePersistentChecksumCache({ - // No checkpoint yet, but we do have the opId. - maxOpId: lastOp, - signal: this.abort_signal - }); - } - } - - static *getQueryData(results: Iterable): Generator { - for (let row of results) { - yield toSyncRulesRow(row); - } - } - private async snapshotTableInTx( - batch: storage.BucketStorageBatch, - db: pgwire.PgConnection, - table: storage.SourceTable, - limited?: PrimaryKeyValue[] - ): Promise { - // Note: We use the default "Read Committed" isolation level here, not snapshot isolation. - // The data may change during the transaction, but that is compensated for in the streaming - // replication afterwards. - await db.query('BEGIN'); - try { - await this.snapshotTable(batch, db, table, limited); - - // Get the current LSN. - // The data will only be consistent once incremental replication has passed that point. - // We have to get this LSN _after_ we have finished the table snapshot. - // - // There are basically two relevant LSNs here: - // A: The LSN before the snapshot starts. We don't explicitly record this on the PowerSync side, - // but it is implicitly recorded in the replication slot. - // B: The LSN after the table snapshot is complete, which is what we get here. - // When we do the snapshot queries, the data that we get back for each chunk could match the state - // anywhere between A and B. 
To actually have a consistent state on our side, we need to: - // 1. Complete the snapshot. - // 2. Wait until logical replication has caught up with all the change between A and B. - // Calling `markSnapshotDone(LSN B)` covers that. - const rs = await db.query(`select pg_current_wal_lsn() as lsn`); - const tableLsnNotBefore = rs.rows[0][0]; - - // Side note: A ROLLBACK would probably also be fine here, since we only read in this transaction. - await db.query('COMMIT'); - const [resultTable] = await batch.markTableSnapshotDone([table], tableLsnNotBefore); - this.relationCache.update(resultTable); - return resultTable; - } catch (e) { - await db.query('ROLLBACK'); - throw e; - } - } - - private async snapshotTable( - batch: storage.BucketStorageBatch, - db: pgwire.PgConnection, - table: storage.SourceTable, - limited?: PrimaryKeyValue[] - ) { - let totalEstimatedCount = table.snapshotStatus?.totalEstimatedCount; - let at = table.snapshotStatus?.replicatedCount ?? 0; - let lastCountTime = 0; - let q: SnapshotQuery; - // We do streaming on two levels: - // 1. Coarse level: DELCARE CURSOR, FETCH 10000 at a time. - // 2. Fine level: Stream chunks from each fetch call. - if (limited) { - q = new IdSnapshotQuery(db, table, limited); - } else if (ChunkedSnapshotQuery.supports(table)) { - // Single primary key - we can use the primary key for chunking - const orderByKey = table.replicaIdColumns[0]; - q = new ChunkedSnapshotQuery(db, table, this.snapshotChunkLength, table.snapshotStatus?.lastKey ?? 
null); - if (table.snapshotStatus?.lastKey != null) { - this.logger.info( - `Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resuming from ${orderByKey.name} > ${(q as ChunkedSnapshotQuery).lastKey}` - ); - } else { - this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - resumable`); - } - } else { - // Fallback case - query the entire table - this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()} - not resumable`); - q = new SimpleSnapshotQuery(db, table, this.snapshotChunkLength); - at = 0; - } - await q.initialize(); - - let columns: { i: number; name: string }[] = []; - let columnMap: Record = {}; - let hasRemainingData = true; - while (hasRemainingData) { - // Fetch 10k at a time. - // The balance here is between latency overhead per FETCH call, - // and not spending too much time on each FETCH call. - // We aim for a couple of seconds on each FETCH call. - const cursor = q.nextChunk(); - hasRemainingData = false; - // pgwire streams rows in chunks. - // These chunks can be quite small (as little as 16KB), so we don't flush chunks automatically. - // There are typically 100-200 rows per chunk. - for await (let chunk of cursor) { - if (chunk.tag == 'RowDescription') { - // We get a RowDescription for each FETCH call, but they should - // all be the same. 
- let i = 0; - columns = chunk.payload.map((c) => { - return { i: i++, name: c.name }; - }); - for (let column of chunk.payload) { - columnMap[column.name] = column.typeOid; - } - continue; - } - - const rows = chunk.rows.map((row) => { - let q: DatabaseInputRow = {}; - for (let c of columns) { - q[c.name] = row[c.i]; - } - return q; - }); - if (rows.length > 0) { - hasRemainingData = true; - } - - for (const inputRecord of WalStream.getQueryData(rows)) { - const record = this.syncRulesRecord(this.connections.types.constructRowRecord(columnMap, inputRecord)); - // This auto-flushes when the batch reaches its size limit - await batch.save({ - tag: storage.SaveOperationTag.INSERT, - sourceTable: table, - before: undefined, - beforeReplicaId: undefined, - after: record, - afterReplicaId: getUuidReplicaIdentityBson(record, table.replicaIdColumns) - }); - } - - at += rows.length; - this.metrics.getCounter(ReplicationMetric.ROWS_REPLICATED).add(rows.length); - - this.touch(); - } - - // Important: flush before marking progress - await batch.flush(); - if (limited == null) { - let lastKey: Uint8Array | undefined; - if (q instanceof ChunkedSnapshotQuery) { - lastKey = q.getLastKeySerialized(); - } - if (lastCountTime < performance.now() - 10 * 60 * 1000) { - // Even though we're doing the snapshot inside a transaction, the transaction uses - // the default "Read Committed" isolation level. This means we can get new data - // within the transaction, so we re-estimate the count every 10 minutes when replicating - // large tables. 
- totalEstimatedCount = await this.estimatedCountNumber(db, table); - lastCountTime = performance.now(); - } - table = await batch.updateTableProgress(table, { - lastKey: lastKey, - replicatedCount: at, - totalEstimatedCount: totalEstimatedCount - }); - this.relationCache.update(table); - - this.logger.info(`Replicating ${table.qualifiedName} ${table.formatSnapshotProgress()}`); - } else { - this.logger.info(`Replicating ${table.qualifiedName} ${at}/${limited.length} for resnapshot`); - } - - if (this.abort_signal.aborted) { - // We only abort after flushing - throw new ReplicationAbortedError(`Initial replication interrupted`); - } - } + return this.abortSignal.aborted; } async handleRelation(options: { @@ -666,7 +192,10 @@ WHERE oid = $1::regclass`, this.relationCache.update(result.table); // Drop conflicting tables. This includes for example renamed tables. - await batch.drop(result.dropTables); + if (result.dropTables.length > 0) { + this.logger.info(`Dropping conflicting tables: ${result.dropTables.map((t) => t.qualifiedName).join(', ')}`); + await batch.drop(result.dropTables); + } // Ensure we have a description for custom types referenced in the table. await this.connections.types.fetchTypes(referencedTypeIds); @@ -678,22 +207,8 @@ WHERE oid = $1::regclass`, const shouldSnapshot = snapshot && !result.table.snapshotComplete && result.table.syncAny; if (shouldSnapshot) { - // Truncate this table, in case a previous snapshot was interrupted. - await batch.truncate([result.table]); - - // Start the snapshot inside a transaction. - // We use a dedicated connection for this. - const db = await this.connections.snapshotConnection(); - try { - const table = await this.snapshotTableInTx(batch, db, result.table); - // After the table snapshot, we wait for replication to catch up. - // To make sure there is actually something to replicate, we send a keepalive - // message. 
- await sendKeepAlive(db); - return table; - } finally { - await db.end(); - } + this.logger.info(`Queuing snapshot for new table ${result.table.qualifiedName}`); + await this.snapshotter.queueSnapshot(batch, result.table); } return result.table; @@ -720,7 +235,7 @@ WHERE oid = $1::regclass`, try { for (let rows of byTable.values()) { const table = rows[0].table; - await this.snapshotTableInTx( + await this.snapshotter.snapshotTableInTx( batch, db, table, @@ -822,73 +337,116 @@ WHERE oid = $1::regclass`, return null; } + /** + * Start replication loop, and continue until aborted or error. + */ async replicate() { + let streamPromise: Promise | null = null; + let loopPromise: Promise | null = null; try { - // If anything errors here, the entire replication process is halted, and - // all connections automatically closed, including this one. - this.initialSnapshotPromise = (async () => { - const initReplicationConnection = await this.connections.replicationConnection(); - await this.initReplication(initReplicationConnection); - await initReplicationConnection.end(); - })(); - - await this.initialSnapshotPromise; - - // At this point, the above connection has often timed out, so we start a new one - const streamReplicationConnection = await this.connections.replicationConnection(); - await this.streamChanges(streamReplicationConnection); - await streamReplicationConnection.end(); + this.initPromise = this.initReplication(); + await this.initPromise; + // These Promises are both expected to run until aborted or error. 
+ streamPromise = this.streamChanges() + .then(() => { + throw new ReplicationAssertionError(`Replication stream exited unexpectedly`); + }) + .catch((e) => { + this.abortController.abort(e); + throw e; + }); + loopPromise = this.snapshotter + .replicationLoop() + .then(() => { + throw new ReplicationAssertionError(`Replication snapshotter exited unexpectedly`); + }) + .catch((e) => { + this.abortController.abort(e); + throw e; + }); + const results = await Promise.allSettled([loopPromise, streamPromise]); + // First, prioritize non-aborted errors + for (let result of results) { + if (result.status == 'rejected' && !(result.reason instanceof ReplicationAbortedError)) { + throw result.reason; + } + } + // Then include aborted errors + for (let result of results) { + if (result.status == 'rejected') { + throw result.reason; + } + } + + // If we get here, both Promises completed successfully, which is unexpected. + throw new ReplicationAssertionError(`Replication loop exited unexpectedly`); } catch (e) { await this.storage.reportError(e); throw e; + } finally { + // Just to make sure + this.abortController.abort(); } } /** - * After calling replicate(), call this to wait for the initial snapshot to complete. - * - * For tests only. + * For tests: Wait until the initial snapshot is complete. 
*/ - async waitForInitialSnapshot() { - if (this.initialSnapshotPromise == null) { - throw new ReplicationAssertionError(`Initial snapshot not started yet`); + public async waitForInitialSnapshot() { + if (this.initPromise == null) { + throw new ReplicationAssertionError('replicate() must be called before waitForInitialSnapshot()'); } - return this.initialSnapshotPromise; + await this.initPromise; + + await this.snapshotter.waitForInitialSnapshot(); } - async initReplication(replicationConnection: pgwire.PgConnection) { - const result = await this.initSlot(); - if (result.needsInitialSync) { - await this.startInitialReplication(replicationConnection, result); + /** + * Initialize replication. + * Start replication loop, and continue until aborted, error or initial snapshot completed. + */ + private async initReplication() { + const result = await this.snapshotter.checkSlot(); + const db = await this.connections.snapshotConnection(); + try { + await this.snapshotter.setupSlot(db, result); + if (result.needsInitialSync) { + await this.snapshotter.queueSnapshotTables(db); + } + } finally { + await db.end(); } } - async streamChanges(replicationConnection: pgwire.PgConnection) { + private async streamChanges() { + const streamReplicationConnection = await this.connections.replicationConnection(); try { - await this.streamChangesInternal(replicationConnection); + await this.streamChangesInternal(streamReplicationConnection); } catch (e) { if (isReplicationSlotInvalidError(e)) { throw new MissingReplicationSlotError(e.message, e); } throw e; + } finally { + await streamReplicationConnection.end(); } } private async streamChangesInternal(replicationConnection: pgwire.PgConnection) { // When changing any logic here, check /docs/wal-lsns.md. - const { createEmptyCheckpoints } = await this.ensureStorageCompatibility(); + + // Viewing the contents of logical messages emitted with `pg_logical_emit_message` + // is only supported on Postgres >= 14.0. 
+ // https://www.postgresql.org/docs/14/protocol-logical-replication.html + const { createEmptyCheckpoints, exposesLogicalMessages } = await ensureStorageCompatibility( + this.connections.pool, + this.storage.factory + ); const replicationOptions: Record = { proto_version: '1', publication_names: PUBLICATION_NAME }; - - /** - * Viewing the contents of logical messages emitted with `pg_logical_emit_message` - * is only supported on Postgres >= 14.0. - * https://www.postgresql.org/docs/14/protocol-logical-replication.html - */ - const exposesLogicalMessages = await this.checkLogicalMessageSupport(); if (exposesLogicalMessages) { /** * Only add this option if the Postgres server supports it. @@ -949,7 +507,7 @@ WHERE oid = $1::regclass`, for await (const chunk of replicationStream.pgoutputDecode()) { this.touch(); - if (this.abort_signal.aborted) { + if (this.abortSignal.aborted) { break; } @@ -1044,6 +602,7 @@ WHERE oid = $1::regclass`, // Big caveat: This _must not_ be used to skip individual messages, since this LSN // may be in the middle of the next transaction. // It must only be used to associate checkpoints with LSNs. + const didCommit = await batch.keepalive(chunkLastLsn); if (didCommit) { this.oldestUncommittedChange = null; @@ -1061,6 +620,8 @@ WHERE oid = $1::regclass`, } } ); + + throw new ReplicationAbortedError(`Replication stream aborted`, this.abortSignal.reason); } async ack(lsn: string, replicationStream: pgwire.ReplicationStream) { @@ -1071,55 +632,6 @@ WHERE oid = $1::regclass`, replicationStream.ack(lsn); } - /** - * Ensures that the storage is compatible with the replication connection. - * @throws {DatabaseConnectionError} If the storage is not compatible with the replication connection. 
- */ - protected async ensureStorageCompatibility(): Promise { - const supportsLogicalMessages = await this.checkLogicalMessageSupport(); - - const storageIdentifier = await this.storage.factory.getSystemIdentifier(); - if (storageIdentifier.type != lib_postgres.POSTGRES_CONNECTION_TYPE) { - return { - // Keep the same behaviour as before allowing Postgres storage. - createEmptyCheckpoints: true, - oldestUncommittedChange: null - }; - } - - const parsedStorageIdentifier = lib_postgres.utils.decodePostgresSystemIdentifier(storageIdentifier.id); - /** - * Check if the same server is being used for both the sync bucket storage and the logical replication. - */ - const replicationIdentifier = await lib_postgres.utils.queryPostgresSystemIdentifier(this.connections.pool); - - if (!supportsLogicalMessages && replicationIdentifier.server_id == parsedStorageIdentifier.server_id) { - throw new DatabaseConnectionError( - ErrorCode.PSYNC_S1144, - `Separate Postgres servers are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, - new Error('Postgres version is below 14') - ); - } - - return { - /** - * Don't create empty checkpoints if the same Postgres database is used for the data source - * and sync bucket storage. Creating empty checkpoints will cause WAL feedback loops. - */ - createEmptyCheckpoints: replicationIdentifier.database_name != parsedStorageIdentifier.database_name, - oldestUncommittedChange: null - }; - } - - /** - * Check if the replication connection Postgres server supports - * viewing the contents of logical replication messages. - */ - protected async checkLogicalMessageSupport() { - const version = await this.connections.getServerVersion(); - return version ? 
version.compareMain('14.0.0') >= 0 : false; - } - async getReplicationLagMillis(): Promise { if (this.oldestUncommittedChange == null) { if (this.isStartingReplication) { diff --git a/modules/module-postgres/src/replication/replication-utils.ts b/modules/module-postgres/src/replication/replication-utils.ts index 893f2ba8..48f62722 100644 --- a/modules/module-postgres/src/replication/replication-utils.ts +++ b/modules/module-postgres/src/replication/replication-utils.ts @@ -1,11 +1,18 @@ import * as pgwire from '@powersync/service-jpgwire'; import * as lib_postgres from '@powersync/lib-service-postgres'; -import { ErrorCode, logger, ServiceAssertionError, ServiceError } from '@powersync/lib-services-framework'; -import { PatternResult, storage } from '@powersync/service-core'; +import { + DatabaseConnectionError, + ErrorCode, + logger, + ServiceAssertionError, + ServiceError +} from '@powersync/lib-services-framework'; +import { BucketStorageFactory, PatternResult, storage } from '@powersync/service-core'; import * as sync_rules from '@powersync/service-sync-rules'; import * as service_types from '@powersync/service-types'; import { ReplicationIdentity } from './PgRelation.js'; +import { getServerVersion } from '../utils/postgres_version.js'; export interface ReplicaIdentityResult { replicationColumns: storage.ColumnDescriptor[]; @@ -396,3 +403,57 @@ export async function cleanUpReplicationSlot(slotName: string, db: pgwire.PgClie params: [{ type: 'varchar', value: slotName }] }); } + +/** + * Ensures that the storage is compatible with the replication connection. + * @throws {DatabaseConnectionError} If the storage is not compatible with the replication connection. 
+ */ +export async function ensureStorageCompatibility( + db: pgwire.PgClient, + factory: BucketStorageFactory +): Promise { + const supportsLogicalMessages = await checkLogicalMessageSupport(db); + + const storageIdentifier = await factory.getSystemIdentifier(); + if (storageIdentifier.type != lib_postgres.POSTGRES_CONNECTION_TYPE) { + return { + // Keep the same behaviour as before allowing Postgres storage. + createEmptyCheckpoints: true, + oldestUncommittedChange: null, + exposesLogicalMessages: supportsLogicalMessages + }; + } + + const parsedStorageIdentifier = lib_postgres.utils.decodePostgresSystemIdentifier(storageIdentifier.id); + /** + * Check if the same server is being used for both the sync bucket storage and the logical replication. + */ + const replicationIdentifier = await lib_postgres.utils.queryPostgresSystemIdentifier(db); + + if (!supportsLogicalMessages && replicationIdentifier.server_id == parsedStorageIdentifier.server_id) { + throw new DatabaseConnectionError( + ErrorCode.PSYNC_S1144, + `Separate Postgres servers are required for the replication source and sync bucket storage when using Postgres versions below 14.0.`, + new Error('Postgres version is below 14') + ); + } + + return { + /** + * Don't create empty checkpoints if the same Postgres database is used for the data source + * and sync bucket storage. Creating empty checkpoints will cause WAL feedback loops. + */ + createEmptyCheckpoints: replicationIdentifier.database_name != parsedStorageIdentifier.database_name, + oldestUncommittedChange: null, + exposesLogicalMessages: supportsLogicalMessages + }; +} + +/** + * Check if the replication connection Postgres server supports + * viewing the contents of logical replication messages. + */ +export async function checkLogicalMessageSupport(db: pgwire.PgClient) { + const version = await getServerVersion(db); + return version ? 
version.compareMain('14.0.0') >= 0 : false; +} diff --git a/modules/module-postgres/test/src/pg_test.test.ts b/modules/module-postgres/test/src/pg_test.test.ts index 9d4a517c..a4fa4f7d 100644 --- a/modules/module-postgres/test/src/pg_test.test.ts +++ b/modules/module-postgres/test/src/pg_test.test.ts @@ -1,16 +1,18 @@ -import { WalStream } from '@module/replication/WalStream.js'; -import { PostgresTypeResolver } from '@module/types/resolver.js'; import * as pgwire from '@powersync/service-jpgwire'; import { applyRowContext, CompatibilityContext, - CompatibilityEdition, - DateTimeValue, SqliteInputRow, - TimeValue + DateTimeValue, + TimeValue, + CompatibilityEdition } from '@powersync/service-sync-rules'; import { describe, expect, test } from 'vitest'; import { clearTestDb, connectPgPool, connectPgWire, TEST_URI } from './util.js'; +import { WalStream } from '@module/replication/WalStream.js'; +import { PostgresTypeResolver } from '@module/types/resolver.js'; +import { CustomTypeRegistry } from '@module/types/registry.js'; +import { PostgresSnapshotter } from '@module/replication/PostgresSnapshotter.js'; describe('pg data types', () => { async function setupTable(db: pgwire.PgClient) { @@ -302,7 +304,7 @@ VALUES(10, ARRAY['null']::TEXT[]); await insert(db); const transformed = [ - ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`))) + ...PostgresSnapshotter.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data ORDER BY id`))) ]; checkResults(transformed); @@ -321,7 +323,7 @@ VALUES(10, ARRAY['null']::TEXT[]); await insert(db); const transformed = [ - ...WalStream.getQueryData( + ...PostgresSnapshotter.getQueryData( pgwire.pgwireRows( await db.query({ statement: `SELECT * FROM test_data WHERE $1 ORDER BY id`, @@ -345,7 +347,9 @@ VALUES(10, ARRAY['null']::TEXT[]); await insertArrays(db); const transformed = [ - ...WalStream.getQueryData(pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays 
ORDER BY id`))) + ...PostgresSnapshotter.getQueryData( + pgwire.pgwireRows(await db.query(`SELECT * FROM test_data_arrays ORDER BY id`)) + ) ].map((e) => applyRowContext(e, CompatibilityContext.FULL_BACKWARDS_COMPATIBILITY)); checkResultArrays(transformed); @@ -448,7 +452,7 @@ INSERT INTO test_data(id, time, timestamp, timestamptz) VALUES (1, '17:42:01.12' `); const [row] = [ - ...WalStream.getQueryData( + ...PostgresSnapshotter.getQueryData( pgwire.pgwireRows(await db.query(`SELECT time, timestamp, timestamptz FROM test_data`)) ) ];