Skip to content

Commit

Permalink
implement persistent fragment log
Browse files Browse the repository at this point in the history
  • Loading branch information
eugene-babichenko committed Apr 30, 2021
1 parent db43812 commit efeff16
Show file tree
Hide file tree
Showing 10 changed files with 91 additions and 2 deletions.
8 changes: 8 additions & 0 deletions doc/configuration/mempool.md
Expand Up @@ -11,7 +11,15 @@ as follow:
mempool:
pool_max_entries: 10000
log_max_entries: 100000
persistent_log_path: fragments.json
```

* `pool_max_entries`: (optional, default is 10000). Set a maximum size of the mempool
* `log_max_entries`: (optional, default is 100000). Set a maximum size of fragment logs
* `persistent_log_path`: (optional, disabled by default) log all incoming fragments to a designated file

## Persistent logs

A persistent log is a collection of records comprised of a UNIX timestamp of when a fragment was
registereed by the mempool followed by the hex-encoded fragment body. This log is a line-delimited
JSON stream.
6 changes: 6 additions & 0 deletions jormungandr-lib/src/interfaces/config/mempool.rs
@@ -1,3 +1,5 @@
use std::path::PathBuf;

use serde::{Deserialize, Serialize};

#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Hash, Deserialize, Serialize)]
Expand All @@ -15,6 +17,9 @@ pub struct Mempool {
/// maximum number of entries in the fragment logs
#[serde(default)]
pub log_max_entries: LogMaxEntries,
/// path to the persistent log of all incoming fragments
#[serde(default)]
pub persistent_log_path: Option<PathBuf>,
}

impl Default for PoolMaxEntries {
Expand All @@ -34,6 +39,7 @@ impl Default for Mempool {
Mempool {
pool_max_entries: PoolMaxEntries::default(),
log_max_entries: LogMaxEntries::default(),
persistent_log_path: None,
}
}
}
Expand Down
16 changes: 16 additions & 0 deletions jormungandr-lib/src/interfaces/fragment_log_persistent.rs
@@ -0,0 +1,16 @@
use crate::interfaces::FragmentDef;
use crate::time::SecondsSinceUnixEpoch;

use chain_impl_mockchain::fragment::Fragment;

use serde::{Deserialize, Serialize};

/// Represents a persistent fragments log entry.
#[derive(Debug, Serialize, Deserialize)]
pub struct PersistentFragmentLog {
/// the time this fragment was registered and accepted by the pool
pub time: SecondsSinceUnixEpoch,
/// full hex-encoded fragment body
#[serde(with = "FragmentDef")]
pub fragment: Fragment,
}
2 changes: 2 additions & 0 deletions jormungandr-lib/src/interfaces/mod.rs
Expand Up @@ -8,6 +8,7 @@ mod committee;
mod config;
mod fragment;
mod fragment_log;
mod fragment_log_persistent;
mod leadership_log;
mod linear_fee;
mod old_address;
Expand Down Expand Up @@ -41,6 +42,7 @@ pub use self::committee::CommitteeIdDef;
pub use self::config::*;
pub use self::fragment::FragmentDef;
pub use self::fragment_log::{FragmentLog, FragmentOrigin, FragmentStatus};
pub use self::fragment_log_persistent::PersistentFragmentLog;
pub use self::leadership_log::{
EnclaveLeaderId, LeadershipLog, LeadershipLogId, LeadershipLogStatus,
};
Expand Down
25 changes: 24 additions & 1 deletion jormungandr/src/fragment/pool.rs
Expand Up @@ -11,13 +11,19 @@ use chain_core::property::Fragment as _;
use chain_impl_mockchain::{fragment::Contents, transaction::Transaction};
use futures::channel::mpsc::SendError;
use futures::sink::SinkExt;
use jormungandr_lib::interfaces::{FragmentLog, FragmentOrigin, FragmentStatus};
use jormungandr_lib::{
interfaces::{FragmentLog, FragmentOrigin, FragmentStatus, PersistentFragmentLog},
time::SecondsSinceUnixEpoch,
};
use std::fs::File;
use std::io::Write;
use thiserror::Error;

pub struct Pools {
logs: Logs,
pools: Vec<internal::Pool>,
network_msg_box: MessageBox<NetworkMsg>,
persistent_log: Option<File>,
}

#[derive(Debug, Error)]
Expand All @@ -32,6 +38,7 @@ impl Pools {
n_pools: usize,
logs: Logs,
network_msg_box: MessageBox<NetworkMsg>,
persistent_log: Option<File>,
) -> Self {
let pools = (0..=n_pools)
.map(|_| internal::Pool::new(max_entries))
Expand All @@ -40,6 +47,7 @@ impl Pools {
logs,
pools,
network_msg_box,
persistent_log,
}
}

Expand Down Expand Up @@ -68,6 +76,21 @@ impl Pools {
.filter(|(_, exists_in_logs)| !exists_in_logs)
.map(|(fragment, _)| fragment);

if let Some(mut persistent_log) = self.persistent_log.as_mut() {
for fragment in new_fragments.clone() {
let entry = PersistentFragmentLog {
time: SecondsSinceUnixEpoch::now(),
fragment,
};
if let Err(err) = serde_json::to_writer(&mut persistent_log, &entry) {
tracing::error!(err = %err, "failed to write persistent fragment log entry");
}
if let Err(err) = persistent_log.write_all("\n".as_bytes()) {
tracing::error!(err = %err, "failed to write persistent fragment log delimiter");
}
}
}

let mut max_added = 0;

for (i, pool) in self.pools.iter_mut().enumerate() {
Expand Down
3 changes: 3 additions & 0 deletions jormungandr/src/fragment/process.rs
Expand Up @@ -9,6 +9,7 @@ use crate::{
};

use std::collections::HashMap;
use std::fs::File;

use thiserror::Error;
use tokio_stream::StreamExt;
Expand Down Expand Up @@ -47,12 +48,14 @@ impl Process {
service_info: TokioServiceInfo,
stats_counter: StatsCounter,
mut input: MessageQueue<TransactionMsg>,
persistent_log: Option<File>,
) -> Result<(), Error> {
let mut pool = Pools::new(
self.pool_max_entries,
n_pools,
self.logs,
self.network_msg_box,
persistent_log,
);

async move {
Expand Down
28 changes: 27 additions & 1 deletion jormungandr/src/main.rs
Expand Up @@ -71,6 +71,7 @@ pub struct BootstrappedNode {
rest_context: Option<rest::ContextLock>,
services: Services,
initial_peers: Vec<topology::Peer>,
persistent_fragment_log: Option<std::fs::File>,
_logger_guards: Vec<WorkerGuard>,
}

Expand Down Expand Up @@ -276,9 +277,16 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E
bootstrapped_node.settings.mempool.log_max_entries.into(),
network_msgbox.clone(),
);
let persistent_fragment_log = bootstrapped_node.persistent_fragment_log;

services.spawn_try_future("fragment", move |info| {
process.start(n_pools, info, stats_counter, fragment_queue)
process.start(
n_pools,
info,
stats_counter,
fragment_queue,
persistent_fragment_log,
)
});
};

Expand Down Expand Up @@ -347,6 +355,7 @@ fn bootstrap(initialized_node: InitializedNode) -> Result<BootstrappedNode, star
rest_context,
mut services,
cancellation_token,
persistent_fragment_log,
_logger_guards,
} = initialized_node;

Expand Down Expand Up @@ -378,6 +387,7 @@ fn bootstrap(initialized_node: InitializedNode) -> Result<BootstrappedNode, star
rest_context,
services,
initial_peers,
persistent_fragment_log,
_logger_guards,
})
}
Expand Down Expand Up @@ -503,6 +513,7 @@ pub struct InitializedNode {
pub rest_context: Option<rest::ContextLock>,
pub services: Services,
pub cancellation_token: CancellationToken,
pub persistent_fragment_log: Option<std::fs::File>,
pub _logger_guards: Vec<WorkerGuard>,
}

Expand Down Expand Up @@ -592,6 +603,20 @@ fn initialize_node() -> Result<InitializedNode, start_up::Error> {

let settings = raw_settings.try_into_settings()?;

let persistent_fragment_log = settings
.mempool
.persistent_log_path
.as_ref()
.map(|path| {
std::fs::OpenOptions::new()
.append(true)
.create(true)
.read(false)
.open(path)
})
.transpose()
.map_err(start_up::Error::PersistentFragmentLog)?;

let storage = start_up::prepare_storage(&settings)?;
if exit_after_storage_setup {
tracing::info!("Exiting after successful storage setup");
Expand Down Expand Up @@ -676,6 +701,7 @@ fn initialize_node() -> Result<InitializedNode, start_up::Error> {
rest_context,
services,
cancellation_token,
persistent_fragment_log,
_logger_guards,
})
}
Expand Down
3 changes: 3 additions & 0 deletions jormungandr/src/start_up/error.rs
Expand Up @@ -18,6 +18,8 @@ pub enum ErrorKind {

#[derive(Debug, Error)]
pub enum Error {
#[error("Cannot open the persistent fragments log for writing")]
PersistentFragmentLog(#[source] io::Error),
#[error("Unable to initialize the logger")]
LoggingInitializationError(#[from] logging::Error),
#[error("Error in the overall configuration of the node")]
Expand Down Expand Up @@ -92,6 +94,7 @@ impl Error {
Error::ExplorerBootstrapError { .. } => 11,
Error::ServiceTerminatedWithError { .. } => 12,
Error::DiagnosticError { .. } => 13,
Error::PersistentFragmentLog(_) => 14,
}
}
}
Expand Up @@ -31,6 +31,7 @@ pub fn send_all_fragments() {
.with_mempool(Mempool {
pool_max_entries: 1_000_000usize.into(),
log_max_entries: 1_000_000usize.into(),
persistent_log_path: None,
}),
)
.unwrap();
Expand Down
Expand Up @@ -23,6 +23,7 @@ pub fn accounts_funds_are_updated_after_transaction() {
.with_mempool(Mempool {
pool_max_entries: 1_000_000usize.into(),
log_max_entries: 1_000_000usize.into(),
persistent_log_path: None,
}),
)
.unwrap();
Expand Down

0 comments on commit efeff16

Please sign in to comment.