New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
executor: IndexLookUpExecutor refactor #4305
Conversation
tiancaiamao
commented
Aug 23, 2017
- When the executor close, all background goroutines should exit
- Objects should maintain smaller inner state, high cohesion and low coupling
- Avoid data race and make the code more reasonable
executor/distsql.go
Outdated
cursor int | ||
done bool | ||
doneCh chan error | ||
// if tasksErr is not nil, this is a invalid lookupTableTask. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if -> If
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can not understand the comment.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indexHandler send lookupTableTask to a channel, then
tableHandler receive lookupTableTask from channel and further handle.
They communicate through channel, but what happens when indexHandler meet error?
it has to pass that information, so that's why there is a tasksErr
field.
@shenli
executor/new_distsql.go
Outdated
finished chan struct{} | ||
} | ||
|
||
type indexHandler struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment for the struct.
executor/new_distsql.go
Outdated
ih.wg.Wait() | ||
} | ||
|
||
type tableHandler struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto
executor/new_distsql.go
Outdated
} | ||
|
||
func (ih *indexHandler) Open(kvRanges []kv.KeyRange, e *IndexLookUpExecutor, workCh chan<- *lookupTableTask, finished <-chan struct{}) error { | ||
result, err := distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This line too long ...
executor/new_distsql.go
Outdated
ih.fetchHandles(result, workCh, ctx, finished) | ||
cancel() | ||
if err := result.Close(); err != nil { | ||
log.Errorf(errors.ErrorStack(err)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Complete this log.
379f2de
to
e3064bf
Compare
executor/new_distsql.go
Outdated
ih.wg.Wait() | ||
} | ||
|
||
// indexHandler is used by IndexLookUpExecutor to maintain table lookup background goroutines. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be tableHandler
?
executor/new_distsql.go
Outdated
executeTask func(task *lookupTableTask, goCtx goctx.Context) | ||
} | ||
|
||
func (th *tableHandler) Open(e *IndexLookUpExecutor, workCh <-chan *lookupTableTask, finished <-chan struct{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we change Open to open?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current Executor
interface need Open
method.
LGTM |
@hanfei1991 @lamxTyler PTAL |
executor/new_distsql.go
Outdated
} | ||
return indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes) | ||
} | ||
|
||
// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment about implements DataReader
interface.
And it is used by index look up join instead of index lookup executor
executor/new_distsql.go
Outdated
e.indexHandler = indexHandler{} | ||
e.tableHandler = tableHandler{} | ||
|
||
// workCh serves as a pipeline, indexHandler will write to workCh and tableHandler |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
workCh is not a pipeline, pipeline means one in one out, but workCh is one in many out.
executor/new_distsql.go
Outdated
th.taskChan <- task | ||
} else { | ||
th.taskChan <- task | ||
return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we really need to return here?
When error occurred in fetchHandles
, the workCh
will be closed soon.
This can be simpler.
if task.tasksErr == nil {
th.executeTask(task, ctx)
}
th.taskChan <- task
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If error happened, we should return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are too many returns here, makes it hard to reason about.
If we remove the redundant returns, we can infer the sequence of execution easier.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And our code style is to handle error first, this looks awkward.
executor/new_distsql.go
Outdated
finished chan struct{} | ||
} | ||
|
||
// indexHandler is used by IndexLookUpExecutor to maintain index lookup background goroutines. | ||
type indexHandler struct { | ||
buildTableTasks func(handles []int64) []*lookupTableTask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think variable function makes the code more complex.
How about just hold a reference to IndexLookUpExecutor
in indexHandler
?
So we can just implement buildTableTasks
directly in indexHandler
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've consider that at the first time but abandon the idea.
IndexLookUpExecutor
include indexHandler
while indexHandler
hold a reference to IndexLookUpExecutor
, it make the code even more clumsy.
As long as the the data fields buildTableTasks
visited is all immutable, I think using function pointer is affordable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a very common pattern, a member holds a reference to its parent.
Except that when the member is embedded.
I think it's better if you make indexHandler
and tableHandler
an instance instead of embedded member.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Another solution is to add prefix to the functions, like indexOpen
, tableOpen
, indexFetchHandles
.
So we don't need to define indexHandler
and tableHandler
.
Use embedded member to separate the logic is not straightforward.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the object-oriented style, we don't care the inner state of the object, just call the methods provide by it. That's a good abstraction for here, I don't want IndexLookUpExecutor
to cope with a huge couple of inner state, so I split them into smaller object.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't remember I saw any Go code like this, assign the parent's method to embedded member's function field.
It's like use a very rare way to solve a common problem.
executor/new_distsql.go
Outdated
type tableHandler struct { | ||
taskChan chan *lookupTableTask | ||
taskCurr *lookupTableTask | ||
executeTask func(task *lookupTableTask, goCtx goctx.Context) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto.
f1cea0d
to
e1519eb
Compare
executor/new_distsql.go
Outdated
}() | ||
} | ||
|
||
func (th *tableHandler) pickAndExecTask(workCh <-chan *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's better to add comment on a background worker function.
executor/new_distsql.go
Outdated
executeTask func(task *lookupTableTask, goCtx goctx.Context) | ||
} | ||
|
||
func (th *tableHandler) open(e *IndexLookUpExecutor, workCh <-chan *lookupTableTask, finished <-chan struct{}) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add comment for the function.
executor/new_distsql.go
Outdated
wg sync.WaitGroup | ||
} | ||
|
||
func (ih *indexHandler) open(kvRanges []kv.KeyRange, e *IndexLookUpExecutor, workCh chan<- *lookupTableTask, finished <-chan struct{}) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need comments about what does this function do.
PTAL @coocood |
executor/new_distsql.go
Outdated
wg sync.WaitGroup | ||
} | ||
|
||
// openIndexHandler open the indexHandler, launch some background worker goroutines and write the results to workCh. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It only launch one background goroutine to fetch handles.
send the results to workCh.
executor/new_distsql.go
Outdated
} | ||
} | ||
|
||
func (th *tableHandler) next() (Row, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think move this method to IndexLookUpExecutor is better, it's a foreground function, tableHandler is supposed to handle background work.
executor/new_distsql.go
Outdated
} | ||
} | ||
|
||
func (ih *indexHandler) close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
func (e *IndexLookUpExecutor) closeIndexHandler
For symmetricity.
executor/new_distsql.go
Outdated
} | ||
} | ||
|
||
func (th *tableHandler) close() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
executor/new_distsql.go
Outdated
for { | ||
if e.taskCurr == nil { | ||
taskCurr, ok := <-e.taskChan | ||
if th.taskCurr == nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move taskCurr to IndexLookUpExecutor is better.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I move next()
function back to tableHandler
.
taskCurr
is tableHandler's
inner state, IndexLookUpExecutor
just call method provided by tableHandler
, never touch its inner state.
executor/new_distsql.go
Outdated
} | ||
} | ||
|
||
func (ih *indexHandler) closeIndexHandler() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method receiver should be IndexLookUpExecutor.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This reminds me func (ih *indexHandler) close()
is better.
func (ih *indexHandler) close() {
ih.wg.Wait()
}
func (e *IndexLookUpExecutor) closeIndexHandler() {
// e visit indexHandler's inner state, that's bad.
e.indexHandler.wg.Wait()
}
// From the previous view, this is also asymmetric, isn't it?
xxx := NewXXX()
xxx.Close()
executor/new_distsql.go
Outdated
} | ||
} | ||
|
||
func (th *tableHandler) closeTableHandler() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ditto
executor/new_distsql.go
Outdated
th := &e.tableHandler | ||
th.taskChan = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize)) | ||
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency | ||
var wg sync.WaitGroup |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How about make wg
as a member of tableHandler like indexHandler?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This variable is not necessary to be the inner state of tableHandler
, a local variable is enough.
tableHandler
close doesn't wait on it.
LGTM |
/run-all-test |
executor/new_distsql.go
Outdated
|
||
// tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines. | ||
type tableWorker struct { | ||
taskChan chan<- *lookupTableTask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think resultChan
maybe a better name. task
is the type of the channel, it doesn't give any information.
the table worker pick a task from workChan
, after finished, send the task to resultChan
, this is easier to understand.
7a918a7
to
efb1b0b
Compare
executor/new_distsql.go
Outdated
finished chan struct{} | ||
|
||
taskChan <-chan *lookupTableTask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resultChan
executor/new_distsql.go
Outdated
finished chan struct{} | ||
|
||
taskChan <-chan *lookupTableTask | ||
taskCurr *lookupTableTask |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
resultCurr
/run-all-test |
PTAL @coocood |
LGTM |