Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanlint committed Jul 31, 2023
1 parent 5cddf5e commit 68cb520
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 31 deletions.
20 changes: 16 additions & 4 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ enum ChannelState {
/// We've successfully negotiated a closing_signed dance. At this point ChannelManager is about
/// to drop us, but we store this anyway.
ShutdownComplete = 4096,
Batched = 1 << 13,
}
const BOTH_SIDES_SHUTDOWN_MASK: u32 = ChannelState::LocalShutdownSent as u32 | ChannelState::RemoteShutdownSent as u32;
const MULTI_STATE_FLAGS: u32 = BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32;
Expand Down Expand Up @@ -896,6 +897,8 @@ pub(super) struct ChannelContext<Signer: ChannelSigner> {
/// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
/// store it here and only release it to the `ChannelManager` once it asks for it.
blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,

is_batched: bool,
}

impl<Signer: ChannelSigner> ChannelContext<Signer> {
Expand Down Expand Up @@ -1940,7 +1943,7 @@ impl<Signer: ChannelSigner> ChannelContext<Signer> {

/// Returns transaction if there is pending funding transaction that is yet to broadcast
pub fn unbroadcasted_funding(&self) -> Option<Transaction> {
if self.channel_state & (ChannelState::FundingCreated as u32) != 0 {
if (self.channel_state & (ChannelState::FundingCreated as u32) != 0) || (self.channel_state & (ChannelState::Batched as u32) != 0) {
self.funding_transaction.clone()
} else {
None
Expand Down Expand Up @@ -2557,7 +2560,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
channel_monitor.provide_latest_counterparty_commitment_tx(counterparty_initial_bitcoin_tx.txid, Vec::new(), self.context.cur_counterparty_commitment_transaction_number, self.context.counterparty_cur_commitment_point.unwrap(), logger);

assert_eq!(self.context.channel_state & (ChannelState::MonitorUpdateInProgress as u32), 0); // We have no had any monitor(s) yet to fail update!
self.context.channel_state = ChannelState::FundingSent as u32;
if self.context.is_batched {
self.context.channel_state = ChannelState::FundingSent as u32 & ChannelState::Batched as u32;
} else {
self.context.channel_state = ChannelState::FundingSent as u32;
}
self.context.cur_holder_commitment_transaction_number -= 1;
self.context.cur_counterparty_commitment_transaction_number -= 1;

Expand Down Expand Up @@ -3656,7 +3663,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
// first received the funding_signed.
let mut funding_broadcastable =
if self.context.is_outbound() && self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::FundingSent as u32 {
if self.context.is_outbound() && self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::FundingSent as u32 && self.context.channel_state & ChannelState::Batched as u32 == 0 {
self.context.funding_transaction.take()
} else { None };
// That said, if the funding transaction is already confirmed (ie we're active with a
Expand Down Expand Up @@ -5715,6 +5722,8 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
channel_keys_id,

blocked_monitor_updates: Vec::new(),

is_batched: false,
},
unfunded_context: UnfundedChannelContext { unfunded_channel_age_ticks: 0 }
})
Expand All @@ -5735,7 +5744,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
/// Note that channel_id changes during this call!
/// Do NOT broadcast the funding transaction until after a successful funding_signed call!
/// If an Err is returned, it is a ChannelError::Close.
pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, logger: &L)
pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, is_batched: bool, logger: &L)
-> Result<(Channel<Signer>, msgs::FundingCreated), (Self, ChannelError)> where L::Target: Logger {
if !self.context.is_outbound() {
panic!("Tried to create outbound funding_created message on an inbound channel!");
Expand Down Expand Up @@ -5768,6 +5777,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
self.context.channel_state = ChannelState::FundingCreated as u32;
self.context.channel_id = funding_txo.to_channel_id();
self.context.funding_transaction = Some(funding_transaction);
self.context.is_batched = is_batched;

let channel = Channel {
context: self.context,
Expand Down Expand Up @@ -6347,6 +6357,7 @@ impl<Signer: WriteableEcdsaChannelSigner> InboundV1Channel<Signer> {

channel_type,
channel_keys_id,
is_batched: false,

blocked_monitor_updates: Vec::new(),
},
Expand Down Expand Up @@ -7441,6 +7452,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
channel_keys_id,

blocked_monitor_updates: blocked_monitor_updates.unwrap(),
is_batched: false,
}
})
}
Expand Down
87 changes: 60 additions & 27 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,7 @@ where
/// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
/// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,
batch_funding_states: FairRwLock<HashMap<Txid, Mutex<HashMap<[u8;32], bool>>>>,

