diff --git a/lightning/src/chain/chainmonitor.rs b/lightning/src/chain/chainmonitor.rs index 4abd0cd88c0..62b6c9f5541 100644 --- a/lightning/src/chain/chainmonitor.rs +++ b/lightning/src/chain/chainmonitor.rs @@ -26,6 +26,8 @@ use bitcoin::block::Header; use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::secp256k1::PublicKey; + use crate::chain; use crate::chain::chaininterface::{BroadcasterInterface, FeeEstimator}; #[cfg(peer_storage)] @@ -57,7 +59,8 @@ use crate::util::persist::{KVStore, MonitorName, MonitorUpdatingPersisterAsync}; #[cfg(peer_storage)] use crate::util::ser::{VecWriter, Writeable}; use crate::util::wakers::{Future, Notifier}; -use bitcoin::secp256k1::PublicKey; + +use alloc::sync::Arc; #[cfg(peer_storage)] use core::iter::Cycle; use core::ops::Deref; @@ -267,6 +270,7 @@ pub struct AsyncPersister< FE::Target: FeeEstimator, { persister: MonitorUpdatingPersisterAsync, + event_notifier: Arc, } impl< @@ -314,7 +318,8 @@ where &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - self.persister.spawn_async_persist_new_channel(monitor_name, monitor); + let notifier = Arc::clone(&self.event_notifier); + self.persister.spawn_async_persist_new_channel(monitor_name, monitor, notifier); ChannelMonitorUpdateStatus::InProgress } @@ -322,7 +327,8 @@ where &self, monitor_name: MonitorName, monitor_update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, ) -> ChannelMonitorUpdateStatus { - self.persister.spawn_async_update_persisted_channel(monitor_name, monitor_update, monitor); + let notifier = Arc::clone(&self.event_notifier); + self.persister.spawn_async_update_channel(monitor_name, monitor_update, monitor, notifier); ChannelMonitorUpdateStatus::InProgress } @@ -382,7 +388,7 @@ pub struct ChainMonitor< /// A [`Notifier`] used to wake up the background processor in case we have any [`Event`]s for /// it to give to users (or [`MonitorEvent`]s for `ChannelManager` to process). - event_notifier: Notifier, + event_notifier: Arc, /// Messages to send to the peer. This is currently used to distribute PeerStorage to channel partners. pending_send_only_events: Mutex>, @@ -430,17 +436,18 @@ impl< persister: MonitorUpdatingPersisterAsync, _entropy_source: ES, _our_peerstorage_encryption_key: PeerStorageKey, ) -> Self { + let event_notifier = Arc::new(Notifier::new()); Self { monitors: RwLock::new(new_hash_map()), chain_source, broadcaster, logger, fee_estimator: feeest, - persister: AsyncPersister { persister }, _entropy_source, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), - event_notifier: Notifier::new(), + event_notifier: Arc::clone(&event_notifier), + persister: AsyncPersister { persister, event_notifier }, pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, @@ -656,7 +663,7 @@ where _entropy_source, pending_monitor_events: Mutex::new(Vec::new()), highest_chain_height: AtomicUsize::new(0), - event_notifier: Notifier::new(), + event_notifier: Arc::new(Notifier::new()), pending_send_only_events: Mutex::new(Vec::new()), #[cfg(peer_storage)] our_peerstorage_encryption_key: _our_peerstorage_encryption_key, diff --git a/lightning/src/util/persist.rs b/lightning/src/util/persist.rs index e75f35e65cd..49addd7dbc3 100644 --- a/lightning/src/util/persist.rs +++ b/lightning/src/util/persist.rs @@ -38,6 +38,7 @@ use crate::util::async_poll::{dummy_waker, AsyncResult, MaybeSend, MaybeSync}; use crate::util::logger::Logger; use crate::util::native_async::FutureSpawner; use crate::util::ser::{Readable, ReadableArgs, Writeable}; +use crate::util::wakers::Notifier; /// The alphabet of characters allowed for namespaces and keys. pub const KVSTORE_NAMESPACE_KEY_ALPHABET: &str = @@ -875,6 +876,7 @@ where pub(crate) fn spawn_async_persist_new_channel( &self, monitor_name: MonitorName, monitor: &ChannelMonitor<::EcdsaSigner>, + notifier: Arc, ) { let inner = Arc::clone(&self.0); // Note that `persist_new_channel` is a sync method which calls all the way through to the @@ -884,7 +886,10 @@ where let completion = (monitor.channel_id(), monitor.get_latest_update_id()); self.0.future_spawner.spawn(async move { match future.await { - Ok(()) => inner.async_completed_updates.lock().unwrap().push(completion), + Ok(()) => { + inner.async_completed_updates.lock().unwrap().push(completion); + notifier.notify(); + }, Err(e) => { log_error!( inner.logger, @@ -895,9 +900,10 @@ where }); } - pub(crate) fn spawn_async_update_persisted_channel( + pub(crate) fn spawn_async_update_channel( &self, monitor_name: MonitorName, update: Option<&ChannelMonitorUpdate>, monitor: &ChannelMonitor<::EcdsaSigner>, + notifier: Arc, ) { let inner = Arc::clone(&self.0); // Note that `update_persisted_channel` is a sync method which calls all the way through to @@ -914,6 +920,7 @@ where match future.await { Ok(()) => if let Some(completion) = completion { inner.async_completed_updates.lock().unwrap().push(completion); + notifier.notify(); }, Err(e) => { log_error!(