Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - Optimise snapshot cache for late blocks #2832

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
47 changes: 44 additions & 3 deletions beacon_node/beacon_chain/src/block_verification.rs
Expand Up @@ -65,6 +65,7 @@ use state_processing::{
use std::borrow::Cow;
use std::fs;
use std::io::Write;
use std::time::Duration;
use store::{Error as DBError, HotColdDB, HotStateSummary, KeyValueStore, StoreOp};
use tree_hash::TreeHash;
use types::{
Expand Down Expand Up @@ -1281,6 +1282,8 @@ fn load_parent<T: BeaconChainTypes>(
),
BlockError<T::EthSpec>,
> {
let spec = &chain.spec;

// Reject any block if its parent is not known to fork choice.
//
// A block that is not in fork choice is either:
Expand All @@ -1299,15 +1302,43 @@ fn load_parent<T: BeaconChainTypes>(
return Err(BlockError::ParentUnknown(Box::new(block)));
}

let block_delay = chain
.block_times_cache
.read()
.get_block_delays(
block.canonical_root(),
chain
.slot_clock
.start_of(block.slot())
.unwrap_or_else(|| Duration::from_secs(0)),
)
.observed;

let db_read_timer = metrics::start_timer(&metrics::BLOCK_PROCESSING_DB_READ);

let result = if let Some(snapshot) = chain
let result = if let Some((snapshot, cloned)) = chain
.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|mut snapshot_cache| {
snapshot_cache.get_state_for_block_processing(block.parent_root())
snapshot_cache.get_state_for_block_processing(
block.parent_root(),
block.slot(),
block_delay,
spec,
)
}) {
Ok((snapshot.into_pre_state(), block))
if cloned {
metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES);
debug!(
chain.log,
"Cloned snapshot for late block/skipped slot";
"slot" => %block.slot(),
"parent_slot" => %snapshot.beacon_block.slot(),
"parent_root" => ?block.parent_root(),
"block_delay" => ?block_delay,
);
}
Ok((snapshot, block))
} else {
// Load the blocks parent block from the database, returning invalid if that block is not
// found.
Expand Down Expand Up @@ -1337,6 +1368,16 @@ fn load_parent<T: BeaconChainTypes>(
BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;

metrics::inc_counter(&metrics::BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES);
debug!(
chain.log,
"Missed snapshot cache";
"slot" => block.slot(),
"parent_slot" => parent_block.slot(),
"parent_root" => ?block.parent_root(),
"block_delay" => ?block_delay,
);

Ok((
PreProcessingSnapshot {
beacon_block: parent_block,
Expand Down
8 changes: 8 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Expand Up @@ -18,6 +18,14 @@ lazy_static! {
"beacon_block_processing_successes_total",
"Count of blocks processed without error"
);
pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_MISSES: Result<IntCounter> = try_create_int_counter(
"beacon_block_processing_snapshot_cache_misses",
"Count of snapshot cache misses"
);
pub static ref BLOCK_PROCESSING_SNAPSHOT_CACHE_CLONES: Result<IntCounter> = try_create_int_counter(
"beacon_block_processing_snapshot_cache_clones",
"Count of snapshot cache clones"
);
pub static ref BLOCK_PROCESSING_TIMES: Result<Histogram> =
try_create_histogram("beacon_block_processing_seconds", "Full runtime of block processing");
pub static ref BLOCK_PROCESSING_BLOCK_ROOT: Result<Histogram> = try_create_histogram(
Expand Down
77 changes: 71 additions & 6 deletions beacon_node/beacon_chain/src/snapshot_cache.rs
@@ -1,12 +1,18 @@
use crate::BeaconSnapshot;
use std::cmp;
use std::time::Duration;
use types::{
beacon_state::CloneConfig, BeaconState, Epoch, EthSpec, Hash256, SignedBeaconBlock, Slot,
beacon_state::CloneConfig, BeaconState, ChainSpec, Epoch, EthSpec, Hash256, SignedBeaconBlock,
Slot,
};

/// The default size of the cache.
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;

/// The minimum block delay to clone the state in the cache instead of removing it.
/// This helps keep block processing fast during re-orgs from late blocks.
const MINIMUM_BLOCK_DELAY_FOR_CLONE: Duration = Duration::from_secs(6);

/// This snapshot is to be used for verifying a child of `self.beacon_block`.
#[derive(Debug)]
pub struct PreProcessingSnapshot<T: EthSpec> {
Expand Down Expand Up @@ -62,6 +68,22 @@ impl<T: EthSpec> CacheItem<T> {
beacon_state_root,
}
}

pub fn clone_as_pre_state(&self) -> PreProcessingSnapshot<T> {
// Do not include the beacon state root if the state has been advanced.
let beacon_state_root =
Some(self.beacon_block.state_root()).filter(|_| self.pre_state.is_none());

PreProcessingSnapshot {
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
pre_state: self
.pre_state
.as_ref()
.map_or_else(|| self.beacon_state.clone(), |pre_state| pre_state.clone()),
beacon_state_root,
}
}
}

/// The information required for block production.
Expand Down Expand Up @@ -178,11 +200,36 @@ impl<T: EthSpec> SnapshotCache<T> {
/// If available, returns a `CacheItem` that should be used for importing/processing a block.
/// The method will remove the block from `self`, carrying across any caches that may or may not
/// be built.
pub fn get_state_for_block_processing(&mut self, block_root: Hash256) -> Option<CacheItem<T>> {
///
/// In the event the block being processed was observed late, clone the cache instead of
/// moving it. This allows us to process the next block quickly in the case of a re-org.
/// Additionally, if the slot was skipped, clone the cache. This ensures blocks that are
/// later than 1 slot still have access to the cache and can be processed quickly.
pub fn get_state_for_block_processing(
&mut self,
block_root: Hash256,
block_slot: Slot,
block_delay: Option<Duration>,
spec: &ChainSpec,
) -> Option<(PreProcessingSnapshot<T>, bool)> {
self.snapshots
.iter()
.position(|snapshot| snapshot.beacon_block_root == block_root)
.map(|i| self.snapshots.remove(i))
.map(|i| {
if let Some(cache) = self.snapshots.get(i) {
if block_slot > cache.beacon_block.slot() + 1 {
return (cache.clone_as_pre_state(), true);
}
if let Some(delay) = block_delay {
if delay >= MINIMUM_BLOCK_DELAY_FOR_CLONE
&& delay <= Duration::from_secs(spec.seconds_per_slot) * 4
{
return (cache.clone_as_pre_state(), true);
}
}
}
(self.snapshots.remove(i).into_pre_state(), false)
})
}

/// If available, obtains a clone of a `BeaconState` that should be used for block production.
Expand Down Expand Up @@ -320,6 +367,7 @@ mod test {

#[test]
fn insert_get_prune_update() {
let spec = MainnetEthSpec::default_spec();
let mut cache = SnapshotCache::new(CACHE_SIZE, get_snapshot(0));

// Insert a bunch of entries in the cache. It should look like this:
Expand Down Expand Up @@ -359,7 +407,12 @@ mod test {

assert!(
cache
.get_state_for_block_processing(Hash256::from_low_u64_be(1))
.get_state_for_block_processing(
Hash256::from_low_u64_be(1),
Slot::new(0),
None,
&spec
)
.is_none(),
"the snapshot with the lowest slot should have been removed during the insert function"
);
Expand All @@ -377,8 +430,14 @@ mod test {
);
assert_eq!(
cache
.get_state_for_block_processing(Hash256::from_low_u64_be(0))
.get_state_for_block_processing(
Hash256::from_low_u64_be(0),
Slot::new(0),
None,
&spec
)
.expect("the head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(0),
"get_state_for_block_processing should get the correct snapshot"
Expand Down Expand Up @@ -409,8 +468,14 @@ mod test {
// Ensure that the new head value was not removed from the cache.
assert_eq!(
cache
.get_state_for_block_processing(Hash256::from_low_u64_be(2))
.get_state_for_block_processing(
Hash256::from_low_u64_be(2),
Slot::new(0),
None,
&spec
)
.expect("the new head should still be in the cache")
.0
.beacon_block_root,
Hash256::from_low_u64_be(2),
"get_state_for_block_processing should get the correct snapshot"
Expand Down