Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions fuzz/src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
Some(b) => b[0],
None => break,
};
match v % 13 {
match v % 12 {
// Sync write
0 => {
let data_value = get_next_data_value();
Expand All @@ -96,8 +96,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
},
// Sync remove
1 => {
KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key, false)
.unwrap();
KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key).unwrap();

current_data = None;
},
Expand Down Expand Up @@ -131,10 +130,8 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
handles.push(handle);
},
// Async remove
10 | 11 => {
let lazy = v == 10;
let fut =
KVStore::remove(fs_store, primary_namespace, secondary_namespace, key, lazy);
10 => {
let fut = KVStore::remove(fs_store, primary_namespace, secondary_namespace, key);

// Already set the current_data, even though writing hasn't finished yet. This supports the call-time
// ordering semantics.
Expand All @@ -144,7 +141,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
handles.push(handle);
},
// Join tasks.
12 => {
11 => {
for handle in handles.drain(..) {
let _ = handle.await.unwrap();
}
Expand Down
8 changes: 4 additions & 4 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -736,14 +736,14 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
/// # }
/// # struct Store {}
/// # impl lightning::util::persist::KVStore for Store {
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
/// # }
/// # use core::time::Duration;
Expand Down Expand Up @@ -2135,9 +2135,9 @@ mod tests {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> lightning::io::Result<()> {
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
self.kv_store.remove(primary_namespace, secondary_namespace, key)
}

fn list(
Expand Down
1 change: 0 additions & 1 deletion lightning-liquidity/src/lsps2/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1674,7 +1674,6 @@ where
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
true,
)
.await?;
}
Expand Down
1 change: 0 additions & 1 deletion lightning-liquidity/src/lsps5/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,6 @@ where
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
&key,
true,
)
.await?;
}
Expand Down
141 changes: 68 additions & 73 deletions lightning-persister/src/fs_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ impl KVStoreSync for FilesystemStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Result<(), lightning::io::Error> {
let path = self.inner.get_checked_dest_file_path(
primary_namespace,
Expand All @@ -134,7 +134,7 @@ impl KVStoreSync for FilesystemStore {
"remove",
)?;
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
self.inner.remove_version(inner_lock_ref, path, lazy, version)
self.inner.remove_version(inner_lock_ref, path, version)
}

fn list(
Expand Down Expand Up @@ -334,81 +334,76 @@ impl FilesystemStoreInner {
}

fn remove_version(
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, lazy: bool, version: u64,
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, version: u64,
) -> lightning::io::Result<()> {
self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
if !dest_file_path.is_file() {
return Ok(());
}

if lazy {
// If we're lazy we just call remove and be done with it.
// We try our best to persist the updated metadata to ensure
// atomicity of this call.
#[cfg(not(target_os = "windows"))]
{
fs::remove_file(&dest_file_path)?;
} else {
// If we're not lazy we try our best to persist the updated metadata to ensure
// atomicity of this call.
#[cfg(not(target_os = "windows"))]
{
fs::remove_file(&dest_file_path)?;

let parent_directory = dest_file_path.parent().ok_or_else(|| {
let msg = format!(
"Could not retrieve parent directory of {}.",
dest_file_path.display()
);
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
})?;
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
// to the inode might get cached (and hence possibly lost on crash), depending on
// the target platform and file system.
//
// In order to assert we permanently removed the file in question we therefore
// call `fsync` on the parent directory on platforms that support it.
dir_file.sync_all()?;
}

#[cfg(target_os = "windows")]
{
// Since Windows `DeleteFile` API is not persisted until the last open file handle
// is dropped, and there seemingly is no reliable way to flush the directory
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
// file to be deleted to a temporary trash file and remove the latter file
// afterwards.
//
// This should be marginally better, as, according to the documentation,
// `MoveFileExW` APIs should offer stronger persistence guarantees,
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
// However, all this is partially based on assumptions and local experiments, as
// Windows API is horribly underdocumented.
let mut trash_file_path = dest_file_path.clone();
let trash_file_ext =
format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
trash_file_path.set_extension(trash_file_ext);
let parent_directory = dest_file_path.parent().ok_or_else(|| {
let msg = format!(
"Could not retrieve parent directory of {}.",
dest_file_path.display()
);
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
})?;
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
// to the inode might get cached (and hence possibly lost on crash), depending on
// the target platform and file system.
//
// In order to assert we permanently removed the file in question we therefore
// call `fsync` on the parent directory on platforms that support it.
dir_file.sync_all()?;
}

call!(unsafe {
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
path_to_windows_str(&dest_file_path).as_ptr(),
path_to_windows_str(&trash_file_path).as_ptr(),
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
#[cfg(target_os = "windows")]
{
// Since Windows `DeleteFile` API is not persisted until the last open file handle
// is dropped, and there seemingly is no reliable way to flush the directory
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
// file to be deleted to a temporary trash file and remove the latter file
// afterwards.
//
// This should be marginally better, as, according to the documentation,
// `MoveFileExW` APIs should offer stronger persistence guarantees,
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
// However, all this is partially based on assumptions and local experiments, as
// Windows API is horribly underdocumented.
let mut trash_file_path = dest_file_path.clone();
let trash_file_ext =
format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
trash_file_path.set_extension(trash_file_ext);

call!(unsafe {
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
path_to_windows_str(&dest_file_path).as_ptr(),
path_to_windows_str(&trash_file_path).as_ptr(),
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
)
})?;

{
// We fsync the trash file in hopes this will also flush the original's file
// metadata to disk.
let trash_file = fs::OpenOptions::new()
.read(true)
.write(true)
.open(&trash_file_path.clone())?;
trash_file.sync_all()?;
}
)
})?;

// We're fine if this remove would fail as the trash file will be cleaned up in
// list eventually.
fs::remove_file(trash_file_path).ok();
{
// We fsync the trash file in hopes this will also flush the original's file
// metadata to disk.
let trash_file = fs::OpenOptions::new()
.read(true)
.write(true)
.open(&trash_file_path.clone())?;
trash_file.sync_all()?;
}

// We're fine if this remove would fail as the trash file will be cleaned up in
// list eventually.
fs::remove_file(trash_file_path).ok();
}

Ok(())
Expand Down Expand Up @@ -508,7 +503,7 @@ impl KVStore for FilesystemStore {
}

fn remove(
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
let this = Arc::clone(&self.inner);
let path = match this.get_checked_dest_file_path(
Expand All @@ -523,11 +518,11 @@ impl KVStore for FilesystemStore {

let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
Box::pin(async move {
tokio::task::spawn_blocking(move || {
this.remove_version(inner_lock_ref, path, lazy, version)
})
.await
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
tokio::task::spawn_blocking(move || this.remove_version(inner_lock_ref, path, version))
.await
.unwrap_or_else(|e| {
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
})
})
}

Expand Down Expand Up @@ -772,7 +767,7 @@ mod tests {
let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1);
assert_eq!(fs_store.state_size(), 1);

let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false);
let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key);
assert_eq!(fs_store.state_size(), 1);

let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
Expand All @@ -799,7 +794,7 @@ mod tests {
assert_eq!(data2, &*read_data);

// Test remove.
async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
async_fs_store.remove(primary_namespace, secondary_namespace, key).await.unwrap();

let listed_keys =
async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions lightning-persister/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
assert_eq!(data, &*read_data);

kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();
kv_store.remove(primary_namespace, secondary_namespace, key).unwrap();

let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
assert_eq!(listed_keys.len(), 0);
Expand All @@ -57,7 +57,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
assert_eq!(data, &*read_data);

kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
kv_store.remove(&max_chars, &max_chars, &max_chars).unwrap();

let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
assert_eq!(listed_keys.len(), 0);
Expand Down
Loading
Loading