Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance: add delta log stream new format reader and writer #34116

Merged
merged 1 commit into from
Jul 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions internal/storage/event_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,11 @@ const (
nullableKey = "nullable"
)

// version is the key string under which a format version is recorded —
// presumably in the descriptor event's extra metadata; TODO confirm usage site.
const version = "version"

// MULTI_FIELD marks a log file that uses the multi-field format, i.e. a single
// file carrying payloads for more than one field.
// NOTE(review): Go naming convention prefers MixedCaps (e.g. MultiField) over
// underscore names; left as-is because renaming an exported constant would
// break existing callers.
const MULTI_FIELD = "MULTI_FIELD"

type descriptorEventData struct {
DescriptorEventDataFixPart
ExtraLength int32
Expand Down
10 changes: 10 additions & 0 deletions internal/storage/event_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ func newDescriptorEvent() *descriptorEvent {
}
}

// NewBaseDescriptorEvent builds a descriptorEvent pre-populated with the given
// collection, partition, and segment identifiers. Start and end timestamps are
// explicitly zeroed; callers are expected to set them later.
func NewBaseDescriptorEvent(collectionID int64, partitionID int64, segmentID int64) *descriptorEvent {
	event := newDescriptorEvent()
	event.CollectionID = collectionID
	event.PartitionID = partitionID
	event.SegmentID = segmentID
	event.StartTimestamp = 0
	event.EndTimestamp = 0
	return event
}

func newInsertEventWriter(dataType schemapb.DataType, nullable bool, dim ...int) (*insertEventWriter, error) {
var payloadWriter PayloadWriterInterface
var err error
Expand Down
60 changes: 59 additions & 1 deletion internal/storage/serde.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

type Record interface {
Schema() map[FieldID]schemapb.DataType
ArrowSchema() *arrow.Schema
Column(i FieldID) arrow.Array
Len() int
Release()
Expand Down Expand Up @@ -83,6 +84,14 @@
return r.schema
}

// ArrowSchema assembles a combined arrow schema from the per-field
// sub-records, taking the first field of each one (assumes every sub-record
// holds exactly one field — TODO confirm).
// NOTE(review): if r.recs is a map, iteration order — and therefore the field
// order of the returned schema — is nondeterministic; confirm callers do not
// rely on a stable column order.
func (r *compositeRecord) ArrowSchema() *arrow.Schema {
	// Pre-size the slice: the number of fields is known up front.
	fields := make([]arrow.Field, 0, len(r.recs))
	for _, rec := range r.recs {
		fields = append(fields, rec.Schema().Field(0))
	}
	return arrow.NewSchema(fields, nil)
}

type serdeEntry struct {
// arrowType returns the arrow type for the given dimension
arrowType func(int) arrow.DataType
Expand Down Expand Up @@ -575,6 +584,10 @@
return r.schema
}

// ArrowSchema delegates to the wrapped record's full arrow schema; the field
// selection narrows only Column access, not the reported schema.
func (r *selectiveRecord) ArrowSchema() *arrow.Schema {
	return r.r.ArrowSchema()
}

func (r *selectiveRecord) Column(i FieldID) arrow.Array {
if i == r.selectedFieldId {
return r.r.Column(i)
Expand Down Expand Up @@ -663,9 +676,10 @@

func newSingleFieldRecordWriter(fieldId FieldID, field arrow.Field, writer io.Writer) (*singleFieldRecordWriter, error) {
schema := arrow.NewSchema([]arrow.Field{field}, nil)

// use writer properties as same as payload writer's for now
fw, err := pqarrow.NewFileWriter(schema, writer,
parquet.NewWriterProperties(
parquet.WithMaxRowGroupLength(math.MaxInt64), // No additional grouping for now.
parquet.WithCompression(compress.Codecs.Zstd),
parquet.WithCompressionLevel(3)),
pqarrow.DefaultWriterProps())
Expand All @@ -679,6 +693,46 @@
}, nil
}

// Compile-time check that multiFieldRecordWriter satisfies RecordWriter.
var _ RecordWriter = (*multiFieldRecordWriter)(nil)

// multiFieldRecordWriter serializes records carrying multiple fields into a
// single parquet file, one parquet column per field.
type multiFieldRecordWriter struct {
	fw       *pqarrow.FileWriter // underlying buffered parquet writer
	fieldIds []FieldID           // field ids, parallel to the schema's column order
	schema   *arrow.Schema       // arrow schema shared by every written record

	numRows int // total rows written so far (accumulated in Write)
}

func (mfw *multiFieldRecordWriter) Write(r Record) error {
mfw.numRows += r.Len()
columns := make([]arrow.Array, len(mfw.fieldIds))
for i, fieldId := range mfw.fieldIds {
columns[i] = r.Column(fieldId)
}
rec := array.NewRecord(mfw.schema, columns, int64(r.Len()))
shaoting-huang marked this conversation as resolved.
Show resolved Hide resolved
defer rec.Release()
return mfw.fw.WriteBuffered(rec)
}

// Close finalizes the parquet output by closing the underlying file writer.
// NOTE(review): fw.Close presumably returns an error that is dropped here;
// confirm the RecordWriter interface cannot surface it to callers.
func (mfw *multiFieldRecordWriter) Close() {
	mfw.fw.Close()
}

func newMultiFieldRecordWriter(fieldIds []FieldID, fields []arrow.Field, writer io.Writer) (*multiFieldRecordWriter, error) {
schema := arrow.NewSchema(fields, nil)
fw, err := pqarrow.NewFileWriter(schema, writer,
parquet.NewWriterProperties(parquet.WithMaxRowGroupLength(math.MaxInt64)), // No additional grouping for now.
shaoting-huang marked this conversation as resolved.
Show resolved Hide resolved
shaoting-huang marked this conversation as resolved.
Show resolved Hide resolved
pqarrow.DefaultWriterProps())
if err != nil {
return nil, err

Check warning on line 727 in internal/storage/serde.go

View check run for this annotation

Codecov / codecov/patch

internal/storage/serde.go#L727

Added line #L727 was not covered by tests
}
return &multiFieldRecordWriter{
fw: fw,
fieldIds: fieldIds,
schema: schema,
}, nil
}

type SerializeWriter[T any] struct {
rw RecordWriter
serializer Serializer[T]
Expand Down Expand Up @@ -773,6 +827,10 @@
sr.r.Release()
}

// ArrowSchema exposes the schema of the wrapped arrow.Record directly.
func (sr *simpleArrowRecord) ArrowSchema() *arrow.Schema {
	return sr.r.Schema()
}

func newSimpleArrowRecord(r arrow.Record, schema map[FieldID]schemapb.DataType, field2Col map[FieldID]int) *simpleArrowRecord {
return &simpleArrowRecord{
r: r,
Expand Down
Loading
Loading