forked from revel/modules
/
workers.go
281 lines (248 loc) · 7.43 KB
/
workers.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
package gorp
import (
"errors"
"fmt"
"runtime"
"sync"
"time"
)
// The worker container.
// DbWorkerContainer owns the shared channels (embedded SharedWorker), the
// pool of started workers, and the template database each worker clones.
type DbWorkerContainer struct {
	SharedWorker
	// mutex guards Workers, which is appended to concurrently by the
	// startWorker goroutines.
	mutex   sync.Mutex
	Workers []*DbWorker
	// NumWorkers is how many worker goroutines Start launches.
	NumWorkers int
	// LongWorkTimeout, in seconds, is how long a job may run before the
	// watchdog reports JobLongrunning; 0 disables job monitoring.
	LongWorkTimeout int64
	// StartWorkTimeout, in seconds, bounds the wait in Start for each worker's
	// startup report; 0 means wait indefinitely.
	StartWorkTimeout int64
	// Db is the template database; every worker gets its own clone.
	Db *DbGorp
}
// The timeoutInfo for monitoring long running processes.
// A timeoutInfo is sent on the worker's TimeoutChannel at job start and job
// end so the watchdog goroutine (timeoutInfo.start) can track progress.
type timeoutInfo struct {
	worker  *DbWorker   // the worker executing the monitored job
	started time.Time   // when the current job began
	state   WorkerPhase // StartJob or EndJob
}
// DbWorker is the per-goroutine worker state: its own database clone, the
// channels shared with the container, and per-worker bookkeeping.
type DbWorker struct {
	Id int     // worker index, assigned when the worker is launched
	Db *DbGorp // this worker's private database clone
	SharedWorker
	// WorkUnit is summed by DbWorkerContainer.Close; it is never incremented
	// in this file — presumably task code updates it. TODO confirm.
	WorkUnit int
	// SharedData is scratch storage available to the task callbacks.
	SharedData map[string]interface{}
	// TimeInfo and TimeoutChannel drive the long-running-job watchdog; they
	// are only set when the container's LongWorkTimeout is > 0.
	TimeInfo       *timeoutInfo
	TimeoutChannel chan *timeoutInfo
}
// SharedWorker holds the callbacks and channels shared between the container
// and every worker (embedded in both).
type SharedWorker struct {
	workInfo DbWorkInfo // status/work callbacks
	// InputChannel carries queued jobs; closing it shuts the workers down.
	InputChannel chan interface{}
	// OutputChannel carries task results when callers collect them.
	OutputChannel chan interface{}
	// ControlChannel carries worker lifecycle reports (Start/Stop).
	ControlChannel chan func() (WorkerPhase, *DbWorker)
}
// DbWorkInfo is the callback contract a worker runs against: Status is called
// on lifecycle/job phase changes, Work once per queued job.
type DbWorkInfo interface {
	Status(phase WorkerPhase, worker *DbWorker)
	Work(value interface{}, worker *DbWorker)
}
// DbCallbackImplied adapts plain functions to the DbWorkInfo interface; see
// MakeCallback. StatusFn may be nil; WorkFn must not be (Work calls it
// unconditionally).
type DbCallbackImplied struct {
	StatusFn func(phase WorkerPhase, worker *DbWorker)
	WorkFn   func(value interface{}, worker *DbWorker)
}
// WorkerPhase identifies a worker lifecycle or job state reported through the
// Status callback and the control channel.
type WorkerPhase int

