Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

planner: add aggregation hints TIDB_HASHAGG and TIDB_STREAMAGG #11364

Merged
merged 33 commits into from Aug 7, 2019
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
e87fad5
add aggregation hints
foreyes Jul 19, 2019
0184392
try to fix ci
foreyes Jul 22, 2019
1f0e8c4
add tests
foreyes Jul 23, 2019
9a1a791
add enforce flag to stream agg hint & fix test
foreyes Jul 24, 2019
61bf4f9
make a small change
foreyes Jul 24, 2019
614c48a
fix misspelling
foreyes Jul 24, 2019
6f76567
change error to warning & code clear
foreyes Jul 24, 2019
439cfdb
improve logic
foreyes Jul 24, 2019
6ed0a73
add test & fix logic
foreyes Jul 24, 2019
02e66a2
temp
foreyes Jul 25, 2019
0030309
temp
foreyes Jul 26, 2019
8781ad9
fix Hint Scope & make enforced stream aggregation
foreyes Jul 26, 2019
9d7b535
update parser & fix ci
foreyes Jul 26, 2019
f31b208
resolve conflict
foreyes Jul 26, 2019
d885b69
resolve conflict
foreyes Jul 26, 2019
e68192c
resolve conflict
foreyes Jul 30, 2019
71e19c7
fix ci
foreyes Jul 30, 2019
7213db8
fix ci
foreyes Jul 30, 2019
e52d87c
fix ci
foreyes Jul 30, 2019
c90dc26
Merge branch 'master' into dev/add_agg_hints
foreyes Jul 30, 2019
24f8229
small change
foreyes Aug 2, 2019
62b041b
Merge branch 'master' into dev/add_agg_hints
foreyes Aug 2, 2019
3226dc9
Merge remote-tracking branch 'upstream/master' into dev/add_agg_hints
foreyes Aug 5, 2019
a7829f6
change enforce logic
foreyes Aug 5, 2019
885ca2d
resolve comments
foreyes Aug 5, 2019
0b1acff
Merge remote-tracking branch 'upstream/master' into dev/add_agg_hints
foreyes Aug 5, 2019
7dd61f9
resolve comments
foreyes Aug 5, 2019
d2246e5
avoid test leak
foreyes Aug 6, 2019
0a122d6
add tests & fix bug
foreyes Aug 6, 2019
8c49116
fix explain test
foreyes Aug 6, 2019
9d4c316
fix test
foreyes Aug 6, 2019
2609f8b
Merge branch 'master' into dev/add_agg_hints
eurekaka Aug 6, 2019
081b602
Merge branch 'master' into dev/add_agg_hints
foreyes Aug 7, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 7 additions & 7 deletions executor/join_test.go
Expand Up @@ -143,13 +143,13 @@ func (s *testSuite2) TestJoin(c *C) {
tk.MustQuery("select /*+ TIDB_INLJ(t1) */ * from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1 1 1 2", "1 1 1 3", "1 1 1 4", "3 3 3 4", "<nil> <nil> 4 5"))
tk.MustQuery("select /*+ TIDB_INLJ(t) */ avg(t.b) from t right outer join t1 on t.a=t1.a").Check(testkit.Rows("1.5000"))

