diff --git a/chain/chain/src/chain.rs b/chain/chain/src/chain.rs
index 3a362143fa6..593b9806c4c 100644
--- a/chain/chain/src/chain.rs
+++ b/chain/chain/src/chain.rs
@@ -75,6 +75,16 @@ pub const MAX_ORPHAN_SIZE: usize = 1024;
 /// Maximum age of orphan to store in the chain.
 const MAX_ORPHAN_AGE_SECS: u64 = 300;
 
+// Number of orphan ancestors that should be checked when requesting chunks for an orphan
+const NUM_ORPHAN_ANCESTORS_CHECK: u64 = 5;
+
+// Maximum number of orphans for which we can request missing chunks.
+// Note that if there are no forks, the number of orphans for which we would
+// request missing chunks will not exceed NUM_ORPHAN_ANCESTORS_CHECK;
+// this number only adds another restriction when there are multiple forks.
+// It should almost never be hit.
+const MAX_ORPHAN_MISSING_CHUNKS: usize = 100;
+
 /// Refuse blocks more than this many block intervals in the future (as in bitcoin).
 const ACCEPTABLE_TIME_DIFFERENCE: i64 = 12 * 10;
 
@@ -102,6 +112,10 @@ enum ApplyChunksMode {
     NotCaughtUp,
 }
 
+/// An orphan is a block whose previous block is not accepted (in store) yet.
+/// Therefore, it is not ready to be processed yet.
+/// We save these blocks in an in-memory orphan pool to be processed later,
+/// after their previous block is accepted.
 pub struct Orphan {
     block: MaybeValidated<Block>,
     provenance: Provenance,
@@ -118,17 +132,34 @@ impl BlockLike for Orphan {
     }
 }
 
-/// Contains information for missing chunks in a block
-pub struct BlockMissingChunks {
-    /// previous block hash
-    pub prev_hash: CryptoHash,
-    pub missing_chunks: Vec<ShardChunkHeader>,
+impl Orphan {
+    fn prev_hash(&self) -> &CryptoHash {
+        self.block.header().prev_hash()
+    }
 }
 
+/// OrphanBlockPool stores information about all orphans that are waiting to be processed.
+/// A block is added to the orphan pool when process_block fails because the block is an orphan.
+/// A block is removed from the pool if
+/// 1) it is ready to be processed
+/// or
+/// 2) the size of the pool exceeds MAX_ORPHAN_SIZE and the orphan was added a long time ago
+///    or its height is too high
 pub struct OrphanBlockPool {
+    /// A map from block hash to an orphan block
     orphans: HashMap<CryptoHash, Orphan>,
+    /// A set that contains all orphans for which we have requested missing chunks.
+    /// An orphan can be added to this set when it is first added to the pool, or later
+    /// when certain requirements are satisfied (see check_orphans).
+    /// It can only be removed from this set when the orphan is removed from the pool.
+    orphans_requested_missing_chunks: HashSet<CryptoHash>,
+    /// A map from block heights to the orphan blocks at that height.
+    /// It's used to evict orphans when the pool is saturated.
     height_idx: HashMap<BlockHeight, Vec<CryptoHash>>,
+    /// A map from a block hash to the orphan blocks whose prev block has that hash.
+    /// It's used to check which orphan blocks are ready to be processed when a block is accepted.
     prev_hash_idx: HashMap<CryptoHash, Vec<CryptoHash>>,
+    /// Number of orphans that were evicted
     evicted: usize,
 }
 
@@ -136,6 +167,7 @@ impl OrphanBlockPool {
     pub fn new() -> OrphanBlockPool {
         OrphanBlockPool {
             orphans: HashMap::default(),
+            orphans_requested_missing_chunks: HashSet::default(),
             height_idx: HashMap::default(),
             prev_hash_idx: HashMap::default(),
             evicted: 0,
@@ -150,24 +182,34 @@ impl OrphanBlockPool {
         self.evicted
     }
 
-    fn add(&mut self, orphan: Orphan) {
+    /// Add a block to the orphan pool.
+    /// `requested_missing_chunks`: whether missing chunks have been requested for the orphan
+    fn add(&mut self, orphan: Orphan, requested_missing_chunks: bool) {
+        let block_hash = *orphan.block.hash();
         let height_hashes =
             self.height_idx.entry(orphan.block.header().height()).or_insert_with(|| vec![]);
         height_hashes.push(*orphan.block.hash());
         let prev_hash_entries =
             self.prev_hash_idx.entry(*orphan.block.header().prev_hash()).or_insert_with(|| vec![]);
-        prev_hash_entries.push(*orphan.block.hash());
-        self.orphans.insert(*orphan.block.hash(), orphan);
+        prev_hash_entries.push(block_hash.clone());
+        self.orphans.insert(block_hash.clone(), orphan);
+        if requested_missing_chunks {
+            self.orphans_requested_missing_chunks.insert(block_hash.clone());
+        }
 
         if self.orphans.len() > MAX_ORPHAN_SIZE {
             let old_len = self.orphans.len();
 
+            let mut removed_hashes: HashSet<CryptoHash> = HashSet::default();
             self.orphans.retain(|_, ref mut x| {
-                x.added.elapsed() < TimeDuration::from_secs(MAX_ORPHAN_AGE_SECS)
+                let keep = x.added.elapsed() < TimeDuration::from_secs(MAX_ORPHAN_AGE_SECS);
+                if !keep {
+                    removed_hashes.insert(*x.block.hash());
+                }
+                keep
             });
             let mut heights = self.height_idx.keys().cloned().collect::<Vec<u64>>();
             heights.sort_unstable();
-            let mut removed_hashes: HashSet<CryptoHash> = HashSet::default();
             for h in heights.iter().rev() {
                 if let Some(hash) = self.height_idx.remove(h) {
                     for h in hash {
@@ -182,6 +224,7 @@ impl OrphanBlockPool {
             self.height_idx.retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(&x)));
             self.prev_hash_idx
                 .retain(|_, ref mut xs| xs.iter().any(|x| !removed_hashes.contains(&x)));
+            self.orphans_requested_missing_chunks.retain(|x| !removed_hashes.contains(x));
 
             self.evicted += old_len - self.orphans.len();
         }
@@ -191,12 +234,21 @@ impl OrphanBlockPool {
         self.orphans.contains_key(hash)
     }
 
+    pub fn get(&self, hash: &CryptoHash) -> Option<&Orphan> {
+        self.orphans.get(hash)
+    }
+
+    /// Remove all orphans in the pool that can be "adopted" by block `prev_hash`, i.e., children
+    /// of `prev_hash`, and return the list.
+    /// This function is called when `prev_hash` is accepted, thus its children can be removed
+    /// from the orphan pool and be processed.
     pub fn remove_by_prev_hash(&mut self, prev_hash: CryptoHash) -> Option<Vec<Orphan>> {
         let mut removed_hashes: HashSet<CryptoHash> = HashSet::default();
         let ret = self.prev_hash_idx.remove(&prev_hash).map(|hs| {
             hs.iter()
                 .filter_map(|h| {
                     removed_hashes.insert(h.clone());
+                    self.orphans_requested_missing_chunks.remove(h);
                     self.orphans.remove(h)
                 })
                 .collect()
         });
@@ -206,6 +258,70 @@ impl OrphanBlockPool {
 
         ret
     }
+
+    /// Return a list of orphans that are within `target_depth` levels of descendants of
+    /// the block `parent_hash`
+    pub fn get_orphans_within_depth(
+        &self,
+        parent_hash: CryptoHash,
+        target_depth: u64,
+    ) -> Vec<CryptoHash> {
+        let mut _visited = HashSet::new();
+
+        let mut res = vec![];
+        let mut queue = vec![(parent_hash, 0)];
+        while let Some((prev_hash, depth)) = queue.pop() {
+            if depth == target_depth {
+                break;
+            }
+            if let Some(block_hashes) = self.prev_hash_idx.get(&prev_hash) {
+                for hash in block_hashes {
+                    queue.push((*hash, depth + 1));
+                    res.push(*hash);
+                    // there should be no loop
+                    debug_assert!(_visited.insert(*hash));
+                }
+            }
+
+            // probably something serious went wrong here because there shouldn't be so many forks
+            assert!(
+                res.len() <= 100 * target_depth as usize,
+                "found too many orphans {:?}, probably something is wrong with the chain",
+                res
+            );
+        }
+        res
+    }
+
+    /// Returns true if the block has not been requested yet and the number of orphans
+    /// for which we have requested missing chunks has not exceeded MAX_ORPHAN_MISSING_CHUNKS
+    fn can_request_missing_chunks_for_orphan(&self, block_hash: &CryptoHash) -> bool {
+        self.orphans_requested_missing_chunks.len() < MAX_ORPHAN_MISSING_CHUNKS
+            && !self.orphans_requested_missing_chunks.contains(block_hash)
+    }
+
+    fn mark_missing_chunks_requested_for_orphan(&mut self, block_hash: CryptoHash) {
+        self.orphans_requested_missing_chunks.insert(block_hash);
+    }
+}
+
+/// Contains information for missing chunks in a block
+pub struct BlockMissingChunks {
+    /// previous block hash
+    pub prev_hash: CryptoHash,
+    pub missing_chunks: Vec<ShardChunkHeader>,
+}
+
+/// Contains information needed to request chunks for orphans
+/// Fields will be used as arguments for `request_chunks_for_orphan`
+pub struct OrphanMissingChunks {
+    pub missing_chunks: Vec<ShardChunkHeader>,
+    /// epoch id for the block that has missing chunks
+    pub epoch_id: EpochId,
+    /// hash of an ancestor block of the block that has missing chunks;
+    /// this is used as an argument for `request_chunks_for_orphan`.
+    /// see comments in `request_chunks_for_orphan` for what `ancestor_hash` is used for
+    pub ancestor_hash: CryptoHash,
 }
 
 /// Facade to the blockchain block processing and storage.
@@ -469,7 +585,11 @@ impl Chain {
         Ok(())
     }
 
-    pub fn save_orphan(&mut self, block: MaybeValidated<Block>) -> Result<(), Error> {
+    pub fn save_orphan(
+        &mut self,
+        block: MaybeValidated<Block>,
+        requested_missing_chunks: bool,
+    ) -> Result<(), Error> {
         if self.orphans.contains(block.hash()) {
             return Ok(());
         }
@@ -477,11 +597,10 @@ impl Chain {
             byzantine_assert!(false);
             return Err(e.into());
         }
-        self.orphans.add(Orphan {
-            block: block,
-            provenance: Provenance::NONE,
-            added: Clock::instant(),
-        });
+        self.orphans.add(
+            Orphan { block, provenance: Provenance::NONE, added: Clock::instant() },
+            requested_missing_chunks,
+        );
         Ok(())
     }
 
@@ -771,18 +890,20 @@ impl Chain {
 
     /// Process a received or produced block, and unroll any orphans that may depend on it.
     /// Changes current state, and calls `block_accepted` callback in case block was successfully applied.
-    pub fn process_block<F, F2, F3>(
+    pub fn process_block<F, F1, F2, F3>(
         &mut self,
         me: &Option<AccountId>,
         block: MaybeValidated<Block>,
         provenance: Provenance,
         block_accepted: F,
-        block_misses_chunks: F2,
+        block_misses_chunks: F1,
+        block_orphaned_with_missing_chunks: F2,
         on_challenge: F3,
     ) -> Result<Option<Tip>, Error>
     where
         F: Copy + FnMut(AcceptedBlock) -> (),
-        F2: Copy + FnMut(BlockMissingChunks) -> (),
+        F1: Copy + FnMut(BlockMissingChunks) -> (),
+        F2: Copy + FnMut(OrphanMissingChunks) -> (),
         F3: Copy + FnMut(ChallengeBody) -> (),
     {
         let block_hash = *block.hash();
@@ -793,6 +914,7 @@ impl Chain {
             provenance,
             block_accepted,
             block_misses_chunks,
+            block_orphaned_with_missing_chunks,
             on_challenge,
         );
         timer.observe_duration();
@@ -804,6 +926,7 @@ impl Chain {
                 block_hash,
                 block_accepted,
                 block_misses_chunks,
+                block_orphaned_with_missing_chunks,
                 on_challenge,
             ) {
                 return Ok(Some(new_res));
@@ -1002,17 +1125,19 @@ impl Chain {
 
     /// Set the new head after state sync was completed if it is indeed newer.
     /// Check for potentially unlocked orphans after this update.
-    pub fn reset_heads_post_state_sync<F, F2, F3>(
+    pub fn reset_heads_post_state_sync<F, F1, F2, F3>(
         &mut self,
         me: &Option<AccountId>,
         sync_hash: CryptoHash,
         block_accepted: F,
-        block_misses_chunks: F2,
+        block_misses_chunks: F1,
+        orphan_misses_chunks: F2,
         on_challenge: F3,
     ) -> Result<(), Error>
     where
         F: Copy + FnMut(AcceptedBlock) -> (),
-        F2: Copy + FnMut(BlockMissingChunks) -> (),
+        F1: Copy + FnMut(BlockMissingChunks) -> (),
+        F2: Copy + FnMut(OrphanMissingChunks) -> (),
         F3: Copy + FnMut(ChallengeBody) -> (),
     {
         // Get header we were syncing into.
@@ -1037,7 +1162,14 @@ impl Chain {
         // Check if there are any orphans unlocked by this state sync.
         // We can't fail beyond this point because the caller will not process accepted blocks
         // and the blocks with missing chunks if this method fails
-        self.check_orphans(me, hash, block_accepted, block_misses_chunks, on_challenge);
+        self.check_orphans(
+            me,
+            hash,
+            block_accepted,
+            block_misses_chunks,
+            orphan_misses_chunks,
+            on_challenge,
+        );
         Ok(())
     }
 
@@ -1089,18 +1221,20 @@ impl Chain {
         Ok(())
     }
 
-    fn process_block_single<F, F2, F3>(
+    fn process_block_single<F, F1, F2, F3>(
         &mut self,
         me: &Option<AccountId>,
         block: MaybeValidated<Block>,
         provenance: Provenance,
         mut block_accepted: F,
-        mut block_misses_chunks: F2,
+        mut block_misses_chunks: F1,
+        mut orphan_misses_chunks: F2,
        on_challenge: F3,
     ) -> Result<Option<Tip>, Error>
     where
         F: FnMut(AcceptedBlock) -> (),
-        F2: Copy + FnMut(BlockMissingChunks) -> (),
+        F1: Copy + FnMut(BlockMissingChunks) -> (),
+        F2: Copy + FnMut(OrphanMissingChunks) -> (),
         F3: FnMut(ChallengeBody) -> (),
     {
         metrics::BLOCK_PROCESSED_TOTAL.inc();
@@ -1160,9 +1294,21 @@ impl Chain {
                     // we only add blocks that couldn't have been gc'ed to the orphan pool.
                     if block_height >= tail_height {
                         let block_hash = *block.hash();
-                        let orphan = Orphan { block, provenance, added: Clock::instant() };
+                        let requested_missing_chunks = if let Some(orphan_missing_chunks) =
+                            self.should_request_chunks_for_orphan(me, &block)
+                        {
+                            debug!(target:"chain", "request missing chunks for orphan {:?} {:?}", block_hash, orphan_missing_chunks.missing_chunks);
+                            // This callback handles requesting missing chunks. It adds the missing chunks
+                            // to a list, and all missing chunks in the list will be requested
+                            // at the end of Client::process_block
+                            orphan_misses_chunks(orphan_missing_chunks);
+                            true
+                        } else {
+                            false
+                        };
 
-                        self.orphans.add(orphan);
+                        let orphan = Orphan { block, provenance, added: Clock::instant() };
+                        self.orphans.add(orphan, requested_missing_chunks);
 
                         debug!(
                             target: "chain",
@@ -1218,6 +1364,74 @@ impl Chain {
         }
     }
 
+    /// Check if we can request chunks for this orphan. Conditions are
+    /// 1) the number of orphans with outstanding missing-chunks requests has not exceeded `MAX_ORPHAN_MISSING_CHUNKS`
+    /// 2) we haven't already requested missing chunks for the orphan
+    /// 3) all of the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block are either accepted,
+    ///    orphans, or in `blocks_with_missing_chunks`
+    /// 4) among the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block at least one is
+    ///    accepted (is in store); call it `ancestor`
+    /// 5) the next block of `ancestor` has the same epoch_id as the orphan block
+    ///    (this is because when requesting chunks, we will use the `ancestor` hash instead of the
+    ///    previous block hash of the orphan to decide the epoch id)
+    /// 6) the orphan has missing chunks
+    pub fn should_request_chunks_for_orphan(
+        &mut self,
+        me: &Option<AccountId>,
+        orphan: &Block,
+    ) -> Option<OrphanMissingChunks> {
+        // 1) the number of orphans with outstanding missing-chunks requests has not exceeded `MAX_ORPHAN_MISSING_CHUNKS`
+        // 2) we haven't already requested missing chunks for the orphan
+        if !self.orphans.can_request_missing_chunks_for_orphan(orphan.hash()) {
+            return None;
+        }
+        let mut block_hash = *orphan.header().prev_hash();
+        for _ in 0..NUM_ORPHAN_ANCESTORS_CHECK {
+            // 3) all of the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block are either accepted,
+            //    orphans, or in `blocks_with_missing_chunks`
+            if let Some(block) = self.blocks_with_missing_chunks.get(&block_hash) {
+                block_hash = *block.prev_hash();
+                continue;
+            }
+            if let Some(orphan) = self.orphans.get(&block_hash) {
+                block_hash = *orphan.prev_hash();
+                continue;
+            }
+            // 4) among the `NUM_ORPHAN_ANCESTORS_CHECK` immediate parents of the block at least one is
+            //    accepted (is in store); call it `ancestor`
+            if self.get_block(&block_hash).is_ok() {
+                if let Ok(epoch_id) = self.runtime_adapter.get_epoch_id_from_prev_block(&block_hash)
+                {
+                    // 5) the next block of `ancestor` has the same epoch_id as the orphan block
+                    if &epoch_id == orphan.header().epoch_id() {
+                        let mut chain_update = self.chain_update();
+                        // 6) the orphan has missing chunks
+                        if let Err(e) = chain_update.ping_missing_chunks(me, block_hash, &orphan) {
+                            return match e.kind() {
+                                ErrorKind::ChunksMissing(missing_chunks) => {
+                                    Some(OrphanMissingChunks {
+                                        missing_chunks,
+                                        epoch_id,
+                                        ancestor_hash: block_hash,
+                                    })
+                                }
+                                _ => None,
+                            };
+                        }
+                    }
+                }
+                return None;
+            }
+            return None;
+        }
+        None
+    }
+
+    /// only used for test
+    pub fn check_orphan_partial_chunks_requested(&self, block_hash: &CryptoHash) -> bool {
+        self.orphans.orphans_requested_missing_chunks.contains(block_hash)
+    }
+
     pub fn prev_block_is_caught_up(
         &self,
         prev_prev_hash: &CryptoHash,
@@ -1264,15 +1478,17 @@ impl Chain {
     }
 
     /// Check if any block with missing chunk is ready to be processed
-    pub fn check_blocks_with_missing_chunks<F, F2, F3>(
+    pub fn check_blocks_with_missing_chunks<F, F1, F2, F3>(
         &mut self,
         me: &Option<AccountId>,
         block_accepted: F,
-        block_misses_chunks: F2,
+        block_misses_chunks: F1,
+        orphan_misses_chunks: F2,
         on_challenge: F3,
     ) where
         F: Copy + FnMut(AcceptedBlock) -> (),
-        F2: Copy + FnMut(BlockMissingChunks) -> (),
+        F1: Copy + FnMut(BlockMissingChunks) -> (),
+        F2: Copy + FnMut(OrphanMissingChunks) -> (),
         F3: Copy + FnMut(ChallengeBody) -> (),
     {
         let mut new_blocks_accepted = vec![];
@@ -1285,6 +1501,7 @@ impl Chain {
                 orphan.provenance,
                 block_accepted,
                 block_misses_chunks,
+                orphan_misses_chunks,
                 on_challenge,
             );
             match res {
@@ -1304,23 +1521,34 @@ impl Chain {
                 accepted_block,
                 block_accepted,
                 block_misses_chunks,
+                orphan_misses_chunks,
                 on_challenge,
             );
         }
     }
 
-    /// Check for orphans, once a block is successfully added.
-    pub fn check_orphans<F, F2, F3>(
+    /// Check for orphans that are ready to be processed or to request missing chunks for, once a
+    /// block is successfully accepted.
+    /// `prev_hash`: hash of the block that was just accepted
+    /// `block_accepted`: callback to be called when an orphan is accepted
+    /// `block_misses_chunks`: callback to be called when an orphan is added to the pool of blocks
+    ///                        that have missing chunks
+    /// `orphan_misses_chunks`: callback to be called when it is ready to request missing chunks for
+    ///                         an orphan
+    /// `on_challenge`: callback to be called when an orphan should be challenged
+    pub fn check_orphans<F, F1, F2, F3>(
         &mut self,
         me: &Option<AccountId>,
         prev_hash: CryptoHash,
         block_accepted: F,
-        block_misses_chunks: F2,
+        block_misses_chunks: F1,
+        mut orphan_misses_chunks: F2,
         on_challenge: F3,
     ) -> Option<Tip>
     where
         F: Copy + FnMut(AcceptedBlock) -> (),
-        F2: Copy + FnMut(BlockMissingChunks) -> (),
+        F1: Copy + FnMut(BlockMissingChunks) -> (),
+        F2: Copy + FnMut(OrphanMissingChunks) -> (),
         F3: Copy + FnMut(ChallengeBody) -> (),
     {
         let mut queue = vec![prev_hash];
@@ -1331,7 +1559,23 @@ impl Chain {
         // Check if there are orphans we can process.
         debug!(target: "chain", "Check orphans: from {}, # orphans {}", prev_hash, self.orphans.len());
         while queue_idx < queue.len() {
-            if let Some(orphans) = self.orphans.remove_by_prev_hash(queue[queue_idx]) {
+            let prev_hash = queue[queue_idx];
+            // check within the descendants of `prev_hash` to see if there are orphans there that
+            // are ready to request missing chunks for
+            let orphans_to_check = self
+                .orphans
+                .get_orphans_within_depth(prev_hash.clone(), NUM_ORPHAN_ANCESTORS_CHECK);
+            for orphan_hash in orphans_to_check {
+                let orphan = self.orphans.get(&orphan_hash).unwrap().block.clone();
+                if let Some(orphan_missing_chunks) =
+                    self.should_request_chunks_for_orphan(me, &orphan)
+                {
+                    debug!(target:"chain", "request missing chunks for orphan {:?}", orphan_hash);
+                    orphan_misses_chunks(orphan_missing_chunks);
+                    self.orphans.mark_missing_chunks_requested_for_orphan(orphan_hash);
+                }
+            }
+            if let Some(orphans) = self.orphans.remove_by_prev_hash(prev_hash) {
                 debug!(target: "chain", "Check orphans: found {} orphans", orphans.len());
                 for orphan in orphans.into_iter() {
                     let block_hash = orphan.hash();
@@ -1342,6 +1586,7 @@ impl Chain {
                         orphan.provenance,
                         block_accepted,
                         block_misses_chunks,
+                        orphan_misses_chunks,
                         on_challenge,
                     );
                     timer.observe_duration();
@@ -2017,18 +2262,20 @@ impl Chain {
     }
 
     /// Apply transactions in chunks for the next epoch in blocks that were blocked on the state sync
-    pub fn finish_catchup_blocks<F, F2, F3>(
+    pub fn finish_catchup_blocks<F, F1, F2, F3>(
         &mut self,
         me: &Option<AccountId>,
         epoch_first_block: &CryptoHash,
         block_accepted: F,
-        block_misses_chunks: F2,
+        block_misses_chunks: F1,
+        orphan_misses_chunks: F2,
         on_challenge: F3,
         affected_blocks: &Vec<CryptoHash>,
     ) -> Result<(), Error>
     where
         F: Copy + FnMut(AcceptedBlock) -> (),
-        F2: Copy + FnMut(BlockMissingChunks) -> (),
+        F1: Copy + FnMut(BlockMissingChunks) -> (),
+        F2: Copy + FnMut(OrphanMissingChunks) -> (),
         F3: Copy + FnMut(ChallengeBody) -> (),
     {
         debug!(
@@ -2056,7 +2303,14 @@ impl Chain {
         chain_store_update.commit()?;
 
         for hash in affected_blocks.iter() {
-            self.check_orphans(me, hash.clone(), block_accepted, block_misses_chunks, on_challenge);
+            self.check_orphans(
+                me,
+                hash.clone(),
+                block_accepted,
+                block_misses_chunks,
+                orphan_misses_chunks,
+                on_challenge,
+            );
         }
 
         Ok(())
@@ -3839,6 +4093,7 @@ impl<'a> ChainUpdate<'a> {
         let apply_chunk_work = if is_caught_up {
             self.apply_chunks_preprocessing(me, block, &prev_block, ApplyChunksMode::IsCaughtUp)?
         } else {
+            debug!("add block to catch up {:?} {:?}", prev_hash, *block.hash());
             self.chain_store_update.add_block_to_catchup(prev_hash, *block.hash());
             self.apply_chunks_preprocessing(me, block, &prev_block, ApplyChunksMode::NotCaughtUp)?
         };
diff --git a/chain/chain/src/missing_chunks.rs b/chain/chain/src/missing_chunks.rs
index b971be1dbb4..5a77e26742d 100644
--- a/chain/chain/src/missing_chunks.rs
+++ b/chain/chain/src/missing_chunks.rs
@@ -66,6 +66,10 @@ impl<Block: BlockLike> MissingChunksPool<Block> {
         self.blocks_waiting_for_chunks.contains_key(block_hash)
     }
 
+    pub fn get(&self, block_hash: &BlockHash) -> Option<&Block> {
+        self.blocks_waiting_for_chunks.get(block_hash)
+    }
+
     pub fn len(&self) -> usize {
         self.blocks_waiting_for_chunks.len()
     }
diff --git a/chain/chain/src/tests/mod.rs b/chain/chain/src/tests/mod.rs
index 490626effbf..ed67768feaa 100644
--- a/chain/chain/src/tests/mod.rs
+++ b/chain/chain/src/tests/mod.rs
@@ -25,6 +25,7 @@ impl Chain {
             |_| {},
             |_| {},
             |_| {},
+            |_| {},
         )
     }
 }
diff --git a/chain/chunks/src/lib.rs b/chain/chunks/src/lib.rs
index 2eb83db90e3..6bc52512439 100644
--- a/chain/chunks/src/lib.rs
+++ b/chain/chunks/src/lib.rs
@@ -29,7 +29,7 @@ use near_primitives::time::Clock;
 use near_primitives::transaction::SignedTransaction;
 use near_primitives::types::validator_stake::ValidatorStake;
 use near_primitives::types::{
-    AccountId, Balance, BlockHeight, BlockHeightDelta, Gas, MerkleHash, ShardId, StateRoot,
+    AccountId, Balance, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash, ShardId, StateRoot,
 };
 use near_primitives::utils::MaybeValidated;
 use near_primitives::validator_signer::ValidatorSigner;
@@ -88,7 +88,11 @@ pub enum ProcessPartialEncodedChunkResult {
 #[derive(Clone, Debug)]
 struct ChunkRequestInfo {
     height: BlockHeight,
-    parent_hash: CryptoHash,
+    // hash of the ancestor block used for the request, i.e., the first accepted block up the
+    // parent chain of the block that has missing chunks
+    ancestor_hash: CryptoHash,
+    // previous block hash of the chunk
+    prev_block_hash: CryptoHash,
     shard_id: ShardId,
     added: Instant,
     last_requested: Instant,
@@ -452,7 +456,7 @@ impl ShardsManager {
     fn request_partial_encoded_chunk(
         &mut self,
         height: BlockHeight,
-        parent_hash: &CryptoHash,
+        ancestor_hash: &CryptoHash,
         shard_id: ShardId,
         chunk_hash: &ChunkHash,
         force_request_full: bool,
@@ -466,13 +470,13 @@ impl ShardsManager {
         let request_full = force_request_full
             || self.cares_about_shard_this_or_next_epoch(
                 self.me.as_ref(),
-                &parent_hash,
+                &ancestor_hash,
                 shard_id,
                 true,
             );
 
         let chunk_producer_account_id = &self.runtime_adapter.get_chunk_producer(
-            &self.runtime_adapter.get_epoch_id_from_prev_block(parent_hash)?,
+            &self.runtime_adapter.get_epoch_id_from_prev_block(ancestor_hash)?,
             height,
             shard_id,
         )?;
@@ -484,10 +488,10 @@ impl ShardsManager {
         {
             AccountIdOrPeerTrackingShard::from_account(shard_id, chunk_producer_account_id.clone())
         } else {
-            self.get_random_target_tracking_shard(&parent_hash, shard_id, request_from_archival)?
+            self.get_random_target_tracking_shard(&ancestor_hash, shard_id, request_from_archival)?
         };
 
-        let seal = self.seals_mgr.get_seal(chunk_hash, parent_hash, height, shard_id)?;
+        let seal = self.seals_mgr.get_seal(chunk_hash, ancestor_hash, height, shard_id)?;
 
         for part_ord in 0..self.runtime_adapter.num_total_parts() {
             let part_ord = part_ord as u64;
@@ -499,7 +503,7 @@ impl ShardsManager {
                 true
             } else {
                 if let Some(me) = me {
-                    &self.runtime_adapter.get_part_owner(&parent_hash, part_ord)? == me
+                    &self.runtime_adapter.get_part_owner(&ancestor_hash, part_ord)? == me
                 } else {
                     false
                 }
@@ -509,7 +513,8 @@ impl ShardsManager {
             let fetch_from = if request_from_archival {
                 shard_representative_target.clone()
             } else {
-                let part_owner = self.runtime_adapter.get_part_owner(&parent_hash, part_ord)?;
+                let part_owner =
+                    self.runtime_adapter.get_part_owner(&ancestor_hash, part_ord)?;
 
                 if Some(&part_owner) == me {
                     // If missing own part, request it from the chunk producer / node tracking shard
@@ -525,7 +530,7 @@ impl ShardsManager {
 
         let shards_to_fetch_receipts =
         // TODO: only keep shards for which we don't have receipts yet
-            if request_full { HashSet::new() } else { self.get_tracking_shards(&parent_hash) };
+            if request_full { HashSet::new() } else { self.get_tracking_shards(&ancestor_hash) };
 
         // The loop below will be sending PartialEncodedChunkRequestMsg to various block producers.
         // We need to send such a message to the original chunk producer if we do not have the receipts
@@ -633,7 +638,7 @@ impl ShardsManager {
     /// Only marks this chunk as being requested
     /// Note no requests are actually sent at this point.
     fn request_chunk_single_mark_only(&mut self, chunk_header: &ShardChunkHeader) {
-        self.request_chunk_single(chunk_header, None, false)
+        self.request_chunk_single(chunk_header, chunk_header.prev_block_hash(), None, false)
     }
 
     /// send partial chunk requests for one chunk
@@ -644,10 +649,10 @@
     fn request_chunk_single(
         &mut self,
         chunk_header: &ShardChunkHeader,
+        ancestor_hash: CryptoHash,
         header_head: Option<&Tip>,
         should_wait_for_chunk_forwarding: bool,
     ) {
-        let parent_hash = chunk_header.prev_block_hash();
         let height = chunk_header.height_created();
         let shard_id = chunk_header.shard_id();
         let chunk_hash = chunk_header.chunk_hash();
@@ -658,11 +663,13 @@ impl ShardsManager {
 
         self.encoded_chunks.get_or_insert_from_header(chunk_hash.clone(), chunk_header);
 
+        let prev_block_hash = chunk_header.prev_block_hash();
         self.requested_partial_encoded_chunks.insert(
             chunk_hash.clone(),
             ChunkRequestInfo {
                 height,
-                parent_hash,
+                prev_block_hash,
+                ancestor_hash,
                 shard_id,
                 last_requested: Clock::instant(),
                 added: Clock::instant(),
@@ -671,12 +678,12 @@ impl ShardsManager {
 
         if let Some(header_head) = header_head {
             let fetch_from_archival = self.runtime_adapter
-                .chunk_needs_to_be_fetched_from_archival(&parent_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
+                .chunk_needs_to_be_fetched_from_archival(&ancestor_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
                     error!(target: "chunks", "Error during requesting partial encoded chunk. Cannot determine whether to request from an archival node, defaulting to not: {}", err);
                     false
                 });
-            let old_block = header_head.last_block_hash != parent_hash
-                && header_head.prev_block_hash != parent_hash;
+            let old_block = header_head.last_block_hash != prev_block_hash
+                && header_head.prev_block_hash != prev_block_hash;
 
             // If chunks forwarding is enabled,
             // we purposely do not send chunk request messages right away for new blocks. Such requests
@@ -686,7 +693,7 @@ impl ShardsManager {
             if !should_wait_for_chunk_forwarding || fetch_from_archival || old_block {
                 let request_result = self.request_partial_encoded_chunk(
                     height,
-                    &parent_hash,
+                    &ancestor_hash,
                     shard_id,
                     &chunk_hash,
                     false,
@@ -708,13 +715,13 @@ impl ShardsManager {
     pub fn request_chunks<T>(
         &mut self,
         chunks_to_request: T,
-        prev_hash: &CryptoHash,
+        prev_hash: CryptoHash,
         header_head: &Tip,
     ) where
         T: IntoIterator<Item = ShardChunkHeader>,
     {
         let is_chunk_forwarding_enabled =
-            self.should_wait_for_chunk_forwarding(prev_hash).unwrap_or_else(|_| {
+            self.should_wait_for_chunk_forwarding(&prev_hash).unwrap_or_else(|_| {
                 // prev_hash must be accepted because we don't request missing chunks through
                 // this function for orphans
                 debug_assert!(false, "{:?} must be accepted", prev_hash);
                 false
             });
         for chunk_header in chunks_to_request {
-            self.request_chunk_single(&chunk_header, Some(header_head), is_chunk_forwarding_enabled)
+            self.request_chunk_single(
+                &chunk_header,
+                prev_hash,
+                Some(header_head),
+                is_chunk_forwarding_enabled,
+            );
         }
     }
 
+    /// Request chunks for an orphan block.
+    /// `epoch_id`: epoch_id of the orphan block
+    /// `ancestor_hash`: because BlockInfo for the immediate parent of an orphan block is not ready,
+    ///                  we use the hash of an ancestor block of this orphan to request chunks. It must
+    ///                  satisfy
+    ///                  1) it is from the same epoch as `epoch_id`
+    ///                  2) it is processed
+    ///                  If the above conditions are not met, the request will be dropped
+    pub fn request_chunks_for_orphan<T>(
+        &mut self,
+        chunks_to_request: T,
+        epoch_id: &EpochId,
+        ancestor_hash: CryptoHash,
+        header_head: &Tip,
+    ) where
+        T: IntoIterator<Item = ShardChunkHeader>,
+    {
+        let ancestor_epoch_id =
+            unwrap_or_return!(self.runtime_adapter.get_epoch_id_from_prev_block(&ancestor_hash));
+        if epoch_id != &ancestor_epoch_id {
+            return;
+        }
+
+        let should_wait_for_chunk_forwarding =
+            self.should_wait_for_chunk_forwarding(&ancestor_hash).unwrap_or_else(|_| {
+                // ancestor_hash must be accepted because we don't request missing chunks through
+                // this function for orphans
+                debug_assert!(false, "{:?} must be accepted", ancestor_hash);
+                error!(target:"chunks", "requesting chunks for orphan whose ancestor_hash {:?} is not accepted", ancestor_hash);
+                false
+            });
+        for chunk_header in chunks_to_request {
+            self.request_chunk_single(
+                &chunk_header,
+                ancestor_hash,
+                Some(header_head),
+                should_wait_for_chunk_forwarding,
+            )
+        }
+    }
 
@@ -732,16 +785,16 @@ impl ShardsManager {
         let requests = self.requested_partial_encoded_chunks.fetch();
         for (chunk_hash, chunk_request) in requests {
             let fetch_from_archival = self.runtime_adapter
-                .chunk_needs_to_be_fetched_from_archival(&chunk_request.parent_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
+                .chunk_needs_to_be_fetched_from_archival(&chunk_request.ancestor_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
                     error!(target: "chunks", "Error during re-requesting partial encoded chunk. Cannot determine whether to request from an archival node, defaulting to not: {}", err);
                     false
                 });
-            let old_block = header_head.last_block_hash != chunk_request.parent_hash
-                && header_head.prev_block_hash != chunk_request.parent_hash;
+            let old_block = header_head.last_block_hash != chunk_request.prev_block_hash
+                && header_head.prev_block_hash != chunk_request.prev_block_hash;
 
             match self.request_partial_encoded_chunk(
                 chunk_request.height,
-                &chunk_request.parent_hash,
+                &chunk_request.ancestor_hash,
                 chunk_request.shard_id,
                 &chunk_hash,
                 chunk_request.added.elapsed()
@@ -1824,7 +1877,8 @@ mod test {
             ChunkHash(hash(&[1])),
             ChunkRequestInfo {
                 height: 0,
-                parent_hash: Default::default(),
+                ancestor_hash: Default::default(),
+                prev_block_hash: Default::default(),
                 shard_id: 0,
                 added: added,
                 last_requested: added,
@@ -1904,7 +1958,8 @@ mod test {
             header.chunk_hash(),
             ChunkRequestInfo {
                 height: header.height_created(),
-                parent_hash: header.prev_block_hash(),
+                ancestor_hash: header.prev_block_hash(),
+                prev_block_hash: header.prev_block_hash(),
                 shard_id: header.shard_id(),
                 last_requested: Clock::instant(),
                 added: Clock::instant(),
@@ -2114,7 +2169,7 @@ mod test {
         };
         shards_manager.request_chunks(
             vec![fixture.mock_chunk_header.clone()],
-            &fixture.mock_chunk_header.prev_block_hash(),
+            fixture.mock_chunk_header.prev_block_hash(),
             &header_head,
         );
         assert!(shards_manager
@@ -2135,7 +2190,7 @@ mod test {
         );
         shards_manager.request_chunks(
             vec![fixture.mock_chunk_header.clone()],
-            &fixture.mock_chunk_header.prev_block_hash(),
+            fixture.mock_chunk_header.prev_block_hash(),
             &header_head,
         );
         assert!(shards_manager
@@ -2156,7 +2211,7 @@ mod test {
         );
         shards_manager.request_chunks(
             vec![fixture.mock_chunk_header.clone()],
-            &fixture.mock_chunk_header.prev_block_hash(),
+            fixture.mock_chunk_header.prev_block_hash(),
             &header_head,
         );
         assert!(shards_manager
diff --git a/chain/client/src/client.rs b/chain/client/src/client.rs
index 7fae2b07650..da9885fcca1 100644
--- a/chain/client/src/client.rs
+++ b/chain/client/src/client.rs
@@ -12,7 +12,7 @@ use near_primitives::time::Clock;
 use near_chain::chain::{
     ApplyStatePartsRequest, BlockCatchUpRequest, BlockMissingChunks, BlocksCatchUpState,
-    StateSplitRequest, TX_ROUTING_HEIGHT_HORIZON,
+    OrphanMissingChunks, StateSplitRequest, TX_ROUTING_HEIGHT_HORIZON,
 };
 use near_chain::test_utils::format_hash;
 use near_chain::types::{AcceptedBlock, LatestKnown};
@@ -724,6 +724,7 @@ impl Client {
         // TODO: replace to channels or cross beams here? we don't have multi-threading here so it's mostly to get around borrow checker.
         let accepted_blocks = Arc::new(RwLock::new(vec![]));
         let blocks_missing_chunks = Arc::new(RwLock::new(vec![]));
+        let orphans_missing_chunks = Arc::new(RwLock::new(vec![]));
         let challenges = Arc::new(RwLock::new(vec![]));
 
         let result = {
@@ -739,6 +740,9 @@ impl Client {
                     accepted_blocks.write().unwrap().push(accepted_block);
                 },
                 |missing_chunks| blocks_missing_chunks.write().unwrap().push(missing_chunks),
+                |orphan_missing_chunks| {
+                    orphans_missing_chunks.write().unwrap().push(orphan_missing_chunks);
+                },
                 |challenge| challenges.write().unwrap().push(challenge),
             )
         };
@@ -777,7 +781,7 @@ impl Client {
         }
 
         // Request any missing chunks
-        self.request_missing_chunks(blocks_missing_chunks);
+        self.request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
 
         let unwrapped_accepted_blocks = accepted_blocks.write().unwrap().drain(..).collect();
         (unwrapped_accepted_blocks, result)
@@ -892,7 +896,7 @@ impl Client {
                 let prev_hash = chunk_header.prev_block_hash();
                 self.shards_mgr.request_chunks(
                     iter::once(chunk_header),
-                    &prev_hash,
+                    prev_hash,
                     &self.chain.header_head()?,
                 );
                 Ok(vec![])
@@ -1190,13 +1194,28 @@ impl Client {
     pub fn request_missing_chunks(
         &mut self,
         blocks_missing_chunks: Arc<RwLock<Vec<BlockMissingChunks>>>,
+        orphans_missing_chunks: Arc<RwLock<Vec<OrphanMissingChunks>>>,
     ) {
         for BlockMissingChunks { prev_hash, missing_chunks } in
             blocks_missing_chunks.write().unwrap().drain(..)
         {
             self.shards_mgr.request_chunks(
                 missing_chunks,
-                &prev_hash,
+                prev_hash,
+                &self
+                    .chain
+                    .header_head()
+                    .expect("header_head must be available when processing a block"),
+            );
+        }
+
+        for OrphanMissingChunks { missing_chunks, epoch_id, ancestor_hash } in
+            orphans_missing_chunks.write().unwrap().drain(..)
+        {
+            self.shards_mgr.request_chunks_for_orphan(
+                missing_chunks,
+                &epoch_id,
+                ancestor_hash,
                 &self
                     .chain
                     .header_head()
                     .expect("header_head must be available when processing a block"),
             );
         }
     }
@@ -1210,16 +1229,22 @@ impl Client {
     pub fn process_blocks_with_missing_chunks(&mut self) -> Vec<AcceptedBlock> {
         let accepted_blocks = Arc::new(RwLock::new(vec![]));
         let blocks_missing_chunks = Arc::new(RwLock::new(vec![]));
+        let orphans_missing_chunks = Arc::new(RwLock::new(vec![]));
         let challenges = Arc::new(RwLock::new(vec![]));
         let me =
             self.validator_signer.as_ref().map(|validator_signer| validator_signer.validator_id());
-        self.chain.check_blocks_with_missing_chunks(&me.map(|x| x.clone()), |accepted_block| {
-            debug!(target: "client", "Block {} was missing chunks but now is ready to be processed", accepted_block.hash);
-            accepted_blocks.write().unwrap().push(accepted_block);
-        }, |missing_chunks| blocks_missing_chunks.write().unwrap().push(missing_chunks), |challenge| challenges.write().unwrap().push(challenge));
+        self.chain.check_blocks_with_missing_chunks(
+            &me.map(|x| x.clone()),
+            |accepted_block| {
+                debug!(target: "client", "Block {} was missing chunks but now is ready to be processed", accepted_block.hash);
+                accepted_blocks.write().unwrap().push(accepted_block);
+            },
+            |missing_chunks| blocks_missing_chunks.write().unwrap().push(missing_chunks),
+            |orphan_missing_chunks| orphans_missing_chunks.write().unwrap().push(orphan_missing_chunks),
+            |challenge| challenges.write().unwrap().push(challenge));
         self.send_challenges(challenges);
 
-        self.request_missing_chunks(blocks_missing_chunks);
+        self.request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
         let unwrapped_accepted_blocks = accepted_blocks.write().unwrap().drain(..).collect();
         unwrapped_accepted_blocks
     }
@@ -1707,6 +1732,7 @@ impl Client {
         if blocks_catch_up_state.is_finished() {
             let accepted_blocks = Arc::new(RwLock::new(vec![]));
             let blocks_missing_chunks = Arc::new(RwLock::new(vec![]));
+            let orphans_missing_chunks = Arc::new(RwLock::new(vec![]));
             let challenges = Arc::new(RwLock::new(vec![]));
 
             self.chain.finish_catchup_blocks(
@@ -1718,13 +1744,16 @@ impl Client {
                 |missing_chunks| {
                     blocks_missing_chunks.write().unwrap().push(missing_chunks)
                 },
+                |orphan_missing_chunks| {
+                    orphans_missing_chunks.write().unwrap().push(orphan_missing_chunks)
+                },
                 |challenge| challenges.write().unwrap().push(challenge),
                 &blocks_catch_up_state.done_blocks,
             )?;
 
             self.send_challenges(challenges);
 
-            self.request_missing_chunks(blocks_missing_chunks);
+            self.request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
 
             return Ok(accepted_blocks.write().unwrap().drain(..).collect());
         }
diff --git a/chain/client/src/client_actor.rs b/chain/client/src/client_actor.rs
index 4f21551a2f0..af62db4e14b 100644
--- a/chain/client/src/client_actor.rs
+++ b/chain/client/src/client_actor.rs
@@ -389,7 +389,9 @@ impl Handler<NetworkClientMessages> for ClientActor {
                         error!(target: "client", "Failed to save a block during state sync: {}", e);
                     }
                 } else if block.hash() == sync_hash {
-                    if let Err(e) = self.client.chain.save_orphan(block.into()) {
+                    // This is the immediate block after a state sync.
+                    // We can afford to delay requesting missing chunks for this one block.
+                    if let Err(e) = self.client.chain.save_orphan(block.into(), false) {
                         error!(target: "client", "Received an invalid block during state sync: {}", e);
                     }
                 }
@@ -954,8 +956,11 @@ impl ClientActor {
         match &res {
             Ok(_) => Ok(()),
             Err(e) => match e.kind() {
-                // missing chunks were already dealt with in client.process_block
-                near_chain::ErrorKind::ChunksMissing(_) => Ok(()),
+                near_chain::ErrorKind::ChunksMissing(_) => {
+                    // missing chunks were already handled in Client::process_block; we don't need
+                    // to do anything here
+                    Ok(())
+                }
                 _ => {
                     error!(target: "client", "Failed to process freshly produced block: {:?}", res);
                     byzantine_assert!(false);
@@ -1415,6 +1420,7 @@ impl ClientActor {
                         info!(target: "sync", "State sync: all shards are done");
 
                         let accepted_blocks = Arc::new(RwLock::new(vec![]));
+                        let orphans_missing_chunks = Arc::new(RwLock::new(vec![]));
                         let blocks_missing_chunks = Arc::new(RwLock::new(vec![]));
                         let challenges = Arc::new(RwLock::new(vec![]));
 
@@ -1427,6 +1433,9 @@ impl ClientActor {
                             |missing_chunks| {
                                 blocks_missing_chunks.write().unwrap().push(missing_chunks)
                             },
+                            |orphan_missing_chunks| {
+                                orphans_missing_chunks.write().unwrap().push(orphan_missing_chunks);
+                            },
                            |challenge| challenges.write().unwrap().push(challenge)
                        ));
 
@@ -1436,7 +1445,8 @@ impl ClientActor {
                             accepted_blocks.write().unwrap().drain(..).collect(),
                         );
 
-                        self.client.request_missing_chunks(blocks_missing_chunks);
+                        self.client
+                            .request_missing_chunks(blocks_missing_chunks, orphans_missing_chunks);
 
                         self.client.sync_status =
                             SyncStatus::BodySync { current_height: 0, highest_height: 0 };
diff --git a/chain/client/src/sync.rs b/chain/client/src/sync.rs
index 31c17e7e935..b3e6a56a092 100644
--- a/chain/client/src/sync.rs
+++ b/chain/client/src/sync.rs
@@ -1334,7 +1334,15 @@ mod test {
             let prev = chain.get_block(&chain.head().unwrap().last_block_hash).unwrap();
             let block = Block::empty(prev, &*signer);
             chain
-                .process_block(&None, block.into(), Provenance::PRODUCED, |_| {}, |_| {}, |_| {})
+                .process_block(
+                    &None,
+                    block.into(),
+                    Provenance::PRODUCED,
+                    |_| {},
+                    |_| {},
+                    |_| {},
+                    |_| {},
+                )
                 .unwrap();
         }
         let (mut chain2, _, signer2) = setup();
         {
             let prev = chain2.get_block(&chain2.head().unwrap().last_block_hash).unwrap();
             let block = Block::empty(&prev, &*signer2);
             chain2
-                .process_block(&None, block.into(), Provenance::PRODUCED, |_| {}, |_| {}, |_| {})
+                .process_block(
+                    &None,
+                    block.into(),
+                    Provenance::PRODUCED,
+                    |_| {},
+                    |_| {},
+                    |_| {},
+                    |_| {},
+                )
                 .unwrap();
         }
         let mut sync_status = SyncStatus::NoSync;
diff --git a/chain/client/src/test_utils.rs b/chain/client/src/test_utils.rs
index 9b11a59034e..a1b223e7619 100644
--- a/chain/client/src/test_utils.rs
+++ b/chain/client/src/test_utils.rs
@@ -53,8 +53,8 @@ use near_chain::types::AcceptedBlock;
 use near_client_primitives::types::Error;
 use near_network::types::{NetworkInfo, PeerManagerMessageRequest, PeerManagerMessageResponse};
 use near_network_primitives::types::{
-    AccountOrPeerIdOrHash, NetworkViewClientMessages, NetworkViewClientResponses, PeerChainInfoV2,
-    PeerInfo,
+    AccountOrPeerIdOrHash, NetworkViewClientMessages, NetworkViewClientResponses,
+    PartialEncodedChunkRequestMsg, PartialEncodedChunkResponseMsg, PeerChainInfoV2, PeerInfo,
 };
 use near_primitives::epoch_manager::RngSeed;
 use near_primitives::network::PeerId;
@@ -1377,6 +1377,61 @@ impl TestEnv {
         }
     }
 
+    /// Process all PartialEncodedChunkRequests in the network queue for a client
+    /// `id`: id for the client
+    pub fn process_partial_encoded_chunks_requests(&mut self, id: usize) {
+        while let Some(request) = self.network_adapters[id].pop() {
+            self.process_partial_encoded_chunk_request(id, request);
+        }
+    }
+
+    /// Send the PartialEncodedChunkRequest to the target client, get the response, and process it
+    pub fn process_partial_encoded_chunk_request(
+        &mut self,
+        id: usize,
+        request: PeerManagerMessageRequest,
+    ) {
+        if let PeerManagerMessageRequest::NetworkRequests(
+            NetworkRequests::PartialEncodedChunkRequest { target, request },
+        ) = request
+        {
+            let target_id = self.account_to_client_index[&target.account_id.unwrap()];
+            let response = self.get_partial_encoded_chunk_response(target_id, request);
+            let accepted_blocks =
+                self.clients[id].process_partial_encoded_chunk_response(response).unwrap();
+            for block in accepted_blocks {
+                self.clients[id].on_block_accepted(block.hash, block.status, block.provenance);
+            }
+        } else {
+            panic!("The request is not a PartialEncodedChunk request {:?}", request);
+        }
+    }
+
+    fn get_partial_encoded_chunk_response(
+        &mut self,
+        id: usize,
+        request: PartialEncodedChunkRequestMsg,
+    ) -> PartialEncodedChunkResponseMsg {
+        let client = &mut self.clients[id];
+        client.shards_mgr.process_partial_encoded_chunk_request(
+            request,
+            CryptoHash::default(),
+            client.chain.mut_store(),
+        );
+        let response = self.network_adapters[id].pop().unwrap();
+        if let PeerManagerMessageRequest::NetworkRequests(
+            NetworkRequests::PartialEncodedChunkResponse { route_back: _, response },
+        ) = response
+        {
+            return response;
+        } else {
+            panic!(
+                "did not find PartialEncodedChunkResponse from the network queue {:?}",
+                response
+            );
+        }
+    }
+
     pub fn send_money(&mut self, id: usize) -> NetworkClientResponses {
         let account_id = self.get_client_id(0);
         let signer =
diff --git a/integration-tests/tests/client/process_blocks.rs b/integration-tests/tests/client/process_blocks.rs
index 52c4dc6642c..293e05964e3 100644
--- a/integration-tests/tests/client/process_blocks.rs
+++ b/integration-tests/tests/client/process_blocks.rs
@@ -5,6 +5,7 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 use std::sync::{Arc, RwLock};
 
 use actix::System;
+use assert_matches::assert_matches;
 use futures::{future, FutureExt};
 use near_primitives::num_rational::Rational;
 
@@ -66,7 +67,7 @@ use near_store::db::DBCol::ColStateParts;
 use near_store::get;
 use near_store::test_utils::create_test_store;
 use nearcore::config::{GenesisExt, TESTING_INIT_BALANCE, TESTING_INIT_STAKE};
-use nearcore::NEAR_BASE;
+use nearcore::{TrackedConfig, NEAR_BASE};
 use rand::Rng;
 
 pub fn set_block_protocol_version(
@@ -1764,7 +1765,7 @@ fn test_gc_tail_update() {
     store_update.commit().unwrap();
     env.clients[1]
         .chain
-        .reset_heads_post_state_sync(&None, *sync_block.hash(), |_| {}, |_| {}, |_| {})
+        .reset_heads_post_state_sync(&None, *sync_block.hash(), |_| {}, |_| {}, |_| {}, |_| {})
         .unwrap();
     env.process_block(1, blocks.pop().unwrap(), Provenance::NONE);
     assert_eq!(env.clients[1].chain.store().tail().unwrap(), prev_sync_height);
@@ -3189,6 +3190,7 @@ fn test_limit_contract_functions_number_upgrade() {
             Path::new("."),
             create_test_store(),
             &genesis,
+            TrackedConfig::new_empty(),
             RuntimeConfigStore::new(None),
         ))];
         let mut env = TestEnv::builder(chain_genesis).runtime_adapters(runtimes).build();
@@ -3394,6 +3396,132 @@ mod access_key_nonce_range_tests {
             NetworkClientResponses::InvalidTx(InvalidTxError::InvalidAccessKeyError(_))
         ));
     }
+
+    #[test]
+    /// This test checks the logic for requesting chunks for orphans.
+    /// It covers the following scenario: there is one validator (test0) and one non-validator node (test1).
+    /// test0 produces and processes 20 blocks and test1 processes these blocks with some delays. We
+    /// want to test that test1 requests missing chunks for orphans ahead of time.
+    ///
+    /// - test1 processes blocks 1, 2 successfully
+    /// - test1 processes blocks 3, 4, ..., 20, but it doesn't have chunks for these blocks, so block 3
+    ///   will be put into the missing chunks pool while blocks 4 - 20 will be orphaned
+    /// - check that test1 sends missing chunk requests for blocks 4 - 7
+    /// - test1 processes partial chunk responses for blocks 4 - 7
+    /// - test1 processes partial chunk responses for block 3
+    /// - check that blocks 3 - 7 are accepted; this confirms that the missing chunk requests are sent
+    ///   and processed successfully for blocks 4 - 7
+    /// - check that test1 sends missing chunk requests for block 9, because now it satisfies the requirements
+    ///   for requesting chunks for orphans
+    /// - check that test1 does not send missing chunk requests for block 10, because it breaks
+    ///   the requirement that the block must be in the same epoch as the next block after its accepted ancestor
+    /// - test1 processes partial chunk responses for blocks 8 and 9
+    /// - check that test1 sends missing chunk requests for blocks 11 to 15, since now they satisfy
+    ///   the requirements for requesting chunks for orphans
+    fn test_request_chunks_for_orphan() {
+        init_test_logger();
+
+        let num_clients = 2;
+        let num_validators = 1;
+        let epoch_length = 10;
+
+        let accounts: Vec<AccountId> =
+            (0..num_clients).map(|i| format!("test{}", i).to_string().parse().unwrap()).collect();
+        let mut genesis = Genesis::test(accounts, num_validators);
+        genesis.config.epoch_length = epoch_length;
+        let chain_genesis = ChainGenesis::from(&genesis);
+        let runtimes: Vec<Arc<dyn RuntimeAdapter>> = (0..2)
+            .map(|_| {
+                Arc::new(nearcore::NightshadeRuntime::test_with_runtime_config_store(
+                    Path::new("."),
+                    create_test_store(),
+                    &genesis,
+                    TrackedConfig::AllShards,
+                    RuntimeConfigStore::test(),
+                )) as Arc<dyn RuntimeAdapter>
+            })
+            .collect();
+        let mut env = TestEnv::builder(chain_genesis)
+            .clients_count(num_clients)
+            .validator_seats(num_validators as usize)
+            .runtime_adapters(runtimes)
+            .build();
+
+        let mut blocks = vec![];
+        // produce 20 blocks
+        for i in 1..=20 {
+            let block = env.clients[0].produce_block(i).unwrap().unwrap();
+            blocks.push(block.clone());
+            env.process_block(0, block, Provenance::PRODUCED);
+        }
+
+        env.clients[1].process_block(blocks[0].clone().into(), Provenance::NONE).1.unwrap();
+        // process blocks 1, 2 successfully
+        for i in 1..3 {
+            let (_, res) = env.clients[1].process_block(blocks[i].clone().into(), Provenance::NONE);
+            run_catchup(&mut env.clients[1], &vec![]).unwrap();
+            assert_matches!(res, Err(e) => {
+                assert_matches!(e.kind(), near_chain::ErrorKind::ChunksMissing(_));
+            });
+            env.process_partial_encoded_chunks_requests(1);
+        }
+
+        // process blocks 3 to 15 without processing missing chunks
+        // block 3 will be put into the blocks_with_missing_chunks pool
+        let (_, res) = env.clients[1].process_block(blocks[3].clone().into(), Provenance::NONE);
+        assert_matches!(res, Err(e) => {
+            assert_matches!(e.kind(), near_chain::ErrorKind::ChunksMissing(_));
+        });
+        // remove the missing chunk request from the network queue because we want to process it later
+        let missing_chunk_request = env.network_adapters[1].pop().unwrap();
+        // blocks 4 - 20 will be put into the orphan pool
+        for i in 4..20 {
+            let (_, res) = env.clients[1].process_block(blocks[i].clone().into(), Provenance::NONE);
+            assert_matches!(res, Err(e) => {
+                assert_eq!(e.kind(), near_chain::ErrorKind::Orphan);
+            });
+        }
+        // check that blocks 4 - 7 have requested partial encoded chunks already
+        for i in 4..8 {
+            assert!(
+                env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[i].hash()),
+                "{}",
+                i
+            );
+        }
+        assert!(!env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[8].hash()));
+        assert!(!env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[9].hash()));
+        // process all the partial encoded chunk requests for blocks 4 - 7
+        env.process_partial_encoded_chunks_requests(1);
+
+        // process the partial encoded chunk request for block 3, which will unlock blocks 4 - 7
+        env.process_partial_encoded_chunk_request(1, missing_chunk_request);
+        assert_eq!(&env.clients[1].chain.head().unwrap().last_block_hash, blocks[7].hash());
+
+        // check that `check_orphans` will request PartialChunks for new orphans as new blocks are processed
+        assert!(env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[9].hash()));
+        // blocks[10] is at the new epoch, so we can't request partial chunks for it yet
+        assert!(!env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[10].hash()));
+
+        // process missing chunks for blocks 8 and 9
+        let request = env.network_adapters[1].pop().unwrap();
+        env.process_partial_encoded_chunk_request(1, request);
+        let request = env.network_adapters[1].pop().unwrap();
+        env.process_partial_encoded_chunk_request(1, request);
+        assert_eq!(&env.clients[1].chain.head().unwrap().last_block_hash, blocks[9].hash());
+
+        assert!(env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[11].hash()));
+        assert!(env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[12].hash()));
+        assert!(env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[13].hash()));
+        assert!(env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[14].hash()));
+        assert!(!env.clients[1].chain.check_orphan_partial_chunks_requested(blocks[15].hash()));
+
+        for i in 10..=15 {
+            let request = env.network_adapters[1].pop().unwrap();
+            env.process_partial_encoded_chunk_request(1, request);
+            assert_eq!(&env.clients[1].chain.head().unwrap().last_block_hash, blocks[i].hash());
+        }
+    }
 }
 
 mod protocol_feature_restore_receipts_after_fix_tests {
diff --git a/nearcore/src/runtime/mod.rs b/nearcore/src/runtime/mod.rs
index c78f9d61a72..b784bc5638f 100644
--- a/nearcore/src/runtime/mod.rs
+++ b/nearcore/src/runtime/mod.rs
@@ -216,23 +216,22 @@ impl NightshadeRuntime {
         home_dir: &Path,
         store: Arc<Store>,
         genesis: &Genesis,
+        tracked_config: TrackedConfig,
         runtime_config_store: RuntimeConfigStore,
     ) -> Self {
-        Self::new(
+        Self::new(home_dir, store, genesis, tracked_config, None, None, Some(runtime_config_store))
+    }
+
+    pub fn test(home_dir: &Path, store: Arc<Store>, genesis: &Genesis) -> Self {
+        Self::test_with_runtime_config_store(
             home_dir,
             store,
             genesis,
             TrackedConfig::new_empty(),
-            None,
-            None,
-            Some(runtime_config_store),
+            RuntimeConfigStore::test(),
         )
     }
 
-    pub fn test(home_dir: &Path, store: Arc<Store>, genesis: &Genesis) -> Self {
-        Self::test_with_runtime_config_store(home_dir, store, genesis, RuntimeConfigStore::test())
-    }
-
     fn get_epoch_height_from_prev_block(
         &self,
         prev_block_hash: &CryptoHash,
diff --git a/test-utils/runtime-tester/src/run_test.rs b/test-utils/runtime-tester/src/run_test.rs
index 9fb9103e5b6..5661adceb6a 100644
--- a/test-utils/runtime-tester/src/run_test.rs
+++ b/test-utils/runtime-tester/src/run_test.rs
@@ -13,6 +13,7 @@ use near_primitives::transaction::{Action, SignedTransaction};
 use near_primitives::types::{AccountId, BlockHeight, BlockHeightDelta, Gas, Nonce};
 use near_store::create_store;
 use near_store::test_utils::create_test_store;
+use nearcore::TrackedConfig;
 use nearcore::{config::GenesisExt, NightshadeRuntime};
 
 use near_primitives::runtime::config_store::RuntimeConfigStore;
@@ -60,6 +61,7 @@ impl Scenario {
             if let Some(tempdir) = &tempdir { tempdir.path() } else { Path::new(".") },
             store,
             &genesis,
+            TrackedConfig::new_empty(),
             runtime_config_store,
         ))])
         .build();
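
For reference, a minimal standalone sketch of the bounded child-index walk that `OrphanBlockPool::get_orphans_within_depth` performs over `prev_hash_idx` (simplified stand-in types, not the real nearcore structures). Note that this sketch uses `continue` rather than `break` at the depth limit, so that when the starting block has forks the sibling branches are still drained; the version in the diff breaks out early, which can at most skip some orphans on forked branches:

use std::collections::{HashMap, HashSet};

// Stand-in for CryptoHash in this sketch.
type Hash = u64;

/// Collect every block reachable from `parent_hash` through the child index
/// `prev_hash_idx` within `target_depth` parent links.
fn descendants_within_depth(
    prev_hash_idx: &HashMap<Hash, Vec<Hash>>,
    parent_hash: Hash,
    target_depth: u64,
) -> Vec<Hash> {
    let mut visited = HashSet::new();
    let mut res = vec![];
    let mut queue = vec![(parent_hash, 0u64)];
    while let Some((hash, depth)) = queue.pop() {
        if depth == target_depth {
            // Depth limit reached on this branch; keep draining the queue so
            // shallower branches of a fork are still explored.
            continue;
        }
        for child in prev_hash_idx.get(&hash).into_iter().flatten() {
            // A hash-linked chain cannot contain cycles, so every child
            // should be seen exactly once.
            if visited.insert(*child) {
                res.push(*child);
                queue.push((*child, depth + 1));
            }
        }
    }
    res
}

fn main() {
    // 1 -> 2 -> 3 -> 4, with a fork 1 -> 5
    let mut idx: HashMap<Hash, Vec<Hash>> = HashMap::new();
    idx.insert(1, vec![2, 5]);
    idx.insert(2, vec![3]);
    idx.insert(3, vec![4]);
    let mut found = descendants_within_depth(&idx, 1, 2);
    found.sort_unstable();
    assert_eq!(found, vec![2, 3, 5]); // 4 is three links away, so it is excluded
}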
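
Similarly, a compact sketch of the ancestor walk behind `Chain::should_request_chunks_for_orphan`, reusing the `Hash` alias from the sketch above (the real check additionally verifies that the next block after the ancestor has the orphan's epoch id, and that the orphan actually has missing chunks):

const NUM_ORPHAN_ANCESTORS_CHECK: u64 = 5;

#[derive(Clone, Copy)]
enum BlockStatus {
    /// The block is accepted (in store).
    Accepted,
    /// The block is an orphan or is waiting for chunks; carries its prev hash.
    Pending(Hash),
    /// The block is in neither pool: the ancestor chain is broken.
    Unknown,
}

/// Walk up the prev-hash links for at most NUM_ORPHAN_ANCESTORS_CHECK steps
/// through blocks that are themselves still pending, and return the first
/// accepted ancestor. That block becomes the `ancestor_hash` used when
/// requesting chunks for the orphan.
fn find_accepted_ancestor(
    mut hash: Hash,
    status_of: impl Fn(Hash) -> BlockStatus,
) -> Option<Hash> {
    for _ in 0..NUM_ORPHAN_ANCESTORS_CHECK {
        match status_of(hash) {
            BlockStatus::Accepted => return Some(hash),
            BlockStatus::Pending(prev) => hash = prev,
            BlockStatus::Unknown => return None,
        }
    }
    None
}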
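
Finally, a sketch of the callback plumbing on the client side, mirroring in simplified form how `Client::process_block` collects `OrphanMissingChunks` payloads while the chain processes a block and only issues the network requests at the end (the names and types here are illustrative stand-ins, not the real nearcore APIs):

use std::sync::{Arc, RwLock};

// Simplified stand-in for the real OrphanMissingChunks struct.
struct OrphanMissingChunks {
    ancestor_hash: Hash,
    // the real struct also carries the epoch id and the missing chunk headers
}

fn process_block_collecting_orphan_requests(
    run_chain_processing: impl FnOnce(&dyn Fn(OrphanMissingChunks)),
    mut request_chunks_for_orphan: impl FnMut(OrphanMissingChunks),
) {
    // The chain invokes the callback zero or more times while the block (and
    // any orphans it unlocks) is being processed...
    let pending: Arc<RwLock<Vec<OrphanMissingChunks>>> = Arc::new(RwLock::new(vec![]));
    {
        let pending = pending.clone();
        run_chain_processing(&move |omc| pending.write().unwrap().push(omc));
    }
    // ...and the client drains the list and sends all requests afterwards,
    // as the comment in process_block_single describes.
    for omc in pending.write().unwrap().drain(..) {
        request_chunks_for_orphan(omc);
    }
}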