Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support row framed window functions #9358

Merged
merged 3 commits into from
Feb 21, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 16 additions & 8 deletions executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1905,14 +1905,22 @@ func (b *executorBuilder) buildWindow(v *plannercore.PhysicalWindow) *WindowExec
aggDesc := aggregation.NewAggFuncDesc(b.ctx, v.WindowFuncDesc.Name, v.WindowFuncDesc.Args, false)
resultColIdx := len(v.Schema().Columns) - 1
agg := aggfuncs.Build(b.ctx, aggDesc, resultColIdx)
if agg == nil {
b.err = errors.Trace(errors.New("window evaluator only support aggregation functions without frame now"))
return nil
var processor windowProcessor
if v.Frame == nil {
processor = &aggWindowProcessor{
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
}
} else {
processor = &rowFrameWindowProcessor{
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
start: v.Frame.Start,
end: v.Frame.End,
}
}
e := &WindowExec{baseExecutor: base,
windowFunc: agg,
partialResult: agg.AllocPartialResult(),
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
return &WindowExec{baseExecutor: base,
processor: processor,
groupChecker: newGroupChecker(b.ctx.GetSessionVars().StmtCtx, groupByItems),
}
return e
}
167 changes: 149 additions & 18 deletions executor/window.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ import (
"context"
"time"

"github.com/cznic/mathutil"
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/parser/ast"
"github.com/pingcap/tidb/executor/aggfuncs"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/util/chunk"
)

// WindowExec is the executor for window functions. Note that it only supports aggregation without frame clause now.
// WindowExec is the executor for window functions.
type WindowExec struct {
baseExecutor

Expand All @@ -32,12 +36,11 @@ type WindowExec struct {
inputRow chunk.Row
groupRows []chunk.Row
childResults []*chunk.Chunk
windowFunc aggfuncs.AggFunc
partialResult aggfuncs.PartialResult
executed bool
meetNewGroup bool
remainingRowsInGroup int64
remainingRowsInGroup int
remainingRowsInChunk int
processor windowProcessor
}

// Close implements the Executor Close interface.
Expand Down Expand Up @@ -93,6 +96,7 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return errors.Trace(err)
}
}
e.remainingRowsInGroup++
e.groupRows = append(e.groupRows, e.inputRow)
if e.meetNewGroup {
e.inputRow = e.inputIter.Next()
Expand All @@ -102,16 +106,14 @@ func (e *WindowExec) consumeOneGroup(ctx context.Context, chk *chunk.Chunk) erro
return nil
}

func (e *WindowExec) consumeGroupRows() error {
func (e *WindowExec) consumeGroupRows() (err error) {
if len(e.groupRows) == 0 {
return nil
}
err := e.windowFunc.UpdatePartialResult(e.ctx, e.groupRows, e.partialResult)
e.groupRows, err = e.processor.consumeGroupRows(e.ctx, e.groupRows)
if err != nil {
return errors.Trace(err)
}
e.remainingRowsInGroup += int64(len(e.groupRows))
e.groupRows = e.groupRows[:0]
return nil
}

Expand Down Expand Up @@ -145,19 +147,18 @@ func (e *WindowExec) fetchChildIfNecessary(ctx context.Context, chk *chunk.Chunk
}

// appendResult2Chunk appends result of the window function to the result chunk.
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) error {
func (e *WindowExec) appendResult2Chunk(chk *chunk.Chunk) (err error) {
e.copyChk(chk)
for e.remainingRowsInGroup > 0 && e.remainingRowsInChunk > 0 {
// TODO: We can extend the agg func interface to avoid the `for` loop here.
err := e.windowFunc.AppendFinalResult2Chunk(e.ctx, e.partialResult, chk)
if err != nil {
return err
}
e.remainingRowsInGroup--
e.remainingRowsInChunk--
remained := mathutil.Min(e.remainingRowsInChunk, e.remainingRowsInGroup)
e.groupRows, err = e.processor.appendResult2Chunk(e.ctx, e.groupRows, chk, remained)
if err != nil {
return err
}
e.remainingRowsInGroup -= remained
e.remainingRowsInChunk -= remained
if e.remainingRowsInGroup == 0 {
e.windowFunc.ResetPartialResult(e.partialResult)
e.processor.resetPartialResult()
e.groupRows = e.groupRows[:0]
}
return nil
}
Expand All @@ -174,3 +175,133 @@ func (e *WindowExec) copyChk(chk *chunk.Chunk) {
chk.MakeRefTo(i, childResult, col.Index)
}
}

// windowProcessor is the interface for processing different kinds of windows.
// WindowExec delegates to it for each partition: it hands over the buffered
// partition rows, asks for results to be appended to the output chunk, and
// resets it once every row of the partition has been emitted. The row slice
// returned by consumeGroupRows and appendResult2Chunk is stored back by
// WindowExec as its buffered partition rows.
type windowProcessor interface {
qw4990 marked this conversation as resolved.
Show resolved Hide resolved
// consumeGroupRows updates the result for a window function using the input rows
// which belong to the same partition.
consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error)
// appendResult2Chunk appends the final results to chunk.
// It is called when there are no more rows in current partition.
// remained is the number of results to append in this call.
appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error)
// resetPartialResult resets the partial result to the original state for a specific window function.
resetPartialResult()
}

// aggWindowProcessor is the windowProcessor used when the window has no frame
// clause: the aggregate is computed once over all rows of a partition and the
// same final result is emitted for every row of that partition.
type aggWindowProcessor struct {
// windowFunc is the aggregate function evaluated over the partition.
windowFunc aggfuncs.AggFunc
// partialResult is the accumulator passed to windowFunc; it is reset between partitions.
partialResult aggfuncs.PartialResult
}

