Skip to content

Commit

Permalink
fix: Update segment's version in leader task
Browse files Browse the repository at this point in the history
Signed-off-by: Wei Liu <wei.liu@zilliz.com>
  • Loading branch information
weiliu1031 committed Mar 27, 2024
1 parent 5d75249 commit c70ef02
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 12 deletions.
14 changes: 9 additions & 5 deletions internal/querycoordv2/checkers/leader_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,14 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
continue
}

// when segment's version in leader view doesn't match segment's version in dist
// which means leader view store wrong segment location in leader view, then we should update segment location and segment's version
version, ok := leaderView.Segments[s.GetID()]
if !ok || version.GetVersion() < s.Version {
if !ok || version.GetVersion() != s.Version {
log.RatedDebug(10, "leader checker append a segment to set",
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), s.Version)
action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), time.Now().UnixNano())
t := task.NewLeaderTask(
ctx,
params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
Expand All @@ -145,8 +147,9 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
leaderView.ID,
action,
)
// index task shall have lower or equal priority than balance task
t.SetPriority(task.TaskPriorityHigh)

// leader task shouldn't replace executing segment task
t.SetPriority(task.TaskPriorityLow)
t.SetReason("add segment to leader view")
ret = append(ret, t)
}
Expand Down Expand Up @@ -190,7 +193,8 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me
action,
)

t.SetPriority(task.TaskPriorityHigh)
// leader task shouldn't replace executing segment task
t.SetPriority(task.TaskPriorityLow)
t.SetReason("remove segment from leader view")
ret = append(ret, t)
}
Expand Down
36 changes: 29 additions & 7 deletions internal/querycoordv2/checkers/leader_checker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
Expand Down Expand Up @@ -105,6 +106,8 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
// before target ready, should skip check collection
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 0)

// test leader view lack of segments
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
loadVersion := time.Now().UnixMilli()
Expand All @@ -121,8 +124,27 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), loadVersion)
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// test segment's version in leader view doesn't match segment's version in dist

observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
view.Segments[1] = &querypb.SegmentDist{
NodeID: 0,
Version: time.Now().UnixMilli() - 1,
}
observer.dist.LeaderViewManager.Update(2, view)

tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// test skip sync l0 segment
segments = []*datapb.SegmentInfo{
Expand Down Expand Up @@ -187,7 +209,7 @@ func (suite *LeaderCheckerTestSuite) TestActivation() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}

func (suite *LeaderCheckerTestSuite) TestStoppingNode() {
Expand Down Expand Up @@ -266,7 +288,7 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}

func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
Expand Down Expand Up @@ -311,7 +333,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}

func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
Expand Down Expand Up @@ -346,7 +368,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)

// skip sync l0 segments
segments := []*datapb.SegmentInfo{
Expand Down Expand Up @@ -407,7 +429,7 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce)
suite.Equal(tasks[0].Actions()[0].Node(), int64(2))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}

func TestLeaderCheckerSuite(t *testing.T) {
Expand Down
3 changes: 3 additions & 0 deletions internal/querynodev2/segments/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1291,6 +1291,9 @@ func (loader *segmentLoader) LoadDeltaLogs(ctx context.Context, segment Segment,
return err
}

// try to update segment version after load delta logs
loader.manager.Segment.UpdateBy(IncreaseVersion(segment.Version()), WithType(SegmentTypeSealed), WithID(segment.ID()))

log.Info("load delta logs done", zap.Int64("deleteCount", deltaData.RowCount))
return nil
}
Expand Down

0 comments on commit c70ef02

Please sign in to comment.