executor: IndexLookUpExecutor refactor #4305

Merged
merged 22 commits, Sep 6, 2017
Changes from 8 commits
14 changes: 9 additions & 5 deletions executor/distsql.go
@@ -49,11 +49,15 @@ var LookupTableTaskChannelSize int32 = 50
// lookupTableTask is created from a partial result of an index request which
// contains the handles in those index keys.
type lookupTableTask struct {
handles []int64
rows []Row
cursor int
done bool
doneCh chan error
// If tasksErr is not nil, this is an invalid lookupTableTask; it will
// bypass the pickAndExecTask() function and make tableHandler halt.
tasksErr error
handles []int64
rows []Row
cursor int
// TODO: Remove these two fields; the new distsql doesn't need them.
done bool
doneCh chan error

// indexOrder map is used to save the original index order for the handles.
// Without this map, the original index order might be lost.
289 changes: 174 additions & 115 deletions executor/new_distsql.go
@@ -15,12 +15,15 @@ package executor

import (
"sort"
"sync"
"sync/atomic"

"github.com/juju/errors"
"github.com/ngaut/log"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util/codec"
@@ -286,62 +289,193 @@ type IndexLookUpExecutor struct {
ctx context.Context
schema *expression.Schema
// This is the column that represents the handle; we can use handleCol.Index to know its position.
handleCol *expression.Column

// result returns one or more distsql.PartialResult.
result distsql.SelectResult

taskChan chan *lookupTableTask
tasksErr error
taskCurr *lookupTableTask

handleCol *expression.Column
tableRequest *tipb.DAGRequest
// columns are only required by union scan.
columns []*model.ColumnInfo
priority int
// All fields above are immutable.

indexHandler
tableHandler
finished chan struct{}
}

// indexHandler is used by IndexLookUpExecutor to maintain index lookup background goroutines.
type indexHandler struct {
Member

Add comment for the struct.

buildTableTasks func(handles []int64) []*lookupTableTask
Member

I think a function variable makes the code more complex.
How about just holding a reference to IndexLookUpExecutor in indexHandler?
Then we could implement buildTableTasks directly on indexHandler.

Contributor Author

I considered that at first but abandoned the idea.
IndexLookUpExecutor includes indexHandler, so having indexHandler hold a reference back to IndexLookUpExecutor makes the code even more clumsy.
As long as all the data fields buildTableTasks visits are immutable, I think using a function pointer is acceptable.

Member

This is a very common pattern: a member holds a reference to its parent, except when the member is embedded.
I think it's better to make indexHandler and tableHandler plain fields instead of embedded members.

Member

Another solution is to add a prefix to the functions, like indexOpen, tableOpen, indexFetchHandles, so we don't need to define indexHandler and tableHandler at all.

Using embedded members to separate the logic is not straightforward.

Contributor Author

In the object-oriented style, we don't care about the inner state of an object; we just call the methods it provides. That's a good abstraction here: I don't want IndexLookUpExecutor to cope with a large pile of inner state, so I split it into smaller objects.

Member

I don't remember seeing any Go code like this, assigning the parent's method to an embedded member's function field.
It's like using a very rare pattern to solve a common problem.

wg sync.WaitGroup
}
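
A minimal sketch of the alternative discussed in the thread above, in which indexHandler holds a reference to its parent IndexLookUpExecutor instead of a function field, so buildTableTasks is called directly on the parent. The names mirror the diff, but the wiring is only an illustration of the reviewer's suggestion, not code from this PR.

type indexHandler struct {
    // Hypothetical parent reference: set once in Open, read-only afterwards.
    e  *IndexLookUpExecutor
    wg sync.WaitGroup
}

func (ih *indexHandler) fetchHandles(result distsql.SelectResult, workCh chan<- *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
    for {
        handles, finish, err := extractHandlesFromIndexResult(result)
        if err != nil {
            workCh <- &lookupTableTask{tasksErr: errors.Trace(err)}
            return
        }
        if finish {
            return
        }
        // Call the parent's method directly instead of going through a function field.
        for _, task := range ih.e.buildTableTasks(handles) {
            select {
            case <-ctx.Done():
                return
            case <-finished:
                return
            case workCh <- task:
            }
        }
    }
}

Either way the fields read by buildTableTasks stay immutable after Open; the thread is really about which form reads more like idiomatic Go.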

func (ih *indexHandler) Open(kvRanges []kv.KeyRange, e *IndexLookUpExecutor, workCh chan<- *lookupTableTask, finished <-chan struct{}) error {
result, err := distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges,
e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
if err != nil {
return errors.Trace(err)
}
result.Fetch(e.ctx.GoCtx())
ih.buildTableTasks = e.buildTableTasks
ih.wg.Add(1)
go func() {
ctx, cancel := goctx.WithCancel(e.ctx.GoCtx())
ih.fetchHandles(result, workCh, ctx, finished)
cancel()
if err := result.Close(); err != nil {
log.Error("close SelectDAG result failed:", errors.ErrorStack(err))
}
close(workCh)
ih.wg.Done()
}()
return nil
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
func (ih *indexHandler) fetchHandles(result distsql.SelectResult, workCh chan<- *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
for {
handles, finish, err := extractHandlesFromIndexResult(result)
if err != nil {
workCh <- &lookupTableTask{
tasksErr: errors.Trace(err),
}
return
}
if finish {
return
}
tasks := ih.buildTableTasks(handles)
for _, task := range tasks {
select {
case <-ctx.Done():
return
case <-finished:
return
case workCh <- task:
}
}
}
}

func (ih *indexHandler) Close() {
ih.wg.Wait()
}

// tableHandler is used by IndexLookUpExecutor to maintain table lookup background goroutines.
type tableHandler struct {
Member

ditto

taskChan chan *lookupTableTask
taskCurr *lookupTableTask
executeTask func(task *lookupTableTask, goCtx goctx.Context)
Member

Ditto.

}

func (th *tableHandler) Open(e *IndexLookUpExecutor, workCh <-chan *lookupTableTask, finished <-chan struct{}) {
Member

Can we change Open to open?

Contributor Author

The current Executor interface needs the Open method.

th.executeTask = e.executeTask
th.taskChan = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
var wg sync.WaitGroup
Member

How about making wg a member of tableHandler, like in indexHandler?

Contributor Author

This variable doesn't need to be part of tableHandler's inner state; a local variable is enough.
tableHandler's Close doesn't wait on it.

wg.Add(lookupConcurrencyLimit)
for i := 0; i < lookupConcurrencyLimit; i++ {
ctx, cancel := goctx.WithCancel(e.ctx.GoCtx())
go func() {
th.pickAndExecTask(workCh, ctx, finished)
cancel()
wg.Done()
}()
}
go func() {
wg.Wait()
close(th.taskChan)
}()
}
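
As a standalone illustration of the wg discussion above, here is the worker-plus-closer pattern from tableHandler.Open reduced to a self-contained sketch; the names and types are made up for the example. The WaitGroup can stay a local variable because the only code that needs it is the goroutine that closes the output channel; consumers learn about completion from the channel close, not from the WaitGroup.

package main

import (
    "fmt"
    "sync"
)

// startWorkers launches n workers that drain workCh into out, plus one closer
// goroutine that closes out once every worker has returned.
func startWorkers(workCh <-chan int, out chan<- int, n int) {
    var wg sync.WaitGroup // local is enough: nothing outside this function waits on it
    wg.Add(n)
    for i := 0; i < n; i++ {
        go func() {
            defer wg.Done()
            for task := range workCh {
                out <- task
            }
        }()
    }
    go func() {
        wg.Wait()  // all workers have exited
        close(out) // consumers observe completion here
    }()
}

func main() {
    workCh := make(chan int)
    out := make(chan int, 4)
    startWorkers(workCh, out, 2)
    for i := 0; i < 4; i++ {
        workCh <- i
    }
    close(workCh)
    for v := range out {
        fmt.Println(v)
    }
}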

func (th *tableHandler) pickAndExecTask(workCh <-chan *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
Member

It's better to add a comment on a background worker function.

for {
select {
case task := <-workCh:
if task == nil {
return
}
if task.tasksErr == nil {
th.executeTask(task, ctx)
th.taskChan <- task
} else {
th.taskChan <- task
return
Member

Do we really need to return here?
When an error occurs in fetchHandles, workCh will be closed soon anyway.

This can be simpler.

if task.tasksErr == nil {
    th.executeTask(task, ctx)
}
th.taskChan <- task

Contributor Author

If an error happened, we should return.

Member

There are too many returns here, which makes it hard to reason about.
If we remove the redundant returns, it is easier to follow the sequence of execution.

Member

Also, our code style is to handle the error first; this looks awkward.

}
case <-ctx.Done():
return
case <-finished:
return
}
}
}
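
Putting the two positions above together, one possible shape for the worker body handles the error first while keeping the early return; this is only a sketch and is intended to behave the same as the code in the diff.

func (th *tableHandler) pickAndExecTask(workCh <-chan *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
    for {
        select {
        case task := <-workCh:
            if task == nil {
                return
            }
            if task.tasksErr != nil {
                // Invalid task built by the index goroutine: forward it so Next
                // can report the error, then stop this worker.
                th.taskChan <- task
                return
            }
            th.executeTask(task, ctx)
            th.taskChan <- task
        case <-ctx.Done():
            return
        case <-finished:
            return
        }
    }
}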

func (th *tableHandler) Next() (Row, error) {
for {
if th.taskCurr == nil {
taskCurr, ok := <-th.taskChan
if !ok {
return nil, nil
}
if taskCurr.tasksErr != nil {
return nil, errors.Trace(taskCurr.tasksErr)
}
th.taskCurr = taskCurr
}
row, err := th.taskCurr.getRow()
if err != nil {
return nil, errors.Trace(err)
}
if row != nil {
return row, nil
}
th.taskCurr = nil
}
}

func (th *tableHandler) Close() {
for range th.taskChan {
}
}

// Open implements the Executor Open interface.
func (e *IndexLookUpExecutor) Open() error {
e.finished = make(chan struct{})
fieldTypes := make([]*types.FieldType, len(e.index.Columns))
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
kvRanges, err := indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes)
kvRanges, err := e.indexRangesToKVRanges()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
return e.open(kvRanges)
}

func (e *IndexLookUpExecutor) open(kvRanges []kv.KeyRange) error {
e.finished = make(chan struct{})
e.indexHandler = indexHandler{}
e.tableHandler = tableHandler{}

// workCh serves as a pipeline, indexHandler will write to workCh and tableHandler
Member

workCh is not a pipeline. A pipeline means one in, one out, but workCh is one in, many out.

// will read from workCh, so fetching index and getting table data can run concurrently.
workCh := make(chan *lookupTableTask, 1)
err := e.indexHandler.Open(kvRanges, e, workCh, e.finished)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(e.ctx.GoCtx())

// Use a background goroutine to fetch index and put the result in e.taskChan.
// e.taskChan serves as a pipeline, so fetching index and getting table data can
// run concurrently.
e.taskChan = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
go e.fetchHandlesAndStartWorkers()
e.tableHandler.Open(e, workCh, e.finished)
return nil
}

func (e *IndexLookUpExecutor) indexRangesToKVRanges() ([]kv.KeyRange, error) {
fieldTypes := make([]*types.FieldType, len(e.index.Columns))
for i, v := range e.index.Columns {
fieldTypes[i] = &(e.table.Cols()[v.Offset].FieldType)
}
return indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes)
}

// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor.
Member

Add a comment noting that this implements the DataReader interface.
Also, it is used by the index lookup join, not the index lookup executor.

func (e *IndexLookUpExecutor) doRequestForDatums(values [][]types.Datum, goCtx goctx.Context) error {
e.finished = make(chan struct{})
kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values)
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GetClient(), e.ctx.GoCtx(), e.dagPB, kvRanges, e.ctx.GetSessionVars().DistSQLScanConcurrency, e.keepOrder, e.desc, getIsolationLevel(e.ctx.GetSessionVars()), e.priority)
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(goCtx)
e.taskChan = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
go e.fetchHandlesAndStartWorkers()
return nil
return e.open(kvRanges)
}
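
One possible wording for the doc comment the reviewer asks for, based only on the thread above; the interface and caller names are taken from the reviewer's comment, not verified against the codebase.

// doRequestForDatums constructs kv ranges from the given datums and re-opens the
// executor on them. It implements the DataReader interface and is used by the
// index lookup join, not by the index lookup executor itself.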

// executeTask executes the table look up tasks. We will construct a table reader and send request by handles.
@@ -376,6 +510,7 @@ func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Con
if err != nil {
return
}
defer tableReader.Close()
for {
var row Row
row, err = tableReader.Next()
@@ -396,61 +531,6 @@ func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Con
}
}

func (e *IndexLookUpExecutor) pickAndExecTask(workCh <-chan *lookupTableTask, txnCtx goctx.Context) {
childCtx, cancel := goctx.WithCancel(txnCtx)
defer cancel()
for {
select {
case task := <-workCh:
if task == nil {
return
}
e.executeTask(task, childCtx)
case <-childCtx.Done():
return
case <-e.finished:
return
}
}
}

// fetchHandlesAndStartWorkers fetches a batch of handles from index data and builds the index lookup tasks.
// We initialize some workers to execute this tasks concurrently and put the task to taskCh by order.
func (e *IndexLookUpExecutor) fetchHandlesAndStartWorkers() {
// The tasks in workCh will be consumed by workers. When all workers are busy, we should stop to push tasks to channel.
// So its length is one.
workCh := make(chan *lookupTableTask, 1)
defer func() {
close(workCh)
close(e.taskChan)
}()

lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
txnCtx := e.ctx.GoCtx()
for i := 0; i < lookupConcurrencyLimit; i++ {
go e.pickAndExecTask(workCh, txnCtx)
}

for {
handles, finish, err := extractHandlesFromIndexResult(e.result)
if err != nil || finish {
e.tasksErr = errors.Trace(err)
return
}
tasks := e.buildTableTasks(handles)
for _, task := range tasks {
select {
case <-txnCtx.Done():
return
case <-e.finished:
return
case workCh <- task:
}
e.taskChan <- task
}
}
}

func (e *IndexLookUpExecutor) buildTableTasks(handles []int64) []*lookupTableTask {
// Build tasks with increasing batch size.
var taskSizes []int
@@ -493,37 +573,16 @@ func (e *IndexLookUpExecutor) Schema() *expression.Schema {

// Close implements Exec Close interface.
func (e *IndexLookUpExecutor) Close() error {
// If this executor is closed once, we should not close it second time.
if e.taskChan == nil {
return nil
if e.finished != nil {
close(e.finished)
e.indexHandler.Close()
e.tableHandler.Close()
e.finished = nil
}
close(e.finished)
// Consume the task channel in case channel is full.
for range e.taskChan {
}
e.taskChan = nil
err := e.result.Close()
e.result = nil
return errors.Trace(err)
return nil
}

// Next implements Exec Next interface.
func (e *IndexLookUpExecutor) Next() (Row, error) {
for {
if e.taskCurr == nil {
taskCurr, ok := <-e.taskChan
if !ok {
return nil, e.tasksErr
}
e.taskCurr = taskCurr
}
row, err := e.taskCurr.getRow()
if err != nil {
return nil, errors.Trace(err)
}
if row != nil {
return row, nil
}
e.taskCurr = nil
}
return e.tableHandler.Next()
}