diff --git a/pageserver/src/virtual_file.rs b/pageserver/src/virtual_file.rs index 10bed7ca0662..e549d208b259 100644 --- a/pageserver/src/virtual_file.rs +++ b/pageserver/src/virtual_file.rs @@ -18,7 +18,8 @@ use std::fs::{self, File, OpenOptions}; use std::io::{Error, ErrorKind, Seek, SeekFrom}; use std::os::unix::fs::FileExt; use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering}; -use std::sync::{RwLock, RwLockWriteGuard}; +use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard}; +use tokio::time::Instant; use utils::fs_ext; /// @@ -111,7 +112,7 @@ impl OpenFiles { /// /// On return, we hold a lock on the slot, and its 'tag' has been updated /// recently_used has been set. It's all ready for reuse. - fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { + async fn find_victim_slot(&self) -> (SlotHandle, RwLockWriteGuard) { // // Run the clock algorithm to find a slot to replace. // @@ -143,7 +144,7 @@ impl OpenFiles { } retries += 1; } else { - slot_guard = slot.inner.write().unwrap(); + slot_guard = slot.inner.write().await; index = next; break; } @@ -250,6 +251,29 @@ impl MaybeFatalIo for std::io::Result { } } +/// Observe duration for the given storage I/O operation +/// +/// Unlike `observe_closure_duration`, this supports async, +/// where "support" means that we measure wall clock time. +macro_rules! observe_duration { + ($op:expr, $($body:tt)*) => {{ + let instant = Instant::now(); + let result = $($body)*; + let elapsed = instant.elapsed().as_secs_f64(); + STORAGE_IO_TIME_METRIC + .get($op) + .observe(elapsed); + result + }} +} + +macro_rules! with_file { + ($this:expr, $op:expr, | $ident:ident | $($body:tt)*) => {{ + let $ident = $this.lock_file().await?; + observe_duration!($op, $($body)*) + }}; +} + impl VirtualFile { /// Open a file in read-only mode. Like File::open. pub async fn open(path: &Utf8Path) -> Result { @@ -286,14 +310,12 @@ impl VirtualFile { tenant_id = "*".to_string(); timeline_id = "*".to_string(); } - let (handle, mut slot_guard) = get_open_files().find_victim_slot(); + let (handle, mut slot_guard) = get_open_files().find_victim_slot().await; // NB: there is also StorageIoOperation::OpenAfterReplace which is for the case // where our caller doesn't get to use the returned VirtualFile before its // slot gets re-used by someone else. - let file = STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Open) - .observe_closure_duration(|| open_options.open(path))?; + let file = observe_duration!(StorageIoOperation::Open, open_options.open(path))?; // Strip all options other than read and write. // @@ -366,22 +388,24 @@ impl VirtualFile { /// Call File::sync_all() on the underlying File. pub async fn sync_all(&self) -> Result<(), Error> { - self.with_file(StorageIoOperation::Fsync, |file| file.sync_all()) - .await? + with_file!(self, StorageIoOperation::Fsync, |file| file + .as_ref() + .sync_all()) } pub async fn metadata(&self) -> Result { - self.with_file(StorageIoOperation::Metadata, |file| file.metadata()) - .await? + with_file!(self, StorageIoOperation::Metadata, |file| file + .as_ref() + .metadata()) } - /// Helper function that looks up the underlying File for this VirtualFile, - /// opening it and evicting some other File if necessary. It calls 'func' - /// with the physical File. - async fn with_file(&self, op: StorageIoOperation, mut func: F) -> Result - where - F: FnMut(&File) -> R, - { + /// Helper function internal to `VirtualFile` that looks up the underlying File, + /// opens it and evicts some other File if necessary. The passed parameter is + /// assumed to be a function available for the physical `File`. + /// + /// We are doing it via a macro as Rust doesn't support async closures that + /// take on parameters with lifetimes. + async fn lock_file(&self) -> Result, Error> { let open_files = get_open_files(); let mut handle_guard = { @@ -391,27 +415,23 @@ impl VirtualFile { // We only need to hold the handle lock while we read the current handle. If // another thread closes the file and recycles the slot for a different file, // we will notice that the handle we read is no longer valid and retry. - let mut handle = *self.handle.read().unwrap(); + let mut handle = *self.handle.read().await; loop { // Check if the slot contains our File { let slot = &open_files.slots[handle.index]; - let slot_guard = slot.inner.read().unwrap(); - if slot_guard.tag == handle.tag { - if let Some(file) = &slot_guard.file { - // Found a cached file descriptor. - slot.recently_used.store(true, Ordering::Relaxed); - return Ok(STORAGE_IO_TIME_METRIC - .get(op) - .observe_closure_duration(|| func(file))); - } + let slot_guard = slot.inner.read().await; + if slot_guard.tag == handle.tag && slot_guard.file.is_some() { + // Found a cached file descriptor. + slot.recently_used.store(true, Ordering::Relaxed); + return Ok(FileGuard { slot_guard }); } } // The slot didn't contain our File. We will have to open it ourselves, // but before that, grab a write lock on handle in the VirtualFile, so // that no other thread will try to concurrently open the same file. - let handle_guard = self.handle.write().unwrap(); + let handle_guard = self.handle.write().await; // If another thread changed the handle while we were not holding the lock, // then the handle might now be valid again. Loop back to retry. @@ -425,20 +445,16 @@ impl VirtualFile { // We need to open the file ourselves. The handle in the VirtualFile is // now locked in write-mode. Find a free slot to put it in. - let (handle, mut slot_guard) = open_files.find_victim_slot(); + let (handle, mut slot_guard) = open_files.find_victim_slot().await; // Re-open the physical file. // NB: we use StorageIoOperation::OpenAferReplace for this to distinguish this // case from StorageIoOperation::Open. This helps with identifying thrashing // of the virtual file descriptor cache. - let file = STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::OpenAfterReplace) - .observe_closure_duration(|| self.open_options.open(&self.path))?; - - // Perform the requested operation on it - let result = STORAGE_IO_TIME_METRIC - .get(op) - .observe_closure_duration(|| func(&file)); + let file = observe_duration!( + StorageIoOperation::OpenAfterReplace, + self.open_options.open(&self.path) + )?; // Store the File in the slot and update the handle in the VirtualFile // to point to it. @@ -446,7 +462,9 @@ impl VirtualFile { *handle_guard = handle; - Ok(result) + return Ok(FileGuard { + slot_guard: slot_guard.downgrade(), + }); } pub fn remove(self) { @@ -461,11 +479,9 @@ impl VirtualFile { self.pos = offset; } SeekFrom::End(offset) => { - self.pos = self - .with_file(StorageIoOperation::Seek, |mut file| { - file.seek(SeekFrom::End(offset)) - }) - .await?? + self.pos = with_file!(self, StorageIoOperation::Seek, |file| file + .as_ref() + .seek(SeekFrom::End(offset)))? } SeekFrom::Current(offset) => { let pos = self.pos as i128 + offset as i128; @@ -553,9 +569,9 @@ impl VirtualFile { } pub async fn read_at(&self, buf: &mut [u8], offset: u64) -> Result { - let result = self - .with_file(StorageIoOperation::Read, |file| file.read_at(buf, offset)) - .await?; + let result = with_file!(self, StorageIoOperation::Read, |file| file + .as_ref() + .read_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["read", &self.tenant_id, &self.timeline_id]) @@ -565,9 +581,9 @@ impl VirtualFile { } async fn write_at(&self, buf: &[u8], offset: u64) -> Result { - let result = self - .with_file(StorageIoOperation::Write, |file| file.write_at(buf, offset)) - .await?; + let result = with_file!(self, StorageIoOperation::Write, |file| file + .as_ref() + .write_at(buf, offset)); if let Ok(size) = result { STORAGE_IO_SIZE .with_label_values(&["write", &self.tenant_id, &self.timeline_id]) @@ -577,6 +593,18 @@ impl VirtualFile { } } +struct FileGuard<'a> { + slot_guard: RwLockReadGuard<'a, SlotInner>, +} + +impl<'a> AsRef for FileGuard<'a> { + fn as_ref(&self) -> &File { + // This unwrap is safe because we only create `FileGuard`s + // if we know that the file is Some. + self.slot_guard.file.as_ref().unwrap() + } +} + #[cfg(test)] impl VirtualFile { pub(crate) async fn read_blk( @@ -609,22 +637,41 @@ impl VirtualFile { impl Drop for VirtualFile { /// If a VirtualFile is dropped, close the underlying file if it was open. fn drop(&mut self) { - let handle = self.handle.get_mut().unwrap(); - - // We could check with a read-lock first, to avoid waiting on an - // unrelated I/O. - let slot = &get_open_files().slots[handle.index]; - let mut slot_guard = slot.inner.write().unwrap(); - if slot_guard.tag == handle.tag { - slot.recently_used.store(false, Ordering::Relaxed); - // there is also operation "close-by-replace" for closes done on eviction for - // comparison. - if let Some(fd) = slot_guard.file.take() { - STORAGE_IO_TIME_METRIC - .get(StorageIoOperation::Close) - .observe_closure_duration(|| drop(fd)); + let handle = self.handle.get_mut(); + + fn clean_slot(slot: &Slot, mut slot_guard: RwLockWriteGuard<'_, SlotInner>, tag: u64) { + if slot_guard.tag == tag { + slot.recently_used.store(false, Ordering::Relaxed); + // there is also operation "close-by-replace" for closes done on eviction for + // comparison. + if let Some(fd) = slot_guard.file.take() { + STORAGE_IO_TIME_METRIC + .get(StorageIoOperation::Close) + .observe_closure_duration(|| drop(fd)); + } } } + + // We don't have async drop so we cannot directly await the lock here. + // Instead, first do a best-effort attempt at closing the underlying + // file descriptor by using `try_write`, and if that fails, spawn + // a tokio task to do it asynchronously: we just want it to be + // cleaned up eventually. + // Most of the time, the `try_lock` should succeed though, + // as we have `&mut self` access. In other words, if the slot + // is still occupied by our file, there should be no access from + // other I/O operations; the only other possible place to lock + // the slot is the lock algorithm looking for free slots. + let slot = &get_open_files().slots[handle.index]; + if let Ok(slot_guard) = slot.inner.try_write() { + clean_slot(slot, slot_guard, handle.tag); + } else { + let tag = handle.tag; + tokio::spawn(async move { + let slot_guard = slot.inner.write().await; + clean_slot(slot, slot_guard, tag); + }); + }; } }