Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
"operator_info": "cast(test.t.a, decimal(10,0) BINARY)->Column#10"
}
],
"cost": 2007109896.2117057,
"cost": 2007109894.6117058,
"est_rows": 1,
"act_rows": 1,
"task_type": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@
"operator_info": "cast(test.t.a, decimal(10,0) BINARY)->Column#10"
}
],
"cost": 2007109896.2117057,
"cost": 2007109894.6117058,
"est_rows": 1,
"act_rows": 1,
"task_type": 1,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
{
"SQL": "explain format = 'verbose' select count(*) from t2 group by a",
"Plan": [
"HashAgg_8 3.00 1706.09 root group by:test.t2.a, funcs:count(1)->Column#5",
"HashAgg_8 3.00 1729.13 root group by:test.t2.a, funcs:count(1)->Column#5",
"└─TableReader_17 3.00 58.13 root data:TableFullScan_16",
" └─TableFullScan_16 3.00 681.92 cop[tikv] table:t2 keep order:false"
]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
{
"SQL": "explain format = 'verbose' select count(*) from t2 group by a",
"Plan": [
"HashAgg_8 3.00 1706.09 root group by:test.t2.a, funcs:count(1)->Column#5",
"HashAgg_8 3.00 1729.13 root group by:test.t2.a, funcs:count(1)->Column#5",
"└─TableReader_17 3.00 58.13 root data:TableFullScan_16",
" └─TableFullScan_16 3.00 681.92 cop[tikv] table:t2 keep order:false"
]
Expand Down
34 changes: 17 additions & 17 deletions pkg/planner/core/casetest/tpch/testdata/tpch_suite_out.json

Large diffs are not rendered by default.

34 changes: 17 additions & 17 deletions pkg/planner/core/casetest/tpch/testdata/tpch_suite_xut.json

Large diffs are not rendered by default.

85 changes: 82 additions & 3 deletions pkg/planner/core/plan_cost_ver2.go
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,59 @@ func getPlanCostVer24PhysicalStreamAgg(pp base.PhysicalPlan, taskType property.T
return p.PlanCostVer2, nil
}

// getPlanCostVer24PhysicalHashAgg returns the plan-cost of this sub-plan, which is:
// plan-cost = child-cost + (agg-cost + group-cost + hash-build-cost + hash-probe-cost) / concurrency
// childCanProvideOrderForStreamAgg returns true if child preserves ordering
// through to a base-table access path.
//
// This is a conservative approximation: a base-table access path may still
// need a Sort if no index covers the GROUP BY columns. The planner's
// possible-property tracking already filters out StreamAgg alternatives that
// can't be satisfied by an index, so the false-positive rate in practice is
// low, and we accept the small over-count to avoid threading TableInfo
// through the cost path.
func childCanProvideOrderForStreamAgg(child base.PhysicalPlan) bool {
	// Walk down through order-preserving, single-child operators until we
	// either reach a reader (ordering can come straight from the storage
	// access path) or hit anything else (ordering is not guaranteed).
	cur := child
	for cur != nil {
		switch cur.(type) {
		case *physicalop.PhysicalIndexReader, *physicalop.PhysicalIndexLookUpReader,
			*physicalop.PhysicalIndexMergeReader, *physicalop.PhysicalTableReader:
			// Reached a base-table access path: treat ordering as available.
			return true
		case *physicalop.PhysicalProjection, *physicalop.PhysicalSelection,
			*physicalop.PhysicalUnionScan:
			// These operators pass their (single) child's ordering through.
			kids := cur.Children()
			if len(kids) != 1 {
				return false
			}
			cur = kids[0]
		default:
			return false
		}
	}
	return false
}

// getPlanCostVer24PhysicalHashAgg returns the plan-cost of this sub-plan.
//
// For TiDB root tasks:
//
// plan-cost = child-cost + hash-mem-cost + (agg-cost + group-cost + hash-build-cpu-cost + hash-probe-cost) / concurrency
//
// All CPU work (aggregation, grouping, hash key computation, probing) is parallelized
// across partial workers that each process a disjoint subset of input rows, so these
// costs are divided by concurrency. However, the hash table memory cost is placed outside
// the division: each partial worker maintains its own hash table, so total memory
// consumption scales with outputRows (NDV) regardless of concurrency. For high-NDV
// GROUP BY with an available ordered index, this memory penalty makes StreamAgg
// (which uses ~constant memory) the preferred plan.
//
// The memory penalty is only applied when the HashAgg's child can provide ordering on
// the GROUP BY keys naturally (e.g., from an ordered index scan). When no such ordering
// is available, the StreamAgg alternative would need an explicit Sort whose own cost
// already correctly disfavors it, so adding the memory penalty here would double-count
// and steer the optimizer toward Sort+StreamAgg even when HashAgg is genuinely cheaper.
//
// For TiFlash MPP and TiKV cop tasks, data is either partitioned across nodes (MPP) or
// processed single-threaded on TiKV (cop). All costs use the original formula:
//
// plan-cost = child-cost + (agg-cost + group-cost + hash-build-cost + hash-probe-cost) / concurrency
func getPlanCostVer24PhysicalHashAgg(pp base.PhysicalPlan, taskType property.TaskType, option *costusage.PlanCostOption, isChildOfINL ...bool) (costusage.CostVer2, error) {
p := pp.(*physicalop.PhysicalHashAgg)
if p.PlanCostInit && !hasCostFlag(option.CostFlag, costusage.CostFlagRecalculate) {
Expand All @@ -647,7 +698,35 @@ func getPlanCostVer24PhysicalHashAgg(pp base.PhysicalPlan, taskType property.Tas
return costusage.ZeroCostVer2, err
}

p.PlanCostVer2 = costusage.SumCostVer2(startCost, childCost, costusage.DivCostVer2(costusage.SumCostVer2(aggCost, groupCost, hashBuildCost, hashProbeCost), concurrency))
if taskType == property.RootTaskType {
var hashMemCost costusage.CostVer2
if childCanProvideOrderForStreamAgg(p.Children()[0]) {
hashMemCost = costusage.NewCostVer2(option, memFactor,
concurrency*outputRows*outputRowSize*memFactor.Value,
func() string {
return fmt.Sprintf("hashmem(%v*%v*%v*%v)", concurrency, outputRows, outputRowSize, memFactor)
})
} else {
hashMemCost = costusage.ZeroCostVer2
}
// hashBuildCost includes memory; subtract it out so we don't double-count.
// Recompute just the CPU portion of hash build (key computation + build).
nKeys := float64(len(p.GroupByItems))
hashBuildCPUCost := costusage.NewCostVer2(option, cpuFactor,
outputRows*nKeys*cpuFactor.Value+outputRows*cpuFactor.Value,
func() string {
return fmt.Sprintf("hashkey(%v*%v*%v)+hashbuild(%v*%v)", outputRows, nKeys, cpuFactor, outputRows, cpuFactor)
})
p.PlanCostVer2 = costusage.SumCostVer2(startCost, childCost, hashMemCost,
costusage.DivCostVer2(costusage.SumCostVer2(aggCost, groupCost, hashBuildCPUCost, hashProbeCost), concurrency))
} else {
// MPP and cop tasks: data is either partitioned (MPP) or processed by a single
// TiKV thread (cop), so the TiDB root concurrency factor does not apply.
// Use the original formula where all costs are divided by concurrency (for MPP)
// or treated as single-threaded work (for cop, where concurrency=1 effectively).
p.PlanCostVer2 = costusage.SumCostVer2(startCost, childCost,
costusage.DivCostVer2(costusage.SumCostVer2(aggCost, groupCost, hashBuildCost, hashProbeCost), concurrency))
}
p.PlanCostInit = true
// Multiply by cost factor - defaults to 1, but can be increased/decreased to influence the cost model
p.PlanCostVer2 = costusage.MulCostVer2(p.PlanCostVer2, p.SCtx().GetSessionVars().HashAggCostFactor)
Expand Down
145 changes: 145 additions & 0 deletions pkg/planner/core/plan_cost_ver2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math"
"regexp"
"strconv"
"strings"
"testing"
Expand Down Expand Up @@ -811,3 +812,147 @@ func TestScanOnSmallTable(t *testing.T) {
}
require.True(t, useTiKVScan, "should use tikv scan, but got:\n%s", resStr)
}

func TestHashAggMemCostNotDividedByConcurrency(t *testing.T) {
	// Verify that for high-NDV GROUP BY with an available index on a table with
	// wide rows, the hash table memory cost (placed outside /concurrency) is
	// significant enough that StreamAgg (with ordered index scan and ~constant
	// memory) is cheaper than HashAgg. This validates that hash table memory is
	// not artificially discounted by parallelism.
	testkit.RunTestUnderCascadesWithDomain(t, func(t *testing.T, tk *testkit.TestKit, dom *domain.Domain, cascades, caller string) {
		store := tk.Session().GetStore()
		defer func() {
			cleanup := testkit.NewTestKit(t, store)
			cleanup.MustExec("use test")
			cleanup.MustExec("drop table if exists t_high_ndv")
			dom.StatsHandle().Clear()
		}()

		tk.MustExec("use test")
		tk.MustExec("drop table if exists t_high_ndv")
		// The hash table memory cost is based on outputRows * outputRowSize, so
		// the SELECT must include wide output columns (not just count(*)) for
		// memory to become the dominant factor in the cost model.
		tk.MustExec("create table t_high_ndv (a int, b int, c varchar(200), d varchar(200), key(b))")

		// Insert rows where every b value is unique (100% NDV on the GROUP BY key).
		rows := make([]string, 0, 1000)
		for i := 0; i < 1000; i++ {
			rows = append(rows, fmt.Sprintf("(%d,%d,'%s','%s')", i, i, "padding-data-for-wide-rows", "more-padding-data-here"))
		}
		tk.MustExec("insert into t_high_ndv values " + strings.Join(rows, ","))
		tk.MustExec("analyze table t_high_ndv")

		// With default cost factors, force each plan via a hint and read the
		// root operator's cost from `explain format=verbose`. Including max(c),
		// max(d) in the SELECT makes the output rows wide, which means the hash
		// table (one per partial worker) consumes significant memory.
		planCost := func(hint string) float64 {
			res := tk.MustQuery("explain format=verbose select " + hint +
				" b, count(*), max(c), max(d) from t_high_ndv use index(b) group by b").Rows()
			cost, err := strconv.ParseFloat(res[0][2].(string), 64)
			require.NoError(t, err)
			return cost
		}
		streamCost := planCost("/*+ STREAM_AGG() */")
		hashCost := planCost("/*+ HASH_AGG() */")

		// StreamAgg should be cheaper than HashAgg for high-NDV GROUP BY with
		// wide output rows and an available index.
		require.Less(t, streamCost, hashCost,
			"StreamAgg (cost=%.2f) should be cheaper than HashAgg (cost=%.2f) for high-NDV GROUP BY with wide output and index",
			streamCost, hashCost)
	})
}

func TestHashAggMemCostGatedOnFreeOrdering(t *testing.T) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test appears to pass without your fix. Perhaps we should redesign the case to ensure it can verify that the gate functions correctly. I typically verify it by running these regression tests without fixes to check if it truly serves as the regression test.

Please correct me if I misunderstood this case.

// Companion to TestHashAggMemCostNotDividedByConcurrency: verify that the
// HashAgg memory penalty is gated on whether the child can naturally
// provide ordering on the GROUP BY keys. Without this gate the inflated
// penalty also fires for GROUP BY over a join output, where the StreamAgg
// alternative would need an explicit Sort whose own cost already disfavors
// it; double-counting would steer the optimizer toward Sort+StreamAgg.
//
// Behavioral assertions on plan choice are unreliable here because the
// Sort cost over a 1000-row join is already large enough that HashAgg wins
// either way. Instead, inspect the HashAgg cost trace directly:
//
// * HashAgg's added memory penalty traces as
// `hashmem(<concurrency>*<rows>*<rowSize>*tidb_mem_factor(...))`
// — three numeric tokens before the factor.
// * HashJoin's own (unrelated) memory term traces as
// `hashmem(<rows>*<rowSize>*tidb_mem_factor(...))` — only two
// numeric tokens before the factor.
//
// Matching the three-token pattern at the HashAgg row therefore detects
// only the gated penalty, regardless of any HashJoin in the subtree.
testkit.RunTestUnderCascadesWithDomain(t, func(t *testing.T, tk *testkit.TestKit, dom *domain.Domain, cascades, caller string) {
store := tk.Session().GetStore()
defer func() {
tk2 := testkit.NewTestKit(t, store)
tk2.MustExec("use test")
tk2.MustExec("drop table if exists t_indexed, t_join_a, t_join_b")
dom.StatsHandle().Clear()
}()

tk.MustExec("use test")
tk.MustExec("drop table if exists t_indexed, t_join_a, t_join_b")
tk.MustExec("create table t_indexed (a int, b int, c varchar(200), key(b))")
tk.MustExec("create table t_join_a (k int, v int, c varchar(200))")
tk.MustExec("create table t_join_b (k int, w int, d varchar(200))")
var bufIdx, bufA, bufB strings.Builder
bufIdx.WriteString("insert into t_indexed values ")
bufA.WriteString("insert into t_join_a values ")
bufB.WriteString("insert into t_join_b values ")
for i := 0; i < 1000; i++ {
if i > 0 {
bufIdx.WriteString(",")
bufA.WriteString(",")
bufB.WriteString(",")
}
bufIdx.WriteString(fmt.Sprintf("(%d,%d,'padding-data-for-wide-rows')", i, i))
bufA.WriteString(fmt.Sprintf("(%d,%d,'padding-data-for-wide-rows')", i, i))
bufB.WriteString(fmt.Sprintf("(%d,%d,'more-padding-data-here')", i, i))
}
tk.MustExec(bufIdx.String())
tk.MustExec(bufA.String())
tk.MustExec(bufB.String())
tk.MustExec("analyze table t_indexed")
tk.MustExec("analyze table t_join_a")
tk.MustExec("analyze table t_join_b")

// Three numeric tokens before tidb_mem_factor isolates HashAgg's
// memory penalty from any HashJoin hashmem term in the subtree.
hashAggMemPattern := regexp.MustCompile(`hashmem\([0-9.]+\*[0-9.]+\*[0-9.]+\*tidb_mem_factor`)

hashAggTrace := func(query string) string {
rows := tk.MustQuery("explain format='cost_trace' " + query).Rows()
for _, r := range rows {
if strings.Contains(r[0].(string), "HashAgg") {
return r[3].(string)
}
}
t.Fatalf("HashAgg not found in plan for %q", query)
return ""
}

// Free ordering available (HashAgg over an ordered index scan): the
// memory penalty must be applied — the index gives StreamAgg a
// sort-free alternative, and we want HashAgg to be charged its
// concurrent-hash-table cost so the optimizer can compare fairly.
traceFree := hashAggTrace("select /*+ HASH_AGG() */ b, count(*), max(c) from t_indexed use index(b) group by b")
require.Regexp(t, hashAggMemPattern, traceFree,
"HashAgg over an ordered index scan should include the memory penalty term, got: %s", traceFree)

// No free ordering (HashAgg over a hash-join output): the memory
// penalty must be skipped — StreamAgg would need an explicit Sort
// whose cost already disfavors it; adding the penalty here would
// double-count.
traceJoin := hashAggTrace("select /*+ HASH_AGG() */ t_join_a.v, max(t_join_a.c), max(t_join_b.d) from t_join_a join t_join_b on t_join_a.k = t_join_b.k group by t_join_a.v")
require.NotRegexp(t, hashAggMemPattern, traceJoin,
"HashAgg over a join output must NOT include the memory penalty (gated on free ordering), got: %s", traceJoin)
})
}
2 changes: 1 addition & 1 deletion tests/integrationtest/r/planner/core/integration.result
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ StreamAgg_14 1.00 304652.01 root funcs:count(Column#5)->Column#4
└─TableFullScan_13 10000.00 4070000.00 cop[tikv] table:t keep order:false, stats:pseudo
explain format='verbose' select /*+ hash_agg() */ count(*) from t;
id estRows estCost task access object operator info
HashAgg_9 1.00 286269.86 root funcs:count(Column#5)->Column#4
HashAgg_9 1.00 286277.54 root funcs:count(Column#5)->Column#4
└─TableReader_10 1.00 284742.60 root data:HashAgg_5
└─HashAgg_5 1.00 4271107.30 cop[tikv] funcs:count(1)->Column#5
└─TableFullScan_8 10000.00 4070000.00 cop[tikv] table:t keep order:false, stats:pseudo
Expand Down
25 changes: 25 additions & 0 deletions tests/integrationtest/r/planner/core/plan_cost_ver2.result
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,28 @@ id estRows estCost costFormula actRows task access object execution info operato
TableReader_7 3323.33 13580.23 ((((cpu(0*filters(1)*tikv_cpu_factor(49.9))) + (((scan(1*logrowsize(32)*tikv_scan_factor(40.7))) + (scan(1000*logrowsize(32)*tikv_scan_factor(40.7))))*1.00)) + (net(0*rowsize(16)*tidb_kv_net_factor(3.96))))/15.00)*1.00 0 root <execution_info> <operator_info> <memory> <disk>
└─Selection_6 3323.33 203703.50 (cpu(0*filters(1)*tikv_cpu_factor(49.9))) + (((scan(1*logrowsize(32)*tikv_scan_factor(40.7))) + (scan(1000*logrowsize(32)*tikv_scan_factor(40.7))))*1.00) 0 cop[tikv] <execution_info> <operator_info> <memory> <disk>
└─TableFullScan_5 10000.00 203703.50 ((scan(1*logrowsize(32)*tikv_scan_factor(40.7))) + (scan(1000*logrowsize(32)*tikv_scan_factor(40.7))))*1.00 0 cop[tikv] table:t <execution_info> <operator_info> <memory> <disk>
drop table if exists t_idx;
create table t_idx (b int, c int, d varchar(200), e varchar(200), primary key (b, c) clustered);
insert into t_idx select n*100+m*10+p, n*100+m*10+p, repeat('d',200), repeat('e',200) from (select 0 n union all select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9) a, (select 0 m union all select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9) b, (select 0 p union all select 1 union all select 2 union all select 3 union all select 4 union all select 5 union all select 6 union all select 7 union all select 8 union all select 9) c;
analyze table t_idx;
explain format='brief' select b, max(d), max(e) from t_idx group by b;
id estRows task access object operator info
Projection 1000.00 root planner__core__plan_cost_ver2.t_idx.b, Column#6, Column#7
└─StreamAgg 1000.00 root group by:planner__core__plan_cost_ver2.t_idx.b, funcs:max(Column#11)->Column#6, funcs:max(Column#12)->Column#7, funcs:firstrow(planner__core__plan_cost_ver2.t_idx.b)->planner__core__plan_cost_ver2.t_idx.b
└─TableReader 1000.00 root data:StreamAgg
└─StreamAgg 1000.00 cop[tikv] group by:planner__core__plan_cost_ver2.t_idx.b, funcs:max(planner__core__plan_cost_ver2.t_idx.d)->Column#11, funcs:max(planner__core__plan_cost_ver2.t_idx.e)->Column#12
└─TableFullScan 1000.00 cop[tikv] table:t_idx keep order:true
drop table if exists t_join_a, t_join_b;
create table t_join_a (k int, v int, c varchar(200));
create table t_join_b (k int, w int, d varchar(200));
explain format='brief' select t_join_a.v, max(t_join_a.c), max(t_join_b.d) from t_join_a join t_join_b on t_join_a.k = t_join_b.k group by t_join_a.v;
id estRows task access object operator info
Projection 7992.00 root planner__core__plan_cost_ver2.t_join_a.v, Column#11, Column#12
└─HashAgg 7992.00 root group by:planner__core__plan_cost_ver2.t_join_a.v, funcs:max(planner__core__plan_cost_ver2.t_join_a.c)->Column#11, funcs:max(planner__core__plan_cost_ver2.t_join_b.d)->Column#12, funcs:firstrow(planner__core__plan_cost_ver2.t_join_a.v)->planner__core__plan_cost_ver2.t_join_a.v
└─HashJoin 12487.50 root inner join, equal:[eq(planner__core__plan_cost_ver2.t_join_a.k, planner__core__plan_cost_ver2.t_join_b.k)]
├─TableReader(Build) 9990.00 root data:Selection
│ └─Selection 9990.00 cop[tikv] not(isnull(planner__core__plan_cost_ver2.t_join_b.k))
│ └─TableFullScan 10000.00 cop[tikv] table:t_join_b keep order:false, stats:pseudo
└─TableReader(Probe) 9990.00 root data:Selection
└─Selection 9990.00 cop[tikv] not(isnull(planner__core__plan_cost_ver2.t_join_a.k))
└─TableFullScan 10000.00 cop[tikv] table:t_join_a keep order:false, stats:pseudo
Loading
Loading