lightning: allow parallel import with conflict detection (#52002) #52037

Merged
15 changes: 4 additions & 11 deletions br/pkg/lightning/config/config.go
```diff
@@ -1361,17 +1361,10 @@ func (c *Conflict) adjust(i *TikvImporter, l *Lightning) error {
 		return common.ErrInvalidConfig.GenWithStack(
 			"unsupported `%s` (%s)", strategyConfigFrom, c.Strategy)
 	}
-	if c.Strategy != NoneOnDup {
-		if i.ParallelImport && i.Backend == BackendLocal && c.PrecheckConflictBeforeImport {
-			return common.ErrInvalidConfig.GenWithStack(
-				`%s cannot be used with tikv-importer.parallel-import and tikv-importer.backend = "local" and conflict.precheck-conflict-before-import = true`,
-				strategyConfigFrom)
-		}
-		if !strategyFromDuplicateResolution && i.DuplicateResolution != NoneOnDup {
-			return common.ErrInvalidConfig.GenWithStack(
-				"%s cannot be used with tikv-importer.duplicate-resolution",
-				strategyConfigFrom)
-		}
+	if !strategyFromDuplicateResolution && c.Strategy != NoneOnDup && i.DuplicateResolution != NoneOnDup {
+		return common.ErrInvalidConfig.GenWithStack(
+			"%s cannot be used with tikv-importer.duplicate-resolution",
+			strategyConfigFrom)
 	}
 	if c.Strategy == IgnoreOnDup && i.Backend == BackendLocal {
 		return common.ErrInvalidConfig.GenWithStack(
```
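Read as straight-line code, the change collapses the two nested checks into one and drops the parallel-import restriction entirely. A minimal standalone sketch of the resulting validation (simplified types; only the condition mirrors the diff, the rest is illustrative, not the actual config package):

```go
package main

import "fmt"

// Stand-ins for the config package's enum; names are borrowed from the diff.
type dupAlgorithm string

const (
	NoneOnDup    dupAlgorithm = "none"
	ReplaceOnDup dupAlgorithm = "replace"
)

// adjustConflict sketches the surviving guard: conflict.strategy may not be
// combined with the legacy tikv-importer.duplicate-resolution (unless the
// strategy was itself derived from duplicate-resolution). The old rejection
// of parallel-import + local backend + precheck-conflict-before-import is gone.
func adjustConflict(strategy, duplicateResolution dupAlgorithm, strategyFromDuplicateResolution bool) error {
	if !strategyFromDuplicateResolution && strategy != NoneOnDup && duplicateResolution != NoneOnDup {
		return fmt.Errorf("conflict.strategy cannot be used with tikv-importer.duplicate-resolution")
	}
	return nil
}

func main() {
	// parallel-import with strategy = "replace": no longer an error.
	fmt.Println(adjustConflict(ReplaceOnDup, NoneOnDup, false)) // <nil>
	// both knobs set at once: still rejected.
	fmt.Println(adjustConflict(ReplaceOnDup, ReplaceOnDup, false))
}
```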
2 changes: 1 addition & 1 deletion br/pkg/lightning/config/config_test.go
```diff
@@ -993,7 +993,7 @@ func TestAdjustConflictStrategy(t *testing.T) {
 	cfg.Conflict.Strategy = ReplaceOnDup
 	cfg.TikvImporter.ParallelImport = true
 	cfg.Conflict.PrecheckConflictBeforeImport = true
-	require.ErrorContains(t, cfg.Adjust(ctx), `conflict.strategy cannot be used with tikv-importer.parallel-import and tikv-importer.backend = "local" and conflict.precheck-conflict-before-import = true`)
+	require.NoError(t, cfg.Adjust(ctx))
 
 	cfg.TikvImporter.Backend = BackendLocal
 	cfg.Conflict.Strategy = ReplaceOnDup
```
20 changes: 20 additions & 0 deletions br/tests/lightning_duplicate_resolution_merge/config.toml
```toml
[lightning]
task-info-schema-name = 'lightning_task_info'

[tikv-importer]
backend = 'local'
add-index-by-sql = false
parallel-import = true

[conflict]
strategy = "replace"
precheck-conflict-before-import = true

[checkpoint]
enable = false

[mydumper]
batch-size = 1

[mydumper.csv]
header = true
```
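This fixture is exactly the combination the config change legalizes: parallel-import on the local backend with strategy = "replace" and precheck enabled. A quick sketch of parsing it through the lightning config package (NewConfig and LoadFromTOML are assumed from br/pkg/lightning/config as it exists on this branch; treat it as illustrative, not part of the PR):

```go
package main

import (
	"fmt"

	"github.com/pingcap/tidb/br/pkg/lightning/config"
)

func main() {
	// The same knobs the fixture enables; before this PR, Adjust
	// rejected this combination for the local backend.
	raw := []byte(`
[tikv-importer]
backend = 'local'
parallel-import = true

[conflict]
strategy = "replace"
precheck-conflict-before-import = true
`)
	cfg := config.NewConfig()
	if err := cfg.LoadFromTOML(raw); err != nil {
		panic(err)
	}
	fmt.Printf("%v %v %v\n",
		cfg.TikvImporter.ParallelImport,
		cfg.Conflict.Strategy,
		cfg.Conflict.PrecheckConflictBeforeImport)
}
```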
Schema and data fixtures for the first source directory (data1 in run.sh below; the viewer dropped the individual file paths):

```sql
create schema dup_resolve;
```

```sql
create table a (
    a int primary key clustered,
    b int not null,
    c text
);
```

```
a,b,c
1,1,1.csv
1,1,2.csv
1,1,3.csv
2,2,1.csv
2,1,1.csv
2,2,2.csv
```

```
a,b,c
3,3,3.csv
3,4,4.csv
4,3,4.csv
4,4,4.csv
4,5,4.csv
5,5,5.csv
5,6,6.csv
6,6,6.csv
```

Schema and data fixtures for the second source directory (data2):

```sql
create schema dup_resolve;
```

```sql
create table a (
    a int primary key clustered,
    b int not null,
    c text
);
```

```
a,b,c
1,1,1.csv
1,1,2.csv
1,1,3.csv
2,2,1.csv
2,1,1.csv
2,2,2.csv
```

```
a,b,c
13,3,3.csv
13,4,4.csv
14,3,4.csv
14,4,4.csv
14,5,4.csv
15,5,5.csv
15,6,6.csv
16,6,6.csv
```
49 changes: 49 additions & 0 deletions br/tests/lightning_duplicate_resolution_merge/run.sh
```bash
#!/bin/bash
#
# Copyright 2024 PingCAP, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

set -eux

check_cluster_version 5 2 0 'duplicate detection' || exit 0

CUR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

LOG_FILE1="$TEST_DIR/lightning_duplicate_resolution_merge1.log"
LOG_FILE2="$TEST_DIR/lightning_duplicate_resolution_merge2.log"

# Slow Lightning down a little so that no table in the first instance
# finishes before the second instance starts.
export GO_FAILPOINTS="github.com/pingcap/tidb/br/pkg/lightning/importer/SlowDownImport=sleep(250)"

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_merge.sorted1" \
  -d "$CUR/data1" --log-file "$LOG_FILE1" --config "$CUR/config.toml" &
pid1="$!"

run_lightning --backend local --sorted-kv-dir "$TEST_DIR/lightning_duplicate_resolution_merge.sorted2" \
  -d "$CUR/data2" --log-file "$LOG_FILE2" --config "$CUR/config.toml" &
pid2="$!"

wait "$pid1" "$pid2"

# Ensure all tables are consistent.
run_sql 'admin check table dup_resolve.a'

run_sql 'select count(*) from dup_resolve.a'
check_contains 'count(*): 10'

run_sql 'select count(*) from lightning_task_info.conflict_records'
check_contains 'count(*): 16'

run_sql 'select count(*) from lightning_task_info.conflict_error_v2'
check_contains 'count(*): 4'
```
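The asserted counts follow from how the fixtures overlap: keys 1 and 2 appear in both data1 and data2 and so conflict across the two Lightning instances, while keys 3–5 and 13–15 are duplicated only within a single instance. Assuming conflict_records holds the rows each instance's precheck discards and conflict_error_v2 holds the cross-instance duplicates found after import (an interpretation consistent with the assertions above, not taken from documentation), the arithmetic can be checked with a short sketch:

```go
package main

import "fmt"

func main() {
	// Primary keys of the rows each instance imports, in file order
	// (taken from the CSV fixtures above).
	task1 := []int{1, 1, 1, 2, 2, 2, 3, 3, 4, 4, 4, 5, 5, 6}
	task2 := []int{1, 1, 1, 2, 2, 2, 13, 13, 14, 14, 14, 15, 15, 16}

	// strategy = "replace" with precheck: within one instance, keep the
	// first row per key and record the rest as conflicts.
	dedupe := func(keys []int) (kept map[int]bool, dropped int) {
		kept = map[int]bool{}
		for _, k := range keys {
			if kept[k] {
				dropped++ // assumed to land in conflict_records
			} else {
				kept[k] = true
			}
		}
		return kept, dropped
	}
	kept1, dropped1 := dedupe(task1)
	kept2, dropped2 := dedupe(task2)

	// Keys surviving both prechecks that still collide across instances;
	// both copies are assumed to be recorded in conflict_error_v2.
	crossDup := 0
	final := map[int]bool{}
	for k := range kept1 {
		final[k] = true
	}
	for k := range kept2 {
		if final[k] {
			crossDup += 2
		}
		final[k] = true
	}

	fmt.Println(len(final))          // 10: rows left in dup_resolve.a
	fmt.Println(dropped1 + dropped2) // 16: rows in conflict_records
	fmt.Println(crossDup)            // 4: rows in conflict_error_v2
}
```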
2 changes: 1 addition & 1 deletion br/tests/run_group_lightning_tests.sh
```diff
@@ -23,7 +23,7 @@ groups=(
 	["G00"]='lightning_auto_random_default lightning_bom_file lightning_character_sets lightning_check_partial_imported lightning_checkpoint lightning_checkpoint_chunks lightning_checkpoint_columns lightning_checkpoint_dirty_tableid'
 	["G01"]='lightning_checkpoint_engines lightning_checkpoint_engines_order lightning_checkpoint_error_destroy lightning_checkpoint_parquet lightning_checkpoint_timestamp lightning_checksum_mismatch lightning_cmdline_override lightning_column_permutation lightning_common_handle lightning_compress lightning_concurrent-restore'
 	["G02"]='lightning_config_max_error lightning_config_skip_csv_header lightning_csv lightning_default-columns lightning_disable_scheduler_by_key_range lightning_disk_quota lightning_distributed_import lightning_drop_other_tables_halfway lightning_duplicate_detection lightning_duplicate_detection_new lightning_duplicate_resolution_error lightning_duplicate_resolution_error_pk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files lightning_duplicate_resolution_error_uk_multiple_files_multicol_index lightning_duplicate_resolution_incremental'
-	["G03"]='lightning_duplicate_resolution_replace_multiple_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_keys_nonclustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_nonclustered_pk lightning_duplicate_resolution_replace_one_key lightning_duplicate_resolution_replace_one_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_key_multiple_conflicts_nonclustered_pk'
+	["G03"]='lightning_duplicate_resolution_merge lightning_duplicate_resolution_replace_multiple_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_keys_nonclustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_clustered_pk lightning_duplicate_resolution_replace_multiple_unique_keys_nonclustered_pk lightning_duplicate_resolution_replace_one_key lightning_duplicate_resolution_replace_one_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_key_multiple_conflicts_nonclustered_pk'
 	["G04"]='lightning_duplicate_resolution_replace_one_unique_key_clustered_pk lightning_duplicate_resolution_replace_one_unique_key_multiple_conflicts_clustered_pk lightning_duplicate_resolution_replace_one_unique_key_multiple_conflicts_nonclustered_pk lightning_duplicate_resolution_replace_one_unique_key_nonclustered_varchar_pk lightning_error_summary lightning_examples lightning_exotic_filenames lightning_extend_routes'
 	["G05"]='lightning_fail_fast lightning_fail_fast_on_nonretry_err lightning_file_routing lightning_foreign_key lightning_gcs lightning_generated_columns lightning_ignore_columns lightning_import_compress lightning_incremental lightning_issue_282 lightning_issue_40657 lightning_issue_410 lightning_issue_519 lightning_local_backend lightning_max_incr'
 	["G06"]='lightning_max_random lightning_multi_valued_index lightning_new_collation lightning_no_schema lightning_parquet lightning_partition_incremental lightning_partitioned-table lightning_record_network lightning_reload_cert lightning_restore lightning_routes lightning_routes_panic lightning_row-format-v2 lightning_s3'
```