Skip to content

Commit

Permalink
Optionally disable subscriptions on error.
Browse files Browse the repository at this point in the history
Logical replication apply workers for a subscription can easily get stuck
in an infinite loop of attempting to apply a change, triggering an error
(such as a constraint violation), exiting with the error written to the
subscription server log, and restarting.

To partially remedy the situation, this patch adds a new subscription
option named 'disable_on_error'. To be consistent with old behavior, this
option defaults to false. When true, both the tablesync worker and apply
worker catch any errors thrown and disable the subscription in order to
break the loop. The error is still also written in the logs.

Once the subscription is disabled, users can either manually resolve the
conflict/error or skip the conflicting transaction by using
pg_replication_origin_advance() function. After resolving the conflict,
users need to enable the subscription to allow apply process to proceed.

Author: Osumi Takamichi and Mark Dilger
Reviewed-by: Greg Nancarrow, Vignesh C, Amit Kapila, Wang wei, Tang Haiying, Peter Smith, Masahiko Sawada, Shi Yu
Discussion : https://postgr.es/m/DB35438F-9356-4841-89A0-412709EBD3AB%40enterprisedb.com
  • Loading branch information
Amit Kapila committed Mar 14, 2022
1 parent 369398e commit 705e20f
Show file tree
Hide file tree
Showing 17 changed files with 421 additions and 108 deletions.
10 changes: 10 additions & 0 deletions doc/src/sgml/catalogs.sgml
Expand Up @@ -7769,6 +7769,16 @@ SCRAM-SHA-256$<replaceable>&lt;iteration count&gt;</replaceable>:<replaceable>&l
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subdisableonerr</structfield> <type>bool</type>
</para>
<para>
If true, the subscription will be disabled if one of its workers
detects an error
</para></entry>
</row>

<row>
<entry role="catalog_table_entry"><para role="column_definition">
<structfield>subconninfo</structfield> <type>text</type>
Expand Down
5 changes: 3 additions & 2 deletions doc/src/sgml/logical-replication.sgml
Expand Up @@ -364,8 +364,9 @@ CONTEXT: processing remote data for replication origin "pg_16395" during "INSER
the replication origin name can be found from the server log (LSN 0/14C0378 and
replication origin <literal>pg_16395</literal> in the above case). To skip the
transaction, the subscription needs to be disabled temporarily by
<command>ALTER SUBSCRIPTION ... DISABLE</command> first. Then, the transaction
can be skipped by calling the
<command>ALTER SUBSCRIPTION ... DISABLE</command> first or alternatively, the
subscription can be used with the <literal>disable_on_error</literal> option.
Then, the transaction can be skipped by calling the
<link linkend="pg-replication-origin-advance">
<function>pg_replication_origin_advance()</function></link> function with
the <parameter>node_name</parameter> (i.e., <literal>pg_16395</literal>) and the
Expand Down
4 changes: 2 additions & 2 deletions doc/src/sgml/ref/alter_subscription.sgml
Expand Up @@ -204,8 +204,8 @@ ALTER SUBSCRIPTION <replaceable class="parameter">name</replaceable> RENAME TO <
information. The parameters that can be altered
are <literal>slot_name</literal>,
<literal>synchronous_commit</literal>,
<literal>binary</literal>, and
<literal>streaming</literal>.
<literal>binary</literal>, <literal>streaming</literal>, and
<literal>disable_on_error</literal>.
</para>
</listitem>
</varlistentry>
Expand Down
12 changes: 12 additions & 0 deletions doc/src/sgml/ref/create_subscription.sgml
Expand Up @@ -290,6 +290,18 @@ CREATE SUBSCRIPTION <replaceable class="parameter">subscription_name</replaceabl

</listitem>
</varlistentry>

<varlistentry>
<term><literal>disable_on_error</literal> (<type>boolean</type>)</term>
<listitem>
<para>
Specifies whether the subscription should be automatically disabled
if any errors are detected by subscription workers during data
replication from the publisher. The default is
<literal>false</literal>.
</para>
</listitem>
</varlistentry>
</variablelist>
</para>

Expand Down
40 changes: 40 additions & 0 deletions src/backend/catalog/pg_subscription.c
Expand Up @@ -69,6 +69,7 @@ GetSubscription(Oid subid, bool missing_ok)
sub->binary = subform->subbinary;
sub->stream = subform->substream;
sub->twophasestate = subform->subtwophasestate;
sub->disableonerr = subform->subdisableonerr;

/* Get conninfo */
datum = SysCacheGetAttr(SUBSCRIPTIONOID,
Expand Down Expand Up @@ -156,6 +157,45 @@ FreeSubscription(Subscription *sub)
pfree(sub);
}

/*
* Disable the given subscription.
*/
void
DisableSubscription(Oid subid)
{
Relation rel;
bool nulls[Natts_pg_subscription];
bool replaces[Natts_pg_subscription];
Datum values[Natts_pg_subscription];
HeapTuple tup;

/* Look up the subscription in the catalog */
rel = table_open(SubscriptionRelationId, RowExclusiveLock);
tup = SearchSysCacheCopy1(SUBSCRIPTIONOID, ObjectIdGetDatum(subid));

if (!HeapTupleIsValid(tup))
elog(ERROR, "cache lookup failed for subscription %u", subid);

LockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);

