Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

restore: add glue.Glue interface and other function #456

Merged
merged 42 commits into from Nov 16, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
b4aeb1e
save my work
lance6716 Nov 2, 2020
a5666a6
add notes
lance6716 Nov 4, 2020
5b79eda
save work
lance6716 Nov 6, 2020
51e8c0e
save work
lance6716 Nov 6, 2020
02f3ac5
fix unit test
lance6716 Nov 6, 2020
84d27f3
remove tidbMgr in RestoreController
lance6716 Nov 6, 2020
114d7f1
remove some comments
lance6716 Nov 6, 2020
d13bc83
remove some comments
lance6716 Nov 6, 2020
22d2f4e
change logger in SQLWithRetry
lance6716 Nov 6, 2020
c55f5aa
Merge branch 'master' into glue
lance6716 Nov 6, 2020
d0d01b9
revert replace log.Logger to *zap.Logger
lance6716 Nov 8, 2020
4fc56ec
Merge branch 'glue' of github.com:lance6716/tidb-lightning into glue
lance6716 Nov 8, 2020
e85c0b2
replace tab to space
lance6716 Nov 9, 2020
5811245
try another port to fix CI
lance6716 Nov 9, 2020
d75e0b9
remove some comment
lance6716 Nov 9, 2020
19d3aa5
*: more glue
lance6716 Nov 9, 2020
c264392
report info to host TiDB
lance6716 Nov 9, 2020
f50ecb0
Merge pull request #1 from lance6716/glue-more
lance6716 Nov 9, 2020
9e595e8
Merge branch 'master' into glue
lance6716 Nov 9, 2020
dbee87d
fix CI
lance6716 Nov 9, 2020
e081a6d
Merge branch 'glue-more' into glue
lance6716 Nov 9, 2020
a4b9fc8
Merge branch 'master' of https://github.com/pingcap/tidb-lightning in…
lance6716 Nov 10, 2020
f184e32
Merge branch 'master' into glue
lance6716 Nov 11, 2020
6e4da0a
address comment
lance6716 Nov 12, 2020
679b62d
Merge branch 'glue' of github.com:lance6716/tidb-lightning into glue
lance6716 Nov 12, 2020
832f65c
address comment
lance6716 Nov 12, 2020
4140470
Merge branch 'master' into glue
lance6716 Nov 12, 2020
33150d2
rename a method in interface
lance6716 Nov 12, 2020
7b368d8
save work
lance6716 Nov 12, 2020
aa921dd
try fix CI
lance6716 Nov 12, 2020
71581a3
could work
lance6716 Nov 12, 2020
8fb2745
change ctx usage
lance6716 Nov 12, 2020
481dbf4
try fix CI
lance6716 Nov 12, 2020
04a0b7f
try fix CI
lance6716 Nov 13, 2020
8978d8b
refine function interface
lance6716 Nov 13, 2020
70188c4
refine some fucntion interface
lance6716 Nov 13, 2020
15376ff
debug CI
lance6716 Nov 13, 2020
ff5c065
address comment
lance6716 Nov 13, 2020
1b446e5
Merge branch 'master' into glue
lance6716 Nov 13, 2020
14cc270
remove debug log
lance6716 Nov 15, 2020
1da5464
Merge branch 'master' into glue
lance6716 Nov 16, 2020
e880ad1
address comment
lance6716 Nov 16, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 5 additions & 4 deletions cmd/tidb-lightning-ctl/main.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/restore"
Expand Down Expand Up @@ -172,7 +173,7 @@ func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -182,7 +183,7 @@ func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string)
}

func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -192,7 +193,7 @@ func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName st
}

func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common.TLS, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}

func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down
28 changes: 18 additions & 10 deletions cmd/tidb-lightning/main.go
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
Expand All @@ -28,10 +29,10 @@ import (
)

