Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions beacon_node/beacon_chain/src/attestation_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
let attestation_root = attestation.tree_hash_root();
if chain
.observed_attestations
.write()
.is_known(attestation, attestation_root)
.map_err(|e| Error::BeaconChainError(e.into()))?
{
Expand All @@ -329,6 +330,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
// Note: do not observe yet, only observe once the attestation has been verfied.
match chain
.observed_aggregators
.read()
.validator_has_been_observed(attestation, aggregator_index as usize)
{
Ok(true) => Err(Error::AggregatorAlreadyKnown(aggregator_index)),
Expand Down Expand Up @@ -400,6 +402,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
// attestations processed at the same time could be published.
if let ObserveOutcome::AlreadyKnown = chain
.observed_attestations
.write()
.observe_attestation(attestation, Some(attestation_root))
.map_err(|e| Error::BeaconChainError(e.into()))?
{
Expand All @@ -412,6 +415,7 @@ impl<T: BeaconChainTypes> VerifiedAggregatedAttestation<T> {
// attestations processed at the same time could be published.
if chain
.observed_aggregators
.write()
.observe_validator(&attestation, aggregator_index as usize)
.map_err(BeaconChainError::from)?
{
Expand Down Expand Up @@ -518,6 +522,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
*/
if chain
.observed_attesters
.read()
.validator_has_been_observed(&attestation, validator_index as usize)
.map_err(BeaconChainError::from)?
{
Expand All @@ -538,6 +543,7 @@ impl<T: BeaconChainTypes> VerifiedUnaggregatedAttestation<T> {
// process them in different threads.
if chain
.observed_attesters
.write()
.observe_validator(&attestation, validator_index as usize)
.map_err(BeaconChainError::from)?
{
Expand Down
37 changes: 22 additions & 15 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ use fork_choice::ForkChoice;
use futures::channel::mpsc::Sender;
use itertools::process_results;
use operation_pool::{OperationPool, PersistedOperationPool};
use parking_lot::RwLock;
use parking_lot::{Mutex, RwLock};
use slog::{crit, debug, error, info, trace, warn, Logger};
use slot_clock::SlotClock;
use state_processing::{
Expand Down Expand Up @@ -181,20 +181,21 @@ pub struct BeaconChain<T: BeaconChainTypes> {
/// a method to get an aggregated `Attestation` for some `AttestationData`.
pub naive_aggregation_pool: RwLock<NaiveAggregationPool<T::EthSpec>>,
/// Contains a store of attestations which have been observed by the beacon chain.
pub observed_attestations: ObservedAttestations<T::EthSpec>,
pub(crate) observed_attestations: RwLock<ObservedAttestations<T::EthSpec>>,
/// Maintains a record of which validators have been seen to attest in recent epochs.
pub observed_attesters: ObservedAttesters<T::EthSpec>,
pub(crate) observed_attesters: RwLock<ObservedAttesters<T::EthSpec>>,
/// Maintains a record of which validators have been seen to create `SignedAggregateAndProofs`
/// in recent epochs.
pub observed_aggregators: ObservedAggregators<T::EthSpec>,
pub(crate) observed_aggregators: RwLock<ObservedAggregators<T::EthSpec>>,
/// Maintains a record of which validators have proposed blocks for each slot.
pub observed_block_producers: ObservedBlockProducers<T::EthSpec>,
pub(crate) observed_block_producers: RwLock<ObservedBlockProducers<T::EthSpec>>,
/// Maintains a record of which validators have submitted voluntary exits.
pub observed_voluntary_exits: ObservedOperations<SignedVoluntaryExit, T::EthSpec>,
pub(crate) observed_voluntary_exits: Mutex<ObservedOperations<SignedVoluntaryExit, T::EthSpec>>,
/// Maintains a record of which validators we've seen proposer slashings for.
pub observed_proposer_slashings: ObservedOperations<ProposerSlashing, T::EthSpec>,
pub(crate) observed_proposer_slashings: Mutex<ObservedOperations<ProposerSlashing, T::EthSpec>>,
/// Maintains a record of which validators we've seen attester slashings for.
pub observed_attester_slashings: ObservedOperations<AttesterSlashing<T::EthSpec>, T::EthSpec>,
pub(crate) observed_attester_slashings:
Mutex<ObservedOperations<AttesterSlashing<T::EthSpec>, T::EthSpec>>,
/// Provides information from the Ethereum 1 (PoW) chain.
pub eth1_chain: Option<Eth1Chain<T::Eth1Chain, T::EthSpec>>,
/// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received.
Expand Down Expand Up @@ -1158,9 +1159,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
) -> Result<ObservationOutcome<SignedVoluntaryExit>, Error> {
// NOTE: this could be more efficient if it avoided cloning the head state
let wall_clock_state = self.wall_clock_state()?;
Ok(self
.observed_voluntary_exits
.verify_and_observe(exit, &wall_clock_state, &self.spec)?)
Ok(self.observed_voluntary_exits.lock().verify_and_observe(
exit,
&wall_clock_state,
&self.spec,
)?)
}

/// Accept a pre-verified exit and queue it for inclusion in an appropriate block.
Expand All @@ -1176,7 +1179,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
proposer_slashing: ProposerSlashing,
) -> Result<ObservationOutcome<ProposerSlashing>, Error> {
let wall_clock_state = self.wall_clock_state()?;
Ok(self.observed_proposer_slashings.verify_and_observe(
Ok(self.observed_proposer_slashings.lock().verify_and_observe(
proposer_slashing,
&wall_clock_state,
&self.spec,
Expand All @@ -1196,7 +1199,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
attester_slashing: AttesterSlashing<T::EthSpec>,
) -> Result<ObservationOutcome<AttesterSlashing<T::EthSpec>>, Error> {
let wall_clock_state = self.wall_clock_state()?;
Ok(self.observed_attester_slashings.verify_and_observe(
Ok(self.observed_attester_slashings.lock().verify_and_observe(
attester_slashing,
&wall_clock_state,
&self.spec,
Expand Down Expand Up @@ -1506,7 +1509,11 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
// Iterate through the attestations in the block and register them as an "observed
// attestation". This will stop us from propagating them on the gossip network.
for a in &signed_block.message.body.attestations {
match self.observed_attestations.observe_attestation(a, None) {
match self
.observed_attestations
.write()
.observe_attestation(a, None)
{
// If the observation was successful or if the slot for the attestation was too
// low, continue.
//
Expand Down Expand Up @@ -2091,7 +2098,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
self.fork_choice.write().prune()?;
let new_finalized_checkpoint = head_state.finalized_checkpoint;

self.observed_block_producers.prune(
self.observed_block_producers.write().prune(
new_finalized_checkpoint
.epoch
.start_slot(T::EthSpec::slots_per_epoch()),
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/block_verification.rs
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// Check that we have not already received a block with a valid signature for this slot.
if chain
.observed_block_producers
.read()
.proposer_has_been_observed(&block.message)
.map_err(|e| BlockError::BeaconChainError(e.into()))?
{
Expand Down Expand Up @@ -472,6 +473,7 @@ impl<T: BeaconChainTypes> GossipVerifiedBlock<T> {
// have a race-condition when verifying two blocks simultaneously.
if chain
.observed_block_producers
.write()
.observe_proposer(&block.message)
.map_err(|e| BlockError::BeaconChainError(e.into()))?
{
Expand Down
2 changes: 2 additions & 0 deletions beacon_node/beacon_chain/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,13 +419,15 @@ fn scrape_attestation_observation<T: BeaconChainTypes>(slot_now: Slot, chain: &B

if let Some(count) = chain
.observed_attesters
.read()
.observed_validator_count(prev_epoch)
{
set_gauge_by_usize(&ATTN_OBSERVATION_PREV_EPOCH_ATTESTERS, count);
}

if let Some(count) = chain
.observed_aggregators
.read()
.observed_validator_count(prev_epoch)
{
set_gauge_by_usize(&ATTN_OBSERVATION_PREV_EPOCH_AGGREGATORS, count);
Expand Down
Loading