diff --git a/internal/storage/binlog_writer.go b/internal/storage/binlog_writer.go
index 583798f112167..27c2b1ab9d25c 100644
--- a/internal/storage/binlog_writer.go
+++ b/internal/storage/binlog_writer.go
@@ -294,11 +294,14 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
 }
 
 // NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
-func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
+func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64, FieldID ...int64) *DeleteBinlogWriter {
 	descriptorEvent := newDescriptorEvent()
 	descriptorEvent.PayloadDataType = dataType
 	descriptorEvent.CollectionID = collectionID
 	descriptorEvent.PartitionID = partitionID
+	if len(FieldID) > 0 {
+		descriptorEvent.FieldID = FieldID[0]
+	}
 	descriptorEvent.SegmentID = segmentID
 	w := &DeleteBinlogWriter{
 		baseBinlogWriter: baseBinlogWriter{
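For illustration only (not part of the patch): a minimal sketch of the two call forms the widened constructor now accepts. The helper name `newDeleteWriters` and the numeric IDs are made up; the snippet is assumed to sit in the `storage` package so the existing `schemapb` and `common` imports resolve.

```go
// newDeleteWriters is a hypothetical helper contrasting both constructor forms.
func newDeleteWriters() (legacy, tagged *DeleteBinlogWriter) {
	// Legacy form: the descriptor event carries no explicit field ID.
	legacy = NewDeleteBinlogWriter(schemapb.DataType_Int64, 1, 2, 3)
	// Extended form: the optional trailing value is stored in descriptorEvent.FieldID,
	// tagging the binlog with the field it encodes (here the timestamp field).
	tagged = NewDeleteBinlogWriter(schemapb.DataType_Int64, 1, 2, 3, common.TimeStampField)
	return legacy, tagged
}
```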
diff --git a/internal/storage/data_codec.go b/internal/storage/data_codec.go
index c8a1babefce90..656a4d02cab91 100644
--- a/internal/storage/data_codec.go
+++ b/internal/storage/data_codec.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math"
 	"sort"
+	"strconv"
 
 	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/etcdpb"
@@ -770,6 +771,67 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
 	return blob, nil
 }
 
+// SerializeV2 serializes the delete data into blobs.
+// Unlike Serialize, it writes the int64 primary keys and the timestamps into two
+// separate binlogs, so each delete record no longer has to be JSON-marshalled.
+func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) ([]*Blob, error) {
+	var blobs []*Blob
+	var writer *DeleteBinlogWriter
+	length := len(data.Pks)
+	if length != len(data.Tss) {
+		return nil, fmt.Errorf("the length of pks and timestamps is not equal")
+	}
+	var startTs, endTs Timestamp
+	startTs, endTs = math.MaxUint64, 0
+	for _, ts := range data.Tss {
+		if ts < startTs {
+			startTs = ts
+		}
+		if ts > endTs {
+			endTs = ts
+		}
+	}
+	for _, blobKey := range []FieldID{common.RowIDField, common.TimeStampField} {
+		writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID, blobKey)
+		eventWriter, err := writer.NextDeleteEventWriter()
+		if err != nil {
+			writer.Close()
+			return nil, err
+		}
+		var int64data []int64
+		for i := 0; i < length; i++ {
+			switch blobKey {
+			case common.RowIDField:
+				int64data = append(int64data, data.Pks[i].GetValue().(int64))
+			case common.TimeStampField:
+				int64data = append(int64data, int64(data.Tss[i]))
+			}
+		}
+		err = eventWriter.AddInt64ToPayload(int64data, nil)
+		if err != nil {
+			return nil, err
+		}
+		eventWriter.SetEventTimestamp(startTs, endTs)
+		writer.SetEventTimeStamp(startTs, endTs)
+		writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64data)))
+		err = writer.Finish()
+		if err != nil {
+			return nil, err
+		}
+		buffer, err := writer.GetBuffer()
+		if err != nil {
+			return nil, err
+		}
+		blobs = append(blobs, &Blob{
+			Key:   strconv.Itoa(int(blobKey)),
+			Value: buffer,
+		})
+		eventWriter.Close()
+		writer.Close()
+	}
+	return blobs, nil
+}
+
 // Deserialize deserializes the deltalog blobs into DeleteData
 func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
 	if len(blobs) == 0 {
@@ -826,6 +888,62 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
 	return pid, sid, result, nil
 }
 
+// DeserializeV2 deserializes deltalog blobs written by SerializeV2 into DeleteData
+func (deleteCodec *DeleteCodec) DeserializeV2(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
+	if len(blobs) == 0 {
+		return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
+	}
+
+	var pid, sid UniqueID
+	result := &DeleteData{}
+
+	var blobList BlobList = blobs
+	sort.Sort(blobList)
+	var memSize int64 = 0
+
+	for i, blob := range blobList {
+		binlogReader, err := NewBinlogReader(blob.Value)
+		if err != nil {
+			return InvalidUniqueID, InvalidUniqueID, nil, err
+		}
+		pid, sid = binlogReader.PartitionID, binlogReader.SegmentID
+
+		eventReader, err := binlogReader.NextEventReader()
+		if err != nil {
+			binlogReader.Close()
+			return InvalidUniqueID, InvalidUniqueID, nil, err
+		}
+		if eventReader == nil {
+			break
+		}
+		int64data, _, err := eventReader.GetInt64FromPayload()
+		if err != nil {
+			eventReader.Close()
+			binlogReader.Close()
+			return InvalidUniqueID, InvalidUniqueID, nil, err
+		}
+		for _, d := range int64data {
+			if i == 0 {
+				pk := &Int64PrimaryKey{Value: d}
+				result.Pks = append(result.Pks, pk)
+				memSize += pk.Size()
+			} else if i == 1 {
+				result.Tss = append(result.Tss, Timestamp(d))
+				memSize += int64(8)
+			}
+		}
+		eventReader.Close()
+		binlogReader.Close()
+	}
+
+	if len(result.Tss) != len(result.Pks) {
+		return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the length of pks and tss should be the same")
+	}
+	result.RowCount = int64(len(result.Pks))
+	result.memSize = memSize
+	return pid, sid, result, nil
+}
+
 // DataDefinitionCodec serializes and deserializes the data definition
 // Blob key example:
 // ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
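For illustration only (not part of the patch): a rough sketch of the intended round trip through the new codec methods. The helper name `roundTripDeleteDataV2` and the IDs are invented, and the snippet is assumed to live in the `storage` package next to the code above.

```go
// roundTripDeleteDataV2 serializes two int64-PK delete records with SerializeV2
// and reads them back with DeserializeV2.
func roundTripDeleteDataV2() (*DeleteData, error) {
	codec := NewDeleteCodec()
	data := NewDeleteData(
		[]PrimaryKey{NewInt64PrimaryKey(1), NewInt64PrimaryKey(2)},
		[]uint64{100, 200},
	)

	// SerializeV2 emits one int64 binlog per field: blob key "0" (RowIDField)
	// carries the primary keys, blob key "1" (TimeStampField) the timestamps.
	blobs, err := codec.SerializeV2(1, 2, 3, data) // collection, partition, segment IDs (illustrative)
	if err != nil {
		return nil, err
	}

	// DeserializeV2 sorts the blobs by key and rebuilds Pks from blob 0 and Tss from blob 1.
	_, _, got, err := codec.DeserializeV2(blobs)
	if err != nil {
		return nil, err
	}
	return got, nil // got.RowCount == 2, matching the input
}
```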
diff --git a/internal/storage/data_codec_test.go b/internal/storage/data_codec_test.go
index b37886cd20a00..f198cad0b7fd9 100644
--- a/internal/storage/data_codec_test.go
+++ b/internal/storage/data_codec_test.go
@@ -597,6 +597,30 @@ func TestDeleteCodec(t *testing.T) {
 	})
 }
 
+func TestDeleteCodecV2(t *testing.T) {
+	t.Run("int64 pk", func(t *testing.T) {
+		deleteCodec := NewDeleteCodec()
+		pk1 := &Int64PrimaryKey{
+			Value: 1,
+		}
+		deleteData := NewDeleteData([]PrimaryKey{pk1}, []uint64{43757345})
+
+		pk2 := &Int64PrimaryKey{
+			Value: 2,
+		}
+		deleteData.Append(pk2, 23578294723)
+		blob, err := deleteCodec.SerializeV2(CollectionID, 1, 1, deleteData)
+		assert.NoError(t, err)
+		assert.Equal(t, 2, len(blob))
+
+		pid, sid, data, err := deleteCodec.DeserializeV2(blob)
+		assert.NoError(t, err)
+		assert.Equal(t, pid, int64(1))
+		assert.Equal(t, sid, int64(1))
+		assert.Equal(t, data, deleteData)
+	})
+}
+
 func TestUpgradeDeleteLog(t *testing.T) {
 	t.Run("normal", func(t *testing.T) {
 		binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go
index 7080de02ae850..0dffeba9ff83f 100644
--- a/internal/storage/serde_events.go
+++ b/internal/storage/serde_events.go
@@ -67,7 +67,6 @@ func (crr *compositeBinlogRecordReader) iterateNextBatch() error {
 		if err != nil {
 			return err
 		}
-		crr.fields[i] = reader.FieldID
 
 		// TODO: assert schema being the same in every blobs
 		crr.r.schema[reader.FieldID] = reader.PayloadDataType
@@ -420,6 +419,7 @@ type DeltalogStreamWriter struct {
 	collectionID UniqueID
 	partitionID  UniqueID
 	segmentID    UniqueID
+	fieldSchema  *schemapb.FieldSchema
 
 	memorySize int // To be updated on the fly
 	buf        bytes.Buffer
@@ -430,17 +430,24 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
 	if dsw.rw != nil {
 		return dsw.rw, nil
 	}
-
-	rw, err := newSingleFieldRecordWriter(0, arrow.Field{
-		Name:     "delta",
-		Type:     arrow.BinaryTypes.String,
-		Nullable: false,
-	}, &dsw.buf)
-	if err != nil {
-		return nil, err
+	switch dsw.fieldSchema.DataType {
+	case schemapb.DataType_Int64, schemapb.DataType_VarChar:
+		dim, _ := typeutil.GetDim(dsw.fieldSchema)
+		rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
+			Name:     dsw.fieldSchema.Name,
+			Type:     serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
+			Nullable: false, // No nullable check here.
+		}, &dsw.buf)
+		if err != nil {
+			return nil, err
+		}
+		dsw.rw = rw
+		return rw, nil
+	default:
+		return nil, merr.WrapErrServiceInternal(fmt.Sprintf(
+			"does not support delta log primary key data type %s",
+			dsw.fieldSchema.DataType.String()))
 	}
-	dsw.rw = rw
-	return rw, nil
 }
 
 func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
@@ -474,6 +481,7 @@ func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
 	de.CollectionID = dsw.collectionID
 	de.PartitionID = dsw.partitionID
 	de.SegmentID = dsw.segmentID
+	de.FieldID = dsw.fieldSchema.FieldID
 	de.StartTimestamp = 0
 	de.EndTimestamp = 0
 	de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
@@ -502,6 +510,11 @@ func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del
 		collectionID: collectionID,
 		partitionID:  partitionID,
 		segmentID:    segmentID,
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.RowIDField,
+			Name:     "delta",
+			DataType: schemapb.DataType_String,
+		},
 	}
 }
 
@@ -542,3 +555,102 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
 		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
 	}, batchSize), nil
 }
+
+func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) map[FieldID]*DeltalogStreamWriter {
+	dws := make(map[FieldID]*DeltalogStreamWriter, 2)
+	dws[common.RowIDField] = &DeltalogStreamWriter{
+		collectionID: collectionID,
+		partitionID:  partitionID,
+		segmentID:    segmentID,
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.RowIDField,
+			Name:     common.RowIDFieldName,
+			DataType: schemapb.DataType_Int64,
+		},
+	}
+	dws[common.TimeStampField] = &DeltalogStreamWriter{
+		collectionID: collectionID,
+		partitionID:  partitionID,
+		segmentID:    segmentID,
+		fieldSchema: &schemapb.FieldSchema{
+			FieldID:  common.TimeStampField,
+			Name:     common.TimeStampFieldName,
+			DataType: schemapb.DataType_Int64,
+		},
+	}
+	return dws
+}
+
+func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters map[FieldID]*DeltalogStreamWriter, batchSize int,
+) (*SerializeWriter[*DeleteLog], error) {
+	rws := make(map[FieldID]RecordWriter, len(eventWriters))
+	for fid := range eventWriters {
+		w := eventWriters[fid]
+		rw, err := w.GetRecordWriter()
+		if err != nil {
+			return nil, err
+		}
+		rws[fid] = rw
+	}
+	compositeRecordWriter := newCompositeRecordWriter(rws)
+	return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
+		array.NewInt64Builder(memory.DefaultAllocator)
+		builders := [2]*array.Int64Builder{
+			array.NewInt64Builder(memory.DefaultAllocator),
+			array.NewInt64Builder(memory.DefaultAllocator),
+		}
+		var memorySize uint64
+		for _, vv := range v {
+			pk := vv.Pk.GetValue().(int64)
+			builders[0].Append(pk)
+			memorySize += uint64(arrow.Int64SizeBytes)
+
+			ts := int64(vv.Ts)
+			builders[1].Append(ts)
+			memorySize += uint64(arrow.Int64SizeBytes)
+		}
+		arrays := []arrow.Array{builders[0].NewArray(), builders[1].NewArray()}
+		fields := []arrow.Field{
+			{
+				Name:     common.RowIDFieldName,
+				Type:     &arrow.Int64Type{},
+				Nullable: false,
+			},
+			{
+				Name:     common.TimeStampFieldName,
+				Type:     &arrow.Int64Type{},
+				Nullable: false,
+			},
+		}
+		field2Col := map[FieldID]int{
+			common.RowIDField:     0,
+			common.TimeStampField: 1,
+		}
+		schema := map[FieldID]schemapb.DataType{
+			common.RowIDField:     schemapb.DataType_Int64,
+			common.TimeStampField: schemapb.DataType_Int64,
+		}
+		return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), schema, field2Col), memorySize, nil
+	}, batchSize), nil
+}
+
+func NewDeltalogDeserializeReaderV2(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
+	reader, err := newCompositeBinlogRecordReader(blobs)
+	if err != nil {
+		return nil, err
+	}
+	return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
+		Tss := r.Column(common.TimeStampField).(*array.Int64)
+		Pks := r.Column(common.RowIDField).(*array.Int64)
+		if Tss.Len() != Pks.Len() {
+			return fmt.Errorf("the length of primary keys and timestamps should be the same for delta log")
+		}
+		for i := 0; i < r.Len(); i++ {
+			if v[i] == nil {
+				v[i] = &DeleteLog{}
+			}
+			v[i] = NewDeleteLog(&Int64PrimaryKey{Value: Pks.Value(i)}, uint64(Tss.Value(i)))
+		}
+		return nil
+	}), nil
+}
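For illustration only (not part of the patch): a sketch of the V2 deltalog write/read cycle built from the functions above, assumed to sit in the `storage` package and to rely on the file's existing `io` import. The helper name, IDs, and batch size are invented, and it assumes the composite reader resolves columns by the field ID now stamped into each deltalog header, so the order of the blobs coming out of the writer map does not matter.

```go
// writeAndReadDeltalogV2 streams delete logs through the arrow-based V2 writer,
// finalizes one blob per field (pk, ts), then decodes them back into DeleteLogs.
func writeAndReadDeltalogV2(logs []*DeleteLog) ([]*DeleteLog, error) {
	writers := NewDeltalogStreamWriterV2(1, 2, 3) // collection, partition, segment IDs (illustrative)
	writer, err := NewDeltalogSerializeWriterV2(2, 3, writers, 1024)
	if err != nil {
		return nil, err
	}
	for _, l := range logs {
		if err := writer.Write(l); err != nil {
			return nil, err
		}
	}
	if err := writer.Close(); err != nil { // flush the remaining batched rows
		return nil, err
	}

	// Finalize each per-field stream writer into its own blob.
	blobs := make([]*Blob, 0, len(writers))
	for _, w := range writers {
		blob, err := w.Finalize()
		if err != nil {
			return nil, err
		}
		blobs = append(blobs, blob)
	}

	// Read the two-column (pk, ts) records back into DeleteLog values.
	reader, err := NewDeltalogDeserializeReaderV2(blobs)
	if err != nil {
		return nil, err
	}
	defer reader.Close()

	var out []*DeleteLog
	for {
		if err := reader.Next(); err != nil {
			if err == io.EOF {
				return out, nil
			}
			return nil, err
		}
		out = append(out, reader.Value())
	}
}
```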
diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go
index 1b0d7ad88fe12..bc79021fba937 100644
--- a/internal/storage/serde_events_test.go
+++ b/internal/storage/serde_events_test.go
@@ -239,7 +239,7 @@ func TestNull(t *testing.T) {
 	})
 }
 
-func generateTestDeltalogData(size int) (*Blob, error) {
+func generateTestDeltalogData(size int, useNewFormat ...bool) ([]*Blob, error) {
 	codec := NewDeleteCodec()
 	pks := make([]int64, size)
 	tss := make([]uint64, size)
@@ -251,7 +251,11 @@ func generateTestDeltalogData(size int) (*Blob, error) {
 	for i := range pks {
 		data.Append(NewInt64PrimaryKey(pks[i]), tss[i])
 	}
-	return codec.Serialize(0, 0, 0, data)
+	if len(useNewFormat) > 0 {
+		return codec.SerializeV2(0, 0, 0, data)
+	}
+	blob, err := codec.Serialize(0, 0, 0, data)
+	return []*Blob{blob}, err
 }
 
 func assertTestDeltalogData(t *testing.T, i int, value *DeleteLog) {
@@ -272,7 +276,7 @@ func TestDeltalogDeserializeReader(t *testing.T) {
 		size := 3
 		blob, err := generateTestDeltalogData(size)
 		assert.NoError(t, err)
-		reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
+		reader, err := NewDeltalogDeserializeReader(blob)
 		assert.NoError(t, err)
 		defer reader.Close()
 
@@ -302,7 +306,7 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 		size := 16
 		blob, err := generateTestDeltalogData(size)
 		assert.NoError(t, err)
-		reader, err := NewDeltalogDeserializeReader([]*Blob{blob})
+		reader, err := NewDeltalogDeserializeReader(blob)
 		assert.NoError(t, err)
 		defer reader.Close()
 
@@ -343,3 +347,84 @@ func TestDeltalogSerializeWriter(t *testing.T) {
 		}
 	})
 }
+
+func TestDeltalogV2(t *testing.T) {
+	t.Run("test empty data", func(t *testing.T) {
+		reader, err := NewDeltalogDeserializeReaderV2(nil)
+		assert.NoError(t, err)
+		defer reader.Close()
+		err = reader.Next()
+		assert.Equal(t, io.EOF, err)
+	})
+
+	t.Run("test deserialize v2", func(t *testing.T) {
+		size := 3
+		blob, err := generateTestDeltalogData(size, true)
+		assert.NoError(t, err)
+		reader, err := NewDeltalogDeserializeReaderV2(blob)
+		assert.NoError(t, err)
+		defer reader.Close()
+
+		for i := 0; i < size; i++ {
+			err = reader.Next()
+			assert.NoError(t, err)
+
+			value := reader.Value()
+			assertTestDeltalogData(t, i, value)
+		}
+
+		err = reader.Next()
+		assert.Equal(t, io.EOF, err)
+	})
+
+	t.Run("test serialize v2", func(t *testing.T) {
+		size := 16
+		blobs, err := generateTestDeltalogData(size, true)
+		assert.NoError(t, err)
+		reader, err := NewDeltalogDeserializeReaderV2(blobs)
+		assert.NoError(t, err)
+		defer reader.Close()
+
+		// Write a copy of the generated data
+		writers := NewDeltalogStreamWriterV2(0, 0, 0)
+		writer, err := NewDeltalogSerializeWriterV2(0, 0, writers, 7)
+		assert.NoError(t, err)
+
+		for i := 0; i < size; i++ {
+			err = reader.Next()
+			assert.NoError(t, err)
+
+			value := reader.Value()
+			assertTestDeltalogData(t, i, value)
+			err := writer.Write(value)
+			assert.NoError(t, err)
+		}
+
+		err = reader.Next()
+		assert.Equal(t, io.EOF, err)
+		err = writer.Close()
+		assert.NoError(t, err)
+
+		// Read from the written data
+		newblobs := make([]*Blob, len(writers))
+		i := 0
+		for _, w := range writers {
+			blob, err := w.Finalize()
+			assert.NoError(t, err)
+			assert.NotNil(t, blob)
+			newblobs[i] = blob
+			i++
+		}
+		// assert.Equal(t, blobs[0].Value, newblobs[0].Value)
+		reader, err = NewDeltalogDeserializeReaderV2(blobs)
+		assert.NoError(t, err)
+		defer reader.Close()
+		for i := 0; i < size; i++ {
+			err = reader.Next()
+			assert.NoError(t, err, i)
+
+			value := reader.Value()
+			assertTestDeltalogData(t, i, value)
+		}
+	})
+}