Skip to content

Commit

Permalink
Ignore seen aggregates (#3439)
Browse files Browse the repository at this point in the history
ethereum/consensus-specs#2225 removed an ignore
rule that would filter out duplicate aggregates from gossip publishing -
however, this causes increased bandwidth and CPU usage as discussed in
ethereum/consensus-specs#2183 - the intent is
to revert the removal and reinstate the rule.

This PR implements ignore filtering which cuts down on CPU usage (fewer
aggregates to validate) and bandwidth usage (less fanout of duplicates)
- as #2225 points out, this may lead to a small increase in IHAVE
messages.
  • Loading branch information
arnetheduck committed Feb 25, 2022
1 parent 1bfbcc4 commit 92e7e28
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 9 deletions.
32 changes: 26 additions & 6 deletions beacon_chain/consensus_object_pools/attestation_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -280,6 +280,12 @@ func updateAggregates(entry: var AttestationEntry) =
inc j
inc i

func covers(entry: AttestationEntry, bits: CommitteeValidatorsBits): bool =
for i in 0..<entry.aggregates.len():
if bits.isSubsetOf(entry.aggregates[i].aggregation_bits):
return true
false

proc addAttestation(entry: var AttestationEntry,
attestation: Attestation,
signature: CookedSig): bool =
Expand All @@ -304,12 +310,8 @@ proc addAttestation(entry: var AttestationEntry,
entry.singles[singleIndex.get()] = signature
else:
# More than one vote in this attestation
for i in 0..<entry.aggregates.len():
if attestation.aggregation_bits.isSubsetOf(entry.aggregates[i].aggregation_bits):
trace "Aggregate already seen",
singles = entry.singles.len(),
aggregates = entry.aggregates.len()
return false
if entry.covers(attestation.aggregation_bits):
return false

# Since we're adding a new aggregate, we can now remove existing
# aggregates that don't add any new votes
Expand Down Expand Up @@ -376,6 +378,24 @@ proc addAttestation*(pool: var AttestationPool,
if not(isNil(pool.onAttestationAdded)):
pool.onAttestationAdded(attestation)

func covers*(
pool: var AttestationPool, data: Attestationdata,
bits: CommitteeValidatorsBits): bool =
## Return true iff the given attestation already is fully covered by one of
## the existing aggregates, making it redundant
## the `var` attestation pool is needed to use `withValue`, else Table becomes
## unusably inefficient
let candidateIdx = pool.candidateIdx(data.slot)
if candidateIdx.isNone:
return false

let attestation_data_root = hash_tree_root(data)
pool.candidates[candidateIdx.get()].withValue(attestation_data_root, entry):
if entry[].covers(bits):
return true

false

proc addForkChoice*(pool: var AttestationPool,
epochRef: EpochRef,
blckRef: BlockRef,
Expand Down
24 changes: 21 additions & 3 deletions beacon_chain/gossip_processing/gossip_validation.nim
Original file line number Diff line number Diff line change
Expand Up @@ -621,9 +621,24 @@ proc validateAggregate*(
return err(v.error)
v.get()

# [REJECT] aggregate_and_proof.selection_proof selects the validator as an
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
# aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
if pool[].covers(aggregate.data, aggregate.aggregation_bits):
# https://github.com/ethereum/consensus-specs/issues/2183 - althoughh this
# check was temporarily removed from the spec, the intent is to reinstate it
# per discussion in the ticket.
#
# [IGNORE] The valid aggregate attestation defined by
# `hash_tree_root(aggregate)` has _not_ already been seen
# (via aggregate gossip, within a verified block, or through the creation of
# an equivalent aggregate locally).

# Our implementation of this check is slightly different in that it doesn't
# consider aggregates from verified blocks - this would take a rather heavy
# index to work correcly under fork conditions - we also check for coverage
# of attestation bits instead of comparing with full root of aggreagte:
# this captures the spirit of the checkk by ignoring aggregates that are
# strict subsets of other, already-seen aggregates.
return errIgnore("Aggregate already covered")

let
epochRef = block:
let tmp = pool.dag.getEpochRef(target.blck, target.slot.epoch, false)
Expand All @@ -641,6 +656,9 @@ proc validateAggregate*(
return checkedReject("Attestation: committee index not within expected range")
idx.get()

# [REJECT] aggregate_and_proof.selection_proof selects the validator as an
# aggregator for the slot -- i.e. is_aggregator(state, aggregate.data.slot,
# aggregate.data.index, aggregate_and_proof.selection_proof) returns True.
if not is_aggregator(
epochRef, slot, committee_index, aggregate_and_proof.selection_proof):
return checkedReject("Aggregate: incorrect aggregator")
Expand Down
4 changes: 4 additions & 0 deletions tests/test_attestation_pool.nim
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,9 @@ suite "Attestation pool processing" & preset():
att0.combine(att2)
att1.combine(att2)

check:
not pool[].covers(att0.data, att0.aggregation_bits)

pool[].addAttestation(
att0, @[bc0[0], bc0[2]], att0.loadSig, att0.data.slot.start_beacon_time)
pool[].addAttestation(
Expand All @@ -214,6 +217,7 @@ suite "Attestation pool processing" & preset():
info, {}).isOk()

check:
pool[].covers(att0.data, att0.aggregation_bits)
pool[].getAttestationsForBlock(state.data, cache).len() == 2
# Can get either aggregate here, random!
pool[].getAggregatedAttestation(1.Slot, 0.CommitteeIndex).isSome()
Expand Down

0 comments on commit 92e7e28

Please sign in to comment.