Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: add finish flow for distribute framework #43676

Merged
merged 17 commits into from
May 15, 2023
1 change: 1 addition & 0 deletions ddl/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ const (
// BackgroundSubtaskTableSQL is the CREATE TABLE SQL of `tidb_background_subtask`.
BackgroundSubtaskTableSQL = "create table " + BackgroundSubtaskTable + `(
id bigint not null auto_increment primary key,
step int,
namespace varchar(256),
task_key varchar(256),
ddl_physical_tid bigint(20),
Expand Down
7 changes: 7 additions & 0 deletions ddl/disttask_flow.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatche
tblInfo, err = meta.NewMeta(txn).GetTable(job.SchemaID, job.TableID)
return err
})
if err != nil {
return nil, err
}

if tblInfo.Partition == nil {
return nil, errors.New("Non-partition table not supported yet")
Expand Down Expand Up @@ -89,6 +92,10 @@ func (h *litBackfillFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatche
return subTaskMetas, nil
}

// ProcessFinishFlow implements the dispatcher.TaskFlowHandle interface.
// Backfill has no per-step aggregation work yet, so this is a no-op.
func (*litBackfillFlowHandle) ProcessFinishFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ [][]byte) (err error) {
return nil
}

func (*litBackfillFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, task *proto.Task, receiveErr [][]byte) (meta []byte, err error) {
// We do not need extra meta info when rolling back
firstErr := receiveErr[0]
Expand Down
4 changes: 2 additions & 2 deletions ddl/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,8 @@ func (b *backfillSchedulerHandle) SplitSubtask(ctx context.Context, subtask []by
}

// OnSubtaskFinished implements the Scheduler interface.
func (*backfillSchedulerHandle) OnSubtaskFinished(context.Context, []byte) error {
return nil
func (*backfillSchedulerHandle) OnSubtaskFinished(_ context.Context, meta []byte) ([]byte, error) {
return meta, nil
}

// CleanupSubtaskExecEnv implements the Scheduler interface.
Expand Down
32 changes: 31 additions & 1 deletion disttask/framework/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,14 @@ func (d *dispatcher) detectTask(gTask *proto.Task) {
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State))
break
}

// process finish flow if task finish a step.
if stepIsFinished && len(errStr) == 0 && gTask.State == proto.TaskStateRunning {
logutil.BgLogger().Info("detect task, this task finished a step",
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State), zap.Int64("step", gTask.Step))
if err := d.processFinishFlow(gTask); err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can put processFinishFlow directly, and only import step2 or other special step needs to do this.

errStr = [][]byte{[]byte(err.Error())}
}
}
if isFinished := d.processFlow(gTask, errStr); isFinished {
logutil.BgLogger().Info("detect task, this task is finished",
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State))
Expand Down Expand Up @@ -428,6 +435,29 @@ func (d *dispatcher) processNormalFlow(gTask *proto.Task) (err error) {
return d.updateTask(gTask, gTask.State, subTasks, retrySQLTimes)
}

// processFinishFlow runs the registered flow handle's finish hook after a
// task step has fully succeeded. It collects the metas of all succeeded
// subtasks of the current step and passes them to the handle so it can
// aggregate the step's results before the next step is dispatched.
// If no handle is registered for the task type, the task is reverted.
func (d *dispatcher) processFinishFlow(gTask *proto.Task) error {
	handle := GetTaskFlowHandle(gTask.Type)
	if handle == nil {
		// No handle registered for this task type: nothing sensible can be
		// done with the task, so mark it reverted.
		logutil.BgLogger().Warn("gen gTask flow handle failed, this type handle doesn't register",
			zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type))
		return d.updateTask(gTask, proto.TaskStateReverted, nil, retrySQLTimes)
	}
	previousSubtasks, err := d.taskMgr.GetSucceedSubtasksByStep(gTask.ID, gTask.Step)
	if err != nil {
		// Include the underlying error so the failure is diagnosable.
		logutil.BgLogger().Warn("get previous succeed subtask failed",
			zap.Int64("ID", gTask.ID), zap.String("type", gTask.Type), zap.Error(err))
		return err
	}
	previousSubtaskMetas := make([][]byte, 0, len(previousSubtasks))
	for _, subtask := range previousSubtasks {
		previousSubtaskMetas = append(previousSubtaskMetas, subtask.Meta)
	}
	if err := handle.ProcessFinishFlow(d.ctx, d, gTask, previousSubtaskMetas); err != nil {
		logutil.BgLogger().Warn("process finish flow failed", zap.Error(err))
		return err
	}
	return nil
}

// GetEligibleInstance gets an eligible instance.
func GetEligibleInstance(serverNodes []*infosync.ServerInfo, pos int) (string, error) {
if pos >= len(serverNodes) && pos < 0 {
Expand Down
8 changes: 6 additions & 2 deletions disttask/framework/dispatcher/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func TestGetInstance(t *testing.T) {
TaskID: gTaskID,
SchedulerID: uuids[1],
}
err = mgr.AddNewSubTask(gTaskID, subtask.SchedulerID, nil, subtask.Type, true)
err = mgr.AddNewSubTask(gTaskID, proto.StepInit, subtask.SchedulerID, nil, subtask.Type, true)
require.NoError(t, err)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, gTaskID)
require.NoError(t, err)
Expand All @@ -121,7 +121,7 @@ func TestGetInstance(t *testing.T) {
TaskID: gTaskID,
SchedulerID: uuids[0],
}
err = mgr.AddNewSubTask(gTaskID, subtask.SchedulerID, nil, subtask.Type, true)
err = mgr.AddNewSubTask(gTaskID, proto.StepInit, subtask.SchedulerID, nil, subtask.Type, true)
require.NoError(t, err)
instanceIDs, err = dsp.GetAllSchedulerIDs(ctx, gTaskID)
require.NoError(t, err)
Expand Down Expand Up @@ -313,6 +313,10 @@ func (n NumberExampleHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.T
return metas, nil
}

// ProcessFinishFlow implements the dispatcher.TaskFlowHandle interface.
// The example handle needs no finish-step work, so this is a no-op.
func (n NumberExampleHandle) ProcessFinishFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ [][]byte) (err error) {
return nil
}

func (n NumberExampleHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ [][]byte) (meta []byte, err error) {
// Don't handle not.
return nil, nil
Expand Down
5 changes: 3 additions & 2 deletions disttask/framework/dispatcher/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ import (

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
ProcessNormalFlow(ctx context.Context, h TaskHandle, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, receiveErr [][]byte) (meta []byte, err error)
ProcessNormalFlow(ctx context.Context, h TaskHandle, gTask *proto.Task) (subtaskMetas [][]byte, err error)
ProcessFinishFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, previousSubtaskMetas [][]byte) (err error)
ProcessErrFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, receiveErr [][]byte) (subtaskMeta []byte, err error)
}

var taskFlowHandleMap struct {
Expand Down
8 changes: 6 additions & 2 deletions disttask/framework/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ func (*testFlowHandle) ProcessNormalFlow(_ context.Context, _ dispatcher.TaskHan
return nil, nil
}

// ProcessFinishFlow implements the dispatcher.TaskFlowHandle interface.
// The test handle needs no finish-step work, so this is a no-op.
func (*testFlowHandle) ProcessFinishFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ [][]byte) (err error) {
return nil
}

func (*testFlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ [][]byte) (meta []byte, err error) {
return nil, nil
}
Expand All @@ -66,8 +70,8 @@ func (t *testScheduler) SplitSubtask(_ context.Context, subtask []byte) ([]proto
}, nil
}

func (t *testScheduler) OnSubtaskFinished(_ context.Context, _ []byte) error {
return nil
func (t *testScheduler) OnSubtaskFinished(_ context.Context, meta []byte) ([]byte, error) {
return meta, nil
}

type testSubtaskExecutor struct {
Expand Down
1 change: 1 addition & 0 deletions disttask/framework/proto/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func (t *Task) IsFinished() bool {
// Each task is divided into multiple subtasks by dispatcher.
type Subtask struct {
ID int64
Step int64
Type string
TaskID int64
State string
Expand Down
4 changes: 3 additions & 1 deletion disttask/framework/scheduler/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type TaskTable interface {
GetGlobalTaskByID(taskID int64) (task *proto.Task, err error)
GetSubtaskInStates(instanceID string, taskID int64, states ...interface{}) (*proto.Subtask, error)
UpdateSubtaskStateAndError(id int64, state string, err string) error
FinishSubtask(id int64, meta []byte) error
HasSubtasksInStates(instanceID string, taskID int64, states ...interface{}) (bool, error)
}

Expand Down Expand Up @@ -54,7 +55,8 @@ type Scheduler interface {
// CleanupSubtaskExecEnv is used to clean up the environment for the subtask executor.
CleanupSubtaskExecEnv(context.Context) error
// OnSubtaskFinished is used to handle the subtask when it is finished.
OnSubtaskFinished(ctx context.Context, subtask []byte) error
// return the result of the subtask.
OnSubtaskFinished(ctx context.Context, subtask []byte) ([]byte, error)
// Rollback is used to rollback all subtasks.
Rollback(context.Context) error
}
Expand Down
13 changes: 11 additions & 2 deletions disttask/framework/scheduler/interface_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func (t *MockTaskTable) UpdateSubtaskStateAndError(id int64, state string, _ str
return args.Error(0)
}

// FinishSubtask implements SubtaskTable.FinishSubtask: it records the call
// with testify and returns whatever error the expectation was configured with.
func (t *MockTaskTable) FinishSubtask(id int64, meta []byte) error {
	return t.Called(id, meta).Error(0)
}

// HasSubtasksInStates implements SubtaskTable.HasSubtasksInStates.
func (t *MockTaskTable) HasSubtasksInStates(instanceID string, taskID int64, states ...interface{}) (bool, error) {
args := t.Called(instanceID, taskID, states)
Expand Down Expand Up @@ -133,9 +139,12 @@ func (m *MockScheduler) SplitSubtask(ctx context.Context, subtask []byte) ([]pro
}

// OnSubtaskFinished implements Scheduler.OnSubtaskFinished.
func (m *MockScheduler) OnSubtaskFinished(ctx context.Context, subtask []byte) error {
func (m *MockScheduler) OnSubtaskFinished(ctx context.Context, subtask []byte) ([]byte, error) {
args := m.Called(ctx, subtask)
return args.Error(0)
if args.Error(1) != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this file manually mocked?

Copy link
Contributor Author

@GMHDBJD GMHDBJD May 11, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I will change to generated in another pr

return nil, args.Error(1)
}
return args.Get(0).([]byte), nil
}

// CleanupSubtaskExecEnv implements Scheduler.CleanupSubtaskExecEnv.
Expand Down
7 changes: 5 additions & 2 deletions disttask/framework/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,8 +200,9 @@ func (s *InternalSchedulerImpl) runSubtask(ctx context.Context, scheduler Schedu
}

func (s *InternalSchedulerImpl) onSubtaskFinished(ctx context.Context, scheduler Scheduler, subtask *proto.Subtask) {
var subtaskMeta []byte
if err := s.getError(); err == nil {
if err := scheduler.OnSubtaskFinished(ctx, subtask.Meta); err != nil {
if subtaskMeta, err = scheduler.OnSubtaskFinished(ctx, subtask.Meta); err != nil {
s.onError(err)
}
}
Expand All @@ -213,7 +214,9 @@ func (s *InternalSchedulerImpl) onSubtaskFinished(ctx context.Context, scheduler
}
return
}
s.updateSubtaskStateAndError(subtask.ID, proto.TaskStateSucceed, "")
if err := s.taskTable.FinishSubtask(subtask.ID, subtaskMeta); err != nil {
s.onError(err)
}
}

func (s *InternalSchedulerImpl) runMinimalTask(minimalTaskCtx context.Context, minimalTask proto.MinimalTask, tp string, step int64) {
Expand Down
17 changes: 6 additions & 11 deletions disttask/framework/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,18 +119,13 @@ func TestSchedulerRun(t *testing.T) {
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", taskID, proto.TaskStateSucceed).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return([]byte(""), nil).Once()
mockSubtaskTable.On("FinishSubtask", int64(1), mock.Anything).Return(nil).Once()
mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(nil, nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
err = scheduler.Run(runCtx, &proto.Task{Type: tp, ID: taskID, Concurrency: concurrency})
require.NoError(t, err)

mockSubtaskTable.AssertExpectations(t)
mockScheduler.AssertExpectations(t)
mockSubtaskExecutor.AssertExpectations(t)
mockPool.AssertExpectations(t)

// 9. run subtask concurrently
RegisterSchedulerConstructor(tp, func(task []byte, step int64) (Scheduler, error) {
return mockScheduler, nil
Expand All @@ -141,15 +136,15 @@ func TestSchedulerRun(t *testing.T) {
mockSubtaskTable.On("UpdateSubtaskStateAndError", int64(1), proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", int64(1), proto.TaskStateSucceed).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return([]byte(""), nil).Once()
mockSubtaskTable.On("FinishSubtask", int64(1), mock.Anything).Return(nil).Once()

mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(&proto.Subtask{ID: 2, Type: tp}, nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", int64(2), proto.TaskStateRunning).Return(nil).Once()
mockScheduler.On("SplitSubtask", mock.Anything, mock.Anything).Return([]proto.MinimalTask{MockMinimalTask{}}, nil).Once()
mockSubtaskExecutor.On("Run", mock.Anything).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return(nil).Once()
mockSubtaskTable.On("UpdateSubtaskStateAndError", int64(2), proto.TaskStateSucceed).Return(nil).Once()
mockScheduler.On("OnSubtaskFinished", mock.Anything, mock.Anything).Return([]byte(""), nil).Once()
mockSubtaskTable.On("FinishSubtask", int64(2), mock.Anything).Return(nil).Once()

mockSubtaskTable.On("GetSubtaskInStates", "id", taskID, []interface{}{proto.TaskStatePending}).Return(nil, nil).Once()
mockScheduler.On("CleanupSubtaskExecEnv", mock.Anything).Return(nil).Once()
Expand Down
13 changes: 11 additions & 2 deletions disttask/framework/storage/table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func TestSubTaskTable(t *testing.T) {
sm, err := storage.GetTaskManager()
require.NoError(t, err)

err = sm.AddNewSubTask(1, "tidb1", []byte("test"), proto.TaskTypeExample, false)
err = sm.AddNewSubTask(1, proto.StepInit, "tidb1", []byte("test"), proto.TaskTypeExample, false)
require.NoError(t, err)

nilTask, err := sm.GetSubtaskInStates("tidb2", 1, proto.TaskStatePending)
Expand Down Expand Up @@ -198,12 +198,21 @@ func TestSubTaskTable(t *testing.T) {
require.NoError(t, err)
require.False(t, ok)

err = sm.AddNewSubTask(2, "tidb1", []byte("test"), proto.TaskTypeExample, true)
err = sm.AddNewSubTask(2, proto.StepInit, "tidb1", []byte("test"), proto.TaskTypeExample, true)
require.NoError(t, err)

cnt, err = sm.GetSubtaskInStatesCnt(2, proto.TaskStateRevertPending)
require.NoError(t, err)
require.Equal(t, int64(1), cnt)

subtasks, err := sm.GetSucceedSubtasksByStep(2, proto.StepInit)
require.NoError(t, err)
require.Len(t, subtasks, 0)
err = sm.FinishSubtask(2, []byte{})
require.NoError(t, err)
subtasks, err = sm.GetSucceedSubtasksByStep(2, proto.StepInit)
require.NoError(t, err)
require.Len(t, subtasks, 1)
}

func TestBothGlobalAndSubTaskTable(t *testing.T) {
Expand Down
47 changes: 35 additions & 12 deletions disttask/framework/storage/task_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,28 +243,29 @@ func (stm *TaskManager) GetGlobalTaskByKey(key string) (task *proto.Task, err er
func row2SubTask(r chunk.Row) *proto.Subtask {
task := &proto.Subtask{
ID: r.GetInt64(0),
Type: proto.Int2Type(int(r.GetInt64(4))),
SchedulerID: r.GetString(5),
State: r.GetString(7),
Meta: r.GetBytes(11),
StartTime: r.GetUint64(9),
}
tid, err := strconv.Atoi(r.GetString(2))
Step: r.GetInt64(1),
Type: proto.Int2Type(int(r.GetInt64(5))),
SchedulerID: r.GetString(6),
State: r.GetString(8),
Meta: r.GetBytes(12),
StartTime: r.GetUint64(10),
}
tid, err := strconv.Atoi(r.GetString(3))
if err != nil {
logutil.BgLogger().Warn("unexpected task ID", zap.String("task ID", r.GetString(2)))
logutil.BgLogger().Warn("unexpected task ID", zap.String("task ID", r.GetString(3)))
}
task.TaskID = int64(tid)
return task
}

// AddNewSubTask adds a new task to subtask table.
func (stm *TaskManager) AddNewSubTask(globalTaskID int64, designatedTiDBID string, meta []byte, tp string, isRevert bool) error {
func (stm *TaskManager) AddNewSubTask(globalTaskID int64, step int64, designatedTiDBID string, meta []byte, tp string, isRevert bool) error {
st := proto.TaskStatePending
if isRevert {
st = proto.TaskStateRevertPending
}

_, err := stm.executeSQLWithNewSession(stm.ctx, "insert into mysql.tidb_background_subtask(task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?)", globalTaskID, designatedTiDBID, meta, st, proto.Type2Int(tp), []byte{})
_, err := stm.executeSQLWithNewSession(stm.ctx, "insert into mysql.tidb_background_subtask(task_key, step, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)", globalTaskID, step, designatedTiDBID, meta, st, proto.Type2Int(tp), []byte{})
if err != nil {
return err
}
Expand All @@ -287,6 +288,22 @@ func (stm *TaskManager) GetSubtaskInStates(tidbID string, taskID int64, states .
return row2SubTask(rs[0]), nil
}

// GetSucceedSubtasksByStep returns every subtask of the given task that has
// reached the succeed state in the given step, or nil when there are none.
func (stm *TaskManager) GetSucceedSubtasksByStep(taskID int64, step int64) ([]*proto.Subtask, error) {
	rows, err := stm.executeSQLWithNewSession(stm.ctx,
		"select * from mysql.tidb_background_subtask where task_key = %? and state = %? and step = %?",
		taskID, proto.TaskStateSucceed, step)
	if err != nil {
		return nil, err
	}
	if len(rows) == 0 {
		return nil, nil
	}
	result := make([]*proto.Subtask, 0, len(rows))
	for _, row := range rows {
		result = append(result, row2SubTask(row))
	}
	return result, nil
}

// GetSubtaskInStatesCnt gets the subtask count in the states.
func (stm *TaskManager) GetSubtaskInStatesCnt(taskID int64, states ...interface{}) (int64, error) {
args := []interface{}{taskID}
Expand Down Expand Up @@ -332,6 +349,12 @@ func (stm *TaskManager) UpdateSubtaskStateAndError(id int64, state string, subTa
return err
}

// FinishSubtask persists the subtask's result meta and marks its state as
// succeeded in one update.
func (stm *TaskManager) FinishSubtask(id int64, meta []byte) error {
	if _, err := stm.executeSQLWithNewSession(stm.ctx,
		"update mysql.tidb_background_subtask set meta = %?, state = %? where id = %?",
		meta, proto.TaskStateSucceed, id); err != nil {
		return err
	}
	return nil
}

// UpdateSubtaskHeartbeat updates the heartbeat of the subtask.
func (stm *TaskManager) UpdateSubtaskHeartbeat(instanceID string, taskID int64, heartbeat time.Time) error {
_, err := stm.executeSQLWithNewSession(stm.ctx, "update mysql.tidb_background_subtask set exec_expired = %? where exec_id = %? and task_key = %?", heartbeat.String(), instanceID, taskID)
Expand Down Expand Up @@ -389,8 +412,8 @@ func (stm *TaskManager) UpdateGlobalTaskAndAddSubTasks(gTask *proto.Task, subtas

for _, subtask := range subtasks {
// TODO: insert subtasks in batch
_, err = execSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?)",
gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{})
_, err = execSQL(stm.ctx, se, "insert into mysql.tidb_background_subtask(step, task_key, exec_id, meta, state, type, checkpoint) values (%?, %?, %?, %?, %?, %?, %?)",
gTask.Step, gTask.ID, subtask.SchedulerID, subtask.Meta, subtaskState, proto.Type2Int(subtask.Type), []byte{})
if err != nil {
return err
}
Expand Down
5 changes: 5 additions & 0 deletions disttask/loaddata/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ func (*FlowHandle) ProcessNormalFlow(ctx context.Context, _ dispatcher.TaskHandl
return metaBytes, nil
}

// ProcessFinishFlow implements the dispatcher.TaskFlowHandle interface.
// Load-data has no finish-step work yet, so this is a no-op.
func (*FlowHandle) ProcessFinishFlow(_ context.Context, _ dispatcher.TaskHandle, _ *proto.Task, _ [][]byte) error {
return nil
}

// ProcessErrFlow implements dispatcher.ProcessErrFlow interface.
func (*FlowHandle) ProcessErrFlow(_ context.Context, _ dispatcher.TaskHandle, gTask *proto.Task, receiveErr [][]byte) ([]byte, error) {
logutil.BgLogger().Info("process error flow", zap.ByteStrings("error message", receiveErr))
Expand Down