Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/sql/compile/sql_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,18 @@ func (s *sqlExecutor) getCompileContext(
proc *process.Process,
db string,
lower int64) *compilerContext {
return &compilerContext{
cc := &compilerContext{
ctx: ctx,
defaultDB: db,
engine: s.eng,
proc: proc,
lower: lower,
}
// For testing: check if a stats cache is provided in context
if statsCache, ok := ctx.Value("test_stats_cache").(*plan.StatsCache); ok {
cc.statsCache = statsCache
}
return cc
}

func (s *sqlExecutor) adjustOptions(
Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/compile/sql_executor_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/matrixorigin/matrixone/pkg/common/moerr"
"github.com/matrixorigin/matrixone/pkg/defines"
"github.com/matrixorigin/matrixone/pkg/logutil"
planpb "github.com/matrixorigin/matrixone/pkg/pb/plan"
pb "github.com/matrixorigin/matrixone/pkg/pb/statsinfo"
"github.com/matrixorigin/matrixone/pkg/perfcounter"
Expand Down Expand Up @@ -137,6 +138,17 @@ func (c *compilerContext) Stats(obj *plan.ObjectRef, snapshot *plan.Snapshot) (*
return nil, moerr.NewNoSuchTable(ctx, dbName, tableName)
}

// For testing only: check if stats cache already has stats for this table
// Only use cached stats if we're in a test environment (indicated by test_stats_cache in context)
// If so, use the cached stats instead of fetching from engine
if _, isTestEnv := c.GetContext().Value("test_stats_cache").(*plan.StatsCache); isTestEnv {
tableID := table.GetTableID(ctx)
if statsWrapper := c.statsCache.GetStatsInfo(tableID, false); statsWrapper != nil && statsWrapper.Stats != nil {
logutil.Infof("use test env cached stats for table %s (tableID=%d)", tableName, tableID)
return statsWrapper.Stats, nil
}
}

newCtx := perfcounter.AttachCalcTableStatsKey(ctx)
statsInfo, err := table.Stats(newCtx, true)
if err != nil {
Expand Down
123 changes: 122 additions & 1 deletion pkg/sql/plan/associative_law.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,65 @@

package plan

import "github.com/matrixorigin/matrixone/pkg/pb/plan"
import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/pb/plan"
)

// checkExprInvolvesTags reports whether expr references a column whose RelPos
// is marked true in tagsMap.
//
// Column references are looked up directly by their RelPos. Function
// expressions are searched through their arguments, and window expressions
// through the window function and the ORDER BY expressions. Any other
// expression kind, and a nil expr, reports false.
func checkExprInvolvesTags(expr *plan.Expr, tagsMap map[int32]bool) bool {
	// A nil expression references nothing. The guard also protects the
	// recursive calls below from dereferencing an unset child expression.
	if expr == nil {
		return false
	}
	switch e := expr.Expr.(type) {
	case *plan.Expr_Col:
		return tagsMap[e.Col.RelPos]
	case *plan.Expr_F:
		for _, arg := range e.F.Args {
			if checkExprInvolvesTags(arg, tagsMap) {
				return true
			}
		}
	case *plan.Expr_W:
		if checkExprInvolvesTags(e.W.WindowFunc, tagsMap) {
			return true
		}
		for _, order := range e.W.OrderBy {
			if checkExprInvolvesTags(order.Expr, tagsMap) {
				return true
			}
		}
	}
	return false
}

// migrateOnListConditions splits src.OnList in two: every condition that
// references one of the tags in tagsMap is appended to dst.OnList, and the
// rest stay on src in their original relative order. src.OnList ends up nil
// when no condition is kept.
func migrateOnListConditions(src *plan.Node, dst *plan.Node, tagsMap map[int32]bool) {
	conds := src.OnList
	var remaining []*plan.Expr
	for i := range conds {
		c := conds[i]
		if !checkExprInvolvesTags(c, tagsMap) {
			remaining = append(remaining, c)
			continue
		}
		dst.OnList = append(dst.OnList, c)
	}
	src.OnList = remaining
}

// getTableNameOrLabel returns the TableDef name of the node at nodeID in the
// builder's query. It falls back to label when the node has no TableDef or the
// TableDef name is empty.
func (builder *QueryBuilder) getTableNameOrLabel(nodeID int32, label string) string {
	tableDef := builder.qry.Nodes[nodeID].TableDef
	if tableDef == nil || tableDef.Name == "" {
		return label
	}
	return tableDef.Name
}

// formatStatsInfo renders the selectivity, output count and table count of
// stats as a compact string for log and history messages. A nil stats is
// rendered as "stats=nil".
func formatStatsInfo(stats *plan.Stats) string {
	switch stats {
	case nil:
		return "stats=nil"
	default:
		sel, outCnt, tableCnt := stats.Selectivity, stats.Outcnt, stats.TableCnt
		return fmt.Sprintf("sel=%.4f,outcnt=%.2f,tablecnt=%.2f", sel, outCnt, tableCnt)
	}
}

// for A*(B*C), if C.sel>0.9 and B<C, change this to (A*B)*C
func (builder *QueryBuilder) applyAssociativeLawRule1(nodeID int32) int32 {
Expand Down Expand Up @@ -43,6 +101,25 @@ func (builder *QueryBuilder) applyAssociativeLawRule1(nodeID int32) int32 {
node.Children[1] = rightChild.NodeId
return node.NodeId
}

tagsC := builder.enumerateTags(NodeC.NodeId)
// Migrate OnList: conditions involving C must move to outer join
tagsCMap := make(map[int32]bool)
for _, tag := range tagsC {
tagsCMap[tag] = true
}
migrateOnListConditions(node, rightChild, tagsCMap)

// Record table names and stats after migration
tableNameA := builder.getTableNameOrLabel(node.Children[0], "A")
tableNameB := builder.getTableNameOrLabel(NodeB.NodeId, "B")
tableNameC := builder.getTableNameOrLabel(NodeC.NodeId, "C")
statsInfo := fmt.Sprintf("rule1: A=%s(stats:%s) B=%s(stats:%s) C=%s(stats:%s)",
tableNameA, formatStatsInfo(builder.qry.Nodes[node.Children[0]].Stats),
tableNameB, formatStatsInfo(NodeB.Stats),
tableNameC, formatStatsInfo(NodeC.Stats))
builder.optimizationHistory = append(builder.optimizationHistory, statsInfo)

rightChild.Children[0] = node.NodeId
ReCalcNodeStats(rightChild.NodeId, builder, true, false, true)
return rightChild.NodeId
Expand All @@ -67,6 +144,7 @@ func (builder *QueryBuilder) applyAssociativeLawRule2(nodeID int32) int32 {
if NodeC.Stats.Selectivity > 0.5 {
return nodeID
}
NodeA := builder.qry.Nodes[leftChild.Children[0]]
NodeB := builder.qry.Nodes[leftChild.Children[1]]
node.Children[0] = NodeB.NodeId
determineHashOnPK(node.NodeId, builder)
Expand All @@ -75,6 +153,24 @@ func (builder *QueryBuilder) applyAssociativeLawRule2(nodeID int32) int32 {
node.Children[0] = leftChild.NodeId
return node.NodeId
}

// Migrate OnList: conditions involving A must move to outer join
tagsAMap := make(map[int32]bool)
for _, tag := range builder.enumerateTags(NodeA.NodeId) {
tagsAMap[tag] = true
}
migrateOnListConditions(node, leftChild, tagsAMap)

// Record table names and stats after migration
tableNameA := builder.getTableNameOrLabel(NodeA.NodeId, "A")
tableNameB := builder.getTableNameOrLabel(NodeB.NodeId, "B")
tableNameC := builder.getTableNameOrLabel(NodeC.NodeId, "C")
statsInfo := fmt.Sprintf("rule2: A=%s(stats:%s) B=%s(stats:%s) C=%s(stats:%s)",
tableNameA, formatStatsInfo(NodeA.Stats),
tableNameB, formatStatsInfo(NodeB.Stats),
tableNameC, formatStatsInfo(NodeC.Stats))
builder.optimizationHistory = append(builder.optimizationHistory, statsInfo)

leftChild.Children[1] = node.NodeId
ReCalcNodeStats(leftChild.NodeId, builder, true, false, true)
return leftChild.NodeId
Expand Down Expand Up @@ -110,6 +206,31 @@ func (builder *QueryBuilder) applyAssociativeLawRule3(nodeID int32) int32 {
node.Children[0] = leftChild.NodeId
return node.NodeId
}

// Migrate OnList:
// - node: move conditions involving B to leftChild
// - leftChild: move conditions involving C to node
tagsBMap := make(map[int32]bool)
for _, tag := range builder.enumerateTags(NodeB.NodeId) {
tagsBMap[tag] = true
}
tagsCMap := make(map[int32]bool)
for _, tag := range builder.enumerateTags(NodeC.NodeId) {
tagsCMap[tag] = true
}
migrateOnListConditions(node, leftChild, tagsBMap)
migrateOnListConditions(leftChild, node, tagsCMap)

// Record table names and stats after migration
tableNameA := builder.getTableNameOrLabel(NodeA.NodeId, "A")
tableNameB := builder.getTableNameOrLabel(NodeB.NodeId, "B")
tableNameC := builder.getTableNameOrLabel(NodeC.NodeId, "C")
statsInfo := fmt.Sprintf("rule3: A=%s(stats:%s) B=%s(stats:%s) C=%s(stats:%s)",
tableNameA, formatStatsInfo(NodeA.Stats),
tableNameB, formatStatsInfo(NodeB.Stats),
tableNameC, formatStatsInfo(NodeC.Stats))
builder.optimizationHistory = append(builder.optimizationHistory, statsInfo)

leftChild.Children[0] = node.NodeId
ReCalcNodeStats(leftChild.NodeId, builder, true, false, true)
return leftChild.NodeId
Expand Down
13 changes: 13 additions & 0 deletions pkg/sql/plan/join_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package plan

import (
"fmt"
"sort"
"strings"

Expand Down Expand Up @@ -204,6 +205,7 @@ func HasColExpr(expr *plan.Expr, pos int32) int32 {
}

func (builder *QueryBuilder) determineJoinOrder(nodeID int32) int32 {
originalNodeID := nodeID
if builder.optimizerHints != nil && builder.optimizerHints.joinOrdering != 0 {
return nodeID
}
Expand All @@ -223,6 +225,9 @@ func (builder *QueryBuilder) determineJoinOrder(nodeID int32) int32 {
}

leaves, conds := builder.gatherJoinLeavesAndConds(node, nil, nil)
// Record middle: gathered leaves and conditions
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("determineJoinOrder:middle (nodeID: %d, leaves: %d, conds: %d)", nodeID, len(leaves), len(conds)))
newConds := deduceNewOnList(conds)
conds = append(conds, newConds...)
vertices := builder.getJoinGraph(leaves, conds)
Expand Down Expand Up @@ -347,6 +352,14 @@ func (builder *QueryBuilder) determineJoinOrder(nodeID int32) int32 {
FilterList: conds,
}, nil)
}
// Record after determineJoinOrder
if nodeID != originalNodeID {
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("determineJoinOrder:after (nodeID: %d -> %d, remainingConds: %d)", originalNodeID, nodeID, len(conds)))
} else {
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("determineJoinOrder:after (nodeID: %d, no change, remainingConds: %d)", nodeID, len(conds)))
}
return nodeID
}

