Skip to content
This repository has been archived by the owner on Aug 21, 2023. It is now read-only.

Commit

Permalink
fix snapshot problem (#111)
Browse files Browse the repository at this point in the history
* hotfix snapshot

* update code

* fix again

* use session param to reset db after setup

* address comment

* address comment
  • Loading branch information
lichunzhu committed Jun 30, 2020
1 parent b18f65e commit ff92fcf
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 104 deletions.
2 changes: 1 addition & 1 deletion cmd/dumpling/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,10 +181,10 @@ func main() {
conf.CsvNullValue = csvNullValue
conf.Sql = sql
conf.TableFilter = tableFilter
conf.TiDBMemQuotaQuery = tidbMemQuotaQuery
conf.Security.CAPath = caPath
conf.Security.CertPath = certPath
conf.Security.KeyPath = keyPath
conf.SessionParams["tidb_mem_quota_query"] = tidbMemQuotaQuery

err = export.Dump(context.Background(), conf)
if err != nil {
Expand Down
7 changes: 2 additions & 5 deletions v4/export/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type Config struct {
FileType string
EscapeBackslash bool
DumpEmptyDatabase bool
TiDBMemQuotaQuery uint64
SessionParams map[string]interface{}
}

func DefaultConfig() *Config {
Expand Down Expand Up @@ -86,15 +86,12 @@ func DefaultConfig() *Config {
Sql: "",
TableFilter: allFilter,
DumpEmptyDatabase: true,
TiDBMemQuotaQuery: UnspecifiedSize,
SessionParams: make(map[string]interface{}),
}
}

func (conf *Config) getDSN(db string) string {
dsn := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?charset=utf8mb4", conf.User, conf.Password, conf.Host, conf.Port, db)
if conf.TiDBMemQuotaQuery != UnspecifiedSize {
dsn += fmt.Sprintf("&tidb_mem_quota_query=%v", conf.TiDBMemQuotaQuery)
}
if len(conf.Security.CAPath) > 0 {
dsn += "&tls=dumpling-tls-target"
}
Expand Down
40 changes: 4 additions & 36 deletions v4/export/consistency.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ func NewConsistencyController(conf *Config, session *sql.DB) (ConsistencyControl
allTables: conf.Tables,
}, nil
case "snapshot":
return &ConsistencySnapshot{
serverType: conf.ServerInfo.ServerType,
snapshot: conf.Snapshot,
db: session,
}, nil
if conf.ServerInfo.ServerType != ServerTypeTiDB {
return nil, withStack(errors.New("snapshot consistency is not supported for this server"))
}
return &ConsistencyNone{}, nil
case "none":
return &ConsistencyNone{}, nil
default:
Expand Down Expand Up @@ -92,40 +91,9 @@ func (c *ConsistencyLockDumpingTables) TearDown() error {
return UnlockTables(c.db)
}

type ConsistencySnapshot struct {
serverType ServerType
snapshot string
db *sql.DB
}

const showMasterStatusFieldNum = 5
const snapshotFieldIndex = 1

func (c *ConsistencySnapshot) Setup() error {
if c.serverType != ServerTypeTiDB {
return withStack(errors.New("snapshot consistency is not supported for this server"))
}
if c.snapshot == "" {
str, err := ShowMasterStatus(c.db, showMasterStatusFieldNum)
if err != nil {
return err
}
c.snapshot = str[snapshotFieldIndex]
}
hasTiKV, err := CheckTiDBWithTiKV(c.db)
if err != nil {
return err
}
if !hasTiKV {
return nil
}
return SetTiDBSnapshot(c.db, c.snapshot)
}

func (c *ConsistencySnapshot) TearDown() error {
return nil
}

func resolveAutoConsistency(conf *Config) {
if conf.Consistency != "auto" {
return
Expand Down
20 changes: 3 additions & 17 deletions v4/export/consistency_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,23 +49,10 @@ func (s *testConsistencySuite) TestConsistencyController(c *C) {

conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeTiDB
conf.Snapshot = "" // let dumpling detect the TSO
rows := sqlmock.NewRows([]string{"File", "Position", "Binlog_Do_DB", "Binlog_Ignore_DB", "Executed_Gtid_Set"})
rows.AddRow("tidb-binlog", "413802961528946688", "", "", "")
tidbRows := sqlmock.NewRows([]string{"c"})
tidbRows.AddRow(1)
mock.ExpectQuery("SHOW MASTER STATUS").WillReturnRows(rows)
mock.ExpectQuery("SELECT COUNT\\(1\\) as c FROM MYSQL.TiDB WHERE VARIABLE_NAME='tikv_gc_safe_point'").
WillReturnRows(tidbRows)
mock.ExpectExec("SET SESSION tidb_snapshot").
WillReturnResult(sqlmock.NewResult(0, 1))
ctrl, _ = NewConsistencyController(conf, db)
_, ok = ctrl.(*ConsistencySnapshot)
_, ok = ctrl.(*ConsistencyNone)
c.Assert(ok, IsTrue)
s.assertLifetimeErrNil(ctrl, c)
if err = mock.ExpectationsWereMet(); err != nil {
c.Fatalf(err.Error())
}

conf.Consistency = "lock"
conf.Tables = NewDatabaseTables().
Expand Down Expand Up @@ -119,14 +106,13 @@ func (s *testConsistencySuite) TestConsistencyControllerError(c *C) {
// snapshot consistency is only available in TiDB
conf.Consistency = "snapshot"
conf.ServerInfo.ServerType = ServerTypeUnknown
ctrl, _ := NewConsistencyController(conf, db)
err = ctrl.Setup()
_, err = NewConsistencyController(conf, db)
c.Assert(err, NotNil)

// flush consistency is unavailable in TiDB
conf.Consistency = "flush"
conf.ServerInfo.ServerType = ServerTypeTiDB
ctrl, _ = NewConsistencyController(conf, db)
ctrl, _ := NewConsistencyController(conf, db)
err = ctrl.Setup()
c.Assert(err, NotNil)

Expand Down
85 changes: 45 additions & 40 deletions v4/export/dump.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package export
import (
"context"
"database/sql"
"errors"
"strconv"
"strings"
"time"
Expand All @@ -28,6 +29,7 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}
}
}()

pool, err := sql.Open("mysql", conf.getDSN(""))
if err != nil {
return withStack(err)
Expand All @@ -36,41 +38,9 @@ func Dump(pCtx context.Context, conf *Config) (err error) {

conf.ServerInfo, err = detectServerInfo(pool)
if err != nil {
if strings.Contains(err.Error(), "tidb_mem_quota_query") {
conf.TiDBMemQuotaQuery = UnspecifiedSize
pool, err = sql.Open("mysql", conf.getDSN(""))
if err != nil {
return withStack(err)
}
conf.ServerInfo, err = detectServerInfo(pool)
if err != nil {
return withStack(err)
}
} else {
return withStack(err)
}
}

databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
}

conf.Tables, err = listAllTables(pool, databases)
if err != nil {
return err
}

if !conf.NoViews {
views, err := listAllViews(pool, databases)
if err != nil {
return err
}
conf.Tables.Merge(views)
return withStack(err)
}

filterTables(conf)

ctx, cancel := context.WithCancel(pCtx)
defer cancel()

Expand All @@ -95,13 +65,23 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
}
}

if conf.Snapshot == "" && (doPdGC || conf.Consistency == "flush") {
if conf.Snapshot == "" {
str, err := ShowMasterStatus(pool, showMasterStatusFieldNum)
if err != nil {
return err
}
conf.Snapshot = str[snapshotFieldIndex]
if conf.Snapshot == "" && (doPdGC || conf.Consistency == "snapshot") {
conf.Snapshot, err = getSnapshot(pool)
if err != nil {
return err
}
}

if conf.Snapshot != "" {
if conf.ServerInfo.ServerType != ServerTypeTiDB {
return errors.New("snapshot consistency is not supported for this server")
}
hasTiKV, err := CheckTiDBWithTiKV(pool)
if err != nil {
return err
}
if hasTiKV {
conf.SessionParams["tidb_snapshot"] = conf.Snapshot
}
}

Expand All @@ -118,6 +98,31 @@ func Dump(pCtx context.Context, conf *Config) (err error) {
"After dumping: run sql `update mysql.tidb set VARIABLE_VALUE = '10m' where VARIABLE_NAME = 'tikv_gc_life_time';` in tidb.\n")
}

pool, err = resetDBWithSessionParams(pool, conf.getDSN(""), conf.SessionParams)
if err != nil {
return err
}

databases, err := prepareDumpingDatabases(conf, pool)
if err != nil {
return err
}

conf.Tables, err = listAllTables(pool, databases)
if err != nil {
return err
}

if !conf.NoViews {
views, err := listAllViews(pool, databases)
if err != nil {
return err
}
conf.Tables.Merge(views)
}

filterTables(conf)

conCtrl, err := NewConsistencyController(conf, pool)
if err != nil {
return err
Expand Down
4 changes: 4 additions & 0 deletions v4/export/prepare.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ func adjustConfig(conf *Config) error {
// Disable filesize if rows was set
conf.FileSize = UnspecifiedSize
}
if conf.SessionParams == nil {
conf.SessionParams = make(map[string]interface{})
}
resolveAutoConsistency(conf)

return nil
}
Expand Down
54 changes: 49 additions & 5 deletions v4/export/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package export
import (
"database/sql"
"fmt"
"net/url"
"strconv"
"strings"

Expand Down Expand Up @@ -384,11 +385,6 @@ func GetTiDBDDLIDs(db *sql.DB) ([]string, error) {
return GetSpecifiedColumnValue(rows, "DDL_ID")
}

func SetTiDBSnapshot(db *sql.DB, snapshot string) error {
_, err := db.Exec("SET SESSION tidb_snapshot = ?", snapshot)
return withStack(err)
}

func CheckTiDBWithTiKV(db *sql.DB) (bool, error) {
var count int
handleOneRow := func(rows *sql.Rows) error {
Expand All @@ -401,6 +397,54 @@ func CheckTiDBWithTiKV(db *sql.DB) (bool, error) {
return count > 0, nil
}

func getSnapshot(db *sql.DB) (string, error) {
str, err := ShowMasterStatus(db, showMasterStatusFieldNum)
if err != nil {
return "", err
}
return str[snapshotFieldIndex], nil
}

func isUnknownSystemVariableErr(err error) bool {
return strings.Contains(err.Error(), "Unknown system variable")
}

func resetDBWithSessionParams(db *sql.DB, dsn string, params map[string]interface{}) (*sql.DB, error) {
support := make(map[string]interface{})
for k, v := range params {
s := fmt.Sprintf("SET SESSION %s = ?", k)
_, err := db.Exec(s, v)
if err != nil {
if isUnknownSystemVariableErr(err) {
log.Info("session variable is not supported by db", zap.String("variable", k), zap.Reflect("value", v))
continue
}
return nil, withStack(err)
}

support[k] = v
}

for k, v := range support {
var s string
if str, ok := v.(string); ok {
s = wrapStringWith(str, "'")
} else {
s = fmt.Sprintf("%v", v)
}
dsn += fmt.Sprintf("&%s=%s", k, url.QueryEscape(s))
}

newDB, err := sql.Open("mysql", dsn)
if err != nil {
return nil, withStack(err)
}

db.Close()

return newDB, nil
}

func buildSelectField(db *sql.DB, dbName, tableName string) (string, error) {
query := `SELECT COLUMN_NAME,EXTRA FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA=? AND TABLE_NAME=?;`
rows, err := db.Query(query, dbName, tableName)
Expand Down

0 comments on commit ff92fcf

Please sign in to comment.