// consumeGroupRows folds rows into the partial result of the aggregate and
// hands back the same slice truncated to length zero, so the caller can reuse
// its backing storage. The truncated slice is returned even when the update
// reports an error.
func (p *aggWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) {
	updateErr := p.windowFunc.UpdatePartialResult(ctx, rows, p.partialResult)
	return rows[:0], updateErr
}

// appendResult2Chunk writes the final aggregate value to chk `remained` times,
// once per output row. rows is passed through untouched. It stops at the first
// error and returns it together with rows.
func (p *aggWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) {
	// TODO: We can extend the agg func interface to avoid the `for` loop here.
	for emitted := 0; emitted < remained; emitted++ {
		if err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk); err != nil {
			return rows, err
		}
	}
	return rows, nil
}

// resetPartialResult resets the aggregate's partial result so the processor
// can start accumulating the next partition.
func (p *aggWindowProcessor) resetPartialResult() {
p.windowFunc.ResetPartialResult(p.partialResult)
}

// rowFrameWindowProcessor is the windowProcessor used when the window has a
// frame clause. For every row of a partition it evaluates the aggregate over
// the slice of partition rows selected by the start and end frame bounds.
type rowFrameWindowProcessor struct {
// windowFunc is the aggregate function evaluated over each frame.
windowFunc aggfuncs.AggFunc
// partialResult is the accumulator passed to windowFunc.
partialResult aggfuncs.PartialResult
// start is the frame's start bound.
start *core.FrameBound
// end is the frame's end bound.
end *core.FrameBound
// curRowIdx is the index, within the current partition's rows, of the next
// row to produce a result for; resetPartialResult sets it back to 0.
curRowIdx uint64
}

func (p *rowFrameWindowProcessor) getStartOffset(numRows uint64) uint64 {
if p.start.UnBounded {
return 0
}
switch p.start.Type {
case ast.Preceding:
if p.curRowIdx >= p.start.Num {
winoros marked this conversation as resolved.
Show resolved Hide resolved
return p.curRowIdx - p.start.Num
}
return 0
case ast.Following:
offset := p.curRowIdx + p.start.Num
if offset >= numRows {
return numRows
}
return offset
case ast.CurrentRow:
return p.curRowIdx
}
// It will never reach here.
return 0
}

// getEndOffset returns the exclusive index one past the last row in the frame
// of the row at p.curRowIdx, given a partition of numRows rows. The result is
// clamped to [0, numRows]; 0 means the frame ends before the first row.
func (p *rowFrameWindowProcessor) getEndOffset(numRows uint64) uint64 {
	if p.end.UnBounded {
		return numRows
	}
	switch p.end.Type {
	case ast.Preceding:
		if p.curRowIdx >= p.end.Num {
			return p.curRowIdx - p.end.Num + 1
		}
		return 0
	case ast.Following:
		// Compare the offset against the distance to the partition end rather
		// than computing curRowIdx+Num first: that sum can wrap around uint64
		// for a very large offset and yield a bogus small index.
		if p.curRowIdx >= numRows || p.end.Num >= numRows-p.curRowIdx {
			return numRows
		}
		// curRowIdx+Num < numRows here, so adding 1 cannot overflow.
		return p.curRowIdx + p.end.Num + 1
	case ast.CurrentRow:
		return p.curRowIdx + 1
	}
	// It will never reach here.
	return 0
}

// consumeGroupRows does no aggregation and returns rows unchanged: the
// partition rows are kept so that appendResult2Chunk can evaluate the
// aggregate over each row's frame.
func (p *rowFrameWindowProcessor) consumeGroupRows(ctx sessionctx.Context, rows []chunk.Row) ([]chunk.Row, error) {
return rows, nil
}

// TODO: We can optimize it using sliding window algorithm.
func (p *rowFrameWindowProcessor) appendResult2Chunk(ctx sessionctx.Context, rows []chunk.Row, chk *chunk.Chunk, remained int) ([]chunk.Row, error) {
numRows := uint64(len(rows))
for remained > 0 {
start := p.getStartOffset(numRows)
end := p.getEndOffset(numRows)
p.curRowIdx++
remained--
if start >= end {
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
zz-jason marked this conversation as resolved.
Show resolved Hide resolved
err := p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk)
if err != nil {
return nil, err
}
continue
}
err := p.windowFunc.UpdatePartialResult(ctx, rows[start:end], p.partialResult)
if err != nil {
return nil, err
}
err = p.windowFunc.AppendFinalResult2Chunk(ctx, p.partialResult, chk)
if err != nil {
return nil, err
}
p.windowFunc.ResetPartialResult(p.partialResult)
}
return rows, nil
}

// resetPartialResult resets the aggregate's partial result and rewinds the
// current-row index to the first row, ready for the next partition.
func (p *rowFrameWindowProcessor) resetPartialResult() {
p.windowFunc.ResetPartialResult(p.partialResult)
p.curRowIdx = 0
}
7 changes: 7 additions & 0 deletions executor/window_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,11 @@ func (s *testSuite2) TestWindowFunctions(c *C) {
result.Check(testkit.Rows("1 1", "4 2", "2 3"))
result = tk.MustQuery("select a, row_number() over(partition by a) from t")
result.Check(testkit.Rows("1 1", "2 1", "4 1"))

result = tk.MustQuery("select a, sum(a) over(rows between unbounded preceding and 1 following) from t")
result.Check(testkit.Rows("1 5", "4 7", "2 7"))
result = tk.MustQuery("select a, sum(a) over(rows between 1 preceding and 1 following) from t")
result.Check(testkit.Rows("1 5", "4 7", "2 6"))
result = tk.MustQuery("select a, sum(a) over(rows between unbounded preceding and 1 preceding) from t")
result.Check(testkit.Rows("1 <nil>", "4 1", "2 5"))
}