/
attester.go
144 lines (124 loc) · 5.36 KB
/
attester.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
package validator
import (
"context"
"github.com/theQRL/qrysm/v4/beacon-chain/cache"
"github.com/theQRL/qrysm/v4/beacon-chain/core/feed"
"github.com/theQRL/qrysm/v4/beacon-chain/core/feed/operation"
"github.com/theQRL/qrysm/v4/beacon-chain/core/helpers"
"github.com/theQRL/qrysm/v4/beacon-chain/rpc/core"
"github.com/theQRL/qrysm/v4/consensus-types/primitives"
"github.com/theQRL/qrysm/v4/crypto/dilithium"
zondpb "github.com/theQRL/qrysm/v4/proto/prysm/v1alpha1"
"github.com/theQRL/qrysm/v4/time/slots"
"go.opencensus.io/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
)
// GetAttestationData asks the beacon node to produce the attestation data for the
// requested slot and committee index, which the attesting validator then signs.
//
// It returns an Unavailable status while the node is syncing, forwards the error
// from the optimistic-status check unchanged, and otherwise delegates to the core
// service, translating its error reason into the matching gRPC status code.
func (vs *Server) GetAttestationData(ctx context.Context, req *zondpb.AttestationDataRequest) (*zondpb.AttestationData, error) {
	ctx, span := trace.StartSpan(ctx, "AttesterServer.RequestAttestation")
	defer span.End()

	// Tag the span with the request coordinates.
	slotAttr := trace.Int64Attribute("slot", int64(req.Slot))
	committeeAttr := trace.Int64Attribute("committeeIndex", int64(req.CommitteeIndex))
	span.AddAttributes(slotAttr, committeeAttr)

	if vs.SyncChecker.Syncing() {
		return nil, status.Error(codes.Unavailable, "Syncing to latest head, not ready to respond")
	}

	// An optimistic validator MUST NOT participate in attestation (i.e., sign across the
	// DOMAIN_BEACON_ATTESTER, DOMAIN_SELECTION_PROOF or DOMAIN_AGGREGATE_AND_PROOF domains).
	optErr := vs.optimisticStatus(ctx)
	if optErr != nil {
		return nil, optErr
	}

	data, rpcErr := vs.CoreService.GetAttestationData(ctx, req)
	if rpcErr != nil {
		grpcCode := core.ErrorReasonToGRPC(rpcErr.Reason)
		return nil, status.Errorf(grpcCode, "Could not get attestation data: %v", rpcErr.Err)
	}
	return data, nil
}
// ProposeAttestation is called by an attester to vote on a block via an
// attestation object as defined in the Ethereum Serenity specification.
//
// The request is rejected with InvalidArgument when the attestation or its data
// is missing, or when the signature bytes cannot be parsed. Otherwise the
// attestation is announced on the operation feed, broadcast to the subnet
// computed from its committee index and slot, and saved to the unaggregated
// attestation pool in a background goroutine. On success the response carries
// the hash tree root of the attestation data.
func (vs *Server) ProposeAttestation(ctx context.Context, att *zondpb.Attestation) (*zondpb.AttestResponse, error) {
	ctx, span := trace.StartSpan(ctx, "AttesterServer.ProposeAttestation")
	defer span.End()

	// Reject incomplete requests up front: the code below reads att.Signature,
	// att.Data.Slot and att.Data.CommitteeIndex and would panic on a nil pointer.
	if att == nil || att.Data == nil {
		return nil, status.Error(codes.InvalidArgument, "Attestation or attestation data is nil")
	}

	if _, err := dilithium.SignatureFromBytes(att.Signature); err != nil {
		return nil, status.Error(codes.InvalidArgument, "Incorrect attestation signature")
	}

	root, err := att.Data.HashTreeRoot()
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not tree hash attestation: %v", err)
	}

	// Broadcast the unaggregated attestation on a feed to notify other services in the beacon node
	// of a received unaggregated attestation.
	vs.OperationNotifier.OperationFeed().Send(&feed.Event{
		Type: operation.UnaggregatedAttReceived,
		Data: &operation.UnAggregatedAttReceivedData{
			Attestation: att,
		},
	})

	// Determine subnet to broadcast attestation to.
	wantedEpoch := slots.ToEpoch(att.Data.Slot)
	vals, err := vs.HeadFetcher.HeadValidatorsIndices(ctx, wantedEpoch)
	if err != nil {
		// Wrap in a gRPC status, matching how SubscribeCommitteeSubnets reports the same failure.
		return nil, status.Errorf(codes.Internal, "Could not retrieve head validator indices: %v", err)
	}
	subnet := helpers.ComputeSubnetFromCommitteeAndSlot(uint64(len(vals)), att.Data.CommitteeIndex, att.Data.Slot)

	// Broadcast the new attestation to the network.
	if err := vs.P2P.BroadcastAttestation(ctx, subnet, att); err != nil {
		return nil, status.Errorf(codes.Internal, "Could not broadcast attestation: %v", err)
	}

	// Save a copy to the pool in the background so the RPC does not wait on it;
	// a failure here is only logged and does not affect the response.
	go func() {
		attCopy := zondpb.CopyAttestation(att)
		if err := vs.AttPool.SaveUnaggregatedAttestation(attCopy); err != nil {
			log.WithError(err).Error("Could not handle attestation in operations service")
		}
	}()

	return &zondpb.AttestResponse{
		AttestationDataRoot: root[:],
	}, nil
}
// SubscribeCommitteeSubnets registers, for every (slot, committee ID) pair in the
// request, the subnet computed from that pair in the attester subnet cache, and
// also in the aggregator subnet cache when the matching IsAggregator flag is set.
//
// The three request slices must be non-empty and of equal length; otherwise an
// InvalidArgument status is returned. A failure to fetch the head validator
// indices yields an Internal status.
func (vs *Server) SubscribeCommitteeSubnets(ctx context.Context, req *zondpb.CommitteeSubnetsSubscribeRequest) (*emptypb.Empty, error) {
	ctx, span := trace.StartSpan(ctx, "AttesterServer.SubscribeCommitteeSubnets")
	defer span.End()

	slotCount := len(req.Slots)
	if slotCount != len(req.CommitteeIds) || len(req.CommitteeIds) != len(req.IsAggregator) {
		return nil, status.Error(codes.InvalidArgument, "request fields are not the same length")
	}
	if slotCount == 0 {
		return nil, status.Error(codes.InvalidArgument, "no attester slots provided")
	}

	// validatorCount returns how many head validator indices exist for the epoch containing slot.
	validatorCount := func(slot primitives.Slot) (uint64, error) {
		indices, fetchErr := vs.HeadFetcher.HeadValidatorsIndices(ctx, slots.ToEpoch(slot))
		if fetchErr != nil {
			return 0, fetchErr
		}
		return uint64(len(indices)), nil
	}

	// Seed the count from the epoch of the first requested slot.
	count, err := validatorCount(req.Slots[0])
	if err != nil {
		return nil, status.Errorf(codes.Internal, "Could not retrieve head validator length: %v", err)
	}
	epoch := slots.ToEpoch(req.Slots[0])

	for i, slot := range req.Slots {
		// Only re-fetch the validator count when this slot's epoch differs from the previous one.
		if slotEpoch := slots.ToEpoch(slot); slotEpoch != epoch {
			if count, err = validatorCount(slot); err != nil {
				return nil, status.Errorf(codes.Internal, "Could not retrieve head validator length: %v", err)
			}
			epoch = slotEpoch
		}

		subnet := helpers.ComputeSubnetFromCommitteeAndSlot(count, req.CommitteeIds[i], slot)
		cache.SubnetIDs.AddAttesterSubnetID(slot, subnet)
		if req.IsAggregator[i] {
			cache.SubnetIDs.AddAggregatorSubnetID(slot, subnet)
		}
	}

	return &emptypb.Empty{}, nil
}