Skip to content

Commit

Permalink
Delay broadcasting Channel Updates until connected to peers
Browse files Browse the repository at this point in the history
    - We might generate channel updates to be broadcast when
      we are not connected to any peers to broadcast them to.
    - This PR ensures to cache them and broadcast them only when
      we are connected to some peers.

Other Changes:
    1. Introduce a test.
    2. Update the relevant current tests affected by this change.
    3. Fix a typo.
    4. Introduce two functions in functional_utils that optionally
       connect and disconnect a dummy node during broadcast testing.
  • Loading branch information
shaavan committed Apr 2, 2024
1 parent 5bf58f0 commit 6647bff
Show file tree
Hide file tree
Showing 3 changed files with 178 additions and 42 deletions.
134 changes: 106 additions & 28 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ pub(super) struct PeerState<SP: Deref> where SP::Target: SignerProvider {
/// The peer is currently connected (i.e. we've seen a
/// [`ChannelMessageHandler::peer_connected`] and no corresponding
/// [`ChannelMessageHandler::peer_disconnected`].
is_connected: bool,
pub is_connected: bool,
}

impl <SP: Deref> PeerState<SP> where SP::Target: SignerProvider {
Expand Down Expand Up @@ -1392,6 +1392,9 @@ where

pending_offers_messages: Mutex<Vec<PendingOnionMessage<OffersMessage>>>,

/// Tracks the message events that are to be broadcasted when we are connected to some peer.
pending_broadcast_messages: Mutex<Vec<MessageSendEvent>>,

entropy_source: ES,
node_signer: NS,
signer_provider: SP,
Expand Down Expand Up @@ -1976,7 +1979,7 @@ macro_rules! handle_error {
match $internal {
Ok(msg) => Ok(msg),
Err(MsgHandleErrInternal { err, shutdown_finish, .. }) => {
let mut msg_events = Vec::with_capacity(2);
let mut msg_event = None;

if let Some((shutdown_res, update_option)) = shutdown_finish {
let counterparty_node_id = shutdown_res.counterparty_node_id;
Expand All @@ -1988,7 +1991,8 @@ macro_rules! handle_error {

$self.finish_close_channel(shutdown_res);
if let Some(update) = update_option {
msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = $self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand All @@ -1998,17 +2002,17 @@ macro_rules! handle_error {

if let msgs::ErrorAction::IgnoreError = err.action {
} else {
msg_events.push(events::MessageSendEvent::HandleError {
msg_event = Some(events::MessageSendEvent::HandleError {
node_id: $counterparty_node_id,
action: err.action.clone()
});
}

if !msg_events.is_empty() {
if let Some(msg_event) = msg_event {
let per_peer_state = $self.per_peer_state.read().unwrap();
if let Some(peer_state_mutex) = per_peer_state.get(&$counterparty_node_id) {
let mut peer_state = peer_state_mutex.lock().unwrap();
peer_state.pending_msg_events.append(&mut msg_events);
peer_state.pending_msg_events.push(msg_event);
}
}

Expand Down Expand Up @@ -2466,6 +2470,7 @@ where
funding_batch_states: Mutex::new(BTreeMap::new()),

pending_offers_messages: Mutex::new(Vec::new()),
pending_broadcast_messages: Mutex::new(Vec::new()),

entropy_source,
node_signer,
Expand Down Expand Up @@ -2957,17 +2962,11 @@ where
}
};
if let Some(update) = update_opt {
// Try to send the `BroadcastChannelUpdate` to the peer we just force-closed on, but if
// not try to broadcast it via whatever peer we have.
let per_peer_state = self.per_peer_state.read().unwrap();
let a_peer_state_opt = per_peer_state.get(peer_node_id)
.ok_or(per_peer_state.values().next());
if let Ok(a_peer_state_mutex) = a_peer_state_opt {
let mut a_peer_state = a_peer_state_mutex.lock().unwrap();
a_peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
// If we have some Channel Update to broadcast, we cache it and broadcast it later.
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}

Ok(counterparty_node_id)
Expand Down Expand Up @@ -4043,6 +4042,7 @@ where
.ok_or_else(|| APIError::ChannelUnavailable { err: format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id) })?;
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;

for channel_id in channel_ids {
if !peer_state.has_channel(channel_id) {
return Err(APIError::ChannelUnavailable {
Expand All @@ -4059,7 +4059,8 @@ where
}
if let ChannelPhase::Funded(channel) = channel_phase {
if let Ok(msg) = self.get_channel_update_for_broadcast(channel) {
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate { msg });
} else if let Ok(msg) = self.get_channel_update_for_unicast(channel) {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendChannelUpdate {
node_id: channel.context.get_counterparty_node_id(),
Expand Down Expand Up @@ -4969,7 +4970,8 @@ where
if n >= DISABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Disabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand All @@ -4983,7 +4985,8 @@ where
if n >= ENABLE_GOSSIP_TICKS {
chan.set_channel_update_status(ChannelUpdateStatus::Enabled);
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -6642,9 +6645,8 @@ where
}
if let Some(ChannelPhase::Funded(chan)) = chan_option {
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
peer_state.pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -7304,7 +7306,8 @@ where
if let ChannelPhase::Funded(mut chan) = remove_channel_phase!(self, chan_phase_entry) {
failed_channels.push(chan.context.force_shutdown(false, ClosureReason::HolderForceClosed));
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -7489,7 +7492,8 @@ where
// We're done with this channel. We got a closing_signed and sent back
// a closing_signed with a closing transaction to broadcast.
if let Ok(update) = self.get_channel_update_for_broadcast(&chan) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -8209,7 +8213,7 @@ where
/// will randomly be placed first or last in the returned array.
///
/// Note that even though `BroadcastChannelAnnouncement` and `BroadcastChannelUpdate`
/// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be pleaced among
/// `MessageSendEvent`s are intended to be broadcasted to all peers, they will be placed among
/// the `MessageSendEvent`s to the specific peer they were generated under.
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> {
let events = RefCell::new(Vec::new());
Expand All @@ -8229,6 +8233,7 @@ where
result = NotifyOption::DoPersist;
}

let mut is_any_peer_connected = false;
let mut pending_events = Vec::new();
let per_peer_state = self.per_peer_state.read().unwrap();
for (_cp_id, peer_state_mutex) in per_peer_state.iter() {
Expand All @@ -8237,6 +8242,15 @@ where
if peer_state.pending_msg_events.len() > 0 {
pending_events.append(&mut peer_state.pending_msg_events);
}
if peer_state.is_connected {
is_any_peer_connected = true
}
}

// Ensure that we are connected to some peers before getting broadcast messages.
if is_any_peer_connected {
let mut broadcast_msgs = self.pending_broadcast_messages.lock().unwrap();
pending_events.append(&mut broadcast_msgs);
}

if !pending_events.is_empty() {
Expand Down Expand Up @@ -8441,6 +8455,7 @@ where
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
let pending_msg_events = &mut peer_state.pending_msg_events;

peer_state.channel_by_id.retain(|_, phase| {
match phase {
// Retain unfunded channels.
Expand Down Expand Up @@ -8513,7 +8528,8 @@ where
let reason_message = format!("{}", reason);
failed_channels.push(channel.context.force_shutdown(true, reason));
if let Ok(update) = self.get_channel_update_for_broadcast(&channel) {
pending_msg_events.push(events::MessageSendEvent::BroadcastChannelUpdate {
let mut pending_broadcast_messages = self.pending_broadcast_messages.lock().unwrap();
pending_broadcast_messages.push(events::MessageSendEvent::BroadcastChannelUpdate {
msg: update
});
}
Expand Down Expand Up @@ -8960,7 +8976,12 @@ where
// Gossip
&events::MessageSendEvent::SendChannelAnnouncement { .. } => false,
&events::MessageSendEvent::BroadcastChannelAnnouncement { .. } => true,
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => true,
// [`ChannelManager::pending_broadcast_events`] holds the [`BroadcastChannelUpdate`]
// This check here is to ensure exhaustivity.
&events::MessageSendEvent::BroadcastChannelUpdate { .. } => {
debug_assert!(false, "This event shouldn't have been here");
false
},
&events::MessageSendEvent::BroadcastNodeAnnouncement { .. } => true,
&events::MessageSendEvent::SendChannelUpdate { .. } => false,
&events::MessageSendEvent::SendChannelRangeQuery { .. } => false,
Expand Down Expand Up @@ -11149,6 +11170,8 @@ where

pending_offers_messages: Mutex::new(Vec::new()),

pending_broadcast_messages: Mutex::new(Vec::new()),

entropy_source: args.entropy_source,
node_signer: args.node_signer,
signer_provider: args.signer_provider,
Expand Down Expand Up @@ -11678,6 +11701,61 @@ mod tests {
}
}

#[test]
fn test_channel_update_cached() {
let chanmon_cfgs = create_chanmon_cfgs(3);
let node_cfgs = create_node_cfgs(3, &chanmon_cfgs);
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let nodes = create_network(3, &node_cfgs, &node_chanmgrs);

let chan = create_announced_chan_between_nodes(&nodes, 0, 1);

nodes[0].node.force_close_channel_with_peer(&chan.2, &nodes[1].node.get_our_node_id(), None, true).unwrap();
check_added_monitors!(nodes[0], 1);
check_closed_event!(nodes[0], 1, ClosureReason::HolderForceClosed, [nodes[1].node.get_our_node_id()], 100000);

// Confirm that the channel_update was not sent immediately to node[1] but was cached.
let node_1_events = nodes[1].node.get_and_clear_pending_msg_events();
assert_eq!(node_1_events.len(), 0);

{
// Assert that ChannelUpdate message has been added to node[0] pending broadcast messages
let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
assert_eq!(pending_broadcast_messages.len(), 1);
}

// Test that we do not retrieve the pending broadcast messages when we are not connected to any peer
nodes[0].node.peer_disconnected(&nodes[1].node.get_our_node_id());
nodes[1].node.peer_disconnected(&nodes[0].node.get_our_node_id());

nodes[0].node.peer_disconnected(&nodes[2].node.get_our_node_id());
nodes[2].node.peer_disconnected(&nodes[0].node.get_our_node_id());

let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(node_0_events.len(), 0);

// Now we reconnect to a peer
nodes[0].node.peer_connected(&nodes[2].node.get_our_node_id(), &msgs::Init {
features: nodes[2].node.init_features(), networks: None, remote_network_address: None
}, true).unwrap();
nodes[2].node.peer_connected(&nodes[0].node.get_our_node_id(), &msgs::Init {
features: nodes[0].node.init_features(), networks: None, remote_network_address: None
}, false).unwrap();

// Confirm that get_and_clear_pending_msg_events correctly captures pending broadcast messages
let node_0_events = nodes[0].node.get_and_clear_pending_msg_events();
assert_eq!(node_0_events.len(), 1);
match &node_0_events[0] {
MessageSendEvent::BroadcastChannelUpdate { .. } => (),
_ => panic!("Unexpected event"),
}
{
// Assert that ChannelUpdate message has been cleared from nodes[0] pending broadcast messages
let pending_broadcast_messages= nodes[0].node.pending_broadcast_messages.lock().unwrap();
assert_eq!(pending_broadcast_messages.len(), 0);
}
}

#[test]
fn test_drop_disconnected_peers_when_removing_channels() {
let chanmon_cfgs = create_chanmon_cfgs(2);
Expand Down

0 comments on commit 6647bff

Please sign in to comment.