Fix load hang when restarting querynode many times in a short time #6351

Merged
8 commits merged on Jul 13, 2021
41 changes: 5 additions & 36 deletions internal/distributed/querynode/service.go
@@ -68,7 +68,7 @@ func NewServer(ctx context.Context, factory msgstream.Factory) (*Server, error)
s := &Server{
ctx: ctx1,
cancel: cancel,
querynode: qn.NewQueryNodeWithoutID(ctx, factory),
querynode: qn.NewQueryNode(ctx, factory),
grpcErrChan: make(chan error),
}
return s, nil
@@ -96,10 +96,6 @@ func (s *Server) init() error {
return err
}

if err := s.querynode.Register(); err != nil {
return err
}

// --- RootCoord Client ---
//ms.Params.Init()
addr := Params.RootCoordAddress
@@ -163,39 +159,16 @@ func (s *Server) init() error {
panic(err)
}

// --- DataCoord ---
log.Debug("QueryNode start to new DataCoordClient", zap.Any("DataCoordAddress", Params.DataCoordAddress))
dataCoord, err := dsc.NewClient(s.ctx, qn.Params.MetaRootPath, qn.Params.EtcdEndpoints)
if err != nil {
log.Debug("QueryNode new DataCoordClient failed", zap.Error(err))
panic(err)
}
if err = dataCoord.Init(); err != nil {
log.Debug("QueryNode DataCoordClient Init failed", zap.Error(err))
panic(err)
}
if err = dataCoord.Start(); err != nil {
log.Debug("QueryNode DataCoordClient Start failed", zap.Error(err))
panic(err)
}
log.Debug("QueryNode start to wait for DataCoord ready")
err = funcutil.WaitForComponentInitOrHealthy(s.ctx, dataCoord, "DataCoord", 1000000, time.Millisecond*200)
if err != nil {
log.Debug("QueryNode wait for DataCoord ready failed", zap.Error(err))
panic(err)
}
log.Debug("QueryNode report DataCoord is ready")

if err := s.SetDataCoord(dataCoord); err != nil {
panic(err)
}

s.querynode.UpdateStateCode(internalpb.StateCode_Initializing)
log.Debug("QueryNode", zap.Any("State", internalpb.StateCode_Initializing))
if err := s.querynode.Init(); err != nil {
log.Error("QueryNode init error: ", zap.Error(err))
return err
}

if err := s.querynode.Register(); err != nil {
return err
}
return nil
}

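The substance of the service.go change is ordering: the node's Register() call, which publishes its session to etcd and makes it discoverable by QueryCoord, moves from the top of init() to after querynode.Init() succeeds, and the inline DataCoord client bootstrap is dropped from this gRPC wrapper. A minimal sketch of the resulting startup order, with simplified stand-in types rather than the actual Milvus API:

```go
// Sketch only — simplified stand-ins for the real querynode API.
type component interface {
	Init() error     // local setup: clients, channels, flow graphs
	Register() error // publish the node's session to etcd
}

// startNode finishes initialization before advertising the node, so a
// crash-looping querynode never shows up in etcd half-initialized.
func startNode(c component) error {
	if err := c.Init(); err != nil {
		return err
	}
	return c.Register()
}
```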
@@ -288,10 +261,6 @@ func (s *Server) SetIndexCoord(indexCoord types.IndexCoord) error {
return s.querynode.SetIndexCoord(indexCoord)
}

func (s *Server) SetDataCoord(dataCoord types.DataCoord) error {
return s.querynode.SetDataCoord(dataCoord)
}

func (s *Server) GetTimeTickChannel(ctx context.Context, req *internalpb.GetTimeTickChannelRequest) (*milvuspb.StringResponse, error) {
return s.querynode.GetTimeTickChannel(ctx)
}
94 changes: 60 additions & 34 deletions internal/querycoord/cluster.go
@@ -78,7 +78,7 @@ func (c *queryNodeCluster) reloadFromKV() error {
}
err = c.RegisterNode(context.Background(), session, nodeID)
if err != nil {
log.Debug("query node failed to register")
log.Debug("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
continue
}
nodeIDs = append(nodeIDs, nodeID)
@@ -195,7 +195,7 @@ func (c *queryNodeCluster) ReleaseSegments(ctx context.Context, nodeID int64, in
for _, segmentID := range in.SegmentIDs {
err := c.clusterMeta.removeSegmentInfo(segmentID)
if err != nil {
log.Error("remove segmentInfo Error", zap.Any("error", err.Error()), zap.Int64("segmentID", segmentID))
log.Error("ReleaseSegments: remove segmentInfo Error", zap.Any("error", err.Error()), zap.Int64("segmentID", segmentID))
}
}
status, err := node.client.ReleaseSegments(ctx, in)
@@ -222,9 +222,9 @@ func (c *queryNodeCluster) WatchDmChannels(ctx context.Context, nodeID int64, in
for _, info := range in.Infos {
channels = append(channels, info.ChannelName)
}
log.Debug("wait queryNode watch dm channel")
log.Debug("WatchDmChannels: wait queryNode watch dm channel")
status, err := node.client.WatchDmChannels(ctx, in)
log.Debug("queryNode watch dm channel done")
log.Debug("WatchDmChannels: queryNode watch dm channel done")
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
collectionID := in.CollectionID
//c.clusterMeta.addCollection(collectionID, in.Schema)
@@ -251,6 +251,9 @@ func (c *queryNodeCluster) AddQueryChannel(ctx context.Context, nodeID int64, in
c.Lock()
defer c.Unlock()
if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.AddQueryChannel(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
//TODO::should reopen
@@ -260,7 +263,7 @@ func (c *queryNodeCluster) AddQueryChannel(ctx context.Context, nodeID int64, in
node.addQueryChannel(collectionID, queryChannelInfo)
return status, err
}
log.Error("queryChannel for collection not assigned", zap.Int64("collectionID", collectionID))
log.Error("AddQueryChannel: queryChannel for collection not assigned", zap.Int64("collectionID", collectionID))
}
return status, err
}
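AddQueryChannel, and the remove/release paths below, now bail out early when the target node is not on-service instead of calling into the gRPC client of a node that is restarting. A sketch of that guard pattern with illustrative types — the real isOnService() lives on the queryNode struct and its internals may differ:

```go
import (
	"errors"
	"sync"
)

// Illustrative stand-in for the cluster's per-node handle.
type nodeHandle struct {
	mu        sync.RWMutex
	onService bool
}

func (n *nodeHandle) isOnService() bool {
	n.mu.RLock()
	defer n.mu.RUnlock()
	return n.onService
}

// callNode fails fast when the node is offline rather than issuing a gRPC
// call that would block or error out while the node restarts.
func callNode(n *nodeHandle, rpc func() error) error {
	if !n.isOnService() {
		return errors.New("node offline")
	}
	return rpc()
}
```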
@@ -272,6 +275,9 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64,
defer c.Unlock()

if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.RemoveQueryChannel(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
//TODO::should reopen
@@ -281,7 +287,7 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64,
node.removeQueryChannel(collectionID)
return status, err
}
log.Error("queryChannel for collection not watched", zap.Int64("collectionID", collectionID))
log.Error("removeQueryChannel: queryChannel for collection not watched", zap.Int64("collectionID", collectionID))
}
return status, err
}
@@ -294,6 +300,9 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64,
defer c.Unlock()

if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.ReleaseCollection(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
node.releaseCollection(in.CollectionID)
@@ -310,6 +319,9 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
defer c.Unlock()

if node, ok := c.nodes[nodeID]; ok {
if !node.isOnService() {
return nil, errors.New("node offline")
}
status, err := node.client.ReleasePartitions(ctx, in)
if err == nil && status.ErrorCode == commonpb.ErrorCode_Success {
for _, partitionID := range in.PartitionIDs {
@@ -324,8 +336,8 @@
}

func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSegmentInfoRequest) ([]*querypb.SegmentInfo, error) {
c.Lock()
defer c.Unlock()
c.RLock()
defer c.RUnlock()

segmentInfos := make([]*querypb.SegmentInfo, 0)
nodes, err := c.getOnServiceNodes()
@@ -345,8 +357,8 @@ func (c *queryNodeCluster) getSegmentInfo(ctx context.Context, in *querypb.GetSe
}

func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
c.Lock()
defer c.Unlock()
c.RLock()
defer c.RUnlock()

if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("Can't find query node by nodeID ")
@@ -364,8 +376,8 @@ func (c *queryNodeCluster) getNumDmChannels(nodeID int64) (int, error) {
}

func (c *queryNodeCluster) getNumSegments(nodeID int64) (int, error) {
c.Lock()
defer c.Unlock()
c.RLock()
defer c.RUnlock()

if _, ok := c.nodes[nodeID]; !ok {
return 0, errors.New("Can't find query node by nodeID ")
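getSegmentInfo, getNumDmChannels and getNumSegments are read-only, so they now take the cluster's read lock instead of the exclusive lock: reads can run concurrently and are no longer serialized behind a registration in progress. A minimal sketch of the pattern, with illustrative names:

```go
import "sync"

// Illustrative cluster shape: reads share the RLock, mutations take the
// exclusive lock.
type clusterSketch struct {
	mu    sync.RWMutex
	nodes map[int64]struct{}
}

// Read-only accessor: many goroutines may hold the read lock at once.
func (c *clusterSketch) numNodes() int {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return len(c.nodes)
}

// Mutating path: still exclusive.
func (c *clusterSketch) addNode(id int64) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.nodes == nil {
		c.nodes = make(map[int64]struct{})
	}
	c.nodes[id] = struct{}{}
}
```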
@@ -384,26 +396,31 @@ func (c *queryNodeCluster) RegisterNode(ctx context.Context, session *sessionuti
c.Lock()
defer c.Unlock()

sessionJSON, err := json.Marshal(session)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
err = c.client.Save(key, string(sessionJSON))
if err != nil {
return err
}
node, err := newQueryNode(ctx, session.Address, id, c.client)
if err != nil {
return err
}
log.Debug("register a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))

if _, ok := c.nodes[id]; !ok {
c.nodes[id] = node
sessionJSON, err := json.Marshal(session)
if err != nil {
return err
}
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
err = c.client.Save(key, string(sessionJSON))
if err != nil {
return err
}
c.nodes[id] = newQueryNode(ctx, session.Address, id, c.client)
log.Debug("RegisterNode: create a new query node", zap.Int64("nodeID", id), zap.String("address", session.Address))

go func() {
err = c.nodes[id].start()
if err != nil {
log.Error("RegisterNode: start queryNode client failed", zap.Int64("nodeID", id), zap.String("error", err.Error()))
return
}
log.Debug("RegisterNode: start queryNode success, print cluster meta info", zap.Int64("nodeID", id))
c.printMeta()
}()

return nil
}

return fmt.Errorf("node %d alredy exists in cluster", id)
}

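RegisterNode now persists the session, records the node under the cluster lock, and dials its gRPC client from a background goroutine, so a node that is slow to come up, or flapping across quick restarts, cannot block further registrations. A condensed sketch of that shape, with simplified stand-in types:

```go
import (
	"fmt"
	"log"
	"sync"
)

// Stand-ins for the real queryNode and cluster types.
type nodeSketch struct{ addr string }

// start stands in for dialing the node's gRPC client and marking it on-service.
func (n *nodeSketch) start() error { return nil }

type nodeCluster struct {
	mu    sync.Mutex
	nodes map[int64]*nodeSketch
}

// register records the node and connects to it asynchronously, so a slow or
// flapping queryNode cannot hold the cluster lock or stall other registrations.
func (c *nodeCluster) register(id int64, n *nodeSketch) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if _, ok := c.nodes[id]; ok {
		return fmt.Errorf("node %d already exists in cluster", id)
	}
	if c.nodes == nil {
		c.nodes = make(map[int64]*nodeSketch)
	}
	c.nodes[id] = n
	go func() {
		if err := n.start(); err != nil {
			log.Printf("start queryNode %d client failed: %v", id, err)
		}
	}()
	return nil
}
```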
@@ -428,16 +445,25 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
return err
}

err = c.nodes[nodeID].clearNodeInfo()
if err != nil {
return err
if _, ok := c.nodes[nodeID]; ok {
err = c.nodes[nodeID].clearNodeInfo()
if err != nil {
return err
}
delete(c.nodes, nodeID)
log.Debug("removeNodeInfo: delete nodeInfo in cluster meta and etcd", zap.Int64("nodeID", nodeID))
}
delete(c.nodes, nodeID)
log.Debug("delete nodeInfo in cluster meta and etcd", zap.Int64("nodeID", nodeID))

return nil
}

func (c *queryNodeCluster) stopNode(nodeID int64) {
if node, ok := c.nodes[nodeID]; ok {
node.stop()
log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}

func (c *queryNodeCluster) onServiceNodes() (map[int64]*queryNode, error) {
c.RLock()
defer c.RUnlock()
6 changes: 3 additions & 3 deletions internal/querycoord/impl.go
@@ -110,8 +110,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle

loadCollectionTask := &LoadCollectionTask{
BaseTask: BaseTask{
ctx: ctx,
Condition: NewTaskCondition(ctx),
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
LoadCollectionRequest: req,
@@ -156,7 +156,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas

releaseCollectionTask := &ReleaseCollectionTask{
BaseTask: BaseTask{
ctx: ctx,
ctx: qc.loopCtx,
Condition: NewTaskCondition(qc.loopCtx),
triggerCondition: querypb.TriggerCondition_grpcRequest,
},
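In impl.go the load/release tasks are built on qc.loopCtx instead of the per-request context: the request context ends with the RPC or the client's timeout, while the coordinator's loop context lives for the whole process, so a client disconnect no longer cancels work the scheduler is still executing. A minimal sketch of the distinction, illustrative names only:

```go
import "context"

// coordSketch stands in for QueryCoord; loopCtx lives as long as the process.
type coordSketch struct {
	loopCtx context.Context
}

// submit runs a long-lived task on loopCtx rather than the incoming request
// context, so a client timeout or disconnect does not cancel an in-flight task.
func (qc *coordSketch) submit(task func(context.Context)) {
	go task(qc.loopCtx)
}
```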
8 changes: 7 additions & 1 deletion internal/querycoord/meta_test.go
@@ -16,10 +16,16 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/clientv3"

etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
)

func TestReplica_Release(t *testing.T) {
meta, err := newMeta(nil)
etcdClient, err := clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints})
assert.Nil(t, err)
etcdKV := etcdkv.NewEtcdKV(etcdClient, Params.MetaRootPath)
meta, err := newMeta(etcdKV)
assert.Nil(t, err)
err = meta.addCollection(1, nil)
require.NoError(t, err)