executor: implement disk-based sort (Part 1) (#13718)
wshwsh12 authored and sre-bot committed Dec 18, 2019
1 parent 9b7b2c0 commit fc603b6
Showing 2 changed files with 219 additions and 14 deletions.
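The change below teaches SortExec to spill to disk when the memory quota (MemQuotaSort) is exceeded: rows are first buffered in an in-memory chunk.List, an atomic exceeded flag switches the fetch loop over to a chunk.ListInDisk, and the sort then runs over an on-disk partition (a single partition in this Part 1). As a rough orientation before the diff, here is a minimal, self-contained Go sketch of the same buffer-then-spill-then-sort pattern. Everything in it (quotaSorter, the 8-byte-per-value accounting, a line-oriented temp file) is an illustrative stand-in, not TiDB's API.

package main

import (
	"bufio"
	"fmt"
	"os"
	"sort"
	"strconv"
	"sync/atomic"
)

// quotaSorter buffers values in memory until a quota is exceeded, then spills
// the buffer to a temporary file and keeps appending to it, mirroring the
// exceeded/spilled flag pattern in SortExec. All names here are illustrative.
type quotaSorter struct {
	memQuota int64
	memUsed  int64
	exceeded uint32 // set once memUsed passes memQuota
	spilled  uint32 // set once data has actually been written to disk

	inMem     []int    // stands in for the in-memory chunk.List
	spillFile *os.File // stands in for chunk.ListInDisk
}

func newQuotaSorter(quota int64) (*quotaSorter, error) {
	f, err := os.CreateTemp("", "sort-spill-*.txt")
	if err != nil {
		return nil, err
	}
	return &quotaSorter{memQuota: quota, spillFile: f}, nil
}

func (s *quotaSorter) add(v int) error {
	if atomic.LoadUint32(&s.exceeded) == 1 {
		// Already over quota: append new values straight to the spill file.
		_, err := fmt.Fprintln(s.spillFile, v)
		return err
	}
	s.inMem = append(s.inMem, v)
	s.memUsed += 8 // crude per-value accounting, like tracker.Consume(8)
	if s.memUsed > s.memQuota {
		// Move the whole in-memory buffer to disk and release it,
		// as fetchRowChunks does with rowChunks after spilling.
		for _, buffered := range s.inMem {
			if _, err := fmt.Fprintln(s.spillFile, buffered); err != nil {
				return err
			}
		}
		s.inMem, s.memUsed = nil, 0
		atomic.StoreUint32(&s.exceeded, 1)
		atomic.StoreUint32(&s.spilled, 1)
	}
	return nil
}

// sorted reads any spilled values back, combines them with what is still in
// memory, and sorts the lot (a single partition, as in this Part 1).
func (s *quotaSorter) sorted() ([]int, error) {
	all := append([]int(nil), s.inMem...)
	if atomic.LoadUint32(&s.spilled) == 1 {
		if _, err := s.spillFile.Seek(0, 0); err != nil {
			return nil, err
		}
		sc := bufio.NewScanner(s.spillFile)
		for sc.Scan() {
			v, err := strconv.Atoi(sc.Text())
			if err != nil {
				return nil, err
			}
			all = append(all, v)
		}
		if err := sc.Err(); err != nil {
			return nil, err
		}
	}
	sort.Ints(all)
	return all, nil
}

func main() {
	s, err := newQuotaSorter(64) // tiny quota so the example actually spills
	if err != nil {
		panic(err)
	}
	defer os.Remove(s.spillFile.Name())
	defer s.spillFile.Close()
	for _, v := range []int{9, 3, 7, 1, 8, 2, 6, 4, 5, 0} {
		if err := s.add(v); err != nil {
			panic(err)
		}
	}
	out, err := s.sorted()
	if err != nil {
		panic(err)
	}
	fmt.Println(out, "spilled:", atomic.LoadUint32(&s.spilled) == 1)
}

Run as a plain program, this prints the fully sorted values and reports that the spill path was taken. The real executor works on chunks and row pointers rather than single integers, as the diff shows.
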
18 changes: 17 additions & 1 deletion executor/explain_test.go
@@ -163,14 +163,30 @@ func (s *testSuite1) TestMemoryUsageAfterClose(c *C) {
for i := 0; i < tk.Se.GetSessionVars().MaxChunkSize*5; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
}
SQLs := []string{"select v+abs(k) from t"}
SQLs := []string{"select v+abs(k) from t",
"select v from t order by v"}
for _, sql := range SQLs {
tk.MustQuery(sql)
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.BytesConsumed(), Equals, int64(0))
c.Assert(tk.Se.GetSessionVars().StmtCtx.MemTracker.MaxConsumed(), Greater, int64(0))
}
}

func (s *testSuite1) TestDiskUsageAfterClose(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
tk.MustExec("create table t (v int, k int, key(k))")
for i := 0; i < tk.Se.GetSessionVars().MaxChunkSize*5; i++ {
tk.MustExec(fmt.Sprintf("insert into t values (%v, %v)", i, i))
}
SQLs := []string{
"select v from t order by v"}
for _, sql := range SQLs {
tk.MustQuery(sql)
c.Assert(tk.Se.GetSessionVars().StmtCtx.DiskTracker.BytesConsumed(), Equals, int64(0))
}
}

func (s *testSuite2) TestExplainAnalyzeExecutionInfo(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk.MustExec("drop table if exists t")
215 changes: 202 additions & 13 deletions executor/sort.go
@@ -18,11 +18,13 @@ import (
"context"
"fmt"
"sort"
"sync/atomic"

"github.com/pingcap/tidb/expression"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/stringutil"
)
@@ -49,12 +51,57 @@ type SortExec struct {
// rowPtrs stores the chunk index and row index for each row.
rowPtrs []chunk.RowPtr

memTracker *memory.Tracker
memTracker *memory.Tracker
diskTracker *disk.Tracker

// rowChunksInDisk is the list of chunks used to store row values on disk.
rowChunksInDisk *chunk.ListInDisk
// rowPtrsInDisk stores the disk-chunk index and row index for each row.
rowPtrsInDisk []chunk.RowPtr
// partitionList is the list of on-disk chunk lists, one per partition.
partitionList []*chunk.ListInDisk
// partitionRowPtrs stores the disk-chunk index and row index for each row of each partition.
partitionRowPtrs [][]chunk.RowPtr
// exceeded indicates that memory usage exceeded memQuota while adding a
// chunk, so spilling should start.
exceeded uint32
// spilled indicates that records have been spilled to disk.
spilled uint32
}

// Close implements the Executor Close interface.
func (e *SortExec) Close() error {
if e.alreadySpilled() {
if e.rowChunksInDisk != nil {
if err := e.rowChunksInDisk.Close(); err != nil {
return err
}
}
for _, chunkInDisk := range e.partitionList {
if chunkInDisk != nil {
if err := chunkInDisk.Close(); err != nil {
return err
}
}
}
e.rowChunksInDisk = nil
e.partitionList = e.partitionList[:0]

e.memTracker.Consume(int64(-8 * cap(e.rowPtrsInDisk)))
e.rowPtrsInDisk = nil
for _, partitionPtrs := range e.partitionRowPtrs {
e.memTracker.Consume(int64(-8 * cap(partitionPtrs)))
}
e.partitionRowPtrs = nil
}
if e.rowChunks != nil {
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil
}
e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
e.rowPtrs = nil
e.memTracker = nil
e.diskTracker = nil
return e.children[0].Close()
}

Expand All @@ -67,7 +114,15 @@ func (e *SortExec) Open(ctx context.Context) error {
if e.memTracker == nil {
e.memTracker = memory.NewTracker(e.id, e.ctx.GetSessionVars().MemQuotaSort)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
e.diskTracker = memory.NewTracker(e.id, -1)
e.diskTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.DiskTracker)
}
e.exceeded = 0
e.spilled = 0
e.rowChunksInDisk = nil
e.rowPtrsInDisk = e.rowPtrsInDisk[:0]
e.partitionList = e.partitionList[:0]
e.partitionRowPtrs = e.partitionRowPtrs[:0]
return e.children[0].Open(ctx)
}

Expand All @@ -79,20 +134,65 @@ func (e *SortExec) Next(ctx context.Context, req *chunk.Chunk) error {
if err != nil {
return err
}
e.initPointers()
e.initCompareFuncs()
e.buildKeyColumns()
sort.Slice(e.rowPtrs, e.keyColumnsLess)
e.fetched = true
if e.alreadySpilled() {
err = e.prepareExternalSorting()
if err != nil {
return err
}
e.fetched = true
} else {
e.initPointers()
e.initCompareFuncs()
e.buildKeyColumns()
sort.Slice(e.rowPtrs, e.keyColumnsLess)
e.fetched = true
}
}
for !req.IsFull() && e.Idx < len(e.rowPtrs) {
rowPtr := e.rowPtrs[e.Idx]
req.AppendRow(e.rowChunks.GetRow(rowPtr))
e.Idx++

if e.alreadySpilled() {
for !req.IsFull() && e.Idx < len(e.partitionRowPtrs[0]) {
rowPtr := e.partitionRowPtrs[0][e.Idx]
row, err := e.partitionList[0].GetRow(rowPtr)
if err != nil {
return err
}
req.AppendRow(row)
e.Idx++
}
} else {
for !req.IsFull() && e.Idx < len(e.rowPtrs) {
rowPtr := e.rowPtrs[e.Idx]
req.AppendRow(e.rowChunks.GetRow(rowPtr))
e.Idx++
}
}
return nil
}

func (e *SortExec) prepareExternalSorting() (err error) {
e.initCompareFuncs()
e.buildKeyColumns()
e.rowPtrsInDisk = e.initPointersForListInDisk(e.rowChunksInDisk)
// Partition sort.
// For now there is only one partition; partitioning will be adjusted in the next PR.
err = e.readPartition(e.rowChunksInDisk, e.rowPtrsInDisk)
if err != nil {
return err
}
e.initPointers()
sort.Slice(e.rowPtrs, e.keyColumnsLess)
listInDisk, err := e.spillToDiskByRowPtr()
if err != nil {
return err
}
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil
e.partitionList = append(e.partitionList, listInDisk)
e.partitionRowPtrs = append(e.partitionRowPtrs, e.initPointersForListInDisk(listInDisk))
return err
}

func (e *SortExec) fetchRowChunks(ctx context.Context) error {
fields := retTypes(e)
e.rowChunks = chunk.NewList(fields, e.initCap, e.maxChunkSize)
@@ -108,20 +208,53 @@ func (e *SortExec) fetchRowChunks(ctx context.Context) error {
if rowCount == 0 {
break
}
e.rowChunks.Add(chk)
if e.alreadySpilled() {
// append chk to disk.
err := e.rowChunksInDisk.Add(chk)
if err != nil {
return err
}
} else {
e.rowChunks.Add(chk)
if atomic.LoadUint32(&e.exceeded) == 1 {
e.rowChunksInDisk, err = e.spillToDisk()
if err != nil {
return err
}
e.memTracker.Consume(-e.rowChunks.GetMemTracker().BytesConsumed())
e.rowChunks = nil // GC its internal chunks.
atomic.StoreUint32(&e.spilled, 1)
}
}
}
return nil
}

func (e *SortExec) initPointers() {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
e.memTracker.Consume(int64(8 * e.rowChunks.Len()))
if e.rowPtrs != nil {
e.memTracker.Consume(int64(-8 * cap(e.rowPtrs)))
e.rowPtrs = e.rowPtrs[:0]
} else {
e.rowPtrs = make([]chunk.RowPtr, 0, e.rowChunks.Len())
}
for chkIdx := 0; chkIdx < e.rowChunks.NumChunks(); chkIdx++ {
rowChk := e.rowChunks.GetChunk(chkIdx)
for rowIdx := 0; rowIdx < rowChk.NumRows(); rowIdx++ {
e.rowPtrs = append(e.rowPtrs, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
e.memTracker.Consume(int64(8 * cap(e.rowPtrs)))
}

func (e *SortExec) initPointersForListInDisk(disk *chunk.ListInDisk) []chunk.RowPtr {
rowPtrsInDisk := make([]chunk.RowPtr, 0)
for chkIdx := 0; chkIdx < disk.NumChunks(); chkIdx++ {
for rowIdx := 0; rowIdx < disk.NumRowsOfChunk(chkIdx); rowIdx++ {
rowPtrsInDisk = append(rowPtrsInDisk, chunk.RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
}
}
e.memTracker.Consume(int64(8 * len(rowPtrsInDisk)))
return rowPtrsInDisk
}

func (e *SortExec) initCompareFuncs() {
@@ -163,6 +296,62 @@ func (e *SortExec) keyColumnsLess(i, j int) bool {
return e.lessRow(rowI, rowJ)
}

func (e *SortExec) readPartition(disk *chunk.ListInDisk, rowPtrs []chunk.RowPtr) error {
e.rowChunks = chunk.NewList(retTypes(e), e.initCap, e.maxChunkSize)
e.rowChunks.GetMemTracker().AttachTo(e.memTracker)
e.rowChunks.GetMemTracker().SetLabel(rowChunksLabel)
for _, rowPtr := range rowPtrs {
row, err := disk.GetRow(rowPtr)
if err != nil {
return err
}
e.rowChunks.AppendRow(row)
}
return nil
}

// alreadySpilled indicates whether records have been spilled to disk.
func (e *SortExec) alreadySpilled() bool { return e.rowChunksInDisk != nil }

// alreadySpilledSafe indicates whether records have been spilled to disk. It is thread-safe.
func (e *SortExec) alreadySpilledSafe() bool { return atomic.LoadUint32(&e.spilled) == 1 }

func (e *SortExec) spillToDisk() (disk *chunk.ListInDisk, err error) {
N := e.rowChunks.NumChunks()
rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
for i := 0; i < N; i++ {
chk := e.rowChunks.GetChunk(i)
err = rowChunksInDisk.Add(chk)
if err != nil {
return nil, err
}
}
return rowChunksInDisk, nil
}

func (e *SortExec) spillToDiskByRowPtr() (disk *chunk.ListInDisk, err error) {
rowChunksInDisk := chunk.NewListInDisk(e.retFieldTypes)
rowChunksInDisk.GetDiskTracker().AttachTo(e.diskTracker)
chk := newFirstChunk(e)
for _, rowPtr := range e.rowPtrs {
chk.AppendRow(e.rowChunks.GetRow(rowPtr))
if chk.IsFull() {
err := rowChunksInDisk.Add(chk)
if err != nil {
return nil, err
}
chk = newFirstChunk(e)
}
}
if chk.NumRows() != 0 {
if err := rowChunksInDisk.Add(chk); err != nil {
return nil, err
}
}
return rowChunksInDisk, nil
}

// TopNExec implements a Top-N algorithm and it is built from a SELECT statement with ORDER BY and LIMIT.
// Instead of sorting all the rows fetched from the table, it keeps the Top-N elements only in a heap to reduce memory usage.
type TopNExec struct {

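One piece is not visible in the hunks above: nothing shown here ever sets e.exceeded to 1; fetchRowChunks only reads it with atomic.LoadUint32. Presumably a memory-tracker callback, added elsewhere or in a later part of this work, flips the flag when the quota is hit. The sketch below shows how such a hook could look; it uses a toy tracker type rather than TiDB's memory.Tracker, whose real callback mechanism does not appear in this commit.

package main

import (
	"fmt"
	"sync/atomic"
)

// tracker is a toy stand-in for a memory tracker: it counts consumed bytes and
// fires a callback once consumption passes the quota. It only illustrates how
// a quota callback could set SortExec.exceeded; the real tracker API differs.
type tracker struct {
	quota    int64
	consumed int64
	fired    bool
	onExceed func()
}

func (t *tracker) consume(n int64) {
	t.consumed += n
	if !t.fired && t.consumed > t.quota {
		t.fired = true
		t.onExceed()
	}
}

func main() {
	var exceeded uint32 // plays the role of SortExec.exceeded
	tk := &tracker{quota: 1 << 10, onExceed: func() { atomic.StoreUint32(&exceeded, 1) }}
	for i := 0; i < 20; i++ {
		tk.consume(100) // pretend each fetched chunk costs 100 bytes
		if atomic.LoadUint32(&exceeded) == 1 {
			fmt.Printf("quota hit after %d chunks; switch to writing chunks to disk\n", i+1)
			break
		}
	}
}

Wiring the trigger through a callback keeps the fetch loop free of quota arithmetic: it only polls the flag once per fetched chunk, which matches how the new fetchRowChunks checks e.exceeded.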