Skip to content

Commit

Permalink
refactor selection/filtering functions to be tracing-friendly (#3619)
Browse files Browse the repository at this point in the history
* refactor selection function to be tracing-friendly

* use standard result

* refactor filter function to be tracing-friendly

* fix fail fast not including failed fragments in report
  • Loading branch information
zeegomo committed Oct 12, 2021
1 parent 8a9e2e0 commit 0017210
Show file tree
Hide file tree
Showing 2 changed files with 190 additions and 160 deletions.
143 changes: 70 additions & 73 deletions jormungandr/src/fragment/pool.rs
Expand Up @@ -13,7 +13,9 @@ use crate::{
utils::async_msg::MessageBox,
};
use chain_core::property::Fragment as _;
use chain_impl_mockchain::{block::BlockDate, fragment::Contents, transaction::Transaction};
use chain_impl_mockchain::{
block::BlockDate, fragment::Contents, setting::Settings, transaction::Transaction,
};
use futures::channel::mpsc::SendError;
use futures::sink::SinkExt;
use jormungandr_lib::{
Expand All @@ -24,6 +26,7 @@ use jormungandr_lib::{
time::SecondsSinceUnixEpoch,
};
use thiserror::Error;
use tracing::Instrument;

use std::mem;

Expand Down Expand Up @@ -94,6 +97,58 @@ impl Pool {
}
}

async fn filter_fragment(
&mut self,
fragment: &Fragment,
ledger_settings: &Settings,
block_date: BlockDate,
) -> Result<(), FragmentRejectionReason> {
let id = fragment.id();
if self.logs.exists(id) {
tracing::debug!("fragment is already logged");
return Err(FragmentRejectionReason::FragmentAlreadyInLog);
}

if let Some(valid_until) = get_transaction_expiry_date(fragment) {
use chain_impl_mockchain::ledger::check::{valid_transaction_date, TxValidityError};
match valid_transaction_date(ledger_settings, valid_until, block_date) {
Ok(_) => {}
Err(TxValidityError::TransactionExpired) => {
tracing::debug!("fragment is expired at the time of receiving");
return Err(FragmentRejectionReason::FragmentExpired);
}
Err(TxValidityError::TransactionValidForTooLong) => {
tracing::debug!("fragment is valid for too long");
return Err(FragmentRejectionReason::FragmentValidForTooLong);
}
}
}

if !is_fragment_valid(fragment) {
tracing::debug!("fragment is invalid, not including to the pool");
return Err(FragmentRejectionReason::FragmentInvalid);
}

if let Some(persistent_log) = self.persistent_log.as_mut() {
use bincode::Options;
let entry = PersistentFragmentLog {
time: SecondsSinceUnixEpoch::now(),
fragment: fragment.clone(),
};
// this must be sufficient: the PersistentFragmentLog format is using byte array
// for serialization so we do not expect any problems during deserialization
let codec = bincode::DefaultOptions::new().with_fixint_encoding();
let serialized = codec.serialize(&entry).unwrap();

if let Err(err) = persistent_log.write_all(&serialized).await {
tracing::error!(err = %err, "failed to write persistent fragment log entry");
}
}

tracing::debug!("including fragment to the pool");
Ok(())
}

/// Returns number of registered fragments. Setting `fail_fast` to `true` will force this
/// method to reject all fragments after the first invalid fragments was met.
pub async fn insert_and_propagate_all(
Expand All @@ -104,8 +159,6 @@ impl Pool {
) -> Result<FragmentsProcessingSummary, Error> {
tracing::debug!(origin = ?origin, "received {} fragments", fragments.len());

use bincode::Options;

let mut filtered_fragments = Vec::new();
let mut rejected = Vec::new();

Expand All @@ -119,77 +172,23 @@ impl Pool {
for fragment in fragments.by_ref() {
let id = fragment.id();

let span = tracing::trace_span!("pool_incoming_fragment", fragment_id=?id);
let _enter = span.enter();
let span = tracing::debug_span!("pool_incoming_fragment", fragment_id=?id);

if self.logs.exists(id) {
rejected.push(RejectedFragmentInfo {
id,
reason: FragmentRejectionReason::FragmentAlreadyInLog,
});
tracing::debug!("fragment is already logged");
continue;
}

if let Some(valid_until) = get_transaction_expiry_date(&fragment) {
use chain_impl_mockchain::ledger::check::{
valid_transaction_date, TxValidityError,
};
match valid_transaction_date(ledger_settings, valid_until, block_date) {
Ok(_) => {}
Err(TxValidityError::TransactionExpired) => {
rejected.push(RejectedFragmentInfo {
id,
reason: FragmentRejectionReason::FragmentExpired,
});
tracing::debug!("fragment is expired at the time of receiving");
continue;
}
Err(TxValidityError::TransactionValidForTooLong) => {
rejected.push(RejectedFragmentInfo {
id,
reason: FragmentRejectionReason::FragmentValidForTooLong,
});
tracing::debug!("fragment is valid for too long");
continue;
match self
.filter_fragment(&fragment, ledger_settings, block_date)
.instrument(span)
.await
{
Err(reason @ FragmentRejectionReason::FragmentInvalid) => {
rejected.push(RejectedFragmentInfo { id, reason });
if fail_fast {
tracing::debug!("fail_fast is enabled; rejecting all downstream fragments");
break;
}
}
Err(reason) => rejected.push(RejectedFragmentInfo { id, reason }),
Ok(()) => filtered_fragments.push(fragment),
}

if !is_fragment_valid(&fragment) {
rejected.push(RejectedFragmentInfo {
id,
reason: FragmentRejectionReason::FragmentInvalid,
});

tracing::debug!("fragment is invalid, not including to the pool");

if fail_fast {
tracing::debug!("fail_fast is enabled; rejecting all downstream fragments");
break;
}

continue;
}

if let Some(persistent_log) = self.persistent_log.as_mut() {
let entry = PersistentFragmentLog {
time: SecondsSinceUnixEpoch::now(),
fragment: fragment.clone(),
};
// this must be sufficient: the PersistentFragmentLog format is using byte array
// for serialization so we do not expect any problems during deserialization
let codec = bincode::DefaultOptions::new().with_fixint_encoding();
let serialized = codec.serialize(&entry).unwrap();

if let Err(err) = persistent_log.write_all(&serialized).await {
tracing::error!(err = %err, "failed to write persistent fragment log entry");
}
}

tracing::debug!("including fragment to the pool");

filtered_fragments.push(fragment);
}

// flush every request to minimize possibility of losing fragments at the expense of non optimal performance
Expand All @@ -202,10 +201,8 @@ impl Pool {
if fail_fast {
for fragment in fragments {
let id = fragment.id();
let span = tracing::trace_span!("pool_incoming_fragment", fragment_id=?id);
let _enter = span.enter();
tracing::error!(
"rejected due to fail_fast and one of previous fragments being invalid"
%id, "rejected due to fail_fast and one of previous fragments being invalid"
);
rejected.push(RejectedFragmentInfo {
id,
Expand Down

0 comments on commit 0017210

Please sign in to comment.