Skip to content

Commit

Permalink
more refine
Browse files Browse the repository at this point in the history
Signed-off-by: xufei <xufeixw@mail.ustc.edu.cn>
  • Loading branch information
windtalker committed May 11, 2024
1 parent b5013e2 commit 8e46f8b
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 127 deletions.
62 changes: 62 additions & 0 deletions pkg/executor/join/hash_join_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,68 @@ func wait4BuildSide(isBuildEmpty isBuildSideEmpty, canSkipIfBuildEmpty bool, has
return false, nil
}

// fetchProbeSideChunks get chunks from fetches chunks from the big table in a background goroutine
// and sends the chunks to multiple channels which will be read by multiple join workers.
func (fetcher *probeSideTupleFetcherBase) fetchProbeSideChunks(ctx context.Context, maxChunkSize int, isBuildEmpty isBuildSideEmpty, canSkipIfBuildEmpty, needScanAfterProbeDone, shouldLimitProbeFetchSize bool, hashJoinCtx *hashJoinCtxBase) {
hasWaitedForBuild := false
for {
if hashJoinCtx.finished.Load() {
return
}

var probeSideResource *probeChkResource
var ok bool
select {
case <-hashJoinCtx.closeCh:
return
case probeSideResource, ok = <-fetcher.probeChkResourceCh:
if !ok {
return
}
}
probeSideResult := probeSideResource.chk
if shouldLimitProbeFetchSize {
required := int(atomic.LoadInt64(&fetcher.requiredRows))
probeSideResult.SetRequiredRows(required, maxChunkSize)
}
err := exec.Next(ctx, fetcher.ProbeSideExec, probeSideResult)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
hashJoinCtx.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
skipProbe, buildErr := wait4BuildSide(isBuildEmpty, canSkipIfBuildEmpty, hashJoinCtx)
if buildErr != nil {
hashJoinCtx.joinResultCh <- &hashjoinWorkerResult{
err: buildErr,
}
return
} else if skipProbe {
// stop probe
if !needScanAfterProbeDone {
hashJoinCtx.finished.Store(true)
}
return
}
hasWaitedForBuild = true
}

if probeSideResult.NumRows() == 0 {
return
}

probeSideResource.dest <- probeSideResult
}
}

type probeWorkerBase struct {
WorkerID uint
probeChkResourceCh chan *probeChkResource
Expand Down
65 changes: 4 additions & 61 deletions pkg/executor/join/hash_join_v1.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,66 +193,6 @@ func (e *HashJoinV1Exec) Open(ctx context.Context) error {
return nil
}

// fetchProbeSideChunks get chunks from fetches chunks from the big table in a background goroutine
// and sends the chunks to multiple channels which will be read by multiple join workers.
func (fetcher *ProbeSideTupleFetcherV1) fetchProbeSideChunks(ctx context.Context, maxChunkSize int) {
hasWaitedForBuild := false
for {
if fetcher.finished.Load() {
return
}

var probeSideResource *probeChkResource
var ok bool
select {
case <-fetcher.closeCh:
return
case probeSideResource, ok = <-fetcher.probeChkResourceCh:
if !ok {
return
}
}
probeSideResult := probeSideResource.chk
if fetcher.IsOuterJoin {
required := int(atomic.LoadInt64(&fetcher.requiredRows))
probeSideResult.SetRequiredRows(required, maxChunkSize)
}
err := exec.Next(ctx, fetcher.ProbeSideExec, probeSideResult)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
fetcher.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})
emptyBuild, buildErr := wait4BuildSide(func() bool {
return fetcher.RowContainer.Len() == uint64(0)
}, fetcher.JoinType == plannercore.InnerJoin || fetcher.JoinType == plannercore.SemiJoin, &fetcher.hashJoinCtxBase)
if buildErr != nil {
fetcher.joinResultCh <- &hashjoinWorkerResult{
err: buildErr,
}
return
} else if emptyBuild {
return
}
hasWaitedForBuild = true
}

if probeSideResult.NumRows() == 0 {
return
}

probeSideResource.dest <- probeSideResult
}
}

