Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: more glue #1

Merged
merged 1 commit into from Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
11 changes: 6 additions & 5 deletions cmd/tidb-lightning-ctl/main.go
Expand Up @@ -22,9 +22,10 @@ import (
"strconv"
"strings"

"github.com/google/uuid"
"github.com/pingcap/errors"
"github.com/pingcap/kvproto/pkg/import_sstpb"
"github.com/google/uuid"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/common"
Expand Down Expand Up @@ -172,7 +173,7 @@ func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -182,7 +183,7 @@ func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string)
}

func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -192,7 +193,7 @@ func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName st
}

func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common.TLS, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}

func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down
26 changes: 26 additions & 0 deletions lightning/checkpoints/checkpoints.go
Expand Up @@ -371,6 +371,32 @@ type CheckpointsDB interface {
DumpChunks(ctx context.Context, csv io.Writer) error
}

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, error) {
if !cfg.Checkpoint.Enable {
return NewNullCheckpointsDB(), nil
}

switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
db, err := sql.Open("mysql", cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema, cfg.TaskID)
if err != nil {
db.Close()
return nil, errors.Trace(err)
}
return cpdb, nil

case config.CheckpointDriverFile:
return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil

default:
return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
}
}

// NullCheckpointsDB is a checkpoints database with no checkpoints.
type NullCheckpointsDB struct{}

Expand Down
7 changes: 7 additions & 0 deletions lightning/glue/glue.go
Expand Up @@ -20,7 +20,9 @@ import (
"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
"go.uber.org/zap"
)
Expand All @@ -29,6 +31,7 @@ type Glue interface {
GetSQLExecutor() SQLExecutor
GetParser() *parser.Parser
GetTables(context.Context, string) ([]*model.TableInfo, error)
OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.CheckpointsDB, error)
OwnsSQLExecutor() bool
}

Expand Down Expand Up @@ -83,6 +86,10 @@ func (e ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo
return nil, nil
}

func (e ExternalTiDBGlue) OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (checkpoints.CheckpointsDB, error) {
return checkpoints.OpenCheckpointsDB(ctx, cfg)
}

func (e ExternalTiDBGlue) OwnsSQLExecutor() bool {
return true
}
Expand Down
27 changes: 26 additions & 1 deletion lightning/lightning.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-lightning/lightning/glue"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shurcooL/httpgzip"
"go.uber.org/zap"
Expand Down Expand Up @@ -185,6 +186,19 @@ func (l *Lightning) RunOnce() error {
return l.run(cfg)
}

func (l *Lightning) RunEmbeddedOnce(ctx context.Context, taskCfg *config.Config, logger *zap.Logger, g glue.Glue) error {
if err := taskCfg.Adjust(); err != nil {
return err
}
taskCfg.TaskID = time.Now().UnixNano()

l.shutdown()
l.ctx = ctx

log.SetAppLogger(logger)
return l.runWithGlue(taskCfg, g)
}

