disttask: add finish flow for distribute framework #43676
// ProcessNormalFlow processes the normal flow.
// It receives the previous subtask metas to do some post-processing.
// returns the new subtask metas and whether the error is retryable.
ProcessNormalFlow(ctx context.Context, h TaskHandle, gTask *proto.Task, prevSubtaskMetas [][]byte) (subtaskMetas [][]byte, retryable bool, err error)
Maybe just add a new API to this `TaskFlowHandle` to check whether the error is retryable?
I prefer to do the retry in the business logic, so this `retryable` variable is just a temporary solution.
Whether a step can be retried should be an attribute of each step, and the dispatcher or scheduler should decide the follow-up action according to that attribute.
The user story could be:
- The framework provides the capability to set the attribute.
- Each task / job / business (or whatever you call it) has such attributes or definitions, which define how the framework triggers the steps and controls the flow.
- A task that needs to be paused on failure defines the value of the attribute accordingly.
- The framework takes actions according to the attribute definitions.
An API may not be a good idea, because we may need another system or mechanism to call such an API accordingly, which breaks cohesion.
args := m.Called(ctx, subtask)
return args.Error(0)
if args.Error(1) != nil {
Is this file manually mocked?
Yes, I will switch to a generated mock in another PR.
Rest LGTM.
session/bootstrap.go
Outdated
if ver >= version145 {
return
}
doReentrantDDL(s, "ALTER TABLE mysql.tidb_background_subtask ADD COLUMN `step` INT", infoschema.ErrColumnExists)
This will add `step` as the last column of the table, but in `row2SubTask` we use `Step: r.GetInt64(1)`.
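The mismatch described above can be sketched without a database. The column layout below is hypothetical (only the fact that `step` is appended last comes from the comment), and `columnIndex` and `stepFromRow` are illustrative helpers, not functions from the PR. The sketch shows why a fixed offset of 1 reads the wrong value once the new column is appended, and how resolving the offset by name avoids that. Another option, assuming the positional read is to be kept, would be to place the column explicitly with MySQL's `ADD COLUMN ... AFTER ...` syntax.

```go
package main

import "fmt"

// columnIndex returns the position of name in cols, or -1 if it is absent.
func columnIndex(cols []string, name string) int {
	for i, c := range cols {
		if c == name {
			return i
		}
	}
	return -1
}

// stepFromRow reads the step value by column name instead of a fixed offset.
func stepFromRow(cols []string, row []int64) (int64, error) {
	i := columnIndex(cols, "step")
	if i < 0 || i >= len(row) {
		return 0, fmt.Errorf("column step not found")
	}
	return row[i], nil
}

func main() {
	// Hypothetical layout after ALTER TABLE ... ADD COLUMN: the new column is last.
	cols := []string{"id", "other", "step"}
	row := []int64{7, 99, 2}

	// A fixed offset of 1 reads the "other" column, not "step".
	fmt.Println("positional read at index 1:", row[1])

	step, err := stepFromRow(cols, row)
	fmt.Println("read by name:", step, err)
}
```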
Rest LGTM.
for _, subtask := range prevSubtasks {
prevSubtaskMetas = append(prevSubtaskMetas, subtask.Meta)
}
metas, retryable, err := handle.ProcessNormalFlow(d.ctx, d, gTask, prevSubtaskMetas)
if err != nil {
logutil.BgLogger().Warn("gen dist-plan failed", zap.Error(err))
Add `retryable` to this log?
for _, subtask := range prevSubtasks {
prevSubtaskMetas = append(prevSubtaskMetas, subtask.Meta)
}
metas, retryable, err := handle.ProcessNormalFlow(d.ctx, d, gTask, prevSubtaskMetas)
What does `prevSubtasks` do? I don't think it's in use at the moment. In addition, the number of subtasks may be very large; is there another way to deal with it? For example, put it in `gTask` to reshard the task.
Rest LGTM.
`prevSubtaskMetas` contains the results of the previous step.
This is not a good idea.
If we need to record all the previous subtasks, we may need to record the whole subtask context, because it could fail on any subtask. The subtasks should not depend on each other; they should all depend on something else, such as a context or meta.
Refer to the 'Dependency Inversion Principle'.
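One way to read the suggestion above is that each step talks to a shared abstraction instead of receiving the previous subtasks directly. The sketch below is an illustration under that assumption, not code from this PR: `StepResultStore`, `memStore`, `runStepOne`, and `planStepTwo` are hypothetical names, and the in-memory map stands in for whatever context or meta storage the framework would actually use.

```go
package main

import "fmt"

// StepResultStore is a hypothetical abstraction that every step depends on,
// so that steps do not depend on each other's subtasks.
type StepResultStore interface {
	Put(step int64, key string, value []byte)
	Get(step int64, key string) ([]byte, bool)
}

// memStore is an in-memory stand-in for a persistent context or meta store.
type memStore struct{ data map[string][]byte }

func newMemStore() *memStore { return &memStore{data: make(map[string][]byte)} }

func storeKey(step int64, key string) string { return fmt.Sprintf("%d/%s", step, key) }

func (m *memStore) Put(step int64, key string, value []byte) {
	m.data[storeKey(step, key)] = value
}

func (m *memStore) Get(step int64, key string) ([]byte, bool) {
	v, ok := m.data[storeKey(step, key)]
	return v, ok
}

// runStepOne records a small summary rather than handing over every subtask meta.
func runStepOne(s StepResultStore) {
	s.Put(1, "rowCount", []byte("42"))
}

// planStepTwo reads only what it needs from the shared store.
func planStepTwo(s StepResultStore) (string, error) {
	v, ok := s.Get(1, "rowCount")
	if !ok {
		return "", fmt.Errorf("step 1 summary missing")
	}
	return "verify " + string(v) + " rows", nil
}

func main() {
	store := newMemStore()
	runStepOne(store)
	plan, err := planStepTwo(store)
	fmt.Println(plan, err)
}
```

Because the second step reads a small summary by key, the amount of data passed between steps does not grow with the number of subtasks.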
if stepIsFinished && len(errStr) == 0 && gTask.State == proto.TaskStateRunning {
logutil.BgLogger().Info("detect task, this task finished a step",
zap.Int64("taskID", gTask.ID), zap.String("state", gTask.State), zap.Int64("step", gTask.Step))
if err := d.processFinishFlow(gTask); err != nil {
I think we can call `processFinishFlow` directly here; only import step 2 or other special steps need to do this.
/retest
/test build
LGTM
/merge
This pull request has been accepted and is ready to merge. Commit hash: 76f8c68
/merge
This pull request has been accepted and is ready to merge. Commit hash: b85b9a2
What problem does this PR solve?
Issue Number: close #43675
Problem Summary:
What is changed and how it works?
OnSubtaskFinished
Check List
Tests
Side effects
Documentation
Release note