Skip to content

Commit

Permalink
lightning: enable pre-deduplication and rename the recording table (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
lance6716 committed Jul 5, 2023
1 parent fdf8c2c commit ee7f4fc
Show file tree
Hide file tree
Showing 8 changed files with 35 additions and 41 deletions.
2 changes: 1 addition & 1 deletion br/pkg/lightning/backend/tidb/tidb.go
Expand Up @@ -722,7 +722,7 @@ func (be *tidbBackend) execStmts(ctx context.Context, stmtTasks []stmtTask, tabl

if isDupEntryError(err) {
// rowID is ignored in tidb backend
err = be.errorMgr.RecordConflictErrorV2(
err = be.errorMgr.RecordDuplicate(
ctx,
log.FromContext(ctx),
tableName,
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/config/config.go
Expand Up @@ -1205,8 +1205,6 @@ func (cfg *Config) AdjustCommon() (bool, error) {
}
cfg.TikvImporter.DuplicateResolution = DupeResAlgNone
case BackendLocal:
// force turn off pre-dedup for local backend
cfg.TikvImporter.OnDuplicate = ""
if cfg.TikvImporter.RegionSplitBatchSize <= 0 {
return mustHaveInternalConnections, common.ErrInvalidConfig.GenWithStack("`tikv-importer.region-split-batch-size` got %d, should be larger than 0", cfg.TikvImporter.RegionSplitBatchSize)
}
Expand Down
6 changes: 3 additions & 3 deletions br/pkg/lightning/config/config_test.go
Expand Up @@ -1006,18 +1006,18 @@ func TestAdjustOnDuplicate(t *testing.T) {
cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup
require.NoError(t, cfg.Adjust(ctx))
require.Equal(t, "", cfg.TikvImporter.OnDuplicate)
require.Equal(t, config.ReplaceOnDup, cfg.TikvImporter.OnDuplicate)

cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup
cfg.TikvImporter.IncrementalImport = true
require.NoError(t, cfg.Adjust(ctx))
require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.on-duplicate cannot be used with tikv-importer.incremental-import")

cfg.TikvImporter.Backend = config.BackendLocal
cfg.TikvImporter.OnDuplicate = config.ReplaceOnDup
cfg.TikvImporter.IncrementalImport = false
cfg.TikvImporter.DuplicateResolution = config.DupeResAlgRemove
require.NoError(t, cfg.Adjust(ctx))
require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.on-duplicate cannot be used with tikv-importer.duplicate-resolution")
}

func TestAdjustMaxErrorRecords(t *testing.T) {
Expand Down
27 changes: 13 additions & 14 deletions br/pkg/lightning/errormanager/errormanager.go
Expand Up @@ -45,9 +45,8 @@ const (
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v1"
// conflictErrorV2TableName is the table name to record duplicate data in tidb
// backend and pre-deduplication of local backend.
conflictErrorV2TableName = "conflict_error_v2"
// DupRecordTable is the table name to record duplicate data that displayed to user.
DupRecordTable = "duplicate_records"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
Expand Down Expand Up @@ -89,8 +88,8 @@ const (
);
`

createConflictErrorV2Table = `
CREATE TABLE IF NOT EXISTS %s.` + conflictErrorV2TableName + ` (
createDupRecordTable = `
CREATE TABLE IF NOT EXISTS %s.` + DupRecordTable + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand Down Expand Up @@ -132,8 +131,8 @@ const (
ORDER BY _tidb_rowid LIMIT ?;
`

insertIntoConflictErrorV2 = `
INSERT INTO %s.` + conflictErrorV2TableName + `
insertIntoDupRecord = `
INSERT INTO %s.` + DupRecordTable + `
(task_id, table_name, path, offset, error, row_id, row_data)
VALUES (?, ?, ?, ?, ?, ?, ?);
`
Expand Down Expand Up @@ -217,7 +216,7 @@ func (em *ErrorManager) Init(ctx context.Context) error {
sqls = append(sqls, [2]string{"create conflict error v1 table", createConflictErrorTable})
}
if em.conflictV2Enabled && em.remainingError.Conflict.Load() > 0 {
sqls = append(sqls, [2]string{"create conflict error v2 table", createConflictErrorV2Table})
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable})
}

// No need to create task info schema if no error is allowed.
Expand Down Expand Up @@ -494,8 +493,8 @@ func (em *ErrorManager) ResolveAllConflictKeys(
return errors.Trace(g.Wait())
}

// RecordConflictErrorV2 records a conflict error detected by the new conflict detector or tidb backend.
func (em *ErrorManager) RecordConflictErrorV2(
// RecordDuplicate records a "duplicate entry" error so user can query them later.
func (em *ErrorManager) RecordDuplicate(
ctx context.Context,
logger log.Logger,
tableName string,
Expand Down Expand Up @@ -523,8 +522,8 @@ func (em *ErrorManager) RecordConflictErrorV2(
Logger: logger,
HideQueryLog: redact.NeedRedact(),
}
return exec.Exec(ctx, "insert conflict error record",
fmt.Sprintf(insertIntoConflictErrorV2, em.schemaEscaped),
return exec.Exec(ctx, "insert duplicate record",
fmt.Sprintf(insertIntoDupRecord, em.schemaEscaped),
em.taskID,
tableName,
path,
Expand Down Expand Up @@ -594,7 +593,7 @@ func (em *ErrorManager) LogErrorDetails() {
if em.conflictV1Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "data conflict", ConflictErrorTableName))
} else {
em.logger.Warn(fmtErrMsg(errCnt, "data conflict", conflictErrorV2TableName))
em.logger.Warn(fmtErrMsg(errCnt, "data conflict", DupRecordTable))
}
}
}
Expand Down Expand Up @@ -640,7 +639,7 @@ func (em *ErrorManager) Output() string {
if em.conflictV1Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
} else {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(conflictErrorV2TableName)})
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(DupRecordTable)})
}
}

Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/errormanager/errormanager_test.go
Expand Up @@ -75,7 +75,7 @@ func TestInit(t *testing.T) {
WillReturnResult(sqlmock.NewResult(6, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v1.*").
WillReturnResult(sqlmock.NewResult(7, 1))
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.conflict_error_v2.*").
mock.ExpectExec("CREATE TABLE IF NOT EXISTS `lightning_errors`\\.duplicate_records.*").
WillReturnResult(sqlmock.NewResult(7, 1))
err = em.Init(ctx)
require.NoError(t, err)
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/lightning/importer/chunk_process.go
Expand Up @@ -435,7 +435,7 @@ func (cr *chunkProcessor) encodeLoop(
logger,
)
rowText := tidb.EncodeRowForRecord(ctx, t.encTable, rc.cfg.TiDB.SQLMode, lastRow.Row, cr.chunk.ColumnPermutation)
err = rc.errorMgr.RecordConflictErrorV2(
err = rc.errorMgr.RecordDuplicate(
ctx,
logger,
t.tableName,
Expand Down
14 changes: 7 additions & 7 deletions br/tests/lightning_config_max_error/run.sh
Expand Up @@ -80,17 +80,17 @@ check_contains "COUNT(*): ${duplicated_row_count}"
run_sql 'SELECT COUNT(*) FROM mytest.testtbl'
check_contains "COUNT(*): ${uniq_row_count}"

# Check tidb backend record duplicate entry in conflict_error_v2 table
run_sql 'DROP TABLE IF EXISTS lightning_task_info.conflict_error_v2'
# Check tidb backend record duplicate entry in duplicate_records table
run_sql 'DROP TABLE IF EXISTS lightning_task_info.duplicate_records'
run_lightning --backend tidb --config "${mydir}/tidb.toml"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2'
run_sql 'SELECT COUNT(*) FROM lightning_task_info.duplicate_records'
check_contains "COUNT(*): 10"
run_sql 'SELECT * FROM lightning_task_info.conflict_error_v2 WHERE offset = 149'
run_sql 'SELECT * FROM lightning_task_info.duplicate_records WHERE offset = 149'
check_contains "error: Error 1062 (23000): Duplicate entry '5' for key 'testtbl.PRIMARY'"
check_contains "row_data: ('5','bbb05')"

# Check max-error-record can limit the size of conflict_error_v2 table
# Check max-error-record can limit the size of duplicate_records table
run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`conflict_error_v2\`" | grep -q "15"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_error_v2'
run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`duplicate_records\`" | grep -q "15"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.duplicate_records'
check_contains "COUNT(*): 1"
21 changes: 9 additions & 12 deletions br/tests/lightning_duplicate_detection_new/run.sh
Expand Up @@ -14,9 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.

# TODO(lance6716): enable it after we turn on pre-deduplication
exit 0

set -eux

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
Expand All @@ -43,14 +40,14 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks"
echo "local backend replace strategy result is not equal to tidb backend"
exit 1
fi
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records"
check_contains "count(*): 227"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error = ''"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records WHERE error = ''"
check_contains "count(*): 0"
run_sql "SELECT * FROM lightning_task_info.conflict_error_v2 WHERE row_id = 12"
run_sql "SELECT * FROM lightning_task_info.duplicate_records WHERE row_id = 12"
check_contains "(171,'yRxZE',9201592769833450947,'xs3d',5,4,283270321)"
check_contains "[kv:1062]Duplicate entry '171' for key 'dup_detect.PRIMARY'"
run_sql "SELECT * FROM lightning_task_info.conflict_error_v2 WHERE row_id = 1"
run_sql "SELECT * FROM lightning_task_info.duplicate_records WHERE row_id = 1"
check_contains "(87,'nEoKu',7836621565948506759,'y6',48,0,177543185)"
check_contains "[kv:1062]Duplicate entry '0-177543185' for key 'dup_detect.uniq_col6_col7'"

Expand All @@ -68,7 +65,7 @@ if [ "$expected_rows" != "$actual_rows" ] || [ "$expected_pks" != "$actual_pks"
echo "local backend ignore strategy result is not equal to tidb backend"
exit 1
fi
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records"
check_contains "count(*): 228"

# 3. Test error strategy.
Expand All @@ -90,11 +87,11 @@ cleanup
run_lightning --backend local --config "$CUR/local-limit-error-records.toml" --log-file "$LOG_FILE"
run_sql "SELECT count(*) FROM test.dup_detect"
check_contains "count(*): 174"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records"
check_contains "count(*): 50"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error LIKE '%PRIMARY%'"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records WHERE error LIKE '%PRIMARY%'"
check_contains "count(*): 49"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2 WHERE error LIKE '%uniq_col6_col7%'"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records WHERE error LIKE '%uniq_col6_col7%'"
check_contains "count(*): 1"

# 5. Test fail after duplicate detection.
Expand All @@ -109,6 +106,6 @@ run_lightning_ctl --enable-checkpoint=1 --backend local --config "$CUR/local-rep
run_lightning --enable-checkpoint=1 --backend local --config "$CUR/local-replace.toml" --log-file "$LOG_FILE"
run_sql "SELECT count(*) FROM test.dup_detect"
check_contains "count(*): 174"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_error_v2"
run_sql "SELECT count(*) FROM lightning_task_info.duplicate_records"
check_contains "count(*): 227"
check_not_contains "duplicate detection start" "$LOG_FILE"

0 comments on commit ee7f4fc

Please sign in to comment.