diff --git a/Cargo.lock b/Cargo.lock index 8e862bf05d0b..7efc4a0b0a93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5158,7 +5158,7 @@ dependencies = [ [[package]] name = "tokio-epoll-uring" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#2ae71cebd8f032ebfa987b713a77bc3d23aac57c" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9" dependencies = [ "futures", "once_cell", @@ -5714,7 +5714,7 @@ dependencies = [ [[package]] name = "uring-common" version = "0.1.0" -source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#2ae71cebd8f032ebfa987b713a77bc3d23aac57c" +source = "git+https://github.com/neondatabase/tokio-epoll-uring.git?branch=main#82d74064c5019b0e9a8ae1bcdc75b0345d41bba9" dependencies = [ "io-uring", "libc", diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 070a794372cb..21ee1806eb8a 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -16,7 +16,7 @@ use camino::{Utf8Path, Utf8PathBuf}; use once_cell::sync::OnceCell; use std::fs::{self, File}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; -use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd}; +use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, OwnedFd, RawFd}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; @@ -106,7 +106,7 @@ struct SlotInner { tag: u64, /// the underlying file - file: Option, + file: Option, } /// Impl of [`tokio_epoll_uring::IoBuf`] and [`tokio_epoll_uring::IoBufMut`] for [`PageWriteGuard`]. @@ -305,6 +305,10 @@ macro_rules! with_file { let $ident = $this.lock_file().await?; observe_duration!($op, $($body)*) }}; + ($this:expr, $op:expr, | mut $ident:ident | $($body:tt)*) => {{ + let mut $ident = $this.lock_file().await?; + observe_duration!($op, $($body)*) + }}; } impl VirtualFile { @@ -401,7 +405,7 @@ impl VirtualFile { std::io::Error::new(std::io::ErrorKind::Other, system) } })?; - File::from(file) + file })); // Strip all options other than read and write. @@ -428,15 +432,13 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - with_file!(self, StorageIoOperation::Fsync, |file| file - .as_ref() - .sync_all()) + with_file!(self, StorageIoOperation::Fsync, |file_guard| file_guard + .with_std_file(|std_file| std_file.sync_all())) } pub async fn metadata(&self) -> Result { - with_file!(self, StorageIoOperation::Metadata, |file| file - .as_ref() - .metadata()) + with_file!(self, StorageIoOperation::Metadata, |file_guard| file_guard + .with_std_file(|std_file| std_file.metadata())) } /// Helper function internal to `VirtualFile` that looks up the underlying File, @@ -445,7 +447,7 @@ impl VirtualFile { /// /// We are doing it via a macro as Rust doesn't support async closures that /// take on parameters with lifetimes. - async fn lock_file(&self) -> Result, Error> { + async fn lock_file(&self) -> Result { let open_files = get_open_files(); let mut handle_guard = { @@ -502,7 +504,7 @@ impl VirtualFile { std::io::Error::new(std::io::ErrorKind::Other, system) } })?; - File::from(file) + file }); // Store the File in the slot and update the handle in the VirtualFile @@ -528,9 +530,8 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = with_file!(self, StorageIoOperation::Seek, |file| file - .as_ref() - .seek(SeekFrom::End(offset)))? + self.pos = with_file!(self, StorageIoOperation::Seek, |mut file_guard| file_guard + .with_std_file_mut(|std_file| std_file.seek(SeekFrom::End(offset))))? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -656,7 +657,7 @@ impl VirtualFile { } async fn read_at0( &self, - file_guard: FileGuard<'_>, + file_guard: FileGuard, buf: B, offset: u64, ) -> (B, Result) @@ -664,19 +665,7 @@ impl VirtualFile { B: tokio_epoll_uring::BoundedBufMut + Send, { let system = tokio_epoll_uring::thread_local_system().await; - // SAFETY: when file_guard gets dropped, the raw fd becomes invalid or may get re-used - // while the io_uring operation is still executing. - // The `file_guard` could get dropped due to future cancellation-by-drop. - // We prevent this situation using the scopeguard: it will abort the process in such cases. - // Fixing this is subject of https://github.com/neondatabase/neon/pull/6101 - let owned_fd = unsafe { OwnedFd::from_raw_fd(file_guard.as_ref().as_raw_fd()) }; - let guard = scopeguard::guard(file_guard, |_| { - eprintln!("must not drop future while operation is ongoing (todo: pass file_guard to tokio_epoll_uring to aovid this)"); - std::process::abort(); - }); - let ((owned_fd, buf), res) = system.read(owned_fd, offset, buf).await; - let _ = OwnedFd::into_raw_fd(owned_fd); - let _ = scopeguard::ScopeGuard::into_inner(guard); + let ((_file_guard, buf), res) = system.read(file_guard, offset, buf).await; if let Ok(size) = res { // TODO: don't use with_label_values on hot path // https://github.com/neondatabase/neon/issues/6107 @@ -688,9 +677,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = with_file!(self, StorageIoOperation::Write, |file| file - .as_ref() - .write_at(buf, offset)); + let result = with_file!(self, StorageIoOperation::Write, |file_guard| { + file_guard.with_std_file(|std_file| std_file.write_at(buf, offset)) + }); if let Ok(size) = result { // TODO: don't use with_label_values on hot path // https://github.com/neondatabase/neon/issues/6107 @@ -702,18 +691,54 @@ impl VirtualFile { } } -struct FileGuard<'a> { - slot_guard: RwLockReadGuard<'a, SlotInner>, +struct FileGuard { + slot_guard: RwLockReadGuard<'static, SlotInner>, } -impl<'a> AsRef for FileGuard<'a> { - fn as_ref(&self) -> &File { +impl AsRef for FileGuard { + fn as_ref(&self) -> &OwnedFd { // This unwrap is safe because we only create `FileGuard`s // if we know that the file is Some. self.slot_guard.file.as_ref().unwrap() } } +impl FileGuard { + // TODO: switch to tokio-epoll-uring native operations. + fn with_std_file(&self, with: F) -> R + where + F: FnOnce(&File) -> R, + { + // SAFETY: + // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. + // - `&` usage below: `self` is `&`, hence Rust typesystem guarantees there are is no `&mut` + let file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; + let res = with(&file); + let _ = file.into_raw_fd(); + res + } + // TODO: switch to tokio-epoll-uring native operations. + fn with_std_file_mut(&mut self, with: F) -> R + where + F: FnOnce(&mut File) -> R, + { + // SAFETY: + // - lifetime of the fd: `file` doesn't outlive the OwnedFd stored in `self`. + // - &mut usage below: `self` is `&mut`, hence this call is the only task/thread that has control over the underlying fd + let mut file = unsafe { File::from_raw_fd(self.as_ref().as_raw_fd()) }; + let res = with(&mut file); + let _ = file.into_raw_fd(); + res + } +} + +impl tokio_epoll_uring::IoFd for FileGuard { + unsafe fn as_fd(&self) -> RawFd { + let owned_fd: &OwnedFd = self.as_ref(); + owned_fd.as_raw_fd() + } +} + #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk(