From ffc3cc3aa79e8f06227a46816a71b315c0e55672 Mon Sep 17 00:00:00 2001 From: wei liu Date: Thu, 27 Jun 2024 19:06:05 +0800 Subject: [PATCH] enhance: Avoid assigning too many segments/channels to new querynode (#34096) issue: #34095 When a new query node comes online, the segment_checker, channel_checker, and balance_checker simultaneously attempt to allocate segments to it. If this occurs during the execution of a load task and the distribution of the new query node hasn't been updated, the query coordinator may mistakenly view the new query node as empty. As a result, it assigns segments or channels to it, potentially overloading the new query node with more segments or channels than expected. This PR measures the workload of the executing tasks on the target query node to prevent assigning an excessive number of segments to it. --------- Signed-off-by: Wei Liu --- internal/querycoordv2/balance/balance.go | 4 +- internal/querycoordv2/balance/balance_test.go | 10 +- .../channel_level_score_balancer_test.go | 8 +- .../balance/rowcount_based_balancer.go | 8 +- .../balance/rowcount_based_balancer_test.go | 7 +- .../balance/score_based_balancer.go | 15 +- .../balance/score_based_balancer_test.go | 105 ++++++++++++- internal/querycoordv2/ops_service_test.go | 15 ++ internal/querycoordv2/services_test.go | 14 +- internal/querycoordv2/task/mock_scheduler.go | 114 ++++++++------- internal/querycoordv2/task/scheduler.go | 138 +++++++++++------- 11 files changed, 308 insertions(+), 130 deletions(-) diff --git a/internal/querycoordv2/balance/balance.go b/internal/querycoordv2/balance/balance.go index 26e1acc10825..17228d6bb070 100644 --- a/internal/querycoordv2/balance/balance.go +++ b/internal/querycoordv2/balance/balance.go @@ -79,7 +79,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta. 
sort.Slice(nodesInfo, func(i, j int) bool { cnt1, cnt2 := nodesInfo[i].SegmentCnt(), nodesInfo[j].SegmentCnt() id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID() - delta1, delta2 := b.scheduler.GetNodeSegmentDelta(id1), b.scheduler.GetNodeSegmentDelta(id2) + delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1) return cnt1+delta1 < cnt2+delta2 }) ret := make([]SegmentAssignPlan, 0, len(segments)) @@ -112,7 +112,7 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i sort.Slice(nodesInfo, func(i, j int) bool { cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt() id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID() - delta1, delta2 := b.scheduler.GetNodeChannelDelta(id1), b.scheduler.GetNodeChannelDelta(id2) + delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1) return cnt1+delta1 < cnt2+delta2 }) ret := make([]ChannelAssignPlan, 0, len(channels)) diff --git a/internal/querycoordv2/balance/balance_test.go b/internal/querycoordv2/balance/balance_test.go index 2ded89d5ade4..543c04c5346a 100644 --- a/internal/querycoordv2/balance/balance_test.go +++ b/internal/querycoordv2/balance/balance_test.go @@ -19,6 +19,7 @@ package balance import ( "testing" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" "github.com/milvus-io/milvus/internal/proto/datapb" @@ -38,6 +39,9 @@ func (suite *BalanceTestSuite) SetupTest() { nodeManager := session.NewNodeManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.roundRobinBalancer = NewRoundRobinBalancer(suite.mockScheduler, nodeManager) + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *BalanceTestSuite) TestAssignBalance() { @@ -85,6 +89,7 @@ func (suite *BalanceTestSuite) TestAssignBalance() { for _, c := 
range cases { suite.Run(c.name, func() { suite.SetupTest() + suite.mockScheduler.ExpectedCalls = nil for i := range c.nodeIDs { nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ NodeID: c.nodeIDs[i], @@ -95,7 +100,7 @@ func (suite *BalanceTestSuite) TestAssignBalance() { nodeInfo.SetState(c.states[i]) suite.roundRobinBalancer.nodeManager.Add(nodeInfo) if !nodeInfo.IsStoppingState() { - suite.mockScheduler.EXPECT().GetNodeSegmentDelta(c.nodeIDs[i]).Return(c.deltaCnts[i]) + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i]) } } plans := suite.roundRobinBalancer.AssignSegment(0, c.assignments, c.nodeIDs, false) @@ -149,6 +154,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() { for _, c := range cases { suite.Run(c.name, func() { suite.SetupTest() + suite.mockScheduler.ExpectedCalls = nil for i := range c.nodeIDs { nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ NodeID: c.nodeIDs[i], @@ -160,7 +166,7 @@ func (suite *BalanceTestSuite) TestAssignChannel() { nodeInfo.SetState(c.states[i]) suite.roundRobinBalancer.nodeManager.Add(nodeInfo) if !nodeInfo.IsStoppingState() { - suite.mockScheduler.EXPECT().GetNodeChannelDelta(c.nodeIDs[i]).Return(c.deltaCnts[i]) + suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i]) } } plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false) diff --git a/internal/querycoordv2/balance/channel_level_score_balancer_test.go b/internal/querycoordv2/balance/channel_level_score_balancer_test.go index 6e1fbddd19de..84256187a3dd 100644 --- a/internal/querycoordv2/balance/channel_level_score_balancer_test.go +++ b/internal/querycoordv2/balance/channel_level_score_balancer_test.go @@ -73,6 +73,9 @@ func (suite *ChannelLevelScoreBalancerTestSuite) SetupTest() { distManager := meta.NewDistributionManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.balancer = 
NewChannelLevelScoreBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget) + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *ChannelLevelScoreBalancerTestSuite) TearDownTest() { @@ -685,11 +688,8 @@ func (suite *ChannelLevelScoreBalancerTestSuite) TestStoppedBalance() { expectChannelPlans: []ChannelAssignPlan{}, }, } - for i, c := range cases { + for _, c := range cases { suite.Run(c.name, func() { - if i == 0 { - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() - } suite.SetupSuite() defer suite.TearDownTest() balancer := suite.balancer diff --git a/internal/querycoordv2/balance/rowcount_based_balancer.go b/internal/querycoordv2/balance/rowcount_based_balancer.go index 15ee6f80f8ae..d36815432c6a 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer.go @@ -140,6 +140,9 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []* rowcnt += int(view.NumOfGrowingRows) } + // calculate executing task cost in scheduler + rowcnt += b.scheduler.GetSegmentTaskDelta(node, -1) + // more row count, less priority nodeItem := newNodeItem(rowcnt, node) ret = append(ret, &nodeItem) @@ -152,8 +155,11 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []* for _, node := range nodeIDs { channels := b.dist.ChannelDistManager.GetByFilter(meta.WithNodeID2Channel(node)) + channelCount := len(channels) + // calculate executing task cost in scheduler + channelCount += b.scheduler.GetChannelTaskDelta(node, -1) // more channel num, less priority - nodeItem := newNodeItem(len(channels), node) + nodeItem := newNodeItem(channelCount, node) ret = append(ret, &nodeItem) } return ret diff --git 
a/internal/querycoordv2/balance/rowcount_based_balancer_test.go b/internal/querycoordv2/balance/rowcount_based_balancer_test.go index f3ad8d9fae3d..f6d6300512d1 100644 --- a/internal/querycoordv2/balance/rowcount_based_balancer_test.go +++ b/internal/querycoordv2/balance/rowcount_based_balancer_test.go @@ -78,6 +78,9 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() { suite.balancer = NewRowCountBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget) suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe() + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *RowCountBasedBalancerTestSuite) TearDownTest() { @@ -461,7 +464,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() { balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) balancer.targetMgr.UpdateCollectionCurrentTarget(1) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) } @@ -675,7 +677,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() { suite.broker.ExpectedCalls = nil suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) 
} @@ -780,7 +781,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() { }, } - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for _, c := range cases { suite.Run(c.name, func() { suite.SetupSuite() @@ -1052,7 +1052,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() { balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) balancer.targetMgr.UpdateCollectionCurrentTarget(1) balancer.targetMgr.UpdateCollectionNextTarget(int64(1)) - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() for node, s := range c.distributions { balancer.dist.SegmentDistManager.Update(node, s...) } diff --git a/internal/querycoordv2/balance/score_based_balancer.go b/internal/querycoordv2/balance/score_based_balancer.go index 2730645fa9ab..93ffdd15d2c0 100644 --- a/internal/querycoordv2/balance/score_based_balancer.go +++ b/internal/querycoordv2/balance/score_based_balancer.go @@ -150,19 +150,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in } func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { - rowCount := 0 + nodeRowCount := 0 // calculate global sealed segment row count globalSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(nodeID)) for _, s := range globalSegments { - rowCount += int(s.GetNumOfRows()) + nodeRowCount += int(s.GetNumOfRows()) } // calculate global growing segment row count views := b.dist.LeaderViewManager.GetByFilter(meta.WithNodeID2LeaderView(nodeID)) for _, view := range views { - rowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) + nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) } + // calculate executing task cost in scheduler + nodeRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, -1) + collectionRowCount := 0 // calculate collection sealed 
segment row count collectionSegments := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(collectionID), meta.WithNodeID(nodeID)) @@ -175,7 +178,11 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int { for _, view := range collectionViews { collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat()) } - return collectionRowCount + int(float64(rowCount)* + + // calculate executing task cost in scheduler + collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID) + + return collectionRowCount + int(float64(nodeRowCount)* params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat()) } diff --git a/internal/querycoordv2/balance/score_based_balancer_test.go b/internal/querycoordv2/balance/score_based_balancer_test.go index b209e71fa39a..20ba2e583925 100644 --- a/internal/querycoordv2/balance/score_based_balancer_test.go +++ b/internal/querycoordv2/balance/score_based_balancer_test.go @@ -73,6 +73,9 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() { distManager := meta.NewDistributionManager() suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.balancer = NewScoreBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget) + + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() } func (suite *ScoreBasedBalancerTestSuite) TearDownTest() { @@ -449,6 +452,103 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() { } } +func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() { + cases := []struct { + name string + nodes []int64 + collectionID int64 + replicaID int64 + collectionsSegments []*datapb.SegmentInfo + states []session.State + shouldMock bool + distributions map[int64][]*meta.Segment + distributionChannels map[int64][]*meta.DmChannel + 
deltaCounts []int + expectPlans []SegmentAssignPlan + expectChannelPlans []ChannelAssignPlan + }{ + { + name: "normal balance for one collection only", + nodes: []int64{1, 2, 3}, + deltaCounts: []int{30, 0, 0}, + collectionID: 1, + replicaID: 1, + collectionsSegments: []*datapb.SegmentInfo{ + {ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1}, + }, + states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal}, + distributions: map[int64][]*meta.Segment{ + 1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}}, + 2: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}}, + 3: { + {SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, + {SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 3}, + }, + }, + expectPlans: []SegmentAssignPlan{ + {Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, From: 3, To: 2, Replica: newReplicaDefaultRG(1)}, + }, + expectChannelPlans: []ChannelAssignPlan{}, + }, + } + + for _, c := range cases { + suite.Run(c.name, func() { + suite.SetupSuite() + defer suite.TearDownTest() + balancer := suite.balancer + + // 1. 
set up target for multi collections + collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID)) + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return( + nil, c.collectionsSegments, nil) + suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe() + collection.LoadPercentage = 100 + collection.Status = querypb.LoadStatus_Loaded + balancer.meta.CollectionManager.PutCollection(collection) + balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID)) + balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes)) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID) + balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID) + + // 2. set up target for distribution for multi collections + for node, s := range c.distributions { + balancer.dist.SegmentDistManager.Update(node, s...) + } + for node, v := range c.distributionChannels { + balancer.dist.ChannelDistManager.Update(node, v...) + } + + // 3. set up nodes info and resourceManager for balancer + for i := range c.nodes { + nodeInfo := session.NewNodeInfo(session.ImmutableNodeInfo{ + NodeID: c.nodes[i], + Address: "127.0.0.1:0", + Hostname: "localhost", + }) + nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]]))) + nodeInfo.SetState(c.states[i]) + suite.balancer.nodeManager.Add(nodeInfo) + suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i]) + } + utils.RecoverAllCollection(balancer.meta) + + // set node delta count + suite.mockScheduler.ExpectedCalls = nil + for i, node := range c.nodes { + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe() + suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe() + } + + // 4. 
balance and verify result + segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID) + assertChannelAssignPlanElementMatch(&suite.Suite, c.expectChannelPlans, channelPlans) + assertSegmentAssignPlanElementMatch(&suite.Suite, c.expectPlans, segmentPlans) + }) + } +} + func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() { balanceCase := struct { name string @@ -658,11 +758,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() { expectChannelPlans: []ChannelAssignPlan{}, }, } - for i, c := range cases { + for _, c := range cases { suite.Run(c.name, func() { - if i == 0 { - suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe() - } suite.SetupSuite() defer suite.TearDownTest() balancer := suite.balancer diff --git a/internal/querycoordv2/ops_service_test.go b/internal/querycoordv2/ops_service_test.go index c25eb6f01718..c073bdf0f5fd 100644 --- a/internal/querycoordv2/ops_service_test.go +++ b/internal/querycoordv2/ops_service_test.go @@ -108,6 +108,9 @@ func (suite *OpsServiceSuite) SetupTest() { suite.cluster = session.NewMockCluster(suite.T()) suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.jobScheduler.Start() suite.balancer = balance.NewScoreBasedBalancer( suite.taskScheduler, @@ -609,6 +612,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() { // test copy mode, expect generate 1 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions 
:= t.Actions() suite.Equal(len(actions), 1) @@ -626,6 +631,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() { // test transfer all segments, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter := atomic.NewInt64(0) suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() @@ -645,6 +652,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() { // test transfer all segment to all nodes, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter = atomic.NewInt64(0) nodeSet := typeutil.NewUniqueSet() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { @@ -827,6 +836,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() { // test copy mode, expect generate 1 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() suite.Equal(len(actions), 1) @@ -844,6 +855,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() { // test transfer all channels, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter 
:= atomic.NewInt64(0) suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { actions := t.Actions() @@ -863,6 +876,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() { // test transfer all channels to all nodes, expect generate 4 load segment task suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() counter = atomic.NewInt64(0) nodeSet := typeutil.NewUniqueSet() suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error { diff --git a/internal/querycoordv2/services_test.go b/internal/querycoordv2/services_test.go index f7d12cce6d27..782b672ab517 100644 --- a/internal/querycoordv2/services_test.go +++ b/internal/querycoordv2/services_test.go @@ -165,6 +165,8 @@ func (suite *ServiceSuite) SetupTest() { suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe() suite.jobScheduler = job.NewScheduler() suite.taskScheduler = task.NewMockScheduler(suite.T()) + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.jobScheduler.Start() suite.balancer = balance.NewRowCountBasedBalancer( suite.taskScheduler, @@ -1211,7 +1213,9 @@ func (suite *ServiceSuite) TestLoadBalance() { DstNodeIDs: []int64{dstNode}, SealedSegmentIDs: segments, } - suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0) + suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) { actions := 
task.Actions() suite.Len(actions, 2) @@ -1256,7 +1260,9 @@ func (suite *ServiceSuite) TestLoadBalanceWithNoDstNode() { SourceNodeIDs: []int64{srcNode}, SealedSegmentIDs: segments, } - suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0) + suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) { actions := task.Actions() suite.Len(actions, 2) @@ -1337,7 +1343,9 @@ func (suite *ServiceSuite) TestLoadBalanceWithEmptySegmentList() { SourceNodeIDs: []int64{srcNode}, DstNodeIDs: []int64{dstNode}, } - suite.taskScheduler.ExpectedCalls = make([]*mock.Call, 0) + suite.taskScheduler.ExpectedCalls = nil + suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() + suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe() suite.taskScheduler.EXPECT().Add(mock.Anything).Run(func(t task.Task) { actions := t.Actions() suite.Len(actions, 2) diff --git a/internal/querycoordv2/task/mock_scheduler.go b/internal/querycoordv2/task/mock_scheduler.go index 8d4735534409..f3eb7bd69eb5 100644 --- a/internal/querycoordv2/task/mock_scheduler.go +++ b/internal/querycoordv2/task/mock_scheduler.go @@ -125,6 +125,49 @@ func (_c *MockScheduler_Dispatch_Call) RunAndReturn(run func(int64)) *MockSchedu return _c } +// GetChannelTaskDelta provides a mock function with given fields: nodeID, collectionID +func (_m *MockScheduler) GetChannelTaskDelta(nodeID int64, collectionID int64) int { + ret := _m.Called(nodeID, collectionID) + + var r0 int + if rf, ok := ret.Get(0).(func(int64, int64) int); ok { + r0 = rf(nodeID, collectionID) + } else { + r0 = ret.Get(0).(int) + } + + return r0 +} + +// MockScheduler_GetChannelTaskDelta_Call is a *mock.Call that shadows Run/Return 
methods with type explicit version for method 'GetChannelTaskDelta' +type MockScheduler_GetChannelTaskDelta_Call struct { + *mock.Call +} + +// GetChannelTaskDelta is a helper method to define mock.On call +// - nodeID int64 +// - collectionID int64 +func (_e *MockScheduler_Expecter) GetChannelTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetChannelTaskDelta_Call { + return &MockScheduler_GetChannelTaskDelta_Call{Call: _e.mock.On("GetChannelTaskDelta", nodeID, collectionID)} +} + +func (_c *MockScheduler_GetChannelTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetChannelTaskDelta_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].(int64)) + }) + return _c +} + +func (_c *MockScheduler_GetChannelTaskDelta_Call) Return(_a0 int) *MockScheduler_GetChannelTaskDelta_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockScheduler_GetChannelTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetChannelTaskDelta_Call { + _c.Call.Return(run) + return _c +} + // GetChannelTaskNum provides a mock function with given fields: func (_m *MockScheduler) GetChannelTaskNum() int { ret := _m.Called() @@ -210,55 +253,13 @@ func (_c *MockScheduler_GetExecutedFlag_Call) RunAndReturn(run func(int64) <-cha return _c } -// GetNodeChannelDelta provides a mock function with given fields: nodeID -func (_m *MockScheduler) GetNodeChannelDelta(nodeID int64) int { - ret := _m.Called(nodeID) - - var r0 int - if rf, ok := ret.Get(0).(func(int64) int); ok { - r0 = rf(nodeID) - } else { - r0 = ret.Get(0).(int) - } - - return r0 -} - -// MockScheduler_GetNodeChannelDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeChannelDelta' -type MockScheduler_GetNodeChannelDelta_Call struct { - *mock.Call -} - -// GetNodeChannelDelta is a helper method to define mock.On call -// - nodeID int64 -func (_e *MockScheduler_Expecter) 
GetNodeChannelDelta(nodeID interface{}) *MockScheduler_GetNodeChannelDelta_Call { - return &MockScheduler_GetNodeChannelDelta_Call{Call: _e.mock.On("GetNodeChannelDelta", nodeID)} -} - -func (_c *MockScheduler_GetNodeChannelDelta_Call) Run(run func(nodeID int64)) *MockScheduler_GetNodeChannelDelta_Call { - _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) - }) - return _c -} - -func (_c *MockScheduler_GetNodeChannelDelta_Call) Return(_a0 int) *MockScheduler_GetNodeChannelDelta_Call { - _c.Call.Return(_a0) - return _c -} - -func (_c *MockScheduler_GetNodeChannelDelta_Call) RunAndReturn(run func(int64) int) *MockScheduler_GetNodeChannelDelta_Call { - _c.Call.Return(run) - return _c -} - -// GetNodeSegmentDelta provides a mock function with given fields: nodeID -func (_m *MockScheduler) GetNodeSegmentDelta(nodeID int64) int { - ret := _m.Called(nodeID) +// GetSegmentTaskDelta provides a mock function with given fields: nodeID, collectionID +func (_m *MockScheduler) GetSegmentTaskDelta(nodeID int64, collectionID int64) int { + ret := _m.Called(nodeID, collectionID) var r0 int - if rf, ok := ret.Get(0).(func(int64) int); ok { - r0 = rf(nodeID) + if rf, ok := ret.Get(0).(func(int64, int64) int); ok { + r0 = rf(nodeID, collectionID) } else { r0 = ret.Get(0).(int) } @@ -266,30 +267,31 @@ func (_m *MockScheduler) GetNodeSegmentDelta(nodeID int64) int { return r0 } -// MockScheduler_GetNodeSegmentDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetNodeSegmentDelta' -type MockScheduler_GetNodeSegmentDelta_Call struct { +// MockScheduler_GetSegmentTaskDelta_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSegmentTaskDelta' +type MockScheduler_GetSegmentTaskDelta_Call struct { *mock.Call } -// GetNodeSegmentDelta is a helper method to define mock.On call +// GetSegmentTaskDelta is a helper method to define mock.On call // - nodeID int64 -func (_e *MockScheduler_Expecter) 
GetNodeSegmentDelta(nodeID interface{}) *MockScheduler_GetNodeSegmentDelta_Call { - return &MockScheduler_GetNodeSegmentDelta_Call{Call: _e.mock.On("GetNodeSegmentDelta", nodeID)} +// - collectionID int64 +func (_e *MockScheduler_Expecter) GetSegmentTaskDelta(nodeID interface{}, collectionID interface{}) *MockScheduler_GetSegmentTaskDelta_Call { + return &MockScheduler_GetSegmentTaskDelta_Call{Call: _e.mock.On("GetSegmentTaskDelta", nodeID, collectionID)} } -func (_c *MockScheduler_GetNodeSegmentDelta_Call) Run(run func(nodeID int64)) *MockScheduler_GetNodeSegmentDelta_Call { +func (_c *MockScheduler_GetSegmentTaskDelta_Call) Run(run func(nodeID int64, collectionID int64)) *MockScheduler_GetSegmentTaskDelta_Call { _c.Call.Run(func(args mock.Arguments) { - run(args[0].(int64)) + run(args[0].(int64), args[1].(int64)) }) return _c } -func (_c *MockScheduler_GetNodeSegmentDelta_Call) Return(_a0 int) *MockScheduler_GetNodeSegmentDelta_Call { +func (_c *MockScheduler_GetSegmentTaskDelta_Call) Return(_a0 int) *MockScheduler_GetSegmentTaskDelta_Call { _c.Call.Return(_a0) return _c } -func (_c *MockScheduler_GetNodeSegmentDelta_Call) RunAndReturn(run func(int64) int) *MockScheduler_GetNodeSegmentDelta_Call { +func (_c *MockScheduler_GetSegmentTaskDelta_Call) RunAndReturn(run func(int64, int64) int) *MockScheduler_GetSegmentTaskDelta_Call { _c.Call.Return(run) return _c } diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 6bec7532db0f..d167f864ddf0 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -141,11 +141,12 @@ type Scheduler interface { Add(task Task) error Dispatch(node int64) RemoveByNode(node int64) - GetNodeSegmentDelta(nodeID int64) int - GetNodeChannelDelta(nodeID int64) int GetExecutedFlag(nodeID int64) <-chan struct{} GetChannelTaskNum() int GetSegmentTaskNum() int + + GetSegmentTaskDelta(nodeID int64, collectionID int64) int + GetChannelTaskDelta(nodeID int64, 
collectionID int64) int } type taskScheduler struct { @@ -166,6 +167,11 @@ type taskScheduler struct { channelTasks map[replicaChannelIndex]Task processQueue *taskQueue waitQueue *taskQueue + + // executing task delta changes on node: nodeID -> collectionID -> delta changes + // delta changes measure by segment row count and channel num + segmentExecutingTaskDelta map[int64]map[int64]int + channelExecutingTaskDelta map[int64]map[int64]int } func NewScheduler(ctx context.Context, @@ -192,11 +198,13 @@ func NewScheduler(ctx context.Context, cluster: cluster, nodeMgr: nodeMgr, - tasks: make(UniqueSet), - segmentTasks: make(map[replicaSegmentIndex]Task), - channelTasks: make(map[replicaChannelIndex]Task), - processQueue: newTaskQueue(), - waitQueue: newTaskQueue(), + tasks: make(UniqueSet), + segmentTasks: make(map[replicaSegmentIndex]Task), + channelTasks: make(map[replicaChannelIndex]Task), + processQueue: newTaskQueue(), + waitQueue: newTaskQueue(), + segmentExecutingTaskDelta: make(map[int64]map[int64]int), + channelExecutingTaskDelta: make(map[int64]map[int64]int), } } @@ -209,6 +217,8 @@ func (scheduler *taskScheduler) Stop() { for nodeID, executor := range scheduler.executors { executor.Stop() delete(scheduler.executors, nodeID) + delete(scheduler.segmentExecutingTaskDelta, nodeID) + delete(scheduler.channelExecutingTaskDelta, nodeID) } for _, task := range scheduler.segmentTasks { @@ -234,6 +244,8 @@ func (scheduler *taskScheduler) AddExecutor(nodeID int64) { scheduler.cluster, scheduler.nodeMgr) + scheduler.segmentExecutingTaskDelta[nodeID] = make(map[int64]int) + scheduler.channelExecutingTaskDelta[nodeID] = make(map[int64]int) scheduler.executors[nodeID] = executor executor.Start(scheduler.ctx) log.Info("add executor for new QueryNode", zap.Int64("nodeID", nodeID)) @@ -247,6 +259,8 @@ func (scheduler *taskScheduler) RemoveExecutor(nodeID int64) { if ok { executor.Stop() delete(scheduler.executors, nodeID) + delete(scheduler.segmentExecutingTaskDelta, nodeID) 
+ delete(scheduler.channelExecutingTaskDelta, nodeID) log.Info("remove executor of offline QueryNode", zap.Int64("nodeID", nodeID)) } } @@ -279,11 +293,52 @@ func (scheduler *taskScheduler) Add(task Task) error { } scheduler.updateTaskMetrics() + scheduler.updateTaskDelta(task) + log.Ctx(task.Context()).Info("task added", zap.String("task", task.String())) task.RecordStartTs() return nil } +func (scheduler *taskScheduler) updateTaskDelta(task Task) { + var delta int + var deltaMap map[int64]map[int64]int + switch task := task.(type) { + case *SegmentTask: + // skip growing segment's count, cause doesn't know realtime row number of growing segment + if task.Actions()[0].(*SegmentAction).Scope() == querypb.DataScope_Historical { + segment := scheduler.targetMgr.GetSealedSegment(task.CollectionID(), task.SegmentID(), meta.NextTargetFirst) + if segment != nil { + delta = int(segment.GetNumOfRows()) + } + } + + deltaMap = scheduler.segmentExecutingTaskDelta + + case *ChannelTask: + delta = 1 + deltaMap = scheduler.channelExecutingTaskDelta + } + + // turn delta to negative when try to remove task + if task.Status() == TaskStatusSucceeded || task.Status() == TaskStatusFailed || task.Status() == TaskStatusCanceled { + delta = -delta + } + + if delta != 0 { + for _, action := range task.Actions() { + if deltaMap[action.Node()] == nil { + deltaMap[action.Node()] = make(map[int64]int) + } + if action.Type() == ActionTypeGrow { + deltaMap[action.Node()][task.CollectionID()] += delta + } else if action.Type() == ActionTypeReduce { + deltaMap[action.Node()][task.CollectionID()] -= delta + } + } + } +} + func (scheduler *taskScheduler) updateTaskMetrics() { segmentGrowNum, segmentReduceNum, segmentMoveNum := 0, 0, 0 channelGrowNum, channelReduceNum, channelMoveNum := 0, 0, 0 @@ -474,18 +529,39 @@ func (scheduler *taskScheduler) Dispatch(node int64) { } } -func (scheduler *taskScheduler) GetNodeSegmentDelta(nodeID int64) int { +func (scheduler *taskScheduler) 
GetSegmentTaskDelta(nodeID, collectionID int64) int { scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - return calculateNodeDelta(nodeID, scheduler.segmentTasks) + return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.segmentExecutingTaskDelta) } -func (scheduler *taskScheduler) GetNodeChannelDelta(nodeID int64) int { +func (scheduler *taskScheduler) GetChannelTaskDelta(nodeID, collectionID int64) int { scheduler.rwmutex.RLock() defer scheduler.rwmutex.RUnlock() - return calculateNodeDelta(nodeID, scheduler.channelTasks) + return scheduler.calculateTaskDelta(nodeID, collectionID, scheduler.channelExecutingTaskDelta) +} + +func (scheduler *taskScheduler) calculateTaskDelta(nodeID, collectionID int64, deltaMap map[int64]map[int64]int) int { + if nodeID == -1 && collectionID == -1 { + return 0 + } + + sum := 0 + for nid, nInfo := range deltaMap { + if nid != nodeID && -1 != nodeID { + continue + } + + for cid, cInfo := range nInfo { + if cid == collectionID || -1 == collectionID { + sum += cInfo + } + } + } + + return sum } func (scheduler *taskScheduler) GetExecutedFlag(nodeID int64) <-chan struct{} { @@ -514,45 +590,6 @@ func (scheduler *taskScheduler) GetSegmentTaskNum() int { return len(scheduler.segmentTasks) } -func calculateNodeDelta[K comparable, T ~map[K]Task](nodeID int64, tasks T) int { - delta := 0 - for _, task := range tasks { - for _, action := range task.Actions() { - if action.Node() != nodeID { - continue - } - if action.Type() == ActionTypeGrow { - delta++ - } else if action.Type() == ActionTypeReduce { - delta-- - } - } - } - return delta -} - -func (scheduler *taskScheduler) GetNodeSegmentCntDelta(nodeID int64) int { - scheduler.rwmutex.RLock() - defer scheduler.rwmutex.RUnlock() - - delta := 0 - for _, task := range scheduler.segmentTasks { - for _, action := range task.Actions() { - if action.Node() != nodeID { - continue - } - segmentAction := action.(*SegmentAction) - segment := 
scheduler.targetMgr.GetSealedSegment(task.CollectionID(), segmentAction.SegmentID(), meta.NextTarget) - if action.Type() == ActionTypeGrow { - delta += int(segment.GetNumOfRows()) - } else { - delta -= int(segment.GetNumOfRows()) - } - } - } - return delta -} - // schedule selects some tasks to execute, follow these steps for each started selected tasks: // 1. check whether this task is stale, set status to canceled if stale // 2. step up the task's actions, set status to succeeded if all actions finished @@ -811,6 +848,7 @@ func (scheduler *taskScheduler) remove(task Task) { log = log.With(zap.Int64("segmentID", task.SegmentID())) } + scheduler.updateTaskDelta(task) scheduler.updateTaskMetrics() log.Info("task removed")