-
Notifications
You must be signed in to change notification settings - Fork 0
/
worker.go
83 lines (69 loc) · 1.48 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package go_workerpool
import (
"log"
)
// Job is a unit of work that a Worker can execute.
//
// A worker that picks up a job first calls SetWorkerID with its own id and
// then calls Work. A non-nil error returned by Work is passed to the worker's
// logging hook.
type Job interface {
	// SetWorkerID receives the id of the worker that is about to run the job.
	SetWorkerID(ID int)
	// Work runs the job and returns a non-nil error on failure.
	Work() error
}
// Worker runs jobs one at a time on its own goroutine.
//
// All fields are unexported; NewWorker builds a ready-to-start value.
type Worker struct {
	// id identifies the worker; it is handed to every job and used in log lines.
	id int

	// workerPool is the shared channel on which the worker publishes jobQueue
	// to announce that it can accept a job.
	workerPool chan chan Job
	// jobQueue is the worker's own channel on which it receives jobs.
	jobQueue chan Job
	// quitChan carries the stop request sent by Stop.
	quitChan chan bool

	// started is set by Start and cleared when the worker loop exits.
	// debug is forwarded to verbose on every log call.
	started, debug bool

	// verbose is the logging hook used by the worker loop.
	verbose func(debug bool, msg string, args ...interface{})
}
// NewWorker returns a worker that is not yet started.
//
// id is the identifier reported by ID and passed to jobs, workerPool is the
// shared channel on which the worker will register its job queue, and debug
// is forwarded to the logging hook. The worker gets its own unbuffered job
// and quit channels and uses the package-level verbose function for logging.
func NewWorker(id int, workerPool chan chan Job, debug bool) *Worker {
	w := new(Worker) // started stays at its zero value, false

	w.id = id
	w.debug = debug
	w.verbose = verbose

	w.workerPool = workerPool
	w.jobQueue = make(chan Job)
	w.quitChan = make(chan bool)

	return w
}
// Start launches the worker loop in a new goroutine and returns immediately.
//
// On every iteration the worker publishes its jobQueue on workerPool to
// announce that it is idle, then waits for either a job or a stop request.
// For each job it calls SetWorkerID with the worker id followed by Work. A
// non-nil error is reported through the verbose hook and the loop continues.
//
// A stop request is honoured both while waiting for a job and while waiting
// to register on workerPool. A sender on quitChan therefore cannot hang
// behind a workerPool that has no free slot and no reader. When the loop
// exits it logs a "stopping" line and resets started to false.
//
// Start does not guard against repeated calls; each call spawns its own loop.
func (w *Worker) Start() {
	w.started = true
	go func() {
		// Single exit path shared by both quit cases below.
		defer func() {
			w.verbose(w.debug, "worker %d stopping\n", w.id)
			w.started = false
		}()

		for {
			// Register the worker as idle unless a stop request arrives first.
			select {
			case w.workerPool <- w.jobQueue:
			case <-w.quitChan:
				return
			}

			select {
			case job := <-w.jobQueue:
				job.SetWorkerID(w.id)
				if err := job.Work(); err != nil {
					w.verbose(w.debug, "error running worker %d: %s\n", w.id, err.Error())
				}
			case <-w.quitChan:
				return
			}
		}
	}()
}
// Stop asks the worker loop to exit and blocks until the loop has received
// the request.
//
// The request is sent on quitChan, and the send completes only once a running
// loop receives it. If the worker is not marked as started there is no loop
// to receive the request, so Stop returns without doing anything instead of
// blocking forever.
//
// NOTE: the started check is best-effort. It is read without synchronization,
// so concurrent Stop calls on the same worker are not protected.
func (w *Worker) Stop() {
	// No loop is running to receive the request; sending would never return.
	if !w.started {
		return
	}
	w.quitChan <- true
}
// ID returns the id the worker was created with.
func (w *Worker) ID() int {
	return w.id
}
// Started reports whether the worker is currently marked as running.
//
// The flag is set by Start and cleared by the worker goroutine once it has
// handled a stop request. It is read here without synchronization.
func (w *Worker) Started() bool {
	return w.started
}
// verbose is the logging hook installed by NewWorker. It forwards msg and
// args to log.Printf when debug is true and does nothing otherwise.
func verbose(debug bool, msg string, args ...interface{}) {
	if !debug {
		return
	}
	log.Printf(msg, args...)
}