diff --git a/fuzz/src/fs_store.rs b/fuzz/src/fs_store.rs index 821439f390e..0b6e2050bcf 100644 --- a/fuzz/src/fs_store.rs +++ b/fuzz/src/fs_store.rs @@ -78,7 +78,7 @@ async fn do_test_internal(data: &[u8], _out: Out) { Some(b) => b[0], None => break, }; - match v % 13 { + match v % 12 { // Sync write 0 => { let data_value = get_next_data_value(); @@ -96,8 +96,7 @@ async fn do_test_internal(data: &[u8], _out: Out) { }, // Sync remove 1 => { - KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key, false) - .unwrap(); + KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key).unwrap(); current_data = None; }, @@ -131,10 +130,8 @@ async fn do_test_internal(data: &[u8], _out: Out) { handles.push(handle); }, // Async remove - 10 | 11 => { - let lazy = v == 10; - let fut = - KVStore::remove(fs_store, primary_namespace, secondary_namespace, key, lazy); + 10 => { + let fut = KVStore::remove(fs_store, primary_namespace, secondary_namespace, key); // Already set the current_data, even though writing hasn't finished yet. This supports the call-time // ordering semantics. @@ -144,7 +141,7 @@ async fn do_test_internal(data: &[u8], _out: Out) { handles.push(handle); }, // Join tasks. - 12 => { + 11 => { for handle in handles.drain(..) { let _ = handle.await.unwrap(); } diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 44ce52b8291..2cd17cfdf33 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -736,14 +736,14 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp /// # impl lightning::util::persist::KVStoreSync for StoreSync { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result> { Ok(Vec::new()) } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> io::Result<()> { Ok(()) } -/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<()> { Ok(()) } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { Ok(Vec::new()) } /// # } /// # struct Store {} /// # impl lightning::util::persist::KVStore for Store { /// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin, io::Error>> + 'static + Send>> { todo!() } /// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec) -> Pin> + 'static + Send>> { todo!() } -/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin> + 'static + Send>> { todo!() } +/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin> + 'static + Send>> { todo!() } /// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin, io::Error>> + 'static + Send>> { todo!() } /// # } /// # use core::time::Duration; @@ -2135,9 +2135,9 @@ mod tests { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> lightning::io::Result<()> { - self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy) + self.kv_store.remove(primary_namespace, secondary_namespace, key) } fn list( diff --git a/lightning-liquidity/src/lsps2/service.rs b/lightning-liquidity/src/lsps2/service.rs index c8ad1288f6d..2d727acf16e 100644 --- a/lightning-liquidity/src/lsps2/service.rs +++ b/lightning-liquidity/src/lsps2/service.rs @@ -1674,7 +1674,6 @@ where LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, &key, - true, ) .await?; } diff --git a/lightning-liquidity/src/lsps5/service.rs b/lightning-liquidity/src/lsps5/service.rs index 7ac24ed38f2..4439130a056 100644 --- a/lightning-liquidity/src/lsps5/service.rs +++ b/lightning-liquidity/src/lsps5/service.rs @@ -272,7 +272,6 @@ where LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE, &key, - true, ) .await?; } diff --git a/lightning-persister/src/fs_store.rs b/lightning-persister/src/fs_store.rs index 9b15398d4d1..7055f2aa9f9 100644 --- a/lightning-persister/src/fs_store.rs +++ b/lightning-persister/src/fs_store.rs @@ -125,7 +125,7 @@ impl KVStoreSync for FilesystemStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result<(), lightning::io::Error> { let path = self.inner.get_checked_dest_file_path( primary_namespace, @@ -134,7 +134,7 @@ impl KVStoreSync for FilesystemStore { "remove", )?; let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); - self.inner.remove_version(inner_lock_ref, path, lazy, version) + self.inner.remove_version(inner_lock_ref, path, version) } fn list( @@ -334,81 +334,76 @@ impl FilesystemStoreInner { } fn remove_version( - &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, lazy: bool, version: u64, + &self, inner_lock_ref: Arc>, dest_file_path: PathBuf, version: u64, ) -> lightning::io::Result<()> { self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || { if !dest_file_path.is_file() { return Ok(()); } - if lazy { - // If we're lazy we just call remove and be done with it. + // We try our best to persist the updated metadata to ensure + // atomicity of this call. + #[cfg(not(target_os = "windows"))] + { fs::remove_file(&dest_file_path)?; - } else { - // If we're not lazy we try our best to persist the updated metadata to ensure - // atomicity of this call. - #[cfg(not(target_os = "windows"))] - { - fs::remove_file(&dest_file_path)?; - let parent_directory = dest_file_path.parent().ok_or_else(|| { - let msg = format!( - "Could not retrieve parent directory of {}.", - dest_file_path.display() - ); - std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) - })?; - let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; - // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes - // to the inode might get cached (and hence possibly lost on crash), depending on - // the target platform and file system. - // - // In order to assert we permanently removed the file in question we therefore - // call `fsync` on the parent directory on platforms that support it. - dir_file.sync_all()?; - } - - #[cfg(target_os = "windows")] - { - // Since Windows `DeleteFile` API is not persisted until the last open file handle - // is dropped, and there seemingly is no reliable way to flush the directory - // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the - // file to be deleted to a temporary trash file and remove the latter file - // afterwards. - // - // This should be marginally better, as, according to the documentation, - // `MoveFileExW` APIs should offer stronger persistence guarantees, - // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set. - // However, all this is partially based on assumptions and local experiments, as - // Windows API is horribly underdocumented. - let mut trash_file_path = dest_file_path.clone(); - let trash_file_ext = - format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); - trash_file_path.set_extension(trash_file_ext); + let parent_directory = dest_file_path.parent().ok_or_else(|| { + let msg = format!( + "Could not retrieve parent directory of {}.", + dest_file_path.display() + ); + std::io::Error::new(std::io::ErrorKind::InvalidInput, msg) + })?; + let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?; + // The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes + // to the inode might get cached (and hence possibly lost on crash), depending on + // the target platform and file system. + // + // In order to assert we permanently removed the file in question we therefore + // call `fsync` on the parent directory on platforms that support it. + dir_file.sync_all()?; + } - call!(unsafe { - windows_sys::Win32::Storage::FileSystem::MoveFileExW( - path_to_windows_str(&dest_file_path).as_ptr(), - path_to_windows_str(&trash_file_path).as_ptr(), - windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH + #[cfg(target_os = "windows")] + { + // Since Windows `DeleteFile` API is not persisted until the last open file handle + // is dropped, and there seemingly is no reliable way to flush the directory + // metadata, we here fall back to use a 'recycling bin' model, i.e., first move the + // file to be deleted to a temporary trash file and remove the latter file + // afterwards. + // + // This should be marginally better, as, according to the documentation, + // `MoveFileExW` APIs should offer stronger persistence guarantees, + // at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set. + // However, all this is partially based on assumptions and local experiments, as + // Windows API is horribly underdocumented. + let mut trash_file_path = dest_file_path.clone(); + let trash_file_ext = + format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel)); + trash_file_path.set_extension(trash_file_ext); + + call!(unsafe { + windows_sys::Win32::Storage::FileSystem::MoveFileExW( + path_to_windows_str(&dest_file_path).as_ptr(), + path_to_windows_str(&trash_file_path).as_ptr(), + windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH | windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING, - ) - })?; - - { - // We fsync the trash file in hopes this will also flush the original's file - // metadata to disk. - let trash_file = fs::OpenOptions::new() - .read(true) - .write(true) - .open(&trash_file_path.clone())?; - trash_file.sync_all()?; - } + ) + })?; - // We're fine if this remove would fail as the trash file will be cleaned up in - // list eventually. - fs::remove_file(trash_file_path).ok(); + { + // We fsync the trash file in hopes this will also flush the original's file + // metadata to disk. + let trash_file = fs::OpenOptions::new() + .read(true) + .write(true) + .open(&trash_file_path.clone())?; + trash_file.sync_all()?; } + + // We're fine if this remove would fail as the trash file will be cleaned up in + // list eventually. + fs::remove_file(trash_file_path).ok(); } Ok(()) @@ -508,7 +503,7 @@ impl KVStore for FilesystemStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin> + 'static + Send>> { let this = Arc::clone(&self.inner); let path = match this.get_checked_dest_file_path( @@ -523,11 +518,11 @@ impl KVStore for FilesystemStore { let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone()); Box::pin(async move { - tokio::task::spawn_blocking(move || { - this.remove_version(inner_lock_ref, path, lazy, version) - }) - .await - .unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))) + tokio::task::spawn_blocking(move || this.remove_version(inner_lock_ref, path, version)) + .await + .unwrap_or_else(|e| { + Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)) + }) }) } @@ -772,7 +767,7 @@ mod tests { let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1); assert_eq!(fs_store.state_size(), 1); - let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false); + let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key); assert_eq!(fs_store.state_size(), 1); let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone()); @@ -799,7 +794,7 @@ mod tests { assert_eq!(data2, &*read_data); // Test remove. - async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap(); + async_fs_store.remove(primary_namespace, secondary_namespace, key).await.unwrap(); let listed_keys = async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap(); diff --git a/lightning-persister/src/test_utils.rs b/lightning-persister/src/test_utils.rs index 636967a6937..0ef0242c419 100644 --- a/lightning-persister/src/test_utils.rs +++ b/lightning-persister/src/test_utils.rs @@ -40,7 +40,7 @@ pub(crate) fn do_read_write_remove_list_persist( let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap(); assert_eq!(data, &*read_data); - kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap(); + kv_store.remove(primary_namespace, secondary_namespace, key).unwrap(); let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap(); assert_eq!(listed_keys.len(), 0); @@ -57,7 +57,7 @@ pub(crate) fn do_read_write_remove_list_persist( let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap(); assert_eq!(data, &*read_data); - kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap(); + kv_store.remove(&max_chars, &max_chars, &max_chars).unwrap(); let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap(); assert_eq!(listed_keys.len(), 0); diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 9036a27f49c..f4b0248e81d 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -129,7 +129,7 @@ pub trait KVStoreSync { ) -> Result<(), io::Error>; /// A synchronous version of the [`KVStore::remove`] method. fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Result<(), io::Error>; /// A synchronous version of the [`KVStore::list`] method. fn list( @@ -174,9 +174,9 @@ where } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin> + 'static + Send>> { - let res = self.0.remove(primary_namespace, secondary_namespace, key, lazy); + let res = self.0.remove(primary_namespace, secondary_namespace, key); Box::pin(async move { res }) } @@ -244,21 +244,11 @@ pub trait KVStore { ) -> Pin> + 'static + Send>>; /// Removes any data that had previously been persisted under the given `key`. /// - /// If the `lazy` flag is set to `true`, the backend implementation might choose to lazily - /// remove the given `key` at some point in time after the method returns, e.g., as part of an - /// eventual batch deletion of multiple keys. As a consequence, subsequent calls to - /// [`KVStoreSync::list`] might include the removed key until the changes are actually persisted. - /// - /// Note that while setting the `lazy` flag reduces the I/O burden of multiple subsequent - /// `remove` calls, it also influences the atomicity guarantees as lazy `remove`s could - /// potentially get lost on crash after the method returns. Therefore, this flag should only be - /// set for `remove` operations that can be safely replayed at a later time. - /// /// Returns successfully if no data will be stored for the given `primary_namespace`, /// `secondary_namespace`, and `key`, independently of whether it was present before its /// invokation or not. fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin> + 'static + Send>>; /// Returns a list of keys that are stored under the given `secondary_namespace` in /// `primary_namespace`. @@ -362,7 +352,6 @@ impl Persist(future: F) -> F::Output { /// /// Stale updates are pruned when the consolidation threshold is reached according to `maximum_pending_updates`. /// Monitor updates in the range between the latest `update_id` and `update_id - maximum_pending_updates` -/// are deleted. -/// The `lazy` flag is used on the [`KVStoreSync::remove`] method, so there are no guarantees that the deletions -/// will complete. However, stale updates are not a problem for data integrity, since updates are -/// only read that are higher than the stored [`ChannelMonitor`]'s `update_id`. -/// -/// If you have many stale updates stored (such as after a crash with pending lazy deletes), and -/// would like to get rid of them, consider using the +/// are deleted. If you have many stale updates stored and would like to get rid of them, consider using the /// [`MonitorUpdatingPersister::cleanup_stale_updates`] function. pub struct MonitorUpdatingPersister( MonitorUpdatingPersisterAsync, PanicingSpawner, L, ES, SP, BI, FE>, @@ -620,10 +603,9 @@ where /// /// This function works by first listing all monitors, and then for each of them, listing all /// updates. The updates that have an `update_id` less than or equal to than the stored monitor - /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will - /// be passed to [`KVStoreSync::remove`]. - pub fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { - poll_sync_future(self.0.cleanup_stale_updates(lazy)) + /// are deleted. + pub fn cleanup_stale_updates(&self) -> Result<(), io::Error> { + poll_sync_future(self.0.cleanup_stale_updates()) } } @@ -837,10 +819,9 @@ where /// /// This function works by first listing all monitors, and then for each of them, listing all /// updates. The updates that have an `update_id` less than or equal to than the stored monitor - /// are deleted. The deletion can either be lazy or non-lazy based on the `lazy` flag; this will - /// be passed to [`KVStoreSync::remove`]. - pub async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { - self.0.cleanup_stale_updates(lazy).await + /// are deleted. + pub async fn cleanup_stale_updates(&self) -> Result<(), io::Error> { + self.0.cleanup_stale_updates().await } } @@ -1039,7 +1020,7 @@ where }) } - async fn cleanup_stale_updates(&self, lazy: bool) -> Result<(), io::Error> { + async fn cleanup_stale_updates(&self) -> Result<(), io::Error> { let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; let monitor_keys = self.kv_store.list(primary, secondary).await?; @@ -1047,13 +1028,13 @@ where let monitor_name = MonitorName::from_str(&monitor_key)?; let (_, current_monitor) = self.read_monitor(&monitor_name, &monitor_key).await?; let latest_update_id = current_monitor.get_latest_update_id(); - self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id, lazy).await?; + self.cleanup_stale_updates_for_monitor_to(&monitor_key, latest_update_id).await?; } Ok(()) } async fn cleanup_stale_updates_for_monitor_to( - &self, monitor_key: &str, latest_update_id: u64, lazy: bool, + &self, monitor_key: &str, latest_update_id: u64, ) -> Result<(), io::Error> { let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; let updates = self.kv_store.list(primary, monitor_key).await?; @@ -1061,7 +1042,7 @@ where let update_name = UpdateName::new(update)?; // if the update_id is lower than the stored monitor, delete if update_name.0 <= latest_update_id { - self.kv_store.remove(primary, monitor_key, update_name.as_str(), lazy).await?; + self.kv_store.remove(primary, monitor_key, update_name.as_str()).await?; } } Ok(()) @@ -1137,7 +1118,6 @@ where self.cleanup_stale_updates_for_monitor_to( &monitor_key, latest_update_id, - true, ) .await?; } else { @@ -1188,7 +1168,7 @@ where }; let primary = CHANNEL_MONITOR_PERSISTENCE_PRIMARY_NAMESPACE; let secondary = CHANNEL_MONITOR_PERSISTENCE_SECONDARY_NAMESPACE; - let _ = self.kv_store.remove(primary, secondary, &monitor_key, true).await; + let _ = self.kv_store.remove(primary, secondary, &monitor_key).await; } // Cleans up monitor updates for given monitor in range `start..=end`. @@ -1197,7 +1177,7 @@ where for update_id in start..=end { let update_name = UpdateName::from(update_id); let primary = CHANNEL_MONITOR_UPDATE_PERSISTENCE_PRIMARY_NAMESPACE; - let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str(), true).await; + let res = self.kv_store.remove(primary, &monitor_key, update_name.as_str()).await; if let Err(e) = res { log_error!( self.logger, @@ -1786,7 +1766,7 @@ mod tests { .unwrap(); // Do the stale update cleanup - persister_0.cleanup_stale_updates(false).unwrap(); + persister_0.cleanup_stale_updates().unwrap(); // Confirm the stale update is unreadable/gone assert!(KVStoreSync::read( diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index deeb3a38e9f..aacc38e366a 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -965,7 +965,7 @@ impl TestStore { } fn remove_internal( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, _lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result<()> { if self.read_only { return Err(io::Error::new( @@ -1029,9 +1029,9 @@ impl KVStore for TestStore { Box::pin(OneShotChannel(future)) } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> Pin> + 'static + Send>> { - let res = self.remove_internal(&primary_namespace, &secondary_namespace, &key, lazy); + let res = self.remove_internal(&primary_namespace, &secondary_namespace, &key); Box::pin(async move { res }) } fn list( @@ -1079,9 +1079,9 @@ impl KVStoreSync for TestStore { } fn remove( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool, + &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result<()> { - self.remove_internal(primary_namespace, secondary_namespace, key, lazy) + self.remove_internal(primary_namespace, secondary_namespace, key) } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> {