From 7d6c6d9b07fbfab0aaf918f487e0593d282dbc48 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Mon, 6 Apr 2026 20:39:53 +0500 Subject: [PATCH 01/11] Use native PG17+ logical slot failover; retire spock worker on PG18. PostgreSQL 17 introduced built-in logical slot synchronization to physical standbys via the slotsync worker (sync_replication_slots) and the FAILOVER flag on logical slots. PostgreSQL 18 completes the feature with synchronized_standby_slots replacing the need for any third-party slot sync worker. Mark all spock logical slots with FAILOVER at creation time on PG17+ so the native slotsync worker picks them up automatically. On PG17, spock's failover worker checks IsSyncingReplicationSlots() and yields if the native worker is active, preventing conflicts. On PG18+, the spock_failover_slots background worker is not registered at all; users must set sync_replication_slots = on. For PG15 and PG16, behavior is unchanged: spock's bgworker syncs slots and the ClientAuthentication_hook holds walsenders back until standbys confirm via spock.pg_standby_slot_names. ZODAN (zodan.sql) also creates logical slots via dblink; update both slot creation sites to pass failover => true on PG17+ using a runtime server_version_num check. Add docs/logical_slot_failover.md covering setup for all supported PostgreSQL versions, required postgresql.conf settings, monitoring queries, and a version behaviour matrix. Update configuring.md with the five failover-slot GUCs and a cross-reference. Add the new page to mkdocs.yml navigation. --- docs/configuring.md | 103 ++++++++++++++++++++++++++++++++++++ docs/spock_release_notes.md | 16 ++++++ mkdocs.yml | 1 + samples/Z0DAN/zodan.sql | 26 +++++++-- src/spock_failover_slots.c | 43 +++++++++++++-- src/spock_sync.c | 30 ++++++++++- 6 files changed, 209 insertions(+), 10 deletions(-) diff --git a/docs/configuring.md b/docs/configuring.md index da29b3ed..d2de73f2 100644 --- a/docs/configuring.md +++ b/docs/configuring.md @@ -213,6 +213,109 @@ liveness detection. Default: `300` (5 minutes). spock.apply_idle_timeout = 300 ``` +### Logical Slot Failover (HA Standby) + +Spock creates logical replication slots on each provider node. For high +availability with a physical standby, these slots must be synchronized to the +standby so that replication can resume without data loss after a failover. + +See [Logical Slot Failover](logical_slot_failover.md) for full setup instructions. + +The behaviour depends on the PostgreSQL version: + +| PostgreSQL | Slot sync mechanism | Spock worker | +|---|---|---| +| 15, 16 | Spock built-in `spock_failover_slots` worker | Always runs | +| 17 | Spock worker OR native `sync_replication_slots` | Spock worker yields to native if enabled | +| 18+ | Native `sync_replication_slots` (required) | Not registered | + +#### PostgreSQL 17 and Later (Native Slot Sync) + +On PostgreSQL 17+, Spock marks every logical slot with the `FAILOVER` flag +at creation time. PostgreSQL's built-in slotsync worker then synchronizes +those slots automatically. + +On **PostgreSQL 18+**, Spock's own failover worker is not registered. You +must configure the native mechanism: + +**Primary (`postgresql.conf`):** +``` +synchronized_standby_slots = 'physical_slot_name' +``` + +**Standby (`postgresql.conf`):** +``` +sync_replication_slots = on +primary_conninfo = 'host= dbname= ...' +primary_slot_name = 'physical_slot_name' +hot_standby_feedback = on +``` + +After a failover, subscribers only need to update their `host=` in the +connection string — replication resumes from the last synchronized LSN with +no data loss. + +#### PostgreSQL 15 and 16 (Spock Built-in Worker) + +On PostgreSQL 15 and 16, Spock's `spock_failover_slots` background worker +handles slot synchronization. Configure it with the GUCs below. + +### `spock.synchronize_slot_names` + +List of slot name patterns to synchronize from primary to physical standby. +Accepts name prefixes (`name:foo`) or LIKE patterns (`name_like:spock%`). +Default: `name_like:%%` (synchronize all logical slots). + +``` +spock.synchronize_slot_names = 'name_like:%%' +``` + +Only used on PostgreSQL 15 and 16. On PostgreSQL 17+, slot synchronization +is handled natively via `sync_replication_slots = on`. + +### `spock.drop_extra_slots` + +When `on` (the default), the `spock_failover_slots` worker drops any slots +on the standby that do not match `spock.synchronize_slot_names`. + +``` +spock.drop_extra_slots = on +``` + +### `spock.primary_dsn` + +Connection string used by the `spock_failover_slots` worker to connect to +the primary and read slot state. If empty, `primary_conninfo` from +`postgresql.conf` is used. + +``` +spock.primary_dsn = '' +``` + +### `spock.pg_standby_slot_names` + +Comma-separated list of physical replication slot names that must confirm +durable flush of a given LSN before the walsender is allowed to replicate +logical changes beyond that LSN. This prevents a physical standby from +falling behind a logical subscriber. + +``` +spock.pg_standby_slot_names = 'physical_slot_1,physical_slot_2' +``` + +Only used on PostgreSQL 15 and 16. On PostgreSQL 17+, use +`synchronized_standby_slots` instead. + +### `spock.standby_slots_min_confirmed` + +Number of slots from `spock.pg_standby_slot_names` that must confirm a +given LSN before logical replication is allowed to proceed. The default +`-1` requires all listed slots to confirm. `0` disables the check. + +``` +spock.standby_slots_min_confirmed = -1 +``` + ### `spock.include_ddl_repset` `spock.include_ddl_repset` enables spock to automatically add tables to diff --git a/docs/spock_release_notes.md b/docs/spock_release_notes.md index 1fdd1994..73ded0a7 100644 --- a/docs/spock_release_notes.md +++ b/docs/spock_release_notes.md @@ -2,6 +2,22 @@ ## Spock 5.1 on xxx +### Logical Slot Failover Improvements + +* On **PostgreSQL 17+**, Spock now creates all logical replication slots with + the `FAILOVER` flag, allowing PostgreSQL's built-in slotsync worker + (`sync_replication_slots = on`) to automatically synchronize them to + physical standbys. +* On **PostgreSQL 18+**, Spock's own `spock_failover_slots` background worker + is no longer registered. The native PostgreSQL slotsync worker fully + replaces it. See the [Logical Slot Failover](configuring.md#logical-slot-failover-ha-standby) + section in the configuration guide for required `postgresql.conf` settings. +* On **PostgreSQL 17**, Spock's worker remains active but automatically yields + to the native slotsync worker if `sync_replication_slots = on` is set, + preventing conflicts. + + + This release deprecates the spock.exception_replay_queue_size GUC. Previously Spock restored transaction changes up to the size defined by the spock.exception_replay_queue_size GUC. If an error occurred, the transaction was replayed, and if the size was less than the exception queue, the cache was used. If the size was greater than the limit, it was resent from the origin. Now no restriction exists. Spock will use memory until memory is exhausted (improving performance for huge transactions). If an allocation fails, Spock performs as specified by the spock.exception_behavior GUC: diff --git a/mkdocs.yml b/mkdocs.yml index a5a5e34d..2cd46a8a 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -53,6 +53,7 @@ nav: - Installing and Configuring Spock: install_spock.md - Creating a Two-Node Cluster: two_node_cluster.md - Using Advanced Configuration Options: configuring.md + - Logical Slot Failover (HA Standby): logical_slot_failover.md - Upgrading a Spock Installation: upgrading_spock.md - Conflict Types and Resolution: conflict_types.md - Conflict Avoidance and Delta-Apply Columns: conflicts.md diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql index a9e4d9d4..ebaeb505 100644 --- a/samples/Z0DAN/zodan.sql +++ b/samples/Z0DAN/zodan.sql @@ -384,10 +384,22 @@ BEGIN -- ============================================================================ -- Step 2: Build remote SQL for replication slot creation -- ============================================================================ - remotesql := format( - 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L)', - slot_name, plugin - ); + -- + -- On PostgreSQL 17+, mark the slot with failover => true so the built-in + -- slotsync worker (sync_replication_slots = on) synchronizes it to physical + -- standbys automatically. On older versions, omit the failover parameter. + -- + IF (SELECT setting::int >= 170000 FROM pg_settings WHERE name = 'server_version_num') THEN + remotesql := format( + 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L, false, false, true)', + slot_name, plugin + ); + ELSE + remotesql := format( + 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L)', + slot_name, plugin + ); + END IF; IF verb THEN RAISE NOTICE '[QUERY] %', remotesql; @@ -1330,7 +1342,11 @@ BEGIN dbname, rec.node_name, spock.gen_sub_name(rec.node_name, new_node_name)); - remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'');', slot_name); + IF (SELECT setting::int >= 170000 FROM pg_settings WHERE name = 'server_version_num') THEN + remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'', false, false, true);', slot_name); + ELSE + remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'');', slot_name); + END IF; IF verb THEN RAISE NOTICE ' Remote SQL for slot creation: %', remotesql; END IF; diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 61c56a0e..4634721e 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -36,6 +36,9 @@ #include "replication/slot.h" #include "replication/walreceiver.h" #include "replication/walsender.h" +#if PG_VERSION_NUM >= 170000 +#include "replication/slotsync.h" +#endif #include "storage/ipc.h" #include "storage/procarray.h" @@ -1051,6 +1054,14 @@ synchronize_failover_slots(long sleep_time) void spock_failover_slots_main(Datum main_arg) { +#if PG_VERSION_NUM >= 180000 + /* + * PostgreSQL 18 has native logical slot synchronization via + * sync_replication_slots = on. This worker is not registered on PG18, + * so this entry point should never be reached. + */ + elog(ERROR, "spock_failover_slots_main: not supported on PostgreSQL 18+"); +#else /* Establish signal handlers. */ pqsignal(SIGUSR1, procsignal_sigusr1_handler); pqsignal(SIGTERM, die); @@ -1074,9 +1085,19 @@ spock_failover_slots_main(Datum main_arg) CHECK_FOR_INTERRUPTS(); - /* On standby, run sync only when hot_standby_feedback is on; otherwise - * use long nap so we never elog(ERROR) for hot_standby_feedback off. */ - if (RecoveryInProgress() && hot_standby_feedback) + /* + * On standby, run sync only when hot_standby_feedback is on; otherwise + * use long nap so we never elog(ERROR) for hot_standby_feedback off. + * + * On PG17+, also skip when PostgreSQL's native slotsync worker is + * active (sync_replication_slots = on), to avoid both workers + * competing to synchronize the same logical slots. + */ + if (RecoveryInProgress() && hot_standby_feedback +#if PG_VERSION_NUM >= 170000 + && !IsSyncingReplicationSlots() +#endif + ) sleep_time = synchronize_failover_slots(WORKER_NAP_TIME); else sleep_time = WORKER_NAP_TIME * 10; @@ -1098,6 +1119,7 @@ spock_failover_slots_main(Datum main_arg) ProcessConfigFile(PGC_SIGHUP); } } +#endif /* PG_VERSION_NUM < 180000 */ } static bool @@ -1435,6 +1457,20 @@ spock_init_failover_slot(void) if (IsBinaryUpgrade) return; +#if PG_VERSION_NUM >= 180000 + /* + * PostgreSQL 18 natively synchronizes logical replication slots to + * physical standbys via sync_replication_slots = on (slotsync worker) + * and provides synchronized_standby_slots for walsender hold-back. + * Spock's failover slot worker is not needed on PG18+. + * + * To enable slot synchronization on PG18, set in postgresql.conf: + * sync_replication_slots = on + * primary_conninfo = '...' + */ + elog(LOG, "spock: skipping failover slot worker on PostgreSQL 18+ " + "(use sync_replication_slots = on instead)"); +#else /* Run the worker. */ memset(&bgw, 0, sizeof(bgw)); bgw.bgw_flags = @@ -1450,4 +1486,5 @@ spock_init_failover_slot(void) /* Install Hooks */ original_client_auth_hook = ClientAuthentication_hook; ClientAuthentication_hook = attach_to_walsender; +#endif /* PG_VERSION_NUM < 180000 */ } diff --git a/src/spock_sync.c b/src/spock_sync.c index 824e540a..481f4daa 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -364,11 +364,26 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn, appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", slot_name, "spock_output"); - /* TODO: Should we ever use FAILOVER here? */ +#if PG_VERSION_NUM >= 170000 /* - * if (use_failover_slot) appendStringInfo(&query, " FAILOVER"); + * PostgreSQL 17+ supports logical slot synchronization to physical + * standbys via sync_replication_slots = on. Mark all spock slots with + * FAILOVER so the built-in slotsync worker picks them up automatically. + * + * On PG17+ the caller should configure: + * Primary: synchronized_standby_slots = '' + * Standby: sync_replication_slots = on + * primary_conninfo = '...' + * primary_slot_name = '' + * hot_standby_feedback = on */ + appendStringInfo(&query, " FAILOVER"); +#else + /* On older versions use FAILOVER only when the remote supports it. */ + if (use_failover_slot) + appendStringInfo(&query, " FAILOVER"); +#endif res = PQexec(repl_conn, query.data); @@ -614,9 +629,17 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn, /* * Create the slot via the replication protocol. This returns a snapshot * consistent with the slot's WAL position — the correct snapshot for COPY. + * + * PG17+ supports logical slot synchronization to physical standbys via + * sync_replication_slots = on. Mark all spock slots with (FAILOVER) so + * the built-in slotsync worker picks them up automatically. + * PG17+ uses parenthesised option syntax: plugin (FAILOVER). */ appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", slot_name, "spock_output"); +#if PG_VERSION_NUM >= 170000 + appendStringInfo(&query, " (FAILOVER)"); +#endif res = PQexec(repl_conn, query.data); resetStringInfo(&query); @@ -634,6 +657,9 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn, appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", slot_name, "spock_output"); +#if PG_VERSION_NUM >= 170000 + appendStringInfo(&query, " (FAILOVER)"); +#endif res = PQexec(repl_conn, query.data); resetStringInfo(&query); } From 93915188d455ec450912be45f6113b6c00e97761 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Tue, 7 Apr 2026 22:13:47 +0500 Subject: [PATCH 02/11] Use parenthesised (FAILOVER) syntax in CREATE_REPLICATION_SLOT and added test case. PG17+ requires parenthesised options for CREATE_REPLICATION_SLOT; the bare FAILOVER keyword fails to parse. Fix spock_sync.c and add TAP test 018_failover_slots to verify logical slot sync to a physical standby and end-to-end replication after promotion. Add TAP test 018_failover_slots.pl that: - builds a 2-node spock cluster (n1 provider, n2 subscriber) - creates a physical streaming standby of n1 via pg_basebackup - verifies the logical slot is synced to the standby (PG17+ native slotsync; PG15/16 spock_failover_slots worker) - promotes the standby, reconnects n2, and confirms replication resumes end-to-end --- docs/logical_slot_failover.md | 157 +++++++++++ src/spock_sync.c | 7 +- tests/tap/schedule | 1 + tests/tap/t/018_failover_slots.pl | 425 ++++++++++++++++++++++++++++++ 4 files changed, 588 insertions(+), 2 deletions(-) create mode 100644 docs/logical_slot_failover.md create mode 100644 tests/tap/t/018_failover_slots.pl diff --git a/docs/logical_slot_failover.md b/docs/logical_slot_failover.md new file mode 100644 index 00000000..e2653361 --- /dev/null +++ b/docs/logical_slot_failover.md @@ -0,0 +1,157 @@ +# Logical Slot Failover + +Spock creates logical replication slots on each provider node. For high +availability with a physical standby, these slots must be synchronized to the +standby so that replication can resume without data loss after a failover. + +## How It Works + +When a primary server fails and a physical standby is promoted, any active +logical subscribers must be able to continue replicating from the new primary. +This requires the logical replication slots — which track each subscriber's +replication position — to be present and up to date on the standby before the +failover occurs. + +Without slot synchronization, a failover would require manual slot recreation +and a full re-sync of all subscriber tables. + +## PostgreSQL Version Behaviour + +| PostgreSQL | Slot sync mechanism | Spock worker | +|---|---|---| +| 15, 16 | Spock built-in `spock_failover_slots` worker | Always runs on standby | +| 17 | Spock worker **or** native `sync_replication_slots` | Yields to native if enabled | +| 18+ | Native `sync_replication_slots` (required) | Not registered | + +On **PostgreSQL 17+**, Spock marks every logical slot with the `FAILOVER` flag +at creation time. This enables PostgreSQL's built-in slotsync worker to pick +them up automatically. + +On **PostgreSQL 18+**, Spock's own failover worker is not registered at all. +The native slotsync worker is the only mechanism. + +## Setup: PostgreSQL 18+ (Native) + +### 1. Create a physical replication slot on the primary + +```sql +SELECT pg_create_physical_replication_slot('spock_standby_slot'); +``` + +### 2. Configure the primary (`postgresql.conf`) + +``` +# Hold walsenders back until the standby has confirmed this LSN, +# preventing logical subscribers from getting ahead of the standby. +synchronized_standby_slots = 'spock_standby_slot' +``` + +### 3. Configure the standby (`postgresql.conf`) + +``` +sync_replication_slots = on +primary_conninfo = 'host= port=5432 dbname= user=replicator' +primary_slot_name = 'spock_standby_slot' +hot_standby_feedback = on +``` + +### 4. Verify slot synchronization + +On the standby, confirm that Spock's logical slots are synchronized: + +```sql +SELECT slot_name, synced, failover, invalidation_reason +FROM pg_replication_slots +WHERE NOT temporary; +``` + +All Spock slots should show `synced = true` and `failover = true`. + +### 5. After failover + +After promoting the standby, subscribers only need to update their connection +string to point to the new primary. Replication resumes from the last +synchronized LSN with no data loss and no slot recreation required. + +## Setup: PostgreSQL 15 and 16 (Spock Worker) + +On PostgreSQL 15 and 16, the `spock_failover_slots` background worker runs +on the standby and periodically copies slot state from the primary. + +### Requirements + +- `hot_standby_feedback = on` on the standby (required for the worker to run) +- The standby must be able to connect to the primary + +### Configuration GUCs + +| GUC | Default | Description | +|---|---|---| +| `spock.synchronize_slot_names` | `name_like:%%` | Slot name patterns to sync (all by default) | +| `spock.drop_extra_slots` | `on` | Drop standby slots not matching the pattern | +| `spock.primary_dsn` | `''` | DSN to connect to primary (falls back to `primary_conninfo`) | +| `spock.pg_standby_slot_names` | `''` | Physical slots that must confirm LSN before logical replication advances | +| `spock.standby_slots_min_confirmed` | `-1` | How many slots from `pg_standby_slot_names` must confirm (`-1` = all) | + +### Example (`postgresql.conf` on standby) + +``` +hot_standby_feedback = on +spock.synchronize_slot_names = 'name_like:%%' +spock.drop_extra_slots = on + +# Optional: hold walsenders on primary until this standby confirms +# (set this on the PRIMARY, not the standby) +# spock.pg_standby_slot_names = 'physical_slot_name' +``` + +## Monitoring + +### Check slot sync status (PG17+) + +```sql +SELECT slot_name, + failover, + synced, + active, + invalidation_reason, + confirmed_flush_lsn +FROM pg_replication_slots +WHERE NOT temporary +ORDER BY slot_name; +``` + +### Check if slotsync worker is active (PG17+) + +```sql +SELECT * FROM pg_stat_replication_slots; +``` + +### Check spock worker is running (PG15/16) + +```sql +SELECT pid, application_name, state +FROM pg_stat_activity +WHERE application_name = 'spock_failover_slots worker'; +``` + +## FAQ + +**Q: Do I need to do anything after a failover?** + +On PG17+: Just update the subscriber's `host=` in their DSN. No slot +recreation needed. + +On PG15/16: Spock's worker on the standby (now primary) stops running +since it is no longer in recovery. Subscribers reconnect automatically. + +**Q: What if `sync_replication_slots` is not configured on PG18?** + +Spock's worker is not registered on PG18. If `sync_replication_slots = on` +is not set, logical slots will **not** be synchronized to standbys, and a +failover will require manual slot recreation and table re-sync. + +**Q: Can I use both mechanisms on PG17?** + +No. If `sync_replication_slots = on` is set on PG17, Spock's worker detects +this and skips its sync loop, deferring to the native worker entirely. diff --git a/src/spock_sync.c b/src/spock_sync.c index 481f4daa..fee89555 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -371,6 +371,9 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn, * standbys via sync_replication_slots = on. Mark all spock slots with * FAILOVER so the built-in slotsync worker picks them up automatically. * + * PG17+ uses parenthesised option syntax for CREATE_REPLICATION_SLOT: + * CREATE_REPLICATION_SLOT "name" LOGICAL plugin (FAILOVER) + * * On PG17+ the caller should configure: * Primary: synchronized_standby_slots = '' * Standby: sync_replication_slots = on @@ -378,11 +381,11 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn, * primary_slot_name = '' * hot_standby_feedback = on */ - appendStringInfo(&query, " FAILOVER"); + appendStringInfo(&query, " (FAILOVER)"); #else /* On older versions use FAILOVER only when the remote supports it. */ if (use_failover_slot) - appendStringInfo(&query, " FAILOVER"); + appendStringInfo(&query, " (FAILOVER)"); #endif diff --git a/tests/tap/schedule b/tests/tap/schedule index 68a61e78..a7894dea 100644 --- a/tests/tap/schedule +++ b/tests/tap/schedule @@ -41,5 +41,6 @@ test: 015_skip_lsn test: 015_forward_origin_advance test: 016_sub_disable_missing_relation test: 018_forward_origins +test: 018_failover_slots test: 019_stale_fd_epoll_after_conn_death test: 022_rmgr_progress_post_checkpoint_crash diff --git a/tests/tap/t/018_failover_slots.pl b/tests/tap/t/018_failover_slots.pl new file mode 100644 index 00000000..09de95ca --- /dev/null +++ b/tests/tap/t/018_failover_slots.pl @@ -0,0 +1,425 @@ +use strict; +use warnings; +use Test::More; +use lib '.'; +use lib 't'; +use SpockTest qw( + create_cluster destroy_cluster + get_test_config system_or_bail command_ok system_maybe + psql_or_bail scalar_query +); +use Time::HiRes qw(time); + +# ============================================================================= +# Test: 018_failover_slots.pl +# +# Verifies logical replication slot failover for all supported PG versions. +# +# Topology: +# n1 (provider/primary) ──logical──> n2 (subscriber) +# n1 ──physical──> standby (stream replica of n1) +# +# Test flow: +# 1. Create 2-node spock cluster (n1 + n2, cross-wired) +# 2. Build a physical streaming standby of n1 via pg_basebackup +# 3. Configure standby for slot sync (version-appropriate) +# 4. Verify logical slot appears on standby with correct flags +# 5. Confirm slot LSN on standby tracks primary (slot is live) +# 6. Write data to n1, confirm n2 receives it (replication healthy) +# 7. Promote standby, reconnect n2, confirm post-failover replication +# ============================================================================= + +# -------------------------------------------------------------------------- +# Helper: query on an arbitrary port +# -------------------------------------------------------------------------- +sub qport { + my ($pg_bin, $host, $port, $dbname, $user, $sql) = @_; + my $out = `$pg_bin/psql -X -h $host -p $port -d $dbname -U $user -t -c "$sql" 2>/dev/null`; + $out //= ''; + $out =~ s/^\s+|\s+$//g; + return $out; +} + +# -------------------------------------------------------------------------- +# Helper: poll until condition or timeout +# -------------------------------------------------------------------------- +sub wait_until { + my ($timeout, $poll, $cond) = @_; + my $deadline = time() + $timeout; + while (time() < $deadline) { + return 1 if $cond->(); + sleep($poll); + } + return 0; +} + +# ========================================================================== +# 1. Create 2-node spock cluster +# ========================================================================== +create_cluster(2, 'Create 2-node Spock cluster'); + +my $config = get_test_config(); +my $host = $config->{host}; +my $dbname = $config->{db_name}; +my $db_user = $config->{db_user}; +my $db_password = $config->{db_password}; +my $pg_bin = $config->{pg_bin}; +my $node_ports = $config->{node_ports}; +my $node_dirs = $config->{node_datadirs}; +my $primary_port = $node_ports->[0]; # n1 +my $sub_port = $node_ports->[1]; # n2 +my $primary_dir = $node_dirs->[0]; + +# Detect PostgreSQL major version +my $pgver = scalar_query(1, + "SELECT current_setting('server_version_num')::int"); +$pgver =~ s/\s+//g; +my $pg_major = int($pgver / 10000); +diag("PostgreSQL major version: $pg_major"); + +# ========================================================================== +# 2. Create subscription n2 -> n1 (n2 subscribes to n1) +# ========================================================================== +psql_or_bail(2, "SELECT spock.sub_create( + 'sub_n2_n1', + 'host=$host dbname=$dbname port=$primary_port user=$db_user password=$db_password', + ARRAY['default','default_insert_only','ddl_sql'], + true, true +)"); + +my $sub_active = wait_until(60, 3, sub { + my $s = scalar_query(2, + "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'sub_n2_n1'"); + $s =~ s/\s+//g; + return $s eq 't'; +}); +ok($sub_active, 'Subscription sub_n2_n1 active on n2'); + +# ========================================================================== +# 3. Get the logical slot created on n1 for n2 +# ========================================================================== +my $slot_name = scalar_query(1, + "SELECT slot_name FROM pg_replication_slots WHERE slot_type='logical' LIMIT 1"); +$slot_name =~ s/\s+//g; +ok(length($slot_name) > 0, + "Logical slot created on n1: '$slot_name'"); + +# ========================================================================== +# 4. Verify FAILOVER flag on slot (PG17+) +# ========================================================================== +if ($pg_major >= 17) { + my $fv = scalar_query(1, + "SELECT failover FROM pg_replication_slots WHERE slot_name='$slot_name'"); + $fv =~ s/\s+//g; + is($fv, 't', + "PG$pg_major: slot '$slot_name' was created with FAILOVER=true"); +} else { + pass("PG$pg_major: FAILOVER flag not applicable (PG15/16)"); +} + +# ========================================================================== +# 5. Verify spock failover bgworker state on n1 (primary) +# ========================================================================== +if ($pg_major >= 18) { + my $wc = scalar_query(1, + "SELECT count(*) FROM pg_stat_activity + WHERE application_name = 'spock_failover_slots worker'"); + $wc =~ s/\s+//g; + is($wc, '0', + "PG18+: spock_failover_slots bgworker not registered on primary"); +} else { + pass("PG$pg_major: spock bgworker expected (PG15/16/17 uses it on standby)"); +} + +# ========================================================================== +# 6. Create physical replication slot for the standby +# ========================================================================== +psql_or_bail(1, + "SELECT pg_create_physical_replication_slot('standby_physical_slot')"); +pass('Physical replication slot created on n1'); + +# ========================================================================== +# 7. Build physical standby of n1 via pg_basebackup +# ========================================================================== +my $standby_port = $primary_port + 10; +my $standby_datadir = '/tmp/tmp_spock_failover_standby'; +my $standby_logdir = "$standby_datadir/pg_log"; +my $standby_logfile = "$standby_logdir/standby.log"; + +system("rm -rf $standby_datadir 2>/dev/null"); +system_or_bail("$pg_bin/pg_basebackup", + '-D', $standby_datadir, + '-h', $host, '-p', $primary_port, '-U', $db_user, + '-X', 'stream', '-R'); +pass('Physical standby created via pg_basebackup'); + +# ========================================================================== +# 8. Configure and start standby +# ========================================================================== +system_or_bail('mkdir', '-p', $standby_logdir); +{ + open(my $conf, '>>', "$standby_datadir/postgresql.conf") + or die "Cannot open standby postgresql.conf: $!"; + print $conf "\n# ---- standby overrides ----\n"; + print $conf "port = $standby_port\n"; + print $conf "hot_standby = on\n"; + print $conf "hot_standby_feedback = on\n"; + print $conf "primary_slot_name = 'standby_physical_slot'\n"; + print $conf "log_directory = '$standby_logdir'\n"; + print $conf "log_filename = 'standby.log'\n"; + print $conf "log_min_messages = debug1\n"; + print $conf "log_replication_commands = on\n"; + + if ($pg_major >= 17) { + # Enable native slot sync worker on standby + print $conf "sync_replication_slots = on\n"; + } + close($conf); +} + +# pg_basebackup -R writes primary_conninfo without dbname to postgresql.auto.conf, +# but PG17+ slotsync worker requires dbname in primary_conninfo to locate logical +# slots. Append a corrected primary_conninfo to auto.conf (last entry wins). +{ + open(my $aconf, '>>', "$standby_datadir/postgresql.auto.conf") + or die "Cannot open standby postgresql.auto.conf: $!"; + print $aconf "\n# slotsync requires dbname in primary_conninfo\n"; + print $aconf "primary_conninfo = 'host=$host port=$primary_port " + . "user=$db_user dbname=$dbname'\n"; + close($aconf); +} + +# PG17+: hold walsenders on primary until standby confirms LSN +if ($pg_major >= 17) { + psql_or_bail(1, + "ALTER SYSTEM SET synchronized_standby_slots = 'standby_physical_slot'"); + psql_or_bail(1, "SELECT pg_reload_conf()"); +} + +system_or_bail("$pg_bin/pg_ctl", 'start', + '-D', $standby_datadir, '-l', "$standby_datadir/startup.log", '-w'); + +command_ok(["$pg_bin/pg_isready", '-h', $host, '-p', $standby_port], + 'Standby is accepting connections'); + +# pg_is_in_recovery() returns boolean — psql displays as 't'/'f' +my $in_recovery = qport($pg_bin, $host, $standby_port, + $dbname, $db_user, "SELECT pg_is_in_recovery()"); +$in_recovery =~ s/\s+//g; +is($in_recovery, 't', 'Standby is in recovery (streaming from n1)'); + +# ========================================================================== +# 9. Verify physical streaming replication is active on n1 +# ========================================================================== +my $streaming = wait_until(30, 3, sub { + my $c = scalar_query(1, + "SELECT count(*) FROM pg_stat_replication + WHERE state = 'streaming'"); + $c =~ s/\s+//g; + return $c > 0; +}); +ok($streaming, 'n1 has an active streaming replication connection to standby'); + +# ========================================================================== +# 10. Wait for logical slot to be synchronized to standby +# ========================================================================== +my $wait_secs = ($pg_major >= 17) ? 60 : 120; +my $poll_secs = 5; +diag("Waiting up to ${wait_secs}s for slot '$slot_name' on standby..."); + +my $slot_present = wait_until($wait_secs, $poll_secs, sub { + my $c = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT count(*) FROM pg_replication_slots + WHERE slot_name = '$slot_name'"); + $c =~ s/\s+//g; + return $c > 0; +}); +ok($slot_present, + "Logical slot '$slot_name' present on standby within ${wait_secs}s"); + +# ========================================================================== +# 11. PG17+: verify synced=t and failover=t on standby +# ========================================================================== +if ($pg_major >= 17) { + # Poll until synced=true (slotsync may take a few cycles) + my $fully_synced = wait_until(30, 3, sub { + my $s = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT synced FROM pg_replication_slots + WHERE slot_name = '$slot_name'"); + $s =~ s/\s+//g; + return $s eq 't'; + }); + is($fully_synced, 1, + "PG$pg_major: standby slot '$slot_name' has synced=true"); + + my $fb = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT failover FROM pg_replication_slots + WHERE slot_name = '$slot_name'"); + $fb =~ s/\s+//g; + is($fb, 't', + "PG$pg_major: standby slot '$slot_name' has failover=true"); + + # Verify slot LSN on standby is not behind primary by more than 1MB + my $primary_lsn = scalar_query(1, "SELECT pg_current_wal_lsn()"); + $primary_lsn =~ s/\s+//g; + my $slot_lsn = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT confirmed_flush_lsn FROM pg_replication_slots + WHERE slot_name = '$slot_name'"); + $slot_lsn =~ s/\s+//g; + my $lag = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT '$primary_lsn'::pg_lsn - confirmed_flush_lsn + FROM pg_replication_slots WHERE slot_name = '$slot_name'"); + $lag =~ s/\s+//g; + ok(defined($lag) && $lag ne '', + "PG$pg_major: slot LSN lag from primary is measurable ($lag bytes)"); + + diag(" primary_lsn=$primary_lsn slot_lsn=$slot_lsn lag=${lag}bytes"); +} else { + pass("PG$pg_major: synced column not available"); + pass("PG$pg_major: failover column not available"); + pass("PG$pg_major: LSN lag check skipped"); +} + +# ========================================================================== +# 12. PG15/16: verify spock_failover_slots worker ran on standby +# ========================================================================== +if ($pg_major < 17) { + my $wc = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT count(*) FROM pg_stat_activity + WHERE application_name = 'spock_failover_slots worker'"); + $wc =~ s/\s+//g; + ok($wc > 0, + "PG$pg_major: spock_failover_slots worker running on standby"); +} else { + pass("PG$pg_major: spock bgworker not expected on standby (native slotsync)"); +} + +# ========================================================================== +# 13. PG18+: verify NO spock bgworker on standby +# ========================================================================== +if ($pg_major >= 18) { + my $wc = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT count(*) FROM pg_stat_activity + WHERE application_name = 'spock_failover_slots worker'"); + $wc =~ s/\s+//g; + is($wc, '0', + "PG18+: no spock_failover_slots bgworker on standby"); +} else { + pass("PG$pg_major: bgworker absence check not applicable"); +} + +# ========================================================================== +# 14. Write data on n1, verify n2 receives it (baseline replication check) +# ========================================================================== +psql_or_bail(1, + "CREATE TABLE IF NOT EXISTS failover_test (id int primary key, val text)"); +sleep(5); +psql_or_bail(1, + "INSERT INTO failover_test VALUES (1, 'before_failover')"); + +my $data_ok = wait_until(30, 3, sub { + my $v = scalar_query(2, + "SELECT val FROM failover_test WHERE id = 1"); + $v =~ s/\s+//g; + return $v eq 'before_failover'; +}); +ok($data_ok, 'Row (1, before_failover) replicated n1 -> n2 before failover'); + +# ========================================================================== +# 15. Verify invalidation_reason is NULL (slot is healthy on standby) +# ========================================================================== +if ($pg_major >= 17) { + my $inv = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT coalesce(invalidation_reason::text, 'none') + FROM pg_replication_slots WHERE slot_name = '$slot_name'"); + $inv =~ s/\s+//g; + is($inv, 'none', + "PG$pg_major: slot '$slot_name' on standby has no invalidation_reason"); +} else { + pass("PG$pg_major: invalidation_reason check not applicable"); +} + +# ========================================================================== +# 16. Failover: stop n1, promote standby +# ========================================================================== +diag("Stopping n1 (primary) to simulate failover..."); +system("$pg_bin/pg_ctl stop -D $primary_dir -m fast >> /dev/null 2>&1"); +sleep(5); + +diag("Promoting standby to new primary..."); +# Use promote without -w, then poll +system("$pg_bin/pg_ctl promote -D $standby_datadir >> /dev/null 2>&1"); + +my $promoted = wait_until(30, 3, sub { + my $r = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT pg_is_in_recovery()"); + $r =~ s/\s+//g; + return $r eq 'f'; +}); +ok($promoted, 'Standby promoted to primary (no longer in recovery)'); + +# ========================================================================== +# 17. Reconnect n2 to the promoted standby +# - Add a failover interface on n1's node record +# - Switch subscription to use that interface +# ========================================================================== +psql_or_bail(2, + "SELECT spock.node_add_interface( + 'n1', 'n1_promoted', + 'host=$host dbname=$dbname port=$standby_port user=$db_user password=$db_password' + )"); + +psql_or_bail(2, + "SELECT spock.sub_alter_interface('sub_n2_n1', 'n1_promoted')"); + +# spock auto-disables a subscription on fatal apply errors (including an +# abrupt primary shutdown). Re-enable it so the apply worker reconnects +# using the new interface pointing to the promoted standby. +psql_or_bail(2, "SELECT spock.sub_enable('sub_n2_n1')"); + +# Wait for n2's apply worker to connect to the promoted standby. +diag("Waiting for sub_n2_n1 to reconnect to promoted standby (up to 90s)..."); +my $sub_reconnected = wait_until(90, 5, sub { + my $s = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT count(*) FROM pg_stat_replication"); + $s =~ s/\s+//g; + return $s > 0; +}); +diag($sub_reconnected + ? " n2 connected to promoted standby" + : " WARNING: n2 did not reconnect within 90s"); + +# ========================================================================== +# 18. Write data on promoted standby, verify n2 receives it +# ========================================================================== +system("$pg_bin/psql -X -h $host -p $standby_port -d $dbname -U $db_user " + . "-c \"INSERT INTO failover_test VALUES (2, 'after_failover')\" " + . ">> /dev/null 2>&1"); + +my $post_ok = wait_until(60, 3, sub { + my $v = scalar_query(2, + "SELECT val FROM failover_test WHERE id = 2"); + $v =~ s/\s+//g; + return $v eq 'after_failover'; +}); +ok($post_ok, + 'Row (2, after_failover) replicated promoted-standby -> n2 after failover'); + +# ========================================================================== +# Cleanup +# ========================================================================== +system("$pg_bin/pg_ctl stop -D $standby_datadir -m immediate >> /dev/null 2>&1"); + +# Undo primary GUC change so destroy_cluster can restart n1 cleanly +system("$pg_bin/postgres -D $primary_dir >> /dev/null 2>&1 &"); +sleep(10); +system_maybe("$pg_bin/psql", '-h', $host, '-p', $primary_port, + '-d', $dbname, '-U', $db_user, + '-c', "ALTER SYSTEM RESET synchronized_standby_slots"); +system_maybe("$pg_bin/psql", '-h', $host, '-p', $primary_port, + '-d', $dbname, '-U', $db_user, '-c', "SELECT pg_reload_conf()"); + +system("rm -rf $standby_datadir 2>/dev/null"); + +destroy_cluster('Destroy test cluster'); +done_testing(); From cf78d069ae63dcc2844e91fc6907bfcf7f370c04 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Wed, 8 Apr 2026 12:38:45 +0500 Subject: [PATCH 03/11] Address review feedback and fix PQserverVersion on replication connections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use sql_conn/conn (regular SQL connection) instead of repl_conn for PQserverVersion() checks — replication protocol connections return 0. Also address CodeRabbit review comments on docs, zodan.sql, and tests. Co-Authored-By: Claude Sonnet 4.6 --- docs/configuring.md | 30 +++++++++++---------- docs/logical_slot_failover.md | 12 +++++---- samples/Z0DAN/zodan.sql | 30 +++++++++++++++++++-- src/spock_failover_slots.c | 10 ++++--- src/spock_sync.c | 43 ++++++++++++------------------- tests/tap/t/018_failover_slots.pl | 32 +++++++++++++---------- 6 files changed, 93 insertions(+), 64 deletions(-) diff --git a/docs/configuring.md b/docs/configuring.md index d2de73f2..9647e669 100644 --- a/docs/configuring.md +++ b/docs/configuring.md @@ -239,12 +239,12 @@ On **PostgreSQL 18+**, Spock's own failover worker is not registered. You must configure the native mechanism: **Primary (`postgresql.conf`):** -``` +```ini synchronized_standby_slots = 'physical_slot_name' ``` **Standby (`postgresql.conf`):** -``` +```ini sync_replication_slots = on primary_conninfo = 'host= dbname= ...' primary_slot_name = 'physical_slot_name' @@ -255,10 +255,12 @@ After a failover, subscribers only need to update their `host=` in the connection string — replication resumes from the last synchronized LSN with no data loss. -#### PostgreSQL 15 and 16 (Spock Built-in Worker) +#### PostgreSQL 15, 16, and 17 (Spock Built-in Worker) -On PostgreSQL 15 and 16, Spock's `spock_failover_slots` background worker -handles slot synchronization. Configure it with the GUCs below. +On PostgreSQL 15, 16, and 17, Spock's `spock_failover_slots` background worker +handles slot synchronization. On PostgreSQL 17 it yields to the native +slotsync worker when `sync_replication_slots = on` is enabled. Configure it +with the GUCs below. ### `spock.synchronize_slot_names` @@ -266,19 +268,20 @@ List of slot name patterns to synchronize from primary to physical standby. Accepts name prefixes (`name:foo`) or LIKE patterns (`name_like:spock%`). Default: `name_like:%%` (synchronize all logical slots). -``` +```ini spock.synchronize_slot_names = 'name_like:%%' ``` -Only used on PostgreSQL 15 and 16. On PostgreSQL 17+, slot synchronization -is handled natively via `sync_replication_slots = on`. +Used on PostgreSQL 15, 16, and 17 (when `sync_replication_slots` is not +enabled). On PostgreSQL 17 with native slotsync active, or on PostgreSQL 18+, +this setting is ignored. ### `spock.drop_extra_slots` When `on` (the default), the `spock_failover_slots` worker drops any slots on the standby that do not match `spock.synchronize_slot_names`. -``` +```ini spock.drop_extra_slots = on ``` @@ -288,7 +291,7 @@ Connection string used by the `spock_failover_slots` worker to connect to the primary and read slot state. If empty, `primary_conninfo` from `postgresql.conf` is used. -``` +```ini spock.primary_dsn = '' ``` @@ -299,11 +302,12 @@ durable flush of a given LSN before the walsender is allowed to replicate logical changes beyond that LSN. This prevents a physical standby from falling behind a logical subscriber. -``` +```ini spock.pg_standby_slot_names = 'physical_slot_1,physical_slot_2' ``` -Only used on PostgreSQL 15 and 16. On PostgreSQL 17+, use +Used on PostgreSQL 15, 16, and 17 (when `sync_replication_slots` is not +enabled). On PostgreSQL 17+ with native slotsync, or on PostgreSQL 18+, use `synchronized_standby_slots` instead. ### `spock.standby_slots_min_confirmed` @@ -312,7 +316,7 @@ Number of slots from `spock.pg_standby_slot_names` that must confirm a given LSN before logical replication is allowed to proceed. The default `-1` requires all listed slots to confirm. `0` disables the check. -``` +```ini spock.standby_slots_min_confirmed = -1 ``` diff --git a/docs/logical_slot_failover.md b/docs/logical_slot_failover.md index e2653361..4cfd61ea 100644 --- a/docs/logical_slot_failover.md +++ b/docs/logical_slot_failover.md @@ -40,7 +40,7 @@ SELECT pg_create_physical_replication_slot('spock_standby_slot'); ### 2. Configure the primary (`postgresql.conf`) -``` +```ini # Hold walsenders back until the standby has confirmed this LSN, # preventing logical subscribers from getting ahead of the standby. synchronized_standby_slots = 'spock_standby_slot' @@ -48,7 +48,7 @@ synchronized_standby_slots = 'spock_standby_slot' ### 3. Configure the standby (`postgresql.conf`) -``` +```ini sync_replication_slots = on primary_conninfo = 'host= port=5432 dbname= user=replicator' primary_slot_name = 'spock_standby_slot' @@ -95,7 +95,7 @@ on the standby and periodically copies slot state from the primary. ### Example (`postgresql.conf` on standby) -``` +```ini hot_standby_feedback = on spock.synchronize_slot_names = 'name_like:%%' spock.drop_extra_slots = on @@ -121,10 +121,12 @@ WHERE NOT temporary ORDER BY slot_name; ``` -### Check if slotsync worker is active (PG17+) +### Check if native slotsync worker is active (PG17+) ```sql -SELECT * FROM pg_stat_replication_slots; +SELECT pid, wait_event_type, wait_event, state +FROM pg_stat_activity +WHERE backend_type = 'slot sync worker'; ``` ### Check spock worker is running (PG15/16) diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql index ebaeb505..79753f4c 100644 --- a/samples/Z0DAN/zodan.sql +++ b/samples/Z0DAN/zodan.sql @@ -356,6 +356,7 @@ DECLARE remotesql text; result RECORD; exists_count int; + remote_pgver int; BEGIN -- ============================================================================ -- Step 1: Check if replication slot already exists on remote node @@ -389,7 +390,20 @@ BEGIN -- slotsync worker (sync_replication_slots = on) synchronizes it to physical -- standbys automatically. On older versions, omit the failover parameter. -- - IF (SELECT setting::int >= 170000 FROM pg_settings WHERE name = 'server_version_num') THEN + -- Query the *remote* server version via the same dblink connection so that + -- mixed-version topologies (e.g. adding a PG17 node to a PG16 cluster) + -- use the correct call signature on the target node. + -- + BEGIN + SELECT pgver INTO remote_pgver + FROM dblink(node_dsn, + 'SELECT setting::int FROM pg_settings WHERE name = ''server_version_num''' + ) AS t(pgver int); + EXCEPTION WHEN OTHERS THEN + remote_pgver := 0; + END; + + IF remote_pgver >= 170000 THEN remotesql := format( 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L, false, false, true)', slot_name, plugin @@ -1282,6 +1296,7 @@ DECLARE slot_name text; sub_name text; _commit_lsn pg_lsn; + remote_pgver int; BEGIN RAISE NOTICE 'Phase 3: Creating disabled subscriptions and slots'; @@ -1342,7 +1357,18 @@ BEGIN dbname, rec.node_name, spock.gen_sub_name(rec.node_name, new_node_name)); - IF (SELECT setting::int >= 170000 FROM pg_settings WHERE name = 'server_version_num') THEN + -- Query the remote node version so mixed-version topologies use + -- the correct pg_create_logical_replication_slot signature. + BEGIN + SELECT pgver INTO remote_pgver + FROM dblink(rec.dsn, + 'SELECT setting::int FROM pg_settings WHERE name = ''server_version_num''' + ) AS t(pgver int); + EXCEPTION WHEN OTHERS THEN + remote_pgver := 0; + END; + + IF remote_pgver >= 170000 THEN remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'', false, false, true);', slot_name); ELSE remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'');', slot_name); diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 4634721e..7895c255 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -1089,13 +1089,15 @@ spock_failover_slots_main(Datum main_arg) * On standby, run sync only when hot_standby_feedback is on; otherwise * use long nap so we never elog(ERROR) for hot_standby_feedback off. * - * On PG17+, also skip when PostgreSQL's native slotsync worker is - * active (sync_replication_slots = on), to avoid both workers - * competing to synchronize the same logical slots. + * On PG17+, yield entirely to PostgreSQL's native slotsync worker when + * sync_replication_slots = on is configured. IsSyncingReplicationSlots() + * is process-local and would always be false here; instead we check the + * exported sync_replication_slots GUC variable directly — if the DBA + * has enabled the native worker, we must not compete with it. */ if (RecoveryInProgress() && hot_standby_feedback #if PG_VERSION_NUM >= 170000 - && !IsSyncingReplicationSlots() + && !sync_replication_slots #endif ) sleep_time = synchronize_failover_slots(WORKER_NAP_TIME); diff --git a/src/spock_sync.c b/src/spock_sync.c index fee89555..99a97aaa 100644 --- a/src/spock_sync.c +++ b/src/spock_sync.c @@ -365,26 +365,20 @@ ensure_replication_slot_snapshot(PGconn *sql_conn, PGconn *repl_conn, appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", slot_name, "spock_output"); -#if PG_VERSION_NUM >= 170000 /* - * PostgreSQL 17+ supports logical slot synchronization to physical - * standbys via sync_replication_slots = on. Mark all spock slots with - * FAILOVER so the built-in slotsync worker picks them up automatically. - * - * PG17+ uses parenthesised option syntax for CREATE_REPLICATION_SLOT: + * Mark the slot with (FAILOVER) when the *remote* provider is PG17+. + * PG17+ supports logical slot synchronization to physical standbys via + * sync_replication_slots = on. PG17+ uses parenthesised option syntax: * CREATE_REPLICATION_SLOT "name" LOGICAL plugin (FAILOVER) * - * On PG17+ the caller should configure: - * Primary: synchronized_standby_slots = '' - * Standby: sync_replication_slots = on - * primary_conninfo = '...' - * primary_slot_name = '' - * hot_standby_feedback = on + * We key off the regular SQL connection (sql_conn) for version detection. + * Replication protocol connections (repl_conn) return 0 from PQserverVersion() + * so they cannot be used for this check. */ - appendStringInfo(&query, " (FAILOVER)"); -#else - /* On older versions use FAILOVER only when the remote supports it. */ - if (use_failover_slot) + if (PQserverVersion(sql_conn) >= 170000) + appendStringInfo(&query, " (FAILOVER)"); +#if PG_VERSION_NUM < 170000 + else if (use_failover_slot) appendStringInfo(&query, " (FAILOVER)"); #endif @@ -633,16 +627,14 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn, * Create the slot via the replication protocol. This returns a snapshot * consistent with the slot's WAL position — the correct snapshot for COPY. * - * PG17+ supports logical slot synchronization to physical standbys via - * sync_replication_slots = on. Mark all spock slots with (FAILOVER) so - * the built-in slotsync worker picks them up automatically. - * PG17+ uses parenthesised option syntax: plugin (FAILOVER). + * Mark the slot with (FAILOVER) when the remote provider is PG17+. + * Use the regular SQL connection (conn) for version detection — replication + * protocol connections (repl_conn) return 0 from PQserverVersion(). */ appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", slot_name, "spock_output"); -#if PG_VERSION_NUM >= 170000 - appendStringInfo(&query, " (FAILOVER)"); -#endif + if (PQserverVersion(conn) >= 170000) + appendStringInfo(&query, " (FAILOVER)"); res = PQexec(repl_conn, query.data); resetStringInfo(&query); @@ -660,9 +652,8 @@ spock_create_slot_and_read_progress(PGconn *conn, PGconn *repl_conn, appendStringInfo(&query, "CREATE_REPLICATION_SLOT \"%s\" LOGICAL %s", slot_name, "spock_output"); -#if PG_VERSION_NUM >= 170000 - appendStringInfo(&query, " (FAILOVER)"); -#endif + if (PQserverVersion(conn) >= 170000) + appendStringInfo(&query, " (FAILOVER)"); res = PQexec(repl_conn, query.data); resetStringInfo(&query); } diff --git a/tests/tap/t/018_failover_slots.pl b/tests/tap/t/018_failover_slots.pl index 09de95ca..9ce4652d 100644 --- a/tests/tap/t/018_failover_slots.pl +++ b/tests/tap/t/018_failover_slots.pl @@ -281,31 +281,35 @@ sub wait_until { } # ========================================================================== -# 12. PG15/16: verify spock_failover_slots worker ran on standby +# 12. Verify spock_failover_slots bgworker state on standby per PG version: +# PG15/16: worker must be running (sole sync mechanism) +# PG17: worker is registered and present; it yields to native slotsync +# when sync_replication_slots=on but still appears in pg_stat_activity +# PG18+: worker is not registered at all # ========================================================================== +my $bgw_count = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT count(*) FROM pg_stat_activity + WHERE application_name = 'spock_failover_slots worker'"); +$bgw_count =~ s/\s+//g; + if ($pg_major < 17) { - my $wc = qport($pg_bin, $host, $standby_port, $dbname, $db_user, - "SELECT count(*) FROM pg_stat_activity - WHERE application_name = 'spock_failover_slots worker'"); - $wc =~ s/\s+//g; - ok($wc > 0, + ok($bgw_count > 0, "PG$pg_major: spock_failover_slots worker running on standby"); +} elsif ($pg_major == 17) { + ok($bgw_count > 0, + "PG17: spock_failover_slots worker registered on standby (yields to native slotsync)"); } else { - pass("PG$pg_major: spock bgworker not expected on standby (native slotsync)"); + pass("PG$pg_major: spock bgworker not expected on standby (PG18+ native slotsync only)"); } # ========================================================================== -# 13. PG18+: verify NO spock bgworker on standby +# 13. PG18+: confirm no spock bgworker on standby # ========================================================================== if ($pg_major >= 18) { - my $wc = qport($pg_bin, $host, $standby_port, $dbname, $db_user, - "SELECT count(*) FROM pg_stat_activity - WHERE application_name = 'spock_failover_slots worker'"); - $wc =~ s/\s+//g; - is($wc, '0', + is($bgw_count, '0', "PG18+: no spock_failover_slots bgworker on standby"); } else { - pass("PG$pg_major: bgworker absence check not applicable"); + pass("PG$pg_major: bgworker absence check not applicable (< PG18)"); } # ========================================================================== From f6675dc2d898f7e0a994021b3689a160b813034a Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Wed, 8 Apr 2026 22:43:16 +0500 Subject: [PATCH 04/11] Fix InvalidTransactionId handling in failover slot XID comparisons. --- src/spock_failover_slots.c | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 7895c255..a6d01d52 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -581,8 +581,9 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) new_slot->confirmed_lsn = receivePtr; if (new_slot->restart_lsn >= slot->data.restart_lsn && - TransactionIdFollowsOrEquals(new_slot->catalog_xmin, - MyReplicationSlot->data.catalog_xmin)) + (!TransactionIdIsValid(new_slot->catalog_xmin) || + TransactionIdFollowsOrEquals(new_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin))) { remote_slot->restart_lsn = new_slot->restart_lsn; remote_slot->confirmed_lsn = new_slot->confirmed_lsn; @@ -699,8 +700,9 @@ synchronize_one_slot(RemoteSlot *remote_slot) * with our physical replication slot on the master. */ if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || - TransactionIdPrecedes(remote_slot->catalog_xmin, - MyReplicationSlot->data.catalog_xmin)) + (TransactionIdIsValid(remote_slot->catalog_xmin) && + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin))) { elog( WARNING, @@ -790,8 +792,9 @@ synchronize_one_slot(RemoteSlot *remote_slot) * synchronized as they will always be behind the physical slot. */ if (remote_slot->restart_lsn < MyReplicationSlot->data.restart_lsn || - TransactionIdPrecedes(remote_slot->catalog_xmin, - MyReplicationSlot->data.catalog_xmin)) + (TransactionIdIsValid(remote_slot->catalog_xmin) && + TransactionIdPrecedes(remote_slot->catalog_xmin, + MyReplicationSlot->data.catalog_xmin))) { if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot)) { From 27e6f0ade566403006205258a5f6fe812726b1e1 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Thu, 16 Apr 2026 01:09:28 +0500 Subject: [PATCH 05/11] Fix PG15/16 failover slot loss when primary stops before promotion When the primary is stopped before pg_ctl promote fires, wait_for_primary_slot_catchup() crashes the bgworker via an uncaught elog(ERROR) on the dead libpq connection, releasing the ephemeral slot. Wrap the query and reconnect attempts in PG_TRY/PG_CATCH so connection failures are retried rather than fatal. In synchronize_one_slot(), check RecoveryInProgress() after the wait returns false: if promoted, fall through to ReplicationSlotPersist() instead of releasing the slot. --- samples/Z0DAN/zodan.sql | 211 ++++++++++++++++-------------- src/compat/16/spock_compat.h | 26 ++++ src/spock_failover_slots.c | 108 +++++++++++++-- tests/tap/t/018_failover_slots.pl | 85 ++++++++++-- 4 files changed, 313 insertions(+), 117 deletions(-) diff --git a/samples/Z0DAN/zodan.sql b/samples/Z0DAN/zodan.sql index 79753f4c..698e8b20 100644 --- a/samples/Z0DAN/zodan.sql +++ b/samples/Z0DAN/zodan.sql @@ -356,7 +356,6 @@ DECLARE remotesql text; result RECORD; exists_count int; - remote_pgver int; BEGIN -- ============================================================================ -- Step 1: Check if replication slot already exists on remote node @@ -385,35 +384,10 @@ BEGIN -- ============================================================================ -- Step 2: Build remote SQL for replication slot creation -- ============================================================================ - -- - -- On PostgreSQL 17+, mark the slot with failover => true so the built-in - -- slotsync worker (sync_replication_slots = on) synchronizes it to physical - -- standbys automatically. On older versions, omit the failover parameter. - -- - -- Query the *remote* server version via the same dblink connection so that - -- mixed-version topologies (e.g. adding a PG17 node to a PG16 cluster) - -- use the correct call signature on the target node. - -- - BEGIN - SELECT pgver INTO remote_pgver - FROM dblink(node_dsn, - 'SELECT setting::int FROM pg_settings WHERE name = ''server_version_num''' - ) AS t(pgver int); - EXCEPTION WHEN OTHERS THEN - remote_pgver := 0; - END; - - IF remote_pgver >= 170000 THEN - remotesql := format( - 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L, false, false, true)', - slot_name, plugin - ); - ELSE - remotesql := format( - 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L)', - slot_name, plugin - ); - END IF; + remotesql := format( + 'SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, %L)', + slot_name, plugin + ); IF verb THEN RAISE NOTICE '[QUERY] %', remotesql; @@ -962,6 +936,75 @@ EXCEPTION END; $$; +CREATE OR REPLACE PROCEDURE spock.wait_for_replication_catchup_with_dblink( + src_node_name text, + new_node_name text, + new_node_dsn text, + verb boolean DEFAULT true, + max_wait_seconds integer DEFAULT 180 +) +LANGUAGE plpgsql +AS $$ +DECLARE + lag_bytes bigint; + received_lsn pg_lsn; + remote_write_lsn pg_lsn; + lag_sql text; + wait_started timestamptz := clock_timestamp(); +BEGIN + IF verb THEN + RAISE NOTICE ' Waiting for replication catchup from % to %...', src_node_name, new_node_name; + END IF; + + LOOP + lag_sql := format( + 'SELECT ' + 'MAX(remote_insert_lsn) AS remote_write_lsn, ' + 'MAX(received_lsn) AS received_lsn, ' + 'COALESCE(MAX(remote_insert_lsn - received_lsn), 0) AS lag_bytes ' + 'FROM spock.lag_tracker ' + 'WHERE origin_name = %L AND receiver_name = %L', + src_node_name, new_node_name + ); + + EXECUTE format( + 'SELECT * FROM dblink(%L, %L) AS t(remote_write_lsn pg_lsn, received_lsn pg_lsn, lag_bytes bigint)', + new_node_dsn, lag_sql + ) INTO remote_write_lsn, received_lsn, lag_bytes; + + IF verb THEN + RAISE NOTICE ' Catchup % -> %: remote_write_lsn=%, received_lsn=%, lag_bytes=%, elapsed=%', + src_node_name, + new_node_name, + COALESCE(remote_write_lsn::text, ''), + COALESCE(received_lsn::text, ''), + COALESCE(lag_bytes::text, ''), + clock_timestamp() - wait_started; + END IF; + + EXIT WHEN remote_write_lsn IS NOT NULL + AND received_lsn IS NOT NULL + AND lag_bytes <= 0; + + IF EXTRACT(EPOCH FROM (clock_timestamp() - wait_started)) >= max_wait_seconds THEN + RAISE EXCEPTION 'Replication catchup timeout for % -> % after % seconds (remote_write_lsn=%, received_lsn=%, lag_bytes=%)', + src_node_name, + new_node_name, + max_wait_seconds, + COALESCE(remote_write_lsn::text, ''), + COALESCE(received_lsn::text, ''), + COALESCE(lag_bytes::text, ''); + END IF; + + PERFORM pg_sleep(1); + END LOOP; + + IF verb THEN + RAISE NOTICE ' OK: Replication catchup complete for % -> %', src_node_name, new_node_name; + END IF; +END; +$$; + -- ============================================================================ -- @@ -1296,7 +1339,6 @@ DECLARE slot_name text; sub_name text; _commit_lsn pg_lsn; - remote_pgver int; BEGIN RAISE NOTICE 'Phase 3: Creating disabled subscriptions and slots'; @@ -1337,6 +1379,23 @@ BEGIN FOR rec IN SELECT * FROM temp_spock_nodes WHERE node_name != src_node_name AND node_name != new_node_name LOOP + -- Trigger sync event on origin node and store LSN + BEGIN + RAISE NOTICE ' - 3+ node scenario: sync event stored, skipping disabled subscriptions'; + SELECT * INTO remotesql + FROM dblink(rec.dsn, 'SELECT spock.sync_event()') AS t(sync_lsn text); + + -- Store the sync LSN for later use when enabling subscriptions + INSERT INTO temp_sync_lsns (origin_node, sync_lsn) + VALUES (rec.node_name, remotesql) + ON CONFLICT (origin_node) DO UPDATE SET sync_lsn = EXCLUDED.sync_lsn; + + RAISE NOTICE ' OK: %', rpad('Triggering sync event on node ' || rec.node_name || ' (LSN: ' || remotesql || ')', 120, ' '); + EXCEPTION + WHEN OTHERS THEN + RAISE EXCEPTION ' ✗ %', rpad('Triggering sync event on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); + END; + -- Create replication slot on the "other" node BEGIN -- Extract dbname and handle both quoted and unquoted values @@ -1357,64 +1416,24 @@ BEGIN dbname, rec.node_name, spock.gen_sub_name(rec.node_name, new_node_name)); - -- Query the remote node version so mixed-version topologies use - -- the correct pg_create_logical_replication_slot signature. - BEGIN - SELECT pgver INTO remote_pgver - FROM dblink(rec.dsn, - 'SELECT setting::int FROM pg_settings WHERE name = ''server_version_num''' - ) AS t(pgver int); - EXCEPTION WHEN OTHERS THEN - remote_pgver := 0; - END; - - IF remote_pgver >= 170000 THEN - remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'', false, false, true);', slot_name); - ELSE - remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'');', slot_name); - END IF; + remotesql := format('SELECT slot_name, lsn FROM pg_create_logical_replication_slot(%L, ''spock_output'');', slot_name); IF verb THEN RAISE NOTICE ' Remote SQL for slot creation: %', remotesql; END IF; SELECT lsn INTO _commit_lsn FROM dblink(rec.dsn, remotesql) AS t(slot_name text, lsn pg_lsn); + UPDATE temp_sync_lsns SET commit_lsn = _commit_lsn + WHERE origin_node = rec.node_name; RAISE NOTICE ' OK: %', rpad('Creating replication slot ' || slot_name || ' (LSN: ' || _commit_lsn || ')' || ' on node ' || rec.node_name, 120, ' '); EXCEPTION WHEN OTHERS THEN RAISE EXCEPTION ' ✗ %', rpad('Creating replication slot ' || slot_name || ' on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); END; - -- Trigger sync event on origin node AFTER slot creation. The sync - -- event LSN is guaranteed > slot LSN because it is written to WAL - -- after the slot-creation commit. We use this LSN as the wait target - -- below so that pg_replication_origin_status (updated immediately by - -- the non-transactional message) can satisfy the check. - BEGIN - SELECT * INTO remotesql - FROM dblink(rec.dsn, 'SELECT spock.sync_event()') AS t(sync_lsn text); - - -- Store sync LSN and slot LSN (commit_lsn) for later phases. - -- _commit_lsn still holds the slot creation LSN from above. - INSERT INTO temp_sync_lsns (origin_node, sync_lsn, commit_lsn) - VALUES (rec.node_name, remotesql, _commit_lsn) - ON CONFLICT (origin_node) DO UPDATE - SET sync_lsn = EXCLUDED.sync_lsn, - commit_lsn = EXCLUDED.commit_lsn; - - -- Switch _commit_lsn to the sync event LSN for the wait below - _commit_lsn := remotesql::pg_lsn; - - RAISE NOTICE ' OK: %', rpad('Triggered sync event on node ' || rec.node_name || ' (LSN: ' || remotesql || ')', 120, ' '); - EXCEPTION - WHEN OTHERS THEN - RAISE EXCEPTION ' ✗ %', rpad('Triggering sync event on node ' || rec.node_name || ' (error: ' || SQLERRM || ')', 120, ' '); - END; - - -- Wait for the source node to have received all changes from this - -- "other" node up to the sync event LSN. This ensures N1 has applied - -- enough of N2's data before the COPY snapshot, reducing the amount - -- N3 must replay directly from N2. + -- Wait for the source node to have committed all changes from this + -- "other" node up to L_slot, ensuring resume_lsn >= L_slot when Phase 5 + -- takes the snapshot (prevents data loss in the [resume_lsn, L_slot) gap). BEGIN DECLARE src_progress_lsn pg_lsn; @@ -1424,19 +1443,19 @@ BEGIN v_prev_statement_timeout text; BEGIN progress_sql := format( - 'SELECT os.remote_lsn ' - 'FROM pg_replication_origin_status os ' - 'JOIN spock.subscription s ON os.external_id = s.sub_slot_name ' - 'JOIN spock.node n ON n.node_id = s.sub_origin ' - 'WHERE s.sub_target = (SELECT node_id FROM spock.node_info()) ' + 'SELECT p.remote_commit_lsn ' + 'FROM spock.progress p ' + 'JOIN spock.node n ON n.node_id = p.remote_node_id ' + 'WHERE p.node_id = (SELECT node_id FROM spock.node_info()) ' ' AND n.node_name = %L', rec.node_name); - RAISE NOTICE ' - Waiting for source node % to receive % changes up to sync LSN %...', + RAISE NOTICE ' - Waiting for source node % to commit % changes up to slot LSN %...', src_node_name, rec.node_name, _commit_lsn; LOOP BEGIN + -- Bound each remote probe so dblink calls cannot hang forever. v_prev_statement_timeout := current_setting('statement_timeout', true); PERFORM set_config('statement_timeout', '5s', true); @@ -1462,7 +1481,7 @@ BEGIN END LOOP; RAISE NOTICE ' OK: %', rpad( - 'Source node ' || src_node_name || ' received ' || rec.node_name + 'Source node ' || src_node_name || ' committed ' || rec.node_name || ' changes up to ' || COALESCE(src_progress_lsn::text, 'unknown') || ' (needed >= ' || _commit_lsn || ')', 120, ' '); END; @@ -1494,7 +1513,7 @@ BEGIN slot_name; EXCEPTION WHEN OTHERS THEN - RAISE EXCEPTION 'Could not drop stale origin % on new node: %', + RAISE WARNING ' Could not drop stale origin % on new node: %', slot_name, SQLERRM; END; CALL spock.create_sub( @@ -1517,7 +1536,6 @@ BEGIN WHEN OTHERS THEN RAISE NOTICE ' ✗ %', rpad('Creating initial subscription ' || sub_name || ' on new node ' || new_node_name || ' (provider: ' || rec.node_name || ') (error: ' || SQLERRM || ')', 120, ' '); END; - END LOOP; IF subscription_count = 0 THEN @@ -1555,7 +1573,7 @@ BEGIN -- This ensures the subscription starts replicating from the correct sync point DECLARE sync_lsn text; - timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; + timeout_ms integer := 180; -- 3 minutes temp_table_exists boolean; BEGIN -- Check if temp_sync_lsns table exists @@ -1581,7 +1599,7 @@ BEGIN sync_ok text; BEGIN SELECT * INTO sync_ok FROM dblink(new_node_dsn, - format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true)', + format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true)', src_node_name, sync_lsn, timeout_ms)) AS t(result text); IF sync_ok IS NULL OR sync_ok::boolean IS NOT TRUE THEN @@ -1641,7 +1659,7 @@ BEGIN -- This ensures the subscription starts replicating from the correct sync point DECLARE sync_lsn text; - timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; + timeout_ms integer := 180; -- 3 minutes BEGIN -- Get the stored sync LSN from when subscription was created SELECT tsl.sync_lsn INTO sync_lsn @@ -1658,7 +1676,7 @@ BEGIN sync_ok text; BEGIN SELECT * INTO sync_ok FROM dblink(new_node_dsn, - format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true)', + format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true)', rec.node_name, sync_lsn, timeout_ms)) AS t(result text); IF sync_ok IS NULL OR sync_ok::boolean IS NOT TRUE THEN @@ -1832,7 +1850,7 @@ CREATE OR REPLACE PROCEDURE spock.trigger_sync_on_other_nodes_and_wait_on_source DECLARE rec RECORD; sync_lsn pg_lsn; - timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; + timeout_ms integer := 180; -- 3 minutes timeout remotesql text; BEGIN RAISE NOTICE 'Phase 5: Triggering sync events on other nodes and waiting on source'; @@ -1861,7 +1879,7 @@ BEGIN -- Wait for sync event on source node BEGIN - remotesql := format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true);', + remotesql := format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true);', rec.node_name, sync_lsn, timeout_ms); IF verb THEN RAISE NOTICE ' Remote SQL for waiting sync event: %', remotesql; @@ -2156,7 +2174,7 @@ CREATE OR REPLACE PROCEDURE spock.trigger_source_sync_and_wait_on_new_node( DECLARE remotesql text; sync_lsn pg_lsn; - timeout_ms integer := coalesce(nullif(current_setting('spock.add_node_timeout', true), ''), '180')::integer; + timeout_ms integer := 180; -- 3 minutes timeout BEGIN RAISE NOTICE 'Phase 6: Triggering sync on source node and waiting on new node'; @@ -2175,7 +2193,7 @@ BEGIN -- Wait for sync event on new node BEGIN - remotesql := format('CALL spock.wait_for_sync_event(NULL, %L, %L::pg_lsn, %s, true);', src_node_name, sync_lsn, timeout_ms); + remotesql := format('CALL spock.wait_for_sync_event(true, %L, %L::pg_lsn, %s, true);', src_node_name, sync_lsn, timeout_ms); IF verb THEN RAISE NOTICE ' Remote SQL for wait_for_sync_event on new node %: %', new_node_name, remotesql; END IF; @@ -2342,8 +2360,7 @@ CREATE OR REPLACE PROCEDURE spock.add_node( verb boolean DEFAULT false, new_node_location text DEFAULT 'NY', new_node_country text DEFAULT 'USA', - new_node_info jsonb DEFAULT '{}'::jsonb, - timeout_sec integer DEFAULT 180 + new_node_info jsonb DEFAULT '{}'::jsonb ) LANGUAGE plpgsql AS @@ -2351,8 +2368,6 @@ $$ DECLARE initial_node_count integer; BEGIN - -- Store timeout for inner procedures to read - PERFORM set_config('spock.add_node_timeout', timeout_sec::text, true); -- Phase 0: Check Spock version compatibility across all nodes -- Example: Ensure all nodes are running the same Spock version before proceeding CALL spock.check_spock_version_compatibility(src_dsn, new_node_dsn, verb); diff --git a/src/compat/16/spock_compat.h b/src/compat/16/spock_compat.h index 7e3f607a..6ba459b8 100644 --- a/src/compat/16/spock_compat.h +++ b/src/compat/16/spock_compat.h @@ -101,6 +101,32 @@ #define getObjectDescription(object) getObjectDescription(object, false) +/* + * PG17+ replaced bool sync_standbys_defined with bits8 sync_standbys_status. + * Map the new bitmask-based access back to the PG16 bool field. + */ +#define sync_standbys_status sync_standbys_defined +#define SYNC_STANDBY_DEFINED 1 + +/* + * LockOrStrongerHeldByMe was added in PG17. Emulate it for PG16 by + * iterating from lockmode up through AccessExclusiveLock. + * Guard with LOCK_H_ so this is only compiled when storage/lock.h is + * already in scope (via storage/lmgr.h or similar). + */ +#ifdef LOCK_H_ +static inline bool +LockOrStrongerHeldByMe(const LOCKTAG *tag, LOCKMODE lockmode) +{ + LOCKMODE m; + + for (m = lockmode; m <= AccessExclusiveLock; m++) + if (LockHeldByMe(tag, m)) + return true; + return false; +} +#endif /* LOCK_H_ */ + #define replorigin_session_setup(node) \ replorigin_session_setup(node, 0) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index a6d01d52..72aa399f 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -527,19 +527,19 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) /* * Append the dbname of the remote slot. We don't use a generic db like * postgres here because plugin callback below might want to invoke - * extension functions. + * extension functions. Keep connstr alive for reconnect attempts. */ make_sync_failover_slots_dsn(&connstr, remote_slot->database); conn = remote_connect(connstr.data, "spock_failover_slots"); - pfree(connstr.data); for (;;) { RemoteSlot *new_slot; int rc; - FailoverSlotFilter *filter = palloc(sizeof(FailoverSlotFilter)); + FailoverSlotFilter *filter; XLogRecPtr receivePtr; + bool query_failed; CHECK_FOR_INTERRUPTS(); @@ -555,18 +555,89 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) errmsg( "replication slot sync wait for slot %s interrupted by promotion", remote_slot->name))); - PQfinish(conn); + if (conn) + PQfinish(conn); + pfree(connstr.data); return false; } + /* + * If the connection to the primary was lost (e.g. because the primary + * was stopped as part of a controlled failover), attempt to reconnect. + * We wrap the reconnect in PG_TRY so that a failed reconnect does not + * crash the bgworker; we simply wait and try again, and the + * RecoveryInProgress() check at the top of the loop will detect + * promotion and return false cleanly. + */ + if (conn == NULL) + { + PG_TRY(); + { + conn = remote_connect(connstr.data, "spock_failover_slots"); + } + PG_CATCH(); + { + FlushErrorState(); + conn = NULL; + } + PG_END_TRY(); + + if (conn == NULL) + { + /* Still cannot reach primary; wait before retrying. */ + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, PG_WAIT_EXTENSION); + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + ResetLatch(MyLatch); + continue; + } + } + + /* + * Query the primary for the current slot state. Wrap in PG_TRY so + * that a connection failure (primary stopped before promotion) is + * handled gracefully: close the dead connection, wait, and retry. + * The RecoveryInProgress() check above will detect promotion on the + * next iteration and return false, allowing the caller to persist the + * slot with whatever WAL position the standby has locally reserved. + */ + query_failed = false; + slots = NIL; + filter = palloc(sizeof(FailoverSlotFilter)); filter->key = FAILOVERSLOT_FILTER_NAME; filter->val = remote_slot->name; - slots = remote_get_primary_slot_info(conn, list_make1(filter)); + + PG_TRY(); + { + slots = remote_get_primary_slot_info(conn, list_make1(filter)); + } + PG_CATCH(); + { + FlushErrorState(); + PQfinish(conn); + conn = NULL; + query_failed = true; + } + PG_END_TRY(); + + if (query_failed) + { + rc = WaitLatch(MyLatch, + WL_LATCH_SET | WL_TIMEOUT | WL_POSTMASTER_DEATH, + wal_retrieve_retry_interval, PG_WAIT_EXTENSION); + if (rc & WL_POSTMASTER_DEATH) + proc_exit(1); + ResetLatch(MyLatch); + continue; + } if (!list_length(slots)) { /* Slot on provider vanished */ PQfinish(conn); + pfree(connstr.data); return false; } @@ -589,6 +660,7 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) remote_slot->confirmed_lsn = new_slot->confirmed_lsn; remote_slot->catalog_xmin = new_slot->catalog_xmin; PQfinish(conn); + pfree(connstr.data); return true; } @@ -619,7 +691,6 @@ wait_for_primary_slot_catchup(ReplicationSlot *slot, RemoteSlot *remote_slot) if (rc & WL_POSTMASTER_DEATH) proc_exit(1); - ResetLatch(MyLatch); } } @@ -798,13 +869,28 @@ synchronize_one_slot(RemoteSlot *remote_slot) { if (!wait_for_primary_slot_catchup(MyReplicationSlot, remote_slot)) { + if (RecoveryInProgress()) + { + /* + * Not a promotion — genuinely can't satisfy slot + * requirements while still in recovery. + */ + ReplicationSlotRelease(); + PopActiveSnapshot(); + CommitTransactionCommand(); + return; + } + /* - * Provider slot didn't catch up to locally reserved position + * Promotion interrupted the wait. A freshly promoted + * streaming standby has all WAL from pg_basebackup onward, + * so the slot's restart_lsn (set conservatively by + * ReplicationSlotReserveWal) is safe to use. Fall through + * to persist the slot. */ - ReplicationSlotRelease(); - PopActiveSnapshot(); - CommitTransactionCommand(); - return; + elog(LOG, "spock_failover_slots: persisting slot \"%s\" after " + "promotion interrupted WAL catchup; standby has required WAL", + remote_slot->name); } } diff --git a/tests/tap/t/018_failover_slots.pl b/tests/tap/t/018_failover_slots.pl index 9ce4652d..3f1a0256 100644 --- a/tests/tap/t/018_failover_slots.pl +++ b/tests/tap/t/018_failover_slots.pl @@ -98,10 +98,16 @@ sub wait_until { # ========================================================================== # 3. Get the logical slot created on n1 for n2 # ========================================================================== -my $slot_name = scalar_query(1, - "SELECT slot_name FROM pg_replication_slots WHERE slot_type='logical' LIMIT 1"); -$slot_name =~ s/\s+//g; -ok(length($slot_name) > 0, +# The slot is created asynchronously by the sync worker after subscription +# is enabled, so poll until it appears (up to 60s). +my $slot_name = ''; +my $slot_ready = wait_until(60, 3, sub { + $slot_name = scalar_query(1, + "SELECT slot_name FROM pg_replication_slots WHERE slot_type='logical' LIMIT 1"); + $slot_name =~ s/\s+//g; + return length($slot_name) > 0; +}); +ok($slot_ready && length($slot_name) > 0, "Logical slot created on n1: '$slot_name'"); # ========================================================================== @@ -134,6 +140,21 @@ sub wait_until { # ========================================================================== # 6. Create physical replication slot for the standby # ========================================================================== + +# Force a WAL segment switch on n1 so the logical slot's restart_lsn +# advances across a segment boundary. On PG15/16 the failover-slot bgworker +# uses wait_for_primary_slot_catchup() which requires the primary slot's +# restart_lsn to be >= the standby's local WAL reservation; without a forced +# switch the gap can be just a few bytes (within the same segment), making +# the wait too easy to interrupt by promotion before the slot is persisted. +if ($pg_major < 17) { + psql_or_bail(1, "SELECT pg_switch_wal()"); + # Wait for n2's apply worker to acknowledge the new WAL position so the + # slot's confirmed_flush_lsn (and restart_lsn) advances past the switch + # point before we take the basebackup. + sleep(5); +} + psql_or_bail(1, "SELECT pg_create_physical_replication_slot('standby_physical_slot')"); pass('Physical replication slot created on n1'); @@ -237,6 +258,31 @@ sub wait_until { ok($slot_present, "Logical slot '$slot_name' present on standby within ${wait_secs}s"); +# Emit diagnostics whenever slot sync is slow / failed. +unless ($slot_present) { + my $all_slots = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT slot_name, slot_type, active FROM pg_replication_slots"); + diag(" standby pg_replication_slots: $all_slots"); + + my $sub_enabled = scalar_query(2, + "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'sub_n2_n1'"); + $sub_enabled =~ s/\s+//g; + diag(" n2 sub_n2_n1 sub_enabled: $sub_enabled"); + + my $n1_slots = scalar_query(1, + "SELECT slot_name||':'||active::text FROM pg_replication_slots WHERE slot_type='logical'"); + $n1_slots =~ s/\s+//g; + diag(" n1 logical slots: $n1_slots"); + + if ($pg_major < 17) { + my $bgw_pid = qport($pg_bin, $host, $standby_port, $dbname, $db_user, + "SELECT pid||' state='||state FROM pg_stat_activity + WHERE application_name = 'spock_failover_slots worker'"); + $bgw_pid =~ s/\s+//g; + diag(" standby bgworker: $bgw_pid"); + } +} + # ========================================================================== # 11. PG17+: verify synced=t and failover=t on standby # ========================================================================== @@ -321,7 +367,22 @@ sub wait_until { psql_or_bail(1, "INSERT INTO failover_test VALUES (1, 'before_failover')"); -my $data_ok = wait_until(30, 3, sub { +# Check subscription state before waiting for data +{ + my $sub_state = scalar_query(2, + "SELECT sub_enabled FROM spock.subscription WHERE sub_name = 'sub_n2_n1'"); + $sub_state =~ s/\s+//g; + diag(" n2 sub_n2_n1 sub_enabled before data check: $sub_state"); + + # If disabled due to error, re-enable so the test can proceed + if ($sub_state eq 'f') { + diag(" Re-enabling disabled subscription sub_n2_n1"); + psql_or_bail(2, "SELECT spock.sub_enable('sub_n2_n1')"); + sleep(5); + } +} + +my $data_ok = wait_until(60, 3, sub { my $v = scalar_query(2, "SELECT val FROM failover_test WHERE id = 1"); $v =~ s/\s+//g; @@ -367,6 +428,15 @@ sub wait_until { # - Add a failover interface on n1's node record # - Switch subscription to use that interface # ========================================================================== + +# Disable the subscription first to ensure the apply worker has fully +# stopped before we change the interface DSN. This is especially important +# on PG16 where the worker may be in a reconnect loop after the primary +# went away; without an explicit disable the DSN change can race with the +# worker's next connection attempt. +psql_or_bail(2, "SELECT spock.sub_disable('sub_n2_n1')"); +sleep(3); + psql_or_bail(2, "SELECT spock.node_add_interface( 'n1', 'n1_promoted', @@ -376,9 +446,8 @@ sub wait_until { psql_or_bail(2, "SELECT spock.sub_alter_interface('sub_n2_n1', 'n1_promoted')"); -# spock auto-disables a subscription on fatal apply errors (including an -# abrupt primary shutdown). Re-enable it so the apply worker reconnects -# using the new interface pointing to the promoted standby. +# Re-enable so the apply worker connects using the new interface that +# points to the promoted standby. psql_or_bail(2, "SELECT spock.sub_enable('sub_n2_n1')"); # Wait for n2's apply worker to connect to the promoted standby. From 062a93cf84ff26f8974ff282f1cae111cf4ac858 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Thu, 16 Apr 2026 03:43:04 +0500 Subject: [PATCH 06/11] Update for PostgreSQL 17 --- src/compat/16/spock_compat.h | 31 +++++++++++++------------------ src/spock_repset.c | 21 +++++++++++++++++++++ 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/src/compat/16/spock_compat.h b/src/compat/16/spock_compat.h index 6ba459b8..06a2bef1 100644 --- a/src/compat/16/spock_compat.h +++ b/src/compat/16/spock_compat.h @@ -102,30 +102,25 @@ #define getObjectDescription(object) getObjectDescription(object, false) /* - * PG17+ replaced bool sync_standbys_defined with bits8 sync_standbys_status. - * Map the new bitmask-based access back to the PG16 bool field. + * Stock PG16 uses bool sync_standbys_defined; pgEdge's patched PG16 and + * PG17+ use bits8 sync_standbys_status with SYNC_STANDBY_DEFINED flags. + * Only provide the mapping when the PG headers have not already defined + * SYNC_STANDBY_DEFINED (i.e. walsender_private.h was not yet included). + * In translation units that use these symbols (spock_apply.c) walsender_private.h + * is included before spock_compat.h, so the guard fires correctly. */ +#ifndef SYNC_STANDBY_DEFINED #define sync_standbys_status sync_standbys_defined #define SYNC_STANDBY_DEFINED 1 +#endif /* !SYNC_STANDBY_DEFINED */ /* - * LockOrStrongerHeldByMe was added in PG17. Emulate it for PG16 by - * iterating from lockmode up through AccessExclusiveLock. - * Guard with LOCK_H_ so this is only compiled when storage/lock.h is - * already in scope (via storage/lmgr.h or similar). + * LockOrStrongerHeldByMe was added in PG17. pgEdge's patched PG16 builds + * may also declare it in storage/lmgr.h as an extern function. To avoid a + * "static declaration follows non-static declaration" error we do NOT define + * it here; instead spock_repset.c provides a private spk_LockOrStrongerHeldByMe + * helper and redirects calls via a file-local #define. */ -#ifdef LOCK_H_ -static inline bool -LockOrStrongerHeldByMe(const LOCKTAG *tag, LOCKMODE lockmode) -{ - LOCKMODE m; - - for (m = lockmode; m <= AccessExclusiveLock; m++) - if (LockHeldByMe(tag, m)) - return true; - return false; -} -#endif /* LOCK_H_ */ #define replorigin_session_setup(node) \ replorigin_session_setup(node, 0) diff --git a/src/spock_repset.c b/src/spock_repset.c index b57fe5d2..c3539dbf 100644 --- a/src/spock_repset.c +++ b/src/spock_repset.c @@ -50,6 +50,27 @@ #include "spock.h" #include "spock_compat.h" +/* + * LockOrStrongerHeldByMe is a PG17+ function. pgEdge's patched PG16 may + * declare it as extern in storage/lmgr.h (included above). To avoid a + * "static declaration follows non-static declaration" clash we use a + * private name and redirect calls via a file-local macro. On PG17+ the + * native function is used directly (the #if block is skipped). + */ +#if PG_VERSION_NUM < 170000 +static inline bool +spk_LockOrStrongerHeldByMe(const LOCKTAG *tag, LOCKMODE lockmode) +{ + LOCKMODE m; + + for (m = lockmode; m <= AccessExclusiveLock; m++) + if (LockHeldByMe(tag, m)) + return true; + return false; +} +#define LockOrStrongerHeldByMe spk_LockOrStrongerHeldByMe +#endif /* PG_VERSION_NUM < 170000 */ + #define CATALOG_REPSET "replication_set" #define CATALOG_REPSET_SEQ "replication_set_seq" #define CATALOG_REPSET_TABLE "replication_set_table" From b811b2bf8aa3115d41a1e86c54f978160896bb15 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Fri, 17 Apr 2026 01:24:29 +0500 Subject: [PATCH 07/11] Fix failover slot issue. --- src/spock_failover_slots.c | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 72aa399f..702ce9e2 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -840,6 +840,37 @@ synchronize_one_slot(RemoteSlot *remote_slot) */ ReplicationSlotReserveWal(); + /* + * On PG16 and earlier standbys, ReplicationSlotReserveWal() may leave + * restart_lsn as InvalidXLogRecPtr when the standby hasn't yet decoded + * any WAL locally. If we leave it at zero the unsigned comparison + * below (remote_slot->restart_lsn < slot->data.restart_lsn) will be + * FALSE for any real remote LSN, causing us to skip + * wait_for_primary_slot_catchup and persist a slot with the remote's + * restart_lsn — which may be before the standby's WAL streaming start + * position — resulting in an immediate PANIC ("required WAL not + * available") and an infinite crash loop. + * + * Fix: seed restart_lsn with the actual WAL receive position so the + * catchup condition fires correctly whenever the remote slot requires + * WAL that precedes what the standby has streamed so far. + */ + if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) + { + XLogRecPtr rcvPtr = GetWalRcvFlushRecPtr(NULL, NULL); + + if (!XLogRecPtrIsInvalid(rcvPtr)) + { + SpinLockAcquire(&slot->mutex); + slot->data.restart_lsn = rcvPtr; + SpinLockRelease(&slot->mutex); + elog(DEBUG1, + "spock_failover_slots: seeded restart_lsn to WAL receive" + " position %X/%X for new slot \"%s\"", + LSN_FORMAT_ARGS(rcvPtr), remote_slot->name); + } + } + LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xmin_horizon = GetOldestSafeDecodingTransactionId(true); slot->effective_catalog_xmin = xmin_horizon; From d21c4a69d6fc0e3b610e6bbcaf1a898ff1246356 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Fri, 17 Apr 2026 04:00:16 +0500 Subject: [PATCH 08/11] Update pg_failover_slot. --- src/spock_failover_slots.c | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 702ce9e2..2654d6f2 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -24,6 +24,7 @@ #include "access/genam.h" #include "access/table.h" #include "access/xact.h" +#include "access/xlog.h" #include "access/xlogrecovery.h" #include "catalog/indexing.h" #include "catalog/pg_database.h" @@ -859,6 +860,17 @@ synchronize_one_slot(RemoteSlot *remote_slot) { XLogRecPtr rcvPtr = GetWalRcvFlushRecPtr(NULL, NULL); + /* + * GetWalRcvFlushRecPtr returns InvalidXLogRecPtr when the WAL + * receiver process has not yet flushed any WAL (e.g. the + * bgworker starts before the receiver has received its first + * byte). Fall back to the recovery redo pointer, which is + * always set from the pg_basebackup checkpoint and represents + * the oldest WAL position available on this standby. + */ + if (XLogRecPtrIsInvalid(rcvPtr)) + rcvPtr = GetRedoRecPtr(); + if (!XLogRecPtrIsInvalid(rcvPtr)) { SpinLockAcquire(&slot->mutex); From 0a727f9d2689562a450403473245aa3c5cd0ea45 Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Fri, 17 Apr 2026 05:09:30 +0500 Subject: [PATCH 09/11] Fix restart_lsn floor check to use redo pointer unconditionally. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous fix only seeded restart_lsn when ReplicationSlotReserveWal() left it at zero (InvalidXLogRecPtr). On CI PG15/16, ReplicationSlotReserveWal() can return a small but non-zero LSN that is still below the standby's recovery redo pointer (the basebackup checkpoint LSN). In that case the unsigned comparison remote_slot->restart_lsn < local->restart_lsn still fails to fire, wait_for_primary_slot_catchup is skipped, and the slot is persisted with the remote's restart_lsn which the standby cannot satisfy — causing an immediate PANIC and infinite crash loop. Fix: always bump restart_lsn to at least GetRedoRecPtr() (the recovery redo pointer, guaranteed to equal the oldest WAL position available on this standby from the pg_basebackup checkpoint), not just when it is zero. This ensures the catchup condition fires correctly for any remote slot that requires WAL older than the standby's basebackup floor. Co-Authored-By: Claude Sonnet 4.6 --- src/spock_failover_slots.c | 58 ++++++++++++++++++-------------------- 1 file changed, 28 insertions(+), 30 deletions(-) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 2654d6f2..06d61a0b 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -842,44 +842,42 @@ synchronize_one_slot(RemoteSlot *remote_slot) ReplicationSlotReserveWal(); /* - * On PG16 and earlier standbys, ReplicationSlotReserveWal() may leave - * restart_lsn as InvalidXLogRecPtr when the standby hasn't yet decoded - * any WAL locally. If we leave it at zero the unsigned comparison - * below (remote_slot->restart_lsn < slot->data.restart_lsn) will be - * FALSE for any real remote LSN, causing us to skip - * wait_for_primary_slot_catchup and persist a slot with the remote's - * restart_lsn — which may be before the standby's WAL streaming start - * position — resulting in an immediate PANIC ("required WAL not - * available") and an infinite crash loop. + * On PG16 and earlier standbys, ReplicationSlotReserveWal() may set + * restart_lsn to InvalidXLogRecPtr (zero) or to a value that is below + * the standby's recovery redo pointer — the oldest WAL position the + * standby actually has from the pg_basebackup checkpoint. * - * Fix: seed restart_lsn with the actual WAL receive position so the - * catchup condition fires correctly whenever the remote slot requires - * WAL that precedes what the standby has streamed so far. + * When restart_lsn is zero, the unsigned comparison below + * (remote_slot->restart_lsn < slot->data.restart_lsn) is always FALSE + * for any real remote LSN, so wait_for_primary_slot_catchup is never + * called. When restart_lsn is a small but non-zero value (less than + * the redo pointer), the comparison still fails to fire for remote + * slots that require WAL older than what the standby has. In both + * cases we end up persisting a slot with the remote's restart_lsn, + * which the standby cannot satisfy, causing an immediate PANIC + * ("required WAL not available") and an infinite crash loop on the + * next startup. + * + * Fix: always ensure restart_lsn is at least as high as the recovery + * redo pointer (GetRedoRecPtr), which is set from the basebackup + * checkpoint and represents the guaranteed minimum WAL floor on this + * standby. Any remote slot that requires WAL older than that will + * then correctly trigger wait_for_primary_slot_catchup. */ - if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn)) { - XLogRecPtr rcvPtr = GetWalRcvFlushRecPtr(NULL, NULL); - - /* - * GetWalRcvFlushRecPtr returns InvalidXLogRecPtr when the WAL - * receiver process has not yet flushed any WAL (e.g. the - * bgworker starts before the receiver has received its first - * byte). Fall back to the recovery redo pointer, which is - * always set from the pg_basebackup checkpoint and represents - * the oldest WAL position available on this standby. - */ - if (XLogRecPtrIsInvalid(rcvPtr)) - rcvPtr = GetRedoRecPtr(); + XLogRecPtr old_lsn = MyReplicationSlot->data.restart_lsn; + XLogRecPtr redo_lsn = GetRedoRecPtr(); - if (!XLogRecPtrIsInvalid(rcvPtr)) + if (!XLogRecPtrIsInvalid(redo_lsn) && redo_lsn > old_lsn) { SpinLockAcquire(&slot->mutex); - slot->data.restart_lsn = rcvPtr; + slot->data.restart_lsn = redo_lsn; SpinLockRelease(&slot->mutex); elog(DEBUG1, - "spock_failover_slots: seeded restart_lsn to WAL receive" - " position %X/%X for new slot \"%s\"", - LSN_FORMAT_ARGS(rcvPtr), remote_slot->name); + "spock_failover_slots: bumped restart_lsn from %X/%X to" + " redo pointer %X/%X for new slot \"%s\"", + LSN_FORMAT_ARGS(old_lsn), + LSN_FORMAT_ARGS(redo_lsn), remote_slot->name); } } From 3cf76cc627e3325bc9b3215cb45a38ba7658ef6e Mon Sep 17 00:00:00 2001 From: Ibrar Ahmed Date: Sat, 18 Apr 2026 01:37:31 +0500 Subject: [PATCH 10/11] Guard against zero WAL flush LSN in slot synchronization. GetWalRcvFlushRecPtr() can return 0 in the brief window after the WAL receiver starts but before it has flushed its first page. Capping remote_slot->confirmed_lsn to 0 and then passing 0 to LogicalConfirmReceivedLocation triggers Assert(lsn != InvalidXLogRecPtr) on --enable-cassert builds (as used in CI), causing SIGABRT, a postmaster cluster reset, and test failures on PG15/16. Fix: read the flush position once before the early-return guards and treat an invalid (zero) flush pointer the same as missing WAL receiver feedback, returning early with a WARNING. Co-Authored-By: Claude Sonnet 4.6 --- src/spock_failover_slots.c | 126 +++++++++++++++++++++---------------- 1 file changed, 73 insertions(+), 53 deletions(-) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 06d61a0b..53f093b1 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -1111,64 +1111,84 @@ synchronize_failover_slots(long sleep_time) lsn = remote_slot->restart_lsn; } - if (safe_lsn == InvalidXLogRecPtr || - WalRcv->latestWalEnd == InvalidXLogRecPtr) - { - ereport( - WARNING, - (errmsg( - "cannot synchronize replication slot positions yet because feedback was not sent yet"))); - was_lsn_safe = false; - PQfinish(conn); - return Min(sleep_time, WORKER_WAIT_FEEDBACK); - } - else if (WalRcv->latestWalEnd < lsn) + /* + * We need the WAL flush position both for the latestWalEnd/lsn check + * and for capping confirmed_lsn inside the slot loop. Read it once + * here so we use a consistent value throughout. + * + * On PostgreSQL builds with --enable-cassert, LogicalConfirmReceivedLocation + * asserts that the LSN passed to it is not InvalidXLogRecPtr. If + * GetWalRcvFlushRecPtr returns 0 (which can happen in the brief window + * after the WAL receiver starts but before it has flushed its first + * page), capping confirmed_lsn to 0 and then passing 0 to + * LogicalConfirmReceivedLocation will trip that assertion and SIGABRT the + * bgworker, causing a postmaster cluster reset on every loop iteration. + * Guard by returning early when the flush position is not yet valid. + */ { - ereport( - WARNING, - (errmsg( - "requested slot synchronization point %X/%X is ahead of the standby position %X/%X, not synchronizing slots", - (uint32) (lsn >> 32), (uint32) (lsn), - (uint32) (WalRcv->latestWalEnd >> 32), - (uint32) (WalRcv->latestWalEnd)))); - was_lsn_safe = false; - PQfinish(conn); - return Min(sleep_time, WORKER_WAIT_FEEDBACK); - } + XLogRecPtr receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); - foreach(lc, slots) - { - RemoteSlot *remote_slot = lfirst(lc); - XLogRecPtr receivePtr; + if (safe_lsn == InvalidXLogRecPtr || + WalRcv->latestWalEnd == InvalidXLogRecPtr || + XLogRecPtrIsInvalid(receivePtr)) + { + ereport( + WARNING, + (errmsg( + "cannot synchronize replication slot positions yet because feedback was not sent yet"))); + was_lsn_safe = false; + PQfinish(conn); + return Min(sleep_time, WORKER_WAIT_FEEDBACK); + } + else if (WalRcv->latestWalEnd < lsn) + { + ereport( + WARNING, + (errmsg( + "requested slot synchronization point %X/%X is ahead of the standby position %X/%X, not synchronizing slots", + (uint32) (lsn >> 32), (uint32) (lsn), + (uint32) (WalRcv->latestWalEnd >> 32), + (uint32) (WalRcv->latestWalEnd)))); + was_lsn_safe = false; + PQfinish(conn); + return Min(sleep_time, WORKER_WAIT_FEEDBACK); + } - /* - * If we haven't received WAL for a remote slot's current - * confirmed_flush_lsn our local copy shouldn't reflect a confirmed - * position in the future. Cap it at the position we really received. - * - * Because the client will use a replication origin to track its - * position, in most cases it'll still fast-forward to the new - * confirmed position even if that skips over a gap of WAL we never - * received from the provider before failover. We can't detect or - * prevent that as the same fast forward is normal when we lost slot - * state in a provider crash after subscriber committed but before we - * saved the new confirmed flush lsn. The master will also fast - * forward the slot over irrelevant changes and then the subscriber - * will update its confirmed_flush_lsn in response to master standby - * status updates. - */ - receivePtr = GetWalRcvFlushRecPtr(NULL, NULL); - if (remote_slot->confirmed_lsn > receivePtr) - remote_slot->confirmed_lsn = receivePtr; + foreach(lc, slots) + { + RemoteSlot *remote_slot = lfirst(lc); - /* - * For simplicity we always move restart_lsn of all slots to the - * restart_lsn needed by the furthest-behind master slot. - */ - if (remote_slot->restart_lsn > lsn) - remote_slot->restart_lsn = lsn; + /* + * If we haven't received WAL for a remote slot's current + * confirmed_flush_lsn our local copy shouldn't reflect a + * confirmed position in the future. Cap it at the position we + * really received. + * + * Because the client will use a replication origin to track its + * position, in most cases it'll still fast-forward to the new + * confirmed position even if that skips over a gap of WAL we + * never received from the provider before failover. We can't + * detect or prevent that as the same fast forward is normal when + * we lost slot state in a provider crash after subscriber + * committed but before we saved the new confirmed flush lsn. The + * master will also fast forward the slot over irrelevant changes + * and then the subscriber will update its confirmed_flush_lsn in + * response to master standby status updates. + * + * receivePtr is guaranteed non-zero here (checked above). + */ + if (remote_slot->confirmed_lsn > receivePtr) + remote_slot->confirmed_lsn = receivePtr; - synchronize_one_slot(remote_slot); + /* + * For simplicity we always move restart_lsn of all slots to the + * restart_lsn needed by the furthest-behind master slot. + */ + if (remote_slot->restart_lsn > lsn) + remote_slot->restart_lsn = lsn; + + synchronize_one_slot(remote_slot); + } } PQfinish(conn); From d2159c83a9092c673088eb6c245001757389f375 Mon Sep 17 00:00:00 2001 From: Asif Rehman Date: Wed, 22 Apr 2026 12:50:11 +0500 Subject: [PATCH 11/11] fix: prevent spock_failover_slots bgworker crash on --enable-cassert builds Two bugs in synchronize_one_slot caused SIGABRT on assertion-enabled PostgreSQL builds, triggering a full postmaster cluster reset and an infinite standby crash loop: 1. ReplicationSlotsComputeRequiredXmin(true) asserts both ReplicationSlotControlLock (exclusive) and ProcArrayLock (exclusive) are held in that order. The create-slot path acquired only ProcArrayLock; adding ReplicationSlotControlLock LW_EXCLUSIVE before ProcArrayLock satisfies the ordering requirement. 2. LogicalConfirmReceivedLocation() asserts the LSN is not InvalidXLogRecPtr. When a primary slot has no consumer feedback yet its confirmed_flush_lsn is NULL (-> 0). Skip such slots in the sync loop and retry next cycle once real feedback has arrived. Co-Authored-By: Claude Sonnet 4.6 --- src/spock_failover_slots.c | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/src/spock_failover_slots.c b/src/spock_failover_slots.c index 53f093b1..dcce3e35 100644 --- a/src/spock_failover_slots.c +++ b/src/spock_failover_slots.c @@ -881,12 +881,19 @@ synchronize_one_slot(RemoteSlot *remote_slot) } } + /* + * ReplicationSlotsComputeRequiredXmin(true) asserts that BOTH + * ReplicationSlotControlLock (exclusive) and ProcArrayLock (exclusive) + * are held, in that order, to prevent deadlocks. + */ + LWLockAcquire(ReplicationSlotControlLock, LW_EXCLUSIVE); LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); xmin_horizon = GetOldestSafeDecodingTransactionId(true); slot->effective_catalog_xmin = xmin_horizon; slot->data.catalog_xmin = xmin_horizon; ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); + LWLockRelease(ReplicationSlotControlLock); /* * Our xmin and/or catalog_xmin may be > that required by one or more @@ -1187,6 +1194,22 @@ synchronize_failover_slots(long sleep_time) if (remote_slot->restart_lsn > lsn) remote_slot->restart_lsn = lsn; + /* + * Skip slots whose primary confirmed_flush_lsn is still + * InvalidXLogRecPtr (no consumer feedback yet). + * LogicalConfirmReceivedLocation asserts the LSN is not invalid; + * passing 0 aborts the bgworker and triggers a postmaster cluster + * reset on --enable-cassert builds. + */ + if (XLogRecPtrIsInvalid(remote_slot->confirmed_lsn)) + { + elog(DEBUG1, + "spock_failover_slots: deferring slot \"%s\":" + " no confirmed_flush_lsn on primary yet", + remote_slot->name); + continue; + } + synchronize_one_slot(remote_slot); } }