Skip to content

Commit

Permalink
lightning: fix CI and deprecate max-error.conflict (#45349)
Browse files Browse the repository at this point in the history
ref #41629
  • Loading branch information
lance6716 committed Jul 14, 2023
1 parent f9e5621 commit 2cf78c7
Show file tree
Hide file tree
Showing 18 changed files with 193 additions and 169 deletions.
3 changes: 3 additions & 0 deletions Makefile
Expand Up @@ -351,6 +351,9 @@ br_unit_test_in_verify_ci: tools/bin/gotestsum
br_integration_test: br_bins build_br build_for_br_integration_test
@cd br && tests/run.sh

br_integration_test_debug:
@cd br && tests/run.sh --no-tiflash

br_compatibility_test_prepare:
@cd br && tests/run_compatible.sh prepare

Expand Down
2 changes: 0 additions & 2 deletions br/pkg/lightning/backend/tidb/tidb_test.go
Expand Up @@ -556,7 +556,6 @@ func TestWriteRowsErrorDowngradingAll(t *testing.T) {
MaxError: config.MaxError{
Type: *atomic.NewInt64(10),
},
MaxErrorRecords: 100,
},
}, log.L()),
)
Expand Down Expand Up @@ -613,7 +612,6 @@ func TestWriteRowsErrorDowngradingExceedThreshold(t *testing.T) {
MaxError: config.MaxError{
Type: *atomic.NewInt64(3),
},
MaxErrorRecords: 100,
},
}, log.L()),
)
Expand Down
44 changes: 17 additions & 27 deletions br/pkg/lightning/config/config.go
Expand Up @@ -329,15 +329,6 @@ func (l *Lightning) adjust(i *TikvImporter) {
l.RegionConcurrency = cpuCount
}
}
if l.MaxError.Conflict.Load() == -1 {
if i.Backend == BackendTiDB {
// in versions before v7.3, tidb backend will treat "duplicate entry"
// as type error which default is 0. So we set it to 0 to keep compatible.
l.MaxError.Conflict.Store(0)
} else {
l.MaxError.Conflict.Store(math.MaxInt64)
}
}
}

