diff --git a/br/pkg/lightning/backend/tidb/tidb.go b/br/pkg/lightning/backend/tidb/tidb.go index dbf7428c93420..3b438e16e21f5 100644 --- a/br/pkg/lightning/backend/tidb/tidb.go +++ b/br/pkg/lightning/backend/tidb/tidb.go @@ -722,7 +722,7 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl if isDupEntryError(err) { // rowID is ignored in tidb backend - err = be.errorMgr.RecordConflictErrorV2( + err = be.errorMgr.RecordDuplicate( ctx, log.FromContext(ctx), tableName, diff --git a/br/pkg/lightning/config/config.go b/br/pkg/lightning/config/config.go index e0df7b2e10721..89ec4a0512429 100644 --- a/br/pkg/lightning/config/config.go +++ b/br/pkg/lightning/config/config.go @@ -1205,8 +1205,6 @@ func (cfg *Config) AdjustCommon() (bool, error) { } cfg.TikvImporter.DuplicateResolution = DupeResAlgNone case BackendLocal: - // force turn off pre-dedup for local backend - cfg.TikvImporter.OnDuplicate = "" if cfg.TikvImporter.RegionSplitBatchSize <= 0 { return mustHaveInternalConnections, common.ErrInvalidConfig.GenWithStack("`tikv-importer.region-split-batch-size` got %d, should be larger than 0", cfg.TikvImporter.RegionSplitBatchSize) } diff --git a/br/pkg/lightning/config/config_test.go b/br/pkg/lightning/config/config_test.go index 628595e255cbd..232b69b7f56f3 100644 --- a/br/pkg/lightning/config/config_test.go +++ b/br/pkg/lightning/config/config_test.go @@ -1006,18 +1006,18 @@ func TestAdjustOnDuplicate(t *testing.T) { cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup require.NoError(t, cfg.Adjust(ctx)) - require.Equal(t, "", cfg.TikvImporter.OnDuplicate) + require.Equal(t, config.ReplaceOnDup, cfg.TikvImporter.OnDuplicate) cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup cfg.TikvImporter.IncrementalImport = true - require.NoError(t, cfg.Adjust(ctx)) + require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.on-duplicate cannot be used with tikv-importer.incremental-import") cfg.TikvImporter.Backend = config.BackendLocal cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup cfg.TikvImporter.IncrementalImport = false cfg.TikvImporter.DuplicateResolution = config.DupeResAlgRemove - require.NoError(t, cfg.Adjust(ctx)) + require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.on-duplicate cannot be used with tikv-importer.duplicate-resolution") } func TestAdjustMaxErrorRecords(t *testing.T) { diff --git a/br/pkg/lightning/errormanager/errormanager.go b/br/pkg/lightning/errormanager/errormanager.go index df4722d740b27..af12f06270672 100644 --- a/br/pkg/lightning/errormanager/errormanager.go +++ b/br/pkg/lightning/errormanager/errormanager.go @@ -45,9 +45,8 @@ const ( typeErrorTableName = "type_error_v1" // ConflictErrorTableName is the table name for duplicate detection. ConflictErrorTableName = "conflict_error_v1" - // conflictErrorV2TableName is the table name to record duplicate data in tidb - // backend and pre-deduplication of local backend. - conflictErrorV2TableName = "conflict_error_v2" + // DupRecordTable is the table name to record duplicate data that displayed to user. + DupRecordTable = "duplicate_records" createSyntaxErrorTable = ` CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` ( @@ -89,8 +88,8 @@ const ( ); ` - createConflictErrorV2Table = ` - CREATE TABLE IF NOT EXISTS %s.` + conflictErrorV2TableName + ` ( + createDupRecordTable = ` + CREATE TABLE IF NOT EXISTS %s.` + DupRecordTable + ` ( task_id bigint NOT NULL, create_time datetime(6) NOT NULL DEFAULT now(6), table_name varchar(261) NOT NULL, @@ -132,8 +131,8 @@ const ( ORDER BY _tidb_rowid LIMIT ?; ` - insertIntoConflictErrorV2 = ` - INSERT INTO %s.` + conflictErrorV2TableName + ` + insertIntoDupRecord = ` + INSERT INTO %s.` + DupRecordTable + ` (task_id, table_name, path, offset, error, row_id, row_data) VALUES (?, ?, ?, ?, ?, ?, ?); ` @@ -217,7 +216,7 @@ func (em *ErrorManager) Init(ctx context.Context) error { sqls = append(sqls, [2]string{"create conflict error v1 table", createConflictErrorTable}) } if em.conflictV2Enabled && em.remainingError.Conflict.Load() > 0 { - sqls = append(sqls, [2]string{"create conflict error v2 table", createConflictErrorV2Table}) + sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable}) } // No need to create task info schema if no error is allowed. @@ -494,8 +493,8 @@ func (em *ErrorManager) ResolveAllConflictKeys( return errors.Trace(g.Wait()) } -// RecordConflictErrorV2 records a conflict error detected by the new conflict detector or tidb backend. -func (em *ErrorManager) RecordConflictErrorV2( +// RecordDuplicate records a "duplicate entry" error so user can query them later. +func (em *ErrorManager) RecordDuplicate( ctx context.Context, logger log.Logger, tableName string, @@ -523,8 +522,8 @@ func (em *ErrorManager) RecordConflictErrorV2( Logger: logger, HideQueryLog: redact.NeedRedact(), } - return exec.Exec(ctx, "insert conflict error record", - fmt.Sprintf(insertIntoConflictErrorV2, em.schemaEscaped), + return exec.Exec(ctx, "insert duplicate record", + fmt.Sprintf(insertIntoDupRecord, em.schemaEscaped), em.taskID, tableName, path, @@ -594,7 +593,7 @@ func (em *ErrorManager) LogErrorDetails() { if em.conflictV1Enabled { em.logger.Warn(fmtErrMsg(errCnt, "data conflict", ConflictErrorTableName)) } else { - em.logger.Warn(fmtErrMsg(errCnt, "data conflict", conflictErrorV2TableName)) + em.logger.Warn(fmtErrMsg(errCnt, "data conflict", DupRecordTable)) } } } @@ -640,7 +639,7 @@ func (em *ErrorManager) Output() string { if em.conflictV1Enabled { t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)}) } else { - t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorV2TableName)}) + t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(DupRecordTable)}) } } diff --git a/br/pkg/lightning/errormanager/errormanager_test.go b/br/pkg/lightning/errormanager/errormanager_test.go index 2afbf200b088a..5a06fb1a9e1e0 100644 --- a/br/pkg/lightning/errormanager/errormanager_test.go +++ b/br/pkg/lightning/errormanager/errormanager_test.go @@ -75,7 +75,7 @@ func TestInit(t *testing.T) { WillReturnResult(sqlmock.NewResult(6, 1)) mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*"). WillReturnResult(sqlmock.NewResult(7, 1)) - mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v2.*"). + mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.duplicate_records.*"). WillReturnResult(sqlmock.NewResult(7, 1)) err = em.Init(ctx) require.NoError(t, err) diff --git a/br/pkg/lightning/importer/chunk_process.go b/br/pkg/lightning/importer/chunk_process.go index 1fc236c2b95b8..fe06c8caddc0f 100644 --- a/br/pkg/lightning/importer/chunk_process.go +++ b/br/pkg/lightning/importer/chunk_process.go @@ -435,7 +435,7 @@ func (cr *chunkProcessor) encodeLoop( logger, ) rowText := tidb.EncodeRowForRecord(ctx, t.encTable, rc.cfg.TiDB.SQLMode, lastRow.Row, cr.chunk.ColumnPermutation) - err = rc.errorMgr.RecordConflictErrorV2( + err = rc.errorMgr.RecordDuplicate( ctx, logger, t.tableName, diff --git a/br/tests/lightning_config_max_error/run.sh b/br/tests/lightning_config_max_error/run.sh index f8ea535f2457a..faf0aab11d2cf 100755 --- a/br/tests/lightning_config_max_error/run.sh +++ b/br/tests/lightning_config_max_error/run.sh @@ -80,17 +80,17 @@ check_contains "COUNT(*): ${duplicated_row_count}" run_sql 'SELECT COUNT(*) FROM mytest.testtbl' check_contains "COUNT(*): ${uniq_row_count}" -# Check tidb backend record duplicate entry in conflict_error_v2 table -run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2' +# Check tidb backend record duplicate entry in duplicate_records table +run_sql 'DROP TABLE IF EXISTS lightning_task_info.duplicate_records' run_lightning --backend tidb --config "${mydir}/tidb.toml" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2' +run_sql 'SELECT COUNT(*) FROM lightning_task_info.duplicate_records' check_contains "COUNT(*): 10" -run_sql 'SELECT * FROM lightning_task_info.conflict_error_v2 WHERE offset = 149' +run_sql 'SELECT * FROM lightning_task_info.duplicate_records WHERE offset = 149' check_contains "error: Error 1062 (23000): Duplicate entry '5' for key 'testtbl.PRIMARY'" check_contains "row_data: ('5','bbb05')" -# Check max-error-record can limit the size of conflict_error_v2 table +# Check max-error-record can limit the size of duplicate_records table run_sql 'DROP DATABASE IF EXISTS lightning_task_info' -run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`conflict_error_v2\`" | grep -q "15" -run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2' +run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`duplicate_records\`" | grep -q "15" +run_sql 'SELECT COUNT(*) FROM lightning_task_info.duplicate_records' check_contains "COUNT(*): 1" diff --git a/br/tests/lightning_duplicate_detection_new/run.sh b/br/tests/lightning_duplicate_detection_new/run.sh index 227146666973a..b0bc2dd52ffa9 100755 --- a/br/tests/lightning_duplicate_detection_new/run.sh +++ b/br/tests/lightning_duplicate_detection_new/run.sh @@ -14,9 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO(lance6716): enable it after we turn on pre-deduplication -exit 0 - set -eux CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) @@ -43,14 +40,14 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks" echo "local backend replace strategy result is not equal to tidb backend" exit 1 fi -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records" check_contains "count(*): 227" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error = ''" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records WHERE error = ''" check_contains "count(*): 0" -run_sql "SELECT * FROM lightning_task_info.conflict_error_v2 WHERE row_id = 12" +run_sql "SELECT * FROM lightning_task_info.duplicate_records WHERE row_id = 12" check_contains "(171,'yRxZE',9201592769833450947,'xs3d',5,4,283270321)" check_contains "[kv:1062]Duplicate entry '171' for key 'dup_detect.PRIMARY'" -run_sql "SELECT * FROM lightning_task_info.conflict_error_v2 WHERE row_id = 1" +run_sql "SELECT * FROM lightning_task_info.duplicate_records WHERE row_id = 1" check_contains "(87,'nEoKu',7836621565948506759,'y6',48,0,177543185)" check_contains "[kv:1062]Duplicate entry '0-177543185' for key 'dup_detect.uniq_col6_col7'" @@ -68,7 +65,7 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks" echo "local backend ignore strategy result is not equal to tidb backend" exit 1 fi -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records" check_contains "count(*): 228" # 3. Test error strategy. @@ -90,11 +87,11 @@ cleanup run_lightning --backend local --config "$CUR/local-limit-error-records.toml" --log-file "$LOG_FILE" run_sql "SELECT count(*) FROM test.dup_detect" check_contains "count(*): 174" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records" check_contains "count(*): 50" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error LIKE '%PRIMARY%'" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records WHERE error LIKE '%PRIMARY%'" check_contains "count(*): 49" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error LIKE '%uniq_col6_col7%'" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records WHERE error LIKE '%uniq_col6_col7%'" check_contains "count(*): 1" # 5. Test fail after duplicate detection. @@ -109,6 +106,6 @@ run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-rep run_lightning --enable-checkpoint=1 --backend local --config "$CUR/local-replace.toml" --log-file "$LOG_FILE" run_sql "SELECT count(*) FROM test.dup_detect" check_contains "count(*): 174" -run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2" +run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records" check_contains "count(*): 227" check_not_contains "duplicate detection start" "$LOG_FILE"