diff --git a/configs/milvus.yaml b/configs/milvus.yaml index 3b42e8a47e29..ea06e87c8cb0 100644 --- a/configs/milvus.yaml +++ b/configs/milvus.yaml @@ -489,6 +489,7 @@ dataCoord: serverMaxRecvSize: 268435456 clientMaxSendSize: 268435456 clientMaxRecvSize: 536870912 + syncSegmentsInterval: 300 dataNode: dataSync: diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index af1ec3a967b4..3dfa5b935718 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -487,18 +487,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact // Apply metrics after successful meta update. metricMutation.commit() } - - nodeID := c.plans[plan.GetPlanID()].dataNodeID - req := &datapb.SyncSegmentsRequest{ - PlanID: plan.PlanID, - } - - log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID)) - if err := c.sessions.SyncSegments(nodeID, req); err != nil { - log.Warn("handleCompactionResult: fail to sync segments with node", - zap.Int64("nodeID", nodeID), zap.Error(err)) - return err - } + // TODO @xiaocai2333: drop compaction plan on datanode log.Info("handleCompactionResult: success to handle merge compaction result") return nil @@ -546,13 +535,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // task.dataNodeID not match with channel // Mark this compaction as failure and skip processing the meta if !c.chManager.Match(task.dataNodeID, task.plan.GetChannel()) { - // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task - // without changing the meta + // TODO @xiaocai2333: drop compaction plan on datanode log.Warn("compaction failed for channel nodeID not match") - if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil { - log.Warn("compaction failed to sync segments with node", zap.Error(err)) - continue - } c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan()) c.setSegmentsCompacting(task.plan, false) c.scheduler.Finish(task.dataNodeID, task.plan) @@ -616,16 +600,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { if nodeUnkonwnPlan, ok := completedPlans[planID]; ok { nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel())) - - // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task - // without changing the meta - log.Info("compaction syncing unknown plan with node") - if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{ - PlanID: planID, - }); err != nil { - log.Warn("compaction failed to sync segments with node", zap.Error(err)) - return err - } + // TODO @xiaocai2333: drop compaction plan on datanode + log.Info("drop unknown plan with node") } } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index a71793f82e4c..c9908c8e9cc2 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -85,7 +85,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() { 4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}}, }, nil) - s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once() { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc) 
@@ -476,7 +475,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { } return nil }).Once() - s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} @@ -518,7 +516,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return( []*SegmentInfo{segment}, &segMetricMutation{}, nil).Once() - s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} @@ -530,7 +527,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { } err := handler.handleMergeCompactionResult(plan, compactionResult) - s.Error(err) + s.NoError(err) }) } @@ -550,7 +547,6 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() { }) s.Run("test complete merge compaction task", func() { - s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() // mock for handleMergeCompactionResult s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) @@ -703,14 +699,6 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { }, } - s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error { - s.EqualValues(nodeID, 222) - s.NotNil(req) - s.Empty(req.GetCompactedFrom()) - s.EqualValues(5, req.GetPlanID()) - return nil - }).Once() - s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil) s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true) s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once() diff --git a/internal/datacoord/meta.go b/internal/datacoord/meta.go index fe61e95538dd..5587786a892c 100644 --- a/internal/datacoord/meta.go +++ b/internal/datacoord/meta.go @@ -1572,3 +1572,10 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo segToUpdate.DroppedAt = uint64(time.Now().UnixNano()) } } + +func (m *meta) ListCollections() []int64 { + m.RLock() + defer m.RUnlock() + + return lo.Keys(m.collections) +} diff --git a/internal/datacoord/server.go b/internal/datacoord/server.go index 86367a93d6be..e30d4df33005 100644 --- a/internal/datacoord/server.go +++ b/internal/datacoord/server.go @@ -127,6 +127,7 @@ type Server struct { compactionTrigger trigger compactionHandler compactionPlanContext compactionViewManager *CompactionViewManager + syncSegmentsScheduler *SyncSegmentsScheduler metricsCacheManager *metricsinfo.MetricsCacheManager @@ -393,6 +394,8 @@ func (s *Server) initDataCoord() error { s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh) s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta) + s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager) + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address)) @@ -712,6 +715,7 @@ func (s *Server) startServerLoop() { go 
s.importScheduler.Start() go s.importChecker.Start() s.garbageCollector.start() + s.syncSegmentsScheduler.Start() } // startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream @@ -1102,6 +1106,7 @@ func (s *Server) Stop() error { s.importScheduler.Close() s.importChecker.Close() + s.syncSegmentsScheduler.Stop() if Params.DataCoordCfg.EnableCompaction.GetAsBool() { s.stopCompactionTrigger() diff --git a/internal/datacoord/sync_segments_scheduler.go b/internal/datacoord/sync_segments_scheduler.go new file mode 100644 index 000000000000..18726df4ddce --- /dev/null +++ b/internal/datacoord/sync_segments_scheduler.go @@ -0,0 +1,150 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package datacoord + +import ( + "sync" + "time" + + "github.com/samber/lo" + "go.uber.org/zap" + + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/pkg/log" + "github.com/milvus-io/milvus/pkg/util/logutil" + "github.com/milvus-io/milvus/pkg/util/typeutil" +) + +type SyncSegmentsScheduler struct { + quit chan struct{} + wg sync.WaitGroup + + meta *meta + channelManager ChannelManager + sessions SessionManager +} + +func newSyncSegmentsScheduler(m *meta, channelManager ChannelManager, sessions SessionManager) *SyncSegmentsScheduler { + return &SyncSegmentsScheduler{ + quit: make(chan struct{}), + wg: sync.WaitGroup{}, + meta: m, + channelManager: channelManager, + sessions: sessions, + } +} + +func (sss *SyncSegmentsScheduler) Start() { + sss.quit = make(chan struct{}) + sss.wg.Add(1) + + go func() { + defer logutil.LogPanic() + ticker := time.NewTicker(Params.DataCoordCfg.SyncSegmentsInterval.GetAsDuration(time.Second)) + defer sss.wg.Done() + + for { + select { + case <-sss.quit: + log.Info("sync segments scheduler quit") + ticker.Stop() + return + case <-ticker.C: + sss.SyncSegmentsForCollections() + } + } + }() + log.Info("SyncSegmentsScheduler started...") +} + +func (sss *SyncSegmentsScheduler) Stop() { + close(sss.quit) + sss.wg.Wait() +} + +func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() { + collIDs := sss.meta.ListCollections() + for _, collID := range collIDs { + collInfo := sss.meta.GetCollection(collID) + if collInfo == nil { + log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID)) + continue + } + pkField, err := typeutil.GetPrimaryFieldSchema(collInfo.Schema) + if err != nil { + log.Warn("get primary field from schema failed", zap.Int64("collectionID", collID), + zap.Error(err)) + continue + } + for _, channelName := range collInfo.VChannelNames { + nodeID, err := sss.channelManager.FindWatcher(channelName) + if err != nil { + log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID), + zap.String("channelName", channelName), zap.Error(err)) + continue + 
} + for _, partitionID := range collInfo.Partitions { + if err := sss.SyncFlushedSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil { + log.Warn("sync segment with channel failed, retry next ticker", + zap.Int64("collectionID", collID), + zap.Int64("partitionID", partitionID), + zap.String("channel", channelName), + zap.Error(err)) + continue + } + } + } + } +} + +func (sss *SyncSegmentsScheduler) SyncFlushedSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error { + log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), + zap.String("channelName", channelName), zap.Int64("nodeID", nodeID)) + segments := sss.meta.SelectSegments(WithCollection(collectionID), WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool { + return info.GetPartitionID() == partitionID && isFlush(info) + })) + req := &datapb.SyncSegmentsRequest{ + ChannelName: channelName, + PartitionId: partitionID, + CollectionId: collectionID, + SegmentInfos: make(map[int64]*datapb.SyncSegmentInfo), + } + + for _, seg := range segments { + req.SegmentInfos[seg.ID] = &datapb.SyncSegmentInfo{ + SegmentId: seg.GetID(), + State: seg.GetState(), + Level: seg.GetLevel(), + NumOfRows: seg.GetNumOfRows(), + } + for _, statsLog := range seg.GetStatslogs() { + if statsLog.GetFieldID() == pkFieldID { + req.SegmentInfos[seg.ID].PkStatsLog = statsLog + break + } + } + } + + if err := sss.sessions.SyncSegments(nodeID, req); err != nil { + log.Warn("fail to sync segments with node", zap.Error(err)) + return err + } + log.Info("sync segments success", zap.Int64s("segments", lo.Map(segments, func(t *SegmentInfo, i int) int64 { + return t.GetID() + }))) + return nil +} diff --git a/internal/datacoord/sync_segments_scheduler_test.go b/internal/datacoord/sync_segments_scheduler_test.go new file mode 100644 index 000000000000..d21d95734060 --- /dev/null +++ b/internal/datacoord/sync_segments_scheduler_test.go @@ -0,0 +1,369 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package datacoord + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/cockroachdb/errors" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + + "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" + "github.com/milvus-io/milvus-proto/go-api/v2/schemapb" + "github.com/milvus-io/milvus/internal/proto/datapb" +) + +type SyncSegmentsSchedulerSuite struct { + suite.Suite + + m *meta + new atomic.Int64 + old atomic.Int64 +} + +func Test_SyncSegmentsSchedulerSuite(t *testing.T) { + suite.Run(t, new(SyncSegmentsSchedulerSuite)) +} + +func (s *SyncSegmentsSchedulerSuite) initParams() { + s.m = &meta{ + RWMutex: sync.RWMutex{}, + collections: map[UniqueID]*collectionInfo{ + 1: { + ID: 1, + Schema: &schemapb.CollectionSchema{ + Name: "coll1", + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + Description: "", + DataType: schemapb.DataType_Int64, + }, + { + FieldID: 101, + Name: "vec", + IsPrimaryKey: false, + Description: "", + DataType: schemapb.DataType_FloatVector, + }, + }, + }, + Partitions: []int64{2, 3}, + VChannelNames: []string{"channel1", "channel2"}, + }, + 2: nil, + }, + segments: &SegmentsInfo{ + collSegments: map[UniqueID]*CollectionSegments{ + 1: { + segments: map[UniqueID]*SegmentInfo{ + 5: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 5, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "channel1", + NumOfRows: 3000, + State: commonpb.SegmentState_Dropped, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 1, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 2, + }, + }, + }, + }, + }, + }, + 6: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 6, + CollectionID: 1, + PartitionID: 3, + InsertChannel: "channel1", + NumOfRows: 3000, + State: commonpb.SegmentState_Dropped, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 3, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 4, + }, + }, + }, + }, + }, + }, + 9: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 9, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "channel1", + NumOfRows: 3000, + State: commonpb.SegmentState_Flushed, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 9, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 10, + }, + }, + }, + }, + CompactionFrom: []int64{5}, + }, + }, + 10: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 10, + CollectionID: 1, + PartitionID: 3, + InsertChannel: "channel1", + NumOfRows: 3000, + State: commonpb.SegmentState_Flushed, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 7, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 8, + }, + }, + }, + }, + CompactionFrom: []int64{6}, + }, + }, + 7: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 7, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "channel2", + NumOfRows: 3000, + State: commonpb.SegmentState_Dropped, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 5, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 6, + }, + }, + }, + }, + }, + }, + 8: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 8, + CollectionID: 1, + PartitionID: 3, + InsertChannel: "channel2", + NumOfRows: 3000, + State: commonpb.SegmentState_Dropped, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: 
[]*datapb.Binlog{ + { + LogID: 7, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 8, + }, + }, + }, + }, + }, + }, + 11: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 11, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "channel2", + NumOfRows: 3000, + State: commonpb.SegmentState_Flushed, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 5, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 6, + }, + }, + }, + }, + CompactionFrom: []int64{7}, + }, + }, + 12: { + SegmentInfo: &datapb.SegmentInfo{ + ID: 12, + CollectionID: 1, + PartitionID: 3, + InsertChannel: "channel2", + NumOfRows: 3000, + State: commonpb.SegmentState_Flushed, + Statslogs: []*datapb.FieldBinlog{ + { + FieldID: 100, + Binlogs: []*datapb.Binlog{ + { + LogID: 7, + }, + }, + }, + { + FieldID: 101, + Binlogs: []*datapb.Binlog{ + { + LogID: 8, + }, + }, + }, + }, + CompactionFrom: []int64{8}, + }, + }, + }, + }, + }, + }, + } +} + +func (s *SyncSegmentsSchedulerSuite) SetupTest() { + s.initParams() +} + +func (s *SyncSegmentsSchedulerSuite) Test_newSyncSegmentsScheduler() { + cm := NewMockChannelManager(s.T()) + cm.EXPECT().FindWatcher(mock.Anything).Return(100, nil) + + sm := NewMockSessionManager(s.T()) + sm.EXPECT().SyncSegments(mock.Anything, mock.Anything).RunAndReturn(func(i int64, request *datapb.SyncSegmentsRequest) error { + for _, seg := range request.GetSegmentInfos() { + if seg.GetState() == commonpb.SegmentState_Flushed { + s.new.Add(1) + } + if seg.GetState() == commonpb.SegmentState_Dropped { + s.old.Add(1) + } + } + return nil + }) + + Params.DataCoordCfg.SyncSegmentsInterval.SwapTempValue("1") + defer Params.DataCoordCfg.SyncSegmentsInterval.SwapTempValue("600") + sss := newSyncSegmentsScheduler(s.m, cm, sm) + sss.Start() + + // 2 channels, 2 partitions, 2 segments + // no longer sync dropped segments + for s.new.Load() < 4 { + } + sss.Stop() +} + +func (s *SyncSegmentsSchedulerSuite) Test_SyncSegmentsFail() { + cm := NewMockChannelManager(s.T()) + sm := NewMockSessionManager(s.T()) + + sss := newSyncSegmentsScheduler(s.m, cm, sm) + + s.Run("pk not found", func() { + sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = false + sss.SyncSegmentsForCollections() + sss.meta.collections[1].Schema.Fields[0].IsPrimaryKey = true + }) + + s.Run("find watcher failed", func() { + cm.EXPECT().FindWatcher(mock.Anything).Return(0, errors.New("mock error")).Twice() + sss.SyncSegmentsForCollections() + }) + + s.Run("sync segment failed", func() { + cm.EXPECT().FindWatcher(mock.Anything).Return(100, nil) + sm.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")) + sss.SyncSegmentsForCollections() + }) +} diff --git a/internal/datanode/compaction/mock_compactor.go b/internal/datanode/compaction/mock_compactor.go index 99dccea0aa54..19a83bf2e1b9 100644 --- a/internal/datanode/compaction/mock_compactor.go +++ b/internal/datanode/compaction/mock_compactor.go @@ -228,38 +228,6 @@ func (_c *MockCompactor_GetPlanID_Call) RunAndReturn(run func() int64) *MockComp return _c } -// InjectDone provides a mock function with given fields: -func (_m *MockCompactor) InjectDone() { - _m.Called() -} - -// MockCompactor_InjectDone_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'InjectDone' -type MockCompactor_InjectDone_Call struct { - *mock.Call -} - -// InjectDone is a helper method to define mock.On call -func (_e *MockCompactor_Expecter) InjectDone() 
*MockCompactor_InjectDone_Call { - return &MockCompactor_InjectDone_Call{Call: _e.mock.On("InjectDone")} -} - -func (_c *MockCompactor_InjectDone_Call) Run(run func()) *MockCompactor_InjectDone_Call { - _c.Call.Run(func(args mock.Arguments) { - run() - }) - return _c -} - -func (_c *MockCompactor_InjectDone_Call) Return() *MockCompactor_InjectDone_Call { - _c.Call.Return() - return _c -} - -func (_c *MockCompactor_InjectDone_Call) RunAndReturn(run func()) *MockCompactor_InjectDone_Call { - _c.Call.Return(run) - return _c -} - // Stop provides a mock function with given fields: func (_m *MockCompactor) Stop() { _m.Called() diff --git a/internal/datanode/compaction_executor_test.go b/internal/datanode/compaction_executor_test.go index fd6fba2e6f0a..5fd21070e280 100644 --- a/internal/datanode/compaction_executor_test.go +++ b/internal/datanode/compaction_executor_test.go @@ -80,7 +80,6 @@ func TestCompactionExecutor(t *testing.T) { ex.executeWithState(mockC) <-signal } else { - mockC.EXPECT().InjectDone().Return().Maybe() mockC.EXPECT().Compact().RunAndReturn( func() (*datapb.CompactionPlanResult, error) { signal <- struct{}{} diff --git a/internal/datanode/data_node.go b/internal/datanode/data_node.go index a6ab491af68d..295918eedfbe 100644 --- a/internal/datanode/data_node.go +++ b/internal/datanode/data_node.go @@ -297,6 +297,7 @@ func (node *DataNode) Init() error { } else { node.eventManager = NewEventManager() } + log.Info("init datanode done", zap.String("Address", node.address)) }) return initError diff --git a/internal/datanode/metacache/meta_cache.go b/internal/datanode/metacache/meta_cache.go index cd5f635f48de..c2e45f986da0 100644 --- a/internal/datanode/metacache/meta_cache.go +++ b/internal/datanode/metacache/meta_cache.go @@ -50,6 +50,10 @@ type MetaCache interface { GetSegmentIDsBy(filters ...SegmentFilter) []int64 // PredictSegments returns the segment ids which may contain the provided primary key. PredictSegments(pk storage.PrimaryKey, filters ...SegmentFilter) ([]int64, bool) + // DetectMissingSegments returns the segment ids which is missing in datanode. + DetectMissingSegments(segments map[int64]struct{}) []int64 + // UpdateSegmentView updates the segments BF from datacoord view. 
+ UpdateSegmentView(partitionID int64, newSegments []*datapb.SyncSegmentInfo, newSegmentsBF []*BloomFilterSet, allSegments map[int64]struct{}) } var _ MetaCache = (*metaCacheImpl)(nil) @@ -222,3 +226,55 @@ func (c *metaCacheImpl) rangeWithFilter(fn func(id int64, info *SegmentInfo), fi } } } + +func (c *metaCacheImpl) DetectMissingSegments(segments map[int64]struct{}) []int64 { + c.mu.RLock() + defer c.mu.RUnlock() + + missingSegments := make([]int64, 0) + + for segID := range segments { + if _, ok := c.segmentInfos[segID]; !ok { + missingSegments = append(missingSegments, segID) + } + } + + return missingSegments +} + +func (c *metaCacheImpl) UpdateSegmentView(partitionID int64, + newSegments []*datapb.SyncSegmentInfo, + newSegmentsBF []*BloomFilterSet, + allSegments map[int64]struct{}, +) { + c.mu.Lock() + defer c.mu.Unlock() + + for i, info := range newSegments { + // check again + if _, ok := c.segmentInfos[info.GetSegmentId()]; !ok { + segInfo := &SegmentInfo{ + segmentID: info.GetSegmentId(), + partitionID: partitionID, + state: info.GetState(), + level: info.GetLevel(), + flushedRows: info.GetNumOfRows(), + startPosRecorded: true, + bfs: newSegmentsBF[i], + } + c.segmentInfos[info.GetSegmentId()] = segInfo + log.Info("metacache does not have segment, add it", zap.Int64("segmentID", info.GetSegmentId())) + } + } + + for segID, info := range c.segmentInfos { + if info.partitionID != partitionID || + (info.state != commonpb.SegmentState_Flushed && info.state != commonpb.SegmentState_Flushing) { + continue + } + if _, ok := allSegments[segID]; !ok { + log.Info("remove dropped segment", zap.Int64("segmentID", segID)) + delete(c.segmentInfos, segID) + } + } +} diff --git a/internal/datanode/metacache/meta_cache_test.go b/internal/datanode/metacache/meta_cache_test.go index 8b5b7aa5e913..cdb5e0614d56 100644 --- a/internal/datanode/metacache/meta_cache_test.go +++ b/internal/datanode/metacache/meta_cache_test.go @@ -189,6 +189,50 @@ func (s *MetaCacheSuite) TestPredictSegments() { s.EqualValues(1, predict[0]) } +func (s *MetaCacheSuite) Test_DetectMissingSegments() { + segments := map[int64]struct{}{ + 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 9: {}, 10: {}, + } + + missingSegments := s.cache.DetectMissingSegments(segments) + s.ElementsMatch(missingSegments, []int64{9, 10}) +} + +func (s *MetaCacheSuite) Test_UpdateSegmentView() { + addSegments := []*datapb.SyncSegmentInfo{ + { + SegmentId: 100, + PkStatsLog: nil, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + NumOfRows: 10240, + }, + } + addSegmentsBF := []*BloomFilterSet{ + NewBloomFilterSet(), + } + segments := map[int64]struct{}{ + 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 100: {}, + } + + s.cache.UpdateSegmentView(1, addSegments, addSegmentsBF, segments) + + addSegments = []*datapb.SyncSegmentInfo{ + { + SegmentId: 101, + PkStatsLog: nil, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + NumOfRows: 10240, + }, + } + + segments = map[int64]struct{}{ + 1: {}, 2: {}, 3: {}, 4: {}, 5: {}, 6: {}, 7: {}, 8: {}, 101: {}, + } + s.cache.UpdateSegmentView(1, addSegments, addSegmentsBF, segments) +} + func TestMetaCacheSuite(t *testing.T) { suite.Run(t, new(MetaCacheSuite)) } diff --git a/internal/datanode/metacache/mock_meta_cache.go b/internal/datanode/metacache/mock_meta_cache.go index 8c357eac95f9..0bd69c61766d 100644 --- a/internal/datanode/metacache/mock_meta_cache.go +++ b/internal/datanode/metacache/mock_meta_cache.go @@ -1,4 +1,4 @@ -// Code generated by 
mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package metacache @@ -42,9 +42,9 @@ type MockMetaCache_AddSegment_Call struct { } // AddSegment is a helper method to define mock.On call -// - segInfo *datapb.SegmentInfo -// - factory PkStatsFactory -// - actions ...SegmentAction +// - segInfo *datapb.SegmentInfo +// - factory PkStatsFactory +// - actions ...SegmentAction func (_e *MockMetaCache_Expecter) AddSegment(segInfo interface{}, factory interface{}, actions ...interface{}) *MockMetaCache_AddSegment_Call { return &MockMetaCache_AddSegment_Call{Call: _e.mock.On("AddSegment", append([]interface{}{segInfo, factory}, actions...)...)} @@ -114,6 +114,50 @@ func (_c *MockMetaCache_Collection_Call) RunAndReturn(run func() int64) *MockMet return _c } +// DetectMissingSegments provides a mock function with given fields: segments +func (_m *MockMetaCache) DetectMissingSegments(segments map[int64]struct{}) []int64 { + ret := _m.Called(segments) + + var r0 []int64 + if rf, ok := ret.Get(0).(func(map[int64]struct{}) []int64); ok { + r0 = rf(segments) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]int64) + } + } + + return r0 +} + +// MockMetaCache_DetectMissingSegments_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'DetectMissingSegments' +type MockMetaCache_DetectMissingSegments_Call struct { + *mock.Call +} + +// DetectMissingSegments is a helper method to define mock.On call +// - segments map[int64]struct{} +func (_e *MockMetaCache_Expecter) DetectMissingSegments(segments interface{}) *MockMetaCache_DetectMissingSegments_Call { + return &MockMetaCache_DetectMissingSegments_Call{Call: _e.mock.On("DetectMissingSegments", segments)} +} + +func (_c *MockMetaCache_DetectMissingSegments_Call) Run(run func(segments map[int64]struct{})) *MockMetaCache_DetectMissingSegments_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(map[int64]struct{})) + }) + return _c +} + +func (_c *MockMetaCache_DetectMissingSegments_Call) Return(_a0 []int64) *MockMetaCache_DetectMissingSegments_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockMetaCache_DetectMissingSegments_Call) RunAndReturn(run func(map[int64]struct{}) []int64) *MockMetaCache_DetectMissingSegments_Call { + _c.Call.Return(run) + return _c +} + // GetSegmentByID provides a mock function with given fields: id, filters func (_m *MockMetaCache) GetSegmentByID(id int64, filters ...SegmentFilter) (*SegmentInfo, bool) { _va := make([]interface{}, len(filters)) @@ -153,8 +197,8 @@ type MockMetaCache_GetSegmentByID_Call struct { } // GetSegmentByID is a helper method to define mock.On call -// - id int64 -// - filters ...SegmentFilter +// - id int64 +// - filters ...SegmentFilter func (_e *MockMetaCache_Expecter) GetSegmentByID(id interface{}, filters ...interface{}) *MockMetaCache_GetSegmentByID_Call { return &MockMetaCache_GetSegmentByID_Call{Call: _e.mock.On("GetSegmentByID", append([]interface{}{id}, filters...)...)} @@ -211,7 +255,7 @@ type MockMetaCache_GetSegmentIDsBy_Call struct { } // GetSegmentIDsBy is a helper method to define mock.On call -// - filters ...SegmentFilter +// - filters ...SegmentFilter func (_e *MockMetaCache_Expecter) GetSegmentIDsBy(filters ...interface{}) *MockMetaCache_GetSegmentIDsBy_Call { return &MockMetaCache_GetSegmentIDsBy_Call{Call: _e.mock.On("GetSegmentIDsBy", append([]interface{}{}, filters...)...)} @@ -268,7 +312,7 @@ type MockMetaCache_GetSegmentsBy_Call struct { } // GetSegmentsBy is a helper method to define 
mock.On call -// - filters ...SegmentFilter +// - filters ...SegmentFilter func (_e *MockMetaCache_Expecter) GetSegmentsBy(filters ...interface{}) *MockMetaCache_GetSegmentsBy_Call { return &MockMetaCache_GetSegmentsBy_Call{Call: _e.mock.On("GetSegmentsBy", append([]interface{}{}, filters...)...)} @@ -336,8 +380,8 @@ type MockMetaCache_PredictSegments_Call struct { } // PredictSegments is a helper method to define mock.On call -// - pk storage.PrimaryKey -// - filters ...SegmentFilter +// - pk storage.PrimaryKey +// - filters ...SegmentFilter func (_e *MockMetaCache_Expecter) PredictSegments(pk interface{}, filters ...interface{}) *MockMetaCache_PredictSegments_Call { return &MockMetaCache_PredictSegments_Call{Call: _e.mock.On("PredictSegments", append([]interface{}{pk}, filters...)...)} @@ -394,7 +438,7 @@ type MockMetaCache_RemoveSegments_Call struct { } // RemoveSegments is a helper method to define mock.On call -// - filters ...SegmentFilter +// - filters ...SegmentFilter func (_e *MockMetaCache_Expecter) RemoveSegments(filters ...interface{}) *MockMetaCache_RemoveSegments_Call { return &MockMetaCache_RemoveSegments_Call{Call: _e.mock.On("RemoveSegments", append([]interface{}{}, filters...)...)} @@ -466,6 +510,42 @@ func (_c *MockMetaCache_Schema_Call) RunAndReturn(run func() *schemapb.Collectio return _c } +// UpdateSegmentView provides a mock function with given fields: partitionID, newSegments, newSegmentsBF, allSegments +func (_m *MockMetaCache) UpdateSegmentView(partitionID int64, newSegments []*datapb.SyncSegmentInfo, newSegmentsBF []*BloomFilterSet, allSegments map[int64]struct{}) { + _m.Called(partitionID, newSegments, newSegmentsBF, allSegments) +} + +// MockMetaCache_UpdateSegmentView_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'UpdateSegmentView' +type MockMetaCache_UpdateSegmentView_Call struct { + *mock.Call +} + +// UpdateSegmentView is a helper method to define mock.On call +// - partitionID int64 +// - newSegments []*datapb.SyncSegmentInfo +// - newSegmentsBF []*BloomFilterSet +// - allSegments map[int64]struct{} +func (_e *MockMetaCache_Expecter) UpdateSegmentView(partitionID interface{}, newSegments interface{}, newSegmentsBF interface{}, allSegments interface{}) *MockMetaCache_UpdateSegmentView_Call { + return &MockMetaCache_UpdateSegmentView_Call{Call: _e.mock.On("UpdateSegmentView", partitionID, newSegments, newSegmentsBF, allSegments)} +} + +func (_c *MockMetaCache_UpdateSegmentView_Call) Run(run func(partitionID int64, newSegments []*datapb.SyncSegmentInfo, newSegmentsBF []*BloomFilterSet, allSegments map[int64]struct{})) *MockMetaCache_UpdateSegmentView_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(int64), args[1].([]*datapb.SyncSegmentInfo), args[2].([]*BloomFilterSet), args[3].(map[int64]struct{})) + }) + return _c +} + +func (_c *MockMetaCache_UpdateSegmentView_Call) Return() *MockMetaCache_UpdateSegmentView_Call { + _c.Call.Return() + return _c +} + +func (_c *MockMetaCache_UpdateSegmentView_Call) RunAndReturn(run func(int64, []*datapb.SyncSegmentInfo, []*BloomFilterSet, map[int64]struct{})) *MockMetaCache_UpdateSegmentView_Call { + _c.Call.Return(run) + return _c +} + // UpdateSegments provides a mock function with given fields: action, filters func (_m *MockMetaCache) UpdateSegments(action SegmentAction, filters ...SegmentFilter) { _va := make([]interface{}, len(filters)) @@ -484,8 +564,8 @@ type MockMetaCache_UpdateSegments_Call struct { } // UpdateSegments is a helper method to define 
mock.On call -// - action SegmentAction -// - filters ...SegmentFilter +// - action SegmentAction +// - filters ...SegmentFilter func (_e *MockMetaCache_Expecter) UpdateSegments(action interface{}, filters ...interface{}) *MockMetaCache_UpdateSegments_Call { return &MockMetaCache_UpdateSegments_Call{Call: _e.mock.On("UpdateSegments", append([]interface{}{action}, filters...)...)} diff --git a/internal/datanode/services.go b/internal/datanode/services.go index f00b01df6a1e..7af4dc931dda 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -23,6 +23,7 @@ import ( "context" "fmt" + "github.com/samber/lo" "go.uber.org/zap" "github.com/milvus-io/milvus-proto/go-api/v2/commonpb" @@ -30,13 +31,18 @@ import ( "github.com/milvus-io/milvus/internal/datanode/compaction" "github.com/milvus-io/milvus/internal/datanode/importv2" "github.com/milvus-io/milvus/internal/datanode/io" + "github.com/milvus-io/milvus/internal/datanode/metacache" + "github.com/milvus-io/milvus/internal/datanode/util" + "github.com/milvus-io/milvus/internal/metastore/kv/binlog" "github.com/milvus-io/milvus/internal/proto/datapb" "github.com/milvus-io/milvus/internal/proto/internalpb" + "github.com/milvus-io/milvus/internal/storage" "github.com/milvus-io/milvus/internal/util/importutilv2" "github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/metrics" "github.com/milvus-io/milvus/pkg/tracer" + "github.com/milvus-io/milvus/pkg/util/conc" "github.com/milvus-io/milvus/pkg/util/merr" "github.com/milvus-io/milvus/pkg/util/metricsinfo" "github.com/milvus-io/milvus/pkg/util/tsoutil" @@ -262,6 +268,9 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments log := log.Ctx(ctx).With( zap.Int64("planID", req.GetPlanID()), zap.Int64("nodeID", node.GetNodeID()), + zap.Int64("collectionID", req.GetCollectionId()), + zap.Int64("partitionID", req.GetPartitionId()), + zap.String("channel", req.GetChannelName()), ) log.Info("DataNode receives SyncSegments") @@ -271,8 +280,62 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments return merr.Status(err), nil } - // TODO: sheep, add a new DropCompaction interface, deprecate SyncSegments - node.compactionExecutor.removeTask(req.GetPlanID()) + if len(req.GetSegmentInfos()) <= 0 { + log.Info("sync segments is empty, skip it") + return merr.Success(), nil + } + + ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) + if !ok { + node.compactionExecutor.discardPlan(req.GetChannelName()) + err := merr.WrapErrChannelNotFound(req.GetChannelName()) + log.Warn("failed to get flow graph service", zap.Error(err)) + return merr.Status(err), nil + } + + allSegments := make(map[int64]struct{}) + for segID := range req.GetSegmentInfos() { + allSegments[segID] = struct{}{} + } + + missingSegments := ds.metacache.DetectMissingSegments(allSegments) + + newSegments := make([]*datapb.SyncSegmentInfo, 0, len(missingSegments)) + futures := make([]*conc.Future[any], 0, len(missingSegments)) + + for _, segID := range missingSegments { + segID := segID + newSeg := req.GetSegmentInfos()[segID] + newSegments = append(newSegments, newSeg) + future := io.GetOrCreateStatsPool().Submit(func() (any, error) { + var val *metacache.BloomFilterSet + var err error + err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()}) + if err != nil { + log.Warn("failed to 
DecompressBinLog", zap.Error(err)) + return val, err + } + pks, err := util.LoadStats(ctx, node.chunkManager, ds.metacache.Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()}) + if err != nil { + log.Warn("failed to load segment stats log", zap.Error(err)) + return val, err + } + val = metacache.NewBloomFilterSet(pks...) + return val, nil + }) + futures = append(futures, future) + } + + err := conc.AwaitAll(futures...) + if err != nil { + return merr.Status(err), nil + } + + newSegmentsBF := lo.Map(futures, func(future *conc.Future[any], _ int) *metacache.BloomFilterSet { + return future.Value().(*metacache.BloomFilterSet) + }) + + ds.metacache.UpdateSegmentView(req.GetPartitionId(), newSegments, newSegmentsBF, allSegments) return merr.Success(), nil } diff --git a/internal/datanode/services_test.go b/internal/datanode/services_test.go index f9b1f46a25f9..25ac07596bc9 100644 --- a/internal/datanode/services_test.go +++ b/internal/datanode/services_test.go @@ -492,3 +492,163 @@ func (s *DataNodeServicesSuite) TestQuerySlot() { s.NoError(merr.Error(resp.GetStatus())) }) } + +func (s *DataNodeServicesSuite) TestSyncSegments() { + s.Run("node not healthy", func() { + s.SetupTest() + s.node.UpdateStateCode(commonpb.StateCode_Abnormal) + + ctx := context.Background() + status, err := s.node.SyncSegments(ctx, nil) + s.NoError(err) + s.False(merr.Ok(status)) + s.ErrorIs(merr.Error(status), merr.ErrServiceNotReady) + }) + + s.Run("dataSyncService not exist", func() { + s.SetupTest() + ctx := context.Background() + req := &datapb.SyncSegmentsRequest{ + ChannelName: "channel1", + PartitionId: 2, + CollectionId: 1, + SegmentInfos: map[int64]*datapb.SyncSegmentInfo{ + 102: { + SegmentId: 102, + PkStatsLog: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: nil, + }, + State: commonpb.SegmentState_Flushed, + Level: 2, + NumOfRows: 1024, + }, + }, + } + + status, err := s.node.SyncSegments(ctx, req) + s.NoError(err) + s.False(merr.Ok(status)) + }) + + s.Run("normal case", func() { + s.SetupTest() + cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{ + Schema: &schemapb.CollectionSchema{ + Fields: []*schemapb.FieldSchema{ + { + FieldID: 100, + Name: "pk", + IsPrimaryKey: true, + Description: "", + DataType: schemapb.DataType_Int64, + }, + }, + }, + Vchan: &datapb.VchannelInfo{}, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 100, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Growing, + Level: datapb.SegmentLevel_L0, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 101, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 102, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + }, func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + cache.AddSegment(&datapb.SegmentInfo{ + ID: 103, + CollectionID: 1, + PartitionID: 2, + InsertChannel: "111", + NumOfRows: 0, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + }, 
func(*datapb.SegmentInfo) *metacache.BloomFilterSet { + return metacache.NewBloomFilterSet() + }) + mockFlowgraphManager := NewMockFlowgraphManager(s.T()) + mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{ + metacache: cache, + }, true) + s.node.flowgraphManager = mockFlowgraphManager + ctx := context.Background() + req := &datapb.SyncSegmentsRequest{ + ChannelName: "channel1", + PartitionId: 2, + CollectionId: 1, + SegmentInfos: map[int64]*datapb.SyncSegmentInfo{ + 103: { + SegmentId: 103, + PkStatsLog: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: nil, + }, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L0, + NumOfRows: 1024, + }, + 104: { + SegmentId: 104, + PkStatsLog: &datapb.FieldBinlog{ + FieldID: 100, + Binlogs: nil, + }, + State: commonpb.SegmentState_Flushed, + Level: datapb.SegmentLevel_L1, + NumOfRows: 1024, + }, + }, + } + + status, err := s.node.SyncSegments(ctx, req) + s.NoError(err) + s.True(merr.Ok(status)) + + info, exist := cache.GetSegmentByID(100) + s.True(exist) + s.NotNil(info) + + info, exist = cache.GetSegmentByID(101) + s.False(exist) + s.Nil(info) + + info, exist = cache.GetSegmentByID(102) + s.False(exist) + s.Nil(info) + + info, exist = cache.GetSegmentByID(103) + s.True(exist) + s.NotNil(info) + + info, exist = cache.GetSegmentByID(104) + s.True(exist) + s.NotNil(info) + }) +} diff --git a/internal/datanode/syncmgr/mock_sync_manager.go b/internal/datanode/syncmgr/mock_sync_manager.go index 259d09b2da54..ee19d324d394 100644 --- a/internal/datanode/syncmgr/mock_sync_manager.go +++ b/internal/datanode/syncmgr/mock_sync_manager.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. package syncmgr @@ -57,7 +57,7 @@ type MockSyncManager_GetEarliestPosition_Call struct { } // GetEarliestPosition is a helper method to define mock.On call -// - channel string +// - channel string func (_e *MockSyncManager_Expecter) GetEarliestPosition(channel interface{}) *MockSyncManager_GetEarliestPosition_Call { return &MockSyncManager_GetEarliestPosition_Call{Call: _e.mock.On("GetEarliestPosition", channel)} } @@ -101,8 +101,8 @@ type MockSyncManager_SyncData_Call struct { } // SyncData is a helper method to define mock.On call -// - ctx context.Context -// - task Task +// - ctx context.Context +// - task Task func (_e *MockSyncManager_Expecter) SyncData(ctx interface{}, task interface{}) *MockSyncManager_SyncData_Call { return &MockSyncManager_SyncData_Call{Call: _e.mock.On("SyncData", ctx, task)} } diff --git a/internal/datanode/syncmgr/mock_task.go b/internal/datanode/syncmgr/mock_task.go index 709ba8199575..130ee752ce07 100644 --- a/internal/datanode/syncmgr/mock_task.go +++ b/internal/datanode/syncmgr/mock_task.go @@ -1,4 +1,4 @@ -// Code generated by mockery v2.30.1. DO NOT EDIT. +// Code generated by mockery v2.32.4. DO NOT EDIT. 
package syncmgr @@ -115,7 +115,7 @@ type MockTask_HandleError_Call struct { } // HandleError is a helper method to define mock.On call -// - _a0 error +// - _a0 error func (_e *MockTask_Expecter) HandleError(_a0 interface{}) *MockTask_HandleError_Call { return &MockTask_HandleError_Call{Call: _e.mock.On("HandleError", _a0)} } diff --git a/internal/proto/data_coord.proto b/internal/proto/data_coord.proto index 0ec352e3b6c9..1a40dc404f13 100644 --- a/internal/proto/data_coord.proto +++ b/internal/proto/data_coord.proto @@ -496,15 +496,29 @@ message CompactionStateRequest { common.MsgBase base = 1; } +message SyncSegmentInfo { + int64 segment_id = 1; + FieldBinlog pk_stats_log = 2; + common.SegmentState state = 3; + SegmentLevel level = 4; + int64 num_of_rows = 5; +} message SyncSegmentsRequest { + // Deprecated, after v2.4.3 int64 planID = 1; + // Deprecated, after v2.4.3 int64 compacted_to = 2; + // Deprecated, after v2.4.3 int64 num_of_rows = 3; + // Deprecated, after v2.4.3 repeated int64 compacted_from = 4; + // Deprecated, after v2.4.3 repeated FieldBinlog stats_logs = 5; string channel_name = 6; int64 partition_id = 7; int64 collection_id = 8; + map<int64, SyncSegmentInfo> segment_infos = 9; } message CompactionSegmentBinlogs { diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 9a842867ca10..3208dec4c883 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -2806,6 +2806,7 @@ type dataCoordConfig struct { SingleCompactionDeltalogMaxNum ParamItem `refreshable:"true"` GlobalCompactionInterval ParamItem `refreshable:"false"` ChannelCheckpointMaxLag ParamItem `refreshable:"true"` + SyncSegmentsInterval ParamItem `refreshable:"false"` // LevelZero Segment EnableLevelZeroSegment ParamItem `refreshable:"false"` @@ -3146,6 +3147,14 @@ During compaction, the size of segment # of rows is able to exceed segment max # } p.ChannelCheckpointMaxLag.Init(base.mgr) + p.SyncSegmentsInterval = ParamItem{ + Key: "dataCoord.sync.interval", + Version: "2.4.3", + Doc: "The time interval for regularly syncing segments", + DefaultValue: "600", // 10 * 60 seconds + } + p.SyncSegmentsInterval.Init(base.mgr) + // LevelZeroCompaction p.EnableLevelZeroSegment = ParamItem{ Key: "dataCoord.segment.enableLevelZero",
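
Note on the new interval setting: SyncSegmentsInterval is read with GetAsDuration(time.Second), so the value is interpreted in seconds, and it is registered as refreshable:"false", so a change only takes effect after a DataCoord restart. The configs/milvus.yaml bundled in this patch sets 300 while the paramtable default is 600. Below is a minimal sketch of overriding it in a deployment's milvus.yaml, mirroring the key added by this patch; how this YAML path maps onto the registered paramtable key dataCoord.sync.interval is an assumption to verify against your build.

    dataCoord:
      # interval, in seconds, at which DataCoord pushes its flushed-segment view to DataNodes
      syncSegmentsInterval: 300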