diff --git a/executor/show_test.go b/executor/show_test.go index 7bebc1843db76..48f166b64da74 100644 --- a/executor/show_test.go +++ b/executor/show_test.go @@ -149,6 +149,16 @@ func (s *testSuite5) TestShowWarningsForExprPushdown(c *C) { tk.MustExec("explain select * from show_warnings_expr_pushdown where date_add(value, interval 1 day) = '2020-01-01'") c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(1)) tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv")) + tk.MustExec("explain select max(date_add(value, interval 1 day)) from show_warnings_expr_pushdown group by a") + c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(2)) + tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv", "Warning|1105|Aggregation can not be pushed to tikv because arguments of AggFunc `max` contains unsupported exprs")) + tk.MustExec("explain select max(a) from show_warnings_expr_pushdown group by date_add(value, interval 1 day)") + c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(2)) + tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Scalar function 'date_add'(signature: AddDateDatetimeInt) can not be pushed to tikv", "Warning|1105|Aggregation can not be pushed to tikv because groupByItems contain unsupported exprs")) + tk.MustExec("set tidb_opt_distinct_agg_push_down=0") + tk.MustExec("explain select max(distinct a) from show_warnings_expr_pushdown group by value") + c.Assert(tk.Se.GetSessionVars().StmtCtx.WarningCount(), Equals, uint16(1)) + tk.MustQuery("show warnings").Check(testutil.RowsWithSep("|", "Warning|1105|Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct")) } func (s *testSuite5) TestShowGrantsPrivilege(c *C) { diff --git a/planner/core/exhaust_physical_plans.go b/planner/core/exhaust_physical_plans.go index 3e955c5848ec3..1a78a4f212002 100644 --- a/planner/core/exhaust_physical_plans.go +++ b/planner/core/exhaust_physical_plans.go @@ -19,6 +19,7 @@ import ( "math" "sort" + "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/parser/ast" "github.com/pingcap/parser/model" @@ -2349,17 +2350,24 @@ func (la *LogicalAggregation) getStreamAggs(prop *property.PhysicalProperty) []P // TODO: support more operators and distinct later func (la *LogicalAggregation) checkCanPushDownToMPP() bool { + hasUnsupportedDistinct := false for _, agg := range la.AggFuncs { // MPP does not support distinct except count distinct now if agg.HasDistinct { if agg.Name != ast.AggFuncCount { - return false + hasUnsupportedDistinct = true } } // MPP does not support AggFuncApproxCountDistinct now if agg.Name == ast.AggFuncApproxCountDistinct { - return false + hasUnsupportedDistinct = true + } + } + if hasUnsupportedDistinct { + if la.ctx.GetSessionVars().StmtCtx.InExplainStmt { + la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in mpp mode because it contains agg function with distinct")) } + return false } return CheckAggCanPushCop(la.ctx, la.AggFuncs, la.GroupByItems, kv.TiFlash) } @@ -2441,10 +2449,13 @@ func (la *LogicalAggregation) getHashAggs(prop *property.PhysicalProperty) []Phy taskTypes = append(taskTypes, property.CopTiFlashLocalReadTaskType) } canPushDownToTiFlash := la.canPushToCop(kv.TiFlash) - canPushDownToMPP := la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() && canPushDownToTiFlash + canPushDownToMPP := canPushDownToTiFlash && la.ctx.GetSessionVars().AllowMPPExecution && la.checkCanPushDownToMPP() if la.HasDistinct() { // TODO: remove after the cost estimation of distinct pushdown is implemented. if !la.ctx.GetSessionVars().AllowDistinctAggPushDown { + if la.ctx.GetSessionVars().StmtCtx.InExplainStmt { + la.ctx.GetSessionVars().StmtCtx.AppendWarning(errors.New("Aggregation can not be pushed to storage layer in non-mpp mode because it contains agg function with distinct")) + } taskTypes = []property.TaskType{property.RootTaskType} } } else if !la.aggHints.preferAggToCop { diff --git a/planner/core/task.go b/planner/core/task.go index 93776a059f8dd..e3fd122bb7d3a 100644 --- a/planner/core/task.go +++ b/planner/core/task.go @@ -1358,33 +1358,49 @@ func (sel *PhysicalSelection) attach2Task(tasks ...task) task { func CheckAggCanPushCop(sctx sessionctx.Context, aggFuncs []*aggregation.AggFuncDesc, groupByItems []expression.Expression, storeType kv.StoreType) bool { sc := sctx.GetSessionVars().StmtCtx client := sctx.GetClient() + ret := true + reason := "" for _, aggFunc := range aggFuncs { // if the aggFunc contain VirtualColumn or CorrelatedColumn, it can not be pushed down. if expression.ContainVirtualColumn(aggFunc.Args) || expression.ContainCorrelatedColumn(aggFunc.Args) { - return false - } - pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc) - if pb == nil { - return false + reason = "expressions of AggFunc `" + aggFunc.Name + "` contain virtual column or correlated column, which is not supported now" + ret = false + break } if !aggregation.CheckAggPushDown(aggFunc, storeType) { - if sc.InExplainStmt { - storageName := storeType.Name() - if storeType == kv.UnSpecified { - storageName = "storage layer" - } - sc.AppendWarning(errors.New("Agg function '" + aggFunc.Name + "' can not be pushed to " + storageName)) - } - return false + reason = "AggFunc `" + aggFunc.Name + "` is not supported now" + ret = false + break } if !expression.CanExprsPushDown(sc, aggFunc.Args, client, storeType) { - return false + reason = "arguments of AggFunc `" + aggFunc.Name + "` contains unsupported exprs" + ret = false + break + } + pb := aggregation.AggFuncToPBExpr(sc, client, aggFunc) + if pb == nil { + reason = "AggFunc `" + aggFunc.Name + "` can not be converted to pb expr" + ret = false + break } } - if expression.ContainVirtualColumn(groupByItems) { - return false + if ret && expression.ContainVirtualColumn(groupByItems) { + reason = "groupByItems contain virtual columns, which is not supported now" + ret = false + } + if ret && !expression.CanExprsPushDown(sc, groupByItems, client, storeType) { + reason = "groupByItems contain unsupported exprs" + ret = false + } + + if !ret && sc.InExplainStmt { + storageName := storeType.Name() + if storeType == kv.UnSpecified { + storageName = "storage layer" + } + sc.AppendWarning(errors.New("Aggregation can not be pushed to " + storageName + " because " + reason)) } - return expression.CanExprsPushDown(sc, groupByItems, client, storeType) + return ret } // AggInfo stores the information of an Aggregation.