Skip to content

Commit

Permalink
fix: Dirty sealed segment won't release after channel balance (milvus…
Browse files Browse the repository at this point in the history
…-io#31095)

issue: milvus-io#31074
This PR fix dirty sealed segment doesn't release after channel balance,
dirty sealed segment means segment doesn't exist in targets.

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Mar 7, 2024
1 parent 005dbf2 commit a5abfb2
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 4 deletions.
8 changes: 7 additions & 1 deletion internal/querycoordv2/checkers/segment_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import (
"github.com/milvus-io/milvus/pkg/log"
)

const initialTargetVersion = int64(0)

type SegmentChecker struct {
meta *meta.Meta
dist *meta.DistributionManager
Expand Down Expand Up @@ -325,7 +327,11 @@ func (c *SegmentChecker) filterSegmentInUse(replica *meta.Replica, segments []*m
view := c.dist.LeaderViewManager.GetLeaderShardView(leaderID, s.GetInsertChannel())
currentTargetVersion := c.targetMgr.GetCollectionTargetVersion(s.CollectionID, meta.CurrentTarget)
partition := c.meta.CollectionManager.GetPartition(s.PartitionID)
if partition != nil && view.TargetVersion != currentTargetVersion {

// if delegator has valid target version, and before it update to latest readable version, skip release it's sealed segment
// Notice: if syncTargetVersion stuck, segment on delegator won't be released
readableVersionNotUpdate := view.TargetVersion != initialTargetVersion && view.TargetVersion < currentTargetVersion
if partition != nil && readableVersionNotUpdate {
// leader view version hasn't been updated, segment maybe still in use
continue
}
Expand Down
20 changes: 17 additions & 3 deletions internal/querycoordv2/checkers/segment_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,19 +333,18 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
checker.targetMgr.UpdateCollectionCurrentTarget(collectionID)
readableVersion := checker.targetMgr.GetCollectionTargetVersion(collectionID, meta.CurrentTarget)

// set dist
// test less target version exist on leader,meet segment doesn't exit in target, segment should be released
nodeID := int64(2)
segmentID := int64(1)
checker.dist.ChannelDistManager.Update(nodeID, utils.CreateTestChannel(collectionID, nodeID, segmentID, "test-insert-channel"))
view := utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{segmentID: 2}, map[int64]*meta.Segment{})
view.TargetVersion = readableVersion - 1
checker.dist.LeaderViewManager.Update(nodeID, view)
checker.dist.SegmentDistManager.Update(nodeID, utils.CreateTestSegment(collectionID, partitionID, segmentID, nodeID, 2, "test-insert-channel"))

tasks := checker.Check(context.TODO())
suite.Len(tasks, 0)

// test less version exist on leader
// test leader's target version update to latest,meet segment doesn't exit in target, segment should be released
view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{})
view.TargetVersion = readableVersion
checker.dist.LeaderViewManager.Update(2, view)
Expand All @@ -359,6 +358,21 @@ func (suite *SegmentCheckerTestSuite) TestSkipReleaseSealedSegments() {
suite.EqualValues(segmentID, action.SegmentID())
suite.EqualValues(nodeID, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)

// test leader with initialTargetVersion, meet segment doesn't exit in target, segment should be released
view = utils.CreateTestLeaderView(nodeID, collectionID, "test-insert-channel", map[int64]int64{1: 3}, map[int64]*meta.Segment{})
view.TargetVersion = initialTargetVersion
checker.dist.LeaderViewManager.Update(2, view)
tasks = checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Len(tasks[0].Actions(), 1)
action, ok = tasks[0].Actions()[0].(*task.SegmentAction)
suite.True(ok)
suite.EqualValues(1, tasks[0].ReplicaID())
suite.Equal(task.ActionTypeReduce, action.Type())
suite.EqualValues(segmentID, action.SegmentID())
suite.EqualValues(nodeID, action.Node())
suite.Equal(tasks[0].Priority(), task.TaskPriorityNormal)
}

func (suite *SegmentCheckerTestSuite) TestReleaseGrowingSegments() {
Expand Down

0 comments on commit a5abfb2

Please sign in to comment.