Skip to content

Commit d53d6b4

Browse files
authored
Merge pull request #4189 from TheBlueMatt/2025-10-lazy-again
Restore `lazy` flag to `KVStore::remove`
2 parents 28ef147 + 0f9548b commit d53d6b4

File tree

8 files changed

+138
-102
lines changed

8 files changed

+138
-102
lines changed

fuzz/src/fs_store.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
7878
Some(b) => b[0],
7979
None => break,
8080
};
81-
match v % 12 {
81+
match v % 13 {
8282
// Sync write
8383
0 => {
8484
let data_value = get_next_data_value();
@@ -96,7 +96,8 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
9696
},
9797
// Sync remove
9898
1 => {
99-
KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key).unwrap();
99+
KVStoreSync::remove(fs_store, primary_namespace, secondary_namespace, key, false)
100+
.unwrap();
100101

101102
current_data = None;
102103
},
@@ -130,8 +131,10 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
130131
handles.push(handle);
131132
},
132133
// Async remove
133-
10 => {
134-
let fut = KVStore::remove(fs_store, primary_namespace, secondary_namespace, key);
134+
10 | 11 => {
135+
let lazy = v == 10;
136+
let fut =
137+
KVStore::remove(fs_store, primary_namespace, secondary_namespace, key, lazy);
135138

136139
// Already set the current_data, even though writing hasn't finished yet. This supports the call-time
137140
// ordering semantics.
@@ -141,7 +144,7 @@ async fn do_test_internal<Out: test_logger::Output>(data: &[u8], _out: Out) {
141144
handles.push(handle);
142145
},
143146
// Join tasks.
144-
11 => {
147+
12 => {
145148
for handle in handles.drain(..) {
146149
let _ = handle.await.unwrap();
147150
}

lightning-background-processor/src/lib.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -748,14 +748,14 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
748748
/// # impl lightning::util::persist::KVStoreSync for StoreSync {
749749
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<Vec<u8>> { Ok(Vec::new()) }
750750
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> io::Result<()> { Ok(()) }
751-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> io::Result<()> { Ok(()) }
751+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> io::Result<()> { Ok(()) }
752752
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> { Ok(Vec::new()) }
753753
/// # }
754754
/// # struct Store {}
755755
/// # impl lightning::util::persist::KVStore for Store {
756756
/// # fn read(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<Vec<u8>, io::Error>> + 'static + Send>> { todo!() }
757757
/// # fn write(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec<u8>) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
758-
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
758+
/// # fn remove(&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool) -> Pin<Box<dyn Future<Output = Result<(), io::Error>> + 'static + Send>> { todo!() }
759759
/// # fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> Pin<Box<dyn Future<Output = Result<Vec<String>, io::Error>> + 'static + Send>> { todo!() }
760760
/// # }
761761
/// # use core::time::Duration;
@@ -2144,9 +2144,9 @@ mod tests {
21442144
}
21452145

21462146
fn remove(
2147-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
2147+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
21482148
) -> lightning::io::Result<()> {
2149-
self.kv_store.remove(primary_namespace, secondary_namespace, key)
2149+
self.kv_store.remove(primary_namespace, secondary_namespace, key, lazy)
21502150
}
21512151

21522152
fn list(

lightning-liquidity/src/lsps2/service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1838,6 +1838,7 @@ where
18381838
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
18391839
LSPS2_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
18401840
&key,
1841+
true,
18411842
));
18421843
} else {
18431844
// If the peer got new state, force a re-persist of the current state.

lightning-liquidity/src/lsps5/service.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ where
297297
LIQUIDITY_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
298298
LSPS5_SERVICE_PERSISTENCE_SECONDARY_NAMESPACE,
299299
&key,
300+
true,
300301
));
301302
} else {
302303
// If the peer was re-added, force a re-persist of the current state.

lightning-persister/src/fs_store.rs

Lines changed: 73 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ impl KVStoreSync for FilesystemStore {
125125
}
126126

127127
fn remove(
128-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
128+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
129129
) -> Result<(), lightning::io::Error> {
130130
let path = self.inner.get_checked_dest_file_path(
131131
primary_namespace,
@@ -134,7 +134,7 @@ impl KVStoreSync for FilesystemStore {
134134
"remove",
135135
)?;
136136
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
137-
self.inner.remove_version(inner_lock_ref, path, version)
137+
self.inner.remove_version(inner_lock_ref, path, lazy, version)
138138
}
139139

140140
fn list(
@@ -334,76 +334,81 @@ impl FilesystemStoreInner {
334334
}
335335

336336
fn remove_version(
337-
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, version: u64,
337+
&self, inner_lock_ref: Arc<RwLock<u64>>, dest_file_path: PathBuf, lazy: bool, version: u64,
338338
) -> lightning::io::Result<()> {
339339
self.execute_locked_write(inner_lock_ref, dest_file_path.clone(), version, || {
340340
if !dest_file_path.is_file() {
341341
return Ok(());
342342
}
343343

344-
// We try our best to persist the updated metadata to ensure
345-
// atomicity of this call.
346-
#[cfg(not(target_os = "windows"))]
347-
{
344+
if lazy {
345+
// If we're lazy we just call remove and be done with it.
348346
fs::remove_file(&dest_file_path)?;
347+
} else {
348+
// If we're not lazy we try our best to persist the updated metadata to ensure
349+
// atomicity of this call.
350+
#[cfg(not(target_os = "windows"))]
351+
{
352+
fs::remove_file(&dest_file_path)?;
349353

350-
let parent_directory = dest_file_path.parent().ok_or_else(|| {
351-
let msg = format!(
352-
"Could not retrieve parent directory of {}.",
353-
dest_file_path.display()
354-
);
355-
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
356-
})?;
357-
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
358-
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
359-
// to the inode might get cached (and hence possibly lost on crash), depending on
360-
// the target platform and file system.
361-
//
362-
// In order to assert we permanently removed the file in question we therefore
363-
// call `fsync` on the parent directory on platforms that support it.
364-
dir_file.sync_all()?;
365-
}
354+
let parent_directory = dest_file_path.parent().ok_or_else(|| {
355+
let msg = format!(
356+
"Could not retrieve parent directory of {}.",
357+
dest_file_path.display()
358+
);
359+
std::io::Error::new(std::io::ErrorKind::InvalidInput, msg)
360+
})?;
361+
let dir_file = fs::OpenOptions::new().read(true).open(parent_directory)?;
362+
// The above call to `fs::remove_file` corresponds to POSIX `unlink`, whose changes
363+
// to the inode might get cached (and hence possibly lost on crash), depending on
364+
// the target platform and file system.
365+
//
366+
// In order to assert we permanently removed the file in question we therefore
367+
// call `fsync` on the parent directory on platforms that support it.
368+
dir_file.sync_all()?;
369+
}
366370

367-
#[cfg(target_os = "windows")]
368-
{
369-
// Since Windows `DeleteFile` API is not persisted until the last open file handle
370-
// is dropped, and there seemingly is no reliable way to flush the directory
371-
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
372-
// file to be deleted to a temporary trash file and remove the latter file
373-
// afterwards.
374-
//
375-
// This should be marginally better, as, according to the documentation,
376-
// `MoveFileExW` APIs should offer stronger persistence guarantees,
377-
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
378-
// However, all this is partially based on assumptions and local experiments, as
379-
// Windows API is horribly underdocumented.
380-
let mut trash_file_path = dest_file_path.clone();
381-
let trash_file_ext =
382-
format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
383-
trash_file_path.set_extension(trash_file_ext);
384-
385-
call!(unsafe {
386-
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
387-
path_to_windows_str(&dest_file_path).as_ptr(),
388-
path_to_windows_str(&trash_file_path).as_ptr(),
389-
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
371+
#[cfg(target_os = "windows")]
372+
{
373+
// Since Windows `DeleteFile` API is not persisted until the last open file handle
374+
// is dropped, and there seemingly is no reliable way to flush the directory
375+
// metadata, we here fall back to use a 'recycling bin' model, i.e., first move the
376+
// file to be deleted to a temporary trash file and remove the latter file
377+
// afterwards.
378+
//
379+
// This should be marginally better, as, according to the documentation,
380+
// `MoveFileExW` APIs should offer stronger persistence guarantees,
381+
// at least if `MOVEFILE_WRITE_THROUGH`/`MOVEFILE_REPLACE_EXISTING` is set.
382+
// However, all this is partially based on assumptions and local experiments, as
383+
// Windows API is horribly underdocumented.
384+
let mut trash_file_path = dest_file_path.clone();
385+
let trash_file_ext =
386+
format!("{}.trash", self.tmp_file_counter.fetch_add(1, Ordering::AcqRel));
387+
trash_file_path.set_extension(trash_file_ext);
388+
389+
call!(unsafe {
390+
windows_sys::Win32::Storage::FileSystem::MoveFileExW(
391+
path_to_windows_str(&dest_file_path).as_ptr(),
392+
path_to_windows_str(&trash_file_path).as_ptr(),
393+
windows_sys::Win32::Storage::FileSystem::MOVEFILE_WRITE_THROUGH
390394
| windows_sys::Win32::Storage::FileSystem::MOVEFILE_REPLACE_EXISTING,
391-
)
392-
})?;
395+
)
396+
})?;
397+
398+
{
399+
// We fsync the trash file in hopes this will also flush the original's file
400+
// metadata to disk.
401+
let trash_file = fs::OpenOptions::new()
402+
.read(true)
403+
.write(true)
404+
.open(&trash_file_path.clone())?;
405+
trash_file.sync_all()?;
406+
}
393407

394-
{
395-
// We fsync the trash file in hopes this will also flush the original's file
396-
// metadata to disk.
397-
let trash_file = fs::OpenOptions::new()
398-
.read(true)
399-
.write(true)
400-
.open(&trash_file_path.clone())?;
401-
trash_file.sync_all()?;
408+
// We're fine if this remove would fail as the trash file will be cleaned up in
409+
// list eventually.
410+
fs::remove_file(trash_file_path).ok();
402411
}
403-
404-
// We're fine if this remove would fail as the trash file will be cleaned up in
405-
// list eventually.
406-
fs::remove_file(trash_file_path).ok();
407412
}
408413

409414
Ok(())
@@ -503,7 +508,7 @@ impl KVStore for FilesystemStore {
503508
}
504509

505510
fn remove(
506-
&self, primary_namespace: &str, secondary_namespace: &str, key: &str,
511+
&self, primary_namespace: &str, secondary_namespace: &str, key: &str, lazy: bool,
507512
) -> Pin<Box<dyn Future<Output = Result<(), lightning::io::Error>> + 'static + Send>> {
508513
let this = Arc::clone(&self.inner);
509514
let path = match this.get_checked_dest_file_path(
@@ -518,11 +523,11 @@ impl KVStore for FilesystemStore {
518523

519524
let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(path.clone());
520525
Box::pin(async move {
521-
tokio::task::spawn_blocking(move || this.remove_version(inner_lock_ref, path, version))
522-
.await
523-
.unwrap_or_else(|e| {
524-
Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e))
525-
})
526+
tokio::task::spawn_blocking(move || {
527+
this.remove_version(inner_lock_ref, path, lazy, version)
528+
})
529+
.await
530+
.unwrap_or_else(|e| Err(lightning::io::Error::new(lightning::io::ErrorKind::Other, e)))
526531
})
527532
}
528533

@@ -767,7 +772,7 @@ mod tests {
767772
let fut1 = async_fs_store.write(primary_namespace, secondary_namespace, key, data1);
768773
assert_eq!(fs_store.state_size(), 1);
769774

770-
let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key);
775+
let fut2 = async_fs_store.remove(primary_namespace, secondary_namespace, key, false);
771776
assert_eq!(fs_store.state_size(), 1);
772777

773778
let fut3 = async_fs_store.write(primary_namespace, secondary_namespace, key, data2.clone());
@@ -794,7 +799,7 @@ mod tests {
794799
assert_eq!(data2, &*read_data);
795800

796801
// Test remove.
797-
async_fs_store.remove(primary_namespace, secondary_namespace, key).await.unwrap();
802+
async_fs_store.remove(primary_namespace, secondary_namespace, key, false).await.unwrap();
798803

799804
let listed_keys =
800805
async_fs_store.list(primary_namespace, secondary_namespace).await.unwrap();

lightning-persister/src/test_utils.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
4040
let read_data = kv_store.read(primary_namespace, secondary_namespace, key).unwrap();
4141
assert_eq!(data, &*read_data);
4242

43-
kv_store.remove(primary_namespace, secondary_namespace, key).unwrap();
43+
kv_store.remove(primary_namespace, secondary_namespace, key, false).unwrap();
4444

4545
let listed_keys = kv_store.list(primary_namespace, secondary_namespace).unwrap();
4646
assert_eq!(listed_keys.len(), 0);
@@ -57,7 +57,7 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
5757
let read_data = kv_store.read(&max_chars, &max_chars, &max_chars).unwrap();
5858
assert_eq!(data, &*read_data);
5959

60-
kv_store.remove(&max_chars, &max_chars, &max_chars).unwrap();
60+
kv_store.remove(&max_chars, &max_chars, &max_chars, false).unwrap();
6161

6262
let listed_keys = kv_store.list(&max_chars, &max_chars).unwrap();
6363
assert_eq!(listed_keys.len(), 0);

0 commit comments

Comments
 (0)