Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lightning: merge conflict record tables for preprocess duplicate detection and post-import conflict detection #52307

Merged
merged 12 commits into from
Apr 15, 2024
9 changes: 2 additions & 7 deletions br/OWNERS
Original file line number Diff line number Diff line change
@@ -1,10 +1,5 @@
# See the OWNERS docs at https://go.k8s.io/owners
options:
no_parent_owners: true
filters:
"(tidb-lightning\\.toml)$":
approvers:
- sig-critical-approvers-tidb-lightning
Comment on lines -5 to -7
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lyzx2001 you forgot to move this part into lightning/OWNERS.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done
#52745

".*":
approvers:
- sig-approvers-br
approvers:
- sig-approvers-br
2 changes: 1 addition & 1 deletion lightning/tests/lightning_config_max_error/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
run_sql 'DROP DATABASE IF EXISTS mytest'
run_lightning --backend tidb --config "${mydir}/tidb-limit-record.toml" 2>&1 | grep "\`lightning_task_info\`.\`conflict_records\`" | grep -q "5"
run_sql 'SELECT COUNT(*) FROM lightning_task_info.conflict_records'
check_contains "COUNT(*): 1"
check_contains "COUNT(*): 5"

# Check conflict.threshold
run_sql 'DROP DATABASE IF EXISTS lightning_task_info'
Expand Down

This file was deleted.

14 changes: 1 addition & 13 deletions lightning/tests/lightning_duplicate_detection_new/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -77,19 +77,7 @@ if [ "$files_left" -ne "0" ];then
fi
rm -rf "$TEST_DIR/$TEST_NAME.sorted"

# 4. Test limit error records.
cleanup
run_lightning --backend local --config "$CUR/local-limit-error-records.toml" --log-file "$LOG_FILE"
run_sql "SELECT count(*) FROM test.dup_detect"
check_contains "count(*): 174"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records"
check_contains "count(*): 50"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records WHERE error LIKE '%PRIMARY%'"
check_contains "count(*): 49"
run_sql "SELECT count(*) FROM lightning_task_info.conflict_records WHERE error LIKE '%uniq_col6_col7%'"
check_contains "count(*): 1"

# 5. Test fail after duplicate detection.
# 4. Test fail after duplicate detection.
cleanup

export GO_FAILPOINTS="github.com/pingcap/tidb/lightning/pkg/importer/FailAfterDuplicateDetection=return()"
Expand Down
9 changes: 9 additions & 0 deletions lightning/tests/lightning_duplicate_resolution_merge/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,12 @@ check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_error_v2'
check_contains 'count(*): 4'

run_sql 'select count(*) from lightning_task_info.conflict_view'
check_contains 'count(*): 20'

run_sql 'select count(*) from lightning_task_info.conflict_view where is_precheck_conflict = 1'
check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_view where is_precheck_conflict = 0'
check_contains 'count(*): 4'
10 changes: 6 additions & 4 deletions br/tidb-lightning.toml → lightning/tidb-lightning.toml
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,19 @@ driver = "file"
# - "replace": When encountering conflicting primary or unique key records, TiDB Lightning retains the latest data and overwrites the old data.
# The conflicting data are recorded in the `lightning_task_info.conflict_error_v2` table (recording conflicting data detected by post-import conflict detection in the physical import mode)
# and the `conflict_records` table (recording conflicting data detected by preprocess conflict detection in both logical and physical import modes) of the target TiDB cluster.
# If you turn on both preprocess and post-import conflict detection in the physical import mode, the conflicting data can be checked in the `lightning_task_info.conflict_view` view.
# You can manually insert the correct records into the target table based on your application requirements. Note that the target TiKV must be v5.2.0 or later versions.
# - "ignore": When encountering conflicting primary or unique key records, TiDB Lightning retains the old data and ignores the new data. This option can only be used in the logical import mode.
strategy = ""
# Controls whether to enable preprocess conflict detection, which checks conflicts in the data before importing it to TiDB. In scenarios where the ratio of conflict records is greater than or equal to 1%, it is recommended to enable preprocess conflict detection for better performance in conflict detection.
# In other scenarios, it is recommended to disable it. The default value is false, indicating that TiDB Lightning only checks conflicts after the import. If you set it to true, TiDB Lightning checks conflicts both before and after the import. This parameter is experimental, and it can be used only in the physical import mode.
# precheck-conflict-before-import = false
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 9223372036854775807, which means that almost all errors are tolerant.
# threshold = 9223372036854775807
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I remember, the reason for having separate threshold and max-record-rows settings is that the user wants the import task to succeed on the first attempt but does not know how many duplicate records the data contains, so threshold should be set to a large value.

