Skip to content

Commit

Permalink
enhance: Avoid assign too much segment/channels to new querynode (#34096
Browse files Browse the repository at this point in the history
) (#34461)

issue: #34095
pr: #34096

When a new query node comes online, the segment_checker,
channel_checker, and balance_checker simultaneously attempt to allocate
segments to it. If this occurs during the execution of a load task and
the distribution of the new query node hasn't been updated, the query
coordinator may mistakenly view the new query node as empty. As a
result, it assigns segments or channels to it, potentially overloading
the new query node with more segments or channels than expected.

This PR measures the workload of the executing tasks on the target query
node to prevent assigning an excessive number of segments to it.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Jul 10, 2024
1 parent 0bfa1a7 commit d3d1920
Show file tree
Hide file tree
Showing 10 changed files with 304 additions and 127 deletions.
4 changes: 2 additions & 2 deletions internal/querycoordv2/balance/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ func (b *RoundRobinBalancer) AssignSegment(collectionID int64, segments []*meta.
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].SegmentCnt(), nodesInfo[j].SegmentCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetNodeSegmentDelta(id1), b.scheduler.GetNodeSegmentDelta(id2)
delta1, delta2 := b.scheduler.GetSegmentTaskDelta(id1, -1), b.scheduler.GetSegmentTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]SegmentAssignPlan, 0, len(segments))
Expand Down Expand Up @@ -114,7 +114,7 @@ func (b *RoundRobinBalancer) AssignChannel(channels []*meta.DmChannel, nodes []i
sort.Slice(nodesInfo, func(i, j int) bool {
cnt1, cnt2 := nodesInfo[i].ChannelCnt(), nodesInfo[j].ChannelCnt()
id1, id2 := nodesInfo[i].ID(), nodesInfo[j].ID()
delta1, delta2 := b.scheduler.GetNodeChannelDelta(id1), b.scheduler.GetNodeChannelDelta(id2)
delta1, delta2 := b.scheduler.GetChannelTaskDelta(id1, -1), b.scheduler.GetChannelTaskDelta(id2, -1)
return cnt1+delta1 < cnt2+delta2
})
ret := make([]ChannelAssignPlan, 0, len(channels))
Expand Down
10 changes: 8 additions & 2 deletions internal/querycoordv2/balance/balance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package balance
import (
"testing"

"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus/internal/proto/datapb"
Expand All @@ -37,6 +38,9 @@ func (suite *BalanceTestSuite) SetupTest() {
nodeManager := session.NewNodeManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.roundRobinBalancer = NewRoundRobinBalancer(suite.mockScheduler, nodeManager)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *BalanceTestSuite) TestAssignBalance() {
Expand Down Expand Up @@ -84,13 +88,14 @@ func (suite *BalanceTestSuite) TestAssignBalance() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithSegmentCnt(c.segmentCnts[i]))
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
suite.mockScheduler.EXPECT().GetNodeSegmentDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignSegment(0, c.assignments, c.nodeIDs, false)
Expand Down Expand Up @@ -144,13 +149,14 @@ func (suite *BalanceTestSuite) TestAssignChannel() {
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupTest()
suite.mockScheduler.ExpectedCalls = nil
for i := range c.nodeIDs {
nodeInfo := session.NewNodeInfo(c.nodeIDs[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithChannelCnt(c.channelCnts[i]))
nodeInfo.SetState(c.states[i])
suite.roundRobinBalancer.nodeManager.Add(nodeInfo)
if !nodeInfo.IsStoppingState() {
suite.mockScheduler.EXPECT().GetNodeChannelDelta(c.nodeIDs[i]).Return(c.deltaCnts[i])
suite.mockScheduler.EXPECT().GetChannelTaskDelta(c.nodeIDs[i], int64(-1)).Return(c.deltaCnts[i])
}
}
plans := suite.roundRobinBalancer.AssignChannel(c.assignments, c.nodeIDs, false)
Expand Down
8 changes: 7 additions & 1 deletion internal/querycoordv2/balance/rowcount_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ func (b *RowCountBasedBalancer) convertToNodeItemsBySegment(nodeIDs []int64) []*
rowcnt += int(view.NumOfGrowingRows)
}

// calculate executing task cost in scheduler
rowcnt += b.scheduler.GetSegmentTaskDelta(node, -1)

// more row count, less priority
nodeItem := newNodeItem(rowcnt, node)
ret = append(ret, &nodeItem)
Expand All @@ -150,8 +153,11 @@ func (b *RowCountBasedBalancer) convertToNodeItemsByChannel(nodeIDs []int64) []*
node := nodeInfo.ID()
channels := b.dist.ChannelDistManager.GetByNode(node)

channelCount := len(channels)
// calculate executing task cost in scheduler
channelCount += b.scheduler.GetChannelTaskDelta(node, -1)
// more channel num, less priority
nodeItem := newNodeItem(len(channels), node)
nodeItem := newNodeItem(channelCount, node)
ret = append(ret, &nodeItem)
}
return ret
Expand Down
7 changes: 3 additions & 4 deletions internal/querycoordv2/balance/rowcount_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,9 @@ func (suite *RowCountBasedBalancerTestSuite) SetupTest() {
suite.balancer = NewRowCountBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.broker.EXPECT().GetPartitions(mock.Anything, int64(1)).Return([]int64{1}, nil).Maybe()

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *RowCountBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -451,7 +454,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalance() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down Expand Up @@ -658,7 +660,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOnPartStopping() {
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(nil, c.segmentInNext, nil)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down Expand Up @@ -756,7 +757,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestBalanceOutboundNodes() {
},
}

suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
Expand Down Expand Up @@ -1020,7 +1020,6 @@ func (suite *RowCountBasedBalancerTestSuite) TestDisableBalanceChannel() {
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
balancer.targetMgr.UpdateCollectionCurrentTarget(1)
balancer.targetMgr.UpdateCollectionNextTarget(int64(1))
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
Expand Down
15 changes: 11 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,19 +148,22 @@ func (b *ScoreBasedBalancer) convertToNodeItems(collectionID int64, nodeIDs []in
}

func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
rowCount := 0
nodeRowCount := 0
// calculate global sealed segment row count
globalSegments := b.dist.SegmentDistManager.GetByNode(nodeID)
for _, s := range globalSegments {
rowCount += int(s.GetNumOfRows())
nodeRowCount += int(s.GetNumOfRows())
}

// calculate global growing segment row count
views := b.dist.GetLeaderView(nodeID)
for _, view := range views {
rowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
nodeRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}

// calculate executing task cost in scheduler
nodeRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, -1)

collectionRowCount := 0
// calculate collection sealed segment row count
collectionSegments := b.dist.SegmentDistManager.GetByCollectionAndNode(collectionID, nodeID)
Expand All @@ -173,7 +176,11 @@ func (b *ScoreBasedBalancer) calculateScore(collectionID, nodeID int64) int {
for _, view := range collectionViews {
collectionRowCount += int(float64(view.NumOfGrowingRows) * params.Params.QueryCoordCfg.GrowingRowCountWeight.GetAsFloat())
}
return collectionRowCount + int(float64(rowCount)*

// calculate executing task cost in scheduler
collectionRowCount += b.scheduler.GetSegmentTaskDelta(nodeID, collectionID)

return collectionRowCount + int(float64(nodeRowCount)*
params.Params.QueryCoordCfg.GlobalRowCountFactor.GetAsFloat())
}

Expand Down
104 changes: 100 additions & 4 deletions internal/querycoordv2/balance/score_based_balancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"testing"

"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

Expand Down Expand Up @@ -72,6 +73,9 @@ func (suite *ScoreBasedBalancerTestSuite) SetupTest() {
distManager := meta.NewDistributionManager()
suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.balancer = NewScoreBasedBalancer(suite.mockScheduler, nodeManager, distManager, testMeta, testTarget)

suite.mockScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.mockScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
}

func (suite *ScoreBasedBalancerTestSuite) TearDownTest() {
Expand Down Expand Up @@ -431,6 +435,101 @@ func (suite *ScoreBasedBalancerTestSuite) TestBalanceOneRound() {
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceWithExecutingTask() {
cases := []struct {
name string
nodes []int64
collectionID int64
replicaID int64
collectionsSegments []*datapb.SegmentInfo
states []session.State
shouldMock bool
distributions map[int64][]*meta.Segment
distributionChannels map[int64][]*meta.DmChannel
deltaCounts []int
expectPlans []SegmentAssignPlan
expectChannelPlans []ChannelAssignPlan
}{
{
name: "normal balance for one collection only",
nodes: []int64{1, 2, 3},
deltaCounts: []int{30, 0, 0},
collectionID: 1,
replicaID: 1,
collectionsSegments: []*datapb.SegmentInfo{
{ID: 1, PartitionID: 1}, {ID: 2, PartitionID: 1}, {ID: 3, PartitionID: 1},
},
states: []session.State{session.NodeStateNormal, session.NodeStateNormal, session.NodeStateNormal},
distributions: map[int64][]*meta.Segment{
1: {{SegmentInfo: &datapb.SegmentInfo{ID: 1, CollectionID: 1, NumOfRows: 10}, Node: 1}},
2: {{SegmentInfo: &datapb.SegmentInfo{ID: 2, CollectionID: 1, NumOfRows: 10}, Node: 2}},
3: {
{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3},
{SegmentInfo: &datapb.SegmentInfo{ID: 4, CollectionID: 1, NumOfRows: 30}, Node: 3},
},
},
expectPlans: []SegmentAssignPlan{
{Segment: &meta.Segment{SegmentInfo: &datapb.SegmentInfo{ID: 3, CollectionID: 1, NumOfRows: 20}, Node: 3}, From: 3, To: 2, ReplicaID: 1},
},
expectChannelPlans: []ChannelAssignPlan{},
},
}

for _, c := range cases {
suite.Run(c.name, func() {
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer

// 1. set up target for multi collections
collection := utils.CreateTestCollection(c.collectionID, int32(c.replicaID))
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, c.collectionID).Return(
nil, c.collectionsSegments, nil)
suite.broker.EXPECT().GetPartitions(mock.Anything, c.collectionID).Return([]int64{c.collectionID}, nil).Maybe()
collection.LoadPercentage = 100
collection.Status = querypb.LoadStatus_Loaded
balancer.meta.CollectionManager.PutCollection(collection)
balancer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(c.collectionID, c.collectionID))
balancer.meta.ReplicaManager.Put(utils.CreateTestReplica(c.replicaID, c.collectionID, c.nodes))
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionCurrentTarget(c.collectionID)
balancer.targetMgr.UpdateCollectionNextTarget(c.collectionID)

// 2. set up target for distribution for multi collections
for node, s := range c.distributions {
balancer.dist.SegmentDistManager.Update(node, s...)
}
for node, v := range c.distributionChannels {
balancer.dist.ChannelDistManager.Update(node, v...)
}

// 3. set up nodes info and resourceManager for balancer
for i := range c.nodes {
nodeInfo := session.NewNodeInfo(c.nodes[i], "127.0.0.1:0")
nodeInfo.UpdateStats(session.WithChannelCnt(len(c.distributionChannels[c.nodes[i]])))
nodeInfo.SetState(c.states[i])
suite.balancer.nodeManager.Add(nodeInfo)
suite.balancer.meta.ResourceManager.HandleNodeUp(c.nodes[i])
}

// set node delta count
suite.mockScheduler.ExpectedCalls = nil
for i, node := range c.nodes {
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(1)).Return(c.deltaCounts[i]).Maybe()
suite.mockScheduler.EXPECT().GetSegmentTaskDelta(node, int64(-1)).Return(c.deltaCounts[i]).Maybe()
}

// 4. balance and verify result
segmentPlans, channelPlans := suite.getCollectionBalancePlans(balancer, c.collectionID)
assert.Equal(suite.T(), len(c.expectPlans), len(segmentPlans))
for i := range segmentPlans {
assert.Equal(suite.T(), c.expectPlans[i].To, segmentPlans[i].To)
}
assert.Equal(suite.T(), len(c.expectChannelPlans), len(channelPlans))
})
}
}

func (suite *ScoreBasedBalancerTestSuite) TestBalanceMultiRound() {
balanceCase := struct {
name string
Expand Down Expand Up @@ -636,11 +735,8 @@ func (suite *ScoreBasedBalancerTestSuite) TestStoppedBalance() {
expectChannelPlans: []ChannelAssignPlan{},
},
}
for i, c := range cases {
for _, c := range cases {
suite.Run(c.name, func() {
if i == 0 {
suite.mockScheduler.Mock.On("GetNodeChannelDelta", mock.Anything).Return(0).Maybe()
}
suite.SetupSuite()
defer suite.TearDownTest()
balancer := suite.balancer
Expand Down
15 changes: 15 additions & 0 deletions internal/querycoordv2/ops_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,9 @@ func (suite *OpsServiceSuite) SetupTest() {
suite.cluster = session.NewMockCluster(suite.T())
suite.jobScheduler = job.NewScheduler()
suite.taskScheduler = task.NewMockScheduler(suite.T())
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()

suite.jobScheduler.Start()
suite.balancer = balance.NewScoreBasedBalancer(
suite.taskScheduler,
Expand Down Expand Up @@ -539,6 +542,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
Expand All @@ -556,6 +561,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test transfer all segments, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
Expand All @@ -575,6 +582,8 @@ func (suite *OpsServiceSuite) TestTransferSegment() {

// test transfer all segment to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
Expand Down Expand Up @@ -745,6 +754,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test copy mode, expect generate 1 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
suite.Equal(len(actions), 1)
Expand All @@ -762,6 +773,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test transfer all channels, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter := atomic.NewInt64(0)
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
actions := t.Actions()
Expand All @@ -781,6 +794,8 @@ func (suite *OpsServiceSuite) TestTransferChannel() {

// test transfer all channels to all nodes, expect generate 4 load segment task
suite.taskScheduler.ExpectedCalls = nil
suite.taskScheduler.EXPECT().GetSegmentTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
suite.taskScheduler.EXPECT().GetChannelTaskDelta(mock.Anything, mock.Anything).Return(0).Maybe()
counter = atomic.NewInt64(0)
nodeSet := typeutil.NewUniqueSet()
suite.taskScheduler.EXPECT().Add(mock.Anything).RunAndReturn(func(t task.Task) error {
Expand Down
Loading

0 comments on commit d3d1920

Please sign in to comment.