executor,planner/core,util/plancodec: extend executor.ShuffleExec and planner.core.PhysicalShuffle to support multiple data sources #20942

Merged (17 commits) on Nov 24, 2020
Changes from 11 commits
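At a glance, the change generalizes the shuffle operator from one data source to a slice of data sources: the singular `Tail`/`DataSource`/`HashByItems` fields become parallel `Tails`/`DataSources`/`HashByItemArrays` slices. A minimal sketch of that shape change, using simplified stand-in types rather than the real planner definitions:

```go
package main

import "fmt"

// Stand-ins for the real planner/core and expression types; illustrative only.
type PhysicalPlan interface{}
type Expression interface{}

// Before this PR: exactly one data source per shuffle.
type physicalShuffleBefore struct {
	Concurrency int
	Tail        PhysicalPlan
	DataSource  PhysicalPlan
	HashByItems []Expression
}

// After this PR: parallel slices, one entry per data source
// (M = 1 for window functions today, M = 2 for a future shuffled merge join).
type physicalShuffleAfter struct {
	Concurrency      int
	Tails            []PhysicalPlan
	DataSources      []PhysicalPlan
	HashByItemArrays [][]Expression
}

func main() {
	p := physicalShuffleAfter{
		Concurrency:      4,
		Tails:            make([]PhysicalPlan, 2),
		DataSources:      make([]PhysicalPlan, 2),
		HashByItemArrays: make([][]Expression, 2),
	}
	fmt.Printf("shuffle over %d data sources with concurrency %d\n", len(p.DataSources), p.Concurrency)
}
```

Entry i of each slice describes data source i, so an existing single-source caller just wraps its values in one-element slices, as the benchmark diff below shows.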
11 changes: 6 additions & 5 deletions executor/benchmark_test.go
@@ -30,6 +30,7 @@ import (
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/planner/core"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/planner/property"
"github.com/pingcap/tidb/planner/util"
"github.com/pingcap/tidb/sessionctx"
@@ -497,11 +498,11 @@ func buildWindowExecutor(ctx sessionctx.Context, windowFunc string, funcs int, f
}

plan = core.PhysicalShuffle{
Concurrency: concurrency,
Tail: tail,
DataSource: src,
SplitterType: core.PartitionHashSplitterType,
HashByItems: byItems,
Concurrency: concurrency,
Tails: []plannercore.PhysicalPlan{tail},
DataSources: []plannercore.PhysicalPlan{src},
SplitterType: core.PartitionHashSplitterType,
HashByItemArrays: [][]expression.Expression{byItems},
}.Init(ctx, nil, 0)
plan.SetChildren(win)
} else {
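With the new fields in place, a two-source shuffle (the shuffled merge join the updated doc comment in executor/shuffle.go alludes to) could plausibly be built the same way. The helper below is a hypothetical sketch, not code from this PR; its parameters (`leftSrc`, `rightSrc`, the tails, the by-item slices, `child`) are assumed to be already-constructed plans, and only the field names and the `Init(ctx, nil, 0)` call mirror what the diffs here confirm:

```go
// Hypothetical sketch only; not part of this PR. It mirrors the construction
// pattern from the benchmark above with a second data source added.
package shuffledemo

import (
	"github.com/pingcap/tidb/expression"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
)

// buildTwoSourceShuffle wires two partitioned inputs into one PhysicalShuffle.
// All plan/by-item parameters are assumed to be built elsewhere.
func buildTwoSourceShuffle(
	ctx sessionctx.Context,
	concurrency int,
	leftSrc, rightSrc, leftTail, rightTail plannercore.PhysicalPlan,
	leftByItems, rightByItems []expression.Expression,
	child plannercore.PhysicalPlan, // e.g. a join reading both partition streams
) plannercore.PhysicalPlan {
	shuffle := plannercore.PhysicalShuffle{
		Concurrency:      concurrency,
		Tails:            []plannercore.PhysicalPlan{leftTail, rightTail},
		DataSources:      []plannercore.PhysicalPlan{leftSrc, rightSrc},
		SplitterType:     plannercore.PartitionHashSplitterType,
		HashByItemArrays: [][]expression.Expression{leftByItems, rightByItems},
	}.Init(ctx, nil, 0)
	shuffle.SetChildren(child)
	return shuffle
}
```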
59 changes: 38 additions & 21 deletions executor/builder.go
@@ -216,8 +216,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
return b.buildWindow(v)
case *plannercore.PhysicalShuffle:
return b.buildShuffle(v)
case *plannercore.PhysicalShuffleDataSourceStub:
return b.buildShuffleDataSourceStub(v)
case *plannercore.PhysicalShuffleReceiverStub:
return b.buildShuffleReceiverStub(v)
case *plannercore.SQLBindPlan:
return b.buildSQLBindExec(v)
case *plannercore.SplitRegion:
@@ -3670,40 +3670,57 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec

func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleExec {
base := newBaseExecutor(b.ctx, v.Schema(), v.ID())
shuffle := &ShuffleExec{baseExecutor: base,
concurrency: v.Concurrency,
shuffle := &ShuffleExec{
baseExecutor: base,
concurrency: v.Concurrency,
}

switch v.SplitterType {
case plannercore.PartitionHashSplitterType:
shuffle.splitter = &partitionHashSplitter{
byItems: v.HashByItems,
numWorkers: shuffle.concurrency,
splitters := make([]partitionSplitter, len(v.HashByItemArrays))
for i, hashByItemArray := range v.HashByItemArrays {
hashSplitter := &partitionHashSplitter{
byItems: hashByItemArray,
numWorkers: shuffle.concurrency,
}
copy(hashSplitter.byItems, hashByItemArray)
splitters[i] = hashSplitter
}
shuffle.splitters = splitters
default:
panic("Not implemented. Should not reach here.")
}

shuffle.dataSource = b.build(v.DataSource)
if b.err != nil {
return nil
shuffle.dataSources = make([]Executor, len(v.DataSources))
for i, dataSource := range v.DataSources {
shuffle.dataSources[i] = b.build(dataSource)
if b.err != nil {
return nil
}
}

// head & tail of physical plans' chain within "partition".
var head, tail = v.Children()[0], v.Tail

head := v.Children()[0]
shuffle.workers = make([]*shuffleWorker, shuffle.concurrency)
for i := range shuffle.workers {
receivers := make([]*shuffleReceiver, len(v.DataSources))
for j, dataSource := range v.DataSources {
receivers[j] = &shuffleReceiver{
baseExecutor: newBaseExecutor(b.ctx, dataSource.Schema(), dataSource.ID()),
}
}

w := &shuffleWorker{
baseExecutor: newBaseExecutor(b.ctx, v.DataSource.Schema(), v.DataSource.ID()),
receivers: receivers,
}

stub := plannercore.PhysicalShuffleDataSourceStub{
Worker: (unsafe.Pointer)(w),
}.Init(b.ctx, v.DataSource.Stats(), v.DataSource.SelectBlockOffset(), nil)
stub.SetSchema(v.DataSource.Schema())
for j, dataSource := range v.DataSources {
stub := plannercore.PhysicalShuffleReceiverStub{
Receiver: (unsafe.Pointer)(receivers[j]),
}.Init(b.ctx, dataSource.Stats(), dataSource.SelectBlockOffset(), nil)
stub.SetSchema(dataSource.Schema())
v.Tails[j].SetChildren(stub)
}

tail.SetChildren(stub)
w.childExec = b.build(head)
if b.err != nil {
return nil
@@ -3715,8 +3732,8 @@ func (b *executorBuilder) buildShuffle(v *plannercore.PhysicalShuffle) *ShuffleE
return shuffle
}

func (b *executorBuilder) buildShuffleDataSourceStub(v *plannercore.PhysicalShuffleDataSourceStub) *shuffleWorker {
return (*shuffleWorker)(v.Worker)
func (b *executorBuilder) buildShuffleReceiverStub(v *plannercore.PhysicalShuffleReceiverStub) *shuffleReceiver {
return (*shuffleReceiver)(v.Receiver)
}

func (b *executorBuilder) buildSQLBindExec(v *plannercore.SQLBindPlan) Executor {
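The nested loops in the new `buildShuffle` are easier to see in isolation: for each of the N workers it creates M receivers (one per data source), and for each data source j it wraps receiver j in a `PhysicalShuffleReceiverStub` and splices that stub in as the child of `Tails[j]` before building the worker's child executor. Below is a toy, self-contained model of that N×M wiring; the type names are illustrative stand-ins, not the executor's real types, and the real builder additionally rebuilds the stubs and the child executor once per worker:

```go
// Toy model of the builder wiring; all names are illustrative stand-ins.
package main

import "fmt"

type receiver struct{ worker, source int }

// stub stands in for PhysicalShuffleReceiverStub: a plan node that simply
// hands back whatever its receiver gets.
type stub struct{ r *receiver }

type worker struct {
	receivers []*receiver
	tails     []*stub // in the real code: v.Tails[j].SetChildren(stub)
}

func main() {
	const numWorkers, numSources = 3, 2 // N workers, M data sources

	workers := make([]*worker, numWorkers)
	for i := range workers {
		w := &worker{
			receivers: make([]*receiver, numSources),
			tails:     make([]*stub, numSources),
		}
		for j := 0; j < numSources; j++ {
			w.receivers[j] = &receiver{worker: i, source: j}
			w.tails[j] = &stub{r: w.receivers[j]}
		}
		workers[i] = w
	}

	for i, w := range workers {
		fmt.Printf("worker %d: %d receivers, %d tail stubs\n", i, len(w.receivers), len(w.tails))
	}
}
```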
117 changes: 74 additions & 43 deletions executor/shuffle.go
@@ -29,10 +29,11 @@ import (
)

// ShuffleExec is the executor to run other executors in a parallel manner.
// 1. It fetches chunks from `DataSource`.
// 2. It splits tuples from `DataSource` into N partitions (Only "split by hash" is implemented so far).
// 3. It invokes N workers in parallel, assign each partition as input to each worker and execute child executors.
// 4. It collects outputs from each worker, then sends outputs to its parent.
// 1. It fetches chunks from M `DataSources` (the value of M depends on the actual executor, e.g. M = 1 for WindowExec, M = 2 for MergeJoinExec).
// 2. It splits tuples from each `DataSource` into N partitions (only "split by hash" is implemented so far).
// 3. It invokes N workers in parallel, each of which has M `receivers` to receive partitions from the `DataSources`.
// 4. It assigns the received partitions as input to each worker and executes child executors.
// 5. It collects outputs from each worker, then sends the outputs to its parent.
//
// +-------------+
// +-------| Main Thread |
@@ -80,8 +81,9 @@ type ShuffleExec struct {
prepared bool
executed bool

splitter partitionSplitter
dataSource Executor
// Each dataSource has a corresponding splitter.
splitters []partitionSplitter
dataSources []Executor

finishCh chan struct{}
outputCh chan *shuffleOutput
@@ -95,8 +97,11 @@ type shuffleOutput struct {

// Open implements the Executor Open interface.
func (e *ShuffleExec) Open(ctx context.Context) error {
if err := e.dataSource.Open(ctx); err != nil {
return err
for _, s := range e.dataSources {
if err := s.Open(ctx); err != nil {
return err
}
}
if err := e.baseExecutor.Open(ctx); err != nil {
return err
@@ -109,16 +114,21 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
for _, w := range e.workers {
w.finishCh = e.finishCh

w.inputCh = make(chan *chunk.Chunk, 1)
w.inputHolderCh = make(chan *chunk.Chunk, 1)
for _, r := range w.receivers {
r.inputCh = make(chan *chunk.Chunk, 1)
r.inputHolderCh = make(chan *chunk.Chunk, 1)
}

w.outputCh = e.outputCh
w.outputHolderCh = make(chan *chunk.Chunk, 1)

if err := w.childExec.Open(ctx); err != nil {
return err
}

w.inputHolderCh <- newFirstChunk(e.dataSource)
for i, r := range w.receivers {
r.inputHolderCh <- newFirstChunk(e.dataSources[i])
}
w.outputHolderCh <- newFirstChunk(e)
}

@@ -129,15 +139,19 @@ func (e *ShuffleExec) Open(ctx context.Context) error {
func (e *ShuffleExec) Close() error {
if !e.prepared {
for _, w := range e.workers {
close(w.inputHolderCh)
close(w.inputCh)
for _, r := range w.receivers {
close(r.inputHolderCh)
close(r.inputCh)
}
close(w.outputHolderCh)
}
close(e.outputCh)
}
close(e.finishCh)
for _, w := range e.workers {
for range w.inputCh {
for _, r := range w.receivers {
for range r.inputCh {
}
}
}
for range e.outputCh { // workers exit before `e.outputCh` is closed.
@@ -150,16 +164,27 @@ func (e *ShuffleExec) Close() error {
e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
}

err := e.dataSource.Close()
err1 := e.baseExecutor.Close()
if err != nil {
return errors.Trace(err)
// close dataSources
errArr := make([]error, len(e.dataSources))
for i, dataSource := range e.dataSources {
errArr[i] = dataSource.Close()
}
return errors.Trace(err1)
// check close error
for _, err := range errArr {
if err != nil {
return errors.Trace(err)
}
}
// close baseExecutor
err := e.baseExecutor.Close()
return errors.Trace(err)
}

func (e *ShuffleExec) prepare4ParallelExec(ctx context.Context) {
go e.fetchDataAndSplit(ctx)
// create a goroutine for each dataSource to fetch and split data
for i := range e.dataSources {
go e.fetchDataAndSplit(ctx, i)
}

waitGroup := &sync.WaitGroup{}
waitGroup.Add(len(e.workers))
@@ -213,25 +238,25 @@ func recoveryShuffleExec(output chan *shuffleOutput, r interface{}) {
logutil.BgLogger().Error("shuffle panicked", zap.Error(err), zap.Stack("stack"))
}

func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context, dataSourceIndex int) {
var (
err error
workerIndices []int
)
results := make([]*chunk.Chunk, len(e.workers))
chk := newFirstChunk(e.dataSource)
chk := newFirstChunk(e.dataSources[dataSourceIndex])

defer func() {
if r := recover(); r != nil {
recoveryShuffleExec(e.outputCh, r)
}
for _, w := range e.workers {
close(w.inputCh)
close(w.receivers[dataSourceIndex].inputCh)
}
}()

for {
err = Next(ctx, e.dataSource, chk)
err = Next(ctx, e.dataSources[dataSourceIndex], chk)
if err != nil {
e.outputCh <- &shuffleOutput{err: err}
return
@@ -240,7 +265,7 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
break
}

workerIndices, err = e.splitter.split(e.ctx, chk, workerIndices)
workerIndices, err = e.splitters[dataSourceIndex].split(e.ctx, chk, workerIndices)
if err != nil {
e.outputCh <- &shuffleOutput{err: err}
return
@@ -254,47 +279,40 @@ func (e *ShuffleExec) fetchDataAndSplit(ctx context.Context) {
select {
case <-e.finishCh:
return
case results[workerIdx] = <-w.inputHolderCh:
case results[workerIdx] = <-w.receivers[dataSourceIndex].inputHolderCh:
break
}
}
results[workerIdx].AppendRow(chk.GetRow(i))
if results[workerIdx].IsFull() {
w.inputCh <- results[workerIdx]
w.receivers[dataSourceIndex].inputCh <- results[workerIdx]
results[workerIdx] = nil
}
}
}
for i, w := range e.workers {
if results[i] != nil {
w.inputCh <- results[i]
w.receivers[dataSourceIndex].inputCh <- results[i]
results[i] = nil
}
}
}

var _ Executor = &shuffleWorker{}
var _ Executor = &shuffleReceiver{}

// shuffleWorker is the multi-thread worker executing child executors within "partition".
type shuffleWorker struct {
// shuffleReceiver receives chunks from its dataSource through inputCh.
type shuffleReceiver struct {
baseExecutor
childExec Executor

finishCh <-chan struct{}
executed bool

// Workers get inputs from dataFetcherThread by `inputCh`,
// and output results to main thread by `outputCh`.
// `inputHolderCh` and `outputHolderCh` are "Chunk Holder" channels of `inputCh` and `outputCh` respectively,
// which give the `*Chunk` back, to implement the data transport in a streaming manner.
inputCh chan *chunk.Chunk
inputHolderCh chan *chunk.Chunk
outputCh chan *shuffleOutput
outputHolderCh chan *chunk.Chunk
inputCh chan *chunk.Chunk
inputHolderCh chan *chunk.Chunk
}

// Open implements the Executor Open interface.
func (e *shuffleWorker) Open(ctx context.Context) error {
func (e *shuffleReceiver) Open(ctx context.Context) error {
if err := e.baseExecutor.Open(ctx); err != nil {
return err
}
@@ -303,13 +321,13 @@ func (e *shuffleWorker) Open(ctx context.Context) error {
}

// Close implements the Executor Close interface.
func (e *shuffleWorker) Close() error {
func (e *shuffleReceiver) Close() error {
return errors.Trace(e.baseExecutor.Close())
}

// Next implements the Executor Next interface.
// It is called by `Tail` executor within "shuffle", to fetch data from `DataSource` by `inputCh`.
func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error {
func (e *shuffleReceiver) Next(ctx context.Context, req *chunk.Chunk) error {
req.Reset()
if e.executed {
return nil
@@ -329,6 +347,19 @@ func (e *shuffleWorker) Next(ctx context.Context, req *chunk.Chunk) error {
}
}

// shuffleWorker is the multi-thread worker executing child executors within "partition".
type shuffleWorker struct {
childExec Executor

finishCh <-chan struct{}

// Each receiver corresponds to one dataSource.
receivers []*shuffleReceiver

outputCh chan *shuffleOutput
outputHolderCh chan *chunk.Chunk
}

func (e *shuffleWorker) run(ctx context.Context, waitGroup *sync.WaitGroup) {
defer func() {
if r := recover(); r != nil {
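At runtime, the rewritten executor/shuffle.go starts one fetch goroutine per data source (`fetchDataAndSplit(ctx, i)`); each goroutine hash-splits its chunks across the N workers' per-source input channels, and each worker consumes from its M receivers. The program below is a toy, self-contained model of just that channel wiring, under the simplifying assumptions that rows are ints and the splitter is a round-robin stand-in; the real executor also recycles chunks through holder channels and reports errors via `outputCh`:

```go
// Toy model of the runtime data flow; rows are ints and the "splitter" is a
// simple modulo, standing in for partitionHashSplitter.
package main

import (
	"fmt"
	"sync"
)

const (
	numSources = 2 // M
	numWorkers = 3 // N
)

func main() {
	// inputCh[w][s] carries the rows of source s that hash to worker w,
	// like shuffleReceiver.inputCh in the real executor.
	inputCh := make([][]chan int, numWorkers)
	for w := range inputCh {
		inputCh[w] = make([]chan int, numSources)
		for s := range inputCh[w] {
			inputCh[w][s] = make(chan int, 1)
		}
	}

	// One fetcher per data source, mirroring fetchDataAndSplit(ctx, i).
	for s := 0; s < numSources; s++ {
		go func(s int) {
			for row := 0; row < 9; row++ {
				w := row % numWorkers // stand-in for the hash splitter
				inputCh[w][s] <- row
			}
			// Closing the per-source channels signals EOF to every worker's receiver.
			for w := 0; w < numWorkers; w++ {
				close(inputCh[w][s])
			}
		}(s)
	}

	// N workers, each draining its M per-source channels.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for w := 0; w < numWorkers; w++ {
		go func(w int) {
			defer wg.Done()
			for s := 0; s < numSources; s++ {
				for row := range inputCh[w][s] {
					fmt.Printf("worker %d got row %d from source %d\n", w, row, s)
				}
			}
		}(w)
	}
	wg.Wait()
}
```

Closing each per-source channel once its fetcher finishes is what lets the corresponding receiver report EOF to the worker's child executor, mirroring the `close(w.receivers[dataSourceIndex].inputCh)` in the deferred block above.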
7 changes: 6 additions & 1 deletion planner/core/explain.go
@@ -788,8 +788,13 @@ func (p *PhysicalWindow) ExplainInfo() string {

// ExplainInfo implements Plan interface.
func (p *PhysicalShuffle) ExplainInfo() string {
explainIds := make([]fmt.Stringer, len(p.DataSources))
for i := range p.DataSources {
explainIds[i] = p.DataSources[i].ExplainID()
}

buffer := bytes.NewBufferString("")
fmt.Fprintf(buffer, "execution info: concurrency:%v, data source:%v", p.Concurrency, p.DataSource.ExplainID())
fmt.Fprintf(buffer, "execution info: concurrency:%v, data sources:%v", p.Concurrency, explainIds)
return buffer.String()
}

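Finally, the `ExplainInfo` change means a multi-source shuffle now lists every source ID. A minimal sketch of the resulting format (the operator IDs here are made up for illustration; the real ones come from `p.DataSources[i].ExplainID()`):

```go
package main

import (
	"bytes"
	"fmt"
)

func main() {
	concurrency := 4
	// Stand-ins for p.DataSources[i].ExplainID(); real IDs come from the plan tree.
	explainIDs := []string{"Sort_13", "Sort_14"}

	buffer := bytes.NewBufferString("")
	fmt.Fprintf(buffer, "execution info: concurrency:%v, data sources:%v", concurrency, explainIDs)
	fmt.Println(buffer.String())
	// Prints: execution info: concurrency:4, data sources:[Sort_13 Sort_14]
}
```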