Skip to content

Commit

Permalink
enhance: Tidy dc channel manager (#34515)
Browse files Browse the repository at this point in the history
See also: #34518

Signed-off-by: yangxuan <xuan.yang@zilliz.com>
  • Loading branch information
XuanYang-cn committed Jul 9, 2024
1 parent c1e0453 commit 314f4d9
Show file tree
Hide file tree
Showing 9 changed files with 55 additions and 779 deletions.
12 changes: 7 additions & 5 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type ChannelManagerImpl struct {

factory ChannelPolicyFactory
balancePolicy BalanceChannelPolicy
assignPolicy AssignPolicy

balanceCheckLoop ChannelBGChecker

Expand All @@ -93,7 +94,7 @@ func withCheckerV2() ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.balanceCheckLoop = c.CheckLoop }
}

func NewChannelManagerV2(
func NewChannelManager(
kv kv.TxnKV,
h Handler,
subCluster SubCluster, // sessionManager
Expand All @@ -117,6 +118,7 @@ func NewChannelManagerV2(
}

m.balancePolicy = m.factory.NewBalancePolicy()
m.assignPolicy = m.factory.NewAssignPolicy()
m.lastActiveTimestamp = time.Now()
return m, nil
}
Expand Down Expand Up @@ -189,7 +191,7 @@ func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
log.Info("register node", zap.Int64("registered node", nodeID))

m.store.AddNode(nodeID)
updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
updates := m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())

