diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 1df5820fb946..d78d2bcbeae0 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -776,9 +776,6 @@ pub struct TimelineGcRequest {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct WalRedoManagerProcessStatus {
     pub pid: u32,
-    /// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`.
-    /// `ProcessKind` is a transitory thing, so it has no enum representation in `pageserver_api`.
-    pub kind: Cow<'static, str>,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs
index 5b871c5d5e23..5aab10e5d9c0 100644
--- a/pageserver/benches/bench_walredo.rs
+++ b/pageserver/benches/bench_walredo.rs
@@ -30,47 +30,27 @@
 //! 2024-04-15 on i3en.3xlarge
 //!
 //! ```text
-//! async-short/1    time: [24.584 µs 24.737 µs 24.922 µs]
-//! async-short/2    time: [33.479 µs 33.660 µs 33.888 µs]
-//! async-short/4    time: [42.713 µs 43.046 µs 43.440 µs]
-//! async-short/8    time: [71.814 µs 72.478 µs 73.240 µs]
-//! async-short/16   time: [132.73 µs 134.45 µs 136.22 µs]
-//! async-short/32   time: [258.31 µs 260.73 µs 263.27 µs]
-//! async-short/64   time: [511.61 µs 514.44 µs 517.51 µs]
-//! async-short/128  time: [992.64 µs 998.23 µs 1.0042 ms]
-//! async-medium/1   time: [110.11 µs 110.50 µs 110.96 µs]
-//! async-medium/2   time: [153.06 µs 153.85 µs 154.99 µs]
-//! async-medium/4   time: [317.51 µs 319.92 µs 322.85 µs]
-//! async-medium/8   time: [638.30 µs 644.68 µs 652.12 µs]
-//! async-medium/16  time: [1.2651 ms 1.2773 ms 1.2914 ms]
-//! async-medium/32  time: [2.5117 ms 2.5410 ms 2.5720 ms]
-//! async-medium/64  time: [4.8088 ms 4.8555 ms 4.9047 ms]
-//! async-medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
-//! sync-short/1     time: [25.503 µs 25.626 µs 25.771 µs]
-//! sync-short/2     time: [30.850 µs 31.013 µs 31.208 µs]
-//! sync-short/4     time: [45.543 µs 45.856 µs 46.193 µs]
-//! sync-short/8     time: [84.114 µs 84.639 µs 85.220 µs]
-//! sync-short/16    time: [185.22 µs 186.15 µs 187.13 µs]
-//! sync-short/32    time: [377.43 µs 378.87 µs 380.46 µs]
-//! sync-short/64    time: [756.49 µs 759.04 µs 761.70 µs]
-//! sync-short/128   time: [1.4825 ms 1.4874 ms 1.4923 ms]
-//! sync-medium/1    time: [105.66 µs 106.01 µs 106.43 µs]
-//! sync-medium/2    time: [153.10 µs 153.84 µs 154.72 µs]
-//! sync-medium/4    time: [327.13 µs 329.44 µs 332.27 µs]
-//! sync-medium/8    time: [654.26 µs 658.73 µs 663.63 µs]
-//! sync-medium/16   time: [1.2682 ms 1.2748 ms 1.2816 ms]
-//! sync-medium/32   time: [2.4456 ms 2.4595 ms 2.4731 ms]
-//! sync-medium/64   time: [4.6523 ms 4.6890 ms 4.7256 ms]
-//! sync-medium/128  time: [8.7215 ms 8.8323 ms 8.9344 ms]
+//! short/1    time: [24.584 µs 24.737 µs 24.922 µs]
+//! short/2    time: [33.479 µs 33.660 µs 33.888 µs]
+//! short/4    time: [42.713 µs 43.046 µs 43.440 µs]
+//! short/8    time: [71.814 µs 72.478 µs 73.240 µs]
+//! short/16   time: [132.73 µs 134.45 µs 136.22 µs]
+//! short/32   time: [258.31 µs 260.73 µs 263.27 µs]
+//! short/64   time: [511.61 µs 514.44 µs 517.51 µs]
+//! short/128  time: [992.64 µs 998.23 µs 1.0042 ms]
+//! medium/1   time: [110.11 µs 110.50 µs 110.96 µs]
+//! medium/2   time: [153.06 µs 153.85 µs 154.99 µs]
+//! medium/4   time: [317.51 µs 319.92 µs 322.85 µs]
+//! medium/8   time: [638.30 µs 644.68 µs 652.12 µs]
+//! medium/16  time: [1.2651 ms 1.2773 ms 1.2914 ms]
+//! medium/32  time: [2.5117 ms 2.5410 ms 2.5720 ms]
+//! medium/64  time: [4.8088 ms 4.8555 ms 4.9047 ms]
+//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
 //! ```

 use bytes::{Buf, Bytes};
 use criterion::{BenchmarkId, Criterion};
-use pageserver::{
-    config::PageServerConf,
-    walrecord::NeonWalRecord,
-    walredo::{PostgresRedoManager, ProcessKind},
-};
+use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager};
 use pageserver_api::{key::Key, shard::TenantShardId};
 use std::{
     sync::Arc,
@@ -80,39 +60,32 @@ use tokio::{sync::Barrier, task::JoinSet};
 use utils::{id::TenantId, lsn::Lsn};

 fn bench(c: &mut Criterion) {
-    for process_kind in &[ProcessKind::Async, ProcessKind::Sync] {
-        {
-            let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
-            for nclients in nclients {
-                let mut group = c.benchmark_group(format!("{process_kind}-short"));
-                group.bench_with_input(
-                    BenchmarkId::from_parameter(nclients),
-                    &nclients,
-                    |b, nclients| {
-                        let redo_work = Arc::new(Request::short_input());
-                        b.iter_custom(|iters| {
-                            bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
-                        });
-                    },
-                );
-            }
+    {
+        let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
+        for nclients in nclients {
+            let mut group = c.benchmark_group("short");
+            group.bench_with_input(
+                BenchmarkId::from_parameter(nclients),
+                &nclients,
+                |b, nclients| {
+                    let redo_work = Arc::new(Request::short_input());
+                    b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
+                },
+            );
         }
-
-        {
-            let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
-            for nclients in nclients {
-                let mut group = c.benchmark_group(format!("{process_kind}-medium"));
-                group.bench_with_input(
-                    BenchmarkId::from_parameter(nclients),
-                    &nclients,
-                    |b, nclients| {
-                        let redo_work = Arc::new(Request::medium_input());
-                        b.iter_custom(|iters| {
-                            bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients)
-                        });
-                    },
-                );
-            }
+    }
+    {
+        let nclients = [1, 2, 4, 8, 16, 32, 64, 128];
+        for nclients in nclients {
+            let mut group = c.benchmark_group("medium");
+            group.bench_with_input(
+                BenchmarkId::from_parameter(nclients),
+                &nclients,
+                |b, nclients| {
+                    let redo_work = Arc::new(Request::medium_input());
+                    b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients));
+                },
+            );
         }
     }
 }
@@ -120,16 +93,10 @@ criterion::criterion_group!(benches, bench);
 criterion::criterion_main!(benches);

 // Returns the sum of each client's wall-clock time spent executing their share of the n_redos.
-fn bench_impl(
-    process_kind: ProcessKind,
-    redo_work: Arc<Request>,
-    n_redos: u64,
-    nclients: u64,
-) -> Duration {
+fn bench_impl(redo_work: Arc<Request>, n_redos: u64, nclients: u64) -> Duration {
     let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap();

-    let mut conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
-    conf.walredo_process_kind = process_kind;
+    let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf());
     let conf = Box::leak(Box::new(conf));

     let tenant_shard_id = TenantShardId::unsharded(TenantId::generate());
@@ -158,27 +125,13 @@ fn bench_impl(
         });
     }

-    let elapsed = rt.block_on(async move {
+    rt.block_on(async move {
         let mut total_wallclock_time = Duration::ZERO;
         while let Some(res) = tasks.join_next().await {
             total_wallclock_time += res.unwrap();
         }
         total_wallclock_time
-    });
-
-    // consistency check to ensure process kind setting worked
-    if nredos_per_client > 0 {
-        assert_eq!(
-            manager
-                .status()
-                .process
-                .map(|p| p.kind)
-                .expect("the benchmark work causes a walredo process to be spawned"),
-            std::borrow::Cow::Borrowed(process_kind.into())
-        );
-    }
-
-    elapsed
+    })
 }

 async fn client(
diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs
index 49f8a41b37a5..1be249c69f74 100644
--- a/pageserver/src/bin/pageserver.rs
+++ b/pageserver/src/bin/pageserver.rs
@@ -284,7 +284,6 @@ fn start_pageserver(
     ))
     .unwrap();
     pageserver::preinitialize_metrics();
-    pageserver::metrics::wal_redo::set_process_kind_metric(conf.walredo_process_kind);

     // If any failpoints were set from FAILPOINTS environment variable,
     // print them to the log for debugging purposes
diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs
index b27bfb43b077..ffcd08b4b3ee 100644
--- a/pageserver/src/metrics.rs
+++ b/pageserver/src/metrics.rs
@@ -1999,29 +1999,6 @@ impl Default for WalRedoProcessCounters {
 pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy<WalRedoProcessCounters> =
     Lazy::new(WalRedoProcessCounters::default);

-#[cfg(not(test))]
-pub mod wal_redo {
-    use super::*;
-
-    static PROCESS_KIND: Lazy<std::sync::Mutex<UIntGaugeVec>> = Lazy::new(|| {
-        std::sync::Mutex::new(
-            register_uint_gauge_vec!(
-                "pageserver_wal_redo_process_kind",
-                "The configured process kind for walredo",
-                &["kind"],
-            )
-            .unwrap(),
-        )
-    });
-
-    pub fn set_process_kind_metric(kind: crate::walredo::ProcessKind) {
-        // use guard to avoid races around the next two steps
-        let guard = PROCESS_KIND.lock().unwrap();
-        guard.reset();
-        guard.with_label_values(&[&format!("{kind}")]).set(1);
-    }
-}
-
 /// Similar to `prometheus::HistogramTimer` but does not record on drop.
 pub(crate) struct StorageTimeMetricsTimer {
     metrics: StorageTimeMetrics,
diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs
index 9776d4ce8882..3decea0c6de4 100644
--- a/pageserver/src/walredo.rs
+++ b/pageserver/src/walredo.rs
@@ -153,10 +153,7 @@ impl PostgresRedoManager {
             process: self
                 .redo_process
                 .get()
-                .map(|p| WalRedoManagerProcessStatus {
-                    pid: p.id(),
-                    kind: std::borrow::Cow::Borrowed(p.kind().into()),
-                }),
+                .map(|p| WalRedoManagerProcessStatus { pid: p.id() }),
         }
     }
 }
diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs
index ad6b4e5fe96f..02c9c04bf185 100644
--- a/pageserver/src/walredo/process.rs
+++ b/pageserver/src/walredo/process.rs
@@ -1,7 +1,10 @@
+/// Layer of indirection previously used to support multiple implementations.
+/// Subject to removal:
 use std::time::Duration;

 use bytes::Bytes;
 use pageserver_api::{reltag::RelTag, shard::TenantShardId};
+use tracing::warn;
 use utils::lsn::Lsn;

 use crate::{config::PageServerConf, walrecord::NeonWalRecord};
@@ -12,7 +15,6 @@ mod protocol;

 mod process_impl {
     pub(super) mod process_async;
-    pub(super) mod process_std;
 }

 #[derive(
@@ -34,10 +36,7 @@ pub enum Kind {
     Async,
 }

-pub(crate) enum Process {
-    Sync(process_impl::process_std::WalRedoProcess),
-    Async(process_impl::process_async::WalRedoProcess),
-}
+pub(crate) struct Process(process_impl::process_async::WalRedoProcess);

 impl Process {
     #[inline(always)]
@@ -46,18 +45,17 @@ impl Process {
         tenant_shard_id: TenantShardId,
         pg_version: u32,
     ) -> anyhow::Result<Self> {
-        Ok(match conf.walredo_process_kind {
-            Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch(
-                conf,
-                tenant_shard_id,
-                pg_version,
-            )?),
-            Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch(
-                conf,
-                tenant_shard_id,
-                pg_version,
-            )?),
-        })
+        if conf.walredo_process_kind != Kind::Async {
+            warn!(
+                configured = %conf.walredo_process_kind,
+                "the walredo_process_kind setting has been turned into a no-op, using async implementation"
+            );
+        }
+        Ok(Self(process_impl::process_async::WalRedoProcess::launch(
+            conf,
+            tenant_shard_id,
+            pg_version,
+        )?))
     }

     #[inline(always)]
@@ -69,29 +67,12 @@ impl Process {
         records: &[(Lsn, NeonWalRecord)],
         wal_redo_timeout: Duration,
     ) -> anyhow::Result<Bytes> {
-        match self {
-            Process::Sync(p) => {
-                p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
-                    .await
-            }
-            Process::Async(p) => {
-                p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
-                    .await
-            }
-        }
+        self.0
+            .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout)
+            .await
     }

     pub(crate) fn id(&self) -> u32 {
-        match self {
-            Process::Sync(p) => p.id(),
-            Process::Async(p) => p.id(),
-        }
-    }
-
-    pub(crate) fn kind(&self) -> Kind {
-        match self {
-            Process::Sync(_) => Kind::Sync,
-            Process::Async(_) => Kind::Async,
-        }
+        self.0.id()
     }
 }
diff --git a/pageserver/src/walredo/process/process_impl/process_std.rs b/pageserver/src/walredo/process/process_impl/process_std.rs
deleted file mode 100644
index e7a6c263c9bb..000000000000
--- a/pageserver/src/walredo/process/process_impl/process_std.rs
+++ /dev/null
@@ -1,405 +0,0 @@
-use self::no_leak_child::NoLeakChild;
-use crate::{
-    config::PageServerConf,
-    metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER},
-    walrecord::NeonWalRecord,
-    walredo::process::{no_leak_child, protocol},
-};
-use anyhow::Context;
-use bytes::Bytes;
-use nix::poll::{PollFd, PollFlags};
-use pageserver_api::{reltag::RelTag, shard::TenantShardId};
-use postgres_ffi::BLCKSZ;
-use std::os::fd::AsRawFd;
-#[cfg(feature = "testing")]
-use std::sync::atomic::AtomicUsize;
-use std::{
-    collections::VecDeque,
-    io::{Read, Write},
-    process::{ChildStdin, ChildStdout, Command, Stdio},
-    sync::{Mutex, MutexGuard},
-    time::Duration,
-};
-use tracing::{debug, error, instrument, Instrument};
-use utils::{lsn::Lsn, nonblock::set_nonblock};
-
-pub struct WalRedoProcess {
-    #[allow(dead_code)]
-    conf: &'static PageServerConf,
-    tenant_shard_id: TenantShardId,
-    // Some() on construction, only becomes None on Drop.
-    child: Option<NoLeakChild>,
-    stdout: Mutex<ProcessOutput>,
-    stdin: Mutex<ProcessInput>,
-    /// Counter to separate same-sized walredo inputs failing at the same millisecond.
- #[cfg(feature = "testing")] - dump_sequence: AtomicUsize, -} - -struct ProcessInput { - stdin: ChildStdin, - n_requests: usize, -} - -struct ProcessOutput { - stdout: ChildStdout, - pending_responses: VecDeque>, - n_processed_responses: usize, -} - -impl WalRedoProcess { - // - // Start postgres binary in special WAL redo mode. - // - #[instrument(skip_all,fields(pg_version=pg_version))] - pub(crate) fn launch( - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - pg_version: u32, - ) -> anyhow::Result { - crate::span::debug_assert_current_span_has_tenant_id(); - - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. - let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?; - - use no_leak_child::NoLeakChildCommandExt; - // Start postgres itself - let child = Command::new(pg_bin_dir_path.join("postgres")) - // the first arg must be --wal-redo so the child process enters into walredo mode - .arg("--wal-redo") - // the child doesn't process this arg, but, having it in the argv helps indentify the - // walredo process for a particular tenant when debugging a pagserver - .args(["--tenant-shard-id", &format!("{tenant_shard_id}")]) - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir_path) - .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) - // NB: The redo process is not trusted after we sent it the first - // walredo work. Before that, it is trusted. Specifically, we trust - // it to - // 1. close all file descriptors except stdin, stdout, stderr because - // pageserver might not be 100% diligent in setting FD_CLOEXEC on all - // the files it opens, and - // 2. to use seccomp to sandbox itself before processing the first - // walredo request. - .spawn_no_leak_child(tenant_shard_id) - .context("spawn process")?; - WAL_REDO_PROCESS_COUNTERS.started.inc(); - let mut child = scopeguard::guard(child, |child| { - error!("killing wal-redo-postgres process due to a problem during launch"); - child.kill_and_wait(WalRedoKillCause::Startup); - }); - - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - let stderr = tokio::process::ChildStderr::from_std(stderr) - .context("convert to tokio::ChildStderr")?; - macro_rules! set_nonblock_or_log_err { - ($file:ident) => {{ - let res = set_nonblock($file.as_raw_fd()); - if let Err(e) = &res { - error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed"); - } - res - }}; - } - set_nonblock_or_log_err!(stdin)?; - set_nonblock_or_log_err!(stdout)?; - - // all fallible operations post-spawn are complete, so get rid of the guard - let child = scopeguard::ScopeGuard::into_inner(child); - - tokio::spawn( - async move { - scopeguard::defer! { - debug!("wal-redo-postgres stderr_logger_task finished"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc(); - } - debug!("wal-redo-postgres stderr_logger_task started"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc(); - - use tokio::io::AsyncBufReadExt; - let mut stderr_lines = tokio::io::BufReader::new(stderr); - let mut buf = Vec::new(); - let res = loop { - buf.clear(); - // TODO we don't trust the process to cap its stderr length. - // Currently it can do unbounded Vec allocation. 
-                    match stderr_lines.read_until(b'\n', &mut buf).await {
-                        Ok(0) => break Ok(()), // eof
-                        Ok(num_bytes) => {
-                            let output = String::from_utf8_lossy(&buf[..num_bytes]);
-                            error!(%output, "received output");
-                        }
-                        Err(e) => {
-                            break Err(e);
-                        }
-                    }
-                };
-                match res {
-                    Ok(()) => (),
-                    Err(e) => {
-                        error!(error=?e, "failed to read from walredo stderr");
-                    }
-                }
-            }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version))
-        );
-
-        Ok(Self {
-            conf,
-            tenant_shard_id,
-            child: Some(child),
-            stdin: Mutex::new(ProcessInput {
-                stdin,
-                n_requests: 0,
-            }),
-            stdout: Mutex::new(ProcessOutput {
-                stdout,
-                pending_responses: VecDeque::new(),
-                n_processed_responses: 0,
-            }),
-            #[cfg(feature = "testing")]
-            dump_sequence: AtomicUsize::default(),
-        })
-    }
-
-    pub(crate) fn id(&self) -> u32 {
-        self.child
-            .as_ref()
-            .expect("must not call this during Drop")
-            .id()
-    }
-
-    // Apply given WAL records ('records') over an old page image. Returns
-    // new page image.
-    //
-    #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))]
-    pub(crate) async fn apply_wal_records(
-        &self,
-        rel: RelTag,
-        blknum: u32,
-        base_img: &Option<Bytes>,
-        records: &[(Lsn, NeonWalRecord)],
-        wal_redo_timeout: Duration,
-    ) -> anyhow::Result<Bytes> {
-        let tag = protocol::BufferTag { rel, blknum };
-        let input = self.stdin.lock().unwrap();
-
-        // Serialize all the messages to send the WAL redo process first.
-        //
-        // This could be problematic if there are millions of records to replay,
-        // but in practice the number of records is usually so small that it doesn't
-        // matter, and it's better to keep this code simple.
-        //
-        // Most requests start with a before-image with BLCKSZ bytes, followed
-        // by some other WAL records. Start with a buffer that can hold that
-        // comfortably.
-        let mut writebuf: Vec<u8> = Vec::with_capacity((BLCKSZ as usize) * 3);
-        protocol::build_begin_redo_for_block_msg(tag, &mut writebuf);
-        if let Some(img) = base_img {
-            protocol::build_push_page_msg(tag, img, &mut writebuf);
-        }
-        for (lsn, rec) in records.iter() {
-            if let NeonWalRecord::Postgres {
-                will_init: _,
-                rec: postgres_rec,
-            } = rec
-            {
-                protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf);
-            } else {
-                anyhow::bail!("tried to pass neon wal record to postgres WAL redo");
-            }
-        }
-        protocol::build_get_page_msg(tag, &mut writebuf);
-        WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64);
-
-        let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout);
-
-        if res.is_err() {
-            // Not all of these are necessarily caused by this particular input;
-            // however, they are so rare in tests that we capture all of them.
-            self.record_and_log(&writebuf);
-        }
-
-        res
-    }
-
-    fn apply_wal_records0(
-        &self,
-        writebuf: &[u8],
-        input: MutexGuard<ProcessInput>,
-        wal_redo_timeout: Duration,
-    ) -> anyhow::Result<Bytes> {
-        let mut proc = { input }; // TODO: remove this legacy rename, but it keeps the patch small.
-        let mut nwrite = 0usize;
-
-        while nwrite < writebuf.len() {
-            let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)];
-            let n = loop {
-                match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) {
-                    Err(nix::errno::Errno::EINTR) => continue,
-                    res => break res,
-                }
-            }?;
-
-            if n == 0 {
-                anyhow::bail!("WAL redo timed out");
-            }
-
-            // If 'stdin' is writable, do write.
-            let in_revents = stdin_pollfds[0].revents().unwrap();
-            if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() {
-                nwrite += proc.stdin.write(&writebuf[nwrite..])?;
-            }
-            if in_revents.contains(PollFlags::POLLHUP) {
-                // We still have more data to write, but the process closed the pipe.
-                anyhow::bail!("WAL redo process closed its stdin unexpectedly");
-            }
-        }
-        let request_no = proc.n_requests;
-        proc.n_requests += 1;
-        drop(proc);
-
-        // To improve walredo performance we separate sending requests and receiving
-        // responses. They are protected by different mutexes (input and output).
-        // If threads T1, T2, T3 send requests R1, R2, R3 to the walredo process,
-        // there is no guarantee that T1 will be granted the output mutex first.
-        // To address this, we keep track of the number of sent requests, the number
-        // of processed responses, and a ring buffer of pending responses. After
-        // sending a request (under the input mutex), a thread remembers its request
-        // number. It then releases the input mutex, locks the output mutex, and
-        // reads responses into the ring buffer until its own request number has
-        // been buffered. It then takes the corresponding element from the pending
-        // responses ring buffer and truncates all empty elements from the front,
-        // advancing the processed responses counter.
-
-        let mut output = self.stdout.lock().unwrap();
-        let n_processed_responses = output.n_processed_responses;
-        while n_processed_responses + output.pending_responses.len() <= request_no {
-            // We expect the WAL redo process to respond with an 8k page image. We read it
-            // into this buffer.
-            let mut resultbuf = vec![0; BLCKSZ.into()];
-            let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far
-            while nresult < BLCKSZ.into() {
-                let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)];
-                // We do two things simultaneously: read the response from stdout
-                // and forward any logging information that the child writes to its
-                // stderr to the page server's log.
-                let n = loop {
-                    match nix::poll::poll(
-                        &mut stdout_pollfds[..],
-                        wal_redo_timeout.as_millis() as i32,
-                    ) {
-                        Err(nix::errno::Errno::EINTR) => continue,
-                        res => break res,
-                    }
-                }?;
-
-                if n == 0 {
-                    anyhow::bail!("WAL redo timed out");
-                }
-
-                // If we have some data in stdout, read it to the result buffer.
-                let out_revents = stdout_pollfds[0].revents().unwrap();
-                if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() {
-                    nresult += output.stdout.read(&mut resultbuf[nresult..])?;
-                }
-                if out_revents.contains(PollFlags::POLLHUP) {
-                    anyhow::bail!("WAL redo process closed its stdout unexpectedly");
-                }
-            }
-            output
-                .pending_responses
-                .push_back(Some(Bytes::from(resultbuf)));
-        }
-        // Replace our request's response with None in `pending_responses`.
-        // Then make space in the ring buffer by clearing out any sequence of
-        // contiguous `None`s from the front of `pending_responses`.
-        // NB: We can't simply pop_front(), because our response may not be at the
-        // front of `pending_responses`: another requester might have grabbed the
-        // output mutex before us and buffered responses for earlier requests:
-        //   T1: grab input mutex
-        //   T1: send request_no 23
-        //   T1: release input mutex
-        //   T2: grab input mutex
-        //   T2: send request_no 24
-        //   T2: release input mutex
-        //   T2: grab output mutex
-        //   T2: n_processed_responses + output.pending_responses.len() <= request_no
-        //                23                            0                      24
-        //   T2: enters poll loop that reads stdout
-        //   T2: put response for 23 into pending_responses
-        //   T2: put response for 24 into pending_responses
-        //       pending_responses now looks like this: Front Some(response_23) Some(response_24) Back
-        //   T2: takes its response_24
-        //       pending_responses now looks like this: Front Some(response_23) None Back
-        //   T2: does the while loop below
-        //       pending_responses now looks like this: Front Some(response_23) None Back
-        //   T2: releases output mutex
-        //   T1: grabs output mutex
-        //   T1: n_processed_responses + output.pending_responses.len() > request_no
-        //                23                            2                      23
-        //   T1: skips poll loop that reads stdout
-        //   T1: takes its response_23
-        //       pending_responses now looks like this: Front None None Back
-        //   T1: does the while loop below
-        //       pending_responses now looks like this: Front Back
-        //       n_processed_responses now has value 25
-        let res = output.pending_responses[request_no - n_processed_responses]
-            .take()
-            .expect("we own this request_no, nobody else is supposed to take it");
-        while let Some(front) = output.pending_responses.front() {
-            if front.is_none() {
-                output.pending_responses.pop_front();
-                output.n_processed_responses += 1;
-            } else {
-                break;
-            }
-        }
-        Ok(res)
-    }
-
-    #[cfg(feature = "testing")]
-    fn record_and_log(&self, writebuf: &[u8]) {
-        use std::sync::atomic::Ordering;
-
-        let millis = std::time::SystemTime::now()
-            .duration_since(std::time::SystemTime::UNIX_EPOCH)
-            .unwrap()
-            .as_millis();
-
-        let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed);
-
-        // these files will be collected to an allure report
-        let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len());
-
-        let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename);
-
-        let res = std::fs::OpenOptions::new()
-            .write(true)
-            .create_new(true)
-            .read(true)
-            .open(path)
-            .and_then(|mut f| f.write_all(writebuf));
-
-        // trip up allowed_errors
-        if let Err(e) = res {
-            tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}");
-        } else {
-            tracing::error!(filename, "erroring walredo input saved");
-        }
-    }
-
-    #[cfg(not(feature = "testing"))]
-    fn record_and_log(&self, _: &[u8]) {}
-}
-
-impl Drop for WalRedoProcess {
-    fn drop(&mut self) {
-        self.child
-            .take()
-            .expect("we only do this once")
-            .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop);
-        // no way to wait for stderr_logger_task from Drop because that is async only
-    }
-}
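
For readers following the deleted `apply_wal_records0` above: the pending-responses scheme its comments describe is easier to see in isolation. Below is a minimal, self-contained sketch of just that ring-buffer bookkeeping. This is not pageserver code: `Output`, `take_response`, and `read_response` are hypothetical stand-ins for `ProcessOutput`, the output-mutex critical section, and the poll/read loop against the child's stdout.

```rust
use std::collections::VecDeque;

/// Stand-in for `ProcessOutput`: responses may be buffered by whichever
/// requester happens to hold the output lock when they arrive.
struct Output {
    pending_responses: VecDeque<Option<Vec<u8>>>,
    n_processed_responses: usize,
}

impl Output {
    /// Called with the output lock held. `read_response` stands in for the
    /// poll/read loop; responses arrive in request order.
    fn take_response(
        &mut self,
        request_no: usize,
        mut read_response: impl FnMut() -> Vec<u8>,
    ) -> Vec<u8> {
        // Read (and buffer) responses until ours has been received. This may
        // buffer responses for earlier requests whose senders have not
        // reached the output lock yet.
        while self.n_processed_responses + self.pending_responses.len() <= request_no {
            self.pending_responses.push_back(Some(read_response()));
        }
        // Take our own slot; its offset is deterministic because responses
        // are buffered in request order.
        let res = self.pending_responses[request_no - self.n_processed_responses]
            .take()
            .expect("each request_no is taken exactly once");
        // Trim only the contiguous `None`s at the front, so slots holding
        // still-unclaimed earlier responses stay in place.
        while let Some(None) = self.pending_responses.front() {
            self.pending_responses.pop_front();
            self.n_processed_responses += 1;
        }
        res
    }
}

fn main() {
    let mut out = Output {
        pending_responses: VecDeque::new(),
        n_processed_responses: 0,
    };
    // Replay of the trace in the comment above: T2 (request 1) wins the
    // output lock and drains both responses; T1 (request 0) then finds its
    // response already buffered and skips the read loop.
    let mut next = 0u8;
    let r1 = out.take_response(1, || {
        next += 1;
        vec![next]
    });
    let r0 = out.take_response(0, || unreachable!("response already buffered"));
    assert_eq!((r0, r1), (vec![1], vec![2]));
    assert_eq!(out.n_processed_responses, 2);
}
```

Because responses arrive in request order, a requester that reaches the output lock late finds its response at a deterministic offset, and trimming only leading `None`s lets `n_processed_responses` advance without ever reordering the buffer.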