From 89aff7e93c3c2db51532ad1d31b9bd5ed53736e5 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 2 Dec 2025 08:34:12 +0100 Subject: [PATCH] Revert batched VSS `lazy` deletes, rather `spawn` them into the background Previously, we implemented `lazy` deletes in `VssStore` by batching them with the next write call as part of the next `PutObjectRequest` sent. However, we unfortunately overlooked that in this instance any non-existent `delete_items` would yield a `ConflictError`. Rather than batched `VssStore` lazy deletes, we therefore here opt to simply spawn them into the background and ignore any errors. --- src/io/vss_store.rs | 125 ++++++-------------------------------------- 1 file changed, 16 insertions(+), 109 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 2906b89ca..2fd1ab2ca 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; -use bdk_chain::Merge; use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine}; use lightning::impl_writeable_tlv_based_enum; use lightning::io::{self, Error, ErrorKind}; @@ -244,11 +243,15 @@ impl KVStoreSync for VssStore { primary_namespace, secondary_namespace, key, - lazy, ) .await }; - tokio::task::block_in_place(move || internal_runtime.block_on(fut)) + if lazy { + internal_runtime.spawn(async { fut.await }); + Ok(()) + } else { + tokio::task::block_in_place(move || internal_runtime.block_on(fut)) + } } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { @@ -316,7 +319,7 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { + let fut = async move { inner .remove_internal( &inner.async_client, @@ -326,10 +329,15 @@ impl KVStore for VssStore { primary_namespace, secondary_namespace, key, - lazy, ) .await - }) + }; + if lazy { + tokio::task::spawn(async { fut.await }); + Box::pin(async { Ok(()) }) + } else { + Box::pin(async { fut.await }) + } } fn list( &self, primary_namespace: &str, secondary_namespace: &str, @@ -362,7 +370,6 @@ struct VssStoreInner { // Per-key locks that ensures that we don't have concurrent writes to the same namespace/key. // The lock also encapsulates the latest written version per key. locks: Mutex>>>, - pending_lazy_deletes: Mutex>, } impl VssStoreInner { @@ -372,7 +379,6 @@ impl VssStoreInner { data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator, ) -> Self { let locks = Mutex::new(HashMap::new()); - let pending_lazy_deletes = Mutex::new(Vec::new()); Self { schema_version, blocking_client, @@ -381,7 +387,6 @@ impl VssStoreInner { data_encryption_key, key_obfuscator, locks, - pending_lazy_deletes, } } @@ -520,12 +525,6 @@ impl VssStoreInner { "write", )?; - let delete_items = self - .pending_lazy_deletes - .try_lock() - .ok() - .and_then(|mut guard| guard.take()) - .unwrap_or_default(); let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let vss_version = -1; let storable_builder = StorableBuilder::new(RandEntropySource); @@ -541,16 +540,11 @@ impl VssStoreInner { version: vss_version, value: storable.encode_to_vec(), }], - delete_items: delete_items.clone(), + delete_items: vec![], }; self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { client.put_object(&request).await.map_err(|e| { - // Restore delete items so they'll be retried on next write. - if !delete_items.is_empty() { - self.pending_lazy_deletes.lock().unwrap().extend(delete_items); - } - let msg = format!( "Failed to write to key {}/{}/{}: {}", primary_namespace, secondary_namespace, key, e @@ -566,7 +560,7 @@ impl VssStoreInner { async fn remove_internal( &self, client: &VssClient, inner_lock_ref: Arc>, locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String, - key: String, lazy: bool, + key: String, ) -> io::Result<()> { check_namespace_key_validity( &primary_namespace, @@ -579,12 +573,6 @@ impl VssStoreInner { self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] }; - if lazy { - let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap(); - pending_lazy_deletes.push(key_value); - return Ok(()); - } - self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let request = DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) }; @@ -851,85 +839,4 @@ mod tests { do_read_write_remove_list_persist(&vss_store); drop(vss_store) } - - #[tokio::test(flavor = "multi_thread", worker_threads = 1)] - async fn vss_lazy_delete() { - let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); - let mut rng = rng(); - let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); - let mut vss_seed = [0u8; 32]; - rng.fill_bytes(&mut vss_seed); - let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); - let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap(); - - let primary_namespace = "test_namespace"; - let secondary_namespace = ""; - let key_to_delete = "key_to_delete"; - let key_for_trigger = "key_for_trigger"; - let data_to_delete = b"data_to_delete".to_vec(); - let trigger_data = b"trigger_data".to_vec(); - - // Write the key that we'll later lazily delete - KVStore::write( - &vss_store, - primary_namespace, - secondary_namespace, - key_to_delete, - data_to_delete.clone(), - ) - .await - .unwrap(); - - // Verify the key exists - let read_data = - KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete) - .await - .unwrap(); - assert_eq!(read_data, data_to_delete); - - // Perform a lazy delete - KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true) - .await - .unwrap(); - - // Verify the key still exists (lazy delete doesn't immediately remove it) - let read_data = - KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete) - .await - .unwrap(); - assert_eq!(read_data, data_to_delete); - - // Verify the key is still in the list - let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap(); - assert!(keys.contains(&key_to_delete.to_string())); - - // Trigger the actual deletion by performing a write operation - KVStore::write( - &vss_store, - primary_namespace, - secondary_namespace, - key_for_trigger, - trigger_data.clone(), - ) - .await - .unwrap(); - - // Now verify the key is actually deleted - let read_result = - KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await; - assert!(read_result.is_err()); - assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound); - - // Verify the key is no longer in the list - let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap(); - assert!(!keys.contains(&key_to_delete.to_string())); - - // Verify the trigger key still exists - let read_data = - KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger) - .await - .unwrap(); - assert_eq!(read_data, trigger_data); - } }