Revert "Revert "Request chunks for multiple blocks simultaneously (#5… (
Browse files Browse the repository at this point in the history
#5637)

…224)" (#5473)"

This commit applies the change "Request chunks for multiple blocks simultaneously" again.

Initially I reverted the change because the testnet canary ran into OOM issues and we suspected that this change caused them. However, I couldn't reproduce the issue when I started a new testnet RPC node (I restarted the node several times to test the case when it was behind). I am now reverting that revert because 1) the testnet release is finished, and 2) it's good to test the change on betanet before the next testnet release.

I also discovered a few other issues in the current chunk-requesting implementation; fixing some of them will help reduce network requests, if that was the culprit. I will address them in separate PRs. After that, I will work with @nikurt to start a mocknet test of all these changes together.
mzhangmzz authored and Min Zhang committed Dec 16, 2021
1 parent 1eaa01d commit 64b875b
Showing 11 changed files with 651 additions and 97 deletions.
335 changes: 295 additions & 40 deletions chain/chain/src/chain.rs

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions chain/chain/src/missing_chunks.rs
@@ -66,6 +66,10 @@ impl<Block: BlockLike> MissingChunksPool<Block> {
self.blocks_waiting_for_chunks.contains_key(block_hash)
}

pub fn get(&self, block_hash: &BlockHash) -> Option<&Block> {
self.blocks_waiting_for_chunks.get(block_hash)
}

pub fn len(&self) -> usize {
self.blocks_waiting_for_chunks.len()
}
1 change: 1 addition & 0 deletions chain/chain/src/tests/mod.rs
@@ -25,6 +25,7 @@ impl Chain {
|_| {},
|_| {},
|_| {},
|_| {},
)
}
}
113 changes: 84 additions & 29 deletions chain/chunks/src/lib.rs
@@ -29,7 +29,7 @@ use near_primitives::time::Clock;
use near_primitives::transaction::SignedTransaction;
use near_primitives::types::validator_stake::ValidatorStake;
use near_primitives::types::{
AccountId, Balance, BlockHeight, BlockHeightDelta, Gas, MerkleHash, ShardId, StateRoot,
AccountId, Balance, BlockHeight, BlockHeightDelta, EpochId, Gas, MerkleHash, ShardId, StateRoot,
};
use near_primitives::utils::MaybeValidated;
use near_primitives::validator_signer::ValidatorSigner;
@@ -88,7 +88,11 @@ pub enum ProcessPartialEncodedChunkResult {
#[derive(Clone, Debug)]
struct ChunkRequestInfo {
height: BlockHeight,
parent_hash: CryptoHash,
// hash of the ancestor block used for the request, i.e., the first block up the
// parent chain of the block with missing chunks that has already been accepted
ancestor_hash: CryptoHash,
// previous block hash of the chunk
prev_block_hash: CryptoHash,
shard_id: ShardId,
added: Instant,
last_requested: Instant,
@@ -452,7 +456,7 @@ impl ShardsManager {
fn request_partial_encoded_chunk(
&mut self,
height: BlockHeight,
parent_hash: &CryptoHash,
ancestor_hash: &CryptoHash,
shard_id: ShardId,
chunk_hash: &ChunkHash,
force_request_full: bool,
@@ -466,13 +470,13 @@ impl ShardsManager {
let request_full = force_request_full
|| self.cares_about_shard_this_or_next_epoch(
self.me.as_ref(),
&parent_hash,
&ancestor_hash,
shard_id,
true,
);

let chunk_producer_account_id = &self.runtime_adapter.get_chunk_producer(
&self.runtime_adapter.get_epoch_id_from_prev_block(parent_hash)?,
&self.runtime_adapter.get_epoch_id_from_prev_block(ancestor_hash)?,
height,
shard_id,
)?;
@@ -484,10 +488,10 @@
{
AccountIdOrPeerTrackingShard::from_account(shard_id, chunk_producer_account_id.clone())
} else {
self.get_random_target_tracking_shard(&parent_hash, shard_id, request_from_archival)?
self.get_random_target_tracking_shard(&ancestor_hash, shard_id, request_from_archival)?
};

let seal = self.seals_mgr.get_seal(chunk_hash, parent_hash, height, shard_id)?;
let seal = self.seals_mgr.get_seal(chunk_hash, ancestor_hash, height, shard_id)?;

for part_ord in 0..self.runtime_adapter.num_total_parts() {
let part_ord = part_ord as u64;
@@ -499,7 +503,7 @@
true
} else {
if let Some(me) = me {
&self.runtime_adapter.get_part_owner(&parent_hash, part_ord)? == me
&self.runtime_adapter.get_part_owner(&ancestor_hash, part_ord)? == me
} else {
false
}
@@ -509,7 +513,8 @@
let fetch_from = if request_from_archival {
shard_representative_target.clone()
} else {
let part_owner = self.runtime_adapter.get_part_owner(&parent_hash, part_ord)?;
let part_owner =
self.runtime_adapter.get_part_owner(&ancestor_hash, part_ord)?;

if Some(&part_owner) == me {
// If missing own part, request it from the chunk producer / node tracking shard
@@ -525,7 +530,7 @@

let shards_to_fetch_receipts =
// TODO: only keep shards for which we don't have receipts yet
if request_full { HashSet::new() } else { self.get_tracking_shards(&parent_hash) };
if request_full { HashSet::new() } else { self.get_tracking_shards(&ancestor_hash) };

// The loop below will be sending PartialEncodedChunkRequestMsg to various block producers.
// We need to send such a message to the original chunk producer if we do not have the receipts
@@ -633,7 +638,7 @@ impl ShardsManager {
/// Only marks this chunk as being requested
/// Note no requests are actually sent at this point.
fn request_chunk_single_mark_only(&mut self, chunk_header: &ShardChunkHeader) {
self.request_chunk_single(chunk_header, None, false)
self.request_chunk_single(chunk_header, chunk_header.prev_block_hash(), None, false)
}

/// send partial chunk requests for one chunk
@@ -644,10 +649,10 @@
fn request_chunk_single(
&mut self,
chunk_header: &ShardChunkHeader,
ancestor_hash: CryptoHash,
header_head: Option<&Tip>,
should_wait_for_chunk_forwarding: bool,
) {
let parent_hash = chunk_header.prev_block_hash();
let height = chunk_header.height_created();
let shard_id = chunk_header.shard_id();
let chunk_hash = chunk_header.chunk_hash();
@@ -658,11 +663,13 @@

self.encoded_chunks.get_or_insert_from_header(chunk_hash.clone(), chunk_header);

let prev_block_hash = chunk_header.prev_block_hash();
self.requested_partial_encoded_chunks.insert(
chunk_hash.clone(),
ChunkRequestInfo {
height,
parent_hash,
prev_block_hash,
ancestor_hash,
shard_id,
last_requested: Clock::instant(),
added: Clock::instant(),
@@ -671,12 +678,12 @@

if let Some(header_head) = header_head {
let fetch_from_archival = self.runtime_adapter
.chunk_needs_to_be_fetched_from_archival(&parent_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
.chunk_needs_to_be_fetched_from_archival(&ancestor_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
error!(target: "chunks", "Error during requesting partial encoded chunk. Cannot determine whether to request from an archival node, defaulting to not: {}", err);
false
});
let old_block = header_head.last_block_hash != parent_hash
&& header_head.prev_block_hash != parent_hash;
let old_block = header_head.last_block_hash != prev_block_hash
&& header_head.prev_block_hash != prev_block_hash;

// If chunks forwarding is enabled,
// we purposely do not send chunk request messages right away for new blocks. Such requests
@@ -686,7 +693,7 @@
if !should_wait_for_chunk_forwarding || fetch_from_archival || old_block {
let request_result = self.request_partial_encoded_chunk(
height,
&parent_hash,
&ancestor_hash,
shard_id,
&chunk_hash,
false,
@@ -708,21 +715,67 @@
pub fn request_chunks<T>(
&mut self,
chunks_to_request: T,
prev_hash: &CryptoHash,
prev_hash: CryptoHash,
header_head: &Tip,
) where
T: IntoIterator<Item = ShardChunkHeader>,
{
let is_chunk_forwarding_enabled =
self.should_wait_for_chunk_forwarding(prev_hash).unwrap_or_else(|_| {
self.should_wait_for_chunk_forwarding(&prev_hash).unwrap_or_else(|_| {
// prev_hash must be accepted because we don't request missing chunks through this
// function for orphans
debug_assert!(false, "{:?} must be accepted", prev_hash);
error!(target:"chunks", "requesting chunks for orphan {:?}", prev_hash);
false
});
for chunk_header in chunks_to_request {
self.request_chunk_single(&chunk_header, Some(header_head), is_chunk_forwarding_enabled)
self.request_chunk_single(
&chunk_header,
prev_hash,
Some(header_head),
is_chunk_forwarding_enabled,
);
}
}

/// Request chunks for an orphan block.
/// `epoch_id`: epoch_id of the orphan block
/// `ancestor_hash`: because the BlockInfo for the immediate parent of an orphan block is not ready,
/// we use the hash of an ancestor block of this orphan to request chunks. It must
/// satisfy
/// 1) it is from the same epoch as `epoch_id`
/// 2) it is already processed
/// If the above conditions are not met, the request will be dropped
pub fn request_chunks_for_orphan<T>(
&mut self,
chunks_to_request: T,
epoch_id: &EpochId,
ancestor_hash: CryptoHash,
header_head: &Tip,
) where
T: IntoIterator<Item = ShardChunkHeader>,
{
let ancestor_epoch_id =
unwrap_or_return!(self.runtime_adapter.get_epoch_id_from_prev_block(&ancestor_hash));
if epoch_id != &ancestor_epoch_id {
return;
}

let should_wait_for_chunk_forwarding =
self.should_wait_for_chunk_forwarding(&ancestor_hash).unwrap_or_else(|_| {
// ancestor_hash must already be accepted before we request chunks for an orphan
debug_assert!(false, "{:?} must be accepted", ancestor_hash);
error!(target:"chunks", "requesting chunks for orphan whose ancestor_hash {:?} is not accepted", ancestor_hash);
false
});
for chunk_header in chunks_to_request {
self.request_chunk_single(
&chunk_header,
ancestor_hash,
Some(header_head),
should_wait_for_chunk_forwarding,
)
}
}
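
For illustration only (not part of this diff): a minimal, self-contained Rust sketch of the ancestor-selection rule that the doc comment on `request_chunks_for_orphan` describes, using hypothetical toy types rather than nearcore's. It walks up the parent chain from the orphan's previous block and returns the first block that is already accepted and belongs to the orphan's epoch, i.e., the hash a caller could pass as `ancestor_hash`; if the walk leaves the epoch first, no request would be made.

use std::collections::HashMap;

// Toy stand-ins for nearcore types; hypothetical, for illustration only.
type BlockHash = u64;
type EpochId = u32;

// Minimal per-block view: parent hash, epoch, and whether the block is already processed.
struct BlockInfo {
    parent: BlockHash,
    epoch_id: EpochId,
    accepted: bool,
}

// Return the first ancestor (starting from the orphan's previous block) that is
// 1) from the same epoch as the orphan and 2) already accepted. Returning None
// models the case where the chunk request would be dropped.
fn find_request_ancestor(
    chain: &HashMap<BlockHash, BlockInfo>,
    orphan_prev_hash: BlockHash,
    epoch_id: EpochId,
) -> Option<BlockHash> {
    let mut current = orphan_prev_hash;
    while let Some(info) = chain.get(&current) {
        if info.epoch_id != epoch_id {
            return None; // left the orphan's epoch before finding an accepted block
        }
        if info.accepted {
            return Some(current); // first accepted ancestor in the same epoch
        }
        current = info.parent;
    }
    None // ran out of known blocks
}

fn main() {
    // Chain: 1 (accepted) <- 2 (not yet accepted) <- 3 (orphan's previous block, not yet accepted)
    let mut chain = HashMap::new();
    chain.insert(1, BlockInfo { parent: 0, epoch_id: 7, accepted: true });
    chain.insert(2, BlockInfo { parent: 1, epoch_id: 7, accepted: false });
    chain.insert(3, BlockInfo { parent: 2, epoch_id: 7, accepted: false });

    // The first accepted same-epoch ancestor of the orphan is block 1.
    assert_eq!(find_request_ancestor(&chain, 3, 7), Some(1));
}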

@@ -732,16 +785,16 @@
let requests = self.requested_partial_encoded_chunks.fetch();
for (chunk_hash, chunk_request) in requests {
let fetch_from_archival = self.runtime_adapter
.chunk_needs_to_be_fetched_from_archival(&chunk_request.parent_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
.chunk_needs_to_be_fetched_from_archival(&chunk_request.ancestor_hash, &header_head.last_block_hash).unwrap_or_else(|err| {
error!(target: "chunks", "Error during re-requesting partial encoded chunk. Cannot determine whether to request from an archival node, defaulting to not: {}", err);
false
});
let old_block = header_head.last_block_hash != chunk_request.parent_hash
&& header_head.prev_block_hash != chunk_request.parent_hash;
let old_block = header_head.last_block_hash != chunk_request.prev_block_hash
&& header_head.prev_block_hash != chunk_request.prev_block_hash;

match self.request_partial_encoded_chunk(
chunk_request.height,
&chunk_request.parent_hash,
&chunk_request.ancestor_hash,
chunk_request.shard_id,
&chunk_hash,
chunk_request.added.elapsed()
@@ -1824,7 +1877,8 @@ mod test {
ChunkHash(hash(&[1])),
ChunkRequestInfo {
height: 0,
parent_hash: Default::default(),
ancestor_hash: Default::default(),
prev_block_hash: Default::default(),
shard_id: 0,
added: added,
last_requested: added,
@@ -1904,7 +1958,8 @@ mod test {
header.chunk_hash(),
ChunkRequestInfo {
height: header.height_created(),
parent_hash: header.prev_block_hash(),
ancestor_hash: header.prev_block_hash(),
prev_block_hash: header.prev_block_hash(),
shard_id: header.shard_id(),
last_requested: Clock::instant(),
added: Clock::instant(),
@@ -2114,7 +2169,7 @@
};
shards_manager.request_chunks(
vec![fixture.mock_chunk_header.clone()],
&fixture.mock_chunk_header.prev_block_hash(),
fixture.mock_chunk_header.prev_block_hash(),
&header_head,
);
assert!(shards_manager
@@ -2135,7 +2190,7 @@
);
shards_manager.request_chunks(
vec![fixture.mock_chunk_header.clone()],
&fixture.mock_chunk_header.prev_block_hash(),
fixture.mock_chunk_header.prev_block_hash(),
&header_head,
);
assert!(shards_manager
@@ -2156,7 +2211,7 @@
);
shards_manager.request_chunks(
vec![fixture.mock_chunk_header.clone()],
&fixture.mock_chunk_header.prev_block_hash(),
fixture.mock_chunk_header.prev_block_hash(),
&header_head,
);
assert!(shards_manager
