// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunk
import (
"bufio"
"errors"
"fmt"
"io"
"io/ioutil"
"os"
"path"
"sync"
"github.com/pingcap/log"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/disk"
"github.com/pingcap/tidb/util/stringutil"
"go.uber.org/zap"
)
// Buffer sizes for spilling chunks to disk (large, write-heavy) and for
// reading single rows back (small).
const (
	writeBufSize = 128 * 1024
	readBufSize  = 4 * 1024
)

// bufWriterPool recycles the large write buffers used by ListInDisk.Add.
var bufWriterPool = sync.Pool{
	New: func() interface{} { return bufio.NewWriterSize(nil, writeBufSize) },
}

// bufReaderPool recycles the small read buffers used by ListInDisk.GetRow.
var bufReaderPool = sync.Pool{
	New: func() interface{} { return bufio.NewReaderSize(nil, readBufSize) },
}

// tmpDir is the per-binary directory holding all spilled chunk files,
// e.g. /tmp/tidb-server-<binary name>.
var tmpDir = path.Join(os.TempDir(), "tidb-server-"+path.Base(os.Args[0]))
// init prepares a clean temporary directory for spilled chunk files.
// Failures are logged but not fatal: later TempFile calls will surface them.
func init() {
	// Clean the uncleared temp files left over from the last run.
	if err := os.RemoveAll(tmpDir); err != nil {
		log.Warn("Remove temporary file error", zap.String("tmpDir", tmpDir), zap.Error(err))
	}
	if err := os.Mkdir(tmpDir, 0755); err != nil {
		log.Warn("Mkdir temporary file error", zap.String("tmpDir", tmpDir), zap.Error(err))
	}
}
// ListInDisk represents a slice of chunks storing in temporary disk.
type ListInDisk struct {
	fieldTypes []*types.FieldType
	// offsets stores the offsets in disk of all RowPtr,
	// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
	offsets [][]int64
	// offWrite is the current offset for writing.
	offWrite int64
	// disk is the backing temporary file; created lazily on the first Add.
	disk *os.File
	// bufWriter wraps disk; borrowed from bufWriterPool while the file is open.
	bufWriter   *bufio.Writer
	diskTracker *disk.Tracker // track disk usage.
}
// defaultChunkListInDiskLabel is the label attached to the disk tracker of a ListInDisk.
var defaultChunkListInDiskLabel fmt.Stringer = stringutil.StringerStr("chunk.ListInDisk")
// NewListInDisk creates a new ListInDisk with field types.
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
	// TODO(fengliyuan): set the quota of disk usage.
	return &ListInDisk{
		fieldTypes:  fieldTypes,
		diskTracker: disk.NewTracker(defaultChunkListInDiskLabel, -1),
	}
}
// GetDiskTracker returns the disk tracker of this List.
func (l *ListInDisk) GetDiskTracker() *disk.Tracker {
	return l.diskTracker
}
// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
// is not empty and not used any more and has the same field types.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
	if chk.NumRows() == 0 {
		return errors.New("chunk appended to List should have at least 1 row")
	}
	// Lazily create the backing file on the first Add, and borrow a
	// buffered writer from the pool to batch the small row writes.
	if l.disk == nil {
		l.disk, err = ioutil.TempFile(tmpDir, l.diskTracker.Label().String())
		if err != nil {
			return
		}
		l.bufWriter = bufWriterPool.Get().(*bufio.Writer)
		l.bufWriter.Reset(l.disk)
	}
	// Serialize the chunk starting at the current write offset; offWrite is
	// advanced even on partial writes so offsets stay consistent with the file.
	chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite}
	n, err := chk2.WriteTo(l.bufWriter)
	l.offWrite += n
	if err != nil {
		return
	}
	// Record the per-row offsets so GetRow can seek directly to any row.
	l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
	err = l.bufWriter.Flush()
	// Only account the bytes once they are flushed successfully.
	if err == nil {
		l.diskTracker.Consume(n)
	}
	return
}
// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
	// Look up the on-disk offset recorded by Add, then read at most up to
	// the current end of written data.
	off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
	r := io.NewSectionReader(l.disk, off, l.offWrite-off)
	// Borrow a read buffer from the pool for this single-row read.
	bufReader := bufReaderPool.Get().(*bufio.Reader)
	bufReader.Reset(r)
	defer bufReaderPool.Put(bufReader)

	format := rowInDisk{numCol: len(l.fieldTypes)}
	_, err = format.ReadFrom(bufReader)
	if err != nil {
		return row, err
	}
	row = format.toMutRow(l.fieldTypes).ToRow()
	return row, err
}
// NumChunks returns the number of chunks in the ListInDisk.
func (l *ListInDisk) NumChunks() int {
	return len(l.offsets)
}
// Close releases the disk resource: it returns the buffered writer to the
// pool, closes and removes the backing temporary file, and resets the disk
// tracker. Close is idempotent — the fields are cleared after release so a
// second call is a no-op instead of double-closing the file and putting the
// same writer into the pool twice.
func (l *ListInDisk) Close() error {
	if l.disk == nil {
		return nil
	}
	l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
	terror.Call(l.disk.Close)
	bufWriterPool.Put(l.bufWriter)
	l.bufWriter = nil
	name := l.disk.Name()
	l.disk = nil
	return os.Remove(name)
}
// chunkInDisk represents a chunk in disk format. Each row of the chunk
// is serialized and in sequence ordered. The format of each row is like
// the struct diskFormatRow, put size of each column first, then the
// data of each column.
//
// For example, a chunk has 2 rows and 3 columns, the disk format of the
// chunk is as follow:
//
// [size of row0 column0], [size of row0 column1], [size of row0 column2]
// [data of row0 column0], [data of row0 column1], [data of row0 column2]
// [size of row1 column0], [size of row1 column1], [size of row1 column2]
// [data of row1 column0], [data of row1 column1], [data of row1 column2]
//
// If a column of a row is null, the size of it is -1 and the data is empty.
type chunkInDisk struct {
	*Chunk
	// offWrite is the current offset for writing.
	offWrite int64
	// offsetsOfRows stores the offset of each row.
	offsetsOfRows []int64
}
// WriteTo serializes the chunk into the format of chunkInDisk, and
// writes to w. It also records, in offsetsOfRows, the absolute file
// offset of each row (offWrite plus the bytes written so far).
func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {
	var n int64
	numRows := chk.NumRows()
	chk.offsetsOfRows = make([]int64, 0, numRows)
	// format is reused across rows via convertFromRow to avoid
	// reallocating its slices for every row.
	var format *diskFormatRow
	for rowIdx := 0; rowIdx < numRows; rowIdx++ {
		format = convertFromRow(chk.GetRow(rowIdx), format)
		// The row's offset must be captured before the row is written.
		chk.offsetsOfRows = append(chk.offsetsOfRows, chk.offWrite+written)
		n, err = rowInDisk{diskFormatRow: *format}.WriteTo(w)
		written += n
		if err != nil {
			return
		}
	}
	return
}
// getOffsetsOfRows gets the offset of each row recorded by WriteTo.
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows }
// rowInDisk represents a Row in format of diskFormatRow.
type rowInDisk struct {
	// numCol is the number of columns to expect when deserializing (ReadFrom).
	numCol int
	diskFormatRow
}
// WriteTo serializes a row of the chunk into the format of
// diskFormatRow, and writes to w: first the size header of every
// column, then the raw data of each non-null cell.
func (row rowInDisk) WriteTo(w io.Writer) (written int64, err error) {
	var n int
	if n, err = w.Write(i64SliceToBytes(row.sizesOfColumns)); err != nil {
		written += int64(n)
		return
	}
	written += int64(n)
	for _, cell := range row.cells {
		n, err = w.Write(cell)
		written += int64(n)
		if err != nil {
			return
		}
	}
	return
}
// ReadFrom reads data of r, deserializes it from the format of diskFormatRow
// into Row. It first reads the fixed-size header of numCol int64 sizes,
// then reads the data of each non-null cell (size -1 marks a null column,
// which has no data bytes on disk).
func (row *rowInDisk) ReadFrom(r io.Reader) (n int64, err error) {
	// 8 bytes per column for the int64 size header.
	b := make([]byte, 8*row.numCol)
	var n1 int
	n1, err = io.ReadFull(r, b)
	n += int64(n1)
	if err != nil {
		return
	}
	row.sizesOfColumns = bytesToI64Slice(b)
	row.cells = make([][]byte, 0, row.numCol)
	for _, size := range row.sizesOfColumns {
		// Null columns store no data, so nothing to read.
		if size == -1 {
			continue
		}
		cell := make([]byte, size)
		row.cells = append(row.cells, cell)
		n1, err = io.ReadFull(r, cell)
		n += int64(n1)
		if err != nil {
			return
		}
	}
	return
}
// diskFormatRow represents a row in a chunk in disk format. The disk format
// of a row is described in the doc of chunkInDisk.
type diskFormatRow struct {
	// sizesOfColumns stores the size of each column in a row.
	// -1 means the value of this column is null.
	sizesOfColumns []int64 // -1 means null
	// cells represents raw data of not-null columns in one row.
	// In convertFromRow, data from Row is shallow copied to cells.
	// In toMutRow, data in cells is shallow copied to MutRow.
	cells [][]byte
}
// convertFromRow serializes one row of chunk to diskFormatRow, then
// we can use diskFormatRow to write to disk. When reuse is non-nil its
// backing slices are recycled; cell data is shallow copied from row.
func convertFromRow(row Row, reuse *diskFormatRow) (format *diskFormatRow) {
	numCols := row.Chunk().NumCols()
	if reuse == nil {
		format = &diskFormatRow{
			sizesOfColumns: make([]int64, 0, numCols),
			cells:          make([][]byte, 0, numCols),
		}
	} else {
		format = reuse
		format.sizesOfColumns = format.sizesOfColumns[:0]
		format.cells = format.cells[:0]
	}
	for i := 0; i < numCols; i++ {
		// Null columns contribute a -1 size and no cell data.
		if row.IsNull(i) {
			format.sizesOfColumns = append(format.sizesOfColumns, -1)
			continue
		}
		raw := row.GetRaw(i)
		format.sizesOfColumns = append(format.sizesOfColumns, int64(len(raw)))
		format.cells = append(format.cells, raw)
	}
	return
}
// toMutRow deserializes diskFormatRow to MutRow. It builds a one-row
// Chunk whose columns shallow-share the cell data, walking cells with
// cellOff since null columns have no entry there.
func (format *diskFormatRow) toMutRow(fields []*types.FieldType) MutRow {
	chk := &Chunk{columns: make([]*Column, 0, len(format.sizesOfColumns))}
	var cellOff int
	for colIdx, size := range format.sizesOfColumns {
		col := &Column{length: 1}
		elemSize := getFixedLen(fields[colIdx])
		if size == -1 { // isNull
			col.nullBitmap = []byte{0}
			if elemSize == varElemLen {
				// Variable-length null: empty value, offsets mark a zero-length elem.
				col.offsets = []int64{0, 0}
			} else {
				// Fixed-length null: still needs a zeroed element buffer.
				buf := make([]byte, elemSize)
				col.data = buf
				col.elemBuf = buf
			}
		} else {
			col.nullBitmap = []byte{1}
			// Shallow copy: the column aliases the cell's bytes.
			col.data = format.cells[cellOff]
			cellOff++
			if elemSize == varElemLen {
				col.offsets = []int64{0, int64(len(col.data))}
			} else {
				col.elemBuf = col.data
			}
		}
		chk.columns = append(chk.columns, col)
	}
	return MutRow{c: chk}
}