Skip to content

Commit

Permalink
Keep walproposer alive until shutdown checkpoint is safe on safekepeers
Browse files Browse the repository at this point in the history
The walproposer pretends to be a walsender in many ways. It has a
WalSnd slot, it claims to be a walsender by calling
MarkPostmasterChildWalSender() etc. But one different to real
walsenders was that the postmaster still treated it as a bgworker
rather than a walsender. The difference is that at shutdown,
walsenders are not killed until the very end, after the checkpointer
process has written the shutdown checkpoint and exited.

As a result, the walproposer always got killed before the shutdown
checkpoint was written, so the shutdown checkpoint never made it to
safekeepers. That's fine in principle, we don't require a clean
shutdown after all. But it also feels a bit silly not to stream the
shutdown checkpoint. It could be useful for initializing hot standby
mode in a read replica, for example.

Change postmaster to treat background workers that have called
MarkPostmasterChildWalSender() as walsenders. That unfortunately
requires another small change in postgres core.

After doing that, walproposers stay alive longer. However, it also
means that the checkpointer will wait for the walproposer to switch to
WALSNDSTATE_STOPPING state, when the checkpointer sends the
PROCSIG_WALSND_INIT_STOPPING signal. We don't have the machinery in
walproposer to receive and handle that signal reliably. Instead, we
mark walproposer as being in WALSNDSTATE_STOPPING always.

In commit 568f914, I assumed that shutdown will wait for all the
remaining WAL to be streamed to safekeepers, but before this commit
that was not true, and the test became flaky. This should make it
stable again.

Some tests wrongly assumed that no WAL could have been written between
pg_current_wal_flush_lsn and quick pg stop after it. Fix them by introducing
flush_ep_to_pageserver which first stops the endpoint and then waits till all
committed WAL reaches the pageserver.

In passing extract safekeeper http client to its own module.
  • Loading branch information
hlinnaka authored and arssher committed Mar 11, 2024
1 parent cc5d6c6 commit 0ff31fc
Show file tree
Hide file tree
Showing 18 changed files with 460 additions and 281 deletions.
4 changes: 2 additions & 2 deletions libs/walproposer/src/api_bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
}
}

extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
(*api).process_safekeeper_feedback(&mut (*wp))
}
}

Expand Down
2 changes: 1 addition & 1 deletion libs/walproposer/src/walproposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub trait ApiImpl {
todo!()
}

fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer) {
todo!()
}

Expand Down
23 changes: 11 additions & 12 deletions pgxn/neon/walproposer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr begin
req->epochStartLsn = wp->propEpochStartLsn;
req->beginLsn = beginLsn;
req->endLsn = endLsn;
req->commitLsn = GetAcknowledgedByQuorumWALPosition(wp);
req->commitLsn = wp->commitLsn;
req->truncateLsn = wp->truncateLsn;
req->proposerId = wp->greetRequest.proposerId;
}
Expand Down Expand Up @@ -1405,7 +1405,7 @@ static bool
RecvAppendResponses(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
XLogRecPtr minQuorumLsn;
XLogRecPtr newCommitLsn;
bool readAnything = false;

while (true)
Expand Down Expand Up @@ -1444,18 +1444,19 @@ RecvAppendResponses(Safekeeper *sk)
if (!readAnything)
return sk->state == SS_ACTIVE;

HandleSafekeeperResponse(wp);

/* update commit_lsn */
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
/*
* Also send the new commit lsn to all the safekeepers.
* Send the new value to all safekeepers.
*/
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
if (minQuorumLsn > wp->lastSentCommitLsn)
if (newCommitLsn > wp->commitLsn)
{
wp->commitLsn = newCommitLsn;
BroadcastAppendRequest(wp);
wp->lastSentCommitLsn = minQuorumLsn;
}

HandleSafekeeperResponse(wp);

return sk->state == SS_ACTIVE;
}

