Skip to content

Commit

Permalink
Use new stream segment reader in clustering compaction (#34232)
Browse files Browse the repository at this point in the history
#32939

Signed-off-by: wayblink <anyang.wang@zilliz.com>
  • Loading branch information
wayblink committed Jun 30, 2024
1 parent 73ffc1b commit e5d691d
Showing 1 changed file with 20 additions and 19 deletions.
39 changes: 20 additions & 19 deletions internal/datanode/compaction/clustering_compactor.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package compaction
import (
"context"
"fmt"
sio "io"
"math"
"path"
"sort"
Expand Down Expand Up @@ -481,33 +482,33 @@ func (t *clusteringCompactionTask) mappingSegment(
fieldBinlogPaths = append(fieldBinlogPaths, ps)
}

for _, path := range fieldBinlogPaths {
bytesArr, err := t.binlogIO.Download(ctx, path)
blobs := make([]*storage.Blob, len(bytesArr))
var segmentSize int64
for i := range bytesArr {
blobs[i] = &storage.Blob{Value: bytesArr[i]}
segmentSize = segmentSize + int64(len(bytesArr[i]))
}
for _, paths := range fieldBinlogPaths {
allValues, err := t.binlogIO.Download(ctx, paths)
if err != nil {
log.Warn("download insertlogs wrong", zap.Strings("path", path), zap.Error(err))
log.Warn("compact wrong, fail to download insertLogs", zap.Error(err))
return err
}

pkIter, err := storage.NewInsertBinlogIterator(blobs, t.primaryKeyField.GetFieldID(), t.primaryKeyField.GetDataType())
blobs := lo.Map(allValues, func(v []byte, i int) *storage.Blob {
return &storage.Blob{Key: paths[i], Value: v}
})
pkIter, err := storage.NewBinlogDeserializeReader(blobs, t.primaryKeyField.GetFieldID())
if err != nil {
log.Warn("new insert binlogs Itr wrong", zap.Strings("path", path), zap.Error(err))
log.Warn("new insert binlogs Itr wrong", zap.Strings("paths", paths), zap.Error(err))
return err
}

var offset int64 = -1
for pkIter.HasNext() {
vInter, _ := pkIter.Next()
v, ok := vInter.(*storage.Value)
if !ok {
log.Warn("transfer interface to Value wrong", zap.Strings("path", path))
return errors.New("unexpected error")
for {
err := pkIter.Next()
if err != nil {
if err == sio.EOF {
break
} else {
log.Warn("compact wrong, failed to iter through data", zap.Error(err))
return err
}
}
v := pkIter.Value()
offset++

// Filtering deleted entity
Expand All @@ -524,7 +525,7 @@ func (t *clusteringCompactionTask) mappingSegment(

row, ok := v.Value.(map[typeutil.UniqueID]interface{})
if !ok {
log.Warn("transfer interface to map wrong", zap.Strings("path", path))
log.Warn("transfer interface to map wrong", zap.Strings("paths", paths))
return errors.New("unexpected error")
}

Expand Down

0 comments on commit e5d691d

Please sign in to comment.