coordinator,maintainer: add session epoch handoff #4816
hongyunyan wants to merge 9 commits into pingcap:master
Conversation
Adding the "do-not-merge/release-note-label-needed" label because no release-note block was detected; please follow our release note process to remove it. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
[APPROVALNOTIFIER] This PR is NOT APPROVED. Needs approval from an approver in each of these files; approvers can indicate their approval by writing an approval comment. The full list of commands accepted by this bot can be found here.
Note: Reviews paused. This branch appears to be under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review.
📝 Walkthrough
Adds a PD-backed monotonic maintainer session epoch and propagates it through operator allocation, controller flows, heartbeat protobufs, maintainer runtime, dispatcher managers/orchestrator, scheduler, and tests; introduces session-gating validation for inbound messages and epoch-aware operator/stop semantics.
Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant Controller
    participant OperatorController
    participant PD as PDClient
    participant Changefeed
    participant Maintainer
    participant DispatcherManager
    Controller->>OperatorController: request NewAdd/NewMove/NewStop operator
    OperatorController->>PD: GenerateStrictSessionEpoch(ctx, lastIssued)
    PD-->>OperatorController: sessionEpoch
    OperatorController->>Changefeed: construct operator with sessionEpoch
    OperatorController->>OperatorController: AddOperator(op)
    OperatorController->>Maintainer: send Add/RemoveMaintainerRequest(SessionEpoch)
    Maintainer->>Maintainer: shouldAcceptDispatcherMessage(incomingSessionEpoch)
    alt accepted
        Maintainer->>DispatcherManager: bootstrap/post/heartbeat (SessionEpoch)
        DispatcherManager->>DispatcherManager: AcceptBootstrapSession/AcceptMaintainerSession
        alt accepted
            DispatcherManager-->DispatcherManager: apply update
        else rejected
            DispatcherManager-->>Maintainer: drop/reject (log reason)
        end
    else rejected
        Maintainer-->>OperatorController: ignore/drop (log)
    end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 1 | ❌ 2
❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (1 passed)
Code Review
This pull request implements a session-aware communication protocol between TiCDC components to ensure system consistency by filtering stale control messages. It introduces a monotonically increasing session epoch, generated via PD TSO, which is carried by heartbeat and bootstrap messages and validated by receiving handlers. The review feedback identifies several improvement opportunities: extending session validation to status reports sent from the maintainer to the coordinator, refining bootstrap logic to avoid suppressing maintainer change logs, enhancing nil safety and efficiency in changefeed lookups, resolving package name shadowing, and ensuring network-bound TSO requests use contexts with appropriate timeouts to prevent scheduler blocking.
```go
logCoordinatorResolvedTs *atomic.Uint64
// currentMaintainerSessionEpoch is runtime-only coordinator state.
// It tracks the maintainer session that has already become active.
currentMaintainerSessionEpoch *atomic.Uint64
```
While currentMaintainerSessionEpoch is added to track the active session in the coordinator, it appears that heartbeatpb.MaintainerStatus (the message sent from Maintainer to Coordinator) has not been updated to include the session epoch. This means the Coordinator cannot verify if incoming status reports belong to the current active session, which could allow stale reports from a previous session to mutate the Coordinator's state machine or cause operators to finish prematurely. Consider adding session_epoch to MaintainerStatus and validating it in the Coordinator's heartbeat handler.
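The gate this comment asks for could look like the sketch below. Only the field name `currentMaintainerSessionEpoch` comes from the diff; the `coordinator` stand-in, `shouldAcceptStatus`, and the legacy-zero rule (mirroring the dispatcher-side gates elsewhere in this PR) are assumptions, not the PR's code:

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// coordinator is a stripped-down stand-in for the real Coordinator; only the
// currentMaintainerSessionEpoch field mirrors the diff above.
type coordinator struct {
	currentMaintainerSessionEpoch atomic.Uint64
}

// shouldAcceptStatus gates an incoming MaintainerStatus by its (proposed)
// session_epoch field. Epoch 0 is let through for legacy senders; anything
// older than the active session is dropped.
func (c *coordinator) shouldAcceptStatus(statusEpoch uint64) (bool, string) {
	current := c.currentMaintainerSessionEpoch.Load()
	switch {
	case statusEpoch == 0:
		return true, "legacy"
	case statusEpoch < current:
		return false, fmt.Sprintf("stale:%d<%d", statusEpoch, current)
	default:
		return true, "current"
	}
}
```

With this in the heartbeat handler, a status report from a superseded maintainer session would be logged and dropped instead of mutating the coordinator's state machine.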
```go
if e.meta.maintainerID != maintainerID {
	e.meta.maintainerID = maintainerID
}
```
Updating e.meta.maintainerID inside AcceptBootstrapSession when the session epoch matches the current one will cause the subsequent check in DispatcherOrchestrator.handleBootstrapRequest (manager.GetMaintainerID() != from) to always be false. This suppresses the "maintainer changed" log message and any associated monitoring logic in the orchestrator for session-aware bootstraps. It is better to let the orchestrator handle the ID update and logging consistently, or have this method return a flag indicating if the ID was changed.
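One way to realize "return a flag indicating if the ID was changed" is sketched below, so the orchestrator keeps ownership of the "maintainer changed" log. The types and method here are illustrative stand-ins, not the real DispatcherManager API:

```go
package main

// manager is a minimal stand-in for DispatcherManager; only the
// accepted/idChanged return contract is the point of this sketch.
type nodeID string

type manager struct {
	maintainerID    nodeID
	maintainerEpoch uint64
}

// acceptBootstrapSession returns (accepted, idChanged). The caller (the
// orchestrator) decides whether to log the maintainer change, instead of this
// method silently absorbing the ID update.
func (m *manager) acceptBootstrapSession(id nodeID, epoch uint64) (accepted bool, idChanged bool) {
	if epoch < m.maintainerEpoch {
		return false, false // stale session: leave state untouched
	}
	idChanged = m.maintainerID != id
	m.maintainerID = id
	m.maintainerEpoch = epoch
	return true, idChanged
}
```

The orchestrator can then log `maintainer changed` whenever `idChanged` is true, preserving the monitoring signal for session-aware bootstraps.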
```go
	zap.String("status", common.FormatMaintainerStatus(status)))

keyspaceID := c.getChangefeed(cfID).GetKeyspaceID()
sessionEpoch := c.getChangefeed(cfID).GetCurrentMaintainerSessionEpoch()
```
This line performs a redundant lookup of the changefeed from the database. The changefeed was already fetched in the previous line (line 498). Additionally, if the changefeed is missing from the database (which is expected in handleNonExistentChangefeed), the previous line might cause a nil pointer dereference if GetKeyspaceID() is not nil-safe. Consider fetching the changefeed once and checking for nil before accessing its fields.
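A minimal sketch of the fetch-once, nil-checked pattern the comment suggests; the types are stand-ins for the real changefeed objects, and zero defaults are assumed for the absent case:

```go
package main

// changefeedInfo stands in for the real *changefeed.Changefeed.
type changefeedInfo struct {
	keyspaceID   uint32
	sessionEpoch uint64
}

func (c *changefeedInfo) GetKeyspaceID() uint32                      { return c.keyspaceID }
func (c *changefeedInfo) GetCurrentMaintainerSessionEpoch() uint64   { return c.sessionEpoch }

// resolveRemoveArgs reads both fields from a single lookup result and falls
// back to safe defaults when the changefeed is absent, which is the expected
// state in handleNonExistentChangefeed.
func resolveRemoveArgs(cf *changefeedInfo) (keyspaceID uint32, sessionEpoch uint64) {
	if cf == nil {
		return 0, 0
	}
	return cf.GetKeyspaceID(), cf.GetCurrentMaintainerSessionEpoch()
}
```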
```go
changefeed := oc.changefeedDB.GetByID(cfID)
keyspaceID := uint32(0)
if changefeed != nil {
	keyspaceID = changefeed.GetKeyspaceID()
}
```
The variable name changefeed shadows the package name changefeed imported in this file. This is generally discouraged in Go as it can lead to confusion. Additionally, the changefeed is looked up again in pushStopChangefeedOperator, which is redundant since the lookup is already performed here. Consider using a different name like cf and potentially passing the object to avoid redundant lookups.
```diff
-changefeed := oc.changefeedDB.GetByID(cfID)
-keyspaceID := uint32(0)
-if changefeed != nil {
-	keyspaceID = changefeed.GetKeyspaceID()
-}
+cf := oc.changefeedDB.GetByID(cfID)
+keyspaceID := uint32(0)
+if cf != nil {
+	keyspaceID = cf.GetKeyspaceID()
+}
```
```diff
-func (oc *Controller) NewAddMaintainerOperator(cf *changefeed.Changefeed, dest node.ID) operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] {
-	return NewAddMaintainerOperator(oc.changefeedDB, cf, dest)
+func (oc *Controller) NewAddMaintainerOperator(cf *changefeed.Changefeed, dest node.ID) (operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus], error) {
+	sessionEpoch, err := pdutil.GenerateStrictSessionEpoch(context.Background(), oc.pdClient, cf.GetCurrentMaintainerSessionEpoch())
```
Using context.Background() for a network request (PD TSO) can block the scheduler loop indefinitely if the PD client hangs or experiences high latency. It is recommended to use a context with a reasonable timeout or one that is tied to the lifecycle of the coordinator to ensure the scheduler remains responsive.
Actionable comments posted: 3
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
coordinator/operator/operator_controller.go (1)
131-160: ⚠️ Potential issue | 🟠 Major — Capture the session epoch before `StopByChangefeedID` can remove the changefeed.
The comment on `StopChangefeed` says `removed == true` deletes the changefeed from `changefeedDB`. In that path, the second `GetByID` in `pushStopChangefeedOperator` returns `nil`, so the stop/remove operator is built with `sessionEpoch == 0` instead of the active epoch. That drops the new session fence on the exact remove flow this PR is hardening.
🛠️ Suggested fix
```diff
 func (oc *Controller) StopChangefeed(_ context.Context, cfID common.ChangeFeedID, removed bool) operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] {
 	oc.mu.Lock()
 	defer oc.mu.Unlock()
 	changefeed := oc.changefeedDB.GetByID(cfID)
 	keyspaceID := uint32(0)
+	sessionEpoch := uint64(0)
 	if changefeed != nil {
 		keyspaceID = changefeed.GetKeyspaceID()
+		sessionEpoch = changefeed.GetCurrentMaintainerSessionEpoch()
 	}
 	scheduledNode := oc.changefeedDB.StopByChangefeedID(cfID, removed)
@@
-	return oc.pushStopChangefeedOperator(keyspaceID, cfID, scheduledNode, removed)
+	return oc.pushStopChangefeedOperator(keyspaceID, cfID, scheduledNode, sessionEpoch, removed)
 }
@@
-func (oc *Controller) pushStopChangefeedOperator(keyspaceID uint32, cfID common.ChangeFeedID, nodeID node.ID, remove bool) operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] {
-	sessionEpoch := uint64(0)
-	if cf := oc.changefeedDB.GetByID(cfID); cf != nil {
-		sessionEpoch = cf.GetCurrentMaintainerSessionEpoch()
-	}
+func (oc *Controller) pushStopChangefeedOperator(keyspaceID uint32, cfID common.ChangeFeedID, nodeID node.ID, sessionEpoch uint64, remove bool) operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus] {
 	op := NewStopChangefeedOperator(keyspaceID, cfID, nodeID, sessionEpoch, oc.selfNode.ID, oc.backend, remove)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/operator/operator_controller.go` around lines 131 - 160, StopChangefeed can call StopByChangefeedID which may remove the changefeed before pushStopChangefeedOperator reads it, causing sessionEpoch to be zero; capture the current maintainer session epoch from changefeed (via GetByID and GetCurrentMaintainerSessionEpoch) in StopChangefeed before calling StopByChangefeedID and pass that epoch into pushStopChangefeedOperator (update pushStopChangefeedOperator signature to accept sessionEpoch uint64 and use it when calling NewStopChangefeedOperator instead of re-reading the changefeed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@coordinator/controller.go`:
- Around line 630-632: In finishBootstrap, avoid calling c.getChangefeed(id) for
entries in runningChangefeeds because the changefeed may not exist and
c.getChangefeed(id) can return nil; instead obtain keyspaceID and sessionEpoch
directly from the remoteMaintainer (rm) or its status (e.g., rm.KeyspaceID or
rm.GetKeyspaceID and rm.CurrentMaintainerSessionEpoch or
rm.Status.GetCurrentMaintainerSessionEpoch) and use those values when
constructing changefeed.RemoveMaintainerMessage and calling
c.messageCenter.SendCommand, keeping the loop over runningChangefeeds and the rm
variable intact.
- Around line 498-502: handleNonExistentChangefeed must not call
c.getChangefeed(cfID) because cf is nil; instead obtain keyspaceID and
sessionEpoch from the incoming status/message (or from the function parameter
that contains the remote changefeed status) and pass those into
changefeed.RemoveMaintainerMessage; locate the call site in
handleNonExistentChangefeed and replace keyspaceID :=
c.getChangefeed(cfID).GetKeyspaceID() and sessionEpoch :=
c.getChangefeed(cfID).GetCurrentMaintainerSessionEpoch() with values extracted
from the provided status object (or use safe defaults if the status lacks them),
then send the RemoveMaintainerMessage using those variables to avoid
dereferencing a nil changefeed.
In `@downstreamadapter/dispatchermanager/dispatcher_manager_info.go`:
- Around line 83-129: Both AcceptBootstrapSession and AcceptMaintainerSession
currently allow sessionEpoch==0 forever; change them to reject legacy (0) once a
non-zero session is installed: in AcceptBootstrapSession, after the initial zero
check, inspect e.meta.maintainerSessionEpoch (under e.meta.Lock) and if it's
non-zero then return false with a descriptive reason (e.g., "stale:legacy" or
similar) instead of accepting; in AcceptMaintainerSession, replace the
unconditional return true for sessionEpoch==0 with a conditional that returns
true only if GetMaintainerSessionEpoch() == 0, otherwise return false with a
descriptive reason so zero-epoch messages are blocked after a session has been
installed.
---
Outside diff comments:
In `@coordinator/operator/operator_controller.go`:
- Around line 131-160: StopChangefeed can call StopByChangefeedID which may
remove the changefeed before pushStopChangefeedOperator reads it, causing
sessionEpoch to be zero; capture the current maintainer session epoch from
changefeed (via GetByID and GetCurrentMaintainerSessionEpoch) in StopChangefeed
before calling StopByChangefeedID and pass that epoch into
pushStopChangefeedOperator (update pushStopChangefeedOperator signature to
accept sessionEpoch uint64 and use it when calling NewStopChangefeedOperator
instead of re-reading the changefeed).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 8a4a7712-20f8-4d40-bb33-4110fcec5828
⛔ Files ignored due to path filters (1)
`heartbeatpb/heartbeat.pb.go` is excluded by `!**/*.pb.go`
📒 Files selected for processing (30)
- coordinator/changefeed/changefeed.go
- coordinator/changefeed/changefeed_test.go
- coordinator/controller.go
- coordinator/controller_test.go
- coordinator/coordinator_test.go
- coordinator/create_changefeed_gc_test.go
- coordinator/operator/operator_add.go
- coordinator/operator/operator_add_test.go
- coordinator/operator/operator_controller.go
- coordinator/operator/operator_controller_test.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_move_test.go
- coordinator/operator/operator_stop.go
- coordinator/operator/operator_stop_test.go
- coordinator/scheduler/balance.go
- coordinator/scheduler/basic.go
- downstreamadapter/dispatchermanager/dispatcher_manager.go
- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- downstreamadapter/dispatchermanager/dispatcher_manager_redo.go
- downstreamadapter/dispatchermanager/helper.go
- downstreamadapter/dispatchermanager/helper_test.go
- downstreamadapter/dispatcherorchestrator/dispatcher_orchestrator.go
- heartbeatpb/heartbeat.proto
- maintainer/maintainer.go
- maintainer/maintainer_manager.go
- maintainer/maintainer_manager_test.go
- maintainer/maintainer_test.go
- pkg/metrics/maintainer.go
- pkg/pdutil/utils.go
- pkg/pdutil/utils_test.go
```diff
 keyspaceID := c.getChangefeed(cfID).GetKeyspaceID()
+sessionEpoch := c.getChangefeed(cfID).GetCurrentMaintainerSessionEpoch()

 // Remove working changefeed from maintainer if it's not in changefeedDB
-_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID, from, true, true))
+_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID, from, true, true, sessionEpoch))
```
🧩 Analysis chain
🏁 Script executed:
```bash
#!/bin/bash
# Search for getChangefeed implementation to confirm it queries changefeedDB
ast-grep --pattern $'func (c *Controller) getChangefeed($_) $_ {
  $$$
}'
```

Repository: pingcap/ticdc
Length of output: 267
🏁 Script executed:
```bash
# Read the context around lines 460-510 to understand when handleNonExistentChangefeed is called
# and the condition that leads to it
sed -n '460,510p' coordinator/controller.go
```

Repository: pingcap/ticdc
Length of output: 1602
Nil pointer dereference: getChangefeed will return nil here.
The handleNonExistentChangefeed function is called when cf == nil (line 469), meaning the changefeed is not in changefeedDB. However, lines 498-499 call c.getChangefeed(cfID) which queries the same changefeedDB and will return nil, causing a panic when accessing GetKeyspaceID() or GetCurrentMaintainerSessionEpoch().
The keyspaceID and sessionEpoch need to be obtained from the incoming status or another source, not from the non-existent changefeed.
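A small sketch of the status-derived fallback described above; `maintainerStatus` mirrors only the `SessionEpoch` field and is a hypothetical stand-in for the real `heartbeatpb.MaintainerStatus`:

```go
package main

// maintainerStatus stands in for heartbeatpb.MaintainerStatus; only the
// SessionEpoch field matters for this sketch.
type maintainerStatus struct {
	SessionEpoch uint64
}

// sessionEpochFromStatus derives the epoch from the incoming message instead
// of dereferencing a lookup that is known to be nil on this path. Zero is the
// legacy default, which the receiver already treats as the legacy protocol.
func sessionEpochFromStatus(status *maintainerStatus) uint64 {
	if status == nil {
		return 0
	}
	return status.SessionEpoch
}
```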
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/controller.go` around lines 498 - 502,
handleNonExistentChangefeed must not call c.getChangefeed(cfID) because cf is
nil; instead obtain keyspaceID and sessionEpoch from the incoming status/message
(or from the function parameter that contains the remote changefeed status) and
pass those into changefeed.RemoveMaintainerMessage; locate the call site in
handleNonExistentChangefeed and replace keyspaceID :=
c.getChangefeed(cfID).GetKeyspaceID() and sessionEpoch :=
c.getChangefeed(cfID).GetCurrentMaintainerSessionEpoch() with values extracted
from the provided status object (or use safe defaults if the status lacks them),
then send the RemoveMaintainerMessage using those variables to avoid
dereferencing a nil changefeed.
```diff
 keyspaceID := c.getChangefeed(id).GetKeyspaceID()
-_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, id, rm.nodeID, true, true))
+sessionEpoch := c.getChangefeed(id).GetCurrentMaintainerSessionEpoch()
+_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, id, rm.nodeID, true, true, sessionEpoch))
```
Same nil pointer dereference risk in finishBootstrap.
This code path handles changefeeds that are NOT in allChangefeeds (line 625: for id, rm := range runningChangefeeds—these are changefeeds remaining after deletion at line 612). Calling c.getChangefeed(id) will return nil since the changefeed was never added to changefeedDB in this bootstrap flow.
The keyspaceID and sessionEpoch should be obtained from the remoteMaintainer struct or the status information, not from the changefeedDB.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/controller.go` around lines 630 - 632, In finishBootstrap, avoid
calling c.getChangefeed(id) for entries in runningChangefeeds because the
changefeed may not exist and c.getChangefeed(id) can return nil; instead obtain
keyspaceID and sessionEpoch directly from the remoteMaintainer (rm) or its
status (e.g., rm.KeyspaceID or rm.GetKeyspaceID and
rm.CurrentMaintainerSessionEpoch or rm.Status.GetCurrentMaintainerSessionEpoch)
and use those values when constructing changefeed.RemoveMaintainerMessage and
calling c.messageCenter.SendCommand, keeping the loop over runningChangefeeds
and the rm variable intact.
```go
func (e *DispatcherManager) AcceptBootstrapSession(maintainerID node.ID, sessionEpoch uint64) (bool, string) {
	if sessionEpoch == 0 {
		// Zero means the peer is still running the legacy protocol. We must keep
		// accepting that path during rolling upgrade, otherwise a new dispatcher
		// manager can no longer interoperate with an old maintainer.
		return true, "legacy"
	}

	e.meta.Lock()
	defer e.meta.Unlock()

	switch {
	case e.meta.maintainerSessionEpoch == 0:
		e.meta.maintainerID = maintainerID
		e.meta.maintainerSessionEpoch = sessionEpoch
		return true, "install"
	case sessionEpoch < e.meta.maintainerSessionEpoch:
		return false, fmt.Sprintf("stale:%d<%d", sessionEpoch, e.meta.maintainerSessionEpoch)
	case sessionEpoch == e.meta.maintainerSessionEpoch:
		if e.meta.maintainerID != maintainerID {
			e.meta.maintainerID = maintainerID
		}
		return true, "current"
	default:
		e.meta.maintainerID = maintainerID
		e.meta.maintainerSessionEpoch = sessionEpoch
		return true, "advance"
	}
}

func (e *DispatcherManager) AcceptMaintainerSession(sessionEpoch uint64) (bool, string) {
	if sessionEpoch == 0 {
		// Zero-session control messages intentionally stay on the legacy path for
		// mixed-version upgrade compatibility. Strict stale-session filtering only
		// applies after both ends speak the session-aware protocol.
		return true, "legacy"
	}

	current := e.GetMaintainerSessionEpoch()
	switch {
	case sessionEpoch < current:
		return false, fmt.Sprintf("stale:%d<%d", sessionEpoch, current)
	case sessionEpoch == current:
		return true, "current"
	default:
		return false, fmt.Sprintf("future:%d>%d", sessionEpoch, current)
	}
}
```
Reject legacy epoch 0 after a non-zero session is installed.
AcceptBootstrapSession and AcceptMaintainerSession both treat sessionEpoch == 0 as valid forever. Once the dispatcher manager has already advanced maintainerSessionEpoch, that still lets stale legacy traffic through the bootstrap/post-bootstrap/close gates; handleCloseRequest will then reach TryClose for a zero-epoch request. The legacy path should only stay open while the current session is still 0.
🛠️ Suggested fix

```diff
 func (e *DispatcherManager) AcceptBootstrapSession(maintainerID node.ID, sessionEpoch uint64) (bool, string) {
-	if sessionEpoch == 0 {
-		// Zero means the peer is still running the legacy protocol. We must keep
-		// accepting that path during rolling upgrade, otherwise a new dispatcher
-		// manager can no longer interoperate with an old maintainer.
-		return true, "legacy"
-	}
-
 	e.meta.Lock()
 	defer e.meta.Unlock()
+
+	if sessionEpoch == 0 {
+		if e.meta.maintainerSessionEpoch == 0 {
+			return true, "legacy"
+		}
+		return false, fmt.Sprintf("stale:%d<%d", sessionEpoch, e.meta.maintainerSessionEpoch)
+	}
@@
 func (e *DispatcherManager) AcceptMaintainerSession(sessionEpoch uint64) (bool, string) {
-	if sessionEpoch == 0 {
-		// Zero-session control messages intentionally stay on the legacy path for
-		// mixed-version upgrade compatibility. Strict stale-session filtering only
-		// applies after both ends speak the session-aware protocol.
-		return true, "legacy"
-	}
-
 	current := e.GetMaintainerSessionEpoch()
+	if sessionEpoch == 0 {
+		if current == 0 {
+			return true, "legacy"
+		}
+		return false, fmt.Sprintf("stale:%d<%d", sessionEpoch, current)
+	}
+
 	switch {
 	case sessionEpoch < current:
 		return false, fmt.Sprintf("stale:%d<%d", sessionEpoch, current)
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@downstreamadapter/dispatchermanager/dispatcher_manager_info.go` around lines
83 - 129, Both AcceptBootstrapSession and AcceptMaintainerSession currently
allow sessionEpoch==0 forever; change them to reject legacy (0) once a non-zero
session is installed: in AcceptBootstrapSession, after the initial zero check,
inspect e.meta.maintainerSessionEpoch (under e.meta.Lock) and if it's non-zero
then return false with a descriptive reason (e.g., "stale:legacy" or similar)
instead of accepting; in AcceptMaintainerSession, replace the unconditional
return true for sessionEpoch==0 with a conditional that returns true only if
GetMaintainerSessionEpoch() == 0, otherwise return false with a descriptive
reason so zero-epoch messages are blocked after a session has been installed.
♻️ Duplicate comments (2)
coordinator/controller.go (2)
498-502: ⚠️ Potential issue | 🔴 Critical — Nil pointer dereference: `c.getChangefeed(cfID)` returns nil here.
This function is called when the changefeed doesn't exist in `changefeedDB` (lines 468-470 check `cf == nil`). Calling `c.getChangefeed(cfID)` again will return `nil`, causing a panic when accessing `GetKeyspaceID()` or `GetCurrentMaintainerSessionEpoch()`.
The `sessionEpoch` should be obtained from `status.SessionEpoch`. For `keyspaceID`, check if `MaintainerStatus` includes the field, or use a default value of `0` for legacy compatibility.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/controller.go` around lines 498 - 502, The call to c.getChangefeed(cfID) can return nil and cause a panic when invoking GetKeyspaceID()/GetCurrentMaintainerSessionEpoch(); update the removal call so it does not call c.getChangefeed(cfID): obtain sessionEpoch from status.SessionEpoch and determine keyspaceID from status.MaintainerStatus if present (otherwise use 0 for legacy compatibility), then call c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID, from, true, true, sessionEpoch)); ensure you remove the c.getChangefeed(cfID) usage and rely solely on the already-available status and its MaintainerStatus field.
633-635: ⚠️ Potential issue | 🔴 Critical — Nil pointer dereference: `c.getChangefeed(id)` returns nil for stale maintainers.
At this point in `finishBootstrap`, `runningChangefeeds` contains only entries that were NOT in `allChangefeeds` (deleted at line 615). These changefeeds were never added to `changefeedDB`, so `c.getChangefeed(id)` returns `nil`.
The fix should obtain values from `rm` (the remoteMaintainer):
- `sessionEpoch` from `rm.status.SessionEpoch`
- `keyspaceID` from `rm.status` if available, otherwise use a default
🤖 Prompt for AI Agents
🧹 Nitpick comments (2)
downstreamadapter/dispatchermanager/helper_test.go (2)
269-273: Add an immutability assertion for stale bootstrap requests.
The stale case validates epoch immutability, but it should also assert `maintainerID` is not changed by a stale bootstrap attempt.
Proposed test assertion

```diff
 accepted, reason := dm.AcceptBootstrapSession(node.ID("new"), 9)
 require.False(t, accepted)
 require.Contains(t, reason, "stale")
 require.Equal(t, uint64(10), dm.GetMaintainerSessionEpoch())
+require.Equal(t, node.ID("old"), dm.GetMaintainerID())
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/helper_test.go` around lines 269 - 273, Before calling dm.AcceptBootstrapSession for the stale case, capture the current maintainer ID (e.g., via dm.GetMaintainerID() or the existing maintainerID variable) and after the call assert it was not changed; specifically, add a require.Equal comparing the pre-call maintainer ID to the post-call maintainer ID (alongside the existing epoch immutability checks) to ensure a stale bootstrap attempt does not modify maintainerID.
405-432: Expand redo-forward filter coverage to include legacy and future epochs.
This test verifies stale and current epochs, but it should also explicitly assert that `SessionEpoch == 0` and `SessionEpoch > current` are rejected without mutating `redoGlobalTs`.
Proposed additional cases

```diff
 blocking := handler.Handle(dm, NewRedoResolvedTsForwardMessage(&heartbeatpb.RedoResolvedTsForwardMessage{
 	ChangefeedID: changefeedID.ToPB(),
 	ResolvedTs:   150,
 	SessionEpoch: 9,
 }))
 require.False(t, blocking)
 require.Equal(t, uint64(100), dm.redoGlobalTs.Load())
+
+blocking = handler.Handle(dm, NewRedoResolvedTsForwardMessage(&heartbeatpb.RedoResolvedTsForwardMessage{
+	ChangefeedID: changefeedID.ToPB(),
+	ResolvedTs:   150,
+	SessionEpoch: 0,
+}))
+require.False(t, blocking)
+require.Equal(t, uint64(100), dm.redoGlobalTs.Load())
+
+blocking = handler.Handle(dm, NewRedoResolvedTsForwardMessage(&heartbeatpb.RedoResolvedTsForwardMessage{
+	ChangefeedID: changefeedID.ToPB(),
+	ResolvedTs:   150,
+	SessionEpoch: 11,
+}))
+require.False(t, blocking)
+require.Equal(t, uint64(100), dm.redoGlobalTs.Load())
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@downstreamadapter/dispatchermanager/helper_test.go` around lines 405 - 432, Update TestRedoResolvedTsForwardMessageHandlerFiltersSessionEpoch to add two more assertions: send a RedoResolvedTsForwardMessage with SessionEpoch == 0 and one with SessionEpoch > dm.meta.maintainerSessionEpoch (e.g., 11) using NewRedoResolvedTsForwardMessage and the same changefeedID/ResolvedTs, call handler.Handle for each, assert it returns false, and assert dm.redoGlobalTs.Load() remains unchanged (still 100) after each call; this ensures RedoResolvedTsForwardMessage handling in RedoResolvedTsForwardMessageHandler correctly rejects legacy (0) and future epochs without mutating redoGlobalTs.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@coordinator/controller.go`:
- Around line 498-502: The call to c.getChangefeed(cfID) can return nil and
cause a panic when invoking GetKeyspaceID()/GetCurrentMaintainerSessionEpoch();
update the removal call so it does not call c.getChangefeed(cfID): obtain
sessionEpoch from status.SessionEpoch and determine keyspaceID from
status.MaintainerStatus if present (otherwise use 0 for legacy compatibility),
then call
c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID,
from, true, true, sessionEpoch)); ensure you remove the c.getChangefeed(cfID)
usage and rely solely on the already-available status and its MaintainerStatus
field.
- Around line 633-635: finishBootstrap dereferences c.getChangefeed(id) which
can be nil for stale maintainers; instead use values from the remoteMaintainer
(rm) passed into the loop: replace uses of
c.getChangefeed(id).GetCurrentMaintainerSessionEpoch() and .GetKeyspaceID() with
rm.status.SessionEpoch and rm.status.KeyspaceID (falling back to a sensible
default if rm.status or KeyspaceID is nil/empty), and then call
c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, id,
rm.nodeID, true, true, sessionEpoch)); ensure you reference finishBootstrap,
runningChangefeeds, rm (remoteMaintainer), and GetChangefeed only when non-nil.
---
Nitpick comments:
In `@downstreamadapter/dispatchermanager/helper_test.go`:
- Around line 269-273: Before calling dm.AcceptBootstrapSession for the stale
case, capture the current maintainer ID (e.g., via dm.GetMaintainerID() or the
existing maintainerID variable) and after the call assert it was not changed;
specifically, add a require.Equal comparing the pre-call maintainer ID to the
post-call maintainer ID (alongside the existing epoch immutability checks) to
ensure a stale bootstrap attempt does not modify maintainerID.
- Around line 405-432: Update
TestRedoResolvedTsForwardMessageHandlerFiltersSessionEpoch to add two more
assertions: send a RedoResolvedTsForwardMessage with SessionEpoch == 0 and one
with SessionEpoch > dm.meta.maintainerSessionEpoch (e.g., 11) using
NewRedoResolvedTsForwardMessage and the same changefeedID/ResolvedTs, call
handler.Handle for each, assert it returns false, and assert
dm.redoGlobalTs.Load() remains unchanged (still 100) after each call; this
ensures RedoResolvedTsForwardMessage handling in
RedoResolvedTsForwardMessageHandler correctly rejects legacy (0) and future
epochs without mutating redoGlobalTs.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: e2d71c9d-83f3-4da7-8356-4aabecefc623
⛔ Files ignored due to path filters (1)
- heartbeatpb/heartbeat.pb.go is excluded by `!**/*.pb.go`
📒 Files selected for processing (9)
- coordinator/controller.go
- coordinator/controller_test.go
- coordinator/operator/operator_controller_test.go
- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- downstreamadapter/dispatchermanager/helper.go
- downstreamadapter/dispatchermanager/helper_test.go
- heartbeatpb/heartbeat.proto
- maintainer/maintainer.go
- maintainer/maintainer_test.go
✅ Files skipped from review due to trivial changes (1)
- downstreamadapter/dispatchermanager/helper.go
🚧 Files skipped from review as they are similar to previous changes (6)
- coordinator/operator/operator_controller_test.go
- coordinator/controller_test.go
- maintainer/maintainer_test.go
- heartbeatpb/heartbeat.proto
- downstreamadapter/dispatchermanager/dispatcher_manager_info.go
- maintainer/maintainer.go
🧹 Nitpick comments (1)
coordinator/operator/operator_controller.go (1)
324-339: Add caller-side timeout for PD epoch generation.
`GenerateStrictSessionEpoch` in `pkg/pdutil/utils.go` passes the context directly to `pdClient.GetTS(ctx)` without creating a timeout-bounded child context. Currently, both callers at lines 325 and 334 use `context.Background()`, which has no timeout. Under PD degradation, these operations could block indefinitely.
Proposed caller-side bounded context
```diff
 func (oc *Controller) NewAddMaintainerOperator(cf *changefeed.Changefeed, dest node.ID) (operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus], error) {
-	sessionEpoch, err := pdutil.GenerateStrictSessionEpoch(context.Background(), oc.pdClient, cf.GetCurrentMaintainerSessionEpoch())
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+	sessionEpoch, err := pdutil.GenerateStrictSessionEpoch(ctx, oc.pdClient, cf.GetCurrentMaintainerSessionEpoch())
 	if err != nil {
 		return nil, err
 	}
 	return NewAddMaintainerOperator(oc.changefeedDB, cf, dest, sessionEpoch), nil
 }

 func (oc *Controller) NewMoveMaintainerOperator(cf *changefeed.Changefeed, origin, dest node.ID) (operator.Operator[common.ChangeFeedID, *heartbeatpb.MaintainerStatus], error) {
 	activeSessionEpoch := cf.GetCurrentMaintainerSessionEpoch()
-	destSessionEpoch, err := pdutil.GenerateStrictSessionEpoch(context.Background(), oc.pdClient, activeSessionEpoch)
+	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
+	defer cancel()
+	destSessionEpoch, err := pdutil.GenerateStrictSessionEpoch(ctx, oc.pdClient, activeSessionEpoch)
 	if err != nil {
 		return nil, err
 	}
 	return NewMoveMaintainerOperator(oc.changefeedDB, cf, origin, dest, activeSessionEpoch, destSessionEpoch), nil
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/operator/operator_controller.go` around lines 324 - 339: the calls in Controller.NewAddMaintainerOperator and Controller.NewMoveMaintainerOperator currently pass context.Background() into pdutil.GenerateStrictSessionEpoch (which delegates to oc.pdClient.GetTS) and can block; change both to create a timeout-bounded context (e.g., ctx, cancel := context.WithTimeout(context.Background(), <reasonable timeout>) with defer cancel()) and pass that ctx to pdutil.GenerateStrictSessionEpoch; ensure you choose a shared constant timeout name (e.g., pdEpochTimeout) visible in the package and always defer cancel() after creating the child context.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@coordinator/operator/operator_controller.go`:
- Around line 324-339: The calls in Controller.NewAddMaintainerOperator and
Controller.NewMoveMaintainerOperator currently pass context.Background() into
pdutil.GenerateStrictSessionEpoch (which delegates to oc.pdClient.GetTS) and can
block; change both to create a timeout-bounded context (e.g., ctx, cancel :=
context.WithTimeout(context.Background(), <reasonable timeout>) with defer
cancel()) and pass that ctx to pdutil.GenerateStrictSessionEpoch; ensure you
choose a shared constant timeout name (e.g., pdEpochTimeout) visible in the
package and always defer cancel() after creating the child context.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 50f06f97-a8d2-4b5b-8fc4-bc7ae78137cf
📒 Files selected for processing (7)
- coordinator/changefeed/changefeed.go
- coordinator/operator/operator_add.go
- coordinator/operator/operator_add_test.go
- coordinator/operator/operator_controller.go
- coordinator/operator/operator_controller_test.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_move_test.go
🚧 Files skipped from review as they are similar to previous changes (4)
- coordinator/operator/operator_add.go
- coordinator/operator/operator_add_test.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_controller_test.go
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@coordinator/controller.go`:
- Around line 506-512: removeReportedMaintainer currently dereferences
c.getChangefeed(cfID) which can be nil; update removeReportedMaintainer to first
grab cf := c.getChangefeed(cfID) and if cf == nil set keyspaceID := uint32(0)
(default for the handleNonExistentChangefeed path) else keyspaceID =
cf.GetKeyspaceID(); then call
c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID,
from, true, true, sessionEpoch)). Additionally, update the finishBootstrap
caller to try to extract keyspaceID from the remote status and pass it into
removeReportedMaintainer (or ensure the code path populates the changefeed so
GetKeyspaceID is non-nil) so finishBootstrap doesn't rely on a nil getChangefeed
result.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 77ecc061-1646-40d5-ac5a-bc8585f8428a
📒 Files selected for processing (11)
- coordinator/controller.go
- coordinator/controller_test.go
- coordinator/coordinator_test.go
- coordinator/operator/operator_add.go
- coordinator/operator/operator_add_test.go
- coordinator/operator/operator_controller.go
- coordinator/operator/operator_controller_test.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_move_test.go
- coordinator/operator/operator_stop.go
- coordinator/operator/operator_stop_test.go
✅ Files skipped from review due to trivial changes (1)
- coordinator/operator/operator_add_test.go
🚧 Files skipped from review as they are similar to previous changes (3)
- coordinator/operator/operator_stop_test.go
- coordinator/operator/operator_add.go
- coordinator/operator/operator_move.go
```go
// removeReportedMaintainer removes a working maintainer that is only visible via
// remote runtime state. Once local metadata is gone, the reporter's session is
// the only fenced owner token we can still trust for cleanup.
func (c *Controller) removeReportedMaintainer(cfID common.ChangeFeedID, from node.ID, sessionEpoch uint64) {
	keyspaceID := c.getChangefeed(cfID).GetKeyspaceID()
	_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID, from, true, true, sessionEpoch))
}
```
Nil pointer dereference: getChangefeed returns nil when local metadata is missing.
removeReportedMaintainer is called from handleNonExistentChangefeed (line 502) and finishBootstrap (line 680) when the changefeed doesn't exist locally. However, line 510 calls c.getChangefeed(cfID).GetKeyspaceID() which will panic because getChangefeed returns nil.
For the handleNonExistentChangefeed path, keyspaceID should default to 0 since local metadata is unavailable. For finishBootstrap, the keyspace info could potentially be extracted from the remote status if available.
🐛 Proposed fix
```diff
 func (c *Controller) removeReportedMaintainer(cfID common.ChangeFeedID, from node.ID, sessionEpoch uint64) {
-	keyspaceID := c.getChangefeed(cfID).GetKeyspaceID()
+	keyspaceID := uint32(0)
+	if cf := c.getChangefeed(cfID); cf != nil {
+		keyspaceID = cf.GetKeyspaceID()
+	}
 	_ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID, from, true, true, sessionEpoch))
 }
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@coordinator/controller.go` around lines 506 - 512, removeReportedMaintainer
currently dereferences c.getChangefeed(cfID) which can be nil; update
removeReportedMaintainer to first grab cf := c.getChangefeed(cfID) and if cf ==
nil set keyspaceID := uint32(0) (default for the handleNonExistentChangefeed
path) else keyspaceID = cf.GetKeyspaceID(); then call
c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID,
from, true, true, sessionEpoch)). Additionally, update the finishBootstrap
caller to try to extract keyspaceID from the remote status and pass it into
removeReportedMaintainer (or ensure the code path populates the changefeed so
GetKeyspaceID is non-nil) so finishBootstrap doesn't rely on a nil getChangefeed
result.
♻️ Duplicate comments (1)
coordinator/controller.go (1)
508-510:⚠️ Potential issue | 🔴 CriticalNil dereference in stale-maintainer cleanup.
Line 509 dereferences
c.getChangefeed(cfID)even though this helper is only used after we've already concluded local metadata is missing or stale. That panics before the remove request is sent, so late maintainer reports can still break the coordinator during the exact cleanup flow this PR is adding.🐛 Minimal fix
func (c *Controller) removeReportedMaintainer(cfID common.ChangeFeedID, from node.ID, sessionEpoch uint64) { - keyspaceID := c.getChangefeed(cfID).GetKeyspaceID() + keyspaceID := uint32(0) + if cf := c.getChangefeed(cfID); cf != nil { + keyspaceID = cf.GetKeyspaceID() + } _ = c.messageCenter.SendCommand(changefeed.RemoveMaintainerMessage(keyspaceID, cfID, from, true, true, sessionEpoch)) }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@coordinator/controller.go` around lines 508 - 510: the removeReportedMaintainer function dereferences c.getChangefeed(cfID) unconditionally; change it to call c.getChangefeed(cfID) once into a local variable, check for nil, and only call GetKeyspaceID when the result is non-nil—otherwise use a sensible zero/empty keyspaceID (or omit it if RemoveMaintainerMessage accepts that) and still invoke c.messageCenter.SendCommand with changefeed.RemoveMaintainerMessage(cfID, from, true, true, sessionEpoch) (referencing removeReportedMaintainer, c.getChangefeed, GetKeyspaceID, messageCenter.SendCommand, and changefeed.RemoveMaintainerMessage).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Duplicate comments:
In `@coordinator/controller.go`:
- Around line 508-510: The removeReportedMaintainer function dereferences
c.getChangefeed(cfID) unconditionally; change it to call c.getChangefeed(cfID)
once into a local variable, check for nil, and only call GetKeyspaceID when the
result is non-nil—otherwise use a sensible zero/empty keyspaceID (or omit it if
RemoveMaintainerMessage accepts that) and still invoke
c.messageCenter.SendCommand with changefeed.RemoveMaintainerMessage(cfID, from,
true, true, sessionEpoch) (referencing removeReportedMaintainer,
c.getChangefeed, GetKeyspaceID, messageCenter.SendCommand, and
changefeed.RemoveMaintainerMessage).
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6f0d1c55-cb0a-4513-bcaf-af2876da7084
📒 Files selected for processing (9)
- coordinator/changefeed/changefeed.go
- coordinator/controller.go
- coordinator/controller_test.go
- coordinator/operator/operator_add.go
- coordinator/operator/operator_controller.go
- coordinator/operator/operator_controller_test.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_stop.go
- coordinator/operator/operator_stop_test.go
🚧 Files skipped from review as they are similar to previous changes (5)
- coordinator/operator/operator_add.go
- coordinator/operator/operator_move.go
- coordinator/operator/operator_controller_test.go
- coordinator/operator/operator_stop.go
- coordinator/changefeed/changefeed.go
[FORMAT CHECKER NOTIFICATION] Notice: To remove the label:
📖 For more info, you can check the "Contribute Code" section in the development guide.
Background
Maintainer and dispatcher manager currently rely on resend and node placement to converge during maintainer restart or move. Without an explicit runtime session boundary, late control messages and late runtime reports can still be processed by the wrong maintainer session during handoff.
Motivation
This change adds a minimal runtime-only session epoch so stale maintainer and dispatcher-manager messages stop mutating the active state machine, while preserving rolling upgrade compatibility and avoiding any new etcd persistent fields.
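A minimal sketch of such an epoch gate, assuming equality-based acceptance with an opt-in legacy path (the real per-message checks in this PR may differ):

```go
package main

import "fmt"

// acceptEpoch sketches the session-gating rule described above; the exact
// production checks vary per message type, so treat this as assumed
// semantics: only the active session's epoch is accepted, and epoch 0
// (a legacy, pre-upgrade sender) is allowed only where the handler opts in.
func acceptEpoch(current, incoming uint64, allowLegacy bool) bool {
	if incoming == 0 {
		return allowLegacy // mixed-version rolling upgrade path
	}
	return incoming == current
}

func main() {
	const current = 7
	fmt.Println(acceptEpoch(current, 0, true))  // legacy peer, compat enabled
	fmt.Println(acceptEpoch(current, 0, false)) // legacy peer, strict handler
	fmt.Println(acceptEpoch(current, 6, true))  // stale session
	fmt.Println(acceptEpoch(current, 7, true))  // active session
	fmt.Println(acceptEpoch(current, 8, true))  // future session
}
```

Because the epoch is issued by PD and strictly monotonic per changefeed, a stale maintainer can never hold the active epoch, so its late messages fail this check instead of mutating the new session's state machine.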
Summary of Changes
- Add `session_epoch` to the maintainer/dispatcher-manager runtime protobuf messages used in add/remove, bootstrap, post-bootstrap, close, heartbeat, block-status, redo progress, schedule, and merge flows
- Preserve the `sessionEpoch == 0` path for mixed-version rolling upgrade compatibility

Testing
- `make fmt`
- `go test ./pkg/pdutil ./coordinator/changefeed ./coordinator/operator ./maintainer ./downstreamadapter/dispatchermanager ./downstreamadapter/dispatcherorchestrator`
- `go test ./coordinator -count=1 -run 'Test(OnPeriodTaskAdvanceLiveness|HandleNonExistentChangefeedUsesLegacyRemoveWhenLocalMetadataMissing|ResumeChangefeed|ResumeChangefeedNormalState|ResumeChangefeedOverwriteUpdatesLastSavedCheckpointTs|ResumeChangefeedIgnoresStaleMaintainerErrorAndSchedules|PauseChangefeed|UpdateChangefeed|GetChangefeed|RemoveChangefeed|ListChangefeed|CreateChangefeed|CreateChangefeedDoesNotUpdateGCSafepoint|UpdateGCSafepointCallsGCManagerUpdate|UpdateGCSafepointDeletesServiceSafepointWhenNoChangefeed|RemoveLastChangefeedDeletesServiceSafepointImmediately|ConcurrentDeleteLastChangefeedAndCreateNewOneKeepsExpectedGCSafepoint)$'`
- `go test ./coordinator -count=1 -run 'TestCoordinatorScheduling$'`

Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Chores
Tests