Refactor ShutdownResult type and construction
wvanlint committed Sep 28, 2023
1 parent ba2980e commit 9c2ee48
Showing 2 changed files with 74 additions and 56 deletions.
78 changes: 51 additions & 27 deletions lightning/src/ln/channel.rs
@@ -540,18 +540,16 @@ pub(super) struct ReestablishResponses {
pub shutdown_msg: Option<msgs::Shutdown>,
}

/// The return type of `force_shutdown`
///
/// Contains a tuple with the following:
/// - An optional (counterparty_node_id, funding_txo, [`ChannelMonitorUpdate`]) tuple
/// - A list of HTLCs to fail back in the form of the (source, payment hash, and this channel's
/// counterparty_node_id and channel_id).
/// - An optional transaction id identifying a corresponding batch funding transaction.
pub(crate) type ShutdownResult = (
Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
Option<Txid>
);
/// The result of a shutdown that should be handled.
pub(crate) struct ShutdownResult {
/// A channel monitor update to apply.
pub(crate) monitor_update: Option<(PublicKey, OutPoint, ChannelMonitorUpdate)>,
/// A list of dropped outbound HTLCs that can safely be failed backwards immediately.
pub(crate) dropped_outbound_htlcs: Vec<(HTLCSource, PaymentHash, PublicKey, ChannelId)>,
/// An unbroadcasted batch funding transaction id. The closure of this channel should be
/// propagated to the remainder of the batch.
pub(crate) unbroadcasted_batch_funding_txid: Option<Txid>,
}

/// If the majority of the channels funds are to the fundee and the initiator holds only just
/// enough funds to cover their reserve value, channels are at risk of getting "stuck". Because the
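
Illustrative aside, not part of the diff: a minimal, self-contained sketch of how the named struct reads at call sites compared with the old three-element tuple. The stand-in types below are assumptions for brevity, not the real lightning types.

// Sketch only: simplified stand-ins for the real lightning types.
#[derive(Debug)]
struct MonitorUpdate; // stands in for (PublicKey, OutPoint, ChannelMonitorUpdate)
#[derive(Debug)]
struct DroppedHtlc;   // stands in for (HTLCSource, PaymentHash, PublicKey, ChannelId)

struct ShutdownResult {
    monitor_update: Option<MonitorUpdate>,
    dropped_outbound_htlcs: Vec<DroppedHtlc>,
    unbroadcasted_batch_funding_txid: Option<[u8; 32]>,
}

fn finish_close(mut res: ShutdownResult) {
    // Named fields replace positional access on the old tuple (res.0, res.1, res.2).
    for htlc in res.dropped_outbound_htlcs.drain(..) {
        println!("failing back {:?}", htlc);
    }
    if let Some(update) = res.monitor_update {
        println!("applying {:?}", update);
    }
    if let Some(txid) = res.unbroadcasted_batch_funding_txid {
        println!("propagating closure to batch {:02x?}", txid);
    }
}

fn main() {
    finish_close(ShutdownResult {
        monitor_update: Some(MonitorUpdate),
        dropped_outbound_htlcs: vec![DroppedHtlc],
        unbroadcasted_batch_funding_txid: None,
    });
}

The main readability win is that each consumer names the field it cares about instead of remembering the tuple's element order.
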
@@ -2052,7 +2050,11 @@ impl<SP: Deref> ChannelContext<SP> where SP::Target: SignerProvider {

self.channel_state = ChannelState::ShutdownComplete as u32;
self.update_time_counter += 1;
(monitor_update, dropped_outbound_htlcs, unbroadcasted_batch_funding_txid)
ShutdownResult {
monitor_update,
dropped_outbound_htlcs,
unbroadcasted_batch_funding_txid,
}
}
}

@@ -4207,18 +4209,18 @@ impl<SP: Deref> Channel<SP> where

pub fn maybe_propose_closing_signed<F: Deref, L: Deref>(
&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, logger: &L)
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>), ChannelError>
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
where F::Target: FeeEstimator, L::Target: Logger
{
if self.context.last_sent_closing_fee.is_some() || !self.closing_negotiation_ready() {
return Ok((None, None));
return Ok((None, None, None));
}

if !self.context.is_outbound() {
if let Some(msg) = &self.context.pending_counterparty_closing_signed.take() {
return self.closing_signed(fee_estimator, &msg);
}
return Ok((None, None));
return Ok((None, None, None));
}

let (our_min_fee, our_max_fee) = self.calculate_closing_fee_limits(fee_estimator);
@@ -4243,7 +4245,7 @@ impl<SP: Deref> Channel<SP> where
min_fee_satoshis: our_min_fee,
max_fee_satoshis: our_max_fee,
}),
}), None))
}), None, None))
}
}
}
@@ -4392,7 +4394,7 @@ impl<SP: Deref> Channel<SP> where

pub fn closing_signed<F: Deref>(
&mut self, fee_estimator: &LowerBoundedFeeEstimator<F>, msg: &msgs::ClosingSigned)
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>), ChannelError>
-> Result<(Option<msgs::ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), ChannelError>
where F::Target: FeeEstimator
{
if self.context.channel_state & BOTH_SIDES_SHUTDOWN_MASK != BOTH_SIDES_SHUTDOWN_MASK {
@@ -4414,7 +4416,7 @@

if self.context.channel_state & ChannelState::MonitorUpdateInProgress as u32 != 0 {
self.context.pending_counterparty_closing_signed = Some(msg.clone());
return Ok((None, None));
return Ok((None, None, None));
}

let funding_redeemscript = self.context.get_funding_redeemscript();
@@ -4444,10 +4446,15 @@ impl<SP: Deref> Channel<SP> where
assert!(self.context.shutdown_scriptpubkey.is_some());
if let Some((last_fee, sig)) = self.context.last_sent_closing_fee {
if last_fee == msg.fee_satoshis {
let shutdown_result = ShutdownResult {
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
};
let tx = self.build_signed_closing_transaction(&mut closing_tx, &msg.signature, &sig);
self.context.channel_state = ChannelState::ShutdownComplete as u32;
self.context.update_time_counter += 1;
return Ok((None, Some(tx)));
return Ok((None, Some(tx), Some(shutdown_result)));
}
}

@@ -4466,13 +4473,22 @@ impl<SP: Deref> Channel<SP> where
let sig = ecdsa
.sign_closing_transaction(&closing_tx, &self.context.secp_ctx)
.map_err(|_| ChannelError::Close("External signer refused to sign closing transaction".to_owned()))?;

let signed_tx = if $new_fee == msg.fee_satoshis {
let signed_tx;
let shutdown_result;
if $new_fee == msg.fee_satoshis {
shutdown_result = Some(ShutdownResult {
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid: self.context.unbroadcasted_batch_funding_txid(),
});
self.context.channel_state = ChannelState::ShutdownComplete as u32;
self.context.update_time_counter += 1;
let tx = self.build_signed_closing_transaction(&closing_tx, &msg.signature, &sig);
Some(tx)
} else { None };
signed_tx = Some(tx);
} else {
shutdown_result = None;
signed_tx = None;
};

self.context.last_sent_closing_fee = Some((used_fee, sig.clone()));
Ok((Some(msgs::ClosingSigned {
@@ -4483,7 +4499,7 @@ impl<SP: Deref> Channel<SP> where
min_fee_satoshis: our_min_fee,
max_fee_satoshis: our_max_fee,
}),
}), signed_tx))
}), signed_tx, shutdown_result))
}
}
}
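
Illustrative aside, not part of the diff: a hedged sketch of how a caller might consume the widened return of closing_signed and maybe_propose_closing_signed, where the new third element is only Some once fee negotiation completes. The types and values below are simplified placeholders, not the real LDK API.

// Sketch only: simplified placeholders, not the real LDK types.
struct ClosingSigned { fee_satoshis: u64 }
struct Transaction;
struct ShutdownResult;

// Mirrors the new shape: a reply message, a broadcastable tx, and a shutdown
// result are each optional; the last two appear only once fees agree.
fn closing_signed(fees_match: bool)
    -> Result<(Option<ClosingSigned>, Option<Transaction>, Option<ShutdownResult>), String>
{
    if fees_match {
        Ok((None, Some(Transaction), Some(ShutdownResult)))
    } else {
        Ok((Some(ClosingSigned { fee_satoshis: 1_000 }), None, None))
    }
}

fn main() -> Result<(), String> {
    let (msg, tx, shutdown) = closing_signed(true)?;
    if let Some(m) = msg { println!("send closing_signed at {} sats", m.fee_satoshis); }
    if tx.is_some() { println!("broadcast the signed closing transaction"); }
    if shutdown.is_some() { println!("hand the ShutdownResult to channel-close cleanup"); }
    Ok(())
}
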
@@ -5557,7 +5573,7 @@ impl<SP: Deref> Channel<SP> where
/// [`ChannelMonitorUpdate`] will be returned).
pub fn get_shutdown(&mut self, signer_provider: &SP, their_features: &InitFeatures,
target_feerate_sats_per_kw: Option<u32>, override_shutdown_script: Option<ShutdownScript>)
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>), APIError>
-> Result<(msgs::Shutdown, Option<ChannelMonitorUpdate>, Vec<(HTLCSource, PaymentHash)>, Option<ShutdownResult>), APIError>
{
for htlc in self.context.pending_outbound_htlcs.iter() {
if let OutboundHTLCState::LocalAnnounced(_) = htlc.state {
@@ -5611,11 +5627,19 @@ impl<SP: Deref> Channel<SP> where
};

// From here on out, we may not fail!
let shutdown_result;
let unbroadcasted_batch_funding_txid = self.context.unbroadcasted_batch_funding_txid();
self.context.target_closing_feerate_sats_per_kw = target_feerate_sats_per_kw;
if self.context.channel_state & !STATE_FLAGS < ChannelState::FundingSent as u32 {
self.context.channel_state = ChannelState::ShutdownComplete as u32;
shutdown_result = Some(ShutdownResult {
monitor_update: None,
dropped_outbound_htlcs: Vec::new(),
unbroadcasted_batch_funding_txid,
});
} else {
self.context.channel_state |= ChannelState::LocalShutdownSent as u32;
shutdown_result = None;
}
self.context.update_time_counter += 1;

@@ -5652,7 +5676,7 @@ impl<SP: Deref> Channel<SP> where
debug_assert!(!self.is_shutdown() || monitor_update.is_none(),
"we can't both complete shutdown and return a monitor update");

Ok((shutdown, monitor_update, dropped_outbound_htlcs))
Ok((shutdown, monitor_update, dropped_outbound_htlcs, shutdown_result))
}

pub fn inflight_htlc_sources(&self) -> impl Iterator<Item=(&HTLCSource, &PaymentHash)> {
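
Illustrative aside, not part of the diff: a rough sketch of the get_shutdown branching added above — a channel that never reached FundingSent can complete shutdown immediately and yields a ShutdownResult up front, while a funded channel only records that it sent shutdown. The ordered enum below is a simplifying assumption; the real ChannelState uses bit flags.

// Sketch only: an ordered enum standing in for the real bitflag channel state.
#[derive(Debug, PartialEq, PartialOrd)]
enum ChannelState { OurInitSent, FundingSent, LocalShutdownSent, ShutdownComplete }

#[derive(Debug)]
struct ShutdownResult { unbroadcasted_batch_funding_txid: Option<[u8; 32]> }

fn get_shutdown(state: &mut ChannelState) -> Option<ShutdownResult> {
    if *state < ChannelState::FundingSent {
        // Nothing can be on chain yet, so the close completes immediately.
        *state = ChannelState::ShutdownComplete;
        Some(ShutdownResult { unbroadcasted_batch_funding_txid: None })
    } else {
        // A funded channel must negotiate; just record that we sent `shutdown`.
        *state = ChannelState::LocalShutdownSent;
        None
    }
}

fn main() {
    let mut state = ChannelState::OurInitSent;
    println!("{:?}", get_shutdown(&mut state)); // Some(ShutdownResult { .. })
}
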
52 changes: 23 additions & 29 deletions lightning/src/ln/channelmanager.rs
@@ -2548,7 +2548,7 @@ where
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

let mut failed_htlcs: Vec<(HTLCSource, PaymentHash)>;
let mut shutdown_result = None;
let shutdown_result;
loop {
let per_peer_state = self.per_peer_state.read().unwrap();

@@ -2563,10 +2563,10 @@ where
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
let funding_txo_opt = chan.context.get_funding_txo();
let their_features = &peer_state.latest_features;
let unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
let (shutdown_msg, mut monitor_update_opt, htlcs) =
let (shutdown_msg, mut monitor_update_opt, htlcs, local_shutdown_result) =
chan.get_shutdown(&self.signer_provider, their_features, target_feerate_sats_per_1000_weight, override_shutdown_script)?;
failed_htlcs = htlcs;
shutdown_result = local_shutdown_result;

// We can send the `shutdown` message before updating the `ChannelMonitor`
// here as we don't need the monitor update to complete until we send a
@@ -2594,7 +2594,6 @@ where
});
}
self.issue_channel_close_events(&chan.context, ClosureReason::HolderForceClosed);
shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
}
}
break;
@@ -2684,30 +2683,29 @@ where
self.close_channel_internal(channel_id, counterparty_node_id, target_feerate_sats_per_1000_weight, shutdown_script)
}

fn finish_close_channel(&self, shutdown_res: ShutdownResult) {
fn finish_close_channel(&self, mut shutdown_res: ShutdownResult) {
debug_assert_ne!(self.per_peer_state.held_by_thread(), LockHeldState::HeldByThread);
#[cfg(debug_assertions)]
for (_, peer) in self.per_peer_state.read().unwrap().iter() {
debug_assert_ne!(peer.held_by_thread(), LockHeldState::HeldByThread);
}

let (monitor_update_option, mut failed_htlcs, unbroadcasted_batch_funding_txid) = shutdown_res;
log_debug!(self.logger, "Finishing closure of channel with {} HTLCs to fail", failed_htlcs.len());
for htlc_source in failed_htlcs.drain(..) {
log_debug!(self.logger, "Finishing closure of channel with {} HTLCs to fail", shutdown_res.dropped_outbound_htlcs.len());
for htlc_source in shutdown_res.dropped_outbound_htlcs.drain(..) {
let (source, payment_hash, counterparty_node_id, channel_id) = htlc_source;
let reason = HTLCFailReason::from_failure_code(0x4000 | 8);
let receiver = HTLCDestination::NextHopChannel { node_id: Some(counterparty_node_id), channel_id };
self.fail_htlc_backwards_internal(&source, &payment_hash, &reason, receiver);
}
if let Some((_, funding_txo, monitor_update)) = monitor_update_option {
if let Some((_, funding_txo, monitor_update)) = shutdown_res.monitor_update {
// There isn't anything we can do if we get an update failure - we're already
// force-closing. The monitor update on the required in-memory copy should broadcast
// the latest local state, which is the best we can do anyway. Thus, it is safe to
// ignore the result here.
let _ = self.chain_monitor.update_channel(funding_txo, &monitor_update);
}
let mut shutdown_results = Vec::new();
if let Some(txid) = unbroadcasted_batch_funding_txid {
if let Some(txid) = shutdown_res.unbroadcasted_batch_funding_txid {
let mut funding_batch_states = self.funding_batch_states.lock().unwrap();
let affected_channels = funding_batch_states.remove(&txid).into_iter().flatten();
let per_peer_state = self.per_peer_state.read().unwrap();
@@ -6242,22 +6240,19 @@ where
}

fn internal_closing_signed(&self, counterparty_node_id: &PublicKey, msg: &msgs::ClosingSigned) -> Result<(), MsgHandleErrInternal> {
let mut shutdown_result = None;
let unbroadcasted_batch_funding_txid;
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
.ok_or_else(|| {
debug_assert!(false);
MsgHandleErrInternal::send_err_msg_no_close(format!("Can't find a peer matching the passed counterparty node_id {}", counterparty_node_id), msg.channel_id)
})?;
let (tx, chan_option) = {
let (tx, chan_option, shutdown_result) = {
let mut peer_state_lock = peer_state_mutex.lock().unwrap();
let peer_state = &mut *peer_state_lock;
match peer_state.channel_by_id.entry(msg.channel_id.clone()) {
hash_map::Entry::Occupied(mut chan_phase_entry) => {
if let ChannelPhase::Funded(chan) = chan_phase_entry.get_mut() {
unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
let (closing_signed, tx) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
let (closing_signed, tx, shutdown_result) = try_chan_phase_entry!(self, chan.closing_signed(&self.fee_estimator, &msg), chan_phase_entry);
if let Some(msg) = closing_signed {
peer_state.pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id: counterparty_node_id.clone(),
@@ -6270,8 +6265,8 @@ where
// also implies there are no pending HTLCs left on the channel, so we can
// fully delete it from tracking (the channel monitor is still around to
// watch for old state broadcasts)!
(tx, Some(remove_channel_phase!(self, chan_phase_entry)))
} else { (tx, None) }
(tx, Some(remove_channel_phase!(self, chan_phase_entry)), shutdown_result)
} else { (tx, None, shutdown_result) }
} else {
return try_chan_phase_entry!(self, Err(ChannelError::Close(
"Got a closing_signed message for an unfunded channel!".into())), chan_phase_entry);
@@ -6293,7 +6288,6 @@ where
});
}
self.issue_channel_close_events(&chan.context, ClosureReason::CooperativeClosure);
shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
}
mem::drop(per_peer_state);
if let Some(shutdown_result) = shutdown_result {
@@ -6972,8 +6966,7 @@ where
fn maybe_generate_initial_closing_signed(&self) -> bool {
let mut handle_errors: Vec<(PublicKey, Result<(), _>)> = Vec::new();
let mut has_update = false;
let mut shutdown_result = None;
let mut unbroadcasted_batch_funding_txid = None;
let mut shutdown_results = Vec::new();
{
let per_peer_state = self.per_peer_state.read().unwrap();

@@ -6984,15 +6977,17 @@ where
peer_state.channel_by_id.retain(|channel_id, phase| {
match phase {
ChannelPhase::Funded(chan) => {
unbroadcasted_batch_funding_txid = chan.context.unbroadcasted_batch_funding_txid();
match chan.maybe_propose_closing_signed(&self.fee_estimator, &self.logger) {
Ok((msg_opt, tx_opt)) => {
Ok((msg_opt, tx_opt, shutdown_result_opt)) => {
if let Some(msg) = msg_opt {
has_update = true;
pending_msg_events.push(events::MessageSendEvent::SendClosingSigned {
node_id: chan.context.get_counterparty_node_id(), msg,
});
}
if let Some(shutdown_result) = shutdown_result_opt {
shutdown_results.push(shutdown_result);
}
if let Some(tx) = tx_opt {
// We're done with this channel. We got a closing_signed and sent back
// a closing_signed with a closing transaction to broadcast.
@@ -7007,7 +7002,6 @@ where
log_info!(self.logger, "Broadcasting {}", log_tx!(tx));
self.tx_broadcaster.broadcast_transactions(&[&tx]);
update_maps_on_chan_removal!(self, &chan.context);
shutdown_result = Some((None, Vec::new(), unbroadcasted_batch_funding_txid));
false
} else { true }
},
@@ -7029,7 +7023,7 @@ where
let _ = handle_error!(self, err, counterparty_node_id);
}

if let Some(shutdown_result) = shutdown_result {
for shutdown_result in shutdown_results.drain(..) {
self.finish_close_channel(shutdown_result);
}
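
Illustrative aside, not part of the diff: the loop above collects ShutdownResults into a Vec while the per-peer locks are held and only calls finish_close_channel after those locks are released. A minimal, self-contained sketch of that accumulate-then-drain pattern follows; the names are placeholders, not LDK APIs.

// Sketch only: a Mutex-guarded map stands in for the per-peer channel state.
use std::collections::HashMap;
use std::sync::Mutex;

fn finish_close_channel(result: String) {
    // In the real code this step may take other locks, so it must run lock-free here.
    println!("finishing {result}");
}

fn main() {
    let channels = Mutex::new(HashMap::from([("chan_a", 1u64), ("chan_b", 2)]));

    let mut shutdown_results = Vec::new();
    {
        // Collect results while holding the lock...
        let guard = channels.lock().unwrap();
        for (id, _) in guard.iter() {
            shutdown_results.push(format!("shutdown of {id}"));
        }
    } // ...and release it before doing the heavier follow-up work.
    for result in shutdown_results.drain(..) {
        finish_close_channel(result);
    }
}
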

@@ -7048,7 +7042,7 @@ where
// Channel::force_shutdown tries to make us do) as we may still be in initialization,
// so we track the update internally and handle it when the user next calls
// timer_tick_occurred, guaranteeing we're running normally.
if let Some((counterparty_node_id, funding_txo, update)) = failure.0.take() {
if let Some((counterparty_node_id, funding_txo, update)) = failure.monitor_update.take() {
assert_eq!(update.updates.len(), 1);
if let ChannelMonitorUpdateStep::ChannelForceClosed { should_broadcast } = update.updates[0] {
assert!(should_broadcast);
@@ -9266,16 +9260,16 @@ where
log_error!(args.logger, " The ChannelMonitor for channel {} is at counterparty commitment transaction number {} but the ChannelManager is at counterparty commitment transaction number {}.",
&channel.context.channel_id(), monitor.get_cur_counterparty_commitment_number(), channel.get_cur_counterparty_commitment_transaction_number());
}
let (monitor_update, mut new_failed_htlcs, batch_funding_txid) = channel.context.force_shutdown(true);
if batch_funding_txid.is_some() {
let mut shutdown_result = channel.context.force_shutdown(true);
if shutdown_result.unbroadcasted_batch_funding_txid.is_some() {
return Err(DecodeError::InvalidValue);
}
if let Some((counterparty_node_id, funding_txo, update)) = monitor_update {
if let Some((counterparty_node_id, funding_txo, update)) = shutdown_result.monitor_update {
close_background_events.push(BackgroundEvent::MonitorUpdateRegeneratedOnStartup {
counterparty_node_id, funding_txo, update
});
}
failed_htlcs.append(&mut new_failed_htlcs);
failed_htlcs.append(&mut shutdown_result.dropped_outbound_htlcs);
channel_closures.push_back((events::Event::ChannelClosed {
channel_id: channel.context.channel_id(),
user_channel_id: channel.context.get_user_id(),
