maintainer,dispatcherorchestrator: guard direct messages with maintainer epoch #4604

hongyunyan wants to merge 7 commits into pingcap:master
Conversation
📝 Walkthrough

Dispatcher and maintainer messaging became epoch-aware: the Maintainer generates an epoch, and heartbeat messages carry it.
Sequence Diagram(s)

sequenceDiagram
participant Maintainer as Maintainer (node)
participant Orchestrator as DispatcherOrchestrator
participant Manager as DispatcherManager
participant Dispatcher as Dispatcher
rect rgba(200,200,255,0.5)
Maintainer->>Orchestrator: MaintainerBootstrapRequest { maintainer_epoch }
end
rect rgba(200,255,200,0.5)
Orchestrator->>Manager: lookup or NewDispatcherManager(..., cfConfig.Epoch)
Note right of Manager: Manager.GetActiveMaintainerEpoch()\nManager.GetChangefeedEpoch()
Manager-->>Orchestrator: activeEpoch / changefeedEpoch
end
rect rgba(255,200,200,0.5)
Orchestrator->>Manager: SetActiveMaintainer(from, req.MaintainerEpoch) (on takeover/accept)
Manager-->>Orchestrator: ack
end
rect rgba(200,255,255,0.5)
Orchestrator->>Dispatcher: create/bootstrap dispatcher (includes maintainer_epoch)
Dispatcher-->>Orchestrator: bootstrap response { maintainer_epoch, success/fail }
Orchestrator->>Maintainer: MaintainerBootstrapResponse { maintainer_epoch, result }
end
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~45 minutes
Code Review
This pull request introduces a maintainer epoch mechanism to uniquely identify maintainer instances and ensure the consistency of the request-response path. By including a maintainer_epoch in heartbeat messages, the system can now detect and drop stale requests or responses that occur during maintainer migration or ownership changes. Changes include updates to protobuf definitions, the Maintainer and DispatcherManager structures, and the orchestration logic to validate epochs. A critical issue was identified regarding a potential race condition in SetActiveMaintainer, where the lack of an atomic check-and-set could allow the active epoch to be updated with a stale value, breaking monotonicity guarantees.
```go
func (e *DispatcherManager) SetActiveMaintainer(maintainerID node.ID, maintainerEpoch uint64) {
	e.meta.Lock()
	defer e.meta.Unlock()
	e.meta.maintainerID = maintainerID
	e.meta.activeMaintainerEpoch = maintainerEpoch
}
```
There was a problem hiding this comment.
There is a potential race condition here. SetActiveMaintainer unconditionally sets the maintainer epoch. Callers in DispatcherOrchestrator perform a read-modify-write pattern without holding a lock across the operation, which can lead to setting a stale (older) epoch if there are concurrent requests. This could move the activeMaintainerEpoch backward, breaking the epoch guarantee.
To fix this, the check and set should be atomic within this function to ensure the epoch is always monotonically increasing.
```go
func (e *DispatcherManager) SetActiveMaintainer(maintainerID node.ID, maintainerEpoch uint64) {
	e.meta.Lock()
	defer e.meta.Unlock()
	if maintainerEpoch > e.meta.activeMaintainerEpoch {
		e.meta.maintainerID = maintainerID
		e.meta.activeMaintainerEpoch = maintainerEpoch
	}
}
```
Maintainer ownership transitions are serialized by DispatcherOrchestrator.handleMessages: RecvMaintainerRequest only enqueues requests, and a single consumer goroutine applies the epoch admission checks before calling SetActiveMaintainer(...). There is no concurrent read-modify-write path across these call sites.
This update also adds GetActiveMaintainer() so the active maintainer ID and epoch are read from one metadata snapshot during admission. Making SetActiveMaintainer monotonic inside the setter would blur the phase-specific policy and would break the legacy-compatible activeEpoch == 0 && requestEpoch == 0 owner handoff, where ownership may change without increasing the epoch.
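To make the policy under discussion concrete, here is a standalone sketch of the admission decision (a hypothetical function with string node IDs, not the PR's actual API; the real checks live in shouldAcceptCloseRequest and the orchestrator call sites):

```go
package main

import "fmt"

// acceptOwnership is a hypothetical sketch of the admission policy discussed
// above: it reports whether a request carrying requestEpoch from requestID
// may act as (or take over from) the active maintainer.
func acceptOwnership(activeID, requestID string, activeEpoch, requestEpoch uint64) bool {
	switch {
	case activeEpoch == 0 && requestEpoch == 0:
		// Legacy-compatible handoff: neither side uses epochs, so
		// ownership may change hands without increasing the epoch.
		return true
	case requestEpoch > activeEpoch:
		// Newer maintainer instance: accept and take over ownership.
		return true
	case requestEpoch == activeEpoch:
		// Same generation: only the current owner may act.
		return requestID == activeID
	default:
		// requestEpoch < activeEpoch: stale message, drop it.
		return false
	}
}

func main() {
	fmt.Println(acceptOwnership("node-1", "node-2", 2, 3)) // takeover: true
	fmt.Println(acceptOwnership("node-1", "node-2", 2, 2)) // wrong owner: false
	fmt.Println(acceptOwnership("node-1", "node-2", 2, 0)) // legacy request vs strict owner: false
}
```

Note that the legacy 0/0 case is exactly why an unconditionally monotonic SetActiveMaintainer would be too strict: ownership can legitimately change while the epoch stays at zero.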
Actionable comments posted: 1
🧹 Nitpick comments (4)
downstreamadapter/dispatchermanager/dispatcher_manager_info.go (1)
73-78: Consider retiring SetMaintainerID now that ID and epoch move together.

SetActiveMaintainer establishes the new invariant, but the old exported one-field setter still makes it easy to reintroduce partial updates later. I'd strongly consider deleting or unexporting SetMaintainerID once callers are migrated.

maintainer/maintainer.go (1)
927-931: Consider log level for common bootstrap states.

When responses == nil, this is a normal condition indicating bootstrap is not yet complete. Logging at Info level for every bootstrap response before completion could create excessive noise in high-changefeed environments. Consider using Debug level when responses == nil and Info level only when m.removing.Load() is true.

♻️ Proposed adjustment

```diff
 	if responses == nil || m.removing.Load() {
-		log.Info("bootstrap responses is nil or maintainer is removing, ignore it",
-			zap.Stringer("changefeedID", m.changefeedID),
-			zap.Bool("removing", m.removing.Load()))
+		if m.removing.Load() {
+			log.Info("maintainer is removing, ignore bootstrap responses",
+				zap.Stringer("changefeedID", m.changefeedID))
+		} else {
+			log.Debug("bootstrap not complete, ignore bootstrap responses",
+				zap.Stringer("changefeedID", m.changefeedID))
+		}
 		return
 	}
```

downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go (2)
275-278: Consider whether epoch mismatch should halt bootstrap.

The check logs an error when manager.GetChangefeedEpoch() != cfConfig.Epoch but continues with bootstrap. If this truly "should not happen" and indicates a serious inconsistency, consider returning an error response instead of proceeding. Alternatively, if continuing is intentional, consider reducing the log severity or adding a comment explaining why it's safe to proceed.
392-423: Epoch handling in close request is correct but complex.

The logic correctly handles three cases: stale requests, epoch takeover, and ID mismatch. The mutex unlocking before early returns is handled correctly. However, the nested conditionals with multiple early return paths while holding a mutex increase cognitive complexity.

♻️ Consider extracting epoch validation to reduce nesting

```diff
 	m.mutex.Lock()
 	if manager, ok := m.dispatcherManagers[cfId]; ok {
 		activeEpoch := manager.GetActiveMaintainerEpoch()
-		if !shouldAcceptCloseRequest(activeEpoch, req.MaintainerEpoch) {
-			m.mutex.Unlock()
-			log.Info("drop stale close request",
-				zap.Stringer("changefeedID", cfId),
-				zap.Stringer("maintainerID", from),
-				zap.Uint64("requestEpoch", req.MaintainerEpoch),
-				zap.Uint64("activeEpoch", activeEpoch))
-			return nil
-		}
-		if req.MaintainerEpoch > activeEpoch {
-			manager.SetActiveMaintainer(from, req.MaintainerEpoch)
-		} else if manager.GetMaintainerID() != from {
-			m.mutex.Unlock()
-			log.Warn("drop close request from unexpected maintainer",
-				zap.Stringer("changefeedID", cfId),
-				zap.Stringer("maintainerID", from),
-				zap.Stringer("activeMaintainerID", manager.GetMaintainerID()),
-				zap.Uint64("maintainerEpoch", req.MaintainerEpoch))
-			return nil
-		}
+		accepted, updated := validateAndUpdateMaintainer(manager, from, req.MaintainerEpoch, activeEpoch)
+		if !accepted {
+			m.mutex.Unlock()
+			// log appropriately based on reason
+			return nil
+		}
 		if closed := manager.TryClose(req.Removed); closed {
```

This is optional as the current implementation is correct.
Inline comment — downstreamadapter/dispatchermanager/dispatcher_manager.go, around lines 221-223:

The code sets manager.meta.activeMaintainerEpoch = 0 during bootstrap, leaving a stale epoch when the maintainer ID doesn't change. Whenever a new request provides a different maintainer epoch (req.MaintainerEpoch), advance manager.meta.activeMaintainerEpoch to that value even if GetMaintainerID() == from. Specifically, ensure SetActiveMaintainer (or the bootstrap/takeover path in dispatcher_orchestrator.go that currently only checks GetMaintainerID()) also compares and updates the epoch field when req.MaintainerEpoch differs, and keep the SetActiveMaintainer/GetMaintainerID calls consistent with the manager.meta.changefeedEpoch update (cfConfig.Epoch) to avoid leaving the epoch at 0.
⛔ Files ignored due to path filters (1)

- heartbeatpb/heartbeat.pb.go is excluded by !**/*.pb.go

📒 Files selected for processing (10)

- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
- downstreamadapter/dispatcherorchestrator/helper.go
- heartbeatpb/heartbeat.proto
- maintainer/maintainer.go
- maintainer/maintainer_manager.go
- maintainer/maintainer_manager_test.go
- maintainer/maintainer_test.go
🧹 Nitpick comments (1)
downstreamadapter/dispatchermanager/dispatcher_manager_info.go (1)
73-80: Remove unused SetMaintainerID method to avoid confusion.

The verification found that SetMaintainerID (lines 55-59) has no callers in the codebase, while SetActiveMaintainer is consistently used in all three call sites. Since the atomic update of both maintainerID and activeMaintainerEpoch is now the standard pattern enforced by SetActiveMaintainer, the unused SetMaintainerID should be removed entirely rather than kept as dead code that could mislead future contributors.
📒 Files selected for processing (5)

- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- downstreamadapter/dispatcherorchestrator/helper.go
- maintainer/maintainer.go
- maintainer/maintainer_manager.go
✅ Files skipped from review due to trivial changes (1)
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
🚧 Files skipped from review as they are similar to previous changes (2)
- downstreamadapter/dispatcherorchestrator/helper.go
- maintainer/maintainer.go
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go (1)
525-541: ⚠️ Potential issue | 🟠 Major — Use the phase-matching response type for post-bootstrap failures.

handleDispatcherError always constructs MaintainerBootstrapResponse, but handlePostBootstrapRequest calls it on lines 385 and 392. Since maintainer/maintainer_manager.go routes bootstrap and post-bootstrap responses separately, those initialization failures become the wrong protocol event. Please split this helper or parameterize the response type so post-bootstrap errors return MaintainerPostBootstrapResponse.
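The requested split can be sketched with placeholder types (these stand in for the heartbeatpb messages; all names here are hypothetical, not the PR's actual API):

```go
package main

import "fmt"

// Placeholder response types standing in for the heartbeatpb messages.
type maintainerBootstrapResponse struct{ err string }
type maintainerPostBootstrapResponse struct{ err string }

// buildErrorResponse picks the phase-matching response type so a
// post-bootstrap initialization failure is never reported as a bootstrap
// response, which the maintainer side would route to the wrong handler.
func buildErrorResponse(postBootstrap bool, errMsg string) any {
	if postBootstrap {
		return &maintainerPostBootstrapResponse{err: errMsg}
	}
	return &maintainerBootstrapResponse{err: errMsg}
}

func main() {
	fmt.Printf("%T\n", buildErrorResponse(false, "init failed"))
	fmt.Printf("%T\n", buildErrorResponse(true, "init failed"))
}
```

Either a boolean/phase parameter like this or two small helpers works; the key design point is that the caller's protocol phase, not a single shared helper, decides the wire type.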
Inline comments:

In downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go, around lines 332-353: Split the non-strict maintainer-epoch branch so legacy zero-epoch requests are validated against ownership while intentional takeover attempts are handled separately. Inside the block where shouldUseStrictMaintainerEpoch(activeEpoch, req.MaintainerEpoch) is false, branch on req.MaintainerEpoch == 0 versus activeEpoch == 0 && req.MaintainerEpoch > 0: for req.MaintainerEpoch == 0, require manager.GetMaintainerID() == from (drop otherwise) to preserve the legacy ownership check; for the activeEpoch == 0 && req.MaintainerEpoch > 0 path, apply the takeover logic (e.g., allow after the shouldAcceptPostBootstrapRequest check) rather than collapsing both cases together.

In downstreamadapter/dispatcherorchestrator/helper.go, around lines 95-104: The current replace logic (using getRequestMaintainerEpoch and assigning pending[key]) allows a newer-epoch request to overwrite pending[key] without re-queuing, so if handleMessages has already fetched the old key, its Done call will delete the replacement. When replacing pending[key] with a higher epoch, either re-enqueue the key (or mark it for reprocessing) unless the in-flight generation is newer, or implement a generation-aware Done that only removes pending[key] if the generation/epoch it is finishing matches the stored epoch.
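A generation-aware Done, one of the two fixes suggested above, can be sketched as a self-contained pending map (hypothetical type; the real helper.go stores full messages keyed by changefeed, not bare epochs):

```go
package main

import (
	"fmt"
	"sync"
)

// pendingQueue is a hypothetical sketch: Done only removes an entry when the
// epoch it is finishing still matches what is stored, so a replacement that
// was enqueued while the old request was in flight survives the old Done call.
type pendingQueue struct {
	mu      sync.Mutex
	pending map[string]uint64 // key -> epoch of the stored request
}

func newPendingQueue() *pendingQueue {
	return &pendingQueue{pending: make(map[string]uint64)}
}

// Put stores the request only if it is not older than what is already pending.
func (q *pendingQueue) Put(key string, epoch uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if cur, ok := q.pending[key]; !ok || epoch > cur {
		q.pending[key] = epoch
	}
}

// Done removes the entry only when the finishing epoch still matches.
func (q *pendingQueue) Done(key string, epoch uint64) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if cur, ok := q.pending[key]; ok && cur == epoch {
		delete(q.pending, key)
	}
}

func main() {
	q := newPendingQueue()
	q.Put("cf-1", 1)
	q.Put("cf-1", 2)  // replacement with a newer epoch while epoch 1 is in flight
	q.Done("cf-1", 1) // old Done must not delete the replacement
	fmt.Println(q.pending["cf-1"]) // prints 2
}
```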
📒 Files selected for processing (5)

- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
- downstreamadapter/dispatcherorchestrator/helper.go
- maintainer/maintainer_manager.go
- maintainer/maintainer_manager_test.go
✅ Files skipped from review due to trivial changes (1)
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
🚧 Files skipped from review as they are similar to previous changes (1)
- maintainer/maintainer_manager_test.go
🧹 Nitpick comments (3)
downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go (2)
101-117: Add defensive length check before accessing msg.Message[0].

If a message arrives with an empty Message slice, accessing msg.Message[0] at line 103 would cause a panic. While the message center should always populate this field, a defensive check improves robustness against malformed messages.

🛡️ Proposed fix

```diff
 func getPendingMessageKey(msg *messaging.TargetMessage) (pendingMessageKey, bool) {
+	if len(msg.Message) == 0 {
+		return pendingMessageKey{}, false
+	}
 	var changefeedID *heartbeatpb.ChangefeedID
 	switch req := msg.Message[0].(type) {
```
127-131: Add nil check for message retrieved from queue.

After Pop(), Get(key) could theoretically return nil if there's a race condition or internal inconsistency. The subsequent access to msg.Message[0] would panic in that case.

🛡️ Proposed defensive check

```diff
 		msg := m.msgQueue.Get(key)
+		if msg == nil || len(msg.Message) == 0 {
+			log.Warn("message missing after pop, skip processing",
+				zap.String("changefeedID", key.changefeedID.String()),
+				zap.String("msgType", key.msgType.String()))
+			m.msgQueue.Done(key)
+			continue
+		}
 		// Process the message
 		switch req := msg.Message[0].(type) {
```

downstreamadapter/dispatcherorchestrator/helper.go (1)
97-102: Consider adding a length guard before accessing Message[0].

If a MaintainerCloseRequest message is enqueued with an empty Message slice, accessing Message[0] at lines 98-99 would panic. While production code paths likely always populate this field, a defensive check would improve robustness.

🛡️ Proposed defensive check

```diff
 	if oldMsg == nil || newMsg == nil {
 		return false
 	}
+	if len(oldMsg.Message) == 0 || len(newMsg.Message) == 0 {
+		return false
+	}
 	oldReq, ok1 := oldMsg.Message[0].(*heartbeatpb.MaintainerCloseRequest)
```
📒 Files selected for processing (4)

- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator_test.go
- downstreamadapter/dispatcherorchestrator/helper.go
🚧 Files skipped from review as they are similar to previous changes (1)
- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
What problem does this PR solve?
Issue Number: ref #4604
What is changed and how it works?
This PR adds maintainer_epoch to the direct maintainer bootstrap/post-bootstrap/close request-response path so a maintainer instance can reject stale direct messages after ownership changes.

The final version keeps the solution focused on ownership admission instead of queue tricks:

- Direct requests carry maintainer_epoch, and the matching direct responses echo it back.
- Messages are dropped when their maintainer_epoch no longer matches the current maintainer instance.
- Post-bootstrap failures now reply with MaintainerPostBootstrapResponse instead of the bootstrap response type.

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
No expected performance regression. The change only touches the direct maintainer bootstrap/post-bootstrap/close admission path.
Mixed-version compatibility is kept by treating zero epoch as legacy traffic, while still preventing legacy requests from taking ownership away from an already established strict owner.
Do you need to update user documentation, design documentation or monitoring documentation?
No.
Release note