diff --git a/beacon-chain/operations/attestations/aggregate.go b/beacon-chain/operations/attestations/aggregate.go index 006716a5ba5..5f37d59216a 100644 --- a/beacon-chain/operations/attestations/aggregate.go +++ b/beacon-chain/operations/attestations/aggregate.go @@ -11,9 +11,9 @@ import ( "go.opencensus.io/trace" ) -// Define time to aggregate the unaggregated attestations at 3 times per slot, this gives +// Define time to aggregate the unaggregated attestations at 2 times per slot, this gives // enough confidence all the unaggregated attestations will be aggregated as aggregator requests. -var timeToAggregate = time.Duration(params.BeaconConfig().SecondsPerSlot/3) * time.Second +var timeToAggregate = time.Duration(params.BeaconConfig().SecondsPerSlot/2) * time.Second // This kicks off a routine to aggregate the unaggregated attestations from pool. func (s *Service) aggregateRoutine() { diff --git a/beacon-chain/operations/attestations/kv/aggregated.go b/beacon-chain/operations/attestations/kv/aggregated.go index 9db6ea49c8d..e1cf0c9998f 100644 --- a/beacon-chain/operations/attestations/kv/aggregated.go +++ b/beacon-chain/operations/attestations/kv/aggregated.go @@ -1,6 +1,8 @@ package kv import ( + "time" + "github.com/patrickmn/go-cache" "github.com/pkg/errors" ethpb "github.com/prysmaticlabs/ethereumapis/eth/v1alpha1" @@ -18,15 +20,18 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { return errors.Wrap(err, "could not tree hash attestation") } - var atts []*ethpb.Attestation - d, ok := p.aggregatedAtt.Get(string(r[:])) + d, expTime, ok := p.aggregatedAtt.GetWithExpiration(string(r[:])) + // If we have not seen the attestation data before, store in in cache with + // the default expiration time out. if !ok { - atts = make([]*ethpb.Attestation, 0) - } else { - atts, ok = d.([]*ethpb.Attestation) - if !ok { - return errors.New("cached value is not of type []*ethpb.Attestation") - } + atts := []*ethpb.Attestation{att} + p.aggregatedAtt.Set(string(r[:]), atts, cache.DefaultExpiration) + return nil + } + + atts, ok := d.([]*ethpb.Attestation) + if !ok { + return errors.New("cached value is not of type []*ethpb.Attestation") } atts, err = helpers.AggregateAttestations(append(atts, att)) @@ -34,9 +39,14 @@ func (p *AttCaches) SaveAggregatedAttestation(att *ethpb.Attestation) error { return err } - // DefaultExpiration is set to what was given to New(). In this case - // it's one epoch. - p.aggregatedAtt.Set(string(r[:]), atts, cache.DefaultExpiration) + // Delete attestation if the current time has passed the expiration time. + if time.Now().Unix() >= expTime.Unix() { + p.aggregatedAtt.Delete(string(r[:])) + return nil + } + // Reset expiration time given how much time has passed. + expDuration := time.Duration(expTime.Unix() - time.Now().Unix()) + p.aggregatedAtt.Set(string(r[:]), atts, expDuration*time.Second) return nil } @@ -53,6 +63,9 @@ func (p *AttCaches) SaveAggregatedAttestations(atts []*ethpb.Attestation) error // AggregatedAttestations returns the aggregated attestations in cache. func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation { + // Delete all expired aggregated attestations before returning them. + p.aggregatedAtt.DeleteExpired() + atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount()) for s, i := range p.aggregatedAtt.Items() { // Type assertion for the worst case. This shouldn't happen. @@ -70,6 +83,9 @@ func (p *AttCaches) AggregatedAttestations() []*ethpb.Attestation { // AggregatedAttestationsBySlotIndex returns the aggregated attestations in cache, // filtered by committee index and slot. func (p *AttCaches) AggregatedAttestationsBySlotIndex(slot uint64, committeeIndex uint64) []*ethpb.Attestation { + // Delete all expired aggregated attestations before returning them. + p.aggregatedAtt.DeleteExpired() + atts := make([]*ethpb.Attestation, 0, p.aggregatedAtt.ItemCount()) for s, i := range p.aggregatedAtt.Items() { @@ -97,7 +113,7 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error { if err != nil { return errors.Wrap(err, "could not tree hash attestation data") } - a, ok := p.aggregatedAtt.Get(string(r[:])) + a, expTime, ok := p.aggregatedAtt.GetWithExpiration(string(r[:])) if !ok { return nil } @@ -114,7 +130,14 @@ func (p *AttCaches) DeleteAggregatedAttestation(att *ethpb.Attestation) error { if len(filtered) == 0 { p.aggregatedAtt.Delete(string(r[:])) } else { - p.aggregatedAtt.Set(string(r[:]), filtered, cache.DefaultExpiration) + // Delete attestation if the current time has passed the expiration time. + if time.Now().Unix() >= expTime.Unix() { + p.aggregatedAtt.Delete(string(r[:])) + return nil + } + // Reset expiration time given how much time has passed. + expDuration := time.Duration(expTime.Unix() - time.Now().Unix()) + p.aggregatedAtt.Set(string(r[:]), filtered, expDuration*time.Second) } return nil diff --git a/beacon-chain/operations/attestations/kv/aggregated_test.go b/beacon-chain/operations/attestations/kv/aggregated_test.go index fed17010754..8edba8611e2 100644 --- a/beacon-chain/operations/attestations/kv/aggregated_test.go +++ b/beacon-chain/operations/attestations/kv/aggregated_test.go @@ -51,6 +51,64 @@ func TestKV_Aggregated_CanSaveRetrieve(t *testing.T) { } } +func TestKV_Aggregated_SaveAndVerifyExpireTime(t *testing.T) { + cache := NewAttCaches() + + d := ðpb.AttestationData{Slot: 1} + att1 := ðpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11100}} + att2 := ðpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10110}} + att3 := ðpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11011}} + + r, err := ssz.HashTreeRoot(d) + if err != nil { + t.Fatal(err) + } + + if err := cache.SaveAggregatedAttestation(att1); err != nil { + t.Fatal(err) + } + a, expTime, ok := cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if !ok { + t.Fatal("Did not save attestations") + } + if len(a.([]*ethpb.Attestation)) != 1 { + t.Fatal("Did not save attestations") + } + + // Let time pass by one second to test expiration time. + time.Sleep(1 * time.Second) + // Save attestation 2 too the pool, the expiration time should not change. + if err := cache.SaveAggregatedAttestation(att2); err != nil { + t.Fatal(err) + } + newA, newExpTime, ok := cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if !ok { + t.Fatal("Did not save attestations") + } + if len(newA.([]*ethpb.Attestation)) != 2 { + t.Fatal("Did not delete attestations") + } + + if expTime.Unix() != newExpTime.Unix() { + t.Error("Expiration time should not change") + } + + // Let time pass by another second to test expiration time. + time.Sleep(1 * time.Second) + // Save attestation 3 too the pool, the expiration time should not change. + if err := cache.SaveAggregatedAttestation(att3); err != nil { + t.Fatal(err) + } + newA, newExpTime, _ = cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if len(newA.([]*ethpb.Attestation)) != 3 { + t.Fatal("Did not delete attestations") + } + + if expTime.Unix() != newExpTime.Unix() { + t.Error("Expiration time should not change") + } +} + func TestKV_Aggregated_CanDelete(t *testing.T) { cache := NewAttCaches() @@ -80,6 +138,63 @@ func TestKV_Aggregated_CanDelete(t *testing.T) { } } +func TestKV_Aggregated_DeleteAndVerifyExpireTime(t *testing.T) { + cache := NewAttCaches() + + d := ðpb.AttestationData{Slot: 1} + att1 := ðpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11100}} + att2 := ðpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b10110}} + att3 := ðpb.Attestation{Data: d, AggregationBits: bitfield.Bitlist{0b11011}} + atts := []*ethpb.Attestation{att1, att2, att3} + for _, att := range atts { + if err := cache.SaveAggregatedAttestation(att); err != nil { + t.Fatal(err) + } + } + r, err := ssz.HashTreeRoot(d) + if err != nil { + t.Fatal(err) + } + + a, expTime, ok := cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if !ok { + t.Fatal("Did not save attestations") + } + if len(a.([]*ethpb.Attestation)) != 3 { + t.Fatal("Did not save attestations") + } + + // Let time pass by one second to test expiration time. + time.Sleep(1 * time.Second) + // Delete attestation 1 from the pool, the expiration time should not change. + if err := cache.DeleteAggregatedAttestation(att1); err != nil { + t.Fatal(err) + } + newA, newExpTime, _ := cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if len(newA.([]*ethpb.Attestation)) != 2 { + t.Fatal("Did not delete attestations") + } + + if expTime.Unix() != newExpTime.Unix() { + t.Error("Expiration time should not change") + } + + // Let time pass by another second to test expiration time. + time.Sleep(1 * time.Second) + // Delete attestation 1 from the pool, the expiration time should not change. + if err := cache.DeleteAggregatedAttestation(att2); err != nil { + t.Fatal(err) + } + newA, newExpTime, _ = cache.aggregatedAtt.GetWithExpiration(string(r[:])) + if len(newA.([]*ethpb.Attestation)) != 1 { + t.Fatal("Did not delete attestations") + } + + if expTime.Unix() != newExpTime.Unix() { + t.Error("Expiration time should not change") + } +} + func TestKV_Aggregated_CheckExpTime(t *testing.T) { cache := NewAttCaches() diff --git a/beacon-chain/operations/attestations/kv/kv.go b/beacon-chain/operations/attestations/kv/kv.go index 362738c61e2..ab9b2698d6d 100644 --- a/beacon-chain/operations/attestations/kv/kv.go +++ b/beacon-chain/operations/attestations/kv/kv.go @@ -23,12 +23,12 @@ func NewAttCaches() *AttCaches { secsInEpoch := time.Duration(params.BeaconConfig().SlotsPerEpoch * params.BeaconConfig().SecondsPerSlot) // Create caches with default expiration time of one epoch and which - // purges expired items every other epoch. + // purges expired items every epoch. pool := &AttCaches{ - unAggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), - aggregatedAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), - forkchoiceAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), - blockAtt: cache.New(secsInEpoch*time.Second, 2*secsInEpoch*time.Second), + unAggregatedAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second), + aggregatedAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second), + forkchoiceAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second), + blockAtt: cache.New(secsInEpoch*time.Second, secsInEpoch*time.Second), } return pool