Expand Down
12 changes: 10 additions & 2 deletions pkg/sql/plan/opt_misc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@
package plan

import (
"context"
"testing"

"github.com/matrixorigin/matrixone/pkg/container/types"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/stretchr/testify/require"
"testing"
)

func TestRemapWindowClause(t *testing.T) {
Expand All @@ -43,7 +45,13 @@ func TestRemapWindowClause(t *testing.T) {
Typ: plan.Type{},
}
colMap := make(map[[2]int32][2]int32)
var b *QueryBuilder
var b *QueryBuilder = &QueryBuilder{
compCtx: &MockCompilerContext{
ctx: context.Background(),
},
optimizationHistory: []string{"test optimization history"},
}
err := b.remapWindowClause(f, 1, 1, colMap, nil)
t.Log(err)
require.Error(t, err)
}
23 changes: 23 additions & 0 deletions pkg/sql/plan/pushdown.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,19 @@
package plan

import (
"fmt"

"github.com/matrixorigin/matrixone/pkg/catalog"
"github.com/matrixorigin/matrixone/pkg/objectio"
"github.com/matrixorigin/matrixone/pkg/pb/plan"
"github.com/matrixorigin/matrixone/pkg/vectorindex/metric"
)

func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr, separateNonEquiConds bool) (int32, []*plan.Expr) {
originalNodeID := nodeID
// Record before pushdownFilters
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("pushdownFilters:before (nodeID: %d, nodeType: %s, filters: %d)", nodeID, builder.qry.Nodes[nodeID].NodeType, len(filters)))
node := builder.qry.Nodes[nodeID]

