Skip to content

Commit

Permalink
Add ChannelDataPersister trait and point SMCM to it.
Browse files Browse the repository at this point in the history
Intended to be a simple way for users to know where and how to put their
backup and persistence logic.
  • Loading branch information
valentinewallace committed Sep 2, 2020
1 parent 3defcc8 commit 595db8a
Show file tree
Hide file tree
Showing 14 changed files with 304 additions and 55 deletions.
5 changes: 3 additions & 2 deletions Cargo.toml
@@ -1,8 +1,9 @@
[workspace]

members = [
"lightning",
"lightning-net-tokio",
"lightning",
"lightning-net-tokio",
"lightning-data-persister",
]

# Our tests do actual crypo and lots of work, the tradeoff for -O1 is well worth it
Expand Down
14 changes: 9 additions & 5 deletions fuzz/src/chanmon_consistency.rs
Expand Up @@ -34,6 +34,7 @@ use lightning::chain::transaction::OutPoint;
use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,ChainListener,FeeEstimator,ChainWatchInterfaceUtil,ChainWatchInterface};
use lightning::chain::keysinterface::{KeysInterface, InMemoryChannelKeys};
use lightning::ln::channelmonitor;
use lightning::ln::data_persister::ChannelDataPersister;
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdateErr, MonitorEvent};
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret, ChannelManagerReadArgs};
use lightning::ln::features::{ChannelFeatures, InitFeatures, NodeFeatures};
Expand All @@ -48,6 +49,7 @@ use lightning::routing::router::{Route, RouteHop};


use utils::test_logger;
use utils::test_data_persister::TestChanDataPersister;

use bitcoin::secp256k1::key::{PublicKey,SecretKey};
use bitcoin::secp256k1::Secp256k1;
Expand Down Expand Up @@ -84,7 +86,7 @@ impl Writer for VecWriter {

struct TestChannelMonitor {
pub logger: Arc<dyn Logger>,
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChainWatchInterface>>>,
pub simple_monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<dyn ChainWatchInterface>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
pub update_ret: Mutex<Result<(), channelmonitor::ChannelMonitorUpdateErr>>,
// If we reload a node with an old copy of ChannelMonitors, the ChannelManager deserialization
// logic will automatically force-close our channels for us (as we don't have an up-to-date
Expand All @@ -95,9 +97,9 @@ struct TestChannelMonitor {
pub should_update_manager: atomic::AtomicBool,
}
impl TestChannelMonitor {
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>) -> Self {
pub fn new(chain_monitor: Arc<dyn chaininterface::ChainWatchInterface>, broadcaster: Arc<TestBroadcaster>, logger: Arc<dyn Logger>, feeest: Arc<FuzzEstimator>, data_persister: Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>) -> Self {
Self {
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest)),
simple_monitor: Arc::new(channelmonitor::SimpleManyChannelMonitor::new(chain_monitor, broadcaster, logger.clone(), feeest, data_persister)),
logger,
update_ret: Mutex::new(Ok(())),
latest_monitors: Mutex::new(HashMap::new()),
Expand Down Expand Up @@ -193,7 +195,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
($node_id: expr) => { {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin));
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));
let data_persister = Arc::new(TestChanDataPersister{});
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone(), data_persister.clone()));

let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) });
let mut config = UserConfig::default();
Expand All @@ -209,7 +212,8 @@ pub fn do_test<Out: test_logger::Output>(data: &[u8], out: Out) {
($ser: expr, $node_id: expr, $old_monitors: expr) => { {
let logger: Arc<dyn Logger> = Arc::new(test_logger::TestLogger::new($node_id.to_string(), out.clone()));
let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin));
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone()));
let data_persister = Arc::new(TestChanDataPersister{});
let monitor = Arc::new(TestChannelMonitor::new(watch.clone(), broadcast.clone(), logger.clone(), fee_est.clone(), data_persister.clone()));

