Allow upgrades to preserve the full subscription's state.
This feature will allow us to replicate the changes on subscriber nodes
after the upgrade.

Previously, only the subscription metadata information was preserved.
Without the list of relations and their state, it's not possible to
re-enable the subscriptions without missing some records as the list of
relations can only be refreshed after enabling the subscription (and
therefore starting the apply worker).  Even if we added a way to refresh
the subscription while enabling a publication, we still wouldn't know
which relations are new on the publication side, and therefore should be
fully synced, and which shouldn't.

To preserve the subscription relations, this patch teaches pg_dump to
restore the content of pg_subscription_rel from the old cluster by using
the binary_upgrade_add_sub_rel_state SQL function. This is supported only
in binary upgrade mode.

The subscription's replication origin is needed to ensure that we don't
replicate anything twice.

To preserve the replication origins, this patch teaches pg_dump to update
the replication origin along with creating a subscription by using
the binary_upgrade_replorigin_advance SQL function to restore the
underlying replication origin remote LSN. This is supported only in
binary upgrade mode.

pg_upgrade will check that all the subscription relations are in 'i'
(init) or in 'r' (ready) state and will error out if that's not the case,
logging the reason for the failure. This helps to avoid the risk of any
dangling slot or origin after the upgrade.

Author: Vignesh C, Julien Rouhaud, Shlok Kyal
Reviewed-by: Peter Smith, Masahiko Sawada, Michael Paquier, Amit Kapila, Hayato Kuroda
Discussion: https://postgr.es/m/20230217075433.u5mjly4d5cr4hcfe@jrouhaud
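
The state check described in the commit message can be pictured with a small standalone sketch. This is not the pg_upgrade code: `sub_rel_state_ok` and `count_blocking_states` are hypothetical names, and the input is assumed to be a plain array of `srsubstate` characters rather than the result of a catalog query.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical helper: true only for the two states the commit message allows. */
static bool
sub_rel_state_ok(char state)
{
    return state == 'i' || state == 'r';    /* 'i' = init, 'r' = ready */
}

/*
 * Hypothetical helper: count the relations whose state would make the
 * upgrade check fail.  A result of zero means the check would pass.
 */
static size_t
count_blocking_states(const char *states, size_t nstates)
{
    size_t      blocked = 0;

    assert(states != NULL || nstates == 0);

    for (size_t i = 0; i < nstates; i++)
    {
        if (!sub_rel_state_ok(states[i]))
            blocked++;
    }
    return blocked;
}
```

A caller would refuse to proceed whenever `count_blocking_states` returns a nonzero value, which corresponds to pg_upgrade erroring out and logging the reason.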
Amit Kapila committed Jan 2, 2024
1 parent cea89c9 commit 9a17be1
Showing 17 changed files with 1,032 additions and 16 deletions.
50 changes: 50 additions & 0 deletions doc/src/sgml/ref/pgupgrade.sgml
@@ -456,6 +456,56 @@ make prefix=/usr/local/pgsql.new install

</step>

<step>
<title>Prepare for subscriber upgrades</title>

<para>
Set up the <link linkend="logical-replication-config-subscriber">
subscriber configurations</link> in the new subscriber.
<application>pg_upgrade</application> attempts to migrate subscription
dependencies, which include the subscription's table information present in
the <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>
system catalog and also the subscription's replication origin. This allows
logical replication on the new subscriber to continue from where the
old subscriber left off. Migration of subscription dependencies is only
supported when the old cluster is version 17.0 or later. Subscription
dependencies on clusters before version 17.0 will silently be ignored.
</para>

<para>
There are some prerequisites for <application>pg_upgrade</application> to
be able to upgrade the subscriptions. If these are not met, an error
will be reported.
</para>

<itemizedlist>
<listitem>
<para>
All the subscription tables in the old subscriber should be in state
<literal>i</literal> (initialize) or <literal>r</literal> (ready). This
can be verified by checking <link linkend="catalog-pg-subscription-rel">pg_subscription_rel</link>.<structfield>srsubstate</structfield>.
</para>
</listitem>
<listitem>
<para>
The replication origin entry corresponding to each of the subscriptions
should exist in the old cluster. This can be found by checking
<link linkend="catalog-pg-subscription">pg_subscription</link> and
<link linkend="catalog-pg-replication-origin">pg_replication_origin</link>
system tables.
</para>
</listitem>
<listitem>
<para>
The new cluster must have
<link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
configured to a value greater than or equal to the number of
subscriptions present in the old cluster.
</para>
</listitem>
</itemizedlist>
</step>

<step>
<title>Stop both servers</title>
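
Two of the prerequisites documented above (an existing replication origin per subscription, and a large enough `max_replication_slots` on the new cluster) can be sketched as a standalone check. Everything here is illustrative: `OldSubscription` and `check_subscription_prereqs` are hypothetical names, and the real pg_upgrade gathers this information by querying the clusters rather than from an in-memory array.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical summary of one subscription found in the old cluster. */
typedef struct OldSubscription
{
    const char *name;
    bool        has_origin;     /* a matching replication origin exists */
} OldSubscription;

/*
 * Hypothetical check mirroring two documented prerequisites: every
 * subscription has a replication origin, and the new cluster's
 * max_replication_slots is at least the number of subscriptions.
 * Returns NULL on success, else a description of the first problem found.
 */
static const char *
check_subscription_prereqs(const OldSubscription *subs, size_t nsubs,
                           int new_max_replication_slots)
{
    assert(subs != NULL || nsubs == 0);

    for (size_t i = 0; i < nsubs; i++)
    {
        if (!subs[i].has_origin)
            return "replication origin missing";
    }

    if (new_max_replication_slots < 0 ||
        (size_t) new_max_replication_slots < nsubs)
        return "max_replication_slots too small";

    return NULL;
}
```

Returning a message instead of a bare boolean follows the documented behaviour that a failed prerequisite is reported as an error with its reason.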

16 changes: 14 additions & 2 deletions src/backend/catalog/pg_subscription.c
@@ -228,10 +228,14 @@ textarray_to_stringlist(ArrayType *textarray)

/*
* Add new state record for a subscription table.
+ *
+ * If retain_lock is true, then don't release the locks taken in this function.
+ * We normally release the locks at the end of transaction but in binary-upgrade
+ * mode, we expect to release those immediately.
*/
void
AddSubscriptionRelState(Oid subid, Oid relid, char state,
-XLogRecPtr sublsn)
+XLogRecPtr sublsn, bool retain_lock)
{
Relation rel;
HeapTuple tup;
@@ -269,7 +273,15 @@ AddSubscriptionRelState(Oid subid, Oid relid, char state,
heap_freetuple(tup);

/* Cleanup. */
-table_close(rel, NoLock);
+if (retain_lock)
+{
+table_close(rel, NoLock);
+}
+else
+{
+table_close(rel, RowExclusiveLock);
+UnlockSharedObject(SubscriptionRelationId, subid, 0, AccessShareLock);
+}
}

/*
4 changes: 2 additions & 2 deletions src/backend/commands/subscriptioncmds.c
@@ -773,7 +773,7 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
rv->schemaname, rv->relname);

AddSubscriptionRelState(subid, relid, table_state,
-InvalidXLogRecPtr);
+InvalidXLogRecPtr, true);
}

/*
@@ -943,7 +943,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
{
AddSubscriptionRelState(sub->oid, relid,
copy_data ? SUBREL_STATE_INIT : SUBREL_STATE_READY,
-InvalidXLogRecPtr);
+InvalidXLogRecPtr, true);
ereport(DEBUG1,
(errmsg_internal("table \"%s.%s\" added to subscription \"%s\"",
rv->schemaname, rv->relname, sub->name)));
106 changes: 106 additions & 0 deletions src/backend/utils/adt/pg_upgrade_support.c
@@ -11,15 +11,24 @@

#include "postgres.h"

#include "access/relation.h"
#include "access/table.h"
#include "catalog/binary_upgrade.h"
#include "catalog/heap.h"
#include "catalog/namespace.h"
#include "catalog/pg_subscription_rel.h"
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
#include "replication/logical.h"
#include "replication/origin.h"
#include "replication/worker_internal.h"
#include "storage/lmgr.h"
#include "utils/array.h"
#include "utils/builtins.h"
#include "utils/lsyscache.h"
#include "utils/pg_lsn.h"
#include "utils/syscache.h"


#define CHECK_IS_BINARY_UPGRADE \
@@ -305,3 +314,100 @@ binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)

PG_RETURN_BOOL(!found_pending_wal);
}

/*
* binary_upgrade_add_sub_rel_state
*
* Add the relation with the specified relation state to pg_subscription_rel
* catalog.
*/
Datum
binary_upgrade_add_sub_rel_state(PG_FUNCTION_ARGS)
{
Relation subrel;
Relation rel;
Oid subid;
char *subname;
Oid relid;
char relstate;
XLogRecPtr sublsn;

CHECK_IS_BINARY_UPGRADE;

/* We must check these things before dereferencing the arguments */
if (PG_ARGISNULL(0) || PG_ARGISNULL(1) || PG_ARGISNULL(2))
elog(ERROR, "null argument to binary_upgrade_add_sub_rel_state is not allowed");

subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
relid = PG_GETARG_OID(1);
relstate = PG_GETARG_CHAR(2);
sublsn = PG_ARGISNULL(3) ? InvalidXLogRecPtr : PG_GETARG_LSN(3);

subrel = table_open(SubscriptionRelationId, RowExclusiveLock);
subid = get_subscription_oid(subname, false);
rel = relation_open(relid, AccessShareLock);

/*
* Since there are no concurrent ALTER/DROP SUBSCRIPTION commands during
* the upgrade process, and the apply worker (which builds cache based on
* the subscription catalog) is not running, the locks can be released
* immediately.
*/
AddSubscriptionRelState(subid, relid, relstate, sublsn, false);
relation_close(rel, AccessShareLock);
table_close(subrel, RowExclusiveLock);

PG_RETURN_VOID();
}

/*
* binary_upgrade_replorigin_advance
*
* Update the remote_lsn for the subscriber's replication origin.
*/
Datum
binary_upgrade_replorigin_advance(PG_FUNCTION_ARGS)
{
Relation rel;
Oid subid;
char *subname;
char originname[NAMEDATALEN];
RepOriginId node;
XLogRecPtr remote_commit;

CHECK_IS_BINARY_UPGRADE;

/*
* We must ensure a non-NULL subscription name before dereferencing the
* arguments.
*/
if (PG_ARGISNULL(0))
elog(ERROR, "null argument to binary_upgrade_replorigin_advance is not allowed");

subname = text_to_cstring(PG_GETARG_TEXT_PP(0));
remote_commit = PG_ARGISNULL(1) ? InvalidXLogRecPtr : PG_GETARG_LSN(1);

rel = table_open(SubscriptionRelationId, RowExclusiveLock);
subid = get_subscription_oid(subname, false);

ReplicationOriginNameForLogicalRep(subid, InvalidOid, originname, sizeof(originname));

/* Lock to prevent the replication origin from vanishing */
LockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
node = replorigin_by_name(originname, false);

/*
* The server will be stopped after setting up the objects in the new
* cluster and the origins will be flushed during the shutdown checkpoint.
* This will ensure that the latest LSN values for origin will be
* available after the upgrade.
*/
replorigin_advance(node, remote_commit, InvalidXLogRecPtr,
false /* backward */ ,
false /* WAL log */ );

UnlockRelationOid(ReplicationOriginRelationId, RowExclusiveLock);
table_close(rel, RowExclusiveLock);

PG_RETURN_VOID();
}
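
The two argument conventions used by the functions above can be sketched outside the server. This is a standalone stand-in, not the backend code: `sketch_origin_name` assumes the apply worker's origin is named `pg_<subscription OID>` (the convention in recent PostgreSQL releases when no relation OID is given), and `sketch_parse_lsn` assumes the usual textual `hi/lo` hexadecimal LSN form, mapping a NULL input to 0 in the way the functions above fall back to `InvalidXLogRecPtr`. Both function names are hypothetical.

```c
#include <assert.h>
#include <inttypes.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical stand-in: format the origin name of a subscription's
 * apply worker as "pg_<subscription oid>" (assumed naming convention).
 */
static void
sketch_origin_name(uint32_t subid, char *buf, size_t buflen)
{
    assert(buf != NULL && buflen > 0);
    snprintf(buf, buflen, "pg_%" PRIu32, subid);
}

/*
 * Hypothetical stand-in: parse an LSN written as "hi/lo" in hexadecimal
 * into a 64-bit value.  Returns false on malformed input.  A NULL input
 * yields 0, the value this sketch uses for an invalid LSN.
 */
static bool
sketch_parse_lsn(const char *text, uint64_t *lsn)
{
    unsigned int hi;
    unsigned int lo;
    char        trailing;

    assert(lsn != NULL);

    if (text == NULL)
    {
        *lsn = 0;
        return true;
    }

    /* Exactly two fields must convert; a third means trailing garbage. */
    if (sscanf(text, "%X/%X%c", &hi, &lo, &trailing) != 2)
        return false;

    *lsn = ((uint64_t) hi << 32) | lo;
    return true;
}
```

The sketch deliberately skips the locking and catalog lookups that the real functions perform; it only illustrates how a subscription maps to an origin name and how an optional LSN argument is interpreted.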
22 changes: 22 additions & 0 deletions src/bin/pg_dump/common.c
@@ -24,6 +24,7 @@
#include "catalog/pg_operator_d.h"
#include "catalog/pg_proc_d.h"
#include "catalog/pg_publication_d.h"
#include "catalog/pg_subscription_d.h"
#include "catalog/pg_type_d.h"
#include "common/hashfn.h"
#include "fe_utils/string_utils.h"
@@ -265,6 +266,9 @@ getSchemaData(Archive *fout, int *numTablesPtr)
pg_log_info("reading subscriptions");
getSubscriptions(fout);

pg_log_info("reading subscription membership of tables");
getSubscriptionTables(fout);

free(inhinfo); /* not needed any longer */

*numTablesPtr = numTables;
@@ -978,6 +982,24 @@ findPublicationByOid(Oid oid)
return (PublicationInfo *) dobj;
}

/*
* findSubscriptionByOid
* finds the DumpableObject for the subscription with the given oid
* returns NULL if not found
*/
SubscriptionInfo *
findSubscriptionByOid(Oid oid)
{
CatalogId catId;
DumpableObject *dobj;

catId.tableoid = SubscriptionRelationId;
catId.oid = oid;
dobj = findObjectByCatalogId(catId);
Assert(dobj == NULL || dobj->objType == DO_SUBSCRIPTION);
return (SubscriptionInfo *) dobj;
}


/*
* recordExtensionMembership