Skip to content

Commit

Permalink
enhance: Remove the unused compaction logic from shard (milvus-io#33932)
Browse files Browse the repository at this point in the history
1. Remove the `compactTo` field in `SegmentInfo`.
2. Remove the target segment not match and its retry logic in
`SyncManager`.

issue: milvus-io#32809

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Jun 23, 2024
1 parent b77ab76 commit 6c1d815
Show file tree
Hide file tree
Showing 21 changed files with 10 additions and 419 deletions.
12 changes: 0 additions & 12 deletions internal/datanode/metacache/actions.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ func WithLevel(level datapb.SegmentLevel) SegmentFilter {
})
}

func WithCompacted() SegmentFilter {
return SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.compactTo != 0
})
}

func WithNoSyncingTask() SegmentFilter {
return SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.syncingTasks == 0
Expand Down Expand Up @@ -165,12 +159,6 @@ func RollStats(newStats ...*storage.PrimaryKeyStats) SegmentAction {
}
}

func CompactTo(compactTo int64) SegmentAction {
return func(info *SegmentInfo) {
info.compactTo = compactTo
}
}

func StartSyncing(batchSize int64) SegmentAction {
return func(info *SegmentInfo) {
info.syncingRows += batchSize
Expand Down
5 changes: 0 additions & 5 deletions internal/datanode/metacache/actions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,6 @@ func (s *SegmentActionSuite) TestActions() {
action = UpdateNumOfRows(numOfRows)
action(info)
s.Equal(numOfRows, info.NumOfRows())

compactTo := int64(1002)
action = CompactTo(compactTo)
action(info)
s.Equal(compactTo, info.CompactTo())
}

func (s *SegmentActionSuite) TestMergeActions() {
Expand Down
12 changes: 0 additions & 12 deletions internal/datanode/metacache/segment.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,6 @@ import (
"github.com/milvus-io/milvus/internal/storage"
)

const (
// NullSegment means the segment id to discard
// happens when segment compacted to 0 lines and target segment is dropped directly
NullSegment = int64(-1)
)

type SegmentInfo struct {
segmentID int64
partitionID int64
Expand All @@ -40,7 +34,6 @@ type SegmentInfo struct {
bufferRows int64
syncingRows int64
bfs *BloomFilterSet
compactTo int64
level datapb.SegmentLevel
syncingTasks int32
}
Expand Down Expand Up @@ -80,10 +73,6 @@ func (s *SegmentInfo) GetHistory() []*storage.PkStatistics {
return s.bfs.GetHistory()
}

func (s *SegmentInfo) CompactTo() int64 {
return s.compactTo
}

func (s *SegmentInfo) GetBloomFilterSet() *BloomFilterSet {
return s.bfs
}
Expand All @@ -104,7 +93,6 @@ func (s *SegmentInfo) Clone() *SegmentInfo {
bufferRows: s.bufferRows,
syncingRows: s.syncingRows,
bfs: s.bfs,
compactTo: s.compactTo,
level: s.level,
syncingTasks: s.syncingTasks,
}
Expand Down
2 changes: 1 addition & 1 deletion internal/datanode/syncmgr/key_lock_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"github.com/milvus-io/milvus/pkg/util/lock"
)

//go:generate mockery --name=Task --structname=MockTask --output=./ --filename=mock_task.go --with-expecter --inpackage
type Task interface {
SegmentID() int64
CalcTargetSegment() (int64, error)
Checkpoint() *msgpb.MsgPosition
StartPosition() *msgpb.MsgPosition
ChannelName() string
Expand Down
3 changes: 0 additions & 3 deletions internal/datanode/syncmgr/key_lock_dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@ func (t *mockTask) done() {
close(t.ch)
}
func (t *mockTask) CalcTargetSegment() (int64, error) {
return t.targetID, t.err
}
func (t *mockTask) SegmentID() int64 { panic("no implementation") }
func (t *mockTask) Checkpoint() *msgpb.MsgPosition { panic("no implementation") }
func (t *mockTask) StartPosition() *msgpb.MsgPosition { panic("no implementation") }
Expand Down
55 changes: 2 additions & 53 deletions internal/datanode/syncmgr/mock_task.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion internal/datanode/syncmgr/storage_serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
bfs := s.getBfs()
segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(segInfo)
metacache.CompactTo(metacache.NullSegment)(segInfo)
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(segInfo)
}).Return().Once()
Expand Down
1 change: 0 additions & 1 deletion internal/datanode/syncmgr/storage_v2_serializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,6 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
bfs := s.getBfs()
segInfo := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(segInfo)
metacache.CompactTo(metacache.NullSegment)(segInfo)
s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Run(func(action metacache.SegmentAction, filters ...metacache.SegmentFilter) {
action(segInfo)
}).Return().Once()
Expand Down
54 changes: 4 additions & 50 deletions internal/datanode/syncmgr/sync_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"fmt"
"strconv"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
Expand Down Expand Up @@ -109,19 +108,12 @@ func (mgr *syncManager) SyncData(ctx context.Context, task Task, callbacks ...fu
return mgr.safeSubmitTask(task, callbacks...)
}

// safeSubmitTask handles submitting task logic with optimistic target check logic
// when task returns errTargetSegmentNotMatch error
// perform refetch then retry logic
// safeSubmitTask submits task to SyncManager
func (mgr *syncManager) safeSubmitTask(task Task, callbacks ...func(error) error) *conc.Future[struct{}] {
taskKey := fmt.Sprintf("%d-%d", task.SegmentID(), task.Checkpoint().GetTimestamp())
mgr.tasks.Insert(taskKey, task)

key, err := task.CalcTargetSegment()
if err != nil {
task.HandleError(err)
return conc.Go(func() (struct{}, error) { return struct{}{}, err })
}

key := task.SegmentID()
return mgr.submit(key, task, callbacks...)
}

Expand All @@ -130,48 +122,10 @@ func (mgr *syncManager) submit(key int64, task Task, callbacks ...func(error) er
if err == nil {
return nil
}
// unexpected error
if !errors.Is(err, errTargetSegmentNotMatch) {
task.HandleError(err)
return err
}

targetID, err := task.CalcTargetSegment()
// shall not reach, segment meta lost during sync
if err != nil {
task.HandleError(err)
return err
}
if targetID == key {
err = merr.WrapErrServiceInternal("recaluated with same key", fmt.Sprint(targetID))
task.HandleError(err)
return err
}
log.Info("task calculated target segment id",
zap.Int64("targetID", targetID),
zap.Int64("segmentID", task.SegmentID()),
)
return mgr.submit(targetID, task).Err()
task.HandleError(err)
return err
}
callbacks = append([]func(error) error{handler}, callbacks...)
log.Info("sync mgr sumbit task with key", zap.Int64("key", key))
return mgr.Submit(key, task, callbacks...)
}

func (mgr *syncManager) GetEarliestPosition(channel string) (int64, *msgpb.MsgPosition) {
var cp *msgpb.MsgPosition
var segmentID int64
mgr.tasks.Range(func(_ string, task Task) bool {
if task.StartPosition() == nil {
return true
}
if task.ChannelName() == channel {
if cp == nil || task.StartPosition().GetTimestamp() < cp.GetTimestamp() {
cp = task.StartPosition()
segmentID = task.SegmentID()
}
}
return true
})
return segmentID, cp
}
64 changes: 2 additions & 62 deletions internal/datanode/syncmgr/sync_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/cockroachdb/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
Expand Down Expand Up @@ -184,7 +185,6 @@ func (s *SyncManagerSuite) TestCompacted() {
bfs := metacache.NewBloomFilterSet()
seg := metacache.NewSegmentInfo(&datapb.SegmentInfo{}, bfs)
metacache.UpdateNumOfRows(1000)(seg)
metacache.CompactTo(1001)(seg)
s.metacache.EXPECT().GetSegmentByID(s.segmentID).Return(seg, true)
s.metacache.EXPECT().GetSegmentsBy(mock.Anything, mock.Anything).Return([]*metacache.SegmentInfo{seg})
s.metacache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return()
Expand Down Expand Up @@ -261,31 +261,13 @@ func (s *SyncManagerSuite) TestNewSyncManager() {
s.Error(err)
}

func (s *SyncManagerSuite) TestTargetUpdated() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().CalcTargetSegment().Return(1000, nil).Once()
task.EXPECT().CalcTargetSegment().Return(1001, nil).Once()
task.EXPECT().Run().Return(errTargetSegmentNotMatch).Once()
task.EXPECT().Run().Return(nil).Once()

f := manager.SyncData(context.Background(), task)
_, err = f.Await()
s.NoError(err)
}

func (s *SyncManagerSuite) TestUnexpectedError() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().CalcTargetSegment().Return(1000, nil).Once()
task.EXPECT().Run().Return(merr.WrapErrServiceInternal("mocked")).Once()
task.EXPECT().HandleError(mock.Anything)

Expand All @@ -294,56 +276,14 @@ func (s *SyncManagerSuite) TestUnexpectedError() {
s.Error(err)
}

func (s *SyncManagerSuite) TestCalcTargetError() {
s.Run("fail_before_submit", func() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

mockErr := merr.WrapErrServiceInternal("mocked")

task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().CalcTargetSegment().Return(0, mockErr).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
_, err = f.Await()
s.Error(err)
s.ErrorIs(err, mockErr)
})

s.Run("fail_during_rerun", func() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

mockErr := merr.WrapErrServiceInternal("mocked")

task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().CalcTargetSegment().Return(1000, nil).Once()
task.EXPECT().CalcTargetSegment().Return(0, mockErr).Once()
task.EXPECT().Run().Return(errTargetSegmentNotMatch).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
_, err = f.Await()
s.Error(err)
s.ErrorIs(err, mockErr)
})
}

func (s *SyncManagerSuite) TestTargetUpdateSameID() {
manager, err := NewSyncManager(s.chunkManager, s.allocator)
s.NoError(err)

task := NewMockTask(s.T())
task.EXPECT().SegmentID().Return(1000)
task.EXPECT().Checkpoint().Return(&msgpb.MsgPosition{})
task.EXPECT().CalcTargetSegment().Return(1000, nil).Once()
task.EXPECT().CalcTargetSegment().Return(1000, nil).Once()
task.EXPECT().Run().Return(errTargetSegmentNotMatch).Once()
task.EXPECT().Run().Return(errors.New("mock err")).Once()
task.EXPECT().HandleError(mock.Anything)

f := manager.SyncData(context.Background(), task)
Expand Down
Loading

0 comments on commit 6c1d815

Please sign in to comment.