44 changes: 40 additions & 4 deletions pkg/objectio/cache.go
@@ -76,6 +76,16 @@ type mataCacheKey [cacheKeyLen]byte
var metaCache *fifocache.Cache[mataCacheKey, []byte]
var onceInit sync.Once

// metaLoadGroup deduplicates concurrent loads for the same cache key,
// preventing cache stampede when many goroutines miss the same entry simultaneously.
var metaLoadGroup sync.Map // mataCacheKey -> *loadCall

type loadCall struct {
done chan struct{}
val []byte
err error
}

func metaCacheSize() int64 {
v, err := mem.VirtualMemory()
if err != nil {
@@ -174,11 +184,13 @@ func LoadObjectMetaByExtent(
zap.String("name", name.String()),
zap.String("extent", extent.String()))
}
if v, err = ReadExtent(ctx, name.UnsafeString(), extent, policy, fs, constructorFactory); err != nil {
v, err = dedupLoad(ctx, key, func() ([]byte, error) {
return ReadExtent(ctx, name.UnsafeString(), extent, policy, fs, constructorFactory)
})
if err != nil {
return
}
meta = MustObjectMeta(v)
metaCache.Set(ctx, key, v[:], int64(len(v)))
return
}

@@ -216,14 +228,38 @@ func LoadBFWithMeta(
return v, nil
}
extent := meta.BlockHeader().BFExtent()
bf, err := ReadBloomFilter(ctx, location.Name().String(), &extent, fileservice.SkipFullFilePreloads, fs)
bf, err := dedupLoad(ctx, key, func() ([]byte, error) {
return ReadBloomFilter(ctx, location.Name().String(), &extent, fileservice.SkipFullFilePreloads, fs)
})
if err != nil {
return nil, err
}
metaCache.Set(ctx, key, bf, int64(len(bf)))
return bf, nil
}

// dedupLoad ensures that for a given cache key, only one goroutine performs
// the actual I/O load. Other concurrent callers for the same key wait and
// share the result. This prevents cache stampede under high concurrency.
func dedupLoad(ctx context.Context, key mataCacheKey, load func() ([]byte, error)) ([]byte, error) {
call := &loadCall{done: make(chan struct{})}
if actual, loaded := metaLoadGroup.LoadOrStore(key, call); loaded {
existing := actual.(*loadCall)
select {
case <-existing.done:
return existing.val, existing.err
case <-ctx.Done():
return nil, ctx.Err()
}
}
defer metaLoadGroup.Delete(key)
call.val, call.err = load()
if call.err == nil {
metaCache.Set(ctx, key, call.val, int64(len(call.val)))
}
close(call.done)
return call.val, call.err
}

func FastLoadObjectMeta(
ctx context.Context,
location *Location,
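The dedupLoad helper added above is a single-flight pattern: the first goroutine to miss a key stores a loadCall in metaLoadGroup, performs the I/O, publishes the result by closing done, and deletes the entry; concurrent callers for the same key block on done (or their own context) and share the result. Below is a stand-alone sketch of that shape, not the package API — dedup, call, group, and the fake loader are illustrative names:

package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
)

// call mirrors loadCall: done is closed once val/err are populated.
type call struct {
	done chan struct{}
	val  []byte
	err  error
}

var (
	group sync.Map     // key -> *call, mirrors metaLoadGroup
	loads atomic.Int64 // counts how often the expensive load actually runs
)

// dedup has the same shape as dedupLoad: the first caller for a key performs
// the load; later callers wait on done and share the result, or honor ctx.
func dedup(ctx context.Context, key string, load func() ([]byte, error)) ([]byte, error) {
	c := &call{done: make(chan struct{})}
	if actual, loaded := group.LoadOrStore(key, c); loaded {
		existing := actual.(*call)
		select {
		case <-existing.done:
			return existing.val, existing.err
		case <-ctx.Done():
			return nil, ctx.Err()
		}
	}
	defer group.Delete(key)
	c.val, c.err = load()
	close(c.done)
	return c.val, c.err
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 64; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, _ = dedup(context.Background(), "meta:object-1", func() ([]byte, error) {
				loads.Add(1) // the expensive ReadExtent/ReadBloomFilter I/O would happen here
				return []byte("object meta"), nil
			})
		}()
	}
	wg.Wait()
	fmt.Println("loads performed:", loads.Load()) // typically 1; callers arriving after Delete start a fresh load
}

A caller that arrives after the map entry has been deleted starts a new load; in the PR that case stays cheap because dedupLoad calls metaCache.Set before releasing the waiters, so the fresh lookup normally hits the cache instead.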
3 changes: 3 additions & 0 deletions pkg/sql/colexec/group/exec2.go
@@ -48,6 +48,9 @@ const (

func (group *Group) Prepare(proc *process.Process) (err error) {
group.ctr.state = vm.Build
if group.ctr.mp != nil {
group.ctr.free()
}
group.ctr.mp = mpool.MustNewNoLock("group_mpool")

// debug,
3 changes: 3 additions & 0 deletions pkg/sql/colexec/group/mergeGroup.go
@@ -28,6 +28,9 @@ import (

func (mergeGroup *MergeGroup) Prepare(proc *process.Process) error {
mergeGroup.ctr.state = vm.Build
if mergeGroup.ctr.mp != nil {
mergeGroup.ctr.free()
}
mergeGroup.ctr.mp = mpool.MustNew("merge_group_mpool")
mergeGroup.ctr.groupByTypes = nil
mergeGroup.ctr.keyNullable = false
18 changes: 18 additions & 0 deletions pkg/sql/colexec/insert/insert.go
@@ -32,6 +32,11 @@
"github.com/matrixorigin/matrixone/pkg/vm/process"
)

// flushSemaphore limits concurrent flushS3WriterOnMemoryPressure calls to
// prevent thundering-herd mpool explosion when many workers are denied memory
// simultaneously and all try to flush + read objectio metadata at once.
var flushSemaphore = make(chan struct{}, 4)

const opName = "insert"

func (insert *Insert) String(buf *bytes.Buffer) {
@@ -275,6 +280,15 @@ func (insert *Insert) flushS3WriterOnMemoryPressure(proc *process.Process, analy
}
}()

// Limit concurrent flushes to avoid thundering-herd OOM when many
// workers are denied memory and all flush simultaneously.
select {
case flushSemaphore <- struct{}{}:
case <-proc.Ctx.Done():
return proc.Ctx.Err()
}
defer func() { <-flushSemaphore }()

crs := analyzer.GetOpCounterSet()
newCtx := perfcounter.AttachS3RequestKey(proc.Ctx, crs)

@@ -296,6 +310,10 @@
}

insert.releaseS3MemGrant()

// After flushing, release throttle grant and force-refresh so subsequent
// acquires by this or other workers see the freed capacity immediately.
forcedRefresh(insert.ctr.s3MemThrottler)
return nil
}

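flushSemaphore above is a counting semaphore built from a buffered channel: sending into the channel acquires one of the four slots (or the caller gives up if its context is cancelled while waiting), and the deferred receive releases the slot. A minimal stand-alone sketch of that shape — flushWithLimit, maxConcurrentFlushes, and the sleep stand-in are illustrative names, and the project-specific forcedRefresh of the S3 memory throttler is not modeled here:

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// A buffered channel of size N acts as a counting semaphore: at most N
// goroutines can hold a slot at once, mirroring flushSemaphore in insert.go.
const maxConcurrentFlushes = 4

var flushSem = make(chan struct{}, maxConcurrentFlushes)

// flushWithLimit acquires a slot (or bails out if ctx is cancelled first),
// runs the flush, and releases the slot when done.
func flushWithLimit(ctx context.Context, flush func() error) error {
	select {
	case flushSem <- struct{}{}: // acquire a slot, blocking until one frees up
	case <-ctx.Done():
		return ctx.Err() // give up instead of queueing forever
	}
	defer func() { <-flushSem }() // release the slot
	return flush()
}

func main() {
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			err := flushWithLimit(context.Background(), func() error {
				time.Sleep(100 * time.Millisecond) // stand-in for the real S3 flush + metadata read
				return nil
			})
			fmt.Println("flush", i, "done, err =", err)
		}(i)
	}
	wg.Wait() // with 8 workers and 4 slots, the flushes run in two waves
}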
32 changes: 27 additions & 5 deletions pkg/sql/plan/apply_indices.go
@@ -911,7 +911,7 @@ func (builder *QueryBuilder) applyIndicesForFiltersRegularIndex(nodeID int32, no
return nodeID
}
}
if node.Stats.Selectivity > InFilterSelectivityLimit || node.Stats.Outcnt > float64(GetInFilterCardLimitOnPK(builder.compCtx.GetProcess().GetService(), node.Stats.TableCnt)) {
if node.Stats.Selectivity >= InFilterSelectivityLimit || node.Stats.Outcnt >= float64(GetInFilterCardLimitOnPK(builder.compCtx.GetProcess().GetService(), node.Stats.TableCnt)) {
return nodeID
}
}
@@ -1234,6 +1234,12 @@ func (builder *QueryBuilder) tryIndexOnlyScan(idxDef *IndexDef, node *plan.Node,
leadingPos = tryMatchMoreLeadingFilters(idxDef, node, leadingPos[0])
}

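// With no leading equality condition, an index-only scan on a large table is only
// worthwhile if the estimated selectivity and output cardinality stay below the
// in-filter limits; otherwise give up on the index-only scan.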
if !leadingEqualCond && node.Stats != nil && node.Stats.TableCnt >= 50000 {
if node.Stats.Selectivity >= InFilterSelectivityLimit || node.Stats.Outcnt >= float64(InFilterCardLimitNonPK) {
return -1
}
}

missFilterIdx := make([]int, 0, len(node.FilterList))
for i := range node.FilterList {
isLeading := false
@@ -1509,6 +1515,11 @@ func (builder *QueryBuilder) getIndexForNonEquiCond(indexes []*IndexDef, node *p
if numParts > 1 && hasUnsafeRangeOp(fn) {
continue
}
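// A single-sided range filter (>=, >, <=, <) on a large table is only worth an
// index probe when the estimated selectivity and output cardinality are low
// enough; otherwise skip this index candidate.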
if isSingleRangeOp(fn) && node.Stats != nil && node.Stats.TableCnt >= 50000 {
if node.Stats.Selectivity >= InFilterSelectivityLimit || node.Stats.Outcnt >= float64(InFilterCardLimitNonPK) {
continue
}
}
}
return idxPos, []int32{int32(i)}
}
@@ -1518,12 +1529,15 @@ func (builder *QueryBuilder) getIndexForNonEquiCond(indexes []*IndexDef, node *p

// hasUnsafeRangeOp returns true if fn (or any OR arm within it) uses <= or >,
// operators that cannot be safely converted to prefix_between on a serialized
// multi-part composite index key.
// For prefix-encoded keys, serial(v, pk) is always > serial(v) because
// the full key is longer. Therefore:
// - >= is safe: serial(v, pk) >= serial(bound) correctly matches v >= bound
// - < is safe: serial(v, pk) < serial(bound) correctly matches v < bound
// - <= is UNSAFE: serial(v, pk) <= serial(v) is always FALSE (under-fetches)
// - > is UNSAFE: serial(v, pk) > serial(v) is always TRUE (over-fetches)
//
// Only recurses into OR arms; AND arms are safe because checkIndexFilter pre-rejects
// any AND-nested expression that isn't a simple comparison on an indexed column.
// in_range is intentionally not flagged: it is only emitted for numParts == 1 (single-
// part unique indexes) where prefix correctness is not a concern.
func hasUnsafeRangeOp(fn *plan.Function) bool {
if fn == nil {
return false
@@ -1540,6 +1554,14 @@
return op == "<=" || op == ">"
}

func isSingleRangeOp(fn *plan.Function) bool {
switch fn.Func.ObjName {
case ">=", ">", "<=", "<":
return true
}
return false
}

func canonicalRangeOp(fn *plan.Function) string {
if len(fn.Args) < 2 {
return fn.Func.ObjName
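The safety argument in the hasUnsafeRangeOp comment can be checked with plain byte strings, using concatenation as a stand-in for serial() — an assumption that only holds because, as the comment states, the encoding preserves prefix order and the full key is always strictly greater than its prefix. bound and stored below are illustrative values for a row whose indexed value equals the filter constant:

package main

import "fmt"

func main() {
	// Stand-in for serial(): append a primary-key suffix to the indexed value.
	// The stored key is strictly longer than, and thus greater than, its prefix.
	bound := "b"            // serial(bound): the filter's constant
	stored := "b" + "\x01k" // serial(v, pk) for a row whose indexed value v equals bound

	fmt.Println(stored >= bound) // true  -> v >= bound is matched:    >= is safe
	fmt.Println(stored < bound)  // false -> v < bound is rejected:    <  is safe
	fmt.Println(stored <= bound) // false -> v <= bound is MISSED:     <= under-fetches
	fmt.Println(stored > bound)  // true  -> v > bound is WRONGLY hit: >  over-fetches
}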
4 changes: 2 additions & 2 deletions pkg/vm/engine/readutil/pk_filter_base.go
@@ -305,11 +305,11 @@ func ConstructBasePKFilter(

case "in_range":
ok, oid, vals := evalValue(expr, exprImpl, tblDef, false, tblDef.Pkey.PkeyColName)
if !ok || len(vals) < 3 {
if !ok || len(vals) < 3 || len(vals[2]) < 1 {
return
}
filter.Valid = true
flag := types.DecodeInt64(vals[2])
flag := vals[2][0]
switch flag {
case 0:
filter.Op = function.BETWEEN