Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: add glue.Glue interface and other function #456

Merged
merged 42 commits into from Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b4aeb1e
save my work
lance6716 Nov 2, 2020
a5666a6
add notes
lance6716 Nov 4, 2020
5b79eda
save work
lance6716 Nov 6, 2020
51e8c0e
save work
lance6716 Nov 6, 2020
02f3ac5
fix unit test
lance6716 Nov 6, 2020
84d27f3
remove tidbMgr in RestoreController
lance6716 Nov 6, 2020
114d7f1
remove some comments
lance6716 Nov 6, 2020
d13bc83
remove some comments
lance6716 Nov 6, 2020
22d2f4e
change logger in SQLWithRetry
lance6716 Nov 6, 2020
c55f5aa
Merge branch 'master' into glue
lance6716 Nov 6, 2020
d0d01b9
revert replace log.Logger to *zap.Logger
lance6716 Nov 8, 2020
4fc56ec
Merge branch 'glue' of github.com:lance6716/tidb-lightning into glue
lance6716 Nov 8, 2020
e85c0b2
replace tab to space
lance6716 Nov 9, 2020
5811245
try another port to fix CI
lance6716 Nov 9, 2020
d75e0b9
remove some comment
lance6716 Nov 9, 2020
19d3aa5
*: more glue
lance6716 Nov 9, 2020
c264392
report info to host TiDB
lance6716 Nov 9, 2020
f50ecb0
Merge pull request #1 from lance6716/glue-more
lance6716 Nov 9, 2020
9e595e8
Merge branch 'master' into glue
lance6716 Nov 9, 2020
dbee87d
fix CI
lance6716 Nov 9, 2020
e081a6d
Merge branch 'glue-more' into glue
lance6716 Nov 9, 2020
a4b9fc8
Merge branch 'master' of https://github.com/pingcap/tidb-lightning in…
lance6716 Nov 10, 2020
f184e32
Merge branch 'master' into glue
lance6716 Nov 11, 2020
6e4da0a
address comment
lance6716 Nov 12, 2020
679b62d
Merge branch 'glue' of github.com:lance6716/tidb-lightning into glue
lance6716 Nov 12, 2020
832f65c
address comment
lance6716 Nov 12, 2020
4140470
Merge branch 'master' into glue
lance6716 Nov 12, 2020
33150d2
rename a method in interface
lance6716 Nov 12, 2020
7b368d8
save work
lance6716 Nov 12, 2020
aa921dd
try fix CI
lance6716 Nov 12, 2020
71581a3
could work
lance6716 Nov 12, 2020
8fb2745
change ctx usage
lance6716 Nov 12, 2020
481dbf4
try fix CI
lance6716 Nov 12, 2020
04a0b7f
try fix CI
lance6716 Nov 13, 2020
8978d8b
refine function interface
lance6716 Nov 13, 2020
70188c4
refine some fucntion interface
lance6716 Nov 13, 2020
15376ff
debug CI
lance6716 Nov 13, 2020
ff5c065
address comment
lance6716 Nov 13, 2020
1b446e5
Merge branch 'master' into glue
lance6716 Nov 13, 2020
14cc270
remove debug log
lance6716 Nov 15, 2020
1da5464
Merge branch 'master' into glue
lance6716 Nov 16, 2020
e880ad1
address comment
lance6716 Nov 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/tidb-lightning-ctl/main.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/restore"
Expand Down Expand Up @@ -172,7 +173,7 @@ func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -182,7 +183,7 @@ func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string)
}

func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -192,7 +193,7 @@ func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName st
}

func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common.TLS, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}

func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down
26 changes: 26 additions & 0 deletions lightning/checkpoints/checkpoints.go
Expand Up @@ -371,6 +371,32 @@ type CheckpointsDB interface {
DumpChunks(ctx context.Context, csv io.Writer) error
}

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, error) {
if !cfg.Checkpoint.Enable {
return NewNullCheckpointsDB(), nil
}

switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
db, err := sql.Open("mysql", cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema, cfg.TaskID)
if err != nil {
db.Close()
return nil, errors.Trace(err)
}
return cpdb, nil

case config.CheckpointDriverFile:
return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil

default:
return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
}
}

// NullCheckpointsDB is a checkpoints database with no checkpoints.
type NullCheckpointsDB struct{}

Expand Down
120 changes: 120 additions & 0 deletions lightning/glue/glue.go
@@ -0,0 +1,120 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package glue

import (
"context"
"database/sql"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
"go.uber.org/zap"
)

type Glue interface {
OwnsSQLExecutor() bool
GetSQLExecutor() SQLExecutor
GetParser() *parser.Parser
GetTables(context.Context, string) ([]*model.TableInfo, error)
OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.CheckpointsDB, error)
Record(string, uint64)
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}

type SQLExecutor interface {
ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error
ObtainStringLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error)
Close()
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why pass a logger through the interface?

What about passing a logger when we creating an SQLExecutor instance?

func NewExternalTiDBGlue(db *sql.DB, sqlMode mysql.SQLMode, logger *zap.Logger) *ExternalTiDBGlue { ... }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some functions need a logger from Task, and that Task is often built from logger with different fields. It seems that pinning a logger at New is not very flexible