Expand Down Expand Up @@ -1632,11 +1633,9 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
static void
HandleSafekeeperResponse(WalProposer *wp)
{
XLogRecPtr minQuorumLsn;
XLogRecPtr candidateTruncateLsn;

minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
wp->api.process_safekeeper_feedback(wp, minQuorumLsn);
wp->api.process_safekeeper_feedback(wp);

/*
* Try to advance truncateLsn -- the last record flushed to all
Expand All @@ -1649,7 +1648,7 @@ HandleSafekeeperResponse(WalProposer *wp)
* can't commit entries from previous term' in Raft); 2)
*/
candidateTruncateLsn = CalculateMinFlushLsn(wp);
candidateTruncateLsn = Min(candidateTruncateLsn, minQuorumLsn);
candidateTruncateLsn = Min(candidateTruncateLsn, wp->commitLsn);
if (candidateTruncateLsn > wp->truncateLsn)
{
wp->truncateLsn = candidateTruncateLsn;
Expand Down
6 changes: 3 additions & 3 deletions pgxn/neon/walproposer.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ typedef struct walproposer_api
* backpressure feedback and to confirm WAL persistence (has been commited
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
void (*process_safekeeper_feedback) (WalProposer *wp);

/*
* Write a log message to the internal log processor. This is used only
Expand Down Expand Up @@ -646,8 +646,8 @@ typedef struct WalProposer
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;

/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
/* cached GetAcknowledgedByQuorumWALPosition result */
XLogRecPtr commitLsn;

ProposerGreeting greetRequest;

Expand Down
102 changes: 88 additions & 14 deletions pgxn/neon/walproposer_pg.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static const walproposer_api walprop_pg;
static volatile sig_atomic_t got_SIGUSR2 = false;
static bool reported_sigusr2 = false;

static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
Expand Down Expand Up @@ -101,6 +103,8 @@ static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);

static void CheckGracefulShutdown(WalProposer *wp);

static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);

static void
Expand Down Expand Up @@ -492,6 +496,24 @@ walprop_pg_init_standalone_sync_safekeepers(void)
BackgroundWorkerUnblockSignals();
}

/*
* We pretend to be a walsender process, and the lifecycle of a walsender is
* slightly different than other procesess. At shutdown, walsender processes
* stay alive until the very end, after the checkpointer has written the
* shutdown checkpoint. When the checkpointer exits, the postmaster sends all
* remaining walsender processes SIGUSR2. On receiving SIGUSR2, we try to send
* the remaining WAL, and then exit. This ensures that the checkpoint record
* reaches durable storage (in safekeepers), before the server shuts down
* completely.
*/
static void
walprop_sigusr2(SIGNAL_ARGS)
{
got_SIGUSR2 = true;

SetLatch(MyLatch);
}

static void
walprop_pg_init_bgworker(void)
{
Expand All @@ -503,6 +525,7 @@ walprop_pg_init_bgworker(void)
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
pqsignal(SIGUSR2, walprop_sigusr2);

BackgroundWorkerUnblockSignals();

Expand Down Expand Up @@ -1075,14 +1098,26 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
#endif

/*
* When we first start replication the standby will be behind the primary.
* For some applications, for example synchronous replication, it is
* important to have a clear state for this initial catchup mode, so we
* can trigger actions when we change streaming state later. We may stay
* in this state for a long time, which is exactly why we want to be able
* to monitor whether or not we are still here.
* XXX: Move straight to STOPPING state, skipping the STREAMING state.
*
* This is a bit weird. Normal walsenders stay in STREAMING state, until
* the checkpointer signals them that it is about to start writing the
* shutdown checkpoint. The walsenders acknowledge that they have received
* that signal by switching to STOPPING state. That tells the walsenders
* that they must not write any new WAL.
*
* However, we cannot easily intercept that signal from the checkpointer.
* It's sent by WalSndInitStopping(), using
* SendProcSignal(PROCSIGNAL_WALSND_INIT_STOPPING). It's received by
* HandleWalSndInitStopping, which sets a process-local got_STOPPING flag.
* However, that's all private to walsender.c.
*
* We don't need to do anything special upon receiving the signal, the
* walproposer doesn't write any WAL anyway, so we skip the STREAMING
* state and go directly to STOPPING mode. That way, the checkpointer
* won't wait for us.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
WalSndSetState(WALSNDSTATE_STOPPING);

/*
* Don't allow a request to stream from a future point in WAL that hasn't
Expand Down Expand Up @@ -1122,6 +1157,8 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
static void
WalSndLoop(WalProposer *wp)
{
XLogRecPtr flushPtr;

/* Clear any already-pending wakeups */
ResetLatch(MyLatch);

Expand All @@ -1130,9 +1167,6 @@ WalSndLoop(WalProposer *wp)
CHECK_FOR_INTERRUPTS();

XLogBroadcastWalProposer(wp);

if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll(wp);
}
}
Expand Down Expand Up @@ -1745,6 +1779,9 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
{
ConditionVariableCancelSleep();
ResetLatch(MyLatch);

CheckGracefulShutdown(wp);

*events = WL_LATCH_SET;
return 1;
}
Expand Down Expand Up @@ -1798,6 +1835,41 @@ walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn)
exit(0);
}

