Add a slot synchronization function.
This commit introduces a new SQL function pg_sync_replication_slots()
which is used to synchronize the logical replication slots from the
primary server to the physical standby so that logical replication can be
resumed after a failover or planned switchover.

A new 'synced' flag is introduced in the pg_replication_slots view, indicating
whether the slot has been synchronized from the primary server. On a
standby, synced slots cannot be dropped or consumed, and any attempt to
perform logical decoding on them will result in an error.
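
As a rough illustration of the restriction described above, assuming a hot standby that already holds a synced slot (the slot name 'lsub1_slot' is hypothetical), an attempt to decode from that slot would be expected to fail with the error this commit adds to CreateDecodingContext():

```sql
-- Run on the standby while it is still in recovery.
-- 'lsub1_slot' is a hypothetical slot that was synced from the primary.
SELECT * FROM pg_logical_slot_peek_changes('lsub1_slot', NULL, NULL);

-- Expected result, based on the logical.c hunk in this commit:
-- ERROR:  cannot use replication slot "lsub1_slot" for logical decoding
-- DETAIL:  This slot is being synchronized from the primary server.
-- HINT:  Specify another replication slot.
```

The check is conditional on RecoveryInProgress(), so it should stop applying once the standby has been promoted.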

Logical replication slots on the primary can be synchronized to a hot
standby by enabling the 'failover' parameter of
pg_create_logical_replication_slot(), or the 'failover' option of
CREATE SUBSCRIPTION, when the slot is created, and then calling
pg_sync_replication_slots() on the standby. For the synchronization to
work, there must be a physical replication slot between the primary and
the standby (that is, 'primary_slot_name' must be configured on the
standby), and 'hot_standby_feedback' must be enabled on the standby. A
valid 'dbname' must also be specified in 'primary_conninfo'.
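
The setup described above might look roughly like the following sketch. The physical slot name 'sb1_slot', the host, and the user are placeholders, not part of the commit. The logical slot call mirrors the one used in the test_decoding regression test. ALTER SYSTEM is used here only for brevity; the same settings could equally be placed in postgresql.conf.

```sql
-- On the primary: create a logical slot with failover enabled.
-- Arguments: slot_name, plugin, temporary, twophase, failover.
SELECT 'init' FROM pg_create_logical_replication_slot(
    'failover_true_slot', 'test_decoding', false, false, true);

-- On the primary: the physical slot the standby streams from
-- (the name 'sb1_slot' is an assumption for this example).
SELECT pg_create_physical_replication_slot('sb1_slot');

-- On the standby: settings required for slot synchronization.
ALTER SYSTEM SET primary_slot_name = 'sb1_slot';
ALTER SYSTEM SET hot_standby_feedback = on;
-- Host and user are placeholders; dbname is what slot synchronization needs.
ALTER SYSTEM SET primary_conninfo =
    'host=primary.example user=repl_user dbname=postgres';
SELECT pg_reload_conf();

-- On the standby: copy the failover-enabled logical slots from the primary.
SELECT pg_sync_replication_slots();
```

Note that, per the regression test added in this commit, a temporary slot cannot be created with failover enabled, so the third argument must be false.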

If a logical slot is invalidated on the primary, then that slot on the
standby is also invalidated.

If a logical slot on the primary is valid but is invalidated on the
standby, then that slot is dropped but will be recreated on the standby in
the next pg_sync_replication_slots() call provided the slot still exists
on the primary server. It is okay to recreate such slots as long as these
are not consumable on standby (which is the case currently). This
situation may occur due to the following reasons:
- The 'max_slot_wal_keep_size' on the standby is insufficient to retain
WAL records from the restart_lsn of the slot.
- 'primary_slot_name' is temporarily reset to null and the physical slot
is removed.

The slot synchronization status on the standby can be monitored using the
'synced' column of the pg_replication_slots view.
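
A minimal monitoring query along these lines, run on the standby, could be used to check the state of the synchronized slots. The column selection is only a suggestion:

```sql
-- On the standby: list logical slots with their failover and sync state.
SELECT slot_name, temporary, failover, synced
FROM pg_replication_slots
WHERE slot_type = 'logical';
```

According to the documentation added in this commit, only persistent slots (temporary = false) that have reached synced = true before failover can be used to resume logical replication afterwards.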

Automatic slot synchronization by a background worker, and the ability
for logical walsenders to wait for the physical standbys, will be added
in subsequent commits.

Author: Hou Zhijie, Shveta Malik, Ajin Cherian based on an earlier version by Peter Eisentraut
Reviewed-by: Masahiko Sawada, Bertrand Drouvot, Peter Smith, Dilip Kumar, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
Amit Kapila committed Feb 14, 2024
1 parent 06bd311 commit ddd5f4f
Showing 26 changed files with 1,522 additions and 37 deletions.
3 changes: 3 additions & 0 deletions contrib/test_decoding/expected/permissions.out
@@ -64,6 +64,9 @@ DETAIL: Only roles with the REPLICATION attribute may use replication slots.
SELECT pg_drop_replication_slot('regression_slot');
ERROR: permission denied to use replication slots
DETAIL: Only roles with the REPLICATION attribute may use replication slots.
+SELECT pg_sync_replication_slots();
+ERROR: permission denied to use replication slots
+DETAIL: Only roles with the REPLICATION attribute may use replication slots.
RESET ROLE;
-- replication users can drop superuser created slots
SET ROLE regress_lr_superuser;
2 changes: 2 additions & 0 deletions contrib/test_decoding/expected/slot.out
@@ -425,6 +425,8 @@ SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', '
init
(1 row)

+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
+ERROR: cannot enable failover for a temporary replication slot
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');
?column?
----------
1 change: 1 addition & 0 deletions contrib/test_decoding/sql/permissions.sql
@@ -29,6 +29,7 @@ SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_d
INSERT INTO lr_test VALUES('lr_superuser_init');
SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
SELECT pg_drop_replication_slot('regression_slot');
+SELECT pg_sync_replication_slots();
RESET ROLE;

-- replication users can drop superuser created slots
1 change: 1 addition & 0 deletions contrib/test_decoding/sql/slot.sql
@@ -181,6 +181,7 @@ SELECT pg_drop_replication_slot('copied_slot2_notemp');
SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_slot', 'test_decoding', false, false, true);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_false_slot', 'test_decoding', false, false, false);
SELECT 'init' FROM pg_create_logical_replication_slot('failover_default_slot', 'test_decoding', false, false);
+SELECT 'init' FROM pg_create_logical_replication_slot('failover_true_temp_slot', 'test_decoding', true, false, true);
SELECT 'init' FROM pg_create_physical_replication_slot('physical_slot');

SELECT slot_name, slot_type, failover FROM pg_replication_slots;
9 changes: 7 additions & 2 deletions doc/src/sgml/config.sgml
@@ -4612,8 +4612,13 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
<varname>primary_conninfo</varname> string, or in a separate
<filename>~/.pgpass</filename> file on the standby server (use
<literal>replication</literal> as the database name).
-Do not specify a database name in the
-<varname>primary_conninfo</varname> string.
+</para>
+<para>
+For replication slot synchronization (see
+<xref linkend="logicaldecoding-replication-slots-synchronization"/>),
+it is also necessary to specify a valid <literal>dbname</literal>
+in the <varname>primary_conninfo</varname> string. This will only be
+used for slot synchronization. It is ignored for streaming.
</para>
<para>
This parameter can only be set in the <filename>postgresql.conf</filename>
35 changes: 34 additions & 1 deletion doc/src/sgml/func.sgml
@@ -28075,7 +28075,7 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
</row>

<row>
-<entry role="func_table_entry"><para role="func_signature">
+<entry id="pg-create-logical-replication-slot" role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_create_logical_replication_slot</primary>
</indexterm>
@@ -28444,6 +28444,39 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
record is flushed along with its transaction.
</para></entry>
</row>
+
+<row>
+<entry id="pg-sync-replication-slots" role="func_table_entry"><para role="func_signature">
+<indexterm>
+<primary>pg_sync_replication_slots</primary>
+</indexterm>
+<function>pg_sync_replication_slots</function> ()
+<returnvalue>void</returnvalue>
+</para>
+<para>
+Synchronize the logical failover replication slots from the primary
+server to the standby server. This function can only be executed on the
+standby server. Temporary synced slots, if any, cannot be used for
+logical decoding and must be dropped after promotion. See
+<xref linkend="logicaldecoding-replication-slots-synchronization"/> for details.
+</para>
+
+<caution>
+<para>
+If, after executing the function,
+<link linkend="guc-hot-standby-feedback">
+<varname>hot_standby_feedback</varname></link> is disabled on
+the standby or the physical slot configured in
+<link linkend="guc-primary-slot-name">
+<varname>primary_slot_name</varname></link> is
+removed, then it is possible that the necessary rows of the
+synchronized slot will be removed by the VACUUM process on the primary
+server, resulting in the synchronized slot becoming invalidated.
+</para>
+</caution>
+</entry>
+</row>
+
</tbody>
</tgroup>
</table>
56 changes: 56 additions & 0 deletions doc/src/sgml/logicaldecoding.sgml
@@ -358,6 +358,62 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
So if a slot is no longer required it should be dropped.
</para>
</caution>
+
+</sect2>
+
+<sect2 id="logicaldecoding-replication-slots-synchronization">
+<title>Replication Slot Synchronization</title>
+<para>
+The logical replication slots on the primary can be synchronized to
+the hot standby by using the <literal>failover</literal> parameter of
+<link linkend="pg-create-logical-replication-slot">
+<function>pg_create_logical_replication_slot</function></link>, or by
+using the <link linkend="sql-createsubscription-params-with-failover">
+<literal>failover</literal></link> option of
+<command>CREATE SUBSCRIPTION</command> during slot creation, and then calling
+<link linkend="pg-sync-replication-slots">
+<function>pg_sync_replication_slots</function></link>
+on the standby. For the synchronization to work, it is mandatory to
+have a physical replication slot between the primary and the standby aka
+<link linkend="guc-primary-slot-name"><varname>primary_slot_name</varname></link>
+should be configured on the standby, and
+<link linkend="guc-hot-standby-feedback"><varname>hot_standby_feedback</varname></link>
+must be enabled on the standby. It is also necessary to specify a valid
+<literal>dbname</literal> in the
+<link linkend="guc-primary-conninfo"><varname>primary_conninfo</varname></link>.
+</para>
+
+<para>
+The ability to resume logical replication after failover depends upon the
+<link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>synced</structfield>
+value for the synchronized slots on the standby at the time of failover.
+Only persistent slots that have attained synced state as true on the standby
+before failover can be used for logical replication after failover.
+Temporary synced slots cannot be used for logical decoding, therefore
+logical replication for those slots cannot be resumed. For example, if the
+synchronized slot could not become persistent on the standby due to a
+disabled subscription, then the subscription cannot be resumed after
+failover even when it is enabled.
+</para>
+
+<para>
+To resume logical replication after failover from the synced logical
+slots, the subscription's 'conninfo' must be altered to point to the
+new primary server. This is done using
+<link linkend="sql-altersubscription-params-connection"><command>ALTER SUBSCRIPTION ... CONNECTION</command></link>.
+It is recommended that subscriptions are first disabled before promoting
+the standby and are re-enabled after altering the connection string.
+</para>
+<caution>
+<para>
+There is a chance that the old primary is up again during the promotion
+and if subscriptions are not disabled, the logical subscribers may
+continue to receive data from the old primary server even after promotion
+until the connection string is altered. This might result in data
+inconsistency issues, preventing the logical subscribers from being
+able to continue replication from the new primary server.
+</para>
+</caution>
</sect2>

<sect2 id="logicaldecoding-explanation-output-plugins">
6 changes: 4 additions & 2 deletions doc/src/sgml/protocol.sgml
@@ -2062,7 +2062,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
-If true, the slot is enabled to be synced to the standbys.
+If true, the slot is enabled to be synced to the standbys
+so that logical replication can be resumed after failover.
The default is false.
</para>
</listitem>
@@ -2162,7 +2163,8 @@ psql "dbname=postgres replication=database" -c "IDENTIFY_SYSTEM;"
<term><literal>FAILOVER [ <replaceable class="parameter">boolean</replaceable> ]</literal></term>
<listitem>
<para>
-If true, the slot is enabled to be synced to the standbys.
+If true, the slot is enabled to be synced to the standbys
+so that logical replication can be resumed after failover.
</para>
</listitem>
</varlistentry>
20 changes: 18 additions & 2 deletions doc/src/sgml/system-views.sgml
@@ -2561,10 +2561,26 @@ SELECT * FROM pg_locks pl LEFT JOIN pg_prepared_xacts ppx
<structfield>failover</structfield> <type>bool</type>
</para>
<para>
-True if this is a logical slot enabled to be synced to the standbys.
-Always false for physical slots.
+True if this is a logical slot enabled to be synced to the standbys
+so that logical replication can be resumed from the new primary
+after failover. Always false for physical slots.
</para></entry>
</row>
+
+<row>
+<entry role="catalog_table_entry"><para role="column_definition">
+<structfield>synced</structfield> <type>bool</type>
+</para>
+<para>
+True if this is a logical slot that was synced from a primary server.
+On a hot standby, the slots with the synced column marked as true can
+neither be used for logical decoding nor dropped manually. The value
+of this column has no meaning on the primary server; the column value on
+the primary is default false for all slots but may (if leftover from a
+promoted standby) also be true.
+</para></entry>
+</row>
+
</tbody>
</tgroup>
</table>
3 changes: 2 additions & 1 deletion src/backend/catalog/system_views.sql
@@ -1024,7 +1024,8 @@ CREATE VIEW pg_replication_slots AS
L.safe_wal_size,
L.two_phase,
L.conflict_reason,
-L.failover
+L.failover,
+L.synced
FROM pg_get_replication_slots() AS L
LEFT JOIN pg_database D ON (L.datoid = D.oid);

1 change: 1 addition & 0 deletions src/backend/replication/logical/Makefile
@@ -25,6 +25,7 @@ OBJS = \
proto.o \
relation.o \
reorderbuffer.o \
+slotsync.o \
snapbuild.o \
tablesync.o \
worker.o
12 changes: 12 additions & 0 deletions src/backend/replication/logical/logical.c
@@ -524,6 +524,18 @@ CreateDecodingContext(XLogRecPtr start_lsn,
errmsg("replication slot \"%s\" was not created in this database",
NameStr(slot->data.name))));

+/*
+* Do not allow consumption of a "synchronized" slot until the standby
+* gets promoted.
+*/
+if (RecoveryInProgress() && slot->data.synced)
+ereport(ERROR,
+errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+errmsg("cannot use replication slot \"%s\" for logical decoding",
+NameStr(slot->data.name)),
+errdetail("This slot is being synchronized from the primary server."),
+errhint("Specify another replication slot."));
+
/*
* Check if slot has been invalidated due to max_slot_wal_keep_size. Avoid
* "cannot get changes" wording in this errmsg because that'd be
1 change: 1 addition & 0 deletions src/backend/replication/logical/meson.build
@@ -11,6 +11,7 @@ backend_sources += files(
'proto.c',
'relation.c',
'reorderbuffer.c',
+'slotsync.c',
'snapbuild.c',
'tablesync.c',
'worker.c',