Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

Commit

Permalink
restore: add glue.Glue interface and other function (#456)
Browse files Browse the repository at this point in the history
* save my work

* add notes

* save work

* save work

* fix unit test

* remove tidbMgr in RestoreController

* remove some comments

* remove some comments

* change logger in SQLWithRetry

* revert replace log.Logger to *zap.Logger

* replace tab to space

* try another port to fix CI

* remove some comment

* *: more glue

* report info to host TiDB

* fix CI

* address comment

* address comment

* rename a method in interface

* save work

* try fix CI

* could work

* change ctx usage

* try fix CI

* try fix CI

* refine function interface

* refine some fucntion interface

* debug CI

* address comment

* remove debug log

* address comment
  • Loading branch information
lance6716 committed Nov 16, 2020
1 parent 7e17f97 commit a78137f
Show file tree
Hide file tree
Showing 14 changed files with 367 additions and 166 deletions.
9 changes: 5 additions & 4 deletions cmd/tidb-lightning-ctl/main.go
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/kvproto/pkg/import_sstpb"

kv "github.com/pingcap/tidb-lightning/lightning/backend"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/restore"
Expand Down Expand Up @@ -172,7 +173,7 @@ func fetchMode(ctx context.Context, cfg *config.Config, tls *common.TLS) error {
}

func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -182,7 +183,7 @@ func checkpointRemove(ctx context.Context, cfg *config.Config, tableName string)
}

func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -192,7 +193,7 @@ func checkpointErrorIgnore(ctx context.Context, cfg *config.Config, tableName st
}

func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common.TLS, tableName string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -264,7 +265,7 @@ func checkpointErrorDestroy(ctx context.Context, cfg *config.Config, tls *common
}