type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
}

func NewExternalTiDBGlue(db *sql.DB, sqlMode mysql.SQLMode) *ExternalTiDBGlue {
p := parser.New()
p.SetSQLMode(sqlMode)

return &ExternalTiDBGlue{db: db, parser: p}
}

func (e ExternalTiDBGlue) GetSQLExecutor() SQLExecutor {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
return e
}

func (e ExternalTiDBGlue) ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error {
sql := common.SQLWithRetry{
DB: e.db,
Logger: log.Logger{Logger: logger},
}
return sql.Exec(ctx, purpose, query)
}

func (e ExternalTiDBGlue) ObtainStringLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error) {
var s string
err := common.SQLWithRetry{
DB: e.db,
Logger: log.Logger{Logger: logger},
}.QueryRow(ctx, purpose, query, &s)
return s, err
}

func (e ExternalTiDBGlue) GetParser() *parser.Parser {
return e.parser
}

func (e ExternalTiDBGlue) GetDB() *sql.DB {
return e.db
}

func (e ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo, error) {
return nil, nil
}

func (e ExternalTiDBGlue) OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (checkpoints.CheckpointsDB, error) {
return checkpoints.OpenCheckpointsDB(ctx, cfg)
}

func (e ExternalTiDBGlue) OwnsSQLExecutor() bool {
return true
}

func (e ExternalTiDBGlue) Close() {
e.db.Close()
}

func (e ExternalTiDBGlue) Record(string, uint64) {
}

type ImportStage uint64

const (
StageWriting ImportStage = iota
StagePostProcessing
)

func (s ImportStage) String() string {
switch s {
case StageWriting:
return "Writing"
case StagePostProcessing:
return "PostProcessing"
}
return ""
}
30 changes: 28 additions & 2 deletions lightning/lightning.go
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/pingcap/br/pkg/storage"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-lightning/lightning/glue"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/shurcooL/httpgzip"
"go.uber.org/zap"
Expand Down Expand Up @@ -185,6 +186,19 @@ func (l *Lightning) RunOnce() error {
return l.run(cfg)
}

func (l *Lightning) RunEmbeddedOnce(ctx context.Context, taskCfg *config.Config, logger *zap.Logger, g glue.Glue) error {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if err := taskCfg.Adjust(); err != nil {
return err
}
taskCfg.TaskID = time.Now().UnixNano()

l.shutdown()
l.ctx = ctx

log.SetAppLogger(logger)
return l.run(taskCfg, g)
}

