Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

disttask: add load data dispatcher #42592

Merged
merged 8 commits into from Mar 29, 2023

Conversation

GMHDBJD
Copy link
Contributor

@GMHDBJD GMHDBJD commented Mar 26, 2023

What problem does this PR solve?

Issue Number: close #42591

Problem Summary:

What is changed and how it works?

  • split files into chunks

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Side effects

  • Performance regression: Consumes more CPU
  • Performance regression: Consumes more Memory
  • Breaking backward compatibility

Documentation

  • Affects user behaviors
  • Contains syntax changes
  • Contains variable changes
  • Contains experimental features
  • Changes MySQL compatibility

Release note

Please refer to Release Notes Language Style Guide to write a quality release note.

None

@ti-chi-bot
Copy link
Member

ti-chi-bot commented Mar 26, 2023

[REVIEW NOTIFICATION]

This pull request has been approved by:

  • D3Hunter
  • lance6716

To complete the pull request process, please ask the reviewers in the list to review by filling /cc @reviewer in the comment.
After your PR has acquired the required number of LGTMs, you can assign this pull request to the committer in the list by filling /assign @committer in the comment to help you merge this pull request.

The full list of commands accepted by this bot can be found here.

Reviewer can indicate their review by submitting an approval review.
Reviewer can cancel approval by submitting a request changes review.

@ti-chi-bot ti-chi-bot added do-not-merge/invalid-title release-note-none size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Mar 26, 2023
@GMHDBJD GMHDBJD changed the title Add load data dispatcher disttask: add load data dispatcher Mar 26, 2023
)

go_test(
name = "loaddata_test",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add

timeout = "short",
flaky = True,

@ti-chi-bot ti-chi-bot added size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. and removed size/XL Denotes a PR that changes 500-999 lines, ignoring generated files. labels Mar 27, 2023
@GMHDBJD
Copy link
Contributor Author

GMHDBJD commented Mar 27, 2023

/retest

disttask/framework/dispatcher/dispatcher.go Outdated Show resolved Hide resolved
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in https://github.com/stretchr/testify#mock-package

You can use the mockery tool to autogenerate the mock code against an interface as well, making using mocks much quicker.

Can we add a makefile entry for it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/syncutil"
)

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error)
ProcessNormalFlow(ctx context.Context, h Handle, gTask *proto.Task) (metas [][]byte, err error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the comment says this is for each task, so why we need to specify gTask as argument?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TaskFlowHandle gets the gTask from dispatcher, and then split the gTask to subtask

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems different types of task has different TaskFlowHandle implementation, can we use gTask to create a new struct as the receiver of these methods? In other words, ProcessNormalFlow is a method of task object, to avoid use gTask as parameter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL @zimulala , maybe let RegisterTaskFlowHandle register a factory function

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/syncutil"
)

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error)
ProcessNormalFlow(ctx context.Context, h Handle, gTask *proto.Task) (metas [][]byte, err error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems different types of task has different TaskFlowHandle implementation, can we use gTask to create a new struct as the receiver of these methods? In other words, ProcessNormalFlow is a method of task object, to avoid use gTask as parameter.

disttask/loaddata/dispatcher.go Show resolved Hide resolved

switch gTask.Step {
case Import:
gTask.State = proto.TaskStateSucceed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the difference of Step and State? Please add comments

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will each step has the full lifetime of state transition? Or step is changed only in TaskStateRunning and only the final step will finish the state?

Please add comment

}
metaBytes = append(metaBytes, bs)
}
gTask.Step = Import
Copy link
Contributor

@lance6716 lance6716 Mar 28, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please add comment to the interface to mention that gTask will be modified inside this method

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zimulala Not sure if you can fix it in your PR conveniently

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll comment on that

disttask/loaddata/dispatcher.go Show resolved Hide resolved
}

func init() {
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no need to use lock inside RegisterTaskFlowHandle?

https://go.dev/ref/spec

Package initialization—variable initialization and the invocation of init functions—happens in a single goroutine, sequentially, one package at a time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But we're not sure that all the calls are inside the init function


switch gTask.Step {
case Import:
gTask.State = proto.TaskStateSucceed
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will each step has the full lifetime of state transition? Or step is changed only in TaskStateRunning and only the final step will finish the state?

Please add comment

"github.com/pingcap/tidb/disttask/framework/proto"
"github.com/pingcap/tidb/util/syncutil"
)

// TaskFlowHandle is used to control the process operations for each global task.
type TaskFlowHandle interface {
ProcessNormalFlow(d Dispatch, gTask *proto.Task) (metas [][]byte, err error)
ProcessErrFlow(d Dispatch, gTask *proto.Task, receive string) (meta []byte, err error)
ProcessNormalFlow(ctx context.Context, h Handle, gTask *proto.Task) (metas [][]byte, err error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PTAL @zimulala , maybe let RegisterTaskFlowHandle register a factory function

}
metaBytes = append(metaBytes, bs)
}
gTask.Step = Import
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@zimulala Not sure if you can fix it in your PR conveniently

}

func init() {
dispatcher.RegisterTaskFlowHandle(proto.LoadData, &FlowHandle{})
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ti-chi-bot ti-chi-bot added the status/LGT1 Indicates that a PR has LGTM 1. label Mar 29, 2023
@@ -50,6 +50,12 @@ type Dispatch interface {
Stop()
}

// Handle provides the interface for operations needed by task flow handles.
type Handle interface {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
type Handle interface {
type TaskHandle interface {

// Handle provides the interface for operations needed by task flow handles.
type Handle interface {
// GetTaskAllInstances gets handles the task's all available instances.
GetTaskAllInstances(ctx context.Context, gTaskID int64) ([]string, error)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

GetAllSchedulerIDs?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

generate it with mockgen? and put it into Makefile

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, in next pr #42592 (comment)

@@ -16,6 +17,7 @@ go_library(
"//util/logutil",
"//util/syncutil",
"@com_github_pingcap_errors//:errors",
"@com_github_stretchr_testify//mock",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we can add race = "on", to go_test for finding the data race. You can decide whether to open it according to the situation.

@ti-chi-bot ti-chi-bot added status/LGT2 Indicates that a PR has LGTM 2. and removed status/LGT1 Indicates that a PR has LGTM 1. labels Mar 29, 2023
@GMHDBJD
Copy link
Contributor Author

GMHDBJD commented Mar 29, 2023

/merge

@ti-chi-bot
Copy link
Member

This pull request has been accepted and is ready to merge.

Commit hash: 4adce49

@ti-chi-bot ti-chi-bot added the status/can-merge Indicates a PR has been approved by a committer. label Mar 29, 2023
@ti-chi-bot ti-chi-bot merged commit af62a5c into pingcap:master Mar 29, 2023
14 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
release-note-none size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. status/can-merge Indicates a PR has been approved by a committer. status/LGT2 Indicates that a PR has LGTM 2.
Projects
None yet
Development

Successfully merging this pull request may close these issues.

add dispatcher for load data
6 participants