-
Notifications
You must be signed in to change notification settings - Fork 199
/
txsSender.go
240 lines (205 loc) · 6.65 KB
/
txsSender.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
package txsSender
import (
"context"
"sync/atomic"
"time"
"github.com/multiversx/mx-chain-core-go/core"
"github.com/multiversx/mx-chain-core-go/core/accumulator"
"github.com/multiversx/mx-chain-core-go/core/check"
"github.com/multiversx/mx-chain-core-go/data/transaction"
"github.com/multiversx/mx-chain-core-go/marshal"
"github.com/multiversx/mx-chain-go/common"
"github.com/multiversx/mx-chain-go/config"
"github.com/multiversx/mx-chain-go/dataRetriever"
"github.com/multiversx/mx-chain-go/process"
"github.com/multiversx/mx-chain-go/process/factory"
"github.com/multiversx/mx-chain-go/storage"
logger "github.com/multiversx/mx-chain-logger-go"
)
var log = logger.GetOrCreate("txsSender")
var numSecondsBetweenPrints = 20
// SendTransactionsPipe is the pipe used for sending new transactions
const SendTransactionsPipe = "send transactions pipe"
// ArgsTxsSenderWithAccumulator is a holder struct for all necessary arguments to create a NewTxsSenderWithAccumulator
type ArgsTxsSenderWithAccumulator struct {
Marshaller marshal.Marshalizer
ShardCoordinator storage.ShardCoordinator
NetworkMessenger NetworkMessenger
AccumulatorConfig config.TxAccumulatorConfig
DataPacker process.DataPacker
}
type txsSender struct {
marshaller marshal.Marshalizer
shardCoordinator storage.ShardCoordinator
networkMessenger NetworkMessenger
ctx context.Context
cancelFunc context.CancelFunc
txAccumulator core.Accumulator
dataPacker process.DataPacker
txSentCounter uint32
}
// NewTxsSenderWithAccumulator creates a new instance of TxsSenderHandler, which initializes internally an accumulator.NewTimeAccumulator
func NewTxsSenderWithAccumulator(args ArgsTxsSenderWithAccumulator) (*txsSender, error) {
if check.IfNil(args.Marshaller) {
return nil, process.ErrNilMarshalizer
}
if check.IfNil(args.ShardCoordinator) {
return nil, process.ErrNilShardCoordinator
}
if check.IfNil(args.NetworkMessenger) {
return nil, process.ErrNilMessenger
}
if check.IfNil(args.DataPacker) {
return nil, dataRetriever.ErrNilDataPacker
}
txAccumulator, err := accumulator.NewTimeAccumulator(
time.Duration(args.AccumulatorConfig.MaxAllowedTimeInMilliseconds)*time.Millisecond,
time.Duration(args.AccumulatorConfig.MaxDeviationTimeInMilliseconds)*time.Millisecond,
log,
)
if err != nil {
return nil, err
}
ctx, cancelFunc := context.WithCancel(context.Background())
ret := &txsSender{
marshaller: args.Marshaller,
shardCoordinator: args.ShardCoordinator,
networkMessenger: args.NetworkMessenger,
dataPacker: args.DataPacker,
ctx: ctx,
cancelFunc: cancelFunc,
txAccumulator: txAccumulator,
txSentCounter: 0,
}
go ret.sendFromTxAccumulator(ret.ctx)
go ret.printTxSentCounter(ret.ctx)
return ret, nil
}
// SendBulkTransactions sends the provided transactions as a bulk, optimizing transfer between nodes
func (ts *txsSender) SendBulkTransactions(txs []*transaction.Transaction) (uint64, error) {
if len(txs) == 0 {
return 0, process.ErrNoTxToProcess
}
ts.addTransactionsToSendPipe(txs)
return uint64(len(txs)), nil
}
func (ts *txsSender) addTransactionsToSendPipe(txs []*transaction.Transaction) {
for _, tx := range txs {
ts.txAccumulator.AddData(tx)
}
}
func (ts *txsSender) sendFromTxAccumulator(ctx context.Context) {
outputChannel := ts.txAccumulator.OutputChannel()
for {
select {
case objs := <-outputChannel:
{
ts.sendTxObjsFromChannel(objs)
}
case <-ctx.Done():
return
}
}
}
func (ts *txsSender) sendTxObjsFromChannel(objs []interface{}) {
if len(objs) == 0 {
return
}
txs := make([]*transaction.Transaction, 0, len(objs))
for _, obj := range objs {
tx, ok := obj.(*transaction.Transaction)
if !ok {
continue
}
txs = append(txs, tx)
}
atomic.AddUint32(&ts.txSentCounter, uint32(len(txs)))
ts.sendBulkTransactions(txs)
}
func (ts *txsSender) sendBulkTransactions(txs []*transaction.Transaction) {
transactionsByShards := make(map[uint32][][]byte)
log.Trace("txsSender.sendBulkTransactions sending txs",
"num", len(txs),
)
for _, tx := range txs {
marshalledTx, err := ts.marshaller.Marshal(tx)
if err != nil {
log.Warn("txsSender.sendBulkTransactions",
"marshaller error", err,
)
continue
}
senderShardId := ts.shardCoordinator.ComputeId(tx.SndAddr)
transactionsByShards[senderShardId] = append(transactionsByShards[senderShardId], marshalledTx)
}
for shardId, txsForShard := range transactionsByShards {
err := ts.sendBulkTransactionsFromShard(txsForShard, shardId)
log.LogIfError(err)
}
}
func (ts *txsSender) sendBulkTransactionsFromShard(transactions [][]byte, senderShardId uint32) error {
// the topic identifier is made of the current shard id and sender's shard id
identifier := factory.TransactionTopic + ts.shardCoordinator.CommunicationIdentifier(senderShardId)
packets, err := ts.dataPacker.PackDataInChunks(transactions, common.MaxBulkTransactionSize)
if err != nil {
return err
}
for _, buff := range packets {
go func(bufferToSend []byte) {
log.Trace("txsSender.sendBulkTransactionsFromShard",
"topic", identifier,
"size", len(bufferToSend),
)
err = ts.networkMessenger.BroadcastOnChannelBlocking(
SendTransactionsPipe,
identifier,
bufferToSend,
)
log.LogIfError(err)
}(buff)
}
return nil
}
// printTxSentCounter prints the peak transaction counter from a time frame of about 'numSecondsBetweenPrints' seconds
// if this peak value is 0 (no transaction was sent through the REST API interface), the print will not be done
// the peak counter resets after each print. There is also a total number of transactions sent to p2p
// TODO make this function testable. Refactor if necessary.
func (ts *txsSender) printTxSentCounter(ctx context.Context) {
maxTxCounter := uint32(0)
totalTxCounter := uint64(0)
counterSeconds := 0
for {
select {
case <-time.After(time.Second):
txSent := atomic.SwapUint32(&ts.txSentCounter, 0)
if txSent > maxTxCounter {
maxTxCounter = txSent
}
totalTxCounter += uint64(txSent)
counterSeconds++
if counterSeconds > numSecondsBetweenPrints {
counterSeconds = 0
if maxTxCounter > 0 {
log.Info("sent transactions on network",
"max/sec", maxTxCounter,
"total", totalTxCounter,
)
}
maxTxCounter = 0
}
case <-ctx.Done():
return
}
}
}
// IsInterfaceNil checks if the underlying pointer is nil
func (ts *txsSender) IsInterfaceNil() bool {
return ts == nil
}
// Close calls the cancel function of the background context and closes the network messenger
func (ts *txsSender) Close() error {
ts.cancelFunc()
err := ts.txAccumulator.Close()
log.LogIfError(err)
return ts.networkMessenger.Close()
}