Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ use crate::logger::{log_error, LdkLogger, LogLevel, LogWriter, Logger};
use crate::message_handler::NodeCustomMessageHandler;
use crate::payment::asynchronous::om_mailbox::OnionMessageMailbox;
use crate::peer_store::PeerStore;
use crate::probing::ProbingService;
use crate::runtime::Runtime;
use crate::tx_broadcaster::TransactionBroadcaster;
use crate::types::{
Expand Down Expand Up @@ -129,6 +130,16 @@ struct LiquiditySourceConfig {
lsps2_service: Option<LSPS2ServiceConfig>,
}

#[derive(Debug, Clone)]
struct ProbingServiceConfig {
/// Time in seconds between consecutive probing attempts.
probing_interval_secs: u64,
/// Maximum number of distinct targets to probe concurrently.
max_probing_targets: usize,
/// Amount in milli-satoshis used for each probe.
probing_amount_msat: u64,
}

#[derive(Clone)]
enum LogWriterConfig {
File { log_file_path: Option<String>, max_log_level: Option<LogLevel> },
Expand Down Expand Up @@ -253,6 +264,7 @@ pub struct NodeBuilder {
async_payments_role: Option<AsyncPaymentsRole>,
runtime_handle: Option<tokio::runtime::Handle>,
pathfinding_scores_sync_config: Option<PathfindingScoresSyncConfig>,
probing_service_config: Option<ProbingServiceConfig>,
}

impl NodeBuilder {
Expand All @@ -271,6 +283,7 @@ impl NodeBuilder {
let log_writer_config = None;
let runtime_handle = None;
let pathfinding_scores_sync_config = None;
let probing_service_config = None;
Self {
config,
entropy_source_config,
Expand All @@ -281,6 +294,7 @@ impl NodeBuilder {
runtime_handle,
async_payments_role: None,
pathfinding_scores_sync_config,
probing_service_config,
}
}

Expand Down Expand Up @@ -488,6 +502,23 @@ impl NodeBuilder {
self
}

/// Configures the probing service used to evaluate channel liquidity by sending
/// pre-flight probes to peers and routes.
///
/// * `probing_interval_secs` - Time in seconds between consecutive probing attempts.
/// * `max_probing_targets` - Maximum number of distinct targets to probe concurrently.
/// * `probing_amount_msat` - Amount in milli-satoshis used for each probe.
pub fn set_probing_service_config(
&mut self, probing_interval_secs: u64, max_probing_targets: usize, probing_amount_msat: u64,
) -> &mut Self {
self.probing_service_config = Some(ProbingServiceConfig {
probing_interval_secs,
max_probing_targets,
probing_amount_msat,
});
self
}

/// Sets the used storage directory path.
pub fn set_storage_dir_path(&mut self, storage_dir_path: String) -> &mut Self {
self.config.storage_dir_path = storage_dir_path;
Expand Down Expand Up @@ -744,6 +775,7 @@ impl NodeBuilder {
runtime,
logger,
Arc::new(vss_store),
self.probing_service_config.as_ref(),
)
}

Expand Down Expand Up @@ -778,6 +810,7 @@ impl NodeBuilder {
runtime,
logger,
kv_store,
self.probing_service_config.as_ref(),
)
}
}
Expand Down Expand Up @@ -982,6 +1015,17 @@ impl ArcedNodeBuilder {
self.inner.write().unwrap().set_storage_dir_path(storage_dir_path);
}

// Sets the probing service used to evaluate channel liquidity by sending
pub fn set_probing_service_config(
&self, probing_interval_secs: u64, max_probing_targets: usize, probing_amount_msat: u64,
) {
self.inner.write().unwrap().set_probing_service_config(
probing_interval_secs,
max_probing_targets,
probing_amount_msat,
);
}

/// Configures the [`Node`] instance to write logs to the filesystem.
///
/// The `log_file_path` defaults to [`DEFAULT_LOG_FILENAME`] in the configured
Expand Down Expand Up @@ -1142,6 +1186,7 @@ fn build_with_store_internal(
pathfinding_scores_sync_config: Option<&PathfindingScoresSyncConfig>,
async_payments_role: Option<AsyncPaymentsRole>, seed_bytes: [u8; 64], runtime: Arc<Runtime>,
logger: Arc<Logger>, kv_store: Arc<DynStore>,
probing_service_config: Option<&ProbingServiceConfig>,
) -> Result<Node, BuildError> {
optionally_install_rustls_cryptoprovider();

Expand Down Expand Up @@ -1760,6 +1805,23 @@ fn build_with_store_internal(

let pathfinding_scores_sync_url = pathfinding_scores_sync_config.map(|c| c.url.clone());

let probing_service = if let Some(pro_ser) = probing_service_config {
Some(Arc::new(ProbingService::new(
pro_ser.probing_interval_secs,
pro_ser.max_probing_targets,
pro_ser.probing_amount_msat,
Arc::clone(&config),
Arc::clone(&logger),
Arc::clone(&channel_manager),
Arc::clone(&keys_manager),
Arc::clone(&is_running),
Arc::clone(&payment_store),
Arc::clone(&network_graph),
)))
} else {
None
};

Ok(Node {
runtime,
stop_sender,
Expand Down Expand Up @@ -1790,6 +1852,7 @@ fn build_with_store_internal(
node_metrics,
om_mailbox,
async_payments_role,
probing_service,
})
}

Expand Down
35 changes: 35 additions & 0 deletions src/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ use lightning::routing::gossip::RoutingFees;
#[cfg(not(feature = "uniffi"))]
use lightning::routing::gossip::{ChannelInfo, NodeInfo};

use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashMap};

use crate::types::Graph;

/// Represents the network as nodes and channels between them.
Expand Down Expand Up @@ -48,6 +51,38 @@ impl NetworkGraph {
pub fn node(&self, node_id: &NodeId) -> Option<NodeInfo> {
self.inner.read_only().nodes().get(node_id).cloned().map(|n| n.into())
}

/// Selects a set of nodes as targets for probing based on their total channel capacity.
pub fn select_probing_targets(&self, max_targets: usize) -> Vec<NodeId> {
// Compute the total capacity for each node
let node_capacities = self.inner.read_only().channels().unordered_iter().fold(
HashMap::new(),
|mut acc, (_, chan_info)| {
let cap = chan_info.capacity_sats.unwrap_or(0);
*acc.entry(chan_info.node_one).or_insert(0) += cap;
*acc.entry(chan_info.node_two).or_insert(0) += cap;
acc
},
);

// Use a min-heap to keep track of the top `max_targets` nodes by capacity
node_capacities
.into_iter()
.fold(BinaryHeap::with_capacity(max_targets), |mut top_heap, (node_id, cap)| {
if top_heap.len() < max_targets {
top_heap.push(Reverse((cap, node_id)));
} else if let Some(Reverse((min_cap, _))) = top_heap.peek() {
if cap > *min_cap {
top_heap.pop();
top_heap.push(Reverse((cap, node_id)));
}
}
top_heap
})
.into_iter()
.map(|Reverse((_, node_id))| node_id)
.collect()
}
}

/// Details about a channel (both directions).
Expand Down
29 changes: 29 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ pub mod logger;
mod message_handler;
pub mod payment;
mod peer_store;
mod probing;
mod runtime;
mod scoring;
mod tx_broadcaster;
Expand Down Expand Up @@ -149,6 +150,7 @@ use payment::{
UnifiedQrPayment,
};
use peer_store::{PeerInfo, PeerStore};
use probing::ProbingService;
use rand::Rng;
use runtime::Runtime;
use types::{
Expand Down Expand Up @@ -196,6 +198,7 @@ pub struct Node {
scorer: Arc<Mutex<Scorer>>,
peer_store: Arc<PeerStore<Arc<Logger>>>,
payment_store: Arc<PaymentStore>,
probing_service: Option<Arc<ProbingService>>,
is_running: Arc<RwLock<bool>>,
node_metrics: Arc<RwLock<NodeMetrics>>,
om_mailbox: Option<Arc<OnionMessageMailbox>>,
Expand Down Expand Up @@ -625,6 +628,32 @@ impl Node {
});
}

if let Some(probing_service) = self.probing_service.as_ref() {
let mut stop_probing_service = self.stop_sender.subscribe();
let probing_service = Arc::clone(probing_service);
let probing_logger = Arc::clone(&self.logger);

self.runtime.spawn_cancellable_background_task(async move {
let mut interval = tokio::time::interval(Duration::from_secs(
probing_service.probing_interval_secs,
));
loop {
tokio::select! {
_ = stop_probing_service.changed() => {
log_debug!(
probing_logger,
"Stopping probing service.",
);
return;
}
_ = interval.tick() => {
probing_service.handle_probing();
}
}
}
});
}

log_info!(self.logger, "Startup complete.");
*is_running_lock = true;
Ok(())
Expand Down
100 changes: 100 additions & 0 deletions src/probing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use std::sync::{Arc, RwLock};

use crate::{
config::Config,
graph::NetworkGraph,
logger::{log_debug, log_error, LdkLogger, Logger},
payment::SpontaneousPayment,
types::{ChannelManager, Graph, KeysManager, PaymentStore},
};

/// Configuration for the probing service used to evaluate channel liquidity by sending pre-flight
/// probes to peers and routes.
pub struct ProbingService {
pub probing_interval_secs: u64,
max_probing_targets: usize,
probing_amount_msat: u64,
config: Arc<Config>,
logger: Arc<Logger>,
channel_manager: Arc<ChannelManager>,
keys_manager: Arc<KeysManager>,
is_running: Arc<RwLock<bool>>,
payment_store: Arc<PaymentStore>,
network_graph: Arc<Graph>,
}

impl ProbingService {
/// Creates a new probing service with the given configuration and dependencies.
pub fn new(
probing_interval_secs: u64, max_probing_targets: usize, probing_amount_msat: u64,
config: Arc<Config>, logger: Arc<Logger>, channel_manager: Arc<ChannelManager>,
keys_manager: Arc<KeysManager>, is_running: Arc<RwLock<bool>>,
payment_store: Arc<PaymentStore>, network_graph: Arc<Graph>,
) -> Self {
Self {
probing_interval_secs,
max_probing_targets,
probing_amount_msat,
config,
logger,
channel_manager,
keys_manager,
is_running,
payment_store,
network_graph,
}
}

pub fn handle_probing(&self) {
let channels = self.channel_manager.list_channels().len();
if channels == 0 {
log_debug!(self.logger, "Probing service found no channels, skipping probing.");
return;
}

let network = self.network_graph();
let spontaneous_payment = self.spontaneous_payment();

let targets = network.select_probing_targets(self.max_probing_targets);
for target in targets {
let public_key = match target.as_pubkey() {
Ok(pk) => pk,
Err(_) => {
log_error!(
self.logger,
"Probing service failed to get target pubkey: {}",
target
);
continue;
},
};

match spontaneous_payment.send_probes(self.probing_amount_msat, public_key) {
Ok(_) => {
log_debug!(self.logger, "Probing service sent probe to target: {}", public_key)
},
Err(e) => log_error!(
self.logger,
"Probing service failed to send probe to target {}: {}",
public_key,
e
),
}
}
}

fn network_graph(&self) -> NetworkGraph {
NetworkGraph::new(Arc::clone(&self.network_graph))
}

fn spontaneous_payment(&self) -> SpontaneousPayment {
SpontaneousPayment::new(
Arc::clone(&self.channel_manager),
Arc::clone(&self.keys_manager),
Arc::clone(&self.payment_store),
Arc::clone(&self.config),
Arc::clone(&self.is_running),
Arc::clone(&self.logger),
)
}
}
Loading