-
Notifications
You must be signed in to change notification settings - Fork 6
/
benchmark.go
136 lines (118 loc) · 2.73 KB
/
benchmark.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package benchmark
import (
"context"
"fmt"
"log/slog"
"os"
"os/signal"
"sync"
"syscall"
"time"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
)
// Benchmark drives a set of Loaders, one per configured target, and collects
// their metrics so that Run can report them periodically.
type Benchmark struct {
// config is embedded, so its fields (for example targets, duration and
// reportInterval) are read directly through the Benchmark value.
config
// loaders holds the Loader that New created for each target, in target order.
loaders []*Loader
// metrics gathers the per-loader metrics; Run flushes it for every report.
metrics Metrics
// stopC is handed to every Loader at construction time and is closed by Run
// when the benchmark ends.
stopC chan struct{}
}
// New creates a new Benchmark and returns it. Users must call Close to release
// resources if it returns successfully.
//
// New builds the configuration from opts and then creates one Loader per
// configured target. If any step fails, the loaders created so far are closed
// and New returns a nil Benchmark together with the error; an error reported
// while closing those loaders is combined with the original one.
//
// As a side effect, New installs a text slog handler writing to os.Stdout at
// Info level as the process-wide default logger.
func New(opts ...Option) (bm *Benchmark, err error) {
	cfg, err := newConfig(opts)
	if err != nil {
		return nil, err
	}

	logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))
	slog.SetDefault(logger)

	bm = &Benchmark{
		config: cfg,
		stopC:  make(chan struct{}),
	}
	defer func() {
		if err == nil {
			return
		}
		// Release the loaders built before the failure. The Benchmark is
		// unusable after that, so do not hand it back to the caller, and do
		// not hide a cleanup failure behind the original error.
		err = multierr.Append(err, bm.Close())
		bm = nil
	}()

	for idx := range bm.targets {
		// loader is declared separately so the assignment below writes to the
		// named result err (which the deferred cleanup inspects) rather than
		// shadowing it.
		var loader *Loader
		target := bm.targets[idx]
		loaderMetrics := &LoaderMetrics{tgt: target}
		loader, err = NewLoader(loaderConfig{
			Target:              target,
			cid:                 bm.cid,
			mraddrs:             bm.mraddrs,
			metrics:             loaderMetrics,
			singleConnPerTarget: bm.singleConnPerTarget,
			stopC:               bm.stopC,
		})
		if err != nil {
			return nil, err
		}
		bm.loaders = append(bm.loaders, loader)
		bm.metrics.loaderMetrics = append(bm.metrics.loaderMetrics, loaderMetrics)
	}
	return bm, nil
}
// Run starts the Loaders and the metric reporter. It blocks until every loader
// has returned and the reporter has printed its final report.
//
// The run ends when the configured duration elapses, when SIGINT or SIGTERM is
// received, or when the loaders' context is canceled (for example because a
// loader returned an error). A report is printed to standard output on every
// reportInterval tick and once more when the run ends.
//
// Run returns nil when the run ended because of the timer or a signal;
// otherwise it returns the result of waiting on the loader group.
func (bm *Benchmark) Run() error {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	g, ctx := errgroup.WithContext(ctx)

	benchmarkTimer := time.NewTimer(bm.duration)
	defer benchmarkTimer.Stop()

	sigC := make(chan os.Signal, 1)
	signal.Notify(sigC, os.Interrupt, syscall.SIGTERM)
	// Unregister the channel so signal delivery is not left redirected to it
	// after Run returns.
	defer signal.Stop(sigC)

	reportTick := time.NewTicker(bm.reportInterval)
	defer reportTick.Stop()

	// finished is written only by the reporter goroutine and read only after
	// wg.Wait, so the WaitGroup orders the accesses.
	var finished bool
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: wg.Done fires only after the
		// final report has been printed, so Run cannot return before it.
		defer wg.Done()
		defer func() {
			close(bm.stopC)
			fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
			cancel()
		}()

		for {
			select {
			case <-benchmarkTimer.C:
				finished = true
				slog.Debug("benchmark is finished")
				return
			case sig := <-sigC:
				finished = true
				slog.Debug("caught signal", slog.String("signal", sig.String()))
				return
			case <-ctx.Done():
				slog.Debug("loader failed")
				return
			case <-reportTick.C:
				fmt.Println(MustEncode(bm.reportEncoder, bm.metrics.Flush()))
			}
		}
	}()

	for _, tw := range bm.loaders {
		tw := tw
		g.Go(func() error {
			return tw.Run(ctx)
		})
	}

	err := g.Wait()
	wg.Wait()

	slog.Debug("stopped benchmark")
	if finished {
		// A timer- or signal-triggered stop is a normal end of the run, so any
		// error the loaders returned while shutting down is not reported.
		return nil
	}
	return err
}
// Close closes every Loader owned by the Benchmark, in creation order, and
// returns the combination of all errors those calls reported (nil if none).
func (bm *Benchmark) Close() error {
	var combined error
	for i := range bm.loaders {
		closeErr := bm.loaders[i].Close()
		combined = multierr.Append(combined, closeErr)
	}
	return combined
}