diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go index ac4fd0dff358..199e6ebd68eb 100644 --- a/internal/datacoord/handler.go +++ b/internal/datacoord/handler.go @@ -134,6 +134,7 @@ func (h *ServerHandler) GetQueryVChanPositions(channel RWChannel, partitionIDs . zap.String("channel", channel.GetName()), zap.Int("numOfSegments", len(segments)), zap.Int("indexed segment", len(indexedSegments)), + zap.Int64("currentPartitionStatsVersion", currentPartitionStatsVersion), ) unIndexedIDs := make(typeutil.UniqueSet) diff --git a/internal/querycoordv2/checkers/balance_checker.go b/internal/querycoordv2/checkers/balance_checker.go index 81c7c9627163..f8b496b04488 100644 --- a/internal/querycoordv2/checkers/balance_checker.go +++ b/internal/querycoordv2/checkers/balance_checker.go @@ -128,7 +128,7 @@ func (b *BalanceChecker) replicasToBalance() []int64 { hasUnbalancedCollection := false for _, cid := range loadedCollections { if b.normalBalanceCollectionsCurrentRound.Contain(cid) { - log.Debug("ScoreBasedBalancer has balanced collection, skip balancing in this round", + log.Debug("ScoreBasedBalancer is balancing this collection, skip balancing in this round", zap.Int64("collectionID", cid)) continue } diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 0cf149fdf8b2..bb30c407e319 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -71,7 +71,7 @@ func NewCheckerController( utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr), // todo temporary work around must fix // utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, true), - utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr, Params.QueryNodeCfg.EnableSyncPartitionStats.GetAsBool()), + utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr), } manualCheckChs := map[utils.CheckerType]chan struct{}{ diff --git a/internal/querycoordv2/checkers/leader_checker.go b/internal/querycoordv2/checkers/leader_checker.go index bb6c562665b8..054c65ea3b00 100644 --- a/internal/querycoordv2/checkers/leader_checker.go +++ b/internal/querycoordv2/checkers/leader_checker.go @@ -35,11 +35,10 @@ var _ Checker = (*LeaderChecker)(nil) // LeaderChecker perform segment index check. type LeaderChecker struct { *checkerActivation - meta *meta.Meta - dist *meta.DistributionManager - target *meta.TargetManager - nodeMgr *session.NodeManager - enableSyncPartitionStats bool + meta *meta.Meta + dist *meta.DistributionManager + target *meta.TargetManager + nodeMgr *session.NodeManager } func NewLeaderChecker( @@ -47,15 +46,13 @@ func NewLeaderChecker( dist *meta.DistributionManager, target *meta.TargetManager, nodeMgr *session.NodeManager, - enableSyncPartitionStats bool, ) *LeaderChecker { return &LeaderChecker{ - checkerActivation: newCheckerActivation(), - meta: meta, - dist: dist, - target: target, - nodeMgr: nodeMgr, - enableSyncPartitionStats: enableSyncPartitionStats, + checkerActivation: newCheckerActivation(), + meta: meta, + dist: dist, + target: target, + nodeMgr: nodeMgr, } } @@ -100,9 +97,7 @@ func (c *LeaderChecker) Check(ctx context.Context) []task.Task { dist := c.dist.SegmentDistManager.GetByFilter(meta.WithChannel(leaderView.Channel), meta.WithReplica(replica)) tasks = append(tasks, c.findNeedLoadedSegments(ctx, replica, leaderView, dist)...) tasks = append(tasks, c.findNeedRemovedSegments(ctx, replica, leaderView, dist)...) - if c.enableSyncPartitionStats { - tasks = append(tasks, c.findNeedSyncPartitionStats(ctx, replica, leaderView, node)...) - } + tasks = append(tasks, c.findNeedSyncPartitionStats(ctx, replica, leaderView, node)...) } } } @@ -127,22 +122,24 @@ func (c *LeaderChecker) findNeedSyncPartitionStats(ctx context.Context, replica partStatsToUpdate[partID] = psVersionInTarget } } + if len(partStatsToUpdate) > 0 { + action := task.NewLeaderUpdatePartStatsAction(leaderView.ID, nodeID, task.ActionTypeUpdate, leaderView.Channel, partStatsToUpdate) - action := task.NewLeaderUpdatePartStatsAction(leaderView.ID, nodeID, task.ActionTypeUpdate, leaderView.Channel, partStatsToUpdate) + t := task.NewLeaderPartStatsTask( + ctx, + c.ID(), + leaderView.CollectionID, + replica, + leaderView.ID, + action, + ) - t := task.NewLeaderPartStatsTask( - ctx, - c.ID(), - leaderView.CollectionID, - replica, - leaderView.ID, - action, - ) + // leader task shouldn't replace executing segment task + t.SetPriority(task.TaskPriorityLow) + t.SetReason("sync partition stats versions") + ret = append(ret, t) + } - // leader task shouldn't replace executing segment task - t.SetPriority(task.TaskPriorityLow) - t.SetReason("sync partition stats versions") - ret = append(ret, t) return ret } diff --git a/internal/querycoordv2/checkers/leader_checker_test.go b/internal/querycoordv2/checkers/leader_checker_test.go index b8a9b90ebed8..f3cac8b0e522 100644 --- a/internal/querycoordv2/checkers/leader_checker_test.go +++ b/internal/querycoordv2/checkers/leader_checker_test.go @@ -75,7 +75,7 @@ func (suite *LeaderCheckerTestSuite) SetupTest() { distManager := meta.NewDistributionManager() targetManager := meta.NewTargetManager(suite.broker, suite.meta) - suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr, false) + suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr) } func (suite *LeaderCheckerTestSuite) TearDownTest() { @@ -476,10 +476,6 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() { func (suite *LeaderCheckerTestSuite) TestUpdatePartitionStats() { testChannel := "test-insert-channel" - suite.checker.enableSyncPartitionStats = true - defer func() { - suite.checker.enableSyncPartitionStats = false - }() leaderID := int64(2) observer := suite.checker observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index ddaa673177a7..5f9dc7250a58 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -17,6 +17,8 @@ package task import ( + "reflect" + "github.com/samber/lo" "go.uber.org/atomic" @@ -225,6 +227,8 @@ func (action *LeaderAction) IsFinished(distMgr *meta.DistributionManager) bool { return action.rpcReturned.Load() && dist != nil && dist.NodeID == action.Node() case ActionTypeReduce: return action.rpcReturned.Load() && (dist == nil || dist.NodeID != action.Node()) + case ActionTypeUpdate: + return action.rpcReturned.Load() && (dist != nil && reflect.DeepEqual(action.partStatsVersions, view.PartitionStatsVersions)) } return false } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 0d1b1e4474db..5c566d854198 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2168,11 +2168,9 @@ type queryNodeConfig struct { MemoryIndexLoadPredictMemoryUsageFactor ParamItem `refreshable:"true"` EnableSegmentPrune ParamItem `refreshable:"false"` - // todo temporary work around must fix - EnableSyncPartitionStats ParamItem `refreshable:"false"` - DefaultSegmentFilterRatio ParamItem `refreshable:"false"` - UseStreamComputing ParamItem `refreshable:"false"` - QueryStreamBatchSize ParamItem `refreshable:"false"` + DefaultSegmentFilterRatio ParamItem `refreshable:"false"` + UseStreamComputing ParamItem `refreshable:"false"` + QueryStreamBatchSize ParamItem `refreshable:"false"` } func (p *queryNodeConfig) init(base *BaseTable) { @@ -2741,16 +2739,6 @@ user-task-polling: Export: true, } p.EnableSegmentPrune.Init(base.mgr) - - p.EnableSyncPartitionStats = ParamItem{ - Key: "queryNode.enableSyncPartitionStats", - Version: "2.4.4", - DefaultValue: "false", - Doc: "enable sync partitionStats", - Export: true, - } - p.EnableSyncPartitionStats.Init(base.mgr) - p.DefaultSegmentFilterRatio = ParamItem{ Key: "queryNode.defaultSegmentFilterRatio", Version: "2.4.0", diff --git a/tests/integration/balance/balance_test.go b/tests/integration/balance/balance_test.go index 9f0321d6f638..b0df436e6843 100644 --- a/tests/integration/balance/balance_test.go +++ b/tests/integration/balance/balance_test.go @@ -53,15 +53,12 @@ func (s *BalanceTestSuit) SetupSuite() { // disable compaction paramtable.Get().Save(paramtable.Get().DataCoordCfg.EnableCompaction.Key, "false") - // todo @wayblink repair this test - // paramtable.Get().Save(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key, "false") s.Require().NoError(s.SetupEmbedEtcd()) } func (s *BalanceTestSuit) TearDownSuite() { defer paramtable.Get().Reset(paramtable.Get().DataCoordCfg.EnableCompaction.Key) - // defer paramtable.Get().Reset(paramtable.Get().QueryNodeCfg.EnableSyncPartitionStats.Key) s.MiniClusterSuite.TearDownSuite() }