Skip to content

Commit

Permalink
WIP: Common ChainListener implementations and example
Browse files Browse the repository at this point in the history
  • Loading branch information
jkczyz committed Feb 22, 2021
1 parent 6be8ccb commit a9aeeb2
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 26 deletions.
172 changes: 166 additions & 6 deletions lightning-block-sync/src/init.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ use bitcoin::blockdata::block::{Block, BlockHeader};
use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;

use lightning::chain;
use lightning::chain::chainmonitor::ChainMonitor;
use lightning::chain::chaininterface;
use lightning::chain::channelmonitor;
use lightning::chain::channelmonitor::ChannelMonitor;
use lightning::chain::keysinterface;
use lightning::ln::channelmanager::ChannelManager;
use lightning::util::logger;

use std::cell::RefCell;
use std::ops::Deref;

/// Performs a one-time sync of chain listeners using a single *trusted* block source, bringing each
/// listener's view of the chain from its paired block hash to `block_source`'s best chain tip.
///
Expand All @@ -13,7 +25,88 @@ use bitcoin::network::constants::Network;
/// paired with.
///
/// Useful during startup to bring the [`ChannelManager`] and each [`ChannelMonitor`] in sync before
/// switching to [`SpvClient`].
/// switching to [`SpvClient`]. For example:
///
/// ```
/// use bitcoin::hash_types::BlockHash;
/// use bitcoin::network::constants::Network;
///
/// use lightning::chain;
/// use lightning::chain::Watch;
/// use lightning::chain::chainmonitor::ChainMonitor;
/// use lightning::chain::channelmonitor;
/// use lightning::chain::channelmonitor::ChannelMonitor;
/// use lightning::chain::chaininterface::BroadcasterInterface;
/// use lightning::chain::chaininterface::FeeEstimator;
/// use lightning::chain::keysinterface;
/// use lightning::chain::keysinterface::KeysInterface;
/// use lightning::ln::channelmanager::ChannelManager;
/// use lightning::ln::channelmanager::ChannelManagerReadArgs;
/// use lightning::util::config::UserConfig;
/// use lightning::util::logger::Logger;
/// use lightning::util::ser::ReadableArgs;
///
/// use lightning_block_sync::*;
///
/// use std::cell::RefCell;
/// use std::io::Cursor;
///
/// async fn init_sync<
/// B: BlockSource,
/// K: KeysInterface<Signer = S>,
/// S: keysinterface::Sign,
/// T: BroadcasterInterface,
/// F: FeeEstimator,
/// L: Logger,
/// C: chain::Filter,
/// P: channelmonitor::Persist<S>,
/// >(
/// block_source: &mut B,
/// chain_monitor: &ChainMonitor<S, &C, &T, &F, &L, &P>,
/// config: UserConfig,
/// keys_manager: &K,
/// tx_broadcaster: &T,
/// fee_estimator: &F,
/// logger: &L,
/// persister: &P,
/// ) {
/// let serialized_monitor = "...";
/// let (monitor_block_hash, mut monitor) = <(BlockHash, ChannelMonitor<S>)>::read(
/// &mut Cursor::new(&serialized_monitor), keys_manager).unwrap();
///
/// let serialized_manager = "...";
/// let (manager_block_hash, mut manager) = {
/// let read_args = ChannelManagerReadArgs::new(
/// keys_manager,
/// fee_estimator,
/// chain_monitor,
/// tx_broadcaster,
/// logger,
/// config,
/// vec![&mut monitor],
/// );
/// <(BlockHash, ChannelManager<S, &ChainMonitor<S, &C, &T, &F, &L, &P>, &T, &K, &F, &L>)>::read(
/// &mut Cursor::new(&serialized_manager), read_args).unwrap()
/// };
///
/// let mut cache = UnboundedCache::new();
/// let mut monitor_listener = (RefCell::new(monitor), tx_broadcaster, fee_estimator, logger);
/// let mut manager_listener = &manager;
/// let listeners = vec![
/// (monitor_block_hash, &mut monitor_listener as &mut dyn ChainListener),
/// (manager_block_hash, &mut manager_listener as &mut dyn ChainListener),
/// ];
/// let chain_tip =
/// init::sync_listeners(block_source, Network::Bitcoin, &mut cache, listeners).await.unwrap();
///
/// let monitor = monitor_listener.0.into_inner();
/// chain_monitor.watch_channel(monitor.get_funding_txo().0, monitor);
///
/// let chain_poller = poll::ChainPoller::new(block_source, Network::Bitcoin);
/// let chain_listener = (&chain_monitor, &manager_listener);
/// let spv_client = SpvClient::new(chain_tip, chain_poller, &mut cache, chain_listener);
/// }
/// ```
///
/// [`SpvClient`]: ../struct.SpvClient.html
/// [`ChannelManager`]: ../../lightning/ln/channelmanager/struct.ChannelManager.html
Expand Down Expand Up @@ -104,11 +197,11 @@ impl<'a, C: Cache> Cache for ReadOnlyCache<'a, C> {
struct DynamicChainListener<'a>(&'a mut dyn ChainListener);

