diff --git a/coordinator/coordinator_test.go b/coordinator/coordinator_test.go index 1c4dc0c173..0982f439cd 100644 --- a/coordinator/coordinator_test.go +++ b/coordinator/coordinator_test.go @@ -373,8 +373,9 @@ func TestCoordinatorScheduling(t *testing.T) { } func TestScaleNode(t *testing.T) { - ctx := context.Background() - info := node.NewInfo("127.0.0.1:28300", "") + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + info, lis1 := newMaintainerNodeForTest(t) etcdClient := newMockEtcdClient(string(info.ID)) nodeManager := watcher.NewNodeManager(nil, etcdClient) appcontext.SetService(watcher.NodeManagerName, nodeManager) @@ -384,13 +385,10 @@ func TestScaleNode(t *testing.T) { cfg := config.NewDefaultMessageCenterConfig(info.AdvertiseAddr) mc1 := messaging.NewMessageCenter(ctx, info.ID, cfg, nil) mc1.Run(ctx) - defer func() { - mc1.Close() - log.Info("close message center 1") - }() appcontext.SetService(appcontext.MessageCenter, mc1) - startMaintainerNode(ctx, info, mc1, nodeManager) + node1 := startMaintainerNode(ctx, info, mc1, nodeManager, lis1) + t.Cleanup(node1.stop) serviceID := "default" @@ -428,23 +426,16 @@ func TestScaleNode(t *testing.T) { }, waitTime, time.Millisecond*5) // add two nodes - info2 := node.NewInfo("127.0.0.1:28400", "") + info2, lis2 := newMaintainerNodeForTest(t) mc2 := messaging.NewMessageCenter(ctx, info2.ID, config.NewDefaultMessageCenterConfig(info2.AdvertiseAddr), nil) mc2.Run(ctx) - defer func() { - mc2.Close() - log.Info("close message center 2") - }() - startMaintainerNode(ctx, info2, mc2, nodeManager) - info3 := node.NewInfo("127.0.0.1:28500", "") + node2 := startMaintainerNode(ctx, info2, mc2, nodeManager, lis2) + t.Cleanup(node2.stop) + info3, lis3 := newMaintainerNodeForTest(t) mc3 := messaging.NewMessageCenter(ctx, info3.ID, config.NewDefaultMessageCenterConfig(info3.AdvertiseAddr), nil) mc3.Run(ctx) - defer func() { - mc3.Close() - log.Info("close message center 3") - }() - - startMaintainerNode(ctx, info3, mc3, nodeManager) + node3 := startMaintainerNode(ctx, info3, mc3, nodeManager, lis3) + t.Cleanup(node3.stop) log.Info("Start maintainer node", zap.Stringer("id", info3.ID), @@ -490,7 +481,7 @@ func TestScaleNode(t *testing.T) { func TestBootstrapWithUnStoppedChangefeed(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - info := node.NewInfo("127.0.0.1:28301", "") + info, lis := newMaintainerNodeForTest(t) etcdClient := newMockEtcdClient(string(info.ID)) nodeManager := watcher.NewNodeManager(nil, etcdClient) appcontext.SetService(watcher.NodeManagerName, nodeManager) @@ -500,10 +491,10 @@ func TestBootstrapWithUnStoppedChangefeed(t *testing.T) { mc1 := messaging.NewMessageCenter(ctx, info.ID, config.NewDefaultMessageCenterConfig(info.AdvertiseAddr), nil) mc1.Run(ctx) - defer mc1.Close() appcontext.SetService(appcontext.MessageCenter, mc1) - mNode := startMaintainerNode(ctx, info, mc1, nodeManager) + mNode := startMaintainerNode(ctx, info, mc1, nodeManager, lis) + defer mNode.stop() removingCf1 := &changefeed.ChangefeedMetaWrapper{ Info: &config.ChangeFeedInfo{ @@ -712,40 +703,51 @@ type maintainNode struct { cancel context.CancelFunc mc messaging.MessageCenter manager *mockMaintainerManager + wg sync.WaitGroup } func (d *maintainNode) stop() { - d.mc.Close() d.cancel() + d.wg.Wait() + d.mc.Close() +} + +func newMaintainerNodeForTest(t *testing.T) (*node.Info, net.Listener) { + t.Helper() + + lis, err := net.Listen("tcp", "127.0.0.1:0") + require.NoError(t, err) + + return node.NewInfo(lis.Addr().String(), ""), lis } func startMaintainerNode(ctx context.Context, node *node.Info, mc messaging.MessageCenter, nodeManager *watcher.NodeManager, + lis net.Listener, ) *maintainNode { nodeManager.RegisterNodeChangeHandler(node.ID, mc.OnNodeChanges) ctx, cancel := context.WithCancel(ctx) maintainerM := NewMaintainerManager(mc) + res := &maintainNode{ + cancel: cancel, + mc: mc, + manager: maintainerM, + } + res.wg.Add(1) go func() { + defer res.wg.Done() var opts []grpc.ServerOption grpcServer := grpc.NewServer(opts...) mcs := messaging.NewMessageCenterServer(mc) proto.RegisterMessageServiceServer(grpcServer, mcs) - lis, err := net.Listen("tcp", node.AdvertiseAddr) - if err != nil { - panic(err) - } go func() { _ = grpcServer.Serve(lis) }() _ = maintainerM.Run(ctx) grpcServer.Stop() }() - return &maintainNode{ - cancel: cancel, - mc: mc, - manager: maintainerM, - } + return res } type mockEtcdClient struct { diff --git a/coordinator/operator/operator_move.go b/coordinator/operator/operator_move.go index 382d927eef..560b1a1088 100644 --- a/coordinator/operator/operator_move.go +++ b/coordinator/operator/operator_move.go @@ -98,12 +98,13 @@ func (m *MoveMaintainerOperator) OnNodeRemove(n node.ID) { m.lck.Lock() defer m.lck.Unlock() - if m.finished || m.canceled { + if m.canceled { return } if n == m.dest { - // the origin node is finished, we must mark the maintainer as absent to reschedule it again + // Node removal must win over a just-finished move. Otherwise PostFinish can still + // mark the changefeed replicating on a node that has already been removed. if m.originNodeStopped { log.Info("dest node is stopped, mark changefeed absent", zap.String("changefeed", m.changefeed.ID.String()), @@ -123,6 +124,10 @@ func (m *MoveMaintainerOperator) OnNodeRemove(n node.ID) { m.db.BindChangefeedToNode(m.dest, m.origin, m.changefeed) m.bind = true m.originNodeStopped = true + return + } + if m.finished { + return } if n == m.origin { log.Info("origin node is stopped", diff --git a/coordinator/operator/operator_move_test.go b/coordinator/operator/operator_move_test.go index e148fc16b1..4b9d4ec3cb 100644 --- a/coordinator/operator/operator_move_test.go +++ b/coordinator/operator/operator_move_test.go @@ -20,6 +20,7 @@ import ( "github.com/pingcap/ticdc/heartbeatpb" "github.com/pingcap/ticdc/pkg/common" "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/node" "github.com/stretchr/testify/require" ) @@ -106,3 +107,35 @@ func TestMoveMaintainerOperator_CheckRequiresDestBootstrapDone(t *testing.T) { require.True(t, op.finished) require.Nil(t, op.Schedule()) } + +func TestMoveMaintainerOperator_OnNodeRemoveAfterFinishMarksAbsent(t *testing.T) { + changefeedDB := changefeed.NewChangefeedDB(1216) + cfID := common.NewChangeFeedIDWithName("test", common.DefaultKeyspaceName) + cf := changefeed.NewChangefeed(cfID, &config.ChangeFeedInfo{ + ChangefeedID: cfID, + Config: config.GetDefaultReplicaConfig(), + SinkURI: "mysql://127.0.0.1:3306", + }, + 1, true) + changefeedDB.AddReplicatingMaintainer(cf, "n1") + + op := NewMoveMaintainerOperator(changefeedDB, cf, "n1", "n2") + op.Check("n1", &heartbeatpb.MaintainerStatus{State: heartbeatpb.ComponentState_Stopped}) + require.NotNil(t, op.Schedule()) + require.Equal(t, node.ID("n2"), cf.GetNodeID()) + + op.Check("n2", &heartbeatpb.MaintainerStatus{ + State: heartbeatpb.ComponentState_Working, + BootstrapDone: true, + }) + require.True(t, op.finished) + + op.OnNodeRemove("n2") + require.True(t, op.canceled) + require.Equal(t, 1, changefeedDB.GetAbsentSize()) + require.Len(t, changefeedDB.GetByNodeID("n2"), 0) + + op.PostFinish() + require.Equal(t, 1, changefeedDB.GetAbsentSize()) + require.Len(t, changefeedDB.GetByNodeID("n2"), 0) +} diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index b48ddad95c..a118278a1d 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -115,6 +115,8 @@ func (s *GlobalReactorState) Update(key util.EtcdKey, value []byte, _ bool) erro } log.Info("remote capture online", zap.Any("info", newCaptureInfo), zap.String("role", s.Role)) + // A fresh online event supersedes any pending delayed removal for the same capture. + delete(s.toRemoveCaptures, k.CaptureID) if s.onCaptureAdded != nil { s.onCaptureAdded(k.CaptureID, newCaptureInfo.AdvertiseAddr) } diff --git a/pkg/orchestrator/reactor_state_capture_test.go b/pkg/orchestrator/reactor_state_capture_test.go new file mode 100644 index 0000000000..032368e7f7 --- /dev/null +++ b/pkg/orchestrator/reactor_state_capture_test.go @@ -0,0 +1,102 @@ +// Copyright 2026 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package orchestrator + +import ( + "testing" + "time" + + "github.com/pingcap/ticdc/pkg/config" + "github.com/pingcap/ticdc/pkg/etcd" + "github.com/pingcap/ticdc/pkg/orchestrator/util" + "github.com/stretchr/testify/require" +) + +func TestGlobalReactorStateKeepsCaptureAfterReRegister(t *testing.T) { + t.Parallel() + + state := NewGlobalState(etcd.DefaultCDCClusterID, 0) + state.captureRemoveTTL = 10 + + captureID := config.CaptureID("capture-1") + var removed []config.CaptureID + state.SetOnCaptureRemoved(func(id config.CaptureID) { + removed = append(removed, id) + }) + + mustUpdateCapture(t, state, captureID, "127.0.0.1:8300") + mustDeleteCapture(t, state, captureID) + state.toRemoveCaptures[captureID] = time.Now().Add(-11 * time.Second) + mustUpdateCapture(t, state, captureID, "127.0.0.1:8301") + + state.UpdatePendingChange() + + require.Contains(t, state.Captures, captureID) + require.Equal(t, "127.0.0.1:8301", state.Captures[captureID].AdvertiseAddr) + require.Empty(t, removed) + require.NotContains(t, state.toRemoveCaptures, captureID) +} + +func TestGlobalReactorStateRemovesCaptureAfterTombstoneExpires(t *testing.T) { + t.Parallel() + + state := NewGlobalState(etcd.DefaultCDCClusterID, 0) + state.captureRemoveTTL = 10 + + captureID := config.CaptureID("capture-1") + var removed []config.CaptureID + state.SetOnCaptureRemoved(func(id config.CaptureID) { + removed = append(removed, id) + }) + + mustUpdateCapture(t, state, captureID, "127.0.0.1:8300") + mustDeleteCapture(t, state, captureID) + + state.UpdatePendingChange() + require.Contains(t, state.Captures, captureID) + require.Empty(t, removed) + + state.toRemoveCaptures[captureID] = time.Now().Add(-11 * time.Second) + state.UpdatePendingChange() + + require.NotContains(t, state.Captures, captureID) + require.Equal(t, []config.CaptureID{captureID}, removed) + require.NotContains(t, state.toRemoveCaptures, captureID) +} + +func mustUpdateCapture( + t *testing.T, + state *GlobalReactorState, + captureID config.CaptureID, + advertiseAddr string, +) { + t.Helper() + + info := &config.CaptureInfo{ + ID: captureID, + AdvertiseAddr: advertiseAddr, + } + data, err := info.Marshal() + require.NoError(t, err) + + err = state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID))), data, false) + require.NoError(t, err) +} + +func mustDeleteCapture(t *testing.T, state *GlobalReactorState, captureID config.CaptureID) { + t.Helper() + + err := state.Update(util.NewEtcdKey(etcd.GetEtcdKeyCaptureInfo(state.ClusterID, string(captureID))), nil, false) + require.NoError(t, err) +}