diff --git a/libs/pageserver_api/src/models.rs b/libs/pageserver_api/src/models.rs index 3aa84f89033b..0d0702e38e0e 100644 --- a/libs/pageserver_api/src/models.rs +++ b/libs/pageserver_api/src/models.rs @@ -4,6 +4,7 @@ pub mod utilization; pub use utilization::PageserverUtilization; use std::{ + borrow::Cow, collections::HashMap, io::{BufRead, Read}, num::{NonZeroU64, NonZeroUsize}, @@ -577,7 +578,7 @@ pub struct TimelineInfo { pub walreceiver_status: String, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct LayerMapInfo { pub in_memory_layers: Vec, pub historic_layers: Vec, @@ -595,7 +596,7 @@ pub enum LayerAccessKind { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct LayerAccessStatFullDetails { pub when_millis_since_epoch: u64, - pub task_kind: &'static str, + pub task_kind: Cow<'static, str>, pub access_kind: LayerAccessKind, } @@ -654,23 +655,23 @@ impl LayerResidenceEvent { } } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct LayerAccessStats { pub access_count_by_access_kind: HashMap, - pub task_kind_access_flag: Vec<&'static str>, + pub task_kind_access_flag: Vec>, pub first: Option, pub accesses_history: HistoryBufferWithDropCounter, pub residence_events_history: HistoryBufferWithDropCounter, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind")] pub enum InMemoryLayerInfo { Open { lsn_start: Lsn }, Frozen { lsn_start: Lsn, lsn_end: Lsn }, } -#[derive(Debug, Clone, Serialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind")] pub enum HistoricLayerInfo { Delta { @@ -692,6 +693,32 @@ pub enum HistoricLayerInfo { }, } +impl HistoricLayerInfo { + pub fn layer_file_name(&self) -> &str { + match self { + HistoricLayerInfo::Delta { + layer_file_name, .. + } => layer_file_name, + HistoricLayerInfo::Image { + layer_file_name, .. + } => layer_file_name, + } + } + pub fn is_remote(&self) -> bool { + match self { + HistoricLayerInfo::Delta { remote, .. } => *remote, + HistoricLayerInfo::Image { remote, .. } => *remote, + } + } + pub fn set_remote(&mut self, value: bool) { + let field = match self { + HistoricLayerInfo::Delta { remote, .. } => remote, + HistoricLayerInfo::Image { remote, .. 
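// Illustrative sketch, not part of the patch: why `task_kind` moves from
// `&'static str` to `Cow<'static, str>` above. Serde cannot deserialize JSON
// into an arbitrary `&'static str`, so deriving `Deserialize` on these structs
// needs a type that can own its data; `Cow<'static, str>` stays zero-copy on
// the serializing side (strum's static names) and becomes `Cow::Owned` when
// parsed back. The struct name and field value here are made up.
use std::borrow::Cow;

#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq)]
struct AccessDetail {
    task_kind: Cow<'static, str>,
}

fn main() {
    let original = AccessDetail {
        task_kind: Cow::Borrowed("Compaction"),
    };
    let json = serde_json::to_string(&original).unwrap();
    // Round-trips fine; the parsed value holds an owned String internally.
    let parsed: AccessDetail = serde_json::from_str(&json).unwrap();
    assert_eq!(parsed, original);
}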
} => remote, + }; + *field = value; + } +} + #[derive(Debug, Serialize, Deserialize)] pub struct DownloadRemoteLayersTaskSpawnRequest { pub max_concurrent_downloads: NonZeroUsize, diff --git a/libs/utils/src/history_buffer.rs b/libs/utils/src/history_buffer.rs index 1f07f5560fb3..bd35e2bad6b5 100644 --- a/libs/utils/src/history_buffer.rs +++ b/libs/utils/src/history_buffer.rs @@ -47,9 +47,10 @@ impl ops::Deref for HistoryBufferWithDropCounter { } } -#[derive(serde::Serialize)] +#[derive(serde::Serialize, serde::Deserialize)] struct SerdeRepr { buffer: Vec, + buffer_size: usize, drop_count: u64, } @@ -61,6 +62,7 @@ where let HistoryBufferWithDropCounter { buffer, drop_count } = value; SerdeRepr { buffer: buffer.iter().cloned().collect(), + buffer_size: L, drop_count: *drop_count, } } @@ -78,19 +80,52 @@ where } } +impl<'de, T, const L: usize> serde::de::Deserialize<'de> for HistoryBufferWithDropCounter +where + T: Clone + serde::Deserialize<'de>, +{ + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let SerdeRepr { + buffer: des_buffer, + drop_count, + buffer_size, + } = SerdeRepr::::deserialize(deserializer)?; + if buffer_size != L { + use serde::de::Error; + return Err(D::Error::custom(format!( + "invalid buffer_size, expecting {L} got {buffer_size}" + ))); + } + let mut buffer = HistoryBuffer::new(); + buffer.extend(des_buffer); + Ok(HistoryBufferWithDropCounter { buffer, drop_count }) + } +} + #[cfg(test)] mod test { use super::HistoryBufferWithDropCounter; #[test] fn test_basics() { - let mut b = HistoryBufferWithDropCounter::<_, 2>::default(); + let mut b = HistoryBufferWithDropCounter::::default(); b.write(1); b.write(2); b.write(3); assert!(b.iter().any(|e| *e == 2)); assert!(b.iter().any(|e| *e == 3)); assert!(!b.iter().any(|e| *e == 1)); + + // round-trip serde + let round_tripped: HistoryBufferWithDropCounter = + serde_json::from_str(&serde_json::to_string(&b).unwrap()).unwrap(); + assert_eq!( + round_tripped.iter().cloned().collect::>(), + b.iter().cloned().collect::>() + ); } #[test] diff --git a/pageserver/client/src/mgmt_api.rs b/pageserver/client/src/mgmt_api.rs index ed9f63325340..1a8f7e0524b5 100644 --- a/pageserver/client/src/mgmt_api.rs +++ b/pageserver/client/src/mgmt_api.rs @@ -169,7 +169,7 @@ impl Client { self.request(Method::GET, uri, ()).await } - async fn request( + async fn request_noerror( &self, method: Method, uri: U, @@ -181,7 +181,16 @@ impl Client { } else { req }; - let res = req.json(&body).send().await.map_err(Error::ReceiveBody)?; + req.json(&body).send().await.map_err(Error::ReceiveBody) + } + + async fn request( + &self, + method: Method, + uri: U, + body: B, + ) -> Result { + let res = self.request_noerror(method, uri, body).await?; let response = res.error_from_body().await?; Ok(response) } @@ -425,4 +434,68 @@ impl Client { .await .map_err(Error::ReceiveBody) } + + pub async fn layer_map_info( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{}/timeline/{}/layer", + self.mgmt_api_endpoint, tenant_shard_id, timeline_id, + ); + self.get(&uri) + .await? 
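// Hypothetical companion test, not in the patch: the new `buffer_size` field
// in SerdeRepr exists so that a serialized buffer cannot be deserialized into
// a HistoryBufferWithDropCounter with a different const length L. Assumes the
// same types and serde_json dev-dependency as the test module above.
#[test]
fn test_buffer_size_mismatch_is_rejected() {
    let mut b = HistoryBufferWithDropCounter::<u32, 2>::default();
    b.write(1);
    let json = serde_json::to_string(&b).unwrap();
    // The payload records buffer_size = 2, so a length-4 target must refuse it
    // instead of silently accepting a buffer of the wrong capacity.
    assert!(serde_json::from_str::<HistoryBufferWithDropCounter<u32, 4>>(&json).is_err());
}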
+ .json() + .await + .map_err(Error::ReceiveBody) + } + + pub async fn layer_evict( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + layer_file_name: &str, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{}/timeline/{}/layer/{}", + self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name + ); + let resp = self.request_noerror(Method::DELETE, &uri, ()).await?; + match resp.status() { + StatusCode::OK => Ok(true), + StatusCode::NOT_MODIFIED => Ok(false), + // TODO: dedupe this pattern / introduce separate error variant? + status => Err(match resp.json::().await { + Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg), + Err(_) => { + Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri)) + } + }), + } + } + + pub async fn layer_ondemand_download( + &self, + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + layer_file_name: &str, + ) -> Result { + let uri = format!( + "{}/v1/tenant/{}/timeline/{}/layer/{}", + self.mgmt_api_endpoint, tenant_shard_id, timeline_id, layer_file_name + ); + let resp = self.request_noerror(Method::GET, &uri, ()).await?; + match resp.status() { + StatusCode::OK => Ok(true), + StatusCode::NOT_MODIFIED => Ok(false), + // TODO: dedupe this pattern / introduce separate error variant? + status => Err(match resp.json::().await { + Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg), + Err(_) => { + Error::ReceiveErrorBody(format!("Http error ({}) at {}.", status.as_u16(), uri)) + } + }), + } + } } diff --git a/pageserver/pagebench/src/cmd/ondemand_download_churn.rs b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs new file mode 100644 index 000000000000..197e782dca47 --- /dev/null +++ b/pageserver/pagebench/src/cmd/ondemand_download_churn.rs @@ -0,0 +1,272 @@ +use pageserver_api::{models::HistoricLayerInfo, shard::TenantShardId}; + +use pageserver_client::mgmt_api; +use rand::seq::SliceRandom; +use tracing::{debug, info}; +use utils::id::{TenantTimelineId, TimelineId}; + +use tokio::{ + sync::{mpsc, OwnedSemaphorePermit}, + task::JoinSet, +}; + +use std::{ + num::NonZeroUsize, + sync::{ + atomic::{AtomicU64, Ordering}, + Arc, + }, + time::{Duration, Instant}, +}; + +/// Evict & on-demand download random layers. +#[derive(clap::Parser)] +pub(crate) struct Args { + #[clap(long, default_value = "http://localhost:9898")] + mgmt_api_endpoint: String, + #[clap(long)] + pageserver_jwt: Option, + #[clap(long)] + runtime: Option, + #[clap(long, default_value = "1")] + tasks_per_target: NonZeroUsize, + #[clap(long, default_value = "1")] + concurrency_per_target: NonZeroUsize, + /// Probability for sending `latest=true` in the request (uniform distribution). + #[clap(long)] + limit_to_first_n_targets: Option, + /// Before starting the benchmark, live-reconfigure the pageserver to use the given + /// [`pageserver_api::models::virtual_file::IoEngineKind`]. 
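// Illustrative sketch, not part of the patch, of the deduplication the two
// "TODO: dedupe this pattern" comments above ask for: a shared helper that
// maps 200 OK -> true, 304 Not Modified -> false, and everything else to the
// existing error variants. Assumes this module's Result alias, Error enum and
// HttpErrorBody type; the helper name is made up.
async fn bool_from_conditional_status(resp: reqwest::Response, uri: &str) -> Result<bool> {
    match resp.status() {
        StatusCode::OK => Ok(true),
        StatusCode::NOT_MODIFIED => Ok(false),
        status => Err(match resp.json::<HttpErrorBody>().await {
            Ok(HttpErrorBody { msg }) => Error::ApiError(status, msg),
            Err(_) => Error::ReceiveErrorBody(format!(
                "Http error ({}) at {}.",
                status.as_u16(),
                uri
            )),
        }),
    }
}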
+ #[clap(long)] + set_io_engine: Option, + targets: Option>, +} + +pub(crate) fn main(args: Args) -> anyhow::Result<()> { + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build()?; + let task = rt.spawn(main_impl(args)); + rt.block_on(task).unwrap().unwrap(); + Ok(()) +} + +#[derive(Debug, Default)] +struct LiveStats { + evictions: AtomicU64, + downloads: AtomicU64, + timeline_restarts: AtomicU64, +} + +impl LiveStats { + fn eviction_done(&self) { + self.evictions.fetch_add(1, Ordering::Relaxed); + } + fn download_done(&self) { + self.downloads.fetch_add(1, Ordering::Relaxed); + } + fn timeline_restart_done(&self) { + self.timeline_restarts.fetch_add(1, Ordering::Relaxed); + } +} + +async fn main_impl(args: Args) -> anyhow::Result<()> { + let args: &'static Args = Box::leak(Box::new(args)); + + let mgmt_api_client = Arc::new(pageserver_client::mgmt_api::Client::new( + args.mgmt_api_endpoint.clone(), + args.pageserver_jwt.as_deref(), + )); + + if let Some(engine_str) = &args.set_io_engine { + mgmt_api_client.put_io_engine(engine_str).await?; + } + + // discover targets + let timelines: Vec = crate::util::cli::targets::discover( + &mgmt_api_client, + crate::util::cli::targets::Spec { + limit_to_first_n_targets: args.limit_to_first_n_targets, + targets: args.targets.clone(), + }, + ) + .await?; + + let mut tasks = JoinSet::new(); + + let live_stats = Arc::new(LiveStats::default()); + tasks.spawn({ + let live_stats = Arc::clone(&live_stats); + async move { + let mut last_at = Instant::now(); + loop { + tokio::time::sleep_until((last_at + Duration::from_secs(1)).into()).await; + let now = Instant::now(); + let delta: Duration = now - last_at; + last_at = now; + + let LiveStats { + evictions, + downloads, + timeline_restarts, + } = &*live_stats; + let evictions = evictions.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64(); + let downloads = downloads.swap(0, Ordering::Relaxed) as f64 / delta.as_secs_f64(); + let timeline_restarts = timeline_restarts.swap(0, Ordering::Relaxed); + info!("evictions={evictions:.2}/s downloads={downloads:.2}/s timeline_restarts={timeline_restarts}"); + } + } + }); + + for tl in timelines { + for _ in 0..args.tasks_per_target.get() { + tasks.spawn(timeline_actor( + args, + Arc::clone(&mgmt_api_client), + tl, + Arc::clone(&live_stats), + )); + } + } + + while let Some(res) = tasks.join_next().await { + res.unwrap(); + } + Ok(()) +} + +async fn timeline_actor( + args: &'static Args, + mgmt_api_client: Arc, + timeline: TenantTimelineId, + live_stats: Arc, +) { + // TODO: support sharding + let tenant_shard_id = TenantShardId::unsharded(timeline.tenant_id); + + struct Timeline { + joinset: JoinSet<()>, + layers: Vec>, + concurrency: Arc, + } + loop { + debug!("restarting timeline"); + let layer_map_info = mgmt_api_client + .layer_map_info(tenant_shard_id, timeline.timeline_id) + .await + .unwrap(); + let concurrency = Arc::new(tokio::sync::Semaphore::new( + args.concurrency_per_target.get(), + )); + + let mut joinset = JoinSet::new(); + let layers = layer_map_info + .historic_layers + .into_iter() + .map(|historic_layer| { + let (tx, rx) = mpsc::channel(1); + joinset.spawn(layer_actor( + tenant_shard_id, + timeline.timeline_id, + historic_layer, + rx, + Arc::clone(&mgmt_api_client), + Arc::clone(&live_stats), + )); + tx + }) + .collect::>(); + + let mut timeline = Timeline { + joinset, + layers, + concurrency, + }; + + live_stats.timeline_restart_done(); + + loop { + assert!(!timeline.joinset.is_empty()); + if let Some(res) = 
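// Standalone sketch, not from the patch, of the `Box::leak(Box::new(args))`
// idiom used in main_impl below: leaking the parsed arguments yields a
// `&'static Args` that every spawned task can capture by copy, avoiding
// Arc/clone plumbing; the memory is simply never freed, which is acceptable
// for a one-shot benchmark binary. The Config type here is made up.
#[derive(Debug)]
struct Config {
    tasks: usize,
}

#[tokio::main]
async fn main() {
    let cfg: &'static Config = Box::leak(Box::new(Config { tasks: 4 }));
    let mut handles = Vec::new();
    for i in 0..cfg.tasks {
        // A shared reference with 'static lifetime is Copy, so each task
        // just captures it by value.
        handles.push(tokio::spawn(async move {
            println!("task {i} sees {cfg:?}");
        }));
    }
    for h in handles {
        h.await.unwrap();
    }
}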
timeline.joinset.try_join_next() { + debug!(?res, "a layer actor exited, should not happen"); + timeline.joinset.shutdown().await; + break; + } + + let mut permit = Some( + Arc::clone(&timeline.concurrency) + .acquire_owned() + .await + .unwrap(), + ); + + loop { + let layer_tx = { + let mut rng = rand::thread_rng(); + timeline.layers.choose_mut(&mut rng).expect("no layers") + }; + match layer_tx.try_send(permit.take().unwrap()) { + Ok(_) => break, + Err(e) => match e { + mpsc::error::TrySendError::Full(back) => { + // TODO: retrying introduces bias away from slow downloaders + permit.replace(back); + } + mpsc::error::TrySendError::Closed(_) => panic!(), + }, + } + } + } + } +} + +async fn layer_actor( + tenant_shard_id: TenantShardId, + timeline_id: TimelineId, + mut layer: HistoricLayerInfo, + mut rx: mpsc::Receiver, + mgmt_api_client: Arc, + live_stats: Arc, +) { + #[derive(Clone, Copy)] + enum Action { + Evict, + OnDemandDownload, + } + + while let Some(_permit) = rx.recv().await { + let action = if layer.is_remote() { + Action::OnDemandDownload + } else { + Action::Evict + }; + + let did_it = match action { + Action::Evict => { + let did_it = mgmt_api_client + .layer_evict(tenant_shard_id, timeline_id, layer.layer_file_name()) + .await + .unwrap(); + live_stats.eviction_done(); + did_it + } + Action::OnDemandDownload => { + let did_it = mgmt_api_client + .layer_ondemand_download(tenant_shard_id, timeline_id, layer.layer_file_name()) + .await + .unwrap(); + live_stats.download_done(); + did_it + } + }; + if !did_it { + debug!("local copy of layer map appears out of sync, re-downloading"); + return; + } + debug!("did it"); + layer.set_remote(match action { + Action::Evict => true, + Action::OnDemandDownload => false, + }); + } +} diff --git a/pageserver/pagebench/src/main.rs b/pageserver/pagebench/src/main.rs index 5d688ed2d17d..743102d85353 100644 --- a/pageserver/pagebench/src/main.rs +++ b/pageserver/pagebench/src/main.rs @@ -16,6 +16,7 @@ mod util { mod cmd { pub(super) mod basebackup; pub(super) mod getpage_latest_lsn; + pub(super) mod ondemand_download_churn; pub(super) mod trigger_initial_size_calculation; } @@ -25,6 +26,7 @@ enum Args { Basebackup(cmd::basebackup::Args), GetPageLatestLsn(cmd::getpage_latest_lsn::Args), TriggerInitialSizeCalculation(cmd::trigger_initial_size_calculation::Args), + OndemandDownloadChurn(cmd::ondemand_download_churn::Args), } fn main() { @@ -43,6 +45,7 @@ fn main() { Args::TriggerInitialSizeCalculation(args) => { cmd::trigger_initial_size_calculation::main(args) } + Args::OndemandDownloadChurn(args) => cmd::ondemand_download_churn::main(args), } .unwrap() } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 6fff6e78e2e9..6ee8ad715509 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -23,7 +23,7 @@ use crate::tenant::storage_layer::LayerFileName; use crate::tenant::Generation; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; -use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode}; +use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode, RemotePath}; use utils::crashsafe::path_with_suffix_extension; use utils::id::TimelineId; @@ -73,55 +73,13 @@ pub async fn download_layer_file<'a>( // If pageserver crashes the temp file will be deleted on startup and re-downloaded. 
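// Self-contained sketch, not from the patch, of the concurrency pattern used
// by timeline_actor/layer_actor above: a semaphore caps the number of
// in-flight layer operations, and each acquired OwnedSemaphorePermit is
// handed to a worker through a bounded mpsc channel; dropping the permit in
// the worker frees the slot for the dispatcher.
use std::sync::Arc;
use tokio::sync::{mpsc, OwnedSemaphorePermit, Semaphore};

#[tokio::main]
async fn main() {
    let concurrency = Arc::new(Semaphore::new(2));
    let (tx, mut rx) = mpsc::channel::<OwnedSemaphorePermit>(1);

    let worker = tokio::spawn(async move {
        while let Some(permit) = rx.recv().await {
            // ... one unit of work (evict or download) while holding the permit ...
            drop(permit);
        }
    });

    for _ in 0..4 {
        let permit = Arc::clone(&concurrency).acquire_owned().await.unwrap();
        // The benchmark uses try_send to skip a busy worker and pick another
        // layer; plain send() is the simplest form of the same hand-off.
        tx.send(permit).await.unwrap();
    }
    drop(tx);
    worker.await.unwrap();
}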
let temp_file_path = path_with_suffix_extension(&local_path, TEMP_DOWNLOAD_EXTENSION); - let (mut destination_file, bytes_amount) = download_retry( - || async { - let destination_file = tokio::fs::File::create(&temp_file_path) - .await - .with_context(|| format!("create a destination file for layer '{temp_file_path}'")) - .map_err(DownloadError::Other)?; - - let download = storage.download(&remote_path, cancel).await?; - - let mut destination_file = - tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file); - - let mut reader = tokio_util::io::StreamReader::new(download.download_stream); - - let bytes_amount = tokio::io::copy_buf(&mut reader, &mut destination_file).await; - - match bytes_amount { - Ok(bytes_amount) => { - let destination_file = destination_file.into_inner(); - Ok((destination_file, bytes_amount)) - } - Err(e) => { - if let Err(e) = tokio::fs::remove_file(&temp_file_path).await { - on_fatal_io_error(&e, &format!("Removing temporary file {temp_file_path}")); - } - - Err(e.into()) - } - } - }, + let bytes_amount = download_retry( + || async { download_object(storage, &remote_path, &temp_file_path, cancel).await }, &format!("download {remote_path:?}"), cancel, ) .await?; - // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: - // A file will not be closed immediately when it goes out of scope if there are any IO operations - // that have not yet completed. To ensure that a file is closed immediately when it is dropped, - // you should call flush before dropping it. - // - // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because - // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations. - // But for additional safety lets check/wait for any pending operations. - destination_file - .flush() - .await - .with_context(|| format!("flush source file at {temp_file_path}")) - .map_err(DownloadError::Other)?; - let expected = layer_metadata.file_size(); if expected != bytes_amount { return Err(DownloadError::Other(anyhow!( @@ -129,14 +87,6 @@ pub async fn download_layer_file<'a>( ))); } - // not using sync_data because it can lose file size update - destination_file - .sync_all() - .await - .with_context(|| format!("failed to fsync source file at {temp_file_path}")) - .map_err(DownloadError::Other)?; - drop(destination_file); - fail::fail_point!("remote-storage-download-pre-rename", |_| { Err(DownloadError::Other(anyhow!( "remote-storage-download-pre-rename failpoint triggered" @@ -169,6 +119,128 @@ pub async fn download_layer_file<'a>( Ok(bytes_amount) } +/// Download the object `src_path` in the remote `storage` to local path `dst_path`. +/// +/// If Ok() is returned, the download succeeded and the inode & data have been made durable. +/// (Note that the directory entry for the inode is not made durable.) +/// The file size in bytes is returned. +/// +/// If Err() is returned, there was some error. The file at `dst_path` has been unlinked. +/// The unlinking has _not_ been made durable. 
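// Sketch, not part of the patch, of what "the directory entry for the inode
// is not made durable" means in the doc comment above: download_object fsyncs
// the file's data and inode, but after a crash the *name* could still be
// missing from the directory unless the parent directory is fsynced as well.
// A caller needing that guarantee could do something like this (works on
// Linux, where a directory can be opened read-only and fsynced):
use std::path::Path;

fn fsync_parent_dir(path: &Path) -> std::io::Result<()> {
    let parent = path.parent().expect("path must have a parent directory");
    // sync_all() on the directory handle fsyncs the directory inode, which
    // persists the entries (names) it contains.
    std::fs::File::open(parent)?.sync_all()
}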
+async fn download_object<'a>( + storage: &'a GenericRemoteStorage, + src_path: &RemotePath, + dst_path: &Utf8PathBuf, + cancel: &CancellationToken, +) -> Result { + let res = match crate::virtual_file::io_engine::get() { + crate::virtual_file::io_engine::IoEngine::NotSet => panic!("unset"), + crate::virtual_file::io_engine::IoEngine::StdFs => { + async { + let destination_file = tokio::fs::File::create(dst_path) + .await + .with_context(|| format!("create a destination file for layer '{dst_path}'")) + .map_err(DownloadError::Other)?; + + let download = storage.download(src_path, cancel).await?; + + let mut buf_writer = + tokio::io::BufWriter::with_capacity(super::BUFFER_SIZE, destination_file); + + let mut reader = tokio_util::io::StreamReader::new(download.download_stream); + + let bytes_amount = tokio::io::copy_buf(&mut reader, &mut buf_writer).await?; + buf_writer.flush().await?; + + let mut destination_file = buf_writer.into_inner(); + + // Tokio doc here: https://docs.rs/tokio/1.17.0/tokio/fs/struct.File.html states that: + // A file will not be closed immediately when it goes out of scope if there are any IO operations + // that have not yet completed. To ensure that a file is closed immediately when it is dropped, + // you should call flush before dropping it. + // + // From the tokio code I see that it waits for pending operations to complete. There shouldt be any because + // we assume that `destination_file` file is fully written. I e there is no pending .write(...).await operations. + // But for additional safety lets check/wait for any pending operations. + destination_file + .flush() + .await + .with_context(|| format!("flush source file at {dst_path}")) + .map_err(DownloadError::Other)?; + + // not using sync_data because it can lose file size update + destination_file + .sync_all() + .await + .with_context(|| format!("failed to fsync source file at {dst_path}")) + .map_err(DownloadError::Other)?; + + Ok(bytes_amount) + } + .await + } + #[cfg(target_os = "linux")] + crate::virtual_file::io_engine::IoEngine::TokioEpollUring => { + use crate::virtual_file::owned_buffers_io::{self, util::size_tracking_writer}; + async { + let destination_file = VirtualFile::create(dst_path) + .await + .with_context(|| format!("create a destination file for layer '{dst_path}'")) + .map_err(DownloadError::Other)?; + + let mut download = storage.download(src_path, cancel).await?; + + // TODO: use vectored write (writev) once supported by tokio-epoll-uring. + // There's chunks_vectored() on the stream. 
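// Rough sketch, an assumption rather than patch content, of what the
// "vectored write (writev)" TODO above refers to: bytes::Buf::chunks_vectored
// exposes a buffer's (possibly non-contiguous) chunks as IoSlice entries that
// a writev-style API could submit in a single call instead of copying chunk
// by chunk.
use bytes::Buf;
use std::io::IoSlice;

fn main() {
    let buf = bytes::Bytes::from_static(b"hello world");
    let mut slices = [IoSlice::new(&[]), IoSlice::new(&[])];
    let filled = buf.chunks_vectored(&mut slices);
    // A single Bytes value is one contiguous chunk; chained or segmented
    // buffers would fill several slices.
    assert_eq!(filled, 1);
    assert_eq!(&*slices[0], b"hello world".as_slice());
}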
+ let (bytes_amount, destination_file) = async { + let size_tracking = size_tracking_writer::Writer::new(destination_file); + let mut buffered = owned_buffers_io::write::BufferedWriter::< + { super::BUFFER_SIZE }, + _, + >::new(size_tracking); + while let Some(res) = + futures::StreamExt::next(&mut download.download_stream).await + { + let chunk = match res { + Ok(chunk) => chunk, + Err(e) => return Err(e), + }; + buffered + .write_buffered(tokio_epoll_uring::BoundedBuf::slice_full(chunk)) + .await?; + } + let size_tracking = buffered.flush_and_into_inner().await?; + Ok(size_tracking.into_inner()) + } + .await?; + + // not using sync_data because it can lose file size update + destination_file + .sync_all() + .await + .with_context(|| format!("failed to fsync source file at {dst_path}")) + .map_err(DownloadError::Other)?; + + Ok(bytes_amount) + } + .await + } + }; + + // in case the download failed, clean up + match res { + Ok(bytes_amount) => Ok(bytes_amount), + Err(e) => { + if let Err(e) = tokio::fs::remove_file(dst_path).await { + if e.kind() != std::io::ErrorKind::NotFound { + on_fatal_io_error(&e, &format!("Removing temporary file {dst_path}")); + } + } + Err(e) + } + } +} + const TEMP_DOWNLOAD_EXTENSION: &str = "temp_download"; pub(crate) fn is_temp_download_file(path: &Utf8Path) -> bool { diff --git a/pageserver/src/tenant/storage_layer.rs b/pageserver/src/tenant/storage_layer.rs index 299950cc21a7..5c3bab986888 100644 --- a/pageserver/src/tenant/storage_layer.rs +++ b/pageserver/src/tenant/storage_layer.rs @@ -20,6 +20,7 @@ use pageserver_api::keyspace::{KeySpace, KeySpaceRandomAccum}; use pageserver_api::models::{ LayerAccessKind, LayerResidenceEvent, LayerResidenceEventReason, LayerResidenceStatus, }; +use std::borrow::Cow; use std::cmp::{Ordering, Reverse}; use std::collections::hash_map::Entry; use std::collections::{BinaryHeap, HashMap}; @@ -427,7 +428,7 @@ impl LayerAccessStatFullDetails { } = self; pageserver_api::models::LayerAccessStatFullDetails { when_millis_since_epoch: system_time_to_millis_since_epoch(when), - task_kind: task_kind.into(), // into static str, powered by strum_macros + task_kind: Cow::Borrowed(task_kind.into()), // into static str, powered by strum_macros access_kind: *access_kind, } } @@ -525,7 +526,7 @@ impl LayerAccessStats { .collect(), task_kind_access_flag: task_kind_flag .iter() - .map(|task_kind| task_kind.into()) // into static str, powered by strum_macros + .map(|task_kind| Cow::Borrowed(task_kind.into())) // into static str, powered by strum_macros .collect(), first: first_access.as_ref().map(|a| a.as_api_model()), accesses_history: last_accesses.map(|m| m.as_api_model()), diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index ae44e9edc46a..dee36d8afd6c 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -36,6 +36,23 @@ pub(crate) use io_engine::IoEngineKind; pub(crate) use metadata::Metadata; pub(crate) use open_options::*; +#[cfg_attr(not(target_os = "linux"), allow(dead_code))] +pub(crate) mod owned_buffers_io { + //! Abstractions for IO with owned buffers. + //! + //! Not actually tied to [`crate::virtual_file`] specifically, but, it's the primary + //! reason we need this abstraction. + //! + //! Over time, this could move into the `tokio-epoll-uring` crate, maybe `uring-common`, + //! but for the time being we're proving out the primitives in the neon.git repo + //! for faster iteration. 
+ + pub(crate) mod write; + pub(crate) mod util { + pub(crate) mod size_tracking_writer; + } +} + /// /// A virtual file descriptor. You can use this just like std::fs::File, but internally /// the underlying file is closed if the system is low on file descriptors, diff --git a/pageserver/src/virtual_file/io_engine.rs b/pageserver/src/virtual_file/io_engine.rs index 7f2342e76ef4..55fa59e53b06 100644 --- a/pageserver/src/virtual_file/io_engine.rs +++ b/pageserver/src/virtual_file/io_engine.rs @@ -282,49 +282,52 @@ impl From for IoEngineKind { /// Panics if we can't set up the feature test. pub fn feature_test() -> anyhow::Result { std::thread::spawn(|| { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); + #[cfg(not(target_os = "linux"))] { - return Ok(FeatureTestResult::PlatformPreferred( - FeatureTestResult::PLATFORM_DEFAULT, - )); + Ok(FeatureTestResult::PlatformPreferred( + FeatureTestResult::PLATFORM_PREFERRED, + )) } #[cfg(target_os = "linux")] - Ok(match rt.block_on(tokio_epoll_uring::System::launch()) { - Ok(_) => FeatureTestResult::PlatformPreferred({ - assert!(matches!( - IoEngineKind::TokioEpollUring, + { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + Ok(match rt.block_on(tokio_epoll_uring::System::launch()) { + Ok(_) => FeatureTestResult::PlatformPreferred({ + assert!(matches!( + IoEngineKind::TokioEpollUring, + FeatureTestResult::PLATFORM_PREFERRED + )); FeatureTestResult::PLATFORM_PREFERRED - )); - FeatureTestResult::PLATFORM_PREFERRED - }), - Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => { - let remark = match e.raw_os_error() { - Some(nix::libc::EPERM) => { - // fall back - "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled " - .to_string() - } - Some(nix::libc::EFAULT) => { - // fail feature test - anyhow::bail!( - "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory" - ); - } - Some(_) | None => { - // fall back - format!("creating tokio-epoll-uring fails with error: {e:#}") + }), + Err(tokio_epoll_uring::LaunchResult::IoUringBuild(e)) => { + let remark = match e.raw_os_error() { + Some(nix::libc::EPERM) => { + // fall back + "creating tokio-epoll-uring fails with EPERM, assuming it's admin-disabled " + .to_string() + } + Some(nix::libc::EFAULT) => { + // fail feature test + anyhow::bail!( + "creating tokio-epoll-uring fails with EFAULT, might have corrupted memory" + ); + } + Some(_) | None => { + // fall back + format!("creating tokio-epoll-uring fails with error: {e:#}") + } + }; + FeatureTestResult::Worse { + engine: IoEngineKind::StdFs, + remark, } - }; - FeatureTestResult::Worse { - engine: IoEngineKind::StdFs, - remark, } - } - }) + }) + } }) .join() .unwrap() diff --git a/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs new file mode 100644 index 000000000000..7505b7487e67 --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/util/size_tracking_writer.rs @@ -0,0 +1,34 @@ +use crate::virtual_file::{owned_buffers_io::write::OwnedAsyncWriter, VirtualFile}; +use tokio_epoll_uring::{BoundedBuf, IoBuf}; + +pub struct Writer { + dst: VirtualFile, + bytes_amount: u64, +} + +impl Writer { + pub fn new(dst: VirtualFile) -> Self { + Self { + dst, + bytes_amount: 0, + } + } + /// Returns the wrapped `VirtualFile` object as well as the number + /// of bytes that were written to it through this object. 
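// Generic analogue, illustrative only and not part of the patch, of the
// size_tracking_writer::Writer in this file: a decorator that forwards writes
// to an inner writer and counts the bytes that pass through, expressed here
// for std::io::Write instead of VirtualFile. The CountingWriter name is made up.
use std::io::Write;

struct CountingWriter<W> {
    inner: W,
    bytes: u64,
}

impl<W: Write> Write for CountingWriter<W> {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        let n = self.inner.write(buf)?;
        self.bytes += n as u64;
        Ok(n)
    }
    fn flush(&mut self) -> std::io::Result<()> {
        self.inner.flush()
    }
}

fn main() -> std::io::Result<()> {
    let mut w = CountingWriter { inner: Vec::new(), bytes: 0 };
    w.write_all(b"hello")?;
    assert_eq!(w.bytes, 5);
    Ok(())
}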
+ pub fn into_inner(self) -> (u64, VirtualFile) { + (self.bytes_amount, self.dst) + } +} + +impl OwnedAsyncWriter for Writer { + #[inline(always)] + async fn write_all, Buf: IoBuf + Send>( + &mut self, + buf: B, + ) -> std::io::Result<(usize, B::Buf)> { + let (buf, res) = self.dst.write_all(buf).await; + let nwritten = res?; + self.bytes_amount += u64::try_from(nwritten).unwrap(); + Ok((nwritten, buf)) + } +} diff --git a/pageserver/src/virtual_file/owned_buffers_io/write.rs b/pageserver/src/virtual_file/owned_buffers_io/write.rs new file mode 100644 index 000000000000..f1812d9b518a --- /dev/null +++ b/pageserver/src/virtual_file/owned_buffers_io/write.rs @@ -0,0 +1,206 @@ +use bytes::BytesMut; +use tokio_epoll_uring::{BoundedBuf, IoBuf, Slice}; + +/// A trait for doing owned-buffer write IO. +/// Think [`tokio::io::AsyncWrite`] but with owned buffers. +pub trait OwnedAsyncWriter { + async fn write_all, Buf: IoBuf + Send>( + &mut self, + buf: B, + ) -> std::io::Result<(usize, B::Buf)>; +} + +/// A wrapper aorund an [`OwnedAsyncWriter`] that batches smaller writers +/// into `BUFFER_SIZE`-sized writes. +/// +/// # Passthrough Of Large Writers +/// +/// Buffered writes larger than the `BUFFER_SIZE` cause the internal +/// buffer to be flushed, even if it is not full yet. Then, the large +/// buffered write is passed through to the unerlying [`OwnedAsyncWriter`]. +/// +/// This pass-through is generally beneficial for throughput, but if +/// the storage backend of the [`OwnedAsyncWriter`] is a shared resource, +/// unlimited large writes may cause latency or fairness issues. +/// +/// In such cases, a different implementation that always buffers in memory +/// may be preferable. +pub struct BufferedWriter { + writer: W, + // invariant: always remains Some(buf) + // with buf.capacity() == BUFFER_SIZE except + // - while IO is ongoing => goes back to Some() once the IO completed successfully + // - after an IO error => stays `None` forever + // In these exceptional cases, it's `None`. 
+ buf: Option, +} + +impl BufferedWriter +where + W: OwnedAsyncWriter, +{ + pub fn new(writer: W) -> Self { + Self { + writer, + buf: Some(BytesMut::with_capacity(BUFFER_SIZE)), + } + } + + pub async fn flush_and_into_inner(mut self) -> std::io::Result { + self.flush().await?; + let Self { buf, writer } = self; + assert!(buf.is_some()); + Ok(writer) + } + + pub async fn write_buffered(&mut self, chunk: Slice) -> std::io::Result<()> + where + B: IoBuf + Send, + { + // avoid memcpy for the middle of the chunk + if chunk.len() >= BUFFER_SIZE { + self.flush().await?; + // do a big write, bypassing `buf` + assert_eq!( + self.buf + .as_ref() + .expect("must not use after an error") + .len(), + 0 + ); + let chunk_len = chunk.len(); + let (nwritten, chunk) = self.writer.write_all(chunk).await?; + assert_eq!(nwritten, chunk_len); + drop(chunk); + return Ok(()); + } + // in-memory copy the < BUFFER_SIZED tail of the chunk + assert!(chunk.len() < BUFFER_SIZE); + let mut chunk = &chunk[..]; + while !chunk.is_empty() { + let buf = self.buf.as_mut().expect("must not use after an error"); + let need = BUFFER_SIZE - buf.len(); + let have = chunk.len(); + let n = std::cmp::min(need, have); + buf.extend_from_slice(&chunk[..n]); + chunk = &chunk[n..]; + if buf.len() >= BUFFER_SIZE { + assert_eq!(buf.len(), BUFFER_SIZE); + self.flush().await?; + } + } + assert!(chunk.is_empty(), "by now we should have drained the chunk"); + Ok(()) + } + + async fn flush(&mut self) -> std::io::Result<()> { + let buf = self.buf.take().expect("must not use after an error"); + if buf.is_empty() { + self.buf = Some(buf); + return std::io::Result::Ok(()); + } + let buf_len = buf.len(); + let (nwritten, mut buf) = self.writer.write_all(buf).await?; + assert_eq!(nwritten, buf_len); + buf.clear(); + self.buf = Some(buf); + Ok(()) + } +} + +impl OwnedAsyncWriter for Vec { + async fn write_all, Buf: IoBuf + Send>( + &mut self, + buf: B, + ) -> std::io::Result<(usize, B::Buf)> { + let nbytes = buf.bytes_init(); + if nbytes == 0 { + return Ok((0, Slice::into_inner(buf.slice_full()))); + } + let buf = buf.slice(0..nbytes); + self.extend_from_slice(&buf[..]); + Ok((buf.len(), Slice::into_inner(buf))) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[derive(Default)] + struct RecorderWriter { + writes: Vec>, + } + impl OwnedAsyncWriter for RecorderWriter { + async fn write_all, Buf: IoBuf + Send>( + &mut self, + buf: B, + ) -> std::io::Result<(usize, B::Buf)> { + let nbytes = buf.bytes_init(); + if nbytes == 0 { + self.writes.push(vec![]); + return Ok((0, Slice::into_inner(buf.slice_full()))); + } + let buf = buf.slice(0..nbytes); + self.writes.push(Vec::from(&buf[..])); + Ok((buf.len(), Slice::into_inner(buf))) + } + } + + macro_rules! 
write { + ($writer:ident, $data:literal) => {{ + $writer + .write_buffered(::bytes::Bytes::from_static($data).slice_full()) + .await?; + }}; + } + + #[tokio::test] + async fn test_buffered_writes_only() -> std::io::Result<()> { + let recorder = RecorderWriter::default(); + let mut writer = BufferedWriter::<2, _>::new(recorder); + write!(writer, b"a"); + write!(writer, b"b"); + write!(writer, b"c"); + write!(writer, b"d"); + write!(writer, b"e"); + let recorder = writer.flush_and_into_inner().await?; + assert_eq!( + recorder.writes, + vec![Vec::from(b"ab"), Vec::from(b"cd"), Vec::from(b"e")] + ); + Ok(()) + } + + #[tokio::test] + async fn test_passthrough_writes_only() -> std::io::Result<()> { + let recorder = RecorderWriter::default(); + let mut writer = BufferedWriter::<2, _>::new(recorder); + write!(writer, b"abc"); + write!(writer, b"de"); + write!(writer, b""); + write!(writer, b"fghijk"); + let recorder = writer.flush_and_into_inner().await?; + assert_eq!( + recorder.writes, + vec![Vec::from(b"abc"), Vec::from(b"de"), Vec::from(b"fghijk")] + ); + Ok(()) + } + + #[tokio::test] + async fn test_passthrough_write_with_nonempty_buffer() -> std::io::Result<()> { + let recorder = RecorderWriter::default(); + let mut writer = BufferedWriter::<2, _>::new(recorder); + write!(writer, b"a"); + write!(writer, b"bc"); + write!(writer, b"d"); + write!(writer, b"e"); + let recorder = writer.flush_and_into_inner().await?; + assert_eq!( + recorder.writes, + vec![Vec::from(b"a"), Vec::from(b"bc"), Vec::from(b"de")] + ); + Ok(()) + } +} diff --git a/scripts/ps_ec2_setup_instance_store b/scripts/ps_ec2_setup_instance_store index 4cca3a9857c0..1f88f252ebb3 100755 --- a/scripts/ps_ec2_setup_instance_store +++ b/scripts/ps_ec2_setup_instance_store @@ -40,7 +40,7 @@ To run your local neon.git build on the instance store volume, run the following commands from the top of the neon.git checkout # raise file descriptor limit of your shell and its child processes - sudo prlimit -p $$ --nofile=800000:800000 + sudo prlimit -p \$\$ --nofile=800000:800000 # test suite run export TEST_OUTPUT="$TEST_OUTPUT"