Direct connect for OnionMessage sending #2723

Merged 18 commits on Dec 8, 2023.
5 changes: 4 additions & 1 deletion fuzz/src/onion_message.rs
@@ -79,6 +79,7 @@ impl MessageRouter for TestMessageRouter {
Ok(OnionMessagePath {
intermediate_nodes: vec![],
destination,
addresses: None,
})
}
}
@@ -269,7 +270,9 @@ mod tests {
"Received an onion message with path_id None and a reply_path: Custom(TestCustomMessage)"
.to_string())), Some(&1));
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
"Sending onion message: TestCustomMessage".to_string())), Some(&1));
"Constructing onion message when responding to Custom onion message with path_id None: TestCustomMessage".to_string())), Some(&1));
assert_eq!(log_entries.get(&("lightning::onion_message::messenger".to_string(),
"Buffered onion message when responding to Custom onion message with path_id None".to_string())), Some(&1));
}

let two_unblinded_hops_om = "\
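The new `addresses` field gives a `MessageRouter` a way to return known socket addresses for the destination's first node when it is not a current peer, which is what enables the direct-connect flow in this PR. A rough sketch of a router populating it, assuming the same `find_path` signature the fuzz harness implements; `lookup_known_addresses` is a hypothetical helper, not an LDK API:

fn find_path(
	&self, _sender: PublicKey, peers: Vec<PublicKey>, destination: Destination
) -> Result<OnionMessagePath, ()> {
	// Resolve the node we would need a direct connection to.
	let first_node = match &destination {
		Destination::Node(node_id) => *node_id,
		Destination::BlindedPath(path) => path.introduction_node_id,
	};
	// Only attach addresses when the first hop is not already connected.
	let addresses = if peers.contains(&first_node) {
		None
	} else {
		lookup_known_addresses(&first_node) // hypothetical: Option<Vec<SocketAddress>>
	};
	Ok(OnionMessagePath { intermediate_nodes: vec![], destination, addresses })
}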
88 changes: 67 additions & 21 deletions lightning-background-processor/src/lib.rs
@@ -30,6 +30,7 @@ use lightning::events::{Event, PathFailure};
#[cfg(feature = "std")]
use lightning::events::{EventHandler, EventsProvider};
use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::OnionMessageHandler;
use lightning::ln::peer_handler::APeerManager;
use lightning::routing::gossip::{NetworkGraph, P2PGossipSync};
use lightning::routing::utxo::UtxoLookup;
@@ -104,6 +105,11 @@ const PING_TIMER: u64 = 30;
#[cfg(test)]
const PING_TIMER: u64 = 1;

#[cfg(not(test))]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 10;
#[cfg(test)]
const ONION_MESSAGE_HANDLER_TIMER: u64 = 1;

/// Prune the network graph of stale entries hourly.
const NETWORK_PRUNE_TIMER: u64 = 60 * 60;

@@ -270,18 +276,20 @@ fn update_scorer<'a, S: 'static + Deref<Target = SC> + Send + Sync, SC: 'a + Wri
}

macro_rules! define_run_body {
($persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$gossip_sync: ident, $peer_manager: ident, $logger: ident, $scorer: ident,
$loop_exit_check: expr, $await: expr, $get_timer: expr, $timer_elapsed: expr,
$check_slow_await: expr)
=> { {
(
$persister: ident, $chain_monitor: ident, $process_chain_monitor_events: expr,
$channel_manager: ident, $process_channel_manager_events: expr,
$peer_manager: ident, $process_onion_message_handler_events: expr, $gossip_sync: ident,
$logger: ident, $scorer: ident, $loop_exit_check: expr, $await: expr, $get_timer: expr,
$timer_elapsed: expr, $check_slow_await: expr
) => { {
log_trace!($logger, "Calling ChannelManager's timer_tick_occurred on startup");
$channel_manager.timer_tick_occurred();
log_trace!($logger, "Rebroadcasting monitor's pending claims on startup");
$chain_monitor.rebroadcast_pending_claims();

let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
let mut last_scorer_persist_call = $get_timer(SCORER_PERSIST_TIMER);
@@ -291,6 +299,7 @@ macro_rules! define_run_body {
loop {
$process_channel_manager_events;
$process_chain_monitor_events;
$process_onion_message_handler_events;

// Note that the PeerManager::process_events may block on ChannelManager's locks,
// hence it comes last here. When the ChannelManager finishes whatever it's doing,
@@ -334,6 +343,11 @@ macro_rules! define_run_body {
$channel_manager.timer_tick_occurred();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
$peer_manager.onion_message_handler().timer_tick_occurred();
last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
}
if await_slow {
// On various platforms, we may be starved of CPU cycles for several reasons.
// E.g. on iOS, if we've been in the background, we will be entirely paused.
@@ -603,8 +617,7 @@ pub async fn process_events_async<
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
APM: APeerManager + Send + Sync,
PM: 'static + Deref<Target = APM> + Send + Sync,
PM: 'static + Deref + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for<'b> WriteableScore<'b>,
SleepFuture: core::future::Future<Output = bool> + core::marker::Unpin,
@@ -627,6 +640,7 @@ where
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
PM::Target: APeerManager + Send + Sync,
{
let mut should_break = false;
let async_event_handler = |event| {
@@ -650,10 +664,12 @@ where
event_handler(event).await;
}
};
define_run_body!(persister,
chain_monitor, chain_monitor.process_pending_events_async(async_event_handler).await,
define_run_body!(
persister, chain_monitor,
chain_monitor.process_pending_events_async(async_event_handler).await,
channel_manager, channel_manager.process_pending_events_async(async_event_handler).await,
gossip_sync, peer_manager, logger, scorer, should_break, {
peer_manager, process_onion_message_handler_events_async(&peer_manager, async_event_handler).await,
gossip_sync, logger, scorer, should_break, {
let fut = Selector {
a: channel_manager.get_event_or_persistence_needed_future(),
b: chain_monitor.get_update_future(),
@@ -673,7 +689,29 @@ where
task::Poll::Ready(exit) => { should_break = exit; true },
task::Poll::Pending => false,
}
}, mobile_interruptable_platform)
}, mobile_interruptable_platform
)
}

#[cfg(feature = "futures")]
async fn process_onion_message_handler_events_async<
EventHandlerFuture: core::future::Future<Output = ()>,
EventHandler: Fn(Event) -> EventHandlerFuture,
PM: 'static + Deref + Send + Sync,
>(
peer_manager: &PM, handler: EventHandler
)
where
PM::Target: APeerManager + Send + Sync,
{
use lightning::events::EventsProvider;

let events = core::cell::RefCell::new(Vec::new());
peer_manager.onion_message_handler().process_pending_events(&|e| events.borrow_mut().push(e));
Contributor:

Maybe after async functions in traits are stabilized, we can add an async event processing method to EventsProvider with a default implementation 🤔 I'm just wondering if this could be a temporary workaround.

Contributor Author:

Yeah, possibly. EventsProvider is kinda weird in that up until now it never really needed to be a trait.

Collaborator:

Not sure I understand the reason for this now? Can we not just add a process_pending_events_async method on OnionMessenger just like we do for ChannelManager?

Contributor Author:

BackgroundProcessor gets the OnionMessenger through PeerManager, which only knows it is something that implements OnionMessageHandler and EventsProvider. If we instead pass OnionMessenger to BackgroundProcessor, then we would need to add three more type parameters to BackgroundProcessor.


for event in events.into_inner() {
handler(event).await
}
}

#[cfg(feature = "std")]
Expand Down Expand Up @@ -742,8 +780,7 @@ impl BackgroundProcessor {
CM: 'static + Deref<Target = ChannelManager<CW, T, ES, NS, SP, F, R, L>> + Send + Sync,
PGS: 'static + Deref<Target = P2PGossipSync<G, UL, L>> + Send + Sync,
RGS: 'static + Deref<Target = RapidGossipSync<G, L>> + Send,
APM: APeerManager + Send + Sync,
PM: 'static + Deref<Target = APM> + Send + Sync,
PM: 'static + Deref + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: for <'b> WriteableScore<'b>,
>(
@@ -763,6 +800,7 @@
L::Target: 'static + Logger,
P::Target: 'static + Persist<<SP::Target as SignerProvider>::EcdsaSigner>,
PS::Target: 'static + Persister<'a, CW, T, ES, NS, SP, F, R, L, SC>,
PM::Target: APeerManager + Send + Sync,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
@@ -782,14 +820,18 @@
}
event_handler.handle_event(event);
};
define_run_body!(persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
define_run_body!(
persister, chain_monitor, chain_monitor.process_pending_events(&event_handler),
channel_manager, channel_manager.process_pending_events(&event_handler),
gossip_sync, peer_manager, logger, scorer, stop_thread.load(Ordering::Acquire),
peer_manager,
peer_manager.onion_message_handler().process_pending_events(&event_handler),
gossip_sync, logger, scorer, stop_thread.load(Ordering::Acquire),
{ Sleeper::from_two_futures(
channel_manager.get_event_or_persistence_needed_future(),
chain_monitor.get_update_future()
).wait_timeout(Duration::from_millis(100)); },
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false)
|_| Instant::now(), |time: &Instant, dur| time.elapsed().as_secs() > dur, false
)
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
}
@@ -1362,9 +1404,11 @@ mod tests {

#[test]
fn test_timer_tick_called() {
// Test that `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
// `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`, and
// `PeerManager::timer_tick_occurred` every `PING_TIMER`.
// Test that:
// - `ChannelManager::timer_tick_occurred` is called every `FRESHNESS_TIMER`,
// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
@@ -1375,9 +1419,11 @@
let desired_log_1 = "Calling ChannelManager's timer_tick_occurred".to_string();
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() {
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
break
}
}
28 changes: 26 additions & 2 deletions lightning/src/events/mod.rs
@@ -530,6 +530,25 @@ pub enum Event {
/// serialized prior to LDK version 0.0.117.
sender_intended_total_msat: Option<u64>,
},
/// Indicates that a peer connection with a node is needed in order to send an [`OnionMessage`].
///
/// Typically, this happens when a [`MessageRouter`] is unable to find a complete path to a
/// [`Destination`]. Once a connection is established, any messages buffered by an
/// [`OnionMessageHandler`] may be sent.
///
/// This event will not be generated for onion message forwards; only for sends including
/// replies. Handlers should connect to the node otherwise any buffered messages may be lost.
///
/// [`OnionMessage`]: msgs::OnionMessage
/// [`MessageRouter`]: crate::onion_message::MessageRouter
/// [`Destination`]: crate::onion_message::Destination
/// [`OnionMessageHandler`]: crate::ln::msgs::OnionMessageHandler
ConnectionNeeded {
Collaborator:

Why did you end up going the event route vs having an "start opening a connection async" trait (or even just doing it via the onion message router)? I guess an event could let us reuse it later in ChannelManager to just tell the user to reconnect every time a peer disconnects we care about, rather than users having to poll that, but it does seem like a lot of indirection to get the needs-connection event out.

Contributor Author:

It would require adding another parameterization for users to implement (i.e., either a new trait on OnionMessenger or one on DefaultMessageRouter), which would then need to bubble up to the Simple* aliases.

And since it needs to be async, the trait implementation wouldn't be enough. Users would need to write code to process connection requests in the background from what their trait enqueued. If we wrote some of this logic for them, then we'd need to do it for each language's IO layer. But that wouldn't fit well into any peer management logic that they may have (e.g., LDK Node's PeerStore).

Seems much simpler just to surface an event for them so that no additional parameterization and processing code is needed, other than handling one more event variant.

Collaborator:

Yea, fair, I guess. I think the current piping through of the needs-connected nodes is also a bit gross, but I'm not sure it's more gross. One further thing to consider is what we want the eventual end state to be. If we want to generate ConnectionNeeded events for peers with which we have a channel, we should stick with the event route (and probably start generating such events in the same release). If we don't, adding it via the router is easier to "walk back", and is at least only gross temporarily.

Contributor:

Could we pipe the needs-connect nodes through OffersMessageHandler / CustomOMHandler with a new trait method? Then the ChannelManager could generate ConnectionNeeded events and we wouldn't have to trouble the background processor. I do see Jeff's argument that the current design aligns with the peer manager doing peer stuff though.

Contributor Author:

Hmm... that wouldn't work for uses of send_onion_message or anyone using something other than ChannelManager to implement one of those traits.

Contributor:

The custom trait implementations would need to support connecting to peers outbound; would it not work in some other way? I'm not convinced it's cleaner than the current approach though :/ just nicer in terms of the PeerMan/BgProcessor.

Contributor Author:

Not sure that I follow. All custom implementations should work with the current approach of using OnionMessenger. Or are you saying a custom implementation of OnionMessageHandler (i.e., something other than OnionMessenger)? Note that CustomOnionMessageHandler and OffersMessageHandler are specific to OnionMessenger. Any implementations of those work fine with the current approach.

Contributor Author:

Discussed offline. Updated to have events processed directly from any OnionMessageHandler by requiring implementations to also implement EventsProvider. I was able to do so without adding another type parameter to BackgroundProcessor since the handler can be fetched from PeerManager.

/// The node id for the node needing a connection.
node_id: PublicKey,
/// Sockets for connecting to the node.
addresses: Vec<msgs::SocketAddress>,
},
/// Indicates a request for an invoice failed to yield a response in a reasonable amount of time
/// or was explicitly abandoned by [`ChannelManager::abandon_payment`]. This may be for an
/// [`InvoiceRequest`] sent for an [`Offer`] or for a [`Refund`] that hasn't been redeemed.
@@ -1190,6 +1209,10 @@ impl Writeable for Event {
(0, payment_id, required),
})
},
&Event::ConnectionNeeded { .. } => {
35u8.write(writer)?;
// Never write ConnectionNeeded events as buffered onion messages aren't serialized.
},
// Note that, going forward, all new events must only write data inside of
// `write_tlv_fields`. Versions 0.0.101+ will ignore odd-numbered events that write
// data via `write_tlv_fields`.
@@ -1200,8 +1223,7 @@
impl MaybeReadable for Event {
fn read<R: io::Read>(reader: &mut R) -> Result<Option<Self>, msgs::DecodeError> {
match Readable::read(reader)? {
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events,
// unlike all other events, thus we return immediately here.
// Note that we do not write a length-prefixed TLV for FundingGenerationReady events.
0u8 => Ok(None),
1u8 => {
let f = || {
@@ -1588,6 +1610,8 @@ impl MaybeReadable for Event {
};
f()
},
// Note that we do not write a length-prefixed TLV for ConnectionNeeded events.
35u8 => Ok(None),
// Versions prior to 0.0.100 did not ignore odd types, instead returning InvalidValue.
// Version 0.0.100 failed to properly ignore odd types, possibly resulting in corrupt
// reads.
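Since `ConnectionNeeded` is intentionally never persisted (note the read arm returning `Ok(None)` above), applications should act on it as it is generated. A minimal sketch of handling the new variant inside an application's event handler; `connect_peer` is a hypothetical stand-in for the application's networking layer:

// Inside the application's event handler match.
Event::ConnectionNeeded { node_id, addresses } => {
	for address in addresses {
		// Dial the peer so the OnionMessageHandler's buffered messages can be
		// delivered before its timer drops them.
		if connect_peer(node_id, address).is_ok() { break; }
	}
},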
8 changes: 6 additions & 2 deletions lightning/src/ln/msgs.rs
@@ -52,7 +52,7 @@ use core::fmt::Display;
use crate::io::{self, Cursor, Read};
use crate::io_extras::read_to_end;

use crate::events::MessageSendEventsProvider;
use crate::events::{EventsProvider, MessageSendEventsProvider};
use crate::util::chacha20poly1305rfc::ChaChaPolyReadAdapter;
use crate::util::logger;
use crate::util::ser::{LengthReadable, LengthReadableArgs, Readable, ReadableArgs, Writeable, Writer, WithoutLength, FixedLengthReader, HighZeroBytesDroppedBigSize, Hostname, TransactionU16LenLimited, BigSize};
@@ -1631,7 +1631,7 @@ pub trait RoutingMessageHandler : MessageSendEventsProvider {
}

/// A handler for received [`OnionMessage`]s and for providing generated ones to send.
pub trait OnionMessageHandler {
pub trait OnionMessageHandler: EventsProvider {
/// Handle an incoming `onion_message` message from the given peer.
fn handle_onion_message(&self, peer_node_id: &PublicKey, msg: &OnionMessage);

@@ -1650,6 +1650,10 @@ pub trait OnionMessageHandler {
/// drop and refuse to forward onion messages to this peer.
fn peer_disconnected(&self, their_node_id: &PublicKey);

/// Performs actions that should happen roughly every ten seconds after startup. Allows handlers
/// to drop any buffered onion messages intended for prospective peers.
fn timer_tick_occurred(&self);

// Handler information:
/// Gets the node feature flags which this handler itself supports. All available handlers are
/// queried similarly and their feature flags are OR'd together to form the [`NodeFeatures`]
11 changes: 10 additions & 1 deletion lightning/src/ln/peer_handler.rs
@@ -19,7 +19,7 @@ use bitcoin::blockdata::constants::ChainHash;
use bitcoin::secp256k1::{self, Secp256k1, SecretKey, PublicKey};

use crate::sign::{KeysManager, NodeSigner, Recipient};
use crate::events::{MessageSendEvent, MessageSendEventsProvider};
use crate::events::{EventHandler, EventsProvider, MessageSendEvent, MessageSendEventsProvider};
use crate::ln::ChannelId;
use crate::ln::features::{InitFeatures, NodeFeatures};
use crate::ln::msgs;
@@ -89,6 +89,9 @@ pub trait CustomMessageHandler: wire::CustomMessageReader {
/// A dummy struct which implements `RoutingMessageHandler` without storing any routing information
/// or doing any processing. You can provide one of these as the route_handler in a MessageHandler.
pub struct IgnoringMessageHandler{}
impl EventsProvider for IgnoringMessageHandler {
fn process_pending_events<H: Deref>(&self, _handler: H) where H::Target: EventHandler {}
}
impl MessageSendEventsProvider for IgnoringMessageHandler {
fn get_and_clear_pending_msg_events(&self) -> Vec<MessageSendEvent> { Vec::new() }
}
@@ -115,6 +118,7 @@ impl OnionMessageHandler for IgnoringMessageHandler {
fn next_onion_message_for_peer(&self, _peer_node_id: PublicKey) -> Option<msgs::OnionMessage> { None }
fn peer_connected(&self, _their_node_id: &PublicKey, _init: &msgs::Init, _inbound: bool) -> Result<(), ()> { Ok(()) }
fn peer_disconnected(&self, _their_node_id: &PublicKey) {}
fn timer_tick_occurred(&self) {}
fn provided_node_features(&self) -> NodeFeatures { NodeFeatures::empty() }
fn provided_init_features(&self, _their_node_id: &PublicKey) -> InitFeatures {
InitFeatures::empty()
@@ -680,6 +684,8 @@ pub trait APeerManager {
type NS: Deref<Target=Self::NST>;
/// Gets a reference to the underlying [`PeerManager`].
fn as_ref(&self) -> &PeerManager<Self::Descriptor, Self::CM, Self::RM, Self::OM, Self::L, Self::CMH, Self::NS>;
/// Returns the peer manager's [`OnionMessageHandler`].
fn onion_message_handler(&self) -> &Self::OMT;
}
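This accessor is what lets the background processor reach the onion message handler without adding type parameters. A rough sketch of how a custom background loop might drive it, mirroring what `define_run_body!` does above (the ten-second cadence matches `ONION_MESSAGE_HANDLER_TIMER`):

// On each loop iteration: surface any ConnectionNeeded events.
peer_manager.onion_message_handler().process_pending_events(&|event: Event| {
	if let Event::ConnectionNeeded { node_id, addresses } = event {
		// Application-specific: dial `node_id` at one of `addresses`.
		let _ = (node_id, addresses);
	}
});
// Roughly every ten seconds: drop onion messages buffered for peers that
// never connected.
peer_manager.onion_message_handler().timer_tick_occurred();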

impl<Descriptor: SocketDescriptor, CM: Deref, RM: Deref, OM: Deref, L: Deref, CMH: Deref, NS: Deref>
@@ -705,6 +711,9 @@
type NST = <NS as Deref>::Target;
type NS = NS;
fn as_ref(&self) -> &PeerManager<Descriptor, CM, RM, OM, L, CMH, NS> { self }
fn onion_message_handler(&self) -> &Self::OMT {
self.message_handler.onion_message_handler.deref()
}
}

/// A PeerManager manages a set of peers, described by their [`SocketDescriptor`] and marshalls
Expand Down