diff --git a/jormungandr/src/blockchain/candidate.rs b/jormungandr/src/blockchain/candidate.rs index ca56401633..ec82134314 100644 --- a/jormungandr/src/blockchain/candidate.rs +++ b/jormungandr/src/blockchain/candidate.rs @@ -2,19 +2,12 @@ use super::{ chain::{self, Blockchain, HeaderChainVerifyError, PreCheckedHeader}, chunk_sizes, }; -use crate::blockcfg::{Block, Header, HeaderHash}; +use crate::blockcfg::{Header, HeaderHash}; use crate::utils::async_msg::MessageQueue; -use chain_core::property::{Block as _, HasHeader}; use futures::future::{self, Either, Loop}; use futures::prelude::*; use slog::Logger; -use tokio::sync::lock::Lock; -use tokio::timer::{self, delay_queue, DelayQueue}; - -use std::collections::HashMap; -use std::hint::unreachable_unchecked; -use std::time::Duration; // derive use thiserror::Error; @@ -39,84 +32,6 @@ pub enum Error { Unexpected, } -#[derive(Clone)] -pub struct CandidateForest { - inner: Lock, - logger: Logger, -} - -struct CandidateForestThickets { - candidate_map: HashMap, - roots: HashMap, - root_ttl: Duration, - expirations: DelayQueue, -} - -struct Candidate { - data: CandidateData, - children: Vec, -} - -enum CandidateData { - Header(Header), - Block(Block), - Applied(Header), -} - -impl Candidate { - fn from_header(header: Header) -> Self { - Candidate { - data: CandidateData::Header(header), - children: Vec::new(), - } - } - - fn from_block(block: Block) -> Self { - Candidate { - data: CandidateData::Block(block), - children: Vec::new(), - } - } - - fn applied(header: Header) -> Self { - Candidate { - data: CandidateData::Applied(header), - children: Vec::new(), - } - } - - fn has_block(&self) -> bool { - use self::CandidateData::*; - match self.data { - Header(_) => false, - Block(_) => true, - Applied(_) => false, - } - } - - fn is_applied(&self) -> bool { - use self::CandidateData::*; - match self.data { - Applied(_) => true, - _ => false, - } - } - - // FIXME: fix the clone happiness, comes from chain_core::property::HasHeader - fn header(&self) -> Header { - use self::CandidateData::*; - match &self.data { - Header(header) => header.clone(), - Block(block) => block.header(), - Applied(header) => header.clone(), - } - } -} - -struct RootData { - expiration_key: delay_queue::Key, -} - mod chain_landing { use super::*; @@ -197,7 +112,7 @@ mod chain_landing { struct ChainAdvance { stream: HeaderStream, - parent_hash: HeaderHash, + parent_header: Header, header: Option
, new_hashes: Vec, logger: Logger, @@ -211,94 +126,38 @@ mod chain_advance { } impl ChainAdvance { - fn try_process_header( - &mut self, - header: Header, - forest: &mut Lock, - ) -> Poll<(), Error> { - match forest.poll_lock() { - Async::NotReady => { - assert!(self.header.is_none()); - self.header = Some(header); - Ok(Async::NotReady) - } - Async::Ready(mut forest) => { - // If we already have this header as candidate, - // skip to the next, otherwise validate - // and store as candidate and a child of its parent. - let block_hash = header.hash(); - if forest.candidate_map.contains_key(&block_hash) { - // Hey, it has the same crypto hash, so it's the - // same header, what could possibly go wrong? - debug!( - self.logger, - "block is already cached as a candidate"; - "hash" => %block_hash, - ); - } else { - let parent_hash = header.block_parent_hash(); - if parent_hash != self.parent_hash { - return Err(Error::BrokenHeaderChain(parent_hash)); - } - let parent_candidate = forest - .candidate_map - .get_mut(&parent_hash) - .ok_or(Error::MissingParentBlock(parent_hash.clone()))?; - // TODO: replace with a Blockchain method call - // when that can pre-validate headers without - // up-to-date ledger. - chain::pre_verify_link(&header, &parent_candidate.header())?; - if parent_candidate.is_applied() { - // The parent block has been committed to storage - // before this header was received. - // Drop the block hashes collected for fetching so far - // and try to re-land the chain. - self.new_hashes.clear(); - let (_, is_new) = forest.add_or_refresh_root(header); - debug!( - self.logger, - "re-landed the header chain, {}", - if is_new { "new root" } else { "existing root" }; - "hash" => %block_hash, - "parent" => %parent_hash, - ); - if is_new { - self.new_hashes.push(block_hash); - } - } else { - debug_assert!(!parent_candidate.children.contains(&block_hash)); - parent_candidate.children.push(block_hash); - forest - .candidate_map - .insert(block_hash, Candidate::from_header(header)); - debug!( - self.logger, - "adding block to fetch"; - "hash" => %block_hash, - "parent" => %parent_hash, - ); - self.new_hashes.push(block_hash); - } - } - self.parent_hash = block_hash; - Ok(().into()) - } - } - } - - fn poll_done( - &mut self, - forest: &mut Lock, - ) -> Poll { + fn process_header(&mut self, header: Header) -> Result<(), Error> { + // Pre-validate the chain and pick up header hashes. + let block_hash = header.hash(); + let parent_hash = header.block_parent_hash(); + if parent_hash != self.parent_header.hash() { + return Err(Error::BrokenHeaderChain(parent_hash)); + } + // TODO: replace with a Blockchain method call + // when that can pre-validate headers without + // up-to-date ledger. + chain::pre_verify_link(&header, &self.parent_header)?; + debug!( + self.logger, + "adding block to fetch"; + "hash" => %block_hash, + "parent" => %parent_hash, + ); + self.new_hashes.push(block_hash); + self.parent_header = header; + Ok(()) + } + + fn poll_done(&mut self) -> Poll { use self::chain_advance::Outcome; loop { if let Some(header) = self.header.take() { - try_ready!(self.try_process_header(header, forest)); + self.process_header(header)?; } else { match try_ready!(self.stream.poll().map_err(|()| Error::Unexpected)) { Some(header) => { - try_ready!(self.try_process_header(header, forest)); + self.process_header(header)?; } None => return Ok(Outcome::Complete.into()), } @@ -311,296 +170,67 @@ impl ChainAdvance { } } -impl CandidateForest { - pub fn new(root_ttl: Duration, logger: Logger) -> Self { - let inner = CandidateForestThickets { - candidate_map: HashMap::new(), - roots: HashMap::new(), - expirations: DelayQueue::new(), - root_ttl, - }; - CandidateForest { - inner: Lock::new(inner), - logger, - } - } - - fn land_header_chain( - &self, - blockchain: Blockchain, - stream: HeaderStream, - ) -> impl Future, Error = Error> { - let mut inner = self.inner.clone(); - let logger = self.logger.clone(); - - chain_landing::State::start(stream.map_err(|()| unreachable!()), blockchain) - .and_then(move |state| state.skip_present_blocks()) - .and_then(move |maybe_new| match maybe_new { - Some((header, stream)) => { - // We have got a header that may not be in storage yet, - // but its parent is. - // Find an existing root or create a new one. - let fut = future::poll_fn(move || Ok(inner.poll_lock())).and_then( - move |mut forest| { - let root_parent_hash = header.block_parent_hash(); - let (root_hash, is_new) = forest.add_or_refresh_root(header); - debug!( - logger, - "landed the header chain, {}", - if is_new { "new root" } else { "existing root" }; - "hash" => %root_hash, - "parent" => %root_parent_hash, - ); - let new_hashes = if is_new { vec![root_hash] } else { Vec::new() }; - let landing = ChainAdvance { - stream: stream.into_inner(), - parent_hash: root_hash, - header: None, - new_hashes, - logger, - }; - Ok(Some(landing)) - }, - ); - Either::A(fut) - } - None => Either::B(future::ok(None)), - }) - } - - /// Consumes headers from the stream, validating and caching them as - /// candidate entries with possibly a new root. Returns a future that - /// resolves to a batch of block hashes to request from the network - /// and the stream if the process terminated early due to reaching - /// a limit on the number of blocks or (TODO: implement) needing - /// block data to validate more blocks with newer leadership information. - pub fn advance_branch( - &self, - blockchain: Blockchain, - header_stream: HeaderStream, - ) -> impl Future, Option), Error = Error> { - let mut inner = self.inner.clone(); - self.land_header_chain(blockchain, header_stream) - .and_then(move |mut advance| { - if advance.is_some() { - let fut = future::poll_fn(move || { - use self::chain_advance::Outcome; - let done = try_ready!(advance.as_mut().unwrap().poll_done(&mut inner)); - let advance = advance.take().unwrap(); - let ret_stream = match done { - Outcome::Complete => None, - Outcome::Incomplete => Some(advance.stream), - }; - Ok((advance.new_hashes, ret_stream).into()) - }); - Either::A(fut) - } else { - Either::B(future::ok((Vec::new(), None))) - } - }) - } - - /// Puts a block into the cache for later application. - /// - /// The block's header must have been earlier registered in a header chain - /// passed to the `advance_branch` method. If the block is already - /// in the cache, the block value is not updated and the returned future - /// resolves successfully. - pub fn cache_block(&self, block: Block) -> impl Future { - let block_hash = block.id(); - let mut inner = self.inner.clone(); - future::poll_fn(move || Ok(inner.poll_lock())).and_then(move |mut forest| { - forest - .cache_requested_block(block_hash, block) - .map_err(|_block| chain::ErrorKind::BlockNotRequested(block_hash).into()) - }) - } - - pub fn apply_block( - &self, - block: Block, - ) -> impl Future, Error = chain::Error> { - let mut inner = self.inner.clone(); - future::poll_fn(move || Ok(inner.poll_lock())) - .and_then(move |mut forest| Ok(forest.apply_block(block))) - } - - pub fn purge(&self) -> impl Future { - let mut inner = self.inner.clone(); - - // FIXME: this is expected to be called periodically, as it ignores - // polling deadlines set by the DelayQueue. A rework will be - // needed to gather all GC activities from here and other blockchain - // entities to be managed by a common DelayQueue in a separate task, - // with channels from the garbage-producing tasks to manage - // expiration. - future::poll_fn(move || Ok(inner.poll_lock())) - .and_then(|mut forest| future::poll_fn(move || forest.poll_purge())) - } -} - -impl CandidateForestThickets { - fn add_or_refresh_root(&mut self, header: Header) -> (HeaderHash, bool) { - use std::collections::hash_map::Entry::*; - - let root_hash = header.hash(); - let is_new = match self.roots.entry(root_hash) { - Vacant(entry) => { - let expiration_key = self.expirations.insert(root_hash, self.root_ttl); - entry.insert(RootData { expiration_key }); - let _old = self - .candidate_map - .insert(root_hash, Candidate::from_header(header)); - debug_assert!( - _old.is_none(), - "chain pull root candidate {} was previously cached", - root_hash, - ); - true - } - Occupied(entry) => { - debug_assert!( - !self - .candidate_map - .get(&root_hash) - .expect("chain pull root candidate should be in the map") - .has_block(), - "a chain pull root candidate should not cache a block", +fn land_header_chain( + blockchain: Blockchain, + stream: HeaderStream, + logger: Logger, +) -> impl Future, Error = Error> { + chain_landing::State::start(stream.map_err(|()| unreachable!()), blockchain) + .and_then(move |state| state.skip_present_blocks()) + .and_then(move |maybe_new| match maybe_new { + Some((header, stream)) => { + // We have got a header that may not be in storage yet, + // but its parent is. + // Find an existing root or create a new one. + let root_hash = header.hash(); + let root_parent_hash = header.block_parent_hash(); + debug!( + logger, + "landed the header chain"; + "hash" => %root_hash, + "parent" => %root_parent_hash, ); - self.expirations - .reset(&entry.get().expiration_key, self.root_ttl); - false - } - }; - (root_hash, is_new) - } - - fn apply_candidate(&mut self, block_hash: HeaderHash) -> Candidate { - use std::collections::hash_map::Entry::*; - - match self.candidate_map.entry(block_hash) { - Occupied(mut entry) => { - let header = entry.get().header(); - entry.insert(Candidate::applied(header)) + let new_hashes = vec![root_hash]; + let landing = ChainAdvance { + stream: stream.into_inner(), + parent_header: header, + header: None, + new_hashes, + logger, + }; + future::ok(Some(landing)) } - Vacant(_) => panic!("referential integrity failure in CandidateForest"), - } - } - - fn apply_block(&mut self, block: Block) -> Vec { - use std::collections::hash_map::Entry::*; + None => future::ok(None), + }) +} - let block_hash = block.id(); - if self.roots.contains_key(&block_hash) { - let candidate = self.apply_candidate(block_hash); - debug_assert!( - !candidate.has_block(), - "a chain pull root candidate should not cache a block", - ); - let mut block_avalanche = vec![block]; - let mut child_hashes = candidate.children; - while let Some(child_hash) = child_hashes.pop() { - match self.candidate_map.entry(child_hash) { - Occupied(mut entry) => { - // Promote the child to a new root entry - let expiration_key = self.expirations.insert(child_hash, self.root_ttl); - let root_data = RootData { expiration_key }; - let _old = self.roots.insert(child_hash, root_data); - debug_assert!(_old.is_none()); - match &entry.get().data { - CandidateData::Header(_header) => { - debug_assert_eq!(child_hash, _header.hash()); - } - CandidateData::Block(block) => { - let header = block.header(); - debug_assert_eq!(child_hash, header.hash()); - // Extract the block and descend to children - let candidate = entry.insert(Candidate::applied(header)); - if let CandidateData::Block(block) = candidate.data { - block_avalanche.push(block); - } else { - unsafe { unreachable_unchecked() } - } - child_hashes.extend(candidate.children); - } - CandidateData::Applied(_) => { - panic!("a child block has been applied ahead of the parent") - } - } - } - Vacant(_) => panic!("referential integrity failure in CandidateForest"), - } - } - block_avalanche +/// Consumes headers from the stream, filtering out those that are already +/// present and validating the chain integrity for the remainder. +/// Returns a future that resolves to a batch of block hashes to request +/// from the network, +/// and the stream if the process terminated early due to reaching +/// a limit on the number of blocks or (TODO: implement) needing +/// block data to validate more blocks with newer leadership information. +pub fn advance_branch( + blockchain: Blockchain, + header_stream: HeaderStream, + logger: Logger, +) -> impl Future, Option), Error = Error> { + land_header_chain(blockchain, header_stream, logger).and_then(move |mut advance| { + if advance.is_some() { + let fut = future::poll_fn(move || { + use self::chain_advance::Outcome; + let done = try_ready!(advance.as_mut().unwrap().poll_done()); + let advance = advance.take().unwrap(); + let ret_stream = match done { + Outcome::Complete => None, + Outcome::Incomplete => Some(advance.stream), + }; + Ok((advance.new_hashes, ret_stream).into()) + }); + Either::A(fut) } else { - match self.cache_requested_block(block_hash, block) { - Ok(()) => { - // The task that applies the block has won the lock before - // other tasks that should apply preceding blocks. - // The block is cached for later, return an empty vector. - Vec::default() - } - Err(block) => { - // The block is not part of a chain pull. - // Pass it through so that it gets applied to storage - // or fails to validate against the parent that should be - // already stored. - vec![block] - } - } + Either::B(future::ok((Vec::new(), None))) } - } - - fn cache_requested_block(&mut self, block_hash: HeaderHash, block: Block) -> Result<(), Block> { - use std::collections::hash_map::Entry::*; - - match self.candidate_map.entry(block_hash) { - Vacant(_) => Err(block), - Occupied(mut entry) => { - match &entry.get().data { - CandidateData::Header(header) => { - debug_assert!(header.hash() == block_hash); - entry.insert(Candidate::from_block(block)); - } - CandidateData::Block(block) => { - debug_assert!(block.header().hash() == block_hash); - } - CandidateData::Applied(header) => { - debug_assert!(header.hash() == block_hash); - } - } - Ok(()) - } - } - } - - // Removes the root from, then walks up the tree and - // removes all the descendant candidates. - fn expunge_root(&mut self, root_hash: HeaderHash) { - self.roots.remove(&root_hash); - let mut hashes = vec![root_hash]; - while let Some(hash) = hashes.pop() { - let candidate = self - .candidate_map - .remove(&hash) - .expect("referential integrity failure in CandidateForest"); - hashes.extend(candidate.children); - } - } - - fn poll_purge(&mut self) -> Poll<(), timer::Error> { - loop { - match self.expirations.poll()? { - Async::NotReady => { - // Nothing to process now. - // Return Ready to release the lock. - return Ok(Async::Ready(())); - } - Async::Ready(None) => return Ok(Async::Ready(())), - Async::Ready(Some(entry)) => { - self.expunge_root(entry.into_inner()); - } - } - } - } + }) } diff --git a/jormungandr/src/blockchain/mod.rs b/jormungandr/src/blockchain/mod.rs index 4a16f88e79..d96fce71ff 100644 --- a/jormungandr/src/blockchain/mod.rs +++ b/jormungandr/src/blockchain/mod.rs @@ -26,7 +26,6 @@ mod chunk_sizes { pub use self::{ branch::Branch, - candidate::CandidateForest, chain::{ new_epoch_leadership_from, Blockchain, Error, ErrorKind, PreCheckedHeader, MAIN_BRANCH_TAG, }, diff --git a/jormungandr/src/blockchain/process.rs b/jormungandr/src/blockchain/process.rs index 03e85236d7..7c1aa8d4cb 100644 --- a/jormungandr/src/blockchain/process.rs +++ b/jormungandr/src/blockchain/process.rs @@ -1,6 +1,5 @@ use super::{ - candidate::{self, CandidateForest}, - chain, + candidate, chain, chain_selection::{self, ComparisonResult}, Blockchain, Error, ErrorKind, PreCheckedHeader, Ref, Tip, MAIN_BRANCH_TAG, }; @@ -10,6 +9,7 @@ use crate::{ intercom::{ self, BlockMsg, ExplorerMsg, NetworkMsg, PropagateMsg, ReplyHandle, TransactionMsg, }, + log, network::p2p::Id as NodeId, stats_counter::StatsCounter, utils::{ @@ -59,7 +59,6 @@ const GET_NEXT_BLOCK_SCHEDULER_CONFIG: FireForgetSchedulerConfig = FireForgetSch pub struct Process { pub blockchain: Blockchain, pub blockchain_tip: Tip, - pub candidate_forest: CandidateForest, pub stats_counter: StatsCounter, pub network_msgbox: MessageBox, pub fragment_msgbox: MessageBox, @@ -73,7 +72,6 @@ impl Process { service_info: TokioServiceInfo, input: MessageQueue, ) -> impl Future { - service_info.spawn(self.start_garbage_collector(service_info.logger().clone())); service_info.spawn(self.start_branch_reprocessing(service_info.logger().clone())); let pull_headers_scheduler = self.spawn_pull_headers_scheduler(&service_info); let get_next_block_scheduler = self.spawn_get_next_block_scheduler(&service_info); @@ -99,7 +97,6 @@ impl Process { let blockchain_tip = self.blockchain_tip.clone(); let network_msg_box = self.network_msgbox.clone(); let explorer_msg_box = self.explorer_msgbox.clone(); - let candidate_forest = self.candidate_forest.clone(); let mut tx_msg_box = self.fragment_msgbox.clone(); let stats_counter = self.stats_counter.clone(); @@ -204,7 +201,6 @@ impl Process { let get_next_block_scheduler = get_next_block_scheduler.clone(); let future = future::loop_fn(state, move |state| { let blockchain = blockchain_fold.clone(); - let candidate_forest = candidate_forest.clone(); let tx_msg_box = tx_msg_box.clone(); let explorer_msg_box = explorer_msg_box.clone(); let stats_counter = stats_counter.clone(); @@ -220,7 +216,6 @@ impl Process { Some(block) => Either::A( process_network_block( blockchain, - candidate_forest, block, tx_msg_box, explorer_msg_box, @@ -276,12 +271,13 @@ impl Process { info!(info.logger(), "receiving header stream from network"); let (stream, reply) = handle.into_stream_and_reply(); - let logger = info.logger().clone(); - let logger_err = logger.clone(); + let logger = info.logger().new(o!(log::KEY_SUB_TASK => "chain_pull")); + let logger_err1 = logger.clone(); + let logger_err2 = logger.clone(); let schedule_logger = logger.clone(); let mut pull_headers_scheduler = pull_headers_scheduler.clone(); - let future = candidate_forest.advance_branch(blockchain, stream) + let future = candidate::advance_branch(blockchain, stream, logger) .inspect(move |(header_ids, _)| header_ids.iter() .try_for_each(|header_id| pull_headers_scheduler.declare_completed(*header_id)) @@ -289,7 +285,7 @@ impl Process { .then(move |resp| match resp { Err(e) => { info!( - logger, + logger_err1, "error processing an incoming header stream"; "reason" => %e, ); @@ -313,7 +309,7 @@ impl Process { }) .timeout(Duration::from_secs(DEFAULT_TIMEOUT_PROCESS_HEADERS)) .map_err(move |err: TimeoutError| { - error!(logger_err, "cannot process network headers" ; "reason" => err.to_string()) + error!(logger_err2, "cannot process network headers" ; "reason" => err.to_string()) }); info.spawn(future); } @@ -337,20 +333,6 @@ impl Process { }) } - fn start_garbage_collector(&self, logger: Logger) -> impl Future { - let candidate_forest = self.candidate_forest.clone(); - let garbage_collection_interval = self.garbage_collection_interval; - let error_logger = logger.clone(); - Interval::new_interval(garbage_collection_interval) - .for_each(move |_instant| { - debug!(logger, "garbage collecting unresolved branch candidates"); - candidate_forest.purge() - }) - .map_err(move |e| { - error!(error_logger, "cannot run garbage collection" ; "reason" => %e); - }) - } - fn spawn_pull_headers_scheduler(&self, info: &TokioServiceInfo) -> PullHeadersScheduler { let network_msgbox = self.network_msgbox.clone(); let scheduler_logger = info.logger().clone(); @@ -610,15 +592,12 @@ fn process_block_announcement( fn process_network_block( blockchain: Blockchain, - candidate_forest: CandidateForest, block: Block, tx_msg_box: MessageBox, explorer_msg_box: Option>, mut get_next_block_scheduler: GetNextBlockScheduler, logger: Logger, ) -> impl Future>, Error = chain::Error> { - use futures::future::Either::{A, B}; - get_next_block_scheduler.declare_completed(block.id()) .unwrap_or_else(|e| error!(logger, "get next block schedule completion failed"; "reason" => e.to_string())); let header = block.header(); @@ -633,86 +612,83 @@ fn process_network_block( "parent" => %header.parent_id(), "date" => %header.block_date(), ); - A(A(future::ok(None))) + Either::A(future::ok(None)) } PreCheckedHeader::MissingParent { header, .. } => { + let parent_hash = header.parent_id(); debug!( logger, - "block is missing a locally stored parent, caching as candidate"; + "block is missing a locally stored parent"; "hash" => %header.hash(), - "parent" => %header.parent_id(), + "parent" => %parent_hash, "date" => %header.block_date(), ); - A(B(candidate_forest.cache_block(block).map(|()| None))) + Either::A(future::err( + ErrorKind::MissingParentBlock(parent_hash).into(), + )) } PreCheckedHeader::HeaderWithCache { parent_ref, .. } => { - let post_check_and_apply = candidate_forest - .apply_block(block) - .and_then(move |blocks| { - check_and_apply_blocks( - blockchain, - parent_ref, - blocks, - tx_msg_box, - explorer_msg_box, - logger, - ) - }) - .map(Some); - B(post_check_and_apply) + let post_check_and_apply = check_and_apply_block( + blockchain, + parent_ref, + block, + tx_msg_box, + explorer_msg_box, + logger, + ) + .map(Some); + Either::B(post_check_and_apply) } }) } -fn check_and_apply_blocks( +fn check_and_apply_block( blockchain: Blockchain, parent_ref: Arc, - blocks: Vec, + block: Block, tx_msg_box: MessageBox, explorer_msg_box: Option>, logger: Logger, ) -> impl Future, Error = chain::Error> { let explorer_enabled = explorer_msg_box.is_some(); - stream::iter_ok(blocks).fold(parent_ref, move |parent_ref, block| { - let blockchain1 = blockchain.clone(); - let mut tx_msg_box = tx_msg_box.clone(); - let mut explorer_msg_box = explorer_msg_box.clone(); - let logger = logger.clone(); - let header = block.header(); - blockchain - .post_check_header(header, parent_ref) - .and_then(move |post_checked| { - let header = post_checked.header(); - debug!( - logger, - "applying block to storage"; - "hash" => %header.hash(), - "parent" => %header.parent_id(), - "date" => %header.block_date(), - ); - let mut block_for_explorer = if explorer_enabled { - Some(block.clone()) - } else { - None - }; - let fragment_ids = block.fragments().map(|f| f.id()).collect::>(); - blockchain1 - .apply_and_store_block(post_checked, block) - .and_then(move |block_ref| { - try_request_fragment_removal(&mut tx_msg_box, fragment_ids, block_ref.header()).unwrap_or_else(|err| { - error!(logger, "cannot remove fragments from pool" ; "reason" => %err) - }); - if let Some(msg_box) = explorer_msg_box.as_mut() { - msg_box - .try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap())) - .unwrap_or_else(|err| { - error!(logger, "cannot add block to explorer: {}", err) - }); - } - Ok(block_ref) - }) - }) - }) + let blockchain1 = blockchain.clone(); + let mut tx_msg_box = tx_msg_box.clone(); + let mut explorer_msg_box = explorer_msg_box.clone(); + let logger = logger.clone(); + let header = block.header(); + blockchain + .post_check_header(header, parent_ref) + .and_then(move |post_checked| { + let header = post_checked.header(); + debug!( + logger, + "applying block to storage"; + "hash" => %header.hash(), + "parent" => %header.parent_id(), + "date" => %header.block_date(), + ); + let mut block_for_explorer = if explorer_enabled { + Some(block.clone()) + } else { + None + }; + let fragment_ids = block.fragments().map(|f| f.id()).collect::>(); + blockchain1 + .apply_and_store_block(post_checked, block) + .and_then(move |block_ref| { + try_request_fragment_removal(&mut tx_msg_box, fragment_ids, block_ref.header()).unwrap_or_else(|err| { + error!(logger, "cannot remove fragments from pool" ; "reason" => %err) + }); + if let Some(msg_box) = explorer_msg_box.as_mut() { + msg_box + .try_send(ExplorerMsg::NewBlock(block_for_explorer.take().unwrap())) + .unwrap_or_else(|err| { + error!(logger, "cannot add block to explorer: {}", err) + }); + } + Ok(block_ref) + }) + }) } fn network_block_error_into_reply(err: chain::Error) -> intercom::Error { diff --git a/jormungandr/src/main.rs b/jormungandr/src/main.rs index 0a3afef52f..8b98ece6d5 100644 --- a/jormungandr/src/main.rs +++ b/jormungandr/src/main.rs @@ -23,7 +23,7 @@ extern crate slog_syslog; use crate::{ blockcfg::{HeaderHash, Leader}, - blockchain::{Blockchain, CandidateForest}, + blockchain::Blockchain, diagnostic::Diagnostic, network::p2p::P2pTopology, secure::enclave::Enclave, @@ -163,14 +163,9 @@ fn start_services(bootstrapped_node: BootstrappedNode) -> Result<(), start_up::E let block_cache_ttl: Duration = Duration::from_secs(120); let stats_counter = stats_counter.clone(); services.spawn_future("block", move |info| { - let candidate_forest = CandidateForest::new( - block_cache_ttl, - info.logger().new(o!(log::KEY_SUB_TASK => "chain_pull")), - ); let process = blockchain::Process { blockchain, blockchain_tip, - candidate_forest, stats_counter, network_msgbox, fragment_msgbox,