Skip to content

Commit 1f7e9ba

Browse files
author
Amit Kapila
committed
Post-commit review fixes for 228c370.
This commit fixes three issues: 1) When a disabled subscription is created with retain_dead_tuples set to true, the launcher is not woken up immediately, which may lead to delays in creating the conflict detection slot. Creating the conflict detection slot is essential even when the subscription is not enabled. This ensures that dead tuples are retained, which is necessary for accurately identifying the type of conflict during replication. 2) Conflict-related data was unnecessarily retained when the subscription does not have a table. 3) Conflict-relevant data could be prematurely removed before applying prepared transactions on the publisher that are in the commit critical section. This issue occurred because the backend executing COMMIT PREPARED was not accounted for during the computation of oldestXid in the commit phase on the publisher. As a result, the subscriber could advance the conflict slot's xmin without waiting for such COMMIT PREPARED transactions to complete. We fixed this issue by identifying prepared transactions that are in the commit critical section during computation of oldestXid in commit phase. Author: Zhijie Hou <houzj.fnst@fujitsu.com> Reviewed-by: shveta malik <shveta.malik@gmail.com> Reviewed-by: Dilip Kumar <dilipbalaut@gmail.com> Reviewed-by: Nisha Moond <nisha.moond412@gmail.com> Reviewed-by: Amit Kapila <amit.kapila16@gmail.com> Discussion: https://postgr.es/m/OS9PR01MB16913DACB64E5721872AA5C02943BA@OS9PR01MB16913.jpnprd01.prod.outlook.com Discussion: https://postgr.es/m/OS9PR01MB16913F67856B0DA2A909788129400A@OS9PR01MB16913.jpnprd01.prod.outlook.com
1 parent 43eb2c5 commit 1f7e9ba

File tree

8 files changed

+157
-5
lines changed

8 files changed

+157
-5
lines changed

src/backend/access/transam/twophase.c

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2809,3 +2809,58 @@ LookupGXactBySubid(Oid subid)
28092809

