-
Notifications
You must be signed in to change notification settings - Fork 348
/
pool.go
123 lines (105 loc) · 2.44 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package stress
import (
"fmt"
"os"
"os/signal"
"sync"
"time"
)
// Result is the value a worker reports on the pool's Output channel for one
// unit of work.
type Result struct {
// Error is the failure reported for this unit of work.
// NOTE(review): presumably nil on success — confirm against the collector.
Error error
// Took is the duration recorded for this unit of work.
// NOTE(review): presumably the latency that feeds the histogram printed by
// Generator.Run — confirm against the collector.
Took time.Duration
}
// WorkFn is the body of a single worker goroutine. WorkerPool.Start calls it
// once per worker, passing the pool's shared Input and Output channels; the
// worker counts as finished when the function returns.
type WorkFn func(input chan string, output chan Result)
// GeneratorAddFn is the callback a GenerateFn invokes to submit one input
// string for the workers to process.
type GeneratorAddFn func(string)
// GenerateFn produces the workload: it calls add once for every input string
// it wants processed and returns when it has nothing more to add.
type GenerateFn func(add GeneratorAddFn)
// WorkerPool runs a fixed number of worker goroutines that all share one
// input channel and one output channel.
type WorkerPool struct {
// parallelism is the number of worker goroutines Start launches.
parallelism int
// Input is handed to every worker as its input channel.
Input chan string
// Output is handed to every worker as its output channel.
Output chan Result
// wg tracks the workers that are still running.
wg sync.WaitGroup
// done is signalled once every worker has returned; see Done.
done chan struct{}
}
// NewWorkerPool builds a pool configured for parallelism workers. The Input,
// Output and completion channels are created unbuffered, and no goroutine is
// started until Start is called.
func NewWorkerPool(parallelism int) *WorkerPool {
	pool := new(WorkerPool)
	pool.parallelism = parallelism
	pool.Input = make(chan string)
	pool.Output = make(chan Result)
	pool.done = make(chan struct{})
	return pool
}
// Start launches p.parallelism goroutines, each running workFn with the
// pool's shared Input and Output channels, and returns immediately.
//
// One further goroutine waits for every worker to return and then closes the
// channel exposed by Done. Closing the channel, rather than sending a single
// value on it, means the notification never blocks when nobody is listening
// any more (so that goroutine cannot leak) and every receiver of Done
// observes completion, not just the first one.
//
// Start must be called at most once per pool: a second call would close the
// completion channel twice and panic.
func (p *WorkerPool) Start(workFn WorkFn) {
	// Register all workers up front so Wait cannot observe a zero counter
	// before the last worker has been spawned.
	p.wg.Add(p.parallelism)
	for i := 0; i < p.parallelism; i++ {
		go func() {
			defer p.wg.Done()
			workFn(p.Input, p.Output)
		}()
	}

	go func() {
		p.wg.Wait()
		close(p.done)
	}()
}
// Done returns the channel on which the pool signals that every worker
// launched by Start has returned.
func (p *WorkerPool) Done() chan struct{} {
return p.done
}
// Generator ties together a WorkerPool and a ResultCollector built from that
// pool's Output channel, and drives a run from input generation to the final
// report.
type Generator struct {
// pool runs the workers and owns the Input and Output channels.
pool *WorkerPool
// collector is created from the pool's Output channel in NewGenerator.
collector *ResultCollector
// handleSignals lists the OS signals that make Run stop early; when it is
// empty Run does not subscribe to any signal.
handleSignals []os.Signal
}
// GeneratorOption customises a Generator. NewGenerator applies each option,
// in the order given, after the defaults have been set.
type GeneratorOption func(*Generator)
// WithSignalHandlersFor returns an option that records sigs as the signals
// the Generator should react to: Run subscribes to them and leaves its
// reporting loop when one arrives. The slice is stored as passed, not copied.
func WithSignalHandlersFor(sigs ...os.Signal) GeneratorOption {
	apply := func(g *Generator) {
		g.handleSignals = sigs
	}
	return apply
}
// NewGenerator creates a Generator backed by a new WorkerPool of the given
// parallelism and a ResultCollector built from that pool's Output channel.
// No signals are handled by default; opts are then applied in order.
func NewGenerator(parallelism int, opts ...GeneratorOption) *Generator {
	workers := NewWorkerPool(parallelism)

	gen := &Generator{
		pool:          workers,
		collector:     NewResultCollector(workers.Output),
		handleSignals: []os.Signal{},
	}
	for i := range opts {
		opts[i](gen)
	}
	return gen
}
// addResult sends s on the pool's Input channel, blocking until the send
// completes. Despite its name it submits an input for the workers rather
// than recording a result; Setup hands it to the GenerateFn as the add
// callback.
func (g *Generator) addResult(s string) {
g.pool.Input <- s
}
// Setup starts the input producer in its own goroutine and returns at once.
// The goroutine runs fn, handing it the callback that feeds strings into the
// pool's Input channel, and closes Input after fn has returned.
func (g *Generator) Setup(fn GenerateFn) {
	produce := func() {
		fn(g.addResult)
		close(g.pool.Input)
	}
	go produce()
}
// Run starts the result collector and the worker goroutines, then prints the
// collector's stats once per second until either every worker has returned
// or one of the signals configured with WithSignalHandlersFor is received.
// Before returning it prints the stats one final time, followed by the
// collector's histogram.
//
// Run blocks for the whole duration of the run. The ticker and the signal
// subscription it creates are released when it returns.
func (g *Generator) Run(fn WorkFn) {
	go g.collector.Collect()
	g.pool.Start(fn)

	// Buffered because the signal package never blocks when delivering: with
	// no buffer a signal arriving while we are busy printing would be lost.
	termSignal := make(chan os.Signal, 1)
	if len(g.handleSignals) > 0 {
		signal.Notify(termSignal, g.handleSignals...)
		// Unsubscribe on return so the process-wide handler does not keep
		// relaying signals to a channel nobody reads any more.
		defer signal.Stop(termSignal)
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()

reporting:
	for {
		select {
		case <-ticker.C:
			fmt.Printf("%s\n", g.collector.Stats())
		case <-g.pool.Done():
			break reporting
		case <-termSignal:
			break reporting
		}
	}

	fmt.Printf("%s\n\n", g.collector.Stats())
	// NOTE(review): "Historgram" is misspelt, but the text is kept unchanged
	// in case anything parses this output.
	fmt.Printf("Historgram (ms):\n%s\n", g.collector.Histogram())
}