/
task_manager.go
296 lines (268 loc) · 8.54 KB
/
task_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package dm
import (
"context"
"sync"
"time"
dmconfig "github.com/pingcap/tiflow/dm/config"
frameModel "github.com/pingcap/tiflow/engine/framework/model"
"github.com/pingcap/tiflow/engine/jobmaster/dm/config"
"github.com/pingcap/tiflow/engine/jobmaster/dm/metadata"
"github.com/pingcap/tiflow/engine/jobmaster/dm/runtime"
dmpkg "github.com/pingcap/tiflow/engine/pkg/dm"
"github.com/pingcap/tiflow/engine/pkg/dm/ticker"
"github.com/pingcap/tiflow/engine/pkg/promutil"
"github.com/pingcap/tiflow/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
var (
	// taskNormalInterval is the tick interval used while the last check succeeded.
	taskNormalInterval = time.Second * 30
	// taskErrorInterval is the shortened tick interval used after a failed check,
	// so that error states are retried sooner.
	taskErrorInterval = time.Second * 10
)
// TaskManager checks and operates task.
type TaskManager struct {
	*ticker.DefaultTicker

	// jobID identifies the job this manager belongs to; also used as a metric label value.
	jobID string
	// jobStore holds the persistent (expected) job/task state.
	jobStore *metadata.JobStore
	// messageAgent sends asynchronous operate-task messages to workers.
	messageAgent dmpkg.MessageAgent
	logger       *zap.Logger

	// tasks record the runtime task status
	// taskID -> TaskStatus
	tasks sync.Map
	// gaugeVec exports per-task stage as a gauge.
	// NOTE(review): declared with labels {"task", "source_id"} but populated with
	// (jobID, taskID) values elsewhere in this file — label semantics look mismatched; confirm.
	gaugeVec *prometheus.GaugeVec
}
// NewTaskManager creates a new TaskManager instance
func NewTaskManager(
	jobID string,
	initTaskStatus []runtime.TaskStatus,
	jobStore *metadata.JobStore,
	messageAgent dmpkg.MessageAgent,
	pLogger *zap.Logger,
	metricFactory promutil.Factory,
) *TaskManager {
	// Gauge exporting the stage of every task handled by this manager.
	stateGauge := metricFactory.NewGaugeVec(
		prometheus.GaugeOpts{
			Namespace: "dm",
			Subsystem: "worker",
			Name:      "task_state",
			Help:      "task state of dm worker in this job",
		}, []string{"task", "source_id"})

	tm := &TaskManager{
		DefaultTicker: ticker.NewDefaultTicker(taskNormalInterval, taskErrorInterval),
		jobID:         jobID,
		jobStore:      jobStore,
		messageAgent:  messageAgent,
		logger:        pLogger.With(zap.String("component", "task_manager")),
		gaugeVec:      stateGauge,
	}
	tm.DefaultTicker.Ticker = tm

	// Seed the runtime status map with statuses recovered from a previous run.
	for _, status := range initTaskStatus {
		tm.UpdateTaskStatus(status)
	}
	return tm
}
// OperateTask updates the task status in metadata and triggers the task manager to check and operate task.
// called by user request.
func (tm *TaskManager) OperateTask(ctx context.Context, op dmpkg.OperateType, jobCfg *config.JobCfg, tasks []string) (err error) {
	tm.logger.Info("operate task", zap.Stringer("op", op), zap.Strings("tasks", tasks))
	// On success, schedule an immediate tick so the new state is acted on right away.
	defer func() {
		if err == nil {
			tm.SetNextCheckTime(time.Now())
		}
	}()

	switch op {
	case dmpkg.Create:
		return tm.jobStore.Put(ctx, metadata.NewJob(jobCfg))
	case dmpkg.Update:
		return tm.jobStore.UpdateConfig(ctx, jobCfg)
	case dmpkg.Deleting:
		// Deleting only marks the job; the actual removal happens via Delete.
		return tm.jobStore.MarkDeleting(ctx)
	case dmpkg.Delete:
		// Delete removes the job from metadata.
		return tm.jobStore.Delete(ctx)
	case dmpkg.Resume:
		return tm.jobStore.UpdateStages(ctx, tasks, metadata.StageRunning)
	case dmpkg.Pause:
		return tm.jobStore.UpdateStages(ctx, tasks, metadata.StagePaused)
	default:
		return errors.New("unknown operate type")
	}
}
// UpdateTaskStatus is called when receive task status from worker.
// It records the status in the runtime map and updates the task-state gauge.
func (tm *TaskManager) UpdateTaskStatus(taskStatus runtime.TaskStatus) {
	tm.logger.Debug(
		"update task status",
		zap.String("task_id", taskStatus.Task),
		zap.Stringer("stage", taskStatus.Stage),
		zap.Stringer("unit", taskStatus.Unit),
		// fix: log field key typo ("config_modify_revison" -> "config_modify_revision")
		zap.Uint64("config_modify_revision", taskStatus.CfgModRevision),
	)
	tm.tasks.Store(taskStatus.Task, taskStatus)
	// NOTE(review): gaugeVec is declared with labels {"task", "source_id"} but is
	// populated here with (jobID, taskID) values — the label names look mismatched
	// with the values; confirm against the metric's consumers before renaming.
	tm.gaugeVec.WithLabelValues(tm.jobID, taskStatus.Task).Set(float64(taskStatus.Stage))
}
// TaskStatus return the task status.
// The returned map is a point-in-time snapshot of the runtime statuses.
func (tm *TaskManager) TaskStatus() map[string]runtime.TaskStatus {
	snapshot := map[string]runtime.TaskStatus{}
	tm.tasks.Range(func(k, v interface{}) bool {
		snapshot[k.(string)] = v.(runtime.TaskStatus)
		return true
	})
	return snapshot
}
// TickImpl removes tasks that are not in the job config.
// TickImpl checks and operates task if needed.
func (tm *TaskManager) TickImpl(ctx context.Context) error {
	tm.logger.Info("start to check and operate tasks")
	state, err := tm.jobStore.Get(ctx)
	if err == nil && !state.(*metadata.Job).Deleting {
		job := state.(*metadata.Job)
		// Drop runtime statuses for tasks no longer present in the config.
		tm.removeTaskStatus(job)
		return tm.checkAndOperateTasks(ctx, job)
	}
	// Metadata is unreadable or the job is being removed: in both cases
	// clear every runtime task status.
	tm.logger.Info("on job deleting", zap.Error(err))
	tm.onJobDel()
	return err
}
// checkAndOperateTasks compares each task's runtime stage with its expected
// (persistent) stage and sends an operate message when they diverge.
// It returns the last error encountered while continuing over all tasks.
func (tm *TaskManager) checkAndOperateTasks(ctx context.Context, job *metadata.Job) error {
	var lastErr error
	for taskID, persistentTask := range job.Tasks {
		value, loaded := tm.tasks.Load(taskID)
		// task unbounded or worker offline
		if !loaded || value.(runtime.TaskStatus).Stage == metadata.StageUnscheduled {
			lastErr = errors.New("get task running status failed")
			tm.logger.Error("failed to schedule task", zap.String("task_id", taskID), zap.Error(lastErr))
			continue
		}
		running := value.(runtime.TaskStatus)

		op := genOp(running.Stage, running.StageUpdatedTime, persistentTask.Stage, persistentTask.StageUpdatedTime)
		if op == dmpkg.None {
			tm.logger.Debug(
				"task status will not be changed",
				zap.String("task_id", taskID),
				zap.Stringer("stage", running.Stage),
			)
			continue
		}

		tm.logger.Info(
			"unexpected task status",
			zap.String("task_id", taskID),
			zap.Stringer("op", op),
			zap.Stringer("expected_stage", persistentTask.Stage),
			zap.Stringer("stage", running.Stage),
		)
		// operateTaskMessage should be a asynchronous request
		if err := tm.operateTaskMessage(ctx, taskID, op); err != nil {
			lastErr = err
			tm.logger.Error("operate task failed", zap.Error(lastErr))
		}
	}
	return lastErr
}
// remove all tasks, usually happened when delete jobs.
func (tm *TaskManager) onJobDel() {
	tm.logger.Info("clear all task status")
	tm.tasks.Range(func(k, _ interface{}) bool {
		taskID := k.(string)
		tm.tasks.Delete(taskID)
		// Drop the matching gauge series so deleted tasks stop being exported.
		tm.gaugeVec.DeleteLabelValues(tm.jobID, taskID)
		return true
	})
}
// remove deleted task status, usually happened when update-job delete some tasks.
func (tm *TaskManager) removeTaskStatus(job *metadata.Job) {
	tm.tasks.Range(func(k, _ interface{}) bool {
		id := k.(string)
		_, stillConfigured := job.Tasks[id]
		if !stillConfigured {
			tm.logger.Info("remove task status", zap.String("task_id", id))
			tm.tasks.Delete(id)
			tm.gaugeVec.DeleteLabelValues(tm.jobID, id)
		}
		return true
	})
}
// GetTaskStatus gets task status by taskID
// The second return value reports whether a runtime status was found;
// when absent, an offline placeholder status is returned.
func (tm *TaskManager) GetTaskStatus(taskID string) (runtime.TaskStatus, bool) {
	if value, ok := tm.tasks.Load(taskID); ok {
		return value.(runtime.TaskStatus), true
	}
	return runtime.NewOfflineStatus(taskID), false
}
// genOp decides which operation (if any) moves a task from its running stage
// toward its expected stage.
func genOp(
	runningStage metadata.TaskStage,
	runningStageUpdatedTime time.Time,
	expectedStage metadata.TaskStage,
	expectedStageUpdatedTime time.Time,
) dmpkg.OperateType {
	if expectedStage == metadata.StagePaused &&
		(runningStage == metadata.StageRunning || runningStage == metadata.StageError) {
		return dmpkg.Pause
	}
	if expectedStage == metadata.StageRunning {
		switch runningStage {
		case metadata.StagePaused:
			return dmpkg.Resume
		case metadata.StageError:
			// only resume a error task for a manual Resume action by checking expectedStageUpdatedTime
			if expectedStageUpdatedTime.After(runningStageUpdatedTime) {
				return dmpkg.Resume
			}
		}
	}
	// TODO: support update
	return dmpkg.None
}
// operateTaskMessage sends an asynchronous OperateTask message for taskID to its worker.
func (tm *TaskManager) operateTaskMessage(ctx context.Context, taskID string, op dmpkg.OperateType) error {
	return tm.messageAgent.SendMessage(ctx, taskID, dmpkg.OperateTask, &dmpkg.OperateTaskMessage{
		Task: taskID,
		Op:   op,
	})
}
// allFinished reports whether every task in the job has reached StageFinished
// in the unit appropriate for its task mode.
func (tm *TaskManager) allFinished(ctx context.Context) bool {
	state, err := tm.jobStore.Get(ctx)
	if err != nil {
		return false
	}
	for taskID, task := range state.(*metadata.Job).Tasks {
		value, ok := tm.tasks.Load(taskID)
		if !ok {
			return false
		}
		status := value.(runtime.TaskStatus)
		if status.Stage != metadata.StageFinished {
			return false
		}
		// update if we add new task mode
		switch task.Cfg.TaskMode {
		case dmconfig.ModeFull:
			// Full mode counts as finished only once the load unit completes.
			if status.Unit != frameModel.WorkerDMLoad {
				return false
			}
		case dmconfig.ModeDump:
			// Dump mode has no additional unit requirement.
		default:
			return false
		}
	}
	return true
}