func (l *Lightning) RunServer() error {
l.taskCfgs = config.NewConfigList()
log.L().Info(
Expand All @@ -207,7 +221,7 @@ func (l *Lightning) RunServer() error {

var taskCfgRecorderKey struct{}

func (l *Lightning) run(taskCfg *config.Config) (err error) {
func (l *Lightning) run(taskCfg *config.Config, glues ...glue.Glue) (err error) {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
common.PrintInfo("lightning", func() {
log.L().Info("cfg", zap.Stringer("cfg", taskCfg))
})
Expand Down Expand Up @@ -271,8 +285,20 @@ func (l *Lightning) run(taskCfg *config.Config) (err error) {
dbMetas := mdl.GetDatabases()
web.BroadcastInitProgress(dbMetas)

if err = taskCfg.TiDB.Security.RegisterMySQL(); err != nil {
return errors.Trace(err)
}

if len(glues) == 0 {
db, err := restore.DBFromConfig(taskCfg.TiDB)
if err != nil {
return errors.Trace(err)
}
glues = append(glues, glue.NewExternalTiDBGlue(db, taskCfg.TiDB.SQLMode))
}

var procedure *restore.RestoreController
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, s)
procedure, err = restore.NewRestoreController(ctx, dbMetas, taskCfg, s, glues[0])
if err != nil {
log.L().Error("restore failed", log.ShortError(err))
return errors.Trace(err)
Expand Down
9 changes: 7 additions & 2 deletions lightning/lightning_test.go
Expand Up @@ -25,7 +25,7 @@ import (
"time"

"github.com/pingcap/tidb-lightning/lightning/checkpoints"

"github.com/pingcap/tidb-lightning/lightning/glue"
"github.com/pingcap/tidb-lightning/lightning/mydump"

. "github.com/pingcap/check"
Expand Down Expand Up @@ -63,6 +63,9 @@ func (s *lightningSuite) TestRun(c *C) {
lightning := New(cfg)
err := lightning.RunOnce()
c.Assert(err, ErrorMatches, ".*mydumper dir does not exist")

// to bypass error from db.Ping()
invalidGlue := glue.NewExternalTiDBGlue(nil, 0)
path, _ := filepath.Abs(".")
err = lightning.run(&config.Config{
Mydumper: config.MydumperRuntime{
Expand All @@ -74,7 +77,7 @@ func (s *lightningSuite) TestRun(c *C) {
Enable: true,
Driver: "invalid",
},
})
}, invalidGlue)
c.Assert(err, ErrorMatches, "Unknown checkpoint driver invalid")

err = lightning.run(&config.Config{
Expand Down Expand Up @@ -240,6 +243,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) {

// Check `GET /tasks` returns all tasks currently running

time.Sleep(100 * time.Millisecond)
c.Assert(getAllTasks(), DeepEquals, getAllResultType{
Current: first,
Queue: []int64{second, third},
Expand Down Expand Up @@ -333,6 +337,7 @@ func (s *lightningServerSuite) TestGetDeleteTask(c *C) {
c.Assert(resp.StatusCode, Equals, http.StatusOK)
resp.Body.Close()

time.Sleep(100 * time.Millisecond)
c.Assert(getAllTasks(), DeepEquals, getAllResultType{
Current: third,
Queue: []int64{},
Expand Down
5 changes: 5 additions & 0 deletions lightning/log/log.go
Expand Up @@ -106,6 +106,11 @@ func L() Logger {
return appLogger
}

// SetAppLogger replaces the default logger in this package to given one
func SetAppLogger(l *zap.Logger) {
appLogger = Logger{l.WithOptions(zap.AddStacktrace(zap.DPanicLevel))}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why still need to change the log config here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

keep same with

// Do not log stack traces at all, as we'll get the stack trace from the
// error itself.
appLogger = Logger{logger.WithOptions(zap.AddStacktrace(zap.DPanicLevel))}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They are not the same. #96~98 is the default operation to build an app log. But if we build a logger and set it as global, the caller should take responsibility to set all options and lightning shouldn't override its config anymore.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's not override, just create a new logger for lightning's specification, and the comment remains reasonable IMO

}

// Level returns the current global log level.
func Level() zapcore.Level {
return appLevel.Level()
Expand Down
9 changes: 7 additions & 2 deletions lightning/restore/checksum.go
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/pingcap/br/pkg/checksum"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb-lightning/lightning/glue"
tidbcfg "github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/store/tikv"
Expand Down Expand Up @@ -84,7 +85,11 @@ func newChecksumManager(rc *RestoreController) (ChecksumManager, error) {

manager = newTiKVChecksumManager(store.(tikv.Storage).GetClient(), pdCli)
} else {
manager = newTiDBChecksumExecutor(rc.tidbMgr.db)
e, ok := rc.tidbGlue.GetSQLExecutor().(glue.ExternalTiDBGlue)
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
if !ok {
return nil, errors.New("can't use lightning via SQL with PD version less than v4.0.0")
}
manager = newTiDBChecksumExecutor(e.GetDB())
}

return manager, nil
Expand Down Expand Up @@ -138,7 +143,7 @@ func (e *tidbChecksumExecutor) Checksum(ctx context.Context, tableInfo *TidbTabl

// DoChecksum do checksum for tables.
// table should be in <db>.<table>, format. e.g. foo.bar
func DoChecksum(ctx context.Context, db *sql.DB, table *TidbTableInfo) (*RemoteChecksum, error) {
func DoChecksum(ctx context.Context, table *TidbTableInfo) (*RemoteChecksum, error) {
var err error
manager, ok := ctx.Value(&checksumManagerKey).(ChecksumManager)
if !ok {
Expand Down
10 changes: 5 additions & 5 deletions lightning/restore/checksum_test.go
Expand Up @@ -58,7 +58,7 @@ func (s *checksumSuite) TestDoChecksum(c *C) {
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
checksum, err := DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Schema: "test",
Expand Down Expand Up @@ -103,7 +103,7 @@ func (s *checksumSuite) TestDoChecksumParallel(c *C) {
for i := 0; i < 5; i++ {
go func() {
defer wg.Done()
checksum, err := DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
checksum, err := DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, IsNil)
c.Assert(*checksum, DeepEquals, RemoteChecksum{
Schema: "test",
Expand Down Expand Up @@ -142,7 +142,7 @@ func (s *checksumSuite) TestIncreaseGCLifeTimeFail(c *C) {
wg.Add(5)
for i := 0; i < 5; i++ {
go func() {
_, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, ErrorMatches, "update GC lifetime failed: update gc error: context canceled")
wg.Done()
}()
Expand Down Expand Up @@ -173,7 +173,7 @@ func (s *checksumSuite) TestDoChecksumWithTikv(c *C) {

startTs := oracle.ComposeTS(time.Now().Unix()*1000, 0)
ctx := context.WithValue(context.Background(), &checksumManagerKey, checksumExec)
_, err = DoChecksum(ctx, nil, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t", Core: tableInfo})
c.Assert(err, IsNil)

// after checksum, safepint should be small than start ts
Expand All @@ -197,7 +197,7 @@ func (s *checksumSuite) TestDoChecksumWithErrorAndLongOriginalLifetime(c *C) {
mock.ExpectClose()

ctx := MockDoChecksumCtx(db)
_, err = DoChecksum(ctx, db, &TidbTableInfo{DB: "test", Name: "t"})
_, err = DoChecksum(ctx, &TidbTableInfo{DB: "test", Name: "t"})
c.Assert(err, ErrorMatches, "compute remote checksum failed: mock syntax error.*")

c.Assert(db.Close(), IsNil)
Expand Down