// File: proposer_attestations.go

package validator
import (
"context"
"sort"
"github.com/pkg/errors"
"github.com/prysmaticlabs/go-bitfield"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/altair"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/blocks"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/core/helpers"
"github.com/prysmaticlabs/prysm/v3/beacon-chain/state"
"github.com/prysmaticlabs/prysm/v3/config/params"
"github.com/prysmaticlabs/prysm/v3/consensus-types/primitives"
ethpb "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/attestation/aggregation"
attaggregation "github.com/prysmaticlabs/prysm/v3/proto/prysm/v1alpha1/attestation/aggregation/attestations"
"github.com/prysmaticlabs/prysm/v3/runtime/version"
"go.opencensus.io/trace"
)
// proposerAtts is a list of attestations handled on the block proposal path.
// The filter, dedup, sortByProfitability and limitToMaxAttestations helpers
// are defined as methods on this type.
type proposerAtts []*ethpb.Attestation
// packAttestations assembles the attestations to be placed into a proposed block.
//
// Aggregated and unaggregated attestations are read from the pool and each set is
// passed through validateAndDeleteAttsInPool against latestState. The two sets are
// then merged and deduplicated, grouped by the hash tree root of their attestation
// data, aggregated per group, deduplicated again, ordered by profitability and
// finally truncated to the maximum number of attestations per block.
//
// Any error from the pool, hashing, aggregation, deduplication or sorting aborts
// the packing and is returned to the caller.
func (vs *Server) packAttestations(ctx context.Context, latestState state.BeaconState) ([]*ethpb.Attestation, error) {
	ctx, span := trace.StartSpan(ctx, "ProposerServer.packAttestations")
	defer span.End()

	aggregated, err := vs.validateAndDeleteAttsInPool(ctx, latestState, vs.AttPool.AggregatedAttestations())
	if err != nil {
		return nil, errors.Wrap(err, "could not filter attestations")
	}

	unaggregated, err := vs.AttPool.UnaggregatedAttestations()
	if err != nil {
		return nil, errors.Wrap(err, "could not get unaggregated attestations")
	}
	unaggregated, err = vs.validateAndDeleteAttsInPool(ctx, latestState, unaggregated)
	if err != nil {
		return nil, errors.Wrap(err, "could not filter attestations")
	}

	// Drop duplicates across the aggregated and unaggregated sets up front, so
	// that the aggregation step below does not build inefficient aggregates.
	pooled, err := proposerAtts(append(aggregated, unaggregated...)).dedup()
	if err != nil {
		return nil, err
	}

	// Only attestations sharing the same data are aggregated together, so bucket
	// them by the root of their data first.
	groups := make(map[[32]byte][]*ethpb.Attestation, len(pooled))
	for _, att := range pooled {
		root, err := att.Data.HashTreeRoot()
		if err != nil {
			return nil, err
		}
		groups[root] = append(groups[root], att)
	}

	candidates := make(proposerAtts, 0)
	for _, group := range groups {
		merged, err := attaggregation.Aggregate(group)
		if err != nil {
			return nil, err
		}
		candidates = append(candidates, merged...)
	}

	unique, err := candidates.dedup()
	if err != nil {
		return nil, err
	}
	ordered, err := unique.sortByProfitability()
	if err != nil {
		return nil, err
	}
	return ordered.limitToMaxAttestations(), nil
}
// filter splits the attestation list into two groups: valid and invalid attestations.
// An attestation is valid when the fork-specific ProcessAttestationNoVerifySignature
// routine accepts it against st without returning an error; every other attestation
// lands in the invalid group, which callers are expected to delete.
// If the state version is neither Phase0 nor Altair-or-later, both returned lists are empty.
func (a proposerAtts) filter(ctx context.Context, st state.BeaconState) (proposerAtts, proposerAtts) {
	valid := make([]*ethpb.Attestation, 0, len(a))
	invalid := make([]*ethpb.Attestation, 0, len(a))

	var process func(context.Context, state.BeaconState, *ethpb.Attestation) (state.BeaconState, error)
	switch v := st.Version(); {
	case v == version.Phase0:
		process = blocks.ProcessAttestationNoVerifySignature
	case v >= version.Altair:
		// The Altair processor takes an extra total-balance argument, so adapt it
		// to the common function signature with a wrapper.
		process = func(ctx context.Context, s state.BeaconState, att *ethpb.Attestation) (state.BeaconState, error) {
			totalBalance, err := helpers.TotalActiveBalance(s)
			if err != nil {
				return nil, err
			}
			return altair.ProcessAttestationNoVerifySignature(ctx, s, att, totalBalance)
		}
	default:
		// Unknown state type: nothing can be classified.
		return valid, invalid
	}

	for _, att := range a {
		if _, err := process(ctx, st, att); err != nil {
			invalid = append(invalid, att)
			continue
		}
		valid = append(valid, att)
	}
	return valid, invalid
}
// sortByProfitability orders attestations by highest slot and, within a slot, by
// highest aggregation bit count. Lists with fewer than two attestations are
// returned as is; everything else is delegated to sortByProfitabilityUsingMaxCover.
func (a proposerAtts) sortByProfitability() (proposerAtts, error) {
	if len(a) >= 2 {
		return a.sortByProfitabilityUsingMaxCover()
	}
	return a, nil
}
// sortByProfitabilityUsingMaxCover orders attestations by highest slot and, within
// a slot, by highest aggregation bit count. Within a slot the attestations picked
// by the max-cover selection come first, followed by those that were not picked.
//
// An error is returned only when aggregation bits cannot be converted to a
// Bitlist64. If the max-cover selection itself fails, the attestations of that
// slot are kept in their incoming order.
func (a proposerAtts) sortByProfitabilityUsingMaxCover() (proposerAtts, error) {
	// Bucket attestations by slot, as slot number takes precedence when sorting.
	// slots records each distinct slot once.
	var slots []primitives.Slot
	bySlot := map[primitives.Slot]proposerAtts{}
	for _, att := range a {
		slot := att.Data.Slot
		if _, seen := bySlot[slot]; !seen {
			slots = append(slots, slot)
		}
		bySlot[slot] = append(bySlot[slot], att)
	}

	// byBitCountDesc sorts list in place, most aggregation bits first.
	byBitCountDesc := func(list proposerAtts) {
		sort.Slice(list, func(i, j int) bool {
			return list[i].AggregationBits.Count() > list[j].AggregationBits.Count()
		})
	}

	// orderSlot arranges the attestations of a single slot.
	orderSlot := func(atts proposerAtts) (proposerAtts, error) {
		if len(atts) < 2 {
			return atts, nil
		}
		candidates := make([]*bitfield.Bitlist64, len(atts))
		for i, att := range atts {
			bits, err := att.AggregationBits.ToBitlist64()
			if err != nil {
				return nil, err
			}
			candidates[i] = bits
		}

		selectedKeys, _, err := aggregation.MaxCover(candidates, len(candidates), true /* allowOverlaps */)
		if err != nil {
			// Selection failed: fall back to the unsorted list rather than failing.
			return atts, nil
		}

		// Selected attestations go on top, the leftovers are appended at the bottom.
		// Both lists are sorted by number of bits set.
		leftoverKeys := selectedKeys.Not()
		selected := make(proposerAtts, selectedKeys.Count())
		leftover := make(proposerAtts, leftoverKeys.Count())
		for i, key := range selectedKeys.BitIndices() {
			selected[i] = atts[key]
		}
		for i, key := range leftoverKeys.BitIndices() {
			leftover[i] = atts[key]
		}
		byBitCountDesc(selected)
		byBitCountDesc(leftover)
		return append(selected, leftover...), nil
	}

	// Walk the slots from highest to lowest and concatenate the per-slot orderings.
	sort.Slice(slots, func(i, j int) bool {
		return slots[i] > slots[j]
	})
	var sorted proposerAtts
	for _, slot := range slots {
		picked, err := orderSlot(bySlot[slot])
		if err != nil {
			return nil, err
		}
		sorted = append(sorted, picked...)
	}
	return sorted, nil
}
// limitToMaxAttestations truncates the list to the configured maximum number of
// attestations per block. Shorter lists are returned unchanged.
func (a proposerAtts) limitToMaxAttestations() proposerAtts {
	limit := params.BeaconConfig().MaxAttestations
	if uint64(len(a)) <= limit {
		return a
	}
	return a[:limit]
}
// dedup removes duplicate attestations (ones with the same bits set on).
// Important: not only exact duplicates are removed, but proper subsets are removed too
// (their known bits are redundant and are already contained in their supersets).
//
// Attestations are only compared with others that share the same attestation data
// (same hash tree root). Lists with fewer than two attestations are returned as is.
// The order of the returned attestations is not guaranteed, since groups are
// collected by iterating over a map.
//
// An error is returned when the data of an attestation cannot be hashed or when
// two aggregation bitlists cannot be compared. Hashing failures abort the call
// rather than silently dropping the affected attestation.
func (a proposerAtts) dedup() (proposerAtts, error) {
	if len(a) < 2 {
		return a, nil
	}

	attsByDataRoot := make(map[[32]byte][]*ethpb.Attestation, len(a))
	for _, att := range a {
		attDataRoot, err := att.Data.HashTreeRoot()
		if err != nil {
			return nil, errors.Wrap(err, "could not hash attestation data")
		}
		attsByDataRoot[attDataRoot] = append(attsByDataRoot[attDataRoot], att)
	}

	uniqAtts := make([]*ethpb.Attestation, 0, len(a))
	for _, atts := range attsByDataRoot {
		for i := 0; i < len(atts); i++ {
			cur := atts[i]
			for j := i + 1; j < len(atts); j++ {
				other := atts[j]

				curHasOther, err := cur.AggregationBits.Contains(other.AggregationBits)
				if err != nil {
					return nil, err
				}
				if curHasOther {
					// cur contains other, so other is redundant. Overwrite it with the
					// last element, shrink the slice and re-examine index j.
					last := len(atts) - 1
					atts[j] = atts[last]
					atts[last] = nil
					atts = atts[:last]
					j--
					continue
				}

				otherHasCur, err := other.AggregationBits.Contains(cur.AggregationBits)
				if err != nil {
					return nil, err
				}
				if otherHasCur {
					// other contains cur, so cur is redundant. Overwrite it with the
					// last element, shrink the slice and restart the outer loop at
					// index i (the decrement cancels the loop's increment).
					last := len(atts) - 1
					atts[i] = atts[last]
					atts[last] = nil
					atts = atts[:last]
					i--
					break
				}
			}
		}
		uniqAtts = append(uniqAtts, atts...)
	}
	return uniqAtts, nil
}
// validateAndDeleteAttsInPool splits atts into valid and invalid attestations with
// respect to st, deletes the invalid ones from the attestation pool and returns the
// valid ones, which may be packaged inside a beacon block.
// If the deletion fails, its error is returned and no attestations are returned.
func (vs *Server) validateAndDeleteAttsInPool(ctx context.Context, st state.BeaconState, atts []*ethpb.Attestation) ([]*ethpb.Attestation, error) {
	ctx, span := trace.StartSpan(ctx, "ProposerServer.validateAndDeleteAttsInPool")
	defer span.End()

	keep, drop := proposerAtts(atts).filter(ctx, st)
	err := vs.deleteAttsInPool(ctx, drop)
	if err != nil {
		return nil, err
	}
	return keep, nil
}
// deleteAttsInPool removes the given attestations from the attestation pool so that
// proposers do not include them in a future block. Aggregated attestations are
// removed through DeleteAggregatedAttestation, all others through
// DeleteUnaggregatedAttestation.
// The context is checked before every deletion; processing stops at the first
// context or deletion error, which is returned.
func (vs *Server) deleteAttsInPool(ctx context.Context, atts []*ethpb.Attestation) error {
	ctx, span := trace.StartSpan(ctx, "ProposerServer.deleteAttsInPool")
	defer span.End()

	for _, att := range atts {
		if err := ctx.Err(); err != nil {
			return err
		}

		var err error
		if helpers.IsAggregated(att) {
			err = vs.AttPool.DeleteAggregatedAttestation(att)
		} else {
			err = vs.AttPool.DeleteUnaggregatedAttestation(att)
		}
		if err != nil {
			return err
		}
	}
	return nil
}