Skip to content

Commit

Permalink
Add utils to persist scorer in BackgroundProcessor
Browse files Browse the repository at this point in the history
  • Loading branch information
jurvis committed May 2, 2022
1 parent 62edee5 commit 199f46d
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 15 deletions.
76 changes: 66 additions & 10 deletions lightning-background-processor/src/lib.rs
Expand Up @@ -18,6 +18,7 @@ use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
Expand Down Expand Up @@ -151,6 +152,7 @@ impl BackgroundProcessor {
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
pub fn start<
'a,
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
Expand All @@ -171,9 +173,11 @@ impl BackgroundProcessor {
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>
) -> Self
where
CA::Target: 'static + chain::Access,
Expand All @@ -187,7 +191,7 @@ impl BackgroundProcessor {
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<Signer, CW, T, K, F, L>
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
Expand Down Expand Up @@ -277,6 +281,11 @@ impl BackgroundProcessor {
last_prune_call = Instant::now();
have_pruned = true;
}
if let Some(ref scorer) = scorer {
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}
}
}

Expand All @@ -285,10 +294,16 @@ impl BackgroundProcessor {
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
persister.persist_manager(&*channel_manager)?;

// Persist Scorer on exit
if let Some(ref scorer) = scorer {
persister.persist_scorer(&scorer)?;
}

// Persist NetworkGraph on exit
if let Some(ref handler) = net_graph_msg_handler {
persister.persist_graph(handler.network_graph())?;
}

Ok(())
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
Expand Down Expand Up @@ -411,12 +426,13 @@ mod tests {
graph_error: Option<(std::io::ErrorKind, &'static str)>,
manager_error: Option<(std::io::ErrorKind, &'static str)>,
filesystem_persister: FilesystemPersister,
scorer_error: Option<(std::io::ErrorKind, &'static str)>
}

impl Persister {
fn new(data_dir: String) -> Self {
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
Self { graph_error: None, manager_error: None, filesystem_persister }
Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister }
}

fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Expand All @@ -426,6 +442,10 @@ mod tests {
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { manager_error: Some((error, message)), ..self }
}

fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { scorer_error: Some((error, message)), ..self }
}
}

impl KVStorePersister for Persister {
Expand All @@ -442,6 +462,12 @@ mod tests {
}
}

if key == "scorer" {
if let Some((error, message)) = self.scorer_error {
return Err(std::io::Error::new(error, message))
}
}

self.filesystem_persister.persist(key, object)
}
}
Expand Down Expand Up @@ -571,7 +597,8 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer.clone()));

macro_rules! check_persisted_data {
($node: expr, $filepath: expr) => {
Expand Down Expand Up @@ -621,6 +648,10 @@ mod tests {
check_persisted_data!(network_graph, filepath.clone());
}

// Check scorer is persisted
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
check_persisted_data!(scorer.lock().unwrap(), filepath.clone());

assert!(bg_processor.stop().is_ok());
}

Expand All @@ -632,7 +663,8 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
Expand All @@ -655,7 +687,8 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));
match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"),
Err(e) => {
Expand All @@ -672,7 +705,8 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));

match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting network graph"),
Expand All @@ -683,6 +717,25 @@ mod tests {
}
}

#[test]
fn test_scorer_persist_error() {
// Test that if we encounter an error during scorer persistence, an error gets returned.
let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));

match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting scorer"),
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::Other);
assert_eq!(e.get_ref().unwrap().to_string(), "test");
},
}
}

#[test]
fn test_background_event_handling() {
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
Expand All @@ -695,7 +748,8 @@ mod tests {
let event_handler = move |event: &Event| {
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));

// Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value);
Expand All @@ -720,7 +774,8 @@ mod tests {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));

// Force close the channel and check that the SpendableOutputs event was handled.
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
Expand Down Expand Up @@ -751,7 +806,8 @@ mod tests {
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
let event_handler = Arc::clone(&invoice_payer);
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(scorer));
assert!(bg_processor.stop().is_ok());
}
}
10 changes: 10 additions & 0 deletions lightning/src/routing/scoring.rs
Expand Up @@ -137,6 +137,16 @@ pub trait LockableScore<'a> {
fn lock(&'a self) -> Self::Locked;
}

/// Refers to a scorer that is accessible under lock and also writeable to disk
///
/// We need this trait to be able to pass in a scorer to `lightning-background-processor` that will enable us to
/// use the Persister to persist it.
pub trait WriteableScore<'a>: LockableScore<'a> + Writeable {}

impl<'a, T> WriteableScore<'a> for T
where T: LockableScore<'a> + Writeable
{}

/// (C-not exported)
impl<'a, T: 'a + Score> LockableScore<'a> for Mutex<T> {
type Locked = MutexGuard<'a, T>;
Expand Down
21 changes: 16 additions & 5 deletions lightning/src/util/persist.rs
Expand Up @@ -11,6 +11,7 @@
use core::ops::Deref;
use bitcoin::hashes::hex::ToHex;
use io::{self};
use routing::scoring::WriteableScore;

use crate::{chain::{keysinterface::{Sign, KeysInterface}, self, transaction::{OutPoint}, chaininterface::{BroadcasterInterface, FeeEstimator}, chainmonitor::{Persist, MonitorUpdateId}, channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}}, ln::channelmanager::ChannelManager, routing::network_graph::NetworkGraph};
use super::{logger::Logger, ser::Writeable};
Expand All @@ -24,37 +25,47 @@ pub trait KVStorePersister {
fn persist<W: Writeable>(&self, key: &str, object: &W) -> io::Result<()>;
}

/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
pub trait Persister<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S>
where M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
S: WriteableScore<'a>
{
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error>;

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>;

/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
}

impl<A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Persister<Signer, M, T, K, F, L> for A
impl<'a, A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S> Persister<'a, Signer, M, T, K, F, L, S> for A
where M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
S: WriteableScore<'a>
{
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
/// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed.
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error> {
self.persist("manager", channel_manager)
}

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
/// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> {
self.persist("network_graph", network_graph)
}

/// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed.
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
self.persist("scorer", &scorer)
}
}

impl<ChannelSigner: Sign, K: KVStorePersister> Persist<ChannelSigner> for K {
Expand Down

0 comments on commit 199f46d

Please sign in to comment.