Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: fix union scan for partitioned table #8871

Merged
merged 9 commits into from Feb 28, 2019
50 changes: 30 additions & 20 deletions executor/builder.go
Expand Up @@ -753,10 +753,10 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
// transaction, empty DirtyTable would be inserted into DirtyDB, it does not matter when multiple
// goroutines write empty DirtyTable to DirtyDB for this table concurrently. Thus we don't use lock
// to synchronize here.
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.physicalTableID)
us.conditions = v.Conditions
us.columns = x.columns
err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows(x.table)
case *IndexReaderExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -767,10 +767,10 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
}
}
}
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.physicalTableID)
us.conditions = v.Conditions
us.columns = x.columns
err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows(x.table)
case *IndexLookUpExecutor:
us.desc = x.desc
for _, ic := range x.index.Columns {
Expand All @@ -781,10 +781,10 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
}
}
}
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.table.Meta().ID)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(x.physicalTableID)
us.conditions = v.Conditions
us.columns = x.columns
err = us.buildAndSortAddedRows()
err = us.buildAndSortAddedRows(x.table)
default:
// A mem table is never written by SQL directly, so we can omit the union scan and avoid reporting an error.
return reader, nil
Expand Down Expand Up @@ -1573,12 +1573,12 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
return nil, errors.Trace(err)
}
ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
table, _ := b.is.TableByID(ts.Table.ID)
tbl, _ := b.is.TableByID(ts.Table.ID)
e := &TableReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: dagReq,
physicalTableID: ts.Table.ID,
table: table,
table: tbl,
keepOrder: ts.KeepOrder,
desc: ts.Desc,
columns: ts.Columns,
Expand All @@ -1589,6 +1589,8 @@ func buildNoRangeTableReader(b *executorBuilder, v *plannercore.PhysicalTableRea
}
if isPartition, physicalTableID := ts.IsPartition(); isPartition {
e.physicalTableID = physicalTableID
pt := tbl.(table.PartitionedTable)
e.table = pt.GetPartition(physicalTableID)
}
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, ts.Desc)
Expand Down Expand Up @@ -1630,12 +1632,19 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
return nil, errors.Trace(err)
}
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
table, _ := b.is.TableByID(is.Table.ID)
tbl, _ := b.is.TableByID(is.Table.ID)
isPartition, physicalTableID := is.IsPartition()
if isPartition {
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
} else {
physicalTableID = is.Table.ID
}
e := &IndexReaderExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: dagReq,
physicalTableID: is.Table.ID,
table: table,
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
Expand All @@ -1647,9 +1656,6 @@ func buildNoRangeIndexReader(b *executorBuilder, v *plannercore.PhysicalIndexRea
colLens: is.IdxColLens,
plans: v.IndexPlans,
}
if isPartition, physicalTableID := is.IsPartition(); isPartition {
e.physicalTableID = physicalTableID
}
if containsLimit(dagReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
} else {
Expand Down Expand Up @@ -1693,19 +1699,26 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
}
is := v.IndexPlans[0].(*plannercore.PhysicalIndexScan)
indexReq.OutputOffsets = []uint32{uint32(len(is.Index.Columns))}
table, _ := b.is.TableByID(is.Table.ID)
tbl, _ := b.is.TableByID(is.Table.ID)

for i := 0; i < v.Schema().Len(); i++ {
tableReq.OutputOffsets = append(tableReq.OutputOffsets, uint32(i))
}

ts := v.TablePlans[0].(*plannercore.PhysicalTableScan)
isPartition, physicalTableID := ts.IsPartition()
if isPartition {
pt := tbl.(table.PartitionedTable)
tbl = pt.GetPartition(physicalTableID)
} else {
physicalTableID = is.Table.ID
}