impl<'a> ChainListener for DynamicChainListener<'a> {
fn block_connected(&mut self, _block: &Block, _height: u32) {
fn block_connected(&self, _block: &Block, _height: u32) {
unreachable!()
}

fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
self.0.block_disconnected(header, height)
}
}
Expand All @@ -117,19 +210,86 @@ impl<'a> ChainListener for DynamicChainListener<'a> {
struct ChainListenerSet<'a>(Vec<(u32, &'a mut dyn ChainListener)>);

impl<'a> ChainListener for ChainListenerSet<'a> {
fn block_connected(&mut self, block: &Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter_mut() {
fn block_connected(&self, block: &Block, height: u32) {
for (starting_height, chain_listener) in self.0.iter() {
if height > *starting_height {
chain_listener.block_connected(block, height);
}
}
}

fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {
fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {
unreachable!()
}
}

impl<S, B: Deref, F: Deref, L: Deref> ChainListener for (RefCell<ChannelMonitor<S>>, B, F, L)
where
S: keysinterface::Sign,
B::Target: chaininterface::BroadcasterInterface,
F::Target: chaininterface::FeeEstimator,
L::Target: logger::Logger,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
self.0.borrow_mut().block_connected(&block.header, &txdata, height, &*self.1, &*self.2, &*self.3);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
self.0.borrow_mut().block_disconnected(header, height, &*self.1, &*self.2, &*self.3);
}
}

impl<S, M: Deref, B: Deref, K: Deref, F: Deref, L: Deref> ChainListener for &ChannelManager<S, M, B, K, F, L>
where
S: keysinterface::Sign,
M::Target: chain::Watch<S>,
B::Target: chaininterface::BroadcasterInterface,
K::Target: keysinterface::KeysInterface<Signer = S>,
F::Target: chaininterface::FeeEstimator,
L::Target: logger::Logger,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
ChannelManager::block_connected(self, &block.header, &txdata, height);
}

fn block_disconnected(&self, header: &BlockHeader, _height: u32) {
ChannelManager::block_disconnected(self, header);
}
}

impl<S, C: Deref, T: Deref, F: Deref, L: Deref, P: Deref> ChainListener for &ChainMonitor<S, C, T, F, L, P>
where
S: keysinterface::Sign,
C::Target: chain::Filter,
T::Target: chaininterface::BroadcasterInterface,
F::Target: chaininterface::FeeEstimator,
L::Target: logger::Logger,
P::Target: channelmonitor::Persist<S>,
{
fn block_connected(&self, block: &Block, height: u32) {
let txdata: Vec<_> = block.txdata.iter().enumerate().collect();
ChainMonitor::block_connected(self, &block.header, &txdata, height);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
ChainMonitor::block_disconnected(self, header, height);
}
}

impl<T: ChainListener, U: ChainListener> ChainListener for (&T, &U) {
fn block_connected(&self, block: &Block, height: u32) {
self.0.block_connected(block, height);
self.1.block_connected(block, height);
}

fn block_disconnected(&self, header: &BlockHeader, height: u32) {
self.0.block_disconnected(header, height);
self.1.block_disconnected(header, height);
}
}

