From edff34ce6454654635f519f5e0bc68ca93c8ccbb Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 14 Oct 2025 11:20:11 +0200 Subject: [PATCH 1/7] Drop superfluous `cargo build` step .. as we're about to `cargo test` anyways. --- .github/workflows/vss-integration.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 81b63fdf9..5f6e6065b 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -44,5 +44,4 @@ jobs: run: | cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" - RUSTFLAGS="--cfg vss_test" cargo build --verbose --color always RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss From ae98befd9c66c0f9086a203efb7bdd0eab0892cf Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Tue, 14 Oct 2025 13:03:05 +0200 Subject: [PATCH 2/7] Add missing error logs for `ReadFailed` cases .. some of the `ReadFailed` cases didn't log why they failed. Here we fix that oversight. --- src/builder.rs | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/builder.rs b/src/builder.rs index 0c843447a..195ac65c3 100644 --- a/src/builder.rs +++ b/src/builder.rs @@ -1138,6 +1138,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(RwLock::new(NodeMetrics::default())) } else { + log_error!(logger, "Failed to read node metrics from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1201,7 +1202,8 @@ fn build_with_store_internal( Arc::clone(&kv_store), Arc::clone(&logger), )), - Err(_) => { + Err(e) => { + log_error!(logger, "Failed to read payment data from store: {}", e); return Err(BuildError::ReadFailed); }, }; @@ -1334,7 +1336,7 @@ fn build_with_store_internal( if e.kind() == lightning::io::ErrorKind::NotFound { Vec::new() } else { - log_error!(logger, "Failed to read channel monitors: {}", e.to_string()); + log_error!(logger, "Failed to read channel monitors from store: {}", e.to_string()); return Err(BuildError::ReadFailed); } }, @@ -1359,6 +1361,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(Graph::new(config.network.into(), Arc::clone(&logger))) } else { + log_error!(logger, "Failed to read network graph from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1379,6 +1382,7 @@ fn build_with_store_internal( Arc::clone(&logger), ))) } else { + log_error!(logger, "Failed to read scoring data from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1448,7 +1452,7 @@ fn build_with_store_internal( ); let (_hash, channel_manager) = <(BlockHash, ChannelManager)>::read(&mut reader, read_args).map_err(|e| { - log_error!(logger, "Failed to read channel manager from KVStore: {}", e); + log_error!(logger, "Failed to read channel manager from store: {}", e); BuildError::ReadFailed })?; channel_manager @@ -1677,6 +1681,7 @@ fn build_with_store_internal( Arc::clone(&logger), )) } else { + log_error!(logger, "Failed to read output sweeper data from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1689,6 +1694,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(EventQueue::new(Arc::clone(&kv_store), Arc::clone(&logger))) } else { + log_error!(logger, "Failed to read event queue from store: {}", e); return Err(BuildError::ReadFailed); } }, @@ -1700,6 +1706,7 @@ fn build_with_store_internal( if e.kind() == std::io::ErrorKind::NotFound { Arc::new(PeerStore::new(Arc::clone(&kv_store), Arc::clone(&logger))) } else { + log_error!(logger, "Failed to read peer data from store: {}", e); return Err(BuildError::ReadFailed); } }, From b8cf41c1b5d9936230b8c106ca135bf704348ac6 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 16 Oct 2025 10:47:20 +0200 Subject: [PATCH 3/7] Drop `Condvar` and use `block_on` for `wait_next_event` Given we regularly run into issues arising from mixing sync and async contexts, we here simplify our `EventQueue` implementation by avoiding to use `Condvar::wait_while` (which parks the current thread) and rather simply us `block_on` on our `next_event_async` method. --- src/event.rs | 18 +++--------------- src/lib.rs | 5 ++++- 2 files changed, 7 insertions(+), 16 deletions(-) diff --git a/src/event.rs b/src/event.rs index db6ef13f1..c9881fc22 100644 --- a/src/event.rs +++ b/src/event.rs @@ -9,7 +9,7 @@ use core::future::Future; use core::task::{Poll, Waker}; use std::collections::VecDeque; use std::ops::Deref; -use std::sync::{Arc, Condvar, Mutex}; +use std::sync::{Arc, Mutex}; use bitcoin::blockdata::locktime::absolute::LockTime; use bitcoin::secp256k1::PublicKey; @@ -287,7 +287,6 @@ where { queue: Arc>>, waker: Arc>>, - notifier: Condvar, kv_store: Arc, logger: L, } @@ -299,8 +298,7 @@ where pub(crate) fn new(kv_store: Arc, logger: L) -> Self { let queue = Arc::new(Mutex::new(VecDeque::new())); let waker = Arc::new(Mutex::new(None)); - let notifier = Condvar::new(); - Self { queue, waker, notifier, kv_store, logger } + Self { queue, waker, kv_store, logger } } pub(crate) fn add_event(&self, event: Event) -> Result<(), Error> { @@ -310,8 +308,6 @@ where self.persist_queue(&locked_queue)?; } - self.notifier.notify_one(); - if let Some(waker) = self.waker.lock().unwrap().take() { waker.wake(); } @@ -327,19 +323,12 @@ where EventFuture { event_queue: Arc::clone(&self.queue), waker: Arc::clone(&self.waker) }.await } - pub(crate) fn wait_next_event(&self) -> Event { - let locked_queue = - self.notifier.wait_while(self.queue.lock().unwrap(), |queue| queue.is_empty()).unwrap(); - locked_queue.front().unwrap().clone() - } - pub(crate) fn event_handled(&self) -> Result<(), Error> { { let mut locked_queue = self.queue.lock().unwrap(); locked_queue.pop_front(); self.persist_queue(&locked_queue)?; } - self.notifier.notify_one(); if let Some(waker) = self.waker.lock().unwrap().take() { waker.wake(); @@ -383,8 +372,7 @@ where let read_queue: EventQueueDeserWrapper = Readable::read(reader)?; let queue = Arc::new(Mutex::new(read_queue.0)); let waker = Arc::new(Mutex::new(None)); - let notifier = Condvar::new(); - Ok(Self { queue, waker, notifier, kv_store, logger }) + Ok(Self { queue, waker, kv_store, logger }) } } diff --git a/src/lib.rs b/src/lib.rs index f07b2def3..2ea704d27 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -749,7 +749,10 @@ impl Node { /// **Caution:** Users must handle events as quickly as possible to prevent a large event backlog, /// which can increase the memory footprint of [`Node`]. pub fn wait_next_event(&self) -> Event { - self.event_queue.wait_next_event() + let fut = self.event_queue.next_event_async(); + // We use our runtime for the sync variant to ensure `tokio::task::block_in_place` is + // always called if we'd ever hit this in an outer runtime context. + self.runtime.block_on(fut) } /// Confirm the last retrieved event handled. From 05dab40f64071875d9471574b91366e2625883a9 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Thu, 16 Oct 2025 11:38:55 +0200 Subject: [PATCH 4/7] Async'ify our test suite .. as LDK Node is moving towards a more `async` core, it starts to make sense to switch our test suite over to be `async`. This change should make our CI more efficient (as not every node will spawn its independent runtime, but we just have one runtime per test created) and also makes sure we won't run into any edge cases arising from blocking test threads that are executing other async tasks. --- src/event.rs | 30 +-- tests/common/mod.rs | 100 +++++----- tests/integration_tests_cln.rs | 15 +- tests/integration_tests_lnd.rs | 9 +- tests/integration_tests_rust.rs | 319 +++++++++++++++++--------------- tests/integration_tests_vss.rs | 7 +- tests/reorg_test.rs | 314 ++++++++++++++++--------------- 7 files changed, 402 insertions(+), 392 deletions(-) diff --git a/src/event.rs b/src/event.rs index c9881fc22..eedfb1c14 100644 --- a/src/event.rs +++ b/src/event.rs @@ -1625,7 +1625,6 @@ mod tests { // Check we get the expected event and that it is returned until we mark it handled. for _ in 0..5 { - assert_eq!(event_queue.wait_next_event(), expected_event); assert_eq!(event_queue.next_event_async().await, expected_event); assert_eq!(event_queue.next_event(), Some(expected_event.clone())); } @@ -1640,7 +1639,7 @@ mod tests { .unwrap(); let deser_event_queue = EventQueue::read(&mut &persisted_bytes[..], (Arc::clone(&store), logger)).unwrap(); - assert_eq!(deser_event_queue.wait_next_event(), expected_event); + assert_eq!(deser_event_queue.next_event_async().await, expected_event); event_queue.event_handled().unwrap(); assert_eq!(event_queue.next_event(), None); @@ -1709,32 +1708,5 @@ mod tests { } } assert_eq!(event_queue.next_event(), None); - - // Check we operate correctly, even when mixing and matching blocking and async API calls. - let (tx, mut rx) = tokio::sync::watch::channel(()); - let thread_queue = Arc::clone(&event_queue); - let thread_event = expected_event.clone(); - std::thread::spawn(move || { - let e = thread_queue.wait_next_event(); - assert_eq!(e, thread_event); - thread_queue.event_handled().unwrap(); - tx.send(()).unwrap(); - }); - - let thread_queue = Arc::clone(&event_queue); - let thread_event = expected_event.clone(); - std::thread::spawn(move || { - // Sleep a bit before we enqueue the events everybody is waiting for. - std::thread::sleep(Duration::from_millis(20)); - thread_queue.add_event(thread_event.clone()).unwrap(); - thread_queue.add_event(thread_event.clone()).unwrap(); - }); - - let e = event_queue.next_event_async().await; - assert_eq!(e, expected_event.clone()); - event_queue.event_handled().unwrap(); - - rx.changed().await.unwrap(); - assert_eq!(event_queue.next_event(), None); } } diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 4d02895c7..05326b03d 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -49,7 +49,7 @@ use serde_json::{json, Value}; macro_rules! expect_event { ($node:expr, $event_type:ident) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::$event_type { .. } => { println!("{} got event {:?}", $node.node_id(), e); $node.event_handled().unwrap(); @@ -65,7 +65,7 @@ pub(crate) use expect_event; macro_rules! expect_channel_pending_event { ($node:expr, $counterparty_node_id:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::ChannelPending { funding_txo, counterparty_node_id, .. } => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(counterparty_node_id, $counterparty_node_id); @@ -83,7 +83,7 @@ pub(crate) use expect_channel_pending_event; macro_rules! expect_channel_ready_event { ($node:expr, $counterparty_node_id:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(counterparty_node_id, Some($counterparty_node_id)); @@ -101,7 +101,7 @@ pub(crate) use expect_channel_ready_event; macro_rules! expect_payment_received_event { ($node:expr, $amount_msat:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::PaymentReceived { payment_id, amount_msat, .. } => { println!("{} got event {:?}", $node.node_id(), e); assert_eq!(amount_msat, $amount_msat); @@ -123,7 +123,7 @@ pub(crate) use expect_payment_received_event; macro_rules! expect_payment_claimable_event { ($node:expr, $payment_id:expr, $payment_hash:expr, $claimable_amount_msat:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::PaymentClaimable { payment_id, payment_hash, @@ -148,7 +148,7 @@ pub(crate) use expect_payment_claimable_event; macro_rules! expect_payment_successful_event { ($node:expr, $payment_id:expr, $fee_paid_msat:expr) => {{ - match $node.wait_next_event() { + match $node.next_event_async().await { ref e @ Event::PaymentSuccessful { payment_id, fee_paid_msat, .. } => { println!("{} got event {:?}", $node.node_id(), e); if let Some(fee_msat) = $fee_paid_msat { @@ -389,7 +389,7 @@ pub(crate) fn setup_node_for_async_payments( node } -pub(crate) fn generate_blocks_and_wait( +pub(crate) async fn generate_blocks_and_wait( bitcoind: &BitcoindClient, electrs: &E, num: usize, ) { let _ = bitcoind.create_wallet("ldk_node_test"); @@ -400,7 +400,7 @@ pub(crate) fn generate_blocks_and_wait( let address = bitcoind.new_address().expect("failed to get new address"); // TODO: expect this Result once the WouldBlock issue is resolved upstream. let _block_hashes_res = bitcoind.generate_to_address(num, &address); - wait_for_block(electrs, cur_height as usize + num); + wait_for_block(electrs, cur_height as usize + num).await; print!(" Done!"); println!("\n"); } @@ -420,14 +420,14 @@ pub(crate) fn invalidate_blocks(bitcoind: &BitcoindClient, num_blocks: usize) { assert!(new_cur_height + num_blocks == cur_height); } -pub(crate) fn wait_for_block(electrs: &E, min_height: usize) { +pub(crate) async fn wait_for_block(electrs: &E, min_height: usize) { let mut header = match electrs.block_headers_subscribe() { Ok(header) => header, Err(_) => { // While subscribing should succeed the first time around, we ran into some cases where // it didn't. Since we can't proceed without subscribing, we try again after a delay // and panic if it still fails. - std::thread::sleep(Duration::from_secs(3)); + tokio::time::sleep(Duration::from_secs(3)).await; electrs.block_headers_subscribe().expect("failed to subscribe to block headers") }, }; @@ -438,11 +438,12 @@ pub(crate) fn wait_for_block(electrs: &E, min_height: usize) { header = exponential_backoff_poll(|| { electrs.ping().expect("failed to ping electrs"); electrs.block_headers_pop().expect("failed to pop block header") - }); + }) + .await; } } -pub(crate) fn wait_for_tx(electrs: &E, txid: Txid) { +pub(crate) async fn wait_for_tx(electrs: &E, txid: Txid) { if electrs.transaction_get(&txid).is_ok() { return; } @@ -450,10 +451,11 @@ pub(crate) fn wait_for_tx(electrs: &E, txid: Txid) { exponential_backoff_poll(|| { electrs.ping().unwrap(); electrs.transaction_get(&txid).ok() - }); + }) + .await; } -pub(crate) fn wait_for_outpoint_spend(electrs: &E, outpoint: OutPoint) { +pub(crate) async fn wait_for_outpoint_spend(electrs: &E, outpoint: OutPoint) { let tx = electrs.transaction_get(&outpoint.txid).unwrap(); let txout_script = tx.output.get(outpoint.vout as usize).unwrap().clone().script_pubkey; @@ -467,10 +469,11 @@ pub(crate) fn wait_for_outpoint_spend(electrs: &E, outpoint: Out let is_spent = !electrs.script_get_history(&txout_script).unwrap().is_empty(); is_spent.then_some(()) - }); + }) + .await; } -pub(crate) fn exponential_backoff_poll(mut poll: F) -> T +pub(crate) async fn exponential_backoff_poll(mut poll: F) -> T where F: FnMut() -> Option, { @@ -487,26 +490,26 @@ where } assert!(tries < 20, "Reached max tries."); tries += 1; - std::thread::sleep(delay); + tokio::time::sleep(delay).await; } } -pub(crate) fn premine_and_distribute_funds( +pub(crate) async fn premine_and_distribute_funds( bitcoind: &BitcoindClient, electrs: &E, addrs: Vec
, amount: Amount, ) { - premine_blocks(bitcoind, electrs); + premine_blocks(bitcoind, electrs).await; - distribute_funds_unconfirmed(bitcoind, electrs, addrs, amount); - generate_blocks_and_wait(bitcoind, electrs, 1); + distribute_funds_unconfirmed(bitcoind, electrs, addrs, amount).await; + generate_blocks_and_wait(bitcoind, electrs, 1).await; } -pub(crate) fn premine_blocks(bitcoind: &BitcoindClient, electrs: &E) { +pub(crate) async fn premine_blocks(bitcoind: &BitcoindClient, electrs: &E) { let _ = bitcoind.create_wallet("ldk_node_test"); let _ = bitcoind.load_wallet("ldk_node_test"); - generate_blocks_and_wait(bitcoind, electrs, 101); + generate_blocks_and_wait(bitcoind, electrs, 101).await; } -pub(crate) fn distribute_funds_unconfirmed( +pub(crate) async fn distribute_funds_unconfirmed( bitcoind: &BitcoindClient, electrs: &E, addrs: Vec
, amount: Amount, ) -> Txid { let mut amounts = HashMap::::new(); @@ -524,7 +527,7 @@ pub(crate) fn distribute_funds_unconfirmed( .parse() .unwrap(); - wait_for_tx(electrs, txid); + wait_for_tx(electrs, txid).await; txid } @@ -543,7 +546,7 @@ pub(crate) fn prepare_rbf( (tx, fee_output_index) } -pub(crate) fn bump_fee_and_broadcast( +pub(crate) async fn bump_fee_and_broadcast( bitcoind: &BitcoindClient, electrs: &E, mut tx: Transaction, fee_output_index: usize, is_insert_block: bool, ) -> Transaction { @@ -573,10 +576,10 @@ pub(crate) fn bump_fee_and_broadcast( match bitcoind.send_raw_transaction(&tx) { Ok(res) => { if is_insert_block { - generate_blocks_and_wait(bitcoind, electrs, 1); + generate_blocks_and_wait(bitcoind, electrs, 1).await; } let new_txid: Txid = res.0.parse().unwrap(); - wait_for_tx(electrs, new_txid); + wait_for_tx(electrs, new_txid).await; return tx; }, Err(_) => { @@ -591,14 +594,14 @@ pub(crate) fn bump_fee_and_broadcast( panic!("Failed to bump fee after {} attempts", attempts); } -pub fn open_channel( +pub async fn open_channel( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { - open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd) + open_channel_push_amt(node_a, node_b, funding_amount_sat, None, should_announce, electrsd).await } -pub fn open_channel_push_amt( +pub async fn open_channel_push_amt( node_a: &TestNode, node_b: &TestNode, funding_amount_sat: u64, push_amount_msat: Option, should_announce: bool, electrsd: &ElectrsD, ) -> OutPoint { @@ -628,12 +631,12 @@ pub fn open_channel_push_amt( let funding_txo_a = expect_channel_pending_event!(node_a, node_b.node_id()); let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(&electrsd.client, funding_txo_a.txid); + wait_for_tx(&electrsd.client, funding_txo_a.txid).await; funding_txo_a } -pub(crate) fn do_channel_full_cycle( +pub(crate) async fn do_channel_full_cycle( node_a: TestNode, node_b: TestNode, bitcoind: &BitcoindClient, electrsd: &E, allow_0conf: bool, expect_anchor_channel: bool, force_close: bool, ) { @@ -647,7 +650,8 @@ pub(crate) fn do_channel_full_cycle( electrsd, vec![addr_a, addr_b], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); @@ -706,10 +710,10 @@ pub(crate) fn do_channel_full_cycle( let funding_txo_b = expect_channel_pending_event!(node_b, node_a.node_id()); assert_eq!(funding_txo_a, funding_txo_b); - wait_for_tx(electrsd, funding_txo_a.txid); + wait_for_tx(electrsd, funding_txo_a.txid).await; if !allow_0conf { - generate_blocks_and_wait(&bitcoind, electrsd, 6); + generate_blocks_and_wait(&bitcoind, electrsd, 6).await; } node_a.sync_wallets().unwrap(); @@ -839,7 +843,7 @@ pub(crate) fn do_channel_full_cycle( let payment_id = node_a.bolt11_payment().send_using_amount(&invoice, overpaid_amount_msat, None).unwrap(); expect_event!(node_a, PaymentSuccessful); - let received_amount = match node_b.wait_next_event() { + let received_amount = match node_b.next_event_async().await { ref e @ Event::PaymentReceived { amount_msat, .. } => { println!("{} got event {:?}", std::stringify!(node_b), e); node_b.event_handled().unwrap(); @@ -877,7 +881,7 @@ pub(crate) fn do_channel_full_cycle( .unwrap(); expect_event!(node_a, PaymentSuccessful); - let received_amount = match node_b.wait_next_event() { + let received_amount = match node_b.next_event_async().await { ref e @ Event::PaymentReceived { amount_msat, .. } => { println!("{} got event {:?}", std::stringify!(node_b), e); node_b.event_handled().unwrap(); @@ -999,7 +1003,7 @@ pub(crate) fn do_channel_full_cycle( .send_with_custom_tlvs(keysend_amount_msat, node_b.node_id(), None, custom_tlvs.clone()) .unwrap(); expect_event!(node_a, PaymentSuccessful); - let next_event = node_b.wait_next_event(); + let next_event = node_b.next_event_async().await; let (received_keysend_amount, received_custom_records) = match next_event { ref e @ Event::PaymentReceived { amount_msat, ref custom_records, .. } => { println!("{} got event {:?}", std::stringify!(node_b), e); @@ -1049,7 +1053,7 @@ pub(crate) fn do_channel_full_cycle( println!("\nB close_channel (force: {})", force_close); if force_close { - std::thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_secs(1)).await; node_a.force_close_channel(&user_channel_id, node_b.node_id(), None).unwrap(); } else { node_a.close_channel(&user_channel_id, node_b.node_id()).unwrap(); @@ -1058,9 +1062,9 @@ pub(crate) fn do_channel_full_cycle( expect_event!(node_a, ChannelClosed); expect_event!(node_b, ChannelClosed); - wait_for_outpoint_spend(electrsd, funding_txo_b); + wait_for_outpoint_spend(electrsd, funding_txo_b).await; - generate_blocks_and_wait(&bitcoind, electrsd, 1); + generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1076,7 +1080,7 @@ pub(crate) fn do_channel_full_cycle( assert_eq!(counterparty_node_id, node_a.node_id()); let cur_height = node_b.status().current_best_block.height; let blocks_to_go = confirmation_height - cur_height; - generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize); + generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize).await; node_b.sync_wallets().unwrap(); node_a.sync_wallets().unwrap(); }, @@ -1089,7 +1093,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 1); + generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_b.sync_wallets().unwrap(); node_a.sync_wallets().unwrap(); @@ -1099,7 +1103,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 5); + generate_blocks_and_wait(&bitcoind, electrsd, 5).await; node_b.sync_wallets().unwrap(); node_a.sync_wallets().unwrap(); @@ -1117,7 +1121,7 @@ pub(crate) fn do_channel_full_cycle( assert_eq!(counterparty_node_id, node_b.node_id()); let cur_height = node_a.status().current_best_block.height; let blocks_to_go = confirmation_height - cur_height; - generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize); + generate_blocks_and_wait(&bitcoind, electrsd, blocks_to_go as usize).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); }, @@ -1130,7 +1134,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 1); + generate_blocks_and_wait(&bitcoind, electrsd, 1).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1140,7 +1144,7 @@ pub(crate) fn do_channel_full_cycle( PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, _ => panic!("Unexpected balance state!"), } - generate_blocks_and_wait(&bitcoind, electrsd, 5); + generate_blocks_and_wait(&bitcoind, electrsd, 5).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); } diff --git a/tests/integration_tests_cln.rs b/tests/integration_tests_cln.rs index 6fc72b2c2..38e345f15 100644 --- a/tests/integration_tests_cln.rs +++ b/tests/integration_tests_cln.rs @@ -25,8 +25,8 @@ use lightning_invoice::{Bolt11Invoice, Bolt11InvoiceDescription, Description}; use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng}; -#[test] -fn test_cln() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_cln() { // Setup bitcoind / electrs clients let bitcoind_client = BitcoindClient::new_with_auth( "http://127.0.0.1:18443", @@ -36,7 +36,7 @@ fn test_cln() { let electrs_client = ElectrumClient::new("tcp://127.0.0.1:50001").unwrap(); // Give electrs a kick. - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1); + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1).await; // Setup LDK Node let config = common::random_config(true); @@ -54,7 +54,8 @@ fn test_cln() { &electrs_client, vec![address], premine_amount, - ); + ) + .await; // Setup CLN let sock = "/tmp/lightning-rpc"; @@ -67,7 +68,7 @@ fn test_cln() { if info.blockheight > 0 { break info; } - std::thread::sleep(std::time::Duration::from_millis(250)); + tokio::time::sleep(std::time::Duration::from_millis(250)).await; } }; let cln_node_id = PublicKey::from_str(&cln_info.id).unwrap(); @@ -92,8 +93,8 @@ fn test_cln() { .unwrap(); let funding_txo = common::expect_channel_pending_event!(node, cln_node_id); - common::wait_for_tx(&electrs_client, funding_txo.txid); - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6); + common::wait_for_tx(&electrs_client, funding_txo.txid).await; + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6).await; node.sync_wallets().unwrap(); let user_channel_id = common::expect_channel_ready_event!(node, cln_node_id); diff --git a/tests/integration_tests_lnd.rs b/tests/integration_tests_lnd.rs index 7dfc1e4f9..311a11c3c 100755 --- a/tests/integration_tests_lnd.rs +++ b/tests/integration_tests_lnd.rs @@ -34,7 +34,7 @@ async fn test_lnd() { let electrs_client = ElectrumClient::new("tcp://127.0.0.1:50001").unwrap(); // Give electrs a kick. - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1); + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 1).await; // Setup LDK Node let config = common::random_config(true); @@ -52,7 +52,8 @@ async fn test_lnd() { &electrs_client, vec![address], premine_amount, - ); + ) + .await; // Setup LND let endpoint = "127.0.0.1:8081"; @@ -73,8 +74,8 @@ async fn test_lnd() { .unwrap(); let funding_txo = common::expect_channel_pending_event!(node, lnd_node_id); - common::wait_for_tx(&electrs_client, funding_txo.txid); - common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6); + common::wait_for_tx(&electrs_client, funding_txo.txid).await; + common::generate_blocks_and_wait(&bitcoind_client, &electrs_client, 6).await; node.sync_wallets().unwrap(); let user_channel_id = common::expect_channel_ready_event!(node, lnd_node_id); diff --git a/tests/integration_tests_rust.rs b/tests/integration_tests_rust.rs index 804bba876..e2d4207cd 100644 --- a/tests/integration_tests_rust.rs +++ b/tests/integration_tests_rust.rs @@ -39,72 +39,80 @@ use lightning_invoice::{Bolt11InvoiceDescription, Description}; use lightning_types::payment::{PaymentHash, PaymentPreimage}; use log::LevelFilter; -#[test] -fn channel_full_cycle() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_electrum() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_electrum() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Electrum(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_bitcoind_rpc_sync() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_bitcoind_rpc_sync() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::BitcoindRpcSync(&bitcoind); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_bitcoind_rest_sync() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_bitcoind_rest_sync() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::BitcoindRestSync(&bitcoind); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, false) + .await; } -#[test] -fn channel_full_cycle_force_close() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_force_close() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true) + .await; } -#[test] -fn channel_full_cycle_force_close_trusted_no_reserve() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_force_close_trusted_no_reserve() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, true); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, true, true) + .await; } -#[test] -fn channel_full_cycle_0conf() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_0conf() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, true, true, false); do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, true, true, false) + .await; } -#[test] -fn channel_full_cycle_legacy_staticremotekey() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_legacy_staticremotekey() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); - do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, false, false); + do_channel_full_cycle(node_a, node_b, &bitcoind.client, &electrsd.client, false, false, false) + .await; } -#[test] -fn channel_open_fails_when_funds_insufficient() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_open_fails_when_funds_insufficient() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -119,7 +127,8 @@ fn channel_open_fails_when_funds_insufficient() { &electrsd.client, vec![addr_a, addr_b], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, premine_amount_sat); @@ -138,8 +147,8 @@ fn channel_open_fails_when_funds_insufficient() { ); } -#[test] -fn multi_hop_sending() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn multi_hop_sending() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -162,7 +171,8 @@ fn multi_hop_sending() { &electrsd.client, addresses, Amount::from_sat(premine_amount_sat), - ); + ) + .await; for n in &nodes { n.sync_wallets().unwrap(); @@ -177,18 +187,18 @@ fn multi_hop_sending() { // \ / // (1M:0)- N3 -(1M:0) - open_channel(&nodes[0], &nodes[1], 100_000, true, &electrsd); - open_channel(&nodes[1], &nodes[2], 1_000_000, true, &electrsd); + open_channel(&nodes[0], &nodes[1], 100_000, true, &electrsd).await; + open_channel(&nodes[1], &nodes[2], 1_000_000, true, &electrsd).await; // We need to sync wallets in-between back-to-back channel opens from the same node so BDK // wallet picks up on the broadcast funding tx and doesn't double-spend itself. // // TODO: Remove once fixed in BDK. nodes[1].sync_wallets().unwrap(); - open_channel(&nodes[1], &nodes[3], 1_000_000, true, &electrsd); - open_channel(&nodes[2], &nodes[4], 1_000_000, true, &electrsd); - open_channel(&nodes[3], &nodes[4], 1_000_000, true, &electrsd); + open_channel(&nodes[1], &nodes[3], 1_000_000, true, &electrsd).await; + open_channel(&nodes[2], &nodes[4], 1_000_000, true, &electrsd).await; + open_channel(&nodes[3], &nodes[4], 1_000_000, true, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; for n in &nodes { n.sync_wallets().unwrap(); @@ -206,7 +216,7 @@ fn multi_hop_sending() { expect_event!(nodes[4], ChannelReady); // Sleep a bit for gossip to propagate. - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let route_params = RouteParametersConfig { max_total_routing_fee_msat: Some(75_000), @@ -235,8 +245,8 @@ fn multi_hop_sending() { expect_payment_successful_event!(nodes[0], payment_id, Some(fee_paid_msat)); } -#[test] -fn start_stop_reinit() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn start_stop_reinit() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let config = random_config(true); @@ -265,7 +275,8 @@ fn start_stop_reinit() { &electrsd.client, vec![funding_address], expected_amount, - ); + ) + .await; node.sync_wallets().unwrap(); assert_eq!(node.list_balances().spendable_onchain_balance_sats, expected_amount.to_sat()); @@ -304,8 +315,8 @@ fn start_stop_reinit() { reinitialized_node.stop().unwrap(); } -#[test] -fn onchain_send_receive() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -323,7 +334,8 @@ fn onchain_send_receive() { &electrsd.client, vec![addr_a.clone(), addr_b.clone()], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -350,8 +362,8 @@ fn onchain_send_receive() { let channel_amount_sat = 1_000_000; let reserve_amount_sat = 25_000; - open_channel(&node_b, &node_a, channel_amount_sat, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_b, &node_a, channel_amount_sat, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -393,7 +405,7 @@ fn onchain_send_receive() { let amount_to_send_sats = 54321; let txid = node_b.onchain_payment().send_to_address(&addr_a, amount_to_send_sats, None).unwrap(); - wait_for_tx(&electrsd.client, txid); + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -420,7 +432,7 @@ fn onchain_send_receive() { assert_eq!(payment_a.amount_msat, payment_b.amount_msat); assert_eq!(payment_a.fee_paid_msat, payment_b.fee_paid_msat); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -458,8 +470,8 @@ fn onchain_send_receive() { let addr_b = node_b.onchain_payment().new_address().unwrap(); let txid = node_a.onchain_payment().send_all_to_address(&addr_b, true, None).unwrap(); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); - wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -481,8 +493,8 @@ fn onchain_send_receive() { let addr_b = node_b.onchain_payment().new_address().unwrap(); let txid = node_a.onchain_payment().send_all_to_address(&addr_b, false, None).unwrap(); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); - wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -504,8 +516,8 @@ fn onchain_send_receive() { assert_eq!(node_b_payments.len(), 5); } -#[test] -fn onchain_send_all_retains_reserve() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_send_all_retains_reserve() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -522,7 +534,8 @@ fn onchain_send_all_retains_reserve() { &electrsd.client, vec![addr_a.clone(), addr_b.clone()], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -532,8 +545,8 @@ fn onchain_send_all_retains_reserve() { // Send all over, with 0 reserve as we don't have any channels open. let txid = node_a.onchain_payment().send_all_to_address(&addr_b, true, None).unwrap(); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -550,15 +563,15 @@ fn onchain_send_all_retains_reserve() { .0 .parse() .unwrap(); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); assert_eq!(node_a.list_balances().spendable_onchain_balance_sats, reserve_amount_sat); // Open a channel. - open_channel(&node_b, &node_a, premine_amount_sat, false, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_b, &node_a, premine_amount_sat, false, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); expect_channel_ready_event!(node_a, node_b.node_id()); @@ -573,8 +586,8 @@ fn onchain_send_all_retains_reserve() { // Send all over again, this time ensuring the reserve is accounted for let txid = node_b.onchain_payment().send_all_to_address(&addr_a, true, None).unwrap(); - wait_for_tx(&electrsd.client, txid); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + wait_for_tx(&electrsd.client, txid).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -587,8 +600,8 @@ fn onchain_send_all_retains_reserve() { .contains(&node_a.list_balances().spendable_onchain_balance_sats)); } -#[test] -fn onchain_wallet_recovery() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn onchain_wallet_recovery() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -607,7 +620,8 @@ fn onchain_wallet_recovery() { &electrsd.client, vec![addr_1], Amount::from_sat(premine_amount_sat), - ); + ) + .await; original_node.sync_wallets().unwrap(); assert_eq!(original_node.list_balances().spendable_onchain_balance_sats, premine_amount_sat); @@ -620,9 +634,9 @@ fn onchain_wallet_recovery() { .0 .parse() .unwrap(); - wait_for_tx(&electrsd.client, txid); + wait_for_tx(&electrsd.client, txid).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; original_node.sync_wallets().unwrap(); assert_eq!( @@ -656,9 +670,9 @@ fn onchain_wallet_recovery() { .0 .parse() .unwrap(); - wait_for_tx(&electrsd.client, txid); + wait_for_tx(&electrsd.client, txid).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 1).await; recovered_node.sync_wallets().unwrap(); assert_eq!( @@ -667,20 +681,20 @@ fn onchain_wallet_recovery() { ); } -#[test] -fn test_rbf_via_mempool() { - run_rbf_test(false); +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_rbf_via_mempool() { + run_rbf_test(false).await; } -#[test] -fn test_rbf_via_direct_block_insertion() { - run_rbf_test(true); +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_rbf_via_direct_block_insertion() { + run_rbf_test(true).await; } // `is_insert_block`: // - `true`: transaction is mined immediately (no mempool), testing confirmed-Tx handling. // - `false`: transaction stays in mempool until confirmation, testing unconfirmed-Tx handling. -fn run_rbf_test(is_insert_block: bool) { +async fn run_rbf_test(is_insert_block: bool) { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source_bitcoind = TestChainSource::BitcoindRpcSync(&bitcoind); let chain_source_electrsd = TestChainSource::Electrum(&electrsd); @@ -701,7 +715,7 @@ fn run_rbf_test(is_insert_block: bool) { ]; let (bitcoind, electrs) = (&bitcoind.client, &electrsd.client); - premine_blocks(bitcoind, electrs); + premine_blocks(bitcoind, electrs).await; // Helpers declaration before starting the test let all_addrs = @@ -715,7 +729,8 @@ fn run_rbf_test(is_insert_block: bool) { electrs, all_addrs.clone(), Amount::from_sat(amount_sat), - ); + ) + .await; }; } macro_rules! validate_balances { @@ -745,14 +760,14 @@ fn run_rbf_test(is_insert_block: bool) { output.script_pubkey = new_addr.script_pubkey(); } }); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; validate_balances!(0, is_insert_block); // Not modifying the output scripts, but still bumping the fee. distribute_funds_all_nodes!(); validate_balances!(amount_sat, false); (tx, fee_output_index) = prepare_rbf(electrs, txid, &scripts_buf); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; validate_balances!(amount_sat, is_insert_block); let mut final_amount_sat = amount_sat * 2; @@ -766,7 +781,7 @@ fn run_rbf_test(is_insert_block: bool) { output.value = Amount::from_sat(output.value.to_sat() + value_sat); } }); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; final_amount_sat += value_sat; validate_balances!(final_amount_sat, is_insert_block); @@ -779,12 +794,12 @@ fn run_rbf_test(is_insert_block: bool) { output.value = Amount::from_sat(output.value.to_sat() - value_sat); } }); - bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block); + bump_fee_and_broadcast(bitcoind, electrs, tx, fee_output_index, is_insert_block).await; final_amount_sat -= value_sat; validate_balances!(final_amount_sat, is_insert_block); if !is_insert_block { - generate_blocks_and_wait(bitcoind, electrs, 1); + generate_blocks_and_wait(bitcoind, electrs, 1).await; validate_balances!(final_amount_sat, true); } @@ -795,15 +810,15 @@ fn run_rbf_test(is_insert_block: bool) { let txid = node.onchain_payment().send_all_to_address(&addr, true, None).unwrap(); txids.push(txid); }); - txids.iter().for_each(|txid| { - wait_for_tx(electrs, *txid); - }); - generate_blocks_and_wait(bitcoind, electrs, 6); + for txid in txids { + wait_for_tx(electrs, txid).await; + } + generate_blocks_and_wait(bitcoind, electrs, 6).await; validate_balances!(0, true); } -#[test] -fn sign_verify_msg() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn sign_verify_msg() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let config = random_config(true); let chain_source = TestChainSource::Esplora(&electrsd); @@ -816,8 +831,8 @@ fn sign_verify_msg() { assert!(node.verify_signature(msg, sig.as_str(), &pkey)); } -#[test] -fn connection_multi_listen() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn connection_multi_listen() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); @@ -831,13 +846,13 @@ fn connection_multi_listen() { } } -#[test] -fn connection_restart_behavior() { - do_connection_restart_behavior(true); - do_connection_restart_behavior(false); +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn connection_restart_behavior() { + do_connection_restart_behavior(true).await; + do_connection_restart_behavior(false).await; } -fn do_connection_restart_behavior(persist: bool) { +async fn do_connection_restart_behavior(persist: bool) { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, false, false); @@ -865,7 +880,7 @@ fn do_connection_restart_behavior(persist: bool) { node_a.start().unwrap(); // Sleep a bit to allow for the reconnect to happen. - std::thread::sleep(std::time::Duration::from_secs(5)); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; if persist { let peer_details_a = node_a.list_peers().first().unwrap().clone(); @@ -883,8 +898,8 @@ fn do_connection_restart_behavior(persist: bool) { } } -#[test] -fn concurrent_connections_succeed() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn concurrent_connections_succeed() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -910,8 +925,8 @@ fn concurrent_connections_succeed() { } } -#[test] -fn simple_bolt12_send_receive() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn simple_bolt12_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -923,12 +938,13 @@ fn simple_bolt12_send_receive() { &electrsd.client, vec![address_a], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -938,11 +954,11 @@ fn simple_bolt12_send_receive() { // Sleep until we broadcasted a node announcement. while node_b.status().latest_node_announcement_broadcast_timestamp.is_none() { - std::thread::sleep(std::time::Duration::from_millis(10)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } // Sleep one more sec to make sure the node announcement propagates. - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let expected_amount_msat = 100_000_000; let offer = @@ -1131,8 +1147,8 @@ fn simple_bolt12_send_receive() { assert_eq!(node_a_payments.first().unwrap().amount_msat, Some(overpaid_amount)); } -#[test] -fn async_payment() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn async_payment() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1186,15 +1202,16 @@ fn async_payment() { &electrsd.client, vec![address_sender, address_sender_lsp, address_receiver_lsp, address_receiver], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_sender.sync_wallets().unwrap(); node_sender_lsp.sync_wallets().unwrap(); node_receiver_lsp.sync_wallets().unwrap(); node_receiver.sync_wallets().unwrap(); - open_channel(&node_sender, &node_sender_lsp, 400_000, false, &electrsd); - open_channel(&node_sender_lsp, &node_receiver_lsp, 400_000, true, &electrsd); + open_channel(&node_sender, &node_sender_lsp, 400_000, false, &electrsd).await; + open_channel(&node_sender_lsp, &node_receiver_lsp, 400_000, true, &electrsd).await; open_channel_push_amt( &node_receiver, &node_receiver_lsp, @@ -1202,9 +1219,10 @@ fn async_payment() { Some(200_000_000), false, &electrsd, - ); + ) + .await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_sender.sync_wallets().unwrap(); node_sender_lsp.sync_wallets().unwrap(); @@ -1238,7 +1256,7 @@ fn async_payment() { || !has_node_announcements(&node_receiver_lsp) || !has_node_announcements(&node_receiver) { - std::thread::sleep(std::time::Duration::from_millis(100)); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; } let recipient_id = vec![1, 2, 3]; @@ -1251,7 +1269,7 @@ fn async_payment() { break offer; } - std::thread::sleep(std::time::Duration::from_millis(100)); + tokio::time::sleep(std::time::Duration::from_millis(100)).await; }; node_receiver.stop().unwrap(); @@ -1260,15 +1278,15 @@ fn async_payment() { node_sender.bolt12_payment().send_using_amount(&offer, 5_000, None, None).unwrap(); // Sleep to allow the payment reach a state where the htlc is held and waiting for the receiver to come online. - std::thread::sleep(std::time::Duration::from_millis(3000)); + tokio::time::sleep(std::time::Duration::from_millis(3000)).await; node_receiver.start().unwrap(); expect_payment_successful_event!(node_sender, Some(payment_id), None); } -#[test] -fn test_node_announcement_propagation() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn test_node_announcement_propagation() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1306,14 +1324,15 @@ fn test_node_announcement_propagation() { &electrsd.client, vec![address_a], Amount::from_sat(premine_amount_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); // Open an announced channel from node_a to node_b - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1323,11 +1342,11 @@ fn test_node_announcement_propagation() { // Wait until node_b broadcasts a node announcement while node_b.status().latest_node_announcement_broadcast_timestamp.is_none() { - std::thread::sleep(std::time::Duration::from_millis(10)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } // Sleep to make sure the node announcement propagates - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; // Get node info from the other node's perspective let node_a_info = node_b.network_graph().node(&NodeId::from_pubkey(&node_a.node_id())).unwrap(); @@ -1358,8 +1377,8 @@ fn test_node_announcement_propagation() { assert_eq!(node_b_announcement_info.addresses, node_b_listening_addresses); } -#[test] -fn generate_bip21_uri() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn generate_bip21_uri() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1388,11 +1407,12 @@ fn generate_bip21_uri() { &electrsd.client, vec![address_a], Amount::from_sat(premined_sats), - ); + ) + .await; node_a.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1412,8 +1432,8 @@ fn generate_bip21_uri() { assert!(uqr_payment.contains("lno=")); } -#[test] -fn unified_qr_send_receive() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn unified_qr_send_receive() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1427,11 +1447,12 @@ fn unified_qr_send_receive() { &electrsd.client, vec![address_a], Amount::from_sat(premined_sats), - ); + ) + .await; node_a.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 4_000_000, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_a, &node_b, 4_000_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1441,11 +1462,11 @@ fn unified_qr_send_receive() { // Sleep until we broadcast a node announcement. while node_b.status().latest_node_announcement_broadcast_timestamp.is_none() { - std::thread::sleep(std::time::Duration::from_millis(10)); + tokio::time::sleep(std::time::Duration::from_millis(10)).await; } // Sleep one more sec to make sure the node announcement propagates. - std::thread::sleep(std::time::Duration::from_secs(1)); + tokio::time::sleep(std::time::Duration::from_secs(1)).await; let expected_amount_sats = 100_000; let expiry_sec = 4_000; @@ -1512,8 +1533,8 @@ fn unified_qr_send_receive() { }, }; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); - wait_for_tx(&electrsd.client, txid); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; + wait_for_tx(&electrsd.client, txid).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); @@ -1522,8 +1543,8 @@ fn unified_qr_send_receive() { assert_eq!(node_b.list_balances().total_lightning_balance_sats, 200_000); } -#[test] -fn lsps2_client_service_integration() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn lsps2_client_service_integration() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -1579,16 +1600,17 @@ fn lsps2_client_service_integration() { &electrsd.client, vec![service_addr, client_addr, payer_addr], Amount::from_sat(premine_amount_sat), - ); + ) + .await; service_node.sync_wallets().unwrap(); client_node.sync_wallets().unwrap(); payer_node.sync_wallets().unwrap(); // Open a channel payer -> service that will allow paying the JIT invoice println!("Opening channel payer_node -> service_node!"); - open_channel(&payer_node, &service_node, 5_000_000, false, &electrsd); + open_channel(&payer_node, &service_node, 5_000_000, false, &electrsd).await; - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; service_node.sync_wallets().unwrap(); payer_node.sync_wallets().unwrap(); expect_channel_ready_event!(payer_node, service_node.node_id()); @@ -1743,8 +1765,8 @@ fn lsps2_client_service_integration() { assert_eq!(client_node.payment(&payment_id).unwrap().status, PaymentStatus::Failed); } -#[test] -fn facade_logging() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn facade_logging() { let (_bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); @@ -1761,8 +1783,8 @@ fn facade_logging() { } } -#[test] -fn spontaneous_send_with_custom_preimage() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn spontaneous_send_with_custom_preimage() { let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); let chain_source = TestChainSource::Esplora(&electrsd); let (node_a, node_b) = setup_two_nodes(&chain_source, false, true, false); @@ -1774,11 +1796,12 @@ fn spontaneous_send_with_custom_preimage() { &electrsd.client, vec![address_a], Amount::from_sat(premine_sat), - ); + ) + .await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); - open_channel(&node_a, &node_b, 500_000, true, &electrsd); - generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6); + open_channel(&node_a, &node_b, 500_000, true, &electrsd).await; + generate_blocks_and_wait(&bitcoind.client, &electrsd.client, 6).await; node_a.sync_wallets().unwrap(); node_b.sync_wallets().unwrap(); expect_channel_ready_event!(node_a, node_b.node_id()); diff --git a/tests/integration_tests_vss.rs b/tests/integration_tests_vss.rs index bdd876003..93f167dae 100644 --- a/tests/integration_tests_vss.rs +++ b/tests/integration_tests_vss.rs @@ -13,8 +13,8 @@ use std::collections::HashMap; use ldk_node::Builder; -#[test] -fn channel_full_cycle_with_vss_store() { +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn channel_full_cycle_with_vss_store() { let (bitcoind, electrsd) = common::setup_bitcoind_and_electrsd(); println!("== Node A =="); let esplora_url = format!("http://{}", electrsd.esplora_url.as_ref().unwrap()); @@ -52,5 +52,6 @@ fn channel_full_cycle_with_vss_store() { false, true, false, - ); + ) + .await; } diff --git a/tests/reorg_test.rs b/tests/reorg_test.rs index 03ace908f..491a37fd4 100644 --- a/tests/reorg_test.rs +++ b/tests/reorg_test.rs @@ -17,179 +17,187 @@ proptest! { #![proptest_config(proptest::test_runner::Config::with_cases(5))] #[test] fn reorg_test(reorg_depth in 1..=6usize, force_close in prop::bool::ANY) { - let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); - - let chain_source_bitcoind = TestChainSource::BitcoindRpcSync(&bitcoind); - let chain_source_electrsd = TestChainSource::Electrum(&electrsd); - let chain_source_esplora = TestChainSource::Esplora(&electrsd); - - macro_rules! config_node { - ($chain_source: expr, $anchor_channels: expr) => {{ - let config_a = random_config($anchor_channels); - let node = setup_node(&$chain_source, config_a, None); - node - }}; - } - let anchor_channels = true; - let nodes = vec![ - config_node!(chain_source_electrsd, anchor_channels), - config_node!(chain_source_bitcoind, anchor_channels), - config_node!(chain_source_esplora, anchor_channels), - ]; - - let (bitcoind, electrs) = (&bitcoind.client, &electrsd.client); - macro_rules! reorg { - ($reorg_depth: expr) => {{ - invalidate_blocks(bitcoind, $reorg_depth); - generate_blocks_and_wait(bitcoind, electrs, $reorg_depth); - }}; - } - - let amount_sat = 2_100_000; - let addr_nodes = - nodes.iter().map(|node| node.onchain_payment().new_address().unwrap()).collect::>(); - premine_and_distribute_funds(bitcoind, electrs, addr_nodes, Amount::from_sat(amount_sat)); - - macro_rules! sync_wallets { - () => { - nodes.iter().for_each(|node| node.sync_wallets().unwrap()) - }; - } - sync_wallets!(); - nodes.iter().for_each(|node| { - assert_eq!(node.list_balances().spendable_onchain_balance_sats, amount_sat); - assert_eq!(node.list_balances().total_onchain_balance_sats, amount_sat); - }); - - - let mut nodes_funding_tx = HashMap::new(); - let funding_amount_sat = 2_000_000; - for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { - let funding_txo = open_channel(node, next_node, funding_amount_sat, true, &electrsd); - nodes_funding_tx.insert(node.node_id(), funding_txo); - } - - generate_blocks_and_wait(bitcoind, electrs, 6); - sync_wallets!(); - - reorg!(reorg_depth); - sync_wallets!(); - - macro_rules! collect_channel_ready_events { - ($node:expr, $expected:expr) => {{ - let mut user_channels = HashMap::new(); - for _ in 0..$expected { - match $node.wait_next_event() { - Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { - $node.event_handled().unwrap(); - user_channels.insert(counterparty_node_id, user_channel_id); - }, - other => panic!("Unexpected event: {:?}", other), + let rt = tokio::runtime::Builder::new_multi_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async { + let (bitcoind, electrsd) = setup_bitcoind_and_electrsd(); + + let chain_source_bitcoind = TestChainSource::BitcoindRpcSync(&bitcoind); + let chain_source_electrsd = TestChainSource::Electrum(&electrsd); + let chain_source_esplora = TestChainSource::Esplora(&electrsd); + + macro_rules! config_node { + ($chain_source: expr, $anchor_channels: expr) => {{ + let config_a = random_config($anchor_channels); + let node = setup_node(&$chain_source, config_a, None); + node + }}; + } + let anchor_channels = true; + let nodes = vec![ + config_node!(chain_source_electrsd, anchor_channels), + config_node!(chain_source_bitcoind, anchor_channels), + config_node!(chain_source_esplora, anchor_channels), + ]; + + let (bitcoind, electrs) = (&bitcoind.client, &electrsd.client); + macro_rules! reorg { + ($reorg_depth: expr) => {{ + invalidate_blocks(bitcoind, $reorg_depth); + generate_blocks_and_wait(bitcoind, electrs, $reorg_depth).await; + }}; + } + + let amount_sat = 2_100_000; + let addr_nodes = + nodes.iter().map(|node| node.onchain_payment().new_address().unwrap()).collect::>(); + premine_and_distribute_funds(bitcoind, electrs, addr_nodes, Amount::from_sat(amount_sat)).await; + + macro_rules! sync_wallets { + () => { + for node in &nodes { + node.sync_wallets().unwrap(); } - } - user_channels - }}; - } + }; + } + sync_wallets!(); + nodes.iter().for_each(|node| { + assert_eq!(node.list_balances().spendable_onchain_balance_sats, amount_sat); + assert_eq!(node.list_balances().total_onchain_balance_sats, amount_sat); + }); - let mut node_channels_id = HashMap::new(); - for (i, node) in nodes.iter().enumerate() { - assert_eq!( - node - .list_payments_with_filter(|p| p.direction == PaymentDirection::Outbound - && matches!(p.kind, PaymentKind::Onchain { .. })) - .len(), - 1 - ); - let user_channels = collect_channel_ready_events!(node, 2); - let next_node = nodes.get((i + 1) % nodes.len()).unwrap(); - let prev_node = nodes.get((i + nodes.len() - 1) % nodes.len()).unwrap(); + let mut nodes_funding_tx = HashMap::new(); + let funding_amount_sat = 2_000_000; + for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { + let funding_txo = open_channel(node, next_node, funding_amount_sat, true, &electrsd).await; + nodes_funding_tx.insert(node.node_id(), funding_txo); + } - assert!(user_channels.get(&Some(next_node.node_id())) != None); - assert!(user_channels.get(&Some(prev_node.node_id())) != None); + generate_blocks_and_wait(bitcoind, electrs, 6).await; + sync_wallets!(); + + reorg!(reorg_depth); + sync_wallets!(); + + macro_rules! collect_channel_ready_events { + ($node:expr, $expected:expr) => {{ + let mut user_channels = HashMap::new(); + for _ in 0..$expected { + match $node.next_event_async().await { + Event::ChannelReady { user_channel_id, counterparty_node_id, .. } => { + $node.event_handled().unwrap(); + user_channels.insert(counterparty_node_id, user_channel_id); + }, + other => panic!("Unexpected event: {:?}", other), + } + } + user_channels + }}; + } - let user_channel_id = - user_channels.get(&Some(next_node.node_id())).expect("Missing user channel for node"); - node_channels_id.insert(node.node_id(), *user_channel_id); - } + let mut node_channels_id = HashMap::new(); + for (i, node) in nodes.iter().enumerate() { + assert_eq!( + node + .list_payments_with_filter(|p| p.direction == PaymentDirection::Outbound + && matches!(p.kind, PaymentKind::Onchain { .. })) + .len(), + 1 + ); + + let user_channels = collect_channel_ready_events!(node, 2); + let next_node = nodes.get((i + 1) % nodes.len()).unwrap(); + let prev_node = nodes.get((i + nodes.len() - 1) % nodes.len()).unwrap(); + + assert!(user_channels.get(&Some(next_node.node_id())) != None); + assert!(user_channels.get(&Some(prev_node.node_id())) != None); + + let user_channel_id = + user_channels.get(&Some(next_node.node_id())).expect("Missing user channel for node"); + node_channels_id.insert(node.node_id(), *user_channel_id); + } - for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { - let user_channel_id = node_channels_id.get(&node.node_id()).expect("user channel id not exist"); - let funding = nodes_funding_tx.get(&node.node_id()).expect("Funding tx not exist"); + for (node, next_node) in nodes.iter().zip(nodes.iter().cycle().skip(1)) { + let user_channel_id = node_channels_id.get(&node.node_id()).expect("user channel id not exist"); + let funding = nodes_funding_tx.get(&node.node_id()).expect("Funding tx not exist"); - if force_close { - node.force_close_channel(&user_channel_id, next_node.node_id(), None).unwrap(); - } else { - node.close_channel(&user_channel_id, next_node.node_id()).unwrap(); - } + if force_close { + node.force_close_channel(&user_channel_id, next_node.node_id(), None).unwrap(); + } else { + node.close_channel(&user_channel_id, next_node.node_id()).unwrap(); + } - expect_event!(node, ChannelClosed); - expect_event!(next_node, ChannelClosed); + expect_event!(node, ChannelClosed); + expect_event!(next_node, ChannelClosed); - wait_for_outpoint_spend(electrs, *funding); - } + wait_for_outpoint_spend(electrs, *funding).await; + } - reorg!(reorg_depth); - sync_wallets!(); + reorg!(reorg_depth); + sync_wallets!(); - generate_blocks_and_wait(bitcoind, electrs, 1); - sync_wallets!(); + generate_blocks_and_wait(bitcoind, electrs, 1).await; + sync_wallets!(); - if force_close { - nodes.iter().for_each(|node| { - node.sync_wallets().unwrap(); - // If there is no more balance, there is nothing to process here. - if node.list_balances().lightning_balances.len() < 1 { - return; - } - match node.list_balances().lightning_balances[0] { - LightningBalance::ClaimableAwaitingConfirmations { - confirmation_height, - .. - } => { - let cur_height = node.status().current_best_block.height; - let blocks_to_go = confirmation_height - cur_height; - generate_blocks_and_wait(bitcoind, electrs, blocks_to_go as usize); - node.sync_wallets().unwrap(); - }, - _ => panic!("Unexpected balance state for node_hub!"), - } + if force_close { + for node in &nodes { + node.sync_wallets().unwrap(); + // If there is no more balance, there is nothing to process here. + if node.list_balances().lightning_balances.len() < 1 { + return; + } + match node.list_balances().lightning_balances[0] { + LightningBalance::ClaimableAwaitingConfirmations { + confirmation_height, + .. + } => { + let cur_height = node.status().current_best_block.height; + let blocks_to_go = confirmation_height - cur_height; + generate_blocks_and_wait(bitcoind, electrs, blocks_to_go as usize).await; + node.sync_wallets().unwrap(); + }, + _ => panic!("Unexpected balance state for node_hub!"), + } - assert!(node.list_balances().lightning_balances.len() < 2); - assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); - match node.list_balances().pending_balances_from_channel_closures[0] { - PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, - _ => panic!("Unexpected balance state!"), - } + assert!(node.list_balances().lightning_balances.len() < 2); + assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); + match node.list_balances().pending_balances_from_channel_closures[0] { + PendingSweepBalance::BroadcastAwaitingConfirmation { .. } => {}, + _ => panic!("Unexpected balance state!"), + } - generate_blocks_and_wait(&bitcoind, electrs, 1); - node.sync_wallets().unwrap(); - assert!(node.list_balances().lightning_balances.len() < 2); - assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); - match node.list_balances().pending_balances_from_channel_closures[0] { - PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, - _ => panic!("Unexpected balance state!"), + generate_blocks_and_wait(&bitcoind, electrs, 1).await; + node.sync_wallets().unwrap(); + assert!(node.list_balances().lightning_balances.len() < 2); + assert!(node.list_balances().pending_balances_from_channel_closures.len() > 0); + match node.list_balances().pending_balances_from_channel_closures[0] { + PendingSweepBalance::AwaitingThresholdConfirmations { .. } => {}, + _ => panic!("Unexpected balance state!"), + } } - }); - } + } - generate_blocks_and_wait(bitcoind, electrs, 6); - sync_wallets!(); + generate_blocks_and_wait(bitcoind, electrs, 6).await; + sync_wallets!(); - reorg!(reorg_depth); - sync_wallets!(); + reorg!(reorg_depth); + sync_wallets!(); - let fee_sat = 7000; - // Check balance after close channel - nodes.iter().for_each(|node| { - assert!(node.list_balances().spendable_onchain_balance_sats > amount_sat - fee_sat); - assert!(node.list_balances().spendable_onchain_balance_sats < amount_sat); + let fee_sat = 7000; + // Check balance after close channel + nodes.iter().for_each(|node| { + assert!(node.list_balances().spendable_onchain_balance_sats > amount_sat - fee_sat); + assert!(node.list_balances().spendable_onchain_balance_sats < amount_sat); - assert_eq!(node.list_balances().total_anchor_channels_reserve_sats, 0); - assert!(node.list_balances().lightning_balances.is_empty()); + assert_eq!(node.list_balances().total_anchor_channels_reserve_sats, 0); + assert!(node.list_balances().lightning_balances.is_empty()); - assert_eq!(node.next_event(), None); - }); + assert_eq!(node.next_event(), None); + }); + }) } } From 8512c26035109f78b15b8e46be9ffc41037690bd Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 20 Oct 2025 09:16:08 -0500 Subject: [PATCH 5/7] Add `async { ..await }` in remaining places Since it seems to make a difference to `tokio` (see https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures are always put in an `async` closure. --- src/runtime.rs | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/runtime.rs b/src/runtime.rs index 2275d5bea..1e9883ae4 100644 --- a/src/runtime.rs +++ b/src/runtime.rs @@ -67,7 +67,10 @@ impl Runtime { { let mut background_tasks = self.background_tasks.lock().unwrap(); let runtime_handle = self.handle(); - background_tasks.spawn_on(future, runtime_handle); + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. + background_tasks.spawn_on(async { future.await }, runtime_handle); } pub fn spawn_cancellable_background_task(&self, future: F) @@ -76,7 +79,10 @@ impl Runtime { { let mut cancellable_background_tasks = self.cancellable_background_tasks.lock().unwrap(); let runtime_handle = self.handle(); - cancellable_background_tasks.spawn_on(future, runtime_handle); + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. + cancellable_background_tasks.spawn_on(async { future.await }, runtime_handle); } pub fn spawn_background_processor_task(&self, future: F) @@ -107,7 +113,10 @@ impl Runtime { // to detect the outer context here, and otherwise use whatever was set during // initialization. let handle = tokio::runtime::Handle::try_current().unwrap_or(self.handle().clone()); - tokio::task::block_in_place(move || handle.block_on(future)) + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. + tokio::task::block_in_place(move || handle.block_on(async { future.await })) } pub fn abort_cancellable_background_tasks(&self) { @@ -154,6 +163,9 @@ impl Runtime { self.background_processor_task.lock().unwrap().take() { let abort_handle = background_processor_task.abort_handle(); + // Since it seems to make a difference to `tokio` (see + // https://docs.rs/tokio/latest/tokio/time/fn.timeout.html#panics) we make sure the futures + // are always put in an `async` / `.await` closure. let timeout_res = self.block_on(async { tokio::time::timeout( Duration::from_secs(LDK_EVENT_HANDLER_SHUTDOWN_TIMEOUT_SECS), From e01ea056c865ff3ffb12b38e2438cdb838ff019e Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 20 Oct 2025 15:26:58 -0500 Subject: [PATCH 6/7] Re-introduce internal runtime to `VssStore` In order to avoid the recently, discovered blocking-task-deadlock (in which the task holding the runtime reactor got blocked and hence stopped polling VSS write tasks), we where re-introduce an internal runtime to the `VssStore`, on which we spawn the tasks, while still using `block_on` of our regular runtime for async-sync conversions. This also finally fixes our VSS CI. --- src/io/vss_store.rs | 218 ++++++++++++++++++++++++++++++++++---------- 1 file changed, 169 insertions(+), 49 deletions(-) diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 134ff7af2..4f90dd6db 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -11,7 +11,7 @@ use std::future::Future; #[cfg(test)] use std::panic::RefUnwindSafe; use std::pin::Pin; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering}; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -44,6 +44,11 @@ type CustomRetryPolicy = FilteredRetryPolicy< Box bool + 'static + Send + Sync>, >; +// We set this to a small number of threads that would still allow to make some progress if one +// would hit a blocking case +const INTERNAL_RUNTIME_WORKERS: usize = 2; +const VSS_IO_TIMEOUT: Duration = Duration::from_secs(5); + /// A [`KVStoreSync`] implementation that writes to and reads from a [VSS](https://github.com/lightningdevkit/vss-server/blob/main/README.md) backend. pub struct VssStore { inner: Arc, @@ -51,6 +56,13 @@ pub struct VssStore { // operations aren't sensitive to the order of execution. next_version: AtomicU64, runtime: Arc, + // A VSS-internal runtime we use to avoid any deadlocks we could hit when waiting on a spawned + // blocking task to finish while the blocked thread had acquired the reactor. In particular, + // this works around a previously-hit case where a concurrent call to + // `PeerManager::process_pending_events` -> `ChannelManager::get_and_clear_pending_msg_events` + // would deadlock when trying to acquire sync `Mutex` locks that are held by the thread + // currently being blocked waiting on the VSS operation to finish. + internal_runtime: Option, } impl VssStore { @@ -60,7 +72,21 @@ impl VssStore { ) -> Self { let inner = Arc::new(VssStoreInner::new(base_url, store_id, vss_seed, header_provider)); let next_version = AtomicU64::new(1); - Self { inner, next_version, runtime } + let internal_runtime = Some( + tokio::runtime::Builder::new_multi_thread() + .enable_all() + .thread_name_fn(|| { + static ATOMIC_ID: AtomicUsize = AtomicUsize::new(0); + let id = ATOMIC_ID.fetch_add(1, Ordering::SeqCst); + format!("ldk-node-vss-runtime-{}", id) + }) + .worker_threads(INTERNAL_RUNTIME_WORKERS) + .max_blocking_threads(INTERNAL_RUNTIME_WORKERS) + .build() + .unwrap(), + ); + + Self { inner, next_version, runtime, internal_runtime } } // Same logic as for the obfuscated keys below, but just for locking, using the plaintext keys @@ -94,46 +120,122 @@ impl KVStoreSync for VssStore { fn read( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result> { - let fut = self.inner.read_internal(primary_namespace, secondary_namespace, key); - self.runtime.block_on(fut) + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let fut = + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::read timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, ) -> io::Result<()> { - let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); - let fut = self.inner.write_internal( - inner_lock_ref, - locking_key, - version, - primary_namespace, - secondary_namespace, - key, - buf, - ); - self.runtime.block_on(fut) + let fut = async move { + inner + .write_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + buf, + ) + .await + }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::write timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? } fn remove( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, ) -> io::Result<()> { - let locking_key = self.build_locking_key(primary_namespace, secondary_namespace, key); + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let key = key.to_string(); + let inner = Arc::clone(&self.inner); + let locking_key = self.build_locking_key(&primary_namespace, &secondary_namespace, &key); let (inner_lock_ref, version) = self.get_new_version_and_lock_ref(locking_key.clone()); - let fut = self.inner.remove_internal( - inner_lock_ref, - locking_key, - version, - primary_namespace, - secondary_namespace, - key, - ); - self.runtime.block_on(fut) + let fut = async move { + inner + .remove_internal( + inner_lock_ref, + locking_key, + version, + primary_namespace, + secondary_namespace, + key, + ) + .await + }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::remove timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? } fn list(&self, primary_namespace: &str, secondary_namespace: &str) -> io::Result> { - let fut = self.inner.list_internal(primary_namespace, secondary_namespace); - self.runtime.block_on(fut) + let internal_runtime = self.internal_runtime.as_ref().ok_or_else(|| { + debug_assert!(false, "Failed to access internal runtime"); + let msg = format!("Failed to access internal runtime"); + Error::new(ErrorKind::Other, msg) + })?; + let primary_namespace = primary_namespace.to_string(); + let secondary_namespace = secondary_namespace.to_string(); + let inner = Arc::clone(&self.inner); + let fut = async move { inner.list_internal(primary_namespace, secondary_namespace).await }; + // TODO: We could drop the timeout here once we ensured vss-client's Retry logic always + // times out. + let spawned_fut = internal_runtime.spawn(async move { + tokio::time::timeout(VSS_IO_TIMEOUT, fut).await.map_err(|_| { + let msg = "VssStore::list timed out"; + Error::new(ErrorKind::Other, msg) + }) + }); + self.runtime.block_on(spawned_fut).expect("We should always finish")? } } @@ -145,9 +247,9 @@ impl KVStore for VssStore { let secondary_namespace = secondary_namespace.to_string(); let key = key.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { - inner.read_internal(&primary_namespace, &secondary_namespace, &key).await - }) + Box::pin( + async move { inner.read_internal(primary_namespace, secondary_namespace, key).await }, + ) } fn write( &self, primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, @@ -164,9 +266,9 @@ impl KVStore for VssStore { inner_lock_ref, locking_key, version, - &primary_namespace, - &secondary_namespace, - &key, + primary_namespace, + secondary_namespace, + key, buf, ) .await @@ -187,9 +289,9 @@ impl KVStore for VssStore { inner_lock_ref, locking_key, version, - &primary_namespace, - &secondary_namespace, - &key, + primary_namespace, + secondary_namespace, + key, ) .await }) @@ -200,7 +302,14 @@ impl KVStore for VssStore { let primary_namespace = primary_namespace.to_string(); let secondary_namespace = secondary_namespace.to_string(); let inner = Arc::clone(&self.inner); - Box::pin(async move { inner.list_internal(&primary_namespace, &secondary_namespace).await }) + Box::pin(async move { inner.list_internal(primary_namespace, secondary_namespace).await }) + } +} + +impl Drop for VssStore { + fn drop(&mut self) { + let internal_runtime = self.internal_runtime.take(); + tokio::task::block_in_place(move || drop(internal_runtime)); } } @@ -300,11 +409,12 @@ impl VssStoreInner { } async fn read_internal( - &self, primary_namespace: &str, secondary_namespace: &str, key: &str, + &self, primary_namespace: String, secondary_namespace: String, key: String, ) -> io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "read")?; + check_namespace_key_validity(&primary_namespace, &secondary_namespace, Some(&key), "read")?; - let obfuscated_key = self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + let obfuscated_key = + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let request = GetObjectRequest { store_id: self.store_id.clone(), key: obfuscated_key }; let resp = self.client.get_object(&request).await.map_err(|e| { let msg = format!( @@ -332,13 +442,18 @@ impl VssStoreInner { async fn write_internal( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: &str, secondary_namespace: &str, key: &str, buf: Vec, + primary_namespace: String, secondary_namespace: String, key: String, buf: Vec, ) -> io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "write")?; + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "write", + )?; self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let obfuscated_key = - self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let vss_version = -1; let storable = self.storable_builder.build(buf, vss_version); let request = PutObjectRequest { @@ -367,13 +482,18 @@ impl VssStoreInner { async fn remove_internal( &self, inner_lock_ref: Arc>, locking_key: String, version: u64, - primary_namespace: &str, secondary_namespace: &str, key: &str, + primary_namespace: String, secondary_namespace: String, key: String, ) -> io::Result<()> { - check_namespace_key_validity(primary_namespace, secondary_namespace, Some(key), "remove")?; + check_namespace_key_validity( + &primary_namespace, + &secondary_namespace, + Some(&key), + "remove", + )?; self.execute_locked_write(inner_lock_ref, locking_key, version, async move || { let obfuscated_key = - self.build_obfuscated_key(primary_namespace, secondary_namespace, key); + self.build_obfuscated_key(&primary_namespace, &secondary_namespace, &key); let request = DeleteObjectRequest { store_id: self.store_id.clone(), key_value: Some(KeyValue { key: obfuscated_key, version: -1, value: vec![] }), @@ -393,12 +513,12 @@ impl VssStoreInner { } async fn list_internal( - &self, primary_namespace: &str, secondary_namespace: &str, + &self, primary_namespace: String, secondary_namespace: String, ) -> io::Result> { - check_namespace_key_validity(primary_namespace, secondary_namespace, None, "list")?; + check_namespace_key_validity(&primary_namespace, &secondary_namespace, None, "list")?; let keys = - self.list_all_keys(primary_namespace, secondary_namespace).await.map_err(|e| { + self.list_all_keys(&primary_namespace, &secondary_namespace).await.map_err(|e| { let msg = format!( "Failed to retrieve keys in namespace: {}/{} : {}", primary_namespace, secondary_namespace, e From 8cad63b3b022e27ee17315f75cfa30adcdbf3a95 Mon Sep 17 00:00:00 2001 From: Elias Rohrer Date: Mon, 20 Oct 2025 15:33:37 -0500 Subject: [PATCH 7/7] Fix and run all tests under `cfg(vss_test)` .. we previously avoided running some tests which turned out to be broken. --- .github/workflows/vss-integration.yml | 1 + src/io/vss_store.rs | 12 +++++++----- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/.github/workflows/vss-integration.yml b/.github/workflows/vss-integration.yml index 5f6e6065b..8473ed413 100644 --- a/.github/workflows/vss-integration.yml +++ b/.github/workflows/vss-integration.yml @@ -44,4 +44,5 @@ jobs: run: | cd ldk-node export TEST_VSS_BASE_URL="http://localhost:8080/vss" + RUSTFLAGS="--cfg vss_test" cargo test io::vss_store RUSTFLAGS="--cfg vss_test" cargo test --test integration_tests_vss diff --git a/src/io/vss_store.rs b/src/io/vss_store.rs index 4f90dd6db..ed8e13890 100644 --- a/src/io/vss_store.rs +++ b/src/io/vss_store.rs @@ -606,38 +606,40 @@ mod tests { use rand::distributions::Alphanumeric; use rand::{thread_rng, Rng, RngCore}; - use tokio::runtime; use vss_client::headers::FixedHeaders; use super::*; use crate::io::test_utils::do_read_write_remove_list_persist; + use crate::logger::Logger; #[test] fn vss_read_write_remove_list_persist() { - let runtime = Arc::new(Runtime::new().unwrap()); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let mut rng = thread_rng(); let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let logger = Arc::new(Logger::new_log_facade()); + let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); do_read_write_remove_list_persist(&vss_store); } #[tokio::test(flavor = "multi_thread", worker_threads = 1)] async fn vss_read_write_remove_list_persist_in_runtime_context() { - let runtime = Arc::new(Runtime::new().unwrap()); let vss_base_url = std::env::var("TEST_VSS_BASE_URL").unwrap(); let mut rng = thread_rng(); let rand_store_id: String = (0..7).map(|_| rng.sample(Alphanumeric) as char).collect(); let mut vss_seed = [0u8; 32]; rng.fill_bytes(&mut vss_seed); let header_provider = Arc::new(FixedHeaders::new(HashMap::new())); + let logger = Arc::new(Logger::new_log_facade()); + let runtime = Arc::new(Runtime::new(logger).unwrap()); let vss_store = - VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime).unwrap(); + VssStore::new(vss_base_url, rand_store_id, vss_seed, header_provider, runtime); do_read_write_remove_list_persist(&vss_store); drop(vss_store)