Skip to content

Commit

Permalink
Allow logical decoding on standbys
Browse files Browse the repository at this point in the history
Unsurprisingly, this requires wal_level = logical to be set on the primary and
standby. The infrastructure added in 2666975 ensures that slots are
invalidated if the primary's wal_level is lowered.

Creating a slot on a standby waits for a xl_running_xact record to be
processed. If the primary is idle (and thus not emitting xl_running_xact
records), that can take a while.  To make that faster, this commit also
introduces the pg_log_standby_snapshot() function. By executing it on the
primary, completion of slot creation on the standby can be accelerated.

Note that logical decoding on a standby does not itself enforce that required
catalog rows are not removed. The user has to use physical replication slots +
hot_standby_feedback or other measures to prevent that. If catalog rows
required for a slot are removed, the slot is invalidated.

See 6af1793 for an overall design of logical decoding on a standby.

Bumps catversion, for the addition of the pg_log_standby_snapshot() function.

Author: "Drouvot, Bertrand" <bertranddrouvot.pg@gmail.com>
Author: Andres Freund <andres@anarazel.de> (in an older version)
Author: Amit Khandekar <amitdkhan.pg@gmail.com> (in an older version)
Reviewed-by: Andres Freund <andres@anarazel.de>
Reviewed-by: FabrÌzio de Royes Mello <fabriziomello@gmail.com>
Reviewed-by: Amit Kapila <amit.kapila16@gmail.com>
Reviewed-By: Robert Haas <robertmhaas@gmail.com>
  • Loading branch information
anarazel committed Apr 8, 2023
1 parent e101dfa commit 0fdab27
Show file tree
Hide file tree
Showing 12 changed files with 202 additions and 61 deletions.
15 changes: 15 additions & 0 deletions doc/src/sgml/func.sgml
Expand Up @@ -27074,6 +27074,21 @@ postgres=# SELECT '0/0'::pg_lsn + pd.segment_number * ps.setting::int + :offset
prepared with <xref linkend="sql-prepare-transaction"/>.
</para></entry>
</row>
<row>
<entry role="func_table_entry"><para role="func_signature">
<indexterm>
<primary>pg_log_standby_snapshot</primary>
</indexterm>
<function>pg_log_standby_snapshot</function> ()
<returnvalue>pg_lsn</returnvalue>
</para>
<para>
Take a snapshot of running transactions and write it to WAL, without
having to wait bgwriter or checkpointer to log one. This is useful for
logical decoding on standby, as logical slot creation has to wait
until such a record is replayed on the standby.
</para></entry>
</row>
</tbody>
</tgroup>
</table>
Expand Down
27 changes: 27 additions & 0 deletions doc/src/sgml/logicaldecoding.sgml
Expand Up @@ -316,6 +316,33 @@ postgres=# select * from pg_logical_slot_get_changes('regression_slot', NULL, NU
may consume changes from a slot at any given time.
</para>

<para>
A logical replication slot can also be created on a hot standby. To prevent
<command>VACUUM</command> from removing required rows from the system
catalogs, <varname>hot_standby_feedback</varname> should be set on the
standby. In spite of that, if any required rows get removed, the slot gets
invalidated. It's highly recommended to use a physical slot between the primary
and the standby. Otherwise, hot_standby_feedback will work, but only while the
connection is alive (for example a node restart would break it). Then, the
primary may delete system catalog rows that could be needed by the logical
decoding on the standby (as it does not know about the catalog_xmin on the
standby). Existing logical slots on standby also get invalidated if wal_level
on primary is reduced to less than 'logical'. This is done as soon as the
standby detects such a change in the WAL stream. It means, that for walsenders
that are lagging (if any), some WAL records up to the wal_level parameter change
on the primary won't be decoded.
</para>

<para>
Creation of a logical slot requires information about all the currently
running transactions. On the primary, this information is available
directly, but on a standby, this information has to be obtained from
primary. Thus, slot creation may need to wait for some activity to happen
on the primary. If the primary is idle, creating a logical slot on
standby may take noticeable time. This can be sped up by calling the
<function>pg_log_standby_snapshot</function> on the primary.
</para>

