From 5353da74a12890155d3fcfa9d4090bfde78ff0c6 Mon Sep 17 00:00:00 2001
From: Christian Schwarz
Date: Wed, 15 May 2024 15:04:52 +0200
Subject: [PATCH] chore!: always use async walredo, warn if sync is configured (#7754)

refs https://github.com/neondatabase/neon/issues/7753

This PR is step (1) of removing sync walredo from Pageserver.

Changes:

* Remove the sync impl
* If sync is configured, warn! and use async instead
* Remove the metric that exposes `kind`
* Remove the tenant status API that exposes `kind`

Future Work
-----------

After we've released this change to prod and are sure we won't roll back, we will

1. update the prod Ansible to remove the config flag from the prod pageserver.toml.
2. remove the remaining `kind` code in pageserver.

These two changes need no release in between.

See https://github.com/neondatabase/neon/issues/7753 for details.
---
 libs/pageserver_api/src/models.rs | 3 -
 pageserver/benches/bench_walredo.rs | 139 ++----
 pageserver/src/bin/pageserver.rs | 1 -
 pageserver/src/config.rs | 2 +-
 pageserver/src/metrics.rs | 23 -
 pageserver/src/walredo.rs | 5 +-
 pageserver/src/walredo/process.rs | 57 +--
 .../process/process_impl/process_std.rs | 405 ------------------
 test_runner/regress/test_pageserver_config.py | 35 --
 9 files changed, 67 insertions(+), 603 deletions(-)
 delete mode 100644 pageserver/src/walredo/process/process_impl/process_std.rs
 delete mode 100644 test_runner/regress/test_pageserver_config.py

diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs
index 1df5820fb946..d78d2bcbeae0 100644
--- a/libs/pageserver_api/src/models.rs
+++ b/libs/pageserver_api/src/models.rs
@@ -776,9 +776,6 @@ pub struct TimelineGcRequest {
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct WalRedoManagerProcessStatus {
     pub pid: u32,
-    /// The strum-generated `into::<&'static str>()` for `pageserver::walredo::ProcessKind`.
-    /// `ProcessKind` are a transitory thing, so, they have no enum representation in `pageserver_api`.
-    pub kind: Cow<'static, str>,
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
diff --git a/pageserver/benches/bench_walredo.rs b/pageserver/benches/bench_walredo.rs
index 5b871c5d5e23..5aab10e5d9c0 100644
--- a/pageserver/benches/bench_walredo.rs
+++ b/pageserver/benches/bench_walredo.rs
@@ -30,47 +30,27 @@
 //! 2024-04-15 on i3en.3xlarge
 //!
 //! ```text
-//! async-short/1 time: [24.584 µs 24.737 µs 24.922 µs]
-//! async-short/2 time: [33.479 µs 33.660 µs 33.888 µs]
-//! async-short/4 time: [42.713 µs 43.046 µs 43.440 µs]
-//! async-short/8 time: [71.814 µs 72.478 µs 73.240 µs]
-//! async-short/16 time: [132.73 µs 134.45 µs 136.22 µs]
-//! async-short/32 time: [258.31 µs 260.73 µs 263.27 µs]
-//! async-short/64 time: [511.61 µs 514.44 µs 517.51 µs]
-//! async-short/128 time: [992.64 µs 998.23 µs 1.0042 ms]
-//! async-medium/1 time: [110.11 µs 110.50 µs 110.96 µs]
-//! async-medium/2 time: [153.06 µs 153.85 µs 154.99 µs]
-//! async-medium/4 time: [317.51 µs 319.92 µs 322.85 µs]
-//! async-medium/8 time: [638.30 µs 644.68 µs 652.12 µs]
-//! async-medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms]
-//! async-medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms]
-//! async-medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms]
-//! async-medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms]
-//! sync-short/1 time: [25.503 µs 25.626 µs 25.771 µs]
-//! sync-short/2 time: [30.850 µs 31.013 µs 31.208 µs]
-//! sync-short/4 time: [45.543 µs 45.856 µs 46.193 µs]
-//! sync-short/8 time: [84.114 µs 84.639 µs 85.220 µs]
-//! 
sync-short/16 time: [185.22 µs 186.15 µs 187.13 µs] -//! sync-short/32 time: [377.43 µs 378.87 µs 380.46 µs] -//! sync-short/64 time: [756.49 µs 759.04 µs 761.70 µs] -//! sync-short/128 time: [1.4825 ms 1.4874 ms 1.4923 ms] -//! sync-medium/1 time: [105.66 µs 106.01 µs 106.43 µs] -//! sync-medium/2 time: [153.10 µs 153.84 µs 154.72 µs] -//! sync-medium/4 time: [327.13 µs 329.44 µs 332.27 µs] -//! sync-medium/8 time: [654.26 µs 658.73 µs 663.63 µs] -//! sync-medium/16 time: [1.2682 ms 1.2748 ms 1.2816 ms] -//! sync-medium/32 time: [2.4456 ms 2.4595 ms 2.4731 ms] -//! sync-medium/64 time: [4.6523 ms 4.6890 ms 4.7256 ms] -//! sync-medium/128 time: [8.7215 ms 8.8323 ms 8.9344 ms] +//! short/1 time: [24.584 µs 24.737 µs 24.922 µs] +//! short/2 time: [33.479 µs 33.660 µs 33.888 µs] +//! short/4 time: [42.713 µs 43.046 µs 43.440 µs] +//! short/8 time: [71.814 µs 72.478 µs 73.240 µs] +//! short/16 time: [132.73 µs 134.45 µs 136.22 µs] +//! short/32 time: [258.31 µs 260.73 µs 263.27 µs] +//! short/64 time: [511.61 µs 514.44 µs 517.51 µs] +//! short/128 time: [992.64 µs 998.23 µs 1.0042 ms] +//! medium/1 time: [110.11 µs 110.50 µs 110.96 µs] +//! medium/2 time: [153.06 µs 153.85 µs 154.99 µs] +//! medium/4 time: [317.51 µs 319.92 µs 322.85 µs] +//! medium/8 time: [638.30 µs 644.68 µs 652.12 µs] +//! medium/16 time: [1.2651 ms 1.2773 ms 1.2914 ms] +//! medium/32 time: [2.5117 ms 2.5410 ms 2.5720 ms] +//! medium/64 time: [4.8088 ms 4.8555 ms 4.9047 ms] +//! medium/128 time: [8.8311 ms 8.9849 ms 9.1263 ms] //! ``` use bytes::{Buf, Bytes}; use criterion::{BenchmarkId, Criterion}; -use pageserver::{ - config::PageServerConf, - walrecord::NeonWalRecord, - walredo::{PostgresRedoManager, ProcessKind}, -}; +use pageserver::{config::PageServerConf, walrecord::NeonWalRecord, walredo::PostgresRedoManager}; use pageserver_api::{key::Key, shard::TenantShardId}; use std::{ sync::Arc, @@ -80,39 +60,32 @@ use tokio::{sync::Barrier, task::JoinSet}; use utils::{id::TenantId, lsn::Lsn}; fn bench(c: &mut Criterion) { - for process_kind in &[ProcessKind::Async, ProcessKind::Sync] { - { - let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; - for nclients in nclients { - let mut group = c.benchmark_group(format!("{process_kind}-short")); - group.bench_with_input( - BenchmarkId::from_parameter(nclients), - &nclients, - |b, nclients| { - let redo_work = Arc::new(Request::short_input()); - b.iter_custom(|iters| { - bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients) - }); - }, - ); - } + { + let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; + for nclients in nclients { + let mut group = c.benchmark_group("short"); + group.bench_with_input( + BenchmarkId::from_parameter(nclients), + &nclients, + |b, nclients| { + let redo_work = Arc::new(Request::short_input()); + b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients)); + }, + ); } - - { - let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; - for nclients in nclients { - let mut group = c.benchmark_group(format!("{process_kind}-medium")); - group.bench_with_input( - BenchmarkId::from_parameter(nclients), - &nclients, - |b, nclients| { - let redo_work = Arc::new(Request::medium_input()); - b.iter_custom(|iters| { - bench_impl(*process_kind, Arc::clone(&redo_work), iters, *nclients) - }); - }, - ); - } + } + { + let nclients = [1, 2, 4, 8, 16, 32, 64, 128]; + for nclients in nclients { + let mut group = c.benchmark_group("medium"); + group.bench_with_input( + BenchmarkId::from_parameter(nclients), + &nclients, + |b, nclients| { + let redo_work = 
Arc::new(Request::medium_input()); + b.iter_custom(|iters| bench_impl(Arc::clone(&redo_work), iters, *nclients)); + }, + ); } } } @@ -120,16 +93,10 @@ criterion::criterion_group!(benches, bench); criterion::criterion_main!(benches); // Returns the sum of each client's wall-clock time spent executing their share of the n_redos. -fn bench_impl( - process_kind: ProcessKind, - redo_work: Arc, - n_redos: u64, - nclients: u64, -) -> Duration { +fn bench_impl(redo_work: Arc, n_redos: u64, nclients: u64) -> Duration { let repo_dir = camino_tempfile::tempdir_in(env!("CARGO_TARGET_TMPDIR")).unwrap(); - let mut conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); - conf.walredo_process_kind = process_kind; + let conf = PageServerConf::dummy_conf(repo_dir.path().to_path_buf()); let conf = Box::leak(Box::new(conf)); let tenant_shard_id = TenantShardId::unsharded(TenantId::generate()); @@ -158,27 +125,13 @@ fn bench_impl( }); } - let elapsed = rt.block_on(async move { + rt.block_on(async move { let mut total_wallclock_time = Duration::ZERO; while let Some(res) = tasks.join_next().await { total_wallclock_time += res.unwrap(); } total_wallclock_time - }); - - // consistency check to ensure process kind setting worked - if nredos_per_client > 0 { - assert_eq!( - manager - .status() - .process - .map(|p| p.kind) - .expect("the benchmark work causes a walredo process to be spawned"), - std::borrow::Cow::Borrowed(process_kind.into()) - ); - } - - elapsed + }) } async fn client( diff --git a/pageserver/src/bin/pageserver.rs b/pageserver/src/bin/pageserver.rs index c0099aa7046f..a04195e12b98 100644 --- a/pageserver/src/bin/pageserver.rs +++ b/pageserver/src/bin/pageserver.rs @@ -284,7 +284,6 @@ fn start_pageserver( )) .unwrap(); pageserver::preinitialize_metrics(); - pageserver::metrics::wal_redo::set_process_kind_metric(conf.walredo_process_kind); // If any failpoints were set from FAILPOINTS environment variable, // print them to the log for debugging purposes diff --git a/pageserver/src/config.rs b/pageserver/src/config.rs index 258eed0b129d..b0afb6414be2 100644 --- a/pageserver/src/config.rs +++ b/pageserver/src/config.rs @@ -99,7 +99,7 @@ pub mod defaults { pub const DEFAULT_EPHEMERAL_BYTES_PER_MEMORY_KB: usize = 0; - pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "sync"; + pub const DEFAULT_WALREDO_PROCESS_KIND: &str = "async"; /// /// Default built-in configuration file. diff --git a/pageserver/src/metrics.rs b/pageserver/src/metrics.rs index b27bfb43b077..ffcd08b4b3ee 100644 --- a/pageserver/src/metrics.rs +++ b/pageserver/src/metrics.rs @@ -1999,29 +1999,6 @@ impl Default for WalRedoProcessCounters { pub(crate) static WAL_REDO_PROCESS_COUNTERS: Lazy = Lazy::new(WalRedoProcessCounters::default); -#[cfg(not(test))] -pub mod wal_redo { - use super::*; - - static PROCESS_KIND: Lazy> = Lazy::new(|| { - std::sync::Mutex::new( - register_uint_gauge_vec!( - "pageserver_wal_redo_process_kind", - "The configured process kind for walredo", - &["kind"], - ) - .unwrap(), - ) - }); - - pub fn set_process_kind_metric(kind: crate::walredo::ProcessKind) { - // use guard to avoid races around the next two steps - let guard = PROCESS_KIND.lock().unwrap(); - guard.reset(); - guard.with_label_values(&[&format!("{kind}")]).set(1); - } -} - /// Similar to `prometheus::HistogramTimer` but does not record on drop. 
pub(crate) struct StorageTimeMetricsTimer { metrics: StorageTimeMetrics, diff --git a/pageserver/src/walredo.rs b/pageserver/src/walredo.rs index 9776d4ce8882..3decea0c6de4 100644 --- a/pageserver/src/walredo.rs +++ b/pageserver/src/walredo.rs @@ -153,10 +153,7 @@ impl PostgresRedoManager { process: self .redo_process .get() - .map(|p| WalRedoManagerProcessStatus { - pid: p.id(), - kind: std::borrow::Cow::Borrowed(p.kind().into()), - }), + .map(|p| WalRedoManagerProcessStatus { pid: p.id() }), } } } diff --git a/pageserver/src/walredo/process.rs b/pageserver/src/walredo/process.rs index ad6b4e5fe96f..02c9c04bf185 100644 --- a/pageserver/src/walredo/process.rs +++ b/pageserver/src/walredo/process.rs @@ -1,7 +1,10 @@ +/// Layer of indirection previously used to support multiple implementations. +/// Subject to removal: use std::time::Duration; use bytes::Bytes; use pageserver_api::{reltag::RelTag, shard::TenantShardId}; +use tracing::warn; use utils::lsn::Lsn; use crate::{config::PageServerConf, walrecord::NeonWalRecord}; @@ -12,7 +15,6 @@ mod protocol; mod process_impl { pub(super) mod process_async; - pub(super) mod process_std; } #[derive( @@ -34,10 +36,7 @@ pub enum Kind { Async, } -pub(crate) enum Process { - Sync(process_impl::process_std::WalRedoProcess), - Async(process_impl::process_async::WalRedoProcess), -} +pub(crate) struct Process(process_impl::process_async::WalRedoProcess); impl Process { #[inline(always)] @@ -46,18 +45,17 @@ impl Process { tenant_shard_id: TenantShardId, pg_version: u32, ) -> anyhow::Result { - Ok(match conf.walredo_process_kind { - Kind::Sync => Self::Sync(process_impl::process_std::WalRedoProcess::launch( - conf, - tenant_shard_id, - pg_version, - )?), - Kind::Async => Self::Async(process_impl::process_async::WalRedoProcess::launch( - conf, - tenant_shard_id, - pg_version, - )?), - }) + if conf.walredo_process_kind != Kind::Async { + warn!( + configured = %conf.walredo_process_kind, + "the walredo_process_kind setting has been turned into a no-op, using async implementation" + ); + } + Ok(Self(process_impl::process_async::WalRedoProcess::launch( + conf, + tenant_shard_id, + pg_version, + )?)) } #[inline(always)] @@ -69,29 +67,12 @@ impl Process { records: &[(Lsn, NeonWalRecord)], wal_redo_timeout: Duration, ) -> anyhow::Result { - match self { - Process::Sync(p) => { - p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) - .await - } - Process::Async(p) => { - p.apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) - .await - } - } + self.0 + .apply_wal_records(rel, blknum, base_img, records, wal_redo_timeout) + .await } pub(crate) fn id(&self) -> u32 { - match self { - Process::Sync(p) => p.id(), - Process::Async(p) => p.id(), - } - } - - pub(crate) fn kind(&self) -> Kind { - match self { - Process::Sync(_) => Kind::Sync, - Process::Async(_) => Kind::Async, - } + self.0.id() } } diff --git a/pageserver/src/walredo/process/process_impl/process_std.rs b/pageserver/src/walredo/process/process_impl/process_std.rs deleted file mode 100644 index e7a6c263c9bb..000000000000 --- a/pageserver/src/walredo/process/process_impl/process_std.rs +++ /dev/null @@ -1,405 +0,0 @@ -use self::no_leak_child::NoLeakChild; -use crate::{ - config::PageServerConf, - metrics::{WalRedoKillCause, WAL_REDO_PROCESS_COUNTERS, WAL_REDO_RECORD_COUNTER}, - walrecord::NeonWalRecord, - walredo::process::{no_leak_child, protocol}, -}; -use anyhow::Context; -use bytes::Bytes; -use nix::poll::{PollFd, PollFlags}; -use pageserver_api::{reltag::RelTag, 
shard::TenantShardId}; -use postgres_ffi::BLCKSZ; -use std::os::fd::AsRawFd; -#[cfg(feature = "testing")] -use std::sync::atomic::AtomicUsize; -use std::{ - collections::VecDeque, - io::{Read, Write}, - process::{ChildStdin, ChildStdout, Command, Stdio}, - sync::{Mutex, MutexGuard}, - time::Duration, -}; -use tracing::{debug, error, instrument, Instrument}; -use utils::{lsn::Lsn, nonblock::set_nonblock}; - -pub struct WalRedoProcess { - #[allow(dead_code)] - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - // Some() on construction, only becomes None on Drop. - child: Option, - stdout: Mutex, - stdin: Mutex, - /// Counter to separate same sized walredo inputs failing at the same millisecond. - #[cfg(feature = "testing")] - dump_sequence: AtomicUsize, -} - -struct ProcessInput { - stdin: ChildStdin, - n_requests: usize, -} - -struct ProcessOutput { - stdout: ChildStdout, - pending_responses: VecDeque>, - n_processed_responses: usize, -} - -impl WalRedoProcess { - // - // Start postgres binary in special WAL redo mode. - // - #[instrument(skip_all,fields(pg_version=pg_version))] - pub(crate) fn launch( - conf: &'static PageServerConf, - tenant_shard_id: TenantShardId, - pg_version: u32, - ) -> anyhow::Result { - crate::span::debug_assert_current_span_has_tenant_id(); - - let pg_bin_dir_path = conf.pg_bin_dir(pg_version).context("pg_bin_dir")?; // TODO these should be infallible. - let pg_lib_dir_path = conf.pg_lib_dir(pg_version).context("pg_lib_dir")?; - - use no_leak_child::NoLeakChildCommandExt; - // Start postgres itself - let child = Command::new(pg_bin_dir_path.join("postgres")) - // the first arg must be --wal-redo so the child process enters into walredo mode - .arg("--wal-redo") - // the child doesn't process this arg, but, having it in the argv helps indentify the - // walredo process for a particular tenant when debugging a pagserver - .args(["--tenant-shard-id", &format!("{tenant_shard_id}")]) - .stdin(Stdio::piped()) - .stderr(Stdio::piped()) - .stdout(Stdio::piped()) - .env_clear() - .env("LD_LIBRARY_PATH", &pg_lib_dir_path) - .env("DYLD_LIBRARY_PATH", &pg_lib_dir_path) - // NB: The redo process is not trusted after we sent it the first - // walredo work. Before that, it is trusted. Specifically, we trust - // it to - // 1. close all file descriptors except stdin, stdout, stderr because - // pageserver might not be 100% diligent in setting FD_CLOEXEC on all - // the files it opens, and - // 2. to use seccomp to sandbox itself before processing the first - // walredo request. - .spawn_no_leak_child(tenant_shard_id) - .context("spawn process")?; - WAL_REDO_PROCESS_COUNTERS.started.inc(); - let mut child = scopeguard::guard(child, |child| { - error!("killing wal-redo-postgres process due to a problem during launch"); - child.kill_and_wait(WalRedoKillCause::Startup); - }); - - let stdin = child.stdin.take().unwrap(); - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - let stderr = tokio::process::ChildStderr::from_std(stderr) - .context("convert to tokio::ChildStderr")?; - macro_rules! 
set_nonblock_or_log_err { - ($file:ident) => {{ - let res = set_nonblock($file.as_raw_fd()); - if let Err(e) = &res { - error!(error = %e, file = stringify!($file), pid = child.id(), "set_nonblock failed"); - } - res - }}; - } - set_nonblock_or_log_err!(stdin)?; - set_nonblock_or_log_err!(stdout)?; - - // all fallible operations post-spawn are complete, so get rid of the guard - let child = scopeguard::ScopeGuard::into_inner(child); - - tokio::spawn( - async move { - scopeguard::defer! { - debug!("wal-redo-postgres stderr_logger_task finished"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_finished.inc(); - } - debug!("wal-redo-postgres stderr_logger_task started"); - crate::metrics::WAL_REDO_PROCESS_COUNTERS.active_stderr_logger_tasks_started.inc(); - - use tokio::io::AsyncBufReadExt; - let mut stderr_lines = tokio::io::BufReader::new(stderr); - let mut buf = Vec::new(); - let res = loop { - buf.clear(); - // TODO we don't trust the process to cap its stderr length. - // Currently it can do unbounded Vec allocation. - match stderr_lines.read_until(b'\n', &mut buf).await { - Ok(0) => break Ok(()), // eof - Ok(num_bytes) => { - let output = String::from_utf8_lossy(&buf[..num_bytes]); - error!(%output, "received output"); - } - Err(e) => { - break Err(e); - } - } - }; - match res { - Ok(()) => (), - Err(e) => { - error!(error=?e, "failed to read from walredo stderr"); - } - } - }.instrument(tracing::info_span!(parent: None, "wal-redo-postgres-stderr", pid = child.id(), tenant_id = %tenant_shard_id.tenant_id, shard_id = %tenant_shard_id.shard_slug(), %pg_version)) - ); - - Ok(Self { - conf, - tenant_shard_id, - child: Some(child), - stdin: Mutex::new(ProcessInput { - stdin, - n_requests: 0, - }), - stdout: Mutex::new(ProcessOutput { - stdout, - pending_responses: VecDeque::new(), - n_processed_responses: 0, - }), - #[cfg(feature = "testing")] - dump_sequence: AtomicUsize::default(), - }) - } - - pub(crate) fn id(&self) -> u32 { - self.child - .as_ref() - .expect("must not call this during Drop") - .id() - } - - // Apply given WAL records ('records') over an old page image. Returns - // new page image. - // - #[instrument(skip_all, fields(tenant_id=%self.tenant_shard_id.tenant_id, shard_id=%self.tenant_shard_id.shard_slug(), pid=%self.id()))] - pub(crate) async fn apply_wal_records( - &self, - rel: RelTag, - blknum: u32, - base_img: &Option, - records: &[(Lsn, NeonWalRecord)], - wal_redo_timeout: Duration, - ) -> anyhow::Result { - let tag = protocol::BufferTag { rel, blknum }; - let input = self.stdin.lock().unwrap(); - - // Serialize all the messages to send the WAL redo process first. - // - // This could be problematic if there are millions of records to replay, - // but in practice the number of records is usually so small that it doesn't - // matter, and it's better to keep this code simple. - // - // Most requests start with a before-image with BLCKSZ bytes, followed by - // by some other WAL records. Start with a buffer that can hold that - // comfortably. 
- let mut writebuf: Vec = Vec::with_capacity((BLCKSZ as usize) * 3); - protocol::build_begin_redo_for_block_msg(tag, &mut writebuf); - if let Some(img) = base_img { - protocol::build_push_page_msg(tag, img, &mut writebuf); - } - for (lsn, rec) in records.iter() { - if let NeonWalRecord::Postgres { - will_init: _, - rec: postgres_rec, - } = rec - { - protocol::build_apply_record_msg(*lsn, postgres_rec, &mut writebuf); - } else { - anyhow::bail!("tried to pass neon wal record to postgres WAL redo"); - } - } - protocol::build_get_page_msg(tag, &mut writebuf); - WAL_REDO_RECORD_COUNTER.inc_by(records.len() as u64); - - let res = self.apply_wal_records0(&writebuf, input, wal_redo_timeout); - - if res.is_err() { - // not all of these can be caused by this particular input, however these are so rare - // in tests so capture all. - self.record_and_log(&writebuf); - } - - res - } - - fn apply_wal_records0( - &self, - writebuf: &[u8], - input: MutexGuard, - wal_redo_timeout: Duration, - ) -> anyhow::Result { - let mut proc = { input }; // TODO: remove this legacy rename, but this keep the patch small. - let mut nwrite = 0usize; - - while nwrite < writebuf.len() { - let mut stdin_pollfds = [PollFd::new(&proc.stdin, PollFlags::POLLOUT)]; - let n = loop { - match nix::poll::poll(&mut stdin_pollfds[..], wal_redo_timeout.as_millis() as i32) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If 'stdin' is writeable, do write. - let in_revents = stdin_pollfds[0].revents().unwrap(); - if in_revents & (PollFlags::POLLERR | PollFlags::POLLOUT) != PollFlags::empty() { - nwrite += proc.stdin.write(&writebuf[nwrite..])?; - } - if in_revents.contains(PollFlags::POLLHUP) { - // We still have more data to write, but the process closed the pipe. - anyhow::bail!("WAL redo process closed its stdin unexpectedly"); - } - } - let request_no = proc.n_requests; - proc.n_requests += 1; - drop(proc); - - // To improve walredo performance we separate sending requests and receiving - // responses. Them are protected by different mutexes (output and input). - // If thread T1, T2, T3 send requests D1, D2, D3 to walredo process - // then there is not warranty that T1 will first granted output mutex lock. - // To address this issue we maintain number of sent requests, number of processed - // responses and ring buffer with pending responses. After sending response - // (under input mutex), threads remembers request number. Then it releases - // input mutex, locks output mutex and fetch in ring buffer all responses until - // its stored request number. The it takes correspondent element from - // pending responses ring buffer and truncate all empty elements from the front, - // advancing processed responses number. - - let mut output = self.stdout.lock().unwrap(); - let n_processed_responses = output.n_processed_responses; - while n_processed_responses + output.pending_responses.len() <= request_no { - // We expect the WAL redo process to respond with an 8k page image. We read it - // into this buffer. - let mut resultbuf = vec![0; BLCKSZ.into()]; - let mut nresult: usize = 0; // # of bytes read into 'resultbuf' so far - while nresult < BLCKSZ.into() { - let mut stdout_pollfds = [PollFd::new(&output.stdout, PollFlags::POLLIN)]; - // We do two things simultaneously: reading response from stdout - // and forward any logging information that the child writes to its stderr to the page server's log. 
- let n = loop { - match nix::poll::poll( - &mut stdout_pollfds[..], - wal_redo_timeout.as_millis() as i32, - ) { - Err(nix::errno::Errno::EINTR) => continue, - res => break res, - } - }?; - - if n == 0 { - anyhow::bail!("WAL redo timed out"); - } - - // If we have some data in stdout, read it to the result buffer. - let out_revents = stdout_pollfds[0].revents().unwrap(); - if out_revents & (PollFlags::POLLERR | PollFlags::POLLIN) != PollFlags::empty() { - nresult += output.stdout.read(&mut resultbuf[nresult..])?; - } - if out_revents.contains(PollFlags::POLLHUP) { - anyhow::bail!("WAL redo process closed its stdout unexpectedly"); - } - } - output - .pending_responses - .push_back(Some(Bytes::from(resultbuf))); - } - // Replace our request's response with None in `pending_responses`. - // Then make space in the ring buffer by clearing out any seqence of contiguous - // `None`'s from the front of `pending_responses`. - // NB: We can't pop_front() because other requests' responses because another - // requester might have grabbed the output mutex before us: - // T1: grab input mutex - // T1: send request_no 23 - // T1: release input mutex - // T2: grab input mutex - // T2: send request_no 24 - // T2: release input mutex - // T2: grab output mutex - // T2: n_processed_responses + output.pending_responses.len() <= request_no - // 23 0 24 - // T2: enters poll loop that reads stdout - // T2: put response for 23 into pending_responses - // T2: put response for 24 into pending_resposnes - // pending_responses now looks like this: Front Some(response_23) Some(response_24) Back - // T2: takes its response_24 - // pending_responses now looks like this: Front Some(response_23) None Back - // T2: does the while loop below - // pending_responses now looks like this: Front Some(response_23) None Back - // T2: releases output mutex - // T1: grabs output mutex - // T1: n_processed_responses + output.pending_responses.len() > request_no - // 23 2 23 - // T1: skips poll loop that reads stdout - // T1: takes its response_23 - // pending_responses now looks like this: Front None None Back - // T2: does the while loop below - // pending_responses now looks like this: Front Back - // n_processed_responses now has value 25 - let res = output.pending_responses[request_no - n_processed_responses] - .take() - .expect("we own this request_no, nobody else is supposed to take it"); - while let Some(front) = output.pending_responses.front() { - if front.is_none() { - output.pending_responses.pop_front(); - output.n_processed_responses += 1; - } else { - break; - } - } - Ok(res) - } - - #[cfg(feature = "testing")] - fn record_and_log(&self, writebuf: &[u8]) { - use std::sync::atomic::Ordering; - - let millis = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap() - .as_millis(); - - let seq = self.dump_sequence.fetch_add(1, Ordering::Relaxed); - - // these files will be collected to an allure report - let filename = format!("walredo-{millis}-{}-{seq}.walredo", writebuf.len()); - - let path = self.conf.tenant_path(&self.tenant_shard_id).join(&filename); - - let res = std::fs::OpenOptions::new() - .write(true) - .create_new(true) - .read(true) - .open(path) - .and_then(|mut f| f.write_all(writebuf)); - - // trip up allowed_errors - if let Err(e) = res { - tracing::error!(target=%filename, length=writebuf.len(), "failed to write out the walredo errored input: {e}"); - } else { - tracing::error!(filename, "erroring walredo input saved"); - } - } - - #[cfg(not(feature = "testing"))] - 
fn record_and_log(&self, _: &[u8]) {} -} - -impl Drop for WalRedoProcess { - fn drop(&mut self) { - self.child - .take() - .expect("we only do this once") - .kill_and_wait(WalRedoKillCause::WalRedoProcessDrop); - // no way to wait for stderr_logger_task from Drop because that is async only - } -} diff --git a/test_runner/regress/test_pageserver_config.py b/test_runner/regress/test_pageserver_config.py deleted file mode 100644 index c04348b4881c..000000000000 --- a/test_runner/regress/test_pageserver_config.py +++ /dev/null @@ -1,35 +0,0 @@ -import pytest -from fixtures.neon_fixtures import ( - NeonEnvBuilder, - last_flush_lsn_upload, -) - - -@pytest.mark.parametrize("kind", ["sync", "async"]) -def test_walredo_process_kind_config(neon_env_builder: NeonEnvBuilder, kind: str): - neon_env_builder.pageserver_config_override = f"walredo_process_kind = '{kind}'" - # ensure it starts - env = neon_env_builder.init_start() - # ensure the metric is set - ps_http = env.pageserver.http_client() - metrics = ps_http.get_metrics() - samples = metrics.query_all("pageserver_wal_redo_process_kind") - assert [(s.labels, s.value) for s in samples] == [({"kind": kind}, 1)] - # ensure default tenant's config kind matches - # => write some data to force-spawn walredo - ep = env.endpoints.create_start("main") - with ep.connect() as conn: - with conn.cursor() as cur: - cur.execute("create table foo(bar text)") - cur.execute("insert into foo select from generate_series(1, 100)") - last_flush_lsn_upload(env, ep, env.initial_tenant, env.initial_timeline) - ep.stop() - ep.start() - with ep.connect() as conn: - with conn.cursor() as cur: - cur.execute("select count(*) from foo") - [(count,)] = cur.fetchall() - assert count == 100 - - status = ps_http.tenant_status(env.initial_tenant) - assert status["walredo"]["process"]["kind"] == kind
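
To make the new fallback semantics concrete: after this patch, the configured `walredo_process_kind` is consulted only to decide whether to log a warning; the async implementation is used unconditionally. Below is a minimal, self-contained Rust sketch of that logic, not code from this patch: `Kind` is an illustrative stand-in for `pageserver::walredo::process::Kind`, and `eprintln!` stands in for the `tracing::warn!` call in `Process::launch`.

```rust
/// Illustrative stand-in for `pageserver::walredo::process::Kind`.
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Kind {
    Sync,
    Async,
}

/// Mirrors the post-patch logic in `Process::launch`: the configured kind
/// no longer selects an implementation; it only controls a warning.
fn effective_kind(configured: Kind) -> Kind {
    if configured != Kind::Async {
        // The real code emits this via `tracing::warn!` with the configured
        // value as a structured field; `eprintln!` keeps the sketch dependency-free.
        eprintln!(
            "walredo_process_kind = {configured:?} is now a no-op; using the async implementation"
        );
    }
    Kind::Async
}

fn main() {
    // A pageserver.toml that still contains `walredo_process_kind = "sync"`
    // starts up fine; every walredo process launch takes the async path.
    assert_eq!(effective_kind(Kind::Sync), Kind::Async);
    assert_eq!(effective_kind(Kind::Async), Kind::Async);
}
```

Keeping the knob parseable while ignoring its value is what makes the rollout rollback-safe: a config that still sets it is tolerated by both old and new binaries, so the config cleanup in step (2) of #7753 can ship independently, with no release needed in between.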