From 5da64613875d387f6d3f60a383ec20189b84ff54 Mon Sep 17 00:00:00 2001 From: Andres Freund Date: Sun, 23 Apr 2017 20:41:29 -0700 Subject: [PATCH] Preserve required !catalog tuples while computing initial decoding snapshot. The logical decoding machinery already preserved all the required catalog tuples, which is sufficient in the course of normal logical decoding, but did not guarantee that non-catalog tuples were preserved during computation of the initial snapshot when creating a slot over the replication protocol. This could cause a corrupted initial snapshot being exported. The time window for issues is usually not terribly large, but on a busy server it's perfectly possible to it hit it. Ongoing decoding is not affected by this bug. To avoid increased overhead for the SQL API, only retain additional tuples when a logical slot is being created over the replication protocol. To do so this commit changes the signature of CreateInitDecodingContext(), but it seems unlikely that it's being used in an extension, so that's probably ok. In a drive-by fix, fix handling of ReplicationSlotsComputeRequiredXmin's already_locked argument, which should only apply to ProcArrayLock, not ReplicationSlotControlLock. Reported-By: Erik Rijkers Analyzed-By: Petr Jelinek Author: Petr Jelinek, heavily editorialized by Andres Freund Reviewed-By: Andres Freund Discussion: https://postgr.es/m/9a897b86-46e1-9915-ee4c-da02e4ff6a95@2ndquadrant.com Backport: 9.4, where logical decoding was introduced. --- src/backend/replication/logical/logical.c | 25 ++++++++++++++------- src/backend/replication/logical/snapbuild.c | 12 ++++++++++ src/backend/replication/slot.c | 25 +++++++++++++++++---- src/backend/replication/slotfuncs.c | 4 ++-- src/backend/replication/walsender.c | 1 + src/backend/storage/ipc/procarray.c | 14 +++++++++--- src/include/replication/logical.h | 1 + src/include/storage/procarray.h | 2 +- 8 files changed, 66 insertions(+), 18 deletions(-) diff --git a/src/backend/replication/logical/logical.c b/src/backend/replication/logical/logical.c index 2e72b2928f0f9..e798914ea3744 100644 --- a/src/backend/replication/logical/logical.c +++ b/src/backend/replication/logical/logical.c @@ -206,6 +206,7 @@ StartupDecodingContext(List *output_plugin_options, LogicalDecodingContext * CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write) @@ -310,23 +311,31 @@ CreateInitDecodingContext(char *plugin, * the slot machinery about the new limit. Once that's done the * ProcArrayLock can be released as the slot machinery now is * protecting against vacuum. + * + * Note that, temporarily, the data, not just the catalog, xmin has to be + * reserved if a data snapshot is to be exported. Otherwise the initial + * data snapshot created here is not guaranteed to be valid. After that + * the data xmin doesn't need to be managed anymore and the global xmin + * should be recomputed. As we are fine with losing the pegged data xmin + * after crash - no chance a snapshot would get exported anymore - we can + * get away with just setting the slot's + * effective_xmin. ReplicationSlotRelease will reset it again. + * * ---- */ LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE); - slot->effective_catalog_xmin = GetOldestSafeDecodingTransactionId(); - slot->data.catalog_xmin = slot->effective_catalog_xmin; + xmin_horizon = GetOldestSafeDecodingTransactionId(need_full_snapshot); + + slot->effective_catalog_xmin = xmin_horizon; + slot->data.catalog_xmin = xmin_horizon; + if (need_full_snapshot) + slot->effective_xmin = xmin_horizon; ReplicationSlotsComputeRequiredXmin(true); LWLockRelease(ProcArrayLock); - /* - * tell the snapshot builder to only assemble snapshot once reaching the a - * running_xact's record with the respective xmin. - */ - xmin_horizon = slot->data.catalog_xmin; - ReplicationSlotMarkDirty(); ReplicationSlotSave(); diff --git a/src/backend/replication/logical/snapbuild.c b/src/backend/replication/logical/snapbuild.c index 5ec8b4b60e708..9bffa53345e26 100644 --- a/src/backend/replication/logical/snapbuild.c +++ b/src/backend/replication/logical/snapbuild.c @@ -554,6 +554,18 @@ SnapBuildExportSnapshot(SnapBuild *builder) * mechanism. Due to that we can do this without locks, we're only * changing our own value. */ +#ifdef USE_ASSERT_CHECKING + { + TransactionId safeXid; + + LWLockAcquire(ProcArrayLock, LW_SHARED); + safeXid = GetOldestSafeDecodingTransactionId(true); + LWLockRelease(ProcArrayLock); + + Assert(TransactionIdPrecedesOrEquals(safeXid, snap->xmin)); + } +#endif + MyPgXact->xmin = snap->xmin; /* allocate in transaction context */ diff --git a/src/backend/replication/slot.c b/src/backend/replication/slot.c index b8761e4f88fbe..f108979e6f1be 100644 --- a/src/backend/replication/slot.c +++ b/src/backend/replication/slot.c @@ -394,6 +394,22 @@ ReplicationSlotRelease(void) SpinLockRelease(&slot->mutex); } + + /* + * If slot needed to temporarily restrain both data and catalog xmin to + * create the catalog snapshot, remove that temporary constraint. + * Snapshots can only be exported while the initial snapshot is still + * acquired. + */ + if (!TransactionIdIsValid(slot->data.xmin) && + TransactionIdIsValid(slot->effective_xmin)) + { + SpinLockAcquire(&slot->mutex); + slot->effective_xmin = InvalidTransactionId; + SpinLockRelease(&slot->mutex); + ReplicationSlotsComputeRequiredXmin(false); + } + MyReplicationSlot = NULL; /* might not have been set when we've been a plain slot */ @@ -578,6 +594,9 @@ ReplicationSlotPersist(void) /* * Compute the oldest xmin across all slots and store it in the ProcArray. + * + * If already_locked is true, ProcArrayLock has already been acquired + * exclusively. */ void ReplicationSlotsComputeRequiredXmin(bool already_locked) @@ -588,8 +607,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) Assert(ReplicationSlotCtl != NULL); - if (!already_locked) - LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); + LWLockAcquire(ReplicationSlotControlLock, LW_SHARED); for (i = 0; i < max_replication_slots; i++) { @@ -622,8 +640,7 @@ ReplicationSlotsComputeRequiredXmin(bool already_locked) agg_catalog_xmin = effective_catalog_xmin; } - if (!already_locked) - LWLockRelease(ReplicationSlotControlLock); + LWLockRelease(ReplicationSlotControlLock); ProcArraySetReplicationSlotXmin(agg_xmin, agg_catalog_xmin, already_locked); } diff --git a/src/backend/replication/slotfuncs.c b/src/backend/replication/slotfuncs.c index 83d06b2361afa..676e306cc94a2 100644 --- a/src/backend/replication/slotfuncs.c +++ b/src/backend/replication/slotfuncs.c @@ -109,8 +109,8 @@ pg_create_logical_replication_slot(PG_FUNCTION_ARGS) /* * Create logical decoding context, to build the initial snapshot. */ - ctx = CreateInitDecodingContext( - NameStr(*plugin), NIL, + ctx = CreateInitDecodingContext(NameStr(*plugin), NIL, + false, /* do not build snapshot */ logical_read_local_xlog_page, NULL, NULL); /* build initial snapshot, might take a while */ diff --git a/src/backend/replication/walsender.c b/src/backend/replication/walsender.c index 2dcd4fb3ae3b2..96ad33ea831f8 100644 --- a/src/backend/replication/walsender.c +++ b/src/backend/replication/walsender.c @@ -810,6 +810,7 @@ CreateReplicationSlot(CreateReplicationSlotCmd *cmd) LogicalDecodingContext *ctx; ctx = CreateInitDecodingContext(cmd->plugin, NIL, + true, /* build snapshot */ logical_read_xlog_page, WalSndPrepareWrite, WalSndWriteData); diff --git a/src/backend/storage/ipc/procarray.c b/src/backend/storage/ipc/procarray.c index bde2a60c023d8..0d55f6d1098c5 100644 --- a/src/backend/storage/ipc/procarray.c +++ b/src/backend/storage/ipc/procarray.c @@ -1936,7 +1936,7 @@ GetOldestActiveTransactionId(void) * that the caller will immediately use the xid to peg the xmin horizon. */ TransactionId -GetOldestSafeDecodingTransactionId(void) +GetOldestSafeDecodingTransactionId(bool catalogOnly) { ProcArrayStruct *arrayP = procArray; TransactionId oldestSafeXid; @@ -1959,9 +1959,17 @@ GetOldestSafeDecodingTransactionId(void) /* * If there's already a slot pegging the xmin horizon, we can start with * that value, it's guaranteed to be safe since it's computed by this - * routine initally and has been enforced since. + * routine initially and has been enforced since. We can always use the + * slot's general xmin horizon, but the catalog horizon is only usable + * when we only catalog data is going to be looked at. */ - if (TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && + if (TransactionIdIsValid(procArray->replication_slot_xmin) && + TransactionIdPrecedes(procArray->replication_slot_xmin, + oldestSafeXid)) + oldestSafeXid = procArray->replication_slot_xmin; + + if (catalogOnly && + TransactionIdIsValid(procArray->replication_slot_catalog_xmin) && TransactionIdPrecedes(procArray->replication_slot_catalog_xmin, oldestSafeXid)) oldestSafeXid = procArray->replication_slot_catalog_xmin; diff --git a/src/include/replication/logical.h b/src/include/replication/logical.h index 26be127bf7bca..02add55207635 100644 --- a/src/include/replication/logical.h +++ b/src/include/replication/logical.h @@ -79,6 +79,7 @@ extern void CheckLogicalDecodingRequirements(void); extern LogicalDecodingContext *CreateInitDecodingContext(char *plugin, List *output_plugin_options, + bool need_full_snapshot, XLogPageReadCB read_page, LogicalOutputPluginWriterPrepareWrite prepare_write, LogicalOutputPluginWriterWrite do_write); diff --git a/src/include/storage/procarray.h b/src/include/storage/procarray.h index 0c4611bda2683..b24b732dc3396 100644 --- a/src/include/storage/procarray.h +++ b/src/include/storage/procarray.h @@ -53,7 +53,7 @@ extern bool TransactionIdIsInProgress(TransactionId xid); extern bool TransactionIdIsActive(TransactionId xid); extern TransactionId GetOldestXmin(Relation rel, bool ignoreVacuum); extern TransactionId GetOldestActiveTransactionId(void); -extern TransactionId GetOldestSafeDecodingTransactionId(void); +extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly); extern VirtualTransactionId *GetVirtualXIDsDelayingChkpt(int *nvxids); extern bool HaveVirtualXIDsDelayingChkpt(VirtualTransactionId *vxids, int nvxids);