dispatcher,dispatchermanager: deduplicate pending done statuses #4814
hongyunyan wants to merge 2 commits into pingcap:master
Conversation
📝 Walkthrough: The PR refactors block status reporting from raw Go channels to a new bounded `BlockStatusBuffer`.
Code Review
This pull request introduces a BlockStatusBuffer and updates the BlockStatusRequestQueue to coalesce and deduplicate identical DONE block statuses. These changes are designed to reduce local memory amplification and improve the efficiency of reporting dispatcher statuses to the maintainer by replacing direct channel communication with a structured buffering and batching mechanism. I have no feedback to provide as no review comments were submitted.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/dispatchermanager/heartbeat_collector.go (1)
245-253: ⚠️ Potential issue | 🟠 Major

Only mark block-status sends complete after `SendCommand` succeeds.

`OnSendComplete` clears the queue's in-flight DONE tracking. Calling it before checking `err` treats failed sends as completed and can reopen the same DONE key for duplicate enqueueing.

🐛 Proposed fix

```diff
 err := c.mc.SendCommand(
 	messaging.NewSingleTargetMessage(
 		blockStatusRequestWithTargetID.TargetID,
 		messaging.MaintainerManagerTopic,
 		blockStatusRequestWithTargetID.Request,
 	))
-c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
 if err != nil {
 	log.Error("failed to send block status request message", zap.Error(err))
+	continue
 }
+c.blockStatusReqQueue.OnSendComplete(blockStatusRequestWithTargetID)
```

If failed sends should be retried/released, add a distinct failure path instead of using the success-completion hook.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/heartbeat_collector.go` around lines 245 - 253, The code calls blockStatusReqQueue.OnSendComplete before checking the SendCommand error, which marks the request done even on failure; move the call to blockStatusReqQueue.OnSendComplete to the success path (after c.mc.SendCommand returns nil) and add an explicit failure path that does not call OnSendComplete (and optionally logs and/or requeues/releases the blockStatusRequestWithTargetID) so failed sends remain in-flight for retry; update the logic around c.mc.SendCommand, blockStatusReqQueue.OnSendComplete, and error handling for blockStatusRequestWithTargetID.TargetID / blockStatusRequestWithTargetID.Request accordingly.
🧹 Nitpick comments (3)
maintainer/barrier_event.go (1)
113-114: Nit: `lastStatusReceivedTime` initialized to `time.Now()` implies a status was just received.

Semantically, this field represents the last time a dispatcher reported progress for this barrier; seeding it with `time.Now()` at construction (before any dispatcher has reported) is a small white lie. It doesn't cause incorrect behavior today because `passActionSent` starts `false`, so the first pass-action branch entry always sends and the quiet-window check is bypassed. Still, initializing to the zero value (or skipping assignment) would match the intent and avoid accidental suppression if `passActionSent` ever gets flipped via some future path before a real status arrives.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@maintainer/barrier_event.go` around lines 113 - 114, the struct field `lastStatusReceivedTime` should not be initialized to `time.Now()` in the constructor; instead set it to the zero time (or omit the assignment) so it accurately represents "no status received yet". Update the initialization in barrier_event.go where `lastStatusReceivedTime` and `lastWarningLogTime` are set to leave `lastStatusReceivedTime` at its zero value while leaving `lastWarningLogTime` as appropriate, ensuring existing `passActionSent` handling still triggers the first pass action.

downstreamadapter/dispatcher/block_status_buffer.go (1)
59-150: Back-pressure and dedupe-window semantics worth documenting.

Two related subtleties that are correct as-is but should be understood by future readers:

Offer blocks on a full queue while holding a reserved key. `reserveDone` at Line 74/92 marks the key in `pendingDone` before the blocking `b.queue <- ...` send. If the dispatcher manager stops draining (shutdown, stall), every subsequent `OfferDone` for the same key will silently be dropped (since `reserveDone` returns false), while the original offerer is blocked on the channel send. That's acceptable because the dropped entries would be redundant with the one that's already queued, but it does mean liveness of DONE reporting is tied to the manager's `TakeBlockStatus` loop.

Narrow race between dequeue and `pendingDone` delete. In `materialize` (Line 130-138) the key is removed from `pendingDone` after the entry is pulled off the channel. During that tiny window, a concurrent identical `OfferDone` will see the key still reserved and be dropped. This is fine (the taken entry will be delivered downstream), but strictly speaking it means "identical DONE after the previous has already been dequeued" can be lost if timed exactly within that window. The downstream consumer (maintainer) is idempotent w.r.t. duplicate DONE acks, so this doesn't cause incorrect behavior; a short comment here would help the next reader.

`Len()` does not include in-flight or pending. The metric wired up in `collectBlockStatusRequest` uses `b.Len()` = `len(channel)`, which excludes the one slot consumed between `<-b.queue` and `materialize`. Fine for queue-depth monitoring, just flagging in case anyone correlates this with `pendingDone` size later.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/block_status_buffer.go` around lines 59 - 150, add short clarifying comments documenting the back-pressure and dedupe-window semantics: in `Offer`/`OfferDone` (around where `reserveDone` is called) note that `reserveDone` marks `pendingDone` before the potentially-blocking send, so offers for the same key will be dropped while the original sender blocks; in `materialize` (where `delete(b.pendingDone, key)` occurs) document the narrow race where an identical `OfferDone` arriving between channel receive and the `pendingDone` deletion will be dropped and why that is acceptable; and next to `Len()` note that it returns only the channel length and does not include the slot consumed in-flight or entries tracked in `pendingDone`.

downstreamadapter/dispatcher/basic_dispatcher_info.go (1)
282-290: Nit: the `ok bool` return in `TakeBlockStatusWithTimeout` is redundant with `status != nil`.

`BlockStatusBuffer.Take` only returns `nil` on `ctx.Done()`, so `status == nil` and `ok == false` are equivalent here. Either drop the bool and have callers check `nil`, or document the intent (e.g., "ok=false on timeout, reserved for future non-nil sentinel values"). Minor; purely for API clarity.

Proposed simplification

```diff
-func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) (*heartbeatpb.TableSpanBlockStatus, bool) {
-	ctx, cancel := context.WithTimeout(context.Background(), timeout)
-	defer cancel()
-	status := s.TakeBlockStatus(ctx)
-	if status == nil {
-		return nil, false
-	}
-	return status, true
-}
+func (s *SharedInfo) TakeBlockStatusWithTimeout(timeout time.Duration) *heartbeatpb.TableSpanBlockStatus {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	return s.TakeBlockStatus(ctx)
+}
```

Note: this changes the public signature; test helpers (`event_dispatcher_test.go`, `basic_dispatcher_active_active_test.go`) that consume the `bool` return would need updating.
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatcher/basic_dispatcher_info.go` around lines 282 - 290, The method TakeBlockStatusWithTimeout currently returns (*heartbeatpb.TableSpanBlockStatus, bool) where the bool is redundant because BlockStatusBuffer.Take only returns nil on ctx.Done(); change TakeBlockStatusWithTimeout to return only (*heartbeatpb.TableSpanBlockStatus) (return nil on timeout) by removing the bool, update its implementation (remove the ok handling) and then update all callers and test helpers that expect the two-value signature (notably event_dispatcher_test.go and basic_dispatcher_active_active_test.go) to check for nil instead of checking the bool; ensure any exported documentation/comments reflect that nil means timeout/closed.
📒 Files selected for processing (15)

- downstreamadapter/dispatcher/basic_dispatcher.go
- downstreamadapter/dispatcher/basic_dispatcher_active_active_test.go
- downstreamadapter/dispatcher/basic_dispatcher_info.go
- downstreamadapter/dispatcher/block_status_buffer.go
- downstreamadapter/dispatcher/block_status_buffer_test.go
- downstreamadapter/dispatcher/event_dispatcher_test.go
- downstreamadapter/dispatcher/helper.go
- downstreamadapter/dispatcher/redo_dispatcher_test.go
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_test.go
- downstreamadapter/dispatchermanager/heartbeat_collector.go
- downstreamadapter/dispatchermanager/heartbeat_queue.go
- downstreamadapter/dispatchermanager/heartbeat_queue_test.go
- maintainer/barrier_event.go
- maintainer/barrier_event_test.go
```diff
 for {
 	blockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0)
 	redoBlockStatusMessage := make([]*heartbeatpb.TableSpanBlockStatus, 0)
-	select {
-	case <-ctx.Done():
+	blockStatus := e.sharedInfo.TakeBlockStatus(ctx)
+	if blockStatus == nil {
 		return
-	case blockStatus := <-e.sharedInfo.GetBlockStatusesChan():
-		if common.IsDefaultMode(blockStatus.Mode) {
-			blockStatusMessage = append(blockStatusMessage, blockStatus)
-		} else {
-			redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus)
-		}
-		delay.Reset(10 * time.Millisecond)
-	loop:
-		for {
-			select {
-			case blockStatus := <-e.sharedInfo.GetBlockStatusesChan():
-				if common.IsDefaultMode(blockStatus.Mode) {
-					blockStatusMessage = append(blockStatusMessage, blockStatus)
-				} else {
-					redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus)
-				}
-			case <-delay.C:
-				break loop
-			}
-		}
 	}
+	if common.IsDefaultMode(blockStatus.Mode) {
+		blockStatusMessage = append(blockStatusMessage, blockStatus)
+	} else {
+		redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus)
+	}
+
+	batchCtx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
+	for {
+		blockStatus = e.sharedInfo.TakeBlockStatus(batchCtx)
+		if blockStatus == nil {
+			break
+		}
+		if common.IsDefaultMode(blockStatus.Mode) {
+			blockStatusMessage = append(blockStatusMessage, blockStatus)
+		} else {
+			redoBlockStatusMessage = append(redoBlockStatusMessage, blockStatus)
+		}
+	}
+	cancel()
+	if ctx.Err() != nil {
+		return
+	}

-	e.metricBlockStatusesChanLen.Set(float64(len(e.sharedInfo.GetBlockStatusesChan())))
+	e.metricBlockStatusesChanLen.Set(float64(e.sharedInfo.BlockStatusLen()))
 	if len(blockStatusMessage) != 0 {
 		enqueueBlockStatus(blockStatusMessage, common.DefaultMode)
 	}
 	if len(redoBlockStatusMessage) != 0 {
 		enqueueBlockStatus(redoBlockStatusMessage, common.RedoMode)
 	}
 }
```
Minor: first-taken status is dropped if the parent context cancels during the 10ms batch window.
If ctx is canceled after the outer TakeBlockStatus(ctx) at Line 578 returns a non-nil status but before or during the batch loop, the if ctx.Err() != nil { return } at Line 601 returns without calling enqueueBlockStatus. The first status is effectively dropped. The old timer-based implementation had an equivalent drop window on shutdown, so this isn't a regression, but it's worth being aware of — if you want to be strict about not losing any status acks during graceful shutdown, you could perform the enqueueBlockStatus flush first and then check ctx.Err() for whether to continue the loop.
The rest of the refactor looks good: Take(batchCtx) returning nil correctly terminates the batch on both timeout and parent cancellation, and materialize only runs on successful dequeue so the dropped select branch just leaves the entry queued for the next iteration.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/dispatchermanager/dispatcher_manager.go` around lines 575 -
613, The first-taken status returned by e.sharedInfo.TakeBlockStatus(ctx) can be
dropped if ctx is cancelled during the 10ms batching window because the code
checks ctx.Err() and returns before calling enqueueBlockStatus; fix by moving
the enqueue logic ahead of the ctx.Err() check so you always flush
blockStatusMessage and redoBlockStatusMessage (call enqueueBlockStatus for
common.DefaultMode and common.RedoMode when their slices are non-empty) before
returning on parent context cancellation; keep the batching loop and metric
update (e.metricBlockStatusesChanLen.Set(e.sharedInfo.BlockStatusLen())) as-is
but ensure enqueueBlockStatus is invoked prior to checking ctx.Err() after
cancel(), referencing TakeBlockStatus, enqueueBlockStatus, blockStatusMessage,
redoBlockStatusMessage, ctx, and batchCtx.
```go
		if be.passActionSent && be.isFanoutPassAction() &&
			now.Sub(be.lastStatusReceivedTime) < fanoutPassResendQuietInterval {
			return nil
		}
		be.passActionSent = true
		be.lastResendTime = now
		return be.sendPassAction(mode)
	}
	return msgs
}

func (be *BarrierEvent) isFanoutPassAction() bool {
	if be.blockedDispatchers == nil {
		return false
	}
	return be.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_All ||
		be.blockedDispatchers.InfluenceType == heartbeatpb.InfluenceType_DB
}
```
Fanout quiet interval timing: equal to the outer resend gate, limiting practical suppression effect.
fanoutPassResendQuietInterval at line 35 is 1s, identical to the minimum interval enforced at line 597 (now.Sub(be.lastResendTime) < time.Second). When the quiet window check runs at line 695, the constraint now.Sub(be.lastResendTime) >= 1s is guaranteed, so suppression only fires when a status update arrived strictly between the last resend and now. This works correctly but means the quiet window's benefit is limited—any batching delay in the dispatcher manager will make this check a no-op in practice, since both gates are synchronized to the same 1s cadence.
Additionally, at line 697, be.passActionSent = true is assigned before be.sendPassAction(mode) is called. For InfluenceType_DB with an empty span set (which invokes rangeChecker.MarkCovered() and returns nil), the flag is still flipped. This is harmless because the event is considered covered and the resend loop will exit via higher-level lifecycle, but worth tracking in case future code paths need to reset this flag to re-send.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@maintainer/barrier_event.go` around lines 693 - 710, The quiet-window check
uses fanoutPassResendQuietInterval which equals the hard resend gate (1s),
making suppression ineffective; also passActionSent is set before
sendPassAction(mode) completes so it may be flipped even when no action is
actually sent (e.g., InfluenceType_DB empty span). Fix by making
fanoutPassResendQuietInterval strictly larger than the resend gate or change the
suppression check to compare now.Sub(be.lastResendTime) against
fanoutPassResendQuietInterval consistently, and move the be.passActionSent =
true assignment to after sendPassAction(mode) only when sendPassAction reports
an actual send (i.e., only mark passActionSent when sendPassAction indicates
success/that a message was emitted). Ensure references:
fanoutPassResendQuietInterval, isFanoutPassAction(), passActionSent,
lastResendTime, sendPassAction(mode).
What problem does this PR solve?
Issue Number: ref #0
In large table-count DDL scenarios, maintainer responses can cause repeated local DONE statuses to accumulate before the dispatcher side drains them. The previous implementation materialized a fresh protobuf object for every repeated DONE and queued all of them, which amplified memory usage with a very large number of identical small objects.
What is changed and how it works?
Background:
Motivation:
Summary:
- Introduce a `BlockStatusBuffer` between dispatchers and dispatcher manager to keep ordering while coalescing identical pending DONE statuses.
- Deduplicate in `BlockStatusRequestQueue` so identical DONE statuses that are already queued or in flight are not enqueued again.
- … `mode` and `isSyncPoint`.
Tests
Unit test
`go test ./downstreamadapter/dispatchermanager`

Questions
Will it cause performance regression or break compatibility?
No protocol or semantic change is introduced. The change only coalesces identical pending DONE statuses locally and delays protobuf materialization until drain time. This is intended to reduce memory pressure and queue amplification on the hot path.
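The "delay protobuf materialization until drain time" idea can be sketched as follows. The `statusKey`, `message`, and `buildMessage` names below are hypothetical stand-ins for the real heartbeatpb types and constructors:

```go
package main

import "fmt"

// statusKey is a compact, comparable key stored in place of a full message.
type statusKey struct {
	dispatcherID string
	commitTs     uint64
}

// message stands in for the protobuf object materialized at drain time.
type message struct {
	DispatcherID string
	CommitTs     uint64
}

func buildMessage(k statusKey) *message {
	return &message{DispatcherID: k.dispatcherID, CommitTs: k.commitTs}
}

func main() {
	// Repeated identical DONE reports collapse into one compact key;
	// no message object is allocated per repeat.
	pending := map[statusKey]bool{}
	for i := 0; i < 1000; i++ {
		pending[statusKey{"d1", 42}] = true
	}

	// Materialization happens once per distinct key, at drain time.
	var drained []*message
	for k := range pending {
		drained = append(drained, buildMessage(k))
	}
	fmt.Println(len(drained)) // 1
}
```

This is why the change reduces memory pressure without altering the wire protocol: the same single message still reaches the maintainer, only the thousand intermediate copies are never built.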
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note
Summary by CodeRabbit
Refactor
Bug Fixes