Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

plan, executor: no longer treat dml as logical/physical plan. #5230

Merged
merged 9 commits into from Nov 30, 2017
12 changes: 5 additions & 7 deletions executor/builder.go
Expand Up @@ -313,9 +313,7 @@ func (b *executorBuilder) buildInsert(v *plan.Insert) Executor {
GenExprs: v.GenCols.Exprs,
needFillDefaultValues: v.NeedFillDefaultValue,
}
if len(v.Children()) > 0 {
ivs.SelectExec = b.build(v.Children()[0])
}
ivs.SelectExec = b.build(v.SelectPlan)
ivs.Table = v.Table
if v.IsReplace {
return b.buildReplace(ivs)
Expand Down Expand Up @@ -778,12 +776,12 @@ func (b *executorBuilder) buildUnion(v *plan.Union) Executor {

func (b *executorBuilder) buildUpdate(v *plan.Update) Executor {
tblID2table := make(map[int64]table.Table)
for id := range v.Schema().TblID2Handle {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
return &UpdateExec{
baseExecutor: newBaseExecutor(nil, b.ctx),
SelectExec: b.build(v.Children()[0]),
SelectExec: b.build(v.SelectPlan),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

after building, we should check b.err:

selectExec := b.build(v.SelectPlan)
if b.err != nil {
    b.err = errors.Trace(b.err)
    return nil
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if b.err is not nil, it's ok to return any executor.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to discover error as soon as possible so that we can avoid useless executor building.

OrderedList: v.OrderedList,
tblID2table: tblID2table,
IgnoreErr: v.IgnoreErr,
Expand All @@ -792,12 +790,12 @@ func (b *executorBuilder) buildUpdate(v *plan.Update) Executor {

func (b *executorBuilder) buildDelete(v *plan.Delete) Executor {
tblID2table := make(map[int64]table.Table)
for id := range v.Schema().TblID2Handle {
for id := range v.SelectPlan.Schema().TblID2Handle {
tblID2table[id], _ = b.is.TableByID(id)
}
return &DeleteExec{
baseExecutor: newBaseExecutor(nil, b.ctx),
SelectExec: b.build(v.Children()[0]),
SelectExec: b.build(v.SelectPlan),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Tables: v.Tables,
IsMultiTable: v.IsMultiTable,
tblID2Table: tblID2table,
Expand Down
8 changes: 8 additions & 0 deletions executor/metrics.go
Expand Up @@ -256,6 +256,14 @@ func (pa *stmtAttributes) fromPlan(p plan.Plan) {
pa.setIsSystemTable(x.DBName)
case *plan.PhysicalHashSemiJoin:
pa.hasJoin = true
case *plan.Insert:
if x.SelectPlan != nil {
pa.fromPlan(x.SelectPlan)
}
case *plan.Delete:
pa.fromPlan(x.SelectPlan)
case *plan.Update:
pa.fromPlan(x.SelectPlan)
case *plan.PhysicalTableReader:
for _, child := range x.TablePlans {
pa.fromPlan(child)
Expand Down
19 changes: 0 additions & 19 deletions plan/column_pruning.go
Expand Up @@ -198,15 +198,6 @@ func (p *Exists) PruneColumns(parentUsedCols []*expression.Column) {
p.children[0].(LogicalPlan).PruneColumns(nil)
}

// PruneColumns implements LogicalPlan interface.
func (p *Insert) PruneColumns(_ []*expression.Column) {
if len(p.Children()) == 0 {
return
}
child := p.children[0].(LogicalPlan)
child.PruneColumns(child.Schema().Columns)
}

func (p *LogicalJoin) extractUsedCols(parentUsedCols []*expression.Column) (leftCols []*expression.Column, rightCols []*expression.Column) {
for _, eqCond := range p.EqualConditions {
parentUsedCols = append(parentUsedCols, expression.ExtractColumns(eqCond)...)
Expand Down Expand Up @@ -288,13 +279,3 @@ func (p *SelectLock) PruneColumns(parentUsedCols []*expression.Column) {
p.children[0].(LogicalPlan).PruneColumns(parentUsedCols)
}
}

// PruneColumns implements LogicalPlan interface.
func (p *Update) PruneColumns(parentUsedCols []*expression.Column) {
p.baseLogicalPlan.PruneColumns(p.children[0].Schema().Columns)
}

// PruneColumns implements LogicalPlan interface.
func (p *Delete) PruneColumns(parentUsedCols []*expression.Column) {
p.baseLogicalPlan.PruneColumns(p.children[0].Schema().Columns)
}
12 changes: 6 additions & 6 deletions plan/dag_plan_test.go
Expand Up @@ -519,33 +519,33 @@ func (s *testPlanSuite) TestDAGPlanBuilderBasePhysicalPlan(c *C) {
// Test complex update.
{
sql: "update t set a = 5 where b < 1 order by d limit 1",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->*plan.Update",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Update",
},
// Test simple update.
{
sql: "update t set a = 5",
best: "TableReader(Table(t))->*plan.Update",
best: "TableReader(Table(t))->Update",
},
// TODO: Test delete/update with join.
// Test complex delete.
{
sql: "delete from t where b < 1 order by d limit 1",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->*plan.Delete",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Delete",
},
// Test simple delete.
{
sql: "delete from t",
best: "TableReader(Table(t))->*plan.Delete",
best: "TableReader(Table(t))->Delete",
},
// Test complex insert.
{
sql: "insert into t select * from t where b < 1 order by d limit 1",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->*plan.Insert",
best: "TableReader(Table(t)->Sel([lt(test.t.b, 1)])->TopN([test.t.d],0,1))->TopN([test.t.d],0,1)->Insert",
},
// Test simple insert.
{
sql: "insert into t values(0,0,0,0,0,0,0)",
best: "*plan.Insert",
best: "Insert",
},
// Test dual.
{
Expand Down
2 changes: 1 addition & 1 deletion plan/eliminate_projection.go
Expand Up @@ -129,7 +129,7 @@ func (pe *projectionEliminater) eliminate(p LogicalPlan, replace map[string]*exp
setParentAndChildren(p, children...)

switch p.(type) {
case *Sort, *TopN, *Limit, *Selection, *MaxOneRow, *Update, *SelectLock:
case *Sort, *TopN, *Limit, *Selection, *MaxOneRow, *SelectLock:
p.SetSchema(p.Children()[0].Schema())
case *LogicalJoin, *LogicalApply:
var joinTp JoinType
Expand Down
12 changes: 3 additions & 9 deletions plan/initialize.go
Expand Up @@ -179,23 +179,17 @@ func (p MaxOneRow) init(ctx context.Context) *MaxOneRow {
}

func (p Update) init(ctx context.Context) *Update {
p.basePlan = newBasePlan(TypeUpate, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
p.basePlan = *newBasePlan(TypeUpate, ctx, &p)
return &p
}

func (p Delete) init(ctx context.Context) *Delete {
p.basePlan = newBasePlan(TypeDelete, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
p.basePlan = *newBasePlan(TypeDelete, ctx, &p)
return &p
}

func (p Insert) init(ctx context.Context) *Insert {
p.basePlan = newBasePlan(TypeInsert, ctx, &p)
p.baseLogicalPlan = newBaseLogicalPlan(p.basePlan)
p.basePhysicalPlan = newBasePhysicalPlan(p.basePlan)
p.basePlan = *newBasePlan(TypeInsert, ctx, &p)
return &p
}

Expand Down
11 changes: 7 additions & 4 deletions plan/logical_plan_builder.go
Expand Up @@ -1781,7 +1781,7 @@ func (b *planBuilder) buildSemiJoin(outerPlan, innerPlan LogicalPlan, onConditio
return joinPlan
}

func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) LogicalPlan {
func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) Plan {
b.inUpdateStmt = true
sel := &ast.SelectStmt{Fields: &ast.FieldList{}, From: update.TableRefs, Where: update.Where, OrderBy: update.Order, Limit: update.Limit}
p := b.buildResultSetNode(sel.From.TableRefs)
Expand Down Expand Up @@ -1827,8 +1827,8 @@ func (b *planBuilder) buildUpdate(update *ast.UpdateStmt) LogicalPlan {
OrderedList: orderedList,
IgnoreErr: update.IgnoreErr,
}.init(b.ctx)
setParentAndChildren(updt, p)
updt.SetSchema(p.Schema())
updt.SelectPlan, b.err = doOptimize(b.optFlag, p, b.ctx)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check b.err

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If b.err is not nil, we can also return. It's no need to check this.

return updt
}

Expand Down Expand Up @@ -1943,7 +1943,7 @@ func extractTableAsNameForUpdate(p Plan, asNames map[*model.TableInfo][]*model.C
}
}

func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) LogicalPlan {
func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) Plan {
sel := &ast.SelectStmt{Fields: &ast.FieldList{}, From: delete.TableRefs, Where: delete.Where, OrderBy: delete.Order, Limit: delete.Limit}
p := b.buildResultSetNode(sel.From.TableRefs)
if b.err != nil {
Expand Down Expand Up @@ -1978,7 +1978,10 @@ func (b *planBuilder) buildDelete(delete *ast.DeleteStmt) LogicalPlan {
Tables: tables,
IsMultiTable: delete.IsMultiTable,
}.init(b.ctx)
setParentAndChildren(del, p)
del.SelectPlan, b.err = doOptimize(b.optFlag, p, b.ctx)
if b.err != nil {
return nil
}
del.SetSchema(expression.NewSchema())

var tableList []*ast.TableName
Expand Down
6 changes: 3 additions & 3 deletions plan/logical_plan_test.go
Expand Up @@ -612,19 +612,19 @@ func (s *testPlanSuite) TestPlanBuilder(c *C) {
},
{
sql: "update t set t.a = t.a * 1.5 where t.a >= 1000 order by t.a desc limit 10",
plan: "DataScan(t)->Sel([ge(test.t.a, 1000)])->Sort->Limit->*plan.Update",
plan: "TableReader(Table(t)->Limit)->Limit->Update",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why DataScan is replaced with TableReader?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We build update's SelectPlan at one time.

},
{
sql: "delete from t where t.a >= 1000 order by t.a desc limit 10",
plan: "DataScan(t)->Sel([ge(test.t.a, 1000)])->Sort->Limit->*plan.Delete",
plan: "TableReader(Table(t)->Limit)->Limit->Delete",
},
{
sql: "explain select * from t union all select * from t limit 1, 1",
plan: "*plan.Explain",
},
{
sql: "insert into t select * from t",
plan: "DataScan(t)->Projection->*plan.Insert",
plan: "TableReader(Table(t))->Insert",
},
{
sql: "show columns from t where `Key` = 'pri' like 't*'",
Expand Down
23 changes: 0 additions & 23 deletions plan/logical_plans.go
Expand Up @@ -35,12 +35,9 @@ var (
_ LogicalPlan = &DataSource{}
_ LogicalPlan = &Union{}
_ LogicalPlan = &Sort{}
_ LogicalPlan = &Update{}
_ LogicalPlan = &Delete{}
_ LogicalPlan = &SelectLock{}
_ LogicalPlan = &Limit{}
_ LogicalPlan = &Show{}
_ LogicalPlan = &Insert{}
)

// JoinType contains CrossJoin, InnerJoin, LeftOuterJoin, RightOuterJoin, FullOuterJoin, SemiJoin.
Expand Down Expand Up @@ -377,23 +374,3 @@ type Limit struct {

expectedProp *requiredProp
}

// Update represents Update plan.
type Update struct {
*basePlan
baseLogicalPlan
basePhysicalPlan

OrderedList []*expression.Assignment
IgnoreErr bool
}

// Delete represents a delete plan.
type Delete struct {
*basePlan
baseLogicalPlan
basePhysicalPlan

Tables []*ast.TableName
IsMultiTable bool
}
1 change: 1 addition & 0 deletions plan/optimizer.go
Expand Up @@ -79,6 +79,7 @@ func Optimize(ctx context.Context, node ast.Node, is infoschema.InfoSchema) (Pla
if logic, ok := p.(LogicalPlan); ok {
return doOptimize(builder.optFlag, logic, ctx)
}
p.ResolveIndices()
if execPlan, ok := p.(*Execute); ok {
err := execPlan.optimizePreparedPlan(ctx, is)
return p, errors.Trace(err)
Expand Down
30 changes: 0 additions & 30 deletions plan/physical_plans.go
Expand Up @@ -31,12 +31,9 @@ var (
_ PhysicalPlan = &TableDual{}
_ PhysicalPlan = &Union{}
_ PhysicalPlan = &Sort{}
_ PhysicalPlan = &Update{}
_ PhysicalPlan = &Delete{}
_ PhysicalPlan = &SelectLock{}
_ PhysicalPlan = &Limit{}
_ PhysicalPlan = &Show{}
_ PhysicalPlan = &Insert{}
_ PhysicalPlan = &PhysicalIndexScan{}
_ PhysicalPlan = &PhysicalTableScan{}
_ PhysicalPlan = &PhysicalTableReader{}
Expand Down Expand Up @@ -423,15 +420,6 @@ func (p *MaxOneRow) Copy() PhysicalPlan {
return &np
}

// Copy implements the PhysicalPlan Copy interface.
func (p *Insert) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}

// Copy implements the PhysicalPlan Copy interface.
func (p *Limit) Copy() PhysicalPlan {
np := *p
Expand Down Expand Up @@ -494,24 +482,6 @@ func (p *PhysicalAggregation) Copy() PhysicalPlan {
return &np
}

// Copy implements the PhysicalPlan Copy interface.
func (p *Update) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}

// Copy implements the PhysicalPlan Copy interface.
func (p *Delete) Copy() PhysicalPlan {
np := *p
np.basePlan = p.basePlan.copy()
np.baseLogicalPlan = newBaseLogicalPlan(np.basePlan)
np.basePhysicalPlan = newBasePhysicalPlan(np.basePlan)
return &np
}

// Copy implements the PhysicalPlan Copy interface.
func (p *Show) Copy() PhysicalPlan {
np := *p
Expand Down
6 changes: 3 additions & 3 deletions plan/plan.go
Expand Up @@ -52,6 +52,9 @@ type Plan interface {
// findColumn finds the column in basePlan's schema.
// If the column is not in the schema, returns error.
findColumn(*ast.ColumnName) (*expression.Column, int, error)

// ResolveIndices resolves the indices for columns. After doing this, the columns can evaluate the rows by their indices.
ResolveIndices()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why move this.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Insert/Delete/Update also needs resolveIndices

}

// taskType is the type of execution task.
Expand Down Expand Up @@ -197,9 +200,6 @@ type PhysicalPlan interface {

// statsProfile will return the stats for this plan.
statsProfile() *statsProfile

// ResolveIndices resolves the indices for columns. After doing this, the columns can evaluate the rows by their indices.
ResolveIndices()
}

type baseLogicalPlan struct {
Expand Down
5 changes: 4 additions & 1 deletion plan/planbuilder.go
Expand Up @@ -920,7 +920,10 @@ func (b *planBuilder) buildInsert(insert *ast.InsertStmt) Plan {
return nil
}
}
setParentAndChildren(insertPlan, selectPlan)
insertPlan.SelectPlan, b.err = doOptimize(b.optFlag, selectPlan.(LogicalPlan), b.ctx)
if b.err != nil {
return nil
}
}

// Calculate generated columns.
Expand Down
26 changes: 23 additions & 3 deletions plan/plans.go
Expand Up @@ -205,9 +205,7 @@ type InsertGeneratedColumns struct {

// Insert represents an insert plan.
type Insert struct {
*basePlan
baseLogicalPlan
basePhysicalPlan
basePlan

Table table.Table
tableSchema *expression.Schema
Expand All @@ -224,6 +222,28 @@ type Insert struct {
NeedFillDefaultValue bool

GenCols InsertGeneratedColumns

SelectPlan PhysicalPlan
}

// Update represents Update plan.
type Update struct {
basePlan

OrderedList []*expression.Assignment
IgnoreErr bool

SelectPlan PhysicalPlan
}

// Delete represents a delete plan.
type Delete struct {
basePlan

Tables []*ast.TableName
IsMultiTable bool

SelectPlan PhysicalPlan
}

// AnalyzeColumnsTask is used for analyze columns.
Expand Down