func (e *HashJoinV1Exec) initializeForProbe() {
// e.joinResultCh is for transmitting the join result chunks to the main
// thread.
Expand Down Expand Up @@ -291,7 +231,10 @@ func (e *HashJoinV1Exec) fetchAndProbeHashTable(ctx context.Context) {
e.initializeForProbe()
e.workerWg.RunWithRecover(func() {
defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
e.ProbeSideTupleFetcher.fetchProbeSideChunks(ctx, e.MaxChunkSize())
e.ProbeSideTupleFetcher.fetchProbeSideChunks(ctx, e.MaxChunkSize(), func() bool {
return e.ProbeSideTupleFetcher.RowContainer.Len() == uint64(0)
}, e.ProbeSideTupleFetcher.JoinType == plannercore.InnerJoin || e.ProbeSideTupleFetcher.JoinType == plannercore.SemiJoin,
false, e.ProbeSideTupleFetcher.IsOuterJoin, &e.ProbeSideTupleFetcher.hashJoinCtxBase)
}, e.ProbeSideTupleFetcher.handleProbeSideFetcherPanic)

for i := uint(0); i < e.Concurrency; i++ {
Expand Down
70 changes: 4 additions & 66 deletions pkg/executor/join/hash_join_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,71 +293,6 @@ func (fetcher *ProbeSideTupleFetcherV2) shouldLimitProbeFetchSize() bool {
return false
}

// fetchProbeSideChunks get chunks from fetches chunks from the big table in a background goroutine
// and sends the chunks to multiple channels which will be read by multiple join workers.
func (fetcher *ProbeSideTupleFetcherV2) fetchProbeSideChunks(ctx context.Context, maxChunkSize int) {
hasWaitedForBuild := false
for {
if fetcher.finished.Load() {
return
}

var probeSideResource *probeChkResource
var ok bool
select {
case <-fetcher.closeCh:
return
case probeSideResource, ok = <-fetcher.probeChkResourceCh:
if !ok {
return
}
}
probeSideResult := probeSideResource.chk
if fetcher.shouldLimitProbeFetchSize() {
required := int(atomic.LoadInt64(&fetcher.requiredRows))
probeSideResult.SetRequiredRows(required, maxChunkSize)
}
err := exec.Next(ctx, fetcher.ProbeSideExec, probeSideResult)
failpoint.Inject("ConsumeRandomPanic", nil)
if err != nil {
fetcher.joinResultCh <- &hashjoinWorkerResult{
err: err,
}
return
}
if !hasWaitedForBuild {
failpoint.Inject("issue30289", func(val failpoint.Value) {
if val.(bool) {
probeSideResult.Reset()
}
})

skipProbe, buildErr := wait4BuildSide(func() bool {
return fetcher.hashTableContext.hashTable.isHashTableEmpty()
}, fetcher.canSkipProbeIfHashTableIsEmpty, &fetcher.hashJoinCtxBase)
if buildErr != nil {
fetcher.joinResultCh <- &hashjoinWorkerResult{
err: buildErr,
}
return
} else if skipProbe {
// stop probe
if !fetcher.HashJoinCtxV2.needScanRowTableAfterProbeDone {
fetcher.finished.Store(true)
}
return
}
hasWaitedForBuild = true
}

if probeSideResult.NumRows() == 0 {
return
}

probeSideResource.dest <- probeSideResult
}
}

func (w *BuildWorkerV2) splitPartitionAndAppendToRowTable(typeCtx types.Context, srcChkCh chan *chunk.Chunk) (err error) {
cost := int64(0)
defer func() {
Expand Down Expand Up @@ -441,7 +376,10 @@ func (e *HashJoinV2Exec) fetchAndProbeHashTable(ctx context.Context) {
e.initializeForProbe()
e.workerWg.RunWithRecover(func() {
defer trace.StartRegion(ctx, "HashJoinProbeSideFetcher").End()
e.ProbeSideTupleFetcher.fetchProbeSideChunks(ctx, e.MaxChunkSize())
e.ProbeSideTupleFetcher.fetchProbeSideChunks(ctx, e.MaxChunkSize(), func() bool {
return e.ProbeSideTupleFetcher.hashTableContext.hashTable.isHashTableEmpty()
}, e.ProbeSideTupleFetcher.canSkipProbeIfHashTableIsEmpty, e.ProbeSideTupleFetcher.needScanRowTableAfterProbeDone,
e.ProbeSideTupleFetcher.needScanRowTableAfterProbeDone, &e.ProbeSideTupleFetcher.hashJoinCtxBase)
}, e.ProbeSideTupleFetcher.handleProbeSideFetcherPanic)

for i := uint(0); i < e.Concurrency; i++ {
Expand Down

0 comments on commit 8e46f8b

Please sign in to comment.