From f53c5e9f6ae59cb550b1a095a893a0e9110660c2 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 23 Sep 2025 13:16:26 +0000 Subject: [PATCH 1/6] Drop unnecessary Arcs in Sweeper and LiquidityManager sync wrappers Both `OutputSweeperSync` and `LiquidityManagerSync` added `Arc`s to their internal state to allow returning a reference to that internal state in testing. While this is required to get a forever-lifetime reference to that state, we don't actually need that in testing. Instead, we turn off the borrow checker in the `lightning-background-processor` async tests where required, avoiding the extra heap indirection. --- lightning-background-processor/src/lib.rs | 51 +++++++++++++++++++---- lightning-liquidity/src/manager.rs | 40 ++++++++---------- lightning/src/util/sweep.rs | 40 ++++++++---------- 3 files changed, 79 insertions(+), 52 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 44ce52b8291..b4b0186bd9a 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1846,11 +1846,13 @@ mod tests { SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; use lightning::util::ser::Writeable; - use lightning::util::sweep::{OutputSpendStatus, OutputSweeperSync, PRUNE_DELAY_BLOCKS}; + use lightning::util::sweep::{ + OutputSpendStatus, OutputSweeper, OutputSweeperSync, PRUNE_DELAY_BLOCKS, + }; use lightning::util::test_utils; use lightning::{get_event, get_event_msg}; use lightning_liquidity::utils::time::DefaultTimeProvider; - use lightning_liquidity::{ALiquidityManagerSync, LiquidityManagerSync}; + use lightning_liquidity::{ALiquidityManagerSync, LiquidityManager, LiquidityManagerSync}; use lightning_persister::fs_store::FilesystemStore; use lightning_rapid_gossip_sync::RapidGossipSync; use std::collections::VecDeque; @@ -2781,6 +2783,17 @@ mod tests { ); let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + // Yes, you can unsafe { turn off the borrow checker } + let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe { + &*(nodes[0].liquidity_manager.get_lm_async() + as *const LiquidityManager<_, _, _, _, _, _>) + as &'static LiquidityManager<_, _, _, _, _, _> + }; + let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe { + &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>) + as &'static OutputSweeper<_, _, _, _, _, _, _> + }; + let bp_future = super::process_events_async( kv_store, |_: _| async { Ok(()) }, @@ -2789,8 +2802,8 @@ mod tests { Some(Arc::clone(&nodes[0].messenger)), nodes[0].rapid_gossip_sync(), Arc::clone(&nodes[0].peer_manager), - Some(nodes[0].liquidity_manager.get_lm_async()), - Some(nodes[0].sweeper.sweeper_async()), + Some(lm_async), + Some(sweeper_async), Arc::clone(&nodes[0].logger), Some(Arc::clone(&nodes[0].scorer)), move |dur: Duration| { @@ -3289,6 +3302,17 @@ mod tests { Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + // Yes, you can unsafe { turn off the borrow checker } + let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe { + &*(nodes[0].liquidity_manager.get_lm_async() + as *const LiquidityManager<_, _, _, _, _, _>) + as &'static LiquidityManager<_, _, _, _, _, _> + }; + let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe { + &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>) + as &'static OutputSweeper<_, _, _, _, _, _, _> + }; + let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); let bp_future = super::process_events_async( kv_store, @@ -3298,8 +3322,8 @@ mod tests { Some(Arc::clone(&nodes[0].messenger)), nodes[0].rapid_gossip_sync(), Arc::clone(&nodes[0].peer_manager), - Some(nodes[0].liquidity_manager.get_lm_async()), - Some(nodes[0].sweeper.sweeper_async()), + Some(lm_async), + Some(sweeper_async), Arc::clone(&nodes[0].logger), Some(Arc::clone(&nodes[0].scorer)), move |dur: Duration| { @@ -3505,6 +3529,17 @@ mod tests { let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); + // Yes, you can unsafe { turn off the borrow checker } + let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe { + &*(nodes[0].liquidity_manager.get_lm_async() + as *const LiquidityManager<_, _, _, _, _, _>) + as &'static LiquidityManager<_, _, _, _, _, _> + }; + let sweeper_async: &'static OutputSweeper<_, _, _, _, _, _, _> = unsafe { + &*(nodes[0].sweeper.sweeper_async() as *const OutputSweeper<_, _, _, _, _, _, _>) + as &'static OutputSweeper<_, _, _, _, _, _, _> + }; + let bp_future = super::process_events_async( kv_store, event_handler, @@ -3513,8 +3548,8 @@ mod tests { Some(Arc::clone(&nodes[0].messenger)), nodes[0].no_gossip_sync(), Arc::clone(&nodes[0].peer_manager), - Some(nodes[0].liquidity_manager.get_lm_async()), - Some(nodes[0].sweeper.sweeper_async()), + Some(lm_async), + Some(sweeper_async), Arc::clone(&nodes[0].logger), Some(Arc::clone(&nodes[0].scorer)), move |dur: Duration| { diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index 490ca8b0bb4..d0925e11b24 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -195,15 +195,13 @@ pub trait ALiquidityManagerSync { #[cfg(any(test, feature = "_test_utils"))] fn get_lm_async( &self, - ) -> Arc< - LiquidityManager< - Self::ES, - Self::NS, - Self::CM, - Self::C, - Arc>, - Self::TP, - >, + ) -> &LiquidityManager< + Self::ES, + Self::NS, + Self::CM, + Self::C, + Arc>, + Self::TP, >; /// Returns a reference to the actual [`LiquidityManager`] object. fn get_lm( @@ -243,17 +241,15 @@ where #[cfg(any(test, feature = "_test_utils"))] fn get_lm_async( &self, - ) -> Arc< - LiquidityManager< - Self::ES, - Self::NS, - Self::CM, - Self::C, - Arc>, - Self::TP, - >, + ) -> &LiquidityManager< + Self::ES, + Self::NS, + Self::CM, + Self::C, + Arc>, + Self::TP, > { - Arc::clone(&self.inner) + &self.inner } fn get_lm(&self) -> &LiquidityManagerSync { self @@ -1040,7 +1036,7 @@ pub struct LiquidityManagerSync< KS::Target: KVStoreSync, TP::Target: TimeProvider, { - inner: Arc>, TP>>, + inner: LiquidityManager>, TP>, } #[cfg(feature = "time")] @@ -1089,7 +1085,7 @@ where unreachable!("LiquidityManager::new should not be pending in a sync context"); }, }?; - Ok(Self { inner: Arc::new(inner) }) + Ok(Self { inner }) } } @@ -1140,7 +1136,7 @@ where unreachable!("LiquidityManager::new should not be pending in a sync context"); }, }?; - Ok(Self { inner: Arc::new(inner) }) + Ok(Self { inner }) } /// Returns a reference to the LSPS0 client-side handler. diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 8f3df5ec804..51cb5b3dfa9 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -981,16 +981,14 @@ where L::Target: Logger, O::Target: OutputSpender, { - sweeper: Arc< - OutputSweeper< - B, - Arc>, - E, - F, - Arc>, - L, - O, - >, + sweeper: OutputSweeper< + B, + Arc>, + E, + F, + Arc>, + L, + O, >, } @@ -1025,7 +1023,7 @@ where kv_store, logger, ); - Self { sweeper: Arc::new(sweeper) } + Self { sweeper } } /// Regenerates and broadcasts the spending transaction for any outputs that are pending. Wraps @@ -1074,18 +1072,16 @@ where #[cfg(any(test, feature = "_test_utils"))] pub fn sweeper_async( &self, - ) -> Arc< - OutputSweeper< - B, - Arc>, - E, - F, - Arc>, - L, - O, - >, + ) -> &OutputSweeper< + B, + Arc>, + E, + F, + Arc>, + L, + O, > { - Arc::clone(&self.sweeper) + &self.sweeper } } From d665bef3c5adf5cb7ee8d761a37b4d75744497dd Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 23 Sep 2025 13:23:53 +0000 Subject: [PATCH 2/6] Drop unnecessary 'static bounds on BP async methods Given the future returned from async BP methods will be concrete on whatever types get used, there's not much reason to require that the arguments be `'static`, we can let Rust figure out if they're `'static` (and thus the returned future is `'static`). --- lightning-background-processor/src/lib.rs | 124 +++++++++++----------- 1 file changed, 61 insertions(+), 63 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index b4b0186bd9a..57b4a11cad4 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -304,7 +304,7 @@ where /// Updates scorer based on event and returns whether an update occurred so we can decide whether /// to persist. -fn update_scorer<'a, S: 'static + Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( +fn update_scorer<'a, S: Deref + Send + Sync, SC: 'a + WriteableScore<'a>>( scorer: &'a S, event: &Event, duration_since_epoch: Duration, ) -> bool { match event { @@ -866,31 +866,30 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp ///``` pub async fn process_events_async< 'a, - UL: 'static + Deref, - CF: 'static + Deref, - T: 'static + Deref, - F: 'static + Deref, - G: 'static + Deref>, - L: 'static + Deref, - P: 'static + Deref, + UL: Deref, + CF: Deref, + T: Deref, + F: Deref, + G: Deref>, + L: Deref, + P: Deref, EventHandlerFuture: core::future::Future>, EventHandler: Fn(Event) -> EventHandlerFuture, - ES: 'static + Deref + Send, - M: 'static - + Deref::Signer, CF, T, F, L, P, ES>> + ES: Deref + Send, + M: Deref::Signer, CF, T, F, L, P, ES>> + Send + Sync, - CM: 'static + Deref, - OM: 'static + Deref, - PGS: 'static + Deref>, - RGS: 'static + Deref>, - PM: 'static + Deref, - LM: 'static + Deref, - D: 'static + Deref, - O: 'static + Deref, - K: 'static + Deref, - OS: 'static + Deref>, - S: 'static + Deref + Send + Sync, + CM: Deref, + OM: Deref, + PGS: Deref>, + RGS: Deref>, + PM: Deref, + LM: Deref, + D: Deref, + O: Deref, + K: Deref, + OS: Deref>, + S: Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, Sleeper: Fn(Duration) -> SleepFuture, @@ -902,20 +901,20 @@ pub async fn process_events_async< sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> where - UL::Target: 'static + UtxoLookup, - CF::Target: 'static + chain::Filter, - T::Target: 'static + BroadcasterInterface, - F::Target: 'static + FeeEstimator, - L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, - ES::Target: 'static + EntropySource, + UL::Target: UtxoLookup, + CF::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: Persist<::Signer>, + ES::Target: EntropySource, CM::Target: AChannelManager, OM::Target: AOnionMessenger, PM::Target: APeerManager, LM::Target: ALiquidityManager, - O::Target: 'static + OutputSpender, - D::Target: 'static + ChangeDestinationSource, - K::Target: 'static + KVStore, + O::Target: OutputSpender, + D::Target: ChangeDestinationSource, + K::Target: KVStore, { let async_event_handler = |event| { let network_graph = gossip_sync.network_graph(); @@ -1340,31 +1339,30 @@ fn check_and_reset_sleeper< /// Async events processor that is based on [`process_events_async`] but allows for [`KVStoreSync`] to be used for /// synchronous background persistence. pub async fn process_events_async_with_kv_store_sync< - UL: 'static + Deref, - CF: 'static + Deref, - T: 'static + Deref, - F: 'static + Deref, - G: 'static + Deref>, - L: 'static + Deref + Send + Sync, - P: 'static + Deref, + UL: Deref, + CF: Deref, + T: Deref, + F: Deref, + G: Deref>, + L: Deref + Send + Sync, + P: Deref, EventHandlerFuture: core::future::Future>, EventHandler: Fn(Event) -> EventHandlerFuture, - ES: 'static + Deref + Send, - M: 'static - + Deref::Signer, CF, T, F, L, P, ES>> + ES: Deref + Send, + M: Deref::Signer, CF, T, F, L, P, ES>> + Send + Sync, - CM: 'static + Deref + Send + Sync, - OM: 'static + Deref, - PGS: 'static + Deref>, - RGS: 'static + Deref>, - PM: 'static + Deref, - LM: 'static + Deref, - D: 'static + Deref, - O: 'static + Deref, - K: 'static + Deref, - OS: 'static + Deref, L, O>>, - S: 'static + Deref + Send + Sync, + CM: Deref + Send + Sync, + OM: Deref, + PGS: Deref>, + RGS: Deref>, + PM: Deref, + LM: Deref, + D: Deref, + O: Deref, + K: Deref, + OS: Deref, L, O>>, + S: Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, Sleeper: Fn(Duration) -> SleepFuture, @@ -1376,20 +1374,20 @@ pub async fn process_events_async_with_kv_store_sync< sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime, ) -> Result<(), lightning::io::Error> where - UL::Target: 'static + UtxoLookup, - CF::Target: 'static + chain::Filter, - T::Target: 'static + BroadcasterInterface, - F::Target: 'static + FeeEstimator, - L::Target: 'static + Logger, - P::Target: 'static + Persist<::Signer>, - ES::Target: 'static + EntropySource, + UL::Target: UtxoLookup, + CF::Target: chain::Filter, + T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + P::Target: Persist<::Signer>, + ES::Target: EntropySource, CM::Target: AChannelManager, OM::Target: AOnionMessenger, PM::Target: APeerManager, LM::Target: ALiquidityManager, - O::Target: 'static + OutputSpender, - D::Target: 'static + ChangeDestinationSource, - K::Target: 'static + KVStoreSync, + O::Target: OutputSpender, + D::Target: ChangeDestinationSource, + K::Target: KVStoreSync, { let kv_store = KVStoreSyncWrapper(kv_store); process_events_async( From 603f90188a5eb96a36e270d561e6b36283077910 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Tue, 23 Sep 2025 13:38:42 +0000 Subject: [PATCH 3/6] Drop unnecessary `Arc`s around `KVStoreWrapper` --- lightning-background-processor/src/lib.rs | 6 ++--- lightning-liquidity/src/manager.rs | 30 +++++++++-------------- lightning/src/util/persist.rs | 1 + lightning/src/util/sweep.rs | 6 ++--- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 57b4a11cad4..a5df6b30f9b 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -2779,7 +2779,7 @@ mod tests { let kv_store_sync = Arc::new( Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"), ); - let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + let kv_store = KVStoreSyncWrapper(kv_store_sync); // Yes, you can unsafe { turn off the borrow checker } let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe { @@ -3298,7 +3298,7 @@ mod tests { let data_dir = nodes[0].kv_store.get_data_dir(); let kv_store_sync = Arc::new(Persister::new(data_dir).with_graph_persistence_notifier(sender)); - let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + let kv_store = KVStoreSyncWrapper(kv_store_sync); // Yes, you can unsafe { turn off the borrow checker } let lm_async: &'static LiquidityManager<_, _, _, _, _, _> = unsafe { @@ -3523,7 +3523,7 @@ mod tests { let (_, nodes) = create_nodes(1, "test_payment_path_scoring_async"); let data_dir = nodes[0].kv_store.get_data_dir(); let kv_store_sync = Arc::new(Persister::new(data_dir)); - let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + let kv_store = KVStoreSyncWrapper(kv_store_sync); let (exit_sender, exit_receiver) = tokio::sync::watch::channel(()); diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index d0925e11b24..a2def4bde08 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -200,7 +200,7 @@ pub trait ALiquidityManagerSync { Self::NS, Self::CM, Self::C, - Arc>, + KVStoreSyncWrapper, Self::TP, >; /// Returns a reference to the actual [`LiquidityManager`] object. @@ -246,7 +246,7 @@ where Self::NS, Self::CM, Self::C, - Arc>, + KVStoreSyncWrapper, Self::TP, > { &self.inner @@ -1036,7 +1036,7 @@ pub struct LiquidityManagerSync< KS::Target: KVStoreSync, TP::Target: TimeProvider, { - inner: LiquidityManager>, TP>, + inner: LiquidityManager, TP>, } #[cfg(feature = "time")] @@ -1063,7 +1063,7 @@ where service_config: Option, client_config: Option, ) -> Result { - let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + let kv_store = KVStoreSyncWrapper(kv_store_sync); let mut fut = Box::pin(LiquidityManager::new( entropy_source, @@ -1114,7 +1114,7 @@ where service_config: Option, client_config: Option, time_provider: TP, ) -> Result { - let kv_store = Arc::new(KVStoreSyncWrapper(kv_store_sync)); + let kv_store = KVStoreSyncWrapper(kv_store_sync); let mut fut = Box::pin(LiquidityManager::new_with_custom_time_provider( entropy_source, node_signer, @@ -1142,7 +1142,7 @@ where /// Returns a reference to the LSPS0 client-side handler. /// /// Wraps [`LiquidityManager::lsps0_client_handler`]. - pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler>> { + pub fn lsps0_client_handler(&self) -> &LSPS0ClientHandler> { self.inner.lsps0_client_handler() } @@ -1156,9 +1156,7 @@ where /// Returns a reference to the LSPS1 client-side handler. /// /// Wraps [`LiquidityManager::lsps1_client_handler`]. - pub fn lsps1_client_handler( - &self, - ) -> Option<&LSPS1ClientHandler>>> { + pub fn lsps1_client_handler(&self) -> Option<&LSPS1ClientHandler>> { self.inner.lsps1_client_handler() } @@ -1168,16 +1166,14 @@ where #[cfg(lsps1_service)] pub fn lsps1_service_handler( &self, - ) -> Option<&LSPS1ServiceHandler>>> { + ) -> Option<&LSPS1ServiceHandler>> { self.inner.lsps1_service_handler() } /// Returns a reference to the LSPS2 client-side handler. /// /// Wraps [`LiquidityManager::lsps2_client_handler`]. - pub fn lsps2_client_handler( - &self, - ) -> Option<&LSPS2ClientHandler>>> { + pub fn lsps2_client_handler(&self) -> Option<&LSPS2ClientHandler>> { self.inner.lsps2_client_handler() } @@ -1186,16 +1182,14 @@ where /// Wraps [`LiquidityManager::lsps2_service_handler`]. pub fn lsps2_service_handler<'a>( &'a self, - ) -> Option>>> { + ) -> Option>> { self.inner.lsps2_service_handler.as_ref().map(|r| LSPS2ServiceHandlerSync::from_inner(r)) } /// Returns a reference to the LSPS5 client-side handler. /// /// Wraps [`LiquidityManager::lsps5_client_handler`]. - pub fn lsps5_client_handler( - &self, - ) -> Option<&LSPS5ClientHandler>>> { + pub fn lsps5_client_handler(&self) -> Option<&LSPS5ClientHandler>> { self.inner.lsps5_client_handler() } @@ -1204,7 +1198,7 @@ where /// Wraps [`LiquidityManager::lsps5_service_handler`]. pub fn lsps5_service_handler( &self, - ) -> Option<&LSPS5ServiceHandler>, TP>> { + ) -> Option<&LSPS5ServiceHandler, TP>> { self.inner.lsps5_service_handler() } diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index 9036a27f49c..a646e09bd22 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -139,6 +139,7 @@ pub trait KVStoreSync { /// A wrapper around a [`KVStoreSync`] that implements the [`KVStore`] trait. It is not necessary to use this type /// directly. +#[derive(Clone)] pub struct KVStoreSyncWrapper(pub K) where K::Target: KVStoreSync; diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 51cb5b3dfa9..052a56de2d5 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -986,7 +986,7 @@ where Arc>, E, F, - Arc>, + KVStoreSyncWrapper, L, O, >, @@ -1011,7 +1011,7 @@ where let change_destination_source = Arc::new(ChangeDestinationSourceSyncWrapper::new(change_destination_source)); - let kv_store = Arc::new(KVStoreSyncWrapper(kv_store)); + let kv_store = KVStoreSyncWrapper(kv_store); let sweeper = OutputSweeper::new( best_block, @@ -1077,7 +1077,7 @@ where Arc>, E, F, - Arc>, + KVStoreSyncWrapper, L, O, > { From 552cd39fff51daa806b9496b0bc2788eb1a43741 Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 24 Sep 2025 18:34:05 +0000 Subject: [PATCH 4/6] Drop `Arc`s around `lightning-liquidity`'s `DefaultTimeProvider` There's no need for these given `DefaultTimeProvider` is a unit struct. --- lightning-background-processor/src/lib.rs | 2 +- lightning-liquidity/src/manager.rs | 7 +++---- lightning-liquidity/src/utils/time.rs | 12 ++++++++++++ 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index a5df6b30f9b..86576e72655 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -1953,7 +1953,7 @@ mod tests { Arc, Arc, Arc, - Arc, + DefaultTimeProvider, >; struct Node { diff --git a/lightning-liquidity/src/manager.rs b/lightning-liquidity/src/manager.rs index a2def4bde08..9b4523383ae 100644 --- a/lightning-liquidity/src/manager.rs +++ b/lightning-liquidity/src/manager.rs @@ -318,7 +318,7 @@ impl< CM: Deref + Clone, C: Deref + Clone, K: Deref + Clone, - > LiquidityManager> + > LiquidityManager where ES::Target: EntropySource, NS::Target: NodeSigner, @@ -335,7 +335,6 @@ where service_config: Option, client_config: Option, ) -> Result { - let time_provider = Arc::new(DefaultTimeProvider); Self::new_with_custom_time_provider( entropy_source, node_signer, @@ -345,7 +344,7 @@ where kv_store, service_config, client_config, - time_provider, + DefaultTimeProvider, ) .await } @@ -1046,7 +1045,7 @@ impl< CM: Deref + Clone, C: Deref + Clone, KS: Deref + Clone, - > LiquidityManagerSync> + > LiquidityManagerSync where ES::Target: EntropySource, NS::Target: NodeSigner, diff --git a/lightning-liquidity/src/utils/time.rs b/lightning-liquidity/src/utils/time.rs index 5f1622b4fe6..a8b2bb4acda 100644 --- a/lightning-liquidity/src/utils/time.rs +++ b/lightning-liquidity/src/utils/time.rs @@ -12,6 +12,11 @@ pub trait TimeProvider { } /// Default time provider using the system clock. +/// +/// You likely don't need to use this directly, it is used automatically with +/// [`LiquidityManager::new`] +/// +/// [`LiquidityManager::new`]: crate::manager::LiquidityManager::new #[derive(Clone, Debug)] #[cfg(feature = "time")] pub struct DefaultTimeProvider; @@ -23,3 +28,10 @@ impl TimeProvider for DefaultTimeProvider { SystemTime::now().duration_since(UNIX_EPOCH).expect("system time before Unix epoch") } } +#[cfg(feature = "time")] +impl core::ops::Deref for DefaultTimeProvider { + type Target = Self; + fn deref(&self) -> &Self { + self + } +} From b68f128c523fc2dab17c8585d933d3d52cb37c2a Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 24 Sep 2025 18:35:50 +0000 Subject: [PATCH 5/6] Allow passing an `OutputSweeperSync` to the sync-KVStore-async-BP `OutputSweeper::new_with_kv_store_sync` is a pretty strange API - it allows building an async `OutputSweeper` where the only `await`s are on a sync `KVStore`, ie will immediately block until the IO operation completes. While this isn't broken (futures are allowed to take their time, and async runtimes have to handle this, though they often don't handle it particularly well), its pretty weird. It seems to exist largely for `process_events_async_with_kv_store_sync`, which does async `Event` handling but sync `KVStore` operations (like the existing pre-0.2 "async" background processor). Instead, we allow passing an `OutputSweeperSync` to `process_events_async_with_kv_store_sync`, keeping the API consistent such that a user would use the appropriate `OutputSweeper` variant, but fetching the inner async `OutputSweeper` inside the BP. --- lightning-background-processor/src/lib.rs | 18 +++--- lightning/src/sign/mod.rs | 19 ++++-- lightning/src/util/sweep.rs | 75 ++++------------------- 3 files changed, 33 insertions(+), 79 deletions(-) diff --git a/lightning-background-processor/src/lib.rs b/lightning-background-processor/src/lib.rs index 86576e72655..627b4e673b8 100644 --- a/lightning-background-processor/src/lib.rs +++ b/lightning-background-processor/src/lib.rs @@ -48,11 +48,9 @@ use lightning::onion_message::messenger::AOnionMessenger; use lightning::routing::gossip::{NetworkGraph, P2PGossipSync}; use lightning::routing::scoring::{ScoreUpdate, WriteableScore}; use lightning::routing::utxo::UtxoLookup; -use lightning::sign::ChangeDestinationSource; -#[cfg(feature = "std")] -use lightning::sign::ChangeDestinationSourceSync; -use lightning::sign::EntropySource; -use lightning::sign::OutputSpender; +use lightning::sign::{ + ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender, +}; use lightning::util::logger::Logger; use lightning::util::persist::{ KVStore, KVStoreSync, KVStoreSyncWrapper, CHANNEL_MANAGER_PERSISTENCE_KEY, @@ -61,9 +59,7 @@ use lightning::util::persist::{ NETWORK_GRAPH_PERSISTENCE_SECONDARY_NAMESPACE, SCORER_PERSISTENCE_KEY, SCORER_PERSISTENCE_PRIMARY_NAMESPACE, SCORER_PERSISTENCE_SECONDARY_NAMESPACE, }; -use lightning::util::sweep::OutputSweeper; -#[cfg(feature = "std")] -use lightning::util::sweep::OutputSweeperSync; +use lightning::util::sweep::{OutputSweeper, OutputSweeperSync}; #[cfg(feature = "std")] use lightning::util::wakers::Sleeper; use lightning_rapid_gossip_sync::RapidGossipSync; @@ -1361,7 +1357,7 @@ pub async fn process_events_async_with_kv_store_sync< D: Deref, O: Deref, K: Deref, - OS: Deref, L, O>>, + OS: Deref>, S: Deref + Send + Sync, SC: for<'b> WriteableScore<'b>, SleepFuture: core::future::Future + core::marker::Unpin, @@ -1386,7 +1382,7 @@ where PM::Target: APeerManager, LM::Target: ALiquidityManager, O::Target: OutputSpender, - D::Target: ChangeDestinationSource, + D::Target: ChangeDestinationSourceSync, K::Target: KVStoreSync, { let kv_store = KVStoreSyncWrapper(kv_store); @@ -1399,7 +1395,7 @@ where gossip_sync, peer_manager, liquidity_manager, - sweeper, + sweeper.as_ref().map(|os| os.sweeper_async()), logger, scorer, sleeper, diff --git a/lightning/src/sign/mod.rs b/lightning/src/sign/mod.rs index f9db5ff4672..c0bbb94b41e 100644 --- a/lightning/src/sign/mod.rs +++ b/lightning/src/sign/mod.rs @@ -1052,12 +1052,11 @@ pub trait ChangeDestinationSourceSync { } /// A wrapper around [`ChangeDestinationSource`] to allow for async calls. -#[cfg(any(test, feature = "_test_utils"))] +/// +/// You should likely never use this directly but rather allow LDK to build this when required to +/// build higher-level sync wrappers. +#[doc(hidden)] pub struct ChangeDestinationSourceSyncWrapper(T) -where - T::Target: ChangeDestinationSourceSync; -#[cfg(not(any(test, feature = "_test_utils")))] -pub(crate) struct ChangeDestinationSourceSyncWrapper(T) where T::Target: ChangeDestinationSourceSync; @@ -1080,6 +1079,16 @@ where } } +impl Deref for ChangeDestinationSourceSyncWrapper +where + T::Target: ChangeDestinationSourceSync, +{ + type Target = Self; + fn deref(&self) -> &Self { + self + } +} + mod sealed { use bitcoin::secp256k1::{Scalar, SecretKey}; diff --git a/lightning/src/util/sweep.rs b/lightning/src/util/sweep.rs index 052a56de2d5..b60d4d8281f 100644 --- a/lightning/src/util/sweep.rs +++ b/lightning/src/util/sweep.rs @@ -19,7 +19,6 @@ use crate::sign::{ ChangeDestinationSource, ChangeDestinationSourceSync, ChangeDestinationSourceSyncWrapper, OutputSpender, SpendableOutputDescriptor, }; -use crate::sync::Arc; use crate::sync::Mutex; use crate::util::logger::Logger; use crate::util::persist::{ @@ -353,47 +352,6 @@ where logger: L, } -impl - OutputSweeper, L, O> -where - B::Target: BroadcasterInterface, - D::Target: ChangeDestinationSource, - E::Target: FeeEstimator, - F::Target: Filter + Send + Sync, - K::Target: KVStoreSync, - L::Target: Logger, - O::Target: OutputSpender, -{ - /// Constructs a new [`OutputSweeper`] based on a [`KVStoreSync`]. - pub fn new_with_kv_store_sync( - best_block: BestBlock, broadcaster: B, fee_estimator: E, chain_data_source: Option, - output_spender: O, change_destination_source: D, kv_store_sync: K, logger: L, - ) -> Self { - let kv_store = KVStoreSyncWrapper(kv_store_sync); - - Self::new( - best_block, - broadcaster, - fee_estimator, - chain_data_source, - output_spender, - change_destination_source, - kv_store, - logger, - ) - } - - /// Reads an [`OutputSweeper`] from the given reader and returns it with a synchronous [`KVStoreSync`]. - pub fn read_with_kv_store_sync( - reader: &mut R, args: (B, E, Option, O, D, K, L), - ) -> Result { - let kv_store = KVStoreSyncWrapper(args.5); - let args = (args.0, args.1, args.2, args.3, args.4, kv_store, args.6); - - Self::read(reader, args) - } -} - impl OutputSweeper where @@ -981,15 +939,8 @@ where L::Target: Logger, O::Target: OutputSpender, { - sweeper: OutputSweeper< - B, - Arc>, - E, - F, - KVStoreSyncWrapper, - L, - O, - >, + sweeper: + OutputSweeper, E, F, KVStoreSyncWrapper, L, O>, } impl @@ -1009,7 +960,7 @@ where output_spender: O, change_destination_source: D, kv_store: K, logger: L, ) -> Self { let change_destination_source = - Arc::new(ChangeDestinationSourceSyncWrapper::new(change_destination_source)); + ChangeDestinationSourceSyncWrapper::new(change_destination_source); let kv_store = KVStoreSyncWrapper(kv_store); @@ -1068,19 +1019,17 @@ where self.sweeper.tracked_spendable_outputs() } - /// Returns the inner async sweeper for testing purposes. - #[cfg(any(test, feature = "_test_utils"))] + /// Fetch the inner async sweeper. + /// + /// In general you shouldn't have much reason to use this - you have a sync [`KVStore`] backing + /// this [`OutputSweeperSync`], fetching an async [`OutputSweeper`] won't accomplish much, all + /// the async methods will hang waiting on your sync [`KVStore`] and likely confuse your async + /// runtime. This exists primarily for LDK-internal use, including outside of this crate. + #[doc(hidden)] pub fn sweeper_async( &self, - ) -> &OutputSweeper< - B, - Arc>, - E, - F, - KVStoreSyncWrapper, - L, - O, - > { + ) -> &OutputSweeper, E, F, KVStoreSyncWrapper, L, O> + { &self.sweeper } } From 5e679b4332d0edd03fa790965b52247c4d6b8f7e Mon Sep 17 00:00:00 2001 From: Matt Corallo Date: Wed, 24 Sep 2025 18:35:54 +0000 Subject: [PATCH 6/6] Drop unnecessary `Arc`s in `lightning-liquidity`'s `EventQueue` --- lightning-liquidity/src/events/event_queue.rs | 38 +++++++++---------- 1 file changed, 17 insertions(+), 21 deletions(-) diff --git a/lightning-liquidity/src/events/event_queue.rs b/lightning-liquidity/src/events/event_queue.rs index dc68ee61a6e..cd1162cee31 100644 --- a/lightning-liquidity/src/events/event_queue.rs +++ b/lightning-liquidity/src/events/event_queue.rs @@ -29,10 +29,10 @@ pub(crate) struct EventQueue where K::Target: KVStore, { - state: Arc>, - waker: Arc>>, + state: Mutex, + waker: Mutex>, #[cfg(feature = "std")] - condvar: Arc, + condvar: crate::sync::Condvar, kv_store: K, persist_notifier: Arc, } @@ -44,13 +44,13 @@ where pub fn new( queue: VecDeque, kv_store: K, persist_notifier: Arc, ) -> Self { - let state = Arc::new(Mutex::new(QueueState { queue, needs_persist: false })); - let waker = Arc::new(Mutex::new(None)); + let state = Mutex::new(QueueState { queue, needs_persist: false }); + let waker = Mutex::new(None); Self { state, waker, #[cfg(feature = "std")] - condvar: Arc::new(crate::sync::Condvar::new()), + condvar: crate::sync::Condvar::new(), kv_store, persist_notifier, } @@ -74,12 +74,7 @@ where } pub async fn next_event_async(&self) -> LiquidityEvent { - EventFuture { - queue_state: Arc::clone(&self.state), - waker: Arc::clone(&self.waker), - persist_notifier: Arc::clone(&self.persist_notifier), - } - .await + EventFuture(self).await } #[cfg(feature = "std")] @@ -213,31 +208,32 @@ where } } -struct EventFuture { - queue_state: Arc>, - waker: Arc>>, - persist_notifier: Arc, -} +struct EventFuture<'a, K: Deref + Clone>(&'a EventQueue) +where + K::Target: KVStore; -impl Future for EventFuture { +impl Future for EventFuture<'_, K> +where + K::Target: KVStore, +{ type Output = LiquidityEvent; fn poll( self: core::pin::Pin<&mut Self>, cx: &mut core::task::Context<'_>, ) -> core::task::Poll { let (res, should_persist_notify) = { - let mut state_lock = self.queue_state.lock().unwrap(); + let mut state_lock = self.0.state.lock().unwrap(); if let Some(event) = state_lock.queue.pop_front() { state_lock.needs_persist = true; (Poll::Ready(event), true) } else { - *self.waker.lock().unwrap() = Some(cx.waker().clone()); + *self.0.waker.lock().unwrap() = Some(cx.waker().clone()); (Poll::Pending, false) } }; if should_persist_notify { - self.persist_notifier.notify(); + self.0.persist_notifier.notify(); } res