Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
  • Loading branch information
xiaocai2333 committed May 28, 2024
1 parent 75e7446 commit 7f27783
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 30 deletions.
42 changes: 21 additions & 21 deletions internal/datacoord/compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,17 +488,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact
metricMutation.commit()
}

//nodeID := c.plans[plan.GetPlanID()].dataNodeID
//req := &datapb.SyncSegmentsRequest{
// PlanID: plan.PlanID,
//}
//
//log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
//if err := c.sessions.SyncSegments(nodeID, req); err != nil {
// log.Warn("handleCompactionResult: fail to sync segments with node",
// zap.Int64("nodeID", nodeID), zap.Error(err))
// return err
//}
nodeID := c.plans[plan.GetPlanID()].dataNodeID
req := &datapb.SyncSegmentsRequest{
PlanID: plan.PlanID,
}

log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID))
if err := c.sessions.SyncSegments(nodeID, req); err != nil {
log.Warn("handleCompactionResult: fail to sync segments with node",
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}

log.Info("handleCompactionResult: success to handle merge compaction result")
return nil
Expand Down Expand Up @@ -549,10 +549,10 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
log.Warn("compaction failed for channel nodeID not match")
//if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
// log.Warn("compaction failed to sync segments with node", zap.Error(err))
// continue
//}
if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
continue
}
c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan())
c.setSegmentsCompacting(task.plan, false)
c.scheduler.Finish(task.dataNodeID, task.plan)
Expand Down Expand Up @@ -621,12 +621,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error {
// Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task
// without changing the meta
log.Info("compaction syncing unknown plan with node")
//if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
// PlanID: planID,
//}); err != nil {
// log.Warn("compaction failed to sync segments with node", zap.Error(err))
// return err
//}
if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{
PlanID: planID,
}); err != nil {
log.Warn("compaction failed to sync segments with node", zap.Error(err))
return err
}
}
}

Expand Down
22 changes: 13 additions & 9 deletions internal/datacoord/compaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() {
4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}},
}, nil)

s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once()
{
s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc)
Expand Down Expand Up @@ -474,6 +475,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
}
return nil
}).Once()
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()

handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
Expand Down Expand Up @@ -515,6 +517,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return(
[]*SegmentInfo{segment},
&segMetricMutation{}, nil).Once()
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()

handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc)
handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
Expand All @@ -526,7 +529,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
}

err := handler.handleMergeCompactionResult(plan, compactionResult)
s.NoError(err)
s.Error(err)
})
}

Expand All @@ -546,6 +549,7 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
})

s.Run("test complete merge compaction task", func() {
s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
// mock for handleMergeCompactionResult
s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100})
Expand Down Expand Up @@ -698,14 +702,14 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
},
}

//s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
// s.EqualValues(nodeID, 222)
// s.NotNil(req)
// s.Empty(req.GetCompactedFrom())
// s.EqualValues(5, req.GetPlanID())
// return nil
//}).Once()
//s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error {
s.EqualValues(nodeID, 222)
s.NotNil(req)
s.Empty(req.GetCompactedFrom())
s.EqualValues(5, req.GetPlanID())
return nil
}).Once()
s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil)
s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true)
s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once()

Expand Down
8 changes: 8 additions & 0 deletions internal/datanode/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,14 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
return merr.Status(err), nil
}

if req.GetPlanID() != 0 {
node.compactionExecutor.removeTask(req.GetPlanID())
}

if len(req.GetSegmentInfos()) <= 0 {
return merr.Success(), nil
}

ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName())
if !ok {
node.compactionExecutor.discardPlan(req.GetChannelName())
Expand Down

0 comments on commit 7f27783

Please sign in to comment.