var canPushdown, cantPushdown []*plan.Expr
Expand Down Expand Up @@ -150,6 +156,9 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
}

case plan.Node_JOIN:
// Record middle: processing JOIN node
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("pushdownFilters:middle (nodeID: %d, JOIN, filters: %d, onList: %d)", nodeID, len(filters), len(node.OnList)))
leftTags := make(map[int32]bool)
for _, tag := range builder.enumerateTags(node.Children[0]) {
leftTags[tag] = true
Expand Down Expand Up @@ -402,6 +411,9 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
node.Children[1] = childID

case plan.Node_UNION, plan.Node_UNION_ALL, plan.Node_MINUS, plan.Node_MINUS_ALL, plan.Node_INTERSECT, plan.Node_INTERSECT_ALL:
// Record middle: processing UNION/MINUS/INTERSECT node
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("pushdownFilters:middle (nodeID: %d, %s, filters: %d)", nodeID, node.NodeType, len(filters)))
leftChild := builder.qry.Nodes[node.Children[0]]
rightChild := builder.qry.Nodes[node.Children[1]]
var canPushDownRight []*plan.Expr
Expand Down Expand Up @@ -457,6 +469,9 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
node.Children[0] = childID

case plan.Node_TABLE_SCAN, plan.Node_EXTERNAL_SCAN:
// Record middle: processing TABLE_SCAN/EXTERNAL_SCAN node
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("pushdownFilters:middle (nodeID: %d, %s, filters: %d)", nodeID, node.NodeType, len(filters)))
for _, filter := range filters {
if onlyContainsTag(filter, node.BindingTags[0]) {
node.FilterList = append(node.FilterList, filter)
Expand Down Expand Up @@ -505,6 +520,14 @@ func (builder *QueryBuilder) pushdownFilters(nodeID int32, filters []*plan.Expr,
}
}

// Record after pushdownFilters
if nodeID != originalNodeID {
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("pushdownFilters:after (nodeID: %d -> %d, cantPushdown: %d)", originalNodeID, nodeID, len(cantPushdown)))
} else {
builder.optimizationHistory = append(builder.optimizationHistory,
fmt.Sprintf("pushdownFilters:after (nodeID: %d, no change, cantPushdown: %d)", nodeID, len(cantPushdown)))
}
return nodeID, cantPushdown
}

