Single-pass epoch processing #4483

Merged
merged 15 commits on Jul 18, 2023
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/beacon_chain.rs
@@ -4299,6 +4299,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
let attestation_packing_timer =
metrics::start_timer(&metrics::BLOCK_PRODUCTION_ATTESTATION_TIMES);

state.build_total_active_balance_cache_at(state.current_epoch(), &self.spec)?;
let mut prev_filter_cache = HashMap::new();
let prev_attestation_filter = |att: &AttestationRef<T::EthSpec>| {
self.filter_op_pool_attestation(&mut prev_filter_cache, att, &state)
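Note: block production now primes the total active balance cache before the op-pool attestation filters run, presumably so the per-attestation filter closures don't each pay for a lazy rebuild. A minimal, self-contained sketch of that build-once, reuse-in-closure pattern (toy types, not Lighthouse's `BeaconState`):

```rust
use std::collections::HashMap;

// Toy stand-in for a state with a lazily built aggregate cache.
struct State {
    balances: Vec<u64>,
    total_active_balance: Option<u64>,
}

impl State {
    // Analogous to `build_total_active_balance_cache_at`: build once, up front.
    fn build_total_active_balance_cache(&mut self) {
        if self.total_active_balance.is_none() {
            self.total_active_balance = Some(self.balances.iter().sum());
        }
    }
}

fn main() {
    let mut state = State {
        balances: vec![32, 32, 31],
        total_active_balance: None,
    };
    state.build_total_active_balance_cache();

    // The filter closure reads the cached value without touching `state` again.
    let total = state.total_active_balance.expect("cache was just built");
    let mut filter_cache: HashMap<u64, bool> = HashMap::new();
    let mut filter = |attestation_epoch: u64| -> bool {
        *filter_cache
            .entry(attestation_epoch)
            .or_insert_with(|| total >= 64)
    };

    assert!(filter(0) && filter(1));
}
```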
11 changes: 10 additions & 1 deletion beacon_node/beacon_chain/src/block_verification.rs
@@ -78,7 +78,7 @@ use state_processing::{
block_signature_verifier::{BlockSignatureVerifier, Error as BlockSignatureVerifierError},
per_block_processing, per_slot_processing,
state_advance::partial_state_advance,
BlockProcessingError, BlockSignatureStrategy, ConsensusContext, SlotProcessingError,
AllCaches, BlockProcessingError, BlockSignatureStrategy, ConsensusContext, SlotProcessingError,
StateProcessingStrategy, VerifyBlockRoot,
};
use std::borrow::Cow;
@@ -1712,6 +1712,15 @@ fn load_parent<T: BeaconChainTypes>(
BeaconChainError::DBInconsistent(format!("Missing state {:?}", parent_state_root))
})?;

if !state.all_caches_built() {
slog::warn!(
chain.log,
"Parent state lacks built caches";
"block_slot" => block.slot(),
"state_slot" => state.slot(),
);
}

if block.slot() != state.slot() {
slog::warn!(
chain.log,
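Note: `load_parent` now checks whether the parent state came back from the store with its caches already built, and only warns if not. The `all_caches_built` / `build_all_caches` methods used here and in the renamed call sites below come from the `AllCaches` extension trait in `state_processing`; its definition is not shown in this diff, so the sketch below is only a guess at its shape, on toy types:

```rust
#[derive(Debug)]
struct EpochCacheError;
struct ChainSpec;

// Assumed shape of an `AllCaches`-style trait; the real one may differ.
trait AllCaches {
    fn build_all_caches(&mut self, spec: &ChainSpec) -> Result<(), EpochCacheError>;
    fn all_caches_built(&self) -> bool;
}

// Toy state with two of the caches a real `BeaconState` maintains.
struct State {
    committee_cache: Option<Vec<usize>>,
    total_active_balance: Option<u64>,
}

impl AllCaches for State {
    fn build_all_caches(&mut self, _spec: &ChainSpec) -> Result<(), EpochCacheError> {
        self.committee_cache.get_or_insert_with(|| vec![0, 1, 2]);
        self.total_active_balance.get_or_insert(96);
        Ok(())
    }
    fn all_caches_built(&self) -> bool {
        self.committee_cache.is_some() && self.total_active_balance.is_some()
    }
}

fn main() -> Result<(), EpochCacheError> {
    let mut parent = State { committee_cache: None, total_active_balance: None };
    if !parent.all_caches_built() {
        // The real code logs via `slog::warn!` and does not rebuild here.
        eprintln!("WARN: parent state lacks built caches");
    }
    parent.build_all_caches(&ChainSpec)?;
    assert!(parent.all_caches_built());
    Ok(())
}
```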
3 changes: 2 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
@@ -24,6 +24,7 @@ use proto_array::{DisallowedReOrgOffsets, ReOrgThreshold};
use slasher::Slasher;
use slog::{crit, error, info, Logger};
use slot_clock::{SlotClock, TestingSlotClock};
use state_processing::AllCaches;
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
@@ -447,7 +448,7 @@ where
// Prime all caches before storing the state in the database and computing the tree hash
// root.
weak_subj_state
.build_caches(&self.spec)
.build_all_caches(&self.spec)
.map_err(|e| format!("Error building caches on checkpoint state: {e:?}"))?;

let computed_state_root = weak_subj_state
6 changes: 5 additions & 1 deletion beacon_node/beacon_chain/src/canonical_head.rs
@@ -50,6 +50,7 @@ use itertools::process_results;
use parking_lot::{Mutex, RwLock, RwLockReadGuard, RwLockWriteGuard};
use slog::{crit, debug, error, warn, Logger};
use slot_clock::SlotClock;
use state_processing::AllCaches;
use std::sync::Arc;
use std::time::Duration;
use store::{iter::StateRootsIterator, KeyValueStoreOp, StoreItem};
@@ -666,7 +667,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

// Regardless of where we got the state from, attempt to build all the
// caches except the tree hash cache.
new_snapshot.beacon_state.build_caches(&self.spec)?;
new_snapshot
.beacon_state
.build_all_caches(&self.spec)
.map_err(Error::HeadCacheError)?;

let new_cached_head = CachedHead {
snapshot: Arc::new(new_snapshot),
1 change: 1 addition & 0 deletions beacon_node/beacon_chain/src/errors.rs
@@ -53,6 +53,7 @@ pub enum BeaconChainError {
SlotClockDidNotStart,
NoStateForSlot(Slot),
BeaconStateError(BeaconStateError),
HeadCacheError(EpochCacheError),
DBInconsistent(String),
DBError(store::Error),
ForkChoiceError(ForkChoiceError),
9 changes: 3 additions & 6 deletions beacon_node/http_api/src/validator_inclusion.rs
@@ -4,11 +4,8 @@ use eth2::{
lighthouse::{GlobalValidatorInclusionData, ValidatorInclusionData},
types::ValidatorId,
};
use state_processing::per_epoch_processing::{
altair::participation_cache::Error as ParticipationCacheError, process_epoch,
EpochProcessingSummary,
};
use types::{BeaconState, ChainSpec, Epoch, EthSpec};
use state_processing::per_epoch_processing::{process_epoch, EpochProcessingSummary};
use types::{BeaconState, BeaconStateError, ChainSpec, Epoch, EthSpec};

/// Returns the state in the last slot of `epoch`.
fn end_of_epoch_state<T: BeaconChainTypes>(
@@ -35,7 +32,7 @@ fn get_epoch_processing_summary<T: EthSpec>(
.map_err(|e| warp_utils::reject::custom_server_error(format!("{:?}", e)))
}

fn convert_cache_error(error: ParticipationCacheError) -> warp::reject::Rejection {
fn convert_cache_error(error: BeaconStateError) -> warp::reject::Rejection {
warp_utils::reject::custom_server_error(format!("{:?}", error))
}

9 changes: 8 additions & 1 deletion beacon_node/store/src/errors.rs
@@ -3,7 +3,7 @@ use crate::hdiff;
use crate::hot_cold_store::HotColdDBError;
use ssz::DecodeError;
use state_processing::BlockReplayError;
use types::{milhouse, BeaconStateError, Epoch, Hash256, InconsistentFork, Slot};
use types::{milhouse, BeaconStateError, Epoch, EpochCacheError, Hash256, InconsistentFork, Slot};

pub type Result<T> = std::result::Result<T, Error>;

@@ -71,6 +71,7 @@ pub enum Error {
Hdiff(hdiff::Error),
InconsistentFork(InconsistentFork),
ZeroCacheSize,
CacheBuildError(EpochCacheError),
}

pub trait HandleUnavailable<T> {
@@ -141,6 +142,12 @@ impl From<InconsistentFork> for Error {
}
}

impl From<EpochCacheError> for Error {
fn from(e: EpochCacheError) -> Error {
Error::CacheBuildError(e)
}
}

#[derive(Debug)]
pub struct DBError {
pub message: String,
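Note: two different conversions feed the new cache-error variants. `canonical_head.rs` maps the error explicitly with `.map_err(Error::HeadCacheError)`, while the store error gets a `From<EpochCacheError>` impl (above) so a plain `?` converts it. A minimal sketch of both idioms on stand-in types:

```rust
// Stand-in for the real `EpochCacheError`.
#[derive(Debug)]
struct EpochCacheError;

#[derive(Debug)]
enum BeaconChainError {
    HeadCacheError(EpochCacheError),
}

#[derive(Debug)]
enum StoreError {
    CacheBuildError(EpochCacheError),
}

// With a `From` impl, `?` converts the error automatically.
impl From<EpochCacheError> for StoreError {
    fn from(e: EpochCacheError) -> StoreError {
        StoreError::CacheBuildError(e)
    }
}

fn build_all_caches() -> Result<(), EpochCacheError> {
    Err(EpochCacheError)
}

fn update_head() -> Result<(), BeaconChainError> {
    // No `From` impl here, so the variant constructor is passed to `map_err`.
    build_all_caches().map_err(BeaconChainError::HeadCacheError)?;
    Ok(())
}

fn load_state() -> Result<(), StoreError> {
    build_all_caches()?; // uses the `From` impl above
    Ok(())
}

fn main() {
    println!("{:?}", update_head());
    println!("{:?}", load_state());
}
```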
11 changes: 6 additions & 5 deletions beacon_node/store/src/hot_cold_store.rs
@@ -30,7 +30,8 @@ use slog::{debug, error, info, warn, Logger};
use ssz::{Decode, Encode};
use ssz_derive::{Decode, Encode};
use state_processing::{
block_replayer::PreSlotHook, BlockProcessingError, BlockReplayer, SlotProcessingError,
block_replayer::PreSlotHook, AllCaches, BlockProcessingError, BlockReplayer,
SlotProcessingError,
};
use std::cmp::min;
use std::collections::VecDeque;
@@ -1152,7 +1153,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let state_cacher_hook = |opt_state_root: Option<Hash256>, state: &mut BeaconState<_>| {
// Ensure all caches are built before attempting to cache.
state.update_tree_hash_cache()?;
state.build_caches(&self.spec)?;
state.build_all_caches(&self.spec)?;

if let Some(state_root) = opt_state_root {
// Cache
@@ -1246,7 +1247,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
)?;

state.update_tree_hash_cache()?;
state.build_caches(&self.spec)?;
state.build_all_caches(&self.spec)?;
}

// Apply state diff. Block replay should have ensured that the diff is now applicable.
@@ -1280,7 +1281,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>
let tree_hash_ms = t.elapsed().as_millis();

let t = std::time::Instant::now();
state.build_caches(&self.spec)?;
state.build_all_caches(&self.spec)?;
let cache_ms = t.elapsed().as_millis();

debug!(
@@ -1358,7 +1359,7 @@ impl<E: EthSpec, Hot: ItemStore<E>, Cold: ItemStore<E>> HotColdDB<E, Hot, Cold>

// Do a tree hash here so that the cache is fully built.
state.update_tree_hash_cache()?;
state.build_caches(&self.spec)?;
state.build_all_caches(&self.spec)?;

let latest_block_root = state.get_latest_block_root(*state_root);
Ok((state, latest_block_root))
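Note: one of the state-loading paths above times the tree-hash rebuild and the cache build separately before logging both durations. A self-contained sketch of that instrumentation pattern (placeholder functions and `println!` standing in for the real state methods and slog's `debug!`):

```rust
use std::time::Instant;

fn rebuild_tree_hash() { /* placeholder for state.update_tree_hash_cache() */ }
fn build_all_caches() { /* placeholder for state.build_all_caches(&spec) */ }

fn main() {
    let t = Instant::now();
    rebuild_tree_hash();
    let tree_hash_ms = t.elapsed().as_millis();

    let t = Instant::now();
    build_all_caches();
    let cache_ms = t.elapsed().as_millis();

    // The real code emits these as structured fields on a debug! log line.
    println!("Loaded state: tree_hash_ms = {tree_hash_ms}, cache_ms = {cache_ms}");
}
```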