-
Notifications
You must be signed in to change notification settings - Fork 0
/
dgpool.go
166 lines (144 loc) · 3.54 KB
/
dgpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
package dgpool
import (
"context"
"errors"
"fmt"
"github.com/osgochina/donkeygo/container/dlist"
"github.com/osgochina/donkeygo/container/dtype"
"github.com/osgochina/donkeygo/internal/intlog"
)
// Pool Goroutine Pool
type Pool struct {
limit int
count *dtype.Int
list *dlist.List
closed *dtype.Bool
}
//默认创建一个协程池
var pool = New()
// New 创建协程池,limit限制最多能同时运行多少个工作协程
func New(limit ...int) *Pool {
p := &Pool{
limit: -1,
count: dtype.NewInt(),
list: dlist.New(true),
closed: dtype.NewBool(),
}
if len(limit) > 0 && limit[0] > 0 {
p.limit = limit[0]
}
return p
}
// Go 执行协程
func Go(fn func()) bool {
if err := pool.Add(fn); err != nil {
intlog.Errorf(context.TODO(), "%s", err.Error())
return false
}
return true
}
// Add 往默认协程池中添加jobs
func Add(f func()) error {
return pool.Add(f)
}
// AddWithRecover 在默认协程池中执行方法,并且执行完成后,如果出错,则调用recover方法
func AddWithRecover(userFunc func(), recoverFunc ...func(err error)) error {
return pool.AddWithRecover(userFunc, recoverFunc...)
}
// AddWithSyncFunc 在默认协程池中执行方法,执行完成后回调
func AddWithSyncFunc(useFunc func(), syncFunc func(bool)) error {
return pool.AddWithSyncFunc(useFunc, syncFunc)
}
// Size 默认协程池的大小
func Size() int {
return pool.Size()
}
// Jobs 默认协程池当前中有多少个任务需要执行
func Jobs() int {
return pool.Jobs()
}
// Add 添加待执行的方法到协程池
func (that *Pool) Add(f func()) error {
//如果协程池已关闭,则返回错误
for that.closed.Val() {
return errors.New("pool closed")
}
//并发安全的把方法指针传入双向链表
that.list.PushFront(f)
var n int
for {
//获取协程池中的协程协程总数
n = that.count.Val()
//如果协程池中的协程已经超出限制,则不执行任务,直接返回
if that.limit != -1 && n >= that.limit {
return nil
}
//协程池中的协程够用,则执行
if that.count.Cas(n, n+1) {
break
}
}
//启动一个协程执行任务
that.fork()
return nil
}
// AddWithRecover 添加任务,并在任务执行出错的情况下,回调recoverFunc
func (that *Pool) AddWithRecover(useFunc func(), recoverFunc ...func(err error)) error {
return that.Add(func() {
defer func() {
if err := recover(); err != nil {
if len(recoverFunc) > 0 && recoverFunc[0] == nil {
recoverFunc[0](errors.New(fmt.Sprintf("%v", err)))
}
}
}()
useFunc()
})
}
// AddWithSyncFunc 执行成功后回调方法
func (that *Pool) AddWithSyncFunc(useFunc func(), syncFunc func(bool)) error {
return that.Add(func() {
defer func() {
if err := recover(); err != nil {
syncFunc(false)
} else {
syncFunc(true)
}
}()
useFunc()
})
}
//开始执行一个协程
func (that *Pool) fork() {
go func() {
defer that.count.Add(-1)
var job interface{}
for !that.closed.Val() {
if job = that.list.PopBack(); job != nil {
job.(func())()
} else {
return
}
}
}()
}
// IsClosed 判断协程池是否关闭
func (that *Pool) IsClosed() bool {
return that.closed.Val()
}
// Close 关闭协程池
func (that *Pool) Close() {
that.closed.Set(true)
}
// Size 协程池中的协程数量
func (that *Pool) Size() int {
return that.count.Val()
}
// Jobs 协程池中的待执行任务数量
func (that *Pool) Jobs() int {
return that.list.Size()
}
// Cap 协程池最大能够启动多少个协程
func (that *Pool) Cap() int {
return that.limit
}