Skip to content

Commit

Permalink
refactor selection function to be tracing-friendly
Browse files Browse the repository at this point in the history
  • Loading branch information
zeegomo committed Sep 24, 2021
1 parent 1dd6058 commit ec3cbe7
Showing 1 changed file with 118 additions and 87 deletions.
205 changes: 118 additions & 87 deletions jormungandr/src/fragment/selection.rs
Expand Up @@ -2,14 +2,14 @@ use super::logs::Logs;
use super::pool::internal::Pool;
use crate::{
blockcfg::{ApplyBlockLedger, Contents, ContentsBuilder, LedgerParameters},
fragment::FragmentId,
fragment::{Fragment, FragmentId},
};
use chain_core::property::Fragment as _;
use jormungandr_lib::interfaces::{BlockDate, FragmentStatus};

use async_trait::async_trait;
use futures::prelude::*;
use tracing::{span, Level};
use futures::{channel::oneshot::Receiver, future::Shared, prelude::*};
use tracing::{debug_span, Instrument};

use std::error::Error;
use std::iter;
Expand Down Expand Up @@ -59,6 +59,85 @@ impl Default for OldestFirst {
}
}

enum ApplyFragmentResult {
Ok {
ledger: ApplyBlockLedger,
space_left: u32,
},
DoesNotFit,
SoftDeadlineReached,
Err(String),
}

async fn try_apply_fragment(
fragment: Fragment,
ledger: ApplyBlockLedger,
ledger_params: &LedgerParameters,
soft_deadline_future: Shared<Receiver<()>>,
hard_deadline_future: Shared<Receiver<()>>,
mut space_left: u32,
) -> ApplyFragmentResult {
use futures::future::{select, Either};
let fragment_raw = fragment.to_raw(); // TODO: replace everything to FragmentRaw in the node
let fragment_size = fragment_raw.size_bytes_plus_size() as u32;

if fragment_size > ledger_params.block_content_max_size {
let reason = format!(
"fragment size {} exceeds maximum block content size {}",
fragment_size, ledger_params.block_content_max_size
);
return ApplyFragmentResult::Err(reason);
}

if fragment_size > space_left {
// return a fragment to the pool later if does not fit the contents size limit
tracing::trace!("discarding fragment that does not fit in block");
return ApplyFragmentResult::DoesNotFit;
}

space_left -= fragment_size;

tracing::debug!("applying fragment in simulation");

let fragment_future = tokio::task::spawn_blocking(move || ledger.apply_fragment(&fragment));

let ledger_res = match select(fragment_future, soft_deadline_future.clone()).await {
Either::Left((join_result, _)) => join_result.unwrap(),
Either::Right((_, fragment_future)) => {
if space_left < ledger_params.block_content_max_size {
tracing::debug!(
"aborting processing of the current fragment to satisfy the soft deadline"
);
return ApplyFragmentResult::SoftDeadlineReached;
}

tracing::debug!(
"only one fragment in progress: continuing until meeting the hard deadline"
);

match select(fragment_future, hard_deadline_future.clone()).await {
Either::Left((join_result, _)) => join_result.unwrap(),
Either::Right(_) => return ApplyFragmentResult::Err(
"cannot process a single fragment within the given time bounds (hard deadline)"
.into(),
),
}
}
};

match ledger_res {
Ok(ledger) => ApplyFragmentResult::Ok { ledger, space_left },
Err(err) => {
let mut msg = err.to_string();
for e in iter::successors(err.source(), |&e| e.source()) {
msg.push_str(": ");
msg.push_str(&e.to_string());
}
ApplyFragmentResult::Err(msg)
}
}
}

