From f3886fc17282e949fde76bde4d94457392e18ebb Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Mon, 15 Apr 2024 14:24:05 +0800 Subject: [PATCH] lightning: skip CREATE DATABASE when downstream exists, and return error when parse failed (#51801) (#52586) close pingcap/tidb#51800 --- br/pkg/lightning/backend/backend.go | 4 ++ br/pkg/lightning/backend/local/local.go | 5 ++ br/pkg/lightning/backend/tidb/tidb.go | 32 +++++++++++ br/pkg/lightning/importer/get_pre_info.go | 13 +++++ br/pkg/lightning/importer/import.go | 57 ++++++++++++++----- br/pkg/lightning/importer/mock/mock.go | 9 +++ .../lightning/importer/restore_schema_test.go | 4 ++ br/pkg/mock/backend.go | 15 +++++ br/tests/lightning_character_sets/run.sh | 2 + 9 files changed, 126 insertions(+), 15 deletions(-) diff --git a/br/pkg/lightning/backend/backend.go b/br/pkg/lightning/backend/backend.go index b31f88324eba8..9c99212930845 100644 --- a/br/pkg/lightning/backend/backend.go +++ b/br/pkg/lightning/backend/backend.go @@ -106,6 +106,10 @@ type CheckCtx struct { // TargetInfoGetter defines the interfaces to get target information. type TargetInfoGetter interface { + // FetchRemoteDBModels obtains the models of all databases. Currently, only + // the database name is filled. + FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) + // FetchRemoteTableModels obtains the models of all tables given the schema // name. The returned table info does not need to be precise if the encoder, // is not requiring them, but must at least fill in the following fields for diff --git a/br/pkg/lightning/backend/local/local.go b/br/pkg/lightning/backend/local/local.go index 94090385b5cd3..5e2c6ddd72831 100644 --- a/br/pkg/lightning/backend/local/local.go +++ b/br/pkg/lightning/backend/local/local.go @@ -279,6 +279,11 @@ func NewTargetInfoGetter(tls *common.TLS, db *sql.DB, pdCli pd.Client) backend.T } } +// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface. +func (g *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return tikv.FetchRemoteDBModelsFromTLS(ctx, g.tls) +} + // FetchRemoteTableModels obtains the models of all tables given the schema name. // It implements the `TargetInfoGetter` interface. func (g *targetInfoGetter) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index 774191d48431d..a67605219e724 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -136,6 +136,38 @@ func NewTargetInfoGetter(db *sql.DB) backend.TargetInfoGetter { } } +// FetchRemoteDBModels implements the `backend.TargetInfoGetter` interface. +func (b *targetInfoGetter) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + results := []*model.DBInfo{} + logger := log.FromContext(ctx) + s := common.SQLWithRetry{ + DB: b.db, + Logger: logger, + } + err := s.Transact(ctx, "fetch db models", func(_ context.Context, tx *sql.Tx) error { + results = results[:0] + + rows, e := tx.Query("SHOW DATABASES") + if e != nil { + return e + } + defer rows.Close() + + for rows.Next() { + var dbName string + if e := rows.Scan(&dbName); e != nil { + return e + } + dbInfo := &model.DBInfo{ + Name: model.NewCIStr(dbName), + } + results = append(results, dbInfo) + } + return rows.Err() + }) + return results, err +} + // FetchRemoteTableModels obtains the models of all tables given the schema name. // It implements the `backend.TargetInfoGetter` interface. // TODO: refactor diff --git a/br/pkg/lightning/importer/get_pre_info.go b/br/pkg/lightning/importer/get_pre_info.go index 191fed628bdf6..c11041dfaa07e 100644 --- a/br/pkg/lightning/importer/get_pre_info.go +++ b/br/pkg/lightning/importer/get_pre_info.go @@ -91,6 +91,8 @@ type PreImportInfoGetter interface { // TargetInfoGetter defines the operations to get information from target. type TargetInfoGetter interface { + // FetchRemoteDBModels fetches the database structures from the remote target. + FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) // FetchRemoteTableModels fetches the table structures from the remote target. FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) // CheckVersionRequirements performs the check whether the target satisfies the version requirements. @@ -158,6 +160,11 @@ func NewTargetInfoGetterImpl( }, nil } +// FetchRemoteDBModels implements TargetInfoGetter. +func (g *TargetInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return g.backend.FetchRemoteDBModels(ctx) +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the TargetInfoGetter interface. func (g *TargetInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { @@ -800,6 +807,12 @@ func (p *PreImportInfoGetterImpl) IsTableEmpty(ctx context.Context, schemaName s return p.targetInfoGetter.IsTableEmpty(ctx, schemaName, tableName) } +// FetchRemoteDBModels fetches the database structures from the remote target. +// It implements the PreImportInfoGetter interface. +func (p *PreImportInfoGetterImpl) FetchRemoteDBModels(ctx context.Context) ([]*model.DBInfo, error) { + return p.targetInfoGetter.FetchRemoteDBModels(ctx) +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the PreImportInfoGetter interface. func (p *PreImportInfoGetterImpl) FetchRemoteTableModels(ctx context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/importer/import.go b/br/pkg/lightning/importer/import.go index b5cd1262fa98b..17cc060a1b32a 100644 --- a/br/pkg/lightning/importer/import.go +++ b/br/pkg/lightning/importer/import.go @@ -580,29 +580,47 @@ type restoreSchemaWorker struct { func (worker *restoreSchemaWorker) addJob(sqlStr string, job *schemaJob) error { stmts, err := createIfNotExistsStmt(worker.parser, sqlStr, job.dbName, job.tblName) if err != nil { - worker.logger.Warn("failed to rewrite statement, will use raw input instead", - zap.String("db", job.dbName), - zap.String("table", job.tblName), - zap.String("statement", sqlStr), - zap.Error(err)) - job.stmts = []string{sqlStr} - } else { - job.stmts = stmts + return errors.Trace(err) } + job.stmts = stmts return worker.appendJob(job) } func (worker *restoreSchemaWorker) makeJobs( dbMetas []*mydump.MDDatabaseMeta, + getDBs func(context.Context) ([]*model.DBInfo, error), getTables func(context.Context, string) ([]*model.TableInfo, error), ) error { defer func() { close(worker.jobCh) worker.quit() }() - var err error + + if len(dbMetas) == 0 { + return nil + } + // 1. restore databases, execute statements concurrency + + dbs, err := getDBs(worker.ctx) + if err != nil { + worker.logger.Warn("get databases from downstream failed", zap.Error(err)) + } + dbSet := make(set.StringSet, len(dbs)) + for _, db := range dbs { + dbSet.Insert(db.Name.L) + } + for _, dbMeta := range dbMetas { + // if downstream already has this database, we can skip ddl job + if dbSet.Exist(strings.ToLower(dbMeta.Name)) { + worker.logger.Info( + "database already exists in downstream, skip processing the source file", + zap.String("db", dbMeta.Name), + ) + continue + } + sql := dbMeta.GetSchema(worker.ctx, worker.store) err = worker.addJob(sql, &schemaJob{ dbName: dbMeta.Name, @@ -617,18 +635,28 @@ func (worker *restoreSchemaWorker) makeJobs( if err != nil { return err } + // 2. restore tables, execute statements concurrency + for _, dbMeta := range dbMetas { // we can ignore error here, and let check failed later if schema not match - tables, _ := getTables(worker.ctx, dbMeta.Name) - tableMap := make(map[string]struct{}) + tables, err := getTables(worker.ctx, dbMeta.Name) + if err != nil { + worker.logger.Warn("get tables from downstream failed", zap.Error(err)) + } + tableSet := make(set.StringSet, len(tables)) for _, t := range tables { - tableMap[t.Name.L] = struct{}{} + tableSet.Insert(t.Name.L) } for _, tblMeta := range dbMeta.Tables { - if _, ok := tableMap[strings.ToLower(tblMeta.Name)]; ok { + if tableSet.Exist(strings.ToLower(tblMeta.Name)) { // we already has this table in TiDB. // we should skip ddl job and let SchemaValid check. + worker.logger.Info( + "table already exists in downstream, skip processing the source file", + zap.String("db", dbMeta.Name), + zap.String("table", tblMeta.Name), + ) continue } else if tblMeta.SchemaFile.FileMeta.Path == "" { return common.ErrSchemaNotExists.GenWithStackByArgs(dbMeta.Name, tblMeta.Name) @@ -703,7 +731,6 @@ loop: var err error if session == nil { session, err = func() (*sql.Conn, error) { - // TODO: support lightning in SQL return worker.db.Conn(worker.ctx) }() if err != nil { @@ -826,7 +853,7 @@ func (rc *Controller) restoreSchema(ctx context.Context) error { for i := 0; i < concurrency; i++ { go worker.doJob() } - err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteTableModels) + err := worker.makeJobs(rc.dbMetas, rc.preInfoGetter.FetchRemoteDBModels, rc.preInfoGetter.FetchRemoteTableModels) logTask.End(zap.ErrorLevel, err) if err != nil { return err diff --git a/br/pkg/lightning/importer/mock/mock.go b/br/pkg/lightning/importer/mock/mock.go index 6b0809729e1ef..d09fc83bb3925 100644 --- a/br/pkg/lightning/importer/mock/mock.go +++ b/br/pkg/lightning/importer/mock/mock.go @@ -212,6 +212,15 @@ func (t *TargetInfo) SetTableInfo(schemaName string, tableName string, tblInfo * t.dbTblInfoMap[schemaName][tableName] = tblInfo } +// FetchRemoteDBModels implements the TargetInfoGetter interface. +func (t *TargetInfo) FetchRemoteDBModels(_ context.Context) ([]*model.DBInfo, error) { + resultInfos := []*model.DBInfo{} + for dbName := range t.dbTblInfoMap { + resultInfos = append(resultInfos, &model.DBInfo{Name: model.NewCIStr(dbName)}) + } + return resultInfos, nil +} + // FetchRemoteTableModels fetches the table structures from the remote target. // It implements the TargetInfoGetter interface. func (t *TargetInfo) FetchRemoteTableModels(_ context.Context, schemaName string) ([]*model.TableInfo, error) { diff --git a/br/pkg/lightning/importer/restore_schema_test.go b/br/pkg/lightning/importer/restore_schema_test.go index b969e01bea358..e3c374295d3a4 100644 --- a/br/pkg/lightning/importer/restore_schema_test.go +++ b/br/pkg/lightning/importer/restore_schema_test.go @@ -136,6 +136,10 @@ func (s *restoreSchemaSuite) SetupTest() { s.controller, s.ctx = gomock.WithContext(context.Background(), s.T()) mockTargetInfoGetter := mock.NewMockTargetInfoGetter(s.controller) mockBackend := mock.NewMockBackend(s.controller) + mockTargetInfoGetter.EXPECT(). + FetchRemoteDBModels(gomock.Any()). + AnyTimes(). + Return([]*model.DBInfo{{Name: model.NewCIStr("fakedb")}}, nil) mockTargetInfoGetter.EXPECT(). FetchRemoteTableModels(gomock.Any(), gomock.Any()). AnyTimes(). diff --git a/br/pkg/mock/backend.go b/br/pkg/mock/backend.go index e32c73f3ceb7b..4e5c56c75d4d8 100644 --- a/br/pkg/mock/backend.go +++ b/br/pkg/mock/backend.go @@ -295,6 +295,21 @@ func (mr *MockTargetInfoGetterMockRecorder) CheckRequirements(arg0, arg1 interfa return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckRequirements", reflect.TypeOf((*MockTargetInfoGetter)(nil).CheckRequirements), arg0, arg1) } +// FetchRemoteDBModels mocks base method. +func (m *MockTargetInfoGetter) FetchRemoteDBModels(arg0 context.Context) ([]*model.DBInfo, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FetchRemoteDBModels", arg0) + ret0, _ := ret[0].([]*model.DBInfo) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FetchRemoteDBModels indicates an expected call of FetchRemoteDBModels. +func (mr *MockTargetInfoGetterMockRecorder) FetchRemoteDBModels(arg0 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FetchRemoteDBModels", reflect.TypeOf((*MockTargetInfoGetter)(nil).FetchRemoteDBModels), arg0) +} + // FetchRemoteTableModels mocks base method. func (m *MockTargetInfoGetter) FetchRemoteTableModels(arg0 context.Context, arg1 string) ([]*model.TableInfo, error) { m.ctrl.T.Helper() diff --git a/br/tests/lightning_character_sets/run.sh b/br/tests/lightning_character_sets/run.sh index d1a7ea5728d16..4c09185853f95 100755 --- a/br/tests/lightning_character_sets/run.sh +++ b/br/tests/lightning_character_sets/run.sh @@ -78,6 +78,8 @@ check_contains 's: 5291' # test about unsupported charset in UTF-8 encoding dump files # test local backend run_lightning --config "tests/$TEST_NAME/greek.toml" -d "tests/$TEST_NAME/greek" 2>&1 | grep -q "Unknown character set: 'greek'" +# check TiDB does not receive the DDL +check_not_contains "greek" $TEST_DIR/tidb.log run_sql 'DROP DATABASE IF EXISTS charsets;' run_sql 'CREATE DATABASE charsets;' run_sql 'CREATE TABLE charsets.greek (c VARCHAR(20) PRIMARY KEY);'