func (l *Lightning) RunServer() error {
l.taskCfgs = config.NewConfigList()
log.L().Info(
Expand All @@ -208,6 +222,17 @@ func (l *Lightning) RunServer() error {
var taskCfgRecorderKey struct{}

func (l *Lightning) run(taskCfg *config.Config) (err error) {
db, err := restore.DBFromConfig(taskCfg.TiDB)
failpoint.Inject("SkipRunTask", func() {
err = nil
})
if err != nil {
return err
}
return l.runWithGlue(taskCfg, glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode))
}

func (l *Lightning) runWithGlue(taskCfg *config.Config, g glue.Glue) (err error) {
common.PrintInfo("lightning", func() {
log.L().Info("cfg", zap.Stringer("cfg", taskCfg))
})
Expand Down Expand Up @@ -272,7 +297,7 @@ func (l *Lightning) run(taskCfg *config.Config) (err error) {
web.BroadcastInitProgress(dbMetas)

var procedure *restore.RestoreController
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, s)
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, s, g)
if err != nil {
log.L().Error("restore failed", log.ShortError(err))
return errors.Trace(err)
Expand Down
13 changes: 9 additions & 4 deletions lightning/lightning_test.go
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/glue"

"github.com/pingcap/tidb-lightning/lightning/mydump"

Expand Down Expand Up @@ -63,8 +64,10 @@ func (s *lightningSuite) TestRun(c *C) {
lightning := New(cfg)
err := lightning.RunOnce()
c.Assert(err, ErrorMatches, ".*mydumper dir does not exist")

invalidGlue := glue.NewExternalTiDBGlue(nil, 0)
path, _ := filepath.Abs(".")
err = lightning.run(&config.Config{
err = lightning.runWithGlue(&config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: "file://" + filepath.ToSlash(path),
Filter: []string{"*.*"},
Expand All @@ -74,10 +77,10 @@ func (s *lightningSuite) TestRun(c *C) {
Enable: true,
Driver: "invalid",
},
})
}, invalidGlue)
c.Assert(err, ErrorMatches, "Unknown checkpoint driver invalid")

err = lightning.run(&config.Config{
err = lightning.runWithGlue(&config.Config{
Mydumper: config.MydumperRuntime{
SourceDir: ".",
Filter: []string{"*.*"},
Expand All @@ -87,7 +90,7 @@ func (s *lightningSuite) TestRun(c *C) {
Driver: "file",
DSN: "any-file",
},
})
}, invalidGlue)
c.Assert(err, NotNil)
}

Expand Down Expand Up @@ -240,6 +243,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) {

// Check `GET /tasks` returns all tasks currently running

time.Sleep(100 * time.Millisecond)
c.Assert(getAllTasks(), DeepEquals, getAllResultType{
Current: first,
Queue: []int64{second, third},
Expand Down Expand Up @@ -333,6 +337,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) {
c.Assert(resp.StatusCode, Equals, http.StatusOK)
resp.Body.Close()

time.Sleep(100 * time.Millisecond)
c.Assert(getAllTasks(), DeepEquals, getAllResultType{
Current: third,
Queue: []int64{},
Expand Down
5 changes: 5 additions & 0 deletions lightning/log/log.go
Expand Up @@ -106,6 +106,11 @@ func L() Logger {
return appLogger
}

// SetAppLogger replaces the default logger in this package to given one
func SetAppLogger(l *zap.Logger) {
appLogger = Logger{l.WithOptions(zap.AddStacktrace(zap.DPanicLevel))}
}

// Level returns the current global log level.
func Level() zapcore.Level {
return appLevel.Level()
Expand Down
56 changes: 16 additions & 40 deletions lightning/restore/restore.go
Expand Up @@ -15,7 +15,6 @@ package restore

import (
"context"
"database/sql"
"fmt"
"io"
"math"
Expand Down Expand Up @@ -157,8 +156,14 @@ type RestoreController struct {
checksumManager ChecksumManager
}

func NewRestoreController(ctx context.Context, dbMetas []*mydump.MDDatabaseMeta, cfg *config.Config, s storage.ExternalStorage) (*RestoreController, error) {
return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, s, DeliverPauser)
func NewRestoreController(
ctx context.Context,
dbMetas []*mydump.MDDatabaseMeta,
cfg *config.Config,
s storage.ExternalStorage,
g glue.Glue,
) (*RestoreController, error) {
return NewRestoreControllerWithPauser(ctx, dbMetas, cfg, s, DeliverPauser, g)
}

func NewRestoreControllerWithPauser(
Expand All @@ -167,6 +172,7 @@ func NewRestoreControllerWithPauser(
cfg *config.Config,
s storage.ExternalStorage,
pauser *common.Pauser,
g glue.Glue,
) (*RestoreController, error) {
tls, err := cfg.ToTLS()
if err != nil {
Expand All @@ -176,7 +182,7 @@ func NewRestoreControllerWithPauser(
return nil, err
}

cpdb, err := OpenCheckpointsDB(ctx, cfg)
cpdb, err := g.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -189,12 +195,6 @@ func NewRestoreControllerWithPauser(
return nil, errors.Trace(err)
}

tidbMgr, err := NewTiDBManager(cfg.TiDB, tls)
if err != nil {
return nil, errors.Trace(err)
}
tidbGlue := glue.NewExternalTiDBGlue(tidbMgr.db, cfg.TiDB.SQLMode)

var backend kv.Backend
switch cfg.TikvImporter.Backend {
case config.BackendImporter:
Expand All @@ -204,9 +204,12 @@ func NewRestoreControllerWithPauser(
return nil, err
}
case config.BackendTiDB:
backend = kv.NewTiDBBackend(tidbMgr.db, cfg.TikvImporter.OnDuplicate)
db, err := DBFromConfig(cfg.TiDB)
if err != nil {
return nil, errors.Trace(err)
}
backend = kv.NewTiDBBackend(db, cfg.TikvImporter.OnDuplicate)
case config.BackendLocal:
// TODO(lance6717): check that glue could substitute PdAddr and tls (HTTP API)
backend, err = kv.NewLocalBackend(ctx, tls, cfg.TiDB.PdAddr, cfg.TikvImporter.RegionSplitSize,
cfg.TikvImporter.SortedKVDir, cfg.TikvImporter.RangeConcurrency, cfg.TikvImporter.SendKVPairs,
cfg.Checkpoint.Enable)
Expand All @@ -226,7 +229,7 @@ func NewRestoreControllerWithPauser(
ioWorkers: worker.NewPool(ctx, cfg.App.IOConcurrency, "io"),
pauser: pauser,
backend: backend,
tidbGlue: tidbGlue,
tidbGlue: g,
rowFormatVer: "1",
tls: tls,

Expand All @@ -241,33 +244,6 @@ func NewRestoreControllerWithPauser(
return rc, nil
}

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, error) {
if !cfg.Checkpoint.Enable {
return NewNullCheckpointsDB(), nil
}

switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
// TODO(lance6716): introduce glue or add "glue" checkpoint
db, err := sql.Open("mysql", cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema, cfg.TaskID)
if err != nil {
db.Close()
return nil, errors.Trace(err)
}
return cpdb, nil

case config.CheckpointDriverFile:
return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil

default:
return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
}
}

func (rc *RestoreController) Close() {
rc.backend.Close()
if rc.tidbGlue.OwnsSQLExecutor() {
Expand Down
10 changes: 9 additions & 1 deletion lightning/restore/tidb.go
Expand Up @@ -63,7 +63,7 @@ func isUnknownSystemVariableErr(err error) bool {
return code == mysql.ErrUnknownSystemVariable
}

func NewTiDBManager(dsn config.DBStore, tls *common.TLS) (*TiDBManager, error) {
func DBFromConfig(dsn config.DBStore) (*sql.DB, error) {
param := common.MySQLConnectParam{
Host: dsn.Host,
Port: dsn.Port,
Expand Down Expand Up @@ -96,6 +96,14 @@ func NewTiDBManager(dsn config.DBStore, tls *common.TLS) (*TiDBManager, error) {
return nil, errors.Trace(err)
}
}
return db, nil
}

func NewTiDBManager(dsn config.DBStore, tls *common.TLS) (*TiDBManager, error) {
db, err := DBFromConfig(dsn)
if err != nil {
return nil, errors.Trace(err)
}

return NewTiDBManagerWithDB(db, dsn.SQLMode), nil
}
Expand Down