DM: Add foreign key causality for DM syncer. (#12414)#12552
DM: Add foreign key causality for DM syncer. (#12414)#12552ti-chi-bot[bot] merged 6 commits intopingcap:release-8.5from
Conversation
Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
|
@OliverS929 This PR has conflicts, I have hold it. |
|
@ti-chi-bot: ## If you want to know how to resolve it, please read the guide in TiDB Dev Guide. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the ti-community-infra/tichi repository. |
Summary of ChangesHello, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the DM syncer's capability to handle foreign key constraints, particularly in complex replication scenarios. It introduces a robust mechanism to track and enforce foreign key causality across related tables, ensuring data integrity even when multiple workers are processing changes concurrently. The changes also include stricter validation for DDL operations involving foreign keys and improved handling of safe-mode DMLs to prevent violations of referential integrity. This work is crucial for supporting more intricate database schemas and replication topologies. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Changelog
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces foreign key causality support for the DM syncer, which is a significant and valuable feature. The implementation appears well-thought-out, covering recursive causality chain building, pre-checks for unsupported configurations like table routing with multiple workers, and handling of filtered parent tables. The addition of comprehensive unit and integration tests is commendable and greatly increases confidence in the changes. However, there are several merge conflicts present in the code that must be resolved before this PR can be merged. These conflicts are critical as they prevent the code from compiling.
dm/syncer/dml_worker.go
Outdated
| <<<<<<< HEAD | ||
| ======= | ||
| func isForeignKeyChecksEnabled(session map[string]string) bool { | ||
| for key, value := range session { | ||
| if strings.EqualFold(key, "foreign_key_checks") { | ||
| trimmed := strings.Trim(value, " '\"") | ||
| return variable.TiDBOptOn(trimmed) | ||
| } | ||
| } | ||
| return false | ||
| } | ||
|
|
||
| func (w *DMLWorker) shouldDisableForeignKeyChecksForJob(j *job) bool { | ||
| if !w.foreignKeyChecksEnabled { | ||
| return false | ||
| } | ||
| if j == nil || j.tp != dml || !j.safeMode || j.dml == nil { | ||
| return false | ||
| } | ||
| switch j.dml.Type() { | ||
| case sqlmodel.RowChangeInsert, sqlmodel.RowChangeUpdate: | ||
| return true | ||
| default: | ||
| return false | ||
| } | ||
| } | ||
|
|
||
| func (w *DMLWorker) validateSafeModeForeignKeyUpdate(jobs []*job) error { | ||
| if !w.foreignKeyChecksEnabled { | ||
| return nil | ||
| } | ||
| for _, j := range jobs { | ||
| if j == nil || j.tp != dml || !j.safeMode || j.dml == nil { | ||
| continue | ||
| } | ||
| if j.dml.Type() != sqlmodel.RowChangeUpdate { | ||
| continue | ||
| } | ||
| if j.dml.IsPrimaryOrUniqueKeyUpdated() { | ||
| return terror.ErrSyncerUnitNotSupportedOperate.Generatef( | ||
| "safe-mode update with foreign_key_checks=1 and PK/UK changes is not supported; " + | ||
| "delete+replace may break foreign key constraints and cascade operations", | ||
| ) | ||
| } | ||
| } | ||
| return nil | ||
| } | ||
|
|
||
| >>>>>>> e709452702 (DM: Add foreign key causality for DM syncer. (#12414)) |
dm/syncer/dml_worker_test.go
Outdated
| <<<<<<< HEAD | ||
| ======= | ||
|
|
||
| func TestShouldDisableForeignKeyChecksForJob(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| worker := &DMLWorker{} | ||
|
|
||
| source := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
| target := &cdcmodel.TableName{Schema: "target", Table: "tb"} | ||
| tableInfo := mockTableInfo(t, "create table db.tb(id int primary key, name varchar(10))") | ||
|
|
||
| insertChange := sqlmodel.NewRowChange(source, target, nil, []interface{}{1, "v"}, tableInfo, nil, nil) | ||
| insertJob := &job{tp: dml, safeMode: true, dml: insertChange} | ||
| require.False(t, worker.shouldDisableForeignKeyChecksForJob(insertJob)) | ||
|
|
||
| worker.foreignKeyChecksEnabled = true | ||
| require.True(t, worker.shouldDisableForeignKeyChecksForJob(insertJob)) | ||
|
|
||
| updateChange := sqlmodel.NewRowChange(source, target, []interface{}{1, "a"}, []interface{}{1, "b"}, tableInfo, nil, nil) | ||
| updateJob := &job{tp: dml, safeMode: true, dml: updateChange} | ||
| require.True(t, worker.shouldDisableForeignKeyChecksForJob(updateJob)) | ||
|
|
||
| deleteChange := sqlmodel.NewRowChange(source, target, []interface{}{1, "a"}, nil, tableInfo, nil, nil) | ||
| deleteJob := &job{tp: dml, safeMode: true, dml: deleteChange} | ||
| require.False(t, worker.shouldDisableForeignKeyChecksForJob(deleteJob)) | ||
|
|
||
| insertJob.safeMode = false | ||
| require.False(t, worker.shouldDisableForeignKeyChecksForJob(insertJob)) | ||
| } | ||
|
|
||
| func TestIsForeignKeyChecksEnabled(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| cases := []struct { | ||
| name string | ||
| session map[string]string | ||
| expected bool | ||
| }{ | ||
| {name: "nil session", session: nil, expected: false}, | ||
| {name: "disabled", session: map[string]string{"foreign_key_checks": "0"}, expected: false}, | ||
| {name: "enabled numeric", session: map[string]string{"foreign_key_checks": "1"}, expected: true}, | ||
| {name: "enabled literal", session: map[string]string{"FOREIGN_KEY_CHECKS": "ON"}, expected: true}, | ||
| {name: "enabled quoted", session: map[string]string{"foreign_key_checks": "'1'"}, expected: true}, | ||
| {name: "other value", session: map[string]string{"foreign_key_checks": "off"}, expected: false}, | ||
| {name: "unrelated", session: map[string]string{"sql_mode": ""}, expected: false}, | ||
| } | ||
|
|
||
| for _, c := range cases { | ||
| c := c | ||
| t.Run(c.name, func(t *testing.T) { | ||
| t.Parallel() | ||
| require.Equal(t, c.expected, isForeignKeyChecksEnabled(c.session)) | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| func TestShouldDisableForeignKeyChecks(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| worker := &DMLWorker{foreignKeyChecksEnabled: true} | ||
| source := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
| target := &cdcmodel.TableName{Schema: "target", Table: "tbl"} | ||
| tableInfo := mockTableInfo(t, "create table db.tb(id int primary key, col1 int unique not null, col2 int unique, name varchar(24))") | ||
|
|
||
| insertChange := sqlmodel.NewRowChange(source, target, nil, []interface{}{1, 2, 3, "v"}, tableInfo, nil, nil) | ||
| insertJob := newDMLJob(insertChange, ecWithSafeMode) | ||
| require.True(t, worker.shouldDisableForeignKeyChecksForJob(insertJob)) | ||
|
|
||
| insertJob.safeMode = false | ||
| require.False(t, worker.shouldDisableForeignKeyChecksForJob(insertJob)) | ||
|
|
||
| updateChange := sqlmodel.NewRowChange(source, target, []interface{}{1, 2, 3, "v"}, []interface{}{1, 2, 3, "v2"}, tableInfo, nil, nil) | ||
| updateJob := newDMLJob(updateChange, ecWithSafeMode) | ||
| require.True(t, worker.shouldDisableForeignKeyChecksForJob(updateJob)) | ||
|
|
||
| deleteChange := sqlmodel.NewRowChange(source, target, []interface{}{1, 2, 3, "v"}, nil, tableInfo, nil, nil) | ||
| deleteJob := newDMLJob(deleteChange, ecWithSafeMode) | ||
| require.False(t, worker.shouldDisableForeignKeyChecksForJob(deleteJob)) | ||
|
|
||
| worker.foreignKeyChecksEnabled = false | ||
| anotherJob := newDMLJob(insertChange, ecWithSafeMode) | ||
| require.False(t, worker.shouldDisableForeignKeyChecksForJob(anotherJob)) | ||
| } | ||
|
|
||
| func TestValidateSafeModeForeignKeyUpdate(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| worker := &DMLWorker{foreignKeyChecksEnabled: true} | ||
| source := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
| target := &cdcmodel.TableName{Schema: "target", Table: "tb"} | ||
|
|
||
| // payload is UK, v is non-key | ||
| tableInfo := mockTableInfo(t, | ||
| "create table db.tb(id int primary key, payload varchar(10) unique, v varchar(10))", | ||
| ) | ||
|
|
||
| // PK change | ||
| pkUpdate := sqlmodel.NewRowChange( | ||
| source, target, | ||
| []interface{}{1, "a", "x"}, | ||
| []interface{}{2, "a", "x"}, | ||
| tableInfo, nil, nil, | ||
| ) | ||
|
|
||
| // UK change | ||
| ukUpdate := sqlmodel.NewRowChange( | ||
| source, target, | ||
| []interface{}{1, "a", "x"}, | ||
| []interface{}{1, "b", "x"}, | ||
| tableInfo, nil, nil, | ||
| ) | ||
|
|
||
| // Non-key change | ||
| nonKeyUpdate := sqlmodel.NewRowChange( | ||
| source, target, | ||
| []interface{}{1, "a", "x"}, | ||
| []interface{}{1, "a", "y"}, | ||
| tableInfo, nil, nil, | ||
| ) | ||
|
|
||
| t.Run("safe-mode pk update should error", func(t *testing.T) { | ||
| j := newDMLJob(pkUpdate, ecWithSafeMode) | ||
| err := worker.validateSafeModeForeignKeyUpdate([]*job{j}) | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), "PK/UK changes") | ||
| }) | ||
|
|
||
| t.Run("safe-mode uk update should error", func(t *testing.T) { | ||
| j := newDMLJob(ukUpdate, ecWithSafeMode) | ||
| err := worker.validateSafeModeForeignKeyUpdate([]*job{j}) | ||
| require.Error(t, err) | ||
| require.Contains(t, err.Error(), "PK/UK changes") | ||
| }) | ||
|
|
||
| t.Run("safe-mode non-key update should pass", func(t *testing.T) { | ||
| j := newDMLJob(nonKeyUpdate, ecWithSafeMode) | ||
| require.NoError(t, worker.validateSafeModeForeignKeyUpdate([]*job{j})) | ||
| }) | ||
|
|
||
| t.Run("non-safe-mode pk update should pass", func(t *testing.T) { | ||
| j := newDMLJob(pkUpdate, ec) | ||
| require.NoError(t, worker.validateSafeModeForeignKeyUpdate([]*job{j})) | ||
| }) | ||
| } | ||
|
|
||
| func TestExecuteBatchJobsWithForeignKey(t *testing.T) { | ||
| t.Parallel() | ||
|
|
||
| // helper: convert []interface{} -> []driver.Value | ||
| toDriverValues := func(args []interface{}) []driver.Value { | ||
| out := make([]driver.Value, len(args)) | ||
| for i, v := range args { | ||
| out[i] = v | ||
| } | ||
| return out | ||
| } | ||
|
|
||
| db, mock, err := sqlmock.New() | ||
| require.NoError(t, err) | ||
| defer db.Close() | ||
|
|
||
| sqlConn, err := db.Conn(context.Background()) | ||
| require.NoError(t, err) | ||
| defer sqlConn.Close() | ||
|
|
||
| baseConn := connpkg.NewBaseConnForTest(sqlConn, nil) | ||
| cfg := &config.SubTaskConfig{Name: "test"} | ||
| dbConn := dbconn.NewDBConn(cfg, baseConn) | ||
|
|
||
| worker := &DMLWorker{ | ||
| toDBConns: []*dbconn.DBConn{dbConn}, | ||
| syncCtx: tcontext.Background(), | ||
| successFunc: func(int, int, []*job) {}, | ||
| fatalFunc: func(*job, error) {}, | ||
| metricProxies: nil, | ||
| foreignKeyChecksEnabled: true, | ||
| logger: log.L(), | ||
| } | ||
|
|
||
| source := &cdcmodel.TableName{Schema: "db", Table: "tb"} | ||
| target := &cdcmodel.TableName{Schema: "targetSchema", Table: "targetTable"} | ||
| createSQL := "create table db.tb(id int primary key, col1 int unique not null, col2 int unique, name varchar(24))" | ||
| tableInfo := mockTableInfo(t, createSQL) | ||
|
|
||
| insertChange := sqlmodel.NewRowChange(source, target, nil, []interface{}{1, 2, 3, "normal"}, tableInfo, nil, nil) | ||
| replaceChange := sqlmodel.NewRowChange(source, target, nil, []interface{}{2, 3, 4, "safe"}, tableInfo, nil, nil) | ||
|
|
||
| insertJob := newDMLJob(insertChange, ec) | ||
| replaceJob := newDMLJob(replaceChange, ecWithSafeMode) | ||
|
|
||
| insertQueries, insertArgs := worker.genSQLs([]*job{insertJob}) | ||
| replaceQueries, replaceArgs := worker.genSQLs([]*job{replaceJob}) | ||
|
|
||
| require.Len(t, insertQueries, 1) | ||
| require.Len(t, replaceQueries, 1) | ||
|
|
||
| // Normal insert should execute directly | ||
| mock.ExpectBegin() | ||
| mock.ExpectExec(regexp.QuoteMeta(insertQueries[0])). | ||
| WithArgs(toDriverValues(insertArgs[0])...). | ||
| WillReturnResult(sqlmock.NewResult(0, 1)) | ||
| mock.ExpectCommit() | ||
|
|
||
| // Safe-mode REPLACE should trigger disable + REPLACE + restore | ||
| // disable FKC | ||
| mock.ExpectBegin() | ||
| mock.ExpectExec(regexp.QuoteMeta("SET SESSION foreign_key_checks=0")). | ||
| WillReturnResult(sqlmock.NewResult(0, 0)) | ||
| mock.ExpectCommit() | ||
|
|
||
| // REPLACE job | ||
| mock.ExpectBegin() | ||
| mock.ExpectExec(regexp.QuoteMeta(replaceQueries[0])). | ||
| WithArgs(toDriverValues(replaceArgs[0])...). | ||
| WillReturnResult(sqlmock.NewResult(0, 1)) | ||
| mock.ExpectCommit() | ||
|
|
||
| // restore FKC | ||
| mock.ExpectBegin() | ||
| mock.ExpectExec(regexp.QuoteMeta("SET SESSION foreign_key_checks=1")). | ||
| WillReturnResult(sqlmock.NewResult(0, 0)) | ||
| mock.ExpectCommit() | ||
|
|
||
| // Run two jobs separately: first insert, then REPLACE | ||
| worker.executeBatchJobs(0, []*job{insertJob}, false) | ||
| worker.executeBatchJobs(0, []*job{replaceJob}, true) | ||
|
|
||
| require.NoError(t, mock.ExpectationsWereMet()) | ||
| } | ||
| >>>>>>> e709452702 (DM: Add foreign key causality for DM syncer. (#12414)) |
dm/tests/run_group.sh
Outdated
| <<<<<<< HEAD | ||
| "relay_interrupt safe_mode sequence_safe_mode lightning_load_task lightning_mode metrics" | ||
| ======= | ||
| "relay_interrupt safe_mode safe_mode_foreign_key foreign_key_multi_worker foreign_key_route_single_worker foreign_key_schema_repair sequence_safe_mode lightning_load_task lightning_mode metrics" | ||
| >>>>>>> e709452702 (DM: Add foreign key causality for DM syncer. (#12414)) |
|
/test pull-dm-compatibility-test |
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: Benjamin2037 The full list of commands accepted by this bot can be found here. The pull request process is described here |
Codecov Report❌ Patch coverage is Additional details and impacted files
Flags with carried forward coverage won't be shown. Click here to find out more. @@ Coverage Diff @@
## release-8.5 #12552 +/- ##
================================================
Coverage ? 53.3975%
================================================
Files ? 1026
Lines ? 137836
Branches ? 0
================================================
Hits ? 73601
Misses ? 58731
Partials ? 5504 🚀 New features to boost your workflow:
|
|
/retest |
|
/unhold |
|
/retest |
|
/retest |
2 similar comments
|
/retest |
|
/retest |
This is an automated cherry-pick of #12414
What problem does this PR solve?
Issue Number: ref #12350
What is changed and how it works?
This PR introduces foreign key-causality support to the DM syncer.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note