Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 67 additions & 45 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use std::sync::Arc;
use std::time::{Duration, Instant};
use store::iter::{BlockRootsIterator, ParentRootBlockIterator, StateRootsIterator};
use store::{Error as DBError, HotColdDB, KeyValueStore, KeyValueStoreOp, StoreItem, StoreOp};
use types::beacon_state::CloneConfig;
use types::*;

pub type ForkChoiceError = fork_choice::Error<crate::ForkChoiceStoreError>;
Expand Down Expand Up @@ -542,7 +543,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
/// is the state as it was when the head block was received, which could be some slots prior to
/// now.
pub fn head(&self) -> Result<BeaconSnapshot<T::EthSpec>, Error> {
self.with_head(|head| Ok(head.clone_with_only_committee_caches()))
self.with_head(|head| Ok(head.clone_with(CloneConfig::committee_caches_only())))
}

/// Apply a function to the canonical head without cloning it.
Expand Down Expand Up @@ -786,37 +787,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
}

/// Returns the block proposer for a given slot.
///
/// Information is read from the present `beacon_state` shuffling, only information from the
/// present epoch is available.
///
/// When `slot` lies in the same epoch as the head state, the (cloned) head state is used
/// directly. Otherwise a state is obtained from `state_at_slot` for the requested slot.
///
/// # Errors
///
/// Returns `Error::InvariantViolated` if the epoch of the state that was obtained does not
/// match the epoch of `slot`. Errors from reading the head, loading the state, building the
/// committee cache or computing the proposer index are propagated to the caller.
pub fn block_proposer(&self, slot: Slot) -> Result<usize, Error> {
    let epoch = |slot: Slot| slot.epoch(T::EthSpec::slots_per_epoch());

    // Read (and clone) the head exactly once. Reading it a second time would cost another
    // clone and could observe a different head than the one used for the epoch comparison.
    let head_state = self.head()?.beacon_state;

    let mut state = if epoch(slot) == epoch(head_state.slot) {
        head_state
    } else {
        // The block proposer shuffling is not affected by the state roots, so we don't need to
        // calculate them.
        self.state_at_slot(slot, StateSkipConfig::WithoutStateRoots)?
    };

    state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;

    // Guard against being handed a state from a different epoch than the one requested.
    if epoch(state.slot) != epoch(slot) {
        return Err(Error::InvariantViolated(format!(
            "Epochs inconsistent in proposer lookup: state: {}, requested: {}",
            epoch(state.slot),
            epoch(slot)
        )));
    }

    state
        .get_beacon_proposer_index(slot, &self.spec)
        .map_err(Into::into)
}

/// Returns the attestation duties for a given validator index.
///
/// Information is read from the current state, so only information from the present and prior
Expand Down Expand Up @@ -1771,9 +1741,49 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
slot: Slot,
validator_graffiti: Option<Graffiti>,
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
let state = self
.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
Copy link
Member

@paulhauner paulhauner Dec 21, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you aware that the slot - 1 has been removed and we will no longer be able to produce blocks from slots earlier than the head block?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I did that intentionally. I'll message you.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! I've restored the slot - 1 with a warning, as we discussed.

I think this will be particularly relevant on the first slot of an epoch when there might be two seemingly legitimate proposers because of propagation delay of the last block of the previous epoch.

.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?;
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
let _complete_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);

// Producing a block requires the tree hash cache, so clone a full state corresponding to
// the head from the snapshot cache. Unfortunately we can't move the snapshot out of the
// cache (which would be fast), because we need to re-process the block after it has been
// signed. If we miss the cache or we're producing a block that conflicts with the head,
// fall back to getting the head from `slot - 1`.
let state_load_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_LOAD_TIMES);
let head_info = self
.head_info()
.map_err(BlockProductionError::UnableToGetHeadInfo)?;
let state = if head_info.slot < slot {
// Normal case: proposing a block atop the current head. Use the snapshot cache.
if let Some(snapshot) = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| {
snapshot_cache.get_cloned(head_info.block_root, CloneConfig::all())
})
{
snapshot.beacon_state
} else {
warn!(
self.log,
"Block production cache miss";
"message" => "this block is more likely to be orphaned",
"slot" => slot,
);
self.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It does feel like this and L1783-1874 should be de-duplicated, but I won't block on it.

.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?
}
} else {
warn!(
self.log,
"Producing block that conflicts with head";
"message" => "this block is more likely to be orphaned",
"slot" => slot,
);
self.state_at_slot(slot - 1, StateSkipConfig::WithStateRoots)
.map_err(|_| BlockProductionError::UnableToProduceAtSlot(slot))?
};
drop(state_load_timer);

