Skip to content

Commit

Permalink
Pass all BroadcastChannelUpdates through pending_broadcast_message pi…
Browse files Browse the repository at this point in the history
…peline

And fix the consequent test failures.

1. Resolved issues stemming from lack of peers to broadcast `msg_events`.
2. Updated handling of `BroadcastChannelUpdate` and `HandleErrorMessage`.

Introduced `connect_dummy_node` and `disconnect_dummy_node` functions in functional test utils to rectify the first type of failure.
Additionally, manually adjusted other tests to align with the updated `msg_events` ordering.
  • Loading branch information
shaavan committed Mar 21, 2024
1 parent 6222b1f commit ada8f6c
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 27 deletions.
1 change: 1 addition & 0 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3282,6 +3282,7 @@ fn do_test_durable_preimages_on_closed_channel(close_chans_before_reload: bool,
check_spends!(bs_preimage_tx, as_closing_tx[0]);

if !close_chans_before_reload {
// Connect a dummy node to allow broadcasting the close channel event.
check_closed_broadcast(&nodes[1], 1, true);
check_closed_event(&nodes[1], 1, ClosureReason::CommitmentTxConfirmed, false, &[nodes[0].node.get_our_node_id()], 100000);
} else {
Expand Down
49 changes: 36 additions & 13 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1980,7 +1980,8 @@ macro_rules! handle_error {
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
let mut msg_events = Vec::with_capacity(2);
let mut msg_events = Vec::with_capacity(1);
let mut broadcast_events = Vec::with_capacity(1);

if let Some((shutdown_res, update_option)) = shutdown_finish {
let counterparty_node_id = shutdown_res.counterparty_node_id;
Expand All @@ -1992,7 +1993,7 @@ macro_rules! handle_error {

$self.finish_close_channel(shutdown_res);
if let Some(update) = update_option {
msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
broadcast_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand All @@ -2008,6 +2009,11 @@ macro_rules! handle_error {
});
}

if !broadcast_events.is_empty() {
let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.append(&mut broadcast_events);
}

if !msg_events.is_empty() {
let per_peer_state = $self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
Expand Down Expand Up @@ -4042,6 +4048,8 @@ where
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();

for channel_id in channel_ids {
if !peer_state.has_channel(channel_id) {
return Err(APIError::ChannelUnavailable {
Expand All @@ -4058,7 +4066,7 @@ where
}
if let ChannelPhase::Funded(channel) = channel_phase {
if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
} else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: channel.context.get_counterparty_node_id(),
Expand Down Expand Up @@ -4938,6 +4946,7 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
let counterparty_node_id = *counterparty_node_id;
peer_state.channel_by_id.retain(|chan_id, phase| {
match phase {
Expand Down Expand Up @@ -4968,7 +4977,7 @@ where
if n >= DISABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand All @@ -4982,7 +4991,7 @@ where
if n >= ENABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -6641,9 +6650,8 @@ where
}
if let Some(ChannelPhase::Funded(chan)) = chan_option {
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -7299,11 +7307,12 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
if let hash_map::Entry::Occupied(chan_phase_entry) = peer_state.channel_by_id.entry(channel_id) {
if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) {
failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -7468,6 +7477,7 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
peer_state.channel_by_id.retain(|channel_id, phase| {
match phase {
ChannelPhase::Funded(chan) => {
Expand All @@ -7488,7 +7498,7 @@ where
// We're done with this channel. We got a closing_signed and sent back
// a closing_signed with a closing transaction to broadcast.
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -8098,6 +8108,16 @@ where
self.pending_outbound_payments.clear_pending_payments()
}

/// Checks if at least one peer is connected.
pub fn is_some_peer_connected(&self) -> bool {
let peer_state = self.per_peer_state.read().unwrap();
for (_, peer_mutex) in peer_state.iter() {
let peer = peer_mutex.lock().unwrap();
if peer.is_connected { return true; }
}
false
}

/// When something which was blocking a channel from updating its [`ChannelMonitor`] (e.g. an
/// [`Event`] being handled) completes, this should be called to restore the channel to normal
/// operation. It will double-check that nothing *else* is also blocking the same channel from
Expand Down Expand Up @@ -8450,6 +8470,8 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();

peer_state.channel_by_id.retain(|_, phase| {
match phase {
// Retain unfunded channels.
Expand Down Expand Up @@ -8522,7 +8544,7 @@ where
let reason_message = format!("{}", reason);
failed_channels.push(channel.context.force_shutdown(true, reason));
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -8969,7 +8991,9 @@ where
// Gossip
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
// [`ChannelManager::pending_broadcast_events`] holds the [`BroadcastChannelUpdate`]
// This check here is to ensure exhaustivity.
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => false,
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
Expand Down Expand Up @@ -11880,7 +11904,6 @@ mod tests {
assert_eq!(nodes_0_lock.len(), 1);
assert!(nodes_0_lock.contains_key(&funding_output));
}

{
// At this stage, `nodes[1]` has proposed a fee for the closing transaction in the
// `handle_closing_signed` call above. As `nodes[1]` has not yet received the signature
Expand Down
56 changes: 50 additions & 6 deletions lightning/src/ln/functional_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1534,8 +1534,16 @@ macro_rules! check_warn_msg {
/// Check that a channel's closing channel update has been broadcasted, and optionally
/// check whether an error message event has occurred.
pub fn check_closed_broadcast(node: &Node, num_channels: usize, with_error_msg: bool) -> Vec<msgs::ErrorMessage> {
let mut dummy_connected = false;
if !node.node.is_some_peer_connected() {
connect_dummy_node(&node);
dummy_connected = true;
}
let msg_events = node.node.get_and_clear_pending_msg_events();
assert_eq!(msg_events.len(), if with_error_msg { num_channels * 2 } else { num_channels });
if dummy_connected {
disconnect_dummy_node(&node);
}
msg_events.into_iter().filter_map(|msg_event| {
match msg_event {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
Expand Down Expand Up @@ -3039,6 +3047,26 @@ pub fn create_network<'a, 'b: 'a, 'c: 'b>(node_count: usize, cfgs: &'b Vec<NodeC
nodes
}

pub fn connect_dummy_node<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>) {
let node_id_dummy = PublicKey::from_slice(&[2; 33]).unwrap();

let mut dummy_init_features = InitFeatures::empty();
dummy_init_features.set_static_remote_key_required();

let init_dummy = msgs::Init {
features: dummy_init_features,
networks: None,
remote_network_address: None
};

node.node.peer_connected(&node_id_dummy, &init_dummy, true).unwrap();
node.onion_messenger.peer_connected(&node_id_dummy, &init_dummy, true).unwrap();
}

pub fn disconnect_dummy_node<'a, 'b: 'a, 'c: 'b>(node: &Node<'a, 'b, 'c>) {
node.node.peer_disconnected(&PublicKey::from_slice(&[2; 33]).unwrap());
}

// Note that the following only works for CLTV values up to 128
pub const ACCEPTED_HTLC_SCRIPT_WEIGHT: usize = 137; // Here we have a diff due to HTLC CLTV expiry being < 2^15 in test
pub const ACCEPTED_HTLC_SCRIPT_WEIGHT_ANCHORS: usize = 140; // Here we have a diff due to HTLC CLTV expiry being < 2^15 in test
Expand Down Expand Up @@ -3150,15 +3178,20 @@ pub fn check_preimage_claim<'a, 'b, 'c>(node: &Node<'a, 'b, 'c>, prev_txn: &Vec<
}

pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec<Node<'a, 'b, 'c>>, a: usize, b: usize, needs_err_handle: bool, expected_error: &str) {
let mut dummy_connected = false;
if !nodes[a].node.is_some_peer_connected() {
connect_dummy_node(&nodes[a]);
dummy_connected = true
}
let events_1 = nodes[a].node.get_and_clear_pending_msg_events();
assert_eq!(events_1.len(), 2);
let as_update = match events_1[0] {
let as_update = match events_1[1] {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
match events_1[1] {
match events_1[0] {
MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
assert_eq!(node_id, nodes[b].node.get_our_node_id());
assert_eq!(msg.data, expected_error);
Expand All @@ -3175,17 +3208,24 @@ pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec<Node<'a, '
},
_ => panic!("Unexpected event"),
}

if dummy_connected {
disconnect_dummy_node(&nodes[a]);
dummy_connected = false;
}
if !nodes[b].node.is_some_peer_connected() {
connect_dummy_node(&nodes[b]);
dummy_connected = true;
}
let events_2 = nodes[b].node.get_and_clear_pending_msg_events();
assert_eq!(events_2.len(), if needs_err_handle { 1 } else { 2 });
let bs_update = match events_2[0] {
let bs_update = match events_2.last().unwrap() {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
if !needs_err_handle {
match events_2[1] {
match events_2[0] {
MessageSendEvent::HandleError { node_id, action: msgs::ErrorAction::SendErrorMessage { ref msg } } => {
assert_eq!(node_id, nodes[a].node.get_our_node_id());
assert_eq!(msg.data, expected_error);
Expand All @@ -3197,7 +3237,11 @@ pub fn handle_announce_close_broadcast_events<'a, 'b, 'c>(nodes: &Vec<Node<'a, '
_ => panic!("Unexpected event"),
}
}

if dummy_connected {
disconnect_dummy_node(&nodes[b]);
// Commenting the assignment to remove `unused_assignments` warning.
// dummy_connected = false;
}
for node in nodes {
node.gossip_sync.handle_channel_update(&as_update).unwrap();
node.gossip_sync.handle_channel_update(&bs_update).unwrap();
Expand Down
19 changes: 11 additions & 8 deletions lightning/src/ln/functional_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2371,13 +2371,13 @@ fn channel_monitor_network_test() {
connect_blocks(&nodes[3], TEST_FINAL_CLTV + LATENCY_GRACE_PERIOD_BLOCKS + 1);
let events = nodes[3].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 2);
let close_chan_update_1 = match events[0] {
let close_chan_update_1 = match events[1] {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
match events[1] {
match events[0] {
MessageSendEvent::HandleError { action: ErrorAction::DisconnectPeer { .. }, node_id } => {
assert_eq!(node_id, nodes[4].node.get_our_node_id());
},
Expand All @@ -2403,13 +2403,13 @@ fn channel_monitor_network_test() {
connect_blocks(&nodes[4], TEST_FINAL_CLTV - CLTV_CLAIM_BUFFER + 2);
let events = nodes[4].node.get_and_clear_pending_msg_events();
assert_eq!(events.len(), 2);
let close_chan_update_2 = match events[0] {
let close_chan_update_2 = match events[1] {
MessageSendEvent::BroadcastChannelUpdate { ref msg } => {
msg.clone()
},
_ => panic!("Unexpected event"),
};
match events[1] {
match events[0] {
MessageSendEvent::HandleError { action: ErrorAction::DisconnectPeer { .. }, node_id } => {
assert_eq!(node_id, nodes[3].node.get_our_node_id());
},
Expand Down Expand Up @@ -4605,7 +4605,7 @@ fn test_static_spendable_outputs_preimage_tx() {
MessageSendEvent::UpdateHTLCs { .. } => {},
_ => panic!("Unexpected event"),
}
match events[1] {
match events[2] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexepected event"),
}
Expand Down Expand Up @@ -4648,7 +4648,7 @@ fn test_static_spendable_outputs_timeout_tx() {
mine_transaction(&nodes[1], &commitment_tx[0]);
check_added_monitors!(nodes[1], 1);
let events = nodes[1].node.get_and_clear_pending_msg_events();
match events[0] {
match events[1] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexpected event"),
}
Expand Down Expand Up @@ -5062,7 +5062,7 @@ fn test_duplicate_payment_hash_one_failure_one_success() {
MessageSendEvent::UpdateHTLCs { .. } => {},
_ => panic!("Unexpected event"),
}
match events[1] {
match events[2] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexepected event"),
}
Expand Down Expand Up @@ -5140,7 +5140,7 @@ fn test_dynamic_spendable_outputs_local_htlc_success_tx() {
MessageSendEvent::UpdateHTLCs { .. } => {},
_ => panic!("Unexpected event"),
}
match events[1] {
match events[2] {
MessageSendEvent::BroadcastChannelUpdate { .. } => {},
_ => panic!("Unexepected event"),
}
Expand Down Expand Up @@ -7321,6 +7321,9 @@ fn test_announce_disable_channels() {
let node_chanmgrs = create_node_chanmgrs(2, &node_cfgs, &[None, None]);
let nodes = create_network(2, &node_cfgs, &node_chanmgrs);

// Connect a dummy node for proper future events broadcasting
connect_dummy_node(&nodes[0]);

create_announced_chan_between_nodes(&nodes, 0, 1);
create_announced_chan_between_nodes(&nodes, 1, 0);
create_announced_chan_between_nodes(&nodes, 0, 1);
Expand Down

0 comments on commit ada8f6c

Please sign in to comment.