const (
	Start          WorkerPhase = iota // worker goroutine is up and running
	Stop                              // worker drained its input and exited
	StartJob                          // a job has begun (watchdog)
	EndJob                            // a job has completed (watchdog)
	JobLongrunning                    // a job exceeded LongWorkTimeout (watchdog)
)
// WorkParallel creates a container running up to maxNumWorkers workers, feeds
// every task to them, and does not return until all workers have completed.
// If maxNumWorkers is 0, one worker per task is started.
// If returnResults is true then each task MUST write exactly one value to the
// DbWorker.OutputChannel; the values are returned in completion order.
// timeouts is the number of seconds Close waits for each worker to shut down.
func WorkParallel(db *DbGorp, tasks []func(worker *DbWorker), returnResults bool, maxNumWorkers int, timeouts int) (results []interface{}, err error) {
	if maxNumWorkers == 0 {
		maxNumWorkers = len(tasks)
	}
	// Create a container with no status callback; each queued value is the
	// task function itself, which the worker simply invokes.
	container := NewDbWorker(db,
		MakeCallback(nil,
			func(value interface{}, worker *DbWorker) {
				task := value.(func(worker *DbWorker))
				task(worker)
			}), maxNumWorkers)
	if err = container.Start(); err != nil {
		return
	}
	for _, task := range tasks {
		container.InputChannel <- task
	}
	if returnResults {
		// Collect exactly one result per task; blocks until all report.
		for range tasks {
			result := <-container.OutputChannel
			results = append(results, result)
		}
	}
	// Propagate a shutdown error (previously discarded) but keep any results
	// already collected.
	if _, closeErr := container.Close(timeouts); closeErr != nil && err == nil {
		err = closeErr
	}
	return
}
// NewDbWorker builds a DbWorkerContainer wired with shared input, output and
// control channels, each buffered to numWorkers. The worker goroutines
// themselves are launched later by DbWorkerContainer.Start; each one clones
// its own database instance from db.
func NewDbWorker(db *DbGorp, workInfo DbWorkInfo, numWorkers int) (container *DbWorkerContainer) {
	shared := SharedWorker{
		workInfo:       workInfo,
		InputChannel:   make(chan interface{}, numWorkers),
		OutputChannel:  make(chan interface{}, numWorkers),
		ControlChannel: make(chan func() (WorkerPhase, *DbWorker), numWorkers),
	}
	// StartWorkTimeout and LongWorkTimeout default to 0 (disabled); callers
	// may set them on the returned container before calling Start.
	container = &DbWorkerContainer{
		SharedWorker:     shared,
		Db:               db,
		NumWorkers:       numWorkers,
		StartWorkTimeout: 0,
		LongWorkTimeout:  0,
	}
	return
}
// Start launches NumWorkers goroutines, each of which clones its own database
// connection, and blocks until every worker has reported the Start phase on
// the ControlChannel. When StartWorkTimeout > 0 it bounds the wait for each
// worker; on a timeout or a non-Start report the container is closed and an
// error is returned.
func (container *DbWorkerContainer) Start() (err error) {
	for x := 0; x < container.NumWorkers; x++ {
		go startWorker(container, container.Db, x)
	}
	// Make sure all workers are running before returning.
	for x := 0; x < container.NumWorkers; x++ {
		if container.StartWorkTimeout > 0 {
			select {
			case result := <-container.ControlChannel:
				if err = container.checkStarted(result); err != nil {
					return
				}
			case <-time.After(time.Second * time.Duration(container.StartWorkTimeout)):
				container.Close(5)
				err = errors.New("failed to start worker timeout")
				return
			}
		} else {
			// No timeout configured: wait indefinitely for the report.
			if err = container.checkStarted(<-container.ControlChannel); err != nil {
				return
			}
		}
	}
	return
}

