From 6248b922dd6d14c8548ebbfaa56ebd7bf079a87d Mon Sep 17 00:00:00 2001 From: wlwilliamx <53336371+wlwilliamx@users.noreply.github.com> Date: Fri, 17 Apr 2026 18:13:03 +0800 Subject: [PATCH] This is an automated cherry-pick of #4766 Signed-off-by: ti-chi-bot --- logservice/schemastore/ddl_job_fetcher.go | 41 +++------- .../schemastore/ddl_job_fetcher_test.go | 18 +++++ pkg/common/event/mounter.go | 63 +++------------ pkg/common/event/mounter_test.go | 79 +++++++++++++++++++ pkg/common/span_op.go | 4 + 5 files changed, 123 insertions(+), 82 deletions(-) diff --git a/logservice/schemastore/ddl_job_fetcher.go b/logservice/schemastore/ddl_job_fetcher.go index 3d8f2657f8..eddb06987a 100644 --- a/logservice/schemastore/ddl_job_fetcher.go +++ b/logservice/schemastore/ddl_job_fetcher.go @@ -206,20 +206,6 @@ func (p *ddlJobFetcher) initDDLTableInfo(ctx context.Context, kvStorage kv.Stora p.ddlTableInfo.DDLJobTable = common.WrapTableInfo(db.Name.L, tableInfo) p.ddlTableInfo.JobMetaColumnIDinJobTable = col.ID - // for tidb_ddl_history - historyTableInfo, err := findTableByName(tbls, "tidb_ddl_history") - if err != nil { - return errors.Trace(err) - } - - historyTableCol, err := findColumnByName(historyTableInfo.Columns, "job_meta") - if err != nil { - return errors.Trace(err) - } - - p.ddlTableInfo.DDLHistoryTable = common.WrapTableInfo(db.Name.L, historyTableInfo) - p.ddlTableInfo.JobMetaColumnIDinHistoryTable = historyTableCol.ID - return nil } @@ -257,6 +243,7 @@ func findColumnByName(cols []*model.ColumnInfo, name string) (*model.ColumnInfo, errors.Errorf("can't find column %s", name)) } +<<<<<<< HEAD const ( // JobTableID is the id of `tidb_ddl_job`. JobTableID = ddl.JobTableID @@ -264,32 +251,22 @@ const ( JobHistoryID = ddl.HistoryTableID ) +======= +>>>>>>> 895bb89bc (logservice,event: stop capturing tidb_ddl_history (#4766)) func getAllDDLSpan(keyspaceID uint32) ([]heartbeatpb.TableSpan, error) { - spans := make([]heartbeatpb.TableSpan, 0, 2) - - start, end, err := common.GetKeyspaceTableRange(keyspaceID, JobTableID) + // TiDB v8.3+ emits create-table jobs through tidb_ddl_job again, so the + // schema store only needs to subscribe to the job table. + start, end, err := common.GetKeyspaceTableRange(keyspaceID, common.JobTableID) if err != nil { return nil, err } - spans = append(spans, heartbeatpb.TableSpan{ - TableID: JobTableID, - StartKey: common.ToComparableKey(start), - EndKey: common.ToComparableKey(end), - KeyspaceID: keyspaceID, - }) - - start, end, err = common.GetKeyspaceTableRange(keyspaceID, JobHistoryID) - if err != nil { - return nil, err - } - spans = append(spans, heartbeatpb.TableSpan{ - TableID: JobHistoryID, + return []heartbeatpb.TableSpan{{ + TableID: common.JobTableID, StartKey: common.ToComparableKey(start), EndKey: common.ToComparableKey(end), KeyspaceID: keyspaceID, - }) - return spans, nil + }}, nil } type resolvedTsItem struct { diff --git a/logservice/schemastore/ddl_job_fetcher_test.go b/logservice/schemastore/ddl_job_fetcher_test.go index 3a11c268b7..b835e121b2 100644 --- a/logservice/schemastore/ddl_job_fetcher_test.go +++ b/logservice/schemastore/ddl_job_fetcher_test.go @@ -18,6 +18,8 @@ import ( "time" "github.com/pingcap/ticdc/logservice/logpuller" + "github.com/pingcap/ticdc/pkg/common" + "github.com/pingcap/ticdc/pkg/config/kerneltype" "github.com/pingcap/ticdc/utils/heap" "github.com/stretchr/testify/require" ) @@ -107,3 +109,19 @@ func TestAdvanceSchemaStoreResolvedTs(t *testing.T) { } } } + +func TestGetAllDDLSpan(t *testing.T) { + // Scenario: TiCDC now watches only tidb_ddl_job after TiDB normalized + // create-table DDL delivery back onto the job table. Next Gen requires a + // non-default keyspace ID, while Classic keeps using the default keyspace. + // Steps: build the watched spans for a keyspace valid in the active kernel + // and verify there is exactly one subscription span for tidb_ddl_job. + keyspaceID := common.DefaultKeyspaceID + if kerneltype.IsNextGen() { + keyspaceID = 1 + } + spans, err := getAllDDLSpan(keyspaceID) + require.NoError(t, err) + require.Len(t, spans, 1) + require.Equal(t, common.JobTableID, spans[0].TableID) +} diff --git a/pkg/common/event/mounter.go b/pkg/common/event/mounter.go index 8a16b2e446..8def99b22c 100644 --- a/pkg/common/event/mounter.go +++ b/pkg/common/event/mounter.go @@ -32,17 +32,13 @@ import ( "go.uber.org/zap" ) -// DDLTableInfo contains the tableInfo about tidb_ddl_job and tidb_ddl_history -// and the column id of `job_meta` in these two tables. +// DDLTableInfo contains the tableInfo about tidb_ddl_job and the column id of +// `job_meta` in that table. type DDLTableInfo struct { - // ddlJobsTable use to parse all ddl jobs except `create table` + // DDLJobTable is used to parse DDL jobs from tidb_ddl_job. DDLJobTable *common.TableInfo // It holds the column id of `job_meta` in table `tidb_ddl_jobs`. JobMetaColumnIDinJobTable int64 - // ddlHistoryTable only use to parse `create table` ddl job - DDLHistoryTable *common.TableInfo - // It holds the column id of `job_meta` in table `tidb_ddl_history`. - JobMetaColumnIDinHistoryTable int64 } // Mounter is used to parse SQL events from KV events @@ -183,11 +179,7 @@ func ParseDDLJob(rawKV *common.RawKVEntry, ddlTableInfo *DDLTableInfo) (*model.J // for test case only if bytes.HasPrefix(rawKV.Key, metaPrefix) { v = rawKV.Value - job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false) - if err != nil || job == nil { - job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true) - } - return job, err + return parseJob(v, rawKV.StartTs, rawKV.CRTs) } rawKV.Key = RemoveKeyspacePrefix(rawKV.Key) @@ -207,55 +199,26 @@ func ParseDDLJob(rawKV *common.RawKVEntry, ddlTableInfo *DDLTableInfo) (*model.J datum = row[ddlTableInfo.JobMetaColumnIDinJobTable] v = datum.GetBytes() - return parseJob(v, rawKV.StartTs, rawKV.CRTs, false) - } else if tableID == common.JobHistoryID { - // parse it with tidb_ddl_history - row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC) - if err != nil { - return nil, errors.Trace(err) - } - datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable] - v = datum.GetBytes() - return parseJob(v, rawKV.StartTs, rawKV.CRTs, true) + return parseJob(v, rawKV.StartTs, rawKV.CRTs) } return nil, fmt.Errorf("invalid tableID %v in rawKV.Key", tableID) } -// parseJob unmarshal the job from "v". -// fromHistoryTable is used to distinguish the job is from tidb_dd_job or tidb_ddl_history -// We need to be compatible with the two modes, enable_fast_create_table=on and enable_fast_create_table=off -// When enable_fast_create_table=on, `create table` will only be inserted into tidb_ddl_history after being executed successfully. -// When enable_fast_create_table=off, `create table` just like other ddls will be firstly inserted to tidb_ddl_job, -// and being inserted into tidb_ddl_history after being executed successfully. -// In both two modes, other ddls are all firstly inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after being executed successfully. -// -// To be compatible with these two modes, we will get `create table` ddl from tidb_ddl_history, and all ddls from tidb_ddl_job. -// When enable_fast_create_table=off, for each `create table` ddl we will get twice(once from tidb_ddl_history, once from tidb_ddl_job) -// Because in `handleJob` we will skip the repeated ddls, thus it's ok for us to get `create table` twice. -// Besides, the `create table` from tidb_ddl_job always have a earlier commitTs than from tidb_ddl_history. -// Therefore, we always use the commitTs of ddl from `tidb_ddl_job` as StartTs, which ensures we can get all the dmls. -func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*model.Job, error) { +// parseJob unmarshals a job from tidb_ddl_job. +// TiCDC now relies on TiDB's normalized DDL lifecycle, so only jobs that have +// already reached JobStateDone are replayed into the schema store. +func parseJob(v []byte, startTs, CRTs uint64) (*model.Job, error) { var job model.Job err := json.Unmarshal(v, &job) if err != nil { return nil, errors.Trace(err) } - if fromHistoryTable { - // we only want to get `create table` and `create tables` ddl from tidb_ddl_history, so we just throw out others ddls. - // We only want the job with `JobStateSynced`, which is means the ddl job is done successfully. - // Besides, to satisfy the subsequent processing, - // We need to set the job to be Done to make it will replay in schemaStorage - if (job.Type != model.ActionCreateTable && job.Type != model.ActionCreateTables) || job.State != model.JobStateSynced { - return nil, nil - } - job.State = model.JobStateDone - } else { - // we need to get all ddl job which is done from tidb_ddl_job - if !job.IsDone() { - return nil, nil - } + // We only replay DDL jobs that are already visible in the normalized Done + // state so schema store ordering stays aligned with the job table stream. + if !job.IsDone() { + return nil, nil } // FinishedTS is only set when the job is synced, diff --git a/pkg/common/event/mounter_test.go b/pkg/common/event/mounter_test.go index d3cac6d583..44f2b294c1 100644 --- a/pkg/common/event/mounter_test.go +++ b/pkg/common/event/mounter_test.go @@ -14,6 +14,7 @@ package event import ( + "encoding/json" "testing" "time" @@ -27,6 +28,84 @@ import ( "github.com/stretchr/testify/require" ) +func TestParseJobFromDDLJob(t *testing.T) { + t.Run("accepts done create table jobs from tidb_ddl_job", func(t *testing.T) { + // Scenario: TiDB writes create-table jobs through tidb_ddl_job in the + // normalized Done state. + // Steps: marshal a Done create-table job, parse it, and verify the parser + // keeps the job while filling StartTS and FinishedTS from the KV metadata. + raw, err := json.Marshal(&timodel.Job{ + Type: timodel.ActionCreateTable, + State: timodel.JobStateDone, + BinlogInfo: &timodel.HistoryInfo{}, + }) + require.NoError(t, err) + + job, err := parseJob(raw, 101, 202) + require.NoError(t, err) + require.NotNil(t, job) + require.Equal(t, uint64(101), job.StartTS) + require.Equal(t, uint64(202), job.BinlogInfo.FinishedTS) + }) + + t.Run("accepts done batch create table jobs from tidb_ddl_job", func(t *testing.T) { + // Scenario: batch create tables now follows the same tidb_ddl_job Done + // lifecycle as other DDLs. + // Steps: marshal a Done create-tables job, parse it, and verify the job + // is kept with the KV metadata mapped onto StartTS and FinishedTS. + raw, err := json.Marshal(&timodel.Job{ + Type: timodel.ActionCreateTables, + State: timodel.JobStateDone, + BinlogInfo: &timodel.HistoryInfo{}, + }) + require.NoError(t, err) + + job, err := parseJob(raw, 303, 404) + require.NoError(t, err) + require.NotNil(t, job) + require.Equal(t, timodel.ActionCreateTables, job.Type) + require.Equal(t, uint64(303), job.StartTS) + require.Equal(t, uint64(404), job.BinlogInfo.FinishedTS) + }) + + t.Run("keeps non create table done jobs", func(t *testing.T) { + // Scenario: removing tidb_ddl_history support must not filter unrelated + // DDLs that already come from tidb_ddl_job in Done state. + // Steps: marshal a Done non-create-table job, parse it, and verify it is + // still accepted. + raw, err := json.Marshal(&timodel.Job{ + Type: timodel.ActionDropTable, + State: timodel.JobStateDone, + BinlogInfo: &timodel.HistoryInfo{}, + }) + require.NoError(t, err) + + job, err := parseJob(raw, 505, 606) + require.NoError(t, err) + require.NotNil(t, job) + require.Equal(t, timodel.ActionDropTable, job.Type) + require.Equal(t, uint64(505), job.StartTS) + require.Equal(t, uint64(606), job.BinlogInfo.FinishedTS) + }) + + t.Run("ignores synced create table compatibility jobs", func(t *testing.T) { + // Scenario: TiCDC no longer consumes tidb_ddl_history, so synced-only + // create-table jobs must not be replayed into schema storage. + // Steps: marshal a Synced create-table job, parse it, and verify the + // parser drops it instead of rewriting it to Done. + raw, err := json.Marshal(&timodel.Job{ + Type: timodel.ActionCreateTable, + State: timodel.JobStateSynced, + BinlogInfo: &timodel.HistoryInfo{}, + }) + require.NoError(t, err) + + job, err := parseJob(raw, 101, 202) + require.NoError(t, err) + require.Nil(t, job) + }) +} + var createTableSQL = `create table t ( id int primary key auto_increment, diff --git a/pkg/common/span_op.go b/pkg/common/span_op.go index 717757e47a..7f2d66be00 100644 --- a/pkg/common/span_op.go +++ b/pkg/common/span_op.go @@ -32,9 +32,13 @@ import ( const ( // JobTableID is the id of `tidb_ddl_job`. +<<<<<<< HEAD JobTableID = ddl.JobTableID // JobHistoryID is the id of `tidb_ddl_history` JobHistoryID = ddl.HistoryTableID +======= + JobTableID = metadef.TiDBDDLJobTableID +>>>>>>> 895bb89bc (logservice,event: stop capturing tidb_ddl_history (#4766)) ) // TableIDToComparableSpan converts a TableID to a Span whose