diff --git a/src/builder.rs b/src/builder.rs
index b64fc23a0..b4a146e7c 100644
--- a/src/builder.rs
+++ b/src/builder.rs
@@ -31,8 +31,8 @@ use lightning::routing::scoring::{
 };
 use lightning::sign::{EntropySource, NodeSigner};
 use lightning::util::persist::{
-    read_channel_monitors, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
-    CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+    KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+    CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
 };
 use lightning::util::ser::ReadableArgs;
 use lightning::util::sweep::OutputSweeper;
@@ -66,7 +66,7 @@ use crate::runtime::Runtime;
 use crate::tx_broadcaster::TransactionBroadcaster;
 use crate::types::{
     ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
-    OnionMessenger, PaymentStore, PeerManager,
+    OnionMessenger, PaymentStore, PeerManager, Persister,
 };
 use crate::wallet::persist::KVStoreWalletPersister;
 use crate::wallet::Wallet;
@@ -75,6 +75,7 @@ use crate::{Node, NodeMetrics};
 const VSS_HARDENED_CHILD_INDEX: u32 = 877;
 const VSS_LNURL_AUTH_HARDENED_CHILD_INDEX: u32 = 138;
 const LSPS_HARDENED_CHILD_INDEX: u32 = 577;
+const PERSISTER_MAX_PENDING_UPDATES: u64 = 100;
 
 #[derive(Debug, Clone)]
 enum ChainDataSourceConfig {
@@ -1317,6 +1318,28 @@ fn build_with_store_internal(
     ));
 
     let peer_storage_key = keys_manager.get_peer_storage_key();
+    let persister = Arc::new(Persister::new(
+        Arc::clone(&kv_store),
+        Arc::clone(&logger),
+        PERSISTER_MAX_PENDING_UPDATES,
+        Arc::clone(&keys_manager),
+        Arc::clone(&keys_manager),
+        Arc::clone(&tx_broadcaster),
+        Arc::clone(&fee_estimator),
+    ));
+
+    // Read ChannelMonitor state from store
+    let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
+        Ok(monitors) => monitors,
+        Err(e) => {
+            if e.kind() == lightning::io::ErrorKind::NotFound {
+                Vec::new()
+            } else {
+                log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
+                return Err(BuildError::ReadFailed);
+            }
+        },
+    };
 
     // Initialize the ChainMonitor
     let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
@@ -1324,7 +1347,7 @@
         Arc::clone(&tx_broadcaster),
         Arc::clone(&logger),
         Arc::clone(&fee_estimator),
-        Arc::clone(&kv_store),
+        Arc::clone(&persister),
         Arc::clone(&keys_manager),
         peer_storage_key,
     ));
@@ -1371,23 +1394,6 @@
         scoring_fee_params,
     ));
 
