From 227271ccad8b6594cf5348cc81dc56ce437dc8e7 Mon Sep 17 00:00:00 2001
From: Arseny Sher
Date: Sat, 29 Apr 2023 03:34:28 +0400
Subject: [PATCH] Switch safekeepers to async.

This is a full switch: fs IO operations are also tokio ones, working
through a thread pool. Similar to the pageserver, we have multiple
runtimes for easier `top` usage and isolation.

Notable points:

- Now that the guts of safekeeper.rs are full of .await's, we need to be
  very careful not to drop a task at a random point, leaving the timeline
  in an unclear state. Currently the only writer is the walreceiver and we
  don't have top-level cancellation there, so we are good. But to be safe
  we should probably add a fuse that panics if a task is dropped while an
  operation on a timeline is in progress.
- The timeline lock is a Tokio one now, as we do disk IO under it.
- Collecting metrics got a crutch: since the prometheus Collector is
  synchronous, it spawns a thread with a current-thread runtime to collect
  the data.
- Anything involving closures becomes significantly more complicated, as
  async fns are already kind of closures and 'async closures are unstable'.
- The main thread now tracks the other main tasks, which got much easier.
- The only sync place left is initial data loading, as otherwise clippy
  complains about the timeline map lock being held across await points --
  which is not bad here, as it happens only in the single-threaded runtime
  of the main thread. But keeping it sync doesn't hurt either.

I'm concerned about the performance of thread pool IO offloading, async
traits and the many await points; but we can try and see how it goes.

fixes https://github.com/neondatabase/neon/issues/3036
fixes https://github.com/neondatabase/neon/issues/3966
---
 libs/utils/src/http/endpoint.rs | 41 +-----
 safekeeper/src/bin/safekeeper.rs | 173 +++++++++++++++----------
 safekeeper/src/broker.rs | 40 +++---
 safekeeper/src/control_file.rs | 99 ++++++++------
 safekeeper/src/debug_dump.rs | 6 +-
 safekeeper/src/handler.rs | 6 +-
 safekeeper/src/http/mod.rs | 15 +++
 safekeeper/src/http/routes.rs | 38 +++---
 safekeeper/src/json_ctrl.rs | 18 +--
 safekeeper/src/lib.rs | 59 ++++++++-
 safekeeper/src/metrics.rs | 42 ++++--
 safekeeper/src/pull_timeline.rs | 2 +-
 safekeeper/src/receive_wal.rs | 64 +++++----
 safekeeper/src/remove_wal.rs | 25 ++--
 safekeeper/src/safekeeper.rs | 116 ++++++++++-------
 safekeeper/src/send_wal.rs | 6 +-
 safekeeper/src/timeline.rs | 171 ++++++++++++------------
 safekeeper/src/timelines_global_map.rs | 27 ++--
 safekeeper/src/wal_backup.rs | 46 +++----
 safekeeper/src/wal_service.rs | 142 +++++++++-----------
 safekeeper/src/wal_storage.rs | 128 +++++++++---------
 21 files changed, 672 insertions(+), 592 deletions(-)

diff --git a/libs/utils/src/http/endpoint.rs b/libs/utils/src/http/endpoint.rs index 7cb96d909458..33241dbdf76e 100644 --- a/libs/utils/src/http/endpoint.rs +++ b/libs/utils/src/http/endpoint.rs @@ -1,19 +1,18 @@ use crate::auth::{Claims, JwtAuth}; use crate::http::error::{api_error_handler, route_error_handler, ApiError}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use hyper::header::{HeaderName, AUTHORIZATION}; use hyper::http::HeaderValue; use hyper::Method; -use hyper::{header::CONTENT_TYPE, Body, Request, Response, Server}; +use hyper::{header::CONTENT_TYPE, Body, Request, Response}; use metrics::{register_int_counter, Encoder, IntCounter, TextEncoder}; use once_cell::sync::Lazy; use routerify::ext::RequestExt; -use routerify::{Middleware, RequestInfo, Router, RouterBuilder, RouterService}; +use routerify::{Middleware, RequestInfo, Router, RouterBuilder}; use 
tokio::task::JoinError; use tracing::{self, debug, info, info_span, warn, Instrument}; use std::future::Future; -use std::net::TcpListener; use std::str::FromStr; static SERVE_METRICS_COUNT: Lazy = Lazy::new(|| { @@ -348,40 +347,6 @@ pub fn check_permission_with( } } -/// -/// Start listening for HTTP requests on given socket. -/// -/// 'shutdown_future' can be used to stop. If the Future becomes -/// ready, we stop listening for new requests, and the function returns. -/// -pub fn serve_thread_main( - router_builder: RouterBuilder, - listener: TcpListener, - shutdown_future: S, -) -> anyhow::Result<()> -where - S: Future + Send + Sync, -{ - info!("Starting an HTTP endpoint at {}", listener.local_addr()?); - - // Create a Service from the router above to handle incoming requests. - let service = RouterService::new(router_builder.build().map_err(|err| anyhow!(err))?).unwrap(); - - // Enter a single-threaded tokio runtime bound to the current thread - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let _guard = runtime.enter(); - - let server = Server::from_tcp(listener)? - .serve(service) - .with_graceful_shutdown(shutdown_future); - - runtime.block_on(server)?; - - Ok(()) -} #[cfg(test)] mod tests { use super::*; diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index fecbb8bd414d..0625538bf316 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -3,15 +3,19 @@ // use anyhow::{bail, Context, Result}; use clap::Parser; +use futures::future::BoxFuture; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; use remote_storage::RemoteStorageConfig; +use tokio::runtime::Handle; +use tokio::signal::unix::{signal, SignalKind}; +use tokio::task::JoinError; use toml_edit::Document; -use utils::signals::ShutdownSignals; use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; -use std::thread; use std::time::Duration; use storage_broker::Uri; use tokio::sync::mpsc; @@ -20,22 +24,21 @@ use tracing::*; use utils::pid_file; use metrics::set_build_info_metric; -use safekeeper::broker; -use safekeeper::control_file; use safekeeper::defaults::{ DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_PG_LISTEN_ADDR, }; -use safekeeper::http; -use safekeeper::remove_wal; -use safekeeper::wal_backup; use safekeeper::wal_service; use safekeeper::GlobalTimelines; use safekeeper::SafeKeeperConf; +use safekeeper::{broker, WAL_SERVICE_RUNTIME}; +use safekeeper::{control_file, BROKER_RUNTIME}; +use safekeeper::{http, WAL_REMOVER_RUNTIME}; +use safekeeper::{remove_wal, WAL_BACKUP_RUNTIME}; +use safekeeper::{wal_backup, HTTP_RUNTIME}; use storage_broker::DEFAULT_ENDPOINT; use utils::auth::JwtAuth; use utils::{ - http::endpoint, id::NodeId, logging::{self, LogFormat}, project_git_version, @@ -104,10 +107,6 @@ struct Args { /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)] max_offloader_lag: u64, - /// Number of threads for wal backup runtime, by default number of cores - /// available to the system. - #[arg(long)] - wal_backup_threads: Option, /// Number of max parallel WAL segments to be offloaded to remote storage. #[arg(long, default_value = "5")] wal_backup_parallel_jobs: usize, @@ -121,9 +120,14 @@ struct Args { /// Format for logging, either 'plain' or 'json'. 
#[arg(long, default_value = "plain")] log_format: String, + /// Run everything in single threaded current thread runtime, might be + /// useful for debugging. + #[arg(long)] + current_thread_runtime: bool, } -fn main() -> anyhow::Result<()> { +#[tokio::main(flavor = "current_thread")] +async fn main() -> anyhow::Result<()> { let args = Args::parse(); if let Some(addr) = args.dump_control_file { @@ -183,10 +187,10 @@ fn main() -> anyhow::Result<()> { heartbeat_timeout: args.heartbeat_timeout, remote_storage: args.remote_storage, max_offloader_lag_bytes: args.max_offloader_lag, - backup_runtime_threads: args.wal_backup_threads, wal_backup_enabled: !args.disable_wal_backup, backup_parallel_jobs: args.wal_backup_parallel_jobs, auth, + current_thread_runtime: args.current_thread_runtime, }; // initialize sentry if SENTRY_DSN is provided @@ -194,10 +198,14 @@ fn main() -> anyhow::Result<()> { Some(GIT_VERSION.into()), &[("node_id", &conf.my_id.to_string())], ); - start_safekeeper(conf) + start_safekeeper(conf).await } -fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { +/// Result of joining any of main tasks: upper error means task failed to +/// complete, e.g. panicked, inner is error produced by task itself. +type JoinTaskRes = Result, JoinError>; + +async fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // Prevent running multiple safekeepers on the same directory let lock_file_path = conf.workdir.join(PID_FILE_NAME); let lock_file = @@ -208,14 +216,18 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // we need to release the lock file only when the current process is gone std::mem::forget(lock_file); - let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { - error!("failed to bind to address {}: {}", conf.listen_http_addr, e); + info!("starting safekeeper WAL service on {}", conf.listen_pg_addr); + let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); e })?; - info!("starting safekeeper on {}", conf.listen_pg_addr); - let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| { - error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); + info!( + "starting safekeeper HTTP service on {}", + conf.listen_http_addr + ); + let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { + error!("failed to bind to address {}: {}", conf.listen_http_addr, e); e })?; @@ -224,71 +236,88 @@ fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { let timeline_collector = safekeeper::metrics::TimelineCollector::new(); metrics::register_internal(Box::new(timeline_collector))?; - let mut threads = vec![]; let (wal_backup_launcher_tx, wal_backup_launcher_rx) = mpsc::channel(100); // Load all timelines from disk to memory. GlobalTimelines::init(conf.clone(), wal_backup_launcher_tx)?; - let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("http_endpoint_thread".into()) - .spawn(|| { - let router = http::make_router(conf_); - endpoint::serve_thread_main( - router, - http_listener, - std::future::pending(), // never shut down - ) - .unwrap(); - })?, - ); + // Keep handles to main tasks to die if any of them disappears. 
+ let mut tasks_handles: FuturesUnordered> = + FuturesUnordered::new(); - let conf_cloned = conf.clone(); - let safekeeper_thread = thread::Builder::new() - .name("WAL service thread".into()) - .spawn(|| wal_service::thread_main(conf_cloned, pg_listener)) - .unwrap(); + let conf_ = conf.clone(); + // Run everything in current thread rt, if asked. + if conf.current_thread_runtime { + info!("running in current thread runtime"); + } + let current_thread_rt = conf + .current_thread_runtime + .then(|| Handle::try_current().expect("no runtime in main")); + let wal_service_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_SERVICE_RUNTIME.handle()) + .spawn(wal_service::task_main(conf_, pg_listener)) + // wrap with task name for error reporting + .map(|res| ("WAL service main".to_owned(), res)); + tasks_handles.push(Box::pin(wal_service_handle)); - threads.push(safekeeper_thread); + let conf_ = conf.clone(); + let http_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| HTTP_RUNTIME.handle()) + .spawn(http::task_main(conf_, http_listener)) + .map(|res| ("HTTP service main".to_owned(), res)); + tasks_handles.push(Box::pin(http_handle)); let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("broker thread".into()) - .spawn(|| { - broker::thread_main(conf_); - })?, - ); + let broker_task_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| BROKER_RUNTIME.handle()) + .spawn(broker::task_main(conf_).instrument(info_span!("broker"))) + .map(|res| ("broker main".to_owned(), res)); + tasks_handles.push(Box::pin(broker_task_handle)); let conf_ = conf.clone(); - threads.push( - thread::Builder::new() - .name("WAL removal thread".into()) - .spawn(|| { - remove_wal::thread_main(conf_); - })?, - ); + let wal_remover_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_REMOVER_RUNTIME.handle()) + .spawn(remove_wal::task_main(conf_)) + .map(|res| ("WAL remover".to_owned(), res)); + tasks_handles.push(Box::pin(wal_remover_handle)); - threads.push( - thread::Builder::new() - .name("WAL backup launcher thread".into()) - .spawn(move || { - wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx); - })?, - ); + let conf_ = conf.clone(); + let wal_backup_handle = current_thread_rt + .as_ref() + .unwrap_or_else(|| WAL_BACKUP_RUNTIME.handle()) + .spawn(wal_backup::wal_backup_launcher_task_main( + conf_, + wal_backup_launcher_rx, + )) + .map(|res| ("WAL backup launcher".to_owned(), res)); + tasks_handles.push(Box::pin(wal_backup_handle)); set_build_info_metric(GIT_VERSION); - // TODO: put more thoughts into handling of failed threads - // We should catch & die if they are in trouble. - - // On any shutdown signal, log receival and exit. Additionally, handling - // SIGQUIT prevents coredump. - ShutdownSignals::handle(|signal| { - info!("received {}, terminating", signal.name()); - std::process::exit(0); - }) + + // TODO: update tokio-stream, convert to real async Stream with + // SignalStream, map it to obtain missing signal name, combine streams into + // single stream we can easily sit on. + let mut sigquit_stream = signal(SignalKind::quit())?; + let mut sigint_stream = signal(SignalKind::interrupt())?; + let mut sigterm_stream = signal(SignalKind::terminate())?; + + tokio::select! { + Some((task_name, res)) = tasks_handles.next()=> { + error!("{} task failed: {:?}, exiting", task_name, res); + std::process::exit(1); + } + // On any shutdown signal, log receival and exit. Additionally, handling + // SIGQUIT prevents coredump. 
+ _ = sigquit_stream.recv() => info!("received SIGQUIT, terminating"), + _ = sigint_stream.recv() => info!("received SIGINT, terminating"), + _ = sigterm_stream.recv() => info!("received SIGTERM, terminating") + + }; + std::process::exit(0); } /// Determine safekeeper id. diff --git a/safekeeper/src/broker.rs b/safekeeper/src/broker.rs index 3282afc72d75..2b1db2714b10 100644 --- a/safekeeper/src/broker.rs +++ b/safekeeper/src/broker.rs @@ -16,7 +16,7 @@ use storage_broker::Request; use std::time::Duration; use std::time::Instant; use tokio::task::JoinHandle; -use tokio::{runtime, time::sleep}; +use tokio::time::sleep; use tracing::*; use crate::metrics::BROKER_ITERATION_TIMELINES; @@ -29,20 +29,6 @@ use crate::SafeKeeperConf; const RETRY_INTERVAL_MSEC: u64 = 1000; const PUSH_INTERVAL_MSEC: u64 = 1000; -pub fn thread_main(conf: SafeKeeperConf) { - let runtime = runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let _enter = info_span!("broker").entered(); - info!("started, broker endpoint {:?}", conf.broker_endpoint); - - runtime.block_on(async { - main_loop(conf).await; - }); -} - /// Push once in a while data about all active timelines to the broker. async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { let mut client = @@ -56,20 +42,27 @@ async fn push_loop(conf: SafeKeeperConf) -> anyhow::Result<()> { // sensitive and there is no risk of deadlock as we don't await while // lock is held. let now = Instant::now(); - let mut active_tlis = GlobalTimelines::get_all(); - active_tlis.retain(|tli| tli.is_active()); - for tli in &active_tlis { - let sk_info = tli.get_safekeeper_info(&conf); + let all_tlis = GlobalTimelines::get_all(); + let mut n_pushed_tlis = 0; + for tli in &all_tlis { + // filtering alternative futures::stream::iter(all_tlis) + // .filter(|tli| {let tli = tli.clone(); async move { tli.is_active().await}}).collect::>().await; + // doesn't look better, and I'm not sure how to do that without collect. + if !tli.is_active().await { + continue; + } + let sk_info = tli.get_safekeeper_info(&conf).await; yield sk_info; BROKER_PUSHED_UPDATES.inc(); + n_pushed_tlis += 1; } let elapsed = now.elapsed(); BROKER_PUSH_ALL_UPDATES_SECONDS.observe(elapsed.as_secs_f64()); - BROKER_ITERATION_TIMELINES.observe(active_tlis.len() as f64); + BROKER_ITERATION_TIMELINES.observe(n_pushed_tlis as f64); if elapsed > push_interval / 2 { - info!("broker push is too long, pushed {} timeline updates to broker in {:?}", active_tlis.len(), elapsed); + info!("broker push is too long, pushed {} timeline updates to broker in {:?}", n_pushed_tlis, elapsed); } sleep(push_interval).await; @@ -126,10 +119,13 @@ async fn pull_loop(conf: SafeKeeperConf) -> Result<()> { bail!("end of stream"); } -async fn main_loop(conf: SafeKeeperConf) { +pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { + info!("started, broker endpoint {:?}", conf.broker_endpoint); + let mut ticker = tokio::time::interval(Duration::from_millis(RETRY_INTERVAL_MSEC)); let mut push_handle: Option>> = None; let mut pull_handle: Option>> = None; + // Selecting on JoinHandles requires some squats; is there a better way to // reap tasks individually? 
diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index b1b0c032d7c6..6c4ad24323d2 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -2,9 +2,10 @@ use anyhow::{bail, ensure, Context, Result}; use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt}; +use tokio::fs::{self, File}; +use tokio::io::AsyncWriteExt; -use std::fs::{self, File, OpenOptions}; -use std::io::{Read, Write}; +use std::io::Read; use std::ops::Deref; use std::path::{Path, PathBuf}; use std::time::Instant; @@ -26,9 +27,10 @@ pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); /// Storage should keep actual state inside of it. It should implement Deref /// trait to access state fields and have persist method for updating that state. +#[async_trait::async_trait] pub trait Storage: Deref { /// Persist safekeeper state on disk and update internal state. - fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; + async fn persist(&mut self, s: &SafeKeeperState) -> Result<()>; /// Timestamp of last persist. fn last_persist_at(&self) -> Instant; @@ -82,7 +84,7 @@ impl FileStorage { /// Check the magic/version in the on-disk data and deserialize it, if possible. fn deser_sk_state(buf: &mut &[u8]) -> Result { // Read the version independent part - let magic = buf.read_u32::()?; + let magic = ReadBytesExt::read_u32::(buf)?; if magic != SK_MAGIC { bail!( "bad control file magic: {:X}, expected {:X}", @@ -90,7 +92,7 @@ impl FileStorage { SK_MAGIC ); } - let version = buf.read_u32::()?; + let version = ReadBytesExt::read_u32::(buf)?; if version == SK_FORMAT_VERSION { let res = SafeKeeperState::des(buf)?; return Ok(res); @@ -110,7 +112,7 @@ impl FileStorage { /// Read in the control file. pub fn load_control_file>(control_file_path: P) -> Result { - let mut control_file = OpenOptions::new() + let mut control_file = std::fs::OpenOptions::new() .read(true) .write(true) .open(&control_file_path) @@ -159,30 +161,31 @@ impl Deref for FileStorage { } } +#[async_trait::async_trait] impl Storage for FileStorage { /// persists state durably to underlying storage /// for description see https://lwn.net/Articles/457667/ - fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { let _timer = PERSIST_CONTROL_FILE_SECONDS.start_timer(); // write data to safekeeper.control.partial let control_partial_path = self.timeline_dir.join(CONTROL_FILE_NAME_PARTIAL); - let mut control_partial = File::create(&control_partial_path).with_context(|| { + let mut control_partial = File::create(&control_partial_path).await.with_context(|| { format!( "failed to create partial control file at: {}", &control_partial_path.display() ) })?; let mut buf: Vec = Vec::new(); - buf.write_u32::(SK_MAGIC)?; - buf.write_u32::(SK_FORMAT_VERSION)?; + WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; + WriteBytesExt::write_u32::(&mut buf, SK_FORMAT_VERSION)?; s.ser_into(&mut buf)?; // calculate checksum before resize let checksum = crc32c::crc32c(&buf); buf.extend_from_slice(&checksum.to_le_bytes()); - control_partial.write_all(&buf).with_context(|| { + control_partial.write_all(&buf).await.with_context(|| { format!( "failed to write safekeeper state into control file at: {}", control_partial_path.display() @@ -191,7 +194,7 @@ impl Storage for FileStorage { // fsync the file if !self.conf.no_sync { - control_partial.sync_all().with_context(|| { + control_partial.sync_all().await.with_context(|| { format!( "failed to sync partial control file at {}", 
control_partial_path.display() @@ -202,21 +205,22 @@ impl Storage for FileStorage { let control_path = self.timeline_dir.join(CONTROL_FILE_NAME); // rename should be atomic - fs::rename(&control_partial_path, &control_path)?; + fs::rename(&control_partial_path, &control_path).await?; // this sync is not required by any standard but postgres does this (see durable_rename) if !self.conf.no_sync { - File::open(&control_path) - .and_then(|f| f.sync_all()) - .with_context(|| { - format!( - "failed to sync control file at: {}", - &control_path.display() - ) - })?; + let new_f = File::open(&control_path).await?; + new_f.sync_all().await.with_context(|| { + format!( + "failed to sync control file at: {}", + &control_path.display() + ) + })?; // fsync the directory (linux specific) - File::open(&self.timeline_dir) - .and_then(|f| f.sync_all()) + let tli_dir = File::open(&self.timeline_dir).await?; + tli_dir + .sync_all() + .await .context("failed to sync control file directory")?; } @@ -236,7 +240,6 @@ mod test { use super::*; use crate::{safekeeper::SafeKeeperState, SafeKeeperConf}; use anyhow::Result; - use std::fs; use utils::{id::TenantTimelineId, lsn::Lsn}; fn stub_conf() -> SafeKeeperConf { @@ -247,59 +250,75 @@ mod test { } } - fn load_from_control_file( + async fn load_from_control_file( conf: &SafeKeeperConf, ttid: &TenantTimelineId, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir"); + fs::create_dir_all(conf.timeline_dir(ttid)) + .await + .expect("failed to create timeline dir"); Ok(( FileStorage::restore_new(ttid, conf)?, FileStorage::load_control_file_conf(conf, ttid)?, )) } - fn create( + async fn create( conf: &SafeKeeperConf, ttid: &TenantTimelineId, ) -> Result<(FileStorage, SafeKeeperState)> { - fs::create_dir_all(conf.timeline_dir(ttid)).expect("failed to create timeline dir"); + fs::create_dir_all(conf.timeline_dir(ttid)) + .await + .expect("failed to create timeline dir"); let state = SafeKeeperState::empty(); let storage = FileStorage::create_new(ttid, conf, state.clone())?; Ok((storage, state)) } - #[test] - fn test_read_write_safekeeper_state() { + #[tokio::test] + async fn test_read_write_safekeeper_state() { let conf = stub_conf(); let ttid = TenantTimelineId::generate(); { - let (mut storage, mut state) = create(&conf, &ttid).expect("failed to create state"); + let (mut storage, mut state) = + create(&conf, &ttid).await.expect("failed to create state"); // change something state.commit_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); + storage + .persist(&state) + .await + .expect("failed to persist state"); } - let (_, state) = load_from_control_file(&conf, &ttid).expect("failed to read state"); + let (_, state) = load_from_control_file(&conf, &ttid) + .await + .expect("failed to read state"); assert_eq!(state.commit_lsn, Lsn(42)); } - #[test] - fn test_safekeeper_state_checksum_mismatch() { + #[tokio::test] + async fn test_safekeeper_state_checksum_mismatch() { let conf = stub_conf(); let ttid = TenantTimelineId::generate(); { - let (mut storage, mut state) = create(&conf, &ttid).expect("failed to read state"); + let (mut storage, mut state) = + create(&conf, &ttid).await.expect("failed to read state"); // change something state.commit_lsn = Lsn(42); - storage.persist(&state).expect("failed to persist state"); + storage + .persist(&state) + .await + .expect("failed to persist state"); } let control_path = conf.timeline_dir(&ttid).join(CONTROL_FILE_NAME); - let mut 
data = fs::read(&control_path).unwrap(); + let mut data = fs::read(&control_path).await.unwrap(); data[0] += 1; // change the first byte of the file to fail checksum validation - fs::write(&control_path, &data).expect("failed to write control file"); + fs::write(&control_path, &data) + .await + .expect("failed to write control file"); - match load_from_control_file(&conf, &ttid) { + match load_from_control_file(&conf, &ttid).await { Err(err) => assert!(err .to_string() .contains("safekeeper control file checksum mismatch")), diff --git a/safekeeper/src/debug_dump.rs b/safekeeper/src/debug_dump.rs index f711c4429d7f..387b577a1370 100644 --- a/safekeeper/src/debug_dump.rs +++ b/safekeeper/src/debug_dump.rs @@ -121,7 +121,7 @@ pub struct FileInfo { } /// Build debug dump response, using the provided [`Args`] filters. -pub fn build(args: Args) -> Result { +pub async fn build(args: Args) -> Result { let start_time = Utc::now(); let timelines_count = GlobalTimelines::timelines_count(); @@ -155,7 +155,7 @@ pub fn build(args: Args) -> Result { } let control_file = if args.dump_control_file { - let mut state = tli.get_state().1; + let mut state = tli.get_state().await.1; if !args.dump_term_history { state.acceptor_state.term_history = TermHistory(vec![]); } @@ -165,7 +165,7 @@ pub fn build(args: Args) -> Result { }; let memory = if args.dump_memory { - Some(tli.memory_dump()) + Some(tli.memory_dump().await) } else { None }; diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index 7d25ced4493c..1367d5eebbc3 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -256,14 +256,14 @@ impl SafekeeperPostgresHandler { let lsn = if self.is_walproposer_recovery() { // walproposer should get all local WAL until flush_lsn - tli.get_flush_lsn() + tli.get_flush_lsn().await } else { // other clients shouldn't get any uncommitted WAL - tli.get_state().0.commit_lsn + tli.get_state().await.0.commit_lsn } .to_string(); - let sysid = tli.get_state().1.server.system_id.to_string(); + let sysid = tli.get_state().await.1.server.system_id.to_string(); let lsn_bytes = lsn.as_bytes(); let tli = PG_TLI.to_string(); let tli_bytes = tli.as_bytes(); diff --git a/safekeeper/src/http/mod.rs b/safekeeper/src/http/mod.rs index 1831470007f6..2a9570595f8a 100644 --- a/safekeeper/src/http/mod.rs +++ b/safekeeper/src/http/mod.rs @@ -2,3 +2,18 @@ pub mod routes; pub use routes::make_router; pub use safekeeper_api::models; + +use crate::SafeKeeperConf; + +pub async fn task_main( + conf: SafeKeeperConf, + http_listener: std::net::TcpListener, +) -> anyhow::Result<()> { + let router = make_router(conf) + .build() + .map_err(|err| anyhow::anyhow!(err))?; + let service = utils::http::RouterService::new(router).unwrap(); + let server = hyper::Server::from_tcp(http_listener)?; + server.serve(service).await?; + Ok(()) // unreachable +} diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index a498d868afc8..b26da55be511 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -13,7 +13,6 @@ use storage_broker::proto::SafekeeperTimelineInfo; use storage_broker::proto::TenantTimelineId as ProtoTenantTimelineId; use tokio::fs::File; use tokio::io::AsyncReadExt; -use tokio::task::JoinError; use crate::safekeeper::ServerInfo; use crate::safekeeper::Term; @@ -116,8 +115,8 @@ async fn timeline_status_handler(request: Request) -> Result) -> Result timeline_id, }; - let resp = tokio::task::spawn_blocking(move || { - 
debug_dump::build(args).map_err(ApiError::InternalServerError) - }) - .await - .map_err(|e: JoinError| ApiError::InternalServerError(e.into()))??; + let resp = debug_dump::build(args) + .await + .map_err(ApiError::InternalServerError)?; // TODO: use streaming response json_response(StatusCode::OK, resp) diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs index dc9188723e4e..14d0cc3653fe 100644 --- a/safekeeper/src/json_ctrl.rs +++ b/safekeeper/src/json_ctrl.rs @@ -73,12 +73,12 @@ pub async fn handle_json_ctrl( // if send_proposer_elected is true, we need to update local history if append_request.send_proposer_elected { - send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn)?; + send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?; } - let inserted_wal = append_logical_message(&tli, append_request)?; + let inserted_wal = append_logical_message(&tli, append_request).await?; let response = AppendResult { - state: tli.get_state().1, + state: tli.get_state().await.1, inserted_wal, }; let response_data = serde_json::to_vec(&response) @@ -114,9 +114,9 @@ async fn prepare_safekeeper( .await } -fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::Result<()> { +async fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::Result<()> { // add new term to existing history - let history = tli.get_state().1.acceptor_state.term_history; + let history = tli.get_state().await.1.acceptor_state.term_history; let history = history.up_to(lsn.checked_sub(1u64).unwrap()); let mut history_entries = history.0; history_entries.push(TermSwitchEntry { term, lsn }); @@ -129,7 +129,7 @@ fn send_proposer_elected(tli: &Arc, term: Term, lsn: Lsn) -> anyhow::R timeline_start_lsn: lsn, }); - tli.process_msg(&proposer_elected_request)?; + tli.process_msg(&proposer_elected_request).await?; Ok(()) } @@ -142,12 +142,12 @@ pub struct InsertedWAL { /// Extend local WAL with new LogicalMessage record. To do that, /// create AppendRequest with new WAL and pass it to safekeeper. 
-pub fn append_logical_message( +pub async fn append_logical_message( tli: &Arc, msg: &AppendLogicalMessage, ) -> anyhow::Result { let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); - let sk_state = tli.get_state().1; + let sk_state = tli.get_state().await.1; let begin_lsn = msg.begin_lsn; let end_lsn = begin_lsn + wal_data.len() as u64; @@ -171,7 +171,7 @@ pub fn append_logical_message( wal_data: Bytes::from(wal_data), }); - let response = tli.process_msg(&append_request)?; + let response = tli.process_msg(&append_request).await?; let append_response = match response { Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 22d6d57e19b6..b8e110136948 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,4 +1,6 @@ +use once_cell::sync::Lazy; use remote_storage::RemoteStorageConfig; +use tokio::runtime::Runtime; use std::path::PathBuf; use std::time::Duration; @@ -36,7 +38,6 @@ pub mod defaults { DEFAULT_PG_LISTEN_PORT, }; - pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms"; pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20); } @@ -60,10 +61,10 @@ pub struct SafeKeeperConf { pub heartbeat_timeout: Duration, pub remote_storage: Option, pub max_offloader_lag_bytes: u64, - pub backup_runtime_threads: Option, pub backup_parallel_jobs: usize, pub wal_backup_enabled: bool, pub auth: Option>, + pub current_thread_runtime: bool, } impl SafeKeeperConf { @@ -92,12 +93,64 @@ impl SafeKeeperConf { .parse() .expect("failed to parse default broker endpoint"), broker_keepalive_interval: Duration::from_secs(5), - backup_runtime_threads: None, wal_backup_enabled: true, backup_parallel_jobs: 1, auth: None, heartbeat_timeout: Duration::new(5, 0), max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES, + current_thread_runtime: false, } } } + +// Tokio runtimes. 
+pub static WAL_SERVICE_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("WAL service worker") + .enable_all() + .build() + .expect("Failed to create WAL service runtime") +}); + +pub static HTTP_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("HTTP worker") + .enable_all() + .build() + .expect("Failed to create WAL service runtime") +}); + +pub static BROKER_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("broker worker") + .worker_threads(2) // there are only 2 tasks, having more threads doesn't make sense + .enable_all() + .build() + .expect("Failed to create broker runtime") +}); + +pub static WAL_REMOVER_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("WAL remover") + .worker_threads(1) + .enable_all() + .build() + .expect("Failed to create broker runtime") +}); + +pub static WAL_BACKUP_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("WAL backup worker") + .enable_all() + .build() + .expect("Failed to create WAL backup runtime") +}); + +pub static METRICS_SHIFTER_RUNTIME: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_multi_thread() + .thread_name("metric shifter") + .worker_threads(1) + .enable_all() + .build() + .expect("Failed to create broker runtime") +}); diff --git a/safekeeper/src/metrics.rs b/safekeeper/src/metrics.rs index 235a88501d20..0711beb2905d 100644 --- a/safekeeper/src/metrics.rs +++ b/safekeeper/src/metrics.rs @@ -7,6 +7,7 @@ use std::{ use ::metrics::{register_histogram, GaugeVec, Histogram, IntGauge, DISK_WRITE_SECONDS_BUCKETS}; use anyhow::Result; +use futures::Future; use metrics::{ core::{AtomicU64, Collector, Desc, GenericCounter, GenericGaugeVec, Opts}, proto::MetricFamily, @@ -292,14 +293,17 @@ impl WalStorageMetrics { } } -/// Accepts a closure that returns a result, and returns the duration of the closure. -pub fn time_io_closure(closure: impl FnOnce() -> Result<()>) -> Result { +/// Accepts async function that returns empty anyhow result, and returns the duration of its execution. +pub async fn time_io_closure>( + closure: impl Future>, +) -> Result { let start = std::time::Instant::now(); - closure()?; + closure.await.map_err(|e| e.into())?; Ok(start.elapsed().as_secs_f64()) } /// Metrics for a single timeline. +#[derive(Clone)] pub struct FullTimelineInfo { pub ttid: TenantTimelineId, pub ps_feedback: PageserverFeedback, @@ -575,13 +579,19 @@ impl Collector for TimelineCollector { let timelines = GlobalTimelines::get_all(); let timelines_count = timelines.len(); - for arc_tli in timelines { - let tli = arc_tli.info_for_metrics(); - if tli.is_none() { - continue; - } - let tli = tli.unwrap(); - + // Prometheus Collector is sync, and data is stored under async lock. To + // bridge the gap with a crutch, collect data in spawned thread with + // local tokio runtime. 
+ let infos = std::thread::spawn(|| { + let rt = tokio::runtime::Builder::new_current_thread() + .build() + .expect("failed to create rt"); + rt.block_on(collect_timeline_metrics()) + }) + .join() + .expect("collect_timeline_metrics thread panicked"); + + for tli in &infos { let tenant_id = tli.ttid.tenant_id.to_string(); let timeline_id = tli.ttid.timeline_id.to_string(); let labels = &[tenant_id.as_str(), timeline_id.as_str()]; @@ -682,3 +692,15 @@ impl Collector for TimelineCollector { mfs } } + +async fn collect_timeline_metrics() -> Vec { + let mut res = vec![]; + let timelines = GlobalTimelines::get_all(); + + for tli in timelines { + if let Some(info) = tli.info_for_metrics().await { + res.push(info); + } + } + res +} diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 344b760fd3cb..61ba37efaa22 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -231,7 +231,7 @@ async fn pull_timeline(status: TimelineStatus, host: String) -> Result info!( "Loaded timeline {}, flush_lsn={}", ttid, - tli.get_flush_lsn() + tli.get_flush_lsn().await ); Ok(Response { diff --git a/safekeeper/src/receive_wal.rs b/safekeeper/src/receive_wal.rs index 195470e3ca91..a5e99c5f0a30 100644 --- a/safekeeper/src/receive_wal.rs +++ b/safekeeper/src/receive_wal.rs @@ -18,15 +18,14 @@ use postgres_backend::QueryError; use pq_proto::BeMessage; use std::net::SocketAddr; use std::sync::Arc; -use std::thread; -use std::thread::JoinHandle; use tokio::io::AsyncRead; use tokio::io::AsyncWrite; use tokio::sync::mpsc::channel; use tokio::sync::mpsc::error::TryRecvError; use tokio::sync::mpsc::Receiver; use tokio::sync::mpsc::Sender; -use tokio::task::spawn_blocking; +use tokio::task; +use tokio::task::JoinHandle; use tokio::time::Duration; use tokio::time::Instant; use tracing::*; @@ -97,7 +96,7 @@ impl SafekeeperPostgresHandler { Err(res.expect_err("no error with WalAcceptor not spawn")) } Some(handle) => { - let wal_acceptor_res = handle.join(); + let wal_acceptor_res = handle.await; // If there was any network error, return it. 
res?; @@ -107,7 +106,7 @@ impl SafekeeperPostgresHandler { Ok(Ok(_)) => Ok(()), // can't happen currently; would be if we add graceful termination Ok(Err(e)) => Err(CopyStreamHandlerEnd::Other(e.context("WAL acceptor"))), Err(_) => Err(CopyStreamHandlerEnd::Other(anyhow!( - "WalAcceptor thread panicked", + "WalAcceptor task panicked", ))), } } @@ -154,10 +153,12 @@ impl<'a, IO: AsyncRead + AsyncWrite + Unpin> NetworkReader<'a, IO> { } }; - *self.acceptor_handle = Some( - WalAcceptor::spawn(tli.clone(), msg_rx, reply_tx, self.conn_id) - .context("spawn WalAcceptor thread")?, - ); + *self.acceptor_handle = Some(WalAcceptor::spawn( + tli.clone(), + msg_rx, + reply_tx, + self.conn_id, + )); // Forward all messages to WalAcceptor read_network_loop(self.pgb_reader, msg_tx, next_msg).await @@ -226,28 +227,19 @@ impl WalAcceptor { msg_rx: Receiver, reply_tx: Sender, conn_id: ConnectionId, - ) -> anyhow::Result>> { - let thread_name = format!("WAL acceptor {}", tli.ttid); - thread::Builder::new() - .name(thread_name) - .spawn(move || -> anyhow::Result<()> { - let mut wa = WalAcceptor { - tli, - msg_rx, - reply_tx, - }; + ) -> JoinHandle> { + task::spawn(async move { + let mut wa = WalAcceptor { + tli, + msg_rx, + reply_tx, + }; - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - - let span_ttid = wa.tli.ttid; // satisfy borrow checker - runtime.block_on( - wa.run() - .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)), - ) - }) - .map_err(anyhow::Error::from) + let span_ttid = wa.tli.ttid; // satisfy borrow checker + wa.run() + .instrument(info_span!("WAL acceptor", cid = %conn_id, ttid = %span_ttid)) + .await + }) } /// The main loop. Returns Ok(()) if either msg_rx or reply_tx got closed; @@ -281,7 +273,7 @@ impl WalAcceptor { while let ProposerAcceptorMessage::AppendRequest(append_request) = next_msg { let noflush_msg = ProposerAcceptorMessage::NoFlushAppendRequest(append_request); - if let Some(reply) = self.tli.process_msg(&noflush_msg)? { + if let Some(reply) = self.tli.process_msg(&noflush_msg).await? { if self.reply_tx.send(reply).await.is_err() { return Ok(()); // chan closed, streaming terminated } @@ -300,10 +292,12 @@ impl WalAcceptor { } // flush all written WAL to the disk - self.tli.process_msg(&ProposerAcceptorMessage::FlushWAL)? + self.tli + .process_msg(&ProposerAcceptorMessage::FlushWAL) + .await? } else { // process message other than AppendRequest - self.tli.process_msg(&next_msg)? + self.tli.process_msg(&next_msg).await? }; if let Some(reply) = reply_msg { @@ -326,8 +320,8 @@ impl Drop for ComputeConnectionGuard { let tli = self.timeline.clone(); // tokio forbids to call blocking_send inside the runtime, and see // comments in on_compute_disconnect why we call blocking_send. - spawn_blocking(move || { - if let Err(e) = tli.on_compute_disconnect() { + tokio::spawn(async move { + if let Err(e) = tli.on_compute_disconnect().await { error!("failed to unregister compute connection: {}", e); } }); diff --git a/safekeeper/src/remove_wal.rs b/safekeeper/src/remove_wal.rs index ad9d655fae02..3306f0b63a39 100644 --- a/safekeeper/src/remove_wal.rs +++ b/safekeeper/src/remove_wal.rs @@ -1,29 +1,36 @@ //! Thread removing old WAL. 
-use std::{thread, time::Duration}; +use std::time::Duration; +use tokio::time::sleep; use tracing::*; use crate::{GlobalTimelines, SafeKeeperConf}; -pub fn thread_main(conf: SafeKeeperConf) { +pub async fn task_main(conf: SafeKeeperConf) -> anyhow::Result<()> { let wal_removal_interval = Duration::from_millis(5000); loop { let tlis = GlobalTimelines::get_all(); for tli in &tlis { - if !tli.is_active() { + if !tli.is_active().await { continue; } let ttid = tli.ttid; - let _enter = - info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id).entered(); - if let Err(e) = tli.maybe_pesist_control_file() { + if let Err(e) = tli + .maybe_persist_control_file() + .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id)) + .await + { warn!("failed to persist control file: {e}"); } - if let Err(e) = tli.remove_old_wal(conf.wal_backup_enabled) { - warn!("failed to remove WAL: {}", e); + if let Err(e) = tli + .remove_old_wal(conf.wal_backup_enabled) + .instrument(info_span!("", tenant = %ttid.tenant_id, timeline = %ttid.timeline_id)) + .await + { + error!("failed to remove WAL: {}", e); } } - thread::sleep(wal_removal_interval) + sleep(wal_removal_interval).await; } } diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 7378ccb994f5..d0b14a1282ef 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -568,25 +568,27 @@ where /// Process message from proposer and possibly form reply. Concurrent /// callers must exclude each other. - pub fn process_msg( + pub async fn process_msg( &mut self, msg: &ProposerAcceptorMessage, ) -> Result> { match msg { - ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg), - ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg), - ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg), - ProposerAcceptorMessage::AppendRequest(msg) => self.handle_append_request(msg, true), + ProposerAcceptorMessage::Greeting(msg) => self.handle_greeting(msg).await, + ProposerAcceptorMessage::VoteRequest(msg) => self.handle_vote_request(msg).await, + ProposerAcceptorMessage::Elected(msg) => self.handle_elected(msg).await, + ProposerAcceptorMessage::AppendRequest(msg) => { + self.handle_append_request(msg, true).await + } ProposerAcceptorMessage::NoFlushAppendRequest(msg) => { - self.handle_append_request(msg, false) + self.handle_append_request(msg, false).await } - ProposerAcceptorMessage::FlushWAL => self.handle_flush(), + ProposerAcceptorMessage::FlushWAL => self.handle_flush().await, } } /// Handle initial message from proposer: check its sanity and send my /// current term. - fn handle_greeting( + async fn handle_greeting( &mut self, msg: &ProposerGreeting, ) -> Result> { @@ -649,7 +651,7 @@ where if msg.pg_version != UNKNOWN_SERVER_VERSION { state.server.pg_version = msg.pg_version; } - self.state.persist(&state)?; + self.state.persist(&state).await?; } info!( @@ -664,7 +666,7 @@ where } /// Give vote for the given term, if we haven't done that previously. - fn handle_vote_request( + async fn handle_vote_request( &mut self, msg: &VoteRequest, ) -> Result> { @@ -678,7 +680,7 @@ where // handle_elected instead. Currently not a big deal, as proposer is the // only source of WAL; with peer2peer recovery it would be more // important. 
- self.wal_store.flush_wal()?; + self.wal_store.flush_wal().await?; // initialize with refusal let mut resp = VoteResponse { term: self.state.acceptor_state.term, @@ -692,7 +694,7 @@ where let mut state = self.state.clone(); state.acceptor_state.term = msg.term; // persist vote before sending it out - self.state.persist(&state)?; + self.state.persist(&state).await?; resp.term = self.state.acceptor_state.term; resp.vote_given = true as u64; @@ -715,12 +717,15 @@ where ar } - fn handle_elected(&mut self, msg: &ProposerElected) -> Result> { + async fn handle_elected( + &mut self, + msg: &ProposerElected, + ) -> Result> { info!("received ProposerElected {:?}", msg); if self.state.acceptor_state.term < msg.term { let mut state = self.state.clone(); state.acceptor_state.term = msg.term; - self.state.persist(&state)?; + self.state.persist(&state).await?; } // If our term is higher, ignore the message (next feedback will inform the compute) @@ -750,7 +755,7 @@ where // intersection of our history and history from msg // truncate wal, update the LSNs - self.wal_store.truncate_wal(msg.start_streaming_at)?; + self.wal_store.truncate_wal(msg.start_streaming_at).await?; // and now adopt term history from proposer { @@ -784,7 +789,7 @@ where self.inmem.backup_lsn = max(self.inmem.backup_lsn, state.timeline_start_lsn); state.acceptor_state.term_history = msg.term_history.clone(); - self.persist_control_file(state)?; + self.persist_control_file(state).await?; } info!("start receiving WAL since {:?}", msg.start_streaming_at); @@ -796,7 +801,7 @@ where /// /// Note: it is assumed that 'WAL we have is from the right term' check has /// already been done outside. - fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { + async fn update_commit_lsn(&mut self, mut candidate: Lsn) -> Result<()> { // Both peers and walproposer communicate this value, we might already // have a fresher (higher) version. candidate = max(candidate, self.inmem.commit_lsn); @@ -818,29 +823,32 @@ where // that we receive new epoch_start_lsn, and we still need to sync // control file in this case. if commit_lsn == self.epoch_start_lsn && self.state.commit_lsn != commit_lsn { - self.persist_control_file(self.state.clone())?; + self.persist_control_file(self.state.clone()).await?; } Ok(()) } /// Persist control file to disk, called only after timeline creation (bootstrap). - pub fn persist(&mut self) -> Result<()> { - self.persist_control_file(self.state.clone()) + pub async fn persist(&mut self) -> Result<()> { + self.persist_control_file(self.state.clone()).await } /// Persist in-memory state to the disk, taking other data from state. - fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { + async fn persist_control_file(&mut self, mut state: SafeKeeperState) -> Result<()> { state.commit_lsn = self.inmem.commit_lsn; state.backup_lsn = self.inmem.backup_lsn; state.peer_horizon_lsn = self.inmem.peer_horizon_lsn; state.proposer_uuid = self.inmem.proposer_uuid; - self.state.persist(&state) + self.state.persist(&state).await } /// Persist control file if there is something to save and enough time /// passed after the last save. 
- pub fn maybe_persist_control_file(&mut self, inmem_remote_consistent_lsn: Lsn) -> Result<()> { + pub async fn maybe_persist_control_file( + &mut self, + inmem_remote_consistent_lsn: Lsn, + ) -> Result<()> { const CF_SAVE_INTERVAL: Duration = Duration::from_secs(300); if self.state.last_persist_at().elapsed() < CF_SAVE_INTERVAL { return Ok(()); @@ -852,7 +860,7 @@ where if need_persist { let mut state = self.state.clone(); state.remote_consistent_lsn = inmem_remote_consistent_lsn; - self.persist_control_file(state)?; + self.persist_control_file(state).await?; trace!("saved control file: {CF_SAVE_INTERVAL:?} passed"); } Ok(()) @@ -860,7 +868,7 @@ where /// Handle request to append WAL. #[allow(clippy::comparison_chain)] - fn handle_append_request( + async fn handle_append_request( &mut self, msg: &AppendRequest, require_flush: bool, @@ -883,17 +891,19 @@ where // do the job if !msg.wal_data.is_empty() { - self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; + self.wal_store + .write_wal(msg.h.begin_lsn, &msg.wal_data) + .await?; } // flush wal to the disk, if required if require_flush { - self.wal_store.flush_wal()?; + self.wal_store.flush_wal().await?; } // Update commit_lsn. if msg.h.commit_lsn != Lsn(0) { - self.update_commit_lsn(msg.h.commit_lsn)?; + self.update_commit_lsn(msg.h.commit_lsn).await?; } // Value calculated by walproposer can always lag: // - safekeepers can forget inmem value and send to proposer lower @@ -909,7 +919,7 @@ where if self.state.peer_horizon_lsn + (self.state.server.wal_seg_size as u64) < self.inmem.peer_horizon_lsn { - self.persist_control_file(self.state.clone())?; + self.persist_control_file(self.state.clone()).await?; } trace!( @@ -931,15 +941,15 @@ where } /// Flush WAL to disk. Return AppendResponse with latest LSNs. - fn handle_flush(&mut self) -> Result> { - self.wal_store.flush_wal()?; + async fn handle_flush(&mut self) -> Result> { + self.wal_store.flush_wal().await?; Ok(Some(AcceptorProposerMessage::AppendResponse( self.append_response(), ))) } /// Update timeline state with peer safekeeper data. - pub fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> { + pub async fn record_safekeeper_info(&mut self, sk_info: &SafekeeperTimelineInfo) -> Result<()> { let mut sync_control_file = false; if (Lsn(sk_info.commit_lsn) != Lsn::INVALID) && (sk_info.last_log_term != INVALID_TERM) { @@ -947,7 +957,7 @@ where // commit_lsn if our history matches (is part of) history of advanced // commit_lsn provider. if sk_info.last_log_term == self.get_epoch() { - self.update_commit_lsn(Lsn(sk_info.commit_lsn))?; + self.update_commit_lsn(Lsn(sk_info.commit_lsn)).await?; } } @@ -973,7 +983,7 @@ where // Note: we could make remote_consistent_lsn update in cf common by // storing Arc to walsenders in Safekeeper. 
state.remote_consistent_lsn = new_remote_consistent_lsn; - self.persist_control_file(state)?; + self.persist_control_file(state).await?; } Ok(()) } @@ -997,6 +1007,7 @@ where #[cfg(test)] mod tests { + use futures::future::BoxFuture; use postgres_ffi::WAL_SEGMENT_SIZE; use super::*; @@ -1008,8 +1019,9 @@ mod tests { persisted_state: SafeKeeperState, } + #[async_trait::async_trait] impl control_file::Storage for InMemoryState { - fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { + async fn persist(&mut self, s: &SafeKeeperState) -> Result<()> { self.persisted_state = s.clone(); Ok(()) } @@ -1039,27 +1051,28 @@ mod tests { lsn: Lsn, } + #[async_trait::async_trait] impl wal_storage::Storage for DummyWalStore { fn flush_lsn(&self) -> Lsn { self.lsn } - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { self.lsn = startpos + buf.len() as u64; Ok(()) } - fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { self.lsn = end_pos; Ok(()) } - fn flush_wal(&mut self) -> Result<()> { + async fn flush_wal(&mut self) -> Result<()> { Ok(()) } - fn remove_up_to(&self) -> Box Result<()>> { - Box::new(move |_segno_up_to: XLogSegNo| Ok(())) + fn remove_up_to(&self, _segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> { + Box::pin(async { Ok(()) }) } fn get_metrics(&self) -> crate::metrics::WalStorageMetrics { @@ -1067,8 +1080,8 @@ mod tests { } } - #[test] - fn test_voting() { + #[tokio::test] + async fn test_voting() { let storage = InMemoryState { persisted_state: test_sk_state(), }; @@ -1077,7 +1090,7 @@ mod tests { // check voting for 1 is ok let vote_request = ProposerAcceptorMessage::VoteRequest(VoteRequest { term: 1 }); - let mut vote_resp = sk.process_msg(&vote_request); + let mut vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given != 0), r => panic!("unexpected response: {:?}", r), @@ -1092,15 +1105,15 @@ mod tests { sk = SafeKeeper::new(storage, sk.wal_store, NodeId(0)).unwrap(); // and ensure voting second time for 1 is not ok - vote_resp = sk.process_msg(&vote_request); + vote_resp = sk.process_msg(&vote_request).await; match vote_resp.unwrap() { Some(AcceptorProposerMessage::VoteResponse(resp)) => assert!(resp.vote_given == 0), r => panic!("unexpected response: {:?}", r), } } - #[test] - fn test_epoch_switch() { + #[tokio::test] + async fn test_epoch_switch() { let storage = InMemoryState { persisted_state: test_sk_state(), }; @@ -1132,10 +1145,13 @@ mod tests { timeline_start_lsn: Lsn(0), }; sk.process_msg(&ProposerAcceptorMessage::Elected(pem)) + .await .unwrap(); // check that AppendRequest before epochStartLsn doesn't switch epoch - let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); + let resp = sk + .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await; assert!(resp.is_ok()); assert_eq!(sk.get_epoch(), 0); @@ -1146,9 +1162,11 @@ mod tests { h: ar_hdr, wal_data: Bytes::from_static(b"b"), }; - let resp = sk.process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)); + let resp = sk + .process_msg(&ProposerAcceptorMessage::AppendRequest(append_request)) + .await; assert!(resp.is_ok()); - sk.wal_store.truncate_wal(Lsn(3)).unwrap(); // imitate the complete record at 3 %) + sk.wal_store.truncate_wal(Lsn(3)).await.unwrap(); // imitate the complete record 
at 3 %) assert_eq!(sk.get_epoch(), 1); } } diff --git a/safekeeper/src/send_wal.rs b/safekeeper/src/send_wal.rs index fb420cba6489..abca0a86b1f8 100644 --- a/safekeeper/src/send_wal.rs +++ b/safekeeper/src/send_wal.rs @@ -396,7 +396,7 @@ impl SafekeeperPostgresHandler { // on this safekeeper itself. That's ok as (old) proposer will never be // able to commit such WAL. let stop_pos: Option = if self.is_walproposer_recovery() { - let wal_end = tli.get_flush_lsn(); + let wal_end = tli.get_flush_lsn().await; Some(wal_end) } else { None @@ -418,7 +418,7 @@ impl SafekeeperPostgresHandler { // switch to copy pgb.write_message(&BeMessage::CopyBothResponse).await?; - let (_, persisted_state) = tli.get_state(); + let (_, persisted_state) = tli.get_state().await; let wal_reader = WalReader::new( self.conf.workdir.clone(), self.conf.timeline_dir(&tli.ttid), @@ -562,7 +562,7 @@ impl WalSender<'_, IO> { .walsenders .get_ws_remote_consistent_lsn(self.ws_guard.id) { - if self.tli.should_walsender_stop(remote_consistent_lsn) { + if self.tli.should_walsender_stop(remote_consistent_lsn).await { // Terminate if there is nothing more to send. return Err(CopyStreamHandlerEnd::ServerInitiated(format!( "ending streaming to {:?} at {}, receiver is caughtup and there is no computes", diff --git a/safekeeper/src/timeline.rs b/safekeeper/src/timeline.rs index 941f8dae547e..52c3e8d4be50 100644 --- a/safekeeper/src/timeline.rs +++ b/safekeeper/src/timeline.rs @@ -2,12 +2,13 @@ //! to glue together SafeKeeper and all other background services. use anyhow::{anyhow, bail, Result}; -use parking_lot::{Mutex, MutexGuard}; use postgres_ffi::XLogSegNo; +use tokio::fs; use std::cmp::max; use std::path::PathBuf; use std::sync::Arc; +use tokio::sync::{Mutex, MutexGuard}; use tokio::{ sync::{mpsc::Sender, watch}, time::Instant, @@ -286,8 +287,9 @@ pub struct Timeline { commit_lsn_watch_tx: watch::Sender, commit_lsn_watch_rx: watch::Receiver, - /// Safekeeper and other state, that should remain consistent and synchronized - /// with the disk. + /// Safekeeper and other state, that should remain consistent and + /// synchronized with the disk. This is tokio mutex as we write WAL to disk + /// while holding it, ensuring that consensus checks are in order. mutex: Mutex, walsenders: Arc, @@ -361,8 +363,8 @@ impl Timeline { /// /// Bootstrap is transactional, so if it fails, created files will be deleted, /// and state on disk should remain unchanged. - pub fn bootstrap(&self, shared_state: &mut MutexGuard) -> Result<()> { - match std::fs::metadata(&self.timeline_dir) { + pub async fn bootstrap(&self, shared_state: &mut MutexGuard<'_, SharedState>) -> Result<()> { + match fs::metadata(&self.timeline_dir).await { Ok(_) => { // Timeline directory exists on disk, we should leave state unchanged // and return error. @@ -375,53 +377,51 @@ impl Timeline { } // Create timeline directory. - std::fs::create_dir_all(&self.timeline_dir)?; + fs::create_dir_all(&self.timeline_dir).await?; // Write timeline to disk and TODO: start background tasks. - match || -> Result<()> { - shared_state.sk.persist()?; - // TODO: add more initialization steps here - self.update_status(shared_state); - Ok(()) - }() { - Ok(_) => Ok(()), - Err(e) => { - // Bootstrap failed, cancel timeline and remove timeline directory. 
- self.cancel(shared_state); - - if let Err(fs_err) = std::fs::remove_dir_all(&self.timeline_dir) { - warn!( - "failed to remove timeline {} directory after bootstrap failure: {}", - self.ttid, fs_err - ); - } - - Err(e) + if let Err(e) = shared_state.sk.persist().await { + // Bootstrap failed, cancel timeline and remove timeline directory. + self.cancel(shared_state); + + if let Err(fs_err) = fs::remove_dir_all(&self.timeline_dir).await { + warn!( + "failed to remove timeline {} directory after bootstrap failure: {}", + self.ttid, fs_err + ); } + + return Err(e); } + + // TODO: add more initialization steps here + self.update_status(shared_state); + Ok(()) } /// Delete timeline from disk completely, by removing timeline directory. Background /// timeline activities will stop eventually. - pub fn delete_from_disk( + pub async fn delete_from_disk( &self, - shared_state: &mut MutexGuard, + shared_state: &mut MutexGuard<'_, SharedState>, ) -> Result<(bool, bool)> { let was_active = shared_state.active; self.cancel(shared_state); - let dir_existed = delete_dir(&self.timeline_dir)?; + let dir_existed = delete_dir(&self.timeline_dir).await?; Ok((dir_existed, was_active)) } /// Cancel timeline to prevent further usage. Background tasks will stop /// eventually after receiving cancellation signal. - fn cancel(&self, shared_state: &mut MutexGuard) { + /// + /// Note that we can't notify backup launcher here while holding + /// shared_state lock, as this is a potential deadlock: caller is + /// responsible for that. Generally we should probably make WAL backup tasks + /// to shut down on their own, checking once in a while whether it is the + /// time. + fn cancel(&self, shared_state: &mut MutexGuard<'_, SharedState>) { info!("timeline {} is cancelled", self.ttid); let _ = self.cancellation_tx.send(true); - let res = self.wal_backup_launcher_tx.blocking_send(self.ttid); - if let Err(e) = res { - error!("Failed to send stop signal to wal_backup_launcher: {}", e); - } // Close associated FDs. Nobody will be able to touch timeline data once // it is cancelled, so WAL storage won't be opened again. shared_state.sk.wal_store.close(); @@ -433,8 +433,8 @@ impl Timeline { } /// Take a writing mutual exclusive lock on timeline shared_state. - pub fn write_shared_state(&self) -> MutexGuard { - self.mutex.lock() + pub async fn write_shared_state(&self) -> MutexGuard { + self.mutex.lock().await } fn update_status(&self, shared_state: &mut SharedState) -> bool { @@ -450,7 +450,7 @@ impl Timeline { let is_wal_backup_action_pending: bool; { - let mut shared_state = self.write_shared_state(); + let mut shared_state = self.write_shared_state().await; shared_state.num_computes += 1; is_wal_backup_action_pending = self.update_status(&mut shared_state); } @@ -464,22 +464,17 @@ impl Timeline { /// De-register compute connection, shutting down timeline activity if /// pageserver doesn't need catchup. - pub fn on_compute_disconnect(&self) -> Result<()> { + pub async fn on_compute_disconnect(&self) -> Result<()> { let is_wal_backup_action_pending: bool; { - let mut shared_state = self.write_shared_state(); + let mut shared_state = self.write_shared_state().await; shared_state.num_computes -= 1; is_wal_backup_action_pending = self.update_status(&mut shared_state); } // Wake up wal backup launcher, if it is time to stop the offloading. if is_wal_backup_action_pending { // Can fail only if channel to a static thread got closed, which is not normal at all. 
- // - // Note: this is blocking_send because on_compute_disconnect is called in Drop, there is - // no async Drop and we use current thread runtimes. With current thread rt spawning - // task in drop impl is racy, as thread along with runtime might finish before the task. - // This should be switched send.await when/if we go to full async. - self.wal_backup_launcher_tx.blocking_send(self.ttid)?; + self.wal_backup_launcher_tx.send(self.ttid).await?; } Ok(()) } @@ -489,11 +484,11 @@ impl Timeline { /// computes. While there might be nothing to stream already, we learn about /// remote_consistent_lsn update through replication feedback, and we want /// to stop pushing to the broker if pageserver is fully caughtup. - pub fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { + pub async fn should_walsender_stop(&self, reported_remote_consistent_lsn: Lsn) -> bool { if self.is_cancelled() { return true; } - let shared_state = self.write_shared_state(); + let shared_state = self.write_shared_state().await; if shared_state.num_computes == 0 { return shared_state.sk.inmem.commit_lsn == Lsn(0) || // no data at all yet reported_remote_consistent_lsn >= shared_state.sk.inmem.commit_lsn; @@ -503,12 +498,12 @@ impl Timeline { /// Returns whether s3 offloading is required and sets current status as /// matching it. - pub fn wal_backup_attend(&self) -> bool { + pub async fn wal_backup_attend(&self) -> bool { if self.is_cancelled() { return false; } - self.write_shared_state().wal_backup_attend() + self.write_shared_state().await.wal_backup_attend() } /// Returns commit_lsn watch channel. @@ -517,7 +512,7 @@ impl Timeline { } /// Pass arrived message to the safekeeper. - pub fn process_msg( + pub async fn process_msg( &self, msg: &ProposerAcceptorMessage, ) -> Result> { @@ -528,8 +523,8 @@ impl Timeline { let mut rmsg: Option; let commit_lsn: Lsn; { - let mut shared_state = self.write_shared_state(); - rmsg = shared_state.sk.process_msg(msg)?; + let mut shared_state = self.write_shared_state().await; + rmsg = shared_state.sk.process_msg(msg).await?; // if this is AppendResponse, fill in proper pageserver and hot // standby feedback. @@ -546,37 +541,37 @@ impl Timeline { } /// Returns wal_seg_size. - pub fn get_wal_seg_size(&self) -> usize { - self.write_shared_state().get_wal_seg_size() + pub async fn get_wal_seg_size(&self) -> usize { + self.write_shared_state().await.get_wal_seg_size() } /// Returns true only if the timeline is loaded and active. - pub fn is_active(&self) -> bool { + pub async fn is_active(&self) -> bool { if self.is_cancelled() { return false; } - self.write_shared_state().active + self.write_shared_state().await.active } /// Returns state of the timeline. - pub fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { - let state = self.write_shared_state(); + pub async fn get_state(&self) -> (SafekeeperMemState, SafeKeeperState) { + let state = self.write_shared_state().await; (state.sk.inmem.clone(), state.sk.state.clone()) } /// Returns latest backup_lsn. - pub fn get_wal_backup_lsn(&self) -> Lsn { - self.write_shared_state().sk.inmem.backup_lsn + pub async fn get_wal_backup_lsn(&self) -> Lsn { + self.write_shared_state().await.sk.inmem.backup_lsn } /// Sets backup_lsn to the given value. 
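The removed comment above explained why `blocking_send` was needed while `on_compute_disconnect` ran during Drop on a current-thread runtime; with the handler itself async, the notification becomes an ordinary awaited send. A stand-alone sketch (TtId is a stand-in for TenantTimelineId):

use tokio::sync::mpsc;

#[derive(Clone, Copy, Debug)]
struct TtId(u64); // stand-in for TenantTimelineId

async fn notify_backup_launcher(tx: &mpsc::Sender<TtId>, ttid: TtId) -> anyhow::Result<()> {
    // blocking_send panics if called from inside a runtime worker thread;
    // send(..).await simply yields until the channel has capacity. It fails
    // only if the launcher task has already exited, i.e. on shutdown.
    tx.send(ttid).await?;
    Ok(())
}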
- pub fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { + pub async fn set_wal_backup_lsn(&self, backup_lsn: Lsn) -> Result<()> { if self.is_cancelled() { bail!(TimelineError::Cancelled(self.ttid)); } - let mut state = self.write_shared_state(); + let mut state = self.write_shared_state().await; state.sk.inmem.backup_lsn = max(state.sk.inmem.backup_lsn, backup_lsn); // we should check whether to shut down offloader, but this will be done // soon by peer communication anyway. @@ -584,8 +579,8 @@ impl Timeline { } /// Get safekeeper info for broadcasting to broker and other peers. - pub fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { - let shared_state = self.write_shared_state(); + pub async fn get_safekeeper_info(&self, conf: &SafeKeeperConf) -> SafekeeperTimelineInfo { + let shared_state = self.write_shared_state().await; shared_state.get_safekeeper_info( &self.ttid, conf, @@ -604,8 +599,8 @@ impl Timeline { let is_wal_backup_action_pending: bool; let commit_lsn: Lsn; { - let mut shared_state = self.write_shared_state(); - shared_state.sk.record_safekeeper_info(&sk_info)?; + let mut shared_state = self.write_shared_state().await; + shared_state.sk.record_safekeeper_info(&sk_info).await?; let peer_info = PeerInfo::from_sk_info(&sk_info, Instant::now()); shared_state.peers_info.upsert(&peer_info); is_wal_backup_action_pending = self.update_status(&mut shared_state); @@ -622,8 +617,8 @@ impl Timeline { /// Get our latest view of alive peers status on the timeline. /// We pass our own info through the broker as well, so when we don't have connection /// to the broker returned vec is empty. - pub fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { - let shared_state = self.write_shared_state(); + pub async fn get_peers(&self, conf: &SafeKeeperConf) -> Vec { + let shared_state = self.write_shared_state().await; let now = Instant::now(); shared_state .peers_info @@ -640,34 +635,34 @@ impl Timeline { } /// Returns flush_lsn. - pub fn get_flush_lsn(&self) -> Lsn { - self.write_shared_state().sk.wal_store.flush_lsn() + pub async fn get_flush_lsn(&self) -> Lsn { + self.write_shared_state().await.sk.wal_store.flush_lsn() } /// Delete WAL segments from disk that are no longer needed. This is determined /// based on pageserver's remote_consistent_lsn and local backup_lsn/peer_lsn. - pub fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { + pub async fn remove_old_wal(&self, wal_backup_enabled: bool) -> Result<()> { if self.is_cancelled() { bail!(TimelineError::Cancelled(self.ttid)); } let horizon_segno: XLogSegNo; - let remover: Box Result<(), anyhow::Error>>; - { - let shared_state = self.write_shared_state(); + let remover = { + let shared_state = self.write_shared_state().await; horizon_segno = shared_state.sk.get_horizon_segno(wal_backup_enabled); - remover = shared_state.sk.wal_store.remove_up_to(); if horizon_segno <= 1 || horizon_segno <= shared_state.last_removed_segno { - return Ok(()); + return Ok(()); // nothing to do } + let remover = shared_state.sk.wal_store.remove_up_to(horizon_segno - 1); // release the lock before removing - } + remover + }; // delete old WAL files - remover(horizon_segno - 1)?; + remover.await?; // update last_removed_segno - let mut shared_state = self.write_shared_state(); + let mut shared_state = self.write_shared_state().await; shared_state.last_removed_segno = horizon_segno; Ok(()) } @@ -676,22 +671,24 @@ impl Timeline { /// passed after the last save. 
This helps to keep remote_consistent_lsn up /// to date so that storage nodes restart doesn't cause many pageserver -> /// safekeeper reconnections. - pub fn maybe_pesist_control_file(&self) -> Result<()> { + pub async fn maybe_persist_control_file(&self) -> Result<()> { let remote_consistent_lsn = self.walsenders.get_remote_consistent_lsn(); self.write_shared_state() + .await .sk .maybe_persist_control_file(remote_consistent_lsn) + .await } - /// Returns full timeline info, required for the metrics. If the timeline is - /// not active, returns None instead. - pub fn info_for_metrics(&self) -> Option { + /// Gather timeline data for metrics. If the timeline is not active, returns + /// None, we do not collect these. + pub async fn info_for_metrics(&self) -> Option { if self.is_cancelled() { return None; } let ps_feedback = self.walsenders.get_ps_feedback(); - let state = self.write_shared_state(); + let state = self.write_shared_state().await; if state.active { Some(FullTimelineInfo { ttid: self.ttid, @@ -713,8 +710,8 @@ impl Timeline { } /// Returns in-memory timeline state to build a full debug dump. - pub fn memory_dump(&self) -> debug_dump::Memory { - let state = self.write_shared_state(); + pub async fn memory_dump(&self) -> debug_dump::Memory { + let state = self.write_shared_state().await; let (write_lsn, write_record_lsn, flush_lsn, file_open) = state.sk.wal_store.internal_state(); @@ -738,8 +735,8 @@ impl Timeline { } /// Deletes directory and it's contents. Returns false if directory does not exist. -fn delete_dir(path: &PathBuf) -> Result { - match std::fs::remove_dir_all(path) { +async fn delete_dir(path: &PathBuf) -> Result { + match fs::remove_dir_all(path).await { Ok(_) => Ok(true), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(false), Err(e) => Err(e.into()), diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index 41809794dc55..f2d5df8744c1 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -113,9 +113,17 @@ impl GlobalTimelines { Ok(()) } - /// Loads all timelines for the given tenant to memory. Returns fs::read_dir errors if any. + /// Loads all timelines for the given tenant to memory. Returns fs::read_dir + /// errors if any. + /// + /// Note: This function (and all reading/loading below) is sync because + /// timelines are loaded while holding GlobalTimelinesState lock. Which is + /// fine as this is called only from single threaded main runtime on boot, + /// but clippy complains anyway, and suppressing that isn't trivial as async + /// is the keyword, ha. That only other user is pull_timeline.rs for which + /// being blocked is not that bad, and we can do spawn_blocking. fn load_tenant_timelines( - state: &mut MutexGuard, + state: &mut MutexGuard<'_, GlobalTimelinesState>, tenant_id: TenantId, ) -> Result<()> { let timelines_dir = state.get_conf().tenant_dir(&tenant_id); @@ -220,7 +228,7 @@ impl GlobalTimelines { // Take a lock and finish the initialization holding this mutex. No other threads // can interfere with creation after we will insert timeline into the map. { - let mut shared_state = timeline.write_shared_state(); + let mut shared_state = timeline.write_shared_state().await; // We can get a race condition here in case of concurrent create calls, but only // in theory. create() will return valid timeline on the next try. @@ -232,7 +240,7 @@ impl GlobalTimelines { // Write the new timeline to the disk and start background workers. 
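The note above keeps timeline loading synchronous because it runs under the std GlobalTimelinesState lock; for the one caller that is already async (the pull_timeline case it mentions), spawn_blocking is the usual escape hatch. A hedged sketch, assuming a synchronous loader function:

use tokio::task;

// Placeholder for the real sync loading routine that walks the tenant
// directory with std::fs while holding the global state lock.
fn load_tenant_timelines_sync(_tenant_id: u64) -> anyhow::Result<usize> {
    Ok(0)
}

async fn load_from_async_context(tenant_id: u64) -> anyhow::Result<usize> {
    // Run the blocking, lock-holding work on the blocking thread pool so it
    // never stalls a runtime worker or holds a std mutex across an .await.
    task::spawn_blocking(move || load_tenant_timelines_sync(tenant_id)).await?
}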
// Bootstrap is transactional, so if it fails, the timeline will be deleted, // and the state on disk should remain unchanged. - if let Err(e) = timeline.bootstrap(&mut shared_state) { + if let Err(e) = timeline.bootstrap(&mut shared_state).await { // Note: the most likely reason for bootstrap failure is that the timeline // directory already exists on disk. This happens when timeline is corrupted // and wasn't loaded from disk on startup because of that. We want to preserve @@ -294,15 +302,16 @@ impl GlobalTimelines { } /// Cancels timeline, then deletes the corresponding data directory. - pub fn delete_force(ttid: &TenantTimelineId) -> Result { + pub async fn delete_force(ttid: &TenantTimelineId) -> Result { let tli_res = TIMELINES_STATE.lock().unwrap().get(ttid); match tli_res { Ok(timeline) => { // Take a lock and finish the deletion holding this mutex. - let mut shared_state = timeline.write_shared_state(); + let mut shared_state = timeline.write_shared_state().await; info!("deleting timeline {}", ttid); - let (dir_existed, was_active) = timeline.delete_from_disk(&mut shared_state)?; + let (dir_existed, was_active) = + timeline.delete_from_disk(&mut shared_state).await?; // Remove timeline from the map. // FIXME: re-enable it once we fix the issue with recreation of deleted timelines @@ -335,7 +344,7 @@ impl GlobalTimelines { /// the tenant had, `true` if a timeline was active. There may be a race if new timelines are /// created simultaneously. In that case the function will return error and the caller should /// retry tenant deletion again later. - pub fn delete_force_all_for_tenant( + pub async fn delete_force_all_for_tenant( tenant_id: &TenantId, ) -> Result> { info!("deleting all timelines for tenant {}", tenant_id); @@ -345,7 +354,7 @@ impl GlobalTimelines { let mut deleted = HashMap::new(); for tli in &to_delete { - match Self::delete_force(&tli.ttid) { + match Self::delete_force(&tli.ttid).await { Ok(result) => { deleted.insert(tli.ttid, result); } diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 4d341a7ef831..eae3f3fe86a9 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -17,7 +17,6 @@ use postgres_ffi::XLogFileName; use postgres_ffi::{XLogSegNo, PG_TLI}; use remote_storage::{GenericRemoteStorage, RemotePath}; use tokio::fs::File; -use tokio::runtime::Builder; use tokio::select; use tokio::sync::mpsc::{self, Receiver, Sender}; @@ -36,30 +35,16 @@ use once_cell::sync::OnceCell; const UPLOAD_FAILURE_RETRY_MIN_MS: u64 = 10; const UPLOAD_FAILURE_RETRY_MAX_MS: u64 = 5000; -pub fn wal_backup_launcher_thread_main( - conf: SafeKeeperConf, - wal_backup_launcher_rx: Receiver, -) { - let mut builder = Builder::new_multi_thread(); - if let Some(num_threads) = conf.backup_runtime_threads { - builder.worker_threads(num_threads); - } - let rt = builder - .enable_all() - .build() - .expect("failed to create wal backup runtime"); - - rt.block_on(async { - wal_backup_launcher_main_loop(conf, wal_backup_launcher_rx).await; - }); -} - /// Check whether wal backup is required for timeline. If yes, mark that launcher is /// aware of current status and return the timeline. 
-fn is_wal_backup_required(ttid: TenantTimelineId) -> Option> { - GlobalTimelines::get(ttid) - .ok() - .filter(|tli| tli.wal_backup_attend()) +async fn is_wal_backup_required(ttid: TenantTimelineId) -> Option> { + match GlobalTimelines::get(ttid).ok() { + Some(tli) => { + tli.wal_backup_attend().await; + Some(tli) + } + None => None, + } } struct WalBackupTaskHandle { @@ -143,8 +128,8 @@ async fn update_task( ttid: TenantTimelineId, entry: &mut WalBackupTimelineEntry, ) { - let alive_peers = entry.timeline.get_peers(conf); - let wal_backup_lsn = entry.timeline.get_wal_backup_lsn(); + let alive_peers = entry.timeline.get_peers(conf).await; + let wal_backup_lsn = entry.timeline.get_wal_backup_lsn().await; let (offloader, election_dbg_str) = determine_offloader(&alive_peers, wal_backup_lsn, ttid, conf); let elected_me = Some(conf.my_id) == offloader; @@ -183,10 +168,10 @@ const CHECK_TASKS_INTERVAL_MSEC: u64 = 1000; /// Sits on wal_backup_launcher_rx and starts/stops per timeline wal backup /// tasks. Having this in separate task simplifies locking, allows to reap /// panics and separate elections from offloading itself. -async fn wal_backup_launcher_main_loop( +pub async fn wal_backup_launcher_task_main( conf: SafeKeeperConf, mut wal_backup_launcher_rx: Receiver, -) { +) -> anyhow::Result<()> { info!( "WAL backup launcher started, remote config {:?}", conf.remote_storage @@ -214,7 +199,7 @@ async fn wal_backup_launcher_main_loop( if conf.remote_storage.is_none() || !conf.wal_backup_enabled { continue; /* just drain the channel and do nothing */ } - let timeline = is_wal_backup_required(ttid); + let timeline = is_wal_backup_required(ttid).await; // do we need to do anything at all? if timeline.is_some() != tasks.contains_key(&ttid) { if let Some(timeline) = timeline { @@ -269,7 +254,7 @@ async fn backup_task_main( let tli = res.unwrap(); let mut wb = WalBackupTask { - wal_seg_size: tli.get_wal_seg_size(), + wal_seg_size: tli.get_wal_seg_size().await, commit_lsn_watch_rx: tli.get_commit_lsn_watch_rx(), timeline: tli, timeline_dir, @@ -326,7 +311,7 @@ impl WalBackupTask { continue; /* nothing to do, common case as we wake up on every commit_lsn bump */ } // Perhaps peers advanced the position, check shmem value. - backup_lsn = self.timeline.get_wal_backup_lsn(); + backup_lsn = self.timeline.get_wal_backup_lsn().await; if backup_lsn.segment_number(self.wal_seg_size) >= commit_lsn.segment_number(self.wal_seg_size) { @@ -402,6 +387,7 @@ pub async fn backup_lsn_range( let new_backup_lsn = segment.end_lsn; timeline .set_wal_backup_lsn(new_backup_lsn) + .await .context("setting wal_backup_lsn")?; *backup_lsn = new_backup_lsn; } else { diff --git a/safekeeper/src/wal_service.rs b/safekeeper/src/wal_service.rs index fb0d77a9f22a..406132b2b045 100644 --- a/safekeeper/src/wal_service.rs +++ b/safekeeper/src/wal_service.rs @@ -4,7 +4,7 @@ //! use anyhow::{Context, Result}; use postgres_backend::QueryError; -use std::{future, thread, time::Duration}; +use std::{future, time::Duration}; use tokio::net::TcpStream; use tokio_io_timeout::TimeoutReader; use tracing::*; @@ -16,104 +16,82 @@ use crate::SafeKeeperConf; use postgres_backend::{AuthType, PostgresBackend}; /// Accept incoming TCP connections and spawn them into a background thread. 
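One thing worth double-checking in the is_wal_backup_required hunk above: the old version filtered with `.filter(|tli| tli.wal_backup_attend())`, so a timeline that did not need offloading yielded None, while the rewritten match awaits `wal_backup_attend()` but returns Some(tli) regardless of the boolean it produced. If the filtering behaviour is meant to stay, a sketch that preserves it (with simplified stand-in types) could look like:

use std::sync::Arc;

struct Timeline;

impl Timeline {
    async fn wal_backup_attend(&self) -> bool {
        true // placeholder for the real shared-state check
    }
}

// Stand-in for GlobalTimelines::get(ttid).ok()
fn lookup_timeline() -> Option<Arc<Timeline>> {
    Some(Arc::new(Timeline))
}

async fn is_wal_backup_required() -> Option<Arc<Timeline>> {
    let tli = lookup_timeline()?;
    if tli.wal_backup_attend().await {
        Some(tli)
    } else {
        None
    }
}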
-pub fn thread_main(conf: SafeKeeperConf, pg_listener: std::net::TcpListener) { - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .context("create runtime") - // todo catch error in main thread - .expect("failed to create runtime"); +pub async fn task_main( + conf: SafeKeeperConf, + pg_listener: std::net::TcpListener, +) -> anyhow::Result<()> { + // Tokio's from_std won't do this for us, per its comment. + pg_listener.set_nonblocking(true)?; - runtime - .block_on(async move { - // Tokio's from_std won't do this for us, per its comment. - pg_listener.set_nonblocking(true)?; - let listener = tokio::net::TcpListener::from_std(pg_listener)?; - let mut connection_count: ConnectionCount = 0; + let listener = tokio::net::TcpListener::from_std(pg_listener)?; + let mut connection_count: ConnectionCount = 0; - loop { - match listener.accept().await { - Ok((socket, peer_addr)) => { - debug!("accepted connection from {}", peer_addr); - let conf = conf.clone(); - let conn_id = issue_connection_id(&mut connection_count); + loop { + let (socket, peer_addr) = listener.accept().await.context("accept")?; + debug!("accepted connection from {}", peer_addr); + let conf = conf.clone(); + let conn_id = issue_connection_id(&mut connection_count); - let _ = thread::Builder::new() - .name("WAL service thread".into()) - .spawn(move || { - if let Err(err) = handle_socket(socket, conf, conn_id) { - error!("connection handler exited: {}", err); - } - }) - .unwrap(); - } - Err(e) => error!("Failed to accept connection: {}", e), - } + tokio::spawn(async move { + if let Err(err) = handle_socket(socket, conf, conn_id) + .instrument(info_span!("", cid = %conn_id)) + .await + { + error!("connection handler exited: {}", err); } - #[allow(unreachable_code)] // hint compiler the closure return type - Ok::<(), anyhow::Error>(()) - }) - .expect("listener failed") + }); + } } -/// This is run by `thread_main` above, inside a background thread. +/// This is run by `task_main` above, inside a background thread. /// -fn handle_socket( +async fn handle_socket( socket: TcpStream, conf: SafeKeeperConf, conn_id: ConnectionId, ) -> Result<(), QueryError> { - let _enter = info_span!("", cid = %conn_id).entered(); - - let runtime = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build()?; - socket.set_nodelay(true)?; let peer_addr = socket.peer_addr()?; - // TimeoutReader wants async runtime during creation. - runtime.block_on(async move { - // Set timeout on reading from the socket. It prevents hanged up connection - // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by - // default, and tokio doesn't provide ability to set it out of the box. - let mut socket = TimeoutReader::new(socket); - let wal_service_timeout = Duration::from_secs(60 * 10); - socket.set_timeout(Some(wal_service_timeout)); - // pin! is here because TimeoutReader (due to storing sleep future inside) - // is not Unpin, and all pgbackend/framed/tokio dependencies require stream - // to be Unpin. Which is reasonable, as indeed something like TimeoutReader - // shouldn't be moved. - tokio::pin!(socket); + // Set timeout on reading from the socket. It prevents hanged up connection + // if client suddenly disappears. Note that TCP_KEEPALIVE is not enabled by + // default, and tokio doesn't provide ability to set it out of the box. + let mut socket = TimeoutReader::new(socket); + let wal_service_timeout = Duration::from_secs(60 * 10); + socket.set_timeout(Some(wal_service_timeout)); + // pin! 
is here because TimeoutReader (due to storing sleep future inside) + // is not Unpin, and all pgbackend/framed/tokio dependencies require stream + // to be Unpin. Which is reasonable, as indeed something like TimeoutReader + // shouldn't be moved. + tokio::pin!(socket); - let traffic_metrics = TrafficMetrics::new(); - if let Some(current_az) = conf.availability_zone.as_deref() { - traffic_metrics.set_sk_az(current_az); - } + let traffic_metrics = TrafficMetrics::new(); + if let Some(current_az) = conf.availability_zone.as_deref() { + traffic_metrics.set_sk_az(current_az); + } - let socket = MeasuredStream::new( - socket, - |cnt| { - traffic_metrics.observe_read(cnt); - }, - |cnt| { - traffic_metrics.observe_write(cnt); - }, - ); + let socket = MeasuredStream::new( + socket, + |cnt| { + traffic_metrics.observe_read(cnt); + }, + |cnt| { + traffic_metrics.observe_write(cnt); + }, + ); - let auth_type = match conf.auth { - None => AuthType::Trust, - Some(_) => AuthType::NeonJWT, - }; - let mut conn_handler = - SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone())); - let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; - // libpq protocol between safekeeper and walproposer / pageserver - // We don't use shutdown. - pgbackend - .run(&mut conn_handler, future::pending::<()>) - .await - }) + let auth_type = match conf.auth { + None => AuthType::Trust, + Some(_) => AuthType::NeonJWT, + }; + let mut conn_handler = + SafekeeperPostgresHandler::new(conf, conn_id, Some(traffic_metrics.clone())); + let pgbackend = PostgresBackend::new_from_io(socket, peer_addr, auth_type, None)?; + // libpq protocol between safekeeper and walproposer / pageserver + // We don't use shutdown. + pgbackend + .run(&mut conn_handler, future::pending::<()>) + .await } /// Unique WAL service connection ids are logged in spans for observability. diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 644c956fc1c9..e97b212093e4 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -8,54 +8,47 @@ //! Note that last file has `.partial` suffix, that's different from postgres. use anyhow::{bail, Context, Result}; -use remote_storage::RemotePath; - -use std::io::{self, Seek, SeekFrom}; -use std::pin::Pin; -use tokio::io::AsyncRead; - +use bytes::Bytes; +use futures::future::BoxFuture; use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName}; use postgres_ffi::{XLogSegNo, PG_TLI}; +use remote_storage::RemotePath; use std::cmp::{max, min}; - -use bytes::Bytes; -use std::fs::{self, remove_file, File, OpenOptions}; -use std::io::Write; +use std::io::{self, SeekFrom}; use std::path::{Path, PathBuf}; - +use std::pin::Pin; +use tokio::fs::{self, remove_file, File, OpenOptions}; +use tokio::io::{AsyncRead, AsyncWriteExt}; +use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tracing::*; -use utils::{id::TenantTimelineId, lsn::Lsn}; - use crate::metrics::{time_io_closure, WalStorageMetrics, REMOVED_WAL_SEGMENTS}; use crate::safekeeper::SafeKeeperState; - use crate::wal_backup::read_object; use crate::SafeKeeperConf; +use postgres_ffi::waldecoder::WalStreamDecoder; use postgres_ffi::XLogFileName; use postgres_ffi::XLOG_BLCKSZ; - -use postgres_ffi::waldecoder::WalStreamDecoder; - use pq_proto::SystemId; -use tokio::io::{AsyncReadExt, AsyncSeekExt}; +use utils::{id::TenantTimelineId, lsn::Lsn}; +#[async_trait::async_trait] pub trait Storage { /// LSN of last durably stored WAL record. 
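The wal_service hunks above replace one thread per connection with one spawned task per connection, with the connection id carried in a tracing span instead of a thread name. A standalone sketch of that shape, including the read-timeout wrapper (the handler body is a placeholder):

use std::time::Duration;
use tokio_io_timeout::TimeoutReader;
use tracing::{error, info_span, Instrument};

async fn accept_loop(std_listener: std::net::TcpListener) -> anyhow::Result<()> {
    std_listener.set_nonblocking(true)?; // required before from_std
    let listener = tokio::net::TcpListener::from_std(std_listener)?;
    let mut conn_id: u64 = 0;

    loop {
        let (socket, peer_addr) = listener.accept().await?;
        conn_id += 1;
        let cid = conn_id;
        tokio::spawn(
            async move {
                if let Err(e) = handle_socket(socket).await {
                    error!("connection handler exited: {}", e);
                }
            }
            .instrument(info_span!("", cid = %cid, peer = %peer_addr)),
        );
    }
}

async fn handle_socket(socket: tokio::net::TcpStream) -> anyhow::Result<()> {
    socket.set_nodelay(true)?;
    // Bound every read so clients that silently disappear do not pin a task forever.
    let mut reader = TimeoutReader::new(socket);
    reader.set_timeout(Some(Duration::from_secs(600)));
    // TimeoutReader keeps a Sleep future inside, so it is !Unpin; pin it
    // before handing it to code that expects an Unpin stream.
    tokio::pin!(reader);
    let _stream = reader; // placeholder for the real protocol handling
    Ok(())
}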
fn flush_lsn(&self) -> Lsn; /// Write piece of WAL from buf to disk, but not necessarily sync it. - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; + async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()>; /// Truncate WAL at specified LSN, which must be the end of WAL record. - fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; + async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()>; /// Durably store WAL on disk, up to the last written WAL record. - fn flush_wal(&mut self) -> Result<()>; + async fn flush_wal(&mut self) -> Result<()>; - /// Remove all segments <= given segno. Returns closure as we want to do - /// that without timeline lock. - fn remove_up_to(&self) -> Box Result<()>>; + /// Remove all segments <= given segno. Returns function doing that as we + /// want to perform it without timeline lock. + fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>>; /// Release resources associated with the storage -- technically, close FDs. /// Currently we don't remove timelines until restart (#3146), so need to @@ -178,33 +171,37 @@ impl PhysicalStorage { } /// Call fdatasync if config requires so. - fn fdatasync_file(&mut self, file: &mut File) -> Result<()> { + async fn fdatasync_file(&mut self, file: &mut File) -> Result<()> { if !self.conf.no_sync { self.metrics - .observe_flush_seconds(time_io_closure(|| Ok(file.sync_data()?))?); + .observe_flush_seconds(time_io_closure(file.sync_data()).await?); } Ok(()) } /// Call fsync if config requires so. - fn fsync_file(&mut self, file: &mut File) -> Result<()> { + async fn fsync_file(&mut self, file: &mut File) -> Result<()> { if !self.conf.no_sync { self.metrics - .observe_flush_seconds(time_io_closure(|| Ok(file.sync_all()?))?); + .observe_flush_seconds(time_io_closure(file.sync_all()).await?); } Ok(()) } /// Open or create WAL segment file. Caller must call seek to the wanted position. /// Returns `file` and `is_partial`. - fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> { + async fn open_or_create(&mut self, segno: XLogSegNo) -> Result<(File, bool)> { let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; // Try to open already completed segment - if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path) { + if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_path).await { Ok((file, false)) - } else if let Ok(file) = OpenOptions::new().write(true).open(&wal_file_partial_path) { + } else if let Ok(file) = OpenOptions::new() + .write(true) + .open(&wal_file_partial_path) + .await + { // Try to open existing partial file Ok((file, true)) } else { @@ -213,35 +210,36 @@ impl PhysicalStorage { .create(true) .write(true) .open(&wal_file_partial_path) + .await .with_context(|| format!("Failed to open log file {:?}", &wal_file_path))?; - write_zeroes(&mut file, self.wal_seg_size)?; - self.fsync_file(&mut file)?; + write_zeroes(&mut file, self.wal_seg_size).await?; + self.fsync_file(&mut file).await?; Ok((file, true)) } } /// Write WAL bytes, which are known to be located in a single WAL segment. 
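The Storage trait above goes through the async_trait macro, since async fns in traits were not natively available; flush_lsn stays sync because it only reads cached state. A simplified, self-contained sketch of the same shape (u64 stands in for Lsn, and the in-memory impl is illustrative only):

use async_trait::async_trait;

#[async_trait]
trait Storage {
    fn flush_lsn(&self) -> u64;
    async fn write_wal(&mut self, startpos: u64, buf: &[u8]) -> anyhow::Result<()>;
    async fn flush_wal(&mut self) -> anyhow::Result<()>;
}

struct MemStorage {
    buf: Vec<u8>,
    flushed: u64,
}

#[async_trait]
impl Storage for MemStorage {
    fn flush_lsn(&self) -> u64 {
        self.flushed
    }

    async fn write_wal(&mut self, _startpos: u64, buf: &[u8]) -> anyhow::Result<()> {
        self.buf.extend_from_slice(buf);
        Ok(())
    }

    async fn flush_wal(&mut self) -> anyhow::Result<()> {
        self.flushed = self.buf.len() as u64;
        Ok(())
    }
}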
- fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { + async fn write_in_segment(&mut self, segno: u64, xlogoff: usize, buf: &[u8]) -> Result<()> { let mut file = if let Some(file) = self.file.take() { file } else { - let (mut file, is_partial) = self.open_or_create(segno)?; + let (mut file, is_partial) = self.open_or_create(segno).await?; assert!(is_partial, "unexpected write into non-partial segment file"); - file.seek(SeekFrom::Start(xlogoff as u64))?; + file.seek(SeekFrom::Start(xlogoff as u64)).await?; file }; - file.write_all(buf)?; + file.write_all(buf).await?; if xlogoff + buf.len() == self.wal_seg_size { // If we reached the end of a WAL segment, flush and close it. - self.fdatasync_file(&mut file)?; + self.fdatasync_file(&mut file).await?; // Rename partial file to completed file let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; - fs::rename(wal_file_partial_path, wal_file_path)?; + fs::rename(wal_file_partial_path, wal_file_path).await?; } else { // otherwise, file can be reused later self.file = Some(file); @@ -255,11 +253,11 @@ impl PhysicalStorage { /// be flushed separately later. /// /// Updates `write_lsn`. - fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { + async fn write_exact(&mut self, pos: Lsn, mut buf: &[u8]) -> Result<()> { if self.write_lsn != pos { // need to flush the file before discarding it if let Some(mut file) = self.file.take() { - self.fdatasync_file(&mut file)?; + self.fdatasync_file(&mut file).await?; } self.write_lsn = pos; @@ -277,7 +275,8 @@ impl PhysicalStorage { buf.len() }; - self.write_in_segment(segno, xlogoff, &buf[..bytes_write])?; + self.write_in_segment(segno, xlogoff, &buf[..bytes_write]) + .await?; self.write_lsn += bytes_write as u64; buf = &buf[bytes_write..]; } @@ -286,6 +285,7 @@ impl PhysicalStorage { } } +#[async_trait::async_trait] impl Storage for PhysicalStorage { /// flush_lsn returns LSN of last durably stored WAL record. fn flush_lsn(&self) -> Lsn { @@ -293,7 +293,7 @@ impl Storage for PhysicalStorage { } /// Write WAL to disk. - fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { + async fn write_wal(&mut self, startpos: Lsn, buf: &[u8]) -> Result<()> { // Disallow any non-sequential writes, which can result in gaps or overwrites. // If we need to move the pointer, use truncate_wal() instead. if self.write_lsn > startpos { @@ -311,7 +311,7 @@ impl Storage for PhysicalStorage { ); } - let write_seconds = time_io_closure(|| self.write_exact(startpos, buf))?; + let write_seconds = time_io_closure(self.write_exact(startpos, buf)).await?; // WAL is written, updating write metrics self.metrics.observe_write_seconds(write_seconds); self.metrics.observe_write_bytes(buf.len()); @@ -340,14 +340,14 @@ impl Storage for PhysicalStorage { Ok(()) } - fn flush_wal(&mut self) -> Result<()> { + async fn flush_wal(&mut self) -> Result<()> { if self.flush_record_lsn == self.write_record_lsn { // no need to do extra flush return Ok(()); } if let Some(mut unflushed_file) = self.file.take() { - self.fdatasync_file(&mut unflushed_file)?; + self.fdatasync_file(&mut unflushed_file).await?; self.file = Some(unflushed_file); } else { // We have unflushed data (write_lsn != flush_lsn), but no file. @@ -369,7 +369,7 @@ impl Storage for PhysicalStorage { /// Truncate written WAL by removing all WAL segments after the given LSN. /// end_pos must point to the end of the WAL record. 
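The write path above keeps the on-disk convention (the open segment carries a .partial suffix and is renamed once full), just with tokio::fs. A reduced sketch of the end-of-segment handling; the file naming is a placeholder, not the real XLogFileName scheme:

use std::io::SeekFrom;
use std::path::Path;
use tokio::fs::{self, OpenOptions};
use tokio::io::{AsyncSeekExt, AsyncWriteExt};

async fn finish_segment(
    dir: &Path,
    segno: u64,
    xlogoff: u64,
    tail: &[u8],
    no_sync: bool,
) -> anyhow::Result<()> {
    let partial = dir.join(format!("{:08X}.partial", segno));
    let complete = dir.join(format!("{:08X}", segno));

    let mut file = OpenOptions::new().write(true).open(&partial).await?;
    file.seek(SeekFrom::Start(xlogoff)).await?;
    file.write_all(tail).await?;

    // Make the bytes durable before the rename publishes the segment as complete.
    if !no_sync {
        file.sync_data().await?;
    }
    fs::rename(&partial, &complete).await?;
    Ok(())
}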
- fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { + async fn truncate_wal(&mut self, end_pos: Lsn) -> Result<()> { // Streaming must not create a hole, so truncate cannot be called on non-written lsn if self.write_lsn != Lsn(0) && end_pos > self.write_lsn { bail!( @@ -387,27 +387,27 @@ impl Storage for PhysicalStorage { // Close previously opened file, if any if let Some(mut unflushed_file) = self.file.take() { - self.fdatasync_file(&mut unflushed_file)?; + self.fdatasync_file(&mut unflushed_file).await?; } let xlogoff = end_pos.segment_offset(self.wal_seg_size); let segno = end_pos.segment_number(self.wal_seg_size); // Remove all segments after the given LSN. - remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno)?; + remove_segments_from_disk(&self.timeline_dir, self.wal_seg_size, |x| x > segno).await?; - let (mut file, is_partial) = self.open_or_create(segno)?; + let (mut file, is_partial) = self.open_or_create(segno).await?; // Fill end with zeroes - file.seek(SeekFrom::Start(xlogoff as u64))?; - write_zeroes(&mut file, self.wal_seg_size - xlogoff)?; - self.fdatasync_file(&mut file)?; + file.seek(SeekFrom::Start(xlogoff as u64)).await?; + write_zeroes(&mut file, self.wal_seg_size - xlogoff).await?; + self.fdatasync_file(&mut file).await?; if !is_partial { // Make segment partial once again let (wal_file_path, wal_file_partial_path) = wal_file_paths(&self.timeline_dir, segno, self.wal_seg_size)?; - fs::rename(wal_file_path, wal_file_partial_path)?; + fs::rename(wal_file_path, wal_file_partial_path).await?; } // Update LSNs @@ -417,11 +417,11 @@ impl Storage for PhysicalStorage { Ok(()) } - fn remove_up_to(&self) -> Box Result<()>> { + fn remove_up_to(&self, segno_up_to: XLogSegNo) -> BoxFuture<'static, anyhow::Result<()>> { let timeline_dir = self.timeline_dir.clone(); let wal_seg_size = self.wal_seg_size; - Box::new(move |segno_up_to: XLogSegNo| { - remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to) + Box::pin(async move { + remove_segments_from_disk(&timeline_dir, wal_seg_size, |x| x <= segno_up_to).await }) } @@ -436,7 +436,7 @@ impl Storage for PhysicalStorage { } /// Remove all WAL segments in timeline_dir that match the given predicate. -fn remove_segments_from_disk( +async fn remove_segments_from_disk( timeline_dir: &Path, wal_seg_size: usize, remove_predicate: impl Fn(XLogSegNo) -> bool, @@ -445,8 +445,8 @@ fn remove_segments_from_disk( let mut min_removed = u64::MAX; let mut max_removed = u64::MIN; - for entry in fs::read_dir(timeline_dir)? { - let entry = entry?; + let mut entries = fs::read_dir(timeline_dir).await?; + while let Some(entry) = entries.next_entry().await? { let entry_path = entry.path(); let fname = entry_path.file_name().unwrap(); @@ -457,7 +457,7 @@ fn remove_segments_from_disk( } let (segno, _) = XLogFromFileName(fname_str, wal_seg_size); if remove_predicate(segno) { - remove_file(entry_path)?; + remove_file(entry_path).await?; n_removed += 1; min_removed = min(min_removed, segno); max_removed = max(max_removed, segno); @@ -689,12 +689,12 @@ impl WalReader { const ZERO_BLOCK: &[u8] = &[0u8; XLOG_BLCKSZ]; /// Helper for filling file with zeroes. -fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { +async fn write_zeroes(file: &mut File, mut count: usize) -> Result<()> { while count >= XLOG_BLCKSZ { - file.write_all(ZERO_BLOCK)?; + file.write_all(ZERO_BLOCK).await?; count -= XLOG_BLCKSZ; } - file.write_all(&ZERO_BLOCK[0..count])?; + file.write_all(&ZERO_BLOCK[0..count]).await?; Ok(()) }
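remove_up_to above now hands back a future instead of a boxed closure, so the caller can build it while holding the shared-state lock and await it only after the lock is released. A simplified stand-alone sketch of that pattern (types are stand-ins, the removal body is a placeholder):

use futures::future::BoxFuture;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::Mutex;

struct WalStore {
    timeline_dir: PathBuf,
    last_removed_segno: u64,
}

impl WalStore {
    // The returned future owns everything it needs (cloned path, segno), so
    // awaiting it does not require the shared-state lock to still be held.
    fn remove_up_to(&self, segno_up_to: u64) -> BoxFuture<'static, anyhow::Result<()>> {
        let dir = self.timeline_dir.clone();
        Box::pin(async move {
            // placeholder for remove_segments_from_disk(&dir, |x| x <= segno_up_to)
            let _ = (dir, segno_up_to);
            anyhow::Ok(())
        })
    }
}

async fn remove_old_wal(state: Arc<Mutex<WalStore>>, horizon_segno: u64) -> anyhow::Result<()> {
    let remover = {
        let guard = state.lock().await;
        if horizon_segno <= guard.last_removed_segno {
            return Ok(()); // nothing to do
        }
        guard.remove_up_to(horizon_segno - 1)
        // guard is dropped here: file removal runs without blocking WAL writers
    };
    remover.await?;
    state.lock().await.last_removed_segno = horizon_segno;
    Ok(())
}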