Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Support multi-engine per table (batching) #113

Merged
merged 13 commits into from
Jan 14, 2019
Merged
26 changes: 19 additions & 7 deletions cmd/tidb-lightning-ctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"github.com/pingcap/tidb-lightning/lightning/kv"
"github.com/pingcap/tidb-lightning/lightning/restore"
"github.com/pkg/errors"
"github.com/satori/go.uuid"
)

func main() {
Expand Down Expand Up @@ -163,12 +162,15 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tableName s
}

for _, table := range targetTables {
if table.Engine == uuid.Nil {
continue
}
if closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, table.Engine); err == nil {
fmt.Fprintln(os.Stderr, "Cleaning up engine:", table.TableName, table.Engine)
closedEngine.Cleanup(ctx)
for engineID := 0; engineID < table.EnginesCount; engineID++ {
fmt.Fprintln(os.Stderr, "Closing and cleaning up engine:", table.TableName, engineID)
closedEngine, err := importer.UnsafeCloseEngine(ctx, table.TableName, engineID)
if err != nil {
fmt.Fprintln(os.Stderr, "* Encountered error while closing engine:", err)
lastErr = err
} else {
closedEngine.Cleanup(ctx)
}
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -193,6 +195,13 @@ func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string)
}
defer tablesFile.Close()

// Create the engines dump under its own file name. Opening tablesFileName
// here instead would truncate the tables dump and leave engines.csv
// uncreated, with both dumps writing into the same file.
enginesFileName := path.Join(dumpFolder, "engines.csv")
enginesFile, err := os.Create(enginesFileName)
if err != nil {
	return errors.Annotatef(err, "failed to create %s", enginesFileName)
}
defer enginesFile.Close()

chunksFileName := path.Join(dumpFolder, "chunks.csv")
chunksFile, err := os.Create(chunksFileName)
if err != nil {
Expand All @@ -203,6 +212,9 @@ func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string)
if err := cpdb.DumpTables(ctx, tablesFile); err != nil {
return errors.Trace(err)
}
if err := cpdb.DumpEngines(ctx, enginesFile); err != nil {
return errors.Trace(err)
}
if err := cpdb.DumpChunks(ctx, chunksFile); err != nil {
return errors.Trace(err)
}
Expand Down
36 changes: 36 additions & 0 deletions lightning/common/once_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package common

import (
"sync"
)

// OnceError is an error value which can be assigned only once: the first
// non-nil error stored through Set is kept, and later ones do not replace it.
//
// The zero value is ready for use.
type OnceError struct {
// lock guards err; both Set and Get take it before touching err.
lock sync.Mutex
// err is the stored error; it stays nil until Set receives a non-nil error.
err error
}

// Set records `e` in this instance when `e != nil`; a nil `e` is a no-op.
//
// Only the first non-nil error passed to Set is stored; later calls leave
// the stored value untouched. Independently of whether it was stored, every
// non-nil error is logged through AppLogger together with `tag`, unless
// IsContextCanceledError reports true for it.
func (oe *OnceError) Set(tag string, e error) {
	if e == nil {
		return
	}

	// Keep the critical section minimal: only the first-wins assignment
	// happens under the lock; logging is done after it is released.
	oe.lock.Lock()
	if oe.err == nil {
		oe.err = e
	}
	oe.lock.Unlock()

	if IsContextCanceledError(e) {
		return
	}
	AppLogger.Errorf("[%s] error %v", tag, e)
}

// Get reports the error currently stored in this instance, i.e. the first
// non-nil error given to Set, or nil if none has been stored yet.
func (oe *OnceError) Get() error {
	// Read the field under the lock so the read is serialized with Set.
	oe.lock.Lock()
	stored := oe.err
	oe.lock.Unlock()
	return stored
}
45 changes: 45 additions & 0 deletions lightning/common/once_error_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package common_test

import (
"errors"
"testing"

. "github.com/pingcap/check"
"github.com/pingcap/tidb-lightning/lightning/common"
)

