From c288f6408d4a71968a4fdad78577c8f511d604ce Mon Sep 17 00:00:00 2001
From: shaoting-huang
Date: Wed, 19 Jun 2024 16:04:56 +0800
Subject: [PATCH] deltalog stream serde pk and ts separate format

Signed-off-by: shaoting-huang
---
 internal/storage/event_data.go        |   5 +
 internal/storage/event_writer.go      |  10 +
 internal/storage/serde.go             |  60 +++-
 internal/storage/serde_events.go      | 383 ++++++++++++++++++++++++--
 internal/storage/serde_events_test.go | 217 +++++++++++++++
 internal/storage/serde_test.go        |  18 ++
 pkg/common/common.go                  |   3 +
 7 files changed, 670 insertions(+), 26 deletions(-)

diff --git a/internal/storage/event_data.go b/internal/storage/event_data.go
index 8359e5c9221b..0d9c48d2b736 100644
--- a/internal/storage/event_data.go
+++ b/internal/storage/event_data.go
@@ -36,6 +36,11 @@ const (
 	nullableKey = "nullable"
 )
 
+const version = "version"
+
+// MULTI_FIELD marks that there are multiple fields in a log file; see supportMultiFieldFormat
+const MULTI_FIELD = "MULTI_FIELD"
+
 type descriptorEventData struct {
 	DescriptorEventDataFixPart
 	ExtraLength int32
diff --git a/internal/storage/event_writer.go b/internal/storage/event_writer.go
index 58c6c5a9ca2f..6b9390da0a38 100644
--- a/internal/storage/event_writer.go
+++ b/internal/storage/event_writer.go
@@ -212,6 +212,16 @@ func newDescriptorEvent() *descriptorEvent {
 	}
 }
 
+func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent {
+	de := newDescriptorEvent()
+	de.CollectionID = collectionID
+	de.PartitionID = partitionID
+	de.SegmentID = segmentID
+	de.StartTimestamp = 0
+	de.EndTimestamp = 0
+	return de
+}
+
 func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
 	var payloadWriter PayloadWriterInterface
 	var err error
diff --git a/internal/storage/serde.go b/internal/storage/serde.go
index 3ae4e19470ad..3ed966cae81d 100644
--- a/internal/storage/serde.go
+++ b/internal/storage/serde.go
@@ -35,6 +35,7 @@ import (
 
 type Record interface {
 	Schema() map[FieldID]schemapb.DataType
+	ArrowSchema() *arrow.Schema
 	Column(i FieldID) arrow.Array
 	Len() int
 	Release()
@@ -83,6 +84,14 @@ func (r *compositeRecord) Schema() map[FieldID]schemapb.DataType {
 	return r.schema
}
 
+func (r *compositeRecord) ArrowSchema() *arrow.Schema {
+	var fields []arrow.Field
+	for _, rec := range r.recs {
+		fields = append(fields, rec.Schema().Field(0))
+	}
+	return arrow.NewSchema(fields, nil)
+}
+
 type serdeEntry struct {
 	// arrowType returns the arrow type for the given dimension
 	arrowType func(int) arrow.DataType
@@ -575,6 +584,10 @@ func (r *selectiveRecord) Schema() map[FieldID]schemapb.DataType {
 	return r.schema
 }
 
+func (r *selectiveRecord) ArrowSchema() *arrow.Schema {
+	return r.r.ArrowSchema()
+}
+
 func (r *selectiveRecord) Column(i FieldID) arrow.Array {
 	if i == r.selectedFieldId {
 		return r.r.Column(i)
@@ -663,9 +676,10 @@ func (sfw *singleFieldRecordWriter) Close() {
 
 func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer) (*singleFieldRecordWriter, error) {
 	schema := arrow.NewSchema([]arrow.Field{field}, nil)
+
+	// Use the same writer properties as the payload writer's for now.
 	fw, err := pqarrow.NewFileWriter(schema, writer, parquet.NewWriterProperties(
-		parquet.WithMaxRowGroupLength(math.MaxInt64), // No additional grouping for now.
+		parquet.WithCompression(compress.Codecs.Zstd),
+		parquet.WithCompressionLevel(3)),
 		pqarrow.DefaultWriterProps())
@@ -679,6 +693,46 @@ func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Wr
 	}, nil
 }
 
+var _ RecordWriter = (*multiFieldRecordWriter)(nil)
+
+type multiFieldRecordWriter struct {
+	fw       *pqarrow.FileWriter
+	fieldIds []FieldID
+	schema   *arrow.Schema
+
+	numRows int
+}
+
+func (mfw *multiFieldRecordWriter) Write(r Record) error {
+	mfw.numRows += r.Len()
+	columns := make([]arrow.Array, len(mfw.fieldIds))
+	for i, fieldId := range mfw.fieldIds {
+		columns[i] = r.Column(fieldId)
+	}
+	rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
+	defer rec.Release()
+	return mfw.fw.WriteBuffered(rec)
+}
+
+func (mfw *multiFieldRecordWriter) Close() {
+	mfw.fw.Close()
+}
+
+func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer io.Writer) (*multiFieldRecordWriter, error) {
+	schema := arrow.NewSchema(fields, nil)
+	fw, err := pqarrow.NewFileWriter(schema, writer,
+		parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
+		pqarrow.DefaultWriterProps())
+	if err != nil {
+		return nil, err
+	}
+	return &multiFieldRecordWriter{
+		fw:       fw,
+		fieldIds: fieldIds,
+		schema:   schema,
+	}, nil
+}
+
 type SerializeWriter[T any] struct {
 	rw         RecordWriter
 	serializer Serializer[T]
@@ -773,6 +827,10 @@ func (sr *simpleArrowRecord) Release() {
 	sr.r.Release()
 }
 
+func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema {
+	return sr.r.Schema()
+}
+
 func newSimpleArrowRecord(r arrow.Record, schema map[FieldID]schemapb.DataType, field2Col map[FieldID]int) *simpleArrowRecord {
 	return &simpleArrowRecord{
 		r:      r,
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index 7080de02ae85..487e00ac4ac3 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -28,7 +28,6 @@ import (
 	"github.com/apache/arrow/go/v12/arrow"
 	"github.com/apache/arrow/go/v12/arrow/array"
 	"github.com/apache/arrow/go/v12/arrow/memory"
-
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/pkg/common"
 	"github.com/milvus-io/milvus/pkg/util/merr"
@@ -231,7 +230,7 @@ func NewBinlogDeserializeReader(blobs []*Blob, PKfieldID UniqueID) (*Deserialize
 	}), nil
 }
 
-func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
+func NewDeltalogOneFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
 	reader, err := newCompositeBinlogRecordReader(blobs)
 	if err != nil {
 		return nil, err
@@ -314,14 +313,9 @@ func (bsw *BinlogStreamWriter) writeBinlogHeaders(w io.Writer) error {
 		return err
 	}
 	// Write descriptor
-	de := newDescriptorEvent()
+	de := NewBaseDescriptorEvent(bsw.collectionID, bsw.partitionID, bsw.segmentID)
 	de.PayloadDataType = bsw.fieldSchema.DataType
-	de.CollectionID = bsw.collectionID
-	de.PartitionID = bsw.partitionID
-	de.SegmentID = bsw.segmentID
 	de.FieldID = bsw.fieldSchema.FieldID
-	de.StartTimestamp = 0
-	de.EndTimestamp = 0
 	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(bsw.memorySize))
 	if err := de.Write(w); err != nil {
 		return err
@@ -420,6 +414,7 @@ type DeltalogStreamWriter struct {
 	collectionID UniqueID
 	partitionID  UniqueID
 	segmentID    UniqueID
+	fieldSchema  *schemapb.FieldSchema
 
 	memorySize int // To be updated on the fly
 	buf        bytes.Buffer
@@ -430,17 +425,24 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
 	if dsw.rw != nil {
 		return dsw.rw, nil
 	}
-
-	rw, err := newSingleFieldRecordWriter(0, arrow.Field{
-		Name:     "delta",
-		Type:     arrow.BinaryTypes.String,
-		Nullable: false,
-	}, &dsw.buf)
-	if err != nil {
-		return nil, err
+	switch dsw.fieldSchema.DataType {
+	case schemapb.DataType_Int64, schemapb.DataType_VarChar, schemapb.DataType_String:
+		dim, _ := typeutil.GetDim(dsw.fieldSchema)
+		rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
+			Name:     dsw.fieldSchema.Name,
+			Type:     serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
+			Nullable: false, // No nullable check here.
+		}, &dsw.buf)
+		if err != nil {
+			return nil, err
+		}
+		dsw.rw = rw
+		return rw, nil
+	default:
+		return nil, merr.WrapErrServiceInternal(fmt.Sprintf(
+			"does not support delta log primary key data type %s",
+			dsw.fieldSchema.DataType.String()))
 	}
-	dsw.rw = rw
-	return rw, nil
 }
 
 func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
@@ -469,13 +471,8 @@ func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
 		return err
 	}
 	// Write descriptor
-	de := newDescriptorEvent()
-	de.PayloadDataType = schemapb.DataType_String
-	de.CollectionID = dsw.collectionID
-	de.PartitionID = dsw.partitionID
-	de.SegmentID = dsw.segmentID
-	de.StartTimestamp = 0
-	de.EndTimestamp = 0
+	de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
+	de.PayloadDataType = dsw.fieldSchema.DataType
 	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
 	if err := de.Write(w); err != nil {
 		return err
@@ -502,6 +499,11 @@ func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.RowIDField,
+			Name:     "delta",
+			DataType: schemapb.DataType_String,
+		},
 	}
 }
 
@@ -542,3 +544,334 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
 		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
 	}, batchSize), nil
 }
+
+var _ RecordReader = (*simpleArrowRecordReader)(nil)
+
+type simpleArrowRecordReader struct {
+	blobs []*Blob
+
+	blobPos int
+	rr      array.RecordReader
+	closer  func()
+
+	r simpleArrowRecord
+}
+
+func (crr *simpleArrowRecordReader) iterateNextBatch() error {
+	if crr.closer != nil {
+		crr.closer()
+	}
+
+	crr.blobPos++
+	if crr.blobPos >= len(crr.blobs) {
+		return io.EOF
+	}
+
+	reader, err := NewBinlogReader(crr.blobs[crr.blobPos].Value)
+	if err != nil {
+		return err
+	}
+
+	er, err := reader.NextEventReader()
+	if err != nil {
+		return err
+	}
+	rr, err := er.GetArrowRecordReader()
+	if err != nil {
+		return err
+	}
+	crr.rr = rr
+	crr.closer = func() {
+		crr.rr.Release()
+		er.Close()
+		reader.Close()
+	}
+
+	return nil
+}
+
+func (crr *simpleArrowRecordReader) Next() error {
+	if crr.rr == nil {
+		if len(crr.blobs) == 0 {
+			return io.EOF
+		}
+		crr.blobPos = -1
+		crr.r = simpleArrowRecord{
+			schema:    make(map[FieldID]schemapb.DataType),
+			field2Col: make(map[FieldID]int),
+		}
+		if err := crr.iterateNextBatch(); err != nil {
+			return err
+		}
+	}
+
+	composeRecord := func() bool {
+		if ok := crr.rr.Next(); !ok {
+			return false
+		}
+		record := crr.rr.Record()
+		for i := range record.Schema().Fields() {
+			crr.r.field2Col[FieldID(i)] = i
+		}
+		crr.r.r = record
+		return true
+	}
+
+	if ok := composeRecord(); !ok {
+		if err := crr.iterateNextBatch(); err != nil {
+			return err
+		}
+		if ok := composeRecord(); !ok {
+			return io.EOF
+		}
+	}
+	return nil
+}
+
+func (crr *simpleArrowRecordReader) Record() Record {
+	return &crr.r
+}
+
+func (crr *simpleArrowRecordReader) Close() {
+	if crr.closer != nil {
+		crr.closer()
+	}
+}
+
+func newSimpleArrowRecordReader(blobs []*Blob) (*simpleArrowRecordReader, error) {
+	return &simpleArrowRecordReader{
+		blobs: blobs,
+	}, nil
+}
+
+func NewMultiFieldDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID, schema []*schemapb.FieldSchema) *MultiFieldDeltalogStreamWriter {
+	return &MultiFieldDeltalogStreamWriter{
+		collectionID: collectionID,
+		partitionID:  partitionID,
+		segmentID:    segmentID,
+		fieldSchemas: schema,
+	}
+}
+
+type MultiFieldDeltalogStreamWriter struct {
+	collectionID UniqueID
+	partitionID  UniqueID
+	segmentID    UniqueID
+	fieldSchemas []*schemapb.FieldSchema
+
+	memorySize int // To be updated on the fly
+	buf        bytes.Buffer
+	rw         *multiFieldRecordWriter
+}
+
+func (dsw *MultiFieldDeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
+	if dsw.rw != nil {
+		return dsw.rw, nil
+	}
+
+	fieldIds := make([]FieldID, len(dsw.fieldSchemas))
+	fields := make([]arrow.Field, len(dsw.fieldSchemas))
+
+	for i, fieldSchema := range dsw.fieldSchemas {
+		fieldIds[i] = fieldSchema.FieldID
+		dim, _ := typeutil.GetDim(fieldSchema)
+		fields[i] = arrow.Field{
+			Name:     fieldSchema.Name,
+			Type:     serdeMap[fieldSchema.DataType].arrowType(int(dim)),
+			Nullable: false, // No nullable check here.
+		}
+	}
+
+	rw, err := newMultiFieldRecordWriter(fieldIds, fields, &dsw.buf)
+	if err != nil {
+		return nil, err
+	}
+	dsw.rw = rw
+	return rw, nil
+}
+
+func (dsw *MultiFieldDeltalogStreamWriter) Finalize() (*Blob, error) {
+	if dsw.rw == nil {
+		return nil, io.ErrUnexpectedEOF
+	}
+	dsw.rw.Close()
+
+	var b bytes.Buffer
+	if err := dsw.writeDeltalogHeaders(&b); err != nil {
+		return nil, err
+	}
+	if _, err := b.Write(dsw.buf.Bytes()); err != nil {
+		return nil, err
+	}
+	return &Blob{
+		Value:      b.Bytes(),
+		RowNum:     int64(dsw.rw.numRows),
+		MemorySize: int64(dsw.memorySize),
+	}, nil
+}
+
+func (dsw *MultiFieldDeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
+	// Write magic number
+	if err := binary.Write(w, common.Endian, MagicNumber); err != nil {
+		return err
+	}
+	// Write descriptor
+	de := NewBaseDescriptorEvent(dsw.collectionID, dsw.partitionID, dsw.segmentID)
+	de.PayloadDataType = schemapb.DataType_Int64
+	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
+	de.descriptorEventData.AddExtra(version, MULTI_FIELD)
+	if err := de.Write(w); err != nil {
+		return err
+	}
+	// Write event header
+	eh := newEventHeader(DeleteEventType)
+	// Write event data
+	ev := newDeleteEventData()
+	ev.StartTimestamp = 1
+	ev.EndTimestamp = 1
+	eh.EventLength = int32(dsw.buf.Len()) + eh.GetMemoryUsageInBytes() + int32(binary.Size(ev))
+	// eh.NextPosition = eh.EventLength + w.Offset()
+	if err := eh.Write(w); err != nil {
+		return err
+	}
+	if err := ev.WriteEventData(w); err != nil {
+		return err
+	}
+	return nil
+}
+
+func NewDeltalogMultiFieldWriter(partitionID, segmentID UniqueID, eventWriter *MultiFieldDeltalogStreamWriter, batchSize int,
+) (*SerializeWriter[*DeleteLog], error) {
+	rw, err := eventWriter.GetRecordWriter()
+	if err != nil {
+		return nil, err
+	}
+	return NewSerializeRecordWriter[*DeleteLog](rw, func(v []*DeleteLog) (Record, uint64, error) {
+		fields := []arrow.Field{
+			{
+				Name:     "pk",
+				Type:     serdeMap[schemapb.DataType(v[0].PkType)].arrowType(0),
+				Nullable: false,
+			},
+			{
+				Name:     "ts",
+				Type:     arrow.PrimitiveTypes.Int64,
+				Nullable: false,
+			},
+		}
+		arrowSchema := arrow.NewSchema(fields, nil)
+		builder := array.NewRecordBuilder(memory.DefaultAllocator, arrowSchema)
+		defer builder.Release()
+
+		var memorySize uint64
+		pkType := schemapb.DataType(v[0].PkType)
+		switch pkType {
+		case schemapb.DataType_Int64:
+			pb := builder.Field(0).(*array.Int64Builder)
+			for _, vv := range v {
+				pk := vv.Pk.GetValue().(int64)
+				pb.Append(pk)
+				memorySize += 8 // an int64 pk occupies 8 bytes
+			}
+		case schemapb.DataType_VarChar:
+			pb := builder.Field(0).(*array.StringBuilder)
+			for _, vv := range v {
+				pk := vv.Pk.GetValue().(string)
+				pb.Append(pk)
+				memorySize += uint64(len(pk)) // byte length of the pk string
+			}
+		default:
+			return nil, 0, fmt.Errorf("unexpected pk type %v", v[0].PkType)
+		}
+
+		for _, vv := range v {
+			builder.Field(1).(*array.Int64Builder).Append(int64(vv.Ts))
+			memorySize += 8 // a uint64 ts occupies 8 bytes
+		}
+
+		arr := []arrow.Array{builder.Field(0).NewArray(), builder.Field(1).NewArray()}
+
+		field2Col := map[FieldID]int{
+			common.DeltaPkField:   0,
+			common.TimeStampField: 1,
+		}
+		schema := map[FieldID]schemapb.DataType{
+			common.DeltaPkField:   pkType,
+			common.TimeStampField: schemapb.DataType_Int64,
+		}
+		return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), schema, field2Col), memorySize, nil
+	}, batchSize), nil
+}
+
+func NewDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
+	reader, err := newSimpleArrowRecordReader(blobs)
+	if err != nil {
+		return nil, err
+	}
+	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
+		rec, ok := r.(*simpleArrowRecord)
+		if !ok {
+			return fmt.Errorf("can not cast to simple arrow record")
+		}
+		fields := rec.r.Schema().Fields()
+		for i, field := range fields {
+			switch field.Name {
+			case "pk":
+				switch field.Type.ID() {
+				case arrow.INT64:
+					arr := r.Column(int64(i)).(*array.Int64)
+					for j := 0; j < r.Len(); j++ {
+						if v[j] == nil {
+							v[j] = &DeleteLog{}
+						}
+						v[j].Pk = NewInt64PrimaryKey(arr.Value(j))
+					}
+				case arrow.STRING:
+					arr := r.Column(int64(i)).(*array.String)
+					for j := 0; j < r.Len(); j++ {
+						if v[j] == nil {
+							v[j] = &DeleteLog{}
+						}
+						v[j].Pk = NewVarCharPrimaryKey(arr.Value(j))
+					}
+				default:
+					return fmt.Errorf("unexpected delta log pkType %v", field.Type.Name())
+				}
+			case "ts":
+				arr := r.Column(int64(i)).(*array.Int64)
+				for j := 0; j < r.Len(); j++ {
+					v[j].Ts = uint64(arr.Value(j))
+				}
+			default:
+				return fmt.Errorf("unexpected delta log field: %v", field.Name)
+			}
+		}
+		return nil
+	}), nil
+}
+
+// NewDeltalogDeserializeReader is the entry point for the delta log reader.
+// It dispatches to NewDeltalogOneFieldReader, which reads the existing log format
+// with only one column in a log file, or to NewDeltalogMultiFieldReader, which
+// reads the new format that stores multiple fields in a log file.
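+//
+// A minimal call sketch (blobs is assumed to come from either writer; the
+// format is detected from the descriptor event, so callers need not care):
+//
+//	reader, err := NewDeltalogDeserializeReader(blobs)
+//	if err != nil {
+//		return err
+//	}
+//	defer reader.Close()
+//	for {
+//		if err := reader.Next(); err != nil {
+//			break // io.EOF once all delete logs are consumed
+//		}
+//		deleteLog := reader.Value() // *DeleteLog with Pk and Ts populated
+//	}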
+func NewDeltalogDeserializeReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
+	if supportMultiFieldFormat(blobs) {
+		return NewDeltalogMultiFieldReader(blobs)
+	}
+	return NewDeltalogOneFieldReader(blobs)
+}
+
+// supportMultiFieldFormat checks the delta log descriptor event data to see
+// whether the log uses the format that stores the pk and ts columns separately.
+func supportMultiFieldFormat(blobs []*Blob) bool {
+	if len(blobs) == 0 {
+		return false
+	}
+	reader, err := NewBinlogReader(blobs[0].Value)
+	if err != nil {
+		return false
+	}
+	defer reader.Close()
+	v, ok := reader.descriptorEventData.Extras[version].(string)
+	return ok && v == MULTI_FIELD
+}
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index 1b0d7ad88fe1..2344524e7a50 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -20,6 +20,7 @@ import (
 	"bytes"
 	"context"
 	"io"
+	"strconv"
 	"testing"
 
 	"github.com/apache/arrow/go/v12/arrow"
@@ -343,3 +344,219 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 		}
 	})
 }
+
+func TestDeltalogPkTsSeparateFormat(t *testing.T) {
+	t.Run("test empty data", func(t *testing.T) {
+		reader, err := NewDeltalogDeserializeReader(nil)
+		assert.NoError(t, err)
+		defer reader.Close()
+		err = reader.Next()
+		assert.Equal(t, io.EOF, err)
+
+		eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, nil)
+		writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, 7)
+		assert.NoError(t, err)
+		err = writer.Close()
+		assert.NoError(t, err)
+		blob, err := eventWriter.Finalize()
+		assert.NoError(t, err)
+		assert.NotNil(t, blob)
+	})
+
+	testCases := []struct {
+		name     string
+		pkType   schemapb.DataType
+		newPk    func(i int) PrimaryKey
+		assertPk func(t *testing.T, i int, value *DeleteLog)
+	}{
+		{
+			name:   "test int64 pk",
+			pkType: schemapb.DataType_Int64,
+			newPk:  func(i int) PrimaryKey { return NewInt64PrimaryKey(int64(i)) },
+			assertPk: func(t *testing.T, i int, value *DeleteLog) {
+				assert.Equal(t, &Int64PrimaryKey{int64(i)}, value.Pk)
+				assert.Equal(t, uint64(i+1), value.Ts)
+			},
+		},
+		{
+			name:   "test varchar pk",
+			pkType: schemapb.DataType_VarChar,
+			newPk:  func(i int) PrimaryKey { return NewVarCharPrimaryKey(strconv.Itoa(i)) },
+			assertPk: func(t *testing.T, i int, value *DeleteLog) {
+				assert.Equal(t, &VarCharPrimaryKey{strconv.Itoa(i)}, value.Pk)
+				assert.Equal(t, uint64(i+1), value.Ts)
+			},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{
+				{FieldID: common.DeltaPkField, Name: "pk", DataType: tc.pkType},
+				{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
+			})
+			writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, 7)
+			assert.NoError(t, err)
+
+			size := 10
+			data := make([]*DeleteLog, size)
+			for i := 0; i < size; i++ {
+				data[i] = NewDeleteLog(tc.newPk(i), uint64(i+1))
+			}
+
+			// Serialize the data
+			for i := 0; i < size; i++ {
+				err := writer.Write(data[i])
+				assert.NoError(t, err)
+			}
+			err = writer.Close()
+			assert.NoError(t, err)
+
+			blob, err := eventWriter.Finalize()
+			assert.NoError(t, err)
+			assert.NotNil(t, blob)
+			blobs := []*Blob{blob}
+
+			// Deserialize the data
+			reader, err := NewDeltalogDeserializeReader(blobs)
+			assert.NoError(t, err)
+			defer reader.Close()
+			for i := 0; i < size; i++ {
+				err = reader.Next()
+				assert.NoError(t, err)
+
+				value := reader.Value()
+				tc.assertPk(t, i, value)
+			}
+		})
+	}
+}
+
+func BenchmarkDeltalogFormat(b *testing.B) {
+	size := 10000000
+	blob, err := generateTestDeltalogData(size)
+	assert.NoError(b, err)
+
+	// Benchmark the reader for the existing one-string format.
+	b.Run("one string format reader", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
+			assert.NoError(b, err)
+			for j := 0; j < size; j++ {
+				err = reader.Next()
+				assert.NoError(b, err)
+				_ = reader.Value()
+			}
+			err = reader.Next()
+			assert.Equal(b, io.EOF, err)
+			reader.Close()
+		}
+		b.ReportAllocs()
+	})
+
+	blob, err = generateTestDeltalogNewFormatData(size)
+	assert.NoError(b, err)
+
+	// Benchmark the reader for the new pk-and-ts-separate format.
+	b.Run("pk ts separate format reader", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
+			assert.NoError(b, err)
+			for j := 0; j < size; j++ {
+				err = reader.Next()
+				assert.NoError(b, err)
+				_ = reader.Value()
+			}
+			err = reader.Next()
+			assert.Equal(b, io.EOF, err)
+			reader.Close()
+		}
+		b.ReportAllocs()
+	})
+
+	b.Run("one string format writer", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			eventWriter := NewDeltalogStreamWriter(0, 0, 0)
+			writer, _ := NewDeltalogSerializeWriter(0, 0, eventWriter, size)
+
+			for j := 0; j < size; j++ {
+				value := NewDeleteLog(&Int64PrimaryKey{int64(j)}, uint64(j+1))
+				writer.Write(value)
+			}
+			writer.Close()
+			eventWriter.Finalize()
+		}
+		b.ReportAllocs()
+	})
+
+	b.Run("pk and ts separate format writer", func(b *testing.B) {
+		b.ResetTimer()
+		for i := 0; i < b.N; i++ {
+			eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{
+				{FieldID: common.DeltaPkField, Name: "pk", DataType: schemapb.DataType_Int64},
+				{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
+			})
+			writer, _ := NewDeltalogMultiFieldWriter(0, 0, eventWriter, size)
+
+			for j := 0; j < size; j++ {
+				value := NewDeleteLog(&Int64PrimaryKey{int64(j)}, uint64(j+1))
+				writer.Write(value)
+			}
+			writer.Close()
+			eventWriter.Finalize()
+		}
+		b.ReportAllocs()
+	})
+}
+
+func generateTestDeltalogNewFormatData(size int) (*Blob, error) {
+	eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{
+		{FieldID: common.DeltaPkField, Name: "pk", DataType: schemapb.DataType_Int64},
+		{FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64},
+	})
+	writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, size)
+	if err != nil {
+		return nil, err
+	}
+
+	// Serialize the data
+	for i := 0; i < size; i++ {
+		if err := writer.Write(NewDeleteLog(NewInt64PrimaryKey(int64(i)), uint64(i+1))); err != nil {
+			return nil, err
+		}
+	}
+	if err := writer.Close(); err != nil {
+		return nil, err
+	}
+	blob, err := eventWriter.Finalize()
+	if err != nil {
+		return nil, err
+	}
+	return blob, nil
+}
diff --git a/internal/storage/serde_test.go b/internal/storage/serde_test.go
index 87d2aacfa43a..b9bac7ae7fec 100644
--- a/internal/storage/serde_test.go
+++ b/internal/storage/serde_test.go
@@ -21,6 +21,7 @@ import (
"reflect" "testing" + "github.com/apache/arrow/go/v12/arrow" "github.com/apache/arrow/go/v12/arrow/array" "github.com/apache/arrow/go/v12/arrow/memory" "github.com/stretchr/testify/assert" @@ -100,6 +101,23 @@ func TestSerDe(t *testing.T) { } } +func TestArrowSchema(t *testing.T) { + t.Run("test composite record", func(t *testing.T) { + fields := []arrow.Field{{Name: "1", Type: arrow.BinaryTypes.String, Nullable: true}} + builder := array.NewBuilder(memory.DefaultAllocator, arrow.BinaryTypes.String) + builder.AppendValueFromString("1") + arrays := []arrow.Array{builder.NewArray()} + cr := &compositeRecord{ + recs: make(map[FieldID]arrow.Record, 1), + schema: make(map[FieldID]schemapb.DataType, 1), + } + cr.recs[0] = array.NewRecord(arrow.NewSchema(fields, nil), arrays, 1) + cr.schema[0] = schemapb.DataType_String + expected := arrow.NewSchema(fields, nil) + assert.Equal(t, expected, cr.ArrowSchema()) + }) +} + func BenchmarkDeserializeReader(b *testing.B) { len := 1000000 blobs, err := generateTestData(len) diff --git a/pkg/common/common.go b/pkg/common/common.go index 8b881c14a2de..a1e5a8b1b4c7 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -37,6 +37,9 @@ const ( // StartOfUserFieldID represents the starting ID of the user-defined field StartOfUserFieldID = 100 + // DeltaPkField is the ID of the primary key field for delta log + DeltaPkField = -1 + // RowIDField is the ID of the RowID field reserved by the system RowIDField = 0