func main() {
cfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", cfg.App.Config.File)
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", globalCfg.App.Config.File)

app := lightning.New(cfg)
app := lightning.New(globalCfg)

sc := make(chan os.Signal, 1)
signal.Notify(sc,
Expand Down Expand Up @@ -59,7 +60,7 @@ func main() {
//
// Local mode need much more memory than importer/tidb mode, if the gc percentage is too high,
// lightning memory usage will also be high.
if cfg.TikvImporter.Backend != config.BackendLocal {
if globalCfg.TikvImporter.Backend != config.BackendLocal {
gogc := os.Getenv("GOGC")
if gogc == "" {
old := debug.SetGCPercent(500)
Expand All @@ -74,11 +75,18 @@ func main() {
return
}

if cfg.App.ServerMode {
err = app.RunServer()
} else {
err = app.RunOnce()
}
err = func() error {
glorv marked this conversation as resolved.
Show resolved Hide resolved
if globalCfg.App.ServerMode {
return app.RunServer()
} else {
cfg := config.NewConfig()
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}
return app.RunOnce(context.Background(), cfg, nil, nil)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have thought about it again. Since there is already a context in the Lightning struct RunOnce method can always depend on the app.ctx instead of another ctx as parameter?

Copy link
Contributor Author

@lance6716 lance6716 Nov 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In future usage, lightning may be New once (controlled by inner l.ctx) and that instance RunOnce for many times, like a HTTP server mode. So a task context will help

}
}()

if err != nil {
logger.Error("tidb lightning encountered error stack info", zap.Error(err))
logger.Error("tidb lightning encountered error", log.ShortError(err))
Expand All @@ -89,7 +97,7 @@ func main() {
}

// call Sync() with log to stdout may return error in some case, so just skip it
if cfg.App.File != "" {
if globalCfg.App.File != "" {
syncErr := logger.Sync()
if syncErr != nil {
fmt.Fprintln(os.Stderr, "sync log failed", syncErr)
Expand Down
26 changes: 26 additions & 0 deletions lightning/checkpoints/checkpoints.go
Expand Up @@ -371,6 +371,32 @@ type CheckpointsDB interface {
DumpChunks(ctx context.Context, csv io.Writer) error
}

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, error) {
if !cfg.Checkpoint.Enable {
return NewNullCheckpointsDB(), nil
}

switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
db, err := sql.Open("mysql", cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema, cfg.TaskID)
if err != nil {
db.Close()
return nil, errors.Trace(err)
}
return cpdb, nil

case config.CheckpointDriverFile:
return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil

default:
return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
}
}

// NullCheckpointsDB is a checkpoints database with no checkpoints.
type NullCheckpointsDB struct{}

Expand Down
110 changes: 110 additions & 0 deletions lightning/glue/glue.go
@@ -0,0 +1,110 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package glue

import (
"context"
"database/sql"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
"go.uber.org/zap"
)

type Glue interface {
OwnsSQLExecutor() bool
GetSQLExecutor() SQLExecutor
GetDB() (*sql.DB, error)
GetParser() *parser.Parser
GetTables(context.Context, string) ([]*model.TableInfo, error)
OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.CheckpointsDB, error)
// Record is used to report some information (key, value) to host TiDB, including progress, stage currently
Record(string, uint64)
kennytm marked this conversation as resolved.
Show resolved Hide resolved
}

type SQLExecutor interface {
ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error
ObtainStringWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error)
Close()
}

type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
}

func NewExternalTiDBGlue(db *sql.DB, sqlMode mysql.SQLMode) *ExternalTiDBGlue {
p := parser.New()
p.SetSQLMode(sqlMode)

return &ExternalTiDBGlue{db: db, parser: p}
}

func (e *ExternalTiDBGlue) GetSQLExecutor() SQLExecutor {
return e
}

func (e *ExternalTiDBGlue) ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error {
sql := common.SQLWithRetry{
DB: e.db,
Logger: log.Logger{Logger: logger},
}
return sql.Exec(ctx, purpose, query)
}

func (e *ExternalTiDBGlue) ObtainStringWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error) {
var s string
err := common.SQLWithRetry{
DB: e.db,
Logger: log.Logger{Logger: logger},
}.QueryRow(ctx, purpose, query, &s)
return s, err
}

func (e *ExternalTiDBGlue) GetDB() (*sql.DB, error) {
return e.db, nil
}

func (e *ExternalTiDBGlue) GetParser() *parser.Parser {
return e.parser
}

func (e *ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo, error) {
return nil, nil
}

func (e *ExternalTiDBGlue) OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (checkpoints.CheckpointsDB, error) {
return checkpoints.OpenCheckpointsDB(ctx, cfg)
}

func (e *ExternalTiDBGlue) OwnsSQLExecutor() bool {
return true
}

func (e *ExternalTiDBGlue) Close() {
e.db.Close()
}

func (e *ExternalTiDBGlue) Record(string, uint64) {
}

const (
RecordEstimatedChunk = "EstimatedChunk"
RecordFinishedChunk = "FinishedChunk"
)