diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 7cb96d909458..33241dbdf76e 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -1,19 +1,18 @@ use crate::auth::{Claims, JwtAuth}; use crate::http::error::{api_error_handler, route_error_handler, ApiError}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use hyper::header::{HeaderName, AUTHORIZATION}; use hyper::http::HeaderValue; use hyper::Method; -use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server}; +use hyper::{header::CONTENT_TYPE, Body, Request, Response}; use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; use once_cell::sync::Lazy; use routerify::ext::RequestExt; -use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService}; +use routerify::{Middleware, RequestInfo, Router, RouterBuilder}; use tokio::task::JoinError; use tracing::{self, debug, info, info_span, warn, Instrument}; use std::future::Future; -use std::net::TcpListener; use std::str::FromStr; static SERVE_METRICS_COUNT: Lazy<IntCounter> = Lazy::new(|| { @@ -348,40 +347,6 @@ pub fn check_permission_with( } } -/// -/// Start listening for HTTP requests on given socket. -/// -/// 'shutdown_future' can be used to stop. If the Future becomes -/// ready, we stop listening for new requests, and the function returns. -/// -pub fn serve_thread_main<S>( - router_builder: RouterBuilder<hyper::Body, ApiError>, - listener: TcpListener, - shutdown_future: S, -) -> anyhow::Result<()> -where - S: Future<Output = ()> + Send + Sync, -{ - info!("Starting an HTTP endpoint at {}", listener.local_addr()?); - - // Create a Service from the router above to handle incoming requests. - let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap(); - - // Enter a single-threaded tokio runtime bound to the current thread - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let _guard = runtime.enter(); - - let server = Server::from_tcp(listener)?
- .serve(service) - .with_graceful_shutdown(shutdown_future); - - runtime.block_on(server)?; - - Ok(()) -} #[cfg(test)] mod tests { use super::*; diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index fecbb8bd414d..0625538bf316 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -3,15 +3,19 @@ // use anyhow::{bail, Context, Result}; use clap::Parser; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; use remote_storage::RemoteStorageConfig; +use tokio::runtime::Handle; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::task::JoinError; use toml_edit::Document; -use utils::signals::ShutdownSignals; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::thread; use std::time::Duration; use storage_broker::Uri; use tokio::sync::mpsc; @@ -20,22 +24,21 @@ use tracing::*; use utils::pid_file; use metrics::set_build_info_metric; -use safekeeper::broker; -use safekeeper::control_file; use safekeeper::defaults::{ DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PG_LISTEN_ADDR, }; -use safekeeper::http; -use safekeeper::remove_wal; -use safekeeper::wal_backup; use safekeeper::wal_service; use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; +use safekeeper::{broker, WAL_SERVICE_RUNTIME}; +use safekeeper::{control_file, BROKER_RUNTIME}; +use safekeeper::{http, WAL_REMOVER_RUNTIME}; +use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME}; +use safekeeper::{wal_backup, HTTP_RUNTIME}; use storage_broker::DEFAULT_ENDPOINT; use utils::auth::JwtAuth; use utils::{ - http::endpoint, id::NodeId, logging::{self, LogFormat}, project_git_version, @@ -104,10 +107,6 @@ struct Args { /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)] max_offloader_lag: u64, - /// Number of threads for wal backup runtime, by default number of cores - /// available to the system. - #[arg(long)] - wal_backup_threads: Option, /// Number of max parallel WAL segments to be offloaded to remote storage. #[arg(long, default_value = "5")] wal_backup_parallel_jobs: usize, @@ -121,9 +120,14 @@ struct Args { /// Format for logging, either 'plain' or 'json'. #[arg(long, default_value = "plain")] log_format: String, + /// Run everything in single threaded current thread runtime, might be + /// useful for debugging. 
+ #[arg(long)] + current_thread_runtime: bool, } -fn main() -> anyhow::Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { let args = Args::parse(); if let Some(addr) = args.dump_control_file { @@ -183,10 +187,10 @@ fn main() -> anyhow::Result<()> { heartbeat_timeout: args.heartbeat_timeout, remote_storage: args.remote_storage, max_offloader_lag_bytes: args.max_offloader_lag, - backup_runtime_threads: args.wal_backup_threads, wal_backup_enabled: !args.disable_wal_backup, backup_parallel_jobs: args.wal_backup_parallel_jobs, auth, + current_thread_runtime: args.current_thread_runtime, }; // initialize sentry if SENTRY_DSN is provided @@ -194,10 +198,14 @@ fn main() -> anyhow::Result<()> { Some(GIT_VERSION.into()), &[("node_id", &conf.my_id.to_string())], ); - start_safekeeper(conf) + start_safekeeper(conf).await } -fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { +/// Result of joining any of main tasks: upper error means task failed to +/// complete, e.g. panicked, inner is error produced by task itself. +type JoinTaskRes = Result<anyhow::Result<()>, JoinError>; + +async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // Prevent running multiple safekeepers on the same directory let lock_file_path = conf.workdir.join(PID_FILE_NAME); let lock_file = @@ -208,14 +216,18 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // we need to release the lock file only when the current process is gone std::mem::forget(lock_file); - let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { - error!("failed to bind to address {}: {}", conf.listen_http_addr, e); + info!("starting safekeeper WAL service on {}", conf.listen_pg_addr); + let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); e })?; - info!("starting safekeeper on {}", conf.listen_pg_addr); - let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| { - error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); + info!( + "starting safekeeper HTTP service on {}", + conf.listen_http_addr + ); + let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_http_addr, e); e })?; @@ -224,71 +236,88 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let timeline_collector = safekeeper::metrics::TimelineCollector::new(); metrics::register_internal(Box::new(timeline_collector))?; - let mut threads = vec![]; let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); // Load all timelines from disk to memory. GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; - let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(|| { - let router = http::make_router(conf_); - endpoint::serve_thread_main( - router, - http_listener, - std::future::pending(), // never shut down - ) - .unwrap(); - })?, - ); + // Keep handles to main tasks to die if any of them disappears. + let mut tasks_handles: FuturesUnordered<BoxFuture<(String, JoinTaskRes)>> = + FuturesUnordered::new(); - let conf_cloned = conf.clone(); - let safekeeper_thread = thread::Builder::new() - .name("WAL service thread".into()) - .spawn(|| wal_service::thread_main(conf_cloned, pg_listener)) - .unwrap(); + let conf_ = conf.clone(); + // Run everything in current thread rt, if asked.
+ if conf.current_thread_runtime { + info!("running in current thread runtime"); + } + let current_thread_rt = conf + .current_thread_runtime + .then(|| Handle::try_current().expect("no runtime in main")); + let wal_service_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) + .spawn(wal_service::task_main(conf_, pg_listener)) + // wrap with task name for error reporting + .map(|res| ("WAL service main".to_owned(), res)); + tasks_handles.push(Box::pin(wal_service_handle)); - threads.push(safekeeper_thread); + let conf_ = conf.clone(); + let http_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| HTTP_RUNTIME.handle()) + .spawn(http::task_main(conf_, http_listener)) + .map(|res| ("HTTP service main".to_owned(), res)); + tasks_handles.push(Box::pin(http_handle)); let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("broker thread".into()) - .spawn(|| { - broker::thread_main(conf_); - })?, - ); + let broker_task_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| BROKER_RUNTIME.handle()) + .spawn(broker::task_main(conf_).instrument(info_span!("broker"))) + .map(|res| ("broker main".to_owned(), res)); + tasks_handles.push(Box::pin(broker_task_handle)); let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("WAL removal thread".into()) - .spawn(|| { - remove_wal::thread_main(conf_); - })?, - ); + let wal_remover_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle()) + .spawn(remove_wal::task_main(conf_)) + .map(|res| ("WAL remover".to_owned(), res)); + tasks_handles.push(Box::pin(wal_remover_handle)); - threads.push( - thread::Builder::new() - .name("WAL backup launcher thread".into()) - .spawn(move || { - wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx); - })?, - ); + let conf_ = conf.clone(); + let wal_backup_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle()) + .spawn(wal_backup::wal_backup_launcher_task_main( + conf_, + wal_backup_launcher_rx, + )) + .map(|res| ("WAL backup launcher".to_owned(), res)); + tasks_handles.push(Box::pin(wal_backup_handle)); set_build_info_metric(GIT_VERSION); - // TODO: put more thoughts into handling of failed threads - // We should catch & die if they are in trouble. - - // On any shutdown signal, log receival and exit. Additionally, handling - // SIGQUIT prevents coredump. - ShutdownSignals::handle(|signal| { - info!("received {}, terminating", signal.name()); - std::process::exit(0); - }) + + // TODO: update tokio-stream, convert to real async Stream with + // SignalStream, map it to obtain missing signal name, combine streams into + // single stream we can easily sit on. + let mut sigquit_stream = signal(SignalKind::quit())?; + let mut sigint_stream = signal(SignalKind::interrupt())?; + let mut sigterm_stream = signal(SignalKind::terminate())?; + + tokio::select! { + Some((task_name, res)) = tasks_handles.next()=> { + error!("{} task failed: {:?}, exiting", task_name, res); + std::process::exit(1); + } + // On any shutdown signal, log receival and exit. Additionally, handling + // SIGQUIT prevents coredump. + _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"), + _ = sigint_stream.recv() => info!("received SIGINT, terminating"), + _ = sigterm_stream.recv() => info!("received SIGTERM, terminating") + + }; + std::process::exit(0); } /// Determine safekeeper id. 
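The supervision pattern introduced above — spawn each service, tag its JoinHandle with a task name, collect the handles in a FuturesUnordered, and race the first completion against unix signal streams — is easier to see outside the diff. A minimal, self-contained sketch of the same idea (illustrative only, not part of this patch; the task body and name are placeholders):

    use futures::stream::{FuturesUnordered, StreamExt};
    use futures::FutureExt;
    use tokio::signal::unix::{signal, SignalKind};
    use tokio::task::JoinError;

    // Outer error: the task panicked or was cancelled; inner: the task's own result.
    type JoinTaskRes = Result<anyhow::Result<()>, JoinError>;

    #[tokio::main(flavor = "current_thread")]
    async fn main() -> anyhow::Result<()> {
        let mut tasks: FuturesUnordered<_> = FuturesUnordered::new();
        // Tag the handle with a name so the select! arm can report which task died.
        tasks.push(
            tokio::spawn(async { anyhow::Ok(()) })
                .map(|res: JoinTaskRes| ("demo task".to_owned(), res)),
        );

        let mut sigterm = signal(SignalKind::terminate())?;

        tokio::select! {
            // Any main task finishing, even successfully, is fatal for the process.
            Some((name, res)) = tasks.next() => {
                eprintln!("{name} task finished: {res:?}, exiting");
                std::process::exit(1);
            }
            _ = sigterm.recv() => eprintln!("received SIGTERM, terminating"),
        }
        std::process::exit(0);
    }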
diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 3282afc72d75..2b1db2714b10 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -16,7 +16,7 @@ use storage_broker::Request; use std::time::Duration; use std::time::Instant; use tokio::task::JoinHandle; -use tokio::{runtime, time::sleep}; +use tokio::time::sleep; use tracing::*; use crate::metrics::BROKER_ITERATION_TIMELINES; @@ -29,20 +29,6 @@ use crate::SafeKeeperConf; const RETRY_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000; -pub fn thread_main(conf: SafeKeeperConf) { - let runtime = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let _enter = info_span!("broker").entered(); - info!("started, broker endpoint {:?}", conf.broker_endpoint); - - runtime.block_on(async { - main_loop(conf).await; - }); -} - /// Push once in a while data about all active timelines to the broker. async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let mut client = @@ -56,20 +42,27 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // sensitive and there is no risk of deadlock as we don't await while // lock is held. let now = Instant::now(); - let mut active_tlis = GlobalTimelines::get_all(); - active_tlis.retain(|tli| tli.is_active()); - for tli in &active_tlis { - let sk_info = tli.get_safekeeper_info(&conf); + let all_tlis = GlobalTimelines::get_all(); + let mut n_pushed_tlis = 0; + for tli in &all_tlis { + // filtering alternative futures::stream::iter(all_tlis) + // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::>().await; + // doesn't look better, and I'm not sure how to do that without collect. + if !tli.is_active().await { + continue; + } + let sk_info = tli.get_safekeeper_info(&conf).await; yield sk_info; BROKER_PUSHED_UPDATES.inc(); + n_pushed_tlis += 1; } let elapsed = now.elapsed(); BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64()); - BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64); + BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64); if elapsed > push_interval / 2 { - info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed); + info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed); } sleep(push_interval).await; @@ -126,10 +119,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { bail!("end of stream"); } -async fn main_loop(conf: SafeKeeperConf) { +pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { + info!("started, broker endpoint {:?}", conf.broker_endpoint); + let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC)); let mut push_handle: Option>> = None; let mut pull_handle: Option>> = None; + // Selecting on JoinHandles requires some squats; is there a better way to // reap tasks individually? 
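The "squats" mentioned in the comment above boil down to: keep the handle in an Option, gate the select! branch with an if guard, and await &mut JoinHandle inside a small async block. A minimal sketch of that shape (illustrative only, not part of this patch; the spawned task body is a placeholder):

    use std::time::Duration;
    use tokio::task::JoinHandle;

    #[tokio::main(flavor = "current_thread")]
    async fn main() {
        let mut ticker = tokio::time::interval(Duration::from_millis(100));
        let mut handle: Option<JoinHandle<anyhow::Result<()>>> = None;

        loop {
            tokio::select! {
                _ = ticker.tick() => {
                    // (Re)spawn the worker if it is not currently running.
                    if handle.is_none() {
                        handle = Some(tokio::spawn(async { anyhow::Ok(()) }));
                    }
                }
                // The guard keeps this branch disabled while there is no task, so the
                // unwrap inside the async block never sees None.
                res = async { handle.as_mut().unwrap().await }, if handle.is_some() => {
                    println!("worker exited: {res:?}");
                    handle = None;
                    break;
                }
            }
        }
    }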
diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index b1b0c032d7c6..6c4ad24323d2 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -2,9 +2,10 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use tokio::fs::{self, File}; +use tokio::io::AsyncWriteExt; -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Write}; +use std::io::Read; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::time::Instant; @@ -26,9 +27,10 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::<u32>(); /// Storage should keep actual state inside of it. It should implement Deref /// trait to access state fields and have persist method for updating that state. +#[async_trait::async_trait] pub trait Storage: Deref<Target = SafeKeeperState> { /// Persist safekeeper state on disk and update internal state. - fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; /// Timestamp of last persist. fn last_persist_at(&self) -> Instant; @@ -82,7 +84,7 @@ impl FileStorage { /// Check the magic/version in the on-disk data and deserialize it, if possible. fn deser_sk_state(buf: &mut &[u8]) -> Result<SafeKeeperState> { // Read the version independent part - let magic = buf.read_u32::<LittleEndian>()?; + let magic = ReadBytesExt::read_u32::<LittleEndian>(buf)?; if magic != SK_MAGIC { bail!( "bad control file magic: {:X}, expected {:X}", @@ -90,7 +92,7 @@ impl FileStorage { SK_MAGIC ); } - let version = buf.read_u32::<LittleEndian>()?; + let version = ReadBytesExt::read_u32::<LittleEndian>(buf)?; if version == SK_FORMAT_VERSION { let res = SafeKeeperState::des(buf)?; return Ok(res); @@ -110,7 +112,7 @@ impl FileStorage { /// Read in the control file. pub fn load_control_file<P: AsRef<Path>>(control_file_path: P) -> Result<SafeKeeperState> { - let mut control_file = OpenOptions::new() + let mut control_file = std::fs::OpenOptions::new() .read(true) .write(true) .open(&control_file_path) @@ -159,30 +161,31 @@ impl Deref for FileStorage { } } +#[async_trait::async_trait] impl Storage for FileStorage { /// persists state durably to underlying storage /// for description see https://lwn.net/Articles/457667/ - fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer(); // write data to safekeeper.control.partial let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); - let mut control_partial = File::create(&control_partial_path).with_context(|| { + let mut control_partial = File::create(&control_partial_path).await.with_context(|| { format!( "failed to create partial control file at: {}", &control_partial_path.display() ) })?; let mut buf: Vec<u8> = Vec::new(); - buf.write_u32::<LittleEndian>(SK_MAGIC)?; - buf.write_u32::<LittleEndian>(SK_FORMAT_VERSION)?; + WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_MAGIC)?; + WriteBytesExt::write_u32::<LittleEndian>(&mut buf, SK_FORMAT_VERSION)?; s.ser_into(&mut buf)?; // calculate checksum before resize let checksum = crc32c::crc32c(&buf); buf.extend_from_slice(&checksum.to_le_bytes()); - control_partial.write_all(&buf).with_context(|| { + control_partial.write_all(&buf).await.with_context(|| { format!( "failed to write safekeeper state into control file at: {}", control_partial_path.display() @@ -191,7 +194,7 @@ impl Storage for FileStorage { // fsync the file if !self.conf.no_sync { - control_partial.sync_all().with_context(|| { + control_partial.sync_all().await.with_context(|| { format!( "failed to sync partial control file at {}",
control_partial_path.display() @@ -202,21 +205,22 @@ impl Storage for FileStorage { let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); // rename should be atomic - fs::rename(&control_partial_path, &control_path)?; + fs::rename(&control_partial_path, &control_path).await?; // this sync is not required by any standard but postgres does this (see durable_rename) if !self.conf.no_sync { - File::open(&control_path) - .and_then(|f| f.sync_all()) - .with_context(|| { - format!( - "failed to sync control file at: {}", - &control_path.display() - ) - })?; + let new_f = File::open(&control_path).await?; + new_f.sync_all().await.with_context(|| { + format!( + "failed to sync control file at: {}", + &control_path.display() + ) + })?; // fsync the directory (linux specific) - File::open(&self.timeline_dir) - .and_then(|f| f.sync_all()) + let tli_dir = File::open(&self.timeline_dir).await?; + tli_dir + .sync_all() + .await .context("failed to sync control file directory")?; } @@ -236,7 +240,6 @@ mod test { use super::*; use crate::{safekeeper::SafeKeeperState, SafeKeeperConf}; use anyhow::Result; - use std::fs; use utils::{id::TenantTimelineId, lsn::Lsn}; fn stub_conf() -> SafeKeeperConf { @@ -247,59 +250,75 @@ mod test { } } - fn load_from_control_file( + async fn load_from_control_file( conf: &SafeKeeperConf, ttid: &TenantTimelineId, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir"); + fs::create_dir_all(conf.timeline_dir(ttid)) + .await + .expect("failed to create timeline dir"); Ok(( FileStorage::restore_new(ttid, conf)?, FileStorage::load_control_file_conf(conf, ttid)?, )) } - fn create( + async fn create( conf: &SafeKeeperConf, ttid: &TenantTimelineId, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir"); + fs::create_dir_all(conf.timeline_dir(ttid)) + .await + .expect("failed to create timeline dir"); let state = SafeKeeperState::empty(); let storage = FileStorage::create_new(ttid, conf, state.clone())?; Ok((storage, state)) } - #[test] - fn test_read_write_safekeeper_state() { + #[tokio::test] + async fn test_read_write_safekeeper_state() { let conf = stub_conf(); let ttid = TenantTimelineId::generate(); { - let (mut storage, mut state) = create(&conf, &ttid).expect("failed to create state"); + let (mut storage, mut state) = + create(&conf, &ttid).await.expect("failed to create state"); // change something state.commit_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); + storage + .persist(&state) + .await + .expect("failed to persist state"); } - let (_, state) = load_from_control_file(&conf, &ttid).expect("failed to read state"); + let (_, state) = load_from_control_file(&conf, &ttid) + .await + .expect("failed to read state"); assert_eq!(state.commit_lsn, Lsn(42)); } - #[test] - fn test_safekeeper_state_checksum_mismatch() { + #[tokio::test] + async fn test_safekeeper_state_checksum_mismatch() { let conf = stub_conf(); let ttid = TenantTimelineId::generate(); { - let (mut storage, mut state) = create(&conf, &ttid).expect("failed to read state"); + let (mut storage, mut state) = + create(&conf, &ttid).await.expect("failed to read state"); // change something state.commit_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); + storage + .persist(&state) + .await + .expect("failed to persist state"); } let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME); - let mut 
data = fs::read(&control_path).unwrap(); + let mut data = fs::read(&control_path).await.unwrap(); data[0] += 1; // change the first byte of the file to fail checksum validation - fs::write(&control_path, &data).expect("failed to write control file"); + fs::write(&control_path, &data) + .await + .expect("failed to write control file"); - match load_from_control_file(&conf, &ttid) { + match load_from_control_file(&conf, &ttid).await { Err(err) => assert!(err .to_string() .contains("safekeeper control file checksum mismatch")), diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index f711c4429d7f..387b577a1370 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -121,7 +121,7 @@ pub struct FileInfo { } /// Build debug dump response, using the provided [`Args`] filters. -pub fn build(args: Args) -> Result { +pub async fn build(args: Args) -> Result { let start_time = Utc::now(); let timelines_count = GlobalTimelines::timelines_count(); @@ -155,7 +155,7 @@ pub fn build(args: Args) -> Result { } let control_file = if args.dump_control_file { - let mut state = tli.get_state().1; + let mut state = tli.get_state().await.1; if !args.dump_term_history { state.acceptor_state.term_history = TermHistory(vec![]); } @@ -165,7 +165,7 @@ pub fn build(args: Args) -> Result { }; let memory = if args.dump_memory { - Some(tli.memory_dump()) + Some(tli.memory_dump().await) } else { None }; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 7d25ced4493c..1367d5eebbc3 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -256,14 +256,14 @@ impl SafekeeperPostgresHandler { let lsn = if self.is_walproposer_recovery() { // walproposer should get all local WAL until flush_lsn - tli.get_flush_lsn() + tli.get_flush_lsn().await } else { // other clients shouldn't get any uncommitted WAL - tli.get_state().0.commit_lsn + tli.get_state().await.0.commit_lsn } .to_string(); - let sysid = tli.get_state().1.server.system_id.to_string(); + let sysid = tli.get_state().await.1.server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); let tli = PG_TLI.to_string(); let tli_bytes = tli.as_bytes(); diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 1831470007f6..2a9570595f8a 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -2,3 +2,18 @@ pub mod routes; pub use routes::make_router; pub use safekeeper_api::models; + +use crate::SafeKeeperConf; + +pub async fn task_main( + conf: SafeKeeperConf, + http_listener: std::net::TcpListener, +) -> anyhow::Result<()> { + let router = make_router(conf) + .build() + .map_err(|err| anyhow::anyhow!(err))?; + let service = utils::http::RouterService::new(router).unwrap(); + let server = hyper::Server::from_tcp(http_listener)?; + server.serve(service).await?; + Ok(()) // unreachable +} diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index a498d868afc8..b26da55be511 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -13,7 +13,6 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use tokio::fs::File; use tokio::io::AsyncReadExt; -use tokio::task::JoinError; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; @@ -116,8 +115,8 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result timeline_id, }; - let resp = tokio::task::spawn_blocking(move || { - 
debug_dump::build(args).map_err(ApiError::InternalServerError) - }) - .await - .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??; + let resp = debug_dump::build(args) + .await + .map_err(ApiError::InternalServerError)?; // TODO: use streaming response json_response(StatusCode::OK, resp) diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index dc9188723e4e..14d0cc3653fe 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -73,12 +73,12 @@ pub async fn handle_json_ctrl( // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { - send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?; + send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?; } - let inserted_wal = append_logical_message(&tli, append_request)?; + let inserted_wal = append_logical_message(&tli, append_request).await?; let response = AppendResult { - state: tli.get_state().1, + state: tli.get_state().await.1, inserted_wal, }; let response_data = serde_json::to_vec(&response) @@ -114,9 +114,9 @@ async fn prepare_safekeeper( .await } -fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::Result<()> { +async fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::Result<()> { // add new term to existing history - let history = tli.get_state().1.acceptor_state.term_history; + let history = tli.get_state().await.1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; history_entries.push(TermSwitchEntry { term, lsn }); @@ -129,7 +129,7 @@ fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::R timeline_start_lsn: lsn, }); - tli.process_msg(&proposer_elected_request)?; + tli.process_msg(&proposer_elected_request).await?; Ok(()) } @@ -142,12 +142,12 @@ pub struct InsertedWAL { /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. 
-pub fn append_logical_message( +pub async fn append_logical_message( tli: &Arc, msg: &AppendLogicalMessage, ) -> anyhow::Result { let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); - let sk_state = tli.get_state().1; + let sk_state = tli.get_state().await.1; let begin_lsn = msg.begin_lsn; let end_lsn = begin_lsn + wal_data.len() as u64; @@ -171,7 +171,7 @@ pub fn append_logical_message( wal_data: Bytes::from(wal_data), }); - let response = tli.process_msg(&append_request)?; + let response = tli.process_msg(&append_request).await?; let append_response = match response { Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 22d6d57e19b6..b8e110136948 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,4 +1,6 @@ +use once_cell::sync::Lazy; use remote_storage::RemoteStorageConfig; +use tokio::runtime::Runtime; use std::path::PathBuf; use std::time::Duration; @@ -36,7 +38,6 @@ pub mod defaults { DEFAULT_PG_LISTEN_PORT, }; - pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms"; pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20); } @@ -60,10 +61,10 @@ pub struct SafeKeeperConf { pub heartbeat_timeout: Duration, pub remote_storage: Option, pub max_offloader_lag_bytes: u64, - pub backup_runtime_threads: Option, pub backup_parallel_jobs: usize, pub wal_backup_enabled: bool, pub auth: Option>, + pub current_thread_runtime: bool, } impl SafeKeeperConf { @@ -92,12 +93,64 @@ impl SafeKeeperConf { .parse() .expect("failed to parse default broker endpoint"), broker_keepalive_interval: Duration::from_secs(5), - backup_runtime_threads: None, wal_backup_enabled: true, backup_parallel_jobs: 1, auth: None, heartbeat_timeout: Duration::new(5, 0), max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES, + current_thread_runtime: false, } } } + +// Tokio runtimes. 
+pub static WAL_SERVICE_RUNTIME: Lazy<Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("WAL service worker") + .enable_all() + .build() + .expect("Failed to create WAL service runtime") +}); + +pub static HTTP_RUNTIME: Lazy<Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("HTTP worker") + .enable_all() + .build() + .expect("Failed to create WAL service runtime") +}); + +pub static BROKER_RUNTIME: Lazy<Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("broker worker") + .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense + .enable_all() + .build() + .expect("Failed to create broker runtime") +}); + +pub static WAL_REMOVER_RUNTIME: Lazy<Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("WAL remover") + .worker_threads(1) + .enable_all() + .build() + .expect("Failed to create broker runtime") +}); + +pub static WAL_BACKUP_RUNTIME: Lazy<Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("WAL backup worker") + .enable_all() + .build() + .expect("Failed to create WAL backup runtime") +}); + +pub static METRICS_SHIFTER_RUNTIME: Lazy<Runtime> = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("metric shifter") + .worker_threads(1) + .enable_all() + .build() + .expect("Failed to create broker runtime") +}); diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 235a88501d20..0711beb2905d 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -7,6 +7,7 @@ use std::{ use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS}; use anyhow::Result; +use futures::Future; use metrics::{ core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, proto::MetricFamily, @@ -292,14 +293,17 @@ impl WalStorageMetrics { } } -/// Accepts a closure that returns a result, and returns the duration of the closure. -pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result<f64> { +/// Accepts async function that returns empty anyhow result, and returns the duration of its execution. +pub async fn time_io_closure<E: Into<anyhow::Error>>( + closure: impl Future<Output = Result<(), E>>, +) -> Result<f64> { let start = std::time::Instant::now(); - closure()?; + closure.await.map_err(|e| e.into())?; Ok(start.elapsed().as_secs_f64()) } /// Metrics for a single timeline. +#[derive(Clone)] pub struct FullTimelineInfo { pub ttid: TenantTimelineId, pub ps_feedback: PageserverFeedback, @@ -575,13 +579,19 @@ impl Collector for TimelineCollector { let timelines = GlobalTimelines::get_all(); let timelines_count = timelines.len(); - for arc_tli in timelines { - let tli = arc_tli.info_for_metrics(); - if tli.is_none() { - continue; - } - let tli = tli.unwrap(); - + // Prometheus Collector is sync, and data is stored under async lock. To + // bridge the gap with a crutch, collect data in spawned thread with + // local tokio runtime.
+ let infos = std::thread::spawn(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to create rt"); + rt.block_on(collect_timeline_metrics()) + }) + .join() + .expect("collect_timeline_metrics thread panicked"); + + for tli in &infos { let tenant_id = tli.ttid.tenant_id.to_string(); let timeline_id = tli.ttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; @@ -682,3 +692,15 @@ impl Collector for TimelineCollector { mfs } } + +async fn collect_timeline_metrics() -> Vec { + let mut res = vec![]; + let timelines = GlobalTimelines::get_all(); + + for tli in timelines { + if let Some(info) = tli.info_for_metrics().await { + res.push(info); + } + } + res +} diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 344b760fd3cb..61ba37efaa22 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -231,7 +231,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result info!( "Loaded timeline {}, flush_lsn={}", ttid, - tli.get_flush_lsn() + tli.get_flush_lsn().await ); Ok(Response { diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 195470e3ca91..a5e99c5f0a30 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -18,15 +18,14 @@ use postgres_backend::QueryError; use pq_proto::BeMessage; use std::net::SocketAddr; use std::sync::Arc; -use std::thread; -use std::thread::JoinHandle; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; -use tokio::task::spawn_blocking; +use tokio::task; +use tokio::task::JoinHandle; use tokio::time::Duration; use tokio::time::Instant; use tracing::*; @@ -97,7 +96,7 @@ impl SafekeeperPostgresHandler { Err(res.expect_err("no error with WalAcceptor not spawn")) } Some(handle) => { - let wal_acceptor_res = handle.join(); + let wal_acceptor_res = handle.await; // If there was any network error, return it. 
res?; @@ -107,7 +106,7 @@ impl SafekeeperPostgresHandler { Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))), Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!( - "WalAcceptor thread panicked", + "WalAcceptor task panicked", ))), } } @@ -154,10 +153,12 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { } }; - *self.acceptor_handle = Some( - WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, self.conn_id) - .context("spawn WalAcceptor thread")?, - ); + *self.acceptor_handle = Some(WalAcceptor::spawn( + tli.clone(), + msg_rx, + reply_tx, + self.conn_id, + )); // Forward all messages to WalAcceptor read_network_loop(self.pgb_reader, msg_tx, next_msg).await @@ -226,28 +227,19 @@ impl WalAcceptor { msg_rx: Receiver, reply_tx: Sender, conn_id: ConnectionId, - ) -> anyhow::Result>> { - let thread_name = format!("WAL acceptor {}", tli.ttid); - thread::Builder::new() - .name(thread_name) - .spawn(move || -> anyhow::Result<()> { - let mut wa = WalAcceptor { - tli, - msg_rx, - reply_tx, - }; + ) -> JoinHandle> { + task::spawn(async move { + let mut wa = WalAcceptor { + tli, + msg_rx, + reply_tx, + }; - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let span_ttid = wa.tli.ttid; // satisfy borrow checker - runtime.block_on( - wa.run() - .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)), - ) - }) - .map_err(anyhow::Error::from) + let span_ttid = wa.tli.ttid; // satisfy borrow checker + wa.run() + .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)) + .await + }) } /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed; @@ -281,7 +273,7 @@ impl WalAcceptor { while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg { let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); - if let Some(reply) = self.tli.process_msg(&noflush_msg)? { + if let Some(reply) = self.tli.process_msg(&noflush_msg).await? { if self.reply_tx.send(reply).await.is_err() { return Ok(()); // chan closed, streaming terminated } @@ -300,10 +292,12 @@ impl WalAcceptor { } // flush all written WAL to the disk - self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? + self.tli + .process_msg(&ProposerAcceptorMessage::FlushWAL) + .await? } else { // process message other than AppendRequest - self.tli.process_msg(&next_msg)? + self.tli.process_msg(&next_msg).await? }; if let Some(reply) = reply_msg { @@ -326,8 +320,8 @@ impl Drop for ComputeConnectionGuard { let tli = self.timeline.clone(); // tokio forbids to call blocking_send inside the runtime, and see // comments in on_compute_disconnect why we call blocking_send. - spawn_blocking(move || { - if let Err(e) = tli.on_compute_disconnect() { + tokio::spawn(async move { + if let Err(e) = tli.on_compute_disconnect().await { error!("failed to unregister compute connection: {}", e); } }); diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index ad9d655fae02..3306f0b63a39 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -1,29 +1,36 @@ //! Thread removing old WAL. 
-use std::{thread, time::Duration}; +use std::time::Duration; +use tokio::time::sleep; use tracing::*; use crate::{GlobalTimelines, SafeKeeperConf}; -pub fn thread_main(conf: SafeKeeperConf) { +pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { let wal_removal_interval = Duration::from_millis(5000); loop { let tlis = GlobalTimelines::get_all(); for tli in &tlis { - if !tli.is_active() { + if !tli.is_active().await { continue; } let ttid = tli.ttid; - let _enter = - info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); - if let Err(e) = tli.maybe_pesist_control_file() { + if let Err(e) = tli + .maybe_persist_control_file() + .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id)) + .await + { warn!("failed to persist control file: {e}"); } - if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { - warn!("failed to remove WAL: {}", e); + if let Err(e) = tli + .remove_old_wal(conf.wal_backup_enabled) + .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id)) + .await + { + error!("failed to remove WAL: {}", e); } } - thread::sleep(wal_removal_interval) + sleep(wal_removal_interval).await; } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7378ccb994f5..d0b14a1282ef 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -568,25 +568,27 @@ where /// Process message from proposer and possibly form reply. Concurrent /// callers must exclude each other. - pub fn process_msg( + pub async fn process_msg( &mut self, msg: &ProposerAcceptorMessage, ) -> Result> { match msg { - ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), - ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), - ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg), - ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true), + ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await, + ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await, + ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await, + ProposerAcceptorMessage::AppendRequest(msg) => { + self.handle_append_request(msg, true).await + } ProposerAcceptorMessage::NoFlushAppendRequest(msg) => { - self.handle_append_request(msg, false) + self.handle_append_request(msg, false).await } - ProposerAcceptorMessage::FlushWAL => self.handle_flush(), + ProposerAcceptorMessage::FlushWAL => self.handle_flush().await, } } /// Handle initial message from proposer: check its sanity and send my /// current term. - fn handle_greeting( + async fn handle_greeting( &mut self, msg: &ProposerGreeting, ) -> Result> { @@ -649,7 +651,7 @@ where if msg.pg_version != UNKNOWN_SERVER_VERSION { state.server.pg_version = msg.pg_version; } - self.state.persist(&state)?; + self.state.persist(&state).await?; } info!( @@ -664,7 +666,7 @@ where } /// Give vote for the given term, if we haven't done that previously. - fn handle_vote_request( + async fn handle_vote_request( &mut self, msg: &VoteRequest, ) -> Result> { @@ -678,7 +680,7 @@ where // handle_elected instead. Currently not a big deal, as proposer is the // only source of WAL; with peer2peer recovery it would be more // important. 
- self.wal_store.flush_wal()?; + self.wal_store.flush_wal().await?; // initialize with refusal let mut resp = VoteResponse { term: self.state.acceptor_state.term, @@ -692,7 +694,7 @@ where let mut state = self.state.clone(); state.acceptor_state.term = msg.term; // persist vote before sending it out - self.state.persist(&state)?; + self.state.persist(&state).await?; resp.term = self.state.acceptor_state.term; resp.vote_given = true as u64; @@ -715,12 +717,15 @@ where ar } - fn handle_elected(&mut self, msg: &ProposerElected) -> Result> { + async fn handle_elected( + &mut self, + msg: &ProposerElected, + ) -> Result> { info!("received ProposerElected {:?}", msg); if self.state.acceptor_state.term < msg.term { let mut state = self.state.clone(); state.acceptor_state.term = msg.term; - self.state.persist(&state)?; + self.state.persist(&state).await?; } // If our term is higher, ignore the message (next feedback will inform the compute) @@ -750,7 +755,7 @@ where // intersection of our history and history from msg // truncate wal, update the LSNs - self.wal_store.truncate_wal(msg.start_streaming_at)?; + self.wal_store.truncate_wal(msg.start_streaming_at).await?; // and now adopt term history from proposer { @@ -784,7 +789,7 @@ where self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn); state.acceptor_state.term_history = msg.term_history.clone(); - self.persist_control_file(state)?; + self.persist_control_file(state).await?; } info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -796,7 +801,7 @@ where /// /// Note: it is assumed that 'WAL we have is from the right term' check has /// already been done outside. - fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { + async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { // Both peers and walproposer communicate this value, we might already // have a fresher (higher) version. candidate = max(candidate, self.inmem.commit_lsn); @@ -818,29 +823,32 @@ where // that we receive new epoch_start_lsn, and we still need to sync // control file in this case. if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn { - self.persist_control_file(self.state.clone())?; + self.persist_control_file(self.state.clone()).await?; } Ok(()) } /// Persist control file to disk, called only after timeline creation (bootstrap). - pub fn persist(&mut self) -> Result<()> { - self.persist_control_file(self.state.clone()) + pub async fn persist(&mut self) -> Result<()> { + self.persist_control_file(self.state.clone()).await } /// Persist in-memory state to the disk, taking other data from state. - fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { + async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { state.commit_lsn = self.inmem.commit_lsn; state.backup_lsn = self.inmem.backup_lsn; state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; state.proposer_uuid = self.inmem.proposer_uuid; - self.state.persist(&state) + self.state.persist(&state).await } /// Persist control file if there is something to save and enough time /// passed after the last save. 
- pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { + pub async fn maybe_persist_control_file( + &mut self, + inmem_remote_consistent_lsn: Lsn, + ) -> Result<()> { const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL { return Ok(()); @@ -852,7 +860,7 @@ where if need_persist { let mut state = self.state.clone(); state.remote_consistent_lsn = inmem_remote_consistent_lsn; - self.persist_control_file(state)?; + self.persist_control_file(state).await?; trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); } Ok(()) @@ -860,7 +868,7 @@ where /// Handle request to append WAL. #[allow(clippy::comparison_chain)] - fn handle_append_request( + async fn handle_append_request( &mut self, msg: &AppendRequest, require_flush: bool, @@ -883,17 +891,19 @@ where // do the job if !msg.wal_data.is_empty() { - self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; + self.wal_store + .write_wal(msg.h.begin_lsn, &msg.wal_data) + .await?; } // flush wal to the disk, if required if require_flush { - self.wal_store.flush_wal()?; + self.wal_store.flush_wal().await?; } // Update commit_lsn. if msg.h.commit_lsn != Lsn(0) { - self.update_commit_lsn(msg.h.commit_lsn)?; + self.update_commit_lsn(msg.h.commit_lsn).await?; } // Value calculated by walproposer can always lag: // - safekeepers can forget inmem value and send to proposer lower @@ -909,7 +919,7 @@ where if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) < self.inmem.peer_horizon_lsn { - self.persist_control_file(self.state.clone())?; + self.persist_control_file(self.state.clone()).await?; } trace!( @@ -931,15 +941,15 @@ where } /// Flush WAL to disk. Return AppendResponse with latest LSNs. - fn handle_flush(&mut self) -> Result> { - self.wal_store.flush_wal()?; + async fn handle_flush(&mut self) -> Result> { + self.wal_store.flush_wal().await?; Ok(Some(AcceptorProposerMessage::AppendResponse( self.append_response(), ))) } /// Update timeline state with peer safekeeper data. - pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> { + pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> { let mut sync_control_file = false; if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) { @@ -947,7 +957,7 @@ where // commit_lsn if our history matches (is part of) history of advanced // commit_lsn provider. if sk_info.last_log_term == self.get_epoch() { - self.update_commit_lsn(Lsn(sk_info.commit_lsn))?; + self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?; } } @@ -973,7 +983,7 @@ where // Note: we could make remote_consistent_lsn update in cf common by // storing Arc to walsenders in Safekeeper. 
state.remote_consistent_lsn = new_remote_consistent_lsn; - self.persist_control_file(state)?; + self.persist_control_file(state).await?; } Ok(()) } @@ -997,6 +1007,7 @@ where #[cfg(test)] mod tests { + use futures::future::BoxFuture; use postgres_ffi::WAL_SEGMENT_SIZE; use super::*; @@ -1008,8 +1019,9 @@ mod tests { persisted_state: SafeKeeperState, } + #[async_trait::async_trait] impl control_file::Storage for InMemoryState { - fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { self.persisted_state = s.clone(); Ok(()) } @@ -1039,27 +1051,28 @@ mod tests { lsn: Lsn, } + #[async_trait::async_trait] impl wal_storage::Storage for DummyWalStore { fn flush_lsn(&self) -> Lsn { self.lsn } - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { self.lsn = startpos + buf.len() as u64; Ok(()) } - fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { self.lsn = end_pos; Ok(()) } - fn flush_wal(&mut self) -> Result<()> { + async fn flush_wal(&mut self) -> Result<()> { Ok(()) } - fn remove_up_to(&self) -> Box Result<()>> { - Box::new(move |_segno_up_to: XLogSegNo| Ok(())) + fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> { + Box::pin(async { Ok(()) }) } fn get_metrics(&self) -> crate::metrics::WalStorageMetrics { @@ -1067,8 +1080,8 @@ mod tests { } } - #[test] - fn test_voting() { + #[tokio::test] + async fn test_voting() { let storage = InMemoryState { persisted_state: test_sk_state(), }; @@ -1077,7 +1090,7 @@ mod tests { // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); - let mut vote_resp = sk.process_msg(&vote_request); + let mut vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), r => panic!("unexpected response: {:?}", r), @@ -1092,15 +1105,15 @@ mod tests { sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap(); // and ensure voting second time for 1 is not ok - vote_resp = sk.process_msg(&vote_request); + vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), r => panic!("unexpected response: {:?}", r), } } - #[test] - fn test_epoch_switch() { + #[tokio::test] + async fn test_epoch_switch() { let storage = InMemoryState { persisted_state: test_sk_state(), }; @@ -1132,10 +1145,13 @@ mod tests { timeline_start_lsn: Lsn(0), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) + .await .unwrap(); // check that AppendRequest before epochStartLsn doesn't switch epoch - let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); + let resp = sk + .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await; assert!(resp.is_ok()); assert_eq!(sk.get_epoch(), 0); @@ -1146,9 +1162,11 @@ mod tests { h: ar_hdr, wal_data: Bytes::from_static(b"b"), }; - let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); + let resp = sk + .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await; assert!(resp.is_ok()); - sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %) + sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record 
at 3 %) assert_eq!(sk.get_epoch(), 1); } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index fb420cba6489..abca0a86b1f8 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -396,7 +396,7 @@ impl SafekeeperPostgresHandler { // on this safekeeper itself. That's ok as (old) proposer will never be // able to commit such WAL. let stop_pos: Option = if self.is_walproposer_recovery() { - let wal_end = tli.get_flush_lsn(); + let wal_end = tli.get_flush_lsn().await; Some(wal_end) } else { None @@ -418,7 +418,7 @@ impl SafekeeperPostgresHandler { // switch to copy pgb.write_message(&BeMessage::CopyBothResponse).await?; - let (_, persisted_state) = tli.get_state(); + let (_, persisted_state) = tli.get_state().await; let wal_reader = WalReader::new( self.conf.workdir.clone(), self.conf.timeline_dir(&tli.ttid), @@ -562,7 +562,7 @@ impl WalSender<'_, IO> { .walsenders .get_ws_remote_consistent_lsn(self.ws_guard.id) { - if self.tli.should_walsender_stop(remote_consistent_lsn) { + if self.tli.should_walsender_stop(remote_consistent_lsn).await { // Terminate if there is nothing more to send. return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 941f8dae547e..52c3e8d4be50 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -2,12 +2,13 @@ //! to glue together SafeKeeper and all other background services. use anyhow::{anyhow, bail, Result}; -use parking_lot::{Mutex, MutexGuard}; use postgres_ffi::XLogSegNo; +use tokio::fs; use std::cmp::max; use std::path::PathBuf; use std::sync::Arc; +use tokio::sync::{Mutex, MutexGuard}; use tokio::{ sync::{mpsc::Sender, watch}, time::Instant, @@ -286,8 +287,9 @@ pub struct Timeline { commit_lsn_watch_tx: watch::Sender, commit_lsn_watch_rx: watch::Receiver, - /// Safekeeper and other state, that should remain consistent and synchronized - /// with the disk. + /// Safekeeper and other state, that should remain consistent and + /// synchronized with the disk. This is tokio mutex as we write WAL to disk + /// while holding it, ensuring that consensus checks are in order. mutex: Mutex, walsenders: Arc, @@ -361,8 +363,8 @@ impl Timeline { /// /// Bootstrap is transactional, so if it fails, created files will be deleted, /// and state on disk should remain unchanged. - pub fn bootstrap(&self, shared_state: &mut MutexGuard) -> Result<()> { - match std::fs::metadata(&self.timeline_dir) { + pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> { + match fs::metadata(&self.timeline_dir).await { Ok(_) => { // Timeline directory exists on disk, we should leave state unchanged // and return error. @@ -375,53 +377,51 @@ impl Timeline { } // Create timeline directory. - std::fs::create_dir_all(&self.timeline_dir)?; + fs::create_dir_all(&self.timeline_dir).await?; // Write timeline to disk and TODO: start background tasks. - match || -> Result<()> { - shared_state.sk.persist()?; - // TODO: add more initialization steps here - self.update_status(shared_state); - Ok(()) - }() { - Ok(_) => Ok(()), - Err(e) => { - // Bootstrap failed, cancel timeline and remove timeline directory. 
- self.cancel(shared_state); - - if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) { - warn!( - "failed to remove timeline {} directory after bootstrap failure: {}", - self.ttid, fs_err - ); - } - - Err(e) + if let Err(e) = shared_state.sk.persist().await { + // Bootstrap failed, cancel timeline and remove timeline directory. + self.cancel(shared_state); + + if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await { + warn!( + "failed to remove timeline {} directory after bootstrap failure: {}", + self.ttid, fs_err + ); } + + return Err(e); } + + // TODO: add more initialization steps here + self.update_status(shared_state); + Ok(()) } /// Delete timeline from disk completely, by removing timeline directory. Background /// timeline activities will stop eventually. - pub fn delete_from_disk( + pub async fn delete_from_disk( &self, - shared_state: &mut MutexGuard, + shared_state: &mut MutexGuard<'_, SharedState>, ) -> Result<(bool, bool)> { let was_active = shared_state.active; self.cancel(shared_state); - let dir_existed = delete_dir(&self.timeline_dir)?; + let dir_existed = delete_dir(&self.timeline_dir).await?; Ok((dir_existed, was_active)) } /// Cancel timeline to prevent further usage. Background tasks will stop /// eventually after receiving cancellation signal. - fn cancel(&self, shared_state: &mut MutexGuard) { + /// + /// Note that we can't notify backup launcher here while holding + /// shared_state lock, as this is a potential deadlock: caller is + /// responsible for that. Generally we should probably make WAL backup tasks + /// to shut down on their own, checking once in a while whether it is the + /// time. + fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) { info!("timeline {} is cancelled", self.ttid); let _ = self.cancellation_tx.send(true); - let res = self.wal_backup_launcher_tx.blocking_send(self.ttid); - if let Err(e) = res { - error!("Failed to send stop signal to wal_backup_launcher: {}", e); - } // Close associated FDs. Nobody will be able to touch timeline data once // it is cancelled, so WAL storage won't be opened again. shared_state.sk.wal_store.close(); @@ -433,8 +433,8 @@ impl Timeline { } /// Take a writing mutual exclusive lock on timeline shared_state. - pub fn write_shared_state(&self) -> MutexGuard { - self.mutex.lock() + pub async fn write_shared_state(&self) -> MutexGuard { + self.mutex.lock().await } fn update_status(&self, shared_state: &mut SharedState) -> bool { @@ -450,7 +450,7 @@ impl Timeline { let is_wal_backup_action_pending: bool; { - let mut shared_state = self.write_shared_state(); + let mut shared_state = self.write_shared_state().await; shared_state.num_computes += 1; is_wal_backup_action_pending = self.update_status(&mut shared_state); } @@ -464,22 +464,17 @@ impl Timeline { /// De-register compute connection, shutting down timeline activity if /// pageserver doesn't need catchup. - pub fn on_compute_disconnect(&self) -> Result<()> { + pub async fn on_compute_disconnect(&self) -> Result<()> { let is_wal_backup_action_pending: bool; { - let mut shared_state = self.write_shared_state(); + let mut shared_state = self.write_shared_state().await; shared_state.num_computes -= 1; is_wal_backup_action_pending = self.update_status(&mut shared_state); } // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { // Can fail only if channel to a static thread got closed, which is not normal at all. 
- // - // Note: this is blocking_send because on_compute_disconnect is called in Drop, there is - // no async Drop and we use current thread runtimes. With current thread rt spawning - // task in drop impl is racy, as thread along with runtime might finish before the task. - // This should be switched send.await when/if we go to full async. - self.wal_backup_launcher_tx.blocking_send(self.ttid)?; + self.wal_backup_launcher_tx.send(self.ttid).await?; } Ok(()) } @@ -489,11 +484,11 @@ impl Timeline { /// computes. While there might be nothing to stream already, we learn about /// remote_consistent_lsn update through replication feedback, and we want /// to stop pushing to the broker if pageserver is fully caughtup. - pub fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { + pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { if self.is_cancelled() { return true; } - let shared_state = self.write_shared_state(); + let shared_state = self.write_shared_state().await; if shared_state.num_computes == 0 { return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn; @@ -503,12 +498,12 @@ impl Timeline { /// Returns whether s3 offloading is required and sets current status as /// matching it. - pub fn wal_backup_attend(&self) -> bool { + pub async fn wal_backup_attend(&self) -> bool { if self.is_cancelled() { return false; } - self.write_shared_state().wal_backup_attend() + self.write_shared_state().await.wal_backup_attend() } /// Returns commit_lsn watch channel. @@ -517,7 +512,7 @@ impl Timeline { } /// Pass arrived message to the safekeeper. - pub fn process_msg( + pub async fn process_msg( &self, msg: &ProposerAcceptorMessage, ) -> Result> { @@ -528,8 +523,8 @@ impl Timeline { let mut rmsg: Option; let commit_lsn: Lsn; { - let mut shared_state = self.write_shared_state(); - rmsg = shared_state.sk.process_msg(msg)?; + let mut shared_state = self.write_shared_state().await; + rmsg = shared_state.sk.process_msg(msg).await?; // if this is AppendResponse, fill in proper pageserver and hot // standby feedback. @@ -546,37 +541,37 @@ impl Timeline { } /// Returns wal_seg_size. - pub fn get_wal_seg_size(&self) -> usize { - self.write_shared_state().get_wal_seg_size() + pub async fn get_wal_seg_size(&self) -> usize { + self.write_shared_state().await.get_wal_seg_size() } /// Returns true only if the timeline is loaded and active. - pub fn is_active(&self) -> bool { + pub async fn is_active(&self) -> bool { if self.is_cancelled() { return false; } - self.write_shared_state().active + self.write_shared_state().await.active } /// Returns state of the timeline. - pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { - let state = self.write_shared_state(); + pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { + let state = self.write_shared_state().await; (state.sk.inmem.clone(), state.sk.state.clone()) } /// Returns latest backup_lsn. - pub fn get_wal_backup_lsn(&self) -> Lsn { - self.write_shared_state().sk.inmem.backup_lsn + pub async fn get_wal_backup_lsn(&self) -> Lsn { + self.write_shared_state().await.sk.inmem.backup_lsn } /// Sets backup_lsn to the given value. 
- pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { + pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { if self.is_cancelled() { bail!(TimelineError::Cancelled(self.ttid)); } - let mut state = self.write_shared_state(); + let mut state = self.write_shared_state().await; state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn); // we should check whether to shut down offloader, but this will be done // soon by peer communication anyway. @@ -584,8 +579,8 @@ impl Timeline { } /// Get safekeeper info for broadcasting to broker and other peers. - pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { - let shared_state = self.write_shared_state(); + pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { + let shared_state = self.write_shared_state().await; shared_state.get_safekeeper_info( &self.ttid, conf, @@ -604,8 +599,8 @@ impl Timeline { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { - let mut shared_state = self.write_shared_state(); - shared_state.sk.record_safekeeper_info(&sk_info)?; + let mut shared_state = self.write_shared_state().await; + shared_state.sk.record_safekeeper_info(&sk_info).await?; let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); shared_state.peers_info.upsert(&peer_info); is_wal_backup_action_pending = self.update_status(&mut shared_state); @@ -622,8 +617,8 @@ impl Timeline { /// Get our latest view of alive peers status on the timeline. /// We pass our own info through the broker as well, so when we don't have connection /// to the broker returned vec is empty. - pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { - let shared_state = self.write_shared_state(); + pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { + let shared_state = self.write_shared_state().await; let now = Instant::now(); shared_state .peers_info @@ -640,34 +635,34 @@ impl Timeline { } /// Returns flush_lsn. - pub fn get_flush_lsn(&self) -> Lsn { - self.write_shared_state().sk.wal_store.flush_lsn() + pub async fn get_flush_lsn(&self) -> Lsn { + self.write_shared_state().await.sk.wal_store.flush_lsn() } /// Delete WAL segments from disk that are no longer needed. This is determined /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn. - pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { + pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { if self.is_cancelled() { bail!(TimelineError::Cancelled(self.ttid)); } let horizon_segno: XLogSegNo; - let remover: Box Result<(), anyhow::Error>>; - { - let shared_state = self.write_shared_state(); + let remover = { + let shared_state = self.write_shared_state().await; horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled); - remover = shared_state.sk.wal_store.remove_up_to(); if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { - return Ok(()); + return Ok(()); // nothing to do } + let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1); // release the lock before removing - } + remover + }; // delete old WAL files - remover(horizon_segno - 1)?; + remover.await?; // update last_removed_segno - let mut shared_state = self.write_shared_state(); + let mut shared_state = self.write_shared_state().await; shared_state.last_removed_segno = horizon_segno; Ok(()) } @@ -676,22 +671,24 @@ impl Timeline { /// passed after the last save. 
This helps to keep remote_consistent_lsn up /// to date so that storage nodes restart doesn't cause many pageserver -> /// safekeeper reconnections. - pub fn maybe_pesist_control_file(&self) -> Result<()> { + pub async fn maybe_persist_control_file(&self) -> Result<()> { let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn(); self.write_shared_state() + .await .sk .maybe_persist_control_file(remote_consistent_lsn) + .await } - /// Returns full timeline info, required for the metrics. If the timeline is - /// not active, returns None instead. - pub fn info_for_metrics(&self) -> Option { + /// Gather timeline data for metrics. If the timeline is not active, returns + /// None, we do not collect these. + pub async fn info_for_metrics(&self) -> Option { if self.is_cancelled() { return None; } let ps_feedback = self.walsenders.get_ps_feedback(); - let state = self.write_shared_state(); + let state = self.write_shared_state().await; if state.active { Some(FullTimelineInfo { ttid: self.ttid, @@ -713,8 +710,8 @@ impl Timeline { } /// Returns in-memory timeline state to build a full debug dump. - pub fn memory_dump(&self) -> debug_dump::Memory { - let state = self.write_shared_state(); + pub async fn memory_dump(&self) -> debug_dump::Memory { + let state = self.write_shared_state().await; let (write_lsn, write_record_lsn, flush_lsn, file_open) = state.sk.wal_store.internal_state(); @@ -738,8 +735,8 @@ impl Timeline { } /// Deletes directory and it's contents. Returns false if directory does not exist. -fn delete_dir(path: &PathBuf) -> Result { - match std::fs::remove_dir_all(path) { +async fn delete_dir(path: &PathBuf) -> Result { + match fs::remove_dir_all(path).await { Ok(_) => Ok(true), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), Err(e) => Err(e.into()), diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 41809794dc55..f2d5df8744c1 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -113,9 +113,17 @@ impl GlobalTimelines { Ok(()) } - /// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any. + /// Loads all timelines for the given tenant to memory. Returns fs::read_dir + /// errors if any. + /// + /// Note: This function (and all reading/loading below) is sync because + /// timelines are loaded while holding GlobalTimelinesState lock. Which is + /// fine as this is called only from single threaded main runtime on boot, + /// but clippy complains anyway, and suppressing that isn't trivial as async + /// is the keyword, ha. That only other user is pull_timeline.rs for which + /// being blocked is not that bad, and we can do spawn_blocking. fn load_tenant_timelines( - state: &mut MutexGuard, + state: &mut MutexGuard<'_, GlobalTimelinesState>, tenant_id: TenantId, ) -> Result<()> { let timelines_dir = state.get_conf().tenant_dir(&tenant_id); @@ -220,7 +228,7 @@ impl GlobalTimelines { // Take a lock and finish the initialization holding this mutex. No other threads // can interfere with creation after we will insert timeline into the map. { - let mut shared_state = timeline.write_shared_state(); + let mut shared_state = timeline.write_shared_state().await; // We can get a race condition here in case of concurrent create calls, but only // in theory. create() will return valid timeline on the next try. @@ -232,7 +240,7 @@ impl GlobalTimelines { // Write the new timeline to the disk and start background workers. 
// Bootstrap is transactional, so if it fails, the timeline will be deleted, // and the state on disk should remain unchanged. - if let Err(e) = timeline.bootstrap(&mut shared_state) { + if let Err(e) = timeline.bootstrap(&mut shared_state).await { // Note: the most likely reason for bootstrap failure is that the timeline // directory already exists on disk. This happens when timeline is corrupted // and wasn't loaded from disk on startup because of that. We want to preserve @@ -294,15 +302,16 @@ impl GlobalTimelines { } /// Cancels timeline, then deletes the corresponding data directory. - pub fn delete_force(ttid: &TenantTimelineId) -> Result { + pub async fn delete_force(ttid: &TenantTimelineId) -> Result { let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid); match tli_res { Ok(timeline) => { // Take a lock and finish the deletion holding this mutex. - let mut shared_state = timeline.write_shared_state(); + let mut shared_state = timeline.write_shared_state().await; info!("deleting timeline {}", ttid); - let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?; + let (dir_existed, was_active) = + timeline.delete_from_disk(&mut shared_state).await?; // Remove timeline from the map. // FIXME: re-enable it once we fix the issue with recreation of deleted timelines @@ -335,7 +344,7 @@ impl GlobalTimelines { /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are /// created simultaneously. In that case the function will return error and the caller should /// retry tenant deletion again later. - pub fn delete_force_all_for_tenant( + pub async fn delete_force_all_for_tenant( tenant_id: &TenantId, ) -> Result> { info!("deleting all timelines for tenant {}", tenant_id); @@ -345,7 +354,7 @@ impl GlobalTimelines { let mut deleted = HashMap::new(); for tli in &to_delete { - match Self::delete_force(&tli.ttid) { + match Self::delete_force(&tli.ttid).await { Ok(result) => { deleted.insert(tli.ttid, result); } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 4d341a7ef831..eae3f3fe86a9 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -17,7 +17,6 @@ use postgres_ffi::XLogFileName; use postgres_ffi::{XLogSegNo, PG_TLI}; use remote_storage::{GenericRemoteStorage, RemotePath}; use tokio::fs::File; -use tokio::runtime::Builder; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -36,30 +35,16 @@ use once_cell::sync::OnceCell; const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; -pub fn wal_backup_launcher_thread_main( - conf: SafeKeeperConf, - wal_backup_launcher_rx: Receiver, -) { - let mut builder = Builder::new_multi_thread(); - if let Some(num_threads) = conf.backup_runtime_threads { - builder.worker_threads(num_threads); - } - let rt = builder - .enable_all() - .build() - .expect("failed to create wal backup runtime"); - - rt.block_on(async { - wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await; - }); -} - /// Check whether wal backup is required for timeline. If yes, mark that launcher is /// aware of current status and return the timeline. 
-fn is_wal_backup_required(ttid: TenantTimelineId) -> Option> {
-    GlobalTimelines::get(ttid)
-        .ok()
-        .filter(|tli| tli.wal_backup_attend())
+async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option> {
+    match GlobalTimelines::get(ttid).ok() {
+        Some(tli) => {
+            if tli.wal_backup_attend().await {
+                Some(tli)
+            } else {
+                None
+            }
+        }
+        None => None,
+    }
 }
 
 struct WalBackupTaskHandle {
@@ -143,8 +128,8 @@ async fn update_task(
     ttid: TenantTimelineId,
     entry: &mut WalBackupTimelineEntry,
 ) {
-    let alive_peers = entry.timeline.get_peers(conf);
-    let wal_backup_lsn = entry.timeline.get_wal_backup_lsn();
+    let alive_peers = entry.timeline.get_peers(conf).await;
+    let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await;
     let (offloader, election_dbg_str) =
         determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf);
     let elected_me = Some(conf.my_id) == offloader;
@@ -183,10 +168,10 @@ const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000;
 /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup
 /// tasks. Having this in separate task simplifies locking, allows to reap
 /// panics and separate elections from offloading itself.
-async fn wal_backup_launcher_main_loop(
+pub async fn wal_backup_launcher_task_main(
     conf: SafeKeeperConf,
     mut wal_backup_launcher_rx: Receiver,
-) {
+) -> anyhow::Result<()> {
     info!(
         "WAL backup launcher started, remote config {:?}",
         conf.remote_storage
     );
@@ -214,7 +199,7 @@ async fn wal_backup_launcher_main_loop(
             if conf.remote_storage.is_none() || !conf.wal_backup_enabled {
                 continue; /* just drain the channel and do nothing */
             }
-            let timeline = is_wal_backup_required(ttid);
+            let timeline = is_wal_backup_required(ttid).await;
             // do we need to do anything at all?
             if timeline.is_some() != tasks.contains_key(&ttid) {
                 if let Some(timeline) = timeline {
@@ -269,7 +254,7 @@ async fn backup_task_main(
     let tli = res.unwrap();
 
     let mut wb = WalBackupTask {
-        wal_seg_size: tli.get_wal_seg_size(),
+        wal_seg_size: tli.get_wal_seg_size().await,
         commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(),
         timeline: tli,
         timeline_dir,
@@ -326,7 +311,7 @@ impl WalBackupTask {
                 continue; /* nothing to do, common case as we wake up on every commit_lsn bump */
             }
             // Perhaps peers advanced the position, check shmem value.
-            backup_lsn = self.timeline.get_wal_backup_lsn();
+            backup_lsn = self.timeline.get_wal_backup_lsn().await;
             if backup_lsn.segment_number(self.wal_seg_size)
                 >= commit_lsn.segment_number(self.wal_seg_size)
             {
@@ -402,6 +387,7 @@ pub async fn backup_lsn_range(
                 let new_backup_lsn = segment.end_lsn;
                 timeline
                     .set_wal_backup_lsn(new_backup_lsn)
+                    .await
                     .context("setting wal_backup_lsn")?;
                 *backup_lsn = new_backup_lsn;
             } else {
diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs
index fb0d77a9f22a..406132b2b045 100644
--- a/safekeeper/src/wal_service.rs
+++ b/safekeeper/src/wal_service.rs
@@ -4,7 +4,7 @@
 //!
 use anyhow::{Context, Result};
 use postgres_backend::QueryError;
-use std::{future, thread, time::Duration};
+use std::{future, time::Duration};
 use tokio::net::TcpStream;
 use tokio_io_timeout::TimeoutReader;
 use tracing::*;
@@ -16,104 +16,82 @@ use crate::SafeKeeperConf;
 use postgres_backend::{AuthType, PostgresBackend};
 
 /// Accept incoming TCP connections and spawn them into a background thread.
-pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("create runtime") - // todo catch error in main thread - .expect("failed to create runtime"); +pub async fn task_main( + conf: SafeKeeperConf, + pg_listener: std::net::TcpListener, +) -> anyhow::Result<()> { + // Tokio's from_std won't do this for us, per its comment. + pg_listener.set_nonblocking(true)?; - runtime - .block_on(async move { - // Tokio's from_std won't do this for us, per its comment. - pg_listener.set_nonblocking(true)?; - let listener = tokio::net::TcpListener::from_std(pg_listener)?; - let mut connection_count: ConnectionCount = 0; + let listener = tokio::net::TcpListener::from_std(pg_listener)?; + let mut connection_count: ConnectionCount = 0; - loop { - match listener.accept().await { - Ok((socket, peer_addr)) => { - debug!("accepted connection from {}", peer_addr); - let conf = conf.clone(); - let conn_id = issue_connection_id(&mut connection_count); + loop { + let (socket, peer_addr) = listener.accept().await.context("accept")?; + debug!("accepted connection from {}", peer_addr); + let conf = conf.clone(); + let conn_id = issue_connection_id(&mut connection_count); - let _ = thread::Builder::new() - .name("WAL service thread".into()) - .spawn(move || { - if let Err(err) = handle_socket(socket, conf, conn_id) { - error!("connection handler exited: {}", err); - } - }) - .unwrap(); - } - Err(e) => error!("Failed to accept connection: {}", e), - } + tokio::spawn(async move { + if let Err(err) = handle_socket(socket, conf, conn_id) + .instrument(info_span!("", cid = %conn_id)) + .await + { + error!("connection handler exited: {}", err); } - #[allow(unreachable_code)] // hint compiler the closure return type - Ok::<(), anyhow::Error>(()) - }) - .expect("listener failed") + }); + } } -/// This is run by `thread_main` above, inside a background thread. +/// This is run by `task_main` above, inside a background thread. /// -fn handle_socket( +async fn handle_socket( socket: TcpStream, conf: SafeKeeperConf, conn_id: ConnectionId, ) -> Result<(), QueryError> { - let _enter = info_span!("", cid = %conn_id).entered(); - - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - socket.set_nodelay(true)?; let peer_addr = socket.peer_addr()?; - // TimeoutReader wants async runtime during creation. - runtime.block_on(async move { - // Set timeout on reading from the socket. It prevents hanged up connection - // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by - // default, and tokio doesn't provide ability to set it out of the box. - let mut socket = TimeoutReader::new(socket); - let wal_service_timeout = Duration::from_secs(60 * 10); - socket.set_timeout(Some(wal_service_timeout)); - // pin! is here because TimeoutReader (due to storing sleep future inside) - // is not Unpin, and all pgbackend/framed/tokio dependencies require stream - // to be Unpin. Which is reasonable, as indeed something like TimeoutReader - // shouldn't be moved. - tokio::pin!(socket); + // Set timeout on reading from the socket. It prevents hanged up connection + // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by + // default, and tokio doesn't provide ability to set it out of the box. + let mut socket = TimeoutReader::new(socket); + let wal_service_timeout = Duration::from_secs(60 * 10); + socket.set_timeout(Some(wal_service_timeout)); + // pin! 
is here because TimeoutReader (due to storing sleep future inside) + // is not Unpin, and all pgbackend/framed/tokio dependencies require stream + // to be Unpin. Which is reasonable, as indeed something like TimeoutReader + // shouldn't be moved. + tokio::pin!(socket); - let traffic_metrics = TrafficMetrics::new(); - if let Some(current_az) = conf.availability_zone.as_deref() { - traffic_metrics.set_sk_az(current_az); - } + let traffic_metrics = TrafficMetrics::new(); + if let Some(current_az) = conf.availability_zone.as_deref() { + traffic_metrics.set_sk_az(current_az); + } - let socket = MeasuredStream::new( - socket, - |cnt| { - traffic_metrics.observe_read(cnt); - }, - |cnt| { - traffic_metrics.observe_write(cnt); - }, - ); + let socket = MeasuredStream::new( + socket, + |cnt| { + traffic_metrics.observe_read(cnt); + }, + |cnt| { + traffic_metrics.observe_write(cnt); + }, + ); - let auth_type = match conf.auth { - None => AuthType::Trust, - Some(_) => AuthType::NeonJWT, - }; - let mut conn_handler = - SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone())); - let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; - // libpq protocol between safekeeper and walproposer / pageserver - // We don't use shutdown. - pgbackend - .run(&mut conn_handler, future::pending::<()>) - .await - }) + let auth_type = match conf.auth { + None => AuthType::Trust, + Some(_) => AuthType::NeonJWT, + }; + let mut conn_handler = + SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone())); + let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; + // libpq protocol between safekeeper and walproposer / pageserver + // We don't use shutdown. + pgbackend + .run(&mut conn_handler, future::pending::<()>) + .await } /// Unique WAL service connection ids are logged in spans for observability. diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 644c956fc1c9..e97b212093e4 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -8,54 +8,47 @@ //! Note that last file has `.partial` suffix, that's different from postgres. use anyhow::{bail, Context, Result}; -use remote_storage::RemotePath; - -use std::io::{self, Seek, SeekFrom}; -use std::pin::Pin; -use tokio::io::AsyncRead; - +use bytes::Bytes; +use futures::future::BoxFuture; use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName}; use postgres_ffi::{XLogSegNo, PG_TLI}; +use remote_storage::RemotePath; use std::cmp::{max, min}; - -use bytes::Bytes; -use std::fs::{self, remove_file, File, OpenOptions}; -use std::io::Write; +use std::io::{self, SeekFrom}; use std::path::{Path, PathBuf}; - +use std::pin::Pin; +use tokio::fs::{self, remove_file, File, OpenOptions}; +use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tracing::*; -use utils::{id::TenantTimelineId, lsn::Lsn}; - use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS}; use crate::safekeeper::SafeKeeperState; - use crate::wal_backup::read_object; use crate::SafeKeeperConf; +use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::XLogFileName; use postgres_ffi::XLOG_BLCKSZ; - -use postgres_ffi::waldecoder::WalStreamDecoder; - use pq_proto::SystemId; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use utils::{id::TenantTimelineId, lsn::Lsn}; +#[async_trait::async_trait] pub trait Storage { /// LSN of last durably stored WAL record. 
fn flush_lsn(&self) -> Lsn; /// Write piece of WAL from buf to disk, but not necessarily sync it. - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; + async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; /// Truncate WAL at specified LSN, which must be the end of WAL record. - fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; + async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; /// Durably store WAL on disk, up to the last written WAL record. - fn flush_wal(&mut self) -> Result<()>; + async fn flush_wal(&mut self) -> Result<()>; - /// Remove all segments <= given segno. Returns closure as we want to do - /// that without timeline lock. - fn remove_up_to(&self) -> Box Result<()>>; + /// Remove all segments <= given segno. Returns function doing that as we + /// want to perform it without timeline lock. + fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>; /// Release resources associated with the storage -- technically, close FDs. /// Currently we don't remove timelines until restart (#3146), so need to @@ -178,33 +171,37 @@ impl PhysicalStorage { } /// Call fdatasync if config requires so. - fn fdatasync_file(&mut self, file: &mut File) -> Result<()> { + async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> { if !self.conf.no_sync { self.metrics - .observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?); + .observe_flush_seconds(time_io_closure(file.sync_data()).await?); } Ok(()) } /// Call fsync if config requires so. - fn fsync_file(&mut self, file: &mut File) -> Result<()> { + async fn fsync_file(&mut self, file: &mut File) -> Result<()> { if !self.conf.no_sync { self.metrics - .observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?); + .observe_flush_seconds(time_io_closure(file.sync_all()).await?); } Ok(()) } /// Open or create WAL segment file. Caller must call seek to the wanted position. /// Returns `file` and `is_partial`. - fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> { + async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> { let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; // Try to open already completed segment - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await { Ok((file, false)) - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { + } else if let Ok(file) = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path) + .await + { // Try to open existing partial file Ok((file, true)) } else { @@ -213,35 +210,36 @@ impl PhysicalStorage { .create(true) .write(true) .open(&wal_file_partial_path) + .await .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?; - write_zeroes(&mut file, self.wal_seg_size)?; - self.fsync_file(&mut file)?; + write_zeroes(&mut file, self.wal_seg_size).await?; + self.fsync_file(&mut file).await?; Ok((file, true)) } } /// Write WAL bytes, which are known to be located in a single WAL segment. 
- fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { + async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { let mut file = if let Some(file) = self.file.take() { file } else { - let (mut file, is_partial) = self.open_or_create(segno)?; + let (mut file, is_partial) = self.open_or_create(segno).await?; assert!(is_partial, "unexpected write into non-partial segment file"); - file.seek(SeekFrom::Start(xlogoff as u64))?; + file.seek(SeekFrom::Start(xlogoff as u64)).await?; file }; - file.write_all(buf)?; + file.write_all(buf).await?; if xlogoff + buf.len() == self.wal_seg_size { // If we reached the end of a WAL segment, flush and close it. - self.fdatasync_file(&mut file)?; + self.fdatasync_file(&mut file).await?; // Rename partial file to completed file let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; - fs::rename(wal_file_partial_path, wal_file_path)?; + fs::rename(wal_file_partial_path, wal_file_path).await?; } else { // otherwise, file can be reused later self.file = Some(file); @@ -255,11 +253,11 @@ impl PhysicalStorage { /// be flushed separately later. /// /// Updates `write_lsn`. - fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { + async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { if self.write_lsn != pos { // need to flush the file before discarding it if let Some(mut file) = self.file.take() { - self.fdatasync_file(&mut file)?; + self.fdatasync_file(&mut file).await?; } self.write_lsn = pos; @@ -277,7 +275,8 @@ impl PhysicalStorage { buf.len() }; - self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?; + self.write_in_segment(segno, xlogoff, &buf[..bytes_write]) + .await?; self.write_lsn += bytes_write as u64; buf = &buf[bytes_write..]; } @@ -286,6 +285,7 @@ impl PhysicalStorage { } } +#[async_trait::async_trait] impl Storage for PhysicalStorage { /// flush_lsn returns LSN of last durably stored WAL record. fn flush_lsn(&self) -> Lsn { @@ -293,7 +293,7 @@ impl Storage for PhysicalStorage { } /// Write WAL to disk. - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { // Disallow any non-sequential writes, which can result in gaps or overwrites. // If we need to move the pointer, use truncate_wal() instead. if self.write_lsn > startpos { @@ -311,7 +311,7 @@ impl Storage for PhysicalStorage { ); } - let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?; + let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?; // WAL is written, updating write metrics self.metrics.observe_write_seconds(write_seconds); self.metrics.observe_write_bytes(buf.len()); @@ -340,14 +340,14 @@ impl Storage for PhysicalStorage { Ok(()) } - fn flush_wal(&mut self) -> Result<()> { + async fn flush_wal(&mut self) -> Result<()> { if self.flush_record_lsn == self.write_record_lsn { // no need to do extra flush return Ok(()); } if let Some(mut unflushed_file) = self.file.take() { - self.fdatasync_file(&mut unflushed_file)?; + self.fdatasync_file(&mut unflushed_file).await?; self.file = Some(unflushed_file); } else { // We have unflushed data (write_lsn != flush_lsn), but no file. @@ -369,7 +369,7 @@ impl Storage for PhysicalStorage { /// Truncate written WAL by removing all WAL segments after the given LSN. /// end_pos must point to the end of the WAL record. 
- fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { // Streaming must not create a hole, so truncate cannot be called on non-written lsn if self.write_lsn != Lsn(0) && end_pos > self.write_lsn { bail!( @@ -387,27 +387,27 @@ impl Storage for PhysicalStorage { // Close previously opened file, if any if let Some(mut unflushed_file) = self.file.take() { - self.fdatasync_file(&mut unflushed_file)?; + self.fdatasync_file(&mut unflushed_file).await?; } let xlogoff = end_pos.segment_offset(self.wal_seg_size); let segno = end_pos.segment_number(self.wal_seg_size); // Remove all segments after the given LSN. - remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?; + remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?; - let (mut file, is_partial) = self.open_or_create(segno)?; + let (mut file, is_partial) = self.open_or_create(segno).await?; // Fill end with zeroes - file.seek(SeekFrom::Start(xlogoff as u64))?; - write_zeroes(&mut file, self.wal_seg_size - xlogoff)?; - self.fdatasync_file(&mut file)?; + file.seek(SeekFrom::Start(xlogoff as u64)).await?; + write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?; + self.fdatasync_file(&mut file).await?; if !is_partial { // Make segment partial once again let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; - fs::rename(wal_file_path, wal_file_partial_path)?; + fs::rename(wal_file_path, wal_file_partial_path).await?; } // Update LSNs @@ -417,11 +417,11 @@ impl Storage for PhysicalStorage { Ok(()) } - fn remove_up_to(&self) -> Box Result<()>> { + fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> { let timeline_dir = self.timeline_dir.clone(); let wal_seg_size = self.wal_seg_size; - Box::new(move |segno_up_to: XLogSegNo| { - remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to) + Box::pin(async move { + remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await }) } @@ -436,7 +436,7 @@ impl Storage for PhysicalStorage { } /// Remove all WAL segments in timeline_dir that match the given predicate. -fn remove_segments_from_disk( +async fn remove_segments_from_disk( timeline_dir: &Path, wal_seg_size: usize, remove_predicate: impl Fn(XLogSegNo) -> bool, @@ -445,8 +445,8 @@ fn remove_segments_from_disk( let mut min_removed = u64::MAX; let mut max_removed = u64::MIN; - for entry in fs::read_dir(timeline_dir)? { - let entry = entry?; + let mut entries = fs::read_dir(timeline_dir).await?; + while let Some(entry) = entries.next_entry().await? { let entry_path = entry.path(); let fname = entry_path.file_name().unwrap(); @@ -457,7 +457,7 @@ fn remove_segments_from_disk( } let (segno, _) = XLogFromFileName(fname_str, wal_seg_size); if remove_predicate(segno) { - remove_file(entry_path)?; + remove_file(entry_path).await?; n_removed += 1; min_removed = min(min_removed, segno); max_removed = max(max_removed, segno); @@ -689,12 +689,12 @@ impl WalReader { const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; /// Helper for filling file with zeroes. -fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { +async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { while count >= XLOG_BLCKSZ { - file.write_all(ZERO_BLOCK)?; + file.write_all(ZERO_BLOCK).await?; count -= XLOG_BLCKSZ; } - file.write_all(&ZERO_BLOCK[0..count])?; + file.write_all(&ZERO_BLOCK[0..count]).await?; Ok(()) }
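
The locking pattern that the timeline.rs and wal_storage.rs hunks above rely on can be shown in isolation: a tokio::sync::Mutex that may be held across .await points (unlike the parking_lot mutex it replaces), and a removal routine that decides what to delete under the lock but returns the slow work as a BoxFuture that is awaited only after the guard is dropped. The sketch below is illustrative only, not code from this patch: the names (State, Timeline, remove_old_wal) are hypothetical simplifications of the safekeeper's types, and it assumes tokio (with the "full" feature set), futures, and anyhow as dependencies.

use std::time::Duration;

use anyhow::Result;
use futures::future::BoxFuture;
use tokio::sync::{Mutex, MutexGuard};

// Hypothetical stand-in for the safekeeper's SharedState.
struct State {
    last_removed_segno: u64,
}

struct Timeline {
    // tokio's Mutex, unlike parking_lot's, may be held across .await points,
    // which is what allows doing async disk I/O while the state stays locked.
    mutex: Mutex<State>,
}

impl Timeline {
    async fn write_shared_state(&self) -> MutexGuard<'_, State> {
        self.mutex.lock().await
    }

    // Decide what to remove while holding the lock, but run the slow part
    // only after the guard has been dropped.
    async fn remove_old_wal(&self) -> Result<()> {
        let remover: BoxFuture<'static, Result<()>> = {
            let shared_state = self.write_shared_state().await;
            let horizon_segno = shared_state.last_removed_segno + 1;
            Box::pin(async move {
                // Slow filesystem work would happen here, with the lock released.
                tokio::time::sleep(Duration::from_millis(10)).await;
                println!("removed segments up to {}", horizon_segno);
                Ok::<(), anyhow::Error>(())
            })
            // `shared_state` is dropped at the end of this block, before the await.
        };
        remover.await?;

        // Re-take the lock only to record the result.
        self.write_shared_state().await.last_removed_segno += 1;
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<()> {
    let tli = Timeline {
        mutex: Mutex::new(State {
            last_removed_segno: 0,
        }),
    };
    tli.remove_old_wal().await
}

Building the remover future while the lock is held keeps the decision about which segments are removable consistent with the in-memory state, while the actual filesystem work runs with the mutex released, so other tasks that need the same timeline lock are not blocked behind disk I/O.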