Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix aggregated attestation pool grows large in size #4932

Merged
merged 19 commits into from Feb 24, 2020
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions beacon-chain/operations/attestations/aggregate.go
Expand Up @@ -11,9 +11,9 @@ import (
"go.opencensus.io/trace"
)

// Define time to aggregate the unaggregated attestations at 3 times per slot, this gives
// Define time to aggregate the unaggregated attestations at 2 times per slot, this gives
// enough confidence all the unaggregated attestations will be aggregated as aggregator requests.
var timeToAggregate = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second
var timeToAggregate = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second

// This kicks off a routine to aggregate the unaggregated attestations from pool.
func (s *Service) aggregateRoutine() {
Expand All @@ -24,8 +24,7 @@ func (s *Service) aggregateRoutine() {
case <-s.ctx.Done():
return
case <-ticker.C:
attsToBeAggregated := append(s.pool.UnaggregatedAttestations(), s.pool.AggregatedAttestations()...)
if err := s.aggregateAttestations(ctx, attsToBeAggregated); err != nil {
if err := s.aggregateAttestations(ctx, s.pool.UnaggregatedAttestations()); err != nil {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why wouldn't we use the aggregated ones too?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed this a problem. Reverted

log.WithError(err).Error("Could not aggregate attestation")
}

Expand Down
43 changes: 30 additions & 13 deletions beacon-chain/operations/attestations/kv/aggregated.go
@@ -1,6 +1,8 @@
package kv

import (
"time"

"github.com/patrickmn/go-cache"
"github.com/pkg/errors"
ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1"
Expand All @@ -18,25 +20,33 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error {
return errors.Wrap(err, "could not tree hash attestation")
}

var atts []*ethpb.Attestation
d, ok := p.aggregatedAtt.Get(string(r[:]))
d, expTime, ok := p.aggregatedAtt.GetWithExpiration(string(r[:]))
// If we have not seen the attestation data before, store in in cache with
// the default expiration time out.
if !ok {
atts = make([]*ethpb.Attestation, 0)
} else {
atts, ok = d.([]*ethpb.Attestation)
if !ok {
return errors.New("cached value is not of type []*ethpb.Attestation")
}
atts := []*ethpb.Attestation{att}
p.aggregatedAtt.Set(string(r[:]), atts, cache.DefaultExpiration)
return nil
}

atts, ok := d.([]*ethpb.Attestation)
if !ok {
return errors.New("cached value is not of type []*ethpb.Attestation")
}

atts, err = helpers.AggregateAttestations(append(atts, att))
if err != nil {
return err
}

// DefaultExpiration is set to what was given to New(). In this case
// it's one epoch.
p.aggregatedAtt.Set(string(r[:]), atts, cache.DefaultExpiration)
// Delete attestation if the current time has passed the expiration time.
if time.Now().Unix() >= expTime.Unix() {
p.aggregatedAtt.Delete(string(r[:]))
return nil
}
// Reset expiration time given how much time has passed.
expDuration := time.Duration(expTime.Unix() - time.Now().Unix())
p.aggregatedAtt.Set(string(r[:]), atts, expDuration*time.Second)

return nil
}
Expand Down Expand Up @@ -97,7 +107,7 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error {
if err != nil {
return errors.Wrap(err, "could not tree hash attestation data")
}
a, ok := p.aggregatedAtt.Get(string(r[:]))
a, expTime, ok := p.aggregatedAtt.GetWithExpiration(string(r[:]))
if !ok {
return nil
}
Expand All @@ -114,7 +124,14 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error {
if len(filtered) == 0 {
p.aggregatedAtt.Delete(string(r[:]))
} else {
p.aggregatedAtt.Set(string(r[:]), filtered, cache.DefaultExpiration)
// Delete attestation if the current time has passed the expiration time.
if time.Now().Unix() >= expTime.Unix() {
p.aggregatedAtt.Delete(string(r[:]))
return nil
}
// Reset expiration time given how much time has passed.
expDuration := time.Duration(expTime.Unix() - time.Now().Unix())
p.aggregatedAtt.Set(string(r[:]), filtered, expDuration*time.Second)
}

return nil
Expand Down
115 changes: 115 additions & 0 deletions beacon-chain/operations/attestations/kv/aggregated_test.go
Expand Up @@ -51,6 +51,64 @@ func TestKV_Aggregated_CanSaveRetrieve(t *testing.T) {
}
}

func TestKV_Aggregated_SaveAndVerifyExpireTime(t *testing.T) {
cache := NewAttCaches()

d := &ethpb.AttestationData{Slot: 1}
att1 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11100}}
att2 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10110}}
att3 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11011}}

