Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support window function row number #9098

Merged
merged 12 commits into from Feb 18, 2019
@@ -31,6 +31,7 @@ import (
"github.com/pingcap/tidb/distsql"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/windowfunc"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/infoschema"
@@ -1885,17 +1886,28 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
for _, item := range v.PartitionBy {
groupByItems = append(groupByItems, item.Col)
}
aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false)
resultColIdx := len(v.Schema().Columns) - 1
agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx)
if agg == nil {
b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now"))
return nil
var processor windowProcessor
if aggregation.IsAggFuncs(v.WindowFuncDesc.Name) {
aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false)
agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx)
processor = &aggWindowProcessor{
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
}
} else {
wf, err := windowfuncs.Build(b.ctx, v.WindowFuncDesc, resultColIdx)
This conversation was marked as resolved by eurekaka

This comment has been minimized.

Copy link
@eurekaka

eurekaka Jan 21, 2019

Contributor

Directly assign returned error to b.err?

if err != nil {
b.err = err
return nil
}
processor = &noFrameWindowProcessor{
windowFunc: wf,
partialResult: wf.AllocPartialResult(),
}
}
e := &WindowExec{baseExecutor: base,
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
return &WindowExec{baseExecutor: base,
processor: processor,
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
}
return e
}
@@ -17,13 +17,16 @@ import (
"context"
"time"

"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/executor/windowfunc"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now.
// WindowExec is the executor for window functions.
type WindowExec struct {
baseExecutor

@@ -32,12 +35,11 @@ type WindowExec struct {
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
executed bool
meetNewGroup bool
remainingRowsInGroup int64
remainingRowsInGroup int
remainingRowsInChunk int
processor windowProcessor
}

// Close implements the Executor Close interface.
@@ -84,7 +86,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return errors.Trace(err)
}
if e.meetNewGroup {
err := e.consumeGroupRows()
err := e.consumeGroupRows(chk)
if err != nil {
return errors.Trace(err)
}
@@ -93,6 +95,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return errors.Trace(err)
}
}
e.remainingRowsInGroup++
e.groupRows = append(e.groupRows, e.inputRow)
if e.meetNewGroup {
e.inputRow = e.inputIter.Next()
@@ -102,16 +105,19 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (e *WindowExec) consumeGroupRows() error {
func (e *WindowExec) consumeGroupRows(chk *chunk.Chunk) (err error) {
if len(e.groupRows) == 0 {
return nil
}
err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult)
e.copyChk(chk)
remained := mathutil.Min(e.remainingRowsInChunk, e.remainingRowsInGroup)
oldRemained := remained
e.groupRows, remained, err = e.processor.consumeGroupRows(e.ctx, e.groupRows, chk, remained)
if err != nil {
return errors.Trace(err)
}
e.remainingRowsInGroup += int64(len(e.groupRows))
e.groupRows = e.groupRows[:0]
e.remainingRowsInGroup -= oldRemained - remained
e.remainingRowsInChunk -= oldRemained - remained
return nil
}

@@ -121,7 +127,7 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
}

// Before fetching a new batch of input, we should consume the last group rows.
err = e.consumeGroupRows()
err = e.consumeGroupRows(chk)
if err != nil {
return errors.Trace(err)
}
@@ -145,19 +151,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
}

// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error {
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) (err error) {
e.copyChk(chk)
for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk)
if err != nil {
return err
}
e.remainingRowsInGroup--
e.remainingRowsInChunk--
remained := mathutil.Min(e.remainingRowsInChunk, e.remainingRowsInGroup)
oldRemained := remained
e.groupRows, remained, err = e.processor.appendResult2Chunk(e.ctx, e.groupRows, chk, remained)
if err != nil {
return err
}
e.remainingRowsInGroup -= oldRemained - remained
e.remainingRowsInChunk -= oldRemained - remained
if e.remainingRowsInGroup == 0 {
e.windowFunc.ResetPartialResult(e.partialResult)
e.processor.resetPartialResult()
}
return nil
}
@@ -174,3 +179,63 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) {
chk.MakeRefTo(i, childResult, col.Index)
}
}

