enhance: Avoid merging insert data when buffering insert msgs (#33562)
See also #33561

This PR:
- Use zero copy when buffering insert messages (see the hedged sketch below)
- Make `storage.InsertCodec` support serializing multiple insert data chunks into the same batch of binlog files
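A minimal sketch of the idea, using simplified stand-in types rather than the actual Milvus structs: instead of merging every incoming chunk into one accumulated `InsertData` (which copies field data), the write buffer now appends the chunk pointers and hands the whole slice to the serializer.

```go
// Sketch only: simplified stand-in types, not the actual writebuffer code.
package main

import "fmt"

// InsertData stands in for storage.InsertData.
type InsertData struct {
	Rows int
}

type insertBuffer struct {
	chunks []*InsertData
}

// Buffer appends the chunk as-is; nothing is copied or merged.
func (b *insertBuffer) Buffer(chunk *InsertData) {
	b.chunks = append(b.chunks, chunk)
}

// Yield hands every buffered chunk to the caller and resets the buffer,
// so the fragments can be released once the sync task is done with them.
func (b *insertBuffer) Yield() []*InsertData {
	out := b.chunks
	b.chunks = nil
	return out
}

func main() {
	b := &insertBuffer{}
	b.Buffer(&InsertData{Rows: 3})
	b.Buffer(&InsertData{Rows: 2})
	fmt.Println(len(b.Yield()), len(b.chunks)) // 2 0
}
```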

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

congqixia committed Jun 13, 2024
1 parent 9ab3058 commit 512ea6b
Showing 20 changed files with 290 additions and 224 deletions.
2 changes: 1 addition & 1 deletion internal/datanode/importv2/util.go
@@ -79,7 +79,7 @@ func NewSyncTask(ctx context.Context,
}

syncPack := &syncmgr.SyncPack{}
syncPack.WithInsertData(insertData).
syncPack.WithInsertData([]*storage.InsertData{insertData}).
WithDeleteData(deleteData).
WithCollectionID(collectionID).
WithPartitionID(partitionID).
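Since `WithInsertData` now takes a slice, single-chunk call sites such as the import path above just wrap their `*storage.InsertData` in a one-element slice, while multi-chunk producers can pass everything in one call. A standalone sketch with stand-in types (not the milvus internal packages):

```go
// Stand-in sketch: single-chunk callers wrap the chunk in a slice; callers
// that already hold several chunks pass them all at once.
package main

import "fmt"

type InsertData struct{ Rows int }

type SyncPack struct{ insertData []*InsertData }

func (p *SyncPack) WithInsertData(chunks []*InsertData) *SyncPack {
	p.insertData = chunks
	return p
}

func main() {
	single := &InsertData{Rows: 10}
	packA := (&SyncPack{}).WithInsertData([]*InsertData{single})            // former single-chunk caller
	packB := (&SyncPack{}).WithInsertData([]*InsertData{{Rows: 4}, {Rows: 6}}) // multi-chunk caller
	fmt.Println(len(packA.insertData), len(packB.insertData)) // 1 2
}
```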
14 changes: 14 additions & 0 deletions internal/datanode/iterators/binlog_iterator_test.go
@@ -5,9 +5,11 @@ import (

"github.com/stretchr/testify/suite"

"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
"github.com/milvus-io/milvus/internal/storage"
"github.com/milvus-io/milvus/pkg/common"
)

func TestInsertBinlogIteratorSuite(t *testing.T) {
@@ -223,27 +225,39 @@ func genTestInsertData() (*storage.InsertData, *etcdpb.CollectionMeta) {
IsPrimaryKey: false,
Description: "binary_vector",
DataType: schemapb.DataType_BinaryVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "8"},
},
},
{
FieldID: FloatVectorField,
Name: "field_float_vector",
IsPrimaryKey: false,
Description: "float_vector",
DataType: schemapb.DataType_FloatVector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
{
FieldID: Float16VectorField,
Name: "field_float16_vector",
IsPrimaryKey: false,
Description: "float16_vector",
DataType: schemapb.DataType_Float16Vector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
{
FieldID: BFloat16VectorField,
Name: "field_bfloat16_vector",
IsPrimaryKey: false,
Description: "bfloat16_vector",
DataType: schemapb.DataType_BFloat16Vector,
TypeParams: []*commonpb.KeyValuePair{
{Key: common.DimKey, Value: "4"},
},
},
},
},
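The test schema above gains explicit dim type params for its vector fields; vector columns generally need a dimension to be constructed. A standalone illustration of declaring such a field, using the same public proto packages the test imports (the field ID and dim are arbitrary example values):

```go
// Illustrative only: a float-vector field schema carrying the dim type param,
// mirroring the TypeParams added to the test schema above.
package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/pkg/common"
)

func main() {
	field := &schemapb.FieldSchema{
		FieldID:  102,
		Name:     "field_float_vector",
		DataType: schemapb.DataType_FloatVector,
		TypeParams: []*commonpb.KeyValuePair{
			{Key: common.DimKey, Value: "4"},
		},
	}
	fmt.Println(field.GetName(), field.GetTypeParams())
}
```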
2 changes: 1 addition & 1 deletion internal/datanode/metacache/bloom_filter_set_test.go
@@ -46,7 +46,7 @@ func (s *BloomFilterSetSuite) GetFieldData(ids []int64) storage.FieldData {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, len(ids))
s.Require().NoError(err)

for _, id := range ids {
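The trailing `len(ids)` argument suggests the field-data constructor used in this test now takes a row-capacity hint so the column can be pre-allocated; the constructor's name and full signature are not visible in this hunk. A stand-in sketch of the same pre-allocation idea:

```go
// Stand-in sketch of a capacity-hinted column constructor; the real
// constructor name and signature are not shown in the hunk above.
package main

import "fmt"

type int64Column struct{ values []int64 }

// newInt64Column pre-allocates room for capRows rows so repeated appends do
// not reallocate the backing array.
func newInt64Column(capRows int) *int64Column {
	return &int64Column{values: make([]int64, 0, capRows)}
}

func main() {
	ids := []int64{1, 2, 3, 4, 5, 6, 7}
	col := newInt64Column(len(ids))
	for _, id := range ids {
		col.values = append(col.values, id)
	}
	fmt.Println(len(col.values), cap(col.values)) // 7 7
}
```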
10 changes: 7 additions & 3 deletions internal/datanode/syncmgr/serializer.go
@@ -19,6 +19,8 @@ package syncmgr
import (
"context"

"github.com/samber/lo"

"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/datanode/metacache"
"github.com/milvus-io/milvus/internal/proto/datapb"
@@ -37,7 +39,7 @@ type SyncPack struct {
metacache metacache.MetaCache
metawriter MetaWriter
// data
insertData *storage.InsertData
insertData []*storage.InsertData
deltaData *storage.DeleteData
// statistics
tsFrom typeutil.Timestamp
@@ -55,8 +57,10 @@ type SyncPack struct {
level datapb.SegmentLevel
}

func (p *SyncPack) WithInsertData(insertData *storage.InsertData) *SyncPack {
p.insertData = insertData
func (p *SyncPack) WithInsertData(insertData []*storage.InsertData) *SyncPack {
p.insertData = lo.Filter(insertData, func(inData *storage.InsertData, _ int) bool {
return inData != nil
})
return p
}

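`WithInsertData` now drops nil chunks with `lo.Filter` so downstream serializers never see them. A tiny standalone example of the same filtering pattern, with a stand-in `InsertData` type:

```go
// Standalone example of the nil-filtering pattern used above.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

type InsertData struct{ Rows int }

func main() {
	chunks := []*InsertData{{Rows: 3}, nil, {Rows: 5}}
	nonNil := lo.Filter(chunks, func(c *InsertData, _ int) bool { return c != nil })
	fmt.Println(len(nonNil)) // 2
}
```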
23 changes: 16 additions & 7 deletions internal/datanode/syncmgr/storage_serializer.go
@@ -82,10 +82,12 @@ func (s *storageV1Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
zap.String("channel", pack.channelName),
)

if pack.insertData != nil {
if len(pack.insertData) > 0 {
memSize := make(map[int64]int64)
for fieldID, fieldData := range pack.insertData.Data {
memSize[fieldID] = int64(fieldData.GetMemorySize())
for _, chunk := range pack.insertData {
for fieldID, fieldData := range chunk.Data {
memSize[fieldID] += int64(fieldData.GetMemorySize())
}
}
task.binlogMemsize = memSize

@@ -159,7 +161,7 @@ func (s *storageV1Serializer) setTaskMeta(task *SyncTask, pack *SyncPack) {

func (s *storageV1Serializer) serializeBinlog(ctx context.Context, pack *SyncPack) (map[int64]*storage.Blob, error) {
log := log.Ctx(ctx)
blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData)
blobs, err := s.inCodec.Serialize(pack.partitionID, pack.segmentID, pack.insertData...)
if err != nil {
return nil, err
}
@@ -178,14 +180,21 @@ }
}

func (s *storageV1Serializer) serializeStatslog(pack *SyncPack) (*storage.PrimaryKeyStats, *storage.Blob, error) {
pkFieldData := pack.insertData.Data[s.pkField.GetFieldID()]
rowNum := int64(pkFieldData.RowNum())
var rowNum int64
var pkFieldData []storage.FieldData
for _, chunk := range pack.insertData {
chunkPKData := chunk.Data[s.pkField.GetFieldID()]
pkFieldData = append(pkFieldData, chunkPKData)
rowNum += int64(chunkPKData.RowNum())
}

stats, err := storage.NewPrimaryKeyStats(s.pkField.GetFieldID(), int64(s.pkField.GetDataType()), rowNum)
if err != nil {
return nil, nil, err
}
stats.UpdateByMsgs(pkFieldData)
for _, chunkPkData := range pkFieldData {
stats.UpdateByMsgs(chunkPkData)
}

blob, err := s.inCodec.SerializePkStats(stats, pack.batchSize)
if err != nil {
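Binlog memory size and the primary-key row count are now accumulated across all buffered chunks, and the chunks are handed to `InsertCodec.Serialize` in a single variadic call (`pack.insertData...`). A simplified, standalone sketch of the accumulation, with `fieldData` standing in for `storage.FieldData`:

```go
// Simplified sketch of the per-chunk accumulation above: per-field memory
// size is summed across chunks and the primary-key row count is summed from
// each chunk's PK column.
package main

import "fmt"

type fieldData struct {
	rows int
	mem  int
}

func (f fieldData) RowNum() int        { return f.rows }
func (f fieldData) GetMemorySize() int { return f.mem }

func accumulate(chunks []map[int64]fieldData, pkFieldID int64) (map[int64]int64, int64) {
	memSize := make(map[int64]int64)
	var rowNum int64
	for _, chunk := range chunks {
		for fieldID, fd := range chunk {
			memSize[fieldID] += int64(fd.GetMemorySize())
		}
		rowNum += int64(chunk[pkFieldID].RowNum())
	}
	return memSize, rowNum
}

func main() {
	chunks := []map[int64]fieldData{
		{100: {rows: 3, mem: 24}, 101: {rows: 3, mem: 48}},
		{100: {rows: 2, mem: 16}, 101: {rows: 2, mem: 32}},
	}
	memSize, rowNum := accumulate(chunks, 100)
	fmt.Println(memSize[100], memSize[101], rowNum) // 40 80 5
}
```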
8 changes: 4 additions & 4 deletions internal/datanode/syncmgr/storage_serializer_test.go
@@ -160,7 +160,7 @@ func (s *StorageV1SerializerSuite) getBfs() *metacache.BloomFilterSet {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -200,7 +200,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
s.Run("with_empty_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0)
pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0)

_, err := s.serializer.EncodeBuffer(ctx, pack)
s.Error(err)
@@ -209,7 +209,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
s.Run("with_normal_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)

s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()

@@ -243,7 +243,7 @@ func (s *StorageV1SerializerSuite) TestSerializeInsert() {
s.Run("with_flush", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
pack.WithFlush()

bfs := s.getBfs()
8 changes: 5 additions & 3 deletions internal/datanode/syncmgr/storage_v2_serializer.go
@@ -82,7 +82,7 @@ func (s *storageV2Serializer) EncodeBuffer(ctx context.Context, pack *SyncPack)
}

task.space = space
if pack.insertData != nil {
if len(pack.insertData) > 0 {
insertReader, err := s.serializeInsertData(pack)
if err != nil {
log.Warn("failed to serialize insert data with storagev2", zap.Error(err))
@@ -155,8 +155,10 @@ func (s *storageV2Serializer) serializeInsertData(pack *SyncPack) (array.RecordR
builder := array.NewRecordBuilder(memory.DefaultAllocator, s.arrowSchema)
defer builder.Release()

if err := iTypeutil.BuildRecord(builder, pack.insertData, s.schema.GetFields()); err != nil {
return nil, err
for _, chunk := range pack.insertData {
if err := iTypeutil.BuildRecord(builder, chunk, s.schema.GetFields()); err != nil {
return nil, err
}
}

rec := builder.NewRecord()
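For storage v2, every chunk is appended into the same arrow `RecordBuilder`, so a multi-chunk pack still produces one record. A standalone sketch of that pattern using the Arrow Go API directly; the v12 module path is an assumption and should match the version in go.mod:

```go
// Standalone sketch of appending several chunks into one Arrow record via a
// single RecordBuilder, mirroring the loop above.
package main

import (
	"fmt"

	"github.com/apache/arrow/go/v12/arrow"
	"github.com/apache/arrow/go/v12/arrow/array"
	"github.com/apache/arrow/go/v12/arrow/memory"
)

func main() {
	schema := arrow.NewSchema([]arrow.Field{{Name: "id", Type: arrow.PrimitiveTypes.Int64}}, nil)
	builder := array.NewRecordBuilder(memory.DefaultAllocator, schema)
	defer builder.Release()

	// Two "chunks" of rows appended into the same builder.
	for _, chunk := range [][]int64{{1, 2, 3}, {4, 5}} {
		builder.Field(0).(*array.Int64Builder).AppendValues(chunk, nil)
	}

	rec := builder.NewRecord()
	defer rec.Release()
	fmt.Println(rec.NumRows()) // 5
}
```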
8 changes: 4 additions & 4 deletions internal/datanode/syncmgr/storage_v2_serializer_test.go
@@ -179,7 +179,7 @@ func (s *StorageV2SerializerSuite) getBfs() *metacache.BloomFilterSet {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -221,7 +221,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.Run("empty_insert_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getEmptyInsertBuffer()).WithBatchSize(0)
pack.WithInsertData([]*storage.InsertData{s.getEmptyInsertBuffer()}).WithBatchSize(0)

_, err := s.serializer.EncodeBuffer(ctx, pack)
s.Error(err)
@@ -230,7 +230,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.Run("with_normal_data", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)

s.mockCache.EXPECT().UpdateSegments(mock.Anything, mock.Anything).Return().Once()

Expand Down Expand Up @@ -264,7 +264,7 @@ func (s *StorageV2SerializerSuite) TestSerializeInsert() {
s.Run("with_flush", func() {
pack := s.getBasicPack()
pack.WithTimeRange(50, 100)
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
pack.WithFlush()

bfs := s.getBfs()
4 changes: 2 additions & 2 deletions internal/datanode/syncmgr/task_test.go
@@ -171,7 +171,7 @@ func (s *SyncTaskSuite) TestRunNormal() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
@@ -299,7 +299,7 @@ func (s *SyncTaskSuite) TestCompactToNull() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
4 changes: 2 additions & 2 deletions internal/datanode/syncmgr/taskv2_test.go
@@ -176,7 +176,7 @@ func (s *SyncTaskSuiteV2) getSuiteSyncTask() *SyncTaskV2 {
Timestamp: 1000,
ChannelName: s.channelName,
})
pack.WithInsertData(s.getInsertBuffer()).WithBatchSize(10)
pack.WithInsertData([]*storage.InsertData{s.getInsertBuffer()}).WithBatchSize(10)
pack.WithDeleteData(s.getDeleteBuffer())

storageCache, err := metacache.NewStorageV2Cache(s.schema)
@@ -203,7 +203,7 @@ func (s *SyncTaskSuiteV2) TestRunNormal() {
Name: "ID",
IsPrimaryKey: true,
DataType: schemapb.DataType_Int64,
})
}, 16)
s.Require().NoError(err)

ids := []int64{1, 2, 3, 4, 5, 6, 7}
35 changes: 20 additions & 15 deletions internal/datanode/writebuffer/insert_buffer.go
@@ -74,7 +74,7 @@ type InsertBuffer struct {
BufferBase
collSchema *schemapb.CollectionSchema

buffer *storage.InsertData
buffers []*storage.InsertData
}

func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
@@ -87,40 +87,45 @@ func NewInsertBuffer(sch *schemapb.CollectionSchema) (*InsertBuffer, error) {
if estSize == 0 {
return nil, errors.New("Invalid schema")
}
buffer, err := storage.NewInsertData(sch)
if err != nil {
return nil, err
}

sizeLimit := paramtable.Get().DataNodeCfg.FlushInsertBufferSize.GetAsInt64()

return &InsertBuffer{
ib := &InsertBuffer{
BufferBase: BufferBase{
rowLimit: noLimit,
sizeLimit: sizeLimit,
TimestampFrom: math.MaxUint64,
TimestampTo: 0,
},
collSchema: sch,
buffer: buffer,
}, nil
}

return ib, nil
}

func (ib *InsertBuffer) Yield() *storage.InsertData {
if ib.IsEmpty() {
return nil
}
func (ib *InsertBuffer) buffer(inData *storage.InsertData, tr TimeRange, startPos, endPos *msgpb.MsgPosition) {
// buffer := ib.currentBuffer()
// storage.MergeInsertData(buffer.buffer, inData)
ib.buffers = append(ib.buffers, inData)
ib.UpdateStatistics(int64(inData.GetRowNum()), int64(inData.GetMemorySize()), tr, startPos, endPos)
}

return ib.buffer
func (ib *InsertBuffer) Yield() []*storage.InsertData {
result := ib.buffers
// set buffers to nil so that fragmented buffers can be GCed
ib.buffers = nil
return result
}

func (ib *InsertBuffer) Buffer(inData *inData, startPos, endPos *msgpb.MsgPosition) int64 {
bufferedSize := int64(0)
for idx, data := range inData.data {
storage.MergeInsertData(ib.buffer, data)
tsData := inData.tsField[idx]
tr := ib.getTimestampRange(tsData)
ib.buffer(data, tr, startPos, endPos)

// update buffer size
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), ib.getTimestampRange(tsData), startPos, endPos)
ib.UpdateStatistics(int64(data.GetRowNum()), int64(data.GetMemorySize()), tr, startPos, endPos)
bufferedSize += int64(data.GetMemorySize())
}
return bufferedSize
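The buffer therefore keeps each chunk separate while still tracking cumulative statistics (row count, memory size, timestamp range) for flush decisions. A minimal standalone sketch of that bookkeeping, with simplified stand-in types:

```go
// Minimal sketch of the buffering pattern above: chunks stay separate (no
// merge) while cumulative statistics are still tracked on the buffer.
package main

import (
	"fmt"
	"math"
)

type chunk struct{ rows, size int64 }

type insertBuffer struct {
	chunks       []*chunk
	rows, size   int64
	tsFrom, tsTo uint64
}

func newInsertBuffer() *insertBuffer {
	return &insertBuffer{tsFrom: math.MaxUint64}
}

func (b *insertBuffer) buffer(c *chunk, tsFrom, tsTo uint64) {
	b.chunks = append(b.chunks, c) // zero copy: the chunk itself is not merged
	b.rows += c.rows
	b.size += c.size
	if tsFrom < b.tsFrom {
		b.tsFrom = tsFrom
	}
	if tsTo > b.tsTo {
		b.tsTo = tsTo
	}
}

func main() {
	b := newInsertBuffer()
	b.buffer(&chunk{rows: 3, size: 96}, 100, 150)
	b.buffer(&chunk{rows: 2, size: 64}, 120, 200)
	fmt.Println(len(b.chunks), b.rows, b.size, b.tsFrom, b.tsTo) // 2 5 160 100 200
}
```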
23 changes: 6 additions & 17 deletions internal/datanode/writebuffer/insert_buffer_test.go
@@ -168,9 +168,12 @@ func (s *InsertBufferSuite) TestYield() {
result = insertBuffer.Yield()
s.NotNil(result)

pkField, ok := result.Data[common.StartOfUserFieldID]
s.Require().True(ok)
pkData := lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })
var pkData []int64
for _, chunk := range result {
pkField, ok := chunk.Data[common.StartOfUserFieldID]
s.Require().True(ok)
pkData = append(pkData, lo.RepeatBy(pkField.RowNum(), func(idx int) int64 { return pkField.GetRow(idx).(int64) })...)
}
s.ElementsMatch(pks, pkData)
}

@@ -232,20 +235,6 @@ func (s *InsertBufferConstructSuite) TestCreateFailure() {
Fields: []*schemapb.FieldSchema{},
},
},
{
tag: "missing_type_param",
schema: &schemapb.CollectionSchema{
Name: "test_collection",
Fields: []*schemapb.FieldSchema{
{
FieldID: 100, Name: "pk", DataType: schemapb.DataType_Int64, IsPrimaryKey: true,
},
{
FieldID: 101, Name: "vector", DataType: schemapb.DataType_FloatVector,
},
},
},
},
}
for _, tc := range cases {
s.Run(tc.tag, func() {
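The updated test flattens primary keys across every yielded chunk with `lo.RepeatBy`, and the construct-failure case for a missing dim type param is dropped, presumably because `NewInsertBuffer` no longer builds a `storage.InsertData` up front. A tiny standalone example of the `lo.RepeatBy` flattening:

```go
// Standalone example of the lo.RepeatBy flattening used in the test above.
package main

import (
	"fmt"

	"github.com/samber/lo"
)

func main() {
	chunks := [][]int64{{1, 2, 3}, {4, 5}}
	var pks []int64
	for _, chunk := range chunks {
		pks = append(pks, lo.RepeatBy(len(chunk), func(i int) int64 { return chunk[i] })...)
	}
	fmt.Println(pks) // [1 2 3 4 5]
}
```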
2 changes: 1 addition & 1 deletion internal/datanode/writebuffer/segment_buffer.go
@@ -32,7 +32,7 @@ func (buf *segmentBuffer) IsFull() bool {
return buf.insertBuffer.IsFull() || buf.deltaBuffer.IsFull()
}

func (buf *segmentBuffer) Yield() (insert *storage.InsertData, delete *storage.DeleteData) {
func (buf *segmentBuffer) Yield() (insert []*storage.InsertData, delete *storage.DeleteData) {
return buf.insertBuffer.Yield(), buf.deltaBuffer.Yield()
}

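Downstream consumers of `segmentBuffer.Yield` now receive a slice of insert chunks (possibly empty) together with the delete data. A standalone sketch of such a consumer, with stand-in types rather than the milvus structs:

```go
// Stand-in sketch of a consumer of the new Yield signature: it iterates the
// yielded chunk slice and sums rows across chunks.
package main

import "fmt"

type insertData struct{ rows int }

type deleteData struct{ rows int }

func yield() ([]*insertData, *deleteData) {
	return []*insertData{{rows: 3}, {rows: 2}}, &deleteData{rows: 1}
}

func main() {
	inserts, deletes := yield()
	total := 0
	for _, chunk := range inserts {
		total += chunk.rows
	}
	fmt.Println(total, deletes.rows) // 5 1
}
```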
(remaining changed files not loaded in this view)
