/
workpool.go
111 lines (94 loc) · 1.94 KB
/
workpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
package utils
import (
"fmt"
"os"
"runtime/debug"
"time"
"k8s.io/klog"
)
// WorkerPool runs submitted tasks on a bounded set of worker goroutines.
//
// The method descriptions below reflect the implementation returned by
// NewWorkerPool; the methods differ only in what happens when a task cannot
// be handed off immediately.
type WorkerPool interface {
	// Schedule hands task to an idle worker, or starts a new worker for it
	// if the pool has a free slot. It blocks until one of the two is possible.
	Schedule(task func())
	// ScheduleAlways behaves like Schedule but never blocks: when no worker
	// is idle and no slot is free, task runs in a separate goroutine outside
	// the pool.
	ScheduleAlways(task func())
	// ScheduleAuto behaves like ScheduleAlways, except that it first tries
	// to hand task to an idle worker before considering a new worker.
	ScheduleAuto(task func())
}
// workerPool is the WorkerPool implementation built by NewWorkerPool.
type workerPool struct {
	// work carries tasks from the Schedule* methods to running workers.
	// NewWorkerPool creates it unbuffered, so a send succeeds only while a
	// worker is waiting to receive.
	work chan func()
	// sem holds one token per running worker: a token is sent before a
	// worker is started and received when that worker exits, so the
	// channel's capacity bounds the number of workers.
	sem chan struct{}
}
// NewWorkerPool returns a WorkerPool that keeps at most size worker
// goroutines alive. Workers are started lazily by the Schedule* methods.
func NewWorkerPool(size int) WorkerPool {
	// Unbuffered: a task is only accepted while a worker is ready for it.
	tasks := make(chan func())
	// One buffered slot per worker that may exist at the same time.
	slots := make(chan struct{}, size)

	return &workerPool{work: tasks, sem: slots}
}
// Schedule submits task to the pool and blocks until it has been accepted,
// either by an idle worker or by a newly started one.
func (p *workerPool) Schedule(task func()) {
	select {
	case p.sem <- struct{}{}:
		// A slot was free: start a worker that runs task first.
		go p.spawnWorker(task)
	case p.work <- task:
		// An idle worker received the task.
	}
}
// ScheduleAlways submits task without ever blocking the caller. If no worker
// is idle and no slot is free, the task runs in a goroutine of its own,
// outside the pool, and that fallback is logged.
func (p *workerPool) ScheduleAlways(task func()) {
	select {
	case p.sem <- struct{}{}:
		// A slot was free: start a worker that runs task first.
		go p.spawnWorker(task)
	case p.work <- task:
		// An idle worker received the task.
	default:
		klog.Infof("[syncpool] workerpool new goroutine")
		GoWithRecover(task, nil)
	}
}
// ScheduleAuto submits task without ever blocking the caller, preferring an
// idle worker over starting a new one, and a new worker over a goroutine
// outside the pool.
func (p *workerPool) ScheduleAuto(task func()) {
	// First attempt: only an idle worker may take the task. Doing this in a
	// separate select keeps a free slot from being chosen while a worker is
	// already waiting.
	select {
	case p.work <- task:
		return
	default:
	}

	select {
	case p.sem <- struct{}{}:
		// A slot was free: start a worker that runs task first.
		go p.spawnWorker(task)
	case p.work <- task:
		// A worker became idle in the meantime.
	default:
		// Pool saturated: run outside the pool. Unlike ScheduleAlways this
		// fallback is not logged.
		GoWithRecover(task, nil)
	}
}
// spawnWorker is the body of a worker goroutine. It runs task, then keeps
// running whatever arrives on p.work. The loop has no exit condition, so the
// worker only stops when a task panics; the panic is logged and the worker's
// slot in p.sem is released. The caller must have placed a token in p.sem
// before starting the worker.
func (p *workerPool) spawnWorker(task func()) {
	defer func() {
		if cause := recover(); cause != nil {
			klog.Warningf("syncpool panic %v\n%s", cause, string(debug.Stack()))
		}
		// Return this worker's slot so a replacement can be started.
		<-p.sem
	}()

	// Run the current task, then block for the next one.
	for ; ; task = <-p.work {
		task()
	}
}
// GoWithRecover runs handler in a new goroutine and recovers from any panic
// it raises. A recovered panic is written to stderr with a timestamp and
// stack trace. If recoverHandler is non-nil it is then called with the panic
// value in yet another goroutine, and a panic inside recoverHandler is itself
// recovered and written to stderr.
func GoWithRecover(handler func(), recoverHandler func(r interface{})) {
	go func() {
		defer func() {
			r := recover()
			if r == nil {
				return
			}

			stamp := time.Now().Format("2006-01-02 15:04:05")
			fmt.Fprintf(os.Stderr, "%s goroutine panic:%v\n%s\n", stamp, r, string(debug.Stack()))

			if recoverHandler == nil {
				return
			}

			// Run the callback on its own goroutine, guarded so that a
			// failing callback cannot crash the process.
			go func(cause interface{}) {
				defer func() {
					if p := recover(); p != nil {
						fmt.Fprintf(os.Stderr, "recover goroutine panic:%v\n%s\n", p, string(debug.Stack()))
					}
				}()
				recoverHandler(cause)
			}(r)
		}()

		handler()
	}()
}