But if we record too many rows in the conflict table, the performance cost is unacceptable, so there is a separate configuration item, max-record-rows.

How do we handle this case now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change was decided by the PM. The default threshold/max-record-rows is now set to 10000. In most scenarios, if the user imports data with more than 10000 duplicate records, there is probably something wrong with the data source; past experience suggests that either the configuration is not set correctly or the primary key or unique key of the downstream table is not well defined. So there is no point in continuing the import once conflict data is no longer being recorded.
This change is the first step in gradually deprecating max-record-rows: the value of max-record-rows is now set directly to that of threshold.

# Controls the maximum number of records in the `conflict_records` table. The default value is 100. In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# Controls the maximum number of conflict errors that can be handled when strategy is "replace" or "ignore". You can set it only when strategy is "replace" or "ignore". The default value is 10000.
# threshold = 10000
# Controls the maximum number of records in the `conflict_records` table. The default value is 10000. In the physical import mode, if the strategy is "replace", the conflict records that are overwritten are recorded.
# In the logical import mode, if the strategy is "ignore", the conflict records that are ignored are recorded; if the strategy is "replace", the conflict records cannot be recorded.
# max-record-rows = 100
# Starting from v8.1.0, max-record-rows will be assigned the value of threshold, regardless of the user input. max-record-rows will be deprecated in the future.
# max-record-rows = 10000

[tikv-importer]
# Delivery backend, can be "importer", "local" or "tidb".
Expand Down
3 changes: 1 addition & 2 deletions pkg/ddl/ingest/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package ingest

