Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: control Chunk size for Selection&Projection #10110

Merged
merged 2 commits into from Apr 11, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion executor/executor.go
Expand Up @@ -891,7 +891,7 @@ func (e *SelectionExec) Next(ctx context.Context, chk *chunk.Chunk) error {
if !e.selected[e.inputRow.Idx()] {
continue
}
if chk.NumRows() >= chk.Capacity() {
if chk.IsFull() {
return nil
}
chk.AppendRow(e.inputRow)
Expand Down
226 changes: 223 additions & 3 deletions executor/executor_required_rows_test.go
Expand Up @@ -18,9 +18,11 @@ import (
"fmt"
"math"
"math/rand"
"time"

"github.com/cznic/mathutil"
. "github.com/pingcap/check"
"github.com/pingcap/parser/ast"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
Expand All @@ -40,6 +42,15 @@ type requiredRowsDataSource struct {

expectedRowsRet []int
numNextCalled int

generator func(valType *types.FieldType) interface{}
}

func newRequiredRowsDataSourceWithGenerator(ctx sessionctx.Context, totalRows int, expectedRowsRet []int,
gen func(valType *types.FieldType) interface{}) *requiredRowsDataSource {
ds := newRequiredRowsDataSource(ctx, totalRows, expectedRowsRet)
ds.generator = gen
return ds
}

func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRowsRet []int) *requiredRowsDataSource {
Expand All @@ -51,7 +62,7 @@ func newRequiredRowsDataSource(ctx sessionctx.Context, totalRows int, expectedRo
}
schema := expression.NewSchema(cols...)
baseExec := newBaseExecutor(ctx, schema, "")
return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0}
return &requiredRowsDataSource{baseExec, totalRows, 0, ctx, expectedRowsRet, 0, defaultGenerator}
}

func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) error {
Expand Down Expand Up @@ -79,12 +90,12 @@ func (r *requiredRowsDataSource) Next(ctx context.Context, req *chunk.Chunk) err
func (r *requiredRowsDataSource) genOneRow() chunk.Row {
row := chunk.MutRowFromTypes(r.retTypes())
for i := range r.retTypes() {
row.SetValue(i, r.genValue(r.retTypes()[i]))
row.SetValue(i, r.generator(r.retTypes()[i]))
}
return row.ToRow()
}

func (r *requiredRowsDataSource) genValue(valType *types.FieldType) interface{} {
func defaultGenerator(valType *types.FieldType) interface{} {
switch valType.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
return int64(rand.Int())
Expand Down Expand Up @@ -167,6 +178,7 @@ func (s *testExecSuite) TestLimitRequiredRows(c *C) {
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}
Expand Down Expand Up @@ -248,6 +260,7 @@ func (s *testExecSuite) TestSortRequiredRows(c *C) {
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}
Expand Down Expand Up @@ -354,6 +367,7 @@ func (s *testExecSuite) TestTopNRequiredRows(c *C) {
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}
Expand All @@ -369,3 +383,209 @@ func buildTopNExec(ctx sessionctx.Context, offset, count int, byItems []*planner
limit: &plannercore.PhysicalLimit{Count: uint64(count), Offset: uint64(offset)},
}
}

func (s *testExecSuite) TestSelectionRequiredRows(c *C) {
gen01 := func() func(valType *types.FieldType) interface{} {
closureCount := 0
return func(valType *types.FieldType) interface{} {
switch valType.Tp {
case mysql.TypeLong, mysql.TypeLonglong:
ret := int64(closureCount % 2)
closureCount++
return ret
case mysql.TypeDouble:
return rand.Float64()
default:
panic("not implement")
}
}
}

maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
filtersOfCol1 int
requiredRows []int
expectedRows []int
expectedRowsDS []int
gen func(valType *types.FieldType) interface{}
}{
{
totalRows: 20,
requiredRows: []int{1, 2, 3, 4, 5, 20},
expectedRows: []int{1, 2, 3, 4, 5, 5},
expectedRowsDS: []int{20, 0},
},
{
totalRows: 20,
filtersOfCol1: 0,
requiredRows: []int{1, 3, 5, 7, 9},
expectedRows: []int{1, 3, 5, 1, 0},
expectedRowsDS: []int{20, 0, 0},
gen: gen01(),
},
{
totalRows: maxChunkSize + 20,
filtersOfCol1: 1,
requiredRows: []int{1, 3, 5, maxChunkSize},
expectedRows: []int{1, 3, 5, maxChunkSize/2 - 1 - 3 - 5 + 10},
expectedRowsDS: []int{maxChunkSize, 20, 0},
gen: gen01(),
},
}

for _, testCase := range testCases {
sctx := defaultCtx()
ctx := context.Background()
var filters []expression.Expression
var ds *requiredRowsDataSource
if testCase.gen == nil {
// ignore filters
ds = newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
} else {
ds = newRequiredRowsDataSourceWithGenerator(sctx, testCase.totalRows, testCase.expectedRowsDS, testCase.gen)
f, err := expression.NewFunction(
sctx, ast.EQ, types.NewFieldType(byte(types.ETInt)), ds.Schema().Columns[1], &expression.Constant{
Value: types.NewDatum(testCase.filtersOfCol1),
RetType: types.NewFieldType(mysql.TypeTiny),
})
c.Assert(err, IsNil)
filters = append(filters, f)
}
exec := buildSelectionExec(sctx, filters, ds)
c.Assert(exec.Open(ctx), IsNil)
chk := exec.newFirstChunk()
for i := range testCase.requiredRows {
chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}

func buildSelectionExec(ctx sessionctx.Context, filters []expression.Expression, src Executor) Executor {
return &SelectionExec{
baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src),
filters: filters,
}
}

func (s *testExecSuite) TestProjectionUnparallelRequiredRows(c *C) {
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
requiredRows []int
expectedRows []int
expectedRowsDS []int
}{
{
totalRows: 20,
requiredRows: []int{1, 3, 5, 7, 9},
expectedRows: []int{1, 3, 5, 7, 4},
expectedRowsDS: []int{1, 3, 5, 7, 4},
},
{
totalRows: maxChunkSize + 10,
requiredRows: []int{1, 3, 5, 7, 9, maxChunkSize},
expectedRows: []int{1, 3, 5, 7, 9, maxChunkSize - 1 - 3 - 5 - 7 - 9 + 10},
expectedRowsDS: []int{1, 3, 5, 7, 9, maxChunkSize - 1 - 3 - 5 - 7 - 9 + 10},
},
{
totalRows: maxChunkSize*2 + 10,
requiredRows: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10},
expectedRows: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10 - 1 - 7 - 9},
expectedRowsDS: []int{1, 7, 9, maxChunkSize, maxChunkSize + 10 - 1 - 7 - 9},
},
}