#[async_trait]
impl FragmentSelectionAlgorithm for OldestFirst {
async fn select(
Expand All @@ -70,109 +149,61 @@ impl FragmentSelectionAlgorithm for OldestFirst {
soft_deadline_future: futures::channel::oneshot::Receiver<()>,
hard_deadline_future: futures::channel::oneshot::Receiver<()>,
) -> FragmentSelectionResult {
use futures::future::{select, Either};

let date: BlockDate = ledger.block_date().into();
let mut current_total_size = 0;
let mut space_left = ledger_params.block_content_max_size;
let mut contents_builder = ContentsBuilder::new();
let mut return_to_pool = Vec::new();
let mut rejected_fragments_cnt = 0;

let soft_deadline_future = soft_deadline_future.shared();
let hard_deadline_future = hard_deadline_future.shared();

while let Some(fragment) = pool.remove_oldest() {
let id = fragment.id();
let fragment_raw = fragment.to_raw(); // TODO: replace everything to FragmentRaw in the node
let fragment_size = fragment_raw.size_bytes_plus_size() as u32;

let span = span!(Level::TRACE, "fragment_selection_algorithm", kind="older_first", hash=%id.to_string());
let _enter = span.enter();
if fragment_size > ledger_params.block_content_max_size {
let reason = format!(
"fragment size {} exceeds maximum block content size {}",
fragment_size, ledger_params.block_content_max_size
);
tracing::debug!("{}", reason);
logs.modify(id, FragmentStatus::Rejected { reason }, date);
rejected_fragments_cnt += 1;
continue;
}

let total_size = current_total_size + fragment_size;

if total_size > ledger_params.block_content_max_size {
// return a fragment to the pool later if does not fit the contents size limit
return_to_pool.push(fragment);
continue;
}

tracing::debug!("applying fragment in simulation");

let fragment1 = fragment.clone();
let ledger1 = ledger.clone();
let fragment_future =
tokio::task::spawn_blocking(move || ledger1.apply_fragment(&fragment1));

let result = match select(fragment_future, soft_deadline_future.clone()).await {
Either::Left((join_result, _)) => join_result.unwrap(),
Either::Right((_, fragment_future)) => {
if current_total_size > 0 {
tracing::debug!(
"aborting processing of the current fragment to satisfy the soft deadline"
);
return_to_pool.push(fragment);
break;
let span = debug_span!("fragment", hash=%id.to_string());

async {
let result = try_apply_fragment(
fragment.clone(),
ledger.clone(),
ledger_params,
soft_deadline_future.clone(),
hard_deadline_future.clone(),
space_left,
)
.await;
match result {
ApplyFragmentResult::Ok {
ledger: ledger_new,
space_left: space_left_new,
} => {
contents_builder.push(fragment);
ledger = ledger_new;
tracing::debug!("successfully applied and committed the fragment");
space_left = space_left_new;
}

tracing::debug!(
"only one fragment in progress: continuing until meeting the hard deadline"
);

match select(fragment_future, hard_deadline_future.clone()).await {
Either::Left((join_result, _)) => join_result.unwrap(),
Either::Right(_) => {
let reason =
"cannot process a single fragment within the given time bounds (hard deadline)";
tracing::debug!("{}", reason);
logs.modify(
id,
FragmentStatus::Rejected {
reason: reason.to_string(),
},
date,
);
rejected_fragments_cnt += 1;
break;
}
ApplyFragmentResult::DoesNotFit | ApplyFragmentResult::SoftDeadlineReached => {
return_to_pool.push(fragment);
}
}
};

match result {
Ok(ledger_new) => {
current_total_size = total_size;
contents_builder.push(fragment);
ledger = ledger_new;
tracing::debug!("successfully applied and committed the fragment");
}
Err(error) => {
let mut msg = error.to_string();
for e in iter::successors(error.source(), |&e| e.source()) {
msg.push_str(": ");
msg.push_str(&e.to_string());
ApplyFragmentResult::Err(reason) => {
tracing::debug!(%reason, "fragment is rejected");
logs.modify(id, FragmentStatus::Rejected { reason }, date);
rejected_fragments_cnt += 1;
}
tracing::debug!(?error, "fragment is rejected");
logs.modify(id, FragmentStatus::Rejected { reason: msg }, date);
rejected_fragments_cnt += 1;
}
}
.instrument(span)
.await;

if total_size == ledger_params.block_content_max_size {
if space_left == 0 {
tracing::debug!("block has reached max total size, exiting");
break;
}
}

tracing::debug!(
"finished block creation with {} fragments left in the pool",
pool.len()
);
return_to_pool.reverse();
pool.return_to_pool(return_to_pool);

Expand Down

0 comments on commit ec3cbe7

Please sign in to comment.