diff --git a/doc/configuration/mempool.md b/doc/configuration/mempool.md index b794f76791..ff3b146525 100644 --- a/doc/configuration/mempool.md +++ b/doc/configuration/mempool.md @@ -11,7 +11,15 @@ as follow: mempool: pool_max_entries: 10000 log_max_entries: 100000 + persistent_log_path: fragments.json ``` * `pool_max_entries`: (optional, default is 10000). Set a maximum size of the mempool * `log_max_entries`: (optional, default is 100000). Set a maximum size of fragment logs +* `persistent_log_path`: (optional, disabled by default) log all incoming fragments to a designated file + +## Persistent logs + +A persistent log is a collection of records comprised of a UNIX timestamp of when a fragment was +registereed by the mempool followed by the hex-encoded fragment body. This log is a line-delimited +JSON stream. \ No newline at end of file diff --git a/jormungandr-lib/src/interfaces/config/mempool.rs b/jormungandr-lib/src/interfaces/config/mempool.rs index 9f2e8a1d83..84b723d9cb 100644 --- a/jormungandr-lib/src/interfaces/config/mempool.rs +++ b/jormungandr-lib/src/interfaces/config/mempool.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use serde::{Deserialize, Serialize}; #[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)] @@ -15,6 +17,9 @@ pub struct Mempool { /// maximum number of entries in the fragment logs #[serde(default)] pub log_max_entries: LogMaxEntries, + /// path to the persistent log of all incoming fragments + #[serde(default)] + pub persistent_log_path: Option, } impl Default for PoolMaxEntries { @@ -34,6 +39,7 @@ impl Default for Mempool { Mempool { pool_max_entries: PoolMaxEntries::default(), log_max_entries: LogMaxEntries::default(), + persistent_log_path: None, } } } diff --git a/jormungandr-lib/src/interfaces/fragment_log_persistent.rs b/jormungandr-lib/src/interfaces/fragment_log_persistent.rs new file mode 100644 index 0000000000..a8c59574d7 --- /dev/null +++ b/jormungandr-lib/src/interfaces/fragment_log_persistent.rs @@ -0,0 +1,16 @@ +use crate::interfaces::FragmentDef; +use crate::time::SecondsSinceUnixEpoch; + +use chain_impl_mockchain::fragment::Fragment; + +use serde::{Deserialize, Serialize}; + +/// Represents a persistent fragments log entry. +#[derive(Debug, Serialize, Deserialize)] +pub struct PersistentFragmentLog { + /// the time this fragment was registered and accepted by the pool + pub time: SecondsSinceUnixEpoch, + /// full hex-encoded fragment body + #[serde(with = "FragmentDef")] + pub fragment: Fragment, +} diff --git a/jormungandr-lib/src/interfaces/mod.rs b/jormungandr-lib/src/interfaces/mod.rs index 413bd42cc7..734478357c 100644 --- a/jormungandr-lib/src/interfaces/mod.rs +++ b/jormungandr-lib/src/interfaces/mod.rs @@ -8,6 +8,7 @@ mod committee; mod config; mod fragment; mod fragment_log; +mod fragment_log_persistent; mod leadership_log; mod linear_fee; mod old_address; @@ -41,6 +42,7 @@ pub use self::committee::CommitteeIdDef; pub use self::config::*; pub use self::fragment::FragmentDef; pub use self::fragment_log::{FragmentLog, FragmentOrigin, FragmentStatus}; +pub use self::fragment_log_persistent::PersistentFragmentLog; pub use self::leadership_log::{ EnclaveLeaderId, LeadershipLog, LeadershipLogId, LeadershipLogStatus, }; diff --git a/jormungandr/src/fragment/pool.rs b/jormungandr/src/fragment/pool.rs index 92193ab69c..53e7fd3b72 100644 --- a/jormungandr/src/fragment/pool.rs +++ b/jormungandr/src/fragment/pool.rs @@ -11,13 +11,19 @@ use chain_core::property::Fragment as _; use chain_impl_mockchain::{fragment::Contents, transaction::Transaction}; use futures::channel::mpsc::SendError; use futures::sink::SinkExt; -use jormungandr_lib::interfaces::{FragmentLog, FragmentOrigin, FragmentStatus}; +use jormungandr_lib::{ + interfaces::{FragmentLog, FragmentOrigin, FragmentStatus, PersistentFragmentLog}, + time::SecondsSinceUnixEpoch, +}; +use std::fs::File; +use std::io::Write; use thiserror::Error; pub struct Pools { logs: Logs, pools: Vec, network_msg_box: MessageBox, + persistent_log: Option, } #[derive(Debug, Error)] @@ -32,6 +38,7 @@ impl Pools { n_pools: usize, logs: Logs, network_msg_box: MessageBox, + persistent_log: Option, ) -> Self { let pools = (0..=n_pools) .map(|_| internal::Pool::new(max_entries)) @@ -40,6 +47,7 @@ impl Pools { logs, pools, network_msg_box, + persistent_log, } } @@ -68,6 +76,21 @@ impl Pools { .filter(|(_, exists_in_logs)| !exists_in_logs) .map(|(fragment, _)| fragment); + if let Some(mut persistent_log) = self.persistent_log.as_mut() { + for fragment in new_fragments.clone() { + let entry = PersistentFragmentLog { + time: SecondsSinceUnixEpoch::now(), + fragment, + }; + if let Err(err) = serde_json::to_writer(&mut persistent_log, &entry) { + tracing::error!(err = %err, "failed to write persistent fragment log entry"); + } + if let Err(err) = persistent_log.write_all("\n".as_bytes()) { + tracing::error!(err = %err, "failed to write persistent fragment log delimiter"); + } + } + } + let mut max_added = 0; for (i, pool) in self.pools.iter_mut().enumerate() { diff --git a/jormungandr/src/fragment/process.rs b/jormungandr/src/fragment/process.rs index f6258c3fdd..66d7dad7e9 100644 --- a/jormungandr/src/fragment/process.rs +++ b/jormungandr/src/fragment/process.rs @@ -9,6 +9,7 @@ use crate::{ }; use std::collections::HashMap; +use std::fs::File; use thiserror::Error; use tokio_stream::StreamExt; @@ -47,12 +48,14 @@ impl Process { service_info: TokioServiceInfo, stats_counter: StatsCounter, mut input: MessageQueue, + persistent_log: Option, ) -> Result<(), Error> { let mut pool = Pools::new( self.pool_max_entries, n_pools, self.logs, self.network_msg_box, + persistent_log, ); async move { diff --git a/jormungandr/src/main.rs b/jormungandr/src/main.rs index 44f5674d44..0eeb37063b 100644 --- a/jormungandr/src/main.rs +++ b/jormungandr/src/main.rs @@ -71,6 +71,7 @@ pub struct BootstrappedNode { rest_context: Option, services: Services, initial_peers: Vec, + persistent_fragment_log: Option, _logger_guards: Vec, } @@ -276,9 +277,16 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E bootstrapped_node.settings.mempool.log_max_entries.into(), network_msgbox.clone(), ); + let persistent_fragment_log = bootstrapped_node.persistent_fragment_log; services.spawn_try_future("fragment", move |info| { - process.start(n_pools, info, stats_counter, fragment_queue) + process.start( + n_pools, + info, + stats_counter, + fragment_queue, + persistent_fragment_log, + ) }); }; @@ -347,6 +355,7 @@ fn bootstrap(initialized_node: InitializedNode) -> Result Result, pub services: Services, pub cancellation_token: CancellationToken, + pub persistent_fragment_log: Option, pub _logger_guards: Vec, } @@ -592,6 +603,20 @@ fn initialize_node() -> Result { let settings = raw_settings.try_into_settings()?; + let persistent_fragment_log = settings + .mempool + .persistent_log_path + .as_ref() + .map(|path| { + std::fs::OpenOptions::new() + .append(true) + .create(true) + .read(false) + .open(path) + }) + .transpose() + .map_err(start_up::Error::PersistentFragmentLog)?; + let storage = start_up::prepare_storage(&settings)?; if exit_after_storage_setup { tracing::info!("Exiting after successful storage setup"); @@ -676,6 +701,7 @@ fn initialize_node() -> Result { rest_context, services, cancellation_token, + persistent_fragment_log, _logger_guards, }) } diff --git a/jormungandr/src/start_up/error.rs b/jormungandr/src/start_up/error.rs index 7518f8592a..af94e750c8 100644 --- a/jormungandr/src/start_up/error.rs +++ b/jormungandr/src/start_up/error.rs @@ -18,6 +18,8 @@ pub enum ErrorKind { #[derive(Debug, Error)] pub enum Error { + #[error("Cannot open the persistent fragments log for writing")] + PersistentFragmentLog(#[source] io::Error), #[error("Unable to initialize the logger")] LoggingInitializationError(#[from] logging::Error), #[error("Error in the overall configuration of the node")] @@ -92,6 +94,7 @@ impl Error { Error::ExplorerBootstrapError { .. } => 11, Error::ServiceTerminatedWithError { .. } => 12, Error::DiagnosticError { .. } => 13, + Error::PersistentFragmentLog(_) => 14, } } } diff --git a/testing/jormungandr-integration-tests/src/jormungandr/fragments.rs b/testing/jormungandr-integration-tests/src/jormungandr/fragments.rs index c5bc5ee949..8bf988ba5b 100644 --- a/testing/jormungandr-integration-tests/src/jormungandr/fragments.rs +++ b/testing/jormungandr-integration-tests/src/jormungandr/fragments.rs @@ -31,6 +31,7 @@ pub fn send_all_fragments() { .with_mempool(Mempool { pool_max_entries: 1_000_000usize.into(), log_max_entries: 1_000_000usize.into(), + persistent_log_path: None, }), ) .unwrap(); diff --git a/testing/jormungandr-integration-tests/src/jormungandr/transactions.rs b/testing/jormungandr-integration-tests/src/jormungandr/transactions.rs index b79e7b5a57..a6c7b59d9e 100644 --- a/testing/jormungandr-integration-tests/src/jormungandr/transactions.rs +++ b/testing/jormungandr-integration-tests/src/jormungandr/transactions.rs @@ -23,6 +23,7 @@ pub fn accounts_funds_are_updated_after_transaction() { .with_mempool(Mempool { pool_max_entries: 1_000_000usize.into(), log_max_entries: 1_000_000usize.into(), + persistent_log_path: None, }), ) .unwrap();