Skip to content

Commit

Permalink
Avoid growing slice as deserializing binlogs (#17421)
Browse files Browse the repository at this point in the history
Signed-off-by: yah01 <yang.cen@zilliz.com>
  • Loading branch information
yah01 committed Jun 8, 2022
1 parent 1dbfebf commit 70f8bea
Show file tree
Hide file tree
Showing 3 changed files with 167 additions and 87 deletions.
24 changes: 13 additions & 11 deletions internal/querynode/segment_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
return err
}

newSegments := make(map[UniqueID]*Segment)
newSegments := make(map[UniqueID]*Segment, len(req.Infos))
segmentGC := func() {
for _, s := range newSegments {
deleteSegment(s)
Expand Down Expand Up @@ -216,7 +216,6 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
// for now, there will be multiple copies in the process of data loading into segCore
defer debug.FreeOSMemory()

var fieldBinlogs []*datapb.FieldBinlog
if segment.getType() == segmentTypeSealed {
fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
for _, indexInfo := range loadInfo.IndexInfos {
Expand All @@ -227,6 +226,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
}

indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(loadInfo.BinlogPaths))

for _, fieldBinlog := range loadInfo.BinlogPaths {
fieldID := fieldBinlog.FieldID
Expand All @@ -244,12 +244,11 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
if err := loader.loadIndexedFieldData(segment, indexedFieldInfos); err != nil {
return err
}
if err := loader.loadSealedSegmentFields(segment, fieldBinlogs); err != nil {
if err := loader.loadSealedSegmentFields(segment, fieldBinlogs, loadInfo); err != nil {
return err
}
} else {
fieldBinlogs = loadInfo.BinlogPaths
if err := loader.loadGrowingSegmentFields(segment, fieldBinlogs); err != nil {
if err := loader.loadGrowingSegmentFields(segment, loadInfo.BinlogPaths); err != nil {
return err
}
}
Expand Down Expand Up @@ -291,7 +290,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin
iCodec := storage.InsertCodec{}

// change all field bin log loading into concurrent
loadFutures := make([]*concurrency.Future, 0)
loadFutures := make([]*concurrency.Future, 0, len(fieldBinlogs))
for _, fieldBinlog := range fieldBinlogs {
futures := loader.loadFieldBinlogsAsync(fieldBinlog)
loadFutures = append(loadFutures, futures...)
Expand Down Expand Up @@ -342,11 +341,11 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin
}
}

func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []*datapb.FieldBinlog) error {
func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []*datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) error {
// Load fields concurrently
futures := make([]*concurrency.Future, 0, len(fields))
for _, field := range fields {
future := loader.loadSealedFieldAsync(segment, field)
future := loader.loadSealedFieldAsync(segment, field, loadInfo)

futures = append(futures, future)
}
Expand All @@ -365,7 +364,7 @@ func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []
}

// async load field of sealed segment
func (loader *segmentLoader) loadSealedFieldAsync(segment *Segment, field *datapb.FieldBinlog) *concurrency.Future {
func (loader *segmentLoader) loadSealedFieldAsync(segment *Segment, field *datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) *concurrency.Future {
iCodec := storage.InsertCodec{}

// Avoid consuming too much memory if no CPU worker ready,
Expand All @@ -383,13 +382,16 @@ func (loader *segmentLoader) loadSealedFieldAsync(segment *Segment, field *datap
blobs[index] = blob
}

_, _, insertData, err := iCodec.Deserialize(blobs)
insertData := storage.InsertData{
Data: make(map[int64]storage.FieldData),
}
_, _, _, err := iCodec.DeserializeInto(blobs, int(loadInfo.GetNumOfRows()), &insertData)
if err != nil {
log.Warn(err.Error())
return nil, err
}

return nil, loader.loadSealedSegments(segment, insertData)
return nil, loader.loadSealedSegments(segment, &insertData)
})
}

Expand Down
2 changes: 1 addition & 1 deletion internal/querynode/segment_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
binlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
assert.NoError(t, err)

err = loader.loadSealedSegmentFields(segment, binlog)
err = loader.loadSealedSegmentFields(segment, binlog, &querypb.SegmentLoadInfo{})
assert.NoError(t, err)
}

Expand Down

0 comments on commit 70f8bea

Please sign in to comment.