Skip to content

Commit

Permalink
f - Move ChainListener to lightning crate and implement there
Browse files Browse the repository at this point in the history
  • Loading branch information
jkczyz committed Feb 23, 2021
1 parent a9aeeb2 commit 5e4b218
Show file tree
Hide file tree
Showing 6 changed files with 132 additions and 112 deletions.
92 changes: 8 additions & 84 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,11 @@
use crate::{BlockSource, BlockSourceResult, Cache, ChainListener, ChainNotifier};
use crate::{BlockSource, BlockSourceResult, Cache, ChainNotifier};
use crate::poll::{ChainPoller, Validate, ValidatedBlockHeader};

use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;

use lightning::chain;
use lightning::chain::chainmonitor::ChainMonitor;
use lightning::chain::chaininterface;
use lightning::chain::channelmonitor;
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::chain::keysinterface;
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger;

use std::cell::RefCell;
use std::ops::Deref;
use lightning::chain::ChainListener;

/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
Expand All @@ -32,6 +22,7 @@ use std::ops::Deref;
/// use bitcoin::network::constants::Network;
///
/// use lightning::chain;
/// use lightning::chain::ChainListener;
/// use lightning::chain::Watch;
/// use lightning::chain::chainmonitor::ChainMonitor;
/// use lightning::chain::channelmonitor;
Expand Down Expand Up @@ -90,11 +81,11 @@ use std::ops::Deref;
/// };
///
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (RefCell::new(monitor), tx_broadcaster, fee_estimator, logger);
/// let mut manager_listener = &manager;
/// let mut monitor_listener = (RefCell::new(monitor), &*tx_broadcaster, &*fee_estimator, &*logger);
/// let mut manager_listener = &mut manager;
/// let listeners = vec![
/// (monitor_block_hash, &mut monitor_listener as &mut dyn ChainListener),
/// (manager_block_hash, &mut manager_listener as &mut dyn ChainListener),
/// (manager_block_hash, manager_listener as &mut dyn ChainListener),
/// ];
/// let chain_tip =
/// init::sync_listeners(block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
Expand All @@ -103,8 +94,8 @@ use std::ops::Deref;
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
///
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
/// let chain_listener = (&chain_monitor, &manager_listener);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, chain_listener);
/// let mut chain_listener = (chain_monitor, manager_listener);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, &chain_listener);
/// }
/// ```
///
Expand Down Expand Up @@ -223,73 +214,6 @@ impl<'a> ChainListener for ChainListenerSet<'a> {
}
}

impl<S, B: Deref, F: Deref, L: Deref> ChainListener for (RefCell<ChannelMonitor<S>>, B, F, L)
where
S: keysinterface::Sign,
B::Target: chaininterface::BroadcasterInterface,
F::Target: chaininterface::FeeEstimator,
L::Target: logger::Logger,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
self.0.borrow_mut().block_connected(&block.header, &txdata, height, &*self.1, &*self.2, &*self.3);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
self.0.borrow_mut().block_disconnected(header, height, &*self.1, &*self.2, &*self.3);
}
}

impl<S, M: Deref, B: Deref, K: Deref, F: Deref, L: Deref> ChainListener for &ChannelManager<S, M, B, K, F, L>
where
S: keysinterface::Sign,
M::Target: chain::Watch<S>,
B::Target: chaininterface::BroadcasterInterface,
K::Target: keysinterface::KeysInterface<Signer = S>,
F::Target: chaininterface::FeeEstimator,
L::Target: logger::Logger,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
ChannelManager::block_connected(self, &block.header, &txdata, height);
}

fn block_disconnected(&self, header: &BlockHeader, _height: u32) {
ChannelManager::block_disconnected(self, header);
}
}

impl<S, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainListener for &ChainMonitor<S, C, T, F, L, P>
where
S: keysinterface::Sign,
C::Target: chain::Filter,
T::Target: chaininterface::BroadcasterInterface,
F::Target: chaininterface::FeeEstimator,
L::Target: logger::Logger,
P::Target: channelmonitor::Persist<S>,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
ChainMonitor::block_connected(self, &block.header, &txdata, height);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
ChainMonitor::block_disconnected(self, header, height);
}
}

impl<T: ChainListener, U: ChainListener> ChainListener for (&T, &U) {
fn block_connected(&self, block: &Block, height: u32) {
self.0.block_connected(block, height);
self.1.block_connected(block, height);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
self.0.block_disconnected(header, height);
self.1.block_disconnected(header, height);
}
}

