Skip to content

Commit

Permalink
Add a failover option to subscriptions.
Browse files Browse the repository at this point in the history
This commit introduces a new subscription option named 'failover', which
provides users with the ability to set the failover property of the
replication slot on the publisher when creating or altering a
subscription.

This uses the replication commands introduced by commit 7329240 to
enable the failover option for a logical replication slot.

If the failover option is set to true, the associated replication slots
(i.e. the main slot and the table sync slots) in the upstream database are
enabled to be synchronized to the standbys. Note that the capability to
sync the replication slots will be added in subsequent commits.

Thanks to Masahiko Sawada for the design inputs.

Author: Shveta Malik, Hou Zhijie, Ajin Cherian
Reviewed-by: Peter Smith, Bertrand Drouvot, Dilip Kumar, Masahiko Sawada, Nisha Moond, Kuroda Hayato, Amit Kapila
Discussion: https://postgr.es/m/514f6f2f-6833-4539-39f1-96cd1e011f23@enterprisedb.com
  • Loading branch information
Amit Kapila committed Jan 30, 2024
1 parent b527ebc commit 776621a
Show file tree
Hide file tree
Showing 21 changed files with 412 additions and 108 deletions.
11 changes: 11 additions & 0 deletions doc/src/sgml/catalogs.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -8000,6 +8000,17 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subfailover</structfield> <type>bool</type>
</para>
<para>
If true, the associated replication slots (i.e. the main slot and the
table sync slots) in the upstream database are enabled to be
synchronized to the standbys
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subconninfo</structfield> <type>text</type>
Expand Down
25 changes: 23 additions & 2 deletions doc/src/sgml/ref/alter_subscription.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -226,10 +226,31 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
<link linkend="sql-createsubscription-params-with-streaming"><literal>streaming</literal></link>,
<link linkend="sql-createsubscription-params-with-disable-on-error"><literal>disable_on_error</literal></link>,
<link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>, and
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>.
<link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
<link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>, and
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>.
Only a superuser can set <literal>password_required = false</literal>.
</para>

<para>
When altering the
<link linkend="sql-createsubscription-params-with-slot-name"><literal>slot_name</literal></link>,
the <literal>failover</literal> property value of the named slot may differ from the
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
parameter specified in the subscription. When creating the slot,
ensure the slot <literal>failover</literal> property matches the
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
parameter value of the subscription. Otherwise, the slot on the
publisher may behave differently from what subscription's
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
option says. The slot on the publisher could either be
synced to the standbys even when the subscription's
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
option is disabled or could be disabled for sync
even when the subscription's
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
option is enabled.
</para>
</listitem>
</varlistentry>

Expand Down
23 changes: 19 additions & 4 deletions doc/src/sgml/ref/create_subscription.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -117,19 +117,22 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
command should connect to the publisher at all. The default
is <literal>true</literal>. Setting this to
<literal>false</literal> will force the values of
<literal>create_slot</literal>, <literal>enabled</literal> and
<literal>copy_data</literal> to <literal>false</literal>.
<literal>create_slot</literal>, <literal>enabled</literal>,
<literal>copy_data</literal>, and <literal>failover</literal>
to <literal>false</literal>.
(You cannot combine setting <literal>connect</literal>
to <literal>false</literal> with
setting <literal>create_slot</literal>, <literal>enabled</literal>,
or <literal>copy_data</literal> to <literal>true</literal>.)
<literal>copy_data</literal>, or <literal>failover</literal> to
<literal>true</literal>.)
</para>

<para>
Since no connection is made when this option is
<literal>false</literal>, no tables are subscribed. To initiate
replication, you must manually create the replication slot, enable
the subscription, and refresh the subscription. See
the failover if required, enable the subscription, and refresh the
subscription. See
<xref linkend="logical-replication-subscription-examples-deferred-slot"/>
for examples.
</para>
Expand Down Expand Up @@ -400,6 +403,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl
</para>
</listitem>
</varlistentry>

<varlistentry id="sql-createsubscription-params-with-failover">
<term><literal>failover</literal> (<type>boolean</type>)</term>
<listitem>
<para>
Specifies whether the replication slots associated with the subscription
are enabled to be synced to the standbys so that logical
replication can be resumed from the new primary after failover.
The default is <literal>false</literal>.
</para>
</listitem>
</varlistentry>
</variablelist></para>