background_events_processed_since_startup: AtomicBool,

Expand Down Expand Up @@ -1871,6 +1872,20 @@ macro_rules! emit_channel_ready_event {

macro_rules! handle_monitor_update_completion {
($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
let mut batch_funding_transaction = None;
{
let batch_funding_states = $self.batch_funding_states.read().unwrap();
let batch_funding_state = $chan.context.unbroadcasted_funding()
.and_then(|tx| batch_funding_states.get(&tx.txid()))
.map(|state| state.lock().unwrap());
if let Some(mut batch_funding_state) = batch_funding_state {
batch_funding_state.insert($chan.context.channel_id(), true);
if batch_funding_state.values().all(|v| *v) {
batch_funding_transaction = $chan.context.unbroadcasted_funding();
}
}
}

let mut updates = $chan.monitor_updating_restored(&$self.logger,
&$self.node_signer, $self.genesis_hash, &$self.default_configuration,
$self.best_block.read().unwrap().height());
Expand All @@ -1895,8 +1910,8 @@ macro_rules! handle_monitor_update_completion {
let htlc_forwards = $self.handle_channel_resumption(
&mut $peer_state.pending_msg_events, $chan, updates.raa,
updates.commitment_update, updates.order, updates.accepted_htlcs,
updates.funding_broadcastable, updates.channel_ready,
updates.announcement_sigs);
batch_funding_transaction.or(updates.funding_broadcastable),
updates.channel_ready, updates.announcement_sigs);
if let Some(upd) = channel_update {
$peer_state.pending_msg_events.push(upd);
}
Expand Down Expand Up @@ -2119,6 +2134,7 @@ where
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
batch_funding_states: FairRwLock::new(HashMap::new()),

entropy_source,
node_signer,
Expand Down Expand Up @@ -3354,7 +3370,7 @@ where
/// Handles the generation of a funding transaction, optionally (for tests) with a function
/// which checks the correctness of the funding transaction given the associated channel.
fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<<SP::Target as SignerProvider>::Signer>, &Transaction) -> Result<OutPoint, APIError>>(
&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batched: bool, find_funding_output: FundingOutput
) -> Result<(), APIError> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
Expand All @@ -3366,7 +3382,7 @@ where
Some(chan) => {
let funding_txo = find_funding_output(&chan, &funding_transaction)?;

let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batched, &self.logger)
.map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
let channel_id = chan.context.channel_id();
let user_id = chan.context.get_user_id();
Expand Down Expand Up @@ -3452,6 +3468,10 @@ where
/// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
/// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
}

pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&[u8; 32], &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

for inp in funding_transaction.input.iter() {
Expand All @@ -3473,32 +3493,44 @@ where
});
}
}
self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
if tx.output.len() > u16::max_value() as usize {
return Err(APIError::APIMisuseError {
err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
});
}

let mut output_index = None;
let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
for (idx, outp) in tx.output.iter().enumerate() {
if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
if output_index.is_some() {
return Err(APIError::APIMisuseError {
err: "Multiple outputs matched the expected script and value".to_owned()
});
let is_batched = temporary_channels.len() > 1;
let batch_funding_state = RefCell::new(HashMap::new());
for (temporary_channel_id, counterparty_node_id) in temporary_channels {
self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction.clone(), is_batched, |chan, tx| {
if tx.output.len() > u16::max_value() as usize {
return Err(APIError::APIMisuseError {
err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
});
}

let mut output_index = None;
let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
for (idx, outp) in tx.output.iter().enumerate() {
if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
if output_index.is_some() {
return Err(APIError::APIMisuseError {
err: "Multiple outputs matched the expected script and value".to_owned()
});
}
output_index = Some(idx as u16);
}
output_index = Some(idx as u16);
}
}
if output_index.is_none() {
return Err(APIError::APIMisuseError {
err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
});
}
Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
})
if output_index.is_none() {
return Err(APIError::APIMisuseError {
err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
});
}
let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
batch_funding_state.borrow_mut().insert(outpoint.to_channel_id(), false);
Ok(outpoint)
})?;
}
if is_batched {
let mut batch_funding_states = self.batch_funding_states.write().unwrap();
batch_funding_states.insert(funding_transaction.txid(), Mutex::new(batch_funding_state.into_inner()));
}
Ok(())
}

/// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
Expand Down Expand Up @@ -9067,6 +9099,7 @@ where
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
batch_funding_states: FairRwLock::new(HashMap::new()),

entropy_source: args.entropy_source,
node_signer: args.node_signer,
Expand Down

0 comments on commit 68cb520

Please sign in to comment.