-
Notifications
You must be signed in to change notification settings - Fork 5
/
handlers_pool.go
300 lines (272 loc) · 10.3 KB
/
handlers_pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
package beacon
import (
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strconv"
"strings"
"github.com/theQRL/qrysm/v4/beacon-chain/core/blocks"
"github.com/theQRL/qrysm/v4/beacon-chain/core/feed"
"github.com/theQRL/qrysm/v4/beacon-chain/core/feed/operation"
corehelpers "github.com/theQRL/qrysm/v4/beacon-chain/core/helpers"
"github.com/theQRL/qrysm/v4/beacon-chain/core/transition"
"github.com/theQRL/qrysm/v4/beacon-chain/rpc/core"
"github.com/theQRL/qrysm/v4/beacon-chain/rpc/eth/shared"
consensus_types "github.com/theQRL/qrysm/v4/consensus-types"
"github.com/theQRL/qrysm/v4/consensus-types/primitives"
"github.com/theQRL/qrysm/v4/crypto/bls"
http2 "github.com/theQRL/qrysm/v4/network/http"
ethpbalpha "github.com/theQRL/qrysm/v4/proto/prysm/v1alpha1"
"github.com/theQRL/qrysm/v4/time/slots"
"go.opencensus.io/trace"
)
// ListAttestations retrieves attestations known by the node but
// not necessarily incorporated into any block. Allows filtering by committee index or slot.
func (s *Server) ListAttestations(w http.ResponseWriter, r *http.Request) {
_, span := trace.StartSpan(r.Context(), "beacon.ListAttestations")
defer span.End()
ok, rawSlot, slot := shared.UintFromQuery(w, r, "slot")
if !ok {
return
}
ok, rawCommitteeIndex, committeeIndex := shared.UintFromQuery(w, r, "committee_index")
if !ok {
return
}
attestations := s.AttestationsPool.AggregatedAttestations()
unaggAtts, err := s.AttestationsPool.UnaggregatedAttestations()
if err != nil {
http2.HandleError(w, "Could not get unaggregated attestations: "+err.Error(), http.StatusInternalServerError)
return
}
attestations = append(attestations, unaggAtts...)
isEmptyReq := rawSlot == "" && rawCommitteeIndex == ""
if isEmptyReq {
allAtts := make([]*shared.Attestation, len(attestations))
for i, att := range attestations {
allAtts[i] = shared.AttestationFromConsensus(att)
}
http2.WriteJson(w, &ListAttestationsResponse{Data: allAtts})
return
}
bothDefined := rawSlot != "" && rawCommitteeIndex != ""
filteredAtts := make([]*shared.Attestation, 0, len(attestations))
for _, att := range attestations {
committeeIndexMatch := rawCommitteeIndex != "" && att.Data.CommitteeIndex == primitives.CommitteeIndex(committeeIndex)
slotMatch := rawSlot != "" && att.Data.Slot == primitives.Slot(slot)
shouldAppend := (bothDefined && committeeIndexMatch && slotMatch) || (!bothDefined && (committeeIndexMatch || slotMatch))
if shouldAppend {
filteredAtts = append(filteredAtts, shared.AttestationFromConsensus(att))
}
}
http2.WriteJson(w, &ListAttestationsResponse{Data: filteredAtts})
}
// SubmitAttestations submits an attestation object to node. If the attestation passes all validation
// constraints, node MUST publish the attestation on an appropriate subnet.
func (s *Server) SubmitAttestations(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitAttestations")
defer span.End()
var req SubmitAttestationsRequest
err := json.NewDecoder(r.Body).Decode(&req.Data)
switch {
case err == io.EOF:
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
case err != nil:
http2.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
return
}
if len(req.Data) == 0 {
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
}
var validAttestations []*ethpbalpha.Attestation
var attFailures []*shared.IndexedVerificationFailure
for i, sourceAtt := range req.Data {
att, err := sourceAtt.ToConsensus()
if err != nil {
attFailures = append(attFailures, &shared.IndexedVerificationFailure{
Index: i,
Message: "Could not convert request attestation to consensus attestation: " + err.Error(),
})
continue
}
if _, err = bls.SignatureFromBytes(att.Signature); err != nil {
attFailures = append(attFailures, &shared.IndexedVerificationFailure{
Index: i,
Message: "Incorrect attestation signature: " + err.Error(),
})
continue
}
// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
// of a received unaggregated attestation.
// Note we can't send for aggregated att because we don't have selection proof.
if !corehelpers.IsAggregated(att) {
s.OperationNotifier.OperationFeed().Send(&feed.Event{
Type: operation.UnaggregatedAttReceived,
Data: &operation.UnAggregatedAttReceivedData{
Attestation: att,
},
})
}
validAttestations = append(validAttestations, att)
}
failedBroadcasts := make([]string, 0)
for i, att := range validAttestations {
// Determine subnet to broadcast attestation to
wantedEpoch := slots.ToEpoch(att.Data.Slot)
vals, err := s.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
if err != nil {
http2.HandleError(w, "Could not get head validator indices: "+err.Error(), http.StatusInternalServerError)
return
}
subnet := corehelpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot)
if err = s.Broadcaster.BroadcastAttestation(ctx, subnet, att); err != nil {
failedBroadcasts = append(failedBroadcasts, strconv.Itoa(i))
log.WithError(err).Errorf("could not broadcast attestation at index %d", i)
}
if corehelpers.IsAggregated(att) {
if err = s.AttestationsPool.SaveAggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save aggregated attestation")
}
} else {
if err = s.AttestationsPool.SaveUnaggregatedAttestation(att); err != nil {
log.WithError(err).Error("could not save unaggregated attestation")
}
}
}
if len(failedBroadcasts) > 0 {
http2.HandleError(
w,
fmt.Sprintf("Attestations at index %s could not be broadcasted", strings.Join(failedBroadcasts, ", ")),
http.StatusInternalServerError,
)
return
}
if len(attFailures) > 0 {
failuresErr := &shared.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: "One or more attestations failed validation",
Failures: attFailures,
}
http2.WriteError(w, failuresErr)
}
}
// ListVoluntaryExits retrieves voluntary exits known by the node but
// not necessarily incorporated into any block.
func (s *Server) ListVoluntaryExits(w http.ResponseWriter, r *http.Request) {
_, span := trace.StartSpan(r.Context(), "beacon.ListVoluntaryExits")
defer span.End()
sourceExits, err := s.VoluntaryExitsPool.PendingExits()
if err != nil {
http2.HandleError(w, "Could not get exits from the pool: "+err.Error(), http.StatusInternalServerError)
return
}
exits := make([]*shared.SignedVoluntaryExit, len(sourceExits))
for i, e := range sourceExits {
exits[i] = shared.SignedVoluntaryExitFromConsensus(e)
}
http2.WriteJson(w, &ListVoluntaryExitsResponse{Data: exits})
}
// SubmitVoluntaryExit submits a SignedVoluntaryExit object to node's pool
// and if passes validation node MUST broadcast it to network.
func (s *Server) SubmitVoluntaryExit(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitVoluntaryExit")
defer span.End()
var req shared.SignedVoluntaryExit
err := json.NewDecoder(r.Body).Decode(&req)
switch {
case err == io.EOF:
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
case err != nil:
http2.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
return
}
exit, err := req.ToConsensus()
if err != nil {
http2.HandleError(w, "Could not convert request exit to consensus exit: "+err.Error(), http.StatusBadRequest)
return
}
headState, err := s.ChainInfoFetcher.HeadState(ctx)
if err != nil {
http2.HandleError(w, "Could not get head state: "+err.Error(), http.StatusInternalServerError)
return
}
epochStart, err := slots.EpochStart(exit.Exit.Epoch)
if err != nil {
http2.HandleError(w, "Could not get epoch start: "+err.Error(), http.StatusInternalServerError)
return
}
headState, err = transition.ProcessSlotsIfPossible(ctx, headState, epochStart)
if err != nil {
http2.HandleError(w, "Could not process slots: "+err.Error(), http.StatusInternalServerError)
return
}
val, err := headState.ValidatorAtIndexReadOnly(exit.Exit.ValidatorIndex)
if err != nil {
if errors.Is(err, consensus_types.ErrOutOfBounds) {
http2.HandleError(w, "Could not get validator: "+err.Error(), http.StatusBadRequest)
return
}
http2.HandleError(w, "Could not get validator: "+err.Error(), http.StatusInternalServerError)
return
}
if err = blocks.VerifyExitAndSignature(val, headState, exit); err != nil {
http2.HandleError(w, "Invalid exit: "+err.Error(), http.StatusBadRequest)
return
}
s.VoluntaryExitsPool.InsertVoluntaryExit(exit)
if err = s.Broadcaster.Broadcast(ctx, exit); err != nil {
http2.HandleError(w, "Could not broadcast exit: "+err.Error(), http.StatusInternalServerError)
return
}
}
// SubmitSyncCommitteeSignatures submits sync committee signature objects to the node.
func (s *Server) SubmitSyncCommitteeSignatures(w http.ResponseWriter, r *http.Request) {
ctx, span := trace.StartSpan(r.Context(), "beacon.SubmitPoolSyncCommitteeSignatures")
defer span.End()
var req SubmitSyncCommitteeSignaturesRequest
err := json.NewDecoder(r.Body).Decode(&req.Data)
switch {
case err == io.EOF:
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
case err != nil:
http2.HandleError(w, "Could not decode request body: "+err.Error(), http.StatusBadRequest)
return
}
if len(req.Data) == 0 {
http2.HandleError(w, "No data submitted", http.StatusBadRequest)
return
}
var validMessages []*ethpbalpha.SyncCommitteeMessage
var msgFailures []*shared.IndexedVerificationFailure
for i, sourceMsg := range req.Data {
msg, err := sourceMsg.ToConsensus()
if err != nil {
msgFailures = append(msgFailures, &shared.IndexedVerificationFailure{
Index: i,
Message: "Could not convert request message to consensus message: " + err.Error(),
})
continue
}
validMessages = append(validMessages, msg)
}
for _, msg := range validMessages {
if rpcerr := s.CoreService.SubmitSyncMessage(ctx, msg); rpcerr != nil {
http2.HandleError(w, "Could not submit message: "+rpcerr.Err.Error(), core.ErrorReasonToHTTP(rpcerr.Reason))
return
}
}
if len(msgFailures) > 0 {
failuresErr := &shared.IndexedVerificationFailureError{
Code: http.StatusBadRequest,
Message: "One or more messages failed validation",
Failures: msgFailures,
}
http2.WriteError(w, failuresErr)
}
}