Skip to content

Commit

Permalink
Optimise snapshot cache for late blocks (#2832)
Browse files Browse the repository at this point in the history
## Proposed Changes

In the event of a late block, keep the block in the snapshot cache by cloning it. This helps us process new blocks quickly in the event the late block was re-org'd.


Co-authored-by: Michael Sproul <michael@sigmaprime.io>
  • Loading branch information
macladson and michaelsproul committed Dec 6, 2021
1 parent b5f2764 commit a7a7edb
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 9 deletions.
47 changes: 44 additions & 3 deletions beacon_node/beacon_chain/src/block_verification.rs
Expand Up @@ -70,6 +70,7 @@ use state_processing::{
use std::borrow::Cow;
use std::fs;
use std::io::Write;
use std::time::Duration;
use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use tree_hash::TreeHash;
use types::{
Expand Down Expand Up @@ -1418,6 +1419,8 @@ fn load_parent<T: BeaconChainTypes>(
),
BlockError<T::EthSpec>,
> {
let spec = &chain.spec;

// Reject any block if its parent is not known to fork choice.
//
// A block that is not in fork choice is either:
Expand All @@ -1436,15 +1439,43 @@ fn load_parent<T: BeaconChainTypes>(
return Err(BlockError::ParentUnknown(Box::new(block)));
}

let block_delay = chain
.block_times_cache
.read()
.get_block_delays(
block.canonical_root(),
chain
.slot_clock
.start_of(block.slot())
.unwrap_or_else(|| Duration::from_secs(0)),
)
.observed;

let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ);

let result = if let Some(snapshot) = chain
let result = if let Some((snapshot, cloned)) = chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|mut snapshot_cache| {
snapshot_cache.get_state_for_block_processing(block.parent_root())
snapshot_cache.get_state_for_block_processing(
block.parent_root(),
block.slot(),
block_delay,
spec,
)
}) {
Ok((snapshot.into_pre_state(), block))
if cloned {
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES);
debug!(
chain.log,
"Cloned snapshot for late block/skipped slot";
"slot" => %block.slot(),
"parent_slot" => %snapshot.beacon_block.slot(),
"parent_root" => ?block.parent_root(),
"block_delay" => ?block_delay,
);
}
Ok((snapshot, block))
} else {
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
Expand Down Expand Up @@ -1474,6 +1505,16 @@ fn load_parent<T: BeaconChainTypes>(
BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES);
debug!(
chain.log,
"Missed snapshot cache";
"slot" => block.slot(),
"parent_slot" => parent_block.slot(),
"parent_root" => ?block.parent_root(),
"block_delay" => ?block_delay,
);

Ok((
PreProcessingSnapshot {
beacon_block: parent_block,
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Expand Up @@ -18,6 +18,14 @@ lazy_static! {
"beacon_block_processing_successes_total",
"Count of blocks processed without error"
);
pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES: Result<IntCounter> = try_create_int_counter(
"beacon_block_processing_snapshot_cache_misses",
"Count of snapshot cache misses"
);
pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES: Result<IntCounter> = try_create_int_counter(
"beacon_block_processing_snapshot_cache_clones",
"Count of snapshot cache clones"
);
pub static ref BLOCK_PROCESSING_TIMES: Result<Histogram> =
try_create_histogram("beacon_block_processing_seconds", "Full runtime of block processing");
pub static ref BLOCK_PROCESSING_BLOCK_ROOT: Result<Histogram> = try_create_histogram(
Expand Down
77 changes: 71 additions & 6 deletions beacon_node/beacon_chain/src/snapshot_cache.rs
@@ -1,12 +1,18 @@
use crate::BeaconSnapshot;
use std::cmp;
use std::time::Duration;
use types::{
beacon_state::CloneConfig, BeaconState, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
beacon_state::CloneConfig, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
};

/// The default size of the cache.
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;

/// The minimum block delay to clone the state in the cache instead of removing it.
/// This helps keep block processing fast during re-orgs from late blocks.
const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6);

/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
pub struct PreProcessingSnapshot<T: EthSpec> {
Expand Down Expand Up @@ -62,6 +68,22 @@ impl<T: EthSpec> CacheItem<T> {
beacon_state_root,
}
}

pub fn clone_as_pre_state(&self) -> PreProcessingSnapshot<T> {
// Do not include the beacon state root if the state has been advanced.
let beacon_state_root =
Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none());

PreProcessingSnapshot {
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
pre_state: self
.pre_state
.as_ref()
.map_or_else(|| self.beacon_state.clone(), |pre_state| pre_state.clone()),
beacon_state_root,
}
}
}

/// The information required for block production.
Expand Down Expand Up @@ -178,11 +200,36 @@ impl<T: EthSpec> SnapshotCache<T> {
/// If available, returns a `CacheItem` that should be used for importing/processing a block.
/// The method will remove the block from `self`, carrying across any caches that may or may not
/// be built.
pub fn get_state_for_block_processing(&mut self, block_root: Hash256) -> Option<CacheItem<T>> {
///
/// In the event the block being processed was observed late, clone the cache instead of
/// moving it. This allows us to process the next block quickly in the case of a re-org.
/// Additionally, if the slot was skipped, clone the cache. This ensures blocks that are
/// later than 1 slot still have access to the cache and can be processed quickly.
pub fn get_state_for_block_processing(
&mut self,
block_root: Hash256,
block_slot: Slot,
block_delay: Option<Duration>,
spec: &ChainSpec,
) -> Option<(PreProcessingSnapshot<T>, bool)> {
self.snapshots
.iter()
.position(|snapshot| snapshot.beacon_block_root == block_root)
.map(|i| self.snapshots.remove(i))
.map(|i| {
if let Some(cache) = self.snapshots.get(i) {
if block_slot > cache.beacon_block.slot() + 1 {
return (cache.clone_as_pre_state(), true);
}
if let Some(delay) = block_delay {
if delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE
&& delay <= Duration::from_secs(spec.seconds_per_slot) * 4
{
return (cache.clone_as_pre_state(), true);
}
}
}
(self.snapshots.remove(i).into_pre_state(), false)
})
}

/// If available, obtains a clone of a `BeaconState` that should be used for block production.
Expand Down Expand Up @@ -320,6 +367,7 @@ mod test {

#[test]
fn insert_get_prune_update() {
let spec = MainnetEthSpec::default_spec();
let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0));

// Insert a bunch of entries in the cache. It should look like this:
Expand Down Expand Up @@ -359,7 +407,12 @@ mod test {

assert!(
cache
.get_state_for_block_processing(Hash256::from_low_u64_be(1))
.get_state_for_block_processing(
Hash256::from_low_u64_be(1),
Slot::new(0),
None,
&spec
)
.is_none(),
"the snapshot with the lowest slot should have been removed during the insert function"
);
Expand All @@ -377,8 +430,14 @@ mod test {
);
assert_eq!(
cache
.get_state_for_block_processing(Hash256::from_low_u64_be(0))
.get_state_for_block_processing(
Hash256::from_low_u64_be(0),
Slot::new(0),
None,
&spec
)
.expect("the head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(0),
"get_state_for_block_processing should get the correct snapshot"
Expand Down Expand Up @@ -409,8 +468,14 @@ mod test {
// Ensure that the new head value was not removed from the cache.
assert_eq!(
cache
.get_state_for_block_processing(Hash256::from_low_u64_be(2))
.get_state_for_block_processing(
Hash256::from_low_u64_be(2),
Slot::new(0),
None,
&spec
)
.expect("the new head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(2),
"get_state_for_block_processing should get the correct snapshot"
Expand Down

0 comments on commit a7a7edb

Please sign in to comment.