Skip to content

Commit

Permalink
executor, plan: remove Exists (#7207)
Browse files Browse the repository at this point in the history
  • Loading branch information
winoros authored and zz-jason committed Aug 9, 2018
1 parent b449589 commit be2c276
Show file tree
Hide file tree
Showing 15 changed files with 100 additions and 210 deletions.
18 changes: 9 additions & 9 deletions cmd/explaintest/r/explain_easy.result
Expand Up @@ -9,15 +9,15 @@ set @@session.tidb_opt_insubquery_unfold = 1;
set @@session.tidb_opt_agg_push_down = 1;
explain select * from t3 where exists (select s.a from t3 s having sum(s.a) = t3.a );
id count task operator info
Projection_13 8000.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d
└─HashLeftJoin_14 8000.00 root semi join, inner:StreamAgg_30, equal:[eq(cast(test.t3.a), sel_agg_1)]
├─Projection_15 10000.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d, cast(test.t3.a)
│ └─TableReader_17 10000.00 root data:TableScan_16
│ └─TableScan_16 10000.00 cop table:t3, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_30 1.00 root funcs:sum(col_0)
└─TableReader_31 1.00 root data:StreamAgg_22
└─StreamAgg_22 1.00 cop funcs:sum(s.a)
└─TableScan_29 10000.00 cop table:s, range:[-inf,+inf], keep order:false, stats:pseudo
Projection_12 8000.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d
└─HashLeftJoin_13 8000.00 root semi join, inner:StreamAgg_29, equal:[eq(cast(test.t3.a), sel_agg_1)]
├─Projection_14 10000.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d, cast(test.t3.a)
│ └─TableReader_16 10000.00 root data:TableScan_15
│ └─TableScan_15 10000.00 cop table:t3, range:[-inf,+inf], keep order:false, stats:pseudo
└─StreamAgg_29 1.00 root funcs:sum(col_0)
└─TableReader_30 1.00 root data:StreamAgg_21
└─StreamAgg_21 1.00 cop funcs:sum(s.a)
└─TableScan_28 10000.00 cop table:s, range:[-inf,+inf], keep order:false, stats:pseudo
explain select * from t1;
id count task operator info
TableReader_5 10000.00 root data:TableScan_4
Expand Down
18 changes: 9 additions & 9 deletions cmd/explaintest/r/explain_easy_stats.result
Expand Up @@ -12,15 +12,15 @@ set @@session.tidb_opt_insubquery_unfold = 1;
set @@session.tidb_opt_agg_push_down = 1;
explain select * from t3 where exists (select s.a from t3 s having sum(s.a) = t3.a );
id count task operator info
Projection_13 1600.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d
└─HashLeftJoin_14 1600.00 root semi join, inner:StreamAgg_30, equal:[eq(cast(test.t3.a), sel_agg_1)]
├─Projection_15 2000.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d, cast(test.t3.a)
│ └─TableReader_17 2000.00 root data:TableScan_16
│ └─TableScan_16 2000.00 cop table:t3, range:[-inf,+inf], keep order:false
└─StreamAgg_30 1.00 root funcs:sum(col_0)
└─TableReader_31 1.00 root data:StreamAgg_22
└─StreamAgg_22 1.00 cop funcs:sum(s.a)
└─TableScan_29 2000.00 cop table:s, range:[-inf,+inf], keep order:false
Projection_12 1600.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d
└─HashLeftJoin_13 1600.00 root semi join, inner:StreamAgg_29, equal:[eq(cast(test.t3.a), sel_agg_1)]
├─Projection_14 2000.00 root test.t3.a, test.t3.b, test.t3.c, test.t3.d, cast(test.t3.a)
│ └─TableReader_16 2000.00 root data:TableScan_15
│ └─TableScan_15 2000.00 cop table:t3, range:[-inf,+inf], keep order:false
└─StreamAgg_29 1.00 root funcs:sum(col_0)
└─TableReader_30 1.00 root data:StreamAgg_21
└─StreamAgg_21 1.00 cop funcs:sum(s.a)
└─TableScan_28 2000.00 cop table:s, range:[-inf,+inf], keep order:false
explain select * from t1;
id count task operator info
TableReader_5 1999.00 root data:TableScan_4
Expand Down
102 changes: 51 additions & 51 deletions cmd/explaintest/r/tpch.result
Expand Up @@ -295,17 +295,17 @@ o_orderpriority
order by
o_orderpriority;
id count task operator info
Sort_11 1.00 root tpch.orders.o_orderpriority:asc
└─Projection_13 1.00 root tpch.orders.o_orderpriority, 8_col_0
└─HashAgg_16 1.00 root group by:tpch.orders.o_orderpriority, funcs:count(1), firstrow(tpch.orders.o_orderpriority)
└─IndexJoin_22 2340750.00 root semi join, inner:IndexLookUp_21, outer key:tpch.orders.o_orderkey, inner key:tpch.lineitem.l_orderkey
├─TableReader_34 2925937.50 root data:Selection_33
│ └─Selection_33 2925937.50 cop ge(tpch.orders.o_orderdate, 1995-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-04-01)
│ └─TableScan_32 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
└─IndexLookUp_21 240004648.80 root
├─IndexScan_18 1.00 cop table:lineitem, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.orders.o_orderkey], keep order:false
└─Selection_20 240004648.80 cop lt(tpch.lineitem.l_commitdate, tpch.lineitem.l_receiptdate)
└─TableScan_19 1.00 cop table:lineitem, keep order:false
Sort_10 1.00 root tpch.orders.o_orderpriority:asc
└─Projection_12 1.00 root tpch.orders.o_orderpriority, 7_col_0
└─HashAgg_15 1.00 root group by:tpch.orders.o_orderpriority, funcs:count(1), firstrow(tpch.orders.o_orderpriority)
└─IndexJoin_21 2340750.00 root semi join, inner:IndexLookUp_20, outer key:tpch.orders.o_orderkey, inner key:tpch.lineitem.l_orderkey
├─TableReader_33 2925937.50 root data:Selection_32
│ └─Selection_32 2925937.50 cop ge(tpch.orders.o_orderdate, 1995-01-01 00:00:00.000000), lt(tpch.orders.o_orderdate, 1995-04-01)
│ └─TableScan_31 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
└─IndexLookUp_20 240004648.80 root
├─IndexScan_17 1.00 cop table:lineitem, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.orders.o_orderkey], keep order:false
└─Selection_19 240004648.80 cop lt(tpch.lineitem.l_commitdate, tpch.lineitem.l_receiptdate)
└─TableScan_18 1.00 cop table:lineitem, keep order:false
/*
Q5 Local Supplier Volume Query
This query lists the revenue volume done through local suppliers.
Expand Down Expand Up @@ -1212,34 +1212,34 @@ numwait desc,
s_name
limit 100;
id count task operator info
Projection_27 100.00 root tpch.supplier.s_name, 19_col_0
└─TopN_30 100.00 root 19_col_0:desc, tpch.supplier.s_name:asc, offset:0, count:100
└─HashAgg_33 320000.00 root group by:tpch.supplier.s_name, funcs:count(1), firstrow(tpch.supplier.s_name)
└─Selection_34 3786715.90 root not(18_aux_0)
└─IndexJoin_40 4733394.87 root left outer semi join, inner:IndexLookUp_39, outer key:l1.l_orderkey, inner key:l3.l_orderkey, other cond:ne(l3.l_suppkey, l1.l_suppkey)
├─IndexJoin_84 4733394.87 root semi join, inner:IndexLookUp_83, outer key:l1.l_orderkey, inner key:l2.l_orderkey, other cond:ne(l2.l_suppkey, l1.l_suppkey)
│ ├─HashLeftJoin_90 5916743.59 root inner join, inner:TableReader_119, equal:[eq(tpch.supplier.s_nationkey, tpch.nation.n_nationkey)]
│ │ ├─HashLeftJoin_95 147918589.81 root inner join, inner:TableReader_116, equal:[eq(l1.l_suppkey, tpch.supplier.s_suppkey)]
│ │ │ ├─IndexJoin_102 147918589.81 root inner join, inner:IndexLookUp_101, outer key:tpch.orders.o_orderkey, inner key:l1.l_orderkey
│ │ │ │ ├─TableReader_111 36517371.00 root data:Selection_110
│ │ │ │ │ └─Selection_110 36517371.00 cop eq(tpch.orders.o_orderstatus, "F")
│ │ │ │ │ └─TableScan_109 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
│ │ │ │ └─IndexLookUp_101 240004648.80 root
│ │ │ │ ├─IndexScan_98 1.00 cop table:l1, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.orders.o_orderkey], keep order:false
│ │ │ │ └─Selection_100 240004648.80 cop gt(l1.l_receiptdate, l1.l_commitdate)
│ │ │ │ └─TableScan_99 1.00 cop table:lineitem, keep order:false
│ │ │ └─TableReader_116 500000.00 root data:TableScan_115
│ │ │ └─TableScan_115 500000.00 cop table:supplier, range:[-inf,+inf], keep order:false
│ │ └─TableReader_119 1.00 root data:Selection_118
│ │ └─Selection_118 1.00 cop eq(tpch.nation.n_name, "EGYPT")
│ │ └─TableScan_117 25.00 cop table:nation, range:[-inf,+inf], keep order:false
│ └─IndexLookUp_83 1.00 root
│ ├─IndexScan_81 1.00 cop table:l2, index:L_ORDERKEY, L_LINENUMBER, range: decided by [l1.l_orderkey], keep order:false
│ └─TableScan_82 1.00 cop table:lineitem, keep order:false
└─IndexLookUp_39 240004648.80 root
├─IndexScan_36 1.00 cop table:l3, index:L_ORDERKEY, L_LINENUMBER, range: decided by [l1.l_orderkey], keep order:false
└─Selection_38 240004648.80 cop gt(l3.l_receiptdate, l3.l_commitdate)
└─TableScan_37 1.00 cop table:lineitem, keep order:false
Projection_25 100.00 root tpch.supplier.s_name, 17_col_0
└─TopN_28 100.00 root 17_col_0:desc, tpch.supplier.s_name:asc, offset:0, count:100
└─HashAgg_31 320000.00 root group by:tpch.supplier.s_name, funcs:count(1), firstrow(tpch.supplier.s_name)
└─Selection_32 3786715.90 root not(16_aux_0)
└─IndexJoin_38 4733394.87 root left outer semi join, inner:IndexLookUp_37, outer key:l1.l_orderkey, inner key:l3.l_orderkey, other cond:ne(l3.l_suppkey, l1.l_suppkey)
├─IndexJoin_82 4733394.87 root semi join, inner:IndexLookUp_81, outer key:l1.l_orderkey, inner key:l2.l_orderkey, other cond:ne(l2.l_suppkey, l1.l_suppkey)
│ ├─HashLeftJoin_88 5916743.59 root inner join, inner:TableReader_117, equal:[eq(tpch.supplier.s_nationkey, tpch.nation.n_nationkey)]
│ │ ├─HashLeftJoin_93 147918589.81 root inner join, inner:TableReader_114, equal:[eq(l1.l_suppkey, tpch.supplier.s_suppkey)]
│ │ │ ├─IndexJoin_100 147918589.81 root inner join, inner:IndexLookUp_99, outer key:tpch.orders.o_orderkey, inner key:l1.l_orderkey
│ │ │ │ ├─TableReader_109 36517371.00 root data:Selection_108
│ │ │ │ │ └─Selection_108 36517371.00 cop eq(tpch.orders.o_orderstatus, "F")
│ │ │ │ │ └─TableScan_107 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
│ │ │ │ └─IndexLookUp_99 240004648.80 root
│ │ │ │ ├─IndexScan_96 1.00 cop table:l1, index:L_ORDERKEY, L_LINENUMBER, range: decided by [tpch.orders.o_orderkey], keep order:false
│ │ │ │ └─Selection_98 240004648.80 cop gt(l1.l_receiptdate, l1.l_commitdate)
│ │ │ │ └─TableScan_97 1.00 cop table:lineitem, keep order:false
│ │ │ └─TableReader_114 500000.00 root data:TableScan_113
│ │ │ └─TableScan_113 500000.00 cop table:supplier, range:[-inf,+inf], keep order:false
│ │ └─TableReader_117 1.00 root data:Selection_116
│ │ └─Selection_116 1.00 cop eq(tpch.nation.n_name, "EGYPT")
│ │ └─TableScan_115 25.00 cop table:nation, range:[-inf,+inf], keep order:false
│ └─IndexLookUp_81 1.00 root
│ ├─IndexScan_79 1.00 cop table:l2, index:L_ORDERKEY, L_LINENUMBER, range: decided by [l1.l_orderkey], keep order:false
│ └─TableScan_80 1.00 cop table:lineitem, keep order:false
└─IndexLookUp_37 240004648.80 root
├─IndexScan_34 1.00 cop table:l3, index:L_ORDERKEY, L_LINENUMBER, range: decided by [l1.l_orderkey], keep order:false
└─Selection_36 240004648.80 cop gt(l3.l_receiptdate, l3.l_commitdate)
└─TableScan_35 1.00 cop table:lineitem, keep order:false
/*
Q22 Global Sales Opportunity Query
The Global Sales Opportunity Query identifies geographies where there are customers who may be likely to make a
Expand Down Expand Up @@ -1287,15 +1287,15 @@ cntrycode
order by
cntrycode;
id count task operator info
Sort_33 1.00 root custsale.cntrycode:asc
└─Projection_35 1.00 root custsale.cntrycode, 29_col_0, 29_col_1
└─HashAgg_38 1.00 root group by:custsale.cntrycode, funcs:count(1), sum(custsale.c_acctbal), firstrow(custsale.cntrycode)
└─Projection_39 0.00 root substring(tpch.customer.c_phone, 1, 2), tpch.customer.c_acctbal
└─Selection_40 0.00 root not(27_aux_0)
└─HashLeftJoin_41 0.00 root left outer semi join, inner:TableReader_47, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
├─Selection_42 0.00 root in(substring(tpch.customer.c_phone, 1, 2), "20", "40", "22", "30", "39", "42", "21")
│ └─TableReader_45 0.00 root data:Selection_44
│ └─Selection_44 0.00 cop gt(tpch.customer.c_acctbal, NULL)
│ └─TableScan_43 7500000.00 cop table:customer, range:[-inf,+inf], keep order:false
└─TableReader_47 75000000.00 root data:TableScan_46
└─TableScan_46 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
Sort_32 1.00 root custsale.cntrycode:asc
└─Projection_34 1.00 root custsale.cntrycode, 28_col_0, 28_col_1
└─HashAgg_37 1.00 root group by:custsale.cntrycode, funcs:count(1), sum(custsale.c_acctbal), firstrow(custsale.cntrycode)
└─Projection_38 0.00 root substring(tpch.customer.c_phone, 1, 2), tpch.customer.c_acctbal
└─Selection_39 0.00 root not(26_aux_0)
└─HashLeftJoin_40 0.00 root left outer semi join, inner:TableReader_46, equal:[eq(tpch.customer.c_custkey, tpch.orders.o_custkey)]
├─Selection_41 0.00 root in(substring(tpch.customer.c_phone, 1, 2), "20", "40", "22", "30", "39", "42", "21")
│ └─TableReader_44 0.00 root data:Selection_43
│ └─Selection_43 0.00 cop gt(tpch.customer.c_acctbal, NULL)
│ └─TableScan_42 7500000.00 cop table:customer, range:[-inf,+inf], keep order:false
└─TableReader_46 75000000.00 root data:TableScan_45
└─TableScan_45 75000000.00 cop table:orders, range:[-inf,+inf], keep order:false
14 changes: 0 additions & 14 deletions executor/builder.go
Expand Up @@ -149,8 +149,6 @@ func (b *executorBuilder) build(p plan.Plan) Executor {
return b.buildTableDual(v)
case *plan.PhysicalApply:
return b.buildApply(v)
case *plan.PhysicalExists:
return b.buildExists(v)
case *plan.PhysicalMaxOneRow:
return b.buildMaxOneRow(v)
case *plan.Analyze:
Expand Down Expand Up @@ -1175,18 +1173,6 @@ func (b *executorBuilder) buildApply(apply *plan.PhysicalApply) *NestedLoopApply
return e
}

func (b *executorBuilder) buildExists(v *plan.PhysicalExists) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
b.err = errors.Trace(b.err)
return nil
}
e := &ExistsExec{
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID(), childExec),
}
return e
}

func (b *executorBuilder) buildMaxOneRow(v *plan.PhysicalMaxOneRow) Executor {
childExec := b.build(v.Children()[0])
if b.err != nil {
Expand Down
43 changes: 0 additions & 43 deletions executor/executor.go
Expand Up @@ -40,7 +40,6 @@ import (

var (
_ Executor = &CheckTableExec{}
_ Executor = &ExistsExec{}
_ Executor = &HashAggExec{}
_ Executor = &LimitExec{}
_ Executor = &MaxOneRowExec{}
Expand Down Expand Up @@ -866,48 +865,6 @@ func (e *TableScanExec) Open(ctx context.Context) error {
return nil
}

// ExistsExec represents exists executor.
type ExistsExec struct {
baseExecutor

evaluated bool
childResult *chunk.Chunk
}

// Open implements the Executor Open interface.
func (e *ExistsExec) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return errors.Trace(err)
}
e.childResult = e.children[0].newChunk()
e.evaluated = false
return nil
}

// Next implements the Executor Next interface.
func (e *ExistsExec) Next(ctx context.Context, chk *chunk.Chunk) error {
chk.Reset()
if !e.evaluated {
e.evaluated = true
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
}
if e.childResult.NumRows() > 0 {
chk.AppendInt64(0, 1)
} else {
chk.AppendInt64(0, 0)
}
}
return nil
}

// Close implements the Executor Close interface.
func (e *ExistsExec) Close() error {
e.childResult = nil
return errors.Trace(e.baseExecutor.Close())
}

// MaxOneRowExec checks if the number of rows that a query returns is at maximum one.
// It's built from subquery expression.
type MaxOneRowExec struct {
Expand Down
9 changes: 0 additions & 9 deletions plan/exhaust_physical_plans.go
Expand Up @@ -889,15 +889,6 @@ func (ls *LogicalSort) exhaustPhysicalPlans(prop *requiredProp) []PhysicalPlan {
return nil
}

func (p *LogicalExists) exhaustPhysicalPlans(prop *requiredProp) []PhysicalPlan {
if !prop.isEmpty() {
return nil
}
exists := PhysicalExists{}.init(p.ctx, p.stats, &requiredProp{expectedCnt: 1})
exists.SetSchema(p.schema)
return []PhysicalPlan{exists}
}

func (p *LogicalMaxOneRow) exhaustPhysicalPlans(prop *requiredProp) []PhysicalPlan {
if !prop.isEmpty() {
return nil
Expand Down
35 changes: 30 additions & 5 deletions plan/expression_rewriter.go
Expand Up @@ -544,9 +544,9 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as
er.err = errors.Trace(err)
return v, true
}
np = er.b.buildExists(np)
np = er.popExistsSubPlan(np)
if len(np.extractCorrelatedCols()) > 0 {
er.p, er.err = er.b.buildSemiApply(er.p, np.Children()[0], nil, er.asScalar, false)
er.p, er.err = er.b.buildSemiApply(er.p, np, nil, er.asScalar, false)
if er.err != nil || !er.asScalar {
return v, true
}
Expand All @@ -562,13 +562,38 @@ func (er *expressionRewriter) handleExistSubquery(v *ast.ExistsSubqueryExpr) (as
er.err = errors.Trace(err)
return v, true
}
er.ctxStack = append(er.ctxStack, &expression.Constant{
Value: rows[0][0],
RetType: types.NewFieldType(mysql.TypeTiny)})
if len(rows) > 0 {
er.ctxStack = append(er.ctxStack, expression.One.Clone())
} else {
er.ctxStack = append(er.ctxStack, expression.Zero.Clone())
}
}
return v, true
}

// popExistsSubPlan will remove the useless plan in exist's child.
// See comments inside the method for more details.
func (er *expressionRewriter) popExistsSubPlan(p LogicalPlan) LogicalPlan {
out:
for {
switch plan := p.(type) {
// This can be removed when in exists clause,
// e.g. exists(select count(*) from t order by a) is equal to exists t.
case *LogicalProjection, *LogicalSort:
p = p.Children()[0]
case *LogicalAggregation:
if len(plan.GroupByItems) == 0 {
p = LogicalTableDual{RowCount: 1}.init(er.ctx)
break out
}
p = p.Children()[0]
default:
break out
}
}
return p
}

func (er *expressionRewriter) handleInSubquery(v *ast.PatternInExpr) (ast.Node, bool) {
asScalar := er.asScalar
er.asScalar = true
Expand Down
12 changes: 0 additions & 12 deletions plan/initialize.go
Expand Up @@ -194,18 +194,6 @@ func (p PhysicalTableDual) init(ctx sessionctx.Context, stats *statsInfo) *Physi
return &p
}

func (p LogicalExists) init(ctx sessionctx.Context) *LogicalExists {
p.baseLogicalPlan = newBaseLogicalPlan(ctx, TypeExists, &p)
return &p
}

func (p PhysicalExists) init(ctx sessionctx.Context, stats *statsInfo, props ...*requiredProp) *PhysicalExists {
p.basePhysicalPlan = newBasePhysicalPlan(ctx, TypeExists, &p)
p.childrenReqProps = props
p.stats = stats
return &p
}

func (p LogicalMaxOneRow) init(ctx sessionctx.Context) *LogicalMaxOneRow {
p.baseLogicalPlan = newBaseLogicalPlan(ctx, TypeMaxOneRow, &p)
return &p
Expand Down
29 changes: 0 additions & 29 deletions plan/logical_plan_builder.go
Expand Up @@ -1917,35 +1917,6 @@ func (b *planBuilder) buildSemiApply(outerPlan, innerPlan LogicalPlan, condition
return ap, nil
}

func (b *planBuilder) buildExists(p LogicalPlan) LogicalPlan {
out:
for {
switch plan := p.(type) {
// This can be removed when in exists clause,
// e.g. exists(select count(*) from t order by a) is equal to exists t.
case *LogicalProjection, *LogicalSort:
p = p.Children()[0]
case *LogicalAggregation:
if len(plan.GroupByItems) == 0 {
p = b.buildTableDual()
break out
}
p = p.Children()[0]
default:
break out
}
}
exists := LogicalExists{}.init(b.ctx)
exists.SetChildren(p)
newCol := &expression.Column{
RetType: types.NewFieldType(mysql.TypeTiny),
ColName: model.NewCIStr("exists_col"),
UniqueID: b.ctx.GetSessionVars().AllocPlanColumnID(),
}
exists.SetSchema(expression.NewSchema(newCol))
return exists
}

func (b *planBuilder) buildMaxOneRow(p LogicalPlan) LogicalPlan {
maxOneRow := LogicalMaxOneRow{}.init(b.ctx)
maxOneRow.SetChildren(p)
Expand Down

0 comments on commit be2c276

Please sign in to comment.