// TestCommon is the `go test` entry point for this package's tests: it hands
// t to TestingT from the dot-imported check package (presumably running the
// suites registered via Suite — confirm against the check package docs).
func TestCommon(t *testing.T) {
TestingT(t)
}

// Register the suite with the check package at package initialization.
var _ = Suite(&onceErrorSuite{})

// onceErrorSuite groups the tests for common.OnceError; it carries no state.
type onceErrorSuite struct{}

// TestOnceError verifies that a OnceError starts out empty, ignores nil,
// keeps the first non-nil error it is given, and is not changed by later
// calls to Set, including one made from another goroutine.
func (s *onceErrorSuite) TestOnceError(c *C) {
	var once common.OnceError

	// A fresh instance holds nothing, and storing nil keeps it that way.
	c.Assert(once.Get(), IsNil)
	once.Set("tag", nil)
	c.Assert(once.Get(), IsNil)

	// The first real error is retained.
	first := errors.New("1")
	once.Set("tag", first)
	c.Assert(once.Get(), Equals, first)

	// A later error does not replace the first one.
	second := errors.New("2")
	once.Set("tag", second)
	c.Assert(once.Get(), Equals, first)

	// Storing nil afterwards does not clear the stored error.
	once.Set("tag", nil)
	c.Assert(once.Get(), Equals, first)

	// The same holds when Set is called from a different goroutine.
	done := make(chan struct{})
	go func() {
		defer close(done)
		once.Set("tag", nil)
	}()
	<-done
	c.Assert(once.Get(), Equals, first)
}
26 changes: 12 additions & 14 deletions lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,16 +84,16 @@ type PostRestore struct {
}

type MydumperRuntime struct {
ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
MinRegionSize int64 `toml:"region-min-size" json:"region-min-size"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
CharacterSet string `toml:"character-set" json:"character-set"`
ReadBlockSize int64 `toml:"read-block-size" json:"read-block-size"`
BatchSize int64 `toml:"batch-size" json:"batch-size"`
BatchImportRatio float64 `toml:"batch-import-ratio" json:"batch-import-ratio"`
SourceDir string `toml:"data-source-dir" json:"data-source-dir"`
NoSchema bool `toml:"no-schema" json:"no-schema"`
CharacterSet string `toml:"character-set" json:"character-set"`
}

type TikvImporter struct {
Addr string `toml:"addr" json:"addr"`
BatchSize int64 `toml:"batch-size" json:"batch-size"`
Addr string `toml:"addr" json:"addr"`
}

type Checkpoint struct {
Expand Down Expand Up @@ -187,8 +187,11 @@ func (cfg *Config) Load() error {
}

// handle mydumper
if cfg.Mydumper.MinRegionSize <= 0 {
cfg.Mydumper.MinRegionSize = MinRegionSize
if cfg.Mydumper.BatchSize <= 0 {
cfg.Mydumper.BatchSize = 100 * _G
}
if cfg.Mydumper.BatchImportRatio < 0.0 || cfg.Mydumper.BatchImportRatio >= 1.0 {
cfg.Mydumper.BatchImportRatio = 0.75
}
if cfg.Mydumper.ReadBlockSize <= 0 {
cfg.Mydumper.ReadBlockSize = ReadBlockSize
Expand All @@ -197,11 +200,6 @@ func (cfg *Config) Load() error {
cfg.Mydumper.CharacterSet = "auto"
}

// hendle kv import
if cfg.TikvImporter.BatchSize <= 0 {
cfg.TikvImporter.BatchSize = KVMaxBatchSize
}

if len(cfg.Checkpoint.Schema) == 0 {
cfg.Checkpoint.Schema = "tidb_lightning_checkpoint"
}
Expand Down
3 changes: 0 additions & 3 deletions lightning/config/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,4 @@ const (
MinRegionSize int64 = 256 * _M

BufferSizeScale = 5

// kv import
KVMaxBatchSize int64 = 200 * _G
)