diff --git a/executor/builder.go b/executor/builder.go index 8bd2cf170f23c..44dc241a41668 100644 --- a/executor/builder.go +++ b/executor/builder.go @@ -760,24 +760,43 @@ func (b *executorBuilder) buildExplain(v *plannercore.Explain) Executor { } func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) Executor { - src := b.build(v.Children()[0]) + reader := b.build(v.Children()[0]) if b.err != nil { b.err = errors.Trace(b.err) return nil } - us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), src)} + us, err := b.buildUnionScanFromReader(reader, v) + if err != nil { + b.err = err + return nil + } + return us +} + +// buildUnionScanFromReader builds union scan executor from child executor. +// Note that this function may be called by inner workers of index lookup join concurrently. +// Be careful to avoid data race. +func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannercore.PhysicalUnionScan) (Executor, error) { + var err error + us := &UnionScanExec{baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), reader)} // Get the handle column index of the below plannercore. // We can guarantee that there must be only one col in the map. for _, cols := range v.Children()[0].Schema().TblID2Handle { us.belowHandleIndex = cols[0].Index } - switch x := src.(type) { + switch x := reader.(type) { case *TableReaderExecutor: us.desc = x.desc + // Union scan can only be in a write transaction, so DirtyDB should have a non-nil value now, thus + // GetDirtyDB() is safe here. If this table has been modified in the transaction, non-nil DirtyTable + // can be found in DirtyDB now, so GetDirtyTable is safe; if this table has not been modified in the + // transaction, empty DirtyTable would be inserted into DirtyDB, it does not matter when multiple + // goroutines write empty DirtyTable to DirtyDB for this table concurrently. Thus we don't use lock + // to synchronize here. 
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) us.conditions = v.Conditions us.columns = x.columns - b.err = us.buildAndSortAddedRows() + err = us.buildAndSortAddedRows() case *IndexReaderExecutor: us.desc = x.desc for _, ic := range x.index.Columns { @@ -791,7 +810,7 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) us.conditions = v.Conditions us.columns = x.columns - b.err = us.buildAndSortAddedRows() + err = us.buildAndSortAddedRows() case *IndexLookUpExecutor: us.desc = x.desc for _, ic := range x.index.Columns { @@ -805,16 +824,16 @@ func (b *executorBuilder) buildUnionScanExec(v *plannercore.PhysicalUnionScan) E us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID) us.conditions = v.Conditions us.columns = x.columns - b.err = us.buildAndSortAddedRows() + err = us.buildAndSortAddedRows() default: // The mem table will not be written by sql directly, so we can omit the union scan to avoid err reporting. - return src + return reader, nil } - if b.err != nil { - b.err = errors.Trace(b.err) - return nil + if err != nil { + err = errors.Trace(err) + return nil, err } - return us + return us, nil } // buildMergeJoin builds MergeJoinExec executor. 
@@ -1951,10 +1970,28 @@ func (builder *dataReaderBuilder) buildExecutorForIndexJoin(ctx context.Context, return builder.buildIndexReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) case *plannercore.PhysicalIndexLookUpReader: return builder.buildIndexLookUpReaderForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) + case *plannercore.PhysicalUnionScan: + return builder.buildUnionScanForIndexJoin(ctx, v, datums, IndexRanges, keyOff2IdxOff) } return nil, errors.New("Wrong plan type for dataReaderBuilder") } +func (builder *dataReaderBuilder) buildUnionScanForIndexJoin(ctx context.Context, v *plannercore.PhysicalUnionScan, + values [][]types.Datum, indexRanges []*ranger.Range, keyOff2IdxOff []int) (Executor, error) { + childBuilder := &dataReaderBuilder{Plan: v.Children()[0], executorBuilder: builder.executorBuilder} + reader, err := childBuilder.buildExecutorForIndexJoin(ctx, values, indexRanges, keyOff2IdxOff) + if err != nil { + return nil, err + } + e, err := builder.buildUnionScanFromReader(reader, v) + if err != nil { + return nil, err + } + us := e.(*UnionScanExec) + us.snapshotChunkBuffer = us.newFirstChunk() + return us, nil +} + func (builder *dataReaderBuilder) buildTableReaderForIndexJoin(ctx context.Context, v *plannercore.PhysicalTableReader, datums [][]types.Datum) (Executor, error) { e, err := buildNoRangeTableReader(builder.executorBuilder, v) if err != nil { diff --git a/executor/index_lookup_join_test.go b/executor/index_lookup_join_test.go index 30cc7b490768f..95752816dfca0 100644 --- a/executor/index_lookup_join_test.go +++ b/executor/index_lookup_join_test.go @@ -65,3 +65,44 @@ func (s *testSuite) TestIndexJoinOverflow(c *C) { tk.MustExec(`create table t2(a int unsigned, index idx(a));`) tk.MustQuery(`select /*+ TIDB_INLJ(t2) */ * from t1 join t2 on t1.a = t2.a;`).Check(testkit.Rows()) } + +func (s *testSuite) TestIndexJoinUnionScan(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t1(id int 
primary key, a int)") + tk.MustExec("create table t2(id int primary key, a int, b int, key idx_a(a))") + tk.MustExec("insert into t2 values (1,1,1),(4,2,4)") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(2,2)") + tk.MustExec("insert into t2 values(2,2,2), (3,3,3)") + // TableScan below UnionScan + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "2 2 2 2 2", + )) + // IndexLookUp below UnionScan + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ * from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "2 2 2 2 2", + "2 2 4 2 4", + )) + // IndexScan below UnionScan + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ t1.a, t2.a from t1 join t2 on t1.a = t2.a").Check(testkit.Rows( + "2 2", + "2 2", + )) + tk.MustExec("rollback") +} + +func (s *testSuite) TestBatchIndexJoinUnionScan(c *C) { + tk := testkit.NewTestKitWithInit(c, s.store) + tk.MustExec("create table t1(id int primary key, a int)") + tk.MustExec("create table t2(id int primary key, a int, key idx_a(a))") + tk.MustExec("set @@session.tidb_max_chunk_size=1") + tk.MustExec("set @@session.tidb_index_join_batch_size=1") + tk.MustExec("set @@session.tidb_index_lookup_join_concurrency=4") + tk.MustExec("begin") + tk.MustExec("insert into t1 values(1,1),(2,1),(3,1),(4,1)") + tk.MustExec("insert into t2 values(1,1)") + tk.MustQuery("select /*+ TIDB_INLJ(t1, t2)*/ count(*) from t1 join t2 on t1.a = t2.id").Check(testkit.Rows( + "4", + )) + tk.MustExec("rollback") +} diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 5447b8b15a02c..b3db34b247ecc 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -413,20 +413,23 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou innerJoinKeys = p.LeftJoinKeys outerJoinKeys = p.RightJoinKeys } - x, ok := innerChild.(*DataSource) - if !ok { + ds, isDataSource := innerChild.(*DataSource) + us, 
isUnionScan := innerChild.(*LogicalUnionScan) + if !isDataSource && !isUnionScan { return nil } + if isUnionScan { + ds = us.Children()[0].(*DataSource) + } var tblPath *accessPath - for _, path := range x.possibleAccessPaths { + for _, path := range ds.possibleAccessPaths { if path.isTablePath { tblPath = path break } } - if pkCol := x.getPKIsHandleCol(); pkCol != nil && tblPath != nil { + if pkCol := ds.getPKIsHandleCol(); pkCol != nil && tblPath != nil { keyOff2IdxOff := make([]int, len(innerJoinKeys)) - pkCol := x.getPKIsHandleCol() pkMatched := false for i, key := range innerJoinKeys { if !key.Equal(nil, pkCol) { @@ -437,7 +440,7 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou keyOff2IdxOff[i] = 0 } if pkMatched { - innerPlan := p.constructInnerTableScan(x, pkCol, outerJoinKeys) + innerPlan := p.constructInnerTableScan(ds, pkCol, outerJoinKeys, us) // Since the primary key means one value corresponding to exact one row, this will always be a no worse one // comparing to other index. return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, nil, keyOff2IdxOff) @@ -450,12 +453,12 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou remainedOfBest []expression.Expression keyOff2IdxOff []int ) - for _, path := range x.possibleAccessPaths { + for _, path := range ds.possibleAccessPaths { if path.isTablePath { continue } indexInfo := path.index - ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, x, innerJoinKeys) + ranges, remained, tmpKeyOff2IdxOff := p.buildRangeForIndexJoin(indexInfo, ds, innerJoinKeys) // We choose the index by the number of used columns of the range, the much the better. // Notice that there may be the cases like `t1.a=t2.a and b > 2 and b < 1`. So ranges can be nil though the conditions are valid. // But obviously when the range is nil, we don't need index join. 
@@ -468,20 +471,15 @@ func (p *LogicalJoin) getIndexJoinByOuterIdx(prop *property.PhysicalProperty, ou } } if bestIndexInfo != nil { - innerPlan := p.constructInnerIndexScan(x, bestIndexInfo, remainedOfBest, outerJoinKeys) + innerPlan := p.constructInnerIndexScan(ds, bestIndexInfo, remainedOfBest, outerJoinKeys, us) return p.constructIndexJoin(prop, innerJoinKeys, outerJoinKeys, outerIdx, innerPlan, rangesOfBest, keyOff2IdxOff) } return nil } // constructInnerTableScan is specially used to construct the inner plan for PhysicalIndexJoin. -func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column) PhysicalPlan { - var ranges []*ranger.Range - if pk != nil { - ranges = ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag)) - } else { - ranges = ranger.FullIntRange(false) - } +func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Column, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan { + ranges := ranger.FullIntRange(mysql.HasUnsignedFlag(pk.RetType.Flag)) ts := PhysicalTableScan{ Table: ds.tableInfo, Columns: ds.Columns, @@ -506,11 +504,23 @@ func (p *LogicalJoin) constructInnerTableScan(ds *DataSource, pk *expression.Col selStats := ts.stats.Scale(selectionFactor) ts.addPushedDownSelection(copTask, selStats) t := finishCopTask(ds.ctx, copTask) - return t.plan() + reader := t.plan() + return p.constructInnerUnionScan(us, reader) +} + +func (p *LogicalJoin) constructInnerUnionScan(us *LogicalUnionScan, reader PhysicalPlan) PhysicalPlan { + if us == nil { + return reader + } + // Use `reader.stats` instead of `us.stats` because it should be more accurate. No need to specify + // childrenReqProps now since we have got reader already. 
+ physicalUnionScan := PhysicalUnionScan{Conditions: us.conditions}.init(us.ctx, reader.statsInfo(), nil) + physicalUnionScan.SetChildren(reader) + return physicalUnionScan } // constructInnerIndexScan is specially used to construct the inner plan for PhysicalIndexJoin. -func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column) PhysicalPlan { +func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexInfo, remainedConds []expression.Expression, outerJoinKeys []*expression.Column, us *LogicalUnionScan) PhysicalPlan { is := PhysicalIndexScan{ Table: ds.tableInfo, TableAsName: ds.TableAsName, @@ -567,7 +577,8 @@ func (p *LogicalJoin) constructInnerIndexScan(ds *DataSource, idx *model.IndexIn finalStats := ds.stats.ScaleByExpectCnt(selectivity * rowCount) is.addPushedDownSelection(cop, ds, path, finalStats) t := finishCopTask(ds.ctx, cop) - return t.plan() + reader := t.plan() + return p.constructInnerUnionScan(us, reader) } // buildRangeForIndexJoin checks whether this index can be used for building index join and return the range if this index is ok. diff --git a/planner/core/find_best_task.go b/planner/core/find_best_task.go index a24e8c0d9a672..c8892c0e5006c 100644 --- a/planner/core/find_best_task.go +++ b/planner/core/find_best_task.go @@ -82,6 +82,11 @@ func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty) (task, // findBestTask implements LogicalPlan interface. func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty) (bestTask task, err error) { + // If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself, + // and set inner child prop nil, so here we do nothing. + if prop == nil { + return nil, nil + } // Look up the task with this prop in the task map. // It's used to reduce double counting. 
bestTask = p.getTask(prop) @@ -332,10 +337,8 @@ func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candida // findBestTask implements the PhysicalPlan interface. // It will enumerate all the available indices and choose a plan with least cost. func (ds *DataSource) findBestTask(prop *property.PhysicalProperty) (t task, err error) { - // If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself. - // So here we do nothing. - // TODO: Add a special prop to handle IndexJoin's inner plan. - // Then we can remove forceToTableScan and forceToIndexScan. + // If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself, + // and set inner child prop nil, so here we do nothing. if prop == nil { return nil, nil } diff --git a/planner/core/physical_plan_test.go b/planner/core/physical_plan_test.go index c7f94e5d2f9e3..cc32107bbcbdf 100644 --- a/planner/core/physical_plan_test.go +++ b/planner/core/physical_plan_test.go @@ -1442,3 +1442,52 @@ func (s *testPlanSuite) TestIndexJoinHint(c *C) { } } } + +func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) { + defer testleak.AfterTest(c)() + store, dom, err := newStoreWithBootstrap() + c.Assert(err, IsNil) + defer func() { + dom.Close() + store.Close() + }() + se, err := session.CreateSession4Test(store) + c.Assert(err, IsNil) + _, err = se.Execute(context.Background(), "use test") + c.Assert(err, IsNil) + tests := []struct { + sql string + best string + }{ + // Test Index Join + UnionScan + TableScan. + { + sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}(test.t1.a,test.t2.a)", + }, + // Test Index Join + UnionScan + DoubleRead. 
+ { + sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.c", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexLookUp(Index(t.c_d_e)[[NULL,+inf]], Table(t))->UnionScan([])}(test.t1.a,test.t2.c)", + }, + // Test Index Join + UnionScan + IndexScan. + { + sql: "select /*+ TIDB_INLJ(t1, t2) */ t1.a , t2.c from t t1, t t2 where t1.a = t2.c", + best: "IndexJoin{TableReader(Table(t))->UnionScan([])->IndexReader(Index(t.c_d_e)[[NULL,+inf]])->UnionScan([])}(test.t1.a,test.t2.c)->Projection", + }, + } + for i, tt := range tests { + comment := Commentf("case:%v sql:%s", i, tt.sql) + stmt, err := s.ParseOneStmt(tt.sql, "", "") + c.Assert(err, IsNil, comment) + err = se.NewTxn() + c.Assert(err, IsNil) + // Make txn not read only. + txn, err := se.Txn(true) + c.Assert(err, IsNil) + txn.Set(kv.Key("AAA"), []byte("BBB")) + se.StmtCommit() + p, err := core.Optimize(se, stmt, s.is) + c.Assert(err, IsNil) + c.Assert(core.ToString(p), Equals, tt.best, comment) + } +}