Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 16 additions & 109 deletions src/io/vss_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use bdk_chain::Merge;
use bitcoin::hashes::{sha256, Hash, HashEngine, Hmac, HmacEngine};
use lightning::impl_writeable_tlv_based_enum;
use lightning::io::{self, Error, ErrorKind};
Expand Down Expand Up @@ -244,11 +243,15 @@ impl KVStoreSync for VssStore {
primary_namespace,
secondary_namespace,
key,
lazy,
)
.await
};
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
if lazy {
internal_runtime.spawn(async { fut.await });
Ok(())
} else {
tokio::task::block_in_place(move || internal_runtime.block_on(fut))
}
}

fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result<Vec<String>> {
Expand Down Expand Up @@ -316,7 +319,7 @@ impl KVStore for VssStore {
let secondary_namespace = secondary_namespace.to_string();
let key = key.to_string();
let inner = Arc::clone(&self.inner);
Box::pin(async move {
let fut = async move {
inner
.remove_internal(
&inner.async_client,
Expand All @@ -326,10 +329,15 @@ impl KVStore for VssStore {
primary_namespace,
secondary_namespace,
key,
lazy,
)
.await
})
};
if lazy {
tokio::task::spawn(async { fut.await });
Box::pin(async { Ok(()) })
} else {
Box::pin(async { fut.await })
}
}
fn list(
&self, primary_namespace: &str, secondary_namespace: &str,
Expand Down Expand Up @@ -362,7 +370,6 @@ struct VssStoreInner {
// Per-key locks that ensures that we don't have concurrent writes to the same namespace/key.
// The lock also encapsulates the latest written version per key.
locks: Mutex<HashMap<String, Arc<tokio::sync::Mutex<u64>>>>,
pending_lazy_deletes: Mutex<Vec<KeyValue>>,
}

impl VssStoreInner {
Expand All @@ -372,7 +379,6 @@ impl VssStoreInner {
data_encryption_key: [u8; 32], key_obfuscator: KeyObfuscator,
) -> Self {
let locks = Mutex::new(HashMap::new());
let pending_lazy_deletes = Mutex::new(Vec::new());
Self {
schema_version,
blocking_client,
Expand All @@ -381,7 +387,6 @@ impl VssStoreInner {
data_encryption_key,
key_obfuscator,
locks,
pending_lazy_deletes,
}
}

Expand Down Expand Up @@ -520,12 +525,6 @@ impl VssStoreInner {
"write",
)?;

let delete_items = self
.pending_lazy_deletes
.try_lock()
.ok()
.and_then(|mut guard| guard.take())
.unwrap_or_default();
let store_key = self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);
let vss_version = -1;
let storable_builder = StorableBuilder::new(RandEntropySource);
Expand All @@ -541,16 +540,11 @@ impl VssStoreInner {
version: vss_version,
value: storable.encode_to_vec(),
}],
delete_items: delete_items.clone(),
delete_items: vec![],
};

self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
client.put_object(&request).await.map_err(|e| {
// Restore delete items so they'll be retried on next write.
if !delete_items.is_empty() {
self.pending_lazy_deletes.lock().unwrap().extend(delete_items);
}

let msg = format!(
"Failed to write to key {}/{}/{}: {}",
primary_namespace, secondary_namespace, key, e
Expand All @@ -566,7 +560,7 @@ impl VssStoreInner {
async fn remove_internal(
&self, client: &VssClient<CustomRetryPolicy>, inner_lock_ref: Arc<tokio::sync::Mutex<u64>>,
locking_key: String, version: u64, primary_namespace: String, secondary_namespace: String,
key: String, lazy: bool,
key: String,
) -> io::Result<()> {
check_namespace_key_validity(
&primary_namespace,
Expand All @@ -579,12 +573,6 @@ impl VssStoreInner {
self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key);

let key_value = KeyValue { key: obfuscated_key, version: -1, value: vec![] };
if lazy {
let mut pending_lazy_deletes = self.pending_lazy_deletes.lock().unwrap();
pending_lazy_deletes.push(key_value);
return Ok(());
}

self.execute_locked_write(inner_lock_ref, locking_key, version, async move || {
let request =
DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(key_value) };
Expand Down Expand Up @@ -851,85 +839,4 @@ mod tests {
do_read_write_remove_list_persist(&vss_store);
drop(vss_store)
}

#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn vss_lazy_delete() {
let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap();
let mut rng = rng();
let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect();
let mut vss_seed = [0u8; 32];
rng.fill_bytes(&mut vss_seed);
let header_provider = Arc::new(FixedHeaders::new(HashMap::new()));
let vss_store =
VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider).unwrap();

let primary_namespace = "test_namespace";
let secondary_namespace = "";
let key_to_delete = "key_to_delete";
let key_for_trigger = "key_for_trigger";
let data_to_delete = b"data_to_delete".to_vec();
let trigger_data = b"trigger_data".to_vec();

// Write the key that we'll later lazily delete
KVStore::write(
&vss_store,
primary_namespace,
secondary_namespace,
key_to_delete,
data_to_delete.clone(),
)
.await
.unwrap();

// Verify the key exists
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
.await
.unwrap();
assert_eq!(read_data, data_to_delete);

// Perform a lazy delete
KVStore::remove(&vss_store, primary_namespace, secondary_namespace, key_to_delete, true)
.await
.unwrap();

// Verify the key still exists (lazy delete doesn't immediately remove it)
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete)
.await
.unwrap();
assert_eq!(read_data, data_to_delete);

// Verify the key is still in the list
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
assert!(keys.contains(&key_to_delete.to_string()));

// Trigger the actual deletion by performing a write operation
KVStore::write(
&vss_store,
primary_namespace,
secondary_namespace,
key_for_trigger,
trigger_data.clone(),
)
.await
.unwrap();

// Now verify the key is actually deleted
let read_result =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_to_delete).await;
assert!(read_result.is_err());
assert_eq!(read_result.unwrap_err().kind(), ErrorKind::NotFound);

// Verify the key is no longer in the list
let keys = KVStore::list(&vss_store, primary_namespace, secondary_namespace).await.unwrap();
assert!(!keys.contains(&key_to_delete.to_string()));

// Verify the trigger key still exists
let read_data =
KVStore::read(&vss_store, primary_namespace, secondary_namespace, key_for_trigger)
.await
.unwrap();
assert_eq!(read_data, trigger_data);
}
}