self.produce_block_on_state(state, slot, randao_reveal, validator_graffiti)
}
Expand All @@ -1793,21 +1803,20 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
randao_reveal: Signature,
validator_graffiti: Option<Graffiti>,
) -> Result<BeaconBlockAndState<T::EthSpec>, BlockProductionError> {
metrics::inc_counter(&metrics::BLOCK_PRODUCTION_REQUESTS);
let timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_TIMES);

let eth1_chain = self
.eth1_chain
.as_ref()
.ok_or(BlockProductionError::NoEth1ChainConnection)?;

let slot_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_SLOT_PROCESS_TIMES);
// If required, transition the new state to the present slot.
//
// Note: supplying some `state_root` when it is known would be a cheap and easy
// optimization.
while state.slot < produce_at_slot {
per_slot_processing(&mut state, None, &self.spec)?;
}
drop(slot_timer);

state.build_committee_cache(RelativeEpoch::Current, &self.spec)?;

Expand Down Expand Up @@ -1844,6 +1853,8 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
let unagg_import_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_UNAGGREGATED_TIMES);
for attestation in self.naive_aggregation_pool.read().iter() {
if let Err(e) = self.op_pool.insert_attestation(
attestation.clone(),
Expand All @@ -1859,13 +1870,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
);
}
}
drop(unagg_import_timer);

// Override the beacon node's graffiti with graffiti from the validator, if present.
let graffiti = match validator_graffiti {
Some(graffiti) => graffiti,
None => self.graffiti,
};

let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);
let attestations = self
.op_pool
.get_attestations(&state, attestation_filter, &self.spec)
.map_err(BlockProductionError::OpPoolError)?
.into();
drop(attestation_packing_timer);

let mut block = SignedBeaconBlock {
message: BeaconBlock {
slot: state.slot,
Expand All @@ -1878,11 +1899,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
graffiti,
proposer_slashings: proposer_slashings.into(),
attester_slashings: attester_slashings.into(),
attestations: self
.op_pool
.get_attestations(&state, attestation_filter, &self.spec)
.map_err(BlockProductionError::OpPoolError)?
.into(),
attestations,
deposits,
voluntary_exits: self.op_pool.get_voluntary_exits(&state, &self.spec).into(),
},
Expand All @@ -1891,20 +1908,23 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
signature: Signature::empty(),
};

let process_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_PROCESS_TIMES);
per_block_processing(
&mut state,
&block,
None,
BlockSignatureStrategy::NoVerification,
&self.spec,
)?;
drop(process_timer);

let state_root_timer = metrics::start_timer(&metrics::BLOCK_PRODUCTION_STATE_ROOT_TIMES);
let state_root = state.update_tree_hash_cache()?;
drop(state_root_timer);

block.message.state_root = state_root;

metrics::inc_counter(&metrics::BLOCK_PRODUCTION_SUCCESSES);
metrics::stop_timer(timer);

