Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util/chunk: optimize (*ListInDisk).GetChunk and add a fast row container reader (#45130) #45204

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions util/chunk/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ go_library(
"pool.go",
"row.go",
"row_container.go",
"row_container_reader.go",
],
importpath = "github.com/pingcap/tidb/util/chunk",
visibility = ["//visibility:public"],
Expand Down
35 changes: 30 additions & 5 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package chunk

import (
"bufio"
"io"
"os"
"strconv"
Expand Down Expand Up @@ -172,13 +173,37 @@ func (l *ListInDisk) Add(chk *Chunk) (err error) {
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx))
chkSize := l.numRowsOfEachChunk[chkIdx]
for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
_, _, err := l.GetRowAndAppendToChunk(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}, chk)
if err != nil {
return chk, err

firstRowOffset, err := l.getOffset(uint32(chkIdx), 0)
if err != nil {
return nil, err
}

// this channel is big enough and will never be blocked.
formatCh := make(chan rowInDisk, chkSize)
var formatChErr error
go func() {
defer close(formatCh)

// If the row is small, a bufio can significantly improve the performance. As benchmark shows, it's still not bad
// for longer rows.
r := bufio.NewReader(l.dataFile.getSectionReader(firstRowOffset))
format := rowInDisk{numCol: len(l.fieldTypes)}
for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
_, err = format.ReadFrom(r)
if err != nil {
formatChErr = err
break
}

formatCh <- format
}
}()

for format := range formatCh {
_, chk = format.toRow(l.fieldTypes, chk)
}
return chk, nil
return chk, formatChErr
}

// GetRow gets a Row from the ListInDisk by RowPtr.
Expand Down
2 changes: 2 additions & 0 deletions util/chunk/disk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ func testListInDisk(t *testing.T, concurrency int) {
}

func BenchmarkListInDisk_GetChunk(b *testing.B) {
b.StopTimer()
numChk, numRow := 10, 1000
chks, fields := initChunks(numChk, numRow)
l := NewListInDisk(fields)
Expand All @@ -267,6 +268,7 @@ func BenchmarkListInDisk_GetChunk(b *testing.B) {
_ = l.Add(chk)
}

b.StartTimer()
for i := 0; i < b.N; i++ {
v := i % numChk
_, _ = l.GetChunk(v)
Expand Down
163 changes: 163 additions & 0 deletions util/chunk/row_container_reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"context"
"runtime"
"sync"

"github.com/pingcap/tidb/util/logutil"
)

// RowContainerReader is a forward-only iterator for the row container. It provides an interface similar to other
// iterators, but it doesn't provide `ReachEnd` function and requires manually closing to release goroutine.
//
// It's recommended to use the following pattern to use it:
//
//	for iter := NewRowContainerReader(rc); iter.Current() != iter.End(); iter.Next() {
//		...
//	}
//	iter.Close()
//	if iter.Error() != nil {
//	}
type RowContainerReader interface {
	// Next returns the next Row.
	Next() Row

	// Current returns the current Row.
	Current() Row

	// End returns the invalid end Row.
	End() Row

	// Error returns a non-nil error if anything wrong happens during the iteration.
	Error() error

	// Close closes the reader and releases the goroutines used for the iteration.
	Close()
}

var _ RowContainerReader = &rowContainerReader{}

// rowContainerReader is a forward-only iterator for the row container.
// It will spawn two goroutines for reading chunks from disk and converting the chunks to rows. Rows are only sent
// to `rowCh` after the full chunk has been read, to avoid concurrent reads and writes on the same chunk.
//
// TODO: record the memory allocated for the channel and chunks.
type rowContainerReader struct {
	// context, cancel and waitgroup are used to stop and wait until all goroutine stops.
	ctx    context.Context
	cancel func()
	wg     sync.WaitGroup

	rc *RowContainer

	currentRow Row
	rowCh      chan Row

	// this error will only be set by worker
	err error
}

// Next implements RowContainerReader. It advances the iterator and returns the
// new current row; once the worker's channel is exhausted it returns End().
func (reader *rowContainerReader) Next() Row {
	row, ok := <-reader.rowCh
	if !ok {
		// The channel is closed: the worker has finished (or failed), so the
		// iterator parks on the sentinel end row.
		reader.currentRow = reader.End()
		return reader.End()
	}
	reader.currentRow = row
	return row
}

// Current implements RowContainerReader. It returns the row the iterator is
// positioned on; after exhaustion this equals End().
func (reader *rowContainerReader) Current() Row {
	return reader.currentRow
}

// End implements RowContainerReader. The zero-value Row acts as the sentinel
// marking the end of the iteration.
func (*rowContainerReader) End() Row {
	var end Row
	return end
}

// Error implements RowContainerReader. The error is only written by the
// background worker before it closes rowCh, so it's safe to read once the
// iteration has reached End().
func (reader *rowContainerReader) Error() error {
	return reader.err
}

// initializeChannel allocates rowCh with a buffer derived from the size of the
// first chunk in the container.
func (reader *rowContainerReader) initializeChannel() {
	// To avoid blocking the worker on sending to `rowCh` before it can start
	// reading the next chunk, give the channel a buffer of at least one full
	// chunk; twice the chunk size leaves some margin. When the container is
	// empty, fall back to a fixed capacity.
	capacity := 1024
	if reader.rc.NumChunks() > 0 {
		capacity = 2 * reader.rc.NumRowsOfChunk(0)
	}
	reader.rowCh = make(chan Row, capacity)
}

// Close implements RowContainerReader. It cancels the background worker and
// blocks until its goroutine has fully stopped.
func (reader *rowContainerReader) Close() {
	reader.cancel()
	reader.wg.Wait()
}

// startWorker spawns the goroutine that reads every chunk from the container
// and feeds its rows into rowCh, stopping early on error or cancellation.
func (reader *rowContainerReader) startWorker() {
	reader.wg.Add(1)
	go func() {
		// Closing rowCh tells Next() the iteration is over; it runs after any
		// write to reader.err, so the error is visible once the channel drains.
		defer close(reader.rowCh)
		defer reader.wg.Done()

		for chkIdx := 0; chkIdx < reader.rc.NumChunks(); chkIdx++ {
			chk, err := reader.rc.GetChunk(chkIdx)
			if err != nil {
				reader.err = err
				return
			}

			for i := 0; i < chk.NumRows(); i++ {
				select {
				case reader.rowCh <- chk.GetRow(i):
				case <-reader.ctx.Done():
					// The reader was closed; exit promptly rather than
					// blocking forever on a full channel.
					return
				}
			}
		}
	}()
}

// NewRowContainerReader creates a forward only iterator for row container
func NewRowContainerReader(rc *RowContainer) *rowContainerReader {
	ctx, cancel := context.WithCancel(context.Background())

	reader := &rowContainerReader{
		ctx:    ctx,
		cancel: cancel,

		rc: rc,
	}
	reader.initializeChannel()
	reader.startWorker()

	// Preload the first row so Current() is valid right after construction.
	reader.Next()

	// Safety net: if the caller forgets to Close(), stop the worker when the
	// reader is garbage collected.
	runtime.SetFinalizer(reader, func(r *rowContainerReader) {
		if r.ctx.Err() == nil {
			logutil.BgLogger().Warn("rowContainerReader is closed by finalizer")
			r.Close()
		}
	})

	return reader
}
Loading