e := &IndexLookUpExecutor{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
dagPB: indexReq,
physicalTableID: is.Table.ID,
table: table,
physicalTableID: physicalTableID,
table: tbl,
index: is.Index,
keepOrder: is.KeepOrder,
desc: is.Desc,
Expand All @@ -1722,9 +1735,6 @@ func buildNoRangeIndexLookUpReader(b *executorBuilder, v *plannercore.PhysicalIn
idxPlans: v.IndexPlans,
tblPlans: v.TablePlans,
}
if isPartition, physicalTableID := ts.IsPartition(); isPartition {
e.physicalTableID = physicalTableID
}

if containsLimit(indexReq.Executors) {
e.feedback = statistics.NewQueryFeedback(0, nil, 0, is.Desc)
Expand Down
15 changes: 8 additions & 7 deletions executor/union_scan.go
Expand Up @@ -18,10 +18,12 @@ import (
"sort"
"time"

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)
Expand Down Expand Up @@ -120,6 +122,11 @@ func (us *UnionScanExec) Open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (us *UnionScanExec) Next(ctx context.Context, req *chunk.RecordBatch) error {
if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("unionScan.Next", opentracing.ChildOf(span.Context()))
defer span1.Finish()
}

if us.runtimeStats != nil {
start := time.Now()
defer func() { us.runtimeStats.Record(time.Since(start), req.NumRows()) }()
Expand Down Expand Up @@ -267,15 +274,9 @@ func (us *UnionScanExec) compare(a, b []types.Datum) (int, error) {
return cmp, nil
}

func (us *UnionScanExec) buildAndSortAddedRows() error {
func (us *UnionScanExec) buildAndSortAddedRows(t table.Table) error {
us.addedRows = make([][]types.Datum, 0, len(us.dirty.addedRows))
mutableRow := chunk.MutRowFromTypes(us.retTypes())
t, found := GetInfoSchema(us.ctx).TableByID(us.dirty.tid)
if !found {
// t is obtained from a snapshot InfoSchema, so it should always be found; this branch should not happen.
return errors.Errorf("table not found (tid: %d, schema version: %d)",
us.dirty.tid, GetInfoSchema(us.ctx).SchemaMetaVersion())
}
cols := t.WritableCols()
for h := range us.dirty.addedRows {
newData := make([]types.Datum, 0, us.schema.Len())
Expand Down
15 changes: 15 additions & 0 deletions executor/union_scan_test.go
Expand Up @@ -77,6 +77,21 @@ func (s *testSuite2) TestDirtyTransaction(c *C) {
tk.MustExec("insert into t values(1, 2, 3, 4)")
tk.MustQuery("select * from t use index(idx) where c > 1 and d = 4").Check(testkit.Rows("1 2 3 4"))
tk.MustExec("commit")

// Test that a partitioned table does not use the wrong table ID.
tk.MustExec(`drop table if exists t`)
tk.MustExec(`CREATE TABLE t (c1 smallint(6) NOT NULL, c2 char(5) DEFAULT NULL) PARTITION BY RANGE ( c1 ) (
PARTITION p0 VALUES LESS THAN (10),
PARTITION p1 VALUES LESS THAN (20),
PARTITION p2 VALUES LESS THAN (30),
PARTITION p3 VALUES LESS THAN (MAXVALUE)
)`)
tk.MustExec("begin")
tk.MustExec("insert into t values (1, 1)")
tk.MustQuery("select * from t").Check(testkit.Rows("1 1"))
tk.MustQuery("select * from t where c1 < 5").Check(testkit.Rows("1 1"))
alivxxx marked this conversation as resolved.
Show resolved Hide resolved
tk.MustQuery("select c2 from t").Check(testkit.Rows("1"))
tk.MustExec("commit")
}

func (s *testSuite2) TestUnionScanWithCastCondition(c *C) {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/physical_plan_test.go
Expand Up @@ -1339,7 +1339,7 @@ func (s *testPlanSuite) TestIndexJoinUnionScan(c *C) {
// Index Join + Union Scan + Union All is not supported yet.
{
sql: "select /*+ TIDB_INLJ(t1, t2) */ * from t t1, t t2 where t1.a = t2.a",
best: "LeftHashJoin{UnionAll{TableReader(Table(t))->TableReader(Table(t))}->UnionScan([])->UnionAll{TableReader(Table(t))->TableReader(Table(t))}->UnionScan([])}(t1.a,t2.a)",
best: "LeftHashJoin{UnionAll{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}->UnionAll{TableReader(Table(t))->UnionScan([])->TableReader(Table(t))->UnionScan([])}}(t1.a,t2.a)",
is: pis,
},
}
Expand Down
22 changes: 22 additions & 0 deletions planner/core/rule_partition_processor.go
Expand Up @@ -47,6 +47,28 @@ func (s *partitionProcessor) rewriteDataSource(lp LogicalPlan) (LogicalPlan, err
switch lp.(type) {
case *DataSource:
return s.prune(lp.(*DataSource))
case *LogicalUnionScan:
us := lp.(*LogicalUnionScan)
ds := us.Children()[0]
ds, err := s.prune(ds.(*DataSource))
if err != nil {
return nil, err
}
if ua, ok := ds.(*LogicalUnionAll); ok {
// Adjust UnionScan->UnionAll->(DataSource1, DataSource2, ...) to
// UnionAll->(UnionScan->DataSource1, UnionScan->DataSource2, ...).
children := make([]LogicalPlan, 0, len(ua.Children()))
for _, child := range ua.Children() {
us := LogicalUnionScan{}.Init(ua.ctx)
us.SetChildren(child)
children = append(children, us)
}
ua.SetChildren(children...)
return ua, nil
}
// Only one partition, no union all.
us.SetChildren(ds)
return us, nil
default:
children := lp.Children()
for i, child := range children {
Expand Down
2 changes: 1 addition & 1 deletion sessionctx/context.go
Expand Up @@ -80,7 +80,7 @@ type Context interface {
// StmtGetMutation gets the binlog mutation for current statement.
StmtGetMutation(int64) *binlog.TableMutation
// StmtAddDirtyTableOP adds the dirty table operation for current statement.
StmtAddDirtyTableOP(op int, tid int64, handle int64, row []types.Datum)
StmtAddDirtyTableOP(op int, physicalID int64, handle int64, row []types.Datum)
// DDLOwnerChecker returns owner.DDLOwnerChecker.
DDLOwnerChecker() owner.DDLOwnerChecker
}
Expand Down
2 changes: 1 addition & 1 deletion table/tables/tables.go
Expand Up @@ -1189,7 +1189,7 @@ func (ctx *ctxForPartitionExpr) StmtGetMutation(int64) *binlog.TableMutation {
}

// StmtAddDirtyTableOP adds the dirty table operation for current statement.
func (ctx *ctxForPartitionExpr) StmtAddDirtyTableOP(op int, tid int64, handle int64, row []types.Datum) {
func (ctx *ctxForPartitionExpr) StmtAddDirtyTableOP(op int, physicalID int64, handle int64, row []types.Datum) {
panic("not support")
}

Expand Down