Fixing chunks retrieval for non-validators
1. Properly fix the case where a node is not a validator and doesn't need to request any chunks.
2. Allow fetching chunks that are outside of the cache horizon, applying the horizon check only to unknown chunks.
ilblackdragon authored and SkidanovAlex committed Nov 7, 2019
1 parent 5bf4f66 commit b5f1423
Showing 8 changed files with 117 additions and 71 deletions.
45 changes: 43 additions & 2 deletions chain/chain/src/chain.rs
@@ -1659,6 +1659,12 @@ impl Chain {
pub fn is_orphan(&self, hash: &CryptoHash) -> bool {
self.orphans.contains(hash)
}

+/// Check if hash is for a known chunk orphan.
+#[inline]
+pub fn is_chunk_orphan(&self, hash: &CryptoHash) -> bool {
+self.blocks_with_missing_chunks.contains(hash)
+}
}

/// Chain update helper, contains information that is needed to process block
@@ -1723,12 +1729,40 @@ impl<'a> ChainUpdate<'a> {
})
}

+fn care_about_any_shard_or_part(
+&mut self,
+me: &Option<AccountId>,
+parent_hash: CryptoHash,
+) -> Result<bool, Error> {
+for shard_id in 0..self.runtime_adapter.num_shards() {
+if self.runtime_adapter.cares_about_shard(me.as_ref(), &parent_hash, shard_id, true)
+|| self.runtime_adapter.will_care_about_shard(
+me.as_ref(),
+&parent_hash,
+shard_id,
+true,
+)
+{
+return Ok(true);
+}
+}
+for part_id in 0..self.runtime_adapter.num_total_parts(&parent_hash) {
+if &Some(self.runtime_adapter.get_part_owner(&parent_hash, part_id as u64)?) == me {
+return Ok(true);
+}
+}
+Ok(false)
+}

pub fn ping_missing_chunks(
&mut self,
me: &Option<AccountId>,
parent_hash: CryptoHash,
block: &Block,
) -> Result<(), Error> {
+if !self.care_about_any_shard_or_part(me, parent_hash)? {
+return Ok(());
+}
let mut missing = vec![];
let height = block.header.inner.height;
for (shard_id, chunk_header) in block.chunks.iter().enumerate() {
@@ -1775,7 +1809,14 @@ impl<'a> ChainUpdate<'a> {
Ok(())
}

-pub fn save_incoming_receipts_from_block(&mut self, block: &Block) -> Result<(), Error> {
+pub fn save_incoming_receipts_from_block(
+&mut self,
+me: &Option<AccountId>,
+block: &Block,
+) -> Result<(), Error> {
+if !self.care_about_any_shard_or_part(me, block.header.inner.prev_hash)? {
+return Ok(());
+}
let height = block.header.inner.height;
let mut receipt_proofs_by_shard_id = HashMap::new();

@@ -2102,7 +2143,7 @@ impl<'a> ChainUpdate<'a> {
let prev_block = self.chain_store_update.get_block(&prev_hash)?.clone();

self.ping_missing_chunks(me, prev_hash, &block)?;
-self.save_incoming_receipts_from_block(&block)?;
+self.save_incoming_receipts_from_block(me, &block)?;

// Do basic validation of chunks before applying the transactions
for (chunk_header, prev_chunk_header) in block.chunks.iter().zip(prev_block.chunks.iter()) {
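For context on fix (1): `ping_missing_chunks` and `save_incoming_receipts_from_block` now return early unless the node tracks (or is about to track) some shard, or owns at least one part of the erasure-coded chunk. A condensed, self-contained sketch of that gate follows; the RuntimeAdapter trait here is a simplified stand-in for nearcore's, not its real signature.

// Simplified stand-in for nearcore's RuntimeAdapter; illustration only.
trait RuntimeAdapter {
    fn num_shards(&self) -> u64;
    fn num_total_parts(&self) -> u64;
    fn cares_about_shard(&self, me: Option<&str>, shard_id: u64) -> bool;
    fn will_care_about_shard(&self, me: Option<&str>, shard_id: u64) -> bool;
    fn get_part_owner(&self, part_id: u64) -> String;
}

/// A node needs a chunk's parts and receipts only if it tracks (or will track)
/// some shard, or is the assigned owner of at least one erasure-coded part.
fn care_about_any_shard_or_part(rt: &dyn RuntimeAdapter, me: Option<&str>) -> bool {
    (0..rt.num_shards())
        .any(|s| rt.cares_about_shard(me, s) || rt.will_care_about_shard(me, s))
        || (0..rt.num_total_parts()).any(|p| Some(rt.get_part_owner(p).as_str()) == me)
}

A non-validator fails both checks, so it no longer requests chunk parts or persists incoming receipts it has no use for.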
41 changes: 27 additions & 14 deletions chain/chunks/src/chunk_cache.rs
@@ -75,24 +75,22 @@ impl EncodedChunksCache {
})
}

+pub fn height_within_horizon(&self, height: BlockIndex) -> bool {
+if height + HEIGHT_HORIZON < self.largest_seen_height {
+false
+} else if height > self.largest_seen_height + MAX_HEIGHTS_AHEAD {
+false
+} else {
+true
+}
+}

pub fn merge_in_partial_encoded_chunk(
&mut self,
partial_encoded_chunk: &PartialEncodedChunk,
) -> bool {
let chunk_hash = partial_encoded_chunk.chunk_hash.clone();
if self.encoded_chunks.contains_key(&chunk_hash) || partial_encoded_chunk.header.is_some() {
-if let Some(header) = &partial_encoded_chunk.header {
-let height = header.inner.height_created;
-
-if height + HEIGHT_HORIZON < self.largest_seen_height {
-return false;
-}
-
-if height > self.largest_seen_height + MAX_HEIGHTS_AHEAD {
-return false;
-}
-}

self.get_or_insert_from_header(chunk_hash, partial_encoded_chunk.header.as_ref())
.merge_in_partial_encoded_chunk(&partial_encoded_chunk);
return true;
@@ -101,15 +99,30 @@ impl EncodedChunksCache {
}
}

-pub fn update_largest_seen_height(&mut self, new_height: BlockIndex) {
+pub fn remove_from_cache_if_outside_horizon(&mut self, chunk_hash: &ChunkHash) {
+if let Some(entry) = self.encoded_chunks.get(chunk_hash) {
+let height = entry.header.inner.height_created;
+if !self.height_within_horizon(height) {
+self.encoded_chunks.remove(chunk_hash);
+}
+}
+}
+
+pub fn update_largest_seen_height<T>(
+&mut self,
+new_height: BlockIndex,
+requested_chunks: &HashMap<ChunkHash, T>,
+) {
let old_largest_seen_height = self.largest_seen_height;
self.largest_seen_height = new_height;
for height in old_largest_seen_height.saturating_sub(HEIGHT_HORIZON)
..self.largest_seen_height.saturating_sub(HEIGHT_HORIZON)
{
if let Some(chunks_to_remove) = self.height_map.remove(&height) {
for chunk_hash in chunks_to_remove {
-self.encoded_chunks.remove(&chunk_hash);
+if !requested_chunks.contains_key(&chunk_hash) {
+self.encoded_chunks.remove(&chunk_hash);
+}
}
}
}
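For context on fix (2): `height_within_horizon`, factored out above, accepts a chunk height only inside a window around the largest seen height. Below is a standalone sketch of the predicate; the constant values are hypothetical, chosen only to make the worked example concrete (the real constants are defined elsewhere in chunk_cache.rs and are not shown in this diff).

// Standalone sketch of the horizon predicate; constant values hypothetical.
const HEIGHT_HORIZON: u64 = 1024;
const MAX_HEIGHTS_AHEAD: u64 = 5;

fn height_within_horizon(height: u64, largest_seen_height: u64) -> bool {
    // Reject heights more than HEIGHT_HORIZON behind the largest seen height
    // and heights more than MAX_HEIGHTS_AHEAD in front of it.
    height + HEIGHT_HORIZON >= largest_seen_height
        && height <= largest_seen_height + MAX_HEIGHTS_AHEAD
}

fn main() {
    // With largest_seen_height = 2000, exactly heights 976..=2005 pass.
    assert!(height_within_horizon(976, 2000));
    assert!(!height_within_horizon(975, 2000));
    assert!(height_within_horizon(2005, 2000));
    assert!(!height_within_horizon(2006, 2000));
}

After this commit the horizon is advisory rather than a hard gate on merging: `update_largest_seen_height` keeps cached entries that are still being requested, and `process_partial_encoded_chunk` (see lib.rs below) rejects an out-of-horizon header only when the chunk was never requested.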
77 changes: 40 additions & 37 deletions chain/chunks/src/lib.rs
@@ -37,7 +37,7 @@ mod types;

const CHUNK_REQUEST_RETRY_MS: u64 = 100;
const CHUNK_REQUEST_SWITCH_TO_OTHERS_MS: u64 = 400;
-const CHUNK_REQUEST_SWITCH_TO_FULL_FETCH_MS: u64 = 3000;
+const CHUNK_REQUEST_SWITCH_TO_FULL_FETCH_MS: u64 = 3_000;
const CHUNK_REQUEST_RETRY_MAX_MS: u64 = 100_000;

/// Adapter to break dependency of sub-components on the network requests.
@@ -71,6 +71,7 @@ pub enum ChunkStatus {
Invalid,
}

+#[derive(Debug)]
pub enum ProcessPartialEncodedChunkResult {
Known,
/// The CryptoHash is the previous block hash (which might be unknown to the caller) to start
@@ -81,7 +82,7 @@ pub enum ProcessPartialEncodedChunkResult {
NeedMoreOnePartsOrReceipts(ShardChunkHeader),
}

-#[derive(Clone)]
+#[derive(Clone, Debug)]
struct ChunkRequestInfo {
height: BlockIndex,
parent_hash: CryptoHash,
@@ -183,7 +184,10 @@ impl ShardsManager {
}

pub fn update_largest_seen_height(&mut self, new_height: BlockIndex) {
-self.encoded_chunks.update_largest_seen_height(new_height);
+self.encoded_chunks.update_largest_seen_height(
+new_height,
+&self.requested_partial_encoded_chunks.requests,
+);
}

pub fn prepare_transactions(
@@ -406,7 +410,7 @@ impl ShardsManager {
) {
Ok(()) => {}
Err(err) => {
error!(target: "client", "Error during requesting partial encoded chunk: {}", err);
error!(target: "chunks", "Error during requesting partial encoded chunk: {}", err);
}
}
}
@@ -619,6 +623,14 @@ impl ShardsManager {
}
};

+let chunk_requested =
+self.requested_partial_encoded_chunks.contains_key(&header.chunk_hash());
+if !chunk_requested
+&& !self.encoded_chunks.height_within_horizon(header.inner.height_created)
+{
+return Err(Error::ChainError(ErrorKind::InvalidChunkHeight.into()));
+}

if header.chunk_hash() != chunk_hash
|| header.inner.shard_id != partial_encoded_chunk.shard_id
{
@@ -634,31 +646,23 @@ impl ShardsManager {
}
}

-let (had_all_parts, had_all_receipts) =
-if let Some(entry) = self.encoded_chunks.get(&chunk_hash) {
-let know_all_parts = partial_encoded_chunk
-.parts
-.iter()
-.all(|part_entry| entry.parts.contains_key(&part_entry.part_ord));
+if let Some(entry) = self.encoded_chunks.get(&chunk_hash) {
+let know_all_parts = partial_encoded_chunk
+.parts
+.iter()
+.all(|part_entry| entry.parts.contains_key(&part_entry.part_ord));

-if know_all_parts {
-let know_all_receipts = partial_encoded_chunk
-.receipts
-.iter()
-.all(|receipt| entry.receipts.contains_key(&receipt.1.to_shard_id));
+if know_all_parts {
+let know_all_receipts = partial_encoded_chunk
+.receipts
+.iter()
+.all(|receipt| entry.receipts.contains_key(&receipt.1.to_shard_id));

-if know_all_receipts {
-return Ok(ProcessPartialEncodedChunkResult::Known);
-}
+if know_all_receipts {
+return Ok(ProcessPartialEncodedChunkResult::Known);
+}

-(
-self.has_all_parts(&prev_block_hash, entry)?,
-self.has_all_receipts(&prev_block_hash, entry)?,
-)
-} else {
-(false, false)
-};
+}
+};

if !self.runtime_adapter.verify_chunk_header_signature(&header)? {
byzantine_assert!(false);
@@ -700,7 +704,8 @@ impl ShardsManager {
}

if !self.encoded_chunks.merge_in_partial_encoded_chunk(&partial_encoded_chunk) {
-return Err(Error::ChainError(ErrorKind::InvalidChunkHeight.into()));
+// merge_in_partial_encoded_chunk returns false only if the header can't be fetched, which can't happen here.
+assert!(false);
}

let entry = self.encoded_chunks.get(&chunk_hash).unwrap();
@@ -726,9 +731,9 @@ impl ShardsManager {
true,
);

-if !had_all_parts || !had_all_receipts {
+if let Err(_) = chain_store.get_partial_chunk(&header.chunk_hash()) {
let mut store_update = chain_store.store_update();
-self.persist_partial_chunk_for_data_availability(entry, &mut store_update)?;
+self.persist_partial_chunk_for_data_availability(entry, &mut store_update);
store_update.commit()?;
}

@@ -737,6 +742,7 @@ impl ShardsManager {
// If we do care about the shard, we will remove the request once the full chunk is
// assembled.
if !cares_about_shard {
+self.encoded_chunks.remove_from_cache_if_outside_horizon(&chunk_hash);
self.requested_partial_encoded_chunks.remove(&chunk_hash);
return Ok(ProcessPartialEncodedChunkResult::HaveAllPartsAndReceipts(
prev_block_hash,
@@ -762,6 +768,7 @@ impl ShardsManager {

assert!(successfully_decoded);

+self.encoded_chunks.remove_from_cache_if_outside_horizon(&chunk_hash);
self.requested_partial_encoded_chunks.remove(&chunk_hash);
return Ok(ProcessPartialEncodedChunkResult::HaveAllPartsAndReceipts(prev_block_hash));
}
@@ -862,7 +869,7 @@ impl ShardsManager {
&self,
chunk_entry: &EncodedChunksCacheEntry,
store_update: &mut ChainStoreUpdate,
-) -> Result<(), Error> {
+) {
let prev_block_hash = chunk_entry.header.inner.prev_block_hash;
let partial_chunk = PartialEncodedChunk {
shard_id: chunk_entry.header.inner.shard_id,
@@ -897,8 +904,6 @@ impl ShardsManager {
};

store_update.save_partial_chunk(&chunk_entry.header.chunk_hash().clone(), partial_chunk);

-Ok(())
}

pub fn decode_and_persist_encoded_chunk(
@@ -929,7 +934,7 @@ impl ShardsManager {
merkle_paths,
&shard_chunk.receipts,
&mut store_update,
-)?;
+);

// Decoded a valid chunk, store it in the permanent store
store_update.save_chunk(&chunk_hash, shard_chunk);
@@ -955,7 +960,7 @@ impl ShardsManager {
merkle_paths: Vec<MerklePath>,
outgoing_receipts: &Vec<Receipt>,
store_update: &mut ChainStoreUpdate,
-) -> Result<(), Error> {
+) {
let shard_id = encoded_chunk.header.inner.shard_id;
let outgoing_receipts_hashes =
self.runtime_adapter.build_receipts_hashes(outgoing_receipts).unwrap();
@@ -992,12 +997,10 @@ impl ShardsManager {
};

// Save the partial chunk for data availability
-self.persist_partial_chunk_for_data_availability(&cache_entry, store_update)?;
+self.persist_partial_chunk_for_data_availability(&cache_entry, store_update);

// Save this chunk into encoded_chunks.
self.encoded_chunks.insert(cache_entry.header.chunk_hash().clone(), cache_entry);

-Ok(())
}

pub fn distribute_encoded_chunk(
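Two notes on the lib.rs changes above. First, the CHUNK_REQUEST_* constants near the top of the file plausibly stage a request's lifetime as sketched below; this staging is inferred from the constant names, since the scheduling code itself is not part of this diff.

// Hypothetical staging of a chunk request, inferred from the constant names;
// not code from this commit.
fn request_stage(elapsed_ms: u64) -> &'static str {
    if elapsed_ms >= 100_000 {
        "stop retrying" // CHUNK_REQUEST_RETRY_MAX_MS
    } else if elapsed_ms >= 3_000 {
        "also request the full chunk" // CHUNK_REQUEST_SWITCH_TO_FULL_FETCH_MS
    } else if elapsed_ms >= 400 {
        "ask peers beyond the part owners" // CHUNK_REQUEST_SWITCH_TO_OTHERS_MS
    } else {
        "re-request from part owners" // retried every CHUNK_REQUEST_RETRY_MS
    }
}

Second, with Debug now derived, ProcessPartialEncodedChunkResult can be logged directly; a caller dispatches on it roughly as follows. This is a self-contained sketch with stand-in types, not the nearcore definitions.

// Stand-in types so the sketch compiles on its own.
#[derive(Debug)]
struct ShardChunkHeader;
type CryptoHash = [u8; 32];

#[derive(Debug)]
enum ProcessPartialEncodedChunkResult {
    /// Everything in the message was already known; nothing to do.
    Known,
    /// The chunk is complete; blocks waiting on it (children of the returned
    /// previous block hash) can now be processed.
    HaveAllPartsAndReceipts(CryptoHash),
    /// Parts or receipts are still missing; keep the request alive.
    NeedMoreOnePartsOrReceipts(ShardChunkHeader),
}

fn handle(result: ProcessPartialEncodedChunkResult) {
    match result {
        ProcessPartialEncodedChunkResult::Known => {}
        ProcessPartialEncodedChunkResult::HaveAllPartsAndReceipts(_prev_block_hash) => {
            // Retry blocks that were parked waiting for this chunk.
        }
        ProcessPartialEncodedChunkResult::NeedMoreOnePartsOrReceipts(_header) => {
            // Leave the entry in requested_partial_encoded_chunks for retries.
        }
    }
}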
15 changes: 0 additions & 15 deletions chain/client/src/client.rs
@@ -479,21 +479,6 @@ impl Client {
let blocks_missing_chunks = Arc::new(RwLock::new(vec![]));
let challenges = Arc::new(RwLock::new(vec![]));

-for chunk_header in block.chunks.iter() {
-// Process empty partial encoded chunks, to persist those for which no parts/receipts
-// are needed
-let partial_encoded_chunk = PartialEncodedChunk {
-shard_id: chunk_header.inner.shard_id,
-chunk_hash: chunk_header.chunk_hash().clone(),
-header: Some(chunk_header.clone()),
-receipts: vec![],
-parts: vec![],
-};
-let _ = self
-.shards_mgr
-.process_partial_encoded_chunk(partial_encoded_chunk, self.chain.mut_store());
-}

let result = {
let me = self
.block_producer
3 changes: 2 additions & 1 deletion chain/client/src/client_actor.rs
@@ -761,6 +761,7 @@ impl ClientActor {
}
near_chain::ErrorKind::ChunksMissing(missing_chunks) => {
debug!(
target: "client",
"Chunks were missing for block {}, I'm {:?}, requesting. Missing: {:?}, ({:?})",
hash.clone(),
self.client.block_producer.as_ref().map(|bp| bp.account_id.clone()),
Expand All @@ -771,7 +772,7 @@ impl ClientActor {
NetworkClientResponses::NoResponse
}
_ => {
debug!("Process block: block {} refused by chain: {}", hash, e.kind());
debug!(target: "client", "Process block: block {} refused by chain: {}", hash, e.kind());
NetworkClientResponses::NoResponse
}
},
4 changes: 3 additions & 1 deletion chain/client/src/sync.rs
@@ -327,7 +327,9 @@ impl BlockSync {

let hashes_to_request = hashes
.iter()
-.filter(|x| !chain.get_block(x).is_ok() && !chain.is_orphan(x))
+.filter(|x| {
+!chain.get_block(x).is_ok() && !chain.is_orphan(x) && !chain.is_chunk_orphan(x)
+})
.take(block_count)
.collect::<Vec<_>>();
if hashes_to_request.len() > 0 {
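The added `is_chunk_orphan` check keeps block sync from re-requesting blocks that are already parked waiting for their chunks. In isolation the filter predicate reduces to the following hypothetical helper, shown for illustration only:

// Hypothetical helper restating the filter above: request a block only if we
// don't already have it, it isn't a block orphan, and it isn't waiting on chunks.
fn should_request_block(have_block: bool, is_orphan: bool, is_chunk_orphan: bool) -> bool {
    !have_block && !is_orphan && !is_chunk_orphan
}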
