From 512ea6be5fff77fdafe37386a5097d92bb213a21 Mon Sep 17 00:00:00 2001
From: congqixia
Date: Thu, 13 Jun 2024 11:15:56 +0800
Subject: [PATCH] enhance: Avoid merging insert data when buffering insert
 msgs (#33562)

See also #33561

This PR:
- Uses zero-copy buffering for insert messages: incoming chunks are kept
  as-is instead of being merged into one accumulated buffer
- Makes `storage.InsertCodec` support serializing multiple insert data
  chunks into the same batch of binlog files

Signed-off-by: Congqi Xia

---------

Signed-off-by: Congqi Xia
---
 internal/datanode/importv2/util.go            |   2 +-
 .../iterators/binlog_iterator_test.go         |  14 +
 .../metacache/bloom_filter_set_test.go        |   2 +-
 internal/datanode/syncmgr/serializer.go       |  10 +-
 .../datanode/syncmgr/storage_serializer.go    |  23 +-
 .../syncmgr/storage_serializer_test.go        |   8 +-
 .../datanode/syncmgr/storage_v2_serializer.go |   8 +-
 .../syncmgr/storage_v2_serializer_test.go     |   8 +-
 internal/datanode/syncmgr/task_test.go        |   4 +-
 internal/datanode/syncmgr/taskv2_test.go      |   4 +-
 .../datanode/writebuffer/insert_buffer.go     |  35 +-
 .../writebuffer/insert_buffer_test.go         |  23 +-
 .../datanode/writebuffer/segment_buffer.go    |   2 +-
 internal/datanode/writebuffer/write_buffer.go |  10 +-
 internal/indexnode/chunkmgr_mock.go           |   4 +
 internal/storage/data_codec.go                | 299 ++++++++++--------
 internal/storage/insert_data.go               |  42 +--
 internal/storage/print_binlog_test.go         |  12 +
 .../util/importutilv2/binlog/field_reader.go  |   2 +-
 .../util/importutilv2/binlog/reader_test.go   |   2 +-
 20 files changed, 290 insertions(+), 224 deletions(-)

diff --git a/internal/datanode/importv2/util.go b/internal/datanode/importv2/util.go
index 1098b3e9edf9..400a339af5a0 100644
--- a/internal/datanode/importv2/util.go
+++ b/internal/datanode/importv2/util.go
@@ -79,7 +79,7 @@ func NewSyncTask(ctx context.Context,
 	}
 
 	syncPack := &syncmgr.SyncPack{}
-	syncPack.WithInsertData(insertData).
+	syncPack.WithInsertData([]*storage.InsertData{insertData}).
 		WithDeleteData(deleteData).
 		WithCollectionID(collectionID).
 		WithPartitionID(partitionID).
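
The two bullets in the commit message above are easiest to see side by side. Before this change, every buffered insert message was merged field-by-field into one accumulated `storage.InsertData`, copying each field slice; after it, the buffer keeps the incoming chunks as-is and the codec accepts the whole slice. A minimal sketch of the two flows (illustrative only: `schema`, `chunks`, `codec`, `partitionID`, and `segmentID` are assumed to be set up elsewhere, they are not part of this patch):

	// before: each chunk is copied into one merged InsertData
	merged, _ := storage.NewInsertData(schema)
	for _, chunk := range chunks {
		storage.MergeInsertData(merged, chunk) // copies every field slice
	}
	blobs, err := codec.Serialize(partitionID, segmentID, merged)

	// after: chunks are buffered by reference and serialized in one pass
	blobs, err = codec.Serialize(partitionID, segmentID, chunks...)
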
diff --git a/internal/datanode/iterators/binlog_iterator_test.go b/internal/datanode/iterators/binlog_iterator_test.go
index 3bb747d15c88..08491db87de8 100644
--- a/internal/datanode/iterators/binlog_iterator_test.go
+++ b/internal/datanode/iterators/binlog_iterator_test.go
@@ -5,9 +5,11 @@ import (
 
 	"github.com/stretchr/testify/suite"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/storage"
+	"github.com/milvus-io/milvus/pkg/common"
 )
 
 func TestInsertBinlogIteratorSuite(t *testing.T) {
@@ -223,6 +225,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
 				IsPrimaryKey: false,
 				Description:  "binary_vector",
 				DataType:     schemapb.DataType_BinaryVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "8"},
+				},
 			},
 			{
 				FieldID:      FloatVectorField,
@@ -230,6 +235,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
 				IsPrimaryKey: false,
 				Description:  "float_vector",
 				DataType:     schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "4"},
+				},
 			},
 			{
 				FieldID:      Float16VectorField,
@@ -237,6 +245,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
 				IsPrimaryKey: false,
 				Description:  "float16_vector",
 				DataType:     schemapb.DataType_Float16Vector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "4"},
+				},
 			},
 			{
 				FieldID:      BFloat16VectorField,
@@ -244,6 +255,9 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
 				IsPrimaryKey: false,
 				Description:  "bfloat16_vector",
 				DataType:     schemapb.DataType_BFloat16Vector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "4"},
+				},
 			},
 		},
 	},
diff --git a/internal/datanode/metacache/bloom_filter_set_test.go b/internal/datanode/metacache/bloom_filter_set_test.go
index 630133075e83..885eb1d37afc 100644
--- a/internal/datanode/metacache/bloom_filter_set_test.go
+++ b/internal/datanode/metacache/bloom_filter_set_test.go
@@ -46,7 +46,7 @@ func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData {
 		Name:         "ID",
 		IsPrimaryKey: true,
 		DataType:     schemapb.DataType_Int64,
-	})
+	}, len(ids))
 	s.Require().NoError(err)
 
 	for _, id := range ids {
diff --git a/internal/datanode/syncmgr/serializer.go b/internal/datanode/syncmgr/serializer.go
index 60217277d6e4..cd7be9d06208 100644
--- a/internal/datanode/syncmgr/serializer.go
+++ b/internal/datanode/syncmgr/serializer.go
@@ -19,6 +19,8 @@ package syncmgr
 import (
 	"context"
 
+	"github.com/samber/lo"
+
 	"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
 	"github.com/milvus-io/milvus/internal/datanode/metacache"
 	"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -37,7 +39,7 @@ type SyncPack struct {
 	metacache  metacache.MetaCache
 	metawriter MetaWriter
 	// data
-	insertData *storage.InsertData
+	insertData []*storage.InsertData
 	deltaData  *storage.DeleteData
 	// statistics
 	tsFrom typeutil.Timestamp
@@ -55,8 +57,10 @@ type SyncPack struct {
 	level        datapb.SegmentLevel
 }
 
-func (p *SyncPack) WithInsertData(insertData *storage.InsertData) *SyncPack {
-	p.insertData = insertData
+func (p *SyncPack) WithInsertData(insertData []*storage.InsertData) *SyncPack {
+	p.insertData = lo.Filter(insertData, func(inData *storage.InsertData, _ int) bool {
+		return inData != nil
+	})
 	return p
 }
 
diff --git a/internal/datanode/syncmgr/storage_serializer.go b/internal/datanode/syncmgr/storage_serializer.go
index 35c0789adf8e..352246278497 100644
--- a/internal/datanode/syncmgr/storage_serializer.go
+++ b/internal/datanode/syncmgr/storage_serializer.go
@@ -82,10 +82,12 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
 		zap.String("channel", pack.channelName),
 	)
 
-	if pack.insertData != nil {
+	if len(pack.insertData) > 0 {
 		memSize := make(map[int64]int64)
-		for fieldID, fieldData := range pack.insertData.Data {
-			memSize[fieldID] = int64(fieldData.GetMemorySize())
+		for _, chunk := range pack.insertData {
+			for fieldID, fieldData := range chunk.Data {
+				memSize[fieldID] += int64(fieldData.GetMemorySize())
+			}
 		}
 		task.binlogMemsize = memSize
 
@@ -159,7 +161,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) {
 
 func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) {
 	log := log.Ctx(ctx)
-	blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData)
+	blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...)
 	if err != nil {
 		return nil, err
 	}
@@ -178,14 +180,21 @@ func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPac
 }
 
 func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) {
-	pkFieldData := pack.insertData.Data[s.pkField.GetFieldID()]
-	rowNum := int64(pkFieldData.RowNum())
+	var rowNum int64
+	var pkFieldData []storage.FieldData
+	for _, chunk := range pack.insertData {
+		chunkPKData := chunk.Data[s.pkField.GetFieldID()]
+		pkFieldData = append(pkFieldData, chunkPKData)
+		rowNum += int64(chunkPKData.RowNum())
+	}
 
 	stats, err := storage.NewPrimaryKeyStats(s.pkField.GetFieldID(), int64(s.pkField.GetDataType()), rowNum)
 	if err != nil {
 		return nil, nil, err
 	}
-	stats.UpdateByMsgs(pkFieldData)
+	for _, chunkPkData := range pkFieldData {
+		stats.UpdateByMsgs(chunkPkData)
+	}
 
 	blob, err := s.inCodec.SerializePkStats(stats, pack.batchSize)
 	if err != nil {
diff --git a/internal/datanode/syncmgr/storage_serializer_test.go b/internal/datanode/syncmgr/storage_serializer_test.go
index 43894b47b19b..226d41f015f1 100644
--- a/internal/datanode/syncmgr/storage_serializer_test.go
+++ b/internal/datanode/syncmgr/storage_serializer_test.go
@@ -160,7 +160,7 @@ func (s *StorageV1SerializerSuite) getBfs() *metacache.BloomFilterSet {
 		Name:         "ID",
 		IsPrimaryKey: true,
 		DataType:     schemapb.DataType_Int64,
-	})
+	}, 16)
 	s.Require().NoError(err)
 
 	ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -200,7 +200,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
 	s.Run("with_empty_data", func() {
 		pack := s.getBasicPack()
 		pack.WithTimeRange(50, 100)
-		pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0)
+		pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0)
 
 		_, err := s.serializer.EncodeBuffer(ctx, pack)
 		s.Error(err)
@@ -209,7 +209,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
 	s.Run("with_normal_data", func() {
 		pack := s.getBasicPack()
 		pack.WithTimeRange(50, 100)
-		pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
+		pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
 
 		s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()
 
@@ -243,7 +243,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
 	s.Run("with_flush", func() {
 		pack := s.getBasicPack()
 		pack.WithTimeRange(50, 100)
-		pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
+		pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
 		pack.WithFlush()
 
 		bfs := s.getBfs()
diff --git a/internal/datanode/syncmgr/storage_v2_serializer.go b/internal/datanode/syncmgr/storage_v2_serializer.go
index 36ed26abc409..ad56e7b94caf 100644
--- a/internal/datanode/syncmgr/storage_v2_serializer.go
+++ b/internal/datanode/syncmgr/storage_v2_serializer.go
@@ -82,7 +82,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
 	}
 	task.space = space
 
-	if pack.insertData != nil {
+	if len(pack.insertData) > 0 {
 		insertReader, err := s.serializeInsertData(pack)
 		if err != nil {
 			log.Warn("failed to serialize insert data with storagev2", zap.Error(err))
@@ -155,8 +155,10 @@ func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordR
 	builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema)
 	defer builder.Release()
 
-	if err := iTypeutil.BuildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil {
-		return nil, err
+	for _, chunk := range pack.insertData {
+		if err := iTypeutil.BuildRecord(builder, chunk, s.schema.GetFields()); err != nil {
+			return nil, err
+		}
 	}
 
 	rec := builder.NewRecord()
diff --git a/internal/datanode/syncmgr/storage_v2_serializer_test.go b/internal/datanode/syncmgr/storage_v2_serializer_test.go
index e430de00ab56..ad4bb12db5e4 100644
--- a/internal/datanode/syncmgr/storage_v2_serializer_test.go
+++ b/internal/datanode/syncmgr/storage_v2_serializer_test.go
@@ -179,7 +179,7 @@ func (s *StorageV2SerializerSuite) getBfs() *metacache.BloomFilterSet {
 		Name:         "ID",
 		IsPrimaryKey: true,
 		DataType:     schemapb.DataType_Int64,
-	})
+	}, 16)
 	s.Require().NoError(err)
 
 	ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -221,7 +221,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
 	s.Run("empty_insert_data", func() {
 		pack := s.getBasicPack()
 		pack.WithTimeRange(50, 100)
-		pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0)
+		pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0)
 
 		_, err := s.serializer.EncodeBuffer(ctx, pack)
 		s.Error(err)
@@ -230,7 +230,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
 	s.Run("with_normal_data", func() {
 		pack := s.getBasicPack()
 		pack.WithTimeRange(50, 100)
-		pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
+		pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
 
 		s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()
 
@@ -264,7 +264,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
 	s.Run("with_flush", func() {
 		pack := s.getBasicPack()
 		pack.WithTimeRange(50, 100)
-		pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
+		pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
 		pack.WithFlush()
 
 		bfs := s.getBfs()
diff --git a/internal/datanode/syncmgr/task_test.go b/internal/datanode/syncmgr/task_test.go
index 5c932265a0de..98c543943c7f 100644
--- a/internal/datanode/syncmgr/task_test.go
+++ b/internal/datanode/syncmgr/task_test.go
@@ -171,7 +171,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
 		Name:         "ID",
 		IsPrimaryKey: true,
 		DataType:     schemapb.DataType_Int64,
-	})
+	}, 16)
 	s.Require().NoError(err)
 
 	ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -299,7 +299,7 @@ func (s *SyncTaskSuite) TestCompactToNull() {
 		Name:         "ID",
 		IsPrimaryKey: true,
 		DataType:     schemapb.DataType_Int64,
-	})
+	}, 16)
 	s.Require().NoError(err)
 
 	ids := []int64{1, 2, 3, 4, 5, 6, 7}
diff --git a/internal/datanode/syncmgr/taskv2_test.go b/internal/datanode/syncmgr/taskv2_test.go
index bb8b36619129..c9d2fc302c7b 100644
--- a/internal/datanode/syncmgr/taskv2_test.go
+++ b/internal/datanode/syncmgr/taskv2_test.go
@@ -176,7 +176,7 @@ func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 {
 		Timestamp:   1000,
 		ChannelName: s.channelName,
 	})
-	pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
+	pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
 	pack.WithDeleteData(s.getDeleteBuffer())
 
 	storageCache, err := metacache.NewStorageV2Cache(s.schema)
@@ -203,7 +203,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
 		Name:         "ID",
 		IsPrimaryKey: true,
 		DataType:     schemapb.DataType_Int64,
-	})
+	}, 16)
 	s.Require().NoError(err)
 
 	ids := []int64{1, 2, 3, 4, 5, 6, 7}
diff --git a/internal/datanode/writebuffer/insert_buffer.go b/internal/datanode/writebuffer/insert_buffer.go
index 417c258b34b4..c17beba4bddb 100644
--- a/internal/datanode/writebuffer/insert_buffer.go
+++ b/internal/datanode/writebuffer/insert_buffer.go
@@ -74,7 +74,7 @@ type InsertBuffer struct {
 	BufferBase
 	collSchema *schemapb.CollectionSchema
 
-	buffer *storage.InsertData
+	buffers []*storage.InsertData
 }
 
 func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
@@ -87,13 +87,10 @@ func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
 	if estSize == 0 {
 		return nil, errors.New("Invalid schema")
 	}
-	buffer, err := storage.NewInsertData(sch)
-	if err != nil {
-		return nil, err
-	}
+
 	sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()
 
-	return &InsertBuffer{
+	ib := &InsertBuffer{
 		BufferBase: BufferBase{
 			rowLimit:      noLimit,
 			sizeLimit:     sizeLimit,
@@ -101,26 +98,32 @@ func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
 			TimestampTo:   0,
 		},
 		collSchema: sch,
-		buffer:     buffer,
-	}, nil
+	}
+
+	return ib, nil
 }
 
-func (ib *InsertBuffer) Yield() *storage.InsertData {
-	if ib.IsEmpty() {
-		return nil
-	}
+func (ib *InsertBuffer) buffer(inData *storage.InsertData, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
+	// append the chunk as-is; merging into one accumulated buffer is no longer needed
+	ib.buffers = append(ib.buffers, inData)
+}
 
-	return ib.buffer
+func (ib *InsertBuffer) Yield() []*storage.InsertData {
+	result := ib.buffers
+	// set buffers to nil so that the yielded chunks can be GCed once synced
+	ib.buffers = nil
+	return result
 }
 
 func (ib *InsertBuffer) Buffer(inData *inData, startPos, endPos *msgpb.MsgPosition) int64 {
 	bufferedSize := int64(0)
 	for idx, data := range inData.data {
-		storage.MergeInsertData(ib.buffer, data)
 		tsData := inData.tsField[idx]
+		tr := ib.getTimestampRange(tsData)
+		ib.buffer(data, tr, startPos, endPos)
 
 		// update buffer size
-		ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
+		ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), tr, startPos, endPos)
 		bufferedSize += int64(data.GetMemorySize())
 	}
 	return bufferedSize
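
Note that statistics are updated once per chunk in `Buffer` only; the `buffer` helper just appends, so rows and sizes are not double-counted. Since `Yield` now returns the buffered chunks instead of one merged buffer, downstream consumers iterate over the slice; a nil result still means nothing was buffered. A sketch of the consuming side (illustrative; `buf` is an assumed `*InsertBuffer`):

	chunks := buf.Yield() // nil when the buffer was empty
	var rows int64
	for _, chunk := range chunks {
		rows += int64(chunk.GetRowNum())
	}
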
diff --git a/internal/datanode/writebuffer/insert_buffer_test.go b/internal/datanode/writebuffer/insert_buffer_test.go
index 9828ba6b88c3..a55b286c88dc 100644
--- a/internal/datanode/writebuffer/insert_buffer_test.go
+++ b/internal/datanode/writebuffer/insert_buffer_test.go
@@ -168,9 +168,12 @@ func (s *InsertBufferSuite) TestYield() {
 	result = insertBuffer.Yield()
 	s.NotNil(result)
 
-	pkField, ok := result.Data[common.StartOfUserFieldID]
-	s.Require().True(ok)
-	pkData := lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })
+	var pkData []int64
+	for _, chunk := range result {
+		pkField, ok := chunk.Data[common.StartOfUserFieldID]
+		s.Require().True(ok)
+		pkData = append(pkData, lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })...)
+	}
 
 	s.ElementsMatch(pks, pkData)
 }
@@ -232,20 +235,6 @@ func (s *InsertBufferConstructSuite) TestCreateFailure() {
 				Fields: []*schemapb.FieldSchema{},
 			},
 		},
-		{
-			tag: "missing_type_param",
-			schema: &schemapb.CollectionSchema{
-				Name: "test_collection",
-				Fields: []*schemapb.FieldSchema{
-					{
-						FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
-					},
-					{
-						FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
-					},
-				},
-			},
-		},
 	}
 	for _, tc := range cases {
 		s.Run(tc.tag, func() {
diff --git a/internal/datanode/writebuffer/segment_buffer.go b/internal/datanode/writebuffer/segment_buffer.go
index 58ec2b4afda6..6afd64fff7fa 100644
--- a/internal/datanode/writebuffer/segment_buffer.go
+++ b/internal/datanode/writebuffer/segment_buffer.go
@@ -32,7 +32,7 @@ func (buf *segmentBuffer) IsFull() bool {
 	return buf.insertBuffer.IsFull() || buf.deltaBuffer.IsFull()
 }
 
-func (buf *segmentBuffer) Yield() (insert *storage.InsertData, delete *storage.DeleteData) {
+func (buf *segmentBuffer) Yield() (insert []*storage.InsertData, delete *storage.DeleteData) {
 	return buf.insertBuffer.Yield(), buf.deltaBuffer.Yield()
 }
 
diff --git a/internal/datanode/writebuffer/write_buffer.go b/internal/datanode/writebuffer/write_buffer.go
index 0b68190f4baf..e602ad6243a7 100644
--- a/internal/datanode/writebuffer/write_buffer.go
+++ b/internal/datanode/writebuffer/write_buffer.go
@@ -404,7 +404,7 @@ func (wb *writeBufferBase) getOrCreateBuffer(segmentID int64) *segmentBuffer {
 	return buffer
 }
 
-func (wb *writeBufferBase) yieldBuffer(segmentID int64) (*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
+func (wb *writeBufferBase) yieldBuffer(segmentID int64) ([]*storage.InsertData, *storage.DeleteData, *TimeRange, *msgpb.MsgPosition) {
 	buffer, ok := wb.buffers[segmentID]
 	if !ok {
 		return nil, nil, nil, nil
@@ -578,10 +578,12 @@ func (wb *writeBufferBase) getSyncTask(ctx context.Context, segmentID int64) (sy
 	}
 
 	actions := []metacache.SegmentAction{}
-	if insert != nil {
-		batchSize = int64(insert.GetRowNum())
-		totalMemSize += float64(insert.GetMemorySize())
+
+	for _, chunk := range insert {
+		batchSize += int64(chunk.GetRowNum())
+		totalMemSize += float64(chunk.GetMemorySize())
 	}
+
 	if delta != nil {
 		totalMemSize += float64(delta.Size())
 	}
diff --git a/internal/indexnode/chunkmgr_mock.go b/internal/indexnode/chunkmgr_mock.go
index 2639054ce406..a839ae79fcea 100644
--- a/internal/indexnode/chunkmgr_mock.go
+++ b/internal/indexnode/chunkmgr_mock.go
@@ -9,6 +9,7 @@ import (
 
 	"golang.org/x/exp/mmap"
 
+	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
@@ -66,6 +67,9 @@ var (
 					Description:  "",
 					DataType:     schemapb.DataType_FloatVector,
 					AutoID:       false,
+					TypeParams: []*commonpb.KeyValuePair{
+						{Key: common.DimKey, Value: "8"},
+					},
 				},
 			},
 		},
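
The mock schema above needs an explicit dim because of the codec change that follows: vector dimensions are now resolved from the field schema via `typeutil.GetDim` instead of being read off the in-memory field data, so any schema handed to `InsertCodec.Serialize` must carry a `common.DimKey` type param (hence the same additions in the test fixtures earlier in this patch). A sketch of what the codec now relies on (illustrative only):

	field := &schemapb.FieldSchema{
		FieldID:  101,
		DataType: schemapb.DataType_FloatVector,
		TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "8"},
		},
	}
	dim, err := typeutil.GetDim(field) // 8; errors if the dim type param is missing
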
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index fac23ee34452..757d683b9356 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -213,58 +213,70 @@ func (insertCodec *InsertCodec) SerializePkStatsByData(data *InsertData) (*Blob,
 // From schema, it gets all fields.
 // For each field, it will create a binlog writer, and write an event to the binlog.
 // It returns binlog buffer in the end.
-func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data *InsertData) ([]*Blob, error) {
+func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID UniqueID, data ...*InsertData) ([]*Blob, error) {
 	blobs := make([]*Blob, 0)
 	var writer *InsertBinlogWriter
 	if insertCodec.Schema == nil {
 		return nil, fmt.Errorf("schema is not set")
 	}
 
-	timeFieldData, ok := data.Data[common.TimeStampField]
-	if !ok {
-		return nil, fmt.Errorf("data doesn't contains timestamp field")
-	}
-	if timeFieldData.RowNum() <= 0 {
-		return nil, fmt.Errorf("there's no data in InsertData")
-	}
-	rowNum := int64(timeFieldData.RowNum())
-
-	ts := timeFieldData.(*Int64FieldData).Data
+	var rowNum int64
 	var startTs, endTs Timestamp
 	startTs, endTs = math.MaxUint64, 0
-	for _, t := range ts {
-		if uint64(t) > endTs {
-			endTs = uint64(t)
-		}
-		if uint64(t) < startTs {
-			startTs = uint64(t)
+
+	for _, block := range data {
+		timeFieldData, ok := block.Data[common.TimeStampField]
+		if !ok {
+			return nil, fmt.Errorf("data doesn't contain timestamp field")
 		}
-	}
 
-	// sort insert data by rowID
-	dataSorter := &DataSorter{
-		InsertCodec: insertCodec,
-		InsertData:  data,
+		rowNum += int64(timeFieldData.RowNum())
+
+		ts := timeFieldData.(*Int64FieldData).Data
+
+		for _, t := range ts {
+			if uint64(t) > endTs {
+				endTs = uint64(t)
+			}
+
+			if uint64(t) < startTs {
+				startTs = uint64(t)
+			}
+		}
 	}
-	sort.Sort(dataSorter)
 
 	for _, field := range insertCodec.Schema.Schema.Fields {
-		singleData := data.Data[field.FieldID]
-
 		// encode fields
 		writer = NewInsertBinlogWriter(field.DataType, insertCodec.Schema.ID, partitionID, segmentID, field.FieldID)
 		var eventWriter *insertEventWriter
 		var err error
+		var dim int64
 		if typeutil.IsVectorType(field.DataType) {
 			switch field.DataType {
 			case schemapb.DataType_FloatVector:
-				eventWriter, err = writer.NextInsertEventWriter(singleData.(*FloatVectorFieldData).Dim)
+				dim, err = typeutil.GetDim(field)
+				if err != nil {
+					return nil, err
+				}
+				eventWriter, err = writer.NextInsertEventWriter(int(dim))
 			case schemapb.DataType_BinaryVector:
-				eventWriter, err = writer.NextInsertEventWriter(singleData.(*BinaryVectorFieldData).Dim)
+				dim, err = typeutil.GetDim(field)
+				if err != nil {
+					return nil, err
+				}
+				eventWriter, err = writer.NextInsertEventWriter(int(dim))
 			case schemapb.DataType_Float16Vector:
-				eventWriter, err = writer.NextInsertEventWriter(singleData.(*Float16VectorFieldData).Dim)
+				dim, err = typeutil.GetDim(field)
+				if err != nil {
+					return nil, err
+				}
+				eventWriter, err = writer.NextInsertEventWriter(int(dim))
 			case schemapb.DataType_BFloat16Vector:
-				eventWriter, err = writer.NextInsertEventWriter(singleData.(*BFloat16VectorFieldData).Dim)
+				dim, err = typeutil.GetDim(field)
+				if err != nil {
+					return nil, err
+				}
+				eventWriter, err = writer.NextInsertEventWriter(int(dim))
 			case schemapb.DataType_SparseFloatVector:
 				eventWriter, err = writer.NextInsertEventWriter()
 			default:
@@ -277,137 +289,146 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 			writer.Close()
 			return nil, err
 		}
 		eventWriter.SetEventTimestamp(startTs, endTs)
-		switch field.DataType {
-		case schemapb.DataType_Bool:
-			err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BoolFieldData).GetMemorySize()))
-		case schemapb.DataType_Int8:
-			err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int8FieldData).GetMemorySize()))
-		case schemapb.DataType_Int16:
-			err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int16FieldData).GetMemorySize()))
-		case schemapb.DataType_Int32:
-			err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int32FieldData).GetMemorySize()))
-		case schemapb.DataType_Int64:
-			err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Int64FieldData).GetMemorySize()))
-		case schemapb.DataType_Float:
-			err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatFieldData).GetMemorySize()))
-		case schemapb.DataType_Double:
-			err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*DoubleFieldData).GetMemorySize()))
-		case schemapb.DataType_String, schemapb.DataType_VarChar:
-			for _, singleString := range singleData.(*StringFieldData).Data {
-				err = eventWriter.AddOneStringToPayload(singleString)
-				if err != nil {
-					eventWriter.Close()
-					writer.Close()
-					return nil, err
-				}
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*StringFieldData).GetMemorySize()))
-		case schemapb.DataType_Array:
-			for _, singleArray := range singleData.(*ArrayFieldData).Data {
-				err = eventWriter.AddOneArrayToPayload(singleArray)
-				if err != nil {
-					eventWriter.Close()
-					writer.Close()
-					return nil, err
-				}
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*ArrayFieldData).GetMemorySize()))
-		case schemapb.DataType_JSON:
-			for _, singleJSON := range singleData.(*JSONFieldData).Data {
-				err = eventWriter.AddOneJSONToPayload(singleJSON)
-				if err != nil {
-					eventWriter.Close()
-					writer.Close()
-					return nil, err
-				}
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*JSONFieldData).GetMemorySize()))
-		case schemapb.DataType_BinaryVector:
-			err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BinaryVectorFieldData).GetMemorySize()))
-		case schemapb.DataType_FloatVector:
-			err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*FloatVectorFieldData).GetMemorySize()))
-		case schemapb.DataType_Float16Vector:
-			err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim)
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*Float16VectorFieldData).GetMemorySize()))
-		case schemapb.DataType_BFloat16Vector:
-			err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim)
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*BFloat16VectorFieldData).GetMemorySize()))
-		case schemapb.DataType_SparseFloatVector:
-			err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData))
-			if err != nil {
-				eventWriter.Close()
-				writer.Close()
-				return nil, err
-			}
-			writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", singleData.(*SparseFloatVectorFieldData).GetMemorySize()))
-		default:
-			return nil, fmt.Errorf("undefined data type %d", field.DataType)
-		}
-		if err != nil {
-			return nil, err
-		}
-
-		writer.SetEventTimeStamp(startTs, endTs)
+
+		var memorySize int64
+		for _, block := range data {
+			singleData := block.Data[field.FieldID]
+
+			blockMemorySize := singleData.GetMemorySize()
+			memorySize += int64(blockMemorySize)
+
+			switch field.DataType {
+			case schemapb.DataType_Bool:
+				err = eventWriter.AddBoolToPayload(singleData.(*BoolFieldData).Data)
+				if err != nil {
+					eventWriter.Close()
+					writer.Close()
+					return nil, err
+				}
+
+				writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
+			case schemapb.DataType_Int8:
+				err = eventWriter.AddInt8ToPayload(singleData.(*Int8FieldData).Data)
+				if err != nil {
+					eventWriter.Close()
+					writer.Close()
+					return nil, err
+				}
+				writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
+			case schemapb.DataType_Int16:
+				err = eventWriter.AddInt16ToPayload(singleData.(*Int16FieldData).Data)
+				if err != nil {
+					eventWriter.Close()
+					writer.Close()
+					return nil, err
+				}
+				writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_Int32: + err = eventWriter.AddInt32ToPayload(singleData.(*Int32FieldData).Data) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_Int64: + err = eventWriter.AddInt64ToPayload(singleData.(*Int64FieldData).Data) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_Float: + err = eventWriter.AddFloatToPayload(singleData.(*FloatFieldData).Data) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_Double: + err = eventWriter.AddDoubleToPayload(singleData.(*DoubleFieldData).Data) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_String, schemapb.DataType_VarChar: + for _, singleString := range singleData.(*StringFieldData).Data { + err = eventWriter.AddOneStringToPayload(singleString) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_Array: + for _, singleArray := range singleData.(*ArrayFieldData).Data { + err = eventWriter.AddOneArrayToPayload(singleArray) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_JSON: + for _, singleJSON := range singleData.(*JSONFieldData).Data { + err = eventWriter.AddOneJSONToPayload(singleJSON) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_BinaryVector: + err = eventWriter.AddBinaryVectorToPayload(singleData.(*BinaryVectorFieldData).Data, singleData.(*BinaryVectorFieldData).Dim) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_FloatVector: + err = eventWriter.AddFloatVectorToPayload(singleData.(*FloatVectorFieldData).Data, singleData.(*FloatVectorFieldData).Dim) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_Float16Vector: + err = eventWriter.AddFloat16VectorToPayload(singleData.(*Float16VectorFieldData).Data, singleData.(*Float16VectorFieldData).Dim) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_BFloat16Vector: + err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim) + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + case schemapb.DataType_SparseFloatVector: + err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData)) + if err != nil { + eventWriter.Close() + writer.Close() + return nil, err + } + writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize)) + default: + return nil, fmt.Errorf("undefined data type %d", 
+			case schemapb.DataType_BFloat16Vector:
+				err = eventWriter.AddBFloat16VectorToPayload(singleData.(*BFloat16VectorFieldData).Data, singleData.(*BFloat16VectorFieldData).Dim)
+				writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
+			case schemapb.DataType_SparseFloatVector:
+				err = eventWriter.AddSparseFloatVectorToPayload(singleData.(*SparseFloatVectorFieldData))
+				if err != nil {
+					eventWriter.Close()
+					writer.Close()
+					return nil, err
+				}
+				writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", blockMemorySize))
+			default:
+				return nil, fmt.Errorf("undefined data type %d", field.DataType)
+			}
+			if err != nil {
+				return nil, err
+			}
+			writer.SetEventTimeStamp(startTs, endTs)
+		}
 
 		err = writer.Finish()
 		if err != nil {
@@ -427,7 +448,7 @@ func (insertCodec *InsertCodec) Serialize(partitionID UniqueID, segmentID Unique
 			Key:        blobKey,
 			Value:      buffer,
 			RowNum:     rowNum,
-			MemorySize: int64(singleData.GetMemorySize()),
+			MemorySize: memorySize,
 		})
 		eventWriter.Close()
 		writer.Close()
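
With the variadic signature, one `Serialize` call can now cover several buffered chunks while still producing a single batch of binlog blobs, one per field. A usage sketch (illustrative; `meta` is an assumed `*etcdpb.CollectionMeta`, and `chunkA`/`chunkB` are assumed to share its schema):

	codec := storage.NewInsertCodecWithSchema(meta)
	blobs, err := codec.Serialize(partitionID, segmentID, chunkA, chunkB)
	if err != nil {
		panic(err) // e.g. a chunk missing the timestamp field fails the whole batch
	}
	// each blob's RowNum spans all chunks; MemorySize sums the per-chunk field sizes
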
diff --git a/internal/storage/insert_data.go b/internal/storage/insert_data.go
index 23e10e529066..7d2de208a9dd 100644
--- a/internal/storage/insert_data.go
+++ b/internal/storage/insert_data.go
@@ -50,20 +50,24 @@ type InsertData struct {
 }
 
 func NewInsertData(schema *schemapb.CollectionSchema) (*InsertData, error) {
+	return NewInsertDataWithCap(schema, 0)
+}
+
+func NewInsertDataWithCap(schema *schemapb.CollectionSchema, cap int) (*InsertData, error) {
 	if schema == nil {
-		return nil, fmt.Errorf("Nil input schema")
+		return nil, merr.WrapErrParameterMissing("collection schema")
 	}
 
 	idata := &InsertData{
 		Data: make(map[FieldID]FieldData),
 	}
 
-	for _, fSchema := range schema.Fields {
-		fieldData, err := NewFieldData(fSchema.DataType, fSchema)
+	for _, field := range schema.GetFields() {
+		fieldData, err := NewFieldData(field.DataType, field, cap)
 		if err != nil {
 			return nil, err
 		}
-		idata.Data[fSchema.FieldID] = fieldData
+		idata.Data[field.FieldID] = fieldData
 	}
 	return idata, nil
 }
@@ -147,7 +151,7 @@ type FieldData interface {
 	GetDataType() schemapb.DataType
 }
 
-func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema) (FieldData, error) {
+func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema, cap int) (FieldData, error) {
 	typeParams := fieldSchema.GetTypeParams()
 	switch dataType {
 	case schemapb.DataType_Float16Vector:
@@ -156,7 +160,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
 			return nil, err
 		}
 		return &Float16VectorFieldData{
-			Data: make([]byte, 0),
+			Data: make([]byte, 0, cap),
 			Dim:  dim,
 		}, nil
 	case schemapb.DataType_BFloat16Vector:
@@ -165,7 +169,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
 			return nil, err
 		}
 		return &BFloat16VectorFieldData{
-			Data: make([]byte, 0),
+			Data: make([]byte, 0, cap),
 			Dim:  dim,
 		}, nil
 	case schemapb.DataType_FloatVector:
@@ -174,7 +178,7 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
 			return nil, err
 		}
 		return &FloatVectorFieldData{
-			Data: make([]float32, 0),
+			Data: make([]float32, 0, cap),
 			Dim:  dim,
 		}, nil
 	case schemapb.DataType_BinaryVector:
@@ -183,56 +187,56 @@ func NewFieldData(dataType schemapb.DataType, fieldSchema *schemapb.FieldSchema)
 			return nil, err
 		}
 		return &BinaryVectorFieldData{
-			Data: make([]byte, 0),
+			Data: make([]byte, 0, cap),
 			Dim:  dim,
 		}, nil
 	case schemapb.DataType_SparseFloatVector:
 		return &SparseFloatVectorFieldData{}, nil
 	case schemapb.DataType_Bool:
 		return &BoolFieldData{
-			Data: make([]bool, 0),
+			Data: make([]bool, 0, cap),
 		}, nil
 	case schemapb.DataType_Int8:
 		return &Int8FieldData{
-			Data: make([]int8, 0),
+			Data: make([]int8, 0, cap),
 		}, nil
 	case schemapb.DataType_Int16:
 		return &Int16FieldData{
-			Data: make([]int16, 0),
+			Data: make([]int16, 0, cap),
 		}, nil
 	case schemapb.DataType_Int32:
 		return &Int32FieldData{
-			Data: make([]int32, 0),
+			Data: make([]int32, 0, cap),
 		}, nil
 	case schemapb.DataType_Int64:
 		return &Int64FieldData{
-			Data: make([]int64, 0),
+			Data: make([]int64, 0, cap),
 		}, nil
 	case schemapb.DataType_Float:
 		return &FloatFieldData{
-			Data: make([]float32, 0),
+			Data: make([]float32, 0, cap),
 		}, nil
 	case schemapb.DataType_Double:
 		return &DoubleFieldData{
-			Data: make([]float64, 0),
+			Data: make([]float64, 0, cap),
 		}, nil
 	case schemapb.DataType_JSON:
 		return &JSONFieldData{
-			Data: make([][]byte, 0),
+			Data: make([][]byte, 0, cap),
 		}, nil
 	case schemapb.DataType_Array:
 		return &ArrayFieldData{
-			Data:        make([]*schemapb.ScalarField, 0),
+			Data:        make([]*schemapb.ScalarField, 0, cap),
 			ElementType: fieldSchema.GetElementType(),
 		}, nil
 	case schemapb.DataType_String, schemapb.DataType_VarChar:
 		return &StringFieldData{
-			Data:     make([]string, 0),
+			Data:     make([]string, 0, cap),
 			DataType: dataType,
 		}, nil
 	default:
diff --git a/internal/storage/print_binlog_test.go b/internal/storage/print_binlog_test.go
index 127b39e403c7..89cefe1c4e95 100644
--- a/internal/storage/print_binlog_test.go
+++ b/internal/storage/print_binlog_test.go
@@ -169,6 +169,9 @@ func TestPrintBinlogFiles(t *testing.T) {
 				IsPrimaryKey: false,
 				Description:  "description_10",
 				DataType:     schemapb.DataType_BinaryVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "8"},
+				},
 			},
 			{
 				FieldID:      109,
@@ -176,6 +179,9 @@ func TestPrintBinlogFiles(t *testing.T) {
 				IsPrimaryKey: false,
 				Description:  "description_11",
 				DataType:     schemapb.DataType_FloatVector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "8"},
+				},
 			},
 			{
 				FieldID:      110,
@@ -190,6 +196,9 @@ func TestPrintBinlogFiles(t *testing.T) {
 				IsPrimaryKey: false,
 				Description:  "description_13",
 				DataType:     schemapb.DataType_BFloat16Vector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "4"},
+				},
 			},
 			{
 				FieldID:      112,
@@ -197,6 +206,9 @@ func TestPrintBinlogFiles(t *testing.T) {
 				IsPrimaryKey: false,
 				Description:  "description_14",
 				DataType:     schemapb.DataType_Float16Vector,
+				TypeParams: []*commonpb.KeyValuePair{
+					{Key: common.DimKey, Value: "4"},
+				},
 			},
 		},
 	},
diff --git a/internal/util/importutilv2/binlog/field_reader.go b/internal/util/importutilv2/binlog/field_reader.go
index 2c4ae9aea3eb..324e249a6c11 100644
--- a/internal/util/importutilv2/binlog/field_reader.go
+++ b/internal/util/importutilv2/binlog/field_reader.go
@@ -40,7 +40,7 @@ func newFieldReader(ctx context.Context, cm storage.ChunkManager, fieldSchema *s
 }
 
 func (r *fieldReader) Next() (storage.FieldData, error) {
-	fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema)
+	fieldData, err := storage.NewFieldData(r.fieldSchema.GetDataType(), r.fieldSchema, 0)
 	if err != nil {
 		return nil, err
 	}
diff --git a/internal/util/importutilv2/binlog/reader_test.go b/internal/util/importutilv2/binlog/reader_test.go
index 73ab65dde208..0ddedff5ac52 100644
--- a/internal/util/importutilv2/binlog/reader_test.go
+++ b/internal/util/importutilv2/binlog/reader_test.go
@@ -282,7 +282,7 @@ func (suite *ReaderSuite) run(dataType schemapb.DataType, elemType schemapb.Data
 	expectInsertData, err := storage.NewInsertData(schema)
 	suite.NoError(err)
 	for _, field := range schema.GetFields() {
-		expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field)
+		expectInsertData.Data[field.GetFieldID()], err = storage.NewFieldData(field.GetDataType(), field, suite.numRows)
 		suite.NoError(err)
 	}
 OUTER:
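
The capacity-aware constructors round out the change: callers that know the row count up front, like the binlog reader test above, can pre-size every field slice and avoid repeated reallocation while rows are appended. A minimal sketch (illustrative; `schema` is an assumed `*schemapb.CollectionSchema`):

	// reserve room for 1024 rows in every field buffer
	data, err := storage.NewInsertDataWithCap(schema, 1024)
	if err != nil {
		panic(err)
	}
	// passing 0 keeps the old behavior; NewInsertData(schema) is now just that shortcut
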