// Test that two conflict hints will return error.
err := tk.ExecToErr("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
err = tk.ExecToErr("select /*+ TIDB_INLJ(t) TIDB_HJ(t) */ from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
err = tk.ExecToErr("select /*+ TIDB_SMJ(t) TIDB_HJ(t) */ from t join t1 on t.a=t1.a")
c.Assert(err, NotNil)
// Test that two conflicting hints produce a warning instead of an error.
tk.MustExec("select /*+ TIDB_INLJ(t) TIDB_SMJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
tk.MustExec("select /*+ TIDB_INLJ(t) TIDB_HJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)
tk.MustExec("select /*+ TIDB_SMJ(t) TIDB_HJ(t) */ * from t join t1 on t.a=t1.a")
c.Assert(tk.Se.GetSessionVars().StmtCtx.GetWarnings(), HasLen, 1)

tk.MustExec("drop table if exists t")
tk.MustExec("create table t(a int)")
Expand Down
65 changes: 60 additions & 5 deletions planner/core/exhaust_physical_plans.go
Expand Up @@ -1198,11 +1198,36 @@ func (p *baseLogicalPlan) exhaustPhysicalPlans(_ *property.PhysicalProperty) []P
panic("baseLogicalPlan.exhaustPhysicalPlans() should never be called.")
}

// getEnforcedStreamAggs builds one stream aggregation candidate for every
// task type in wholeTaskTypes. Each candidate requires its child to be ordered
// on la.groupByCols and marks that child property as Enforced.
//
// Only the direction (desc) reported by prop.AllSameOrder() is used; the
// "all same order" result is discarded here.
//
// NOTE(review): Enforced presumably lets the optimizer add a Sort when the
// child cannot deliver the order on its own — confirm against
// property.PhysicalProperty.
func (la *LogicalAggregation) getEnforcedStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
_, desc := prop.AllSameOrder()
// Exactly one plan is appended per task type, so the capacity is known up front.
enforcedAggs := make([]PhysicalPlan, 0, len(wholeTaskTypes))
childProp := &property.PhysicalProperty{
// The child's expected row count is scaled by inputCount/RowCount, but is
// never allowed to drop below the parent's own expected count.
ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
Enforced: true,
foreyes marked this conversation as resolved.
Show resolved Hide resolved
Items: property.ItemsFromCols(la.groupByCols, desc),
}

for _, taskTp := range wholeTaskTypes {
// Every candidate gets its own property value so TaskTp can differ per plan.
copiedChildProperty := new(property.PhysicalProperty)
*copiedChildProperty = *childProp // Shallow copy; the author notes it is ok not to deep copy the slice field (written as "cols", apparently Items here).
foreyes marked this conversation as resolved.
Show resolved Hide resolved
copiedChildProperty.TaskTp = taskTp

// The aggregation reuses the logical plan's group-by items and aggregate
// functions; its stats are scaled to the parent's expected row count.
agg := basePhysicalAgg{
GroupByItems: la.GroupByItems,
AggFuncs: la.AggFuncs,
}.initForStream(la.ctx, la.stats.ScaleByExpectCnt(prop.ExpectedCnt), copiedChildProperty)
// Clone the schema so the physical plan does not share it with the logical plan.
agg.SetSchema(la.schema.Clone())
enforcedAggs = append(enforcedAggs, agg)
}
return enforcedAggs
}

func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []PhysicalPlan {
all, desc := prop.AllSameOrder()
if len(la.possibleProperties) == 0 || !all {
if !all {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this condition be `if (!preferStream && len(la.possibleProperties) == 0) || !all`?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not resolved?

return nil
}

for _, aggFunc := range la.AggFuncs {
if aggFunc.Mode == aggregation.FinalMode {
return nil
Expand All @@ -1213,7 +1238,7 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
return nil
}

streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1))
streamAggs := make([]PhysicalPlan, 0, len(la.possibleProperties)*(len(wholeTaskTypes)-1)+len(wholeTaskTypes))
childProp := &property.PhysicalProperty{
ExpectedCnt: math.Max(prop.ExpectedCnt*la.inputCount/la.stats.RowCount, prop.ExpectedCnt),
}
Expand Down Expand Up @@ -1244,6 +1269,10 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P
streamAggs = append(streamAggs, agg)
}
}
preferStream := (la.preferAggType & preferStreamAgg) > 0
if preferStream {
streamAggs = append(streamAggs, la.getEnforcedStreamAggs(prop)...)
}
return streamAggs
}

Expand All @@ -1264,9 +1293,35 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy
}