if updates == nil {
log.Info("register node with no reassignment", zap.Int64("registered node", nodeID))
Expand Down Expand Up @@ -242,7 +244,7 @@ func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {

// channel already written into meta, try to assign it to the cluster
// not error is returned if failed, the assignment will retry later
updates = AvgAssignByCountPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
updates = m.assignPolicy(m.store.GetNodesChannels(), m.store.GetBufferChannelInfo(), m.legacyNodes.Collect())
if updates == nil {
return nil
}
Expand Down Expand Up @@ -270,7 +272,7 @@ func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
}

updates := NewChannelOpSet(
NewDeleteOp(info.NodeID, lo.Values(info.Channels)...),
NewChannelOp(info.NodeID, Delete, lo.Values(info.Channels)...),
NewChannelOp(bufferID, Watch, lo.Values(info.Channels)...),
)
log.Info("deregister node", zap.Int64("nodeID", nodeID), zap.Array("updates", updates))
Expand All @@ -292,7 +294,7 @@ func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
m.mu.Lock()
defer m.mu.Unlock()

updates := AvgAssignByCountPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
updates := m.assignPolicy(m.store.GetNodesChannels(), original, m.legacyNodes.Collect())
if updates != nil {
return m.execute(updates)
}
Expand Down
34 changes: 6 additions & 28 deletions internal/datacoord/channel_manager_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,10 @@ package datacoord

// ChannelPolicyFactory is the abstract factory that creates policies for channel manager.
type ChannelPolicyFactory interface {
// NewRegisterPolicy creates a new register policy.
NewRegisterPolicy() RegisterPolicy
// NewDeregisterPolicy creates a new deregister policy.
NewDeregisterPolicy() DeregisterPolicy
// NewAssignPolicy creates a new channel assign policy.
NewAssignPolicy() ChannelAssignPolicy
// NewReassignPolicy creates a new channel reassign policy.
NewReassignPolicy() ChannelReassignPolicy
// NewBalancePolicy creates a new channel balance policy.
NewBalancePolicy() BalanceChannelPolicy

NewAssignPolicy() AssignPolicy
}

// ChannelPolicyFactoryV1 equal to policy batch
Expand All @@ -38,26 +32,10 @@ func NewChannelPolicyFactoryV1() *ChannelPolicyFactoryV1 {
return &ChannelPolicyFactoryV1{}
}

// NewRegisterPolicy implementing ChannelPolicyFactory returns BufferChannelAssignPolicy.
func (f *ChannelPolicyFactoryV1) NewRegisterPolicy() RegisterPolicy {
return AvgAssignRegisterPolicy
}

// NewDeregisterPolicy implementing ChannelPolicyFactory returns AvgAssignUnregisteredChannels.
func (f *ChannelPolicyFactoryV1) NewDeregisterPolicy() DeregisterPolicy {
return AvgAssignUnregisteredChannels
}

// NewAssignPolicy implementing ChannelPolicyFactory returns AverageAssignPolicy.
func (f *ChannelPolicyFactoryV1) NewAssignPolicy() ChannelAssignPolicy {
return AverageAssignPolicy
}

// NewReassignPolicy implementing ChannelPolicyFactory returns AverageReassignPolicy.
func (f *ChannelPolicyFactoryV1) NewReassignPolicy() ChannelReassignPolicy {
return AverageReassignPolicy
}

func (f *ChannelPolicyFactoryV1) NewBalancePolicy() BalanceChannelPolicy {
return AvgBalanceChannelPolicy
}

func (f *ChannelPolicyFactoryV1) NewAssignPolicy() AssignPolicy {
return AvgAssignByCountPolicy
}
58 changes: 29 additions & 29 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *ChannelManagerSuite) TearDownTest() {}
func (s *ChannelManagerSuite) TestAddNode() {
s.Run("AddNode with empty store", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

var testNode int64 = 1
Expand All @@ -134,7 +134,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
"ch2": bufferID,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

var (
Expand Down Expand Up @@ -162,7 +162,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
chNodes := map[string]int64{testChannel: storedNodeID}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)

m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

s.checkAssignment(m, storedNodeID, testChannel, Watched)
Expand All @@ -189,7 +189,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)

m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

var testNodeID int64 = 100
Expand All @@ -205,7 +205,7 @@ func (s *ChannelManagerSuite) TestAddNode() {
func (s *ChannelManagerSuite) TestWatch() {
s.Run("test Watch with empty store", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

var testCh string = "ch1"
Expand All @@ -217,7 +217,7 @@ func (s *ChannelManagerSuite) TestWatch() {
})
s.Run("test Watch with nodeID in store", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

var (
Expand All @@ -238,7 +238,7 @@ func (s *ChannelManagerSuite) TestWatch() {
func (s *ChannelManagerSuite) TestRelease() {
s.Run("release not exist nodeID and channel", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

err = m.Release(1, "ch1")
Expand All @@ -253,7 +253,7 @@ func (s *ChannelManagerSuite) TestRelease() {

s.Run("release channel in bufferID", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

m.Watch(context.TODO(), getChannel("ch1", 1))
Expand All @@ -268,7 +268,7 @@ func (s *ChannelManagerSuite) TestRelease() {
func (s *ChannelManagerSuite) TestDeleteNode() {
s.Run("delete not exsit node", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
info := m.store.GetNode(1)
s.Require().Nil(info)
Expand All @@ -278,7 +278,7 @@ func (s *ChannelManagerSuite) TestDeleteNode() {
})
s.Run("delete bufferID", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
info := m.store.GetNode(bufferID)
s.Require().NotNil(info)
Expand All @@ -289,7 +289,7 @@ func (s *ChannelManagerSuite) TestDeleteNode() {

s.Run("delete node without assigment", func() {
s.prepareMeta(nil, 0)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

err = m.AddNode(1)
Expand All @@ -309,7 +309,7 @@ func (s *ChannelManagerSuite) TestDeleteNode() {
"ch3": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", Watched)
s.checkAssignment(m, 1, "ch2", Watched)
Expand Down Expand Up @@ -342,7 +342,7 @@ func (s *ChannelManagerSuite) TestFindWatcher() {
"ch4": 1,
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

tests := []struct {
Expand Down Expand Up @@ -382,7 +382,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, bufferID, "ch1", Standby)
s.checkAssignment(m, bufferID, "ch2", Standby)
Expand All @@ -400,7 +400,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_WatchSuccess)
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false).Times(2)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, bufferID, "ch1", Standby)
s.checkAssignment(m, bufferID, "ch2", Standby)
Expand All @@ -417,7 +417,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
Expand All @@ -433,7 +433,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
Expand All @@ -455,7 +455,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
Expand All @@ -478,7 +478,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
Expand All @@ -500,7 +500,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Times(2)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
Expand All @@ -527,7 +527,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
Expand All @@ -549,7 +549,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
Expand All @@ -571,7 +571,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
Expand All @@ -598,7 +598,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
Expand Down Expand Up @@ -627,7 +627,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToWatch)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("mock error")).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToWatch)
s.checkAssignment(m, 1, "ch2", ToWatch)
Expand All @@ -643,7 +643,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).Return(nil).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
Expand All @@ -660,7 +660,7 @@ func (s *ChannelManagerSuite) TestAdvanceChannelState() {
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockCluster.EXPECT().NotifyChannelOperation(mock.Anything, mock.Anything, mock.Anything).
Return(fmt.Errorf("mock error")).Twice()
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)
s.checkAssignment(m, 1, "ch1", ToRelease)
s.checkAssignment(m, 1, "ch2", ToRelease)
Expand All @@ -679,7 +679,7 @@ func (s *ChannelManagerSuite) TestStartup() {
}
s.prepareMeta(chNodes, datapb.ChannelWatchState_ToRelease)
s.mockHandler.EXPECT().CheckShouldDropChannel(mock.Anything).Return(false)
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

var (
Expand Down Expand Up @@ -717,7 +717,7 @@ func (s *ChannelManagerSuite) TestStartupRootCoordFailed() {

s.mockAlloc = NewNMockAllocator(s.T())
s.mockAlloc.EXPECT().allocID(mock.Anything).Return(0, errors.New("mock rootcoord failure"))
m, err := NewChannelManagerV2(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
m, err := NewChannelManager(s.mockKv, s.mockHandler, s.mockCluster, s.mockAlloc)
s.Require().NoError(err)

err = m.Startup(context.TODO(), nil, []int64{2})
Expand Down
Loading

0 comments on commit 314f4d9

Please sign in to comment.