Skip to content

Commit

Permalink
deltalog stream serde pk and ts separate format
Browse files Browse the repository at this point in the history
Signed-off-by: shaoting-huang <shaoting.huang@zilliz.com>
  • Loading branch information
shaoting-huang committed Jul 5, 2024
1 parent e4cece8 commit f4c6a46
Show file tree
Hide file tree
Showing 6 changed files with 605 additions and 42 deletions.
5 changes: 5 additions & 0 deletions internal/storage/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
nullableKey = "nullable"
)

// version is the literal string "version".
// NOTE(review): presumably used as a metadata/extras key alongside the other
// key constants in this file — confirm against the call sites.
const version = "version"

// MULTI_FIELD is the marker string indicating that a log file stores more
// than one field, i.e. that it uses the multi-field format.
// NOTE(review): the name is not MixedCaps as Go convention recommends, but it
// is exported, so renaming would break existing callers.
const MULTI_FIELD = "MULTI_FIELD"

type descriptorEventData struct {
DescriptorEventDataFixPart
ExtraLength int32
Expand Down
10 changes: 10 additions & 0 deletions internal/storage/event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ func newDescriptorEvent() *descriptorEvent {
}
}

// NewBaseDescriptorEvent creates a descriptor event via newDescriptorEvent and
// fills in the given collection, partition and segment IDs. Both the start and
// end timestamps are explicitly set to zero.
func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent {
	event := newDescriptorEvent()

	// Identify which collection/partition/segment this event belongs to.
	event.CollectionID, event.PartitionID, event.SegmentID = collectionID, partitionID, segmentID

	// A base descriptor carries no time range.
	event.StartTimestamp, event.EndTimestamp = 0, 0

	return event
}

func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
var payloadWriter PayloadWriterInterface
var err error
Expand Down
60 changes: 59 additions & 1 deletion internal/storage/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (

type Record interface {
Schema() map[FieldID]schemapb.DataType
ArrowSchema() *arrow.Schema
Column(i FieldID) arrow.Array
Len() int
Release()
Expand Down Expand Up @@ -83,6 +84,14 @@ func (r *compositeRecord) Schema() map[FieldID]schemapb.DataType {
return r.schema
}

// ArrowSchema assembles an arrow schema by taking the first field (index 0) of
// the schema of every record held in r.recs.
//
// NOTE(review): if r.recs is a map, Go's range order is unspecified, so the
// field order of the returned schema would vary between calls — confirm the
// container type and whether callers depend on a stable order.
func (r *compositeRecord) ArrowSchema() *arrow.Schema {
	collected := make([]arrow.Field, 0, len(r.recs))
	for _, sub := range r.recs {
		subSchema := sub.Schema()
		collected = append(collected, subSchema.Field(0))
	}
	return arrow.NewSchema(collected, nil)
}

type serdeEntry struct {
// arrowType returns the arrow type for the given dimension
arrowType func(int) arrow.DataType
Expand Down Expand Up @@ -575,6 +584,10 @@ func (r *selectiveRecord) Schema() map[FieldID]schemapb.DataType {
return r.schema
}

// ArrowSchema returns the arrow schema reported by the wrapped record. The
// result is passed through as-is; it is not narrowed here.
func (r *selectiveRecord) ArrowSchema() *arrow.Schema {
	wrapped := r.r
	return wrapped.ArrowSchema()
}

func (r *selectiveRecord) Column(i FieldID) arrow.Array {
if i == r.selectedFieldId {
return r.r.Column(i)
Expand Down Expand Up @@ -663,9 +676,10 @@ func (sfw *singleFieldRecordWriter) Close() {

func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer) (*singleFieldRecordWriter, error) {
schema := arrow.NewSchema([]arrow.Field{field}, nil)

// use the same writer properties as the payload writer for now
fw, err := pqarrow.NewFileWriter(schema, writer,
parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(math.MaxInt64), // No additional grouping for now.
parquet.WithCompression(compress.Codecs.Zstd),
parquet.WithCompressionLevel(3)),
pqarrow.DefaultWriterProps())
Expand All @@ -679,6 +693,46 @@ func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Wr
}, nil
}

// Compile-time check that *multiFieldRecordWriter satisfies RecordWriter.
var _ RecordWriter = (*multiFieldRecordWriter)(nil)

// multiFieldRecordWriter writes several fields of each Record through a
// single pqarrow.FileWriter.
type multiFieldRecordWriter struct {
// fw is the parquet file writer that receives every record.
fw *pqarrow.FileWriter
// fieldIds lists the fields pulled from each Record; position i here
// becomes column i of the written record.
fieldIds []FieldID
// schema is the arrow schema used to build each written record.
schema *arrow.Schema

// numRows is the row count accumulated by Write.
numRows int
}

// Write gathers the columns named by mfw.fieldIds from r, in that order, wraps
// them in an arrow record built against mfw.schema and hands the record to the
// parquet writer via WriteBuffered.
//
// It returns the error from WriteBuffered, if any. numRows is advanced only
// after the write succeeds, so a failed write no longer inflates the counter.
func (mfw *multiFieldRecordWriter) Write(r Record) error {
	rowCount := r.Len()

	columns := make([]arrow.Array, len(mfw.fieldIds))
	for i, fieldId := range mfw.fieldIds {
		columns[i] = r.Column(fieldId)
	}

	rec := array.NewRecord(mfw.schema, columns, int64(rowCount))
	defer rec.Release()

	if err := mfw.fw.WriteBuffered(rec); err != nil {
		return err
	}

	// Count rows only once they have actually been accepted by the writer.
	mfw.numRows += rowCount
	return nil
}

// Close closes the underlying parquet file writer.
//
// The method has no return value, so the error reported by the file writer's
// Close is discarded.
func (mfw *multiFieldRecordWriter) Close() {
	_ = mfw.fw.Close()
}

func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer io.Writer) (*multiFieldRecordWriter, error) {
schema := arrow.NewSchema(fields, nil)
fw, err := pqarrow.NewFileWriter(schema, writer,
parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
pqarrow.DefaultWriterProps())
if err != nil {
return nil, err

Check warning on line 727 in internal/storage/serde.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/serde.go#L727

Added line #L727 was not covered by tests
}
return &multiFieldRecordWriter{
fw: fw,
fieldIds: fieldIds,
schema: schema,
}, nil
}

type SerializeWriter[T any] struct {
rw RecordWriter
serializer Serializer[T]
Expand Down Expand Up @@ -773,6 +827,10 @@ func (sr *simpleArrowRecord) Release() {
sr.r.Release()
}

// ArrowSchema returns the schema of the wrapped arrow record.
func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema {
	underlying := sr.r
	return underlying.Schema()
}

func newSimpleArrowRecord(r arrow.Record, schema map[FieldID]schemapb.DataType, field2Col map[FieldID]int) *simpleArrowRecord {
return &simpleArrowRecord{
r: r,
Expand Down
Loading

0 comments on commit f4c6a46

Please sign in to comment.