// PostOpLevel represents the level of post-operation.
Expand Down Expand Up @@ -509,6 +500,7 @@ type MaxError struct {
// The default value is zero, which means that such errors are not tolerated.
Type atomic.Int64 `toml:"type" json:"type"`

// deprecated, use `conflict.threshold` instead.
// Conflict is the maximum number of unique key conflicts in local backend accepted.
// When tolerated, every pair of conflict adds 1 to the counter.
// Those pairs will NOT be deleted from the target. Conflict resolution is performed separately.
Expand All @@ -522,16 +514,14 @@ type MaxError struct {
// UnmarshalTOML implements toml.Unmarshaler interface.
func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
defaultValMap := map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": -1,
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
}
// set default value first
cfg.Syntax.Store(defaultValMap["syntax"])
cfg.Charset.Store(defaultValMap["charset"])
cfg.Type.Store(defaultValMap["type"])
cfg.Conflict.Store(defaultValMap["conflict"])
switch val := v.(type) {
case int64:
// ignore val that is smaller than 0
Expand All @@ -554,11 +544,8 @@ func (cfg *MaxError) UnmarshalTOML(v interface{}) error {
return iVal
}
for k, v := range val {
switch k {
case "type":
if k == "type" {
cfg.Type.Store(getVal(k, v))
case "conflict":
cfg.Conflict.Store(getVal(k, v))
}
}
return nil
Expand Down Expand Up @@ -742,6 +729,7 @@ func (p *PostRestore) adjust(i *TikvImporter) {
p.Checksum = OpLevelOff
p.Analyze = OpLevelOff
p.Compact = false
p.ChecksumViaSQL = false
}

// StringOrStringSlice can unmarshal a TOML string as string slice with one element.
Expand Down Expand Up @@ -1347,10 +1335,15 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {

if c.Threshold < 0 {
switch c.Strategy {
case ErrorOnDup, "":
case ErrorOnDup:
c.Threshold = 0
case IgnoreOnDup, ReplaceOnDup:
c.Threshold = math.MaxInt64
case "":
c.Threshold = 0
if i.DuplicateResolution != DupeResAlgNone {
c.Threshold = math.MaxInt64
}
}
}
if c.Threshold > 0 && c.Strategy == ErrorOnDup {
Expand Down Expand Up @@ -1392,14 +1385,11 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {
func NewConfig() *Config {
return &Config{
App: Lightning{
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 0,
IndexConcurrency: 0,
IOConcurrency: 5,
CheckRequirements: true,
MaxError: MaxError{
Conflict: *atomic.NewInt64(-1),
},
RegionConcurrency: runtime.NumCPU(),
TableConcurrency: 0,
IndexConcurrency: 0,
IOConcurrency: 5,
CheckRequirements: true,
TaskInfoSchemaName: defaultTaskInfoSchemaName,
},
Checkpoint: Checkpoint{
Expand Down
63 changes: 26 additions & 37 deletions br/pkg/lightning/config/config_test.go
Expand Up @@ -628,20 +628,18 @@ func TestMaxErrorUnmarshal(t *testing.T) {
{
TOMLStr: `max-error = 123`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 123,
"conflict": -1,
"syntax": 0,
"charset": math.MaxInt64,
"type": 123,
},
CaseName: "Normal_Int",
},
{
TOMLStr: `max-error = -123`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": -1,
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
},
CaseName: "Abnormal_Negative_Int",
},
Expand All @@ -655,70 +653,61 @@ func TestMaxErrorUnmarshal(t *testing.T) {
syntax = 1
charset = 2
type = 3
conflict = 4
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 3,
"conflict": 4,
"syntax": 0,
"charset": math.MaxInt64,
"type": 3,
},
CaseName: "Normal_Map_All_Set",
},
{
TOMLStr: `[max-error]
conflict = 1000
type = 1000
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": 1000,
"syntax": 0,
"charset": math.MaxInt64,
"type": 1000,
},
CaseName: "Normal_Map_Partial_Set",
},
{
TOMLStr: `max-error = { conflict = 1000, type = 123 }`,
TOMLStr: `max-error = { type = 123 }`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 123,
"conflict": 1000,
"syntax": 0,
"charset": math.MaxInt64,
"type": 123,
},
CaseName: "Normal_OneLineMap_Partial_Set",
},
{
TOMLStr: `[max-error]
conflict = 1000
not_exist = 123
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": 1000,
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
},
CaseName: "Normal_Map_Partial_Set_Invalid_Key",
},
{
TOMLStr: `[max-error]
conflict = 1000
type = -123
`,
ExpectedValues: map[string]int64{
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
"conflict": 1000,
"syntax": 0,
"charset": math.MaxInt64,
"type": 0,
},
CaseName: "Normal_Map_Partial_Set_Invalid_Value",
},
{
TOMLStr: `[max-error]
conflict = 1000
type = abc
`,
ExpectErrStr: `toml: line 3 (last key "max-error.type"): expected value but found "abc" instead`,
ExpectErrStr: `toml: line 2 (last key "max-error.type"): expected value but found "abc" instead`,
CaseName: "Normal_Map_Partial_Set_Invalid_ValueType",
},
} {
Expand All @@ -732,7 +721,6 @@ type = abc
require.Equalf(t, tc.ExpectedValues["syntax"], targetLightningCfg.MaxError.Syntax.Load(), "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["charset"], targetLightningCfg.MaxError.Charset.Load(), "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["type"], targetLightningCfg.MaxError.Type.Load(), "test case: %s", tc.CaseName)
require.Equalf(t, tc.ExpectedValues["conflict"], targetLightningCfg.MaxError.Conflict.Load(), "test case: %s", tc.CaseName)
}
}
}
Expand Down Expand Up @@ -1012,10 +1000,11 @@ func TestAdjustConflictStrategy(t *testing.T) {
}

func TestAdjustMaxRecordRows(t *testing.T) {
ctx := context.Background()

cfg := NewConfig()
assignMinimalLegalValue(cfg)
cfg.Conflict.Threshold = 9999
ctx := context.Background()

cfg.Conflict.MaxRecordRows = -1
require.NoError(t, cfg.Adjust(ctx))
Expand Down
1 change: 1 addition & 0 deletions br/pkg/lightning/errormanager/BUILD.bazel
Expand Up @@ -15,6 +15,7 @@ go_library(
"@com_github_jedib0t_go_pretty_v6//text",
"@com_github_pingcap_errors//:errors",
"@org_golang_x_sync//errgroup",
"@org_uber_go_atomic//:atomic",
"@org_uber_go_multierr//:multierr",
"@org_uber_go_zap//:zap",
],
Expand Down

0 comments on commit 2cf78c7

Please sign in to comment.