r, err := ssz.HashTreeRoot(d)
if err != nil {
t.Fatal(err)
}

if err := cache.SaveAggregatedAttestation(att1); err != nil {
t.Fatal(err)
}
a, expTime, ok := cache.aggregatedAtt.GetWithExpiration(string(r[:]))
if !ok {
t.Fatal("Did not save attestations")
}
if len(a.([]*ethpb.Attestation)) != 1 {
t.Fatal("Did not save attestations")
}

// Let time pass by one second to test expiration time.
time.Sleep(1 * time.Second)
// Save attestation 2 too the pool, the expiration time should not change.
if err := cache.SaveAggregatedAttestation(att2); err != nil {
t.Fatal(err)
}
newA, newExpTime, ok := cache.aggregatedAtt.GetWithExpiration(string(r[:]))
if !ok {
t.Fatal("Did not save attestations")
}
if len(newA.([]*ethpb.Attestation)) != 2 {
t.Fatal("Did not delete attestations")
}

if expTime.Unix() != newExpTime.Unix() {
t.Error("Expiration time should not change")
}

// Let time pass by another second to test expiration time.
time.Sleep(1 * time.Second)
// Save attestation 3 too the pool, the expiration time should not change.
if err := cache.SaveAggregatedAttestation(att3); err != nil {
t.Fatal(err)
}
newA, newExpTime, _ = cache.aggregatedAtt.GetWithExpiration(string(r[:]))
if len(newA.([]*ethpb.Attestation)) != 3 {
t.Fatal("Did not delete attestations")
}

if expTime.Unix() != newExpTime.Unix() {
t.Error("Expiration time should not change")
}
}

func TestKV_Aggregated_CanDelete(t *testing.T) {
cache := NewAttCaches()

Expand Down Expand Up @@ -80,6 +138,63 @@ func TestKV_Aggregated_CanDelete(t *testing.T) {
}
}

func TestKV_Aggregated_DeleteAndVerifyExpireTime(t *testing.T) {
cache := NewAttCaches()

d := &ethpb.AttestationData{Slot: 1}
att1 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11100}}
att2 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10110}}
att3 := &ethpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11011}}
atts := []*ethpb.Attestation{att1, att2, att3}
for _, att := range atts {
if err := cache.SaveAggregatedAttestation(att); err != nil {
t.Fatal(err)
}
}
r, err := ssz.HashTreeRoot(d)
if err != nil {
t.Fatal(err)
}

a, expTime, ok := cache.aggregatedAtt.GetWithExpiration(string(r[:]))
if !ok {
t.Fatal("Did not save attestations")
}
if len(a.([]*ethpb.Attestation)) != 3 {
t.Fatal("Did not save attestations")
}

// Let time pass by one second to test expiration time.
time.Sleep(1 * time.Second)
// Delete attestation 1 from the pool, the expiration time should not change.
if err := cache.DeleteAggregatedAttestation(att1); err != nil {
t.Fatal(err)
}
newA, newExpTime, _ := cache.aggregatedAtt.GetWithExpiration(string(r[:]))
if len(newA.([]*ethpb.Attestation)) != 2 {
t.Fatal("Did not delete attestations")
}

if expTime.Unix() != newExpTime.Unix() {
t.Error("Expiration time should not change")
}

// Let time pass by another second to test expiration time.
time.Sleep(1 * time.Second)
// Delete attestation 1 from the pool, the expiration time should not change.
if err := cache.DeleteAggregatedAttestation(att2); err != nil {
t.Fatal(err)
}
newA, newExpTime, _ = cache.aggregatedAtt.GetWithExpiration(string(r[:]))
if len(newA.([]*ethpb.Attestation)) != 1 {
t.Fatal("Did not delete attestations")
}

if expTime.Unix() != newExpTime.Unix() {
t.Error("Expiration time should not change")
}
}

func TestKV_Aggregated_CheckExpTime(t *testing.T) {
cache := NewAttCaches()

Expand Down
10 changes: 5 additions & 5 deletions beacon-chain/operations/attestations/kv/kv.go
Expand Up @@ -23,12 +23,12 @@ func NewAttCaches() *AttCaches {
secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot)

// Create caches with default expiration time of one epoch and which
// purges expired items every other epoch.
// purges expired items every epoch.
pool := &AttCaches{
unAggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second),
aggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second),
forkchoiceAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second),
blockAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second),
unAggregatedAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second),
aggregatedAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second),
forkchoiceAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second),
blockAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second),
}

return pool
Expand Down