enhance: Periodically synchronize segments to datanode watcher #33420

Merged
12 commits merged on May 30, 2024
1 change: 1 addition & 0 deletions configs/milvus.yaml
@@ -482,6 +482,7 @@ dataCoord:
serverMaxRecvSize: 268435456
clientMaxSendSize: 268435456
clientMaxRecvSize: 536870912
syncSegmentsInterval: 300

dataNode:
dataSync:
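The new syncSegmentsInterval value is expressed in seconds (300 = five minutes) and drives the ticker of the SyncSegmentsScheduler added further down, which reads it via Params.DataCoordCfg.SyncSegmentsInterval.GetAsDuration(time.Second). As a minimal, self-contained sketch of that kind of interval-driven background loop (names here are illustrative only, not the Milvus API):

package main

import (
	"fmt"
	"time"
)

// runPeriodically is a hedged sketch of an interval-driven loop:
// it calls fn on every tick until quit is closed.
func runPeriodically(interval time.Duration, quit <-chan struct{}, fn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-quit:
			return
		case <-ticker.C:
			fn()
		}
	}
}

func main() {
	quit := make(chan struct{})
	// The real scheduler would use the configured 300s interval;
	// a short one is used here so the demo prints something.
	go runPeriodically(time.Second, quit, func() {
		fmt.Println("sync segments to datanode watchers")
	})
	time.Sleep(3 * time.Second)
	close(quit)
}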
32 changes: 4 additions & 28 deletions internal/datacoord/compaction.go
@@ -487,18 +487,7 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
// Apply metrics after successful meta update.
metricMutation.commit()
}

nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
}

log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("handleCompactionResult: fail to sync segments with node",
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}
// TODO @xiaocai2333: drop compaction plan on datanode

log.Info("handleCompactionResult: success to handle merge compaction result")
return nil
@@ -546,13 +535,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// task.dataNodeID not match with channel
// Mark this compaction as failure and skip processing the meta
if !c.chManager.Match(task.dataNodeID, task.plan.GetChannel()) {
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
// TODO @xiaocai2333: drop compaction plan on datanode
log.Warn("compaction failed for channel nodeID not match")
if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
continue
}
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
@@ -617,16 +601,8 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
if nodeUnkonwnPlan, ok := completedPlans[planID]; ok {
nodeID, plan := nodeUnkonwnPlan.A, nodeUnkonwnPlan.B
log := log.With(zap.Int64("planID", planID), zap.Int64("nodeID", nodeID), zap.String("channel", plan.GetChannel()))

// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
log.Info("compaction syncing unknown plan with node")
if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
PlanID: planID,
}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
}
// TODO @xiaocai2333: drop compaction plan on datanode
log.Info("drop unknown plan with node")
}
}

14 changes: 1 addition & 13 deletions internal/datacoord/compaction_test.go
@@ -84,7 +84,6 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
}, nil)

s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc)
@@ -475,7 +474,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
}
return nil
}).Once()
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()

handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
@@ -517,7 +515,6 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{segment},
&segMetricMutation{}, nil).Once()
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()

handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
@@ -529,7 +526,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
}

err := handler.handleMergeCompactionResult(plan, compactionResult)
s.Error(err)
s.NoError(err)
})
}

@@ -549,7 +546,6 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
})

s.Run("test complete merge compaction task", func() {
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
// mock for handleMergeCompactionResult
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
@@ -702,14 +698,6 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
},
}

s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
s.EqualValues(nodeID, 222)
s.NotNil(req)
s.Empty(req.GetCompactedFrom())
s.EqualValues(5, req.GetPlanID())
return nil
}).Once()
s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()

7 changes: 7 additions & 0 deletions internal/datacoord/meta.go
@@ -1569,3 +1569,10 @@ func updateSegStateAndPrepareMetrics(segToUpdate *SegmentInfo, targetState commo
metricMutation.append(segToUpdate.GetState(), targetState, segToUpdate.GetLevel(), segToUpdate.GetNumOfRows())
segToUpdate.State = targetState
}

func (m *meta) ListCollections() []int64 {
m.RLock()
defer m.RUnlock()

return lo.Keys(m.collections)
}
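ListCollections simply snapshots the collection IDs under a read lock; the scheduler introduced below combines it with GetCollection to walk every collection on each tick. A hedged, self-contained sketch of that snapshot-under-RLock pattern (the registry type is an illustrative stand-in for the datacoord meta, not the real type):

package main

import (
	"fmt"
	"sync"

	"github.com/samber/lo"
)

// registry is an illustrative stand-in for the datacoord meta:
// a lock-protected map whose keys can be snapshotted safely.
type registry struct {
	sync.RWMutex
	collections map[int64]string
}

// ListCollections returns a snapshot of the keys, mirroring the new meta method.
func (r *registry) ListCollections() []int64 {
	r.RLock()
	defer r.RUnlock()
	return lo.Keys(r.collections)
}

func main() {
	r := &registry{collections: map[int64]string{1: "coll_a", 2: "coll_b"}}
	for _, id := range r.ListCollections() {
		fmt.Println("would sync segments for collection", id)
	}
}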
1 change: 1 addition & 0 deletions internal/datacoord/segment_operator.go
@@ -32,6 +32,7 @@ func SetMaxRowCount(maxRow int64) SegmentOperator {
type segmentCriterion struct {
collectionID int64
channel string
partitionID int64
others []SegmentFilter
}

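Adding partitionID to segmentCriterion lets segment selection be narrowed per partition; the scheduler below currently narrows with a SegmentFilterFunc checking GetPartitionID(). A self-contained sketch of this criterion-plus-filter-option pattern follows; the types and helpers here are illustrative, not the actual datacoord API:

package main

import "fmt"

// criterion is an illustrative counterpart of segmentCriterion.
type criterion struct {
	collectionID int64
	channel      string
	partitionID  int64
}

// filter mutates a criterion; several filters compose into one selection.
type filter func(*criterion)

func withChannel(ch string) filter    { return func(c *criterion) { c.channel = ch } }
func withPartitionID(id int64) filter { return func(c *criterion) { c.partitionID = id } }

// selectSegments builds the criterion; a real implementation would then
// use it to index or scan segment metadata.
func selectSegments(filters ...filter) criterion {
	var c criterion
	for _, f := range filters {
		f(&c)
	}
	return c
}

func main() {
	c := selectSegments(withChannel("ch-1"), withPartitionID(1001))
	fmt.Printf("selecting segments with channel=%s partition=%d\n", c.channel, c.partitionID)
}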
5 changes: 5 additions & 0 deletions internal/datacoord/server.go
@@ -127,6 +127,7 @@ type Server struct {
compactionTrigger trigger
compactionHandler compactionPlanContext
compactionViewManager *CompactionViewManager
syncSegmentsScheduler *SyncSegmentsScheduler

metricsCacheManager *metricsinfo.MetricsCacheManager

@@ -393,6 +394,8 @@ func (s *Server) initDataCoord() error {
s.importScheduler = NewImportScheduler(s.meta, s.cluster, s.allocator, s.importMeta, s.buildIndexCh)
s.importChecker = NewImportChecker(s.meta, s.broker, s.cluster, s.allocator, s.segmentManager, s.importMeta)

s.syncSegmentsScheduler = newSyncSegmentsScheduler(s.meta, s.channelManager, s.sessionManager)

s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)

log.Info("init datacoord done", zap.Int64("nodeID", paramtable.GetNodeID()), zap.String("Address", s.address))
@@ -712,6 +715,7 @@ func (s *Server) startServerLoop() {
go s.importScheduler.Start()
go s.importChecker.Start()
s.garbageCollector.start()
s.syncSegmentsScheduler.Start()
}

// startDataNodeTtLoop start a goroutine to recv data node tt msg from msgstream
@@ -1104,6 +1108,7 @@ func (s *Server) Stop() error {

s.importScheduler.Close()
s.importChecker.Close()
s.syncSegmentsScheduler.Stop()

if Params.DataCoordCfg.EnableCompaction.GetAsBool() {
s.stopCompactionTrigger()
149 changes: 149 additions & 0 deletions internal/datacoord/sync_segments_scheduler.go
@@ -0,0 +1,149 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
"sync"
"time"

"github.com/samber/lo"
"go.uber.org/zap"

"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/logutil"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)

type SyncSegmentsScheduler struct {
quit chan struct{}
wg sync.WaitGroup

meta *meta
channelManager ChannelManager
sessions SessionManager
}

func newSyncSegmentsScheduler(m *meta, channelManager ChannelManager, sessions SessionManager) *SyncSegmentsScheduler {
return &SyncSegmentsScheduler{
quit: make(chan struct{}),
wg: sync.WaitGroup{},
meta: m,
channelManager: channelManager,
sessions: sessions,
}
}

func (sss *SyncSegmentsScheduler) Start() {
sss.quit = make(chan struct{})
sss.wg.Add(1)

go func() {
defer logutil.LogPanic()
ticker := time.NewTicker(Params.DataCoordCfg.SyncSegmentsInterval.GetAsDuration(time.Second))
defer sss.wg.Done()

for {
select {
case <-sss.quit:
log.Info("sync segments scheduler quit")
ticker.Stop()
Review comment (contributor): remove this
return
case <-ticker.C:
sss.SyncSegmentsForCollections()
}
}
}()
log.Info("SyncSegmentsScheduler started...")
}

func (sss *SyncSegmentsScheduler) Stop() {
close(sss.quit)
sss.wg.Wait()
}

func (sss *SyncSegmentsScheduler) SyncSegmentsForCollections() {
collIDs := sss.meta.ListCollections()
for _, collID := range collIDs {
collInfo := sss.meta.GetCollection(collID)
if collInfo == nil {
log.Warn("collection info is nil, skip it", zap.Int64("collectionID", collID))
continue
}
pkField, err := typeutil.GetPrimaryFieldSchema(collInfo.Schema)
if err != nil {
log.Warn("get primary field from schema failed", zap.Int64("collectionID", collID),
zap.Error(err))
continue
}
for _, channelName := range collInfo.VChannelNames {
Review comment (contributor): Maybe we can execute concurrently at the channel level?
Reply (author): I think it's not necessary; I already added concurrency for loading the PK stats on the datanode. (A sketch of what channel-level fan-out could look like follows this file's diff.)

nodeID, err := sss.channelManager.FindWatcher(channelName)
if err != nil {
log.Warn("find watcher for channel failed", zap.Int64("collectionID", collID),
zap.String("channelName", channelName), zap.Error(err))
continue
}
for _, partitionID := range collInfo.Partitions {
if err := sss.SyncSegments(collID, partitionID, channelName, nodeID, pkField.GetFieldID()); err != nil {
log.Warn("sync segment with channel failed, retry next ticker",
zap.Int64("collectionID", collID),
zap.Int64("partitionID", partitionID),
zap.String("channel", channelName),
zap.Error(err))
continue
}
}
}
}
}

func (sss *SyncSegmentsScheduler) SyncSegments(collectionID, partitionID int64, channelName string, nodeID, pkFieldID int64) error {
log := log.With(zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID),
zap.String("channelName", channelName), zap.Int64("nodeID", nodeID))
segments := sss.meta.SelectSegments(WithChannel(channelName), SegmentFilterFunc(func(info *SegmentInfo) bool {
return info.GetPartitionID() == partitionID && isSegmentHealthy(info)
}))
req := &datapb.SyncSegmentsRequest{
ChannelName: channelName,
PartitionId: partitionID,
CollectionId: collectionID,
SegmentInfos: make(map[int64]*datapb.SyncSegmentInfo),
}

for _, seg := range segments {
for _, statsLog := range seg.GetStatslogs() {
if statsLog.GetFieldID() == pkFieldID {
req.SegmentInfos[seg.ID] = &datapb.SyncSegmentInfo{
SegmentId: seg.GetID(),
PkStatsLog: statsLog,
State: seg.GetState(),
Level: seg.GetLevel(),
NumOfRows: seg.GetNumOfRows(),
}
}
}
}

if err := sss.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("fail to sync segments with node", zap.Error(err))
return err
}
log.Info("sync segments success", zap.Int64s("segments", lo.Map(segments, func(t *SegmentInfo, i int) int64 {
return t.GetID()
})))
return nil
}
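On the review thread above about channel-level concurrency: the author keeps the per-channel loop sequential and relies on concurrent PK-stats loading on the DataNode side. If channel-level fan-out were ever wanted, a hedged, self-contained sketch using a bounded worker pattern could look like the following (standard library only; function and variable names are illustrative, not part of this PR):

package main

import (
	"fmt"
	"sync"
)

// syncChannel is a stand-in for syncing all segments of one channel.
func syncChannel(channel string) error {
	fmt.Println("syncing segments for", channel)
	return nil
}

// syncChannelsConcurrently fans out over channels with at most `limit`
// syncs in flight, collecting any errors for the caller to retry later.
func syncChannelsConcurrently(channels []string, limit int) []error {
	var (
		wg   sync.WaitGroup
		mu   sync.Mutex
		errs []error
	)
	sem := make(chan struct{}, limit) // bounded concurrency

	for _, ch := range channels {
		ch := ch
		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			if err := syncChannel(ch); err != nil {
				mu.Lock()
				errs = append(errs, err)
				mu.Unlock()
			}
		}()
	}
	wg.Wait()
	return errs
}

func main() {
	_ = syncChannelsConcurrently([]string{"ch-1", "ch-2", "ch-3"}, 2)
}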