forked from ShiningRush/fastflow
/
dispatcher.go
99 lines (87 loc) · 1.77 KB
/
dispatcher.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
package mod
import (
"github.com/linclin/fastflow/pkg/entity"
"github.com/linclin/fastflow/pkg/event"
"github.com/linclin/fastflow/pkg/log"
"github.com/linclin/fastflow/pkg/utils/data"
"github.com/shiningrush/goevent"
"sync"
"time"
)
// DefDispatcher
type DefDispatcher struct {
closeCh chan struct{}
wg sync.WaitGroup
}
// NewDefDispatcher
func NewDefDispatcher() *DefDispatcher {
return &DefDispatcher{
closeCh: make(chan struct{}),
}
}
// Init
func (d *DefDispatcher) Init() {
d.wg.Add(1)
go d.WatchInitDags()
}
// WatchInitDags
func (d *DefDispatcher) WatchInitDags() {
closed := false
timerCh := time.Tick(time.Second)
for !closed {
select {
case <-d.closeCh:
closed = true
case <-timerCh:
start := time.Now()
e := &event.DispatchInitDagInsCompleted{}
if err := d.Do(); err != nil {
d.handlerErr(err)
e.Error = err
}
e.ElapsedMs = time.Now().Sub(start).Milliseconds()
goevent.Publish(e)
}
}
d.wg.Done()
}
// Do dispatch
func (d *DefDispatcher) Do() error {
dagIns, err := GetStore().ListDagInstance(&ListDagInstanceInput{
Status: []entity.DagInstanceStatus{
entity.DagInstanceStatusInit,
},
Limit: 1000,
})
if err != nil {
return err
}
if len(dagIns) == 0 {
return nil
}
nodes, err := GetKeeper().AliveNodes()
if err != nil {
return err
}
if len(nodes) == 0 {
return data.ErrNoAliveNodes
}
for i := range dagIns {
dagIns[i].Status = entity.DagInstanceStatusScheduled
dagIns[i].Worker = nodes[i%len(nodes)]
}
if err := GetStore().BatchUpdateDagIns(dagIns); err != nil {
return err
}
return nil
}
func (d *DefDispatcher) handlerErr(err error) {
log.Errorf("dispatch failed",
"module", "dispatch",
"err", err)
}
// Close component
func (d *DefDispatcher) Close() {
close(d.closeCh)
d.wg.Wait()
}