import (
"context"
"math"
"net"
"path/filepath"
"strconv"
Expand Down Expand Up @@ -64,7 +63,7 @@ func genConfig(ctx context.Context, memRoot MemRoot, jobID int64, unique bool, r
cfg.Checkpoint.Enable = true
if unique {
cfg.Conflict.Strategy = lightning.ErrorOnDup
cfg.Conflict.Threshold = math.MaxInt64
cfg.Conflict.Threshold = lightning.DefaultRecordDuplicateThreshold
} else {
cfg.Conflict.Strategy = lightning.NoneOnDup
}
Expand Down
53 changes: 18 additions & 35 deletions pkg/lightning/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@ const (
defaultLogicalImportBatchRows = 65536

// defaultMetaSchemaName is the default database name used to store lightning metadata
defaultMetaSchemaName = "lightning_metadata"
defaultTaskInfoSchemaName = "lightning_task_info"
defaultMaxRecordRows = 100
defaultMetaSchemaName = "lightning_metadata"
defaultTaskInfoSchemaName = "lightning_task_info"
DefaultRecordDuplicateThreshold = 10000

// autoDiskQuotaLocalReservedSpeed is the estimated size increase per
// millisecond per write thread the local backend may gain on all engines.
Expand Down Expand Up @@ -1339,7 +1339,7 @@ type Conflict struct {

// adjust assigns default values and check illegal values. The arguments must be
// adjusted before calling this function.
func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {
func (c *Conflict) adjust(i *TikvImporter) error {
strategyConfigFrom := "conflict.strategy"
if c.Strategy == NoneOnDup {
if i.OnDuplicate == NoneOnDup && i.Backend == BackendTiDB {
Expand Down Expand Up @@ -1378,48 +1378,31 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {

if c.Threshold < 0 {
switch c.Strategy {
case ErrorOnDup:
case ErrorOnDup, NoneOnDup:
c.Threshold = 0
case IgnoreOnDup, ReplaceOnDup:
c.Threshold = math.MaxInt64
case NoneOnDup:
c.Threshold = 0
if i.Backend == BackendLocal && c.Strategy != NoneOnDup {
c.Threshold = math.MaxInt64
}
c.Threshold = DefaultRecordDuplicateThreshold
}
}
if c.Threshold > 0 && c.Strategy == ErrorOnDup {
return common.ErrInvalidConfig.GenWithStack(
`conflict.threshold cannot be set when use conflict.strategy = "error"`)
}

if c.MaxRecordRows < 0 {
maxErr := l.MaxError
// Compatible with the old behavior that records all syntax,charset,type errors.
maxAccepted := max(maxErr.Syntax.Load(), maxErr.Charset.Load(), maxErr.Type.Load())
if maxAccepted < defaultMaxRecordRows {
maxAccepted = defaultMaxRecordRows
}
if maxAccepted > c.Threshold {
maxAccepted = c.Threshold
}
if c.Strategy == ReplaceOnDup && i.Backend == BackendTiDB {
// due to we use batch insert, we can't know which row is duplicated.
maxAccepted = 0
if c.Strategy == ReplaceOnDup && i.Backend == BackendTiDB {
// due to we use batch insert, we can't know which row is duplicated.
if c.MaxRecordRows >= 0 {
// only warn when it is set by user.
log.L().Warn(`Cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = \"tidb\" and conflict.strategy = \"replace\".
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This behaviour has also changed. Previously, with the tidb backend and the replace strategy, setting max-record-rows returned an error that told the user in advance that the conflict details could not be found in the conflict table. Now the user will not see the error. Please double-check with the PM.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PM confirmed that the user will no longer see this error. Instead, there will be a warning in the log.

The value of conflict.max-record-rows has been converted to 0.`)
}
c.MaxRecordRows = maxAccepted
c.MaxRecordRows = 0
} else {
// only check it when it is set by user.
if c.MaxRecordRows > c.Threshold {
return common.ErrInvalidConfig.GenWithStack(
"conflict.max-record-rows (%d) cannot be larger than conflict.threshold (%d)",
c.MaxRecordRows, c.Threshold)
}
if c.Strategy == ReplaceOnDup && i.Backend == BackendTiDB {
return common.ErrInvalidConfig.GenWithStack(
`cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = "tidb" and conflict.strategy = "replace"`)
if c.MaxRecordRows >= 0 {
// only warn when it is set by user.
log.L().Warn("Setting conflict.max-record-rows does not take affect. The value of conflict.max-record-rows has been converted to conflict.threshold.")
}
c.MaxRecordRows = c.Threshold
}
return nil
}
Expand Down Expand Up @@ -1622,7 +1605,7 @@ func (cfg *Config) Adjust(ctx context.Context) error {
if err = cfg.Routes.adjust(&cfg.Mydumper); err != nil {
return err
}
return cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App)
return cfg.Conflict.adjust(&cfg.TikvImporter)
}

// AdjustForDDL acts like Adjust, but DDL will not use some functionalities so
Expand Down
33 changes: 20 additions & 13 deletions pkg/lightning/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1042,16 +1042,21 @@ func TestAdjustMaxRecordRows(t *testing.T) {

cfg := NewConfig()
assignMinimalLegalValue(cfg)
cfg.Conflict.Threshold = 9999

cfg.Conflict.MaxRecordRows = -1
cfg.Conflict.Strategy = ReplaceOnDup
require.NoError(t, cfg.Adjust(ctx))
require.Equal(t, int64(100), cfg.Conflict.MaxRecordRows)
require.EqualValues(t, 10000, cfg.Conflict.MaxRecordRows)

cfg.Conflict.MaxRecordRows = -1
cfg.App.MaxError.Syntax.Store(1000)
cfg.Conflict.Threshold = 9999
require.NoError(t, cfg.Adjust(ctx))
require.EqualValues(t, 9999, cfg.Conflict.MaxRecordRows)

cfg.Conflict.MaxRecordRows = 1000
cfg.Conflict.Threshold = 100
require.NoError(t, cfg.Adjust(ctx))
require.Equal(t, int64(1000), cfg.Conflict.MaxRecordRows)
require.EqualValues(t, 100, cfg.Conflict.MaxRecordRows)
}

func TestRemoveAllowAllFiles(t *testing.T) {
Expand Down Expand Up @@ -1290,35 +1295,37 @@ func TestAdjustConflict(t *testing.T) {

require.NoError(t, dra.FromStringValue("REPLACE"))
cfg.Conflict.Strategy = dra
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App))
require.Equal(t, int64(math.MaxInt64), cfg.Conflict.Threshold)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 10000, cfg.Conflict.Threshold)

require.NoError(t, dra.FromStringValue("IGNORE"))
cfg.Conflict.Strategy = dra
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `conflict.strategy cannot be set to "ignore" when use tikv-importer.backend = "local"`)
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter), `conflict.strategy cannot be set to "ignore" when use tikv-importer.backend = "local"`)

cfg.Conflict.Strategy = ErrorOnDup
cfg.Conflict.Threshold = 1
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `conflict.threshold cannot be set when use conflict.strategy = "error"`)
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter), `conflict.threshold cannot be set when use conflict.strategy = "error"`)

cfg.TikvImporter.Backend = BackendTiDB
cfg.Conflict.Strategy = ReplaceOnDup
cfg.Conflict.MaxRecordRows = -1
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App))
require.Equal(t, int64(0), cfg.Conflict.MaxRecordRows)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 0, cfg.Conflict.MaxRecordRows)