#[cfg(test)]
mod tests {
use crate::test_utils::{Blockchain, MockChainListener};
Expand Down
50 changes: 25 additions & 25 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,10 @@ use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::hash_types::BlockHash;
use bitcoin::util::uint::Uint256;

use lightning::chain::ChainListener;

use std::future::Future;
use std::ops::Deref;
use std::pin::Pin;

/// Abstract type for retrieving block headers and data.
Expand Down Expand Up @@ -155,24 +158,14 @@ pub struct BlockHeaderData {
/// custom cache eviction policy. This offers flexibility to those sensitive to resource usage.
/// Hence, there is a trade-off between a lower memory footprint and potentially increased network
/// I/O as headers are re-fetched during fork detection.
pub struct SpvClient<'a, P: Poll, C: Cache, L: ChainListener> {
pub struct SpvClient<'a, P: Poll, C: Cache, L: Deref>
where L::Target: ChainListener {
chain_tip: ValidatedBlockHeader,
chain_poller: P,
chain_notifier: ChainNotifier<'a, C>,
chain_listener: L,
}

/// Adaptor used for notifying when blocks have been connected or disconnected from the chain.
///
/// Used when needing to replay chain data upon startup or as new chain events occur.
pub trait ChainListener {
/// Notifies the listener that a block was added at the given height.
fn block_connected(&self, block: &Block, height: u32);

/// Notifies the listener that a block was removed at the given height.
fn block_disconnected(&self, header: &BlockHeader, height: u32);
}

/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
/// keyed by block hash.
///
Expand Down Expand Up @@ -215,7 +208,8 @@ impl Cache for UnboundedCache {
}
}

impl<'a, P: Poll, C: Cache, L: ChainListener> SpvClient<'a, P, C, L> {
impl<'a, P: Poll, C: Cache, L: Deref> SpvClient<'a, P, C, L>
where L::Target: ChainListener {
/// Creates a new SPV client using `chain_tip` as the best known chain tip.
///
/// Subsequent calls to [`poll_best_tip`] will poll for the best chain tip using the given chain
Expand All @@ -241,7 +235,7 @@ impl<'a, P: Poll, C: Cache, L: ChainListener> SpvClient<'a, P, C, L> {
///
/// Returns the best polled chain tip relative to the previous best known tip and whether any
/// blocks were indeed connected or disconnected.
pub async fn poll_best_tip(&mut self) -> BlockSourceResult<(ChainTip, bool)> {
pub async fn poll_best_tip(&mut self) -> BlockSourceResult<(ChainTip, bool)> where <L as std::ops::Deref>::Target: std::marker::Sized {
let chain_tip = self.chain_poller.poll_chain_tip(self.chain_tip).await?;
let blocks_connected = match chain_tip {
ChainTip::Common => false,
Expand All @@ -261,8 +255,8 @@ impl<'a, P: Poll, C: Cache, L: ChainListener> SpvClient<'a, P, C, L> {

/// Updates the chain tip, syncing the chain listener with any connected or disconnected
/// blocks. Returns whether there were any such blocks.
async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool {
match self.chain_notifier.sync_listener(best_chain_tip, &self.chain_tip, &mut self.chain_poller, &mut self.chain_listener).await {
async fn update_chain_tip(&mut self, best_chain_tip: ValidatedBlockHeader) -> bool where <L as std::ops::Deref>::Target: std::marker::Sized {
match self.chain_notifier.sync_listener(best_chain_tip, &self.chain_tip, &mut self.chain_poller, &*self.chain_listener).await {
Ok(_) => {
self.chain_tip = best_chain_tip;
true
Expand Down Expand Up @@ -315,7 +309,7 @@ impl<'a, C: Cache> ChainNotifier<'a, C> {
new_header: ValidatedBlockHeader,
old_header: &ValidatedBlockHeader,
chain_poller: &mut P,
chain_listener: &mut L,
chain_listener: &L,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
let difference = self.find_difference(new_header, old_header, chain_poller).await
.map_err(|e| (e, None))?;
Expand Down Expand Up @@ -383,7 +377,7 @@ impl<'a, C: Cache> ChainNotifier<'a, C> {
fn disconnect_blocks<L: ChainListener>(
&mut self,
mut disconnected_blocks: Vec<ValidatedBlockHeader>,
chain_listener: &mut L,
chain_listener: &L,
) {
for header in disconnected_blocks.drain(..) {
if let Some(cached_header) = self.header_cache.block_disconnected(&header.block_hash) {
Expand All @@ -399,7 +393,7 @@ impl<'a, C: Cache> ChainNotifier<'a, C> {
mut new_tip: ValidatedBlockHeader,
mut connected_blocks: Vec<ValidatedBlockHeader>,
chain_poller: &mut P,
chain_listener: &mut L,
chain_listener: &L,
) -> Result<(), (BlockSourceError, Option<ValidatedBlockHeader>)> {
for header in connected_blocks.drain(..).rev() {
let block = chain_poller
Expand Down Expand Up @@ -430,7 +424,8 @@ mod spv_client_tests {

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let mut client = SpvClient::new(best_tip, poller, &mut cache, NullChainListener {});
let mut listener = NullChainListener {};
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => {
assert_eq!(e.kind(), BlockSourceErrorKind::Persistent);
Expand All @@ -448,7 +443,8 @@ mod spv_client_tests {

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let mut client = SpvClient::new(common_tip, poller, &mut cache, NullChainListener {});
let mut listener = NullChainListener {};
let mut client = SpvClient::new(common_tip, poller, &mut cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -467,7 +463,8 @@ mod spv_client_tests {

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let mut client = SpvClient::new(old_tip, poller, &mut cache, NullChainListener {});
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -486,7 +483,8 @@ mod spv_client_tests {

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let mut client = SpvClient::new(old_tip, poller, &mut cache, NullChainListener {});
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -505,7 +503,8 @@ mod spv_client_tests {

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let mut client = SpvClient::new(old_tip, poller, &mut cache, NullChainListener {});
let mut listener = NullChainListener {};
let mut client = SpvClient::new(old_tip, poller, &mut cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand All @@ -525,7 +524,8 @@ mod spv_client_tests {

let poller = poll::ChainPoller::new(&mut chain, Network::Testnet);
let mut cache = UnboundedCache::new();
let mut client = SpvClient::new(best_tip, poller, &mut cache, NullChainListener {});
let mut listener = NullChainListener {};
let mut client = SpvClient::new(best_tip, poller, &mut cache, &mut listener);
match client.poll_best_tip().await {
Err(e) => panic!("Unexpected error: {:?}", e),
Ok((chain_tip, blocks_connected)) => {
Expand Down
23 changes: 22 additions & 1 deletion lightning/src/chain/chainmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@
//! [`ChannelMonitor`]: ../channelmonitor/struct.ChannelMonitor.html
//! [`MonitorEvent`]: ../channelmonitor/enum.MonitorEvent.html

use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::block::{Block, BlockHeader};

use chain;
use chain::ChainListener;
use chain::Filter;
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::channelmonitor;
Expand Down Expand Up @@ -140,6 +141,26 @@ where C::Target: chain::Filter,
}
}

impl<ChannelSigner: Sign, C: Deref + Send + Sync, T: Deref + Send + Sync, F: Deref + Send + Sync, L: Deref + Send + Sync, P: Deref + Send + Sync>
ChainListener for ChainMonitor<ChannelSigner, C, T, F, L, P>
where
ChannelSigner: Sign,
C::Target: chain::Filter,
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
P::Target: channelmonitor::Persist<ChannelSigner>,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
ChainMonitor::block_connected(self, &block.header, &txdata, height);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
ChainMonitor::block_disconnected(self, header, height);
}
}

impl<ChannelSigner: Sign, C: Deref + Sync + Send, T: Deref + Sync + Send, F: Deref + Sync + Send, L: Deref + Sync + Send, P: Deref + Sync + Send>
chain::Watch<ChannelSigner> for ChainMonitor<ChannelSigner, C, T, F, L, P>
where C::Target: chain::Filter,
Expand Down
20 changes: 19 additions & 1 deletion lightning/src/chain/channelmonitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
//!
//! [`chain::Watch`]: ../trait.Watch.html

use bitcoin::blockdata::block::BlockHeader;
use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::blockdata::transaction::{TxOut,Transaction};
use bitcoin::blockdata::transaction::OutPoint as BitcoinOutPoint;
use bitcoin::blockdata::script::{Script, Builder};
Expand All @@ -41,6 +41,7 @@ use ln::chan_utils;
use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, HTLCType, ChannelTransactionParameters, HolderCommitmentTransaction};
use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash};
use ln::onchaintx::{OnchainTxHandler, InputDescriptors};
use chain::ChainListener;
use chain::chaininterface::{BroadcasterInterface, FeeEstimator};
use chain::transaction::{OutPoint, TransactionData};
use chain::keysinterface::{SpendableOutputDescriptor, StaticPaymentOutputDescriptor, DelayedPaymentOutputDescriptor, Sign, KeysInterface};
Expand All @@ -49,6 +50,7 @@ use util::ser::{Readable, ReadableArgs, MaybeReadable, Writer, Writeable, U48};
use util::byte_utils;
use util::events::Event;

use std::cell::RefCell;
use std::collections::{HashMap, HashSet, hash_map};
use std::{cmp, mem};
use std::ops::Deref;
Expand Down Expand Up @@ -2297,6 +2299,22 @@ pub trait Persist<ChannelSigner: Sign>: Send + Sync {
fn update_persisted_channel(&self, id: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor<ChannelSigner>) -> Result<(), ChannelMonitorUpdateErr>;
}

impl<Signer: Sign, T: Deref, F: Deref, L: Deref> ChainListener for (RefCell<ChannelMonitor<Signer>>, T, F, L)
where
T::Target: BroadcasterInterface,
F::Target: FeeEstimator,
L::Target: Logger,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
self.0.borrow_mut().block_connected(&block.header, &txdata, height, &*self.1, &*self.2, &*self.3);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
self.0.borrow_mut().block_disconnected(header, height, &*self.1, &*self.2, &*self.3);
}
}

const MAX_ALLOC_SIZE: usize = 64*1024;

impl<'a, Signer: Sign, K: KeysInterface<Signer = Signer>> ReadableArgs<&'a K>
Expand Down

0 comments on commit 5e4b218

Please sign in to comment.