<caution>
<para>
Replication slots persist across crashes and know nothing about the state
Expand Down
11 changes: 11 additions & 0 deletions src/backend/access/transam/xlog.c
Expand Up @@ -4469,6 +4469,17 @@ LocalProcessControlFile(bool reset)
ReadControlFile();
}

/*
* Get the wal_level from the control file. For a standby, this value should be
* considered as its active wal_level, because it may be different from what
* was originally configured on standby.
*/
WalLevel
GetActiveWalLevelOnStandby(void)
{
return ControlFile->wal_level;
}

/*
* Initialization of shared memory for XLOG
*/
Expand Down
31 changes: 31 additions & 0 deletions src/backend/access/transam/xlogfuncs.c
Expand Up @@ -31,6 +31,7 @@
#include "storage/fd.h"
#include "storage/ipc.h"
#include "storage/smgr.h"
#include "storage/standby.h"
#include "utils/builtins.h"
#include "utils/guc.h"
#include "utils/memutils.h"
Expand Down Expand Up @@ -196,6 +197,36 @@ pg_switch_wal(PG_FUNCTION_ARGS)
PG_RETURN_LSN(switchpoint);
}

/*
* pg_log_standby_snapshot: call LogStandbySnapshot()
*
* Permission checking for this function is managed through the normal
* GRANT system.
*/
Datum
pg_log_standby_snapshot(PG_FUNCTION_ARGS)
{
XLogRecPtr recptr;

if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("recovery is in progress"),
errhint("pg_log_standby_snapshot() cannot be executed during recovery.")));

if (!XLogStandbyInfoActive())
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("pg_log_standby_snapshot() can only be used if wal_level >= replica")));

recptr = LogStandbySnapshot();

/*
* As a convenience, return the WAL location of the last inserted record
*/
PG_RETURN_LSN(recptr);
}

/*
* pg_create_restore_point: a named point for restore
*
Expand Down
2 changes: 2 additions & 0 deletions src/backend/catalog/system_functions.sql
Expand Up @@ -644,6 +644,8 @@ REVOKE EXECUTE ON FUNCTION pg_create_restore_point(text) FROM public;

REVOKE EXECUTE ON FUNCTION pg_switch_wal() FROM public;

REVOKE EXECUTE ON FUNCTION pg_log_standby_snapshot() FROM public;

REVOKE EXECUTE ON FUNCTION pg_wal_replay_pause() FROM public;

REVOKE EXECUTE ON FUNCTION pg_wal_replay_resume() FROM public;
Expand Down
30 changes: 29 additions & 1 deletion src/backend/replication/logical/decode.c
Expand Up @@ -152,11 +152,39 @@ xlog_decode(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
* can restart from there.
*/
break;
case XLOG_PARAMETER_CHANGE:
{
xl_parameter_change *xlrec =
(xl_parameter_change *) XLogRecGetData(buf->record);

/*
* If wal_level on the primary is reduced to less than
* logical, we want to prevent existing logical slots from
* being used. Existing logical slots on the standby get
* invalidated when this WAL record is replayed; and further,
* slot creation fails when wal_level is not sufficient; but
* all these operations are not synchronized, so a logical
* slot may creep in while the wal_level is being
* reduced. Hence this extra check.
*/
if (xlrec->wal_level < WAL_LEVEL_LOGICAL)
{
/*
* This can occur only on a standby, as a primary would
* not allow to restart after changing wal_level < logical
* if there is pre-existing logical slot.
*/
Assert(RecoveryInProgress());
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
}
break;
}
case XLOG_NOOP:
case XLOG_NEXTOID:
case XLOG_SWITCH:
case XLOG_BACKUP_END:
case XLOG_PARAMETER_CHANGE:
case XLOG_RESTORE_POINT:
case XLOG_FPW_CHANGE:
case XLOG_FPI_FOR_HINT:
Expand Down
36 changes: 20 additions & 16 deletions src/backend/replication/logical/logical.c
Expand Up @@ -124,23 +124,21 @@ CheckLogicalDecodingRequirements(void)
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding requires a database connection")));

