Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: fix a order by bug #4470

Merged
merged 7 commits into from Sep 8, 2017
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 4 additions & 7 deletions executor/distsql.go
Expand Up @@ -49,13 +49,10 @@ var LookupTableTaskChannelSize int32 = 50
// lookupTableTask is created from a partial result of an index request which
// contains the handles in those index keys.
type lookupTableTask struct {
// If tasksErr is not nil, this is a invalid lookupTableTask, it would
// bypass the pickAndExecTask() function and make tableHandler halt.
tasksErr error
handles []int64
rows []Row
cursor int
// TODO: Remove those two fields, the new distsql don't need them.
handles []int64
rows []Row
cursor int

done bool
doneCh chan error

Expand Down
9 changes: 9 additions & 0 deletions executor/executor_test.go
Expand Up @@ -672,6 +672,15 @@ func (s *testSuite) TestSelectOrderBy(c *C) {
tk.MustExec("insert into t values(2, 2, 2)")
tk.MustExec("insert into t values(3, 1, 3)")
tk.MustQuery("select * from t use index(idx) order by a desc limit 1").Check(testkit.Rows("3 1 3"))

// Test double read which needs to keep order.
tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int, b int, key b (b))")
tk.Se.GetSessionVars().IndexLookupSize = 3
for i := 0; i < 10; i++ {
tk.MustExec(fmt.Sprintf("insert into t values(%d, %d)", i, 10-i))
}
tk.MustQuery("select a from t use index(b) order by b").Check(testkit.Rows("9", "8", "7", "6", "5", "4", "3", "2", "1", "0"))
}

func (s *testSuite) TestSelectErrorRow(c *C) {
Expand Down
79 changes: 32 additions & 47 deletions executor/new_distsql.go
Expand Up @@ -300,7 +300,7 @@ type IndexLookUpExecutor struct {
tableWorker
finished chan struct{}

resultChan <-chan *lookupTableTask
resultCh chan *lookupTableTask
resultCurr *lookupTableTask
}

Expand All @@ -317,28 +317,33 @@ func (e *IndexLookUpExecutor) startIndexWorker(kvRanges []kv.KeyRange, workCh ch
return errors.Trace(err)
}
result.Fetch(e.ctx.GoCtx())
ih := &e.indexWorker
ih.wg.Add(1)
worker := &e.indexWorker
worker.wg.Add(1)
go func() {
ctx, cancel := goctx.WithCancel(e.ctx.GoCtx())
ih.fetchHandles(e, result, workCh, ctx, finished)
worker.fetchHandles(e, result, workCh, ctx, finished)
cancel()
if err := result.Close(); err != nil {
log.Error("close SelectDAG result failed:", errors.ErrorStack(err))
}
close(workCh)
ih.wg.Done()
close(e.resultCh)
worker.wg.Done()
}()
return nil
}

// fetchHandles fetches a batch of handles from index data and builds the index lookup tasks.
func (ih *indexWorker) fetchHandles(e *IndexLookUpExecutor, result distsql.SelectResult, workCh chan<- *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
// The tasks are sent to workCh to be further processed by tableWorker, and sent to e.resultCh
// at the same time to keep data ordered.
func (worker *indexWorker) fetchHandles(e *IndexLookUpExecutor, result distsql.SelectResult, workCh chan<- *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
for {
handles, finish, err := extractHandlesFromIndexResult(result)
if err != nil {
workCh <- &lookupTableTask{
tasksErr: errors.Trace(err),
doneCh := make(chan error, 1)
doneCh <- errors.Trace(err)
e.resultCh <- &lookupTableTask{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to select finished channel.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If finished is closed, means we've close the IndexLookUpExecutor and don't care the result any more.
IndexLookUpExecutor.Close() would drain the remain lookupTableTask and discard them.
Both the same result, so select finished here or not doesn't make any difference.

doneCh: doneCh,
}
return
}
Expand All @@ -353,64 +358,45 @@ func (ih *indexWorker) fetchHandles(e *IndexLookUpExecutor, result distsql.Selec
case <-finished:
return
case workCh <- task:
e.resultCh <- task
Copy link
Member

@coocood coocood Sep 7, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there need another select, if resultCh is full, and IndexLookupExecutor stopped reading then call Close, indexWorker may be blocked here forever.

But it only need to select finished, ctx.Done can be handled in the next loop.

The old implementation before refactor have this issue, it's very hard to trigger though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TestIndexDoubleReadClose actually cover the case and that's why my first commit fail to pass CI.
I've fix it by drain the resultCh in IndexLookUpExecutor.Close https://github.com/pingcap/tidb/pull/4470/files#diff-f77833966b29c503e5221562eb37f86fR552

}
}
}
}

func (ih *indexWorker) close() {
ih.wg.Wait()
}

func (e *IndexLookUpExecutor) waitIndexWorker() {
e.indexWorker.close()
func (worker *indexWorker) close() {
worker.wg.Wait()
}

// tableWorker is used by IndexLookUpExecutor to maintain table lookup background goroutines.
type tableWorker struct {
resultCh chan<- *lookupTableTask
wg sync.WaitGroup
wg sync.WaitGroup
}

// startTableWorker launch some background goroutines which pick tasks from workCh,
// execute the task and store the results in IndexLookUpExecutor's resultCh.
// startTableWorker launch some background goroutines which pick tasks from workCh and execute the task.
func (e *IndexLookUpExecutor) startTableWorker(workCh <-chan *lookupTableTask, finished <-chan struct{}) {
resultCh := make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))
th := &e.tableWorker
th.resultCh = resultCh
e.resultChan = resultCh
worker := &e.tableWorker
lookupConcurrencyLimit := e.ctx.GetSessionVars().IndexLookupConcurrency
th.wg.Add(lookupConcurrencyLimit)
worker.wg.Add(lookupConcurrencyLimit)
for i := 0; i < lookupConcurrencyLimit; i++ {
ctx, cancel := goctx.WithCancel(e.ctx.GoCtx())
go func() {
th.pickAndExecTask(e, workCh, ctx, finished)
worker.pickAndExecTask(e, workCh, ctx, finished)
cancel()
th.wg.Done()
worker.wg.Done()
}()
}
go func() {
th.wg.Wait()
close(th.resultCh)
}()
}