// windowProcessor is the interface for processing different kinds of window functions.
type windowProcessor interface {
This conversation was marked as resolved by zz-jason

This comment has been minimized.

Copy link
@zz-jason

zz-jason Jan 25, 2019

Member

Please add some comments about each function in this interface. You can take https://github.com/pingcap/tidb/blob/master/executor/aggfuncs/aggfuncs.go#L92 as an example.

consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error)
appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error)
resetPartialResult()
}

type aggWindowProcessor struct {
This conversation was marked as resolved by zz-jason

This comment has been minimized.

Copy link
@zz-jason

zz-jason Feb 11, 2019

Member

how about:

  • s/aggWindowProcessor/nonStreamedWinProc/
  • s/noFrameWindowProcessor/streamedWinProc/

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 11, 2019

Author Member

Window function with frame clause is also streamed, so it is better to keep the name here?

windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
}

func (p *aggWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
err := p.windowFunc.UpdatePartialResult(ctx, rows, p.partialResult)
rows = rows[:0]
return rows, remained, err
}

func (p *aggWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
for remained > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk)
if err != nil {
return rows, remained, err
}
remained--
}
return rows, remained, nil
}

func (p *aggWindowProcessor) resetPartialResult() {
p.windowFunc.ResetPartialResult(p.partialResult)
}

type noFrameWindowProcessor struct {
windowFunc windowfuncs.WindowFunc
partialResult windowfuncs.PartialResult
}

func (p *noFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
var err error
rows, remained, err = p.windowFunc.ProcessOneChunk(ctx, rows, p.partialResult, chk, remained)
return rows, remained, err
}

func (p *noFrameWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
var err error
for remained > 0 {
rows, remained, err = p.windowFunc.ExhaustResult(ctx, rows, p.partialResult, chk, remained)
if err != nil {
return rows, remained, err
}
}
return rows, remained, err
}

func (p *noFrameWindowProcessor) resetPartialResult() {
p.windowFunc.ResetPartialResult(p.partialResult)
}
@@ -40,4 +40,9 @@ func (s *testSuite2) TestWindowFunctions(c *C) {
result.Check(testkit.Rows("21", "21", "21", "21", "21", "21", "21", "21", "21"))
result = tk.MustQuery("select _tidb_rowid, sum(t.a) over() from t")
result.Check(testkit.Rows("1 7", "2 7", "3 7"))

result = tk.MustQuery("select a, row_number() over() from t")
result.Check(testkit.Rows("1 1", "4 2", "2 3"))
result = tk.MustQuery("select a, row_number() over(partition by a) from t")
result.Check(testkit.Rows("1 1", "2 1", "4 1"))
}
@@ -0,0 +1,34 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package windowfuncs

import (
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/expression/aggregation"
"github.com/pingcap/tidb/sessionctx"
"github.com/pkg/errors"
This conversation was marked as resolved by lamxTyler

This comment has been minimized.

Copy link
@winoros

winoros Jan 24, 2019

Member

use pingcap/errors instead.

)

// Build builds window functions according to the window functions description.
func Build(sctx sessionctx.Context, windowFuncDesc *aggregation.WindowFuncDesc, ordinal int) (WindowFunc, error) {
This conversation was marked as resolved by eurekaka

This comment has been minimized.

Copy link
@eurekaka

eurekaka Jan 21, 2019

Contributor

The sctx is not used?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Jan 21, 2019

Author Member

It will be used for functions added in future.

switch windowFuncDesc.Name {
case ast.WindowFuncRowNumber:
return buildRowNumber(ordinal)
}
return nil, errors.Errorf("not supported window function %s", windowFuncDesc.Name)
}

func buildRowNumber(ordinal int) (WindowFunc, error) {
return &rowNumber{baseWindowFunc: baseWindowFunc{ordinal: ordinal}}, nil
}
@@ -0,0 +1,51 @@
// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package windowfuncs

