-
Notifications
You must be signed in to change notification settings - Fork 199
/
validatorInfoResolver.go
225 lines (191 loc) · 6.87 KB
/
validatorInfoResolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
package resolvers
import (
"encoding/hex"
"fmt"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/batch"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/storage"
logger "github.com/multiversx/mx-chain-logger-go"
)
// maxBuffToSendValidatorsInfo represents max buffer size to send in bytes
const maxBuffToSendValidatorsInfo = 1 << 18 // 256KB
// ArgValidatorInfoResolver is the argument structure used to create a new validator info resolver instance
type ArgValidatorInfoResolver struct {
SenderResolver dataRetriever.TopicResolverSender
Marshaller marshal.Marshalizer
AntifloodHandler dataRetriever.P2PAntifloodHandler
Throttler dataRetriever.ResolverThrottler
ValidatorInfoPool dataRetriever.ShardedDataCacherNotifier
ValidatorInfoStorage storage.Storer
DataPacker dataRetriever.DataPacker
IsFullHistoryNode bool
}
// validatorInfoResolver is a wrapper over Resolver that is specialized in resolving validator info requests
type validatorInfoResolver struct {
dataRetriever.TopicResolverSender
messageProcessor
baseStorageResolver
validatorInfoPool dataRetriever.ShardedDataCacherNotifier
validatorInfoStorage storage.Storer
dataPacker dataRetriever.DataPacker
}
// NewValidatorInfoResolver creates a validator info resolver
func NewValidatorInfoResolver(args ArgValidatorInfoResolver) (*validatorInfoResolver, error) {
err := checkArgs(args)
if err != nil {
return nil, err
}
return &validatorInfoResolver{
TopicResolverSender: args.SenderResolver,
messageProcessor: messageProcessor{
marshalizer: args.Marshaller,
antifloodHandler: args.AntifloodHandler,
throttler: args.Throttler,
topic: args.SenderResolver.RequestTopic(),
},
baseStorageResolver: createBaseStorageResolver(args.ValidatorInfoStorage, args.IsFullHistoryNode),
validatorInfoPool: args.ValidatorInfoPool,
validatorInfoStorage: args.ValidatorInfoStorage,
dataPacker: args.DataPacker,
}, nil
}
func checkArgs(args ArgValidatorInfoResolver) error {
if check.IfNil(args.SenderResolver) {
return dataRetriever.ErrNilResolverSender
}
if check.IfNil(args.Marshaller) {
return dataRetriever.ErrNilMarshalizer
}
if check.IfNil(args.AntifloodHandler) {
return dataRetriever.ErrNilAntifloodHandler
}
if check.IfNil(args.Throttler) {
return dataRetriever.ErrNilThrottler
}
if check.IfNil(args.ValidatorInfoPool) {
return dataRetriever.ErrNilValidatorInfoPool
}
if check.IfNil(args.ValidatorInfoStorage) {
return dataRetriever.ErrNilValidatorInfoStorage
}
if check.IfNil(args.DataPacker) {
return dataRetriever.ErrNilDataPacker
}
return nil
}
// ProcessReceivedMessage represents the callback func from the p2p.Messenger that is called each time a new message is received
// (for the topic this validator was registered to, usually a request topic)
func (res *validatorInfoResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, source p2p.MessageHandler) error {
err := res.canProcessMessage(message, fromConnectedPeer)
if err != nil {
return err
}
res.throttler.StartProcessing()
defer res.throttler.EndProcessing()
rd, err := res.parseReceivedMessage(message, fromConnectedPeer)
if err != nil {
return err
}
switch rd.Type {
case dataRetriever.HashType:
return res.resolveHashRequest(rd.Value, rd.Epoch, fromConnectedPeer, source)
case dataRetriever.HashArrayType:
return res.resolveMultipleHashesRequest(rd.Value, rd.Epoch, fromConnectedPeer, source)
}
return fmt.Errorf("%w for value %s", dataRetriever.ErrRequestTypeNotImplemented, logger.DisplayByteSlice(rd.Value))
}
// resolveHashRequest sends the response for a hash request
func (res *validatorInfoResolver) resolveHashRequest(hash []byte, epoch uint32, pid core.PeerID, source p2p.MessageHandler) error {
data, err := res.fetchValidatorInfoByteSlice(hash, epoch)
if err != nil {
return err
}
return res.marshalAndSend(data, pid, source)
}
// resolveMultipleHashesRequest sends the response for a hash array type request
func (res *validatorInfoResolver) resolveMultipleHashesRequest(hashesBuff []byte, epoch uint32, pid core.PeerID, source p2p.MessageHandler) error {
b := batch.Batch{}
err := res.marshalizer.Unmarshal(&b, hashesBuff)
if err != nil {
return err
}
hashes := b.Data
validatorInfoForHashes, err := res.fetchValidatorInfoForHashes(hashes, epoch)
if err != nil {
outputHashes := ""
for _, hash := range hashes {
outputHashes += hex.EncodeToString(hash) + " "
}
return fmt.Errorf("resolveMultipleHashesRequest error %w from buff %s", err, outputHashes)
}
return res.sendValidatorInfoForHashes(validatorInfoForHashes, pid, source)
}
func (res *validatorInfoResolver) sendValidatorInfoForHashes(validatorInfoForHashes [][]byte, pid core.PeerID, source p2p.MessageHandler) error {
buffsToSend, err := res.dataPacker.PackDataInChunks(validatorInfoForHashes, maxBuffToSendValidatorsInfo)
if err != nil {
return err
}
for _, buff := range buffsToSend {
err = res.Send(buff, pid, source)
if err != nil {
return err
}
}
return nil
}
func (res *validatorInfoResolver) fetchValidatorInfoForHashes(hashes [][]byte, epoch uint32) ([][]byte, error) {
validatorInfos := make([][]byte, 0)
for _, hash := range hashes {
validatorInfoForHash, _ := res.fetchValidatorInfoByteSlice(hash, epoch)
if validatorInfoForHash != nil {
validatorInfos = append(validatorInfos, validatorInfoForHash)
}
}
if len(validatorInfos) == 0 {
return nil, dataRetriever.ErrValidatorInfoNotFound
}
return validatorInfos, nil
}
func (res *validatorInfoResolver) fetchValidatorInfoByteSlice(hash []byte, epoch uint32) ([]byte, error) {
data, ok := res.validatorInfoPool.SearchFirstData(hash)
if ok {
return res.marshalizer.Marshal(data)
}
buff, err := res.getFromStorage(hash, epoch)
if err != nil {
res.DebugHandler().LogFailedToResolveData(
res.topic,
hash,
err,
)
return nil, err
}
res.DebugHandler().LogSucceededToResolveData(res.topic, hash)
return buff, nil
}
func (res *validatorInfoResolver) marshalAndSend(data []byte, pid core.PeerID, source p2p.MessageHandler) error {
b := &batch.Batch{
Data: [][]byte{data},
}
buff, err := res.marshalizer.Marshal(b)
if err != nil {
return err
}
return res.Send(buff, pid, source)
}
// SetDebugHandler sets a debug handler
func (res *validatorInfoResolver) SetDebugHandler(handler dataRetriever.DebugHandler) error {
return res.TopicResolverSender.SetDebugHandler(handler)
}
// Close returns nil
func (res *validatorInfoResolver) Close() error {
return nil
}
// IsInterfaceNil returns true if there is no value under the interface
func (res *validatorInfoResolver) IsInterfaceNil() bool {
return res == nil
}