Skip to content

Commit

Permalink
lightning: skip CREATE DATABASE when downstream exists, and return er…
Browse files Browse the repository at this point in the history
…ror when parse failed (#51801) (#52586)

close #51800
  • Loading branch information
ti-chi-bot committed Apr 15, 2024
1 parent a1b73d0 commit f3886fc
Show file tree
Hide file tree
Showing 9 changed files with 126 additions and 15 deletions.
4 changes: 4 additions & 0 deletions br/pkg/lightning/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ type CheckCtx struct {

// TargetInfoGetter defines the interfaces to get target information.
type TargetInfoGetter interface {
// FetchRemoteDBModels obtains the models of all databases. Currently, only
// the database name is filled.
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)

// FetchRemoteTableModels obtains the models of all tables given the schema
// name. The returned table info does not need to be precise if the encoder
// does not require it, but must at least fill in the following fields for
Expand Down
5 changes: 5 additions & 0 deletions br/pkg/lightning/backend/local/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,11 @@ func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.T
}
}

// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface.
// The lookup is delegated to tikv.FetchRemoteDBModelsFromTLS, using the TLS
// settings held by this getter.
func (g *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
	dbInfos, err := tikv.FetchRemoteDBModelsFromTLS(ctx, g.tls)
	return dbInfos, err
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `TargetInfoGetter` interface.
func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down
32 changes: 32 additions & 0 deletions br/pkg/lightning/backend/tidb/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,38 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter {
}
}

// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface.
//
// It runs `SHOW DATABASES` inside a transaction driven by common.SQLWithRetry
// and returns one model.DBInfo per row, with only the Name field filled in.
// When the transaction fails, a nil slice is returned together with the error
// so that callers never observe a partially collected list.
func (b *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
	results := []*model.DBInfo{}
	s := common.SQLWithRetry{
		DB:     b.db,
		Logger: log.FromContext(ctx),
	}
	err := s.Transact(ctx, "fetch db models", func(_ context.Context, tx *sql.Tx) error {
		// Start from an empty list on every invocation; presumably Transact may
		// re-run this closure when it retries, and rows gathered by an earlier
		// attempt must not leak into the final result.
		results = results[:0]

		// Bind the query to the caller's context so cancellation and deadlines
		// apply to the statement itself, not only to the surrounding transaction.
		rows, e := tx.QueryContext(ctx, "SHOW DATABASES")
		if e != nil {
			return e
		}
		defer rows.Close()

		for rows.Next() {
			var dbName string
			if e := rows.Scan(&dbName); e != nil {
				return e
			}
			results = append(results, &model.DBInfo{
				Name: model.NewCIStr(dbName),
			})
		}
		// Surface any error that terminated the iteration early.
		return rows.Err()
	})
	if err != nil {
		return nil, err
	}
	return results, nil
}

// FetchRemoteTableModels obtains the models of all tables given the schema name.
// It implements the `backend.TargetInfoGetter` interface.
// TODO: refactor
Expand Down
13 changes: 13 additions & 0 deletions br/pkg/lightning/importer/get_pre_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ type PreImportInfoGetter interface {

// TargetInfoGetter defines the operations to get information from target.
type TargetInfoGetter interface {
// FetchRemoteDBModels fetches the database structures from the remote target.
FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error)
// FetchRemoteTableModels fetches the table structures from the remote target.
FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error)
// CheckVersionRequirements performs the check whether the target satisfies the version requirements.
Expand Down Expand Up @@ -158,6 +160,11 @@ func NewTargetInfoGetterImpl(
}, nil
}

// FetchRemoteDBModels fetches the database structures by forwarding the call
// to the wrapped backend. It implements the TargetInfoGetter interface.
func (g *TargetInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
	dbInfos, err := g.backend.FetchRemoteDBModels(ctx)
	return dbInfos, err
}