-    // Read ChannelMonitor state from store
-    let channel_monitors = match read_channel_monitors(
-        Arc::clone(&kv_store),
-        Arc::clone(&keys_manager),
-        Arc::clone(&keys_manager),
-    ) {
-        Ok(monitors) => monitors,
-        Err(e) => {
-            if e.kind() == lightning::io::ErrorKind::NotFound {
-                Vec::new()
-            } else {
-                log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
-                return Err(BuildError::ReadFailed);
-            }
-        },
-    };
-
     let mut user_config = default_user_config(&config);
 
     if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {
diff --git a/src/io/test_utils.rs b/src/io/test_utils.rs
index 22f1a4ea5..8fbf4279d 100644
--- a/src/io/test_utils.rs
+++ b/src/io/test_utils.rs
@@ -11,14 +11,28 @@ use std::path::PathBuf;
 use lightning::events::ClosureReason;
 use lightning::ln::functional_test_utils::{
     connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
-    create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
+    create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg,
 };
-use lightning::util::persist::{read_channel_monitors, KVStoreSync, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+use lightning::util::persist::{
+    KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN,
+};
+
 use lightning::util::test_utils;
 use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};
 
+type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
+    &'a K,
+    &'a test_utils::TestLogger,
+    &'a test_utils::TestKeysInterface,
+    &'a test_utils::TestKeysInterface,
+    &'a test_utils::TestBroadcaster,
+    &'a test_utils::TestFeeEstimator,
+>;
+
+const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
+
 pub(crate) fn random_storage_path() -> PathBuf {
     let mut temp_path = std::env::temp_dir();
     let mut rng = thread_rng();
@@ -77,27 +91,50 @@ pub(crate) fn do_read_write_remove_list_persist(
     assert_eq!(listed_keys.len(), 0);
 }
 
+pub(crate) fn create_persister<'a, K: KVStoreSync + Sync>(
+    store: &'a K, chanmon_cfg: &'a TestChanMonCfg, max_pending_updates: u64,
+) -> TestMonitorUpdatePersister<'a, K> {
+    MonitorUpdatingPersister::new(
+        store,
+        &chanmon_cfg.logger,
+        max_pending_updates,
+        &chanmon_cfg.keys_manager,
+        &chanmon_cfg.keys_manager,
+        &chanmon_cfg.tx_broadcaster,
+        &chanmon_cfg.fee_estimator,
+    )
+}
+
+pub(crate) fn create_chain_monitor<'a, K: KVStoreSync + Sync>(
+    chanmon_cfg: &'a TestChanMonCfg, persister: &'a TestMonitorUpdatePersister<'a, K>,
+) -> test_utils::TestChainMonitor<'a> {
+    test_utils::TestChainMonitor::new(
+        Some(&chanmon_cfg.chain_source),
+        &chanmon_cfg.tx_broadcaster,
+        &chanmon_cfg.logger,
+        &chanmon_cfg.fee_estimator,
+        persister,
+        &chanmon_cfg.keys_manager,
+    )
+}
+
 // Integration-test the given KVStore implementation. Test relaying a few payments and check that
 // the persisted data is updated the appropriate number of times.
 pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
+    // This value is used later to limit how many iterations we perform.
+    let persister_0_max_pending_updates = 7;
+    // Intentionally set this to a smaller value to test a different alignment.
+    let persister_1_max_pending_updates = 3;
+
     let chanmon_cfgs = create_chanmon_cfgs(2);
+
+    let persister_0 = create_persister(store_0, &chanmon_cfgs[0], persister_0_max_pending_updates);
+    let persister_1 = create_persister(store_1, &chanmon_cfgs[1], persister_1_max_pending_updates);
+
+    let chain_mon_0 = create_chain_monitor(&chanmon_cfgs[0], &persister_0);
+    let chain_mon_1 = create_chain_monitor(&chanmon_cfgs[1], &persister_1);
+
     let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-    let chain_mon_0 = test_utils::TestChainMonitor::new(
-        Some(&chanmon_cfgs[0].chain_source),
-        &chanmon_cfgs[0].tx_broadcaster,
-        &chanmon_cfgs[0].logger,
-        &chanmon_cfgs[0].fee_estimator,
-        store_0,
-        node_cfgs[0].keys_manager,
-    );
-    let chain_mon_1 = test_utils::TestChainMonitor::new(
-        Some(&chanmon_cfgs[1].chain_source),
-        &chanmon_cfgs[1].tx_broadcaster,
-        &chanmon_cfgs[1].logger,
-        &chanmon_cfgs[1].fee_estimator,
-        store_1,
-        node_cfgs[1].keys_manager,
-    );
     node_cfgs[0].chain_monitor = chain_mon_0;
     node_cfgs[1].chain_monitor = chain_mon_1;
     let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
@@ -105,26 +142,20 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
 
     // Check that the persisted channel data is empty before any channels are
    // open.
-    let mut persisted_chan_data_0 =
-        read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+    let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
     assert_eq!(persisted_chan_data_0.len(), 0);
