-
Notifications
You must be signed in to change notification settings - Fork 153
/
dispatcher.go
129 lines (114 loc) · 2.74 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
package execute
import (
"context"
"fmt"
"runtime/debug"
"sync"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Dispatcher schedules work for a query.
// Each transformation submits work to be done to the dispatcher.
// Then the dispatcher schedules the work based on the available resources.
type Dispatcher interface {
// Schedule submits fn to the dispatcher for execution.
// When, and on which goroutine, fn runs is up to the implementation.
Schedule(fn ScheduleFunc)
}
// ScheduleFunc is a function that represents work to do.
// The throughput argument is the maximum number of messages the function
// should process during this scheduling.
type ScheduleFunc func(throughput int)
// poolDispatcher implements Dispatcher using a pool of goroutines.
// Workers are launched by Start, fed through Schedule and shut down by Stop.
type poolDispatcher struct {
// work is the queue of scheduled functions; Schedule sends on it and the
// worker loop (run) receives from it.
work chan ScheduleFunc
// throughput is the value passed to every ScheduleFunc a worker invokes.
throughput int
// mu is held by Err, setErr and Stop while they touch closed, err and errC.
mu sync.Mutex
// closed records that Stop has already been called.
closed bool
// closing is closed by Stop to tell workers and blocked Schedule calls to
// give up.
closing chan struct{}
// wg counts the worker goroutines launched by Start.
wg sync.WaitGroup
// err is the first error recorded by setErr, if any.
err error
// errC receives the first error recorded by setErr; it is exposed via Err.
errC chan error
// logger is used to report panics recovered on worker goroutines.
logger *zap.Logger
}
// newPoolDispatcher builds a poolDispatcher that passes throughput to every
// function it runs. No worker goroutines exist until Start is called.
// The supplied logger is tagged with component=dispatcher.
func newPoolDispatcher(throughput int, logger *zap.Logger) *poolDispatcher {
	d := new(poolDispatcher)
	d.throughput = throughput
	// Up to 100 functions may sit in the queue before Schedule has to wait.
	d.work = make(chan ScheduleFunc, 100)
	d.closing = make(chan struct{})
	// One slot of buffering lets setErr publish an error without a reader.
	d.errC = make(chan error, 1)
	d.logger = logger.With(zap.String("component", "dispatcher"))
	return d
}
// Schedule hands fn to the worker pool by placing it on the work queue.
// It waits until either the queue accepts fn or the dispatcher is stopped;
// in the latter case it may return without having queued fn.
func (d *poolDispatcher) Schedule(fn ScheduleFunc) {
	select {
	case <-d.closing:
		// The dispatcher was stopped; do not wait for queue space.
	case d.work <- fn:
		// fn is now queued for a worker to pick up.
	}
}
// Start launches n worker goroutines. Each one executes the run loop with ctx
// and is tracked by the dispatcher's WaitGroup.
//
// A panic that escapes the run loop is recovered on the worker: it is turned
// into an error, recorded through setErr together with a stack trace, and
// logged at info level. The worker that panicked then exits.
func (d *poolDispatcher) Start(n int, ctx context.Context) {
	worker := func() {
		defer d.wg.Done()
		// Setup panic handling on the worker goroutines
		defer func() {
			e := recover()
			if e == nil {
				return
			}
			// Normalize the recovered value into an error.
			err, isErr := e.(error)
			if !isErr {
				err = fmt.Errorf("%v", e)
			}
			d.setErr(fmt.Errorf("panic: %v\n%s", err, debug.Stack()))
			entry := d.logger.Check(zapcore.InfoLevel, "Dispatcher panic")
			if entry == nil {
				// Logging at this level is disabled.
				return
			}
			entry.Stack = string(debug.Stack())
			entry.Write(zap.Error(err))
		}()
		d.run(ctx)
	}

	d.wg.Add(n)
	for started := 0; started < n; started++ {
		go worker()
	}
}
// Err returns a channel which will produce an error if one is encountered.
// The channel is read under the dispatcher's mutex.
func (d *poolDispatcher) Err() <-chan error {
	d.mu.Lock()
	errs := d.errC
	d.mu.Unlock()
	return errs
}
// setErr records err as the dispatcher's error and publishes it on errC.
// Once an error has been stored, later calls leave it untouched and send
// nothing.
func (d *poolDispatcher) setErr(err error) {
	d.mu.Lock()
	defer d.mu.Unlock()

	// TODO(nathanielc): Collect all error information.
	if d.err != nil {
		return
	}
	d.err = err
	d.errC <- err
}
// Stop the dispatcher.
//
// The first call marks the dispatcher as closed and closes the closing
// channel, which tells the workers to stop picking up work. Every call then
// waits for all worker goroutines to exit and returns the error recorded by
// setErr, or nil if none was recorded. Stop is safe to call more than once.
func (d *poolDispatcher) Stop() error {
	d.mu.Lock()
	if !d.closed {
		d.closed = true
		close(d.closing)
	}
	d.mu.Unlock()

	// Wait without holding mu. A worker recovering from a panic calls setErr,
	// which needs mu, before its deferred wg.Done runs; holding the lock across
	// Wait would deadlock against that worker.
	d.wg.Wait()

	// Read the error under the lock so a concurrent setErr is observed safely.
	d.mu.Lock()
	defer d.mu.Unlock()
	return d.err
}
// run is the loop each worker goroutine in the pool executes. It repeatedly
// takes a scheduled function off the work queue and calls it with the
// dispatcher's throughput, and returns once ctx is cancelled or the
// dispatcher is stopped.
func (d *poolDispatcher) run(ctx context.Context) {
	for {
		select {
		case task := <-d.work:
			task(d.throughput)
		case <-d.closing:
			// We are done, nothing left to do.
			return
		case <-ctx.Done():
			// Immediately return, do not process any more work
			return
		}
	}
}