func (la *LogicalAggregation) exhaustPhysicalPlans(prop *property.PhysicalProperty) []PhysicalPlan {
aggs := make([]PhysicalPlan, 0, len(la.possibleProperties)+1)
aggs = append(aggs, la.getHashAggs(prop)...)
aggs = append(aggs, la.getStreamAggs(prop)...)
preferHash := (la.preferAggType & preferHashAgg) > 0
preferStream := (la.preferAggType & preferStreamAgg) > 0
if preferHash && preferStream {
errMsg := "Optimizer aggregation hints are conflicted"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about extracting a common error message pattern for conflicting optimizer hints? For example:

errMsgPattern := "Conflicted SQL Hints for %s. You can only specify one of these hints: %s"
errConflictedSQLHints := terror.ClassOptimizer.New(codeInternal, errMsgPattern)

The above error can be used this way:

errConflictedSQLHints.GenWithStackByArgs(
	"Aggregate",
	strings.Join([]string{"TIDB_HASHAGG()", "TIDB_STREAMAGG()"}, ","),
)

Example usage for Join:

errConflictedSQLHints.GenWithStackByArgs(
	"Join",
	strings.Join([]string{"TIDB_HJ()", "TIDB_SMJ()"}, ","),
)

You can improve the above error message by replacing TIDB_HASHAGG() with the string extracted from the AST node, which is written by the user.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we can do this later, in another PR.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you file an issue for this?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay

warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
la.preferAggType = 0
preferHash, preferStream = false, false
}

hashAggs := la.getHashAggs(prop)
if hashAggs != nil && preferHash {
return hashAggs
}

streamAggs := la.getStreamAggs(prop)
if streamAggs != nil && preferStream {
return streamAggs
}

if streamAggs == nil && preferStream {
foreyes marked this conversation as resolved.
Show resolved Hide resolved
errMsg := "Optimizer Hint TIDB_STREAMAGG is inapplicable"
warning := ErrInternal.GenWithStack(errMsg)
la.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
}

aggs := make([]PhysicalPlan, 0, len(hashAggs)+len(streamAggs))
aggs = append(aggs, hashAggs...)
aggs = append(aggs, streamAggs...)
return aggs
}