// checkStarted verifies that a control-channel report is the Start phase;
// on any other phase it closes the container and returns a descriptive error.
// (Previously this logic was duplicated in both Start branches.)
func (container *DbWorkerContainer) checkStarted(result func() (WorkerPhase, *DbWorker)) error {
	state, source := result()
	if state != Start {
		container.Close(5)
		return fmt.Errorf("failed to start workers %v", source)
	}
	return nil
}
// Close shuts the container down: the input channel is closed so workers
// drain remaining work and exit, then one lifecycle report per started worker
// is collected from the ControlChannel. timeouts > 0 bounds the wait (in
// seconds) for each report so Close can proceed even when a worker is stuck.
// Returns the total number of work units processed across all workers.
func (container *DbWorkerContainer) Close(timeouts int) (totalWork int, err error) {
	close(container.InputChannel)
	// Snapshot the worker count under the mutex: startWorker goroutines append
	// to Workers concurrently, so an unguarded len() is a data race.
	container.mutex.Lock()
	numWorkers := len(container.Workers)
	container.mutex.Unlock()
	for x := 0; x < numWorkers; x++ {
		// Allow close to continue even if a worker does not respond
		if timeouts > 0 {
			select {
			case result := <-container.ControlChannel:
				_, worker := result()
				totalWork += worker.WorkUnit
			case <-time.After(time.Second * time.Duration(timeouts)):
			}
		} else {
			_, worker := (<-container.ControlChannel)()
			totalWork += worker.WorkUnit
		}
	}
	close(container.OutputChannel)
	return
}
// Called by using "go" to invoke, creates a DBWorker, and starts a watchdog channel.
func startWorker(container *DbWorkerContainer, db *DbGorp, id int) {
newDb, _ := db.CloneDb(true)
worker := &DbWorker{
Db: newDb,
Id: id,
SharedData: map[string]interface{}{},
SharedWorker: SharedWorker{
workInfo: container.workInfo,
InputChannel: container.InputChannel,
OutputChannel: container.OutputChannel,
ControlChannel: container.ControlChannel,
},
}
// Close the database after worker has ended (Start returned
defer worker.Db.Close()
container.mutex.Lock()
container.Workers = append(container.Workers, worker)
container.mutex.Unlock()
// Only monitor jobs if Status function defined and a timeout is also defined
if container.LongWorkTimeout > 0 {
worker.TimeoutChannel = make(chan *timeoutInfo)
go worker.TimeInfo.start(worker.TimeoutChannel, container.LongWorkTimeout)
}
worker.start()
}
// Starts the worker, continues running until inputchannel is closed.
// Reports the Start phase (status callback plus a control-channel message),
// processes jobs from InputChannel until it is closed, then reports Stop the
// same way and shuts down the watchdog channel if one was created.
func (worker *DbWorker) start() {
	worker.workInfo.Status(Start, worker)
	worker.ControlChannel <- func() (WorkerPhase, *DbWorker) { return Start, worker }
	for job := range worker.InputChannel {
		worker.invoke(job)
	}
	worker.workInfo.Status(Stop, worker)
	worker.ControlChannel <- func() (WorkerPhase, *DbWorker) { return Stop, worker }
	// This worker is the only sender on its TimeoutChannel, so closing it here
	// is safe; the watchdog goroutine exits on the close.
	if worker.TimeoutChannel != nil {
		close(worker.TimeoutChannel)
	}
}
// invoke runs a single job through the Work callback, recovering from any
// panic so a failing job cannot kill the worker goroutine or wedge the
// shared channels. When a watchdog is active it is notified at job start
// and at job end.
func (worker *DbWorker) invoke(job interface{}) {
	defer func() {
		if cause := recover(); cause != nil {
			stack := make([]byte, 1024)
			size := runtime.Stack(stack, true)
			moduleLogger.Error("Recover from panic: ", "error", cause)
			moduleLogger.Error("Stack", "size", size, "trace", string(stack))
		}
	}()
	// Tell the watchdog that a job is starting.
	if worker.TimeoutChannel != nil {
		worker.TimeInfo = &timeoutInfo{worker: worker, started: time.Now(), state: StartJob}
		worker.TimeoutChannel <- worker.TimeInfo
	}
	worker.workInfo.Work(job, worker)
	// Tell the watchdog that the job has completed.
	if worker.TimeoutChannel != nil {
		worker.TimeInfo.state = EndJob
		worker.TimeoutChannel <- worker.TimeInfo
	}
}
// MakeCallback wraps optional status and work functions in a value satisfying
// the DbWorkInfo interface. status may be nil; work must not be.
func MakeCallback(status func(phase WorkerPhase, worker *DbWorker), work func(value interface{}, worker *DbWorker)) DbWorkInfo {
	callback := &DbCallbackImplied{
		StatusFn: status,
		WorkFn:   work,
	}
	return callback
}
// Status invokes the status callback when one was supplied; a nil StatusFn
// is silently ignored.
func (dbCallback *DbCallbackImplied) Status(phase WorkerPhase, worker *DbWorker) {
	if dbCallback.StatusFn == nil {
		return
	}
	dbCallback.StatusFn(phase, worker)
}
// Calls the work function.
// Unlike StatusFn, WorkFn is called unconditionally and must be non-nil.
func (dbCallback *DbCallbackImplied) Work(value interface{}, worker *DbWorker) {
	dbCallback.WorkFn(value, worker)
}
// Starts the timeout worker (watchdog loop).
// Each value received on timeoutChannel marks the beginning of a job; while
// that job runs, JobLongrunning is reported every `timeout` seconds until the
// matching EndJob arrives. The goroutine exits when timeoutChannel is closed.
//
// Note: the receiver is unused (this is launched on a nil *timeoutInfo); all
// state arrives through the channel.
func (*timeoutInfo) start(timeoutChannel chan *timeoutInfo, timeout int64) {
	for j := range timeoutChannel {
		// A new job has started; report it and begin monitoring.
		j.started = time.Now()
		j.state = StartJob
		j.worker.workInfo.Status(j.state, j.worker)
	monitor:
		for {
			select {
			case update, ok := <-timeoutChannel:
				if !ok {
					// Channel closed returning...
					return
				}
				// Received new State, record and loop
				update.worker.workInfo.Status(update.state, update.worker)
				if update.state == EndJob {
					// Job finished: leave the monitoring loop so the timer no
					// longer fires. (Previously the bare `break` only exited
					// the select, so JobLongrunning kept being reported even
					// between jobs.)
					break monitor
				}
			case <-time.After(time.Second * time.Duration(timeout)):
				j.worker.workInfo.Status(JobLongrunning, j.worker)
			}
		}
	}
}