diff --git a/internal/storage/serde_events.go b/internal/storage/serde_events.go index 54f41f8aa066..487e00ac4ac3 100644 --- a/internal/storage/serde_events.go +++ b/internal/storage/serde_events.go @@ -792,11 +792,11 @@ func NewDeltalogMultiFieldWriter(partitionID, segmentID UniqueID, eventWriter *M arr := []arrow.Array{builder.Field(0).NewArray(), builder.Field(1).NewArray()} field2Col := map[FieldID]int{ - common.RowIDField: 0, + common.DeltaPkField: 0, common.TimeStampField: 1, } schema := map[FieldID]schemapb.DataType{ - common.RowIDField: pkType, + common.DeltaPkField: pkType, common.TimeStampField: schemapb.DataType_Int64, } return newSimpleArrowRecord(array.NewRecord(arrowSchema, arr, int64(len(v))), schema, field2Col), memorySize, nil @@ -809,41 +809,39 @@ func NewDeltalogMultiFieldReader(blobs []*Blob) (*DeserializeReader[*DeleteLog], return nil, err } return NewDeserializeReader(reader, func(r Record, v []*DeleteLog) error { - for i := 0; i < r.Len(); i++ { - if v[i] == nil { - v[i] = &DeleteLog{} - } - } rec, ok := r.(*simpleArrowRecord) if !ok { return fmt.Errorf("can not cast to simple arrow record") } - for i, fid := range rec.r.Schema().Fields() { - a := r.Column(int64(i)) - + fields := rec.r.Schema().Fields() + for i, fid := range fields { switch fid.Name { case "pk": switch fid.Type.ID() { case arrow.INT64: - arr := a.(*array.Int64) - for i := 0; i < a.Len(); i++ { - v[i].Pk = &Int64PrimaryKey{Value: arr.Value(i)} - v[i].PkType = int64(schemapb.DataType_Int64) + arr := r.Column(int64(i)).(*array.Int64) + for j := 0; j < r.Len(); j++ { + if v[j] == nil { + v[j] = &DeleteLog{} + } + v[j].Pk = NewInt64PrimaryKey(arr.Value(j)) } case arrow.STRING: - arr := a.(*array.String) - for i := 0; i < a.Len(); i++ { - v[i].Pk = &VarCharPrimaryKey{Value: arr.Value(i)} - v[i].PkType = int64(schemapb.DataType_VarChar) + arr := r.Column(int64(i)).(*array.String) + for j := 0; j < r.Len(); j++ { + if v[j] == nil { + v[j] = &DeleteLog{} + } + v[j].Pk = NewVarCharPrimaryKey(arr.Value(j)) } default: return fmt.Errorf("unexpected delta log pkType %v", fid.Type.Name()) } case "ts": - arr := a.(*array.Int64) - for i := 0; i < arr.Len(); i++ { - v[i].Ts = uint64(arr.Value(i)) + arr := r.Column(int64(i)).(*array.Int64) + for j := 0; j < r.Len(); j++ { + v[j].Ts = uint64(arr.Value(j)) } default: return fmt.Errorf("unexpected delta log field: %v", fid.Name) @@ -872,16 +870,6 @@ func supportMultiFieldFormat(blobs []*Blob) bool { if err != nil { return false } - er, err := reader.NextEventReader() - defer er.Close() - if err != nil { - return false - } - rr, err := er.GetArrowRecordReader() - defer rr.Release() - if err != nil { - return false - } version := reader.descriptorEventData.Extras[version] return version != nil && version.(string) == MULTI_FIELD } diff --git a/internal/storage/serde_events_test.go b/internal/storage/serde_events_test.go index 1123e1a1abbe..2344524e7a50 100644 --- a/internal/storage/serde_events_test.go +++ b/internal/storage/serde_events_test.go @@ -393,7 +393,7 @@ func TestDeltalogPkTsSeparateFormat(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{ - {FieldID: common.RowIDField, Name: "pk", DataType: tc.pkType}, + {FieldID: common.DeltaPkField, Name: "pk", DataType: tc.pkType}, {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, }) writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, 7) @@ -506,7 +506,7 @@ func BenchmarkDeltalogFormat(b *testing.B) { b.ResetTimer() for i := 0; i < b.N; i++ { eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{ - {FieldID: common.RowIDField, Name: "pk", DataType: schemapb.DataType_Int64}, + {FieldID: common.DeltaPkField, Name: "pk", DataType: schemapb.DataType_Int64}, {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, }) writer, _ := NewDeltalogMultiFieldWriter(0, 0, eventWriter, size) @@ -524,7 +524,7 @@ func BenchmarkDeltalogFormat(b *testing.B) { func generateTestDeltalogNewFormatData(size int) (*Blob, error) { eventWriter := NewMultiFieldDeltalogStreamWriter(0, 0, 0, []*schemapb.FieldSchema{ - {FieldID: common.RowIDField, Name: "pk", DataType: schemapb.DataType_Int64}, + {FieldID: common.DeltaPkField, Name: "pk", DataType: schemapb.DataType_Int64}, {FieldID: common.TimeStampField, Name: "ts", DataType: schemapb.DataType_Int64}, }) writer, err := NewDeltalogMultiFieldWriter(0, 0, eventWriter, size) diff --git a/pkg/common/common.go b/pkg/common/common.go index 5bd3aabc3c1d..83f4dbbd03e5 100644 --- a/pkg/common/common.go +++ b/pkg/common/common.go @@ -37,6 +37,9 @@ const ( // StartOfUserFieldID represents the starting ID of the user-defined field StartOfUserFieldID = 100 + // DeltaPkField is the ID of the primary key field for delta log + DeltaPkField = -1 + // RowIDField is the ID of the RowID field reserved by the system RowIDField = 0