Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
wvanlint committed Aug 8, 2023
1 parent 6f58072 commit 65a4687
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 36 deletions.
30 changes: 23 additions & 7 deletions lightning/src/ln/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,7 @@ enum ChannelState {
/// We've successfully negotiated a closing_signed dance. At this point ChannelManager is about
/// to drop us, but we store this anyway.
ShutdownComplete = 4096,
WaitingForBatch = 1 << 13,
}
const BOTH_SIDES_SHUTDOWN_MASK: u32 = ChannelState::LocalShutdownSent as u32 | ChannelState::RemoteShutdownSent as u32;
const MULTI_STATE_FLAGS: u32 = BOTH_SIDES_SHUTDOWN_MASK | ChannelState::PeerDisconnected as u32 | ChannelState::MonitorUpdateInProgress as u32;
Expand Down Expand Up @@ -896,6 +897,8 @@ pub(super) struct ChannelContext<Signer: ChannelSigner> {
/// If we can't release a [`ChannelMonitorUpdate`] until some external action completes, we
/// store it here and only release it to the `ChannelManager` once it asks for it.
blocked_monitor_updates: Vec<PendingChannelMonitorUpdate>,

is_batched: bool,
}

impl<Signer: ChannelSigner> ChannelContext<Signer> {
Expand Down Expand Up @@ -1940,7 +1943,7 @@ impl<Signer: ChannelSigner> ChannelContext<Signer> {

/// Returns transaction if there is pending funding transaction that is yet to broadcast
pub fn unbroadcasted_funding(&self) -> Option<Transaction> {
if self.channel_state & (ChannelState::FundingCreated as u32) != 0 {
if (self.channel_state & (ChannelState::FundingCreated as u32) != 0) || (self.channel_state & (ChannelState::WaitingForBatch as u32) != 0) {
self.funding_transaction.clone()
} else {
None
Expand Down Expand Up @@ -2557,7 +2560,11 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
channel_monitor.provide_latest_counterparty_commitment_tx(counterparty_initial_bitcoin_tx.txid, Vec::new(), self.context.cur_counterparty_commitment_transaction_number, self.context.counterparty_cur_commitment_point.unwrap(), logger);

assert_eq!(self.context.channel_state & (ChannelState::MonitorUpdateInProgress as u32), 0); // We have no had any monitor(s) yet to fail update!
self.context.channel_state = ChannelState::FundingSent as u32;
if self.context.is_batched {
self.context.channel_state = ChannelState::FundingSent as u32 | ChannelState::WaitingForBatch as u32;
} else {
self.context.channel_state = ChannelState::FundingSent as u32;
}
self.context.cur_holder_commitment_transaction_number -= 1;
self.context.cur_counterparty_commitment_transaction_number -= 1;

Expand All @@ -2568,6 +2575,10 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
Ok(channel_monitor)
}

pub fn set_batch_ready(&mut self) {
self.context.channel_state &= !(ChannelState::WaitingForBatch as u32);
}

/// Handles a channel_ready message from our peer. If we've already sent our channel_ready
/// and the channel is now usable (and public), this may generate an announcement_signatures to
/// reply with.
Expand Down Expand Up @@ -3656,7 +3667,7 @@ impl<Signer: WriteableEcdsaChannelSigner> Channel<Signer> {
// (re-)broadcast the funding transaction as we may have declined to broadcast it when we
// first received the funding_signed.
let mut funding_broadcastable =
if self.context.is_outbound() && self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::FundingSent as u32 {
if self.context.is_outbound() && self.context.channel_state & !MULTI_STATE_FLAGS >= ChannelState::FundingSent as u32 && self.context.channel_state & ChannelState::WaitingForBatch as u32 == 0 {
self.context.funding_transaction.take()
} else { None };
// That said, if the funding transaction is already confirmed (ie we're active with a
Expand Down Expand Up @@ -5715,6 +5726,8 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
channel_keys_id,

blocked_monitor_updates: Vec::new(),

is_batched: false,
},
unfunded_context: UnfundedChannelContext { unfunded_channel_age_ticks: 0 }
})
Expand All @@ -5735,7 +5748,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
/// Note that channel_id changes during this call!
/// Do NOT broadcast the funding transaction until after a successful funding_signed call!
/// If an Err is returned, it is a ChannelError::Close.
pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, logger: &L)
pub fn get_funding_created<L: Deref>(mut self, funding_transaction: Transaction, funding_txo: OutPoint, is_batched: bool, logger: &L)
-> Result<(Channel<Signer>, msgs::FundingCreated), (Self, ChannelError)> where L::Target: Logger {
if !self.context.is_outbound() {
panic!("Tried to create outbound funding_created message on an inbound channel!");
Expand Down Expand Up @@ -5768,6 +5781,7 @@ impl<Signer: WriteableEcdsaChannelSigner> OutboundV1Channel<Signer> {
self.context.channel_state = ChannelState::FundingCreated as u32;
self.context.channel_id = funding_txo.to_channel_id();
self.context.funding_transaction = Some(funding_transaction);
self.context.is_batched = is_batched;

let channel = Channel {
context: self.context,
Expand Down Expand Up @@ -6347,6 +6361,7 @@ impl<Signer: WriteableEcdsaChannelSigner> InboundV1Channel<Signer> {

channel_type,
channel_keys_id,
is_batched: false,

blocked_monitor_updates: Vec::new(),
},
Expand Down Expand Up @@ -7441,6 +7456,7 @@ impl<'a, 'b, 'c, ES: Deref, SP: Deref> ReadableArgs<(&'a ES, &'b SP, u32, &'c Ch
channel_keys_id,

blocked_monitor_updates: blocked_monitor_updates.unwrap(),
is_batched: false,
}
})
}
Expand Down Expand Up @@ -7638,7 +7654,7 @@ mod tests {
value: 10000000, script_pubkey: output_script.clone(),
}]};
let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
let (_, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();

// Node B --> Node A: funding signed
Expand Down Expand Up @@ -7765,7 +7781,7 @@ mod tests {
value: 10000000, script_pubkey: output_script.clone(),
}]};
let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
let (mut node_b_chan, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();

// Node B --> Node A: funding signed
Expand Down Expand Up @@ -7953,7 +7969,7 @@ mod tests {
value: 10000000, script_pubkey: output_script.clone(),
}]};
let funding_outpoint = OutPoint{ txid: tx.txid(), index: 0 };
let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, &&logger).map_err(|_| ()).unwrap();
let (mut node_a_chan, funding_created_msg) = node_a_chan.get_funding_created(tx.clone(), funding_outpoint, false, &&logger).map_err(|_| ()).unwrap();
let (_, funding_signed_msg, _) = node_b_chan.funding_created(&funding_created_msg, best_block, &&keys_provider, &&logger).map_err(|_| ()).unwrap();

