diff --git a/internal/datanode/compaction/clustering_compactor.go b/internal/datanode/compaction/clustering_compactor.go index 1fb6b8c3acb7..70fdaa5ee365 100644 --- a/internal/datanode/compaction/clustering_compactor.go +++ b/internal/datanode/compaction/clustering_compactor.go @@ -19,6 +19,7 @@ package compaction import ( "context" "fmt" + sio "io" "math" "path" "sort" @@ -481,33 +482,33 @@ func (t *clusteringCompactionTask) mappingSegment( fieldBinlogPaths = append(fieldBinlogPaths, ps) } - for _, path := range fieldBinlogPaths { - bytesArr, err := t.binlogIO.Download(ctx, path) - blobs := make([]*storage.Blob, len(bytesArr)) - var segmentSize int64 - for i := range bytesArr { - blobs[i] = &storage.Blob{Value: bytesArr[i]} - segmentSize = segmentSize + int64(len(bytesArr[i])) - } + for _, paths := range fieldBinlogPaths { + allValues, err := t.binlogIO.Download(ctx, paths) if err != nil { - log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err)) + log.Warn("compact wrong, fail to download insertLogs", zap.Error(err)) return err } - - pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType()) + blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob { + return &storage.Blob{Key: paths[i], Value: v} + }) + pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID()) if err != nil { - log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err)) + log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err)) return err } var offset int64 = -1 - for pkIter.HasNext() { - vInter, _ := pkIter.Next() - v, ok := vInter.(*storage.Value) - if !ok { - log.Warn("transfer interface to Value wrong", zap.Strings("path", path)) - return errors.New("unexpected error") + for { + err := pkIter.Next() + if err != nil { + if err == sio.EOF { + break + } else { + log.Warn("compact wrong, failed to iter through data", zap.Error(err)) + return err + } } + v := pkIter.Value() offset++ // Filtering deleted entity @@ -524,7 +525,7 @@ func (t *clusteringCompactionTask) mappingSegment( row, ok := v.Value.(map[typeutil.UniqueID]interface{}) if !ok { - log.Warn("transfer interface to map wrong", zap.Strings("path", path)) + log.Warn("transfer interface to map wrong", zap.Strings("paths", paths)) return errors.New("unexpected error") }