/* Form a new tuple. */
memset(values, 0, sizeof(values));
memset(nulls, false, sizeof(nulls));
memset(replaces, false, sizeof(replaces));

/* Set the subscription to disabled. */
values[Anum_pg_subscription_subenabled - 1] = BoolGetDatum(false);
replaces[Anum_pg_subscription_subenabled - 1] = true;

/* Update the catalog */
tup = heap_modify_tuple(tup, RelationGetDescr(rel), values, nulls,
replaces);
CatalogTupleUpdate(rel, &tup->t_self, tup);
heap_freetuple(tup);

table_close(rel, NoLock);
}

/*
* get_subscription_oid - given a subscription name, look up the OID
*
Expand Down
3 changes: 2 additions & 1 deletion src/backend/catalog/system_views.sql
Expand Up @@ -1261,7 +1261,8 @@ REVOKE ALL ON pg_replication_origin_status FROM public;
-- All columns of pg_subscription except subconninfo are publicly readable.
REVOKE ALL ON pg_subscription FROM public;
GRANT SELECT (oid, subdbid, subname, subowner, subenabled, subbinary,
substream, subtwophasestate, subslotname, subsynccommit, subpublications)
substream, subtwophasestate, subdisableonerr, subslotname,
subsynccommit, subpublications)
ON pg_subscription TO public;

CREATE VIEW pg_stat_subscription_stats AS
Expand Down
27 changes: 25 additions & 2 deletions src/backend/commands/subscriptioncmds.c
Expand Up @@ -61,6 +61,7 @@
#define SUBOPT_BINARY 0x00000080
#define SUBOPT_STREAMING 0x00000100
#define SUBOPT_TWOPHASE_COMMIT 0x00000200
#define SUBOPT_DISABLE_ON_ERR 0x00000400

/* check if the 'val' has 'bits' set */
#define IsSet(val, bits) (((val) & (bits)) == (bits))
Expand All @@ -82,6 +83,7 @@ typedef struct SubOpts
bool binary;
bool streaming;
bool twophase;
bool disableonerr;
} SubOpts;

static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
Expand Down Expand Up @@ -130,6 +132,8 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->streaming = false;
if (IsSet(supported_opts, SUBOPT_TWOPHASE_COMMIT))
opts->twophase = false;
if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR))
opts->disableonerr = false;

/* Parse options */
foreach(lc, stmt_options)
Expand Down Expand Up @@ -249,6 +253,15 @@ parse_subscription_options(ParseState *pstate, List *stmt_options,
opts->specified_opts |= SUBOPT_TWOPHASE_COMMIT;
opts->twophase = defGetBoolean(defel);
}
else if (IsSet(supported_opts, SUBOPT_DISABLE_ON_ERR) &&
strcmp(defel->defname, "disable_on_error") == 0)
{
if (IsSet(opts->specified_opts, SUBOPT_DISABLE_ON_ERR))
errorConflictingDefElem(defel, pstate);

opts->specified_opts |= SUBOPT_DISABLE_ON_ERR;
opts->disableonerr = defGetBoolean(defel);
}
else
ereport(ERROR,
(errcode(ERRCODE_SYNTAX_ERROR),
Expand Down Expand Up @@ -390,7 +403,8 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
supported_opts = (SUBOPT_CONNECT | SUBOPT_ENABLED | SUBOPT_CREATE_SLOT |
SUBOPT_SLOT_NAME | SUBOPT_COPY_DATA |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT);
SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
SUBOPT_DISABLE_ON_ERR);
parse_subscription_options(pstate, stmt->options, supported_opts, &opts);

/*
Expand Down Expand Up @@ -464,6 +478,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
CharGetDatum(opts.twophase ?
LOGICALREP_TWOPHASE_STATE_PENDING :
LOGICALREP_TWOPHASE_STATE_DISABLED);
values[Anum_pg_subscription_subdisableonerr - 1] = BoolGetDatum(opts.disableonerr);
values[Anum_pg_subscription_subconninfo - 1] =
CStringGetTextDatum(conninfo);
if (opts.slot_name)
Expand Down Expand Up @@ -864,7 +879,7 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
{
supported_opts = (SUBOPT_SLOT_NAME |
SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
SUBOPT_STREAMING);
SUBOPT_STREAMING | SUBOPT_DISABLE_ON_ERR);

parse_subscription_options(pstate, stmt->options,
supported_opts, &opts);
Expand Down Expand Up @@ -913,6 +928,14 @@ AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt,
replaces[Anum_pg_subscription_substream - 1] = true;
}

if (IsSet(opts.specified_opts, SUBOPT_DISABLE_ON_ERR))
{
values[Anum_pg_subscription_subdisableonerr - 1]
= BoolGetDatum(opts.disableonerr);
replaces[Anum_pg_subscription_subdisableonerr - 1]
= true;
}

update_tuple = true;
break;
}
Expand Down

0 comments on commit 705e20f

Please sign in to comment.