/* ----
* TODO: We got to change that someday soon...
*
* There's basically three things missing to allow this:
* 1) We need to be able to correctly and quickly identify the timeline a
* LSN belongs to
* 2) We need to force hot_standby_feedback to be enabled at all times so
* the primary cannot remove rows we need.
* 3) support dropping replication slots referring to a database, in
* dbase_redo. There can't be any active ones due to HS recovery
* conflicts, so that should be relatively easy.
* ----
*/
if (RecoveryInProgress())
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("logical decoding cannot be used while in recovery")));
{
/*
* This check may have race conditions, but whenever
* XLOG_PARAMETER_CHANGE indicates that wal_level has changed, we
* verify that there are no existing logical replication slots. And to
* avoid races around creating a new slot,
* CheckLogicalDecodingRequirements() is called once before creating
* the slot, and once when logical decoding is initially starting up.
*/
if (GetActiveWalLevelOnStandby() < WAL_LEVEL_LOGICAL)
ereport(ERROR,
(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
errmsg("logical decoding on a standby requires wal_level to be at least logical on the primary")));
}
}

/*
Expand Down Expand Up @@ -342,6 +340,12 @@ CreateInitDecodingContext(const char *plugin,
LogicalDecodingContext *ctx;
MemoryContext old_context;

/*
* On a standby, this check is also required while creating the
* slot. Check the comments in the function.
*/
CheckLogicalDecodingRequirements();

/* shorter lines... */
slot = MyReplicationSlot;

Expand Down
57 changes: 30 additions & 27 deletions src/backend/replication/slot.c
Expand Up @@ -41,6 +41,7 @@

#include "access/transam.h"
#include "access/xlog_internal.h"
#include "access/xlogrecovery.h"
#include "common/file_utils.h"
#include "common/string.h"
#include "miscadmin.h"
Expand Down Expand Up @@ -1192,37 +1193,28 @@ ReplicationSlotReserveWal(void)
/*
* For logical slots log a standby snapshot and start logical decoding
* at exactly that position. That allows the slot to start up more
* quickly.
* quickly. But on a standby we cannot do WAL writes, so just use the
* replay pointer; effectively, an attempt to create a logical slot on
* standby will cause it to wait for an xl_running_xact record to be
* logged independently on the primary, so that a snapshot can be
* built using the record.
*
* That's not needed (or indeed helpful) for physical slots as they'll
* start replay at the last logged checkpoint anyway. Instead return
* the location of the last redo LSN. While that slightly increases
* the chance that we have to retry, it's where a base backup has to
* start replay at.
* None of this is needed (or indeed helpful) for physical slots as
* they'll start replay at the last logged checkpoint anyway. Instead
* return the location of the last redo LSN. While that slightly
* increases the chance that we have to retry, it's where a base
* backup has to start replay at.
*/
if (!RecoveryInProgress() && SlotIsLogical(slot))
{
XLogRecPtr flushptr;

/* start at current insert position */
if (SlotIsPhysical(slot))
restart_lsn = GetRedoRecPtr();
else if (RecoveryInProgress())
restart_lsn = GetXLogReplayRecPtr(NULL);
else
restart_lsn = GetXLogInsertRecPtr();
SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);

/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();

/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
}
else
{
restart_lsn = GetRedoRecPtr();
SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);
}
SpinLockAcquire(&slot->mutex);
slot->data.restart_lsn = restart_lsn;
SpinLockRelease(&slot->mutex);

/* prevent WAL removal as fast as possible */
ReplicationSlotsComputeRequiredLSN();
Expand All @@ -1238,6 +1230,17 @@ ReplicationSlotReserveWal(void)
if (XLogGetLastRemovedSegno() < segno)
break;
}

