Skip to content

Commit

Permalink
fix: unreigster node blocked in unsub channel
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Jan 8, 2024
1 parent 0c83440 commit 285c891
Showing 1 changed file with 22 additions and 23 deletions.
45 changes: 22 additions & 23 deletions internal/datacoord/channel_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import (
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/mq/msgstream"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/logutil"
)

Expand Down Expand Up @@ -375,7 +374,7 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {
return nil
}

c.unsubAttempt(nodeChannelInfo)
// c.unsubAttempt(nodeChannelInfo)

updates := c.deregisterPolicy(c.store, nodeID)
if updates == nil {
Expand Down Expand Up @@ -409,22 +408,22 @@ func (c *ChannelManager) DeleteNode(nodeID int64) error {

// unsubAttempt attempts to unsubscribe node-channel info from the channel.
func (c *ChannelManager) unsubAttempt(ncInfo *NodeChannelInfo) {
if ncInfo == nil {
return
}
// if ncInfo == nil {
// return
// }

if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set")
return
}
// if c.msgstreamFactory == nil {
// log.Warn("msgstream factory is not set")
// return
// }

nodeID := ncInfo.NodeID
for _, ch := range ncInfo.Channels {
// align to datanode subname, using vchannel name
subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, ch.GetName())
pchannelName := funcutil.ToPhysicalChannel(ch.GetName())
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
// nodeID := ncInfo.NodeID
// for _, ch := range ncInfo.Channels {
// // align to datanode subname, using vchannel name
// subName := fmt.Sprintf("%s-%d-%s", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, ch.GetName())
// pchannelName := funcutil.ToPhysicalChannel(ch.GetName())
// msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
// }
}

// Watch tries to add the channel to cluster. Watch is a no op if the channel already exists.
Expand Down Expand Up @@ -840,13 +839,13 @@ func (c *ChannelManager) CleanupAndReassign(nodeID UniqueID, channelName string)
}
c.mu.RUnlock()

if c.msgstreamFactory == nil {
log.Warn("msgstream factory is not set, unable to clean up topics")
} else {
subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, chToCleanUp.GetCollectionID())
pchannelName := funcutil.ToPhysicalChannel(channelName)
msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
}
// if c.msgstreamFactory == nil {
// log.Warn("msgstream factory is not set, unable to clean up topics")
// } else {
// subName := fmt.Sprintf("%s-%d-%d", Params.CommonCfg.DataNodeSubName.GetValue(), nodeID, chToCleanUp.GetCollectionID())
// pchannelName := funcutil.ToPhysicalChannel(channelName)
// msgstream.UnsubscribeChannels(c.ctx, c.msgstreamFactory, subName, []string{pchannelName})
// }

reallocates := &NodeChannelInfo{nodeID, []RWChannel{chToCleanUp}}
isDropped := c.isMarkedDrop(channelName)
Expand Down

0 comments on commit 285c891

Please sign in to comment.