Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make read_exact_at cancel-safe #6101

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

95 changes: 60 additions & 35 deletions pageserver/src/virtual_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use camino::{Utf8Path, Utf8PathBuf};
use once_cell::sync::OnceCell;
use std::fs::{self, File};
use std::io::{Error, ErrorKind, Seek, SeekFrom};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd};
use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd};
use std::os::unix::fs::FileExt;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
Expand Down Expand Up @@ -106,7 +106,7 @@ struct SlotInner {
tag: u64,

/// the underlying file
file: Option<File>,
file: Option<OwnedFd>,
}

/// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`].
Expand Down Expand Up @@ -305,6 +305,10 @@ macro_rules! with_file {
let $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{
let mut $ident = $this.lock_file().await?;
observe_duration!($op, $($body)*)
}};
}

impl VirtualFile {
Expand Down Expand Up @@ -401,7 +405,7 @@ impl VirtualFile {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
File::from(file)
file
}));

// Strip all options other than read and write.
Expand All @@ -428,15 +432,13 @@ impl VirtualFile {

/// Call File::sync_all() on the underlying File.
pub async fn sync_all(&self) -> Result<(), Error> {
with_file!(self, StorageIoOperation::Fsync, |file| file
.as_ref()
.sync_all())
with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard
.with_std_file(|std_file| std_file.sync_all()))
}

pub async fn metadata(&self) -> Result<fs::Metadata, Error> {
with_file!(self, StorageIoOperation::Metadata, |file| file
.as_ref()
.metadata())
with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard
.with_std_file(|std_file| std_file.metadata()))
}

/// Helper function internal to `VirtualFile` that looks up the underlying File,
Expand All @@ -445,7 +447,7 @@ impl VirtualFile {
///
/// We are doing it via a macro as Rust doesn't support async closures that
/// take on parameters with lifetimes.
async fn lock_file(&self) -> Result<FileGuard<'static>, Error> {
async fn lock_file(&self) -> Result<FileGuard, Error> {
let open_files = get_open_files();

let mut handle_guard = {
Expand Down Expand Up @@ -502,7 +504,7 @@ impl VirtualFile {
std::io::Error::new(std::io::ErrorKind::Other, system)
}
})?;
File::from(file)
file
});

// Store the File in the slot and update the handle in the VirtualFile
Expand All @@ -528,9 +530,8 @@ impl VirtualFile {
self.pos = offset;
}
SeekFrom::End(offset) => {
self.pos = with_file!(self, StorageIoOperation::Seek, |file| file
.as_ref()
.seek(SeekFrom::End(offset)))?
self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard
.with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))?
}
SeekFrom::Current(offset) => {
let pos = self.pos as i128 + offset as i128;
Expand Down Expand Up @@ -656,27 +657,15 @@ impl VirtualFile {
}
async fn read_at0<B>(
&self,
file_guard: FileGuard<'_>,
file_guard: FileGuard,
buf: B,
offset: u64,
) -> (B, Result<usize, Error>)
where
B: tokio_epoll_uring::BoundedBufMut + Send,
{
let system = tokio_epoll_uring::thread_local_system().await;
// SAFETY: when file_guard gets dropped, the raw fd becomes invalid or may get re-used
// while the io_uring operation is still executing.
// The `file_guard` could get dropped due to future cancellation-by-drop.
// We prevent this situation using the scopeguard: it will abort the process in such cases.
// Fixing this is subject of https://github.com/neondatabase/neon/pull/6101
let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) };
let guard = scopeguard::guard(file_guard, |_| {
eprintln!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)");
std::process::abort();
});
let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await;
let _ = OwnedFd::into_raw_fd(owned_fd);
let _ = scopeguard::ScopeGuard::into_inner(guard);
let ((_file_guard, buf), res) = system.read(file_guard, offset, buf).await;
if let Ok(size) = res {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
Expand All @@ -688,9 +677,9 @@ impl VirtualFile {
}

async fn write_at(&self, buf: &[u8], offset: u64) -> Result<usize, Error> {
let result = with_file!(self, StorageIoOperation::Write, |file| file
.as_ref()
.write_at(buf, offset));
let result = with_file!(self, StorageIoOperation::Write, |file_guard| {
file_guard.with_std_file(|std_file| std_file.write_at(buf, offset))
});
if let Ok(size) = result {
// TODO: don't use with_label_values on hot path
// https://github.com/neondatabase/neon/issues/6107
Expand All @@ -702,18 +691,54 @@ impl VirtualFile {
}
}

struct FileGuard<'a> {
slot_guard: RwLockReadGuard<'a, SlotInner>,
struct FileGuard {
slot_guard: RwLockReadGuard<'static, SlotInner>,
}

impl<'a> AsRef<File> for FileGuard<'a> {
fn as_ref(&self) -> &File {
impl AsRef<OwnedFd> for FileGuard {
fn as_ref(&self) -> &OwnedFd {
// This unwrap is safe because we only create `FileGuard`s
// if we know that the file is Some.
self.slot_guard.file.as_ref().unwrap()
}
}

impl FileGuard {
// TODO: switch to tokio-epoll-uring native operations.
fn with_std_file<F, R>(&self, with: F) -> R
where
F: FnOnce(&File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut`
let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&file);
let _ = file.into_raw_fd();
res
}
// TODO: switch to tokio-epoll-uring native operations.
fn with_std_file_mut<F, R>(&mut self, with: F) -> R
where
F: FnOnce(&mut File) -> R,
{
// SAFETY:
// - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`.
// - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd
let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) };
let res = with(&mut file);
let _ = file.into_raw_fd();
res
}
}

impl tokio_epoll_uring::IoFd for FileGuard {
unsafe fn as_fd(&self) -> RawFd {
problame marked this conversation as resolved.
Show resolved Hide resolved
let owned_fd: &OwnedFd = self.as_ref();
owned_fd.as_raw_fd()
}
}

#[cfg(test)]
impl VirtualFile {
pub(crate) async fn read_blk(
Expand Down
Loading