forked from cloudfoundry/gorouter
/
workpool.go
147 lines (121 loc) · 2.21 KB
/
workpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
package workpool
import (
"fmt"
"sync"
"sync/atomic"
"time"
)
// waitTimeout is the interval each worker arms its timer with. When the
// timer fires while the worker is waiting for work, the worker asks
// workerStopping(false) whether it should exit.
const waitTimeout time.Duration = 5 * time.Second
// WorkPool runs submitted functions on worker goroutines that are started
// on demand, with at most maxWorkers of them counted at any time.
type WorkPool struct {
	// workQueue carries submitted functions from Submit to the workers.
	workQueue chan func()
	// stopping is closed by Stop to wake blocked submitters and workers.
	stopping chan struct{}
	// stopped is 0 while the pool is running and 1 once Stop has been
	// called; it is only accessed through sync/atomic.
	stopped int32

	// mutex is held around every update of numWorkers and idleWorkers.
	mutex sync.Mutex
	// maxWorkers is the upper bound on numWorkers.
	maxWorkers int
	// numWorkers counts worker goroutines that have been started and have
	// not yet reported exit through workerStopping.
	numWorkers int
	// idleWorkers is decremented by a worker before it runs work and
	// incremented again once it runs out of work.
	idleWorkers int
}
// NewWorkPool returns a pool limited to maxWorkers workers and with no
// extra pending capacity beyond that. It returns an error when maxWorkers
// is less than 1.
func NewWorkPool(maxWorkers int) (*WorkPool, error) {
	if maxWorkers >= 1 {
		return newWorkPoolWithPending(maxWorkers, 0), nil
	}
	return nil, fmt.Errorf("must provide positive maxWorkers; provided %d", maxWorkers)
}
// newWorkPoolWithPending builds a pool whose queue can buffer
// maxWorkers+pending submissions. It does not validate its arguments and
// starts no workers; workers are started by Submit via addWorker.
func newWorkPoolWithPending(maxWorkers, pending int) *WorkPool {
	queueCap := maxWorkers + pending

	pool := new(WorkPool)
	pool.workQueue = make(chan func(), queueCap)
	pool.stopping = make(chan struct{})
	pool.maxWorkers = maxWorkers
	return pool
}
// Submit hands work to the pool. It returns immediately without queuing
// anything if the pool is already stopped. Otherwise it blocks until the
// work fits into the queue or the pool begins stopping, whichever comes
// first.
func (w *WorkPool) Submit(work func()) {
	if atomic.LoadInt32(&w.stopped) == 1 {
		return
	}

	select {
	case <-w.stopping:
		// Stop was called while we were waiting for queue space.
		return
	case w.workQueue <- work:
	}

	// The work is queued. If Stop raced with the send, throw the queue
	// contents away; otherwise make sure a worker may pick the work up.
	if atomic.LoadInt32(&w.stopped) != 1 {
		w.addWorker()
		return
	}
	w.drain()
}
// Stop shuts the pool down: it flips the stopped flag, closes the stopping
// channel and discards whatever is still queued. Only the first call has
// any effect; later calls return without doing anything.
func (w *WorkPool) Stop() {
	first := atomic.CompareAndSwapInt32(&w.stopped, 0, 1)
	if !first {
		return
	}
	close(w.stopping)
	w.drain()
}
// addWorker starts one more worker goroutine unless idleWorkers is positive
// or numWorkers has already reached maxWorkers. It reports whether a worker
// was started.
func (w *WorkPool) addWorker() bool {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	canSpawn := w.idleWorkers <= 0 && w.numWorkers != w.maxWorkers
	if canSpawn {
		w.numWorkers++
		go worker(w)
	}
	return canSpawn
}
// workerStopping is called by a worker that is about to exit. When it
// returns true, numWorkers has been decremented and the worker must exit.
//
// With force set the decrement always happens. Without force the call is
// refused (false is returned and numWorkers is left alone) while the queue
// holds fewer items than there are workers.
//
// NOTE(review): as written, a non-forced (timeout) exit is only granted when
// the queue length is at least numWorkers — confirm this is the intended
// direction of the comparison.
func (w *WorkPool) workerStopping(force bool) bool {
	w.mutex.Lock()
	defer w.mutex.Unlock()

	if !force && len(w.workQueue) < w.numWorkers {
		return false
	}

	w.numWorkers--
	return true
}
// drain receives and discards queued work until a receive would block,
// i.e. until the queue is observed empty. Discarded work is never run.
func (w *WorkPool) drain() {
	for pending := true; pending; {
		select {
		case <-w.workQueue:
			// Dropped on purpose.
		default:
			pending = false
		}
	}
}
// worker is the body of a pool goroutine started by addWorker. It loops
// until the pool stops or an idle timeout is accepted by workerStopping:
//
//   - when the pool is stopped (flag set or stopping closed) it reports a
//     forced exit through workerStopping(true) and returns;
//   - when the idle timer fires it asks workerStopping(false) whether it may
//     exit, and re-arms the timer if not;
//   - when work arrives it runs that work and keeps running queued work
//     back-to-back until the queue is empty or the pool starts stopping.
//
// NOTE(review): idleWorkers is only ever decremented before work and
// incremented after it in this function, and nothing visible here raises it
// when a worker starts, so it does not appear to become positive — confirm
// against the check in addWorker.
func worker(w *WorkPool) {
	timer := time.NewTimer(waitTimeout)
	defer timer.Stop()

	for {
		if atomic.LoadInt32(&w.stopped) == 1 {
			w.workerStopping(true)
			return
		}

		select {
		case <-timer.C:
			if w.workerStopping(false) {
				return
			}
			// The tick was just consumed, so the channel is empty and
			// Reset is safe.
			timer.Reset(waitTimeout)

		case <-w.stopping:
			w.workerStopping(true)
			return

		case work := <-w.workQueue:
			// Stop the idle timer while busy. If it had already fired, a
			// tick may be sitting in timer.C; remove it so the Reset below
			// does not lead to an immediate, stale timeout on the next
			// pass. The receive is non-blocking so this is also safe when
			// no tick is buffered.
			if !timer.Stop() {
				select {
				case <-timer.C:
				default:
				}
			}

			w.mutex.Lock()
			w.idleWorkers--
			w.mutex.Unlock()

		NOWORK:
			for {
				work()
				select {
				case work = <-w.workQueue:
					// More work is queued; run it on the next iteration.
				case <-w.stopping:
					break NOWORK
				default:
					break NOWORK
				}
			}

			w.mutex.Lock()
			w.idleWorkers++
			w.mutex.Unlock()
			timer.Reset(waitTimeout)
		}
	}
}