New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: support Chunk for UnionExec #5229
Conversation
executor/executor.go
Outdated
e.initialize(nil, false) | ||
e.initialized = true | ||
} | ||
result, closed := <-e.resultPool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The second returned value is ok
, not closed
.
executor/executor.go
Outdated
wg sync.WaitGroup | ||
|
||
// For chunk execution. | ||
resoucePool chan *executorResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resouce
-> resource
executor/executor.go
Outdated
return | ||
} | ||
resource := <-e.resoucePool | ||
resource.err = errors.Trace(e.children[childID].NextChunk(resource.chk)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we consider type conversion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, we should
executor/executor.go
Outdated
|
||
chk.SwapColumns(result.chk) | ||
result.reset() | ||
e.resoucePool <- result |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The resource pool may be closed by waitAllFinished
.
How about just use a mutex protected slice for chunk reuse?
executor/executor.go
Outdated
func (e *UnionExec) waitAllFinished() { | ||
type executorResult struct { | ||
chk *chunk.Chunk | ||
err error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about adding a channel in executorResult to reuse the chunk and remove resourcePool
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll think about it
to #5261 |
@@ -51,6 +51,25 @@ func NewFieldType(tp byte) *FieldType { | |||
} | |||
} | |||
|
|||
// Equal checks whether two FieldType objects are equal. | |||
func (ft *FieldType) Equal(other *FieldType) bool { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/ Equal/ Equals
plan/logical_plan_builder.go
Outdated
u.SetSchema(firstSchema) | ||
|
||
for childID, child := range u.children { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extract this loop as a member function.
executor/executor.go
Outdated
} | ||
|
||
type execResult struct { | ||
rows []Row | ||
err error | ||
} | ||
|
||
type executorResult struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comments for executorResult and execResult,
it's really similar.
executor/executor.go
Outdated
wg sync.WaitGroup | ||
|
||
// For chunk execution. | ||
resourcePool chan *executorResult |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add comments for resourcePool and resultPool
executor/executor.go
Outdated
} | ||
resource := <-e.resourcePool | ||
resource.err = errors.Trace(e.children[childID].NextChunk(resource.chk)) | ||
if resource.err != nil || resource.chk.NumRows() == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We may not need nested loop.
executor/executor.go
Outdated
return errors.Trace(err) | ||
} | ||
|
||
func (e *UnionExec) resultPuller(childID int) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to add a comment for the execution flow of this func.
util/chunk/chunk.go
Outdated
@@ -92,6 +92,11 @@ func (c *Chunk) addColumnByFieldType(fieldTp *types.FieldType, initCap int) { | |||
} | |||
} | |||
|
|||
// SwapColumns swap columns with another Chunk. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
s/ swap/ swaps
executor/executor.go
Outdated
if resource.err != nil { | ||
e.stopFetchData.Store(true) | ||
select { | ||
case _, ok := <-e.finished: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We don't need to check ok
, it's always false, just return.
executor/executor.go
Outdated
} | ||
case e.resultPool <- result: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just send without select
, the resultPool
has enough capacity, it never blocks the sender.
/run-all-tests |
LGTM |
/run-mybatis-test |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
NextChunk
for UnionExec