add delta log stream new format reader and writer
Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
shaoting-huang committed Jun 25, 2024
1 parent 380d3f4 commit 09b1394
Showing 5 changed files with 358 additions and 16 deletions.
5 changes: 4 additions & 1 deletion internal/storage/binlog_writer.go
@@ -294,11 +294,14 @@ func NewInsertBinlogWriter(dataType schemapb.DataType, collectionID, partitionID
}

// NewDeleteBinlogWriter creates DeleteBinlogWriter to write binlog file.
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64) *DeleteBinlogWriter {
func NewDeleteBinlogWriter(dataType schemapb.DataType, collectionID, partitionID, segmentID int64, FieldID ...int64) *DeleteBinlogWriter {
descriptorEvent := newDescriptorEvent()
descriptorEvent.PayloadDataType = dataType
descriptorEvent.CollectionID = collectionID
descriptorEvent.PartitionID = partitionID
if len(FieldID) > 0 {
descriptorEvent.FieldID = FieldID[0]
}
descriptorEvent.SegmentID = segmentID
w := &DeleteBinlogWriter{
baseBinlogWriter: baseBinlogWriter{
118 changes: 118 additions & 0 deletions internal/storage/data_codec.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math"
"sort"
"strconv"

"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
"github.com/milvus-io/milvus/internal/proto/etcdpb"
@@ -770,6 +771,67 @@ func (deleteCodec *DeleteCodec) Serialize(collectionID UniqueID, partitionID Uni
return blob, nil
}

// SerializeV2 serializes delete data into blobs.
// For each delete message, the pk and ts values are written to separate binlogs
// to avoid the JSON marshaling used by Serialize.
func (deleteCodec *DeleteCodec) SerializeV2(collectionID UniqueID, partitionID UniqueID, segmentID UniqueID, data *DeleteData) ([]*Blob, error) {
var blobs []*Blob
var writer *DeleteBinlogWriter
length := len(data.Pks)
if length != len(data.Tss) {
return nil, fmt.Errorf("the length of pks, and TimeStamps is not equal")
}
var startTs, endTs Timestamp
startTs, endTs = math.MaxUint64, 0
for _, ts := range data.Tss {
if ts < startTs {
startTs = ts
}
if ts > endTs {
endTs = ts
}
}
for _, blobKey := range []FieldID{common.RowIDField, common.TimeStampField} {
writer = NewDeleteBinlogWriter(schemapb.DataType_Int64, collectionID, partitionID, segmentID, blobKey)
eventWriter, err := writer.NextDeleteEventWriter()
if err != nil {
writer.Close()
return nil, err
}
var int64data []int64
for i := 0; i < length; i++ {
switch blobKey {
case common.RowIDField:
int64data = append(int64data, data.Pks[i].GetValue().(int64))
case common.TimeStampField:
int64data = append(int64data, int64(data.Tss[i]))
}
}
err = eventWriter.AddInt64ToPayload(int64data, nil)
if err != nil {
return nil, err
}
eventWriter.SetEventTimestamp(startTs, endTs)
writer.SetEventTimeStamp(startTs, endTs)
writer.AddExtra(originalSizeKey, fmt.Sprintf("%v", binary.Size(int64data)))
err = writer.Finish()
if err != nil {
return nil, err
}
buffer, err := writer.GetBuffer()
if err != nil {
return nil, err
}
blobs = append(blobs, &Blob{
Key: strconv.Itoa(int(blobKey)),
Value: buffer,
})
eventWriter.Close()
writer.Close()
}
return blobs, nil
}

// Deserialize deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
if len(blobs) == 0 {
Expand Down Expand Up @@ -826,6 +888,62 @@ func (deleteCodec *DeleteCodec) Deserialize(blobs []*Blob) (partitionID UniqueID
return pid, sid, result, nil
}

// DeserializeV2 deserializes the deltalog blobs into DeleteData
func (deleteCodec *DeleteCodec) DeserializeV2(blobs []*Blob) (partitionID UniqueID, segmentID UniqueID, data *DeleteData, err error) {
if len(blobs) == 0 {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("blobs is empty")
}

var pid, sid UniqueID
result := &DeleteData{}

var blobList BlobList = blobs
sort.Sort(blobList)
var memSize int64 = 0

for i, blob := range blobList {
binlogReader, err := NewBinlogReader(blob.Value)
if err != nil {
return InvalidUniqueID, InvalidUniqueID, nil, err
}
pid, sid = binlogReader.PartitionID, binlogReader.SegmentID

eventReader, err := binlogReader.NextEventReader()
if err != nil {
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}
if eventReader == nil {
break
}
int64data, _, err := eventReader.GetInt64FromPayload()
if err != nil {
eventReader.Close()
binlogReader.Close()
return InvalidUniqueID, InvalidUniqueID, nil, err
}
for _, d := range int64data {
if i == 0 {
pk := &Int64PrimaryKey{Value: d}
result.Pks = append(result.Pks, pk)
memSize += pk.Size()
} else if i == 1 {
result.Tss = append(result.Tss, Timestamp(d))
memSize += int64(8)
}
}
eventReader.Close()
binlogReader.Close()
}

if len(result.Tss) != len(result.Pks) {
return InvalidUniqueID, InvalidUniqueID, nil, fmt.Errorf("the length of pks and tss should be the same")
}
result.RowCount = int64(len(result.Pks))
result.memSize = memSize
return pid, sid, result, nil
}
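
As a usage sketch (not part of the commit), the round trip below shows the new layout: SerializeV2 writes the primary keys and timestamps to two separate int64 binlogs, keyed by common.RowIDField and common.TimeStampField, and DeserializeV2 reassembles them into a DeleteData. The helper name and IDs are hypothetical.

// deleteCodecV2RoundTrip is a hypothetical helper illustrating the V2 codec.
func deleteCodecV2RoundTrip() (*DeleteData, error) {
	codec := NewDeleteCodec()
	data := NewDeleteData(
		[]PrimaryKey{&Int64PrimaryKey{Value: 1}, &Int64PrimaryKey{Value: 2}},
		[]uint64{43757345, 23578294723},
	)

	// SerializeV2 returns one blob per column: the pk binlog first, then the ts binlog.
	blobs, err := codec.SerializeV2(0 /* collectionID */, 1 /* partitionID */, 1 /* segmentID */, data)
	if err != nil {
		return nil, err
	}

	// DeserializeV2 sorts the blobs and rebuilds Pks, Tss, RowCount and memSize.
	_, _, decoded, err := codec.DeserializeV2(blobs)
	return decoded, err
}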

// DataDefinitionCodec serializes and deserializes the data definition
// Blob key example:
// ${tenant}/data_definition_log/${collection_id}/ts/${log_idx}
24 changes: 24 additions & 0 deletions internal/storage/data_codec_test.go
@@ -597,6 +597,30 @@ func TestDeleteCodec(t *testing.T) {
})
}

func TestDeleteCodecV2(t *testing.T) {
t.Run("int64 pk", func(t *testing.T) {
deleteCodec := NewDeleteCodec()
pk1 := &Int64PrimaryKey{
Value: 1,
}
deleteData := NewDeleteData([]PrimaryKey{pk1}, []uint64{43757345})

pk2 := &Int64PrimaryKey{
Value: 2,
}
deleteData.Append(pk2, 23578294723)
blob, err := deleteCodec.SerializeV2(CollectionID, 1, 1, deleteData)
assert.NoError(t, err)
assert.Equal(t, 2, len(blob))

pid, sid, data, err := deleteCodec.DeserializeV2(blob)
assert.NoError(t, err)
assert.Equal(t, pid, int64(1))
assert.Equal(t, sid, int64(1))
assert.Equal(t, data, deleteData)
})
}

func TestUpgradeDeleteLog(t *testing.T) {
t.Run("normal", func(t *testing.T) {
binlogWriter := NewDeleteBinlogWriter(schemapb.DataType_String, CollectionID, 1, 1)
134 changes: 123 additions & 11 deletions internal/storage/serde_events.go
@@ -67,7 +67,6 @@ func (crr *compositeBinlogRecordReader) iterateNextBatch() error {
if err != nil {
return err
}

crr.fields[i] = reader.FieldID
// TODO: assert schema being the same in every blobs
crr.r.schema[reader.FieldID] = reader.PayloadDataType
@@ -420,6 +419,7 @@ type DeltalogStreamWriter struct {
collectionID UniqueID
partitionID UniqueID
segmentID UniqueID
fieldSchema *schemapb.FieldSchema

memorySize int // To be updated on the fly
buf bytes.Buffer
@@ -430,17 +430,24 @@ func (dsw *DeltalogStreamWriter) GetRecordWriter() (RecordWriter, error) {
if dsw.rw != nil {
return dsw.rw, nil
}

rw, err := newSingleFieldRecordWriter(0, arrow.Field{
Name: "delta",
Type: arrow.BinaryTypes.String,
Nullable: false,
}, &dsw.buf)
if err != nil {
return nil, err
switch dsw.fieldSchema.DataType {
case schemapb.DataType_Int64, schemapb.DataType_VarChar:
dim, _ := typeutil.GetDim(dsw.fieldSchema)
rw, err := newSingleFieldRecordWriter(dsw.fieldSchema.FieldID, arrow.Field{
Name: dsw.fieldSchema.Name,
Type: serdeMap[dsw.fieldSchema.DataType].arrowType(int(dim)),
Nullable: false, // No nullable check here.
}, &dsw.buf)
if err != nil {
return nil, err
}
dsw.rw = rw
return rw, nil
default:
return nil, merr.WrapErrServiceInternal(fmt.Sprintf(
"does not support delta log primary key data type %s",
dsw.fieldSchema.DataType.String()))
}
dsw.rw = rw
return rw, nil
}

func (dsw *DeltalogStreamWriter) Finalize() (*Blob, error) {
Expand Down Expand Up @@ -474,6 +481,7 @@ func (dsw *DeltalogStreamWriter) writeDeltalogHeaders(w io.Writer) error {
de.CollectionID = dsw.collectionID
de.PartitionID = dsw.partitionID
de.SegmentID = dsw.segmentID
de.FieldID = dsw.fieldSchema.FieldID
de.StartTimestamp = 0
de.EndTimestamp = 0
de.descriptorEventData.AddExtra(originalSizeKey, strconv.Itoa(dsw.memorySize))
@@ -502,6 +510,11 @@ func NewDeltalogStreamWriter(collectionID, partitionID, segmentID UniqueID) *Del
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
fieldSchema: &schemapb.FieldSchema{
FieldID: common.RowIDField,
Name: "delta",
DataType: schemapb.DataType_String,
},
}
}

@@ -542,3 +555,102 @@ func NewDeltalogSerializeWriter(partitionID, segmentID UniqueID, eventWriter *De
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(field, nil), arr, int64(len(v))), schema, field2Col), memorySize, nil
}, batchSize), nil
}

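// NewDeltalogStreamWriterV2 creates one DeltalogStreamWriter per delta log column:
// the primary keys (common.RowIDField) and the timestamps (common.TimeStampField).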
func NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID UniqueID) map[FieldID]*DeltalogStreamWriter {
dws := make(map[FieldID]*DeltalogStreamWriter, 2)
dws[common.RowIDField] = &DeltalogStreamWriter{
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
fieldSchema: &schemapb.FieldSchema{
FieldID: common.RowIDField,
Name: common.RowIDFieldName,
DataType: schemapb.DataType_Int64,
},
}
dws[common.TimeStampField] = &DeltalogStreamWriter{
collectionID: collectionID,
partitionID: partitionID,
segmentID: segmentID,
fieldSchema: &schemapb.FieldSchema{
FieldID: common.TimeStampField,
Name: common.TimeStampFieldName,
DataType: schemapb.DataType_Int64,
},
}
return dws
}

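// NewDeltalogSerializeWriterV2 wraps the per-field stream writers in a SerializeWriter
// that batches DeleteLogs into two int64 arrow columns, one for pks and one for timestamps.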
func NewDeltalogSerializeWriterV2(partitionID, segmentID UniqueID, eventWriters map[FieldID]*DeltalogStreamWriter, batchSize int,
) (*SerializeWriter[*DeleteLog], error) {
rws := make(map[FieldID]RecordWriter, len(eventWriters))
for fid := range eventWriters {
w := eventWriters[fid]
rw, err := w.GetRecordWriter()
if err != nil {
return nil, err
}
rws[fid] = rw
}
compositeRecordWriter := newCompositeRecordWriter(rws)
return NewSerializeRecordWriter(compositeRecordWriter, func(v []*DeleteLog) (Record, uint64, error) {
builders := [2]*array.Int64Builder{
array.NewInt64Builder(memory.DefaultAllocator),
array.NewInt64Builder(memory.DefaultAllocator),
}
var memorySize uint64
for _, vv := range v {
pk := vv.Pk.GetValue().(int64)
builders[0].Append(pk)
memorySize += uint64(arrow.Int64SizeBytes)

ts := int64(vv.Ts)
builders[1].Append(ts)
memorySize += uint64(arrow.Int64SizeBytes)
}
arrays := []arrow.Array{builders[0].NewArray(), builders[1].NewArray()}
fields := []arrow.Field{
{
Name: common.RowIDFieldName,
Type: &arrow.Int64Type{},
Nullable: false,
},
{
Name: common.TimeStampFieldName,
Type: &arrow.Int64Type{},
Nullable: false,
},
}
field2Col := map[FieldID]int{
common.RowIDField: 0,
common.TimeStampField: 1,
}
schema := map[FieldID]schemapb.DataType{
common.RowIDField: schemapb.DataType_Int64,
common.TimeStampField: schemapb.DataType_Int64,
}
return newSimpleArrowRecord(array.NewRecord(arrow.NewSchema(fields, nil), arrays, int64(len(v))), schema, field2Col), memorySize, nil
}, batchSize), nil
}

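// NewDeltalogDeserializeReaderV2 reads DeleteLogs back from the pk and ts column blobs
// written by the V2 stream writers.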
func NewDeltalogDeserializeReaderV2(blobs []*Blob) (*DeserializeReader[*DeleteLog], error) {
reader, err := newCompositeBinlogRecordReader(blobs)
if err != nil {
return nil, err
}
return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error {
Tss := r.Column(common.TimeStampField).(*array.Int64)
Pks := r.Column(common.RowIDField).(*array.Int64)
if Tss.Len() != Pks.Len() {
return fmt.Errorf("the length of primary keys and timestamps should be the same for delta log")
}
for i := 0; i < r.Len(); i++ {
if v[i] == nil {
v[i] = &DeleteLog{}
}
v[i] = NewDeleteLog(&Int64PrimaryKey{Value: Pks.Value(i)}, uint64(Tss.Value(i)))
}
return nil
}), nil
}
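
A wiring sketch (not part of the commit) of the V2 stream path: the per-field stream writers from NewDeltalogStreamWriterV2 feed a SerializeWriter, each writer is finalized into a blob, and the blobs are read back through NewDeltalogDeserializeReaderV2. It assumes SerializeWriter exposes Write/Close and DeserializeReader exposes Next/Value/Close, as in the existing V1 deltalog path; the helper name, IDs, and batch size are illustrative.

// deltalogV2WriteRead is a hypothetical helper sketching the V2 write/read flow.
func deltalogV2WriteRead(collectionID, partitionID, segmentID UniqueID, logs []*DeleteLog) ([]*DeleteLog, error) {
	eventWriters := NewDeltalogStreamWriterV2(collectionID, partitionID, segmentID)
	writer, err := NewDeltalogSerializeWriterV2(partitionID, segmentID, eventWriters, 1024 /* batchSize */)
	if err != nil {
		return nil, err
	}
	for _, l := range logs {
		if err := writer.Write(l); err != nil {
			return nil, err
		}
	}
	if err := writer.Close(); err != nil { // flush the final batch
		return nil, err
	}

	// Finalize each per-field stream writer into a blob (one pk blob, one ts blob).
	// Blob order is assumed not to matter: the composite reader resolves columns by field ID.
	blobs := make([]*Blob, 0, len(eventWriters))
	for _, ew := range eventWriters {
		blob, err := ew.Finalize()
		if err != nil {
			return nil, err
		}
		blobs = append(blobs, blob)
	}

	// Read the delete logs back through the V2 deserialize reader.
	reader, err := NewDeltalogDeserializeReaderV2(blobs)
	if err != nil {
		return nil, err
	}
	defer reader.Close()
	var out []*DeleteLog
	for {
		if err := reader.Next(); err != nil { // io.EOF once all records are consumed
			break
		}
		out = append(out, reader.Value())
	}
	return out, nil
}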