</listitem>
Expand Down
8 changes: 7 additions & 1 deletion doc/src/sgml/ref/pg_dump.sgml
Original file line number Diff line number Diff line change
Expand Up @@ -1588,7 +1588,13 @@ CREATE DATABASE foo WITH TEMPLATE template0;
dump can be restored without requiring network access to the remote
servers. It is then up to the user to reactivate the subscriptions in a
suitable way. If the involved hosts have changed, the connection
information might have to be changed. It might also be appropriate to
information might have to be changed. If the subscription needs to
be enabled for
<link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
then same needs to be done by executing
<link linkend="sql-altersubscription-params-set">
<literal>ALTER SUBSCRIPTION ... SET (failover = true)</literal></link>
after the slot has been created. It might also be appropriate to
truncate the target tables before initiating a new full table copy. If users
intend to copy initial data during refresh they must create the slot with
<literal>two_phase = false</literal>. After the initial sync, the
Expand Down
1 change: 1 addition & 0 deletions src/backend/catalog/pg_subscription.c
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->disableonerr = subform->subdisableonerr;
sub->passwordrequired = subform->subpasswordrequired;
sub->runasowner = subform->subrunasowner;
sub->failover = subform->subfailover;

/* Get conninfo */
datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
Expand Down
2 changes: 1 addition & 1 deletion src/backend/catalog/system_views.sql
Original file line number Diff line number Diff line change
Expand Up @@ -1358,7 +1358,7 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
subbinary, substream, subtwophasestate, subdisableonerr,
subpasswordrequired, subrunasowner,
subpasswordrequired, subrunasowner, subfailover,
subslotname, subsynccommit, subpublications, suborigin)
ON pg_subscription TO public;

Expand Down
116 changes: 111 additions & 5 deletions src/backend/commands/subscriptioncmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,10 @@
#define SUBOPT_DISABLE_ON_ERR 0x00000400
#define SUBOPT_PASSWORD_REQUIRED 0x00000800
#define SUBOPT_RUN_AS_OWNER 0x00001000
#define SUBOPT_LSN 0x00002000
#define SUBOPT_ORIGIN 0x00004000
#define SUBOPT_FAILOVER 0x00002000
#define SUBOPT_LSN 0x00004000
#define SUBOPT_ORIGIN 0x00008000


/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
Expand All @@ -95,6 +97,7 @@ typedef struct SubOpts
bool disableonerr;
bool passwordrequired;
bool runasowner;
bool failover;
char *origin;
XLogRecPtr lsn;
} SubOpts;
Expand Down Expand Up @@ -155,6 +158,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->passwordrequired = true;
if (IsSet(supported_opts, SUBOPT_RUN_AS_OWNER))
opts->runasowner = false;
if (IsSet(supported_opts, SUBOPT_FAILOVER))
opts->failover = false;
if (IsSet(supported_opts, SUBOPT_ORIGIN))
opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);

Expand Down Expand Up @@ -303,6 +308,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_RUN_AS_OWNER;
opts->runasowner = defGetBoolean(defel);
}
else if (IsSet(supported_opts, SUBOPT_FAILOVER) &&
strcmp(defel->defname, "failover") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_FAILOVER))
errorConflictingDefElem(defel, pstate);

opts->specified_opts |= SUBOPT_FAILOVER;
opts->failover = defGetBoolean(defel);
}
else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
strcmp(defel->defname, "origin") == 0)
{
Expand Down Expand Up @@ -388,6 +402,13 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
errmsg("%s and %s are mutually exclusive options",
"connect = false", "copy_data = true")));

if (opts->failover &&
IsSet(opts->specified_opts, SUBOPT_FAILOVER))
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
errmsg("%s and %s are mutually exclusive options",
"connect = false", "failover = true")));

/* Change the defaults of other options. */
opts->enabled = false;
opts->create_slot = false;
Expand Down Expand Up @@ -591,7 +612,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);

