Skip to content

Commit

Permalink
fix: add epoch_id to the key identifying chunk state witness (#11309)
Browse files Browse the repository at this point in the history
Currently both partial witness tracker and orphan witness pool use
`(height_created, shard_id)` pair as LRU cache key when tracking
received state witnesses. This is not entirely correct: different chunk
producers can produce a chunk for a given `(height_created, shard_id)`
in different epochs. For example that could happen with a fork on epoch
boundary when some validators keep producing chunks for the old epoch
while other have already transitioned to the new one.
This PR introduces new struct `ChunkProductionKey` to be used as a key
for tracking chunk state witnesses.
  • Loading branch information
pugachAG committed May 17, 2024
1 parent e273c41 commit 8635fa0
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use lru::LruCache;
use near_chain_configs::default_orphan_state_witness_pool_size;
use near_primitives::hash::CryptoHash;
use near_primitives::stateless_validation::ChunkStateWitness;
use near_primitives::types::{BlockHeight, ShardId};
use near_primitives::stateless_validation::{ChunkProductionKey, ChunkStateWitness};
use near_primitives::types::BlockHeight;

use metrics_tracker::OrphanWitnessMetricsTracker;

Expand All @@ -11,7 +11,7 @@ use metrics_tracker::OrphanWitnessMetricsTracker;
/// shows up before the block is available. In such cases the witness is put in `OrphanStateWitnessPool` until the
/// required block arrives and the witness can be processed.
pub struct OrphanStateWitnessPool {
witness_cache: LruCache<(ShardId, BlockHeight), CacheEntry>,
witness_cache: LruCache<ChunkProductionKey, CacheEntry>,
}

struct CacheEntry {
Expand All @@ -37,13 +37,12 @@ impl OrphanStateWitnessPool {
/// Add an orphaned chunk state witness to the pool. The witness will be put in a cache and it'll
/// wait there for the block that's required to process it.
/// It's expected that this `ChunkStateWitness` has gone through basic validation - including signature,
/// shard_id, size and distance from the tip. The pool would still work without it, but without validation
/// it'd be possible to fill the whole cache with spam.
/// shard_id, size, epoch_id and distance from the tip. The pool would still work without it, but without
/// validation it'd be possible to fill the whole cache with spam.
/// `witness_size` is only used for metrics, it's okay to pass 0 if you don't care about the metrics.
pub fn add_orphan_state_witness(&mut self, witness: ChunkStateWitness, witness_size: usize) {
// Insert the new ChunkStateWitness into the cache
let chunk_header = &witness.chunk_header;
let cache_key = (chunk_header.shard_id(), chunk_header.height_created());
let cache_key = witness.chunk_production_key();
let metrics_tracker = OrphanWitnessMetricsTracker::new(&witness, witness_size);
let cache_entry = CacheEntry { witness, _metrics_tracker: metrics_tracker };
if let Some((_, ejected_entry)) = self.witness_cache.push(cache_key, cache_entry) {
Expand All @@ -66,10 +65,10 @@ impl OrphanStateWitnessPool {
&mut self,
prev_block: &CryptoHash,
) -> Vec<ChunkStateWitness> {
let mut to_remove: Vec<(ShardId, BlockHeight)> = Vec::new();
let mut to_remove: Vec<ChunkProductionKey> = Vec::new();
for (cache_key, cache_entry) in self.witness_cache.iter() {
if cache_entry.witness.chunk_header.prev_block_hash() == prev_block {
to_remove.push(*cache_key);
to_remove.push(cache_key.clone());
}
}
let mut result = Vec::new();
Expand All @@ -87,16 +86,17 @@ impl OrphanStateWitnessPool {
/// Orphan witnesses below the final height of the chain won't be needed anymore,
/// so they can be removed from the pool to free up memory.
pub fn remove_witnesses_below_final_height(&mut self, final_height: BlockHeight) {
let mut to_remove: Vec<(ShardId, BlockHeight)> = Vec::new();
for ((witness_shard, witness_height), cache_entry) in self.witness_cache.iter() {
if *witness_height <= final_height {
to_remove.push((*witness_shard, *witness_height));
let mut to_remove: Vec<ChunkProductionKey> = Vec::new();
for (cache_key, cache_entry) in self.witness_cache.iter() {
let witness_height = cache_key.height_created;
if witness_height <= final_height {
to_remove.push(cache_key.clone());
let header = &cache_entry.witness.chunk_header;
tracing::debug!(
target: "client",
final_height,
ejected_witness_height = *witness_height,
ejected_witness_shard = *witness_shard,
ejected_witness_height = witness_height,
ejected_witness_shard = cache_key.shard_id,
ejected_witness_chunk = ?header.chunk_hash(),
ejected_witness_prev_block = ?header.prev_block_hash(),
"Ejecting an orphaned ChunkStateWitness from the cache because it's below \
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,10 @@ use near_chain::chain::ProcessChunkStateWitnessMessage;
use near_chain::Error;
use near_epoch_manager::EpochManagerAdapter;
use near_primitives::reed_solomon::reed_solomon_decode;
use near_primitives::stateless_validation::{EncodedChunkStateWitness, PartialEncodedStateWitness};
use near_primitives::types::{BlockHeight, ShardId};
use near_primitives::stateless_validation::{
ChunkProductionKey, EncodedChunkStateWitness, PartialEncodedStateWitness,
};
use near_primitives::types::ShardId;
use reed_solomon_erasure::galois_8::ReedSolomon;

use crate::client_actor::ClientSenderForPartialWitness;
Expand Down Expand Up @@ -160,7 +162,7 @@ pub struct PartialEncodedStateWitnessTracker {
/// Epoch manager to get the set of chunk validators
epoch_manager: Arc<dyn EpochManagerAdapter>,
/// Keeps track of state witness parts received from chunk producers.
parts_cache: LruCache<(ShardId, BlockHeight), CacheEntry>,
parts_cache: LruCache<ChunkProductionKey, CacheEntry>,
/// Reed Solomon encoder for decoding state witness parts.
rs_map: RsMap,
}
Expand All @@ -186,7 +188,7 @@ impl PartialEncodedStateWitnessTracker {

self.maybe_insert_new_entry_in_parts_cache(&partial_witness)?;

let key = (partial_witness.shard_id(), partial_witness.height_created());
let key = partial_witness.chunk_production_key();
let entry = self.parts_cache.get_mut(&key).unwrap();

if let Some(encoded_witness) = entry.insert_in_cache_entry(partial_witness) {
Expand Down Expand Up @@ -222,7 +224,7 @@ impl PartialEncodedStateWitnessTracker {
partial_witness: &PartialEncodedStateWitness,
) -> Result<(), Error> {
// Insert a new entry into the cache for the chunk hash.
let key = (partial_witness.shard_id(), partial_witness.height_created());
let key = partial_witness.chunk_production_key();
if self.parts_cache.contains(&key) {
return Ok(());
}
Expand Down
25 changes: 25 additions & 0 deletions core/primitives/src/stateless_validation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ impl PartialEncodedStateWitness {
Self { inner, signature }
}

pub fn chunk_production_key(&self) -> ChunkProductionKey {
ChunkProductionKey {
shard_id: self.shard_id(),
epoch_id: self.epoch_id().clone(),
height_created: self.height_created(),
}
}

pub fn verify(&self, public_key: &PublicKey) -> bool {
let data = borsh::to_vec(&self.inner).unwrap();
self.signature.verify(&data, public_key)
Expand Down Expand Up @@ -342,6 +350,14 @@ impl ChunkStateWitness {
}
}

pub fn chunk_production_key(&self) -> ChunkProductionKey {
ChunkProductionKey {
shard_id: self.chunk_header.shard_id(),
epoch_id: self.epoch_id.clone(),
height_created: self.chunk_header.height_created(),
}
}

pub fn new_dummy(height: BlockHeight, shard_id: ShardId, prev_block_hash: CryptoHash) -> Self {
let header = ShardChunkHeader::V3(ShardChunkHeaderV3::new(
PROTOCOL_VERSION,
Expand Down Expand Up @@ -477,6 +493,15 @@ impl EndorsementStats {
}
}

/// This struct contains combination of fields that uniquely identify chunk production.
/// It means that for a given instance only one chunk could be produced.
#[derive(Debug, Hash, PartialEq, Eq, Clone)]
pub struct ChunkProductionKey {
pub shard_id: ShardId,
pub epoch_id: EpochId,
pub height_created: BlockHeight,
}

#[derive(Debug, Default)]
pub struct ChunkValidatorAssignments {
assignments: Vec<(AccountId, Balance)>,
Expand Down

0 comments on commit 8635fa0

Please sign in to comment.