// pickAndExecTask picks tasks from workCh, execute them, and send the result to tableWorker's taskCh.
func (th *tableWorker) pickAndExecTask(e *IndexLookUpExecutor, workCh <-chan *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
// pickAndExecTask picks tasks from workCh, and execute them.
func (worker *tableWorker) pickAndExecTask(e *IndexLookUpExecutor, workCh <-chan *lookupTableTask, ctx goctx.Context, finished <-chan struct{}) {
for {
select {
case task, ok := <-workCh:
if !ok {
return
}
if task.tasksErr != nil {
th.resultCh <- task
return
}

// TODO: The results can be simplified when new_distsql.go replace distsql.go totally.
e.executeTask(task, ctx)
th.resultCh <- task
case <-ctx.Done():
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ctx is done, and then there can be remaining tasks in the workCh, which never executes.
Then if the IndexLookupExecutor is waiting for the task done channel, it will be blocked for ever.

One solution is to remove ctx.Done() solution here, waiting for indexWorker to close workCh.

The finished can be removed too, makes the behavior more predictable.
So we can make sure every tasks sent to the workCh will be done.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter that workCh is not consumed.
indexWorker would not block by send to that channel, if ctx.Done() is true, indexWorker will exit and workCh would be GCed eventually.

Copy link
Member

@coocood coocood Sep 8, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But the session thread blocking in Next will not exit.

return
case <-finished:
Expand All @@ -419,9 +405,8 @@ func (th *tableWorker) pickAndExecTask(e *IndexLookUpExecutor, workCh <-chan *lo
}
}

func (e *IndexLookUpExecutor) waitTableWorker() {
for range e.resultChan {
}
func (worker *tableWorker) close() {
worker.wg.Wait()
}

// Open implements the Executor Open interface.
Expand All @@ -437,6 +422,7 @@ func (e *IndexLookUpExecutor) open(kvRanges []kv.KeyRange) error {
e.finished = make(chan struct{})
e.indexWorker = indexWorker{}
e.tableWorker = tableWorker{}
e.resultCh = make(chan *lookupTableTask, atomic.LoadInt32(&LookupTableTaskChannelSize))

// indexWorker will write to workCh and tableWorker will read from workCh,
// so fetching index and getting table data can run concurrently.
Expand Down Expand Up @@ -563,8 +549,10 @@ func (e *IndexLookUpExecutor) Schema() *expression.Schema {
func (e *IndexLookUpExecutor) Close() error {
if e.finished != nil {
close(e.finished)
e.waitIndexWorker()
e.waitTableWorker()
for range e.resultCh {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need comment for this line.

}
e.indexWorker.close()
e.tableWorker.close()
e.finished = nil
}
return nil
Expand All @@ -574,13 +562,10 @@ func (e *IndexLookUpExecutor) Close() error {
func (e *IndexLookUpExecutor) Next() (Row, error) {
for {
if e.resultCurr == nil {
resultCurr, ok := <-e.resultChan
resultCurr, ok := <-e.resultCh
if !ok {
return nil, nil
}
if resultCurr.tasksErr != nil {
return nil, errors.Trace(resultCurr.tasksErr)
}
e.resultCurr = resultCurr
}
row, err := e.resultCurr.getRow()
Expand Down