New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support parallel partition for HashJoinExec #7911

Merged
merged 27 commits into from Jan 3, 2019

Conversation

Projects
None yet
4 participants
@XuHuaiyu
Copy link
Contributor

XuHuaiyu commented Oct 16, 2018

What problem does this PR solve?

Support parallel partition phase for Radix Hash Join.

What is changed and how it works?

Add parallel partition phase for the inner relation in HashJoinExec.
We pre-calculate and pre-allocate the memory for every row in HashJoinExec.innerResult, and parallel copy the rows to the specified location.

To accomplish the above work, we introduced a struct type partition = *chunk.List to store the sub-relations of inner relation or outer relation after partition phase, and we also introduced a struct ptr4Partition to store the address of every row in HashJoinExec.innerResults.

The bench result for parallel partition is listed in this comment.

Check List

Tests

  • Unit test

Code changes

  • Has exported function/method change

Side effects

  • Increased code complexity

Related changes

none


This change is Reviewable

@XuHuaiyu XuHuaiyu added the status/WIP label Oct 16, 2018

@XuHuaiyu XuHuaiyu force-pushed the XuHuaiyu:partition branch 3 times, most recently from 8e76882 to 0da5fa6 Oct 18, 2018

@XuHuaiyu XuHuaiyu changed the title [WIP] executor: support parallel partition for HashJoinExec executor: support parallel partition for HashJoinExec Oct 19, 2018

@XuHuaiyu XuHuaiyu force-pushed the XuHuaiyu:partition branch from a9871c2 to 9b3b289 Oct 22, 2018

XuHuaiyu added some commits Oct 23, 2018

@XuHuaiyu

This comment has been minimized.

Copy link
Contributor

XuHuaiyu commented Oct 23, 2018

Show resolved Hide resolved executor/join.go Outdated
Show resolved Hide resolved executor/join.go
Show resolved Hide resolved executor/join.go Outdated
Show resolved Hide resolved executor/join.go Outdated
Show resolved Hide resolved executor/join.go
@XuHuaiyu

This comment has been minimized.

Copy link
Contributor

XuHuaiyu commented Nov 12, 2018

PTAL @zz-jason

Show resolved Hide resolved executor/pkg_test.go Outdated
Show resolved Hide resolved executor/builder.go Outdated
Show resolved Hide resolved executor/join.go
Show resolved Hide resolved executor/join.go Outdated
Show resolved Hide resolved executor/pkg_test.go
Show resolved Hide resolved executor/pkg_test.go Outdated
@winoros

This comment has been minimized.

Copy link
Member

winoros commented Nov 15, 2018

What's the benchmark result?
Also please fill the description of this pr.

@XuHuaiyu

This comment has been minimized.

Copy link
Contributor

XuHuaiyu commented Nov 16, 2018

The bench test is used to benchmark the time consumption of the partition phase.

