Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
101 changes: 58 additions & 43 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,11 @@ mod fwd_batch;

use fwd_batch::BatchDelay;

#[cfg(not(c_bindings))]
use lightning::chain;
#[cfg(not(c_bindings))]
use lightning::chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use lightning::chain::chainmonitor::{ChainMonitor, Persist};
use lightning::chain::chainmonitor::AChainMonitor;
#[cfg(feature = "std")]
use lightning::events::EventHandler;
#[cfg(feature = "std")]
Expand All @@ -50,9 +52,9 @@ use lightning::onion_message::messenger::AOnionMessenger;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::scoring::{ScoreUpdate, WriteableScore};
use lightning::routing::utxo::UtxoLookup;
use lightning::sign::{
ChangeDestinationSource, ChangeDestinationSourceSync, EntropySource, OutputSpender,
};
#[cfg(not(c_bindings))]
use lightning::sign::EntropySource;
use lightning::sign::{ChangeDestinationSource, ChangeDestinationSourceSync, OutputSpender};
#[cfg(not(c_bindings))]
use lightning::util::async_poll::MaybeSend;
use lightning::util::logger::Logger;
Expand Down Expand Up @@ -118,6 +120,7 @@ use alloc::vec::Vec;
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChainMonitor::rebroadcast_pending_claims`]: lightning::chain::chainmonitor::ChainMonitor::rebroadcast_pending_claims
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
Expand Down Expand Up @@ -923,16 +926,11 @@ use futures_util::{dummy_waker, Joiner, OptionalSelector, Selector, SelectorOutp
pub async fn process_events_async<
'a,
UL: UtxoLookup,
CF: chain::Filter,
T: BroadcasterInterface,
F: FeeEstimator,
G: Deref<Target = NetworkGraph<L>>,
L: Logger,
P: Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: EntropySource,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
M: Deref,
CM: Deref,
OM: Deref,
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
Expand All @@ -942,7 +940,17 @@ pub async fn process_events_async<
D: Deref,
O: OutputSpender,
K: KVStore,
OS: Deref<Target = OutputSweeper<T, D, F, CF, K, L, O>>,
OS: Deref<
Target = OutputSweeper<
<M::Target as AChainMonitor>::Broadcaster,
D,
<M::Target as AChainMonitor>::FeeEstimator,
<M::Target as AChainMonitor>::Filter,
K,
L,
O,
>,
>,
S: Deref<Target = SC>,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Expand All @@ -955,7 +963,7 @@ pub async fn process_events_async<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
Expand Down Expand Up @@ -1004,7 +1012,7 @@ where
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
channel_manager.get_cm().timer_tick_occurred();
log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
chain_monitor.rebroadcast_pending_claims();
chain_monitor.get_cm().rebroadcast_pending_claims();

let mut last_freshness_call = sleeper(FRESHNESS_TIMER);
let mut last_onion_message_handler_call = sleeper(ONION_MESSAGE_HANDLER_TIMER);
Expand All @@ -1022,7 +1030,7 @@ where

loop {
channel_manager.get_cm().process_pending_events_async(async_event_handler).await;
chain_monitor.process_pending_events_async(async_event_handler).await;
chain_monitor.get_cm().process_pending_events_async(async_event_handler).await;
if let Some(om) = &onion_messenger {
om.get_om().process_pending_events_async(async_event_handler).await
}
Expand Down Expand Up @@ -1072,7 +1080,7 @@ where
let fut = Selector {
a: sleeper(sleep_delay),
b: channel_manager.get_cm().get_event_or_persistence_needed_future(),
c: chain_monitor.get_update_future(),
c: chain_monitor.get_cm().get_update_future(),
d: om_fut,
e: lm_fut,
f: gv_fut,
Expand Down Expand Up @@ -1164,7 +1172,7 @@ where
};
if archive_timer_elapsed {
log_trace!(logger, "Archiving stale ChannelMonitors.");
chain_monitor.archive_fully_resolved_channel_monitors();
chain_monitor.get_cm().archive_fully_resolved_channel_monitors();
have_archived = true;
log_trace!(logger, "Archived stale ChannelMonitors.");
}
Expand Down Expand Up @@ -1354,7 +1362,7 @@ where
match check_and_reset_sleeper(&mut last_rebroadcast_call, || sleeper(REBROADCAST_TIMER)) {
Some(false) => {
log_trace!(logger, "Rebroadcasting monitor's pending claims");
chain_monitor.rebroadcast_pending_claims();
chain_monitor.get_cm().rebroadcast_pending_claims();
},
Some(true) => break,
None => {},
Expand Down Expand Up @@ -1416,16 +1424,11 @@ fn check_and_reset_sleeper<
/// synchronous background persistence.
pub async fn process_events_async_with_kv_store_sync<
UL: UtxoLookup,
CF: chain::Filter,
T: BroadcasterInterface,
F: FeeEstimator,
G: Deref<Target = NetworkGraph<L>>,
L: Logger,
P: Deref,
EventHandlerFuture: core::future::Future<Output = Result<(), ReplayEvent>>,
EventHandler: Fn(Event) -> EventHandlerFuture,
ES: EntropySource,
M: Deref<Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>>,
M: Deref,
CM: Deref,
OM: Deref,
PGS: Deref<Target = P2PGossipSync<G, UL, L>>,
Expand All @@ -1435,7 +1438,17 @@ pub async fn process_events_async_with_kv_store_sync<
D: Deref,
O: OutputSpender,
K: Deref,
OS: Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>>,
OS: Deref<
Target = OutputSweeperSync<
<M::Target as AChainMonitor>::Broadcaster,
D,
<M::Target as AChainMonitor>::FeeEstimator,
<M::Target as AChainMonitor>::Filter,
K,
L,
O,
>,
>,
S: Deref<Target = SC>,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
Expand All @@ -1448,7 +1461,7 @@ pub async fn process_events_async_with_kv_store_sync<
sleeper: Sleeper, mobile_interruptable_platform: bool, fetch_time: FetchTime,
) -> Result<(), lightning::io::Error>
where
P::Target: Persist<<CM::Target as AChannelManager>::Signer>,
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
Expand Down Expand Up @@ -1523,20 +1536,10 @@ impl BackgroundProcessor {
pub fn start<
'a,
UL: 'static + UtxoLookup,
CF: 'static + chain::Filter,
T: 'static + BroadcasterInterface,
F: 'static + FeeEstimator + Send,
G: 'static + Deref<Target = NetworkGraph<L>>,
L: 'static + Deref + Send,
P: 'static + Deref,
EH: 'static + EventHandler + Send,
ES: 'static + EntropySource + Send,
M: 'static
+ Deref<
Target = ChainMonitor<<CM::Target as AChannelManager>::Signer, CF, T, F, L, P, ES>,
>
+ Send
+ Sync,
M: 'static + Deref + Send + Sync,
CM: 'static + Deref + Send,
OM: 'static + Deref + Send,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send,
Expand All @@ -1548,15 +1551,27 @@ impl BackgroundProcessor {
D: 'static + Deref,
O: 'static + OutputSpender,
K: 'static + Deref + Send,
OS: 'static + Deref<Target = OutputSweeperSync<T, D, F, CF, K, L, O>> + Send,
OS: 'static
+ Deref<
Target = OutputSweeperSync<
<M::Target as AChainMonitor>::Broadcaster,
D,
<M::Target as AChainMonitor>::FeeEstimator,
<M::Target as AChainMonitor>::Filter,
K,
L,
O,
>,
>
+ Send,
>(
kv_store: K, event_handler: EH, chain_monitor: M, channel_manager: CM,
onion_messenger: Option<OM>, gossip_sync: GossipSync<PGS, RGS, G, UL, L>, peer_manager: PM,
liquidity_manager: Option<LM>, sweeper: Option<OS>, logger: L, scorer: Option<S>,
) -> Self
where
L::Target: 'static + Logger,
P::Target: 'static + Persist<<CM::Target as AChannelManager>::Signer>,
M::Target: AChainMonitor<Signer = <CM::Target as AChannelManager>::Signer, Logger = L>,
CM::Target: AChannelManager,
OM::Target: AOnionMessenger,
PM::Target: APeerManager,
Expand Down Expand Up @@ -1596,7 +1611,7 @@ impl BackgroundProcessor {
log_trace!(logger, "Calling ChannelManager's timer_tick_occurred on startup");
channel_manager.get_cm().timer_tick_occurred();
log_trace!(logger, "Rebroadcasting monitor's pending claims on startup");
chain_monitor.rebroadcast_pending_claims();
chain_monitor.get_cm().rebroadcast_pending_claims();

let mut last_freshness_call = Instant::now();
let mut last_onion_message_handler_call = Instant::now();
Expand All @@ -1615,7 +1630,7 @@ impl BackgroundProcessor {

loop {
channel_manager.get_cm().process_pending_events(&event_handler);
chain_monitor.process_pending_events(&event_handler);
chain_monitor.get_cm().process_pending_events(&event_handler);
if let Some(om) = &onion_messenger {
om.get_om().process_pending_events(&event_handler)
};
Expand Down Expand Up @@ -1648,7 +1663,7 @@ impl BackgroundProcessor {
let gv_fut = gossip_sync.validation_completion_future();
let always_futures = [
channel_manager.get_cm().get_event_or_persistence_needed_future(),
chain_monitor.get_update_future(),
chain_monitor.get_cm().get_update_future(),
];
let futures = always_futures.into_iter().chain(om_fut).chain(lm_fut).chain(gv_fut);
let sleeper = Sleeper::from_futures(futures);
Expand Down Expand Up @@ -1701,7 +1716,7 @@ impl BackgroundProcessor {
let archive_timer_elapsed = last_archive_call.elapsed() > archive_timer;
if archive_timer_elapsed {
log_trace!(logger, "Archiving stale ChannelMonitors.");
chain_monitor.archive_fully_resolved_channel_monitors();
chain_monitor.get_cm().archive_fully_resolved_channel_monitors();
have_archived = true;
last_archive_call = Instant::now();
log_trace!(logger, "Archived stale ChannelMonitors.");
Expand Down Expand Up @@ -1786,7 +1801,7 @@ impl BackgroundProcessor {
}
if last_rebroadcast_call.elapsed() > REBROADCAST_TIMER {
log_trace!(logger, "Rebroadcasting monitor's pending claims");
chain_monitor.rebroadcast_pending_claims();
chain_monitor.get_cm().rebroadcast_pending_claims();
last_rebroadcast_call = Instant::now();
}
}
Expand Down
60 changes: 60 additions & 0 deletions lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1488,6 +1488,66 @@ where
}
}

/// A trivial trait which describes any [`ChainMonitor`].
///
/// This is not exported to bindings users as general cover traits aren't useful in other
/// languages.
pub trait AChainMonitor {
/// A type implementing [`EcdsaChannelSigner`].
type Signer: EcdsaChannelSigner + Sized;
/// A type implementing [`chain::Filter`].
type Filter: chain::Filter;
/// A type implementing [`BroadcasterInterface`].
type Broadcaster: BroadcasterInterface;
/// A type implementing [`FeeEstimator`].
type FeeEstimator: FeeEstimator;
/// A type implementing [`Logger`].
type Logger: Logger;
/// A type that derefs to [`Persist`].
type Persister: Deref<Target = Self::PersisterTarget>;
/// The target of [`Self::Persister`].
type PersisterTarget: Persist<Self::Signer> + ?Sized;
/// A type implementing [`EntropySource`].
type EntropySource: EntropySource;
/// Returns a reference to the actual [`ChainMonitor`] object.
fn get_cm(
&self,
) -> &ChainMonitor<
Self::Signer,
Self::Filter,
Self::Broadcaster,
Self::FeeEstimator,
Self::Logger,
Self::Persister,
Self::EntropySource,
>;
}

impl<
ChannelSigner: EcdsaChannelSigner,
C: chain::Filter,
T: BroadcasterInterface,
F: FeeEstimator,
L: Logger,
P: Deref,
ES: EntropySource,
> AChainMonitor for ChainMonitor<ChannelSigner, C, T, F, L, P, ES>
where
P::Target: Persist<ChannelSigner>,
{
type Signer = ChannelSigner;
type Filter = C;
type Broadcaster = T;
type FeeEstimator = F;
type Logger = L;
type Persister = P;
type PersisterTarget = P::Target;
type EntropySource = ES;
fn get_cm(&self) -> &ChainMonitor<ChannelSigner, C, T, F, L, P, ES> {
self
}
}

#[cfg(test)]
mod tests {
use crate::chain::channelmonitor::ANTI_REORG_DELAY;
Expand Down
Loading