Skip to content

Commit

Permalink
fix: Prevent import from generating orphaned files (#34070)
Browse files Browse the repository at this point in the history
When import failed, mark the import segment as dropped instead of drop
it directly to prevent generating orphaned files.

issue: #34068

---------

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
  • Loading branch information
bigsheeper committed Jun 25, 2024
1 parent b12c34a commit 940a0ac
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 6 deletions.
6 changes: 3 additions & 3 deletions internal/datacoord/import_scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -344,10 +344,10 @@ func (s *importScheduler) processFailed(task ImportTask) {
if task.GetType() == ImportTaskType {
segments := task.(*importTask).GetSegmentIDs()
for _, segment := range segments {
err := s.meta.DropSegment(segment)
op := UpdateStatusOperator(segment, commonpb.SegmentState_Dropped)
err := s.meta.UpdateSegmentsInfo(op)
if err != nil {
log.Warn("drop import segment failed",
WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
log.Warn("drop import segment failed", WrapTaskLog(task, zap.Int64("segment", segment), zap.Error(err))...)
return
}
}
Expand Down
7 changes: 4 additions & 3 deletions internal/datacoord/import_scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
Expand Down Expand Up @@ -250,7 +251,7 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
})
for _, id := range task.(*importTask).GetSegmentIDs() {
segment := &SegmentInfo{
SegmentInfo: &datapb.SegmentInfo{ID: id, IsImporting: true},
SegmentInfo: &datapb.SegmentInfo{ID: id, State: commonpb.SegmentState_Importing, IsImporting: true},
}
err = s.meta.AddSegment(context.Background(), segment)
s.NoError(err)
Expand All @@ -261,11 +262,11 @@ func (s *ImportSchedulerSuite) TestProcessFailed() {
}

s.cluster.EXPECT().DropImport(mock.Anything, mock.Anything).Return(nil)
s.catalog.EXPECT().DropSegment(mock.Anything, mock.Anything).Return(nil)
s.catalog.EXPECT().AlterSegments(mock.Anything, mock.Anything).Return(nil)
s.scheduler.process()
for _, id := range task.(*importTask).GetSegmentIDs() {
segment := s.meta.GetSegment(id)
s.Nil(segment)
s.Equal(commonpb.SegmentState_Dropped, segment.GetState())
}
task = s.imeta.GetTask(task.GetTaskID())
s.Equal(datapb.ImportTaskStateV2_Failed, task.GetState())
Expand Down

0 comments on commit 940a0ac

Please sign in to comment.