From 7f27783c9e5389ad2d4c4faf3c121d36db317b9e Mon Sep 17 00:00:00 2001 From: Cai Zhang Date: Tue, 28 May 2024 16:29:08 +0800 Subject: [PATCH] update Signed-off-by: Cai Zhang --- internal/datacoord/compaction.go | 42 +++++++++++++-------------- internal/datacoord/compaction_test.go | 22 ++++++++------ internal/datanode/services.go | 8 +++++ 3 files changed, 42 insertions(+), 30 deletions(-) diff --git a/internal/datacoord/compaction.go b/internal/datacoord/compaction.go index a1e67991bc3de..c9919d7b821ea 100644 --- a/internal/datacoord/compaction.go +++ b/internal/datacoord/compaction.go @@ -488,17 +488,17 @@ func (c *compactionPlanHandler) handleMergeCompactionResult(plan *datapb.Compact metricMutation.commit() } - //nodeID := c.plans[plan.GetPlanID()].dataNodeID - //req := &datapb.SyncSegmentsRequest{ - // PlanID: plan.PlanID, - //} - // - //log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID)) - //if err := c.sessions.SyncSegments(nodeID, req); err != nil { - // log.Warn("handleCompactionResult: fail to sync segments with node", - // zap.Int64("nodeID", nodeID), zap.Error(err)) - // return err - //} + nodeID := c.plans[plan.GetPlanID()].dataNodeID + req := &datapb.SyncSegmentsRequest{ + PlanID: plan.PlanID, + } + + log.Info("handleCompactionResult: syncing segments with node", zap.Int64("nodeID", nodeID)) + if err := c.sessions.SyncSegments(nodeID, req); err != nil { + log.Warn("handleCompactionResult: fail to sync segments with node", + zap.Int64("nodeID", nodeID), zap.Error(err)) + return err + } log.Info("handleCompactionResult: success to handle merge compaction result") return nil @@ -549,10 +549,10 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task // without changing the meta log.Warn("compaction failed for channel nodeID not match") - //if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil { - // log.Warn("compaction failed to sync segments with node", zap.Error(err)) - // continue - //} + if err := c.sessions.SyncSegments(task.dataNodeID, &datapb.SyncSegmentsRequest{PlanID: planID}); err != nil { + log.Warn("compaction failed to sync segments with node", zap.Error(err)) + continue + } c.plans[planID] = c.plans[planID].shadowClone(setState(failed), endSpan()) c.setSegmentsCompacting(task.plan, false) c.scheduler.Finish(task.dataNodeID, task.plan) @@ -621,12 +621,12 @@ func (c *compactionPlanHandler) updateCompaction(ts Timestamp) error { // Sync segments without CompactionFrom segmentsIDs to make sure DN clear the task // without changing the meta log.Info("compaction syncing unknown plan with node") - //if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{ - // PlanID: planID, - //}); err != nil { - // log.Warn("compaction failed to sync segments with node", zap.Error(err)) - // return err - //} + if err := c.sessions.SyncSegments(nodeID, &datapb.SyncSegmentsRequest{ + PlanID: planID, + }); err != nil { + log.Warn("compaction failed to sync segments with node", zap.Error(err)) + return err + } } } diff --git a/internal/datacoord/compaction_test.go b/internal/datacoord/compaction_test.go index 1774c1654afbb..879dfdbbbb9a9 100644 --- a/internal/datacoord/compaction_test.go +++ b/internal/datacoord/compaction_test.go @@ -84,6 +84,7 @@ func (s *CompactionPlanHandlerSuite) TestCheckResult() { 4: {A: 100, B: &datapb.CompactionPlanResult{PlanID: 4, State: commonpb.CompactionState_Executing}}, }, nil) + s.mockSessMgr.EXPECT().SyncSegments(int64(100), mock.Anything).Return(nil).Once() { s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once() handler := newCompactionPlanHandler(nil, s.mockSessMgr, nil, nil, s.mockAlloc) @@ -474,6 +475,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { } return nil }).Once() + s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} @@ -515,6 +517,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { s.mockMeta.EXPECT().CompleteCompactionMutation(mock.Anything, mock.Anything).Return( []*SegmentInfo{segment}, &segMetricMutation{}, nil).Once() + s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once() handler := newCompactionPlanHandler(nil, s.mockSessMgr, s.mockCm, s.mockMeta, s.mockAlloc) handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan} @@ -526,7 +529,7 @@ func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() { } err := handler.handleMergeCompactionResult(plan, compactionResult) - s.NoError(err) + s.Error(err) }) } @@ -546,6 +549,7 @@ func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() { }) s.Run("test complete merge compaction task", func() { + s.mockSessMgr.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once() // mock for handleMergeCompactionResult s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once() segment := NewSegmentInfo(&datapb.SegmentInfo{ID: 100}) @@ -698,14 +702,14 @@ func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() { }, } - //s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error { - // s.EqualValues(nodeID, 222) - // s.NotNil(req) - // s.Empty(req.GetCompactedFrom()) - // s.EqualValues(5, req.GetPlanID()) - // return nil - //}).Once() - //s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil) + s.mockSessMgr.EXPECT().SyncSegments(int64(222), mock.Anything).RunAndReturn(func(nodeID int64, req *datapb.SyncSegmentsRequest) error { + s.EqualValues(nodeID, 222) + s.NotNil(req) + s.Empty(req.GetCompactedFrom()) + s.EqualValues(5, req.GetPlanID()) + return nil + }).Once() + s.mockSessMgr.EXPECT().SyncSegments(int64(111), mock.Anything).Return(nil) s.mockCm.EXPECT().Match(int64(111), "ch-1").Return(true) s.mockCm.EXPECT().Match(int64(111), "ch-2").Return(false).Once() diff --git a/internal/datanode/services.go b/internal/datanode/services.go index 6b9d408d3e127..51d30bc13cc83 100644 --- a/internal/datanode/services.go +++ b/internal/datanode/services.go @@ -276,6 +276,14 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments return merr.Status(err), nil } + if req.GetPlanID() != 0 { + node.compactionExecutor.removeTask(req.GetPlanID()) + } + + if len(req.GetSegmentInfos()) <= 0 { + return merr.Success(), nil + } + ds, ok := node.flowgraphManager.GetFlowgraphService(req.GetChannelName()) if !ok { node.compactionExecutor.discardPlan(req.GetChannelName())