diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go
index 616ea2eb9e2d..2d8e9cde06aa 100644
--- a/internal/querynode/segment_loader.go
+++ b/internal/querynode/segment_loader.go
@@ -108,7 +108,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
 		return err
 	}
 
-	newSegments := make(map[UniqueID]*Segment)
+	newSegments := make(map[UniqueID]*Segment, len(req.Infos))
 	segmentGC := func() {
 		for _, s := range newSegments {
 			deleteSegment(s)
@@ -216,7 +216,6 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
 	// for now, there will be multiple copies in the process of data loading into segCore
 	defer debug.FreeOSMemory()
 
-	var fieldBinlogs []*datapb.FieldBinlog
 	if segment.getType() == segmentTypeSealed {
 		fieldID2IndexInfo := make(map[int64]*querypb.FieldIndexInfo)
 		for _, indexInfo := range loadInfo.IndexInfos {
@@ -227,6 +226,7 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
 		}
 
 		indexedFieldInfos := make(map[int64]*IndexedFieldInfo)
+		fieldBinlogs := make([]*datapb.FieldBinlog, 0, len(loadInfo.BinlogPaths))
 		for _, fieldBinlog := range loadInfo.BinlogPaths {
 			fieldID := fieldBinlog.FieldID
@@ -244,12 +244,11 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
 		if err := loader.loadIndexedFieldData(segment, indexedFieldInfos); err != nil {
 			return err
 		}
-		if err := loader.loadSealedSegmentFields(segment, fieldBinlogs); err != nil {
+		if err := loader.loadSealedSegmentFields(segment, fieldBinlogs, loadInfo); err != nil {
 			return err
 		}
 	} else {
-		fieldBinlogs = loadInfo.BinlogPaths
-		if err := loader.loadGrowingSegmentFields(segment, fieldBinlogs); err != nil {
+		if err := loader.loadGrowingSegmentFields(segment, loadInfo.BinlogPaths); err != nil {
 			return err
 		}
 	}
@@ -291,7 +290,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin
 	iCodec := storage.InsertCodec{}
 
 	// change all field bin log loading into concurrent
-	loadFutures := make([]*concurrency.Future, 0)
+	loadFutures := make([]*concurrency.Future, 0, len(fieldBinlogs))
 	for _, fieldBinlog := range fieldBinlogs {
 		futures := loader.loadFieldBinlogsAsync(fieldBinlog)
 		loadFutures = append(loadFutures, futures...)
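The hunks above all apply the same micro-optimization: giving make a capacity hint when the final element count is already known (len(req.Infos), len(loadInfo.BinlogPaths), len(fieldBinlogs)), so the map or slice is allocated once instead of being grown and copied repeatedly by append and insertion. A minimal, self-contained sketch of the pattern; the names here are illustrative and not part of this change:

    package main

    import "fmt"

    // segmentInfo is a stand-in for the request info type; illustrative only.
    type segmentInfo struct{ ID int64 }

    func main() {
        infos := []segmentInfo{{ID: 1}, {ID: 2}, {ID: 3}}

        // Capacity hints: both containers are sized once up front instead of
        // being reallocated as they grow.
        segments := make(map[int64]*segmentInfo, len(infos))
        futures := make([]*segmentInfo, 0, len(infos))

        for i := range infos {
            segments[infos[i].ID] = &infos[i]
            futures = append(futures, &infos[i])
        }

        fmt.Println(len(segments), cap(futures)) // 3 3
    }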
@@ -342,11 +341,11 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin
 	}
 }
 
-func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []*datapb.FieldBinlog) error {
+func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []*datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) error {
 	// Load fields concurrently
 	futures := make([]*concurrency.Future, 0, len(fields))
 	for _, field := range fields {
-		future := loader.loadSealedFieldAsync(segment, field)
+		future := loader.loadSealedFieldAsync(segment, field, loadInfo)
 		futures = append(futures, future)
 	}
@@ -365,7 +364,7 @@ func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []
 }
 
 // async load field of sealed segment
-func (loader *segmentLoader) loadSealedFieldAsync(segment *Segment, field *datapb.FieldBinlog) *concurrency.Future {
+func (loader *segmentLoader) loadSealedFieldAsync(segment *Segment, field *datapb.FieldBinlog, loadInfo *querypb.SegmentLoadInfo) *concurrency.Future {
 	iCodec := storage.InsertCodec{}
 
 	// Avoid consuming too much memory if no CPU worker ready,
@@ -383,13 +382,16 @@ func (loader *segmentLoader) loadSealedFieldAsync(segment *Segment, field *datap
 			blobs[index] = blob
 		}
 
-		_, _, insertData, err := iCodec.Deserialize(blobs)
+		insertData := storage.InsertData{
+			Data: make(map[int64]storage.FieldData),
+		}
+		_, _, _, err := iCodec.DeserializeInto(blobs, int(loadInfo.GetNumOfRows()), &insertData)
 		if err != nil {
 			log.Warn(err.Error())
 			return nil, err
 		}
 
-		return nil, loader.loadSealedSegments(segment, insertData)
+		return nil, loader.loadSealedSegments(segment, &insertData)
 	})
 }
diff --git a/internal/querynode/segment_loader_test.go b/internal/querynode/segment_loader_test.go
index 68e4aa3fe99a..6c3cbd1b12a7 100644
--- a/internal/querynode/segment_loader_test.go
+++ b/internal/querynode/segment_loader_test.go
@@ -181,7 +181,7 @@ func TestSegmentLoader_loadSegmentFieldsData(t *testing.T) {
 	binlog, _, err := saveBinLog(ctx, defaultCollectionID, defaultPartitionID, defaultSegmentID, defaultMsgLength, schema)
 	assert.NoError(t, err)
 
-	err = loader.loadSealedSegmentFields(segment, binlog)
+	err = loader.loadSealedSegmentFields(segment, binlog, &querypb.SegmentLoadInfo{})
 	assert.NoError(t, err)
 }
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index 8ffd2f56efab..0794c17a2ccb 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -448,215 +448,293 @@ func (insertCodec *InsertCodec) DeserializeAll(blobs []*Blob) (
 	var blobList BlobList = blobs
 	sort.Sort(blobList)
 
-	var cID UniqueID
-	var pID UniqueID
-	var sID UniqueID
-	resultData := &InsertData{
+	data = &InsertData{
 		Data: make(map[FieldID]FieldData),
 	}
-	for _, blob := range blobList {
+	if collectionID, partitionID, segmentID, err = insertCodec.DeserializeInto(blobs, 0, data); err != nil {
+		return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+	}
+
+	return
+}
+
+func (insertCodec *InsertCodec) DeserializeInto(fieldBinlogs []*Blob, rowNum int, insertData *InsertData) (
+	collectionID UniqueID,
+	partitionID UniqueID,
+	segmentID UniqueID,
+	err error,
+) {
+	for _, blob := range fieldBinlogs {
 		binlogReader, err := NewBinlogReader(blob.Value)
 		if err != nil {
-			return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+			return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 		}
 
 		// read partitionID and SegmentID
-		cID, pID, sID = binlogReader.CollectionID, binlogReader.PartitionID, binlogReader.SegmentID
+		collectionID, partitionID, segmentID = binlogReader.CollectionID, binlogReader.PartitionID, binlogReader.SegmentID
 
 		dataType := binlogReader.PayloadDataType
 		fieldID := binlogReader.FieldID
 		totalLength := 0
+		dim := 0
+
 		for {
 			eventReader, err := binlogReader.NextEventReader()
 			if err != nil {
-				return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+				return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 			}
 			if eventReader == nil {
 				break
 			}
 			switch dataType {
 			case schemapb.DataType_Bool:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &BoolFieldData{}
-				}
-				boolFieldData := resultData.Data[fieldID].(*BoolFieldData)
 				singleData, err := eventReader.GetBoolFromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &BoolFieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]bool, 0, rowNum),
+					}
+				}
+				boolFieldData := insertData.Data[fieldID].(*BoolFieldData)
+
+				boolFieldData.Data = append(boolFieldData.Data, singleData...)
 				totalLength += len(singleData)
 				boolFieldData.NumRows = append(boolFieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = boolFieldData
+				insertData.Data[fieldID] = boolFieldData
+
 			case schemapb.DataType_Int8:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &Int8FieldData{}
-				}
-				int8FieldData := resultData.Data[fieldID].(*Int8FieldData)
 				singleData, err := eventReader.GetInt8FromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
+				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &Int8FieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]int8, 0, rowNum),
+					}
 				}
+				int8FieldData := insertData.Data[fieldID].(*Int8FieldData)
+
+				int8FieldData.Data = append(int8FieldData.Data, singleData...)
 				totalLength += len(singleData)
 				int8FieldData.NumRows = append(int8FieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = int8FieldData
+				insertData.Data[fieldID] = int8FieldData
+
 			case schemapb.DataType_Int16:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &Int16FieldData{}
-				}
-				int16FieldData := resultData.Data[fieldID].(*Int16FieldData)
 				singleData, err := eventReader.GetInt16FromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
+				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &Int16FieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]int16, 0, rowNum),
+					}
 				}
+				int16FieldData := insertData.Data[fieldID].(*Int16FieldData)
+
+				int16FieldData.Data = append(int16FieldData.Data, singleData...)
 				totalLength += len(singleData)
 				int16FieldData.NumRows = append(int16FieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = int16FieldData
+				insertData.Data[fieldID] = int16FieldData
+
 			case schemapb.DataType_Int32:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &Int32FieldData{}
-				}
-				int32FieldData := resultData.Data[fieldID].(*Int32FieldData)
 				singleData, err := eventReader.GetInt32FromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
+				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &Int32FieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]int32, 0, rowNum),
+					}
 				}
+				int32FieldData := insertData.Data[fieldID].(*Int32FieldData)
+
+				int32FieldData.Data = append(int32FieldData.Data, singleData...)
 				totalLength += len(singleData)
 				int32FieldData.NumRows = append(int32FieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = int32FieldData
+				insertData.Data[fieldID] = int32FieldData
+
 			case schemapb.DataType_Int64:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &Int64FieldData{}
-				}
-				int64FieldData := resultData.Data[fieldID].(*Int64FieldData)
 				singleData, err := eventReader.GetInt64FromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &Int64FieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]int64, 0, rowNum),
+					}
+				}
+				int64FieldData := insertData.Data[fieldID].(*Int64FieldData)
+
+				int64FieldData.Data = append(int64FieldData.Data, singleData...)
 				totalLength += len(singleData)
 				int64FieldData.NumRows = append(int64FieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = int64FieldData
+				insertData.Data[fieldID] = int64FieldData
+
 			case schemapb.DataType_Float:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &FloatFieldData{}
-				}
-				floatFieldData := resultData.Data[fieldID].(*FloatFieldData)
 				singleData, err := eventReader.GetFloatFromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
+				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &FloatFieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]float32, 0, rowNum),
+					}
 				}
+				floatFieldData := insertData.Data[fieldID].(*FloatFieldData)
+
+				floatFieldData.Data = append(floatFieldData.Data, singleData...)
 				totalLength += len(singleData)
 				floatFieldData.NumRows = append(floatFieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = floatFieldData
+				insertData.Data[fieldID] = floatFieldData
+
 			case schemapb.DataType_Double:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &DoubleFieldData{}
-				}
-				doubleFieldData := resultData.Data[fieldID].(*DoubleFieldData)
 				singleData, err := eventReader.GetDoubleFromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
+				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &DoubleFieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]float64, 0, rowNum),
+					}
 				}
+				doubleFieldData := insertData.Data[fieldID].(*DoubleFieldData)
+
+				doubleFieldData.Data = append(doubleFieldData.Data, singleData...)
 				totalLength += len(singleData)
 				doubleFieldData.NumRows = append(doubleFieldData.NumRows, int64(len(singleData)))
-				resultData.Data[fieldID] = doubleFieldData
+				insertData.Data[fieldID] = doubleFieldData
+
 			case schemapb.DataType_String, schemapb.DataType_VarChar:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &StringFieldData{}
-				}
-				stringFieldData := resultData.Data[fieldID].(*StringFieldData)
 				stringPayload, err := eventReader.GetStringFromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &StringFieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]string, 0, rowNum),
+					}
+				}
+				stringFieldData := insertData.Data[fieldID].(*StringFieldData)
+
+				stringFieldData.Data = append(stringFieldData.Data, stringPayload...)
 				totalLength += len(stringPayload)
 				stringFieldData.NumRows = append(stringFieldData.NumRows, int64(len(stringPayload)))
-				resultData.Data[fieldID] = stringFieldData
+				insertData.Data[fieldID] = stringFieldData
+
 			case schemapb.DataType_BinaryVector:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &BinaryVectorFieldData{}
-				}
-				binaryVectorFieldData := resultData.Data[fieldID].(*BinaryVectorFieldData)
 				var singleData []byte
-				singleData, binaryVectorFieldData.Dim, err = eventReader.GetBinaryVectorFromPayload()
+				singleData, dim, err = eventReader.GetBinaryVectorFromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
+				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &BinaryVectorFieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]byte, 0, rowNum*dim),
+					}
 				}
+				binaryVectorFieldData := insertData.Data[fieldID].(*BinaryVectorFieldData)
+
+				binaryVectorFieldData.Data = append(binaryVectorFieldData.Data, singleData...)
 				length, err := eventReader.GetPayloadLengthFromReader()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 				}
 
 				totalLength += length
 				binaryVectorFieldData.NumRows = append(binaryVectorFieldData.NumRows, int64(length))
-				resultData.Data[fieldID] = binaryVectorFieldData
+				binaryVectorFieldData.Dim = dim
+				insertData.Data[fieldID] = binaryVectorFieldData
+
 			case schemapb.DataType_FloatVector:
-				if resultData.Data[fieldID] == nil {
-					resultData.Data[fieldID] = &FloatVectorFieldData{}
-				}
-				floatVectorFieldData := resultData.Data[fieldID].(*FloatVectorFieldData)
 				var singleData []float32
-				singleData, floatVectorFieldData.Dim, err = eventReader.GetFloatVectorFromPayload()
+				singleData, dim, err = eventReader.GetFloatVectorFromPayload()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 				}
+
+				if insertData.Data[fieldID] == nil {
+					insertData.Data[fieldID] = &FloatVectorFieldData{
+						NumRows: make([]int64, 0),
+						Data:    make([]float32, 0, rowNum*dim),
+					}
+				}
+				floatVectorFieldData := insertData.Data[fieldID].(*FloatVectorFieldData)
+
+				floatVectorFieldData.Data = append(floatVectorFieldData.Data, singleData...)
 				length, err := eventReader.GetPayloadLengthFromReader()
 				if err != nil {
 					eventReader.Close()
 					binlogReader.Close()
-					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, err
+					return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, err
 				}
 				totalLength += length
 				floatVectorFieldData.NumRows = append(floatVectorFieldData.NumRows, int64(length))
-				resultData.Data[fieldID] = floatVectorFieldData
+				floatVectorFieldData.Dim = dim
+				insertData.Data[fieldID] = floatVectorFieldData
+
 			default:
 				eventReader.Close()
 				binlogReader.Close()
-				return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("undefined data type %d", dataType)
+				return InvalidUniqueID, InvalidUniqueID, InvalidUniqueID, fmt.Errorf("undefined data type %d", dataType)
 			}
 			eventReader.Close()
 		}
+
+		if rowNum <= 0 {
+			rowNum = totalLength
+		}
+
 		if fieldID == common.TimeStampField {
 			blobInfo := BlobInfo{
 				Length: totalLength,
 			}
-			resultData.Infos = append(resultData.Infos, blobInfo)
+			insertData.Infos = append(insertData.Infos, blobInfo)
 		}
 		binlogReader.Close()
 	}
 
-	return cID, pID, sID, resultData, nil
+	return collectionID, partitionID, segmentID, nil
 }
 
 // Deserialize transfer blob back to insert data.
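Taken together, the data_codec.go change splits deserialization into an allocating wrapper (DeserializeAll, which delegates with rowNum 0) and DeserializeInto, which appends into a caller-owned InsertData and uses rowNum as a capacity hint so each field's backing slice is allocated once. When rowNum <= 0, the hint is back-filled from the decoded length (rowNum = totalLength), so later fields in the same call still benefit. A hedged caller sketch, assuming the internal/storage package as modified above; the decodeField helper is hypothetical and not part of this diff, it merely mirrors what loadSealedFieldAsync now does:

    // decodeField illustrates the decode-into pattern introduced above.
    func decodeField(blobs []*storage.Blob, numRows int64) (*storage.InsertData, error) {
        codec := storage.InsertCodec{}

        // The caller owns the destination; DeserializeInto appends into it and
        // sizes each field's slice once, using numRows as the capacity hint.
        insertData := storage.InsertData{
            Data: make(map[int64]storage.FieldData),
        }
        if _, _, _, err := codec.DeserializeInto(blobs, int(numRows), &insertData); err != nil {
            return nil, err
        }
        return &insertData, nil
    }

Compared with the old Deserialize path, this avoids allocating a fresh InsertData inside the codec and the repeated slice growth during append, which matters when many field binlogs of a sealed segment are deserialized concurrently.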