forked from ledgerwatch/erigon
-
Notifications
You must be signed in to change notification settings - Fork 0
/
sentry_api.go
157 lines (143 loc) · 4.82 KB
/
sentry_api.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
package sentry_multi_client
import (
"context"
"math/rand"
"github.com/holiman/uint256"
libcommon "github.com/nebojsa94/erigon/erigon-lib/common"
"github.com/nebojsa94/erigon/erigon-lib/gointerfaces"
proto_sentry "github.com/nebojsa94/erigon/erigon-lib/gointerfaces/sentry"
"google.golang.org/grpc"
"github.com/nebojsa94/erigon/eth/protocols/eth"
"github.com/nebojsa94/erigon/p2p/sentry"
"github.com/nebojsa94/erigon/rlp"
"github.com/nebojsa94/erigon/turbo/stages/bodydownload"
"github.com/nebojsa94/erigon/turbo/stages/headerdownload"
)
// Methods of sentry called by Core
func (cs *MultiClient) UpdateHead(ctx context.Context, height, time uint64, hash libcommon.Hash, td *uint256.Int) {
cs.lock.Lock()
defer cs.lock.Unlock()
cs.headHeight = height
cs.headTime = time
cs.headHash = hash
cs.headTd = td
statusMsg := cs.makeStatusData()
for _, sentry := range cs.sentries {
if !sentry.Ready() {
continue
}
if _, err := sentry.SetStatus(ctx, statusMsg, &grpc.EmptyCallOption{}); err != nil {
cs.logger.Error("Update status message for the sentry", "err", err)
}
}
}
func (cs *MultiClient) SendBodyRequest(ctx context.Context, req *bodydownload.BodyRequest) (peerID [64]byte, ok bool) {
// if sentry not found peers to send such message, try next one. stop if found.
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
continue
}
//log.Info(fmt.Sprintf("Sending body request for %v", req.BlockNums))
var bytes []byte
var err error
bytes, err = rlp.EncodeToBytes(ð.GetBlockBodiesPacket66{
RequestId: rand.Uint64(), // nolint: gosec
GetBlockBodiesPacket: req.Hashes,
})
if err != nil {
cs.logger.Error("Could not encode block bodies request", "err", err)
return [64]byte{}, false
}
outreq := proto_sentry.SendMessageByMinBlockRequest{
MinBlock: req.BlockNums[len(req.BlockNums)-1],
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_GET_BLOCK_BODIES_66,
Data: bytes,
},
MaxPeers: 1,
}
sentPeers, err1 := cs.sentries[i].SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{})
if err1 != nil {
cs.logger.Error("Could not send block bodies request", "err", err1)
return [64]byte{}, false
}
if sentPeers == nil || len(sentPeers.Peers) == 0 {
continue
}
return sentry.ConvertH512ToPeerID(sentPeers.Peers[0]), true
}
return [64]byte{}, false
}
func (cs *MultiClient) SendHeaderRequest(ctx context.Context, req *headerdownload.HeaderRequest) (peerID [64]byte, ok bool) {
// if sentry not found peers to send such message, try next one. stop if found.
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
continue
}
//log.Info(fmt.Sprintf("Sending header request {hash: %x, height: %d, length: %d}", req.Hash, req.Number, req.Length))
reqData := ð.GetBlockHeadersPacket66{
RequestId: rand.Uint64(), // nolint: gosec
GetBlockHeadersPacket: ð.GetBlockHeadersPacket{
Amount: req.Length,
Reverse: req.Reverse,
Skip: req.Skip,
Origin: eth.HashOrNumber{Hash: req.Hash},
},
}
if req.Hash == (libcommon.Hash{}) {
reqData.Origin.Number = req.Number
}
bytes, err := rlp.EncodeToBytes(reqData)
if err != nil {
cs.logger.Error("Could not encode header request", "err", err)
return [64]byte{}, false
}
minBlock := req.Number
outreq := proto_sentry.SendMessageByMinBlockRequest{
MinBlock: minBlock,
Data: &proto_sentry.OutboundMessageData{
Id: proto_sentry.MessageId_GET_BLOCK_HEADERS_66,
Data: bytes,
},
MaxPeers: 5,
}
sentPeers, err1 := cs.sentries[i].SendMessageByMinBlock(ctx, &outreq, &grpc.EmptyCallOption{})
if err1 != nil {
cs.logger.Error("Could not send header request", "err", err1)
return [64]byte{}, false
}
if sentPeers == nil || len(sentPeers.Peers) == 0 {
continue
}
return sentry.ConvertH512ToPeerID(sentPeers.Peers[0]), true
}
return [64]byte{}, false
}
func (cs *MultiClient) randSentryIndex() (int, bool, func() (int, bool)) {
var i int
if len(cs.sentries) > 1 {
i = rand.Intn(len(cs.sentries) - 1) // nolint: gosec
}
to := i
return i, true, func() (int, bool) {
i = (i + 1) % len(cs.sentries)
return i, i != to
}
}
// sending list of penalties to all sentries
func (cs *MultiClient) Penalize(ctx context.Context, penalties []headerdownload.PenaltyItem) {
for i := range penalties {
outreq := proto_sentry.PenalizePeerRequest{
PeerId: gointerfaces.ConvertHashToH512(penalties[i].PeerID),
Penalty: proto_sentry.PenaltyKind_Kick, // TODO: Extend penalty kinds
}
for i, ok, next := cs.randSentryIndex(); ok; i, ok = next() {
if !cs.sentries[i].Ready() {
continue
}
if _, err1 := cs.sentries[i].PenalizePeer(ctx, &outreq, &grpc.EmptyCallOption{}); err1 != nil {
cs.logger.Error("Could not send penalty", "err", err1)
}
}
}
}