-
Notifications
You must be signed in to change notification settings - Fork 199
/
peerAuthenticationResolver.go
187 lines (159 loc) · 5.9 KB
/
peerAuthenticationResolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
package resolvers
import (
"fmt"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/batch"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/heartbeat"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/storage"
logger "github.com/multiversx/mx-chain-logger-go"
)
// maxBuffToSendPeerAuthentications represents the max buffer size, in bytes, that is handed to the
// data packer when the peer authentication responses are split into chunks before being sent
const maxBuffToSendPeerAuthentications = 1 << 18 // 256KB
// ArgPeerAuthenticationResolver is the argument structure used to create a new peer authentication resolver instance
type ArgPeerAuthenticationResolver struct {
	// ArgBaseResolver holds the arguments validated by checkArgBase; the constructor reads the
	// SenderResolver, Marshaller, AntifloodHandler and Throttler values through it
	ArgBaseResolver
	// PeerAuthenticationPool is the cache that is peeked, by key, for peer authentication entries
	PeerAuthenticationPool storage.Cacher
	// DataPacker splits the found peer authentications into chunks before they are sent
	DataPacker dataRetriever.DataPacker
	// PayloadValidator checks the timestamp of a cached payload before the entry is returned
	PayloadValidator dataRetriever.PeerAuthenticationPayloadValidator
}
// peerAuthenticationResolver is a wrapper over Resolver that is specialized in resolving peer authentication requests
type peerAuthenticationResolver struct {
	// baseResolver wraps the TopicResolverSender received at construction
	// NOTE(review): the Send call used for responses is presumably promoted from here - confirm
	*baseResolver
	// messageProcessor carries the marshalizer, antiflood handler, throttler and request topic
	messageProcessor
	// peerAuthenticationPool is the cache that is peeked, by key, for peer authentication entries
	peerAuthenticationPool storage.Cacher
	// dataPacker splits the found peer authentications into chunks before they are sent
	dataPacker dataRetriever.DataPacker
	// payloadValidator checks the timestamp of a cached payload before the entry is returned
	payloadValidator dataRetriever.PeerAuthenticationPayloadValidator
}
// NewPeerAuthenticationResolver validates the provided arguments and, when none of them is nil,
// assembles a peer authentication resolver out of them.
// The first validation error is returned as is, together with a nil resolver.
func NewPeerAuthenticationResolver(arg ArgPeerAuthenticationResolver) (*peerAuthenticationResolver, error) {
	if err := checkArgPeerAuthenticationResolver(arg); err != nil {
		return nil, err
	}

	sender := arg.SenderResolver

	// the message processor listens on the request topic exposed by the sender
	processor := messageProcessor{
		marshalizer:      arg.Marshaller,
		antifloodHandler: arg.AntifloodHandler,
		throttler:        arg.Throttler,
		topic:            sender.RequestTopic(),
	}

	resolver := &peerAuthenticationResolver{
		baseResolver:           &baseResolver{TopicResolverSender: sender},
		messageProcessor:       processor,
		peerAuthenticationPool: arg.PeerAuthenticationPool,
		dataPacker:             arg.DataPacker,
		payloadValidator:       arg.PayloadValidator,
	}

	return resolver, nil
}
// checkArgPeerAuthenticationResolver runs the base argument checks and then verifies, in order,
// that the peer authentication pool, the data packer and the payload validator are not nil.
// It returns the error matching the first failed check, or nil when all of them pass.
func checkArgPeerAuthenticationResolver(arg ArgPeerAuthenticationResolver) error {
	if err := checkArgBase(arg.ArgBaseResolver); err != nil {
		return err
	}

	switch {
	case check.IfNil(arg.PeerAuthenticationPool):
		return dataRetriever.ErrNilPeerAuthenticationPool
	case check.IfNil(arg.DataPacker):
		return dataRetriever.ErrNilDataPacker
	case check.IfNil(arg.PayloadValidator):
		return dataRetriever.ErrNilPayloadValidator
	default:
		return nil
	}
}
// ProcessReceivedMessage is the callback the p2p.Messenger invokes for each message received
// on the topic this resolver was registered to (usually a request topic).
// Only hash array requests are served; any other request type results in
// dataRetriever.ErrRequestTypeNotImplemented, wrapped together with the requested value.
func (res *peerAuthenticationResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, source p2p.MessageHandler) error {
	if err := res.canProcessMessage(message, fromConnectedPeer); err != nil {
		return err
	}

	// the throttler is notified only after the message was accepted for processing
	res.throttler.StartProcessing()
	defer res.throttler.EndProcessing()

	requestData, err := res.parseReceivedMessage(message, fromConnectedPeer)
	if err != nil {
		return err
	}

	if requestData.Type == dataRetriever.HashArrayType {
		return res.resolveMultipleHashesRequest(requestData.Value, message.Peer(), source)
	}

	return fmt.Errorf("%w for value %s", dataRetriever.ErrRequestTypeNotImplemented, logger.DisplayByteSlice(requestData.Value))
}
// resolveMultipleHashesRequest decodes the received buffer as a batch and answers the requester
// with the peer authentication messages that could be fetched for the batch entries.
// A decoding error is returned unchanged, while a fetch error is wrapped with the raw buffer.
func (res *peerAuthenticationResolver) resolveMultipleHashesRequest(hashesBuff []byte, pid core.PeerID, source p2p.MessageHandler) error {
	var requested batch.Batch
	if err := res.marshalizer.Unmarshal(&requested, hashesBuff); err != nil {
		return err
	}

	peerAuths, err := res.fetchPeerAuthenticationSlicesForPublicKeys(requested.Data)
	if err != nil {
		return fmt.Errorf("resolveMultipleHashesRequest error %w from buff %x", err, hashesBuff)
	}

	return res.sendPeerAuthsForHashes(peerAuths, pid, source)
}
// sendPeerAuthsForHashes packs the provided peer authentication buffers into chunks limited by
// maxBuffToSendPeerAuthentications and sends every chunk to the given peer.
// It stops at, and returns, the first packing or sending error.
func (res *peerAuthenticationResolver) sendPeerAuthsForHashes(dataBuff [][]byte, pid core.PeerID, source p2p.MessageHandler) error {
	chunks, err := res.dataPacker.PackDataInChunks(dataBuff, maxBuffToSendPeerAuthentications)
	if err != nil {
		return err
	}

	for i := range chunks {
		if sendErr := res.Send(chunks[i], pid, source); sendErr != nil {
			return sendErr
		}
	}

	return nil
}
// fetchPeerAuthenticationSlicesForPublicKeys collects the serialized peer authentications that
// can be fetched for the provided public keys, skipping the keys that yield no buffer.
// dataRetriever.ErrPeerAuthNotFound is returned when nothing could be collected.
func (res *peerAuthenticationResolver) fetchPeerAuthenticationSlicesForPublicKeys(pks [][]byte) ([][]byte, error) {
	var found [][]byte
	for i := range pks {
		// the per-key error is not inspected; a nil buffer is what marks a key as skipped
		serialized, _ := res.fetchPeerAuthenticationAsByteSlice(pks[i])
		if serialized == nil {
			continue
		}

		found = append(found, serialized)
	}

	if len(found) == 0 {
		return nil, dataRetriever.ErrPeerAuthNotFound
	}

	return found, nil
}
// fetchPeerAuthenticationAsByteSlice looks up the provided public key in the peer authentication
// pool and returns the marshalled entry.
// It fails when the key is missing, when the cached value is not a *heartbeat.PeerAuthentication,
// when the payload cannot be unmarshalled or when the payload timestamp is rejected by the validator.
func (res *peerAuthenticationResolver) fetchPeerAuthenticationAsByteSlice(pk []byte) ([]byte, error) {
	cached, found := res.peerAuthenticationPool.Peek(pk)
	if !found {
		return nil, dataRetriever.ErrPeerAuthNotFound
	}

	peerAuth, isPeerAuth := cached.(*heartbeat.PeerAuthentication)
	if !isPeerAuth {
		return nil, dataRetriever.ErrWrongTypeAssertion
	}

	// the payload is decoded only to reach its timestamp; the cached value itself is what gets sent
	var payload heartbeat.Payload
	if err := res.marshalizer.Unmarshal(&payload, peerAuth.Payload); err != nil {
		return nil, err
	}

	if err := res.payloadValidator.ValidateTimestamp(payload.Timestamp); err != nil {
		log.Trace("found peer authentication payload with invalid value, will not send it", "error", err)
		return nil, err
	}

	return res.marshalizer.Marshal(cached)
}
// IsInterfaceNil returns true if there is no value under the interface,
// meaning that the method was invoked on a nil *peerAuthenticationResolver
func (res *peerAuthenticationResolver) IsInterfaceNil() bool {
	return res == nil
}