import (
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

type rowNumber struct {
baseWindowFunc
}

type partialResult4RowNumber struct {
curIdx int64
}

func (wf *rowNumber) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, pr PartialResult, dest *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
p := (*partialResult4RowNumber)(pr)
for len(rows) > 0 && remained > 0 {
p.curIdx++
remained--
dest.AppendInt64(wf.ordinal, p.curIdx)
This conversation was marked as resolved by zz-jason

This comment has been minimized.

Copy link
@eurekaka

eurekaka Jan 21, 2019

Contributor

Hmm, can we treat row_number as a special aggregate function? we can increase the row number in its AppendFinalResult2Chunk, and reset the row number to 1 in its ResetPartialResult, then we don't need to change the executor framework for window function at all?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Jan 21, 2019

Author Member

It is for effeciency, because functions like row_number can be executed streamly. It represents a wide varity of functions, and also for the framed window functions.

This comment has been minimized.

Copy link
@eurekaka

eurekaka Jan 22, 2019

Contributor

OK, got it.

This comment has been minimized.

Copy link
@zz-jason

zz-jason Jan 28, 2019

Member

In window executor, we buffered all the rows belonging to the same partition even for row_number(), seems it is not streaming processed?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Jan 28, 2019

Author Member

No, for row_number, it buffers at most one chunk. The buffered rows are controlled by each function according to their properties.

rows = rows[1:]
This conversation was marked as resolved by eurekaka

This comment has been minimized.

Copy link
@eurekaka

eurekaka Jan 21, 2019

Contributor

We can cut the rows only once after break out from this loop?

}
return rows, remained, nil
}

func (wf *rowNumber) ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, pr PartialResult, dest *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
return rows, remained, nil
}

func (wf *rowNumber) AllocPartialResult() PartialResult {
return PartialResult(&partialResult4RowNumber{})
}

func (wf *rowNumber) ResetPartialResult(pr PartialResult) {
p := (*partialResult4RowNumber)(pr)
p.curIdx = 0
}
@@ -0,0 +1,49 @@
// Copyright 2019 PingCAP, Inc.
This conversation was marked as resolved by qw4990

This comment has been minimized.

Copy link
@qw4990

qw4990 Jan 17, 2019

Contributor

Package windowfunc should be contained by expression instead of executor?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Jan 17, 2019

Author Member

I think it is better to be in executor because it only deals with the execution stage and it is uniform with package aggfuncs.

//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package windowfuncs

import (
"unsafe"

"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

// PartialResult represents data structure to store the partial result for the
// aggregate functions. Here we use unsafe.Pointer to allow the partial result
// to be any type.
type PartialResult unsafe.Pointer
This conversation was marked as resolved by qw4990

This comment has been minimized.

Copy link
@qw4990

qw4990 Jan 17, 2019

Contributor

Why unsafe.Pointer instead of interface{}?

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Jan 17, 2019

Author Member

Because it costs less, an interface needs 16 bytes, while pointer needs 8 bytes.


// WindowFunc is the interface for processing window functions.
type WindowFunc interface {
// ProcessOneChunk processes one chunk and write results to chunk.
ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, pr PartialResult, dest *chunk.Chunk, remainedRows int) ([]chunk.Row, int, error)
This conversation was marked as resolved by zz-jason

This comment has been minimized.

Copy link
@zz-jason

zz-jason Feb 11, 2019

Member

How about changing remainedRows to numRowsToProcess and maintaining this value in the caller side? Thus the return value can be simplified to error and we don't need to care about what does remainedRows mean.

That is:

Suggested change
ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, pr PartialResult, dest *chunk.Chunk, remainedRows int) ([]chunk.Row, int, error)
ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, numRowsToProcess int, pr PartialResult, dest *chunk.Chunk) (error)

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 11, 2019

Author Member
  • It is not numRowsToProcess, but min(remainingRowsInChunk, remainingRowsInGroup) to avoid processing more rows than this chunk or group needed.
  • I think it is better to keep it here because it frees the caller to know how many rows will the callee processed and it is consistent with ExhaustResult.

This comment has been minimized.

Copy link
@zz-jason

zz-jason Feb 12, 2019

Member

