-
Notifications
You must be signed in to change notification settings - Fork 2.1k
/
load.go
124 lines (107 loc) · 3.39 KB
/
load.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
package main
import (
"context"
"crypto/rand"
"errors"
"fmt"
"math"
"time"
"github.com/tendermint/tendermint/libs/log"
rpchttp "github.com/tendermint/tendermint/rpc/client/http"
e2e "github.com/tendermint/tendermint/test/e2e/pkg"
"github.com/tendermint/tendermint/types"
)
// Load generates transactions against the network until the given context is
// canceled. A multiplier of greater than one can be supplied if load needs to
// be generated beyond a minimum amount; it is forwarded to loadGenerate.
//
// Load returns an error if the testnet has no nodes, if no transaction is
// accepted within the initial timeout (one minute), if the interval between
// accepted transactions exceeds the stall timeout (30 seconds), or if the
// context is canceled before any transaction was accepted. It returns nil when
// the context is canceled after at least one transaction was accepted.
func Load(ctx context.Context, testnet *e2e.Testnet, multiplier int) error {
	// The concurrency calculation below divides by the node count, which
	// would panic for an empty testnet.
	if len(testnet.Nodes) == 0 {
		return errors.New("unable to generate load: testnet has no nodes")
	}

	// Since transactions are executed across all nodes in the network, we need
	// to reduce transaction load for larger networks to avoid using too much
	// CPU. This gives high-throughput small networks and low-throughput large ones.
	// This also limits the number of TCP connections, since each worker has
	// a connection to all nodes.
	concurrency := 64 / len(testnet.Nodes)
	if concurrency == 0 {
		concurrency = 1
	}
	initialTimeout := 1 * time.Minute
	stallTimeout := 30 * time.Second

	chTx := make(chan types.Tx)
	chSuccess := make(chan types.Tx)

	// Canceling on return stops the generator once Load is done, whatever the
	// reason for returning.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	// Spawn job generator and processors.
	logger.Info("load", "msg", log.NewLazySprintf("Starting transaction load (%v workers)...", concurrency))
	started := time.Now()

	go loadGenerate(ctx, chTx, multiplier)

	for w := 0; w < concurrency; w++ {
		go loadProcess(ctx, testnet, chTx, chSuccess)
	}

	// Monitor successful transactions, and abort on stalls. A single timer is
	// reused instead of calling time.After on every iteration, which would
	// allocate a new timer for each successful transaction.
	success := 0
	timeout := initialTimeout
	timer := time.NewTimer(timeout)
	defer timer.Stop()

	for {
		select {
		case <-chSuccess:
			success++
			// After the first success the shorter stall timeout applies.
			timeout = stallTimeout
			// The timer may have fired concurrently; drain it without
			// blocking so that a stale tick is not observed after Reset.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}
			timer.Reset(timeout)
		case <-timer.C:
			return fmt.Errorf("unable to submit transactions for %v", timeout)
		case <-ctx.Done():
			if success == 0 {
				return errors.New("failed to submit any transactions")
			}
			logger.Info("load", "msg", log.NewLazySprintf("Ending transaction load after %v txs (%.1f tx/s)...",
				success, float64(success)/time.Since(started).Seconds()))
			return nil
		}
	}
}
// loadGenerate generates transactions and offers them on chTx until the
// context is canceled. chTx is closed when the generator returns, which ends
// the range loops of the workers reading from it.
//
// After each transaction is handed to a worker the generator pauses for
// time.Second/multiplier. A multiplier below one is treated as one, since zero
// would cause a division-by-zero panic and a negative value a negative pause.
func loadGenerate(ctx context.Context, chTx chan<- types.Tx, multiplier int) {
	defer close(chTx)

	if multiplier < 1 {
		multiplier = 1
	}
	interval := time.Second / time.Duration(multiplier)

	// The counter is an int64 so that the bound also compiles on platforms
	// where int is 32 bits wide.
	for i := int64(0); i < math.MaxInt64; i++ {
		// We keep generating the same 1000 keys over and over, with different values.
		// This gives a reasonable load without putting too much data in the app.
		id := i % 1000

		// 1 KiB of random data; hex-encoding below turns it into a
		// 2048-character value.
		bz := make([]byte, 1024)
		_, err := rand.Read(bz)
		if err != nil {
			panic(fmt.Sprintf("Failed to read random bytes: %v", err))
		}
		tx := types.Tx(fmt.Sprintf("load-%X=%x", id, bz))

		select {
		case chTx <- tx:
		case <-ctx.Done():
			return
		}

		// Pace the generator, but wake up immediately on cancellation instead
		// of sleeping through it.
		select {
		case <-time.After(interval):
		case <-ctx.Done():
			return
		}
	}
}
// loadProcess processes transactions: it reads them from chTx until the
// channel is closed, broadcasts each one to a randomly chosen node of the
// testnet, and reports every transaction whose broadcast call succeeded on
// chSuccess.
//
// Failures are deliberately not reported: a transaction is dropped if a client
// cannot be created, the health check fails, or the broadcast returns an
// error. The caller detects prolonged failure through the absence of successes.
func loadProcess(ctx context.Context, testnet *e2e.Testnet, chTx <-chan types.Tx, chSuccess chan<- types.Tx) {
	// Each worker gets its own client to each node, which allows for some
	// concurrency while still bounding it.
	clients := map[string]*rpchttp.HTTP{}

	var err error
	for tx := range chTx {
		node := testnet.RandomNode()

		client, ok := clients[node.Name]
		if !ok {
			client, err = node.Client()
			if err != nil {
				continue
			}

			// check that the node is up; the client is only cached once
			// the node has answered a health check.
			_, err = client.Health(ctx)
			if err != nil {
				continue
			}

			clients[node.Name] = client
		}

		if _, err = client.BroadcastTxSync(ctx, tx); err != nil {
			continue
		}

		// The receiver stops reading once it has returned, so a plain send
		// could block this goroutine forever. Bail out on cancellation.
		select {
		case chSuccess <- tx:
		case <-ctx.Done():
			return
		}
	}
}