Skip to content

Commit

Permalink
Push naive attestations into op pool (#1466)
Browse files Browse the repository at this point in the history
## Issue Addressed

NA

## Proposed Changes

- When producing a block, go and ensure every attestation in the naive aggregation pool is included in the operation pool. This should help us increase the number of useful attestations in a block.
- Lift the `RwLock`s inside `NaiveAggregationPool` up into a single high-level lock. There were race conditions in the existing setup and it was hard to reason about.

## Additional Info

NA
  • Loading branch information
paulhauner committed Aug 6, 2020
1 parent ee036cb commit 0b287f6
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 39 deletions.
29 changes: 25 additions & 4 deletions beacon_node/beacon_chain/src/beacon_chain.rs
Expand Up @@ -175,7 +175,7 @@ pub struct BeaconChain<T: BeaconChainTypes> {
///
/// This pool accepts `Attestation` objects that only have one aggregation bit set and provides
/// a method to get an aggregated `Attestation` for some `AttestationData`.
pub naive_aggregation_pool: NaiveAggregationPool<T::EthSpec>,
pub naive_aggregation_pool: RwLock<NaiveAggregationPool<T::EthSpec>>,
/// Contains a store of attestations which have been observed by the beacon chain.
pub observed_attestations: ObservedAttestations<T::EthSpec>,
/// Maintains a record of which validators have been seen to attest in recent epochs.
Expand Down Expand Up @@ -747,7 +747,10 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
&self,
data: &AttestationData,
) -> Result<Option<Attestation<T::EthSpec>>, Error> {
self.naive_aggregation_pool.get(data).map_err(Into::into)
self.naive_aggregation_pool
.read()
.get(data)
.map_err(Into::into)
}

/// Produce an unaggregated `Attestation` that is valid for the given `slot` and `index`.
Expand Down Expand Up @@ -937,7 +940,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {

let attestation = unaggregated_attestation.attestation();

match self.naive_aggregation_pool.insert(attestation) {
match self.naive_aggregation_pool.write().insert(attestation) {
Ok(outcome) => trace!(
self.log,
"Stored unaggregated attestation";
Expand Down Expand Up @@ -1632,6 +1635,24 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
})
};

// Iterate through the naive aggregation pool and ensure all the attestations from there
// are included in the operation pool.
for attestation in self.naive_aggregation_pool.read().iter() {
if let Err(e) = self.op_pool.insert_attestation(
attestation.clone(),
&state.fork,
state.genesis_validators_root,
&self.spec,
) {
// Don't stop block production if there's an error, just create a log.
error!(
self.log,
"Attestation did not transfer to op pool";
"reason" => format!("{:?}", e)
);
}
}

let mut block = SignedBeaconBlock {
message: BeaconBlock {
slot: state.slot,
Expand Down Expand Up @@ -1852,7 +1873,7 @@ impl<T: BeaconChainTypes> BeaconChain<T> {
pub fn per_slot_task(&self) {
trace!(self.log, "Running beacon chain per slot tasks");
if let Some(slot) = self.slot_clock.now() {
self.naive_aggregation_pool.prune(slot);
self.naive_aggregation_pool.write().prune(slot);
}
}

Expand Down
75 changes: 40 additions & 35 deletions beacon_node/beacon_chain/src/naive_aggregation_pool.rs
@@ -1,5 +1,4 @@
use crate::metrics;
use parking_lot::RwLock;
use std::collections::HashMap;
use types::{Attestation, AttestationData, EthSpec, Slot};

Expand Down Expand Up @@ -120,6 +119,11 @@ impl<E: EthSpec> AggregatedAttestationMap<E> {
Ok(self.map.get(data).cloned())
}

/// Iterate all attestations in `self`.
pub fn iter(&self) -> impl Iterator<Item = &Attestation<E>> {
self.map.iter().map(|(_key, attestation)| attestation)
}

pub fn len(&self) -> usize {
self.map.len()
}
Expand Down Expand Up @@ -147,15 +151,15 @@ impl<E: EthSpec> AggregatedAttestationMap<E> {
/// than that will also be refused. Pruning is done automatically based upon the attestations it
/// receives and it can be triggered manually.
pub struct NaiveAggregationPool<E: EthSpec> {
lowest_permissible_slot: RwLock<Slot>,
maps: RwLock<HashMap<Slot, AggregatedAttestationMap<E>>>,
lowest_permissible_slot: Slot,
maps: HashMap<Slot, AggregatedAttestationMap<E>>,
}

impl<E: EthSpec> Default for NaiveAggregationPool<E> {
fn default() -> Self {
Self {
lowest_permissible_slot: RwLock::new(Slot::new(0)),
maps: RwLock::new(HashMap::new()),
lowest_permissible_slot: Slot::new(0),
maps: HashMap::new(),
}
}
}
Expand All @@ -168,10 +172,10 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
///
/// The pool may be pruned if the given `attestation.data` has a slot higher than any
/// previously seen.
pub fn insert(&self, attestation: &Attestation<E>) -> Result<InsertOutcome, Error> {
pub fn insert(&mut self, attestation: &Attestation<E>) -> Result<InsertOutcome, Error> {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_INSERT);
let slot = attestation.data.slot;
let lowest_permissible_slot: Slot = *self.lowest_permissible_slot.read();
let lowest_permissible_slot = self.lowest_permissible_slot;

// Reject any attestations that are too old.
if slot < lowest_permissible_slot {
Expand All @@ -183,16 +187,16 @@ impl<E: EthSpec> NaiveAggregationPool<E> {

let lock_timer =
metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_MAPS_WRITE_LOCK);
let mut maps = self.maps.write();
drop(lock_timer);

let outcome = if let Some(map) = maps.get_mut(&slot) {
let outcome = if let Some(map) = self.maps.get_mut(&slot) {
map.insert(attestation)
} else {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_CREATE_MAP);
// To avoid re-allocations, try and determine a rough initial capacity for the new item
// by obtaining the mean size of all items in earlier epoch.
let (count, sum) = maps
let (count, sum) = self
.maps
.iter()
// Only include epochs that are less than the given slot in the average. This should
// generally avoid including recent epochs that are still "filling up".
Expand All @@ -205,12 +209,11 @@ impl<E: EthSpec> NaiveAggregationPool<E> {

let mut item = AggregatedAttestationMap::new(initial_capacity);
let outcome = item.insert(attestation);
maps.insert(slot, item);
self.maps.insert(slot, item);

outcome
};

drop(maps);
self.prune(slot);

outcome
Expand All @@ -219,47 +222,55 @@ impl<E: EthSpec> NaiveAggregationPool<E> {
/// Returns an aggregated `Attestation` with the given `data`, if any.
pub fn get(&self, data: &AttestationData) -> Result<Option<Attestation<E>>, Error> {
self.maps
.read()
.iter()
.find(|(slot, _map)| **slot == data.slot)
.map(|(_slot, map)| map.get(data))
.unwrap_or_else(|| Ok(None))
}

/// Iterate all attestations in all slots of `self`.
pub fn iter(&self) -> impl Iterator<Item = &Attestation<E>> {
self.maps.iter().map(|(_slot, map)| map.iter()).flatten()
}

/// Removes any attestations with a slot lower than `current_slot` and bars any future
/// attestations with a slot lower than `current_slot - SLOTS_RETAINED`.
pub fn prune(&self, current_slot: Slot) {
pub fn prune(&mut self, current_slot: Slot) {
let _timer = metrics::start_timer(&metrics::ATTESTATION_PROCESSING_AGG_POOL_PRUNE);

// Taking advantage of saturating subtraction on `Slot`.
let lowest_permissible_slot = current_slot - Slot::from(SLOTS_RETAINED);

// No need to prune if the lowest permissible slot has not changed and the queue length is
// less than the maximum
if *self.lowest_permissible_slot.read() == lowest_permissible_slot
&& self.maps.read().len() <= SLOTS_RETAINED
if self.lowest_permissible_slot == lowest_permissible_slot
&& self.maps.len() <= SLOTS_RETAINED
{
return;
}

*self.lowest_permissible_slot.write() = lowest_permissible_slot;
let mut maps = self.maps.write();
self.lowest_permissible_slot = lowest_permissible_slot;

// Remove any maps that are definitely expired.
maps.retain(|slot, _map| *slot >= lowest_permissible_slot);
self.maps
.retain(|slot, _map| *slot >= lowest_permissible_slot);

// If we have too many maps, remove the lowest amount to ensure we only have
// `SLOTS_RETAINED` left.
if maps.len() > SLOTS_RETAINED {
let mut slots = maps.iter().map(|(slot, _map)| *slot).collect::<Vec<_>>();
if self.maps.len() > SLOTS_RETAINED {
let mut slots = self
.maps
.iter()
.map(|(slot, _map)| *slot)
.collect::<Vec<_>>();
// Sort is generally pretty slow, however `SLOTS_RETAINED` is quite low so it should be
// negligible.
slots.sort_unstable();
slots
.into_iter()
.take(maps.len().saturating_sub(SLOTS_RETAINED))
.take(self.maps.len().saturating_sub(SLOTS_RETAINED))
.for_each(|slot| {
maps.remove(&slot);
self.maps.remove(&slot);
})
}
}
Expand Down Expand Up @@ -304,7 +315,7 @@ mod tests {
fn single_attestation() {
let mut a = get_attestation(Slot::new(0));

let pool = NaiveAggregationPool::default();
let mut pool = NaiveAggregationPool::default();

assert_eq!(
pool.insert(&a),
Expand Down Expand Up @@ -352,7 +363,7 @@ mod tests {
sign(&mut a_0, 0, genesis_validators_root);
sign(&mut a_1, 1, genesis_validators_root);

let pool = NaiveAggregationPool::default();
let mut pool = NaiveAggregationPool::default();

assert_eq!(
pool.insert(&a_0),
Expand Down Expand Up @@ -409,7 +420,7 @@ mod tests {
let mut base = get_attestation(Slot::new(0));
sign(&mut base, 0, Hash256::random());

let pool = NaiveAggregationPool::default();
let mut pool = NaiveAggregationPool::default();

for i in 0..SLOTS_RETAINED * 2 {
let slot = Slot::from(i);
Expand All @@ -424,22 +435,16 @@ mod tests {

if i < SLOTS_RETAINED {
let len = i + 1;
assert_eq!(
pool.maps.read().len(),
len,
"the pool should have length {}",
len
);
assert_eq!(pool.maps.len(), len, "the pool should have length {}", len);
} else {
assert_eq!(
pool.maps.read().len(),
pool.maps.len(),
SLOTS_RETAINED,
"the pool should have length SLOTS_RETAINED"
);

let mut pool_slots = pool
.maps
.read()
.iter()
.map(|(slot, _map)| *slot)
.collect::<Vec<_>>();
Expand All @@ -463,7 +468,7 @@ mod tests {
let mut base = get_attestation(Slot::new(0));
sign(&mut base, 0, Hash256::random());

let pool = NaiveAggregationPool::default();
let mut pool = NaiveAggregationPool::default();

for i in 0..=MAX_ATTESTATIONS_PER_SLOT {
let mut a = base.clone();
Expand Down

0 comments on commit 0b287f6

Please sign in to comment.