// the inner relation.
// TODO: we need to evaluate the skewness for the partitions size, if the
// skewness exceeds a threshold, we do not use partition phase.
func (e *HashJoinExec) preAlloc4InnerParts() (err error) {

This comment has been minimized.

@eurekaka

eurekaka Nov 22, 2018

Contributor

Seems this function should be the most time-consuming/bottleneck part of the partition phase, can we have a result about the time percentage of preAlloc4Innerparts and doInnerPartition in the whole partition process respectively?

This comment has been minimized.

@XuHuaiyu

XuHuaiyu Dec 13, 2018

Contributor

The time-consumption is about 1:2 ~ 1:1.5 for (preAlloc4InnerParts : doInnerPartition).

go tool pprof cpu.prof
Type: cpu
Time: Dec 13, 2018 at 7:58pm (CST)
Duration: 7.40s, Total samples = 12.29s (166.07%)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) list partitionInnerRows
Total: 12.29s
ROUTINE ======================== github.com/pingcap/tidb/executor.(*HashJoinExec).partitionInnerRows in /Users/xuhuaiyu/Development/GOPATH/src/github.com/pingcap/tidb/executor/join.go
         0      1.51s (flat, cum) 12.29% of Total
         .          .    306:}
         .          .    307:
         .          .    308:// partitionInnerRows re-order e.innerResults into sub-relations.
         .          .    309:func (e *HashJoinExec) partitionInnerRows() error {
         .          .    310:   t1 := time.Now()
         .      1.50s    311:   if err := e.preAlloc4InnerParts(); err != nil {
         .          .    312:           return err
         .          .    313:   }
         .       10ms    314:   logrus.Warning("PreAlloc:", time.Since(t1))
         .          .    315:   wg := &sync.WaitGroup{}
         .          .    316:   defer wg.Wait()
         .          .    317:   for i := 0; i < e.partConcurrency; i++ {
         .          .    318:           wg.Add(1)
         .          .    319:           workerID := i
ROUTINE ======================== github.com/pingcap/tidb/executor.(*HashJoinExec).partitionInnerRows.func1 in /Users/xuhuaiyu/Development/GOPATH/src/github.com/pingcap/tidb/executor/join.go
         0      2.33s (flat, cum) 18.96% of Total
         .          .    316:   defer wg.Wait()
         .          .    317:   for i := 0; i < e.partConcurrency; i++ {
         .          .    318:           wg.Add(1)
         .          .    319:           workerID := i
         .          .    320:           go util.WithRecovery(func() {
         .      2.33s    321:                   e.doInnerPartition(workerID, wg)
         .          .    322:           }, nil)
         .          .    323:   }
         .          .    324:   return nil
         .          .    325:}
         .          .    326:

This comment has been minimized.

@eurekaka

eurekaka Dec 18, 2018

Contributor

What is the number of e.partConcurrency used in this benchmark?

This comment has been minimized.

@eurekaka

eurekaka Dec 24, 2018

Contributor

@XuHuaiyu I am still curious about the number of e.partConcurrency used here.

Show resolved Hide resolved executor/join.go Outdated
@XuHuaiyu

This comment has been minimized.

Copy link
Contributor

XuHuaiyu commented Dec 13, 2018

@pingcap pingcap deleted a comment from ciscoxll Dec 21, 2018

Show resolved Hide resolved executor/pkg_test.go Outdated
Show resolved Hide resolved executor/pkg_test.go
Show resolved Hide resolved executor/pkg_test.go Outdated
Show resolved Hide resolved util/chunk/chunk.go
Show resolved Hide resolved executor/join.go Outdated

XuHuaiyu added some commits Dec 28, 2018

@eurekaka
Copy link
Contributor

eurekaka left a comment

LGTM

Show resolved Hide resolved executor/join.go
Show resolved Hide resolved executor/join.go Outdated
Show resolved Hide resolved executor/join.go
Show resolved Hide resolved executor/join.go
Show resolved Hide resolved executor/join.go Outdated
Show resolved Hide resolved executor/join.go Outdated
@@ -347,6 +351,9 @@ func (e *HashJoinExec) doInnerPartition(workerID int, wg *sync.WaitGroup) {
chk := e.innerResult.GetChunk(chkIdx)
for srcRowIdx, partPtr := range e.innerRowPrts[chkIdx] {
partIdx, destRowIdx := partPtr.partitionIdx, partPtr.rowIdx
if partIdx == math.MaxUint32 && destRowIdx == math.MaxUint32 {

This comment has been minimized.

@zz-jason

zz-jason Jan 2, 2019

Member

add a comment for this special case.

@XuHuaiyu XuHuaiyu force-pushed the XuHuaiyu:partition branch from b830650 to 7619210 Jan 3, 2019

@@ -105,6 +106,9 @@ type partRowPtr struct {
rowIdx uint32
}

// PartPtr4NullKey indicates a partition pointer which points to a row with null-join-key.
var PartPtr4NullKey = partRowPtr{math.MaxUint32, math.MaxUint32}

This comment has been minimized.

@zz-jason

zz-jason Jan 3, 2019

Member

no no need to export this variable?

wg := &sync.WaitGroup{}
defer wg.Wait()
wg.Add(int(e.concurrency))
e.partWorkerWaitGroup = sync.WaitGroup{}

This comment has been minimized.

@zz-jason

zz-jason Jan 3, 2019

Member

I think we don't need to make it to be a member field of HashJoinExec. Why not keep the old way?

go util.WithRecovery(func() {
	defer wg.Done()
	e.doInnerPartition(workerID)
}, e.finishPartitionInnerRows)

This comment has been minimized.

@XuHuaiyu

XuHuaiyu Jan 3, 2019

Contributor

wg.Done is moved to finishPartitionInnerRows, so I make it as a member variable

return nil
}

func (e *HashJoinExec) finishPartitionInnerRows(r interface{}) {

This comment has been minimized.

@zz-jason

zz-jason Jan 3, 2019

Member

how about s/finishPartitionInnerRows/handlePartitionPanic/?

@XuHuaiyu XuHuaiyu requested a review from zz-jason Jan 3, 2019

@XuHuaiyu

This comment has been minimized.

Copy link
Contributor

XuHuaiyu commented Jan 3, 2019

PTAL @zz-jason

@zz-jason
Copy link
Member

zz-jason left a comment

LGTM

@XuHuaiyu

This comment has been minimized.

Copy link
Contributor

XuHuaiyu commented Jan 3, 2019

@zz-jason

This comment has been minimized.

Copy link
Member

zz-jason commented Jan 3, 2019

/run-all-tests

@zz-jason zz-jason added status/LGT2 and removed status/LGT1 labels Jan 3, 2019

@winoros winoros merged commit 18d75a3 into pingcap:master Jan 3, 2019

12 checks passed

ci/circleci Your tests passed on CircleCI!
Details
continuous-integration/travis-ci/pr The Travis CI build passed
Details
idc-jenkins-ci-tidb/build Jenkins job succeeded.
Details
idc-jenkins-ci-tidb/common-test job succeeded
Details
idc-jenkins-ci-tidb/integration-common-test Jenkins job succeeded.
Details
idc-jenkins-ci-tidb/integration-compatibility-test Jenkins job succeeded.
Details
idc-jenkins-ci-tidb/integration-ddl-test Jenkins job succeeded.
Details
idc-jenkins-ci-tidb/mybatis-test job succeeded
Details
idc-jenkins-ci-tidb/sqllogic-test-1 Jenkins job succeeded.
Details
idc-jenkins-ci-tidb/sqllogic-test-2 Jenkins job succeeded.
Details
idc-jenkins-ci-tidb/unit-test Jenkins job succeeded.
Details
license/cla Contributor License Agreement is signed.
Details
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment