-
Notifications
You must be signed in to change notification settings - Fork 182
/
coordinator.go
100 lines (83 loc) · 2.47 KB
/
coordinator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package stream
import (
"fmt"
"time"
"github.com/okex/exchain/x/stream/types"
"github.com/tendermint/tendermint/libs/log"
)
const (
// atomTaskTimeout is 98% of distributeLockTimeout (declared elsewhere in
// this package), i.e. slightly shorter than the lock timeout.
// Because the constant is typed int, distributeLockTimeout * 0.98 must
// evaluate to an exact integer or the declaration will not compile.
// NOTE(review): presumably expressed in milliseconds, like the
// Coordinator.atomTaskTimeout field of the same name — confirm against the
// declaration of distributeLockTimeout.
atomTaskTimeout int = distributeLockTimeout * 0.98
)
// Coordinator consumes tasks from taskChan, starts one atom task runner per
// stream type that is not yet done, waits for their results (bounded by
// atomTaskTimeout) and publishes the updated Task on resultChan.
type Coordinator struct {
// engineMap holds the stream engine used for each EngineKind when atom task
// runners are prepared.
engineMap map[EngineKind]types.IStreamEngine
// taskChan is the source of incoming tasks; run ranges over it.
taskChan chan *TaskWithData
// resultChan receives the Task value of every processed task.
resultChan chan Task
// atomTaskTimeout is the maximum time to wait for all atom tasks of one
// task, in milliseconds.
atomTaskTimeout int
// logger is used by the coordinator and handed to every atom task runner.
logger log.Logger
}
// NewCoordinator returns a Coordinator that reads tasks from taskCh, writes
// results to resultCh, and resolves engines through engineMap. timeout is
// stored as the atom task timeout, which run interprets as milliseconds.
func NewCoordinator(logger log.Logger, taskCh chan *TaskWithData, resultCh chan Task, timeout int, engineMap map[EngineKind]types.IStreamEngine) *Coordinator {
	return &Coordinator{
		engineMap:       engineMap,
		taskChan:        taskCh,
		resultChan:      resultCh,
		atomTaskTimeout: timeout,
		logger:          logger,
	}
}
// prepareAtomTasks builds one AtomTaskRunner for every stream type in
// taskDesc.DoneMap that is not marked done. Each runner gets the data stored
// for that stream type, the engine registered for the mapped engine kind, the
// shared notify channel and the coordinator's logger.
//
// It panics when a pending stream type maps to EngineNilKind in
// StreamKind2EngineKindMap. The result is nil when nothing is pending.
func (c *Coordinator) prepareAtomTasks(taskDesc *TaskWithData, notify chan AtomTaskResult) []*AtomTaskRunner {
	var pending []*AtomTaskRunner

	for kind, finished := range taskDesc.DoneMap {
		if finished {
			// Already completed stream types need no runner.
			continue
		}

		engineKind := StreamKind2EngineKindMap[kind]
		if engineKind == EngineNilKind {
			panic(fmt.Errorf("stream kind: %+v not supported, no Kind found, Quite", kind))
		}

		pending = append(pending, &AtomTaskRunner{
			data:   taskDesc.dataMap[kind],
			engine: c.engineMap[engineKind],
			result: notify,
			logger: c.logger,
			sType:  kind,
		})
	}

	return pending
}
// run is the coordinator's main loop. It blocks on taskChan and, for every
// task received:
//
//  1. starts one goroutine per prepared atom task runner (only when the task
//     reports at least one valid atom task),
//  2. waits until every runner has reported on the notify channel or until
//     atomTaskTimeout milliseconds have elapsed, recording each reported
//     outcome in task.DoneMap,
//  3. stamps task.UpdatedAt with the current Unix time and sends the Task
//     value to resultChan.
//
// The loop ends when taskChan is closed.
func (c *Coordinator) run() {
	for task := range c.taskChan {
		// Outer loop: block until a stream task arrives on taskChan.
		func() {
			validTaskCnt := task.validAtomTaskCount()
			if validTaskCnt > 0 {
				// Buffered to validTaskCnt so results arriving after a timeout
				// can still be sent without a receiver.
				// NOTE(review): assumes each runner sends at most one result.
				notifyCh := make(chan AtomTaskResult, validTaskCnt)
				atomRunners := c.prepareAtomTasks(task, notifyCh)
				c.logger.Debug(fmt.Sprintf("Coordinator loop: %d atomRunners prepared, %+v", len(atomRunners), atomRunners))

				for _, r := range atomRunners {
					go r.run()
				}

				// Convert to Duration before scaling: multiplying in int would
				// overflow on 32-bit platforms for timeouts above ~2147 ms.
				timer := time.NewTimer(time.Duration(c.atomTaskTimeout) * time.Millisecond)

				// Inner loop: wait until all jobs have reported or the timer
				// fires. validTaskCnt > 0 guarantees at least one iteration.
				notifyCnt := 0
				for notifyCnt < validTaskCnt {
					select {
					case <-timer.C:
						c.logger.Error(fmt.Sprintf(
							"Coordinator: All atom tasks are forced stop becoz of %d millsecond timeout", c.atomTaskTimeout))
						// Give up on the outstanding runners; their DoneMap
						// entries keep their previous values.
						notifyCnt = validTaskCnt
					case taskResult := <-notifyCh:
						task.DoneMap[taskResult.sType] = taskResult.successDone
						notifyCnt++
					}
				}

				// Release the timer right away when all results arrived early
				// instead of leaving it pending until it would have fired.
				timer.Stop()
			}

			task.UpdatedAt = time.Now().Unix()
			c.resultChan <- *task.Task
		}()
	}
}