28102810
return found;
28112811
}
2812+
2813+
/*
2814+
* TwoPhaseGetOldestXidInCommit
2815+
* Return the oldest transaction ID from prepared transactions that are
2816+
* currently in the commit critical section.
2817+
*
2818+
* This function only considers transactions in the currently connected
2819+
* database. If no matching transactions are found, it returns
2820+
* InvalidTransactionId.
2821+
*/
2822+
TransactionId
2823+
TwoPhaseGetOldestXidInCommit(void)
2824+
{
2825+
TransactionId oldestRunningXid = InvalidTransactionId;
2826+
2827+
LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
2828+
2829+
for (int i = 0; i < TwoPhaseState->numPrepXacts; i++)
2830+
{
2831+
GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
2832+
PGPROC *commitproc;
2833+
TransactionId xid;
2834+
2835+
if (!gxact->valid)
2836+
continue;
2837+
2838+
if (gxact->locking_backend == INVALID_PROC_NUMBER)
2839+
continue;
2840+
2841+
/*
2842+
* Get the backend that is handling the transaction. It's safe to
2843+
* access this backend while holding TwoPhaseStateLock, as the backend
2844+
* can only be destroyed after either removing or unlocking the
2845+
* current global transaction, both of which require an exclusive
2846+
* TwoPhaseStateLock.
2847+
*/
2848+
commitproc = GetPGProcByNumber(gxact->locking_backend);
2849+
2850+
if (MyDatabaseId != commitproc->databaseId)
2851+
continue;
2852+
2853+
if ((commitproc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
2854+
continue;
2855+
2856+
xid = XidFromFullTransactionId(gxact->fxid);
2857+
2858+
if (!TransactionIdIsValid(oldestRunningXid) ||
2859+
TransactionIdPrecedes(xid, oldestRunningXid))
2860+
oldestRunningXid = xid;
2861+
}
2862+
2863+
LWLockRelease(TwoPhaseStateLock);
2864+
2865+
return oldestRunningXid;
2866+
}

src/backend/commands/subscriptioncmds.c

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -854,7 +854,17 @@ CreateSubscription(ParseState *pstate, CreateSubscriptionStmt *stmt,
854854

855855
pgstat_create_subscription(subid);
856856

857-
if (opts.enabled)
857+
/*
858+
* Notify the launcher to start the apply worker if the subscription is
859+
* enabled, or to create the conflict detection slot if retain_dead_tuples
860+
* is enabled.
861+
*
862+
* Creating the conflict detection slot is essential even when the
863+
* subscription is not enabled. This ensures that dead tuples are
864+
* retained, which is necessary for accurately identifying the type of
865+
* conflict during replication.
866+
*/
867+
if (opts.enabled || opts.retaindeadtuples)
858868
ApplyLauncherWakeupAtCommit();
859869

860870
ObjectAddressSet(myself, SubscriptionRelationId, subid);

src/backend/replication/logical/tablesync.c

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1788,6 +1788,32 @@ AllTablesyncsReady(void)
17881788
return has_subrels && (table_states_not_ready == NIL);
17891789
}
17901790

1791+
/*
1792+
* Return whether the subscription currently has any relations.
1793+
*
1794+
* Note: Unlike HasSubscriptionRelations(), this function relies on cached
1795+
* information for subscription relations. Additionally, it should not be
1796+
* invoked outside of apply or tablesync workers, as MySubscription must be
1797+
* initialized first.
1798+
*/
1799+
bool
1800+
HasSubscriptionRelationsCached(void)
1801+
{
1802+
bool started_tx;
1803+
bool has_subrels;
1804+
1805+
/* We need up-to-date subscription tables info here */
1806+
has_subrels = FetchTableStates(&started_tx);
1807+
1808+
if (started_tx)
1809+
{
1810+
CommitTransactionCommand();
1811+
pgstat_report_stat(true);
1812+
}
1813+
1814+
return has_subrels;
1815+
}
1816+
17911817
/*
17921818
* Update the two_phase state of the specified subscription in pg_subscription.
17931819
*/

src/backend/replication/logical/worker.c

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4595,11 +4595,28 @@ wait_for_local_flush(RetainDeadTuplesData *rdt_data)
45954595
* workers is complex and not worth the effort, so we simply return if not
45964596
* all tables are in the READY state.
45974597
*
4598-
* It is safe to add new tables with initial states to the subscription
4599-
* after this check because any changes applied to these tables should
4600-
* have a WAL position greater than the rdt_data->remote_lsn.
4598+
* Advancing the transaction ID is necessary even when no tables are
4599+
* currently subscribed, to avoid retaining dead tuples unnecessarily.
4600+
* While it might seem safe to skip all phases and directly assign
4601+
* candidate_xid to oldest_nonremovable_xid during the
4602+
* RDT_GET_CANDIDATE_XID phase in such cases, this is unsafe. If users
4603+
* concurrently add tables to the subscription, the apply worker may not
4604+
* process invalidations in time. Consequently,
4605+
* HasSubscriptionRelationsCached() might miss the new tables, leading to
4606+
* premature advancement of oldest_nonremovable_xid.
4607+
*
4608+
* Performing the check during RDT_WAIT_FOR_LOCAL_FLUSH is safe, as
4609+
* invalidations are guaranteed to be processed before applying changes
4610+
* from newly added tables while waiting for the local flush to reach
4611+
* remote_lsn.
4612+
*
4613+
* Additionally, even if we check for subscription tables during
4614+
* RDT_GET_CANDIDATE_XID, they might be dropped before reaching
4615+
* RDT_WAIT_FOR_LOCAL_FLUSH. Therefore, it's still necessary to verify
4616+
* subscription tables at this stage to prevent unnecessary tuple
4617+
* retention.
46014618
*/
4602-
if (!AllTablesyncsReady())
4619+
if (HasSubscriptionRelationsCached() && !AllTablesyncsReady())
46034620
{
46044621
TimestampTz now;
46054622

src/backend/replication/walsender.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151

5252
#include "access/timeline.h"
5353
#include "access/transam.h"
54+
#include "access/twophase.h"
5455
#include "access/xact.h"
5556
#include "access/xlog_internal.h"
5657
#include "access/xlogreader.h"
@@ -2719,6 +2720,7 @@ ProcessStandbyPSRequestMessage(void)
27192720
{
27202721
XLogRecPtr lsn = InvalidXLogRecPtr;
27212722
TransactionId oldestXidInCommit;
2723+
TransactionId oldestGXidInCommit;
27222724
FullTransactionId nextFullXid;
27232725
FullTransactionId fullOldestXidInCommit;
27242726
WalSnd *walsnd = MyWalSnd;
@@ -2746,6 +2748,16 @@ ProcessStandbyPSRequestMessage(void)
27462748
* ones replicated.
27472749
*/
27482750
oldestXidInCommit = GetOldestActiveTransactionId(true, false);
2751+
oldestGXidInCommit = TwoPhaseGetOldestXidInCommit();
2752+
2753+
/*
2754+
* Update the oldest xid for standby transmission if an older prepared
2755+
* transaction exists and is currently in the commit critical section.
2756+
*/
2757+
if (TransactionIdIsValid(oldestGXidInCommit) &&
2758+
TransactionIdPrecedes(oldestGXidInCommit, oldestXidInCommit))
2759+
oldestXidInCommit = oldestGXidInCommit;
2760+
27492761
nextFullXid = ReadNextFullTransactionId();
27502762
fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
27512763
oldestXidInCommit);

src/include/access/twophase.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,4 +68,6 @@ extern void TwoPhaseTransactionGid(Oid subid, TransactionId xid, char *gid_res,
6868
int szgid);
6969
extern bool LookupGXactBySubid(Oid subid);
7070

71+
extern TransactionId TwoPhaseGetOldestXidInCommit(void);
72+
7173
#endif /* TWOPHASE_H */

src/include/replication/worker_internal.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -272,6 +272,7 @@ extern void ReplicationOriginNameForLogicalRep(Oid suboid, Oid relid,
272272
char *originname, Size szoriginname);
273273

274274
extern bool AllTablesyncsReady(void);
275+
extern bool HasSubscriptionRelationsCached(void);
275276
extern void UpdateTwoPhaseState(Oid suboid, char new_state);
276277

277278
extern void process_syncing_tables(XLogRecPtr current_lsn);

src/test/subscription/t/035_conflicts.pl

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,35 @@
386386
.*Remote row \(2, 4\); replica identity full \(2, 2\)/,
387387
'update target row was deleted in tab');
388388

389+
###############################################################################
390+
# Check that the xmin value of the conflict detection slot can be advanced when
391+
# the subscription has no tables.
392+
###############################################################################
393+
394+
# Remove the table from the publication
395+
$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B DROP TABLE tab");
396+
397+
$node_A->safe_psql('postgres',
398+
"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION");
399+
400+
# Remember the next transaction ID to be assigned
401+
$next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
402+
403+
# Confirm that the xmin value is advanced to the latest nextXid. If no
404+
# transactions are running, the apply worker selects nextXid as the candidate
405+
# for the non-removable xid. See GetOldestActiveTransactionId().
406+
ok( $node_A->poll_query_until(
407+
'postgres',
408+
"SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
409+
),
410+
"the xmin value of slot 'pg_conflict_detection' is updated on Node A");
411+
412+
# Re-add the table to the publication for further tests
413+
$node_B->safe_psql('postgres', "ALTER PUBLICATION tap_pub_B ADD TABLE tab");
414+
415+
$node_A->safe_psql('postgres',
416+
"ALTER SUBSCRIPTION $subname_AB REFRESH PUBLICATION WITH (copy_data = false)");
417+
389418
###############################################################################
390419
# Check that dead tuple retention stops due to the wait time surpassing
391420
# max_retention_duration.

0 commit comments

Comments
 (0)