Migrate logical slots to the new node during an upgrade.
While reading information from the old cluster, pg_upgrade fetches a list
of logical slots. Later in the upgrade, it revisits the list and recreates
the slots by executing pg_create_logical_replication_slot() on the new
cluster. Migration of logical replication slots is only supported when the
old cluster is version 17.0 or later.

If the old node has invalid slots or slots with unconsumed WAL records,
pg_upgrade fails. These checks are needed to prevent data loss.

The significant advantage of this commit is that it makes it easy to
continue logical replication even after upgrading the publisher node.
Previously, pg_upgrade allowed copying publications to a new node. With
this patch, adjusting the connection string to the new publisher will
cause the apply worker on the subscriber to connect to the new publisher
automatically. This enables seamless continuation of logical replication,
even after an upgrade.
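
The pre-upgrade check described in this message (fail if any old-cluster slot is invalid or still has unconsumed WAL) can be sketched in isolation. This is a minimal illustrative model, not pg_upgrade's actual code: the `OldSlotInfo` struct and `count_blocking_slots()` are hypothetical names introduced here, and the two boolean fields stand in for whatever the real tool queries from the old cluster.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical, simplified view of one logical slot on the old cluster. */
typedef struct OldSlotInfo
{
    const char *name;       /* slot name */
    bool        invalid;    /* slot has been invalidated */
    bool        caught_up;  /* no decodable WAL remains for this slot */
} OldSlotInfo;

/*
 * Count the slots that would block the upgrade: a slot blocks if it is
 * invalid or if it still has unconsumed WAL. An upgrade in this model
 * may proceed only when the result is zero.
 */
static size_t
count_blocking_slots(const OldSlotInfo *slots, size_t nslots)
{
    size_t      blocking = 0;

    for (size_t i = 0; i < nslots; i++)
    {
        if (slots[i].invalid || !slots[i].caught_up)
            blocking++;
    }

    return blocking;
}
```

Counting rather than stopping at the first bad slot is a design choice of this sketch: it lets a caller report every problematic slot in one pass.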

Author: Hayato Kuroda, Hou Zhijie
Reviewed-by: Peter Smith, Bharath Rupireddy, Dilip Kumar, Vignesh C, Shlok Kyal
Discussion: http://postgr.es/m/TYAPR01MB58664C81887B3AF2EB6B16E3F5939@TYAPR01MB5866.jpnprd01.prod.outlook.com
Discussion: http://postgr.es/m/CAA4eK1+t7xYcfa0rEQw839=b2MzsfvYDPz3xbD+ZqOdP3zpKYg@mail.gmail.com
Amit Kapila committed Oct 26, 2023
1 parent bddc2f7 commit 29d0a77
Showing 18 changed files with 927 additions and 27 deletions.
78 changes: 76 additions & 2 deletions doc/src/sgml/ref/pgupgrade.sgml
@@ -383,6 +383,79 @@ make prefix=/usr/local/pgsql.new install
</para>
</step>

<step>
<title>Prepare for publisher upgrades</title>

<para>
<application>pg_upgrade</application> attempts to migrate logical
slots. This helps avoid the need for manually defining the same
logical slots on the new publisher. Migration of logical slots is
only supported when the old cluster is version 17.0 or later.
Logical slots on clusters before version 17.0 will silently be
ignored.
</para>

<para>
Before you start upgrading the publisher cluster, ensure that the
subscription is temporarily disabled, by executing
<link linkend="sql-altersubscription"><command>ALTER SUBSCRIPTION ... DISABLE</command></link>.
Re-enable the subscription after the upgrade.
</para>

<para>
There are some prerequisites for <application>pg_upgrade</application> to
be able to upgrade the logical slots. If these are not met, an error
will be reported.
</para>

<itemizedlist>
<listitem>
<para>
The new cluster must have
<link linkend="guc-wal-level"><varname>wal_level</varname></link> as
<literal>logical</literal>.
</para>
</listitem>
<listitem>
<para>
The new cluster must have
<link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
configured to a value greater than or equal to the number of slots
present in the old cluster.
</para>
</listitem>
<listitem>
<para>
The output plugins referenced by the slots on the old cluster must be
installed in the new PostgreSQL executable directory.
</para>
</listitem>
<listitem>
<para>
The old cluster has replicated all the transactions and logical decoding
messages to subscribers.
</para>
</listitem>
<listitem>
<para>
All slots on the old cluster must be usable, i.e., there are no slots
whose
<link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>conflicting</structfield>
is <literal>true</literal>.
</para>
</listitem>
<listitem>
<para>
The new cluster must not have permanent logical slots, i.e.,
there must be no slots where
<link linkend="view-pg-replication-slots">pg_replication_slots</link>.<structfield>temporary</structfield>
is <literal>false</literal>.
</para>
</listitem>
</itemizedlist>

</step>

<step>
<title>Stop both servers</title>

@@ -650,8 +723,9 @@ rsync --archive --delete --hard-links --size-only --no-inc-recursive /vol1/pg_tb
Configure the servers for log shipping. (You do not need to run
<function>pg_backup_start()</function> and <function>pg_backup_stop()</function>
or take a file system backup as the standbys are still synchronized
with the primary.) Replication slots are not copied and must
be recreated.
with the primary.) Only logical slots on the primary are copied to the
new standby; other slots on the old standby are not copied and must
be recreated manually.
</para>
</step>

48 changes: 39 additions & 9 deletions src/backend/replication/logical/decode.c
@@ -600,12 +600,8 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)

ReorderBufferProcessXid(ctx->reorder, XLogRecGetXid(r), buf->origptr);

/*
* If we don't have snapshot or we are just fast-forwarding, there is no
* point in decoding messages.
*/
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT ||
ctx->fast_forward)
/* If we don't have snapshot, there is no point in decoding messages */
if (SnapBuildCurrentState(builder) < SNAPBUILD_FULL_SNAPSHOT)
return;

message = (xl_logical_message *) XLogRecGetData(r);
@@ -622,6 +618,26 @@ logicalmsg_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
SnapBuildXactNeedsSkip(builder, buf->origptr)))
return;

/*
* We also skip decoding in fast_forward mode. This check must be last
* because we don't want to set the processing_required flag unless we
* have a decodable message.
*/
if (ctx->fast_forward)
{
/*
* We need to set processing_required flag to notify the message's
* existence to the caller. Usually, the flag is set when either the
* COMMIT or ABORT records are decoded, but this must be turned on
* here because the non-transactional logical message is decoded
* without waiting for these records.
*/
if (!message->transactional)
ctx->processing_required = true;

return;
}

/*
* If this is a non-transactional change, get the snapshot we're expected
* to use. We only get here when the snapshot is consistent, and the
@@ -1286,7 +1302,21 @@ static bool
DecodeTXNNeedSkip(LogicalDecodingContext *ctx, XLogRecordBuffer *buf,
Oid txn_dbid, RepOriginId origin_id)
{
return (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
ctx->fast_forward || FilterByOrigin(ctx, origin_id));
if (SnapBuildXactNeedsSkip(ctx->snapshot_builder, buf->origptr) ||
(txn_dbid != InvalidOid && txn_dbid != ctx->slot->data.database) ||
FilterByOrigin(ctx, origin_id))
return true;

/*
* We also skip decoding in fast_forward mode. In passing set the
* processing_required flag to indicate that if it were not for
* fast_forward mode, processing would have been required.
*/
if (ctx->fast_forward)
{
ctx->processing_required = true;
return true;
}

return false;
}
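
The fast_forward handling in the two decode.c hunks above can be modelled on its own. The following is a simplified sketch under stated assumptions, not PostgreSQL code: `DecodeCtx`, `txn_need_skip()` and `message_is_decoded()` are hypothetical names, and the single `filtered` flag collapses the snapshot, database and origin checks that the real functions perform. It shows the ordering the comments insist on: the ordinary skip checks run first, so `processing_required` is set only for changes that would otherwise have been decoded.

```c
#include <stdbool.h>

/* Hypothetical stand-in for the relevant fields of LogicalDecodingContext. */
typedef struct DecodeCtx
{
    bool        fast_forward;         /* skip decoding, only advance */
    bool        processing_required;  /* a decodable change was seen */
} DecodeCtx;

/*
 * Model of the transaction skip decision. 'filtered' is an assumption of
 * this sketch that stands for "skipped for an ordinary reason".
 */
static bool
txn_need_skip(DecodeCtx *ctx, bool filtered)
{
    if (filtered)
        return true;            /* flag is deliberately left untouched */

    if (ctx->fast_forward)
    {
        /* Would have been decoded if not for fast_forward mode. */
        ctx->processing_required = true;
        return true;
    }

    return false;
}

/*
 * Model of the logical message path. Returns true if the message would be
 * decoded. Only non-transactional messages set the flag here, because
 * transactional ones are accounted for when their COMMIT or ABORT record
 * reaches the transaction skip decision.
 */
static bool
message_is_decoded(DecodeCtx *ctx, bool filtered, bool transactional)
{
    if (filtered)
        return false;

    if (ctx->fast_forward)
    {
        if (!transactional)
            ctx->processing_required = true;
        return false;
    }

    return true;
}
```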
75 changes: 75 additions & 0 deletions src/backend/replication/logical/logical.c
@@ -29,6 +29,7 @@
#include "postgres.h"

#include "access/xact.h"
#include "access/xlogutils.h"
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
@@ -41,6 +42,7 @@
#include "storage/proc.h"
#include "storage/procarray.h"
#include "utils/builtins.h"
#include "utils/inval.h"
#include "utils/memutils.h"

/* data for errcontext callback */
@@ -1949,3 +1951,76 @@ UpdateDecodingStats(LogicalDecodingContext *ctx)
rb->totalTxns = 0;
rb->totalBytes = 0;
}

/*
* Read up to the end of WAL starting from the decoding slot's restart_lsn.
* Return true if any meaningful/decodable WAL records are encountered,
* otherwise false.
*/
bool
LogicalReplicationSlotHasPendingWal(XLogRecPtr end_of_wal)
{
bool has_pending_wal = false;

Assert(MyReplicationSlot);

PG_TRY();
{
LogicalDecodingContext *ctx;

/*
* Create our decoding context in fast_forward mode, passing start_lsn
* as InvalidXLogRecPtr, so that we start processing from the slot's
* confirmed_flush.
*/
ctx = CreateDecodingContext(InvalidXLogRecPtr,
NIL,
true, /* fast_forward */
XL_ROUTINE(.page_read = read_local_xlog_page,
.segment_open = wal_segment_open,
.segment_close = wal_segment_close),
NULL, NULL, NULL);

/*
* Start reading at the slot's restart_lsn, which we know points to a
* valid record.
*/
XLogBeginRead(ctx->reader, MyReplicationSlot->data.restart_lsn);

/* Invalidate non-timetravel entries */
InvalidateSystemCaches();

/* Loop until the end of WAL or some changes are processed */
while (!has_pending_wal && ctx->reader->EndRecPtr < end_of_wal)
{
XLogRecord *record;
char *errm = NULL;

record = XLogReadRecord(ctx->reader, &errm);

if (errm)
elog(ERROR, "could not find record for logical decoding: %s", errm);

if (record != NULL)
LogicalDecodingProcessRecord(ctx, ctx->reader);

has_pending_wal = ctx->processing_required;

CHECK_FOR_INTERRUPTS();
}

/* Clean up */
FreeDecodingContext(ctx);
InvalidateSystemCaches();
}
PG_CATCH();
{
/* clear all timetravel entries */
InvalidateSystemCaches();

PG_RE_THROW();
}
PG_END_TRY();

return has_pending_wal;
}
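
The scan loop in LogicalReplicationSlotHasPendingWal() above stops as soon as either a decodable record is seen or the reader has reached end_of_wal. A stand-alone sketch of that termination logic follows. It is an illustrative model only: `WalRecord` and `has_pending_wal()` are hypothetical, WAL is represented as an in-memory array ordered by LSN, and the `decodable` field stands in for the `processing_required` flag that real decoding would set.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical in-memory stand-in for one WAL record. */
typedef struct WalRecord
{
    uint64_t    end_lsn;    /* position just past this record */
    bool        decodable;  /* processing it would set processing_required */
} WalRecord;

/*
 * Walk the records from start_lsn until end_of_wal is reached or a
 * decodable record is found, and report whether one was found.
 */
static bool
has_pending_wal(const WalRecord *records, size_t nrecords,
                uint64_t start_lsn, uint64_t end_of_wal)
{
    bool        pending = false;
    uint64_t    end_rec_ptr = start_lsn;

    for (size_t i = 0;
         !pending && i < nrecords && end_rec_ptr < end_of_wal;
         i++)
    {
        pending = records[i].decodable;
        end_rec_ptr = records[i].end_lsn;
    }

    return pending;
}
```

Because the position check uses a strict comparison, a decodable record that begins at or after end_of_wal is never examined in this model.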
14 changes: 14 additions & 0 deletions src/backend/replication/slot.c
@@ -1423,6 +1423,20 @@ InvalidatePossiblyObsoleteSlot(ReplicationSlotInvalidationCause cause,

SpinLockRelease(&s->mutex);

/*
* The logical replication slots shouldn't be invalidated as
* max_slot_wal_keep_size GUC is set to -1 during the upgrade.
*
* The following is just a sanity check.
*/
if (*invalidated && SlotIsLogical(s) && IsBinaryUpgrade)
{
ereport(ERROR,
errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("replication slots must not be invalidated during the upgrade"),
errhint("\"max_slot_wal_keep_size\" must be set to -1 during the upgrade"));
}

if (active_pid != 0)
{
/*
44 changes: 44 additions & 0 deletions src/backend/utils/adt/pg_upgrade_support.c
@@ -17,6 +17,7 @@
#include "catalog/pg_type.h"
#include "commands/extension.h"
#include "miscadmin.h"
#include "replication/logical.h"
#include "utils/array.h"
#include "utils/builtins.h"

@@ -261,3 +262,46 @@ binary_upgrade_set_missing_value(PG_FUNCTION_ARGS)

PG_RETURN_VOID();
}

/*
* Verify the given slot has already consumed all the WAL changes.
*
* Returns true if there are no decodable WAL records after the
* confirmed_flush_lsn. Otherwise false.
*
* This is a special purpose function to ensure that the given slot can be
* upgraded without data loss.
*/
Datum
binary_upgrade_logical_slot_has_caught_up(PG_FUNCTION_ARGS)
{
Name slot_name;
XLogRecPtr end_of_wal;
bool found_pending_wal;

CHECK_IS_BINARY_UPGRADE;

/* We must check before dereferencing the argument */
if (PG_ARGISNULL(0))
elog(ERROR, "null argument to binary_upgrade_logical_slot_has_caught_up is not allowed");

CheckSlotPermissions();

slot_name = PG_GETARG_NAME(0);

/* Acquire the given slot */
ReplicationSlotAcquire(NameStr(*slot_name), true);

Assert(SlotIsLogical(MyReplicationSlot));

/* Slots must be valid as otherwise we won't be able to scan the WAL */
Assert(MyReplicationSlot->data.invalidated == RS_INVAL_NONE);

end_of_wal = GetFlushRecPtr(NULL);
found_pending_wal = LogicalReplicationSlotHasPendingWal(end_of_wal);

/* Clean up */
ReplicationSlotRelease();

PG_RETURN_BOOL(!found_pending_wal);
}
3 changes: 3 additions & 0 deletions src/bin/pg_upgrade/Makefile
@@ -3,6 +3,9 @@
PGFILEDESC = "pg_upgrade - an in-place binary upgrade utility"
PGAPPICON = win32

# required for 003_upgrade_logical_replication_slots.pl
EXTRA_INSTALL=contrib/test_decoding

subdir = src/bin/pg_upgrade
top_builddir = ../../..
include $(top_builddir)/src/Makefile.global