-
Notifications
You must be signed in to change notification settings - Fork 0
/
workerpool.go
81 lines (69 loc) · 1.84 KB
/
workerpool.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
package schnittstelle
import (
"sync"
)
// Job is the signature of a unit of work executed by a pool worker.
// It is called with the ID of the worker running it and with the
// parameters that were handed to Push together with the job. The
// value it returns is sent on the pool's results channel.
type Job func(workerId int, params ...interface{}) interface{}
// WorkerPool is a simple "thread" pool implementation based on
// goroutines. Jobs are handed to the workers through Push and their
// return values are delivered on the channel returned by Results.
type WorkerPool struct {
// jobs carries the enqueued jobs to the workers. NewWorkerpool
// creates it unbuffered and Close closes it.
jobs chan jobWrapper
// results receives the return value of every executed job.
// NewWorkerpool creates it unbuffered.
results chan interface{}
// wg is the WaitGroup that WaitBlocking waits on.
wg sync.WaitGroup
}
// jobWrapper bundles a Job with the parameters it was pushed with,
// so that both travel through the jobs channel as a single value.
type jobWrapper struct {
// job is the function to execute; workers skip wrappers whose job
// is nil.
job Job
// params are passed to job as its variadic arguments.
params []interface{}
}
// NewWorkerpool creates a new WorkerPool and starts size worker
// goroutines which wait for jobs passed to Push.
//
// Every worker is registered on the pool's WaitGroup before its
// goroutine is started and is released only after the worker loop has
// returned, which happens once Close has been called and the jobs
// channel is drained. WaitBlocking therefore cannot return while a
// worker that has already received a job has not finished it yet.
// As a consequence, WaitBlocking only returns after Close was called.
//
// Both the jobs and the results channel are unbuffered: Push blocks
// until a worker takes the job, and a worker blocks until its result
// has been read from Results. If size is zero or negative no workers
// are started and Push blocks forever.
func NewWorkerpool(size int) *WorkerPool {
	w := &WorkerPool{
		jobs:    make(chan jobWrapper),
		results: make(chan interface{}),
	}

	for i := 0; i < size; i++ {
		// Add has to run here, on the caller's goroutine, before the
		// worker exists. Incrementing only inside the worker after it
		// received a job races with a concurrent Wait: the counter can
		// still be zero when WaitBlocking is called, so Wait would
		// return although a job is about to be executed.
		w.wg.Add(1)
		go func(id int) {
			defer w.wg.Done()
			w.spawnWorker(id)
		}(i)
	}

	return w
}
// Push hands a job and the parameters it should be called with over
// to the pool. The jobs channel is unbuffered, so Push blocks until a
// worker has received the job. Jobs that are nil are accepted here but
// skipped by the workers. Push must not be called after Close, since
// sending on the closed jobs channel panics.
func (w *WorkerPool) Push(job Job, params ...interface{}) {
	wrapped := jobWrapper{job: job, params: params}
	w.jobs <- wrapped
}
// Close closes the jobs channel so that the workers stop once they
// have executed the jobs they already received. It is necessary to
// call Close before WaitBlocking. Only the jobs channel is closed;
// the results channel stays open. Close must be called at most once
// and no Push may follow it.
func (w *WorkerPool) Close() {
	close(w.jobs)
}
// Results exposes the results channel as a receive-only channel.
// Every executed job sends its return value on it. The channel is
// unbuffered, so a worker blocks until its result has been read, and
// Close does not close it, so a plain range over it does not end on
// its own.
func (w *WorkerPool) Results() <-chan interface{} {
	return w.results
}
// WaitBlocking blocks the caller until the pool's WaitGroup has been
// released, i.e. until no worker is executing a job anymore. Close
// should have been called before. Results have to be consumed from
// Results while waiting, because workers block on the unbuffered
// results channel until their value is read.
func (w *WorkerPool) WaitBlocking() {
	w.wg.Wait()
}
// spawnWorker runs the worker loop for the worker with the passed id.
// It listens for incoming jobs until the jobs channel is closed, so it
// blocks and is meant to run on its own goroutine. Wrappers without a
// job function are skipped. For every other job the WaitGroup is held
// while the job runs and until its return value has been delivered on
// the results channel.
func (w *WorkerPool) spawnWorker(id int) {
	for next := range w.jobs {
		if next.job == nil {
			continue
		}

		w.wg.Add(1)
		result := next.job(id, next.params...)
		// Blocks until a consumer reads from the results channel.
		w.results <- result
		w.wg.Done()
	}
}