New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: don't reuse Executor in IndexLookUpJoin, remove doRequestForDatums() #5031

Merged
merged 11 commits into from Nov 10, 2017
View
@@ -15,6 +15,7 @@ package executor
import (
"math"
"sort"
"time"
"github.com/juju/errors"
@@ -41,6 +42,7 @@ type executorBuilder struct {
ctx context.Context
is infoschema.InfoSchema
priority int
startTS uint64 // cached when the first time getStartTS() is called
// err is set when there is error happened during Executor building process.
err error
}
@@ -647,10 +649,16 @@ func (b *executorBuilder) buildTableDual(v *plan.TableDual) Executor {
}
func (b *executorBuilder) getStartTS() uint64 {
if b.startTS != 0 {
// Return the cached value.
return b.startTS
}
startTS := b.ctx.GetSessionVars().SnapshotTS
if startTS == 0 {
startTS = b.ctx.Txn().StartTS()
}
b.startTS = startTS
return startTS
}
@@ -946,21 +954,11 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut
// for IndexLookUpJoin, left is always the outer side.
outerExec := b.build(v.Children()[0])

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

check b.err after building child 0

This comment has been minimized.

@tiancaiamao

tiancaiamao Nov 9, 2017

Contributor

It's done in executor/adaptor.go buildExecutor()

if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
innerExec := b.build(v.Children()[1]).(DataReader)
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
innerExecBuilder := &dataReaderBuilder{v.Children()[1], b}
return &IndexLookUpJoin{
baseExecutor: newBaseExecutor(v.Schema(), b.ctx, outerExec),
outerExec: outerExec,
innerExec: innerExec,
innerExecBuilder: innerExecBuilder,
outerKeys: v.OuterJoinKeys,
innerKeys: v.InnerJoinKeys,
outerFilter: v.LeftConditions,
@@ -972,7 +970,7 @@ func (b *executorBuilder) buildIndexLookUpJoin(v *plan.PhysicalIndexJoin) Execut
}
}
func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor {
func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) *TableReaderExecutor {
dagReq := b.constructDAGReq(v.TablePlans)
if b.err != nil {
return nil
@@ -1003,7 +1001,7 @@ func (b *executorBuilder) buildTableReader(v *plan.PhysicalTableReader) Executor
return e
}
func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) Executor {
func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) *IndexReaderExecutor {
dagReq := b.constructDAGReq(v.IndexPlans)
if b.err != nil {
return nil
@@ -1035,7 +1033,7 @@ func (b *executorBuilder) buildIndexReader(v *plan.PhysicalIndexReader) Executor
return e
}
func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpReader) Executor {
func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpReader) *IndexLookUpExecutor {
indexReq := b.constructDAGReq(v.IndexPlans)
if b.err != nil {
return nil
@@ -1094,6 +1092,90 @@ func (b *executorBuilder) buildIndexLookUpReader(v *plan.PhysicalIndexLookUpRead
handleCol: handleCol,
priority: b.priority,
tableReaderSchema: tableReaderSchema,
builder: &dataReaderBuilder{executorBuilder: b},
}
return e
}
// dataReaderBuilder build a executor.

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

an executor

// The executor can be used to read data in the ranges which are constructed by datums.
// Differences from executorBuilder:
// 1. dataReaderBuilder calculate data range from argument, rather than plan.
// 2. the result executor is already opened.
type dataReaderBuilder struct {
plan.Plan
*executorBuilder
}
func (builder *dataReaderBuilder) BuildExecutorForDatums(goCtx goctx.Context, datums [][]types.Datum) (Executor, error) {

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member
  1. no need to export
  2. how about buildFromData ?
switch v := builder.Plan.(type) {
case *plan.PhysicalIndexReader:
return builder.buildIndexReaderForDatums(goCtx, v, datums)
case *plan.PhysicalTableReader:
return builder.buildTableReaderForDatums(goCtx, v, datums)
case *plan.PhysicalIndexLookUpReader:
return builder.buildIndexLookUpReaderForDatums(goCtx, v, datums)
}
return nil, errors.New("should never run here")

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

make error message more specific

}
func (builder *dataReaderBuilder) buildTableReaderForDatums(goCtx goctx.Context, v *plan.PhysicalTableReader, datums [][]types.Datum) (Executor, error) {
e := builder.executorBuilder.buildTableReader(v)

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

check builder.executorBuilder.err

handles := make([]int64, 0, len(datums))
for _, datum := range datums {
handles = append(handles, datum[0].GetInt64())
}
return builder.doRequestForHandles(goCtx, e, handles)
}
func (builder *dataReaderBuilder) doRequestForHandles(goCtx goctx.Context, e *TableReaderExecutor, handles []int64) (Executor, error) {

This comment has been minimized.

@coocood

coocood Nov 9, 2017

Member

buildTableReaderForHandles

sort.Sort(int64Slice(handles))

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

use Int64Slice in package github.com/cznic/sortutil

var b requestBuilder
kvReq, err := b.SetTableHandles(e.tableID, handles).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return nil, errors.Trace(err)
}
e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq, e.schema.Len())
if err != nil {
return nil, errors.Trace(err)
}
e.result.Fetch(goCtx)
return e, nil
}
func (builder *dataReaderBuilder) buildIndexReaderForDatums(goCtx goctx.Context, v *plan.PhysicalIndexReader, values [][]types.Datum) (Executor, error) {
e := builder.executorBuilder.buildIndexReader(v)

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

ditto

This comment has been minimized.

@tiancaiamao

tiancaiamao Nov 9, 2017

Contributor

done

var b requestBuilder
kvReq, err := b.SetIndexValues(e.tableID, e.index.ID, values).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return nil, errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq, e.schema.Len())
if err != nil {
return nil, errors.Trace(err)
}
e.result.Fetch(goCtx)
return e, nil
}
func (builder *dataReaderBuilder) buildIndexLookUpReaderForDatums(goCtx goctx.Context, v *plan.PhysicalIndexLookUpReader, values [][]types.Datum) (Executor, error) {
e := builder.executorBuilder.buildIndexLookUpReader(v)

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

ditto

This comment has been minimized.

@tiancaiamao

tiancaiamao Nov 9, 2017

Contributor

done

kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values)
if err != nil {
return nil, errors.Trace(err)
}
err = e.open(kvRanges)
return e, errors.Trace(err)
}
View
@@ -44,10 +44,6 @@ var (
_ Executor = &TableReaderExecutor{}
_ Executor = &IndexReaderExecutor{}
_ Executor = &IndexLookUpExecutor{}
_ DataReader = &TableReaderExecutor{}
_ DataReader = &IndexReaderExecutor{}
_ DataReader = &IndexLookUpExecutor{}
)
// LookupTableTaskChannelSize represents the channel size of the index double read taskChan.
@@ -394,13 +390,6 @@ func setPBColumnsDefaultValue(ctx context.Context, pbColumns []*tipb.ColumnInfo,
return nil
}
// DataReader can send requests which ranges are constructed by datums.
type DataReader interface {
Executor
doRequestForDatums(goCtx goctx.Context, datums [][]types.Datum) error
}
// handleIsExtra checks whether this column is a extra handle column generated during plan building phase.
func handleIsExtra(col *expression.Column) bool {
if col != nil && col.ID == model.ExtraHandleID {
@@ -512,38 +501,6 @@ func startSpanFollowsContext(goCtx goctx.Context, operationName string) (opentra
return span, opentracing.ContextWithSpan(goCtx, span)
}
// doRequestForHandles constructs kv ranges by handles. It is used by index look up executor.
func (e *TableReaderExecutor) doRequestForHandles(goCtx goctx.Context, handles []int64) error {
sort.Sort(int64Slice(handles))
var builder requestBuilder
kvReq, err := builder.SetTableHandles(e.tableID, handles).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(goCtx, e.ctx.GetClient(), kvReq, e.schema.Len())
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(goCtx)
return nil
}
// doRequestForDatums constructs kv ranges by Datums. It is used by index look up join.
// Every lens for `datums` will always be one and must be type of int64.
func (e *TableReaderExecutor) doRequestForDatums(goCtx goctx.Context, datums [][]types.Datum) error {
handles := make([]int64, 0, len(datums))
for _, datum := range datums {
handles = append(handles, datum[0].GetInt64())
}
return errors.Trace(e.doRequestForHandles(goCtx, handles))
}
// IndexReaderExecutor sends dag request and reads index data from kv layer.
type IndexReaderExecutor struct {
table table.Table
@@ -640,27 +597,6 @@ func (e *IndexReaderExecutor) Open() error {
return nil
}
// doRequestForDatums constructs kv ranges by datums. It is used by index look up executor.
func (e *IndexReaderExecutor) doRequestForDatums(goCtx goctx.Context, values [][]types.Datum) error {
var builder requestBuilder
kvReq, err := builder.SetIndexValues(e.tableID, e.index.ID, values).
SetDAGRequest(e.dagPB).
SetDesc(e.desc).
SetKeepOrder(e.keepOrder).
SetPriority(e.priority).
SetFromSessionVars(e.ctx.GetSessionVars()).
Build()
if err != nil {
return errors.Trace(err)
}
e.result, err = distsql.SelectDAG(e.ctx.GoCtx(), e.ctx.GetClient(), kvReq, e.schema.Len())
if err != nil {
return errors.Trace(err)
}
e.result.Fetch(goCtx)
return nil
}
// IndexLookUpExecutor implements double read for index scan.
type IndexLookUpExecutor struct {
table table.Table
@@ -682,6 +618,7 @@ type IndexLookUpExecutor struct {
// columns are only required by union scan.
columns []*model.ColumnInfo
priority int
builder *dataReaderBuilder

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

s/builder/tableReaderBuilder/

// All fields above is immutable.
indexWorker
@@ -846,15 +783,6 @@ func (e *IndexLookUpExecutor) indexRangesToKVRanges() ([]kv.KeyRange, error) {
return indexRangesToKVRanges(e.ctx.GetSessionVars().StmtCtx, e.tableID, e.index.ID, e.ranges, fieldTypes)
}
// doRequestForDatums constructs kv ranges by datums. It is used by index look up join.
func (e *IndexLookUpExecutor) doRequestForDatums(goCtx goctx.Context, values [][]types.Datum) error {
kvRanges, err := indexValuesToKVRanges(e.tableID, e.index.ID, values)
if err != nil {
return errors.Trace(err)
}
return e.open(kvRanges)
}
// executeTask executes the table look up tasks. We will construct a table reader and send request by handles.
// Then we hold the returning rows and finish this task.
func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Context) {
@@ -868,21 +796,23 @@ func (e *IndexLookUpExecutor) executeTask(task *lookupTableTask, goCtx goctx.Con
} else {
schema = e.schema
}
tableReader := &TableReaderExecutor{
var e1 Executor
e1, err = e.builder.doRequestForHandles(goCtx, &TableReaderExecutor{

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

s/e1/tableReader/

table: e.table,
tableID: e.tableID,
dagPB: e.tableRequest,
schema: schema,
ctx: e.ctx,
}
err = tableReader.doRequestForHandles(goCtx, task.handles)
}, task.handles)
if err != nil {

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

log this error ?

This comment has been minimized.

@tiancaiamao

tiancaiamao Nov 9, 2017

Contributor

done

return
}
defer terror.Call(tableReader.Close)
defer terror.Call(e1.Close)
for {
var row Row
row, err = tableReader.Next()
row, err = e1.Next()
if err != nil || row == nil {
break
}
@@ -86,7 +86,7 @@ type IndexLookUpJoin struct {
baseExecutor
outerExec Executor
innerExec DataReader
innerExecBuilder *dataReaderBuilder
outerKeys []*expression.Column
innerKeys []*expression.Column
outerFilter expression.CNFExprs
@@ -129,7 +129,7 @@ func (e *IndexLookUpJoin) Close() error {
// release all resource references.
e.outerExec = nil
e.innerExec = nil
e.innerExecBuilder = nil
e.outerKeys = nil
e.innerKeys = nil
e.outerFilter = nil
@@ -262,16 +262,16 @@ func (e *IndexLookUpJoin) deDuplicateRequestRows(requestRows [][]types.Datum, re
// fetchSortedInners will join the outer rows and inner rows and store them to resultBuffer.
func (e *IndexLookUpJoin) fetchSortedInners(requestRows [][]types.Datum) error {
if err := e.innerExec.doRequestForDatums(e.ctx.GoCtx(), requestRows); err != nil {
e1, err := e.innerExecBuilder.BuildExecutorForDatums(e.ctx.GoCtx(), requestRows)

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

s/e1/innerExec/

if err != nil {
return errors.Trace(err)
}
defer terror.Call(e.innerExec.Close)
defer terror.Call(e1.Close)
for {
innerRow, err := e.innerExec.Next()
innerRow, err1 := e1.Next()

This comment has been minimized.

@zz-jason

zz-jason Nov 9, 2017

Member

s/err1/err/

if err != nil {
return errors.Trace(err)
return errors.Trace(err1)
} else if innerRow == nil {
break
}
@@ -298,7 +298,6 @@ func (e *IndexLookUpJoin) fetchSortedInners(requestRows [][]types.Datum) error {
innerJoinKey = innerJoinKey[len(innerJoinKey):]
}
var err error
e.innerOrderedRows.keys, err = e.constructJoinKeys(innerJoinKeys)
if err != nil {
return errors.Trace(err)
ProTip! Use n and p to navigate between commits in a pull request.