/*
* Like vanilla walsender, on sigusr2 send all remaining WAL and exit.
*
* Note that unlike sync-safekeepers waiting here is not reliable: we
* don't check that majority of safekeepers received and persisted
* commit_lsn -- only that walproposer reached it (which immediately
* broadcasts new value). Doing that without incurring redundant control
* file syncing would need wp -> sk protocol change. OTOH unlike
* sync-safekeepers which must bump commit_lsn or basebackup will fail,
* this catchup is important only for tests where safekeepers/network
* don't crash on their own.
*/
static void
CheckGracefulShutdown(WalProposer *wp)
{
if (got_SIGUSR2)
{
if (!reported_sigusr2)
{
XLogRecPtr flushPtr = walprop_pg_get_flush_rec_ptr(wp);

wpg_log(LOG, "walproposer will send and wait for remaining WAL between %X/%X and %X/%X",
LSN_FORMAT_ARGS(wp->commitLsn), LSN_FORMAT_ARGS(flushPtr));
reported_sigusr2 = true;
}

if (wp->commitLsn >= walprop_pg_get_flush_rec_ptr(wp))
{
wpg_log(LOG, "walproposer sent all WAL up to %X/%X, exiting",
LSN_FORMAT_ARGS(wp->commitLsn));
proc_exit(0);
}
}
}

/*
* Choose most advanced PageserverFeedback and set it to *rf.
*/
Expand Down Expand Up @@ -1878,7 +1950,7 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
* None of that is functional in sync-safekeepers.
*/
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
walprop_pg_process_safekeeper_feedback(WalProposer *wp)
{
HotStandbyFeedback hsFeedback;
XLogRecPtr oldDiskConsistentLsn;
Expand All @@ -1893,10 +1965,10 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
replication_feedback_set(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);

if (commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
if (wp->commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
{
if (commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = commitLsn;
if (wp->commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = wp->commitLsn;

/*
* Advance the replication slot to commitLsn. WAL before it is
Expand Down Expand Up @@ -1929,6 +2001,8 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
XidFromFullTransactionId(hsFeedback.catalog_xmin),
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
}

CheckGracefulShutdown(wp);
}

static XLogRecPtr
Expand Down
15 changes: 7 additions & 8 deletions safekeeper/tests/walproposer_sim/walproposer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub struct SimulationApi {
safekeepers: RefCell<Vec<SafekeeperConn>>,
disk: Arc<DiskWalProposer>,
redo_start_lsn: Option<Lsn>,
last_logged_commit_lsn: u64,
shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
config: Config,
event_set: RefCell<Option<EventSet>>,
Expand Down Expand Up @@ -228,6 +229,7 @@ impl SimulationApi {
safekeepers: RefCell::new(sk_conns),
disk: args.disk,
redo_start_lsn: args.redo_start_lsn,
last_logged_commit_lsn: 0,
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
mutex: 0,
feedback: PageserverFeedback {
Expand Down Expand Up @@ -596,14 +598,11 @@ impl ApiImpl for SimulationApi {
}
}

fn process_safekeeper_feedback(
&self,
wp: &mut walproposer::bindings::WalProposer,
commit_lsn: u64,
) {
debug!("process_safekeeper_feedback, commit_lsn={}", commit_lsn);
if commit_lsn > wp.lastSentCommitLsn {
self.os.log_event(format!("commit_lsn;{}", commit_lsn));
fn process_safekeeper_feedback(&mut self, wp: &mut walproposer::bindings::WalProposer) {
debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
if wp.commitLsn > self.last_logged_commit_lsn {
self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
self.last_logged_commit_lsn = wp.commitLsn;
}
}

Expand Down
Loading

0 comments on commit 0ff31fc

Please sign in to comment.