Skip to content

Commit

Permalink
improve filter cu (#15003)
Browse files Browse the repository at this point in the history
improve cu in some cases

Approved by: @m-schen, @sukki37
  • Loading branch information
Morranto committed Mar 19, 2024
1 parent 810d9fc commit bd6febf
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 26 deletions.
25 changes: 14 additions & 11 deletions pkg/sql/colexec/fuzzyfilter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ package fuzzyfilter

import (
"bytes"
"time"

"github.com/matrixorigin/matrixone/pkg/common/bloomfilter"
"github.com/matrixorigin/matrixone/pkg/common/moerr"
Expand Down Expand Up @@ -335,13 +336,7 @@ func (arg *Argument) handleRuntimeFilter(proc *process.Process) error {
}

if runtimeFilter != nil {
select {
case <-proc.Ctx.Done():
ctr.state = End

case arg.RuntimeFilterSenders[0].Chan <- runtimeFilter:
ctr.state = Probe
}
sendFilter(arg, proc, runtimeFilter)
return nil
}

Expand All @@ -362,14 +357,22 @@ func (arg *Argument) handleRuntimeFilter(proc *process.Process) error {
Data: data,
}

sendFilter(arg, proc, runtimeFilter)

return nil

}

func sendFilter(arg *Argument, proc *process.Process, runtimeFilter *pipeline.RuntimeFilter) {
anal := proc.GetAnalyze(arg.info.Idx, arg.info.ParallelIdx, arg.info.ParallelMajor)
start := time.Now()
select {
case <-proc.Ctx.Done():
ctr.state = End
arg.state = End

case arg.RuntimeFilterSenders[0].Chan <- runtimeFilter:
ctr.state = Probe
arg.state = Probe
}

return nil
anal.WaitStop(start)

}
32 changes: 17 additions & 15 deletions pkg/sql/colexec/hashbuild/build.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package hashbuild

import (
"bytes"
"time"

"github.com/matrixorigin/matrixone/pkg/common/hashmap"
"github.com/matrixorigin/matrixone/pkg/container/batch"
Expand Down Expand Up @@ -370,14 +371,7 @@ func (ctr *container) handleRuntimeFilter(ap *Argument, proc *process.Process) e
}

if runtimeFilter != nil {
select {
case <-proc.Ctx.Done():
ctr.state = End

case ap.RuntimeFilterSenders[0].Chan <- runtimeFilter:
ctr.state = Eval
}

sendFilter(ap, proc, runtimeFilter)
return nil
}

Expand Down Expand Up @@ -438,13 +432,7 @@ func (ctr *container) handleRuntimeFilter(ap *Argument, proc *process.Process) e
}
}

select {
case <-proc.Ctx.Done():
ctr.state = End

case ap.RuntimeFilterSenders[0].Chan <- runtimeFilter:
ctr.state = Eval
}
sendFilter(ap, proc, runtimeFilter)

return nil
}
Expand All @@ -461,3 +449,17 @@ func (ctr *container) evalJoinCondition(bat *batch.Batch, proc *process.Process)
}
return nil
}

func sendFilter(ap *Argument, proc *process.Process, runtimeFilter *pipeline.RuntimeFilter) {
anal := proc.GetAnalyze(ap.Info.Idx, ap.Info.ParallelIdx, ap.Info.ParallelMajor)
sendRuntimeFilterStart := time.Now()

select {
case <-proc.Ctx.Done():
ap.ctr.state = End

case ap.RuntimeFilterSenders[0].Chan <- runtimeFilter:
ap.ctr.state = Eval
}
anal.WaitStop(sendRuntimeFilterStart)
}
10 changes: 10 additions & 0 deletions pkg/sql/colexec/rightsemi/join.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package rightsemi

import (
"bytes"
"time"

"github.com/matrixorigin/matrixone/pkg/common/bitmap"
"github.com/matrixorigin/matrixone/pkg/common/hashmap"
Expand Down Expand Up @@ -159,11 +160,18 @@ func (ctr *container) sendLast(ap *Argument, proc *process.Process, analyze proc

if ap.NumCPU > 1 {
if !ap.IsMerger {

sendStart := time.Now()
ap.Channel <- ctr.matched
analyze.WaitStop(sendStart)

return true, nil
} else {
cnt := 1
// The original code didn't handle the context correctly and would cause the system to HUNG!

mergeStart := time.Now()

for completed := true; completed; {
select {
case <-proc.Ctx.Done():
Expand All @@ -177,6 +185,8 @@ func (ctr *container) sendLast(ap *Argument, proc *process.Process, analyze proc
}
}
}

analyze.WaitStop(mergeStart)
}
}

Expand Down

0 comments on commit bd6febf

Please sign in to comment.