48 changes: 27 additions & 21 deletions src/builder.rs
@@ -31,8 +31,8 @@ use lightning::routing::scoring::{
 };
 use lightning::sign::{EntropySource, NodeSigner};
 use lightning::util::persist::{
-    read_channel_monitors, KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY,
-    CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE, CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
+    KVStoreSync, CHANNEL_MANAGER_PERSISTENCE_KEY, CHANNEL_MANAGER_PERSISTENCE_PRIMARY_NAMESPACE,
+    CHANNEL_MANAGER_PERSISTENCE_SECONDARY_NAMESPACE,
 };
 use lightning::util::ser::ReadableArgs;
 use lightning::util::sweep::OutputSweeper;
@@ -66,7 +66,7 @@ use crate::runtime::Runtime;
 use crate::tx_broadcaster::TransactionBroadcaster;
 use crate::types::{
     ChainMonitor, ChannelManager, DynStore, GossipSync, Graph, KeysManager, MessageRouter,
-    OnionMessenger, PaymentStore, PeerManager,
+    OnionMessenger, PaymentStore, PeerManager, Persister,
 };
 use crate::wallet::persist::KVStoreWalletPersister;
 use crate::wallet::Wallet;
@@ -75,6 +75,7 @@ use crate::{Node, NodeMetrics};
 const VSS_HARDENED_CHILD_INDEX: u32 = 877;
 const VSS_LNURL_AUTH_HARDENED_CHILD_INDEX: u32 = 138;
 const LSPS_HARDENED_CHILD_INDEX: u32 = 577;
+const PERSISTER_MAX_PENDING_UPDATES: u64 = 100;
Contributor: Is there science to the number 100?

Collaborator (author): No, just handwaving currently. There has been some benchmarking over at lightningdevkit/rust-lightning#3834, but it never got anywhere super conclusive. Let me know if you have a better guess.

Contributor: Does not seem unreasonable. Going higher is probably not gaining much. Maybe it could be a bit lower. But also fine as is.
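For intuition: `max_pending_updates` is the `MonitorUpdatingPersister` knob that bounds how many incremental `ChannelMonitorUpdate` records may accumulate before the persister writes the full `ChannelMonitor` again and cleans up the pending update records. A rough cost model of the trade-off, as a sketch under the simplifying assumption that a full persist happens about once every `max_pending_updates` updates (this is not the rust-lightning implementation):

```rust
/// Illustrative only: approximate split between expensive full-monitor
/// writes and cheap incremental update writes for a given setting.
fn estimated_writes(total_updates: u64, max_pending_updates: u64) -> (u64, u64) {
    let full_monitor_writes = total_updates / max_pending_updates;
    let incremental_writes = total_updates - full_monitor_writes;
    (full_monitor_writes, incremental_writes)
}

fn main() {
    // With the chosen value of 100, only ~1% of steady-state writes are full
    // monitor re-persists, but startup may have to read and replay up to
    // ~100 pending updates per monitor. A smaller value shifts cost from
    // startup reads to steady-state full writes.
    assert_eq!(estimated_writes(1_000, 100), (10, 990));
    assert_eq!(estimated_writes(1_000, 10), (100, 900));
}
```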


 #[derive(Debug, Clone)]
 enum ChainDataSourceConfig {
@@ -1317,14 +1318,36 @@ fn build_with_store_internal(
     ));

     let peer_storage_key = keys_manager.get_peer_storage_key();
+    let persister = Arc::new(Persister::new(
+        Arc::clone(&kv_store),
+        Arc::clone(&logger),
+        PERSISTER_MAX_PENDING_UPDATES,
+        Arc::clone(&keys_manager),
+        Arc::clone(&keys_manager),
+        Arc::clone(&tx_broadcaster),
+        Arc::clone(&fee_estimator),
+    ));
+
+    // Read ChannelMonitor state from store
+    let channel_monitors = match persister.read_all_channel_monitors_with_updates() {
Contributor: Why the code move?

Collaborator (author): You'd have to ask the OP for the original reason. I left it in, as grouping the channel monitor IO with the setup of the persister and ChainMonitor made sense to me, but there might be different ways to think about it. Let me know if you'd prefer to revert the change.

Contributor: It's OK. The origin of the comment was diff optimization.

+        Ok(monitors) => monitors,
+        Err(e) => {
+            if e.kind() == lightning::io::ErrorKind::NotFound {
+                Vec::new()
+            } else {
+                log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
+                return Err(BuildError::ReadFailed);
+            }
+        },
+    };
+
     // Initialize the ChainMonitor
     let chain_monitor: Arc<ChainMonitor> = Arc::new(chainmonitor::ChainMonitor::new(
         Some(Arc::clone(&chain_source)),
         Arc::clone(&tx_broadcaster),
         Arc::clone(&logger),
         Arc::clone(&fee_estimator),
-        Arc::clone(&kv_store),
+        Arc::clone(&persister),
         Arc::clone(&keys_manager),
         peer_storage_key,
     ));
@@ -1371,23 +1394,6 @@ fn build_with_store_internal(
         scoring_fee_params,
     ));

-    // Read ChannelMonitor state from store
-    let channel_monitors = match read_channel_monitors(
-        Arc::clone(&kv_store),
-        Arc::clone(&keys_manager),
-        Arc::clone(&keys_manager),
-    ) {
-        Ok(monitors) => monitors,
-        Err(e) => {
-            if e.kind() == lightning::io::ErrorKind::NotFound {
-                Vec::new()
-            } else {
-                log_error!(logger, "Failed to read channel monitors: {}", e.to_string());
-                return Err(BuildError::ReadFailed);
-            }
-        },
-    };
-
     let mut user_config = default_user_config(&config);

     if liquidity_source_config.and_then(|lsc| lsc.lsps2_service.as_ref()).is_some() {
139 changes: 90 additions & 49 deletions src/io/test_utils.rs
@@ -11,14 +11,28 @@ use std::path::PathBuf;
 use lightning::events::ClosureReason;
 use lightning::ln::functional_test_utils::{
     connect_block, create_announced_chan_between_nodes, create_chanmon_cfgs, create_dummy_block,
-    create_network, create_node_cfgs, create_node_chanmgrs, send_payment,
+    create_network, create_node_cfgs, create_node_chanmgrs, send_payment, TestChanMonCfg,
 };
-use lightning::util::persist::{read_channel_monitors, KVStoreSync, KVSTORE_NAMESPACE_KEY_MAX_LEN};
+use lightning::util::persist::{
+    KVStoreSync, MonitorUpdatingPersister, KVSTORE_NAMESPACE_KEY_MAX_LEN,
+};
+
 use lightning::util::test_utils;
 use lightning::{check_added_monitors, check_closed_broadcast, check_closed_event};
 use rand::distributions::Alphanumeric;
 use rand::{thread_rng, Rng};

+type TestMonitorUpdatePersister<'a, K> = MonitorUpdatingPersister<
+    &'a K,
+    &'a test_utils::TestLogger,
+    &'a test_utils::TestKeysInterface,
+    &'a test_utils::TestKeysInterface,
+    &'a test_utils::TestBroadcaster,
+    &'a test_utils::TestFeeEstimator,
+>;
+
+const EXPECTED_UPDATES_PER_PAYMENT: u64 = 5;
+
 pub(crate) fn random_storage_path() -> PathBuf {
     let mut temp_path = std::env::temp_dir();
     let mut rng = thread_rng();
@@ -77,54 +91,71 @@ pub(crate) fn do_read_write_remove_list_persist<K: KVStoreSync + RefUnwindSafe>(
     assert_eq!(listed_keys.len(), 0);
 }

+pub(crate) fn create_persister<'a, K: KVStoreSync + Sync>(
+    store: &'a K, chanmon_cfg: &'a TestChanMonCfg, max_pending_updates: u64,
+) -> TestMonitorUpdatePersister<'a, K> {
+    MonitorUpdatingPersister::new(
+        store,
+        &chanmon_cfg.logger,
+        max_pending_updates,
+        &chanmon_cfg.keys_manager,
+        &chanmon_cfg.keys_manager,
+        &chanmon_cfg.tx_broadcaster,
+        &chanmon_cfg.fee_estimator,
+    )
+}
+
+pub(crate) fn create_chain_monitor<'a, K: KVStoreSync + Sync>(
+    chanmon_cfg: &'a TestChanMonCfg, persister: &'a TestMonitorUpdatePersister<'a, K>,
+) -> test_utils::TestChainMonitor<'a> {
+    test_utils::TestChainMonitor::new(
+        Some(&chanmon_cfg.chain_source),
+        &chanmon_cfg.tx_broadcaster,
+        &chanmon_cfg.logger,
+        &chanmon_cfg.fee_estimator,
+        persister,
+        &chanmon_cfg.keys_manager,
+    )
+}
+
 // Integration-test the given KVStore implementation. Test relaying a few payments and check that
 // the persisted data is updated the appropriate number of times.
 pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
+    // This value is used later to limit how many iterations we perform.
+    let persister_0_max_pending_updates = 7;
+    // Intentionally set this to a smaller value to test a different alignment.
Contributor: I've seen this in rust-lightning too, and there I also didn't understand what alignment there is to test, given that the nodes are independent?

Collaborator (author): Yeah, you'd have to ask the OP regarding the comment, but it's of course not wrong to set a different value here.

Contributor: It isn't wrong, but I thought it might be worth knowing a bit more about it now that you are taking over.

Another commenter: @joostjager I think I wrote this sentence first in rust-lightning in this PR: https://github.com/lightningdevkit/rust-lightning/pull/2359/files#diff-0f1c2f21752b2bf468479020ff092a484d8b0a4a1c018241e3a127b62aa51de2R914

I think what we were concerned with was weird edge cases / off-by-one errors in the compaction algorithm in particular. I think there was a state of that PR where odd-numbered maximum_pending_updates would drop one or something. Initially we just tested the kind of obvious/likely powers of 10, so deliberately using a few different primes became a thing.
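To make the alignment concern concrete (my own illustration, not code from this PR or rust-lightning): each payment in this test produces `EXPECTED_UPDATES_PER_PAYMENT = 5` monitor updates, so where the periodic full-monitor persist lands inside a payment depends on the chosen maximum. Assuming a simplified compaction rule of "full persist whenever the update ID is a multiple of the maximum", values co-prime with 5, like 7 and 3, sweep every offset, while a multiple of 5 only ever hits the payment boundary:

```rust
// Offsets (update_id % updates_per_payment) at which full-monitor persists
// occur over one full cycle. Simplified model, for intuition only.
fn full_persist_offsets(max_pending: u64, updates_per_payment: u64) -> Vec<u64> {
    (1..=max_pending * updates_per_payment)
        .filter(|id| id % max_pending == 0)
        .map(|id| id % updates_per_payment)
        .collect()
}

fn main() {
    assert_eq!(full_persist_offsets(7, 5), vec![2, 4, 1, 3, 0]); // every offset hit
    assert_eq!(full_persist_offsets(3, 5), vec![3, 1, 4, 2, 0]); // every offset hit
    assert_eq!(full_persist_offsets(5, 5), vec![0, 0, 0, 0, 0]); // boundary only
}
```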

+    let persister_1_max_pending_updates = 3;

     let chanmon_cfgs = create_chanmon_cfgs(2);

+    let persister_0 = create_persister(store_0, &chanmon_cfgs[0], persister_0_max_pending_updates);
+    let persister_1 = create_persister(store_1, &chanmon_cfgs[1], persister_1_max_pending_updates);
+
+    let chain_mon_0 = create_chain_monitor(&chanmon_cfgs[0], &persister_0);
+    let chain_mon_1 = create_chain_monitor(&chanmon_cfgs[1], &persister_1);
+
     let mut node_cfgs = create_node_cfgs(2, &chanmon_cfgs);
-    let chain_mon_0 = test_utils::TestChainMonitor::new(
-        Some(&chanmon_cfgs[0].chain_source),
-        &chanmon_cfgs[0].tx_broadcaster,
-        &chanmon_cfgs[0].logger,
-        &chanmon_cfgs[0].fee_estimator,
-        store_0,
-        node_cfgs[0].keys_manager,
-    );
-    let chain_mon_1 = test_utils::TestChainMonitor::new(
-        Some(&chanmon_cfgs[1].chain_source),
-        &chanmon_cfgs[1].tx_broadcaster,
-        &chanmon_cfgs[1].logger,
-        &chanmon_cfgs[1].fee_estimator,
-        store_1,
-        node_cfgs[1].keys_manager,
-    );
     node_cfgs[0].chain_monitor = chain_mon_0;
     node_cfgs[1].chain_monitor = chain_mon_1;
     let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
     let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

     // Check that the persisted channel data is empty before any channels are
     // open.
-    let mut persisted_chan_data_0 =
-        read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager).unwrap();
+    let mut persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
     assert_eq!(persisted_chan_data_0.len(), 0);
-    let mut persisted_chan_data_1 =
-        read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager).unwrap();
+    let mut persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
     assert_eq!(persisted_chan_data_1.len(), 0);

     // Helper to make sure the channel is on the expected update ID.
     macro_rules! check_persisted_data {
         ($expected_update_id: expr) => {
-            persisted_chan_data_0 =
-                read_channel_monitors(store_0, nodes[0].keys_manager, nodes[0].keys_manager)
-                    .unwrap();
+            persisted_chan_data_0 = persister_0.read_all_channel_monitors_with_updates().unwrap();
             assert_eq!(persisted_chan_data_0.len(), 1);
             for (_, mon) in persisted_chan_data_0.iter() {
                 assert_eq!(mon.get_latest_update_id(), $expected_update_id);
             }
-            persisted_chan_data_1 =
-                read_channel_monitors(store_1, nodes[1].keys_manager, nodes[1].keys_manager)
-                    .unwrap();
+            persisted_chan_data_1 = persister_1.read_all_channel_monitors_with_updates().unwrap();
             assert_eq!(persisted_chan_data_1.len(), 1);
             for (_, mon) in persisted_chan_data_1.iter() {
                 assert_eq!(mon.get_latest_update_id(), $expected_update_id);
@@ -137,10 +168,29 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
     check_persisted_data!(0);

     // Send a few payments and make sure the monitors are updated to the latest.
-    send_payment(&nodes[0], &vec![&nodes[1]][..], 8000000);
-    check_persisted_data!(5);
-    send_payment(&nodes[1], &vec![&nodes[0]][..], 4000000);
-    check_persisted_data!(10);
+    let expected_route = &[&nodes[1]][..];
+    send_payment(&nodes[0], expected_route, 8_000_000);
+    check_persisted_data!(EXPECTED_UPDATES_PER_PAYMENT);
+    let expected_route = &[&nodes[0]][..];
+    send_payment(&nodes[1], expected_route, 4_000_000);
+    check_persisted_data!(2 * EXPECTED_UPDATES_PER_PAYMENT);
+
+    // Send a few more payments to try all the alignments of max pending updates with
+    // updates for a payment sent and received.
+    let mut sender = 0;
+    for i in 3..=persister_0_max_pending_updates * 2 {
Contributor: This also seems a bit overkill at the ldk-node level?

Collaborator (author): Mhh, a bit more test coverage doesn't hurt? FWIW, this could be made a proptest to have it be less deterministic, but apart from that there isn't much harm?

Contributor: It doesn't hurt in terms of coverage, but it is probably duplicative, and code carries cost.

+        let receiver;
+        if sender == 0 {
+            sender = 1;
+            receiver = 0;
+        } else {
+            sender = 0;
+            receiver = 1;
+        }
+        let expected_route = &[&nodes[receiver]][..];
+        send_payment(&nodes[sender], expected_route, 21_000);
+        check_persisted_data!(i * EXPECTED_UPDATES_PER_PAYMENT);
+    }

     // Force close because cooperative close doesn't result in any persisted
     // updates.
@@ -163,27 +213,18 @@ pub(crate) fn do_test_store<K: KVStoreSync + Sync>(store_0: &K, store_1: &K) {
     check_closed_broadcast!(nodes[0], true);
     check_added_monitors!(nodes[0], 1);

-    let node_txn = nodes[0].tx_broadcaster.txn_broadcasted.lock().unwrap();
+    let node_txn = nodes[0].tx_broadcaster.txn_broadcast();
     assert_eq!(node_txn.len(), 1);
+    let txn = vec![node_txn[0].clone(), node_txn[0].clone()];
+    let dummy_block = create_dummy_block(nodes[0].best_block_hash(), 42, txn);
+    connect_block(&nodes[1], &dummy_block);
Contributor: Some independent refactoring here?

Collaborator (author): Yeah, doesn't hurt either, though.

Contributor: I meant to say that it is better to isolate it in a separate commit.

-    connect_block(
-        &nodes[1],
-        &create_dummy_block(
-            nodes[0].best_block_hash(),
-            42,
-            vec![node_txn[0].clone(), node_txn[0].clone()],
-        ),
-    );
     check_closed_broadcast!(nodes[1], true);
-    check_closed_event!(
-        nodes[1],
-        1,
-        ClosureReason::CommitmentTxConfirmed,
-        [nodes[0].node.get_our_node_id()],
-        100000
-    );
+    let reason = ClosureReason::CommitmentTxConfirmed;
+    let node_id_0 = nodes[0].node.get_our_node_id();
+    check_closed_event!(nodes[1], 1, reason, false, [node_id_0], 100000);
     check_added_monitors!(nodes[1], 1);

     // Make sure everything is persisted as expected after close.
-    check_persisted_data!(11);
+    check_persisted_data!(persister_0_max_pending_updates * 2 * EXPECTED_UPDATES_PER_PAYMENT + 1);
 }
13 changes: 11 additions & 2 deletions src/types.rs
@@ -19,7 +19,7 @@ use lightning::routing::gossip;
 use lightning::routing::router::DefaultRouter;
 use lightning::routing::scoring::{ProbabilisticScorer, ProbabilisticScoringFeeParameters};
 use lightning::sign::InMemorySigner;
-use lightning::util::persist::{KVStore, KVStoreSync};
+use lightning::util::persist::{KVStore, KVStoreSync, MonitorUpdatingPersister};
 use lightning::util::ser::{Readable, Writeable, Writer};
 use lightning::util::sweep::OutputSweeper;
 use lightning_block_sync::gossip::{GossipVerifier, UtxoSource};
@@ -49,13 +49,22 @@
 /// A type alias for [`SyncAndAsyncKVStore`] with `Sync`/`Send` markers;
 pub type DynStore = dyn SyncAndAsyncKVStore + Sync + Send;

+pub type Persister = MonitorUpdatingPersister<
+    Arc<DynStore>,
+    Arc<Logger>,
+    Arc<KeysManager>,
+    Arc<KeysManager>,
+    Arc<Broadcaster>,
+    Arc<OnchainFeeEstimator>,
+>;
+
 pub(crate) type ChainMonitor = chainmonitor::ChainMonitor<
     InMemorySigner,
     Arc<ChainSource>,
     Arc<Broadcaster>,
     Arc<OnchainFeeEstimator>,
     Arc<Logger>,
-    Arc<DynStore>,
+    Arc<Persister>,
     Arc<KeysManager>,
 >;