How about setting numRowsToProcess to min(len(rows), remainingRowsInChunk, remainingRowsInGroup)? Thus the function signature can be simplified, and the window function implementation don't need to consider the staffs that not related to the function itself.

The number of window function implementations can be very large, especially when we need to write a different implementations for different input types of the same window function. I think it's valuable to keep the interface simpler.

From the caller's perspective, I think this interface is more friendly to the case of multiple functions inside a WindowExec. For example, in the noFrameWindowProcessor:

func (p *noFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
	numRowsToProcess := min(len(rows), remained)
	for i := range p.WindowFuncs {
		err := p.WindowFuncs[i].ProcessOneChunk(ctx, rows, numRowsToProcess, p.partialResult[i], chk)
		// handle err
	}
	return rows[numRowsToProcess:], remained-numRowsToProcess, nil
}

From the window function's perspective, it only needs to handle numRowsToProcess input rows, only change the content of output parameters, these are chk and partial result, leave the input parameters unchanged, for example, in row_number():

func (wf *rowNumber) ProcessOneChunk(sctx sessionctx.Context, rows []chunk.Row, numRowsToProcess int, pr PartialResult, dest *chunk.Chunk) error {
	p := (*partialResult4RowNumber)(pr)
	for i := 0; i < numRowsToProcess; i++ {
		p.curIdx++
		dest.AppendInt64(wf.ordinal, p.curIdx)
 	}
	return nil
}

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 12, 2019

Author Member

Not all functions perform like row_number, some functions also need to know rows before the current row, while some functions also need to know rows after the current row, like lead and lag. So it is difficult to know how many rows the function really proceed.

This comment has been minimized.

Copy link
@zz-jason

zz-jason Feb 12, 2019

Member

These infos can be stored in the partial result struct like apache hive. For example LeadBuffer: https://github.com/apache/hive/blob/master/ql/src/java/org/apache/hadoop/hive/ql/udf/generic/GenericUDAFLead.java#L81

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 12, 2019

Author Member

Then what about lag? The main problem here is you cannot know the number of rows processed without knowing the specific function.

This comment has been minimized.

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 12, 2019

Author Member

"The main problem here is you cannot know the number of rows processed without knowing the specific function.".

This comment has been minimized.

Copy link
@zz-jason

zz-jason Feb 12, 2019

Member

The partial result is binded to the specific function, each function has its own partial result.

This comment has been minimized.

Copy link
@zz-jason

zz-jason Feb 12, 2019

Member

I mean, in TiDB, LeadBuffer can be stored in the partial result of Lead()

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 12, 2019

Author Member

The question here is we cannot update the remained correctly without knowing the function. Take the previous code:

func (p *noFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, int, error) {
	numRowsToProcess := min(len(rows), remained)
	for i := range p.WindowFuncs {
		err := p.WindowFuncs[i].ProcessOneChunk(ctx, rows, numRowsToProcess, p.partialResult[i], chk)
		// handle err
	}
	return rows[numRowsToProcess:], remained-numRowsToProcess, nil
}

And assumes the function is lead, remained is 10, len(rows) is 5, and current rows is the first rows in the group, so numRowsToProcess is 5 here, but you can only process 4 rows for lead because you do not know row 6. So numRowsToProcess depends on each function.

This comment has been minimized.

Copy link
@lamxTyler

lamxTyler Feb 14, 2019

Author Member

Updated, PTAL.

// ExhaustResult exhausts result to chunk.
ExhaustResult(sctx sessionctx.Context, rows []chunk.Row, pr PartialResult, dest *chunk.Chunk, remainedRows int) ([]chunk.Row, int, error)
// AllocPartialResult allocates a specific data structure to store the partial result.
AllocPartialResult() PartialResult
// ResetPartialResult resets the partial result.
ResetPartialResult(pr PartialResult)
}

type baseWindowFunc struct {
// args stores the input arguments for an aggregate function, we should
// call arg.EvalXXX to get the actual input data for this function.
args []expression.Expression

// ordinal stores the ordinal of the columns in the output chunk, which is
// used to append the final result of this function.
ordinal int
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.