-    let mut persisted_chan_data_1 =
-        read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+    let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
     assert_eq!(persisted_chan_data_1.len(), 0);
 
     // Helper to make sure the channel is on the expected update ID.
     macro_rules! check_persisted_data {
         ($expected_update_id: expr) => {
-            persisted_chan_data_0 =
-                read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
-                    .unwrap();
+            persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
             assert_eq!(persisted_chan_data_0.len(), 1);
             for (_, mon) in persisted_chan_data_0.iter() {
                 assert_eq!(mon.get_latest_update_id(), $expected_update_id);
             }
-            persisted_chan_data_1 =
-                read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
-                    .unwrap();
+            persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
             assert_eq!(persisted_chan_data_1.len(), 1);
             for (_, mon) in persisted_chan_data_1.iter() {
                 assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -137,10 +168,29 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
     check_persisted_data!(0);
 
     // Send a few payments and make sure the monitors are updated to the latest.
-    send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
-    check_persisted_data!(5);
-    send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
-    check_persisted_data!(10);
+    let expected_route = &[&nodes[1]][..];
+    send_payment(&nodes[0], expected_route, 8_000_000);
+    check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
+    let expected_route = &[&nodes[0]][..];
+    send_payment(&nodes[1], expected_route, 4_000_000);
+    check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
+
+    // Send a few more payments to try all the alignments of max pending updates with
+    // updates for a payment sent or received.
+    let mut sender = 0;
+    for i in 3..=persister_0_max_pending_updates * 2 {
+        let receiver;
+        if sender == 0 {
+            sender = 1;
+            receiver = 0;
+        } else {
+            sender = 0;
+            receiver = 1;
+        }
+        let expected_route = &[&nodes[receiver]][..];
+        send_payment(&nodes[sender], expected_route, 21_000);
+        check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
+    }
 
     // Force close because cooperative close doesn't result in any persisted
    // updates.
@@ -163,27 +213,18 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
     check_closed_broadcast!(nodes[0], true);
     check_added_monitors!(nodes[0], 1);
 
-    let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+    let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
     assert_eq!(node_txn.len(), 1);
 
+    let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
+    let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
+    connect_block(&nodes[1], &dummy_block);
-    connect_block(
-        &nodes[1],
-        &create_dummy_block(
-            nodes[0].best_block_hash(),
-            42,
-            vec![node_txn[0].clone(), node_txn[0].clone()],
-        ),
-    );
     check_closed_broadcast!(nodes[1], true);
-    check_closed_event!(
-        nodes[1],
-        1,
-        ClosureReason::CommitmentTxConfirmed,
-        [nodes[0].node.get_our_node_id()],
-        100000
-    );
+    let reason = ClosureReason::CommitmentTxConfirmed;
+    let node_id_0 = nodes[0].node.get_our_node_id();
+    check_closed_event!(nodes[1], 1, reason, false, [node_id_0], 100000);
     check_added_monitors!(nodes[1], 1);
 
     // Make sure everything is persisted as expected after close.
-    check_persisted_data!(11);
+    check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
 }
diff --git a/src/types.rs b/src/types.rs
index ddd587985..2fc1c6488 100644
--- a/src/types.rs
+++ b/src/types.rs
@@ -19,7 +19,7 @@ use lightning::routing::gossip;
 use lightning::routing::router::DefaultRouter;
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use lightning::sign::InMemorySigner;
-use lightning::util::persist::{KVStore, KVStoreSync};
+use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister};
 use lightning::util::ser::{Readable, Writeable, Writer};
 use lightning::util::sweep::OutputSweeper;
 use lightning_block_sync::gossip::{GossipVerifier, UtxoSource};
@@ -49,13 +49,22 @@ where
 
 /// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers;
 pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send;
 
+pub type Persister = MonitorUpdatingPersister<
+    Arc<DynStore>,
+    Arc<Logger>,
+    Arc<KeysManager>,
+    Arc<KeysManager>,
+    Arc<Broadcaster>,
+    Arc<FeeEstimator>,
+>;
+
 pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
     InMemorySigner,
     Arc<ChainSource>,
     Arc<Broadcaster>,
     Arc<FeeEstimator>,
     Arc<Logger>,
-    Arc<DynStore>,
+    Arc<Persister>,
     Arc<KeysManager>,
 >;