cfg.TikvImporter.Backend = BackendLocal
cfg.Conflict.Threshold = 1
cfg.Conflict.MaxRecordRows = 1
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App))
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
cfg.Conflict.MaxRecordRows = 2
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `conflict.max-record-rows (2) cannot be larger than conflict.threshold (1)`)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 1, cfg.Conflict.MaxRecordRows)

cfg.TikvImporter.Backend = BackendTiDB
cfg.Conflict.Strategy = ReplaceOnDup
cfg.Conflict.Threshold = 1
cfg.Conflict.MaxRecordRows = 1
require.ErrorContains(t, cfg.Conflict.adjust(&cfg.TikvImporter, &cfg.App), `cannot record duplication (conflict.max-record-rows > 0) when use tikv-importer.backend = "tidb" and conflict.strategy = "replace"`)
require.NoError(t, cfg.Conflict.adjust(&cfg.TikvImporter))
require.EqualValues(t, 0, cfg.Conflict.MaxRecordRows)
}

func TestAdjustBlockSize(t *testing.T) {
Expand Down
55 changes: 39 additions & 16 deletions pkg/lightning/errormanager/errormanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@ const (
typeErrorTableName = "type_error_v1"
// ConflictErrorTableName is the table name for duplicate detection.
ConflictErrorTableName = "conflict_error_v2"
// DupRecordTable is the table name to record duplicate data that displayed to user.
DupRecordTable = "conflict_records"
// DupRecordTableName is the name of the table that records duplicate data displayed to the user.
DupRecordTableName = "conflict_records"
// ConflictViewName is the name of the view that presents the union of the ConflictErrorTableName and DupRecordTableName tables.
ConflictViewName = "conflict_view"

createSyntaxErrorTable = `
CREATE TABLE IF NOT EXISTS %s.` + syntaxErrorTableName + ` (
Expand Down Expand Up @@ -100,8 +102,8 @@ const (
);
`

createDupRecordTable = `
CREATE TABLE IF NOT EXISTS %s.` + DupRecordTable + ` (
createDupRecordTableName = `
CREATE TABLE IF NOT EXISTS %s.` + DupRecordTableName + ` (
task_id bigint NOT NULL,
create_time datetime(6) NOT NULL DEFAULT now(6),
table_name varchar(261) NOT NULL,
Expand All @@ -114,6 +116,16 @@ const (
);
`

createConflictView = `
CREATE OR REPLACE VIEW %s.` + ConflictViewName + `
AS SELECT 0 AS is_precheck_conflict, task_id, create_time, table_name, index_name, key_data, row_data,
raw_key, raw_value, raw_handle, raw_row, is_data_kv, NULL AS path, NULL AS offset, NULL AS error, NULL AS row_id
FROM %s.` + ConflictErrorTableName + `
UNION ALL SELECT 1 AS is_precheck_conflict, task_id, create_time, table_name, NULL AS index_name, NULL AS key_data,
row_data, NULL AS raw_key, NULL AS raw_value, NULL AS raw_handle, NULL AS raw_row, NULL AS is_data_kv, path,
offset, error, row_id FROM %s.` + DupRecordTableName + `;
`

insertIntoTypeError = `
INSERT INTO %s.` + typeErrorTableName + `
(task_id, table_name, path, offset, error, row_data)
Expand Down Expand Up @@ -156,7 +168,7 @@ const (
`

insertIntoDupRecord = `
INSERT INTO %s.` + DupRecordTable + `
INSERT INTO %s.` + DupRecordTableName + `
(task_id, table_name, path, offset, error, row_id, row_data)
VALUES (?, ?, ?, ?, ?, ?, ?);
`
Expand Down Expand Up @@ -250,10 +262,10 @@ func (em *ErrorManager) Init(ctx context.Context) error {
sqls = append(sqls, [2]string{"create type error table", createTypeErrorTable})
}
if em.conflictV1Enabled {
sqls = append(sqls, [2]string{"create conflict error v2 table", createConflictErrorTable})
sqls = append(sqls, [2]string{"create conflict error table", createConflictErrorTable})
}
if em.conflictV2Enabled {
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTable})
sqls = append(sqls, [2]string{"create duplicate records table", createDupRecordTableName})
}

// No need to create task info schema if no error is allowed.
Expand All @@ -269,6 +281,14 @@ func (em *ErrorManager) Init(ctx context.Context) error {
}
}

// TODO: return VIEW to users regardless of the lightning configuration
if em.conflictV1Enabled && em.conflictV2Enabled {
lance6716 marked this conversation as resolved.
Show resolved Hide resolved
err := exec.Exec(ctx, "create conflict view", strings.TrimSpace(common.SprintfWithIdentifiers(createConflictView, em.schema, em.schema, em.schema)))
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -904,12 +924,14 @@ func (em *ErrorManager) LogErrorDetails() {
// TODO: add charset table name
em.logger.Warn(fmtErrMsg(errCnt, "data charset", ""))
}
if errCnt := em.conflictError(); errCnt > 0 {
if em.conflictV1Enabled {
errCnt := em.conflictError()
if errCnt > 0 {
if em.conflictV1Enabled && em.conflictV2Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", ConflictViewName))
} else if em.conflictV1Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", ConflictErrorTableName))
}
if em.conflictV2Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", DupRecordTable))
} else if em.conflictV2Enabled {
em.logger.Warn(fmtErrMsg(errCnt, "conflict", DupRecordTableName))
}
}
}
Expand Down Expand Up @@ -952,11 +974,12 @@ func (em *ErrorManager) Output() string {
}
if errCnt := em.conflictError(); errCnt > 0 {
count++
if em.conflictV1Enabled {
if em.conflictV1Enabled && em.conflictV2Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictViewName)})
} else if em.conflictV1Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(ConflictErrorTableName)})
}
if em.conflictV2Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(DupRecordTable)})
} else if em.conflictV2Enabled {
t.AppendRow(table.Row{count, "Unique Key Conflict", errCnt, em.fmtTableName(DupRecordTableName)})
}
}

Expand Down