lightning: allow parallel import with conflict detection (#52002) (#5…
ti-chi-bot committed Mar 22, 2024
1 parent 92518cd commit 0fabd47
Showing 13 changed files with 119 additions and 13 deletions.
15 changes: 4 additions & 11 deletions br/pkg/lightning/config/config.go
@@ -1361,17 +1361,10 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {
 		return common.ErrInvalidConfig.GenWithStack(
 			"unsupported `%s` (%s)", strategyConfigFrom, c.Strategy)
 	}
-	if c.Strategy != NoneOnDup {
-		if i.ParallelImport && i.Backend == BackendLocal && c.PrecheckConflictBeforeImport {
-			return common.ErrInvalidConfig.GenWithStack(
-				`%s cannot be used with tikv-importer.parallel-import and tikv-importer.backend = "local" and conflict.precheck-conflict-before-import = true`,
-				strategyConfigFrom)
-		}
-		if !strategyFromDuplicateResolution && i.DuplicateResolution != NoneOnDup {
-			return common.ErrInvalidConfig.GenWithStack(
-				"%s cannot be used with tikv-importer.duplicate-resolution",
-				strategyConfigFrom)
-		}
+	if !strategyFromDuplicateResolution && c.Strategy != NoneOnDup && i.DuplicateResolution != NoneOnDup {
+		return common.ErrInvalidConfig.GenWithStack(
+			"%s cannot be used with tikv-importer.duplicate-resolution",
+			strategyConfigFrom)
 	}
 	if c.Strategy == IgnoreOnDup && i.Backend == BackendLocal {
 		return common.ErrInvalidConfig.GenWithStack(
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config_test.go
@@ -993,7 +993,7 @@ func TestAdjustConflictStrategy(t *testing.T) {
 	cfg.Conflict.Strategy = ReplaceOnDup
 	cfg.TikvImporter.ParallelImport = true
 	cfg.Conflict.PrecheckConflictBeforeImport = true
-	require.ErrorContains(t, cfg.Adjust(ctx), `conflict.strategy cannot be used with tikv-importer.parallel-import and tikv-importer.backend = "local" and conflict.precheck-conflict-before-import = true`)
+	require.NoError(t, cfg.Adjust(ctx))
 
 	cfg.TikvImporter.Backend = BackendLocal
 	cfg.Conflict.Strategy = ReplaceOnDup
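Taken together with the config.go change above, the only combination the adjusted check still rejects is an explicit conflict.strategy paired with the deprecated tikv-importer.duplicate-resolution. A sketch of that case in the style of TestAdjustConflictStrategy (the surrounding test scaffolding, the assignment of ReplaceOnDup to DuplicateResolution, and the exact error text beyond "tikv-importer.duplicate-resolution" are assumptions, not part of this commit):

	cfg.TikvImporter.Backend = BackendLocal
	cfg.Conflict.Strategy = ReplaceOnDup
	// Hypothetical: setting the deprecated option alongside an explicit strategy.
	cfg.TikvImporter.DuplicateResolution = ReplaceOnDup
	require.ErrorContains(t, cfg.Adjust(ctx), "tikv-importer.duplicate-resolution")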
20 changes: 20 additions & 0 deletions br/tests/lightning_duplicate_resolution_merge/config.toml
@@ -0,0 +1,20 @@
[lightning]
task-info-schema-name = 'lightning_task_info'

[tikv-importer]
backend = 'local'
add-index-by-sql = false
parallel-import = true

[conflict]
strategy = "replace"
precheck-conflict-before-import = true

[checkpoint]
enable = false

[mydumper]
batch-size = 1

[mydumper.csv]
header = true
@@ -0,0 +1 @@
create schema dup_resolve;
@@ -0,0 +1,5 @@
create table a (
a int primary key clustered,
b int not null,
c text
);
@@ -0,0 +1,7 @@
a,b,c
1,1,1.csv
1,1,2.csv
1,1,3.csv
2,2,1.csv
2,1,1.csv
2,2,2.csv
@@ -0,0 +1,9 @@
a,b,c
3,3,3.csv
3,4,4.csv
4,3,4.csv
4,4,4.csv
4,5,4.csv
5,5,5.csv
5,6,6.csv
6,6,6.csv
@@ -0,0 +1 @@
create schema dup_resolve;
@@ -0,0 +1,5 @@
create table a (
a int primary key clustered,
b int not null,
c text
);
@@ -0,0 +1,7 @@
a,b,c
1,1,1.csv
1,1,2.csv
1,1,3.csv
2,2,1.csv
2,1,1.csv
2,2,2.csv
@@ -0,0 +1,9 @@
a,b,c
13,3,3.csv
13,4,4.csv
14,3,4.csv
14,4,4.csv
14,5,4.csv
15,5,5.csv
15,6,6.csv
16,6,6.csv
49 changes: 49 additions & 0 deletions br/tests/lightning_duplicate_resolution_merge/run.sh
@@ -0,0 +1,49 @@
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

check_cluster_version 5 2 0 'duplicate detection' || exit 0

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

LOG_FILE1="$TEST_DIR/lightning_duplicate_resolution_merge1.log"
LOG_FILE2="$TEST_DIR/lightning_duplicate_resolution_merge2.log"

# Let lightning run a bit slower so that no table in the first lightning instance finishes too quickly.
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/SlowDownImport=sleep(250)"

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_merge.sorted1" \
-d "$CUR/data1" --log-file "$LOG_FILE1" --config "$CUR/config.toml" &
pid1="$!"

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_merge.sorted2" \
-d "$CUR/data2" --log-file "$LOG_FILE2" --config "$CUR/config.toml" &
pid2="$!"

wait "$pid1" "$pid2"

# Ensure all tables are consistent.
run_sql 'admin check table dup_resolve.a'

run_sql 'select count(*) from dup_resolve.a'
check_contains 'count(*): 10'

run_sql 'select count(*) from lightning_task_info.conflict_records'
check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_error_v2'
check_contains 'count(*): 4'
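One way to account for the asserted counts, assuming lightning_task_info.conflict_records holds the rows each task's precheck discards within its own data source and conflict_error_v2 holds the cross-task conflicts detected after import (a reading of the numbers, not something stated in the commit):

	dup_resolve.a      -> 10 rows: distinct keys 1-6 from data1 plus 13-16 from data2
	conflict_records   -> 16 rows: each task's precheck drops (3-1)+(3-1)+(2-1)+(3-1)+(2-1) = 8 in-source duplicates, times 2 tasks
	conflict_error_v2  ->  4 rows: keys 1 and 2 are imported by both tasks, 2 keys x 2 conflicting rows each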
2 changes: 1 addition & 1 deletion br/tests/run_group_lightning_tests.sh
@@ -23,7 +23,7 @@ groups=(
["G00"]='lightning_auto_random_default lightning_bom_file lightning_character_sets lightning_check_partial_imported lightning_checkpoint lightning_checkpoint_chunks lightning_checkpoint_columns lightning_checkpoint_dirty_tableid'
["G01"]='lightning_checkpoint_engines lightning_checkpoint_engines_order lightning_checkpoint_error_destroy lightning_checkpoint_parquet lightning_checkpoint_timestamp lightning_checksum_mismatch lightning_cmdline_override lightning_column_permutation lightning_common_handle lightning_compress lightning_concurrent-restore'
["G02"]='lightning_config_max_error lightning_config_skip_csv_header lightning_csv lightning_default-columns lightning_disable_scheduler_by_key_range lightning_disk_quota lightning_distributed_import lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution_error lightning_duplicate_resolution_error_pk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files_multicol_index lightning_duplicate_resolution_incremental'
["G03"]='lightning_duplicate_resolution_replace_multiple_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_keys_nonclustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_nonclustered_pk lightning_duplicate_resolution_replace_one_key lightning_duplicate_resolution_replace_one_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_key_multiple_conflicts_nonclustered_pk'
["G03"]='lightning_duplicate_resolution_merge lightning_duplicate_resolution_replace_multiple_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_keys_nonclustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_nonclustered_pk lightning_duplicate_resolution_replace_one_key lightning_duplicate_resolution_replace_one_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_key_multiple_conflicts_nonclustered_pk'
["G04"]='lightning_duplicate_resolution_replace_one_unique_key_clustered_pk lightning_duplicate_resolution_replace_one_unique_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_unique_key_multiple_conflicts_nonclustered_pk lightning_duplicate_resolution_replace_one_unique_key_nonclustered_varchar_pk lightning_error_summary lightning_examples lightning_exotic_filenames lightning_extend_routes'
["G05"]='lightning_fail_fast lightning_fail_fast_on_nonretry_err lightning_file_routing lightning_foreign_key lightning_gcs lightning_generated_columns lightning_ignore_columns lightning_import_compress lightning_incremental lightning_issue_282 lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr'
["G06"]='lightning_max_random lightning_multi_valued_index lightning_new_collation lightning_no_schema lightning_parquet lightning_partition_incremental lightning_partitioned-table lightning_record_network lightning_reload_cert lightning_restore lightning_routes lightning_routes_panic lightning_row-format-v2 lightning_s3'