trace!(
self.log,
Expand Down Expand Up @@ -1950,7 +1970,9 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let new_head = self
.snapshot_cache
.try_read_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.and_then(|snapshot_cache| snapshot_cache.get_cloned(beacon_block_root))
.and_then(|snapshot_cache| {
snapshot_cache.get_cloned(beacon_block_root, CloneConfig::committee_caches_only())
})
.map::<Result<_, Error>, _>(Ok)
.unwrap_or_else(|| {
let beacon_block = self
Expand Down
6 changes: 3 additions & 3 deletions beacon_node/beacon_chain/src/beacon_snapshot.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use serde_derive::Serialize;
use ssz_derive::{Decode, Encode};
use types::{BeaconState, EthSpec, Hash256, SignedBeaconBlock};
use types::{beacon_state::CloneConfig, BeaconState, EthSpec, Hash256, SignedBeaconBlock};

/// Represents some block and its associated state. Generally, this will be used for tracking the
/// head, justified head and finalized head.
Expand Down Expand Up @@ -42,11 +42,11 @@ impl<E: EthSpec> BeaconSnapshot<E> {
self.beacon_state_root = beacon_state_root;
}

pub fn clone_with_only_committee_caches(&self) -> Self {
pub fn clone_with(&self, clone_config: CloneConfig) -> Self {
Self {
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
beacon_state: self.beacon_state.clone_with_only_committee_caches(),
beacon_state: self.beacon_state.clone_with(clone_config),
beacon_state_root: self.beacon_state_root,
}
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ easy_from_to!(ForkChoiceStoreError, BeaconChainError);

#[derive(Debug)]
pub enum BlockProductionError {
UnableToGetHeadInfo(BeaconChainError),
UnableToGetBlockRootFromState,
UnableToReadSlot,
UnableToProduceAtSlot(Slot),
Expand Down
24 changes: 24 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,30 @@ lazy_static! {
);
pub static ref BLOCK_PRODUCTION_TIMES: Result<Histogram> =
try_create_histogram("beacon_block_production_seconds", "Full runtime of block production");
pub static ref BLOCK_PRODUCTION_STATE_LOAD_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_state_load_seconds",
"Time taken to load the base state for block production"
);
pub static ref BLOCK_PRODUCTION_SLOT_PROCESS_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_slot_process_seconds",
"Time taken to advance the state to the block production slot"
);
pub static ref BLOCK_PRODUCTION_UNAGGREGATED_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_unaggregated_seconds",
"Time taken to import the naive aggregation pool for block production"
);
pub static ref BLOCK_PRODUCTION_ATTESTATION_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_attestation_seconds",
"Time taken to pack attestations into a block"
);
pub static ref BLOCK_PRODUCTION_PROCESS_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_process_seconds",
"Time taken to process the block produced"
);
pub static ref BLOCK_PRODUCTION_STATE_ROOT_TIMES: Result<Histogram> = try_create_histogram(
"beacon_block_production_state_root_seconds",
"Time taken to calculate the block's state root"
);

/*
* Block Statistics
Expand Down
19 changes: 12 additions & 7 deletions beacon_node/beacon_chain/src/snapshot_cache.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use crate::BeaconSnapshot;
use std::cmp;
use types::{Epoch, EthSpec, Hash256};
use types::{beacon_state::CloneConfig, Epoch, EthSpec, Hash256};

/// The default size of the cache.
pub const DEFAULT_SNAPSHOT_CACHE_SIZE: usize = 4;
Expand Down Expand Up @@ -69,13 +69,16 @@ impl<T: EthSpec> SnapshotCache<T> {
.map(|i| self.snapshots.remove(i))
}

/// If there is a snapshot with `block_root`, clone it (with only the committee caches) and
/// return the clone.
pub fn get_cloned(&self, block_root: Hash256) -> Option<BeaconSnapshot<T>> {
/// If there is a snapshot with `block_root`, clone it and return the clone.
pub fn get_cloned(
&self,
block_root: Hash256,
clone_config: CloneConfig,
) -> Option<BeaconSnapshot<T>> {
self.snapshots
.iter()
.find(|snapshot| snapshot.beacon_block_root == block_root)
.map(|snapshot| snapshot.clone_with_only_committee_caches())
.map(|snapshot| snapshot.clone_with(clone_config))
}

/// Removes all snapshots from the queue that are less than or equal to the finalized epoch.
Expand Down Expand Up @@ -165,11 +168,13 @@ mod test {
cache.try_remove(Hash256::from_low_u64_be(1)).is_none(),
"the snapshot with the lowest slot should have been removed during the insert function"
);
assert!(cache.get_cloned(Hash256::from_low_u64_be(1)).is_none());
assert!(cache
.get_cloned(Hash256::from_low_u64_be(1), CloneConfig::none())
.is_none());

assert!(
cache
.get_cloned(Hash256::from_low_u64_be(0))
.get_cloned(Hash256::from_low_u64_be(0), CloneConfig::none())
.expect("the head should still be in the cache")
.beacon_block_root
== Hash256::from_low_u64_be(0),
Expand Down