Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
mydump,restore: implement RFC 2, eliminate one data file read
Browse files Browse the repository at this point in the history
A few tests need to relaxed due to the coarser ID assignment
  • Loading branch information
kennytm committed Dec 22, 2018
1 parent c807b62 commit a9e7e1a
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 156 deletions.
126 changes: 23 additions & 103 deletions lightning/mydump/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@ package mydump
import (
"fmt"
"os"
"runtime"
"sort"
"sync"

"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pkg/errors"
)

Expand All @@ -18,8 +14,7 @@ type TableRegion struct {
Table string
File string

Columns []byte
Chunk Chunk
Chunk Chunk
}

func (reg *TableRegion) Name() string {
Expand Down Expand Up @@ -57,107 +52,32 @@ func (rs regionSlice) Less(i, j int) bool {

////////////////////////////////////////////////////////////////

type RegionFounder struct {
processors chan int
minRegionSize int64
}

func NewRegionFounder(minRegionSize int64) *RegionFounder {
concurrency := runtime.NumCPU() >> 1
if concurrency == 0 {
concurrency = 1
}

processors := make(chan int, concurrency)
for i := 0; i < concurrency; i++ {
processors <- i
}

return &RegionFounder{
processors: processors,
minRegionSize: minRegionSize,
}
}

func (f *RegionFounder) MakeTableRegions(meta *MDTableMeta) ([]*TableRegion, error) {
var lock sync.Mutex
var wg sync.WaitGroup

db := meta.DB
table := meta.Name
processors := f.processors
minRegionSize := f.minRegionSize

var chunkErr error

func MakeTableRegions(meta *MDTableMeta, columns int) ([]*TableRegion, error) {
// Split files into regions
filesRegions := make(regionSlice, 0, len(meta.DataFiles))
for _, dataFile := range meta.DataFiles {
wg.Add(1)
go func(pid int, file string) {
common.AppLogger.Debugf("[%s] loading file's region (%s) ...", table, file)

chunks, err := splitExactChunks(db, table, file, minRegionSize)
lock.Lock()
if err == nil {
filesRegions = append(filesRegions, chunks...)
} else {
chunkErr = errors.Annotatef(err, "%s", file)
common.AppLogger.Errorf("failed to extract chunks from file: %v", chunkErr)
}
lock.Unlock()

processors <- pid
wg.Done()
}(<-processors, dataFile)
}
wg.Wait()

if chunkErr != nil {
return nil, chunkErr
}

// Setup files' regions
sort.Sort(filesRegions) // ps : sort region by - (fileName, fileOffset)
var totalRowCount int64
for i, region := range filesRegions {
region.ID = i

// Every chunk's PrevRowIDMax was uninitialized (set to 0). We need to
// re-adjust the row IDs so they won't be overlapping.
chunkRowCount := region.Chunk.RowIDMax - region.Chunk.PrevRowIDMax
region.Chunk.PrevRowIDMax = totalRowCount
totalRowCount += chunkRowCount
region.Chunk.RowIDMax = totalRowCount
}

return filesRegions, nil
}

func splitExactChunks(db string, table string, file string, minChunkSize int64) ([]*TableRegion, error) {
reader, err := os.Open(file)
if err != nil {
return nil, errors.Trace(err)
}
defer reader.Close()

parser := NewChunkParser(reader)
chunks, err := parser.ReadChunks(minChunkSize)
if err != nil {
return nil, errors.Trace(err)
}

annotatedChunks := make([]*TableRegion, len(chunks))
for i, chunk := range chunks {
annotatedChunks[i] = &TableRegion{
ID: -1,
DB: db,
Table: table,
File: file,
Columns: parser.Columns,
Chunk: chunk,
prevRowIDMax := int64(0)
for i, dataFile := range meta.DataFiles {
dataFileInfo, err := os.Stat(dataFile)
if err != nil {
return nil, errors.Annotatef(err, "cannot stat %s", dataFile)
}
dataFileSize := dataFileInfo.Size()
rowIDMax := prevRowIDMax + dataFileSize/(int64(columns)+2)
filesRegions = append(filesRegions, &TableRegion{
ID: i,
DB: meta.DB,
Table: meta.Name,
File: dataFile,
Chunk: Chunk{
Offset: 0,
EndOffset: dataFileSize,
PrevRowIDMax: prevRowIDMax,
RowIDMax: rowIDMax,
},
})
prevRowIDMax = rowIDMax
}

return annotatedChunks, nil
return filesRegions, nil
}
27 changes: 12 additions & 15 deletions lightning/mydump/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,9 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
loader, _ := NewMyDumpLoader(cfg)
dbMeta := loader.GetDatabases()[0]
founder := NewRegionFounder(defMinRegionSize)

for _, meta := range dbMeta.Tables {
regions, err := founder.MakeTableRegions(meta)
regions, err := MakeTableRegions(meta, 1)
c.Assert(err, IsNil)

table := meta.Name
Expand All @@ -61,19 +60,18 @@ func (s *testMydumpRegionSuite) TestTableRegion(c *C) {
c.Assert(err, IsNil)
tolFileSize += fileSize
}
// var tolRegionSize int64 = 0
// for _, region := range regions {
// tolRegionSize += region.Size()
// }
// c.Assert(tolRegionSize, Equals, tolFileSize)
// (The size will not be equal since the comments at the end are omitted)

// check - rows num
var tolRows int64 = 0
var tolRegionSize int64 = 0
for _, region := range regions {
tolRows += region.Rows()
tolRegionSize += region.Size()
}
c.Assert(tolRows, Equals, expectedTuplesCount[table])
c.Assert(tolRegionSize, Equals, tolFileSize)

// // check - rows num
// var tolRows int64 = 0
// for _, region := range regions {
// tolRows += region.Rows()
// }
// c.Assert(tolRows, Equals, expectedTuplesCount[table])

// check - range
regionNum := len(regions)
Expand All @@ -98,10 +96,9 @@ func (s *testMydumpRegionSuite) TestRegionReader(c *C) {
cfg := &config.Config{Mydumper: config.MydumperRuntime{SourceDir: "./examples"}}
loader, _ := NewMyDumpLoader(cfg)
dbMeta := loader.GetDatabases()[0]
founder := NewRegionFounder(defMinRegionSize)

for _, meta := range dbMeta.Tables {
regions, err := founder.MakeTableRegions(meta)
regions, err := MakeTableRegions(meta, 1)
c.Assert(err, IsNil)

tolValTuples := 0
Expand Down
76 changes: 42 additions & 34 deletions lightning/restore/restore.go
Original file line number Diff line number Diff line change
Expand Up @@ -492,7 +492,7 @@ func (t *TableRestore) restore(ctx context.Context, rc *RestoreController, cp *T
if len(cp.Chunks) > 0 {
common.AppLogger.Infof("[%s] reusing %d chunks from checkpoint", t.tableName, len(cp.Chunks))
} else if cp.Status < CheckpointStatusAllWritten {
if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp, t.tableInfo); err != nil {
if err := t.populateChunks(rc.cfg.Mydumper.MinRegionSize, cp); err != nil {
return nil, errors.Trace(err)
}
if err := rc.checkpointsDB.InsertChunkCheckpoints(ctx, t.tableName, cp.Chunks); err != nil {
Expand Down Expand Up @@ -972,56 +972,58 @@ func (tr *TableRestore) Close() {

var tidbRowIDColumnRegex = regexp.MustCompile(fmt.Sprintf("`%[1]s`|(?i:\\b%[1]s\\b)", model.ExtraHandleName))

func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint, tableInfo *TidbTableInfo) error {
func (t *TableRestore) populateChunks(minChunkSize int64, cp *TableCheckpoint) error {
common.AppLogger.Infof("[%s] load chunks", t.tableName)
timer := time.Now()

founder := mydump.NewRegionFounder(minChunkSize)
chunks, err := founder.MakeTableRegions(t.tableMeta)
chunks, err := mydump.MakeTableRegions(t.tableMeta, t.tableInfo.Columns)
if err != nil {
return errors.Trace(err)
}

cp.Chunks = make([]*ChunkCheckpoint, 0, len(chunks))

for _, chunk := range chunks {
columns := chunk.Columns

shouldIncludeRowID := !tableInfo.core.PKIsHandle && !tidbRowIDColumnRegex.Match(columns)
if shouldIncludeRowID {
// we need to inject the _tidb_rowid column
if len(columns) != 0 {
// column listing already exists, just append the new column.
columns = append(columns[:len(columns)-1], (",`" + model.ExtraHandleName.String() + "`)")...)
} else {
// we need to recreate the columns
var buf bytes.Buffer
buf.WriteString("(`")
for _, columnInfo := range tableInfo.core.Columns {
buf.WriteString(columnInfo.Name.String())
buf.WriteString("`,`")
}
buf.WriteString(model.ExtraHandleName.String())
buf.WriteString("`)")
columns = buf.Bytes()
}
}

cp.Chunks = append(cp.Chunks, &ChunkCheckpoint{
Key: ChunkCheckpointKey{
Path: chunk.File,
Offset: chunk.Chunk.Offset,
},
Columns: columns,
ShouldIncludeRowID: shouldIncludeRowID,
Chunk: chunk.Chunk,
Columns: nil,
Chunk: chunk.Chunk,
})
}

common.AppLogger.Infof("[%s] load %d chunks takes %v", t.tableName, len(chunks), time.Since(timer))
return nil
}

func (t *TableRestore) initializeColumns(columns []byte, ccp *ChunkCheckpoint) {
shouldIncludeRowID := !t.tableInfo.core.PKIsHandle && !tidbRowIDColumnRegex.Match(columns)
if shouldIncludeRowID {
// we need to inject the _tidb_rowid column
if len(columns) != 0 {
// column listing already exists, just append the new column.
columns = append(columns[:len(columns)-1], (",`" + model.ExtraHandleName.String() + "`)")...)
} else {
// we need to recreate the columns
var buf bytes.Buffer
buf.WriteString("(`")
for _, columnInfo := range t.tableInfo.core.Columns {
buf.WriteString(columnInfo.Name.String())
buf.WriteString("`,`")
}
buf.WriteString(model.ExtraHandleName.String())
buf.WriteString("`)")
columns = buf.Bytes()
}
} else if columns == nil {
columns = []byte{}
}
ccp.Columns = columns
ccp.ShouldIncludeRowID = shouldIncludeRowID
}

func (tr *TableRestore) restoreTableMeta(ctx context.Context, db *sql.DB) error {
timer := time.Now()

Expand Down Expand Up @@ -1349,18 +1351,23 @@ func (cr *chunkRestore) restore(
buffer.Reset()
start := time.Now()

buffer.WriteString("INSERT INTO ")
buffer.WriteString(t.tableName)
buffer.Write(cr.chunk.Columns)
buffer.WriteString(" VALUES")
var sep byte = ' '
readLoop:
for cr.parser.Pos() < endOffset {
err := cr.parser.ReadRow()
switch errors.Cause(err) {
case nil:
buffer.WriteByte(sep)
sep = ','
if sep == ' ' {
buffer.WriteString("INSERT INTO ")
buffer.WriteString(t.tableName)
if cr.chunk.Columns == nil {
t.initializeColumns(cr.parser.Columns, cr.chunk)
}
buffer.Write(cr.chunk.Columns)
buffer.WriteString(" VALUES ")
sep = ','
}
lastRow := cr.parser.LastRow()
if cr.chunk.ShouldIncludeRowID {
buffer.Write(lastRow.Row[:len(lastRow.Row)-1])
Expand All @@ -1369,6 +1376,7 @@ func (cr *chunkRestore) restore(
buffer.Write(lastRow.Row)
}
case io.EOF:
cr.chunk.Chunk.EndOffset = cr.parser.Pos()
break readLoop
default:
return errors.Trace(err)
Expand Down
2 changes: 0 additions & 2 deletions tests/checkpoint_chunks/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ check_contains "count(i): $(($ROW_COUNT*$CHUNK_COUNT))"
check_contains "sum(i): $(( $ROW_COUNT*$CHUNK_COUNT*(($CHUNK_COUNT+2)*$ROW_COUNT + 1)/2 ))"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.table_v1 WHERE status >= 200"
check_contains "count(*): 1"
run_sql "SELECT count(*) FROM tidb_lightning_checkpoint_test_cpch.chunk_v3 WHERE pos = end_offset"
check_contains "count(*): $CHUNK_COUNT"

# Repeat, but using the file checkpoint
run_sql 'DROP DATABASE IF EXISTS cpch_tsr'
Expand Down
4 changes: 2 additions & 2 deletions tests/examples/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ check_contains 'sum(crc32(name)): 21388950023608'

# Ensure the AUTO_INCREMENT value is properly defined
run_sql "insert into mocker_test.tbl_autoid (name) values ('new');"
run_sql "select id from mocker_test.tbl_autoid where name = 'new';"
run_sql "select id > 10000 from mocker_test.tbl_autoid where name = 'new';"
check_not_contains '* 2. row *'
check_contains 'id: 10001'
check_contains 'id > 10000: 1'

run_sql 'select count(*), avg(age), max(name), min(name), sum(crc32(name)) from mocker_test.tbl_multi_index;'
check_contains 'count(*): 10000'
Expand Down

0 comments on commit a9e7e1a

Please sign in to comment.