/
pool.go
111 lines (92 loc) · 1.51 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package reactor
import (
"sync"
"runtime"
"fmt"
)
// OnDone is a callback that takes no arguments and returns nothing.
// NOTE(review): nothing in this file references it; presumably it is a
// completion hook used by other files of the package - confirm before removing.
type OnDone func()
// Task is a unit of work that can be handed to a Pool.
type Task interface {
	// Do performs the work. A pool worker calls it once for each task it
	// receives from the task channel.
	Do()
}
// Pool is the public face of a worker pool: it is started, fed tasks and
// stopped through these three methods.
type Pool interface {
	// Start launches the pool's workers.
	Start()
	// Stop signals the pool's workers to quit and waits for them to
	// acknowledge.
	Stop()
	// PostTask submits task for execution by one of the workers.
	PostTask(task Task)
}
// worker is one consumer of a task channel.
type worker struct {
	// taskCh is where the worker receives tasks to execute.
	taskCh chan Task
	// quitCh carries quit requests. Each request is a WaitGroup on which the
	// worker calls Done to acknowledge it.
	quitCh chan *sync.WaitGroup
}
// pool is the Pool implementation returned by NewPool.
type pool struct {
	// taskCh is the channel PostTask sends on; NewPool hands the same channel
	// to every worker.
	taskCh chan Task
	// workers holds the pool's workers.
	workers []*worker
	// lock serialises Start and Stop and guards running.
	lock sync.Mutex
	// running records whether Start has been called without a later Stop.
	running bool
}
// newWorker returns a worker that will receive tasks from taskCh. The worker
// gets its own unbuffered quit channel; no goroutine is started here.
func newWorker(taskCh chan Task) *worker {
	w := new(worker)
	w.taskCh = taskCh
	w.quitCh = make(chan *sync.WaitGroup)
	return w
}
// start launches a goroutine that serves this worker until it is told to quit.
//
// The goroutine repeatedly waits on two channels: a Task received from taskCh
// is executed via Do, and a WaitGroup received from quitCh is acknowledged
// with Done, after which the goroutine exits. start itself returns
// immediately.
func (w *worker) start() {
	go func() {
		for {
			select {
			case task := <-w.taskCh:
				task.Do()
			case wg := <-w.quitCh:
				wg.Done()
				// Must be return, not break: a bare break only leaves the
				// select, so the loop would keep running and the goroutine
				// would go on consuming tasks after acknowledging the quit.
				return
			}
		}
	}()
}
// stop sends wg on the worker's quit channel. The worker's goroutine calls
// wg.Done when it receives the request, so the caller is expected to have
// added to wg beforehand. The quit channel created by newWorker is
// unbuffered, so this call blocks until the goroutine takes the request.
func (w *worker) stop(wg *sync.WaitGroup) {
	quit := w.quitCh
	quit <- wg
}
// NewPool builds a Pool with nWorkers workers that all receive from a single
// shared, unbuffered task channel. The workers are only constructed here;
// call Start on the returned Pool to launch them.
//
// A non-positive nWorkers selects the default: half of runtime.NumCPU(),
// but never fewer than one worker.
//
// The chosen worker count is printed to standard output.
func NewPool(nWorkers int) Pool {
	// Treat negative counts like zero. Previously only zero was defaulted, so
	// a negative value reached make below and panicked.
	if nWorkers <= 0 {
		nWorkers = runtime.NumCPU() / 2
		if nWorkers == 0 {
			// Single-CPU machines: NumCPU()/2 rounds down to zero.
			nWorkers = 1
		}
	}
	fmt.Println("nWorkers:", nWorkers)

	ret := &pool{
		taskCh:  make(chan Task),
		workers: make([]*worker, nWorkers),
	}
	for i := range ret.workers {
		ret.workers[i] = newWorker(ret.taskCh)
	}
	return ret
}
// Start marks the pool as running and calls start on every worker. Calling
// it while the pool is already running does nothing. The pool's lock is held
// for the whole call.
func (p *pool) Start() {
	p.lock.Lock()
	defer p.lock.Unlock()

	if !p.running {
		p.running = true
		for i := range p.workers {
			p.workers[i].start()
		}
	}
}
// Stop hands a quit request to every worker, waits until each of them has
// acknowledged it, and then marks the pool as not running. Calling it on a
// pool that is not running does nothing. The pool's lock is held for the
// whole call.
func (p *pool) Stop() {
	p.lock.Lock()
	defer p.lock.Unlock()

	if p.running {
		// One shared WaitGroup: every worker calls Done on it once.
		var acks sync.WaitGroup
		acks.Add(len(p.workers))
		for i := range p.workers {
			p.workers[i].stop(&acks)
		}
		acks.Wait()
		p.running = false
	}
}
// PostTask sends task on the pool's task channel. It neither takes the
// pool's lock nor checks whether the pool is running; because the channel
// created by NewPool is unbuffered, the call blocks until some worker
// receives the task.
func (p *pool) PostTask(task Task) {
	queue := p.taskCh
	queue <- task
}