/
pool.go
109 lines (84 loc) · 2.14 KB
/
pool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// Copyright 2020 Jayden Lie. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package main
import (
"container/list"
"sync"
"sync/atomic"
)
// JobFunc is the signature of the work function carried by a Job. A worker
// calls it with the job's stored arguments and sends the single value it
// returns on the job's result channel.
type JobFunc func(args ...interface{}) interface{}
// Job is one queued unit of work together with the channel on which its
// result is delivered. All fields are populated by Init.
type Job struct {
// id is the identifier passed to Init.
id int64
// fun is the function a worker calls to execute the job.
fun JobFunc
// args are expanded as the variadic arguments of fun.
args []interface{}
// pipe receives the value returned by fun.
pipe chan interface{}
}
// GoPool is a pool of worker goroutines that execute jobs taken from a
// shared queue.
type GoPool struct {
// LastID is the most recently issued job id. Do increments it with
// atomic.AddInt64, so it must only be accessed atomically.
LastID int64
// PoolSize is the number of workers recorded by NewGoPool.
PoolSize int
// Cond's lock guards Queue. Do signals it after pushing a job, and a
// worker waits on it while the queue is empty.
Cond *sync.Cond
// Queue holds pending *Job values: Do pushes to the back, workers take
// from the front.
Queue *list.List
// JobPool recycles Job values: Do gets one from it and a worker puts it
// back once the job has run.
JobPool *sync.Pool
}
// routine is the body of a single worker goroutine. It loops forever:
// it blocks until the pool's queue holds a job, removes the job at the front,
// runs it outside the lock, delivers the result on the job's channel and
// finally hands the Job value back to the pool's sync.Pool.
//
// pool is the pool whose queue this worker serves.
func routine(pool *GoPool) {
	for {
		pool.Cond.L.Lock()
		// Wait re-acquires the lock before returning, so the queue is
		// re-checked under the lock after every wake-up.
		front := pool.Queue.Front()
		for front == nil {
			pool.Cond.Wait()
			front = pool.Queue.Front()
		}
		task := front.Value.(*Job)
		pool.Queue.Remove(front)
		pool.Cond.L.Unlock()

		// Run the job without holding the lock so other workers can dequeue.
		result := task.fun(task.args...)
		task.pipe <- result
		pool.JobPool.Put(task)
	}
}
// NewGoPool creates a pool and starts its worker goroutines.
//
// size is the number of workers to start. A size below 1 is treated as 1:
// a pool without workers would still accept jobs through Do but never run
// them, so every channel Do returned would block forever. The effective
// worker count is stored in PoolSize.
//
// The workers loop for as long as the process runs; the pool has no shutdown.
func NewGoPool(size int) *GoPool {
	if size < 1 {
		size = 1
	}
	pool := &GoPool{
		PoolSize: size,
		Cond:     sync.NewCond(&sync.Mutex{}),
		Queue:    list.New(),
		JobPool: &sync.Pool{
			New: func() interface{} { return &Job{} },
		},
	}
	// No lock is held while the workers start: the queue is empty and the
	// pool is not yet visible to any caller, so each worker simply finds
	// nothing to do and parks in Cond.Wait until Do signals it.
	for i := 0; i < size; i++ {
		go routine(pool)
	}
	return pool
}
// Do queues fun for execution by a pool worker and returns immediately.
//
// fun is the function to run and args are passed to it as its variadic
// arguments. The returned channel is buffered with capacity 1 and receives
// exactly one value: the result of fun(args...).
func (pool *GoPool) Do(fun JobFunc, args ...interface{}) <-chan interface{} {
	job := pool.JobPool.Get().(*Job)
	job.Init(atomic.AddInt64(&pool.LastID, 1), fun, args)

	// Capture the result channel before the job becomes visible to workers.
	// Once the job is queued a worker may run it and return it to JobPool,
	// after which a concurrent Do can re-initialise the same Job; reading
	// job.pipe at that point would race and could yield another job's channel.
	pipe := job.pipe

	pool.Cond.L.Lock()
	pool.Queue.PushBack(job) // enqueue under the lock that guards Queue
	pool.Cond.Signal()       // wake one waiting worker
	pool.Cond.L.Unlock()

	return pipe
}
// Init (re)initialises the job in place, overwriting every field.
//
// id is the job identifier, fun the function to execute and args the
// arguments it will be called with. A fresh result channel with capacity 1
// is allocated on every call, so a previously used channel is never reused.
func (j *Job) Init(id int64, fun JobFunc, args []interface{}) {
	*j = Job{
		id:   id,
		fun:  fun,
		args: args,
		pipe: make(chan interface{}, 1),
	}
}