diff --git a/jormungandr/src/fragment/selection.rs b/jormungandr/src/fragment/selection.rs index 43197cf046..ac95477d79 100644 --- a/jormungandr/src/fragment/selection.rs +++ b/jormungandr/src/fragment/selection.rs @@ -2,14 +2,14 @@ use super::logs::Logs; use super::pool::internal::Pool; use crate::{ blockcfg::{ApplyBlockLedger, Contents, ContentsBuilder, LedgerParameters}, - fragment::FragmentId, + fragment::{Fragment, FragmentId}, }; use chain_core::property::Fragment as _; use jormungandr_lib::interfaces::{BlockDate, FragmentStatus}; use async_trait::async_trait; -use futures::prelude::*; -use tracing::{span, Level}; +use futures::{channel::oneshot::Receiver, future::Shared, prelude::*}; +use tracing::{debug_span, Instrument}; use std::error::Error; use std::iter; @@ -59,6 +59,85 @@ impl Default for OldestFirst { } } +enum ApplyFragmentResult { + Ok { + ledger: ApplyBlockLedger, + space_left: u32, + }, + DoesNotFit, + SoftDeadlineReached, + Err(String), +} + +async fn try_apply_fragment( + fragment: Fragment, + ledger: ApplyBlockLedger, + ledger_params: &LedgerParameters, + soft_deadline_future: Shared>, + hard_deadline_future: Shared>, + mut space_left: u32, +) -> ApplyFragmentResult { + use futures::future::{select, Either}; + let fragment_raw = fragment.to_raw(); // TODO: replace everything to FragmentRaw in the node + let fragment_size = fragment_raw.size_bytes_plus_size() as u32; + + if fragment_size > ledger_params.block_content_max_size { + let reason = format!( + "fragment size {} exceeds maximum block content size {}", + fragment_size, ledger_params.block_content_max_size + ); + return ApplyFragmentResult::Err(reason); + } + + if fragment_size > space_left { + // return a fragment to the pool later if does not fit the contents size limit + tracing::trace!("discarding fragment that does not fit in block"); + return ApplyFragmentResult::DoesNotFit; + } + + space_left -= fragment_size; + + tracing::debug!("applying fragment in simulation"); + + let fragment_future = tokio::task::spawn_blocking(move || ledger.apply_fragment(&fragment)); + + let ledger_res = match select(fragment_future, soft_deadline_future.clone()).await { + Either::Left((join_result, _)) => join_result.unwrap(), + Either::Right((_, fragment_future)) => { + if space_left < ledger_params.block_content_max_size { + tracing::debug!( + "aborting processing of the current fragment to satisfy the soft deadline" + ); + return ApplyFragmentResult::SoftDeadlineReached; + } + + tracing::debug!( + "only one fragment in progress: continuing until meeting the hard deadline" + ); + + match select(fragment_future, hard_deadline_future.clone()).await { + Either::Left((join_result, _)) => join_result.unwrap(), + Either::Right(_) => return ApplyFragmentResult::Err( + "cannot process a single fragment within the given time bounds (hard deadline)" + .into(), + ), + } + } + }; + + match ledger_res { + Ok(ledger) => ApplyFragmentResult::Ok { ledger, space_left }, + Err(err) => { + let mut msg = err.to_string(); + for e in iter::successors(err.source(), |&e| e.source()) { + msg.push_str(": "); + msg.push_str(&e.to_string()); + } + ApplyFragmentResult::Err(msg) + } + } +} + #[async_trait] impl FragmentSelectionAlgorithm for OldestFirst { async fn select( @@ -70,109 +149,61 @@ impl FragmentSelectionAlgorithm for OldestFirst { soft_deadline_future: futures::channel::oneshot::Receiver<()>, hard_deadline_future: futures::channel::oneshot::Receiver<()>, ) -> FragmentSelectionResult { - use futures::future::{select, Either}; - let date: BlockDate = ledger.block_date().into(); - let mut current_total_size = 0; + let mut space_left = ledger_params.block_content_max_size; let mut contents_builder = ContentsBuilder::new(); let mut return_to_pool = Vec::new(); let mut rejected_fragments_cnt = 0; let soft_deadline_future = soft_deadline_future.shared(); let hard_deadline_future = hard_deadline_future.shared(); - while let Some(fragment) = pool.remove_oldest() { let id = fragment.id(); - let fragment_raw = fragment.to_raw(); // TODO: replace everything to FragmentRaw in the node - let fragment_size = fragment_raw.size_bytes_plus_size() as u32; - - let span = span!(Level::TRACE, "fragment_selection_algorithm", kind="older_first", hash=%id.to_string()); - let _enter = span.enter(); - if fragment_size > ledger_params.block_content_max_size { - let reason = format!( - "fragment size {} exceeds maximum block content size {}", - fragment_size, ledger_params.block_content_max_size - ); - tracing::debug!("{}", reason); - logs.modify(id, FragmentStatus::Rejected { reason }, date); - rejected_fragments_cnt += 1; - continue; - } - - let total_size = current_total_size + fragment_size; - - if total_size > ledger_params.block_content_max_size { - // return a fragment to the pool later if does not fit the contents size limit - return_to_pool.push(fragment); - continue; - } - - tracing::debug!("applying fragment in simulation"); - - let fragment1 = fragment.clone(); - let ledger1 = ledger.clone(); - let fragment_future = - tokio::task::spawn_blocking(move || ledger1.apply_fragment(&fragment1)); - - let result = match select(fragment_future, soft_deadline_future.clone()).await { - Either::Left((join_result, _)) => join_result.unwrap(), - Either::Right((_, fragment_future)) => { - if current_total_size > 0 { - tracing::debug!( - "aborting processing of the current fragment to satisfy the soft deadline" - ); - return_to_pool.push(fragment); - break; + let span = debug_span!("fragment", hash=%id.to_string()); + + async { + let result = try_apply_fragment( + fragment.clone(), + ledger.clone(), + ledger_params, + soft_deadline_future.clone(), + hard_deadline_future.clone(), + space_left, + ) + .await; + match result { + ApplyFragmentResult::Ok { + ledger: ledger_new, + space_left: space_left_new, + } => { + contents_builder.push(fragment); + ledger = ledger_new; + tracing::debug!("successfully applied and committed the fragment"); + space_left = space_left_new; } - - tracing::debug!( - "only one fragment in progress: continuing until meeting the hard deadline" - ); - - match select(fragment_future, hard_deadline_future.clone()).await { - Either::Left((join_result, _)) => join_result.unwrap(), - Either::Right(_) => { - let reason = - "cannot process a single fragment within the given time bounds (hard deadline)"; - tracing::debug!("{}", reason); - logs.modify( - id, - FragmentStatus::Rejected { - reason: reason.to_string(), - }, - date, - ); - rejected_fragments_cnt += 1; - break; - } + ApplyFragmentResult::DoesNotFit | ApplyFragmentResult::SoftDeadlineReached => { + return_to_pool.push(fragment); } - } - }; - - match result { - Ok(ledger_new) => { - current_total_size = total_size; - contents_builder.push(fragment); - ledger = ledger_new; - tracing::debug!("successfully applied and committed the fragment"); - } - Err(error) => { - let mut msg = error.to_string(); - for e in iter::successors(error.source(), |&e| e.source()) { - msg.push_str(": "); - msg.push_str(&e.to_string()); + ApplyFragmentResult::Err(reason) => { + tracing::debug!(%reason, "fragment is rejected"); + logs.modify(id, FragmentStatus::Rejected { reason }, date); + rejected_fragments_cnt += 1; } - tracing::debug!(?error, "fragment is rejected"); - logs.modify(id, FragmentStatus::Rejected { reason: msg }, date); - rejected_fragments_cnt += 1; } } + .instrument(span) + .await; - if total_size == ledger_params.block_content_max_size { + if space_left == 0 { + tracing::debug!("block has reached max total size, exiting"); break; } } + tracing::debug!( + "finished block creation with {} fragments left in the pool", + pool.len() + ); return_to_pool.reverse(); pool.return_to_pool(return_to_pool);