-
-
Notifications
You must be signed in to change notification settings - Fork 1
/
worker.go
82 lines (68 loc) · 1.52 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package gopool
import (
"context"
"sync"
)
// WorkFunc is the unit of work a Worker runs. It receives the context that
// Start derives for the worker and returns an error, which the worker records
// and exposes through Err.
type WorkFunc func(ctx context.Context) error
// NewWorker builds a Worker identified by id that will run work once Start is
// called.
//
// The worker keeps a context derived from ctx that carries id under the string
// key "workerId". ctx must be non-nil: context.WithValue panics when handed a
// nil parent.
//
// NOTE(review): a plain string context key can collide with values set by
// other packages; it is kept as-is because existing readers may look up
// ctx.Value("workerId").
func NewWorker(id string, work WorkFunc, ctx context.Context) *Worker {
	return &Worker{
		id:   id,
		work: work,
		ctx:  context.WithValue(ctx, "workerId", id),
	}
}
// Worker runs a single WorkFunc in its own goroutine and exposes its
// identifier, context, completion signal and resulting error.
//
// A Worker embeds a sync.Mutex, so it must be used through a pointer and not
// copied; NewWorker returns *Worker for that reason.
type Worker struct {
// mu is locked by every method in this file before it reads or writes the
// fields below.
mu sync.Mutex
// id is the identifier passed to NewWorker.
id string
// work is the function executed by the goroutine that Start launches.
work WorkFunc
// ctx is the context built in NewWorker; it carries id under the key
// "workerId" and is the parent of the context given to work.
ctx context.Context
// err holds the value returned by work; it is written by the goroutine
// started in Start once work returns.
err error
// done is created by Start and closed after work returns. It is nil until
// Start has been called.
done chan struct{}
}
// ID returns the worker's unique identifier, as given to NewWorker.
func (w *Worker) ID() string {
	w.mu.Lock()
	id := w.id
	w.mu.Unlock()

	return id
}
// Err returns the error recorded for the worker. It is nil until the work
// function started by Start has returned, and stays nil if that function
// returned no error.
func (w *Worker) Err() error {
	w.mu.Lock()
	recorded := w.err
	w.mu.Unlock()

	return recorded
}
// Context returns the worker's context: the one stored by NewWorker, which
// carries the worker ID under the key "workerId". Start derives the context it
// passes to the work function from this value.
func (w *Worker) Context() context.Context {
	w.mu.Lock()
	current := w.ctx
	w.mu.Unlock()

	return current
}
// Start launches the worker's WorkFunc in a new goroutine and returns the
// CancelFunc of the context handed to it.
//
// The context passed to the work function is derived from the worker's
// context (see Context) with cancellation added; calling the returned
// CancelFunc cancels it. When the work function returns, its error is stored
// for Err and then the channel returned by Done is closed, so a caller that
// observes Done may read Err immediately afterwards.
//
// Every call installs a fresh done channel and runs the work function again,
// so Start is meant to be called once per Worker.
func (w *Worker) Start() context.CancelFunc {
	done := make(chan struct{})

	w.mu.Lock()
	w.done = done
	w.mu.Unlock()

	ctx, cancel := context.WithCancel(w.Context())

	go func() {
		// Deferred so that waiters on Done are always released, even when the
		// work function ends this goroutine without returning (for example
		// via runtime.Goexit); a plain close after the call would be skipped
		// and leave them blocked forever.
		defer close(done)

		w.mu.Lock()
		work := w.work
		w.mu.Unlock()

		err := work(ctx)

		// Record the result before the deferred close fires so Err is
		// populated by the time Done is observed.
		w.mu.Lock()
		w.err = err
		w.mu.Unlock()
	}()

	return cancel
}
// Done returns the channel that is closed once the work function launched by
// the most recent Start call has finished.
//
// The channel is assigned in Start and NewWorker leaves it unset, so on a
// Worker that has not been started this returns nil. Receiving from a nil
// channel blocks forever, so call Done after Start.
func (w *Worker) Done() chan struct{} {
	w.mu.Lock()
	finished := w.done
	w.mu.Unlock()

	return finished
}