Skip to content

Commit

Permalink
Keep walproposer alive until shutdown checkpoint is safe on safekepeers
Browse files Browse the repository at this point in the history
The walproposer pretends to be a walsender in many ways. It has a
WalSnd slot, it claims to be a walsender by calling
MarkPostmasterChildWalSender() etc. But one different to real
walsenders was that the postmaster still treated it as a bgworker
rather than a walsender. The difference is that at shutdown,
walsenders are not killed until the very end, after the checkpointer
process has written the shutdown checkpoint and exited.

As a result, the walproposer always got killed before the shutdown
checkpoint was written, so the shutdown checkpoint never made it to
safekeepers. That's fine in principle, we don't require a clean
shutdown after all. But it also feels a bit silly not to stream the
shutdown checkpoint. It could be useful for initializing hot standby
mode in a read replica, for example.

Change postmaster to treat background workers that have called
MarkPostmasterChildWalSender() as walsenders. That unfortunately
requires another small change in postgres core.

After doing that, walproposers stay alive longer. However, it also
means that the checkpointer will wait for the walproposer to switch to
WALSNDSTATE_STOPPING state, when the checkpointer sends the
PROCSIG_WALSND_INIT_STOPPING signal. We don't have the machinery in
walproposer to receive and handle that signal reliably. Instead, we
mark walproposer as being in WALSNDSTATE_STOPPING always.

In commit 568f914, I assumed that shutdown will wait for all the
remaining WAL to be streamed to safekeepers, but before this commit
that was not true, and the test became flaky. This should make it
stable again.
  • Loading branch information
hlinnaka authored and arssher committed Mar 11, 2024
1 parent cc5d6c6 commit 008dc99
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 43 deletions.
4 changes: 2 additions & 2 deletions libs/walproposer/src/api_bindings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,11 @@ extern "C" fn finish_sync_safekeepers(wp: *mut WalProposer, lsn: XLogRecPtr) {
}
}

extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer, commit_lsn: XLogRecPtr) {
extern "C" fn process_safekeeper_feedback(wp: *mut WalProposer) {
unsafe {
let callback_data = (*(*wp).config).callback_data;
let api = callback_data as *mut Box<dyn ApiImpl>;
(*api).process_safekeeper_feedback(&mut (*wp), commit_lsn)
(*api).process_safekeeper_feedback(&mut (*wp))
}
}

Expand Down
2 changes: 1 addition & 1 deletion libs/walproposer/src/walproposer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ pub trait ApiImpl {
todo!()
}

fn process_safekeeper_feedback(&self, _wp: &mut WalProposer, _commit_lsn: u64) {
fn process_safekeeper_feedback(&mut self, _wp: &mut WalProposer) {
todo!()
}

Expand Down
23 changes: 11 additions & 12 deletions pgxn/neon/walproposer.c
Original file line number Diff line number Diff line change
Expand Up @@ -1220,7 +1220,7 @@ PrepareAppendRequest(WalProposer *wp, AppendRequestHeader *req, XLogRecPtr begin
req->epochStartLsn = wp->propEpochStartLsn;
req->beginLsn = beginLsn;
req->endLsn = endLsn;
req->commitLsn = GetAcknowledgedByQuorumWALPosition(wp);
req->commitLsn = wp->commitLsn;
req->truncateLsn = wp->truncateLsn;
req->proposerId = wp->greetRequest.proposerId;
}
Expand Down Expand Up @@ -1405,7 +1405,7 @@ static bool
RecvAppendResponses(Safekeeper *sk)
{
WalProposer *wp = sk->wp;
XLogRecPtr minQuorumLsn;
XLogRecPtr newCommitLsn;
bool readAnything = false;

while (true)
Expand Down Expand Up @@ -1444,18 +1444,19 @@ RecvAppendResponses(Safekeeper *sk)
if (!readAnything)
return sk->state == SS_ACTIVE;

HandleSafekeeperResponse(wp);

/* update commit_lsn */
newCommitLsn = GetAcknowledgedByQuorumWALPosition(wp);
/*
* Also send the new commit lsn to all the safekeepers.
* Send the new value to all safekeepers.
*/
minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
if (minQuorumLsn > wp->lastSentCommitLsn)
if (newCommitLsn > wp->commitLsn)
{
BroadcastAppendRequest(wp);
wp->lastSentCommitLsn = minQuorumLsn;
wp->commitLsn = newCommitLsn;
}

HandleSafekeeperResponse(wp);

return sk->state == SS_ACTIVE;
}

Expand Down Expand Up @@ -1632,11 +1633,9 @@ GetDonor(WalProposer *wp, XLogRecPtr *donor_lsn)
static void
HandleSafekeeperResponse(WalProposer *wp)
{
XLogRecPtr minQuorumLsn;
XLogRecPtr candidateTruncateLsn;

minQuorumLsn = GetAcknowledgedByQuorumWALPosition(wp);
wp->api.process_safekeeper_feedback(wp, minQuorumLsn);
wp->api.process_safekeeper_feedback(wp);

/*
* Try to advance truncateLsn -- the last record flushed to all
Expand All @@ -1649,7 +1648,7 @@ HandleSafekeeperResponse(WalProposer *wp)
* can't commit entries from previous term' in Raft); 2)
*/
candidateTruncateLsn = CalculateMinFlushLsn(wp);
candidateTruncateLsn = Min(candidateTruncateLsn, minQuorumLsn);
candidateTruncateLsn = Min(candidateTruncateLsn, wp->commitLsn);
if (candidateTruncateLsn > wp->truncateLsn)
{
wp->truncateLsn = candidateTruncateLsn;
Expand Down
6 changes: 3 additions & 3 deletions pgxn/neon/walproposer.h
Original file line number Diff line number Diff line change
Expand Up @@ -564,7 +564,7 @@ typedef struct walproposer_api
* backpressure feedback and to confirm WAL persistence (has been commited
* on the quorum of safekeepers).
*/
void (*process_safekeeper_feedback) (WalProposer *wp, XLogRecPtr commitLsn);
void (*process_safekeeper_feedback) (WalProposer *wp);

/*
* Write a log message to the internal log processor. This is used only
Expand Down Expand Up @@ -646,8 +646,8 @@ typedef struct WalProposer
/* WAL has been generated up to this point */
XLogRecPtr availableLsn;

/* last commitLsn broadcasted to safekeepers */
XLogRecPtr lastSentCommitLsn;
/* cached GetAcknowledgedByQuorumWALPosition result */
XLogRecPtr commitLsn;

ProposerGreeting greetRequest;

Expand Down
102 changes: 88 additions & 14 deletions pgxn/neon/walproposer_pg.c
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ static WalproposerShmemState *walprop_shared;
static WalProposerConfig walprop_config;
static XLogRecPtr sentPtr = InvalidXLogRecPtr;
static const walproposer_api walprop_pg;
static volatile sig_atomic_t got_SIGUSR2 = false;
static bool reported_sigusr2 = false;

static void nwp_shmem_startup_hook(void);
static void nwp_register_gucs(void);
Expand Down Expand Up @@ -101,6 +103,8 @@ static void add_nwr_event_set(Safekeeper *sk, uint32 events);
static void update_nwr_event_set(Safekeeper *sk, uint32 events);
static void rm_safekeeper_event_set(Safekeeper *to_remove, bool is_sk);

static void CheckGracefulShutdown(WalProposer *wp);

static XLogRecPtr GetLogRepRestartLSN(WalProposer *wp);

static void
Expand Down Expand Up @@ -492,6 +496,24 @@ walprop_pg_init_standalone_sync_safekeepers(void)
BackgroundWorkerUnblockSignals();
}

/*
* We pretend to be a walsender process, and the lifecycle of a walsender is
* slightly different than other procesess. At shutdown, walsender processes
* stay alive until the very end, after the checkpointer has written the
* shutdown checkpoint. When the checkpointer exits, the postmaster sends all
* remaining walsender processes SIGUSR2. On receiving SIGUSR2, we try to send
* the remaining WAL, and then exit. This ensures that the checkpoint record
* reaches durable storage (in safekeepers), before the server shuts down
* completely.
*/
static void
walprop_sigusr2(SIGNAL_ARGS)
{
got_SIGUSR2 = true;

SetLatch(MyLatch);
}

static void
walprop_pg_init_bgworker(void)
{
Expand All @@ -503,6 +525,7 @@ walprop_pg_init_bgworker(void)
pqsignal(SIGUSR1, procsignal_sigusr1_handler);
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGTERM, die);
pqsignal(SIGUSR2, walprop_sigusr2);

BackgroundWorkerUnblockSignals();

Expand Down Expand Up @@ -1075,14 +1098,26 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
#endif

/*
* When we first start replication the standby will be behind the primary.
* For some applications, for example synchronous replication, it is
* important to have a clear state for this initial catchup mode, so we
* can trigger actions when we change streaming state later. We may stay
* in this state for a long time, which is exactly why we want to be able
* to monitor whether or not we are still here.
* XXX: Move straight to STOPPING state, skipping the STREAMING state.
*
* This is a bit weird. Normal walsenders stay in STREAMING state, until
* the checkpointer signals them that it is about to start writing the
* shutdown checkpoint. The walsenders acknowledge that they have received
* that signal by switching to STOPPING state. That tells the walsenders
* that they must not write any new WAL.
*
* However, we cannot easily intercept that signal from the checkpointer.
* It's sent by WalSndInitStopping(), using
* SendProcSignal(PROCSIGNAL_WALSND_INIT_STOPPING). It's received by
* HandleWalSndInitStopping, which sets a process-local got_STOPPING flag.
* However, that's all private to walsender.c.
*
* We don't need to do anything special upon receiving the signal, the
* walproposer doesn't write any WAL anyway, so we skip the STREAMING
* state and go directly to STOPPING mode. That way, the checkpointer
* won't wait for us.
*/
WalSndSetState(WALSNDSTATE_CATCHUP);
WalSndSetState(WALSNDSTATE_STOPPING);

/*
* Don't allow a request to stream from a future point in WAL that hasn't
Expand Down Expand Up @@ -1122,6 +1157,8 @@ StartProposerReplication(WalProposer *wp, StartReplicationCmd *cmd)
static void
WalSndLoop(WalProposer *wp)
{
XLogRecPtr flushPtr;

/* Clear any already-pending wakeups */
ResetLatch(MyLatch);

Expand All @@ -1130,9 +1167,6 @@ WalSndLoop(WalProposer *wp)
CHECK_FOR_INTERRUPTS();

XLogBroadcastWalProposer(wp);

if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
WalSndSetState(WALSNDSTATE_STREAMING);
WalProposerPoll(wp);
}
}
Expand Down Expand Up @@ -1745,6 +1779,9 @@ walprop_pg_wait_event_set(WalProposer *wp, long timeout, Safekeeper **sk, uint32
{
ConditionVariableCancelSleep();
ResetLatch(MyLatch);

CheckGracefulShutdown(wp);

*events = WL_LATCH_SET;
return 1;
}
Expand Down Expand Up @@ -1798,6 +1835,41 @@ walprop_pg_finish_sync_safekeepers(WalProposer *wp, XLogRecPtr lsn)
exit(0);
}

/*
* Like vanilla walsender, on sigusr2 send all remaining WAL and exit.
*
* Note that unlike sync-safekeepers waiting here is not reliable: we
* don't check that majority of safekeepers received and persisted
* commit_lsn -- only that walproposer reached it (which immediately
* broadcasts new value). Doing that without incurring redundant control
* file syncing would need wp -> sk protocol change. OTOH unlike
* sync-safekeepers which must bump commit_lsn or basebackup will fail,
* this catchup is important only for tests where safekeepers/network
* don't crash on their own.
*/
static void
CheckGracefulShutdown(WalProposer *wp)
{
if (got_SIGUSR2)
{
if (!reported_sigusr2)
{
XLogRecPtr flushPtr = walprop_pg_get_flush_rec_ptr(wp);

wpg_log(LOG, "walproposer will send and wait for remaining WAL between %X/%X and %X/%X",
LSN_FORMAT_ARGS(wp->commitLsn), LSN_FORMAT_ARGS(flushPtr));
reported_sigusr2 = true;
}

if (wp->commitLsn >= walprop_pg_get_flush_rec_ptr(wp))
{
wpg_log(LOG, "walproposer sent all WAL up to %X/%X, exiting",
LSN_FORMAT_ARGS(wp->commitLsn));
proc_exit(0);
}
}
}

/*
* Choose most advanced PageserverFeedback and set it to *rf.
*/
Expand Down Expand Up @@ -1878,7 +1950,7 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback *hs, WalProposer *wp)
* None of that is functional in sync-safekeepers.
*/
static void
walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
walprop_pg_process_safekeeper_feedback(WalProposer *wp)
{
HotStandbyFeedback hsFeedback;
XLogRecPtr oldDiskConsistentLsn;
Expand All @@ -1893,10 +1965,10 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
replication_feedback_set(&quorumFeedback.rf);
SetZenithCurrentClusterSize(quorumFeedback.rf.currentClusterSize);

if (commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
if (wp->commitLsn > quorumFeedback.flushLsn || oldDiskConsistentLsn != quorumFeedback.rf.disk_consistent_lsn)
{
if (commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = commitLsn;
if (wp->commitLsn > quorumFeedback.flushLsn)
quorumFeedback.flushLsn = wp->commitLsn;

/*
* Advance the replication slot to commitLsn. WAL before it is
Expand Down Expand Up @@ -1929,6 +2001,8 @@ walprop_pg_process_safekeeper_feedback(WalProposer *wp, XLogRecPtr commitLsn)
XidFromFullTransactionId(hsFeedback.catalog_xmin),
EpochFromFullTransactionId(hsFeedback.catalog_xmin));
}

CheckGracefulShutdown(wp);
}

static XLogRecPtr
Expand Down
15 changes: 7 additions & 8 deletions safekeeper/tests/walproposer_sim/walproposer_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ pub struct SimulationApi {
safekeepers: RefCell<Vec<SafekeeperConn>>,
disk: Arc<DiskWalProposer>,
redo_start_lsn: Option<Lsn>,
last_logged_commit_lsn: u64,
shmem: UnsafeCell<walproposer::bindings::WalproposerShmemState>,
config: Config,
event_set: RefCell<Option<EventSet>>,
Expand Down Expand Up @@ -228,6 +229,7 @@ impl SimulationApi {
safekeepers: RefCell::new(sk_conns),
disk: args.disk,
redo_start_lsn: args.redo_start_lsn,
last_logged_commit_lsn: 0,
shmem: UnsafeCell::new(walproposer::bindings::WalproposerShmemState {
mutex: 0,
feedback: PageserverFeedback {
Expand Down Expand Up @@ -596,14 +598,11 @@ impl ApiImpl for SimulationApi {
}
}

fn process_safekeeper_feedback(
&self,
wp: &mut walproposer::bindings::WalProposer,
commit_lsn: u64,
) {
debug!("process_safekeeper_feedback, commit_lsn={}", commit_lsn);
if commit_lsn > wp.lastSentCommitLsn {
self.os.log_event(format!("commit_lsn;{}", commit_lsn));
fn process_safekeeper_feedback(&mut self, wp: &mut walproposer::bindings::WalProposer) {
debug!("process_safekeeper_feedback, commit_lsn={}", wp.commitLsn);
if wp.commitLsn > self.last_logged_commit_lsn {
self.os.log_event(format!("commit_lsn;{}", wp.commitLsn));
self.last_logged_commit_lsn = wp.commitLsn;
}
}

Expand Down
14 changes: 14 additions & 0 deletions test_runner/fixtures/neon_fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -2547,6 +2547,20 @@ def run_capture(
)
return base_path

def get_pg_controldata_checkpoint_lsn(self, pgdata: str) -> Lsn:
"""
Run pg_controldata on given datadir and extract checkpoint lsn.
"""

pg_controldata_path = os.path.join(self.pg_bin_path, "pg_controldata")
cmd = f"{pg_controldata_path} -D {pgdata}"
result = subprocess.run(cmd, capture_output=True, text=True, shell=True)
checkpoint_lsn = re.findall(
"Latest checkpoint location:\\s+([0-9A-F]+/[0-9A-F]+)", result.stdout
)[0]
log.info(f"last checkpoint at {checkpoint_lsn}")
return Lsn(checkpoint_lsn)


@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path, pg_distrib_dir: Path, pg_version: PgVersion) -> PgBin:
Expand Down
30 changes: 30 additions & 0 deletions test_runner/regress/test_wal_acceptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,36 @@ def test_peer_recovery(neon_env_builder: NeonEnvBuilder):
endpoint.safe_psql("insert into t select generate_series(1,100), 'payload'")


# Test that when compute is terminated in fast (or smart) mode, walproposer is
# allowed to run and self terminate after shutdown checkpoint is written, so it
# commits it to safekeepers before exiting. This not required for correctness,
# but needed for tests using check_restored_datadir_content.
def test_wp_graceful_shutdown(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()

tenant_id = env.initial_tenant
timeline_id = env.neon_cli.create_branch("test_wp_graceful_shutdown")
ep = env.endpoints.create_start("test_wp_graceful_shutdown")
ep.safe_psql("create table t(key int, value text)")
ep.stop()

# figure out checkpoint lsn
ckpt_lsn = pg_bin.get_pg_controldata_checkpoint_lsn(ep.pg_data_dir_path())

sk_http_cli = env.safekeepers[0].http_client()
commit_lsn = sk_http_cli.timeline_status(tenant_id, timeline_id).commit_lsn
# Note: this is in memory value. Graceful shutdown of walproposer currently
# doesn't guarantee persisted value, which is ok as we need it only for
# tests. Persisting it without risking too many cf flushes needs a wp -> sk
# protocol change. (though in reality shutdown sync-safekeepers does flush
# of cf, so most of the time persisted value wouldn't lag)
log.info(f"sk commit_lsn {commit_lsn}")
# note that ckpt_lsn is the *beginning* of checkpoint record, so commit_lsn
# must be actually higher
assert commit_lsn > ckpt_lsn, "safekeeper must have checkpoint record"


class SafekeeperEnv:
def __init__(
self,
Expand Down
2 changes: 1 addition & 1 deletion vendor/postgres-v14
2 changes: 1 addition & 1 deletion vendor/postgres-v15

0 comments on commit 008dc99

Please sign in to comment.