for _, testCase := range testCases {
sctx := defaultCtx()
ctx := context.Background()
ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
exprs := make([]expression.Expression, 0, len(ds.Schema().Columns))
if len(exprs) == 0 {
for _, col := range ds.Schema().Columns {
exprs = append(exprs, col)
}
}
exec := buildProjectionExec(sctx, exprs, ds, 0)
c.Assert(exec.Open(ctx), IsNil)
chk := exec.newFirstChunk()
for i := range testCase.requiredRows {
chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}

func (s *testExecSuite) TestProjectionParallelRequiredRows(c *C) {
maxChunkSize := defaultCtx().GetSessionVars().MaxChunkSize
testCases := []struct {
totalRows int
numWorkers int
requiredRows []int
expectedRows []int
expectedRowsDS []int
}{
{
totalRows: 20,
numWorkers: 1,
requiredRows: []int{1, 2, 3, 4, 5, 6, 1, 1},
expectedRows: []int{1, 1, 2, 3, 4, 5, 4, 0},
expectedRowsDS: []int{1, 1, 2, 3, 4, 5, 4, 0},
},
{
totalRows: maxChunkSize * 2,
numWorkers: 1,
requiredRows: []int{7, maxChunkSize, maxChunkSize, maxChunkSize},
expectedRows: []int{7, 7, maxChunkSize, maxChunkSize - 14},
expectedRowsDS: []int{7, 7, maxChunkSize, maxChunkSize - 14, 0},
},
{
totalRows: 20,
numWorkers: 2,
requiredRows: []int{1, 2, 3, 4, 5, 6, 1, 1, 1},
expectedRows: []int{1, 1, 1, 2, 3, 4, 5, 3, 0},
expectedRowsDS: []int{1, 1, 1, 2, 3, 4, 5, 3, 0},
},
}

for _, testCase := range testCases {
sctx := defaultCtx()
ctx := context.Background()
ds := newRequiredRowsDataSource(sctx, testCase.totalRows, testCase.expectedRowsDS)
exprs := make([]expression.Expression, 0, len(ds.Schema().Columns))
if len(exprs) == 0 {
for _, col := range ds.Schema().Columns {
exprs = append(exprs, col)
}
}
exec := buildProjectionExec(sctx, exprs, ds, testCase.numWorkers)
c.Assert(exec.Open(ctx), IsNil)
chk := exec.newFirstChunk()
for i := range testCase.requiredRows {
chk.SetRequiredRows(testCase.requiredRows[i], maxChunkSize)
c.Assert(exec.Next(ctx, chk), IsNil)
c.Assert(chk.NumRows(), Equals, testCase.expectedRows[i])

// wait projectionInputFetcher blocked on fetching data
// from child in the background.
time.Sleep(time.Millisecond * 5)
}
c.Assert(exec.Close(), IsNil)
c.Assert(ds.checkNumNextCalled(), IsNil)
}
}

func buildProjectionExec(ctx sessionctx.Context, exprs []expression.Expression, src Executor, numWorkers int) Executor {
return &ProjectionExec{
baseExecutor: newBaseExecutor(ctx, src.Schema(), "", src),
numWorkers: int64(numWorkers),
evaluatorSuit: expression.NewEvaluatorSuite(exprs, false),
}
}
16 changes: 16 additions & 0 deletions executor/projection.go
Expand Up @@ -16,6 +16,7 @@ package executor
import (
"context"
"fmt"
"sync/atomic"
"time"

"github.com/pingcap/errors"
Expand Down Expand Up @@ -63,6 +64,13 @@ type ProjectionExec struct {
numWorkers int64
workers []*projectionWorker
childResult *chunk.Chunk

// parentReqRows indicates how many rows the parent executor is
// requiring. It is set when parallelExecute() is called and used by the
// concurrent projectionInputFetcher.
//
// NOTE: It should be protected by atomic operations.
parentReqRows int64
}

// Open implements the Executor Open interface.
Expand All @@ -72,6 +80,7 @@ func (e *ProjectionExec) Open(ctx context.Context) error {
}

e.prepared = false
e.parentReqRows = int64(e.maxChunkSize)

// For now a Projection can not be executed vectorially only because it
// contains "SetVar" or "GetVar" functions, in this scenario this
Expand Down Expand Up @@ -162,6 +171,8 @@ func (e *ProjectionExec) isUnparallelExec() bool {
}

func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk) error {
// push requiredRows down
e.childResult.SetRequiredRows(chk.RequiredRows(), e.maxChunkSize)
err := e.children[0].Next(ctx, e.childResult)
if err != nil {
return errors.Trace(err)
Expand All @@ -171,6 +182,7 @@ func (e *ProjectionExec) unParallelExecute(ctx context.Context, chk *chunk.Chunk
}

func (e *ProjectionExec) parallelExecute(ctx context.Context, chk *chunk.Chunk) error {
atomic.StoreInt64(&e.parentReqRows, int64(chk.RequiredRows()))
if !e.prepared {
e.prepare(ctx)
e.prepared = true
Expand All @@ -197,6 +209,7 @@ func (e *ProjectionExec) prepare(ctx context.Context) {

// Initialize projectionInputFetcher.
e.fetcher = projectionInputFetcher{
proj: e,
child: e.children[0],
globalFinishCh: e.finishCh,
globalOutputCh: e.outputCh,
Expand Down Expand Up @@ -249,6 +262,7 @@ func (e *ProjectionExec) Close() error {
}

type projectionInputFetcher struct {
proj *ProjectionExec
child Executor
globalFinishCh <-chan struct{}
globalOutputCh chan<- *projectionOutput
Expand Down Expand Up @@ -290,6 +304,8 @@ func (f *projectionInputFetcher) run(ctx context.Context) {

f.globalOutputCh <- output

requiredRows := atomic.LoadInt64(&f.proj.parentReqRows)
input.chk.SetRequiredRows(int(requiredRows), f.proj.maxChunkSize)
err := f.child.Next(ctx, input.chk)
if err != nil || input.chk.NumRows() == 0 {
output.done <- errors.Trace(err)
Expand Down