let keys_manager = Arc::new(KeyProvider { node_id: $node_id, rand_bytes_id: atomic::AtomicU8::new(0) });
let mut config = UserConfig::default();
Expand Down
11 changes: 7 additions & 4 deletions fuzz/src/full_stack.rs
Expand Up @@ -30,6 +30,7 @@ use lightning::chain::chaininterface::{BroadcasterInterface,ConfirmationTarget,C
use lightning::chain::transaction::OutPoint;
use lightning::chain::keysinterface::{InMemoryChannelKeys, KeysInterface};
use lightning::ln::channelmonitor;
use lightning::ln::data_persister::ChannelDataPersister;
use lightning::ln::channelmanager::{ChannelManager, PaymentHash, PaymentPreimage, PaymentSecret};
use lightning::ln::peer_handler::{MessageHandler,PeerManager,SocketDescriptor};
use lightning::routing::router::get_route;
Expand All @@ -40,6 +41,7 @@ use lightning::util::logger::Logger;
use lightning::util::config::UserConfig;

use utils::test_logger;
use utils::test_data_persister::TestChanDataPersister;

use bitcoin::secp256k1::key::{PublicKey,SecretKey};
use bitcoin::secp256k1::Secp256k1;
Expand Down Expand Up @@ -145,14 +147,14 @@ impl<'a> std::hash::Hash for Peer<'a> {

type ChannelMan = ChannelManager<
EnforcingChannelKeys,
Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
Arc<TestBroadcaster>, Arc<KeyProvider>, Arc<FuzzEstimator>, Arc<dyn Logger>>;
type PeerMan<'a> = PeerManager<Peer<'a>, Arc<ChannelMan>, Arc<NetGraphMsgHandler<Arc<ChainWatchInterfaceUtil>, Arc<dyn Logger>>>, Arc<dyn Logger>>;

struct MoneyLossDetector<'a> {
manager: Arc<ChannelMan>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<
OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
handler: PeerMan<'a>,

peers: &'a RefCell<[bool; 256]>,
Expand All @@ -166,7 +168,7 @@ struct MoneyLossDetector<'a> {
impl<'a> MoneyLossDetector<'a> {
pub fn new(peers: &'a RefCell<[bool; 256]>,
manager: Arc<ChannelMan>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>>>,
monitor: Arc<channelmonitor::SimpleManyChannelMonitor<OutPoint, EnforcingChannelKeys, Arc<TestBroadcaster>, Arc<FuzzEstimator>, Arc<dyn Logger>, Arc<ChainWatchInterfaceUtil>, Arc<dyn ChannelDataPersister<Keys=EnforcingChannelKeys>>>>,
handler: PeerMan<'a>) -> Self {
MoneyLossDetector {
manager,
Expand Down Expand Up @@ -337,7 +339,8 @@ pub fn do_test(data: &[u8], logger: &Arc<dyn Logger>) {

let watch = Arc::new(ChainWatchInterfaceUtil::new(Network::Bitcoin));
let broadcast = Arc::new(TestBroadcaster{});
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone()));
let data_persister: Arc<dyn ChannelDataPersister<Keys = EnforcingChannelKeys>> = Arc::new(TestChanDataPersister{});
let monitor = Arc::new(channelmonitor::SimpleManyChannelMonitor::new(watch.clone(), broadcast.clone(), Arc::clone(&logger), fee_est.clone(), data_persister.clone()));

let keys_manager = Arc::new(KeyProvider { node_secret: our_network_key.clone(), counter: AtomicU64::new(0) });
let mut config = UserConfig::default();
Expand Down
1 change: 1 addition & 0 deletions fuzz/src/utils/mod.rs
Expand Up @@ -8,3 +8,4 @@
// licenses.

pub mod test_logger;
pub mod test_data_persister;
23 changes: 23 additions & 0 deletions fuzz/src/utils/test_data_persister.rs
@@ -0,0 +1,23 @@
use lightning::ln::data_persister::ChannelDataPersister;
use lightning::ln::channelmonitor;
use lightning::chain::transaction::OutPoint;
use lightning::util::enforcing_trait_impls::EnforcingChannelKeys;

use std::collections::HashMap;

pub struct TestChanDataPersister {}
impl ChannelDataPersister for TestChanDataPersister {
type Keys = EnforcingChannelKeys;

fn persist_channel_data(&self, _funding_txo: OutPoint, _data: &channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
Ok(())
}

fn update_channel_data(&self, _funding_txo: OutPoint, _update: &channelmonitor::ChannelMonitorUpdate, _data: &channelmonitor::ChannelMonitor<EnforcingChannelKeys>) -> Result<(), channelmonitor::ChannelMonitorUpdateErr> {
Ok(())
}

fn load_channel_data(&self) -> Result<HashMap<OutPoint, channelmonitor::ChannelMonitor<EnforcingChannelKeys>>, channelmonitor::ChannelMonitorUpdateErr> {
Ok(HashMap::new())
}
}
13 changes: 13 additions & 0 deletions lightning-data-persister/Cargo.toml
@@ -0,0 +1,13 @@
[package]
name = "lightning-data-persister"
version = "0.0.1"
authors = ["Valentine Wallace"]
license = "Apache-2.0"
edition = "2018"
description = """
Utilities to manage channel data persistence and retrieval.
"""

[dependencies]
bitcoin = "0.23"
lightning = { version = "0.0.11", path = "../lightning" }
124 changes: 124 additions & 0 deletions lightning-data-persister/src/lib.rs
@@ -0,0 +1,124 @@
use lightning::chain::keysinterface::ChannelKeys;
use lightning::ln::data_persister::ChannelDataPersister;
use lightning::ln::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, ChannelMonitorUpdateErr};
use lightning::util::ser::{Writeable, Readable};
use lightning::chain::transaction::OutPoint;
use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::hashes::hex::{ToHex, FromHex};
use std::collections::HashMap;
use std::fs;
use std::io::{Error, ErrorKind, Cursor};
use std::marker::PhantomData;

/// LinuxPersister can persist channel data on disk on Linux machines, where
/// each channel's data is stored in a file named after its outpoint.
pub struct LinuxPersister<ChanSigner: ChannelKeys + Readable + Writeable> {
path_to_channel_data: String,
phantom: PhantomData<ChanSigner>, // TODO: is there a way around this?
}

impl<ChanSigner: ChannelKeys + Readable + Writeable> LinuxPersister<ChanSigner> {
/// Initialize a new LinuxPersister and set the path to the individual channels'
/// files.
pub fn new(path_to_channel_data: String) -> Self {
return Self {
path_to_channel_data,
phantom: PhantomData,
}
}

fn get_full_filepath(&self, funding_txo: OutPoint) -> String {
format!("{}/{}_{}", self.path_to_channel_data, funding_txo.txid.to_hex(), funding_txo.index)
}

fn write_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<ChanSigner>) -> std::io::Result<()> {
// Do a crazy dance with lots of fsync()s to be overly cautious here...
// We never want to end up in a state where we've lost the old data, or end up using the
// old data on power loss after we've returned
// Note that this actually *isn't* enough (at least on Linux)! We need to fsync an fd with
// the containing dir, but Rust doesn't let us do that directly, sadly. TODO: Fix this with
// the libc crate!
let filename = self.get_full_filepath(funding_txo);
let tmp_filename = filename.clone() + ".tmp";

{
let mut f = fs::File::create(&tmp_filename)?;
monitor.write_for_disk(&mut f)?;
f.sync_all()?;
}
// We don't need to create a backup if didn't already have the file, but in any other case
// try to create the backup and expect failure on fs::copy() if eg there's a perms issue.
let need_bk = match fs::metadata(&filename) {
Ok(data) => {
if !data.is_file() { return Err(Error::new(ErrorKind::InvalidInput, "Filename given was not a file")); }
true
},
Err(e) => match e.kind() {
std::io::ErrorKind::NotFound => false,
_ => true,
}
};
let bk_filename = filename.clone() + ".bk";
if need_bk {
fs::copy(&filename, &bk_filename)?;
{
let f = fs::File::open(&bk_filename)?;
f.sync_all()?;
}
}
fs::rename(&tmp_filename, &filename)?;
{
let f = fs::File::open(&filename)?;
f.sync_all()?;
}
if need_bk {
fs::remove_file(&bk_filename)?;
}
Ok(())
}
}

impl<ChanSigner: ChannelKeys + Readable + Writeable + Send + Sync> ChannelDataPersister for LinuxPersister<ChanSigner> {
type Keys = ChanSigner;

fn persist_channel_data(&self, funding_txo: OutPoint, monitor: &ChannelMonitor<Self::Keys>) -> Result<(), ChannelMonitorUpdateErr> {
match self.write_channel_data(funding_txo, monitor) {
Ok(_) => Ok(()),
Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure)
}
}

fn update_channel_data(&self, funding_txo: OutPoint, _update: &ChannelMonitorUpdate, monitor: &ChannelMonitor<ChanSigner>) -> Result<(), ChannelMonitorUpdateErr> {
match self.write_channel_data(funding_txo, monitor) {
Ok(_) => Ok(()),
Err(_) => Err(ChannelMonitorUpdateErr::TemporaryFailure)
}
}

fn load_channel_data(&self) -> Result<HashMap<OutPoint, ChannelMonitor<ChanSigner>>, ChannelMonitorUpdateErr> {
let mut res = HashMap::new();
for file_option in fs::read_dir(&self.path_to_channel_data).unwrap() {
let mut loaded = false;
let file = file_option.unwrap();
if let Some(filename) = file.file_name().to_str() {
if filename.is_ascii() && filename.len() > 65 {
if let Ok(txid) = Txid::from_hex(filename.split_at(64).0) {
if let Ok(index) = filename.split_at(65).1.split('.').next().unwrap().parse() {
if let Ok(contents) = fs::read(&file.path()) {
if let Ok((_, loaded_monitor)) = <(BlockHash, ChannelMonitor<ChanSigner>)>::read(&mut Cursor::new(&contents)) {
res.insert(OutPoint { txid, index }, loaded_monitor);
loaded = true;
}
}
}
}
}
}
if !loaded {
// TODO(val): this should prob error not just print something
println!("WARNING: Failed to read one of the channel monitor storage files! Check perms!");
}
}
Ok(res)
}
}
3 changes: 2 additions & 1 deletion lightning-net-tokio/src/lib.rs
Expand Up @@ -35,7 +35,8 @@
//! type FeeEstimator = dyn lightning::chain::chaininterface::FeeEstimator;
//! type Logger = dyn lightning::util::logger::Logger;
//! type ChainWatchInterface = dyn lightning::chain::chaininterface::ChainWatchInterface;
//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor<lightning::chain::transaction::OutPoint, lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<ChainWatchInterface>>;
//! type DataPersister = dyn lightning::ln::data_persister::ChannelDataPersister<Keys = lightning::chain::keysinterface::InMemoryChannelKeys>;
//! type ChannelMonitor = lightning::ln::channelmonitor::SimpleManyChannelMonitor<lightning::chain::transaction::OutPoint, lightning::chain::keysinterface::InMemoryChannelKeys, Arc<TxBroadcaster>, Arc<FeeEstimator>, Arc<Logger>, Arc<ChainWatchInterface>, Arc<DataPersister>>;
//! type ChannelManager = lightning::ln::channelmanager::SimpleArcChannelManager<ChannelMonitor, TxBroadcaster, FeeEstimator, Logger>;
//! type PeerManager = lightning::ln::peer_handler::SimpleArcPeerManager<lightning_net_tokio::SocketDescriptor, ChannelMonitor, TxBroadcaster, FeeEstimator, ChainWatchInterface, Logger>;
//!
Expand Down

0 comments on commit 595db8a

Please sign in to comment.