Advance state to next slot after importing block (#2174)
## Issue Addressed

NA

## Proposed Changes

Add an optimization that moves `per_slot_processing` from the *leading edge* of block processing to the *trailing edge*. Ultimately, this allows us to import the block at slot `n` faster, because the tail end of slot `n - 1` has already been used to perform `per_slot_processing`.
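
To illustrate the shift, here is a minimal conceptual sketch. The `State`, `process_slot`, and function names below are illustrative stand-ins, not Lighthouse's actual types or APIs:

```rust
/// Illustrative stand-in for `BeaconState`.
struct State {
    slot: u64,
}

/// Illustrative stand-in for `per_slot_processing`: advances the state by one slot.
fn process_slot(state: &mut State) {
    state.slot += 1;
}

/// Leading-edge: slot transitions run while a block is waiting to be imported,
/// adding latency to block processing.
fn import_block_leading_edge(pre_state: &mut State, block_slot: u64) {
    while pre_state.slot < block_slot {
        process_slot(pre_state);
    }
    // ...per-block processing for the block at `block_slot` happens here...
}

/// Trailing-edge: after importing the block at slot `n - 1`, use the idle tail of
/// that slot to advance the head state to slot `n`, so the next block imports faster.
fn after_import_trailing_edge(post_state: &mut State, next_slot: u64) {
    while post_state.slot < next_slot {
        process_slot(post_state);
    }
}
```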

Additionally, add a "block proposer cache" which allows us to cache the block proposers for a given epoch. Since we're now doing trailing-edge `per_slot_processing`, we can prime this cache with the values for the next epoch before the blocks for that epoch arrive (assuming those blocks don't involve some weird forking).

There were several ancillary changes required to achieve this: 

- Remove the `state_root` field of `BeaconSnapshot`, since there's no need to know it on a `pre_state` and in all other cases we can just read it from `block.state_root()`.
    - This caused some "dust" changes from `snapshot.beacon_state_root` to `snapshot.beacon_state_root()`, where the `BeaconSnapshot::beacon_state_root()` function just reads the state root from the block.
- Rename `types::ShufflingId` to `AttestationShufflingId`. I originally did this because I added a `ProposerShufflingId` struct which turned out to be not so useful. I thought the new name was more descriptive, so I kept it.
- Address ethereum/consensus-specs#2196
- Add a debug log when we get a block with an unknown parent. There was previously no logging around this case.
- Add a function to `BeaconState` to compute all proposers for an epoch without re-computing the active indices for each slot (a rough sketch of the idea is included after this list).
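
As a rough sketch of the "compute all proposers for an epoch" idea (the names below are illustrative; the real method lives on `BeaconState` and uses the spec-defined seed and proposer-index machinery):

```rust
/// Computes the proposer index for every slot in an epoch, re-using a single set of
/// active validator indices rather than re-computing them per slot.
///
/// `proposer_for` stands in for the per-slot seed + proposer-index computation.
fn proposers_for_epoch(
    active_indices: &[usize],
    epoch_start_slot: u64,
    slots_per_epoch: u64,
    proposer_for: impl Fn(&[usize], u64) -> usize,
) -> Vec<usize> {
    (epoch_start_slot..epoch_start_slot + slots_per_epoch)
        .map(|slot| proposer_for(active_indices, slot))
        .collect()
}
```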

## Additional Info

- ~~Blocked on #2173~~
- ~~Blocked on #2179~~ That PR was wrapped into this PR.
- There are potentially some places where we could avoid computing the proposer indices in `per_block_processing`, but I haven't done that here. That would be an optimization beyond the issue at hand (improving block propagation times), and I think this PR is already doing enough. We can come back to it later.

## TODO

- [x] Tidy, improve comments.
- [x] ~~Try to avoid computing the proposer index in `per_block_processing`?~~
paulhauner committed Feb 15, 2021
1 parent 8e5c20b commit 71864d2
Showing 25 changed files with 1,067 additions and 136 deletions.
50 changes: 30 additions & 20 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -2,6 +2,7 @@ use crate::attestation_verification::{
Error as AttestationError, SignatureVerifiedAttestation, VerifiedAggregatedAttestation,
VerifiedUnaggregatedAttestation,
};
use crate::beacon_proposer_cache::BeaconProposerCache;
use crate::block_verification::{
check_block_is_finalized_descendant, check_block_relevancy, get_block_root,
signature_verify_chain_segment, BlockError, FullyVerifiedBlock, GossipVerifiedBlock,
@@ -24,7 +25,7 @@ use crate::shuffling_cache::{BlockShufflingIds, ShufflingCache};
use crate::snapshot_cache::SnapshotCache;
use crate::timeout_rw_lock::TimeoutRwLock;
use crate::validator_monitor::{
get_block_delay_ms, timestamp_now, ValidatorMonitor,
get_block_delay_ms, get_slot_delay_ms, timestamp_now, ValidatorMonitor,
HISTORIC_EPOCHS as VALIDATOR_MONITOR_HISTORIC_EPOCHS,
};
use crate::validator_pubkey_cache::ValidatorPubkeyCache;
@@ -231,8 +232,10 @@ pub struct BeaconChain<T: BeaconChainTypes> {
pub(crate) head_tracker: Arc<HeadTracker>,
/// A cache dedicated to block processing.
pub(crate) snapshot_cache: TimeoutRwLock<SnapshotCache<T::EthSpec>>,
/// Caches the shuffling for a given epoch and state root.
/// Caches the attester shuffling for a given epoch and shuffling key root.
pub(crate) shuffling_cache: TimeoutRwLock<ShufflingCache>,
/// Caches the beacon block proposer shuffling for a given epoch and shuffling key root.
pub(crate) beacon_proposer_cache: Mutex<BeaconProposerCache>,
/// Caches a map of `validator_index -> validator_pubkey`.
pub(crate) validator_pubkey_cache: TimeoutRwLock<ValidatorPubkeyCache>,
/// A list of any hard-coded forks that have been disabled.
@@ -453,9 +456,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
) -> Result<impl Iterator<Item = Result<(Hash256, Slot), Error>>, Error> {
let head = self.head()?;
let slot = head.beacon_state.slot;
let head_slot = head.beacon_state.slot;
let head_state_root = head.beacon_state_root();
let iter = StateRootsIterator::owned(self.store.clone(), head.beacon_state);
let iter = std::iter::once(Ok((head.beacon_state_root, slot)))
let iter = std::iter::once(Ok((head_state_root, head_slot)))
.chain(iter)
.map(|result| result.map_err(Into::into));
Ok(iter)
@@ -599,7 +603,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
Ok(HeadInfo {
slot: head.beacon_block.slot(),
block_root: head.beacon_block_root,
state_root: head.beacon_state_root,
state_root: head.beacon_state_root(),
current_justified_checkpoint: head.beacon_state.current_justified_checkpoint,
finalized_checkpoint: head.beacon_state.finalized_checkpoint,
fork: head.beacon_state.fork,
@@ -1549,7 +1553,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// For the current and next epoch of this state, ensure we have the shuffling from this
// block in our cache.
for relative_epoch in &[RelativeEpoch::Current, RelativeEpoch::Next] {
let shuffling_id = ShufflingId::new(block_root, &state, *relative_epoch)?;
let shuffling_id = AttestationShufflingId::new(block_root, &state, *relative_epoch)?;

let shuffling_is_cached = self
.shuffling_cache
@@ -1727,27 +1731,30 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.ok_or(Error::SnapshotCacheLockTimeout)
.map(|mut snapshot_cache| {
snapshot_cache.insert(BeaconSnapshot {
beacon_state: state,
beacon_state_root: signed_block.state_root(),
beacon_block: signed_block,
beacon_block_root: block_root,
});
snapshot_cache.insert(
BeaconSnapshot {
beacon_state: state,
beacon_block: signed_block,
beacon_block_root: block_root,
},
None,
)
})
.unwrap_or_else(|| {
.unwrap_or_else(|e| {
error!(
self.log,
"Failed to obtain cache write lock";
"lock" => "snapshot_cache",
"Failed to insert snapshot";
"error" => ?e,
"task" => "process block"
);
});

self.head_tracker
.register_block(block_root, parent_root, slot);

// send an event to the `events` endpoint after fully processing the block
// Send an event to the `events` endpoint after fully processing the block.
if let Some(event_handler) = self.event_handler.as_ref() {
if event_handler.has_block_subscribers() {
event_handler.register(EventKind::Block(SseBlock {
@@ -2021,7 +2028,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
})
})
.and_then(|mut snapshot| {
@@ -2096,7 +2102,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let update_head_timer = metrics::start_timer(&metrics::UPDATE_HEAD_TIMES);

// These fields are used for server-sent events
let state_root = new_head.beacon_state_root;
let state_root = new_head.beacon_state_root();
let head_slot = new_head.beacon_state.slot;
let target_epoch_start_slot = new_head
.beacon_state
@@ -2116,6 +2122,12 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

metrics::stop_timer(update_head_timer);

// Observe the delay between the start of the slot and when we set the block as head.
metrics::observe_duration(
&metrics::BEACON_BLOCK_HEAD_SLOT_START_DELAY_TIME,
get_slot_delay_ms(timestamp_now(), head_slot, &self.slot_clock),
);

self.snapshot_cache
.try_write_for(BLOCK_PROCESSING_CACHE_LOCK_TIMEOUT)
.map(|mut snapshot_cache| {
@@ -2458,7 +2470,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block: self.head()?.beacon_block,
beacon_block_root: self.head()?.beacon_block_root,
beacon_state: self.head()?.beacon_state,
beacon_state_root: self.head()?.beacon_state_root,
};

dump.push(last_slot.clone());
@@ -2485,7 +2496,6 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
};

dump.push(slot.clone());
2 changes: 1 addition & 1 deletion beacon_node/beacon_chain/src/beacon_fork_choice_store.rs
@@ -210,7 +210,7 @@ where
let anchor_state = &anchor.beacon_state;
let mut anchor_block_header = anchor_state.latest_block_header.clone();
if anchor_block_header.state_root == Hash256::zero() {
anchor_block_header.state_root = anchor.beacon_state_root;
anchor_block_header.state_root = anchor.beacon_state_root();
}
let anchor_root = anchor_block_header.canonical_root();
let anchor_epoch = anchor_state.current_epoch();
113 changes: 113 additions & 0 deletions beacon_node/beacon_chain/src/beacon_proposer_cache.rs
@@ -0,0 +1,113 @@
//! The `BeaconProposer` cache stores the proposer indices for some epoch.
//!
//! This cache is keyed by `(epoch, block_root)` where `block_root` is the block root at
//! `end_slot(epoch - 1)`. We make the assertion that the proposer shuffling is identical for all
//! blocks in `epoch` which share the common ancestor of `block_root`.
//!
//! The cache is a fairly unintelligent LRU cache that is not pruned after finality. This makes it
//! very simple to reason about, but it might store values that are useless due to finalization. The
//! values it stores are very small, so this should not be an issue.

use lru::LruCache;
use smallvec::SmallVec;
use types::{BeaconStateError, Epoch, EthSpec, Fork, Hash256, Slot, Unsigned};

/// The number of sets of proposer indices that should be cached.
const CACHE_SIZE: usize = 16;

/// This value is fairly unimportant, it's used to avoid heap allocations. The result of it being
/// incorrect is non-substantial from a consensus perspective (and probably also from a
/// performance perspective).
const TYPICAL_SLOTS_PER_EPOCH: usize = 32;

/// For some given slot, this contains the proposer index (`index`) and the `fork` that should be
/// used to verify their signature.
pub struct Proposer {
pub index: usize,
pub fork: Fork,
}

/// The list of proposers for some given `epoch`, alongside the `fork` that should be used to verify
/// their signatures.
pub struct EpochBlockProposers {
/// The epoch to which the proposers pertain.
epoch: Epoch,
/// The fork that should be used to verify proposer signatures.
fork: Fork,
/// A list of length `T::EthSpec::slots_per_epoch()`, representing the proposers for each slot
/// in that epoch.
///
/// E.g., if `self.epoch == 1`, then `self.proposers[0]` contains the proposer for slot `32`.
proposers: SmallVec<[usize; TYPICAL_SLOTS_PER_EPOCH]>,
}

/// A cache to store the proposers for some epoch.
///
/// See the module-level documentation for more information.
pub struct BeaconProposerCache {
cache: LruCache<(Epoch, Hash256), EpochBlockProposers>,
}

impl Default for BeaconProposerCache {
fn default() -> Self {
Self {
cache: LruCache::new(CACHE_SIZE),
}
}
}

impl BeaconProposerCache {
/// If it is cached, returns the proposer for the block at `slot` where the block has the
/// ancestor block root of `shuffling_decision_block` at `end_slot(slot.epoch() - 1)`.
pub fn get<T: EthSpec>(
&mut self,
shuffling_decision_block: Hash256,
slot: Slot,
) -> Option<Proposer> {
let epoch = slot.epoch(T::slots_per_epoch());
let key = (epoch, shuffling_decision_block);
if let Some(cache) = self.cache.get(&key) {
// This `if` statement is likely unnecessary, but it feels like good practice.
if epoch == cache.epoch {
cache
.proposers
.get(slot.as_usize() % T::SlotsPerEpoch::to_usize())
.map(|&index| Proposer {
index,
fork: cache.fork,
})
} else {
None
}
} else {
None
}
}

/// Insert the proposers into the cache.
///
/// See `Self::get` for a description of `shuffling_decision_block`.
///
/// The `fork` value must be valid to verify proposer signatures in `epoch`.
pub fn insert(
&mut self,
epoch: Epoch,
shuffling_decision_block: Hash256,
proposers: Vec<usize>,
fork: Fork,
) -> Result<(), BeaconStateError> {
let key = (epoch, shuffling_decision_block);
if !self.cache.contains(&key) {
self.cache.put(
key,
EpochBlockProposers {
epoch,
fork,
proposers: proposers.into(),
},
);
}

Ok(())
}
}
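
For orientation, here is a minimal usage sketch of the new cache, based on the `insert`/`get` signatures above. It assumes `BeaconProposerCache` is in scope; the concrete values are placeholders and `MainnetEthSpec` is just one choice of `EthSpec`:

```rust
use types::{BeaconStateError, Epoch, Fork, Hash256, MainnetEthSpec, Slot};

fn prime_and_lookup() -> Result<(), BeaconStateError> {
    let mut cache = BeaconProposerCache::default();

    // The shuffling decision block: the block root at `end_slot(epoch - 1)`.
    let decision_root = Hash256::zero();
    let epoch = Epoch::new(1);
    let fork = Fork {
        previous_version: [0; 4],
        current_version: [0; 4],
        epoch: Epoch::new(0),
    };

    // Prime the cache with one proposer index per slot of `epoch` (placeholder values).
    let proposers: Vec<usize> = (0..32).collect();
    cache.insert(epoch, decision_root, proposers, fork)?;

    // Later, look up the proposer for a single slot without touching a `BeaconState`.
    let proposer = cache.get::<MainnetEthSpec>(decision_root, Slot::new(32));
    assert!(proposer.is_some());

    Ok(())
}
```
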
15 changes: 9 additions & 6 deletions beacon_node/beacon_chain/src/beacon_snapshot.rs
@@ -9,7 +9,6 @@ pub struct BeaconSnapshot<E: EthSpec> {
pub beacon_block: SignedBeaconBlock<E>,
pub beacon_block_root: Hash256,
pub beacon_state: BeaconState<E>,
pub beacon_state_root: Hash256,
}

impl<E: EthSpec> BeaconSnapshot<E> {
@@ -18,36 +17,40 @@ impl<E: EthSpec> BeaconSnapshot<E> {
beacon_block: SignedBeaconBlock<E>,
beacon_block_root: Hash256,
beacon_state: BeaconState<E>,
beacon_state_root: Hash256,
) -> Self {
Self {
beacon_block,
beacon_block_root,
beacon_state,
beacon_state_root,
}
}

/// Returns the state root from `self.beacon_block`.
///
/// ## Caution
///
/// It is not strictly enforced that `root(self.beacon_state) == self.beacon_state_root()`.
pub fn beacon_state_root(&self) -> Hash256 {
self.beacon_block.message.state_root
}

/// Update all fields of the checkpoint.
pub fn update(
&mut self,
beacon_block: SignedBeaconBlock<E>,
beacon_block_root: Hash256,
beacon_state: BeaconState<E>,
beacon_state_root: Hash256,
) {
self.beacon_block = beacon_block;
self.beacon_block_root = beacon_block_root;
self.beacon_state = beacon_state;
self.beacon_state_root = beacon_state_root;
}

pub fn clone_with(&self, clone_config: CloneConfig) -> Self {
Self {
beacon_block: self.beacon_block.clone(),
beacon_block_root: self.beacon_block_root,
beacon_state: self.beacon_state.clone_with(clone_config),
beacon_state_root: self.beacon_state_root,
}
}
}
