Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 35 additions & 33 deletions coordinator/coordinator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,9 @@ func TestCoordinatorScheduling(t *testing.T) {
}

func TestScaleNode(t *testing.T) {
ctx := context.Background()
info := node.NewInfo("127.0.0.1:28300", "")
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
info, lis1 := newMaintainerNodeForTest(t)
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
Expand All @@ -384,13 +385,10 @@ func TestScaleNode(t *testing.T) {
cfg := config.NewDefaultMessageCenterConfig(info.AdvertiseAddr)
mc1 := messaging.NewMessageCenter(ctx, info.ID, cfg, nil)
mc1.Run(ctx)
defer func() {
mc1.Close()
log.Info("close message center 1")
}()

appcontext.SetService(appcontext.MessageCenter, mc1)
startMaintainerNode(ctx, info, mc1, nodeManager)
node1 := startMaintainerNode(ctx, info, mc1, nodeManager, lis1)
t.Cleanup(node1.stop)

serviceID := "default"

Expand Down Expand Up @@ -428,23 +426,16 @@ func TestScaleNode(t *testing.T) {
}, waitTime, time.Millisecond*5)

// add two nodes
info2 := node.NewInfo("127.0.0.1:28400", "")
info2, lis2 := newMaintainerNodeForTest(t)
mc2 := messaging.NewMessageCenter(ctx, info2.ID, config.NewDefaultMessageCenterConfig(info2.AdvertiseAddr), nil)
mc2.Run(ctx)
defer func() {
mc2.Close()
log.Info("close message center 2")
}()
startMaintainerNode(ctx, info2, mc2, nodeManager)
info3 := node.NewInfo("127.0.0.1:28500", "")
node2 := startMaintainerNode(ctx, info2, mc2, nodeManager, lis2)
t.Cleanup(node2.stop)
info3, lis3 := newMaintainerNodeForTest(t)
mc3 := messaging.NewMessageCenter(ctx, info3.ID, config.NewDefaultMessageCenterConfig(info3.AdvertiseAddr), nil)
mc3.Run(ctx)
defer func() {
mc3.Close()
log.Info("close message center 3")
}()

startMaintainerNode(ctx, info3, mc3, nodeManager)
node3 := startMaintainerNode(ctx, info3, mc3, nodeManager, lis3)
t.Cleanup(node3.stop)

log.Info("Start maintainer node",
zap.Stringer("id", info3.ID),
Expand Down Expand Up @@ -490,7 +481,7 @@ func TestScaleNode(t *testing.T) {
func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
info := node.NewInfo("127.0.0.1:28301", "")
info, lis := newMaintainerNodeForTest(t)
etcdClient := newMockEtcdClient(string(info.ID))
nodeManager := watcher.NewNodeManager(nil, etcdClient)
appcontext.SetService(watcher.NodeManagerName, nodeManager)
Expand All @@ -500,10 +491,10 @@ func TestBootstrapWithUnStoppedChangefeed(t *testing.T) {

mc1 := messaging.NewMessageCenter(ctx, info.ID, config.NewDefaultMessageCenterConfig(info.AdvertiseAddr), nil)
mc1.Run(ctx)
defer mc1.Close()

appcontext.SetService(appcontext.MessageCenter, mc1)
mNode := startMaintainerNode(ctx, info, mc1, nodeManager)
mNode := startMaintainerNode(ctx, info, mc1, nodeManager, lis)
defer mNode.stop()

removingCf1 := &changefeed.ChangefeedMetaWrapper{
Info: &config.ChangeFeedInfo{
Expand Down Expand Up @@ -712,40 +703,51 @@ type maintainNode struct {
cancel context.CancelFunc
mc messaging.MessageCenter
manager *mockMaintainerManager
wg sync.WaitGroup
}

// stop shuts the test maintainer node down.
//
// The order matters: the context is cancelled first so the background work
// tracked by d.wg can return, then stop waits for it, and only afterwards is
// the message center closed. The message center is closed exactly once, after
// everything that may still be using it has exited.
func (d *maintainNode) stop() {
	d.cancel()
	d.wg.Wait()
	d.mc.Close()
}
Comment on lines +706 to +713
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Expect one wg.Add(1) and two goroutine launches inside startMaintainerNode.
rg -n -A25 -B5 'func \(d \*maintainNode\) stop|func startMaintainerNode|wg\.Add\(|go func\(\)|grpcServer\.Serve' coordinator/coordinator_test.go

Repository: pingcap/ticdc

Length of output: 7895


Wait for the gRPC serve loop and track its errors.

stop() only waits on res.wg, but startMaintainerNode increments that wait group once at line 737 while launching two goroutines: the outer function and the inner gRPC serve loop at line 744. This means cleanup can return before the serve loop exits. Additionally, if grpcServer.Serve(lis) fails unexpectedly before maintainerM.Run(ctx) completes, the error is swallowed and grpcServer.Stop() is never called, leaving the serve goroutine hanging.

♻️ Suggested change
 func startMaintainerNode(ctx context.Context,
 	node *node.Info, mc messaging.MessageCenter,
 	nodeManager *watcher.NodeManager,
 	lis net.Listener,
 ) *maintainNode {
 	nodeManager.RegisterNodeChangeHandler(node.ID, mc.OnNodeChanges)
 	ctx, cancel := context.WithCancel(ctx)
 	maintainerM := NewMaintainerManager(mc)
+	var opts []grpc.ServerOption
+	grpcServer := grpc.NewServer(opts...)
+	mcs := messaging.NewMessageCenterServer(mc)
+	proto.RegisterMessageServiceServer(grpcServer, mcs)
 	res := &maintainNode{
 		cancel:  cancel,
 		mc:      mc,
 		manager: maintainerM,
 	}
-	res.wg.Add(1)
+	res.wg.Add(2)
 	go func() {
 		defer res.wg.Done()
-		var opts []grpc.ServerOption
-		grpcServer := grpc.NewServer(opts...)
-		mcs := messaging.NewMessageCenterServer(mc)
-		proto.RegisterMessageServiceServer(grpcServer, mcs)
-		go func() {
-			_ = grpcServer.Serve(lis)
-		}()
+		defer grpcServer.Stop()
 		_ = maintainerM.Run(ctx)
-		grpcServer.Stop()
+	}()
+	go func() {
+		defer res.wg.Done()
+		if err := grpcServer.Serve(lis); err != nil && ctx.Err() == nil {
+			log.Warn("maintainer test grpc server exited unexpectedly", zap.Error(err))
+		}
 	}()
 	return res
 }

As per coding guidelines for **/*_test.go: favor deterministic tests and use testify/require.

Also applies to: 732-749

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@coordinator/coordinator_test.go` around lines 706 - 713, The test currently
only waits on the maintainer waitgroup and can return before the inner gRPC
serve goroutine exits and swallows serve errors; update startMaintainerNode to
track the serve goroutine’s lifecycle and errors (e.g., increment the waitgroup
for both goroutines or add a dedicated errCh), have the serve goroutine send any
grpcServer.Serve(lis) error to that channel, ensure grpcServer.Stop() is invoked
if Serve returns unexpectedly, and modify maintainNode.stop to wait for the
serve goroutine (or observe errCh) before closing resources; in the test assert
the serve/maintainer error using testify/require (e.g., require.NoError) so
failures are deterministic.


// newMaintainerNodeForTest opens a TCP listener on 127.0.0.1 with port 0 and
// builds a node.Info whose address is the listener's actual address. The test
// fails immediately if the listener cannot be created. Both the node info and
// the still-open listener are returned to the caller.
func newMaintainerNodeForTest(t *testing.T) (*node.Info, net.Listener) {
	t.Helper()

	listener, err := net.Listen("tcp", "127.0.0.1:0")
	require.NoError(t, err)

	info := node.NewInfo(listener.Addr().String(), "")
	return info, listener
}

// startMaintainerNode registers mc as a node-change handler for node, then
// starts a mock maintainer manager and a gRPC message-center server for it.
//
// The gRPC server serves on the caller-provided listener lis; the function
// does not open a listener of its own. The returned maintainNode carries the
// cancel func of the derived context and a wait group that covers both
// goroutines started here, so a caller that cancels and then waits on the
// wait group knows that both the manager and the serve loop have exited.
func startMaintainerNode(ctx context.Context,
	node *node.Info, mc messaging.MessageCenter,
	nodeManager *watcher.NodeManager,
	lis net.Listener,
) *maintainNode {
	nodeManager.RegisterNodeChangeHandler(node.ID, mc.OnNodeChanges)
	ctx, cancel := context.WithCancel(ctx)
	maintainerM := NewMaintainerManager(mc)

	// Build the server up front so both goroutines below can share it.
	grpcServer := grpc.NewServer()
	proto.RegisterMessageServiceServer(grpcServer, messaging.NewMessageCenterServer(mc))

	res := &maintainNode{
		cancel:  cancel,
		mc:      mc,
		manager: maintainerM,
	}

	// One count for the manager goroutine and one for the serve loop.
	res.wg.Add(2)
	go func() {
		defer res.wg.Done()
		// Stopping the server is what makes the serve goroutine return.
		defer grpcServer.Stop()
		// The run error is not surfaced to callers of this test helper.
		_ = maintainerM.Run(ctx)
	}()
	go func() {
		defer res.wg.Done()
		// Serve returns once grpcServer.Stop is called; its error is not
		// surfaced to callers of this test helper.
		_ = grpcServer.Serve(lis)
	}()

	// Return the instance whose wait group was incremented above.
	return res
}

type mockEtcdClient struct {
Expand Down
9 changes: 7 additions & 2 deletions coordinator/operator/operator_move.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,13 @@ func (m *MoveMaintainerOperator) OnNodeRemove(n node.ID) {
m.lck.Lock()
defer m.lck.Unlock()

if m.finished || m.canceled {
if m.canceled {
return
}

if n == m.dest {
// the origin node is finished, we must mark the maintainer as absent to reschedule it again
// Node removal must win over a just-finished move. Otherwise PostFinish can still
// mark the changefeed replicating on a node that has already been removed.
if m.originNodeStopped {
log.Info("dest node is stopped, mark changefeed absent",
zap.String("changefeed", m.changefeed.ID.String()),
Expand All @@ -123,6 +124,10 @@ func (m *MoveMaintainerOperator) OnNodeRemove(n node.ID) {
m.db.BindChangefeedToNode(m.dest, m.origin, m.changefeed)
m.bind = true
m.originNodeStopped = true
return
}
if m.finished {
return
}
if n == m.origin {
log.Info("origin node is stopped",
Expand Down
33 changes: 33 additions & 0 deletions coordinator/operator/operator_move_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/pingcap/ticdc/heartbeatpb"
"github.com/pingcap/ticdc/pkg/common"
"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/node"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -106,3 +107,35 @@ func TestMoveMaintainerOperator_CheckRequiresDestBootstrapDone(t *testing.T) {
require.True(t, op.finished)
require.Nil(t, op.Schedule())
}

// TestMoveMaintainerOperator_OnNodeRemoveAfterFinishMarksAbsent drives a move
// operator from n1 to n2 until it reports finished, then removes n2. It
// asserts that the operator becomes canceled, that the changefeed is counted
// as absent and no longer listed under n2, and that a subsequent PostFinish
// leaves those two facts unchanged.
func TestMoveMaintainerOperator_OnNodeRemoveAfterFinishMarksAbsent(t *testing.T) {
	const (
		originNode = "n1"
		destNode   = "n2"
	)

	db := changefeed.NewChangefeedDB(1216)
	id := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName)
	info := &config.ChangeFeedInfo{
		ChangefeedID: id,
		Config:       config.GetDefaultReplicaConfig(),
		SinkURI:      "mysql://127.0.0.1:3306",
	}
	cf := changefeed.NewChangefeed(id, info, 1, true)
	db.AddReplicatingMaintainer(cf, originNode)

	// The changefeed must be absent and not attached to the removed dest node.
	requireAbsent := func() {
		require.Equal(t, 1, db.GetAbsentSize())
		require.Len(t, db.GetByNodeID(destNode), 0)
	}

	op := NewMoveMaintainerOperator(db, cf, originNode, destNode)

	// Origin reports stopped: the operator schedules onto the destination.
	stopped := &heartbeatpb.MaintainerStatus{State: heartbeatpb.ComponentState_Stopped}
	op.Check(originNode, stopped)
	require.NotNil(t, op.Schedule())
	require.Equal(t, node.ID(destNode), cf.GetNodeID())

	// Destination reports working with bootstrap done: the move is finished.
	working := &heartbeatpb.MaintainerStatus{
		State:         heartbeatpb.ComponentState_Working,
		BootstrapDone: true,
	}
	op.Check(destNode, working)
	require.True(t, op.finished)

	// Removing the destination after finish must still take effect.
	op.OnNodeRemove(destNode)
	require.True(t, op.canceled)
	requireAbsent()

	// PostFinish must not undo the removal handling.
	op.PostFinish()
	requireAbsent()
}
2 changes: 2 additions & 0 deletions pkg/orchestrator/reactor_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro
}

log.Info("remote capture online", zap.Any("info", newCaptureInfo), zap.String("role", s.Role))
// A fresh online event supersedes any pending delayed removal for the same capture.
delete(s.toRemoveCaptures, k.CaptureID)
if s.onCaptureAdded != nil {
s.onCaptureAdded(k.CaptureID, newCaptureInfo.AdvertiseAddr)
}
Expand Down
102 changes: 102 additions & 0 deletions pkg/orchestrator/reactor_state_capture_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright 2026 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package orchestrator

import (
"testing"
"time"

"github.com/pingcap/ticdc/pkg/config"
"github.com/pingcap/ticdc/pkg/etcd"
"github.com/pingcap/ticdc/pkg/orchestrator/util"
"github.com/stretchr/testify/require"
)

func TestGlobalReactorStateKeepsCaptureAfterReRegister(t *testing.T) {
t.Parallel()

state := NewGlobalState(etcd.DefaultCDCClusterID, 0)
state.captureRemoveTTL = 10

captureID := config.CaptureID("capture-1")
var removed []config.CaptureID
state.SetOnCaptureRemoved(func(id config.CaptureID) {
removed = append(removed, id)
})

mustUpdateCapture(t, state, captureID, "127.0.0.1:8300")
mustDeleteCapture(t, state, captureID)
state.toRemoveCaptures[captureID] = time.Now().Add(-11 * time.Second)
mustUpdateCapture(t, state, captureID, "127.0.0.1:8301")

state.UpdatePendingChange()

require.Contains(t, state.Captures, captureID)
require.Equal(t, "127.0.0.1:8301", state.Captures[captureID].AdvertiseAddr)
require.Empty(t, removed)
require.NotContains(t, state.toRemoveCaptures, captureID)
}

func TestGlobalReactorStateRemovesCaptureAfterTombstoneExpires(t *testing.T) {
t.Parallel()

state := NewGlobalState(etcd.DefaultCDCClusterID, 0)
state.captureRemoveTTL = 10

captureID := config.CaptureID("capture-1")
var removed []config.CaptureID
state.SetOnCaptureRemoved(func(id config.CaptureID) {
removed = append(removed, id)
})

mustUpdateCapture(t, state, captureID, "127.0.0.1:8300")
mustDeleteCapture(t, state, captureID)

state.UpdatePendingChange()
require.Contains(t, state.Captures, captureID)
require.Empty(t, removed)

state.toRemoveCaptures[captureID] = time.Now().Add(-11 * time.Second)
state.UpdatePendingChange()

require.NotContains(t, state.Captures, captureID)
require.Equal(t, []config.CaptureID{captureID}, removed)
require.NotContains(t, state.toRemoveCaptures, captureID)
}

// mustUpdateCapture marshals a CaptureInfo built from captureID and
// advertiseAddr and feeds it to state.Update under the capture-info etcd key
// for state.ClusterID. The test fails immediately if marshalling or the update
// returns an error.
func mustUpdateCapture(
	t *testing.T,
	state *GlobalReactorState,
	captureID config.CaptureID,
	advertiseAddr string,
) {
	t.Helper()

	payload, err := (&config.CaptureInfo{
		ID:            captureID,
		AdvertiseAddr: advertiseAddr,
	}).Marshal()
	require.NoError(t, err)

	key := util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID)))
	err = state.Update(key, payload, false)
	require.NoError(t, err)
}

// mustDeleteCapture calls state.Update with a nil value for the capture-info
// etcd key of captureID, which the tests in this file use to simulate the
// capture's key being deleted. The test fails immediately if the update
// returns an error.
func mustDeleteCapture(t *testing.T, state *GlobalReactorState, captureID config.CaptureID) {
	t.Helper()

	key := util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID)))
	require.NoError(t, state.Update(key, nil, false))
}
Loading