if (!RecoveryInProgress() && SlotIsLogical(slot))
{
XLogRecPtr flushptr;

/* make sure we have enough information to start */
flushptr = LogStandbySnapshot();

/* and make sure it's fsynced to disk */
XLogFlush(flushptr);
}
}

/*
Expand Down
48 changes: 32 additions & 16 deletions src/backend/replication/walsender.c
Expand Up @@ -906,23 +906,34 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
int count;
WALReadError errinfo;
XLogSegNo segno;
TimeLineID currTLI = GetWALInsertionTimeLine();
TimeLineID currTLI;

/*
* Make sure we have enough WAL available before retrieving the current
* timeline. This is needed to determine am_cascading_walsender accurately
* which is needed to determine the current timeline.
*/
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);

/*
* Since logical decoding is only permitted on a primary server, we know
* that the current timeline ID can't be changing any more. If we did this
* on a standby, we'd have to worry about the values we compute here
* becoming invalid due to a promotion or timeline change.
* Since logical decoding is also permitted on a standby server, we need
* to check if the server is in recovery to decide how to get the current
* timeline ID (so that it also cover the promotion or timeline change
* cases).
*/
am_cascading_walsender = RecoveryInProgress();

if (am_cascading_walsender)
GetXLogReplayRecPtr(&currTLI);
else
currTLI = GetWALInsertionTimeLine();

XLogReadDetermineTimeline(state, targetPagePtr, reqLen, currTLI);
sendTimeLineIsHistoric = (state->currTLI != currTLI);
sendTimeLine = state->currTLI;
sendTimeLineValidUpto = state->currTLIValidUntil;
sendTimeLineNextTLI = state->nextTLI;

/* make sure we have enough WAL available */
flushptr = WalSndWaitForWal(targetPagePtr + reqLen);

/* fail if not (implies we are going to shut down) */
if (flushptr < targetPagePtr + reqLen)
return -1;
Expand All @@ -937,9 +948,9 @@ logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int req
cur_page,
targetPagePtr,
XLOG_BLCKSZ,
state->seg.ws_tli, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new
* TLI is needed. */
currTLI, /* Pass the current TLI because only
* WalSndSegmentOpen controls whether new TLI
* is needed. */
&errinfo))
WALReadRaiseError(&errinfo);

Expand Down Expand Up @@ -3076,10 +3087,14 @@ XLogSendLogical(void)
* If first time through in this session, initialize flushPtr. Otherwise,
* we only need to update flushPtr if EndRecPtr is past it.
*/
if (flushPtr == InvalidXLogRecPtr)
flushPtr = GetFlushRecPtr(NULL);
else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
flushPtr = GetFlushRecPtr(NULL);
if (flushPtr == InvalidXLogRecPtr ||
logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
{
if (am_cascading_walsender)
flushPtr = GetStandbyFlushRecPtr(NULL);
else
flushPtr = GetFlushRecPtr(NULL);
}

/* If EndRecPtr is still past our flushPtr, it means we caught up. */
if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
Expand Down Expand Up @@ -3170,7 +3185,8 @@ GetStandbyFlushRecPtr(TimeLineID *tli)
receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
replayPtr = GetXLogReplayRecPtr(&replayTLI);

*tli = replayTLI;
if (tli)
*tli = replayTLI;

result = replayPtr;
if (receiveTLI == replayTLI && receivePtr > replayPtr)
Expand Down
1 change: 1 addition & 0 deletions src/include/access/xlog.h
Expand Up @@ -230,6 +230,7 @@ extern void XLOGShmemInit(void);
extern void BootStrapXLOG(void);
extern void InitializeWalConsistencyChecking(void);
extern void LocalProcessControlFile(bool reset);
extern WalLevel GetActiveWalLevelOnStandby(void);
extern void StartupXLOG(void);
extern void ShutdownXLOG(int code, Datum arg);
extern void CreateCheckPoint(int flags);
Expand Down

0 comments on commit 0fdab27

Please sign in to comment.