-
Notifications
You must be signed in to change notification settings - Fork 199
/
transactionResolver.go
205 lines (172 loc) · 5.65 KB
/
transactionResolver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
package resolvers
import (
"fmt"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/batch"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/p2p"
"github.com/multiversx/mx-chain-go/storage"
logger "github.com/multiversx/mx-chain-logger-go"
)
var _ dataRetriever.Resolver = (*TxResolver)(nil)
// maxBuffToSendBulkTransactions represents max buffer size to send in bytes
const maxBuffToSendBulkTransactions = 1 << 18 // 256KB
// maxBuffToSendBulkMiniblocks represents max buffer size to send in bytes
const maxBuffToSendBulkMiniblocks = 1 << 18 // 256KB
// ArgTxResolver is the argument structure used to create new TxResolver instance
type ArgTxResolver struct {
ArgBaseResolver
TxPool dataRetriever.ShardedDataCacherNotifier
TxStorage storage.Storer
DataPacker dataRetriever.DataPacker
IsFullHistoryNode bool
}
// TxResolver is a wrapper over Resolver that is specialized in resolving transaction requests
type TxResolver struct {
*baseResolver
messageProcessor
baseStorageResolver
txPool dataRetriever.ShardedDataCacherNotifier
dataPacker dataRetriever.DataPacker
}
// NewTxResolver creates a new transaction resolver
func NewTxResolver(arg ArgTxResolver) (*TxResolver, error) {
err := checkArgTxResolver(arg)
if err != nil {
return nil, err
}
txResolver := &TxResolver{
baseResolver: &baseResolver{
TopicResolverSender: arg.SenderResolver,
},
txPool: arg.TxPool,
baseStorageResolver: createBaseStorageResolver(arg.TxStorage, arg.IsFullHistoryNode),
dataPacker: arg.DataPacker,
messageProcessor: messageProcessor{
marshalizer: arg.Marshaller,
antifloodHandler: arg.AntifloodHandler,
topic: arg.SenderResolver.RequestTopic(),
throttler: arg.Throttler,
},
}
return txResolver, nil
}
func checkArgTxResolver(arg ArgTxResolver) error {
err := checkArgBase(arg.ArgBaseResolver)
if err != nil {
return err
}
if check.IfNil(arg.TxPool) {
return dataRetriever.ErrNilTxDataPool
}
if check.IfNil(arg.TxStorage) {
return dataRetriever.ErrNilTxStorage
}
if check.IfNil(arg.DataPacker) {
return dataRetriever.ErrNilDataPacker
}
return nil
}
// ProcessReceivedMessage will be the callback func from the p2p.Messenger and will be called each time a new message was received
// (for the topic this validator was registered to, usually a request topic)
func (txRes *TxResolver) ProcessReceivedMessage(message p2p.MessageP2P, fromConnectedPeer core.PeerID, source p2p.MessageHandler) error {
err := txRes.canProcessMessage(message, fromConnectedPeer)
if err != nil {
return err
}
txRes.throttler.StartProcessing()
defer txRes.throttler.EndProcessing()
rd, err := txRes.parseReceivedMessage(message, fromConnectedPeer)
if err != nil {
return err
}
switch rd.Type {
case dataRetriever.HashType:
err = txRes.resolveTxRequestByHash(rd.Value, message.Peer(), rd.Epoch, source)
case dataRetriever.HashArrayType:
err = txRes.resolveTxRequestByHashArray(rd.Value, message.Peer(), rd.Epoch, source)
default:
err = dataRetriever.ErrRequestTypeNotImplemented
}
if err != nil {
err = fmt.Errorf("%w for hash %s", err, logger.DisplayByteSlice(rd.Value))
}
return err
}
func (txRes *TxResolver) resolveTxRequestByHash(hash []byte, pid core.PeerID, epoch uint32, source p2p.MessageHandler) error {
// TODO this can be optimized by searching in corresponding datapool (taken by topic name)
tx, err := txRes.fetchTxAsByteSlice(hash, epoch)
if err != nil {
return err
}
b := &batch.Batch{
Data: [][]byte{tx},
}
buff, err := txRes.marshalizer.Marshal(b)
if err != nil {
return err
}
return txRes.Send(buff, pid, source)
}
func (txRes *TxResolver) fetchTxAsByteSlice(hash []byte, epoch uint32) ([]byte, error) {
value, ok := txRes.txPool.SearchFirstData(hash)
if ok {
return txRes.marshalizer.Marshal(value)
}
buff, err := txRes.getFromStorage(hash, epoch)
if err != nil {
txRes.DebugHandler().LogFailedToResolveData(
txRes.topic,
hash,
err,
)
return nil, err
}
txRes.DebugHandler().LogSucceededToResolveData(txRes.topic, hash)
return buff, nil
}
func (txRes *TxResolver) resolveTxRequestByHashArray(hashesBuff []byte, pid core.PeerID, epoch uint32, source p2p.MessageHandler) error {
// TODO this can be optimized by searching in corresponding datapool (taken by topic name)
b := batch.Batch{}
err := txRes.marshalizer.Unmarshal(&b, hashesBuff)
if err != nil {
return err
}
hashes := b.Data
var errFetch error
errorsFound := 0
txsBuffSlice := make([][]byte, 0, len(hashes))
for _, hash := range hashes {
tx, errTemp := txRes.fetchTxAsByteSlice(hash, epoch)
if errTemp != nil {
errFetch = fmt.Errorf("%w for hash %s", errTemp, logger.DisplayByteSlice(hash))
// it might happen to error on a tx (maybe it is missing) but should continue
// as to send back as many as it can
log.Trace("fetchTxAsByteSlice missing",
"error", errFetch.Error(),
"hash", hash)
errorsFound++
continue
}
txsBuffSlice = append(txsBuffSlice, tx)
}
buffsToSend, errPack := txRes.dataPacker.PackDataInChunks(txsBuffSlice, maxBuffToSendBulkTransactions)
if errPack != nil {
return errPack
}
for _, buff := range buffsToSend {
errSend := txRes.Send(buff, pid, source)
if errSend != nil {
return errSend
}
}
if errFetch != nil {
errFetch = fmt.Errorf("resolveTxRequestByHashArray last error %w from %d encountered errors", errFetch, errorsFound)
}
return errFetch
}
// IsInterfaceNil returns true if there is no value under the interface
func (txRes *TxResolver) IsInterfaceNil() bool {
return txRes == nil
}