214 changes: 94 additions & 120 deletions lightning/src/ln/channelmanager.rs
@@ -3288,8 +3288,9 @@ macro_rules! locked_close_channel {
}};
($self: ident, $peer_state: expr, $funded_chan: expr, $shutdown_res_mut: expr, FUNDED) => {{
if let Some((_, funding_txo, _, update)) = $shutdown_res_mut.monitor_update.take() {
handle_new_monitor_update_actions_deferred!($self, funding_txo, update, $peer_state,
$funded_chan.context);
handle_new_monitor_update_locked_actions_handled_by_caller!(
$self, funding_txo, update, $peer_state, $funded_chan.context
);
}
// If there's a possibility that we need to generate further monitor updates for this
// channel, we need to store the last update_id of it. However, we don't want to insert
@@ -3695,40 +3696,82 @@ macro_rules! handle_initial_monitor {
};
}

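/// Applies a new in-flight `ChannelMonitorUpdate` for the given channel, deduplicating it
/// against updates already in flight (which may be replayed as background events on startup).
/// Once startup background events have been processed, the update is handed to the chain
/// monitor immediately; otherwise it is queued as a `BackgroundEvent` to be replayed later.
///
/// Returns a pair of booleans: whether this update completed synchronously, and whether all
/// in-flight updates for the channel have now completed.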
fn handle_new_monitor_update_internal<CM: AChannelManager, LG: Logger>(
cm: &CM,
in_flight_monitor_updates: &mut BTreeMap<ChannelId, (OutPoint, Vec<ChannelMonitorUpdate>)>,
channel_id: ChannelId, funding_txo: OutPoint, counterparty_node_id: PublicKey,
new_update: ChannelMonitorUpdate, logger: LG,
) -> (bool, bool) {
let in_flight_updates = &mut in_flight_monitor_updates
.entry(channel_id)
.or_insert_with(|| (funding_txo, Vec::new()))
.1;
// During startup, we push monitor updates as background events through to here in
// order to replay updates that were in-flight when we shut down. Thus, we have to
// filter for uniqueness here.
let update_idx =
in_flight_updates.iter().position(|upd| upd == &new_update).unwrap_or_else(|| {
in_flight_updates.push(new_update);
in_flight_updates.len() - 1
});

if cm.get_cm().background_events_processed_since_startup.load(Ordering::Acquire) {
let update_res =
cm.get_cm().chain_monitor.update_channel(channel_id, &in_flight_updates[update_idx]);
let update_completed = handle_monitor_update_res(cm, update_res, channel_id, logger);
if update_completed {
let _ = in_flight_updates.remove(update_idx);
}
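// Report both whether this update completed and whether *all* in-flight updates for the
// channel have now completed, so callers can decide whether to run completion actions.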
(update_completed, update_completed && in_flight_updates.is_empty())
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
// fail to persist it. This is a fairly safe assumption, however, since anything we do
// during the startup sequence should be replayed exactly if we immediately crash.
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id,
funding_txo,
channel_id,
update: in_flight_updates[update_idx].clone(),
};
// We want to track the in-flight update both in `in_flight_monitor_updates` and in
// `pending_background_events` to avoid a race condition during
// `pending_background_events` processing where we complete one
// `ChannelMonitorUpdate` (but there are more pending as background events) but we
// conclude that all pending `ChannelMonitorUpdate`s have completed and it's safe to
// run post-completion actions.
// We could work around that with some effort, but it's simpler to just track updates
// twice.
cm.get_cm().pending_background_events.lock().unwrap().push(event);
(false, false)
}
}

macro_rules! handle_post_close_monitor_update {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $counterparty_node_id: expr, $channel_id: expr
) => {{
let logger =
WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None);
let in_flight_updates;
let idx;
handle_new_monitor_update_internal!(
let (update_completed, all_updates_complete) = handle_new_monitor_update_internal(
$self,
$funding_txo,
$update,
$peer_state,
logger,
&mut $peer_state.in_flight_monitor_updates,
$channel_id,
$funding_txo,
$counterparty_node_id,
in_flight_updates,
idx,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() {
let update_actions = $peer_state
.monitor_update_blocked_actions
.remove(&$channel_id)
.unwrap_or(Vec::new());
$update,
WithContext::from(&$self.logger, Some($counterparty_node_id), Some($channel_id), None),
);
if all_updates_complete {
let update_actions = $peer_state
.monitor_update_blocked_actions
.remove(&$channel_id)
.unwrap_or(Vec::new());

mem::drop($peer_state_lock);
mem::drop($per_peer_state_lock);
mem::drop($peer_state_lock);
mem::drop($per_peer_state_lock);

$self.handle_monitor_update_completion_actions(update_actions);
}
}
)
$self.handle_monitor_update_completion_actions(update_actions);
}
update_completed
}};
}

@@ -3741,80 +3784,20 @@ macro_rules! handle_post_close_monitor_update {
/// drop the aforementioned peer state locks at a given callsite. In this situation, use this macro
/// to apply the monitor update immediately and handle the monitor update completion actions at a
/// later time.
macro_rules! handle_new_monitor_update_actions_deferred {
macro_rules! handle_new_monitor_update_locked_actions_handled_by_caller {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $chan_context: expr
) => {{
let logger = WithChannelContext::from(&$self.logger, &$chan_context, None);
let chan_id = $chan_context.channel_id();
let counterparty_node_id = $chan_context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update_internal!(
let (update_completed, _all_updates_complete) = handle_new_monitor_update_internal(
$self,
&mut $peer_state.in_flight_monitor_updates,
$chan_context.channel_id(),
$funding_txo,
$chan_context.get_counterparty_node_id(),
$update,
$peer_state,
logger,
chan_id,
counterparty_node_id,
in_flight_updates,
idx,
{
let _ = in_flight_updates.remove(idx);
}
)
}};
}

macro_rules! handle_new_monitor_update_internal {
(
$self: ident, $funding_txo: expr, $update: expr, $peer_state: expr, $logger: expr,
$chan_id: expr, $counterparty_node_id: expr, $in_flight_updates: ident, $update_idx: ident,
$completed: expr
) => {{
$in_flight_updates = &mut $peer_state
.in_flight_monitor_updates
.entry($chan_id)
.or_insert_with(|| ($funding_txo, Vec::new()))
.1;
// During startup, we push monitor updates as background events through to here in
// order to replay updates that were in-flight when we shut down. Thus, we have to
// filter for uniqueness here.
$update_idx =
$in_flight_updates.iter().position(|upd| upd == &$update).unwrap_or_else(|| {
$in_flight_updates.push($update);
$in_flight_updates.len() - 1
});
if $self.background_events_processed_since_startup.load(Ordering::Acquire) {
let update_res =
$self.chain_monitor.update_channel($chan_id, &$in_flight_updates[$update_idx]);
let update_completed = handle_monitor_update_res($self, update_res, $chan_id, $logger);
if update_completed {
$completed;
}
update_completed
} else {
// We blindly assume that the ChannelMonitorUpdate will be regenerated on startup if we
// fail to persist it. This is a fairly safe assumption, however, since anything we do
// during the startup sequence should be replayed exactly if we immediately crash.
let event = BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id: $counterparty_node_id,
funding_txo: $funding_txo,
channel_id: $chan_id,
update: $in_flight_updates[$update_idx].clone(),
};
// We want to track the in-flight update both in `in_flight_monitor_updates` and in
// `pending_background_events` to avoid a race condition during
// `pending_background_events` processing where we complete one
// `ChannelMonitorUpdate` (but there are more pending as background events) but we
// conclude that all pending `ChannelMonitorUpdate`s have completed and its safe to
// run post-completion actions.
// We could work around that with some effort, but its simpler to just track updates
// twice.
$self.pending_background_events.lock().unwrap().push(event);
false
}
WithChannelContext::from(&$self.logger, &$chan_context, None),
);
update_completed
}};
}

@@ -3823,34 +3806,25 @@ macro_rules! handle_new_monitor_update {
$self: ident, $funding_txo: expr, $update: expr, $peer_state_lock: expr, $peer_state: expr,
$per_peer_state_lock: expr, $chan: expr
) => {{
let logger = WithChannelContext::from(&$self.logger, &$chan.context, None);
let chan_id = $chan.context.channel_id();
let counterparty_node_id = $chan.context.get_counterparty_node_id();
let in_flight_updates;
let idx;
handle_new_monitor_update_internal!(
let (update_completed, all_updates_complete) = handle_new_monitor_update_internal(
$self,
&mut $peer_state.in_flight_monitor_updates,
$chan.context.channel_id(),
$funding_txo,
$chan.context.get_counterparty_node_id(),
$update,
$peer_state,
logger,
chan_id,
counterparty_node_id,
in_flight_updates,
idx,
{
let _ = in_flight_updates.remove(idx);
if in_flight_updates.is_empty() && $chan.blocked_monitor_updates_pending() == 0 {
handle_monitor_update_completion!(
$self,
$peer_state_lock,
$peer_state,
$per_peer_state_lock,
$chan
);
}
}
)
WithChannelContext::from(&$self.logger, &$chan.context, None),
);
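// Only run full post-update completion handling once every in-flight update has completed
// and the channel has no blocked monitor updates still queued.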
if all_updates_complete && $chan.blocked_monitor_updates_pending() == 0 {
handle_monitor_update_completion!(
$self,
$peer_state_lock,
$peer_state,
$per_peer_state_lock,
$chan
);
}
update_completed
}};
}

@@ -14360,7 +14334,7 @@ where
insert_short_channel_id!(short_to_chan_info, funded_channel);

if let Some(monitor_update) = monitor_update_opt {
handle_new_monitor_update_actions_deferred!(
handle_new_monitor_update_locked_actions_handled_by_caller!(
self,
funding_txo,
monitor_update,