Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

layer file download: final rename: fix durability #6991

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 19 additions & 9 deletions pageserver/src/tenant/remote_timeline_client/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use tokio::io::{AsyncSeekExt, AsyncWriteExt};
use tokio_util::io::StreamReader;
use tokio_util::sync::CancellationToken;
use tracing::warn;
use utils::{backoff, crashsafe};
use utils::backoff;

use crate::config::PageServerConf;
use crate::span::debug_assert_current_span_has_tenant_and_timeline_id;
use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path};
use crate::tenant::storage_layer::LayerFileName;
use crate::tenant::Generation;
use crate::virtual_file::on_fatal_io_error;
use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile};
use crate::TEMP_FILE_SUFFIX;
use remote_storage::{DownloadError, GenericRemoteStorage, ListingMode};
use utils::crashsafe::path_with_suffix_extension;
Expand Down Expand Up @@ -50,9 +50,8 @@ pub async fn download_layer_file<'a>(
) -> Result<u64, DownloadError> {
debug_assert_current_span_has_tenant_and_timeline_id();

let local_path = conf
.timeline_path(&tenant_shard_id, &timeline_id)
.join(layer_file_name.file_name());
let timeline_path = conf.timeline_path(&tenant_shard_id, &timeline_id);
let local_path = timeline_path.join(layer_file_name.file_name());

let remote_path = remote_layer_path(
&tenant_shard_id.tenant_id,
Expand Down Expand Up @@ -149,10 +148,21 @@ pub async fn download_layer_file<'a>(
.with_context(|| format!("rename download layer file to {local_path}"))
.map_err(DownloadError::Other)?;

problame marked this conversation as resolved.
Show resolved Hide resolved
crashsafe::fsync_async(&local_path)
.await
.with_context(|| format!("fsync layer file {local_path}"))
.map_err(DownloadError::Other)?;
// We use fatal_err() below because the after the rename above,
// the in-memory state of the filesystem already has the layer file in its final place,
// and subsequent pageserver code could think it's durable while it really isn't.
let work = async move {
let timeline_dir = VirtualFile::open(&timeline_path)
.await
.fatal_err("VirtualFile::open for timeline dir fsync");
timeline_dir
.sync_all()
.await
.fatal_err("VirtualFile::sync_all timeline dir");
};
crate::virtual_file::io_engine::get()
.spawn_blocking_and_block_on_if_std(work)
.await;

tracing::debug!("download complete: {local_path}");

Expand Down
26 changes: 26 additions & 0 deletions pageserver/src/virtual_file/io_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
//! Then use [`get`] and [`super::OpenOptions`].

use tokio_epoll_uring::{IoBuf, Slice};
use tracing::Instrument;

pub(crate) use super::api::IoEngineKind;
#[derive(Clone, Copy)]
Expand Down Expand Up @@ -225,4 +226,29 @@ impl IoEngine {
}
}
}

/// If we switch a user of [`tokio::fs`] to use [`super::io_engine`],
/// they'd start blocking the executor thread if [`IoEngine::StdFs`] is configured
/// whereas before the switch to [`super::io_engine`], that wasn't the case.
/// This method helps avoid such a regression.
///
/// Panics if the `spawn_blocking` fails, see [`tokio::task::JoinError`] for reasons why that can happen.
pub(crate) async fn spawn_blocking_and_block_on_if_std<Fut, R>(&self, work: Fut) -> R
where
Fut: 'static + Send + std::future::Future<Output = R>,
R: 'static + Send,
{
match self {
IoEngine::NotSet => panic!("not initialized"),
IoEngine::StdFs => {
let span = tracing::info_span!("spawn_blocking_block_on_if_std");
tokio::task::spawn_blocking({
move || tokio::runtime::Handle::current().block_on(work.instrument(span))
})
.await
.expect("failed to join blocking code most likely it panicked, panicking as well")
}
IoEngine::TokioEpollUring => work.await,
}
}
}