/
prepare_forkchoice.go
154 lines (139 loc) · 4.58 KB
/
prepare_forkchoice.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
package attestations
import (
"bytes"
"context"
"errors"
"time"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v5/config/features"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/crypto/hash"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
attaggregation "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v5/time/slots"
"go.opencensus.io/trace"
)
// prepareForkChoiceAtts runs batchForkChoiceAtts on every tick of a slot
// interval ticker built from the configured aggregate intervals, until the
// service context is cancelled. The time spent in each run is recorded in
// batchForkChoiceAttsT1 (interval 0) or batchForkChoiceAttsT2 (interval 1).
//
// NOTE(review): the ticker is never explicitly released when the routine
// exits — confirm whether it exposes a stop/done method that should be called.
func (s *Service) prepareForkChoiceAtts() {
	intervals := features.Get().AggregateIntervals
	slotDuration := time.Duration(params.BeaconConfig().SecondsPerSlot) * time.Second

	// Networks with a short slot duration (Hive, e2e, etc) cannot fit the
	// default offsets inside one slot, so keep halving every offset until the
	// last one is strictly below the slot duration.
	for intervals[len(intervals)-1] >= slotDuration {
		for i := range intervals {
			intervals[i] /= 2
		}
	}

	genesis := time.Unix(int64(s.genesisTime), 0)
	ticker := slots.NewSlotTickerWithIntervals(genesis, intervals[:])
	for {
		select {
		case <-s.ctx.Done():
			log.Debug("Context closed, exiting routine")
			return
		case tick := <-ticker.C():
			start := time.Now()
			if err := s.batchForkChoiceAtts(s.ctx); err != nil {
				log.WithError(err).Error("Could not prepare attestations for fork choice")
			}
			// Only the first two intervals are measured; other ticks still
			// run the batch but record no metric.
			if tick.Interval == 0 {
				elapsed := time.Since(start)
				log.WithField("duration", elapsed).Debug("Aggregated unaggregated attestations")
				batchForkChoiceAttsT1.Observe(float64(elapsed.Milliseconds()))
			} else if tick.Interval == 1 {
				batchForkChoiceAttsT2.Observe(float64(time.Since(start).Milliseconds()))
			}
		}
	}
}
// batchForkChoiceAtts first aggregates the unaggregated attestations in the
// pool, then collects the attestations from the aggregated, block and fork
// choice pools. Attestations that have not been seen yet are grouped by the
// hash tree root of their attestation data, each group is aggregated, and the
// results are saved in the fork choice pool. Finally the block attestations
// that were read are deleted from the block pool.
//
// Processing stops at the first error, which is returned to the caller.
func (s *Service) batchForkChoiceAtts(ctx context.Context) error {
	ctx, span := trace.StartSpan(ctx, "Operations.attestations.batchForkChoiceAtts")
	defer span.End()

	if err := s.cfg.Pool.AggregateUnaggregatedAttestations(ctx); err != nil {
		return err
	}

	aggregatedAtts := s.cfg.Pool.AggregatedAttestations()
	// Snapshot the block attestations once. The same snapshot is used for
	// batching and for deletion below, so an attestation that lands in the
	// block pool while this function runs is never deleted unprocessed.
	blockAtts := s.cfg.Pool.BlockAttestations()
	forkchoiceAtts := s.cfg.Pool.ForkchoiceAttestations()

	// Build the combined list in a fresh slice rather than appending onto a
	// slice owned by the pool.
	atts := make([]*ethpb.Attestation, 0, len(aggregatedAtts)+len(blockAtts)+len(forkchoiceAtts))
	atts = append(atts, aggregatedAtts...)
	atts = append(atts, blockAtts...)
	atts = append(atts, forkchoiceAtts...)

	// Consolidate attestations by grouping them under their data root.
	attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(atts))
	for _, att := range atts {
		seen, err := s.seen(att)
		if err != nil {
			return err
		}
		if seen {
			continue
		}
		attDataRoot, err := att.Data.HashTreeRoot()
		if err != nil {
			return err
		}
		attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
	}

	for _, group := range attsByDataRoot {
		if err := s.aggregateAndSaveForkChoiceAtts(group); err != nil {
			return err
		}
	}

	// Delete exactly the block attestations that were batched above.
	for _, a := range blockAtts {
		if err := s.cfg.Pool.DeleteBlockAttestation(a); err != nil {
			return err
		}
	}
	return nil
}
// aggregateAndSaveForkChoiceAtts aggregates copies of the given attestations
// with attaggregation.Aggregate and saves the aggregated result in the fork
// choice pool. Aggregation runs on copies, not on the attestations passed in.
func (s *Service) aggregateAndSaveForkChoiceAtts(atts []*ethpb.Attestation) error {
	copies := make([]*ethpb.Attestation, 0, len(atts))
	for _, att := range atts {
		copies = append(copies, ethpb.CopyAttestation(att))
	}

	aggregated, err := attaggregation.Aggregate(copies)
	if err != nil {
		return err
	}
	return s.cfg.Pool.SaveForkchoiceAttestations(aggregated)
}
// seen reports whether the aggregation bits of att are already covered by the
// bits recorded in forkChoiceProcessedRoots for the same attestation data.
//
// It returns true when the recorded bitlist has the same length as the
// incoming one and either equals it or contains it. Otherwise it returns false
// and updates the cache entry: with the union of both bitlists when their
// lengths match, or with the incoming bits alone when there is no entry or the
// lengths differ. An error is returned if hashing the data fails, if the
// cached value is not a bitfield.Bitlist, or if a bitlist operation fails.
func (s *Service) seen(att *ethpb.Attestation) (bool, error) {
	root, err := hash.Proto(att.Data)
	if err != nil {
		return false, err
	}
	bits := att.AggregationBits

	cached, found := s.forkChoiceProcessedRoots.Get(root)
	if !found {
		// First time this attestation data is encountered.
		s.forkChoiceProcessedRoots.Add(root, bits)
		return false, nil
	}

	known, isBitlist := cached.(bitfield.Bitlist)
	if !isBitlist {
		return false, errors.New("not a bit field")
	}

	if known.Len() != bits.Len() {
		// Bitlists of different lengths are not compared; the incoming bits
		// replace the cached entry.
		s.forkChoiceProcessedRoots.Add(root, bits)
		return false, nil
	}

	// Every incoming bit has already been recorded.
	if bytes.Equal(known, bits) {
		return true, nil
	}
	covered, err := known.Contains(bits)
	if err != nil {
		return false, err
	}
	if covered {
		return true, nil
	}

	// The incoming attestation carries new bits: record the union.
	merged, err := bits.Or(known)
	if err != nil {
		return false, err
	}
	s.forkChoiceProcessedRoots.Add(root, merged)
	return false, nil
}