diff --git a/internal/querycoordv2/services.go b/internal/querycoordv2/services.go index 742cf80ba70c..3d04dd3827bc 100644 --- a/internal/querycoordv2/services.go +++ b/internal/querycoordv2/services.go @@ -650,28 +650,53 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade return resp, nil } + currentTargets := s.targetMgr.GetHistoricalSegmentsByCollection(req.GetCollectionID(), meta.CurrentTarget) for _, channel := range channels { log := log.With(zap.String("channel", channel.GetChannelName())) leaders := s.dist.LeaderViewManager.GetLeadersByShard(channel.GetChannelName()) ids := make([]int64, 0, len(leaders)) addrs := make([]string, 0, len(leaders)) + + // In a replica, a shard is available, if and only if: + // 1. The leader is online + // 2. All QueryNodes in the distribution are online + // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution + // 4. All segments of the shard in target should be in the distribution for _, leader := range leaders { info := s.nodeMgr.Get(leader.ID) + + // Check whether leader is online if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval { continue } - isAllNodeAvailable := true + + // Check whether QueryNodes are online and available + isAvailable := true for _, version := range leader.Segments { info := s.nodeMgr.Get(version.NodeID) if info == nil || time.Since(info.LastHeartbeat()) > Params.QueryCoordCfg.HeartbeatAvailableInterval { - isAllNodeAvailable = false + isAvailable = false break } } - if !isAllNodeAvailable { + + // Check whether segments are fully loaded + for segmentID, info := range currentTargets { + if info.GetInsertChannel() != leader.Channel { + continue + } + + _, exist := leader.Segments[segmentID] + if !exist { + isAvailable = false + break + } + } + if !isAvailable { continue } + ids = append(ids, info.ID()) addrs = append(addrs, info.Addr()) } @@ -690,6 +715,7 @@ func (s *Server) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeade NodeAddrs: addrs, }) } + return resp, nil } diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index 2d9a351bf761..ea715eb206c0 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -893,7 +893,7 @@ func (suite *ServiceSuite) TestGetShardLeaders() { CollectionID: collection, } - suite.fetchHeartbeats() + suite.fetchHeartbeats(time.Now()) resp, err := server.GetShardLeaders(ctx, req) suite.NoError(err) suite.Equal(commonpb.ErrorCode_Success, resp.Status.ErrorCode) @@ -913,6 +913,33 @@ func (suite *ServiceSuite) TestGetShardLeaders() { suite.Contains(resp.Status.Reason, ErrNotHealthy.Error()) } +func (suite *ServiceSuite) TestGetShardLeadersFailed() { + suite.loadAll() + ctx := context.Background() + server := suite.server + + for _, collection := range suite.collections { + suite.updateCollectionStatus(collection, querypb.LoadStatus_Loaded) + suite.updateChannelDist(collection) + req := &querypb.GetShardLeadersRequest{ + CollectionID: collection, + } + + // Last heartbeat response time too old + suite.fetchHeartbeats(time.Now().Add(-Params.QueryCoordCfg.HeartbeatAvailableInterval - 1)) + resp, err := server.GetShardLeaders(ctx, req) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode) + + // Segment not fully loaded + suite.updateChannelDistWithoutSegment(collection) + suite.fetchHeartbeats(time.Now()) + resp, err = server.GetShardLeaders(ctx, req) + suite.NoError(err) + suite.Equal(commonpb.ErrorCode_NoReplicaAvailable, resp.Status.ErrorCode) + } +} + func (suite *ServiceSuite) loadAll() { ctx := context.Background() for _, collection := range suite.collections { @@ -1067,6 +1094,38 @@ func (suite *ServiceSuite) updateSegmentDist(collection, node int64) { func (suite *ServiceSuite) updateChannelDist(collection int64) { channels := suite.channels[collection] + segments := lo.Flatten(lo.Values(suite.segments[collection])) + + replicas := suite.meta.ReplicaManager.GetByCollection(collection) + for _, replica := range replicas { + i := 0 + for _, node := range replica.GetNodes() { + suite.dist.ChannelDistManager.Update(node, meta.DmChannelFromVChannel(&datapb.VchannelInfo{ + CollectionID: collection, + ChannelName: channels[i], + })) + suite.dist.LeaderViewManager.Update(node, &meta.LeaderView{ + ID: node, + CollectionID: collection, + Channel: channels[i], + Segments: lo.SliceToMap(segments, func(segment int64) (int64, *querypb.SegmentDist) { + return segment, &querypb.SegmentDist{ + NodeID: node, + Version: time.Now().Unix(), + } + }), + }) + i++ + if i >= len(channels) { + break + } + } + } +} + +func (suite *ServiceSuite) updateChannelDistWithoutSegment(collection int64) { + channels := suite.channels[collection] + replicas := suite.meta.ReplicaManager.GetByCollection(collection) for _, replica := range replicas { i := 0 @@ -1112,10 +1171,10 @@ func (suite *ServiceSuite) updateCollectionStatus(collectionID int64, status que } } -func (suite *ServiceSuite) fetchHeartbeats() { +func (suite *ServiceSuite) fetchHeartbeats(time time.Time) { for _, node := range suite.nodes { node := suite.nodeMgr.Get(node) - node.SetLastHeartbeat(time.Now()) + node.SetLastHeartbeat(time) } }