Expand Down
5 changes: 1 addition & 4 deletions planner/core/expression_rewriter.go
Expand Up @@ -756,10 +756,7 @@ func (er *expressionRewriter) handleInSubquery(ctx context.Context, v *ast.Patte
join.attachOnConds(expression.SplitCNFItems(checkCondition))
// Set join hint for this join.
if er.b.TableHints() != nil {
er.err = join.setPreferredJoinType(er.b.TableHints())
if er.err != nil {
return v, true
}
join.setPreferredJoinType(er.b.TableHints())
}
er.p = join
} else {
Expand Down
65 changes: 35 additions & 30 deletions planner/core/logical_plan_builder.go
Expand Up @@ -54,6 +54,10 @@ const (
TiDBIndexNestedLoopJoin = "tidb_inlj"
// TiDBHashJoin is hint enforce hash join.
TiDBHashJoin = "tidb_hj"
// TiDBHashAgg is the hint that enforces hash aggregation.
TiDBHashAgg = "tidb_hashagg"
// TiDBStreamAgg is the hint that enforces stream aggregation.
TiDBStreamAgg = "tidb_streamagg"
)

const (
Expand Down Expand Up @@ -137,6 +141,9 @@ func (b *PlanBuilder) buildAggregation(ctx context.Context, p LogicalPlan, aggFu
plan4Agg.GroupByItems = gbyItems
plan4Agg.SetSchema(schema4Agg)
plan4Agg.collectGroupByColumns()
if hint := b.TableHints(); hint != nil {
plan4Agg.preferAggType = hint.preferAggType
}
return plan4Agg, aggIndexMap, nil
}

Expand Down Expand Up @@ -327,9 +334,9 @@ func extractTableAlias(p LogicalPlan) *model.CIStr {
return nil
}

func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) error {
func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) {
if hintInfo == nil {
return nil
return
}

lhsAlias := extractTableAlias(p.children[0])
Expand All @@ -355,9 +362,10 @@ func (p *LogicalJoin) setPreferredJoinType(hintInfo *tableHintInfo) error {
// If there're multiple join types and one of them is not index join hint,
// then there is a conflict of join types.
if bits.OnesCount(p.preferJoinType) > 1 && (p.preferJoinType^preferRightAsIndexInner^preferLeftAsIndexInner) > 0 {
return errors.New("Join hints are conflict, you can only specify one type of join")
errMsg := "Join hints are conflict, you can only specify one type of join"
warning := ErrInternal.GenWithStack(errMsg)
p.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
foreyes marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
}

func resetNotNullFlag(schema *expression.Schema, start, end int) {
Expand Down Expand Up @@ -424,10 +432,7 @@ func (b *PlanBuilder) buildJoin(ctx context.Context, joinNode *ast.Join) (Logica
joinPlan.redundantSchema = expression.MergeSchema(lRedundant, rRedundant)

// Set the preferred join algorithm if any join hints are specified by the user.
err = joinPlan.setPreferredJoinType(b.TableHints())
if err != nil {
return nil, err
}
joinPlan.setPreferredJoinType(b.TableHints())

// "NATURAL JOIN" doesn't have "ON" or "USING" conditions.
//
Expand Down Expand Up @@ -1931,8 +1936,9 @@ func (b *PlanBuilder) unfoldWildStar(p LogicalPlan, selectFields []*ast.SelectFi
return resultList, nil
}

func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint) bool {
func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint) {
var sortMergeTables, INLJTables, hashJoinTables []hintTableInfo
var preferAggType uint
for _, hint := range hints {
switch hint.HintName.L {
case TiDBMergeJoin:
Expand All @@ -1941,19 +1947,20 @@ func (b *PlanBuilder) pushTableHints(hints []*ast.TableOptimizerHint) bool {
INLJTables = tableNames2HintTableInfo(hint.Tables)
case TiDBHashJoin:
hashJoinTables = tableNames2HintTableInfo(hint.Tables)
case TiDBHashAgg:
preferAggType |= preferHashAgg
case TiDBStreamAgg:
preferAggType |= preferStreamAgg
default:
// ignore hints that are not implemented
}
}
if len(sortMergeTables)+len(INLJTables)+len(hashJoinTables) > 0 {
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{
sortMergeJoinTables: sortMergeTables,
indexNestedLoopJoinTables: INLJTables,
hashJoinTables: hashJoinTables,
})
return true
}
return false
b.tableHintInfo = append(b.tableHintInfo, tableHintInfo{
sortMergeJoinTables: sortMergeTables,
indexNestedLoopJoinTables: INLJTables,
hashJoinTables: hashJoinTables,
preferAggType: preferAggType,
})
}

func (b *PlanBuilder) popTableHints() {
Expand Down Expand Up @@ -1983,10 +1990,10 @@ func (b *PlanBuilder) TableHints() *tableHintInfo {
}

func (b *PlanBuilder) buildSelect(ctx context.Context, sel *ast.SelectStmt) (p LogicalPlan, err error) {
if b.pushTableHints(sel.TableHints) {
// table hints are only visible in the current SELECT statement.
defer b.popTableHints()
}
b.pushTableHints(sel.TableHints)
// table hints are only visible in the current SELECT statement.
defer b.popTableHints()

if sel.SelectStmtOpts != nil {
origin := b.inStraightJoin
b.inStraightJoin = sel.SelectStmtOpts.StraightJoin
Expand Down Expand Up @@ -2602,10 +2609,9 @@ func buildColumns2Handle(
}

func (b *PlanBuilder) buildUpdate(ctx context.Context, update *ast.UpdateStmt) (Plan, error) {
if b.pushTableHints(update.TableHints) {
// table hints are only visible in the current UPDATE statement.
defer b.popTableHints()
}
b.pushTableHints(update.TableHints)
// table hints are only visible in the current UPDATE statement.
defer b.popTableHints()

// update subquery table should be forbidden
var asNameList []string
Expand Down Expand Up @@ -2823,10 +2829,9 @@ func extractTableAsNameForUpdate(p LogicalPlan, asNames map[*model.TableInfo][]*
}

func (b *PlanBuilder) buildDelete(ctx context.Context, delete *ast.DeleteStmt) (Plan, error) {
if b.pushTableHints(delete.TableHints) {
// table hints are only visible in the current DELETE statement.
defer b.popTableHints()
}
b.pushTableHints(delete.TableHints)
// table hints are only visible in the current DELETE statement.
defer b.popTableHints()

p, err := b.buildResultSetNode(ctx, delete.TableRefs.TableRefs)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions planner/core/logical_plans.go
Expand Up @@ -97,6 +97,8 @@ const (
preferRightAsIndexInner
preferHashJoin
preferMergeJoin
preferHashAgg
preferStreamAgg
)

// LogicalJoin is the logical join plan.
Expand Down Expand Up @@ -246,6 +248,9 @@ type LogicalAggregation struct {
// groupByCols stores the columns that are group-by items.
groupByCols []*expression.Column

// preferAggType stores preferred aggregation algorithm type.
preferAggType uint

possibleProperties [][]*expression.Column
inputCount float64 // inputCount is the input count of this plan.
}
Expand Down
70 changes: 69 additions & 1 deletion planner/core/physical_plan_test.go
Expand Up @@ -1521,7 +1521,7 @@ func (s *testPlanSuite) TestUnmatchedTableInHint(c *C) {
}
}

func (s *testPlanSuite) TestIndexJoinHint(c *C) {
func (s *testPlanSuite) TestJoinHints(c *C) {
defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
Expand Down Expand Up @@ -1570,3 +1570,71 @@ func (s *testPlanSuite) TestIndexJoinHint(c *C) {
}
}
}

func (s *testPlanSuite) TestAggregationHints(c *C) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a test case which contains subquery?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the first one: possibleChildProperties are only possibilities, so we'd better not rely on them too much. I handle this in lines 1272-1275; you can take a look.

As for the test cases, I will add them soon.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While adding the test case, I found another bug. I will fix it soon.

defer testleak.AfterTest(c)()
store, dom, err := newStoreWithBootstrap()
c.Assert(err, IsNil)
defer func() {
dom.Close()
store.Close()
}()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "use test")
c.Assert(err, IsNil)

tests := []struct {
sql string
best string
warning string
}{
// without Aggregation hints
{
sql: "select count(*) from t t1, t t2 where t1.a = t2.b",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t1.a,test.t2.b)->StreamAgg",
warning: "",
},
{
sql: "select count(t1.a) from t t1, t t2 where t1.a = t2.a*2 group by t1.a",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))->Projection}(test.t1.a,mul(test.t2.a, 2))->HashAgg",
warning: "",
},
// with Aggregation hints
{
sql: "select /*+ TIDB_HASHAGG() */ count(*) from t t1, t t2 where t1.a = t2.b",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t1.a,test.t2.b)->HashAgg",
warning: "",
},
{
sql: "select /*+ TIDB_STREAMAGG() */ count(t1.a) from t t1, t t2 where t1.a = t2.a*2 group by t1.a",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))->Projection}(test.t1.a,mul(test.t2.a, 2))->Sort->StreamAgg",
warning: "",
},
// test conflict warning
{
sql: "select /*+ TIDB_HASHAGG() TIDB_STREAMAGG() */ count(*) from t t1, t t2 where t1.a = t2.b",
best: "LeftHashJoin{TableReader(Table(t))->TableReader(Table(t))}(test.t1.a,test.t2.b)->StreamAgg",
warning: "[planner:1815]Optimizer aggregation hints are conflicted",
},
}
ctx := context.Background()
for i, test := range tests {
comment := Commentf("case:%v sql:%s", i, test)
stmt, err := s.ParseOneStmt(test.sql, "", "")
c.Assert(err, IsNil, comment)

p, err := planner.Optimize(ctx, se, stmt, s.is)
c.Assert(err, IsNil)
c.Assert(core.ToString(p), Equals, test.best)

warnings := se.GetSessionVars().StmtCtx.GetWarnings()
if test.warning == "" {
c.Assert(len(warnings), Equals, 0)
} else {
c.Assert(len(warnings), Equals, 1)
c.Assert(warnings[0].Level, Equals, stmtctx.WarnLevelWarning)
c.Assert(warnings[0].Err.Error(), Equals, test.warning)
}
}
}
1 change: 1 addition & 0 deletions planner/core/planbuilder.go
Expand Up @@ -57,6 +57,7 @@ type tableHintInfo struct {
indexNestedLoopJoinTables []hintTableInfo
sortMergeJoinTables []hintTableInfo
hashJoinTables []hintTableInfo
preferAggType uint
}

type hintTableInfo struct {
Expand Down