downstreamadapter,config,api: wire event collector batch config#4663
downstreamadapter,config,api: wire event collector batch config#4663ti-chi-bot[bot] merged 20 commits intomasterfrom
Conversation
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughThis PR introduces configurable event collector batch settings ( Changes
Sequence DiagramsequenceDiagram
actor Config as ReplicaConfig/<br/>ChangefeedConfig
participant DM as DispatcherManager
participant SI as SharedInfo
participant EC as EventCollector
participant DS as DynStream
Config->>DM: Pass batch count/bytes config<br/>(or nil for defaults)
activate DM
DM->>DM: getEventCollectorBatchCountAndBytes()<br/>sink defaults + config overrides
DM->>SI: NewSharedInfo(..., batchCount, batchBytes)
deactivate DM
EC->>SI: target.GetEventCollectorBatchConfig()
activate EC
SI-->>EC: Return batchCount, batchBytes
EC->>DS: NewAreaSettingsWithMaxPendingSizeAndBatchConfig<br/>(batchCount, batchBytes)
deactivate EC
activate DS
DS-->>EC: Configured area with batch settings
deactivate DS
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~20 minutes Possibly related PRs
Suggested labels
Suggested reviewers
Poem
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces per-area batch configuration for the event collector's dynamic stream, allowing for optimized batching performance. It adds batch count and byte limit fields to the API, changefeed, and replica configurations, and updates the sink interfaces to provide default batching values. The dynstream utility has been refactored to support these overrides through a new registry and batcher implementation. Feedback identifies a potential memory leak in stream.go due to the removal of event buffer zeroing and points out a validation error in replica_config.go that incorrectly prevents users from disabling the byte-based batching limit with a zero value.
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
utils/dynstream/stream.go (1)
238-243:⚠️ Potential issue | 🔴 CriticalBuild failure:
cleanUpEventBufis declared but never used.The pipeline reports a compilation error because
cleanUpEventBufis declared but the calls to it were removed. Either remove the unused declaration or restore the cleanup calls.🐛 Proposed fix: remove the unused declaration
var ( eventQueueEmpty = false eventBuf []T - zeroT T - cleanUpEventBuf = func() { - for i := range eventBuf { - eventBuf[i] = zeroT - } - eventBuf = nil - } path *pathInfo[A, P, T, D, H] nBytes int duration time.Duration )Alternatively, if the cleanup is intentionally needed to avoid memory leaks (releasing references to events for GC), the
cleanUpEventBuf()calls should be restored after processing events.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@utils/dynstream/stream.go` around lines 238 - 243, The helper closure cleanUpEventBuf is declared but never used, causing a compile error; either delete the unused declaration or reintroduce calls to it where events are finished processing to release references for GC. If you choose deletion, remove the cleanUpEventBuf variable and its body (the loop zeroing eventBuf and setting it to nil); if cleanup is required, locate where events are consumed (places handling eventBuf in the processing loop) and call cleanUpEventBuf() after processing to zero entries and nil out eventBuf; reference the symbols cleanUpEventBuf, eventBuf and zeroT when making the change.
🧹 Nitpick comments (1)
utils/dynstream/stream.go (1)
254-256: Remove duplicate/stale comment block.This comment block duplicates the intent already stated at lines 233-247. The "todo" comment appears to be leftover from refactoring.
🧹 Proposed cleanup
- // Variables below will be used in the Loop below. - // Declared here to avoid repeated allocation. - // todo: shall we preallocate the event buff and path here ? // 1. Drain the eventChan to pendingQueue. // 2. Pop events from the eventQueue and handle them.🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@utils/dynstream/stream.go` around lines 254 - 256, Remove the duplicate/stale comment block that starts with "Variables below will be used in the Loop below." (the TODO about preallocating event buff and path) since the same intent is already documented earlier; locate the block near the variable declarations used by the Loop in stream.go (the comment immediately preceding the variables for event buffer/path) and delete it to avoid redundant/outdated comments.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Outside diff comments:
In `@utils/dynstream/stream.go`:
- Around line 238-243: The helper closure cleanUpEventBuf is declared but never
used, causing a compile error; either delete the unused declaration or
reintroduce calls to it where events are finished processing to release
references for GC. If you choose deletion, remove the cleanUpEventBuf variable
and its body (the loop zeroing eventBuf and setting it to nil); if cleanup is
required, locate where events are consumed (places handling eventBuf in the
processing loop) and call cleanUpEventBuf() after processing to zero entries and
nil out eventBuf; reference the symbols cleanUpEventBuf, eventBuf and zeroT when
making the change.
---
Nitpick comments:
In `@utils/dynstream/stream.go`:
- Around line 254-256: Remove the duplicate/stale comment block that starts with
"Variables below will be used in the Loop below." (the TODO about preallocating
event buff and path) since the same intent is already documented earlier; locate
the block near the variable declarations used by the Loop in stream.go (the
comment immediately preceding the variables for event buffer/path) and delete it
to avoid redundant/outdated comments.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 521f050c-d68b-47f7-a6bb-770085d97b5b
📒 Files selected for processing (21)
api/v2/model.goapi/v2/model_test.godownstreamadapter/dispatcher/basic_dispatcher.godownstreamadapter/dispatcher/basic_dispatcher_active_active_test.godownstreamadapter/dispatcher/basic_dispatcher_info.godownstreamadapter/dispatcher/event_dispatcher.godownstreamadapter/dispatcher/event_dispatcher_test.godownstreamadapter/dispatcher/redo_dispatcher.godownstreamadapter/dispatcher/redo_dispatcher_test.godownstreamadapter/dispatchermanager/dispatcher_manager.godownstreamadapter/dispatchermanager/dispatcher_manager_batch_test.godownstreamadapter/dispatchermanager/dispatcher_manager_test.godownstreamadapter/eventcollector/event_collector.godownstreamadapter/eventcollector/event_collector_test.gopkg/config/changefeed.gopkg/config/changefeed_test.gopkg/config/replica_config.gopkg/config/replica_config_test.goutils/dynstream/area_batch_config_registry.goutils/dynstream/area_batch_config_registry_test.goutils/dynstream/stream.go
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@pkg/config/replica_config.go`:
- Around line 368-374: Update the misleading comment above the validation for
EventCollectorBatchCount and EventCollectorBatchBytes to accurately describe
behavior: explain that setting these fields to 0 does not disable batching but
instead causes the code to fall back to sink defaults (dispatcher_manager.go
only overrides when > 0) and that newBatchConfig in utils/dynstream/batcher.go
normalizes non-positive counts to 1, so 0 will not disable batching; reference
the config fields EventCollectorBatchCount and EventCollectorBatchBytes and the
relevant behavior in dispatcher_manager.go and
utils/dynstream/batcher.go/newBatchConfig.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 13d05519-91e4-4d90-ad55-3a1379ec4284
📒 Files selected for processing (3)
downstreamadapter/dispatchermanager/dispatcher_manager_batch_test.gopkg/config/replica_config.goutils/dynstream/stream.go
✅ Files skipped from review due to trivial changes (2)
- utils/dynstream/stream.go
- downstreamadapter/dispatchermanager/dispatcher_manager_batch_test.go
…ticdc into pr3-normal-batch-wiring
|
/test all |
…pr3-normal-batch-wiring
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@utils/dynstream/interfaces_test.go`:
- Line 26: The test currently compares settings.batchConfig to newBatchConfig(0,
-1) which reuses the same helper as the code under test; replace that
implementation-coupled assertion by asserting explicit normalized fields for
settings.batchConfig (e.g., check MaxBatchSize, BatchTimeout, and any other
exported fields) or construct an expected batchConfig struct literal with
concrete values and compare that to settings.batchConfig so the test will catch
normalization regressions; update the assertion(s) around settings.batchConfig
and remove the call to newBatchConfig in the expectation.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 134dad65-ff57-42ee-86f8-abbc971b55d7
📒 Files selected for processing (7)
api/v2/model_test.godownstreamadapter/dispatchermanager/dispatcher_manager_batch_test.godownstreamadapter/eventcollector/event_collector_test.gopkg/config/changefeed_test.gopkg/config/replica_config_test.goutils/dynstream/area_batch_config_registry_test.goutils/dynstream/interfaces_test.go
✅ Files skipped from review due to trivial changes (3)
- pkg/config/replica_config_test.go
- pkg/config/changefeed_test.go
- api/v2/model_test.go
🚧 Files skipped from review as they are similar to previous changes (2)
- utils/dynstream/area_batch_config_registry_test.go
- downstreamadapter/dispatchermanager/dispatcher_manager_batch_test.go
| settings := NewAreaSettingsWithMaxPendingSizeAndBatchConfig( | ||
| 64*1024*1024, 0, "test", 0, -1, | ||
| ) | ||
| require.Equal(t, newBatchConfig(0, -1), settings.batchConfig) |
There was a problem hiding this comment.
Assertion is implementation-coupled and may miss normalization regressions.
Line 26 builds the expected value with newBatchConfig(0, -1), which is the same helper used inside the constructor under test. This makes the test less effective at catching behavior regressions. Assert explicit normalized fields instead.
Suggested fix
func TestNewAreaSettingsWithMaxPendingSizeAndBatchConfigNormalizesBatchConfig(t *testing.T) {
settings := NewAreaSettingsWithMaxPendingSizeAndBatchConfig(
64*1024*1024, 0, "test", 0, -1,
)
- require.Equal(t, newBatchConfig(0, -1), settings.batchConfig)
+ require.Equal(t, batchConfig{
+ softCount: 1,
+ hardCount: countCapMultiple,
+ hardBytes: 0,
+ }, settings.batchConfig)
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| require.Equal(t, newBatchConfig(0, -1), settings.batchConfig) | |
| func TestNewAreaSettingsWithMaxPendingSizeAndBatchConfigNormalizesBatchConfig(t *testing.T) { | |
| settings := NewAreaSettingsWithMaxPendingSizeAndBatchConfig( | |
| 64*1024*1024, 0, "test", 0, -1, | |
| ) | |
| require.Equal(t, batchConfig{ | |
| softCount: 1, | |
| hardCount: countCapMultiple, | |
| hardBytes: 0, | |
| }, settings.batchConfig) | |
| } |
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@utils/dynstream/interfaces_test.go` at line 26, The test currently compares
settings.batchConfig to newBatchConfig(0, -1) which reuses the same helper as
the code under test; replace that implementation-coupled assertion by asserting
explicit normalized fields for settings.batchConfig (e.g., check MaxBatchSize,
BatchTimeout, and any other exported fields) or construct an expected
batchConfig struct literal with concrete values and compare that to
settings.batchConfig so the test will catch normalization regressions; update
the assertion(s) around settings.batchConfig and remove the call to
newBatchConfig in the expectation.
|
/test all |
|
/test all |
4 similar comments
|
/test all |
|
/test all |
|
/test all |
|
/test all |
|
/retest |
1 similar comment
|
/retest |
| Filter *FilterConfig `toml:"filter" json:"filter"` | ||
| MemoryQuota uint64 `toml:"memory-quota" json:"memory-quota"` | ||
| ForceReplicate bool `json:"force_replicate" default:"false"` | ||
| Filter *FilterConfig `toml:"filter" json:"filter"` |
There was a problem hiding this comment.
Why does this field need toml?
| ForceReplicate bool `json:"force_replicate" default:"false"` | ||
| Filter *FilterConfig `toml:"filter" json:"filter"` | ||
| MemoryQuota uint64 `toml:"memory-quota" json:"memory-quota"` | ||
| EventCollectorBatchCount *int `json:"event_collector_batch_count"` |
There was a problem hiding this comment.
Does it need the default value?
There was a problem hiding this comment.
No,level it to be nil, and then it can be derived from the sink configuration in the runtime.
[LGTM Timeline notifier]Timeline:
|
|
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: asddongmen, flowbehappy, wk989898 The full list of commands accepted by this bot can be found here. The pull request process is described here DetailsNeeds approval from an approver in each of these files:
Approvers can indicate their approval by writing |
What problem does this PR solve?
Issue Number: close #4777
What is changed and how it works?
This PR wires event collector batch config through the API, config, dispatcher manager, and event collector paths.
event_collector_batch_countandevent_collector_batch_bytesto the v2 replica config and internal replica configunsetversus explicit0by keeping these fields as*intin runtimeChangefeedConfigCheck List
Tests
go test ./api/v2 -run 'TestReplicaConfigConversionBatchFields$'go test ./pkg/config -run 'Test(ChangeFeedInfoToChangefeedConfigBatchFields|ReplicaConfigValidateBatchConfig)$'go test ./downstreamadapter/dispatchermanager -run 'TestDispatcherManagerBatchConfig$'Questions
Will it cause performance regression or break compatibility?
No compatibility change for existing users when these fields are unset, because the default path still uses sink derived settings. When configured, the behavior changes only for the target changefeed.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Summary by CodeRabbit
Release Notes
New Features
Tests