diff --git a/Cargo.toml b/Cargo.toml index 222de893aa4..f4df9a31707 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,9 @@ [workspace] members = [ - "lightning", - "lightning-net-tokio", + "lightning", + "lightning-net-tokio", + "lightning-data-persister", ] # Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it diff --git a/fuzz/src/chanmon_consistency.rs b/fuzz/src/chanmon_consistency.rs index 3191eb27308..9286e438cfb 100644 --- a/fuzz/src/chanmon_consistency.rs +++ b/fuzz/src/chanmon_consistency.rs @@ -34,6 +34,7 @@ use lightning::chain::transaction::OutPoint; use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil,ChainWatchInterface}; use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys}; use lightning::ln::channelmonitor; +use lightning::ln::data_persister::ChannelDataPersister; use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent}; use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, ChannelManagerReadArgs}; use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures}; @@ -48,6 +49,7 @@ use lightning::routing::router::{Route, RouteHop}; use utils::test_logger; +use utils::test_data_persister::TestChanDataPersister; use bitcoin::secp256k1::key::{PublicKey,SecretKey}; use bitcoin::secp256k1::Secp256k1; @@ -84,7 +86,7 @@ impl Writer for VecWriter { struct TestChannelMonitor { pub logger: Arc, - pub simple_monitor: Arc, Arc, Arc, Arc>>, + pub simple_monitor: Arc, Arc, Arc, Arc, Arc>>>, pub update_ret: Mutex>, // If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization // logic will automatically force-close our channels for us (as we don't have an up-to-date @@ -95,9 +97,9 @@ struct TestChannelMonitor { pub should_update_manager: atomic::AtomicBool, } impl TestChannelMonitor { - pub fn new(chain_monitor: Arc, broadcaster: Arc, logger: Arc, feeest: Arc) -> Self { + pub fn new(chain_monitor: Arc, broadcaster: Arc, logger: Arc, feeest: Arc, data_persister: Arc>) -> Self { Self { - simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)), + simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest, data_persister)), logger, update_ret: Mutex::new(Ok(())), latest_monitors: Mutex::new(HashMap::new()), @@ -193,7 +195,8 @@ pub fn do_test(data: &[u8], out: Out) { ($node_id: expr) => { { let logger: Arc = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone())); let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin)); - let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone())); + let data_persister = Arc::new(TestChanDataPersister{}); + let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone(), data_persister.clone())); let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) }); let mut config = UserConfig::default(); @@ -209,7 +212,8 @@ pub fn do_test(data: &[u8], out: Out) { ($ser: expr, $node_id: expr, $old_monitors: expr) => { { let logger: Arc = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone())); let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin)); - let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone())); + let data_persister = Arc::new(TestChanDataPersister{}); + let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone(), data_persister.clone())); let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) }); let mut config = UserConfig::default(); diff --git a/fuzz/src/full_stack.rs b/fuzz/src/full_stack.rs index 543357d39bc..9cfe11d6e21 100644 --- a/fuzz/src/full_stack.rs +++ b/fuzz/src/full_stack.rs @@ -30,6 +30,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,C use lightning::chain::transaction::OutPoint; use lightning::chain::keysinterface::{InMemoryChannelKeys, KeysInterface}; use lightning::ln::channelmonitor; +use lightning::ln::data_persister::ChannelDataPersister; use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret}; use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor}; use lightning::routing::router::get_route; @@ -40,6 +41,7 @@ use lightning::util::logger::Logger; use lightning::util::config::UserConfig; use utils::test_logger; +use utils::test_data_persister::TestChanDataPersister; use bitcoin::secp256k1::key::{PublicKey,SecretKey}; use bitcoin::secp256k1::Secp256k1; @@ -145,14 +147,14 @@ impl<'a> std::hash::Hash for Peer<'a> { type ChannelMan = ChannelManager< EnforcingChannelKeys, - Arc, Arc, Arc, Arc>>, + Arc, Arc, Arc, Arc, Arc>>>, Arc, Arc, Arc, Arc>; type PeerMan<'a> = PeerManager, Arc, Arc, Arc>>, Arc>; struct MoneyLossDetector<'a> { manager: Arc, monitor: Arc, Arc, Arc, Arc>>, + OutPoint, EnforcingChannelKeys, Arc, Arc, Arc, Arc, Arc>>>, handler: PeerMan<'a>, peers: &'a RefCell<[bool; 256]>, @@ -166,7 +168,7 @@ struct MoneyLossDetector<'a> { impl<'a> MoneyLossDetector<'a> { pub fn new(peers: &'a RefCell<[bool; 256]>, manager: Arc, - monitor: Arc, Arc, Arc, Arc>>, + monitor: Arc, Arc, Arc, Arc, Arc>>>, handler: PeerMan<'a>) -> Self { MoneyLossDetector { manager, @@ -337,7 +339,8 @@ pub fn do_test(data: &[u8], logger: &Arc) { let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin)); let broadcast = Arc::new(TestBroadcaster{}); - let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone())); + let data_persister: Arc> = Arc::new(TestChanDataPersister{}); + let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone(), data_persister.clone())); let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) }); let mut config = UserConfig::default(); diff --git a/fuzz/src/utils/mod.rs b/fuzz/src/utils/mod.rs index bb5b00a5b54..4d11aa9b184 100644 --- a/fuzz/src/utils/mod.rs +++ b/fuzz/src/utils/mod.rs @@ -8,3 +8,4 @@ // licenses. pub mod test_logger; +pub mod test_data_persister; diff --git a/fuzz/src/utils/test_data_persister.rs b/fuzz/src/utils/test_data_persister.rs new file mode 100644 index 00000000000..952227c212d --- /dev/null +++ b/fuzz/src/utils/test_data_persister.rs @@ -0,0 +1,23 @@ +use lightning::ln::data_persister::ChannelDataPersister; +use lightning::ln::channelmonitor; +use lightning::chain::transaction::OutPoint; +use lightning::util::enforcing_trait_impls::EnforcingChannelKeys; + +use std::collections::HashMap; + +pub struct TestChanDataPersister {} +impl ChannelDataPersister for TestChanDataPersister { + type Keys = EnforcingChannelKeys; + + fn persist_channel_data(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + Ok(()) + } + + fn update_channel_data(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + Ok(()) + } + + fn load_channel_data(&self) -> Result>, channelmonitor::ChannelMonitorUpdateErr> { + Ok(HashMap::new()) + } +} diff --git a/lightning-data-persister/Cargo.toml b/lightning-data-persister/Cargo.toml new file mode 100644 index 00000000000..fe6d56089b0 --- /dev/null +++ b/lightning-data-persister/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "lightning-data-persister" +version = "0.0.1" +authors = ["Valentine Wallace"] +license = "Apache-2.0" +edition = "2018" +description = """ +Utilities to manage channel data persistence and retrieval. +""" + +[dependencies] +bitcoin = "0.23" +lightning = { version = "0.0.11", path = "../lightning" } diff --git a/lightning-data-persister/src/lib.rs b/lightning-data-persister/src/lib.rs new file mode 100644 index 00000000000..503ef71090d --- /dev/null +++ b/lightning-data-persister/src/lib.rs @@ -0,0 +1,124 @@ +use lightning::chain::keysinterface::ChannelKeys; +use lightning::ln::data_persister::ChannelDataPersister; +use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr}; +use lightning::util::ser::{Writeable, Readable}; +use lightning::chain::transaction::OutPoint; +use bitcoin::hash_types::{BlockHash, Txid}; +use bitcoin::hashes::hex::{ToHex, FromHex}; +use std::collections::HashMap; +use std::fs; +use std::io::{Error, ErrorKind, Cursor}; +use std::marker::PhantomData; + +/// LinuxPersister can persist channel data on disk on Linux machines, where +/// each channel's data is stored in a file named after its outpoint. +pub struct LinuxPersister { + path_to_channel_data: String, + phantom: PhantomData, // TODO: is there a way around this? +} + +impl LinuxPersister { + /// Initialize a new LinuxPersister and set the path to the individual channels' + /// files. + pub fn new(path_to_channel_data: String) -> Self { + return Self { + path_to_channel_data, + phantom: PhantomData, + } + } + + fn get_full_filepath(&self, funding_txo: OutPoint) -> String { + format!("{}/{}_{}", self.path_to_channel_data, funding_txo.txid.to_hex(), funding_txo.index) + } + + fn write_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor) -> std::io::Result<()> { + // Do a crazy dance with lots of fsync()s to be overly cautious here... + // We never want to end up in a state where we've lost the old data, or end up using the + // old data on power loss after we've returned + // Note that this actually *isn't* enough (at least on Linux)! We need to fsync an fd with + // the containing dir, but Rust doesn't let us do that directly, sadly. TODO: Fix this with + // the libc crate! + let filename = self.get_full_filepath(funding_txo); + let tmp_filename = filename.clone() + ".tmp"; + + { + let mut f = fs::File::create(&tmp_filename)?; + monitor.write_for_disk(&mut f)?; + f.sync_all()?; + } + // We don't need to create a backup if didn't already have the file, but in any other case + // try to create the backup and expect failure on fs::copy() if eg there's a perms issue. + let need_bk = match fs::metadata(&filename) { + Ok(data) => { + if !data.is_file() { return Err(Error::new(ErrorKind::InvalidInput, "Filename given was not a file")); } + true + }, + Err(e) => match e.kind() { + std::io::ErrorKind::NotFound => false, + _ => true, + } + }; + let bk_filename = filename.clone() + ".bk"; + if need_bk { + fs::copy(&filename, &bk_filename)?; + { + let f = fs::File::open(&bk_filename)?; + f.sync_all()?; + } + } + fs::rename(&tmp_filename, &filename)?; + { + let f = fs::File::open(&filename)?; + f.sync_all()?; + } + if need_bk { + fs::remove_file(&bk_filename)?; + } + Ok(()) + } +} + +impl ChannelDataPersister for LinuxPersister { + type Keys = ChanSigner; + + fn persist_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { + match self.write_channel_data(funding_txo, monitor) { + Ok(_) => Ok(()), + Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure) + } + } + + fn update_channel_data(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { + match self.write_channel_data(funding_txo, monitor) { + Ok(_) => Ok(()), + Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure) + } + } + + fn load_channel_data(&self) -> Result>, ChannelMonitorUpdateErr> { + let mut res = HashMap::new(); + for file_option in fs::read_dir(&self.path_to_channel_data).unwrap() { + let mut loaded = false; + let file = file_option.unwrap(); + if let Some(filename) = file.file_name().to_str() { + if filename.is_ascii() && filename.len() > 65 { + if let Ok(txid) = Txid::from_hex(filename.split_at(64).0) { + if let Ok(index) = filename.split_at(65).1.split('.').next().unwrap().parse() { + if let Ok(contents) = fs::read(&file.path()) { + if let Ok((_, loaded_monitor)) = <(BlockHash, ChannelMonitor)>::read(&mut Cursor::new(&contents)) { + res.insert(OutPoint { txid, index }, loaded_monitor); + loaded = true; + } + } + } + } + } + } + if !loaded { + // TODO(val): this should prob error not just print something + println!("WARNING: Failed to read one of the channel monitor storage files! Check perms!"); + } + } + Ok(res) + } +} diff --git a/lightning-net-tokio/src/lib.rs b/lightning-net-tokio/src/lib.rs index a7818d7b85c..db099b5fe3c 100644 --- a/lightning-net-tokio/src/lib.rs +++ b/lightning-net-tokio/src/lib.rs @@ -35,7 +35,8 @@ //! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator; //! type Logger = dyn lightning::util::logger::Logger; //! type ChainWatchInterface = dyn lightning::chain::chaininterface::ChainWatchInterface; -//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor, Arc, Arc, Arc>; +//! type DataPersister = dyn lightning::ln::data_persister::ChannelDataPersister; +//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor, Arc, Arc, Arc, Arc>; //! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager; //! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager; //! diff --git a/lightning/src/ln/channelmonitor.rs b/lightning/src/ln/channelmonitor.rs index 570982fb70c..b3e98663bb6 100644 --- a/lightning/src/ln/channelmonitor.rs +++ b/lightning/src/ln/channelmonitor.rs @@ -41,6 +41,7 @@ use ln::chan_utils; use ln::chan_utils::{CounterpartyCommitmentSecrets, HTLCOutputInCommitment, LocalCommitmentTransaction, HTLCType}; use ln::channelmanager::{HTLCSource, PaymentPreimage, PaymentHash}; use ln::onchaintx::{OnchainTxHandler, InputDescriptors}; +use ln::data_persister::ChannelDataPersister; use chain::chaininterface::{ChainListener, ChainWatchInterface, BroadcasterInterface, FeeEstimator}; use chain::transaction::OutPoint; use chain::keysinterface::{SpendableOutputDescriptor, ChannelKeys}; @@ -181,26 +182,28 @@ impl_writeable!(HTLCUpdate, 0, { payment_hash, payment_preimage, source }); /// /// If you're using this for local monitoring of your own channels, you probably want to use /// `OutPoint` as the key, which will give you a ManyChannelMonitor implementation. -pub struct SimpleManyChannelMonitor - where T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, - C::Target: ChainWatchInterface, +pub struct SimpleManyChannelMonitor +where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, + D::Target: ChannelDataPersister, { /// The monitors pub monitors: Mutex>>, chain_monitor: C, broadcaster: T, logger: L, - fee_estimator: F + fee_estimator: F, + data_persister: D, } -impl - ChainListener for SimpleManyChannelMonitor - where T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, - C::Target: ChainWatchInterface, +impl ChainListener for SimpleManyChannelMonitor +where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, + D::Target: ChannelDataPersister, { fn block_connected(&self, header: &BlockHeader, height: u32, txn_matched: &[&Transaction], _indexes_of_txn_matched: &[usize]) { let block_hash = header.bitcoin_hash(); @@ -227,21 +230,23 @@ impl SimpleManyChannelMonitor - where T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, - C::Target: ChainWatchInterface, +impl SimpleManyChannelMonitor +where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, + D::Target: ChannelDataPersister, { /// Creates a new object which can be used to monitor several channels given the chain /// interface with which to register to receive notifications. - pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F) -> SimpleManyChannelMonitor { + pub fn new(chain_monitor: C, broadcaster: T, logger: L, feeest: F, data_persister: D) -> SimpleManyChannelMonitor { let res = SimpleManyChannelMonitor { monitors: Mutex::new(HashMap::new()), chain_monitor, broadcaster, logger, fee_estimator: feeest, + data_persister, }; res @@ -282,22 +287,35 @@ impl ManyChannelMonitor for SimpleManyChannelMonitor - where T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, - C::Target: ChainWatchInterface, +impl ManyChannelMonitor for SimpleManyChannelMonitor +where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, + D::Target: ChannelDataPersister, { type Keys = ChanSigner; fn add_monitor(&self, funding_txo: OutPoint, monitor: ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr> { match self.add_monitor_by_key(funding_txo, monitor) { - Ok(_) => Ok(()), - Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure), + Ok(_) => {}, + Err(_) => { return Err(ChannelMonitorUpdateErr::PermanentFailure); }, } + + let monitors = self.monitors.lock().unwrap(); + self.data_persister.persist_channel_data(funding_txo, monitors.get(&funding_txo).unwrap()) } fn update_monitor(&self, funding_txo: OutPoint, update: ChannelMonitorUpdate) -> Result<(), ChannelMonitorUpdateErr> { + { + let monitors = self.monitors.lock().unwrap(); + match monitors.get(&funding_txo) { + Some(monitor) => { + self.data_persister.update_channel_data(funding_txo, &update, monitor)?; + }, + None => { return Err(ChannelMonitorUpdateErr::PermanentFailure); }, + } + } match self.update_monitor_by_key(funding_txo, update) { Ok(_) => Ok(()), Err(_) => Err(ChannelMonitorUpdateErr::PermanentFailure), @@ -313,11 +331,12 @@ impl events::EventsProvider for SimpleManyChannelMonitor - where T::Target: BroadcasterInterface, - F::Target: FeeEstimator, - L::Target: Logger, - C::Target: ChainWatchInterface, +impl events::EventsProvider for SimpleManyChannelMonitor +where T::Target: BroadcasterInterface, + F::Target: FeeEstimator, + L::Target: Logger, + C::Target: ChainWatchInterface, + D::Target: ChannelDataPersister, { fn get_and_clear_pending_events(&self) -> Vec { let mut pending_events = Vec::new(); diff --git a/lightning/src/ln/data_persister.rs b/lightning/src/ln/data_persister.rs new file mode 100644 index 00000000000..9bd4dfbf23b --- /dev/null +++ b/lightning/src/ln/data_persister.rs @@ -0,0 +1,27 @@ +//! Logic for persisting data from ChannelMonitors on-disk. Per-platform data +//! persisters are separated into the lightning-persist-data crate. +//! These objects mainly interface with the SimpleManyChannelMonitor when a +//! channel monitor is added or updated, and when they are all synced on startup. +use ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr}; +use chain::keysinterface::ChannelKeys; +use chain::transaction::OutPoint; +use std::collections::HashMap; + +/// ChannelDataPersister is responsible for persisting channel data: this could +/// mean writing once to disk, and/or uploading to several backup services. +pub trait ChannelDataPersister: Send + Sync { + /// The concrete type which signs for transactions and provides access to our channel public + /// keys. + type Keys: ChannelKeys; + + /// Persist one channel's data. All backups should agree on a channel's state. + /// The data can be stored with any file name/path, but the identifier provided + /// is the channel's outpoint. + fn persist_channel_data(&self, funding_txo: OutPoint, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + + /// Update one channel's data. + fn update_channel_data(&self, funding_txo: OutPoint, update: &ChannelMonitorUpdate, data: &ChannelMonitor) -> Result<(), ChannelMonitorUpdateErr>; + + /// Load the data for all channels. Generally only called on startup. + fn load_channel_data(&self) -> Result>, ChannelMonitorUpdateErr>; +} diff --git a/lightning/src/ln/functional_test_utils.rs b/lightning/src/ln/functional_test_utils.rs index b3d5e8c9e9d..783cea50a18 100644 --- a/lightning/src/ln/functional_test_utils.rs +++ b/lightning/src/ln/functional_test_utils.rs @@ -69,6 +69,7 @@ pub struct TestChanMonCfg { pub tx_broadcaster: test_utils::TestBroadcaster, pub fee_estimator: test_utils::TestFeeEstimator, pub chain_monitor: chaininterface::ChainWatchInterfaceUtil, + pub data_persister: test_utils::TestChanDataPersister, pub logger: test_utils::TestLogger, } @@ -174,7 +175,8 @@ impl<'a, 'b, 'c> Drop for Node<'a, 'b, 'c> { } let chain_watch = chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet); - let channel_monitor = test_utils::TestChannelMonitor::new(&chain_watch, self.tx_broadcaster.clone(), &self.logger, &feeest); + let data_persister = test_utils::TestChanDataPersister{}; + let channel_monitor = test_utils::TestChannelMonitor::new(&chain_watch, self.tx_broadcaster.clone(), &self.logger, &feeest, &data_persister); for deserialized_monitor in deserialized_monitors.drain(..) { if let Err(_) = channel_monitor.add_monitor(deserialized_monitor.get_funding_txo().0, deserialized_monitor) { panic!(); @@ -1070,7 +1072,8 @@ pub fn create_chanmon_cfgs(node_count: usize) -> Vec { let fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; let chain_monitor = chaininterface::ChainWatchInterfaceUtil::new(Network::Testnet); let logger = test_utils::TestLogger::with_id(format!("node {}", i)); - chan_mon_cfgs.push(TestChanMonCfg{ tx_broadcaster, fee_estimator, chain_monitor, logger }); + let data_persister = test_utils::TestChanDataPersister{}; + chan_mon_cfgs.push(TestChanMonCfg{ tx_broadcaster, fee_estimator, chain_monitor, data_persister, logger }); } chan_mon_cfgs @@ -1082,7 +1085,7 @@ pub fn create_node_cfgs<'a>(node_count: usize, chanmon_cfgs: &'a Vec; @@ -4299,7 +4300,8 @@ fn test_no_txn_manager_serialize_deserialize() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + data_persister = test_utils::TestChanDataPersister{}; + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator, &data_persister); nodes[0].chan_monitor = &new_chan_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4358,6 +4360,7 @@ fn test_manager_serialize_deserialize_events() { let node_cfgs = create_node_cfgs(2, &chanmon_cfgs); let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let fee_estimator: test_utils::TestFeeEstimator; + let data_persister: test_utils::TestChanDataPersister; let logger: test_utils::TestLogger; let new_chan_monitor: test_utils::TestChannelMonitor; let keys_manager: test_utils::TestKeysInterface; @@ -4407,7 +4410,8 @@ fn test_manager_serialize_deserialize_events() { fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; logger = test_utils::TestLogger::new(); - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + data_persister = test_utils::TestChanDataPersister{}; + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator, &data_persister); nodes[0].chan_monitor = &new_chan_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4481,6 +4485,7 @@ fn test_simple_manager_serialize_deserialize() { let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]); let logger: test_utils::TestLogger; let fee_estimator: test_utils::TestFeeEstimator; + let data_persister: test_utils::TestChanDataPersister; let new_chan_monitor: test_utils::TestChannelMonitor; let keys_manager: test_utils::TestKeysInterface; let nodes_0_deserialized: ChannelManager; @@ -4498,7 +4503,8 @@ fn test_simple_manager_serialize_deserialize() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + data_persister = test_utils::TestChanDataPersister{}; + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator, &data_persister); nodes[0].chan_monitor = &new_chan_monitor; let mut chan_0_monitor_read = &chan_0_monitor_serialized.0[..]; let (_, mut chan_0_monitor) = <(BlockHash, ChannelMonitor)>::read(&mut chan_0_monitor_read).unwrap(); @@ -4540,6 +4546,7 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { let node_chanmgrs = create_node_chanmgrs(4, &node_cfgs, &[None, None, None, None]); let logger: test_utils::TestLogger; let fee_estimator: test_utils::TestFeeEstimator; + let data_persister: test_utils::TestChanDataPersister; let new_chan_monitor: test_utils::TestChannelMonitor; let keys_manager: test_utils::TestKeysInterface; let nodes_0_deserialized: ChannelManager; @@ -4576,7 +4583,8 @@ fn test_manager_serialize_deserialize_inconsistent_monitor() { logger = test_utils::TestLogger::new(); fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; - new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator); + data_persister = test_utils::TestChanDataPersister{}; + new_chan_monitor = test_utils::TestChannelMonitor::new(nodes[0].chain_monitor.clone(), nodes[0].tx_broadcaster.clone(), &logger, &fee_estimator, &data_persister); nodes[0].chan_monitor = &new_chan_monitor; let mut node_0_stale_monitors = Vec::new(); @@ -5687,7 +5695,7 @@ fn test_key_derivation_params() { // We manually create the node configuration to backup the seed. let seed = [42; 32]; let keys_manager = test_utils::TestKeysInterface::new(&seed, Network::Testnet); - let chan_monitor = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator); + let chan_monitor = test_utils::TestChannelMonitor::new(&chanmon_cfgs[0].chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &chanmon_cfgs[0].logger, &chanmon_cfgs[0].fee_estimator, &chanmon_cfgs[0].data_persister); let node = NodeCfg { chain_monitor: &chanmon_cfgs[0].chain_monitor, logger: &chanmon_cfgs[0].logger, tx_broadcaster: &chanmon_cfgs[0].tx_broadcaster, fee_estimator: &chanmon_cfgs[0].fee_estimator, chan_monitor, keys_manager, node_seed: seed }; let mut node_cfgs = create_node_cfgs(3, &chanmon_cfgs); node_cfgs.remove(0); @@ -7849,6 +7857,7 @@ fn test_data_loss_protect() { // * we close channel in case of detecting other being fallen behind // * we are able to claim our own outputs thanks to to_remote being static let keys_manager; + let data_persister; let logger; let fee_estimator; let tx_broadcaster; @@ -7880,7 +7889,8 @@ fn test_data_loss_protect() { tx_broadcaster = test_utils::TestBroadcaster{txn_broadcasted: Mutex::new(Vec::new())}; fee_estimator = test_utils::TestFeeEstimator { sat_per_kw: 253 }; keys_manager = test_utils::TestKeysInterface::new(&nodes[0].node_seed, Network::Testnet); - monitor = test_utils::TestChannelMonitor::new(&chain_monitor, &tx_broadcaster, &logger, &fee_estimator); + data_persister = test_utils::TestChanDataPersister{}; + monitor = test_utils::TestChannelMonitor::new(&chain_monitor, &tx_broadcaster, &logger, &fee_estimator, &data_persister); node_state_0 = { let mut channel_monitors = HashMap::new(); channel_monitors.insert(OutPoint { txid: chan.3.txid(), index: 0 }, &mut chan_monitor); @@ -8712,7 +8722,7 @@ fn test_update_err_monitor_lockdown() { let new_monitor = <(BlockHash, channelmonitor::ChannelMonitor)>::read( &mut ::std::io::Cursor::new(&w.0)).unwrap().1; assert!(new_monitor == *monitor); - let watchtower = test_utils::TestChannelMonitor::new(&chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator); + let watchtower = test_utils::TestChannelMonitor::new(&chain_monitor, &chanmon_cfgs[0].tx_broadcaster, &logger, &chanmon_cfgs[0].fee_estimator, &chanmon_cfgs[0].data_persister); assert!(watchtower.add_monitor(outpoint, new_monitor).is_ok()); watchtower }; diff --git a/lightning/src/ln/mod.rs b/lightning/src/ln/mod.rs index 1062df9ac27..90bdc3a704f 100644 --- a/lightning/src/ln/mod.rs +++ b/lightning/src/ln/mod.rs @@ -20,6 +20,7 @@ pub mod channelmanager; pub mod channelmonitor; +pub mod data_persister; pub mod msgs; pub mod peer_handler; pub mod chan_utils; diff --git a/lightning/src/util/test_utils.rs b/lightning/src/util/test_utils.rs index 78263ecacad..9616fc0bb68 100644 --- a/lightning/src/util/test_utils.rs +++ b/lightning/src/util/test_utils.rs @@ -16,6 +16,7 @@ use ln::features::{ChannelFeatures, InitFeatures}; use ln::msgs; use ln::msgs::OptionalField; use ln::channelmonitor::MonitorEvent; +use ln::data_persister::ChannelDataPersister; use util::enforcing_trait_impls::EnforcingChannelKeys; use util::events; use util::logger::{Logger, Level, Record}; @@ -63,18 +64,18 @@ impl chaininterface::FeeEstimator for TestFeeEstimator { pub struct TestChannelMonitor<'a> { pub added_monitors: Mutex)>>, pub latest_monitor_update_id: Mutex>, - pub simple_monitor: channelmonitor::SimpleManyChannelMonitor, + pub simple_monitor: channelmonitor::SimpleManyChannelMonitor, pub update_ret: Mutex>, // If this is set to Some(), after the next return, we'll always return this until update_ret // is changed: pub next_update_ret: Mutex>>, } impl<'a> TestChannelMonitor<'a> { - pub fn new(chain_monitor: &'a chaininterface::ChainWatchInterface, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator) -> Self { + pub fn new(chain_monitor: &'a chaininterface::ChainWatchInterface, broadcaster: &'a chaininterface::BroadcasterInterface, logger: &'a TestLogger, fee_estimator: &'a TestFeeEstimator, data_persister: &'a TestChanDataPersister) -> Self { Self { added_monitors: Mutex::new(Vec::new()), latest_monitor_update_id: Mutex::new(HashMap::new()), - simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, fee_estimator), + simple_monitor: channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger, fee_estimator, data_persister), update_ret: Mutex::new(Ok(())), next_update_ret: Mutex::new(None), } @@ -134,6 +135,24 @@ impl<'a> channelmonitor::ManyChannelMonitor for TestChannelMonitor<'a> { } } +pub struct TestChanDataPersister {} + +impl ChannelDataPersister for TestChanDataPersister { + type Keys = EnforcingChannelKeys; + + fn persist_channel_data(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + Ok(()) + } + + fn update_channel_data(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> { + Ok(()) + } + + fn load_channel_data(&self) -> Result>, channelmonitor::ChannelMonitorUpdateErr> { + Ok(HashMap::new()) + } +} + pub struct TestBroadcaster { pub txn_broadcasted: Mutex>, }