// Node B --> Node A: funding signed
Expand Down
124 changes: 96 additions & 28 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,6 +1120,9 @@ where
/// `PersistenceNotifierGuard::notify_on_drop(..)` and pass the lock to it, to ensure the
/// Notifier the lock contains sends out a notification when the lock is released.
total_consistency_lock: RwLock<()>,
/// Tracks the progress of the channels in a batched funding transaction by whether
/// funding_signed was received and the monitor has been persisted.
v1_channel_batch_states: FairRwLock<HashMap<Txid, Mutex<HashMap<([u8;32], PublicKey), bool>>>>,

background_events_processed_since_startup: AtomicBool,

Expand Down Expand Up @@ -1871,6 +1874,31 @@ macro_rules! emit_channel_ready_event {

macro_rules! handle_monitor_update_completion {
($self: ident, $peer_state_lock: expr, $peer_state: expr, $per_peer_state_lock: expr, $chan: expr) => { {
// If the channel belongs to a batched funding transaction, the progress of the batch
// should be updated as we have received funding_signed and persisted the monitor.
println!("TRYTINGIN");
let mut completed_batch = None;
{
let v1_channel_batch_states = $self.v1_channel_batch_states.read().unwrap();
println!("{:?}", $chan.context.unbroadcasted_funding());
println!("{:?}", v1_channel_batch_states.keys());
let batch_state_key_value = $chan.context.unbroadcasted_funding()
.and_then(|tx| v1_channel_batch_states.get_key_value(&tx.txid()));
if let Some((txid, batch_state)) = batch_state_key_value {
println!("Got batch state");
let mut batch_state = batch_state.lock().unwrap();
batch_state.insert(
($chan.context.channel_id(), $chan.context.get_counterparty_node_id()),
true,
);
if batch_state.values().all(|v| *v) {
println!("Got all of them!");
$chan.set_batch_ready();
completed_batch = Some(txid.clone());
}
}
}

let mut updates = $chan.monitor_updating_restored(&$self.logger,
&$self.node_signer, $self.genesis_hash, &$self.default_configuration,
$self.best_block.read().unwrap().height());
Expand All @@ -1895,14 +1923,36 @@ macro_rules! handle_monitor_update_completion {
let htlc_forwards = $self.handle_channel_resumption(
&mut $peer_state.pending_msg_events, $chan, updates.raa,
updates.commitment_update, updates.order, updates.accepted_htlcs,
updates.funding_broadcastable, updates.channel_ready,
updates.announcement_sigs);
updates.funding_broadcastable, updates.channel_ready, updates.announcement_sigs);
if let Some(upd) = channel_update {
$peer_state.pending_msg_events.push(upd);
}

let channel_id = $chan.context.channel_id();
core::mem::drop($peer_state_lock);

// When all channels in a batched funding transaction have become ready, it is not necessary
// to track the progress of the batch anymore and the state of the channels can be updated.
if let Some(txid) = completed_batch {
let other_batched_channel_ids = $self.v1_channel_batch_states.write().unwrap()
.remove(&txid)
.map(|batch_state| batch_state.into_inner().unwrap().into_keys())
.into_iter().flatten()
.filter(|(other_channel_id, _counterparty_node_id)| other_channel_id != &channel_id);
for (other_channel_id, counterparty_node_id) in other_batched_channel_ids {
let peer_state = $per_peer_state_lock.get(&counterparty_node_id)
.map(|peer_state| peer_state.lock().unwrap());
if let Some(mut peer_state) = peer_state {
peer_state.channel_by_id.get_mut(&other_channel_id)
.map(|channel| {
let mut pending_events = $self.pending_events.lock().unwrap();
channel.set_batch_ready();
emit_channel_pending_event!(pending_events, channel);
});
}
}
}

core::mem::drop($per_peer_state_lock);

$self.handle_monitor_update_completion_actions(update_actions);
Expand Down Expand Up @@ -2119,6 +2169,7 @@ where
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
v1_channel_batch_states: FairRwLock::new(HashMap::new()),

entropy_source,
node_signer,
Expand Down Expand Up @@ -3354,7 +3405,7 @@ where
/// Handles the generation of a funding transaction, optionally (for tests) with a function
/// which checks the correctness of the funding transaction given the associated channel.
fn funding_transaction_generated_intern<FundingOutput: Fn(&OutboundV1Channel<<SP::Target as SignerProvider>::Signer>, &Transaction) -> Result<OutPoint, APIError>>(
&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, find_funding_output: FundingOutput
&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, is_batched: bool, find_funding_output: FundingOutput
) -> Result<(), APIError> {
let per_peer_state = self.per_peer_state.read().unwrap();
let peer_state_mutex = per_peer_state.get(counterparty_node_id)
Expand All @@ -3366,7 +3417,7 @@ where
Some(chan) => {
let funding_txo = find_funding_output(&chan, &funding_transaction)?;

let funding_res = chan.get_funding_created(funding_transaction, funding_txo, &self.logger)
let funding_res = chan.get_funding_created(funding_transaction, funding_txo, is_batched, &self.logger)
.map_err(|(mut chan, e)| if let ChannelError::Close(msg) = e {
let channel_id = chan.context.channel_id();
let user_id = chan.context.get_user_id();
Expand Down Expand Up @@ -3416,7 +3467,7 @@ where

#[cfg(test)]
pub(crate) fn funding_transaction_generated_unchecked(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction, output_index: u16) -> Result<(), APIError> {
self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |_, tx| {
self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, false, |_, tx| {
Ok(OutPoint { txid: tx.txid(), index: output_index })
})
}
Expand Down Expand Up @@ -3452,6 +3503,10 @@ where
/// [`Event::FundingGenerationReady`]: crate::events::Event::FundingGenerationReady
/// [`Event::ChannelClosed`]: crate::events::Event::ChannelClosed
pub fn funding_transaction_generated(&self, temporary_channel_id: &[u8; 32], counterparty_node_id: &PublicKey, funding_transaction: Transaction) -> Result<(), APIError> {
self.batch_funding_transaction_generated(&[(temporary_channel_id, counterparty_node_id)], funding_transaction)
}

pub fn batch_funding_transaction_generated(&self, temporary_channels: &[(&[u8; 32], &PublicKey)], funding_transaction: Transaction) -> Result<(), APIError> {
let _persistence_guard = PersistenceNotifierGuard::notify_on_drop(self);

for inp in funding_transaction.input.iter() {
Expand All @@ -3473,32 +3528,44 @@ where
});
}
}
self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction, |chan, tx| {
if tx.output.len() > u16::max_value() as usize {
return Err(APIError::APIMisuseError {
err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
});
}

let mut output_index = None;
let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
for (idx, outp) in tx.output.iter().enumerate() {
if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
if output_index.is_some() {
return Err(APIError::APIMisuseError {
err: "Multiple outputs matched the expected script and value".to_owned()
});
let is_batched = temporary_channels.len() > 1;
let v1_channel_batch_state = RefCell::new(HashMap::new());
for (temporary_channel_id, counterparty_node_id) in temporary_channels {
self.funding_transaction_generated_intern(temporary_channel_id, counterparty_node_id, funding_transaction.clone(), is_batched, |chan, tx| {
if tx.output.len() > u16::max_value() as usize {
return Err(APIError::APIMisuseError {
err: "Transaction had more than 2^16 outputs, which is not supported".to_owned()
});
}

let mut output_index = None;
let expected_spk = chan.context.get_funding_redeemscript().to_v0_p2wsh();
for (idx, outp) in tx.output.iter().enumerate() {
if outp.script_pubkey == expected_spk && outp.value == chan.context.get_value_satoshis() {
if output_index.is_some() {
return Err(APIError::APIMisuseError {
err: "Multiple outputs matched the expected script and value".to_owned()
});
}
output_index = Some(idx as u16);
}
output_index = Some(idx as u16);
}
}
if output_index.is_none() {
return Err(APIError::APIMisuseError {
err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
});
}
Ok(OutPoint { txid: tx.txid(), index: output_index.unwrap() })
})
if output_index.is_none() {
return Err(APIError::APIMisuseError {
err: "No output matched the script_pubkey and value in the FundingGenerationReady event".to_owned()
});
}
let outpoint = OutPoint { txid: tx.txid(), index: output_index.unwrap() };
v1_channel_batch_state.borrow_mut().insert((outpoint.to_channel_id(), (*counterparty_node_id).clone()), false);
Ok(outpoint)
})?;
}
if is_batched {
let mut v1_channel_batch_states = self.v1_channel_batch_states.write().unwrap();
v1_channel_batch_states.insert(funding_transaction.txid(), Mutex::new(v1_channel_batch_state.into_inner()));
}
Ok(())
}

/// Atomically applies partial updates to the [`ChannelConfig`] of the given channels.
Expand Down Expand Up @@ -9069,6 +9136,7 @@ where
total_consistency_lock: RwLock::new(()),
background_events_processed_since_startup: AtomicBool::new(false),
persistence_notifier: Notifier::new(),
v1_channel_batch_states: FairRwLock::new(HashMap::new()),

entropy_source: args.entropy_source,
node_signer: args.node_signer,
Expand Down
Loading

0 comments on commit 65a4687

Please sign in to comment.