/*
Expand Down Expand Up @@ -697,6 +718,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
Expand Down Expand Up @@ -807,7 +829,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
twophase_enabled = true;

walrcv_create_slot(wrconn, opts.slot_name, false, twophase_enabled,
false, CRS_NOEXPORT_SNAPSHOT, NULL);
opts.failover, CRS_NOEXPORT_SNAPSHOT, NULL);

if (twophase_enabled)
UpdateTwoPhaseState(subid, LOGICALREP_TWOPHASE_STATE_ENABLED);
Expand All @@ -816,6 +838,24 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
(errmsg("created replication slot \"%s\" on publisher",
opts.slot_name)));
}

/*
* If the slot_name is specified without the create_slot option,
* it is possible that the user intends to use an existing slot on
* the publisher, so here we alter the failover property of the
* slot to match the failover value in subscription.
*
* We do not need to change the failover to false if the server
* does not support failover (e.g. pre-PG17).
*/
else if (opts.slot_name &&
(opts.failover || walrcv_server_version(wrconn) >= 170000))
{
walrcv_alter_slot(wrconn, opts.slot_name, opts.failover);
ereport(NOTICE,
(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
opts.slot_name, opts.failover ? "true" : "false")));
}
}
PG_FINALLY();
{
Expand Down Expand Up @@ -1132,7 +1172,8 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR |
SUBOPT_PASSWORD_REQUIRED |
SUBOPT_RUN_AS_OWNER | SUBOPT_ORIGIN);
SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
SUBOPT_ORIGIN);

parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
Expand Down Expand Up @@ -1211,6 +1252,31 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_subrunasowner - 1] = true;
}

if (IsSet(opts.specified_opts, SUBOPT_FAILOVER))
{
if (!sub->slotname)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for a subscription that does not have a slot name",
"failover")));

/*
* Do not allow changing the failover state if the
* subscription is enabled. This is because the failover
* state of the slot on the publisher cannot be modified
* if the slot is currently acquired by the apply worker.
*/
if (sub->enabled)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("cannot set %s for enabled subscription",
"failover")));

values[Anum_pg_subscription_subfailover - 1] =
BoolGetDatum(opts.failover);
replaces[Anum_pg_subscription_subfailover - 1] = true;
}

if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
{
values[Anum_pg_subscription_suborigin - 1] =
Expand Down Expand Up @@ -1453,6 +1519,46 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
heap_freetuple(tup);
}

/*
* Try to acquire the connection necessary for altering slot.
*
* This has to be at the end because otherwise if there is an error while
* doing the database operations we won't be able to rollback altered
* slot.
*/
if (replaces[Anum_pg_subscription_subfailover - 1])
{
bool must_use_password;
char *err;
WalReceiverConn *wrconn;

/* Load the library providing us libpq calls. */
load_file("libpqwalreceiver", false);

/* Try to connect to the publisher. */
must_use_password = sub->passwordrequired && !sub->ownersuperuser;
wrconn = walrcv_connect(sub->conninfo, true, must_use_password,
sub->name, &err);
if (!wrconn)
ereport(ERROR,
(errcode(ERRCODE_CONNECTION_FAILURE),
errmsg("could not connect to the publisher: %s", err)));

PG_TRY();
{
walrcv_alter_slot(wrconn, sub->slotname, opts.failover);

ereport(NOTICE,
(errmsg("changed the failover state of replication slot \"%s\" on publisher to %s",
sub->slotname, opts.failover ? "true" : "false")));
}
PG_FINALLY();
{
walrcv_disconnect(wrconn);
}
PG_END_TRY();
}

table_close(rel, RowExclusiveLock);

ObjectAddressSet(myself, SubscriptionRelationId, subid);
Expand Down
2 changes: 1 addition & 1 deletion src/backend/replication/logical/tablesync.c
Original file line number Diff line number Diff line change
Expand Up @@ -1430,7 +1430,7 @@ LogicalRepSyncTableStart(XLogRecPtr *origin_startpos)
*/
walrcv_create_slot(LogRepWorkerWalRcvConn,
slotname, false /* permanent */ , false /* two_phase */ ,
false,
MySubscription->failover,
CRS_USE_SNAPSHOT, origin_startpos);

/*
Expand Down
7 changes: 7 additions & 0 deletions src/backend/replication/logical/worker.c
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,13 @@
* avoid such deadlocks, we generate a unique GID (consisting of the
* subscription oid and the xid of the prepared transaction) for each prepare
* transaction on the subscriber.
*
* FAILOVER
* ----------------------
* The logical slot on the primary can be synced to the standby by specifying
* failover = true when creating the subscription. Enabling failover allows us
* to smoothly transition to the promoted standby, ensuring that we can
* subscribe to the new primary without losing any data.
*-------------------------------------------------------------------------
*/

Expand Down

0 comments on commit 776621a

Please sign in to comment.