-
Notifications
You must be signed in to change notification settings - Fork 50
logservice,event: stop capturing tidb_ddl_history (#4766) #4905
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: release-8.5
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -32,17 +32,13 @@ import ( | |||||
| "go.uber.org/zap" | ||||||
| ) | ||||||
|
|
||||||
| // DDLTableInfo contains the tableInfo about tidb_ddl_job and tidb_ddl_history | ||||||
| // and the column id of `job_meta` in these two tables. | ||||||
| // DDLTableInfo contains the tableInfo about tidb_ddl_job and the column id of | ||||||
| // `job_meta` in that table. | ||||||
| type DDLTableInfo struct { | ||||||
| // ddlJobsTable use to parse all ddl jobs except `create table` | ||||||
| // DDLJobTable is used to parse DDL jobs from tidb_ddl_job. | ||||||
| DDLJobTable *common.TableInfo | ||||||
| // It holds the column id of `job_meta` in table `tidb_ddl_jobs`. | ||||||
| JobMetaColumnIDinJobTable int64 | ||||||
| // ddlHistoryTable only use to parse `create table` ddl job | ||||||
| DDLHistoryTable *common.TableInfo | ||||||
| // It holds the column id of `job_meta` in table `tidb_ddl_history`. | ||||||
| JobMetaColumnIDinHistoryTable int64 | ||||||
| } | ||||||
|
|
||||||
| // Mounter is used to parse SQL events from KV events | ||||||
|
|
@@ -183,11 +179,7 @@ func ParseDDLJob(rawKV *common.RawKVEntry, ddlTableInfo *DDLTableInfo) (*model.J | |||||
| // for test case only | ||||||
| if bytes.HasPrefix(rawKV.Key, metaPrefix) { | ||||||
| v = rawKV.Value | ||||||
| job, err := parseJob(v, rawKV.StartTs, rawKV.CRTs, false) | ||||||
| if err != nil || job == nil { | ||||||
| job, err = parseJob(v, rawKV.StartTs, rawKV.CRTs, true) | ||||||
| } | ||||||
| return job, err | ||||||
| return parseJob(v, rawKV.StartTs, rawKV.CRTs) | ||||||
| } | ||||||
|
|
||||||
| rawKV.Key = RemoveKeyspacePrefix(rawKV.Key) | ||||||
|
|
@@ -207,55 +199,26 @@ func ParseDDLJob(rawKV *common.RawKVEntry, ddlTableInfo *DDLTableInfo) (*model.J | |||||
| datum = row[ddlTableInfo.JobMetaColumnIDinJobTable] | ||||||
| v = datum.GetBytes() | ||||||
|
|
||||||
| return parseJob(v, rawKV.StartTs, rawKV.CRTs, false) | ||||||
| } else if tableID == common.JobHistoryID { | ||||||
| // parse it with tidb_ddl_history | ||||||
| row, err := decodeRow(rawKV.Value, recordID, ddlTableInfo.DDLHistoryTable, time.UTC) | ||||||
| if err != nil { | ||||||
| return nil, errors.Trace(err) | ||||||
| } | ||||||
| datum = row[ddlTableInfo.JobMetaColumnIDinHistoryTable] | ||||||
| v = datum.GetBytes() | ||||||
| return parseJob(v, rawKV.StartTs, rawKV.CRTs, true) | ||||||
| return parseJob(v, rawKV.StartTs, rawKV.CRTs) | ||||||
| } | ||||||
|
|
||||||
| return nil, fmt.Errorf("invalid tableID %v in rawKV.Key", tableID) | ||||||
| } | ||||||
|
|
||||||
| // parseJob unmarshal the job from "v". | ||||||
| // fromHistoryTable is used to distinguish the job is from tidb_dd_job or tidb_ddl_history | ||||||
| // We need to be compatible with the two modes, enable_fast_create_table=on and enable_fast_create_table=off | ||||||
| // When enable_fast_create_table=on, `create table` will only be inserted into tidb_ddl_history after being executed successfully. | ||||||
| // When enable_fast_create_table=off, `create table` just like other ddls will be firstly inserted to tidb_ddl_job, | ||||||
| // and being inserted into tidb_ddl_history after being executed successfully. | ||||||
| // In both two modes, other ddls are all firstly inserted into tidb_ddl_job, and then inserted into tidb_ddl_history after being executed successfully. | ||||||
| // | ||||||
| // To be compatible with these two modes, we will get `create table` ddl from tidb_ddl_history, and all ddls from tidb_ddl_job. | ||||||
| // When enable_fast_create_table=off, for each `create table` ddl we will get twice(once from tidb_ddl_history, once from tidb_ddl_job) | ||||||
| // Because in `handleJob` we will skip the repeated ddls, thus it's ok for us to get `create table` twice. | ||||||
| // Besides, the `create table` from tidb_ddl_job always have a earlier commitTs than from tidb_ddl_history. | ||||||
| // Therefore, we always use the commitTs of ddl from `tidb_ddl_job` as StartTs, which ensures we can get all the dmls. | ||||||
| func parseJob(v []byte, startTs, CRTs uint64, fromHistoryTable bool) (*model.Job, error) { | ||||||
| // parseJob unmarshals a job from tidb_ddl_job. | ||||||
| // TiCDC now relies on TiDB's normalized DDL lifecycle, so only jobs that have | ||||||
| // already reached JobStateDone are replayed into the schema store. | ||||||
| func parseJob(v []byte, startTs, CRTs uint64) (*model.Job, error) { | ||||||
| var job model.Job | ||||||
| err := json.Unmarshal(v, &job) | ||||||
| if err != nil { | ||||||
| return nil, errors.Trace(err) | ||||||
| } | ||||||
|
|
||||||
| if fromHistoryTable { | ||||||
| // we only want to get `create table` and `create tables` ddl from tidb_ddl_history, so we just throw out others ddls. | ||||||
| // We only want the job with `JobStateSynced`, which is means the ddl job is done successfully. | ||||||
| // Besides, to satisfy the subsequent processing, | ||||||
| // We need to set the job to be Done to make it will replay in schemaStorage | ||||||
| if (job.Type != model.ActionCreateTable && job.Type != model.ActionCreateTables) || job.State != model.JobStateSynced { | ||||||
| return nil, nil | ||||||
| } | ||||||
| job.State = model.JobStateDone | ||||||
| } else { | ||||||
| // we need to get all ddl job which is done from tidb_ddl_job | ||||||
| if !job.IsDone() { | ||||||
| return nil, nil | ||||||
| } | ||||||
| // We only replay DDL jobs that are already visible in the normalized Done | ||||||
| // state so schema store ordering stays aligned with the job table stream. | ||||||
| if !job.IsDone() { | ||||||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The PR description and the added test case
Suggested change
|
||||||
| return nil, nil | ||||||
| } | ||||||
|
|
||||||
| // FinishedTS is only set when the job is synced, | ||||||
|
|
||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,9 +32,13 @@ | |
|
|
||
| const ( | ||
| // JobTableID is the id of `tidb_ddl_job`. | ||
| <<<<<<< HEAD | ||
| JobTableID = ddl.JobTableID | ||
| // JobHistoryID is the id of `tidb_ddl_history` | ||
| JobHistoryID = ddl.HistoryTableID | ||
| ======= | ||
| JobTableID = metadef.TiDBDDLJobTableID | ||
| >>>>>>> 895bb89bc (logservice,event: stop capturing tidb_ddl_history (#4766)) | ||
|
Check failure on line 41 in pkg/common/span_op.go
|
||
|
Comment on lines
+35
to
+41
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The code contains git merge conflict markers. Additionally, JobTableID = ddl.JobTableID |
||
| ) | ||
|
|
||
| // TableIDToComparableSpan converts a TableID to a Span whose | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code contains git merge conflict markers. These should be resolved before merging. Since
common.JobTableIDis now used ingetAllDDLSpan, these local constants are redundant and can be removed.