// FetchRemoteTableModels fetches the table structures from the remote target.
// It implements the TargetInfoGetter interface.
func (g *TargetInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down Expand Up @@ -800,6 +807,12 @@ func (p *PreImportInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName s
return p.targetInfoGetter.IsTableEmpty(ctx, schemaName, tableName)
}

// FetchRemoteDBModels returns the database structures reported by the
// underlying target info getter.
// It implements the PreImportInfoGetter interface.
func (p *PreImportInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) {
	dbInfos, err := p.targetInfoGetter.FetchRemoteDBModels(ctx)
	return dbInfos, err
}

// FetchRemoteTableModels fetches the table structures from the remote target.
// It implements the PreImportInfoGetter interface.
func (p *PreImportInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down
57 changes: 42 additions & 15 deletions br/pkg/lightning/importer/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,29 +580,47 @@ type restoreSchemaWorker struct {
func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error {
stmts, err := createIfNotExistsStmt(worker.parser, sqlStr, job.dbName, job.tblName)
if err != nil {
worker.logger.Warn("failed to rewrite statement, will use raw input instead",
zap.String("db", job.dbName),
zap.String("table", job.tblName),
zap.String("statement", sqlStr),
zap.Error(err))
job.stmts = []string{sqlStr}
} else {
job.stmts = stmts
return errors.Trace(err)
}
job.stmts = stmts
return worker.appendJob(job)
}

func (worker *restoreSchemaWorker) makeJobs(
dbMetas []*mydump.MDDatabaseMeta,
getDBs func(context.Context) ([]*model.DBInfo, error),
getTables func(context.Context, string) ([]*model.TableInfo, error),
) error {
defer func() {
close(worker.jobCh)
worker.quit()
}()
var err error

if len(dbMetas) == 0 {
return nil
}

// 1. restore databases, execute statements concurrently

dbs, err := getDBs(worker.ctx)
if err != nil {
worker.logger.Warn("get databases from downstream failed", zap.Error(err))
}
dbSet := make(set.StringSet, len(dbs))
for _, db := range dbs {
dbSet.Insert(db.Name.L)
}

for _, dbMeta := range dbMetas {
// if downstream already has this database, we can skip ddl job
if dbSet.Exist(strings.ToLower(dbMeta.Name)) {
worker.logger.Info(
"database already exists in downstream, skip processing the source file",
zap.String("db", dbMeta.Name),
)
continue
}

sql := dbMeta.GetSchema(worker.ctx, worker.store)
err = worker.addJob(sql, &schemaJob{
dbName: dbMeta.Name,
Expand All @@ -617,18 +635,28 @@ func (worker *restoreSchemaWorker) makeJobs(
if err != nil {
return err
}

// 2. restore tables, execute statements concurrently

for _, dbMeta := range dbMetas {
// we can ignore error here, and let check failed later if schema not match
tables, _ := getTables(worker.ctx, dbMeta.Name)
tableMap := make(map[string]struct{})
tables, err := getTables(worker.ctx, dbMeta.Name)
if err != nil {
worker.logger.Warn("get tables from downstream failed", zap.Error(err))
}
tableSet := make(set.StringSet, len(tables))
for _, t := range tables {
tableMap[t.Name.L] = struct{}{}
tableSet.Insert(t.Name.L)
}
for _, tblMeta := range dbMeta.Tables {
if _, ok := tableMap[strings.ToLower(tblMeta.Name)]; ok {
if tableSet.Exist(strings.ToLower(tblMeta.Name)) {
// we already have this table in TiDB.
// we should skip the ddl job and let the SchemaValid check handle it.
worker.logger.Info(
"table already exists in downstream, skip processing the source file",
zap.String("db", dbMeta.Name),
zap.String("table", tblMeta.Name),
)
continue
} else if tblMeta.SchemaFile.FileMeta.Path == "" {
return common.ErrSchemaNotExists.GenWithStackByArgs(dbMeta.Name, tblMeta.Name)
Expand Down Expand Up @@ -703,7 +731,6 @@ loop:
var err error
if session == nil {
session, err = func() (*sql.Conn, error) {
// TODO: support lightning in SQL
return worker.db.Conn(worker.ctx)
}()
if err != nil {
Expand Down Expand Up @@ -826,7 +853,7 @@ func (rc *Controller) restoreSchema(ctx context.Context) error {
for i := 0; i < concurrency; i++ {
go worker.doJob()
}
err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteTableModels)
err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteDBModels, rc.preInfoGetter.FetchRemoteTableModels)
logTask.End(zap.ErrorLevel, err)
if err != nil {
return err
Expand Down
9 changes: 9 additions & 0 deletions br/pkg/lightning/importer/mock/mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,15 @@ func (t *TargetInfo) SetTableInfo(schemaName string, tableName string, tblInfo *
t.dbTblInfoMap[schemaName][tableName] = tblInfo
}

// FetchRemoteDBModels implements the TargetInfoGetter interface.
// It emits one DBInfo, carrying only the name, for every schema key present
// in the mock's dbTblInfoMap. The order of the result follows map iteration
// and is therefore unspecified.
func (t *TargetInfo) FetchRemoteDBModels(_ context.Context) ([]*model.DBInfo, error) {
	dbInfos := make([]*model.DBInfo, 0, len(t.dbTblInfoMap))
	for schemaName := range t.dbTblInfoMap {
		info := &model.DBInfo{Name: model.NewCIStr(schemaName)}
		dbInfos = append(dbInfos, info)
	}
	return dbInfos, nil
}

// FetchRemoteTableModels fetches the table structures from the remote target.
// It implements the TargetInfoGetter interface.
func (t *TargetInfo) FetchRemoteTableModels(_ context.Context, schemaName string) ([]*model.TableInfo, error) {
Expand Down
4 changes: 4 additions & 0 deletions br/pkg/lightning/importer/restore_schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ func (s *restoreSchemaSuite) SetupTest() {
s.controller, s.ctx = gomock.WithContext(context.Background(), s.T())
mockTargetInfoGetter := mock.NewMockTargetInfoGetter(s.controller)
mockBackend := mock.NewMockBackend(s.controller)
mockTargetInfoGetter.EXPECT().
FetchRemoteDBModels(gomock.Any()).
AnyTimes().
Return([]*model.DBInfo{{Name: model.NewCIStr("fakedb")}}, nil)
mockTargetInfoGetter.EXPECT().
FetchRemoteTableModels(gomock.Any(), gomock.Any()).
AnyTimes().
Expand Down
15 changes: 15 additions & 0 deletions br/pkg/mock/backend.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions br/tests/lightning_character_sets/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ check_contains 's: 5291'
# test about unsupported charset in UTF-8 encoding dump files
# test local backend
run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek" 2>&1 | grep -q "Unknown character set: 'greek'"
# check TiDB does not receive the DDL
check_not_contains "greek" $TEST_DIR/tidb.log
run_sql 'DROP DATABASE IF EXISTS charsets;'
run_sql 'CREATE DATABASE charsets;'
run_sql 'CREATE TABLE charsets.greek (c VARCHAR(20) PRIMARY KEY);'
Expand Down

0 comments on commit f3886fc

Please sign in to comment.