Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add utils to persist scorer in BackgroundProcessor #1416

Merged
merged 2 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 67 additions & 15 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use lightning::ln::channelmanager::ChannelManager;
use lightning::ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use lightning::ln::peer_handler::{CustomMessageHandler, PeerManager, SocketDescriptor};
use lightning::routing::network_graph::{NetworkGraph, NetGraphMsgHandler};
use lightning::routing::scoring::WriteableScore;
use lightning::util::events::{Event, EventHandler, EventsProvider};
use lightning::util::logger::Logger;
use lightning::util::persist::Persister;
Expand Down Expand Up @@ -151,6 +152,7 @@ impl BackgroundProcessor {
/// [`NetworkGraph`]: lightning::routing::network_graph::NetworkGraph
/// [`NetworkGraph::write`]: lightning::routing::network_graph::NetworkGraph#impl-Writeable
pub fn start<
'a,
Signer: 'static + Sign,
CA: 'static + Deref + Send + Sync,
CF: 'static + Deref + Send + Sync,
Expand All @@ -171,9 +173,11 @@ impl BackgroundProcessor {
NG: 'static + Deref<Target = NetGraphMsgHandler<G, CA, L>> + Send + Sync,
UMH: 'static + Deref + Send + Sync,
PM: 'static + Deref<Target = PeerManager<Descriptor, CMH, RMH, L, UMH>> + Send + Sync,
S: 'static + Deref<Target = SC> + Send + Sync,
SC: WriteableScore<'a>,
>(
persister: PS, event_handler: EH, chain_monitor: M, channel_manager: CM,
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L
net_graph_msg_handler: Option<NG>, peer_manager: PM, logger: L, scorer: Option<S>
) -> Self
where
CA::Target: 'static + chain::Access,
Expand All @@ -187,7 +191,7 @@ impl BackgroundProcessor {
CMH::Target: 'static + ChannelMessageHandler,
RMH::Target: 'static + RoutingMessageHandler,
UMH::Target: 'static + CustomMessageHandler,
PS::Target: 'static + Persister<Signer, CW, T, K, F, L>
PS::Target: 'static + Persister<'a, Signer, CW, T, K, F, L, SC>,
{
let stop_thread = Arc::new(AtomicBool::new(false));
let stop_thread_clone = stop_thread.clone();
Expand Down Expand Up @@ -274,9 +278,16 @@ impl BackgroundProcessor {
if let Err(e) = persister.persist_graph(handler.network_graph()) {
log_error!(logger, "Error: Failed to persist network graph, check your disk and permissions {}", e)
}
last_prune_call = Instant::now();
have_pruned = true;
}
if let Some(ref scorer) = scorer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, this logic is busted if we don't have a net_graph_msg_handler provided (which, okay, should be rare). Just need to move the last_prune_call and have_pruned updates out of the above if. While you're at it, can you add a trace-level log before calling persist_scorer?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this predates this change. @jurvis Could you make a separate commit for this fix?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure, I can do that 😄

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jkczyz done

log_trace!(logger, "Persisting scorer");
if let Err(e) = persister.persist_scorer(&scorer) {
log_error!(logger, "Error: Failed to persist scorer, check your disk and permissions {}", e)
}
}

last_prune_call = Instant::now();
have_pruned = true;
}
}

Expand All @@ -285,10 +296,16 @@ impl BackgroundProcessor {
// ChannelMonitor update(s) persisted without a corresponding ChannelManager update.
persister.persist_manager(&*channel_manager)?;

// Persist Scorer on exit
if let Some(ref scorer) = scorer {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: lets do this before the network graph.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be worth doing the same in the loop?

persister.persist_scorer(&scorer)?;
}

// Persist NetworkGraph on exit
if let Some(ref handler) = net_graph_msg_handler {
persister.persist_graph(handler.network_graph())?;
}

Ok(())
});
Self { stop_thread: stop_thread_clone, thread_handle: Some(handle) }
Expand Down Expand Up @@ -369,6 +386,7 @@ mod tests {
use std::path::PathBuf;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use lightning::routing::scoring::{FixedPenaltyScorer};
use super::{BackgroundProcessor, FRESHNESS_TIMER};

const EVENT_DEADLINE: u64 = 5 * FRESHNESS_TIMER;
Expand All @@ -395,6 +413,7 @@ mod tests {
network_graph: Arc<NetworkGraph>,
logger: Arc<test_utils::TestLogger>,
best_block: BestBlock,
scorer: Arc<Mutex<FixedPenaltyScorer>>,
}

impl Drop for Node {
Expand All @@ -410,13 +429,14 @@ mod tests {
struct Persister {
graph_error: Option<(std::io::ErrorKind, &'static str)>,
manager_error: Option<(std::io::ErrorKind, &'static str)>,
scorer_error: Option<(std::io::ErrorKind, &'static str)>,
filesystem_persister: FilesystemPersister,
}

impl Persister {
fn new(data_dir: String) -> Self {
let filesystem_persister = FilesystemPersister::new(data_dir.clone());
Self { graph_error: None, manager_error: None, filesystem_persister }
Self { graph_error: None, manager_error: None, scorer_error: None, filesystem_persister }
}

fn with_graph_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Expand All @@ -426,6 +446,10 @@ mod tests {
fn with_manager_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { manager_error: Some((error, message)), ..self }
}

fn with_scorer_error(self, error: std::io::ErrorKind, message: &'static str) -> Self {
Self { scorer_error: Some((error, message)), ..self }
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like the scorer_error isn't used - need to update the persist method below.

}
}

impl KVStorePersister for Persister {
Expand All @@ -442,6 +466,12 @@ mod tests {
}
}

if key == "scorer" {
if let Some((error, message)) = self.scorer_error {
return Err(std::io::Error::new(error, message))
}
}

self.filesystem_persister.persist(key, object)
}
}
Expand Down Expand Up @@ -473,7 +503,8 @@ mod tests {
let net_graph_msg_handler = Some(Arc::new(NetGraphMsgHandler::new(network_graph.clone(), Some(chain_source.clone()), logger.clone())));
let msg_handler = MessageHandler { chan_handler: Arc::new(test_utils::TestChannelMessageHandler::new()), route_handler: Arc::new(test_utils::TestRoutingMessageHandler::new() )};
let peer_manager = Arc::new(PeerManager::new(msg_handler, keys_manager.get_node_secret(Recipient::Node).unwrap(), &seed, logger.clone(), IgnoringMessageHandler{}));
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block };
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let node = Node { node: manager, net_graph_msg_handler, peer_manager, chain_monitor, persister, tx_broadcaster, network_graph, logger, best_block, scorer };
nodes.push(node);
}

Expand Down Expand Up @@ -571,7 +602,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

macro_rules! check_persisted_data {
($node: expr, $filepath: expr) => {
Expand Down Expand Up @@ -621,6 +652,10 @@ mod tests {
check_persisted_data!(network_graph, filepath.clone());
}

// Check scorer is persisted
let filepath = get_full_filepath("test_background_processor_persister_0".to_string(), "scorer".to_string());
check_persisted_data!(nodes[0].scorer, filepath.clone());

assert!(bg_processor.stop().is_ok());
}

Expand All @@ -632,7 +667,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
loop {
let log_entries = nodes[0].logger.lines.lock().unwrap();
let desired_log = "Calling ChannelManager's timer_tick_occurred".to_string();
Expand All @@ -655,7 +690,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_manager_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
match bg_processor.join() {
Ok(_) => panic!("Expected error persisting manager"),
Err(e) => {
Expand All @@ -672,7 +707,7 @@ mod tests {
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_graph_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting network graph"),
Expand All @@ -683,6 +718,24 @@ mod tests {
}
}

#[test]
fn test_scorer_persist_error() {
// Test that if we encounter an error during scorer persistence, an error gets returned.
let nodes = create_nodes(2, "test_persist_scorer_error".to_string());
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir).with_scorer_error(std::io::ErrorKind::Other, "test"));
let event_handler = |_: &_| {};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

match bg_processor.stop() {
Ok(_) => panic!("Expected error persisting scorer"),
Err(e) => {
assert_eq!(e.kind(), std::io::ErrorKind::Other);
assert_eq!(e.get_ref().unwrap().to_string(), "test");
},
}
}

#[test]
fn test_background_event_handling() {
let mut nodes = create_nodes(2, "test_background_event_handling".to_string());
Expand All @@ -695,7 +748,7 @@ mod tests {
let event_handler = move |event: &Event| {
sender.send(handle_funding_generation_ready!(event, channel_value)).unwrap();
};
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

// Open a channel and check that the FundingGenerationReady event was handled.
begin_open_channel!(nodes[0], nodes[1], channel_value);
Expand All @@ -720,7 +773,7 @@ mod tests {
let (sender, receiver) = std::sync::mpsc::sync_channel(1);
let event_handler = move |event: &Event| sender.send(event.clone()).unwrap();
let persister = Arc::new(Persister::new(data_dir));
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));

// Force close the channel and check that the SpendableOutputs event was handled.
nodes[0].node.force_close_channel(&nodes[0].node.list_channels()[0].channel_id).unwrap();
Expand All @@ -747,11 +800,10 @@ mod tests {
// Initiate the background processors to watch each node.
let data_dir = nodes[0].persister.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
let scorer = Arc::new(Mutex::new(test_utils::TestScorer::with_penalty(0)));
let router = DefaultRouter::new(Arc::clone(&nodes[0].network_graph), Arc::clone(&nodes[0].logger), random_seed_bytes);
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, scorer, Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
let invoice_payer = Arc::new(InvoicePayer::new(Arc::clone(&nodes[0].node), router, Arc::clone(&nodes[0].scorer), Arc::clone(&nodes[0].logger), |_: &_| {}, RetryAttempts(2)));
let event_handler = Arc::clone(&invoice_payer);
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone());
let bg_processor = BackgroundProcessor::start(persister, event_handler, nodes[0].chain_monitor.clone(), nodes[0].node.clone(), nodes[0].net_graph_msg_handler.clone(), nodes[0].peer_manager.clone(), nodes[0].logger.clone(), Some(nodes[0].scorer.clone()));
assert!(bg_processor.stop().is_ok());
}
}
8 changes: 8 additions & 0 deletions lightning/src/routing/scoring.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,14 @@ pub trait LockableScore<'a> {
fn lock(&'a self) -> Self::Locked;
}

/// Refers to a scorer that is accessible under lock and also writeable to disk
///
/// We need this trait to be able to pass in a scorer to `lightning-background-processor` that will enable us to
/// use the Persister to persist it.
pub trait WriteableScore<'a>: LockableScore<'a> + Writeable {}

impl<'a, T> WriteableScore<'a> for T where T: LockableScore<'a> + Writeable {}

/// (C-not exported)
impl<'a, T: 'a + Score> LockableScore<'a> for Mutex<T> {
type Locked = MutexGuard<'a, T>;
Expand Down
21 changes: 16 additions & 5 deletions lightning/src/util/persist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use core::ops::Deref;
use bitcoin::hashes::hex::ToHex;
use io::{self};
use routing::scoring::WriteableScore;

use crate::{chain::{keysinterface::{Sign, KeysInterface}, self, transaction::{OutPoint}, chaininterface::{BroadcasterInterface, FeeEstimator}, chainmonitor::{Persist, MonitorUpdateId}, channelmonitor::{ChannelMonitor, ChannelMonitorUpdate}}, ln::channelmanager::ChannelManager, routing::network_graph::NetworkGraph};
use super::{logger::Logger, ser::Writeable};
Expand All @@ -24,37 +25,47 @@ pub trait KVStorePersister {
fn persist<W: Writeable>(&self, key: &str, object: &W) -> io::Result<()>;
}

/// Trait that handles persisting a [`ChannelManager`] and [`NetworkGraph`] to disk.
pub trait Persister<Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref>
/// Trait that handles persisting a [`ChannelManager`], [`NetworkGraph`], and [`WriteableScore`] to disk.
pub trait Persister<'a, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S>
where M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
S: WriteableScore<'a>,
{
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error>;

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error>;

/// Persist the given [`WriteableScore`] to disk, returning an error if persistence failed.
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error>;
}

impl<A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref> Persister<Signer, M, T, K, F, L> for A
impl<'a, A: KVStorePersister, Signer: Sign, M: Deref, T: Deref, K: Deref, F: Deref, L: Deref, S> Persister<'a, Signer, M, T, K, F, L, S> for A
where M::Target: 'static + chain::Watch<Signer>,
T::Target: 'static + BroadcasterInterface,
K::Target: 'static + KeysInterface<Signer = Signer>,
F::Target: 'static + FeeEstimator,
L::Target: 'static + Logger,
S: WriteableScore<'a>,
{
/// Persist the given ['ChannelManager'] to disk, returning an error if persistence failed.
/// Persist the given ['ChannelManager'] to disk with the name "manager", returning an error if persistence failed.
fn persist_manager(&self, channel_manager: &ChannelManager<Signer, M, T, K, F, L>) -> Result<(), io::Error> {
self.persist("manager", channel_manager)
}

/// Persist the given [`NetworkGraph`] to disk, returning an error if persistence failed.
/// Persist the given [`NetworkGraph`] to disk with the name "network_graph", returning an error if persistence failed.
fn persist_graph(&self, network_graph: &NetworkGraph) -> Result<(), io::Error> {
self.persist("network_graph", network_graph)
}

/// Persist the given [`WriteableScore`] to disk with name "scorer", returning an error if persistence failed.
fn persist_scorer(&self, scorer: &S) -> Result<(), io::Error> {
self.persist("scorer", &scorer)
}
}

impl<ChannelSigner: Sign, K: KVStorePersister> Persist<ChannelSigner> for K {
Expand Down