#[cfg(test)]
mod tests {
use crate::test_utils::{Blockchain, MockChainListener};
Expand Down
4 changes: 2 additions & 2 deletions lightning-block-sync/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,10 +167,10 @@ pub struct SpvClient<'a, P: Poll, C: Cache, L: ChainListener> {
/// Used when needing to replay chain data upon startup or as new chain events occur.
pub trait ChainListener {
/// Notifies the listener that a block was added at the given height.
fn block_connected(&mut self, block: &Block, height: u32);
fn block_connected(&self, block: &Block, height: u32);

/// Notifies the listener that a block was removed at the given height.
fn block_disconnected(&mut self, header: &BlockHeader, height: u32);
fn block_disconnected(&self, header: &BlockHeader, height: u32);
}

/// The `Cache` trait defines behavior for managing a block header cache, where block headers are
Expand Down
41 changes: 23 additions & 18 deletions lightning-block-sync/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use bitcoin::hash_types::BlockHash;
use bitcoin::network::constants::Network;
use bitcoin::util::uint::Uint256;

use std::cell::RefCell;
use std::collections::VecDeque;

#[derive(Default)]
Expand Down Expand Up @@ -163,37 +164,37 @@ impl BlockSource for Blockchain {
pub struct NullChainListener;

impl ChainListener for NullChainListener {
fn block_connected(&mut self, _block: &Block, _height: u32) {}
fn block_disconnected(&mut self, _header: &BlockHeader, _height: u32) {}
fn block_connected(&self, _block: &Block, _height: u32) {}
fn block_disconnected(&self, _header: &BlockHeader, _height: u32) {}
}

pub struct MockChainListener {
expected_blocks_connected: VecDeque<BlockHeaderData>,
expected_blocks_disconnected: VecDeque<BlockHeaderData>,
expected_blocks_connected: RefCell<VecDeque<BlockHeaderData>>,
expected_blocks_disconnected: RefCell<VecDeque<BlockHeaderData>>,
}

impl MockChainListener {
pub fn new() -> Self {
Self {
expected_blocks_connected: VecDeque::new(),
expected_blocks_disconnected: VecDeque::new(),
expected_blocks_connected: RefCell::new(VecDeque::new()),
expected_blocks_disconnected: RefCell::new(VecDeque::new()),
}
}

pub fn expect_block_connected(mut self, block: BlockHeaderData) -> Self {
self.expected_blocks_connected.push_back(block);
pub fn expect_block_connected(self, block: BlockHeaderData) -> Self {
self.expected_blocks_connected.borrow_mut().push_back(block);
self
}

pub fn expect_block_disconnected(mut self, block: BlockHeaderData) -> Self {
self.expected_blocks_disconnected.push_back(block);
pub fn expect_block_disconnected(self, block: BlockHeaderData) -> Self {
self.expected_blocks_disconnected.borrow_mut().push_back(block);
self
}
}

impl ChainListener for MockChainListener {
fn block_connected(&mut self, block: &Block, height: u32) {
match self.expected_blocks_connected.pop_front() {
fn block_connected(&self, block: &Block, height: u32) {
match self.expected_blocks_connected.borrow_mut().pop_front() {
None => {
panic!("Unexpected block connected: {:?}", block.block_hash());
},
Expand All @@ -204,8 +205,8 @@ impl ChainListener for MockChainListener {
}
}

fn block_disconnected(&mut self, header: &BlockHeader, height: u32) {
match self.expected_blocks_disconnected.pop_front() {
fn block_disconnected(&self, header: &BlockHeader, height: u32) {
match self.expected_blocks_disconnected.borrow_mut().pop_front() {
None => {
panic!("Unexpected block disconnected: {:?}", header.block_hash());
},
Expand All @@ -222,11 +223,15 @@ impl Drop for MockChainListener {
if std::thread::panicking() {
return;
}
if !self.expected_blocks_connected.is_empty() {
panic!("Expected blocks connected: {:?}", self.expected_blocks_connected);

let expected_blocks_connected = self.expected_blocks_connected.borrow();
if !expected_blocks_connected.is_empty() {
panic!("Expected blocks connected: {:?}", expected_blocks_connected);
}
if !self.expected_blocks_disconnected.is_empty() {
panic!("Expected blocks disconnected: {:?}", self.expected_blocks_disconnected);

let expected_blocks_disconnected = self.expected_blocks_disconnected.borrow();
if !expected_blocks_disconnected.is_empty() {
panic!("Expected blocks disconnected: {:?}", expected_blocks_disconnected);
}
}
}

0 comments on commit a9aeeb2

Please sign in to comment.