func checkpointDump(ctx context.Context, cfg *config.Config, dumpFolder string) error {
cpdb, err := restore.OpenCheckpointsDB(ctx, cfg)
cpdb, err := checkpoints.OpenCheckpointsDB(ctx, cfg)
if err != nil {
return errors.Trace(err)
}
Expand Down
28 changes: 18 additions & 10 deletions cmd/tidb-lightning/main.go
Expand Up @@ -14,6 +14,7 @@
package main

import (
"context"
"fmt"
"os"
"os/signal"
Expand All @@ -28,10 +29,10 @@ import (
)

func main() {
cfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", cfg.App.Config.File)
globalCfg := config.Must(config.LoadGlobalConfig(os.Args[1:], nil))
fmt.Fprintf(os.Stdout, "Verbose debug logs will be written to %s\n\n", globalCfg.App.Config.File)

app := lightning.New(cfg)
app := lightning.New(globalCfg)

sc := make(chan os.Signal, 1)
signal.Notify(sc,
Expand Down Expand Up @@ -59,7 +60,7 @@ func main() {
//
// Local mode need much more memory than importer/tidb mode, if the gc percentage is too high,
// lightning memory usage will also be high.
if cfg.TikvImporter.Backend != config.BackendLocal {
if globalCfg.TikvImporter.Backend != config.BackendLocal {
gogc := os.Getenv("GOGC")
if gogc == "" {
old := debug.SetGCPercent(500)
Expand All @@ -74,11 +75,18 @@ func main() {
return
}

if cfg.App.ServerMode {
err = app.RunServer()
} else {
err = app.RunOnce()
}
err = func() error {
if globalCfg.App.ServerMode {
return app.RunServer()
} else {
cfg := config.NewConfig()
if err := cfg.LoadFromGlobal(globalCfg); err != nil {
return err
}
return app.RunOnce(context.Background(), cfg, nil, nil)
}
}()

if err != nil {
logger.Error("tidb lightning encountered error stack info", zap.Error(err))
logger.Error("tidb lightning encountered error", log.ShortError(err))
Expand All @@ -89,7 +97,7 @@ func main() {
}

// call Sync() with log to stdout may return error in some case, so just skip it
if cfg.App.File != "" {
if globalCfg.App.File != "" {
syncErr := logger.Sync()
if syncErr != nil {
fmt.Fprintln(os.Stderr, "sync log failed", syncErr)
Expand Down
26 changes: 26 additions & 0 deletions lightning/checkpoints/checkpoints.go
Expand Up @@ -371,6 +371,32 @@ type CheckpointsDB interface {
DumpChunks(ctx context.Context, csv io.Writer) error
}

func OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (CheckpointsDB, error) {
if !cfg.Checkpoint.Enable {
return NewNullCheckpointsDB(), nil
}

switch cfg.Checkpoint.Driver {
case config.CheckpointDriverMySQL:
db, err := sql.Open("mysql", cfg.Checkpoint.DSN)
if err != nil {
return nil, errors.Trace(err)
}
cpdb, err := NewMySQLCheckpointsDB(ctx, db, cfg.Checkpoint.Schema, cfg.TaskID)
if err != nil {
db.Close()
return nil, errors.Trace(err)
}
return cpdb, nil

case config.CheckpointDriverFile:
return NewFileCheckpointsDB(cfg.Checkpoint.DSN), nil

default:
return nil, errors.Errorf("Unknown checkpoint driver %s", cfg.Checkpoint.Driver)
}
}

// NullCheckpointsDB is a checkpoints database with no checkpoints.
type NullCheckpointsDB struct{}

Expand Down
110 changes: 110 additions & 0 deletions lightning/glue/glue.go
@@ -0,0 +1,110 @@
// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package glue

import (
"context"
"database/sql"

"github.com/pingcap/parser"
"github.com/pingcap/parser/model"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb-lightning/lightning/checkpoints"
"github.com/pingcap/tidb-lightning/lightning/common"
"github.com/pingcap/tidb-lightning/lightning/config"
"github.com/pingcap/tidb-lightning/lightning/log"
"go.uber.org/zap"
)

type Glue interface {
OwnsSQLExecutor() bool
GetSQLExecutor() SQLExecutor
GetDB() (*sql.DB, error)
GetParser() *parser.Parser
GetTables(context.Context, string) ([]*model.TableInfo, error)
OpenCheckpointsDB(context.Context, *config.Config) (checkpoints.CheckpointsDB, error)
// Record is used to report some information (key, value) to host TiDB, including progress, stage currently
Record(string, uint64)
}

type SQLExecutor interface {
ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error
ObtainStringWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error)
Close()
}

type ExternalTiDBGlue struct {
db *sql.DB
parser *parser.Parser
}

func NewExternalTiDBGlue(db *sql.DB, sqlMode mysql.SQLMode) *ExternalTiDBGlue {
p := parser.New()
p.SetSQLMode(sqlMode)

return &ExternalTiDBGlue{db: db, parser: p}
}

func (e *ExternalTiDBGlue) GetSQLExecutor() SQLExecutor {
return e
}

func (e *ExternalTiDBGlue) ExecuteWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) error {
sql := common.SQLWithRetry{
DB: e.db,
Logger: log.Logger{Logger: logger},
}
return sql.Exec(ctx, purpose, query)
}

func (e *ExternalTiDBGlue) ObtainStringWithLog(ctx context.Context, query string, purpose string, logger *zap.Logger) (string, error) {
var s string
err := common.SQLWithRetry{
DB: e.db,
Logger: log.Logger{Logger: logger},
}.QueryRow(ctx, purpose, query, &s)
return s, err
}

func (e *ExternalTiDBGlue) GetDB() (*sql.DB, error) {
return e.db, nil
}

func (e *ExternalTiDBGlue) GetParser() *parser.Parser {
return e.parser
}

func (e *ExternalTiDBGlue) GetTables(context.Context, string) ([]*model.TableInfo, error) {
return nil, nil
}

func (e *ExternalTiDBGlue) OpenCheckpointsDB(ctx context.Context, cfg *config.Config) (checkpoints.CheckpointsDB, error) {
return checkpoints.OpenCheckpointsDB(ctx, cfg)
}

func (e *ExternalTiDBGlue) OwnsSQLExecutor() bool {
return true
}

func (e *ExternalTiDBGlue) Close() {
e.db.Close()
}

func (e *ExternalTiDBGlue) Record(string, uint64) {
}

const (
RecordEstimatedChunk = "EstimatedChunk"
RecordFinishedChunk = "FinishedChunk"
)

0 comments on commit a78137f

Please sign in to comment.