Skip to content

Commit

Permalink
Update pubsub seen cache to be per epoch instead of per slot (#5698)
Browse files Browse the repository at this point in the history
* hasSeenAggregatorIndexSlot -> hasSeenAggregatorIndexEpoch
* Fix test
* Update subscriber
* setSeenCommitteeIndicesSlot -> setSeenCommitteeIndicesEpoch
* Fix test
* Revert "setSeenCommitteeIndicesSlot -> setSeenCommitteeIndicesEpoch"

This reverts commit bd638ae.
* Fixed unaggregated att seen cache to use per slot
  • Loading branch information
terencechain committed Apr 30, 2020
1 parent cc07494 commit fdad7e6
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 11 deletions.
2 changes: 1 addition & 1 deletion beacon-chain/sync/subscriber_beacon_aggregate_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func (r *Service) beaconAggregateProofSubscriber(ctx context.Context, msg proto.
if a.Message.Aggregate == nil || a.Message.Aggregate.Data == nil {
return errors.New("nil aggregate")
}
r.setAggregatorIndexSlotSeen(a.Message.Aggregate.Data.Slot, a.Message.AggregatorIndex)
r.setAggregatorIndexEpochSeen(a.Message.Aggregate.Data.Target.Epoch, a.Message.AggregatorIndex)

return r.attPool.SaveAggregatedAttestation(a.Message.Aggregate)
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func TestBeaconAggregateProofSubscriber_CanSave(t *testing.T) {
seenAttestationCache: c,
}

a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{}, AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100}}
a := &ethpb.SignedAggregateAttestationAndProof{Message: &ethpb.AggregateAttestationAndProof{Aggregate: &ethpb.Attestation{Data: &ethpb.AttestationData{Target: &ethpb.Checkpoint{}}, AggregationBits: bitfield.Bitlist{0x07}}, AggregatorIndex: 100}}
if err := r.beaconAggregateProofSubscriber(context.Background(), a); err != nil {
t.Fatal(err)
}
Expand Down
16 changes: 8 additions & 8 deletions beacon-chain/sync/validate_aggregate_proof.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
return false
}
// Verify this is the first aggregate received from the aggregator with index and slot.
if r.hasSeenAggregatorIndexSlot(m.Message.Aggregate.Data.Slot, m.Message.AggregatorIndex) {
if r.hasSeenAggregatorIndexEpoch(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex) {
return false
}

Expand All @@ -74,7 +74,7 @@ func (r *Service) validateAggregateAndProof(ctx context.Context, pid peer.ID, ms
return false
}

r.setAggregatorIndexSlotSeen(m.Message.Aggregate.Data.Slot, m.Message.AggregatorIndex)
r.setAggregatorIndexEpochSeen(m.Message.Aggregate.Data.Target.Epoch, m.Message.AggregatorIndex)

msg.ValidatorData = m

Expand Down Expand Up @@ -150,20 +150,20 @@ func (r *Service) validateBlockInAttestation(ctx context.Context, s *ethpb.Signe
return true
}

// Returns true if the node has received aggregate for the aggregator with index and slot.
func (r *Service) hasSeenAggregatorIndexSlot(slot uint64, aggregatorIndex uint64) bool {
// Returns true if the node has received aggregate for the aggregator with index and target epoch.
func (r *Service) hasSeenAggregatorIndexEpoch(epoch uint64, aggregatorIndex uint64) bool {
r.seenAttestationLock.RLock()
defer r.seenAttestationLock.RUnlock()
b := append(bytesutil.Bytes32(slot), bytesutil.Bytes32(aggregatorIndex)...)
b := append(bytesutil.Bytes32(epoch), bytesutil.Bytes32(aggregatorIndex)...)
_, seen := r.seenAttestationCache.Get(string(b))
return seen
}

// Set aggregate's aggregator index slot as seen.
func (r *Service) setAggregatorIndexSlotSeen(slot uint64, aggregatorIndex uint64) {
// Set aggregate's aggregator index target epoch as seen.
func (r *Service) setAggregatorIndexEpochSeen(epoch uint64, aggregatorIndex uint64) {
r.seenAttestationLock.Lock()
defer r.seenAttestationLock.Unlock()
b := append(bytesutil.Bytes32(slot), bytesutil.Bytes32(aggregatorIndex)...)
b := append(bytesutil.Bytes32(epoch), bytesutil.Bytes32(aggregatorIndex)...)
r.seenAttestationCache.Add(string(b), true)
}

Expand Down
18 changes: 17 additions & 1 deletion beacon-chain/sync/validate_aggregate_proof_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -468,7 +468,7 @@ func TestValidateAggregateAndProofWithNewStateMgmt_CanValidate(t *testing.T) {
}
}

func TestVerifyIndexInCommittee_SeenAggregatorSlot(t *testing.T) {
func TestVerifyIndexInCommittee_SeenAggregatorEpoch(t *testing.T) {
db := dbtest.SetupDB(t)
defer dbtest.TeardownDB(t, db)
p := p2ptest.NewTestP2P(t)
Expand Down Expand Up @@ -590,6 +590,22 @@ func TestVerifyIndexInCommittee_SeenAggregatorSlot(t *testing.T) {
if !r.validateAggregateAndProof(context.Background(), "", msg) {
t.Fatal("Validated status is false")
}

// Should fail with another attestation in the same epoch.
signedAggregateAndProof.Message.Aggregate.Data.Slot++
buf = new(bytes.Buffer)
if _, err := p.Encoding().Encode(buf, signedAggregateAndProof); err != nil {
t.Fatal(err)
}
msg = &pubsub.Message{
Message: &pubsubpb.Message{
Data: buf.Bytes(),
TopicIDs: []string{
p2p.GossipTypeMapping[reflect.TypeOf(signedAggregateAndProof)],
},
},
}

time.Sleep(10 * time.Millisecond) // Wait for cached value to pass through buffers.
if r.validateAggregateAndProof(context.Background(), "", msg) {
t.Fatal("Validated status is true")
Expand Down

0 comments on commit fdad7e6

Please sign in to comment.