diff --git a/beacon_node/beacon_chain/src/attestation_verification.rs b/beacon_node/beacon_chain/src/attestation_verification.rs index 39436bcdb71..1b3c97e42b1 100644 --- a/beacon_node/beacon_chain/src/attestation_verification.rs +++ b/beacon_node/beacon_chain/src/attestation_verification.rs @@ -316,6 +316,7 @@ impl VerifiedAggregatedAttestation { let attestation_root = attestation.tree_hash_root(); if chain .observed_attestations + .write() .is_known(attestation, attestation_root) .map_err(|e| Error::BeaconChainError(e.into()))? { @@ -329,6 +330,7 @@ impl VerifiedAggregatedAttestation { // Note: do not observe yet, only observe once the attestation has been verfied. match chain .observed_aggregators + .read() .validator_has_been_observed(attestation, aggregator_index as usize) { Ok(true) => Err(Error::AggregatorAlreadyKnown(aggregator_index)), @@ -400,6 +402,7 @@ impl VerifiedAggregatedAttestation { // attestations processed at the same time could be published. if let ObserveOutcome::AlreadyKnown = chain .observed_attestations + .write() .observe_attestation(attestation, Some(attestation_root)) .map_err(|e| Error::BeaconChainError(e.into()))? { @@ -412,6 +415,7 @@ impl VerifiedAggregatedAttestation { // attestations processed at the same time could be published. if chain .observed_aggregators + .write() .observe_validator(&attestation, aggregator_index as usize) .map_err(BeaconChainError::from)? { @@ -518,6 +522,7 @@ impl VerifiedUnaggregatedAttestation { */ if chain .observed_attesters + .read() .validator_has_been_observed(&attestation, validator_index as usize) .map_err(BeaconChainError::from)? { @@ -538,6 +543,7 @@ impl VerifiedUnaggregatedAttestation { // process them in different threads. if chain .observed_attesters + .write() .observe_validator(&attestation, validator_index as usize) .map_err(BeaconChainError::from)? { diff --git a/beacon_node/beacon_chain/src/beacon_chain.rs b/beacon_node/beacon_chain/src/beacon_chain.rs index c97159d9a47..e1f86b74a6f 100644 --- a/beacon_node/beacon_chain/src/beacon_chain.rs +++ b/beacon_node/beacon_chain/src/beacon_chain.rs @@ -31,7 +31,7 @@ use fork_choice::ForkChoice; use futures::channel::mpsc::Sender; use itertools::process_results; use operation_pool::{OperationPool, PersistedOperationPool}; -use parking_lot::RwLock; +use parking_lot::{Mutex, RwLock}; use slog::{crit, debug, error, info, trace, warn, Logger}; use slot_clock::SlotClock; use state_processing::{ @@ -181,20 +181,21 @@ pub struct BeaconChain { /// a method to get an aggregated `Attestation` for some `AttestationData`. pub naive_aggregation_pool: RwLock>, /// Contains a store of attestations which have been observed by the beacon chain. - pub observed_attestations: ObservedAttestations, + pub(crate) observed_attestations: RwLock>, /// Maintains a record of which validators have been seen to attest in recent epochs. - pub observed_attesters: ObservedAttesters, + pub(crate) observed_attesters: RwLock>, /// Maintains a record of which validators have been seen to create `SignedAggregateAndProofs` /// in recent epochs. - pub observed_aggregators: ObservedAggregators, + pub(crate) observed_aggregators: RwLock>, /// Maintains a record of which validators have proposed blocks for each slot. - pub observed_block_producers: ObservedBlockProducers, + pub(crate) observed_block_producers: RwLock>, /// Maintains a record of which validators have submitted voluntary exits. 
- pub observed_voluntary_exits: ObservedOperations, + pub(crate) observed_voluntary_exits: Mutex>, /// Maintains a record of which validators we've seen proposer slashings for. - pub observed_proposer_slashings: ObservedOperations, + pub(crate) observed_proposer_slashings: Mutex>, /// Maintains a record of which validators we've seen attester slashings for. - pub observed_attester_slashings: ObservedOperations, T::EthSpec>, + pub(crate) observed_attester_slashings: + Mutex, T::EthSpec>>, /// Provides information from the Ethereum 1 (PoW) chain. pub eth1_chain: Option>, /// Stores a "snapshot" of the chain at the time the head-of-the-chain block was received. @@ -1158,9 +1159,11 @@ impl BeaconChain { ) -> Result, Error> { // NOTE: this could be more efficient if it avoided cloning the head state let wall_clock_state = self.wall_clock_state()?; - Ok(self - .observed_voluntary_exits - .verify_and_observe(exit, &wall_clock_state, &self.spec)?) + Ok(self.observed_voluntary_exits.lock().verify_and_observe( + exit, + &wall_clock_state, + &self.spec, + )?) } /// Accept a pre-verified exit and queue it for inclusion in an appropriate block. @@ -1176,7 +1179,7 @@ impl BeaconChain { proposer_slashing: ProposerSlashing, ) -> Result, Error> { let wall_clock_state = self.wall_clock_state()?; - Ok(self.observed_proposer_slashings.verify_and_observe( + Ok(self.observed_proposer_slashings.lock().verify_and_observe( proposer_slashing, &wall_clock_state, &self.spec, @@ -1196,7 +1199,7 @@ impl BeaconChain { attester_slashing: AttesterSlashing, ) -> Result>, Error> { let wall_clock_state = self.wall_clock_state()?; - Ok(self.observed_attester_slashings.verify_and_observe( + Ok(self.observed_attester_slashings.lock().verify_and_observe( attester_slashing, &wall_clock_state, &self.spec, @@ -1506,7 +1509,11 @@ impl BeaconChain { // Iterate through the attestations in the block and register them as an "observed // attestation". This will stop us from propagating them on the gossip network. for a in &signed_block.message.body.attestations { - match self.observed_attestations.observe_attestation(a, None) { + match self + .observed_attestations + .write() + .observe_attestation(a, None) + { // If the observation was successful or if the slot for the attestation was too // low, continue. // @@ -2091,7 +2098,7 @@ impl BeaconChain { self.fork_choice.write().prune()?; let new_finalized_checkpoint = head_state.finalized_checkpoint; - self.observed_block_producers.prune( + self.observed_block_producers.write().prune( new_finalized_checkpoint .epoch .start_slot(T::EthSpec::slots_per_epoch()), diff --git a/beacon_node/beacon_chain/src/block_verification.rs b/beacon_node/beacon_chain/src/block_verification.rs index aaa0425dd16..47a96c6806e 100644 --- a/beacon_node/beacon_chain/src/block_verification.rs +++ b/beacon_node/beacon_chain/src/block_verification.rs @@ -418,6 +418,7 @@ impl GossipVerifiedBlock { // Check that we have not already received a block with a valid signature for this slot. if chain .observed_block_producers + .read() .proposer_has_been_observed(&block.message) .map_err(|e| BlockError::BeaconChainError(e.into()))? { @@ -472,6 +473,7 @@ impl GossipVerifiedBlock { // have a race-condition when verifying two blocks simultaneously. if chain .observed_block_producers + .write() .observe_proposer(&block.message) .map_err(|e| BlockError::BeaconChainError(e.into()))? 
{ diff --git a/beacon_node/beacon_chain/src/metrics.rs b/beacon_node/beacon_chain/src/metrics.rs index 42fac5aa374..4f4a58960b9 100644 --- a/beacon_node/beacon_chain/src/metrics.rs +++ b/beacon_node/beacon_chain/src/metrics.rs @@ -419,6 +419,7 @@ fn scrape_attestation_observation(slot_now: Slot, chain: &B if let Some(count) = chain .observed_attesters + .read() .observed_validator_count(prev_epoch) { set_gauge_by_usize(&ATTN_OBSERVATION_PREV_EPOCH_ATTESTERS, count); @@ -426,6 +427,7 @@ fn scrape_attestation_observation(slot_now: Slot, chain: &B if let Some(count) = chain .observed_aggregators + .read() .observed_validator_count(prev_epoch) { set_gauge_by_usize(&ATTN_OBSERVATION_PREV_EPOCH_AGGREGATORS, count); diff --git a/beacon_node/beacon_chain/src/observed_attestations.rs b/beacon_node/beacon_chain/src/observed_attestations.rs index f74dd39ae51..0bea39ff862 100644 --- a/beacon_node/beacon_chain/src/observed_attestations.rs +++ b/beacon_node/beacon_chain/src/observed_attestations.rs @@ -1,7 +1,6 @@ //! Provides an `ObservedAttestations` struct which allows us to reject aggregated attestations if //! we've already seen the aggregated attestation. -use parking_lot::RwLock; use std::collections::HashSet; use std::marker::PhantomData; use tree_hash::TreeHash; @@ -116,16 +115,16 @@ impl SlotHashSet { /// Stores the roots of `Attestation` objects for some number of `Slots`, so we can determine if /// these have previously been seen on the network. pub struct ObservedAttestations { - lowest_permissible_slot: RwLock, - sets: RwLock>, + lowest_permissible_slot: Slot, + sets: Vec, _phantom: PhantomData, } impl Default for ObservedAttestations { fn default() -> Self { Self { - lowest_permissible_slot: RwLock::new(Slot::new(0)), - sets: RwLock::new(vec![]), + lowest_permissible_slot: Slot::new(0), + sets: vec![], _phantom: PhantomData, } } @@ -136,7 +135,7 @@ impl ObservedAttestations { /// /// `root` must equal `a.tree_hash_root()`. pub fn observe_attestation( - &self, + &mut self, a: &Attestation, root_opt: Option, ) -> Result { @@ -144,7 +143,6 @@ impl ObservedAttestations { let root = root_opt.unwrap_or_else(|| a.tree_hash_root()); self.sets - .write() .get_mut(index) .ok_or_else(|| Error::InvalidSetIndex(index)) .and_then(|set| set.observe_attestation(a, root)) @@ -153,11 +151,10 @@ impl ObservedAttestations { /// Check to see if the `root` of `a` is in self. /// /// `root` must equal `a.tree_hash_root()`. - pub fn is_known(&self, a: &Attestation, root: Hash256) -> Result { + pub fn is_known(&mut self, a: &Attestation, root: Hash256) -> Result { let index = self.get_set_index(a.data.slot)?; self.sets - .read() .get(index) .ok_or_else(|| Error::InvalidSetIndex(index)) .and_then(|set| set.is_known(a, root)) @@ -172,23 +169,21 @@ impl ObservedAttestations { /// Removes any attestations with a slot lower than `current_slot` and bars any future /// attestations with a slot lower than `current_slot - SLOTS_RETAINED`. - pub fn prune(&self, current_slot: Slot) { + pub fn prune(&mut self, current_slot: Slot) { // Taking advantage of saturating subtraction on `Slot`. let lowest_permissible_slot = current_slot - (self.max_capacity() - 1); - self.sets - .write() - .retain(|set| set.slot >= lowest_permissible_slot); + self.sets.retain(|set| set.slot >= lowest_permissible_slot); - *self.lowest_permissible_slot.write() = lowest_permissible_slot; + self.lowest_permissible_slot = lowest_permissible_slot; } /// Returns the index of `self.set` that matches `slot`. 
/// /// If there is no existing set for this slot one will be created. If `self.sets.len() >= /// Self::max_capacity()`, the set with the lowest slot will be replaced. - fn get_set_index(&self, slot: Slot) -> Result { - let lowest_permissible_slot: Slot = *self.lowest_permissible_slot.read(); + fn get_set_index(&mut self, slot: Slot) -> Result { + let lowest_permissible_slot = self.lowest_permissible_slot; if slot < lowest_permissible_slot { return Err(Error::SlotTooLow { @@ -202,15 +197,14 @@ impl ObservedAttestations { self.prune(slot) } - let mut sets = self.sets.write(); - - if let Some(index) = sets.iter().position(|set| set.slot == slot) { + if let Some(index) = self.sets.iter().position(|set| set.slot == slot) { return Ok(index); } // To avoid re-allocations, try and determine a rough initial capacity for the new set // by obtaining the mean size of all items in earlier epoch. - let (count, sum) = sets + let (count, sum) = self + .sets .iter() // Only include slots that are less than the given slot in the average. This should // generally avoid including recent slots that are still "filling up". @@ -222,20 +216,21 @@ impl ObservedAttestations { // but considering it's approx. 128 * 32 bytes we're not wasting much. let initial_capacity = sum.checked_div(count).unwrap_or(128); - if sets.len() < self.max_capacity() as usize || sets.is_empty() { - let index = sets.len(); - sets.push(SlotHashSet::new(slot, initial_capacity)); + if self.sets.len() < self.max_capacity() as usize || self.sets.is_empty() { + let index = self.sets.len(); + self.sets.push(SlotHashSet::new(slot, initial_capacity)); return Ok(index); } - let index = sets + let index = self + .sets .iter() .enumerate() .min_by_key(|(_i, set)| set.slot) .map(|(i, _set)| i) .expect("sets cannot be empty due to previous .is_empty() check"); - sets[index] = SlotHashSet::new(slot, initial_capacity); + self.sets[index] = SlotHashSet::new(slot, initial_capacity); Ok(index) } @@ -259,7 +254,7 @@ mod tests { a } - fn single_slot_test(store: &ObservedAttestations, slot: Slot) { + fn single_slot_test(store: &mut ObservedAttestations, slot: Slot) { let attestations = (0..NUM_ELEMENTS as u64) .map(|i| get_attestation(slot, i)) .collect::>(); @@ -293,17 +288,13 @@ mod tests { #[test] fn single_slot() { - let store = ObservedAttestations::default(); + let mut store = ObservedAttestations::default(); - single_slot_test(&store, Slot::new(0)); + single_slot_test(&mut store, Slot::new(0)); + assert_eq!(store.sets.len(), 1, "should have a single set stored"); assert_eq!( - store.sets.read().len(), - 1, - "should have a single set stored" - ); - assert_eq!( - store.sets.read()[0].len(), + store.sets[0].len(), NUM_ELEMENTS, "set should have NUM_ELEMENTS elements" ); @@ -311,13 +302,13 @@ mod tests { #[test] fn mulitple_contiguous_slots() { - let store = ObservedAttestations::default(); + let mut store = ObservedAttestations::default(); let max_cap = store.max_capacity(); for i in 0..max_cap * 3 { let slot = Slot::new(i); - single_slot_test(&store, slot); + single_slot_test(&mut store, slot); /* * Ensure that the number of sets is correct. @@ -325,14 +316,14 @@ mod tests { if i < max_cap { assert_eq!( - store.sets.read().len(), + store.sets.len(), i as usize + 1, "should have a {} sets stored", i + 1 ); } else { assert_eq!( - store.sets.read().len(), + store.sets.len(), max_cap as usize, "should have max_capacity sets stored" ); @@ -342,7 +333,7 @@ mod tests { * Ensure that each set contains the correct number of elements. 
*/ - for set in &store.sets.read()[..] { + for set in &store.sets[..] { assert_eq!( set.len(), NUM_ELEMENTS, @@ -354,12 +345,7 @@ mod tests { * Ensure that all the sets have the expected slots */ - let mut store_slots = store - .sets - .read() - .iter() - .map(|set| set.slot) - .collect::>(); + let mut store_slots = store.sets.iter().map(|set| set.slot).collect::>(); assert!( store_slots.len() <= store.max_capacity() as usize, @@ -378,7 +364,7 @@ mod tests { #[test] fn mulitple_non_contiguous_slots() { - let store = ObservedAttestations::default(); + let mut store = ObservedAttestations::default(); let max_cap = store.max_capacity(); let to_skip = vec![1_u64, 2, 3, 5, 6, 29, 30, 31, 32, 64]; @@ -394,13 +380,13 @@ mod tests { let slot = Slot::from(i); - single_slot_test(&store, slot); + single_slot_test(&mut store, slot); /* * Ensure that each set contains the correct number of elements. */ - for set in &store.sets.read()[..] { + for set in &store.sets[..] { assert_eq!( set.len(), NUM_ELEMENTS, @@ -412,12 +398,7 @@ mod tests { * Ensure that all the sets have the expected slots */ - let mut store_slots = store - .sets - .read() - .iter() - .map(|set| set.slot) - .collect::>(); + let mut store_slots = store.sets.iter().map(|set| set.slot).collect::>(); store_slots.sort_unstable(); @@ -426,7 +407,7 @@ mod tests { "store size should not exceed max" ); - let lowest = store.lowest_permissible_slot.read().as_u64(); + let lowest = store.lowest_permissible_slot.as_u64(); let highest = slot.as_u64(); let expected_slots = (lowest..=highest) .filter(|i| !to_skip.contains(i)) diff --git a/beacon_node/beacon_chain/src/observed_attesters.rs b/beacon_node/beacon_chain/src/observed_attesters.rs index c0eac9b4d8f..f93ba6fce49 100644 --- a/beacon_node/beacon_chain/src/observed_attesters.rs +++ b/beacon_node/beacon_chain/src/observed_attesters.rs @@ -7,7 +7,6 @@ //! the same epoch use bitvec::vec::BitVec; -use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use types::{Attestation, Epoch, EthSpec, Unsigned}; @@ -148,16 +147,16 @@ impl Item for EpochHashSet { /// /// `T` should be set to a `EpochBitfield` or `EpochHashSet`. pub struct AutoPruningContainer { - lowest_permissible_epoch: RwLock, - items: RwLock>, + lowest_permissible_epoch: Epoch, + items: HashMap, _phantom: PhantomData, } impl Default for AutoPruningContainer { fn default() -> Self { Self { - lowest_permissible_epoch: RwLock::new(Epoch::new(0)), - items: RwLock::new(HashMap::new()), + lowest_permissible_epoch: Epoch::new(0), + items: HashMap::new(), _phantom: PhantomData, } } @@ -172,7 +171,7 @@ impl AutoPruningContainer { /// - `validator_index` is higher than `VALIDATOR_REGISTRY_LIMIT`. /// - `a.data.target.slot` is earlier than `self.earliest_permissible_slot`. pub fn observe_validator( - &self, + &mut self, a: &Attestation, validator_index: usize, ) -> Result { @@ -182,14 +181,13 @@ impl AutoPruningContainer { self.prune(epoch); - let mut items = self.items.write(); - - if let Some(item) = items.get_mut(&epoch) { + if let Some(item) = self.items.get_mut(&epoch) { Ok(item.insert(validator_index)) } else { // To avoid re-allocations, try and determine a rough initial capacity for the new item // by obtaining the mean size of all items in earlier epoch. - let (count, sum) = items + let (count, sum) = self + .items .iter() // Only include epochs that are less than the given slot in the average. This should // generally avoid including recent epochs that are still "filling up". 
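Aside: the pattern applied throughout this diff is to strip the interior `parking_lot` locks out of the observed-* caches (their methods now take `&mut self`) and to hold a single `RwLock`/`Mutex` around each cache on `BeaconChain`, so call sites choose `.read()`, `.write()` or `.lock()` explicitly. Below is a minimal sketch of that shape; `SeenCache` and `Owner` are hypothetical names, not Lighthouse types.

use parking_lot::RwLock;
use std::collections::HashSet;

#[derive(Default)]
pub struct SeenCache {
    seen: HashSet<u64>,
}

impl SeenCache {
    /// Read-only query: callers only need a read lock on the outer `RwLock`.
    pub fn is_known(&self, id: u64) -> bool {
        self.seen.contains(&id)
    }

    /// Mutation: callers must hold the write lock, so `&mut self` suffices and
    /// no internal lock is needed.
    pub fn observe(&mut self, id: u64) -> bool {
        // Returns `true` if the id was already known.
        !self.seen.insert(id)
    }
}

pub struct Owner {
    /// One outer lock replaces the per-field locks the cache used to carry.
    pub seen_cache: RwLock<SeenCache>,
}

impl Owner {
    pub fn check_then_observe(&self, id: u64) -> bool {
        // Cheap rejection under a shared lock first...
        if self.seen_cache.read().is_known(id) {
            return true;
        }
        // ...then take the exclusive lock only when an insert may be needed.
        self.seen_cache.write().observe(id)
    }
}

As the diff's own comments note for attestations and block proposers, the read-lock check can race with another thread, so the authoritative answer is the return value of the observation made under the write lock.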
@@ -201,7 +199,7 @@ impl AutoPruningContainer { let mut item = T::with_capacity(initial_capacity); item.insert(validator_index); - items.insert(epoch, item); + self.items.insert(epoch, item); Ok(false) } @@ -223,7 +221,6 @@ impl AutoPruningContainer { let exists = self .items - .read() .get(&a.data.target.epoch) .map_or(false, |item| item.contains(validator_index)); @@ -233,10 +230,7 @@ impl AutoPruningContainer { /// Returns the number of validators that have been observed at the given `epoch`. Returns /// `None` if `self` does not have a cache for that epoch. pub fn observed_validator_count(&self, epoch: Epoch) -> Option { - self.items - .read() - .get(&epoch) - .map(|item| item.validator_count()) + self.items.get(&epoch).map(|item| item.validator_count()) } fn sanitize_request(&self, a: &Attestation, validator_index: usize) -> Result<(), Error> { @@ -245,7 +239,7 @@ impl AutoPruningContainer { } let epoch = a.data.target.epoch; - let lowest_permissible_epoch: Epoch = *self.lowest_permissible_epoch.read(); + let lowest_permissible_epoch = self.lowest_permissible_epoch; if epoch < lowest_permissible_epoch { return Err(Error::EpochTooLow { epoch, @@ -270,14 +264,13 @@ impl AutoPruningContainer { /// /// Also sets `self.lowest_permissible_epoch` with relation to `current_epoch` and /// `Self::max_capacity`. - pub fn prune(&self, current_epoch: Epoch) { + pub fn prune(&mut self, current_epoch: Epoch) { // Taking advantage of saturating subtraction on `Slot`. let lowest_permissible_epoch = current_epoch - (self.max_capacity().saturating_sub(1)); - *self.lowest_permissible_epoch.write() = lowest_permissible_epoch; + self.lowest_permissible_epoch = lowest_permissible_epoch; self.items - .write() .retain(|epoch, _item| *epoch >= lowest_permissible_epoch); } } @@ -301,7 +294,7 @@ mod tests { a } - fn single_epoch_test(store: &$type, epoch: Epoch) { + fn single_epoch_test(store: &mut $type, epoch: Epoch) { let attesters = [0, 1, 2, 3, 5, 6, 7, 18, 22]; let a = &get_attestation(epoch); @@ -334,26 +327,22 @@ mod tests { #[test] fn single_epoch() { - let store = $type::default(); + let mut store = $type::default(); - single_epoch_test(&store, Epoch::new(0)); + single_epoch_test(&mut store, Epoch::new(0)); - assert_eq!( - store.items.read().len(), - 1, - "should have a single bitfield stored" - ); + assert_eq!(store.items.len(), 1, "should have a single bitfield stored"); } #[test] fn mulitple_contiguous_epochs() { - let store = $type::default(); + let mut store = $type::default(); let max_cap = store.max_capacity(); for i in 0..max_cap * 3 { let epoch = Epoch::new(i); - single_epoch_test(&store, epoch); + single_epoch_test(&mut store, epoch); /* * Ensure that the number of sets is correct. 
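For reference, the pruning rule these auto-pruning caches follow (visible in `prune` above and in `ObservedAttestations::prune`) keeps only the most recent `max_capacity` epochs/slots and records the cutoff so late arrivals are rejected with `EpochTooLow`/`SlotTooLow`. A standalone sketch with simplified, assumed types:

use std::collections::HashMap;

struct EpochCache {
    lowest_permissible_epoch: u64,
    /// epoch -> observed validator indices (stand-in for `EpochBitfield`/`EpochHashSet`).
    items: HashMap<u64, Vec<usize>>,
    max_capacity: u64,
}

impl EpochCache {
    fn prune(&mut self, current_epoch: u64) {
        // `Epoch` subtraction saturates in the real code; emulate that here.
        let cutoff = current_epoch.saturating_sub(self.max_capacity.saturating_sub(1));
        self.lowest_permissible_epoch = cutoff;
        self.items.retain(|epoch, _| *epoch >= cutoff);
    }
}

Because `prune` now takes `&mut self`, it can only be reached through the outer lock's write guard, which is why the tests in this file switch from `&store` to `&mut store`.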
@@ -361,14 +350,14 @@ mod tests { if i < max_cap { assert_eq!( - store.items.read().len(), + store.items.len(), i as usize + 1, "should have a {} items stored", i + 1 ); } else { assert_eq!( - store.items.read().len(), + store.items.len(), max_cap as usize, "should have max_capacity items stored" ); @@ -380,7 +369,6 @@ mod tests { let mut store_epochs = store .items - .read() .iter() .map(|(epoch, _set)| *epoch) .collect::>(); @@ -402,7 +390,7 @@ mod tests { #[test] fn mulitple_non_contiguous_epochs() { - let store = $type::default(); + let mut store = $type::default(); let max_cap = store.max_capacity(); let to_skip = vec![1_u64, 3, 4, 5]; @@ -418,7 +406,7 @@ mod tests { let epoch = Epoch::from(i); - single_epoch_test(&store, epoch); + single_epoch_test(&mut store, epoch); /* * Ensure that all the sets have the expected slots @@ -426,7 +414,6 @@ mod tests { let mut store_epochs = store .items - .read() .iter() .map(|(epoch, _)| *epoch) .collect::>(); @@ -438,7 +425,7 @@ mod tests { "store size should not exceed max" ); - let lowest = store.lowest_permissible_epoch.read().as_u64(); + let lowest = store.lowest_permissible_epoch.as_u64(); let highest = epoch.as_u64(); let expected_epochs = (lowest..=highest) .filter(|i| !to_skip.contains(i)) diff --git a/beacon_node/beacon_chain/src/observed_block_producers.rs b/beacon_node/beacon_chain/src/observed_block_producers.rs index b2d281adf94..cb8e0095a57 100644 --- a/beacon_node/beacon_chain/src/observed_block_producers.rs +++ b/beacon_node/beacon_chain/src/observed_block_producers.rs @@ -1,7 +1,6 @@ //! Provides the `ObservedBlockProducers` struct which allows for rejecting gossip blocks from //! validators that have already produced a block. -use parking_lot::RwLock; use std::collections::{HashMap, HashSet}; use std::marker::PhantomData; use types::{BeaconBlock, EthSpec, Slot, Unsigned}; @@ -27,8 +26,8 @@ pub enum Error { /// active_validator_count`, however in reality that is more like `slots_since_finality * /// known_distinct_shufflings` which is much smaller. pub struct ObservedBlockProducers { - finalized_slot: RwLock, - items: RwLock>>, + finalized_slot: Slot, + items: HashMap>, _phantom: PhantomData, } @@ -36,8 +35,8 @@ impl Default for ObservedBlockProducers { /// Instantiates `Self` with `finalized_slot == 0`. fn default() -> Self { Self { - finalized_slot: RwLock::new(Slot::new(0)), - items: RwLock::new(HashMap::new()), + finalized_slot: Slot::new(0), + items: HashMap::new(), _phantom: PhantomData, } } @@ -53,12 +52,11 @@ impl ObservedBlockProducers { /// /// - `block.proposer_index` is greater than `VALIDATOR_REGISTRY_LIMIT`. /// - `block.slot` is equal to or less than the latest pruned `finalized_slot`. 
- pub fn observe_proposer(&self, block: &BeaconBlock) -> Result { + pub fn observe_proposer(&mut self, block: &BeaconBlock) -> Result { self.sanitize_block(block)?; let did_not_exist = self .items - .write() .entry(block.slot) .or_insert_with(|| HashSet::with_capacity(E::SlotsPerEpoch::to_usize())) .insert(block.proposer_index); @@ -79,7 +77,6 @@ impl ObservedBlockProducers { let exists = self .items - .read() .get(&block.slot) .map_or(false, |set| set.contains(&block.proposer_index)); @@ -92,7 +89,7 @@ impl ObservedBlockProducers { return Err(Error::ValidatorIndexTooHigh(block.proposer_index)); } - let finalized_slot = *self.finalized_slot.read(); + let finalized_slot = self.finalized_slot; if finalized_slot > 0 && block.slot <= finalized_slot { return Err(Error::FinalizedBlock { slot: block.slot, @@ -109,15 +106,13 @@ impl ObservedBlockProducers { /// equal to or less than `finalized_slot`. /// /// No-op if `finalized_slot == 0`. - pub fn prune(&self, finalized_slot: Slot) { + pub fn prune(&mut self, finalized_slot: Slot) { if finalized_slot == 0 { return; } - *self.finalized_slot.write() = finalized_slot; - self.items - .write() - .retain(|slot, _set| *slot > finalized_slot); + self.finalized_slot = finalized_slot; + self.items.retain(|slot, _set| *slot > finalized_slot); } } @@ -137,10 +132,10 @@ mod tests { #[test] fn pruning() { - let cache = ObservedBlockProducers::default(); + let mut cache = ObservedBlockProducers::default(); - assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero"); - assert_eq!(cache.items.read().len(), 0, "no slots should be present"); + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 0, "no slots should be present"); // Slot 0, proposer 0 let block_a = &get_block(0, 0); @@ -155,16 +150,11 @@ mod tests { * Preconditions. 
*/ - assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero"); - assert_eq!( - cache.items.read().len(), - 1, - "only one slot should be present" - ); + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(0)) .expect("slot zero should be present") .len(), @@ -178,16 +168,11 @@ mod tests { cache.prune(Slot::new(0)); - assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero"); - assert_eq!( - cache.items.read().len(), - 1, - "only one slot should be present" - ); + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(0)) .expect("slot zero should be present") .len(), @@ -201,11 +186,11 @@ mod tests { cache.prune(E::slots_per_epoch().into()); assert_eq!( - *cache.finalized_slot.read(), + cache.finalized_slot, Slot::from(E::slots_per_epoch()), "finalized slot is updated" ); - assert_eq!(cache.items.read().len(), 0, "no items left"); + assert_eq!(cache.items.len(), 0, "no items left"); /* * Check that we can't insert a finalized block @@ -223,7 +208,7 @@ mod tests { "cant insert finalized block" ); - assert_eq!(cache.items.read().len(), 0, "block was not added"); + assert_eq!(cache.items.len(), 0, "block was not added"); /* * Check that we _can_ insert a non-finalized block @@ -240,15 +225,10 @@ mod tests { "can insert non-finalized block" ); - assert_eq!( - cache.items.read().len(), - 1, - "only one slot should be present" - ); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(three_epochs)) .expect("the three epochs slot should be present") .len(), @@ -264,20 +244,15 @@ mod tests { cache.prune(two_epochs.into()); assert_eq!( - *cache.finalized_slot.read(), + cache.finalized_slot, Slot::from(two_epochs), "finalized slot is updated" ); - assert_eq!( - cache.items.read().len(), - 1, - "only one slot should be present" - ); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(three_epochs)) .expect("the three epochs slot should be present") .len(), @@ -288,7 +263,7 @@ mod tests { #[test] fn simple_observations() { - let cache = ObservedBlockProducers::default(); + let mut cache = ObservedBlockProducers::default(); // Slot 0, proposer 0 let block_a = &get_block(0, 0); @@ -314,16 +289,11 @@ mod tests { "observing again indicates true" ); - assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero"); - assert_eq!( - cache.items.read().len(), - 1, - "only one slot should be present" - ); + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 1, "only one slot should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(0)) .expect("slot zero should be present") .len(), @@ -355,12 +325,11 @@ mod tests { "observing slot 1 again indicates true" ); - assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero"); - assert_eq!(cache.items.read().len(), 2, "two slots should be present"); + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 2, "two slots should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(0)) .expect("slot zero should be present") .len(), @@ -370,7 +339,6 @@ mod tests { assert_eq!( cache .items - .read() .get(&Slot::new(1)) 
.expect("slot zero should be present") .len(), @@ -402,12 +370,11 @@ mod tests { "observing new proposer again indicates true" ); - assert_eq!(*cache.finalized_slot.read(), 0, "finalized slot is zero"); - assert_eq!(cache.items.read().len(), 2, "two slots should be present"); + assert_eq!(cache.finalized_slot, 0, "finalized slot is zero"); + assert_eq!(cache.items.len(), 2, "two slots should be present"); assert_eq!( cache .items - .read() .get(&Slot::new(0)) .expect("slot zero should be present") .len(), @@ -417,7 +384,6 @@ mod tests { assert_eq!( cache .items - .read() .get(&Slot::new(1)) .expect("slot zero should be present") .len(), diff --git a/beacon_node/beacon_chain/src/observed_operations.rs b/beacon_node/beacon_chain/src/observed_operations.rs index 9b8dddf9b76..e16ede5a0d4 100644 --- a/beacon_node/beacon_chain/src/observed_operations.rs +++ b/beacon_node/beacon_chain/src/observed_operations.rs @@ -1,5 +1,4 @@ use derivative::Derivative; -use parking_lot::Mutex; use smallvec::SmallVec; use state_processing::{SigVerifiedOp, VerifyOperation}; use std::collections::HashSet; @@ -25,7 +24,7 @@ pub struct ObservedOperations, E: EthSpec> { /// For attester slashings, this is the set of all validators who would be slashed by /// previously seen attester slashings, i.e. those validators in the intersection of /// `attestation_1.attester_indices` and `attestation_2.attester_indices`. - observed_validator_indices: Mutex>, + observed_validator_indices: HashSet, _phantom: PhantomData<(T, E)>, } @@ -71,12 +70,12 @@ impl ObservableOperation for AttesterSlashing { impl, E: EthSpec> ObservedOperations { pub fn verify_and_observe( - &self, + &mut self, op: T, head_state: &BeaconState, spec: &ChainSpec, ) -> Result, T::Error> { - let mut observed_validator_indices = self.observed_validator_indices.lock(); + let observed_validator_indices = &mut self.observed_validator_indices; let new_validator_indices = op.observed_validators(); // If all of the new validator indices have been previously observed, short-circuit