cmd: isolate DDL stale drop keys in consumers #4455
wlwilliamx wants to merge 8 commits into pingcap:master
[APPROVALNOTIFIER] This PR is NOT APPROVED.
📝 Walkthrough
Introduces per-DDL-object ordering keys and replaces per-table commit-ts tracking with a keyed map. DDLs are pre-sorted by commit-ts, and certain independent DDLs may bypass the watermark. The append/write logic now drops only truly stale DDLs, using the new ordering keys.
Sequence Diagram(s)
sequenceDiagram
participant Producer
participant Writer
participant Buffer as "DDL Buffer"
participant Sorter
participant Watermark as "Watermark Checker"
participant Sink
Producer->>Writer: appendDDL(ddl)
Writer->>Writer: compute orderKeys (getDDLOrderKeys)
alt commitTs >= maxCommitTs[any orderKey]
Writer->>Buffer: enqueue DDL
else
Writer-->>Producer: drop stale DDL
end
Writer->>Sorter: Write() -> fetch buffered DDLs
Sorter->>Sorter: sort.SliceStable by commitTs
loop for each DDL in sorted order
Sorter->>Watermark: isBypass(ddl)? / watermark >= commitTs?
alt bypassWatermark
Watermark->>Sink: flush DDL immediately
else if watermark >= commitTs
Watermark->>Sink: flush DDL
else
Watermark->>Writer: defer remaining DDLs (break)
end
end
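The flush loop in the diagram can be sketched roughly as follows. This is a minimal illustration, not the PR's code: `bufferedDDL`, `flushReady`, and the precomputed `bypassWatermark` field are hypothetical names, and the sink is reduced to a slice of flushed queries.

```go
package main

import (
	"fmt"
	"sort"
)

// bufferedDDL is a hypothetical stand-in for a buffered DDL event.
type bufferedDDL struct {
	query           string
	commitTs        uint64
	bypassWatermark bool // assumed to be precomputed for independent DDLs
}

// flushReady stably sorts the buffered DDLs by commitTs, then returns the
// queries that may be flushed now and the DDLs that must stay buffered.
func flushReady(buffered []bufferedDDL, watermark uint64) (flushed []string, remaining []bufferedDDL) {
	sort.SliceStable(buffered, func(i, j int) bool {
		return buffered[i].commitTs < buffered[j].commitTs
	})
	for i, ddl := range buffered {
		if !ddl.bypassWatermark && watermark < ddl.commitTs {
			// The first blocked DDL defers itself and everything after it.
			return flushed, buffered[i:]
		}
		flushed = append(flushed, ddl.query)
	}
	return flushed, nil
}

func main() {
	buffered := []bufferedDDL{
		{query: "ALTER TABLE a.t ADD COLUMN c INT", commitTs: 30},
		{query: "CREATE TABLE b.t (id INT)", commitTs: 20, bypassWatermark: true},
		{query: "CREATE SCHEMA a", commitTs: 10, bypassWatermark: true},
	}
	flushed, remaining := flushReady(buffered, 15)
	fmt.Println(flushed, len(remaining))
}
```

With a watermark of 15, the two bypassing DDLs are flushed in commitTs order and the `ALTER TABLE` stays buffered until the watermark catches up.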
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed (1 warning)
Summary of Changes
This pull request refines the DDL (Data Definition Language) processing within the Kafka and Pulsar consumers to prevent the incorrect dropping of out-of-order DDLs.
Code Review
This pull request effectively addresses a bug in MQ consumers where out-of-order DDLs could be incorrectly dropped. The introduction of ddlOrderKey to distinguish between flush coordination and stale-drop logic is a solid approach. The changes are well-commented, and the addition of comprehensive regression tests is commendable. My review includes suggestions to address significant code duplication between the Kafka and Pulsar consumers, and a potential issue in the new watermark bypass logic.
bypassWatermark = blockedTables != nil &&
    blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
    len(blockedTables.TableIDs) == 1 &&
    blockedTables.TableIDs[0] == commonType.DDLSpanTableID &&
    len(todoDDL.GetBlockedTableNames()) == 0
The logic to bypass the watermark for an independent CREATE TABLE DDL seems to handle only the case where TableIDs contains just DDLSpanTableID. However, as shown in TestWriterAppendDDL_keepsCrossObjectCreateDDLsWithEmptyBlockedTableIDs, an independent CREATE TABLE can also have an empty TableIDs. In that scenario, this logic will not bypass the watermark, which could lead to the same deadlock issue in integration tests that this change aims to solve.
Consider adjusting the condition to also handle an empty TableIDs slice for independent CREATE TABLE DDLs.
- bypassWatermark = blockedTables != nil &&
-     blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
-     len(blockedTables.TableIDs) == 1 &&
-     blockedTables.TableIDs[0] == commonType.DDLSpanTableID &&
-     len(todoDDL.GetBlockedTableNames()) == 0
+ bypassWatermark = blockedTables != nil &&
+     blockedTables.InfluenceType == commonEvent.InfluenceTypeNormal &&
+     (len(blockedTables.TableIDs) == 0 ||
+         (len(blockedTables.TableIDs) == 1 && blockedTables.TableIDs[0] == commonType.DDLSpanTableID)) &&
+     len(todoDDL.GetBlockedTableNames()) == 0
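The widened condition may be easier to check as a standalone predicate. The sketch below assumes simplified stand-ins for the real types: `blockedTables`, `influenceTypeNormal`, and `ddlSpanTableID` are hypothetical, and the constant values are placeholders rather than the values used in the repository.

```go
package main

import "fmt"

// Placeholders for commonEvent.InfluenceTypeNormal and commonType.DDLSpanTableID.
// The concrete values are chosen for illustration only.
const (
	influenceTypeNormal       = 0
	ddlSpanTableID      int64 = 0
)

// blockedTables is a simplified, hypothetical view of the blocked-table info.
type blockedTables struct {
	influenceType int
	tableIDs      []int64
}

// canBypassWatermark reports whether a DDL depends on no real table, so that
// waiting for the watermark is unnecessary. It accepts both an empty TableIDs
// slice and a slice holding only the DDL span barrier.
func canBypassWatermark(bt *blockedTables, blockedTableNames int) bool {
	if bt == nil || bt.influenceType != influenceTypeNormal || blockedTableNames != 0 {
		return false
	}
	switch len(bt.tableIDs) {
	case 0:
		return true
	case 1:
		return bt.tableIDs[0] == ddlSpanTableID
	default:
		return false
	}
}

func main() {
	fmt.Println(canBypassWatermark(&blockedTables{influenceType: influenceTypeNormal}, 0))
	fmt.Println(canBypassWatermark(&blockedTables{influenceType: influenceTypeNormal, tableIDs: []int64{42}}, 0))
}
```

Pulling the check into one function would also give the Kafka and Pulsar writers a single place to unit-test the empty-`TableIDs` case.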
type ddlOrderKeyKind uint8

const (
    ddlOrderKeyKindGlobal ddlOrderKeyKind = iota
    ddlOrderKeyKindTable
    ddlOrderKeyKindSchema
    ddlOrderKeyKindSchemaTable
)

// ddlOrderKey identifies which logical object a DDL ordering guarantee applies to.
//
// It is intentionally different from the flush barrier set returned by getBlockTableIDs():
// DDLSpanTableID is a shared coordination barrier, not a real object identity. Reusing it as the
// stale-drop key causes unrelated CREATE SCHEMA / independent CREATE TABLE DDLs to suppress each other.
type ddlOrderKey struct {
    kind   ddlOrderKeyKind
    id     int64
    schema string
    table  string
}
This new DDL ordering logic, including ddlOrderKeyKind, ddlOrderKey, and the helper functions newDDLOrderKeyForSchema, newDDLOrderKeyForTable, and getDDLOrderKeys, is also present in cmd/pulsar-consumer/writer.go. To improve maintainability and avoid code duplication, consider extracting this shared logic into a common package, for example, in pkg/cmd/util or a new dedicated package. This would ensure that any future changes to this logic only need to be made in one place.
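To make the reasoning in the `ddlOrderKey` doc comment concrete, here is a minimal sketch of why the shared barrier is a poor stale-drop key. `orderKey`, `tracker`, and `accept` are hypothetical simplifications of the PR's types, and the `ddlSpanTableID` value is a placeholder.

```go
package main

import "fmt"

type orderKeyKind uint8

const (
	kindTable orderKeyKind = iota
	kindSchema
)

// orderKey is a simplified, hypothetical version of the PR's ddlOrderKey.
// All fields are comparable, so the struct can be used directly as a map key.
type orderKey struct {
	kind   orderKeyKind
	id     int64
	schema string
}

// ddlSpanTableID is a placeholder for commonType.DDLSpanTableID.
const ddlSpanTableID int64 = 0

// tracker records the largest commitTs seen per key.
type tracker map[orderKey]uint64

// accept reports whether a DDL with commitTs is not stale for key,
// and records its commitTs when it is accepted.
func (t tracker) accept(key orderKey, commitTs uint64) bool {
	if commitTs < t[key] {
		return false
	}
	t[key] = commitTs
	return true
}

func main() {
	// Keyed by the shared barrier: CREATE SCHEMA a (ts 10) arrives after
	// CREATE SCHEMA b (ts 20) and is treated as stale.
	barrier := orderKey{kind: kindTable, id: ddlSpanTableID}
	shared := tracker{}
	shared.accept(barrier, 20)
	fmt.Println(shared.accept(barrier, 10)) // false

	// Keyed by schema identity: the two DDLs no longer interact.
	perSchema := tracker{}
	perSchema.accept(orderKey{kind: kindSchema, schema: "b"}, 20)
	fmt.Println(perSchema.accept(orderKey{kind: kindSchema, schema: "a"}, 10)) // true
}
```

Because the key type is a plain comparable struct, it could move into a shared package without pulling consumer-specific state along with it.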
// recordingSink is a minimal sink.Sink implementation that records which DDLs are executed.
//
// It lets unit tests validate consumer-side DDL flushing behavior without requiring a real downstream.
type recordingSink struct {
    ddls []string
}

var _ sink.Sink = (*recordingSink)(nil)

func (s *recordingSink) SinkType() common.SinkType { return common.MysqlSinkType }
func (s *recordingSink) IsNormal() bool             { return true }
func (s *recordingSink) AddDMLEvent(_ *commonEvent.DMLEvent) {
}

func (s *recordingSink) WriteBlockEvent(event commonEvent.BlockEvent) error {
    if ddl, ok := event.(*commonEvent.DDLEvent); ok {
        s.ddls = append(s.ddls, ddl.Query)
    }
    return nil
}

func (s *recordingSink) AddCheckpointTs(_ uint64) {
}

func (s *recordingSink) SetTableSchemaStore(_ *commonEvent.TableSchemaStore) {
}

func (s *recordingSink) Close(_ bool) {
}
func (s *recordingSink) Run(_ context.Context) error { return nil }
This test file is nearly identical to cmd/pulsar-consumer/writer_test.go. The recordingSink and all the test cases are duplicated. This indicates that the tested DDL handling logic is generic. To reduce code duplication and make tests easier to maintain, consider refactoring these tests into a shared test suite. You could create test helpers in a common package that take a writer interface or a factory function to set up the consumer-specific writer, allowing the same test logic to be executed for both Kafka and Pulsar consumers.
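One possible shape for such a shared suite is sketched below. Everything here is hypothetical: `ddlAppender`, `fakeWriter`, and `runSharedSuite` do not exist in the repository, the string keys stand in for `ddlOrderKey`, and a real version would take `*testing.T` and the actual Kafka and Pulsar writers instead of returning failure strings.

```go
package main

import "fmt"

// ddlAppender is a hypothetical interface both consumer writers would satisfy.
type ddlAppender interface {
	AppendDDL(query, key string, commitTs uint64)
	Buffered() []string
}

// fakeWriter is a toy writer used only to make this sketch runnable.
type fakeWriter struct {
	maxCommitTs map[string]uint64
	buffered    []string
}

func newFakeWriter() ddlAppender {
	return &fakeWriter{maxCommitTs: map[string]uint64{}}
}

func (w *fakeWriter) AppendDDL(query, key string, commitTs uint64) {
	if commitTs < w.maxCommitTs[key] {
		return // stale for this object
	}
	w.maxCommitTs[key] = commitTs
	w.buffered = append(w.buffered, query)
}

func (w *fakeWriter) Buffered() []string { return w.buffered }

// runSharedSuite runs the same cases against any writer built by newWriter
// and returns a description of each failed case.
func runSharedSuite(newWriter func() ddlAppender) []string {
	var failures []string

	w := newWriter()
	w.AppendDDL("ALTER TABLE t ADD COLUMN b INT", "table:t", 20)
	w.AppendDDL("ALTER TABLE t ADD COLUMN a INT", "table:t", 10)
	if len(w.Buffered()) != 1 {
		failures = append(failures, "same-table stale ALTER TABLE was not dropped")
	}

	w = newWriter()
	w.AppendDDL("CREATE SCHEMA b", "schema:b", 20)
	w.AppendDDL("CREATE SCHEMA a", "schema:a", 10)
	if len(w.Buffered()) != 2 {
		failures = append(failures, "cross-schema CREATE SCHEMA was dropped")
	}
	return failures
}

func main() {
	// Each consumer package would pass its own factory here.
	for _, name := range []string{"kafka", "pulsar"} {
		fmt.Println(name, len(runSharedSuite(newFakeWriter)))
	}
}
```

The factory parameter keeps consumer-specific setup out of the shared cases, so each package's test file would shrink to a single call.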
…r-ddl-order-key # Conflicts: # cmd/kafka-consumer/writer.go # cmd/kafka-consumer/writer_test.go # cmd/pulsar-consumer/writer.go # cmd/pulsar-consumer/writer_test.go
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@cmd/kafka-consumer/writer.go`:
- Around line 315-330: The current early-return for
commonEvent.InfluenceTypeNormal collects blockedTables.TableIDs and returns them
before handling timodel.ActionCreateTable, causing CREATE TABLE ... LIKE src to
be keyed by src; change the logic so ActionCreateTable is handled before the
early return (or special-case it) by invoking the switch
timodel.ActionType(ddl.Type) (and adding newDDLOrderKeyForTable(ddl)) prior to
returning keys, or ensure when ddl is ActionCreateTable you add the
created-table key (newDDLOrderKeyForTable) and avoid returning only
blockedTables.TableIDs; update the code paths around
commonEvent.InfluenceTypeNormal, the blockedTables.TableIDs loop, the early
return, and the switch handling (timodel.ActionCreateTable /
newDDLOrderKeyForTable) accordingly so stale-drop uses the created table
identity.
In `@cmd/pulsar-consumer/writer.go`:
- Around line 306-324: The current commonEvent.InfluenceTypeNormal block returns
blockedTables.TableIDs as soon as any real table ID is present, which causes
ActionCreateTable to be keyed by the referenced/source table instead of the
created table; modify the code in that case block (around
blockedTables.TableIDs, ddlOrderKey, ddlOrderKeyKindTable,
commonType.DDLSpanTableID, timodel.ActionType(ddl.Type),
timodel.ActionCreateTable, newDDLOrderKeyForTable, newDDLOrderKeyForSchema,
addGlobalKey) so that ActionCreateTable always uses the created-table key:
specifically, do not short-circuit and return keys for the general blocked table
case when timodel.ActionCreateTable applies—instead either move the len(keys) !=
0 check after the switch or special-case ActionCreateTable to replace/override
keys with newDDLOrderKeyForTable(ddl) (and return that), leaving other action
types to use blocked tables or the schema/global keys as before.
📒 Files selected for processing (4)
- cmd/kafka-consumer/writer.go
- cmd/kafka-consumer/writer_test.go
- cmd/pulsar-consumer/writer.go
- cmd/pulsar-consumer/writer_test.go
case commonEvent.InfluenceTypeNormal:
    for _, tableID := range blockedTables.TableIDs {
        if tableID == commonType.DDLSpanTableID {
            continue
        }
        keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
    }
    if len(keys) != 0 {
        return keys
    }

    switch timodel.ActionType(ddl.Type) {
    case timodel.ActionCreateSchema:
        keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
    case timodel.ActionCreateTable:
        keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
Don't use referenced source-table IDs as the stale key for ActionCreateTable.
Lines 315-323 return the blocked table IDs for every normal DDL, so CREATE TABLE ... LIKE src is still keyed by src instead of by the created table. If CREATE TABLE dst1 LIKE src and CREATE TABLE dst2 LIKE src arrive out of order, the older one will be dropped as "stale" even though the two DDLs target different objects. The source-table dependency is already handled by watermark gating; stale-drop should use the created table identity here.
💡 Suggested fix
  case commonEvent.InfluenceTypeNormal:
+     if timodel.ActionType(ddl.Type) == timodel.ActionCreateTable {
+         keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
+         return keys
+     }
      for _, tableID := range blockedTables.TableIDs {
          if tableID == commonType.DDLSpanTableID {
              continue
          }
          keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
      }
      if len(keys) != 0 {
          return keys
      }
      switch timodel.ActionType(ddl.Type) {
      case timodel.ActionCreateSchema:
          keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
-     case timodel.ActionCreateTable:
-         keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
      default:
          addGlobalKey()
      }
case commonEvent.InfluenceTypeNormal:
    for _, tableID := range blockedTables.TableIDs {
        if tableID == commonType.DDLSpanTableID {
            continue
        }
        keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
    }
    if len(keys) != 0 {
        return keys
    }

    switch timodel.ActionType(ddl.Type) {
    case timodel.ActionCreateSchema:
        keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
    case timodel.ActionCreateTable:
        keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
    default:
        addGlobalKey()
    }
Key ActionCreateTable by the created table, not the referenced blocked table.
Lines 306-315 return BlockedTables.TableIDs as soon as any real table ID is present, so CREATE TABLE ... LIKE src is still keyed by the source table. That means two valid DDLs like CREATE TABLE dst1 LIKE src and CREATE TABLE dst2 LIKE src can suppress each other if they arrive out of order, even though they target different objects. Watermark gating already protects the source-table dependency; the stale-drop key for ActionCreateTable should be the created table only.
💡 Suggested fix
  case commonEvent.InfluenceTypeNormal:
+     if timodel.ActionType(ddl.Type) == timodel.ActionCreateTable {
+         keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
+         return keys
+     }
      for _, tableID := range blockedTables.TableIDs {
          if tableID == commonType.DDLSpanTableID {
              continue
          }
          keys[ddlOrderKey{kind: ddlOrderKeyKindTable, id: tableID}] = struct{}{}
      }
      if len(keys) != 0 {
          return keys
      }
      switch timodel.ActionType(ddl.Type) {
      case timodel.ActionCreateSchema:
          keys[newDDLOrderKeyForSchema(ddl)] = struct{}{}
-     case timodel.ActionCreateTable:
-         keys[newDDLOrderKeyForTable(ddl)] = struct{}{}
      default:
          addGlobalKey()
      }
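The `CREATE TABLE ... LIKE src` scenario from the two comments above can be reproduced with a small sketch. It is an illustration under assumptions, not the consumers' code: `createLikeDDL`, `orderKeyFor`, and `kept` are hypothetical, each DDL carries a single key, and the table ID 7 is arbitrary.

```go
package main

import "fmt"

// orderKey is a simplified, hypothetical stale-drop key.
type orderKey struct {
	tableID int64
	schema  string
	table   string
}

// createLikeDDL models CREATE TABLE <schema>.<table> LIKE <source table>.
type createLikeDDL struct {
	query      string
	commitTs   uint64
	schema     string
	table      string
	srcTableID int64 // the blocked (referenced) table
}

// orderKeyFor picks the stale-drop key: either the referenced source table
// (the reported behavior) or the created table (the suggested behavior).
func orderKeyFor(d createLikeDDL, keyByCreatedTable bool) orderKey {
	if keyByCreatedTable {
		return orderKey{schema: d.schema, table: d.table}
	}
	return orderKey{tableID: d.srcTableID}
}

// kept replays DDLs in arrival order and returns the ones not dropped as stale.
func kept(arrivals []createLikeDDL, keyByCreatedTable bool) []string {
	maxCommitTs := map[orderKey]uint64{}
	var queries []string
	for _, d := range arrivals {
		key := orderKeyFor(d, keyByCreatedTable)
		if d.commitTs < maxCommitTs[key] {
			continue // treated as stale for this key
		}
		maxCommitTs[key] = d.commitTs
		queries = append(queries, d.query)
	}
	return queries
}

func main() {
	// dst2 (ts 20) arrives before dst1 (ts 10); both reference src (ID 7).
	arrivals := []createLikeDDL{
		{query: "CREATE TABLE dst2 LIKE src", commitTs: 20, schema: "test", table: "dst2", srcTableID: 7},
		{query: "CREATE TABLE dst1 LIKE src", commitTs: 10, schema: "test", table: "dst1", srcTableID: 7},
	}
	fmt.Println(len(kept(arrivals, false))) // 1: dst1 is dropped when keyed by src
	fmt.Println(len(kept(arrivals, true)))  // 2: both survive when keyed by created table
}
```

Keying by the created table leaves the dependency on `src` to the watermark check, which is where both review comments say it is already handled.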
/test all
@wlwilliamx: The following tests failed, say
What problem does this PR solve?
Issue Number: close #4454
What is changed and how it works?
This PR separates DDL flush coordination keys from DDL stale-drop keys in the Kafka and Pulsar consumers.
`getBlockTableIDs()` is kept for flush and watermark coordination, but `appendDDL()` now uses a dedicated `ddlOrderKey` to decide whether a DDL is truly stale.
For DDLSpan-only `CREATE SCHEMA` / independent `CREATE TABLE`, the new logic no longer reuses the shared `DDLSpanTableID` as the identity key. Instead, it falls back to schema-scoped or table-scoped logical object keys, so cross-object DDLs stay queued and are later sorted by commitTs.
The PR also adds regression tests for:
- `CREATE SCHEMA`
- `ALTER TABLE`
- `BlockedTables.TableIDs` on create DDLs
Check List
Tests
Manual test:
`go test ./cmd/kafka-consumer ./cmd/pulsar-consumer -count=1`
Questions
Will it cause performance regression or break compatibility?
No material performance regression is expected. The change is limited to in-memory DDL stale-drop keying inside the MQ consumers and does not change the existing flush or watermark behavior.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note