DNM: refine dispatcher scan priority with syncpoint refine #4123

Draft
asddongmen wants to merge 36 commits into pingcap:master from asddongmen:0119-refine-dispatcher-scan-priority-with-syncpoint-refine

@asddongmen
Collaborator

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

Signed-off-by: dongmen <414110582@qq.com>
@ti-chi-bot

ti-chi-bot bot commented Feb 3, 2026

Skipping CI for Draft Pull Request.
If you want CI signal for your change, please convert it to an actual PR.
You can still manually trigger a test run with /test all

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue release-note Denotes a PR that will be considered when it comes time to generate release notes. do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. labels Feb 3, 2026
@ti-chi-bot

ti-chi-bot bot commented Feb 3, 2026

[APPROVALNOTIFIER] This PR is NOT APPROVED

This pull-request has been approved by:
Once this PR has been reviewed and has the lgtm label, please assign flowbehappy for approval. For more information see the Code Review Process.
Please ensure that each of them provides their approval before proceeding.

The full list of commands accepted by this bot can be found here.

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

@coderabbitai

coderabbitai bot commented Feb 3, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

@ti-chi-bot ti-chi-bot bot added the size/XXL Denotes a PR that changes 1000+ lines, ignoring generated files. label Feb 3, 2026
@gemini-code-assist

Summary of Changes

Hello @asddongmen, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces significant enhancements to event dispatching, memory management, and testing capabilities. It refines how syncpoints are handled by allowing them to be skipped under certain conditions, improving system responsiveness. A new dynamic scan interval adjustment mechanism intelligently adapts event processing rates based on memory pressure, optimizing resource utilization. The underlying congestion control protocol is upgraded to provide richer memory usage data. Additionally, the workload testing tool now supports comprehensive DDL operations, enabling more thorough testing of schema evolution scenarios.

Highlights

  • Syncpoint Refinement and Skipping: Introduced a new Action_Skip type for dispatcher actions, allowing the system to bypass syncpoints under specific conditions (e.g., when syncPointSkipEnabled is true or a syncpoint has already been skipped). This mechanism is managed by the Barrier component, which now includes logic to set skip states, record skipped syncpoints, and build skip actions for dispatchers.
  • Dynamic Scan Interval Adjustment: Implemented a dynamic scan interval adjustment mechanism within the event service. This system intelligently modifies the event scanning rate based on real-time memory pressure (used, max, and available memory), employing a 'fast brake, slow accelerate' approach with tiered responses and trend prediction to optimize resource usage and prevent bottlenecks.
  • Congestion Control Protocol V2: Upgraded the congestion control protocol to CongestionControlVersion2. This new version enhances the AvailableMemory structure to include additional metrics such as Used and Max memory, providing more comprehensive data for the event collector to assess memory pressure and make informed decisions.
  • Workload Tool DDL Support: Significantly extended the tools/workload utility to support DDL (Data Definition Language) workloads. Users can now define and execute various DDL operations (add/drop column, add/drop index, truncate table) using a TOML configuration file, targeting either fixed tables or randomly selected ones, enabling more robust testing of schema changes.
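
To make the 'fast brake, slow accelerate' idea from the Dynamic Scan Interval Adjustment highlight concrete, here is a minimal sketch. It is not the code in pkg/eventservice/scan_window.go: the scanWindow type, the adjust method, and every threshold, bound, and step size are assumptions chosen for illustration, and trend prediction is left out.

```go
package main

import (
	"fmt"
	"time"
)

// Illustrative thresholds and bounds; none of these values come from the PR.
const (
	minInterval      = time.Second
	maxInterval      = 30 * time.Minute
	criticalRatio    = 0.9
	highRatio        = 0.7
	lowRatio         = 0.3
	increaseCooldown = 10 * time.Second
)

// scanWindow is a hypothetical holder for the current scan interval.
type scanWindow struct {
	interval     time.Duration
	lastIncrease time.Time
}

// adjust shrinks the interval immediately under memory pressure and grows it
// by a small step, at most once per cooldown, when pressure is low.
func (w *scanWindow) adjust(used, quota uint64, now time.Time) time.Duration {
	if quota == 0 {
		return w.interval // no usable sample, leave the interval alone
	}
	ratio := float64(used) / float64(quota)
	switch {
	case ratio >= criticalRatio:
		w.interval /= 4 // fast brake
	case ratio >= highRatio:
		w.interval /= 2 // moderate brake
	case ratio <= lowRatio && now.Sub(w.lastIncrease) >= increaseCooldown:
		w.interval += w.interval / 4 // slow accelerate
		w.lastIncrease = now
	}
	if w.interval < minInterval {
		w.interval = minInterval
	}
	if w.interval > maxInterval {
		w.interval = maxInterval
	}
	return w.interval
}

func main() {
	w := &scanWindow{interval: 8 * time.Second}
	now := time.Now()
	fmt.Println(w.adjust(95, 100, now)) // critical pressure: interval is quartered
	fmt.Println(w.adjust(10, 100, now)) // low pressure: one small increase
	fmt.Println(w.adjust(10, 100, now)) // still inside the cooldown: unchanged
}
```

Decreases apply on every sample while increases are rate-limited by the cooldown, so a single high-pressure report outweighs several low-pressure ones.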

Changelog
  • downstreamadapter/dispatcher/basic_dispatcher.go
    • Updated HandleDispatcherStatus to use a switch statement for action.Action, explicitly handling Action_Skip alongside Action_Write and Action_Pass.
  • downstreamadapter/eventcollector/event_collector.go
    • Added lastCongestionLogTime field.
    • Modified newCongestionControlMessages to collect changefeedUsedMemory, changefeedMaxMemory, totalDispatchers, dispatchersWithoutService, and changefeedsInMessages.
    • Updated AddAvailableMemoryWithDispatchers to AddAvailableMemoryWithDispatchersAndUsage for CongestionControlVersion2.
    • Added logging for congestion control build summary.
  • heartbeatpb/heartbeat.pb.go
    • Added Action_Skip to the Action enum.
  • heartbeatpb/heartbeat.proto
    • Added Skip = 2 to the Action enum.
  • maintainer/barrier.go
    • Imported metrics and atomic.
    • Added fields syncPointSkipEnabled, syncPointSkipCheckpointLag, syncPointInterval, maxSkippedSyncPointTs.
    • Added SetSyncPointSkipState, isSkippedSyncPointCommitTs, recordSkippedSyncPoint, and buildSkipSyncPointStatus functions.
    • Modified HandleStatus to handle nil status states and implement logic for skipping syncpoints.
  • maintainer/barrier_test.go
    • Added TestSyncPointSkip to test the new syncpoint skipping functionality.
  • maintainer/maintainer.go
    • Added calculateSyncPointSkipState and refreshBarrierSyncPointSkipState functions.
    • Integrated refreshBarrierSyncPointSkipState into onBlockStateRequest and handleResendMessage.
  • pkg/common/event/congestion_control.go
    • Added CongestionControlVersion2.
    • Modified AvailableMemory struct to include Used and Max fields.
    • Introduced marshalV1, marshalV2, unmarshalV1, unmarshalV2, sizeV1, sizeV2 methods for versioned serialization.
    • Added NewCongestionControlWithVersion and AddAvailableMemoryWithDispatchersAndUsage.
  • pkg/common/event/congestion_control_test.go
    • Removed comment about DispatcherAvailable not being properly serialized.
    • Added TestCongestionControlV2 to test the new versioned congestion control.
  • pkg/eventservice/dispatcher_stat.go
    • Added minSentTs, scanInterval, lastRatio, lastAdjustTime, lastTrendAdjustTime, usageWindow, syncPointEnabled, syncPointInterval, lastUsageLogTime, lastUsageWindowLogTime fields to changefeedStatus.
    • Initialized these new fields in newChangefeedStatus.
  • pkg/eventservice/event_broker.go
    • Added lastCongestionLogTime to eventBroker.
    • Added refreshMinSentResolvedTs goroutine.
    • Modified getScanTaskDataRange to cap CommitTsEnd by scanMaxTs.
    • Added updateSyncPointConfig call in addDispatcher and resetDispatcher.
    • Added metric deletions for EventServiceAvailableMemoryQuotaGaugeVec, EventServiceScanWindowBaseTsGaugeVec, EventServiceScanWindowIntervalGaugeVec when a changefeed is removed.
    • Updated handleDispatcherHeartbeat to track changedChangefeeds and use now for lastReceivedHeartbeatTime.
    • Modified handleCongestionControl to process CongestionControlVersion2 data, including used and max memory, and call updateMemoryUsage.
  • pkg/eventservice/event_broker_test.go
    • Added TestScanRangeCappedByScanWindow.
    • Added TestHandleCongestionControlV2AdjustsScanInterval.
    • Added TestHandleCongestionControlV1DoesNotAdjustScanInterval.
  • pkg/eventservice/scan_window.go
    • New file: Implements dynamic scan interval adjustment logic based on memory usage, including thresholds for critical, high, low, and very low pressure, trend detection, and cooldowns for increases. Defines memoryUsageSample, memoryUsageWindow, memoryUsageStats structs and related functions.
  • pkg/eventservice/scan_window_test.go
    • New file: Contains unit tests for the dynamic scan interval adjustment, covering various scenarios like sync point caps, decrease behavior, pressure signals, jittered samples, and increasing usage trends. Also tests refreshMinSentResolvedTs, getScanMaxTs, and updateSyncPointConfig.
  • pkg/metrics/event_service.go
    • Added EventServiceScanWindowBaseTsGaugeVec and EventServiceScanWindowIntervalGaugeVec metrics.
    • Registered these new metrics.
  • pkg/metrics/maintainer.go
    • Added MaintainerSkipSyncPointCount metric and registered it.
  • pkg/sink/mysql/mysql_writer_for_syncpoint.go
    • Added a log message for SendSyncPointEvent.
  • tools/workload/Makefile
    • Modified go build command to include -mod=vendor.
  • tools/workload/app.go
    • Added DDLExecuted, DDLSucceeded, DDLSkipped, DDLFailed to WorkloadStats.
    • Modified executeWorkload to handle "ddl" action and OnlyDDL flag, calling handleDDLExecution.
  • tools/workload/config.go
    • Imported time.
    • Added DDLConfigPath, DDLWorker, DDLTimeout fields to WorkloadConfig.
    • Updated ParseFlags to include new DDL-related flags and validation.
  • tools/workload/ddl_app.go
    • New file: Implements handleDDLExecution to load DDL config, create a DDLRunner, and start it.
  • tools/workload/ddl_config.go
    • New file: Defines DDLConfig and DDLRatePerMinute structs for DDL workload configuration. Includes functions for loading, normalizing, and validating DDL configurations from TOML files. Also defines TableName struct and ParseTableName utility.
  • tools/workload/ddl_config_test.go
    • New file: Contains unit tests for ParseTableName and LoadDDLConfig.
  • tools/workload/ddl_executor.go
    • New file: Implements DDL execution logic for the workload tool. Includes startWorkers, executeTask, buildDDL (for add/drop column, add/drop index, truncate table), and helper functions like ddlNameSuffix, quoteIdent, quoteTable, selectOne.
  • tools/workload/ddl_runner.go
    • New file: Manages the DDL workload execution. Defines DDLTask and DDLRunner structs. Includes NewDDLRunner, Start, startTaskSchedulers, startTypeScheduler, startRandomTableRefresh, refreshRandomTables, fetchBaseTables, sampleStrings, ddlTableSelector interface, fixedTableSelector, and randomTableSelector.
  • tools/workload/ddl_types.go
    • New file: Defines DDLType enum for different DDL operations.
  • tools/workload/go.mod
    • Added github.com/BurntSushi/toml.
  • tools/workload/go.sum
    • Added github.com/BurntSushi/toml entries.
  • tools/workload/readme.md
    • Added a new section "0. DDL Workload" with usage examples for the new DDL workload feature.
    • Updated existing examples to use ./bin/workload.
  • tools/workload/statistics.go
    • Added ddlExecuted, ddlSucceeded, ddlSkipped, ddlFailed, ddls to statistics struct.
    • Modified calculateStats and printStats to include DDL-related metrics.
    • Updated reportMetrics to track lastDDLCount.
  • utils/dynstream/memory_control.go
    • Changed defaultReleaseMemoryRatio from 0.4 to 0.6.
    • Modified checkDeadlock condition for memoryHighWaterMark.
  • utils/dynstream/memory_control_test.go
    • Modified calcExpectedReleasedPaths to reflect the new defaultReleaseMemoryRatio.
    • Updated assertions in TestReleaseMemory to check for multiple released paths.
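
The changelog entry for pkg/common/event/congestion_control.go describes versioned serialization in which version 2 adds Used and Max to AvailableMemory. The sketch below shows one way such a scheme can work. The wire layout (one version byte followed by big-endian uint64 fields), the lowercase type, and the function names are assumptions for illustration, not the format the PR uses.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// Hypothetical version tags; the PR's constants are named CongestionControlVersion2 etc.
const (
	version1 byte = 1
	version2 byte = 2
)

var errBadPayload = errors.New("unsupported version or malformed payload")

// availableMemory is a simplified stand-in for the struct named in the changelog.
type availableMemory struct {
	Available uint64
	Used      uint64 // carried by version 2 only
	Max       uint64 // carried by version 2 only
}

// marshal writes a version byte followed by the fields that version carries.
func marshal(version byte, m availableMemory) ([]byte, error) {
	switch version {
	case version1:
		buf := make([]byte, 1+8)
		buf[0] = version1
		binary.BigEndian.PutUint64(buf[1:], m.Available)
		return buf, nil
	case version2:
		buf := make([]byte, 1+24)
		buf[0] = version2
		binary.BigEndian.PutUint64(buf[1:], m.Available)
		binary.BigEndian.PutUint64(buf[9:], m.Used)
		binary.BigEndian.PutUint64(buf[17:], m.Max)
		return buf, nil
	default:
		return nil, errBadPayload
	}
}

// unmarshal dispatches on the version byte; version 1 leaves Used and Max at zero.
func unmarshal(data []byte) (availableMemory, error) {
	var m availableMemory
	switch {
	case len(data) == 9 && data[0] == version1:
		m.Available = binary.BigEndian.Uint64(data[1:])
	case len(data) == 25 && data[0] == version2:
		m.Available = binary.BigEndian.Uint64(data[1:])
		m.Used = binary.BigEndian.Uint64(data[9:])
		m.Max = binary.BigEndian.Uint64(data[17:])
	default:
		return m, errBadPayload
	}
	return m, nil
}

func main() {
	in := availableMemory{Available: 40, Used: 60, Max: 100}
	for _, v := range []byte{version1, version2} {
		data, _ := marshal(v, in)
		out, _ := unmarshal(data)
		fmt.Printf("v%d: %d bytes, %+v\n", v, len(data), out)
	}
}
```

A receiver that gets a version 1 payload sees zero for Used and Max, which is one plausible reason for the V1 test in the changelog to assert that the scan interval is not adjusted.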

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces a sophisticated mechanism to refine dispatcher scan priority and skip sync points, which is crucial for managing high changefeed lag. The implementation includes dynamic scan window adjustments based on memory pressure and adds a new Action_Skip. Additionally, a comprehensive DDL workload generator has been added to the testing tools. The core logic appears sound, with good test coverage and new metrics for better observability. My review includes a few suggestions to improve code clarity and reduce duplication.
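
As a rough illustration of the sync point skipping described above, the sketch below models one possible decision rule. It is not the Barrier code from maintainer/barrier.go. The skipState type and its methods are hypothetical, and the policy (enable skipping while checkpoint lag exceeds a threshold, and keep treating any commit timestamp at or below the highest skipped one as skipped) is an assumption inferred from the field names in the changelog.

```go
package main

import (
	"fmt"
	"time"
)

// skipState is a hypothetical stand-in for the skip-related fields on Barrier.
type skipState struct {
	enabled      bool
	lagThreshold time.Duration
	maxSkippedTs uint64
}

// refresh turns skipping on while the checkpoint lag exceeds the threshold.
// This policy is assumed, not taken from the PR.
func (s *skipState) refresh(checkpointLag time.Duration) {
	s.enabled = s.lagThreshold > 0 && checkpointLag > s.lagThreshold
}

// shouldSkip reports whether the sync point at commitTs is skipped and
// records newly skipped sync points.
func (s *skipState) shouldSkip(commitTs uint64) bool {
	if s.maxSkippedTs != 0 && commitTs <= s.maxSkippedTs {
		return true // already skipped earlier, so later reports get the same answer
	}
	if !s.enabled {
		return false
	}
	s.maxSkippedTs = commitTs
	return true
}

func main() {
	s := &skipState{lagThreshold: 5 * time.Minute}
	s.refresh(10 * time.Minute)
	fmt.Println(s.shouldSkip(200)) // lag above threshold: skipped
	s.refresh(time.Minute)
	fmt.Println(s.shouldSkip(200)) // lag recovered, but this one was already skipped
	fmt.Println(s.shouldSkip(300)) // new sync point with skipping off: not skipped
}
```

Remembering the highest skipped timestamp keeps the answer stable for dispatchers that report the same sync point after the lag has recovered.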

Comment on lines +640 to 647
case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)
default:
failpoint.Inject("BlockOrWaitBeforePass", nil)
d.PassBlockEventToSink(pendingEvent)
failpoint.Inject("BlockAfterPass", nil)

medium

The logic in the default case is identical to the case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip. This code duplication can be removed by combining them. You can use a default case to handle Pass, Skip, and any potential future actions with the same logic, which simplifies the code and improves maintainability.

Suggested change
- case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip:
- 	failpoint.Inject("BlockOrWaitBeforePass", nil)
- 	d.PassBlockEventToSink(pendingEvent)
- 	failpoint.Inject("BlockAfterPass", nil)
- default:
- 	failpoint.Inject("BlockOrWaitBeforePass", nil)
- 	d.PassBlockEventToSink(pendingEvent)
- 	failpoint.Inject("BlockAfterPass", nil)
+ case heartbeatpb.Action_Pass, heartbeatpb.Action_Skip:
+ 	fallthrough
+ default:
+ 	failpoint.Inject("BlockOrWaitBeforePass", nil)
+ 	d.PassBlockEventToSink(pendingEvent)
+ 	failpoint.Inject("BlockAfterPass", nil)
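
The same consolidation can also be written with a bare default branch, since anything that is not Write takes the pass path. The sketch below is self-contained and uses stand-in names (action, handle) rather than the real heartbeatpb types. Only the Skip = 2 value is taken from the changelog; the other enum values are assumed.

```go
package main

import "fmt"

// action is a stand-in for heartbeatpb.Action.
type action int32

const (
	actionWrite action = 0 // assumed value
	actionPass  action = 1 // assumed value
	actionSkip  action = 2 // matches "Skip = 2" in the changelog
)

// handle returns which path an action takes. Pass, Skip, and any value
// added later all share the default branch.
func handle(a action) string {
	switch a {
	case actionWrite:
		return "write"
	default:
		return "pass"
	}
}

func main() {
	for _, a := range []action{actionWrite, actionPass, actionSkip, action(42)} {
		fmt.Println(a, handle(a))
	}
}
```

Keeping the explicit case with fallthrough, as in the suggestion, documents that Skip was considered; dropping it is shorter. Both forms behave identically.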

Comment on lines +654 to +663
if existing, exists := changefeedUsedMemory[cfID]; exists {
changefeedUsedMemory[cfID] = min(existing, uint64(quota.MemoryUsage()))
} else {
changefeedUsedMemory[cfID] = uint64(quota.MemoryUsage())
}
if existing, exists := changefeedMaxMemory[cfID]; exists {
changefeedMaxMemory[cfID] = min(existing, uint64(quota.MaxMemory()))
} else {
changefeedMaxMemory[cfID] = uint64(quota.MaxMemory())
}

medium

The logic to find the minimum value for changefeedUsedMemory and changefeedMaxMemory is duplicated. This pattern of checking existence and updating the minimum value could be extracted into a small helper function to reduce code duplication and improve readability.
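
One possible shape for such a helper is sketched below. The name setMin and its generic signature are hypothetical; the PR may key the maps by a changefeed ID type, which the comparable constraint would cover.

```go
package main

import "fmt"

// setMin stores v under key unless the map already holds a smaller value.
// Hypothetical helper; not part of the PR.
func setMin[K comparable](m map[K]uint64, key K, v uint64) {
	if existing, ok := m[key]; !ok || v < existing {
		m[key] = v
	}
}

func main() {
	used := map[string]uint64{}
	for _, v := range []uint64{300, 120, 500} {
		setMin(used, "cf-1", v)
	}
	fmt.Println(used["cf-1"])
}
```

With this helper, each of the two if/else blocks in the snippet above would reduce to a single call such as setMin(changefeedUsedMemory, cfID, uint64(quota.MemoryUsage())).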

}

func (w *Writer) SendSyncPointEvent(event *commonEvent.SyncPointEvent) error {
log.Info("fizz send syncpoint event", zap.Stringer("changefeedID", w.ChangefeedID), zap.Uint64("commitTs", event.GetCommitTs()))

medium

The log message contains the word "fizz", which appears to be a temporary debug placeholder. It should be removed to make the log message clearer and more professional.

Suggested change
- log.Info("fizz send syncpoint event", zap.Stringer("changefeedID", w.ChangefeedID), zap.Uint64("commitTs", event.GetCommitTs()))
+ log.Info("send syncpoint event", zap.Stringer("changefeedID", w.ChangefeedID), zap.Uint64("commitTs", event.GetCommitTs()))

Signed-off-by: dongmen <414110582@qq.com>
@ti-chi-bot

ti-chi-bot bot commented Feb 4, 2026

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

Labels

  • do-not-merge/needs-linked-issue
  • do-not-merge/work-in-progress: Indicates that a PR should not merge because it is a work in progress.
  • release-note: Denotes a PR that will be considered when it comes time to generate release notes.
  • size/XXL: Denotes a PR that changes 1000+ lines, ignoring generated files.

1 participant