-
Notifications
You must be signed in to change notification settings - Fork 0
/
manager.go
173 lines (150 loc) · 3.62 KB
/
manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package worker
import (
"context"
"hash/crc32"
"strings"
"sync"
"time"
"github.com/sryanyuan/binp/utils"
"github.com/juju/errors"
"github.com/sirupsen/logrus"
)
const (
	// minWorkerCount is the lower bound enforced on the configured worker count.
	minWorkerCount = 1
	// workerReportChanSize is the buffer size of the manager's worker report channel.
	workerReportChanSize = 2560
	// workerJobChanSize is presumably the buffer size of each worker's job
	// channel — not referenced in this file; verify against worker.start.
	workerJobChanSize = 2560
	// rplPointSaveInterval is the minimum number of seconds between two
	// replication point saves (see needSaveRplPoint).
	rplPointSaveInterval = 15
)
// workerReport carries a status report from a worker back to the manager.
type workerReport struct {
	// discardable marks a report that may be silently dropped when the
	// report channel is full (see uploadReport).
	discardable bool
	// wid is the index of the reporting worker.
	wid int
	// err is the error the worker encountered, if any.
	err error
}
// WorkerManager manages all workers
type WorkerManager struct {
	// workers holds the managed workers; jobs are dispatched to them by key hash.
	workers []*worker
	// wreport receives status reports from workers; closed by Stop.
	wreport chan *workerReport
	// done / stopFn form a cancellation pair.
	// NOTE(review): neither is initialized in NewWorkerManager or used in
	// this file — confirm they are set elsewhere or remove them.
	done   context.Context
	stopFn context.CancelFunc
	// wg tracks manager goroutines (workers and the report loop).
	wg sync.WaitGroup
	// jobWg counts in-flight dispatched jobs; waited on by Stop and
	// before saving a replication point.
	jobWg sync.WaitGroup
	// lastRplPointTime is the Unix time of the last replication point save.
	lastRplPointTime int64
}
// NewWorkerManager creates a new WorkerManager from the given
// configuration. The worker count is clamped to minWorkerCount, and a
// shared executor set is created for all workers. An error from
// executor creation is returned traced.
func NewWorkerManager(cfg *WorkerConfig) (*WorkerManager, error) {
	// Enforce the minimum number of workers.
	count := cfg.WorkerCount
	if count < minWorkerCount {
		logrus.Warnf("Minimal worker count is %d", minWorkerCount)
		count = minWorkerCount
	}
	mgr := &WorkerManager{
		wreport:          make(chan *workerReport, workerReportChanSize),
		workers:          make([]*worker, 0, count),
		lastRplPointTime: time.Now().Unix(),
	}
	// Create executor
	execs, err := createExecutors(cfg.Tos)
	if err != nil {
		return nil, errors.Trace(err)
	}
	// All workers share the executor set and the manager's job wait group.
	for wid := 0; wid < count; wid++ {
		mgr.workers = append(mgr.workers, &worker{
			wid:       wid,
			executors: execs,
			jobWg:     &mgr.jobWg,
		})
	}
	return mgr, nil
}
// Start launches every managed worker and then the report-consuming
// goroutine. It returns the first (traced) worker start error, if any.
func (w *WorkerManager) Start() error {
	for _, wk := range w.workers {
		err := wk.start(&w.wg, 0, 0)
		if err != nil {
			return errors.Trace(err)
		}
	}
	// Register the report loop with the wait group before spawning it.
	w.wg.Add(1)
	go w.workerReportLoop()
	return nil
}
// Stop stops all workers
// It first drains all in-flight jobs, stops each worker, closes the
// report channel so the report loop can exit, then waits for all
// manager goroutines to finish.
func (w *WorkerManager) Stop() {
	// Wait until every dispatched job has been processed.
	w.jobWg.Wait()
	for _, v := range w.workers {
		v.stop()
	}
	// Once all worker is stop, worker report channel won't be used any more
	// NOTE(review): this relies on no goroutine calling uploadReport after
	// the workers are stopped — a late send would panic on the closed
	// channel. Confirm worker.stop() guarantees that ordering.
	close(w.wreport)
	w.wg.Wait()
}
// uploadReport hands a worker report to the manager. When the report
// channel is full, a discardable report is dropped; any other report is
// sent blocking after a warning. The manager wait group is always
// decremented on return.
func (w *WorkerManager) uploadReport(r *workerReport) {
	defer w.wg.Done()

	// Fast path: there is room in the channel.
	select {
	case w.wreport <- r:
		return
	default:
	}

	// Channel is full: drop reports that are allowed to be lost.
	if r.discardable {
		return
	}
	logrus.Warnf("Worker report channel is full")
	w.wreport <- r
}
// workerReportLoop consumes worker reports until the report channel is
// closed by Stop. Reports are currently discarded; the loop exists so
// senders never block forever.
//
// Start registers this goroutine with w.wg via wg.Add(1), so the loop
// must signal the wait group on exit: without the deferred Done, Stop's
// w.wg.Wait() would hang forever after closing the channel.
func (w *WorkerManager) workerReportLoop() {
	defer w.wg.Done()
	// range terminates when Stop closes w.wreport.
	for rp := range w.wreport {
		// Reports are ignored for now; reserved for future handling.
		_ = rp
	}
	logrus.Infof("Worker report loop quit")
}
// DispatchWorkerEvent dispatches the WorkerEvent to a worker selected by
// hashing a dispatch key, and returns true when a replication point
// check was performed (all in-flight jobs drained and the save time
// refreshed). An error is returned when no dispatch key can be derived
// for the given policy.
func (w *WorkerManager) DispatchWorkerEvent(job *WorkerEvent, dispPolicy int) (bool, error) {
	var key string
	switch dispPolicy {
	case DispatchPolicyPrimaryKey:
		// Build the key from the values of the table's index columns.
		if len(job.Ti.IndexColumns) != 0 {
			pkvalues := make([]string, 0, len(job.Ti.IndexColumns))
			for _, ic := range job.Ti.IndexColumns {
				pkvalues = append(pkvalues, job.Columns[ic.Index].ValueToString())
			}
			key = strings.Join(pkvalues, ",")
		}
	case DispatchPolicyTableName:
		key = utils.GetTableKey(job.SDesc.RewriteSchema, job.SDesc.RewriteTable)
	}
	if "" == key {
		return false, errors.Errorf("Can't get job dispatch key, dispatch policy = %d, job = %v", dispPolicy, job)
	}
	// Reduce the hash in uint32 before converting: int(uint32) can be
	// negative on 32-bit platforms, which would panic on the slice index.
	index := int(crc32.ChecksumIEEE([]byte(key)) % uint32(len(w.workers)))
	w.workers[index].push(job)
	w.jobWg.Add(1)
	// Need wait and write the lastest replication point
	rplPointChecked := w.needSaveRplPoint()
	if rplPointChecked {
		// Drain outstanding jobs so the saved point is consistent.
		w.jobWg.Wait()
		w.lastRplPointTime = time.Now().Unix()
	}
	return rplPointChecked, nil
}
// needSaveRplPoint reports whether more than rplPointSaveInterval
// seconds have elapsed since the last replication point save.
func (w *WorkerManager) needSaveRplPoint() bool {
	return time.Now().Unix()-w.lastRplPointTime > rplPointSaveInterval
}