Skip to content

Commit

Permalink
enhance: Remove datanode reporting TT based on MQ implementation (#34421
Browse files Browse the repository at this point in the history
)

issue: #34420

Signed-off-by: jaime <yun.zhang@zilliz.com>
  • Loading branch information
jaime0815 committed Jul 5, 2024
1 parent 0817802 commit 21fc5f5
Show file tree
Hide file tree
Showing 17 changed files with 51 additions and 813 deletions.
1 change: 0 additions & 1 deletion configs/milvus.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,6 @@ dataNode:
checkInterval: 3000 # the interal to check datanode memory usage, in milliseconds
forceSyncWatermark: 0.5 # memory watermark for standalone, upon reaching this watermark, segments will be synced.
timetick:
byRPC: true
interval: 500
channel:
# specify the size of global work pool of all channels
Expand Down
70 changes: 35 additions & 35 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ type SubCluster interface {
CheckChannelOperationProgress(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (*datapb.ChannelOperationProgressResponse, error)
}

type ChannelManagerImplV2 struct {
type ChannelManagerImpl struct {
cancel context.CancelFunc
mu lock.RWMutex
wg sync.WaitGroup
Expand All @@ -82,25 +82,25 @@ type ChannelManagerImplV2 struct {
// ChannelBGChecker are goroutining running background
type ChannelBGChecker func(ctx context.Context)

// ChannelmanagerOptV2 is to set optional parameters in channel manager.
type ChannelmanagerOptV2 func(c *ChannelManagerImplV2)
// ChannelmanagerOpt is to set optional parameters in channel manager.
type ChannelmanagerOpt func(c *ChannelManagerImpl)

func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOptV2 {
return func(c *ChannelManagerImplV2) { c.factory = f }
func withFactoryV2(f ChannelPolicyFactory) ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.factory = f }
}

func withCheckerV2() ChannelmanagerOptV2 {
return func(c *ChannelManagerImplV2) { c.balanceCheckLoop = c.CheckLoop }
func withCheckerV2() ChannelmanagerOpt {
return func(c *ChannelManagerImpl) { c.balanceCheckLoop = c.CheckLoop }
}

func NewChannelManagerV2(
kv kv.TxnKV,
h Handler,
subCluster SubCluster, // sessionManager
alloc allocator,
options ...ChannelmanagerOptV2,
) (*ChannelManagerImplV2, error) {
m := &ChannelManagerImplV2{
options ...ChannelmanagerOpt,
) (*ChannelManagerImpl, error) {
m := &ChannelManagerImpl{
h: h,
factory: NewChannelPolicyFactoryV1(),
store: NewChannelStoreV2(kv),
Expand All @@ -121,7 +121,7 @@ func NewChannelManagerV2(
return m, nil
}

func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
func (m *ChannelManagerImpl) Startup(ctx context.Context, legacyNodes, allNodes []int64) error {
ctx, m.cancel = context.WithCancel(ctx)

m.legacyNodes = typeutil.NewUniqueSet(legacyNodes...)
Expand Down Expand Up @@ -175,14 +175,14 @@ func (m *ChannelManagerImplV2) Startup(ctx context.Context, legacyNodes, allNode
return nil
}

func (m *ChannelManagerImplV2) Close() {
func (m *ChannelManagerImpl) Close() {
if m.cancel != nil {
m.cancel()
m.wg.Wait()
}
}

func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error {
func (m *ChannelManagerImpl) AddNode(nodeID UniqueID) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -204,7 +204,7 @@ func (m *ChannelManagerImplV2) AddNode(nodeID UniqueID) error {
}

// Release writes ToRelease channel watch states for a channel
func (m *ChannelManagerImplV2) Release(nodeID UniqueID, channelName string) error {
func (m *ChannelManagerImpl) Release(nodeID UniqueID, channelName string) error {
log := log.With(
zap.Int64("nodeID", nodeID),
zap.String("channel", channelName),
Expand All @@ -227,7 +227,7 @@ func (m *ChannelManagerImplV2) Release(nodeID UniqueID, channelName string) erro
return m.execute(updates)
}

func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error {
func (m *ChannelManagerImpl) Watch(ctx context.Context, ch RWChannel) error {
log := log.Ctx(ctx).With(zap.String("channel", ch.GetName()))
m.mu.Lock()
defer m.mu.Unlock()
Expand Down Expand Up @@ -256,7 +256,7 @@ func (m *ChannelManagerImplV2) Watch(ctx context.Context, ch RWChannel) error {
return nil
}

func (m *ChannelManagerImplV2) DeleteNode(nodeID UniqueID) error {
func (m *ChannelManagerImpl) DeleteNode(nodeID UniqueID) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand Down Expand Up @@ -288,7 +288,7 @@ func (m *ChannelManagerImplV2) DeleteNode(nodeID UniqueID) error {
}

// reassign reassigns a channel to another DataNode.
func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error {
func (m *ChannelManagerImpl) reassign(original *NodeChannelInfo) error {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -309,7 +309,7 @@ func (m *ChannelManagerImplV2) reassign(original *NodeChannelInfo) error {
return nil
}

func (m *ChannelManagerImplV2) Balance() {
func (m *ChannelManagerImpl) Balance() {
m.mu.Lock()
defer m.mu.Unlock()

Expand All @@ -325,7 +325,7 @@ func (m *ChannelManagerImplV2) Balance() {
}
}

func (m *ChannelManagerImplV2) Match(nodeID UniqueID, channel string) bool {
func (m *ChannelManagerImpl) Match(nodeID UniqueID, channel string) bool {
m.mu.RLock()
defer m.mu.RUnlock()

Expand All @@ -338,7 +338,7 @@ func (m *ChannelManagerImplV2) Match(nodeID UniqueID, channel string) bool {
return ok
}

func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWChannel, bool) {
func (m *ChannelManagerImpl) GetChannel(nodeID int64, channelName string) (RWChannel, bool) {
m.mu.RLock()
defer m.mu.RUnlock()

Expand All @@ -350,13 +350,13 @@ func (m *ChannelManagerImplV2) GetChannel(nodeID int64, channelName string) (RWC
return nil, false
}

func (m *ChannelManagerImplV2) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
func (m *ChannelManagerImpl) GetNodeChannelsByCollectionID(collectionID int64) map[int64][]string {
m.mu.RLock()
defer m.mu.RUnlock()
return m.store.GetNodeChannelsByCollectionID(collectionID)
}

func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []RWChannel {
func (m *ChannelManagerImpl) GetChannelsByCollectionID(collectionID int64) []RWChannel {
m.mu.RLock()
defer m.mu.RUnlock()
channels := []RWChannel{}
Expand All @@ -370,14 +370,14 @@ func (m *ChannelManagerImplV2) GetChannelsByCollectionID(collectionID int64) []R
return channels
}

func (m *ChannelManagerImplV2) GetChannelNamesByCollectionID(collectionID int64) []string {
func (m *ChannelManagerImpl) GetChannelNamesByCollectionID(collectionID int64) []string {
channels := m.GetChannelsByCollectionID(collectionID)
return lo.Map(channels, func(ch RWChannel, _ int) string {
return ch.GetName()
})
}

func (m *ChannelManagerImplV2) FindWatcher(channel string) (UniqueID, error) {
func (m *ChannelManagerImpl) FindWatcher(channel string) (UniqueID, error) {
m.mu.RLock()
defer m.mu.RUnlock()

Expand All @@ -400,7 +400,7 @@ func (m *ChannelManagerImplV2) FindWatcher(channel string) (UniqueID, error) {
}

// unsafe innter func
func (m *ChannelManagerImplV2) removeChannel(nodeID int64, ch RWChannel) error {
func (m *ChannelManagerImpl) removeChannel(nodeID int64, ch RWChannel) error {
op := NewChannelOpSet(NewChannelOp(nodeID, Delete, ch))
log.Info("remove channel assignment",
zap.String("channel", ch.GetName()),
Expand All @@ -409,7 +409,7 @@ func (m *ChannelManagerImplV2) removeChannel(nodeID int64, ch RWChannel) error {
return m.store.Update(op)
}

func (m *ChannelManagerImplV2) CheckLoop(ctx context.Context) {
func (m *ChannelManagerImpl) CheckLoop(ctx context.Context) {
balanceTicker := time.NewTicker(Params.DataCoordCfg.ChannelBalanceInterval.GetAsDuration(time.Second))
defer balanceTicker.Stop()
checkTicker := time.NewTicker(Params.DataCoordCfg.ChannelCheckInterval.GetAsDuration(time.Second))
Expand All @@ -430,7 +430,7 @@ func (m *ChannelManagerImplV2) CheckLoop(ctx context.Context) {
}
}

func (m *ChannelManagerImplV2) AdvanceChannelState(ctx context.Context) {
func (m *ChannelManagerImpl) AdvanceChannelState(ctx context.Context) {
m.mu.RLock()
standbys := m.store.GetNodeChannelsBy(WithAllNodes(), WithChannelStates(Standby))
toNotifies := m.store.GetNodeChannelsBy(WithoutBufferNode(), WithChannelStates(ToWatch, ToRelease))
Expand All @@ -447,7 +447,7 @@ func (m *ChannelManagerImplV2) AdvanceChannelState(ctx context.Context) {
}
}

func (m *ChannelManagerImplV2) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
func (m *ChannelManagerImpl) finishRemoveChannel(nodeID int64, channels ...RWChannel) {
m.mu.Lock()
defer m.mu.Unlock()
for _, ch := range channels {
Expand All @@ -463,7 +463,7 @@ func (m *ChannelManagerImplV2) finishRemoveChannel(nodeID int64, channels ...RWC
}
}

func (m *ChannelManagerImplV2) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceStandbys(_ context.Context, standbys []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range standbys {
validChannels := make(map[string]RWChannel)
Expand Down Expand Up @@ -500,7 +500,7 @@ func (m *ChannelManagerImplV2) advanceStandbys(_ context.Context, standbys []*No
return advanced
}

func (m *ChannelManagerImplV2) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceToNotifies(ctx context.Context, toNotifies []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range toNotifies {
channelCount := len(nodeAssign.Channels)
Expand Down Expand Up @@ -563,7 +563,7 @@ type poolResult struct {
ch RWChannel
}

func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
func (m *ChannelManagerImpl) advanceToChecks(ctx context.Context, toChecks []*NodeChannelInfo) bool {
var advanced bool = false
for _, nodeAssign := range toChecks {
if len(nodeAssign.Channels) == 0 {
Expand Down Expand Up @@ -615,7 +615,7 @@ func (m *ChannelManagerImplV2) advanceToChecks(ctx context.Context, toChecks []*
return advanced
}

func (m *ChannelManagerImplV2) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error {
func (m *ChannelManagerImpl) Notify(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) error {
log := log.With(
zap.String("channel", info.GetVchan().GetChannelName()),
zap.Int64("assignment", nodeID),
Expand All @@ -631,7 +631,7 @@ func (m *ChannelManagerImplV2) Notify(ctx context.Context, nodeID int64, info *d
return nil
}

func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) {
func (m *ChannelManagerImpl) Check(ctx context.Context, nodeID int64, info *datapb.ChannelWatchInfo) (successful bool, got bool) {
log := log.With(
zap.Int64("opID", info.GetOpID()),
zap.Int64("nodeID", nodeID),
Expand Down Expand Up @@ -674,7 +674,7 @@ func (m *ChannelManagerImplV2) Check(ctx context.Context, nodeID int64, info *da
return false, false
}

func (m *ChannelManagerImplV2) execute(updates *ChannelOpSet) error {
func (m *ChannelManagerImpl) execute(updates *ChannelOpSet) error {
for _, op := range updates.ops {
if op.Type != Delete {
if err := m.fillChannelWatchInfo(op); err != nil {
Expand All @@ -688,7 +688,7 @@ func (m *ChannelManagerImplV2) execute(updates *ChannelOpSet) error {
}

// fillChannelWatchInfoWithState updates the channel op by filling in channel watch info.
func (m *ChannelManagerImplV2) fillChannelWatchInfo(op *ChannelOp) error {
func (m *ChannelManagerImpl) fillChannelWatchInfo(op *ChannelOp) error {
startTs := time.Now().Unix()
for _, ch := range op.Channels {
vcInfo := m.h.GetDataVChanPositions(ch, allPartitionID)
Expand Down
4 changes: 2 additions & 2 deletions internal/datacoord/channel_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (s *ChannelManagerSuite) prepareMeta(chNodes map[string]int64, state datapb
s.mockKv.EXPECT().LoadWithPrefix(mock.Anything).Return(keys, values, nil).Once()
}

func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImplV2, nodeID int64, channel string, state ChannelState) {
func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImpl, nodeID int64, channel string, state ChannelState) {
rwChannel, found := m.GetChannel(nodeID, channel)
s.True(found)
s.NotNil(rwChannel)
Expand All @@ -84,7 +84,7 @@ func (s *ChannelManagerSuite) checkAssignment(m *ChannelManagerImplV2, nodeID in
}
}

func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImplV2, nodeID int64, channel string) {
func (s *ChannelManagerSuite) checkNoAssignment(m *ChannelManagerImpl, nodeID int64, channel string) {
rwChannel, found := m.GetChannel(nodeID, channel)
s.False(found)
s.Nil(rwChannel)
Expand Down
1 change: 0 additions & 1 deletion internal/datacoord/channel_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ type ROChannelStore interface {
// GetNodeChannels for given collection
GetNodeChannelsByCollectionID(collectionID UniqueID) map[UniqueID][]string

// GetNodeChannelsBy used by channel_store_v2 and channel_manager_v2 only
GetNodeChannelsBy(nodeSelector NodeSelector, channelSelectors ...ChannelSelector) []*NodeChannelInfo
}

Expand Down
Loading

0 comments on commit 21fc5f5

Please sign in to comment.