fix: Skip release duplicate l0 segment (milvus-io#31540)
issue: milvus-io#31480 milvus-io#31481

A task that releases a duplicate l0 segment is unsafe on either node: executed on the old delegator it may leave the segment missing, and executed on the new delegator it may break the new delegator's leader view.

This PR makes the segment_checker skip releasing duplicate l0 segments, because l0 segments are released together with the channel when it is unsubscribed.
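
To make the intent concrete, here is a minimal, runnable sketch of the checker-side guard (the `segment` type, the map shapes, and `sealedSegmentDiff` are simplified stand-ins for illustration, not the real querycoordv2 types):

```go
package main

import "fmt"

// Simplified stand-in for meta.Segment; the real type is much richer.
type segment struct {
	ID   int64
	IsL0 bool
}

// sealedSegmentDiff mirrors the guard added in getSealedSegmentDiff: a segment
// present in both the current and the next target is skipped, so the checker
// never emits duplicate (and possibly conflicting) tasks for it.
func sealedSegmentDiff(currentTarget, nextTarget map[int64]*segment, dist map[int64]struct{}) (toLoad []int64) {
	for id := range currentTarget {
		if nextTarget[id] != nil {
			continue // also in next target: leave it to the channel lifecycle
		}
		if _, ok := dist[id]; !ok {
			toLoad = append(toLoad, id)
		}
	}
	return toLoad
}

func main() {
	current := map[int64]*segment{1: {ID: 1, IsL0: true}, 2: {ID: 2}}
	next := map[int64]*segment{1: {ID: 1, IsL0: true}}
	fmt.Println(sealedSegmentDiff(current, next, map[int64]struct{}{})) // [2]
}
```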

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
weiliu1031 committed Mar 27, 2024
1 parent ded1fcc commit 48b43be
Showing 5 changed files with 73 additions and 35 deletions.
15 changes: 15 additions & 0 deletions internal/querycoordv2/checkers/segment_checker.go
@@ -232,6 +232,11 @@ func (c *SegmentChecker) getSealedSegmentDiff(

// l0 segments that exist on the current target, but not on dist
for segmentID, segment := range currentTargetMap {
// skip to avoid generating duplicate segment tasks
if nextTargetMap[segmentID] != nil {
continue
}

node, existInDist := distMap[segmentID]
l0WithWrongLocation := false
if existInDist && segment.GetLevel() == datapb.SegmentLevel_L0 {
@@ -248,6 +253,8 @@ func (c *SegmentChecker) getSealedSegmentDiff(
for _, segment := range dist {
_, existOnCurrent := currentTargetMap[segment.GetID()]
_, existOnNext := nextTargetMap[segment.GetID()]

// l0 segments should be released together with the channel
if !existOnNext && !existOnCurrent {
toRelease = append(toRelease, segment)
}
@@ -284,6 +291,14 @@ func (c *SegmentChecker) findRepeatedSealedSegments(replicaID int64) []*meta.Seg
dist := c.getSealedSegmentsDist(replica)
versions := make(map[int64]*meta.Segment)
for _, s := range dist {
// l0 segments should be released together with the channel
segment := c.targetMgr.GetSealedSegment(s.GetCollectionID(), s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
if isL0Segment {
continue
}

maxVer, ok := versions[s.GetID()]
if !ok {
versions[s.GetID()] = s
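For contrast, a hedged sketch of the l0 exemption added to findRepeatedSealedSegments (simplified stand-ins again: the real code resolves the segment through targetMgr.GetSealedSegment with meta.CurrentTargetFirst and checks datapb.SegmentLevel_L0, rather than an IsL0 flag on the dist entry):

```go
package main

import "fmt"

type segment struct {
	ID      int64
	Node    int64
	Version int64
	IsL0    bool
}

// repeatedSegments keeps, per segment ID, every copy except the newest one,
// but never reports l0 segments: those are released with their channel when
// the delegator unsubscribes, not by the segment checker.
func repeatedSegments(dist []*segment) []*segment {
	newest := make(map[int64]*segment)
	var repeated []*segment
	for _, s := range dist {
		if s.IsL0 {
			continue // exemption introduced by this commit
		}
		if prev, ok := newest[s.ID]; ok {
			if s.Version > prev.Version {
				repeated = append(repeated, prev)
				newest[s.ID] = s
			} else {
				repeated = append(repeated, s)
			}
			continue
		}
		newest[s.ID] = s
	}
	return repeated
}

func main() {
	dist := []*segment{
		{ID: 1, Node: 1, Version: 1, IsL0: true},
		{ID: 1, Node: 2, Version: 2, IsL0: true}, // duplicate l0: NOT released here
		{ID: 2, Node: 1, Version: 1},
		{ID: 2, Node: 2, Version: 2}, // duplicate sealed: older copy released
	}
	for _, s := range repeatedSegments(dist) {
		fmt.Printf("release segment %d on node %d\n", s.ID, s.Node)
	}
}
```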
39 changes: 23 additions & 16 deletions internal/querycoordv2/checkers/segment_checker_test.go
@@ -240,22 +240,17 @@ func (suite *SegmentCheckerTestSuite) TestLoadL0Segments() {
// an l0 segment exists on a non-delegator node
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
// test load l0 segments to delegator and release the one on non delegator
// test load l0 segments to delegator
tasks = checker.Check(context.TODO())
suite.Len(tasks, 2)
for _, t := range tasks {
suite.Len(t.Actions(), 1)
action, ok = t.Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.EqualValues(1, action.SegmentID())
suite.Equal(t.Priority(), task.TaskPriorityNormal)
if action.Type() == task.ActionTypeGrow {
suite.EqualValues(2, action.Node())
} else {
suite.EqualValues(1, action.Node())
}
}
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeGrow, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}

func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {
@@ -309,14 +304,26 @@ func (suite *SegmentCheckerTestSuite) TestReleaseL0Segments() {

// release duplicate l0 segment
tasks := checker.Check(context.TODO())
suite.Len(tasks, 0)

checker.dist.SegmentDistManager.Update(1)

// test release l0 segment which doesn't exist in target
suite.broker.ExpectedCalls = nil
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, nil, nil)
checker.targetMgr.UpdateCollectionNextTarget(int64(1))
checker.targetMgr.UpdateCollectionCurrentTarget(int64(1))

tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok := tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(1, action.SegmentID())
suite.EqualValues(1, action.Node())
suite.EqualValues(2, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)

checker.dist.SegmentDistManager.Update(1)
Expand Down
27 changes: 18 additions & 9 deletions internal/querycoordv2/task/executor.go
@@ -197,18 +197,19 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
)

// Get shard leader for the given replica and segment
leaderID, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), task.Shard())
if !ok {
replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
view := ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, action.Shard())
if view == nil {
msg := "no shard leader for the segment to execute loading"
err = merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
log.Warn(msg, zap.Error(err))
return err
}
log = log.With(zap.Int64("shardLeader", leaderID))
log = log.With(zap.Int64("shardLeader", view.ID))

startTs := time.Now()
log.Info("load segments...")
status, err := ex.cluster.LoadSegments(task.Context(), leaderID, req)
status, err := ex.cluster.LoadSegments(task.Context(), view.ID, req)
err = merr.CheckRPCCall(status, err)
if err != nil {
log.Warn("failed to load segment", zap.Error(err))
@@ -248,13 +249,21 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
req.Shard = task.shard

if ex.meta.CollectionManager.Exist(task.CollectionID()) {
leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), req.GetShard())
if !ok {
log.Warn("no shard leader for the segment to execute releasing", zap.String("shard", req.GetShard()))
replica := ex.meta.ReplicaManager.GetByCollectionAndNode(task.CollectionID(), action.Node())
view := ex.dist.LeaderViewManager.GetLatestLeadersByReplicaShard(replica, action.Shard())
if view == nil {
msg := "no shard leader for the segment to execute releasing"
err := merr.WrapErrChannelNotFound(task.Shard(), "shard delegator not found")
log.Warn(msg, zap.Error(err))
return
}
dstNode = leader
log = log.With(zap.Int64("shardLeader", leader))
dstNode = view.ID
log = log.With(zap.Int64("shardLeader", view.ID))
req.NeedTransfer = true
}
}
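The executor now resolves the delegator through the leader-view manager instead of getShardLeader. A minimal sketch of that resolution pattern under simplified assumptions (leaderView, leaderViewManager, and resolveDelegator are illustrative stand-ins; the real GetLatestLeadersByReplicaShard takes a replica and shard and returns a leader view whose ID is the delegator node, as the diff above shows):

```go
package main

import (
	"errors"
	"fmt"
)

// Stand-in for the latest leader view of a shard; the real view carries
// segment and channel state in addition to the delegator node ID.
type leaderView struct{ ID int64 }

type leaderViewManager struct {
	views map[string]*leaderView // shard name -> latest view
}

func (m *leaderViewManager) latestByShard(shard string) *leaderView {
	return m.views[shard]
}

// resolveDelegator fails fast when no delegator is serving the shard,
// mirroring the merr.WrapErrChannelNotFound path in the real executor.
func resolveDelegator(m *leaderViewManager, shard string) (int64, error) {
	view := m.latestByShard(shard)
	if view == nil {
		return 0, errors.New("shard delegator not found: " + shard)
	}
	return view.ID, nil
}

func main() {
	m := &leaderViewManager{views: map[string]*leaderView{"test-insert-channel": {ID: 2}}}
	if id, err := resolveDelegator(m, "test-insert-channel"); err == nil {
		fmt.Println("load/release via delegator node", id) // 2
	}
	if _, err := resolveDelegator(m, "missing-channel"); err != nil {
		fmt.Println(err)
	}
}
```

Failing fast here is what lets loadSegment and releaseSegment bail out instead of targeting a stale leader, and it is also why the task tests below must now seed a leader view before running the executor.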
5 changes: 5 additions & 0 deletions internal/querycoordv2/task/task_test.go
@@ -429,6 +429,7 @@ func (suite *TaskSuite) TestLoadSegmentTask() {
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := []Task{}
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
@@ -529,6 +530,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskNotIndex() {
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := []Task{}
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
@@ -623,6 +625,7 @@ func (suite *TaskSuite) TestLoadSegmentTaskFailed() {
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := []Task{}
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
@@ -1005,6 +1008,7 @@ func (suite *TaskSuite) TestTaskCanceled() {
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := []Task{}
segmentInfos := []*datapb.SegmentInfo{}
for _, segment := range suite.loadSegments {
@@ -1097,6 +1101,7 @@ func (suite *TaskSuite) TestSegmentTaskStale() {
CollectionID: suite.collection,
ChannelName: channel.ChannelName,
}))
suite.dist.LeaderViewManager.Update(targetNode, utils.CreateTestLeaderView(targetNode, suite.collection, channel.ChannelName, map[int64]int64{}, map[int64]*meta.Segment{}))
tasks := []Task{}
segments := make([]*datapb.SegmentInfo, 0)
for _, segment := range suite.loadSegments {
22 changes: 12 additions & 10 deletions tests/integration/balance/balance_test.go
@@ -188,17 +188,18 @@ func (s *BalanceTestSuit) TestBalanceOnSingleReplica() {
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
}, 30*time.Second, 1*time.Second)

// check total segment number
// check total segment number and total channel number
s.Eventually(func() bool {
count := 0
segNum, chNum := 0, 0
for _, node := range s.Cluster.GetAllQueryNodes() {
resp1, err := node.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp1.GetStatus()))
count += len(resp1.Segments)
segNum += len(resp1.Segments)
chNum += len(resp1.Channels)
}
return count == 8
}, 10*time.Second, 1*time.Second)
return segNum == 8 && chNum == 2
}, 30*time.Second, 1*time.Second)
}

func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
@@ -244,17 +245,18 @@ func (s *BalanceTestSuit) TestBalanceOnMultiReplica() {
return len(resp.Channels) == 1 && len(resp.Segments) >= 2
}, 30*time.Second, 1*time.Second)

// check total segment num
// check total segment number and total channel number
s.Eventually(func() bool {
count := 0
segNum, chNum := 0, 0
for _, node := range s.Cluster.GetAllQueryNodes() {
resp1, err := node.GetDataDistribution(ctx, &querypb.GetDataDistributionRequest{})
s.NoError(err)
s.True(merr.Ok(resp1.GetStatus()))
count += len(resp1.Segments)
segNum += len(resp1.Segments)
chNum += len(resp1.Channels)
}
return count == 16
}, 10*time.Second, 1*time.Second)
return segNum == 16 && chNum == 4
}, 30*time.Second, 1*time.Second)
}

func (s *BalanceTestSuit) TestNodeDown() {
