-
Notifications
You must be signed in to change notification settings - Fork 28
/
handler.go
248 lines (206 loc) · 6.56 KB
/
handler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
package pipeline
import (
"context"
"errors"
"fmt"
"github.com/infraboard/workflow/api/apps/pipeline"
)
// syncHandler reconciles the pipeline identified by key. It looks the object
// up in the informer's local store and, when it is still present, hands it to
// runPipeline.
//
// A key that is absent from the store is treated as a deletion and skipped
// (nil is returned). An error is returned when the store lookup fails, when
// the cached object is not a *pipeline.Pipeline, or when runPipeline fails.
func (c *Controller) syncHandler(key string) error {
	cached, exists, err := c.informer.GetStore().GetByKey(key)
	switch {
	case err != nil:
		return err
	case !exists:
		// Not in the store any more: the expected action is a delete, so
		// there is nothing to run.
		c.log.Debugf("remove pipeline: %s, skip", key)
		return nil
	}

	p, ok := cached.(*pipeline.Pipeline)
	if !ok {
		return errors.New("invalidate *pipeline.Pipeline obj")
	}

	return c.runPipeline(p)
}
// runPipeline advances a pipeline by at most one phase per call:
//
//  1. A pipeline that fails Validate is rejected with a wrapped error.
//  2. A pipeline that is already complete is skipped (nil is returned).
//  3. A pipeline that is not scheduled yet is scheduled, then the call returns.
//  4. A scheduled pipeline that is not running yet is marked running and
//     persisted through the informer recorder, then the call returns. A
//     failed persist is only logged.
//  5. A running pipeline has its next steps computed by nextStep and submitted
//     through runPipelineNextStep.
//
// TODO(review): the original design notes mention checking whether a running
// pipeline must be interrupted; no such check is performed here.
func (c *Controller) runPipeline(p *pipeline.Pipeline) error {
	c.log.Debugf("receive add pipeline: %s status: %s", p.ShortDescribe(), p.Status.Status)
	if err := p.Validate(); err != nil {
		return fmt.Errorf("invalidate pipeline error, %w", err)
	}

	// A completed pipeline needs no further processing. Reporting that as an
	// error would make the caller treat a terminal, healthy pipeline as a
	// failure (and, with a retrying work queue, re-process it indefinitely).
	if p.IsComplete() {
		c.log.Debugf("skip run complete pipeline %s, status: %s", p.ShortDescribe(), p.Status.Status)
		return nil
	}

	// TODO: use a distributed trylock so that multiple controller instances
	// do not race to schedule the same pipeline.
	if !p.IsScheduled() {
		return c.schedulePipeline(p)
	}

	// Mark the pipeline as started and persist it.
	if !p.IsRunning() {
		p.Run()
		if err := c.informer.Recorder().Update(p); err != nil {
			c.log.Errorf("update pipeline %s start status to store error, %s", p.ShortDescribe(), err)
		} else {
			c.log.Debugf("update pipeline %s start status to store success", p.ShortDescribe())
		}
		return nil
	}

	// nextStep returns nil when the pipeline has finished or is only being
	// synced this pass, in which case nothing is submitted below.
	steps := c.nextStep(p)
	c.log.Debugf("pipeline %s start run next steps: %s", p.ShortDescribe(), steps)
	return c.runPipelineNextStep(steps)
}
// nextStep returns the steps of p that should be submitted next, or nil when
// nothing should be submitted on this pass.
//
// It returns nil when:
//   - p.NextStep reports the pipeline as finished; p is then marked complete
//     and persisted (a failed persist is logged);
//   - looking up a candidate step in the step lister fails;
//   - one or more candidate steps already exist in the step lister with a
//     status different from the copy on p; those steps are copied onto p with
//     UpdateStep and p is persisted instead of submitting anything.
//
// Otherwise the candidate steps from p.NextStep are returned as-is.
func (c *Controller) nextStep(p *pipeline.Pipeline) []*pipeline.Step {
	steps, isComplete := p.NextStep()
	if isComplete {
		p.Complete()
		if err := c.informer.Recorder().Update(p); err != nil {
			c.log.Errorf("update pipeline %s end status to store error, %s", p.ShortDescribe(), err)
		} else {
			c.log.Debugf("pipeline is complete, update pipeline status to db success")
		}
		return nil
	}

	// Collect candidate steps that already exist in the step lister and whose
	// stored status has diverged from the status recorded on the pipeline.
	var needSync []*pipeline.Step
	for _, ins := range steps {
		old, err := c.step.Lister().Get(context.Background(), ins.Key)
		if err != nil {
			c.log.Errorf("get step %s by key error, %s", ins.Key, err)
			return nil
		}
		if old == nil {
			// Not present in the lister yet: nothing to sync for this step.
			c.log.Debugf("step %s not found in db", ins.Key)
			continue
		}
		if ins.Status.Status.Equal(old.Status.Status) {
			c.log.Debugf("pipeline step status: %s, etcd step status: %s, has sync",
				ins.Status.Status, old.Status.Status)
			continue
		}
		needSync = append(needSync, old)
	}

	if len(needSync) == 0 {
		return steps
	}

	// Copy the diverged step statuses onto the pipeline and persist it rather
	// than submitting steps on this pass.
	for _, s := range needSync {
		c.log.Debugf("sync step %s to pipeline ...", s.Key)
		// UpdateStep reports failures through its error; surface them instead
		// of silently dropping them, but keep syncing the remaining steps.
		if err := p.UpdateStep(s); err != nil {
			c.log.Errorf("sync step %s to pipeline error, %s", s.Key, err)
		}
	}
	if err := c.informer.Recorder().Update(p); err != nil {
		c.log.Errorf("update pipeline status error, %s", err)
		return nil
	}
	c.log.Debugf("sync %d steps ok", len(needSync))
	return nil
}
// runPipelineNextStep writes a clone of each step through the step informer's
// recorder (c.step.Recorder().Update).
//
// It returns an error only when the step informer is nil. A recorder failure
// for an individual step is logged, and the remaining steps are still
// submitted. A nil or empty steps slice is a no-op.
func (c *Controller) runPipelineNextStep(steps []*pipeline.Step) error {
	if c.step == nil {
		return fmt.Errorf("step recorder is nil")
	}

	for _, ins := range steps {
		c.log.Debugf("create pipeline step: %s", ins.Key)
		if err := c.step.Recorder().Update(ins.Clone()); err != nil {
			// Use a constant format string: passing err.Error() as the format
			// misrenders any '%' in the message (flagged by go vet printf).
			c.log.Errorf("create pipeline step %s error, %s", ins.Key, err)
		}
	}
	return nil
}
// schedulePipeline schedules p. It asks the picker for a node, records the
// node's InstanceName on p via SetScheduleNode, and persists p through
// updatePipelineStatus.
//
// The picker's error is returned unchanged, and an error is returned when the
// picker yields no node. Persist failures are only logged by
// updatePipelineStatus, so nil is returned once a node has been chosen.
func (c *Controller) schedulePipeline(p *pipeline.Pipeline) error {
	c.log.Debugf("pipeline %s start schedule ...", p.ShortDescribe())

	picked, err := c.picker.Pick(p)
	switch {
	case err != nil:
		return err
	case picked == nil:
		// The picker found no eligible node.
		return fmt.Errorf("no excutable scheduler")
	}

	nodeName := picked.InstanceName
	c.log.Debugf("choice scheduler %s for pipeline %s", nodeName, p.Id)
	p.SetScheduleNode(nodeName)
	c.updatePipelineStatus(p)
	return nil
}
// updatePipelineStatus persists p through the pipeline informer's recorder.
// It reports nothing to the caller: a nil pipeline, a nil recorder, or a
// failed Update is logged and otherwise ignored.
func (c *Controller) updatePipelineStatus(p *pipeline.Pipeline) {
	switch {
	case p == nil:
		c.log.Errorf("update pipeline is nil")
	case c.informer.Recorder() == nil:
		c.log.Errorf("pipeline informer recorder missed")
	default:
		if err := c.informer.Recorder().Update(p); err != nil {
			c.log.Errorf("update scheduled pipeline error, %s", err)
		}
	}
}
// UpdateStepCallback handles a step update event. It applies only to steps
// whose CreateType is STEP_CREATE_BY_PIPELINE. For those steps it:
//
//   - looks up the owning pipeline in the informer store;
//   - when the step's status differs from the copy already on the pipeline,
//     copies the new step onto the pipeline with UpdateStep;
//   - persists the pipeline through the informer recorder.
//
// Every failure is logged and the event is dropped; nothing is returned.
//
// NOTE(review): an earlier comment said the step is deleted afterwards, but no
// deletion happens here. Confirm whether that is handled elsewhere.
func (c *Controller) UpdateStepCallback(old, new *pipeline.Step) {
	c.log.Debugf("receive step update event, start update step status to pipeline ...")
	c.log.Debugf("old[%s]: %s, new[%s]: %s", old.Key, old.Status, new.Key, new.Status)

	// Only pipeline-created steps report their status back to a pipeline.
	if !new.CreateType.Equal(pipeline.STEP_CREATE_BY_PIPELINE) {
		c.log.Debugf("step type is %s, skip update status to pipeline", new.CreateType)
		return
	}

	pipelineKey := pipeline.PipeLineObjectKey(new.GetNamespace(), new.GetPipelineId())
	cached, found, err := c.informer.GetStore().GetByKey(pipelineKey)
	switch {
	case err != nil:
		c.log.Errorf("get pipeline from store error, %s", err)
		return
	case !found:
		c.log.Errorf("pipeline key %s not found in store cache", pipelineKey)
		return
	}

	p, ok := cached.(*pipeline.Pipeline)
	if !ok {
		c.log.Errorf("invalidate *pipeline.Pipeline obj")
		return
	}

	current, err := p.GetStep(new.GetPipelineStageNumber(), new.Key)
	if err != nil {
		c.log.Errorf("get current step from pipeline error, %s", err)
		return
	}

	// Write back only when the status actually changed.
	if current.IsStatusEqual(new) {
		c.log.Infof("status not changed, skip update, current: %s, target: %s",
			current.Status, new.Status)
		return
	}

	if err := p.UpdateStep(new); err != nil {
		c.log.Errorf("update pipeline step error, %s", err)
		return
	}
	if err := c.informer.Recorder().Update(p); err != nil {
		c.log.Errorf("update pipeline status to store error, %s", err)
		return
	}
	c.log.Debugf("update pipeline %s step %s success", p.ShortDescribe(), new.Key)
}