Expand Down
12 changes: 12 additions & 0 deletions pkg/sql/plan/query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ func NewQueryBuilder(queryType plan.Query_StatementType, ctx CompilerContext, is
isPrepareStatement: isPrepareStatement,
deleteNode: make(map[uint64]int32),
skipStats: skipStats,
optimizationHistory: make([]string, 0),
}
}

Expand Down Expand Up @@ -193,6 +194,17 @@ func (builder *QueryBuilder) buildRemapErrorMessage(
}
}

// Optimization history
if len(builder.optimizationHistory) > 0 {
sb.WriteString("🔧 Optimization History:\n")
for _, hist := range builder.optimizationHistory {
sb.WriteString(fmt.Sprintf(" - %s\n", hist))
}
sb.WriteString("\n")
} else {
sb.WriteString("🔧 Optimization History: (no associatelaw applied)\n\n")
}

// Available columns
if len(colMap) > 0 {
sb.WriteString("✅ Available Columns in Context:\n")
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/plan/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,10 @@ type QueryBuilder struct {
aggSpillMem int64

optimizerHints *OptimizerHints

// optimizationHistory records optimization steps for debugging remap errors.
// Entries are appended by the filter-pushdown, join-order and associative-law
// passes, including steps that leave the plan unchanged.
optimizationHistory []string
}

type OptimizerHints struct {
Expand Down
Loading
Loading