/
worklist_bulk_worker.go
executable file
·148 lines (126 loc) · 4.32 KB
/
worklist_bulk_worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
package manager
import (
"sync"
"time"
"github.com/joaosoft/logger"
)
// BulkWorkHandler processes a batch of works removed from the list; a
// non-nil error marks the whole batch as failed so each work is retried
// or discarded individually.
type BulkWorkHandler func([]*Work) error

// BulkWorkRecoverHandler is invoked with the remaining list when the bulk
// handler panics, giving the caller a chance to persist pending work.
type BulkWorkRecoverHandler func(list IList) error

// BulkWorkRecoverWastedRetriesHandler is invoked with a work's id and
// payload once that work has exhausted its retries and is about to be
// discarded.
type BulkWorkRecoverWastedRetriesHandler func(id string, data interface{}) error
// BulkWorker drains an IList in batches, delegating each batch to a
// BulkWorkHandler and re-queuing or discarding failed works.
type BulkWorker struct {
	id                          int                                  // numeric identifier, used in log messages
	name                        string                               // human-readable name from the config
	handler                     BulkWorkHandler                      // processes each batch of works
	recoverHandler              BulkWorkRecoverHandler               // called with the list when handler panics
	recoverWastedRetriesHandler BulkWorkRecoverWastedRetriesHandler  // called per work when retries are exhausted
	list                        IList                                // backing queue of pending works
	maxWorks                    int                                  // maximum batch size per execute call
	maxRetries                  int                                  // retries allowed before a work is discarded
	sleepTime                   time.Duration                        // idle sleep between polls when the list is empty
	quit                        chan bool                            // signals the worker goroutine to exit
	mux                         *sync.Mutex                          // guards Stop (and the started flag there)
	logger                      logger.ILogger                       // injected logger, also passed to new works
	started                     bool                                 // true while the worker goroutine is running
}
// NewBulkWorker builds a BulkWorker wired to the given configuration,
// handlers and backing list. The worker is returned idle; call Start to
// begin consuming the list.
func NewBulkWorker(id int, config *BulkWorkListConfig, handler BulkWorkHandler, list IList, bulkWorkRecoverHandler BulkWorkRecoverHandler, bulkWorkRecoverOneHandler BulkWorkRecoverWastedRetriesHandler, logger logger.ILogger) *BulkWorker {
	return &BulkWorker{
		id:                          id,
		name:                        config.Name,
		handler:                     handler,
		recoverHandler:              bulkWorkRecoverHandler,
		recoverWastedRetriesHandler: bulkWorkRecoverOneHandler,
		list:                        list,
		maxWorks:                    config.MaxWorks,
		maxRetries:                  config.MaxRetries,
		sleepTime:                   config.SleepTime,
		quit:                        make(chan bool),
		mux:                         &sync.Mutex{},
		logger:                      logger,
	}
}
// Start launches the worker goroutine. The goroutine repeatedly drains the
// list via execute while work is available, sleeps for sleepTime when the
// list is empty, and exits when Stop signals the quit channel. Starting an
// already-started worker is a no-op.
func (bulkWorker *BulkWorker) Start() error {
	bulkWorker.mux.Lock()
	defer bulkWorker.mux.Unlock()

	// Guard against double Start: a second goroutine would compete for the
	// same list and leave one goroutine orphaned after Stop.
	if bulkWorker.started {
		return nil
	}

	go func() {
		for {
			select {
			case <-bulkWorker.quit:
				logger.Debugf("worker quited [name: %s, list size: %d ]", bulkWorker.name, bulkWorker.list.Size())
				return
			default:
				if bulkWorker.list.Size() > 0 {
					// fixed format verb: name is a string, was logged with %d
					logger.Debugf("worker starting [ name: %s, queue size: %d]", bulkWorker.name, bulkWorker.list.Size())
					bulkWorker.execute()
					logger.Debugf("worker finished [ name: %s, queue size: %d]", bulkWorker.name, bulkWorker.list.Size())
				} else {
					logger.Debugf("worker waiting for work to do... [ id: %d, name: %s ]", bulkWorker.id, bulkWorker.name)
					<-time.After(bulkWorker.sleepTime)
				}
			}
		}
	}()

	bulkWorker.started = true
	return nil
}
// Stop signals the worker goroutine to exit and marks the worker as
// stopped. Stopping a worker that was never started (or stopping twice)
// is a no-op; without the guard the unbuffered send on quit would block
// forever with no goroutine left to receive it.
func (bulkWorker *BulkWorker) Stop() error {
	bulkWorker.mux.Lock()
	defer bulkWorker.mux.Unlock()

	if !bulkWorker.started {
		return nil
	}

	if bulkWorker.list.Size() > 0 {
		logger.Infof("stopping worker with tasks in the list [ list size: %d ]", bulkWorker.list.Size())
	}

	bulkWorker.quit <- true
	bulkWorker.started = false

	return nil
}
// AddWork wraps the given payload in a new Work and enqueues it on the
// worker's list under the supplied id.
func (bulkWorker *BulkWorker) AddWork(id string, data interface{}) error {
	return bulkWorker.list.Add(id, NewWork(id, data, bulkWorker.logger))
}
// execute removes up to maxWorks works from the list and hands them to the
// bulk handler in a single call. When the handler fails, each work in the
// batch is re-queued until its retries are exhausted, after which it is
// passed to the wasted-retries recover handler (when set) and discarded.
// A panic in the handler is recovered and delegated to recoverHandler with
// the remaining list.
func (bulkWorker *BulkWorker) execute() error {
	defer func() {
		if bulkWorker.recoverHandler != nil {
			if r := recover(); r != nil {
				logger.Debug("recovering worker data")
				if err := bulkWorker.recoverHandler(bulkWorker.list); err != nil {
					logger.Errorf("error processing recovering of worker. [ error: %s ]", err)
				}
			}
		}
	}()

	var works []*Work
	for i := 0; i < bulkWorker.maxWorks; i++ {
		tmp := bulkWorker.list.Remove()
		if tmp == nil {
			// List drained before reaching maxWorks: stop collecting.
			// (Previously the nil check was nested inside `tmp != nil`,
			// so it was dead code and Remove kept being polled.)
			break
		}
		works = append(works, tmp.(*Work))
	}

	// Nothing collected (e.g. another consumer drained the list between the
	// Size check and Remove): don't invoke the handler with an empty batch.
	if len(works) == 0 {
		return nil
	}

	if err := bulkWorker.handler(works); err != nil {
		for _, work := range works {
			if work.retries < bulkWorker.maxRetries {
				work.retries++
				if err := bulkWorker.list.Add(work.Id, work); err != nil {
					logger.Errorf("error processing the work. re-adding the work to the list [retries: %d, error: %s ]", work.retries, err)
				}
				logger.Errorf("work requeued of the queue [ retries: %d, error: %s ]", work.retries, err)
			} else {
				if bulkWorker.recoverWastedRetriesHandler != nil {
					if err := bulkWorker.recoverWastedRetriesHandler(work.Id, work.Data); err != nil {
						logger.Errorf("error processing recovering one of worker. [ error: %s ]", err)
					}
				}
				logger.Errorf("work discarded of the queue [ retries: %d, error: %s ]", work.retries, err)
			}
		}
	}

	return nil
}